Source code for pint.models.jump

"""Phase jumps. """

import astropy.units as u
import numpy

from loguru import logger as log

from pint.models.parameter import maskParameter
from pint.models.timing_model import DelayComponent, PhaseComponent


[docs]class DelayJump(DelayComponent): """Phase jumps Parameters supported: .. paramtable:: :class: pint.models.jump.DelayJump Note ---- This component is disabled for now, since we don't have any method to identify the phase jumps and delay jumps. """ register = False category = "delay_jump" def __init__(self): super().__init__() self.add_param(maskParameter(name="JUMP", units="second")) self.delay_funcs_component += [self.jump_delay]
[docs] def setup(self): super().setup() self.jumps = [ mask_par for mask_par in self.get_params_of_type("maskParameter") if mask_par.startswith("JUMP") ] for j in self.jumps: self.register_deriv_funcs(self.d_delay_d_jump, j)
[docs] def jump_delay(self, toas, acc_delay=None): """This method returns the jump delays for each toas section collected by jump parameters. The delay value is determined by jump parameter value in the unit of seconds. """ tbl = toas.table jdelay = numpy.zeros(len(tbl)) for jump in self.jumps: jump_par = getattr(self, jump) mask = jump_par.select_toa_mask(toas) # NOTE: Currently parfile jump value has opposite sign with our # delay calculation. jdelay[mask] += -jump_par.value return jdelay * u.second
def d_delay_d_jump(self, toas, jump_param, acc_delay=None): tbl = toas.table d_delay_d_j = numpy.zeros(len(tbl)) jpar = getattr(self, jump_param) mask = jpar.select_toa_mask(toas) d_delay_d_j[mask] = -1.0 return d_delay_d_j * u.second / jpar.units
[docs] def print_par(self, format="pint"): result = "" for jump in self.jumps: jump_par = getattr(self, jump) result += jump_par.as_parfile_line(format=format) return result
[docs]class PhaseJump(PhaseComponent): """Arbitrary jumps in pulse phase. In spite of the name, the amounts here are specified in seconds and converted to phase using F0. Parameters supported: .. paramtable:: :class: pint.models.jump.PhaseJump """ register = True category = "phase_jump" def __init__(self): super().__init__() self.add_param( maskParameter( name="JUMP", units="second", description="Phase jump for selection.", ) ) self.phase_funcs_component += [self.jump_phase]
[docs] def setup(self): super().setup() self.jumps = [ mask_par for mask_par in self.get_params_of_type("maskParameter") if mask_par.startswith("JUMP") ] for j in self.jumps: # prevents duplicates from being added to phase_deriv_funcs if j in self.deriv_funcs.keys(): del self.deriv_funcs[j] self.register_deriv_funcs(self.d_phase_d_jump, j)
[docs] def jump_phase(self, toas, delay): """This method returns the jump phase for each toas section collected by jump parameters. The phase value is determined by jump parameter times F0. """ tbl = toas.table # base this on the first available jump (doesn't have to be JUMP1) jphase = numpy.zeros(len(tbl)) * ( getattr(self, self.get_params_of_type("maskParameter")[0]).units * self._parent.F0.units ) for jump in self.jumps: jump_par = getattr(self, jump) mask = jump_par.select_toa_mask(toas) # NOTE: Currently parfile jump value has opposite sign with our # phase calculation. jphase[mask] += jump_par.quantity * self._parent.F0.quantity return jphase
def d_phase_d_jump(self, toas, jump_param, delay): tbl = toas.table jpar = getattr(self, jump_param) d_phase_d_j = numpy.zeros(len(tbl)) mask = jpar.select_toa_mask(toas) d_phase_d_j[mask] = self._parent.F0.value return (d_phase_d_j * self._parent.F0.units).to(1 / u.second)
[docs] def print_par(self, format="pint"): result = "" for jump in self.jumps: jump_par = getattr(self, jump) result += jump_par.as_parfile_line(format=format) return result
[docs] def get_number_of_jumps(self): """Returns the number of jumps contained in this PhaseJump object.""" return len(self.jumps)
[docs] def get_jump_param_objects(self): """ Returns a list of the maskParameter objects representing the jumps in this PhaseJump object. """ return [getattr(self, jump) for jump in self.jumps]
[docs] def jump_params_to_flags(self, toas): """Take jumps created from .par file and add appropriate flags to toa table. This function was made specifically with pintk in mind so as to maintain a ``-jump`` flag on each TOA that is affected by any JUMP in the model, listing the numbers of all JUMPs affecting that TOA. This function wipes all ``-jump`` flags and re-initializes them to reflect the actual situation. Parameters ---------- toas: TOAs object The TOAs which contain the TOA table to be modified """ log.info(f"Initial TOAs {toas['jump']=}") toas["jump"] = "" log.info(f"Wiped TOAs {toas['jump']=}") # for every jump, set appropriate flag for TOAs it jumps for jump_par in self.get_jump_param_objects(): # find TOAs jump applies to mask = jump_par.select_toa_mask(toas) # apply to dictionaries for d in toas.table["flags"][mask]: if "jump" in d: index_list = d["jump"].split(",") index_list.append(str(jump_par.index)) d["jump"] = ",".join(index_list) else: d["jump"] = str(jump_par.index) log.info(f"Set up on {jump_par=} {d=}") log.info(f"TOAs {toas['jump']=}")
[docs] def add_jump_and_flags(self, toa_table): """Add jump object to PhaseJump and appropriate flags to TOA tables. Helper function for pintk. Primarily to be used when applying a jump through pintk to TOAs - since these jumps don't have keys that match to preexisting flags in the TOA tables, we must add the flags when adding the jump. Parameters ---------- toa_table: astropy.table.column.Column The TOA table which must be modified. In pintk (pulsar.py), this will be all_toas.table["flags"][selected] """ ind = None # index of jump name = None # name of jump # check if this is first jump added if len(self.jumps) == 0 or ( len(self.jumps) == 1 and getattr(self, "JUMP1").key is None ): param = maskParameter( name="JUMP", index=1, key="-gui_jump", key_value="1", value=0.0, units="second", frozen=False, ) # otherwise add on jump with next index else: # first, search for TOAs already jumped in inputted selection # - pintk does not allow jumps added through GUI to overlap # with existing jumps for d in toa_table: if "gui_jump" in d.keys(): log.warning( "The selected toa(s) overlap an existing jump." "Remove all interfering jumps before attempting to jump these toas." ) return None param = maskParameter( name="JUMP", index=len(self.jumps) + 1, key="-gui_jump", key_value=str(len(self.jumps) + 1), value=0.0, units="second", frozen=False, ) self.add_param(param) ind = param.index name = param.name self.setup() for dict1 in toa_table: dict1["jump"] = str(ind) dict1["gui_jump"] = str(ind) return name
[docs] def delete_not_all_jump_toas(self, toa_flags, jump_num): """Unjumps the selected TOAs if they are already jumped. This is only called when the selected TOAs are a subset of jumped TOAs with a particular jump. Parameters ---------- toa_flags: astropy.table.column.Column The TOA table which must be modified. In pintk (pulsar.py), this will be ``all_toas.table["flags"][selected]`` jump_num: int jump that needs to be deleted from the TOAs """ if toa_flags is not None: # This means there is overlap between selected TOAs and jump_num for d in toa_flags: if "jump" in d: jumps_list = d["jump"].split(",") jump_ind = jumps_list.index(str(jump_num)) del jumps_list[jump_ind] if not jumps_list: del d["jump"] else: d["jump"] = ",".join(jumps_list) return