Source code for solarwindpy.core.alfvenic_turbulence

#!/usr/bin/env python
"""Alfvenic turbulence diagnostics using Elsasser variables.

Notes
-----
The implementation follows the formalism outlined in Bruno & Carbone [1].
Lloyd Woodham <https://orcid.org/0000-0003-2845-4250> helped me define these
calculations at the 2018 AGU Fall Meeting and understand [1]. Please cite [3]
if using this module.

References
----------
[1] Bruno, R., & Carbone, V. (2013). *Living Reviews in Solar Physics*,
    10(1), 1–208. https://doi.org/10.12942/lrsp-2013-2
[2] Telloni, D., & Bruno, R. (2016). *Monthly Notices of the Royal Astronomical
    Society: Letters*, 463(1), L79–L83. https://doi.org/10.1093/mnrasl/slw135
[3] Woodham, L. D., Wicks, R. T., Verscharen, D., & Owen, C. J. (2018).
    *Astrophys. J.*, 856, 49.
"""


import numpy as np
import pandas as pd

from collections import namedtuple

# We rely on views via DataFrame.xs to reduce memory size and do not
# `.copy(deep=True)`, so we want to make sure that this doesn't
# accidentally cause a problem.

from . import base

AlvenicTurbAveraging = namedtuple("AlvenicTurbAveraging", "window,min_periods")


[docs] class AlfvenicTurbulence(base.Core): r"""Alfv\'enic turbulence diagnostics using Elsasser variables. Parameters ---------- velocity : :class:`pandas.DataFrame` Plasma velocity in the same basis as ``bfield``. bfield : :class:`pandas.DataFrame` Magnetic field in the same basis as ``velocity``. rho : :class:`pandas.Series` Mass density used for normalising ``bfield``. species : str Species string used when converting to Alfv\'en units. Notes ----- Implementation follows the formalism of Bruno & Carbone (2013). """
[docs] def __init__( self, velocity, bfield, rho, species, raffaella_version=False, sc_vector=None, **kwargs, ): r"""Initialize an :py:class:`AlfvenicTurbulence` object. Parameters ---------- velocity: pd.DataFrame Vector velocity measurments. bfield: pd.DataFrame Vector mangetic field measurements. rho: pd.Series Mass density measurments, used to put `bfield` into Alfven units. kwargs: Passed to `rolling` method when mean-subtracing in `set_data`. """ super(AlfvenicTurbulence, self).__init__() self.set_data( velocity, bfield, rho, species, raffaella_version=raffaella_version, sc_vector=sc_vector, **kwargs, )
@property def data(self): r"""Mean-subtracted quantities used to calculated Elsasser variables.""" return self._data @property def averaging_info(self): r"""Averaging window and minimum number of measurements / average used. In calculating background component in :math:`\delta B` and :math:`\delta v`. """ return self._averaging_info @property def measurements(self): r"""Measurements used to calcualte mean-subtracted `data`.""" return self._measurements @property def velocity(self): r"""Velocity fluctuations (:math:`\delta v`) in Plasma's v-units.""" return self.data.loc[:, "v"] @property def v(self): r"""Shortcut for :py:attr:`velocity`""" return self.velocity @property def bfield(self): r"""B field fluctuations (:math:`\delta b`) in Alfven units.""" # return self.data.loc[:, "b"] b = self.data.loc[:, "b"] polarity = self.polarity if polarity is not None: self.logger.warning("Rectifying B") b = b.multiply(polarity, axis=0) # b = b.copy(deep=True) # b.loc[:, ["x", "y"]] = b.loc[:, ["x", "y"]].multiply(polarity, axis=0) return b @property def b(self): r"""Shortcut for :py:attr:`bfield`.""" return self.bfield @property def polarity(self): r"""Magnetic field polarity.""" return self._polarity @property def species(self): r"""Species used to create :class:`AlfvenicTurbulence`. Defines mass density in Alfven units. """ return self._species @property def z_plus(self): r""":math:`z^+` Elsasser variable.""" zp = self.v.add(self.b, axis=1) return zp @property def zp(self): r"""Shortcut for :py:attr:`z_plus`.""" return self.z_plus @property def z_minus(self): r""":math:`z^-` Elsasser variable.""" zm = self.v.subtract(self.b, axis=1) return zm @property def zm(self): r"""Shortcut for :py:attr:`z_minus`.""" return self.z_minus @property def e_plus(self): r"""Energy contained in :math:`z^+`.""" ep = 0.5 * self.zp.pow(2).sum(axis=1) return ep @property def ep(self): r"""Shortcut for :py:attr:`e_plus`.""" return self.e_plus @property def e_minus(self): r"""Energy contained in :math:`z^-`.""" em = 0.5 * self.zm.pow(2).sum(axis=1) return em @property def em(self): r"""Shortcut for :py:attr:`e_minus`.""" return self.e_minus @property def kinetic_energy(self): r"""Energy contained in velocity fluctuations :math:`\frac{1}{2}v^2`.""" ev = 0.5 * self.v.pow(2).sum(axis=1) return ev @property def ev(self): r"""Shortcut for :py:attr:`E_v = kinetic_energy`.""" return self.kinetic_energy @property def magnetic_energy(self): r"""Energy contained in magnetic field fluctuations :math:`E_b = \frac{1}{2}b^2`.""" eb = 0.5 * self.b.pow(2).sum(axis=1) return eb @property def eb(self): r"""Shortcut for :py:attr:`magnetic_energy`.""" return self.magnetic_energy @property def total_energy(self): r"""Total energy :math:`E_T = E_v + E_b`.""" return self.ev.add(self.eb, axis=0) @property def etot(self): r"""Shortcut for :py:attr:`total_energy`.""" return self.total_energy @property def residual_energy(self): r"""Residual energy :math:`E_R = E_v - E_b`.""" return self.ev.subtract(self.eb, axis=0) @property def eres(self): r"""Shortcut for :py:attr:`residual_energy`.""" return self.residual_energy @property def normalized_residual_energy(self): r"""Normalized residual energy :py:attr:`E_R/E_T`.""" return self.eres.divide(self.etot, axis=0) @property def eres_norm(self): r"""Shortcut for :py:attr:`normalized_residual_energy`.""" return self.normalized_residual_energy @property def sigma_r(self): r"""Shortcut for :py:attr:`normalized_residual_energy`.""" return self.normalized_residual_energy @property def cross_helicity(self): r"""Cross helicity :math:`\frac{1}{2} \delta v \cdot \delta b`.""" v = self.v b = self.b c = 0.5 * v.multiply(b).sum(axis=1) return c @property def normalized_cross_helicity(self): r"""Normalized cross helicity :math:`\frac{e^+ - e^-}{e^+ + e^-}`.""" ep = self.ep em = self.em num = ep.subtract(em) den = ep.add(em) out = num.divide(den) return out @property def sigma_c(self): r"""Shortcut to :py:attr:`normalized_cross_helicity`.""" return self.normalized_cross_helicity @property def alfven_ratio(self): r"""Alfv\'en ratio :math:`E_v/E_b`.""" return self.ev.divide(self.eb, axis=0) @property def rA(self): r"""Shortcut to :py:attr:`alfven_ratio`.""" return self.alfven_ratio @property def elsasser_ratio(self): r"""Elsasser ratio :math:`e^-/e^+`.""" return self.em.divide(self.ep, axis=0) @property def rE(self): r"""Shortcut to :py:attr:`elsasser_ratio`.""" return self.elsasser_ratio
[docs] def set_data( self, v_in, b_in, rho, species, raffaella_version=False, sc_vector=None, **kwargs, ): r"""Set data for the class, performing routine formatting checks. The `auto_reindex` kwarg can be set to False for batch analysis. So that, if running a large batch of analysis on the same data, one can reindex once outside of this class and avoid many unnecessary reindexing cases within it. Be sure to carefully check your reindexing so as to not introduce lots of NaNs. I ran into that bug when first writing this class. """ species = self._clean_species_for_setting(species) if not isinstance(v_in.index, pd.DatetimeIndex): raise TypeError if not isinstance(b_in.index, pd.DatetimeIndex): raise TypeError if not isinstance(rho.index, pd.DatetimeIndex): raise TypeError if not v_in.index.equals(b_in.index): self.logger.warn("v and b have unequal indices. Results may be unexpected.") if not v_in.index.equals(rho.index): self.logger.warn( """v and rho have unequal indices. Results may be unexpected.""" ) # Convert b -> Alfven units before averaging as in Bruno and Carbone # [2013], Section B.3.1. # Based on my read of Bruno and Carbone's definition in B.3.1 (p.166), # we first define the magnetic field in Alfven units. Then we calculate # averages. Note that I took the other option in my test cases in # `TS-analysis` project. (20181120) coef = self.units.b / ( # Convert b -> Alfven units. np.sqrt(self.units.rho * self.constants.misc.mu0) * self.units.v ) b_ca_units = b_in.divide(rho.pipe(np.sqrt), axis=0).multiply(coef) data = ( pd.concat({"v": v_in, "b": b_ca_units}, axis=1, names=["M"], sort=True) .sort_index(axis=1) .copy(deep=True) ) # if auto_reindex: # idx = v_in.index.union(b_in.index) # i0 = idx.min() # i1 = idx.max() + 1 # `stop` excludes `i1`, so use `i1 + 1`. # idx = pd.RangeIndex(start=i0, stop=i1, step=1) # # v = v_in.reindex(idx, axis=0) # b = b_ca_units.reindex(idx, axis=0) # # else: # v = v_in # b = b_ca_units # print("<set_data>", # "<species>: %s" % species, # "<v_in>", type(v_in), v_in, # "<v>", type(v), v, # "<rho>", type(rho), rho, # "<b_in>", type(b_in), b_in, # "<coef>: %s" % coef, # "<b>", type(b), b, # sep="\n", # end="\n\n") polarity = None if raffaella_version: self.logger.warning("Running Raffaella's version") if sc_vector is None: raise ValueError( "SC-Sun distance required to peform Raffaella's version." ) # # Drop normal component # data = data.drop("z", axis=1, level="C").copy(deep=True) # Convert GSE -> RTN data = data.multiply( pd.Series({"x": -1, "y": -1, "z": 1}), axis=1, level="C" ) # Project along nominal Parker Spiral omega = 2.865e-6 # rad/s pos = sc_vector.data.pos.copy(deep=True) r_rtn = ( pos.loc[:, ["x", "y", "z"]] .pow(2) .sum(axis=1, skipna=False) .pipe(np.sqrt) ) rho_rtn = ( pos.loc[:, ["x", "y"]].pow(2).sum(axis=1, skipna=False).pipe(np.sqrt) ) cos_colat = rho_rtn.divide(r_rtn) # cos_colat = 1 r = sc_vector.distance2sun * sc_vector.units.distance2sun * 1e-3 # [km] correction = r.multiply(cos_colat, axis=0).multiply( omega ) # [arc length speed] = [km/s] vt = data.loc[:, ("v", "y")].subtract(correction) data.loc[:, ("v", "y")] = vt # vy = data.loc[:, ("v", "y")].add(correction) # data.loc[:, ("v", "y")] = vy polarity = ( data.loc[:, "v"] .multiply(data.loc[:, "b"], axis=1) .drop("z", axis=1) .sum(axis=1) .pipe(np.sign) ) window = kwargs.pop("window", "15min") min_periods = kwargs.pop("min_periods", 5) rolled = data.rolling(window, min_periods=min_periods, **kwargs) agged = rolled.agg("mean") deltas = data.subtract(agged, axis=1) data.name = "measurements" deltas.name = "deltas" self._measurements = data self._data = deltas self._polarity = polarity self._species = species self._averaging_info = AlvenicTurbAveraging(window, min_periods)
def _clean_species_for_setting(self, species): if not isinstance(species, str): msg = "%s.species must be a single species w/ an optional `+` or `,`" raise TypeError(msg % self.__class__.__name__) if species.count(",") > 1: msg = "%s.species can contain at most one `,`\nspecies: %s" raise ValueError(msg % (self.__class__.__name__, species)) species = ",".join( ["+".join(tuple(sorted(s.split("+")))) for s in species.split(",")] ) return species
# lass AlfvenicTurbulenceDAmicis(base.Core): # r"""Handle and calculate Alfvenic turbulence quantities using the Elsasser # variables following R D'Amicis' email (20240214). # # Parameters # ---------- # velocity : pd.DataFrame, pd.Series (?) # The velocity vector in the same basis as `bfield`. # Can be a single species, a CoM species, or a differential flow. The # differential flow case is an area of curiosity for me and I do not # suggest passing it as an input. # Expect [v] = km/s (i.e. default stored in `units_constants.Units`). # bfield : pd.DataFrame, pd.Series (?) # Magnetic field vector in the same basis as `velocity`. # Expect [b] = nT (i.e. default stored in `units_contants.Units`). # rho : pd.Series # The total mass density of the plasma used to define velocity. # Expect [rho] = m_p / cm^3 (i.e. default stored in # `units_constants.Units`). # species: str # The species string. Can contain `+`. Can contain at most one `,`. # # Attributes # ---------- # data, species, z_plus, zp, z_minus, zm, e_plus, ep, # e_minus, em, total_energy, etot, # residual_energy, eres, normalized_residual_energy, eres_norm, sigma_r, # cross_helicity, normalized_cross_helicity, sigma_c, alfven_ratio, rA, # elsasser_ratio, rE # # Methods # ------- # set_data # # Notes # ----- # # """ # # def __init__(self, velocity, bfield, rho, species, sc_vector, **kwargs): # r"""Initialize an :py:class:`AlfvenicTurbulence` object. # # Parameters # ---------- # velocity: pd.DataFrame # Vector velocity measurments. # bfield: pd.DataFrame # Vector mangetic field measurements. # rho: pd.Series # Mass density measurments, used to put `bfield` into Alfven units. # kwargs: # Passed to `rolling` method when mean-subtracing in `set_data`. # """ # # print("<Module>", # # "__init__", # # sep="\n", # # end="\n") # # super(AlfvenicTurbulenceDAmicis, self).__init__() # self.set_data(velocity, bfield, rho, species, sc_vector, **kwargs) # # @property # def data(self): # r"""Mean-subtracted quantities used to calculated Elsasser variables. # """ # return self._data # # @property # def averaging_info(self): # r"""Averaging window and minimum number of measurements / average used # in calculating background component in :math:`\delta B` and :math:`\delta v`. # """ # return self._averaging_info # # @property # def measurements(self): # r"""Measurements used to calcualte mean-subtracted `data`. # """ # return self._measurements # # @property # def polarity(self): # r"""Magnetic field polarity. # """ # return self._polarity # # @property # def species(self): # r"""Species used to create `AlfvenicTurbulence`. Defines mass density in Alfven # units. # """ # return self._species # # @property # def z_plus(self): # r"""Z+ Elsasser variable. # """ # zp = self.data.loc[:, "zp"] # return zp # # @property # def zp(self): # r"""Shortcut for `AlfvenicTurbulence.z_plus`. # """ # return self.z_plus # # @property # def z_minus(self): # r"""Z- Elsasser variable. # """ # zm = self.data.loc[:, "zm"] # return zm # # @property # def zm(self): # r"""Shortcut for `AlfvenicTurbulence.z_minus`. # """ # return self.z_minus # # @property # def e_plus(self): # # I took the averages before I created the +/-z quantities in my # # previous test cases. Based on a more detailed read of Bruno and # # Carbone, I calculate +/-z before I take averages. Note that because # # I am adding v and b, the differene shouldn't matter. # ep = 0.5 * self.zp.pow(2).sum(axis=1) # return ep # # @property # def ep(self): # return self.e_plus # # @property # def e_minus(self): # em = 0.5 * self.zm.pow(2).sum(axis=1) # return em # # @property # def em(self): # return self.e_minus # # # @property # # def kinetic_energy(self): # # ev = 0.5 * self.v.pow(2).sum(axis=1) # # return ev # # # @property # # def ev(self): # # return self.kinetic_energy # # # @property # # def magnetic_energy(self): # # eb = 0.5 * self.b.pow(2).sum(axis=1) # # return eb # # # @property # # def eb(self): # # return self.magnetic_energy # # # @property # # def total_energy(self): # # return self.ev.add(self.eb, axis=0) # # # @property # # def etot(self): # # return self.total_energy # # # @property # # def residual_energy(self): # # return self.ev.subtract(self.eb, axis=0) # # # @property # # def eres(self): # # return self.residual_energy # # # @property # # def normalized_residual_energy(self): # # return self.eres.divide(self.etot, axis=0) # # # @property # # def eres_norm(self): # # return self.normalized_residual_energy # # # @property # # def sigma_r(self): # # return self.normalized_residual_energy # # # @property # # def cross_helicity(self): # # v = self.v # # b = self.b # # c = 0.5 * v.multiply(b).sum(axis=1) # # return c # # @property # def normalized_cross_helicity(self): # ep = self.ep # em = self.em # num = ep.subtract(em) # den = ep.add(em) # out = num.divide(den) # return out # # @property # def sigma_c(self): # """Normalized cross helicity. # # Returns # ------- # pd.Series # Sigma_c parameter. # """ # return self.normalized_cross_helicity # # # @property # # def alfven_ratio(self): # # return self.ev.divide(self.eb, axis=0) # # # @property # # def rA(self): # # return self.alfven_ratio # # @property # def elsasser_ratio(self): # return self.em.divide(self.ep, axis=0) # # @property # def rE(self): # """Elsasser ratio. # # Returns # ------- # pd.Series # Elsasser ratio parameter. # """ # return self.elsasser_ratio # # def set_data(self, v_in, b_in, rho, species, sc_vector, **kwargs): # r"""The `auto_reindex` kwarg can be set to False so that, if running a # large batch of analysis on the same data, one can reindex once outside # of this class and avoid many unnecessary reindexing cases within it. # # Be sure to carefully check your reindexing so as to not introduce lots # of NaNs. I ran into that bug when first writing this class. # """ # # species = self._clean_species_for_setting(species) # if not isinstance(v_in.index, pd.DatetimeIndex): # raise TypeError # if not isinstance(b_in.index, pd.DatetimeIndex): # raise TypeError # if not isinstance(rho.index, pd.DatetimeIndex): # raise TypeError # # if not v_in.index.equals(b_in.index): # self.logger.warn("v and b have unequal indices. Results may be unexpected.") # if not v_in.index.equals(rho.index): # self.logger.warn( # """v and rho have unequal indices. Results may be # unexpected.""" # ) # # auto_reindex = bool(auto_reindex) # # data = ( # pd.concat({"v": v_in, "b": b_in}, axis=1, names=["M"], sort=True) # .sort_index(axis=1) # .copy(deep=True) # ) # # # print("1", data.head().round(3), sep="\n", end="\n\n") # # # Convert GSE -> RTN # data = data.multiply(pd.Series({"x": -1, "y": -1, "z": 1}), axis=1, level="C") # # # print("2", data.head().round(3), sep="\n", end="\n\n") # # # Project along nominal Parker Spiral # omega = 2.865e-6 # rad/s # pos = sc_vector.data.pos.copy(deep=True) # r_rtn = ( # pos.loc[:, ["x", "y", "z"]].pow(2).sum(axis=1, skipna=False).pipe(np.sqrt) # ) # rho_rtn = pos.loc[:, ["x", "y"]].pow(2).sum(axis=1, skipna=False).pipe(np.sqrt) # cos_colat = rho_rtn.divide(r_rtn) # # r = sc_vector.distance2sun * sc_vector.units.distance2sun * 1e-3 # [km] # # correction = r.multiply(cos_colat, axis=0).multiply( # omega # ) # [arc length speed] = [km/s] # vt = data.loc[:, ("v", "y")].subtract(correction) # data.loc[:, ("v", "y")] = vt # # # # print("3", data.head().round(3), sep="\n", end="\n\n") # # polarity = ( # data.loc[:, "v"] # .multiply(data.loc[:, "b"], axis=1) # # .drop(["x", "y"], axis=1) # .drop("z", axis=1) # .sum(axis=1) # .pipe(np.sign) # ) # # coef = self.units.b / ( # Convert b -> Alfven units. # np.sqrt(self.units.rho * self.constants.misc.mu0) * self.units.v # ) # coef = rho.pow(-0.5).multiply(coef) # polarity_coef = polarity.multiply(coef) # # # print("Coef", polarity_coef.head(), sep="\n", end="\n\n") # # b_calc = data.loc[:, "b"].multiply(polarity_coef, axis=0) # # b_calc = data.loc[:, "b"].multiply(coef, axis=0) # v_calc = data.loc[:, "v"] # # # print("4", v_calc.head().round(3), b_calc.head().round(3),sep="\n", end="\n\n") # # zp = v_calc.add(b_calc) # zm = v_calc.subtract(b_calc) # z_raw = pd.concat({"zp": zp, "zm": zm}, axis=1).sort_index(axis=1) # # # print("5", z_raw.head().round(3), sep="\n", end="\n\n") # # window = kwargs.pop("window", "15min") # min_periods = kwargs.pop("min_periods", 5) # # rolled = z_raw.rolling(window, min_periods=min_periods, **kwargs) # agged = rolled.agg("mean") # deltas = z_raw.subtract(agged, axis=1) # # # print("6", agged.head().round(3), sep="\n", end="\n\n") # # print("y", deltas.head().round(3), sep="\n", end="\n\n") # # data.name = "measurements" # deltas.name = "deltas" # # self._measurements = data # self._data = deltas # self._polarity = polarity # self._species = species # self._averaging_info = AlvenicTurbAveraging(window, min_periods) # # def _clean_species_for_setting(self, species): # if not isinstance(species, str): # msg = "%s.species must be a single species w/ an optional `+` or `,`" # raise TypeError(msg % self.__class__.__name__) # if species.count(",") > 1: # msg = "%s.species can contain at most one `,`\nspecies: %s" # raise ValueError(msg % (self.__class__.__name__, species)) # # species = ",".join( # ["+".join(tuple(sorted(s.split("+")))) for s in species.split(",")] # ) # return species