Commit ea209bec authored by Tobias Winchen's avatar Tobias Winchen

Merge branch 'DigPackSensors' into 'dev'

Support for meerkat firmware + additional sensors for digpack client

See merge request !49
parents 7cb4ba09 e06d695d
Pipeline #94420 passed with stages
in 1 minute and 23 seconds
......@@ -6,6 +6,8 @@ from __future__ import print_function
import logging
import time
import numpy as np
from tornado.gen import coroutine, sleep, Return
from tornado.ioloop import IOLoop
from katcp import KATCPClientResource
......@@ -29,7 +31,6 @@ class PacketiserInterfaceError(Exception):
class DigitiserPacketiserClient(object):
def __init__(self, host, port=7147):
"""
Wraps katcp commands to control a digitiser/packetiser.
......@@ -55,9 +56,13 @@ class DigitiserPacketiserClient(object):
3500000000: ("virtex7_dk769b", "3.5GHz", 7),
3200000000: ("virtex7_dk769b", "3.2GHz", 9),
2600000000: ("virtex7_dk769b", "2.6GHz", 3),
2560000000: ("virtex7_dk769b", "2.56GHz", 2)
}
2560000000: ("virtex7_dk769b", "2.56GHz", 2),
1750000000: ("virtex7_dk769b_test146.mkt", "3.5GHz", 7) # This is a special mode for the meerkat digitial filter cores inside the edd.
# An effective 1750 Mhz sampling rate/ 875MHz
# bandwidth is achieved by digitial filtering of
# the 3.5GHz sampled rate!
} # This is quite hacky, and the design of this client has to be has to be improved. Possibly by ahving a client per firmware
self.__firmware = None
def stop(self):
self._client.stop()
......@@ -85,7 +90,7 @@ class DigitiserPacketiserClient(object):
@coroutine
def _check_interfaces(self):
def _check_interfaces(self, interfaces=['iface00', 'iface01']):
"""
Check if interface of digitizer is in error state.
"""
......@@ -104,8 +109,8 @@ class DigitiserPacketiserClient(object):
"40-GbE interface '{}' did not boot".format(name))
else:
_log.debug("Interface '{}' is healthy".format(name))
yield _check_interface('iface00')
yield _check_interface('iface01')
for iface in interfaces:
yield _check_interface(iface)
@coroutine
......@@ -166,10 +171,20 @@ class DigitiserPacketiserClient(object):
attempts = 0
while True:
_log.debug("Reinit packetizer with firmware: {}".format(args[0]))
response = yield self._safe_request("rxs_packetizer_system_reinit", *args)
self.__firmware = args[0]
yield sleep(20)
try:
yield self._check_interfaces()
_log.warning("Hard coded firmware names in interface checks. This is a shortterm hack!")
if args[0] == "virtex7_dk769b":
yield self._check_interfaces()
elif args[0] == "virtex7_dk769b_test146.mkt":
yield self._check_interfaces(["iface00"])
else:
RuntimeError("Unknown core")
except PacketiserInterfaceError as error:
if attempts >= retries:
raise error
......@@ -181,6 +196,15 @@ class DigitiserPacketiserClient(object):
break
@coroutine
def set_digitial_filter(self, filter_number):
"""
Sets the digital filter number.
"""
yield self._safe_request("rxs_packetizer_40g_filter_selection_set", filter_number)
@coroutine
def set_bit_width(self, nbits):
"""
......@@ -194,6 +218,10 @@ class DigitiserPacketiserClient(object):
10: "edd10",
12: "edd12"
}
_log.warning("Firmware switch for bit set mode!")
if self.__firmware == "virtex7_dk769b_test146.mkt":
_log.debug("Firmware does not support setting bit rate!")
return
try:
mode = valid_modes[int(nbits)]
except KeyError as error:
......@@ -370,6 +398,33 @@ class DigitiserPacketiserClient(object):
sync_epoch = float(response.informs[0].arguments[0])
raise Return(sync_epoch)
@coroutine
def get_snapshot(self):
"""
Returns dictionary with snapshot data from the packetizer.
"""
response = yield self._safe_request("rxs_packetizer_snapshot_get_spec")
res = {}
for message in response.informs:
key = message.arguments[0]
if 'header' in key:
res[key] = dict(
band_width = float(message.arguments[1]) * 1e3,
integration_time = float(message.arguments[2]),
num_channels = int(message.arguments[3]),
band_width_adc = float(message.arguments[4]),
spec_counter = int(message.arguments[5]),
timestamp = message.arguments[6])
elif 'adc' in key:
res[key] = np.fromstring(message.arguments[1], dtype=np.float32)
elif 'level' in key:
res[key] = np.fromstring(message.arguments[1], dtype=np.int32)
raise Return(res)
@coroutine
def synchronize(self, unix_time=None):
"""
......@@ -484,7 +539,7 @@ if __name__ == "__main__":
if args.interface_addresses:
actions.append((client.set_interface_address, dict(intf=0, ip=args.interface_addresses[0])))
actions.append((client.set_interface_address, dict(intf=1, ip=args.interface_addresses[1])))
if args.predecimation_factor:
actions.append((client.set_predecimation, dict(factor=args.predecimation_factor)))
......
......@@ -98,8 +98,13 @@ import mpikat.utils.ip_utils as ip_utils
from tornado.gen import coroutine, sleep, Return
from tornado.ioloop import IOLoop, PeriodicCallback
from katcp import Sensor, FailReply
from katcp import Sensor, FailReply, AsyncReply
from katcp.kattypes import (request, return_reply)
import numpy as np
from concurrent.futures import ProcessPoolExecutor
import time
import logging
import json
......@@ -122,6 +127,8 @@ _DEFAULT_CONFIG = {
'dummy_configure': False,
'max_sync_age': 82800,
'interface_addresses': [], # if set, the digitizer nics are assigned an ip manually during configure
'digital_filter': 0,
'snapshot_frequency': 10.0, # Frequency for queries of digitizer snapshot, disbled for 0 or negative values
"output_data_streams":
{
"polarization_0" : # polarization_0 maps to v in packetizer nomenclatura
......@@ -142,12 +149,80 @@ _DEFAULT_CONFIG = {
}
def plot_script(data):
import matplotlib as mpl
mpl.use('Agg')
import numpy as np
import pylab as plt
import astropy.time
import sys
if sys.version_info[0] >=3:
import io
else:
import cStringIO as io
io.BytesIO = io.StringIO
import base64
mpl.rcParams.update(mpl.rcParamsDefault)
mpl.use('Agg')
spectrum_figure = plt.figure()
spectrum = spectrum_figure.add_subplot(111)
level_figure = plt.figure()
level = level_figure.add_subplot(111)
for i in range(2):
k = 'adc{}'.format(i)
if k in data:
spectrum.plot(data[k], label='Pol {}'.format(i))
k = 'level{}'.format(i)
if k in data:
level.step(np.arange(0, len(data[k])), data[k], label='Pol {}'.format(i))
spectrum.legend()
spectrum.set_xlabel('Channel')
spectrum.set_ylabel('Power [dB]')
level.legend()
level.set_xticklabels([])
level.set_yticklabels([])
level.set_xlabel('V [a.u.]')
level.set_ylabel('Samples / V')
def get_b64(fig):
"""
Encodes figure as b64 encoded png bytes object.
"""
fig_buffer = io.BytesIO()
fig.savefig(fig_buffer, format='png')
fig_buffer.seek(0)
b64 = base64.b64encode(fig_buffer.read())
return b64
result = []
for f in [spectrum_figure, level_figure]:
f.tight_layout()
f.suptitle(data['header']['timestamp'])
result.append(get_b64(f))
plt.close(f)
return result
class DigitizerControllerPipeline(EDDPipeline):
"""@brief gated spectrometer pipeline
"""
VERSION_INFO = ("mpikat-edd-api", 0, 1)
BUILD_INFO = ("mpikat-edd-implementation", 0, 1, "rc1")
# list of sensors that are passed through
def __init__(self, ip, port, device_ip, device_port=7147):
"""@brief initialize the pipeline.
@param device is the control ip of the board
......@@ -159,6 +234,8 @@ class DigitizerControllerPipeline(EDDPipeline):
# We do not know the initial state of the packetizr before we take
# control, thus we will config on first try
self.__previous_config = None
self.__plotting = False
def setup_sensors(self):
......@@ -167,6 +244,21 @@ class DigitizerControllerPipeline(EDDPipeline):
"""
EDDPipeline.setup_sensors(self)
self._bandpass = Sensor.string(
"bandpass_PNG",
description="band-pass data (base64 encoded)",
initial_status=Sensor.UNKNOWN)
self.add_sensor(self._bandpass)
self._level = Sensor.string(
"level_PNG",
description="ADC Level (base64 encoded)",
initial_status=Sensor.UNKNOWN)
self.add_sensor(self._level)
# def check_config(self, cfg):
# errors = []
# if not ip_utils.is_valid_multicast_range(*ip_utils.split_ipstring(cfg["output_data_streams"]["polarization_0"]["ip"])):
......@@ -206,6 +298,11 @@ class DigitizerControllerPipeline(EDDPipeline):
sync_age = time.time() - (yield self._client.get_sync_time())
if self._config["snapshot_frequency"] > 0:
self.start_plotting()
else:
self.stop_plotting()
if self._config['force_reconfigure'] or self.__previous_config != self._config or sync_age > self._config["max_sync_age"]:
log.debug("Reconfiguring packetizer - Config changed: {}; Sync_age : {}; Forced: {}".format(self.__previous_config != self._config, sync_age, self._config['force_reconfigure']))
......@@ -223,6 +320,8 @@ class DigitizerControllerPipeline(EDDPipeline):
yield self._client.flip_spectrum(self._config["flip_spectrum"])
yield self._client.set_bit_width(self._config["bit_depth"])
yield self._client.set_digitial_filter(self._config['digital_filter'])
for i, ip_address in enumerate(self._config["interface_addresses"]):
yield self._client.set_interface_address(i, ip_address)
yield self._client.set_destinations(vips, hips)
......@@ -231,6 +330,7 @@ class DigitizerControllerPipeline(EDDPipeline):
else:
yield self._client.synchronize()
log.debug("Update output data streams")
sync_time = yield self._client.get_sync_time()
......@@ -246,7 +346,6 @@ class DigitizerControllerPipeline(EDDPipeline):
yield self._client.set_noise_diode_firing_pattern(**self._config["noise_diode_pattern"])
@state_change(target="streaming", allowed=["configured"], intermediate="capture_starting")
@coroutine
def capture_start(self, config_json=""):
......@@ -257,6 +356,116 @@ class DigitizerControllerPipeline(EDDPipeline):
yield self._client.capture_start()
@request()
@return_reply()
def request_start_plotting(self, req):
"""
Starts periodic plotting of snapshots.
Return:
katcp reply object
"""
log.debug("Received start plot request")
req.reply("ok")
self.ioloop.add_callback(self.start_plotting)
raise AsyncReply
@request()
@return_reply()
def request_stop_plotting(self, req):
"""
Stop periodic plotting of snapshots.
Return:
katcp reply object
"""
self.stop_plotting()
raise AsyncReply
@coroutine
def start_plotting(self):
"""
start plotting of snapshots
"""
if not self.__plotting:
self.__plotting = True
self.periodic_plot_snapshot()
@coroutine
def stop_plotting(self):
"""
start plotting of snapshots
"""
self.__plotting = False
@coroutine
def periodic_plot_snapshot(self):
log.debug("Start periodic plot")
passthrough = ["rxs.packetizer.systemstatus.snapshot.totalpwr.h", "rxs.packetizer.systemstatus.snapshot.totalpwr.v", "rxs.packetizer.1pps.length", "rxs.packetizer.1pps.last", "s-band.center-frequency", "s-band.bandwidth", "s-band.noise-diode", "s-band.sampling-rate", "s-band.time.synchronisation-epoch", "rxs.packetizer.1pps.length", "rxs.packetizer.40g.selected-filter", "rxs.packetizer.device-status"]
self.__pass_through_sensors = {}
if not self.has_sensor(passthrough[0]):
# Checking first is enough
log.debug("Adding pass-through sensors")
yield self._client._client.until_synced()
log.debug("Client synced through sensors")
sensor_list = yield self._client._client.list_sensors()
for st in sensor_list:
if st.name not in passthrough:
log.debug(" - ignoring {}".format(st[1]))
continue
log.debug(" - adding {}".format(st))
try:
if st.type == "discrete":
t = "string"
else:
t = st.type
s = Sensor(Sensor.parse_type(t), st.name, st.description, st.units)
self.__pass_through_sensors[st.name] = st.object
except Exception as E:
log.exception(E)
self.add_sensor(s)
else:
log.debug("pass through sensors already added")
log.debug("Starting periodic plot")
with ProcessPoolExecutor(max_workers=1) as executor:
try:
while self.__plotting:
for key in passthrough:
sensor = self.get_sensor(key)
v = yield self.__pass_through_sensors[key].get_reading()
sensor.set_value(v.value, timestamp=v.timestamp)
starttime = time.time()
data = yield self._client.get_snapshot()
plt = yield executor.submit(plot_script, data)
log.debug("Setting bandpass sensor with timestamp")
self._bandpass.set_value(plt[0])
self._level.set_value(plt[1])
duration = time.time() - starttime
log.debug("Plot duration: {} s".format(duration))
if duration > self._config['snapshot_frequency']:
log.warning("Plot duration {} larger than plto interval!".format(duration, self._config["nplot"]))
else:
yield sleep(self._config['snapshot_frequency'] - duration)
except Exception as E:
log.error("Error in periodic plot. Abandon plotting.")
log.exception(E)
@coroutine
def measurement_prepare(self, config_json=""):
"""@brief Set quantization factor and fft_shift parameter"""
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment