Commit a1b93b01 authored by Niclas Esser's avatar Niclas Esser
Browse files

Merge remote-tracking branch 'upstream/dev' into dev

parents 808e937c e7d1c736
Pipeline #112683 failed with stages
in 0 seconds
......@@ -85,9 +85,10 @@ class EDDHDFFileWriter(object):
_log.debug('Using file: {}'.format(ofile))
self.__filename = ofile
self.__mode = mode
self.__chunksize = chunksize
self._file = h5py.File(self.__filename, mode)
self._file = h5py.File(self.__filename, self.__mode)
self._file.attrs['FORMAT_VERSION'] = self._file_format_version
now = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%f")
......@@ -98,11 +99,18 @@ class EDDHDFFileWriter(object):
history[-1] = a
self.__subscan_id = 0
self.__subscan = None
self.__items = []
self._lock = Lock()
def __del__(self):
    """Destructor: close the underlying HDF5 file, ignoring any error."""
    try:
        self._file.close()
    except:
        # Best effort only: the file may already be closed, or teardown may
        # happen during interpreter shutdown where anything can raise.
        pass
@property
def filename(self):
return self.__filename
......@@ -125,7 +133,12 @@ class EDDHDFFileWriter(object):
scanid = "scan/{:03}".format(scannum)
_log.debug('Starting new subscan: {}'.format(scanid))
self.__subscan = self._file.create_group(scanid)
self._file.create_group(scanid)
def getCurrentScanId(self):
    """Return the group path ("scan/NNN") of the most recently created scan."""
    number_of_scans = len(self._file['scan'].keys())
    return "scan/{:03}".format(number_of_scans - 1)
def addData(self, section, data, attributes={}):
"""
......@@ -144,22 +157,23 @@ class EDDHDFFileWriter(object):
Format management
"""
with self._lock:
if section not in self.__subscan:
_log.debug('Creating new section {} for subscan: {}'.format(section, self.__subscan.name))
self.__subscan.create_group(section)
__subscan = self._file.get(self.getCurrentScanId())
if section not in __subscan:
_log.debug('Creating new section {} for subscan: {}'.format(section, __subscan.name))
__subscan.create_group(section)
# columns = gated_spectrometer_format
for k, c in data.items():
if self.__chunksize == 'auto':
self.__subscan[section].create_dataset(k, dtype=c.dtype, shape=(0,) + c.shape, maxshape=(None,)+ c.shape, chunks=True)
__subscan[section].create_dataset(k, dtype=c.dtype, shape=(0,) + c.shape, maxshape=(None,)+ c.shape, chunks=True)
else:
self.__subscan[section].create_dataset(k, dtype=c.dtype, shape=(0,) + c.shape, maxshape=(None,)+ c.shape, chunks=(self.__chunksize, )+ c.shape, )
__subscan[section].create_dataset(k, dtype=c.dtype, shape=(0,) + c.shape, maxshape=(None,)+ c.shape, chunks=(self.__chunksize, )+ c.shape, )
self.__items = set(data.keys())
if set(data.keys()) != self.__items:
_log.warning("Missing keys in dataset: {} - Ignoring dataset!".format(",".join(self.__items.difference(data.keys()))))
return
for did, dataset in self.__subscan[section].items():
for did, dataset in __subscan[section].items():
shape = list(dataset.shape)
shape[0] += 1
_log.debug('Resizing {}: {} -> {}'.format(dataset.name, dataset.shape, tuple(shape)))
......@@ -167,9 +181,18 @@ class EDDHDFFileWriter(object):
dataset[-1] = data[did]
for key, value in attributes.items():
if key not in self.__subscan[section].attrs.keys():
if key not in __subscan[section].attrs.keys():
_log.debug("Adding attribute: {} = {} to section {}".format(key, value, section))
self.__subscan[section].attrs[key] = value
__subscan[section].attrs[key] = value
def open(self, mode=None):
    """
    Re-open the HDF file. Uses the previous mode if no mode is given.

    Args:
        mode: h5py file mode (e.g. 'r', 'r+', 'a'). If None, the mode used
              when the writer was created (or last passed here) is reused.
    """
    # Bug fix: the original ignored the mode argument entirely and used the
    # string 'None' as its default; now a given mode is honored and remembered.
    if mode is not None:
        self.__mode = mode
    _log.debug('Opening : {}'.format(self.filename))
    self._file.close()
    self._file = h5py.File(self.__filename, self.__mode)
def close(self):
"""
......
......@@ -289,7 +289,7 @@ class DigitiserPacketiserClient(object):
@coroutine
def enable_snapshot(self, time=5):
yield self._safe_request("rxs_packetizer_snapshot_enable_spec", time)
yield self._safe_request("rxs_packetizer_snapshot_enable_spec")
#yield self._safe_request("rxs_packetizer_snapshot_enable_time", time)
......
......@@ -39,6 +39,7 @@ import logging
import coloredlogs
import json
import os
import io
import git
import tornado
......@@ -188,22 +189,13 @@ class EddMasterController(EDDPipeline):
else:
yield EDDPipeline.set(self, cfg)
@state_change(target="configured", intermediate="configuring")
@coroutine
def configure(self):
def __updateGraph(self):
"""
Configure the EDD backend
"""
log.info("Configuring EDD backend for processing")
yield self._installController(self._config['products'])
cfs = json.dumps(self._config, indent=4)
log.debug("Starting configuration:\n" + cfs)
Parses the configuration and creates a nx directed acyclic graph for
the product dependencies.
If the config contains a correct DAG, the _configuration_graph sensor is updated accordingly.
"""
# Data streams are only filled in on final configure as they may
# require data from the configure command of previous products. As example, the packetizer
# data stream has a sync time that is propagated to other components
......@@ -232,9 +224,37 @@ class EddMasterController(EDDPipeline):
except nx.NetworkXNoCycle:
log.debug("No loop on graph found")
pass
graph = "\n".join([" {} --> {}".format(k[0], k[1]) for k in dag.edges()])
log.info("Dependency graph of products:\n{}".format(graph))
self._configuration_graph.set_value(graph)
txt_graph = "\n".join([" {} --> {}".format(k[0], k[1]) for k in dag.edges()])
log.info("Dependency graph of products:\n{}".format(txt_graph))
dag.graph['edges']={'arrowsize':'4.0'}
dag.graph['node']={'shape':'box'}
dag.graph['graph']={'size':8, 'rankdir':'LR'}
ggraph = nx.nx_agraph.to_agraph(dag)
with io.BytesIO() as f:
ggraph.draw(f, format='svg', prog='dot')
f.seek(0)
self._configuration_graph.set_value(f.read())
return dag
@state_change(target="configured", intermediate="configuring")
@coroutine
def configure(self):
"""
Configure the EDD backend
"""
log.info("Configuring EDD backend for processing")
yield self._installController(self._config['products'])
cfs = json.dumps(self._config, indent=4)
log.debug("Starting configuration:\n" + cfs)
dag = self.__updateGraph()
configure_results = {}
configure_futures = []
......@@ -321,10 +341,11 @@ class EddMasterController(EDDPipeline):
for cid, controller in self.__controller.items():
futures.append(controller.deconfigure())
yield futures
self._configuration_graph.set_value("")
# After deconfigure, there are no more datastreams
self.__eddDataStore._dataStreams.flushdb()
self.flush_notes()
@state_change(target="ready", allowed=["configured"], intermediate="capture_starting")
......@@ -352,8 +373,22 @@ class EddMasterController(EDDPipeline):
futures.append(controller.capture_stop())
yield futures
@state_change(target="set", intermediate="measurement_preparing")
@coroutine
def metadata_update(self, metadata_json=""):
    """
    Parse a JSON string of telescope metadata and store every item in the
    EDD data store.

    Args:
        metadata_json: JSON object mapping metadata item names to values.

    Raises:
        FailReply: If metadata_json is not valid JSON.
    """
    log.debug("Received metadata update ... ")
    try:
        cfg = json.loads(metadata_json)
    except (ValueError, TypeError):
        # ValueError covers json.JSONDecodeError; TypeError covers non-string
        # input. A bare except here would also swallow KeyboardInterrupt
        # and SystemExit.
        log.error("Error parsing json")
        raise FailReply("Cannot handle config string {} - Not valid json!".format(metadata_json))
    for item in cfg:
        log.debug("Setting metadata: {} with value: {} to EDD data store.".format(item, cfg[item]))
        self.__eddDataStore.setTelescopeDataItem(item, cfg[item])
@state_change(target="set", allowed=["set", "ready", "measurement_starting", "measurement_stopping", "configured", "streaming"], intermediate="measurement_preparing")
@state_change(target="set", allowed=["set", "ready", "measurement_starting", "measurement_stopping", "configured", "streaming", "error"], intermediate="measurement_preparing")
@coroutine
def measurement_prepare(self, config_json=""):
"""
......@@ -374,6 +409,9 @@ class EddMasterController(EDDPipeline):
else:
log.debug("Sending measurement_prepare to {} with {}".format(cid, ""))
futures.append(controller.measurement_prepare({}))
for k in cfg:
if k not in self.__controller:
self.add_note("WARNING: Measurement prepare requested for unprovisioned product '{}'".format(k))
yield futures
......@@ -386,6 +424,7 @@ class EddMasterController(EDDPipeline):
futures = []
for cid, controller in self.__controller.items():
futures.append(controller.measurement_start())
yield futures
......@@ -468,8 +507,9 @@ class EddMasterController(EDDPipeline):
try:
yield command_watcher("ansible-playbook -i {} {} {}".format(self.__inventory, playfile.name, additional_args), env={"ANSIBLE_ROLES_PATH":os.path.join(self.__edd_ansible_git_repository_folder, "roles")}, timeout=300)
os.unlink(playfile.name)
except Exception as E:
playfile.unlink()
os.unlink(playfile.name)
raise RuntimeError("Error {} processing play:\n {}".format(E, yaml.dump(play)))
......@@ -488,7 +528,7 @@ class EddMasterController(EDDPipeline):
- ::`NAME1.yml;NAME2.json` to load different yml / json configs
"""
os.chdir(self.__edd_ansible_git_repository_folder)
log.debug("Provision description {} from directory {}".format(description, os.getcwd()))
log.debug("Reading provision description {} from directory {}".format(description, os.getcwd()))
if description.startswith('"'):
description = description.lstrip('"')
description = description.rstrip('"')
......@@ -529,7 +569,7 @@ class EddMasterController(EDDPipeline):
yield subplay_futures
except Exception as E:
raise FailReply("Error in provisioning thrown by ansible {}".format(E))
raise FailReply("Error in provisioning thrown by ansible: {}".format(E))
yield self._loadBasicConfig(basic_config_file)
......@@ -603,6 +643,7 @@ class EddMasterController(EDDPipeline):
self._config["products"][cfg['id']] = cfg
self._configUpdated()
self.__updateGraph()
def __sanitizeConfig(self, config):
......
......@@ -3,7 +3,6 @@ import logging
from mpikat.core.scpi import ScpiAsyncDeviceServer, scpi_request, raise_or_ok, launch_server
import mpikat.effelsberg.edd.pipeline.EDDPipeline as EDDPipeline
from mpikat.effelsberg.edd.edd_server_product_controller import EddServerProductController
from mpikat.effelsberg.edd import EDDDataStore
import coloredlogs
from tornado.gen import Return, coroutine, sleep
import tornado
......@@ -14,7 +13,7 @@ log = logging.getLogger('mpikat.edd_scpi_interface')
class EddScpiInterface(ScpiAsyncDeviceServer):
def __init__(self, interface, port, master_ip, master_port, redis_ip, redis_port, scannum_check_period=1000, ioloop=None):
def __init__(self, interface, port, master_ip, master_port, redis_ip, redis_port, ioloop=None):
"""
@brief A SCPI interface for a EddMasterController instance
......@@ -32,44 +31,6 @@ class EddScpiInterface(ScpiAsyncDeviceServer):
log.info("Datastore at {}:{}".format(redis_ip, redis_port))
super(EddScpiInterface, self).__init__(interface, port, ioloop)
self.__controller = EddServerProductController("MASTER", master_ip, master_port)
self.__eddDataStore = EDDDataStore.EDDDataStore(redis_ip, redis_port)
# Periodically check scan number and send measurement prepare on change
self._scannum_callback = tornado.ioloop.PeriodicCallback(self.__check_scannum, scannum_check_period)
self._scannum_callback.start()
self._last_scannum = None
self.__legacypulsarmode = False
@coroutine
def __check_scannum(self):
    """
    Poll the telescope data store for the scan number and, on a change,
    send a measurement_prepare that configures the noise diode firing
    pattern depending on the source name (legacy pulsar mode only).
    """
    current_scan_number = self.__eddDataStore.getTelescopeDataItem("scannum")
    if self._last_scannum is None:
        # Explicit None check (bug fix): a legitimate scan number of 0 or ""
        # must not be mistaken for "never polled before" on every poll.
        log.debug("First retrieval of scan number, got {}".format(current_scan_number))
        self._last_scannum = current_scan_number
    elif self._last_scannum == current_scan_number:
        # No change - nothing to do.
        pass
    else:
        log.debug("Scan number change detected from {} -> {}".format(self._last_scannum, current_scan_number))
        self._last_scannum = current_scan_number
        if not self.__legacypulsarmode:
            log.debug("Legacy pulsar mode disabled. Not reacting on scannumber change!")
            return
        sourcename = self.__eddDataStore.getTelescopeDataItem("source-name")
        if sourcename.endswith("_R"):
            log.debug("Source ends with _R, enabling noise diode")
            cfg = {"dig_pack_controller": {"set_noise_diode_firing_pattern": {"percentage":0.5, "period":1}}}
        else:
            # Bug fix: the original log line claimed "enabling" although a
            # zero percentage disables the noise diode.
            log.debug("Source does not end with _R, disabling noise diode")
            cfg = {"dig_pack_controller": {"set_noise_diode_firing_pattern": {"percentage":0.0, "period":1}}}
        self.__controller.measurement_prepare(cfg)
@scpi_request()
......@@ -187,19 +148,6 @@ class EddScpiInterface(ScpiAsyncDeviceServer):
self._ioloop.add_callback(self._make_coroutine_wrapper(req, self.__controller.deprovision))
@scpi_request(str)
def request_edd_noisediodebysourcename(self, req, message):
    """
    Enable or disable legacy pulsar mode (noise diode control by source name).

    Args:
        req:     SCPI request object; receives ok() or error().
        message: ON/TRUE/ENABLED or OFF/FALSE/DISABLED (case-insensitive).
    """
    log.debug("Setting pulsar mode")
    state = message.upper()
    if state in ['ON', 'TRUE', 'ENABLED']:
        self.__legacypulsarmode = True
    elif state in ['OFF', 'FALSE', 'DISABLED']:
        self.__legacypulsarmode = False
    else:
        em = "Error setting {} - expecting ON or OFF.".format(message)
        log.error(em)
        req.error(em)
        # Bug fix: do not also send an ok reply after an error reply.
        return
    req.ok()
@scpi_request(str)
def request_edd_measurementprepare(self, req, message):
"""
......@@ -217,16 +165,6 @@ class EddScpiInterface(ScpiAsyncDeviceServer):
self._ioloop.add_callback(self._make_coroutine_wrapper(req, self.__controller.measurement_prepare, cfg))
@scpi_request(float, float)
def request_edd_setnoisediodepattern(self, req, percentage, period):
    """
    Forward a noise diode firing pattern (duty-cycle percentage and period)
    to the master controller via measurement_prepare.
    """
    log.debug("Sending noise diode fireing pattern: percentage={}, period={}".format(percentage, period))
    cfg = {"set_noise_diode_firing_pattern": {"percentage":percentage, "period":period}}
    # Dispatch asynchronously on the ioloop; the wrapper sends the SCPI reply.
    self._ioloop.add_callback(self._make_coroutine_wrapper(req, self.__controller.measurement_prepare, cfg))
if __name__ == "__main__":
parser = EDDPipeline.getArgumentParser()
......@@ -234,13 +172,11 @@ if __name__ == "__main__":
default="edd01", help='The ip for the master controller')
parser.add_argument('--master-controller-port', dest='master_port',
type=int, default=7147, help='The port number for the master controller')
parser.add_argument('--scannum-check-period', dest='scannum_check_period',
type=int, default=1000, help='Period [ms] between checks of changes of the scan number.')
args = parser.parse_args()
EDDPipeline.setup_logger(args)
server = EddScpiInterface(args.host, args.port, args.master_ip, args.master_port, args.redis_ip, args.redis_port, args.scannum_check_period)
server = EddScpiInterface(args.host, args.port, args.master_ip, args.master_port, args.redis_ip, args.redis_port)
# Scpi Server is not an EDDPipeline, but the launcher works nevertheless
EDDPipeline.launchPipelineServer(server, args)
......
......@@ -98,7 +98,7 @@ import mpikat.utils.ip_utils as ip_utils
from tornado.gen import coroutine, sleep, Return
from tornado.ioloop import IOLoop, PeriodicCallback
from katcp import Sensor, FailReply, AsyncReply
from katcp import Sensor, FailReply, AsyncReply, Message
from katcp.kattypes import (request, return_reply)
import numpy as np
......@@ -128,7 +128,7 @@ _DEFAULT_CONFIG = {
'max_sync_age': 82800,
'interface_addresses': [], # if set, the digitizer nics are assigned an ip manually during configure
'digital_filter': 0,
'snapshot_frequency': 10.0, # Frequency for queries of digitizer snapshot, disbled for 0 or negative values
'snapshot_frequency': -1., # Frequency for queries of digitizer snapshot, disabled for 0 or negative values
"output_data_streams":
{
"polarization_0" : # polarization_0 maps to v in packetizer nomenclatura
......@@ -234,7 +234,6 @@ class DigitizerControllerPipeline(EDDPipeline):
# We do not know the initial state of the packetizer before we take
# control, thus we will config on first try
self.__previous_config = None
self.__plotting = False
......@@ -256,6 +255,12 @@ class DigitizerControllerPipeline(EDDPipeline):
initial_status=Sensor.UNKNOWN)
self.add_sensor(self._level)
self._plotting = Sensor.boolean(
"plotting",
description="If True, the digpackclient will periodically query sensors and spectra from the digpack client. Toggled by start/stop plotting request.",
initial_status=False)
self.add_sensor(self._plotting)
......@@ -381,6 +386,7 @@ class DigitizerControllerPipeline(EDDPipeline):
katcp reply object
"""
self.stop_plotting()
req.reply("ok")
raise AsyncReply
......@@ -389,8 +395,9 @@ class DigitizerControllerPipeline(EDDPipeline):
"""
start plotting of snapshots
"""
if not self.__plotting:
self.__plotting = True
if not self._plotting.value():
self._plotting.set_value(True)
self._config['snapshot_frequency'] = 10.
yield self._client.enable_snapshot()
self.periodic_plot_snapshot()
......@@ -400,7 +407,7 @@ class DigitizerControllerPipeline(EDDPipeline):
"""
start plotting of snapshots
"""
self.__plotting = False
self._plotting.set_value(False)
@coroutine
......@@ -433,6 +440,7 @@ class DigitizerControllerPipeline(EDDPipeline):
except Exception as E:
log.exception(E)
self.add_sensor(s)
self.mass_inform(Message.inform('interface-changed'))
else:
log.debug("pass through sensors already added")
......@@ -440,7 +448,7 @@ class DigitizerControllerPipeline(EDDPipeline):
log.debug("Starting periodic plot")
with ProcessPoolExecutor(max_workers=1) as executor:
try:
while self.__plotting:
while self._plotting.value():
for key in passthrough:
sensor = self.get_sensor(key)
v = yield self.__pass_through_sensors[key].get_reading()
......@@ -448,7 +456,7 @@ class DigitizerControllerPipeline(EDDPipeline):
starttime = time.time()
data = yield self._client.get_snapshot()
plt = yield executor.submit(plot_script, data)
log.debug("Setting bandpass sensor with timestamp %", v.timestamp)
log.debug("Setting bandpass sensor with timestamp %f", v.timestamp)
self._bandpass.set_value(plt[0])
self._level.set_value(plt[1])
......
......@@ -172,6 +172,8 @@ class EDDPipeline(AsyncDeviceServer):
self._state = "idle"
self.previous_state = "unprovisioned"
self._sensors = []
self.__notes = set()
s = None
if port == 0:
log.debug("Get prot from kernel port range ... ")
......@@ -295,6 +297,47 @@ class EDDPipeline(AsyncDeviceServer):
initial_status=Sensor.NOMINAL)
self.add_sensor(self._log_level)
self._notes_sensor = Sensor.string(
"notes",
description="Text notifications to users",
default="",
initial_status=Sensor.NOMINAL)
self.add_sensor(self._notes_sensor)
def add_note(self, note):
    """
    Register a user-visible note. The notes sensor is refreshed only when
    the note was not already present (duplicates are ignored).
    """
    if note not in self.__notes:
        self.__notes.add(note)
        self.__updateNoteSensor()
def __updateNoteSensor(self):
    """Render the stored notes into the notes sensor value."""
    notes = self.__notes
    if len(notes) == 1:
        # A single note is shown verbatim, without bullet formatting.
        (msg,) = notes
    else:
        # Zero or multiple notes: one bullet line per note ("" when empty).
        msg = "\n".join(" * {}".format(n) for n in notes)
    self._notes_sensor.set_value(msg)
def flush_notes(self):
    """Discard all notes and reset the notes sensor to the empty string."""
    self.__notes = set()
    self.__updateNoteSensor()
def clear_note(self, note):
    """
    Discard a previously added note. The notes sensor is refreshed only if
    the note was actually present.
    """
    if note in self.__notes:
        self.__notes.remove(note)
        self.__updateNoteSensor()
@request(Str(default='INFO'))
@return_reply()
def request_set_log_level(self, req, level):
......@@ -778,6 +821,7 @@ class EDDPipeline(AsyncDeviceServer):
@coroutine
def deconfigure_wrapper():
try:
self.flush_notes()
yield self.deconfigure()
except Exception as error:
log.exception(str(error))
......
......@@ -427,6 +427,9 @@ class EDDHDF5WriterPipeline(EDDPipeline):
self._output_file = EDDHDFFileWriter(path=path, file_id_no=file_id)
self._current_file.set_value(self._output_file.filename)
else:
self._output_file.open()
if ("override_newsubscan" in config and config["override_newsubscan"]):
......@@ -450,6 +453,8 @@ class EDDHDF5WriterPipeline(EDDPipeline):
def measurement_stop(self):
_log.info("Stopping writing data to file")
self._output_file.flush()
self._output_file.close()
# ToDo: There probably should be an output queue so that time ordering
# is not becoming an issue
......
......@@ -3,8 +3,10 @@ from mpikat.effelsberg.edd.pipeline.EDDPipeline import EDDPipeline
from katcp import FailReply
import unittest
import logging
import tornado.testing
class TestEDDPipeline(unittest.TestCase):
class TestEDDPipeline(tornado.testing.AsyncTestCase):
@tornado.testing.gen_test(timeout=120)
def test_set(self):
pipeline = EDDPipeline("localhost", 1234, dict(foo=''))
pipeline.set({"foo":"bar"})
......@@ -13,6 +15,23 @@ class TestEDDPipeline(unittest.TestCase):
with self.assertRaises(FailReply) as cm:
yield pipeline.set({"bar":"foo"})
@tornado.testing.gen_test(timeout=120)
def testNotesSensor(self):
    """Check add_note deduplication and clear_note via the notes sensor value."""
    pipeline = EDDPipeline("localhost", 1234, dict(foo=''))
    # Sensor starts empty.
    self.assertEqual(pipeline._notes_sensor.value(), "")
    pipeline.add_note("FOO")
    self.assertEqual(pipeline._notes_sensor.value().count("FOO"), 1)
    # Adding the same note twice must not duplicate it.
    pipeline.add_note("FOO")
    self.assertEqual(pipeline._notes_sensor.value().count("FOO"), 1)
    # A second, distinct note appears alongside the first.
    pipeline.add_note("BAR")
    self.assertEqual(pipeline._notes_sensor.value().count("FOO"), 1)
    self.assertEqual(pipeline._notes_sensor.value().count("BAR"), 1)
    # Clearing one note leaves the other untouched.
    pipeline.clear_note("BAR")
    self.assertEqual(pipeline._notes_sensor.value().count("FOO"), 1)
    self.assertEqual(pipeline._notes_sensor.value().count("BAR"), 0)