diff --git a/psrdada_cpp/effelsberg/edd/GatedStokesSpectrometer.cuh b/psrdada_cpp/effelsberg/edd/GatedStokesSpectrometer.cuh new file mode 100644 index 0000000000000000000000000000000000000000..a3b160c5f8a5629acb21a866086e2fc0a59d5a7c --- /dev/null +++ b/psrdada_cpp/effelsberg/edd/GatedStokesSpectrometer.cuh @@ -0,0 +1,315 @@ +#ifndef PSRDADA_CPP_EFFELSBERG_EDD_GATEDSPECTROMETER_HPP +#define PSRDADA_CPP_EFFELSBERG_EDD_GATEDSPECTROMETER_HPP + +#include "psrdada_cpp/effelsberg/edd/Unpacker.cuh" +#include "psrdada_cpp/raw_bytes.hpp" +#include "psrdada_cpp/cuda_utils.hpp" +#include "psrdada_cpp/double_device_buffer.cuh" +#include "psrdada_cpp/double_host_buffer.cuh" +#include "psrdada_cpp/effelsberg/edd/DetectorAccumulator.cuh" +#include "psrdada_cpp/effelsberg/edd/DadaBufferLayout.hpp" + +#include "thrust/device_vector.h" +#include "cufft.h" + +#include "cublas_v2.h" + +namespace psrdada_cpp { +namespace effelsberg { +namespace edd { + + +#define BIT_MASK(bit) (1uL << (bit)) +#define SET_BIT(value, bit) ((value) |= BIT_MASK(bit)) +#define CLEAR_BIT(value, bit) ((value) &= ~BIT_MASK(bit)) +#define TEST_BIT(value, bit) (((value)&BIT_MASK(bit)) ? 1 : 0) + +typedef unsigned long long int uint64_cu; +static_assert(sizeof(uint64_cu) == sizeof(uint64_t), "Long long int not of 64 bit! This is problematic for CUDA!"); + +typedef uint64_t RawVoltageType; +typedef float UnpackedVoltageType; +typedef float2 ChannelisedVoltageType; + +typedef float IntegratedPowerType; +//typedef int8_t IntegratedPowerType; + +/// Input data and intermediate processing data for one polarization +struct PolarizationData +{ + /// Raw ADC Voltage + DoubleDeviceBuffer<RawVoltageType> _raw_voltage; + /// Side channel data + DoubleDeviceBuffer<uint64_t> _sideChannelData; + + /// Baseline in gate 0 state + thrust::device_vector<UnpackedVoltageType> _baseLineG0; + /// Baseline in gate 1 state + thrust::device_vector<UnpackedVoltageType> _baseLineG1; + + /// Baseline in gate 0 state after update + thrust::device_vector<UnpackedVoltageType> _baseLineG0_update; + /// Baseline in gate 1 state after update + thrust::device_vector<UnpackedVoltageType> _baseLineG1_update; + + /// Channelized voltage in gate 0 state + thrust::device_vector<ChannelisedVoltageType> _channelised_voltage_G0; + /// Channelized voltage in gate 1 state + thrust::device_vector<ChannelisedVoltageType> _channelised_voltage_G1; + + /// Swaps input buffers + void swap() + { + _raw_voltage.swap(); + _sideChannelData.swap(); + } +}; + + +// Output data for one gate +struct StokesOutput +{ + /// Stokes parameters + DoubleDeviceBuffer<IntegratedPowerType> I; + DoubleDeviceBuffer<IntegratedPowerType> Q; + DoubleDeviceBuffer<IntegratedPowerType> U; + DoubleDeviceBuffer<IntegratedPowerType> V; + + /// Number of samples integrated in this output block + DoubleDeviceBuffer<uint64_cu> _noOfBitSets; + + /// Reset outptu for new integration + void reset(cudaStream_t &_proc_stream) + { + thrust::fill(thrust::cuda::par.on(_proc_stream),I.a().begin(), I.a().end(), 0.); + thrust::fill(thrust::cuda::par.on(_proc_stream),Q.a().begin(), Q.a().end(), 0.); + thrust::fill(thrust::cuda::par.on(_proc_stream),U.a().begin(), U.a().end(), 0.); + thrust::fill(thrust::cuda::par.on(_proc_stream),V.a().begin(), V.a().end(), 0.); + thrust::fill(thrust::cuda::par.on(_proc_stream), _noOfBitSets.a().begin(), _noOfBitSets.a().end(), 0L); + } + + /// Swap output buffers + void swap() + { + I.swap(); + Q.swap(); + U.swap(); + V.swap(); + _noOfBitSets.swap(); + } + + /// Resize all buffers + void resize(size_t size, size_t blocks) + { + I.resize(size * blocks); + Q.resize(size * blocks); + U.resize(size * blocks); + V.resize(size * blocks); + _noOfBitSets.resize(blocks); + } +}; + + + + + + + +/** + @class GatedStokesSpectrometer + @brief Split data into two streams and create integrated spectra depending on + bit set in side channel data. + + */ +template <class HandlerType> class GatedStokesSpectrometer { +public: + + + +public: + /** + * @brief Constructor + * + * @param buffer_bytes A RawBytes object wrapping a DADA header buffer + * @param nSideChannels Number of side channel items in the data stream, + * @param selectedSideChannel Side channel item used for gating + * @param selectedBit bit of side channel item used for gating + * @param speadHeapSize Size of the spead heap block. + * @param fftLength Size of the FFT + * @param naccumulate Number of samples to integrate in the individual + * FFT bins + * @param nbits Bit depth of the sampled signal + * @param input_level Normalization level of the input signal + * @param output_level Normalization level of the output signal + * @param handler Output handler + * + */ + GatedStokesSpectrometer(const DadaBufferLayout &bufferLayout, + std::size_t selectedSideChannel, std::size_t selectedBit, + std::size_t fft_length, + std::size_t naccumulate, std::size_t nbits, + float input_level, float output_level, + HandlerType &handler); + ~GatedStokesSpectrometer(); + + /** + * @brief A callback to be called on connection + * to a ring buffer. + * + * @detail The first available header block in the + * in the ring buffer is provided as an argument. + * It is here that header parameters could be read + * if desired. + * + * @param block A RawBytes object wrapping a DADA header buffer + */ + void init(RawBytes &block); + + /** + * @brief A callback to be called on acqusition of a new + * data block. + * + * @param block A RawBytes object wrapping a DADA data buffer output + * are the integrated specttra with/without bit set. + */ + bool operator()(RawBytes &block); + +private: + // gate the data and fft data per gate + void gated_fft(PolarizationData &data, + thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G0, + thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G1); + +private: + DadaBufferLayout _dadaBufferLayout; + std::size_t _fft_length; + std::size_t _naccumulate; + std::size_t _nbits; + std::size_t _selectedSideChannel; + std::size_t _selectedBit; + std::size_t _batch; + std::size_t _nsamps_per_output_spectra; + std::size_t _nsamps_per_buffer; + std::size_t _nsamps_per_heap; + + HandlerType &_handler; + cufftHandle _fft_plan; + uint64_t _nchans; + uint64_t _call_count; + double _processing_efficiency; + + std::unique_ptr<Unpacker> _unpacker; + + // Input data and per pol intermediate data + PolarizationData polarization0, polarization1; + + // Output data + StokesOutput stokes_G0, stokes_G1; + + DoublePinnedHostBuffer<char> _host_power_db; + + // Temporary processing block + // ToDo: Use inplace FFT to avoid temporary coltage array + thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G0; + thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G1; + + + cudaStream_t _h2d_stream; + cudaStream_t _proc_stream; + cudaStream_t _d2h_stream; +}; + + +/** + * @brief Splits the input data depending on a bit set into two arrays. + * + * @detail The resulting gaps are filled with a given baseline value in the other stream. + * + * @param GO Input data. Data is set to the baseline value if corresponding + * sideChannelData bit at bitpos os set. + * @param G1 Data in this array is set to the baseline value if corresponding + * sideChannelData bit at bitpos is not set. + * @param sideChannelData noOfSideChannels items per block of heapSize + * bytes in the input data. + * @param N lebgth of the input/output arrays G0. + * @param heapsize Size of the blocks for which there is an entry in the + sideChannelData. + * @param bitpos Position of the bit to evaluate for processing. + * @param noOfSideChannels Number of side channels items per block of + * data. + * @param selectedSideChannel No. of side channel item to be eveluated. + 0 <= selectedSideChannel < noOfSideChannels. + * @param stats_G0 No. of sampels contributing to G0, accounting also + * for loat heaps + * @param stats_G1 No. of sampels contributing to G1, accounting also + * for loat heaps + */ +__global__ void gating(float *G0, float *G1, const int64_t *sideChannelData, + size_t N, size_t heapSize, size_t bitpos, + size_t noOfSideChannels, size_t selectedSideChannel, + const float* __restrict__ _baseLineG0, + const float* __restrict__ _baseLineG1, + float* __restrict__ baseLineNG0, + float* __restrict__ baseLineNG1, + uint64_cu* stats_G0, + uint64_cu* stats_G1); + +/** + * @brief calculate stokes IQUV from two complex valuies for each polarization + */ +//__host__ __device__ void stokes_IQUV(const float2 &p1, const float2 &p2, float &I, float &Q, float &U, float &V); +__host__ __device__ void stokes_IQUV(const float2 &p1, const float2 &p2, float &I, float &Q, float &U, float &V) +{ + I = fabs(p1.x*p1.x + p1.y * p1.y) + fabs(p2.x*p2.x + p2.y * p2.y); + Q = fabs(p1.x*p1.x + p1.y * p1.y) - fabs(p2.x*p2.x + p2.y * p2.y); + U = 2 * (p1.x*p2.x + p1.y * p2.y); + V = -2 * (p1.y*p2.x - p1.x * p2.y); +} + + + + +/** + * @brief calculate stokes IQUV spectra pol1, pol2 are arrays of naccumulate + * complex spectra for individual polarizations + */ +__global__ void stokes_accumulate(float2 const __restrict__ *pol1, + float2 const __restrict__ *pol2, float *I, float* Q, float *U, float*V, + int nchans, int naccumulate) +{ + + for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < nchans); + i += blockDim.x * gridDim.x) + { + float rI = 0; + float rQ = 0; + float rU = 0; + float rV = 0; + + for (int k=0; k < naccumulate; k++) + { + const float2 p1 = pol1[i + k * nchans]; + const float2 p2 = pol2[i + k * nchans]; + + rI += fabs(p1.x * p1.x + p1.y * p1.y) + fabs(p2.x * p2.x + p2.y * p2.y); + rQ += fabs(p1.x * p1.x + p1.y * p1.y) - fabs(p2.x * p2.x + p2.y * p2.y); + rU += 2.f * (p1.x * p2.x + p1.y * p2.y); + rV += -2.f * (p1.y * p2.x - p1.x * p2.y); + } + I[i] += rI; + Q[i] += rQ; + U[i] += rU; + V[i] += rV; + } + +} + + + + +} // edd +} // effelsberg +} // psrdada_cpp + +#include "psrdada_cpp/effelsberg/edd/detail/GatedStokesSpectrometer.cu" +#endif //PSRDADA_CPP_EFFELSBERG_EDD_GATEDSPECTROMETER_HPP diff --git a/psrdada_cpp/effelsberg/edd/detail/GatedStokesSpectrometer.cu b/psrdada_cpp/effelsberg/edd/detail/GatedStokesSpectrometer.cu new file mode 100644 index 0000000000000000000000000000000000000000..c16d19360ddd0a2862ce82684dda9fc9311b3d19 --- /dev/null +++ b/psrdada_cpp/effelsberg/edd/detail/GatedStokesSpectrometer.cu @@ -0,0 +1,650 @@ +#include "psrdada_cpp/effelsberg/edd/GatedStokesSpectrometer.cuh" +#include "psrdada_cpp/effelsberg/edd/Tools.cuh" +#include "psrdada_cpp/common.hpp" +#include "psrdada_cpp/cuda_utils.hpp" +#include "psrdada_cpp/raw_bytes.hpp" + +#include <cuda.h> +#include <cuda_profiler_api.h> +#include <thrust/system/cuda/execution_policy.h> + +#include <iostream> +#include <iomanip> +#include <cstring> +#include <sstream> + +namespace psrdada_cpp { +namespace effelsberg { +namespace edd { + +// Reduce thread local vatiable v in shared array x, so that x[0] +template<typename T> +__device__ void sum_reduce(T *x, const T &v) +{ + x[threadIdx.x] = v; + __syncthreads(); + for(int s = blockDim.x / 2; s > 0; s = s / 2) + { + if (threadIdx.x < s) + x[threadIdx.x] += x[threadIdx.x + s]; + __syncthreads(); + } +} + + +// If one of the side channel items is lsot, then both are considered as lost +// here +__global__ void mergeSideChannels(uint64_t* __restrict__ A, uint64_t* __restrict__ B, size_t N) +{ + for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N); + i += blockDim.x * gridDim.x) + { + uint64_t v = A[i] || B[i]; + A[i] = v; + B[i] = v; + } +} + + +__global__ void gating(float* __restrict__ G0, + float* __restrict__ G1, + const uint64_t* __restrict__ sideChannelData, + size_t N, size_t heapSize, size_t bitpos, + size_t noOfSideChannels, size_t selectedSideChannel, + const float* __restrict__ _baseLineG0, + const float* __restrict__ _baseLineG1, + float* __restrict__ baseLineNG0, + float* __restrict__ baseLineNG1, + uint64_cu* stats_G0, uint64_cu* stats_G1) { + // statistics values for samopels to G0, G1 + uint32_t _G0stats = 0; + uint32_t _G1stats = 0; + + const float baseLineG0 = _baseLineG0[0]; + const float baseLineG1 = _baseLineG1[0]; + + float baselineUpdateG0 = 0; + float baselineUpdateG1 = 0; + + for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N); + i += blockDim.x * gridDim.x) { + const float v = G0[i]; + + const uint64_t sideChannelItem = sideChannelData[((i / heapSize) * (noOfSideChannels)) + + selectedSideChannel]; + + const unsigned int bit_set = TEST_BIT(sideChannelItem, bitpos); + const unsigned int heap_lost = TEST_BIT(sideChannelItem, 63); + G1[i] = (v - baseLineG1) * bit_set * (!heap_lost) + baseLineG1; + G0[i] = (v - baseLineG0) * (!bit_set) *(!heap_lost) + baseLineG0; + + _G0stats += (!bit_set) *(!heap_lost); + _G1stats += bit_set * (!heap_lost); + + baselineUpdateG1 += v * bit_set * (!heap_lost); + baselineUpdateG0 += v * (!bit_set) *(!heap_lost); + } + + __shared__ uint32_t x[1024]; + + // Reduce G0, G1 + sum_reduce<uint32_t>(x, _G0stats); + if(threadIdx.x == 0) { + atomicAdd(stats_G0, (uint64_cu) x[threadIdx.x]); + } + __syncthreads(); + + sum_reduce<uint32_t>(x, _G1stats); + if(threadIdx.x == 0) { + atomicAdd(stats_G1, (uint64_cu) x[threadIdx.x]); + } + __syncthreads(); + + //reuse shared array + float *y = (float*) x; + //update the baseline array + sum_reduce<float>(y, baselineUpdateG0); + if(threadIdx.x == 0) { + atomicAdd(baseLineNG0, y[threadIdx.x]); + } + __syncthreads(); + + sum_reduce<float>(y, baselineUpdateG1); + if(threadIdx.x == 0) { + atomicAdd(baseLineNG1, y[threadIdx.x]); + } + __syncthreads(); +} + + + +// Updates the baselines of the gates for the polarization set for the next +// block +// only few output blocks per input block thus execution on only one thread. +// Important is that the execution is async on the GPU. +__global__ void update_baselines(float* __restrict__ baseLineG0, + float* __restrict__ baseLineG1, + float* __restrict__ baseLineNG0, + float* __restrict__ baseLineNG1, + uint64_cu* stats_G0, uint64_cu* stats_G1, + size_t N) +{ + size_t NG0 = 0; + size_t NG1 = 0; + + for (size_t i =0; i < N; i++) + { + NG0 += stats_G0[i]; + NG1 += stats_G1[i]; + } + + baseLineG0[0] = baseLineNG0[0] / NG0; + baseLineG1[0] = baseLineNG1[0] / NG1; + baseLineNG0[0] = 0; + baseLineNG1[0] = 0; +} + + + + + +template <class HandlerType> +GatedStokesSpectrometer<HandlerType>::GatedStokesSpectrometer( + const DadaBufferLayout &dadaBufferLayout, + std::size_t selectedSideChannel, std::size_t selectedBit, std::size_t fft_length, std::size_t naccumulate, + std::size_t nbits, float input_level, float output_level, + HandlerType &handler) : _dadaBufferLayout(dadaBufferLayout), + _selectedSideChannel(selectedSideChannel), _selectedBit(selectedBit), + _fft_length(fft_length), + _naccumulate(naccumulate), _nbits(nbits), _handler(handler), _fft_plan(0), + _call_count(0), _nsamps_per_heap(4096), _processing_efficiency(0.){ + + // Sanity checks + assert(((_nbits == 12) || (_nbits == 8))); + assert(_naccumulate > 0); + + // check for any device errors + CUDA_ERROR_CHECK(cudaDeviceSynchronize()); + + BOOST_LOG_TRIVIAL(info) + << "Creating new GatedStokesSpectrometer instance with parameters: \n" + << " fft_length " << _fft_length << "\n" + << " naccumulate " << _naccumulate << "\n" + << " nSideChannels " << _dadaBufferLayout.getNSideChannels() << "\n" + << " speadHeapSize " << _dadaBufferLayout.getHeapSize() << " byte\n" + << " selectedSideChannel " << _selectedSideChannel << "\n" + << " selectedBit " << _selectedBit << "\n" + << " output bit depth " << sizeof(IntegratedPowerType) * 8; + + assert((_dadaBufferLayout.getNSideChannels() == 0) || + (selectedSideChannel < _dadaBufferLayout.getNSideChannels())); // Sanity check of side channel value + assert(selectedBit < 64); // Sanity check of selected bit + + _nsamps_per_buffer = _dadaBufferLayout.sizeOfData() * 8 / nbits; + + _nsamps_per_output_spectra = fft_length * naccumulate; + int nBlocks; + if (_nsamps_per_output_spectra <= _nsamps_per_buffer) + { // one buffer block is used for one or multiple output spectra + size_t N = _nsamps_per_buffer / _nsamps_per_output_spectra; + // All data in one block has to be used + assert(N * _nsamps_per_output_spectra == _nsamps_per_buffer); + nBlocks = 1; + } + else + { // multiple blocks are integrated intoone output + size_t N = _nsamps_per_output_spectra / _nsamps_per_buffer; + // All data in multiple blocks has to be used + assert(N * _nsamps_per_buffer == _nsamps_per_output_spectra); + nBlocks = N; + } + BOOST_LOG_TRIVIAL(debug) << "Integrating " << _nsamps_per_output_spectra << " samples from " << nBlocks << " into one spectra."; + + _nchans = _fft_length / 2 + 1; + int batch = _nsamps_per_buffer / _fft_length; + float dof = 2 * _naccumulate; + float scale = + std::pow(input_level * std::sqrt(static_cast<float>(_nchans)), 2); + float offset = scale * dof; + float scaling = scale * std::sqrt(2 * dof) / output_level; + BOOST_LOG_TRIVIAL(debug) + << "Correction factors for 8-bit conversion: offset = " << offset + << ", scaling = " << scaling; + + BOOST_LOG_TRIVIAL(debug) << "Generating FFT plan"; + int n[] = {static_cast<int>(_fft_length)}; + CUFFT_ERROR_CHECK(cufftPlanMany(&_fft_plan, 1, n, NULL, 1, _fft_length, NULL, + 1, _nchans, CUFFT_R2C, batch)); + cufftSetStream(_fft_plan, _proc_stream); + + BOOST_LOG_TRIVIAL(debug) << "Allocating memory"; + polarization0._raw_voltage.resize(_dadaBufferLayout.sizeOfData() / sizeof(uint64_t)); + polarization1._raw_voltage.resize(_dadaBufferLayout.sizeOfData() / sizeof(uint64_t)); + polarization0._sideChannelData.resize(_dadaBufferLayout.getNSideChannels() * _dadaBufferLayout.getNHeaps()); + polarization1._sideChannelData.resize(_dadaBufferLayout.getNSideChannels() * _dadaBufferLayout.getNHeaps()); + BOOST_LOG_TRIVIAL(debug) << " Input voltages size (in 64-bit words): " + << polarization0._raw_voltage.size(); + _unpacked_voltage_G0.resize(_nsamps_per_buffer); + _unpacked_voltage_G1.resize(_nsamps_per_buffer); + + polarization0._baseLineG0.resize(1); + polarization0._baseLineG0_update.resize(1); + polarization0._baseLineG1.resize(1); + polarization0._baseLineG1_update.resize(1); + polarization1._baseLineG0.resize(1); + polarization1._baseLineG0_update.resize(1); + polarization1._baseLineG1.resize(1); + polarization1._baseLineG1_update.resize(1); + + BOOST_LOG_TRIVIAL(debug) << " Unpacked voltages size (in samples): " + << _unpacked_voltage_G0.size(); + polarization0._channelised_voltage_G0.resize(_nchans * batch); + polarization0._channelised_voltage_G1.resize(_nchans * batch); + polarization1._channelised_voltage_G0.resize(_nchans * batch); + polarization1._channelised_voltage_G1.resize(_nchans * batch); + BOOST_LOG_TRIVIAL(debug) << " Channelised voltages size: " + << polarization0._channelised_voltage_G0.size(); + + stokes_G0.resize(_nchans, batch / (_naccumulate / nBlocks)); + stokes_G1.resize(_nchans, batch / (_naccumulate / nBlocks)); + + // on the host full output is stored together with sci data in one buffer + _host_power_db.resize( 8 * (_nchans * sizeof(IntegratedPowerType) + sizeof(size_t)) * batch / (_naccumulate / nBlocks)); + + CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream)); + CUDA_ERROR_CHECK(cudaStreamCreate(&_proc_stream)); + CUDA_ERROR_CHECK(cudaStreamCreate(&_d2h_stream)); + CUFFT_ERROR_CHECK(cufftSetStream(_fft_plan, _proc_stream)); + + _unpacker.reset(new Unpacker(_proc_stream)); +} // constructor + + + +template <class HandlerType> +GatedStokesSpectrometer<HandlerType>::~GatedStokesSpectrometer() { + BOOST_LOG_TRIVIAL(debug) << "Destroying GatedStokesSpectrometer"; + if (!_fft_plan) + cufftDestroy(_fft_plan); + cudaStreamDestroy(_h2d_stream); + cudaStreamDestroy(_proc_stream); + cudaStreamDestroy(_d2h_stream); +} + + + +template <class HandlerType> +void GatedStokesSpectrometer<HandlerType>::init(RawBytes &block) { + BOOST_LOG_TRIVIAL(debug) << "GatedStokesSpectrometer init called"; + std::stringstream headerInfo; + headerInfo << "\n" + << "# Gated spectrometer parameters: \n" + << "fft_length " << _fft_length << "\n" + << "nchannels " << _fft_length << "\n" + << "naccumulate " << _naccumulate << "\n" + << "selected_side_channel " << _selectedSideChannel << "\n" + << "selected_bit " << _selectedBit << "\n" + << "output_bit_depth " << sizeof(IntegratedPowerType) * 8; + + size_t bEnd = std::strlen(block.ptr()); + if (bEnd + headerInfo.str().size() < block.total_bytes()) + { + std::strcpy(block.ptr() + bEnd, headerInfo.str().c_str()); + } + else + { + BOOST_LOG_TRIVIAL(warning) << "Header of size " << block.total_bytes() + << " bytes already contains " << bEnd + << "bytes. Cannot add gated spectrometer info of size " + << headerInfo.str().size() << " bytes."; + } + + _handler.init(block); +} + + + +template <class HandlerType> +void GatedStokesSpectrometer<HandlerType>::gated_fft( + PolarizationData &data, + thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G0, + thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G1 + ) +{ + BOOST_LOG_TRIVIAL(debug) << "Unpacking raw voltages"; + switch (_nbits) { + case 8: + _unpacker->unpack<8>(data._raw_voltage.b(), _unpacked_voltage_G0); + break; + case 12: + _unpacker->unpack<12>(data._raw_voltage.b(), _unpacked_voltage_G0); + break; + default: + throw std::runtime_error("Unsupported number of bits"); + } + + // Loop over outputblocks, for case of multiple output blocks per input block + int step = data._sideChannelData.b().size() / _noOfBitSetsIn_G0.size(); + + for (size_t i = 0; i < _noOfBitSetsIn_G0.size(); i++) + { // ToDo: Should be in one kernel call + gating<<<1024, 1024, 0, _proc_stream>>>( + thrust::raw_pointer_cast(_unpacked_voltage_G0.data() + i * step * _nsamps_per_heap), + thrust::raw_pointer_cast(_unpacked_voltage_G1.data() + i * step * _nsamps_per_heap), + thrust::raw_pointer_cast(data._sideChannelData.b().data() + i * step), + _unpacked_voltage_G0.size() / _noOfBitSetsIn_G0.size(), + _dadaBufferLayout.getHeapSize(), + _selectedBit, + _dadaBufferLayout.getNSideChannels(), + _selectedSideChannel, + thrust::raw_pointer_cast(data._baseLineG0.data()), + thrust::raw_pointer_cast(data._baseLineG1.data()), + thrust::raw_pointer_cast(data._baseLineG0_update.data()), + thrust::raw_pointer_cast(data._baseLineG1_update.data()), + thrust::raw_pointer_cast(_noOfBitSetsIn_G0.data() + i), + thrust::raw_pointer_cast(_noOfBitSetsIn_G1.data() + i) + ); + } + + // only few output blocks per input block thus execution on only one thread. + // Important is that the execution is async on the GPU. + update_baselines<<<1,1,0, _proc_stream>>>( + thrust::raw_pointer_cast(data._baseLineG0.data()), + thrust::raw_pointer_cast(data._baseLineG1.data()), + thrust::raw_pointer_cast(data._baseLineG0_update.data()), + thrust::raw_pointer_cast(data._baseLineG1_update.data()), + thrust::raw_pointer_cast(_noOfBitSetsIn_G0.data()), + thrust::raw_pointer_cast(_noOfBitSetsIn_G1.data()), + _noOfBitSetsIn_G0.size() + ); + + BOOST_LOG_TRIVIAL(debug) << "Performing FFT 1"; + UnpackedVoltageType *_unpacked_voltage_ptr = + thrust::raw_pointer_cast(_unpacked_voltage_G0.data()); + ChannelisedVoltageType *_channelised_voltage_ptr = + thrust::raw_pointer_cast(data._channelised_voltage_G0.data()); + CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr, + (cufftComplex *)_channelised_voltage_ptr)); + + BOOST_LOG_TRIVIAL(debug) << "Performing FFT 2"; + _unpacked_voltage_ptr = thrust::raw_pointer_cast(_unpacked_voltage_G1.data()); + _channelised_voltage_ptr = thrust::raw_pointer_cast(data._channelised_voltage_G1.data()); + CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr, + (cufftComplex *)_channelised_voltage_ptr)); + + CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream)); + BOOST_LOG_TRIVIAL(debug) << "Exit processing"; +} // process + + +template <class HandlerType> +bool GatedStokesSpectrometer<HandlerType>::operator()(RawBytes &block) { + ++_call_count; + BOOST_LOG_TRIVIAL(debug) << "GatedStokesSpectrometer operator() called (count = " + << _call_count << ")"; + if (block.used_bytes() != _dadaBufferLayout.getBufferSize()) { + // Stop on unexpected buffer size + BOOST_LOG_TRIVIAL(error) << "Unexpected Buffer Size - Got " + << block.used_bytes() << " byte, expected " + << _dadaBufferLayout.getBufferSize() << " byte)"; + CUDA_ERROR_CHECK(cudaDeviceSynchronize()); + cudaProfilerStop(); + return true; + } + + // Copy data to device + CUDA_ERROR_CHECK(cudaStreamSynchronize(_h2d_stream)); + polarization0.swap(); + polarization1.swap(); + + BOOST_LOG_TRIVIAL(debug) << " block.used_bytes() = " << + block.used_bytes() << ", dataBlockBytes = " << + _dadaBufferLayout.sizeOfData() << "\n"; + + // Copy the data with stride to the GPU: + // CPU: P1P2P1P2P1P2 ... + // GPU: P1P1P1 ... P2P2P2 ... + // If this is a bottleneck the gating kernel could sort the layout out + // during copy + int heapsize_bytes = _nsamps_per_heap * _nbits / 8; + CUDA_ERROR_CHECK(cudaMemcpy2DAsync( + static_cast<void *>(polarization0._raw_voltage.a_ptr()), + heapsize_bytes, + static_cast<void *>(block.ptr()), + 2 * heapsize_bytes, + heapsize_bytes, _dadaBufferLayout.sizeOfData() / heapsize_bytes/ 2, + cudaMemcpyHostToDevice, _h2d_stream)); + + CUDA_ERROR_CHECK(cudaMemcpy2DAsync( + static_cast<void *>(polarization1._raw_voltage.a_ptr()), + heapsize_bytes, + static_cast<void *>(block.ptr()) + heapsize_bytes, + 2 * heapsize_bytes, + heapsize_bytes, _dadaBufferLayout.sizeOfData() / heapsize_bytes/ 2, + cudaMemcpyHostToDevice, _h2d_stream)); + + CUDA_ERROR_CHECK(cudaMemcpy2DAsync( + static_cast<void *>(polarization0._sideChannelData.a_ptr()), + sizeof(uint64_t), + static_cast<void *>(block.ptr() + _dadaBufferLayout.sizeOfData() + _dadaBufferLayout.sizeOfGap()), + 2 * sizeof(uint64_t), + sizeof(uint64_t), + _dadaBufferLayout.sizeOfSideChannelData() / 2 / sizeof(uint64_t), + cudaMemcpyHostToDevice, _h2d_stream)); + + CUDA_ERROR_CHECK(cudaMemcpy2DAsync( + static_cast<void *>(polarization1._sideChannelData.a_ptr()), + sizeof(uint64_t), + static_cast<void *>(block.ptr() + _dadaBufferLayout.sizeOfData() + _dadaBufferLayout.sizeOfGap() + sizeof(uint64_t)), + 2 * sizeof(uint64_t), + sizeof(uint64_t), + _dadaBufferLayout.sizeOfSideChannelData() / 2 / sizeof(uint64_t), cudaMemcpyHostToDevice, _h2d_stream)); + + BOOST_LOG_TRIVIAL(debug) << "First side channel item: 0x" << std::setw(16) + << std::setfill('0') << std::hex << + (reinterpret_cast<uint64_t*>(block.ptr() + _dadaBufferLayout.sizeOfData() + + _dadaBufferLayout.sizeOfGap()))[0] << std::dec; + + + if (_call_count == 1) { + return false; + } + + // process data + // check if new outblock is started: _call_count -1 because this is the block number on the device + bool newBlock = (((_call_count-1) * _nsamps_per_buffer) % _nsamps_per_output_spectra == 0); + + // only if a newblock is started the output buffer is swapped. Otherwise the + // new data is added to it + if (newBlock) + { + BOOST_LOG_TRIVIAL(debug) << "Starting new output block."; + stokes_G0.swap(); + stokes_G1.swap(); + stokes_G0.reset(_proc_stream); + stokes_G1.reset(_proc_stream); + } + + mergeSideChannels<<<1024, 1024, 0, _proc_stream>>>(thrust::raw_pointer_cast(polarization0._sideChannelData.a().data()), + thrust::raw_pointer_cast(polarization1._sideChannelData.a().data()), polarization1._sideChannelData.a().size()); + + gated_fft(polarization0, stokes_G0._noOfBitSets.a(), stokes_G1._noOfBitSets.a()); + gated_fft(polarization1, stokes_G0._noOfBitSets.a(), stokes_G1._noOfBitSets.a()); + + stokes_accumulate<<<1024, 1024, 0, _proc_stream>>>( + thrust::raw_pointer_cast(polarization0._channelised_voltage_G0.data()), + thrust::raw_pointer_cast(polarization1._channelised_voltage_G0.data()), + thrust::raw_pointer_cast(stokes_G0.I.a().data()), + thrust::raw_pointer_cast(stokes_G0.Q.a().data()), + thrust::raw_pointer_cast(stokes_G0.U.a().data()), + thrust::raw_pointer_cast(stokes_G0.V.a().data()), + _nchans, _naccumulate + ); + + stokes_accumulate<<<1024, 1024, 0, _proc_stream>>>( + thrust::raw_pointer_cast(polarization0._channelised_voltage_G1.data()), + thrust::raw_pointer_cast(polarization1._channelised_voltage_G1.data()), + thrust::raw_pointer_cast(stokes_G1.I.a().data()), + thrust::raw_pointer_cast(stokes_G1.Q.a().data()), + thrust::raw_pointer_cast(stokes_G1.U.a().data()), + thrust::raw_pointer_cast(stokes_G1.V.a().data()), + _nchans, _naccumulate + ); + + + CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream)); + + if ((_call_count == 2) || (!newBlock)) { + return false; + } + + // copy data to host if block is finished + CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream)); + _host_power_db.swap(); + // OUTPUT MEMORY LAYOUT: + // I G0, IG1,Q G0, QG1, U G0,UG1,V G0,VG1, 8xSCI, ... + + for (size_t i = 0; i < stokes_G0._noOfBitSets.size(); i++) + { + size_t memslicesize = (_nchans * sizeof(IntegratedPowerType)); + size_t memOffset = 8 * i * (memslicesize + + sizeof(size_t)); + // Copy II QQ UU VV + CUDA_ERROR_CHECK( + cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset) , + static_cast<void *>(stokes_G0.I.b_ptr() + i * memslicesize), + _nchans * sizeof(IntegratedPowerType), + cudaMemcpyDeviceToHost, _d2h_stream)); + + CUDA_ERROR_CHECK( + cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset + 1 * memslicesize) , + static_cast<void *>(stokes_G1.I.b_ptr() + i * memslicesize), + _nchans * sizeof(IntegratedPowerType), + cudaMemcpyDeviceToHost, _d2h_stream)); + + CUDA_ERROR_CHECK( + cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset + 2 * memslicesize) , + static_cast<void *>(stokes_G0.Q.b_ptr() + i * memslicesize), + _nchans * sizeof(IntegratedPowerType), + cudaMemcpyDeviceToHost, _d2h_stream)); + + CUDA_ERROR_CHECK( + cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset + 3 * memslicesize) , + static_cast<void *>(stokes_G1.Q.b_ptr() + i * memslicesize), + _nchans * sizeof(IntegratedPowerType), + cudaMemcpyDeviceToHost, _d2h_stream)); + + CUDA_ERROR_CHECK( + cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset + 4 * memslicesize) , + static_cast<void *>(stokes_G0.U.b_ptr() + i * memslicesize), + _nchans * sizeof(IntegratedPowerType), + cudaMemcpyDeviceToHost, _d2h_stream)); + + CUDA_ERROR_CHECK( + cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset + 5 * memslicesize) , + static_cast<void *>(stokes_G1.U.b_ptr() + i * memslicesize), + _nchans * sizeof(IntegratedPowerType), + cudaMemcpyDeviceToHost, _d2h_stream)); + + CUDA_ERROR_CHECK( + cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset + 6 * memslicesize) , + static_cast<void *>(stokes_G0.V.b_ptr() + i * memslicesize), + _nchans * sizeof(IntegratedPowerType), + cudaMemcpyDeviceToHost, _d2h_stream)); + + CUDA_ERROR_CHECK( + cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset + 7 * memslicesize) , + static_cast<void *>(stokes_G1.V.b_ptr() + i * memslicesize), + _nchans * sizeof(IntegratedPowerType), + cudaMemcpyDeviceToHost, _d2h_stream)); + + // Copy SCI + CUDA_ERROR_CHECK( + cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize), + static_cast<void *>(stokes_G0._noOfBitSets.b_ptr() + i ), + 1 * sizeof(size_t), + cudaMemcpyDeviceToHost, _d2h_stream)); + CUDA_ERROR_CHECK( + cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize + 1 * sizeof(size_t)), + static_cast<void *>(stokes_G1._noOfBitSets.b_ptr() + i ), + 1 * sizeof(size_t), + cudaMemcpyDeviceToHost, _d2h_stream)); + CUDA_ERROR_CHECK( + cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize + 2 * sizeof(size_t)), + static_cast<void *>(stokes_G0._noOfBitSets.b_ptr() + i ), + 1 * sizeof(size_t), + cudaMemcpyDeviceToHost, _d2h_stream)); + CUDA_ERROR_CHECK( + cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize + 3 * sizeof(size_t)), + static_cast<void *>(stokes_G1._noOfBitSets.b_ptr() + i ), + 1 * sizeof(size_t), + cudaMemcpyDeviceToHost, _d2h_stream)); + CUDA_ERROR_CHECK( + cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize + 4 * sizeof(size_t)), + static_cast<void *>(stokes_G0._noOfBitSets.b_ptr() + i ), + 1 * sizeof(size_t), + cudaMemcpyDeviceToHost, _d2h_stream)); + CUDA_ERROR_CHECK( + cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize + 5 * sizeof(size_t)), + static_cast<void *>(stokes_G1._noOfBitSets.b_ptr() + i ), + 1 * sizeof(size_t), + cudaMemcpyDeviceToHost, _d2h_stream)); + CUDA_ERROR_CHECK( + cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize + 6 * sizeof(size_t)), + static_cast<void *>(stokes_G0._noOfBitSets.b_ptr() + i ), + 1 * sizeof(size_t), + cudaMemcpyDeviceToHost, _d2h_stream)); + CUDA_ERROR_CHECK( + cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize + 7 * sizeof(size_t)), + static_cast<void *>(stokes_G1._noOfBitSets.b_ptr() + i ), + 1 * sizeof(size_t), + cudaMemcpyDeviceToHost, _d2h_stream)); + + } + + BOOST_LOG_TRIVIAL(debug) << "Copy Data back to host"; + + if (_call_count == 3) { + return false; + } + + // calculate off value + //BOOST_LOG_TRIVIAL(info) << "Buffer block: " << _call_count-3 << " with " << _noOfBitSetsIn_G0.size() << "x2 output heaps:"; + //size_t total_samples_lost = 0; + //for (size_t i = 0; i < _noOfBitSetsIn_G0.size(); i++) + //{ + // size_t memOffset = 2 * i * (_nchans * sizeof(IntegratedPowerType) + sizeof(size_t)); + + // size_t* on_values = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType)); + // size_t* off_values = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType) + sizeof(size_t)); + + // size_t samples_lost = _nsamps_per_output_spectra - (*on_values) - (*off_values); + // total_samples_lost += samples_lost; + + // BOOST_LOG_TRIVIAL(info) << " Heap " << i << ":\n" + // <<" Samples with bit set : " << *on_values << std::endl + // <<" Samples without bit set: " << *off_values << std::endl + // <<" Samples lost : " << samples_lost << " out of " << _nsamps_per_output_spectra << std::endl; + //} + //double efficiency = 1. - double(total_samples_lost) / (_nsamps_per_output_spectra * _noOfBitSetsIn_G0.size()); + //double prev_average = _processing_efficiency / (_call_count- 3 - 1); + //_processing_efficiency += efficiency; + //double average = _processing_efficiency / (_call_count-3); + //BOOST_LOG_TRIVIAL(info) << "Total processing efficiency of this buffer block:" << std::setprecision(6) << efficiency << ". Run average: " << average << " (Trend: " << std::showpos << (average - prev_average) << ")"; + + // Wrap in a RawBytes object here; + RawBytes bytes(reinterpret_cast<char *>(_host_power_db.b_ptr()), + _host_power_db.size(), + _host_power_db.size()); + BOOST_LOG_TRIVIAL(debug) << "Calling handler"; + // The handler can't do anything asynchronously without a copy here + // as it would be unsafe (given that it does not own the memory it + // is being passed). + + _handler(bytes); + return false; // +} // operator () + +} // edd +} // effelsberg +} // psrdada_cpp + diff --git a/psrdada_cpp/effelsberg/edd/test/src/GatedStokesSpectrometerTest.cu b/psrdada_cpp/effelsberg/edd/test/src/GatedStokesSpectrometerTest.cu new file mode 100644 index 0000000000000000000000000000000000000000..9c88f4c4b752254cb662e1ff966d5f7c72399196 --- /dev/null +++ b/psrdada_cpp/effelsberg/edd/test/src/GatedStokesSpectrometerTest.cu @@ -0,0 +1,263 @@ +#include "psrdada_cpp/effelsberg/edd/GatedStokesSpectrometer.cuh" + +#include "psrdada_cpp/dada_null_sink.hpp" +#include "psrdada_cpp/multilog.hpp" +#include "gtest/gtest.h" + +#include "thrust/device_vector.h" +#include "thrust/extrema.h" + + +TEST(GatedStokesSpectrometer, BitManipulationMacros) { + for (int i = 0; i < 64; i++) { + uint64_t v = 0; + SET_BIT(v, i); + + for (int j = 0; j < 64; j++) { + if (j == i) + EXPECT_EQ(TEST_BIT(v, j), 1); + else + EXPECT_EQ(TEST_BIT(v, j), 0); + } + } +} + + +TEST(GatedStokesSpectrometer, stokes_IQUV) +{ + float I,Q,U,V; + // No field + psrdada_cpp::effelsberg::edd::stokes_IQUV((float2){0.0f,0.0f}, (float2){0.0f,0.0f}, I, Q, U, V); + EXPECT_FLOAT_EQ(I, 0); + EXPECT_FLOAT_EQ(Q, 0); + EXPECT_FLOAT_EQ(U, 0); + EXPECT_FLOAT_EQ(V, 0); + + // For p1 = Ex, p2 = Ey + // horizontal polarized + psrdada_cpp::effelsberg::edd::stokes_IQUV((float2){1.0f,0.0f}, (float2){0.0f,0.0f}, I, Q, U, V); + EXPECT_FLOAT_EQ(I, 1); + EXPECT_FLOAT_EQ(Q, 1); + EXPECT_FLOAT_EQ(U, 0); + EXPECT_FLOAT_EQ(V, 0); + + // vertical polarized + psrdada_cpp::effelsberg::edd::stokes_IQUV((float2){0.0f,0.0f}, (float2){1.0f,0.0f}, I, Q, U, V); + EXPECT_FLOAT_EQ(I, 1); + EXPECT_FLOAT_EQ(Q, -1); + EXPECT_FLOAT_EQ(U, 0); + EXPECT_FLOAT_EQ(V, 0); + + //linear +45 deg. + psrdada_cpp::effelsberg::edd::stokes_IQUV((float2){1.0f/std::sqrt(2),0.0f}, (float2){1.0f/std::sqrt(2),0.0f}, I, Q, U, V); + EXPECT_FLOAT_EQ(I, 1); + EXPECT_FLOAT_EQ(Q, 0); + EXPECT_FLOAT_EQ(U, 1); + EXPECT_FLOAT_EQ(V, 0); + + //linear -45 deg. + psrdada_cpp::effelsberg::edd::stokes_IQUV((float2){-1.0f/std::sqrt(2),0.0f}, (float2){1.0f/std::sqrt(2),0.0f}, I, Q, U, V); + EXPECT_FLOAT_EQ(I, 1); + EXPECT_FLOAT_EQ(Q, 0); + EXPECT_FLOAT_EQ(U, -1); + EXPECT_FLOAT_EQ(V, 0); + + //left circular + psrdada_cpp::effelsberg::edd::stokes_IQUV((float2){.0f,1.0f/std::sqrt(2)}, (float2){1.0f/std::sqrt(2),.0f}, I, Q, U, V); + EXPECT_FLOAT_EQ(I, 1); + EXPECT_FLOAT_EQ(Q, 0); + EXPECT_FLOAT_EQ(U, 0); + EXPECT_FLOAT_EQ(V, -1); + + // right circular + psrdada_cpp::effelsberg::edd::stokes_IQUV((float2){.0f,-1.0f/std::sqrt(2)}, (float2){1.0f/std::sqrt(2),.0f}, I, Q, U, V); + EXPECT_FLOAT_EQ(I, 1); + EXPECT_FLOAT_EQ(Q, 0); + EXPECT_FLOAT_EQ(U, 0); + EXPECT_FLOAT_EQ(V, 1); +} + + +TEST(GatedStokesSpectrometer, stokes_accumulate) +{ + CUDA_ERROR_CHECK(cudaDeviceSynchronize()); + size_t nchans = 8 * 1024 * 1024 + 1; + size_t naccumulate = 5; + + thrust::device_vector<float2> P0(nchans * naccumulate); + thrust::device_vector<float2> P1(nchans * naccumulate); + thrust::fill(P0.begin(), P0.end(), (float2){0, 0}); + thrust::fill(P1.begin(), P1.end(), (float2){0, 0}); + thrust::device_vector<float> I(nchans); + thrust::device_vector<float> Q(nchans); + thrust::device_vector<float> U(nchans); + thrust::device_vector<float> V(nchans); + thrust::fill(I.begin(), I.end(), 0); + thrust::fill(Q.begin(), Q.end(), 0); + thrust::fill(U.begin(), U.end(), 0); + thrust::fill(V.begin(), V.end(), 0); + + // This channel should be left circular polarized + size_t idx0 = 23; + for (int k = 0; k< naccumulate; k++) + { + size_t idx = idx0 + k * nchans; + P0[idx] = (float2){0.0f, 1.0f/std::sqrt(2)}; + P1[idx] = (float2){1.0f/std::sqrt(2),0.0f}; + } + + psrdada_cpp::effelsberg::edd::stokes_accumulate<<<1024, 1024>>>( + thrust::raw_pointer_cast(P0.data()), + thrust::raw_pointer_cast(P1.data()), + thrust::raw_pointer_cast(I.data()), + thrust::raw_pointer_cast(Q.data()), + thrust::raw_pointer_cast(U.data()), + thrust::raw_pointer_cast(V.data()), + nchans, + naccumulate + ); + + CUDA_ERROR_CHECK(cudaDeviceSynchronize()); + thrust::pair<thrust::device_vector<float>::iterator, thrust::device_vector<float>::iterator> minmax; + + minmax = thrust::minmax_element(I.begin(), I.end()); + EXPECT_FLOAT_EQ(*minmax.first, 0); + EXPECT_FLOAT_EQ(*minmax.second, naccumulate); + + minmax = thrust::minmax_element(Q.begin(), Q.end()); + EXPECT_FLOAT_EQ(*minmax.first, 0); + EXPECT_FLOAT_EQ(*minmax.second, 0); + + minmax = thrust::minmax_element(U.begin(), U.end()); + EXPECT_FLOAT_EQ(*minmax.first, 0); + EXPECT_FLOAT_EQ(*minmax.second, 0); + + minmax = thrust::minmax_element(V.begin(), V.end()); + EXPECT_FLOAT_EQ(*minmax.first, -1. * naccumulate); + EXPECT_FLOAT_EQ(*minmax.second, 0); +} + + + +TEST(GatedStokesSpectrometer, GatingKernel) +{ + const size_t blockSize = 1024; + const size_t nBlocks = 16 * 1024; + + thrust::device_vector<float> G0(blockSize * nBlocks); + thrust::device_vector<float> G1(blockSize * nBlocks); + thrust::device_vector<uint64_t> _sideChannelData(nBlocks); + thrust::device_vector<psrdada_cpp::effelsberg::edd::uint64_cu> _nG0(nBlocks); + thrust::device_vector<psrdada_cpp::effelsberg::edd::uint64_cu> _nG1(nBlocks); + thrust::device_vector<float> baseLineG0(1); + thrust::device_vector<float> baseLineG1(1); + + thrust::device_vector<float> baseLineG0_update(1); + thrust::device_vector<float> baseLineG1_update(1); + thrust::fill(G0.begin(), G0.end(), 42); + thrust::fill(G1.begin(), G1.end(), 23); + thrust::fill(_sideChannelData.begin(), _sideChannelData.end(), 0); + + // everything to G0 + { + thrust::fill(_nG0.begin(), _nG0.end(), 0); + thrust::fill(_nG1.begin(), _nG1.end(), 0); + baseLineG0[0] = -3; + baseLineG1[0] = -4; + baseLineG0_update[0] = 0; + baseLineG1_update[0] = 0; + + const uint64_t *sideCD = + (uint64_t *)(thrust::raw_pointer_cast(_sideChannelData.data())); + psrdada_cpp::effelsberg::edd::gating<<<1024 , 1024>>>( + thrust::raw_pointer_cast(G0.data()), + thrust::raw_pointer_cast(G1.data()), sideCD, + G0.size(), blockSize, 0, 1, + 0, + thrust::raw_pointer_cast(baseLineG0.data()), + thrust::raw_pointer_cast(baseLineG1.data()), + thrust::raw_pointer_cast(baseLineG0_update.data()), + thrust::raw_pointer_cast(baseLineG1_update.data()), + thrust::raw_pointer_cast(_nG0.data()), + thrust::raw_pointer_cast(_nG1.data()) + ); + + thrust::pair<thrust::device_vector<float>::iterator, thrust::device_vector<float>::iterator> minmax; + minmax = thrust::minmax_element(G0.begin(), G0.end()); + EXPECT_EQ(*minmax.first, 42); + EXPECT_EQ(*minmax.second, 42); + + minmax = thrust::minmax_element(G1.begin(), G1.end()); + EXPECT_EQ(*minmax.first, -4.); + EXPECT_EQ(*minmax.second, -4.); + + EXPECT_EQ(_nG0[0], G0.size()); + EXPECT_EQ(_nG1[0], 0u); + + EXPECT_FLOAT_EQ(42.f, baseLineG0_update[0] / (_nG0[0] + 1E-121)); + EXPECT_FLOAT_EQ(0.f, baseLineG1_update[0] / (_nG1[0] + 1E-121)); + } + + // everything to G1 // with baseline -5 + { + thrust::fill(_nG0.begin(), _nG0.end(), 0); + thrust::fill(_nG1.begin(), _nG1.end(), 0); + baseLineG0[0] = 5.; + baseLineG1[0] = -2; + baseLineG0_update[0] = 0; + baseLineG1_update[0] = 0; + + thrust::fill(_sideChannelData.begin(), _sideChannelData.end(), 1L); + const uint64_t *sideCD = + (uint64_t *)(thrust::raw_pointer_cast(_sideChannelData.data())); + psrdada_cpp::effelsberg::edd::gating<<<1024, 1024>>>( + thrust::raw_pointer_cast(G0.data()), + thrust::raw_pointer_cast(G1.data()), sideCD, + G0.size(), blockSize, 0, 1, + 0, + thrust::raw_pointer_cast(baseLineG0.data()), + thrust::raw_pointer_cast(baseLineG1.data()), + thrust::raw_pointer_cast(baseLineG0_update.data()), + thrust::raw_pointer_cast(baseLineG1_update.data()), + thrust::raw_pointer_cast(_nG0.data()), + thrust::raw_pointer_cast(_nG1.data()) + ); + thrust::pair<thrust::device_vector<float>::iterator, thrust::device_vector<float>::iterator> minmax; + minmax = thrust::minmax_element(G0.begin(), G0.end()); + EXPECT_EQ(*minmax.first, 5.); + EXPECT_EQ(*minmax.second, 5.); + + minmax = thrust::minmax_element(G1.begin(), G1.end()); + EXPECT_EQ(*minmax.first, 42); + EXPECT_EQ(*minmax.second, 42); + + EXPECT_EQ(_nG0[0], 0u); + EXPECT_EQ(_nG1[0], G1.size()); + + EXPECT_FLOAT_EQ(0.f, baseLineG0_update[0] / (_nG0[0] + 1E-121)); + EXPECT_FLOAT_EQ(42.f, baseLineG1_update[0] / (_nG1[0] + 1E-121)); + } +} + +TEST(GatedStokesSpectrometer, array_sum) { + + const size_t NBLOCKS = 16 * 32; + const size_t NTHREADS = 1024; + + size_t inputLength = 1 << 22 + 1 ; + size_t dataLength = inputLength; + ////zero pad input array + //if (inputLength % (2 * NTHREADS) != 0) + // dataLength = (inputLength / (2 * NTHREADS) + 1) * 2 * NTHREADS; + thrust::device_vector<float> data(dataLength); + thrust::fill(data.begin(), data.begin() + inputLength, 1); + //thrust::fill(data.begin() + inputLength, data.end(), 0); + thrust::device_vector<float> blr(NTHREADS * 2); + + thrust::fill(blr.begin(), blr.end(), 0); + + psrdada_cpp::effelsberg::edd::array_sum<<<NBLOCKS, NTHREADS, NTHREADS* sizeof(float)>>>(thrust::raw_pointer_cast(data.data()), data.size(), thrust::raw_pointer_cast(blr.data())); + psrdada_cpp::effelsberg::edd::array_sum<<<1, NTHREADS, NTHREADS* sizeof(float)>>>(thrust::raw_pointer_cast(blr.data()), blr.size(), thrust::raw_pointer_cast(blr.data())); + + EXPECT_EQ(size_t(blr[0]), inputLength); +}