Source code for pint.models.noise_model

"""Pulsar timing noise models."""

import copy
import warnings

import astropy.units as u
import numpy as np

from loguru import logger as log

from pint.models.parameter import floatParameter, maskParameter
from pint.models.timing_model import Component


[docs]class NoiseComponent(Component): def __init__( self, ): super().__init__() self.covariance_matrix_funcs = [] self.scaled_toa_sigma_funcs = [] # Need to move this to a special place. self.scaled_dm_sigma_funcs = [] # TODO This works right now. But if we want to expend noise model, we # need to think about the design now. If we do not define the list # here and calling the same name from other component, it will get # it from the component that hosts it. It has the risk to duplicate # the list elements. self.dm_covariance_matrix_funcs_component = [] self.basis_funcs = []
[docs]class ScaleToaError(NoiseComponent): """Correct reported template fitting uncertainties. Parameters supported: .. paramtable:: :class: pint.models.noise_model.ScaleToaError Note ---- Ref: NANOGrav 11 yrs data """ register = True category = "scale_toa_error" introduces_correlated_errors = False def __init__( self, ): super().__init__() self.add_param( maskParameter( name="EFAC", units="", aliases=["T2EFAC", "TNEF"], description="A multiplication factor on the measured TOA uncertainties,", ) ) self.add_param( maskParameter( name="EQUAD", units="us", aliases=["T2EQUAD"], description="An error term added in quadrature to the scaled (by EFAC) TOA uncertainty.", ) ) self.add_param( maskParameter( name="TNEQ", units=u.LogUnit(physical_unit=u.second), description="An error term added in quadrature to the scaled (by EFAC) TOA uncertainty in units of log10(second).", ) ) self.covariance_matrix_funcs += [self.sigma_scaled_cov_matrix] self.scaled_toa_sigma_funcs += [self.scale_toa_sigma] self.toasigma_deriv_funcs = {}
[docs] def setup(self): super().setup() self.EFACs = {} self.EQUADs = {} self.TNEQs = {} for mask_par in self.get_params_of_type("maskParameter"): if mask_par.startswith("EFAC"): par = getattr(self, mask_par) self.EFACs[mask_par] = (par.key, par.key_value) elif mask_par.startswith("EQUAD"): par = getattr(self, mask_par) self.EQUADs[mask_par] = (par.key, par.key_value) elif mask_par.startswith("TNEQ"): par = getattr(self, mask_par) self.TNEQs[mask_par] = (par.key, par.key_value) else: continue # convert all the TNEQ to EQUAD for tneq, value in self.TNEQs.items(): tneq_par = getattr(self, tneq) if tneq_par.key is None: continue if value in list(self.EQUADs.values()): log.warning( f"'{tneq} {tneq_par.key} {tneq_par.key_value}' is provided by parameter EQUAD, using EQUAD instead. " ) else: EQUAD_name = f"EQUAD{str(tneq_par.index)}" if EQUAD_name not in list(self.EQUADs.keys()): self.add_param( maskParameter( name="EQUAD", units="us", index=tneq_par.index, aliases=["T2EQUAD"], description="An error term added in quadrature to the scaled (by EFAC) TOA uncertainty.", ) ) EQUAD_par = getattr(self, EQUAD_name) EQUAD_par.quantity = tneq_par.quantity.to(u.us) EQUAD_par.key_value = tneq_par.key_value EQUAD_par.key = tneq_par.key for pp in self.params: if pp.startswith("EQUAD"): par = getattr(self, pp) self.EQUADs[pp] = (par.key, par.key_value) for ef in self.EFACs: self.register_toasigma_deriv_funcs(self.d_toasigma_d_EFAC, ef) for eq in self.EQUADs: self.register_toasigma_deriv_funcs(self.d_toasigma_d_EQUAD, eq)
def register_toasigma_deriv_funcs(self, func, param): pn = self.match_param_aliases(param) if pn not in list(self.toasigma_deriv_funcs.keys()): self.toasigma_deriv_funcs[pn] = [func] elif func in self.toasigma_deriv_funcs[pn]: return else: self.toasigma_deriv_funcs[pn] += [func]
[docs] def validate(self): super().validate() # check duplicate for el in ["EFACs", "EQUADs"]: l = list(getattr(self, el).values()) if [x for x in l if l.count(x) > 1] != []: raise ValueError(f"'{el}' have duplicated keys and key values.")
def scale_toa_sigma(self, toas, warn=True): sigma_scaled = toas.table["error"].quantity.copy() for equad_name in self.EQUADs: equad = getattr(self, equad_name) if equad.quantity is None: continue mask = equad.select_toa_mask(toas) if np.any(mask): sigma_scaled[mask] = np.hypot(sigma_scaled[mask], equad.quantity) elif warn: warnings.warn(f"EQUAD {equad} has no TOAs") for efac_name in self.EFACs: efac = getattr(self, efac_name) mask = efac.select_toa_mask(toas) if np.any(mask): sigma_scaled[mask] *= efac.quantity elif warn: warnings.warn(f"EFAC {efac} has no TOAs") return sigma_scaled def sigma_scaled_cov_matrix(self, toas): scaled_sigma = self.scale_toa_sigma(toas).to(u.s).value ** 2 return np.diag(scaled_sigma) def d_toasigma_d_EFAC(self, toas, param): par = getattr(self, param) mask = par.select_toa_mask(toas) result = np.zeros(len(toas)) << u.s result[mask] = self.scale_toa_sigma(toas[mask], warn=False).to( u.s ) / par.quantity.to(u.dimensionless_unscaled) return result def d_toasigma_d_EQUAD(self, toas, param): par = getattr(self, param) mask = par.select_toa_mask(toas) toas_mask = toas[mask] result = np.zeros(len(toas)) << u.dimensionless_unscaled sigma_mask = self.scale_toa_sigma(toas_mask, warn=False) sigma2_mask_noefac = toas_mask.get_errors().to(u.s) ** 2 for equad_name in self.EQUADs: equad = getattr(self, equad_name) if equad.quantity is None: continue eqmask = equad.select_toa_mask(toas_mask) if np.any(eqmask): sigma2_mask_noefac[eqmask] += equad.quantity**2 result[mask] = (sigma_mask * par.quantity / sigma2_mask_noefac).to( u.dimensionless_unscaled ) return result
[docs]class ScaleDmError(NoiseComponent): """Correction for estimated wideband DM measurement uncertainty. Parameters supported: .. paramtable:: :class: pint.models.noise_model.ScaleDmError Note ---- Ref: NANOGrav 12.5 yrs wideband data """ register = True category = "scale_dm_error" introduces_correlated_errors = False def __init__( self, ): super().__init__() self.add_param( maskParameter( name="DMEFAC", units="", description="A multiplication factor on the measured DM uncertainties,", ) ) self.add_param( maskParameter( name="DMEQUAD", units="pc / cm ^ 3", description="An error term added in quadrature to the scaled (by EFAC) TOA uncertainty.", ) ) self.dm_covariance_matrix_funcs_component = [self.dm_sigma_scaled_cov_matrix] self.scaled_dm_sigma_funcs += [self.scale_dm_sigma] self._paired_DMEFAC_DMEQUAD = None
[docs] def setup(self): super().setup() # Get all the EFAC parameters and EQUAD self.DMEFACs = {} self.DMEQUADs = {} for mask_par in self.get_params_of_type("maskParameter"): if mask_par.startswith("DMEFAC"): par = getattr(self, mask_par) if par.key is not None: self.DMEFACs[mask_par] = (par.key, tuple(par.key_value)) elif mask_par.startswith("DMEQUAD"): par = getattr(self, mask_par) if par.key is not None: self.DMEQUADs[mask_par] = (par.key, tuple(par.key_value)) else: continue
# if len(self.DMEFACs) != len(self.DMEQUADs): # self._match_DMEFAC_DMEQUAD() # else: # self._paired_DMEFAC_DMEQUAD = self.pair_DMEFAC_DMEQUAD()
[docs] def validate(self): super().validate() # check duplicate for el in ["DMEFACs", "DMEQUADs"]: l = list(getattr(self, el).values()) if [x for x in l if l.count(x) > 1] != []: raise ValueError(f"'{el}' have duplicated keys and key values.")
[docs] def scale_dm_sigma(self, toas): """ Scale the DM uncertainty. Parameters ---------- toas: `pint.toa.TOAs` object Input DM error object. We assume DM error is stored in the TOA objects. """ sigma_scaled = copy.deepcopy(toas.get_dm_errors()) # Apply DMEQUAD first for dmequad_name in self.DMEQUADs: dmequad = getattr(self, dmequad_name) if dmequad.quantity is None: continue mask = dmequad.select_toa_mask(toas) sigma_scaled[mask] = np.hypot(sigma_scaled[mask], dmequad.quantity) # Then apply the DMEFAC for dmefac_name in self.DMEFACs: dmefac = getattr(self, dmefac_name) sigma_scaled[dmefac.select_toa_mask(toas)] *= dmefac.quantity return sigma_scaled
def dm_sigma_scaled_cov_matrix(self, toas): scaled_sigma = self.scale_dm_sigma(toas).to_value(u.pc / u.cm**3) ** 2 return np.diag(scaled_sigma)
[docs]class EcorrNoise(NoiseComponent): """Noise correlated between nearby TOAs. This can occur, for example, if multiple TOAs were taken at different frequencies simultaneously: pulsar intrinsic emission jitters back and forth within the average profile, and this effect is the same for all frequencies. Thus these TOAs have correlated errors. Parameters supported: .. paramtable:: :class: pint.models.noise_model.EcorrNoise Note ---- Ref: NANOGrav 11 yrs data """ register = True category = "ecorr_noise" introduces_correlated_errors = True is_time_correlated = False def __init__( self, ): super().__init__() self.add_param( maskParameter( name="ECORR", units="us", aliases=["TNECORR"], description="An error term that is correlated among all TOAs in an observing epoch.", ) ) self.covariance_matrix_funcs += [self.ecorr_cov_matrix] self.basis_funcs += [self.ecorr_basis_weight_pair]
[docs] def setup(self): super().setup() # Get all the EFAC parameters and EQUAD self.ECORRs = {} for mask_par in self.get_params_of_type("maskParameter"): if mask_par.startswith("ECORR"): par = getattr(self, mask_par) self.ECORRs[mask_par] = (par.key, par.key_value) else: continue
[docs] def validate(self): super().validate() # check duplicate for el in ["ECORRs"]: l = list(getattr(self, el).values()) if [x for x in l if l.count(x) > 1] != []: raise ValueError(f"'{el}' have duplicated keys and key values.")
def get_ecorrs(self): return [getattr(self, ecorr) for ecorr, ecorr_key in list(self.ECORRs.items())]
[docs] def get_noise_basis(self, toas): """Return the quantization matrix for ECORR. A quantization matrix maps TOAs to observing epochs. """ tbl = toas.table t = (tbl["tdbld"].quantity * u.day).to(u.s).value ecorrs = self.get_ecorrs() umats = [] for ec in ecorrs: mask = ec.select_toa_mask(toas) if np.any(mask): umats.append(create_ecorr_quantization_matrix(t[mask])) else: warnings.warn(f"ECORR {ec} has no TOAs") umats.append(np.zeros((0, 0))) nc = sum(u.shape[1] for u in umats) umat = np.zeros((len(t), nc)) nctot = 0 for ct, ec in enumerate(ecorrs): mask = ec.select_toa_mask(toas) nn = umats[ct].shape[1] umat[mask, nctot : nn + nctot] = umats[ct] nctot += nn return umat
[docs] def get_noise_weights(self, toas, nweights=None): """Return the ECORR weights The weights used are the square of the ECORR values. """ ecorrs = self.get_ecorrs() if nweights is None: ts = (toas.table["tdbld"].quantity * u.day).to(u.s).value nweights = [ get_ecorr_nweights(ts[ec.select_toa_mask(toas)]) for ec in ecorrs ] nc = sum(nweights) weights = np.zeros(nc) nctot = 0 for ec, nn in zip(ecorrs, nweights): weights[nctot : nn + nctot] = ec.quantity.to(u.s).value ** 2 nctot += nn return weights
[docs] def ecorr_basis_weight_pair(self, toas): """Return a quantization matrix and ECORR weights. A quantization matrix maps TOAs to observing epochs. The weights used are the square of the ECORR values. """ return (self.get_noise_basis(toas), self.get_noise_weights(toas))
[docs] def ecorr_cov_matrix(self, toas): """Full ECORR covariance matrix.""" U, Jvec = self.ecorr_basis_weight_pair(toas) return np.dot(U * Jvec[None, :], U.T)
[docs]class PLDMNoise(NoiseComponent): """Model of DM variations as radio frequency-dependent noise with a power-law spectrum. Variations in DM over time result from both the proper motion of the pulsar and the changing electron number density along the line of sight from the solar wind and ISM. In particular, Kolmogorov turbulence in the ionized ISM will induce stochastic DM variations with a power law spectrum. Timing errors due to unmodelled DM variations can therefore appear very similar to intrinsic red noise, however the amplitude of these variations will scale with the inverse of the square of the (Earth Doppler corrected) radio frequency. Parameters supported: .. paramtable:: :class: pint.models.noise_model.PLDMNoise Note ---- Ref: Lentati et al. 2014 """ register = True category = "pl_DM_noise" introduces_correlated_errors = True is_time_correlated = True def __init__( self, ): super().__init__() self.add_param( floatParameter( name="TNDMAMP", units="", aliases=[], description="Amplitude of powerlaw DM noise in tempo2 format", ) ) self.add_param( floatParameter( name="TNDMGAM", units="", aliases=[], description="Spectral index of powerlaw " "DM noise in tempo2 format", ) ) self.add_param( floatParameter( name="TNDMC", units="", aliases=[], description="Number of DM noise frequencies.", ) ) self.covariance_matrix_funcs += [self.pl_dm_cov_matrix] self.basis_funcs += [self.pl_dm_basis_weight_pair] def get_pl_vals(self): nf = int(self.TNDMC.value) if self.TNDMC.value is not None else 30 amp, gam = 10**self.TNDMAMP.value, self.TNDMGAM.value return (amp, gam, nf)
[docs] def get_noise_basis(self, toas): """Return a Fourier design matrix for DM noise. See the documentation for pl_dm_basis_weight_pair function for details.""" tbl = toas.table t = (tbl["tdbld"].quantity * u.day).to(u.s).value freqs = self._parent.barycentric_radio_freq(toas).to(u.MHz) fref = 1400 * u.MHz D = (fref.value / freqs.value) ** 2 nf = self.get_pl_vals()[2] Fmat = create_fourier_design_matrix(t, nf) return Fmat * D[:, None]
[docs] def get_noise_weights(self, toas): """Return power law DM noise weights. See the documentation for pl_dm_basis_weight_pair for details.""" tbl = toas.table t = (tbl["tdbld"].quantity * u.day).to(u.s).value amp, gam, nf = self.get_pl_vals() Ffreqs = get_rednoise_freqs(t, nf) return powerlaw(Ffreqs, amp, gam) * Ffreqs[0]
[docs] def pl_dm_basis_weight_pair(self, toas): """Return a Fourier design matrix and power law DM noise weights. A Fourier design matrix contains the sine and cosine basis_functions in a Fourier series expansion. Here we scale the design matrix by (fref/f)**2, where fref = 1400 MHz to match the convention used in enterprise. The weights used are the power-law PSD values at frequencies n/T, where n is in [1, TNDMC] and T is the total observing duration of the dataset. """ return (self.get_noise_basis(toas), self.get_noise_weights(toas))
def pl_dm_cov_matrix(self, toas): Fmat, phi = self.pl_dm_basis_weight_pair(toas) return np.dot(Fmat * phi[None, :], Fmat.T)
[docs]class PLRedNoise(NoiseComponent): """Timing noise with a power-law spectrum. Over the long term, pulsars are observed to experience timing noise dominated by low frequencies. This can occur, for example, if the torque on the pulsar varies randomly. If the torque experiences white noise, the phase we observe will experience "red" noise, that is noise dominated by the lowest frequency. This results in errors that are correlated between TOAs over fairly long time spans. Parameters supported: .. paramtable:: :class: pint.models.noise_model.PLRedNoise Note ---- Ref: NANOGrav 11 yrs data """ register = True category = "pl_red_noise" introduces_correlated_errors = True is_time_correlated = True def __init__( self, ): super().__init__() self.add_param( floatParameter( name="RNAMP", units="", aliases=[], description="Amplitude of powerlaw red noise.", ) ) self.add_param( floatParameter( name="RNIDX", units="", aliases=[], description="Spectral index of powerlaw red noise.", ) ) self.add_param( floatParameter( name="TNREDAMP", units="", aliases=[], description="Amplitude of powerlaw red noise in tempo2 format", ) ) self.add_param( floatParameter( name="TNREDGAM", units="", aliases=[], description="Spectral index of powerlaw red noise in tempo2 format", ) ) self.add_param( floatParameter( name="TNREDC", units="", aliases=[], description="Number of red noise frequencies.", ) ) self.covariance_matrix_funcs += [self.pl_rn_cov_matrix] self.basis_funcs += [self.pl_rn_basis_weight_pair] def get_pl_vals(self): nf = int(self.TNREDC.value) if self.TNREDC.value is not None else 30 if self.TNREDAMP.value is not None and self.TNREDGAM.value is not None: amp, gam = 10**self.TNREDAMP.value, self.TNREDGAM.value elif self.RNAMP.value is not None and self.RNIDX is not None: fac = (86400.0 * 365.24 * 1e6) / (2.0 * np.pi * np.sqrt(3.0)) amp, gam = self.RNAMP.value / fac, -1 * self.RNIDX.value return (amp, gam, nf)
[docs] def get_noise_basis(self, toas): """Return a Fourier design matrix for red noise. See the documentation for pl_rn_basis_weight_pair function for details.""" tbl = toas.table t = (tbl["tdbld"].quantity * u.day).to(u.s).value nf = self.get_pl_vals()[2] return create_fourier_design_matrix(t, nf)
[docs] def get_noise_weights(self, toas): """Return power law red noise weights. See the documentation for pl_rn_basis_weight_pair for details.""" tbl = toas.table t = (tbl["tdbld"].quantity * u.day).to(u.s).value amp, gam, nf = self.get_pl_vals() Ffreqs = get_rednoise_freqs(t, nf) return powerlaw(Ffreqs, amp, gam) * Ffreqs[0]
[docs] def pl_rn_basis_weight_pair(self, toas): """Return a Fourier design matrix and power law red noise weights. A Fourier design matrix contains the sine and cosine basis_functions in a Fourier series expansion. The weights used are the power-law PSD values at frequencies n/T, where n is in [1, TNREDC] and T is the total observing duration of the dataset. """ return (self.get_noise_basis(toas), self.get_noise_weights(toas))
def pl_rn_cov_matrix(self, toas): Fmat, phi = self.pl_rn_basis_weight_pair(toas) return np.dot(Fmat * phi[None, :], Fmat.T)
[docs]def get_ecorr_epochs(toas_table, dt=1, nmin=2): """Find only epochs with more than 1 TOA for applying ECORR.""" if len(toas_table) == 0: return [] isort = np.argsort(toas_table) bucket_ref = [toas_table[isort[0]]] bucket_ind = [[isort[0]]] for i in isort[1:]: if toas_table[i] - bucket_ref[-1] < dt: bucket_ind[-1].append(i) else: bucket_ref.append(toas_table[i]) bucket_ind.append([i]) return [ind for ind in bucket_ind if len(ind) >= nmin]
[docs]def get_ecorr_nweights(toas_table, dt=1, nmin=2): """Get the number of epochs associated with each ECORR. This is equal to the number of weights of that ECORR.""" return len(get_ecorr_epochs(toas_table, dt=dt, nmin=nmin))
[docs]def create_ecorr_quantization_matrix(toas_table, dt=1, nmin=2): """Create quantization matrix mapping TOAs to observing epochs. Only epochs with more than 1 TOA are included.""" bucket_ind2 = get_ecorr_epochs(toas_table, dt=dt, nmin=nmin) U = np.zeros((len(toas_table), len(bucket_ind2)), "d") for i, l in enumerate(bucket_ind2): U[l, i] = 1 return U
[docs]def get_rednoise_freqs(t, nmodes, Tspan=None): """Frequency components for creating the red noise basis matrix.""" T = Tspan if Tspan is not None else t.max() - t.min() f = np.linspace(1 / T, nmodes / T, nmodes) Ffreqs = np.zeros(2 * nmodes) Ffreqs[::2] = f Ffreqs[1::2] = f return Ffreqs
[docs]def create_fourier_design_matrix(t, nmodes, Tspan=None): """ Construct fourier design matrix from eq 11 of Lentati et al, 2013 :param t: vector of time series in seconds :param nmodes: number of fourier coefficients to use :param Tspan: option to some other Tspan :return: F: fourier design matrix :return: f: Sampling frequencies """ N = len(t) F = np.zeros((N, 2 * nmodes)) Ffreqs = get_rednoise_freqs(t, nmodes, Tspan=Tspan) F[:, ::2] = np.sin(2 * np.pi * t[:, None] * Ffreqs[::2]) F[:, 1::2] = np.cos(2 * np.pi * t[:, None] * Ffreqs[1::2]) return F
[docs]def powerlaw(f, A=1e-16, gamma=5): """Power-law PSD. :param f: Sampling frequencies :param A: Amplitude of red noise [GW units] :param gamma: Spectral index of red noise process """ fyr = 1 / 3.16e7 return A**2 / 12.0 / np.pi**2 * fyr ** (gamma - 3) * f ** (-gamma)