Commit e931dc38 authored by Tobias Winchen's avatar Tobias Winchen
Browse files

Restructured Gated to merge with full stokes

parent 16156245
......@@ -14,6 +14,13 @@
#include "cublas_v2.h"
#include <iostream>
#include <iomanip>
#include <cstring>
#include <sstream>
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
......@@ -26,21 +33,309 @@ namespace edd {
typedef unsigned long long int uint64_cu;
static_assert(sizeof(uint64_cu) == sizeof(uint64_t), "Long long int not of 64 bit! This is problematic for CUDA!");
typedef uint64_t RawVoltageType;
typedef float UnpackedVoltageType;
typedef float2 ChannelisedVoltageType;
typedef float IntegratedPowerType;
struct InputDataStream
{
virtual void swap() = 0;
virtual void resize(size_t _nchans, size_t _batch, const DadaBufferLayout &dadaBufferLayout) = 0;
virtual void getFromBlock(RawBytes &block, DadaBufferLayout &dadaBufferLayout, cudaStream_t &_h2d_stream) = 0;
};
/// Input data and intermediate processing data for one polarization
struct PolarizationData : public InputDataStream
{
/// Raw ADC Voltage
DoubleDeviceBuffer<RawVoltageType> _raw_voltage;
/// Side channel data
DoubleDeviceBuffer<uint64_t> _sideChannelData;
/// Baseline in gate 0 state
thrust::device_vector<UnpackedVoltageType> _baseLineG0;
/// Baseline in gate 1 state
thrust::device_vector<UnpackedVoltageType> _baseLineG1;
/// Baseline in gate 0 state after update
thrust::device_vector<UnpackedVoltageType> _baseLineG0_update;
/// Baseline in gate 1 state after update
thrust::device_vector<UnpackedVoltageType> _baseLineG1_update;
/// Channelized voltage in gate 0 state
thrust::device_vector<ChannelisedVoltageType> _channelised_voltage_G0;
/// Channelized voltage in gate 1 state
thrust::device_vector<ChannelisedVoltageType> _channelised_voltage_G1;
/// Swaps input buffers
void swap()
{
_raw_voltage.swap();
_sideChannelData.swap();
}
void resize(size_t _nchans, size_t _batch, const DadaBufferLayout &dadaBufferLayout)
{
_raw_voltage.resize(dadaBufferLayout.sizeOfData() / sizeof(uint64_t));
BOOST_LOG_TRIVIAL(debug) << " Input voltages size (in 64-bit words): " << _raw_voltage.size();
_baseLineG0.resize(1);
_baseLineG0_update.resize(1);
_baseLineG1.resize(1);
_baseLineG1_update.resize(1);
_channelised_voltage_G0.resize(_nchans * _batch);
_channelised_voltage_G1.resize(_nchans * _batch);
BOOST_LOG_TRIVIAL(debug) << " Channelised voltages size: " << _channelised_voltage_G0.size();
}
void getFromBlock(RawBytes &block, DadaBufferLayout &dadaBufferLayout, cudaStream_t &_h2d_stream)
{
BOOST_LOG_TRIVIAL(debug) << " block.used_bytes() = " << block.used_bytes()
<< ", dataBlockBytes = " << dadaBufferLayout.sizeOfData() << "\n";
CUDA_ERROR_CHECK(cudaMemcpyAsync(static_cast<void *>(_raw_voltage.a_ptr()),
static_cast<void *>(block.ptr()),
dadaBufferLayout.sizeOfData() , cudaMemcpyHostToDevice,
_h2d_stream));
CUDA_ERROR_CHECK(cudaMemcpyAsync(
static_cast<void *>(_sideChannelData.a_ptr()),
static_cast<void *>(block.ptr() + dadaBufferLayout.sizeOfData() + dadaBufferLayout.sizeOfGap()),
dadaBufferLayout.sizeOfSideChannelData(), cudaMemcpyHostToDevice, _h2d_stream));
BOOST_LOG_TRIVIAL(debug) << "First side channel item: 0x" << std::setw(16)
<< std::setfill('0') << std::hex <<
(reinterpret_cast<uint64_t*>(block.ptr() + dadaBufferLayout.sizeOfData()
+ dadaBufferLayout.sizeOfGap()))[0] <<
std::dec;
}
};
//struct DualPolarizationData : public InputDataStream
//{
// PolarizationData pol0, pol1;
// void swap()
// {
// pol0.swap(); pol1.swap();
// }
//
// void resize(size_t _nchans, size_t _batch, const DadaBufferLayout &dadaBufferLayout)
// {
// BOOST_LOG_TRIVIAL(debug) << " Pol0";
// pol0.resize(_nchans, _batch, dadaBufferLayout);
// BOOST_LOG_TRIVIAL(debug) << " Pol1";
// pol1.resize(_nchans, _batch, dadaBufferLayout);
// }
//};
// Output data for one gate N = 1 for one pol, or 4 for full stokes
struct OutputDataStream
{
/// Reset outptu for new integration
virtual void reset(cudaStream_t &_proc_stream) = 0;
virtual void swap() = 0;
virtual void resize(size_t size, size_t blocks) = 0;
DoublePinnedHostBuffer<char> _host_power;
};
// Output data for one gate, single power spectrum
struct PowerSpectrumOutput
{
/// spectrum data
DoubleDeviceBuffer<IntegratedPowerType> data;
/// Number of samples integrated in this output block
DoubleDeviceBuffer<uint64_cu> _noOfBitSets;
/// Reset outptu for new integration
void reset(cudaStream_t &_proc_stream)
{
thrust::fill(thrust::cuda::par.on(_proc_stream), data.a().begin(),data.a().end(), 0.);
thrust::fill(thrust::cuda::par.on(_proc_stream), _noOfBitSets.a().begin(), _noOfBitSets.a().end(), 0L);
}
/// Swap output buffers
void swap()
{
data.swap();
_noOfBitSets.swap();
}
/// Resize all buffers
void resize(size_t size, size_t blocks)
{
data.resize(size * blocks);
_noOfBitSets.resize(blocks);
}
};
struct GatedPowerSpectrumOutput : public OutputDataStream
{
PowerSpectrumOutput G0, G1;
size_t _nchans;
void reset(cudaStream_t &_proc_stream)
{
G0.reset(_proc_stream);
G1.reset(_proc_stream);
}
/// Swap output buffers
void swap()
{
G0.swap();
G1.swap();
_host_power.swap();
}
/// Resize all buffers
void resize(size_t size, size_t blocks)
{
// ToDo: size passed in constructor, also number of blocks.
G0.resize(size, blocks);
G1.resize(size, blocks);
_nchans = size;
// on the host both power are stored in the same data buffer together with
// the number of bit sets
_host_power.resize( 2 * ( size * sizeof(IntegratedPowerType) + sizeof(size_t) ) * G0._noOfBitSets.size());
}
void data2Host(cudaStream_t &_d2h_stream)
{
// copy data to host if block is finished
CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream));
for (size_t i = 0; i < G0._noOfBitSets.size(); i++)
{
// size of individual spectrum + meta
size_t memslicesize = (_nchans * sizeof(IntegratedPowerType));
// number of spectra per output
size_t memOffset = 2 * i * (memslicesize + + sizeof(size_t));
// copy 2x channel data
CUDA_ERROR_CHECK(
cudaMemcpyAsync(static_cast<void *>(_host_power.a_ptr() + memOffset) ,
static_cast<void *>(G0.data.b_ptr() + i * memslicesize),
_nchans * sizeof(IntegratedPowerType),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync(static_cast<void *>(_host_power.a_ptr() + memOffset + 1 * memslicesize) ,
static_cast<void *>(G1.data.b_ptr() + i * memslicesize),
_nchans * sizeof(IntegratedPowerType),
cudaMemcpyDeviceToHost, _d2h_stream));
// copy noOf bit set data
CUDA_ERROR_CHECK(
cudaMemcpyAsync( static_cast<void *>(_host_power.a_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType)),
static_cast<void *>(G0._noOfBitSets.b_ptr() + i ),
1 * sizeof(size_t),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync( static_cast<void *>(_host_power.a_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType) + sizeof(size_t)),
static_cast<void *>(G1._noOfBitSets.b_ptr() + i ),
1 * sizeof(size_t),
cudaMemcpyDeviceToHost, _d2h_stream));
}
}
};
// Output data for one gate full stokes
struct FullStokesOutput : public OutputDataStream
{
/// Stokes parameters
DoubleDeviceBuffer<IntegratedPowerType> I;
DoubleDeviceBuffer<IntegratedPowerType> Q;
DoubleDeviceBuffer<IntegratedPowerType> U;
DoubleDeviceBuffer<IntegratedPowerType> V;
/// Number of samples integrated in this output block
DoubleDeviceBuffer<uint64_cu> _noOfBitSets;
/// Reset outptu for new integration
void reset(cudaStream_t &_proc_stream)
{
thrust::fill(thrust::cuda::par.on(_proc_stream), I.a().begin(), I.a().end(), 0.);
thrust::fill(thrust::cuda::par.on(_proc_stream), Q.a().begin(), Q.a().end(), 0.);
thrust::fill(thrust::cuda::par.on(_proc_stream), U.a().begin(), U.a().end(), 0.);
thrust::fill(thrust::cuda::par.on(_proc_stream), V.a().begin(), V.a().end(), 0.);
thrust::fill(thrust::cuda::par.on(_proc_stream), _noOfBitSets.a().begin(), _noOfBitSets.a().end(), 0L);
}
/// Swap output buffers
void swap()
{
I.swap();
Q.swap();
U.swap();
V.swap();
_noOfBitSets.swap();
}
/// Resize all buffers
void resize(size_t size, size_t blocks)
{
I.resize(size * blocks);
Q.resize(size * blocks);
U.resize(size * blocks);
V.resize(size * blocks);
_noOfBitSets.resize(blocks);
}
};
struct GatedFullStokesOutput: public OutputDataStream
{
FullStokesOutput G0, G1;
void reset(cudaStream_t &_proc_stream)
{
G0.reset(_proc_stream);
G1.reset(_proc_stream);
}
/// Swap output buffers
void swap()
{
G0.swap();
G1.swap();
}
/// Resize all buffers
void resize(size_t size, size_t blocks)
{
G0.resize(size, blocks);
G1.resize(size, blocks);
}
};
/**
@class GatedSpectrometer
@brief Split data into two streams and create integrated spectra depending on
bit set in side channel data.
*/
template <class HandlerType, typename IntegratedPowerType> class GatedSpectrometer {
public:
typedef uint64_t RawVoltageType;
typedef float UnpackedVoltageType;
typedef float2 ChannelisedVoltageType;
// typedef float IntegratedPowerType;
//typedef int8_t IntegratedPowerType;
template <class HandlerType > class GatedSpectrometer {
public:
/**
* @brief Constructor
......@@ -90,11 +385,11 @@ public:
bool operator()(RawBytes &block);
private:
void process(thrust::device_vector<RawVoltageType> const &digitiser_raw,
thrust::device_vector<uint64_t> const &sideChannelData,
thrust::device_vector<IntegratedPowerType> &detected,
thrust::device_vector<uint64_cu> &noOfBitSetsIn_G0,
thrust::device_vector<uint64_cu> &noOfBitSetsIn_G1);
// gate the data from the input stream and fft data per gate. Number of bit
// sets in every gate is stored in the output datasets
void gated_fft(PolarizationData &data,
thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G0,
thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G1);
private:
DadaBufferLayout _dadaBufferLayout;
......@@ -111,30 +406,23 @@ private:
HandlerType &_handler;
cufftHandle _fft_plan;
uint64_t _nchans;
uint64_t _nBlocks;
uint64_t _call_count;
double _processing_efficiency;
std::unique_ptr<Unpacker> _unpacker;
std::unique_ptr<DetectorAccumulator<IntegratedPowerType> > _detector;
// Input data
DoubleDeviceBuffer<RawVoltageType> _raw_voltage_db;
DoubleDeviceBuffer<uint64_t> _sideChannelData_db;
// Output data
DoubleDeviceBuffer<IntegratedPowerType> _power_db;
OutputDataStream* outputDataStream;
InputDataStream* inputDataStream;
std::unique_ptr<DetectorAccumulator<IntegratedPowerType> > _detector;
DoubleDeviceBuffer<uint64_cu> _noOfBitSetsIn_G0;
DoubleDeviceBuffer<uint64_cu> _noOfBitSetsIn_G1;
DoublePinnedHostBuffer<char> _host_power_db;
// Intermediate process steps
// Temporary processing block
// ToDo: Use inplace FFT to avoid temporary coltage array
thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G0;
thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G1;
thrust::device_vector<ChannelisedVoltageType> _channelised_voltage;
thrust::device_vector<UnpackedVoltageType> _baseLineNG0;
thrust::device_vector<UnpackedVoltageType> _baseLineNG1;
cudaStream_t _h2d_stream;
cudaStream_t _proc_stream;
......@@ -170,8 +458,8 @@ private:
__global__ void gating(float *G0, float *G1, const int64_t *sideChannelData,
size_t N, size_t heapSize, size_t bitpos,
size_t noOfSideChannels, size_t selectedSideChannel,
const float baseLineG0,
const float baseLineG1,
const float* __restrict__ _baseLineG0,
const float* __restrict__ _baseLineG1,
float* __restrict__ baseLineNG0,
float* __restrict__ baseLineNG1,
uint64_cu* stats_G0,
......
......@@ -27,87 +27,9 @@ namespace edd {
typedef unsigned long long int uint64_cu;
static_assert(sizeof(uint64_cu) == sizeof(uint64_t), "Long long int not of 64 bit! This is problematic for CUDA!");
typedef uint64_t RawVoltageType;
typedef float UnpackedVoltageType;
typedef float2 ChannelisedVoltageType;
typedef float IntegratedPowerType;
//typedef int8_t IntegratedPowerType;
/// Input data and intermediate processing data for one polarization
struct PolarizationData
{
/// Raw ADC Voltage
DoubleDeviceBuffer<RawVoltageType> _raw_voltage;
/// Side channel data
DoubleDeviceBuffer<uint64_t> _sideChannelData;
/// Baseline in gate 0 state
thrust::device_vector<UnpackedVoltageType> _baseLineG0;
/// Baseline in gate 1 state
thrust::device_vector<UnpackedVoltageType> _baseLineG1;
/// Baseline in gate 0 state after update
thrust::device_vector<UnpackedVoltageType> _baseLineG0_update;
/// Baseline in gate 1 state after update
thrust::device_vector<UnpackedVoltageType> _baseLineG1_update;
/// Channelized voltage in gate 0 state
thrust::device_vector<ChannelisedVoltageType> _channelised_voltage_G0;
/// Channelized voltage in gate 1 state
thrust::device_vector<ChannelisedVoltageType> _channelised_voltage_G1;
/// Swaps input buffers
void swap()
{
_raw_voltage.swap();
_sideChannelData.swap();
}
};
// Output data for one gate
struct StokesOutput
{
/// Stokes parameters
DoubleDeviceBuffer<IntegratedPowerType> I;
DoubleDeviceBuffer<IntegratedPowerType> Q;
DoubleDeviceBuffer<IntegratedPowerType> U;
DoubleDeviceBuffer<IntegratedPowerType> V;
/// Number of samples integrated in this output block
DoubleDeviceBuffer<uint64_cu> _noOfBitSets;
/// Reset outptu for new integration
void reset(cudaStream_t &_proc_stream)
{
thrust::fill(thrust::cuda::par.on(_proc_stream),I.a().begin(), I.a().end(), 0.);
thrust::fill(thrust::cuda::par.on(_proc_stream),Q.a().begin(), Q.a().end(), 0.);
thrust::fill(thrust::cuda::par.on(_proc_stream),U.a().begin(), U.a().end(), 0.);
thrust::fill(thrust::cuda::par.on(_proc_stream),V.a().begin(), V.a().end(), 0.);
thrust::fill(thrust::cuda::par.on(_proc_stream), _noOfBitSets.a().begin(), _noOfBitSets.a().end(), 0L);
}
/// Swap output buffers
void swap()
{
I.swap();
Q.swap();
U.swap();
V.swap();
_noOfBitSets.swap();
}
/// Resize all buffers
void resize(size_t size, size_t blocks)
{
I.resize(size * blocks);
Q.resize(size * blocks);
U.resize(size * blocks);
V.resize(size * blocks);
_noOfBitSets.resize(blocks);
}
};
......@@ -177,8 +99,8 @@ public:
private:
// gate the data and fft data per gate
void gated_fft(PolarizationData &data,
thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G0,
thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G1);
thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G0,
thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G1);
private:
DadaBufferLayout _dadaBufferLayout;
......
......@@ -17,8 +17,9 @@ namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
// Reduce thread local vatiable v in shared array x, so that x[0]
template<typename T>
__device__ void reduce(T *x, const T &v)
__device__ void sum_reduce(T *x, const T &v)
{
x[threadIdx.x] = v;
__syncthreads();
......@@ -28,26 +29,40 @@ __device__ void reduce(T *x, const T &v)
x[threadIdx.x] += x[threadIdx.x + s];
__syncthreads();
}
}
// If one of the side channel items is lsot, then both are considered as lost
// here
__global__ void mergeSideChannels(uint64_t* __restrict__ A, uint64_t* __restrict__ B, size_t N)
{
for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N);
i += blockDim.x * gridDim.x)
{
uint64_t v = A[i] || B[i];
A[i] = v;
B[i] = v;
}
}
__global__ void gating(float* __restrict__ G0, float* __restrict__ G1, const uint64_t* __restrict__ sideChannelData,
size_t N, size_t heapSize, size_t bitpos,
size_t noOfSideChannels, size_t selectedSideChannel,
const float baseLineG0,
const float baseLineG1,
float* __restrict__ baseLineNG0,
float* __restrict__ baseLineNG1,
uint64_cu* stats_G0, uint64_cu* stats_G1) {
// float baseLineG0 = (*_baseLineNG0) / N;
// float baseLineG1 = (*_baseLineNG1) / N;
__global__ void gating(float* __restrict__ G0,
float* __restrict__ G1,
const uint64_t* __restrict__ sideChannelData,
size_t N, size_t heapSize, size_t bitpos,
size_t noOfSideChannels, size_t selectedSideChannel,
const float* __restrict__ _baseLineG0,
const float* __restrict__ _baseLineG1,
float* __restrict__ baseLineNG0,
float* __restrict__ baseLineNG1,
uint64_cu* stats_G0, uint64_cu* stats_G1) {
// statistics values for samopels to G0, G1
uint32_t _G0stats = 0;
uint32_t _G1stats = 0;
const float baseLineG0 = _baseLineG0[0];
const float baseLineG1 = _baseLineG1[0];
float baselineUpdateG0 = 0;
float baselineUpdateG1 = 0;
......@@ -55,12 +70,8 @@ __global__ void gating(float* __restrict__ G0, float* __restrict__ G1, const uin
i += blockDim.x * gridDim.x) {
const float v = G0[i];
const uint64_t sideChannelItem =
sideChannelData[((i / heapSize) * (noOfSideChannels)) +
selectedSideChannel]; // Probably not optimal access as
// same data is copied for several
// threads, but maybe efficiently
// handled by cache?
const uint64_t sideChannelItem = sideChannelData[((i / heapSize) * (noOfSideChannels)) +
selectedSideChannel];
const unsigned int bit_set = TEST_BIT(sideChannelItem, bitpos);
const unsigned int heap_lost = TEST_BIT(sideChannelItem, 63);
......@@ -73,36 +84,75 @@ __global__ void gating(float* __restrict__ G0, float* __restrict__ G1, const uin
baselineUpdateG1 += v * bit_set * (!heap_lost);
baselineUpdateG0 += v * (!bit_set) *(!heap_lost);
}
__shared__ uint32_t x[1024];
// Reduce G0, G1
reduce<uint32_t>(x, _G0stats);
if(threadIdx.x == 0)
sum_reduce<uint32_t>(x, _G0stats);
if(threadIdx.x == 0) {
atomicAdd(stats_G0, (uint64_cu) x[threadIdx.x]);
}
__syncthreads();
reduce<uint32_t>(x, _G1stats);
if(threadIdx.x == 0)
sum_reduce<uint32_t>(x, _G1stats);
if(threadIdx.x == 0) {