"""Objects for managing the procedure of fitting models to TOAs.
The objects defined here can be used to set up a fitting problem and carry out
the fit, adjust the model, and repeat the fit as necessary.
The primary objects of interest will be :class:`pint.fitter.WLSFitter` for
basic fitting, :class:`pint.fitter.GLSFitter` for fitting with noise models
that imply correlated errors, and :class:`pint.fitter.WidebandTOAFitter` for
TOAs that contain DM information. However, the Downhill fitter variants may offer better convergence.
Fitters in use::
>>> fitter = WLSFitter(toas, model)
>>> fitter.fit_toas()
59.57431562376629118
>>> fitter.print_summary()
Fitted model using weighted_least_square method with 5 free parameters to 62 TOAs
Prefit residuals Wrms = 1090.5802622239905 us, Postfit residuals Wrms = 21.182038012901092 us
Chisq = 59.574 for 56 d.o.f. for reduced Chisq of 1.064
PAR Prefit Postfit Units
=================== ==================== ============================ =====
PSR 1748-2021E None
EPHEM DE421 None
UNITS TDB None
START 53478.3 d
FINISH 54187.6 d
POSEPOCH 53750 d
PX 0 mas
RAJ 17h48m52.75s 17h48m52.8003s +/- 0.00014 hourangle_second
DECJ -20d21m29s -20d21m29.3833s +/- 0.033 arcsec
PMRA 0 mas / yr
PMDEC 0 mas / yr
F0 61.4855 61.485476554373(18) Hz
F1 -1.181e-15 -1.1813(14)x10-15 Hz / s
PEPOCH 53750 d
CORRECT_TROPOSPHERE N None
PLANET_SHAPIRO N None
NE_SW 0 1 / cm3
SWM 0
DM 223.9 224.114(35) pc / cm3
DM1 0 pc / (cm3 yr)
TZRMJD 53801.4 d
TZRSITE 1 None
TZRFRQ 1949.61 MHz
Derived Parameters:
Period = 0.01626400340437608 s +/- 4.784091376048965e-15 s
Pdot = 3.1248325489308735e-19 +/- 3.8139606793005067e-22
Characteristic age = 8.246e+08 yr (braking index = 3)
Surface magnetic field = 2.28e+09 G
Magnetic field at light cylinder = 4806 G
Spindown Edot = 2.868e+33 erg / s (I=1e+45 cm2 g)
To automatically select a fitter based on the properties of the data and model::
>>> fitter = Fitter.auto(toas, model)
"""
import contextlib
import copy
from warnings import warn
import astropy.units as u
import numpy as np
import scipy.linalg
import scipy.optimize as opt
from loguru import logger as log
from numdifftools import Hessian
import pint
import pint.utils
import pint.derived_quantities
from pint.models.parameter import (
AngleParameter,
boolParameter,
strParameter,
)
from pint.pint_matrix import (
CorrelationMatrix,
CovarianceMatrix,
CovarianceMatrixMaker,
DesignMatrixMaker,
combine_covariance_matrix,
combine_design_matrices_by_param,
combine_design_matrices_by_quantity,
)
from pint.residuals import Residuals, WidebandTOAResiduals
from pint.toa import TOAs
from pint.utils import FTest, normalize_designmatrix
__all__ = [
"Fitter",
"WLSFitter",
"GLSFitter",
"WidebandTOAFitter",
"PowellFitter",
"DownhillFitter",
"DownhillWLSFitter",
"DownhillGLSFitter",
"WidebandDownhillFitter",
"WidebandLMFitter",
"ConvergenceFailure",
"StepProblem",
"MaxiterReached",
]
try:
from functools import cached_property
except ImportError:
# not supported in python 3.7
# This is just the code from python 3.8
from _thread import RLock
_NOT_FOUND = object()
class cached_property:
def __init__(self, func):
self.func = func
self.attrname = None
self.__doc__ = func.__doc__
self.lock = RLock()
def __set_name__(self, owner, name):
if self.attrname is None:
self.attrname = name
elif name != self.attrname:
raise TypeError(
"Cannot assign the same cached_property to two different names "
f"({self.attrname!r} and {name!r})."
)
def __get__(self, instance, owner=None):
if instance is None:
return self
if self.attrname is None:
raise TypeError(
"Cannot use cached_property instance without calling __set_name__ on it."
)
try:
cache = instance.__dict__
except AttributeError:
# not all objects have __dict__ (e.g. class defines slots)
msg = (
f"No '__dict__' attribute on {type(instance).__name__!r} "
f"instance to cache {self.attrname!r} property."
)
raise TypeError(msg) from None
val = cache.get(self.attrname, _NOT_FOUND)
if val is _NOT_FOUND:
with self.lock:
# check if another thread filled cache while we awaited lock
val = cache.get(self.attrname, _NOT_FOUND)
if val is _NOT_FOUND:
val = self.func(instance)
try:
cache[self.attrname] = val
except TypeError:
msg = (
f"The '__dict__' attribute on {type(instance).__name__!r} instance "
f"does not support item assignment for caching {self.attrname!r} property."
)
raise TypeError(msg) from None
return val
[docs]class DegeneracyWarning(UserWarning):
pass
[docs]class ConvergenceFailure(ValueError):
pass
[docs]class MaxiterReached(ConvergenceFailure):
pass
[docs]class StepProblem(ConvergenceFailure):
pass
[docs]class Fitter:
"""Base class for objects encapsulating fitting problems.
The fitting function should be defined as the fit_toas() method.
The Fitter object makes a :func:`copy.deepcopy` of the model and stores it
in the `.model` attribute. This is the model used for fitting, and it can
be modified, for example by freezing or thawing parameters
(``fitter.model.F0.frozen = False``). When
``.fit_toas()`` is executed this model will be updated to reflect the
results of the fitting process.
The Fitter also caches a copy of the original model so it can be restored
with ``reset_model()``.
Try :func:`pint.fitter.Fitter.auto` to automatically get the appropriate fitter type
Attributes
----------
model : :class:`pint.models.timing_model.TimingModel`
The model the fitter is working on. Once ``fit_toas()`` has been run,
this model will be modified to reflect the results.
model_init : :class:`pint.models.timing_model.TimingModel`
The initial, prefit model.
Parameters
----------
toas : a pint TOAs instance
The input toas.
model : a pint timing model instance
The initial timing model for fitting.
track_mode : str, optional
How to handle phase wrapping. This is used when creating
:class:`pint.residuals.Residuals` objects, and its meaning is defined there.
residuals : :class:`pint.residuals.Residuals`
Initial residuals. This argument exists to support an optimization, where
``GLSFitter`` is used to compute ``chi2`` for appropriate Residuals objects.
"""
def __init__(self, toas, model, track_mode=None, residuals=None):
if not set(model.free_params).issubset(model.fittable_params):
free_unfittable_params = set(model.free_params).difference(
model.fittable_params
)
raise ValueError(
f"Cannot create fitter because the following unfittable parameters "
f"were found unfrozen in the model: {free_unfittable_params}. "
f"Freeze these parameters before creating the fitter."
)
self.toas = toas
self.model_init = model
self.track_mode = track_mode
if residuals is None:
self.resids_init = self.make_resids(self.model_init)
else:
# residuals were provided, we're just going to use them
self.resids_init = residuals
# probably using GLSFitter to compute a chi-squared
self.model = copy.deepcopy(self.model_init)
self.resids = copy.deepcopy(self.resids_init)
self.fitresult = []
self.method = None
self.is_wideband = False
self.converged = False
[docs] @classmethod
def auto(
cls, toas, model, downhill=True, track_mode=None, residuals=None, **kwargs
):
"""Automatically return the proper :class:`pint.fitter.Fitter` object depending on the TOAs and model.
In general the `downhill` fitters are to be preferred.
See https://github.com/nanograv/PINT/wiki/How-To#choose-a-fitter for the logic used.
Parameters
----------
toas : a pint TOAs instance
The input toas.
model : a pint timing model instance
The initial timing model for fitting.
downhill : bool, optional
Whether or not to use the downhill fitter variant
track_mode : str, optional
How to handle phase wrapping. This is used when creating
:class:`pint.residuals.Residuals` objects, and its meaning is defined there.
residuals : :class:`pint.residuals.Residuals`
Initial residuals. This argument exists to support an optimization, where
``GLSFitter`` is used to compute ``chi2`` for appropriate Residuals objects.
Returns
-------
:class:`pint.fitter.Fitter`
Returns appropriate subclass
"""
if toas.wideband:
if downhill:
log.info(
"For wideband TOAs and downhill fitter, returning 'WidebandDownhillFitter'"
)
return WidebandDownhillFitter(
toas, model, track_mode=track_mode, residuals=residuals, **kwargs
)
else:
log.info(
"For wideband TOAs and non-downhill fitter, returning 'WidebandTOAFitter'"
)
return WidebandTOAFitter(toas, model, track_mode=track_mode, **kwargs)
elif model.has_correlated_errors:
if downhill:
log.info(
"For narrowband TOAs with correlated errors and downhill fitter, returning 'DownhillGLSFitter'"
)
return DownhillGLSFitter(
toas,
model,
track_mode=track_mode,
residuals=residuals,
**kwargs,
)
else:
log.info(
"For narrowband TOAs with correlated errors and non-downhill fitter, returning 'GLSFitter'"
)
return GLSFitter(
toas,
model,
track_mode=track_mode,
residuals=residuals,
**kwargs,
)
elif downhill:
log.info(
"For narrowband TOAs without correlated errors and downhill fitter, returning 'DownhillWLSFitter'"
)
return DownhillWLSFitter(
toas,
model,
track_mode=track_mode,
residuals=residuals,
**kwargs,
)
else:
log.info(
"For narrowband TOAs without correlated errors and non-downhill fitter, returning 'WLSFitter'"
)
return WLSFitter(
toas,
model,
track_mode=track_mode,
residuals=residuals,
**kwargs,
)
[docs] def fit_toas(self, maxiter=None, debug=False):
"""Run fitting operation.
This method needs to be implemented by subclasses. All implementations
should call ``self.model.validate()`` and
``self.model.validate_toas()`` before doing the fitting.
"""
raise NotImplementedError
[docs] def get_summary(self, nodmx=False):
"""Return a human-readable summary of the Fitter results.
Parameters
----------
nodmx : bool
Set to True to suppress printing DMX parameters in summary
"""
# Need to check that fit has been done first!
if not hasattr(self, "parameter_covariance_matrix"):
log.warning(
"fit_toas() has not been run, so pre-fit and post-fit will be the same!"
)
from uncertainties import ufloat
# Check if Wideband or not
is_wideband = self.is_wideband
# First, print fit quality metrics
s = f"Fitted model using {self.method} method with {len(self.model.free_params)} free parameters to {self.toas.ntoas} TOAs\n"
if is_wideband:
s += f"Prefit TOA residuals Wrms = {self.resids_init.toa.rms_weighted()}, Postfit TOA residuals Wrms = {self.resids.toa.rms_weighted()}\n"
s += f"Prefit DM residuals Wrms = {self.resids_init.dm.rms_weighted()}, Postfit DM residuals Wrms = {self.resids.dm.rms_weighted()}\n"
else:
s += f"Prefit residuals Wrms = {self.resids_init.rms_weighted()}, Postfit residuals Wrms = {self.resids.rms_weighted()}\n"
s += f"Chisq = {self.resids.chi2:.3f} for {self.resids.dof} d.o.f. for reduced Chisq of {self.resids.reduced_chi2:.3f}\n"
s += "\n"
# to handle all parameter names, determine the longest length for the first column
longestName = 0 # optionally specify the minimum length here instead of 0
for pn in self.model.params:
if nodmx and pn.startswith("DMX"):
continue
if len(pn) > longestName:
longestName = len(pn)
# convert to a string to insert before the format call
spacingName = str(longestName)
# Next, print the model parameters
s += ("{:<" + spacingName + "s} {:^20s} {:^28s} {}\n").format(
"PAR", "Prefit", "Postfit", "Units"
)
s += ("{:<" + spacingName + "s} {:>20s} {:>28s} {}\n").format(
"=" * longestName, "=" * 20, "=" * 28, "=" * 5
)
for pn in self.model.params:
if nodmx and pn.startswith("DMX"):
continue
prefitpar = getattr(self.model_init, pn)
par = getattr(self.model, pn)
if par.value is not None:
if isinstance(par, strParameter):
s += ("{:" + spacingName + "s} {:>20s} {:28s} {}\n").format(
pn,
prefitpar.value if prefitpar.value is not None else "",
par.value,
par.units,
)
elif isinstance(par, AngleParameter):
# Add special handling here to put uncertainty into arcsec
if par.frozen:
s += ("{:" + spacingName + "s} {:>20s} {:>28s} {} \n").format(
pn, str(prefitpar.quantity), "", par.units
)
else:
uncertainty_unit = (
pint.hourangle_second
if par.units == u.hourangle
else u.arcsec
)
s += (
"{:" + spacingName + "s} {:>20s} {:>16s} +/- {:.2g} \n"
).format(
pn,
str(prefitpar.quantity),
str(par.quantity),
par.uncertainty.to(uncertainty_unit),
)
elif isinstance(par, boolParameter):
s += ("{:" + spacingName + "s} {:>20s} {:28s} {}\n").format(
pn, prefitpar.str_quantity(prefitpar.value), "", par.units
)
elif par.frozen:
if par.name in ["START", "FINISH"] and prefitpar.value is None:
s += ("{:" + spacingName + "s} {:20s} {:28g} {} \n").format(
pn, " ", par.value, par.units
)
elif par.name in ["START", "FINISH"]:
s += ("{:" + spacingName + "s} {:20g} {:28g} {} \n").format(
pn, prefitpar.value, par.value, par.units
)
elif (
par.name in ["CHI2", "CHI2R", "TRES", "DMRES"]
and prefitpar.value is None
):
s += ("{:" + spacingName + "s} {:20s} {:28g} {} \n").format(
pn, " ", par.value, par.units
)
elif par.name in ["CHI2", "CHI2R", "TRES", "DMRES"]:
s += ("{:" + spacingName + "s} {:20g} {:28g} {} \n").format(
pn, prefitpar.value, par.value, par.units
)
else:
s += ("{:" + spacingName + "s} {:20g} {:28s} {} \n").format(
pn, prefitpar.value, "", par.units
)
else:
# s += "{:14s} {:20g} {:20g} {:20.2g} {} \n".format(
# pn,
# prefitpar.value,
# par.value,
# par.uncertainty.value,
# par.units,
# )
s += ("{:" + spacingName + "s} {:20g} {:28SP} {} \n").format(
pn,
prefitpar.value,
ufloat(par.value, par.uncertainty.value),
par.units,
)
s += "\n" + self.model.get_derived_params()
return s
[docs] def get_derived_params(self, returndict=False):
"""Return a string with various derived parameters from the fitted model
Parameters
----------
returndict : bool, optional
Whether to only return the string of results or also a dictionary
Returns
-------
results : str
parameters : dict, optional
See Also
--------
:func:`pint.models.timing_model.TimingModel.get_derived_params`
"""
return self.model.get_derived_params(
rms=self.resids.toa.rms_weighted()
if self.is_wideband
else self.resids.rms_weighted(),
ntoas=self.toas.ntoas,
returndict=returndict,
)
[docs] def print_summary(self):
"""Write a summary of the TOAs to stdout."""
print(self.get_summary())
[docs] def plot(self):
"""Make residuals plot.
This produces a time residual plot.
"""
import matplotlib.pyplot as plt
from astropy.visualization import quantity_support
quantity_support()
fig, ax = plt.subplots(figsize=(16, 9))
mjds = self.toas.get_mjds()
ax.errorbar(mjds, self.resids.time_resids, yerr=self.toas.get_errors(), fmt="+")
ax.set_xlabel("MJD")
ax.set_ylabel("Residuals")
try:
psr = self.model.PSR
except AttributeError:
psr = self.model.PSRJ
else:
psr = "Residuals"
ax.set_title(psr)
ax.grid(True)
plt.show()
[docs] def update_model(self, chi2=None):
"""Update the model to reflect fit results and TOA properties.
This is called by ``fit_toas`` to ensure that parameters like
``START``, ``FINISH``, ``EPHEM``, and ``DMDATA`` are set in the model
to reflect the TOAs in actual use.
"""
self.model.START.value = self.toas.first_MJD
self.model.FINISH.value = self.toas.last_MJD
self.model.NTOA.value = len(self.toas)
self.model.EPHEM.value = self.toas.ephem
self.model.DMDATA.value = hasattr(self.resids, "dm")
self.model.CLOCK.value = (
f"TT({self.toas.clock_corr_info['bipm_version']})"
if self.toas.clock_corr_info["include_bipm"]
else "TT(TAI)"
)
if chi2 is not None:
# assume a fit has been done
self.model.CHI2.value = chi2
self.model.CHI2R.value = chi2 / self.resids.dof
if not self.is_wideband:
self.model.TRES.quantity = self.resids.rms_weighted()
else:
self.model.TRES.quantity = self.resids.rms_weighted()["toa"]
self.model.DMRES.quantity = self.resids.rms_weighted()["dm"]
[docs] def reset_model(self):
"""Reset the current model to the initial model."""
self.model = copy.deepcopy(self.model_init)
self.update_resids()
self.fitresult = []
[docs] def update_resids(self):
"""Update the residuals.
Run after updating a model parameter.
"""
self.resids = self.make_resids(self.model)
def make_resids(self, model):
return Residuals(toas=self.toas, model=model, track_mode=self.track_mode)
[docs] def get_designmatrix(self):
"""Return the model's design matrix for these TOAs."""
return self.model.designmatrix(toas=self.toas, incfrozen=False, incoffset=True)
def _get_corr_cov_matrix(
self, matrix_type, with_phase, pretty_print, prec, usecolor
):
if hasattr(self, f"parameter_{matrix_type}_matrix"):
cm = getattr(self, f"parameter_{matrix_type}_matrix")
if not pretty_print:
return cm.prettyprint(prec=prec, offset=with_phase)
else:
print(cm.prettyprint(prec=prec, offset=with_phase, usecolor=usecolor))
else:
log.error(
f"You must run .fit_toas() before accessing the {matrix_type} matrix"
)
raise AttributeError
[docs] def get_parameter_covariance_matrix(
self, with_phase=False, pretty_print=False, prec=3
):
"""Show the parameter covariance matrix post-fit.
If with_phase, then show and return the phase column as well.
If pretty_print, then also pretty-print on stdout the matrix.
prec is the precision of the floating point results.
"""
return self._get_corr_cov_matrix(
"covariance", with_phase, pretty_print, prec, "False"
)
[docs] def get_parameter_correlation_matrix(
self, with_phase=False, pretty_print=False, prec=3, usecolor=True
):
"""Show the parameter correlation matrix post-fit.
If with_phase, then show and return the phase column as well.
If pretty_print, then also pretty-print on stdout the matrix.
prec is the precision of the floating point results. If
usecolor is True, then pretty printing will have color.
"""
return self._get_corr_cov_matrix(
"correlation", with_phase, pretty_print, prec, usecolor
)
[docs] def ftest(self, parameter, component, remove=False, full_output=False, maxiter=1):
"""Compare the significance of adding/removing parameters to a timing model.
Parameters
-----------
parameter : PINT parameter object
(may be a list of parameter objects)
component : String
Name of component of timing model that the parameter should be added to (may be a list)
The number of components must equal number of parameters.
remove : Bool
If False, will add the listed parameters to the model. If True will remove the input
parameters from the timing model.
full_output : Bool
If False, just returns the result of the F-Test. If True, will also return the new
model's residual RMS (us), chi-squared, and number of degrees of freedom of
new model.
maxiter : int
How many times to run the linear least-squares fit, re-evaluating
the derivatives at each step for the F-tested model. Default is one.
Returns
--------
dictionary
ft : Float
F-test significance value for the model with the larger number of
components over the other. Computed with pint.utils.FTest().
resid_rms_test : Float (Quantity)
If full_output is True, returns the RMS of the residuals of the tested model
fit. Will be in units of microseconds as an astropy quantity. If wideband fitter
this will be the time residuals.
resid_wrms_test : Float (Quantity)
If full_output is True, returns the Weighted RMS of the residuals of the tested model
fit. Will be in units of microseconds as an astropy quantity. If wideband fitter
this will be the time residuals.
chi2_test : Float
If full_output is True, returns the chi-squared of the tested model. If wideband
fitter this will be the total chi-squared of the combined residual.
dof_test : Int
If full_output is True, returns the degrees of freedom of the tested model.
If wideband fitter this will be the total chi-squared of the combined residual.
dm_resid_rms_test : Float (Quantity)
If full_output is True and a wideband timing fitter is used, returns the
RMS of the DM residuals of the tested model fit. Will be in units of
pc/cm^3 as an astropy quantity.
dm_resid_wrms_test : Float (Quantity)
If full_output is True and a wideband timing fitter is used, returns the
Weighted RMS of the DM residuals of the tested model fit. Will be in units of
pc/cm^3 as an astropy quantity.
"""
# Check if Wideband or not
NB = not self.is_wideband
# Copy the fitter that we do not change the initial model and fitter
fitter_copy = copy.deepcopy(self)
# We need the original degrees of freedom and chi-squared value
# Because this applies to nested models, model 1 must always have fewer parameters
if remove:
dof_2 = self.resids.dof
chi2_2 = self.resids.chi2
else:
dof_1 = self.resids.dof
chi2_1 = self.resids.chi2
# Single inputs are converted to lists to handle arb. number of parameters
if type(parameter) is not list:
parameter = [parameter]
# also do the components
if type(component) is not list:
component = [component]
# if not the same length, exit with error
if len(parameter) != len(component):
raise RuntimeError(
"Number of input parameters must match number of input components."
)
# Now check if we want to remove or add components; start with removing
if remove:
# Set values to zero and freeze them
for p in parameter:
getattr(fitter_copy.model, "{:}".format(p.name)).value = 0.0
getattr(fitter_copy.model, "{:}".format(p.name)).uncertainty_value = 0.0
getattr(fitter_copy.model, "{:}".format(p.name)).frozen = True
# validate and setup model
fitter_copy.model.validate()
fitter_copy.model.setup()
# Now refit
fitter_copy.fit_toas(maxiter=maxiter)
# FIXME: check convergence
# Now get the new values
dof_1 = fitter_copy.resids.dof
chi2_1 = fitter_copy.resids.chi2
else:
# Dictionary of parameters to check to makes sure input value isn't zero
check_params = {
"M2": 0.25,
"SINI": 0.8,
"PB": 10.0,
"T0": 54000.0,
"FB0": 1.1574e-6,
}
# Add the parameters
for ii in range(len(parameter)):
# Check if parameter already exists in model
if hasattr(fitter_copy.model, "{:}".format(parameter[ii].name)):
# Set frozen to False
getattr(
fitter_copy.model, "{:}".format(parameter[ii].name)
).frozen = False
# Check if parameter is one that needs to be checked
if (
parameter[ii].name in check_params
and parameter[ii].value == 0.0
):
log.warning(
f"Default value for {parameter[ii].name} cannot be 0, resetting to {check_params[parameter[ii].name]}"
)
parameter[ii].value = check_params[parameter[ii].name]
getattr(
fitter_copy.model, "{:}".format(parameter[ii].name)
).value = parameter[ii].value
else:
fitter_copy.model.components[component[ii]].add_param(
parameter[ii], setup=True
)
# validate and setup model
fitter_copy.model.validate()
fitter_copy.model.setup()
# Now refit
fitter_copy.fit_toas(maxiter=maxiter)
# FIXME: check convergence
# Now get the new values
dof_2 = fitter_copy.resids.dof
chi2_2 = fitter_copy.resids.chi2
# Now run the actual F-test
ft = FTest(chi2_1, dof_1, chi2_2, dof_2)
if not full_output:
return {"ft": ft}
if remove:
dof_test = dof_1
chi2_test = chi2_1
else:
dof_test = dof_2
chi2_test = chi2_2
if NB:
resid_rms_test = fitter_copy.resids.time_resids.std().to(u.us)
resid_wrms_test = fitter_copy.resids.rms_weighted() # units: us
return {
"ft": ft,
"resid_rms_test": resid_rms_test,
"resid_wrms_test": resid_wrms_test,
"chi2_test": chi2_test,
"dof_test": dof_test,
}
else:
# Return the dm and time resid values separately
resid_rms_test = fitter_copy.resids.toa.time_resids.std().to(u.us)
resid_wrms_test = fitter_copy.resids.toa.rms_weighted() # units: us
dm_resid_rms_test = fitter_copy.resids.dm.resids.std()
dm_resid_wrms_test = fitter_copy.resids.dm.rms_weighted()
return {
"ft": ft,
"resid_rms_test": resid_rms_test,
"resid_wrms_test": resid_wrms_test,
"chi2_test": chi2_test,
"dof_test": dof_test,
"dm_resid_rms_test": dm_resid_rms_test,
"dm_resid_wrms_test": dm_resid_wrms_test,
}
[docs] def minimize_func(self, x, *args):
"""Wrapper function for the residual class.
This is meant to be passed to
``scipy.optimize.minimize``. The function must take a single list of input
values, x, and a second optional tuple of input arguments. It returns
a quantity to be minimized (in this case chi^2).
"""
self.set_params(dict(zip(args, x)))
self.update_resids()
# Return chi^2
return self.resids.chi2
[docs] def get_params_dict(self, which="free", kind="quantity"):
"""Return a dict mapping parameter names to values.
See :func:`pint.models.timing_model.TimingModel.get_params_dict`.
"""
return self.model.get_params_dict(which=which, kind=kind)
[docs] def set_fitparams(self, *params):
"""Update the "frozen" attribute of model parameters. Deprecated."""
warn(
"This function is confusing and deprecated. Set self.model.free_params instead.",
category=DeprecationWarning,
)
# TODO, maybe reconsider for the input?
fit_params_name = []
if isinstance(params[0], (list, tuple)):
params = params[0]
for pn in params:
if pn in self.model.params:
fit_params_name.append(pn)
else:
rn = self.model.match_param_aliases(pn)
if rn != "":
fit_params_name.append(rn)
else:
raise ValueError(f"Unrecognized parameter {pn}")
self.model.fit_params = fit_params_name
[docs] def get_allparams(self):
"""Return a dict of all param names and values. Deprecated."""
warn(
"This function is confusing and deprecated. Use self.model.get_params_dict.",
category=DeprecationWarning,
)
return self.model.get_params_dict("all", "quantity")
[docs] def get_fitparams(self):
"""Return a dict of fittable param names and quantity. Deprecated."""
warn(
"This function is confusing and deprecated. Use self.model.get_params_dict.",
category=DeprecationWarning,
)
return self.model.get_params_dict("free", "quantity")
[docs] def get_fitparams_num(self):
"""Return a dict of fittable param names and numeric values. Deprecated."""
warn(
"This function is confusing and deprecated. Use self.model.get_params_dict.",
category=DeprecationWarning,
)
return self.model.get_params_dict("free", "num")
[docs] def get_fitparams_uncertainty(self):
"""Return a dict of fittable param names and numeric values. Deprecated."""
warn(
"This function is confusing and deprecated. Use self.model.get_params_dict.",
category=DeprecationWarning,
)
return self.model.get_params_dict("free", "uncertainty")
[docs] def set_params(self, fitp):
"""Set the model parameters to the value contained in the input dict.
See :func:`pint.models.timing_model.TimingModel.set_param_values`.
"""
self.model.set_param_values(fitp)
[docs] def set_param_uncertainties(self, fitp):
"""Set the model parameters to the value contained in the input dict.
See :func:`pint.models.timing_model.TimingModel.set_param_uncertainties`.
"""
self.model.set_param_uncertainties(fitp)
@property
def covariance_matrix(self):
warn(
"This parameter is deprecated. Use `parameter_covariance_matrix` instead of `covariance_matrix`",
category=DeprecationWarning,
)
return self.parameter_covariance_matrix
[docs]class InvalidModelParameters(ValueError):
pass
[docs]class ModelState:
"""Record a model state and cache calculations
This class keeps track of a particular model state and all the associated
matrices - design matrices, singular value decompositions, what have you -
that are needed to compute a step and evaluate the quality of the fit.
These objects should be regarded as immutable but lazily evaluated.
"""
def __init__(self, fitter, model):
self.fitter = fitter
self.model = model
@cached_property
def resids(self):
try:
return self.fitter.make_resids(self.model)
except ValueError as e:
raise InvalidModelParameters("Step landed at invalid point") from e
@cached_property
def chi2(self):
# there may be some shareable computation here
try:
return self.resids.chi2
except ValueError as e:
raise InvalidModelParameters("Cannot compute chi2") from e
@cached_property
def step(self):
raise NotImplementedError
@cached_property
def parameter_covariance_matrix(self):
raise NotImplementedError
@property
def covariance_matrix(self):
warn(
"This parameter is deprecated. Use `parameter_covariance_matrix` instead of `covariance_matrix`",
category=DeprecationWarning,
)
return self.parameter_covariance_matrix
[docs] def predicted_chi2(self, step, lambda_):
"""Predict the chi2 after taking a step based on the linear approximation"""
raise NotImplementedError
[docs] def take_step_model(self, step, lambda_=1):
"""Make a new model reflecting the new parameters."""
# log.debug(f"Taking step {lambda_} * {list(zip(self.params, step))}")
new_model = copy.deepcopy(self.model)
for p, s in zip(self.params, step * lambda_):
try:
with contextlib.suppress(ValueError):
log.trace(f"Adjusting {getattr(self.model, p)} by {s}")
pm = getattr(new_model, p)
if pm.value is None:
pm.value = 0
pm.value += s
# getattr(new_model, p).value = getattr(self.model, p).value + s
# getattr(self.model, p) + s
# getattr(new_model, p).value = s
except AttributeError:
if p != "Offset":
log.warning(f"Unexpected parameter {p}")
return new_model
[docs] def take_step(self, step, lambda_):
"""Return a new state moved by lambda_*step."""
raise NotImplementedError
[docs]class DownhillFitter(Fitter):
"""Abstract base class for downhill fitters.
These fitters use the algorithm implemented here, in
:func:`pint.fitter.DownhillFitter.fit_toas` to work their way towards a
solution, keeping track of convergence. The linear algebra required by
various kinds of fitting is abstracted away into
:class:`pint.fitter.ModelState` objects so that this same code can be used
for correlated or uncorrelated TOA errors and narrowband or wideband TOAs.
"""
def __init__(self, toas, model, track_mode=None, residuals=None):
super().__init__(
toas=toas, model=model, residuals=residuals, track_mode=track_mode
)
self.method = "downhill_checked"
def _fit_toas(
self,
maxiter=20,
required_chi2_decrease=1e-2,
max_chi2_increase=1e-2,
min_lambda=1e-3,
debug=False,
):
"""Downhill fit implementation for fitting the timing model parameters.
The `fit_toas()` calls this method iteratively to fit the timing model parameters
while also fitting for white noise parameters.
See documentation of the `fit_toas()` method for more details."""
# setup
self.model.validate()
self.model.validate_toas(self.toas)
current_state = self.create_state()
best_state = current_state
self.converged = False
# algorithm
exception = None
for i in range(maxiter):
step = current_state.step
lambda_ = 1
chi2_decrease = 0
while True:
try:
new_state = current_state.take_step(step, lambda_)
chi2_decrease = current_state.chi2 - new_state.chi2
if new_state.chi2 < best_state.chi2:
best_state = new_state
if chi2_decrease < -max_chi2_increase:
raise InvalidModelParameters(
f"chi2 increased from {current_state.chi2} to {new_state.chi2} "
f"when trying to take a step with lambda {lambda_}"
)
log.trace(
f"Iteration {i}: "
f"Updating state, chi2 goes down by {chi2_decrease} "
f"from {current_state.chi2} "
f"to {new_state.chi2}"
)
exception = None
current_state = new_state
break
except InvalidModelParameters as e:
# This could be an exception evaluating new_state.chi2 or an increase in value
# If bad parameter values escape, look in ModelState.resids for the except
# that should catch them
lambda_ /= 2
log.trace(f"Iteration {i}: Shortening step to {lambda_}: {e}")
if lambda_ < min_lambda:
log.warning(
f"Unable to improve chi2 even with very small steps, stopping "
f"but keeping best state, message was: {e}"
)
exception = e
break
if (
-max_chi2_increase <= chi2_decrease < required_chi2_decrease
and lambda_ == 1
):
log.debug(
f"Iteration {i}: chi2 does not improve, stopping; "
f"decrease: {chi2_decrease}"
)
self.converged = True
break
if exception is not None:
break
else:
log.debug(
f"Stopping because maximum number of iterations ({maxiter}) reached"
)
self.current_state = best_state
# collect results
self.model = self.current_state.model
self.resids = self.current_state.resids
self.parameter_covariance_matrix = (
self.current_state.parameter_covariance_matrix
)
self.errors = np.sqrt(np.diag(self.parameter_covariance_matrix.matrix))
self.parameter_correlation_matrix = (
self.parameter_covariance_matrix.to_correlation_matrix()
)
for p, e in zip(self.current_state.params, self.errors):
try:
# I don't know why this fails with multiprocessing, but bypass if it does
with contextlib.suppress(ValueError):
log.trace(f"Setting {getattr(self.model, p)} uncertainty to {e}")
pm = getattr(self.model, p)
except AttributeError:
if p != "Offset":
log.warning(f"Unexpected parameter {p}")
else:
pm.uncertainty = e * pm.units
self.update_model(self.current_state.chi2)
if exception is not None:
raise StepProblem(
"Unable to improve chi2 even with very small steps"
) from exception
if not self.converged:
raise MaxiterReached(f"Convergence not detected after {maxiter} steps.")
return self.converged
[docs] def fit_toas(
self,
maxiter=20,
noise_fit_niter=2,
required_chi2_decrease=1e-2,
max_chi2_increase=1e-2,
min_lambda=1e-3,
noisefit_method="Newton-CG",
compute_noise_uncertainties=True,
debug=False,
):
"""Carry out a cautious downhill fit.
This tries to take the same steps as
:func:`pint.fitter.WLSFitter.fit_toas` or
:func:`pint.fitter.GLSFitter.fit_toas` or
:func:`pint.fitter.WidebandTOAFitter.fit_toas`. At each step, it
checks whether the new model has a better ``chi2`` than the current
one; if the new model is invalid or worse than the current one, it
tries taking a shorter step in the same direction. This can exit if it
exceeds the maximum number of iterations or if improvement is not
possible even with very short steps, or it can exit successfully if a
full-size step is taken and it does not decrease the ``chi2`` by much.
The attribute ``self.converged`` is set to True or False depending on
whether the process actually converged.
This function can also estimate white noise parameters (EFACs and EQUADs)
and their uncertainties.
If there are no free white noise parameters, this function will do one
iteration of the downhill fit (implemented in the `_fit_toas()` method).
If free white noise parameters are present, it will fit for them by numerically
maximizing the likelihood function (implemented in the `_fit_noise()` method).
The timing model fit and the noise model fit are run iteratively in an alternating
fashion. Fitting for a white noise parameter is as simple as::
fitter.model.EFAC1.frozen = False
fitter.fit_toas()
Parameters
==========
maxiter : int
Abandon the process if this many successful steps have been taken.
required_chi2_decrease : float
A full-size step that makes less than this much improvement is taken
to indicate that the fitter has converged.
max_chi2_increase : float
If this is positive, consider taking steps that slightly worsen the chi2 in hopes
of eventually finding our way downhill.
min_lambda : float
If steps are shrunk by this factor and still don't result in improvement, abandon hope
of convergence and stop.
noisefit_method: str
Algorithm used to fit for noise parameters. See the documentation for
`scipy.optimize.minimize()` for more details and available options.
"""
free_noise_params = self._get_free_noise_params()
if len(free_noise_params) == 0:
return self._fit_toas(
maxiter=maxiter,
required_chi2_decrease=required_chi2_decrease,
max_chi2_increase=required_chi2_decrease,
min_lambda=required_chi2_decrease,
debug=debug,
)
log.debug("Will fit for noise parameters.")
for ii in range(noise_fit_niter):
self._fit_toas(
maxiter=maxiter,
required_chi2_decrease=required_chi2_decrease,
max_chi2_increase=max_chi2_increase,
min_lambda=min_lambda,
debug=debug,
)
if ii == noise_fit_niter - 1 and compute_noise_uncertainties:
values, errors = self._fit_noise(
noisefit_method=noisefit_method, uncertainty=True
)
self._update_noise_params(values, errors)
else:
values = self._fit_noise(
noisefit_method=noisefit_method, uncertainty=False
)
self._update_noise_params(values)
return self._fit_toas(
maxiter=maxiter,
required_chi2_decrease=required_chi2_decrease,
max_chi2_increase=max_chi2_increase,
min_lambda=min_lambda,
debug=debug,
)
@property
def fac(self):
return self.current_state.fac
def _get_free_noise_params(self):
"""Returns a list of all free noise parameters."""
return [
fp
for fp in self.model.get_params_of_component_type("NoiseComponent")
if not getattr(self.model, fp).frozen
]
def _update_noise_params(self, values, errors=None):
"""Update the model using estimated noise parameters."""
free_noise_params = self._get_free_noise_params()
if errors is not None:
for fp, val, err in zip(free_noise_params, values, errors):
getattr(self.model, fp).value = val
getattr(self.model, fp).uncertainty_value = err
else:
for fp, val in zip(free_noise_params, values):
getattr(self.model, fp).value = val
def _fit_noise(self, noisefit_method="Newton-CG", uncertainty=False):
"""Estimate noise parameters and their uncertainties. Noise parameters
are estimated by numerically maximizing the log-likelihood function including
the normalization term. The uncertainties thereof are computed using the
numerically-evaluated Hessian."""
free_noise_params = self._get_free_noise_params()
xs0 = [getattr(self.model, fp).value for fp in free_noise_params]
model1 = copy.deepcopy(self.model)
res = Residuals(self.toas, model1)
def _mloglike(xs):
"""Negative of the log-likelihood function."""
for fp, x in zip(free_noise_params, xs):
getattr(res.model, fp).value = x
return -res.lnlikelihood()
if not res.model.has_correlated_errors:
def _mloglike_grad(xs):
"""Gradient of the negative of the log-likelihood function w.r.t. white noise parameters."""
for fp, x in zip(free_noise_params, xs):
getattr(res.model, fp).value = x
return np.array(
[
-res.d_lnlikelihood_d_param(par).value
for par in free_noise_params
]
)
maxlike_result = opt.minimize(
_mloglike, xs0, method=noisefit_method, jac=_mloglike_grad
)
else:
maxlike_result = opt.minimize(_mloglike, xs0, method="Nelder-Mead")
if uncertainty:
hess = Hessian(_mloglike)
errs = np.sqrt(np.diag(np.linalg.pinv(hess(maxlike_result.x))))
return (maxlike_result.x, errs) if uncertainty else maxlike_result.x
[docs]class WLSState(ModelState):
def __init__(self, fitter, model, threshold=None):
super().__init__(fitter, model)
self.threshold = threshold
@cached_property
def step(self):
# Define the linear system
M, params, units = self.model.designmatrix(
toas=self.fitter.toas, incfrozen=False, incoffset=True
)
# Get residuals and TOA uncertainties in seconds
Nvec = self.model.scaled_toa_uncertainty(self.fitter.toas).to(u.s).value
scaled_resids = self.resids.time_resids.to(u.s).value / Nvec
# "Whiten" design matrix and residuals by dividing by uncertainties
M = M / Nvec.reshape((-1, 1))
# For each column in design matrix except for col 0 (const. pulse
# phase), subtract the mean value, and scale by the column RMS.
# This helps avoid numerical problems later. The scaling factors need
# to be saved to recover correct parameter units.
# NOTE, We remove subtract mean value here, since it did not give us a
# fast converge fitting.
# M[:,1:] -= M[:,1:].mean(axis=0)
M, fac = normalize_designmatrix(M, params)
# Singular value decomp of design matrix:
# M = U s V^T
# Dimensions:
# M, U are Ntoa x Nparam
# s is Nparam x Nparam diagonal matrix encoded as 1-D vector
# V^T is Nparam x Nparam
U, s, Vt = scipy.linalg.svd(M, full_matrices=False)
# Note, here we could do various checks like report
# matrix condition number or zero out low singular values.
# print 'log_10 cond=', np.log10(s.max()/s.min())
# Note, Check the threshold from data precision level.Borrowed from
# np Curve fit.
threshold = self.threshold
if threshold is None:
# M is float, not longdouble
# threshold = np.finfo(float).eps * max(M.shape)
threshold = 1e-14 * max(M.shape)
log.trace(f"Singular values for fit are {s}")
bad = np.where(s <= threshold * s[0])[0]
s[bad] = np.inf
for c in bad:
bad_col = Vt[c, :]
bad_col /= abs(bad_col).max()
bad_combination = " + ".join(
[
f"{co}*{p}"
for (co, p) in sorted(zip(bad_col, params))
if abs(co) > threshold
]
)
warn(
f"Parameter degeneracy; the following linear combination yields "
f"almost no change: {bad_combination}",
DegeneracyWarning,
)
self.M = M
self.U = U
self.Vt = Vt
self.s = s
self.fac = fac
self.params = params
self.units = units
self.scaled_resids = scaled_resids
# TODO: seems like doing this on every iteration is wasteful, and we should just do it once and then update the matrix
covariance_matrix_labels = {
param: (i, i + 1, unit)
for i, (param, unit) in enumerate(zip(params, units))
}
# covariance matrix is 2D and symmetric
covariance_matrix_labels = [covariance_matrix_labels] * 2
self.parameter_covariance_matrix_labels = covariance_matrix_labels
# The delta-parameter values
# dpars = V s^-1 U^T r
# Scaling by fac recovers original units
return (Vt.T @ ((U.T @ scaled_resids) / s)) / fac
[docs] def take_step(self, step, lambda_=1):
return WLSState(
self.fitter, self.take_step_model(step, lambda_), threshold=self.threshold
)
@cached_property
def parameter_covariance_matrix(self):
# make sure we compute the SVD
self.step
# Sigma = np.dot(Vt.T / s, U.T)
# The post-fit parameter covariance matrix
# Sigma = V s^-2 V^T
Sigma = np.dot(self.Vt.T / (self.s**2), self.Vt)
return CovarianceMatrix(
(Sigma / self.fac).T / self.fac, self.parameter_covariance_matrix_labels
)
[docs]class DownhillWLSFitter(DownhillFitter):
"""Fitter that uses the shortening-step procedure for WLS fits.
Most of the machinery here is in :class:`pint.fitter.WLSState`
or :class:`pint.fitter.DownhillFitter`.
"""
def __init__(self, toas, model, track_mode=None, residuals=None):
if model.has_correlated_errors:
raise CorrelatedErrors(model)
super().__init__(
toas=toas, model=model, residuals=residuals, track_mode=track_mode
)
self.method = "downhill_wls"
[docs] def fit_toas(self, maxiter=10, threshold=None, debug=False, **kwargs):
"""Fit TOAs.
This is mostly implemented in
:func:`pint.fitter.DownhillFitter.fit_toas`.
Parameters
==========
maxiter : int
Abandon hope if convergence hasn't occurred after this many steps (successful or not).
threshold : float
Discard singular values less than this times the largest; this makes the linear algebra
a little more stable, but the Levenberg-Marquardt algorithm is supposed to do that anyway.
kwargs : dict
Any additional arguments are passed down to
:func:`pint.fitter.DownhillFitter.fit_toas`
"""
self.threshold = threshold
super().fit_toas(maxiter=maxiter, debug=debug, **kwargs)
def create_state(self):
return WLSState(self, self.model)
[docs]class GLSState(ModelState):
def __init__(self, fitter, model, full_cov=False, threshold=None):
super().__init__(fitter, model)
self.threshold = threshold
self.full_cov = full_cov
@cached_property
def step(self):
# Define the linear system
M, params, units = self.model.designmatrix(
toas=self.fitter.toas, incfrozen=False, incoffset=True
)
self.params = params
self.units = units
# TODO: seems like doing this on every iteration is wasteful, and we should just do it once and then update the matrix
covariance_matrix_labels = {
param: (i, i + 1, unit)
for i, (param, unit) in enumerate(zip(params, units))
}
# covariance matrix is 2D and symmetric
covariance_matrix_labels = [covariance_matrix_labels] * 2
self.parameter_covariance_matrix_labels = covariance_matrix_labels
residuals = self.resids.time_resids.to(u.s).value
# get any noise design matrices and weight vectors
if not self.full_cov:
Mn = self.model.noise_model_designmatrix(self.fitter.toas)
phi = self.model.noise_model_basis_weight(self.fitter.toas)
phiinv = np.zeros(M.shape[1])
if Mn is not None and phi is not None:
phiinv = np.concatenate((phiinv, 1 / phi))
M = np.hstack((M, Mn))
# normalize the design matrix
M, norm = normalize_designmatrix(M, params)
self.M = M
self.fac = norm
# compute covariance matrices
if self.full_cov:
cov = self.model.toa_covariance_matrix(self.fitter.toas)
cf = scipy.linalg.cho_factor(cov)
cm = scipy.linalg.cho_solve(cf, M)
mtcm = np.dot(M.T, cm)
mtcy = np.dot(cm.T, residuals)
else:
phiinv /= norm**2
# Why are we scaling residuals by the *square* of the uncertainty?
Nvec = (
self.model.scaled_toa_uncertainty(self.fitter.toas).to(u.s).value ** 2
)
cinv = 1 / Nvec
mtcm = np.dot(M.T, cinv[:, None] * M)
mtcm += np.diag(phiinv)
mtcy = np.dot(M.T, cinv * residuals)
log.trace(f"mtcm: {mtcm}")
U, s, Vt = scipy.linalg.svd(mtcm, full_matrices=False)
log.trace(f"s: {s}")
bad = np.where(s <= self.threshold * s[0])[0]
s[bad] = np.inf
for c in bad:
bad_col = Vt[c, :]
bad_col /= abs(bad_col).max()
bad_combination = " ".join(
[
f"{p}"
for (co, p) in sorted(zip(bad_col, params))
if abs(co) > self.threshold
]
)
warn(
f"Parameter degeneracy; the following combination of parameters yields "
f"almost no change: {bad_combination}",
DegeneracyWarning,
)
self.norm = norm
self.s, self.Vt = s, Vt
xhat = np.dot(Vt.T, np.dot(U.T, mtcy) / s)
log.trace(f"norm: {norm}")
log.trace(f"xhat: {xhat}")
self.xhat = xhat
# newres = residuals - np.dot(M, xhat)
# compute absolute estimates, normalized errors, covariance matrix
return xhat / norm
[docs] def take_step(self, step, lambda_=1):
return GLSState(
self.fitter,
self.take_step_model(step, lambda_),
threshold=self.threshold,
full_cov=self.full_cov,
)
@cached_property
def parameter_covariance_matrix(self):
# make sure we compute the SVD
self.step
xvar = np.dot(self.Vt.T / self.s, self.Vt)
return CovarianceMatrix(
(xvar / self.norm).T / self.norm, self.parameter_covariance_matrix_labels
)
[docs]class DownhillGLSFitter(DownhillFitter):
"""Fitter that uses the shortening-step procedure for GLS fits.
Most of the machinery here is in :class:`pint.fitter.GLSState`
or :class:`pint.fitter.DownhillFitter`.
"""
# FIXME: do something clever to efficiently compute chi-squared
def __init__(self, toas, model, track_mode=None, residuals=None):
if not model.has_correlated_errors:
log.info(
"Model does not appear to have correlated errors so the GLS fitter "
"is unnecessary; DownhillWLSFitter may be faster and more stable."
)
super().__init__(
toas=toas, model=model, residuals=residuals, track_mode=track_mode
)
self.method = "downhill_gls"
self.full_cov = False
self.threshold = 0
def create_state(self):
return GLSState(
self, self.model, full_cov=self.full_cov, threshold=self.threshold
)
[docs] def fit_toas(self, maxiter=10, threshold=0, full_cov=False, debug=False, **kwargs):
"""Fit TOAs.
This is mostly implemented in
:func:`pint.fitter.DownhillFitter.fit_toas`.
Parameters
==========
maxiter : int
Abandon hope if convergence hasn't occurred after this many steps (successful or not).
threshold : float
Discard singular values less than this times the largest; this makes the linear algebra
a little more stable, but the Levenberg-Marquardt algorithm is supposed to do that anyway.
full_cov : bool
If True, use the full TOA covariance matrix, which can be huge; if False, use the
rank-reduced approach (for which Levenberg-Marquardt may not make sense).
kwargs : dict
Any additional arguments are passed down to
:func:`pint.fitter.DownhillFitter.fit_toas`
"""
self.threshold = threshold
self.full_cov = full_cov
r = super().fit_toas(maxiter=maxiter, debug=debug, **kwargs)
# FIXME: set up noise residuals et cetera
# Compute the noise realizations if possible
if not self.full_cov:
noise_dims = self.model.noise_model_dimensions(self.toas)
noise_resids = {}
ntmpar = len(self.model.free_params)
for comp in noise_dims:
# The first column of designmatrix is "offset", add 1 to match
# the indices of noise designmatrix
p0 = noise_dims[comp][0] + ntmpar + 1
p1 = p0 + noise_dims[comp][1]
noise_resids[comp] = (
np.dot(
self.current_state.M[:, p0:p1], self.current_state.xhat[p0:p1]
)
* u.s
)
if debug:
setattr(
self.resids,
f"{comp}_M",
(
self.current_state.M[:, p0:p1],
self.current_state.xhat[p0:p1],
),
)
setattr(self.resids, f"{comp}_M_index", (p0, p1))
self.resids.noise_resids = noise_resids
if debug:
setattr(self.resids, "norm", self.current_state.norm)
return r
[docs]class WidebandState(ModelState):
def __init__(self, fitter, model, full_cov=False, threshold=None):
super().__init__(fitter, model)
self.threshold = threshold
self.full_cov = full_cov
self.add_args = {} # for adding arguments to residual creation
@cached_property
def M_params_units_norm(self):
# Define the linear system
d_matrix = combine_design_matrices_by_quantity(
[
DesignMatrixMaker("toa", u.s)(
self.fitter.toas, self.model, self.model.free_params, offset=True
),
DesignMatrixMaker("dm", u.pc / u.cm**3)(
self.fitter.toas, self.model, self.model.free_params, offset=True
),
]
)
M, params, units = (
d_matrix.matrix,
d_matrix.derivative_params,
d_matrix.param_units,
)
# get any noise design matrices and weight vectors
if not self.full_cov:
# We assume the fit date type is toa
Mn = DesignMatrixMaker("toa_noise", u.s)(self.fitter.toas, self.model)
phi = self.model.noise_model_basis_weight(self.fitter.toas)
phiinv = np.zeros(M.shape[1])
if Mn is not None and phi is not None:
phiinv = np.concatenate((phiinv, 1 / phi))
new_d_matrix = combine_design_matrices_by_param(d_matrix, Mn)
M, params, units = (
new_d_matrix.matrix,
new_d_matrix.derivative_params,
new_d_matrix.param_units,
)
# normalize the design matrix
norm = np.sqrt(np.sum(M**2, axis=0))
# The fixed offset is an unlisted parameter
ntmpar = len(self.model.free_params) + 1
if M.shape[1] > ntmpar:
norm[ntmpar:] = 1
for c in np.where(norm == 0)[0]:
warn(
f"Parameter degeneracy; the following parameter yields "
f"almost no change: {params[c]}",
DegeneracyWarning,
)
norm[norm == 0] = 1
M /= norm
if not self.full_cov:
phiinv /= norm**2
self.phiinv = phiinv
self.fac = norm
return M, params, units, norm
@cached_property
def M(self):
return self.M_params_units_norm[0]
@cached_property
def params(self):
return self.M_params_units_norm[1]
@cached_property
def units(self):
return self.M_params_units_norm[2]
@cached_property
def norm(self):
return self.M_params_units_norm[3]
@cached_property
def mtcm_mtcy_mtcmplain(self):
# FIXME: ensure that TOAs are before DM
residuals = np.hstack(
(self.resids.toa.time_resids.to_value(u.s), self.resids.dm.resids_value)
)
# compute covariance matrices
if self.full_cov:
cov = combine_covariance_matrix(
[
CovarianceMatrixMaker("toa", u.s)(self.fitter.toas, self.model),
CovarianceMatrixMaker("dm", u.pc / u.cm**3)(
self.fitter.toas, self.model
),
]
).matrix
cf = scipy.linalg.cho_factor(cov)
cm = scipy.linalg.cho_solve(cf, self.M)
mtcm = np.dot(self.M.T, cm)
mtcy = np.dot(cm.T, residuals)
mtcmplain = mtcm
else:
Nvec = (
np.hstack(
[
self.model.scaled_toa_uncertainty(self.fitter.toas).to_value(
u.s
),
self.model.scaled_dm_uncertainty(self.fitter.toas).to_value(
u.pc / u.cm**3
),
]
)
** 2
)
cinv = 1 / Nvec
mtcm = np.dot(self.M.T, cinv[:, None] * self.M)
mtcmplain = mtcm
mtcm += np.diag(self.phiinv)
mtcy = np.dot(self.M.T, cinv * residuals)
return mtcm, mtcy, mtcmplain
@cached_property
def mtcm(self):
return self.mtcm_mtcy_mtcmplain[0]
@cached_property
def mtcy(self):
return self.mtcm_mtcy_mtcmplain[1]
@cached_property
def mtcmplain(self):
return self.mtcm_mtcy_mtcmplain[2]
@cached_property
def U_s_Vt_xhat(self):
U, s, Vt = scipy.linalg.svd(self.mtcm, full_matrices=False)
bad = np.where(s <= self.threshold * s[0])[0]
s[bad] = np.inf
for c in bad:
bad_col = Vt[c, :]
bad_col /= abs(bad_col).max()
bad_combination = " ".join(
[
f"{co}*{p}"
for (co, p) in reversed(sorted(zip(bad_col, self.params)))
if abs(co) > self.threshold
]
)
warn(
f"Parameter degeneracy; the following combination of parameters yields "
f"almost no change: {bad_combination}",
DegeneracyWarning,
)
xhat = np.dot(Vt.T, np.dot(U.T, self.mtcy) / s)
return U, s, Vt, xhat
@cached_property
def U(self):
return self.U_s_Vt_xhat[0]
@cached_property
def s(self):
return self.U_s_Vt_xhat[1]
@cached_property
def Vt(self):
return self.U_s_Vt_xhat[2]
@cached_property
def xhat(self):
return self.U_s_Vt_xhat[3]
@cached_property
def step(self):
# compute absolute estimates, normalized errors, covariance matrix
return self.xhat / self.norm
[docs] def take_step(self, step, lambda_=1):
return WidebandState(
self.fitter, self.take_step_model(step, lambda_), threshold=self.threshold
)
@cached_property
def parameter_covariance_matrix(self):
# make sure we compute the SVD
xvar = np.dot(self.Vt.T / self.s, self.Vt)
# is this the best place to do this?
covariance_matrix_labels = {
param: (i, i + 1, unit)
for i, (param, unit) in enumerate(zip(self.params, self.units))
}
# covariance matrix is 2D and symmetric
covariance_matrix_labels = [covariance_matrix_labels] * 2
return CovarianceMatrix(
(xvar / self.norm).T / self.norm, covariance_matrix_labels
)
[docs]class WidebandDownhillFitter(DownhillFitter):
"""Fitter that uses the shortening-step procedure for wideband GLS fits.
Most of the machinery here is in :class:`pint.fitter.WidebandState`
or :class:`pint.fitter.DownhillFitter`.
"""
# FIXME: do something clever to efficiently compute chi-squared
def __init__(self, toas, model, track_mode=None, residuals=None, add_args=None):
self.method = "downhill_wideband"
self.full_cov = False
self.threshold = 0
self.add_args = {} if add_args is None else add_args
super().__init__(
toas=toas, model=model, residuals=residuals, track_mode=track_mode
)
self.is_wideband = True
def make_resids(self, model):
return WidebandTOAResiduals(
self.toas,
model,
toa_resid_args=self.add_args.get("toa", {}),
dm_resid_args=self.add_args.get("dm", {}),
)
def create_state(self):
return WidebandState(
self, self.model, full_cov=self.full_cov, threshold=self.threshold
)
[docs] def fit_toas(
self, maxiter=10, threshold=1e-14, full_cov=False, debug=False, **kwargs
):
"""Fit TOAs.
This is mostly implemented in
:func:`pint.fitter.DownhillFitter.fit_toas`.
Parameters
==========
maxiter : int
Abandon hope if convergence hasn't occurred after this many steps (successful or not).
threshold : float
Discard singular values less than this times the largest; this makes the linear algebra
a little more stable, but the Levenberg-Marquardt algorithm is supposed to do that anyway.
full_cov : bool
If True, use the full TOA covariance matrix, which can be huge; if False, use the
rank-reduced approach (for which Levenberg-Marquardt may not make sense).
kwargs : dict
Any additional arguments are passed down to
:func:`pint.fitter.DownhillFitter.fit_toas`
"""
self.threshold = threshold
self.full_cov = full_cov
# FIXME: set up noise residuals et cetera
r = super().fit_toas(maxiter=maxiter, debug=debug, **kwargs)
# Compute the noise realizations if possibl
if not self.full_cov:
noise_dims = self.model.noise_model_dimensions(self.toas)
noise_resids = {}
ntmpar = len(self.model.free_params)
for comp in noise_dims:
# The first column of designmatrix is "offset", add 1 to match
# the indices of noise designmatrix
p0 = noise_dims[comp][0] + ntmpar + 1
p1 = p0 + noise_dims[comp][1]
noise_resids[comp] = (
np.dot(
self.current_state.M[:, p0:p1], self.current_state.xhat[p0:p1]
)
* u.s
)
if debug:
setattr(
self.resids,
f"{comp}_M",
(
self.current_state.M[:, p0:p1],
self.current_state.xhat[p0:p1],
),
)
setattr(self.resids, f"{comp}_M_index", (p0, p1))
self.resids.noise_resids = noise_resids
if debug:
setattr(self.resids, "norm", self.current_state.norm)
return r
[docs]class PowellFitter(Fitter):
"""A fitter that demonstrates how to work with a generic fitting function.
This class wraps ``scipy.optimize.minimize`` with ``method="Powell"``; this
uses Powell's method, a simplex optimizer that makes no use of derivatives
(and is thus rather slow by comparison).
It is unlikely that this will be useful for timing an actual pulsar, but it
may serve as an example of how to write your own fitting procedure.
"""
def __init__(self, toas, model, track_mode=None, residuals=None):
super().__init__(toas, model, residuals=residuals, track_mode=track_mode)
self.method = "Powell"
[docs] def fit_toas(self, maxiter=20, debug=False):
"""Carry out the fitting procedure."""
# check that params of timing model have necessary components
self.model.validate()
self.model.validate_toas(self.toas)
# Initial guesses are model params
fitp = self.model.get_params_dict("free", "num")
self.fitresult = opt.minimize(
self.minimize_func,
list(fitp.values()),
args=tuple(fitp.keys()),
options={"maxiter": maxiter},
method=self.method,
)
# Update model and resids, as the last iteration of minimize is not
# necessarily the one that yields the best fit
self.minimize_func(np.atleast_1d(self.fitresult.x), *list(fitp.keys()))
self.update_model(self.resids.chi2)
return self.resids.chi2
[docs]class WLSFitter(Fitter):
"""Weighted Least Squares fitter.
This is the primary fitting algorithm in TEMPO and TEMPO2 as well as PINT.
It requires derivatives and it cannot handle correlated errors (ECORR, for
example).
This fitter implements a rudimentary form of the Gauss-Newton algorithm: it
computes an initial set of residuals from the fit, it computes the design
matrix, and it treats the problem as a weighted linear least squares
problem, jumping immediately to the assumed solution. If the initial guess
is sufficiently close that the objective function is well-approximated by
its derivatives, this lands very close to the correct answer. This Fitter
can be told to repeat the process a number of times. The
Levenburg-Marquardt algorithm and its descendants (trust region algorithms)
generalize this process to handle situations where the initial guess is not
close enough that the derivatives are a good approximation.
"""
def __init__(self, toas, model, track_mode=None, residuals=None):
super().__init__(
toas=toas, model=model, residuals=residuals, track_mode=track_mode
)
self.method = "weighted_least_square"
[docs] def fit_toas(self, maxiter=1, threshold=None, debug=False):
"""Run a linear weighted least-squared fitting method.
Parameters
----------
maxiter: int
Repeat the least-squares fitting up to this many times if necessary.
threshold : float or None
Discard singular values smaller than ``threshold`` times the largest
singular value. If None, use a value based on floating-point epsilon
and the matrix sizes.
"""
# check that params of timing model have necessary components
self.model.validate()
self.model.validate_toas(self.toas)
chi2 = 0
for _ in range(maxiter):
fitp = self.model.get_params_dict("free", "quantity")
fitpv = self.model.get_params_dict("free", "num")
fitperrs = self.model.get_params_dict("free", "uncertainty")
# Define the linear system
M, params, units = self.get_designmatrix()
# Get residuals and TOA uncertainties in seconds
self.update_resids()
residuals = self.resids.time_resids.to(u.s).value
Nvec = self.model.scaled_toa_uncertainty(self.toas).to(u.s).value
# "Whiten" design matrix and residuals by dividing by uncertainties
M = M / Nvec.reshape((-1, 1))
residuals = residuals / Nvec
# For each column in design matrix except for col 0 (const. pulse
# phase), subtract the mean value, and scale by the column RMS.
# This helps avoid numerical problems later. The scaling factors need
# to be saved to recover correct parameter units.
# NOTE, We remove subtract mean value here, since it did not give us a
# fast converge fitting.
# M[:,1:] -= M[:,1:].mean(axis=0)
M, fac = normalize_designmatrix(M, params)
# Singular value decomp of design matrix:
# M = U s V^T
# Dimensions:
# M, U are Ntoa x Nparam
# s is Nparam x Nparam diagonal matrix encoded as 1-D vector
# V^T is Nparam x Nparam
U, s, Vt = scipy.linalg.svd(M, full_matrices=False)
# Note, here we could do various checks like report
# matrix condition number or zero out low singular values.
# print 'log_10 cond=', np.log10(s.max()/s.min())
# Note, Check the threshold from data precision level.Borrowed from
# np Curve fit.
if threshold is None:
# M is float, not longdouble
# threshold = np.finfo(float).eps * max(M.shape)
threshold = 1e-14 * max(M.shape)
bad = np.where(s <= threshold * s[0])[0]
s[bad] = np.inf
for c in bad:
bad_col = Vt[c, :]
bad_col /= abs(bad_col).max()
bad_combination = " + ".join(
[
f"{co}*{p}"
for (co, p) in reversed(sorted(zip(bad_col, params)))
if abs(co) > threshold
]
)
warn(
f"Parameter degeneracy; the following linear combination yields "
f"almost no change: {bad_combination}",
DegeneracyWarning,
)
# Sigma = np.dot(Vt.T / s, U.T)
# The post-fit parameter covariance matrix
# Sigma = V s^-2 V^T
Sigma = np.dot(Vt.T / (s**2), Vt)
# Parameter uncertainties. Scale by fac recovers original units.
errs = np.sqrt(np.diag(Sigma)) / fac
# covariance matrix stuff (for randomized models in pintk)
sigma_var = (Sigma / fac).T / fac
errors = np.sqrt(np.diag(sigma_var))
sigma_cov = (sigma_var / errors).T / errors
# covariance matrix = variances in diagonal, used for gaussian random models
covariance_matrix = sigma_var
covariance_matrix_labels = {
param: (i, i + 1, unit)
for i, (param, unit) in enumerate(zip(params, units))
}
# covariance matrix is 2D and symmetric
covariance_matrix_labels = [
covariance_matrix_labels
] * covariance_matrix.ndim
self.parameter_covariance_matrix = CovarianceMatrix(
covariance_matrix, covariance_matrix_labels
)
# correlation matrix = 1s in diagonal, use for comparison to tempo/tempo2 cov matrix
self.parameter_correlation_matrix = CorrelationMatrix(
sigma_cov, covariance_matrix_labels
)
self.fac = fac
self.errors = errors
# The delta-parameter values
# dpars = V s^-1 U^T r
# Scaling by fac recovers original units
dpars = np.dot(Vt.T, np.dot(U.T, residuals) / s) / fac
for pn in fitp.keys():
uind = params.index(pn) # Index of designmatrix
un = 1.0 / (units[uind]) # Unit in designmatrix
un *= u.s
pv, dpv = fitpv[pn] * fitp[pn].units, dpars[uind] * un
fitpv[pn] = np.longdouble((pv + dpv) / fitp[pn].units)
# NOTE We need some way to use the parameter limits.
fitperrs[pn] = errs[uind]
chi2 = self.minimize_func(list(fitpv.values()), *list(fitp.keys()))
# Update Uncertainties
self.set_param_uncertainties(fitperrs)
self.update_model(chi2)
return chi2
[docs]class GLSFitter(Fitter):
"""Generalized least-squares fitting.
This fitter extends the :class:`pint.fitter.WLSFitter` to permit the errors
on the data points to be correlated. The fitting proceeds by decomposing
the data covariance matrix.
"""
def __init__(self, toas=None, model=None, track_mode=None, residuals=None):
super().__init__(
toas=toas, model=model, residuals=residuals, track_mode=track_mode
)
self.method = "generalized_least_square"
[docs] def fit_toas(self, maxiter=1, threshold=0, full_cov=False, debug=False):
"""Run a generalized least-squares fitting method.
A first attempt is made to solve the fitting problem by Cholesky
decomposition, but if this fails singular value decomposition is
used instead. In this case singular values below threshold are removed.
Parameters
----------
maxiter: int
How many times to run the linear least-squares fit, re-evaluating
the derivatives at each step.
If maxiter is less than one, no fitting is done, just the
chi-squared computation. In this case, you must provide the residuals
argument when constructing the class.
If maxiter is one or more, so fitting is actually done, the
chi-squared value returned is only approximately the chi-squared
of the improved(?) model. In fact it is the chi-squared of the
solution to the linear fitting problem, and the full non-linear
model should be evaluated and new residuals produced if an accurate
chi-squared is desired.
threshold: float
When to start discarding singular values. Typical values are about
1e-14 - a singular value smaller than this indicates parameters that
are so degenerate the numerical precision cannot distinguish them.
Such highly degenerate parameter sets are reported to the user with
a DegeneracyWarning. Negative values force a faster but less stable
Cholesky decomposition method.
full_cov: bool
full_cov determines which calculation is used. If true, the full
covariance matrix is constructed and the calculation is relatively
straightforward but the full covariance matrix may be enormous.
If false, an algorithm is used that takes advantage of the structure
of the covariance matrix, based on information provided by the noise
model. The two algorithms should give the same result to numerical
accuracy where they both can be applied.
"""
# check that params of timing model have necessary components
self.model.validate()
self.model.validate_toas(self.toas)
chi2 = 0
for i in range(maxiter):
fitp = self.model.get_params_dict("free", "quantity")
fitpv = self.model.get_params_dict("free", "num")
fitperrs = self.model.get_params_dict("free", "uncertainty")
# Define the linear system
# normalize the design matrix
M, params, units = self.get_designmatrix()
# M /= norm
ntmpar = len(fitp)
# Get residuals and TOA uncertainties in seconds
if i == 0:
# Why is this here?
self.update_resids()
residuals = self.resids.time_resids.to(u.s).value
# get any noise design matrices and weight vectors
if not full_cov:
Mn = self.model.noise_model_designmatrix(self.toas)
phi = self.model.noise_model_basis_weight(self.toas)
phiinv = np.zeros(M.shape[1])
if Mn is not None and phi is not None:
phiinv = np.concatenate((phiinv, 1 / phi))
M = np.hstack((M, Mn))
ntmpar = len(fitp)
# normalize the design matrix
M, norm = normalize_designmatrix(M, params)
self.fac = norm
# compute covariance matrices
if full_cov:
cov = self.model.toa_covariance_matrix(self.toas)
cf = scipy.linalg.cho_factor(cov)
cm = scipy.linalg.cho_solve(cf, M)
mtcm = np.dot(M.T, cm)
mtcy = np.dot(cm.T, residuals)
else:
phiinv /= norm**2
Nvec = self.model.scaled_toa_uncertainty(self.toas).to(u.s).value ** 2
cinv = 1 / Nvec
mtcm = np.dot(M.T, cinv[:, None] * M)
mtcm += np.diag(phiinv)
mtcy = np.dot(M.T, cinv * residuals)
log.trace(f"mtcm: {mtcm}")
xhat, xvar = None, None
if threshold <= 0:
try:
c = scipy.linalg.cho_factor(mtcm)
xhat = scipy.linalg.cho_solve(c, mtcy)
xvar = scipy.linalg.cho_solve(c, np.eye(len(mtcy)))
except scipy.linalg.LinAlgError:
xhat, xvar = None, None
if xhat is None:
U, s, Vt = scipy.linalg.svd(mtcm, full_matrices=False)
log.trace(f"s: {s}")
bad = np.where(s <= threshold * s[0])[0]
s[bad] = np.inf
for c in bad:
bad_col = Vt[c, :]
bad_col /= abs(bad_col).max()
bad_combination = " ".join(
[
f"{co}*{p}"
for (co, p) in reversed(sorted(zip(bad_col, params)))
if abs(co) > threshold
]
)
warn(
f"Parameter degeneracy; the following combination of parameters yields "
f"almost no change: {bad_combination}",
DegeneracyWarning,
)
xvar = np.dot(Vt.T / s, Vt)
xhat = np.dot(Vt.T, np.dot(U.T, mtcy) / s)
log.trace(f"norm: {norm}")
log.trace(f"xhat: {xhat}")
newres = residuals - np.dot(M, xhat)
# compute linearized chisq
# if full_cov:
# chi2 = np.dot(newres, scipy.linalg.cho_solve(cf, newres))
# else:
# chi2 = np.dot(newres, cinv * newres) + np.dot(xhat, phiinv * xhat)
# compute absolute estimates, normalized errors, covariance matrix
dpars = xhat / norm
errs = np.sqrt(np.diag(xvar)) / norm
covmat = (xvar / norm).T / norm
covariance_matrix_labels = {
param: (i, i + 1, unit)
for i, (param, unit) in enumerate(zip(params, units))
}
# covariance matrix is 2D and symmetric
covariance_matrix_labels = [covariance_matrix_labels] * covmat.ndim
self.parameter_covariance_matrix = CovarianceMatrix(
covmat, covariance_matrix_labels
)
self.parameter_correlation_matrix = CorrelationMatrix(
(covmat / errs).T / errs, covariance_matrix_labels
)
for pn in fitp.keys():
uind = params.index(pn) # Index of designmatrix
un = 1.0 / (units[uind]) # Unit in designmatrix
un *= u.s
pv, dpv = fitpv[pn] * fitp[pn].units, dpars[uind] * un
fitpv[pn] = np.longdouble((pv + dpv) / fitp[pn].units)
# NOTE We need some way to use the parameter limits.
fitperrs[pn] = errs[uind]
newparams = dict(zip(list(fitp.keys()), list(fitpv.values())))
self.set_params(newparams)
self.update_resids()
# self.minimize_func(list(fitpv.values()), *list(fitp.keys()))
# Update Uncertainties
self.set_param_uncertainties(fitperrs)
# Compute the noise realizations if possible
if not full_cov:
noise_dims = self.model.noise_model_dimensions(self.toas)
noise_resids = {}
for comp in noise_dims:
# The first column of designmatrix is "offset", add 1 to match
# the indices of noise designmatrix
p0 = noise_dims[comp][0] + ntmpar + 1
p1 = p0 + noise_dims[comp][1]
noise_resids[comp] = np.dot(M[:, p0:p1], xhat[p0:p1]) * u.s
if debug:
setattr(self.resids, f"{comp}_M", (M[:, p0:p1], xhat[p0:p1]))
setattr(self.resids, f"{comp}_M_index", (p0, p1))
self.resids.noise_resids = noise_resids
if debug:
setattr(self.resids, "norm", norm)
chi2 = self.resids.calc_chi2()
self.update_model(chi2)
return chi2
[docs]class WidebandTOAFitter(Fitter): # Is GLSFitter the best here?
"""A class to for fitting TOAs and other independent measured data.
Currently this fitter is only capable of fitting sets of TOAs in which every
TOA has a DM measurement, and it should be constructed as
``WidebandTOAFitter(toas, model)``.
Parameters
----------
fit_data: data object or a tuple of data objects.
The data to fit for. Generally this should be a single TOAs object containing
DM information for every TOA. If more than one data
objects are provided, the size of ``fit_data`` has to match the
``fit_data_names``. In this fitter, the first fit data should be a TOAs object.
model: a pint timing model instance
The initial timing model for fitting.
fit_data_names: list of str
The names of the data fit for.
additional_args: dict, optional
The additional arguments for making residuals.
"""
def __init__(
self,
fit_data,
model,
fit_data_names=["toa", "dm"],
track_mode=None,
additional_args={},
):
self.model_init = model
# Check input data and data_type
self.fit_data_names = fit_data_names
# convert the non tuple input to a tuple
if not isinstance(fit_data, (tuple, list)):
fit_data = [fit_data]
if not isinstance(fit_data[0], TOAs):
raise ValueError(
f"The first data set should be a TOAs object but is {fit_data[0]}."
)
if len(fit_data_names) == 0:
raise ValueError("Please specify the fit data.")
if len(fit_data) > 1 and len(fit_data_names) != len(fit_data):
raise ValueError(
"If one more data sets are provided, the fit "
"data have to match the fit data names."
)
self.fit_data = fit_data
if track_mode is not None:
if "toa" not in additional_args:
additional_args["toa"] = {}
additional_args["toa"]["track_mode"] = track_mode
self.additional_args = additional_args
# Get the makers for fitting parts.
self.reset_model()
self.resids_init = copy.deepcopy(self.resids)
self.designmatrix_makers = [
DesignMatrixMaker(data_resids.residual_type, data_resids.unit)
for data_resids in self.resids.residual_objs.values()
]
# Add noise design matrix maker
self.noise_designmatrix_maker = DesignMatrixMaker("toa_noise", u.s)
#
self.covariancematrix_makers = [
CovarianceMatrixMaker(data_resids.residual_type, data_resids.unit)
for data_resids in self.resids.residual_objs.values()
]
self.is_wideband = True
self.method = "General_Data_Fitter"
@property
def toas(self):
return self.fit_data[0]
[docs] def make_combined_residuals(self, add_args={}, model=None):
"""Make the combined residuals between TOA residual and DM residual."""
return WidebandTOAResiduals(
self.toas,
self.model if model is None else model,
toa_resid_args=add_args.get("toa", {}),
dm_resid_args=add_args.get("dm", {}),
)
[docs] def reset_model(self):
"""Reset the current model to the initial model."""
self.model = copy.deepcopy(self.model_init)
self.update_resids()
self.fitresult = []
[docs] def make_resids(self, model):
"""Update the residuals. Run after updating a model parameter."""
return self.make_combined_residuals(add_args=self.additional_args, model=model)
[docs] def get_designmatrix(self):
design_matrixs = []
fit_params = self.model.free_params
if len(self.fit_data) == 1:
design_matrixs.extend(
dmatrix_maker(self.fit_data[0], self.model, fit_params, offset=True)
for dmatrix_maker in self.designmatrix_makers
)
else:
design_matrixs.extend(
dmatrix_maker(self.fit_data[ii], self.model, fit_params, offset=True)
for ii, dmatrix_maker in enumerate(self.designmatrix_makers)
)
return combine_design_matrices_by_quantity(design_matrixs)
def get_noise_covariancematrix(self):
# TODO This needs to be more general
cov_matrixs = []
if len(self.fit_data) == 1:
cov_matrixs.extend(
cmatrix_maker(self.fit_data[0], self.model)
for cmatrix_maker in self.covariancematrix_makers
)
else:
cov_matrixs.extend(
cmatrix_maker(self.fit_data[ii], self.model)
for ii, cmatrix_maker in enumerate(self.covariancematrix_makers)
)
return combine_covariance_matrix(cov_matrixs)
[docs] def get_data_uncertainty(self, data_name, data_obj):
"""Get the data uncertainty from the data object.
Note
----
TODO, make this more general.
"""
func_map = {"toa": "get_errors", "dm": "get_dm_errors"}
error_func_name = func_map[data_name]
if hasattr(data_obj, error_func_name):
return getattr(data_obj, error_func_name)()
else:
raise ValueError("No method to access data error is provided.")
[docs] def scaled_all_sigma(self):
"""Scale all data's uncertainty.
If the function of scaled_`data`_sigma is not given, it will just
return the original data uncertainty.
"""
scaled_sigmas = []
sigma_units = []
for ii, fd_name in enumerate(self.fit_data_names):
func_name = f"scaled_{fd_name}_uncertainty"
sigma_units.append(self.resids.residual_objs[fd_name].unit)
if hasattr(self.model, func_name):
scale_func = getattr(self.model, func_name)
if len(self.fit_data) == 1:
scaled_sigmas.append(scale_func(self.fit_data[0]))
else:
scaled_sigmas.append(scale_func(self.fit_data[ii]))
else:
if len(self.fit_data) == 1:
original_sigma = self.get_data_uncertainty(
fd_name, self.fit_data[0]
)
else:
original_sigma = self.get_data_uncertainty(
fd_name, self.fit_data[ii]
)
scaled_sigmas.append(original_sigma)
scaled_sigmas_no_unit = []
for ii, scaled_sigma in enumerate(scaled_sigmas):
if hasattr(scaled_sigma, "unit"):
scaled_sigmas_no_unit.append(scaled_sigma.to_value(sigma_units[ii]))
else:
scaled_sigmas_no_unit.append(scaled_sigma)
return np.hstack(scaled_sigmas_no_unit)
[docs] def fit_toas(self, maxiter=1, threshold=0, full_cov=False, debug=False):
"""Carry out a generalized least-squares fitting procedure.
The algorithm here is essentially the same as used in
:func:`pint.fitter.GLSFitter.fit_toas`. See that function
for details.
Parameters
----------
maxiter: int
How many times to run the linear least-squares fit, re-evaluating
the derivatives at each step.
threshold: float
When to start discarding singular values. Default is 1-e14*max(M.shape).
full_cov: bool
full_cov determines which calculation is used.
"""
# Maybe change the name to do_fit?
# check that params of timing model have necessary components
self.model.validate()
self.model.validate_toas(self.toas)
chi2 = 0
for i in range(maxiter):
fitp = self.model.get_params_dict("free", "quantity")
fitpv = self.model.get_params_dict("free", "num")
fitperrs = self.model.get_params_dict("free", "uncertainty")
# Define the linear system
d_matrix = self.get_designmatrix()
M, params, units = (
d_matrix.matrix,
d_matrix.derivative_params,
d_matrix.param_units,
)
# Get residuals and TOA uncertainties in seconds
if i == 0:
self.update_resids()
# Since the residuals may not have the same unit. Thus the residual here
# has no unit.
residuals = self.resids._combined_resids
# get any noise design matrices and weight vectors
if not full_cov:
# We assume the fit date type is toa
Mn = self.noise_designmatrix_maker(self.toas, self.model)
phi = self.model.noise_model_basis_weight(self.toas)
phiinv = np.zeros(M.shape[1])
if Mn is not None and phi is not None:
phiinv = np.concatenate((phiinv, 1 / phi))
new_d_matrix = combine_design_matrices_by_param(d_matrix, Mn)
M, params, units = (
new_d_matrix.matrix,
new_d_matrix.derivative_params,
new_d_matrix.param_units,
)
ntmpar = len(fitp)
# normalize the design matrix
M, norm = normalize_designmatrix(M, params)
self.fac = norm
# compute covariance matrices
if full_cov:
cov = self.get_noise_covariancematrix().matrix
cf = scipy.linalg.cho_factor(cov)
cm = scipy.linalg.cho_solve(cf, M)
mtcm = np.dot(M.T, cm)
mtcy = np.dot(cm.T, residuals)
else:
phiinv /= norm**2
Nvec = self.scaled_all_sigma() ** 2
cinv = 1 / Nvec
mtcm = np.dot(M.T, cinv[:, None] * M)
mtcm += np.diag(phiinv)
mtcy = np.dot(M.T, cinv * residuals)
xhat, xvar = None, None
if threshold <= 0:
try:
c = scipy.linalg.cho_factor(mtcm)
xhat = scipy.linalg.cho_solve(c, mtcy)
xvar = scipy.linalg.cho_solve(c, np.eye(len(mtcy)))
except scipy.linalg.LinAlgError:
xhat, xvar = None, None
if xhat is None:
U, s, Vt = scipy.linalg.svd(mtcm, full_matrices=False)
bad = np.where(s <= threshold * s[0])[0]
s[bad] = np.inf
for c in bad:
bad_col = Vt[c, :]
bad_col /= abs(bad_col).max()
bad_combination = " ".join(
[
f"{co}*{p}"
for (co, p) in reversed(sorted(zip(bad_col, params)))
if abs(co) > threshold
]
)
warn(
f"Parameter degeneracy; the following combination of parameters yields "
f"almost no change: {bad_combination}",
DegeneracyWarning,
)
xvar = np.dot(Vt.T / s, Vt)
xhat = np.dot(Vt.T, np.dot(U.T, mtcy) / s)
newres = residuals - np.dot(M, xhat)
# compute linearized chisq
if full_cov:
chi2 = np.dot(newres, scipy.linalg.cho_solve(cf, newres))
else:
chi2 = np.dot(newres, cinv * newres) + np.dot(xhat, phiinv * xhat)
# compute absolute estimates, normalized errors, covariance matrix
dpars = xhat / norm
errs = np.sqrt(np.diag(xvar)) / norm
covmat = (xvar / norm).T / norm
# TODO: seems like doing this on every iteration is wasteful, and we should just do it once and then update the matrix
covariance_matrix_labels = {
param: (i, i + 1, unit)
for i, (param, unit) in enumerate(zip(params, units))
}
# covariance matrix is 2D and symmetric
covariance_matrix_labels = [covariance_matrix_labels] * covmat.ndim
self.parameter_covariance_matrix = CovarianceMatrix(
covmat, covariance_matrix_labels
)
self.parameter_correlation_matrix = CorrelationMatrix(
(covmat / errs).T / errs, covariance_matrix_labels
)
# self.covariance_matrix = covmat
# self.correlation_matrix = (covmat / errs).T / errs
for pn in fitp.keys():
uind = params.index(pn) # Index of designmatrix
# Here we use design matrix's label, so the unit goes to normal.
# instead of un = 1 / (units[uind])
un = units[uind]
pv, dpv = fitpv[pn] * fitp[pn].units, dpars[uind] * un
fitpv[pn] = np.longdouble((pv + dpv) / fitp[pn].units)
# NOTE We need some way to use the parameter limits.
fitperrs[pn] = errs[uind]
newparams = dict(zip(list(fitp.keys()), list(fitpv.values())))
self.set_params(newparams)
self.update_resids()
# self.minimize_func(list(fitpv.values()), *list(fitp.keys()))
# Update Uncertainties
self.set_param_uncertainties(fitperrs)
# Compute the noise realizations if possible
if not full_cov:
noise_dims = self.model.noise_model_dimensions(self.toas)
noise_resids = {}
for comp in noise_dims:
# The first column of designmatrix is "offset", add 1 to match
# the indices of noise designmatrix
p0 = noise_dims[comp][0] + ntmpar + 1
p1 = p0 + noise_dims[comp][1]
noise_resids[comp] = np.dot(M[:, p0:p1], xhat[p0:p1]) * u.s
if debug:
setattr(self.resids, f"{comp}_M", (M[:, p0:p1], xhat[p0:p1]))
setattr(self.resids, f"{comp}_M_index", (p0, p1))
self.resids.noise_resids = noise_resids
if debug:
setattr(self.resids, "norm", norm)
self.update_model(chi2)
return chi2
[docs]class LMFitter(Fitter):
[docs] def fit_toas(
self,
maxiter=50,
*,
min_chi2_decrease=1e-3,
lambda_factor_decrease=2,
lambda_factor_increase=3,
lambda_factor_invalid=10,
threshold=1e-14,
min_lambda=0.5,
debug=False,
):
current_state = self.create_state()
try:
try:
current_state.chi2
except ValueError as e:
raise ValueError("Initial configuration is invalid") from e
self.converged = False
lambda_ = min_lambda
for i in range(maxiter):
lf = lambda_ if lambda_ > min_lambda else 0
# Attempt: do not scale the phiinv penalty factor by lambda
A = current_state.mtcm + lf * np.diag(np.diag(current_state.mtcmplain))
b = current_state.mtcy
ill_conditioned = False
if threshold is None:
dx = scipy.linalg.solve(A, b, assume_a="pos")
else:
U, s, Vt = scipy.linalg.svd(A, full_matrices=False)
log.trace(
f"Iteration {i}: Condition number for lambda_ = {lambda_} is {s[0]/s[-1]}"
)
bad = np.where(s <= threshold * s[0])[0]
s[bad] = np.inf
for c in bad:
ill_conditioned = True
# FIXME: maybe don't stop while ill-conditioned? Always try increasing lambda?
bad_col = Vt[c, :]
bad_col /= abs(bad_col).max()
bad_combination = " ".join(
[
f"{co}*{p}"
for (co, p) in reversed(
sorted(zip(bad_col, current_state.params))
)
if abs(co) > threshold
]
)
warn(
f"Parameter degeneracy; the following combination of parameters yields "
f"almost no change: {bad_combination}",
DegeneracyWarning,
)
dx = np.dot(Vt.T, np.dot(U.T, b) / s)
step = dx / current_state.norm
# FIXME: catch problems due to non-invertibility?
# FIXME: predicted (linear) chi-squared decrease can check how well the
# derivative matches the function and guide changes in lambda_
# predicted_chi2 = current_state.predicted_chi2(dx)
log.trace(f"Iteration {i}: Trying step with lambda_ = {lambda_}")
new_state = current_state.take_step(step)
try:
chi2_decrease = current_state.chi2 - new_state.chi2
if chi2_decrease < -min_chi2_decrease:
lambda_ *= (
lambda_factor_invalid
if ill_conditioned
else lambda_factor_increase
)
log.trace(
f"Iteration {i}: chi2 increased from {current_state.chi2} "
f"to {new_state.chi2} increasing lambda to {lambda_}"
)
elif chi2_decrease < 0:
log.info(
f"Iteration {i}: chi2 increased but only by {-chi2_decrease}, stopping."
)
self.converged = True
break
elif chi2_decrease < min_chi2_decrease:
log.debug(
f"Iteration {i}: chi2 decreased only by {chi2_decrease}, updating "
f"state and stopping."
)
current_state = new_state
self.converged = True
break
else:
lambda_ = max(lambda_ / lambda_factor_decrease, min_lambda)
log.debug(
f"Iteration {i}: Updating state, chi2 goes down by {chi2_decrease} "
f"from {current_state.chi2} "
f"to {new_state.chi2}; decreasing lambda to "
f"{lambda_ if lambda_ > min_lambda else 0}"
)
current_state = new_state
except InvalidModelParameters as e:
lambda_ *= lambda_factor_invalid
log.debug(
f"Iteration {i}: Step too aggressive, increasing lambda_ "
f"to {lambda_}: {e}"
)
else:
log.warning(
f"Maximum number of iterations ({maxiter}) reached, stopping "
f"without convergence."
)
self.iterations = i
except KeyboardInterrupt:
# could be a finally I suppose? but I'm not sure we want to update if something
# seriou went wrong.
log.info("KeyboardInterrupt detected, updating Fitter")
self.update_from_state(current_state, debug=debug)
raise
self.update_from_state(current_state, debug=debug)
return self.converged
[docs]class WidebandLMFitter(LMFitter):
"""Fitter for wideband data based on Levenberg-Marquardt.
This should carry out a more reliable fitting process than the plain
WidebandTOAFitter, and a more efficient one than WidebandDownhillFitter.
Unfortunately it doesn't.
"""
def __init__(self, toas, model, track_mode=None, residuals=None, add_args=None):
self.method = "downhill_wideband"
self.full_cov = False
self.threshold = 0
self.add_args = {} if add_args is None else add_args
super().__init__(
toas=toas, model=model, residuals=residuals, track_mode=track_mode
)
self.is_wideband = True
def make_resids(self, model):
return WidebandTOAResiduals(
self.toas,
model,
toa_resid_args=self.add_args.get("toa", {}),
dm_resid_args=self.add_args.get("dm", {}),
)
def create_state(self):
return WidebandState(
self, self.model, full_cov=self.full_cov, threshold=self.threshold
)
[docs] def fit_toas(self, maxiter=50, full_cov=False, debug=False, **kwargs):
self.full_cov = full_cov
# FIXME: set up noise residuals et cetera
return super().fit_toas(maxiter=maxiter, debug=debug, **kwargs)
def update_from_state(self, state, debug=False):
# Nicer not to keep this if we have a choice, it introduces reference cycles
self.current_state = state
self.model = state.model
self.resids = state.resids
self.parameter_covariance_matrix = state.parameter_covariance_matrix
self.errors = np.sqrt(np.diag(self.parameter_covariance_matrix.matrix))
for p, e in zip(state.params, self.errors):
try:
log.trace(f"Setting {getattr(self.model, p)} uncertainty to {e}")
pm = getattr(self.model, p)
except AttributeError:
if p != "Offset":
log.warning(f"Unexpected parameter {p}")
else:
pm.uncertainty = e * pm.units
# self.parameter_correlation_matrix = (
# self.parameter_covariance_matrix / self.errors
# ).T / self.errors
self.parameter_correlation_matrix = CorrelationMatrix(
(self.parameter_covariance_matrix.matrix / self.errors).T / self.errors,
self.parameter_covariance_matrix.axis_labels,
)
self.update_model(state.chi2)
# Compute the noise realizations if possible
if not self.full_cov:
noise_dims = self.model.noise_model_dimensions(self.toas)
noise_resids = {}
ntmpar = len(self.model.free_params)
for comp in noise_dims:
# The first column of designmatrix is "offset", add 1 to match
# the indices of noise designmatrix
p0 = noise_dims[comp][0] + ntmpar + 1
p1 = p0 + noise_dims[comp][1]
noise_resids[comp] = np.dot(state.M[:, p0:p1], state.xhat[p0:p1]) * u.s
if debug:
setattr(
self.resids, f"{comp}_M", (state.M[:, p0:p1], state.xhat[p0:p1])
)
setattr(self.resids, f"{comp}_M_index", (p0, p1))
self.resids.noise_resids = noise_resids
if debug:
setattr(self.resids, "norm", state.norm)