From 15f1eb520f8bd4c3a787ad6f7bc09bafed346cbd Mon Sep 17 00:00:00 2001 From: Tobias Winchen <tobias.winchen@rwth-aachen.de> Date: Thu, 14 May 2020 09:14:19 +0200 Subject: [PATCH] Power spectrum with new IO classes --- .../effelsberg/edd/DadaBufferLayout.hpp | 4 + .../effelsberg/edd/GatedSpectrometer.cuh | 331 +++++++++++++----- .../edd/detail/GatedSpectrometer.cu | 158 +++++---- .../edd/detail/GatedStokesSpectrometer.cu | 10 +- .../effelsberg/edd/src/DadaBufferLayout.cpp | 6 +- .../edd/src/GatedSpectrometer_cli.cu | 6 +- 6 files changed, 351 insertions(+), 164 deletions(-) diff --git a/psrdada_cpp/effelsberg/edd/DadaBufferLayout.hpp b/psrdada_cpp/effelsberg/edd/DadaBufferLayout.hpp index 2c234341..96cf5980 100644 --- a/psrdada_cpp/effelsberg/edd/DadaBufferLayout.hpp +++ b/psrdada_cpp/effelsberg/edd/DadaBufferLayout.hpp @@ -24,12 +24,16 @@ class DadaBufferLayout public: + // input key of the dadad buffer + // size of spead heaps in bytes + // number of side channels DadaBufferLayout(key_t input_key , size_t heapSize, size_t nSideChannels); key_t getInputkey() const; size_t getBufferSize() const; + // get size of heaps in bytes size_t getHeapSize() const; size_t getNSideChannels() const; diff --git a/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh b/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh index 03cd6d45..57f01976 100644 --- a/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh +++ b/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh @@ -41,17 +41,30 @@ typedef float IntegratedPowerType; -struct InputDataStream +/// Input data and intermediate processing data for one polarization +struct PolarizationData { - virtual void swap() = 0; - virtual void resize(size_t _nchans, size_t _batch, const DadaBufferLayout &dadaBufferLayout) = 0; - virtual void getFromBlock(RawBytes &block, DadaBufferLayout &dadaBufferLayout, cudaStream_t &_h2d_stream) = 0; -}; + DadaBufferLayout _dadaBufferLayout; + size_t _fft_length; + size_t _batch; + // a buffer contains batch * fft_length samples + PolarizationData(size_t fft_length, size_t batch, const DadaBufferLayout + &dadaBufferLayout) : _fft_length(fft_length), _batch(batch), _dadaBufferLayout(dadaBufferLayout) + { + _raw_voltage.resize(_dadaBufferLayout.sizeOfData() / sizeof(uint64_t)); + BOOST_LOG_TRIVIAL(debug) << " Input voltages size (in 64-bit words): " << _raw_voltage.size(); + + _baseLineG0.resize(1); + _baseLineG0_update.resize(1); + _baseLineG1.resize(1); + _baseLineG1_update.resize(1); + _channelised_voltage_G0.resize((_fft_length / 2 + 1) * _batch); + _channelised_voltage_G1.resize((_fft_length / 2 + 1) * _batch); + _sideChannelData.resize(_dadaBufferLayout.getNSideChannels() * _dadaBufferLayout.getNHeaps()); + BOOST_LOG_TRIVIAL(debug) << " Channelised voltages size: " << _channelised_voltage_G0.size(); + }; -/// Input data and intermediate processing data for one polarization -struct PolarizationData : public InputDataStream -{ /// Raw ADC Voltage DoubleDeviceBuffer<RawVoltageType> _raw_voltage; /// Side channel data @@ -79,80 +92,133 @@ struct PolarizationData : public InputDataStream _sideChannelData.swap(); } - void resize(size_t _nchans, size_t _batch, const DadaBufferLayout &dadaBufferLayout) - { - _raw_voltage.resize(dadaBufferLayout.sizeOfData() / sizeof(uint64_t)); - BOOST_LOG_TRIVIAL(debug) << " Input voltages size (in 64-bit words): " << _raw_voltage.size(); - - _baseLineG0.resize(1); - _baseLineG0_update.resize(1); - _baseLineG1.resize(1); - _baseLineG1_update.resize(1); - _channelised_voltage_G0.resize(_nchans * _batch); - _channelised_voltage_G1.resize(_nchans * _batch); - BOOST_LOG_TRIVIAL(debug) << " Channelised voltages size: " << _channelised_voltage_G0.size(); - } - - void getFromBlock(RawBytes &block, DadaBufferLayout &dadaBufferLayout, cudaStream_t &_h2d_stream) + void getFromBlock(RawBytes &block, cudaStream_t &_h2d_stream) { BOOST_LOG_TRIVIAL(debug) << " block.used_bytes() = " << block.used_bytes() - << ", dataBlockBytes = " << dadaBufferLayout.sizeOfData() << "\n"; + << ", dataBlockBytes = " << _dadaBufferLayout.sizeOfData() << "\n"; CUDA_ERROR_CHECK(cudaMemcpyAsync(static_cast<void *>(_raw_voltage.a_ptr()), static_cast<void *>(block.ptr()), - dadaBufferLayout.sizeOfData() , cudaMemcpyHostToDevice, + _dadaBufferLayout.sizeOfData() , cudaMemcpyHostToDevice, _h2d_stream)); CUDA_ERROR_CHECK(cudaMemcpyAsync( static_cast<void *>(_sideChannelData.a_ptr()), - static_cast<void *>(block.ptr() + dadaBufferLayout.sizeOfData() + dadaBufferLayout.sizeOfGap()), - dadaBufferLayout.sizeOfSideChannelData(), cudaMemcpyHostToDevice, _h2d_stream)); + static_cast<void *>(block.ptr() + _dadaBufferLayout.sizeOfData() + _dadaBufferLayout.sizeOfGap()), + _dadaBufferLayout.sizeOfSideChannelData(), cudaMemcpyHostToDevice, _h2d_stream)); BOOST_LOG_TRIVIAL(debug) << "First side channel item: 0x" << std::setw(16) << std::setfill('0') << std::hex << - (reinterpret_cast<uint64_t*>(block.ptr() + dadaBufferLayout.sizeOfData() - + dadaBufferLayout.sizeOfGap()))[0] << + (reinterpret_cast<uint64_t*>(block.ptr() + _dadaBufferLayout.sizeOfData() + + _dadaBufferLayout.sizeOfGap()))[0] << std::dec; } +}; -}; +/// Input data and intermediate processing data for two polarizations +struct DualPolarizationData +{ + DadaBufferLayout _dadaBufferLayout; + DualPolarizationData(size_t fft_length, size_t batch, const DadaBufferLayout + &dadaBufferLayout) : polarization0(fft_length, batch, dadaBufferLayout), + polarization1(fft_length, batch, dadaBufferLayout), + _dadaBufferLayout(dadaBufferLayout) + { + }; -//struct DualPolarizationData : public InputDataStream -//{ -// PolarizationData pol0, pol1; -// void swap() -// { -// pol0.swap(); pol1.swap(); -// } -// -// void resize(size_t _nchans, size_t _batch, const DadaBufferLayout &dadaBufferLayout) -// { -// BOOST_LOG_TRIVIAL(debug) << " Pol0"; -// pol0.resize(_nchans, _batch, dadaBufferLayout); -// BOOST_LOG_TRIVIAL(debug) << " Pol1"; -// pol1.resize(_nchans, _batch, dadaBufferLayout); -// } -//}; + PolarizationData polarization0, polarization1; + void swap() + { + polarization0.swap(); polarization1.swap(); + } + + void getFromBlock(RawBytes &block, cudaStream_t &_h2d_stream) + { + // Copy the data with stride to the GPU: + // CPU: P1P2P1P2P1P2 ... + // GPU: P1P1P1 ... P2P2P2 ... + // If this is a bottleneck the gating kernel could sort the layout out + // during copy + int heapsize_bytes = _dadaBufferLayout.getHeapSize(); + CUDA_ERROR_CHECK(cudaMemcpy2DAsync( + static_cast<void *>(polarization0._raw_voltage.a_ptr()), + heapsize_bytes, + static_cast<void *>(block.ptr()), + 2 * heapsize_bytes, + heapsize_bytes, _dadaBufferLayout.sizeOfData() / heapsize_bytes/ 2, + cudaMemcpyHostToDevice, _h2d_stream)); + + CUDA_ERROR_CHECK(cudaMemcpy2DAsync( + static_cast<void *>(polarization1._raw_voltage.a_ptr()), + heapsize_bytes, + static_cast<void *>(block.ptr()) + heapsize_bytes, + 2 * heapsize_bytes, + heapsize_bytes, _dadaBufferLayout.sizeOfData() / heapsize_bytes/ 2, + cudaMemcpyHostToDevice, _h2d_stream)); + + CUDA_ERROR_CHECK(cudaMemcpy2DAsync( + static_cast<void *>(polarization0._sideChannelData.a_ptr()), + sizeof(uint64_t), + static_cast<void *>(block.ptr() + _dadaBufferLayout.sizeOfData() + _dadaBufferLayout.sizeOfGap()), + 2 * sizeof(uint64_t), + sizeof(uint64_t), + _dadaBufferLayout.sizeOfSideChannelData() / 2 / sizeof(uint64_t), + cudaMemcpyHostToDevice, _h2d_stream)); + + CUDA_ERROR_CHECK(cudaMemcpy2DAsync( + static_cast<void *>(polarization1._sideChannelData.a_ptr()), + sizeof(uint64_t), + static_cast<void *>(block.ptr() + _dadaBufferLayout.sizeOfData() + _dadaBufferLayout.sizeOfGap() + sizeof(uint64_t)), + 2 * sizeof(uint64_t), + sizeof(uint64_t), + _dadaBufferLayout.sizeOfSideChannelData() / 2 / sizeof(uint64_t), cudaMemcpyHostToDevice, _h2d_stream)); + + BOOST_LOG_TRIVIAL(debug) << "First side channel item: 0x" << std::setw(16) + << std::setfill('0') << std::hex << + (reinterpret_cast<uint64_t*>(block.ptr() + _dadaBufferLayout.sizeOfData() + + _dadaBufferLayout.sizeOfGap()))[0] << std::dec; + } + +}; -// Output data for one gate N = 1 for one pol, or 4 for full stokes +/// INterface for the processed output data stream struct OutputDataStream { - /// Reset outptu for new integration + size_t _nchans; + size_t _blocks; + + OutputDataStream(size_t nchans, size_t blocks) : _nchans(nchans), _blocks(blocks) + { + } + + /// Reset output to for new integration virtual void reset(cudaStream_t &_proc_stream) = 0; + /// Swap output buffers virtual void swap() = 0; - virtual void resize(size_t size, size_t blocks) = 0; + // output buffer on the host DoublePinnedHostBuffer<char> _host_power; + + // copy data from internal buffers of the implementation to the host output + // buffer + virtual void data2Host(cudaStream_t &_d2h_stream) = 0; }; // Output data for one gate, single power spectrum struct PowerSpectrumOutput { + PowerSpectrumOutput(size_t size, size_t blocks) + { + BOOST_LOG_TRIVIAL(debug) << "Setting size of power spectrum output size = " << size << ", blocks = " << blocks; + data.resize(size * blocks); + _noOfBitSets.resize(blocks); + } + /// spectrum data DoubleDeviceBuffer<IntegratedPowerType> data; @@ -162,7 +228,7 @@ struct PowerSpectrumOutput /// Reset outptu for new integration void reset(cudaStream_t &_proc_stream) { - thrust::fill(thrust::cuda::par.on(_proc_stream), data.a().begin(),data.a().end(), 0.); + thrust::fill(thrust::cuda::par.on(_proc_stream), data.a().begin(), data.a().end(), 0.); thrust::fill(thrust::cuda::par.on(_proc_stream), _noOfBitSets.a().begin(), _noOfBitSets.a().end(), 0L); } @@ -172,20 +238,21 @@ struct PowerSpectrumOutput data.swap(); _noOfBitSets.swap(); } - - /// Resize all buffers - void resize(size_t size, size_t blocks) - { - data.resize(size * blocks); - _noOfBitSets.resize(blocks); - } }; struct GatedPowerSpectrumOutput : public OutputDataStream { + + GatedPowerSpectrumOutput(size_t nchans, size_t blocks) : OutputDataStream(nchans, blocks), + G0(nchans, blocks), G1(nchans, blocks) + { + // on the host both power are stored in the same data buffer together with + // the number of bit sets + _host_power.resize( 2 * ( _nchans * sizeof(IntegratedPowerType) + sizeof(size_t) ) * G0._noOfBitSets.size()); + } + PowerSpectrumOutput G0, G1; - size_t _nchans; void reset(cudaStream_t &_proc_stream) { @@ -201,19 +268,6 @@ struct GatedPowerSpectrumOutput : public OutputDataStream _host_power.swap(); } - /// Resize all buffers - void resize(size_t size, size_t blocks) - { - // ToDo: size passed in constructor, also number of blocks. - G0.resize(size, blocks); - G1.resize(size, blocks); - _nchans = size; - - // on the host both power are stored in the same data buffer together with - // the number of bit sets - _host_power.resize( 2 * ( size * sizeof(IntegratedPowerType) + sizeof(size_t) ) * G0._noOfBitSets.size()); - } - void data2Host(cudaStream_t &_d2h_stream) { // copy data to host if block is finished @@ -301,9 +355,11 @@ struct FullStokesOutput : public OutputDataStream } }; - +/* struct GatedFullStokesOutput: public OutputDataStream { + + FullStokesOutput G0, G1; void reset(cudaStream_t &_proc_stream) { @@ -324,8 +380,112 @@ struct GatedFullStokesOutput: public OutputDataStream G0.resize(size, blocks); G1.resize(size, blocks); } -}; + void data2Host(cudaStream_t &_d2h_stream) + { + for (size_t i = 0; i < G0._noOfBitSets.size(); i++) + { + size_t memslicesize = (_nchans * sizeof(IntegratedPowerType)); + size_t memOffset = 8 * i * (memslicesize + + sizeof(size_t)); + // Copy II QQ UU VV + CUDA_ERROR_CHECK( + cudaMemcpyAsync(static_cast<void *>(_host_power.a_ptr() + memOffset) , + static_cast<void *>(G0.I.b_ptr() + i * memslicesize), + _nchans * sizeof(IntegratedPowerType), + cudaMemcpyDeviceToHost, _d2h_stream)); + + CUDA_ERROR_CHECK( + cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset + 1 * memslicesize) , + static_cast<void *>(G1.I.b_ptr() + i * memslicesize), + _nchans * sizeof(IntegratedPowerType), + cudaMemcpyDeviceToHost, _d2h_stream)); + + CUDA_ERROR_CHECK( + cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset + 2 * memslicesize) , + static_cast<void *>(G0.Q.b_ptr() + i * memslicesize), + _nchans * sizeof(IntegratedPowerType), + cudaMemcpyDeviceToHost, _d2h_stream)); + + CUDA_ERROR_CHECK( + cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset + 3 * memslicesize) , + static_cast<void *>(G1.Q.b_ptr() + i * memslicesize), + _nchans * sizeof(IntegratedPowerType), + cudaMemcpyDeviceToHost, _d2h_stream)); + + CUDA_ERROR_CHECK( + cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset + 4 * memslicesize) , + static_cast<void *>(G0.U.b_ptr() + i * memslicesize), + _nchans * sizeof(IntegratedPowerType), + cudaMemcpyDeviceToHost, _d2h_stream)); + + CUDA_ERROR_CHECK( + cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset + 5 * memslicesize) , + static_cast<void *>(G1.U.b_ptr() + i * memslicesize), + _nchans * sizeof(IntegratedPowerType), + cudaMemcpyDeviceToHost, _d2h_stream)); + + CUDA_ERROR_CHECK( + cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset + 6 * memslicesize) , + static_cast<void *>(G0.V.b_ptr() + i * memslicesize), + _nchans * sizeof(IntegratedPowerType), + cudaMemcpyDeviceToHost, _d2h_stream)); + + CUDA_ERROR_CHECK( + cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset + 7 * memslicesize) , + static_cast<void *>(G1.V.b_ptr() + i * memslicesize), + _nchans * sizeof(IntegratedPowerType), + cudaMemcpyDeviceToHost, _d2h_stream)); + + // Copy SCI + CUDA_ERROR_CHECK( + cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize), + static_cast<void *>(G0._noOfBitSets.b_ptr() + i ), + 1 * sizeof(size_t), + cudaMemcpyDeviceToHost, _d2h_stream)); + CUDA_ERROR_CHECK( + cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize + 1 * sizeof(size_t)), + static_cast<void *>(G1._noOfBitSets.b_ptr() + i ), + 1 * sizeof(size_t), + cudaMemcpyDeviceToHost, _d2h_stream)); + CUDA_ERROR_CHECK( + cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize + 2 * sizeof(size_t)), + static_cast<void *>(G0._noOfBitSets.b_ptr() + i ), + 1 * sizeof(size_t), + cudaMemcpyDeviceToHost, _d2h_stream)); + CUDA_ERROR_CHECK( + cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize + 3 * sizeof(size_t)), + static_cast<void *>(G1._noOfBitSets.b_ptr() + i ), + 1 * sizeof(size_t), + cudaMemcpyDeviceToHost, _d2h_stream)); + CUDA_ERROR_CHECK( + cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize + 4 * sizeof(size_t)), + static_cast<void *>(G0._noOfBitSets.b_ptr() + i ), + 1 * sizeof(size_t), + cudaMemcpyDeviceToHost, _d2h_stream)); + CUDA_ERROR_CHECK( + cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize + 5 * sizeof(size_t)), + static_cast<void *>(G1._noOfBitSets.b_ptr() + i ), + 1 * sizeof(size_t), + cudaMemcpyDeviceToHost, _d2h_stream)); + CUDA_ERROR_CHECK( + cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize + 6 * sizeof(size_t)), + static_cast<void *>(G0._noOfBitSets.b_ptr() + i ), + 1 * sizeof(size_t), + cudaMemcpyDeviceToHost, _d2h_stream)); + CUDA_ERROR_CHECK( + cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize + 7 * sizeof(size_t)), + static_cast<void *>(G1._noOfBitSets.b_ptr() + i ), + 1 * sizeof(size_t), + cudaMemcpyDeviceToHost, _d2h_stream)); + + } + + + + } + +}; +*/ @@ -335,7 +495,10 @@ struct GatedFullStokesOutput: public OutputDataStream bit set in side channel data. */ -template <class HandlerType > class GatedSpectrometer { +template <class HandlerType, + class InputType, + class OutputType + > class GatedSpectrometer { public: /** * @brief Constructor @@ -384,6 +547,15 @@ public: */ bool operator()(RawBytes &block); + // Processing strategy for single pol mode + void process(PolarizationData *inputDataStream, GatedPowerSpectrumOutput *outputDataStream); + + // Processing strategy for dual pol power mode + //void process(DualPolarizationData*inputDataStream, GatedPowerSpectrumOutput *outputDataStream); + + // Processing strategy for full stokes mode + //void process(DualPolarizationData*inputDataStream, FullStokesOutput *outputDataStream); + private: // gate the data from the input stream and fft data per gate. Number of bit // sets in every gate is stored in the output datasets @@ -408,22 +580,17 @@ private: uint64_t _nchans; uint64_t _nBlocks; uint64_t _call_count; - double _processing_efficiency; std::unique_ptr<Unpacker> _unpacker; - OutputDataStream* outputDataStream; - InputDataStream* inputDataStream; - - std::unique_ptr<DetectorAccumulator<IntegratedPowerType> > _detector; + OutputType* outputDataStream; + InputType* inputDataStream; // Temporary processing block - // ToDo: Use inplace FFT to avoid temporary coltage array + // ToDo: Use inplace FFT to avoid temporary voltage array thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G0; thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G1; - - cudaStream_t _h2d_stream; cudaStream_t _proc_stream; cudaStream_t _d2h_stream; diff --git a/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu b/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu index c9e8c9a7..5c631079 100644 --- a/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu +++ b/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu @@ -17,7 +17,7 @@ namespace psrdada_cpp { namespace effelsberg { namespace edd { -// Reduce thread local vatiable v in shared array x, so that x[0] +// Reduce thread local vatiable v in shared array x, so that x[0] contains sum template<typename T> __device__ void sum_reduce(T *x, const T &v) { @@ -32,9 +32,10 @@ __device__ void sum_reduce(T *x, const T &v) } -// If one of the side channel items is lsot, then both are considered as lost +// If one of the side channel items is lost, then both are considered as lost // here -__global__ void mergeSideChannels(uint64_t* __restrict__ A, uint64_t* __restrict__ B, size_t N) +__global__ void mergeSideChannels(uint64_t* __restrict__ A, uint64_t* + __restrict__ B, size_t N) { for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N); i += blockDim.x * gridDim.x) @@ -146,21 +147,16 @@ __global__ void update_baselines(float* __restrict__ baseLineG0, - - - - - -template <class HandlerType> -GatedSpectrometer<HandlerType>::GatedSpectrometer( - const DadaBufferLayout &dadaBufferLayout, - std::size_t selectedSideChannel, std::size_t selectedBit, std::size_t fft_length, std::size_t naccumulate, - std::size_t nbits, float input_level, float output_level, - HandlerType &handler) : _dadaBufferLayout(dadaBufferLayout), - _selectedSideChannel(selectedSideChannel), _selectedBit(selectedBit), - _fft_length(fft_length), - _naccumulate(naccumulate), _nbits(nbits), _handler(handler), _fft_plan(0), - _call_count(0), _nsamps_per_heap(4096), _processing_efficiency(0.){ +template <class HandlerType, class InputType, class OutputType> +GatedSpectrometer<HandlerType, InputType, OutputType>::GatedSpectrometer( + const DadaBufferLayout &dadaBufferLayout, std::size_t selectedSideChannel, + std::size_t selectedBit, std::size_t fft_length, std::size_t naccumulate, + std::size_t nbits, float input_level, float output_level, HandlerType + &handler) : _dadaBufferLayout(dadaBufferLayout), + _selectedSideChannel(selectedSideChannel), _selectedBit(selectedBit), + _fft_length(fft_length), _naccumulate(naccumulate), _nbits(nbits), + _handler(handler), _fft_plan(0), _call_count(0), _nsamps_per_heap(4096) +{ // Sanity checks assert(((_nbits == 12) || (_nbits == 8))); @@ -184,7 +180,6 @@ GatedSpectrometer<HandlerType>::GatedSpectrometer( assert(selectedBit < 64); // Sanity check of selected bit _nsamps_per_buffer = _dadaBufferLayout.sizeOfData() * 8 / nbits; - _nsamps_per_output_spectra = fft_length * naccumulate; if (_nsamps_per_output_spectra <= _nsamps_per_buffer) { // one buffer block is used for one or multiple output spectra @@ -200,10 +195,12 @@ GatedSpectrometer<HandlerType>::GatedSpectrometer( assert(N * _nsamps_per_buffer == _nsamps_per_output_spectra); _nBlocks = N; } - BOOST_LOG_TRIVIAL(debug) << "Integrating " << _nsamps_per_output_spectra << " samples from " << _nBlocks << " into one spectra."; + BOOST_LOG_TRIVIAL(debug) << "Integrating " << _nsamps_per_output_spectra << + " samples from " << _nBlocks << " into one output spectrum."; _nchans = _fft_length / 2 + 1; int batch = _nsamps_per_buffer / _fft_length; + float dof = 2 * _naccumulate; float scale = std::pow(input_level * std::sqrt(static_cast<float>(_nchans)), 2); @@ -213,27 +210,28 @@ GatedSpectrometer<HandlerType>::GatedSpectrometer( << "Correction factors for 8-bit conversion: offset = " << offset << ", scaling = " << scaling; - BOOST_LOG_TRIVIAL(debug) << "Generating FFT plan"; int n[] = {static_cast<int>(_fft_length)}; + BOOST_LOG_TRIVIAL(debug) << "Generating FFT plan: \n" + << " fft_length = " << _fft_length << "\n" + << " n[0] = " << n[0] << "\n" + << " _nchans = " << _nchans << "\n" + << " batch = " << batch << "\n"; + + + ; CUFFT_ERROR_CHECK(cufftPlanMany(&_fft_plan, 1, n, NULL, 1, _fft_length, NULL, 1, _nchans, CUFFT_R2C, batch)); - cufftSetStream(_fft_plan, _proc_stream); BOOST_LOG_TRIVIAL(debug) << "Allocating memory"; - // if singlePol - inputDataStream = new PolarizationData(); - inputDataStream->resize(_nchans, batch, _dadaBufferLayout); + inputDataStream = new InputType(fft_length, batch, _dadaBufferLayout); _unpacked_voltage_G0.resize(_nsamps_per_buffer); _unpacked_voltage_G1.resize(_nsamps_per_buffer); BOOST_LOG_TRIVIAL(debug) << " Unpacked voltages size (in samples): " << _unpacked_voltage_G0.size(); - outputDataStream = new GatedPowerSpectrumOutput(); - outputDataStream->resize(_nchans, batch / (_naccumulate / _nBlocks)); - - + outputDataStream = new OutputType(_nchans, batch / (_naccumulate / _nBlocks)); CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream)); CUDA_ERROR_CHECK(cudaStreamCreate(&_proc_stream)); @@ -241,14 +239,13 @@ GatedSpectrometer<HandlerType>::GatedSpectrometer( CUFFT_ERROR_CHECK(cufftSetStream(_fft_plan, _proc_stream)); _unpacker.reset(new Unpacker(_proc_stream)); - _detector.reset(new DetectorAccumulator<IntegratedPowerType>(_nchans, _naccumulate / _nBlocks, scaling, - offset, _proc_stream)); } // constructor -template <class HandlerType> -GatedSpectrometer<HandlerType>::~GatedSpectrometer() { +template <class HandlerType, class InputType, class OutputType> +GatedSpectrometer<HandlerType, InputType, OutputType>::~GatedSpectrometer() { BOOST_LOG_TRIVIAL(debug) << "Destroying GatedSpectrometer"; + cudaDeviceSynchronize(); if (!_fft_plan) cufftDestroy(_fft_plan); cudaStreamDestroy(_h2d_stream); @@ -257,14 +254,14 @@ GatedSpectrometer<HandlerType>::~GatedSpectrometer() { } -template <class HandlerType> -void GatedSpectrometer<HandlerType>::init(RawBytes &block) { +template <class HandlerType, class InputType, class OutputType> +void GatedSpectrometer<HandlerType, InputType, OutputType>::init(RawBytes &block) { BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer init called"; std::stringstream headerInfo; headerInfo << "\n" << "# Gated spectrometer parameters: \n" << "fft_length " << _fft_length << "\n" - << "nchannels " << _fft_length << "\n" + << "nchannels " << _fft_length /2 + 1 << "\n" << "naccumulate " << _naccumulate << "\n" << "selected_side_channel " << _selectedSideChannel << "\n" << "selected_bit " << _selectedBit << "\n" @@ -288,8 +285,8 @@ void GatedSpectrometer<HandlerType>::init(RawBytes &block) { -template <class HandlerType> -void GatedSpectrometer<HandlerType>::gated_fft( +template <class HandlerType, class InputType, class OutputType> +void GatedSpectrometer<HandlerType, InputType, OutputType>::gated_fft( PolarizationData &data, thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G0, thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G1 @@ -330,6 +327,8 @@ void GatedSpectrometer<HandlerType>::gated_fft( ); } + CUDA_ERROR_CHECK(cudaDeviceSynchronize()); + CUDA_ERROR_CHECK(cudaDeviceSynchronize()); // only few output blocks per input block thus execution on only one thread. // Important is that the execution is async on the GPU. update_baselines<<<1,1,0, _proc_stream>>>( @@ -342,20 +341,29 @@ void GatedSpectrometer<HandlerType>::gated_fft( _noOfBitSetsIn_G0.size() ); + CUDA_ERROR_CHECK(cudaDeviceSynchronize()); BOOST_LOG_TRIVIAL(debug) << "Performing FFT 1"; + CUDA_ERROR_CHECK(cudaDeviceSynchronize()); + BOOST_LOG_TRIVIAL(debug) << "Accessing unpacked voltage"; UnpackedVoltageType *_unpacked_voltage_ptr = thrust::raw_pointer_cast(_unpacked_voltage_G0.data()); + CUDA_ERROR_CHECK(cudaDeviceSynchronize()); + BOOST_LOG_TRIVIAL(debug) << "Accessing channelized voltage"; ChannelisedVoltageType *_channelised_voltage_ptr = thrust::raw_pointer_cast(data._channelised_voltage_G0.data()); + CUDA_ERROR_CHECK(cudaDeviceSynchronize()); + CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr, (cufftComplex *)_channelised_voltage_ptr)); + CUDA_ERROR_CHECK(cudaDeviceSynchronize()); BOOST_LOG_TRIVIAL(debug) << "Performing FFT 2"; _unpacked_voltage_ptr = thrust::raw_pointer_cast(_unpacked_voltage_G1.data()); _channelised_voltage_ptr = thrust::raw_pointer_cast(data._channelised_voltage_G1.data()); CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr, (cufftComplex *)_channelised_voltage_ptr)); + CUDA_ERROR_CHECK(cudaDeviceSynchronize()); CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream)); BOOST_LOG_TRIVIAL(debug) << "Exit processing"; } // process @@ -365,8 +373,8 @@ void GatedSpectrometer<HandlerType>::gated_fft( -template <class HandlerType> -bool GatedSpectrometer<HandlerType>::operator()(RawBytes &block) { +template <class HandlerType, class InputType, class OutputType> +bool GatedSpectrometer<HandlerType, InputType, OutputType>::operator()(RawBytes &block) { ++_call_count; BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer operator() called (count = " << _call_count << ")"; @@ -382,7 +390,7 @@ bool GatedSpectrometer<HandlerType>::operator()(RawBytes &block) { // Copy data to device CUDA_ERROR_CHECK(cudaStreamSynchronize(_h2d_stream)); inputDataStream->swap(); - inputDataStream->getFromBlock(block, _dadaBufferLayout, _h2d_stream); + inputDataStream->getFromBlock(block, _h2d_stream); if (_call_count == 1) { @@ -402,43 +410,21 @@ bool GatedSpectrometer<HandlerType>::operator()(RawBytes &block) { outputDataStream->reset(_proc_stream); } + BOOST_LOG_TRIVIAL(debug) << "Processing block."; + cudaDeviceSynchronize(); + process(inputDataStream, outputDataStream); + cudaDeviceSynchronize(); + BOOST_LOG_TRIVIAL(debug) << "Processing block finished."; /// For one pol input and power out /// ToDo: For two pol input and power out /// ToDo: For two pol input and stokes out - PolarizationData *polData = dynamic_cast<PolarizationData*>(inputDataStream); - GatedPowerSpectrumOutput *powOut = dynamic_cast<GatedPowerSpectrumOutput*>(outputDataStream); - gated_fft(*polData, powOut->G0._noOfBitSets.a(), powOut->G1._noOfBitSets.a()); - - -// float2 const* input_ptr; -// IntegratedPowerType * output_ptr; -// = thrust::raw_pointer_cast(input.data()); -// T * output_ptr = thrust::raw_pointer_cast(output.data()); -// input_ptr = - kernels::detect_and_accumulate<IntegratedPowerType> <<<1024, 1024, 0, _proc_stream>>>( - thrust::raw_pointer_cast(polData->_channelised_voltage_G0.data()), - thrust::raw_pointer_cast(powOut->G0.data.a().data()), - _nchans, - polData->_channelised_voltage_G0.size() / _nchans, - _naccumulate / _nBlocks, - 1, 0., 1, 0); - - kernels::detect_and_accumulate<IntegratedPowerType> <<<1024, 1024, 0, _proc_stream>>>( - thrust::raw_pointer_cast(polData->_channelised_voltage_G1.data()), - thrust::raw_pointer_cast(powOut->G1.data.a().data()), - _nchans, - polData->_channelised_voltage_G1.size() / _nchans, - _naccumulate / _nBlocks, - 1, 0., 1, 0); - - CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream)); if ((_call_count == 2) || (!newBlock)) { return false; } - powOut->data2Host(_d2h_stream); + outputDataStream->data2Host(_d2h_stream); if (_call_count == 3) { return false; } @@ -468,9 +454,9 @@ bool GatedSpectrometer<HandlerType>::operator()(RawBytes &block) { // BOOST_LOG_TRIVIAL(info) << "Total processing efficiency of this buffer block:" << std::setprecision(6) << efficiency << ". Run average: " << average << " (Trend: " << std::showpos << (average - prev_average) << ")"; // // // Wrap in a RawBytes object here; - RawBytes bytes(reinterpret_cast<char *>(powOut->_host_power.b_ptr()), - powOut->_host_power.size(), - powOut->_host_power.size()); + RawBytes bytes(reinterpret_cast<char *>(outputDataStream->_host_power.b_ptr()), + outputDataStream->_host_power.size(), + outputDataStream->_host_power.size()); BOOST_LOG_TRIVIAL(debug) << "Calling handler"; // The handler can't do anything asynchronously without a copy here // as it would be unsafe (given that it does not own the memory it @@ -480,6 +466,34 @@ bool GatedSpectrometer<HandlerType>::operator()(RawBytes &block) { return false; // } // operator () + + +template <class HandlerType, class InputType, class OutputType> +void GatedSpectrometer<HandlerType, InputType, OutputType>::process(PolarizationData *inputDataStream, GatedPowerSpectrumOutput *outputDataStream) +{ + gated_fft(*inputDataStream, outputDataStream->G0._noOfBitSets.a(), outputDataStream->G1._noOfBitSets.a()); + + kernels::detect_and_accumulate<IntegratedPowerType> <<<1024, 1024, 0, _proc_stream>>>( + thrust::raw_pointer_cast(inputDataStream->_channelised_voltage_G0.data()), + thrust::raw_pointer_cast(outputDataStream->G0.data.a().data()), + _nchans, + inputDataStream->_channelised_voltage_G0.size() / _nchans, + _naccumulate / _nBlocks, + 1, 0., 1, 0); + + kernels::detect_and_accumulate<IntegratedPowerType> <<<1024, 1024, 0, _proc_stream>>>( + thrust::raw_pointer_cast(inputDataStream->_channelised_voltage_G1.data()), + thrust::raw_pointer_cast(outputDataStream->G1.data.a().data()), + _nchans, + inputDataStream->_channelised_voltage_G1.size() / _nchans, + _naccumulate / _nBlocks, + 1, 0., 1, 0); + + CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream)); +} + + + } // edd } // effelsberg } // psrdada_cpp diff --git a/psrdada_cpp/effelsberg/edd/detail/GatedStokesSpectrometer.cu b/psrdada_cpp/effelsberg/edd/detail/GatedStokesSpectrometer.cu index e67bfaa8..afcb8e17 100644 --- a/psrdada_cpp/effelsberg/edd/detail/GatedStokesSpectrometer.cu +++ b/psrdada_cpp/effelsberg/edd/detail/GatedStokesSpectrometer.cu @@ -148,7 +148,7 @@ __global__ void update_baselines(float* __restrict__ baseLineG0, -template <class HandlerType> +template <class HandlerType, class InputType> GatedStokesSpectrometer<HandlerType>::GatedStokesSpectrometer( const DadaBufferLayout &dadaBufferLayout, std::size_t selectedSideChannel, std::size_t selectedBit, std::size_t fft_length, std::size_t naccumulate, @@ -250,7 +250,7 @@ GatedStokesSpectrometer<HandlerType>::GatedStokesSpectrometer( -template <class HandlerType> +template <class HandlerType, class InputType> GatedStokesSpectrometer<HandlerType>::~GatedStokesSpectrometer() { BOOST_LOG_TRIVIAL(debug) << "Destroying GatedStokesSpectrometer"; if (!_fft_plan) @@ -262,7 +262,7 @@ GatedStokesSpectrometer<HandlerType>::~GatedStokesSpectrometer() { -template <class HandlerType> +template <class HandlerType, class InputType> void GatedStokesSpectrometer<HandlerType>::init(RawBytes &block) { BOOST_LOG_TRIVIAL(debug) << "GatedStokesSpectrometer init called"; std::stringstream headerInfo; @@ -293,7 +293,7 @@ void GatedStokesSpectrometer<HandlerType>::init(RawBytes &block) { -template <class HandlerType> +template <class HandlerType, class InputType> void GatedStokesSpectrometer<HandlerType>::gated_fft( PolarizationData &data, thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G0, @@ -366,7 +366,7 @@ void GatedStokesSpectrometer<HandlerType>::gated_fft( } // process -template <class HandlerType> +template <class HandlerType, class InputType> bool GatedStokesSpectrometer<HandlerType>::operator()(RawBytes &block) { ++_call_count; BOOST_LOG_TRIVIAL(debug) << "GatedStokesSpectrometer operator() called (count = " diff --git a/psrdada_cpp/effelsberg/edd/src/DadaBufferLayout.cpp b/psrdada_cpp/effelsberg/edd/src/DadaBufferLayout.cpp index abbf5b97..12acc268 100644 --- a/psrdada_cpp/effelsberg/edd/src/DadaBufferLayout.cpp +++ b/psrdada_cpp/effelsberg/edd/src/DadaBufferLayout.cpp @@ -20,10 +20,12 @@ DadaBufferLayout::DadaBufferLayout(key_t input_key, size_t heapSize, size_t nSid _gapSize = (_bufferSize - _nHeaps * totalHeapSize); _dataBlockBytes = _nHeaps * _heapSize; - BOOST_LOG_TRIVIAL(debug) << "Memory configuration of dada buffer: \n" + BOOST_LOG_TRIVIAL(debug) << "Memory configuration of dada buffer '" << _input_key << "' with " << nSideChannels << " side channels items and heapsize " << heapSize << " byte: \n" + << " total size of buffer: " << _bufferSize << " byte\n" << " number of heaps per buffer: " << _nHeaps << "\n" + << " datablock size in buffer: " << _dataBlockBytes << " byte\n" << " resulting gap: " << _gapSize << " byte\n" - << " datablock size in buffer: " << _dataBlockBytes << " byte\n"; + << " size of sidechannel data: " << _sideChannelSize << " byte\n"; } key_t DadaBufferLayout::getInputkey() const diff --git a/psrdada_cpp/effelsberg/edd/src/GatedSpectrometer_cli.cu b/psrdada_cpp/effelsberg/edd/src/GatedSpectrometer_cli.cu index bb1f876e..4abf9426 100644 --- a/psrdada_cpp/effelsberg/edd/src/GatedSpectrometer_cli.cu +++ b/psrdada_cpp/effelsberg/edd/src/GatedSpectrometer_cli.cu @@ -38,7 +38,7 @@ void launchSpectrometer(const effelsberg::edd::DadaBufferLayout if (output_type == "file") { SimpleFileWriter sink(filename); - effelsberg::edd::GatedSpectrometer<decltype(sink)> spectrometer(dadaBufferLayout, + effelsberg::edd::GatedSpectrometer<decltype(sink), effelsberg::edd::PolarizationData, effelsberg::edd::GatedPowerSpectrumOutput> spectrometer(dadaBufferLayout, selectedSideChannel, selectedBit, fft_length, naccumulate, nbits, input_level, output_level, sink); @@ -50,7 +50,7 @@ void launchSpectrometer(const effelsberg::edd::DadaBufferLayout else if (output_type == "dada") { DadaOutputStream sink(string_to_key(filename), log); - effelsberg::edd::GatedSpectrometer<decltype(sink)> spectrometer(dadaBufferLayout, + effelsberg::edd::GatedSpectrometer<decltype(sink), effelsberg::edd::PolarizationData, effelsberg::edd::GatedPowerSpectrumOutput> spectrometer(dadaBufferLayout, selectedSideChannel, selectedBit, fft_length, naccumulate, nbits, input_level, output_level, sink); @@ -62,7 +62,7 @@ void launchSpectrometer(const effelsberg::edd::DadaBufferLayout else if (output_type == "profile") { NullSink sink; - effelsberg::edd::GatedSpectrometer<decltype(sink)> spectrometer(dadaBufferLayout, + effelsberg::edd::GatedSpectrometer<decltype(sink), effelsberg::edd::PolarizationData, effelsberg::edd::GatedPowerSpectrumOutput> spectrometer(dadaBufferLayout, selectedSideChannel, selectedBit, fft_length, naccumulate, nbits, input_level, output_level, sink); -- GitLab