"""Timing model objects.
Defines the basic timing model interface classes.
A PINT timing model will be an instance of
:class:`~pint.models.timing_model.TimingModel`. It will have a number of
"components", each an instance of a subclass of
:class:`~pint.models.timing_model.Component`. These components each
implement some part of the timing model, whether astrometry (for
example :class:`~pint.models.astrometry.AstrometryEcliptic`), noise
modelling (for example :class:`~pint.models.noise_model.ScaleToaError`),
interstellar dispersion (for example
:class:`~pint.models.dispersion_model.DispersionDM`), or pulsar binary orbits.
This last category is somewhat unusual in that the code for each model is
divided into a PINT-facing side (for example
:class:`~pint.models.binary_bt.BinaryBT`) and an internal model that does the
actual computation (for example
:class:`~pint.models.stand_alone_psr_binaries.BT_model.BTmodel`); the management of
data passing between these two parts is carried out by
:class:`~pint.models.pulsar_binary.PulsarBinary` and
:class:`~pint.models.stand_alone_psr_binaries.binary_generic.PSR_BINARY`.
To actually create a timing model, you almost certainly want to use
:func:`~pint.models.model_builder.get_model`.
See :ref:`Timing Models` for more details on how PINT's timing models work.
"""
import abc
import copy
import inspect
import contextlib
from collections import OrderedDict, defaultdict
from functools import wraps
from warnings import warn
from uncertainties import ufloat
import astropy.time as time
from astropy import units as u, constants as c
import numpy as np
from astropy.utils.decorators import lazyproperty
import astropy.coordinates as coords
from pint.pulsar_ecliptic import OBL, PulsarEcliptic
from scipy.optimize import brentq
from loguru import logger as log
import pint
from pint.models.parameter import (
_parfile_formats,
AngleParameter,
MJDParameter,
Parameter,
boolParameter,
floatParameter,
funcParameter,
intParameter,
maskParameter,
strParameter,
prefixParameter,
)
from pint.phase import Phase
from pint.toa import TOAs
from pint.utils import (
PrefixError,
split_prefixed_name,
open_or_use,
colorize,
xxxselections,
)
from pint.derived_quantities import dispersion_slope
# Public API of this module.
__all__ = [
    "DEFAULT_ORDER",
    "TimingModel",
    "Component",
    "AllComponents",
    "TimingModelError",
    "MissingParameter",
    "MissingTOAs",
    "MissingBinaryError",
    "UnknownBinaryModel",
]
# Parameters or lines in par files we don't understand but shouldn't
# complain about. These are still passed to components so that they
# can use them if they want to.
#
# Other unrecognized parameters produce warnings and possibly indicate
# errors in the par file.
#
# Comparisons with keywords in par file lines is done in a case insensitive way.
ignore_params = {
    # "TRES",
    "TZRMJD",
    "TZRFRQ",
    "TZRSITE",
    "NITS",
    "IBOOT",
    # "CHI2R",
    "MODE",
    "PLANET_SHAPIRO2",
}
# Parameter-name prefixes (families like DMXF1_0001, DMXF1_0002, ...) that are
# likewise tolerated but not interpreted.
ignore_prefix = {"DMXF1_", "DMXF2_", "DMXEP_"}
# Order in which component categories are evaluated when computing delays and
# phases; categories not listed are inserted between the first and last sets.
DEFAULT_ORDER = [
    "astrometry",
    "jump_delay",
    "troposphere",
    "solar_system_shapiro",
    "solar_wind",
    "dispersion_constant",
    "dispersion_dmx",
    "dispersion_jump",
    "pulsar_system",
    "frequency_dependent",
    "absolute_phase",
    "spindown",
    "phase_jump",
    "wave",
    "wavex",
]
class MissingTOAs(ValueError):
    """Some parameter does not describe any TOAs."""

    def __init__(self, parameter_names):
        # Accept either a single name or a list of names.
        names = [parameter_names] if isinstance(parameter_names, str) else parameter_names
        hint = "you might need to run `model.find_empty_masks(toas, freeze=True)`"
        if not names:
            raise ValueError("Incorrect attempt to construct MissingTOAs")
        if len(names) == 1:
            msg = f"Parameter {names[0]} does not correspond to any TOAs: {hint}"
        else:
            msg = f"Parameters {' '.join(names)} do not correspond to any TOAs: {hint}"
        super().__init__(msg)
        # Always stored as a list, even when a single string was passed.
        self.parameter_names = names
class PropertyAttributeError(ValueError):
    """Raised when a ``property_exists``-wrapped getter emits an AttributeError."""
def property_exists(f):
    """Mark a function as a property but handle AttributeErrors.

    Normal @property has the unfortunate feature that if the called function
    should accidentally emit an AttributeError, if __getattr__ is in use, this
    will be reported as if the attribute does not exist. With this decorator
    instead, the AttributeError will be caught and re-raised as a specific kind
    of ValueError, so it will be treated like an error and the backtrace printed.
    """

    @wraps(f)
    def getter(self):
        try:
            return f(self)
        except AttributeError as e:
            # Re-raise under a different type so __getattr__ machinery cannot
            # mistake an internal bug for a missing attribute.
            raise PropertyAttributeError(
                f"Property {f} raised AttributeError internally"
            ) from e

    return property(getter)
[docs]class TimingModel:
"""Timing model object built from Components.
This object is the primary object to represent a timing model in PINT. It
is normally constructed with :func:`~pint.models.model_builder.get_model`,
and it contains a variety of :class:`~pint.models.timing_model.Component`
objects, each representing a
physical process that either introduces delays in the pulse arrival time or
introduces shifts in the pulse arrival phase. These components have
parameters, described by :class:`~pint.models.parameter.Parameter` objects,
and methods. Both the parameters and the methods are accessible through
this object using attribute access, for example as ``model.F0`` or
``model.coords_as_GAL()``.
Components in a TimingModel object are accessible through the
``model.components`` property, using their class name to index the
TimingModel, as ``model.components["Spindown"]``. They can be added and
removed with methods on this object, and for many of them additional
parameters in families (``DMXEP_1234``) can be added.
Parameters in a TimingModel object are listed in the ``model.params`` object.
Each Parameter can be set as free or frozen using its ``.frozen`` attribute,
and a list of the free parameters is available through the ``model.free_params``
property; this can also be used to set which parameters are free. Several methods
are available to get and set some or all parameters in the forms of dictionaries.
TimingModel objects also support a number of functions for computing
various things like orbital phase, and barycentric versions of TOAs,
as well as the various derivatives and matrices needed to support fitting.
TimingModel objects forward attribute lookups to their components, so
that you can access any method or attribute (in particular Parameters)
of any Component directly on the TimingModel object, for example as
``model.F0``.
TimingModel objects can be written out to ``.par`` files using
:func:`pint.models.timing_model.TimingModel.write_parfile` or .
:func:`pint.models.timing_model.TimingModel.as_parfile`::
>>> model.write_parfile("output.par")
PINT Parameters supported (here, rather than in any Component):
.. paramtable::
:class: pint.models.timing_model.TimingModel
Parameters
----------
name: str, optional
The name of the timing model.
components: list of Component, optional
The model components for timing model.
Notes
-----
PINT models the pulsar pulse time of arrival at the observer from its emission process and
propagation to the observer. Emission is generally modeled as pulse 'phase' and propagation as
'time delay'. In pulsar timing, different astrophysical phenomena are separated into
timing model components, each handling a specific emission or propagation effect.
Each timing model component generally requires the following parts:
- Timing Parameters
- Delay/Phase functions which implements the time delay and phase.
- Derivatives of delay and phase with respect to parameters, for fitting TOAs.
Each timing parameter is stored as a TimingModel attribute of type
:class:`~pint.models.parameter.Parameter`; delay and phase and their derivatives
are implemented as TimingModel methods.
Attributes
----------
name : str
The name of the timing model
component_types : list
    A list of the distinct categories of component. For example,
    delay components will be registered as 'DelayComponent'.
top_level_params : list
Names of parameters belonging to the TimingModel as a whole
rather than to any particular component.
"""
def __init__(self, name="", components=None):
    """Construct a timing model.

    Parameters
    ----------
    name : str, optional
        The name of the timing model.
    components : list of Component, optional
        The model components for the timing model; defaults to an empty
        model. (Fixed: previously the default was a shared mutable list
        ``[]``, a classic Python pitfall.)

    Raises
    ------
    ValueError
        If ``name`` is not a string (e.g. a component list was passed as
        the first positional argument).
    """
    if not isinstance(name, str):
        raise ValueError(
            "First parameter should be the model name, was {!r}".format(name)
        )
    self.name = name
    self.component_types = []
    self.top_level_params = []

    # Parameters that belong to the TimingModel as a whole rather than to
    # any particular component. The "" second argument means "no prefix".
    self.add_param_from_top(
        strParameter(
            name="PSR", description="Source name", aliases=["PSRJ", "PSRB"]
        ),
        "",
    )
    self.add_param_from_top(
        strParameter(name="TRACK", description="Tracking Information"), ""
    )
    self.add_param_from_top(
        strParameter(name="EPHEM", description="Ephemeris to use"), ""
    )
    self.add_param_from_top(
        strParameter(name="CLOCK", description="Timescale to use", aliases=["CLK"]),
        "",
    )
    self.add_param_from_top(
        strParameter(name="UNITS", description="Units (TDB assumed)"), ""
    )
    self.add_param_from_top(
        MJDParameter(name="START", description="Start MJD for fitting"), ""
    )
    self.add_param_from_top(
        MJDParameter(name="FINISH", description="End MJD for fitting"), ""
    )
    self.add_param_from_top(
        floatParameter(
            name="RM", description="Rotation measure", units=u.radian / u.m**2
        ),
        "",
    )
    self.add_param_from_top(
        strParameter(
            name="INFO",
            description="Tells TEMPO to write some extra information about frontend/backend combinations; -f is recommended",
        ),
        "",
    )
    self.add_param_from_top(
        strParameter(
            name="TIMEEPH",
            description="Time ephemeris to use for TDB conversion; for PINT, always FB90",
        ),
        "",
    )
    self.add_param_from_top(
        strParameter(
            name="T2CMETHOD",
            description="Method for transforming from terrestrial to celestial frame (IAU2000B/TEMPO; PINT only supports ????)",
        ),
        "",
    )
    self.add_param_from_top(
        strParameter(
            name="BINARY",
            description="Pulsar System/Binary model",
            value=None,
        ),
        "",
    )
    self.add_param_from_top(
        boolParameter(
            name="DILATEFREQ",
            value=False,
            description="Whether or not TEMPO2 should apply gravitational redshift and time dilation to observing frequency (Y/N; PINT only supports N)",
        ),
        "",
    )
    self.add_param_from_top(
        boolParameter(
            name="DMDATA",
            value=False,
            description="Was the fit done using per-TOA DM information?",
        ),
        "",
    )
    self.add_param_from_top(
        intParameter(
            name="NTOA", value=0, description="Number of TOAs used in the fitting"
        ),
        "",
    )
    self.add_param_from_top(
        floatParameter(
            name="CHI2",
            units="",
            description="Chi-squared value obtained during fitting",
        ),
        "",
    )
    self.add_param_from_top(
        floatParameter(
            name="CHI2R",
            units="",
            description="Reduced chi-squared value obtained during fitting",
        ),
        "",
    )
    self.add_param_from_top(
        floatParameter(
            name="TRES",
            units=u.us,
            description="TOA residual after fitting",
        ),
        "",
    )
    self.add_param_from_top(
        floatParameter(
            name="DMRES",
            units=u.pc / u.cm**3,
            description="DM residual after fitting (wideband only)",
        ),
        "",
    )

    # None means "no components"; setup/validation is deferred to the caller
    # (typically the model builder) once all components are in place.
    for cp in components if components is not None else []:
        self.add_component(cp, setup=False, validate=False)
def __repr__(self):
    """Multi-line repr listing the components in name order."""
    body = ",\n ".join(str(comp) for _, comp in sorted(self.components.items()))
    return f"{self.__class__.__name__}(\n {body}\n)"
def __str__(self):
    # The string form of a model is its par-file representation.
    return self.as_parfile()
def validate(self, allow_tcb=False):
    """Validate component setup.

    The checks include required parameters and parameter values, and component types.
    See also: :func:`pint.models.timing_model.TimingModel.validate_component_types`.

    Parameters
    ----------
    allow_tcb : bool, optional
        If True, a TCB-timescale model is accepted with a warning instead of
        raising; the resulting model should only be used for conversion to TDB.

    Raises
    ------
    ValueError
        If UNITS is neither TDB nor TCB, or is TCB without ``allow_tcb``.
    """
    # Unsupported TEMPO/TEMPO2 options are coerced to the values PINT
    # implements, with a warning rather than an error.
    if self.DILATEFREQ.value:
        warn("PINT does not support 'DILATEFREQ Y'")
        self.DILATEFREQ.value = False
    if self.TIMEEPH.value not in [None, "FB90"]:
        warn("PINT only supports 'TIMEEPH FB90'")
        self.TIMEEPH.value = "FB90"
    if self.T2CMETHOD.value not in [None, "IAU2000B"]:  # FIXME: really?
        warn("PINT only supports 'T2CMETHOD IAU2000B'")
        self.T2CMETHOD.value = "IAU2000B"
    if self.UNITS.value not in [None, "TDB", "TCB"]:
        error_message = f"PINT only supports 'UNITS TDB'. The given timescale '{self.UNITS.value}' is invalid."
        raise ValueError(error_message)
    elif self.UNITS.value == "TCB":
        if not allow_tcb:
            error_message = """The TCB timescale is not fully supported by PINT.
PINT only supports 'UNITS TDB' internally. See https://nanograv-pint.readthedocs.io/en/latest/explanation.html#time-scales
for an explanation on different timescales. A TCB par file can be
converted to TDB using the `tcb2tdb` command like so:
$ tcb2tdb J1234+6789_tcb.par J1234+6789_tdb.par
However, this conversion is not exact and a fit must be performed to obtain
reliable results. Note that PINT only supports writing TDB par files.
"""
            raise ValueError(error_message)
        else:
            log.warning(
                "PINT does not support 'UNITS TCB' internally. Reading this par file nevertheless "
                "because the `allow_tcb` option was given. This `TimingModel` object should not be "
                "used for anything except converting to TDB."
            )
    # START/FINISH are outputs of fitting, not fittable quantities, so they
    # are forced frozen here.
    if not self.START.frozen:
        warn("START cannot be unfrozen... Setting START.frozen to True")
        self.START.frozen = True
    if not self.FINISH.frozen:
        warn("FINISH cannot be unfrozen... Setting FINISH.frozen to True")
        self.FINISH.frozen = True
    # Each component validates its own parameters, then the component mix
    # as a whole is checked for incompatible combinations.
    for cp in self.components.values():
        cp.validate()
    self.validate_component_types()
def validate_component_types(self):
    """Physically motivated validation of a timing model. This method checks the
    compatibility of different model components when used together.

    This function throws an error if multiple deterministic components that model
    the same effect are used together (e.g. :class:`pint.models.astrometry.AstrometryEquatorial`
    and :class:`pint.models.astrometry.AstrometryEcliptic`). It emits a warning if
    a deterministic component and a stochastic component that model the same effect
    are used together (e.g. :class:`pint.models.noise_model.PLDMNoise`
    and :class:`pint.models.dispersion_model.DispersionDMX`). It also requires that
    one and only one :class:`pint.models.spindown.SpindownBase` component is present
    in a timing model.
    """

    def num_components_of_type(type):
        # Count components that are instances of `type` (a class or a
        # tuple of classes, as accepted by isinstance).
        return len(
            list(filter(lambda c: isinstance(c, type), self.components.values()))
        )

    # Imports are local to this method to avoid circular imports at module
    # load time (those modules import from this one).
    from pint.models.spindown import SpindownBase

    assert (
        num_components_of_type(SpindownBase) == 1
    ), "Model must have one and only one spindown component (Spindown or another subclass of SpindownBase)."
    from pint.models.astrometry import Astrometry

    assert (
        num_components_of_type(Astrometry) <= 1
    ), "Model can have at most one Astrometry component."
    from pint.models.solar_system_shapiro import SolarSystemShapiro

    # Shapiro delay needs a pulsar position, hence an Astrometry component.
    if num_components_of_type(SolarSystemShapiro) == 1:
        assert (
            num_components_of_type(Astrometry) == 1
        ), "Model cannot have SolarSystemShapiro component without an Astrometry component."
    from pint.models.pulsar_binary import PulsarBinary

    has_binary_attr = hasattr(self, "BINARY") and self.BINARY.value
    if has_binary_attr:
        assert (
            num_components_of_type(PulsarBinary) == 1
        ), "BINARY attribute is set but no PulsarBinary component found."
    assert (
        num_components_of_type(PulsarBinary) <= 1
    ), "Model can have at most one PulsarBinary component."
    from pint.models.solar_wind_dispersion import SolarWindDispersionBase

    assert (
        num_components_of_type(SolarWindDispersionBase) <= 1
    ), "Model can have at most one solar wind dispersion component."
    from pint.models.dispersion_model import DispersionDMX
    from pint.models.wave import Wave
    from pint.models.wavex import WaveX
    from pint.models.dmwavex import DMWaveX
    from pint.models.noise_model import PLRedNoise, PLDMNoise

    # Overlapping deterministic/stochastic models of the same effect only
    # warn, since such combinations can be deliberate.
    if num_components_of_type((DispersionDMX, PLDMNoise, DMWaveX)) > 1:
        log.warning(
            "DispersionDMX, PLDMNoise, and DMWaveX cannot be used together. "
            "They are ways of modelling the same effect."
        )
    if num_components_of_type((Wave, WaveX, PLRedNoise)) > 1:
        log.warning(
            "Wave, WaveX, and PLRedNoise cannot be used together. "
            "They are ways of modelling the same effect."
        )
# def __str__(self):
# result = ""
# comps = self.components
# for k, cp in list(comps.items()):
# result += "In component '%s'" % k + "\n\n"
# for pp in cp.params:
# result += str(getattr(cp, pp)) + "\n"
# return result
def __getattr__(self, name):
    """Forward failed attribute lookups to the model's components.

    Called by Python only when normal lookup fails; each component is
    searched in turn and the first match is returned.
    """
    # These names are used by the forwarding machinery itself; resolving
    # them here would recurse (e.g. during unpickling, before __init__
    # has populated the instance).
    if name in {"components", "component_types", "search_cmp_attr"}:
        raise AttributeError
    if not hasattr(self, "component_types"):
        raise AttributeError
    for component in self.components.values():
        try:
            return getattr(component, name)
        except AttributeError:
            pass
    raise AttributeError(
        f"Attribute {name} not found in TimingModel or any Component"
    )
def __setattr__(self, name, value):
    """Mostly this just sets ``self.name = value``. But in the case where they are both :class:`Parameter` instances
    with different names, this copies the ``quantity``, ``uncertainty``, ``frozen`` attributes only.
    """
    if isinstance(value, (Parameter, prefixParameter)) and name != value.name:
        # Assigning e.g. ``model.F0 = other_param`` copies the numeric state
        # into the existing parameter object instead of replacing it, so the
        # component that owns the parameter keeps its original object.
        # Note: getattr here may go through __getattr__ to find the target
        # parameter on a component.
        for p in ["quantity", "uncertainty", "frozen"]:
            setattr(getattr(self, name), p, getattr(value, p))
    else:
        super().__setattr__(name, value)
@property_exists
def params_ordered(self):
    """List of all parameter names in this model and all its components.

    This is the same as `params`.

    .. deprecated::
        Use :attr:`params` instead; both give identical output.
    """
    # Historically, this was different from `params` because Python
    # dictionaries were unordered until Python 3.7. Now there is no reason for
    # them to be different.
    warn(
        "`TimingModel.params_ordered` is now deprecated and may be removed in the future. "
        "Use `TimingModel.params` instead. It gives the same output as `TimingModel.params_ordered`.",
        DeprecationWarning,
    )
    return self.params
@property_exists
def params(self):
    """List of all parameter names in this model and all its components, in a sensible order."""
    # Define the order of components in the list.
    # Any category not in start_order/last_order is printed between the
    # first and last sets.
    # FIXME: make order completely canonical (sort components by name?)
    start_order = ["astrometry", "spindown", "dispersion"]
    last_order = ["jump_delay"]
    compdict = self.get_components_by_category()
    used_cats = set()
    # Top-level (non-component) parameters always come first.
    pstart = copy.copy(self.top_level_params)
    for cat in start_order:
        if cat not in compdict:
            continue
        for cpp in compdict[cat]:
            pstart += cpp.params
        used_cats.add(cat)
    pend = []
    for cat in last_order:
        if cat not in compdict:
            continue
        for cpp in compdict[cat]:
            # BUG FIX: this read `cpp.parms` (typo), which raised
            # AttributeError whenever a jump_delay component was present.
            pend += cpp.params
        used_cats.add(cat)
    # Now collect any components that haven't already been included in the list.
    pmid = []
    for cat in compdict:
        if cat in used_cats:
            continue
        for cpp in compdict[cat]:
            pmid += cpp.params
        used_cats.add(cat)
    return pstart + pmid + pend
@property_exists
def free_params(self):
    """List of all the free parameters in the timing model.

    Can be set to change which are free.
    These are ordered as ``self.params`` does.
    Upon setting, order does not matter, and aliases are accepted.
    ValueError is raised if a parameter is not recognized.
    On setting, parameter aliases are converted with
    :func:`pint.models.timing_model.TimingModel.match_param_aliases`.
    """
    return [p for p in self.params if not getattr(self, p).frozen]

@free_params.setter
def free_params(self, params):
    # Resolve aliases first; match_param_aliases raises for unknown names.
    params_true = {self.match_param_aliases(p) for p in params}
    for p in self.params:
        getattr(self, p).frozen = p not in params_true
        params_true.discard(p)
    if params_true:
        # BUG FIX: previously printed the full input list; report only the
        # names that resolved to no parameter actually in this model.
        raise ValueError(
            f"Parameter(s) are familiar but not in the model: {sorted(params_true)}"
        )
@property_exists
def fittable_params(self):
    """List of parameters that are fittable, i.e., the parameters
    which have a derivative implemented. These derivatives are usually
    accessed via the `d_delay_d_param` and `d_phase_d_param` methods."""

    def _is_fittable(name):
        # A parameter is fittable if any derivative table knows about it,
        # or if it is an ECORR-family parameter.
        if name in self.phase_deriv_funcs or name in self.delay_deriv_funcs:
            return True
        if hasattr(self, "toasigma_deriv_funcs") and name in self.toasigma_deriv_funcs:
            return True
        par = self[name]
        return hasattr(par, "prefix") and par.prefix == "ECORR"

    return [name for name in self.params if _is_fittable(name)]
def match_param_aliases(self, alias):
    """Return PINT parameter name corresponding to this alias.

    Parameters
    ----------
    alias: str
        Parameter's alias.

    Returns
    -------
    str
        PINT parameter name corresponding to the input alias.

    Raises
    ------
    UnknownParameter
        If no top-level parameter or component recognizes the alias.
    """
    # Top-level parameters are checked first, by exact name then by alias.
    for pname in self.top_level_params:
        if pname == alias or alias in getattr(self, pname).aliases:
            return pname
    # Otherwise ask each component in turn; the first match wins.
    for cp in self.components.values():
        try:
            return cp.match_param_aliases(alias)
        except UnknownParameter:
            continue
    raise UnknownParameter(f"{alias} is not recognized as a parameter or alias")
def get_params_dict(self, which="free", kind="quantity"):
    """Return a dict mapping parameter names to values.

    This can return only the free parameters or all; and it can return the
    parameter objects, the floating-point values, or the uncertainties.

    Parameters
    ----------
    which : "free", "all"
    kind : "quantity", "value", "uncertainty"

    Returns
    -------
    OrderedDict
    """
    if which == "free":
        names = self.free_params
    elif which == "all":
        names = self.params
    else:
        raise ValueError("get_params_dict expects which to be 'all' or 'free'")
    result = OrderedDict()
    for name in names:
        par = getattr(self, name)
        if kind == "quantity":
            result[name] = par
        elif kind in ("value", "num"):
            result[name] = par.value
        elif kind == "uncertainty":
            result[name] = par.uncertainty_value
        else:
            raise ValueError(f"Unknown kind {kind!r}")
    return result
def get_params_of_component_type(self, component_type):
    """Get a list of parameters belonging to a component type.

    Parameters
    ----------
    component_type : "PhaseComponent", "DelayComponent", "NoiseComponent"

    Returns
    -------
    list
        Parameter names from all components of that type; empty if the
        model has no components of that type.
    """
    component_list = getattr(self, f"{component_type}_list", None)
    if component_list is None:
        return []
    return [param for component in component_list for param in component.params]
def set_param_values(self, fitp):
    """Set the model parameters to the value contained in the input dict.

    Ex. model.set_param_values({'F0':60.1,'F1':-1.3e-15})
    """
    # In Powell fitter this sometimes fails because after some iterations the values change from
    # plain float to Quantities. No idea why.
    for name, val in fitp.items():
        param = getattr(self, name)
        if isinstance(val, (Parameter, prefixParameter)):
            if val.value is None:
                raise ValueError(f"Parameter {val} is unset")
            param.value = val.value
        elif isinstance(val, u.Quantity):
            param.value = val.to_value(param.units)
        else:
            param.value = val
def set_param_uncertainties(self, fitp):
    """Set parameter uncertainties from the values in the input dict.

    Bare numbers are interpreted in each parameter's own units.
    """
    for name, val in fitp.items():
        param = getattr(self, name)
        if isinstance(val, u.Quantity):
            param.uncertainty = val
        else:
            param.uncertainty = val * param.units
@property_exists
def components(self):
    """All the components in a dictionary indexed by name."""
    return {
        comp.__class__.__name__: comp
        for ct in self.component_types
        for comp in getattr(self, f"{ct}_list")
    }
@property_exists
def delay_funcs(self):
    """List of all delay functions."""
    return [
        f for comp in self.DelayComponent_list for f in comp.delay_funcs_component
    ]
@property_exists
def phase_funcs(self):
    """List of all phase functions."""
    return [
        f for comp in self.PhaseComponent_list for f in comp.phase_funcs_component
    ]
@property_exists
def is_binary(self):
    """Does the model describe a binary pulsar?"""
    # Binary components are named Binary<Model> by convention in this file.
    return any(name.startswith("Binary") for name in self.components)
def orbital_phase(self, barytimes, anom="mean", radians=True):
    """Return orbital phase (in radians) at barycentric MJD times.

    Parameters
    ----------
    barytimes: Time, TOAs, array-like, or float
        MJD barycentric time(s). The times to compute the
        orbital phases. Needs to be a barycentric time in TDB.
        If a TOAs instance is passed, the barycentering will happen
        automatically. If an astropy Time object is passed, it must
        be in scale='tdb'. If an array-like object is passed or
        a simple float, the time must be in MJD format.
    anom: str, optional
        Type of phase/anomaly. Defaults to "mean".
        Other options are "eccentric" or "true"
    radians: bool, optional
        Units to return. Defaults to True.
        If False, will return unitless phases in cycles (i.e. 0-1).

    Raises
    ------
    ValueError
        If anom.lower() is not "mean", "ecc*", or "true",
        or if an astropy Time object is passed with scale!="tdb".

    Returns
    -------
    array
        The specified anomaly in radians (with unit), unless
        radians=False, which return unitless cycles (0-1).
    """
    if not self.is_binary:  # punt if not a binary
        return None
    # Find the binary model: by convention the single component whose class
    # name starts with "Binary".
    b = self.components[
        [x for x in self.components.keys() if x.startswith("Binary")][0]
    ]
    # Make sure that the binary instance has the binary params
    b.update_binary_object(None)
    # Handle input times and update them in stand-alone binary models
    if isinstance(barytimes, TOAs):
        # If we pass the TOA table, then barycenter the TOAs
        bts = self.get_barycentric_toas(barytimes)
    elif isinstance(barytimes, time.Time):
        if barytimes.scale == "tdb":
            # mjd_long preserves long-double MJD precision
            bts = np.asarray(barytimes.mjd_long)
        else:
            raise ValueError("barytimes as Time instance needs scale=='tdb'")
    elif isinstance(barytimes, MJDParameter):
        bts = np.asarray(barytimes.value)  # .value is always a MJD long double
    else:
        bts = np.asarray(barytimes)
    bbi = b.binary_instance  # shorthand
    # Push the times into the stand-alone binary model; subsequent M()/E()/nu()
    # calls are evaluated at these epochs.
    updates = {"barycentric_toa": bts}
    bbi.update_input(**updates)
    if anom.lower() == "mean":
        anoms = bbi.M()
    elif anom.lower().startswith("ecc"):
        anoms = bbi.E()
    elif anom.lower() == "true":
        anoms = bbi.nu()  # can be negative
    else:
        raise ValueError(f"anom='{anom}' is not a recognized type of anomaly")
    # Make sure all angles are between 0-2*pi
    anoms = np.remainder(anoms.value, 2 * np.pi)
    # return with radian units or return as unitless cycles from 0-1
    return anoms * u.rad if radians else anoms / (2 * np.pi)
def pulsar_radial_velocity(self, barytimes):
    """Return line-of-sight velocity of the pulsar relative to the system barycenter at barycentric MJD times.

    Parameters
    ----------
    barytimes: Time, TOAs, array-like, or float
        MJD barycentric time(s). The times to compute the
        orbital phases. Needs to be a barycentric time in TDB.
        If a TOAs instance is passed, the barycentering will happen
        automatically. If an astropy Time object is passed, it must
        be in scale='tdb'. If an array-like object is passed or
        a simple float, the time must be in MJD format.

    Raises
    ------
    ValueError
        If an astropy Time object is passed with scale!="tdb".

    Returns
    -------
    array
        The line-of-sight velocity

    Notes
    -----
    This is the radial velocity of the pulsar.

    See [1]_

    .. [1] Lorimer & Kramer, 2008, "The Handbook of Pulsar Astronomy", Eqn. 8.24
    """
    # this should also update the binary instance, so that omega()/a1()/etc.
    # below are evaluated at the same epochs
    nu = self.orbital_phase(barytimes, anom="true")
    b = self.components[
        [x for x in self.components.keys() if x.startswith("Binary")][0]
    ]
    bbi = b.binary_instance  # shorthand
    psi = nu + bbi.omega()
    # Radial velocity formula (Lorimer & Kramer Eqn. 8.24), converted to CGS.
    return (
        2
        * np.pi
        * bbi.a1()
        / (bbi.pb() * np.sqrt(1 - bbi.ecc() ** 2))
        * (np.cos(psi) + bbi.ecc() * np.cos(bbi.omega()))
    ).cgs
def companion_radial_velocity(self, barytimes, massratio):
    """Return line-of-sight velocity of the companion relative to the system barycenter at barycentric MJD times.

    Parameters
    ----------
    barytimes: Time, TOAs, array-like, or float
        MJD barycentric time(s). The times to compute the
        orbital phases. Needs to be a barycentric time in TDB.
        If a TOAs instance is passed, the barycentering will happen
        automatically. If an astropy Time object is passed, it must
        be in scale='tdb'. If an array-like object is passed or
        a simple float, the time must be in MJD format.
    massratio : float
        Ratio of pulsar mass to companion mass

    Raises
    ------
    ValueError
        If an astropy Time object is passed with scale!="tdb".

    Returns
    -------
    array
        The line-of-sight velocity

    Notes
    -----
    This is the radial velocity of the companion.

    See [1]_

    .. [1] Lorimer & Kramer, 2008, "The Handbook of Pulsar Astronomy", Eqn. 8.24
    """
    # The companion's velocity is the pulsar's, scaled by the mass ratio
    # and opposite in sign.
    v_pulsar = self.pulsar_radial_velocity(barytimes)
    return -massratio * v_pulsar
def conjunction(self, baryMJD):
    """Return the time(s) of the first superior conjunction(s) after baryMJD.

    Args
    ----
    baryMJD: floats or Time
        barycentric (tdb) MJD(s) prior to the
        conjunction we are looking for. Can be an array.

    Raises
    ------
    ValueError
        If baryMJD is an astropy Time object with scale!="tdb".

    Returns
    -------
    float or array
        The barycentric MJD(tdb) time(s) of the next superior conjunction(s) after baryMJD
    """
    if not self.is_binary:  # punt if not a binary
        return None
    # Find the binary model
    b = self.components[
        [x for x in self.components.keys() if x.startswith("Binary")][0]
    ]
    bbi = b.binary_instance  # shorthand

    # Superior conjunction occurs when true anomaly + omega == 90 deg
    # We will need to solve for this using a root finder (brentq)
    # This is the function to root-find:
    def funct(t):
        nu = self.orbital_phase(t, anom="true")
        return np.remainder((nu + bbi.omega()).value, 2 * np.pi) - np.pi / 2

    # Handle the input time(s)
    if isinstance(baryMJD, time.Time):
        if baryMJD.scale == "tdb":
            bts = np.atleast_1d(baryMJD.mjd)
        else:
            raise ValueError("baryMJD as Time instance needs scale=='tdb'")
    else:
        bts = np.atleast_1d(baryMJD)
    # Step over the baryMJDs
    scs = []
    for bt in bts:
        # Make 11 times over one orbit after bt
        pb = self.pb()[0].to_value("day")
        ts = np.linspace(bt, bt + pb, 11)
        # Compute the true anomalies and omegas for those times
        nus = self.orbital_phase(ts, anom="true")
        omegas = bbi.omega()
        x = np.remainder((nus + omegas).value, 2 * np.pi) - np.pi / 2
        # find the lowest index where x is just below 0
        # NOTE(review): if no negative-to-positive crossing exists in the 11
        # samples, this loop falls through with lb at (or near) the last
        # index, and x[lb + 1] can raise IndexError — confirm a crossing is
        # always present within one orbital period.
        for lb in range(len(x)):
            if x[lb] < 0 and x[lb + 1] > 0:
                break
        # Now use scipy to find the root within the bracketing interval
        scs.append(brentq(funct, ts[lb], ts[lb + 1]))
    # Scalar in, scalar out; otherwise an array of conjunction times.
    return scs[0] if len(scs) == 1 else np.asarray(scs)
@property_exists
def dm_funcs(self):
    """List of all dm value functions."""
    return [
        f
        for cp in self.components.values()
        if hasattr(cp, "dm_value_funcs")
        for f in cp.dm_value_funcs
    ]
@property_exists
def has_correlated_errors(self):
    """Whether or not this model has correlated errors."""
    if "NoiseComponent" not in self.component_types:
        return False
    return any(
        nc.introduces_correlated_errors for nc in self.NoiseComponent_list
    )
@property_exists
def has_time_correlated_errors(self):
    """Whether or not this model has time-correlated errors."""
    if "NoiseComponent" not in self.component_types:
        return False
    return any(
        nc.introduces_correlated_errors and nc.is_time_correlated
        for nc in self.NoiseComponent_list
    )
@property_exists
def covariance_matrix_funcs(self):
    """List of covariance matrix functions."""
    if "NoiseComponent" not in self.component_types:
        return []
    return [
        f for nc in self.NoiseComponent_list for f in nc.covariance_matrix_funcs
    ]
@property_exists
def dm_covariance_matrix_funcs(self):
    """List of DM covariance matrix functions."""
    if "NoiseComponent" not in self.component_types:
        return []
    return [
        f
        for nc in self.NoiseComponent_list
        for f in nc.dm_covariance_matrix_funcs_component
    ]
# Change sigma to uncertainty to avoid name conflict.
@property_exists
def scaled_toa_uncertainty_funcs(self):
    """List of scaled toa uncertainty functions."""
    if "NoiseComponent" not in self.component_types:
        return []
    return [
        f for nc in self.NoiseComponent_list for f in nc.scaled_toa_sigma_funcs
    ]
# Change sigma to uncertainty to avoid name conflict.
@property_exists
def scaled_dm_uncertainty_funcs(self):
    """List of scaled dm uncertainty functions."""
    if "NoiseComponent" not in self.component_types:
        return []
    # Not every noise component supplies DM sigma functions.
    return [
        f
        for nc in self.NoiseComponent_list
        if hasattr(nc, "scaled_dm_sigma_funcs")
        for f in nc.scaled_dm_sigma_funcs
    ]
@property_exists
def basis_funcs(self):
    """List of basis functions from all noise components."""
    if "NoiseComponent" not in self.component_types:
        return []
    return [f for nc in self.NoiseComponent_list for f in nc.basis_funcs]
@property_exists
def phase_deriv_funcs(self):
    """List of derivative functions for phase components."""
    # Maps parameter name -> list of d_phase_d_param functions.
    return self.get_deriv_funcs("PhaseComponent")
@property_exists
def delay_deriv_funcs(self):
    """List of derivative functions for delay components."""
    # Maps parameter name -> list of d_delay_d_param functions.
    return self.get_deriv_funcs("DelayComponent")
@property_exists
def dm_derivs(self):  # TODO need to be careful about the name here.
    """List of DM derivative functions."""
    # Collected from components exposing `dm_deriv_funcs`.
    return self.get_deriv_funcs("DelayComponent", "dm")
@property_exists
def toasigma_derivs(self):
    """List of scaled TOA uncertainty derivative functions"""
    # Collected from noise components exposing `toasigma_deriv_funcs`.
    return self.get_deriv_funcs("NoiseComponent", "toasigma")
@property_exists
def d_phase_d_delay_funcs(self):
    """List of d_phase_d_delay functions."""
    funcs = []
    for cp in self.PhaseComponent_list:
        funcs.extend(cp.phase_derivs_wrt_delay)
    return funcs
def get_deriv_funcs(self, component_type, derivative_type=""):
    """Return a dictionary mapping parameter names to derivative functions.

    Parameters
    ----------
    component_type : str
        Which component list to search ("PhaseComponent",
        "DelayComponent", or "NoiseComponent").
    derivative_type : str
        Derivative type ("", "dm", or "toasigma"). The empty string
        selects the delay and phase derivative functions.
    """
    # TODO, this function can be a more generic function collector.
    attr_name = f"{derivative_type}_deriv_funcs" if derivative_type else "deriv_funcs"
    collected = defaultdict(list)
    for comp in getattr(self, f"{component_type}_list"):
        # Components without this kind of derivative are skipped.
        funcs = getattr(comp, attr_name, None)
        if funcs is None:
            continue
        for par_name, func_list in funcs.items():
            collected[par_name].extend(func_list)
    return dict(collected)
def search_cmp_attr(self, name):
    """Search all components for an attribute and return the first match.

    If several components define the attribute, the first one in
    ``self.components`` order is returned.

    Raises
    ------
    AttributeError
        If no component has the requested attribute.
    """
    for comp in self.components.values():
        if hasattr(comp, name):
            return comp
    raise AttributeError(f"{name} not found in any component")
def get_component_type(self, component):
    """Identify the component object's type.

    Parameters
    ----------
    component : Component instance
        The component object to be inspected.

    Note
    ----
    Since a component can inherit from other components, all of the
    object's bases are inspected. ``inspect.getmro`` returns the base
    classes (including ``object``) in method resolution order; for
    Object --> Component --> TypeComponent (e.g. DelayComponent) the
    component type is the third class from the end of the MRO.
    """
    # check component type
    mro = inspect.getmro(component.__class__)
    if mro[-2].__name__ != "Component":
        raise TypeError(
            f"Class '{component.__class__.__name__}' is not a Component type class."
        )
    if len(mro) < 3:
        raise TypeError(
            f"'{component.__class__.__name__}' class is not a subclass of 'Component' class."
        )
    return mro[-3].__name__
def map_component(self, component):
    """Get the location of a component within the timing model.

    Parameters
    ----------
    component : str or `Component` object
        Component name or component object.

    Returns
    -------
    comp : `Component` object
        The resolved component object.
    order : int
        The index of the component in its host list.
    host_list : list
        The list holding components of this type.
    comp_type : str
        The component type (e.g., Delay or Phase).
    """
    comps = self.components
    if isinstance(component, str):
        try:
            comp = comps[component]
        except KeyError:
            raise AttributeError(f"No '{component}' in the timing model.") from None
    elif component in comps.values():
        comp = component
    else:
        raise AttributeError(
            f"No '{component.__class__.__name__}' in the timing model."
        )
    comp_type = self.get_component_type(comp)
    host_list = getattr(self, f"{comp_type}_list")
    return comp, host_list.index(comp), host_list, comp_type
def add_component(
    self, component, order=DEFAULT_ORDER, force=False, setup=True, validate=True
):
    """Add a component into TimingModel.

    Parameters
    ----------
    component : Component
        The component to be added to the timing model.
    order : list, optional
        The component category order list. Default is the DEFAULT_ORDER.
    force : bool, optional
        If true, add a duplicate component. Default is False.
    setup : bool, optional
        If True, run ``self.setup()`` after adding the component.
    validate : bool, optional
        If True, run ``self.validate()`` after adding the component.
    """
    comp_type = self.get_component_type(component)
    cur_cps = []
    if comp_type in self.component_types:
        comp_list = getattr(self, f"{comp_type}_list")
        for cp in comp_list:
            # If component order is not defined, sort it to the end.
            cp_order = (
                order.index(cp.category) if cp.category in order else len(order) + 1
            )
            cur_cps.append((cp_order, cp))
        # Check if the component has been added already.
        if component.__class__ in (x.__class__ for x in comp_list):
            log.warning(
                f"Component '{component.__class__.__name__}' is already present but was added again."
            )
            if not force:
                raise ValueError(
                    f"Component '{component.__class__.__name__}' is already present and will not be "
                    f"added again. To force add it, use force=True option."
                )
    else:
        # First component of this type: register the new type name.
        self.component_types.append(comp_type)
    # link new component to TimingModel
    component._parent = self
    # If the category is not in the order list, it will be added to the end.
    if component.category not in order:
        new_cp = len(order) + 1, component
    else:
        new_cp = order.index(component.category), component
    # add new component, keeping the list sorted by category order
    cur_cps.append(new_cp)
    cur_cps.sort(key=lambda x: x[0])
    new_comp_list = [c[1] for c in cur_cps]
    setattr(self, f"{comp_type}_list", new_comp_list)
    # Set up components
    if setup:
        self.setup()
    # Validate inputs
    if validate:
        self.validate()
def remove_component(self, component):
    """Remove one component from the timing model.

    Parameters
    ----------
    component : str or `Component` object
        Component name or component object.
    """
    located, _, host_list, _ = self.map_component(component)
    host_list.remove(located)
def _locate_param_host(self, param):
"""Search for the parameter host component in the timing model.
Parameters
----------
param: str
Target parameter.
Return
------
list of tuples
All possible components that host the target parameter. The first
element is the component object that have the target parameter, the
second one is the parameter object. If it is a prefix-style parameter,
it will return one example of such parameter.
"""
result_comp = []
for cp_name, cp in self.components.items():
if param in cp.params:
result_comp.append((cp_name, cp, getattr(cp, param)))
else:
# search for prefixed parameter
prefixs = cp.param_prefixs
try:
prefix, index_str, index = split_prefixed_name(param)
except PrefixError:
prefix = param
if prefix in prefixs.keys():
result_comp.append(cp_name, cp, getattr(cp, prefixs[param][0]))
return result_comp
def get_components_by_category(self):
    """Return a dict of this model's component objects keyed by the category name."""
    by_category = defaultdict(list)
    for comp in self.components.values():
        by_category[comp.category].append(comp)
    # Convert from defaultdict to dict
    return dict(by_category)
def add_param_from_top(self, param, target_component, setup=False):
    """Add a parameter to a timing model component.

    Parameters
    ----------
    param : pint.models.parameter.Parameter
        Parameter instance.
    target_component : str
        Name of the host component. An empty string adds the parameter
        to the top level `TimingModel` object itself.
    setup : bool, optional
        Flag to run the component's setup() function afterwards.
    """
    if target_component == "":
        setattr(self, param.name, param)
        self.top_level_params += [param.name]
        return
    if target_component in self.components:
        self.components[target_component].add_param(param, setup=setup)
        return
    raise AttributeError(
        f"Can not find component '{target_component}' in " "timing model."
    )
def remove_param(self, param):
    """Remove a parameter from the timing model.

    Parameters
    ----------
    param : str
        The name of the parameter to be removed.
    """
    param_map = self.get_params_mapping()
    if param not in param_map:
        raise AttributeError(f"Can not find '{param}' in timing model.")
    if param_map[param] == "TimingModel":
        # BUG FIX: get_params_mapping() labels top-level parameters
        # "TimingModel"; the old comparison against "timing_model" never
        # matched, so top-level parameters could not be removed.
        delattr(self, param)
        self.top_level_params.remove(param)
    else:
        target_component = param_map[param]
        self.components[target_component].remove_param(param)
    self.setup()
def get_params_mapping(self):
    """Report which component each parameter name comes from.

    Top-level parameters are mapped to the string "TimingModel".
    """
    mapping = {name: "TimingModel" for name in self.top_level_params}
    for comp in self.components.values():
        mapping.update({name: comp.__class__.__name__ for name in comp.params})
    return mapping
def get_params_of_type_top(self, param_type):
    """Return all parameters of the given type from every component."""
    matches = []
    for comp in self.components.values():
        matches.extend(comp.get_params_of_type(param_type))
    return matches
def get_prefix_mapping(self, prefix):
    """Get the index mapping for the prefix parameters.

    Parameters
    ----------
    prefix : str
        Name of prefix.

    Returns
    -------
    dict
        Mapping from prefix-parameter index (key) to parameter name
        (value), taken from the first component that has any.

    Raises
    ------
    ValueError
        If no component provides parameters with this prefix.
    """
    for comp in self.components.values():
        mapping = comp.get_prefix_mapping_component(prefix)
        if mapping:
            return mapping
    raise ValueError(f"Can not find prefix {prefix!r}")
def get_prefix_list(self, prefix, start_index=0):
    """Return the Quantities associated with a sequence of prefix parameters.

    Parameters
    ----------
    prefix : str
        Name of prefix.
    start_index : int
        The index the sequence starts at (DM1, DM2, ... vs F0, F1, ...).

    Returns
    -------
    list of astropy.units.Quantity
        The ``.quantity`` associated with parameter prefix + start_index,
        prefix + (start_index+1), ... up to the last that exists and is set.

    Raises
    ------
    ValueError
        If any set prefix parameters exist outside the returned sequence
        (for example DM1 and DM3 but no DM2, or F0 set when start_index
        was given as 1).
    """
    matches = {}
    for name in self.params:
        if not name.startswith(prefix):
            continue
        par = getattr(self, name)
        # Only count set prefix parameters whose prefix matches exactly
        # (a name that merely starts with `prefix` is not enough).
        if par.is_prefix and par.prefix == prefix and par.quantity is not None:
            matches[par.index] = par
    quantities = []
    index = start_index
    while index in matches:
        quantities.append(matches.pop(index).quantity)
        index += 1
    if matches:
        # Leftovers mean the sequence had a gap or started too late.
        raise ValueError(
            f"Unused prefix parameters for start_index {start_index}: {matches}"
        )
    return quantities
def param_help(self):
    """Return help lines for all available parameters in the model."""
    lines = []
    for par, host in self.get_params_mapping().items():
        lines.append("{:<40}{}\n".format(host, getattr(self, par).help_line()))
    return "".join(lines)
def delay(self, toas, cutoff_component="", include_last=True):
    """Total delay for the TOAs.

    Return the total delay which will be subtracted from the given
    TOA to get time of emission at the pulsar.

    Parameters
    ----------
    toas : pint.toa.TOAs
        The toas for analysis delays.
    cutoff_component : str
        The delay component name that a user wants the calculation to stop
        at.
    include_last : bool
        If the cutoff delay component is included.
    """
    delay = np.zeros(toas.ntoas) * u.second
    if cutoff_component == "":
        # No cutoff requested: apply every delay component.
        idx = len(self.DelayComponent_list)
    else:
        delay_names = [x.__class__.__name__ for x in self.DelayComponent_list]
        if cutoff_component not in delay_names:
            raise KeyError(f"No delay component named '{cutoff_component}'.")
        idx = delay_names.index(cutoff_component)
        if include_last:
            idx += 1
    # Do NOT cycle through delay_funcs - cycle through components until cutoff
    for dc in self.DelayComponent_list[:idx]:
        for df in dc.delay_funcs_component:
            # Each delay function receives the delay accumulated so far.
            delay += df(toas, delay)
    return delay
def phase(self, toas, abs_phase=None):
    """Return the model-predicted pulse phase for the given TOAs.

    This is the phase as observed at the observatory at the exact moment
    specified in each TOA. The result is a :class:`pint.phase.Phase` object.

    Parameters
    ----------
    toas : pint.toa.TOAs
        The TOAs at which to evaluate the phase.
    abs_phase : bool or None, optional
        If True, reference the phase to the TZR TOA; if None, this
        defaults to True exactly when an AbsPhase component is present
        in the model.
    """
    # First compute the delays to "pulsar time"
    delay = self.delay(toas)
    phase = Phase(np.zeros(toas.ntoas), np.zeros(toas.ntoas))
    # Then compute the relevant pulse phases
    for pf in self.phase_funcs:
        phase += Phase(pf(toas, delay))
    # abs_phase defaults to True if AbsPhase is in the model, otherwise to
    # False. Of course, if you manually set it, it will use that setting.
    if abs_phase is None:
        abs_phase = "AbsPhase" in list(self.components.keys())
    # This function gets called in `Residuals.calc_phase_resids()` with `abs_phase=True`
    # by default. Hence, this branch is not run by default.
    if not abs_phase:
        return phase
    if "AbsPhase" not in list(self.components.keys()):
        log.info("Creating a TZR TOA (AbsPhase) using the given TOAs object.")
        # if no absolute phase (TZRMJD), add the component to the model and calculate it
        self.add_tzr_toa(toas)
    # Evaluate the phase at the TZR TOA and reference all phases to it.
    tz_toa = self.get_TZR_toa(toas)
    tz_delay = self.delay(tz_toa)
    tz_phase = Phase(np.zeros(len(toas.table)), np.zeros(len(toas.table)))
    for pf in self.phase_funcs:
        tz_phase += Phase(pf(tz_toa, tz_delay))
    return phase - tz_phase
def add_tzr_toa(self, toas):
    """Create a TZR TOA for the given TOAs object and add it to
    the timing model. This corresponds to TOA closest to the PEPOCH."""
    from pint.models.absolute_phase import AbsPhase

    # Defer validation until the TZR TOA has actually been computed.
    self.add_component(AbsPhase(), validate=False)
    self.make_TZR_toa(toas)
    self.validate()
def total_dm(self, toas):
    """Calculate dispersion measure from all the dispersion-type components."""
    # The unit of the result is set by the first DM function; all DM
    # functions are assumed to return compatible units, so nothing is
    # hard-coded here.
    funcs = self.dm_funcs
    total = funcs[0](toas)
    for func in funcs[1:]:
        total += func(toas)
    return total
def total_dispersion_slope(self, toas):
    """Calculate the dispersion slope from all the dispersion-type components."""
    return dispersion_slope(self.total_dm(toas))
def toa_covariance_matrix(self, toas):
    """Get the TOA covariance matrix for noise models.

    If there is no noise model component provided, a diagonal matrix
    with the squared TOA errors on the diagonal is returned.
    """
    ntoa = len(toas)
    result = np.zeros((ntoa, ntoa))
    if "ScaleToaError" not in self.components:
        # No error-scaling component: use the raw TOA errors (in seconds).
        result += np.diag(toas.table["error"].quantity.to(u.s).value ** 2)
    for cov_func in self.covariance_matrix_funcs:
        result += cov_func(toas)
    return result
def dm_covariance_matrix(self, toas):
    """Get the DM covariance matrix for noise models.

    If there is no noise model component provided, a diagonal matrix
    with the squared wideband DM errors on the diagonal is returned.
    """
    dms, valid_dm = toas.get_flag_value("pp_dm", as_type=float)
    dmes, valid_dme = toas.get_flag_value("pp_dme", as_type=float)
    dms = np.array(dms)[valid_dm]
    dmes = np.array(dmes)[valid_dme]
    result = np.zeros((len(dms), len(dms)))
    # When there is no noise model.
    # FIXME: specifically when there is no DMEFAC
    if not self.dm_covariance_matrix_funcs:
        return result + np.diag(dmes**2)
    for cov_func in self.dm_covariance_matrix_funcs:
        result += cov_func(toas)
    return result
def scaled_toa_uncertainty(self, toas):
    """Get the scaled TOA data uncertainties from the noise models.

    If there is no noise model component provided, a vector with
    the raw TOA errors as values is returned.

    Parameters
    ----------
    toas : pint.toa.TOAs
        The input data object for TOAs uncertainty.
    """
    result = np.zeros(toas.ntoas) * u.us
    funcs = self.scaled_toa_uncertainty_funcs
    if not funcs:
        # When there is no noise model.
        return result + toas.table["error"].quantity
    for func in funcs:
        result += func(toas)
    return result
def scaled_dm_uncertainty(self, toas):
    """Get the scaled DM data uncertainties from the noise models.

    If there is no noise model component provided, a vector with
    the raw wideband DM errors as values is returned.

    Parameters
    ----------
    toas : pint.toa.TOAs
        The input data object for DM uncertainty.
    """
    dm_error, valid = toas.get_flag_value("pp_dme", as_type=float)
    dm_error = np.array(dm_error)[valid] * u.pc / u.cm**3
    result = np.zeros(len(dm_error)) * u.pc / u.cm**3
    funcs = self.scaled_dm_uncertainty_funcs
    if not funcs:
        # When there is no noise model.
        return result + dm_error
    for func in funcs:
        result += func(toas)
    return result
def noise_model_designmatrix(self, toas):
    """Stack the noise-basis matrices from all noise components, or None if there are none."""
    bases = self.basis_funcs
    if not bases:
        return None
    return np.hstack([bf(toas)[0] for bf in bases])
def noise_model_basis_weight(self, toas):
    """Concatenate the noise-basis weights from all noise components, or None if there are none."""
    bases = self.basis_funcs
    if not bases:
        return None
    return np.hstack([bf(toas)[1] for bf in bases])
def noise_model_dimensions(self, toas):
    """Number of basis functions for each noise model component.

    Returns a dictionary of correlated-noise components in the noise
    model. Each entry contains a tuple (offset, size) where size is the
    number of basis functions for the component, and offset is their
    starting location in the design matrix and weights vector.
    """
    result = {}
    # Correct results rely on this ordering being the same as what is
    # done in the self.basis_funcs property.
    if not self.basis_funcs:
        return result
    offset = 0
    for noise_comp in self.NoiseComponent_list:
        bfs = noise_comp.basis_funcs
        if not bfs:
            continue
        size = sum(len(bf(toas)[1]) for bf in bfs)
        result[noise_comp.category] = (offset, size)
        offset += size
    return result
def jump_flags_to_params(self, toas):
    """Add JUMP parameters corresponding to tim_jump flags.

    When a ``.tim`` file contains pairs of JUMP lines, the user's expectation
    is that the TOAs between each pair of flags will be affected by a JUMP, even
    if that JUMP does not appear in the ``.par`` file. (This is how TEMPO works.)
    In PINT, those TOAs have a flag attached, `-tim_jump N`, where N is a
    number that is different for each JUMPed set of TOAs. The goal of this function
    is to add JUMP parameters to the model corresponding to these.

    Some complexities arise: TOAs may also have `-tim_jump` flags associated
    with them, just as flags, for example if such a ``.tim`` file were exported
    in PINT-native format and then reloaded. And models may already have JUMPs
    associated with some or all ``tim_jump`` values.

    This function looks at all the ``tim_jump`` values and adds JUMP parameters
    for any that do not have any. It does not change the TOAs object it is passed.
    """
    from . import jump

    tjvals, idxs = toas.get_flag_value("tim_jump")
    tim_jump_values = set(tjvals)
    # None marks TOAs that carry no tim_jump flag at all.
    tim_jump_values.remove(None)
    if not tim_jump_values:
        log.info("No jump flags to process from .tim file")
        return None
    log.info(f"There are {len(tim_jump_values)} JUMPs from the timfile.")
    if "PhaseJump" not in self.components:
        log.info("PhaseJump component added")
        a = jump.PhaseJump()
        a.setup()
        self.add_component(a)
        # Drop the default JUMP1 a fresh PhaseJump component carries.
        self.remove_param("JUMP1")
        a.setup()
    # Collect existing -tim_jump JUMPs so they are not duplicated, and
    # track used indices so new JUMPs get fresh ones.
    used_indices = set()
    for p in self.get_jump_param_objects():
        if p.key == "-tim_jump":
            used_indices.add(p.index)
            (tjv,) = p.key_value
            if tjv in tim_jump_values:
                log.info(f"JUMP -tim_jump {tjv} already exists")
                tim_jump_values.remove(tjv)
    num = max(used_indices) + 1 if used_indices else 1
    if not tim_jump_values:
        log.info("All tim_jump values have corresponding JUMPs")
        return
    # FIXME: arrange for these to be in a sensible order (might not be integers
    # but if they are then lexicographical order is not wanted)
    t_j_v = set()
    for v in tim_jump_values:
        try:
            vi = int(v)
        except ValueError:
            vi = v
        t_j_v.add(vi)
    for v in sorted(t_j_v):
        # Now we need to add a JUMP for each of these
        log.info(f"Adding JUMP -tim_jump {v}")
        param = maskParameter(
            name="JUMP",
            index=num,
            key="-tim_jump",
            key_value=v,
            value=0.0,
            units="second",
            uncertainty=0.0,
        )
        self.add_param_from_top(param, "PhaseJump")
        # Newly created JUMPs are free (fittable) by default.
        getattr(self, param.name).frozen = False
        num += 1
    self.components["PhaseJump"].setup()
def delete_jump_and_flags(self, toa_table, jump_num):
    """Delete jump object from PhaseJump and remove its flags from TOA table.

    This is a helper function for pintk.

    Parameters
    ----------
    toa_table : list or None
        The TOA table which must be modified. In pintk (pulsar.py), for the
        prefit model, this will be all_toas.table["flags"].
        For the postfit model, it will be None (one set of TOA tables for both
        models).
    jump_num : int
        Specifies the index of the jump to be deleted.
    """
    # remove jump of specified index
    self.remove_param(f"JUMP{jump_num}")
    # remove jump flags from selected TOA tables
    if toa_table is not None:
        for d in toa_table:
            if "jump" in d:
                index_list = d["jump"].split(",")
                if str(jump_num) in index_list:
                    del index_list[index_list.index(str(jump_num))]
                    if not index_list:
                        # No jumps remain on this TOA: drop the flag entirely.
                        del d["jump"]
                    else:
                        d["jump"] = ",".join(index_list)
    # if last jump deleted, remove PhaseJump object from model
    if (
        self.components["PhaseJump"].get_number_of_jumps() == 0
    ):  # means last jump just deleted
        comp_list = getattr(self, "PhaseComponent_list")
        for item in comp_list:
            if isinstance(item, pint.models.jump.PhaseJump):
                self.remove_component(item)
        return
    self.components["PhaseJump"].setup()
def get_barycentric_toas(self, toas, cutoff_component=""):
    """Conveniently calculate the barycentric TOAs.

    Parameters
    ----------
    toas : TOAs object
        The TOAs the barycentric corrections are applied on
    cutoff_component : str, optional
        The cutoff delay component name. If it is not provided, it will
        search for binary delay and apply all the delay before binary.

    Return
    ------
    astropy.units.Quantity
        Barycentered TOAs.
    """
    tbl = toas.table
    if cutoff_component == "":
        # Default: stop just before the binary (pulsar_system) delay.
        delay_list = self.DelayComponent_list
        for cp in delay_list:
            if cp.category == "pulsar_system":
                cutoff_component = cp.__class__.__name__
    # include_last=False: the cutoff component itself is not applied.
    corr = self.delay(toas, cutoff_component, False)
    return tbl["tdbld"] * u.day - corr
def d_phase_d_toa(self, toas, sample_step=None):
    """Return the finite-difference derivative of phase wrt TOA.

    Parameters
    ----------
    toas : pint.toa.TOAs
        The toas when the derivative of phase will be evaluated at.
    sample_step : float, optional
        Finite difference step. If not specified, it will take twice
        the spin period.
    """
    # Work on a copy so the caller's TOAs are not shifted.
    copy_toas = copy.deepcopy(toas)
    if sample_step is None:
        pulse_period = 1.0 / (self.F0.quantity)
        sample_step = pulse_period * 2
    # Note that sample_dt is applied cumulatively, so this evaluates phase at TOA-dt and TOA+dt
    sample_dt = [-sample_step, 2 * sample_step]
    sample_phase = []
    for dt in sample_dt:
        dt_array = [dt.value] * copy_toas.ntoas * dt._unit
        deltaT = time.TimeDelta(dt_array)
        copy_toas.adjust_TOAs(deltaT)
        phase = self.phase(copy_toas, abs_phase=False)
        sample_phase.append(phase)
    # Use finite difference method.
    # phase'(t) = (phase(t+h)-phase(t-h))/2+ 1/6*F2*h^2 + ..
    # The error should be near 1/6*F2*h^2
    dp = sample_phase[1] - sample_phase[0]
    # Phase carries separate integer and fractional parts; combine both.
    d_phase_d_toa = dp.int / (2 * sample_step) + dp.frac / (2 * sample_step)
    del copy_toas
    return d_phase_d_toa.to(u.Hz)
def d_phase_d_tpulsar(self, toas):
    """Return the derivative of phase wrt time at the pulsar.

    NOT implemented yet.
    """
    raise NotImplementedError
def d_phase_d_param(self, toas, delay, param):
    """Return the derivative of phase with respect to the parameter.

    This is the derivative of the phase observed at each TOA with
    respect to each parameter. This is closely related to the derivative
    of residuals with respect to each parameter, differing only by a
    factor of the spin frequency and possibly a minus sign. See
    :meth:`pint.models.timing_model.TimingModel.designmatrix` for a way
    of evaluating many derivatives at once.

    The calculation is done by combining the analytical derivatives
    reported by all the components in the model.

    Parameters
    ----------
    toas : pint.toa.TOAs
        The TOAs at which the derivative should be evaluated.
    delay : astropy.units.Quantity or None
        The delay at the TOAs where the derivatives should be evaluated.
        This permits certain optimizations in the derivative calculations;
        the value should be ``self.delay(toas)``.
    param : str
        The name of the parameter to differentiate with respect to.

    Returns
    -------
    astropy.units.Quantity
        The derivative of observed phase with respect to the model parameter.
    """
    # TODO need to do correct chain rule stuff wrt delay derivs, etc
    # Is it safe to assume that any param affecting delay only affects
    # phase indirectly (and vice-versa)??
    if delay is None:
        delay = self.delay(toas)
    par = getattr(self, param)
    result = np.longdouble(np.zeros(toas.ntoas)) / par.units
    phase_derivs = self.phase_deriv_funcs
    if param in list(phase_derivs.keys()):
        # The parameter affects phase directly: sum its analytic derivatives.
        for df in phase_derivs[param]:
            result += df(toas, param, delay).to(
                result.unit, equivalencies=u.dimensionless_angles()
            )
    else:
        # Apply chain rule for the parameters in the delay.
        # total_phase = Phase1(delay(param)) + Phase2(delay(param))
        # d_total_phase_d_param = d_Phase1/d_delay*d_delay/d_param +
        #                         d_Phase2/d_delay*d_delay/d_param
        #                       = (d_Phase1/d_delay + d_Phase2/d_delay) *
        #                         d_delay_d_param
        d_delay_d_p = self.d_delay_d_param(toas, param)
        dpdd_result = np.longdouble(np.zeros(toas.ntoas)) / u.second
        for dpddf in self.d_phase_d_delay_funcs:
            dpdd_result += dpddf(toas, delay)
        result = dpdd_result * d_delay_d_p
    return result.to(result.unit, equivalencies=u.dimensionless_angles())
def d_delay_d_param(self, toas, param, acc_delay=None):
    """Return the derivative of delay with respect to the parameter.

    Raises
    ------
    AttributeError
        If no component registers a delay derivative for this parameter.
    """
    par = getattr(self, param)
    result = np.longdouble(np.zeros(toas.ntoas) << (u.s / par.units))
    delay_derivs = self.delay_deriv_funcs
    if param not in delay_derivs:
        raise AttributeError(
            f"Derivative function for '{param}' is not provided"
            f" or not registered; parameter '{param}' may not be fittable. "
        )
    for deriv_func in delay_derivs[param]:
        result += deriv_func(toas, param, acc_delay).to(
            result.unit, equivalencies=u.dimensionless_angles()
        )
    return result
def d_phase_d_param_num(self, toas, param, step=1e-2):
    """Return the derivative of phase with respect to the parameter.

    Compute the value numerically, using a symmetric finite difference.

    Parameters
    ----------
    toas : pint.toa.TOAs
        TOAs at which the derivative is evaluated.
    param : str
        Name of the parameter to differentiate with respect to.
    step : float, optional
        Fractional step size (used as an absolute step when the
        parameter value is 0).
    """
    # TODO : We need to know the range of parameter.
    par = getattr(self, param)
    ori_value = par.value
    unit = par.units
    # Step is relative to the parameter value unless that value is zero.
    h = 1.0 * step if ori_value == 0 else ori_value * step
    parv = [par.value - h, par.value + h]
    phase_i = (
        np.zeros((toas.ntoas, 2), dtype=np.longdouble) * u.dimensionless_unscaled
    )
    phase_f = (
        np.zeros((toas.ntoas, 2), dtype=np.longdouble) * u.dimensionless_unscaled
    )
    for ii, val in enumerate(parv):
        par.value = val
        ph = self.phase(toas, abs_phase=False)
        # Keep integer and fractional phase parts separate for precision.
        phase_i[:, ii] = ph.int
        phase_f[:, ii] = ph.frac
    res_i = -phase_i[:, 0] + phase_i[:, 1]
    res_f = -phase_f[:, 0] + phase_f[:, 1]
    result = (res_i + res_f) / (2.0 * h * unit)
    # shift value back to the original value
    par.value = ori_value
    return result
def d_delay_d_param_num(self, toas, param, step=1e-2):
    """Return the derivative of delay with respect to the parameter.

    Compute the value numerically, using a symmetric finite difference.

    Parameters
    ----------
    toas : pint.toa.TOAs
        TOAs at which the derivative is evaluated.
    param : str
        Name of the parameter to differentiate with respect to.
    step : float, optional
        Fractional step size (used as an absolute step when the
        parameter value is 0).
    """
    # TODO : We need to know the range of parameter.
    par = getattr(self, param)
    ori_value = par.value
    if ori_value is None:
        # A parameter did not get to use in the model
        log.warning(f"Parameter '{param}' is not used by timing model.")
        return np.zeros(toas.ntoas) * (u.second / par.units)
    unit = par.units
    # Step is relative to the parameter value unless that value is zero.
    h = 1.0 * step if ori_value == 0 else ori_value * step
    parv = [par.value - h, par.value + h]
    delay = np.zeros((toas.ntoas, 2))
    # IMPROVEMENT: restore the original parameter value with try/finally
    # instead of the old bare `except:` (which only restored and
    # re-raised); behavior is identical but the exception is no longer
    # intercepted.
    try:
        for ii, val in enumerate(parv):
            par.value = val
            delay[:, ii] = self.delay(toas)
    finally:
        par.value = ori_value
    d_delay = (-delay[:, 0] + delay[:, 1]) / 2.0 / h
    return d_delay * (u.second / unit)
def d_dm_d_param(self, data, param):
    """Return the derivative of DM with respect to the parameter."""
    par = getattr(self, param)
    result = np.zeros(len(data)) << (u.pc / u.cm**3 / par.units)
    funcs = self.dm_derivs.get(param, None)
    if funcs is None:
        if param not in self.params:  # Maybe add differentiable params
            raise AttributeError(f"Parameter {param} does not exist")
        # Parameter exists but has no DM derivative: the DM derivative is zero.
        return result
    for func in funcs:
        result += func(data, param).to(
            result.unit, equivalencies=u.dimensionless_angles()
        )
    return result
def d_toasigma_d_param(self, data, param):
    """Return the derivative of the scaled TOA uncertainty with respect to the parameter."""
    par = getattr(self, param)
    result = np.zeros(len(data)) << (u.s / par.units)
    funcs = self.toasigma_derivs.get(param, None)
    if funcs is None:
        if param not in self.params:  # Maybe add differentiable params
            raise AttributeError(f"Parameter {param} does not exist")
        # Parameter exists but has no sigma derivative: the derivative is zero.
        return result
    for func in funcs:
        result += func(data, param).to(
            result.unit, equivalencies=u.dimensionless_angles()
        )
    return result
def designmatrix(self, toas, acc_delay=None, incfrozen=False, incoffset=True):
    """Return the design matrix.

    The design matrix is the matrix with columns of ``d_phase_d_param/F0``
    or ``d_toa_d_param``; it is used in fitting and calculating parameter
    covariances.

    The value of ``F0`` used here is the parameter value in the model.

    The order of parameters that are included is that returned by
    ``self.params``.

    Parameters
    ----------
    toas : pint.toa.TOAs
        The TOAs at which to compute the design matrix.
    acc_delay
        ???
    incfrozen : bool
        Whether to include frozen parameters in the design matrix
    incoffset : bool
        Whether to include the constant offset in the design matrix
        This option is ignored if a `PhaseOffset` component is present.

    Returns
    -------
    M : array
        The design matrix, with shape (len(toas), len(self.free_params)+1)
    names : list of str
        The names of parameters in the corresponding parts of the design matrix
    units : astropy.units.Unit
        The units of the corresponding parts of the design matrix

    Notes
    -----
    1. We have negative sign here. Since the residuals are calculated as
    (Phase - int(Phase)) in pulsar timing, which is different from the conventional
    definition of least square definition (Data - model), we have decided to add
    a minus sign here in the design matrix so that the fitter keeps the conventional
    sign.

    2. Design matrix entries can be computed only for parameters for which the
    derivatives are implemented. If a parameter without a derivative is unfrozen
    while calling this method, it will raise an informative error, except in the
    case of unfrozen noise parameters, which are simply ignored.
    """
    noise_params = self.get_params_of_component_type("NoiseComponent")
    # Every free non-noise parameter must be fittable (have a derivative).
    if (
        not set(self.free_params)
        .difference(noise_params)
        .issubset(self.fittable_params)
    ):
        free_unfittable_params = (
            set(self.free_params)
            .difference(noise_params)
            .difference(self.fittable_params)
        )
        raise ValueError(
            f"Cannot compute the design matrix because the following unfittable parameters "
            f"were found unfrozen in the model: {free_unfittable_params}. "
            f"Freeze these parameters before computing the design matrix."
        )
    # unfrozen_noise_params = [
    #     param for param in noise_params if not getattr(self, param).frozen
    # ]
    # The entries for any unfrozen noise parameters will not be
    # included in the design matrix as they are not well-defined.
    incoffset = incoffset and "PhaseOffset" not in self.components
    params = ["Offset"] if incoffset else []
    params += [
        par
        for par in self.params
        if (incfrozen or not getattr(self, par).frozen) and par not in noise_params
    ]
    F0 = self.F0.quantity  # 1/sec
    ntoas = len(toas)
    nparams = len(params)
    delay = self.delay(toas)
    units = []
    # Apply all delays ?
    # tt = toas['tdbld']
    # for df in self.delay_funcs:
    #     tt -= df(toas)
    M = np.zeros((ntoas, nparams))
    for ii, param in enumerate(params):
        if param == "Offset":
            # The offset column is a constant phase converted to time.
            M[:, ii] = 1.0 / F0.value
            units.append(u.s / u.s)
        else:
            # Note the minus sign; see Notes above.
            q = -self.d_phase_d_param(toas, delay, param)
            the_unit = u.Unit("") / getattr(self, param).units
            M[:, ii] = q.to_value(the_unit) / F0.value
            units.append(the_unit / F0.unit)
    return M, params, units
def compare(
    self,
    othermodel,
    nodmx=True,
    convertcoordinates=True,
    threshold_sigma=3.0,
    unc_rat_threshold=1.05,
    verbosity="max",
    usecolor=True,
    format="text",
):
    """Print comparison with another model

    Parameters
    ----------
    othermodel
        TimingModel object to compare to
    nodmx : bool, optional
        If True, don't print the DMX parameters in
        the comparison
    convertcoordinates : bool, optional
        Convert coordinates from ICRS<->ECL to make models consistent
    threshold_sigma : float, optional
        Pulsar parameters for which diff_sigma > threshold will be printed
        with an exclamation point at the end of the line
    unc_rat_threshold : float, optional
        Pulsar parameters for which the uncertainty has increased by a
        factor of unc_rat_threshold will be printed with an asterisk at
        the end of the line
    verbosity : string, optional
        Dictates amount of information returned. Options include "max",
        "med", and "min", which have the following results:

        "max" - print all lines from both models whether they are fit or not (note that nodmx will override this); DEFAULT
        "med" - only print lines for parameters that are fit
        "min" - only print lines for fit parameters for which diff_sigma > threshold
        "check" - only print significant changes with logging.warning, not as string (note that all other modes will still print this)
    usecolor : bool, optional
        Use colors on the output to complement use of "!" and "*"
    format : string, optional
        One of "text" or "markdown"

    Returns
    -------
    str
        Human readable comparison, for printing.
        Formatted as a five column table with titles of
        ``PARAMETER NAME | Model1 | Model2 | Diff_Sigma1 | Diff_Sigma2``
        where ``ModelX`` refer to self and othermodel Timing Model objects,
        and ``Diff_SigmaX`` is the difference in a given parameter as reported by the two models,
        normalized by the uncertainty in model X. If model X has no reported uncertainty,
        nothing will be printed.

        If ``format="text"``, when either ``Diff_SigmaX`` value is greater than ``threshold_sigma``,
        an exclamation point (``!``) will be appended to the line and color will be added if
        ``usecolor=True``. If the uncertainty in the first model
        is smaller than the second, an asterisk (``*``) will be appended to the line and color
        will be added if ``usecolor=True``.

        If ``format="markdown"`` then will be formatted as a markdown table with bold, colored,
        and highlighted text as appropriate.

        For both output formats, warnings and info statements will be printed.

    Note
    ----
    Prints logging warnings for parameters that have changed significantly
    and/or have increased in uncertainty.

    Examples
    --------
    To use this in a Jupyter notebook with and without markdown::

        >>> from pint.models import get_model
        >>> import pint.logging
        >>> from IPython.display import display_markdown
        >>> pint.logging.setup(level="WARNING")
        >>> m1 = get_model(<file1>)
        >>> m2 = get_model(<file2>)
        >>> print(m1.compare(m2))
        >>> display_markdown(m1.compare(m2, format="markdown"), raw=True)

    Make sure to use ``raw=True`` to get the markdown output in a notebook.
    """
    assert verbosity.lower() in ["max", "med", "min", "check"]
    verbosity = verbosity.lower()
    assert format.lower() in ["text", "markdown"]
    format = format.lower()

    if self.name != "":
        model_name = self.name.split("/")[-1]
    else:
        model_name = "Model 1"
    if othermodel.name != "":
        other_model_name = othermodel.name.split("/")[-1]
    else:
        other_model_name = "Model 2"

    # 5 columns of the output, + a way to keep track of values/uncertainties
    # that have changed a lot
    parameter = {}
    value1 = {}
    value2 = {}
    diff1 = {}
    diff2 = {}
    modifier = {}
    parameter["TITLE"] = "PARAMETER"
    value1["TITLE"] = model_name
    value2["TITLE"] = other_model_name
    diff1["TITLE"] = "Diff_Sigma1"
    diff2["TITLE"] = "Diff_Sigma2"
    modifier["TITLE"] = []

    log.info("Comparing ephemerides for PSR %s" % self.PSR.value)
    log.debug("Threshold sigma = %2.2f" % threshold_sigma)
    log.debug("Threshold uncertainty ratio = %2.2f" % unc_rat_threshold)
    log.debug("Creating a copy of model from %s" % other_model_name)
    if verbosity == "max":
        log.debug("Maximum verbosity - printing all parameters")
    elif verbosity == "med":
        log.debug("Medium verbosity - printing parameters that are fit")
    elif verbosity == "min":
        log.debug(
            "Minimum verbosity - printing parameters that are fit and significantly changed"
        )
    elif verbosity == "check":
        log.debug("Check verbosity - only warnings/info will be displayed")

    # Work on a copy so that epoch/coordinate conversions don't mutate the input.
    othermodel = copy.deepcopy(othermodel)

    # Align the epochs of the other model with ours so values are comparable.
    if (
        "POSEPOCH" in self.params
        and "POSEPOCH" in othermodel.params
        and self.POSEPOCH.value is not None
        and othermodel.POSEPOCH.value is not None
        and self.POSEPOCH.value != othermodel.POSEPOCH.value
    ):
        log.info(
            "Updating POSEPOCH in %s to match %s" % (other_model_name, model_name)
        )
        othermodel.change_posepoch(self.POSEPOCH.value)
    if (
        "PEPOCH" in self.params
        and "PEPOCH" in othermodel.params
        and self.PEPOCH.value is not None
        and self.PEPOCH.value != othermodel.PEPOCH.value
    ):
        log.info(
            "Updating PEPOCH in %s to match %s" % (other_model_name, model_name)
        )
        othermodel.change_pepoch(self.PEPOCH.value)
    if (
        "DMEPOCH" in self.params
        and "DMEPOCH" in othermodel.params
        and self.DMEPOCH.value is not None
        and self.DMEPOCH.value != othermodel.DMEPOCH.value
    ):
        log.info(
            "Updating DMEPOCH in %s to match %s" % (other_model_name, model_name)
        )
        othermodel.change_dmepoch(self.DMEPOCH.value)
    if (
        self.BINARY.value is not None
        and othermodel.BINARY.value is not None
        and self.BINARY.value == othermodel.BINARY.value
    ):
        log.info(
            "Updating binary epoch (T0 or TASC) in %s to match %s"
            % (other_model_name, model_name)
        )
        if (
            "T0" in self
            and "T0" in othermodel
            and self.T0.value is not None
            and othermodel.T0.value is not None
            and self.T0.value != othermodel.T0.value
        ):
            othermodel.change_binary_epoch(self.T0.quantity)
        elif (
            "TASC" in self
            and "TASC" in othermodel
            and self.TASC.value is not None
            and othermodel.TASC.value is not None
            and self.TASC.value != othermodel.TASC.value
        ):
            othermodel.change_binary_epoch(self.TASC.quantity)

    # Bring the other model into the same coordinate frame if requested.
    if (
        "AstrometryEquatorial" in self.components
        and "AstrometryEcliptic" in othermodel.components
    ):
        if convertcoordinates:
            log.warning(f"Converting {other_model_name} from ECL to ICRS")
            othermodel = othermodel.as_ICRS()
        else:
            log.warning(
                f"{model_name} is in ICRS coordinates but {other_model_name} is in ECL coordinates and convertcoordinates=False"
            )
    elif (
        "AstrometryEcliptic" in self.components
        and "AstrometryEquatorial" in othermodel.components
    ):
        if convertcoordinates:
            log.warning(
                f"Converting {other_model_name} from ICRS to ECL({self.ECL.value})"
            )
            othermodel = othermodel.as_ECL(ecl=self.ECL.value)
        else:
            log.warning(
                f"{model_name} is in ECL({self.ECL.value}) coordinates but {other_model_name} is in ICRS coordinates and convertcoordinates=False"
            )

    for pn in self.params:
        par = getattr(self, pn)
        if par.value is None:
            continue
        try:
            otherpar = getattr(othermodel, pn)
        except AttributeError:
            otherpar = None
        if isinstance(par, strParameter):
            parameter[pn] = str(pn)
            value1[pn] = str(par.value)
            if otherpar is not None and otherpar.value is not None:
                value2[pn] = str(otherpar.value)
            else:
                value2[pn] = "Missing"
            diff1[pn] = ""
            diff2[pn] = ""
            modifier[pn] = []
        elif isinstance(par, AngleParameter):
            if par.frozen:
                # If not fitted, just print both values
                parameter[pn] = str(pn)
                value1[pn] = str(par.quantity)
                if otherpar is not None and otherpar.quantity is not None:
                    value2[pn] = str(otherpar.quantity)
                    if otherpar.quantity != par.quantity:
                        log.info(
                            "Parameter %s not fit, but has changed between these models"
                            % par.name
                        )
                else:
                    value2[pn] = "Missing"
                diff1[pn] = ""
                diff2[pn] = ""
                modifier[pn] = []
            else:
                # If fitted, print both values with uncertainties
                if par.units == u.hourangle:
                    uncertainty_unit = pint.hourangle_second
                else:
                    uncertainty_unit = u.arcsec
                parameter[pn] = pn
                modifier[pn] = []
                value1[pn] = "{:>16s} +/- {:7.2g}".format(
                    str(par.quantity),
                    par.uncertainty.to_value(uncertainty_unit),
                )
                if otherpar is not None:
                    if otherpar.uncertainty is not None:
                        value2[pn] = "{:>16s} +/- {:7.2g}".format(
                            str(otherpar.quantity),
                            otherpar.uncertainty.to_value(uncertainty_unit),
                        )
                    else:
                        # otherpar must have no uncertainty
                        if otherpar.quantity is not None:
                            value2[pn] = "{:>s}".format(str(otherpar.quantity))
                        else:
                            value2[pn] = "Missing"
                else:
                    value2[pn] = "Missing"
                diff1[pn] = ""
                diff2[pn] = ""
                if otherpar is not None and otherpar.quantity is not None:
                    diff = otherpar.quantity - par.quantity
                    if par.uncertainty is not None:
                        diff_sigma = (diff / par.uncertainty).decompose()
                    else:
                        diff_sigma = np.inf
                    if abs(diff_sigma) != np.inf:
                        diff1[pn] = "{:>10.2f}".format(diff_sigma)
                        if abs(diff_sigma) > threshold_sigma:
                            modifier[pn].append("diff1")
                    else:
                        diff1[pn] = ""
                    if otherpar.uncertainty is not None:
                        diff_sigma2 = (diff / otherpar.uncertainty).decompose()
                    else:
                        diff_sigma2 = np.inf
                    if abs(diff_sigma2) != np.inf:
                        diff2[pn] = "{:>10.2f}".format(diff_sigma2)
                        if abs(diff_sigma2) > threshold_sigma:
                            modifier[pn].append("diff2")
                    else:
                        diff2[pn] = ""
                if (
                    otherpar is not None
                    and par.uncertainty is not None
                    and otherpar.uncertainty is not None
                    and (unc_rat_threshold * par.uncertainty < otherpar.uncertainty)
                ):
                    modifier[pn].append("unc_rat")
        else:
            # Assume numerical parameter
            if nodmx and pn.startswith("DMX"):
                continue
            parameter[pn] = str(pn)
            modifier[pn] = []
            if par.frozen:
                # If not fitted, just print both values
                value1[pn] = str(par.value)
                diff1[pn] = ""
                diff2[pn] = ""
                if otherpar is not None and otherpar.value is not None:
                    if otherpar.uncertainty is not None:
                        value2[pn] = "{:SP}".format(
                            ufloat(otherpar.value, otherpar.uncertainty.value)
                        )
                    else:
                        value2[pn] = str(otherpar.value)
                    if otherpar.value != par.value:
                        if par.name in ["START", "FINISH", "CHI2", "CHI2R", "NTOA"]:
                            # These bookkeeping values change routinely; only
                            # mention them at maximum verbosity.
                            if verbosity == "max":
                                log.info(
                                    "Parameter %s has changed between these models"
                                    % par.name
                                )
                        elif isinstance(par, boolParameter):
                            if otherpar.value is True:
                                status = "ON"
                            else:
                                status = "OFF"
                            log.info(
                                "Parameter %s has changed between these models (turned %s in %s)"
                                % (par.name, status, other_model_name)
                            )
                        else:
                            log.warning(
                                "Parameter %s not fit, but has changed between these models"
                                % par.name
                            )
                            modifier[pn].append("change")
                    if (
                        par.uncertainty is not None
                        and otherpar.uncertainty is not None
                        and (
                            par.uncertainty * unc_rat_threshold
                            < otherpar.uncertainty
                        )
                    ):
                        modifier[pn].append("unc_rat")
                else:
                    value2[pn] = "Missing"
            else:
                # If fitted, print both values with uncertainties
                if par.uncertainty is not None:
                    value1[pn] = "{:SP}".format(
                        ufloat(par.value, par.uncertainty.value)
                    )
                else:
                    value1[pn] = str(par.value)
                if otherpar is not None and otherpar.value is not None:
                    if otherpar.uncertainty is not None:
                        value2[pn] = "{:SP}".format(
                            ufloat(otherpar.value, otherpar.uncertainty.value)
                        )
                    else:
                        # otherpar must have no uncertainty
                        if otherpar.value is not None:
                            value2[pn] = str(otherpar.value)
                        else:
                            value2[pn] = "Missing"
                else:
                    value2[pn] = "Missing"
                if value2[pn] == "Missing":
                    log.info(
                        "Parameter %s missing from %s"
                        % (par.name, other_model_name)
                    )
                if value1[pn] == "Missing":
                    log.info(
                        "Parameter %s missing from %s" % (par.name, model_name)
                    )
                if otherpar is not None and otherpar.value is not None:
                    diff = otherpar.value - par.value
                    # BUGFIX: a free parameter that has not been fit yet can
                    # have uncertainty None; the previous unconditional
                    # division raised AttributeError.  Mirror the
                    # AngleParameter branch and fall back to inf.
                    if par.uncertainty is not None:
                        diff_sigma = diff / par.uncertainty.value
                    else:
                        diff_sigma = np.inf
                    if abs(diff_sigma) != np.inf:
                        diff1[pn] = "{:>10.2f}".format(diff_sigma)
                        if abs(diff_sigma) > threshold_sigma:
                            modifier[pn].append("diff1")
                    else:
                        diff1[pn] = ""
                    if otherpar.uncertainty is not None:
                        diff_sigma2 = diff / otherpar.uncertainty.value
                        if abs(diff_sigma2) != np.inf:
                            diff2[pn] = "{:>10.2f}".format(diff_sigma2)
                            if abs(diff_sigma2) > threshold_sigma:
                                modifier[pn].append("diff2")
                        else:
                            diff2[pn] = ""
                    else:
                        diff2[pn] = ""
                    if (
                        par.uncertainty is not None
                        and otherpar.uncertainty is not None
                        and (
                            par.uncertainty * unc_rat_threshold
                            < otherpar.uncertainty
                        )
                    ):
                        modifier[pn].append("unc_rat")
                else:
                    diff1[pn] = ""
                    diff2[pn] = ""
            if "diff1" in modifier[pn] and not par.frozen:
                log.warning(
                    "Parameter %s has changed significantly (%s sigma1)"
                    % (parameter[pn], diff1[pn])
                )
            if "diff2" in modifier[pn] and not par.frozen:
                log.warning(
                    "Parameter %s has changed significantly (%s sigma2)"
                    % (parameter[pn], diff2[pn])
                )
            if "unc_rat" in modifier[pn]:
                log.warning(
                    "Uncertainty on parameter %s has increased (unc2/unc1 = %2.2f)"
                    % (parameter[pn], float(otherpar.uncertainty / par.uncertainty))
                )

    # Now print any parameters in othermodel that were missing in self.
    mypn = self.params
    for opn in othermodel.params:
        if opn in mypn and getattr(self, opn).value is not None:
            continue
        if nodmx and opn.startswith("DMX"):
            continue
        try:
            otherpar = getattr(othermodel, opn)
        except AttributeError:
            otherpar = None
        if otherpar.value is None:
            continue
        log.info("Parameter %s missing from %s" % (opn, model_name))
        if verbosity == "max":
            parameter[opn] = str(opn)
            value1[opn] = "Missing"
            value2[opn] = str(otherpar.quantity)
            diff1[opn] = ""
            diff2[opn] = ""
            modifier[opn] = []

    # Report the angular separation between the two positions.
    separation = self.get_psr_coords().separation(othermodel.get_psr_coords())
    pn = "SEPARATION"
    parameter[pn] = "SEPARATION"
    if separation < 60 * u.arcsec:
        value1[pn] = "{:>f} arcsec".format(separation.arcsec)
    elif separation < 60 * u.arcmin:
        value1[pn] = "{:>f} arcmin".format(separation.arcmin)
    else:
        value1[pn] = "{:>f} deg".format(separation.deg)
    value2[pn] = ""
    diff1[pn] = ""
    diff2[pn] = ""
    modifier[pn] = []

    # Assemble the output table.
    s = []
    pad = 2
    longest_parameter = len(max(parameter.values(), key=len))
    longest_value1 = len(max(value1.values(), key=len))
    longest_value2 = len(max(value2.values(), key=len))
    longest_diff1 = len(max(diff1.values(), key=len))
    longest_diff2 = len(max(diff2.values(), key=len))
    param = "TITLE"
    if format == "text":
        p = parameter[param]
        v1 = value1[param]
        v2 = value2[param]
        d1 = diff1[param]
        d2 = diff2[param]
        s.append(
            f"{p:<{longest_parameter+pad}} {v1:>{longest_value1+pad}} {v2:>{longest_value2+pad}} {d1:>{longest_diff1+pad}} {d2:>{longest_diff2+pad}}"
        )
        p = "-" * longest_parameter
        v1 = "-" * longest_value1
        v2 = "-" * longest_value2
        d1 = "-" * longest_diff1
        d2 = "-" * longest_diff2
        s.append(
            f"{p:<{longest_parameter+pad}} {v1:>{longest_value1+pad}} {v2:>{longest_value2+pad}} {d1:>{longest_diff1+pad}} {d2:>{longest_diff2+pad}}"
        )
    elif format == "markdown":
        p = parameter[param]
        v1 = value1[param]
        v2 = value2[param]
        d1 = diff1[param]
        d2 = diff2[param]
        s.append(f"| {p} | {v1} | {v2} | {d1} | {d2} |")
        s.append(" :--- | ---: | ---: | ---: | ---: |")
    for param in parameter:
        if param == "TITLE":
            continue
        p = parameter[param]
        v1 = value1[param]
        v2 = value2[param]
        d1 = diff1[param]
        d2 = diff2[param]
        m = modifier[param]
        # A parameter is "flagged" when its value changed significantly.
        flagged = "change" in m or "diff1" in m or "diff2" in m
        if format == "text":
            sout = f"{p:<{longest_parameter+pad}} {v1:>{longest_value1+pad}} {v2:>{longest_value2+pad}} {d1:>{longest_diff1+pad}} {d2:>{longest_diff2+pad}}"
            if flagged:
                sout += " !"
            if "unc_rat" in m:
                sout += " *"
            if usecolor:
                # BUGFIX: the original condition relied on Python's
                # `and`-binds-tighter-than-`or` precedence, so a parameter
                # with both a significant change and an increased
                # uncertainty was colored plain red instead of red on
                # green.  Spell the three cases out explicitly.
                if flagged and "unc_rat" not in m:
                    sout = colorize(sout, "red")
                elif flagged and "unc_rat" in m:
                    sout = colorize(sout, "red", bg_color="green")
                elif "unc_rat" in m:
                    sout = colorize(sout, bg_color="green")
        elif format == "markdown":
            sout = [p.strip(), v1.strip(), v2.strip(), d1.strip(), d2.strip()]
            if flagged:
                sout = [
                    f"<span style='color:red'>**{x}**</span>" if len(x) > 0 else x
                    for x in sout
                ]
            if "unc_rat" in m:
                sout = [f"<mark>{x}</mark>" if len(x) > 0 else x for x in sout]
            sout = " | ".join(sout).strip()
        if verbosity == "max":
            s.append(sout)
        elif verbosity == "med" and len(d1) > 0:
            # not frozen so has uncertainty
            s.append(sout)
        elif verbosity == "min" and len(m) > 0:
            # has a modifier
            s.append(sout)
    if verbosity != "check":
        return "\n".join(s)
def use_aliases(self, reset_to_default=True, alias_translation=None):
    """Set the parameters to use aliases as specified upon writing.

    Parameters
    ----------
    reset_to_default : bool
        If True, forget what name was used for each parameter in the input par file.
    alias_translation : dict or None
        If not None, use this to map PINT parameter names to output names. This overrides
        input names even if they are not otherwise being reset to default.
        This is to allow compatibility with TEMPO/TEMPO2. The dictionary
        ``pint.toa.tempo_aliases`` should provide a reasonable selection.
    """
    for name in self.params:
        param = getattr(self, name)
        if reset_to_default:
            param.use_alias = None
        if alias_translation is None:
            continue
        # Prefer the name the parameter originally came in under, if known.
        lookup_key = param.origin_name if hasattr(param, "origin_name") else name
        with contextlib.suppress(KeyError):
            param.use_alias = alias_translation[lookup_key]
def as_parfile(
    self,
    start_order=["astrometry", "spindown", "dispersion"],
    last_order=["jump_delay"],
    *,
    include_info=True,
    comment=None,
    format="pint",
):
    """Represent the entire model as a parfile string.

    See also :func:`pint.models.TimingModel.write_parfile`.

    Parameters
    ----------
    start_order : list
        Categories to include at the beginning
    last_order : list
        Categories to include at the end
    include_info : bool, optional
        Include information string if True
    comment : str, optional
        Additional comment string to include in parfile
    format : str, optional
        Parfile output format. PINT outputs in 'tempo', 'tempo2' and 'pint'
        formats. The default format is `pint`.
    """
    if format.lower() not in _parfile_formats:
        raise ValueError(f"parfile format must be one of {_parfile_formats}")
    self.validate()

    if include_info:
        info_string = pint.utils.info_string(prefix_string="# ", comment=comment)
        info_string += f"\n# Format: {format.lower()}"
        result_begin = info_string + "\n"
    else:
        result_begin = ""
    result_middle = ""
    result_end = ""

    by_category = self.get_components_by_category()
    printed = []

    # TEMPO2-format files must start with "MODE 1".
    if format.lower() == "tempo2":
        result_begin += "MODE 1\n"

    # Top-level parameters come first; BINARY is emitted by the binary
    # component itself, so skip it here.
    for name in self.top_level_params:
        if name != "BINARY":
            result_begin += getattr(self, name).as_parfile_line(format=format)

    for cat in start_order:
        if cat in by_category:
            for comp in by_category[cat]:
                result_begin += comp.print_par(format=format)
            printed.append(cat)

    for cat in last_order:
        if cat in by_category:
            for comp in by_category[cat]:
                result_end += comp.print_par(format=format)
            printed.append(cat)

    # Everything not already placed goes in the middle, in category order.
    for cat, comps in by_category.items():
        if cat in printed:
            continue
        for comp in comps:
            result_middle += comp.print_par(format=format)
        printed.append(cat)

    return result_begin + result_middle + result_end
def write_parfile(
    self,
    filename,
    start_order=["astrometry", "spindown", "dispersion"],
    last_order=["jump_delay"],
    *,
    include_info=True,
    comment=None,
    format="pint",
):
    """Write the entire model as a parfile.

    See also :func:`pint.models.TimingModel.as_parfile`.

    Parameters
    ----------
    filename : str or Path or file-like
        The destination to write the parfile to
    start_order : list
        Categories to include at the beginning
    last_order : list
        Categories to include at the end
    include_info : bool, optional
        Include information string if True
    comment : str, optional
        Additional comment string to include in parfile
    format : str, optional
        Parfile output format. PINT outputs in 'tempo', 'tempo2' and 'pint'
        formats. The default format is `pint`.
    """
    # Render the model once, then hand the text to whatever destination
    # (path or open file-like) the caller supplied.
    contents = self.as_parfile(
        start_order=start_order,
        last_order=last_order,
        include_info=include_info,
        comment=comment,
        format=format,
    )
    with open_or_use(filename, "wt") as f:
        f.write(contents)
def validate_toas(self, toas):
    """Sanity check to verify that this model is compatible with these toas.

    This checks that where this model needs TOAs to constrain parameters,
    that there is at least one TOA. This includes checking that every DMX
    range for which the DMX is free has at least one TOA, and it verifies
    that each "mask parameter" (for example JUMP) corresponds to at least one
    TOA.

    Individual components can implement a ``validate_toas`` method; this
    method will automatically call such a method on each component that has
    one.

    If some TOAs are missing, this method will raise a MissingTOAs that
    lists some (at least one) of the problem parameters.
    """
    problems = []
    # Every free mask parameter must select at least one TOA.
    for name in self.get_params_of_type_top("maskParameter"):
        par = getattr(self, name)
        if not par.frozen and len(par.select_toa_mask(toas)) == 0:
            problems.append(f"'{name}, {par.key}, {par.key_value}'")
    # Let each component run its own TOA checks, collecting failures.
    for component in self.components.values():
        try:
            component.validate_toas(toas)
        except MissingTOAs as e:
            problems.extend(e.parameter_names)
    if problems:
        raise MissingTOAs(problems)
def find_empty_masks(self, toas, freeze=False):
    """Find unfrozen mask parameters with no TOAs before trying to fit

    Parameters
    ----------
    toas : pint.toa.TOAs
    freeze : bool, optional
        Should the parameters with no TOAs be frozen

    Returns
    -------
    list
        Parameters with no TOAs
    """
    empty = []
    # Free mask parameters (e.g. JUMP) that select no TOAs.
    for name in self.get_params_of_type_top("maskParameter"):
        par = getattr(self, name)
        if par.frozen or len(par.select_toa_mask(toas)) != 0:
            continue
        empty.append(name)
        if freeze:
            log.info(f"'{name}' has no TOAs so freezing")
            getattr(self, name).frozen = True
    # Piecewise DM/SW ranges that contain no TOAs.
    for prefix in ["DM", "SW"]:
        selections = pint.utils.xxxselections(self, toas, prefix=prefix)
        for key, selected in selections.items():
            if len(selected) != 0:
                continue
            if freeze:
                log.info(f"'{key}' has no TOAs so freezing")
                getattr(self, key).frozen = True
            empty.append(key)
    return empty
[docs] def setup(self):
"""Run setup methods on all components."""
for cp in self.components.values():
cp.setup()
def __contains__(self, name):
    # Supports ``"F0" in model``: membership over the full parameter list.
    return name in self.params
def __getitem__(self, name):
if name in self.top_level_params:
return getattr(self, name)
for cp in self.components.values():
if name in cp.params:
return getattr(cp, name)
raise KeyError(f"TimingModel does not have parameter {name}")
def __setitem__(self, name, value):
    """Item assignment is not supported; add parameters via components."""
    # FIXME: This could be the right way to add Parameters?
    raise NotImplementedError
def keys(self):
    """Return the names of all parameters, dict-style."""
    return self.params
def items(self):
    """Return ``(name, parameter_object)`` pairs for every parameter."""
    return [(p, self[p]) for p in self.params]
def __len__(self):
    # Total number of parameters across all components.
    return len(self.params)
def __iter__(self):
for p in self.params:
yield p
def as_ECL(self, epoch=None, ecl="IERS2010"):
    """Return TimingModel in PulsarEcliptic frame.

    Parameters
    ----------
    epoch : float or astropy.time.Time or astropy.units.Quantity, optional
        If float or Quantity, MJD(TDB) is assumed.
        New epoch for position.
    ecl : str, optional
        Obliquity for PulsarEcliptic frame

    Returns
    -------
    pint.models.timing_model.TimingModel
        In PulsarEcliptic frame

    Raises
    ------
    ValueError
        If the model contains neither an AstrometryEquatorial nor an
        AstrometryEcliptic component.

    Notes
    -----
    For the ``DDK`` model, the ``KOM`` vector is also transformed
    """
    if "AstrometryEquatorial" in self.components:
        astrometry_model_type = "AstrometryEquatorial"
    elif "AstrometryEcliptic" in self.components:
        astrometry_model_type = "AstrometryEcliptic"
    else:
        # Previously this fell through to a confusing UnboundLocalError.
        raise ValueError(
            "Model has no AstrometryEquatorial or AstrometryEcliptic component"
        )
    astrometry_model_component = self.components[astrometry_model_type]
    new_astrometry_model_component = astrometry_model_component.as_ECL(
        epoch=epoch, ecl=ecl
    )
    new_model = copy.deepcopy(self)
    new_model.remove_component(astrometry_model_type)
    new_model.add_component(new_astrometry_model_component)
    if "BinaryDDK" in self.components and "AstrometryEquatorial" in self.components:
        # KOM is a position angle; rotate it into the ecliptic frame by
        # transforming a unit proper-motion vector at the pulsar position.
        c = coords.SkyCoord(
            lon=new_model.ELONG.quantity,
            lat=new_model.ELAT.quantity,
            obstime=self.POSEPOCH.quantity,
            pm_lon_coslat=np.sin(self.KOM.quantity) * u.mas / u.yr,
            pm_lat=np.cos(self.KOM.quantity) * u.mas / u.yr,
            obliquity=OBL[new_model.ECL.value],
            frame=PulsarEcliptic,
        )
        c_ICRS = c.transform_to(coords.ICRS)
        new_model.KOM.quantity = (
            np.arctan2(c_ICRS.pm_ra_cosdec.value, c_ICRS.pm_dec.value) * u.rad
        ).to(self.KOM.units)
    return new_model
def as_ICRS(self, epoch=None):
    """Return TimingModel in ICRS frame.

    Parameters
    ----------
    epoch : float or astropy.time.Time or astropy.units.Quantity, optional
        If float or Quantity, MJD(TDB) is assumed.
        New epoch for position.

    Returns
    -------
    pint.models.timing_model.TimingModel
        In ICRS frame

    Raises
    ------
    ValueError
        If the model contains neither an AstrometryEquatorial nor an
        AstrometryEcliptic component.

    Notes
    -----
    For the ``DDK`` model, the ``KOM`` vector is also transformed
    """
    if "AstrometryEquatorial" in self.components:
        astrometry_model_type = "AstrometryEquatorial"
    elif "AstrometryEcliptic" in self.components:
        astrometry_model_type = "AstrometryEcliptic"
    else:
        # Previously this fell through to a confusing UnboundLocalError.
        raise ValueError(
            "Model has no AstrometryEquatorial or AstrometryEcliptic component"
        )
    astrometry_model_component = self.components[astrometry_model_type]
    new_astrometry_model_component = astrometry_model_component.as_ICRS(epoch=epoch)
    new_model = copy.deepcopy(self)
    new_model.remove_component(astrometry_model_type)
    new_model.add_component(new_astrometry_model_component)
    if "BinaryDDK" in self.components and "AstrometryEcliptic" in self.components:
        # KOM is a position angle; rotate it into the equatorial frame by
        # transforming a unit proper-motion vector at the pulsar position.
        c = coords.SkyCoord(
            ra=new_model.RAJ.quantity,
            dec=new_model.DECJ.quantity,
            obstime=self.POSEPOCH.quantity,
            pm_ra_cosdec=np.sin(self.KOM.quantity) * u.mas / u.yr,
            pm_dec=np.cos(self.KOM.quantity) * u.mas / u.yr,
            frame=coords.ICRS,
        )
        c_ECL = c.transform_to(PulsarEcliptic(ecl=self.ECL.value))
        new_model.KOM.quantity = (
            np.arctan2(c_ECL.pm_lon_coslat.value, c_ECL.pm_lat.value) * u.rad
        ).to(self.KOM.units)
    return new_model
def get_derived_params(self, rms=None, ntoas=None, returndict=False):
    """Return a string with various derived parameters from the fitted model

    Parameters
    ----------
    rms : astropy.units.Quantity, optional
        RMS of fit for checking ELL1 validity
    ntoas : int, optional
        Number of TOAs for checking ELL1 validity
    returndict : bool, optional
        Whether to only return the string of results or also a dictionary

    Returns
    -------
    results : str
    parameters : dict, optional
    """
    import uncertainties.umath as um
    from uncertainties import ufloat

    outdict = {}
    # Now print some useful derived parameters
    s = "Derived Parameters:\n"
    if hasattr(self, "F0"):
        F0 = self.F0.as_ufloat()
        p = 1 / F0
        s += f"Period = {p:P} s\n"
        outdict["P (s)"] = p
    if hasattr(self, "F1"):
        F1 = self.F1.as_ufloat()
        pdot = -F1 / F0**2
        outdict["Pdot (s/s)"] = pdot
        s += f"Pdot = {pdot:P}\n"
        if self.F1.value < 0.0:  # spinning-down
            brakingindex = 3
            s += f"Characteristic age = {pint.derived_quantities.pulsar_age(self.F0.quantity, self.F1.quantity, n=brakingindex):.4g} (braking index = {brakingindex})\n"
            s += f"Surface magnetic field = {pint.derived_quantities.pulsar_B(self.F0.quantity, self.F1.quantity):.3g}\n"
            s += f"Magnetic field at light cylinder = {pint.derived_quantities.pulsar_B_lightcyl(self.F0.quantity, self.F1.quantity):.4g}\n"
            I_NS = I = 1.0e45 * u.g * u.cm**2
            s += f"Spindown Edot = {pint.derived_quantities.pulsar_edot(self.F0.quantity, self.F1.quantity, I=I_NS):.4g} (I={I_NS})\n"
            outdict["age"] = pint.derived_quantities.pulsar_age(
                self.F0.quantity, self.F1.quantity, n=brakingindex
            )
            outdict["B"] = pint.derived_quantities.pulsar_B(
                self.F0.quantity, self.F1.quantity
            )
            outdict["Blc"] = pint.derived_quantities.pulsar_B_lightcyl(
                self.F0.quantity, self.F1.quantity
            )
            # BUGFIX: this previously stored pulsar_B_lightcyl under "Edot"
            # (copy-paste error); the printed string above correctly uses
            # pulsar_edot, so the dictionary now does too.
            outdict["Edot"] = pint.derived_quantities.pulsar_edot(
                self.F0.quantity, self.F1.quantity, I=I_NS
            )
        else:
            s += "Not computing Age, B, or Edot since F1 > 0.0\n"
    if hasattr(self, "PX") and not self.PX.frozen:
        s += "\n"
        px = self.PX.as_ufloat(u.arcsec)
        s += f"Parallax distance = {1.0/px:.3uP} pc\n"
        outdict["Dist (pc)"] = 1.0 / px
    # Now binary system derived parameters
    if self.is_binary:
        # Find the (single) binary component by name.
        for x in self.components:
            if x.startswith("Binary"):
                binary = x
        s += f"\nBinary model {binary}\n"
        outdict["Binary"] = binary
        btx = False
        if (
            hasattr(self, "FB0")
            and self.FB0.quantity is not None
            and self.FB0.value != 0.0
        ):
            # BTX-style parameterization: orbital frequency instead of period.
            btx = True
            pb = 1 / self.FB0.as_ufloat(1 / u.d)
            s += f"Orbital Period (PB) = {pb:P} (d)\n"
        else:
            pb = self.PB.as_ufloat(u.d)
        outdict["PB (d)"] = pb
        pbdot = None
        if (
            hasattr(self, "FB1")
            and self.FB1.quantity is not None
            and self.FB1.value != 0.0
        ):
            pbdot = -self.FB1.as_ufloat(u.Hz / u.s) / self.FB0.as_ufloat(u.Hz) ** 2
        elif (
            hasattr(self, "PBDOT")
            and self.PBDOT.quantity is not None
            and self.PBDOT.value != 0
        ):
            pbdot = self.PBDOT.as_ufloat(u.s / u.s)
        if pbdot is not None:
            s += f"Orbital Pdot (PBDOT) = {pbdot:P} (s/s)\n"
            outdict["PBDOT (s/s)"] = pbdot
        ell1 = False
        if binary.startswith("BinaryELL1"):
            # Convert the ELL1 Laplace-Lagrange parameters to ECC/OM/T0.
            ell1 = True
            eps1 = self.EPS1.as_ufloat()
            eps2 = self.EPS2.as_ufloat()
            tasc = ufloat(
                # This is a time in MJD
                self.TASC.quantity.mjd,
                (
                    self.TASC.uncertainty.to(u.d).value
                    if self.TASC.uncertainty is not None
                    else 0
                ),
            )
            s += "Conversion from ELL1 parameters:\n"
            ecc = um.sqrt(eps1**2 + eps2**2)
            s += "ECC = {:P}\n".format(ecc)
            outdict["ECC"] = ecc
            om = um.atan2(eps1, eps2) * 180.0 / np.pi
            if om < 0.0:
                om += 360.0
            s += f"OM = {om:P} deg\n"
            outdict["OM (deg)"] = om
            t0 = tasc + pb * om / 360.0
            s += f"T0 = {t0:SP}\n"
            outdict["T0"] = t0
            a1 = self.A1.quantity if self.A1.quantity is not None else 0 * pint.ls
            if rms is not None and ntoas is not None:
                s += pint.utils.ELL1_check(
                    a1,
                    ecc.nominal_value * u.s / u.s,
                    rms,
                    ntoas,
                    outstring=True,
                )
            s += "\n"
        # Masses and inclination
        if not self.A1.frozen:
            a1 = self.A1.as_ufloat(pint.ls)
            # This is the mass function, done explicitly so that we get
            # uncertainty propagation automatically.
            # TODO: derived quantities funcs should take uncertainties
            fm = 4.0 * np.pi**2 * a1**3 / (4.925490947e-6 * (pb * 86400) ** 2)
            s += f"Mass function = {fm:SP} Msun\n"
            outdict["Mass Function (Msun)"] = fm
            mcmed = pint.derived_quantities.companion_mass(
                pb.n * u.d,
                self.A1.quantity,
                i=60.0 * u.deg,
                mp=1.4 * u.solMass,
            )
            mcmin = pint.derived_quantities.companion_mass(
                pb.n * u.d,
                self.A1.quantity,
                i=90.0 * u.deg,
                mp=1.4 * u.solMass,
            )
            s += f"Min / Median Companion mass (assuming Mpsr = 1.4 Msun) = {mcmin.value:.4f} / {mcmed.value:.4f} Msun\n"
            outdict["Mc,med (Msun)"] = mcmed.value
            outdict["Mc,min (Msun)"] = mcmin.value
        if (
            hasattr(self, "OMDOT")
            and self.OMDOT.quantity is not None
            and self.OMDOT.value != 0.0
        ):
            # Total system mass implied by periastron advance, assuming GR.
            omdot = self.OMDOT.as_ufloat(u.rad / u.s)
            e = ecc if ell1 else self.ECC.as_ufloat()
            mt = (
                (
                    omdot
                    / (
                        3
                        * (c.G * u.Msun / c.c**3).to_value(u.s) ** (2.0 / 3)
                        * ((pb * 86400 / 2 / np.pi)) ** (-5.0 / 3)
                        * (1 - e**2) ** -1
                    )
                )
            ) ** (3.0 / 2)
            s += f"Total mass, assuming GR, from OMDOT is {mt:SP} Msun\n"
            outdict["Mtot (Msun)"] = mt
        if (
            hasattr(self, "SINI")
            and self.SINI.quantity is not None
            and (self.SINI.value >= 0.0 and self.SINI.value < 1.0)
        ):
            with contextlib.suppress(TypeError, ValueError):
                # Put this in a try in case SINI is UNSET or an illegal value
                if not self.SINI.frozen:
                    si = self.SINI.as_ufloat()
                    s += "From SINI in model:\n"
                    s += f"    cos(i) = {um.sqrt(1 - si**2):SP}\n"
                    s += f"    i = {um.asin(si) * 180.0 / np.pi:SP} deg\n"
                psrmass = pint.derived_quantities.pulsar_mass(
                    pb.n * u.d,
                    self.A1.quantity,
                    self.M2.quantity,
                    np.arcsin(self.SINI.quantity),
                )
                s += f"Pulsar mass (Shapiro Delay) = {psrmass}"
                outdict["Mp (Msun)"] = psrmass
    if not returndict:
        return s
    return s, outdict
[docs]class Component(metaclass=ModelMeta):
"""Timing model components.
When such a class is defined, it registers itself in
``Component.component_types`` so that it can be found and used
when parsing par files.
Note that classes are registered when their modules are imported,
so ensure all classes of interest are imported before this list
is checked.
These objects can be constructed with no particular values, but
their `.setup()` and `.validate()` methods should be called
before using them to compute anything. These should check
parameter values for validity, raising an exception if
invalid parameter values are chosen.
"""
component_types = {}
def __init__(self):
    # Names of the parameters owned by this component, in definition order.
    self.params = []
    # The TimingModel this component is attached to (None until attached).
    self._parent = None
    # Mapping of derivative functions registered for this component's delays.
    self.deriv_funcs = {}
    # Parameters needing special handling when added to/removed from a model.
    self.component_special_params = []
def __repr__(self):
    """Readable summary: class name plus every non-derived parameter.

    funcParameters are derived on the fly and excluded from the listing.
    """
    # BUGFIX: ``p`` is a parameter *name* (str), so the original
    # ``isinstance(p, funcParameter)`` was always False and the filter
    # never excluded anything; test the parameter object instead.
    return "{}(\n {})".format(
        self.__class__.__name__,
        ",\n ".join(
            str(getattr(self, p))
            for p in self.params
            if not isinstance(getattr(self, p), funcParameter)
        ),
    )
def setup(self):
    """Finalize construction after parameter values are loaded.

    Subclasses override this; the base implementation does nothing.
    """
    pass
def validate(self):
    """Validate loaded values.

    Subclasses override this to check parameter values, raising an
    exception on invalid ones; the base implementation does nothing.
    """
    pass
def validate_toas(self, toas):
    """Check that this model component has TOAs where needed.

    Subclasses override this to raise MissingTOAs when a free parameter
    has no constraining TOAs; the base implementation does nothing.
    """
    pass
@property_exists
def category(self):
    """Category is a feature of the class, so delegate to the class attribute."""
    return self.__class__.category
@property_exists
def free_params_component(self):
    """Return the free parameters in the component.

    This function collects the non-frozen parameters.

    Returns
    -------
    A list of free parameters.
    """
    return [name for name in self.params if not getattr(self, name).frozen]
@property_exists
def param_prefixs(self):
    """Map each parameter prefix to the list of this component's parameters using it."""
    prefix_map = {}
    for name in self.params:
        par = getattr(self, name)
        if par.is_prefix:
            # Group parameter names under their shared prefix.
            prefix_map.setdefault(par.prefix, []).append(name)
    return prefix_map
@property_exists
def aliases_map(self):
    """Return all the aliases and map to the PINT parameter name.

    This property returns a dictionary from the aliases of the parameters
    currently in the timing model to the PINT-defined parameter names. For
    a prefixed parameter, an alias carrying an existing prefix index maps
    to the PINT-defined parameter name with the same index (behind the
    scenes, the indexed parameter adds the indexed alias to its alias
    list). Each canonical name also maps to itself.
    """
    mapping = {}
    for name in self.params:
        mapping[name] = name
        for alias in getattr(self, name).aliases:
            mapping[alias] = name
    return mapping
[docs] def add_param(self, param, deriv_func=None, setup=False):
"""Add a parameter to the Component.
The parameter is stored in an attribute on the Component object.
Its name is also recorded in a list, ``self.params``.
Parameters
----------
param : pint.models.Parameter
The parameter to be added.
deriv_func: function
Derivative function for parameter.
"""
# This is the case for add "JUMP" like parameters, It will add an
# index to the parameter name for avoding the conflicts
# TODO: this is a work around in the current system, but it will be
# optimized in the future release.
if isinstance(param, maskParameter):
# TODO, right now maskParameter add index to parameter name by
# default. But This is should be optimized. In the future versions,
# it will change.
# First get prefix and index from input parameter name
try:
prefix, idx_str, idx = split_prefixed_name(param.name)
except PrefixError:
prefix = param.name
idx = 1
# Check existing prefix
prefix_map = self.get_prefix_mapping_component(prefix)
exist_par_name = prefix_map.get(idx, None)
# Check if parameter value has been set.
if exist_par_name and getattr(self, exist_par_name).value is not None:
idx = max(list(prefix_map.keys())) + 1
# TODO here we have an assumption that maskParameter follow the
# convention of name + no_leading_zero_index
param.name = prefix + str(idx)
param.index = idx
if hasattr(self, f"{prefix}1"):
param.description = getattr(self, f"{prefix}1").description
# A more general check
if param.name in self.params:
exist_par = getattr(self, param.name)
if exist_par.value is not None:
raise ValueError(
"Tried to add a second parameter called {}. "
"Old value: {} New value: {}".format(
param.name, getattr(self, param.name), param
)
)
else:
setattr(self, param.name, param)
else: # When parameter not in the params list, we also need to add it.
setattr(self, param.name, param)
self.params.append(param.name)
# Adding parameters to an existing model sometimes need to run setup()
# function again.
if setup:
self.setup()
if deriv_func is not None:
self.register_deriv_funcs(deriv_func, param.name)
param._parent = self
[docs] def remove_param(self, param):
"""Remove a parameter from the Component.
Parameters
----------
param : str or pint.models.Parameter
The parameter to remove.
"""
if isinstance(param, str):
param_name = param
else:
param_name = param.name
if param_name not in self.params:
raise ValueError(
f"Tried to remove parameter {param_name} but it is not listed: {self.params}"
)
self.params.remove(param_name)
par = getattr(self, param_name)
all_names = [param] + par.aliases
if param in self.component_special_params:
for pn in all_names:
self.component_special_params.remove(pn)
delattr(self, param)
def set_special_params(self, spcl_params):
als = []
for p in spcl_params:
als += getattr(self, p).aliases
spcl_params += als
for sp in spcl_params:
if sp not in self.component_special_params:
self.component_special_params.append(sp)
[docs] def param_help(self):
"""Print help lines for all available parameters in model."""
s = "Available parameters for %s\n" % self.__class__
for par in self.params:
s += "%s\n" % getattr(self, par).help_line()
return s
[docs] def get_params_of_type(self, param_type):
"""Get all the parameters in timing model for one specific type."""
result = []
for p in self.params:
par = getattr(self, p)
par_type = type(par).__name__
par_prefix = par_type[:-9]
if (
param_type.upper() == par_type.upper()
or param_type.upper() == par_prefix.upper()
):
result.append(par.name)
return result
[docs] def get_prefix_mapping_component(self, prefix):
"""Get the index mapping for the prefix parameters.
Parameters
----------
prefix : str
Name of prefix.
Returns
-------
dict
A dictionary with prefix parameter real index as key and parameter
name as value.
"""
parnames = [x for x in self.params if x.startswith(prefix)]
mapping = {}
for parname in parnames:
par = getattr(self, parname)
if par.is_prefix and par.prefix == prefix:
mapping[par.index] = parname
return OrderedDict(sorted(mapping.items()))
[docs] def match_param_aliases(self, alias):
"""Return the parameter corresponding to this alias.
Parameters
----------
alias: str
Alias name.
Note
----
This function only searches the parameter aliases within the current
component. If one wants to search the aliases in the scope of TimingModel,
please use :py:meth:`TimingModel.match_param_aliase`.
"""
pname = self.aliases_map.get(alias, None)
# Split the alias prefix, see if it is a perfix alias
try:
prefix, idx_str, idx = split_prefixed_name(alias)
except PrefixError: # Not a prefixed name
if pname is not None:
par = getattr(self, pname)
if par.is_prefix:
raise UnknownParameter(
f"Prefix {alias} maps to mulitple parameters"
". Please specify the index as well."
)
else:
# Not a prefix, not an alias
raise UnknownParameter(f"Unknown parameter name or alias {alias}")
# When the alias is a prefixed name but not in the parameter list yet
if pname is None:
prefix_pname = self.aliases_map.get(prefix, None)
if prefix_pname:
par = getattr(self, prefix_pname)
if par.is_prefix:
raise UnknownParameter(
f"Found a similar prefixed parameter '{prefix_pname}'"
f" But parameter {par.prefix}{idx} need to be added"
f" to the model."
)
else:
raise UnknownParameter(
f"{par} is not a prefixed parameter, howere the input"
f" {alias} has index with it."
)
else:
raise UnknownParameter(f"Unknown parameter name or alias {alias}")
else:
return pname
[docs] def register_deriv_funcs(self, func, param):
"""Register the derivative function in to the deriv_func dictionaries.
Parameters
----------
func : callable
Calculates the derivative
param : str
Name of parameter the derivative is with respect to
"""
pn = self.match_param_aliases(param)
if pn not in list(self.deriv_funcs.keys()):
self.deriv_funcs[pn] = [func]
else:
# TODO:
# Runing setup() mulitple times can lead to adding derivative
# function multiple times. This prevent it from happening now. But
# in the future, we should think a better way to do so.
if func in self.deriv_funcs[pn]:
return
else:
self.deriv_funcs[pn] += [func]
[docs] def is_in_parfile(self, para_dict):
"""Check if this subclass included in parfile.
Parameters
----------
para_dict : dictionary
A dictionary contain all the parameters with values in string
from one parfile
Returns
-------
bool
Whether the subclass is included in the parfile.
"""
if self.component_special_params:
for p in self.component_special_params:
if p in para_dict:
return True
return False
pNames_inpar = list(para_dict.keys())
pNames_inModel = self.params
# FIXME: we have derived classes, this is the sort of thing that
# should go in them.
# For solar system Shapiro delay component
if hasattr(self, "PLANET_SHAPIRO"):
if "NO_SS_SHAPIRO" in pNames_inpar:
return False
else:
return True
try:
bmn = getattr(self, "binary_model_name")
except AttributeError:
# This isn't a binary model, keep looking
pass
else:
if "BINARY" in para_dict:
return bmn == para_dict["BINARY"][0]
else:
return False
# Compare the componets parameter names with par file parameters
compr = list(set(pNames_inpar).intersection(pNames_inModel))
if compr == []:
# Check aliases
for p in pNames_inModel:
al = getattr(self, p).aliases
# No aliases in parameters
if al == []:
continue
# Find alias check if match any of parameter name in parfile
if list(set(pNames_inpar).intersection(al)):
return True
else:
continue
# TODO Check prefix parameter
return False
return True
[docs] def print_par(self, format="pint"):
"""
Parameters
----------
format : str, optional
Parfile output format. PINT outputs the 'tempo', 'tempo2' and 'pint'
format. The defaul format is `pint`. Actual formatting done elsewhere.
Returns
-------
str : formatted line for par file
"""
result = ""
for p in self.params:
result += getattr(self, p).as_parfile_line(format=format)
return result
class DelayComponent(Component):
    """Base class for timing model components that contribute a time delay."""

    def __init__(self):
        super().__init__()
        # Callables computing this component's delay contribution.
        self.delay_funcs_component = []
class PhaseComponent(Component):
    """Base class for timing model components that contribute pulse phase."""

    def __init__(self):
        super().__init__()
        # Callables computing this component's phase contribution.
        self.phase_funcs_component = []
        # Callables computing d(phase)/d(delay).
        self.phase_derivs_wrt_delay = []
class AllComponents:
    """A class for the components pool.

    This object stores and manages the instances of component classes with
    class attribute ``.register = True``. This includes the PINT built-in
    components and user-defined components, and there is no need to import
    the component class.

    This class constructs the available component instances, but without any
    valid parameter values (parameters are initialized when a component
    instance gets constructed; however, the parameter values are unknown to
    the components at that moment). Thus, running the ``.validate()``
    function of a component instance will fail. This class is designed for
    helping model building and parameter searching, not for direct data
    analysis.

    Note
    ----
    This is a low-level class for managing all the components. To build a
    timing model, we recommend the subclass
    `models.model_builder.ModelBuilder`, where a higher-level interface is
    provided. If one wants to use this class directly, one has to construct
    the instance separately.
    """

    def __init__(self):
        # One (valueless) instance of every registered component class.
        self.components = {}
        for k, v in Component.component_types.items():
            self.components[k] = v()

    @lazyproperty
    def param_component_map(self):
        """Return the parameter to component map.

        This property maps all PINT-defined parameters *and their aliases*
        to their host components; TimingModel-level parameters map to the
        pseudo-component name ``"timing_model"``. To translate an alias to
        the PINT parameter name itself, use the `alias_to_pint_param` method.
        """
        p2c_map = defaultdict(list)
        for k, cp in self.components.items():
            for p in cp.params:
                p2c_map[p].append(k)
                # Aliases map to the same host component.
                par = getattr(cp, p)
                for ap in par.aliases:
                    p2c_map[ap].append(k)
        tm = TimingModel()
        for tp in tm.params:
            p2c_map[tp].append("timing_model")
            par = getattr(tm, tp)
            for ap in par.aliases:
                p2c_map[ap].append("timing_model")
        return p2c_map

    def _check_alias_conflict(self, alias, param_name, alias_map):
        """Check if an alias has a conflict in the alias map.

        This function checks if an alias already has a record in the
        alias_map. If there is a record, it checks whether the record matches
        the given parameter name, `param_name`; if not, it raises an
        AliasConflict error.

        Parameters
        ----------
        alias : str
            The alias name to look up in the alias_map.
        param_name : str
            The parameter name that the alias is going to be mapped to.

        Raises
        ------
        AliasConflict
            When the input alias has a record in the alias map, but the
            record does not match the input parameter name.
        """
        if alias in alias_map.keys():
            if param_name == alias_map[alias]:
                return
            else:
                # NOTE(review): the message reports `param_name`; the
                # pre-existing owner is `alias_map[alias]` — confirm which
                # was intended before changing the wording.
                raise AliasConflict(
                    f"Alias {alias} has been used by" f" parameter {param_name}."
                )
        else:
            return

    @lazyproperty
    def _param_alias_map(self):
        """Return the alias map of all parameters.

        The returned map includes: 1. alias to PINT parameter name, 2. PINT
        parameter name to PINT parameter name, 3. prefix to PINT parameter
        name.

        Notes
        -----
        Please use `alias_to_pint_param` method to map an alias to a PINT parameter.
        """
        alias = {}
        for k, cp in self.components.items():
            for p in cp.params:
                par = getattr(cp, p)
                # Check against any existing record first.
                self._check_alias_conflict(p, p, alias)
                alias[p] = p
                for als in par.aliases:
                    self._check_alias_conflict(als, p, alias)
                    alias[als] = p
        tm = TimingModel()
        for tp in tm.params:
            par = getattr(tm, tp)
            self._check_alias_conflict(tp, tp, alias)
            alias[tp] = tp
            for als in par.aliases:
                self._check_alias_conflict(als, tp, alias)
                alias[als] = tp
        return alias

    @lazyproperty
    def _param_unit_map(self):
        """A dictionary to map parameter names to their units.

        This excludes prefix parameters and aliases. Use :func:`param_to_unit` to handle those.
        """
        units = {}
        for k, cp in self.components.items():
            for p in cp.params:
                if p in units.keys() and units[p] != getattr(cp, p).units:
                    raise TimingModelError(
                        f"Units of parameter '{p}' in component '{cp}' ({getattr(cp, p).units}) do not match those of existing parameter ({units[p]})"
                    )
                units[p] = getattr(cp, p).units
        tm = TimingModel()
        for tp in tm.params:
            # BUG FIX: the original wrote `units[p]` here (the stale loop
            # variable from the component loop above), repeatedly clobbering
            # one component entry and never recording the TimingModel
            # parameters' units.
            units[tp] = getattr(tm, tp).units
        return units

    @lazyproperty
    def repeatable_param(self):
        """Return the set of repeatable parameter names (including aliases)."""
        repeatable = []
        for k, cp in self.components.items():
            for p in cp.params:
                par = getattr(cp, p)
                if par.repeatable:
                    repeatable.append(p)
                    repeatable.append(par._parfile_name)
                    # Also add the aliases to the repeatable params.
                    for als in par.aliases:
                        repeatable.append(als)
        return set(repeatable)

    @lazyproperty
    def category_component_map(self):
        """A dictionary mapping category to a list of component names.

        Returns
        -------
        dict
            The mapping from categories to the components belonging to each
            category. The key is the category name, and the value is a list
            of all the components in that category.
        """
        category = defaultdict(list)
        for k, cp in self.components.items():
            cat = cp.category
            category[cat].append(k)
        return category

    @lazyproperty
    def component_category_map(self):
        """A dictionary mapping component name to its category name.

        Returns
        -------
        dict
            The mapping from component name to the component's category name.
        """
        cp_ca = {}
        for k, cp in self.components.items():
            cp_ca[k] = cp.category
        return cp_ca

    @lazyproperty
    def component_unique_params(self):
        """Return the parameters that are only present in one component.

        Returns
        -------
        dict
            A mapping from a component name to a list of parameters that are
            only in this component.

        Note
        ----
        This function only returns the PINT-defined parameter names, not
        including the aliases.
        """
        component_special_params = defaultdict(list)
        for param, cps in self.param_component_map.items():
            if len(cps) == 1:
                component_special_params[cps[0]].append(param)
        return component_special_params

    def search_binary_components(self, system_name):
        """Search for the pulsar binary component based on the given name.

        Parameters
        ----------
        system_name : str
            Search name for the pulsar binary/system.

        Returns
        -------
        The matching binary model component instance.

        Raises
        ------
        UnknownBinaryModel
            If the input binary model name does not match any PINT-defined
            binary model.
        """
        all_systems = self.category_component_map["pulsar_system"]
        # Search by component name first.
        if system_name in all_systems:
            return self.components[system_name]
        else:  # search for the pulsar system aliases
            for cp_name in all_systems:
                if system_name == self.components[cp_name].binary_model_name:
                    return self.components[cp_name]
            # Give specific guidance for known-but-unsupported models.
            if system_name == "BTX":
                raise UnknownBinaryModel(
                    "`BINARY BTX` is not supported bt PINT. Use "
                    "`BINARY BT` instead. It supports both orbital "
                    "period (PB, PBDOT) and orbital frequency (FB0, ...) "
                    "parametrizations."
                )
            elif system_name == "DDFWHE":
                raise UnknownBinaryModel(
                    "`BINARY DDFWHE` is not supported, but the same model "
                    "is available as `BINARY DDH`."
                )
            elif system_name in ["MSS", "EH", "H88", "DDT", "BT1P", "BT2P"]:
                # Binary model list taken from
                # https://tempo.sourceforge.net/ref_man_sections/binary.txt
                raise UnknownBinaryModel(
                    f"`The binary model {system_name} is not yet implemented."
                )
            raise UnknownBinaryModel(
                f"Pulsar system/Binary model component"
                f" {system_name} is not provided."
            )

    def alias_to_pint_param(self, alias):
        """Translate an alias to a PINT parameter name.

        This is a wrapper function over the property ``_param_alias_map``. It
        also handles indexed parameters (e.g., `pint.models.parameter.prefixParameter`
        and `pint.models.parameter.maskParameter`) with an index beyond those currently
        initialized.

        Parameters
        ----------
        alias : str
            Alias name to be translated

        Returns
        -------
        pint_par : str
            PINT parameter name the given alias maps to. If there is no matching
            PINT parameter, it will raise an `UnknownParameter` error.
        first_init_par : str
            The parameter name that is first initialized in a component. If the
            parameter is non-indexable, it is the same as ``pint_par``, otherwise
            it returns the parameter with the first index. For example, the
            ``first_init_par`` for 'T2EQUAD25' is 'EQUAD1'

        Notes
        -----
        Providing an indexable parameter without the index attached returns
        the PINT parameter with first index (i.e. ``0`` or ``1``). With an index,
        the function returns the matched parameter with the index provided.
        The index format has to match the PINT-defined index format. For instance,
        if PINT defines a parameter using leading-zero indexing, the provided
        index has to use the same leading zeros, otherwise an `UnknownParameter`
        error is raised.

        Examples
        --------
        >>> from pint.models.timing_model import AllComponents
        >>> ac = AllComponents()
        >>> ac.alias_to_pint_param('RA')
        ('RAJ', 'RAJ')
        >>> ac.alias_to_pint_param('T2EQUAD')
        ('EQUAD1', 'EQUAD1')
        >>> ac.alias_to_pint_param('T2EQUAD25')
        ('EQUAD25', 'EQUAD1')
        >>> ac.alias_to_pint_param('DMX_0020')
        ('DMX_0020', 'DMX_0001')
        >>> ac.alias_to_pint_param('DMX20')
        UnknownParameter: Can not find matching PINT parameter for 'DMX020'
        """
        pint_par = self._param_alias_map.get(alias, None)
        # If it is not in the map, double check if it is a repeatable par.
        if pint_par is None:
            try:
                prefix, idx_str, idx = split_prefixed_name(alias)
                # assume the index 1 parameter is in the alias map
                # count length of idx_str and detect leading zeros
                # TODO fix the case for searching `DMX`
                num_lzero = len(idx_str) - len(str(idx))
                if num_lzero > 0:  # Has leading zero
                    fmt = len(idx_str)
                else:
                    fmt = 0
                first_init_par = None
                # Handle the case of start index from 0 and 1
                for start_idx in [0, 1]:
                    first_init_par_alias = prefix + f"{start_idx:0{fmt}}"
                    first_init_par = self._param_alias_map.get(
                        first_init_par_alias, None
                    )
                    if first_init_par:
                        # Found the first init par; move to the next step.
                        pint_par = split_prefixed_name(first_init_par)[0] + idx_str
                        break
            except PrefixError:
                pint_par = None
        else:
            first_init_par = pint_par
        if pint_par is None:
            raise UnknownParameter(
                "Can not find matching PINT parameter for '{}'".format(alias)
            )
        return pint_par, first_init_par

    def param_to_unit(self, name):
        """Return the unit associated with a parameter.

        This is a wrapper function over the property ``_param_unit_map``. It
        also handles aliases and indexed parameters (e.g., `pint.models.parameter.prefixParameter`
        and `pint.models.parameter.maskParameter`) with an index beyond those currently
        initialized.

        This can be used without an existing :class:`~pint.models.TimingModel`.

        Parameters
        ----------
        name : str
            Name of PINT parameter or alias

        Returns
        -------
        astropy.u.Unit
        """
        pintname, firstname = self.alias_to_pint_param(name)
        if pintname == firstname:
            # not a prefix parameter
            return self._param_unit_map[pintname]
        prefix, idx_str, idx = split_prefixed_name(pintname)
        component = self.param_component_map[firstname][0]
        # Prefix parameters may define index-dependent units via a template.
        if getattr(self.components[component], firstname).unit_template is None:
            return self._param_unit_map[firstname]
        return u.Unit(getattr(self.components[component], firstname).unit_template(idx))
class TimingModelError(ValueError):
    """Generic base class for timing model errors."""
class MissingParameter(TimingModelError):
    """A required model parameter was not included.

    Parameters
    ----------
    module
        name of the model class that raised the error
    param
        name of the missing parameter
    msg
        additional message
    """

    def __init__(self, module, param, msg=None):
        super().__init__(msg)
        self.module = module
        self.param = param
        self.msg = msg

    def __str__(self):
        text = self.module + "." + self.param
        return text if self.msg is None else text + "\n " + self.msg
class AliasConflict(TimingModelError):
    """If the same alias is used for different parameters."""
class UnknownParameter(TimingModelError):
    """Signal that a parameter name does not match any PINT parameters and their aliases."""
class UnknownBinaryModel(TimingModelError):
    """Signal that the par file requested a binary model not in PINT."""

    def __init__(self, message, suggestion=None):
        super().__init__(message)
        # Optional name of a supported model to recommend instead.
        self.suggestion = suggestion

    def __str__(self):
        base = super().__str__()
        return f"{base} Perhaps use {self.suggestion}?" if self.suggestion else base
class MissingBinaryError(TimingModelError):
    """Error for missing BINARY parameter."""