diff --git a/mpikat/pipelines/gs_plotter.py b/mpikat/pipelines/gs_plotter.py new file mode 100644 index 0000000000000000000000000000000000000000..518e19d749a41e0af411aea12126a9e4ebea67a6 --- /dev/null +++ b/mpikat/pipelines/gs_plotter.py @@ -0,0 +1,183 @@ +""" +""" +#Requires python 3.7+ and spead 3+ + +import asyncio +import copy +import json +import multiprocessing + +from mpikat.core.edd_pipeline_aio import EDDPipeline, launchPipelineServer, state_change +from mpikat.core import logger +from mpikat.utils.spead_capture import SpeadCapture, GatedSpectrometerSpeadHandler +from mpikat.pipelines.gated_spectrometer.data_stream import GatedSpectrometerDataStreamFormat +import mpikat.utils.data_streams as ds +import mpikat.utils.numa as numa + +_log = logger.getLogger("mpikat.pipelines.gs_plotter") + +_DEFAULT_CONFIG = { + "id":"gs_plotter", + "type":"gs_plotter", + "input_data_streams": [ + { + "source": "gated_spectrometer_0:polarization_0_0", + "hdf5_group_prefix": "S", + "format": "GatedSpectrometer:1", + "ip": "225.0.1.173", + "port": "7152" + }, + { + "source": "gated_spectrometer_0:polarization_0_1", + "hdf5_group_prefix": "S", + "format": "GatedSpectrometer:1", + "ip": "225.0.1.172", + "port": "7152" + }, + { + "source": "gated_spectrometer_1:polarization_1_0", + "hdf5_group_prefix": "S", + "format": "GatedSpectrometer:1", + "ip": "225.0.1.175", + "port": "7152" + }, + { + "source": "gated_spectrometer_1:polarization_1_1", + "hdf5_group_prefix": "S", + "format": "GatedSpectrometer:1", + "ip": "225.0.1.174", + "port": "7152" + }, + { + "source": "gated_spectrometer_0:polarization_0_0", + "hdf5_group_prefix": "S", + "format": "GatedSpectrometer:1", + "ip": "225.0.1.176", + "port": "7152" + }, + { + "source": "gated_spectrometer_0:polarization_0_1", + "hdf5_group_prefix": "S", + "format": "GatedSpectrometer:1", + "ip": "225.0.1.177", + "port": "7152" + }, + { + "source": "gated_spectrometer_1:polarization_1_0", + "hdf5_group_prefix": "S", + "format": "GatedSpectrometer:1", + "ip": "225.0.1.178", + "port": "7152" + }, + { + "source": "gated_spectrometer_1:polarization_1_1", + "hdf5_group_prefix": "S", + "format": "GatedSpectrometer:1", + "ip": "225.0.1.179", + "port": "7152" + } + ], + "spead_config":copy.deepcopy(SpeadCapture.DEFAULT_CONFIG) +} + + +class GSPlotter(EDDPipeline): + """ + Write EDD Data Streams to HDF5 Files. + """ + def __init__(self, ip, port, loop=None): + """ + Args: + ip: IP accepting katcp control connections from. + port: Port number to serve on + """ + super().__init__(ip, port, default_config=_DEFAULT_CONFIG, loop=loop) + self._capture_thread = None + self._data_plotter = None + + def setup_sensors(self): + """ + Setup monitoring sensors + """ + super().setup_sensors() + + + # --------------------- # + # State change routines # + # --------------------- # + @state_change(target="configured", allowed=["idle"], intermediate="configuring") + async def configure(self): + """Configures the HDF5Writer pipeline + + - Retrieves the address of the network interface card + - Instantiates a DataPlotter object + """ + _log.info("Configuring HDF5 Writer") + nic_name, nic_description = numa.getFastestNic() + _log.info("Capturing on interface %s, ip: %s, speed: %d Mbit/s", + nic_name, nic_description['ip'], nic_description['speed']) + self._config['spead_config']['interface_address'] = nic_description['ip'] + self._config['spead_config']['numa_affinity'] = numa.getInfo()[nic_description['node']]['cores'] + + qmanager = multiprocessing.Manager() + self.plotting_queue = qmanager.Queue() + converter = ds.DataStreamConverter(GatedSpectrometerDataStreamFormat(), + ds.RedisSpectrumStreamFormat(), + {"channels":1024}) + + self.redis_sender = ds.RedisJSONSender(self._config["data_store"]["ip"], + self._config["data_store"]["port"], 2, self._config["id"], + self.plotting_queue, callback=converter.convert) + self.redis_sender.connect() + self.redis_sender.start() + + self._capture_thread = SpeadCapture(self._config["input_data_streams"], + GatedSpectrometerSpeadHandler(), + self._package_writer, + self._config['spead_config']) + + _log.info("Final configuration:\n%s", json.dumps(self._config, indent=4)) + + @state_change(target="streaming", allowed=["configured"], intermediate="capture_starting") + async def capture_start(self): + """Starts the capturing of data + """ + _log.info("Starting capture") + self._capture_thread.start() + + + @state_change(target="idle", allowed=["ready", "streaming", "deconfiguring"], intermediate="capture_stopping") + async def capture_stop(self): + """Stop the data/network capturing + """ + self._stop_capturing() + + + @state_change(target="idle", intermediate="deconfiguring", error='panic') + async def deconfigure(self): + """Deconfigures the pipeline + """ + self._stop_capturing() + + def _stop_capturing(self): + _log.debug("Stopping capturing") + if self._capture_thread: + self._capture_thread.stop() + self._capture_thread.join(10.0) + self._capture_thread = None + _log.debug("Capture thread cleaned") + + + def _package_writer(self, packet): + """ + Writes a received heap to the writer queue and the data plotter + Args: + packet (dict): packet containing a heap + """ + _log.trace("Writing package") + self.plotting_queue.put(packet) + + + +if __name__ == "__main__": + asyncio.run(launchPipelineServer(GSPlotter)) diff --git a/mpikat/utils/data_streams/__init__.py b/mpikat/utils/data_streams/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..a78f45b5cd71660baa94857e58667ae71d9fd473 --- /dev/null +++ b/mpikat/utils/data_streams/__init__.py @@ -0,0 +1,9 @@ +from mpikat.utils.data_streams.redis_sender import RedisJSONSender +from mpikat.utils.data_streams.redis_spectrum import RedisSpectrumStreamFormat +from mpikat.utils.data_streams.converter import DataStreamConverter + +__all__ = [ + "RedisJSONSender", + "DataStreamConverter", + "RedisSpectrumStreamFormat" +] \ No newline at end of file diff --git a/mpikat/utils/data_streams/converter.py b/mpikat/utils/data_streams/converter.py new file mode 100644 index 0000000000000000000000000000000000000000..1f232d6b6c7aa6c68bf95b8b349cd7436261df3b --- /dev/null +++ b/mpikat/utils/data_streams/converter.py @@ -0,0 +1,80 @@ +import numpy as np +import base64 + +from mpikat.utils.spead_capture import SpeadPacket +from mpikat.core.data_stream import DataStream +from mpikat.utils.data_streams import RedisSpectrumStreamFormat +from mpikat.pipelines.gated_spectrometer.data_stream import GatedSpectrometerDataStreamFormat +from mpikat.core.logger import getLogger + +_log = getLogger("mpikat.utils.streams.converter") + + +def deserialize_array(serialized: str, dtype: str="float32", encoding: str="utf-8"): + """Decodes a b64 string into a numpy array + + Args: + serialized (str): the b64 encoded string + dtype (str, optional): The data type of the numpy array. Defaults to "float32". + encoding (str, optional): The encoding type. Defaults to "utf-8". + + Returns: + np.ndarray: The decoded numpy array + """ + return np.frombuffer(base64.b64decode(serialized.encode(encoding)), dtype=dtype) + +def serialize_array(array: np.ndarray, encoding: str="utf-8") -> str: + """Encodes a numpy array into b64 encoded string + + Args: + array (np.ndarray): The numpy array to encode + encoding (str, optional): The encoding type. Defaults to "utf-8". + + Returns: + str: the encoded string + """ + return base64.b64encode(array.tobytes()).decode(encoding) + +class DataStreamConverter: + + def __init__(self, istream: DataStream, ostream: DataStream, conf: dict=None): + self.istream = istream + self.ostream = ostream + self.conf = conf + + def convert(self, data: any) -> any: + """_summary_ + + Args: + data (any): _description_ + """ + if isinstance(self.istream, GatedSpectrometerDataStreamFormat) and isinstance(self.ostream, RedisSpectrumStreamFormat): + return self.from_gated_spectrometer_to_redis_spectrum(data) + else: + _log.error("Conversion from %s to %s not implemented", type(self.istream), type(self.ostream)) + return None + + def from_gated_spectrometer_to_redis_spectrum(self, idata: SpeadPacket) -> dict: + """_summary_ + + Args: + idata (SpeadPacket): _description_ + + Returns: + tuple: _description_ + """ + streamtype_prefixes = {0: 'P', 1: 'S'} + self.ostream.data_items["name"] = f"{streamtype_prefixes[idata.type]}{idata.polarization}_ND{idata.noise_diode_status}" + spectrum = idata.data[1:].reshape( + [self.conf["channels"], (idata.data.size -1) // self.conf["channels"]]).sum(axis=1) + if np.isfinite(spectrum).all(): + self.ostream.data_items["interval"] = idata.number_of_input_samples / idata.sampling_rate + self.ostream.data_items["timestamp"] = idata.reference_time + self.ostream.data_items["data"] = serialize_array(spectrum) + self.ostream.data_items["f_min"] = 0 #int(self.istream.stream_meta["central_freq"]) - int(idata.sampling_rate) / 4 + self.ostream.data_items["f_max"] = 1024 #int(self.istream.stream_meta["central_freq"]) + int(idata.sampling_rate) / 4 + return self.ostream.data_items + return None + + + diff --git a/mpikat/utils/data_streams/redis_sender.py b/mpikat/utils/data_streams/redis_sender.py new file mode 100644 index 0000000000000000000000000000000000000000..ce870ce022116649c558afc263a7ee81feec8a6e --- /dev/null +++ b/mpikat/utils/data_streams/redis_sender.py @@ -0,0 +1,84 @@ + +import multiprocessing as mp +import redis + +from mpikat.core.logger import getLogger + +_LOG = getLogger("mpikat.utils.data_streams.redis_sender") + +class RedisJSONSender(mp.Process): + """ + A multprocessing.Process implementation to calculate spectra and send them to a redis server via pub/sub + """ + + def __init__(self, ip: str, port: int, db: int, product_name: str, + queue:mp.Queue, callback: callable=None): + """ + Constructor + """ + super().__init__() + self.pname = product_name + self.ip = ip + self.port = port + self.db = db + self.streams = set() + self.callback = callback + self._quit = mp.Event() + self._queue = queue + self._connected = False + + def connect(self): + """Connects the stream to the redis database + """ + try: + _LOG.info("Connecting to redis %s:%d/%d", self.ip, self.port, self.db) + self._redis = redis.Redis(self.ip, self.port, db=self.db) + self._redis.ping() + except Exception as e: + raise ConnectionError("Connection to redis %s:%d/%d failed with %s", + self.ip, self.port, self.db, e) + self._client = self._redis.json() + self._connected = True + + def send_json(self, d: dict) -> None: + """Sends a dictonary to the connect redis instance via redisJSON + + Args: + d (dict): The dictionary to send + """ + if self._connected: + _LOG.info("Sending stream %s to DB", d["name"]) + ds_name = f"{self.pname}:{d['name']}_stream" + self._client.set(ds_name, '$', d) + self.streams.add(ds_name) + return + raise ConnectionError("Not connected to redis") + + def stop(self): + """Stops the transmission by setting mp.Event() + """ + self._quit.set() + # Remove entry after stop + for name in self.streams: + self._client.delete(name) + self.streams = set() + + def run(self): + """ + Function to run the processing and transmission of spectra + """ + _LOG.info(f"Starting stream transmission in {mp.current_process().name}") + while not self._quit.is_set(): + + if self._queue.full(): + _LOG.warning("Queue is full, flushing..") + while not self._queue.empty(): + self._queue.get() + + if not self._queue.empty(): + if self.callback is not None: + qitem = self.callback(self._queue.get()) + else: + qitem = self._queue.get() + if isinstance(qitem, dict): + self.send_json(qitem) diff --git a/mpikat/utils/data_streams/redis_spectrum/__init__.py b/mpikat/utils/data_streams/redis_spectrum/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..a44c9036bcae583933e3d7d671d681508d674c53 --- /dev/null +++ b/mpikat/utils/data_streams/redis_spectrum/__init__.py @@ -0,0 +1,5 @@ +from mpikat.utils.data_streams.redis_spectrum.data_stream import RedisSpectrumStreamFormat + +__all__ = [ + "RedisSpectrumStreamFormat" +] \ No newline at end of file diff --git a/mpikat/utils/data_streams/redis_spectrum/data_stream.py b/mpikat/utils/data_streams/redis_spectrum/data_stream.py new file mode 100644 index 0000000000000000000000000000000000000000..4647c292e307df18f07570a2b8b33cac834c9a29 --- /dev/null +++ b/mpikat/utils/data_streams/redis_spectrum/data_stream.py @@ -0,0 +1,26 @@ +from mpikat.core.data_stream import DataStream, DataStreamRegistry + +class RedisSpectrumStreamFormat(DataStream): + """ + Format description of the DigitalDownConverter + """ + + stream_meta = { + "type": "RedisSpectrumStream:1", + "ip": "", + "port": "", + "description": "Redis json stream of integrated spectra.", + "central_freq": "", + "receiver_id": "", + "band_flip": "" + } + + data_items = { + "timestamp":0, + "interval":0, + "f_min":0, + "f_max":0, + "data":"", + } + +DataStreamRegistry.register(RedisSpectrumStreamFormat) \ No newline at end of file diff --git a/mpikat/utils/testing/mock_redis_server.py b/mpikat/utils/testing/mock_redis_server.py index 19eae76180ff87621c71947e12c4fa4fa2a780ee..f05ce0eb61601245e09c87a8cce5592813ec57db 100644 --- a/mpikat/utils/testing/mock_redis_server.py +++ b/mpikat/utils/testing/mock_redis_server.py @@ -4,7 +4,7 @@ from mpikat.core import logger _log = logger.getLogger("mpikat.testEddPipeline") -def setup_redis(host=""): +def setup_redis(host: str="") -> redislite.Redis: """ Set up a local Redis mock-server """ diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/tests/test_data_stream_converter.py b/tests/test_data_stream_converter.py new file mode 100644 index 0000000000000000000000000000000000000000..ad7b6c50c70e792d6fe4f9f574a0ce85c92d6b79 --- /dev/null +++ b/tests/test_data_stream_converter.py @@ -0,0 +1,27 @@ +import unittest +import numpy as np +import mpikat.utils.data_streams.converter as cv +from mpikat.core.logger import getLogger + + +LOG = getLogger("mpikat.dbbc.utils.spectrum_stream") + +class Test_HelperFunctions(unittest.TestCase): + """ + Test all helper functions implemented in dbbc.utils.redis_sender.py + """ + + def test_serialize_deserialze(self): + """ + Should test if the serialization and deserialization + of a np.ndarray to base64 works. + """ + arr = np.random.rand(2048) + enc = cv.serialize_array(arr) + res = cv.deserialize_array(enc, dtype=arr.dtype) + self.assertTrue(isinstance(enc, str)) + self.assertTrue(np.all(arr == res)) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_pipeline_states.py b/tests/test_pipeline_states.py index f8c5a577f2ffbab68fa01234057f4d1011bc8be2..16306599e673b356a67acea9b07d98e92af7de71 100644 --- a/tests/test_pipeline_states.py +++ b/tests/test_pipeline_states.py @@ -12,6 +12,7 @@ from mpikat.pipelines.master_controller import EddMasterController from mpikat.pipelines.fits_interface import FitsInterfaceServer from mpikat.pipelines.hdf5_writer import HDF5Writer from mpikat.pipelines.mock_fits_writer import MockFitsWriter +from mpikat.pipelines.gs_plotter import GSPlotter @@ -25,6 +26,9 @@ class TEST_GatedDualPolSpectrometer_StateModel(Test_StateModel): class TEST_GatedFullStokesSpectrometer_StateModel(Test_StateModel): def setUp(self): super().setUpBase(GatedFullStokesSpectrometerPipeline("localhost", 1234), True) +class TEST_GSPlotter_StateModel(Test_StateModel): + def setUp(self): + super().setUpBase(GSPlotter("localhost", 1234), True) class TEST_FitsInterfaceServer_StateModel(Test_StateModel): def setUp(self): @@ -71,6 +75,13 @@ class TEST_GatedFullStokesSpectrometer_StateModelOnRequests(Test_StateModelOnReq async def asyncSetUp(self): await super().setUpBase(GatedFullStokesSpectrometerPipeline, True) +class TEST_GSPlotter_StateModelOnRequests(Test_StateModelOnRequests): + def setUp(self) -> None: + return super().setUp() + + async def asyncSetUp(self): + await super().setUpBase(GSPlotter, True) + class TEST_HDF5Writer_StateModelOnRequests(Test_StateModelOnRequests): def setUp(self) -> None: return super().setUp() diff --git a/tests/test_redis_sender.py b/tests/test_redis_sender.py new file mode 100644 index 0000000000000000000000000000000000000000..0904014484f756947f44ad972278e9b5936557a2 --- /dev/null +++ b/tests/test_redis_sender.py @@ -0,0 +1,75 @@ +import unittest +import unittest.mock as mock +import multiprocessing as mp +import time +from mpikat.utils.testing import setup_redis +from mpikat.utils.data_streams import RedisJSONSender + +IP = "127.0.0.1" +DB = 0 +NAME = "TESTOBJ" + +class Test_RedisJSONStream(unittest.TestCase): + + def setUp(self) -> None: + self.queue = mp.Queue() + self.redis_server = setup_redis(IP) + self.port = self.redis_server.server_config["port"] + self.tobj = RedisJSONSender(IP, self.port, DB, NAME, self.queue) + + def tearDown(self) -> None: + if self.redis_server.running: + self.redis_server.shutdown() + + def test_connect_server_reachable(self): + self.tobj.connect() + + def test_connect_server_unreachable(self): + self.redis_server.shutdown() + with self.assertRaises(ConnectionError): + self.tobj.connect() + + def test_send_not_connected(self): + test_dict = {"cheese":"yammi"} + with self.assertRaises(ConnectionError): + self.tobj.send_json(test_dict) + + def test_send_bad_dictonary(self): + test_dict = {"cheese":"yammi"} + self.tobj.connect() + with self.assertRaises(KeyError): + self.tobj.send_json(test_dict) + + # Test below not supported by redislite instance because JSON.SET is not available + # def test_send_good_dictonary(self): + # test_dict = {"name":"test","cheese":"yammi"} + # self.tobj.connect() + # self.tobj.send_json(test_dict) + # client = redis.Redis(IP, self.port, DB) + # self.assertEqual(test_dict, client.get(self.tobj.streams[-1])) + + def test_start_and_stop(self): + self.tobj.start() + self.assertEqual(self.tobj.is_alive(), True) + self.tobj.stop() + time.sleep(1) + self.assertEqual(self.tobj.is_alive(), False) + self.assertEqual(self.tobj.streams, set()) + + def test_queue_fill(self): + self.tobj.send_json = mock.MagicMock(name="send_json") + self.tobj.start() + self.assertEqual(self.tobj.is_alive(), True) + for __ in range(10): + self.queue.put({"name":"test","cheese":"yammi"}) + # self.assertEqual(self.tobj.send_json., 10) + self.tobj.stop() + time.sleep(1) + self.assertEqual(self.tobj.is_alive(), False) + + + # def test_send(self): + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_spead_capture.py b/tests/test_spead_capture.py index 260267b38ec410aa6dfd2ed31e67ca0a77a2c317..740fe24e40e47d634909d0023b94fc42b4ac8bd4 100644 --- a/tests/test_spead_capture.py +++ b/tests/test_spead_capture.py @@ -1,60 +1,194 @@ -# pylint: disable=protected-access,missing-function-docstring,too-few-public-methods,missing-class-docstring - -import unittest +from threading import Thread, Event import time -import numpy as np - -from mpikat.utils.spead_capture import GatedSpectrometerSpeadHandler -from mpikat.core.data_stream import convert48_64, convert64_48 -from mpikat.pipelines.gated_spectrometer.data_stream import GatedSpectrometerDataStreamFormat - -class TestGatedSpectrometerSpeadhandler(unittest.TestCase): - def setUp(self): - self.handler = GatedSpectrometerSpeadHandler() - - def test_initialize(self): - items = {d.name: d for d in GatedSpectrometerDataStreamFormat.data_items} - - D = items.pop('data') - - # initialize installes a timestamp handler based on given sync time and - # sampling rate if data is not in items (i.e. on first valid item) - items['sync_time'].value = convert64_48(123) - items['sampling_rate'].value = convert64_48(4_000_000_000) - items['fft_length'].value = convert64_48(4_096) +import json +import spead2 +import spead2.recv +import packaging.version - self.assertIsNone(self.handler.timestamp_handler) - self.assertTrue(self.handler.initialize(items)) - self.assertIsNotNone(self.handler.timestamp_handler) - - items['data'] = D - self.assertFalse(self.handler.initialize(items)) - - def test_build_packet(self): - items = {d.name: d for d in GatedSpectrometerDataStreamFormat.data_items} - D = items.pop('data') - - items['sync_time'].value = convert64_48(int(time.time())) - items['sampling_rate'].value = convert64_48(4_000_000_000) - items['fft_length'].value = convert64_48(4_096) - items['naccumulate'].value = convert64_48(2_048) - - items["number_of_input_samples"].value = convert64_48(45) - items["timestamp_count"].value = convert64_48(123) - items["polarization"].value = convert64_48(1) - items["noise_diode_status"].value = convert64_48(1) - items["number_of_input_samples"].value = convert64_48(42) - items["number_of_saturated_samples"].value = convert64_48(23) - items["type"] .value= convert64_48(0) - - D.value = np.zeros(4_096) - self.handler.initialize(items) - items['data'] = D - - packet = self.handler.build_packet(items) +import mpikat.core.logger +from mpikat.core.data_stream import convert48_64 +from mpikat.utils.timestamp_handler import TimestampHandler +from mpikat.pipelines.gated_spectrometer.data_stream import GatedSpectrometerDataStreamFormat -if __name__ == '__main__': - unittest.main() +_log = mpikat.core.logger.getLogger("mpikat.utils.spead_capture") + + +class SpeadCapture(Thread): + """ + Captures heaps from one or more streams that are transmitted in SPEAD + format and passes them from spead_handler to the package_handler. + """ + DEFAULT_CONFIG = { + "numa_affinity": [], + "interface_address": "", + "memory_pool": { + "lower": 16384, + "upper": (2*4*16*1024**2)+1024, + "max_free": 128, + "initial": 128}, + "threadpoolsize": 1, # yes, as we only have one stream. Should use multiple streams otherwise + "stream_config_max_heaps": 64, + "ringstream_size": 128, + } + + def __init__(self, input_streams, speadhandler, package_handler, spead_config=None): + """ + Args: + mc_ip: Array of multicast group IPs + mc_port: Port number of multicast streams + speadhandler: Object that handles data in the stream + numa_affinity: List of NUMA cores used for affinity of spead2 capture threads + """ + Thread.__init__(self, name=self.__class__.__name__) + if spead_config is None: + spead_config = self.DEFAULT_CONFIG + + self._stop_event = Event() + self._spead_handler = speadhandler + self._package_handler = package_handler + + _log.debug("Initializing SpeadCapture with config:\n%s", json.dumps(spead_config, indent=4)) + + if packaging.version.parse(spead2.__version__) >= packaging.version.parse("3.2.0"): + thread_pool = spead2.ThreadPool(threads=spead_config['threadpoolsize'], affinity=[int(k) for k in spead_config["numa_affinity"]]) + pool = spead2.MemoryPool(lower=spead_config['memory_pool']['lower'], upper=spead_config['memory_pool']['upper'], max_free=spead_config['memory_pool']['max_free'], initial=spead_config['memory_pool']['initial']) + stream_config = spead2.recv.StreamConfig(bug_compat=spead2.BUG_COMPAT_PYSPEAD_0_5_2, max_heaps=spead_config["stream_config_max_heaps"], memory_allocator=pool, allow_out_of_order=True) # pylint: disable=no-member + ring_stream_config = spead2.recv.RingStreamConfig(heaps=spead_config["ringstream_size"], contiguous_only=False) # pylint: disable=no-member + self.stream = spead2.recv.Stream(thread_pool, stream_config, ring_stream_config) + else: + _log.warning("Using old spead2 version, %s", spead2.__version__) + thread_pool = spead2.ThreadPool(threads=spead_config['threadpoolsize'], affinity=[int(k) for k in spead_config["numa_affinity"]]) + self.stream = spead2.recv.Stream(thread_pool, spead2.BUG_COMPAT_PYSPEAD_0_5_2, max_heaps=spead_config["stream_config_max_heaps"], ring_heaps=spead_config["ringstream_size"], contiguous_only=False) + pool = spead2.MemoryPool(lower=spead_config['memory_pool']['lower'], upper=spead_config['memory_pool']['upper'], max_free=spead_config['memory_pool']['max_free'], initial=spead_config['memory_pool']['initial']) + self.stream.set_memory_allocator(pool) + + _log.debug("Subscribe to multicast groups:") + for i, ips in enumerate(input_streams): + _log.debug(" - Subs {}: ip: {}, port: {}".format(i, ips["ip"], ips["port"])) + self.stream.add_udp_reader(ips["ip"], int(ips["port"]), max_size=9200, + buffer_size=1073741820, interface_address=spead_config["interface_address"]) + + self.nheaps = 0 + self.incomplete_heaps = 0 + self.complete_heaps = 0 + + def stop(self): + """ + Stop the capture thread + """ + self.stream.stop() + self._stop_event.set() + + def run(self): + """ + Subscribe to MC groups and start processing. + """ + _log.debug("Start processing heaps:") + self.nheaps = 0 + self.incomplete_heaps = 0 + self.complete_heaps = 0 + + while not self._stop_event.is_set(): + try: + heap = self.stream.get_nowait() + except spead2.Empty: + time.sleep(0.5) + continue + + self.nheaps += 1 + _log.trace('Received heap %i - previously %i completed, %i incompleted', self.nheaps, self.complete_heaps, self.incomplete_heaps) + if isinstance(heap, spead2.recv.IncompleteHeap): + _log.warning('Received incomplete heap - received only %i / %i bytes.', heap.received_length, heap.heap_length) + self.incomplete_heaps += 1 + continue + self.complete_heaps += 1 + try: + package = self._spead_handler(heap) + except Exception as E: + _log.error("Error handling heap. Cought exception:\n{}".format(E)) + + try: + self._package_handler(package) + except Exception as E: + _log.error("Error handling decoded data package. Cought exception:\n{}".format(E)) + + _log.debug('Stream done.') + + + +class SpeadPacket: # pylint: disable=too-few-public-methods + """ + Contains the SPEAD items as members with conversion. + """ + def __init__(self, items): + self.integration_period = None + self.reference_time = None + + for item in items.values(): + if item.name != 'data': + setattr(self, item.name, convert48_64(item.value)) + else: + setattr(self, item.name, item.value) + + + + +class GatedSpectrometerSpeadHandler: + """ + Parse heaps of gated spectrometer output from spead2 stream and returns a data object. + """ + def __init__(self): + self.timestamp_handler = None + self.ig = GatedSpectrometerDataStreamFormat.create_item_group() + + def __call__(self, heap): + """ + Crate an SpeadPacket object containing interpreted output data + """ + _log.trace("Unpacking heap") + items = self.ig.update(heap) + + if self.initialize(items): + # Reprocess heap to get data + items = self.ig.update(heap) + + return self.build_packet(items) + + def initialize(self, items): + """ + Initialize handler on format update + """ + if not GatedSpectrometerDataStreamFormat.ig_update(items, self.ig): + return False + _log.debug('Initializing handler') + sync_time = convert48_64(items["sync_time"].value) + sampling_rate = float(convert48_64(items["sampling_rate"].value)) + self.timestamp_handler = TimestampHandler(sync_time, sampling_rate) + return True + + + def build_packet(self, items): + """ + Build a packet from the items + """ + _log.trace('Building package') + + if not GatedSpectrometerDataStreamFormat.validate_items(items): + return None + + packet = SpeadPacket(items) + + # Update timestamp count depending on epoch + packet.timestamp_count = self.timestamp_handler.updateTimestamp(packet.timestamp_count) # pylint: disable=no-member,attribute-defined-outside-init + + # Integration period does not contain efficiency of sampling as heaps may + # be lost respectively not in this gate + packet.integration_period = (packet.naccumulate * packet.fft_length) / float(packet.sampling_rate) # pylint: disable=no-member + + # The reference time is in the center of the integration period + packet.reference_time = float(packet.sync_time) + float(packet.timestamp_count) / float(packet.sampling_rate) + float(packet.integration_period/ 2.) # pylint: disable=no-member + + return packet