Source code for solarwindpy.fitfunctions.core

#!/usr/bin/env python
r"""Base classes used to implement specific fit functions.

The :class:`FitFunction` abstract base class handles selecting the
observations to include in a fit, running SciPy optimizers and
providing convenient plotting helpers.  Subclasses need only define
the functional form and an initial parameter guess.
"""

import logging  # noqa: F401
import warnings

import numpy as np
import pandas as pd

from abc import ABC, abstractmethod
from collections import namedtuple
from inspect import getfullargspec
from docstring_inheritance import NumpyDocstringInheritanceMeta

# from scipy.optimize import curve_fit
from scipy.optimize import least_squares, OptimizeWarning

try:
    from scipy.optimize._minpack_py import (
        _wrap_func,
        _wrap_jac,
        _initialize_feasible,
    )
except ImportError:  # pragma: no cover - fall back for older SciPy versions
    from scipy.optimize.minpack import (
        _wrap_func,
        _wrap_jac,
        _initialize_feasible,
    )
from scipy.optimize._lsq.least_squares import prepare_bounds
from scipy.linalg import svd, cholesky, LinAlgError

from .tex_info import TeXinfo
from .plots import FFPlot

Observations = namedtuple("Observations", "x,y,w")
UsedRawObs = namedtuple("UsedRawObs", "used,raw,tk_observed")
InitialGuessInfo = namedtuple("InitialGuessInfo", "p0,bounds")
ChisqPerDegreeOfFreedom = namedtuple("ChisqPerDegreeOfFreedom", "linear,robust")
FitBounds = namedtuple("FitBounds", "lower,upper")


[docs] class FitFunctionError(Exception): """Base exception for fit function errors.""" pass
[docs] class InsufficientDataError(FitFunctionError): """Raised when there is insufficient data to perform the fit.""" pass
[docs] class FitFailedError(FitFunctionError): """Raised when the fitting algorithm fails to converge.""" pass
[docs] class InvalidParameterError(FitFunctionError): """Raised when invalid parameters are provided to fit functions.""" pass
# Combine ABC and docstring inheritance metaclasses
[docs] class FitFunctionMeta(NumpyDocstringInheritanceMeta, type(ABC)): """Metaclass combining ABC and docstring inheritance.""" pass
[docs] class FitFunction(ABC, metaclass=FitFunctionMeta): r"""Assuming that you don't want special formatting, call order is: fit_function = FitFunction(function, TeX_string) fit_function.make_fit() Instances are callable. If the fit fails, calling the instance will return an array of NaNs the same shape as the x-values. """
[docs] def __init__( self, xobs, yobs, xmin=None, xmax=None, xoutside=None, ymin=None, ymax=None, youtside=None, weights=None, wmin=None, wmax=None, logx=False, logy=False, ): """Initialize fit function with observed data. Parameters ---------- xobs : array-like Observed x values (independent variable). Shape must match yobs. yobs : array-like Observed y values (dependent variable). Shape must match xobs. xmin, xmax : float, optional Range limits for x used in fitting. Values outside this range are excluded from the fit. All boundaries are inclusive (>= or <=). xoutside : tuple(float, float), optional Include only data outside this range in the fit. Useful for excluding a central region. Format: (lower, upper) where lower < upper. ymin, ymax : float, optional Range limits for y used in fitting. Values outside this range are excluded from the fit. youtside : tuple(float, float), optional Include only data outside this range. Format: (lower, upper) where lower < upper. weights : array-like, optional Uncertainties (1-sigma) associated with y values. Used for weighted least squares fitting. If 1-d array, interpreted as diagonal covariance matrix. If 2-d, must be positive definite covariance matrix. wmin, wmax : float, optional Weight limits. Observations with weights outside this range are excluded from the fit. logx, logy : bool, default False Whether to interpret x or y on a log10 scale. If logy=True, weight selection uses w/(y*ln(10)) for proper error propagation in log space. Notes ----- The fitting procedure uses scipy.optimize.least_squares with robust loss functions (Huber by default) to handle outliers. The initial parameter guess is provided by the p0 property, which must be implemented by subclasses. All subclasses inherit this documentation automatically through the docstring-inheritance metaclass. Examples -------- >>> import numpy as np # doctest: +SKIP >>> from solarwindpy.fitfunctions import Gaussian # doctest: +SKIP >>> x = np.linspace(-5, 5, 100) # doctest: +SKIP >>> y = 3 * np.exp(-0.5 * x**2) + np.random.normal(0, 0.1, 100) # doctest: +SKIP >>> fit = Gaussian(x, y, xmin=-3, xmax=3) # doctest: +SKIP >>> fit.make_fit() # doctest: +SKIP >>> print(f"Fitted mu: {fit.popt['mu']:.3f}") # doctest: +SKIP See Also -------- make_fit : Execute the fitting procedure popt : Access optimized parameters rsq : Calculate coefficient of determination """ self._init_logger() self._set_argnames() if weights is None: assert wmin is None assert wmax is None self.set_fit_obs( xobs, yobs, weights, xmin=xmin, xmax=xmax, xoutside=xoutside, ymin=ymin, ymax=ymax, youtside=youtside, wmin=wmin, wmax=wmax, logx=logx, logy=logy, )
def __str__(self): return f"{self.__class__.__name__} ({self.TeX_function})" def __call__(self, x): """Evaluate the fitted model at ``x``.""" # Design decision: Keep interface simple - __call__ evaluates the fitted # function using stored parameters. For parameter overrides, users should # call self.function(x, param1, param2, ...) directly. # Sort the parameter keywords into the proper order to pass to the # numerical function. # try: popt_ = self.popt popt_ = [popt_[k] for k in self.argnames] # NOTE # An instance of FitFunction is for a given function. To change the # function itself, a new instance of FitFunction should be required. # Therefore, we access the function directly. y = self.function(x, *popt_) # except AttributeError as e: # if "'PowerLaw' object has no attribute '_popt'" in str(e): # y = np.full_like(x, np.nan, dtype=np.float64) return y @property def logger(self): return self._logger def _init_logger(self): """Create a module-level logger for the instance.""" logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}") self._logger = logger @property @abstractmethod def function(self): r"""Get the function that`curve_fit` fits. The function is set at instantiation. It doesn't make sense to change it unless you redefine the entire FitFunction, so there is no `new` kwarg. """ pass @property @abstractmethod def p0(self): r"""The initial guess for the FitFunction.""" pass @property @abstractmethod def TeX_function(self): r"""Function written in LaTeX.""" pass @property def argnames(self): r"""The names of the actual function arguments pulled by getfullargspec.""" return self._argnames @property def fit_bounds(self): r"""Bounds used when running the fit.""" return dict(self._fit_bounds) @property def chisq_dof(self): r"""Chisq per degree of freedom :math:`\chi^2_\nu`. If None, not calculated by `make_fit_old`. If `np.nan`, fit failed. """ # r = self.residuals(pct=False) # sigma = self.observations.used.w # if sigma is not None: # r = r / sigma # # chisq = (r ** 2).sum() # dof = r.size - len(self.p0) # chisq_dof = chisq / dof # return chisq_dof try: return self._chisq_dof except AttributeError: return None @property def dof(self): r"""Degrees of freedom in the fit.""" return self.observations.used.y.size - len(self.p0) @property def fit_result(self): return self._fit_result @property def initial_guess_info(self): # If failed to make an initial guess, then don't build the info. try: p0 = self.p0 bounds = self.fit_bounds except AttributeError: return None names = self.argnames info = { name: InitialGuessInfo(guess, tuple(bounds[name])) for name, guess in zip(names, p0) } # info = ["\n".join(param) for param in info] # info = "\n\n".join(info) return info @property def nobs(self): r"""The total number of observations used in the fit.""" return self.observations.tk_observed.sum() @property def observations(self): return self._observations @property def plotter(self): # try: return self._plotter # except AttributeError: # return self.build_plotter() @property def popt(self): r"""Optimized fit parameters.""" return dict(self._popt) @property def psigma(self): return dict(self._psigma) @property def combined_popt_psigma(self): r"""Return optimized parameters and uncertainties as a DataFrame. Returns ------- pd.DataFrame DataFrame with columns 'popt' and 'psigma', indexed by parameter names. Relative uncertainty can be computed as: df['psigma'] / df['popt'] """ return pd.DataFrame({"popt": self.popt, "psigma": self.psigma}) @property def pcov(self): r"""Returns a copy so that the matrix isn't accidentally edited.""" return self._pcov.copy() @property def rsq(self): r"""Coefficient of determination. Source: <en.wikipedia.org/wiki/Coefficient_of_determination#Definitions> """ y = self.observations.used.y ybar = y.mean() yfit = self(self.observations.used.x) sum_squares_total = ((y - ybar) ** 2).sum() sum_squares_residual = ((y - yfit) ** 2).sum() rsq = 1 - (sum_squares_residual / sum_squares_total) return rsq @property def sufficient_data(self): r"""Ensure that we can fit the data before doing any computations.""" chk = self.nobs >= len(self.argnames) if not chk: msg = "There is insufficient data to fit the model." raise InsufficientDataError(msg) else: return True @property def TeX_info(self): # try: return self._TeX_info # except AttributeError: # return self.build_TeX_info() def _clean_raw_obs(self, xobs, yobs, weights): r"""Set the raw x- and y-values along with weights for the fit. Doesn't account for extrema, finite data, etc. """ xobs = np.asarray(xobs) yobs = np.asarray(yobs) if weights is not None: weights = np.asarray(weights) if xobs.shape != yobs.shape: raise InvalidParameterError( f"""xobs and yobs must have the same shape. xobs: {xobs.shape}, yobs: {yobs.shape}""" ) if weights is not None and weights.shape != xobs.shape: raise InvalidParameterError( f"""weights and xobs must have the same shape. weights: {weights.shape}, xobs: {xobs.shape}""" ) return xobs, yobs, weights def _build_one_obs_mask(self, axis, x, xmin, xmax): """Build observation mask with in-place operations for efficiency.""" mask = np.isfinite(x) if xmin is not None: mask &= x >= xmin # In-place AND instead of creating xmin_mask if xmax is not None: mask &= x <= xmax # In-place AND instead of creating xmax_mask return mask def _build_outside_mask(self, axis, x, outside): """Build outside mask with in-place operations for efficiency.""" if outside is None: return np.full_like(x, True, dtype=bool) lower, upper = outside assert lower < upper mask = x <= lower mask |= x >= upper # In-place OR instead of creating separate u_mask return mask def _set_argnames(self): r"""Set the arguments of the function/ Assume that the first is dependent variable. Should be called after function is set. """ args = getfullargspec(self.function).args[1:] self._argnames = args
[docs] def build_plotter(self): obs = self.observations try: yfit = self(self.observations.raw.x) except AttributeError: yfit = np.full_like(self.observations.raw.x, np.nan) # robust_residuals = self.fit_result.fun tex_info = self.TeX_info fit_result = self.fit_result # try: # fit_result = self.fit_result # except AttributeError: # fit_result = None plotter = FFPlot( obs, yfit, # robust_residuals, tex_info, fit_result, fitfunction_name=self.__class__.__name__, ) self._plotter = plotter return plotter
[docs] def build_TeX_info(self): # Allows annotating of TeX_info when fit fails in a manner # that is easily identifiable. try: popt = self.popt except AttributeError: popt = {k: np.nan for k in self.argnames} try: psigma = self.psigma except AttributeError: psigma = {k: np.nan for k in self.argnames} tex_info = TeXinfo( popt, psigma, self.TeX_function, self.chisq_dof, self.rsq, initial_guess_info=self.initial_guess_info, ) self._TeX_info = tex_info return tex_info
[docs] def residuals(self, pct=False, use_all=False): r""" Calculate fit residuals. Parameters ---------- pct : bool, default=False If True, return percentage residuals. use_all : bool, default=False If True, calculate residuals for all input data including points excluded by constraints (xmin, xmax, etc.) passed during initialization. If False (default), calculate only for points used in fit. Returns ------- numpy.ndarray Residuals as observed - fitted. Examples -------- >>> # Create FitFunction with constraints >>> ff = Gaussian(x, y, xmin=3, xmax=7) >>> ff.make_fit() >>> >>> # Residuals for fitted region only >>> r_fit = ff.residuals() >>> >>> # Residuals for all original data >>> r_all = ff.residuals(use_all=True) >>> >>> # Percentage residuals >>> r_pct = ff.residuals(pct=True) Notes ----- Addresses TODO: "calculate with all values...including those excluded by set_extrema" (though set_extrema doesn't exist - constraints are passed in __init__). """ if use_all: # Use all original observations x = self.observations.raw.x y = self.observations.raw.y else: # Use only observations included in fit (default) x = self.observations.used.x y = self.observations.used.y # Calculate residuals (observed - fitted) fitted_values = self(x) r = y - fitted_values if pct: # Avoid division by zero with np.errstate(divide="ignore", invalid="ignore"): r = 100.0 * (r / fitted_values) r[fitted_values == 0] = np.nan return r
[docs] def set_fit_obs( self, xobs_raw, yobs_raw, weights_raw, xmin=None, xmax=None, xoutside=None, ymin=None, ymax=None, youtside=None, wmin=None, wmax=None, logx=False, logy=False, ): r"""Set the observed values we'll actually use in the fit. By applying limits to xobs_raw and yobs_raw and checking for finite values. All boundaries are inclusive <= or >=. If logy, then make selection of `wmin` and `wmax` based on :math:`w/(y \ln(10))`. """ xobs_raw, yobs_raw, weights_raw = self._clean_raw_obs( xobs_raw, yobs_raw, weights_raw ) xmask = self._build_one_obs_mask("xobs", xobs_raw, xmin, xmax) ymask = self._build_one_obs_mask("yobs", yobs_raw, ymin, ymax) xout_mask = self._build_outside_mask("xobs", xobs_raw, xoutside) yout_mask = self._build_outside_mask("yobs", yobs_raw, youtside) mask = xmask & ymask & xout_mask & yout_mask if weights_raw is not None: weights_for_mask = weights_raw if logy: # Enables picking weights based on normalized scale for log stuff weights_for_mask = weights_for_mask / (yobs_raw * np.log(10)) wmask = self._build_one_obs_mask("weights", weights_for_mask, wmin, wmax) mask = mask & wmask xobs = xobs_raw[mask] yobs = yobs_raw[mask] weights = None if weights_raw is not None: weights = weights_raw[mask] used = Observations(xobs, yobs, weights) raw = Observations(xobs_raw, yobs_raw, weights_raw) usedrawobs = UsedRawObs(used, raw, mask) self._observations = usedrawobs
def _run_least_squares(self, **kwargs): """Execute :func:`scipy.optimize.least_squares` with defaults.""" p0 = kwargs.pop("p0", self.p0) bounds = kwargs.pop("bounds", (-np.inf, np.inf)) method = kwargs.pop("method", "trf") loss = kwargs.pop("loss", "huber") max_nfev = kwargs.pop("max_nfev", 10000) f_scale = kwargs.pop("f_scale", 0.1) jac = kwargs.pop("jac", "2-point") # loss_fcn = _loss_fcns.pop(loss, loss) # Copied from `curve_fit` line 704 (20200527) if p0 is None: # determine number of parameters by inspecting the function from scipy._lib._util import getargspec_no_self as _getargspec args, varargs, varkw, defaults = _getargspec(self.function) if len(args) < 2: raise ValueError("Unable to determine number of fit parameters.") n = len(args) - 1 else: p0 = np.atleast_1d(p0) n = p0.size if isinstance(bounds, dict): # Monkey patch to work with bounds being stored as # dict for TeX_info. (20201202) bounds = [bounds[k] for k in self.argnames] bounds = np.array(bounds).T # Copied from `curve_fit` line 715 (20200527) lb, ub = prepare_bounds(bounds, n) if p0 is None: p0 = _initialize_feasible(lb, ub) if "args" in kwargs: raise ValueError( "Adopted `curve_fit` convention for which'args' is not a supported keyword argument." ) xdata = self.observations.used.x ydata = self.observations.used.y sigma = self.observations.used.w # Copied from `curve_fit` line 749 (20200527) # Determine type of sigma if sigma is not None: sigma = np.asarray(sigma) # sigma = sigma / np.nansum(sigma) # if 1-d, sigma are errors, define transform = 1/sigma if sigma.shape == (ydata.size,): transform = 1.0 / sigma # if 2-d, sigma is the covariance matrix, # define transform = L such that L L^T = C elif sigma.shape == (ydata.size, ydata.size): try: # scipy.linalg.cholesky requires lower=True to return L L^T = A transform = cholesky(sigma, lower=True) except LinAlgError: raise ValueError("`sigma` must be positive definite.") else: raise ValueError("`sigma` has incorrect shape.") else: transform = None # Copied from `curve_fit` line 769 (20200527) loss_func = _wrap_func(self.function, xdata, ydata, transform) # Already define default `jac` with `kwargs`. Don't need ELSE clause. if callable(jac): jac = _wrap_jac(jac, xdata, transform) res = least_squares( loss_func, p0, jac=jac, bounds=bounds, method=method, loss=loss, max_nfev=max_nfev, f_scale=f_scale, **kwargs, ) if not res.success: raise FitFailedError("Optimal parameters not found: " + res.message) fit_bounds = np.concatenate([lb, ub]).reshape((2, -1)).T fit_bounds = {k: FitBounds(*b) for k, b in zip(self.argnames, fit_bounds)} fit_bounds = tuple(fit_bounds.items()) self._fit_bounds = fit_bounds # self._loss_fcn = loss_fcn return res, p0 def _calc_popt_pcov_psigma_chisq(self, res, p0): """Compute optimized parameters and statistics from the result.""" xdata = self.observations.used.x ydata = self.observations.used.y sigma = self.observations.used.w # The following is from `curve_fit` line 801 and following. (20200625) # `cost` is the robust loss, i.e. residuals passed through loss funciton. ysize = len(res.fun) cost = 2 * res.cost # res.cost is half sum of squares! popt = res.x # Linear chisq_dof value. dof = ydata.size - len(p0) chisq_dof = np.inf # Divide by zero => infinity if dof: r = self.function(xdata, *popt) - ydata if sigma is not None: r /= sigma chisq_dof = (r**2).sum() / dof # Do Moore-Penrose inverse discarding zero singular values. _, s, VT = svd(res.jac, full_matrices=False) threshold = np.finfo(float).eps * max(res.jac.shape) * s[0] s = s[s > threshold] VT = VT[: s.size] pcov = np.dot(VT.T / (s**2), VT) warn_cov = False if ysize > p0.size: # Cost is robust residuals, so this is robust chisq per dof. s_sq = cost / (ysize - p0.size) pcov = pcov * s_sq else: s_sq = np.nan pcov.fill(np.inf) warn_cov = True if warn_cov: warnings.warn( "Covariance of the parameters could not be estimated", category=OptimizeWarning, ) psigma = np.sqrt(np.diag(pcov)) # Based on `curve_fit`'s `absolute_sigma` documentation and reading # `least_square`, `s_sq` should be chisq_nu based on robust residuals # that account for `f_scale`(20200527). all_chisq = ChisqPerDegreeOfFreedom(chisq_dof, s_sq) return popt, pcov, psigma, all_chisq
[docs] def make_fit(self, return_exception=False, **kwargs): """Fit the function with the independent `xobs` and dependent `yobs`. Uses `least_squares` and returns the `OptimizeResult` object, but treats weights as in `curve_fit`. Parameters ---------- return_exception: bool If True, return exceptions from fitting routine, instead of raising. This is useful when looping through many fits and wanting to identify failed fits after the fact. kwargs: Unless specified here, defaults are as defined by `curve_fit`. ============= ====================================== kwarg default ============= ====================================== p0 Setup by `{self.__class__.__name__}` return_full False method "trf" loss "huber" max_nfev 10000 f_scale 0.1 ============= ====================================== """ try: assert self.sufficient_data # Check we have enough data to fit. except (AssertionError, ValueError, InsufficientDataError) as e: # raise if isinstance(e, AssertionError): e = InsufficientDataError("Insufficient data to fit the model") if return_exception: return e else: raise absolute_sigma = kwargs.pop("absolute_sigma", False) if absolute_sigma: raise NotImplementedError("We want to rescale fit errors by chisq_dof") try: res, p0 = self._run_least_squares(**kwargs) except (RuntimeError, ValueError, FitFailedError) as e: # print("fitting failed", flush=True) # raise if return_exception: return e else: raise popt, pcov, psigma, all_chisq = self._calc_popt_pcov_psigma_chisq(res, p0) self._popt = list(zip(self.argnames, popt)) self._psigma = list(zip(self.argnames, psigma)) self._pcov = pcov self._chisq_dof = all_chisq self._fit_result = res self.build_TeX_info() self.build_plotter()