diff --git a/mpikat/core/data_stream.py b/mpikat/core/data_stream.py index d2d39e3ef5d5a5c15e0b1cc1df1aab7d828af913..19418fc0758086bace4b07b0038d1dc92b3f1d85 100644 --- a/mpikat/core/data_stream.py +++ b/mpikat/core/data_stream.py @@ -7,32 +7,31 @@ import mpikat.core.logger _log = mpikat.core.logger.getLogger("mpikat.core.data_stream") -def convert48_64(A): +def convert48_64(arr: list) -> int: """ Converts 48 bit to 64 bit integers. Args: - A: array of 6 bytes. + arr (list): array of 6 bytes. Returns: - int + 64bit integer representation of the 48 bit list """ - assert len(A) == 6 - npd = np.array(A, dtype=np.uint64) # pylint: disable=no-member + assert len(arr) == 6 + npd = np.array(arr, dtype=np.uint64) # pylint: disable=no-member return np.sum(256**np.arange(0, 6, dtype=np.uint64)[::-1] * npd) # pylint: disable=no-member -def convert64_48(N): +def convert64_48(num: int) -> np.ndarray: """ Converts 64 bit number to 48 bit number Args: - N: 64-bit number + num (int): 64-bit number Returns: array of lower 6 bytes - """ - return np.frombuffer(N.to_bytes(6, byteorder='big'), dtype=np.uint8) + return np.frombuffer(num.to_bytes(6, byteorder='big'), dtype=np.uint8) def edd_spead(eid, name, description="", shape=(6,), dtype='>u1'): @@ -95,7 +94,7 @@ class DataStream(ABC): stream_meta = {} @classmethod - def get_data_item(cls, **kwargs): + def get_data_item(cls, **kwargs) -> spead2.Item: """ Get a data item by id, name or description via .get_data_item(id=1234) or .get_data_item(name='foo') """ @@ -109,7 +108,7 @@ class DataStream(ABC): raise ValueError(f'No item with {k} = {v}') @classmethod - def ig_update(cls, items, ig): # pylint: disable=unused-argument + def ig_update(cls, items, ig) -> bool: # pylint: disable=unused-argument """ Update the item group depending on the received items. Useful to dynamically modify the group based on the first received item. @@ -120,14 +119,14 @@ class DataStream(ABC): return False @classmethod - def add_item2group(cls, item, item_group): + def add_item2group(cls, item: spead2.Item, item_group: spead2.ItemGroup): """ Add a item to the item-group. """ item_group.add_item(item.id, item.name, item.description, item.shape, item.dtype) @classmethod - def create_item_group(cls): + def create_item_group(cls) -> spead2.ItemGroup: """ Create a spead2 item group for the format. """ @@ -136,11 +135,10 @@ class DataStream(ABC): for d in cls.data_items: if d.shape: cls.add_item2group(d, ig) - return ig @classmethod - def validate_items(cls, items): + def validate_items(cls, items: spead2.ItemGroup) -> bool: """ Validate that the received spead2 items match the item group. @@ -152,6 +150,7 @@ class DataStream(ABC): for i in cls.data_items: if i.name not in items: missing_keys.append(i.name) - _log.warning("Received invalid heap, containing only %i/%i keys. Missign keys: %s.", len(items), len(cls.data_items), ",".join(missing_keys)) + _log.warning("Received invalid heap, containing only %i/%i keys. Missign keys: %s.", + len(items), len(cls.data_items), ",".join(missing_keys)) return False return True diff --git a/mpikat/core/skarab_client.py b/mpikat/core/skarab_client.py deleted file mode 100644 index 5e25a83acac2e972123037d2bdea63f6dedb8851..0000000000000000000000000000000000000000 --- a/mpikat/core/skarab_client.py +++ /dev/null @@ -1,366 +0,0 @@ -from argparse import ArgumentParser - -from tornado.gen import coroutine, sleep -from tornado.platform.asyncio import AsyncIOMainLoop -from tornado.ioloop import IOLoop -import casperfpga - -import mpikat.core.logger -from mpikat.utils.ip_utils import split_ipstring, is_valid_multicast_range - -DEFAULTFIRMWARE="s_ubb_64ch_codd_2020-07-31_1348.fpg" - -log = mpikat.core.logger.getLogger("mpikat.core.edd_skarab_client") - -class SkarabInterface(object): - """ - Interface to skarab FPGAs via casperfpga but with asyncio. - """ - def __init__(self, ip, port, firmwarefile=None): - """Skarab instance") - - ip and port of the FPGA board to connect to. - """ - log.debug("Starting Skarab instance") - self.__ip = ip - self.__port = port - self._firmwarefile = firmwarefile - self._client = None - - def setFirmware(self, firmwarefile): - self._firmwarefile = firmwarefile - - @coroutine - def connect(self): - """ - Connects to a client. Raises runtime error after timeout. - """ - log.debug("Connecting to FPGA {}:{}".format(self.__ip, self.__port)) - self._client = casperfpga.CasperFpga(self.__ip, self.__port) - info = self._client.transport.get_skarab_version_info() - log.debug("Succcessfully Connected to FPGA - Retrieved Skarab info:" + "\n".join([" - {}: {}".format(k,v) for k,v in info.items()])) - - - def is_connected(self): - """ - Returns true if connection to skarab exists and is active - """ - if self._client: - return self._client.is_connected() - else: - return False - - @coroutine - def program(self): - """ - Programs FPGA with the chosen firmware - """ - log.debug("Loading firmware from {} ... ". format(self._firmwarefile)) - - #log.warning("NOT LOADING FIRMWARE DUE TO (Hard-Coded) TEST MODE") - #res = True - res = self._client.transport.upload_to_ram_and_program(self._firmwarefile) - if not res: - raise RuntimeError("Error loading firmware: {}. Result: {}".format(self._firmwarefile, res)) - log.debug("Done loading firmware {}". format(self._firmwarefile)) - - - @coroutine - def initialize(self): - """ - Connect to FPGA and try to read system information. Reprograms FPGA if this fails. - """ - yield self.connect() - try: - self._client.get_system_information(self._firmwarefile) - except: - log.error('Error getting system information for firmware {} - reprogramming '.format(self._firmwarefile)) - yield self.connect() - yield self.program() - - self._client.get_system_information(self._firmwarefile) - - - -class SkarabChannelizerClient(SkarabInterface): - """ - Client accessingd a Skarab wih channelizers firmware [REFERENCE TO PROEJCT HERE] - - The interface wraps all register oprations in high-level function calls, - so that the user can be mostly agnostic about the inner workings on the - firmware. - """ - def __init__(self, ip, port, firmwarefile="s_ubb_64ch_codd_2020-07-31_1348.fpg"): - """ - @brief Creates client, does NOT connect - - @param ip - @param port - @param firmwarefile - """ - SkarabInterface.__init__(self, ip, port, firmwarefile) - - - @coroutine - def configure_inputs(self, ips0, ips1, port=None): - """ - Does multicast group subscription. ip0, ip1 are of format - 225.0.0.152+3 to give .152, .153, .154, .155 - """ - rx_core0_name = 'gbe0' - rx_core1_name = 'gbe1' - log.debug('Subscribing to multicast groups ...') - - ip0, N0, p0 = split_ipstring(ips0) - ip1, N1, p1 = split_ipstring(ips1) - if not is_valid_multicast_range(ip0, N0, p0): - raise RuntimeError("Invalid multicast range {}".format(ips0)) - if not is_valid_multicast_range(ip1, N1, p1): - raise RuntimeError("Invalid multicast range {}".format(ips1)) - if not N0 == N1: - raise RuntimeError("Multicast range of both interfaces have to match {} {}".format(ips0, ips1)) - if not port and p1: - port = p1 - - - log.debug(" - set IGMP version to 2") - self._client.set_igmp_version("2") - - mask = casperfpga.gbe.IpAddress("255.255.255.252") - ip0 = casperfpga.gbe.IpAddress(ip0) - - log.debug(" - configure gbe0 to {} with mask {}".format(ip0.ip_str, mask.ip_str)) - self._client.transport.multicast_receive("gbe0", ip0, mask, 1) - self._client.gbes.gbe0.set_port(port) - - ip1 = casperfpga.gbe.IpAddress(ip1) - log.debug(" - configure gbe1 to {} with mask {}".format(ip1.ip_str, mask.ip_str)) - self._client.transport.multicast_receive("gbe1", ip1, mask, 2) - self._client.gbes.gbe1.set_port(port) - - - @coroutine - def generate_snapshots(self, size=4096): - """ - @brief Generates ADC snapshot data - - @params size - """ - log.debug('Getting ADC snapshot') - - self._client.registers.control.write(adc_snap_arm='pulse') - self._client.write_int('adc0_ctrl', 0x00) - self._client.write_int('adc0_ctrl', 0x01) - self._client.write_int('adc1_ctrl', 0x00) - self._client.write_int('adc1_ctrl', 0x01) - - fmt = '>{}b'.format(size) - log.debug(' byte format: {}'.format(fmt)) - - _adc0_snap = np.asarray(struct.unpack(fmt, self._client.read('adc0_bram', size))) - _adc1_snap = np.asarray(struct.unpack(fmt, self._client.read('adc1_bram', size))) - - timestamp = fpga.read_int('adc0_val') - if timestamp < 0: - timestamp = timestamp + 2**32 - log.debug(' timestamp: {}'.format(timestamp)) - - return (timestamp, _adc0_snap, _adc1_snap) - - - @coroutine - def configure_output(self, dest_ip, dest_port, number_of_groups=8, channels_per_group=8, board_id=0x07): - """ - @brief Configur skarab output - - @param dest_ip - @param dest_port - @param number_of_groups - @param channels_per_group - @param board_id Board ID [0 .. 255] inserted into the spead header - """ - log.debug('Configuring ouput') - log.debug(" - Staring ip: {}".format(dest_ip)) - log.debug(" - Port: {}".format(dest_port)) - ip = casperfpga.gbe.IpAddress(dest_ip) - log.debug(" - Packed ip: {}".format(ip.ip_int)) - self._client.write_int('iptx_base', ip.ip_int) - - # 3 hex numbers of one byte, defining number of frequency channels in each ouput MC, - # numper of output MC, and a magic number 8 - log.debug(" - Number of groups: {}".format(number_of_groups)) - log.debug(" - Channels per group: {}".format(channels_per_group)) - x_setup = (channels_per_group << 16) + (number_of_groups << 8) + 8 - log.debug(" - x_setup: {}".format(hex(x_setup))) - - self._client.write_int('x_setup', x_setup) - - # adjusting for leap mode setup, two fpga registers need to be set. - if channels_per_group == 1: - log.debug("Setting up for 64 mc LEAP mode, setting new x_setup and tx_pktsize_bytes") - log.debug(" - x_setup: {}".format(0x01080804)) - log.debug(" - tx_pktsize_bytes: {}".format(8000)) - - self._client.write_int('x_setup', 0x01080804) - self._client.write_int('tx_pktsize_bytes', 8000) - - # tx_meta consists of 24 bits for the port and on byte for the board_id - # tx_meta = 0xEA6107 :: PORT=EA61 (60001), BOARDID=07 - tx_meta = ((dest_port << 8) + board_id) - self._client.write_int('tx_metadata',tx_meta) - log.debug(" - Board id: {}".format(hex(board_id))) - log.debug(" - tx_meta: {}".format(hex(tx_meta))) - - - @coroutine - def capture_start(self): - log.debug("Starting capture ...") - yield sleep(1) - self.reset_registers() - self._client.registers.control.write(gbe_txen=True) - msw = self._client.read_int('sync_time_msw') - lsw = self._client.read_int('sync_time_lsw') - lock_timestamp = lsw + (msw << 32) - log.info("SKARAB locked to {}".format(lock_timestamp)) - - - @coroutine - def capture_stop(self): - log.debug("Stopping capture ...") - self._client.registers.control.write(gbe_txen=False) - - - @coroutine - def get_fpga_clock(self): - clk = round(self._client.estimate_fpga_clock()) - log.debug("FPGA running at {}".format(clk)) - return clk - - - @coroutine - def read_output_config(self): - """ - Reads destination IP and port from the firmware register - """ - log.debug("Rading dstination ip:") - d_ip = self._client.read_int('iptx_base') + 2**32 # Convert to unsigned int - log.debug(" - packed ip: {}".format(d_ip)) - ip = casperfpga.gbe.IpAddress(d_ip) - - tx_meta = self._client.read_int('tx_metadata') - log.debug(" - tx_meta: {}".format(hex(tx_meta))) - dest_port = (tx_meta >> 8) & 0xFFFF - board_id = tx_meta & 0xFF - return (ip.ip_str, dest_port, board_id) - - - @coroutine - def configure_quantization_factor(self, quant_factor): - """ - Writes quantization factor value in the firmware register - """ - self._client.write_int('quant_coeff',quant_factor) - yield sleep(0.5) - quant_val = self._client.read_int('quant_coeff') - if quant_factor != quant_val: - raise RuntimeError("Error setting quantization factor {}".format(hex(quant_factor))) - - - @coroutine - def configure_fft_shift(self, fft_shift): - """ - Writes FFT Shift - """ - self._client.write_int('fft_shift',fft_shift) - yield sleep(0.5) - ffs = self._client.read_int('fft_shift') - if ffs != fft_shift: - raise RuntimeError("Error setting fft_shift {}".format(hex(ffs))) - - @coroutine - def lock_timestamp(self): - """ - Reset all internal registers to intial state. - """ - self._client.write_int("lock_timestamp", 256000000) - - - @coroutine - def reset_registers(self): - """ - Reset all internal registers to intial state. - """ - self._client.write_int("control",0) # gbe disable - self._client.registers.control.write(auto_rst_enable=True) - self._client.registers.control.write(sys_rst='pulse') #256: sys_rst, 511 mrst pulse - - -if __name__ == "__main__": - parser = ArgumentParser(description="Configures skarab board") - parser.add_argument('host', type=str, - help='IP of Skarab board to bind to') - parser.add_argument('-p', '--port', dest='port', type=int, - help='Port to bind to (default=7147)', default=7147) - - parser.add_argument('--firmwarefile', dest='firmwarefile', type=str, default=DEFAULTFIRMWARE, - help='firmwarefile to load') - parser.add_argument('--configure-inputs', dest='inputs', type=str, - help='input multicast of format 225.0.0.152+3:7148,225.0.0.156+3:7148') - - parser.add_argument('--configure-outputs', dest='outputs', type=str, - help='Ouput destinations format baseip:port, e.g. 239.0.0.111+7:7148') - - parser.add_argument('--log-level',dest='log_level',type=str, - help='Logging level',default="INFO") - - parser.add_argument('--set-quantization', dest='quantization', - help='Sets quanization factor') - - parser.add_argument('--set-fftshift', dest='fftshift', - help='Sets fft shift') - - parser.add_argument('--program', action="store_true", default=False, help="Programs he FPGA with the given firmware") - args = parser.parse_args() - - print("Configuring skarab{}:{} wih firmware {}".format(args.host, args.port, args.firmwarefile)) - client = SkarabChannelizerClient(args.host, args.port, args.firmwarefile) - - mpikat.core.logger.setLevel(args.log_level.upper()) - - actions = [(client.initialize, {})] - if args.program: - actions.append((client.program, {})) - - if args.outputs: - ip, N, port = split_ipstring(args.outputs) - actions.append((client.configure_output, dict(dest_ip=ip, dest_port=port, number_of_groups=N))) - if args.inputs: - inp0, inp1 = args.inputs.split(',') - actions.append((client.configure_inputs, dict(ips0=inp0, ips1=inp1))) - - if args.quantization: - if args.quantization.startswith("0x"): - v = int(args.quantization, 16) - else: - v = int(args.quantization) - actions.append((client.configure_quantization_factor, dict(quant_factor=v))) - if args.fftshift: - if args.fftshift.startswith("0x"): - v = int(args.fftshift, 16) - else: - v = int(args.fftshift) - - actions.append((client.configure_fft_shift, dict(fft_shift=v))) - - if args.program or args.outputs or args.inputs: - actions.append((client.capture_start, {})) - - @coroutine - def perform_actions(): - for action, params in actions: - yield action(**params) - AsyncIOMainLoop().install() - ioloop = IOLoop.current() - - ioloop.run_sync(perform_actions) - diff --git a/mpikat/core/telescope_meta_information_server.py b/mpikat/core/telescope_meta_information_server.py index 0df88103caf7ba6eddc4a7851c348eec592ce949..15ae4bdc8030a55ea785d74b3f5f5086a4dbb6cc 100644 --- a/mpikat/core/telescope_meta_information_server.py +++ b/mpikat/core/telescope_meta_information_server.py @@ -1,5 +1,5 @@ """ -Telescope meta information server. Interface to telescope control systems, that +Telescope meta information (TMI) server. Interface to telescope control systems, that passively collects information about the telescope status and updates the dat store accordingly. Has to be sub-classed for every telescope. """ diff --git a/mpikat/pipelines/fits_interface.py b/mpikat/pipelines/fits_interface.py index 581b1bd0afe1a4876e03e84ada33fc405c49020f..6ea4050c8e181abd48b7e8d29392d4af1dd56a3e 100644 --- a/mpikat/pipelines/fits_interface.py +++ b/mpikat/pipelines/fits_interface.py @@ -3,7 +3,6 @@ Pipeline to send spectrometer data to the Effelsberg fits writer (APEX Fits writ """ import asyncio import time -import ctypes import copy import json from datetime import datetime @@ -18,7 +17,8 @@ from mpikat.core import logger from mpikat.utils.spead_capture import SpeadCapture, GatedSpectrometerSpeadHandler from mpikat.utils.sensor_watchdog import conditional_update import mpikat.utils.numa as numa -from mpikat.utils.fits import FitsSender +from mpikat.utils import fits + _log = logger.getLogger("mpikat.pipelines.fits_interface") @@ -93,6 +93,7 @@ _DEFAULT_CONFIG = { "nplot": 10., # update plot every 10 s "max_snapshots": 1024 }, + "dummy_input": False, "enable_plotting":True, "spead_config":copy.deepcopy(SpeadCapture.DEFAULT_CONFIG), "max_package_age":10, @@ -206,7 +207,7 @@ class FitsInterfaceServer(EDDPipeline): self._fits_sender.stop() self._fits_sender.join() - self._fits_sender = FitsSender( + self._fits_sender = fits.Sender( self._config["fits_writer_ip"], self._config["fits_writer_port"], keep_socket_alive=self._config["keep_socket_alive"]) @@ -223,17 +224,13 @@ class FitsInterfaceServer(EDDPipeline): """ _log.debug("Starting FITS interface capture") - spead_handler = GatedSpectrometerSpeadHandler() - merger = FitsPacketMerger(self._fits_sender, self.data_plotter, len(self._config["input_data_streams"]) // 2, self._config["noise_time_delta"], self._config['max_package_age'], self._config['drop_nans']) self._capture_thread = SpeadCapture(self._config["input_data_streams"], - spead_handler, - merger, - self._config['spead_config']) + GatedSpectrometerSpeadHandler(), merger, self._config['spead_config']) self._capture_thread.start() @@ -275,7 +272,7 @@ class FitsInterfaceServer(EDDPipeline): async def capture_stop(self): """Stops the capturing and plotting if enabled """ - _log("Capture stopping FitsInterface") + _log.info("Capture stopping FitsInterface") self._stop_capturing() @@ -283,7 +280,7 @@ class FitsInterfaceServer(EDDPipeline): async def deconfigure(self): """Deconfigures the pipeline """ - _log("Deconfiguring FitsInterface") + _log.info("Deconfiguring FitsInterface") self._stop_capturing() @@ -348,81 +345,13 @@ class FitsInterfaceServer(EDDPipeline): await super().stop() -def fw_factory(nsections, nchannels, data_type="EEEI", channel_data_type="F "): - """ - Creates APEX fits writer compatible packages. - - Args: - nsections (int): Number of sections. - nchannels (int): Number of channels per section. - data_type (str): Type of data - Only 'EEEI' is supported right now. - channel_data_type (str): Type of channel data - Only 'F ' is supported right now. - - References: - .. [HAFOK_2018] ]H. Hafok, D. Muders and M. Olberg, 2018, APEX SCPI socket command syntax and backend data stream format, - APEX-MPI-ICD-0005 https://www.mpifr-bonn.mpg.de/5274578/APEX-MPI-ICD-0005-R1_1.pdf - """ - class FWData(ctypes.LittleEndianStructure): # pylint: disable=too-few-public-methods - """ - Container for spectrum data that will be written to socket. - """ - _pack_ = 1 - _fields_ = [ - ('section_id', ctypes.c_uint32), - ('nchannels', ctypes.c_uint32), - ('data', ctypes.c_float * nchannels) - ] - - class FWPacket(ctypes.LittleEndianStructure): # pylint: disable=too-few-public-methods,too-many-instance-attributes - """ - Struct that will be written to socket. - """ - _pack_ = 1 - _fields_ = [ - ("data_type", ctypes.c_char * 4), # 0 - 3 - ("channel_data_type", ctypes.c_char * 4), # 4 - 7 - ("packet_size", ctypes.c_uint32), # 8 - 11 - ("backend_name", ctypes.c_char * 8), # 12 - 19 - ("timestamp", ctypes.c_char * 28), # 20 - 48 - ("integration_time", ctypes.c_uint32), # 49 - 53 - ("blank_phases", ctypes.c_uint32), # 54 - 57 alternate between 1 and 2 cal on or off - ("nsections", ctypes.c_uint32), # 57 - 61 number of sections - ("blocking_factor", ctypes.c_uint32), # 61-63 spectra per section ? - ("sections", FWData * nsections) # actual section data - ] - - _log.trace("Creating new FW packet") - packet = FWPacket() - - packet.packet_size = ctypes.sizeof(packet) # pylint: disable=attribute-defined-outside-init - - if channel_data_type != "F ": - raise NotADirectoryError("cannot handle backend data format {}".format(channel_data_type)) # pylint: disable=undefined-variable - packet.channel_data_type = channel_data_type.encode('ascii') # pylint: disable=attribute-defined-outside-init - - if data_type != "EEEI": - raise NotADirectoryError("cannot handle numerical encoding standard {}".format(data_type)) # pylint: disable=undefined-variable - packet.data_type = data_type.encode('ascii') # pylint: disable=attribute-defined-outside-init - - packet.nsections = nsections # pylint: disable=attribute-defined-outside-init - - # Hard coded for effelsberg ?? - packet.blocking_factor = 1 # pylint: disable=attribute-defined-outside-init - for ii in range(nsections): - packet.sections[ii].section_id = ii + 1 # pylint: disable=attribute-defined-outside-init - packet.sections[ii].nchannels = nchannels # pylint: disable=attribute-defined-outside-init - - return packet - - - class PrepPack: # pylint: disable=too-few-public-methods """ Package in preparation. """ def __init__(self, number_of_spectra, nchannels, reference_time): self._nspectra = number_of_spectra - self.fw_pkt = fw_factory(number_of_spectra, nchannels) # Actual package for fits writer + self.fits_packet = fits.create_packet(nsections=number_of_spectra, nchannels=nchannels) # Actual package for fits writer self.counter = 0 # Counts spectra set in fw_pkg self.valid = True # Invalid package will not be send to fits writer self.number_of_input_samples = 0 @@ -447,12 +376,10 @@ class PrepPack: # pylint: disable=too-few-public-methods _log.debug("Invalidate package due to NaN detected") self.valid = False #ToDo: calculation of offsets without magic numbers - ctypes.memmove(ctypes.byref(self.fw_pkt.sections[int(sec_id)].data), - packet.data.ctypes.data + 4, packet.data.size * 4 - 4) + self.fits_packet.section(int(sec_id)).data = packet.data[1:-1] if self.counter == self._nspectra: # Fill header fields + output data - _log.debug("Got all parts for reference_time {:.3f} - Finalizing".format(packet.reference_time)) def local_to_utc(t): """Convert timestamp to datetimestring in UTC""" @@ -464,22 +391,23 @@ class PrepPack: # pylint: disable=too-few-public-methods # Blank should be at the end according to specification timestamp += ".{:04d}UTC ".format(int((float(packet.reference_time) - int(packet.reference_time)) * 10000)) - self.fw_pkt.timestamp = timestamp.encode('ascii') # pylint: disable=attribute-defined-outside-init - _log.debug(" Calculated timestamp for fits: {}".format(self.fw_pkt.timestamp)) + self.fits_packet.header.timestamp = timestamp.encode('ascii') # pylint: disable=attribute-defined-outside-init integration_time = packet.number_of_input_samples / float(packet.sampling_rate) - _log.debug(" Integration period: {} s".format(packet.integration_period)) - _log.debug(" Received samples in period: {}".format(packet.number_of_input_samples)) - _log.debug(" Integration time: {} s".format(integration_time)) - self.fw_pkt.backend_name = b"EDDSPEAD" # pylint: disable=attribute-defined-outside-init + self.fits_packet.header.backend_name = b"EDDSPEAD" # pylint: disable=attribute-defined-outside-init # As the data is normalized to the actual nyumber of samples, the # integration time for the fits writer corresponds to the nominal # time. - self.fw_pkt.integration_time = int(packet.integration_period * 1000000) # in ms pylint: disable=attribute-defined-outside-init + self.fits_packet.header.integration_time = int(packet.integration_period * 1000000) # in ms pylint: disable=attribute-defined-outside-init - self.fw_pkt.blank_phases = int(2 - packet.noise_diode_status) # pylint: disable=attribute-defined-outside-init - _log.debug(" Noise diode status: {}, Blank phase: {}".format(packet.noise_diode_status, self.fw_pkt.blank_phases)) + self.fits_packet.header.blank_phases = int(2 - packet.noise_diode_status) # pylint: disable=attribute-defined-outside-init + _log.debug("Got all parts for reference_time {:.3f} - Finalizing".format(packet.reference_time)) + _log.debug(" Calculated timestamp for fits: %s", self.fits_packet.header.timestamp) + _log.debug(" Integration period: %d s", packet.integration_period) + _log.debug(" Received samples in period: %d", packet.number_of_input_samples) + _log.debug(" Integration time: %d s", integration_time) + _log.debug(" Noise diode status: %s, Blank phase: %d", packet.noise_diode_status, self.fits_packet.header.blank_phases) class FitsPacketMerger: @@ -487,13 +415,13 @@ class FitsPacketMerger: Merge spectra to a package expected by the fits writer and pass completed packages to the outptu queue. Inclompelte pacakges above a max age will be dropped. """ - def __init__(self, fits_interface: FitsSender, data_plotter: DataPlotter, + def __init__(self, fits_interface: fits.Sender, data_plotter: DataPlotter, spectra_per_package: int, noise_time_delta: float, max_age: int=10, drop_invalid_packages: bool=True): """_summary_ Args: - fits_interface (FitsSender): _description_ + fits_interface (fits.Sender): _description_ data_plotter (DataPlotter): _description_ spectra_per_package (int): _description_ noise_time_delta (float): _description_ @@ -572,10 +500,9 @@ class FitsPacketMerger: """ # Cleanup old packages tooold_packages = [] - _log.debug('Checking {} packages for age restriction'.format(len(self.__packages_in_preparation))) + _log.debug('Checking %d packages for age restriction', len(self.__packages_in_preparation)) for p in self.__packages_in_preparation: age = self.__now - p - #_log.debug(" Package with timestamp {}: now: {} age: {}".format(p, self.__now, age) ) if age > self.__max_age: _log.warning(" Age of package {} exceeded maximum age {} - Incomplete package will be dropped.".format(age, self.__max_age)) tooold_packages.append(p) diff --git a/mpikat/pipelines/gated_dual_pol_spectrometer.py b/mpikat/pipelines/gated_dual_pol_spectrometer.py index 77902765d56bb1438a06ca428a154f82dac7d6ec..57673763b297c63d678d3a41f59f540fdae0e6c0 100644 --- a/mpikat/pipelines/gated_dual_pol_spectrometer.py +++ b/mpikat/pipelines/gated_dual_pol_spectrometer.py @@ -103,7 +103,7 @@ import mpikat.utils.dada_tools as dada from mpikat.core.edd_pipeline_aio import EDDPipeline, launchPipelineServer, state_change import mpikat.pipelines.digpack_controller # pylint: disable=unused-import -from mpikat.pipelines.gated_spectrometer.gated_spectrometer_data_stream import GatedSpectrometerDataStreamFormat +from mpikat.pipelines.gated_spectrometer.data_stream import GatedSpectrometerDataStreamFormat import mpikat.core.logger diff --git a/mpikat/pipelines/gated_full_stokes_spectrometer.py b/mpikat/pipelines/gated_full_stokes_spectrometer.py index 0275260e81fc4c99d0a47fd692536a22c1e90c6d..fe06029b42575d35a54b6b24ff620441b8edfa7f 100644 --- a/mpikat/pipelines/gated_full_stokes_spectrometer.py +++ b/mpikat/pipelines/gated_full_stokes_spectrometer.py @@ -93,7 +93,7 @@ import mpikat.utils.dada_tools as dada import mpikat.utils.numa as numa import mpikat.pipelines.digpack_controller # pylint: disable=unused-import -from mpikat.pipelines.gated_spectrometer.gated_spectrometer_data_stream import GatedSpectrometerDataStreamFormat +from mpikat.pipelines.gated_spectrometer.data_stream import GatedSpectrometerDataStreamFormat import mpikat.core.logger diff --git a/mpikat/pipelines/gated_spectrometer/gated_spectrometer_data_stream.py b/mpikat/pipelines/gated_spectrometer/data_stream.py similarity index 95% rename from mpikat/pipelines/gated_spectrometer/gated_spectrometer_data_stream.py rename to mpikat/pipelines/gated_spectrometer/data_stream.py index 36340f9c9fa3763b571d1a434944114db44b8cf0..edd7d2de72e04017d0d888a0d5f73c33d640b078 100644 --- a/mpikat/pipelines/gated_spectrometer/gated_spectrometer_data_stream.py +++ b/mpikat/pipelines/gated_spectrometer/data_stream.py @@ -1,3 +1,4 @@ +import spead2 from mpikat.core.data_stream import DataStream, DataStreamRegistry, edd_spead, convert48_64 import mpikat.core.logger @@ -33,7 +34,7 @@ class GatedSpectrometerDataStreamFormat(DataStream): } @classmethod - def ig_update(cls, items, ig): + def ig_update(cls, items: dict, ig: spead2.ItemGroup) -> bool: """ Update an item group dependent on the received items """ @@ -50,4 +51,5 @@ class GatedSpectrometerDataStreamFormat(DataStream): return True + DataStreamRegistry.register(GatedSpectrometerDataStreamFormat) diff --git a/mpikat/pipelines/mock_fits_writer.py b/mpikat/pipelines/mock_fits_writer.py index 5da1c051938c90aa6c1283feb3686d74496dabc0..97e3adb04845da7ae93e3f483e45d9fc90bb7714 100644 --- a/mpikat/pipelines/mock_fits_writer.py +++ b/mpikat/pipelines/mock_fits_writer.py @@ -1,7 +1,7 @@ import asyncio from mpikat.core.edd_pipeline_aio import EDDPipeline, state_change, launchPipelineServer, getArgumentParser -from mpikat.utils.fits import FitsReceiver +from mpikat.utils import fits from mpikat.core import logger _log = logger.getLogger("mpikat.pipelines.mock_fits_writer") @@ -20,16 +20,20 @@ class MockFitsWriter(EDDPipeline): self.__client = None @state_change(target="measuring", allowed=["set"], ignored=["streaming"], intermediate="measurement_starting") - def measurement_start(self): - _log("FitsWriter starts measuring") - self.__client = FitsReceiver((self._config["fits_ip"], self._config["fits_port"]), self._config["output_directory"]) + async def measurement_start(self): + _log.info("MockFitsWriter starts measuring") + self.__client = fits.Receiver((self._config["fits_ip"], self._config["fits_port"]), self._config["output_directory"]) self.__client.start() @state_change(target="ready", allowed=["measuring", "set"], ignored=['streaming', 'ready'], intermediate="measurement_stopping") - def measurement_stop(self): - _log("FitsWriter stops measuring") + async def measurement_stop(self): + _log.info("MockFitsWriter stops measuring") self.__client.stop() - self.__client.join() + self.__client.join(3) + + @property + def client(self) -> fits.Receiver: + return self.__client if __name__ == "__main__": diff --git a/mpikat/pipelines/skarab_pfb_controller.py b/mpikat/pipelines/skarab_pfb_controller.py deleted file mode 100644 index fc1b3db42a6a77e87b0d554406d4da30b0fde142..0000000000000000000000000000000000000000 --- a/mpikat/pipelines/skarab_pfb_controller.py +++ /dev/null @@ -1,246 +0,0 @@ -from __future__ import print_function, division, unicode_literals - -from mpikat.core.edd_pipeline import EDDPipeline, launchPipelineServer, state_change, getArgumentParser, setup_logger -from mpikat.core.datastore import EDDDataStore -from mpikat.core.skarab_client import SkarabChannelizerClient -import mpikat.utils.ip_utils as ip_utils - -from tornado.gen import coroutine, sleep, Return -from tornado.ioloop import IOLoop, PeriodicCallback -from katcp import Sensor, FailReply - -import json -import os - -import mpikat.core.logger - -_log = mpikat.core.logger.getLogger("mpikat.pipelines.SkarabPipeline") - -DEFAULT_CONFIG = { - "id": "SkarabPipeline", # default name for master controler. Needs to get a unique ID -- TODO, from ansible - "type": "SkarabPipeline", - "supported_input_formats": {"MPIFR_EDD_Packetizer": [1]}, # supported input formats name:version - "input_data_streams": - { - "polarization_0" : - { - "format": "MPIFR_EDD_Packetizer:1", # Format has version seperated via colon - "ip": "225.0.0.140+3", - "port": "7148", - "bit_depth" : 12, - }, - "polarization_1" : - { - "format": "MPIFR_EDD_Packetizer:1", - "ip": "225.0.0.144+3", - "port": "7148", - "bit_depth" : 12, - } - }, - "output_data_streams": # Filled programatically, see below - { # The output can be split into an arbitrary sequence of streams. The board streams to the lowest specified stream + 8 groups - - }, - - "log_level": "debug", - "force_program": False, # Force reprogramming of with new firmware version - 'skip_device_config': False, # Skips the skarab config ALWAYS. Overrides force - "firmware_directory": os.path.join(os.path.dirname(os.path.realpath(__file__)), "skarab_firmware"), - "firmware": "s_ubb_64ch_coddh_2020-09-21_1540.fpg", - "channels_per_group": 8, # Channels per multicast group in the fpga output - "board_id": 23, # Id to add to the spead headers of the FPGA output - "initial_quantization_factor": 0xff, # initial value for the quantization factor. Can be changed per measurement - "initial_fft_shift": 0xff, # initial value for the fft shift. Can be changed per measurement - - } - -#for i in range(8): -# ip = "239.0.0.{}".format(120+i) -# DEFAULT_CONFIG["output_data_streams"]['Output_{}'.format(i)] = {"format": "Skarab:1", "ip": ip, "port": "7152"} -#DEFAULT_CONFIG["output_data_streams"] = { "lower_subband": { "format": "Skarab:1", "ip": "239.0.0.120+3", "port": "7152", "central_freq":None, "sync_time": None, "sample_rate":None, "predecimation_factor": None}, "upper_subband": { "format": "Skarab:1", "ip": "239.0.0.124+3", "port": "7152", "central_freq":None, "sync_time": None, "sample_rate":None, "predecimation_factor": None } } - - -class SkarabPipeline(EDDPipeline): - """@brief gated spectrometer pipeline - """ - VERSION_INFO = ("mpikat-edd-api", 0, 1) - BUILD_INFO = ("mpikat-edd-implementation", 0, 1, "rc1") - - def __init__(self, ip, port, device_ip, device_port=7147): - """@brief initialize the pipeline. - @param device is the control ip of the board - """ - EDDPipeline.__init__(self, ip, port, DEFAULT_CONFIG) - _log.info('Connecting to skarab @ {}:{}'.format(device_ip, device_port)) - self._client = SkarabChannelizerClient(device_ip, device_port) - self.__periodic_callback = PeriodicCallback(self._check_fpga_sensors, 1000) - self.__periodic_callback.start() - - - @coroutine - def _check_fpga_sensors(self): - _log.debug(" Check FPGA Sensors") - if self._client.is_connected(): - clk = yield self._client.get_fpga_clock() - self._fpga_clock.set_value(clk) - - - def setup_sensors(self): - """ - @brief Setup monitoring sensors - """ - EDDPipeline.setup_sensors(self) - self._fpga_clock = Sensor.float( - "fpga-clock", - description="FPGA Clock estimate", - initial_status=Sensor.UNKNOWN) - self.add_sensor(self._fpga_clock) - - - @coroutine - def set(self, config_json): - cfg = yield self._cfgjson2dict(config_json) - if 'output_data_streams' in cfg: - _log.debug("Stripping outputs from cfg before check") - # Do not check output data streams, as the only relevant thing is here - # that they are consecutive - outputs = cfg.pop('output_data_streams') - _log.debug("Pipeline set") - yield EDDPipeline.set(self, cfg) - _log.debug("Re-adding outputs") - self._config['output_data_streams'] = outputs - self._configUpdated() - else: - EDDPipeline.set(self, cfg) - - - - @state_change(target="configured", allowed=["idle"], intermediate="configuring") - @coroutine - def configure(self): - """ - Configure the Skarab PFb Pipeline - - Args: - config_json A JSON dictionary object containing configuration information - """ - _log.info("Configuring EDD backend for processing") - - # Convert arbitrary output parts to input list - iplist = [] - for l in self._config["output_data_streams"].values(): - iplist.extend(ip_utils.ipstring_to_list(l["ip"])) - - output_string = ip_utils.ipstring_from_list(iplist) - output_ip, Noutput_streams, port = ip_utils.split_ipstring(output_string) - - port = set([l["port"] for l in self._config["output_data_streams"].values()]) - if len(port) != 1: - raise FailReply("Output data streams have to stream to same port") - - # update sync tim based on input - for l in self._config["output_data_streams"].values(): - l["sync_time"] = self._config["input_data_streams"]["polarization_0"]["sync_time"] - self._configUpdated() - - cfs = json.dumps(self._config, indent=4) - _log.info("Final configuration:\n" + cfs) - - if self._config["skip_device_config"]: - _log.warning("Skipping device configuration because debug mode is active!") - raise Return - - - _log.debug("Setting firmware string") - self._client.setFirmware(os.path.join(self._config["firmware_directory"], self._config['firmware'])) - _log.debug("Connecting to client") - self._client.connect() - - if self._config['force_program']: - _log.debug("Forcing reprogramming") - yield self._client.program() - - yield self._client.initialize() - yield self._client.lock_timestamp() - yield self._client.configure_inputs(self._config["input_data_streams"]["polarization_0"]["ip"], self._config["input_data_streams"]["polarization_1"]["ip"], int(self._config["input_data_streams"]["polarization_0"]["port"])) - - yield self._client.configure_output(output_ip, int(port.pop()), Noutput_streams, self._config["channels_per_group"], self._config["board_id"] ) - - yield self._client.configure_quantization_factor(self._config["initial_quantization_factor"]) - yield self._client.configure_fft_shift(self._config["initial_fft_shift"]) - - - @state_change(target="streaming", allowed=["configured"], intermediate="capture_starting") - @coroutine - def capture_start(self, config_json=""): - """ - @brief start streaming spectrometer output - """ - _log.info("Starting EDD backend") - yield sleep(5) - yield self._client.capture_start() - - - @coroutine - def measurement_prepare(self, cfg): - """@brief Set quantization factor and fft_shift parameter""" - if cfg is None: - cfg = {} - if "fft_shift" in cfg: - yield self._client.configure_fft_shift(cfg["fft_shift"]) - if "quantization_factor" in cfg: - yield self._client.configure_quantization_factor(cfg["quantization_factor"]) - - - @state_change(target="idle", allowed=["streaming"], intermediate="capture_stopping") - @coroutine - def capture_stop(self): - """ - @brief Stop streaming of data - """ - _log.info("Stoping EDD backend") - yield self._client.capture_stop() - - - @state_change(target="idle", intermediate="deconfiguring", error='panic') - @coroutine - def deconfigure(self): - """ - @brief deconfigure the gated spectrometer pipeline. - """ - _log.info("Deconfiguring EDD backend") - - - @coroutine - def populate_data_store(self, host, port): - """@brief Populate the data store""" - _log.debug("Populate data store @ {}:{}".format(host, port)) - dataStore = EDDDataStore(host, port) - _log.debug("Adding output formats to known data formats") - - descr = {"description":"Channelized complex voltage ouptut.", - "ip": None, - "port": None, - "sample_rate":None, - "central_freq":None, - "sync_time": None, - "predecimation_factor": None - } - dataStore.addDataFormatDefinition("Skarab:1", descr) - - - -if __name__ == "__main__": - - parser = getArgumentParser() - parser.add_argument('--skarab-ip', dest='skarab_ip', type=str, help='The control ip of the skarab board') - parser.add_argument('--skarab-port', dest='skarab_port', type=int, default=7147, help='The port number to control the skarab board') - - args = parser.parse_args() - setup_logger(args) - - pipeline = SkarabPipeline( - args.host, args.port, - args.skarab_ip, args.skarab_port) - - launchPipelineServer(pipeline, args) diff --git a/mpikat/utils/fits/__init__.py b/mpikat/utils/fits/__init__.py index 1dfc256b1e6f09a4b1c6faf46318622e5567a10c..3c9c805267dc8908a049c5ecf3f4bfc8db3bf4ee 100644 --- a/mpikat/utils/fits/__init__.py +++ b/mpikat/utils/fits/__init__.py @@ -8,8 +8,8 @@ import numpy as np import ctypes from mpikat.utils.fits.format import TYPE_MAP, FitsSection, FitsHeader, FitsPacket, FitsPayload, _check_payload, itemsize, np_dtype, c_dtype -from mpikat.utils.fits.sender import FitsSender -from mpikat.utils.fits.receiver import FitsReceiver, recv_nbytes, recv_packet +from mpikat.utils.fits.sender import Sender +from mpikat.utils.fits.receiver import Receiver, recv_nbytes, recv_packet def getFitsTime(t: float) -> str: @@ -103,8 +103,8 @@ __all__ = ["TYPE_MAP", "FitsSection", "FitsHeader", "FitsPacket", - "FitsSender", + "Sender", "recv_nbytes", "recv_packet", - "FitsReceiver" + "Receiver" ] diff --git a/mpikat/utils/fits/format.py b/mpikat/utils/fits/format.py index 8fa203d748bcd70602f5ac0a8483aead791c4fe0..bd23a648c6c8b30a77425ee2e1e740b99e1f1f51 100644 --- a/mpikat/utils/fits/format.py +++ b/mpikat/utils/fits/format.py @@ -76,7 +76,7 @@ class FitsSection(ctypes.LittleEndianStructure): ('nchannels', ctypes.c_uint32) ] - def __init__(self, section_id: int=0, nchannels: int=0, data: np.ndarray=None): + def __init__(self, section_id: int=1, nchannels: int=0, data: np.ndarray=None): self.section_id = section_id self.nchannels = nchannels if data is not None: @@ -130,14 +130,13 @@ class FitsPayload(): stream = io.BytesIO(raw) while stream.tell() < len(raw): self.sections.append(FitsSection.from_buffer_copy(stream.read(ctypes.sizeof(FitsSection)))) - print(self.sections, dtype.itemsize) self.sections[-1].data = np.frombuffer( stream.read(self.sections[-1].nchannels * dtype.itemsize), dtype=dtype) def fromarray(self, data: np.ndarray): nsec, nchan = _check_payload(data) for i in range(nsec): - self.sections.append(FitsSection(i, nchan, data[i])) + self.sections.append(FitsSection(i+1, nchan, data[i])) def add(self, section: FitsSection): if not isinstance(section, FitsSection): diff --git a/mpikat/utils/fits/receiver.py b/mpikat/utils/fits/receiver.py index 304b7772acad5f952c2d099ffb7ad3e7a0abed92..c3fd4ab23ca057ce8bb5d068f8d3121022d6759e 100644 --- a/mpikat/utils/fits/receiver.py +++ b/mpikat/utils/fits/receiver.py @@ -55,7 +55,7 @@ def recv_packet(sock: socket.socket) -> FitsPacket: return packet -class FitsReceiver(threading.Thread): +class Receiver(threading.Thread): """ Class for receiving and storing FITS packets """ @@ -169,4 +169,5 @@ class FitsReceiver(threading.Thread): """Stops the running thread """ _log.info("Stopping receiving thread") - self._quit.set() \ No newline at end of file + self._quit.set() + _log.info("Receiving thread stopped") \ No newline at end of file diff --git a/mpikat/utils/fits/sender.py b/mpikat/utils/fits/sender.py index 192fc33c5b93ff06afb1cefa8d4c866da15ba07a..2bceacb3f60bfd0c24d6524bf634a77b91e78738 100644 --- a/mpikat/utils/fits/sender.py +++ b/mpikat/utils/fits/sender.py @@ -16,7 +16,7 @@ _FW_QUEUE_SIZE = 2048 # max Size of output queue _SOCKET_TIMEOUT = 12 # Ensure that we tmeout only after the Effelsberg Fits Writer -class FitsSender(Thread): +class Sender(Thread): """ Manages TCP connections to the APEX FITS writer. @@ -29,7 +29,7 @@ class FitsSender(Thread): order. """ def __init__(self, ip: str, port: int, delay: int=3, keep_socket_alive: bool=False): - """Constructor of FitsSender + """Constructor of Sender Args: ip (str): The IP address to accept connectons from. @@ -113,7 +113,7 @@ class FitsSender(Thread): def disconnect(self): """ - Drop any current FITS writer connection. The FitsSender will be ready for new connections. + Drop any current FITS writer connection. The Sender will be ready for new connections. """ self._has_connection.clear() self.start_time = np.inf diff --git a/mpikat/utils/spead_capture.py b/mpikat/utils/spead_capture.py index 898add4ed3702069f379fa423ea666c5079700fe..740fe24e40e47d634909d0023b94fc42b4ac8bd4 100644 --- a/mpikat/utils/spead_capture.py +++ b/mpikat/utils/spead_capture.py @@ -11,7 +11,7 @@ import mpikat.core.logger from mpikat.core.data_stream import convert48_64 from mpikat.utils.timestamp_handler import TimestampHandler -from mpikat.pipelines.gated_spectrometer.gated_spectrometer_data_stream import GatedSpectrometerDataStreamFormat +from mpikat.pipelines.gated_spectrometer.data_stream import GatedSpectrometerDataStreamFormat _log = mpikat.core.logger.getLogger("mpikat.utils.spead_capture") diff --git a/tests/test_fits_interface.py b/tests/test_fits_interface.py new file mode 100644 index 0000000000000000000000000000000000000000..8c0cfbb28a6915a1dd2befe4eed9fd40163638ea --- /dev/null +++ b/tests/test_fits_interface.py @@ -0,0 +1,75 @@ +import unittest +from aiokatcp.client import Client + +from mpikat.pipelines.fits_interface import FitsInterfaceServer +from mpikat.pipelines.mock_fits_writer import MockFitsWriter +from mpikat.utils import get_port +from mpikat.utils import testing + + +HOST = "127.0.0.1" +class Test_FitsInterface(unittest.IsolatedAsyncioTestCase): + + def setUp(self) -> None: + self.sender_port = get_port() + self.sender_thread = testing.launch_server_in_thread(FitsInterfaceServer, HOST, self.sender_port) + self.receiver_port = get_port() + self.receiver_thread = testing.launch_server_in_thread(MockFitsWriter, HOST, self.receiver_port) + self.sender = self.sender_thread.server + self.receiver = self.receiver_thread.server + + async def asyncSetUp(self): + self.sender_client = await Client.connect(HOST, self.sender_port) + self.receiver_client = await Client.connect(HOST, self.receiver_port) + + def tearDown(self) -> None: + self.sender_client.close() + self.receiver_client.close() + + self.sender_thread.stop().result() + self.sender_thread.join(timeout=5) + + self.receiver_thread.stop().result() + self.receiver_thread.join(timeout=5) + + async def test_fits_send_recv(self): + """Should test if the sent fits packets from the FitsInterfaceServer + are received and written to disk from the MockFitsWriter. + ToDo: We need a spead heap generator of GS heaps""" + self.assertEqual(self.sender.state, "idle") + self.assertEqual(self.receiver.state, "idle") + + await self.sender_client.request('configure', '{"id":"sender","enable_plotting":false,"dummy_input":true}') + await self.receiver_client.request('configure', '{"id":"receiver"}') + self.assertEqual(self.sender.state, "configured") + self.assertEqual(self.receiver.state, "configured") + + await self.sender_client.request('capture-start') + await self.receiver_client.request('capture-start') + self.assertEqual(self.sender.state, "ready") + self.assertEqual(self.receiver.state, "ready") + + await self.sender_client.request('measurement-prepare') + await self.receiver_client.request('measurement-prepare') + self.assertEqual(self.sender.state, "set") + self.assertEqual(self.receiver.state, "set") + + await self.sender_client.request('measurement-start') + await self.receiver_client.request('measurement-start') + self.assertEqual(self.sender.state, "measuring") + self.assertEqual(self.receiver.state, "measuring") + + await self.sender_client.request('measurement-stop') + await self.receiver_client.request('measurement-stop') + self.assertEqual(self.sender.state, "ready") + self.assertEqual(self.receiver.state, "ready") + + await self.sender_client.request('deconfigure') + await self.receiver_client.request('deconfigure') + self.assertEqual(self.sender.state, "idle") + self.assertEqual(self.receiver.state, "idle") + + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/tests/test_fits_module.py b/tests/test_fits_module.py index 716fca09cb6cd779c8086130fc8695e5a091c610..0d30d74d4d7aa0fceddf3432fb2a93e0d89d0389 100644 --- a/tests/test_fits_module.py +++ b/tests/test_fits_module.py @@ -53,7 +53,7 @@ class Test_FitsPacket(unittest.TestCase): """Should test if a packet can be created from a numpy array """ data = np.random.rand(3,3333).astype(np.float32) - test_packet = fits.create_packet(nchannels=2500, nsections=5, data=data) # nchannels and nsections should be gnored here + test_packet = fits.create_packet(nchannels=2500, nsections=5, data=data) # nchannels and nsections should be ignored here self.tobj.frombytes(bytes(test_packet)) self.assertEqual(bytes(self.tobj), bytes(test_packet)) @@ -62,8 +62,8 @@ class Test_FitsPacket(unittest.TestCase): class Test_FitsSendReceiver(unittest.TestCase): def setUp(self) -> None: - self.receiver = fits.FitsReceiver(_ADDRESS_, _DIRECTORY_, Queue()) - self.sender = fits.FitsSender(_ADDRESS_[0], _ADDRESS_[1], delay=0) + self.receiver = fits.Receiver(_ADDRESS_, _DIRECTORY_, Queue()) + self.sender = fits.Sender(_ADDRESS_[0], _ADDRESS_[1], delay=0) self.sender.start() def tearDown(self) -> None: @@ -84,7 +84,7 @@ class Test_FitsSendReceiver(unittest.TestCase): self.receiver.connect() def test_receiver_connect_disconnect(self): - """Should test if FitsReceiver can succesfully connect & disconnect + """Should test if Receiver can succesfully connect & disconnect """ self.receiver.connect() self.sender.wait_connected() @@ -95,7 +95,7 @@ class Test_FitsSendReceiver(unittest.TestCase): #self.assertFalse(self.sender.is_connected()) -> Won't register the disconnected receiver def test_start_stop(self): - """Should test if FitsReceiver starts and stops correctly + """Should test if Receiver starts and stops correctly """ self.receiver.wait_start() self.assertTrue(self.receiver.is_alive()) @@ -104,7 +104,7 @@ class Test_FitsSendReceiver(unittest.TestCase): self.assertFalse(self.receiver.is_alive()) def test_send_recv_packet(self): - """Should test if the FitsReceiver receives a sent FitsPacket + """Should test if the Receiver receives a sent FitsPacket """ self.receiver.wait_start() self.sender.is_measuring.set() @@ -121,7 +121,7 @@ class Test_FitsSendReceiver(unittest.TestCase): """ self.sender.stop() self.sender.join() - self.sender = fits.FitsSender(_ADDRESS_[0], _ADDRESS_[1], delay=4) + self.sender = fits.Sender(_ADDRESS_[0], _ADDRESS_[1], delay=4) self.sender.start() self.sender.is_measuring.set() self.receiver.wait_start() diff --git a/tests/test_master_controller.py b/tests/test_master_controller.py index 2979bcc9cd44d0e1c4e3f4d1932f04885ab0849d..033381fe210a5da8aa0b42dc51bfbcade7db1184 100644 --- a/tests/test_master_controller.py +++ b/tests/test_master_controller.py @@ -16,7 +16,7 @@ from mpikat.core.datastore import EDDDataStore from mpikat.core.edd_pipeline_aio import EDDPipeline from mpikat.pipelines.master_controller import ConfigurationGraph, EddMasterController, get_ansible_fail_message -from mpikat.utils.process_tools import command_watcher, ProcessException +from mpikat.utils.process_tools import ProcessException from mpikat.core import logger as logging from mpikat.utils import testing, get_port, ip_manager, ip_utils diff --git a/tests/test_spead_capture.py b/tests/test_spead_capture.py index 084f8944cea070606de9185164ea506228f4b918..260267b38ec410aa6dfd2ed31e67ca0a77a2c317 100644 --- a/tests/test_spead_capture.py +++ b/tests/test_spead_capture.py @@ -7,7 +7,7 @@ import numpy as np from mpikat.utils.spead_capture import GatedSpectrometerSpeadHandler from mpikat.core.data_stream import convert48_64, convert64_48 -from mpikat.pipelines.gated_spectrometer.gated_spectrometer_data_stream import GatedSpectrometerDataStreamFormat +from mpikat.pipelines.gated_spectrometer.data_stream import GatedSpectrometerDataStreamFormat class TestGatedSpectrometerSpeadhandler(unittest.TestCase): def setUp(self):