Commit 0822a657 authored by Tobias Winchen's avatar Tobias Winchen
Browse files

Deleted obsolete header

parent f210a689
#ifndef PSRDADA_CPP_EFFELSBERG_EDD_GATEDSPECTROMETER_HPP
#define PSRDADA_CPP_EFFELSBERG_EDD_GATEDSPECTROMETER_HPP
#include "psrdada_cpp/effelsberg/edd/Unpacker.cuh"
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/cuda_utils.hpp"
#include "psrdada_cpp/double_device_buffer.cuh"
#include "psrdada_cpp/double_host_buffer.cuh"
#include "psrdada_cpp/effelsberg/edd/DetectorAccumulator.cuh"
#include "psrdada_cpp/effelsberg/edd/DadaBufferLayout.hpp"
#include "thrust/device_vector.h"
#include "cufft.h"
#include "cublas_v2.h"
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
#define BIT_MASK(bit) (1uL << (bit))
#define SET_BIT(value, bit) ((value) |= BIT_MASK(bit))
#define CLEAR_BIT(value, bit) ((value) &= ~BIT_MASK(bit))
#define TEST_BIT(value, bit) (((value)&BIT_MASK(bit)) ? 1 : 0)
typedef unsigned long long int uint64_cu;
static_assert(sizeof(uint64_cu) == sizeof(uint64_t), "Long long int not of 64 bit! This is problematic for CUDA!");
/**
@class GatedStokesSpectrometer
@brief Split data into two streams and create integrated spectra depending on
bit set in side channel data.
*/
template <class HandlerType> class GatedStokesSpectrometer {
public:
public:
/**
* @brief Constructor
*
* @param buffer_bytes A RawBytes object wrapping a DADA header buffer
* @param nSideChannels Number of side channel items in the data stream,
* @param selectedSideChannel Side channel item used for gating
* @param selectedBit bit of side channel item used for gating
* @param speadHeapSize Size of the spead heap block.
* @param fftLength Size of the FFT
* @param naccumulate Number of samples to integrate in the individual
* FFT bins
* @param nbits Bit depth of the sampled signal
* @param handler Output handler
*
*/
GatedStokesSpectrometer(const DadaBufferLayout &bufferLayout,
std::size_t selectedSideChannel, std::size_t selectedBit,
std::size_t fft_length,
std::size_t naccumulate, std::size_t nbits,
HandlerType &handler);
~GatedStokesSpectrometer();
/**
* @brief A callback to be called on connection
* to a ring buffer.
*
* @details The first available header block in the
* in the ring buffer is provided as an argument.
* It is here that header parameters could be read
* if desired.
*
* @param block A RawBytes object wrapping a DADA header buffer
*/
void init(RawBytes &block);
/**
* @brief A callback to be called on acqusition of a new
* data block.
*
* @param block A RawBytes object wrapping a DADA data buffer output
* are the integrated specttra with/without bit set.
*/
bool operator()(RawBytes &block);
private:
// gate the data and fft data per gate
void gated_fft(PolarizationData &data,
thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G0,
thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G1);
private:
DadaBufferLayout _dadaBufferLayout;
std::size_t _fft_length;
std::size_t _naccumulate;
std::size_t _nbits;
std::size_t _selectedSideChannel;
std::size_t _selectedBit;
std::size_t _batch;
std::size_t _nsamps_per_output_spectra;
std::size_t _nsamps_per_buffer;
std::size_t _nsamps_per_heap;
HandlerType &_handler;
cufftHandle _fft_plan;
uint64_t _nchans;
uint64_t _call_count;
double _processing_efficiency;
std::unique_ptr<Unpacker> _unpacker;
// Input data and per pol intermediate data
PolarizationData polarization0, polarization1;
// Output data
StokesOutput stokes_G0, stokes_G1;
DoublePinnedHostBuffer<char> _host_power_db;
// Temporary processing block
// ToDo: Use inplace FFT to avoid temporary coltage array
thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G0;
thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G1;
cudaStream_t _h2d_stream;
cudaStream_t _proc_stream;
cudaStream_t _d2h_stream;
};
/**
* @brief Splits the input data depending on a bit set into two arrays.
*
* @details The resulting gaps are filled with a given baseline value in the other stream.
*
* @param GO Input data. Data is set to the baseline value if corresponding
* sideChannelData bit at bitpos os set.
* @param G1 Data in this array is set to the baseline value if corresponding
* sideChannelData bit at bitpos is not set.
* @param sideChannelData noOfSideChannels items per block of heapSize
* bytes in the input data.
* @param N lebgth of the input/output arrays G0.
* @param heapsize Size of the blocks for which there is an entry in the
sideChannelData.
* @param bitpos Position of the bit to evaluate for processing.
* @param noOfSideChannels Number of side channels items per block of
* data.
* @param selectedSideChannel No. of side channel item to be eveluated.
0 <= selectedSideChannel < noOfSideChannels.
* @param stats_G0 No. of sampels contributing to G0, accounting also
* for loat heaps
* @param stats_G1 No. of sampels contributing to G1, accounting also
* for loat heaps
*/
__global__ void gating(float *G0, float *G1, const int64_t *sideChannelData,
size_t N, size_t heapSize, size_t bitpos,
size_t noOfSideChannels, size_t selectedSideChannel,
const float* __restrict__ _baseLineG0,
const float* __restrict__ _baseLineG1,
float* __restrict__ baseLineNG0,
float* __restrict__ baseLineNG1,
uint64_cu* stats_G0,
uint64_cu* stats_G1);
/**
* @brief calculate stokes IQUV from two complex valuies for each polarization
*/
//__host__ __device__ void stokes_IQUV(const float2 &p1, const float2 &p2, float &I, float &Q, float &U, float &V);
__host__ __device__ void stokes_IQUV(const float2 &p1, const float2 &p2, float &I, float &Q, float &U, float &V)
{
I = fabs(p1.x*p1.x + p1.y * p1.y) + fabs(p2.x*p2.x + p2.y * p2.y);
Q = fabs(p1.x*p1.x + p1.y * p1.y) - fabs(p2.x*p2.x + p2.y * p2.y);
U = 2 * (p1.x*p2.x + p1.y * p2.y);
V = -2 * (p1.y*p2.x - p1.x * p2.y);
}
/**
* @brief calculate stokes IQUV spectra pol1, pol2 are arrays of naccumulate
* complex spectra for individual polarizations
*/
__global__ void stokes_accumulate(float2 const __restrict__ *pol1,
float2 const __restrict__ *pol2, float *I, float* Q, float *U, float*V,
int nchans, int naccumulate)
{
for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < nchans);
i += blockDim.x * gridDim.x)
{
float rI = 0;
float rQ = 0;
float rU = 0;
float rV = 0;
for (int k=0; k < naccumulate; k++)
{
const float2 p1 = pol1[i + k * nchans];
const float2 p2 = pol2[i + k * nchans];
rI += fabs(p1.x * p1.x + p1.y * p1.y) + fabs(p2.x * p2.x + p2.y * p2.y);
rQ += fabs(p1.x * p1.x + p1.y * p1.y) - fabs(p2.x * p2.x + p2.y * p2.y);
rU += 2.f * (p1.x * p2.x + p1.y * p2.y);
rV += -2.f * (p1.y * p2.x - p1.x * p2.y);
}
I[i] += rI;
Q[i] += rQ;
U[i] += rU;
V[i] += rV;
}
}
} // edd
} // effelsberg
} // psrdada_cpp
#include "psrdada_cpp/effelsberg/edd/detail/GatedStokesSpectrometer.cu"
#endif //PSRDADA_CPP_EFFELSBERG_EDD_GATEDSPECTROMETER_HPP
#include "psrdada_cpp/effelsberg/edd/GatedStokesSpectrometer.cuh"
#include "psrdada_cpp/effelsberg/edd/Tools.cuh"
#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/cuda_utils.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
#include <cuda.h>
#include <cuda_profiler_api.h>
#include <thrust/system/cuda/execution_policy.h>
#include <iostream>
#include <iomanip>
#include <cstring>
#include <sstream>
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
// Reduce thread local vatiable v in shared array x, so that x[0]
template<typename T>
__device__ void sum_reduce(T *x, const T &v)
{
x[threadIdx.x] = v;
__syncthreads();
for(int s = blockDim.x / 2; s > 0; s = s / 2)
{
if (threadIdx.x < s)
x[threadIdx.x] += x[threadIdx.x + s];
__syncthreads();
}
}
// If one of the side channel items is lsot, then both are considered as lost
// here
__global__ void mergeSideChannels(uint64_t* __restrict__ A, uint64_t* __restrict__ B, size_t N)
{
for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N);
i += blockDim.x * gridDim.x)
{
uint64_t v = A[i] || B[i];
A[i] = v;
B[i] = v;
}
}
__global__ void gating(float* __restrict__ G0,
float* __restrict__ G1,
const uint64_t* __restrict__ sideChannelData,
size_t N, size_t heapSize, size_t bitpos,
size_t noOfSideChannels, size_t selectedSideChannel,
const float* __restrict__ _baseLineG0,
const float* __restrict__ _baseLineG1,
float* __restrict__ baseLineNG0,
float* __restrict__ baseLineNG1,
uint64_cu* stats_G0, uint64_cu* stats_G1) {
// statistics values for samopels to G0, G1
uint32_t _G0stats = 0;
uint32_t _G1stats = 0;
const float baseLineG0 = _baseLineG0[0];
const float baseLineG1 = _baseLineG1[0];
float baselineUpdateG0 = 0;
float baselineUpdateG1 = 0;
for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N);
i += blockDim.x * gridDim.x) {
const float v = G0[i];
const uint64_t sideChannelItem = sideChannelData[((i / heapSize) * (noOfSideChannels)) +
selectedSideChannel];
const unsigned int bit_set = TEST_BIT(sideChannelItem, bitpos);
const unsigned int heap_lost = TEST_BIT(sideChannelItem, 63);
G1[i] = (v - baseLineG1) * bit_set * (!heap_lost) + baseLineG1;
G0[i] = (v - baseLineG0) * (!bit_set) *(!heap_lost) + baseLineG0;
_G0stats += (!bit_set) *(!heap_lost);
_G1stats += bit_set * (!heap_lost);
baselineUpdateG1 += v * bit_set * (!heap_lost);
baselineUpdateG0 += v * (!bit_set) *(!heap_lost);
}
__shared__ uint32_t x[1024];
// Reduce G0, G1
sum_reduce<uint32_t>(x, _G0stats);
if(threadIdx.x == 0) {
atomicAdd(stats_G0, (uint64_cu) x[threadIdx.x]);
}
__syncthreads();
sum_reduce<uint32_t>(x, _G1stats);
if(threadIdx.x == 0) {
atomicAdd(stats_G1, (uint64_cu) x[threadIdx.x]);
}
__syncthreads();
//reuse shared array
float *y = (float*) x;
//update the baseline array
sum_reduce<float>(y, baselineUpdateG0);
if(threadIdx.x == 0) {
atomicAdd(baseLineNG0, y[threadIdx.x]);
}
__syncthreads();
sum_reduce<float>(y, baselineUpdateG1);
if(threadIdx.x == 0) {
atomicAdd(baseLineNG1, y[threadIdx.x]);
}
__syncthreads();
}
// Updates the baselines of the gates for the polarization set for the next
// block
// only few output blocks per input block thus execution on only one thread.
// Important is that the execution is async on the GPU.
__global__ void update_baselines(float* __restrict__ baseLineG0,
float* __restrict__ baseLineG1,
float* __restrict__ baseLineNG0,
float* __restrict__ baseLineNG1,
uint64_cu* stats_G0, uint64_cu* stats_G1,
size_t N)
{
size_t NG0 = 0;
size_t NG1 = 0;
for (size_t i =0; i < N; i++)
{
NG0 += stats_G0[i];
NG1 += stats_G1[i];
}
baseLineG0[0] = baseLineNG0[0] / NG0;
baseLineG1[0] = baseLineNG1[0] / NG1;
baseLineNG0[0] = 0;
baseLineNG1[0] = 0;
}
template <class HandlerType, class InputType>
GatedStokesSpectrometer<HandlerType>::GatedStokesSpectrometer(
const DadaBufferLayout &dadaBufferLayout,
std::size_t selectedSideChannel, std::size_t selectedBit, std::size_t fft_length, std::size_t naccumulate,
std::size_t nbits, HandlerType &handler) : _dadaBufferLayout(dadaBufferLayout),
_selectedSideChannel(selectedSideChannel), _selectedBit(selectedBit),
_fft_length(fft_length),
_naccumulate(naccumulate), _nbits(nbits), _handler(handler), _fft_plan(0),
_call_count(0), _nsamps_per_heap(4096), _processing_efficiency(0.){
// Sanity checks
assert(((_nbits == 12) || (_nbits == 8) || (_nbits == 10)));
assert(_naccumulate > 0);
// check for any device errors
CUDA_ERROR_CHECK(cudaDeviceSynchronize());
BOOST_LOG_TRIVIAL(info)
<< "Creating new GatedStokesSpectrometer instance with parameters: \n"
<< " fft_length " << _fft_length << "\n"
<< " naccumulate " << _naccumulate << "\n"
<< " nSideChannels " << _dadaBufferLayout.getNSideChannels() << "\n"
<< " speadHeapSize " << _dadaBufferLayout.getHeapSize() << " byte\n"
<< " selectedSideChannel " << _selectedSideChannel << "\n"
<< " selectedBit " << _selectedBit << "\n"
<< " output bit depth " << sizeof(IntegratedPowerType) * 8;
assert((_dadaBufferLayout.getNSideChannels() == 0) ||
(selectedSideChannel < _dadaBufferLayout.getNSideChannels())); // Sanity check of side channel value
assert(selectedBit < 64); // Sanity check of selected bit
_nsamps_per_buffer = _dadaBufferLayout.sizeOfData() * 8 / nbits;
_nsamps_per_output_spectra = fft_length * naccumulate;
int nBlocks;
if (_nsamps_per_output_spectra <= _nsamps_per_buffer)
{ // one buffer block is used for one or multiple output spectra
size_t N = _nsamps_per_buffer / _nsamps_per_output_spectra;
// All data in one block has to be used
assert(N * _nsamps_per_output_spectra == _nsamps_per_buffer);
nBlocks = 1;
}
else
{ // multiple blocks are integrated intoone output
size_t N = _nsamps_per_output_spectra / _nsamps_per_buffer;
// All data in multiple blocks has to be used
assert(N * _nsamps_per_buffer == _nsamps_per_output_spectra);
nBlocks = N;
}
BOOST_LOG_TRIVIAL(debug) << "Integrating " << _nsamps_per_output_spectra << " samples from " << nBlocks << " into one spectra.";
_nchans = _fft_length / 2 + 1;
int batch = _nsamps_per_buffer / _fft_length;
BOOST_LOG_TRIVIAL(debug) << "Generating FFT plan";
int n[] = {static_cast<int>(_fft_length)};
CUFFT_ERROR_CHECK(cufftPlanMany(&_fft_plan, 1, n, NULL, 1, _fft_length, NULL,
1, _nchans, CUFFT_R2C, batch));
cufftSetStream(_fft_plan, _proc_stream);
BOOST_LOG_TRIVIAL(debug) << "Allocating memory";
polarization0._raw_voltage.resize(_dadaBufferLayout.sizeOfData() / sizeof(uint64_t));
polarization1._raw_voltage.resize(_dadaBufferLayout.sizeOfData() / sizeof(uint64_t));
polarization0._sideChannelData.resize(_dadaBufferLayout.getNSideChannels() * _dadaBufferLayout.getNHeaps());
polarization1._sideChannelData.resize(_dadaBufferLayout.getNSideChannels() * _dadaBufferLayout.getNHeaps());
BOOST_LOG_TRIVIAL(debug) << " Input voltages size (in 64-bit words): "
<< polarization0._raw_voltage.size();
_unpacked_voltage_G0.resize(_nsamps_per_buffer);
_unpacked_voltage_G1.resize(_nsamps_per_buffer);
BOOST_LOG_TRIVIAL(debug) << " Unpacked voltages size (in samples): "
<< _unpacked_voltage_G0.size();
polarization0.resize(_nchans * batch);
polarization1.resize(_nchans * batch);
BOOST_LOG_TRIVIAL(debug) << " Channelised voltages size: "
<< polarization0._channelised_voltage_G0.size();
stokes_G0.resize(_nchans, batch / (_naccumulate / nBlocks));
stokes_G1.resize(_nchans, batch / (_naccumulate / nBlocks));
// on the host full output is stored together with sci data in one buffer
_host_power_db.resize( 8 * (_nchans * sizeof(IntegratedPowerType) + sizeof(size_t)) * batch / (_naccumulate / nBlocks));
CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream));
CUDA_ERROR_CHECK(cudaStreamCreate(&_proc_stream));
CUDA_ERROR_CHECK(cudaStreamCreate(&_d2h_stream));
CUFFT_ERROR_CHECK(cufftSetStream(_fft_plan, _proc_stream));
_unpacker.reset(new Unpacker(_proc_stream));
} // constructor
template <class HandlerType, class InputType>
GatedStokesSpectrometer<HandlerType>::~GatedStokesSpectrometer() {
BOOST_LOG_TRIVIAL(debug) << "Destroying GatedStokesSpectrometer";
if (!_fft_plan)
cufftDestroy(_fft_plan);
cudaStreamDestroy(_h2d_stream);
cudaStreamDestroy(_proc_stream);
cudaStreamDestroy(_d2h_stream);
}
template <class HandlerType, class InputType>
void GatedStokesSpectrometer<HandlerType>::init(RawBytes &block) {
BOOST_LOG_TRIVIAL(debug) << "GatedStokesSpectrometer init called";
std::stringstream headerInfo;
headerInfo << "\n"
<< "# Gated spectrometer parameters: \n"
<< "fft_length " << _fft_length << "\n"
<< "nchannels " << _fft_length << "\n"
<< "naccumulate " << _naccumulate << "\n"
<< "selected_side_channel " << _selectedSideChannel << "\n"
<< "selected_bit " << _selectedBit << "\n"
<< "output_bit_depth " << sizeof(IntegratedPowerType) * 8;
size_t bEnd = std::strlen(block.ptr());
if (bEnd + headerInfo.str().size() < block.total_bytes())
{
std::strcpy(block.ptr() + bEnd, headerInfo.str().c_str());
}
else
{
BOOST_LOG_TRIVIAL(warning) << "Header of size " << block.total_bytes()
<< " bytes already contains " << bEnd
<< "bytes. Cannot add gated spectrometer info of size "
<< headerInfo.str().size() << " bytes.";
}
_handler.init(block);
}
template <class HandlerType, class InputType>
void GatedStokesSpectrometer<HandlerType>::gated_fft(
PolarizationData &data,
thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G0,
thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G1
)
{
BOOST_LOG_TRIVIAL(debug) << "Unpacking raw voltages";
switch (_nbits) {
case 8:
_unpacker->unpack<8>(data._raw_voltage.b(), _unpacked_voltage_G0);
break;
case 12:
_unpacker->unpack<12>(data._raw_voltage.b(), _unpacked_voltage_G0);
break;
default:
throw std::runtime_error("Unsupported number of bits");
}
// Loop over outputblocks, for case of multiple output blocks per input block
int step = data._sideChannelData.b().size() / _noOfBitSetsIn_G0.size();
for (size_t i = 0; i < _noOfBitSetsIn_G0.size(); i++)
{ // ToDo: Should be in one kernel call
gating<<<1024, 1024, 0, _proc_stream>>>(
thrust::raw_pointer_cast(_unpacked_voltage_G0.data() + i * step * _nsamps_per_heap),
thrust::raw_pointer_cast(_unpacked_voltage_G1.data() + i * step * _nsamps_per_heap),
thrust::raw_pointer_cast(data._sideChannelData.b().data() + i * step),
_unpacked_voltage_G0.size() / _noOfBitSetsIn_G0.size(),
_dadaBufferLayout.getHeapSize(),
_selectedBit,
_dadaBufferLayout.getNSideChannels(),
_selectedSideChannel,
thrust::raw_pointer_cast(data._baseLineG0.data()),
thrust::raw_pointer_cast(data._baseLineG1.data()),
thrust::raw_pointer_cast(data._baseLineG0_update.data()),
thrust::raw_pointer_cast(data._baseLineG1_update.data()),
thrust::raw_pointer_cast(_noOfBitSetsIn_G0.data() + i),
thrust::raw_pointer_cast(_noOfBitSetsIn_G1.data() + i)
);
}
// only few output blocks per input block thus execution on only one thread.
// Important is that the execution is async on the GPU.
update_baselines<<<1,1,0, _proc_stream>>>(
thrust::raw_pointer_cast(data._baseLineG0.data()),
thrust::raw_pointer_cast(data._baseLineG1.data()),
thrust::raw_pointer_cast(data._baseLineG0_update.data()),
thrust::raw_pointer_cast(data._baseLineG1_update.data()),
thrust::raw_pointer_cast(_noOfBitSetsIn_G0.data()),
thrust::raw_pointer_cast(_noOfBitSetsIn_G1.data()),
_noOfBitSetsIn_G0.size()
);
BOOST_LOG_TRIVIAL(debug) << "Performing FFT 1";
UnpackedVoltageType *_unpacked_voltage_ptr =
thrust::raw_pointer_cast(_unpacked_voltage_G0.data());
ChannelisedVoltageType *_channelised_voltage_ptr =
thrust::raw_pointer_cast(data._channelised_voltage_G0.data());
CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr,
(cufftComplex *)_channelised_voltage_ptr));
BOOST_LOG_TRIVIAL(debug) << "Performing FFT 2";
_unpacked_voltage_ptr = thrust::raw_pointer_cast(_unpacked_voltage_G1.data());
_channelised_voltage_ptr = thrust::raw_pointer_cast(data._channelised_voltage_G1.data());