Commit b3079d13 authored by Niclas Esser's avatar Niclas Esser
Browse files

Simple test pipelie

parent d7821d68
Pipeline #132540 failed with stages
in 0 seconds
from __future__ import print_function, division, unicode_literals
from mpikat.utils.process_tools import ManagedProcess, command_watcher
from mpikat.utils.process_monitor import SubprocessMonitor
from mpikat.utils.db_monitor import DbMonitor
from mpikat.effelsberg.edd.pipeline.EDDPipeline import EDDPipeline, launchPipelineServer, state_change
from mpikat.effelsberg.edd.EDDDataStore import EDDDataStore
import mpikat.utils.numa as numa
from mpikat.utils.core_manager import CoreManager
from astropy.time import Time as ATime
from tornado.gen import coroutine
from katcp import Sensor, FailReply
import logging
import coloredlogs
import json
import tempfile
log = logging.getLogger("mpikat.effelsberg.edd.pipeline.SimpleSpeadTestPipeline")
_DEFAULT_CONFIG = {
"id": "SimpleSpeadTest", # default cfgs for master controller. Needs to get a unique ID -- TODO, from ansible
"type": "SimpleSpeadTest",
"samples_per_block": 8388608,
"n_blocks":64,
"host" : "pacifix3",
"port" : 7148,
"utc_start" : "",
"input_data_streams":
[
{
"format": "CRYO_PAF_PACKETIZER:1", # Format has version separated via colon
"ip": "225.0.2.156",
"port": 17100,
"bit_depth" : 8,
"samples_per_heap": 16384,
"sample_rate" : 1e8,
}
],
"output_data_streams":
{
"test_stream_out_0" :
{
"format": "CRYO_PAF_PACKETIZER:1", # Format has version separated via colon
"ip": "225.0.2.157",
"port": 17101
}
},
"data_store" :
{
"host" : "pacifix6",
"port" : 6379
}
}
mkrecv_header = """
## Dada header configuration
HEADER DADA
HDR_VERSION 1.0
HDR_SIZE 4096
DADA_VERSION 1.0
## MKRECV configuration
PACKET_SIZE 4096
BUFFER_SIZE 212992
SAMPLE_CLOCK_START 0 # This is updated with the sync-time of the packetiser to allow for UTC conversion from the sample clock
DADA_NSLOTS 64
SLOTS_SKIP 0
NTHREADS 4
#SPEAD specifcation
NINDICES 1
# SPEAD2 Items
IDX1_ITEM 0 # First item of a SPEAD heap (timestamp)
"""
mksend_header = """
HEADER DADA
HDR_VERSION 1.0
HDR_SIZE 4096
DADA_VERSION 1.0
# MKSEND CONFIG
IBV_VECTOR 0 # IBV forced into polling mode
NETWORK_MODE 1
PACKET_SIZE 4096
BUFFER_SIZE 212992
SYNC_TIME unset # Default value from mksend manual
SAMPLE_CLOCK unset # Default value from mksend manual
SAMPLE_CLOCK_START 0
# SPEAD specifcation
NITEMS 2
HEAP_SIZE 16384
HEAP_COUNT 1
HEAP_ID_OFFSET 0
HEAP_ID_STEP 1
# SPEAD2 Items
ITEM1_ID 5632 # First item of a SPEAD heap (timestamp)
ITEM1_SERIAL
ITEM2_ID 5633 #
"""
class SimpleSpeadTestPipeline(EDDPipeline):
def __init__(self, ip, port):
"""initialize the pipeline."""
EDDPipeline.__init__(self, ip, port, _DEFAULT_CONFIG)
self.mkrecv_proc = []
self.mksend_proc = []
self.dada_buffers = []
@coroutine
def setup_sensors(self):
"""
@brief Setup monitoring sensors
"""
EDDPipeline.setup_sensors(self)
self._input_fill_level_sensor = Sensor.float(
"input-buffer-fill-level",
description="Fill level of dada buffer",
params=[0, 1])
self.add_sensor(self._input_fill_level_sensor)
self._input_total_write_sensor = Sensor.float(
"input-buffer-total-write",
description="Total write into dada buffer ",
params=[0, 1])
self.add_sensor(self._input_total_write_sensor)
@state_change(target="configured", allowed=["idle", "configured"], intermediate="configuring")
@coroutine
def configure(self):
"""
@brief Configure capture pipeline
@param config_json A JSON dictionary object containing configuration information
"""
log.info("Configuring EDD backend for PAF capturing")
log.debug("Final config: {}".format(json.dumps(self._config, indent=4)))
self.__numa_node_pool = []
for node in numa.getInfo():
# remove numa nodes with missing capabilities
if len(numa.getInfo()[node]['gpus']) < 1:
log.debug("Not enough gpus on numa node {} - removing from pool.".format(node))
continue
elif len(numa.getInfo()[node]['net_devices']) < 1:
log.debug("Not enough nics on numa node {} - removing from pool.".format(node))
continue
else:
self.__numa_node_pool.append(node)
log.debug("{} numa nodes remaining in pool after constraints.".format(len(self.__numa_node_pool)))
if len(self._config['input_data_streams']) > len(self.__numa_node_pool):
log.error("Not enough numa nodes to process {} streams!".format(len(self._config['input_data_streams'])))
raise FailReply("Not enough numa nodes to process {} streams!".format(len(self._config['input_data_streams'])))
self.__coreManager = [] # one core manager per numa node
self._subprocessMonitor = SubprocessMonitor()
if self._config["utc_start"] == "":
self._config["utc_start"] = int((ATime(ATime.now(), format='isot', scale='utc')-ATime(ATime("1970-01-01 00:00:00"), format='isot', scale='utc')).value * 24 * 60 * 60)
for i, stream_desc in enumerate(self._config['input_data_streams']):
numa_node = self.__numa_node_pool[i]
self.__coreManager.append(CoreManager(numa_node))
self.__coreManager[i].add_task("mksend", 2)
self.__coreManager[i].add_task("mkrecv", 2, prefere_isolated=True)
# Setup dada buffer
stream_desc['dada_key'] = ["dada", "dadc"][i]
heapSize = stream_desc["samples_per_heap"] * stream_desc["bit_depth"] // 8
nHeaps = self._config["samples_per_block"] / stream_desc["samples_per_heap"]
bufferSize = nHeaps * (heapSize)# + 64 // 8)
log.info("""Input dada parameters:\n
heap size: {} bytes
heaps per block: {}
buffer size: {} bytes""".format(heapSize, nHeaps, bufferSize))
yield self._create_ring_buffer(bufferSize, self._config["n_blocks"], stream_desc["dada_key"], numa_node)
# Setup mksend process
mksend_header_file = tempfile.NamedTemporaryFile(delete=False)
mksend_header_file.write(mksend_header)
mksend_header_file.close()
fastest_nic, nic_params = numa.getFastestNic(numa_node)
log.info("Sending data for stream {} on NIC {} [ {} ] @ {} Mbit/s".format(i, fastest_nic, nic_params["ip"], nic_params["speed"]))
mksend_cmd = "taskset -c " + self.__coreManager[i].get_coresstr('mksend') + " mksend --header " + str(mksend_header_file.name)
mksend_cmd += " --dada-key " + str(stream_desc['dada_key'])
mksend_cmd += " --udp-if " + str(nic_params['ip'])
mksend_cmd += " --port " + str(self._config["output_data_streams"]["test_stream_out_0"]["port"])
mksend_cmd += " --sync-epoch " + str(self._config["utc_start"]) # Where to get the sync epoch from?
mksend_cmd += " --sample-clock " + str(stream_desc["sample_rate"])
mksend_cmd += " --item1-step " + str(stream_desc["samples_per_heap"])
mksend_cmd += " --rate "+ str(stream_desc["sample_rate"])
mksend_cmd += " " + self._config["output_data_streams"]["test_stream_out_0"]["ip"] # Multicast address
# mksend_cmd = "dada_dbnull -k " + str(stream_desc['dada_key'])
log.debug("Command to run: {}".format(mksend_cmd))
mksend = ManagedProcess(mksend_cmd)
self.mksend_proc.append(mksend)
self._subprocessMonitor.add(mksend, self._subprocess_error)
self._subprocesses.append(mksend)
self._subprocessMonitor.start()
@state_change(target="streaming", allowed=["configured"], intermediate="capture_starting")
@coroutine
def capture_start(self):
"""
@brief start capturing
"""
log.info("Capture starting")
try:
for i, stream_desc in enumerate(self._config["input_data_streams"]):
mkrecv_header_file = tempfile.NamedTemporaryFile(delete=False)
mkrecv_header_file.write(mkrecv_header)
# mkrecv_header_file.write("NBIT {}\n".format(stream_desc["bit_depth"]))
# mkrecv_header_file.write("HEAP_SIZE {}\n".format(stream_desc["samples_per_heap"] * stream_desc["bit_depth"] // 8))
# mkrecv_header_file.write("SAMPLES_PER_BLOCK {}\n".format(self._config["samples_per_block"]))
mkrecv_header_file.close()
numa_node = self.__numa_node_pool[i]
fastest_nic, nic_params = numa.getFastestNic(numa_node)
log.info("Receiving data stream {} on NIC {} [ {} ] @ {} Mbit/s".format(i, fastest_nic, nic_params['ip'], nic_params['speed']))
mkrecv_cmd = "taskset -c " + self.__coreManager[i].get_coresstr('mkrecv') + " mkrecv_v4 --header " + str(mkrecv_header_file.name)
mkrecv_cmd += " --idx1-step " + str(stream_desc["samples_per_heap"])
mkrecv_cmd += " --heap-size " + str(stream_desc["samples_per_heap"] * stream_desc["bit_depth"] // 8)
#mkrecv_cmd += " --nthreads " + str(len(self.__coreManager[i].get_cores('mkrecv')) - 1)
mkrecv_cmd += " --dada-key " + str(stream_desc['dada_key'])
mkrecv_cmd += " --sync-epoch " + str(self._config["utc_start"])
mkrecv_cmd += " --sample-clock " + str(stream_desc["sample_rate"])
mkrecv_cmd += " --udp-if " + str(nic_params['ip'])
mkrecv_cmd += " --port " + str(stream_desc["port"])
mkrecv_cmd += " " + stream_desc["ip"]
mk_recv = ManagedProcess(mkrecv_cmd)
self.mkrecv_proc.append(mk_recv)
self._subprocessMonitor.add(mk_recv, self._subprocess_error)
except Exception as E:
log.error("Error starting pipeline: {}".format(E))
raise E
@state_change(target="configured", allowed=["ready", "streaming"], intermediate="capture_stopping")
@coroutine
def capture_stop(self):
"""
@brief Stop streaming of data
"""
log.info("Stoping capturing")
yield self.deconfigure()
@state_change(target="idle", intermediate="deconfiguring", error='panic')
@coroutine
def deconfigure(self):
"""
@brief deconfigure the gated spectrometer pipeline.
"""
log.info("Deconfiguring EDD backend")
if self.previous_state == 'streaming':
yield self.capture_stop()
if self._subprocessMonitor is not None:
yield self._subprocessMonitor.stop()
for proc in self._subprocesses:
yield proc.terminate()
log.debug("Destroying dada buffers")
for k in self.dada_buffers:
k['monitor'].stop()
cmd = "dada_db -d -k {0}".format(k['key'])
log.debug("Running command: {0}".format(cmd))
yield command_watcher(cmd)
self.dada_buffers = []
@coroutine
def _create_ring_buffer(self, buffer_size, blocks, key, numa_node):
"""
@brief Create a ring buffer of given size with given key on specified numa node.
Adds and register an appropriate sensor to thw list
"""
# always clear buffer first. Allow fail here
yield command_watcher("dada_db -d -k {key}".format(key=key), allow_fail=True)
cmd = "numactl --cpubind={numa_node} --membind={numa_node} dada_db -k {key} -n {blocks} -b {buffer_size} -p -l" \
.format(key=key, blocks=blocks, buffer_size=buffer_size, numa_node=numa_node)
log.debug("Running command: {0}".format(cmd))
yield command_watcher(cmd, allow_fail=True)
M = DbMonitor(key, self._buffer_status_handle)
M.start()
self.dada_buffers.append({'key': key, 'monitor': M})
def _buffer_status_handle(self, status):
"""
@brief Process a change in the buffer status
"""
self._input_fill_level_sensor.set_value(status['fraction-full'])
self._input_total_write_sensor.set_value(status['written'])
@coroutine
def register(self, host=None, port=None):
"""
Registers the pipeline in the data store.
Args:
host, port: Ip and port of the data store.
If no host and port ar eprovided, values from the internal config are used.
"""
EDDPipeline.register(self, host, port)
self._config["data_store"]["ip"] = host
self._config["data_store"]["port"] = port
try:
self.__eddDataStore = EDDDataStore.EDDDataStore(host, port)
except:
log.error("Failed to connect to EDDDataStore")
self.__eddDataStore.addDataStream(self._config["id"] + "_input_streams", self._config["input_data_streams"]["raw_voltage_stream"])
if __name__ == "__main__":
launchPipelineServer(SimpleSpeadTestPipeline)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment