"""Module to inspect JWST visit files.
Authors
-------
Johannes Sahlmann
Marshall Perrin
"""
from collections import OrderedDict
import copy
import re
from astropy.table import Table, join
import numpy as np
iswhitespace = lambda x: re.fullmatch("\s+", x) is not None
# set up regular expressions for parsing
rx_dict = {
'template': re.compile(r'# (?P<apt_templates>.*)'),
}
def _parse_line(line, rx_dict):
"""Do a regex search against all defined regexes.
Return the key and match result of the first matching regex.
See https://www.vipinajayakumar.com/parsing-text-with-python/
"""
for key, rx in rx_dict.items():
match = rx.search(line)
if match:
return key, match
# if there are no matches
return None, None
# Some lightweight classes for parsed information
[docs]class Statement(object):
"""Capture visit file statement delimited by a semicolon."""
def __init__(self, cmdstring, verbose=False):
cmdparts = cmdstring.split(' ,')
self.name = cmdparts[0]
self.args = cmdparts[1:]
try:
self.scriptname = cmdparts[2]
except IndexError:
self.scriptname = 'NONE'
if verbose:
for part in cmdparts:
print(" " + part)
def __repr__(self):
return ("<Statement {} >".format(self.name))
[docs]class Aux(Statement):
"""Capture statement identified by AUX keyword."""
def __init__(self, cmdstring, verbose=False):
super().__init__(cmdstring, verbose=verbose)
for param in self.args:
if '=' in param:
key, value = param.split('=')
try:
value = float(value)
except:
pass
self.__dict__[key] = value
[docs]class Dither(Statement):
"""Capture statement identified by DITHER keyword."""
def __init__(self, cmdstring, verbose=False):
super().__init__(cmdstring, verbose=verbose)
self.id = self.args[0].split('=')[1]
for param in self.args:
if '=' in param:
key, value = param.split('=')
try:
value = float(value)
except:
pass
self.__dict__[key] = value
[docs]class Momentum(Statement):
"""Capture statement identified by MOMENTUM keyword."""
def __init__(self, cmdstring, verbose=False):
super().__init__(cmdstring, verbose=verbose)
for param in self.args:
if '=' in param:
key, value = param.split('=')
try:
value = float(value)
except:
pass
self.__dict__[key] = value
[docs]class SlewOrAct(Statement):
"""Capture statement identified by SLEW or ACT keyword."""
def __init__(self, cmdstring, group=None, sequence=None, verbose=False):
super().__init__(cmdstring, verbose=verbose)
self.group = group
self.sequence = sequence
try:
self.activity = int(self.args[0], base=16) # note, these are base 16 hex numbers
except ValueError as e:
print('Activity number parsing raises ValueError:\n{}\nSetting to 99'.format(e))
self.activity = 99
for param in self.args[2:]:
if '=' in param:
key, value = param.split('=')
try:
value = float(value)
except:
pass
self.__dict__[key] = value
@property
def gsa(self):
"Group, sequence, activity"
return "{:02d}{:1d}{:02d}".format(self.group, self.sequence, self.activity)
[docs]class VisitDescription(Statement):
"""Capture statement identified by VISIT keyword."""
def __init__(self, cmdstring, verbose=False):
super().__init__(cmdstring, verbose=verbose)
self.id = self.args[0]
for param in self.args[1:]:
if '=' in param:
key, value = param.split('=')
try:
value = float(value)
except:
pass
self.__dict__[key] = value
[docs]class Activity(SlewOrAct):
"""Capture information related to activity."""
def __init__(self, cmdstring, *args, **kwargs):
super().__init__(cmdstring, *args, **kwargs)
# self.scriptname = self.args[1]
def __repr__(self):
return ("Activity {}: {}".format(self.gsa, self.describe()))
[docs] def describe(self):
if self.scriptname == 'NRCWFSCMAIN':
description = """{s.scriptname} {s.CONFIG} WFCGROUP={s.WFCGROUP}
Readout={s.NGROUPS:.0f} groups, {s.NINTS:.0f} ints
SW={s.FILTSHORTA}, LW={s.FILTLONGA}"""
elif self.scriptname == 'NRCMAIN':
description = """{s.scriptname} {s.CONFIG}
Readout={s.NGROUPS:.0f} groups, {s.NINTS:.0f} ints
SW={s.FILTSHORTA}, LW={s.FILTLONGA}"""
elif self.scriptname == 'NRCWFCPMAIN':
mod = self.CONFIG[3] # a or B
description = "{s.scriptname} {s.FILTSHORT" + mod + "}+{s.PUPILSHORT" + mod + \
"} Readout={s.NGROUPS:.0f} groups, {s.NINTS:.0f} ints"
elif self.scriptname == 'SCSAMMAIN':
description = """SCSAMMAIN dx={s.DELTAX}, dy={s.DELTAY}, dpa={s.DELTAPA}"""
elif self.scriptname == 'NRCSUBMAIN':
description = """NRCSUBMAIN subarray={s.SUBARRAY}"""
else:
description = """{s.scriptname}"""
try:
return description.format(s=self)
except AttributeError as e:
print('Activity {} raised AttributeError:\n{}'.format(self.scriptname, e))
[docs]class Guide(SlewOrAct):
"""Expand Guide statement."""
[docs] def describe(self):
if self.args[1] == 'FGSVERMAIN':
return ("Verification")
else:
detnum = self.DETECTOR[-1]
return """FGS{detnum}""".format(detnum=detnum, s=self)
def __repr__(self):
return ("Guide {}: {}".format(self.gsa, self.describe()))
[docs]class Slew(SlewOrAct):
"""Expand statement identified by SLEW keyword."""
def __repr__(self):
return (
"Slew {}: for {} on GS at ({}, {}) with PA={}".format(self.gsa, 'N/A', self.GSRA,
self.GSDEC, self.GSPA))
# "Slew {}: for {} on GS at ({}, {}) with PA={}".format(self.gsa, self.GUIDEMODE, self.GSRA,
# self.GSDEC, self.GSPA))
[docs]class Visit():
"""Class for JWST visit file information"""
def __init__(self, templates, statements, groups):
self.templates = templates
self.groups = groups
self.statements = statements
# store non-exposure related information in class asstributes
dithers = OrderedDict()
for statement in self.statements:
if statement.name == 'VISIT':
self.id = statement.id
self.visit_parameters = statement
elif statement.name == 'MOMENTUM':
self.momentum = statement
elif statement.name == 'AUX':
self.aux = statement
elif statement.name == 'DITHER':
dithers['{}'.format(statement.id)] = statement
self.dithers = dithers
# construct astropy table with basic content
script_statements = []
self.table = Table(names=('GROUP_ID', 'SEQ_ID', 'ACT_ID', 'GSA', 'TYPE', 'SCRIPT'),
dtype=(int, int, int, object, object, object))
for group_id, group in self.groups.items():
for seq_id, seq in group.items():
for statement in seq:
self.table.add_row((np.int(group_id.split('_')[1]), np.int(seq_id.split('_')[1]),
statement.activity, statement.gsa, statement.name, statement.scriptname))
script_statements.append(statement)
for colname in 'TYPE SCRIPT GSA'.split():
self.table[colname] = np.array(self.table[colname]).astype(str)
self.table.meta['comments']=['{}'.format(self.id)]
self.number_of_statements = len(self.table)
self.script_statements = np.array(script_statements)
def __repr__(self):
return (
"Visit {}: {:>2} dithers, {:>2} groups, {:>3} observation statements. Uses {}".format(self.id, len(self.dithers),
len(self.groups), self.number_of_statements, self.templates))
[docs] def overview_table(self, instrument=None):
"""Return an astropy table with specific information, one row per exposure/activity.
Parameters
----------
instrument : str
JWST instrument name, case insensitive
Returns
-------
table : astropy.table
Table with additional information extracted from the statements.
"""
table = copy.deepcopy(self.table)
if instrument.lower() == 'niriss':
remove_index = np.array([i for i, script in enumerate(self.table['SCRIPT']) if script[0:3] not in ['NIS', 'NRC']])
table.remove_rows(remove_index)
column_names = 'GSA OPMODE TARGTYPE DITHERID PATTERN NINTS NGROUPS PUPIL FILTER SUBARRAY'.split()
instrument_table = Table(names=tuple(column_names), dtype=tuple([object]*10))
for statement in self.script_statements:
if statement.gsa in table['GSA']:
row = [statement.gsa]
for colname in instrument_table.colnames[1:]:
try:
value = str(getattr(statement, colname))
except AttributeError:
value = 'NONE'
row.append(value)
instrument_table.add_row(vals=row)
for colname in instrument_table.colnames:
instrument_table[colname] = np.array(instrument_table[colname]).astype(str)
table = join(table, instrument_table, keys='GSA')
return table
[docs]def parse_visit_file(filename, verbose=False):
"""Read and parse the visit file line-by-line.
Parameters
----------
filename : str
Name fo file to parse
verbose : bool
verbosity
Returns
-------
visit : Visit object
Object containing all information extracted from the file.
"""
with open(filename) as file:
lines = file.readlines()
key, match = _parse_line(lines[0], rx_dict)
if key == 'template':
apt_templates = match.group('apt_templates').split(',')
# Simple parsing that ignores commands and newlines, but respects the fact that
# OSS parameters are separated by the exact string " ," with the comma necessarily after whitespace.
nocomments = [l.strip() for l in lines if not (l.startswith("#") or iswhitespace(l))]
for i in range(len(nocomments)):
if nocomments[i].startswith(','):
nocomments[i] = ' ' + nocomments[i]
merged = ''.join(nocomments)
commands = merged.split(';')
if verbose:
print(commands)
# Now iterate through the statements
groups = OrderedDict()
commands = np.array(commands)
# process initial statements
statements = []
for index,cmd in enumerate(commands):
if cmd == '':
continue
if verbose:
print(cmd)
print('*'*50)
parsedcmd = Statement(cmd)
if parsedcmd.name == 'GROUP':
break
elif parsedcmd.name == 'VISIT':
parsedcmd = VisitDescription(cmd)
elif parsedcmd.name == 'MOMENTUM':
parsedcmd = Momentum(cmd)
elif parsedcmd.name == 'AUX':
parsedcmd = Aux(cmd)
elif parsedcmd.name == 'DITHER':
parsedcmd = Dither(cmd)
statements.append(parsedcmd)
# process groups and sequences
for ii, cmd in enumerate(commands[index:]):
if cmd == '':
continue
parsedcmd = Statement(cmd)
if parsedcmd.name == 'GROUP':
ct_group = int(parsedcmd.args[0].split()[0])
groups['GROUP_{:02d}'.format(ct_group)] = OrderedDict()
continue
elif parsedcmd.name == 'SEQ':
ct_seq = int(parsedcmd.args[0].split()[0])
seq_statements = []
groups['GROUP_{:02d}'.format(ct_group)]['SEQ_{:02d}'.format(ct_seq)] = seq_statements
continue
elif parsedcmd.name == 'SLEW':
parsedcmd = Slew(cmd, group=ct_group, sequence=ct_seq)
elif parsedcmd.name == 'ACT':
if parsedcmd.args[1] == 'FGSMAIN' or parsedcmd.args[1] == 'FGSVERMAIN':
parsedcmd = Guide(cmd, group=ct_group, sequence=ct_seq)
else:
parsedcmd = Activity(cmd, group=ct_group, sequence=ct_seq)
seq_statements.append(parsedcmd)
return Visit(apt_templates, statements, groups)
[docs]def crosscheck_wfsc_visit_file(filename):
"""
TODO
----
update Perrin's function to updated code.
Parameters
----------
filename
Returns
-------
"""
statements, apt_template = parse_visit_file(filename)
print("==WFSC crosscheck for file {}==".format(filename))
print("From APT template: {}".format(apt_template))
# Check slew statements
slews = [s for s in statements if (isinstance(s, Slew) or isinstance(s, Guide))]
# Check activity statements
acts = [s for s in statements if isinstance(s, Activity)]
is_wfsc_visit = any(['WFSC' in a.scriptname for a in acts])
print("\nContains {} slew or guide statement(s):".format(len(slews)))
for s in slews:
print(" " + repr(s))
print("\nContains {} activity statement(s):".format(len(acts)))
act_by_num = dict()
for s in acts:
print(" " + repr(s))
act_by_num[s.gsa] = s
if is_wfsc_visit:
aux = [s for s in statements if s.name == 'AUX']
if len(aux) is 0:
raise RuntimeError("WFSC VISIT BUT NO AUX STATEMENT FOUND!")
# Check for presence of AUX statement