diff --git a/psrdada_cpp/effelsberg/edd/DetectorAccumulator.cuh b/psrdada_cpp/effelsberg/edd/DetectorAccumulator.cuh index 82e7d3cdf89136face34313de39b94907dae6343..6302d105d6dd02c35460e3130648b4ee3322effa 100644 --- a/psrdada_cpp/effelsberg/edd/DetectorAccumulator.cuh +++ b/psrdada_cpp/effelsberg/edd/DetectorAccumulator.cuh @@ -31,7 +31,7 @@ void detect_and_accumulate(float2 const* __restrict__ in, int8_t* __restrict__ o sum += x + y; } size_t toff = out_offset * nchans + currentOutputSpectra * nchans *stride; - out[toff + i] = (int8_t) ((sum - offset)/scale); + out[toff + i] += (int8_t) ((sum - offset)/scale); } } @@ -56,7 +56,7 @@ void detect_and_accumulate(float2 const* __restrict__ in, float* __restrict__ ou sum += x + y; } size_t toff = out_offset * nchans + currentOutputSpectra * nchans * stride; - out[i + toff] = sum; + out[i + toff] += sum; } } diff --git a/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh b/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh index c7c4f92105d29ae5d7fee0de1e51fdd4024fea2c..56e6ff7c00416f952eb98a453f9de78791bb1345 100644 --- a/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh +++ b/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh @@ -107,6 +107,8 @@ private: std::size_t _gapSize; std::size_t _dataBlockBytes; std::size_t _batch; + std::size_t _nsamps_per_output_spectra; + std::size_t _nsamps_per_buffer; HandlerType &_handler; cufftHandle _fft_plan; @@ -115,20 +117,21 @@ private: std::unique_ptr<Unpacker> _unpacker; std::unique_ptr<DetectorAccumulator<IntegratedPowerType> > _detector; + // Input data DoubleDeviceBuffer<RawVoltageType> _raw_voltage_db; - DoubleDeviceBuffer<IntegratedPowerType> _power_db; DoubleDeviceBuffer<int64_t> _sideChannelData_db; + + // Output data + DoubleDeviceBuffer<IntegratedPowerType> _power_db; DoubleDeviceBuffer<size_t> _noOfBitSetsInSideChannel; - size_t _noOfBitSetsInSideChannel_host [2]; + DoublePinnedHostBuffer<char> _host_power_db; + // Intermediate process steps thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G0; thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G1; thrust::device_vector<ChannelisedVoltageType> _channelised_voltage; - thrust::device_vector<UnpackedVoltageType> _baseLineN; - DoublePinnedHostBuffer<char> _host_power_db; - cudaStream_t _h2d_stream; cudaStream_t _proc_stream; cudaStream_t _d2h_stream; diff --git a/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu b/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu index c5e8a9be357aaf3f5b366a702944f0cbf6530010..fde76c4c21256ab5a14aa48f535d91aa71dc1e4c 100644 --- a/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu +++ b/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu @@ -2,8 +2,10 @@ #include "psrdada_cpp/common.hpp" #include "psrdada_cpp/cuda_utils.hpp" #include "psrdada_cpp/raw_bytes.hpp" + #include <cuda.h> #include <cuda_profiler_api.h> +#include <thrust/system/cuda/execution_policy.h> #include <iostream> #include <cstring> @@ -135,6 +137,7 @@ GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer( _nHeaps = buffer_bytes / _totalHeapSize; _gapSize = (buffer_bytes - _nHeaps * _totalHeapSize); _dataBlockBytes = _nHeaps * _speadHeapSize; + assert((nSideChannels == 0) || (selectedSideChannel < nSideChannels)); // Sanity check of side channel value @@ -145,10 +148,29 @@ GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer( << " resulting gap: " << _gapSize << " byte\n" << " datablock size in buffer: " << _dataBlockBytes << " byte\n"; - std::size_t nsamps_per_buffer = _dataBlockBytes * 8 / nbits; + _nsamps_per_buffer = _dataBlockBytes * 8 / nbits; + + _nsamps_per_output_spectra = fft_length * naccumulate; + int nBlocks; + if (_nsamps_per_output_spectra <= _nsamps_per_buffer) + { // one buffer block is used for one or multiple output spectra + size_t N = _nsamps_per_buffer / _nsamps_per_output_spectra; + // All data in one block has to be used + assert(N * _nsamps_per_output_spectra == _nsamps_per_buffer); + nBlocks = 1; + } + else + { // multiple blocks are integrated intoone output + size_t N = _nsamps_per_output_spectra / _nsamps_per_buffer; + // All data in multiple blocks has to be used + assert(N * _nsamps_per_buffer == _nsamps_per_output_spectra); + nBlocks = N; + } + BOOST_LOG_TRIVIAL(debug) << "Integrating " << _nsamps_per_output_spectra << " samples from " << nBlocks << " into one spectra."; + std::size_t n64bit_words = _dataBlockBytes / sizeof(uint64_t); _nchans = _fft_length / 2 + 1; - int batch = nsamps_per_buffer / _fft_length; + int batch = _nsamps_per_buffer / _fft_length; float dof = 2 * _naccumulate; float scale = std::pow(input_level * std::sqrt(static_cast<float>(_nchans)), 2); @@ -169,8 +191,8 @@ GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer( _sideChannelData_db.resize(_sideChannelSize * _nHeaps); BOOST_LOG_TRIVIAL(debug) << " Input voltages size (in 64-bit words): " << _raw_voltage_db.size(); - _unpacked_voltage_G0.resize(nsamps_per_buffer); - _unpacked_voltage_G1.resize(nsamps_per_buffer); + _unpacked_voltage_G0.resize(_nsamps_per_buffer); + _unpacked_voltage_G1.resize(_nsamps_per_buffer); _baseLineN.resize(array_sum_Nthreads); BOOST_LOG_TRIVIAL(debug) << " Unpacked voltages size (in samples): " @@ -178,11 +200,17 @@ GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer( _channelised_voltage.resize(_nchans * batch); BOOST_LOG_TRIVIAL(debug) << " Channelised voltages size: " << _channelised_voltage.size(); - _power_db.resize(_nchans * batch / _naccumulate * 2); // hold on and off spectra to simplify output - BOOST_LOG_TRIVIAL(debug) << " Powers size: " << _power_db.size() / 2; + _power_db.resize(_nchans * batch / (_naccumulate / nBlocks) * 2); // hold on and off spectra to simplify output + thrust::fill(_power_db.a().begin(), _power_db.a().end(), 0.); + thrust::fill(_power_db.b().begin(), _power_db.b().end(), 0.); + BOOST_LOG_TRIVIAL(debug) << " Powers size: " << _power_db.size() / 2; + + _noOfBitSetsInSideChannel.resize( batch / (_naccumulate / nBlocks)); + thrust::fill( _noOfBitSetsInSideChannel.a().begin(), _noOfBitSetsInSideChannel.a().end(), 0.); + thrust::fill( _noOfBitSetsInSideChannel.b().begin(), _noOfBitSetsInSideChannel.b().end(), 0.); + // on the host both power are stored in the same data buffer together with // the number of bit sets - _noOfBitSetsInSideChannel.resize( batch / _naccumulate); _host_power_db.resize( _power_db.size() * sizeof(IntegratedPowerType) + 2 * sizeof(size_t) * _noOfBitSetsInSideChannel.size()); CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream)); @@ -191,7 +219,7 @@ GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer( CUFFT_ERROR_CHECK(cufftSetStream(_fft_plan, _proc_stream)); _unpacker.reset(new Unpacker(_proc_stream)); - _detector.reset(new DetectorAccumulator<IntegratedPowerType>(_nchans, _naccumulate, scaling, + _detector.reset(new DetectorAccumulator<IntegratedPowerType>(_nchans, _naccumulate / nBlocks, scaling, offset, _proc_stream)); } // constructor @@ -329,22 +357,32 @@ bool GatedSpectrometer<HandlerType, IntegratedPowerType>::operator()(RawBytes &b if (_call_count == 1) { return false; } + // process data - //process data - _power_db.swap(); - _noOfBitSetsInSideChannel.swap(); + // only if a newblock is started the output buffer is swapped. Otherwise the + // new data is added to it + bool newBlock = false; + if (((_call_count-1) * _nsamps_per_buffer) % _nsamps_per_output_spectra == 0) // _call_count -1 because this is the block number on the device + { + BOOST_LOG_TRIVIAL(debug) << "Starting new output block."; + newBlock = true; + _power_db.swap(); + _noOfBitSetsInSideChannel.swap(); + // move to specific stream! + thrust::fill(_power_db.a().begin(), _power_db.a().end(), 0.); + thrust::fill( _noOfBitSetsInSideChannel.a().begin(), _noOfBitSetsInSideChannel.a().end(), 0.); + } process(_raw_voltage_db.b(), _sideChannelData_db.b(), _power_db.a(), _noOfBitSetsInSideChannel.a()); CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream)); - if (_call_count == 2) { + if ((_call_count == 2) || (!newBlock)) { return false; } - //copy data to host + // copy data to host if block is finished CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream)); _host_power_db.swap(); - std::swap(_noOfBitSetsInSideChannel_host[0], _noOfBitSetsInSideChannel_host[1]); for (size_t i = 0; i < _noOfBitSetsInSideChannel.size(); i++) { @@ -361,7 +399,7 @@ bool GatedSpectrometer<HandlerType, IntegratedPowerType>::operator()(RawBytes &b static_cast<void *>(_noOfBitSetsInSideChannel.b_ptr() + i ), 1 * sizeof(size_t), cudaMemcpyDeviceToHost, _d2h_stream)); -} + } BOOST_LOG_TRIVIAL(debug) << "Copy Data back to host"; @@ -370,7 +408,7 @@ bool GatedSpectrometer<HandlerType, IntegratedPowerType>::operator()(RawBytes &b } // calculate off value - BOOST_LOG_TRIVIAL(info) << "Buffer block: " << _call_count << " with " << _noOfBitSetsInSideChannel.size() << " output heaps:"; + BOOST_LOG_TRIVIAL(info) << "Buffer block: " << _call_count << " with " << _noOfBitSetsInSideChannel.size() << " output heaps:"; for (size_t i = 0; i < _noOfBitSetsInSideChannel.size(); i++) { size_t memOffset = 2 * i * (_nchans * sizeof(IntegratedPowerType) + sizeof(size_t)); @@ -382,10 +420,6 @@ bool GatedSpectrometer<HandlerType, IntegratedPowerType>::operator()(RawBytes &b BOOST_LOG_TRIVIAL(info) << " " << i << ": No of bit set in side channel: " << *on_values << " / " << *off_values << std::endl; } - - // call handler - - // Wrap in a RawBytes object here; RawBytes bytes(reinterpret_cast<char *>(_host_power_db.b_ptr()), _host_power_db.size(),