"""A wrapper around pulsar functions for `pintk` to use.
This object will be shared between widgets in the main frame
and will contain the pre/post fit model, toas,
pre/post fit residuals, and other useful information.
self.selected_toas = selected toas, self.all_toas = all toas in tim file
"""
import copy
import astropy.units as u
import numpy as np
import pint.fitter
import pint.models
from pint.pulsar_mjd import Time
from pint.simulation import (
make_fake_toas_uniform,
calculate_random_models,
zero_residuals,
)
from pint.residuals import Residuals
from pint.toa import get_TOAs, merge_TOAs
from pint.utils import FTest, akaike_information_criterion
import pint.logging
from loguru import logger as log
plot_labels = [
"pre-fit",
"post-fit",
"mjd",
"year",
"orbital phase",
"serial",
"day of year",
"frequency",
"TOA error",
"rounded MJD",
"model DM",
"WB DM",
"WB DM res",
"WB DM err",
"elongation",
]
# Some parameters we do not want to add a fitting checkbox for:
nofitboxpars = [
"PSR",
"START",
"FINISH",
"POSEPOCH",
"PEPOCH",
"DMEPOCH",
"EPHVER",
"TZRMJD",
"TZRFRQ",
"TRES",
"PLANET_SHAPIRO",
]
[docs]class Pulsar:
"""Wrapper class for a pulsar.
Contains the toas, model, residuals, and fitter
"""
def __init__(self, parfile=None, timfile=None, ephem=None, fitter="GLSFitter"):
super().__init__()
log.info(f"Loading pulsar parfile: {str(parfile)}")
if parfile is None or timfile is None:
raise ValueError("No valid pulsar model and/or TOAs to load")
self.parfile = parfile
self.timfile = timfile
self.prefit_model = pint.models.get_model(
self.parfile,
allow_tcb=True,
allow_T2=True,
)
if ephem is not None:
log.info(
f"Overriding model ephemeris {self.prefit_model.EPHEM.value} with {ephem}"
)
self.prefit_model.EPHEM.value = ephem
self.all_toas = get_TOAs(
self.timfile,
model=self.prefit_model,
usepickle=True,
)
self.all_toas.table.sort("index")
self.all_toas.get_clusters(add_column=True)
# Make sure that if we used a model, that any phase jumps from
# the parfile have their flags updated in the TOA table
if "PhaseJump" in self.prefit_model.components:
self.prefit_model.jump_params_to_flags(self.all_toas)
# turns pre-existing jump flags in toas.table['flags'] into parameters in parfile
self.prefit_model.jump_flags_to_params(self.all_toas)
self.selected_toas = copy.deepcopy(self.all_toas)
print("The prefit model as a parfile:")
print(self.prefit_model.as_parfile())
# adds extra prefix params for fitting
self.add_model_params()
self.all_toas.print_summary()
self.use_pulse_numbers = False
self.fitted = False
self.update_resids()
print(
"RMS pre-fit PINT residuals are %.3f us\n"
% self.prefit_resids.rms_weighted().to(u.us).value
)
# Set of indices from original list that are deleted
self.deleted = set([])
if fitter == "notdownhill":
self.fit_method = self.getDefaultFitter(downhill=False)
log.info(
f"Since wideband={self.all_toas.wideband} and correlated={self.prefit_model.has_correlated_errors}, selecting fitter={self.fit_method}"
)
elif fitter == "downhill":
self.fit_method = self.getDefaultFitter(downhill=True)
log.info(
f"Since wideband={self.all_toas.wideband} and correlated={self.prefit_model.has_correlated_errors}, selecting Downhill fitter={self.fit_method}"
)
else:
self.fit_method = fitter
self.fitter = None
self.stashed = None # for temporarily stashing some TOAs
self.faketoas1 = None # for random models
self.faketoas = None # for random models
@property
def name(self):
return getattr(self.prefit_model, "PSR").value
def __getitem__(self, key):
try:
return getattr(self.prefit_model, key)
except AttributeError:
log.error(f"Parameter {key} was not found in pulsar model {self.name}")
return None
def __contains__(self, key):
return key in self.prefit_model.params
def reset_model(self):
self.prefit_model = pint.models.get_model(self.parfile)
self.add_model_params()
self.postfit_model = None
self.postfit_resids = None
self.fitted = False
self.update_resids()
def reset_TOAs(self):
self.all_toas = get_TOAs(self.timfile, model=self.prefit_model, usepickle=True)
# Make sure that if we used a model, that any phase jumps from
# the parfile have their flags updated in the TOA table
if "PhaseJump" in self.prefit_model.components:
self.prefit_model.jump_params_to_flags(self.all_toas)
# turns pre-existing jump flags in toas.table['flags'] into parameters in parfile
self.prefit_model.jump_flags_to_params(self.all_toas)
self.selected_toas = copy.deepcopy(self.all_toas)
self.deleted = set([])
self.stashed = None
self.update_resids()
def resetAll(self):
self.prefit_model = pint.models.get_model(self.parfile)
self.postfit_model = None
self.postfit_resids = None
self.fitted = False
self.use_pulse_numbers = False
self.reset_TOAs()
def _delete_TOAs(self, toa_table):
del_inds = np.in1d(toa_table["index"], np.array(list(self.deleted)))
return toa_table[~del_inds] if del_inds.sum() < len(toa_table) else None
def delete_TOAs(self, indices, selected):
# note: indices should be a list or an array
self.deleted |= set(indices) # update the deleted indices
if selected is not None:
self.selected_toas.table = self._delete_TOAs(self.selected_toas.table)
# Now delete from all_toas
self.all_toas.table = self._delete_TOAs(self.all_toas.table)
if self.selected_toas.table is None: # all selected were deleted
self.selected_toas = copy.deepcopy(self.all_toas)
selected = np.zeros(self.selected_toas.ntoas, dtype=bool)
else:
# Make a new selected list by adding a value if the table
# index at that position is not in the new indices to
# delete, with a value that is the same as the previous
# selected array
newselected = [
sel
for idx, sel in zip(self.all_toas.table["index"], selected)
if idx not in indices
]
selected = np.asarray(newselected, dtype=bool)
self.selected_toas = self.all_toas[selected]
# delete the TOAs from the stashed list also
if self.stashed:
self.stashed.table = self._delete_TOAs(self.stashed.table)
return selected
def update_resids(self):
# update the pre and post fit residuals using all_toas
track_mode = "use_pulse_numbers" if self.use_pulse_numbers else None
self.prefit_resids = Residuals(
self.all_toas, self.prefit_model, subtract_mean=False, track_mode=track_mode
)
if self.selected_toas.ntoas and self.selected_toas.ntoas != self.all_toas.ntoas:
self.selected_prefit_resids = Residuals(
self.selected_toas,
self.prefit_model,
subtract_mean=False,
track_mode=track_mode,
)
else:
self.selected_prefit_resids = self.prefit_resids
if self.fitted:
self.postfit_resids = Residuals(
self.all_toas,
self.postfit_model,
subtract_mean=False,
track_mode=track_mode,
)
if (
self.selected_toas.ntoas > 0
and self.selected_toas.ntoas != self.all_toas.ntoas
):
self.selected_postfit_resids = Residuals(
self.selected_toas,
self.postfit_model,
subtract_mean=False,
track_mode=track_mode,
)
else:
self.selected_postfit_resids = self.postfit_resids
[docs] def orbitalphase(self):
"""
For a binary pulsar, calculate the orbital phase. Otherwise, return
an array of unitless quantities of zeros
"""
if not self.prefit_model.is_binary:
log.warning("This is not a binary pulsar")
return u.Quantity(np.zeros(self.all_toas.ntoas))
toas = self.all_toas
if self.fitted:
phase = self.postfit_model.orbital_phase(toas, anom="mean")
else:
phase = self.prefit_model.orbital_phase(toas, anom="mean")
return phase / (2 * np.pi * u.rad)
[docs] def dayofyear(self):
"""
Return the day of the year for all the TOAs of this pulsar
"""
t = Time(self.all_toas.get_mjds(), format="mjd")
year = Time(np.floor(t.decimalyear), format="decimalyear")
return np.asarray(t.mjd - year.mjd) << u.day
[docs] def year(self):
"""
Return the decimal year for all the TOAs of this pulsar
"""
t = Time(self.all_toas.get_mjds(), format="mjd")
return np.asarray(t.decimalyear) << u.year
[docs] def add_model_params(self):
"""This automatically adds the next available unfit prefix
parameters to the model so they show up on the GUI
"""
m = self.prefit_model
# Add next spin freq deriv
if "Spindown" in m.components:
c = m.components["Spindown"]
n = len(c.get_prefix_mapping_component("F"))
if f"F{n-1}" in m.free_params and not hasattr(m, f"F{n}"):
c.add_param(m.F0.new_param(n), setup=True)
log.debug(f"Adding F{n} to prefit model")
p = getattr(m, f"F{n}")
p.quantity = 0.0 * p.units
p.frozen = True
# Add next orbital freq deriv
if "BinaryBT" in m.components:
c = m.components["BinaryBT"]
n = len(c.get_prefix_mapping_component("FB"))
if f"FB{n-1}" in m.free_params and not hasattr(m, f"FB{n}"):
c.add_param(m.FB0.new_param(n), setup=True)
log.debug(f"Adding FB{n} to prefit model")
p = getattr(m, f"FB{n}")
p.quantity = 0.0 * p.units
p.frozen = True
# Add dispersion expansion component
if "DispersionDM" in m.components:
c = m.components["DispersionDM"]
n = len(c.get_prefix_mapping_component("DM")) + 1
# DM1 is always added, but might be unset
if n == 2 and m.DM1.value is None:
p = m.DM1
p.quantity = 0.0 * p.units
p.frozen = True
if f"DM{n-1}" in m.free_params and not hasattr(m, f"DM{n}"):
c.add_param(m.DM1.new_param(n), setup=True)
log.debug(f"Adding DM{n} to prefit model")
p = getattr(m, f"DM{n}")
p.quantity = 0.0 * p.units
p.frozen = True
m.setup() # Not sure if this is necessary
m.validate()
[docs] def write_fit_summary(self):
"""
Summarize fitting results
"""
if self.fitted:
wrms = self.selected_postfit_resids.rms_weighted()
print("Post-Fit Chi2: %.8g" % self.selected_postfit_resids.chi2)
print("Post-Fit DOF: %8d" % self.selected_postfit_resids.dof)
print(
"Post-Fit Reduced-Chi2: %.8g"
% self.selected_postfit_resids.reduced_chi2
)
print("Post-Fit Weighted RMS: %.8g us" % wrms.to(u.us).value)
print("------------------------------------")
print(
"%19s %24s\t%24s\t%16s %16s %16s"
% (
"Parameter",
"Pre-Fit",
"Post-Fit",
"Uncertainty",
"Difference",
"Diff/Unc",
)
)
print("-" * 132)
for key in self.prefit_model.free_params:
line = "%8s " % key
pre = getattr(self.prefit_model, key)
post = getattr(self.postfit_model, key)
line += "%10s " % ("" if post.units is None else str(post.units))
if post.quantity is not None:
line += "%24s\t" % pre.str_quantity(pre.quantity)
line += "%24s\t" % post.str_quantity(post.quantity)
try:
line += "%16.8g " % post.uncertainty.value
except Exception:
line += "%18s" % ""
diff = post.value - pre.value
line += "%16.8g " % diff
if pre.uncertainty is not None and pre.uncertainty.value != 0.0:
line += "%16.8g" % (diff / pre.uncertainty.value)
print(line)
else:
log.warning("Pulsar has not been fitted yet!")
[docs] def add_phase_wrap(self, selected, phase):
"""
Add a phase wrap to selected points in the TOAs object
Turn on pulse number tracking in the model, if it isn't already
:param selected: boolean array to apply to toas, True = selected toa
:param phase: phase difference to be added, i.e. -0.5, +2, etc.
"""
# Check if pulse numbers are in table already, if not, make the column
if (
"pulse_number" not in self.all_toas.table.colnames
or "pulse_number" not in self.selected_toas.table.colnames
):
if self.fitted:
self.all_toas.compute_pulse_numbers(self.postfit_model)
self.selected_toas.compute_pulse_numbers(self.postfit_model)
else:
self.all_toas.compute_pulse_numbers(self.prefit_model)
self.selected_toas.compute_pulse_numbers(self.prefit_model)
if (
"delta_pulse_number" not in self.all_toas.table.colnames
or "delta_pulse_number" not in self.selected_toas.table.colnames
):
self.all_toas.table["delta_pulse_number"] = np.zeros(self.all_toas.ntoas)
self.selected_toas.table["delta_pulse_number"] = np.zeros(
self.selected_toas.ntoas
)
# add phase wrap and update
self.all_toas.table["delta_pulse_number"][selected] += phase
self.use_pulse_numbers = True
self.update_resids()
[docs] def add_jump(self, selected):
"""
jump the toas selected or un-jump them if already jumped
:param selected: boolean array to apply to toas, True = selected toa
"""
# TODO: split into two functions
if "PhaseJump" not in self.prefit_model.components:
# if no PhaseJump component, add one
log.info("PhaseJump component added")
a = pint.models.jump.PhaseJump()
a.setup()
self.prefit_model.add_component(a)
retval = self.prefit_model.add_jump_and_flags(
self.all_toas.table["flags"][selected]
)
if self.fitted:
self.postfit_model.add_component(a)
log.info(f"New jump {retval} added for {selected.sum()} toas.")
return retval
# if gets here, has at least one jump param already
# and iif it doesn't overlap or cancel, add the param
numjumps = self.prefit_model.components["PhaseJump"].get_number_of_jumps()
if numjumps == 0:
log.warning(
"There are no jumps (maskParameter objects) in PhaseJump. Please delete the PhaseJump object and try again. "
)
return None
# delete the jump ad flags if the selected TOAs exactly overlap;
# else just delete the jump flag from the selected TOAs
for num in range(1, numjumps + 1):
# create boolean array corresponding to TOAs to be jumped
toas_jumped = [
"jump" in dict.keys() and str(num) in dict["jump"]
for dict in self.all_toas.table["flags"]
]
if np.array_equal(toas_jumped, selected):
# if current jump exactly matches selected, remove it
self.prefit_model.delete_jump_and_flags(
self.all_toas.table["flags"], num
)
if self.fitted:
self.postfit_model.delete_jump_and_flags(None, num)
log.info("removed param", f"JUMP{str(num)}")
return toas_jumped
# Has to be some overlap between jumps and selected TOAs
elif np.any(toas_jumped & selected):
# if not, then they don't exactly match, delete the common subset
jumped_selected = toas_jumped & selected
# Post fit model and prefit model share the same TOA table, so as long as we
# don't delete the jump altogether, modifying prefit model table flags is fine.
self.prefit_model.delete_not_all_jump_toas(
self.all_toas.table["flags"][jumped_selected], num
)
log.info(
f"Removed existing jump JUMP{str(num)} from {jumped_selected.astype(int).sum()} TOAs"
)
return list(jumped_selected)
# if here, then doesn't match anything
# add jump flags to selected TOAs at their perspective indices in the TOA tables
retval = self.prefit_model.add_jump_and_flags(
self.all_toas.table["flags"][selected]
)
log.info(f"New jump {retval} added for {selected.sum()} toas.")
if (
self.fitted
and self.prefit_model.components["PhaseJump"]
!= self.postfit_model.components["PhaseJump"]
):
param = self.prefit_model.components[
"PhaseJump"
].get_jump_param_objects() # array of jump objects
self.postfit_model.add_param_from_top(
param[-1], "PhaseJump"
) # add last (newest) jump
getattr(self.postfit_model, param[-1].name).frozen = False
self.postfit_model.components["PhaseJump"].setup()
return retval
def getDefaultFitter(self, downhill=False):
if self.all_toas.wideband:
return "WidebandDownhillFitter" if downhill else "WidebandTOAFitter"
if self.prefit_model.has_correlated_errors:
return "DownhillGLSFitter" if downhill else "GLSFitter"
else:
return "DownhillWLSFitter" if downhill else "WLSFitter"
def print_chi2(self, selected):
# Select all the TOAs if none are explicitly set
if not np.any(selected):
selected = ~selected
if self.fitted:
self.prefit_model = self.postfit_model
self.prefit_resids = self.postfit_resids
self.add_model_params()
self.update_resids()
wrms = self.selected_resids.rms_weighted()
print("------------------------------------")
print("Selected TOAs: %8d" % self.selected_toas.ntoas)
print("Selected Chi2: %.8g" % self.selected_resids.chi2)
print(
"Selected Chi2/Ntoa: %.8g"
% (self.selected_resids.chi2 / self.selected_toas.ntoas)
)
print("Selected Weighted RMS: %.8g us" % wrms.to(u.us).value)
print("------------------------------------")
[docs] def fit(self, selected, iters=4, compute_random=False):
"""
Run a fit using the specified fitter
"""
# Select all the TOAs if none are explicitly set
if not np.any(selected):
selected = ~selected
if self.fitted:
self.prefit_model = self.postfit_model
self.prefit_resids = self.postfit_resids
self.add_model_params()
self.update_resids()
# Have to change the fitter for each fit since TOAs and models change
log.info(f"Using {self.fit_method}")
self.fitter = getattr(pint.fitter, self.fit_method)(
self.selected_toas, self.prefit_model
)
wrms = self.selected_prefit_resids.rms_weighted()
print("\n------------------------------------")
print(" Pre-Fit Chi2: %.8g" % self.selected_prefit_resids.chi2)
print(" Pre-Fit reduced-Chi2: %.8g" % self.selected_prefit_resids.reduced_chi2)
print(" Pre-Fit Weighted RMS: %.8g us" % wrms.to(u.us).value)
print("------------------------------------")
# Do the actual fit and mark things as being fit
self.fitter.fit_toas(maxiter=iters)
self.fitter.update_model()
self.postfit_model = self.fitter.model
self.fitted = True
# Zero out all of the "delta_pulse_numbers" if they are set
if np.any(self.all_toas.table["delta_pulse_number"]):
self.all_toas.table["delta_pulse_number"] = np.zeros(self.all_toas.ntoas)
self.selected_toas.table["delta_pulse_number"] = np.zeros(
self.selected_toas.ntoas
)
# Re-calculate the pulse numbers here
self.all_toas.compute_pulse_numbers(self.postfit_model)
self.selected_toas.compute_pulse_numbers(self.postfit_model)
# Compute the residuals using correct pulse numbers
self.update_resids()
# Need this since it isn't updated using self.fitter.update_model()
self.fitter.model.CHI2.value = self.selected_postfit_resids.chi2
# And print the summary
self.write_fit_summary()
# Check to see if we should calculate an F-test
if (
hasattr(self, "lastfit")
and (len(self.postfit_model.free_params) > len(self.lastfit["free_params"]))
and (self.lastfit["ntoas"] == self.fitter.toas.ntoas)
):
prob = FTest(
self.lastfit["chi2"],
self.lastfit["dof"],
self.selected_postfit_resids.chi2,
self.selected_postfit_resids.dof,
)
new_params = set(self.postfit_model.free_params) - set(
self.lastfit["free_params"]
)
print(
f"F-test comparing post- to pre-fit models for addition of {new_params}:\n"
f" P = {prob:.3g} that the improvement is due to noise."
)
# plot the prefit without jumps
pm_no_jumps = copy.deepcopy(self.prefit_model)
for param in pm_no_jumps.params:
if param.startswith("JUMP"):
getattr(pm_no_jumps, param).value = 0.0
getattr(pm_no_jumps, param).frozen = True
self.prefit_resids_no_jumps = Residuals(self.all_toas, pm_no_jumps)
self.update_resids()
self.prefit_resids_no_jumps = self.prefit_resids
# Store some key params for possible F-testing
self.lastfit = {
"free_params": self.fitter.model.free_params,
"dof": self.selected_postfit_resids.dof,
"chi2": self.selected_postfit_resids.chi2,
"ntoas": self.fitter.toas.ntoas,
}
# adds extra prefix params for fitting
self.add_model_params()
print(
f"Akaike information criterion = {akaike_information_criterion(self.fitter.model, self.fitter.toas)}"
)
[docs] def random_models(self, selected):
"""Compute and plot random models"""
log.info("Computing random models based on parameter covariance matrix.")
if [p for p in self.postfit_model.free_params if p.startswith("DM")]:
log.warning(
"Fitting for DM while using random models can cause strange behavior."
)
# These are the currently selected TOAs in the fit
sim_sel = copy.deepcopy(self.selected_toas)
# These are single TOAs from each cluster of TOAs
inds = np.zeros(sim_sel.ntoas, dtype=bool)
inds[np.unique(sim_sel.get_clusters(), return_index=True)[1]] |= True
sim_sel = sim_sel[inds]
# Get the range of MJDs we are using in the fit
mjds = sim_sel.get_mjds().value
minselMJD, maxselMJD = mjds.min(), mjds.max()
extra = 0.1 # Fraction beyond TOAs to plot or calculate random models
if self.faketoas1 is None:
mjds = self.all_toas.get_mjds().value
minallMJD, maxallMJD = mjds.min(), mjds.max()
spanMJD = maxallMJD - minallMJD
# Select appropriate number of fake TOAs to generate.
if spanMJD < 1000:
Ntoas = 400
elif spanMJD < 4000:
Ntoas = 750
else:
Ntoas = 1500
log.debug(
f"Generating {Ntoas} fake TOAs for the random models over MJD {minallMJD - extra * spanMJD} to {minallMJD + extra * spanMJD}"
)
# By default we will use TOAs from the TopoCenter. This gets done only once.
self.faketoas1 = make_fake_toas_uniform(
minallMJD - extra * spanMJD,
maxallMJD + extra * spanMJD,
Ntoas,
self.postfit_model,
obs="coe",
freq=1 * u.THz, # effectively infinite frequency
include_bipm=sim_sel.clock_corr_info["include_bipm"],
include_gps=sim_sel.clock_corr_info["include_gps"],
)
self.faketoas1.compute_pulse_numbers(self.postfit_model)
self.faketoas1.get_clusters(add_column=True)
# Combine our TOAs
toas = merge_TOAs([sim_sel, self.faketoas1])
zero_residuals(toas, self.postfit_model)
# Get a selection array to select the non-fake TOAs
refs = np.asarray(toas.get_flag_value("name")[0]) != "fake"
# Compute the new random timing models
rs = calculate_random_models(
self.fitter,
toas,
Nmodels=15,
keep_models=False,
return_time=True,
)
# Get a selection array for the fake TOAs that covers the fit TOAs (plus extra)
mjds = toas.get_mjds().value
spanMJD = maxselMJD - minselMJD
toplot = np.bitwise_and(
mjds > (minselMJD - extra * spanMJD), mjds < (maxselMJD + extra * spanMJD)
)
# This is the mean of the reference resids for the selected TOAs
if selected.sum(): # shorthand for having some selected
ref_mean = self.postfit_resids.time_resids[selected][inds].mean()
else:
ref_mean = self.postfit_resids.time_resids[inds].mean()
# This is the means of the corresponding resids from the random models
ran_mean = rs[:, refs].mean(axis=1)
# Now adjust each set of random resids so that the ran_mean == ref_mean
rs -= ran_mean[:, np.newaxis]
rs += ref_mean
# And store the key things for plotting
self.faketoas = toas
self.random_resids = rs