Commit 85c924bb authored by Ievgen Vovk's avatar Ievgen Vovk
Browse files

[gti_generator]: code refactoring (introduced GTIGenerator class).

parent 7686b08f
......@@ -153,129 +153,286 @@ def intersect_time_intervals(intervals1, intervals2):
return joined_intervals
class GTIGenerator:
def __init__(self, config=None, verbose=False):
self._config = config
self.verbose = verbose
@property
def config(self):
return self._config.copy()
@config.setter
def config(self, new_config):
if 'event_list' not in new_config:
raise ValueError('GTI generator error: the configuration dict is missing the "event_list" section.')
self._config = new_config.copy()
def gti_generator(config):
"""
GTI list generator.
def _itentify_data_taking_time_edges(self, file_mask, max_tdiff=1):
file_list = glob.glob(file_mask)
Parameters
----------
config: dict
Configuration dictionary (e.g. read from a YAML file).
if not file_list:
raise ValueError("GTI generator: no files to process")
Returns
-------
joint_intervals: list
A list of (TStart, TStop) lists, representing the identified GTIs.
mjd = scipy.zeros(0)
tdiff = scipy.zeros(0)
"""
if self.verbose:
info_message(f"identifying data taking time edges", "GTI generator")
if 'event_list' not in config:
raise ValueError('GTI generator error: the configuration file is missing the "event_list" section. Exiting.')
for fnum, fname in enumerate(file_list):
with uproot.open(fname) as input_stream:
if self.verbose:
info_message(f"processing file {fnum+1} / {len(file_list)}", "GTI generator")
# Identifying the files to read
info_message("looking for the files", "GTI generator")
file_mask = config["data_files"]["data"]["test_sample"]["magic2"]["input_mask"]
_tdiff = input_stream['Events']['MRawEvtHeader.fTimeDiff'].array()
_mjd = input_stream['Events']['MTime.fMjd'].array()
_millisec = input_stream['Events']['MTime.fTime.fMilliSec'].array()
_nanosec = input_stream['Events']['MTime.fNanoSec'].array()
file_list = glob.glob(file_mask)
_mjd = _mjd + (_millisec / 1e3 + _nanosec / 1e9) / 86400.0
mjd = scipy.concatenate((mjd, _mjd))
tdiff = scipy.concatenate((tdiff, _tdiff))
if not file_list:
raise ValueError("No files to process")
sort_args = mjd.argsort()
# Containers for the data points
dfs = {
'dc': pandas.DataFrame(),
'l3rate': pandas.DataFrame()
}
not_edge = tdiff < max_tdiff
# Removing the containers, not specified in the configuration card
if "l3rate" not in config['event_list']['cuts']:
del dfs['l3rate']
time_intervals = identify_time_edges(
mjd[sort_args],
not_edge[sort_args],
max_time_diff=max_tdiff / 86400.0
)
if "dc" not in config['event_list']['cuts']:
del dfs['dc']
return time_intervals
# Looping over the data files
for fnum, file_name in enumerate(file_list):
info_message(f"processing file {fnum+1} / {len(file_list)}", "GTI generator")
def _itentify_dc_time_edges(self, file_mask):
file_list = glob.glob(file_mask)
with uproot.open(file_name) as input_stream:
# --- DC ---
if "dc" in dfs:
if not file_list:
raise ValueError("GTI generator: no files to process")
if "dc" not in self.config['event_list']['cuts']:
raise ValueError("GTI generator: no DC cuts given")
if self.verbose:
info_message(f"identifying DC time edges", "GTI generator")
df = pandas.DataFrame()
# Looping over the data files
for fnum, file_name in enumerate(file_list):
if self.verbose:
info_message(f"processing file {fnum+1} / {len(file_list)}", "GTI generator")
with uproot.open(file_name) as input_stream:
mjd = input_stream["Camera"]["MTimeCamera.fMjd"].array()
millisec = input_stream["Camera"]["MTimeCamera.fTime.fMilliSec"].array()
nanosec = input_stream["Camera"]["MTimeCamera.fNanoSec"].array()
df_ = pandas.DataFrame()
df_['mjd'] = mjd + (millisec / 1e3 + nanosec / 1e9) / 86400
df_['value'] = input_stream["Camera"]["MReportCamera.fMedianDC"].array()
dfs['dc'] = dfs['dc'].append(df_)
# --- L3 rate ---
if "l3rate" in dfs:
df = df.append(df_)
df = df.sort_values(by=['mjd'])
cut = self.config['event_list']['cuts']['dc']
selection = df.query(cut)
_, idx, _ = scipy.intersect1d(df["mjd"], selection["mjd"], return_indices=True)
criterion = scipy.repeat(False, len(df["mjd"]))
criterion[idx] = True
time_intervals = identify_time_edges(
df["mjd"],
criterion,
max_time_diff=self.config['event_list']['max_time_diff']
)
return time_intervals
def _itentify_l3rate_time_edges(self, file_mask):
file_list = glob.glob(file_mask)
if not file_list:
raise ValueError("GTI generator: no files to process")
if "l3rate" not in self.config['event_list']['cuts']:
raise ValueError("GTI generator: no L3 rate cuts given")
if self.verbose:
info_message(f"identifying L3 rate time edges", "GTI generator")
df = pandas.DataFrame()
# Looping over the data files
for fnum, file_name in enumerate(file_list):
info_message(f"processing file {fnum+1} / {len(file_list)}", "GTI generator")
with uproot.open(file_name) as input_stream:
mjd = input_stream["Trigger"]["MTimeTrigger.fMjd"].array()
millisec = input_stream["Trigger"]["MTimeTrigger.fTime.fMilliSec"].array()
nanosec = input_stream["Trigger"]["MTimeTrigger.fNanoSec"].array()
df_ = pandas.DataFrame()
df_['mjd'] = mjd + (millisec / 1e3 + nanosec / 1e9) / 86400
df_['value'] = input_stream["Trigger"]["MReportTrigger.fL3Rate"].array()
dfs['l3rate'] = dfs['l3rate'].append(df_)
df = df.append(df_)
# Sorting data points by date is needed for the time intervals identification
for key in dfs:
dfs[key] = dfs[key].sort_values(by=['mjd'])
df = df.sort_values(by=['mjd'])
info_message("identifying GTIs", "GTI generator")
cut = self.config['event_list']['cuts']['l3rate']
selection = df.query(cut)
time_intervals_list = []
_, idx, _ = scipy.intersect1d(df["mjd"], selection["mjd"], return_indices=True)
# Identifying DC-related GTIs
if "dc" in dfs:
cut = config['event_list']['cuts']['dc']
criterion = scipy.repeat(False, len(df["mjd"]))
criterion[idx] = True
time_intervals = identify_time_edges(
df["mjd"],
criterion,
max_time_diff=self.config['event_list']['max_time_diff']
)
return time_intervals
selection = dfs['dc'].query(cut)
_, idx, _ = scipy.intersect1d(dfs['dc']["mjd"], selection["mjd"], return_indices=True)
def process_files(self, file_mask):
"""
GTI list generator.
criterion = scipy.repeat(False, len(dfs['dc']["mjd"]))
criterion[idx] = True
Parameters
----------
config: dict
Configuration dictionary (e.g. read from a YAML file).
time_intervals = identify_time_edges(dfs['dc']["mjd"], criterion, max_time_diff=config['event_list']['max_time_diff'])
Returns
-------
joint_intervals: list
A list of (TStart, TStop) lists, representing the identified GTIs.
time_intervals_list.append(time_intervals)
"""
# Identifying L3-related GTIs
if "l3rate" in dfs:
cut = config['event_list']['cuts']['l3rate']
selection = dfs['l3rate'].query(cut)
if not self.config:
raise ValueError("GTIGenerator: configuration is not set")
_, idx, _ = scipy.intersect1d(dfs['l3rate']["mjd"], selection["mjd"], return_indices=True)
# # Identifying the files to read
# info_message("looking for the files", "GTI generator")
# file_list = glob.glob(file_mask)
criterion = scipy.repeat(False, len(dfs['l3rate']["mjd"]))
criterion[idx] = True
# if not file_list:
# raise ValueError("No files to process")
# # Containers for the data points
# dfs = {
# 'dc': pandas.DataFrame(),
# 'l3rate': pandas.DataFrame()
# }
# # Removing the containers, not specified in the configuration card
# if "l3rate" not in config['event_list']['cuts']:
# del dfs['l3rate']
# if "dc" not in config['event_list']['cuts']:
# del dfs['dc']
# # Looping over the data files
# for fnum, file_name in enumerate(file_list):
# info_message(f"processing file {fnum+1} / {len(file_list)}", "GTI generator")
# with uproot.open(file_name) as input_stream:
# # --- DC ---
# if "dc" in dfs:
# mjd = input_stream["Camera"]["MTimeCamera.fMjd"].array()
# millisec = input_stream["Camera"]["MTimeCamera.fTime.fMilliSec"].array()
# nanosec = input_stream["Camera"]["MTimeCamera.fNanoSec"].array()
# df_ = pandas.DataFrame()
# df_['mjd'] = mjd + (millisec / 1e3 + nanosec / 1e9) / 86400
# df_['value'] = input_stream["Camera"]["MReportCamera.fMedianDC"].array()
# dfs['dc'] = dfs['dc'].append(df_)
# # --- L3 rate ---
# if "l3rate" in dfs:
# mjd = input_stream["Trigger"]["MTimeTrigger.fMjd"].array()
# millisec = input_stream["Trigger"]["MTimeTrigger.fTime.fMilliSec"].array()
# nanosec = input_stream["Trigger"]["MTimeTrigger.fNanoSec"].array()
# df_ = pandas.DataFrame()
# df_['mjd'] = mjd + (millisec / 1e3 + nanosec / 1e9) / 86400
# df_['value'] = input_stream["Trigger"]["MReportTrigger.fL3Rate"].array()
# dfs['l3rate'] = dfs['l3rate'].append(df_)
# # Sorting data points by date is needed for the time intervals identification
# for key in dfs:
# dfs[key] = dfs[key].sort_values(by=['mjd'])
# info_message("identifying GTIs", "GTI generator")
# time_intervals_list = []
# # Identifying DC-related GTIs
# if "dc" in dfs:
# cut = config['event_list']['cuts']['dc']
# selection = dfs['dc'].query(cut)
# _, idx, _ = scipy.intersect1d(dfs['dc']["mjd"], selection["mjd"], return_indices=True)
# criterion = scipy.repeat(False, len(dfs['dc']["mjd"]))
# criterion[idx] = True
# time_intervals = identify_time_edges(dfs['dc']["mjd"], criterion, max_time_diff=config['event_list']['max_time_diff'])
# time_intervals_list.append(time_intervals)
# # Identifying L3-related GTIs
# if "l3rate" in dfs:
# cut = config['event_list']['cuts']['l3rate']
# selection = dfs['l3rate'].query(cut)
# _, idx, _ = scipy.intersect1d(dfs['l3rate']["mjd"], selection["mjd"], return_indices=True)
# criterion = scipy.repeat(False, len(dfs['l3rate']["mjd"]))
# criterion[idx] = True
time_intervals = identify_time_edges(dfs['l3rate']["mjd"], criterion, max_time_diff=config['event_list']['max_time_diff'])
# time_intervals = identify_time_edges(dfs['l3rate']["mjd"], criterion, max_time_diff=config['event_list']['max_time_diff'])
time_intervals_list.append(time_intervals)
# time_intervals_list.append(time_intervals)
if not len(time_intervals_list):
raise ValueError("No valid selection cuts provided in the configuration file.")
time_intervals_list = [
self._itentify_data_taking_time_edges(file_mask),
self._itentify_dc_time_edges(file_mask),
self._itentify_l3rate_time_edges(file_mask)
]
joint_intervals = time_intervals_list[0]
joint_intervals = time_intervals_list[0]
# Joining all found GTIs
for i in range(1, len(time_intervals_list)):
joint_intervals = intersect_time_intervals(joint_intervals, time_intervals_list[i])
# Joining all found GTIs
for i in range(1, len(time_intervals_list)):
joint_intervals = intersect_time_intervals(joint_intervals, time_intervals_list[i])
return joint_intervals
return joint_intervals
# =================
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment