Skip to content
Snippets Groups Projects
Commit 16156245 authored by Tobias Winchen's avatar Tobias Winchen
Browse files

Reverted Gated base

parent 0d2c5a10
Branches
Tags
No related merge requests found
...@@ -26,105 +26,20 @@ namespace edd { ...@@ -26,105 +26,20 @@ namespace edd {
typedef unsigned long long int uint64_cu; typedef unsigned long long int uint64_cu;
static_assert(sizeof(uint64_cu) == sizeof(uint64_t), "Long long int not of 64 bit! This is problematic for CUDA!"); static_assert(sizeof(uint64_cu) == sizeof(uint64_t), "Long long int not of 64 bit! This is problematic for CUDA!");
typedef uint64_t RawVoltageType;
typedef float UnpackedVoltageType;
typedef float2 ChannelisedVoltageType;
typedef float IntegratedPowerType;
//typedef int8_t IntegratedPowerType;
/// Input data and intermediate processing data for one polarization
struct PolarizationData
{
/// Raw ADC Voltage
DoubleDeviceBuffer<RawVoltageType> _raw_voltage;
/// Side channel data
DoubleDeviceBuffer<uint64_t> _sideChannelData;
/// Baseline in gate 0 state
thrust::device_vector<UnpackedVoltageType> _baseLineG0;
/// Baseline in gate 1 state
thrust::device_vector<UnpackedVoltageType> _baseLineG1;
/// Baseline in gate 0 state after update
thrust::device_vector<UnpackedVoltageType> _baseLineG0_update;
/// Baseline in gate 1 state after update
thrust::device_vector<UnpackedVoltageType> _baseLineG1_update;
/// Channelized voltage in gate 0 state
thrust::device_vector<ChannelisedVoltageType> _channelised_voltage_G0;
/// Channelized voltage in gate 1 state
thrust::device_vector<ChannelisedVoltageType> _channelised_voltage_G1;
/// Swaps input buffers
void swap()
{
_raw_voltage.swap();
_sideChannelData.swap();
}
};
// Output data for one gate
struct StokesOutput
{
/// Stokes parameters
DoubleDeviceBuffer<IntegratedPowerType> I;
DoubleDeviceBuffer<IntegratedPowerType> Q;
DoubleDeviceBuffer<IntegratedPowerType> U;
DoubleDeviceBuffer<IntegratedPowerType> V;
/// Number of samples integrated in this output block
DoubleDeviceBuffer<uint64_cu> _noOfBitSets;
/// Reset outptu for new integration
void reset(cudaStream_t &_proc_stream)
{
thrust::fill(thrust::cuda::par.on(_proc_stream),I.a().begin(), I.a().end(), 0.);
thrust::fill(thrust::cuda::par.on(_proc_stream),Q.a().begin(), Q.a().end(), 0.);
thrust::fill(thrust::cuda::par.on(_proc_stream),U.a().begin(), U.a().end(), 0.);
thrust::fill(thrust::cuda::par.on(_proc_stream),V.a().begin(), V.a().end(), 0.);
thrust::fill(thrust::cuda::par.on(_proc_stream), _noOfBitSets.a().begin(), _noOfBitSets.a().end(), 0L);
}
/// Swap output buffers
void swap()
{
I.swap();
Q.swap();
U.swap();
V.swap();
_noOfBitSets.swap();
}
/// Resize all buffers
void resize(size_t size, size_t blocks)
{
I.resize(size * blocks);
Q.resize(size * blocks);
U.resize(size * blocks);
V.resize(size * blocks);
_noOfBitSets.resize(blocks);
}
};
/** /**
@class GatedSpectrometer @class GatedSpectrometer
@brief Split data into two streams and create integrated spectra depending on @brief Split data into two streams and create integrated spectra depending on
bit set in side channel data. bit set in side channel data.
*/ */
template <class HandlerType> class GatedSpectrometer { template <class HandlerType, typename IntegratedPowerType> class GatedSpectrometer {
public: public:
typedef uint64_t RawVoltageType;
typedef float UnpackedVoltageType;
typedef float2 ChannelisedVoltageType;
// typedef float IntegratedPowerType;
//typedef int8_t IntegratedPowerType;
public: public:
/** /**
...@@ -175,10 +90,11 @@ public: ...@@ -175,10 +90,11 @@ public:
bool operator()(RawBytes &block); bool operator()(RawBytes &block);
private: private:
// gate the data and fft data per gate void process(thrust::device_vector<RawVoltageType> const &digitiser_raw,
void gated_fft(PolarizationData &data, thrust::device_vector<uint64_t> const &sideChannelData,
thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G0, thrust::device_vector<IntegratedPowerType> &detected,
thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G1); thrust::device_vector<uint64_cu> &noOfBitSetsIn_G0,
thrust::device_vector<uint64_cu> &noOfBitSetsIn_G1);
private: private:
DadaBufferLayout _dadaBufferLayout; DadaBufferLayout _dadaBufferLayout;
...@@ -199,20 +115,26 @@ private: ...@@ -199,20 +115,26 @@ private:
double _processing_efficiency; double _processing_efficiency;
std::unique_ptr<Unpacker> _unpacker; std::unique_ptr<Unpacker> _unpacker;
std::unique_ptr<DetectorAccumulator<IntegratedPowerType> > _detector;
// Input data and per pol intermediate data // Input data
PolarizationData polarization0, polarization1; DoubleDeviceBuffer<RawVoltageType> _raw_voltage_db;
DoubleDeviceBuffer<uint64_t> _sideChannelData_db;
// Output data // Output data
StokesOutput stokes_G0, stokes_G1; DoubleDeviceBuffer<IntegratedPowerType> _power_db;
DoubleDeviceBuffer<uint64_cu> _noOfBitSetsIn_G0;
DoubleDeviceBuffer<uint64_cu> _noOfBitSetsIn_G1;
DoublePinnedHostBuffer<char> _host_power_db; DoublePinnedHostBuffer<char> _host_power_db;
// Temporary processing block // Intermediate process steps
// ToDo: Use inplace FFT to avoid temporary coltage array
thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G0; thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G0;
thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G1; thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G1;
thrust::device_vector<ChannelisedVoltageType> _channelised_voltage;
thrust::device_vector<UnpackedVoltageType> _baseLineNG0;
thrust::device_vector<UnpackedVoltageType> _baseLineNG1;
cudaStream_t _h2d_stream; cudaStream_t _h2d_stream;
cudaStream_t _proc_stream; cudaStream_t _proc_stream;
...@@ -220,10 +142,11 @@ private: ...@@ -220,10 +142,11 @@ private:
}; };
/** /**
* @brief Splits the input data depending on a bit set into two arrays. * @brief Splits the input data depending on a bit set into two arrays.
* *
* @detail The resulting gaps are filled with a given baseline value in the other stream. * @detail The resulting gaps are filled with zeros in the other stream.
* *
* @param GO Input data. Data is set to the baseline value if corresponding * @param GO Input data. Data is set to the baseline value if corresponding
* sideChannelData bit at bitpos os set. * sideChannelData bit at bitpos os set.
...@@ -247,63 +170,13 @@ private: ...@@ -247,63 +170,13 @@ private:
__global__ void gating(float *G0, float *G1, const int64_t *sideChannelData, __global__ void gating(float *G0, float *G1, const int64_t *sideChannelData,
size_t N, size_t heapSize, size_t bitpos, size_t N, size_t heapSize, size_t bitpos,
size_t noOfSideChannels, size_t selectedSideChannel, size_t noOfSideChannels, size_t selectedSideChannel,
const float* __restrict__ _baseLineG0, const float baseLineG0,
const float* __restrict__ _baseLineG1, const float baseLineG1,
float* __restrict__ baseLineNG0, float* __restrict__ baseLineNG0,
float* __restrict__ baseLineNG1, float* __restrict__ baseLineNG1,
uint64_cu* stats_G0, uint64_cu* stats_G0,
uint64_cu* stats_G1); uint64_cu* stats_G1);
/**
* @brief calculate stokes IQUV from two complex valuies for each polarization
*/
//__host__ __device__ void stokes_IQUV(const float2 &p1, const float2 &p2, float &I, float &Q, float &U, float &V);
__host__ __device__ void stokes_IQUV(const float2 &p1, const float2 &p2, float &I, float &Q, float &U, float &V)
{
I = fabs(p1.x*p1.x + p1.y * p1.y) + fabs(p2.x*p2.x + p2.y * p2.y);
Q = fabs(p1.x*p1.x + p1.y * p1.y) - fabs(p2.x*p2.x + p2.y * p2.y);
U = 2 * (p1.x*p2.x + p1.y * p2.y);
V = -2 * (p1.y*p2.x - p1.x * p2.y);
}
/**
* @brief calculate stokes IQUV spectra pol1, pol2 are arrays of naccumulate
* complex spectra for individual polarizations
*/
__global__ void stokes_accumulate(float2 const __restrict__ *pol1,
float2 const __restrict__ *pol2, float *I, float* Q, float *U, float*V,
int nchans, int naccumulate)
{
for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < nchans);
i += blockDim.x * gridDim.x)
{
float rI = 0;
float rQ = 0;
float rU = 0;
float rV = 0;
for (int k=0; k < naccumulate; k++)
{
const float2 p1 = pol1[i + k * nchans];
const float2 p2 = pol2[i + k * nchans];
rI += fabs(p1.x * p1.x + p1.y * p1.y) + fabs(p2.x * p2.x + p2.y * p2.y);
rQ += fabs(p1.x * p1.x + p1.y * p1.y) - fabs(p2.x * p2.x + p2.y * p2.y);
rU += 2.f * (p1.x * p2.x + p1.y * p2.y);
rV += -2.f * (p1.y * p2.x - p1.x * p2.y);
}
I[i] += rI;
Q[i] += rQ;
U[i] += rU;
V[i] += rV;
}
}
......
...@@ -17,9 +17,8 @@ namespace psrdada_cpp { ...@@ -17,9 +17,8 @@ namespace psrdada_cpp {
namespace effelsberg { namespace effelsberg {
namespace edd { namespace edd {
// Reduce thread local vatiable v in shared array x, so that x[0]
template<typename T> template<typename T>
__device__ void sum_reduce(T *x, const T &v) __device__ void reduce(T *x, const T &v)
{ {
x[threadIdx.x] = v; x[threadIdx.x] = v;
__syncthreads(); __syncthreads();
...@@ -29,40 +28,26 @@ __device__ void sum_reduce(T *x, const T &v) ...@@ -29,40 +28,26 @@ __device__ void sum_reduce(T *x, const T &v)
x[threadIdx.x] += x[threadIdx.x + s]; x[threadIdx.x] += x[threadIdx.x + s];
__syncthreads(); __syncthreads();
} }
}
// If one of the side channel items is lsot, then both are considered as lost
// here
__global__ void mergeSideChannels(uint64_t* __restrict__ A, uint64_t* __restrict__ B, size_t N)
{
for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N);
i += blockDim.x * gridDim.x)
{
uint64_t v = A[i] || B[i];
A[i] = v;
B[i] = v;
}
} }
__global__ void gating(float* __restrict__ G0, __global__ void gating(float* __restrict__ G0, float* __restrict__ G1, const uint64_t* __restrict__ sideChannelData,
float* __restrict__ G1,
const uint64_t* __restrict__ sideChannelData,
size_t N, size_t heapSize, size_t bitpos, size_t N, size_t heapSize, size_t bitpos,
size_t noOfSideChannels, size_t selectedSideChannel, size_t noOfSideChannels, size_t selectedSideChannel,
const float* __restrict__ _baseLineG0, const float baseLineG0,
const float* __restrict__ _baseLineG1, const float baseLineG1,
float* __restrict__ baseLineNG0, float* __restrict__ baseLineNG0,
float* __restrict__ baseLineNG1, float* __restrict__ baseLineNG1,
uint64_cu* stats_G0, uint64_cu* stats_G1) { uint64_cu* stats_G0, uint64_cu* stats_G1) {
// float baseLineG0 = (*_baseLineNG0) / N;
// float baseLineG1 = (*_baseLineNG1) / N;
// statistics values for samopels to G0, G1 // statistics values for samopels to G0, G1
uint32_t _G0stats = 0; uint32_t _G0stats = 0;
uint32_t _G1stats = 0; uint32_t _G1stats = 0;
const float baseLineG0 = _baseLineG0[0];
const float baseLineG1 = _baseLineG1[0];
float baselineUpdateG0 = 0; float baselineUpdateG0 = 0;
float baselineUpdateG1 = 0; float baselineUpdateG1 = 0;
...@@ -70,8 +55,12 @@ __global__ void gating(float* __restrict__ G0, ...@@ -70,8 +55,12 @@ __global__ void gating(float* __restrict__ G0,
i += blockDim.x * gridDim.x) { i += blockDim.x * gridDim.x) {
const float v = G0[i]; const float v = G0[i];
const uint64_t sideChannelItem = sideChannelData[((i / heapSize) * (noOfSideChannels)) + const uint64_t sideChannelItem =
selectedSideChannel]; sideChannelData[((i / heapSize) * (noOfSideChannels)) +
selectedSideChannel]; // Probably not optimal access as
// same data is copied for several
// threads, but maybe efficiently
// handled by cache?
const unsigned int bit_set = TEST_BIT(sideChannelItem, bitpos); const unsigned int bit_set = TEST_BIT(sideChannelItem, bitpos);
const unsigned int heap_lost = TEST_BIT(sideChannelItem, 63); const unsigned int heap_lost = TEST_BIT(sideChannelItem, 63);
...@@ -84,72 +73,36 @@ __global__ void gating(float* __restrict__ G0, ...@@ -84,72 +73,36 @@ __global__ void gating(float* __restrict__ G0,
baselineUpdateG1 += v * bit_set * (!heap_lost); baselineUpdateG1 += v * bit_set * (!heap_lost);
baselineUpdateG0 += v * (!bit_set) *(!heap_lost); baselineUpdateG0 += v * (!bit_set) *(!heap_lost);
} }
__shared__ uint32_t x[1024]; __shared__ uint32_t x[1024];
// Reduce G0, G1 // Reduce G0, G1
sum_reduce<uint32_t>(x, _G0stats); reduce<uint32_t>(x, _G0stats);
if(threadIdx.x == 0) { if(threadIdx.x == 0)
atomicAdd(stats_G0, (uint64_cu) x[threadIdx.x]); atomicAdd(stats_G0, (uint64_cu) x[threadIdx.x]);
}
__syncthreads();
sum_reduce<uint32_t>(x, _G1stats);
if(threadIdx.x == 0) {
atomicAdd(stats_G1, (uint64_cu) x[threadIdx.x]);
}
__syncthreads(); __syncthreads();
reduce<uint32_t>(x, _G1stats);
if(threadIdx.x == 0)
atomicAdd(stats_G1, (uint64_cu) x[threadIdx.x]);
//reuse shared array //reuse shared array
float *y = (float*) x; float *y = (float*) x;
//update the baseline array //update the baseline array
sum_reduce<float>(y, baselineUpdateG0); reduce<float>(y, baselineUpdateG0);
if(threadIdx.x == 0) { if(threadIdx.x == 0)
atomicAdd(baseLineNG0, y[threadIdx.x]); atomicAdd(baseLineNG0, y[threadIdx.x]);
}
__syncthreads();
sum_reduce<float>(y, baselineUpdateG1);
if(threadIdx.x == 0) {
atomicAdd(baseLineNG1, y[threadIdx.x]);
}
__syncthreads(); __syncthreads();
reduce<float>(y, baselineUpdateG1);
if(threadIdx.x == 0)
atomicAdd(baseLineNG1, y[threadIdx.x]);
} }
// Updates the baselines of the gates for the polarization set for the next template <class HandlerType, typename IntegratedPowerType>
// block GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer(
// only few output blocks per input block thus execution on only one thread.
// Important is that the execution is async on the GPU.
__global__ void update_baselines(float* __restrict__ baseLineG0,
float* __restrict__ baseLineG1,
float* __restrict__ baseLineNG0,
float* __restrict__ baseLineNG1,
uint64_cu* stats_G0, uint64_cu* stats_G1,
size_t N)
{
size_t NG0 = 0;
size_t NG1 = 0;
for (size_t i =0; i < N; i++)
{
NG0 += stats_G0[i];
NG1 += stats_G1[i];
}
baseLineG0[0] = baseLineNG0[0] / NG0;
baseLineG1[0] = baseLineNG1[0] / NG1;
baseLineNG0[0] = 0;
baseLineNG1[0] = 0;
}
template <class HandlerType>
GatedSpectrometer<HandlerType>::GatedSpectrometer(
const DadaBufferLayout &dadaBufferLayout, const DadaBufferLayout &dadaBufferLayout,
std::size_t selectedSideChannel, std::size_t selectedBit, std::size_t fft_length, std::size_t naccumulate, std::size_t selectedSideChannel, std::size_t selectedBit, std::size_t fft_length, std::size_t naccumulate,
std::size_t nbits, float input_level, float output_level, std::size_t nbits, float input_level, float output_level,
...@@ -218,38 +171,36 @@ GatedSpectrometer<HandlerType>::GatedSpectrometer( ...@@ -218,38 +171,36 @@ GatedSpectrometer<HandlerType>::GatedSpectrometer(
cufftSetStream(_fft_plan, _proc_stream); cufftSetStream(_fft_plan, _proc_stream);
BOOST_LOG_TRIVIAL(debug) << "Allocating memory"; BOOST_LOG_TRIVIAL(debug) << "Allocating memory";
polarization0._raw_voltage.resize(_dadaBufferLayout.sizeOfData() / sizeof(uint64_t)); _raw_voltage_db.resize(_dadaBufferLayout.sizeOfData() / sizeof(uint64_t));
polarization1._raw_voltage.resize(_dadaBufferLayout.sizeOfData() / sizeof(uint64_t)); _sideChannelData_db.resize(_dadaBufferLayout.getNSideChannels() * _dadaBufferLayout.getNHeaps());
polarization0._sideChannelData.resize(_dadaBufferLayout.getNSideChannels() * _dadaBufferLayout.getNHeaps());
polarization1._sideChannelData.resize(_dadaBufferLayout.getNSideChannels() * _dadaBufferLayout.getNHeaps());
BOOST_LOG_TRIVIAL(debug) << " Input voltages size (in 64-bit words): " BOOST_LOG_TRIVIAL(debug) << " Input voltages size (in 64-bit words): "
<< polarization0._raw_voltage.size(); << _raw_voltage_db.size();
_unpacked_voltage_G0.resize(_nsamps_per_buffer); _unpacked_voltage_G0.resize(_nsamps_per_buffer);
_unpacked_voltage_G1.resize(_nsamps_per_buffer); _unpacked_voltage_G1.resize(_nsamps_per_buffer);
polarization0._baseLineG0.resize(1); _baseLineNG0.resize(1);
polarization0._baseLineG0_update.resize(1); _baseLineNG1.resize(1);
polarization0._baseLineG1.resize(1);
polarization0._baseLineG1_update.resize(1);
polarization1._baseLineG0.resize(1);
polarization1._baseLineG0_update.resize(1);
polarization1._baseLineG1.resize(1);
polarization1._baseLineG1_update.resize(1);
BOOST_LOG_TRIVIAL(debug) << " Unpacked voltages size (in samples): " BOOST_LOG_TRIVIAL(debug) << " Unpacked voltages size (in samples): "
<< _unpacked_voltage_G0.size(); << _unpacked_voltage_G0.size();
polarization0._channelised_voltage_G0.resize(_nchans * batch); _channelised_voltage.resize(_nchans * batch);
polarization0._channelised_voltage_G1.resize(_nchans * batch);
polarization1._channelised_voltage_G0.resize(_nchans * batch);
polarization1._channelised_voltage_G1.resize(_nchans * batch);
BOOST_LOG_TRIVIAL(debug) << " Channelised voltages size: " BOOST_LOG_TRIVIAL(debug) << " Channelised voltages size: "
<< polarization0._channelised_voltage_G0.size(); << _channelised_voltage.size();
_power_db.resize(_nchans * batch / (_naccumulate / nBlocks) * 2); // hold on and off spectra to simplify output
stokes_G0.resize(_nchans, batch / (_naccumulate / nBlocks)); thrust::fill(_power_db.a().begin(), _power_db.a().end(), 0.);
stokes_G1.resize(_nchans, batch / (_naccumulate / nBlocks)); thrust::fill(_power_db.b().begin(), _power_db.b().end(), 0.);
BOOST_LOG_TRIVIAL(debug) << " Powers size: " << _power_db.size() / 2;
// on the host full output is stored together with sci data in one buffer
_host_power_db.resize( 8 * (_nchans * sizeof(IntegratedPowerType) + sizeof(size_t)) * batch / (_naccumulate / nBlocks)); _noOfBitSetsIn_G0.resize( batch / (_naccumulate / nBlocks));
_noOfBitSetsIn_G1.resize( batch / (_naccumulate / nBlocks));
thrust::fill(_noOfBitSetsIn_G0.a().begin(), _noOfBitSetsIn_G0.a().end(), 0L);
thrust::fill(_noOfBitSetsIn_G0.b().begin(), _noOfBitSetsIn_G0.b().end(), 0L);
thrust::fill(_noOfBitSetsIn_G1.a().begin(), _noOfBitSetsIn_G1.a().end(), 0L);
thrust::fill(_noOfBitSetsIn_G1.b().begin(), _noOfBitSetsIn_G1.b().end(), 0L);
BOOST_LOG_TRIVIAL(debug) << " Bit set counter size: " << _noOfBitSetsIn_G0.size();
// on the host both power are stored in the same data buffer together with
// the number of bit sets
_host_power_db.resize( _power_db.size() * sizeof(IntegratedPowerType) + 2 * sizeof(size_t) * _noOfBitSetsIn_G0.size());
CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream)); CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream));
CUDA_ERROR_CHECK(cudaStreamCreate(&_proc_stream)); CUDA_ERROR_CHECK(cudaStreamCreate(&_proc_stream));
...@@ -257,12 +208,13 @@ GatedSpectrometer<HandlerType>::GatedSpectrometer( ...@@ -257,12 +208,13 @@ GatedSpectrometer<HandlerType>::GatedSpectrometer(
CUFFT_ERROR_CHECK(cufftSetStream(_fft_plan, _proc_stream)); CUFFT_ERROR_CHECK(cufftSetStream(_fft_plan, _proc_stream));
_unpacker.reset(new Unpacker(_proc_stream)); _unpacker.reset(new Unpacker(_proc_stream));
_detector.reset(new DetectorAccumulator<IntegratedPowerType>(_nchans, _naccumulate / nBlocks, scaling,
offset, _proc_stream));
} // constructor } // constructor
template <class HandlerType, typename IntegratedPowerType>
template <class HandlerType> GatedSpectrometer<HandlerType, IntegratedPowerType>::~GatedSpectrometer() {
GatedSpectrometer<HandlerType>::~GatedSpectrometer() {
BOOST_LOG_TRIVIAL(debug) << "Destroying GatedSpectrometer"; BOOST_LOG_TRIVIAL(debug) << "Destroying GatedSpectrometer";
if (!_fft_plan) if (!_fft_plan)
cufftDestroy(_fft_plan); cufftDestroy(_fft_plan);
...@@ -272,9 +224,8 @@ GatedSpectrometer<HandlerType>::~GatedSpectrometer() { ...@@ -272,9 +224,8 @@ GatedSpectrometer<HandlerType>::~GatedSpectrometer() {
} }
template <class HandlerType, typename IntegratedPowerType>
template <class HandlerType> void GatedSpectrometer<HandlerType, IntegratedPowerType>::init(RawBytes &block) {
void GatedSpectrometer<HandlerType>::init(RawBytes &block) {
BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer init called"; BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer init called";
std::stringstream headerInfo; std::stringstream headerInfo;
headerInfo << "\n" headerInfo << "\n"
...@@ -303,87 +254,82 @@ void GatedSpectrometer<HandlerType>::init(RawBytes &block) { ...@@ -303,87 +254,82 @@ void GatedSpectrometer<HandlerType>::init(RawBytes &block) {
} }
template <class HandlerType, typename IntegratedPowerType>
template <class HandlerType> void GatedSpectrometer<HandlerType, IntegratedPowerType>::process(
void GatedSpectrometer<HandlerType>::gated_fft( thrust::device_vector<RawVoltageType> const &digitiser_raw,
PolarizationData &data, thrust::device_vector<uint64_t> const &sideChannelData,
thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G0, thrust::device_vector<IntegratedPowerType> &detected, thrust::device_vector<uint64_cu> &noOfBitSetsIn_G0, thrust::device_vector<uint64_cu> &noOfBitSetsIn_G1) {
thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G1
)
{
BOOST_LOG_TRIVIAL(debug) << "Unpacking raw voltages"; BOOST_LOG_TRIVIAL(debug) << "Unpacking raw voltages";
switch (_nbits) { switch (_nbits) {
case 8: case 8:
_unpacker->unpack<8>(data._raw_voltage.b(), _unpacked_voltage_G0); _unpacker->unpack<8>(digitiser_raw, _unpacked_voltage_G0);
break; break;
case 12: case 12:
_unpacker->unpack<12>(data._raw_voltage.b(), _unpacked_voltage_G0); _unpacker->unpack<12>(digitiser_raw, _unpacked_voltage_G0);
break; break;
default: default:
throw std::runtime_error("Unsupported number of bits"); throw std::runtime_error("Unsupported number of bits");
} }
BOOST_LOG_TRIVIAL(debug) << "Calculate baseline";
//calculate baseline from previos block
// Loop over outputblocks, for case of multiple output blocks per input block BOOST_LOG_TRIVIAL(debug) << "Perform gating";
int step = data._sideChannelData.b().size() / _noOfBitSetsIn_G0.size();
for (size_t i = 0; i < _noOfBitSetsIn_G0.size(); i++) float baseLineG0 = _baseLineNG0[0];
float baseLineG1 = _baseLineNG1[0];
uint64_t NG0 = 0;
uint64_t NG1 = 0;
// Loop over outputblocks, for case of multiple output blocks per input block
for (size_t i = 0; i < noOfBitSetsIn_G0.size(); i++)
{ // ToDo: Should be in one kernel call { // ToDo: Should be in one kernel call
gating<<<1024, 1024, 0, _proc_stream>>>( gating<<<1024, 1024, 0, _proc_stream>>>(
thrust::raw_pointer_cast(_unpacked_voltage_G0.data() + i * step * _nsamps_per_heap), thrust::raw_pointer_cast(_unpacked_voltage_G0.data() + i * sideChannelData.size() / noOfBitSetsIn_G0.size()),
thrust::raw_pointer_cast(_unpacked_voltage_G1.data() + i * step * _nsamps_per_heap), thrust::raw_pointer_cast(_unpacked_voltage_G1.data() + i * sideChannelData.size() / noOfBitSetsIn_G0.size()),
thrust::raw_pointer_cast(data._sideChannelData.b().data() + i * step), thrust::raw_pointer_cast(sideChannelData.data() + i * sideChannelData.size() / noOfBitSetsIn_G0.size()),
_unpacked_voltage_G0.size() / _noOfBitSetsIn_G0.size(), _unpacked_voltage_G0.size() / noOfBitSetsIn_G0.size(), _dadaBufferLayout.getHeapSize(), _selectedBit, _dadaBufferLayout.getNSideChannels(),
_dadaBufferLayout.getHeapSize(),
_selectedBit,
_dadaBufferLayout.getNSideChannels(),
_selectedSideChannel, _selectedSideChannel,
thrust::raw_pointer_cast(data._baseLineG0.data()), baseLineG0, baseLineG1,
thrust::raw_pointer_cast(data._baseLineG1.data()), thrust::raw_pointer_cast(_baseLineNG0.data()),
thrust::raw_pointer_cast(data._baseLineG0_update.data()), thrust::raw_pointer_cast(_baseLineNG1.data()),
thrust::raw_pointer_cast(data._baseLineG1_update.data()), thrust::raw_pointer_cast(noOfBitSetsIn_G0.data() + i),
thrust::raw_pointer_cast(_noOfBitSetsIn_G0.data() + i), thrust::raw_pointer_cast(noOfBitSetsIn_G1.data() + i)
thrust::raw_pointer_cast(_noOfBitSetsIn_G1.data() + i)
); );
NG0 += noOfBitSetsIn_G0[i];
NG1 += noOfBitSetsIn_G1[i];
} }
_baseLineNG0[0] /= NG0;
_baseLineNG1[0] /= NG1;
BOOST_LOG_TRIVIAL(debug) << "Updating Baselines\n G0: " << baseLineG0 << " -> " << _baseLineNG0[0] << ", " << baseLineG1 << " -> " << _baseLineNG1[0] ;
// only few output blocks per input block thus execution on only one thread.
// Important is that the execution is async on the GPU.
update_baselines<<<1,1,0, _proc_stream>>>(
thrust::raw_pointer_cast(data._baseLineG0.data()),
thrust::raw_pointer_cast(data._baseLineG1.data()),
thrust::raw_pointer_cast(data._baseLineG0_update.data()),
thrust::raw_pointer_cast(data._baseLineG1_update.data()),
thrust::raw_pointer_cast(_noOfBitSetsIn_G0.data()),
thrust::raw_pointer_cast(_noOfBitSetsIn_G1.data()),
_noOfBitSetsIn_G0.size()
);
BOOST_LOG_TRIVIAL(debug) << "Performing FFT 1"; BOOST_LOG_TRIVIAL(debug) << "Performing FFT 1";
UnpackedVoltageType *_unpacked_voltage_ptr = UnpackedVoltageType *_unpacked_voltage_ptr =
thrust::raw_pointer_cast(_unpacked_voltage_G0.data()); thrust::raw_pointer_cast(_unpacked_voltage_G0.data());
ChannelisedVoltageType *_channelised_voltage_ptr = ChannelisedVoltageType *_channelised_voltage_ptr =
thrust::raw_pointer_cast(data._channelised_voltage_G0.data()); thrust::raw_pointer_cast(_channelised_voltage.data());
CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr, CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr,
(cufftComplex *)_channelised_voltage_ptr)); (cufftComplex *)_channelised_voltage_ptr));
_detector->detect(_channelised_voltage, detected, 2, 0);
BOOST_LOG_TRIVIAL(debug) << "Performing FFT 2"; BOOST_LOG_TRIVIAL(debug) << "Performing FFT 2";
_unpacked_voltage_ptr = thrust::raw_pointer_cast(_unpacked_voltage_G1.data()); _unpacked_voltage_ptr = thrust::raw_pointer_cast(_unpacked_voltage_G1.data());
_channelised_voltage_ptr = thrust::raw_pointer_cast(data._channelised_voltage_G1.data());
CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr, CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr,
(cufftComplex *)_channelised_voltage_ptr)); (cufftComplex *)_channelised_voltage_ptr));
_detector->detect(_channelised_voltage, detected, 2, 1);
CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream)); CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
BOOST_LOG_TRIVIAL(debug) << "Exit processing"; BOOST_LOG_TRIVIAL(debug) << "Exit processing";
} // process } // process
template <class HandlerType> template <class HandlerType, typename IntegratedPowerType>
bool GatedSpectrometer<HandlerType>::operator()(RawBytes &block) { bool GatedSpectrometer<HandlerType, IntegratedPowerType>::operator()(RawBytes &block) {
++_call_count; ++_call_count;
BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer operator() called (count = " BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer operator() called (count = "
<< _call_count << ")"; << _call_count << ")";
if (block.used_bytes() != _dadaBufferLayout.getBufferSize()) { if (block.used_bytes() != _dadaBufferLayout.getBufferSize()) { /* Unexpected buffer size */
// Stop on unexpected buffer size
BOOST_LOG_TRIVIAL(error) << "Unexpected Buffer Size - Got " BOOST_LOG_TRIVIAL(error) << "Unexpected Buffer Size - Got "
<< block.used_bytes() << " byte, expected " << block.used_bytes() << " byte, expected "
<< _dadaBufferLayout.getBufferSize() << " byte)"; << _dadaBufferLayout.getBufferSize() << " byte)";
...@@ -394,104 +340,49 @@ bool GatedSpectrometer<HandlerType>::operator()(RawBytes &block) { ...@@ -394,104 +340,49 @@ bool GatedSpectrometer<HandlerType>::operator()(RawBytes &block) {
// Copy data to device // Copy data to device
CUDA_ERROR_CHECK(cudaStreamSynchronize(_h2d_stream)); CUDA_ERROR_CHECK(cudaStreamSynchronize(_h2d_stream));
polarization0.swap(); _raw_voltage_db.swap();
polarization1.swap(); _sideChannelData_db.swap();
BOOST_LOG_TRIVIAL(debug) << " block.used_bytes() = " << BOOST_LOG_TRIVIAL(debug) << " block.used_bytes() = " << block.used_bytes()
block.used_bytes() << ", dataBlockBytes = " << << ", dataBlockBytes = " << _dadaBufferLayout.sizeOfData() << "\n";
_dadaBufferLayout.sizeOfData() << "\n";
CUDA_ERROR_CHECK(cudaMemcpyAsync(static_cast<void *>(_raw_voltage_db.a_ptr()),
// Copy the data with stride to the GPU:
// CPU: P1P2P1P2P1P2 ...
// GPU: P1P1P1 ... P2P2P2 ...
// If this is a bottleneck the gating kernel could sort the layout out
// during copy
int heapsize_bytes = _nsamps_per_heap * _nbits / 8;
CUDA_ERROR_CHECK(cudaMemcpy2DAsync(
static_cast<void *>(polarization0._raw_voltage.a_ptr()),
heapsize_bytes,
static_cast<void *>(block.ptr()), static_cast<void *>(block.ptr()),
2 * heapsize_bytes, _dadaBufferLayout.sizeOfData() , cudaMemcpyHostToDevice,
heapsize_bytes, _dadaBufferLayout.sizeOfData() / heapsize_bytes/ 2, _h2d_stream));
cudaMemcpyHostToDevice, _h2d_stream)); CUDA_ERROR_CHECK(cudaMemcpyAsync(
static_cast<void *>(_sideChannelData_db.a_ptr()),
CUDA_ERROR_CHECK(cudaMemcpy2DAsync(
static_cast<void *>(polarization1._raw_voltage.a_ptr()),
heapsize_bytes,
static_cast<void *>(block.ptr()) + heapsize_bytes,
2 * heapsize_bytes,
heapsize_bytes, _dadaBufferLayout.sizeOfData() / heapsize_bytes/ 2,
cudaMemcpyHostToDevice, _h2d_stream));
CUDA_ERROR_CHECK(cudaMemcpy2DAsync(
static_cast<void *>(polarization0._sideChannelData.a_ptr()),
sizeof(uint64_t),
static_cast<void *>(block.ptr() + _dadaBufferLayout.sizeOfData() + _dadaBufferLayout.sizeOfGap()), static_cast<void *>(block.ptr() + _dadaBufferLayout.sizeOfData() + _dadaBufferLayout.sizeOfGap()),
2 * sizeof(uint64_t), _dadaBufferLayout.sizeOfSideChannelData(), cudaMemcpyHostToDevice, _h2d_stream));
sizeof(uint64_t),
_dadaBufferLayout.sizeOfSideChannelData() / 2 / sizeof(uint64_t),
cudaMemcpyHostToDevice, _h2d_stream));
CUDA_ERROR_CHECK(cudaMemcpy2DAsync(
static_cast<void *>(polarization1._sideChannelData.a_ptr()),
sizeof(uint64_t),
static_cast<void *>(block.ptr() + _dadaBufferLayout.sizeOfData() + _dadaBufferLayout.sizeOfGap() + sizeof(uint64_t)),
2 * sizeof(uint64_t),
sizeof(uint64_t),
_dadaBufferLayout.sizeOfSideChannelData() / 2 / sizeof(uint64_t), cudaMemcpyHostToDevice, _h2d_stream));
BOOST_LOG_TRIVIAL(debug) << "First side channel item: 0x" << std::setw(16) BOOST_LOG_TRIVIAL(debug) << "First side channel item: 0x" << std::setw(16)
<< std::setfill('0') << std::hex << << std::setfill('0') << std::hex <<
(reinterpret_cast<uint64_t*>(block.ptr() + _dadaBufferLayout.sizeOfData() (reinterpret_cast<uint64_t*>(block.ptr() + _dadaBufferLayout.sizeOfData()
+ _dadaBufferLayout.sizeOfGap()))[0] << std::dec; + _dadaBufferLayout.sizeOfGap()))[0] <<
std::dec;
if (_call_count == 1) { if (_call_count == 1) {
return false; return false;
} }
// process data // process data
// check if new outblock is started: _call_count -1 because this is the block number on the device
bool newBlock = (((_call_count-1) * _nsamps_per_buffer) % _nsamps_per_output_spectra == 0);
// only if a newblock is started the output buffer is swapped. Otherwise the // only if a newblock is started the output buffer is swapped. Otherwise the
// new data is added to it // new data is added to it
if (newBlock) bool newBlock = false;
if (((_call_count-1) * _nsamps_per_buffer) % _nsamps_per_output_spectra == 0) // _call_count -1 because this is the block number on the device
{ {
BOOST_LOG_TRIVIAL(debug) << "Starting new output block."; BOOST_LOG_TRIVIAL(debug) << "Starting new output block.";
stokes_G0.swap(); newBlock = true;
stokes_G1.swap(); _power_db.swap();
stokes_G0.reset(_proc_stream); _noOfBitSetsIn_G0.swap();
stokes_G1.reset(_proc_stream); _noOfBitSetsIn_G1.swap();
// move to specific stream!
thrust::fill(thrust::cuda::par.on(_proc_stream),_power_db.a().begin(), _power_db.a().end(), 0.);
thrust::fill(thrust::cuda::par.on(_proc_stream), _noOfBitSetsIn_G0.a().begin(), _noOfBitSetsIn_G0.a().end(), 0L);
thrust::fill(thrust::cuda::par.on(_proc_stream), _noOfBitSetsIn_G1.a().begin(), _noOfBitSetsIn_G1.a().end(), 0L);
} }
mergeSideChannels<<<1024, 1024, 0, _proc_stream>>>(thrust::raw_pointer_cast(polarization0._sideChannelData.a().data()), process(_raw_voltage_db.b(), _sideChannelData_db.b(), _power_db.a(), _noOfBitSetsIn_G0.a(), _noOfBitSetsIn_G1.a());
thrust::raw_pointer_cast(polarization1._sideChannelData.a().data()), polarization1._sideChannelData.a().size());
gated_fft(polarization0, stokes_G0._noOfBitSets.a(), stokes_G1._noOfBitSets.a());
gated_fft(polarization1, stokes_G0._noOfBitSets.a(), stokes_G1._noOfBitSets.a());
stokes_accumulate<<<1024, 1024, 0, _proc_stream>>>(
thrust::raw_pointer_cast(polarization0._channelised_voltage_G0.data()),
thrust::raw_pointer_cast(polarization1._channelised_voltage_G0.data()),
thrust::raw_pointer_cast(stokes_G0.I.a().data()),
thrust::raw_pointer_cast(stokes_G0.Q.a().data()),
thrust::raw_pointer_cast(stokes_G0.U.a().data()),
thrust::raw_pointer_cast(stokes_G0.V.a().data()),
_nchans, _naccumulate
);
stokes_accumulate<<<1024, 1024, 0, _proc_stream>>>(
thrust::raw_pointer_cast(polarization0._channelised_voltage_G1.data()),
thrust::raw_pointer_cast(polarization1._channelised_voltage_G1.data()),
thrust::raw_pointer_cast(stokes_G1.I.a().data()),
thrust::raw_pointer_cast(stokes_G1.Q.a().data()),
thrust::raw_pointer_cast(stokes_G1.U.a().data()),
thrust::raw_pointer_cast(stokes_G1.V.a().data()),
_nchans, _naccumulate
);
CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream)); CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
if ((_call_count == 2) || (!newBlock)) { if ((_call_count == 2) || (!newBlock)) {
...@@ -501,104 +392,29 @@ bool GatedSpectrometer<HandlerType>::operator()(RawBytes &block) { ...@@ -501,104 +392,29 @@ bool GatedSpectrometer<HandlerType>::operator()(RawBytes &block) {
// copy data to host if block is finished // copy data to host if block is finished
CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream)); CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream));
_host_power_db.swap(); _host_power_db.swap();
// OUTPUT MEMORY LAYOUT:
// I G0, IG1,Q G0, QG1, U G0,UG1,V G0,VG1, 8xSCI, ...
for (size_t i = 0; i < stokes_G0._noOfBitSets.size(); i++) for (size_t i = 0; i < _noOfBitSetsIn_G0.size(); i++)
{ {
size_t memslicesize = (_nchans * sizeof(IntegratedPowerType)); size_t memOffset = 2 * i * (_nchans * sizeof(IntegratedPowerType) + sizeof(size_t));
size_t memOffset = 8 * i * (memslicesize + + sizeof(size_t)); // copy 2x channel data
// Copy II QQ UU VV
CUDA_ERROR_CHECK( CUDA_ERROR_CHECK(
cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset) , cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset) ,
static_cast<void *>(stokes_G0.I.b_ptr() + i * memslicesize), static_cast<void *>(_power_db.b_ptr() + 2 * i * _nchans),
_nchans * sizeof(IntegratedPowerType), 2 * _nchans * sizeof(IntegratedPowerType),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset + 1 * memslicesize) ,
static_cast<void *>(stokes_G1.I.b_ptr() + i * memslicesize),
_nchans * sizeof(IntegratedPowerType),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset + 2 * memslicesize) ,
static_cast<void *>(stokes_G0.Q.b_ptr() + i * memslicesize),
_nchans * sizeof(IntegratedPowerType),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset + 3 * memslicesize) ,
static_cast<void *>(stokes_G1.Q.b_ptr() + i * memslicesize),
_nchans * sizeof(IntegratedPowerType),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset + 4 * memslicesize) ,
static_cast<void *>(stokes_G0.U.b_ptr() + i * memslicesize),
_nchans * sizeof(IntegratedPowerType),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset + 5 * memslicesize) ,
static_cast<void *>(stokes_G1.U.b_ptr() + i * memslicesize),
_nchans * sizeof(IntegratedPowerType),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset + 6 * memslicesize) ,
static_cast<void *>(stokes_G0.V.b_ptr() + i * memslicesize),
_nchans * sizeof(IntegratedPowerType),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset + 7 * memslicesize) ,
static_cast<void *>(stokes_G1.V.b_ptr() + i * memslicesize),
_nchans * sizeof(IntegratedPowerType),
cudaMemcpyDeviceToHost, _d2h_stream)); cudaMemcpyDeviceToHost, _d2h_stream));
// Copy SCI // copy noOf bit set data
CUDA_ERROR_CHECK( CUDA_ERROR_CHECK(
cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize), cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType)),
static_cast<void *>(stokes_G0._noOfBitSets.b_ptr() + i ), static_cast<void *>(_noOfBitSetsIn_G0.b_ptr() + i ),
1 * sizeof(size_t),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize + 1 * sizeof(size_t)),
static_cast<void *>(stokes_G1._noOfBitSets.b_ptr() + i ),
1 * sizeof(size_t),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize + 2 * sizeof(size_t)),
static_cast<void *>(stokes_G0._noOfBitSets.b_ptr() + i ),
1 * sizeof(size_t),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize + 3 * sizeof(size_t)),
static_cast<void *>(stokes_G1._noOfBitSets.b_ptr() + i ),
1 * sizeof(size_t),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize + 4 * sizeof(size_t)),
static_cast<void *>(stokes_G0._noOfBitSets.b_ptr() + i ),
1 * sizeof(size_t),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize + 5 * sizeof(size_t)),
static_cast<void *>(stokes_G1._noOfBitSets.b_ptr() + i ),
1 * sizeof(size_t),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize + 6 * sizeof(size_t)),
static_cast<void *>(stokes_G0._noOfBitSets.b_ptr() + i ),
1 * sizeof(size_t), 1 * sizeof(size_t),
cudaMemcpyDeviceToHost, _d2h_stream)); cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK( CUDA_ERROR_CHECK(
cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize + 7 * sizeof(size_t)), cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType) + sizeof(size_t)),
static_cast<void *>(stokes_G1._noOfBitSets.b_ptr() + i ), static_cast<void *>(_noOfBitSetsIn_G1.b_ptr() + i ),
1 * sizeof(size_t), 1 * sizeof(size_t),
cudaMemcpyDeviceToHost, _d2h_stream)); cudaMemcpyDeviceToHost, _d2h_stream));
} }
BOOST_LOG_TRIVIAL(debug) << "Copy Data back to host"; BOOST_LOG_TRIVIAL(debug) << "Copy Data back to host";
...@@ -608,28 +424,28 @@ bool GatedSpectrometer<HandlerType>::operator()(RawBytes &block) { ...@@ -608,28 +424,28 @@ bool GatedSpectrometer<HandlerType>::operator()(RawBytes &block) {
} }
// calculate off value // calculate off value
//BOOST_LOG_TRIVIAL(info) << "Buffer block: " << _call_count-3 << " with " << _noOfBitSetsIn_G0.size() << "x2 output heaps:"; BOOST_LOG_TRIVIAL(info) << "Buffer block: " << _call_count-3 << " with " << _noOfBitSetsIn_G0.size() << "x2 output heaps:";
//size_t total_samples_lost = 0; size_t total_samples_lost = 0;
//for (size_t i = 0; i < _noOfBitSetsIn_G0.size(); i++) for (size_t i = 0; i < _noOfBitSetsIn_G0.size(); i++)
//{ {
// size_t memOffset = 2 * i * (_nchans * sizeof(IntegratedPowerType) + sizeof(size_t)); size_t memOffset = 2 * i * (_nchans * sizeof(IntegratedPowerType) + sizeof(size_t));
// size_t* on_values = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType)); size_t* on_values = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType));
// size_t* off_values = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType) + sizeof(size_t)); size_t* off_values = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType) + sizeof(size_t));
// size_t samples_lost = _nsamps_per_output_spectra - (*on_values) - (*off_values); size_t samples_lost = _nsamps_per_output_spectra - (*on_values) - (*off_values);
// total_samples_lost += samples_lost; total_samples_lost += samples_lost;
// BOOST_LOG_TRIVIAL(info) << " Heap " << i << ":\n" BOOST_LOG_TRIVIAL(info) << " Heap " << i << ":\n"
// <<" Samples with bit set : " << *on_values << std::endl <<" Samples with bit set : " << *on_values << std::endl
// <<" Samples without bit set: " << *off_values << std::endl <<" Samples without bit set: " << *off_values << std::endl
// <<" Samples lost : " << samples_lost << " out of " << _nsamps_per_output_spectra << std::endl; <<" Samples lost : " << samples_lost << " out of " << _nsamps_per_output_spectra << std::endl;
//} }
//double efficiency = 1. - double(total_samples_lost) / (_nsamps_per_output_spectra * _noOfBitSetsIn_G0.size()); double efficiency = 1. - double(total_samples_lost) / (_nsamps_per_output_spectra * _noOfBitSetsIn_G0.size());
//double prev_average = _processing_efficiency / (_call_count- 3 - 1); double prev_average = _processing_efficiency / (_call_count- 3 - 1);
//_processing_efficiency += efficiency; _processing_efficiency += efficiency;
//double average = _processing_efficiency / (_call_count-3); double average = _processing_efficiency / (_call_count-3);
//BOOST_LOG_TRIVIAL(info) << "Total processing efficiency of this buffer block:" << std::setprecision(6) << efficiency << ". Run average: " << average << " (Trend: " << std::showpos << (average - prev_average) << ")"; BOOST_LOG_TRIVIAL(info) << "Total processing efficiency of this buffer block:" << std::setprecision(6) << efficiency << ". Run average: " << average << " (Trend: " << std::showpos << (average - prev_average) << ")";
// Wrap in a RawBytes object here; // Wrap in a RawBytes object here;
RawBytes bytes(reinterpret_cast<char *>(_host_power_db.b_ptr()), RawBytes bytes(reinterpret_cast<char *>(_host_power_db.b_ptr()),
......
...@@ -25,7 +25,12 @@ const size_t ERROR_UNHANDLED_EXCEPTION = 2; ...@@ -25,7 +25,12 @@ const size_t ERROR_UNHANDLED_EXCEPTION = 2;
template<typename T> template<typename T>
void launchSpectrometer(const effelsberg::edd::DadaBufferLayout &dadaBufferLayout, const std::string &output_type, const std::string &filename, size_t selectedSideChannel, size_t selectedBit, size_t fft_length, size_t naccumulate, unsigned int nbits, float input_level, float output_level) void launchSpectrometer(const effelsberg::edd::DadaBufferLayout
&dadaBufferLayout, const std::string &output_type,
const std::string &filename,
size_t selectedSideChannel, size_t selectedBit,
size_t fft_length, size_t naccumulate, unsigned int nbits,
float input_level, float output_level)
{ {
MultiLog log("DadaBufferLayout"); MultiLog log("DadaBufferLayout");
......
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
#include "thrust/device_vector.h" #include "thrust/device_vector.h"
#include "thrust/extrema.h" #include "thrust/extrema.h"
namespace {
TEST(GatedSpectrometer, BitManipulationMacros) { TEST(GatedSpectrometer, BitManipulationMacros) {
for (int i = 0; i < 64; i++) { for (int i = 0; i < 64; i++) {
...@@ -22,121 +23,31 @@ TEST(GatedSpectrometer, BitManipulationMacros) { ...@@ -22,121 +23,31 @@ TEST(GatedSpectrometer, BitManipulationMacros) {
} }
} }
//
TEST(GatedSpectrometer, stokes_IQUV) //TEST(GatedSpectrometer, ParameterSanity) {
{ // ::testing::FLAGS_gtest_death_test_style = "threadsafe";
float I,Q,U,V; // psrdada_cpp::NullSink sink;
// No field //
psrdada_cpp::effelsberg::edd::stokes_IQUV((float2){0.0f,0.0f}, (float2){0.0f,0.0f}, I, Q, U, V); // // 8 or 12 bit sampling
EXPECT_FLOAT_EQ(I, 0); // EXPECT_DEATH(psrdada_cpp::effelsberg::edd::GatedSpectrometer<decltype(sink),int8_t > (0, 0, 0, 0, 4096, 0, 0, 0, 0, 0, sink),
EXPECT_FLOAT_EQ(Q, 0); // "_nbits == 8");
EXPECT_FLOAT_EQ(U, 0); // // naccumulate > 0
EXPECT_FLOAT_EQ(V, 0); // EXPECT_DEATH(psrdada_cpp::effelsberg::edd::GatedSpectrometer<decltype(sink),int8_t > (0, 0, 0, 0, 4096, 0, 0, 8, 0, 0, sink),
// "_naccumulate");
// For p1 = Ex, p2 = Ey //
// horizontal polarized // // selected side channel
psrdada_cpp::effelsberg::edd::stokes_IQUV((float2){1.0f,0.0f}, (float2){0.0f,0.0f}, I, Q, U, V); // EXPECT_DEATH(psrdada_cpp::effelsberg::edd::GatedSpectrometer<decltype(sink),int8_t > (0, 1, 2, 0, 4096, 0, 1, 8, 0, 0, sink),
EXPECT_FLOAT_EQ(I, 1); // "nSideChannels");
EXPECT_FLOAT_EQ(Q, 1); //
EXPECT_FLOAT_EQ(U, 0); // // selected bit
EXPECT_FLOAT_EQ(V, 0); // EXPECT_DEATH(psrdada_cpp::effelsberg::edd::GatedSpectrometer<decltype(sink),int8_t > (0, 2, 1, 65, 4096, 0, 1, 8, 0, 0, sink),
// "selectedBit");
// vertical polarized //
psrdada_cpp::effelsberg::edd::stokes_IQUV((float2){0.0f,0.0f}, (float2){1.0f,0.0f}, I, Q, U, V); // // valid construction
EXPECT_FLOAT_EQ(I, 1); // psrdada_cpp::effelsberg::edd::GatedSpectrometer<decltype(sink), int8_t> a(
EXPECT_FLOAT_EQ(Q, -1); // 4096 * 4096, 2, 1, 63, 4096, 1024, 1, 8, 100., 100., sink);
EXPECT_FLOAT_EQ(U, 0); //}
EXPECT_FLOAT_EQ(V, 0); } // namespace
//linear +45 deg.
psrdada_cpp::effelsberg::edd::stokes_IQUV((float2){1.0f/std::sqrt(2),0.0f}, (float2){1.0f/std::sqrt(2),0.0f}, I, Q, U, V);
EXPECT_FLOAT_EQ(I, 1);
EXPECT_FLOAT_EQ(Q, 0);
EXPECT_FLOAT_EQ(U, 1);
EXPECT_FLOAT_EQ(V, 0);
//linear -45 deg.
psrdada_cpp::effelsberg::edd::stokes_IQUV((float2){-1.0f/std::sqrt(2),0.0f}, (float2){1.0f/std::sqrt(2),0.0f}, I, Q, U, V);
EXPECT_FLOAT_EQ(I, 1);
EXPECT_FLOAT_EQ(Q, 0);
EXPECT_FLOAT_EQ(U, -1);
EXPECT_FLOAT_EQ(V, 0);
//left circular
psrdada_cpp::effelsberg::edd::stokes_IQUV((float2){.0f,1.0f/std::sqrt(2)}, (float2){1.0f/std::sqrt(2),.0f}, I, Q, U, V);
EXPECT_FLOAT_EQ(I, 1);
EXPECT_FLOAT_EQ(Q, 0);
EXPECT_FLOAT_EQ(U, 0);
EXPECT_FLOAT_EQ(V, -1);
// right circular
psrdada_cpp::effelsberg::edd::stokes_IQUV((float2){.0f,-1.0f/std::sqrt(2)}, (float2){1.0f/std::sqrt(2),.0f}, I, Q, U, V);
EXPECT_FLOAT_EQ(I, 1);
EXPECT_FLOAT_EQ(Q, 0);
EXPECT_FLOAT_EQ(U, 0);
EXPECT_FLOAT_EQ(V, 1);
}
TEST(GatedSpectrometer, stokes_accumulate)
{
CUDA_ERROR_CHECK(cudaDeviceSynchronize());
size_t nchans = 8 * 1024 * 1024 + 1;
size_t naccumulate = 5;
thrust::device_vector<float2> P0(nchans * naccumulate);
thrust::device_vector<float2> P1(nchans * naccumulate);
thrust::fill(P0.begin(), P0.end(), (float2){0, 0});
thrust::fill(P1.begin(), P1.end(), (float2){0, 0});
thrust::device_vector<float> I(nchans);
thrust::device_vector<float> Q(nchans);
thrust::device_vector<float> U(nchans);
thrust::device_vector<float> V(nchans);
thrust::fill(I.begin(), I.end(), 0);
thrust::fill(Q.begin(), Q.end(), 0);
thrust::fill(U.begin(), U.end(), 0);
thrust::fill(V.begin(), V.end(), 0);
// This channel should be left circular polarized
size_t idx0 = 23;
for (int k = 0; k< naccumulate; k++)
{
size_t idx = idx0 + k * nchans;
P0[idx] = (float2){0.0f, 1.0f/std::sqrt(2)};
P1[idx] = (float2){1.0f/std::sqrt(2),0.0f};
}
psrdada_cpp::effelsberg::edd::stokes_accumulate<<<1024, 1024>>>(
thrust::raw_pointer_cast(P0.data()),
thrust::raw_pointer_cast(P1.data()),
thrust::raw_pointer_cast(I.data()),
thrust::raw_pointer_cast(Q.data()),
thrust::raw_pointer_cast(U.data()),
thrust::raw_pointer_cast(V.data()),
nchans,
naccumulate
);
CUDA_ERROR_CHECK(cudaDeviceSynchronize());
thrust::pair<thrust::device_vector<float>::iterator, thrust::device_vector<float>::iterator> minmax;
minmax = thrust::minmax_element(I.begin(), I.end());
EXPECT_FLOAT_EQ(*minmax.first, 0);
EXPECT_FLOAT_EQ(*minmax.second, naccumulate);
minmax = thrust::minmax_element(Q.begin(), Q.end());
EXPECT_FLOAT_EQ(*minmax.first, 0);
EXPECT_FLOAT_EQ(*minmax.second, 0);
minmax = thrust::minmax_element(U.begin(), U.end());
EXPECT_FLOAT_EQ(*minmax.first, 0);
EXPECT_FLOAT_EQ(*minmax.second, 0);
minmax = thrust::minmax_element(V.begin(), V.end());
EXPECT_FLOAT_EQ(*minmax.first, -1. * naccumulate);
EXPECT_FLOAT_EQ(*minmax.second, 0);
}
TEST(GatedSpectrometer, GatingKernel) TEST(GatedSpectrometer, GatingKernel)
...@@ -152,8 +63,6 @@ TEST(GatedSpectrometer, GatingKernel) ...@@ -152,8 +63,6 @@ TEST(GatedSpectrometer, GatingKernel)
thrust::device_vector<float> baseLineG0(1); thrust::device_vector<float> baseLineG0(1);
thrust::device_vector<float> baseLineG1(1); thrust::device_vector<float> baseLineG1(1);
thrust::device_vector<float> baseLineG0_update(1);
thrust::device_vector<float> baseLineG1_update(1);
thrust::fill(G0.begin(), G0.end(), 42); thrust::fill(G0.begin(), G0.end(), 42);
thrust::fill(G1.begin(), G1.end(), 23); thrust::fill(G1.begin(), G1.end(), 23);
thrust::fill(_sideChannelData.begin(), _sideChannelData.end(), 0); thrust::fill(_sideChannelData.begin(), _sideChannelData.end(), 0);
...@@ -162,22 +71,18 @@ TEST(GatedSpectrometer, GatingKernel) ...@@ -162,22 +71,18 @@ TEST(GatedSpectrometer, GatingKernel)
{ {
thrust::fill(_nG0.begin(), _nG0.end(), 0); thrust::fill(_nG0.begin(), _nG0.end(), 0);
thrust::fill(_nG1.begin(), _nG1.end(), 0); thrust::fill(_nG1.begin(), _nG1.end(), 0);
baseLineG0[0] = -3; baseLineG0[0] = 0.;
baseLineG1[0] = -4; baseLineG1[0] = 0.;
baseLineG0_update[0] = 0;
baseLineG1_update[0] = 0;
const uint64_t *sideCD = const uint64_t *sideCD =
(uint64_t *)(thrust::raw_pointer_cast(_sideChannelData.data())); (uint64_t *)(thrust::raw_pointer_cast(_sideChannelData.data()));
psrdada_cpp::effelsberg::edd::gating<<<1024 , 1024>>>( psrdada_cpp::effelsberg::edd::gating<<<1024 , 1024>>>(
thrust::raw_pointer_cast(G0.data()), thrust::raw_pointer_cast(G0.data()),
thrust::raw_pointer_cast(G1.data()), sideCD, thrust::raw_pointer_cast(G1.data()), sideCD,
G0.size(), blockSize, 0, 1, G0.size(), G0.size(), 0, 1,
0, 0,
-3., -4,
thrust::raw_pointer_cast(baseLineG0.data()), thrust::raw_pointer_cast(baseLineG0.data()),
thrust::raw_pointer_cast(baseLineG1.data()), thrust::raw_pointer_cast(baseLineG1.data()),
thrust::raw_pointer_cast(baseLineG0_update.data()),
thrust::raw_pointer_cast(baseLineG1_update.data()),
thrust::raw_pointer_cast(_nG0.data()), thrust::raw_pointer_cast(_nG0.data()),
thrust::raw_pointer_cast(_nG1.data()) thrust::raw_pointer_cast(_nG1.data())
); );
...@@ -194,31 +99,27 @@ TEST(GatedSpectrometer, GatingKernel) ...@@ -194,31 +99,27 @@ TEST(GatedSpectrometer, GatingKernel)
EXPECT_EQ(_nG0[0], G0.size()); EXPECT_EQ(_nG0[0], G0.size());
EXPECT_EQ(_nG1[0], 0u); EXPECT_EQ(_nG1[0], 0u);
EXPECT_FLOAT_EQ(42.f, baseLineG0_update[0] / (_nG0[0] + 1E-121)); EXPECT_FLOAT_EQ(baseLineG0[0] / (_nG0[0] + 1E-127), 42.f);
EXPECT_FLOAT_EQ(0.f, baseLineG1_update[0] / (_nG1[0] + 1E-121)); EXPECT_FLOAT_EQ(baseLineG1[0] / (_nG1[0] + 1E-127), 0.f);
} }
// everything to G1 // with baseline -5 // everything to G1 // with baseline -5
{ {
thrust::fill(_nG0.begin(), _nG0.end(), 0); thrust::fill(_nG0.begin(), _nG0.end(), 0);
thrust::fill(_nG1.begin(), _nG1.end(), 0); thrust::fill(_nG1.begin(), _nG1.end(), 0);
baseLineG0[0] = 5.; baseLineG0[0] = 0.;
baseLineG1[0] = -2; baseLineG1[0] = 0.;
baseLineG0_update[0] = 0;
baseLineG1_update[0] = 0;
thrust::fill(_sideChannelData.begin(), _sideChannelData.end(), 1L); thrust::fill(_sideChannelData.begin(), _sideChannelData.end(), 1L);
const uint64_t *sideCD = const uint64_t *sideCD =
(uint64_t *)(thrust::raw_pointer_cast(_sideChannelData.data())); (uint64_t *)(thrust::raw_pointer_cast(_sideChannelData.data()));
psrdada_cpp::effelsberg::edd::gating<<<1024, 1024>>>( psrdada_cpp::effelsberg::edd::gating<<<1024, 1024>>>(
thrust::raw_pointer_cast(G0.data()), thrust::raw_pointer_cast(G0.data()),
thrust::raw_pointer_cast(G1.data()), sideCD, thrust::raw_pointer_cast(G1.data()), sideCD,
G0.size(), blockSize, 0, 1, G0.size(), G0.size(), 0, 1,
0, 0,
5., -2.,
thrust::raw_pointer_cast(baseLineG0.data()), thrust::raw_pointer_cast(baseLineG0.data()),
thrust::raw_pointer_cast(baseLineG1.data()), thrust::raw_pointer_cast(baseLineG1.data()),
thrust::raw_pointer_cast(baseLineG0_update.data()),
thrust::raw_pointer_cast(baseLineG1_update.data()),
thrust::raw_pointer_cast(_nG0.data()), thrust::raw_pointer_cast(_nG0.data()),
thrust::raw_pointer_cast(_nG1.data()) thrust::raw_pointer_cast(_nG1.data())
); );
...@@ -233,12 +134,13 @@ TEST(GatedSpectrometer, GatingKernel) ...@@ -233,12 +134,13 @@ TEST(GatedSpectrometer, GatingKernel)
EXPECT_EQ(_nG0[0], 0u); EXPECT_EQ(_nG0[0], 0u);
EXPECT_EQ(_nG1[0], G1.size()); EXPECT_EQ(_nG1[0], G1.size());
EXPECT_FLOAT_EQ(baseLineG0[0] / (_nG0[0] + 1E-127), 0.);
EXPECT_FLOAT_EQ(0.f, baseLineG0_update[0] / (_nG0[0] + 1E-121)); EXPECT_FLOAT_EQ(baseLineG1[0] / (_nG1[0] + 1E-127), 42.);
EXPECT_FLOAT_EQ(42.f, baseLineG1_update[0] / (_nG1[0] + 1E-121));
} }
} }
TEST(GatedSpectrometer, array_sum) { TEST(GatedSpectrometer, array_sum) {
const size_t NBLOCKS = 16 * 32; const size_t NBLOCKS = 16 * 32;
...@@ -261,3 +163,10 @@ TEST(GatedSpectrometer, array_sum) { ...@@ -261,3 +163,10 @@ TEST(GatedSpectrometer, array_sum) {
EXPECT_EQ(size_t(blr[0]), inputLength); EXPECT_EQ(size_t(blr[0]), inputLength);
} }
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment