"""Markov Chain Monte Carlo fitting."""
import copy
import matplotlib.pyplot as plt
import numpy as np
from astropy.table import vstack
from scipy.stats import norm, uniform
from loguru import logger as log
import pint.plot_utils as plot_utils
from pint.eventstats import hm, hmw
from pint.fitter import Fitter
from pint.models.priors import Prior
from pint.residuals import Residuals
from pint.templates.lctemplate import LCTemplate
__all__ = [
"MCMCFitter",
"MCMCFitterAnalyticTemplate",
"MCMCFitterBinnedTemplate",
"CompositeMCMCFitter",
]
[docs]def concat_toas(toas):
"""Concatenate a list of TOAs objects into a single TOAs object"""
if len(toas) == 1:
return toas[0]
ts = copy.deepcopy(toas[0])
for t in toas[1:]:
print(len(ts.table), len(t.table))
ts.table = vstack([ts.table, t.table])
print("\t%d" % len(ts.table))
ts.table = ts.table.group_by("obs")
return ts
[docs]def lnprior_basic(ftr, theta):
"""Basic implementation of log prior.
This will work for both analytic and
binned templates, including when the template parameters are part of the
search space.
"""
theta_model = ftr.get_model_parameters(theta)
theta_templ = ftr.get_template_parameters(theta)
lnsum = 0.0
for val, key in zip(theta_model, ftr.fitkeys):
lnsum += getattr(ftr.model, key).prior_pdf(val, logpdf=True)
# Loop over template parameters here: hard coded uniform for now
if theta_templ is not None:
for val, bounds in zip(theta_templ, ftr.tbounds):
if (val < bounds[0]) or (val > bounds[1]):
return -np.inf
return lnsum
[docs]def lnlikelihood_basic(ftr, theta):
"""The log of the likelihood function, basic implementation.
Assumes that the phase is the last parmameter in the parameter list
"""
ftr.set_parameters(theta)
phases = ftr.get_event_phases()
phss = phases.astype(np.float64)
phss[phss < 0] += 1.0
phss[phss >= 1] -= 1.0
probs = ftr.get_template_vals(phss)
if ftr.weights is None:
return np.log(probs).sum()
else:
return np.log(ftr.weights * probs + 1.0 - ftr.weights).sum()
[docs]def lnlikelihood_chi2(ftr, theta):
ftr.set_parameters(theta)
return -Residuals(toas=ftr.toas, model=ftr.model).calc_chi2()
[docs]def set_priors_basic(ftr, priorerrfact=10.0):
"""Basic method to set priors on parameters in the model.
This adds a gaussian prior on each parameter with width equal to
the par file uncertainty * priorerrfact and then puts in some special cases
"""
fkeys, fvals, ferrs = ftr.fitkeys, ftr.fitvals, ftr.fiterrs
for key, val, err in zip(fkeys, fvals, ferrs):
if key in ["SINI", "E", "ECC"]:
getattr(ftr.model, key).prior = Prior(uniform(0.0, 1.0))
elif key == "PX":
getattr(ftr.model, key).prior = Prior(uniform(0.0, 10.0))
elif key.startswith("GLPH"):
getattr(ftr.model, key).prior = Prior(uniform(-0.4, 1.0))
else:
if err == 0 and not getattr(ftr.model, key).frozen:
ftr.priors_set = False
raise ValueError(
f"Parameter {key} does not have uncertainty in par file"
)
getattr(ftr.model, key).prior = Prior(
norm(loc=float(val), scale=float(err * priorerrfact))
)
ftr.priors_set = True
[docs]class MCMCFitter(Fitter):
"""A class for Markov-Chain Monte Carlo optimization style-fitting
This fitting is similar to that implemented in event_optimize.py
Parameters
----------
toas : pint.toas.TOAs
model
sampler : pint.sampler.MCMCSampler
template : object or None
A template profile, for example, of a gaussian pulse.
If template is none, then all template methods will do nothing,
or raise an error, or return None. If a template is set, it is
assumed that a subclass is being used.
lnprior : callable
The log prior function - defaults to lnprior above
lnlike : callable
The log likelihood function - defaults to lnlikelihood above
setpriors : callable
The function for setting the priors on model parameters
weights : optional
Weights for likelihood calculations
minMJD : optional
Minimium MJD in dataset (used sometimes for get_initial_pos)
maxMJD : optional
Maximum MJD in dataset (used sometimes for get_initial_pos)
"""
def __init__(self, toas, model, sampler, **kwargs):
super().__init__(toas, model, track_mode=kwargs.get("track_mode", None))
self.toas = toas
self.model_init = model
self.use_resids = kwargs.get("resids", True)
if self.use_resids:
self.resids_init = Residuals(toas=toas, model=model)
self.reset_model()
else:
self.model = model
self.method = "MCMC"
self.sampler = sampler
self.lnprior = kwargs.get("lnprior", lnprior_basic)
self.lnlikelihood = kwargs.get("lnlike", lnlikelihood_basic)
self.set_priors = kwargs.get("setpriors", set_priors_basic)
# Default values for these arguments were taken from event_optimize.py
self.weights = kwargs.get("weights", None)
self.minMJD = kwargs.get("minMJD", 40000)
self.maxMJD = kwargs.get("maxMJD", 60000)
self.fitkeys, self.fitvals, self.fiterrs = self.generate_fit_keyvals()
self.n_fit_params = len(self.fitvals)
template = kwargs.get("template", None)
if template is not None:
self.set_template(template)
else:
self.template = None
log.info("Fit Keys:\t%s" % (self.fitkeys))
log.info("Fit Vals:\t%s" % (self.fitvals))
self.numcalls = 0
self.maxpost = -np.inf
self.maxpost_fitvals = self.fitvals
self.priors_set = False
[docs] def set_template(self, template):
"""
Sets template and template metadata. Implementation depends on whether
template is analytic or binned.
"""
raise NotImplementedError
[docs] def get_template_vals(self, phases):
"""
Use the template (if it exists) to get probabilities for given phases
"""
if self.template is None:
return None
raise NotImplementedError
[docs] def clip_template_params(self, pos):
"""
If template parameters are changeable, ensure that they are within bounds
Any passing the template bounds will be clipped to the edges.
If template is not being fit to, then this does nothing
"""
if self.template is None:
return pos
raise NotImplementedError
[docs] def get_model_parameters(self, theta):
"""
Split the parameters related to the model
"""
if self.template is None:
return theta
raise NotImplementedError
[docs] def get_template_parameters(self, theta):
"""
Split the parameters related to the template
"""
if self.template is None:
return None
raise NotImplementedError
[docs] def get_parameters(self):
"""
Get all parameters for this fitter
"""
if self.template is None:
return self.fitvals
raise NotImplementedError
[docs] def get_parameter_names(self):
"""
Get parameter names for this fitter
"""
if self.template is None:
return self.fitkeys
raise NotImplementedError
[docs] def set_parameters(self, theta):
"""
Set timing and template parameters as necessary
"""
if self.template is None:
self.set_params(dict(zip(self.fitkeys, theta)))
else:
raise NotImplementedError
[docs] def get_errors(self):
"""
Get errors associated with all fit parameters
"""
if self.template is None:
return self.fiterrs
raise NotImplementedError
[docs] def get_fit_keyvals(self):
"""
Basic getter, useful in event_optimize script
"""
return self.fitkeys, self.fitvals, self.fiterrs
[docs] def generate_fit_keyvals(self):
"""Read the model to determine fitted keys and their values and errors
from the par file
"""
fitkeys = [p for p in self.model.params if not getattr(self.model, p).frozen]
fitvals = []
fiterrs = []
for p in fitkeys:
fitvals.append(getattr(self.model, p).value)
fiterrs.append(getattr(self.model, p).uncertainty_value)
return fitkeys, np.asarray(fitvals), np.asarray(fiterrs)
def get_weights(self):
return self.weights
[docs] def get_event_phases(self):
"""
Return pulse phases based on the current model
"""
phases = self.model.phase(self.toas).frac
# ensure all positive
return np.where(phases < 0.0, phases + 1.0, phases)
[docs] def lnposterior(self, theta):
"""
The log posterior (priors * likelihood)
"""
self.numcalls += 1
# Evaluate prior first. Don't compute posterior if prior is not finite
lnprior = self.lnprior(self, theta)
if not np.isfinite(lnprior):
return -np.inf
lnlikelihood = self.lnlikelihood(self, theta)
lnpost = lnprior + lnlikelihood
if lnpost > self.maxpost:
log.info("New max: %f\tCall %d" % (lnpost, self.numcalls))
for name, val in zip(self.fitkeys, theta):
log.info("\t%8s: %25.15g" % (name, val))
self.maxpost = lnpost
self.maxpost_fitvals = theta
return lnpost
[docs] def minimize_func(self, theta):
"""Override superclass minimize_func to make compatible with scipy.optimize"""
# Scale params based on errors
ntheta = (self.get_model_parameters(theta) * self.fiterrs) + self.fitvals
self.set_params(dict(zip(self.fitkeys, ntheta)))
if not np.isfinite(self.lnprior(self, ntheta)):
return np.inf
lnlikelihood = self.lnlikelihood(self, theta)
return -lnlikelihood
[docs] def fit_toas(self, maxiter=100, pos=None, errfact=0.1, priorerrfact=10.0):
"""Fitting function - calls sampler.run_mcmc to converge using MCMC approach
Parameters
----------
maxiter : int
The number of iterations to run_mcmc for
pos
The intiial position of the sampler. Default behavior calls
sampler.get_initial_pos()
errfact : float, optional
Multiplicative factor for errors in get_intial_pos
priorerrfact : float, optional
Error factor in setting prior widths
"""
# Set model priors if it hasn't been done yet
if not self.priors_set:
self.set_priors(self, priorerrfact)
# Set initial positions for walkers if they haven't been specified
if pos is None:
pos = self.sampler.get_initial_pos(
self.fitkeys,
self.fitvals,
self.fiterrs,
errfact,
minMJD=self.minMJD,
maxMJD=self.maxMJD,
)
# If template exists, make sure that template params are within tbound
pos = self.clip_template_params(pos)
# Initialize sampler
self.sampler.initialize_sampler(self.lnposterior, self.n_fit_params)
# Run sampler for some number of iterations
self.sampler.run_mcmc(pos, maxiter)
# Process results and get chi2 for new parameters
self.set_params(dict(zip(self.fitkeys, self.maxpost_fitvals)))
if self.use_resids:
self.resids.update()
return self.lnposterior(self.maxpost_fitvals)
[docs] def phaseogram(
self, weights=None, bins=100, rotate=0.0, size=5, alpha=0.25, plotfile=None
):
"""Make a nice 2-panel phaseogram for the current model"""
mjds = self.toas.table["tdbld"].data
phss = self.get_event_phases()
plot_utils.phaseogram(
mjds,
phss,
weights=self.get_weights(),
bins=bins,
rotate=rotate,
size=size,
alpha=alpha,
plotfile=plotfile,
)
[docs] def prof_vs_weights(self, nbins=50, use_weights=False):
"""
Show binned profiles (and H-test values) as a function
of the minimum weight used. nbins is only for the plots.
"""
f, ax = plt.subplots(3, 3, sharex=True)
phss = self.get_event_phases()
htests = []
weights = np.linspace(0.0, 0.95, 20)
swgts = self.get_weights()
for ii, minwgt in enumerate(weights):
good = swgts > minwgt
nphotons = np.sum(good)
wgts = swgts[good] if use_weights else None
if nphotons <= 0:
hval = 0
else:
hval = hmw(phss[good], weights=wgts) if use_weights else hm(phss[good])
htests.append(hval)
if ii > 0 and ii % 2 == 0 and ii < 20:
r, c = ((ii - 2) / 2) / 3, ((ii - 2) / 2) % 3
ax[r][c].hist(
phss[good],
nbins,
range=[0, 1],
weights=wgts,
color="k",
histtype="step",
)
ax[r][c].set_title(
"%.1f / %.1f / %.0f" % (minwgt, hval, nphotons), fontsize=11
)
if c == 0:
ax[r][c].set_ylabel("Htest")
if r == 2:
ax[r][c].set_xlabel("Phase")
f.suptitle(
f"{self.model.PSR.value}: Minwgt / H-test / Approx # events",
fontweight="bold",
)
if use_weights:
plt.savefig(f"{self.model.PSR.value}_profs_v_wgtcut.png")
else:
plt.savefig(f"{self.model.PSR.value}_profs_v_wgtcut_unweighted.png")
plt.close()
plt.plot(weights, htests, "k")
plt.xlabel("Min Weight")
plt.ylabel("H-test")
plt.title(self.model.PSR.value)
if use_weights:
plt.savefig(f"{self.model.PSR.value}_htest_v_wgtcut.png")
else:
plt.savefig(f"{self.model.PSR.value}_htest_v_wgtcut_unweighted.png")
plt.close()
def plot_priors(self, chains, burnin, bins=100, scale=False):
plot_utils.plot_priors(
self.model,
chains,
self.maxpost_fitvals,
self.fitvals,
burnin=burnin,
bins=bins,
scale=scale,
)
[docs]class MCMCFitterBinnedTemplate(MCMCFitter):
"""A subclass of MCMCFitter, designed to use a binned template with
interpolation instead of an analytic function
"""
def __init__(self, toas, model, sampler, **kwargs):
super().__init__(toas, model, sampler, **kwargs)
[docs] def set_template(self, template):
"""
Set template and template metadata. For binned template, we want to
set the x values for interpolation purposes here.
"""
self.template = template
self.ltemp = len(template)
self.xtemp = np.arange(self.ltemp) * 1.0 / self.ltemp
[docs] def get_template_vals(self, phases):
if self.template is None:
raise ValueError("Template has not been initialized in MCMCFitter")
return np.interp(phases, self.xtemp, self.template, right=self.template[0])
[docs] def clip_template_params(self, pos):
return pos
[docs] def get_model_parameters(self, theta):
return theta
[docs] def get_template_parameters(self, theta):
return None
[docs] def get_parameters(self):
return self.fitvals
[docs] def get_parameter_names(self):
return self.fitkeys
[docs] def set_parameters(self, theta):
self.set_params(dict(zip(self.fitkeys, theta)))
[docs] def get_errors(self):
return self.fiterrs
[docs]class MCMCFitterAnalyticTemplate(MCMCFitter):
"""A subclass of MCMCFitter, designed to use an analytic template rather
than a binned one that uses interpolation.
"""
def __init__(self, toas, model, sampler, template, **kwargs):
super().__init__(toas, model, sampler, template=template, **kwargs)
[docs] def set_template(self, template):
"""
Metadata for analytic template
"""
self.template = template
self.tfitkeys = template.get_parameter_names()
self.tfitvals = template.get_parameters()
self.tfiterrs = template.get_errors()
self.tbounds = template.get_bounds()
self.n_fit_params = len(self.fitvals) + len(self.tfitvals)
[docs] def get_template_vals(self, phases):
return self.template(phases, use_cache=True)
[docs] def clip_template_params(self, pos):
nfitkeys = len(self.fitkeys)
ret = pos
for i in range(nfitkeys, self.n_fit_params):
lo, hi = self.tbounds[i - nfitkeys]
ret = np.clip(ret[:, i], lo, hi)
return ret
[docs] def get_parameter_names(self):
return self.fitkeys + self.template.get_parameter_names()
[docs] def get_model_parameters(self, theta):
return theta[: len(self.fitkeys)]
[docs] def get_template_parameters(self, theta):
return theta[len(self.fitkeys) :]
[docs] def get_parameters(self):
return np.append(self.fitvals, self.tfitvals)
[docs] def set_parameters(self, theta):
self.set_params(dict(zip(self.fitkeys, self.get_model_parameters(theta))))
self.template.set_parameters(self.get_template_parameters(theta))
[docs] def get_errors(self):
return np.append(self.fiterrs, self.tfiterrs)
[docs]class CompositeMCMCFitter(MCMCFitter):
"""A subclass of MCMCFitter, designed to work on composite datasets
Requires a list of TOAs objects formed from different datafiles
to make up the toas table, as well as a list of log-likelihood methods
Here, the toas argument to the constructor is a list of TOAs objects,
while the toas parameter for this class is a concatenated TOAs object
containing all TOA information from all datasets
The goal is to fit all of the data sets to a single model, so only one
model is required in the construction of this object. In addition, only
one sampler is required.
Parameters
----------
weights
an array of weight lists for weighting individual TOAs
set_weights
an array of weights for each individual data set in
toas_list. The basic lnlikelihood function will be given by
lnlike = sum(setweight(i) * lnlike(toas_list(i)))
Defaults to an array of 1s
lnlikes
a list of lnlikelihood functions to be used on each entry
in toas_list. This is a required argument
templates
a list of templates for fitting to each individual dataset.
Defaults to None for everything
TODO: Add support for fitting templates here
"""
def __init__(self, toas, model, sampler, lnlikes, **kwargs):
self.toas_list = toas
self.toas = concat_toas(toas)
self.model = model
self.method = "MCMC"
self.sampler = sampler
self.lnprior = kwargs.get("lnprior", lnprior_basic)
self.lnlikelihoods = lnlikes
self.set_priors = kwargs.get("setpriors", set_priors_basic)
self.weights = kwargs.get("weights", [None] * len(self.toas_list))
self.set_weights = kwargs.get("set_weights", [1.0] * len(self.toas_list))
self.templates = kwargs.get("templates", [None] * len(self.toas_list))
self.xtemps = [None] * len(self.toas_list)
phs = kwargs.get("phs", 0.0)
phserr = kwargs.get("phserr", 0.03)
self.minMJD = kwargs.get("minMJD", 0)
self.maxMJD = kwargs.get("maxMJD", 100000)
self.fitkeys, self.fitvals, self.fiterrs = self.generate_fit_keyvals(
phs, phserr
)
self.n_fit_params = len(self.fitvals)
self.numcalls = 0
self.maxpost = -np.inf
self.maxpost_fitvals = self.fitvals
self.priors_set = False
self.use_resids = False
[docs] def clip_template_params(self, pos):
return pos
[docs] def get_model_parameters(self, theta):
return theta
[docs] def get_template_parameters(self, theta):
return None
[docs] def get_parameters(self):
return self.fitvals
[docs] def get_parameter_names(self):
return self.fitkeys
[docs] def set_parameters(self, theta):
self.set_params(dict(zip(self.fitkeys, theta)))
[docs] def get_errors(self):
return self.fiterrs
[docs] def get_event_phases(self, index=None):
"""Get phases for the TOAs object specified by index in toas_list.
If index is None, then it will return phases for all TOAs
"""
if index is None:
phases = self.model.phase(self.toas)[1]
print("Showing all %d phases" % len(phases))
else:
phases = self.model.phase(self.toas_list[index])[1]
return np.where(phases < 0.0, phases + 1.0, phases)
[docs] def get_template_vals(self, phases, index):
if self.templates[index] is None:
raise ValueError(
"Template for index %d has not been initialized in CompositeMCMCFitter"
% index
)
if isinstance(self.templates[index], LCTemplate):
return self.templates[index](phases, use_cache=True)
if self.xtemps[index] is None:
ltemp = len(self.templates[index])
self.xtemps[index] = np.arange(ltemp) * 1.0 / ltemp
return np.interp(
phases,
self.xtemps[index],
self.templates[index],
right=self.templates[index][0],
)
def get_weights(self, index=None):
if index is not None:
return self.weights[index]
wgts = np.zeros(len(self.toas.table))
curr = 0
for i in range(len(self.toas_list)):
ts = self.toas_list[i]
nxt = curr + len(ts.table)
print(curr, nxt, len(ts.table))
wgts[curr:nxt] = (
1.0 * self.set_weights[i]
if self.weights[i] is None
else self.weights[i] * self.set_weights[i]
)
curr = nxt
return wgts
[docs] def lnlikelihood(self, fitter, theta):
"""Sum over the log-likelihood functions for each dataset, multiply by weights in the sum.
Note
----
Requires a fitter passed because that is how this function is called by
lnposterior in the super class.
"""
self.set_parameters(theta)
lnsum = 0.0
curr = 0
for i in range(len(self.lnlikelihoods)):
lnsum += self.lnlikelihoods[i](self, theta, i) * self.set_weights[i]
return lnsum