# Source code for pint.templates.lcprimitives

"""Components of a pulsar light curve.

LCPrimitive and its subclasses implement
components of a pulsar light curve.  Includes primitives (Gaussian,
Lorentzian), etc.  as well as more sophisticated holistic templates that
provide single-parameter (location) representations of the light curve.

author: M. Kerr <matthew.kerr@gmail.com>

"""

# NB -- big TODO -- I don't think wrapped primitives quite correctly return
# Monte Carlo variables because they don't account for the uniform approx.
# perhaps this isn't a big deal

from math import atan, cosh, tan

import numpy as np
from scipy.integrate import quad, simps
from scipy.interpolate import interp1d
from scipy.special import erf, i0, i1, owens_t
from scipy.stats import cauchy, norm, vonmises, skewnorm

ROOT2PI = (2 * np.pi) ** 0.5
R2DI = (2 / np.pi) ** 0.5
ROOT2 = 2**0.5
TWOPI = 2 * np.pi
PI = np.pi * 1
MAXWRAPS = 15
MINWRAPS = 3
WRAPEPS = 1e-8


def isvector(x):
    """Return True if *x* has at least one array dimension (i.e. is not a scalar)."""
    return np.asarray(x).ndim > 0
def two_comp_mc(n, w1, w2, loc, func):
    """Generate MC photons from a two-sided distribution.

    Helper function. This should work as is if w1, w2, loc are vectors.

    Parameters
    ----------
    n : int
        total number of photons
    w1 : float or array-like
        scale parameter for func, lefthand peak
    w2 : float or array-like
        scale parameter for func, righthand peak
    loc : float or array-like
        position of peak
    func : callable
        an 'rvs' function from scipy
    """
    # split the photons between the two sides in proportion to the widths
    left_fraction = w1 / (w1 + w2)
    n_left = (np.random.rand(n) < left_fraction).sum()
    # draw from the left-hand component and fold it onto the negative side
    left = loc - np.abs(func(loc=0, scale=w1, size=n_left))
    # draw from the right-hand component and fold it onto the positive side
    right = loc + np.abs(func(loc=0, scale=w2, size=n - n_left))
    # wrap all phases back into [0, 1)
    return np.mod(np.concatenate((left, right)), 1)
def approx_gradient(func, phases, log10_ens=None, eps=1e-6):
    """Return a numerical gradient for LCPrimitive and LCTemplate objects.

    This is "dTemplate/dParam".

    Parameters
    ----------
    func
        object exposing get_parameters/set_parameters and __call__
    phases : array-like
        phases at which to evaluate the gradient
    log10_ens : array-like or None
        log10 energies passed through to func
    eps : float
        finite-difference step size
    """
    # snapshot all (free and fixed) parameters so they can be restored
    orig_p = func.get_parameters(free=False).copy()
    g = np.zeros([len(orig_p), len(phases)])
    # weights for (-f(+2h) + 8 f(+h) - 8 f(-h) + f(-2h)) / (12 h)
    weights = np.asarray([-1, 8, -8, 1]) / (12 * eps)

    def do_step(which, eps):
        # evaluate func with parameter `which` perturbed by eps
        p0 = orig_p.copy()
        p0[which] += eps
        func.set_parameters(p0, free=False)
        return func(phases, log10_ens=log10_ens)

    for i in range(len(orig_p)):
        # use a 4th-order central difference scheme
        for step, w in zip([2, 1, -1, -2], weights):
            g[i, :] += w * do_step(i, step * eps)

    # func is stateful; restore the original parameters before returning
    func.set_parameters(orig_p, free=False)
    return g
def approx_hessian(func, phases, log10_ens=None, eps=1e-5):
    """Return a numerical hessian for LCPrimitive and LCTemplate objects.

    This is d^2Template/dParam1dParam2.
    """
    # snapshot all (free and fixed) parameters so they can be restored
    orig_p = func.get_parameters(free=False).copy()
    g = np.zeros([len(orig_p), len(orig_p), len(phases)])
    # weights for the standard mixed second-difference stencil
    weights = np.asarray([1, -1, -1, 1]) / (4 * eps**2)

    def do_step(which, eps):
        # evaluate func with the two indicated parameters perturbed
        which1, which2 = which
        eps1, eps2 = eps
        p0 = orig_p.copy()
        p0[which1] += eps1
        p0[which2] += eps2
        func.set_parameters(p0, free=False)
        return func(phases, log10_ens=log10_ens)

    steps = np.asarray([[1, 1], [1, -1], [-1, 1], [-1, -1]]) * eps
    for i in range(len(orig_p)):
        for j in range(len(orig_p)):
            # use a 2th-order central difference scheme
            for weight, step in zip(weights, steps):
                g[i, j, :] += weight * do_step((i, j), step)

    # func is stateful; restore the original parameters before returning
    func.set_parameters(orig_p, free=False)
    return g
def approx_derivative(func, phases, log10_ens=None, order=1, eps=1e-7):
    """Return numerical derivative for LCPrimitive and LCTemplate objects.

    This is "dTemplate/dPhi."
    """
    if order not in [1, 2]:
        raise NotImplementedError("Only 1st and 2nd derivs supported.")
    # evaluate just above and just below each phase, wrapping into [0, 1)
    f_plus = func(np.mod(phases + eps, 1), log10_ens=log10_ens)
    f_minus = func(np.mod(phases - eps, 1), log10_ens=log10_ens)
    if order == 1:
        # central first difference
        return (f_plus - f_minus) * (0.5 / eps)
    # central second difference
    f_center = func(phases, log10_ens=log10_ens)
    return (f_plus + f_minus - 2 * f_center) * (1.0 / eps**2)
def check_gradient(func, n=1000, atol=1e-8, rtol=1e-5, quiet=False, ph=None, en=None):
    """Test gradient function with a set of MC photons.

    This works with either LCPrimitive or LCTemplate objects.

    TODO -- there is trouble with the numerical gradient for the
    location-related parameters when the finite step causes the peak to
    shift from one side of an evaluation phase to the other."""
    if en is None:
        en = np.random.rand(n) * 3 + 1  # 10 MeV to 10 GeV
    if ph is None:
        ph = func.random(n, log10_ens=en)
    assert len(en) == len(ph)
    # shrink the finite-difference step if any photon lies very close to
    # the peak, to reduce the step-across-peak problem noted above
    if hasattr(func, "closest_to_peak"):
        eps = min(1e-6, 0.2 * func.closest_to_peak(ph))
    else:
        eps = 1e-6
    try:
        # templates may return the template value along with the gradient
        g1, t1 = func.gradient(ph, log10_ens=en, free=False, template_too=True)
        vals = func(ph, log10_ens=en)
        if not np.allclose(vals, t1):
            print("template_too value failed")
            return False
    # except AttributeError:
    except TypeError:
        # object does not support template_too; just get the gradient
        g1 = func.gradient(ph, log10_ens=en, free=False)
    if np.any(np.isnan(g1)):
        print("Found NaN in gradient.")
        return False
    g2 = func.approx_gradient(ph, log10_ens=en, eps=eps)
    anyfail = False
    for i in range(g1.shape[0]):  # loop over params
        d1 = np.abs(g1[i] - g2[i])
        # numpy.allclose-style tolerance, per parameter
        fail = np.any(d1 > (atol + rtol * np.abs(g2[i])))
        if not quiet:
            pass_string = "FAILED" if fail else "passed"
            print("%02d (%s) %.3g (abs)" % (i, pass_string, d1.max()))
        anyfail = anyfail or fail
    return not anyfail
def check_derivative(
    func, n=1000, atol=1e-8, rtol=1e-5, order=1, eps=1e-7, quiet=False
):
    """Test derivative function with a set of MC photons.

    This works with either LCPrimitive or LCTemplate objects.
    """
    en = np.random.rand(n) * 3 + 1  # 10 MeV to 10 GeV
    ph = func.random(n, log10_ens=en)
    g1 = func.derivative(ph, log10_ens=en, order=order)
    if np.any(np.isnan(g1)):
        print("Found NaN in derivative.")
        return False
    # compare analytic derivative against the finite-difference version
    g2 = func.approx_derivative(ph, log10_ens=en, eps=eps, order=order)
    d1 = np.abs(g1 - g2)
    if order > 1:
        print(
            "Warning! numerical 2nd derivatives are not very accurate and should be assessed subjectively."
        )
    # numpy.allclose-style tolerance on the absolute difference
    return not np.any(d1 > (atol + rtol * np.abs(g1)))
class LCPrimitive:
    """Base class for various components of a light curve.

    All "analytic" light curve models must inherit and must implement
    the 'virtual' functions:

    * __call__ : evaluate the primitive at provided phases
    * gradient : evaluate the primitive derivative wrt the parameters
    * hwhm : convert between internal parameters and empirical HWHM

    Specialized class initialization is handled via inheriting init.
    """

    def __init__(self, **kwargs):
        """Generally, class-specific setup work is performed in init.
        Here, init is called and certain guaranteed default members
        are established."""
        self.init()
        self._einit()
        self.errors = np.zeros_like(self.p)
        self.parse_kwargs(kwargs)
        # kwargs may have supplied "free" / "bounds"; otherwise defaults
        if not hasattr(self, "free"):
            self.free = np.asarray([True] * len(self.p))
        if not hasattr(self, "bounds"):
            self.bounds = self._default_bounds()  # default
        self._asarrays()
        (
            self.gauss_prior_loc,
            self.gauss_prior_width,
            self.gauss_prior_enable,
        ) = self._default_priors()
        self.shift_mode = False
        # internal consistency checks on array lengths
        assert len(self.p) == len(self.free)
        if hasattr(self, "slope"):
            assert len(self.slope) == len(self.p)
            assert len(self.slope_free) == len(self.p)
        if not self.check_bounds():
            raise ValueError("Supplied parameter values were outside bounds.")

    def _einit(self):
        # hook for energy-dependent subclasses; no-op by default
        pass

    def parse_kwargs(self, kwargs):
        # acceptable keyword arguments, can be overridden by children
        recognized_kwargs = ["p", "free"]
        for key in kwargs.keys():
            if key not in recognized_kwargs:
                raise ValueError(f"kwarg {key} not recognized")
        self.__dict__.update(kwargs)
    def __call__(self, phases):
        # virtual: subclasses must evaluate the primitive at the given phases
        raise NotImplementedError(
            "Virtual function must be implemented by child class."
        )
def num_parameters(self, free=True): return np.sum(self.free) if free else len(self.free)
    def get_free_mask(self):
        """Return a mask with True if parameters are free, else False."""
        # NB: returns the live array, not a copy
        return self.free
def is_energy_dependent(self): return False
    def is_two_sided(self):
        """True if primitive is asymmetric.  Default is False, two-sided
        child classes should override."""
        return False
def copy(self): from copy import deepcopy return deepcopy(self)
[docs] def integrate(self, x1=0, x2=1, log10_ens=3): """Base implemention with scipy quad.""" if not isvector(log10_ens): f = lambda ph: self(ph, log10_ens) return quad(f, x1, x2)[0] else: rvals = np.empty_like(log10_ens) for i in range(len(log10_ens)): f = lambda ph: self(ph, log10_ens[i]) rvals[i] = quad(f, x1, x2)[0] return rvals
def cdf(self, x, log10_ens=3): return self.integrate(x1=0, x2=x, log10_ens=3)
[docs] def fwhm(self): """Return the full-width at half-maximum of the light curve model.""" return self.hwhm(0) + self.hwhm(1)
    def hwhm(self, right=False):
        """Return the half-width at half-maximum of the light curve model."""
        # virtual: subclasses convert their width parameters to an HWHM
        raise NotImplementedError(
            "Virtual function must be implemented by child class."
        )
def init(self): self.p = np.asarray([1]) self.pnames = [] self.name = "Default" self.shortname = "None" def _asarrays(self): for key in ["p", "free", "bounds", "errors", "slope", "slope_free"]: if hasattr(self, key): v = self.__dict__[key] if v is not None: self.__dict__[key] = np.asarray( v, dtype=bool if "free" in key else float ) def _default_bounds(self): bounds = [[]] * len(self.p) # this order works for LCHarmonic, too bounds[0] = [0.005, 0.5] # width bounds[-1] = [-1, 1] # position if len(bounds) > 2: bounds[1] = [0.005, 0.5] # width return bounds def _default_priors(self): loc = self.p.copy() width = np.asarray([0.1] * len(self.p)) enable = np.asarray([False] * len(self.p)) return loc, width, enable def _make_p(self, log10_ens=3): """Internal method to return parameters appropriate for use in functional form.""" return [None] + list(self.p) def set_parameters(self, p, free=True): if free: self.p[self.free] = p else: self.p[:] = p # adjust position to be between 0 and 1 self.p[-1] = self.p[-1] % 1 # return np.all(self.p>=self.bounds[:,0]) and np.all(self.p<=self.bounds[:,1]) def get_parameters(self, free=True): return self.p[self.free] if free else self.p def get_parameter_names(self, free=True): return [p for (p, b) in zip(self.pnames, self.free) if b] def set_errors(self, errs): n = self.free.sum() self.errors[:] = 0.0 self.errors[self.free] = errs[:n] return n def get_errors(self, free=True): return self.errors[self.free] if free else self.errors def get_bounds(self, free=True): return np.asarray(self.bounds)[self.free] if free else self.bounds def check_bounds(self, p=None): b = np.asarray(self.bounds) p = p or self.p return np.all((p >= b[:, 0]) & (p <= b[:, 1])) def get_gauss_prior_parameters(self): mod_array = [False] * (len(self.p) - 1) + [True] return ( self.gauss_prior_loc[self.free], self.gauss_prior_width[self.free], np.asarray(mod_array)[self.free], self.gauss_prior_enable[self.free], )
    def enable_gauss_prior(self, enable=True):
        """[Convenience] Turn on gaussian prior."""
        # applies to all parameters at once
        self.gauss_prior_enable[:] = enable
    def center_gauss_prior(self, enable=False):
        """[Convenience] Set gauss mode to current params."""
        self.gauss_prior_loc[:] = self.p[:]
        if enable:
            self.enable_gauss_prior()
def get_location(self, error=False): return np.asarray([self.p[-1], self.errors[-1]]) if error else self.p[-1] def set_location(self, loc): self.p[-1] = loc def get_norm(self, error=False): # if error: return np.asarray([self.p[0],self.errors[0]]) # return self.p[0] return 1
[docs] def get_width(self, error=False, hwhm=False, right=False): """Return the width of the distribution. Parameters ---------- error : bool if True, return tuple with value and error hwhm : bool if True, scale width to be HWHM right : bool if True, return "right" component, else "left". There is no distinction for symmetric dists. """ scale = self.hwhm(right=right) / self.p[int(right)] if hwhm else 1 if error: return np.asarray([self.p[int(right)], self.errors[int(right)]]) * scale return self.p[int(right)] * scale
    def gradient(self, phases, log10_ens=3, free=False):
        """Return the gradient of the primitives wrt the parameters."""
        # virtual: implemented by child classes
        raise NotImplementedError("No gradient function found for this object.")
    def gradient_derivative(self, phases, log10_ens=3, free=False):
        """Return d/dphi(gradient).

        This is needed for computing the hessian of the profile for
        parameters that affect the timing model and hence pulse phase.
        """
        # virtual: implemented by child classes
        raise NotImplementedError(
            "No gradient_derivative function found for this object."
        )
    def derivative(self, phases, log10_ens=3, order=1):
        """Return d^np(phi)/dphi^n, with n=order."""
        # virtual: implemented by child classes
        raise NotImplementedError("No derivative function found for this object.")
[docs] def random(self, n, log10_ens=3): """Default is accept/reject.""" if n < 1: return 0 if isvector(log10_ens): assert n == len(log10_ens) edep_ps = self._make_p(log10_ens) locs = edep_ps[-1] # this should be the peak at each energy M = self(locs, log10_ens=log10_ens) # this is the (edep) maximum rvals = np.empty(n) rfunc = np.random.rand mask = np.ones(n, dtype=bool) indices = np.arange(n) while True: N = mask.sum() if N == 0: break cand_phases = rfunc(N) if isvector(M): accept = ( rfunc(N) < self(cand_phases, log10_ens=log10_ens[mask]) / M[mask] ) elif isvector(log10_ens): accept = rfunc(N) < self(cand_phases, log10_ens=log10_ens[mask]) / M else: accept = rfunc(N) < self(cand_phases, log10_ens=log10_ens) / M rvals[indices[mask][accept]] = cand_phases[accept] mask[indices[mask][accept]] = False return rvals
    def __str__(self):
        # pretty-print parameter names, values, errors, and fixed status
        m = max(len(n) for n in self.pnames)
        l = []
        errors = self.errors if hasattr(self, "errors") else [0] * len(self.pnames)
        for i in range(len(self.pnames)):
            fstring = "" if self.free[i] else " [FIXED]"
            n = self.pnames[i][:m]
            # pad the name to a fixed column width
            t_n = n + (m - len(n)) * " "
            l += [t_n + ": %.4f +\\- %.4f%s" % (self.p[i], errors[i], fstring)]
        l = [self.name + "\n------------------"] + l
        return "\n".join(l)

    def approx_gradient(self, phases, log10_ens=3, eps=1e-5):
        # convenience wrapper around the module-level numerical gradient
        return approx_gradient(self, phases, log10_ens, eps=eps)

    def approx_hessian(self, phases, log10_ens=None, eps=1e-5):
        # convenience wrapper around the module-level numerical hessian
        return approx_hessian(self, phases, log10_ens=log10_ens, eps=eps)

    def approx_derivative(self, phases, log10_ens=None, order=1, eps=1e-7):
        # convenience wrapper around the module-level numerical derivative
        return approx_derivative(
            self, phases, log10_ens=log10_ens, order=order, eps=eps
        )

    def check_gradient(self, atol=1e-8, rtol=1e-5, quiet=False):
        # convenience wrapper around the module-level gradient check
        return check_gradient(self, atol=atol, rtol=rtol, quiet=quiet)
    def sanity_checks(self, eps=1e-6):
        """A few checks on normalization, integration, etc."""
        errfac = 1
        # Normalization test: profile should integrate to 1 over one cycle
        y, ye = quad(self, 0, 1)
        # t1 = abs(self.p[0]-y)<(ye*errfac)
        t1 = abs(1 - y) < (ye * errfac)
        # integrate method test: should agree with unit normalization
        # t2 = abs(self.p[0]-self.integrate(0,1))<eps
        t2 = abs(1 - self.integrate(0, 1)) < eps
        # FWHM test: value at peak - fwhm/2 should be half the peak value
        t3 = (self(self.p[-1]) * 0.5 - self(self.p[-1] - self.fwhm() / 2)) < eps
        # gradient test
        try:
            t4 = self.check_gradient(quiet=True)
        except Exception:
            t4 = False
        # boundary conditions: profile should be continuous across phase 0/1
        t5 = abs(self(0) - self(1 - eps)) < eps
        if not t1:
            print("Failed Normalization test")
        if not t2:
            print("Failed integrate method test")
        if not t3:
            print("Failed FWHM test")
        if not t4:
            print("Failed gradient test")
        if not t5:
            print("Did not pass boundary conditions")
        return np.all([t1, t2, t3, t4, t5])
    def eval_string(self):
        """Return a string that can be evaluated to instantiate a nearly-
        identical object."""
        # slope/slope_free only exist on energy-dependent primitives
        return f'{self.__class__.__name__}(p={list(self.p)},free={list(self.free)},slope={str(list(self.slope)) if hasattr(self, "slope") else None},slope_free={str(list(self.slope_free)) if hasattr(self, "slope_free") else None})'
[docs] def dict_string(self): """Return a string to express the object as a dictionary that can be easily instantiated using its keys.""" def pretty_list(l, places=5): fmt = "%." + "%d" % places + "f" s = ", ".join([fmt % x for x in l]) return f"[{s}]" t = [ f"name = {self.__class__.__name__}", f"p = {pretty_list(self.p)}", f"free = {list(self.free)}", f'slope = {pretty_list(self.slope) if hasattr(self, "slope") else None}', f'slope_free = {str(list(self.slope_free)) if hasattr(self, "slope_free") else None}', ] # return 'dict(\n'+'\n '.join(t)+'\n return t
[docs] def closest_to_peak(self, phases): """Return the minimum distance between a member of the array of phases and the position of the mode of the primitive.""" return np.abs(phases - self.get_location()).min()
def get_fixed_energy_version(self, log10_en=3): return self
class LCWrappedFunction(LCPrimitive):
    """Super-class for profiles derived from wrapped functions.

    While some distributions (e.g. the wrapped normal) converge
    quickly, others (e.g. the wrapped Lorentzian) converge very slowly
    and must be truncated before machine precision is reached.

    In order to preserve normalization, the pdf is slightly adjusted:
    f(phi) = sum_(i,-N,N,g(phi+i)) + (1 - int(phi,-N,N,g(phi)) ).

    This introduces an additional parametric dependence which must
    be accounted for by computation of the gradient.
    """

    def _norm(self, nwraps, log10_ens=3):
        """Compute the truncated portion of the template."""
        # return self.p[0]-self.base_int(-nwraps,nwraps+1)
        return 1 - self.base_int(-nwraps, nwraps + 1, log10_ens)

    def _grad_norm(self, nwraps, log10_ens=3):
        """Compute the gradient terms due to truncated portion.

        That is, since we add on a uniform component beyond nwraps, the
        amplitude of this component depends on the CDF and hence on the
        parameters.

        Default implementation is to ignore these terms, applicable for
        rapidly-converging distributions (e.g. wrapped normal with small
        width parameter).  On the other hand, it is not negligible for
        long-tailed distributions, e.g. Lorentzians."""
        return None

    def _grad_deriv_norm(self, nwraps, log10_ens=3):
        # analogous to _grad_norm, for the phase derivative of the gradient;
        # ignored by default
        return None

    def _grad_hess(self, nwraps, log10_ens=3):
        """Compute the hessian terms due to truncated portion.  See
        _grad_norm."""
        return None
    def __call__(self, phases, log10_ens=3):
        """Return wrapped template + DC component corresponding to truncation."""
        results = self.base_func(phases, log10_ens)
        for i in range(1, MAXWRAPS + 1):
            # add the +i and -i wraps together
            t = self.base_func(phases, log10_ens, index=i)
            t += self.base_func(phases, log10_ens, index=-i)
            results += t
            # stop once the contribution of a wrap pair is negligible
            if (i >= MINWRAPS) and (np.all(t < WRAPEPS)):
                return results
        # TODO -- only return remainder when MAXWRAPS?
        return results + self._norm(i, log10_ens)
    def gradient(self, phases, log10_ens=3, free=False):
        """Return the gradient evaluated at a vector of phases.

        output : a num_parameter x len(phases) ndarray, the num_parameter-dim
        gradient at each phase
        """
        results = self.base_grad(phases, log10_ens)
        for i in range(1, MAXWRAPS + 1):
            # add the +i and -i wraps together
            t = self.base_grad(phases, log10_ens, index=i)
            t += self.base_grad(phases, log10_ens, index=-i)
            results += t
            if (i >= MINWRAPS) and (np.all(t < WRAPEPS)):
                break
        # include the gradient of the DC (truncation) term, if provided
        gn = self._grad_norm(i, log10_ens)
        if gn is not None:
            for i in range(len(gn)):
                results[i, :] += gn[i]
        return results[self.free] if free else results
    def gradient_derivative(self, phases, log10_ens=3, free=False):
        """Return the phase derivative of the gradient evaluated at a
        vector of phases.

        output : a num_parameter x len(phases) ndarray, the num_parameter-dim
        gradient derivative at each phase
        """
        results = self.base_grad_deriv(phases, log10_ens)
        for i in range(1, MAXWRAPS + 1):
            # add the +i and -i wraps together
            t = self.base_grad_deriv(phases, log10_ens, index=i)
            t += self.base_grad_deriv(phases, log10_ens, index=-i)
            results += t
            if (i >= MINWRAPS) and (np.all(t < WRAPEPS)):
                break
        # include the truncation-term contribution, if provided
        gn = self._grad_deriv_norm(i, log10_ens)
        if gn is not None:
            for i in range(len(gn)):
                results[i, :] += gn[i]
        return results[self.free] if free else results
    def hessian(self, phases, log10_ens=3, free=False):
        """Return the hessian evaluated at a vector of phases.  NB that
        this is restricted to the sub-space for this primitive.

        output : a num_parameter x num_parameter x len(phases) ndarray,
        the num_parameter-dim^2 hessian at each phase
        """
        results = self.base_hess(phases, log10_ens=log10_ens)
        for i in range(1, MAXWRAPS + 1):
            # add the +i and -i wraps together
            t = self.base_hess(phases, log10_ens=log10_ens, index=i)
            t += self.base_hess(phases, log10_ens=log10_ens, index=-i)
            results += t
            if (i >= MINWRAPS) and (np.all(t < WRAPEPS)):
                break
        # truncation-term hessian is not yet supported
        gh = self._grad_hess(i, log10_ens=log10_ens)
        if gh is not None:
            raise NotImplementedError
            # for i in range(len(gn)):
            # results[i,:] += gn[i]
        return results[self.free, self.free] if free else results
    def derivative(self, phases, log10_ens=3, order=1):
        """Return the phase gradient (dprim/dphi) at a vector of phases.

        order: order of derivative (1=1st derivative, etc.)

        output : a len(phases) ndarray, dprim/dphi

        NB this will generally be opposite in sign to the gradient of the
        location parameter.
        """
        results = self.base_derivative(phases, log10_ens, order=order)
        for i in range(1, MAXWRAPS + 1):
            # add the +i and -i wraps together
            t = self.base_derivative(phases, log10_ens, index=i, order=order)
            t += self.base_derivative(phases, log10_ens, index=-i, order=order)
            results += t
            # the uniform truncation term has zero phase derivative, so no
            # correction is needed after the loop
            if (i >= MINWRAPS) and (np.all(t < WRAPEPS)):
                break
        return results
    def integrate(self, x1, x2, log10_ens=3):
        # if(x1==0) and (x2==0): return 1.
        # NB -- this method is probably overkill now.
        results = self.base_int(x1, x2, log10_ens, index=0)
        for i in range(1, MAXWRAPS + 1):
            # add the +i and -i wraps together
            t = self.base_int(x1, x2, log10_ens, index=i)
            t += self.base_int(x1, x2, log10_ens, index=-i)
            results += t
            if np.all(t < WRAPEPS):
                break
        # the truncation DC term contributes (x2-x1) times its amplitude
        return results + (x2 - x1) * self._norm(i, log10_ens)
    # "virtual" single-wrap building blocks; wrapped subclasses implement these
    def base_func(self, phases, log10_ens=3, index=0):
        raise NotImplementedError("No base_func function found for this object.")

    def base_grad(self, phases, log10_ens=3, index=0):
        raise NotImplementedError("No base_grad function found for this object.")

    def base_grad_deriv(self, phases, log10_ens=3, index=0):
        raise NotImplementedError("No base_grad_deriv function found for this object.")

    def base_hess(self, phases, log10_ens=3, index=0):
        raise NotImplementedError("No base_hess function found for this object.")

    def base_derivative(self, phases, log10_ens=3, index=0, order=1):
        raise NotImplementedError("No base_derivative function found for this object.")

    def base_int(self, phases, log10_ens=3, index=0):
        raise NotImplementedError("No base_int function found for this object.")
class LCGaussian(LCWrappedFunction):
    """Represent a (wrapped) Gaussian peak.

    Parameters
    ----------
    Width
        the standard deviation parameter of the norm dist.
    Location
        the mode of the Gaussian distribution
    """

    def init(self):
        # defaults: width 0.03 in phase, peak at phase 0.5
        self.p = np.asarray([0.03, 0.5])
        self.pnames = ["Width", "Location"]
        self.name = "Gaussian"
        self.shortname = "G"
[docs] def hwhm(self, right=False): return self.p[0] * (2 * np.log(2)) ** 0.5
    def base_func(self, phases, log10_ens=3, index=0):
        # unit-normalized Gaussian pdf evaluated on the index-th wrap
        e, width, x0 = self._make_p(log10_ens)
        z = (phases + index - x0) / width
        return (1.0 / (width * ROOT2PI)) * np.exp(-0.5 * z**2)

    def base_grad(self, phases, log10_ens=3, index=0):
        # [d(pdf)/d(width), d(pdf)/d(x0)] on the index-th wrap
        e, width, x0 = self._make_p(log10_ens)
        z = (phases + index - x0) / width
        f = (1.0 / (width * ROOT2PI)) * np.exp(-0.5 * z**2)
        return np.asarray([f / width * (z**2 - 1.0), f / width * z])

    def base_grad_deriv(self, phases, log10_ens=3, index=0):
        # d/dphi of the parameter gradient on the index-th wrap
        e, width, x0 = self._make_p(log10_ens)
        z = (phases + index - x0) / width
        f = (1.0 / (width * ROOT2PI)) * np.exp(-0.5 * z**2)
        q = f / width**2
        z2 = z**2
        return np.asarray([q * z * (3 - z2), q * (1 - z2)])

    def base_hess(self, phases, log10_ens=3, index=0):
        # symmetric 2x2 parameter hessian (width, x0) at each phase
        e, width, x0 = self._make_p(log10_ens=log10_ens)
        z = (phases + index - x0) / width
        f = (1.0 / (width * ROOT2PI)) * np.exp(-0.5 * z**2)
        q = f / width**2
        z2 = z**2
        rvals = np.empty((2, 2, len(z)))
        rvals[0, 0] = q * (z2**2 - 5 * z2 + 2)
        rvals[0, 1] = q * (z2 - 3) * z
        rvals[1, 1] = q * (z2 - 1)
        rvals[1, 0] = rvals[0, 1]
        return rvals

    def base_derivative(self, phases, log10_ens=3, index=0, order=1):
        # d^order(pdf)/dphi^order on the index-th wrap
        e, width, x0 = self._make_p(log10_ens)
        z = (phases + index - x0) / width
        f = (1.0 / (width * ROOT2PI)) * np.exp(-0.5 * z**2)
        if order == 1:
            return f / (-width) * z
        elif order == 2:
            return f / width**2 * (z**2 - 1)
        else:
            raise NotImplementedError

    def base_int(self, x1, x2, log10_ens=3, index=0):
        # integral over [x1, x2] on the index-th wrap (difference of erfs)
        e, width, x0 = self._make_p(log10_ens)
        z1 = (x1 + index - x0) / width
        z2 = (x2 + index - x0) / width
        return 0.5 * (erf(z2 / ROOT2) - erf(z1 / ROOT2))
[docs] def random(self, n, log10_ens=3): if isvector(log10_ens) and len(log10_ens) != n: raise ValueError("Provided log10_ens vector does not match requested n.") e, width, x0 = self._make_p(log10_ens) return np.mod(norm.rvs(loc=x0, scale=width, size=n), 1)
class LCGaussian2(LCWrappedFunction):
    """Represent a (wrapped) two-sided Gaussian peak.

    Parameters
    ----------
    Width1
        the standard deviation parameter of the norm dist. (left side)
    Width2
        the standard deviation parameter of the norm dist. (right side)
    Location
        the mode of the distribution
    """

    def init(self):
        # defaults: symmetric widths 0.03, peak at phase 0.5
        self.p = np.asarray([0.03, 0.03, 0.5])
        self.pnames = ["Width1", "Width2", "Location"]
        self.name = "Gaussian2"
        self.shortname = "G2"
    def is_two_sided(self):
        """True: this primitive has independent left/right widths."""
        return True
[docs] def hwhm(self, right=False): return (self.p[int(right)]) * (2 * np.log(2)) ** 0.5
def base_func(self, phases, log10_ens=3, index=0): e, width1, width2, x0 = self._make_p(log10_ens) z = phases + (index - x0) z *= np.where(z <= 0, 1.0 / width1, 1.0 / width2) return (R2DI / (width1 + width2)) * np.exp(-0.5 * z**2) def base_grad(self, phases, log10_ens=3, index=0): e, width1, width2, x0 = self._make_p(log10_ens) z = phases + (index - x0) m = z <= 0 w = np.where(m, width1, width2) z /= w f = (R2DI / (width1 + width2)) * np.exp(-0.5 * z**2) k = 1.0 / (width1 + width2) z2w = z**2 / w t = f * (z2w - k) g1 = f * (z2w * (m) - k) g2 = f * (z2w * (~m) - k) g3 = f * z / w return np.asarray([g1, g2, g3]) def base_int(self, x1, x2, log10_ens=3, index=0): e, width1, width2, x0 = self._make_p(log10_ens) if index == 0 and (x1 < x0) and (x2 > x0): z1 = (x1 + index - x0) / width1 z2 = (x2 + index - x0) / width2 k1 = 2 * width1 / (width1 + width2) k2 = 2 * width2 / (width1 + width2) return 0.5 * (k2 * erf(z2 / ROOT2) - k1 * erf(z1 / ROOT2)) w = width1 if ((x1 + index) < x0) else width2 z1 = (x1 + index - x0) / w z2 = (x2 + index - x0) / w k = 2 * w / (width1 + width2) return 0.5 * k * (erf(z2 / ROOT2) - erf(z1 / ROOT2))
[docs] def random(self, n): """Use multinomial technique to return random photons from both components.""" if hasattr(n, "__len__"): n = len(n) return two_comp_mc(n, self.p[0], self.p[1], self.p[-1], norm.rvs)
class LCSkewGaussian(LCWrappedFunction):
    """Represent a (wrapped) skew-normal (Gaussian) peak.

    Parameters
    ----------
    Width
        the standard deviation parameter of the norm dist.
    Shape
        the degree of skewness, +ve values are right-skewed
    Location
        the "loc" parameter of the skew-normal (not in general the mode)
    """

    def init(self):
        # defaults: width 0.03, no skew, loc at phase 0.5
        self.p = np.asarray([0.03, 0, 0.5])
        self.pnames = ["Width", "Shape", "Location"]
        self.name = "SkewGaussian"
        self.shortname = "GS"

    def _default_bounds(self):
        bounds = [[]] * len(self.p)
        bounds[0] = [0.0015, 1.5]  # width
        bounds[1] = [-10, 10]  # asymmetry; this limit avoids sharp edges
        bounds[2] = [-1, 1]  # position
        return bounds
    def hwhm(self, right=False):
        # no closed-form HWHM is implemented for the skew normal
        raise NotImplementedError
# return self.p[0] * (2 * np.log(2)) ** 0.5 def base_func(self, phases, log10_ens=3, index=0): e, width, shape, x0 = self._make_p(log10_ens) z = (phases + index - x0) / width t1 = (1.0 / (width * ROOT2PI)) * np.exp(-0.5 * z**2) t2 = 1 + erf(shape * z * 2**-0.5) return t1 * t2 def base_grad(self, phases, log10_ens=3, index=0): # basic idea here: f(z) = 2*phi(z) * Phi(a*z) # df/dz = (-2z)phi(z)Phi(a*z) + (2a)phi(z)phi(a*z) # df/da = 2z*phi(z)*phi(a*z) # dz/dw = -z/w; dz/dx0 = -1/w # so remaining derivatives chain rule from dz/dparam e, width, shape, x0 = self._make_p(log10_ens) z = (phases + index - x0) / width t1 = (1.0 / (width * ROOT2PI)) * np.exp(-0.5 * z**2) t2 = 1 + erf(shape * z * (1.0 / 2**0.5)) t3 = (1.0 / ROOT2PI) * np.exp(-0.5 * (shape * z) ** 2) f1 = t1 * t2 f2 = t1 * t3 return np.asarray( [ f1 * (z**2 - 1.0) / width - 2 * f2 * z * (shape / width), 2 * f2 * z, f1 * (z / width) - 2 * f2 * (shape / width), ] ) def base_grad_deriv(self, phases, log10_ens=3, index=0): raise NotImplementedError # @abhisrkckl: commented out unreachable code. # e, width, x0 = self._make_p(log10_ens) # z = (phases + index - x0) / width # f = (1.0 / (width * ROOT2PI)) * np.exp(-0.5 * z**2) # q = f / width**2 # z2 = z**2 # return np.asarray([q * z * (3 - z2), q * (1 - z2)]) def base_hess(self, phases, log10_ens=3, index=0): raise NotImplementedError # @abhisrkckl: commented out unreachable code. 
# e, width, x0 = self._make_p(log10_ens=log10_ens) # z = (phases + index - x0) / width # f = (1.0 / (width * ROOT2PI)) * np.exp(-0.5 * z**2) # q = f / width**2 # z2 = z**2 # rvals = np.empty((2, 2, len(z))) # rvals[0, 0] = q * (z2**2 - 5 * z2 + 2) # rvals[0, 1] = q * (z2 - 3) * z # rvals[1, 1] = q * (z2 - 1) # rvals[1, 0] = rvals[0, 1] # return rvals def base_derivative(self, phases, log10_ens=3, index=0, order=1): e, width, shape, x0 = self._make_p(log10_ens) z = (phases + index - x0) / width t1 = (1.0 / (width * ROOT2PI)) * np.exp(-0.5 * z**2) t2 = 1 + erf(shape * z * (1.0 / 2**0.5)) t3 = (1.0 / ROOT2PI) * np.exp(-0.5 * (shape * z) ** 2) f1 = t1 * t2 f2 = t1 * t3 if order == 1: return -f1 * (z / width) + 2 * f2 * (shape / width) elif order == 2: return f1 * (z**2 - 1) / width**2 - f2 * ( 2 * shape * z / width**2 ) * (2 + shape**2) else: raise NotImplementedError def base_int(self, x1, x2, log10_ens=3, index=0): # the scipy.stats version is SUPER slow, for some reason, so just # calculate it ourselves e, width, shape, x0 = self._make_p(log10_ens) z1 = (x1 + index - x0) / width z2 = (x2 + index - x0) / width norm_1 = 0.5 * (1 + erf(z1 * (1.0 / 2**0.5))) norm_2 = 0.5 * (1 + erf(z2 * (1.0 / 2**0.5))) owens_1 = 2 * owens_t(z1, shape) owens_2 = 2 * owens_t(z2, shape) return (norm_2 - owens_2) - (norm_1 - owens_1)
[docs] def random(self, n, log10_ens=3): if isvector(log10_ens) and len(log10_ens) != n: raise ValueError("Provided log10_ens vector does not match requested n.") e, width, shape, x0 = self._make_p(log10_ens) return np.mod(skewnorm.rvs(shape, loc=x0, scale=width, size=n), 1)
[docs] def mean(self, log10_ens=3): """Return the approximate mode, which is not in general where the loc parameter is. This expression is approximate and taken from wikipedia. """ e, width, shape, x0 = self._make_p(log10_ens) delta = shape / (1 + shape**2) ** 0.5 f = (2 / np.pi) ** 0.5 muz = f * delta return x0 + width * muz
[docs] def mode(self, log10_ens=3): """Return the approximate mode, which is not in general where the loc parameter is. This expression is approximate and taken from wikipedia. """ e, width, shape, x0 = self._make_p(log10_ens) delta = shape / (1 + shape**2) ** 0.5 f = (2 / np.pi) ** 0.5 muz = f * delta sigz = (1 - muz**2) ** 0.5 sgn = np.where(shape >= 0, 1, -1) gam1 = 0.5 * (4 - np.pi) * muz**3 / (1 - muz**2) ** (1.5) m0 = muz - 0.5 * (gam1 * sigz * +sgn * np.exp(-(2 * np.pi) / np.abs(shape))) return x0 + width * m0
class LCLorentzian(LCPrimitive):
    """Represent a (wrapped) Lorentzian peak.

    Parameters
    ----------
    Width
        the width paramater of the wrapped Cauchy distribution,
        namely HWHM*2PI for narrow distributions
    Location
        the center of the peak in phase
    """

    def init(self):
        # defaults: width 0.1, peak at phase 0.5
        self.p = np.asarray([0.1, 0.5])
        self.pnames = ["Width", "Location"]
        self.name = "Lorentzian"
        self.shortname = "L"
    def hwhm(self, right=False):
        # NB -- bounds on p[1] set such that this is well-defined
        # half-maximum of sinh(g)/(cosh(g)-cos(z)) occurs at cos(z)=2-cosh(g);
        # convert that z back to phase units by dividing by 2*pi
        return np.arccos(2 - cosh(self.p[0])) / TWOPI
[docs] def __call__(self, phases, log10_ens=3): e, gamma, loc = self._make_p(log10_ens) z = TWOPI * (phases - loc) # NB -- numpy call not as efficient as math.sinh etc. # but this allows easy inheritance for the energy-dependence return np.sinh(gamma) / (np.cosh(gamma) - np.cos(z))
    def gradient(self, phases, log10_ens=3, free=False):
        """Return the gradient wrt the (width, location) parameters."""
        e, gamma, loc = self._make_p(log10_ens)
        z = TWOPI * (phases - loc)
        s1 = np.sinh(gamma)
        c1 = np.cosh(gamma)
        c = np.cos(z)
        s = np.sin(z)
        # f is the profile value itself
        f = s1 / (c1 - c)
        f2 = f**2
        # d/d(gamma) and d/d(loc)
        g1 = f * (c1 / s1) - f2
        g2 = f2 * (TWOPI / s1) * s
        return np.asarray([g1, g2])[self.free] if free else np.asarray([g1, g2])
    def derivative(self, phases, log10_ens=3, index=0, order=1):
        """Return the phase gradient (dprim/dphi) at a vector of phases.

        order: order of derivative (1=1st derivative, etc.)

        output : a len(phases) ndarray, dprim/dphi

        NB this will generally be opposite in sign to the gradient of the
        location parameter.
        """
        # NOTE(review): `index` is accepted for signature compatibility but
        # is not used; the wrapped Cauchy is evaluated in closed form.
        e, gamma, loc = self._make_p(log10_ens)
        z = TWOPI * (phases - loc)
        s1 = np.sinh(gamma)
        c1 = np.cosh(gamma)
        c = np.cos(z)
        s = np.sin(z)
        # f is the profile value itself
        f = s1 / (c1 - c)
        f2 = f**2
        if order == 1:
            return (-TWOPI / s1) * (f**2 * s)
        elif order == 2:
            return (-(TWOPI**2) / s1) * f**2 * (c - 2 * f * s**2 / s1)
        else:
            raise NotImplementedError
[docs] def random(self, n): if hasattr(n, "__len__"): n = len(n) return np.mod(cauchy.rvs(loc=self.p[-1], scale=self.p[0] / TWOPI, size=n), 1)
    def integrate(self, x1, x2, log10_ens=3):
        # NB -- due to the use of tans below, must be careful to use an angle
        # range of -pi/2 to pi/2 rather than 0 to pi as one would want
        # I haven't carefully tested this solution
        # NOTE(review): math.tan is applied to x1/x2, so this assumes the
        # integration limits are scalars -- confirm callers never pass arrays.
        e, gamma, loc = self._make_p(log10_ens)
        x1 = PI * (x1 - loc)
        x2 = PI * (x2 - loc)
        t = 1.0 / np.tanh(0.5 * gamma)  # coth(gamma/2)
        v2 = np.arctan(t * tan(x2)) / PI
        v1 = np.arctan(t * tan(x1)) / PI
        return (v2 <= v1) + v2 - v1  # correction for tan wrapping
class LCLorentzian2(LCWrappedFunction):
    """Represent a (wrapped) two-sided Lorentzian peak.

    Parameters
    ----------
    Width1
        the HWHM of the distribution (left)
    Width2
        the HWHM of the distribution (right)
    Location
        the mode of the distribution
    """

    def init(self):
        # defaults: symmetric HWHMs 0.03, peak at phase 0.5
        self.p = np.asarray([0.03, 0.03, 0.5])
        self.pnames = ["Width1", "Width2", "Location"]
        self.name = "Lorentzian2"
        self.shortname = "L2"
    def is_two_sided(self):
        """True: this primitive has independent left/right widths."""
        return True
[docs] def hwhm(self, right=False): return self.p[int(right)]
def _grad_norm(self, nwraps, log10_ens=3): e, gamma1, gamma2, x0 = self._make_p(log10_ens) z1 = (-nwraps - x0) / gamma1 z2 = (nwraps + 1 - x0) / gamma2 t = gamma2 * np.arctan(z2) - gamma1 * np.arctan(z1) t1 = 1.0 / (1 + z1**2) t2 = 1.0 / (1 + z2**2) k = 2 / (gamma1 + gamma2) / PI f = k * t g1 = -1.0 / (gamma1 + gamma2) - (np.arctan(z1) - z1 * t1) / t g2 = -1.0 / (gamma1 + gamma2) + (np.arctan(z2) - z2 * t2) / t g3 = (t1 - t2) / t return [-f * g1, -f * g2, -f * g3] def base_func(self, phases, log10_ens=3, index=0): e, gamma1, gamma2, x0 = self._make_p(log10_ens) z = phases + (index - x0) z *= np.where(z <= 0, 1.0 / gamma1, 1.0 / gamma2) k = 2 / (gamma1 + gamma2) / PI return k / (1 + z**2) def base_grad(self, phases, log10_ens=3, index=0): e, gamma1, gamma2, x0 = self._make_p(log10_ens) z = phases + (index - x0) m = z < 0 g = np.where(m, 1.0 / gamma1, 1.0 / gamma2) t1 = 1 + (z * g) ** 2 t2 = 2 * (z * g) / t1 g1 = -1 / (gamma1 + gamma2) + t2 * ((m * z) / gamma1**2) g2 = -1 / (gamma1 + gamma2) + t2 * ((~m * z) / gamma2**2) g3 = t2 * g f = (2.0 / (gamma1 + gamma2) / PI) / t1 return np.asarray([f * g1, f * g2, f * g3]) def base_derivative(self, phases, log10_ens=3, index=0, order=1): e, gamma1, gamma2, x0 = self._make_p(log10_ens) z = phases + (index - x0) g = np.where(z < 0, 1.0 / gamma1, 1.0 / gamma2) k = 2 / (gamma1 + gamma2) / PI z *= g f = k / (1 + z**2) if order == 1: return f**2 * (-2 / k) * z * g elif order == 2: fprime_on_z = f**2 * (-2 / k) * g return fprime_on_z * g + 2 * (fprime_on_z * z) ** 2 / f else: raise NotImplementedError def base_int(self, x1, x2, log10_ens=3, index=0): gamma1, gamma2, x0 = self.p # the only case where g1 and g2 can be different is if we're on the # 0th wrap, i.e. 
index=0; this also includes the case when we want # to use base_int to do a "full" integral if index == 0 and (x1 < x0) and (x2 > x0): g1, g2 = gamma1, gamma2 else: g1, g2 = [gamma1] * 2 if ((x1 + index) < x0) else [gamma2] * 2 z1 = (x1 + index - x0) / g1 z2 = (x2 + index - x0) / g2 k = 2.0 / (gamma1 + gamma2) / PI return k * (g2 * atan(z2) - g1 * atan(z1))
[docs] def random(self, n): """Use multinomial technique to return random photons from both components.""" return two_comp_mc(n, self.p[0], self.p[1], self.p[-1], cauchy.rvs)
class LCVonMises(LCPrimitive):
    """Represent a peak from the von Mises distribution.

    This function is used in directional statistics and is naturally
    wrapped.

    Parameters:
        Width     inverse of the 'kappa' parameter in the std. def.
        Location  the center of the peak in phase
    """

    def init(self):
        self.p = np.asarray([0.05, 0.5])
        self.pnames = ["Width", "Location"]
        self.name = "VonMises"
        self.shortname = "VM"

    def _default_bounds(self):
        # keep kappa = 1/width in a numerically comfortable range
        bounds = [[]] * len(self.p)
        bounds[0] = [0.0015, 1.5]  # width
        bounds[-1] = [-1, 1]  # position
        return bounds

    def hwhm(self, right=False):
        # analytic half-width at half-max; the peak is symmetric, so
        # 'right' is ignored
        return (1.0 / TWOPI) * np.arccos(self.p[0] * np.log(0.5) + 1)

    def __call__(self, phases, log10_ens=3):
        e, width, loc = self._make_p(log10_ens)
        z = TWOPI * (phases - loc)
        kappa = 1.0 / width
        return np.exp(np.cos(z) * kappa) / i0(kappa)

    # TODO -- replace these i0/i1 calls with some thing that can handle
    # large arguments more gracefully, + cache.

    def gradient(self, phases, log10_ens=3, free=False):
        """Return d(density)/d(param); rows are [d/dWidth, d/dLocation].

        If ``free`` is set, only rows for free parameters are returned.
        """
        e, width, loc = self._make_p(log10_ens)
        kappa = 1.0 / width
        I0 = i0(kappa)
        I1 = i1(kappa)
        z = TWOPI * (phases - loc)
        cz = np.cos(z)
        sz = np.sin(z)
        f = np.exp(cz * kappa) / I0
        # FIX: removed unused local 'q = f * kappa' (dead code)
        rvals = np.empty([2, len(phases)])
        rvals[0] = f * kappa**2 * (I1 / I0 - cz)
        rvals[1] = f * (TWOPI * kappa) * sz
        return rvals[self.free] if free else rvals

    def derivative(self, phases, log10_ens=3, order=1):
        # NB -- same as the (-ve) loc gradient
        e, width, loc = self._make_p(log10_ens)
        z = TWOPI * (phases - loc)
        kappa = 1.0 / width
        f = np.exp(np.cos(z) * kappa) / i0(kappa)
        if order == 1:
            return f * ((-TWOPI) * kappa) * np.sin(z)
        elif order == 2:
            return (TWOPI**2 * kappa) * f * (kappa * np.sin(z) ** 2 - np.cos(z))
        else:
            raise NotImplementedError

    def random(self, n, log10_ens=3):
        """Draw n wrapped phases from the von Mises distribution."""
        if isvector(log10_ens) and len(log10_ens) != n:
            raise ValueError("Provided log10_ens vector does not match requested n.")
        e, width, x0 = self._make_p(log10_ens)
        return np.mod(
            vonmises.rvs(loc=x0 * TWOPI, kappa=1.0 / width, size=n) * (1.0 / TWOPI), 1
        )

    def integrate(self, x0, x1, log10_ens=3):
        """Return the integral of the density from phase x0 to x1."""
        e, width, loc = self._make_p(log10_ens)
        # FIX: renamed locals; they were called i0/i1, shadowing the
        # scipy.special Bessel imports used elsewhere in this class
        cdf_lo = vonmises.cdf(x0 * TWOPI, loc=loc * TWOPI, kappa=1.0 / width)
        cdf_hi = vonmises.cdf(x1 * TWOPI, loc=loc * TWOPI, kappa=1.0 / width)
        return cdf_hi - cdf_lo
class LCKing(LCWrappedFunction):
    """Represent a (wrapped) King function peak.

    Parameters
    Sigma       the width parameter
    Gamma       the tail parameter
    Location    the mode of the distribution
    """

    # NOTES -- because we don't integrate over solid angle, the norm
    # integral / jacobean for the usual King function isn't trivial;
    # need to see if this is a show stopper

    def init(self):
        # FIX: the default parameter vector previously had only two
        # entries ([0.03, 0.5]) for the three named parameters, which
        # made the (e, s, g, x0) unpacking in every method fail.  Use a
        # mild default tail index Gamma = 3.0 — TODO confirm preferred
        # default against typical PSF-like fits.
        self.p = np.asarray([0.03, 3.0, 0.5])
        self.pnames = ["Sigma", "Gamma", "Location"]
        self.name = "King"
        self.shortname = "K"

    def hwhm(self, right=False):
        raise NotImplementedError()

    # @abhisrkckl: commented out unreachable code.
    # return self.p[0] * (2 * np.log(2)) ** 0.5

    def base_func(self, phases, log10_ens=3, index=0):
        # unwrapped King density on the index-th wrap
        e, s, g, x0 = self._make_p(log10_ens)
        z = phases + index - x0
        u = 0.5 * (z / s) ** 2
        return (g - 1) / g * (1.0 + u / g) ** -g

    def base_grad(self, phases, log10_ens=3, index=0):
        raise NotImplementedError()

    # @abhisrkckl: commented out unreachable code.
    # e, width, x0 = self._make_p(log10_ens)
    # z = (phases + index - x0) / width
    # f = (1.0 / (width * ROOT2PI)) * np.exp(-0.5 * z**2)
    # return np.asarray([f / width * (z**2 - 1.0), f / width * z])

    def base_int(self, x1, x2, log10_ens=3, index=0):
        # analytic integral of the unwrapped King density; the sign
        # bookkeeping handles limits on either side of the peak
        e, s, g, x0 = self._make_p(log10_ens)
        z1 = x1 + index - x0
        z2 = x2 + index - x0
        # reuse z1/z2 (previously recomputed from scratch)
        u1 = 0.5 * (z1 / s) ** 2
        u2 = 0.5 * (z2 / s) ** 2
        f1 = 1 - (1.0 + u1 / g) ** (1 - g)
        f2 = 1 - (1.0 + u2 / g) ** (1 - g)
        if z1 * z2 < 0:  # span the peak
            return 0.5 * (f1 + f2)
        return 0.5 * (f1 - f2) if z1 < 0 else 0.5 * (f2 - f1)

    def random(self, n):
        raise NotImplementedError()

    # @abhisrkckl: commented out unreachable code.
    # if hasattr(n, "__len__"):
    #     n = len(n)
    # return np.mod(norm.rvs(loc=self.p[-1], scale=self.p[0], size=n), 1)
class LCTopHat(LCPrimitive):
    """Represent a top hat (boxcar) function.

    Parameters:
        Width     right edge minus left edge
        Location  center of top hat
    """

    def init(self):
        self.p = np.asarray([0.03, 0.5])
        self.pnames = ["Width", "Location"]
        self.name = "TopHat"
        self.shortname = "TH"
        self.fwhm_scale = 1

    def hwhm(self, right=False):
        # half of the full width; symmetric, so 'right' is ignored
        return self.p[0] / 2

    def __call__(self, phases, wrap=True):
        width, center = self.p
        # a phase is inside the hat iff its (wrapped) offset from the
        # left edge is less than the width
        offset = np.mod(phases - center + width / 2, 1)
        return np.where(offset < width, 1.0 / width, 0)

    def random(self, n):
        """Draw n phases uniformly within the hat, wrapped to [0,1)."""
        if hasattr(n, "__len__"):
            n = len(n)
        width, center = self.p
        draws = center + width * (np.random.rand(n) - 0.5)
        return np.mod(draws, 1)
class LCHarmonic(LCPrimitive):
    """Represent a sinusoidal shape corresponding to a harmonic in a
    Fourier expansion.

    Parameters:
        Location  the phase of maximum
    """

    def init(self):
        self.p = np.asarray([0.0])
        self.order = 1
        self.pnames = ["Location"]
        self.name = "Harmonic"
        self.shortname = "H"

    def __call__(self, phases, log10_ens=3):
        e, x0 = self._make_p(log10_ens)
        omega = TWOPI * self.order
        # unit-normalized density 1 + 2 cos(omega*(phi - x0))
        return 1 + 2 * np.cos(omega * (phases - x0))

    def integrate(self, x1, x2, log10_ens=3):
        """Analytic integral of the harmonic density from x1 to x2."""
        e, x0 = self._make_p(log10_ens)
        omega = self.order * TWOPI
        sine_term = np.sin(omega * (x2 - x0)) - np.sin(omega * (x1 - x0))
        return (x2 - x1) + sine_term / omega
class LCEmpiricalFourier(LCPrimitive):
    """Calculate a Fourier representation of the light curve.

    The only parameter is an overall shift.
    Cannot be used with other LCPrimitive objects!

    Parameters:
        Shift : overall shift from original template phase
    """

    def init(self):
        self.nharm = 20
        self.p = np.asarray([0.0])
        self.free = np.asarray([True])
        self.pnames = ["Shift"]
        self.name = "Empirical Fourier Profile"
        self.shortname = "EF"
        self.shift_mode = True
        self.bounds = np.asarray([[0, 1]])

    def __init__(self, phases=None, input_file=None, **kwargs):
        """Must provide either phases or a template input file!"""
        self.init()
        self.__dict__.update(kwargs)
        if input_file is not None:
            self.from_file(input_file)
        if phases is not None:
            self.from_phases(phases)

    def from_phases(self, phases):
        """Compute empirical Fourier coefficients from a photon phase sample."""
        n = float(len(phases))
        harmonics = np.arange(1, self.nharm + 1) * (2 * np.pi)
        self.alphas = np.asarray([(np.cos(k * phases)).sum() for k in harmonics])
        self.betas = np.asarray([(np.sin(k * phases)).sum() for k in harmonics])
        self.alphas /= n
        self.betas /= n
        self.harmonics = harmonics

    def from_file(self, input_file):
        """Load (alpha, beta) coefficient pairs from a two-column file
        or from an already-tokenized sequence of rows."""
        if isinstance(input_file, str):  # FIX: was type(...) == type("")
            # FIX: use a context manager; the handle was previously
            # opened inside a list comprehension and never closed
            with open(input_file) as f:
                toks = [
                    line.strip().split()
                    for line in f
                    if len(line.strip()) > 0 and "#" not in line
                ]
        else:
            toks = input_file
        alphas = []
        betas = []
        for tok in toks:
            if len(tok) != 2:
                continue
            try:
                a = float(tok[0])
                b = float(tok[1])
            except ValueError:  # FIX: was a bare except; skip only non-numeric rows
                continue
            alphas += [a]
            betas += [b]
        n = len(alphas)
        self.alphas = np.asarray(alphas)
        self.betas = np.asarray(betas)
        self.nharm = n
        self.harmonics = np.arange(1, n + 1) * (2 * np.pi)

    def to_file(self, output_file):
        """Write the coefficients to a two-column text file."""
        # FIX: use a context manager; the handle was previously leaked
        # (never closed), so the final lines might not be flushed
        with open(output_file, "w") as f:
            f.write("# fourier\n")
            for i in range(self.nharm):
                f.write("%s\t%s\n" % (self.alphas[i], self.betas[i]))

    def __call__(self, phases, log10_ens=None):
        """NB energy-evolution currently not supported."""
        shift = self.p[0]
        harm = self.harmonics
        if shift != 0:
            """shift theorem, for real coefficients
            It's probably a wash whether it is faster to simply
            subtract from the phases, but it's more fun this way!"""
            c = np.cos(harm * shift)
            s = np.sin(harm * shift)
            a = c * self.alphas - s * self.betas
            b = s * self.alphas + c * self.betas
        else:
            a, b = self.alphas, self.betas
        ak = np.asarray([np.cos(phases * k) for k in harm]).transpose()
        bk = np.asarray([np.sin(phases * k) for k in harm]).transpose()
        return 1 + 2 * (a * ak + b * bk).sum(axis=1)

    def integrate(self, x1, x2):
        """The Fourier expansion by definition includes the entire signal,
        so the norm is always unity."""
        return 1
class LCKernelDensity(LCPrimitive):
    """Calculate a kernel density estimate of the light curve.

    The bandwidth is empirical, determined from examining several pulsars.
    The only parameter is an overall shift.
    Cannot be used with other LCPrimitive objects!

    Parameters:
        Shift : overall shift from original template phase
    """

    def init(self):
        self.bw = None  # per-photon bandwidths; computed in from_phases if None
        self.use_scale = True
        self.max_contrast = 1
        self.resolution = 0.001  # interpolation sampling resolution
        self.p = np.asarray([0.0])
        self.free = np.asarray([True])
        self.pnames = ["Shift"]
        self.name = "Gaussian Kernel Density Estimate"
        self.shortname = "KD"
        self.shift_mode = True

    def __init__(self, phases=None, input_file=None, **kwargs):
        """Must provide either phases or a template input file!"""
        self.init()
        self.__dict__.update(kwargs)
        if input_file is not None:
            self.from_file(input_file)
        if phases is not None:
            self.from_phases(phases)

    def from_phases(self, phases):
        # Build the KDE from a photon phase sample: estimate the pulsed
        # fraction from a coarse histogram, derive local bandwidths, pad
        # the phase sample across the 0/1 boundary, and cache an
        # interpolator over a regular grid.
        # NOTE(review): the print() calls below look like leftover
        # debugging output — consider removing or routing to a logger.
        n = len(phases)
        # put in "ideal" HE bins after initial calculation of pulsed fraction
        # estimate pulsed fraction
        h = np.histogram(phases, bins=100)
        o = np.sort(h[0])
        p = (
            float((o[o > o[15]] - o[15]).sum()) / o.sum()
        )  # based on ~30% clean offpulse
        b = o[15]  # background level estimate (16th lowest bin count)
        if self.bw is None:
            # Silverman-style bandwidth scaled by the pulsed fraction,
            # then refined per-bin with a background-limited floor
            self.bw = (0.5 * (p**2 * n) ** -0.2) / (2 * np.pi)
            print(p, self.bw)
            local_p = np.maximum(h[0] - b, 0).astype(float) / h[0]
            print(local_p, b)
            bgbw = ((1 - p) ** 2 * n) ** -0.2 / (2 * np.pi)
            print(bgbw)
            self.bw = np.minimum((local_p**2 * h[0]) ** -0.2 / 100.0, bgbw)

        # assign each photon the bandwidth of its histogram bin
        keys = np.searchsorted(h[1], phases)
        keys[keys == len(h[0])] = len(h[0]) - 1
        bw = self.bw[keys]
        print(len(phases), len(bw), type(bw))

        phases = phases.copy()
        self.phases = phases
        self.phases.sort()
        phases = np.asarray(phases)
        self.phases = np.asarray(phases)
        print(type(self.phases), type(phases))
        # pad the sample across the wrap boundary so kernels near 0/1
        # contribute on both sides
        hi_mask = np.asarray(phases > 0.9)
        lo_mask = np.asarray(phases < 0.1)
        self.num = len(phases)
        self.phases = np.concatenate([phases[hi_mask] - 1, phases])
        self.phases = np.concatenate([self.phases, 1 + phases[lo_mask]])

        print(len(hi_mask), type(hi_mask), type(bw), len(bw))
        self.bw = np.concatenate([bw[hi_mask], bw])
        self.bw = np.concatenate([self.bw, bw[lo_mask]])

        # if self.bw is None:
        #     self.bw = len(phases)**-0.5

        dom = np.linspace(0, 1, int(1.0 / self.resolution))
        vals = self.__all_phases__(dom)
        ip = interp1d(dom, vals)
        mask = (self.phases > 0) & (self.phases < 1)

        """
        # this is a scaling that somehow works very well...
        vals = ip(self.phases[mask])
        scale = vals/(vals.max()-vals.min())*self.max_contrast
        #scale = scale**2
        #scale = (vals/vals.min())**1.5
        if self.use_scale:
            bw = self.bw / scale
        else:
            bw = self.bw * np.ones(len(vals))
        #bw = np.maximum(bw,self.resolution)
        """

        # re-pad the (possibly rescaled) bandwidths to match self.phases
        hi_mask = self.phases[mask] > 0.9
        lo_mask = self.phases[mask] < 0.1
        self.bw = np.concatenate([bw[hi_mask], bw])
        self.bw = np.concatenate([self.bw, bw[lo_mask]])

        vals = self.__all_phases__(dom)  # with new bandwidth
        self.interpolator = interp1d(dom, vals)
        self.xvals, self.yvals = dom, vals

    def __all_phases__(self, phases):
        # Evaluate the Gaussian KDE at each requested phase; normalized
        # by the ORIGINAL (unpadded) sample size self.num.
        return np.asarray(
            [
                (np.exp(-0.5 * ((ph - self.phases) / self.bw) ** 2) / self.bw).sum()
                for ph in phases
            ]
        ) / ((2 * np.pi) ** 0.5 * self.num)

    def from_file(self, input_file):
        # Load a tabulated (x, y) template and build the interpolator.
        # NOTE(review): the file handle opened in the comprehension is
        # never explicitly closed — consider a context manager.
        if type(input_file) == type(""):
            toks = [
                line.strip().split()
                for line in open(input_file)
                if len(line.strip()) > 0 and "#" not in line
            ]
        else:
            toks = input_file

        xvals, yvals = np.asarray(toks).astype(float).transpose()
        self.xvals, self.yvals = xvals, yvals
        self.interpolator = interp1d(xvals, yvals)

    def __call__(self, phases):
        # Evaluate the cached interpolator, applying the overall shift
        # parameter by rotating the phases.
        shift = self.p[0]
        if shift == 0:
            return self.interpolator(phases)
        # think this sign convention consistent with other classes - check.
        phc = np.mod(phases.copy() - shift, 1)
        """ MTK changed 25 Jul 2011
        if shift >= 0 : phc[phc<0] += 1
        else: phc[phc > 1] -= 1
        """
        return self.interpolator(phc)

    def to_file(self, output_file):
        # Write the tabulated template.
        # NOTE(review): the handle is never closed — consider `with open`.
        f = open(output_file, "w")
        f.write("# kernel\n")
        for i in range(len(self.xvals)):
            f.write("%s\t%s\n" % (self.xvals[i], self.yvals[i]))

    def integrate(self, x1=0, x2=1):
        # Full-cycle integral is unity by construction; otherwise use a
        # crude quadrature over the cached grid.
        if (x1 == 0) and (x2 == 1):
            return 1.0
        # crude nearest neighbor approximation
        x = self.interpolator.x
        y = self.interpolator.y
        mask = (x >= x1) & (x <= x2)
        return simps(y[mask], x=x[mask])

    # return self.interpolator.y[mask].sum()/len(mask)
def convert_primitive(p1, ptype=LCLorentzian):
    """Attempt to set the parameters of p2 to give a comparable primitive to p1.

    Parameters
    ----------
    p1 : LCPrimitive
        The source primitive.
    ptype : type
        The target primitive class; a new instance is constructed,
        matched to p1, and returned.
    """
    p2 = ptype()
    edep = p1.is_energy_dependent() and p2.is_energy_dependent()
    two_to_one = (len(p2.p) == 2) and (len(p1.p) == 3)
    one_to_two = (len(p2.p) == 3) and (len(p1.p) == 2)
    # handle VonMises as a special case since width is complicated
    if "VonMises" in p2.name:
        width = (np.cos(p1.hwhm() * (2 * np.pi)) - 1) / np.log(0.5)
        p2.p[0] = width
        p2.p[-1] = p1.p[-1]
        if edep:
            p2.free[-1] = p1.free[-1]
            p2.slope[-1] = p1.slope[-1]
            p2.slope_free[-1] = p1.slope_free[-1]
            p2.free[0] = p1.free[0]
            p2.slope[0] = p1.slope[0] * width / p1.p[0]
            p2.slope_free[0] = p1.slope_free[0]
            if two_to_one:
                if p1.free[1]:
                    p2.free[0] = True
                if p1.slope_free[1]:
                    p2.slope_free[0] = True
        assert p2.check_bounds()
        return p2
    # get a rough idea of scale first for nonlinear cases
    p2_scale = p2.p[0] / p2.hwhm()
    # set position
    p2.p[-1] = p1.p[-1]
    # set width
    # default, 1->1 conversion
    p2.p[0] = p2_scale * p1.hwhm()
    p2.free[0] = p1.free[0]
    if edep:
        p2.slope[0] = p2_scale * p1.hwhm() * p1.slope[0]
        p2.slope_free[0] = p1.slope_free[0]
        p2.slope_free[-1] = p1.slope_free[-1]
        p2.slope[-1] = p1.slope[-1]
    # if we are going from 2->1, use mean of widths
    if two_to_one:
        p2.p[0] = p2_scale * (p1.hwhm(right=False) + p1.hwhm(right=True)) / 2
        p2.free[0] = p1.free[0] or p1.free[1]
        if edep:
            p2.slope_free[0] = p1.slope_free[0] or p1.slope_free[1]
    elif one_to_two:
        # NOTE(review): this branch accesses p2.slope without an edep
        # guard, as in the original — confirm slope always exists here.
        p2.p[1] = p2.p[0]
        p2.slope[1] = p2.slope[0]
        # FIX: this previously read `p2.slope_free[1] = p2.slope[0]`,
        # assigning a slope *value* into the boolean free-flag array;
        # copy the free flag instead.
        p2.slope_free[1] = p2.slope_free[0]
    # special case of going from gauss to Lorentzian
    # this makes peaks closer in nature than going by equiv HWHM
    if "Gaussian" in str(type(p1)) and "Lorentzian" in str(type(p2)):
        scale = p2(p1.p[-1]) / p1(p1.p[-1])
        p2.p[0] *= scale
        if len(p2.p) == 3:
            p2.p[1] *= scale
        if p1.is_energy_dependent() and p2.is_energy_dependent():
            p2.slope[0] = p1.slope[0] * p2_scale
    assert p2.check_bounds()
    return p2
class FastBessel(object):
    """Special class to quickly evaluate modified Bessel functions.

    Precomputes log(I_order(x)) on a log-spaced grid (x in [0.1, 10^3.5])
    and evaluates by interpolating in log-log space.  Where exp(x) would
    overflow (x > 700), the standard large-argument asymptotic expansion
    is used:

        I_v(x) ~ exp(x) / sqrt(2*pi*x) * (1 - (4*v^2 - 1)/(8*x) + ...)
    """

    def __init__(self, order=0):
        """order : Bessel order; only 0 and 1 are supported."""
        if order == 0:
            func = i0
        elif order == 1:
            func = i1
        else:
            raise NotImplementedError
        x = np.logspace(-1, 3.5, 2001)
        y = np.empty_like(x)
        mask = x < 700
        # exact evaluation where exp(x) does not overflow a double
        y[mask] = np.log(func(x[mask]))
        mask = ~mask
        # FIX: the first correction term of the asymptotic series enters
        # with a MINUS sign (DLMF 10.40.1); it was previously added,
        # giving 1 - 1/(8x) instead of 1 + 1/(8x) for order 0 and
        # 1 + 3/(8x) instead of 1 - 3/(8x) for order 1.
        y[mask] = x[mask] + np.log(
            (1 - (4 * order**2 - 1) / (8 * x[mask])) / (2 * np.pi * x[mask]) ** 0.5
        )
        self._y = y
        self._x = np.log(x)
        self._interp = interp1d(self._x, self._y)

    def __call__(self, x):
        """Return I_order(x) by exponentiating the interpolated log value."""
        return np.exp(self._interp(np.log(x)))