diff --git a/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh b/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh index d716594d4f9b4ababaa23fa827ad6b038fc92663..03cd6d45cdeffabc29a3386a7143f8a7415b7cec 100644 --- a/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh +++ b/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh @@ -14,6 +14,13 @@ #include "cublas_v2.h" + +#include <iostream> +#include <iomanip> +#include <cstring> +#include <sstream> + + namespace psrdada_cpp { namespace effelsberg { namespace edd { @@ -26,21 +33,309 @@ namespace edd { typedef unsigned long long int uint64_cu; static_assert(sizeof(uint64_cu) == sizeof(uint64_t), "Long long int not of 64 bit! This is problematic for CUDA!"); + +typedef uint64_t RawVoltageType; +typedef float UnpackedVoltageType; +typedef float2 ChannelisedVoltageType; +typedef float IntegratedPowerType; + + + +struct InputDataStream +{ + virtual void swap() = 0; + virtual void resize(size_t _nchans, size_t _batch, const DadaBufferLayout &dadaBufferLayout) = 0; + virtual void getFromBlock(RawBytes &block, DadaBufferLayout &dadaBufferLayout, cudaStream_t &_h2d_stream) = 0; +}; + + +/// Input data and intermediate processing data for one polarization +struct PolarizationData : public InputDataStream +{ + /// Raw ADC Voltage + DoubleDeviceBuffer<RawVoltageType> _raw_voltage; + /// Side channel data + DoubleDeviceBuffer<uint64_t> _sideChannelData; + + /// Baseline in gate 0 state + thrust::device_vector<UnpackedVoltageType> _baseLineG0; + /// Baseline in gate 1 state + thrust::device_vector<UnpackedVoltageType> _baseLineG1; + + /// Baseline in gate 0 state after update + thrust::device_vector<UnpackedVoltageType> _baseLineG0_update; + /// Baseline in gate 1 state after update + thrust::device_vector<UnpackedVoltageType> _baseLineG1_update; + + /// Channelized voltage in gate 0 state + thrust::device_vector<ChannelisedVoltageType> _channelised_voltage_G0; + /// Channelized voltage in gate 1 state + thrust::device_vector<ChannelisedVoltageType> _channelised_voltage_G1; + + /// Swaps input buffers + void swap() + { + _raw_voltage.swap(); + _sideChannelData.swap(); + } + + void resize(size_t _nchans, size_t _batch, const DadaBufferLayout &dadaBufferLayout) + { + _raw_voltage.resize(dadaBufferLayout.sizeOfData() / sizeof(uint64_t)); + BOOST_LOG_TRIVIAL(debug) << " Input voltages size (in 64-bit words): " << _raw_voltage.size(); + + _baseLineG0.resize(1); + _baseLineG0_update.resize(1); + _baseLineG1.resize(1); + _baseLineG1_update.resize(1); + _channelised_voltage_G0.resize(_nchans * _batch); + _channelised_voltage_G1.resize(_nchans * _batch); + BOOST_LOG_TRIVIAL(debug) << " Channelised voltages size: " << _channelised_voltage_G0.size(); + } + + void getFromBlock(RawBytes &block, DadaBufferLayout &dadaBufferLayout, cudaStream_t &_h2d_stream) + { + BOOST_LOG_TRIVIAL(debug) << " block.used_bytes() = " << block.used_bytes() + << ", dataBlockBytes = " << dadaBufferLayout.sizeOfData() << "\n"; + + CUDA_ERROR_CHECK(cudaMemcpyAsync(static_cast<void *>(_raw_voltage.a_ptr()), + static_cast<void *>(block.ptr()), + dadaBufferLayout.sizeOfData() , cudaMemcpyHostToDevice, + _h2d_stream)); + CUDA_ERROR_CHECK(cudaMemcpyAsync( + static_cast<void *>(_sideChannelData.a_ptr()), + static_cast<void *>(block.ptr() + dadaBufferLayout.sizeOfData() + dadaBufferLayout.sizeOfGap()), + dadaBufferLayout.sizeOfSideChannelData(), cudaMemcpyHostToDevice, _h2d_stream)); + BOOST_LOG_TRIVIAL(debug) << "First side channel item: 0x" << std::setw(16) + << std::setfill('0') << std::hex << + (reinterpret_cast<uint64_t*>(block.ptr() + dadaBufferLayout.sizeOfData() + + dadaBufferLayout.sizeOfGap()))[0] << + std::dec; + } + + +}; + + +//struct DualPolarizationData : public InputDataStream +//{ +// PolarizationData pol0, pol1; +// void swap() +// { +// pol0.swap(); pol1.swap(); +// } +// +// void resize(size_t _nchans, size_t _batch, const DadaBufferLayout &dadaBufferLayout) +// { +// BOOST_LOG_TRIVIAL(debug) << " Pol0"; +// pol0.resize(_nchans, _batch, dadaBufferLayout); +// BOOST_LOG_TRIVIAL(debug) << " Pol1"; +// pol1.resize(_nchans, _batch, dadaBufferLayout); +// } +//}; + + + + + +// Output data for one gate N = 1 for one pol, or 4 for full stokes +struct OutputDataStream +{ + /// Reset outptu for new integration + virtual void reset(cudaStream_t &_proc_stream) = 0; + virtual void swap() = 0; + virtual void resize(size_t size, size_t blocks) = 0; + + DoublePinnedHostBuffer<char> _host_power; +}; + + +// Output data for one gate, single power spectrum +struct PowerSpectrumOutput +{ + /// spectrum data + DoubleDeviceBuffer<IntegratedPowerType> data; + + /// Number of samples integrated in this output block + DoubleDeviceBuffer<uint64_cu> _noOfBitSets; + + /// Reset outptu for new integration + void reset(cudaStream_t &_proc_stream) + { + thrust::fill(thrust::cuda::par.on(_proc_stream), data.a().begin(),data.a().end(), 0.); + thrust::fill(thrust::cuda::par.on(_proc_stream), _noOfBitSets.a().begin(), _noOfBitSets.a().end(), 0L); + } + + /// Swap output buffers + void swap() + { + data.swap(); + _noOfBitSets.swap(); + } + + /// Resize all buffers + void resize(size_t size, size_t blocks) + { + data.resize(size * blocks); + _noOfBitSets.resize(blocks); + } +}; + + +struct GatedPowerSpectrumOutput : public OutputDataStream +{ + PowerSpectrumOutput G0, G1; + size_t _nchans; + + void reset(cudaStream_t &_proc_stream) + { + G0.reset(_proc_stream); + G1.reset(_proc_stream); + } + + /// Swap output buffers + void swap() + { + G0.swap(); + G1.swap(); + _host_power.swap(); + } + + /// Resize all buffers + void resize(size_t size, size_t blocks) + { + // ToDo: size passed in constructor, also number of blocks. + G0.resize(size, blocks); + G1.resize(size, blocks); + _nchans = size; + + // on the host both power are stored in the same data buffer together with + // the number of bit sets + _host_power.resize( 2 * ( size * sizeof(IntegratedPowerType) + sizeof(size_t) ) * G0._noOfBitSets.size()); + } + + void data2Host(cudaStream_t &_d2h_stream) + { + // copy data to host if block is finished + CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream)); + + for (size_t i = 0; i < G0._noOfBitSets.size(); i++) + { + // size of individual spectrum + meta + size_t memslicesize = (_nchans * sizeof(IntegratedPowerType)); + // number of spectra per output + size_t memOffset = 2 * i * (memslicesize + + sizeof(size_t)); + + // copy 2x channel data + CUDA_ERROR_CHECK( + cudaMemcpyAsync(static_cast<void *>(_host_power.a_ptr() + memOffset) , + static_cast<void *>(G0.data.b_ptr() + i * memslicesize), + _nchans * sizeof(IntegratedPowerType), + cudaMemcpyDeviceToHost, _d2h_stream)); + + CUDA_ERROR_CHECK( + cudaMemcpyAsync(static_cast<void *>(_host_power.a_ptr() + memOffset + 1 * memslicesize) , + static_cast<void *>(G1.data.b_ptr() + i * memslicesize), + _nchans * sizeof(IntegratedPowerType), + cudaMemcpyDeviceToHost, _d2h_stream)); + + // copy noOf bit set data + CUDA_ERROR_CHECK( + cudaMemcpyAsync( static_cast<void *>(_host_power.a_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType)), + static_cast<void *>(G0._noOfBitSets.b_ptr() + i ), + 1 * sizeof(size_t), + cudaMemcpyDeviceToHost, _d2h_stream)); + + CUDA_ERROR_CHECK( + cudaMemcpyAsync( static_cast<void *>(_host_power.a_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType) + sizeof(size_t)), + static_cast<void *>(G1._noOfBitSets.b_ptr() + i ), + 1 * sizeof(size_t), + cudaMemcpyDeviceToHost, _d2h_stream)); + } + } + + +}; + + +// Output data for one gate full stokes +struct FullStokesOutput : public OutputDataStream +{ + /// Stokes parameters + DoubleDeviceBuffer<IntegratedPowerType> I; + DoubleDeviceBuffer<IntegratedPowerType> Q; + DoubleDeviceBuffer<IntegratedPowerType> U; + DoubleDeviceBuffer<IntegratedPowerType> V; + + /// Number of samples integrated in this output block + DoubleDeviceBuffer<uint64_cu> _noOfBitSets; + + /// Reset outptu for new integration + void reset(cudaStream_t &_proc_stream) + { + thrust::fill(thrust::cuda::par.on(_proc_stream), I.a().begin(), I.a().end(), 0.); + thrust::fill(thrust::cuda::par.on(_proc_stream), Q.a().begin(), Q.a().end(), 0.); + thrust::fill(thrust::cuda::par.on(_proc_stream), U.a().begin(), U.a().end(), 0.); + thrust::fill(thrust::cuda::par.on(_proc_stream), V.a().begin(), V.a().end(), 0.); + thrust::fill(thrust::cuda::par.on(_proc_stream), _noOfBitSets.a().begin(), _noOfBitSets.a().end(), 0L); + } + + /// Swap output buffers + void swap() + { + I.swap(); + Q.swap(); + U.swap(); + V.swap(); + _noOfBitSets.swap(); + } + + /// Resize all buffers + void resize(size_t size, size_t blocks) + { + I.resize(size * blocks); + Q.resize(size * blocks); + U.resize(size * blocks); + V.resize(size * blocks); + _noOfBitSets.resize(blocks); + } +}; + + +struct GatedFullStokesOutput: public OutputDataStream +{ + FullStokesOutput G0, G1; + void reset(cudaStream_t &_proc_stream) + { + G0.reset(_proc_stream); + G1.reset(_proc_stream); + } + + /// Swap output buffers + void swap() + { + G0.swap(); + G1.swap(); + } + + /// Resize all buffers + void resize(size_t size, size_t blocks) + { + G0.resize(size, blocks); + G1.resize(size, blocks); + } +}; + + + + /** @class GatedSpectrometer @brief Split data into two streams and create integrated spectra depending on bit set in side channel data. */ -template <class HandlerType, typename IntegratedPowerType> class GatedSpectrometer { -public: - typedef uint64_t RawVoltageType; - typedef float UnpackedVoltageType; - typedef float2 ChannelisedVoltageType; - -// typedef float IntegratedPowerType; - //typedef int8_t IntegratedPowerType; - +template <class HandlerType > class GatedSpectrometer { public: /** * @brief Constructor @@ -90,11 +385,11 @@ public: bool operator()(RawBytes &block); private: - void process(thrust::device_vector<RawVoltageType> const &digitiser_raw, - thrust::device_vector<uint64_t> const &sideChannelData, - thrust::device_vector<IntegratedPowerType> &detected, - thrust::device_vector<uint64_cu> &noOfBitSetsIn_G0, - thrust::device_vector<uint64_cu> &noOfBitSetsIn_G1); + // gate the data from the input stream and fft data per gate. Number of bit + // sets in every gate is stored in the output datasets + void gated_fft(PolarizationData &data, + thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G0, + thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G1); private: DadaBufferLayout _dadaBufferLayout; @@ -111,30 +406,23 @@ private: HandlerType &_handler; cufftHandle _fft_plan; uint64_t _nchans; + uint64_t _nBlocks; uint64_t _call_count; double _processing_efficiency; std::unique_ptr<Unpacker> _unpacker; - std::unique_ptr<DetectorAccumulator<IntegratedPowerType> > _detector; - - // Input data - DoubleDeviceBuffer<RawVoltageType> _raw_voltage_db; - DoubleDeviceBuffer<uint64_t> _sideChannelData_db; - // Output data - DoubleDeviceBuffer<IntegratedPowerType> _power_db; + OutputDataStream* outputDataStream; + InputDataStream* inputDataStream; + std::unique_ptr<DetectorAccumulator<IntegratedPowerType> > _detector; - DoubleDeviceBuffer<uint64_cu> _noOfBitSetsIn_G0; - DoubleDeviceBuffer<uint64_cu> _noOfBitSetsIn_G1; - DoublePinnedHostBuffer<char> _host_power_db; - - // Intermediate process steps + // Temporary processing block + // ToDo: Use inplace FFT to avoid temporary coltage array thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G0; thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G1; - thrust::device_vector<ChannelisedVoltageType> _channelised_voltage; - thrust::device_vector<UnpackedVoltageType> _baseLineNG0; - thrust::device_vector<UnpackedVoltageType> _baseLineNG1; + + cudaStream_t _h2d_stream; cudaStream_t _proc_stream; @@ -170,8 +458,8 @@ private: __global__ void gating(float *G0, float *G1, const int64_t *sideChannelData, size_t N, size_t heapSize, size_t bitpos, size_t noOfSideChannels, size_t selectedSideChannel, - const float baseLineG0, - const float baseLineG1, + const float* __restrict__ _baseLineG0, + const float* __restrict__ _baseLineG1, float* __restrict__ baseLineNG0, float* __restrict__ baseLineNG1, uint64_cu* stats_G0, diff --git a/psrdada_cpp/effelsberg/edd/GatedStokesSpectrometer.cuh b/psrdada_cpp/effelsberg/edd/GatedStokesSpectrometer.cuh index a3b160c5f8a5629acb21a866086e2fc0a59d5a7c..a55790cd8b43beeb557ec7867aad143e895b8d87 100644 --- a/psrdada_cpp/effelsberg/edd/GatedStokesSpectrometer.cuh +++ b/psrdada_cpp/effelsberg/edd/GatedStokesSpectrometer.cuh @@ -27,87 +27,9 @@ namespace edd { typedef unsigned long long int uint64_cu; static_assert(sizeof(uint64_cu) == sizeof(uint64_t), "Long long int not of 64 bit! This is problematic for CUDA!"); -typedef uint64_t RawVoltageType; -typedef float UnpackedVoltageType; -typedef float2 ChannelisedVoltageType; -typedef float IntegratedPowerType; -//typedef int8_t IntegratedPowerType; - -/// Input data and intermediate processing data for one polarization -struct PolarizationData -{ - /// Raw ADC Voltage - DoubleDeviceBuffer<RawVoltageType> _raw_voltage; - /// Side channel data - DoubleDeviceBuffer<uint64_t> _sideChannelData; - - /// Baseline in gate 0 state - thrust::device_vector<UnpackedVoltageType> _baseLineG0; - /// Baseline in gate 1 state - thrust::device_vector<UnpackedVoltageType> _baseLineG1; - - /// Baseline in gate 0 state after update - thrust::device_vector<UnpackedVoltageType> _baseLineG0_update; - /// Baseline in gate 1 state after update - thrust::device_vector<UnpackedVoltageType> _baseLineG1_update; - - /// Channelized voltage in gate 0 state - thrust::device_vector<ChannelisedVoltageType> _channelised_voltage_G0; - /// Channelized voltage in gate 1 state - thrust::device_vector<ChannelisedVoltageType> _channelised_voltage_G1; - - /// Swaps input buffers - void swap() - { - _raw_voltage.swap(); - _sideChannelData.swap(); - } -}; -// Output data for one gate -struct StokesOutput -{ - /// Stokes parameters - DoubleDeviceBuffer<IntegratedPowerType> I; - DoubleDeviceBuffer<IntegratedPowerType> Q; - DoubleDeviceBuffer<IntegratedPowerType> U; - DoubleDeviceBuffer<IntegratedPowerType> V; - - /// Number of samples integrated in this output block - DoubleDeviceBuffer<uint64_cu> _noOfBitSets; - - /// Reset outptu for new integration - void reset(cudaStream_t &_proc_stream) - { - thrust::fill(thrust::cuda::par.on(_proc_stream),I.a().begin(), I.a().end(), 0.); - thrust::fill(thrust::cuda::par.on(_proc_stream),Q.a().begin(), Q.a().end(), 0.); - thrust::fill(thrust::cuda::par.on(_proc_stream),U.a().begin(), U.a().end(), 0.); - thrust::fill(thrust::cuda::par.on(_proc_stream),V.a().begin(), V.a().end(), 0.); - thrust::fill(thrust::cuda::par.on(_proc_stream), _noOfBitSets.a().begin(), _noOfBitSets.a().end(), 0L); - } - - /// Swap output buffers - void swap() - { - I.swap(); - Q.swap(); - U.swap(); - V.swap(); - _noOfBitSets.swap(); - } - - /// Resize all buffers - void resize(size_t size, size_t blocks) - { - I.resize(size * blocks); - Q.resize(size * blocks); - U.resize(size * blocks); - V.resize(size * blocks); - _noOfBitSets.resize(blocks); - } -}; @@ -177,8 +99,8 @@ public: private: // gate the data and fft data per gate void gated_fft(PolarizationData &data, - thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G0, - thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G1); + thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G0, + thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G1); private: DadaBufferLayout _dadaBufferLayout; diff --git a/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu b/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu index bd33eaecc2b0be867a8fe7d6768abf1c329ffcc1..c9e8c9a762a02708ad5b99eabe68a781bd4e5240 100644 --- a/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu +++ b/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu @@ -17,8 +17,9 @@ namespace psrdada_cpp { namespace effelsberg { namespace edd { +// Reduce thread local vatiable v in shared array x, so that x[0] template<typename T> -__device__ void reduce(T *x, const T &v) +__device__ void sum_reduce(T *x, const T &v) { x[threadIdx.x] = v; __syncthreads(); @@ -28,26 +29,40 @@ __device__ void reduce(T *x, const T &v) x[threadIdx.x] += x[threadIdx.x + s]; __syncthreads(); } +} +// If one of the side channel items is lsot, then both are considered as lost +// here +__global__ void mergeSideChannels(uint64_t* __restrict__ A, uint64_t* __restrict__ B, size_t N) +{ + for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N); + i += blockDim.x * gridDim.x) + { + uint64_t v = A[i] || B[i]; + A[i] = v; + B[i] = v; + } } -__global__ void gating(float* __restrict__ G0, float* __restrict__ G1, const uint64_t* __restrict__ sideChannelData, - size_t N, size_t heapSize, size_t bitpos, - size_t noOfSideChannels, size_t selectedSideChannel, - const float baseLineG0, - const float baseLineG1, - float* __restrict__ baseLineNG0, - float* __restrict__ baseLineNG1, - uint64_cu* stats_G0, uint64_cu* stats_G1) { -// float baseLineG0 = (*_baseLineNG0) / N; - // float baseLineG1 = (*_baseLineNG1) / N; - +__global__ void gating(float* __restrict__ G0, + float* __restrict__ G1, + const uint64_t* __restrict__ sideChannelData, + size_t N, size_t heapSize, size_t bitpos, + size_t noOfSideChannels, size_t selectedSideChannel, + const float* __restrict__ _baseLineG0, + const float* __restrict__ _baseLineG1, + float* __restrict__ baseLineNG0, + float* __restrict__ baseLineNG1, + uint64_cu* stats_G0, uint64_cu* stats_G1) { // statistics values for samopels to G0, G1 uint32_t _G0stats = 0; uint32_t _G1stats = 0; + const float baseLineG0 = _baseLineG0[0]; + const float baseLineG1 = _baseLineG1[0]; + float baselineUpdateG0 = 0; float baselineUpdateG1 = 0; @@ -55,12 +70,8 @@ __global__ void gating(float* __restrict__ G0, float* __restrict__ G1, const uin i += blockDim.x * gridDim.x) { const float v = G0[i]; - const uint64_t sideChannelItem = - sideChannelData[((i / heapSize) * (noOfSideChannels)) + - selectedSideChannel]; // Probably not optimal access as - // same data is copied for several - // threads, but maybe efficiently - // handled by cache? + const uint64_t sideChannelItem = sideChannelData[((i / heapSize) * (noOfSideChannels)) + + selectedSideChannel]; const unsigned int bit_set = TEST_BIT(sideChannelItem, bitpos); const unsigned int heap_lost = TEST_BIT(sideChannelItem, 63); @@ -73,36 +84,75 @@ __global__ void gating(float* __restrict__ G0, float* __restrict__ G1, const uin baselineUpdateG1 += v * bit_set * (!heap_lost); baselineUpdateG0 += v * (!bit_set) *(!heap_lost); } + __shared__ uint32_t x[1024]; // Reduce G0, G1 - reduce<uint32_t>(x, _G0stats); - if(threadIdx.x == 0) + sum_reduce<uint32_t>(x, _G0stats); + if(threadIdx.x == 0) { atomicAdd(stats_G0, (uint64_cu) x[threadIdx.x]); - + } __syncthreads(); - reduce<uint32_t>(x, _G1stats); - if(threadIdx.x == 0) + + sum_reduce<uint32_t>(x, _G1stats); + if(threadIdx.x == 0) { atomicAdd(stats_G1, (uint64_cu) x[threadIdx.x]); + } + __syncthreads(); //reuse shared array float *y = (float*) x; - //update the baseline array - reduce<float>(y, baselineUpdateG0); - if(threadIdx.x == 0) + sum_reduce<float>(y, baselineUpdateG0); + if(threadIdx.x == 0) { atomicAdd(baseLineNG0, y[threadIdx.x]); - + } __syncthreads(); - reduce<float>(y, baselineUpdateG1); - if(threadIdx.x == 0) + + sum_reduce<float>(y, baselineUpdateG1); + if(threadIdx.x == 0) { atomicAdd(baseLineNG1, y[threadIdx.x]); + } + __syncthreads(); +} + + + +// Updates the baselines of the gates for the polarization set for the next +// block +// only few output blocks per input block thus execution on only one thread. +// Important is that the execution is async on the GPU. +__global__ void update_baselines(float* __restrict__ baseLineG0, + float* __restrict__ baseLineG1, + float* __restrict__ baseLineNG0, + float* __restrict__ baseLineNG1, + uint64_cu* stats_G0, uint64_cu* stats_G1, + size_t N) +{ + size_t NG0 = 0; + size_t NG1 = 0; + + for (size_t i =0; i < N; i++) + { + NG0 += stats_G0[i]; + NG1 += stats_G1[i]; + } + + baseLineG0[0] = baseLineNG0[0] / NG0; + baseLineG1[0] = baseLineNG1[0] / NG1; + baseLineNG0[0] = 0; + baseLineNG1[0] = 0; } -template <class HandlerType, typename IntegratedPowerType> -GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer( + + + + + +template <class HandlerType> +GatedSpectrometer<HandlerType>::GatedSpectrometer( const DadaBufferLayout &dadaBufferLayout, std::size_t selectedSideChannel, std::size_t selectedBit, std::size_t fft_length, std::size_t naccumulate, std::size_t nbits, float input_level, float output_level, @@ -136,22 +186,21 @@ GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer( _nsamps_per_buffer = _dadaBufferLayout.sizeOfData() * 8 / nbits; _nsamps_per_output_spectra = fft_length * naccumulate; - int nBlocks; if (_nsamps_per_output_spectra <= _nsamps_per_buffer) { // one buffer block is used for one or multiple output spectra size_t N = _nsamps_per_buffer / _nsamps_per_output_spectra; // All data in one block has to be used assert(N * _nsamps_per_output_spectra == _nsamps_per_buffer); - nBlocks = 1; + _nBlocks = 1; } else { // multiple blocks are integrated intoone output size_t N = _nsamps_per_output_spectra / _nsamps_per_buffer; // All data in multiple blocks has to be used assert(N * _nsamps_per_buffer == _nsamps_per_output_spectra); - nBlocks = N; + _nBlocks = N; } - BOOST_LOG_TRIVIAL(debug) << "Integrating " << _nsamps_per_output_spectra << " samples from " << nBlocks << " into one spectra."; + BOOST_LOG_TRIVIAL(debug) << "Integrating " << _nsamps_per_output_spectra << " samples from " << _nBlocks << " into one spectra."; _nchans = _fft_length / 2 + 1; int batch = _nsamps_per_buffer / _fft_length; @@ -171,36 +220,20 @@ GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer( cufftSetStream(_fft_plan, _proc_stream); BOOST_LOG_TRIVIAL(debug) << "Allocating memory"; - _raw_voltage_db.resize(_dadaBufferLayout.sizeOfData() / sizeof(uint64_t)); - _sideChannelData_db.resize(_dadaBufferLayout.getNSideChannels() * _dadaBufferLayout.getNHeaps()); - BOOST_LOG_TRIVIAL(debug) << " Input voltages size (in 64-bit words): " - << _raw_voltage_db.size(); + + // if singlePol + inputDataStream = new PolarizationData(); + inputDataStream->resize(_nchans, batch, _dadaBufferLayout); + _unpacked_voltage_G0.resize(_nsamps_per_buffer); _unpacked_voltage_G1.resize(_nsamps_per_buffer); - - _baseLineNG0.resize(1); - _baseLineNG1.resize(1); BOOST_LOG_TRIVIAL(debug) << " Unpacked voltages size (in samples): " << _unpacked_voltage_G0.size(); - _channelised_voltage.resize(_nchans * batch); - BOOST_LOG_TRIVIAL(debug) << " Channelised voltages size: " - << _channelised_voltage.size(); - _power_db.resize(_nchans * batch / (_naccumulate / nBlocks) * 2); // hold on and off spectra to simplify output - thrust::fill(_power_db.a().begin(), _power_db.a().end(), 0.); - thrust::fill(_power_db.b().begin(), _power_db.b().end(), 0.); - BOOST_LOG_TRIVIAL(debug) << " Powers size: " << _power_db.size() / 2; - - _noOfBitSetsIn_G0.resize( batch / (_naccumulate / nBlocks)); - _noOfBitSetsIn_G1.resize( batch / (_naccumulate / nBlocks)); - thrust::fill(_noOfBitSetsIn_G0.a().begin(), _noOfBitSetsIn_G0.a().end(), 0L); - thrust::fill(_noOfBitSetsIn_G0.b().begin(), _noOfBitSetsIn_G0.b().end(), 0L); - thrust::fill(_noOfBitSetsIn_G1.a().begin(), _noOfBitSetsIn_G1.a().end(), 0L); - thrust::fill(_noOfBitSetsIn_G1.b().begin(), _noOfBitSetsIn_G1.b().end(), 0L); - BOOST_LOG_TRIVIAL(debug) << " Bit set counter size: " << _noOfBitSetsIn_G0.size(); - - // on the host both power are stored in the same data buffer together with - // the number of bit sets - _host_power_db.resize( _power_db.size() * sizeof(IntegratedPowerType) + 2 * sizeof(size_t) * _noOfBitSetsIn_G0.size()); + + outputDataStream = new GatedPowerSpectrumOutput(); + outputDataStream->resize(_nchans, batch / (_naccumulate / _nBlocks)); + + CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream)); CUDA_ERROR_CHECK(cudaStreamCreate(&_proc_stream)); @@ -208,13 +241,13 @@ GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer( CUFFT_ERROR_CHECK(cufftSetStream(_fft_plan, _proc_stream)); _unpacker.reset(new Unpacker(_proc_stream)); - _detector.reset(new DetectorAccumulator<IntegratedPowerType>(_nchans, _naccumulate / nBlocks, scaling, + _detector.reset(new DetectorAccumulator<IntegratedPowerType>(_nchans, _naccumulate / _nBlocks, scaling, offset, _proc_stream)); } // constructor -template <class HandlerType, typename IntegratedPowerType> -GatedSpectrometer<HandlerType, IntegratedPowerType>::~GatedSpectrometer() { +template <class HandlerType> +GatedSpectrometer<HandlerType>::~GatedSpectrometer() { BOOST_LOG_TRIVIAL(debug) << "Destroying GatedSpectrometer"; if (!_fft_plan) cufftDestroy(_fft_plan); @@ -224,8 +257,8 @@ GatedSpectrometer<HandlerType, IntegratedPowerType>::~GatedSpectrometer() { } -template <class HandlerType, typename IntegratedPowerType> -void GatedSpectrometer<HandlerType, IntegratedPowerType>::init(RawBytes &block) { +template <class HandlerType> +void GatedSpectrometer<HandlerType>::init(RawBytes &block) { BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer init called"; std::stringstream headerInfo; headerInfo << "\n" @@ -254,78 +287,86 @@ void GatedSpectrometer<HandlerType, IntegratedPowerType>::init(RawBytes &block) } -template <class HandlerType, typename IntegratedPowerType> -void GatedSpectrometer<HandlerType, IntegratedPowerType>::process( - thrust::device_vector<RawVoltageType> const &digitiser_raw, - thrust::device_vector<uint64_t> const &sideChannelData, - thrust::device_vector<IntegratedPowerType> &detected, thrust::device_vector<uint64_cu> &noOfBitSetsIn_G0, thrust::device_vector<uint64_cu> &noOfBitSetsIn_G1) { + +template <class HandlerType> +void GatedSpectrometer<HandlerType>::gated_fft( + PolarizationData &data, + thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G0, + thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G1 + ) +{ BOOST_LOG_TRIVIAL(debug) << "Unpacking raw voltages"; switch (_nbits) { case 8: - _unpacker->unpack<8>(digitiser_raw, _unpacked_voltage_G0); + _unpacker->unpack<8>(data._raw_voltage.b(), _unpacked_voltage_G0); break; case 12: - _unpacker->unpack<12>(digitiser_raw, _unpacked_voltage_G0); + _unpacker->unpack<12>(data._raw_voltage.b(), _unpacked_voltage_G0); break; default: throw std::runtime_error("Unsupported number of bits"); } - BOOST_LOG_TRIVIAL(debug) << "Calculate baseline"; - //calculate baseline from previos block - - BOOST_LOG_TRIVIAL(debug) << "Perform gating"; - - float baseLineG0 = _baseLineNG0[0]; - float baseLineG1 = _baseLineNG1[0]; - - uint64_t NG0 = 0; - uint64_t NG1 = 0; // Loop over outputblocks, for case of multiple output blocks per input block - for (size_t i = 0; i < noOfBitSetsIn_G0.size(); i++) + int step = data._sideChannelData.b().size() / _noOfBitSetsIn_G0.size(); + + for (size_t i = 0; i < _noOfBitSetsIn_G0.size(); i++) { // ToDo: Should be in one kernel call gating<<<1024, 1024, 0, _proc_stream>>>( - thrust::raw_pointer_cast(_unpacked_voltage_G0.data() + i * sideChannelData.size() / noOfBitSetsIn_G0.size()), - thrust::raw_pointer_cast(_unpacked_voltage_G1.data() + i * sideChannelData.size() / noOfBitSetsIn_G0.size()), - thrust::raw_pointer_cast(sideChannelData.data() + i * sideChannelData.size() / noOfBitSetsIn_G0.size()), - _unpacked_voltage_G0.size() / noOfBitSetsIn_G0.size(), _dadaBufferLayout.getHeapSize(), _selectedBit, _dadaBufferLayout.getNSideChannels(), + thrust::raw_pointer_cast(_unpacked_voltage_G0.data() + i * step * _nsamps_per_heap), + thrust::raw_pointer_cast(_unpacked_voltage_G1.data() + i * step * _nsamps_per_heap), + thrust::raw_pointer_cast(data._sideChannelData.b().data() + i * step), + _unpacked_voltage_G0.size() / _noOfBitSetsIn_G0.size(), + _dadaBufferLayout.getHeapSize(), + _selectedBit, + _dadaBufferLayout.getNSideChannels(), _selectedSideChannel, - baseLineG0, baseLineG1, - thrust::raw_pointer_cast(_baseLineNG0.data()), - thrust::raw_pointer_cast(_baseLineNG1.data()), - thrust::raw_pointer_cast(noOfBitSetsIn_G0.data() + i), - thrust::raw_pointer_cast(noOfBitSetsIn_G1.data() + i) + thrust::raw_pointer_cast(data._baseLineG0.data()), + thrust::raw_pointer_cast(data._baseLineG1.data()), + thrust::raw_pointer_cast(data._baseLineG0_update.data()), + thrust::raw_pointer_cast(data._baseLineG1_update.data()), + thrust::raw_pointer_cast(_noOfBitSetsIn_G0.data() + i), + thrust::raw_pointer_cast(_noOfBitSetsIn_G1.data() + i) ); - NG0 += noOfBitSetsIn_G0[i]; - NG1 += noOfBitSetsIn_G1[i]; } - _baseLineNG0[0] /= NG0; - _baseLineNG1[0] /= NG1; - BOOST_LOG_TRIVIAL(debug) << "Updating Baselines\n G0: " << baseLineG0 << " -> " << _baseLineNG0[0] << ", " << baseLineG1 << " -> " << _baseLineNG1[0] ; + // only few output blocks per input block thus execution on only one thread. + // Important is that the execution is async on the GPU. + update_baselines<<<1,1,0, _proc_stream>>>( + thrust::raw_pointer_cast(data._baseLineG0.data()), + thrust::raw_pointer_cast(data._baseLineG1.data()), + thrust::raw_pointer_cast(data._baseLineG0_update.data()), + thrust::raw_pointer_cast(data._baseLineG1_update.data()), + thrust::raw_pointer_cast(_noOfBitSetsIn_G0.data()), + thrust::raw_pointer_cast(_noOfBitSetsIn_G1.data()), + _noOfBitSetsIn_G0.size() + ); BOOST_LOG_TRIVIAL(debug) << "Performing FFT 1"; UnpackedVoltageType *_unpacked_voltage_ptr = thrust::raw_pointer_cast(_unpacked_voltage_G0.data()); ChannelisedVoltageType *_channelised_voltage_ptr = - thrust::raw_pointer_cast(_channelised_voltage.data()); + thrust::raw_pointer_cast(data._channelised_voltage_G0.data()); CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr, (cufftComplex *)_channelised_voltage_ptr)); - _detector->detect(_channelised_voltage, detected, 2, 0); BOOST_LOG_TRIVIAL(debug) << "Performing FFT 2"; _unpacked_voltage_ptr = thrust::raw_pointer_cast(_unpacked_voltage_G1.data()); + _channelised_voltage_ptr = thrust::raw_pointer_cast(data._channelised_voltage_G1.data()); CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr, (cufftComplex *)_channelised_voltage_ptr)); - _detector->detect(_channelised_voltage, detected, 2, 1); CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream)); BOOST_LOG_TRIVIAL(debug) << "Exit processing"; } // process -template <class HandlerType, typename IntegratedPowerType> -bool GatedSpectrometer<HandlerType, IntegratedPowerType>::operator()(RawBytes &block) { + + + + +template <class HandlerType> +bool GatedSpectrometer<HandlerType>::operator()(RawBytes &block) { ++_call_count; BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer operator() called (count = " << _call_count << ")"; @@ -340,25 +381,8 @@ bool GatedSpectrometer<HandlerType, IntegratedPowerType>::operator()(RawBytes &b // Copy data to device CUDA_ERROR_CHECK(cudaStreamSynchronize(_h2d_stream)); - _raw_voltage_db.swap(); - _sideChannelData_db.swap(); - - BOOST_LOG_TRIVIAL(debug) << " block.used_bytes() = " << block.used_bytes() - << ", dataBlockBytes = " << _dadaBufferLayout.sizeOfData() << "\n"; - - CUDA_ERROR_CHECK(cudaMemcpyAsync(static_cast<void *>(_raw_voltage_db.a_ptr()), - static_cast<void *>(block.ptr()), - _dadaBufferLayout.sizeOfData() , cudaMemcpyHostToDevice, - _h2d_stream)); - CUDA_ERROR_CHECK(cudaMemcpyAsync( - static_cast<void *>(_sideChannelData_db.a_ptr()), - static_cast<void *>(block.ptr() + _dadaBufferLayout.sizeOfData() + _dadaBufferLayout.sizeOfGap()), - _dadaBufferLayout.sizeOfSideChannelData(), cudaMemcpyHostToDevice, _h2d_stream)); - BOOST_LOG_TRIVIAL(debug) << "First side channel item: 0x" << std::setw(16) - << std::setfill('0') << std::hex << - (reinterpret_cast<uint64_t*>(block.ptr() + _dadaBufferLayout.sizeOfData() - + _dadaBufferLayout.sizeOfGap()))[0] << - std::dec; + inputDataStream->swap(); + inputDataStream->getFromBlock(block, _dadaBufferLayout, _h2d_stream); if (_call_count == 1) { @@ -366,91 +390,87 @@ bool GatedSpectrometer<HandlerType, IntegratedPowerType>::operator()(RawBytes &b } // process data + // check if new outblock is started: _call_count -1 because this is the block number on the device + bool newBlock = (((_call_count-1) * _nsamps_per_buffer) % _nsamps_per_output_spectra == 0); + // only if a newblock is started the output buffer is swapped. Otherwise the // new data is added to it - bool newBlock = false; - if (((_call_count-1) * _nsamps_per_buffer) % _nsamps_per_output_spectra == 0) // _call_count -1 because this is the block number on the device + if (newBlock) { BOOST_LOG_TRIVIAL(debug) << "Starting new output block."; - newBlock = true; - _power_db.swap(); - _noOfBitSetsIn_G0.swap(); - _noOfBitSetsIn_G1.swap(); - // move to specific stream! - thrust::fill(thrust::cuda::par.on(_proc_stream),_power_db.a().begin(), _power_db.a().end(), 0.); - thrust::fill(thrust::cuda::par.on(_proc_stream), _noOfBitSetsIn_G0.a().begin(), _noOfBitSetsIn_G0.a().end(), 0L); - thrust::fill(thrust::cuda::par.on(_proc_stream), _noOfBitSetsIn_G1.a().begin(), _noOfBitSetsIn_G1.a().end(), 0L); + outputDataStream->swap(); + outputDataStream->reset(_proc_stream); } - process(_raw_voltage_db.b(), _sideChannelData_db.b(), _power_db.a(), _noOfBitSetsIn_G0.a(), _noOfBitSetsIn_G1.a()); + /// For one pol input and power out + /// ToDo: For two pol input and power out + /// ToDo: For two pol input and stokes out + PolarizationData *polData = dynamic_cast<PolarizationData*>(inputDataStream); + GatedPowerSpectrumOutput *powOut = dynamic_cast<GatedPowerSpectrumOutput*>(outputDataStream); + gated_fft(*polData, powOut->G0._noOfBitSets.a(), powOut->G1._noOfBitSets.a()); + + +// float2 const* input_ptr; +// IntegratedPowerType * output_ptr; +// = thrust::raw_pointer_cast(input.data()); +// T * output_ptr = thrust::raw_pointer_cast(output.data()); +// input_ptr = + kernels::detect_and_accumulate<IntegratedPowerType> <<<1024, 1024, 0, _proc_stream>>>( + thrust::raw_pointer_cast(polData->_channelised_voltage_G0.data()), + thrust::raw_pointer_cast(powOut->G0.data.a().data()), + _nchans, + polData->_channelised_voltage_G0.size() / _nchans, + _naccumulate / _nBlocks, + 1, 0., 1, 0); + + kernels::detect_and_accumulate<IntegratedPowerType> <<<1024, 1024, 0, _proc_stream>>>( + thrust::raw_pointer_cast(polData->_channelised_voltage_G1.data()), + thrust::raw_pointer_cast(powOut->G1.data.a().data()), + _nchans, + polData->_channelised_voltage_G1.size() / _nchans, + _naccumulate / _nBlocks, + 1, 0., 1, 0); + + CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream)); if ((_call_count == 2) || (!newBlock)) { return false; } - // copy data to host if block is finished - CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream)); - _host_power_db.swap(); - - for (size_t i = 0; i < _noOfBitSetsIn_G0.size(); i++) - { - size_t memOffset = 2 * i * (_nchans * sizeof(IntegratedPowerType) + sizeof(size_t)); - // copy 2x channel data - CUDA_ERROR_CHECK( - cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset) , - static_cast<void *>(_power_db.b_ptr() + 2 * i * _nchans), - 2 * _nchans * sizeof(IntegratedPowerType), - cudaMemcpyDeviceToHost, _d2h_stream)); - - // copy noOf bit set data - CUDA_ERROR_CHECK( - cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType)), - static_cast<void *>(_noOfBitSetsIn_G0.b_ptr() + i ), - 1 * sizeof(size_t), - cudaMemcpyDeviceToHost, _d2h_stream)); - - CUDA_ERROR_CHECK( - cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType) + sizeof(size_t)), - static_cast<void *>(_noOfBitSetsIn_G1.b_ptr() + i ), - 1 * sizeof(size_t), - cudaMemcpyDeviceToHost, _d2h_stream)); - } - - BOOST_LOG_TRIVIAL(debug) << "Copy Data back to host"; - + powOut->data2Host(_d2h_stream); if (_call_count == 3) { return false; } - // calculate off value - BOOST_LOG_TRIVIAL(info) << "Buffer block: " << _call_count-3 << " with " << _noOfBitSetsIn_G0.size() << "x2 output heaps:"; - size_t total_samples_lost = 0; - for (size_t i = 0; i < _noOfBitSetsIn_G0.size(); i++) - { - size_t memOffset = 2 * i * (_nchans * sizeof(IntegratedPowerType) + sizeof(size_t)); - - size_t* on_values = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType)); - size_t* off_values = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType) + sizeof(size_t)); - - size_t samples_lost = _nsamps_per_output_spectra - (*on_values) - (*off_values); - total_samples_lost += samples_lost; - - BOOST_LOG_TRIVIAL(info) << " Heap " << i << ":\n" - <<" Samples with bit set : " << *on_values << std::endl - <<" Samples without bit set: " << *off_values << std::endl - <<" Samples lost : " << samples_lost << " out of " << _nsamps_per_output_spectra << std::endl; - } - double efficiency = 1. - double(total_samples_lost) / (_nsamps_per_output_spectra * _noOfBitSetsIn_G0.size()); - double prev_average = _processing_efficiency / (_call_count- 3 - 1); - _processing_efficiency += efficiency; - double average = _processing_efficiency / (_call_count-3); - BOOST_LOG_TRIVIAL(info) << "Total processing efficiency of this buffer block:" << std::setprecision(6) << efficiency << ". Run average: " << average << " (Trend: " << std::showpos << (average - prev_average) << ")"; - - // Wrap in a RawBytes object here; - RawBytes bytes(reinterpret_cast<char *>(_host_power_db.b_ptr()), - _host_power_db.size(), - _host_power_db.size()); +// // calculate off value +// BOOST_LOG_TRIVIAL(info) << "Buffer block: " << _call_count-3 << " with " << _noOfBitSetsIn_G0.size() << "x2 output heaps:"; +// size_t total_samples_lost = 0; +// for (size_t i = 0; i < _noOfBitSetsIn_G0.size(); i++) +// { +// size_t memOffset = 2 * i * (_nchans * sizeof(IntegratedPowerType) + sizeof(size_t)); +// +// size_t* on_values = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType)); +// size_t* off_values = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType) + sizeof(size_t)); +// +// size_t samples_lost = _nsamps_per_output_spectra - (*on_values) - (*off_values); +// total_samples_lost += samples_lost; +// +// BOOST_LOG_TRIVIAL(info) << " Heap " << i << ":\n" +// <<" Samples with bit set : " << *on_values << std::endl +// <<" Samples without bit set: " << *off_values << std::endl +// <<" Samples lost : " << samples_lost << " out of " << _nsamps_per_output_spectra << std::endl; +// } +// double efficiency = 1. - double(total_samples_lost) / (_nsamps_per_output_spectra * _noOfBitSetsIn_G0.size()); +// double prev_average = _processing_efficiency / (_call_count- 3 - 1); +// _processing_efficiency += efficiency; +// double average = _processing_efficiency / (_call_count-3); +// BOOST_LOG_TRIVIAL(info) << "Total processing efficiency of this buffer block:" << std::setprecision(6) << efficiency << ". Run average: " << average << " (Trend: " << std::showpos << (average - prev_average) << ")"; +// +// // Wrap in a RawBytes object here; + RawBytes bytes(reinterpret_cast<char *>(powOut->_host_power.b_ptr()), + powOut->_host_power.size(), + powOut->_host_power.size()); BOOST_LOG_TRIVIAL(debug) << "Calling handler"; // The handler can't do anything asynchronously without a copy here // as it would be unsafe (given that it does not own the memory it diff --git a/psrdada_cpp/effelsberg/edd/detail/GatedStokesSpectrometer.cu b/psrdada_cpp/effelsberg/edd/detail/GatedStokesSpectrometer.cu index c16d19360ddd0a2862ce82684dda9fc9311b3d19..e67bfaa8dd70f4b684fdff2f37f4c711c2e856b2 100644 --- a/psrdada_cpp/effelsberg/edd/detail/GatedStokesSpectrometer.cu +++ b/psrdada_cpp/effelsberg/edd/detail/GatedStokesSpectrometer.cu @@ -227,21 +227,10 @@ GatedStokesSpectrometer<HandlerType>::GatedStokesSpectrometer( _unpacked_voltage_G0.resize(_nsamps_per_buffer); _unpacked_voltage_G1.resize(_nsamps_per_buffer); - polarization0._baseLineG0.resize(1); - polarization0._baseLineG0_update.resize(1); - polarization0._baseLineG1.resize(1); - polarization0._baseLineG1_update.resize(1); - polarization1._baseLineG0.resize(1); - polarization1._baseLineG0_update.resize(1); - polarization1._baseLineG1.resize(1); - polarization1._baseLineG1_update.resize(1); - BOOST_LOG_TRIVIAL(debug) << " Unpacked voltages size (in samples): " << _unpacked_voltage_G0.size(); - polarization0._channelised_voltage_G0.resize(_nchans * batch); - polarization0._channelised_voltage_G1.resize(_nchans * batch); - polarization1._channelised_voltage_G0.resize(_nchans * batch); - polarization1._channelised_voltage_G1.resize(_nchans * batch); + polarization0.resize(_nchans * batch); + polarization1.resize(_nchans * batch); BOOST_LOG_TRIVIAL(debug) << " Channelised voltages size: " << polarization0._channelised_voltage_G0.size(); diff --git a/psrdada_cpp/effelsberg/edd/test/src/GatedSpectrometerTest.cu b/psrdada_cpp/effelsberg/edd/test/src/GatedSpectrometerTest.cu index 163e8e9fba27da62bac0eee7654316ddbe61fee4..9983d260de9d3f0d1cdc4dd7d99757588a939f74 100644 --- a/psrdada_cpp/effelsberg/edd/test/src/GatedSpectrometerTest.cu +++ b/psrdada_cpp/effelsberg/edd/test/src/GatedSpectrometerTest.cu @@ -7,7 +7,6 @@ #include "thrust/device_vector.h" #include "thrust/extrema.h" -namespace { TEST(GatedSpectrometer, BitManipulationMacros) { for (int i = 0; i < 64; i++) { @@ -24,30 +23,120 @@ TEST(GatedSpectrometer, BitManipulationMacros) { } // -//TEST(GatedSpectrometer, ParameterSanity) { -// ::testing::FLAGS_gtest_death_test_style = "threadsafe"; -// psrdada_cpp::NullSink sink; +//TEST(GatedSpectrometer, stokes_IQUV) +//{ +// float I,Q,U,V; +// // No field +// psrdada_cpp::effelsberg::edd::stokes_IQUV((float2){0.0f,0.0f}, (float2){0.0f,0.0f}, I, Q, U, V); +// EXPECT_FLOAT_EQ(I, 0); +// EXPECT_FLOAT_EQ(Q, 0); +// EXPECT_FLOAT_EQ(U, 0); +// EXPECT_FLOAT_EQ(V, 0); // -// // 8 or 12 bit sampling -// EXPECT_DEATH(psrdada_cpp::effelsberg::edd::GatedSpectrometer<decltype(sink),int8_t > (0, 0, 0, 0, 4096, 0, 0, 0, 0, 0, sink), -// "_nbits == 8"); -// // naccumulate > 0 -// EXPECT_DEATH(psrdada_cpp::effelsberg::edd::GatedSpectrometer<decltype(sink),int8_t > (0, 0, 0, 0, 4096, 0, 0, 8, 0, 0, sink), -// "_naccumulate"); +// // For p1 = Ex, p2 = Ey +// // horizontal polarized +// psrdada_cpp::effelsberg::edd::stokes_IQUV((float2){1.0f,0.0f}, (float2){0.0f,0.0f}, I, Q, U, V); +// EXPECT_FLOAT_EQ(I, 1); +// EXPECT_FLOAT_EQ(Q, 1); +// EXPECT_FLOAT_EQ(U, 0); +// EXPECT_FLOAT_EQ(V, 0); // -// // selected side channel -// EXPECT_DEATH(psrdada_cpp::effelsberg::edd::GatedSpectrometer<decltype(sink),int8_t > (0, 1, 2, 0, 4096, 0, 1, 8, 0, 0, sink), -// "nSideChannels"); +// // vertical polarized +// psrdada_cpp::effelsberg::edd::stokes_IQUV((float2){0.0f,0.0f}, (float2){1.0f,0.0f}, I, Q, U, V); +// EXPECT_FLOAT_EQ(I, 1); +// EXPECT_FLOAT_EQ(Q, -1); +// EXPECT_FLOAT_EQ(U, 0); +// EXPECT_FLOAT_EQ(V, 0); // -// // selected bit -// EXPECT_DEATH(psrdada_cpp::effelsberg::edd::GatedSpectrometer<decltype(sink),int8_t > (0, 2, 1, 65, 4096, 0, 1, 8, 0, 0, sink), -// "selectedBit"); +// //linear +45 deg. +// psrdada_cpp::effelsberg::edd::stokes_IQUV((float2){1.0f/std::sqrt(2),0.0f}, (float2){1.0f/std::sqrt(2),0.0f}, I, Q, U, V); +// EXPECT_FLOAT_EQ(I, 1); +// EXPECT_FLOAT_EQ(Q, 0); +// EXPECT_FLOAT_EQ(U, 1); +// EXPECT_FLOAT_EQ(V, 0); // -// // valid construction -// psrdada_cpp::effelsberg::edd::GatedSpectrometer<decltype(sink), int8_t> a( -// 4096 * 4096, 2, 1, 63, 4096, 1024, 1, 8, 100., 100., sink); +// //linear -45 deg. +// psrdada_cpp::effelsberg::edd::stokes_IQUV((float2){-1.0f/std::sqrt(2),0.0f}, (float2){1.0f/std::sqrt(2),0.0f}, I, Q, U, V); +// EXPECT_FLOAT_EQ(I, 1); +// EXPECT_FLOAT_EQ(Q, 0); +// EXPECT_FLOAT_EQ(U, -1); +// EXPECT_FLOAT_EQ(V, 0); +// +// //left circular +// psrdada_cpp::effelsberg::edd::stokes_IQUV((float2){.0f,1.0f/std::sqrt(2)}, (float2){1.0f/std::sqrt(2),.0f}, I, Q, U, V); +// EXPECT_FLOAT_EQ(I, 1); +// EXPECT_FLOAT_EQ(Q, 0); +// EXPECT_FLOAT_EQ(U, 0); +// EXPECT_FLOAT_EQ(V, -1); +// +// // right circular +// psrdada_cpp::effelsberg::edd::stokes_IQUV((float2){.0f,-1.0f/std::sqrt(2)}, (float2){1.0f/std::sqrt(2),.0f}, I, Q, U, V); +// EXPECT_FLOAT_EQ(I, 1); +// EXPECT_FLOAT_EQ(Q, 0); +// EXPECT_FLOAT_EQ(U, 0); +// EXPECT_FLOAT_EQ(V, 1); //} -} // namespace +// +// +//TEST(GatedSpectrometer, stokes_accumulate) +//{ +// CUDA_ERROR_CHECK(cudaDeviceSynchronize()); +// size_t nchans = 8 * 1024 * 1024 + 1; +// size_t naccumulate = 5; +// +// thrust::device_vector<float2> P0(nchans * naccumulate); +// thrust::device_vector<float2> P1(nchans * naccumulate); +// thrust::fill(P0.begin(), P0.end(), (float2){0, 0}); +// thrust::fill(P1.begin(), P1.end(), (float2){0, 0}); +// thrust::device_vector<float> I(nchans); +// thrust::device_vector<float> Q(nchans); +// thrust::device_vector<float> U(nchans); +// thrust::device_vector<float> V(nchans); +// thrust::fill(I.begin(), I.end(), 0); +// thrust::fill(Q.begin(), Q.end(), 0); +// thrust::fill(U.begin(), U.end(), 0); +// thrust::fill(V.begin(), V.end(), 0); +// +// // This channel should be left circular polarized +// size_t idx0 = 23; +// for (int k = 0; k< naccumulate; k++) +// { +// size_t idx = idx0 + k * nchans; +// P0[idx] = (float2){0.0f, 1.0f/std::sqrt(2)}; +// P1[idx] = (float2){1.0f/std::sqrt(2),0.0f}; +// } +// +// psrdada_cpp::effelsberg::edd::stokes_accumulate<<<1024, 1024>>>( +// thrust::raw_pointer_cast(P0.data()), +// thrust::raw_pointer_cast(P1.data()), +// thrust::raw_pointer_cast(I.data()), +// thrust::raw_pointer_cast(Q.data()), +// thrust::raw_pointer_cast(U.data()), +// thrust::raw_pointer_cast(V.data()), +// nchans, +// naccumulate +// ); +// +// CUDA_ERROR_CHECK(cudaDeviceSynchronize()); +// thrust::pair<thrust::device_vector<float>::iterator, thrust::device_vector<float>::iterator> minmax; +// +// minmax = thrust::minmax_element(I.begin(), I.end()); +// EXPECT_FLOAT_EQ(*minmax.first, 0); +// EXPECT_FLOAT_EQ(*minmax.second, naccumulate); +// +// minmax = thrust::minmax_element(Q.begin(), Q.end()); +// EXPECT_FLOAT_EQ(*minmax.first, 0); +// EXPECT_FLOAT_EQ(*minmax.second, 0); +// +// minmax = thrust::minmax_element(U.begin(), U.end()); +// EXPECT_FLOAT_EQ(*minmax.first, 0); +// EXPECT_FLOAT_EQ(*minmax.second, 0); +// +// minmax = thrust::minmax_element(V.begin(), V.end()); +// EXPECT_FLOAT_EQ(*minmax.first, -1. * naccumulate); +// EXPECT_FLOAT_EQ(*minmax.second, 0); +//} +// TEST(GatedSpectrometer, GatingKernel) @@ -63,6 +152,8 @@ TEST(GatedSpectrometer, GatingKernel) thrust::device_vector<float> baseLineG0(1); thrust::device_vector<float> baseLineG1(1); + thrust::device_vector<float> baseLineG0_update(1); + thrust::device_vector<float> baseLineG1_update(1); thrust::fill(G0.begin(), G0.end(), 42); thrust::fill(G1.begin(), G1.end(), 23); thrust::fill(_sideChannelData.begin(), _sideChannelData.end(), 0); @@ -71,18 +162,22 @@ TEST(GatedSpectrometer, GatingKernel) { thrust::fill(_nG0.begin(), _nG0.end(), 0); thrust::fill(_nG1.begin(), _nG1.end(), 0); - baseLineG0[0] = 0.; - baseLineG1[0] = 0.; + baseLineG0[0] = -3; + baseLineG1[0] = -4; + baseLineG0_update[0] = 0; + baseLineG1_update[0] = 0; + const uint64_t *sideCD = (uint64_t *)(thrust::raw_pointer_cast(_sideChannelData.data())); psrdada_cpp::effelsberg::edd::gating<<<1024 , 1024>>>( thrust::raw_pointer_cast(G0.data()), thrust::raw_pointer_cast(G1.data()), sideCD, - G0.size(), G0.size(), 0, 1, + G0.size(), blockSize, 0, 1, 0, - -3., -4, thrust::raw_pointer_cast(baseLineG0.data()), thrust::raw_pointer_cast(baseLineG1.data()), + thrust::raw_pointer_cast(baseLineG0_update.data()), + thrust::raw_pointer_cast(baseLineG1_update.data()), thrust::raw_pointer_cast(_nG0.data()), thrust::raw_pointer_cast(_nG1.data()) ); @@ -99,27 +194,31 @@ TEST(GatedSpectrometer, GatingKernel) EXPECT_EQ(_nG0[0], G0.size()); EXPECT_EQ(_nG1[0], 0u); - EXPECT_FLOAT_EQ(baseLineG0[0] / (_nG0[0] + 1E-127), 42.f); - EXPECT_FLOAT_EQ(baseLineG1[0] / (_nG1[0] + 1E-127), 0.f); + EXPECT_FLOAT_EQ(42.f, baseLineG0_update[0] / (_nG0[0] + 1E-121)); + EXPECT_FLOAT_EQ(0.f, baseLineG1_update[0] / (_nG1[0] + 1E-121)); } // everything to G1 // with baseline -5 { thrust::fill(_nG0.begin(), _nG0.end(), 0); thrust::fill(_nG1.begin(), _nG1.end(), 0); - baseLineG0[0] = 0.; - baseLineG1[0] = 0.; + baseLineG0[0] = 5.; + baseLineG1[0] = -2; + baseLineG0_update[0] = 0; + baseLineG1_update[0] = 0; + thrust::fill(_sideChannelData.begin(), _sideChannelData.end(), 1L); const uint64_t *sideCD = (uint64_t *)(thrust::raw_pointer_cast(_sideChannelData.data())); psrdada_cpp::effelsberg::edd::gating<<<1024, 1024>>>( thrust::raw_pointer_cast(G0.data()), thrust::raw_pointer_cast(G1.data()), sideCD, - G0.size(), G0.size(), 0, 1, + G0.size(), blockSize, 0, 1, 0, - 5., -2., thrust::raw_pointer_cast(baseLineG0.data()), thrust::raw_pointer_cast(baseLineG1.data()), + thrust::raw_pointer_cast(baseLineG0_update.data()), + thrust::raw_pointer_cast(baseLineG1_update.data()), thrust::raw_pointer_cast(_nG0.data()), thrust::raw_pointer_cast(_nG1.data()) ); @@ -134,13 +233,12 @@ TEST(GatedSpectrometer, GatingKernel) EXPECT_EQ(_nG0[0], 0u); EXPECT_EQ(_nG1[0], G1.size()); - EXPECT_FLOAT_EQ(baseLineG0[0] / (_nG0[0] + 1E-127), 0.); - EXPECT_FLOAT_EQ(baseLineG1[0] / (_nG1[0] + 1E-127), 42.); + + EXPECT_FLOAT_EQ(0.f, baseLineG0_update[0] / (_nG0[0] + 1E-121)); + EXPECT_FLOAT_EQ(42.f, baseLineG1_update[0] / (_nG1[0] + 1E-121)); } } - - TEST(GatedSpectrometer, array_sum) { const size_t NBLOCKS = 16 * 32; @@ -163,10 +261,3 @@ TEST(GatedSpectrometer, array_sum) { EXPECT_EQ(size_t(blr[0]), inputLength); } - -int main(int argc, char **argv) { - ::testing::InitGoogleTest(&argc, argv); - - return RUN_ALL_TESTS(); -} -