Commit d98cbc63 authored by Niclas Esser

Added pipelines (channelizer, beamformer) + pynvml-feature -> get compute architecture + SPEAD format of CRYO_PAF_CHANNELIZER
parent d7821d68
Pipeline #135341 failed (0 seconds)
@@ -55,18 +55,88 @@ data_formats = {
"port": ""
},
"CRYO_PAF_PACKETIZER:1": {
"ip": "",
"CRYO_PAF_CHANNELIZER:1" : {
"ip":"",
"port": "",
"bit_depth" : 0,
"sample_rate" : 0,
"sync_time" : 0,
"band_flip": False,
"central_freq": 0,
"receiver_id": "",
"polarization": 2,
"samples_per_heap": 4096,
"description": "Spead stream of time-domain packetizer data as in EDD ICD."
"heap_size" : 8192,
# "heap_size": 8192 + 12 * 8,
"data_rate" : 2e9,
"spead_desc": {
"item1": {
"tag":1,
"id":0x1600,
"name":"timestamp",
"description":"timestamp of the oldest sample in this packet",
"shape":[6],
"dtype":">u1",
"order":"C",
"step":4096,
"index": 1
},
"item2": {
"tag":2,
"id":0x1601,
"name":"clipping_cnt",
"description":"Counts channelwise clipped data points",
"shape":[6],
"dtype":">u1",
"value":10,
"order":"C"
},
"item3": {
"tag":3,
"id":0x1602,
"name":"error_vector",
"description":"Contains an error code",
"shape":[6],
"dtype":">u1",
"value":0,
"order":"C"
},
"item4": {
"tag":4,
"id":0x1603,
"name":"scale_factor",
"description":"Contains the scaling factor",
"shape":[6],
"dtype":">u1",
"value":1,
"order":"C"
},
"item5": {
"tag":5,
"id":0x1604,
"name":"order_vector",
"description":"Contains element ID, channel ID and hardware ID",
"shape":[6],
"dtype":">u1",
"order":"C",
"value": 0,
"list":[[0,256,1]],#,[0,4,1],[0,256,1]], # [start, step, exclude last, number of bytes]
#"mask": [0xffff00000000, 0x0000ffff0000, 0x00000000ffff]
},
"item6": {
"tag":6,
"id":0x1605,
"name":"reserved",
"description":"Reserved item",
"shape":[6],
"dtype":">u1",
"value":0,
"order":"C"
},
"item7": {
"tag":7,
"id":0x1606,
"name":"payload",
"description":"channelized voltages of one PAF element",
"shape":[4096,2],
"dtype":"int8",
"order":"C"
}
}
},
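    # Note: "heap_size" matches the payload item above:
    #   np.prod([4096, 2]) * np.dtype("int8").itemsize == 8192 bytes.
    # The commented-out 8192 + 12 * 8 variant presumably also counts the
    # per-heap SPEAD header/item-pointer overhead (twelve 8-byte words).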
"SIMPLE_SPEAD2:1": {
"ip": "",
from mpikat.utils.process_tools import ManagedProcess, command_watcher
from mpikat.utils.process_monitor import SubprocessMonitor
from mpikat.utils.sensor_watchdog import SensorWatchdog
from mpikat.utils.db_monitor import DbMonitor
from mpikat.utils.ip_utils import split_ipstring
from mpikat.utils.mkrecv_stdout_parser import MkrecvSensors
from mpikat.effelsberg.edd.pipeline.EDDPipeline import EDDPipeline, launchPipelineServer, updateConfig, state_change
from mpikat.effelsberg.edd.EDDDataStore import EDDDataStore, data_formats
import mpikat.utils.numa as numa
from mpikat.utils.core_manager import CoreManager
from mpikat.utils.ip_utils import ipstring_to_list
import logging
import time
import numpy as np
import json
import tornado
import tempfile
from tornado.gen import coroutine
from katcp import Sensor, AsyncReply, FailReply
import testframework.utils as util
log = logging.getLogger("mpikat.testframework.MockChannelizer")

DEFAULT_CONFIG = {
    "id": "MockChannelizer",
    "type": "MockChannelizer",
    "host": "pacifix3",
    "port": 7148,
    "dataset_dir": "",
    "channels": 4,
    "elements": 128,
    "output_data_streams": [
        {
            "format": "CRYO_PAF_CHANNELIZER:1",
            "key": "deadbee",
            "nblocks": 16,
            "ip": "225.0.0.1",
            "port": 17100,
            "nic_addr": "192.168.2.90",
            "udp": True,
            "source": "dummy",
            "heaps_per_buffer": 4096
        }
    ],
    "data_store": {
        "host": "pacifix6",
        "port": 6379
    }
}
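
# A partial configuration sent at configure time is merged onto DEFAULT_CONFIG
# (EDDPipeline/updateConfig semantics), so a minimal sketch of an override
# could look like this (key, ip and port are illustrative, not defaults):
#
#   {"output_data_streams": [{"format": "CRYO_PAF_CHANNELIZER:1",
#                             "key": "dada", "ip": "225.0.0.2", "port": 17101}]}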

class MockChannelizer(EDDPipeline):

    def __init__(self, ip, port):
        EDDPipeline.__init__(self, ip, port, DEFAULT_CONFIG)
        self._process_list = []
        self._dada_buffers = []
        self._spead_description = []
        self._output_handler = []
        self._generators = []
        self._header_files = []
        self.__eddDataStore = 0  # placeholder; the mock does not connect to an EDDDataStore
        self._set_numa_node()

    @coroutine
    def setup_sensors(self):
        """
        @brief Setup monitoring sensors
        """
        EDDPipeline.setup_sensors(self)
@state_change(target="configured", allowed=["idle", "configured"], intermediate="configuring")
@coroutine
def configure(self):
self._subprocessMonitor = SubprocessMonitor()
self.__coreManager = CoreManager(self.numa_node)
for i, stream in enumerate(self._config["output_data_streams"]):
self._spead_description.append(data_formats[stream["format"]])
buffer_size = stream["heaps_per_buffer"] * data_formats[stream["format"]]["heap_size"]
yield self._create_ring_buffer(buffer_size, stream["nblocks"], stream['key'], self.numa_node)
self._header_files.append(tempfile.NamedTemporaryFile(delete=False))
self.__coreManager.add_task("mksend_{}".format(i), 1)
cmd = self._get_output_handle_cmd(stream, self._spead_description[i], self._header_files[i], self.__coreManager.get_coresstr('mksend_'+str(i)))
log.debug("Output handle command: {}".format(cmd))
self._output_handler.append(ManagedProcess(cmd))
self._process_list.append(self._output_handler[i])
self._subprocessMonitor.add(self._output_handler[i], self._subprocess_error)
self._subprocesses.append(self._output_handler[i])
self.__coreManager.add_task("generator_{}".format(i), 1)
cmd = self._get_generate_cmd(stream["source"], self._spead_description[i]["data_rate"], stream["key"], self._header_files[i])
log.debug("Stream {} will be generated by command {}".format(i, cmd))
self._generators.append(ManagedProcess(cmd))
self._process_list.append(self._generators[i])
self._subprocessMonitor.add(self._generators[i], self._subprocess_error)
self._subprocesses.append(self._generators[i])
log.info("Processes, buffers and generators ready, starting to stream..")
@state_change(target="ready", allowed=["configured"], intermediate="capture_starting")
@coroutine
def capture_start(self):
pass
@state_change(target="streaming", allowed=["configured", "ready"], intermediate="measurement_starting")
@coroutine
def measurement_start(self):
pass
@state_change(target="ready", allowed=["streaming"], intermediate="measurement_stopping")
@coroutine
def measurement_stop(self):
pass
@state_change(target="configured", allowed=["ready"], intermediate="capture_stopping")
@coroutine
def capture_stop(self):
pass
@state_change(target="idle", intermediate="deconfiguring", error='panic')
@coroutine
def deconfigure(self):
log.info("Deconfiguring EDD backend")
for generator in self._generators:
yield generator.terminate()
for handler in self._output_handler:
yield handler.terminate()
for buffer in self._dada_buffers:
self._destroy_ring_buffer(buffer)

    @coroutine
    def _create_ring_buffer(self, buffer_size, blocks, key, numa_node):
        """
        @brief Create a ring buffer of the given size with the given key on the
               specified numa node. Adds and registers an appropriate sensor to the list.
        """
        # always destroy any stale buffer first; this is allowed to fail
        yield command_watcher("dada_db -d -k {key}".format(key=key), allow_fail=True)
        cmd = "numactl --cpubind={numa_node} --membind={numa_node} dada_db -k {key} -n {blocks} -b {buffer_size} -p -l" \
            .format(key=key, blocks=blocks, buffer_size=buffer_size, numa_node=numa_node)
        log.debug("Running command: {0}".format(cmd))
        yield command_watcher(cmd, allow_fail=True)
        M = DbMonitor(key, self._output_handle)
        M.start()
        self._dada_buffers.append({'key': key, 'monitor': M})
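
        # With the default stream (heaps_per_buffer=4096, heap_size=8192,
        # nblocks=16, key="deadbee"), buffer_size is 4096 * 8192 = 33554432
        # and the generated command would read, e.g. on numa node 0:
        #   numactl --cpubind=0 --membind=0 dada_db -k deadbee -n 16 -b 33554432 -p -l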

    @coroutine
    def _destroy_ring_buffer(self, buffer):
        """
        @brief Destroy a previously created ring buffer and stop its monitor.
        """
        buffer['monitor'].stop()
        cmd = "dada_db -d -k {0}".format(buffer['key'])
        log.debug("Running command: {0}".format(cmd))
        yield command_watcher(cmd)

    def _get_generate_cmd(self, source, rate, key, header_file, duration=60):
        cmd = ""
        if source == "dummy":
            log.warning("Generating dummy input!")
            # dada_junkdb expects the rate in MB/s, hence the division by 1e6
            cmd = "dada_junkdb -c 1 -R {fs} -t {duration} -k {key} {header}".format(
                fs=rate / 1e6,
                duration=duration,
                key=key,
                header=header_file.name)
        else:
            log.warning("Stream source {} not implemented yet".format(source))
        log.debug("Generator command: {}".format(cmd))
        return cmd
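
    # For the default stream (data_rate=2e9, key="deadbee", duration=60)
    # this returns, e.g.:
    #   dada_junkdb -c 1 -R 2000.0 -t 60 -k deadbee /tmp/tmpXXXXXX
    # where /tmp/tmpXXXXXX stands in for the temporary DADA header file.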

    def _get_output_handle_cmd(self, stream_conf, spead_conf, header_file, physcpu):
        log.debug("Creating input dada header file: {}".format(header_file.name))
        header_file.write("# DADA header configuration")
        header_file.write("\nHEADER DADA")
        header_file.write("\nHDR_VERSION 1.0")
        header_file.write("\nHDR_SIZE 4096")
        header_file.write("\nDADA_VERSION 1.0")
        header_file.write("\nTELESCOPE EFF")
        header_file.write("\nRECEIVER PAF")
        header_file.write("\nINSTRUMENT CHAN")
        header_file.write("\nOBS_OFFSET 0")
        header_file.write("\nRESOLUTION 1")
        header_file.write("\nFREQ " + str(4))
        header_file.write("\nDIM " + str(2))
        header_file.write("\nNPOL " + str(2))
        header_file.write("\nNBIT " + str(2))
        header_file.write("\nUTC_START " + str(0))
        header_file.write("\nNSLOTS " + str(stream_conf["nblocks"]))
        header_file.write("\n# MKSEND parameters")
        header_file.write("\nNETWORK_MODE 1")
        header_file.write("\nPACKET_SIZE 8400")
        header_file.write("\nSYNC_TIME unset")
        header_file.write("\nSAMPLE_CLOCK unset")
        header_file.write("\nUTC_START unset")
        header_file.write("\nHEAP_COUNT " + str(self._config["elements"] * self._config["channels"]))
        header_file.write("\nHEAP_OFFSET 1")
        header_file.write("\nHEAP_ID_STEP 1")
        header_file.write("\nNITEMS " + str(len(spead_conf["spead_desc"])))
        header_file.write("\nNSCI " + str(0))
        # Translate every SPEAD item description into ITEMn_* header keywords.
        for item_name, item_conf in spead_conf["spead_desc"].items():
            log.debug("Adding item {}: {}".format(item_name, item_conf))
            if "step" in item_conf:
                header_file.write("\nITEM{}_ID {}".format(item_conf["tag"], item_conf["id"]))
                header_file.write("\nITEM{}_STEP {}".format(item_conf["tag"], item_conf["step"]))
            elif "list" in item_conf:
                for ii, outer in enumerate(item_conf["list"]):
                    header_file.write("\nITEM{}_ID {}".format(item_conf["tag"] + ii, item_conf["id"]))
                    header_file.write("\nITEM{}_LIST {}:{}:{}".format(item_conf["tag"] + ii, outer[0], outer[1], outer[2]))
                    header_file.write("\nITEM{}_INDEX {}".format(item_conf["tag"] + ii, ii + 2))
            elif "value" in item_conf:
                header_file.write("\nITEM{}_ID {}".format(item_conf["tag"], item_conf["id"]))
                header_file.write("\nITEM{}_LIST {}".format(item_conf["tag"], item_conf["value"]))
            else:
                header_file.write("\nITEM{}_ID {}".format(item_conf["tag"], item_conf["id"]))
        header_file.write("\n")
        header_file.close()
        cmd = "taskset -c " + str(physcpu)
        cmd += " mksend --header " + str(header_file.name)
        cmd += " --heap-id-start " + str(0)
        cmd += " --dada-key " + str(stream_conf["key"])
        # cmd += " --ibv-if {ibv_if}"
        cmd += " --port " + str(stream_conf["port"])
        cmd += " --sync-epoch " + str(0)
        cmd += " --sample-clock " + str(spead_conf["data_rate"])
        cmd += " --item1-step " + str(spead_conf["spead_desc"]["item1"]["step"])
        cmd += " --rate " + str(spead_conf["data_rate"])
        cmd += " --heap-size " + str(spead_conf["heap_size"])
        if stream_conf["udp"]:
            cmd += " --udp-if " + str(stream_conf["nic_addr"])
        cmd += " --heap-group " + str(256)
        cmd += " " + str(stream_conf["ip"])
        return cmd
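
    # With DEFAULT_CONFIG the assembled command resembles:
    #   taskset -c <core> mksend --header /tmp/tmpXXXXXX --heap-id-start 0
    #     --dada-key deadbee --port 17100 --sync-epoch 0
    #     --sample-clock 2000000000.0 --item1-step 4096 --rate 2000000000.0
    #     --heap-size 8192 --udp-if 192.168.2.90 --heap-group 256 225.0.0.1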

    def _output_handle(self, foo):
        # Callback for the DbMonitor created in _create_ring_buffer; the mock
        # does not evaluate buffer statistics, so this is a no-op.
        pass

    def _set_numa_node(self):
        # Get the NUMA nodes and their characteristics
        self.__numa_node_pool = []
        for node in numa.getInfo():
            cuda_devices = numa.getInfo()[node]['gpus']
            # remove numa nodes with missing capabilities
            if len(cuda_devices) < 1:
                log.debug("Not enough gpus on numa node {} - removing from pool.".format(node))
                continue
            elif len(numa.getInfo()[node]['net_devices']) < 1:
                log.debug("Not enough nics on numa node {} - removing from pool.".format(node))
                continue
            else:
                self.__numa_node_pool.append(node)
        log.debug("{} NUMA nodes remain in pool after constraints.".format(len(self.__numa_node_pool)))
        if len(self.__numa_node_pool) == 0:
            raise FailReply("Not enough numa nodes to process data!")
        self.numa_node = self.__numa_node_pool[0]
        log.debug("Associating with numa node {}".format(self.numa_node))
        self.fastest_nic, self.nic_params = numa.getFastestNic(self.numa_node, False)
        log.info("Will use NIC {} [ {} ] @ {} Mbit/s".format(
            self.fastest_nic, self.nic_params["ip"], self.nic_params['speed']))

if __name__ == "__main__":
    launchPipelineServer(MockChannelizer)
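
# Launched standalone this starts the KATCP pipeline server, e.g.
# (host/port flags are an assumption based on the usual EDDPipeline CLI,
# and the file name is illustrative):
#   python mock_channelizer.py --host pacifix3 --port 7148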
@@ -128,7 +128,7 @@ NVML_INFOROM_ECC = 1
NVML_INFOROM_POWER = 2
NVML_INFOROM_COUNT = 3
_nvmlReturn_t = c_uint
_noden_t = c_uint
NVML_SUCCESS = 0
NVML_ERROR_UNINITIALIZED = 1
NVML_ERROR_INVALID_ARGUMENT = 2
@@ -291,7 +291,7 @@ def _extractNVMLErrorsAsClasses():
    e.g. NVML_ERROR_ALREADY_INITIALIZED will be turned into NVMLError_AlreadyInitialized
    '''
    this_module = sys.modules[__name__]
    nvmlErrorsNames = [x for x in dir(this_module) if x.startswith("NVML_ERROR_")]
    for err_name in nvmlErrorsNames:
        # e.g. Turn NVML_ERROR_ALREADY_INITIALIZED into NVMLError_AlreadyInitialized
        class_name = "NVMLError_" + string.capwords(err_name.replace("NVML_ERROR_", ""), "_").replace("_", "")
@@ -1170,6 +1170,14 @@ def nvmlDeviceGetComputeMode(handle):
    _nvmlCheckReturn(ret)
    return c_mode.value

def nvmlDeviceGetCudaComputeCapability(handle):
    major = c_uint()
    minor = c_uint()
    fn = _nvmlGetFunctionPointer("nvmlDeviceGetCudaComputeCapability")
    ret = fn(handle, byref(major), byref(minor))
    _nvmlCheckReturn(ret)
    return major.value, minor.value
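
# Hedged usage sketch for the new query; nvmlInit, nvmlDeviceGetHandleByIndex
# and nvmlShutdown are standard bindings in this module, while the "sm_XY"
# architecture string is our illustrative mapping, not part of the library:
#
#   nvmlInit()
#   handle = nvmlDeviceGetHandleByIndex(0)
#   major, minor = nvmlDeviceGetCudaComputeCapability(handle)
#   arch = "sm_{}{}".format(major, minor)  # e.g. "sm_61" on a Pascal GTX 1080
#   nvmlShutdown()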

def nvmlDeviceGetEccMode(handle):
    c_currState = _nvmlEnableState_t()
    c_pendingState = _nvmlEnableState_t()