"""Normalized template representing directional data
Implements a mixture model of LCPrimitives to form a normalized template representing directional data.
author: M. Kerr <matthew.kerr@gmail.com>
"""
import contextlib
import logging
from collections import defaultdict
from copy import deepcopy
import numpy as np
from .lcnorm import NormAngles
from .lcenorm import ENormAngles
from .lceprimitives import *
log = logging.getLogger(__name__)
def isvector(x):
return len(np.asarray(x).shape) > 0
[docs]class LCTemplate:
"""Manage a lightcurve template (collection of LCPrimitive objects).
IMPORTANT: a constant background is assumed in the overall model,
so there is no need to furnish this separately.
Parameters
----------
primitives : list of LCPrimitive
norms : NormAngles or tuple of float, optional
If a tuple, they are relative amplitudes for the primitive components.
"""
def __init__(self, primitives, norms=None, cache_kwargs=None):
self.primitives = primitives
self.shift_mode = np.any([p.shift_mode for p in self.primitives])
if norms is None:
norms = np.ones(len(primitives)) / len(primitives)
self.norms = norms if hasattr(norms, "_make_p") else NormAngles(norms)
self._sanity_checks()
self._cache = defaultdict(None)
self._cache_dirty = defaultdict(lambda: True)
if cache_kwargs is None:
cache_kwargs = {}
self.set_cache_properties(**cache_kwargs)
def __setstate__(self, state):
# TEMPORARY to handle changed class definition
self.__dict__.update(state)
_cache_dirty = defaultdict(lambda: True)
if not hasattr(self, "_cache_dirty"):
self._cache = defaultdict(None)
else:
# make _cache_dirty a defaultdict from a normal dict
_cache_dirty.update(self._cache_dirty)
self._cache_dirty = _cache_dirty
if not hasattr(self, "ncache"):
self.ncache = 1000
if not hasattr(self, "ph_edges"):
self.ph_edges = np.linspace(0, 1, self.ncache + 1)
if not hasattr(self, "en_cens"):
self.en_cens = None
if not hasattr(self, "en_edges"):
self.en_edges = None
def __getstate__(self):
# transform _cache_dirty into a normal dict, necessary to pickle it
state = self.__dict__.copy()
state["_cache_dirty"] = dict(state["_cache_dirty"])
return state
def _sanity_checks(self):
if len(self.primitives) != len(self.norms):
raise ValueError("Must provide a normalization for each component.")
def is_energy_dependent(self):
c1 = np.any([p.is_energy_dependent() for p in self.primitives])
return c1 or self.norms.is_energy_dependent()
def has_bridge(self):
return False
def __getitem__(self, index):
if index < 0:
index += len(self.primitives) + 1
return self.norms if index == len(self.primitives) else self.primitives[index]
def __setitem__(self, index, value):
if index < 0:
index += len(self.primitives) + 1
if index == len(self.primitives):
self.norms = value
else:
self.primitives[index] = value
def __len__(self):
# sourcery skip: remove-unreachable-code
raise DeprecationWarning("I'd like to see if this is used.")
return len(self.primitives)
def copy(self):
prims = [deepcopy(x) for x in self.primitives]
norms = self.norms.copy()
cache_kwargs = dict(ncache=self.ncache, en_edges=self.en_edges)
newcopy = self.__class__(prims, norms, cache_kwargs=cache_kwargs)
for key in self._cache.keys():
newcopy._cache[key] = self._cache[key]
newcopy._cache_dirty[key] = self._cache_dirty[key]
return newcopy
[docs] def set_cache_properties(self, ncache=1000, en_edges=None):
"""Set the granularity and behavior of the cache.
In all cases, ncache sets the phase resolution. If it is desired
to have energy dependence, en_edges must be specified as a set of
edges in log10 space which fully encompass all possible photon
energies that wil be used.
Interpolation is always linear (bilinear in energy if applicable.)
"""
if hasattr(self, "ncache") and (ncache == self.ncache):
if (en_edges is None) and (self.en_edges is None):
return
elif np.all(en_edges == self.en_edges):
return
self.ncache = ncache
self.ph_edges = np.linspace(0, 1, ncache + 1)
if en_edges is None:
self.en_edges = None
self.en_cens = None
else:
en_edges = np.asarray(en_edges)
if len(en_edges) < 2:
raise ValueError("len(en_edges) must be >=2.")
self.en_edges = en_edges
self.en_cens = 0.5 * (en_edges[1:] + en_edges[:-1])
self.mark_cache_dirty()
def mark_cache_dirty(self):
for k in self._cache_dirty.keys():
self._cache_dirty[k] = True
def get_cache(self, order=0):
if self._cache_dirty[order]:
self.set_cache(order=order)
# I don't see how it's possible to have a cache with wrong shape.
rval = self._cache[order]
if self.en_edges is not None:
assert rval.shape[0] == len(self.en_edges)
assert rval.shape[-1] == (self.ncache + 1)
return rval
[docs] def set_cache(self, order=0):
"""Populate the cache with values along the bin edges."""
ncache = self.ncache
if self.en_edges is None:
if order == 0:
new_cache = self(self.ph_edges)
else:
new_cache = self.derivative(self.ph_edges, order=order)
else:
new_cache = np.empty((len(self.en_edges), len(self.ph_edges)))
if order == 0:
for ibin, en in enumerate(self.en_edges):
new_cache[ibin] = self(self.ph_edges, log10_ens=en)
else:
for ibin, en in enumerate(self.en_edges):
new_cache[ibin] = self.derivative(
self.ph_edges, log10_ens=en, order=order
)
self._cache[order] = new_cache
self._cache_dirty[order] = False
[docs] def eval_cache(self, phases, log10_ens=3, order=0):
"""
Cached values are stored on edges in both phase and, if applicable,
energy, so lookup is straightforward.
"""
cache = self.get_cache(order=order)
dphi = np.atleast_1d(phases) * self.ncache
phind_lo = dphi.astype(int)
phind_hi = phind_lo + (phind_lo < self.ncache) # allows ph==1
dphi_hi = dphi - phind_lo
dphi_lo = 1.0 - dphi_hi
assert np.all(dphi_hi >= 0)
assert np.all(dphi_hi <= 1)
assert np.all(dphi_lo >= 0)
assert np.all(dphi_lo <= 1)
edges = self.en_edges
if edges is None:
return cache[phind_lo] * dphi_lo + cache[phind_hi] * dphi_hi
de = (log10_ens - edges[0]) / (edges[1] - edges[0])
eind_lo = de.astype(int)
eind_hi = eind_lo + 1
de_hi = de - eind_lo
de_lo = 1.0 - de_hi
assert np.all(de_hi >= 0)
assert np.all(de_hi <= 1)
assert np.all(de_lo >= 0)
assert np.all(de_lo <= 1)
v00 = cache[eind_lo, phind_lo] * (de_lo * dphi_lo)
v01 = cache[eind_lo, phind_hi] * (de_lo * dphi_hi)
v10 = cache[eind_hi, phind_lo] * (de_hi * dphi_lo)
v11 = cache[eind_hi, phind_hi] * (de_hi * dphi_hi)
return v00 + v01 + v10 + v11
def set_parameters(self, p, free=True):
start = 0
params_ok = True
for prim in self.primitives:
n = len(prim.get_parameters(free=free))
prim.set_parameters(p[start : start + n], free=free)
start += n
self.norms.set_parameters(p[start:], free)
self.mark_cache_dirty()
return self.check_bounds(free=free)
def set_errors(self, errs):
start = 0
for prim in self.primitives:
start += prim.set_errors(errs[start:])
self.norms.set_errors(errs[start:])
def get_parameters(self, free=True):
return np.append(
np.concatenate([prim.get_parameters(free) for prim in self.primitives]),
self.norms.get_parameters(free),
)
[docs] def num_parameters(self, free=True):
"""Return the total number of free parameters."""
nprim = sum((prim.num_parameters(free) for prim in self.primitives))
return nprim + self.norms.num_parameters(free)
def _set_all_free_or_fixed(self, freeze=True):
for prim in self.primitives:
prim.free[:] = not freeze
self.norms.free[:] = not freeze
[docs] def free_parameters(self):
"""Free all parameters. Convenience function."""
self._set_all_free_or_fixed(freeze=False)
[docs] def freeze_parameters(self):
"""Freeze all parameters. Convenience function."""
self._set_all_free_or_fixed(freeze=True)
def get_errors(self, free=True):
return np.append(
np.concatenate([prim.get_errors(free) for prim in self.primitives]),
self.norms.get_errors(free),
)
[docs] def get_free_mask(self):
"""Return a mask with True if parameters are free, else False."""
m1 = np.concatenate([p.get_free_mask() for p in self.primitives])
return np.append(m1, self.norms.get_free_mask())
def get_parameter_names(self, free=True):
# I will no doubt hate myself in future for below comprehension
# (or rather lack thereof); this comment will not assuage my rage
prim_names = [
"P%d_%s_%s"
% (
iprim + 1,
prim.name[:3] + (prim.name[-1] if prim.name[-1].isdigit() else ""),
pname[:3] + (pname[-1] if pname[-1].isdigit() else ""),
)
for iprim, prim in enumerate(self.primitives)
for pname in prim.get_parameter_names(free=free)
]
norm_names = [
"Norm_%s" % pname for pname in self.norms.get_parameter_names(free=free)
]
return prim_names + norm_names
# return np.append(np.concatenate( [prim.pnames(free) for prim in self.primitives]) , self.norms.get_parameters(free))
def get_gaussian_prior(self):
locs, widths, mods, enables = [], [], [], []
for prim in self.primitives:
l, w, m, e = prim.get_gauss_prior_parameters()
locs.append(l)
widths.append(w)
mods.append(m)
enables.append(e)
t = np.zeros_like(self.norms.get_parameters())
locs = np.append(np.concatenate(locs), t)
widths = np.append(np.concatenate(widths), t)
mods = np.append(np.concatenate(mods), t.astype(bool))
enables = np.append(np.concatenate(enables), t.astype(bool))
return GaussianPrior(locs, widths, mods, mask=enables)
def get_bounds(self, free=True):
b1 = np.concatenate([prim.get_bounds(free) for prim in self.primitives])
b2 = self.norms.get_bounds(free)
return np.concatenate((b1, b2)).tolist()
def check_bounds(self, free=True):
bounds = np.asarray(self.get_bounds(free=free))
x0 = self.get_parameters(free=free)
return np.all((x0 >= bounds[:, 0]) & (x0 <= bounds[:, 1]))
[docs] def set_overall_phase(self, ph):
"""Put the peak of the first component at phase ph."""
self.mark_cache_dirty()
if self.shift_mode:
self.primitives[0].p[0] = ph
return
shift = ph - self.primitives[0].get_location()
for prim in self.primitives:
new_location = (prim.get_location() + shift) % 1
prim.set_location(new_location)
def get_location(self):
return self.primitives[0].get_location()
[docs] def get_amplitudes(self, log10_ens=3):
"""Return maximum amplitude of a component."""
ampls = [p(p.get_location(), log10_ens) for p in self.primitives]
return self.norms(log10_ens) * np.asarray(ampls)
[docs] def get_code(self):
"""Return a short string encoding the components in the template."""
return "/".join((p.shortname for p in self.primitives))
def norm(self):
return self.norms.get_total()
def norm_ok(self):
return self.norm() <= 1
[docs] def integrate(self, phi1, phi2, log10_ens=3, suppress_bg=False):
"""Integrate profile from phi1 to phi2.
NB that because of the phase modulo ambiguity, it is not uniquely
definite what the phi2 < phi1 case means:
integral(0.8,0.2) == -integral(0.2,0.8)
integral(0.8,1.2) == 1-integral(0.2,0.8)
To break the ambiguity, we support non-modulo phase here, so you
can just write integral(0.8,1.2) if that's what you mean.
"""
phi1 = np.asarray(phi1)
phi2 = np.asarray(phi2)
if isvector(log10_ens):
assert len(log10_ens) == len(phi1)
with contextlib.suppress(TypeError):
assert len(phi1) == len(phi2)
norms = self.norms(log10_ens=log10_ens)
t = norms.sum(axis=0)
dphi = phi2 - phi1
rvals = np.zeros(phi1.shape, dtype=float)
for n, prim in zip(norms, self.primitives):
rvals += n * prim.integrate(phi1, phi2, log10_ens=log10_ens)
return rvals * (1.0 / t) if suppress_bg else (1 - t) * dphi + rvals
def cdf(self, x, log10_ens=3):
return self.integrate(np.zeros_like(x), x, log10_ens, suppress_bg=False)
def max(self, resolution=0.01):
return self(np.arange(0, 1, resolution)).max()
def _get_scales(self, phases, log10_ens=3):
"""Method to allow abstraction for setting amplitudes for each
peak. Trivial in typical cases, but important for linked
components, e.g. the bridge pedestal.
"""
rvals = np.zeros(np.asarray(phases).shape, dtype=float)
norms = self.norms(log10_ens)
return rvals, norms, norms.sum(axis=0)
[docs] def __call__(self, phases, log10_ens=3, suppress_bg=False, use_cache=False):
"""Evaluate template at the provided phases and (if provided)
energies. If "suppress_bg" is set, ignore the DC component."""
# TMP -- check phase range. Add this as a formal check?
phases = np.asarray(phases)
log10_ens = np.asarray(log10_ens)
assert np.all(phases >= 0)
assert np.all(phases <= 1)
# end TM
if use_cache:
return self.eval_cache(phases, log10_ens=log10_ens, order=0)
rvals, norms, norm = self._get_scales(phases, log10_ens)
for n, prim in zip(norms, self.primitives):
rvals += n * prim(phases, log10_ens=log10_ens)
return rvals / norm if suppress_bg else (1.0 - norm) + rvals
[docs] def derivative(self, phases, log10_ens=3, order=1, use_cache=False):
"""Return the derivative of the template with respect to pulse
phase (as opposed to the gradient of the template with respect to
some of the template parameters)."""
if use_cache:
return self.eval_cache(phases, order=order)
rvals = np.zeros_like(phases)
norms = self.norms(log10_ens=log10_ens)
for n, prim in zip(norms, self.primitives):
rvals += n * prim.derivative(phases, log10_ens=log10_ens, order=order)
return rvals
[docs] def single_component(self, index, phases, log10_ens=3, add_bg=False):
"""Evaluate a single component of template."""
n = self.norms(log10_ens=log10_ens)
rvals = self.primitives[index](phases, log10_ens=log10_ens) * n[index]
return rvals + n.sum(axis=0) if add_bg else rvals
def gradient(self, phases, log10_ens=3, free=True, template_too=False):
r = np.empty((self.num_parameters(free), len(phases)))
c = 0
rvals, norms, norm = self._get_scales(phases, log10_ens=log10_ens)
prim_terms = np.empty((len(self.primitives), len(phases)))
for i, (nm, prim) in enumerate(zip(norms, self.primitives)):
n = len(prim.get_parameters(free=free))
r[c : c + n, :] = nm * prim.gradient(phases, log10_ens=log10_ens, free=free)
c += n
prim_terms[i] = prim(phases, log10_ens=log10_ens) - 1
# handle case where no norm parameters are free
if c != r.shape[0]:
# "prim_terms" are df/dn_i and have shape nnorm x nphase
# the output of gradient is the matrix M_ij or M_ijk, depending
# on whether or not there is energy dependence, which is
# dnorm_i/dnorm_angle_j (at energy k). The sum is over the
# "internal parameter" norm_j, while the phase axis and
# norm_angle axis are retained.
m = self.norms.gradient(log10_ens=log10_ens, free=free)
if len(m.shape) == 2:
m = m[..., None]
np.einsum("ij,ikj->kj", prim_terms, m, out=r[c:])
if template_too:
rvals[:] = 1 - norm
for i in range(len(prim_terms)):
rvals += (prim_terms[i] + 1) * norms[i]
return r, rvals
return r
[docs] def gradient_derivative(self, phases, log10_ens=3, free=False):
"""Return d/dphi(gradient). This is the derivative with respect
to pulse phase of the gradient with respect to the parameters.
"""
# sourcery skip: remove-unreachable-code
raise NotImplementedError() # is this used anymore?
free_mask = self.get_free_mask()
nparam = len(free_mask)
nnorm_param = len(self.norms.p)
nprim_param = nparam - nnorm_param
rvals = np.empty([nparam, len(phases)])
prim_terms = np.empty([len(self.primitives), len(phases)])
norms = self.norms()
c = 0
for iprim, prim in enumerate(self.primitives):
n = len(prim.p)
rvals[c : c + n] = norms[iprim] * prim.gradient_derivative(phases)
prim_terms[iprim] = prim.derivative(phases)
c += n
norm_grads = self.norms.gradient(phases, free=False)
for j in range(nnorm_param):
rvals[nprim_param + j] = 0
for i in range(nnorm_param):
rvals[nprim_param + j] += norm_grads[i, j] * prim_terms[i]
return rvals
def approx_gradient(self, phases, log10_ens=None, eps=1e-5):
return approx_gradient(self, phases, log10_ens=log10_ens, eps=eps)
def approx_hessian(self, phases, log10_ens=None, eps=1e-5):
return approx_hessian(self, phases, log10_ens=log10_ens, eps=eps)
def approx_derivative(self, phases, log10_ens=None, order=1, eps=1e-7):
return approx_derivative(
self, phases, log10_ens=log10_ens, order=order, eps=eps
)
def check_gradient(
self, atol=1e-7, rtol=1e-5, quiet=False, seed=None, en=None, ph=None
):
if seed is not None:
# essentially set a known good state of the RNG to avoid any
# numerical issues interfering with the test
np.random.seed(seed)
return check_gradient(self, atol=atol, rtol=rtol, quiet=quiet, en=en, ph=ph)
def check_derivative(self, atol=1e-7, rtol=1e-5, order=1, eps=1e-7, quiet=False):
return check_derivative(
self, atol=atol, rtol=rtol, quiet=quiet, eps=1e-7, order=order
)
[docs] def hessian(self, phases, log10_ens=3, free=True):
"""Return the hessian of the primitive and normalization angles.
The primitives components are not coupled due to the additive form
of the template. However, because each normalization depends on
multiple hyper angles, there is in general coupling between the
normalization components and the primitive components. The
general form of the terms is
(1) block diagonal hessian terms from primitive
(2 ) for the unmixed derivative of the norms, the sum of the
hessian of the hyper angles over the primitive terms
(3) for mixed derivatives, the product gradient of the norm
In general, this is pretty complicated if some parameters are free
and some are not, and (currently) this method isn't used in
fitting, so for ease of implementation, simply evaluate the whole
hessian, then return only the relevant parts for the free
parameters.
"""
free_mask = self.get_free_mask()
nparam = len(free_mask)
nnorm_param = self.norms.num_parameters()
nprim_param = nparam - nnorm_param
r = np.zeros([nparam, nparam, len(phases)])
norms = self.norms(log10_ens=log10_ens)
norm_grads = self.norms.gradient(log10_ens=log10_ens, free=False)
prim_terms = np.empty([len(self.primitives), len(phases)])
c = 0
for i, prim in enumerate(self.primitives):
h = prim.hessian(phases, log10_ens=log10_ens, free=False)
pg = prim.gradient(phases, log10_ens=log10_ens, free=False)
n = len(prim.p)
# put hessian in diagonal elements
r[c : c + n, c : c + n, :] = norms[i] * h
# put cross-terms with normalization; although only one primitive
# survives in the second derivative, all of the normalization angles
# feature
for j in range(n):
for k in range(nnorm_param):
r[nprim_param + k, c + j, :] = pg[j] * norm_grads[i, k]
r[c + j, nprim_param + k, :] = r[nprim_param + k, c + j, :]
prim_terms[i, :] = prim(phases, log10_ens=log10_ens) - 1
c += n
# now put in normalization hessian
hnorm = self.norms.hessian(
log10_ens=log10_ens
) # nnorm_param x nnorm_param x nnorm_param
for j in range(nnorm_param):
for k in range(j, nnorm_param):
for i in range(nnorm_param):
r[c + j, c + k, :] += hnorm[i, j, k] * prim_terms[i]
r[c + k, c + j, :] = r[c + j, c + k, :]
return r[free_mask][:, free_mask] if free else r
[docs] def delta(self, index=None):
"""Return radio lag -- reckoned by default as the position of the first peak following phase 0."""
if (index is not None) and (index <= (len(self.primitives))):
return self[index].get_location(error=True)
return self.Delta(delta=True)
[docs] def Delta(self, delta=False):
"""Report peak separation -- reckoned by default as the distance
between the first and final component locations.
delta [False] -- if True, return the first peak position"""
if len(self.primitives) == 1:
return -1, 0
prim0, prim1 = self.primitives[0], self.primitives[-1]
for p in self.primitives:
if p.get_location() < prim0.get_location():
prim0 = p
if p.get_location() > prim1.get_location():
prim1 = p
p1, e1 = prim0.get_location(error=True)
p2, e2 = prim1.get_location(error=True)
return (p1, e1) if delta else (p2 - p1, (e1**2 + e2**2) ** 0.5)
def _sorted_prims(self):
def cmp(p1, p2):
if p1.p[-1] < p2.p[-1]:
return -1
elif p1.p[-1] == p2.p[-1]:
return 0
else:
return 1
return sorted(self.primitives, cmp=cmp)
def __str__(self):
prims = self.primitives
s0 = str(self.norms)
s1 = (
"\n\n"
+ "\n\n".join(
["P%d -- " % (i + 1) + str(prim) for i, prim in enumerate(prims)]
)
+ "\n"
)
if len(prims) > 1:
s1 += "\ndelta : %.4f +\\- %.4f" % self.delta()
s1 += "\nDelta : %.4f +\\- %.4f" % self.Delta()
return s0 + s1
[docs] def prof_string(self, outputfile=None):
"""Return a string compatible with the format used by pygaussfit.
Assume all primitives are gaussians."""
rstrings = []
dashes = "-" * 25
norm, errnorm = 0, 0
for nprim, prim in enumerate(self.primitives):
phas = prim.get_location(error=True)
fwhm = 2 * prim.get_width(error=True, hwhm=True)
ampl = [self.norms()[nprim], 0]
norm += ampl[0]
errnorm += ampl[1] ** 2
for st, va in zip(["phas", "fwhm", "ampl"], [phas, fwhm, ampl]):
rstrings += ["%s%d = %.5f +/- %.5f" % (st, nprim + 1, va[0], va[1])]
const = "const = %.5f +/- %.5f" % (1 - norm, errnorm**0.5)
rstring = [dashes] + [const] + rstrings + [dashes]
if outputfile is not None:
f = open(outputfile, "w")
f.write("# gauss\n")
for s in rstring:
f.write(s + "\n")
return "\n".join(rstring)
[docs] def random(self, n, weights=None, log10_ens=3, return_partition=False):
"""Return n pseudo-random variables drawn from the distribution
given by this light curve template.
For simplicity, if weights are not provided, unit weights are
assumed. If energies are not provided, a vector of 1 GeV (3)
is assumed.
Next, optionally using the weights and the energy vectors, the
probability for each realization to arise from the primitives or
the background is determined. Those probabilities are used in a
multinomial to determine which component will generate each photon,
and finally using that partition the correct number of phases are
simulated from each component.
Weights ("w") are interpreted as the probability to originate from
the source, which includes the DC component, so the total prob. to
be DC is (1-w) (background) + w*sum_prims (unpulsed).
"""
n = int(round(n))
if len(self.primitives) == 0:
return (np.random.rand(n), [n]) if return_partition else np.random.rand(n)
# check weights
if weights is None:
weights = np.ones(n)
elif len(weights) != n:
raise ValueError("Provided weight vector does not match requested n.")
# check energies
if isvector(log10_ens):
if len(log10_ens) != n:
raise ValueError(
"Provided log10_ens vector does not match requested n."
)
else:
log10_ens = np.full(n, log10_ens)
# first, calculate the energy dependent norm of each vector
norms = self.norms(log10_ens=log10_ens) # nnorm x nen array
N = norms.sum(axis=0)
nDC = weights * N
pDC = 1 - nDC
partition_probs = np.append(norms / N * nDC, pDC[None, :], axis=0)
# now, draw a component for each bit of the partition
cpp = np.cumsum(partition_probs, axis=0)
assert np.allclose(cpp[-1], 1)
comps = np.full(n, len(self.primitives))
Q = np.random.rand(n)
for i in np.arange(len(self.primitives))[::-1]:
mask = Q < cpp[i]
comps[mask] = i
total = 0
rvals = np.empty(n)
rvals[:] = np.nan # TMP
total = 0
for iprim, prim in enumerate(self.primitives):
mask = comps == iprim
total += mask.sum()
rvals[mask] = prim.random(mask.sum(), log10_ens=log10_ens[mask])
# DC component
mask = comps == len(self.primitives)
total += mask.sum()
rvals[mask] = np.random.rand(mask.sum())
assert not np.any(np.isnan(rvals)) # TMP
return (rvals, comps) if return_partition else rvals
[docs] def swap_primitive(self, index, ptype=LCLorentzian):
"""Swap the specified primitive for a new one with the parameters
that match the old one as closely as possible."""
self.primitives[index] = convert_primitive(self.primitives[index], ptype)
[docs] def delete_primitive(self, index, inplace=False):
"""Return a new LCTemplate with the primitive at index removed.
The flux is renormalized to preserve the same pulsed ratio (in the
case of an energy-dependent template, at the pivot energy).
"""
norms, prims = self.norms, self.primitives
if len(prims) == 1:
raise ValueError("Template only has a single primitive.")
if index < 0:
index += len(prims)
newprims = [deepcopy(p) for ip, p in enumerate(prims) if index != ip]
newnorms = self.norms.delete_component(index)
if not inplace:
return LCTemplate(newprims, newnorms)
self.primitives = newprims
self.norms = newnorms
[docs] def add_primitive(self, prim, norm=0.1, inplace=False):
"""[Convenience] -- return a new LCTemplate with the specified
LCPrimitive added and renormalized."""
norms, prims = self.norms, self.primitives
if len(prims) == 0:
# special case of an empty profile
return LCTemplate([prim], [1])
nprims = [deepcopy(prims[i]) for i in range(len(prims))] + [prim]
nnorms = self.norms.add_component(norm)
if not inplace:
return LCTemplate(nprims, nnorms)
self.norms = nnorms
self.primitives = nprims
[docs] def order_primitives(self, order=0):
"""Re-order components in place.
order == 0: order by ascending position
order == 1: order by descending maximum amplitude
order == 2: order by descending normalization
"""
if order == 0:
indices = np.argsort([p.get_location() for p in self.primitives])
elif order == 1:
indices = np.argsort(self.get_amplitudes())[::-1]
elif order == 2:
indices = np.argsort(self.norms())[::-1]
else:
raise NotImplementedError("Specified order not supported.")
self.primitives = [self.primitives[i] for i in indices]
self.norms.reorder_components(indices)
def get_fixed_energy_version(self, log10_en=3):
return self
def add_energy_dependence(self, index, slope_free=True):
comp = self[index]
if comp.is_energy_dependent():
return
if comp.name == "NormAngles":
# normalization
newcomp = ENormAngles(self.norms())
else:
# primitive
if comp.name == "Gaussian":
constructor = LCEGaussian
elif comp.name == "VonMises":
constructor = LCEVonMises
else:
raise NotImplementedError(f"{comp.name} not supported.")
newcomp = constructor(p=comp.p)
newcomp.free[:] = comp.free
newcomp.slope_free[:] = slope_free
self[index] = newcomp
[docs] def get_eval_string(self):
"""Return a string that can be "eval"ed to make a cloned set of
primitives and template."""
ps = "\n".join(
("p%d = %s" % (i, p.eval_string()) for i, p in enumerate(self.primitives))
)
prims = f'[{",".join("p%d" % i for i in range(len(self.primitives)))}]'
ns = f"norms = {self.norms.eval_string()}"
return f"{self.__class__.__name__}({prims},norms)"
def closest_to_peak(self, phases):
return min((p.closest_to_peak(phases) for p in self.primitives))
[docs] def mean_value(self, phases, log10_ens=None, weights=None, bins=20):
"""Compute the mean value of the profile over the codomain of
phases. Mean is taken over energy and is unweighted unless
a set of weights are provided."""
if (log10_ens is None) or (not self.is_energy_dependent()):
return self(phases)
if weights is None:
weights = np.ones_like(log10_ens)
edges = np.linspace(log10_ens.min(), log10_ens.max(), bins + 1)
w = np.histogram(log10_ens, weights=weights, bins=edges)
rvals = np.zeros_like(phases)
for weight, en in zip(w[0], (edges[:-1] + edges[1:]) / 2):
rvals += weight * self(phases, en)
rvals /= w[0].sum()
return rvals
def mean_single_component(
self, index, phases, log10_ens=None, weights=None, bins=20, add_pedestal=True
):
prim = self.primitives[index]
if (log10_ens is None) or (not self.is_energy_dependent()):
n = self.norms()
return prim(phases) * n[index] + add_pedestal * (1 - n.sum())
if weights is None:
weights = np.ones_like(log10_ens)
edges = np.linspace(log10_ens.min(), log10_ens.max(), bins + 1)
w = np.histogram(log10_ens, weights=weights, bins=edges)
rvals = np.zeros_like(phases)
for weight, en in zip(w[0], (edges[:-1] + edges[1:]) / 2):
rvals += weight * prim(phases, en) * self.norms(en)[index]
rvals /= w[0].sum()
return rvals
[docs] def rotate(self, dphi):
"""Adjust the template by dphi."""
self.mark_cache_dirty()
log.info(f"Shifting template by {dphi}.")
for prim in self.primitives:
new_location = (prim.get_location() + dphi) % 1
prim.set_location(new_location)
[docs] def get_display_point(self, do_rotate=False):
# TODO -- need to fix this to scan all the way around, either
# from -0.5 to 0 or from 0.5 to 1.0, whichever -- see J0102
"""Return phase shift which optimizes the display of the profile.
This is determined by finding the 60% window which contains the
most flux and returning the left edge. Rotating the profile such
that this edge is at phi=0.20 would then center this interval, so
the resulting phase shift would do that.
"""
N = 50
dom = np.linspace(0, 1, 2 * N + 1)[:-1]
cod = self.integrate(dom, dom + 0.6)
dphi = 0.20 - dom[np.argmax(cod)]
if do_rotate:
self.rotate(dphi)
return dphi
[docs] def write_profile(self, fname, nbin, integral=False, suppress_bg=False):
"""Write out a two-column tabular profile to file fname.
The first column indicates the left edge of the phase bin, while
the right column indicates the profile value.
Parameters
----------
integral : bool
if True, integrate the profile over the bins. Otherwise, differential
value at indicated bin phase.
suppress_bg : bool
if True, do not include the unpulsed (DC) component
"""
if not integral:
bin_phases = np.linspace(0, 1, nbin + 1)[:-1]
bin_values = self(bin_phases, suppress_bg=suppress_bg)
bin_values *= 1.0 / bin_values.mean()
else:
phases = np.linspace(0, 1, 2 * nbin + 1)
values = self(phases, suppress_bg=suppress_bg)
hi = values[2::2]
lo = values[:-1:2]
mid = values[1::2]
bin_phases = phases[:-1:2]
bin_values = 1.0 / (6 * nbin) * (hi + 4 * mid + lo)
bin_values *= 1.0 / bin_values.mean()
open(fname, "w").write(
"".join(("%.6f %.6f\n" % (x, y) for x, y in zip(bin_phases, bin_values)))
)
[docs]def get_gauss2(
pulse_frac=1,
x1=0.1,
x2=0.55,
ratio=1.5,
width1=0.01,
width2=0.02,
lorentzian=False,
bridge_frac=0,
skew=False,
):
"""Return a two-gaussian template. Convenience function."""
n1, n2 = np.asarray([ratio, 1.0]) * (1 - bridge_frac) * (pulse_frac / (1.0 + ratio))
if skew:
prim = LCLorentzian2 if lorentzian else LCGaussian2
p1, p2 = [width1, width1 * (1 + skew), x1], [width2 * (1 + skew), width2, x2]
else:
if lorentzian:
prim = LCLorentzian
width1 *= 2 * np.pi
width2 *= 2 * np.pi
else:
prim = LCGaussian
p1, p2 = [width1, x1], [width2, x2]
if bridge_frac > 0:
nb = bridge_frac * pulse_frac
b = LCGaussian(p=[0.1, (x2 + x1) / 2])
return LCTemplate([prim(p=p1), b, prim(p=p2)], [n1, nb, n2])
return LCTemplate([prim(p=p1), prim(p=p2)], [n1, n2])
[docs]def get_gauss1(pulse_frac=1, x1=0.5, width1=0.01):
"""Return a one-gaussian template. Convenience function."""
return LCTemplate([LCGaussian(p=[width1, x1])], [pulse_frac])
[docs]def get_2pb(pulse_frac=0.9, lorentzian=False):
"""Convenience function to get a 2 Lorentzian + Gaussian bridge template."""
prim = LCLorentzian if lorentzian else LCGaussian
p1 = prim(p=[0.03, 0.1])
b = LCGaussian(p=[0.15, 0.3])
p2 = prim(p=[0.03, 0.55])
return LCTemplate(
primitives=[p1, b, p2],
norms=[0.3 * pulse_frac, 0.4 * pulse_frac, 0.3 * pulse_frac],
)
[docs]def make_twoside_gaussian(one_side_gaussian):
"""Make a two-sided gaussian with the same initial shape as the
input one-sided gaussian."""
g2 = LCGaussian2()
g1 = one_side_gaussian
g2.p[0] = g2.p[1] = g1.p[0]
g2.p[-1] = g1.p[-1]
return g2
[docs]def adaptive_samples(func, npt, log10_ens=3, nres=200):
"""func should have a .cdf method.
NB -- log10_ens needs to be a scalar!
The idea is to return a set of points on [0,1] which are approximately
distributed uniformly in F(phi) and thus more densely sample the
peaks. First, the cdf is evaluated on nres points. Then npt estimates
of the inverse cdf are obtained by linear interpolation.
"""
assert np.isscalar(log10_ens)
x = np.linspace(0, 1, nres)
F = func.cdf(x, log10_ens=log10_ens)
Y = np.linspace(0, 1, npt)
idx = np.searchsorted(F, Y[1:-1])
assert idx.min() > 0
F1 = F[idx]
F0 = F[idx - 1]
m = (F1 - F0) * (1.0 / (x[1] - x[0]))
X1 = x[idx]
X0 = x[idx - 1]
Y[1:-1] = X0 + (Y[1:-1] - F0) / (F1 - F0) * (x[1] - x[0])
return Y
[docs]class GaussianPrior:
def __init__(self, locations, widths, mod, mask=None):
self.x0 = np.where(mod, np.mod(locations, 1), locations)
self.s0 = np.asarray(widths) * 2**0.5
self.mod = np.asarray(mod)
if mask is None:
self.mask = np.asarray([True] * len(locations))
else:
self.mask = np.asarray(mask)
self.x0 = self.x0[self.mask]
self.s0 = self.s0[self.mask]
self.mod = self.mod[self.mask]
def __len__(self):
"""Return number of parameters with a prior."""
return self.mask.sum()
[docs] def __call__(self, parameters):
if not np.any(self.mask):
return 0
parameters = parameters[self.mask]
parameters = np.where(self.mod, np.mod(parameters, 1), parameters)
return np.sum(((parameters - self.x0) / self.s0) ** 2)
def gradient(self, parameters):
if not np.any(self.mask):
return np.zeros_like(parameters)
parameters = parameters[self.mask]
parameters = np.where(self.mod, np.mod(parameters, 1), parameters)
rvals = np.zeros(len(self.mask))
rvals[self.mask] = 2 * (parameters - self.x0) / self.s0**2
return rvals
[docs]def prim_io(template, bound_eps=1e-5):
"""Read files and build LCPrimitives."""
def read_gaussian(toks):
primitives = []
norms = []
for i, tok in enumerate(toks):
if tok[0].startswith("phas"):
g = LCGaussian()
g.p[-1] = float(tok[2])
g.errors[-1] = float(tok[4])
primitives += [g]
elif tok[0].startswith("fwhm"):
g = primitives[-1]
g.p[0] = float(tok[2]) / 2.3548200450309493 # kluge for now
g.errors[0] = float(tok[4]) / 2.3548200450309493
elif tok[0].startswith("ampl"):
norms.append(float(tok[2]))
# check to that bounds are OK
for iprim, prim in enumerate(primitives):
if prim.check_bounds():
continue
for ip, p in enumerate(prim.p):
lo, hi = prim.bounds[ip]
if (p < lo) and (abs(p - lo) < bound_eps):
prim.p[ip] = lo
if (p > hi) and (abs(p - hi) < bound_eps):
prim.p[ip] = hi
if not prim.check_bounds():
raise ValueError("Unrecoverable bounds errors on input.")
# check norms
norms = np.asarray(norms)
n = norms.sum()
if (n > 1) and (abs(n - 1) < bounds_eps):
norms *= 1.0 / n
return primitives, list(norms)
lines = None
try:
with open(template, "r") as f:
lines = f.readlines()
except FileNotFoundError:
lines = template.split("\n")
if lines is None:
raise ValueError("Could not load lines from template.")
toks = [line.strip().split() for line in lines if len(line.strip()) > 0]
label, toks = toks[0], toks[1:]
if "gauss" in label:
return read_gaussian(toks)
elif "kernel" in label:
return [LCKernelDensity(input_file=toks)], None
elif "fourier" in label:
return [LCEmpiricalFourier(input_file=toks)], None
raise ValueError("Template format not recognized!")
[docs]def check_gradient_derivative(templ):
dom = np.linspace(0, 1, 10001)
pcs = 0.5 * (dom[:-1] + dom[1:])
ngd = templ.gradient(dom)
ngd = (ngd[:, 1:] - ngd[:, :-1]) / (dom[1] - dom[0])
gd = templ.gradient_derivative(templ, pcs)
for i in range(gd.shape[0]):
print(np.max(np.abs(gd[i] - ngd[i])))
return pcs, gd, ngd
[docs]def isvector(x):
return len(np.asarray(x).shape) > 0