Commit 8b9130e0 authored by Tobias Winchen's avatar Tobias Winchen
Browse files

Added skeleteon implementation for VLBI 2 bit pack product and VDIF header implementation

parent 133d4984
......@@ -28,5 +28,11 @@ cuda_add_executable(gated_spectrometer src/GatedSpectrometer_cli.cu)
target_link_libraries(gated_spectrometer ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES} ${CUDA_CUFFT_LIBRARIES} -lcublas)
install(TARGETS gated_spectrometer DESTINATION bin)
cuda_add_executable(VLBI src/VLBI_cli.cu)
target_link_libraries(VLBI ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES} ${CUDA_CUFFT_LIBRARIES} -lcublas)
install(TARGETS VLBI DESTINATION bin)
add_subdirectory(test)
endif(ENABLE_CUDA)
#ifndef PSRDADA_CPP_EFFELSBERG_EDD_VLBI_CUH
#define PSRDADA_CPP_EFFELSBERG_EDD_VLBI_CUH
#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/effelsberg/edd/Unpacker.cuh"
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/cuda_utils.hpp"
#include "psrdada_cpp/double_device_buffer.cuh"
#include "psrdada_cpp/double_host_buffer.cuh"
#include <thrust/device_vector.h>
......@@ -12,16 +18,164 @@ namespace kernels {
__global__
void pack_edd_float32_to_2bit(float* __restrict__ in, uint32_t * __restrict__ out, size_t n);
void pack_edd_float32_to_2bit(const float* __restrict__ in, uint32_t * __restrict__ out, size_t n);
} //namespace kernels
void pack_2bit(thrust::device_vector<float> const& input, thrust::device_vector<uint32_t>& output, float minV, float maxV, cudaStream_t _stream = 0);
void pack_2bit(thrust::device_vector<float> const& input, thrust::device_vector<uint8_t>& output, float minV, float maxV, cudaStream_t _stream = 0);
// some helper functions to dealm with bit encoding of the header
// ToDo: Find better place in utility collection
/// Create bit mask with ones between first and lastBit (inclusive) and zeros
/// otherwise;
uint32_t bitMask(uint32_t firstBit, uint32_t lastBit);
/// Squeeze a value into the specified bitrange of the target. Throws runtime
/// error if too large.
void setBitsWithValue(uint32_t &target, uint32_t firstBit, uint32_t lastBit, uint32_t value);
/// Get numerical value from the specified bits in the target.
uint32_t getBitsValue(const uint32_t &target, uint32_t firstBit, uint32_t lastBit);
const size_t vlbiHeaderSize = 8 * 32 / 8; // bytes [8 words a 32 bit]
/// class VDIFHeader stores a VDIF compliant header block with conveniant
/// setters and getters. See https://vlbi.org/vlbi-standards/vdif/
/// specification 1.1.1 from June 2014 for details.
class VDIFHeader
{
private:
uint32_t data[8];
public:
VDIFHeader();
// return pointer to the data block for low level manipulation
uint32_t* getData();
void setInvalid();
void setValid();
bool isValid();
void setSecondsFromReferenceEpoch(uint32_t value);
uint32_t getSecondsFromReferenceEpoch();
void setReferenceEpoch(uint32_t value);
uint32_t getReferenceEpoch();
void setDataFrameNumber(uint32_t value);
uint32_t getDataFrameNumber();
void setDataFrameLength(uint32_t value);
uint32_t getDataFrameLength();
uint32_t getVersionNumber();
void setNumberOfChannels(uint32_t value);
uint32_t getNumberOfChannels();
bool isRealDataType();
bool isComplexDataType();
void setComplexDataType();
void setRealDataType();
void setBitsPerSample(uint32_t value);
uint32_t getBitsPerSample();
void setThreadId(uint32_t value);
uint32_t getThreadId();
void setStationId(uint32_t value);
uint32_t getStationId();
};
/**
@class VLBI
@brief Convert data to 2bit data in VDIF format.
*/
template <class HandlerType> class VLBI{
public:
typedef uint64_t RawVoltageType;
public:
/**
* @brief Constructor
*
* @param buffer_bytes A RawBytes object wrapping a DADA header buffer
* @param speadHeapSize Size of the spead heap block.
* @param input_bitDepth Bit depth of the sampled signal.
* @param outputBlockSize size of the output VDIF payload in bytes.
* @param handler Output handler
*
*/
VLBI(std::size_t buffer_bytes, std::size_t input_bitDepth,
std::size_t speadHeapSize,
std::size_t outputBlockSize,
HandlerType &handler);
~VLBI();
/**
* @brief A callback to be called on connection
* to a ring buffer.
*
* @detail The first available header block in the
* in the ring buffer is provided as an argument.
* It is here that header parameters could be read
* if desired.
*
* @param block A RawBytes object wrapping a DADA header buffer
*/
void init(RawBytes &block);
/**
* @brief A callback to be called on acqusition of a new
* data block.
*
* @param block A RawBytes object wrapping a DADA data buffer output
* are the integrated specttra with/without bit set.
*/
bool operator()(RawBytes &block);
private:
std::size_t _buffer_bytes;
std::size_t _input_bitDepth;
std::size_t _output_bitDepth;
std::size_t _speadHeapSize;
std::size_t _outputBlockSize;
HandlerType &_handler;
int _call_count;
std::unique_ptr<Unpacker> _unpacker;
// Input data
DoubleDeviceBuffer<RawVoltageType> _raw_voltage_db;
// process tmp
thrust::device_vector<float> _unpacked_voltage;
// Output data
DoubleDeviceBuffer<uint8_t> _packed_voltage;
DoublePinnedHostBuffer<uint8_t> _outputBuffer;
VDIFHeader _vdifHeader;
thrust::host_vector<uint8_t, thrust::system::cuda::experimental::pinned_allocator<uint8_t>> _spillOver;
cudaStream_t _h2d_stream;
cudaStream_t _proc_stream;
cudaStream_t _d2h_stream;
};
} //namespace edd
} //namespace effelsberg
} //namespace psrdada_cpp
#include "psrdada_cpp/effelsberg/edd/detail/VLBI.cu"
#endif // PSRDADA_CPP_EFFELSBERG_EDD_VLBI_CUH
......
#include "psrdada_cpp/effelsberg/edd/VLBI.cuh"
//#include "psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh"
#include "psrdada_cpp/cuda_utils.hpp"
#include <cuda.h>
#include <cuda_profiler_api.h>
#include <thrust/system/cuda/execution_policy.h>
#include <iostream>
#include <cstring>
#include <sstream>
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
template <class HandlerType>
VLBI<HandlerType>::VLBI(
std::size_t buffer_bytes,
std::size_t input_bitDepth,
std::size_t speadHeapSize,
std::size_t outputBlockSize,
HandlerType &handler)
: _buffer_bytes(buffer_bytes),
_input_bitDepth(input_bitDepth),
_outputBlockSize(outputBlockSize),
_output_bitDepth(2),
_speadHeapSize(speadHeapSize),
_handler(handler),
_call_count(0) {
// Sanity checks
// check for any device errors
CUDA_ERROR_CHECK(cudaDeviceSynchronize());
BOOST_LOG_TRIVIAL(info) << "Creating new VLBI instance";
BOOST_LOG_TRIVIAL(info) << " Output data in VDIF format with " << vlbiHeaderSize << "bytes header info and " << outputBlockSize << " bytes payload";
BOOST_LOG_TRIVIAL(debug) << " Expecting speadheaps of size " << speadHeapSize << " byte";
std::size_t n64bit_words = _buffer_bytes / sizeof(uint64_t);
BOOST_LOG_TRIVIAL(debug) << "Allocating memory";
_raw_voltage_db.resize(n64bit_words);
BOOST_LOG_TRIVIAL(debug) << " Input voltages size (in 64-bit words): "
<< _raw_voltage_db.size();
_packed_voltage.resize(n64bit_words * 64 / input_bitDepth / 4);
_spillOver.reserve(5000);
BOOST_LOG_TRIVIAL(debug) << " Output voltages size: " << _packed_voltage.size() << " byte";
CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream));
CUDA_ERROR_CHECK(cudaStreamCreate(&_proc_stream));
CUDA_ERROR_CHECK(cudaStreamCreate(&_d2h_stream));
_unpacker.reset(new Unpacker(_proc_stream));
_vdifHeader.setBitsPerSample(2);
_vdifHeader.setNumberOfChannels(1);
_vdifHeader.setRealDataType();
//_vdifHeader.setThreadId(threadId);
//_vdifHeader.setStationId(stationId);
_vdifHeader.setDataFrameLength(outputBlockSize);
//_vdifHeader.setReferenceEpoch(referenceEpoch);
//_vdifHeader.setSecondsFromReferenceEpoch(secondsFromReferenceEpoch_sync);
} // constructor
template <class HandlerType>
VLBI<HandlerType>::~VLBI() {
BOOST_LOG_TRIVIAL(debug) << "Destroying VLBI";
cudaStreamDestroy(_h2d_stream);
cudaStreamDestroy(_proc_stream);
cudaStreamDestroy(_d2h_stream);
}
template <class HandlerType>
void VLBI<HandlerType>::init(RawBytes &block) {
BOOST_LOG_TRIVIAL(debug) << "VLBI init called";
std::stringstream headerInfo;
headerInfo << "\n" << "# VLBI parameters: \n";
size_t bEnd = std::strlen(block.ptr());
if (bEnd + headerInfo.str().size() < block.total_bytes())
{
std::strcpy(block.ptr() + bEnd, headerInfo.str().c_str());
}
else
{
BOOST_LOG_TRIVIAL(warning) << "Header of size " << block.total_bytes()
<< " bytes already contains " << bEnd
<< "bytes. Cannot add gated spectrometer info of size "
<< headerInfo.str().size() << " bytes.";
}
_handler.init(block);
}
template <class HandlerType>
bool VLBI<HandlerType>::operator()(RawBytes &block) {
++_call_count;
BOOST_LOG_TRIVIAL(debug) << "VLBI operator() called (count = "
<< _call_count << ")";
if (block.used_bytes() != _buffer_bytes) { /* Unexpected buffer size */
BOOST_LOG_TRIVIAL(error) << "Unexpected Buffer Size - Got "
<< block.used_bytes() << " byte, expected "
<< _buffer_bytes << " byte)";
CUDA_ERROR_CHECK(cudaDeviceSynchronize());
cudaProfilerStop();
return true;
}
////////////////////////////////////////////////////////////////////////
// Copy data to device
CUDA_ERROR_CHECK(cudaStreamSynchronize(_h2d_stream));
_raw_voltage_db.swap();
BOOST_LOG_TRIVIAL(debug) << " block.used_bytes() = " << block.used_bytes()
<< ", dataBlockBytes = " << _buffer_bytes<< "\n";
CUDA_ERROR_CHECK(cudaMemcpyAsync(static_cast<void *>(_raw_voltage_db.a_ptr()),
static_cast<void *>(block.ptr()),
_buffer_bytes, cudaMemcpyHostToDevice,
_h2d_stream));
if (_call_count == 1) {
return false;
}
////////////////////////////////////////////////////////////////////////
// Process data
_packed_voltage.swap();
BOOST_LOG_TRIVIAL(debug) << "Unpacking raw voltages";
switch (_input_bitDepth) {
case 8:
_unpacker->unpack<8>(_raw_voltage_db.b(), _unpacked_voltage);
break;
case 12:
_unpacker->unpack<12>(_raw_voltage_db.b(), _unpacked_voltage);
break;
default:
throw std::runtime_error("Unsupported number of bits");
}
// ToDo: Eventually calulate minV, maxV from mean and std.
float minV = -2;
float maxV = 2;
pack_2bit(_unpacked_voltage, _packed_voltage.b(), minV, maxV, _proc_stream);
CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
if ((_call_count == 2)) {
return false;
}
_outputBuffer.swap();
////////////////////////////////////////////////////////////////////////
BOOST_LOG_TRIVIAL(debug) << "Copy Data back to host";
CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream));
size_t remainingBytes = _outputBlockSize - _spillOver.size();
size_t numberOfBlocksInOutput = (_packed_voltage.size() - remainingBytes) / _outputBlockSize;
BOOST_LOG_TRIVIAL(debug) << " Number of blocks in output" << numberOfBlocksInOutput;
_outputBuffer.a().resize((1+numberOfBlocksInOutput) * (_outputBlockSize + vlbiHeaderSize));
BOOST_LOG_TRIVIAL(debug) << " Copying " << _spillOver.size() << " bytes spill over";
// leave room for header and fill first block of output with spill over
std::copy(_spillOver.begin(), _spillOver.end(), _outputBuffer.a().begin() + vlbiHeaderSize);
BOOST_LOG_TRIVIAL(debug) << " Copying remaining " << remainingBytes << " bytes for first block";
// cuda memcopy remainder of first block
CUDA_ERROR_CHECK(cudaMemcpyAsync(static_cast<void *>(_packed_voltage.a_ptr()),
static_cast<void *>(_outputBuffer.a_ptr()),
remainingBytes,
cudaMemcpyDeviceToHost, _d2h_stream));
const size_t dpitch = _outputBlockSize + vlbiHeaderSize;
const size_t spitch = _outputBlockSize;
const size_t width = _outputBlockSize;
size_t height = numberOfBlocksInOutput;
BOOST_LOG_TRIVIAL(debug) << " Copying " << numberOfBlocksInOutput << " blocks a " << _outputBlockSize << " bytes";
// we now have a full first block, pitch copy rest leaving room for the header
CUDA_ERROR_CHECK(cudaMemcpy2DAsync((void*) (_outputBuffer.a_ptr()+ _outputBlockSize + 2 * vlbiHeaderSize) , dpitch, (void*) thrust::raw_pointer_cast(_packed_voltage.a_ptr() + remainingBytes), spitch, width, height, cudaMemcpyDeviceToHost, _d2h_stream));
// new spill over
_spillOver.resize(_packed_voltage.size() - remainingBytes - numberOfBlocksInOutput * _outputBlockSize);
size_t offset = numberOfBlocksInOutput * _outputBlockSize + remainingBytes;
BOOST_LOG_TRIVIAL(debug) << " New spill over size " << _spillOver.size() << " bytes with offset " << offset;
CUDA_ERROR_CHECK(cudaMemcpyAsync(static_cast<void *>(_packed_voltage.a_ptr() + offset),
static_cast<void *>( thrust::raw_pointer_cast(_spillOver.data()) ),
_spillOver.size(),
cudaMemcpyDeviceToHost, _d2h_stream));
// fill in header data
for (size_t i = 0; i < numberOfBlocksInOutput + 1; i ++)
{
_vdifHeader.setDataFrameNumber(i); // ToDo: Use correct number
_vdifHeader.setSecondsFromReferenceEpoch(_call_count); // ToDo use correct number
std::copy(reinterpret_cast<uint8_t* >(_vdifHeader.getData()),
reinterpret_cast<uint8_t* >(_vdifHeader.getData()) + vlbiHeaderSize,
_outputBuffer.a().begin() + i * (_outputBlockSize + vlbiHeaderSize));
}
if (_call_count == 3) {
return false;
}
// Wrap in a RawBytes object here;
RawBytes bytes(reinterpret_cast<char *>(_outputBuffer.b_ptr()),
_outputBuffer.b().size(),
_outputBuffer.b().size());
BOOST_LOG_TRIVIAL(debug) << "Calling handler, processing " << _outputBuffer.b().size() << " bytes";
// The handler can't do anything asynchronously without a copy here
// as it would be unsafe (given that it does not own the memory it
// is being passed).
_handler(bytes);
return false; //
} // operator ()
} // edd
} // effelsberg
} // psrdada_cpp
......@@ -71,12 +71,12 @@ void pack_edd_float32_to_2bit(const float * __restrict__ in, uint32_t* __restric
} //namespace kernels
void pack_2bit(thrust::device_vector<float> const& input, thrust::device_vector<uint32_t>& output, float minV, float maxV, cudaStream_t _stream)
void pack_2bit(thrust::device_vector<float> const& input, thrust::device_vector<uint8_t>& output, float minV, float maxV, cudaStream_t _stream)
{
BOOST_LOG_TRIVIAL(debug) << "Packing 2-bit data";
assert(input.size() % NPACK == 0);
output.resize(input.size() / NPACK);
BOOST_LOG_TRIVIAL(debug) << "INput size: " << input.size() << " elements";
output.resize(input.size() / NPACK * 4);
BOOST_LOG_TRIVIAL(debug) << "Input size: " << input.size() << " elements";
BOOST_LOG_TRIVIAL(debug) << "Resizing output buffer to " << output.size() << " elements";
size_t nblocks = std::min(input.size() / EDD_NTHREADS_PACK, 4096uL);
......@@ -84,13 +84,201 @@ void pack_2bit(thrust::device_vector<float> const& input, thrust::device_vector<
float const* input_ptr = thrust::raw_pointer_cast(input.data());
uint32_t* output_ptr = thrust::raw_pointer_cast(output.data());
uint32_t* output_ptr = (uint32_t*) thrust::raw_pointer_cast(output.data());
kernels::pack_edd_float32_to_2bit<<< nblocks, EDD_NTHREADS_PACK, 0, _stream>>>(
input_ptr, output_ptr, input.size(), minV, maxV);
CUDA_ERROR_CHECK(cudaStreamSynchronize(_stream));
}
// Create abit mask with 1 between first and lastBit (inclusive) and zero
/// otherwise;
uint32_t bitMask(uint32_t firstBit, uint32_t lastBit)
{
uint32_t mask = 0U;
for (uint32_t i=firstBit; i<=lastBit; i++)
mask |= 1 << i;
return mask;
}
/// Squeeze a value into the specified bitrange of the target
void setBitsWithValue(uint32_t &target, uint32_t firstBit, uint32_t lastBit, uint32_t value)
{
// check if value is larger than bit range
if (value > (1 << (lastBit + 1 - firstBit)))
{
std::cerr << "value: " << value << ", 1 << (last-bit - firstbit) " << (1 << (lastBit - firstBit)) << ", bitrange: " << lastBit-firstBit << std::endl;
throw std::runtime_error("Value does not fit into bitrange");
}
uint32_t mask = bitMask(firstBit, lastBit);
// zero out relevant bits in data
target &= ~mask;
// shift value to corerct position
value = value << firstBit;
// update target with value
target |= value;
}
/// get numerical value from the specified bits in the target
uint32_t getBitsValue(const uint32_t &target, uint32_t firstBit, uint32_t lastBit)
{
uint32_t mask = bitMask(firstBit, lastBit);
uint32_t res = target & mask;
return res >> firstBit;
}
VDIFHeader::VDIFHeader()
{
for (int i=0; i < 8; i++)
{
data[i] = 0U;
}
// set standard VDIF header
setBitsWithValue(data[1], 30, 30, 0);
setBitsWithValue(data[1], 30, 31, 0);
// set Version Number to 1
setBitsWithValue(data[2], 29, 31, 1);
}
uint32_t* VDIFHeader::getData()
{
return data;
}
void VDIFHeader::setInvalid()
{
setBitsWithValue(data[0], 31, 31, 1);
}
void VDIFHeader::setValid()
{
setBitsWithValue(data[0], 31, 31, 0);
}
bool VDIFHeader::isValid()
{
return (getBitsValue(data[0], 31, 31) == 0);
}
void VDIFHeader::setSecondsFromReferenceEpoch(uint32_t value)
{
setBitsWithValue(data[0], 0, 29, value);
}
uint32_t VDIFHeader::getSecondsFromReferenceEpoch()
{
return getBitsValue(data[0], 0, 29);
}
void VDIFHeader::setReferenceEpoch(uint32_t value)
{
setBitsWithValue(data[1], 24, 29, value);
}
uint32_t VDIFHeader::getReferenceEpoch()
{
return getBitsValue(data[1], 24, 29);
}
void VDIFHeader::setDataFrameNumber(uint32_t value)
{
setBitsWithValue(data[1], 0, 23, value);
}
uint32_t VDIFHeader::getDataFrameNumber()
{
return getBitsValue(data[1], 0, 23);
}
void VDIFHeader::setDataFrameLength(uint32_t value)
{
setBitsWithValue(data[2], 0, 23, value);
}
uint32_t VDIFHeader::getDataFrameLength()
{
return getBitsValue(data[2], 0, 23);
}
uint32_t VDIFHeader::getVersionNumber()
{
return getBitsValue(data[2], 29, 31);
}
void VDIFHeader::setNumberOfChannels(uint32_t value)
{
setBitsWithValue(data[2], 24, 28, value);
}
uint32_t VDIFHeader::getNumberOfChannels()
{
return getBitsValue(data[2], 24, 28);
}
bool VDIFHeader::isRealDataType()
{
return (getBitsValue(data[3], 31, 31) == 0);
}
bool VDIFHeader::isComplexDataType()
{
return (getBitsValue(data[3], 31, 31) == 1);
}
void VDIFHeader::setComplexDataType()
{
setBitsWithValue(data[3], 31, 31, 1<