"""Dispersion due to the solar wind."""
from warnings import warn
import astropy.constants as const
import astropy.units as u
import astropy.time
import numpy as np
import scipy.special
from pint.models.dispersion_model import Dispersion
from pint.models.parameter import floatParameter, prefixParameter
import pint.utils
from pint.models.timing_model import MissingTOAs
from pint.toa_select import TOASelect
def _dm_p_int(b, z, p):
"""Integral function for DM calculation
from https://github.com/nanograv/enterprise_extensions/blob/master/enterprise_extensions/chromatic/solar_wind.py#L299
See Figure 1 of Hazboun et al. (2022) for definitions of b, z
Parameters
----------
b : astropy.quantity.Quantity
Impact parameter
z : astropy.quantity.Quantity
distance from Earth to closest point to the Sun
p : float
power-law index
Returns
-------
astropy.quantity.Quantity
"""
return (z / b) * scipy.special.hyp2f1(
0.5, p / 2.0, 1.5, -((z**2) / b**2).decompose().value
)
def _gamma_function(p):
"""The second term in Eqn. 12 of Hazboun et al. involving Gamma functions
Note that this needs to be multiplied by b*sqrt(pi)/2
Parameters
----------
p : float
power-law index
Returns
-------
float
"""
return scipy.special.gamma(p / 2 - 0.5) / scipy.special.gamma(p / 2)
def _d_gamma_function_dp(p):
"""Derivative of the second term in Eqn. 12 of Hazboun et al. involving Gamma functions wrt p
Note that this needs to be multiplied by b*sqrt(pi)/2
Parameters
----------
p : float
power-law index
Returns
-------
float
"""
return scipy.special.gamma(p / 2 - 0.5) * scipy.special.polygamma(
0, p / 2 - 0.5
) / 2 / scipy.special.gamma(p / 2) - scipy.special.gamma(
p / 2 - 0.5
) * scipy.special.polygamma(
0, p / 2
) / 2 / scipy.special.gamma(
p / 2
)
def _hypergeom_function(b, z, p):
"""The first term in Eqn. 12 of Hazboun et al. involving hypergeometric functions
Note that this needs to be multiplied by b
Parameters
----------
b : astropy.quantity.Quantity
Impact parameter
z : astropy.quantity.Quantity
distance from Earth to closest point to the Sun
p : float
power-law index
Returns
-------
astropy.quantity.Quantity
"""
theta = np.arctan2(b, z)
return (1 / np.tan(theta)) * scipy.special.hyp2f1(
0.5, p / 2.0, 1.5, -((1 / np.tan(theta)).value ** 2)
)
def _d_hypergeom_function_dp(b, z, p):
"""Derivative of the first term in Eqn. 12 of Hazboun et al. involving hypergeometric functions wrt p
Note that this needs to be multiplied by b
Parameters
----------
b : astropy.quantity.Quantity
Impact parameter
z : astropy.quantity.Quantity
distance from Earth to closest point to the Sun
p : float
power-law index
Returns
-------
astropy.quantity.Quantity
"""
theta = np.arctan2(b, z)
x = theta.to_value(u.rad) - np.pi / 2
# the result of a order 4,4 Pade expansion of
# cot(theta) * hypergeom([1/2, p/2],[3/2],-cot(theta)**2)
# near theta=Pi/2
# in Maple:
# with(numapprox):
# with(CodeGeneration):
# Python(simplify(subs(theta = x + Pi/2, diff(pade(cot(theta)*hypergeom([1/2, p/2], [3/2], -cot(theta)^2), theta = Pi/2, [4, 4]), p))));
return (
8580
* x**3
* (
(
p**4
- 0.76e2 / 0.11e2 * p**3
+ 0.2996e4 / 0.429e3 * p**2
+ 0.5248e4 / 0.429e3 * p
- 0.1984e4 / 0.429e3
)
* x**4
+ 0.84e2
/ 0.11e2
* (p**2 - 0.115e3 / 0.39e2 * p - 0.44e2 / 0.39e2)
* (p + 4)
* x**2
+ 0.1960e4 / 0.143e3 * (p + 4) ** 2
)
/ (
39 * x**4 * p**3
- 186 * x**4 * p**2
+ 200 * x**4 * p
+ 360 * x**2 * p**2
+ 32 * x**4
- 480 * x**2 * p
- 1440 * x**2
+ 840 * p
+ 3360
)
** 2
)
def _solar_wind_geometry(r, theta, p):
"""Solar wind geometry factor (integral of path length)
For the models with variable power-law index (You et al., Hazboun et al.)
Parameters
----------
r : astropy.quantity.Quantity
Distance from the Earth to the Sun
theta : astropy.quantity.Quantity
Solar elongation
p : float
Power-law index
Returns
-------
astropy.quantity.Quantity
"""
# impact parameter
b = r * np.sin(theta)
# distance from the Earth to the impact point
z_sun = r * np.cos(theta)
# a big value for comparison
# this is what Enterprise uses
z_p = (1e14 * u.s * const.c).to(b.unit)
if p > 1:
solar_wind_geometry = (
(1 / b.to_value(u.AU)) ** p
* b
* (_dm_p_int(b, z_p, p) - _dm_p_int(b, -z_sun, p))
)
else:
raise NotImplementedError(
"Solar Wind geometry not implemented for power-law index p <= 1"
)
return solar_wind_geometry
def _d_solar_wind_geometry_d_p(r, theta, p):
"""Derivative of solar_wind_geometry (path length) wrt power-law index p
The evaluation is done using Eqn. 12 in Hazboun et al. (2022). The first term
involving hypergeometric functions (:func:`_hypergeom_function`)
has the derivative computed approximately using a Pade expansion (:func:`_d_hypergeom_function_dp`).
The second uses gamma functions (:func:`_gamma_function`) and has the derivative computed
using polygamma functions (:func:`_d_gamma_function_dp`).
"""
# impact parameter
b = r * np.sin(theta)
# distance from the Earth to the impact point
z_sun = r * np.cos(theta)
# a big value for comparison
# this is what Enterprise uses
z_p = (1e14 * u.s * const.c).to(b.unit)
if p > 1:
return (1 / b.to_value(u.AU)) ** p * (
b * _d_hypergeom_function_dp(b, z_sun, p)
+ (b * np.sqrt(np.pi) / 2) * _d_gamma_function_dp(p)
) - (1 / b.to_value(u.AU)) ** p * np.log(b.to_value(u.AU)) * (
b * _hypergeom_function(b, z_sun, p)
+ (b * np.sqrt(np.pi) / 2) * _gamma_function(p)
)
else:
return np.inf * np.ones(len(theta)) * u.pc
def _get_reference_time(
model,
params=["POSEPOCH", "PEPOCH", "DMEPOCH"],
default=astropy.time.Time(50000, format="mjd"),
):
"""Return a reference time for other calculations
Go through a list of possible times in a model, and return the first one that is not None.
If none is found, return the default.
Parameters
----------
model : pint.models.timing_model.TimingModel
params : list
Names of parameters to search through
default : astropy.time.Time
Returns
-------
astropy.time.Time
"""
for p in params:
if getattr(model, p).value is not None:
return getattr(model, p).quantity
return default
[docs]class SolarWindDispersionBase(Dispersion):
"""Abstract base class for solar wind dispersion components."""
pass
[docs]class SolarWindDispersion(SolarWindDispersionBase):
"""Dispersion due to the solar wind (basic model).
The model is a simple spherically-symmetric model that is fit
in its constant amplitude.
For ``SWM==0`` it assumes a power-law index of 2 (Edwards et al.)
For ``SWM==1`` it can have any power-law index (You et al., Hazboun et al.), which can also be fit
Parameters supported:
.. paramtable::
:class: pint.models.solar_wind_dispersion.SolarWindDispersion
References
----------
- Edwards et al. 2006, MNRAS, 372, 1549; Setion 2.5.4
- Madison et al. 2019, ApJ, 872, 150; Section 3.1.
- Hazboun et al. (2022, ApJ, 929, 39)
- You et al. (2012, MNRAS, 422, 1160)
"""
register = True
category = "solar_wind"
def __init__(self):
super().__init__()
self.add_param(
floatParameter(
name="NE_SW",
units="cm^-3",
value=0.0,
aliases=["NE1AU", "SOLARN0"],
description="Solar Wind density at 1 AU",
)
)
self.add_param(
floatParameter(
name="SWP",
value=2.0,
units="",
description="Solar Wind Model radial power-law index (only for SWM=1)",
)
)
self.add_param(
floatParameter(
name="SWM",
value=0.0,
units="",
description="Solar Wind Model (0 is from Edwards+ 2006, 1 is from You+2007,2012/Hazboun+ 2022)",
)
)
self.dm_value_funcs += [self.solar_wind_dm]
self.delay_funcs_component += [self.solar_wind_delay]
self.set_special_params(["NE_SW", "SWM", "SWP"])
[docs] def setup(self):
super().setup()
self.register_dm_deriv_funcs(self.d_dm_d_ne_sw, "NE_SW")
self.register_deriv_funcs(self.d_delay_d_ne_sw, "NE_SW")
self.register_dm_deriv_funcs(self.d_dm_d_swp, "SWP")
self.register_deriv_funcs(self.d_delay_d_swp, "SWP")
[docs] def solar_wind_geometry(self, toas):
"""Return the geometry of solar wind dispersion.
For SWM==0:
Implements the geometry part of equations 29, 30 of Edwards et al. 2006,
(i.e., without the n0, the solar wind DM amplitude part.)
Their rho is given as theta here.
rvec: radial vector from observatory to the center of the Sun
pos: pulsar position
For SWM==1:
Implements Eqn. 11 of Hazboun et al. (2022)
Parameters
----------
toas : pint.toa.TOAs
Returns
-------
astropy.quantity.Quantity
"""
swm = self.SWM.value
p = self.SWP.value
if swm == 0:
angle, r = self._parent.sun_angle(toas, also_distance=True)
rho = np.pi - angle.to_value(u.rad)
solar_wind_geometry = const.au**2.0 * rho / (r * np.sin(rho))
return solar_wind_geometry.to(u.pc)
elif swm == 1:
# get elongation angle, distance from Earth to Sun
theta, r = self._parent.sun_angle(toas, also_distance=True)
return _solar_wind_geometry(r, theta, p).to(u.pc)
else:
raise NotImplementedError(
"Solar Dispersion Delay not implemented for SWM %d" % swm
)
[docs] def solar_wind_dm(self, toas):
"""Return the solar wind dispersion measure.
SWM==0:
Uses equations 29, 30 of Edwards et al. 2006.
SWM==1:
Hazboun et al. 2022
"""
if self.NE_SW.value == 0:
return np.zeros(len(toas)) * u.pc / u.cm**3
if self.SWM.value not in [0, 1]:
raise NotImplementedError(
f"Solar Dispersion Delay not implemented for SWM {self.SWM.value}"
)
solar_wind_geometry = self.solar_wind_geometry(toas)
solar_wind_dm = self.NE_SW.quantity * solar_wind_geometry
return solar_wind_dm.to(u.pc / u.cm**3)
[docs] def solar_wind_delay(self, toas, acc_delay=None):
"""This is a wrapper function to compute solar wind dispersion delay."""
if self.NE_SW.value == 0:
return np.zeros(len(toas)) * u.s
return self.dispersion_type_delay(toas)
[docs] def d_dm_d_ne_sw(self, toas, param_name, acc_delay=None):
"""Derivative of of DM wrt the solar wind ne amplitude."""
if self.SWM.value in [0, 1]:
solar_wind_geometry = self.solar_wind_geometry(toas)
else:
raise NotImplementedError(
"Solar Dispersion Delay not implemented for SWM %d" % self.SWM.value
)
return solar_wind_geometry
def d_delay_d_ne_sw(self, toas, param_name, acc_delay=None):
try:
bfreq = self._parent.barycentric_radio_freq(toas)
except AttributeError:
warn("Using topocentric frequency for dedispersion!")
bfreq = toas.table["freq"]
deriv = self.d_delay_d_dmparam(toas, "NE_SW")
deriv[bfreq < 1.0 * u.MHz] = 0.0
return deriv
[docs] def d_solar_wind_geometry_d_swp(self, toas, param_name, acc_delay=None):
"""Derivative of solar_wind_geometry (path length) wrt power-law index p
The evaluation is done using Eqn. 12 in Hazboun et al. (2022). The first term
involving hypergeometric functions (:func:`_hypergeom_function`)
has the derivative computed approximately using a Pade expansion (:func:`_d_hypergeom_function_dp`).
The second uses gamma functions (:func:`_gamma_function`) and has the derivative computed
using polygamma functions (:func:`_d_gamma_function_dp`).
"""
if self.SWM.value == 0:
raise ValueError(
"Solar Wind power-law index not valid for SWM %d" % self.SWM.value
)
elif self.SWM.value == 1:
# get elongation angle, distance from Earth to Sun
theta, r = self._parent.sun_angle(toas, also_distance=True)
return _d_solar_wind_geometry_d_p(r, theta, self.SWP.value)
else:
raise NotImplementedError(
"Solar Dispersion Delay not implemented for SWM %d" % self.SWM.value
)
def d_dm_d_swp(self, toas, param_name, acc_delay=None):
d_geometry_dp = self.d_solar_wind_geometry_d_swp(
toas, param_name, acc_delay=acc_delay
)
return self.NE_SW.quantity * d_geometry_dp
def d_delay_d_swp(self, toas, param_name, acc_delay=None):
try:
bfreq = self._parent.barycentric_radio_freq(toas)
except AttributeError:
warn("Using topocentric frequency for dedispersion!")
bfreq = toas.table["freq"]
deriv = self.d_delay_d_dmparam(toas, "SWP")
deriv[bfreq < 1.0 * u.MHz] = 0.0
return deriv
[docs] def print_par(self, format="pint"):
result = ""
result += getattr(self, "NE_SW").as_parfile_line(format=format)
result += getattr(self, "SWM").as_parfile_line(format=format)
if self.SWM.value == 1:
result += getattr(self, "SWP").as_parfile_line(format=format)
return result
[docs] def get_max_dm(self):
"""Return approximate maximum DM from the Solar Wind (at conjunction)
Simplified model that assumes a circular orbit
Returns
-------
astropy.quantity.Quantity
"""
coord = self._parent.get_psr_coords()
t0 = _get_reference_time(self._parent)
# time and distance of conjunction
t0, elongation = pint.utils.get_conjunction(coord, t0, precision="high")
if self.SWM.value == 0:
r = 1 * u.AU
rho = (180 * u.deg - elongation).to(u.rad)
return (
self.NE_SW.quantity * (const.au**2.0 * rho / (r * np.sin(rho)))
).to(u.pc / u.cm**3, equivalencies=u.dimensionless_angles())
elif self.SWM.value == 1:
p = self.SWP.value
theta = elongation
r = 1 * u.AU
return (_solar_wind_geometry(r, theta, p) * self.NE_SW.quantity).to(
u.pc / u.cm**3
)
else:
raise NotImplementedError(
"Solar Dispersion Delay not implemented for SWM %d" % self.SWM.value
)
[docs] def get_min_dm(self):
"""Return approximate minimum DM from the Solar Wind (180deg away from conjunction)
Simplified model that assumes a circular orbit
Returns
-------
astropy.quantity.Quantity
"""
coord = self._parent.get_psr_coords()
t0 = _get_reference_time(self._parent)
# time and distance of conjunction
t0, elongation = pint.utils.get_conjunction(coord, t0, precision="high")
if self.SWM.value == 0:
r = 1 * u.AU
rho = (elongation).to(u.rad)
return (
self.NE_SW.quantity * (const.au**2.0 * rho / (r * np.sin(rho)))
).to(u.pc / u.cm**3, equivalencies=u.dimensionless_angles())
elif self.SWM.value == 1:
p = self.SWP.value
# for the min
theta = 180 * u.deg - elongation
r = 1 * u.AU
return (_solar_wind_geometry(r, theta, p) * self.NE_SW.quantity).to(
u.pc / u.cm**3
)
else:
raise NotImplementedError(
"Solar Dispersion Delay not implemented for SWM %d" % self.SWM.value
)
[docs]class SolarWindDispersionX(SolarWindDispersionBase):
"""This class provides a SWX model - multiple Solar Wind segments.
This model lets the user specify time ranges and fit for a different
SWXDM (max solar wind DM) value and SWXP (radial power-law index) in each time range.
The default radial power-law index value of 2 corresponds to the Edwards et al. model.
Other values are for the You et al./Hazboun et al. model.
Note that unlike the standard model, this model goes to 0 at opposition (the minimum).
So it really represents a Delta DM. This is to make it easier to join multiple segments.
However, to get the peak to still be the requested max DM the values are scaled compared
to the standard model: the standard model goes from opposition (min) to conjunction (max),
while this model goes from 0 to conjunction (max), so the scaling is ``((conjunction - opposition)/conjuction)``.
See `Solar Wind Examples <examples/solar_wind.html>`_.
To compare against a standard model:
Example
-------
>>> swxmodel.SWXDM_0001.quantity = model.get_max_dm() * swxmodel.get_swscalings()[0]
>>> np.allclose(swxmodel.get_min_dms(),model.get_min_dm()*swxmodel.get_swscalings())
>>> toas = pint.simulation.make_fake_toas_uniform(54000, 54000 + 2 * 365, 500, model=model, obs="gbt")
>>> np.allclose((swxmodel.swx_dm(toas) + model.get_min_dm()), model.solar_wind_dm(toas))
Parameters supported:
.. paramtable::
:class: pint.models.solar_wind_dispersion.SolarWindDispersionX
References
----------
- Edwards et al. 2006, MNRAS, 372, 1549; Setion 2.5.4
- Madison et al. 2019, ApJ, 872, 150; Section 3.1.
- Hazboun et al. (2022, ApJ, 929, 39)
- You et al. (2012, MNRAS, 422, 1160)
"""
register = True
category = "solar_windx"
def __init__(self):
super().__init__()
self.add_swx_range(None, None, swxdm=0, swxp=2, frozen=False, index=1)
self.set_special_params(["SWXDM_0001", "SWXP_0001", "SWXR1_0001", "SWXR2_0001"])
self.dm_value_funcs += [self.swx_dm]
self.delay_funcs_component += [self.swx_delay]
self._theta0 = None
[docs] def solar_wind_geometry(self, toas, p):
"""Return the geometry of solar wind dispersion (integrated path-length)
Implements Eqn. 11 of Hazboun et al. (2022)
Parameters
----------
toas : pint.toa.TOAs
p : float
Radial power-law index
Returns
-------
astropy.quantity.Quantity
"""
# get elongation angle, distance from Earth to Sun
theta, r = self._parent.sun_angle(toas, also_distance=True)
return _solar_wind_geometry(r, theta, p).to(u.pc)
[docs] def conjunction_solar_wind_geometry(self, p):
"""Return the geometry (integrated path-length) of solar wind dispersion at conjunction
Implements Eqn. 11 of Hazboun et al. (2022)
Parameters
----------
p : float
Radial power-law index
Returns
-------
astropy.quantity.Quantity
"""
r0 = 1 * u.AU
return _solar_wind_geometry(r0, self.theta0, p).to(u.pc)
[docs] def opposition_solar_wind_geometry(self, p):
"""Return the geometry (integrated path-length) of solar wind dispersion at opposition
Implements Eqn. 11 of Hazboun et al. (2022)
Parameters
----------
p : float
Radial power-law index
Returns
-------
astropy.quantity.Quantity
"""
r0 = 1 * u.AU
return _solar_wind_geometry(r0, 180 * u.deg - self.theta0, p).to(u.pc)
[docs] def d_solar_wind_geometry_d_swxp(self, toas, p):
"""Derivative of solar_wind_geometry (path length) wrt power-law index p
The evaluation is done using Eqn. 12 in Hazboun et al. (2022). The first term
involving hypergeometric functions (:func:`_hypergeom_function`)
has the derivative computed approximately using a Pade expansion (:func:`_d_hypergeom_function_dp`).
The second uses gamma functions (:func:`_gamma_function`) and has the derivative computed
using polygamma functions (:func:`_d_gamma_function_dp`).
Parameters
----------
toas : pint.toa.TOAs
p : float, optional
Radial power-law index
Returns
-------
astropy.quantity.Quantity
"""
# get elongation angle, distance from Earth to Sun
theta, r = self._parent.sun_angle(toas, also_distance=True)
return _d_solar_wind_geometry_d_p(r, theta, p)
[docs] def d_conjunction_solar_wind_geometry_d_swxp(self, p):
"""Derivative of conjunction_solar_wind_geometry (path length) wrt power-law index p
The evaluation is done using Eqn. 12 in Hazboun et al. (2022). The first term
involving hypergeometric functions (:func:`_hypergeom_function`)
has the derivative computed approximately using a Pade expansion (:func:`_d_hypergeom_function_dp`).
The second uses gamma functions (:func:`_gamma_function`) and has the derivative computed
using polygamma functions (:func:`_d_gamma_function_dp`).
"""
# fiducial values
r0 = 1 * u.AU
return _d_solar_wind_geometry_d_p(r0, self.theta0, p)
[docs] def d_opposition_solar_wind_geometry_d_swxp(self, p):
"""Derivative of opposition_solar_wind_geometry (path length) wrt power-law index p
The evaluation is done using Eqn. 12 in Hazboun et al. (2022). The first term
involving hypergeometric functions (:func:`_hypergeom_function`)
has the derivative computed approximately using a Pade expansion (:func:`_d_hypergeom_function_dp`).
The second uses gamma functions (:func:`_gamma_function`) and has the derivative computed
using polygamma functions (:func:`_d_gamma_function_dp`).
"""
# fiducial values
r0 = 1 * u.AU
return _d_solar_wind_geometry_d_p(r0, 180 * u.deg - self.theta0, p)
[docs] def add_swx_range(
self, mjd_start, mjd_end, index=None, swxdm=0, swxp=2, frozen=True
):
"""Add SWX range to a dispersion model with specified start/end MJD, SWXDM, and power-law index
Parameters
----------
mjd_start : float or astropy.quantity.Quantity or astropy.time.Time
MJD for beginning of DMX event.
mjd_end : float or astropy.quantity.Quantity or astropy.time.Time
MJD for end of DMX event.
index : int, None
Integer label for DMX event. If None, will increment largest used index by 1.
swxdm : float or astropy.quantity.Quantity
Max solar wind DM
swxp : float or astropy.quantity.Quantity
Solar wind power-law index
frozen : bool
Indicates whether SWXDM and SWXP will be fit.
Returns
-------
index : int
Index that has been assigned to new SWX event.
"""
#### Setting up the SWX title convention. If index is None, want to increment the current max SWX index by 1.
if index is None:
dct = self.get_prefix_mapping_component("SWXDM_")
index = np.max(list(dct.keys())) + 1
i = f"{int(index):04d}"
if mjd_end is not None and mjd_start is not None:
if mjd_end < mjd_start:
raise ValueError("Starting MJD is greater than ending MJD.")
elif mjd_start != mjd_end:
raise ValueError("Only one MJD bound is set.")
if int(index) in self.get_prefix_mapping_component("SWXDM_"):
raise ValueError(
f"Index '{index}' is already in use in this model. Please choose another."
)
if isinstance(swxdm, u.quantity.Quantity):
swxdm = swxdm.to_value(u.pc / u.cm**3)
if isinstance(mjd_start, astropy.time.Time):
mjd_start = mjd_start.mjd
elif isinstance(mjd_start, u.quantity.Quantity):
mjd_start = mjd_start.value
if isinstance(mjd_end, astropy.time.Time):
mjd_end = mjd_end.mjd
elif isinstance(mjd_end, u.quantity.Quantity):
mjd_end = mjd_end.value
if isinstance(swxp, u.quantity.Quantity):
swxp = swxp.value
self.add_param(
prefixParameter(
name=f"SWXDM_{i}",
units="pc cm^-3",
value=swxdm,
description="Max Solar Wind DM",
parameter_type="float",
frozen=frozen,
)
)
self.add_param(
prefixParameter(
name=f"SWXP_{i}",
value=swxp,
description="Solar wind power-law index",
parameter_type="float",
frozen=frozen,
)
)
self.add_param(
prefixParameter(
name=f"SWXR1_{i}",
units="MJD",
description="Beginning of SWX interval",
parameter_type="MJD",
time_scale="utc",
value=mjd_start,
)
)
self.add_param(
prefixParameter(
name=f"SWXR2_{i}",
units="MJD",
description="End of SWX interval",
parameter_type="MJD",
time_scale="utc",
value=mjd_end,
)
)
self.setup()
self.validate()
return index
[docs] def remove_swx_range(self, index):
"""Removes all SWX parameters associated with a given index/list of indices.
Parameters
----------
index : float, int, list, np.ndarray
Number or list/array of numbers corresponding to SWX indices to be removed from model.
"""
if isinstance(index, (int, float, np.int64)):
indices = [index]
elif isinstance(index, (list, np.ndarray)):
indices = index
else:
raise TypeError(
f"index must be a float, int, list, or array - not {type(index)}"
)
for index in indices:
index_rf = f"{int(index):04d}"
for prefix in ["SWXDM_", "SWXP_", "SWXR1_", "SWXR2_"]:
self.remove_param(prefix + index_rf)
self.validate()
[docs] def get_indices(self):
"""Returns an array of integers corresponding to SWX parameters.
Returns
-------
inds : np.ndarray
Array of SWX indices in model.
"""
inds = [int(p.split("_")[-1]) for p in self.params if "SWXDM_" in p]
return np.array(inds)
[docs] def setup(self):
super().setup()
# Get SWX mapping.
# Register the SWX derivatives
for prefix_par in self.get_params_of_type("prefixParameter"):
if prefix_par.startswith("SWXDM_"):
# check to make sure power-law index is present
# if not, put in default
p_name = f"SWXP_{pint.utils.split_prefixed_name(prefix_par)[1]}"
if not hasattr(self, p_name):
self.add_param(
prefixParameter(
name=p_name,
value=2,
description="Solar wind power-law index",
parameter_type="float",
)
)
self.register_deriv_funcs(self.d_delay_d_swxdm, prefix_par)
self.register_dm_deriv_funcs(self.d_dm_d_swxdm, prefix_par)
elif prefix_par.startswith("SWXP_"):
self.register_deriv_funcs(self.d_delay_d_swxp, prefix_par)
self.register_dm_deriv_funcs(self.d_dm_d_swxp, prefix_par)
[docs] def validate(self):
"""Validate the SWX parameters."""
super().validate()
SWXDM_mapping = self.get_prefix_mapping_component("SWXDM_")
SWXP_mapping = self.get_prefix_mapping_component("SWXP_")
SWXR1_mapping = self.get_prefix_mapping_component("SWXR1_")
SWXR2_mapping = self.get_prefix_mapping_component("SWXR2_")
if SWXDM_mapping.keys() != SWXP_mapping.keys():
# FIXME: report mismatch
raise ValueError(
"SWXDM_ parameters do not "
"match SWXP_ parameters. "
"Please check your prefixed parameters."
)
if SWXDM_mapping.keys() != SWXR1_mapping.keys():
# FIXME: report mismatch
raise ValueError(
"SWXDM_ parameters do not "
"match SWXR1_ parameters. "
"Please check your prefixed parameters."
)
if SWXDM_mapping.keys() != SWXR2_mapping.keys():
raise ValueError(
"SWXDM_ parameters do not "
"match SWXR2_ parameters. "
"Please check your prefixed parameters."
)
[docs] def get_theta0(self):
"""Get elongation at conjunction
Returns
-------
astropy.quantity.Quantity
"""
coord = self._parent.get_psr_coords()
t0 = _get_reference_time(self._parent)
t0, elongation = pint.utils.get_conjunction(coord, t0, precision="high")
# approximate elongation at conjunction
self._theta0 = elongation
@property
def theta0(self):
if self._theta0 is None:
self.get_theta0()
return self._theta0
[docs] def validate_toas(self, toas):
SWXDM_mapping = self.get_prefix_mapping_component("SWXDM_")
SWXP_mapping = self.get_prefix_mapping_component("SWXP_")
SWXR1_mapping = self.get_prefix_mapping_component("SWXR1_")
SWXR2_mapping = self.get_prefix_mapping_component("SWXR2_")
bad_parameters = []
for k in SWXR1_mapping.keys():
if self._parent[SWXDM_mapping[k]].frozen:
continue
b = self._parent[SWXR1_mapping[k]].quantity.mjd * u.d
e = self._parent[SWXR2_mapping[k]].quantity.mjd * u.d
mjds = toas.get_mjds()
n = np.sum((b <= mjds) & (mjds < e))
if n == 0:
bad_parameters.append(SWXDM_mapping[k])
if bad_parameters:
raise MissingTOAs(bad_parameters)
[docs] def swx_dm(self, toas):
"""Return solar wind Delta DM for given TOAs"""
condition = {}
p = {}
tbl = toas.table
if not hasattr(self, "swx_toas_selector"):
self.swx_toas_selector = TOASelect(is_range=True)
SWXDM_mapping = self.get_prefix_mapping_component("SWXDM_")
SWXP_mapping = self.get_prefix_mapping_component("SWXP_")
SWXR1_mapping = self.get_prefix_mapping_component("SWXR1_")
SWXR2_mapping = self.get_prefix_mapping_component("SWXR2_")
for epoch_ind in SWXDM_mapping.keys():
r1 = getattr(self, SWXR1_mapping[epoch_ind]).quantity
r2 = getattr(self, SWXR2_mapping[epoch_ind]).quantity
condition[SWXDM_mapping[epoch_ind]] = (r1.mjd, r2.mjd)
p[SWXDM_mapping[epoch_ind]] = getattr(self, SWXP_mapping[epoch_ind]).value
select_idx = self.swx_toas_selector.get_select_index(
condition, tbl["mjd_float"]
)
# Get SWX delays
dm = np.zeros(len(tbl)) * self._parent.DM.units
for k, v in select_idx.items():
if len(v) > 0:
dmmax = getattr(self, k).quantity
dm[v] += (
dmmax
* (
(
self.solar_wind_geometry(toas[v], p=p[k])
- self.opposition_solar_wind_geometry(p[k])
)
/ (
self.conjunction_solar_wind_geometry(p[k])
- self.opposition_solar_wind_geometry(p[k])
)
)
).to(u.pc / u.cm**3)
return dm
[docs] def swx_delay(self, toas, acc_delay=None):
"""This is a wrapper function for interacting with the TimingModel class"""
return self.dispersion_type_delay(toas)
def d_dm_d_swxdm(self, toas, param_name, acc_delay=None):
condition = {}
tbl = toas.table
if not hasattr(self, "swx_toas_selector"):
self.swx_toas_selector = TOASelect(is_range=True)
param = getattr(self, param_name)
swx_index = param.index
SWXP_mapping = self.get_prefix_mapping_component("SWXP_")
SWXR1_mapping = self.get_prefix_mapping_component("SWXR1_")
SWXR2_mapping = self.get_prefix_mapping_component("SWXR2_")
p = getattr(self, SWXP_mapping[swx_index]).value
r1 = getattr(self, SWXR1_mapping[swx_index]).quantity
r2 = getattr(self, SWXR2_mapping[swx_index]).quantity
condition = {param_name: (r1.mjd, r2.mjd)}
select_idx = self.swx_toas_selector.get_select_index(
condition, tbl["mjd_float"]
)
deriv = np.zeros(len(tbl)) * u.dimensionless_unscaled
for k, v in select_idx.items():
if len(v) > 0:
deriv[v] += (
self.solar_wind_geometry(toas[v], p=p)
- self.opposition_solar_wind_geometry(p)
) / (
self.conjunction_solar_wind_geometry(p)
- self.opposition_solar_wind_geometry(p)
)
return deriv
def d_delay_d_swxdm(self, toas, param_name, acc_delay=None):
try:
bfreq = self._parent.barycentric_radio_freq(toas)
except AttributeError:
warn("Using topocentric frequency for dedispersion!")
bfreq = toas.table["freq"]
deriv = self.d_delay_d_dmparam(toas, param_name)
deriv[bfreq < 1.0 * u.MHz] = 0.0
return deriv
def d_dm_d_swxp(self, toas, param_name, acc_delay=None):
condition = {}
tbl = toas.table
# still use the SWX selectors
if not hasattr(self, "swx_toas_selector"):
self.swx_toas_selector = TOASelect(is_range=True)
param = getattr(self, param_name)
swxp_index = param.index
SWXDM_mapping = self.get_prefix_mapping_component("SWXDM_")
SWXP_mapping = self.get_prefix_mapping_component("SWXP_")
SWXR1_mapping = self.get_prefix_mapping_component("SWXR1_")
SWXR2_mapping = self.get_prefix_mapping_component("SWXR2_")
swxdm = getattr(self, SWXDM_mapping[swxp_index]).quantity
p = getattr(self, SWXP_mapping[swxp_index]).value
r1 = getattr(self, SWXR1_mapping[swxp_index]).quantity
r2 = getattr(self, SWXR2_mapping[swxp_index]).quantity
swx_name = f"SWXDM_{pint.utils.split_prefixed_name(param_name)[1]}"
condition = {swx_name: (r1.mjd, r2.mjd)}
select_idx = self.swx_toas_selector.get_select_index(
condition, tbl["mjd_float"]
)
deriv = np.zeros(len(tbl)) * u.pc / u.cm**3
for k, v in select_idx.items():
if len(v) > 0:
geometry = self.solar_wind_geometry(toas[v], p)
conjunction_geometry = self.conjunction_solar_wind_geometry(p)
opposition_geometry = self.opposition_solar_wind_geometry(p)
d_geometry_dp = self.d_solar_wind_geometry_d_swxp(toas[v], p)
d_conjunction_geometry_dp = (
self.d_conjunction_solar_wind_geometry_d_swxp(p)
)
d_opposition_geometry_dp = self.d_opposition_solar_wind_geometry_d_swxp(
p
)
deriv[v] += swxdm * (
(d_geometry_dp - d_opposition_geometry_dp)
/ (conjunction_geometry - opposition_geometry)
- (geometry - opposition_geometry)
* (d_conjunction_geometry_dp - d_opposition_geometry_dp)
/ (conjunction_geometry - opposition_geometry) ** 2
)
return deriv
def d_delay_d_swxp(self, toas, param_name, acc_delay=None):
try:
bfreq = self._parent.barycentric_radio_freq(toas)
except AttributeError:
warn("Using topocentric frequency for dedispersion!")
bfreq = toas.table["freq"]
deriv = self.d_delay_d_dmparam(toas, param_name)
deriv[bfreq < 1.0 * u.MHz] = 0.0
return deriv
[docs] def print_par(self, format="pint"):
result = ""
SWXDM_mapping = self.get_prefix_mapping_component("SWXDM_")
SWXP_mapping = self.get_prefix_mapping_component("SWXP_")
SWXR1_mapping = self.get_prefix_mapping_component("SWXR1_")
SWXR2_mapping = self.get_prefix_mapping_component("SWXR2_")
sorted_list = sorted(SWXDM_mapping.keys())
for ii in sorted_list:
result += getattr(self, SWXDM_mapping[ii]).as_parfile_line(format=format)
result += getattr(self, SWXP_mapping[ii]).as_parfile_line(format=format)
result += getattr(self, SWXR1_mapping[ii]).as_parfile_line(format=format)
result += getattr(self, SWXR2_mapping[ii]).as_parfile_line(format=format)
return result
[docs] def get_swscalings(self):
"""Return the approximate scaling between the SWX model and the standard model
Returns
-------
np.ndarray"""
SWXDM_mapping = self.get_prefix_mapping_component("SWXDM_")
SWXP_mapping = self.get_prefix_mapping_component("SWXP_")
sorted_list = sorted(SWXDM_mapping.keys())
scalings = np.zeros(len(sorted_list))
for j, ii in enumerate(sorted_list):
swxdm = getattr(self, SWXDM_mapping[ii]).quantity
p = getattr(self, SWXP_mapping[ii]).value
scalings[j] = (
self.conjunction_solar_wind_geometry(p)
- self.opposition_solar_wind_geometry(p)
) / self.conjunction_solar_wind_geometry(p)
return scalings
[docs] def get_max_dms(self):
"""Return approximate maximum DMs for each segment from the Solar Wind (at conjunction)
Simplified model that assumes a circular orbit
Returns
-------
astropy.quantity.Quantity
"""
SWXDM_mapping = self.get_prefix_mapping_component("SWXDM_")
sorted_list = sorted(SWXDM_mapping.keys())
dms = np.zeros(len(sorted_list)) * u.pc / u.cm**3
for j, ii in enumerate(sorted_list):
swxdm = getattr(self, SWXDM_mapping[ii]).quantity
dms[j] = (swxdm).to(u.pc / u.cm**3)
return dms
[docs] def get_min_dms(self):
"""Return approximate minimum DMs for each segment from the Solar Wind (at opposition).
Simplified model that assumes a circular orbit
Note that this value has been subtracted off of the model
Returns
-------
astropy.quantity.Quantity
"""
SWXDM_mapping = self.get_prefix_mapping_component("SWXDM_")
SWXP_mapping = self.get_prefix_mapping_component("SWXP_")
sorted_list = sorted(SWXDM_mapping.keys())
dms = np.zeros(len(sorted_list)) * u.pc / u.cm**3
for j, ii in enumerate(sorted_list):
swxdm = getattr(self, SWXDM_mapping[ii]).quantity
p = getattr(self, SWXP_mapping[ii]).value
dms[j] = (
swxdm
* self.opposition_solar_wind_geometry(p)
/ self.conjunction_solar_wind_geometry(p)
).to(u.pc / u.cm**3)
return dms
[docs] def get_ne_sws(self):
"""Return Solar Wind electron densities at 1 AU for each segment
Simplified model that assumes a circular orbit
Returns
-------
astropy.quantity.Quantity
"""
SWXDM_mapping = self.get_prefix_mapping_component("SWXDM_")
SWXP_mapping = self.get_prefix_mapping_component("SWXP_")
sorted_list = sorted(SWXDM_mapping.keys())
ne_sws = np.zeros(len(sorted_list)) * u.cm**-3
for j, ii in enumerate(sorted_list):
swxdm = getattr(self, SWXDM_mapping[ii]).quantity
p = getattr(self, SWXP_mapping[ii]).value
ne_sws[j] = (swxdm / self.conjunction_solar_wind_geometry(p)).to(u.cm**-3)
return ne_sws
[docs] def set_ne_sws(self, ne_sws):
"""Set the DMMAXs based on an input NE_SW values (electron density at 1 AU)
Parameters
----------
ne_sws : astropy.quantity.Quantity
Desired NE_SWs (should be scalar or the same length as the number of segments)
Raises
------
ValueError : If length of input does ont match number of segments
"""
ne_sws = np.atleast_1d(ne_sws)
SWXDM_mapping = self.get_prefix_mapping_component("SWXDM_")
SWXP_mapping = self.get_prefix_mapping_component("SWXP_")
sorted_list = sorted(SWXDM_mapping.keys())
if len(ne_sws) == 1:
ne_sws = ne_sws[0] * np.ones(len(sorted_list))
if len(sorted_list) != len(ne_sws):
raise ValueError(
f"Length of input NE_SW values ({len(ne_sws)}) must match number of SWX segments ({len(sorted_list)})"
)
for j, ii in enumerate(sorted_list):
p = getattr(self, SWXP_mapping[ii]).value
getattr(self, SWXDM_mapping[ii]).quantity = (
self.conjunction_solar_wind_geometry(p) * ne_sws[j]
).to(u.pc / u.cm**3)