Source code for pint.scripts.fermiphase

#!/usr/bin/env python
import argparse

import as pyfits
import numpy as np
from astropy.coordinates import SkyCoord

import pint.logging
from loguru import logger as log


import pint.models
import pint.residuals
import pint.toa as toa
from pint.eventstats import h2sig, hmw
from pint.fermi_toas import get_Fermi_TOAs
from pint.fits_utils import read_fits_event_mjds_tuples
from pint.observatory.satellite_obs import get_satellite_observatory
from pint.plot_utils import phaseogram

__all__ = ["main"]

# log.setLevel('DEBUG')

[docs]def main(argv=None): parser = argparse.ArgumentParser( description="Use PINT to compute H-test and plot Phaseogram from a Fermi FT1 event file.", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) parser.add_argument("eventfile", help="Fermi event FITS file name.") parser.add_argument("parfile", help="par file to construct model from") parser.add_argument( "weightcol", help="Column name for event weights (or 'CALC' to compute them)" ) parser.add_argument("--ft2", help="Path to FT2 file.", default=None) parser.add_argument( "--addphase", help="Write FT1 file with added phase column", default=False, action="store_true", ) parser.add_argument( "--plot", help="Show phaseogram plot.", action="store_true", default=False ) parser.add_argument("--plotfile", help="Output figure file name", default=None) parser.add_argument( "--maxMJD", help="Maximum MJD to include in analysis", default=None ) parser.add_argument( "--minMJD", help="Minimum MJD to include in analysis", default=None ) parser.add_argument( "--outfile", help="Output figure file name (default is to overwrite input file)", default=None, ) parser.add_argument( "--planets", help="Use planetary Shapiro delay in calculations", default=False, action="store_true", ) parser.add_argument("--ephem", help="Planetary ephemeris to use", default="DE421") parser.add_argument( "--log-level", type=str, choices=pint.logging.levels, default=pint.logging.script_level, help="Logging level", dest="loglevel", ) parser.add_argument( "-v", "--verbosity", default=0, action="count", help="Increase output verbosity" ) parser.add_argument( "-q", "--quiet", default=0, action="count", help="Decrease output verbosity" ) args = parser.parse_args(argv) pint.logging.setup( level=pint.logging.get_level(args.loglevel, args.verbosity, args.quiet) ) # If outfile is specified, that implies addphase if args.outfile is not None: args.addphase = True # Read in model modelin = pint.models.get_model(args.parfile) if "ELONG" in modelin.params: tc = SkyCoord( modelin.ELONG.quantity, modelin.ELAT.quantity, frame="barycentrictrueecliptic", ) else: tc = SkyCoord(modelin.RAJ.quantity, modelin.DECJ.quantity, frame="icrs") if args.ft2 is not None: # Instantiate Fermi observatory once so it gets added to the observatory registry get_satellite_observatory("Fermi", args.ft2) # Read event file and return list of TOA objects maxmjd = np.inf if (args.maxMJD is None) else float(args.maxMJD) minmjd = 0.0 if (args.minMJD is None) else float(args.minMJD) # Now convert to TOAs object and compute TDBs and posvels # For Fermi, we are not including GPS or TT(BIPM) corrections ts = get_Fermi_TOAs( args.eventfile, maxmjd=maxmjd, minmjd=minmjd, weightcolumn=args.weightcol, targetcoord=tc, planets=args.planets, ephem=args.ephem, ) ts.filename = args.eventfile print(ts.get_summary()) mjds = ts.get_mjds() print(mjds.min(), mjds.max()) # Compute model phase for each TOA iphss, phss = modelin.phase(ts, abs_phase=True) phss %= 1 phases = phss.value mjds = ts.get_mjds() weights, _ = ts.get_flag_value("weight", as_type=float) weights = np.array(weights) h = float(hmw(phases, weights)) print("Htest : {0:.2f} ({1:.2f} sigma)".format(h, h2sig(h))) if args.plot:"Making phaseogram plot with {0} photons".format(len(mjds))) phaseogram(mjds, phases, weights, bins=100, plotfile=args.plotfile) if args.addphase: # Read input FITS file (again). # If overwriting, open in 'update' mode if args.outfile is None: hdulist =, mode="update") else: hdulist = event_hdu = hdulist[1] event_hdr = event_hdu.header event_dat = event_mjds = read_fits_event_mjds_tuples(event_hdu) mjds_float = np.asarray([r[0] + r[1] for r in event_mjds]) time_mask = np.logical_and((mjds_float > minmjd), (mjds_float < maxmjd)) new_phases = np.full(len(event_dat), -1, dtype=float) new_phases[time_mask] = phases if "PULSE_PHASE" in event_hdu.columns.names:"Found existing PULSE_PHASE column, overwriting...") # Overwrite values in existing Column event_dat["PULSE_PHASE"] = new_phases else: # Construct and append new column, preserving HDU header and name"Adding new PULSE_PHASE column.") phasecol = pyfits.ColDefs( [pyfits.Column(name="PULSE_PHASE", format="D", array=new_phases)] ) bt = pyfits.BinTableHDU.from_columns( event_hdu.columns + phasecol, header=event_hdr, ) hdulist[1] = bt if args.outfile is None: # Overwrite the existing file"Overwriting existing FITS file {args.eventfile}") hdulist.flush(verbose=True, output_verify="warn") else: # Write to new output file"Writing output FITS file {args.outfile}") hdulist.writeto( args.outfile, overwrite=True, checksum=True, output_verify="warn" ) return 0