"""Objects for comparing models to data.
These objects can be constructed directly, as ``Residuals(toas, model)``, or
they are constructed during fitting operations with :class:`pint.fitter.Fitter`
objects, as ``fitter.residual``. Variants exist for arrival-time-only data
(:class:`pint.residuals.Residuals`) and for arrival times that come paired with
dispersion measures (:class:`pint.residuals.WidebandTOAResiduals`).
"""
import collections
import copy
import warnings
import astropy.units as u
import numpy as np
from scipy.linalg import LinAlgError
from loguru import logger as log
from pint.models.dispersion_model import Dispersion
from pint.models.parameter import maskParameter
from pint.phase import Phase
from pint.utils import (
sherman_morrison_dot,
weighted_mean,
taylor_horner_deriv,
woodbury_dot,
)
__all__ = [
"Residuals",
"WidebandTOAResiduals",
"WidebandDMResiduals",
"CombinedResiduals",
]
[docs]class Residuals:
"""Class to compute residuals between TOAs and a TimingModel.
This class serves to store the results of a comparison between TOAs and a
model. It also implements certain basic statistical calculations. This
class also serves as a base class providing some infrastructure to support
residuals from other kinds of data/model comparison.
This class provides access to the residuals in both phase (turns) and time
(seconds) form through the ``.phase_resids`` and the ``.time_resids``
attributes.
Uncertainties on these residuals are available in time units using
``.get_data_error()``; this can include or not include any rescaling
of the uncertainties implied by the model's EFAC or EQUAD.
Attributes
----------
phase_resids : :class:`astropy.units.Quantity`
Residuals in phase units
time_resids : :class:`astropy.units.Quantity`
Residuals in time units
Parameters
----------
toas: :class:`pint.toa.TOAs`, optional
The input TOAs object. Default: None
model: :class:`pint.models.timing_model.TimingModel`, optional
Input model object. Default: None
residual_type: str, optional
The type of the residuals. Default: 'toa'
unit: :class:`astropy.units.Unit`, optional
The default unit of the residuals. Default: u.s
subtract_mean : bool
Controls whether mean will be subtracted from the residuals.
This option will be ignored if a `PhaseOffset` is present in the timing model.
use_weighted_mean : bool
Controls whether mean computation is weighted (by errors) or not.
track_mode : None, "nearest", "use_pulse_numbers"
Controls how pulse numbers are assigned. ``"nearest"`` assigns
each TOA to the nearest integer pulse. ``"use_pulse_numbers"`` uses the
``pulse_number`` column of the TOAs table to assign pulse numbers. If the
default, None, is passed, use the pulse numbers if the model has the
parameter TRACK == "-2" and not if it has TRACK == "0". If neither of the
above is set, use pulse numbers if there are pulse numbers present and not
if there aren't.
"""
def __new__(
cls,
toas=None,
model=None,
residual_type="toa",
unit=u.s,
subtract_mean=True,
use_weighted_mean=True,
track_mode=None,
use_abs_phase=True,
):
if cls is Residuals:
try:
cls = residual_map[residual_type.lower()]
except KeyError as e:
raise ValueError(
f"'{residual_type}' is not a PINT supported residual. Currently supported data types are {list(residual_map.keys())}"
) from e
return super().__new__(cls)
def __init__(
self,
toas=None,
model=None,
residual_type="toa",
unit=u.s,
subtract_mean=True,
use_weighted_mean=True,
track_mode=None,
use_abs_phase=True,
):
self.toas = toas
self.model = model
self.residual_type = residual_type
if "PhaseOffset" in model.components and subtract_mean:
log.debug(
"Disabling implicit `subtract_mean` because `PhaseOffset` is present in the timing model."
)
self.subtract_mean = subtract_mean and "PhaseOffset" not in model.components
self.use_abs_phase = use_abs_phase
self.use_weighted_mean = use_weighted_mean
if track_mode is None:
if getattr(self.model, "TRACK").value == "-2":
self.track_mode = "use_pulse_numbers"
elif getattr(self.model, "TRACK").value == "0":
self.track_mode = "nearest"
elif "pulse_number" in self.toas.table.columns:
if np.any(np.isnan(toas.table["pulse_number"])):
log.warning(
"Some TOAs are missing pulse numbers, they will not be used."
)
self.track_mode = "nearest"
else:
self.track_mode = "use_pulse_numbers"
else:
self.track_mode = "nearest"
else:
self.track_mode = track_mode
if toas is not None and model is not None:
self.phase_resids = self.calc_phase_resids()
self.time_resids = self.calc_time_resids()
else:
self.phase_resids = None
self.time_resids = None
# delay chi-squared computation until needed to avoid infinite recursion
# also it's expensive
# only relevant if there are correlated errors
self._chi2 = None
self.noise_resids = {}
# For residual debugging
self.debug_info = {}
# We should be carefully for the other type of residuals
self.unit = unit
# A flag to identify if this residual object is combined with residual
# class.
self._is_combined = False
@property
def resids(self):
"""Residuals in time units."""
if self.time_resids is None:
self.update()
return self.time_resids
@property
def resids_value(self):
"""Residuals in seconds, with the units stripped."""
return self.resids.to_value(self.unit)
[docs] def update(self):
"""Recalculate everything in residuals class after changing model or TOAs"""
if self.toas is None or self.model is None:
self.phase_resids = None
self.time_resids = None
if self.toas is None:
raise ValueError("No TOAs provided for residuals update")
if self.model is None:
raise ValueError("No model provided for residuals update")
self.phase_resids = self.calc_phase_resids()
self.time_resids = self.calc_time_resids()
self._chi2 = None # trigger chi2 recalculation when needed
@property
def chi2(self):
"""Compute chi-squared as needed and cache the result."""
if self._chi2 is None:
self._chi2 = self.calc_chi2()
assert self._chi2 is not None
return self._chi2
@property
def dof(self):
"""Return number of degrees of freedom for the model."""
if self._is_combined:
raise AttributeError(
"Please use the `.dof` in the CombinedResidual"
" class. The individual residual's dof is not "
"calculated correctly in the combined residuals."
)
dof = self.toas.ntoas
# Now subtract 1 for the implicit global offset parameter
# Note that we should do two things eventually
# 1. Make the offset not be a hidden parameter
dof -= len(self.model.free_params) + 1
return dof
@property
def reduced_chi2(self):
"""Return the weighted reduced chi-squared for the model and toas."""
return self.chi2 / self.dof
@property
def chi2_reduced(self):
"""Reduced chi-squared."""
warnings.warn(
"Do not use 'residuals.chi2_reduced'. Please use 'residuals.reduced_chi2' instead.",
DeprecationWarning,
)
return self.chi2 / self.dof
[docs] def get_data_error(self, scaled=True):
"""Get errors on time residuals.
This returns the uncertainties on the time residuals, optionally scaled
by the noise model.
Parameters
----------
scaled: bool, optional
If errors get scaled by the noise model.
"""
return (
self.model.scaled_toa_uncertainty(self.toas)
if scaled
else self.toas.get_errors()
)
[docs] def rms_weighted(self):
"""Compute weighted RMS of the residuals in time."""
# Use scaled errors, if the noise model is not presented, it will
# return the raw errors
scaled_errors = self.get_data_error()
if np.any(scaled_errors.value == 0):
raise ValueError(
"Some TOA errors are zero - cannot calculate weighted RMS of residuals"
)
w = 1.0 / (scaled_errors.to(u.s) ** 2)
wmean, werr, wsdev = weighted_mean(self.time_resids, w, sdev=True)
return wsdev.to(u.us)
[docs] def get_PSR_freq(self, calctype="modelF0"):
"""Return pulsar rotational frequency in Hz.
Parameters
----------
calctype : {'modelF0', 'numerical', 'taylor'}
Type of calculation. If `calctype` == "modelF0", then simply the ``F0``
parameter from the model.
If `calctype` == "numerical", then try a numerical derivative
If `calctype` == "taylor", evaluate the frequency with a Taylor series
Returns
-------
freq : astropy.units.Quantity
Either the single ``F0`` in the model or the spin frequency at the moment of each TOA.
"""
assert calctype.lower() in ["modelf0", "taylor", "numerical"]
if calctype.lower() == "modelf0":
# TODO this function will be re-write and move to timing model soon.
# The following is a temporary patch.
if "Spindown" in self.model.components:
F0 = self.model.F0.quantity
elif "P0" in self.model.params:
F0 = 1.0 / self.model.P0.quantity
else:
raise AttributeError(
"No pulsar spin parameter(e.g., 'F0'," " 'P0') found."
)
return F0.to(u.Hz)
elif calctype.lower() == "taylor":
# see Spindown.spindown_phase
dt = self.model.get_dt(self.toas, 0)
# if the model is defined through F0, F1, ...
if "F0" in self.model.params:
fterms = [0.0 * u.dimensionless_unscaled] + self.model.get_spin_terms()
# otherwise assume P0, PDOT
else:
F0 = 1.0 / self.model.P0.quantity
if "PDOT" in self.model.params:
F1 = -self.model.PDOT.quantity / self.model.P0.quantity**2
else:
F1 = 0 * u.Hz / u.s
fterms = [0.0 * u.dimensionless_unscaled, F0, F1]
return taylor_horner_deriv(dt, fterms, deriv_order=1).to(u.Hz)
elif calctype.lower() == "numerical":
return self.model.d_phase_d_toa(self.toas)
[docs] def calc_phase_resids(
self, subtract_mean=None, use_weighted_mean=None, use_abs_phase=None
):
"""Compute timing model residuals in pulse phase.
if ``subtract_mean`` or ``use_weighted_mean`` is None, will use the values set for the object itself
Parameters
----------
subtract_mean : bool or None, optional
Subtract the mean of the residuals. This is ignored if the `PhaseOffset` component
is present in the model. Default is to use the class attribute.
use_weighted_mean : bool or None, optional
Whether to use weighted mean for mean subtraction. Default is to use the class attribute.
use_abs_phase : bool or None, optional
Whether to use absolute phase (w.r.t. the TZR TOA). Default is to use the class attribute.
Returns
-------
Phase
See Also
--------
:meth:`pint.residuals.Residuals.get_PSR_freq`
:meth:`pint.residuals.Residuals.calc_time_resids`
:meth:`pint.residuals.Residuals.calc_whitened_resids`
"""
if subtract_mean is None:
subtract_mean = self.subtract_mean
if "PhaseOffset" in self.model.components and subtract_mean:
log.debug(
"Ignoring `subtract_mean` because `PhaseOffset` is present in the timing model."
)
subtract_mean = subtract_mean and "PhaseOffset" not in self.model.components
if use_weighted_mean is None:
use_weighted_mean = self.use_weighted_mean
if use_abs_phase is None:
use_abs_phase = self.use_abs_phase
# Read any delta_pulse_numbers that are in the TOAs table.
# These are for PHASE statements, -padd flags, as well as user-inserted phase jumps
# Check for the column, and if not there then create it as zeros
if "delta_pulse_number" not in self.toas.table.colnames:
self.toas.table["delta_pulse_number"] = np.zeros(len(self.toas.get_mjds()))
delta_pulse_numbers = Phase(self.toas.table["delta_pulse_number"])
# Track on pulse numbers, if requested
if self.track_mode == "use_pulse_numbers":
pulse_num = self.toas.get_pulse_numbers()
if pulse_num is None:
raise ValueError(
"Pulse numbers missing from TOAs but track_mode requires them"
)
# Compute model phase. For pulse numbers tracking
# we need absolute phases, since TZRMJD serves as the pulse
# number reference.
modelphase = (
self.model.phase(self.toas, abs_phase=use_abs_phase)
+ delta_pulse_numbers
)
# First assign each TOA to the correct relative pulse number, including
# and delta_pulse_numbers (from PHASE lines or adding phase jumps in GUI)
i = pulse_num.copy()
f = np.zeros_like(pulse_num)
c = np.isnan(pulse_num)
if np.any(c):
# i[c] = 0
raise ValueError("Pulse numbers are missing on some TOAs")
residualphase = modelphase - Phase(i, f)
# This converts from a Phase object to a np.float128
full = residualphase.int + residualphase.frac
elif self.track_mode == "nearest":
# Compute model phase
modelphase = self.model.phase(self.toas) + delta_pulse_numbers
# Here it subtracts the first phase, so making the first TOA be the
# reference. Not sure this is a good idea.
if subtract_mean:
modelphase -= Phase(modelphase.int[0], modelphase.frac[0])
# Here we discard the integer portion of the residual and replace it with 0
# This is effectively selecting the nearest pulse to compute the residual to.
residualphase = Phase(np.zeros_like(modelphase.frac), modelphase.frac)
# This converts from a Phase object to a np.float128
full = residualphase.int + residualphase.frac
else:
raise ValueError(f"Invalid track_mode '{self.track_mode}'")
# If we are using pulse numbers, do we really want to subtract any kind of mean?
if not subtract_mean:
return full
if not use_weighted_mean:
mean = full.mean()
else:
# Errs for weighted sum. Units don't matter since they will
# cancel out in the weighted sum.
if np.any(self.get_data_error() == 0):
raise ValueError(
"Some TOA errors are zero - cannot calculate residuals"
)
w = 1.0 / (self.get_data_error().value ** 2)
mean, err = weighted_mean(full, w)
return full - mean
def _calc_mean(self, weighted, type, calctype=None):
assert type in ["time", "phase"]
r = (
self.calc_phase_resids(subtract_mean=False)
if type == "phase"
else self.calc_time_resids(subtract_mean=False, calctype=calctype)
)
if not weighted:
return r.mean()
if np.any(self.get_data_error() == 0):
raise ValueError("Some TOA errors are zero - cannot calculate residuals")
w = 1.0 / (self.get_data_error().value ** 2)
mean, _ = weighted_mean(r, w)
return mean
[docs] def calc_phase_mean(self, weighted=True):
"""Calculate mean phase of residuals, optionally weighted
Parameters
----------
weighted : bool, optional
Returns
-------
astropy.units.Quantity
"""
return self._calc_mean(weighted, "phase")
[docs] def calc_time_mean(self, calctype="taylor", weighted=True):
"""Calculate mean time of residuals, optionally weighted
Parameters
----------
calctype : str, optional
Calculation time for phase to time conversion. See :meth:`pint.residuals.Residuals.calc_time_resids` for details.
weighted : bool, optional
Returns
-------
astropy.units.Quantity
"""
return self._calc_mean(weighted, "time", calctype=calctype)
[docs] def calc_time_resids(
self,
calctype="taylor",
subtract_mean=None,
use_weighted_mean=None,
use_abs_phase=None,
):
"""Compute timing model residuals in time (seconds).
Converts from phase residuals to time residuals using several possible ways
to calculate the frequency.
If ``subtract_mean`` or ``use_weighted_mean`` is None, will use the values set for the object itself
Parameters
----------
calctype : {'taylor', 'modelF0', 'numerical'}
Type of calculation. If `calctype` == "modelF0", then simply the ``F0``
parameter from the model.
If `calctype` == "numerical", then try a numerical derivative
If `calctype` == "taylor", evaluate the frequency with a Taylor series
subtract_mean : bool or None, optional
Subtract the mean of the residuals. This is ignored if the `PhaseOffset` component
is present in the model. Default is to use the class attribute.
use_weighted_mean : bool or None, optional
Whether to use weighted mean for mean subtraction. Default is to use the class attribute.
use_abs_phase : bool or None, optional
Whether to use absolute phase (w.r.t. the TZR TOA). Default is to use the class attribute.
Returns
-------
residuals : astropy.units.Quantity
See Also
--------
:meth:`pint.residuals.Residuals.get_PSR_freq`
:meth:`pint.residuals.Residuals.calc_phase_resids`
:meth:`pint.residuals.Residuals.calc_whitened_resids`
"""
assert calctype.lower() in ["modelf0", "taylor", "numerical"]
if subtract_mean is None and use_weighted_mean is None:
# if we are using the defaults, save the calculation
if self.phase_resids is None:
self.phase_resids = self.calc_phase_resids(
subtract_mean=subtract_mean,
use_weighted_mean=use_weighted_mean,
use_abs_phase=use_abs_phase,
)
phase_resids = self.phase_resids
else:
phase_resids = self.calc_phase_resids(
subtract_mean=subtract_mean,
use_weighted_mean=use_weighted_mean,
use_abs_phase=use_abs_phase,
)
return (phase_resids / self.get_PSR_freq(calctype=calctype)).to(u.s)
[docs] def calc_whitened_resids(self):
"""Compute whitened timing residuals (dimensionless).
Whitened residuals are computed by subtracting the correlated
noise realization from the time residuals and normalizes the
result using scaled TOA uncertainties.
This requires the `noise_resids` attribute to be set. This is
usually available in a post-fit residuals object.
Example usage::
>>> ftr = Fitter.auto(toas, model)
>>> ftr.fit_toas()
>>> res = ftr.resids
>>> white_res = res.calc_whitened_resids()
See Also
--------
:meth:`pint.residuals.Residuals.calc_time_resids`
:meth:`pint.residuals.Residuals.calc_phase_resids`
"""
r = self.calc_time_resids()
nr = sum(self.noise_resids.values())
sigma = self.get_data_error()
return ((r - nr) / sigma).to(u.dimensionless_unscaled)
def _calc_gls_chi2(self, lognorm=False):
"""Compute the chi2 when correlated noise is present in the timing model.
If the system is not singular, it uses Cholesky factorization to evaluate this.
If the system is singular, it uses singular value decomposition instead.
If `lognorm=True` is given, the log-normalization-factor of the likelihood
function will also be returned.
"""
assert self.model.has_correlated_errors
s = self.time_resids.to_value(u.s)
Ndiag = self.get_data_error().to_value(u.s) ** 2
U = self.model.noise_model_designmatrix(self.toas)
Phidiag = self.model.noise_model_basis_weight(self.toas)
if "PHOFF" not in self.model.free_params:
U = np.append(U, np.ones((len(self.toas), 1)), axis=1)
Phidiag = np.append(Phidiag, [1e40])
chi2, logdet_C = woodbury_dot(Ndiag, U, Phidiag, s, s)
return (chi2, logdet_C / 2) if lognorm else chi2
def _calc_ecorr_chi2(self, lognorm=False):
"""Compute the chi2 when ECORR is present in the timing
model without any other correlated noise components."""
assert (
self.model.has_correlated_errors
and not self.model.has_time_correlated_errors
and "PHOFF" in self.model.free_params
)
m, t = self.model, self.toas
ecorr_masks = m.components["EcorrNoise"].get_noise_basis(t).T.astype(bool)
ecorr_weights = m.components["EcorrNoise"].get_noise_weights(t)
r = self.time_resids
sigma = self.get_data_error()
# For TOAs which don't belong to any ECORR group.
noecmask = np.logical_not(np.any(ecorr_masks, axis=0))
Ndiag = sigma[noecmask] ** 2
s = r[noecmask]
chisq = (
np.dot(s, s / Ndiag)
if len(s) > 0
else u.Quantity(0, u.dimensionless_unscaled)
)
logdet_C = np.sum(np.log(Ndiag.to_value(u.s**2)))
# For TOAs which belong to an ECORR group.
for ecmask, ecw in zip(ecorr_masks, ecorr_weights):
Ndiag = sigma[ecmask] ** 2
s = r[ecmask]
v = np.ones(len(s)) << u.s
chisq_sm, logdet_sm = sherman_morrison_dot(Ndiag, v, ecw, s, s)
chisq += chisq_sm
logdet_C += logdet_sm
return (
(chisq.to_value(u.dimensionless_unscaled), 0.5 * logdet_C)
if lognorm
else chisq.to_value(u.dimensionless_unscaled)
)
def _calc_wls_chi2(self, lognorm=False):
"""Compute the chi2 when no correlated noise components are present."""
assert not self.model.has_correlated_errors
# Residual units are in seconds. Error units are in microseconds.
toa_errors = self.get_data_error()
if (toa_errors == 0.0).any():
return np.inf
# The self.time_resids is in the unit of "s", the error "us".
# This is more correct way, but it is the slowest.
# return (((self.time_resids / self.toas.get_errors()).decompose()**2.0).sum()).value
# This method is faster then the method above but not the most correct way
# return ((self.time_resids.to(u.s) / self.toas.get_errors().to(u.s)).value**2.0).sum()
# This the fastest way, but highly depend on the assumption of time_resids and
# error units. Ensure only a pure number is returned.
r = self.time_resids
err = toa_errors.to(u.s)
chi2 = ((r / err) ** 2.0).sum().value
if not lognorm:
return chi2
log_norm = np.sum(np.log(err.value))
return chi2, log_norm
[docs] def calc_chi2(self, lognorm=False):
"""Return the weighted chi-squared for the model and toas.
If the errors on the TOAs are independent this is a straightforward
calculation, but if the noise model introduces correlated errors then
obtaining a meaningful chi-squared value requires a Cholesky
decomposition. This is carried out using the :method:`~pint.residuals.Residuals._calc_gls_chi2`
helper function.
The return value here is available as self.chi2, which will not
redo the computation unless necessary.
The chi-squared value calculated here is suitable for use in downhill
minimization algorithms and Bayesian approaches.
Handling of problematic results - degenerate conditions explored by
a minimizer for example - may need to be checked to confirm that they
correctly return infinity.
If `lognorm=True` is given, the log-normalization-factor of the likelihood
function will also be returned.
Parameters
----------
lognorm: bool
If True, return the the log-normalization-factor of the likelihood
function along with the chi2 value.
Returns
-------
chi2 if lognorm is False
(chi2, log_norm) if lognorm is True
"""
if not self.model.has_correlated_errors:
return self._calc_wls_chi2(lognorm=lognorm)
elif (
not self.model.has_time_correlated_errors
and "PhaseOffset" in self.model.components
):
return self._calc_ecorr_chi2(lognorm=lognorm)
else:
return self._calc_gls_chi2(lognorm=lognorm)
[docs] def lnlikelihood(self):
"""Compute the log-likelihood for the model and TOAs."""
chi2, log_norm = self.calc_chi2(lognorm=True)
return -(chi2 / 2 + log_norm)
def d_lnlikelihood_d_Ndiag(self):
r = self.time_resids
sigma = self.get_data_error()
if not self.model.has_correlated_errors:
Ndiag = sigma**2
term1 = -(r**2) / Ndiag**2
term2 = 1 / Ndiag
return -0.5 * (term1 + term2)
else:
if (
self.model.has_time_correlated_errors
or "PHOFF" not in self.model.free_params
):
raise NotImplementedError
ecorr_masks = (
self.model.components["EcorrNoise"]
.get_noise_basis(self.toas)
.T.astype(bool)
)
ecorr_weights = self.model.components["EcorrNoise"].get_noise_weights(
self.toas
)
# For TOAs which don't belong to any ECORR group.
noecmask = np.logical_not(np.any(ecorr_masks, axis=0))
Ndiag = sigma[noecmask] ** 2
s = r[noecmask]
term1 = -(s**2) / Ndiag**2
term2 = 1 / Ndiag
term3 = term4 = 0
# For TOAs which belong to an ECORR group.
for ecmask, c in zip(ecorr_masks, ecorr_weights):
Ndiag = sigma[ecmask] ** 2
s = r[ecmask]
v = np.ones(len(s)) << u.s
s_Ninv_v = np.sum(s * v / Ndiag)
v_Ninv_v = np.sum(v**2 / Ndiag)
denom = 1 + c**2 * v_Ninv_v
term1 += -(s**2) / Ndiag**2
term2 += 1 / Ndiag
term3 += 2 * c**2 * (
s_Ninv_v / denom * (s * v / Ndiag**2)
) - c**4 * (s_Ninv_v**2 / denom**2 * (v**2 / Ndiag**2))
term4 += -(c**2) * (v**2 / Ndiag**2) / denom
return -0.5 * (term1 + term2 + term3 + term4)
[docs] def d_Ndiag_d_param(self, param):
"""Derivative of the white noise covariance matrix diagonal elements
w.r.t. a white noise parameter (EFAC or EQUAD)."""
sigma = self.get_data_error()
d_sigma_d_param = self.model.d_toasigma_d_param(self.toas, param)
return 2 * sigma * d_sigma_d_param
def d_lnlikelihood_d_ECORR(self, param):
par = self.model[param]
fullmask = par.select_toa_mask(self.toas)
t = self.toas[fullmask]
sigma = self.get_data_error()
r = self.time_resids
c = par.quantity
ecorr_masks = (
self.model.components["EcorrNoise"].get_noise_basis(t).T.astype(bool)
)
result = 0
# For TOAs which belong to an ECORR group.
for ecmask in ecorr_masks:
Ndiag = sigma[ecmask] ** 2
s = r[ecmask]
v = np.ones_like(s)
s_Ninv_v = np.sum(s * v / Ndiag)
v_Ninv_v = np.sum(v**2 / Ndiag)
denom = 1 + c**2 * v_Ninv_v
result += (
c * (s_Ninv_v**2 - v_Ninv_v - c**2 * v_Ninv_v**2) / denom**2
)
return result
def d_lnlikelihood_d_param(self, param):
par = self.model[param]
if self.model.has_correlated_errors and (
self.model.has_time_correlated_errors
or "PHOFF" not in self.model.free_params
):
raise NotImplementedError
if isinstance(par, maskParameter):
if par.prefix in ["EFAC", "EQUAD"]:
return np.sum(
self.d_lnlikelihood_d_Ndiag() * self.d_Ndiag_d_param(param)
).to(1 / par.units)
elif par.prefix == "ECORR":
return self.d_lnlikelihood_d_ECORR(param).to(1 / par.units)
raise NotImplementedError(
f"d_lnlikelihood_d_param is not defined for parameter {param}."
)
# def d_lnlikelihood_d_whitenoise_param(self, param):
# if self.model.has_correlated_errors:
# raise NotImplementedError(
# "d_lnlikelihood_d_whitenoise_param is not implemented "
# "for the case when correlated noise is present."
# )
# r = self.time_resids
# sigma = self.get_data_error()
# d_sigma_d_param = self.model.d_toasigma_d_param(self.toas, param)
# return np.sum(((r / sigma) ** 2 - 1) / sigma * d_sigma_d_param).to(1/self.model[param].units)
[docs] def ecorr_average(self, use_noise_model=True):
"""Uses the ECORR noise model time-binning to compute "epoch-averaged" residuals.
Requires ECORR be used in the timing model. If
``use_noise_model`` is true, the noise model terms (EFAC, EQUAD, ECORR) will
be applied to the TOA uncertainties, otherwise only the raw
uncertainties will be used.
Returns a dictionary with the following entries:
mjds Average MJD for each segment
freqs Average topocentric frequency for each segment
time_resids Average residual for each segment, time units
noise_resids Dictionary of per-noise-component average residual
errors Uncertainty on averaged residuals
indices List of lists giving the indices of TOAs in the original TOA table for each segment
"""
# ECORR is required
try:
ecorr = self.model.get_components_by_category()["ecorr_noise"][0]
except KeyError:
raise ValueError("ECORR not present in noise model")
# "U" matrix gives the TOA binning, "weight" is ECORR
# uncertainty in seconds, squared.
U, ecorr_err2 = ecorr.ecorr_basis_weight_pair(self.toas)
ecorr_err2 *= u.s * u.s
if use_noise_model:
err = self.model.scaled_toa_uncertainty(self.toas)
else:
err = self.toas.get_errors()
ecorr_err2 *= 0.0
# Weight for sums, and normalization
wt = 1.0 / (err * err)
a_norm = np.dot(U.T, wt)
def wtsum(x):
return np.dot(U.T, wt * x) / a_norm
# Weighted average of various quantities
avg = {}
avg["mjds"] = wtsum(self.toas.get_mjds())
avg["freqs"] = wtsum(self.toas.get_freqs())
avg["time_resids"] = wtsum(self.time_resids)
avg["noise_resids"] = {}
for k in self.noise_resids.keys():
avg["noise_resids"][k] = wtsum(self.noise_resids[k])
# Uncertainties
# TODO could add an option to incorporate residual scatter
avg["errors"] = np.sqrt(1.0 / a_norm + ecorr_err2)
# Indices back into original TOA list
avg["indices"] = [list(np.where(U[:, i])[0]) for i in range(U.shape[1])]
return avg
[docs]class WidebandDMResiduals(Residuals):
"""Residuals for independent DM measurement (i.e. Wideband TOAs).
This class manages the DM residuals from data that includes direct DM
measurements associated with the TOAs.
:class:`pint.residuals.WidebandTOAResiduals` combines one of these objects
with a :class:`pint.residuals.Residuals` object.
The values of interest are probably best accessed using the ``.resids``
property, and the uncertainty using the ``.get_data_error()``.
Attributes
----------
dm_data : :class:`astropy.units.Quantity`
The DM data extracted from the TOAs.
dm_error : :class:`astropy.units.Quantity`
The DM uncertainties extracted from the TOAs.
Parameters
----------
toas : :class:`pint.toa.TOAs`
TOAs. They should include DM measurement data.
model : :class:`pint.models.timing_model.TimingModel`
The timing model.
"""
def __init__(
self,
toas=None,
model=None,
residual_type="dm",
unit=u.pc / u.cm**3,
subtract_mean=False,
use_weighted_mean=True,
):
self.toas = toas
self.model = model
self.residual_type = residual_type
self.unit = unit
self.subtract_mean = subtract_mean
self.use_weighted_mean = use_weighted_mean
self.base_unit = u.pc / u.cm**3
self.get_model_value = self.model.total_dm
self.dm_data, self.dm_error, self.relevant_toas = self.get_dm_data()
self._chi2 = None
self._is_combined = False
# For residual debugging
self.debug_info = {}
@property
def resids(self):
return self.calc_resids()
@property
def resids_value(self):
"""Get pure value of the residuals use the given base unit."""
return self.resids.to_value(self.unit)
@property
def dof(self):
"""Return number of degrees of freedom for the DM model."""
if self._is_combined:
raise AttributeError(
"Please use the `.dof` in the CombinedResidual"
" class. The individual residual's dof is not "
"calculated correctly in the combined residuals."
)
# only get dm type of model component
# TODO provide a function in the timing model to get one type of component
dof = len(self.dm_data) - sum(
len(cp.free_params_component)
for cp in self.model.components.values()
if Dispersion in cp.__class__.__bases__
)
dof -= 1
return dof
[docs] def get_data_error(self, scaled=True):
"""Get data errors.
Parameters
----------
scaled: bool, optional
If errors get scaled by the noise model.
"""
return self.model.scaled_dm_uncertainty(self.toas) if scaled else self.dm_error
def calc_resids(self):
model_value = self.get_model_value(self.toas)[self.relevant_toas]
resids = self.dm_data - model_value
if self.subtract_mean:
if self.use_weighted_mean:
# Errs for weighted sum. Units don't matter since they will
# cancel out in the weighted sum.
if self.dm_error is None or np.any(self.dm_error == 0):
raise ValueError(
"Some DM errors are zero - cannot calculate the "
"weighted residuals."
)
wm = np.average(resids, weights=1.0 / (self.dm_error**2))
resids -= wm
else:
resids -= resids.mean()
return resids
[docs] def calc_chi2(self):
data_errors = self.get_data_error()
if (data_errors == 0.0).any():
return np.inf
try:
return ((self.resids / data_errors) ** 2.0).sum().decompose().value
except ValueError:
return ((self.resids / data_errors) ** 2.0).sum().decompose()
[docs] def rms_weighted(self):
"""Compute weighted RMS of the residuals in time."""
scaled_errors = self.get_data_error()
if np.any(scaled_errors.value == 0):
raise ValueError(
"Some DM errors are zero - cannot calculate weighted RMS of residuals"
)
w = 1.0 / (scaled_errors**2)
wmean, werr, wsdev = weighted_mean(self.resids, w, sdev=True)
return wsdev
[docs] def get_dm_data(self):
"""Get the independent measured DM data from TOA flags.
This is to extract DM and uncertainty data from its representation in
the flags on TOAs.
FIXME: there should be a ``set_dm_data`` function.
Returns
-------
valid_dm: `numpy.ndarray`
Independent measured DM data from TOA line. It only returns the DM
values that is present in the TOA flags.
valid_error: `numpy.ndarray`
The error associated with DM values in the TOAs.
valid_index: list
The TOA with DM data index.
"""
dm_data, valid_data = self.toas.get_flag_value("pp_dm", as_type=float)
dm_error, valid_error = self.toas.get_flag_value("pp_dme", as_type=float)
if valid_data == []:
raise ValueError("Input TOA object does not have wideband DM values")
# Check valid error, if an error is none, change it to zero
if valid_data != valid_error:
raise ValueError("Input TOA object' DM data and DM errors do not match.")
valid_dm = np.array(dm_data)[valid_data]
valid_error = np.array(dm_error)[valid_error]
return valid_dm * self.unit, valid_error * self.unit, valid_data
[docs] def update_model(self, new_model, **kwargs):
"""Up date DM models from a new PINT timing model
Parameters
----------
new_model : `pint.timing_model.TimingModel`
"""
self.model = new_model
self.model_func = self.model.dm_value
residual_map = {"toa": Residuals, "dm": WidebandDMResiduals}
[docs]class CombinedResiduals:
"""Collect results from different type of residuals.
Parameters
----------
residuals: List of residual objects
A list of different types of residual objects
Note
----
Since different type of residuals has different units, the overall
residuals will have no units.
"""
def __init__(self, residuals):
self.residual_objs = collections.OrderedDict()
for res in residuals:
res._is_combined = True
self.residual_objs[res.residual_type] = res
# For residual debugging
self.debug_info = {}
@property
def model(self):
"""Return the single timing model object."""
raise AttributeError(
"Combined residuals object does not provide a "
"single timing model object. Please use the "
"dedicated subclass."
)
@property
def _combined_resids(self):
"""Residuals from all of the residual types."""
all_resids = [res.resids_value for res in self.residual_objs.values()]
return np.hstack(all_resids)
@property
def _combined_data_error(self):
# Since it is the combined residual, the units are removed.
dr = self.data_error
return np.hstack([rv.value for rv in dr.values()])
@property
def unit(self):
return {k: v.unit for k, v in self.residual_objs.items()}
@property
def chi2(self):
return sum(res.chi2 for res in self.residual_objs.values())
@property
def data_error(self):
errors = [
(rs.residual_type, rs.get_data_error())
for rs in self.residual_objs.values()
]
return collections.OrderedDict(errors)
[docs] def rms_weighted(self):
"""Compute weighted RMS of the residuals in time."""
if np.any(self._combined_data_error == 0):
raise ValueError(
"Some data errors are zero - cannot calculate weighted RMS of residuals"
)
wrms = {}
for rs in self.residual_objs.values():
w = 1.0 / (rs.get_data_error() ** 2)
wmean, werr, wsdev = weighted_mean(rs.resids, w, sdev=True)
wrms[rs.residual_type] = wsdev
return wrms
[docs]class WidebandTOAResiduals(CombinedResiduals):
"""A class for handling the wideband toa residuals.
Wideband TOAs have independent measurement of DM values. The residuals for
wideband TOAs have two parts, the TOA residuals and DM residuals. Both
residuals will be used for fitting one timing model. Currently, the DM
values are stored at the TOA object.
The TOA and DM residuals are probably best accessed using the ``.toa`` and
``.dm`` properties.
This class inherits the ``.chi2`` property from :class:`pint.residuals.CombinedResiduals`.
Parameters
----------
toas: :class:`pint.toa.TOAs`, optional
The input TOAs object. Default: None
model: :class:`pint.models.timing_model.TimingModel`, optional
The input timing model. Default: None
toa_resid_args: dict, optional
The additional arguments(not including toas and model) for TOA residuals.
Default: {}
dm_resid_args: dict, optional
The additional arguments(not including toas and model) for DM residuals.
Default: {}
"""
def __init__(self, toas, model, toa_resid_args={}, dm_resid_args={}):
self.toas = toas
self._model = model
toa_resid = Residuals(
self.toas, self.model, residual_type="toa", **toa_resid_args
)
dm_resid = Residuals(self.toas, self.model, residual_type="dm", **dm_resid_args)
self._chi2 = None
super().__init__([toa_resid, dm_resid])
@property
def toa(self):
"""Residuals object containing the TOA residuals."""
return self.residual_objs["toa"]
@property
def dm(self):
"""WidebandDMResiduals object containing the DM residuals."""
return self.residual_objs["dm"]
@property
def chi2(self):
"""Compute chi-squared as needed and cache the result."""
if self._chi2 is None:
self._chi2 = self.calc_chi2()
assert self._chi2 is not None
return self._chi2
[docs] def calc_chi2(self, full_cov=False):
"""Return the weighted chi-squared for the model and toas.
If the errors on the TOAs are independent this is a straightforward
calculation, but if the noise model introduces correlated errors then
obtaining a meaningful chi-squared value requires a Cholesky
decomposition. This is carried out, here, by constructing a GLSFitter
and asking it to do the chi-squared computation but not a fit.
The return value here is available as self.chi2, which will not
redo the computation unless necessary.
The chi-squared value calculated here is suitable for use in downhill
minimization algorithms and Bayesian approaches.
Handling of problematic results - degenerate conditions explored by
a minimizer for example - may need to be checked to confirm that they
correctly return infinity.
"""
log.debug("Using wideband GLS fitter to compute residual chi2")
# Use GLS but don't actually fit
from pint.fitter import WidebandTOAFitter
m = copy.deepcopy(self.model)
m.free_params = []
f = WidebandTOAFitter(
self.toas, m, additional_args=dict(toa=dict(track_mode=self.toa.track_mode))
)
try:
return f.fit_toas(maxiter=1, full_cov=full_cov)
except LinAlgError as e:
log.warning(
"Degenerate conditions encountered when "
"computing chi-squared: %s" % (e,)
)
return np.inf
@property
def model(self):
"""The model used to construct the residuals.
Modifying this model, even changing its parameters, may have confusing
effects. It is probably best to use :func:`copy.deepcopy` to duplicate
it before making any changes.
"""
return self._model
@property
def dof(self):
"""The number of degrees of freedom for the wideband residuals."""
dof = len(self._combined_resids)
dof -= len(self.model.free_params) + 1
return dof
@property
def reduced_chi2(self):
"""Return the weighted reduced chi-squared."""
return self.chi2 / self.dof