diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000000000000000000000000000000000000..a7178595084f562faeb953a3b661c87d7c4e9878 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,7 @@ +root = true + +[*] +indent_style = space +indent_size = 4 +trim_trailing_whitespace=true + diff --git a/psrdada_cpp/effelsberg/edd/VLBI.cuh b/psrdada_cpp/effelsberg/edd/VLBI.cuh index 9f051f85da2dcb2c3b1db3d85aa1d6bbeac8d172..5b44c658c3f665d76b4f4bb17e094ed39f311df6 100644 --- a/psrdada_cpp/effelsberg/edd/VLBI.cuh +++ b/psrdada_cpp/effelsberg/edd/VLBI.cuh @@ -18,7 +18,7 @@ namespace edd { const size_t vlbiHeaderSize = 8 * 32 / 8; // bytes [8 words a 32 bit] -/// class VDIFHeaderView provides interprets a data block as VDIF compliant header +/// class VDIFHeaderView provides interprets a data block as VDIF compliant header /// See https://vlbi.org/vlbi-standards/vdif/specification 1.1.1 from June 2014 for details. class VDIFHeaderView { @@ -29,21 +29,20 @@ class VDIFHeaderView void setDataLocation(const uint32_t* _data); const uint32_t* getDataLocation() const; uint32_t getVersionNumber() const; - bool isValid() const; - uint32_t getSecondsFromReferenceEpoch() const; - uint32_t getReferenceEpoch() const; + bool isValid() const; + uint32_t getSecondsFromReferenceEpoch() const; + uint32_t getReferenceEpoch() const; size_t getTimestamp() const; - uint32_t getDataFrameNumber() const; - // Length of the data frame including the header, in units of 8 bytes - uint32_t getDataFrameLength() const; - uint32_t getNumberOfChannels() const; + uint32_t getDataFrameNumber() const; + // Length of the data frame including the header, in units of 8 bytes + uint32_t getDataFrameLength() const; + uint32_t getNumberOfChannels() const; bool isRealDataType() const; - bool isComplexDataType() const; - // Number of bits per sample -1 (max 32/bits per sample) - uint32_t getBitsPerSample() const; - uint32_t getThreadId() const; - uint32_t getStationId() const; - + bool isComplexDataType() 
const; + // Number of bits per sample -1 (max 32/bits per sample) + uint32_t getBitsPerSample() const; + uint32_t getThreadId() const; + uint32_t getStationId() const; }; @@ -52,33 +51,33 @@ class VDIFHeaderView /// specification 1.1.1 from June 2014 for details. class VDIFHeader : public VDIFHeaderView { - private: - uint32_t data[8]; + private: + uint32_t data[8]; - public: - VDIFHeader(); - VDIFHeader(const VDIFHeader &v); - VDIFHeader& operator=(const VDIFHeader& other); + public: + VDIFHeader(); + VDIFHeader(const VDIFHeader &v); + VDIFHeader& operator=(const VDIFHeader& other); // return pointer to the data block for low level manipulation - uint32_t* getData(); - void setInvalid(); - void setValid(); - void setSecondsFromReferenceEpoch(uint32_t value); - void setReferenceEpoch(uint32_t value); + uint32_t* getData(); + void setInvalid(); + void setValid(); + void setSecondsFromReferenceEpoch(uint32_t value); + void setReferenceEpoch(uint32_t value); /// set reference epoch and seconds from reference epoch from POSIX time /// stamp void setTimeReferencesFromTimestamp(size_t); /// converts time reference data to POSIX time - void setDataFrameNumber(uint32_t value); - void setDataFrameLength(uint32_t value); - void setNumberOfChannels(uint32_t value); - void setComplexDataType(); - void setRealDataType(); - void setBitsPerSample(uint32_t value); - void setThreadId(uint32_t value); - void setStationId(uint32_t value); + void setDataFrameNumber(uint32_t value); + void setDataFrameLength(uint32_t value); + void setNumberOfChannels(uint32_t value); + void setComplexDataType(); + void setRealDataType(); + void setBitsPerSample(uint32_t value); + void setThreadId(uint32_t value); + void setStationId(uint32_t value); }; diff --git a/psrdada_cpp/effelsberg/edd/detail/VLBI.cu b/psrdada_cpp/effelsberg/edd/detail/VLBI.cu index e46aa816ba10880eb5afa83723cb5ce6fd666801..054972ff3cddf034daf8520b6d8c5b5c9db05610 100644 --- a/psrdada_cpp/effelsberg/edd/detail/VLBI.cu +++ 
b/psrdada_cpp/effelsberg/edd/detail/VLBI.cu @@ -37,14 +37,13 @@ VLBI<HandlerType>::VLBI(std::size_t buffer_bytes, std::size_t input_bitDepth, BOOST_LOG_TRIVIAL(info) << " Output data in VDIF format with " << vlbiHeaderSize << "bytes header info and " << _vdifHeader.getDataFrameLength() * 8 - << " bytes data frame length"; + << " bytes total data frame length"; BOOST_LOG_TRIVIAL(debug) << " Expecting speadheaps of size " << speadHeapSize << " byte"; - BOOST_LOG_TRIVIAL(debug) << " Sample rate " << _sampleRate << " Hz"; std::size_t n64bit_words = _buffer_bytes / sizeof(uint64_t); - BOOST_LOG_TRIVIAL(debug) << "Allocating memory"; + BOOST_LOG_TRIVIAL(debug) << "Allocating memory:"; _raw_voltage_db.resize(n64bit_words); BOOST_LOG_TRIVIAL(debug) << " Input voltages size : " << _raw_voltage_db.size() << " 64-bit words," @@ -56,17 +55,15 @@ VLBI<HandlerType>::VLBI(std::size_t buffer_bytes, std::size_t input_bitDepth, << _packed_voltage.size() << " byte"; _spillOver.reserve(vdifHeader.getDataFrameLength() * 8 - vlbiHeaderSize); - // number of vlbi frames per input block - size_t nSamplesPerInputBlock = _packed_voltage.size() * 8 / _output_bitDepth; - size_t frames_per_block = _packed_voltage.size() / (vdifHeader.getDataFrameLength() * 8 - vlbiHeaderSize); - BOOST_LOG_TRIVIAL(debug) << " this correspoonds to " << frames_per_block << " - " << frames_per_block + 1 << " frames"; + // number of vlbi frames per input block + size_t nSamplesPerInputBlock = _packed_voltage.size() * 8 / _output_bitDepth; + size_t frames_per_block = _packed_voltage.size() / (vdifHeader.getDataFrameLength() * 8 - vlbiHeaderSize); + BOOST_LOG_TRIVIAL(debug) << " this correspoonds to " << frames_per_block << " - " << frames_per_block + 1 << " frames."; - _outputBuffer.resize((frames_per_block+1) * vdifHeader.getDataFrameLength() * 8 ); - // potetnitally invalidating the last frame + _outputBuffer.resize((frames_per_block+1) * vdifHeader.getDataFrameLength() * 8 ); + // potentially invalidating the
last frame BOOST_LOG_TRIVIAL(info) << " Output data in VDIF format with " << _outputBuffer.size() << " bytes per buffer"; - - CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream)); CUDA_ERROR_CHECK(cudaStreamCreate(&_proc_stream)); CUDA_ERROR_CHECK(cudaStreamCreate(&_d2h_stream)); @@ -145,7 +142,7 @@ bool VLBI<HandlerType>::operator()(RawBytes &block) { CUDA_ERROR_CHECK(cudaStreamSynchronize(_h2d_stream)); _raw_voltage_db.swap(); - BOOST_LOG_TRIVIAL(debug) << " block.used_bytes() = " << block.used_bytes() + BOOST_LOG_TRIVIAL(debug) << " - block.used_bytes() = " << block.used_bytes() << ", dataBlockBytes = " << _buffer_bytes << "\n"; CUDA_ERROR_CHECK(cudaMemcpyAsync(static_cast<void *>(_raw_voltage_db.a_ptr()), @@ -159,7 +156,7 @@ bool VLBI<HandlerType>::operator()(RawBytes &block) { // Process data _packed_voltage.swap(); - BOOST_LOG_TRIVIAL(debug) << "Unpacking raw voltages"; + BOOST_LOG_TRIVIAL(debug) << " - Unpacking raw voltages"; switch (_input_bitDepth) { case 8: _unpacker->unpack<8>(_raw_voltage_db.b(), _unpacked_voltage); @@ -172,7 +169,7 @@ bool VLBI<HandlerType>::operator()(RawBytes &block) { } - BOOST_LOG_TRIVIAL(debug) << "Calculate baseline"; + BOOST_LOG_TRIVIAL(debug) << " - Calculate baseline"; psrdada_cpp::effelsberg::edd:: array_sum<<<64, array_sum_Nthreads, 0, _proc_stream>>>( thrust::raw_pointer_cast(_unpacked_voltage.data()), @@ -183,7 +180,7 @@ bool VLBI<HandlerType>::operator()(RawBytes &block) { thrust::raw_pointer_cast(_baseLineN.data()), _baseLineN.size(), thrust::raw_pointer_cast(_baseLineN.data())); - BOOST_LOG_TRIVIAL(debug) << "Calculate std-dev"; + BOOST_LOG_TRIVIAL(debug) << " - Calculate std-dev"; psrdada_cpp::effelsberg::edd:: scaled_square_residual_sum<<<64, array_sum_Nthreads, 0, _proc_stream>>>( thrust::raw_pointer_cast(_unpacked_voltage.data()), @@ -196,13 +193,13 @@ bool VLBI<HandlerType>::operator()(RawBytes &block) { // non linear packing - BOOST_LOG_TRIVIAL(debug) << "Packing data with non linear 2-bit packaging " + 
BOOST_LOG_TRIVIAL(debug) << " - Packing data with non linear 2-bit packaging " "using levels -v*sigma, 0, v*sigma with v = " << _digitizer_threshold; _packed_voltage.b().resize(_unpacked_voltage.size() * 2 / 8); - BOOST_LOG_TRIVIAL(debug) << "Input size: " << _unpacked_voltage.size() + BOOST_LOG_TRIVIAL(debug) << " - Input size: " << _unpacked_voltage.size() << " elements"; - BOOST_LOG_TRIVIAL(debug) << "Resizing output buffer to " + BOOST_LOG_TRIVIAL(debug) << " - Resizing output buffer to " << _packed_voltage.b().size() << " byte"; pack2bit_nonLinear<<<128, 1024, 0, _proc_stream>>>( @@ -213,7 +210,7 @@ bool VLBI<HandlerType>::operator()(RawBytes &block) { thrust::raw_pointer_cast(_baseLineN.data())); CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream)); - BOOST_LOG_TRIVIAL(trace) << " Standard Deviation squared: " << _stdDevN[0] + BOOST_LOG_TRIVIAL(trace) << " - Standard Deviation squared: " << _stdDevN[0] << " " << "Mean Value: " << _baseLineN[0] / _unpacked_voltage.size(); @@ -224,58 +221,49 @@ bool VLBI<HandlerType>::operator()(RawBytes &block) { _outputBuffer.swap(); //////////////////////////////////////////////////////////////////////// - BOOST_LOG_TRIVIAL(debug) << "Copy Data back to host"; + BOOST_LOG_TRIVIAL(debug) << " - Copy Data back to host"; CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream)); const size_t outputBlockSize = _vdifHeader.getDataFrameLength() * 8 - vlbiHeaderSize; - const size_t totalSizeOfData = _packed_voltage.size() + _spillOver.size(); // current array + remaining of previous - size_t numberOfBlocksInOutput = totalSizeOfData / outputBlockSize; - size_t remainingBytes = outputBlockSize - _spillOver.size(); - BOOST_LOG_TRIVIAL(debug) << " Number of blocks in output " + BOOST_LOG_TRIVIAL(debug) << " - Number of blocks in output " << numberOfBlocksInOutput; - //_outputBuffer.a().resize(numberOfBlocksInOutput * - // (outputBlockSize + vlbiHeaderSize)); - - BOOST_LOG_TRIVIAL(debug) << " Copying " << _spillOver.size() + // First copy 
spill-over from last block for first frame, leaving room for + // header of course + BOOST_LOG_TRIVIAL(debug) << " - Copying " << _spillOver.size() << " bytes spill over"; - // leave room for header and fill first block of output with spill over std::copy(_spillOver.begin(), _spillOver.end(), _outputBuffer.a().begin() + vlbiHeaderSize); - BOOST_LOG_TRIVIAL(debug) << " Copying remaining " << remainingBytes - << " bytes for first block"; // cuda memcopy remainder of first block + BOOST_LOG_TRIVIAL(debug) << " - Copying remaining " << remainingBytes + << " bytes for first block"; CUDA_ERROR_CHECK(cudaMemcpyAsync(static_cast<void *>(_outputBuffer.a_ptr() + vlbiHeaderSize + _spillOver.size()), static_cast<void *>(_packed_voltage.a_ptr()), remainingBytes, cudaMemcpyDeviceToHost, _d2h_stream)); + // cuda memcopy rest with pitch, leaving room for the header const size_t dpitch = outputBlockSize + vlbiHeaderSize; const size_t spitch = outputBlockSize; const size_t width = outputBlockSize; size_t height = numberOfBlocksInOutput-1; - - BOOST_LOG_TRIVIAL(debug) << " Copying " << height - << " blocks a " << outputBlockSize << " bytes"; - // we now have a full first block, pitch copy rest leaving room for the header + BOOST_LOG_TRIVIAL(debug) << " - Copying " << height + << " blocks of " << outputBlockSize << " bytes"; CUDA_ERROR_CHECK(cudaMemcpy2DAsync( (void *)(_outputBuffer.a_ptr() + outputBlockSize + 2 * vlbiHeaderSize), dpitch, (void *)thrust::raw_pointer_cast(_packed_voltage.a_ptr() + remainingBytes), spitch, width, height, cudaMemcpyDeviceToHost, _d2h_stream)); - - // new spill over + // copy remaining part of device to spill over _spillOver.resize(totalSizeOfData - numberOfBlocksInOutput * outputBlockSize); - size_t offset = (numberOfBlocksInOutput-1) * outputBlockSize + remainingBytes; - BOOST_LOG_TRIVIAL(debug) << " New spill over size " << _spillOver.size() + BOOST_LOG_TRIVIAL(debug) << " - New spill over size " << _spillOver.size() << " bytes with offset " << offset; -
CUDA_ERROR_CHECK(cudaMemcpyAsync( static_cast<void *>(thrust::raw_pointer_cast(_spillOver.data())), static_cast<void *>(_packed_voltage.a_ptr() + offset), @@ -285,8 +273,8 @@ bool VLBI<HandlerType>::operator()(RawBytes &block) { const uint32_t samplesPerDataFrame = outputBlockSize * 8 / _output_bitDepth; const uint32_t dataFramesPerSecond = _sampleRate / samplesPerDataFrame; - BOOST_LOG_TRIVIAL(debug) << " Samples per data frame: " << samplesPerDataFrame; - BOOST_LOG_TRIVIAL(debug) << " Dataframes per second: " << dataFramesPerSecond; + BOOST_LOG_TRIVIAL(debug) << " Samples per data frame: " << samplesPerDataFrame; + BOOST_LOG_TRIVIAL(debug) << " Dataframes per second: " << dataFramesPerSecond; for (uint32_t ib = 0; ib < _outputBuffer.a().size(); ib += _vdifHeader.getDataFrameLength() * 8) { @@ -294,32 +282,32 @@ bool VLBI<HandlerType>::operator()(RawBytes &block) { std::copy(reinterpret_cast<uint8_t *>(_vdifHeader.getData()), reinterpret_cast<uint8_t *>(_vdifHeader.getData()) + vlbiHeaderSize, _outputBuffer.a().begin() + ib); - size_t i = ib / _vdifHeader.getDataFrameLength() / 8; - - // invalidate rest of data so it can be dropped later. - // Needed so that the outpuitbuffer can have always the same size - if (i < numberOfBlocksInOutput) - { - _vdifHeader.setValid(); - } - else - { - _vdifHeader.setInvalid(); - continue; - } + size_t i = ib / _vdifHeader.getDataFrameLength() / 8; - // update header - uint32_t dataFrame = _vdifHeader.getDataFrameNumber(); if (i < 5) - BOOST_LOG_TRIVIAL(debug) << i << " Dataframe Number: " << dataFrame; - if (dataFrame < dataFramesPerSecond) + BOOST_LOG_TRIVIAL(debug) << i << " Dataframe Number: " << _vdifHeader.getDataFrameNumber(); + + // invalidate rest of data so it can be dropped later. 
+ // Needed so that the output buffer can always have the same size + if (i < numberOfBlocksInOutput) { - _vdifHeader.setDataFrameNumber(dataFrame + 1); + _vdifHeader.setValid(); } else + { + _vdifHeader.setInvalid(); + continue; + } + + // increase dataframenumber + _vdifHeader.setDataFrameNumber(_vdifHeader.getDataFrameNumber() + 1); + + // update header + if (_vdifHeader.getDataFrameNumber() >= dataFramesPerSecond) { _vdifHeader.setDataFrameNumber(0); _vdifHeader.setSecondsFromReferenceEpoch(_vdifHeader.getSecondsFromReferenceEpoch() + 1); + BOOST_LOG_TRIVIAL(debug) << i << " Beginning new second after epoch: " << _vdifHeader.getSecondsFromReferenceEpoch(); } } diff --git a/psrdada_cpp/effelsberg/edd/src/VLBI.cu b/psrdada_cpp/effelsberg/edd/src/VLBI.cu index 7199e73719a0ca9ef39978c75e8d6e7d1349fa0e..ce3c9cf14a47ce9fc15e4ea2e9fb712d2cadfaad 100644 --- a/psrdada_cpp/effelsberg/edd/src/VLBI.cu +++ b/psrdada_cpp/effelsberg/edd/src/VLBI.cu @@ -15,12 +15,13 @@ namespace edd { VDIFHeaderView::VDIFHeaderView(const uint32_t* data) : data(data) {}; + void VDIFHeaderView::setDataLocation(const uint32_t* _data) { data = _data; }; const uint32_t* VDIFHeaderView::getDataLocation() const { - return data; + return data; }; bool VDIFHeaderView::isValid() const { @@ -97,18 +98,18 @@ VDIFHeader::VDIFHeader() : VDIFHeaderView(data) VDIFHeader::VDIFHeader(const VDIFHeader &v): VDIFHeaderView(data) { - for (int i = 0; i < 8; i++) { + for (int i = 0; i < 8; i++) { data[i] = v.getDataLocation()[i]; } - setDataLocation(data); + setDataLocation(data); } VDIFHeader& VDIFHeader::operator=(const VDIFHeader& other) { - for (int i = 0; i < 8; i++) { + for (int i = 0; i < 8; i++) { data[i] = other.getDataLocation()[i]; } - return *this; + return *this; } uint32_t *VDIFHeader::getData() { return data; } diff --git a/psrdada_cpp/effelsberg/edd/src/vdif_send.cu b/psrdada_cpp/effelsberg/edd/src/vdif_send.cu index f699c4f9248ebffe0087ae3af463ea8f6bc57c01..9f34163a6e43823618aa5173d986cdb9b08ba815
100644 --- a/psrdada_cpp/effelsberg/edd/src/vdif_send.cu +++ b/psrdada_cpp/effelsberg/edd/src/vdif_send.cu @@ -34,6 +34,9 @@ class VDIF_Sender int port; double max_rate; + uint32_t currentSecondFromReferenceEpoch; + size_t noOfSendFrames; // frames in last second + boost::asio::ip::udp::socket socket; boost::asio::ip::udp::endpoint remote_endpoint; @@ -48,7 +51,7 @@ class VDIF_Sender * communication. */ VDIF_Sender(const std::string &source_ip, const std::string &destination_ip, int port, double max_rate, boost::asio::io_service& - io_service): socket(io_service), source_ip(source_ip), destination_ip(destination_ip), port(port), max_rate(max_rate) + io_service): socket(io_service), source_ip(source_ip), destination_ip(destination_ip), port(port), max_rate(max_rate), currentSecondFromReferenceEpoch(0), noOfSendFrames(0) { } @@ -97,18 +100,31 @@ class VDIF_Sender BOOST_LOG_TRIVIAL(debug) << " Length of first frame: " << vdifHeader.getDataFrameLength() * 8 << " bytes"; size_t counter = 0; + size_t invalidFrames = 0; auto start = std::chrono::high_resolution_clock::now(); for(char* frame_start = block.ptr(); frame_start < block.ptr() + blockSize; frame_start += vdifHeader.getDataFrameLength() * 8) { vdifHeader.setDataLocation(reinterpret_cast<uint32_t*>(frame_start)); - // skip invalid blocks - if (!vdifHeader.isValid()) - continue; + // skip invalid blocks + if (!vdifHeader.isValid()) + { + invalidFrames++; + continue; + } + if (vdifHeader.getSecondsFromReferenceEpoch() > currentSecondFromReferenceEpoch) + { + BOOST_LOG_TRIVIAL(info) << " New second frome reference epoch: " << vdifHeader.getSecondsFromReferenceEpoch() << ", send " << noOfSendFrames << " in previous second."; + + BOOST_LOG_TRIVIAL(debug) << " Previous second from refEpoch " << currentSecondFromReferenceEpoch << " delta = " << vdifHeader.getSecondsFromReferenceEpoch() - currentSecondFromReferenceEpoch; + currentSecondFromReferenceEpoch = vdifHeader.getSecondsFromReferenceEpoch(); + noOfSendFrames = 0; + 
} uint32_t frameLength = vdifHeader.getDataFrameLength() * 8; // in units of 8 bytes socket.send_to(boost::asio::buffer(frame_start, frameLength), remote_endpoint, 0, err); + noOfSendFrames++; counter++; size_t processed_bytes = (frame_start - block.ptr()) + frameLength; @@ -124,11 +140,11 @@ class VDIF_Sender //BOOST_LOG_TRIVIAL(debug) << counter << " Set delay to " << delay.count()<< " ns. Current rate " << current_rate << ", processed_bytes: " << processed_bytes; } - if (counter < 5) - BOOST_LOG_TRIVIAL(debug) << counter << " Send - FN: " << vdifHeader.getDataFrameNumber() << ", Sec f. E.: " << vdifHeader.getSecondsFromReferenceEpoch() << " Get TS.: " << vdifHeader.getTimestamp(); + if (counter < 5) + BOOST_LOG_TRIVIAL(debug) << counter << " Send - FN: " << vdifHeader.getDataFrameNumber() << ", Sec f. E.: " << vdifHeader.getSecondsFromReferenceEpoch() << " Get TS.: " << vdifHeader.getTimestamp(); } - BOOST_LOG_TRIVIAL(info) << "Send " << counter << " frames of " << block.used_bytes() << " bytes total size."; + BOOST_LOG_TRIVIAL(debug) << "Send " << counter << " frames of " << block.used_bytes() << " bytes total size. " << invalidFrames << " invalid frames in block."; return false; } };