Source code for pint.models.astrometry

"""Astrometric models for describing pulsar sky positions."""

import copy
import sys
import warnings
from typing import Optional, List, Union

import astropy.constants as const
import astropy.coordinates as coords
import astropy.units as u
import numpy as np
from astropy.time import Time

from loguru import logger as log

from erfa import ErfaWarning, pmsafe
from pint import ls
from pint.types import time_like, quantity_like
from pint.models.parameter import (
    AngleParameter,
    MJDParameter,
    floatParameter,
    strParameter,
)
import pint.toa
from pint.models.timing_model import DelayComponent, MissingParameter
from pint.pulsar_ecliptic import OBL, PulsarEcliptic
from pint.utils import add_dummy_distance, remove_dummy_distance

astropy_version = sys.modules["astropy"].__version__
mas_yr = u.mas / u.yr

__all__ = [
    "AstrometryEquatorial",
    "AstrometryEcliptic",
    "Astrometry",
]


[docs]class Astrometry(DelayComponent): """Common tools for astrometric calculations.""" register = False category = "astrometry" def __init__(self): super().__init__() self.add_param( MJDParameter( name="POSEPOCH", description="Reference epoch for position", time_scale="tdb", ) ) self.add_param( floatParameter(name="PX", units="mas", value=0.0, description="Parallax") ) self.delay_funcs_component += [self.solar_system_geometric_delay] self.register_deriv_funcs(self.d_delay_astrometry_d_PX, "PX")
[docs] def ssb_to_psb_xyz_ICRS(self, epoch: Optional[time_like] = None) -> u.Quantity: """Returns unit vector(s) from SSB to pulsar system barycenter under ICRS. If epochs (MJD) are given, proper motion is included in the calculation. Parameters ---------- epoch : float or astropy.time.Time or astropy.units.Quantity, optional If float or Quantity, MJD(TDB) is assumed Returns ------- np.ndarray : (len(epoch), 3) array of unit vectors """ # TODO: would it be better for this to return a 6-vector (pos, vel)? # this is somewhat slow, since it repeatedly created different SkyCoord Objects # but for consistency only change the method in the subclasses below return self.coords_as_ICRS(epoch=epoch).cartesian.xyz.transpose()
[docs] def ssb_to_psb_xyz_ECL( self, epoch: Optional[time_like] = None, ecl: str = None ) -> u.Quantity: """Returns unit vector(s) from SSB to pulsar system barycenter under Ecliptic coordinates. If epochs (MJD) are given, proper motion is included in the calculation. Parameters ---------- epoch : float or astropy.time.Time or astropy.units.Quantity, optional If float or Quantity, MJD(TDB) is assumed ecl : str, optional Obliquity (IERS2010 by default) Returns ------- np.ndarray : (len(epoch), 3) array of unit vectors """ # TODO: would it be better for this to return a 6-vector (pos, vel)? return self.coords_as_ECL(epoch=epoch, ecl=ecl).cartesian.xyz.transpose()
[docs] def sun_angle( self, toas: pint.toa.TOAs, heliocenter: bool = True, also_distance: bool = False ) -> np.ndarray: """Compute the pulsar-observatory-Sun angle. This is the angle between the center of the Sun and the direction to the pulsar, as seen from the observatory (for each TOA). This angle takes into account the motion of the Sun around the solar system barycenter. Parameters ---------- toas: :class:`pint.toa.TOAs` The pulse arrival times at which to evaluate the sun angle. heliocenter: bool Whether to use the Sun's actual position (the heliocenter) or the solar system barycenter. The latter may be useful for comparison with other software. also_distance: bool If True, also return the observatory-Sun distance as a Quantity Returns ------- array The angle in radians """ tbl = toas.table if heliocenter: osv = tbl["obs_sun_pos"].quantity.copy() else: osv = -tbl["ssb_obs_pos"].quantity.copy() psr_vec = self.ssb_to_psb_xyz_ICRS(epoch=tbl["tdbld"]) r = (osv**2).sum(axis=1) ** 0.5 osv /= r[:, None] cos = (osv * psr_vec).sum(axis=1) return (np.arccos(cos), r) if also_distance else np.arccos(cos)
def barycentric_radio_freq(self, toas): raise NotImplementedError
[docs] def solar_system_geometric_delay( self, toas: pint.toa.TOAs, acc_delay=None ) -> u.Quantity: """Returns geometric delay (in sec) due to position of site in solar system. This includes Roemer delay and parallax. NOTE: currently assumes XYZ location of TOA relative to SSB is available as 3-vector toa.xyz, in units of light-seconds. """ tbl = toas.table delay = np.zeros(len(toas)) # c selects the non-barycentric TOAs that need actual calculation c = np.logical_and.reduce(tbl["ssb_obs_pos"] != 0, axis=1) if np.any(c): L_hat = self.ssb_to_psb_xyz_ICRS(epoch=tbl["tdbld"][c].astype(np.float64)) re_dot_L = np.sum(tbl["ssb_obs_pos"][c] * L_hat, axis=1) delay[c] = -re_dot_L.to(ls).value if self.PX.value != 0.0: L = (1.0 / self.PX.value) * u.kpc # TODO: np.sum currently loses units in some cases... re_sqr = ( np.sum(tbl["ssb_obs_pos"][c] ** 2, axis=1) * tbl["ssb_obs_pos"].unit ** 2 ) delay[c] += ( (0.5 * (re_sqr / L) * (1.0 - re_dot_L**2 / re_sqr)).to(ls).value ) return delay * u.second
[docs] def get_d_delay_quantities(self, toas: pint.toa.TOAs) -> dict: """Calculate values needed for many d_delay_d_param functions""" # TODO: Should delay not have units of u.second? delay = self._parent.delay(toas) # TODO: tbl['tdbld'].quantity should have units of u.day # NOTE: Do we need to include the delay here? tbl = toas.table rd = {"epoch": tbl["tdbld"].quantity * u.day} # Distance from SSB to observatory, and from SSB to psr ssb_obs = tbl["ssb_obs_pos"].quantity ssb_psr = self.ssb_to_psb_xyz_ICRS(epoch=np.array(rd["epoch"])) # Cartesian coordinates, and derived quantities rd["ssb_obs_r"] = np.sqrt(np.sum(ssb_obs**2, axis=1)) rd["ssb_obs_z"] = ssb_obs[:, 2] rd["ssb_obs_xy"] = np.sqrt(ssb_obs[:, 0] ** 2 + ssb_obs[:, 1] ** 2) rd["ssb_obs_x"] = ssb_obs[:, 0] rd["ssb_obs_y"] = ssb_obs[:, 1] rd["in_psr_obs"] = np.sum(ssb_obs * ssb_psr, axis=1) # Earth right ascension and declination rd["earth_dec"] = np.arctan2(rd["ssb_obs_z"], rd["ssb_obs_xy"]) rd["earth_ra"] = np.arctan2(rd["ssb_obs_y"], rd["ssb_obs_x"]) return rd
def get_params_as_ICRS(self): raise NotImplementedError def get_psr_coords(self, epoch=None): raise NotImplementedError
[docs] def d_delay_astrometry_d_PX( self, toas: pint.toa.TOAs, param="", acc_delay=None ) -> u.Quantity: """Calculate the derivative wrt PX Roughly following Smart, 1977, chapter 9. px_r: Extra distance to Earth, wrt SSB, from pulsar r_e: Position of earth (vector) wrt SSB u_p: Unit vector from SSB pointing to pulsar t_d: Parallax delay c: Speed of light delta: Parallax The parallax delay is due to a distance orthogonal to the line of sight to the pulsar from the SSB: px_r = sqrt( r_e**2 - (r_e.u_p)**2 ), with delay t_d = 0.5 * px_r * delta'/ c, and delta = delta' * px_r / (1 AU) """ rd = self.get_d_delay_quantities(toas) px_r = np.sqrt(rd["ssb_obs_r"] ** 2 - rd["in_psr_obs"] ** 2) dd_dpx = 0.5 * (px_r**2 / (u.AU * const.c)) * (u.mas / u.radian) # We want to return sec / mas return dd_dpx.decompose(u.si.bases) / u.mas
[docs] def d_delay_astrometry_d_POSEPOCH(self, toas, param="", acc_delay=None): """Calculate the derivative wrt POSEPOCH""" pass
[docs] def change_posepoch(self, new_epoch: time_like): """Change POSEPOCH to a new value and update the position accordingly. Parameters ---------- new_epoch: float or `astropy.Time` object The new POSEPOCH value. """ raise NotImplementedError
def as_ECL(self, epoch=None, ecl="IERS2010"): raise NotImplementedError def as_ICRS(self, epoch=None, ecl="IERS2010"): raise NotImplementedError
[docs]class AstrometryEquatorial(Astrometry): """Astrometry in equatorial coordinates. Parameters supported: .. paramtable:: :class: pint.models.astrometry.AstrometryEquatorial """ register = True def __init__(self): super().__init__() self.add_param( AngleParameter( name="RAJ", units="H:M:S", description="Right ascension (J2000)", aliases=["RA"], ) ) self.add_param( AngleParameter( name="DECJ", units="D:M:S", description="Declination (J2000)", aliases=["DEC"], ) ) self.add_param( floatParameter( name="PMRA", units="mas/year", description="Proper motion in RA", value=0.0, ) ) self.add_param( floatParameter( name="PMDEC", units="mas/year", description="Proper motion in DEC", value=0.0, ) ) self.set_special_params(["RAJ", "DECJ", "PMRA", "PMDEC"]) for param in ["RAJ", "DECJ", "PMRA", "PMDEC"]: deriv_func_name = f"d_delay_astrometry_d_{param}" func = getattr(self, deriv_func_name) self.register_deriv_funcs(func, param)
[docs] def validate(self): """Validate the input parameter.""" super().validate() # RA/DEC are required for p in ("RAJ", "DECJ"): if getattr(self, p).value is None: raise MissingParameter("Astrometry", p) # Check for POSEPOCH if ( np.any(self.PMRA.quantity != 0) or np.any(self.PMDEC.quantity != 0) ) and self.POSEPOCH.quantity is None: if self._parent.PEPOCH.quantity is None: raise MissingParameter( "AstrometryEquatorial", "POSEPOCH", "POSEPOCH or PEPOCH are required if PM is set.", ) else: self.POSEPOCH.quantity = self._parent.PEPOCH.quantity
[docs] def print_par(self, format: str = "pint") -> str: result = "" print_order = ["RAJ", "DECJ", "PMRA", "PMDEC", "PX", "POSEPOCH"] for p in print_order: par = getattr(self, p) if par.quantity is not None: result += getattr(self, p).as_parfile_line(format=format) return result
[docs] def barycentric_radio_freq(self, toas: pint.toa.TOAs) -> u.Quantity: """Return radio frequencies (MHz) of the toas corrected for Earth motion""" tbl = toas.table L_hat = self.ssb_to_psb_xyz_ICRS(epoch=tbl["tdbld"].astype(np.float64)) v_dot_L_array = np.sum(tbl["ssb_obs_vel"] * L_hat, axis=1) return tbl["freq"] * (1.0 - v_dot_L_array / const.c)
[docs] def get_psr_coords(self, epoch: Optional[time_like] = None) -> coords.SkyCoord: """Returns pulsar sky coordinates as an astropy ICRS object instance. Parameters ---------- epoch: `astropy.time.Time` or Float, optional new epoch for position. If Float, MJD(TDB) is assumed Returns ------- position ICRS SkyCoord object optionally with proper motion applied If epoch (MJD) is specified, proper motion is included to return the position at the given epoch. """ if epoch is None or (self.PMRA.value == 0.0 and self.PMDEC.value == 0.0): return coords.SkyCoord( ra=self.RAJ.quantity, dec=self.DECJ.quantity, pm_ra_cosdec=self.PMRA.quantity, pm_dec=self.PMDEC.quantity, obstime=self.POSEPOCH.quantity, frame=coords.ICRS, ) newepoch = ( epoch if isinstance(epoch, Time) else Time(epoch, scale="tdb", format="mjd") ) position_now = add_dummy_distance(self.get_psr_coords()) with warnings.catch_warnings(): warnings.simplefilter("ignore", ErfaWarning) # for the most part the dummy distance should remove any potential erfa warnings # but for some very large proper motions that does not quite work # so we catch the warnings position_then = position_now.apply_space_motion(new_obstime=newepoch) position_then = remove_dummy_distance(position_then) return position_then
[docs] def coords_as_ICRS(self, epoch: Optional[time_like] = None) -> coords.SkyCoord: """Return the pulsar's ICRS coordinates as an astropy coordinate object. Parameters ---------- epoch : float or astropy.time.Time or astropy.units.Quantity, optional If float or Quantity, MJD(TDB) is assumed Returns ------- astropy.coordinates.SkyCoord """ return self.get_psr_coords(epoch)
[docs] def coords_as_ECL( self, epoch: Optional[time_like] = None, ecl: str = None ) -> coords.SkyCoord: """Return the pulsar's ecliptic coordinates as an astropy coordinate object. The value used for the obliquity of the ecliptic can be controlled with the `ecl` keyword, which should be one of the codes listed in `ecliptic.dat`. If `ecl` is left unspecified, the global default IERS2010 will be used. Parameters ---------- epoch : float or astropy.time.Time or astropy.units.Quantity, optional If float or Quantity, MJD(TDB) is assumed ecl : str Returns ------- astropy.coordinates.SkyCoord """ if ecl is None: log.debug("ECL not specified; using IERS2010.") ecl = "IERS2010" pos_icrs = self.get_psr_coords(epoch=epoch) return pos_icrs.transform_to(PulsarEcliptic(ecl=ecl))
[docs] def coords_as_GAL(self, epoch: Optional[time_like] = None) -> coords.SkyCoord: """Return the pulsar's galactic coordinates as an astropy coordinate object. Parameters ---------- epoch : float or astropy.time.Time or astropy.units.Quantity, optional If float or Quantity, MJD(TDB) is assumed Returns ------- astropy.coordinates.SkyCoord """ pos_icrs = self.get_psr_coords(epoch=epoch) return pos_icrs.transform_to(coords.Galactic)
def get_params_as_ICRS(self) -> dict: return { "RAJ": self.RAJ.quantity, "DECJ": self.DECJ.quantity, "PMRA": self.PMRA.quantity, "PMDEC": self.PMDEC.quantity, }
[docs] def ssb_to_psb_xyz_ICRS(self, epoch: Optional[time_like] = None) -> u.Quantity: """Returns unit vector(s) from SSB to pulsar system barycenter under ICRS. If epochs (MJD) are given, proper motion is included in the calculation. Parameters ---------- epoch : float or astropy.time.Time or astropy.units.Quantity, optional If float or Quantity, MJD(TDB) is assumed Returns ------- np.ndarray : (len(epoch), 3) array of unit vectors """ # TODO: would it be better for this to return a 6-vector (pos, vel)? # this was somewhat slow, since it repeatedly created different SkyCoord Objects # return self.coords_as_ICRS(epoch=epoch).cartesian.xyz.transpose() # Instead look at what https://docs.astropy.org/en/stable/_modules/astropy/coordinates/sky_coordinate.html#SkyCoord.apply_space_motion # does, which is to use https://github.com/liberfa/erfa/blob/master/src/starpm.c # and then just use the relevant pieces of that if epoch is None or (self.PMRA.quantity == 0 and self.PMDEC.quantity == 0): ra, dec = self.RAJ.quantity, self.DECJ.quantity return self.xyz_from_radec(ra, dec) # return self.coords_as_ICRS(epoch=epoch).cartesian.xyz.transpose() if isinstance(epoch, Time): jd1 = epoch.jd1 jd2 = epoch.jd2 elif isinstance(epoch, u.Quantity): epoch = Time(epoch, format="mjd", scale="tdb") jd1 = epoch.jd1 jd2 = epoch.jd2 else: # assume MJD jd1 = 2400000.5 jd2 = epoch # compared to the general case above we can assume that the coordinates are ICRS # so just access those components with warnings.catch_warnings(): warnings.simplefilter("ignore", ErfaWarning) # note that starpm wants mu_alpha not mu_alpha * cos(delta) starpmout = pmsafe( self.RAJ.quantity.to_value(u.radian), self.DECJ.quantity.to_value(u.radian), self.PMRA.quantity.to_value(u.radian / u.yr) / np.cos(self.DECJ.quantity).value, self.PMDEC.quantity.to_value(u.radian / u.yr), self.PX.quantity.to_value(u.arcsec), 0.0, self.POSEPOCH.quantity.jd1, self.POSEPOCH.quantity.jd2, jd1, jd2, ) # ra,dec now in radians ra, dec = starpmout[0], starpmout[1] return self.xyz_from_radec(ra, dec)
def xyz_from_radec(self, ra, dec): x = np.cos(ra) * np.cos(dec) y = np.sin(ra) * np.cos(dec) z = np.sin(dec) return u.Quantity([x, y, z]).T
[docs] def d_delay_astrometry_d_RAJ( self, toas: pint.toa.TOAs, param="", acc_delay=None ) -> u.Quantity: """Calculate the derivative wrt RAJ For the RAJ and DEC derivatives, use the following approximate model for the pulse delay. (Inner-product between two Cartesian vectors): - de = Earth declination (wrt SSB) - ae = Earth right ascension - dp = pulsar declination - aa = pulsar right ascension - r = distance from SSB to Earh - c = speed of light delay = r*[cos(de)*cos(dp)*cos(ae-aa)+sin(de)*sin(dp)]/c """ rd = self.get_d_delay_quantities(toas) psr_ra = self.RAJ.quantity psr_dec = self.DECJ.quantity geom = ( np.cos(rd["earth_dec"]) * np.cos(psr_dec) * np.sin(psr_ra - rd["earth_ra"]) ) dd_draj = rd["ssb_obs_r"] * geom / (const.c * u.radian) return dd_draj.decompose(u.si.bases)
[docs] def d_delay_astrometry_d_DECJ( self, toas: pint.toa.TOAs, param="", acc_delay=None ) -> u.Quantity: """Calculate the derivative wrt DECJ Definitions as in d_delay_d_RAJ """ rd = self.get_d_delay_quantities(toas) psr_ra = self.RAJ.quantity psr_dec = self.DECJ.quantity geom = np.cos(rd["earth_dec"]) * np.sin(psr_dec) * np.cos( psr_ra - rd["earth_ra"] ) - np.sin(rd["earth_dec"]) * np.cos(psr_dec) dd_ddecj = rd["ssb_obs_r"] * geom / (const.c * u.radian) return dd_ddecj.decompose(u.si.bases)
[docs] def d_delay_astrometry_d_PMRA( self, toas: pint.toa.TOAs, param="", acc_delay=None ) -> u.Quantity: """Calculate the derivative wrt PMRA Definitions as in d_delay_d_RAJ. Now we have a derivative in mas/yr for the pulsar RA """ rd = self.get_d_delay_quantities(toas) psr_ra = self.RAJ.quantity te = rd["epoch"] - self.POSEPOCH.quantity.tdb.mjd_long * u.day geom = np.cos(rd["earth_dec"]) * np.sin(psr_ra - rd["earth_ra"]) deriv = rd["ssb_obs_r"] * geom * te / (const.c * u.radian) dd_dpmra = deriv * u.mas / u.year # We want to return sec / (mas / yr) return dd_dpmra.decompose(u.si.bases) / (u.mas / u.year)
[docs] def d_delay_astrometry_d_PMDEC( self, toas: pint.toa.TOAs, param="", acc_delay=None ) -> u.Quantity: """Calculate the derivative wrt PMDEC Definitions as in d_delay_d_RAJ. Now we have a derivative in mas/yr for the pulsar DEC """ rd = self.get_d_delay_quantities(toas) psr_ra = self.RAJ.quantity psr_dec = self.DECJ.quantity te = rd["epoch"] - self.POSEPOCH.quantity.tdb.mjd_long * u.day geom = np.cos(rd["earth_dec"]) * np.sin(psr_dec) * np.cos( psr_ra - rd["earth_ra"] ) - np.cos(psr_dec) * np.sin(rd["earth_dec"]) deriv = rd["ssb_obs_r"] * geom * te / (const.c * u.radian) dd_dpmdec = deriv * u.mas / u.year # We want to return sec / (mas / yr) return dd_dpmdec.decompose(u.si.bases) / (u.mas / u.year)
[docs] def change_posepoch(self, new_epoch: time_like): """Change POSEPOCH to a new value and update the position accordingly. Parameters ---------- new_epoch: `astropy.time.Time` or float or astropy.units.Quantity If float or Quantity, MJD(TDB) is assumed. Note that uncertainties are not adjusted. The new POSEPOCH value. """ if isinstance(new_epoch, Time): new_epoch = Time(new_epoch, scale="tdb", precision=9) else: new_epoch = Time(new_epoch, scale="tdb", format="mjd", precision=9) if self.POSEPOCH.value is None: raise ValueError("POSEPOCH is not currently set.") new_coords = self.get_psr_coords(new_epoch.mjd_long) self.RAJ.value = new_coords.ra self.DECJ.value = new_coords.dec self.POSEPOCH.value = new_epoch
[docs] def as_ICRS(self, epoch: Optional[time_like] = None) -> "AstrometryEquatorial": """Return pint.models.astrometry.Astrometry object in ICRS frame. Parameters ---------- epoch : `astropy.time.Time` or float or astropy.units.Quantity, optional If float or Quantity, MJD(TDB) is assumed. New epoch for position. Returns ------- pint.models.astrometry.AstrometryEquatorial """ m = copy.deepcopy(self) if epoch is not None: m.change_posepoch(epoch) return m
[docs] def as_ECL( self, epoch: Optional[time_like] = None, ecl: str = "IERS2010" ) -> "AstrometryEcliptic": """Return pint.models.astrometry.Astrometry object in PulsarEcliptic frame. Parameters ---------- epoch : float or astropy.time.Time or astropy.units.Quantity, optional If float or Quantity, MJD(TDB) is assumed. New epoch for position. ecl : str, optional Obliquity for PulsarEcliptic frame Returns ------- pint.models.astrometry.AstrometryEcliptic """ m_ecl = AstrometryEcliptic() # transfer over parallax and POSEPOCH: don't need to change m_ecl.PX = self.PX m_ecl.POSEPOCH = self.POSEPOCH # get ELONG, ELAT, PM c = self.coords_as_ECL(epoch=epoch, ecl=ecl) m_ecl.ELONG.quantity = c.lon m_ecl.ELAT.quantity = c.lat m_ecl.PMELONG.quantity = c.pm_lon_coslat m_ecl.PMELAT.quantity = c.pm_lat m_ecl.ECL.value = ecl # use fake proper motions to convert uncertainties on ELONG, ELAT # assume that ELONG uncertainty does not include cos(ELAT) # and that the RA uncertainty does not include cos(DEC) # put it in here as pm_ra_cosdec since astropy complains otherwise dt = 1 * u.yr c = coords.SkyCoord( ra=self.RAJ.quantity, dec=self.DECJ.quantity, obstime=self.POSEPOCH.quantity, pm_ra_cosdec=( self.RAJ.uncertainty * np.cos(self.DECJ.quantity) / dt if self.RAJ.uncertainty is not None else 0 * self.RAJ.units / dt ), pm_dec=( self.DECJ.uncertainty / dt if self.DECJ.uncertainty is not None else 0 * self.DECJ.units / dt ), frame=coords.ICRS, ) c_ECL = c.transform_to(PulsarEcliptic(ecl=ecl)) m_ecl.ELONG.uncertainty = c_ECL.pm_lon_coslat * dt / np.cos(c_ECL.lat) m_ecl.ELAT.uncertainty = c_ECL.pm_lat * dt # use fake proper motions to convert uncertainties on proper motion # assume that the PM_RA _does_ include cos(DEC) c = coords.SkyCoord( ra=self.RAJ.quantity, dec=self.DECJ.quantity, obstime=self.POSEPOCH.quantity, pm_ra_cosdec=( self.PMRA.uncertainty if self.PMRA.uncertainty is not None else 0 * self.PMRA.units ), pm_dec=( self.PMDEC.uncertainty if self.PMDEC.uncertainty is not None else 0 * self.PMDEC.units ), frame=coords.ICRS, ) c_ECL = c.transform_to(PulsarEcliptic(ecl=ecl)) m_ecl.PMELONG.uncertainty = c_ECL.pm_lon_coslat m_ecl.PMELAT.uncertainty = c_ECL.pm_lat # freeze comparable parameters m_ecl.ELONG.frozen = self.RAJ.frozen m_ecl.ELAT.frozen = self.DECJ.frozen m_ecl.PMELONG.frozen = self.PMRA.frozen m_ecl.PMELAT.frozen = self.PMDEC.frozen return m_ecl
[docs]class AstrometryEcliptic(Astrometry): """Astrometry in ecliptic coordinates. Parameters supported: .. paramtable:: :class: pint.models.astrometry.AstrometryEcliptic """ register = True def __init__(self): super().__init__() self.add_param( AngleParameter( name="ELONG", units="deg", description="Ecliptic longitude", aliases=["LAMBDA"], ) ) self.add_param( AngleParameter( name="ELAT", units="deg", description="Ecliptic latitude", aliases=["BETA"], ) ) self.add_param( floatParameter( name="PMELONG", units="mas/year", description="Proper motion in ecliptic longitude", aliases=["PMLAMBDA"], value=0.0, ) ) self.add_param( floatParameter( name="PMELAT", units="mas/year", description="Proper motion in ecliptic latitude", aliases=["PMBETA"], value=0.0, ) ) self.add_param( strParameter( name="ECL", value="IERS2010", description="Obliquity of the ecliptic (reference)", ) ) self.set_special_params(["ELONG", "ELAT", "PMELONG", "PMELAT"]) for param in ["ELAT", "ELONG", "PMELAT", "PMELONG"]: deriv_func_name = f"d_delay_astrometry_d_{param}" func = getattr(self, deriv_func_name) self.register_deriv_funcs(func, param)
[docs] def validate(self): """Validate Ecliptic coordinate parameter inputs.""" super().validate() # ELONG/ELAT are required for p in ("ELONG", "ELAT"): if getattr(self, p).value is None: raise MissingParameter("AstrometryEcliptic", p) # Check for POSEPOCH if ( np.any(self.PMELONG.value != 0) or np.any(self.PMELAT.value != 0) ) and self.POSEPOCH.quantity is None: if self._parent.PEPOCH.quantity is None: raise MissingParameter( "Astrometry", "POSEPOCH", "POSEPOCH or PEPOCH are required if PM is set.", ) else: self.POSEPOCH.quantity = self._parent.PEPOCH.quantity
[docs] def barycentric_radio_freq(self, toas: pint.toa.TOAs) -> u.Quantity: """Return radio frequencies (MHz) of the toas corrected for Earth motion""" if "ssb_obs_vel_ecl" not in toas.table.colnames: obliquity = OBL[self.ECL.value] toas.add_vel_ecl(obliquity) tbl = toas.table L_hat = self.ssb_to_psb_xyz_ECL(epoch=tbl["tdbld"].astype(np.float64)) v_dot_L_array = np.sum(tbl["ssb_obs_vel_ecl"] * L_hat, axis=1) return tbl["freq"] * (1.0 - v_dot_L_array / const.c)
[docs] def get_psr_coords(self, epoch: Optional[time_like] = None) -> coords.SkyCoord: """Returns pulsar sky coordinates as an astropy ecliptic coordinate instance. Parameters ---------- epoch: `astropy.time.Time` or float or astropy.units.Quantity, optional new epoch for position. If float or Quantity, MJD(TDB) is assumed Returns ------- position PulsarEcliptic SkyCoord object optionally with proper motion applied If epoch (MJD) is specified, proper motion is included to return the position at the given epoch. """ try: obliquity = OBL[self.ECL.value] except KeyError as e: raise ValueError( f"No obliquity {str(self.ECL.value)} provided. Check your pint/datafile/ecliptic.dat file." ) from e if epoch is None or (self.PMELONG.value == 0.0 and self.PMELAT.value == 0.0): # Compute only once return coords.SkyCoord( obliquity=obliquity, lon=self.ELONG.quantity, lat=self.ELAT.quantity, pm_lon_coslat=self.PMELONG.quantity, pm_lat=self.PMELAT.quantity, obstime=self.POSEPOCH.quantity, frame=PulsarEcliptic, ) # Compute for each time because there is proper motion newepoch = ( epoch if isinstance(epoch, Time) else Time(epoch, scale="tdb", format="mjd") ) position_now = add_dummy_distance(self.get_psr_coords()) with warnings.catch_warnings(): # This is a fake position, no point ERFA warning the user it's bogus warnings.filterwarnings("ignore", r".*distance overridden", ErfaWarning) position_then = position_now.apply_space_motion(new_obstime=newepoch) return remove_dummy_distance(position_then)
[docs] def coords_as_ICRS(self, epoch: Optional[time_like] = None) -> coords.SkyCoord: """Return the pulsar's ICRS coordinates as an astropy coordinate object. Parameters ---------- epoch : float or astropy.time.Time or astropy.units.Quantity, optional If float or Quantity, MJD(TDB) is assumed. Returns ------- astropy.coordinates.SkyCoord """ pos_ecl = self.get_psr_coords(epoch=epoch) return pos_ecl.transform_to(coords.ICRS)
[docs] def coords_as_GAL(self, epoch: Optional[time_like] = None) -> coords.SkyCoord: """Return the pulsar's galactic coordinates as an astropy coordinate object. Parameters ---------- epoch : float or astropy.time.Time or astropy.units.Quantity, optional If float or Quantity, MJD(TDB) is assumed Returns ------- astropy.coordinates.SkyCoord """ pos_ecl = self.get_psr_coords(epoch=epoch) return pos_ecl.transform_to(coords.Galactic)
[docs] def coords_as_ECL( self, epoch: Optional[time_like] = None, ecl: str = None ) -> coords.SkyCoord: """Return the pulsar's ecliptic coordinates as an astropy coordinate object. The value used for the obliquity of the ecliptic can be controlled with the `ecl` keyword, which should be one of the codes listed in `ecliptic.dat`. If `ecl` is left unspecified, the model's ECL parameter will be used. Parameters ---------- epoch : float or astropy.time.Time or astropy.units.Quantity, optional If float or Quantity, MJD(TDB) is assumed ecl : str Returns ------- astropy.coordinates.SkyCoord """ pos_ecl = self.get_psr_coords(epoch=epoch) if ecl is not None: pos_ecl = pos_ecl.transform_to(PulsarEcliptic(ecl=ecl)) return pos_ecl
[docs] def ssb_to_psb_xyz_ECL( self, epoch: Optional[time_like] = None, ecl: str = None ) -> u.Quantity: """Returns unit vector(s) from SSB to pulsar system barycenter under ECL. If epochs (MJD) are given, proper motion is included in the calculation. Parameters ---------- epoch : float or astropy.time.Time or astropy.units.Quantity, optional If float or Quantity, MJD(TDB) is assumed ecl : str, optional Obliquity (IERS2010 by default) Returns ------- np.ndarray : (len(epoch), 3) array of unit vectors """ # TODO: would it be better for this to return a 6-vector (pos, vel)? # this was somewhat slow, since it repeatedly created different SkyCoord Objects # return self.coords_as_ICRS(epoch=epoch).cartesian.xyz.transpose() # Instead look at what https://docs.astropy.org/en/stable/_modules/astropy/coordinates/sky_coordinate.html#SkyCoord.apply_space_motion # does, which is to use https://github.com/liberfa/erfa/blob/master/src/starpm.c # and then just use the relevant pieces of that # but we need to check that the obliquity is the same if ecl is not None and ecl != self.ECL.quantity: return super().ssb_to_psb_xyz_ECL(epoch=epoch, ecl=ecl) if ecl is None: log.debug("ECL not specified; using IERS2010.") ecl = "IERS2010" if epoch is None or (self.PMELONG.value == 0 and self.PMELAT.value == 0): # return self.coords_as_ECL(epoch=epoch, ecl=ecl).cartesian.xyz.transpose() lon, lat = self.ELONG.quantity, self.ELAT.quantity return self.xyz_from_latlong(lon, lat) if isinstance(epoch, Time): jd1 = epoch.jd1 jd2 = epoch.jd2 elif isinstance(epoch, u.Quantity): epoch = Time(epoch, format="mjd", scale="tdb") jd1 = epoch.jd1 jd2 = epoch.jd2 else: jd1 = 2400000.5 jd2 = epoch # compared to the general case above we can assume that the coordinates are ECL # so just access those components lon = self.ELONG.quantity.to_value(u.radian) lat = self.ELAT.quantity.to_value(u.radian) pm_lon = ( self.PMELONG.quantity.to_value(u.radian / u.yr) / np.cos(self.ELAT.quantity).value ) pm_lat = self.PMELAT.quantity.to_value(u.radian / u.yr) with warnings.catch_warnings(): warnings.simplefilter("ignore", ErfaWarning) # note that pmsafe wants mu_lon not mu_lon * cos(lat) starpmout = pmsafe( lon, lat, pm_lon, pm_lat, self.PX.quantity.to_value(u.arcsec), 0.0, self.POSEPOCH.quantity.jd1, self.POSEPOCH.quantity.jd2, jd1, jd2, ) # lon,lat now in radians lon, lat = starpmout[0], starpmout[1] return self.xyz_from_latlong(lon, lat)
def xyz_from_latlong(self, lon, lat): x = np.cos(lon) * np.cos(lat) y = np.sin(lon) * np.cos(lat) z = np.sin(lat) return u.Quantity([x, y, z]).T
[docs] def get_d_delay_quantities_ecliptical(self, toas: pint.toa.TOAs) -> u.Quantity: """Calculate values needed for many d_delay_d_param functions.""" # TODO: Move all these calculations in a separate class for elegance # From the earth_ra dec to earth_elong and elat try: obliquity = OBL[self.ECL.value] except KeyError as e: raise ValueError( ( f"No obliquity {self.ECL.value}" + " provided. " "Check your pint/datafile/ecliptic.dat file." ) ) from e rd = self.get_d_delay_quantities(toas) coords_icrs = coords.ICRS(ra=rd["earth_ra"], dec=rd["earth_dec"]) coords_elpt = coords_icrs.transform_to(PulsarEcliptic(obliquity=obliquity)) rd["earth_elong"] = coords_elpt.lon rd["earth_elat"] = coords_elpt.lat return rd
def get_params_as_ICRS(self) -> dict: pv_ECL = self.get_psr_coords() pv_ICRS = pv_ECL.transform_to(coords.ICRS) return { "RAJ": pv_ICRS.ra.to(u.hourangle), "DECJ": pv_ICRS.dec, "PMRA": pv_ICRS.pm_ra_cosdec, "PMDEC": pv_ICRS.pm_dec, }
[docs] def d_delay_astrometry_d_ELONG( self, toas: pint.toa.TOAs, param="", acc_delay=None ) -> u.Quantity: """Calculate the derivative wrt RAJ. For the RAJ and DEC derivatives, use the following approximate model for the pulse delay. (Inner-product between two Cartesian vectors) de = Earth declination (wrt SSB) ae = Earth right ascension dp = pulsar declination aa = pulsar right ascension r = distance from SSB to Earth c = speed of light delay = r*[cos(de)*cos(dp)*cos(ae-aa)+sin(de)*sin(dp)]/c elate = Earth elat (wrt SSB) elonge = Earth elong elatp = pulsar elat elongp = pulsar elong r = distance from SSB to Earth c = speed of light delay = r*[cos(elate)*cos(elatp)*cos(elonge-elongp)+sin(elate)*sin(elatp)]/c """ rd = self.get_d_delay_quantities_ecliptical(toas) psr_elong = self.ELONG.quantity psr_elat = self.ELAT.quantity geom = ( np.cos(rd["earth_elat"]) * np.cos(psr_elat) * np.sin(psr_elong - rd["earth_elong"]) ) dd_delong = rd["ssb_obs_r"] * geom / (const.c * u.radian) return dd_delong.decompose(u.si.bases)
[docs] def d_delay_astrometry_d_ELAT( self, toas: pint.toa.TOAs, param="", acc_delay=None ) -> u.Quantity: """Calculate the derivative wrt DECJ Definitions as in d_delay_d_RAJ """ rd = self.get_d_delay_quantities_ecliptical(toas) psr_elong = self.ELONG.quantity psr_elat = self.ELAT.quantity geom = np.cos(rd["earth_elat"]) * np.sin(psr_elat) * np.cos( psr_elong - rd["earth_elong"] ) - np.sin(rd["earth_elat"]) * np.cos(psr_elat) dd_delat = rd["ssb_obs_r"] * geom / (const.c * u.radian) return dd_delat.decompose(u.si.bases)
[docs] def d_delay_astrometry_d_PMELONG( self, toas: pint.toa.TOAs, param="", acc_delay=None ) -> u.Quantity: """Calculate the derivative wrt PMRA Definitions as in d_delay_d_RAJ. Now we have a derivative in mas/yr for the pulsar RA """ rd = self.get_d_delay_quantities_ecliptical(toas) psr_elong = self.ELONG.quantity psr_elat = self.ELAT.quantity te = rd["epoch"] - self.POSEPOCH.quantity.tdb.mjd_long * u.day geom = np.cos(rd["earth_elat"]) * np.sin(psr_elong - rd["earth_elong"]) deriv = rd["ssb_obs_r"] * geom * te / (const.c * u.radian) dd_dpmelong = deriv * u.mas / u.year # We want to return sec / (mas / yr) return dd_dpmelong.decompose(u.si.bases) / (u.mas / u.year)
[docs] def d_delay_astrometry_d_PMELAT( self, toas: pint.toa.TOAs, param="", acc_delay=None ) -> u.Quantity: """Calculate the derivative wrt PMDEC Definitions as in d_delay_d_RAJ. Now we have a derivative in mas/yr for the pulsar DEC """ rd = self.get_d_delay_quantities_ecliptical(toas) psr_elong = self.ELONG.quantity psr_elat = self.ELAT.quantity te = rd["epoch"] - self.POSEPOCH.quantity.tdb.mjd_long * u.day geom = np.cos(rd["earth_elat"]) * np.sin(psr_elat) * np.cos( psr_elong - rd["earth_elong"] ) - np.cos(psr_elat) * np.sin(rd["earth_elat"]) deriv = rd["ssb_obs_r"] * geom * te / (const.c * u.radian) dd_dpmelat = deriv * u.mas / u.year # We want to return sec / (mas / yr) return dd_dpmelat.decompose(u.si.bases) / (u.mas / u.year)
[docs] def print_par(self, format: str = "pint") -> str: result = "" print_order = ["ELONG", "ELAT", "PMELONG", "PMELAT", "PX", "ECL", "POSEPOCH"] for p in print_order: par = getattr(self, p) if par.quantity is not None: result += getattr(self, p).as_parfile_line(format=format) return result
[docs] def change_posepoch(self, new_epoch: time_like): """Change POSEPOCH to a new value and update the position accordingly. Parameters ---------- new_epoch: float or `astropy.Time` or `astropy.units.Quantity` object The new POSEPOCH value. """ if isinstance(new_epoch, Time): new_epoch = Time(new_epoch, scale="tdb", precision=9) else: new_epoch = Time(new_epoch, scale="tdb", format="mjd", precision=9) if self.POSEPOCH.value is None: raise ValueError("POSEPOCH is not currently set.") new_coords = self.get_psr_coords(new_epoch.mjd_long) self.ELONG.value = new_coords.lon self.ELAT.value = new_coords.lat self.POSEPOCH.value = new_epoch
[docs] def as_ECL( self, epoch: Optional[time_like] = None, ecl: str = "IERS2010" ) -> "AstrometryEcliptic": """Return pint.models.astrometry.Astrometry object in PulsarEcliptic frame. Parameters ---------- epoch : float or astropy.time.Time or astropy.units.Quantity, optional If float or Quantity, MJD(TDB) is assumed. New epoch for position. ecl : str, optional Obliquity for PulsarEcliptic frame Returns ------- pint.models.astrometry.AstrometryEcliptic """ # change epoch only if ecl == self.ECL.value: m = copy.deepcopy(self) if epoch is not None: m.change_posepoch(epoch) return m m_ecl = AstrometryEcliptic() # transfer over parallax and POSEPOCH: don't need to change m_ecl.PX = self.PX m_ecl.POSEPOCH = self.POSEPOCH # get ELONG, ELAT, PM c = self.coords_as_ECL(epoch=epoch, ecl=ecl) m_ecl.ELONG.quantity = c.lon m_ecl.ELAT.quantity = c.lat m_ecl.PMELONG.quantity = c.pm_lon_coslat m_ecl.PMELAT.quantity = c.pm_lat m_ecl.ECL.value = ecl # use fake proper motions to convert uncertainties on ELONG, ELAT # assume that ELONG uncertainty does not include cos(ELAT) # and that the RA uncertainty does not include cos(DEC) # put it in here as pm_ra_cosdec since astropy complains otherwise dt = 1 * u.yr c = coords.SkyCoord( lon=self.ELONG.quantity, lat=self.ELAT.quantity, obliquity=OBL[self.ECL.value], obstime=self.POSEPOCH.quantity, pm_lon_coslat=( self.ELONG.uncertainty * np.cos(self.ELAT.quantity) / dt if self.ELONG.uncertainty is not None else 0 * self.ELONG.units / dt ), pm_lat=( self.ELAT.uncertainty / dt if self.ELAT.uncertainty is not None else 0 * self.ELAT.units / dt ), frame=PulsarEcliptic, ) c_ECL = c.transform_to(PulsarEcliptic(ecl=ecl)) m_ecl.ELONG.uncertainty = c_ECL.pm_lon_coslat * dt / np.cos(c_ECL.lat) m_ecl.ELAT.uncertainty = c_ECL.pm_lat * dt # use fake proper motions to convert uncertainties on proper motion # assume that PMELONG uncertainty includes cos(DEC) c = coords.SkyCoord( lon=self.ELONG.quantity, lat=self.ELAT.quantity, obliquity=OBL[self.ECL.value], obstime=self.POSEPOCH.quantity, pm_lon_coslat=( self.PMELONG.uncertainty if self.PMELONG.uncertainty is not None else 0 * self.PMELONG.units ), pm_lat=( self.PMELAT.uncertainty if self.PMELAT.uncertainty is not None else 0 * self.PMELAT.units ), frame=PulsarEcliptic, ) c_ECL = c.transform_to(PulsarEcliptic(ecl=ecl)) m_ecl.PMELONG.uncertainty = c_ECL.pm_lon_coslat m_ecl.PMELAT.uncertainty = c_ECL.pm_lat # freeze comparable parameters m_ecl.ELONG.frozen = self.ELONG.frozen m_ecl.ELAT.frozen = self.ELAT.frozen m_ecl.PMELONG.frozen = self.PMELONG.frozen m_ecl.PMELAT.frozen = self.PMELAT.frozen return m_ecl
[docs] def as_ICRS(self, epoch: Optional[time_like] = None) -> "AstrometryEquatorial": """Return pint.models.astrometry.Astrometry object in ICRS frame. Parameters ---------- epoch : float or astropy.time.Time or astropy.units.Quantity, optional If float or Quantity, MJD(TDB) is assumed. New epoch for position. Returns ------- pint.models.astrometry.AstrometryEquatorial """ m_eq = AstrometryEquatorial() # transfer over parallax and POSEPOCH: don't need to change m_eq.PX = self.PX m_eq.POSEPOCH = self.POSEPOCH # get RA, DEC, PM c = self.coords_as_ICRS(epoch=epoch) m_eq.RAJ.quantity = c.ra m_eq.DECJ.quantity = c.dec m_eq.PMRA.quantity = c.pm_ra_cosdec m_eq.PMDEC.quantity = c.pm_dec # use fake proper motions to convert uncertainties on RA,Dec # assume that RA uncertainty does not include cos(Dec) # and neither does the ELONG uncertainty # put it in as pm_lon_coslat since astropy complains otherwise dt = 1 * u.yr c = coords.SkyCoord( lon=self.ELONG.quantity, lat=self.ELAT.quantity, obliquity=OBL[self.ECL.value], obstime=self.POSEPOCH.quantity, pm_lon_coslat=( self.ELONG.uncertainty * np.cos(self.ELAT.quantity) / dt if self.ELONG.uncertainty is not None else 0 * self.ELONG.units / dt ), pm_lat=( self.ELAT.uncertainty / dt if self.ELAT.uncertainty is not None else 0 * self.ELAT.units / dt ), frame=PulsarEcliptic, ) c_ICRS = c.transform_to(coords.ICRS) m_eq.RAJ.uncertainty = np.abs(c_ICRS.pm_ra_cosdec * dt / np.cos(c_ICRS.dec)) m_eq.DECJ.uncertainty = np.abs(c_ICRS.pm_dec * dt) # use fake proper motions to convert uncertainties on proper motion # assume that PMELONG uncertainty includes cos(DEC) c = coords.SkyCoord( lon=self.ELONG.quantity, lat=self.ELAT.quantity, obliquity=OBL[self.ECL.value], obstime=self.POSEPOCH.quantity, pm_lon_coslat=( self.PMELONG.uncertainty if self.PMELONG.uncertainty is not None else 0 * self.PMELONG.units ), pm_lat=( self.PMELAT.uncertainty if self.PMELAT.uncertainty is not None else 0 * self.PMELAT.units ), frame=PulsarEcliptic, ) c_ICRS = c.transform_to(coords.ICRS) m_eq.PMRA.uncertainty = np.abs(c_ICRS.pm_ra_cosdec) m_eq.PMDEC.uncertainty = np.abs(c_ICRS.pm_dec) # freeze comparable parameters m_eq.RAJ.frozen = self.ELONG.frozen m_eq.DECJ.frozen = self.ELAT.frozen m_eq.PMRA.frozen = self.PMELONG.frozen m_eq.PMDEC.frozen = self.PMELAT.frozen return m_eq