"""Base classes for solar activity indicators."""
import logging
import re
import urllib
import numpy as np
import pandas as pd
from pathlib import Path
from abc import ABC, abstractmethod, abstractproperty, abstractstaticmethod
from collections import namedtuple
from scipy.interpolate import InterpolatedUnivariateSpline
_Loader_Dtypes_Columns = namedtuple("LoaderNamedTuple", ["dtypes", "columns"])
_data_date_pattern = re.compile(r"\d{4}\d{2}\d{2}")
pd.set_option("mode.chained_assignment", "raise")
[docs]
class Base(ABC):
"""Abstract base class providing a logger interface."""
@property
def logger(self):
"""``logging.Logger`` attached to the instance."""
return self._logger
def _init_logger(self):
logger = logging.getLogger(
name="{}.{}".format(__name__, self.__class__.__name__)
)
self._logger = logger
def __str__(self):
return self.__class__.__name__
[docs]
class ID(Base):
"""Container for identifying a particular data product."""
[docs]
def __init__(self, key: str) -> None:
"""Instantiate the identifier and set the corresponding URL.
Parameters
----------
key : str
Key that maps to a URL fragment in :py:attr:`_trans_url`.
"""
self._init_logger()
self.set_key(key)
@abstractproperty
def _url_base(self):
pass
@abstractproperty
def _trans_url(self):
pass
@property
def key(self):
return self._key
@property
def url(self):
r"""URL specifying location from which data was downloaded."""
return self._url
[docs]
def set_key(self, key):
"""Set the identifier key and construct the download URL."""
try:
url_end = self._trans_url[key]
except KeyError:
msg = "{} key unavailable".format(key)
self.logger.exception(msg)
raise NotImplementedError(msg)
url = urllib.parse.urljoin(self._url_base, url_end)
self.logger.info("Setting key\nkey : %s\nurl : %s", key, url)
self._key = key
self._url = url
[docs]
class DataLoader(Base):
[docs]
def __init__(self, key, url):
r"""Initialize a data loader.
Parameters
----------
key : str
Unique data identifier, typically from something like
:class:`SIDC_ID`.
url : str
Full download URL for the data source.
"""
self._init_logger()
self.set_key(key)
self.set_url(url)
self.get_data_ctime()
self.get_data_age()
@abstractproperty
def data_path(self):
# return Path(__file__).parent / "data"
return Path.home() / "solarwindpy" / "data"
[docs]
@abstractstaticmethod
def convert_nans(data):
pass
[docs]
@abstractmethod
def download_data(self):
pass
[docs]
@abstractmethod
def load_data(self):
self.logger.info(f"""Loading {self.key!s} data""")
self.maybe_update_stale_data()
today = pd.to_datetime("today").strftime("%Y%m%d")
fpath = (self.data_path / today).with_suffix(".csv")
data = pd.read_csv(fpath, index_col=0, header=0)
data.set_index(pd.DatetimeIndex(data.index), inplace=True)
self._data = data
@property
def logger(self):
return self._logger
@property
def data(self):
return self._data
@property
def key(self):
return self._key
@property
def url(self):
return self._url
@property
def ctime(self):
return self._ctime
@property
def age(self):
return self._age
def _init_logger(self):
logger = logging.getLogger(
name="{}.{}".format(__name__, self.__class__.__name__)
)
self._logger = logger
[docs]
def set_key(self, key):
self._key = key
[docs]
def set_url(self, new):
self._url = new
[docs]
def get_data_ctime(self):
r"""Determine when the current data set was created.
Returns
-------
pandas.Timestamp
Creation time inferred from the file name, or ``1970-01-01`` if no
prior data are found.
"""
path = self.data_path
files = [str(x) for x in path.rglob("*.csv")]
dates = [re.findall(_data_date_pattern, f) for f in files]
if not len(dates):
# This will automatically return a date in 1970, so we
# will clear and download new data.
self._ctime = pd.to_datetime(0)
return
dates = np.concatenate(dates)
dates = [x.strip("/") for x in dates]
dates = np.unique(dates)
# BUG: This fails if there are more than two dated data directories.
# if dates.size > 1:
# raise ValueError("Too many dates: %s" % ", ".join(dates.astype(str)))
# elif not dates.size:
# ctime = pd.to_datetime(0)
# else:
# ctime = pd.to_datetime(dates[0])
assert dates.size == 1
ctime = pd.to_datetime(dates[0])
self.logger.info("Local data ctime %s", ctime)
self._ctime = ctime
[docs]
def get_data_age(self):
ctime = self.ctime
today = pd.to_datetime("today")
dt = today - ctime
self._data_age = dt
[docs]
def maybe_update_stale_data(self):
r"""Download new data if the existing cache is stale."""
self.logger.info("Updating stale data")
old_ctime = self.ctime
new_ctime = pd.to_datetime("today")
if new_ctime.date() > old_ctime.date():
old_data_path = self.data_path / old_ctime.strftime("%Y%m%d")
new_data_path = self.data_path / new_ctime.strftime("%Y%m%d")
new_data_path.parent.mkdir(parents=True, exist_ok=True)
self.download_data(new_data_path, old_data_path)
[docs]
class ActivityIndicator(Base):
@property
def id(self):
return self._id
@property
def loader(self):
return self._loader
@property
def data(self):
r"""Shortcut to `self.loader.data`."""
return self.loader.data
@property
def extrema(self):
return self._extrema
@property
def norm_by(self):
try:
return self._norm_by
except AttributeError:
raise AttributeError("Please calculate normalized quantity")
@property
def interpolated(self):
return self._interpolated
[docs]
def set_id(self, new):
assert isinstance(new, ID)
self._id = new
# Need to store `interpolated` in subclass. Allows normalized ssn to be interpolated.
[docs]
@abstractmethod
def interpolate_data(self, source_data, target_index):
"""Interpolate ``source_data`` onto ``target_index``.
Parameters
----------
source_data : pandas.Series or pandas.DataFrame
Data with a :class:`~pandas.DatetimeIndex` to interpolate.
target_index : pandas.DatetimeIndex
Target time axis for the interpolation.
Returns
-------
pandas.DataFrame
Data interpolated onto ``target_index``.
"""
assert isinstance(target_index, pd.DatetimeIndex)
assert isinstance(source_data.index, pd.DatetimeIndex)
if isinstance(source_data, pd.Series):
source_data = source_data.to_frame()
nans = source_data.isna().any().any()
if nans:
raise NotImplementedError(
"You must drop NaNs in the subclass's caller before "
"calling parent method."
)
x_target = target_index.asi8
x_source = source_data.index.asi8
interpolated = {}
for k, y_source in source_data.items():
interpolator = InterpolatedUnivariateSpline(
x_source, y_source, check_finite=True
)
interped = interpolator(x_target)
interpolated[k] = interped
interpolated = pd.DataFrame(interpolated, index=target_index)
# Remove extrapolated data
tk = pd.Series(True, target_index)
t1 = target_index[-1]
s1 = source_data.index[-1]
if s1 < t1:
tk = tk & (target_index <= s1)
t0 = target_index[0]
s0 = source_data.index[0]
if t0 < s0:
tk = tk & (s0 <= target_index)
if not tk.all():
interpolated.where(tk, inplace=True, axis=0)
self._interpolated = interpolated
return interpolated
@abstractproperty
def normalized(self):
pass
[docs]
@abstractmethod
def set_extrema(self):
pass
[docs]
@abstractmethod
def run_normalization(self):
r"""Normalize the indicator within each solar cycle.
Parameters
----------
norm_by : {{"max", "zscore", "feature-scale"}}
Normalization algorithm to apply.
Returns
-------
pandas.Series
Normalized values indexed by time.
"""
pass
def _run_normalization(self, indicator, norm_fcn):
cut = self.extrema.cut_spec_by_interval(indicator.index, kind="Cycle")
joint = pd.concat(
[indicator, cut], axis=1, keys=["indicator", "cycle"]
).sort_index(axis=1)
grouped = joint.groupby("cycle")
normed = {}
for k, g in grouped:
g = g.loc[:, "indicator"]
normed[k] = norm_fcn(g)
normed = pd.concat(normed.values(), axis=0).sort_index()
return normed
[docs]
class IndicatorExtrema(Base):
"""Base class for objects describing indicator extrema."""
[docs]
def __init__(self, *args, **kwargs):
self._init_logger()
self.load_or_set_data(*args, **kwargs)
self.calculate_intervals()
@property
def data(self):
return self._data
@property
def cycle_intervals(self):
r""":class:`pd.Interval` for rising and falling edges and full cycle."""
return self._cycle_intervals
@property
def extrema_bands(self):
r"""Bands of time (:math:`\Delta t`) about indicator extrema.
Parameters
----------
dt : str or pandas.Timedelta
Window half-width used in :meth:`calculate_extrema_bands`.
"""
try:
return self._extrema_bands
except AttributeError:
raise AttributeError("Have you called `extrema.calculate_extrema_bands`?")
[docs]
@abstractmethod
def load_or_set_data(self):
pass
# path = Path(__file__).parent / "ssn_extrema.csv"
# data = pd.read_csv(path, header=0, skiprows=15, index_col=0)
# data = pd.to_datetime(data.stack(), format="%Y-%m-%d").unstack(level=1)
# data.columns.names = ["kind"]
# self._data = data
#######################################################################
# Tools for grouping data by Cycle and Cycle Edge (Rising or Falling) #
#######################################################################
[docs]
def calculate_intervals(self):
r"""Compute rising, falling, and full-cycle time intervals.
Notes
-----
The rising edge comes before the falling edge in time, i.e. it's
Min ``N`` followed by Max ``N``. Also calculate intervals for a full
SSN cycle.
"""
extrema = self.data
intervals = pd.DataFrame(
index=extrema.index,
columns=pd.Index(["Rise", "Fall", "Cycle"], name="kind"),
)
# Make `today` only keep the date.
today = pd.to_datetime(pd.to_datetime("today").date())
for c, r in extrema.iterrows():
t0 = r.loc["Min"]
t1 = r.loc["Max"]
if pd.isna(t1):
# No maximum yet, then use Today for maximum
t1 = today
try:
# Get next cycle's Minimum to calculate Falling edge
t2 = extrema.loc[c + 1, "Min"]
except KeyError:
if t1 < today:
# We haven't reached next Min yet, but have current cycle Max
# so use today.
t2 = today
else:
# This cycle does not have a falling edge.
t2 = t1 + pd.to_timedelta(6 * 365, unit="D")
rise_ = pd.Interval(t0, t1)
fall_ = pd.Interval(t1, t2)
all_ = pd.Interval(t0, t2)
if t1 == t2:
# Then no falling edge
fall_ = pd.NaT
intervals.loc[c] = pd.Series({"Rise": rise_, "Fall": fall_, "Cycle": all_})
intervals = intervals.sort_index(axis=1)
self._cycle_intervals = intervals
return intervals
[docs]
def cut_spec_by_interval(self, epoch, kind=None, tk_cycles=None):
r"""Assign epochs to solar-cycle intervals.
Parameters
----------
epoch : pandas.Series or pandas.DatetimeIndex
Data to cut.
kind : str, optional
If provided, restricts the cut to a subset of interval types.
========= ===============================
Key Description
========= ===============================
None Cut by all available options.
"Cycle" Cut by solar cycle
"Rise" Cut by rising edge
"Fall" Cut by falling edge
"Edges" Cut by `["Fall", "Rise"]`.
Exclusive option.
========= ===============================
Note that ``"Edges"`` is exclusive and will specify
``["Fall", "Rise"]`` alone.
tk_cycles : list or slice, optional
If not ``None``, a selector used to choose target solar cycles.
Returns
-------
pandas.Series
Series of :class:`pandas.Interval` objects labeling each epoch.
"""
if isinstance(epoch, pd.DatetimeIndex):
epoch = epoch.to_series()
intervals = self.cycle_intervals
available_kind = intervals.columns.get_level_values("kind")
if kind is None:
kind = available_kind
elif isinstance(kind, str):
if kind == "Edges":
intervals = intervals.loc[:, ["Fall", "Rise"]]
# kind = ["Rise", "Fall"]
else:
if kind not in available_kind:
raise ValueError(f"""Interval `{kind!s}` is unavailable""")
intervals = intervals.loc[:, [kind]]
# kind = [kind]
elif hasattr(kind, "__iter__"):
if not np.all([k in available_kind for k in kind]):
raise ValueError(f"""Interval `{kind!s}` is unavailable""")
intervals = intervals.loc[:, kind]
else:
raise ValueError(f"""Interval `{kind!s}` is unavailable""")
if tk_cycles is not None:
intervals = intervals.loc[tk_cycles]
ii = pd.IntervalIndex(intervals.stack()).sort_values()
if not (ii.is_unique and ii.is_monotonic_increasing):
raise ValueError
cut = pd.cut(epoch, ii)
cut.name = "Cycle_Interval"
return cut
############################################################
# Tools for selecting data within some dt of cycle extrema #
############################################################
[docs]
def calculate_extrema_bands(self, dt="365d"):
r"""Return time windows around indicator extrema.
Parameters
----------
dt : str or pandas.Timedelta, optional
Half-width of the window around each extremum. Defaults to ``"365d"``.
Returns
-------
pandas.DataFrame
``Min`` and ``Max`` intervals for each cycle.
"""
dt = pd.to_timedelta(dt)
extrema = self.data.stack()
dt = pd.to_timedelta(dt)
dt = np.atleast_1d(dt)
if dt.size == 2:
dl, dr = dt
elif dt.size == 1:
dl = dr = dt[0]
else:
raise ValueError("Only know how to handle 1 or 2 dt options.")
left = extrema.subtract(dl)
right = extrema.add(dr)
lr = pd.DataFrame({"left": left, "right": right})
def make_interval(x):
return pd.Interval(x.loc["left"], x.loc["right"])
bands = lr.apply(make_interval, axis=1).unstack(level="kind")
self._extrema_bands = bands
return bands
[docs]
def cut_about_extrema_bands(self, epoch, tk_cycles=None, kind=None):
r"""Bin epochs relative to extrema bands.
Computed with :py:meth:`calculate_extrema_bands`.
Parameters
----------
epoch : pandas.DatetimeIndex
Times to classify.
tk_cycles : slice, optional
Subset of cycles to use when cutting.
kind : {{"Min", "Max"}}, optional
Restrict the classification to minima or maxima.
Returns
-------
tuple[pandas.Series, pandas.Series]
A series of intervals and a mapped series of the form ``"N-Min"`` or
``"N-Max"``.
"""
bands = self.extrema_bands
if tk_cycles is not None:
bands = bands.loc[tk_cycles]
if kind is None:
kind = ["Min", "Max"]
elif isinstance(kind, str):
kind = [kind]
bands = bands.loc[:, kind]
bands = bands.stack()
# TODO: verify bands shape
intervals = pd.IntervalIndex(bands.values).sort_values()
cut = pd.cut(epoch, intervals)
cut = pd.Series(cut, index=epoch, name="spec_by_extrema_band")
mapper = bands.reset_index(name="Intervals").set_index("Intervals")
mapper = mapper.loc[:, "Number"].astype(str) + "-" + mapper.loc[:, "kind"]
mapped = cut.map(mapper)
return cut, mapped