"""Building a timing model from a par file."""
import copy
import warnings
from io import StringIO
from collections import Counter, defaultdict
from pathlib import Path
from astropy import units as u
from loguru import logger as log
import re
from pint.models.astrometry import Astrometry
from pint.models.parameter import maskParameter
from pint.models.timing_model import (
DEFAULT_ORDER,
Component,
AllComponents,
TimingModel,
ignore_prefix,
AliasConflict,
UnknownBinaryModel,
UnknownParameter,
TimingModelError,
MissingBinaryError,
ignore_params,
ignore_prefix,
)
from pint.toa import get_TOAs
from pint.utils import (
PrefixError,
interesting_lines,
lines_of,
split_prefixed_name,
get_unit,
)
from pint.models.tcb_conversion import convert_tcb_tdb
from pint.models.binary_ddk import _convert_kin, _convert_kom
__all__ = ["ModelBuilder", "get_model", "get_model_and_toas"]
default_models = ["StandardTimingModel"]
_binary_model_priority = [
"Isolated",
"BT",
"BT_piecewise",
"ELL1",
"ELL1H",
"ELL1k",
"DD",
"DDK",
"DDGR",
"DDS",
"DDH",
]
[docs]class ComponentConflict(ValueError):
"""Error for multiple components can be select but no other indications."""
[docs]def parse_parfile(parfile):
"""Function for parsing .par file or .par style StringIO.
Parameters
----------
parfile: str or file-like object
Input .par file name or string contents.
Returns
-------
dict:
Parameter and its associated lines. The key is the parameter name and
the value is a list of the lines associated to the parameter name.
"""
parfile_dict = defaultdict(list)
for l in interesting_lines(lines_of(parfile), comments=("#", "C ")):
k = l.split()
parfile_dict[k[0].upper()].append(" ".join(k[1:]))
return parfile_dict
def _replace_fdjump_in_parfile_dict(pardict):
"""Replace parameter names s of the form "FDJUMPp" by "FDpJUMP"
while reading the par file, where p is the prefix index.
Ideally, this should have been done using the parameter alias
mechanism, but there is no easy way to do this currently due to the
mask and prefix indices being treated in an identical manner.
See :class:`~pint.models.fdjump.FDJump` for more details."""
fdjumpn_regex = re.compile("^FDJUMP(\\d+)")
pardict_new = {}
for key, value in pardict.items():
if m := fdjumpn_regex.match(key):
j = int(m.groups()[0])
new_key = f"FD{j}JUMP"
pardict_new[new_key] = value
else:
pardict_new[key] = value
return pardict_new
[docs]class ModelBuilder:
"""Class for building a `TimingModel` object from a parameter file.
The ModelBuilder class helps building a TimingModel from a parameter file
(i.e., pulsar ephemeris or '.par' file).
It first maps the provided parameter names to the PINT defined parameter
names, if they are in the PINT parameter aliases list. Then, the
ModelBuilder selects model components based on the following rules:
* The components in the :py:attr:`~default_components` list will be selected.
* When a component get mapped uniquely by the given parameters.
* The pulsar binary component will be selected by the 'BINARY' parameter.
"""
def __init__(self):
# Validate the components
self.all_components = AllComponents()
self._validate_components()
self.default_components = []
[docs] def __call__(
self,
parfile,
allow_name_mixing=False,
allow_tcb=False,
allow_T2=False,
force_binary_model=None,
toas_for_tzr=None,
**kwargs,
):
"""Callable object for making a timing model from .par file.
Parameters
----------
parfile: str or file-like object
Input .par file name or string contents
allow_name_mixing : bool, optional
Flag for allowing the input to have mixing aliases names for the
same parameter. For example, if this flag is true, one can have
T2EFAC and EFAC, both of them maps to PINT parameter EFAC, present
in the parfile at the same time.
allow_tcb : True, False, or "raw", optional
Whether to read TCB par files. Default is False, and will throw an
error upon encountering TCB par files. If True, the par file will be
converted to TDB upon read. If "raw", an unconverted malformed TCB
TimingModel object will be returned.
allow_T2 : bool, optional
Whether to convert a T2 binary model to an appropriate underlying
binary model. Default is False, and will throw an error upon
encountering the T2 binary model. If True, the binary model will be
converted to the most appropriate PINT-compatible binary model.
force_binary_model : str, optional
When set to some binary model, like force_binary_model="DD", this
will override the binary model set in the parfile. Defaults to None
toas_for_tzr : TOAs or None, optional
If this is not None, a TZR TOA (AbsPhase) will be created using the
given TOAs object.
kwargs : dict
Any additional parameter/value pairs that will add to or override those in the parfile.
Returns
-------
pint.models.timing_model.TimingModel
The result timing model based on the input .parfile or file object.
"""
assert allow_tcb in [True, False, "raw"]
convert_tcb = allow_tcb == True
allow_tcb_ = allow_tcb in [True, "raw"]
assert isinstance(allow_T2, bool)
pint_param_dict, original_name, unknown_param = self._pintify_parfile(
parfile, allow_name_mixing
)
remaining_args = {}
for k, v in kwargs.items():
if k not in pint_param_dict:
if isinstance(v, u.Quantity):
pint_param_dict[k] = [
str(v.to_value(get_unit(k))),
]
else:
pint_param_dict[k] = [
str(v),
]
original_name[k] = k
else:
remaining_args[k] = v
selected, conflict, param_not_in_pint = self.choose_model(
pint_param_dict, force_binary_model=force_binary_model, allow_T2=allow_T2
)
selected.update(set(self.default_components))
# Add SolarSystemShapiro only if an Astrometry component is present.
if any(
isinstance(self.all_components.components[sc], Astrometry)
for sc in selected
):
selected.add("SolarSystemShapiro")
# Report conflict
if len(conflict) != 0:
self._report_conflict(conflict)
# Make timing model
cps = [self.all_components.components[c] for c in selected]
tm = TimingModel(components=cps)
self._setup_model(
tm,
pint_param_dict,
original_name,
setup=True,
validate=True,
allow_tcb=allow_tcb_,
)
# Report unknown line
for k, v in unknown_param.items():
p_line = " ".join([k] + v)
warnings.warn(f"Unrecognized parfile line '{p_line}'", UserWarning)
# log.warning(f"Unrecognized parfile line '{p_line}'")
if tm.UNITS.value is None or tm.UNITS.value == "":
log.warning("UNITS is not specified. Assuming TDB...")
tm.UNITS.value = "TDB"
if tm.UNITS.value == "TCB" and convert_tcb:
convert_tcb_tdb(tm)
for k, v in remaining_args.items():
if not hasattr(tm, k):
raise ValueError(f"Model does not have parameter '{k}'")
log.debug(f"Overriding '{k}' to '{v}'")
if isinstance(v, u.Quantity):
getattr(tm, k).quantity = v
else:
getattr(tm, k).value = v
# Explicitly add a TZR TOA from a given TOAs object.
if "AbsPhase" not in tm.components and toas_for_tzr is not None:
log.info("Creating a TZR TOA (AbsPhase) using the given TOAs object.")
tm.add_tzr_toa(toas_for_tzr)
if not hasattr(tm, "DelayComponent_list"):
setattr(tm, "DelayComponent_list", [])
if not hasattr(tm, "NoiseComponent_list"):
setattr(tm, "NoiseComponent_list", [])
return tm
def _validate_components(self):
"""Validate the built-in component.
This function validates if there is a subset parameter conflict in the
Components. Normally, one component's parameter should not be a subset
of another component's parameter list. Otherwise, the model builder does
not have an unique choice of component. Currently, Pulsar binary does
not follow the same rule. They are specified by the `BINARY` parameter.
"""
for k, v in self.all_components.components.items():
superset = self._is_subset_component(v)
if superset is not None:
m = (
f"Component {k}'s parameter is a subset of component"
f" {superset}. Module builder will have trouble to "
f" select the component. If component {k} is a base"
f" class, please set register to 'False' in the class"
f" of component {k}."
)
if v.category == "pulsar_system":
# The pulsar system will be selected by parameter BINARY
continue
else:
raise ComponentConflict(m)
def _get_component_param_overlap(self, component):
"""Check the parameter overlapping between two components.
Check if one component's parameters are overlapped with another
component.
Parameters
----------
components: pint.models.timing_model.Component
The component to be checked.
Returns
-------
overlap : dict
The component has overlap parameters and the overlapping parameters
in the format of {overlap component name: (overlap parameter,
number of non-overlap param in test component,
number of non-overlap param in overlap component) }
"""
overlap_entries = {}
for k, cp in self.all_components.components.items():
# Check name is a safer way to avoid the component compares to itself
if component.__class__.__name__ == k:
continue
# We assume parameters are unique in one component
in_param = set(component.aliases_map)
cpm_param = set(cp.aliases_map)
# Add aliases compare
overlap = in_param & cpm_param
# translate to PINT parameter
overlap_pint_par = {
self.all_components.alias_to_pint_param(ovlp)[0] for ovlp in overlap
}
# The degree of overlapping for input component and compared component
overlap_deg_in = len(component.params) - len(overlap_pint_par)
overlap_deg_cpm = len(cp.params) - len(overlap_pint_par)
overlap_entries[k] = (overlap_pint_par, overlap_deg_in, overlap_deg_cpm)
return overlap_entries
def _is_subset_component(self, component):
"""Is the component's parameters a subset of another component's parameters.
Parameters
----------
component: pint.models.timing_model.Component
The component to be checked.
Returns
-------
str
The superset component name, or None
"""
overlap = self._get_component_param_overlap(component)
for k, v in overlap.items():
if v[1] == 0:
return k
return None
def _pintify_parfile(self, parfile, allow_name_mixing=False):
"""Translate parfile parameter name to PINT style name.
This function converts the parfile information to PINT understandable
parameter name. It also returns the PINT unrecognized parameters and
check if the parfile has illegal repeating lines.
Parameters
----------
parfile : str, file-like object, or parfile dictionary
Parfile name, parfile StringIO, or the parfile dictionary returned
by :func:`parse_parfile`.
allow_name_mixing : bool, optional
Flag for allowing the input to have mixing aliases names for the
same parameter. For example, if this flag is true, one can have
T2EFAC and EFAC, both of them maps to PINT parameter EFAC, present
in the parfile at the same time.
Returns
-------
pint_param_dict : dict
Pintified parameter dictionary with the PINT name as key and list of
parameter value-uncertainty lines as value. For the
repeating parameters in the parfile, the value will contain
multiple lines.
original_name_map : dict
PINT name maps to the original .par file input names. PINT name
is the key and the original name is in the value.
unknown_param : dict
The PINT unrecognized parameters in the format of a dictionary.
The key is the unknown parameter name and the value is the
parfile value lines.
Raises
------
TimingModelError
If the parfile has multiple line with non-repeating parameters.
"""
pint_param_dict = defaultdict(list)
original_name_map = defaultdict(list)
unknown_param = defaultdict(list)
repeating = Counter()
if isinstance(parfile, (str, StringIO, Path)):
parfile_dict = parse_parfile(parfile)
else:
parfile_dict = parfile
# This is a special-case-hack to deal with FDJUMP parameters.
# @TODO: Implement a general mechanism to deal with cases like this.
parfile_dict = _replace_fdjump_in_parfile_dict(parfile_dict)
for k, v in parfile_dict.items():
try:
pint_name, init0 = self.all_components.alias_to_pint_param(k)
except UnknownParameter:
if k in ignore_params:
# Parameter is known but in the ignore list
continue
# Check ignored prefix
try:
pfx, idxs, idx = split_prefixed_name(k)
if pfx in ignore_prefix: # It is an ignored prefix.
continue
else:
unknown_param[k] += v
except PrefixError:
unknown_param[k] += v
continue
pint_param_dict[pint_name] += v
original_name_map[pint_name].append(k)
repeating[pint_name] += len(v)
# Check if this parameter is allowed to be repeated by PINT
if (
len(pint_param_dict[pint_name]) > 1
and pint_name not in self.all_components.repeatable_param
):
raise TimingModelError(
f"Parameter {pint_name} is not a repeatable parameter. "
f"However, multiple line use it."
)
# Check if the name is mixed
for p_n, o_n in original_name_map.items():
if len(o_n) > 1 and not allow_name_mixing:
raise TimingModelError(
f"Parameter {p_n} have mixed input names/alias "
f"{o_n}. If you want to have mixing names, please use"
f" 'allow_name_mixing=True', and the output .par file "
f"will use '{original_name_map[pint_name][0]}'."
)
original_name_map[p_n] = o_n[0]
return pint_param_dict, original_name_map, unknown_param
[docs] def choose_model(self, param_inpar, force_binary_model=None, allow_T2=False):
"""Choose the model components based on the parfile.
Parameters
----------
param_inpar: dict
Dictionary of the unique parameters in .par file with the key is the
parfile line. :func:`parse_parfile` returns this dictionary.
allow_T2 : bool, optional
Whether to convert a T2 binary model to an appropriate underlying
binary model. Default is False, and will throw an error upon
encountering the T2 binary model. If True, the binary model will be
converted to the most appropriate PINT-compatible binary model.
force_binary_model : str, optional
When set to some binary model, like force_binary_model="DD", this
will override the binary model set in the parfile. Defaults to None
Returns
-------
list
List of selected components.
dict
Conflict components dictionary, where the key the component name,
the value is a list of component names that are conflicted with the
components in the key.
list
A list of parameters that are in the .parfile but not in the PINT
defined parameters.
Note
----
The selection algorithm:
#. Look at the BINARY in the par file and cache the indicated binary model
#. Translate para file parameters to the pint parameter name
#. Go over the parameter-component map and pick up the components based
on the parameters in parfile.
#. Select the components that have its unique parameters in the parfile.
In other words, select the components that have one parameter to
on component mapping.
#. Log the conflict components, one parameter to multiple components mapping.
"""
selected_components = set()
param_count = Counter()
# 1. iteration read parfile with a no component timing_model to get
# the overall control parameters. This will get us the binary model name
# build the base fo the timing model
# pint_param_dict, unknown_param = self._pintify_parfile(param_inpar)
binary = param_inpar.get("BINARY", None)
if binary:
binary = binary[0]
selected_components.add(
self.choose_binary_model(param_inpar, force_binary_model, allow_T2)
)
# 2. Get the component list from the parameters in the parfile.
# 2.1 Check the aliases of input parameters.
# This does not include the repeating parameters, but it should not
# matter in the component selection.
param_not_in_pint = [] # For parameters not initialized in PINT yet.
param_components_inpar = {}
for pp in param_inpar.keys():
try:
p_name, first_init = self.all_components.alias_to_pint_param(pp)
except UnknownParameter:
param_not_in_pint.append(pp)
continue
param_count[p_name] += 1
# For the case that the indexed parameter maps to a component, but
# the parameter with the provided index is not initialized yet.
if p_name != first_init:
param_not_in_pint.append(pp)
if p_cp := self.all_components.param_component_map.get(first_init, None):
param_components_inpar[p_name] = p_cp
# Back map the possible_components and the parameters in the parfile
# This will remove the duplicate components.
conflict_components = defaultdict(set) # graph for conflict
for k, cps in param_components_inpar.items():
# If `timing_model` in param --> component mapping skip
# Timing model is the base.
if "timing_model" in cps:
continue
# Check if it is a binary component, if yes, skip. It is controlled
# by the BINARY tag
if self.all_components.components[cps[0]].category == "pulsar_system":
if binary is None:
raise MissingBinaryError(
f"Pulsar binary/pulsar system model is"
f" decided by the parameter 'BINARY'. "
f" Please indicate the binary model "
f" before using parameter {k}, which is"
f" a binary model parameter."
)
else:
continue
if len(cps) == 1: # No conflict, parameter only shows in one component.
selected_components.add(cps[0])
continue
# Has conflict, same parameter shows in different components
# Only record the conflict here and do nothing, if there is any
# component unique parameter show in the parfile, the component will
# be selected.
if len(cps) > 1:
# Add conflict to the conflict graph
for cp in cps:
temp_cf_cp = copy.deepcopy(cps)
temp_cf_cp.remove(cp)
conflict_components[cp].update(set(temp_cf_cp))
continue
# Check if the selected component in the conflict graph. If it is
# remove the selected components with its conflict components.
for ps_cp in selected_components:
cf_cps = conflict_components.get(ps_cp)
if cf_cps is not None: # Had conflict, but resolved.
for cf_cp in cf_cps:
del conflict_components[cf_cp]
del conflict_components[ps_cp]
# Check if there are components from the same category
selected_cates = {}
for cp in selected_components:
cate = self.all_components.component_category_map[cp]
if cate in selected_cates:
exist_cp = selected_cates[cate]
raise TimingModelError(
f"Component '{cp}' and '{exist_cp}' belong to the"
f" same category '{cate}'. Only one component from"
f" the same category can be used for a timing model."
f" Please check your input (e.g., .par file)."
)
else:
selected_cates[cate] = cp
return selected_components, conflict_components, param_not_in_pint
[docs] def choose_binary_model(self, param_inpar, force_binary_model=None, allow_T2=False):
"""Choose the BINARY model based on the parfile.
Parameters
----------
param_inpar: dict
Dictionary of the unique parameters in .par file with the key is the
parfile line. :func:`parse_parfile` returns this dictionary.
force_binary_model : str, optional
When set to some binary model, like force_binary_model="DD", this
will override the binary model set in the parfile. Defaults to None
allow_T2 : bool, optional
Whether to convert a T2 binary model to an appropriate underlying
binary model. Default is False, and will throw an error upon
encountering the T2 binary model. If True, the binary model will be
converted to the most appropriate PINT-compatible binary model.
Returns
-------
str
Name of the binary component
Note
----
If the binary model does not have a PINT model (e.g. the T2 model), an
error is thrown with the suggested model that could replace it. If
allow_T2 is set to True, the most appropriate binary model is guessed
and used. If an appropriate model cannot be found, no suggestion is
given and an error is thrown.
"""
binary = param_inpar["BINARY"][0]
# Guess what the binary model should be, regardless of BINARY parameter
try:
binary_model_guesses = guess_binary_model(param_inpar)
except UnknownBinaryModel as e:
log.error(
"Unable to find suitable binary model that has all the"
"parameters in the parfile. Please fix the par file."
)
# Allow for T2 model, gracefully
if force_binary_model is not None and binary != "T2":
binary = force_binary_model
elif binary == "T2" and allow_T2:
binary = binary_model_guesses[0]
log.warning(
f"Found T2 binary model. Gracefully converting T2 to: {binary}."
)
# Make sure that DDK parameters are properly converted
convert_binary_params_dict(param_inpar, force_binary_model=binary)
try:
binary_cp = self.all_components.search_binary_components(binary)
except UnknownBinaryModel as e:
log.error(f"Could not find binary model {binary}")
log.info(
f"Compatible models with these parameters: {', '.join(binary_model_guesses)}."
)
# Re-raise the error, with an added guess for the binary model if we have one
if binary_model_guesses:
raise UnknownBinaryModel(
str(e), suggestion=binary_model_guesses[0]
) from None
raise
return binary_cp.__class__.__name__
def _setup_model(
self,
timing_model,
pint_param_dict,
original_name=None,
setup=True,
validate=True,
allow_tcb=False,
):
"""Fill up a timing model with parameter values and then setup the model.
This function fills up the timing model parameter values from the input
pintified parameter dictionary. If the parameter has not initialized yet,
it will add the parameter to the timing model. For the repeatable parameters,
it will search matching key value pair first. If the input parameter line's
key-value matches the existing parameter, the parameter value and uncertainty
will copy to the existing parameter. If there is no match, it will find an
empty existing parameter, whose `key` is `None`, and fill it up. If no empty
parameter left, it will add a new parameter to it.
Parameters
----------
timing_model : pint.models.TimingModel
Timing model to get setup.
pint_param_dict: dict
Pintified parfile dictionary which can be acquired by
:meth:`ModelBuilder._pintify_parfile`
original_name : dict, optional
A map from PINT name to the original input name.
setup : bool, optional
Whether to run the setup function in the timing model.
validate : bool, optional
Whether to run the validate function in the timing model.
allow_tcb : bool, optional
Whether to allow reading TCB par files
"""
use_alias = original_name is not None
for pp, v in pint_param_dict.items():
try:
par = getattr(timing_model, pp)
except AttributeError:
# since the input is pintfied, it should be an uninitialized indexed parameter
# double check if the missing parameter an indexed parameter.
pint_par, first_init = self.all_components.alias_to_pint_param(pp)
try:
prefix, _, index = split_prefixed_name(pint_par)
except PrefixError as e:
par_hosts = self.all_components.param_component_map[pint_par]
current_cp = timing_model.components.keys()
raise TimingModelError(
f"Parameter {pint_par} is recognized"
f" by PINT, but not used in the current"
f" timing model. It is used in {par_hosts},"
f" but the current timing model uses {current_cp}."
) from e
# TODO need to create a better API for _locate_param_host
host_component = timing_model._locate_param_host(first_init)
timing_model.add_param_from_top(
getattr(timing_model, first_init).new_param(index),
host_component[0][0],
)
par = getattr(timing_model, pint_par)
# Fill up the values
param_line = len(v)
if param_line < 2:
name = original_name[pp] if use_alias else pp
par.from_parfile_line(" ".join([name] + v))
else: # For the repeatable parameters
lines = copy.deepcopy(v) # Line queue.
# Check how many repeatable parameters in the model.
example_par = getattr(timing_model, pp)
prefix, _, index = split_prefixed_name(pp)
for li in lines:
# Create a temp parameter with the idx bigger than all the existing indices
repeatable_map = timing_model.get_prefix_mapping(prefix)
new_max_idx = max(repeatable_map.keys()) + 1
temp_par = example_par.new_param(new_max_idx)
temp_par.from_parfile_line(
" ".join([prefix + str(new_max_idx), li])
)
if use_alias: # Use the input alias as input
temp_par.use_alias = original_name[pp]
# Check current repeatable's key and value
# TODO need to change here when maskParameter name changes to name_key_value
empty_repeat_param = []
for idx, rp in repeatable_map.items():
rp_par = getattr(timing_model, rp)
if rp_par.compare_key_value(temp_par):
# Key and key value match, copy the new line to it
# and exit
rp_par.from_parfile_line(" ".join([rp, li]))
if use_alias: # Use the input alias as input
rp_par.use_alias = original_name[pp]
break
if rp_par.key is None:
# Empty space for new repeatable parameter
empty_repeat_param.append(rp_par)
# There is no current repeatable parameter matching the new line
# First try to fill up an empty space.
if not empty_repeat_param:
# No empty space, add a new parameter to the timing model.
host_component = timing_model._locate_param_host(pp)
timing_model.add_param_from_top(temp_par, host_component[0][0])
else:
emt_par = empty_repeat_param.pop(0)
emt_par.from_parfile_line(" ".join([emt_par.name, li]))
if use_alias: # Use the input alias as input
emt_par.use_alias = original_name[pp]
if setup:
timing_model.setup()
if validate:
timing_model.validate(allow_tcb=allow_tcb)
return timing_model
def _report_conflict(self, conflict_graph):
"""Report conflict components"""
for k, v in conflict_graph.items():
# Put all the conflict components together from the graph
cf_cps = list(v)
cf_cps.append(k)
raise ComponentConflict(f"Can not decide the one component from: {cf_cps}")
[docs]def get_model(
parfile,
allow_name_mixing=False,
allow_tcb=False,
allow_T2=False,
force_binary_model=None,
toas_for_tzr=None,
**kwargs,
):
"""A one step function to build model from a parfile.
Parameters
----------
parfile : str
The parfile name, or a file-like object to read the parfile contents from
allow_name_mixing : bool, optional
Flag for allowing the input to have mixing aliases names for the
same parameter. For example, if this flag is true, one can have
T2EFAC and EFAC, both of them maps to PINT parameter EFAC, present
in the parfile at the same time.
allow_tcb : True, False, or "raw", optional
Whether to read TCB par files. Default is False, and will throw an
error upon encountering TCB par files. If True, the par file will be
converted to TDB upon read. If "raw", an unconverted malformed TCB
TimingModel object will be returned.
allow_T2 : bool, optional
Whether to convert a T2 binary model to an appropriate underlying
binary model. Default is False, and will throw an error upon
encountering the T2 binary model. If True, the binary model will be
converted to the most appropriate PINT-compatible binary model.
force_binary_model : str, optional
When set to some binary model, like force_binary_model="DD", this will
override the binary model set in the parfile. Defaults to None
toas_for_tzr : TOAs or None, optional
If this is not None, a TZR TOA (AbsPhase) will be created using the
given TOAs object.
kwargs : dict
Any additional parameter/value pairs that will add to or override those in the parfile.
Returns
-------
Model instance get from parfile.
"""
model_builder = ModelBuilder()
try:
contents = parfile.read()
except AttributeError:
contents = None
if contents is not None:
return model_builder(
StringIO(contents),
allow_name_mixing,
allow_tcb=allow_tcb,
allow_T2=allow_T2,
force_binary_model=force_binary_model,
toas_for_tzr=toas_for_tzr,
**kwargs,
)
# # parfile is a filename and can be handled by ModelBuilder
# if _model_builder is None:
# _model_builder = ModelBuilder()
model = model_builder(
parfile,
allow_name_mixing,
allow_tcb=allow_tcb,
allow_T2=allow_T2,
force_binary_model=force_binary_model,
toas_for_tzr=toas_for_tzr,
**kwargs,
)
model.name = parfile
return model
[docs]def get_model_and_toas(
parfile,
timfile,
ephem=None,
include_bipm=None,
bipm_version=None,
include_gps=None,
planets=None,
usepickle=False,
tdb_method="default",
include_pn=True,
picklefilename=None,
allow_name_mixing=False,
limits="warn",
allow_tcb=False,
allow_T2=False,
force_binary_model=None,
add_tzr_to_model=True,
**kwargs,
):
"""Load a timing model and a related TOAs, using model commands as needed
Parameters
----------
parfile : str
The parfile name, or a file-like object to read the parfile contents from
timfile : str
The timfile name, or a file-like object to read the timfile contents from
ephem : str, optional
If not None (default), this ephemeris will be used to create the TOAs object.
Default is to use the EPHEM parameter from the timing model.
include_bipm : bool or None
Whether to apply the BIPM clock correction. Defaults to True.
bipm_version : string or None
Which version of the BIPM tables to use for the clock correction.
The format must be 'BIPMXXXX' where XXXX is a year.
include_gps : bool or None
Whether to include the GPS clock correction. Defaults to True.
planets : bool or None
Whether to apply Shapiro delays based on planet positions. Note that a
long-standing TEMPO2 bug in this feature went unnoticed for years.
Defaults to False.
usepickle : bool
Whether to try to use pickle-based caching of loaded clock-corrected TOAs objects.
tdb_method : str
Which method to use for the clock correction to TDB. See
:func:`pint.observatory.Observatory.get_TDBs` for details.
include_pn : bool, optional
Whether or not to read in the 'pn' column (``pulse_number``)
picklefilename : str or None
Filename to use for caching loaded file. Defaults to adding ``.pickle.gz`` to the
filename of the timfile, if there is one and only one. If no filename is available,
or multiple filenames are provided, a specific filename must be provided.
allow_name_mixing : bool, optional
Flag for allowing the input to have mixing aliases names for the
same parameter. For example, if this flag is true, one can have
T2EFAC and EFAC, both of them maps to PINT parameter EFAC, present
in the parfile at the same time.
limits : "warn" or "error"
What to do when encountering TOAs for which clock corrections are not available.
allow_tcb : True, False, or "raw", optional
Whether to read TCB par files. Default is False, and will throw an
error upon encountering TCB par files. If True, the par file will be
converted to TDB upon read. If "raw", an unconverted malformed TCB
TimingModel object will be returned.
allow_T2 : bool, optional
Whether to convert a T2 binary model to an appropriate underlying
binary model. Default is False, and will throw an error upon
encountering the T2 binary model. If True, the binary model will be
converted to the most appropriate PINT-compatible binary model.
force_binary_model : str, optional
When set to some binary model, like force_binary_model="DD", this
will override the binary model set in the parfile. Defaults to None
add_tzr_to_model : bool, optional
Create a TZR TOA in the timing model using the created TOAs object. Default is
True.
kwargs : dict
Any additional parameter/value pairs that will add to or override those in the parfile.
Returns
-------
A tuple with (model instance, TOAs instance)
"""
mm = get_model(
parfile,
allow_name_mixing,
allow_tcb=allow_tcb,
allow_T2=allow_T2,
force_binary_model=force_binary_model,
**kwargs,
)
tt = get_TOAs(
timfile,
include_pn=include_pn,
model=mm,
ephem=ephem,
include_bipm=include_bipm,
bipm_version=bipm_version,
include_gps=include_gps,
planets=planets,
usepickle=usepickle,
tdb_method=tdb_method,
picklefilename=picklefilename,
limits=limits,
)
if "AbsPhase" not in mm.components and add_tzr_to_model:
log.info("Creating a TZR TOA (AbsPhase) using the given TOAs object.")
mm.add_tzr_toa(tt)
return mm, tt
[docs]def guess_binary_model(parfile_dict):
"""Based on the PINT parameter dictionary, guess the binary model
Parameters
----------
parfile_dict
The parameter dictionary as read-in by parse_parfile
Returns
-------
list:
A priority-ordered list of possible binary models. The first one is the
best-guess
"""
def add_sini(parameters):
"""If 'KIN' is a model parameter, Tempo2 doesn't really use SINI"""
if "KIN" in parameters:
return list(parameters) + ["SINI"]
else:
return list(parameters)
all_components = AllComponents()
binary_models = all_components.category_component_map["pulsar_system"]
# Find all binary parameters
binary_parameters_map = {
all_components.components[binary_model].binary_model_name: add_sini(
all_components.search_binary_components(binary_model).aliases_map.keys()
)
for binary_model in binary_models
}
binary_parameters_map.update({"Isolated": []})
all_binary_parameters = {
parname for parnames in binary_parameters_map.values() for parname in parnames
}
# Find all parfile parameters
all_parfile_parameters = set(parfile_dict.keys())
# All binary parameters in the parfile
parfile_binary_parameters = all_parfile_parameters & all_binary_parameters
# Find which binary models include those
allowed_binary_models = {
binary_model
for (binary_model, bmc) in binary_parameters_map.items()
if len(parfile_binary_parameters - set(bmc)) == 0
}
# Now select the best-guess binary model
priority = [bm for bm in _binary_model_priority if bm in allowed_binary_models]
omitted = allowed_binary_models - set(priority)
return priority + list(omitted)
[docs]def convert_binary_params_dict(
parfile_dict, convert_komkin=True, drop_ddk_sini=True, force_binary_model=None
):
"""Convert the PINT parameter dictionary to include the best-guess binary
Parameters
----------
parfile_dict
The parameter dictionary as read-in by parse_parfile
convert_komkin
Whether or not to convert the KOM and KIN parameters
drop_ddk_sini
Whether to drop SINI when converting to the DDK model
force_binary_model : str, optional
When set to some binary model, like force_binary_model="DD", this will
override the binary model set in the parfile. Defaults to None
Returns
-------
A new parfile dictionary with the binary model replaced with the best-guess
model. For a conversion to DDK, this function also converts the KOM/KIN
parameters if they exist.
"""
binary = parfile_dict.get("BINARY", None)
binary = binary if not binary else binary[0]
log.debug(f"Requested to convert binary model for BINARY model: {binary}")
if binary:
if not force_binary_model:
binary_model_guesses = guess_binary_model(parfile_dict)
log.info(
f"Compatible models with these parameters: {', '.join(binary_model_guesses)}. Using {binary_model_guesses[0]}"
)
if not binary_model_guesses:
error_message = f"Unable to determine binary model for this par file"
log_message = (
f"Unable to determine the binary model based"
f"on the model parameters in the par file."
)
log.error(log_message)
raise UnknownBinaryModel(error_message)
else:
binary_model_guesses = [force_binary_model]
# Select the best-guess binary model
parfile_dict["BINARY"] = [binary_model_guesses[0]]
# Convert KIN if requested
if convert_komkin and "KIN" in parfile_dict:
log.info(f"Converting KOM to/from IAU <--> DT96: {parfile_dict['KIN']}")
log.debug(f"Converting KIN to/from IAU <--> DT96")
entries = parfile_dict["KIN"][0].split()
new_value = _convert_kin(float(entries[0]) * u.deg).value
parfile_dict["KIN"] = [" ".join([repr(new_value)] + entries[1:])]
# Convert KOM if requested
if convert_komkin and "KOM" in parfile_dict:
log.debug(f"Converting KOM to/from IAU <--> DT96")
entries = parfile_dict["KOM"][0].split()
new_value = _convert_kom(float(entries[0]) * u.deg).value
parfile_dict["KOM"] = [" ".join([repr(new_value)] + entries[1:])]
# Drop SINI if requested
if drop_ddk_sini and binary_model_guesses[0] == "DDK":
log.debug(f"Dropping SINI from DDK model")
parfile_dict.pop("SINI", None)
return parfile_dict