From 8d8032618dfd862b40244863bf0fedc726f09e0b Mon Sep 17 00:00:00 2001 From: Tobias Winchen <tobias.winchen@rwth-aachen.de> Date: Fri, 10 Jan 2020 15:24:45 +0100 Subject: [PATCH] VLBI: Allow for varying number fo frames in output block. --- psrdada_cpp/effelsberg/edd/VLBI.cuh | 2 + psrdada_cpp/effelsberg/edd/detail/VLBI.cu | 41 ++++++++++++++++----- psrdada_cpp/effelsberg/edd/src/vdif_send.cu | 6 ++- 3 files changed, 39 insertions(+), 10 deletions(-) diff --git a/psrdada_cpp/effelsberg/edd/VLBI.cuh b/psrdada_cpp/effelsberg/edd/VLBI.cuh index 24093d09..9f051f85 100644 --- a/psrdada_cpp/effelsberg/edd/VLBI.cuh +++ b/psrdada_cpp/effelsberg/edd/VLBI.cuh @@ -34,10 +34,12 @@ class VDIFHeaderView uint32_t getReferenceEpoch() const; size_t getTimestamp() const; uint32_t getDataFrameNumber() const; + // Length of the data frame including the header, in units of 8 bytes uint32_t getDataFrameLength() const; uint32_t getNumberOfChannels() const; bool isRealDataType() const; bool isComplexDataType() const; + // Number of bits per sample -1 (max 32/bits per sample) uint32_t getBitsPerSample() const; uint32_t getThreadId() const; uint32_t getStationId() const; diff --git a/psrdada_cpp/effelsberg/edd/detail/VLBI.cu b/psrdada_cpp/effelsberg/edd/detail/VLBI.cu index fe85ed5c..e46aa816 100644 --- a/psrdada_cpp/effelsberg/edd/detail/VLBI.cu +++ b/psrdada_cpp/effelsberg/edd/detail/VLBI.cu @@ -36,8 +36,8 @@ VLBI<HandlerType>::VLBI(std::size_t buffer_bytes, std::size_t input_bitDepth, BOOST_LOG_TRIVIAL(info) << "Creating new VLBI instance"; BOOST_LOG_TRIVIAL(info) << " Output data in VDIF format with " << vlbiHeaderSize << "bytes header info and " - << _vdifHeader.getDataFrameLength() - << " bytes payload"; + << _vdifHeader.getDataFrameLength() * 8 + << " bytes data frame length"; BOOST_LOG_TRIVIAL(debug) << " Expecting speadheaps of size " << speadHeapSize << " byte"; @@ -49,14 +49,23 @@ VLBI<HandlerType>::VLBI(std::size_t buffer_bytes, std::size_t input_bitDepth, BOOST_LOG_TRIVIAL(debug) << " Input voltages size : " << _raw_voltage_db.size() << " 64-bit words," << _raw_voltage_db.size() * 64 / 8 << " bytes"; - _unpacked_voltage.resize(n64bit_words * 64 / input_bitDepth ); _packed_voltage.resize(n64bit_words * 64 / input_bitDepth * _output_bitDepth / 8); - - _spillOver.reserve(vdifHeader.getDataFrameLength() * 8); BOOST_LOG_TRIVIAL(debug) << " Output voltages size: " << _packed_voltage.size() << " byte"; + _spillOver.reserve(vdifHeader.getDataFrameLength() * 8 - vlbiHeaderSize); + + // number of vlbi frames per input block + size_t nSamplesPerInputBlock = _packed_voltage.size() * 8 / _output_bitDepth; + size_t frames_per_block = _packed_voltage.size() / (vdifHeader.getDataFrameLength() * 8 - vlbiHeaderSize); + BOOST_LOG_TRIVIAL(debug) << " this correspoonds to " << frames_per_block << " - " << frames_per_block + 1 << " frames"; + + _outputBuffer.resize((frames_per_block+1) * vdifHeader.getDataFrameLength() * 8 ); + // potetnitally invalidating the last frame + BOOST_LOG_TRIVIAL(info) << " Output data in VDIF format with " << _outputBuffer.size() << " bytes per buffer"; + + CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream)); CUDA_ERROR_CHECK(cudaStreamCreate(&_proc_stream)); @@ -228,8 +237,8 @@ bool VLBI<HandlerType>::operator()(RawBytes &block) { BOOST_LOG_TRIVIAL(debug) << " Number of blocks in output " << numberOfBlocksInOutput; - _outputBuffer.a().resize(numberOfBlocksInOutput * - (outputBlockSize + vlbiHeaderSize)); + //_outputBuffer.a().resize(numberOfBlocksInOutput * + // (outputBlockSize + vlbiHeaderSize)); BOOST_LOG_TRIVIAL(debug) << " Copying " << _spillOver.size() << " bytes spill over"; @@ -279,12 +288,26 @@ bool VLBI<HandlerType>::operator()(RawBytes &block) { BOOST_LOG_TRIVIAL(debug) << " Samples per data frame: " << samplesPerDataFrame; BOOST_LOG_TRIVIAL(debug) << " Dataframes per second: " << dataFramesPerSecond; - for (uint32_t i = 0; i < numberOfBlocksInOutput; i++) + for (uint32_t ib = 0; ib < _outputBuffer.a().size(); ib += _vdifHeader.getDataFrameLength() * 8) { // copy header to correct position std::copy(reinterpret_cast<uint8_t *>(_vdifHeader.getData()), reinterpret_cast<uint8_t *>(_vdifHeader.getData()) + vlbiHeaderSize, - _outputBuffer.a().begin() + i * (outputBlockSize + vlbiHeaderSize)); + _outputBuffer.a().begin() + ib); + size_t i = ib / _vdifHeader.getDataFrameLength() / 8; + + // invalidate rest of data so it can be dropped later. + // Needed so that the outpuitbuffer can have always the same size + if (i < numberOfBlocksInOutput) + { + _vdifHeader.setValid(); + } + else + { + _vdifHeader.setInvalid(); + continue; + } + // update header uint32_t dataFrame = _vdifHeader.getDataFrameNumber(); if (i < 5) diff --git a/psrdada_cpp/effelsberg/edd/src/vdif_send.cu b/psrdada_cpp/effelsberg/edd/src/vdif_send.cu index d487215b..f699c4f9 100644 --- a/psrdada_cpp/effelsberg/edd/src/vdif_send.cu +++ b/psrdada_cpp/effelsberg/edd/src/vdif_send.cu @@ -88,7 +88,7 @@ class VDIF_Sender if (block.used_bytes() == 0) { BOOST_LOG_TRIVIAL(info) << "Received empty block, exiting."; - return true; + return false; } boost::system::error_code err; VDIFHeaderView vdifHeader(reinterpret_cast<uint32_t*>(block.ptr())); @@ -102,6 +102,10 @@ class VDIF_Sender for(char* frame_start = block.ptr(); frame_start < block.ptr() + blockSize; frame_start += vdifHeader.getDataFrameLength() * 8) { vdifHeader.setDataLocation(reinterpret_cast<uint32_t*>(frame_start)); + // skip invalid blocks + if (!vdifHeader.isValid()) + continue; + uint32_t frameLength = vdifHeader.getDataFrameLength() * 8; // in units of 8 bytes socket.send_to(boost::asio::buffer(frame_start, frameLength), remote_endpoint, 0, err); -- GitLab