Source code for solarwindpy.plotting.hist1d

#!/usr/bin/env python
r"""One-dimensional histogram plotting utilities."""


import numpy as np
import pandas as pd
import matplotlib as mpl

from types import FunctionType
from matplotlib import pyplot as plt

from . import base
from . import labels as labels_module
from .agg_plot import AggPlot

# import os
# import psutil


# def log_mem_usage():
#    usage = psutil.Process(os.getpid()).memory_info()
#    usage = "\n".join(
#        ["{} {:.3f} GB".format(k, v * 1e-9) for k, v in usage._asdict().items()]
#    )
#    logging.getLogger("main").warning("Memory usage\n%s", usage)


class Hist1D(AggPlot):
    r"""Create 1D plot of `x`, optionally aggregating `y` in bins of `x`.

    Attributes
    ----------
    _gb_axes, path

    Methods
    -------
    set_path, set_data, agg, _format_axis, make_plot
    """

    def __init__(
        self,
        x,
        y=None,
        logx=False,
        axnorm=None,
        clip_data=False,
        nbins=101,
        bin_precision=None,
    ):
        """Create a one-dimensional histogram.

        Parameters
        ----------
        x : pandas.Series
            Data from which to create bins.
        y : pandas.Series or None, optional
            Values to aggregate in bins of ``x``. If ``None``, counts of
            ``x`` are used.
        logx : bool, optional
            If ``True``, compute bins in logarithmic space.
        axnorm : {"t", "d", None}, optional
            Normalisation applied to the histogram. ``"d"`` yields a
            density and ``"t"`` divides by the largest aggregated value
            (see :meth:`_axis_normalizer`).
        clip_data : bool, optional
            Remove extreme values at the 0.001 and 0.999 percentiles
            before binning or aggregation.
        nbins : int or array-like, optional
            Binning strategy passed to :func:`numpy.histogram_bin_edges`
            or :func:`pandas.cut` depending on the input type.
        bin_precision : int, optional
            Precision for decimal bin edges.
        """
        super().__init__()
        self.set_log(x=logx)
        self.set_axnorm(axnorm)
        self.set_data(x, y, clip_data)
        # Without `y` the histogram shows counts, so use a `Count` label
        # that knows about the normalization.
        ylabel = labels_module.Count(norm=axnorm) if y is None else "y"
        self.set_labels(x="x", y=ylabel)
        self.calc_bins_intervals(nbins=nbins, precision=bin_precision)
        self.make_cut()
        self.set_clim(None, None)
        self.set_alim(None, None)

    @property
    def _gb_axes(self):
        """Names of the axes used for grouping: only ``"x"``."""
        return ("x",)

    # NOTE: no docstring here on purpose; it is copied from
    # `base.Base.set_path` immediately after the definition.
    def set_path(self, new, add_scale=True):
        path, x, y, _, scale_info = super().set_path(new, add_scale)

        if new == "auto":
            path = path / x / y
        else:
            # A user-specified path carries no axis components.
            assert x is None
            assert y is None

        if add_scale:
            assert scale_info is not None
            # Only the first scale entry is appended to the path.
            path = path / scale_info[0]

        self._path = path

    set_path.__doc__ = base.Base.set_path.__doc__

    def set_data(self, x, y, clip):
        """Store the data to be binned and the clipping flag.

        Parameters
        ----------
        x : pandas.Series
            Independent values. When ``self.log.x`` is true,
            ``log10(abs(x))`` is stored instead of ``x``.
        y : pandas.Series or None
            Values to aggregate. If ``None``, a series of ones sharing
            the index of ``x`` is used.
        clip : bool
            Stored as ``self._clip``.
        """
        data = pd.DataFrame({"x": np.log10(np.abs(x)) if self.log.x else x})
        if y is None:
            y = pd.Series(1, index=x.index)
        data.loc[:, "y"] = y
        self._data = data
        self._clip = clip

    def set_axnorm(self, new):
        r"""The method by which the gridded data is normalized.

        ===== =============================================================
         key                           description
        ===== =============================================================
         d     Density normalize
         t     Total normalize
        ===== =============================================================

        Parameters
        ----------
        new : str or None
            Only the lower-cased first character is used, so e.g.
            ``"density"`` is equivalent to ``"d"``. ``None`` disables
            normalization.

        Raises
        ------
        ValueError
            If ``new`` is neither ``None`` nor a string starting with
            ``"d"`` or ``"t"``.
        """
        if new is not None:
            # Slice instead of indexing so an empty string is reported as
            # an invalid value rather than an IndexError.
            new = new.lower()[:1]
            if new not in ("d", "t"):
                raise ValueError("Unrecognized axnorm: %s" % new)

        ylbl = self.labels.y
        if isinstance(ylbl, labels_module.Count):
            # Keep a count label in sync with the normalization.
            ylbl.set_axnorm(new)
            ylbl.build_label()

        self._axnorm = new

    def construct_cdf(self, only_plotted=True):
        r"""Convert the observed measurements into an empirical CDF.

        Parameters
        ----------
        only_plotted : bool, optional
            If ``True``, additionally restrict the data with
            ``self.get_plotted_data_boolean_series()``.

        Returns
        -------
        cdf : pd.DataFrame
            "x" column is the value of the measurement, sorted ascending
            and converted back to linear units when ``self.log.x`` is
            true. "position" column is the row number divided by the
            largest row number.

            To plot the cdf::

                cdf.plot(x="x", y="position")

        Raises
        ------
        ValueError
            If the "y" data holds more than two unique values, i.e. the
            object aggregates something other than counts.
        """
        data = self.data
        if data.loc[:, "y"].unique().size > 2:
            raise ValueError("Only able to convert data to a cdf if it is a histogram.")

        # Keep only observations that fall in a bin.
        tk = self.cut.loc[:, "x"].notna()
        if only_plotted:
            tk = tk & self.get_plotted_data_boolean_series()

        cdf = data.loc[tk, "x"].sort_values().reset_index(drop=True)
        if self.log.x:
            cdf = 10.0**cdf

        cdf = cdf.to_frame()
        cdf.loc[:, "position"] = cdf.index / cdf.index.max()
        return cdf

    def _axis_normalizer(self, agg):
        r"""Normalize aggregated values according to ``self.axnorm``.

        Written basically as `staticmethod` so that it can be called in
        `OrbitHist2D`, but as an actual method with `self` passed so we
        have access to `self.log` for density normalization.

        Parameters
        ----------
        agg : pandas.Series
            Aggregated values indexed by bin intervals.

        Returns
        -------
        pandas.Series
            ``agg`` unchanged when ``axnorm`` is ``None``; divided by
            ``bin width * agg.sum()`` for ``"d"``; divided by
            ``agg.max()`` for ``"t"``.

        Raises
        ------
        ValueError
            If ``self.axnorm`` is not ``None``, ``"d"`` or ``"t"``.
        """
        axnorm = self.axnorm
        if axnorm is None:
            return agg

        if axnorm == "d":
            n = agg.sum()
            dx = pd.Series(pd.IntervalIndex(agg.index).length, index=agg.index)
            if self.log.x:
                # NOTE(review): this is 10**(right - left), i.e. the ratio
                # of the bin edges, not the linear bin width
                # 10**right - 10**left. Confirm this is intended.
                dx = 10.0**dx
            return agg.divide(dx.multiply(n))

        if axnorm == "t":
            return agg.divide(agg.max())

        raise ValueError("Unrecognized axnorm: %s" % axnorm)

    def agg(self, **kwargs):
        """Aggregate the data in bins of ``x`` and normalize the result.

        Parameters
        ----------
        **kwargs
            Passed to the parent class's ``agg``. With density
            normalization, ``fcn`` must be ``None`` or ``"count"``.

        Returns
        -------
        The parent's aggregate after ``_axis_normalizer`` and
        ``_agg_reindexer`` have been applied.

        Raises
        ------
        ValueError
            If ``axnorm == "d"`` and a non-count ``fcn`` is requested.
        """
        if self.axnorm == "d":
            fcn = kwargs.get("fcn", None)
            if fcn is not None and fcn != "count":
                raise ValueError("Unable to calculate a PDF with non-count aggregation")

        aggregated = super().agg(**kwargs)
        aggregated = self._axis_normalizer(aggregated)
        aggregated = self._agg_reindexer(aggregated)
        return aggregated

    def set_labels(self, **kwargs):
        """Set axis labels; a 1D histogram has no z-label.

        Parameters
        ----------
        **kwargs
            Labels passed to the parent class's ``set_labels``. ``y``
            defaults to the current y-label; a ``Count`` label is updated
            with the current ``axnorm`` before being stored.

        Raises
        ------
        ValueError
            If a ``z`` label is given.
        """
        if "z" in kwargs:
            raise ValueError(r"{} doesn't have a z-label".format(self))

        y = kwargs.pop("y", self.labels.y)
        if isinstance(y, labels_module.Count):
            y.set_axnorm(self.axnorm)
            y.build_label()

        super().set_labels(y=y, **kwargs)

    def make_plot(
        self,
        ax=None,
        fcn=None,
        transpose_axes=False,
        gaussian_filter_std=0,
        plot_window=False,
        plot_window_edges=False,
        gaussian_filter_kwargs=None,
        **kwargs,
    ):
        """Make a plot.

        Parameters
        ----------
        ax: None, mpl.axis.Axis
            If `None`, create a subplot axis.
        fcn: None, str, aggregative function, or 2-tuple
            Passed directly to `Hist1D.agg`. If None, use the default
            aggregation function. If str or a single aggregative
            function, use it. If a 2-tuple is passed, then the first
            element aggregates and the second element calculates an
            uncertainty.
        transpose_axes: bool
            If True, plot independent values on y-axis and dependent
            values on x-axis. Primary use case is plotting 1D projection
            of 2D plot adjacent to 2D axis.
        gaussian_filter_std: int
            If > 0, apply `scipy.ndimage.gaussian_filter` to the
            aggregated values using the standard deviation specified by
            `gaussian_filter_std`.
        plot_window: bool
            Requires two functions passed to `fcn`. Instead of error
            bars, plots the uncertainty window as a semi-transparent
            band.
        plot_window_edges: bool
            If True, plot solid lines at the window boundaries.
        gaussian_filter_kwargs: None, dict
            If not None and gaussian_filter_std > 0, passed to
            :py:meth:`scipy.ndimage.gaussian_filter`
        kwargs:
            Passed directly to `ax.plot` (window mode) or `ax.errorbar`.
            ``drawstyle`` defaults to ``"steps-mid"`` and is only used
            for error bars. ``window_kwargs`` (dict) styles the
            uncertainty band and its edges; the caller's dict is not
            modified.

        Returns
        -------
        ax
            The axis drawn on.
        out
            The return value of `ax.errorbar`, or ``(line, polycol)``
            when `plot_window` is True.

        Raises
        ------
        ValueError
            If `fcn` is not recognized, or if `plot_window` is requested
            without an uncertainty function.
        """
        agg = self.agg(fcn=fcn)
        x = pd.IntervalIndex(agg.index).mid

        if fcn is None or isinstance(fcn, (str, FunctionType)):
            y = agg
            dy = None
        elif len(fcn) == 2:
            f0, f1 = fcn
            # Aggregated columns are looked up by function name.
            if isinstance(f0, FunctionType):
                f0 = f0.__name__
            if isinstance(f1, FunctionType):
                f1 = f1.__name__
            y = agg.loc[:, f0]
            dy = agg.loc[:, f1]
        else:
            raise ValueError(f"Unrecognized `fcn` ({fcn})")

        if plot_window and dy is None:
            # Fail early with a clear message instead of a TypeError on
            # `y - None` after part of the figure has been drawn.
            raise ValueError(
                "`plot_window` requires a 2-tuple `fcn` that provides an uncertainty"
            )

        if ax is None:
            fig, ax = plt.subplots()

        if self.log.x:
            # Bins are stored in log10 space; plot in linear units.
            x = 10.0**x

        if gaussian_filter_std:
            from scipy.ndimage import gaussian_filter

            if gaussian_filter_kwargs is None:
                gaussian_filter_kwargs = dict()
            y = gaussian_filter(y, gaussian_filter_std, **gaussian_filter_kwargs)

        drawstyle = kwargs.pop("drawstyle", "steps-mid")
        # Copy so the `.pop` calls below never mutate the caller's dict.
        window_kwargs = dict(kwargs.pop("window_kwargs", None) or {})
        kwargs = mpl.cbook.normalize_kwargs(kwargs, mpl.lines.Line2D._alias_map)

        if plot_window:
            color = kwargs.pop("color", None)
            ls = kwargs.pop("linestyle", "-")
            label = kwargs.pop("label", None)
            window_alpha = window_kwargs.pop("alpha", 0.15)
            window_color = window_kwargs.pop("color", color)
            window_linestyle = window_kwargs.pop("linestyle", ls)

            # Build the band from the dependent values before any axis
            # swap so the same bounds serve both orientations.
            lower = y - dy
            upper = y + dy
            line_kwargs = dict(color=color, linestyle=ls, label=label, **kwargs)
            edge_kwargs = dict(
                color=window_color, linestyle=window_linestyle, **window_kwargs
            )

            if transpose_axes:
                line = ax.plot(y, x, **line_kwargs)
                if plot_window_edges:
                    ax.plot(upper, x, **edge_kwargs)
                    ax.plot(lower, x, **edge_kwargs)
                polycol = ax.fill_betweenx(
                    x, lower, upper, alpha=window_alpha, **edge_kwargs
                )
            else:
                line = ax.plot(x, y, **line_kwargs)
                if plot_window_edges:
                    ax.plot(x, upper, **edge_kwargs)
                    ax.plot(x, lower, **edge_kwargs)
                polycol = ax.fill_between(
                    x, lower, upper, alpha=window_alpha, **edge_kwargs
                )

            out = (line, polycol)

        elif transpose_axes:
            # Dependent values, and their uncertainty, go on the x-axis.
            out = ax.errorbar(
                y, x, xerr=dy, yerr=None, drawstyle=drawstyle, **kwargs
            )
        else:
            out = ax.errorbar(
                x, y, xerr=None, yerr=dy, drawstyle=drawstyle, **kwargs
            )

        self._format_axis(ax, transpose_axes=transpose_axes)

        return ax, out

    def take_data_in_yrange_across_x(
        self,
        ranges_by_x,
        get_x_bounds,
        get_y_bounds,
    ):
        r"""Take data within y-ranges across x-values.

        Parameters
        ----------
        ranges_by_x: pandas.DataFrame
            One row per x-bin. The index must match the index of
            ``self.agg()``. Each index entry is passed to `get_x_bounds`
            and each row is passed to `get_y_bounds`.
        get_x_bounds: function
            First argument is one index entry of `ranges_by_x`; returns
            `left, right`. Second argument is a kwarg (`expected_logx`)
            boolean to transform the returned values according to whether
            or not the keys are :math:`log(x)` or :math:`x` in a manner
            that matches the stored data.
        get_y_bounds: function
            First argument is one row of `ranges_by_x`; returns
            `bottom, top`. Second argument is a kwarg (`expected_logy`)
            boolean to transform the returned values according to whether
            or not the values are :math:`log(y)` or :math:`y` in a manner
            that matches the stored data.

        Returns
        -------
        taken: np.ndarray
            Sorted 1D array of index labels of ``self.data`` for which
            ``left < x <= right`` and ``bottom < y <= top`` in some row
            of `ranges_by_x`.
        """
        available_x = self.agg().index
        assert not ranges_by_x.index.symmetric_difference(available_x).size

        data = self.data
        logx = self.log.x
        logy = self.log.y

        taken = []
        for x, at_x in ranges_by_x.iterrows():
            left, right = get_x_bounds(x, expected_logx=logx)
            bottom, top = get_y_bounds(at_x, expected_logy=logy)
            assert left < right
            assert bottom < top

            # Half-open intervals: lower bound excluded, upper included.
            in_x = (left < data.x) & (data.x <= right)
            in_y = (bottom < data.y) & (data.y <= top)
            selected = in_x & in_y
            taken.append(selected.loc[selected].index)

        taken = np.sort(np.concatenate(taken))
        return taken