Commit c0dcada7 authored by Tobias Winchen's avatar Tobias Winchen

Add central_freq and band flip as hdf dataset attributes

parent 26f9fa21
......@@ -31,7 +31,10 @@ data_formats = {
"GatedSpectrometer:1": {
"ip": "",
"port": "",
"description": "Spead stream of integrated spectra."
"description": "Spead stream of integrated spectra.",
"central_freq": "",
"band_flip": ""
},
"MPIFR_EDD_Packetizer:1": {
"ip": "",
......
......@@ -139,7 +139,7 @@ class EDDHDFFileWriter(object):
self.__subscan = self._file.create_group(scanid)
def addData(self, section, data):
def addData(self, section, data, attributes = {}):
"""
Add data block to a section of the current subscan.
......@@ -147,6 +147,8 @@ class EDDHDFFileWriter(object):
section (str): Name of the section
data (dict):
data[did] needs to return the data for did in the selected format.
attributes (dict):
First occurence of any key will be added as attribute to the dataset.
It is assumed that the first data set is complete. Subsequent datasets with missing items are ignored.
......@@ -177,6 +179,11 @@ class EDDHDFFileWriter(object):
dataset.resize(tuple(shape))
dataset[-1] = data[did]
for key, value in attributes.items():
if key not in self.__subscan[section].attrs.keys():
_log.debug("Adding attribute: {} = {} to section {}".format(key, value, section))
self.__subscan[section].attrs[key] = value
def close(self):
"""
......
......@@ -433,6 +433,13 @@ class GatedSpectrometerPipeline(EDDPipeline):
cfs = json.dumps(self._config, indent=4)
log.info("Final configuration:\n" + cfs)
for l in self._config["output_data_streams"].values():
l["central_freq"] = self._config["input_data_streams"][0]["central_freq"]
l["band_flip"] = self._config["input_data_streams"][0]["band_flip"]
self._configUpdated()
self.__numa_node_pool = []
# remove numa nodes with missing capabilities
for node in numa.getInfo():
......
......@@ -222,17 +222,23 @@ class EDDHDF5WriterPipeline(EDDPipeline):
if "hdf5_group_prefix" in stream_description:
hdf5_group = stream_description["hdf5_group_prefix"]
if hdf5_group not in self.mc_subscriptions:
self.mc_subscriptions[hdf5_group] = dict(groups=[], port=stream_description['port'])
self.mc_subscriptions[hdf5_group] = dict(groups=[], port=stream_description['port'], attributes={})
self.mc_subscriptions[hdf5_group]['groups'].append(stream_description['ip'])
if self.mc_subscriptions[hdf5_group]['port'] != stream_description['port']:
raise RuntimeError("All input streams of one group have to use the same port!!!")
for key in stream_description:
if key in ["ip", "port"]:
continue
self.mc_subscriptions[hdf5_group]['attributes'][key] = stream_description[key]
def _package_writer(self, data):
if self._state == "measuring":
_log.info('Writing data to section: {}'.format(data[0]))
self._output_file.addData(data[0], data[1])
self._output_file.addData(data[0], data[1], )
else:
_log.debug("Not measuring, Dropping package")
......@@ -253,7 +259,7 @@ class EDDHDF5WriterPipeline(EDDPipeline):
self._capture_threads = []
for hdf5_group_prefix, mcg in self.mc_subscriptions.items():
spead_handler = GatedSpectrometerSpeadHandler(hdf5_group_prefix)
spead_handler = GatedSpectrometerSpeadHandler(hdf5_group_prefix, mcg['attributes'])
ct = SpeadCapture(mcg["groups"], mcg["port"],
self._capture_interface,
spead_handler, self._package_writer, affinity)
......@@ -323,6 +329,8 @@ class EDDHDF5WriterPipeline(EDDPipeline):
@coroutine
def stop(self):
"""
......@@ -446,11 +454,12 @@ class GatedSpectrometerSpeadHandler(object):
Parse heaps of gated spectrometer output from spead stream and create data dict.
"""
def __init__(self, group_prefix=""):
def __init__(self, group_prefix="", attributes={}):
# self.plottingQueue = queue.PriorityQueue()
#self.__delay = delay
self.__group_prefix = group_prefix
self.__attributes = attributes
#Description of heap items
# ToDo: move to gated spectrometer or whereever the stream format is
......@@ -465,6 +474,7 @@ class GatedSpectrometerSpeadHandler(object):
self.ig.add_item(5638, "sampling_rate", "", (6,), dtype=">u1")
self.ig.add_item(5639, "naccumulate", "", (6,), dtype=">u1")
def __call__(self, heap):
"""
handle heaps. Merge polarization heaps with matching timestamps and pass to output queue.
......@@ -518,7 +528,7 @@ class GatedSpectrometerSpeadHandler(object):
data['integration_time'] = np.array([number_of_input_samples / sampling_rate])
data['saturated_samples'] = np.array([-42])
return section_id, data
return section_id, data, self.attributes
if __name__ == "__main__":
......
......@@ -35,7 +35,6 @@ class TestEDDHdfFileWriter(unittest.TestCase):
self.assertEqual(len(infile['scan'].keys()), 2)
def test_gated_spectrometer_data_insert(self):
f = EDDHDFFileWriter()
self.addCleanup(os.remove, f.filename)
......@@ -46,8 +45,10 @@ class TestEDDHdfFileWriter(unittest.TestCase):
for n,d in gated_spectrometer_format(nchannels).items():
data[n] = np.empty(**d)
f.newSubscan()
f.addData('mysection', data)
attr = {'foo':'bar', 'nu': 3}
f.addData('mysection', data, attr)
f.close()
infile = h5py.File(f.filename, "r")
......@@ -61,6 +62,8 @@ class TestEDDHdfFileWriter(unittest.TestCase):
idx = data[k] == data[k]
self.assertTrue((data[k] == dataset[k][0])[idx].all())
self.assertEqual(dataset.attrs['foo'], 'bar')
self.assertEqual(dataset.attrs['nu'], 3)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment