Commit 33acdb93 authored by Tobias Winchen's avatar Tobias Winchen
Browse files

Add second+frame info to header

parent 3fc61db9
......@@ -60,38 +60,38 @@ class VDIFHeader
void setInvalid();
void setValid();
bool isValid();
bool isValid() const;
void setSecondsFromReferenceEpoch(uint32_t value);
uint32_t getSecondsFromReferenceEpoch();
uint32_t getSecondsFromReferenceEpoch() const;
void setReferenceEpoch(uint32_t value);
uint32_t getReferenceEpoch();
uint32_t getReferenceEpoch() const;
void setDataFrameNumber(uint32_t value);
uint32_t getDataFrameNumber();
uint32_t getDataFrameNumber() const;
void setDataFrameLength(uint32_t value);
uint32_t getDataFrameLength();
uint32_t getDataFrameLength() const;
uint32_t getVersionNumber();
uint32_t getVersionNumber() const;
void setNumberOfChannels(uint32_t value);
uint32_t getNumberOfChannels();
uint32_t getNumberOfChannels() const;
bool isRealDataType();
bool isComplexDataType();
bool isRealDataType() const;
bool isComplexDataType() const;
void setComplexDataType();
void setRealDataType();
void setBitsPerSample(uint32_t value);
uint32_t getBitsPerSample();
uint32_t getBitsPerSample() const;
void setThreadId(uint32_t value);
uint32_t getThreadId();
uint32_t getThreadId() const;
void setStationId(uint32_t value);
uint32_t getStationId();
uint32_t getStationId() const;
};
......@@ -110,13 +110,14 @@ public:
* @param buffer_bytes A RawBytes object wrapping a DADA header buffer
* @param speadHeapSize Size of the spead heap block.
* @param input_bitDepth Bit depth of the sampled signal.
* @param outputBlockSize size of the output VDIF payload in bytes.
* @param VDIFHeader Header of the VDIF output to be sed. Must contain size of the output VDIF payload in bytes.
* @param handler Output handler
*
*/
VLBI(std::size_t buffer_bytes, std::size_t input_bitDepth,
std::size_t speadHeapSize,
std::size_t outputBlockSize,
double sampleRate,
const VDIFHeader &vdifHeader,
HandlerType &handler);
~VLBI();
......@@ -148,6 +149,7 @@ private:
std::size_t _output_bitDepth;
std::size_t _speadHeapSize;
std::size_t _outputBlockSize;
double _sampleRate;
HandlerType &_handler;
int _call_count;
......
......@@ -7,8 +7,8 @@
#include <cuda_profiler_api.h>
#include <thrust/system/cuda/execution_policy.h>
#include <iostream>
#include <cstring>
#include <iostream>
#include <sstream>
namespace psrdada_cpp {
......@@ -17,27 +17,27 @@ namespace edd {
template <class HandlerType>
VLBI<HandlerType>::VLBI(
std::size_t buffer_bytes,
std::size_t input_bitDepth,
std::size_t speadHeapSize,
std::size_t outputBlockSize,
HandlerType &handler)
: _buffer_bytes(buffer_bytes),
_input_bitDepth(input_bitDepth),
_outputBlockSize(outputBlockSize),
_output_bitDepth(2),
_speadHeapSize(speadHeapSize),
_handler(handler),
_call_count(0) {
VLBI<HandlerType>::VLBI(std::size_t buffer_bytes, std::size_t input_bitDepth,
std::size_t speadHeapSize,
double sampleRate,
const VDIFHeader &vdifHeader,
HandlerType &handler)
: _buffer_bytes(buffer_bytes), _input_bitDepth(input_bitDepth),
_sampleRate(sampleRate), _vdifHeader(vdifHeader), _output_bitDepth(2),
_speadHeapSize(speadHeapSize), _handler(handler), _call_count(0) {
// Sanity checks
// check for any device errors
CUDA_ERROR_CHECK(cudaDeviceSynchronize());
BOOST_LOG_TRIVIAL(info) << "Creating new VLBI instance";
BOOST_LOG_TRIVIAL(info) << " Output data in VDIF format with " << vlbiHeaderSize << "bytes header info and " << outputBlockSize << " bytes payload";
BOOST_LOG_TRIVIAL(debug) << " Expecting speadheaps of size " << speadHeapSize << " byte";
BOOST_LOG_TRIVIAL(info) << " Output data in VDIF format with "
<< vlbiHeaderSize << "bytes header info and "
<< _vdifHeader.getDataFrameLength() << " bytes payload";
BOOST_LOG_TRIVIAL(debug) << " Expecting speadheaps of size " << speadHeapSize
<< " byte";
BOOST_LOG_TRIVIAL(debug) << " Sample rate " << _sampleRate << " Hz";
std::size_t n64bit_words = _buffer_bytes / sizeof(uint64_t);
BOOST_LOG_TRIVIAL(debug) << "Allocating memory";
......@@ -48,7 +48,8 @@ VLBI<HandlerType>::VLBI(
_packed_voltage.resize(n64bit_words * 64 / input_bitDepth / 4);
_spillOver.reserve(5000);
BOOST_LOG_TRIVIAL(debug) << " Output voltages size: " << _packed_voltage.size() << " byte";
BOOST_LOG_TRIVIAL(debug) << " Output voltages size: "
<< _packed_voltage.size() << " byte";
CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream));
CUDA_ERROR_CHECK(cudaStreamCreate(&_proc_stream));
......@@ -59,19 +60,10 @@ VLBI<HandlerType>::VLBI(
_vdifHeader.setBitsPerSample(2);
_vdifHeader.setNumberOfChannels(1);
_vdifHeader.setRealDataType();
//_vdifHeader.setThreadId(threadId);
//_vdifHeader.setStationId(stationId);
_vdifHeader.setDataFrameLength(outputBlockSize);
//_vdifHeader.setReferenceEpoch(referenceEpoch);
//_vdifHeader.setSecondsFromReferenceEpoch(secondsFromReferenceEpoch_sync);
} // constructor
template <class HandlerType>
VLBI<HandlerType>::~VLBI() {
template <class HandlerType> VLBI<HandlerType>::~VLBI() {
BOOST_LOG_TRIVIAL(debug) << "Destroying VLBI";
cudaStreamDestroy(_h2d_stream);
cudaStreamDestroy(_proc_stream);
......@@ -79,23 +71,21 @@ VLBI<HandlerType>::~VLBI() {
}
template <class HandlerType>
void VLBI<HandlerType>::init(RawBytes &block) {
template <class HandlerType> void VLBI<HandlerType>::init(RawBytes &block) {
BOOST_LOG_TRIVIAL(debug) << "VLBI init called";
std::stringstream headerInfo;
headerInfo << "\n" << "# VLBI parameters: \n";
headerInfo << "\n"
<< "# VLBI parameters: \n";
size_t bEnd = std::strlen(block.ptr());
if (bEnd + headerInfo.str().size() < block.total_bytes())
{
if (bEnd + headerInfo.str().size() < block.total_bytes()) {
std::strcpy(block.ptr() + bEnd, headerInfo.str().c_str());
}
else
{
BOOST_LOG_TRIVIAL(warning) << "Header of size " << block.total_bytes()
<< " bytes already contains " << bEnd
<< "bytes. Cannot add gated spectrometer info of size "
<< headerInfo.str().size() << " bytes.";
} else {
BOOST_LOG_TRIVIAL(warning)
<< "Header of size " << block.total_bytes()
<< " bytes already contains " << bEnd
<< "bytes. Cannot add gated spectrometer info of size "
<< headerInfo.str().size() << " bytes.";
}
_handler.init(block);
......@@ -105,8 +95,8 @@ void VLBI<HandlerType>::init(RawBytes &block) {
template <class HandlerType>
bool VLBI<HandlerType>::operator()(RawBytes &block) {
++_call_count;
BOOST_LOG_TRIVIAL(debug) << "VLBI operator() called (count = "
<< _call_count << ")";
BOOST_LOG_TRIVIAL(debug) << "VLBI operator() called (count = " << _call_count
<< ")";
if (block.used_bytes() != _buffer_bytes) { /* Unexpected buffer size */
BOOST_LOG_TRIVIAL(error) << "Unexpected Buffer Size - Got "
<< block.used_bytes() << " byte, expected "
......@@ -121,7 +111,7 @@ bool VLBI<HandlerType>::operator()(RawBytes &block) {
_raw_voltage_db.swap();
BOOST_LOG_TRIVIAL(debug) << " block.used_bytes() = " << block.used_bytes()
<< ", dataBlockBytes = " << _buffer_bytes<< "\n";
<< ", dataBlockBytes = " << _buffer_bytes << "\n";
CUDA_ERROR_CHECK(cudaMemcpyAsync(static_cast<void *>(_raw_voltage_db.a_ptr()),
static_cast<void *>(block.ptr()),
......@@ -164,65 +154,91 @@ bool VLBI<HandlerType>::operator()(RawBytes &block) {
BOOST_LOG_TRIVIAL(debug) << "Copy Data back to host";
CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream));
size_t remainingBytes = _outputBlockSize - _spillOver.size();
size_t numberOfBlocksInOutput = (_packed_voltage.size() - remainingBytes) / _outputBlockSize;
BOOST_LOG_TRIVIAL(debug) << " Number of blocks in output" << numberOfBlocksInOutput;
const size_t outputBlockSize = _vdifHeader.getDataFrameLength();
size_t remainingBytes = outputBlockSize - _spillOver.size();
size_t numberOfBlocksInOutput =
(_packed_voltage.size() - remainingBytes) / outputBlockSize;
BOOST_LOG_TRIVIAL(debug) << " Number of blocks in output"
<< numberOfBlocksInOutput;
_outputBuffer.a().resize((1+numberOfBlocksInOutput) * (_outputBlockSize + vlbiHeaderSize));
_outputBuffer.a().resize((1 + numberOfBlocksInOutput) *
(outputBlockSize + vlbiHeaderSize));
BOOST_LOG_TRIVIAL(debug) << " Copying " << _spillOver.size() << " bytes spill over";
BOOST_LOG_TRIVIAL(debug) << " Copying " << _spillOver.size()
<< " bytes spill over";
// leave room for header and fill first block of output with spill over
std::copy(_spillOver.begin(), _spillOver.end(), _outputBuffer.a().begin() + vlbiHeaderSize);
std::copy(_spillOver.begin(), _spillOver.end(),
_outputBuffer.a().begin() + vlbiHeaderSize);
BOOST_LOG_TRIVIAL(debug) << " Copying remaining " << remainingBytes << " bytes for first block";
BOOST_LOG_TRIVIAL(debug) << " Copying remaining " << remainingBytes
<< " bytes for first block";
// cuda memcopy remainder of first block
CUDA_ERROR_CHECK(cudaMemcpyAsync(static_cast<void *>(_packed_voltage.a_ptr()),
static_cast<void *>(_outputBuffer.a_ptr()),
remainingBytes,
cudaMemcpyDeviceToHost, _d2h_stream));
static_cast<void *>(_outputBuffer.a_ptr()),
remainingBytes, cudaMemcpyDeviceToHost,
_d2h_stream));
const size_t dpitch = _outputBlockSize + vlbiHeaderSize;
const size_t spitch = _outputBlockSize;
const size_t width = _outputBlockSize;
const size_t dpitch = outputBlockSize + vlbiHeaderSize;
const size_t spitch = outputBlockSize;
const size_t width = outputBlockSize;
size_t height = numberOfBlocksInOutput;
BOOST_LOG_TRIVIAL(debug) << " Copying " << numberOfBlocksInOutput << " blocks a " << _outputBlockSize << " bytes";
BOOST_LOG_TRIVIAL(debug) << " Copying " << numberOfBlocksInOutput
<< " blocks a " << outputBlockSize << " bytes";
// we now have a full first block, pitch copy rest leaving room for the header
CUDA_ERROR_CHECK(cudaMemcpy2DAsync((void*) (_outputBuffer.a_ptr()+ _outputBlockSize + 2 * vlbiHeaderSize) , dpitch, (void*) thrust::raw_pointer_cast(_packed_voltage.a_ptr() + remainingBytes), spitch, width, height, cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(cudaMemcpy2DAsync(
(void *)(_outputBuffer.a_ptr() + outputBlockSize + 2 * vlbiHeaderSize),
dpitch, (void *)thrust::raw_pointer_cast(_packed_voltage.a_ptr() +
remainingBytes),
spitch, width, height, cudaMemcpyDeviceToHost, _d2h_stream));
// new spill over
_spillOver.resize(_packed_voltage.size() - remainingBytes - numberOfBlocksInOutput * _outputBlockSize);
_spillOver.resize(_packed_voltage.size() - remainingBytes -
numberOfBlocksInOutput * outputBlockSize);
size_t offset = numberOfBlocksInOutput * _outputBlockSize + remainingBytes;
BOOST_LOG_TRIVIAL(debug) << " New spill over size " << _spillOver.size() << " bytes with offset " << offset;
size_t offset = numberOfBlocksInOutput * outputBlockSize + remainingBytes;
BOOST_LOG_TRIVIAL(debug) << " New spill over size " << _spillOver.size()
<< " bytes with offset " << offset;
CUDA_ERROR_CHECK(cudaMemcpyAsync(static_cast<void *>(_packed_voltage.a_ptr() + offset),
static_cast<void *>( thrust::raw_pointer_cast(_spillOver.data()) ),
_spillOver.size(),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(cudaMemcpyAsync(
static_cast<void *>(_packed_voltage.a_ptr() + offset),
static_cast<void *>(thrust::raw_pointer_cast(_spillOver.data())),
_spillOver.size(), cudaMemcpyDeviceToHost, _d2h_stream));
// fill in header data
for (size_t i = 0; i < numberOfBlocksInOutput + 1; i ++)
{
_vdifHeader.setDataFrameNumber(i); // ToDo: Use correct number
_vdifHeader.setSecondsFromReferenceEpoch(_call_count); // ToDo use correct number
const size_t samplesPerDataFrame = outputBlockSize * 8 / _output_bitDepth;
const size_t dataFramesPerSecond = _sampleRate / samplesPerDataFrame;
std::copy(reinterpret_cast<uint8_t* >(_vdifHeader.getData()),
reinterpret_cast<uint8_t* >(_vdifHeader.getData()) + vlbiHeaderSize,
_outputBuffer.a().begin() + i * (_outputBlockSize + vlbiHeaderSize));
for (size_t i = 0; i < numberOfBlocksInOutput + 1; i++)
{
// copy header to correct position
std::copy(reinterpret_cast<uint8_t *>(_vdifHeader.getData()),
reinterpret_cast<uint8_t *>(_vdifHeader.getData()) + vlbiHeaderSize,
_outputBuffer.a().begin() + i * (outputBlockSize + vlbiHeaderSize));
// update header
size_t dataFrame = _vdifHeader.getDataFrameNumber();
if (dataFrame < dataFramesPerSecond)
{
_vdifHeader.setDataFrameNumber(dataFrame + 1);
}
else
{
_vdifHeader.setDataFrameNumber(0);
_vdifHeader.setSecondsFromReferenceEpoch(_vdifHeader.getSecondsFromReferenceEpoch() + 1);
}
}
if (_call_count == 3) {
return false;
}
// Wrap in a RawBytes object here;
RawBytes bytes(reinterpret_cast<char *>(_outputBuffer.b_ptr()),
_outputBuffer.b().size(),
_outputBuffer.b().size());
BOOST_LOG_TRIVIAL(debug) << "Calling handler, processing " << _outputBuffer.b().size() << " bytes";
_outputBuffer.b().size(), _outputBuffer.b().size());
BOOST_LOG_TRIVIAL(debug) << "Calling handler, processing "
<< _outputBuffer.b().size() << " bytes";
// The handler can't do anything asynchronously without a copy here
// as it would be unsafe (given that it does not own the memory it
// is being passed).
......
......@@ -165,7 +165,7 @@ void VDIFHeader::setValid()
setBitsWithValue(data[0], 31, 31, 0);
}
bool VDIFHeader::isValid()
bool VDIFHeader::isValid() const
{
return (getBitsValue(data[0], 31, 31) == 0);
}
......@@ -175,7 +175,7 @@ void VDIFHeader::setSecondsFromReferenceEpoch(uint32_t value)
setBitsWithValue(data[0], 0, 29, value);
}
uint32_t VDIFHeader::getSecondsFromReferenceEpoch()
uint32_t VDIFHeader::getSecondsFromReferenceEpoch() const
{
return getBitsValue(data[0], 0, 29);
}
......@@ -185,7 +185,7 @@ void VDIFHeader::setReferenceEpoch(uint32_t value)
setBitsWithValue(data[1], 24, 29, value);
}
uint32_t VDIFHeader::getReferenceEpoch()
uint32_t VDIFHeader::getReferenceEpoch() const
{
return getBitsValue(data[1], 24, 29);
}
......@@ -195,7 +195,7 @@ void VDIFHeader::setDataFrameNumber(uint32_t value)
setBitsWithValue(data[1], 0, 23, value);
}
uint32_t VDIFHeader::getDataFrameNumber()
uint32_t VDIFHeader::getDataFrameNumber() const
{
return getBitsValue(data[1], 0, 23);
}
......@@ -205,12 +205,12 @@ void VDIFHeader::setDataFrameLength(uint32_t value)
setBitsWithValue(data[2], 0, 23, value);
}
uint32_t VDIFHeader::getDataFrameLength()
uint32_t VDIFHeader::getDataFrameLength() const
{
return getBitsValue(data[2], 0, 23);
}
uint32_t VDIFHeader::getVersionNumber()
uint32_t VDIFHeader::getVersionNumber() const
{
return getBitsValue(data[2], 29, 31);
}
......@@ -220,17 +220,17 @@ void VDIFHeader::setNumberOfChannels(uint32_t value)
setBitsWithValue(data[2], 24, 28, value);
}
uint32_t VDIFHeader::getNumberOfChannels()
uint32_t VDIFHeader::getNumberOfChannels() const
{
return getBitsValue(data[2], 24, 28);
}
bool VDIFHeader::isRealDataType()
bool VDIFHeader::isRealDataType() const
{
return (getBitsValue(data[3], 31, 31) == 0);
}
bool VDIFHeader::isComplexDataType()
bool VDIFHeader::isComplexDataType() const
{
return (getBitsValue(data[3], 31, 31) == 1);
}
......@@ -250,7 +250,7 @@ void VDIFHeader::setBitsPerSample(uint32_t value)
setBitsWithValue(data[3], 26, 30, value);
}
uint32_t VDIFHeader::getBitsPerSample()
uint32_t VDIFHeader::getBitsPerSample() const
{
return getBitsValue(data[3], 26, 30);
}
......@@ -260,7 +260,7 @@ void VDIFHeader::setThreadId(uint32_t value)
setBitsWithValue(data[3], 16, 25, value);
}
uint32_t VDIFHeader::getThreadId()
uint32_t VDIFHeader::getThreadId() const
{
return getBitsValue(data[3], 16, 25);
}
......@@ -270,7 +270,7 @@ void VDIFHeader::setStationId(uint32_t value)
setBitsWithValue(data[3], 0, 15, value);
}
uint32_t VDIFHeader::getStationId()
uint32_t VDIFHeader::getStationId() const
{
return getBitsValue(data[3], 0, 15);
}
......
......@@ -103,13 +103,22 @@ int main(int argc, char **argv) {
DadaClientBase client(input_key, log);
std::size_t buffer_bytes = client.data_buffer_size();
// ToDo: Options to set values
effelsberg::edd::VDIFHeader vdifHeader;
vdifHeader.setThreadId(0);
vdifHeader.setStationId(0);
vdifHeader.setReferenceEpoch(123);
vdifHeader.setSecondsFromReferenceEpoch(42); // for first block
double sampleRate = 2.6E9;
std::cout << "Running with output_type: " << output_type << std::endl;
if (output_type == "file")
{
SimpleFileWriter sink(filename);
effelsberg::edd::VLBI<decltype(sink)> vlbi(
buffer_bytes, nbits,
speadHeapSize, 5000, sink);
speadHeapSize, sampleRate, vdifHeader, sink);
DadaInputStream<decltype(vlbi)> istream(input_key, log, vlbi);
istream.start();
......@@ -119,7 +128,7 @@ int main(int argc, char **argv) {
DadaOutputStream sink(string_to_key(filename), log);
effelsberg::edd::VLBI<decltype(sink)> vlbi(
buffer_bytes, nbits,
speadHeapSize, 5000, sink);
speadHeapSize, sampleRate, vdifHeader, sink);
DadaInputStream<decltype(vlbi)> istream(input_key, log, vlbi);
istream.start();
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment