Commit 7741eefa authored by Tobias Winchen's avatar Tobias Winchen
Browse files

Fixed VLBI Level, some code cleanup

parent fe18fc23
#ifndef PSRDADA_CPP_EFFELSBERG_EDD_TOOLS_CUH
#define PSRDADA_CPP_EFFELSBERG_EDD_TOOLS_CUH
#include <cstdint>
// collection of multi purpose kernels
// blocksize for the array sum kernel
......@@ -24,7 +26,17 @@ __global__ void array_sum(float *in, size_t N, float *out);
/// Calculates 1/N \sum (x_i - 1/N offset)**2 per block
/// To calculate the std dev sum partial results of block using array sum
__global__ void scaled_square_offset_sum(float *in, size_t N, float* offset, float *out);
__global__ void scaled_square_residual_sum(float *in, size_t N, float* offset, float *out);
/// Create a bit mask with 1 between first and lastBit (inclusive), zero otherwise;
uint32_t bitMask(uint32_t firstBit, uint32_t lastBit);
/// Squeeze a value into the specified bitrange of the target
void setBitsWithValue(uint32_t &target, uint32_t firstBit, uint32_t lastBit, uint32_t value);
/// Get numerical value from the specified bits in the target
uint32_t getBitsValue(const uint32_t &target, uint32_t firstBit, uint32_t lastBit);
} // edd
} // effelsberg
......
......@@ -15,22 +15,6 @@ namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
// some helper functions to dealm with bit encoding of the header
// ToDo: Find better place in utility collection
/// Create bit mask with ones between first and lastBit (inclusive) and zeros
/// otherwise;
uint32_t bitMask(uint32_t firstBit, uint32_t lastBit);
/// Squeeze a value into the specified bitrange of the target. Throws runtime
/// error if too large.
void setBitsWithValue(uint32_t &target, uint32_t firstBit, uint32_t lastBit, uint32_t value);
/// Get numerical value from the specified bits in the target.
uint32_t getBitsValue(const uint32_t &target, uint32_t firstBit, uint32_t lastBit);
const size_t vlbiHeaderSize = 8 * 32 / 8; // bytes [8 words a 32 bit]
/// class VDIFHeader stores a VDIF compliant header block with conveniant
......@@ -97,8 +81,9 @@ public:
* @brief Constructor
*
* @param buffer_bytes A RawBytes object wrapping a DADA header buffer
* @param speadHeapSize Size of the spead heap block.
* @param input_bitDepth Bit depth of the sampled signal.
* @param speadHeapSize Size of the spead heap block.
* @param digitizer_threshold Threshold for the 2 bit digitization in units of the RMS of the signal.
* @param VDIFHeader Header of the VDIF output to be sed. Must contain size of the output VDIF payload in bytes.
* @param handler Output handler
*
......@@ -106,6 +91,7 @@ public:
VLBI(std::size_t buffer_bytes, std::size_t input_bitDepth,
std::size_t speadHeapSize,
double sampleRate,
double digitizer_threshold,
const VDIFHeader &vdifHeader,
HandlerType &handler);
~VLBI();
......@@ -138,7 +124,8 @@ private:
std::size_t _output_bitDepth;
std::size_t _speadHeapSize;
std::size_t _outputBlockSize;
double _sampleRate;
double _sampleRate, _digitizer_threshold;
VDIFHeader _vdifHeader;
HandlerType &_handler;
int _call_count;
......@@ -147,15 +134,16 @@ private:
// Input data
DoubleDeviceBuffer<RawVoltageType> _raw_voltage_db;
// process tmp
// Tmp data for processing
thrust::device_vector<float> _unpacked_voltage;
thrust::device_vector<float> _baseLineN;
thrust::device_vector<float> _stdDevN;
// Output data
DoubleDeviceBuffer<uint32_t> _packed_voltage;
DoubleDeviceBuffer<uint8_t> _packed_voltage;
DoublePinnedHostBuffer<uint8_t> _outputBuffer;
VDIFHeader _vdifHeader;
// spill over between two dada blocks as vdif block not necessarily aligned
thrust::host_vector<uint8_t, thrust::system::cuda::experimental::pinned_allocator<uint8_t>> _spillOver;
cudaStream_t _h2d_stream;
......@@ -165,13 +153,12 @@ private:
// pack float to 2 bit integers with VLBI non linear scaling with levels
// sigma = \sqrt(sigma2))
// -n * sigma, -1 signa, sigma, n * sigma
// For performance/technical reasons it is
// sigma = \sqrt(sigma2)) and meanN = N * mean
__global__ void pack2bit_nonLinear(const float *__restrict__ input,
uint32_t *__restrict__ output, size_t inputSize,
float n, float *sigma2);
float n, float *sigma2, float *meanN);
} //namespace edd
} //namespace effelsberg
......
#include "psrdada_cpp/effelsberg/edd/VLBI.cuh"
//#include "psrdada_cpp/effelsberg/edd/Packer.cuh"
#include "psrdada_cpp/effelsberg/edd/Packer.cuh"
#include "psrdada_cpp/effelsberg/edd/Tools.cuh"
#include "psrdada_cpp/cuda_utils.hpp"
......@@ -7,6 +7,7 @@
#include <cuda.h>
#include <cuda_profiler_api.h>
#include <thrust/system/cuda/execution_policy.h>
#include <thrust/extrema.h>
#include <cstring>
#include <iostream>
......@@ -17,42 +18,42 @@ namespace effelsberg {
namespace edd {
template <class HandlerType>
VLBI<HandlerType>::VLBI(std::size_t buffer_bytes, std::size_t input_bitDepth,
std::size_t speadHeapSize,
double sampleRate,
const VDIFHeader &vdifHeader,
HandlerType &handler)
std::size_t speadHeapSize, double sampleRate,
double digitizer_threshold,
const VDIFHeader &vdifHeader, HandlerType &handler)
: _buffer_bytes(buffer_bytes), _input_bitDepth(input_bitDepth),
_sampleRate(sampleRate), _vdifHeader(vdifHeader), _output_bitDepth(2),
_speadHeapSize(speadHeapSize), _handler(handler), _call_count(0) {
_sampleRate(sampleRate), _digitizer_threshold(digitizer_threshold),
_vdifHeader(vdifHeader), _output_bitDepth(2),
_speadHeapSize(speadHeapSize), _handler(handler), _call_count(0) {
// Sanity checks
// check for any device errors
CUDA_ERROR_CHECK(cudaDeviceSynchronize());
BOOST_LOG_TRIVIAL(info) << "Creating new VLBI instance";
BOOST_LOG_TRIVIAL(info) << " Output data in VDIF format with "
BOOST_LOG_TRIVIAL(info) << " Output data in VDIF format with "
<< vlbiHeaderSize << "bytes header info and "
<< _vdifHeader.getDataFrameLength() << " bytes payload";
BOOST_LOG_TRIVIAL(debug) << " Expecting speadheaps of size " << speadHeapSize
<< " byte";
<< _vdifHeader.getDataFrameLength()
<< " bytes payload";
BOOST_LOG_TRIVIAL(debug) << " Expecting speadheaps of size "
<< speadHeapSize << " byte";
BOOST_LOG_TRIVIAL(debug) << " Sample rate " << _sampleRate << " Hz";
BOOST_LOG_TRIVIAL(debug) << " Sample rate " << _sampleRate << " Hz";
std::size_t n64bit_words = _buffer_bytes / sizeof(uint64_t);
BOOST_LOG_TRIVIAL(debug) << "Allocating memory";
_raw_voltage_db.resize(n64bit_words);
BOOST_LOG_TRIVIAL(debug) << " Input voltages size (in 64-bit words): "
<< _raw_voltage_db.size();
BOOST_LOG_TRIVIAL(debug) << " Input voltages size : "
<< _raw_voltage_db.size() << " 64-bit words,"
<< _raw_voltage_db.size() * 64 / 8 << " bytes";
_packed_voltage.resize(n64bit_words * 64 / input_bitDepth / 16);
_packed_voltage.resize(n64bit_words * 64 / input_bitDepth * _output_bitDepth /
8);
_spillOver.reserve(vdifHeader.getDataFrameLength() * 8);
BOOST_LOG_TRIVIAL(debug) << " Output voltages size: "
BOOST_LOG_TRIVIAL(debug) << " Output voltages size: "
<< _packed_voltage.size() << " byte";
CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream));
......@@ -61,7 +62,7 @@ VLBI<HandlerType>::VLBI(std::size_t buffer_bytes, std::size_t input_bitDepth,
_unpacker.reset(new Unpacker(_proc_stream));
_vdifHeader.setBitsPerSample(2);
_vdifHeader.setBitsPerSample(_output_bitDepth);
_vdifHeader.setNumberOfChannels(1);
_vdifHeader.setRealDataType();
} // constructor
......@@ -81,20 +82,18 @@ template <class HandlerType> void VLBI<HandlerType>::init(RawBytes &block) {
headerInfo << "\n"
<< "# VLBI parameters: \n";
size_t bEnd = std::strlen(block.ptr());
if (bEnd + headerInfo.str().size() < block.total_bytes()) {
std::strcpy(block.ptr() + bEnd, headerInfo.str().c_str());
} else {
BOOST_LOG_TRIVIAL(warning)
<< "Header of size " << block.total_bytes()
<< " bytes already contains " << bEnd
<< "bytes. Cannot add VLBI info of size "
<< headerInfo.str().size() << " bytes.";
BOOST_LOG_TRIVIAL(warning) << "Header of size " << block.total_bytes()
<< " bytes already contains " << bEnd
<< "bytes. Cannot add VLBI info of size "
<< headerInfo.str().size() << " bytes.";
}
_baseLineN.resize(array_sum_Nthreads);
_stdDevN.resize(array_sum_Nthreads);
_handler.init(block);
}
......@@ -146,26 +145,50 @@ bool VLBI<HandlerType>::operator()(RawBytes &block) {
BOOST_LOG_TRIVIAL(debug) << "Calculate baseline";
psrdada_cpp::effelsberg::edd::array_sum<<<64, array_sum_Nthreads, 0, _proc_stream>>>(thrust::raw_pointer_cast(_unpacked_voltage.data()), _unpacked_voltage.size(), thrust::raw_pointer_cast(_baseLineN.data()));
psrdada_cpp::effelsberg::edd::array_sum<<<1, array_sum_Nthreads, 0, _proc_stream>>>(thrust::raw_pointer_cast(_baseLineN.data()), _baseLineN.size(), thrust::raw_pointer_cast(_baseLineN.data()));
psrdada_cpp::effelsberg::edd::
array_sum<<<64, array_sum_Nthreads, 0, _proc_stream>>>(
thrust::raw_pointer_cast(_unpacked_voltage.data()),
_unpacked_voltage.size(),
thrust::raw_pointer_cast(_baseLineN.data()));
psrdada_cpp::effelsberg::edd::
array_sum<<<1, array_sum_Nthreads, 0, _proc_stream>>>(
thrust::raw_pointer_cast(_baseLineN.data()), _baseLineN.size(),
thrust::raw_pointer_cast(_baseLineN.data()));
BOOST_LOG_TRIVIAL(debug) << "Calculate std-dev";
psrdada_cpp::effelsberg::edd::
scaled_square_residual_sum<<<64, array_sum_Nthreads, 0, _proc_stream>>>(
thrust::raw_pointer_cast(_unpacked_voltage.data()),
_unpacked_voltage.size(), thrust::raw_pointer_cast(_baseLineN.data()),
thrust::raw_pointer_cast(_stdDevN.data()));
psrdada_cpp::effelsberg::edd::
array_sum<<<1, array_sum_Nthreads, 0, _proc_stream>>>(
thrust::raw_pointer_cast(_stdDevN.data()), _stdDevN.size(),
thrust::raw_pointer_cast(_stdDevN.data()));
BOOST_LOG_TRIVIAL(debug) << "Calculate std dev";
psrdada_cpp::effelsberg::edd::scaled_square_offset_sum<<<64, array_sum_Nthreads, 0, _proc_stream>>>(thrust::raw_pointer_cast(_unpacked_voltage.data()), _unpacked_voltage.size(), thrust::raw_pointer_cast(_baseLineN.data()), thrust::raw_pointer_cast(_baseLineN.data()));
psrdada_cpp::effelsberg::edd::array_sum<<<1, array_sum_Nthreads, 0, _proc_stream>>>(thrust::raw_pointer_cast(_baseLineN.data()), _baseLineN.size(), thrust::raw_pointer_cast(_baseLineN.data()));
// non linear packing
float v = 0.981599; // L. Kogan, Correction functions for digital correlators
// with two and four quantization levels, Radio Science 33 (1998), 1289-1296
BOOST_LOG_TRIVIAL(debug) << "Packing data with non linear 2-bit packaging using levels -v*sigma, 0, v*sigma with n = " << v;
_packed_voltage.b().resize(_unpacked_voltage.size() / 16);
BOOST_LOG_TRIVIAL(debug) << "Input size: " << _unpacked_voltage.size() << " elements";
BOOST_LOG_TRIVIAL(debug) << "Resizing output buffer to " << _packed_voltage.b().size() << " elements";
pack2bit_nonLinear<<<128, 1024, 0, _proc_stream>>>(thrust::raw_pointer_cast(_unpacked_voltage.data()),
thrust::raw_pointer_cast(_packed_voltage.b().data()),
_unpacked_voltage.size(), v, thrust::raw_pointer_cast(_baseLineN.data()));
BOOST_LOG_TRIVIAL(debug) << "Packing data with non linear 2-bit packaging "
"using levels -v*sigma, 0, v*sigma with v = "
<< _digitizer_threshold;
_packed_voltage.b().resize(_unpacked_voltage.size() * 2 / 8);
BOOST_LOG_TRIVIAL(debug) << "Input size: " << _unpacked_voltage.size()
<< " elements";
BOOST_LOG_TRIVIAL(debug) << "Resizing output buffer to "
<< _packed_voltage.b().size() << " byte";
pack2bit_nonLinear<<<128, 1024, 0, _proc_stream>>>(
thrust::raw_pointer_cast(_unpacked_voltage.data()),
(uint32_t *)thrust::raw_pointer_cast(_packed_voltage.b().data()),
_unpacked_voltage.size(), _digitizer_threshold,
thrust::raw_pointer_cast(_stdDevN.data()),
thrust::raw_pointer_cast(_baseLineN.data()));
CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
BOOST_LOG_TRIVIAL(trace) << " Standard Deviation squared: " << _stdDevN[0]
<< " "
<< "Mean Value: "
<< _baseLineN[0] / _unpacked_voltage.size();
if ((_call_count == 2)) {
return false;
......
#include "psrdada_cpp/effelsberg/edd/Tools.cuh"
#include "psrdada_cpp/common.hpp"
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
__global__ void array_sum(float *in, size_t N, float *out)
{
__global__ void array_sum(float *in, size_t N, float *out) {
__shared__ float data[array_sum_Nthreads];
size_t tid = threadIdx.x;
......@@ -14,7 +14,7 @@ namespace edd {
for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N);
i += blockDim.x * gridDim.x) {
ls += in[i]; // + in[i + blockDim.x]; // loading two elements increase the used bandwidth by ~10% but requires matching blocksize and size of input array
ls += in[i];
}
data[tid] = ls;
......@@ -27,20 +27,14 @@ namespace edd {
__syncthreads();
}
// unroll last warp
// if (tid < 32)
//{
// warpReduce(data, tid);
//}
if (tid == 0) {
out[blockIdx.x] = data[0];
}
}
__global__ void scaled_square_offset_sum(float *in, size_t N, float* offset, float *out) {
__global__ void scaled_square_residual_sum(float *in, size_t N, float *offset,
float *out) {
__shared__ float data[array_sum_Nthreads];
size_t tid = threadIdx.x;
......@@ -69,6 +63,46 @@ __global__ void scaled_square_offset_sum(float *in, size_t N, float* offset, flo
}
uint32_t bitMask(uint32_t firstBit, uint32_t lastBit) {
uint32_t mask = 0U;
for (uint32_t i = firstBit; i <= lastBit; i++)
mask |= 1 << i;
return mask;
}
void setBitsWithValue(uint32_t &target, uint32_t firstBit, uint32_t lastBit,
uint32_t value) {
// check if value is larger than bit range
if (value > (1 << (lastBit + 1 - firstBit))) {
BOOST_LOG_TRIVIAL(error)
<< "value: " << value << ", 1 << (last-bit - firstbit) "
<< (1 << (lastBit - firstBit)) << ", bitrange: " << lastBit - firstBit
<< std::endl;
throw std::runtime_error("Value does not fit into bitrange");
}
uint32_t mask = bitMask(firstBit, lastBit);
// zero out relevant bits in data
target &= ~mask;
// shift value to corerct position
value = value << firstBit;
// update target with value
target |= value;
}
uint32_t getBitsValue(const uint32_t &target, uint32_t firstBit,
uint32_t lastBit) {
uint32_t mask = bitMask(firstBit, lastBit);
uint32_t res = target & mask;
return res >> firstBit;
}
} // edd
} // effelsberg
} // psrdada_cpp
#include "psrdada_cpp/effelsberg/edd/VLBI.cuh"
#include "psrdada_cpp/effelsberg/edd/Tools.cuh"
#include "psrdada_cpp/cuda_utils.hpp"
#include <cuda.h>
#include <cstdint>
#define EDD_NTHREADS_PACK 1024
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
// Create abit mask with 1 between first and lastBit (inclusive) and zero
/// otherwise;
uint32_t bitMask(uint32_t firstBit, uint32_t lastBit)
{
uint32_t mask = 0U;
for (uint32_t i=firstBit; i<=lastBit; i++)
mask |= 1 << i;
return mask;
}
/// Squeeze a value into the specified bitrange of the target
void setBitsWithValue(uint32_t &target, uint32_t firstBit, uint32_t lastBit, uint32_t value)
{
// check if value is larger than bit range
if (value > (1 << (lastBit + 1 - firstBit)))
{
std::cerr << "value: " << value << ", 1 << (last-bit - firstbit) " << (1 << (lastBit - firstBit)) << ", bitrange: " << lastBit-firstBit << std::endl;
throw std::runtime_error("Value does not fit into bitrange");
}
uint32_t mask = bitMask(firstBit, lastBit);
// zero out relevant bits in data
target &= ~mask;
// shift value to corerct position
value = value << firstBit;
// update target with value
target |= value;
}
/// get numerical value from the specified bits in the target
uint32_t getBitsValue(const uint32_t &target, uint32_t firstBit, uint32_t lastBit)
{
uint32_t mask = bitMask(firstBit, lastBit);
uint32_t res = target & mask;
return res >> firstBit;
}
VDIFHeader::VDIFHeader()
{
for (int i=0; i < 8; i++)
{
VDIFHeader::VDIFHeader() {
for (int i = 0; i < 8; i++) {
data[i] = 0U;
}
......@@ -68,138 +22,101 @@ VDIFHeader::VDIFHeader()
setBitsWithValue(data[2], 29, 31, 1);
}
uint32_t* VDIFHeader::getData()
{
return data;
}
uint32_t *VDIFHeader::getData() { return data; }
void VDIFHeader::setInvalid()
{
setBitsWithValue(data[0], 31, 31, 1);
}
void VDIFHeader::setInvalid() { setBitsWithValue(data[0], 31, 31, 1); }
void VDIFHeader::setValid()
{
setBitsWithValue(data[0], 31, 31, 0);
}
void VDIFHeader::setValid() { setBitsWithValue(data[0], 31, 31, 0); }
bool VDIFHeader::isValid() const
{
bool VDIFHeader::isValid() const {
return (getBitsValue(data[0], 31, 31) == 0);
}
void VDIFHeader::setSecondsFromReferenceEpoch(uint32_t value)
{
void VDIFHeader::setSecondsFromReferenceEpoch(uint32_t value) {
setBitsWithValue(data[0], 0, 29, value);
}
uint32_t VDIFHeader::getSecondsFromReferenceEpoch() const
{
uint32_t VDIFHeader::getSecondsFromReferenceEpoch() const {
return getBitsValue(data[0], 0, 29);
}
void VDIFHeader::setReferenceEpoch(uint32_t value)
{
void VDIFHeader::setReferenceEpoch(uint32_t value) {
setBitsWithValue(data[1], 24, 29, value);
}
uint32_t VDIFHeader::getReferenceEpoch() const
{
uint32_t VDIFHeader::getReferenceEpoch() const {
return getBitsValue(data[1], 24, 29);
}
void VDIFHeader::setDataFrameNumber(uint32_t value)
{
void VDIFHeader::setDataFrameNumber(uint32_t value) {
setBitsWithValue(data[1], 0, 23, value);
}
uint32_t VDIFHeader::getDataFrameNumber() const
{
uint32_t VDIFHeader::getDataFrameNumber() const {
return getBitsValue(data[1], 0, 23);
}
void VDIFHeader::setDataFrameLength(uint32_t value)
{
void VDIFHeader::setDataFrameLength(uint32_t value) {
setBitsWithValue(data[2], 0, 23, value);
}
uint32_t VDIFHeader::getDataFrameLength() const
{
uint32_t VDIFHeader::getDataFrameLength() const {
return getBitsValue(data[2], 0, 23);
}
uint32_t VDIFHeader::getVersionNumber() const
{
uint32_t VDIFHeader::getVersionNumber() const {
return getBitsValue(data[2], 29, 31);
}
void VDIFHeader::setNumberOfChannels(uint32_t value)
{
void VDIFHeader::setNumberOfChannels(uint32_t value) {
setBitsWithValue(data[2], 24, 28, value);
}
uint32_t VDIFHeader::getNumberOfChannels() const
{
uint32_t VDIFHeader::getNumberOfChannels() const {
return getBitsValue(data[2], 24, 28);
}
bool VDIFHeader::isRealDataType() const
{
bool VDIFHeader::isRealDataType() const {
return (getBitsValue(data[3], 31, 31) == 0);
}
bool VDIFHeader::isComplexDataType() const
{
bool VDIFHeader::isComplexDataType() const {
return (getBitsValue(data[3], 31, 31) == 1);
}
void VDIFHeader::setComplexDataType()
{
setBitsWithValue(data[3], 31, 31, 1);
}
void VDIFHeader::setComplexDataType() { setBitsWithValue(data[3], 31, 31, 1); }
void VDIFHeader::setRealDataType()
{
setBitsWithValue(data[0], 31, 31, 0);
}
void VDIFHeader::setRealDataType() { setBitsWithValue(data[0], 31, 31, 0); }
void VDIFHeader::setBitsPerSample(uint32_t value)
{
void VDIFHeader::setBitsPerSample(uint32_t value) {
setBitsWithValue(data[3], 26, 30, value);
}
uint32_t VDIFHeader::getBitsPerSample() const
{
uint32_t VDIFHeader::getBitsPerSample() const {
return getBitsValue(data[3], 26, 30);
}
void VDIFHeader::setThreadId(uint32_t value)
{
void VDIFHeader::setThreadId(uint32_t value) {
setBitsWithValue(data[3], 16, 25, value);
}
uint32_t VDIFHeader::getThreadId() const
{
uint32_t VDIFHeader::getThreadId() const {
return getBitsValue(data[3], 16, 25);
}
void VDIFHeader::setStationId(uint32_t value)
{
void VDIFHeader::setStationId(uint32_t value) {
setBitsWithValue(data[3], 0, 15, value);
}
uint32_t VDIFHeader::getStationId() const
{
uint32_t VDIFHeader::getStationId() const {
return getBitsValue(data[3], 0, 15);
}
// pack float to 2 bit integers with VLBI non linear scaling with levels
// sigma = \sqrt(sigma2))
// -n * sigma, -1 signa, sigma, n * sigma
__global__ void pack2bit_nonLinear(const float *__restrict__ input,