From f2332816fc23b888d1fcdbc3449e40a46bd49cfe Mon Sep 17 00:00:00 2001 From: Tobias Winchen <tobias.winchen@rwth-aachen.de> Date: Thu, 14 May 2020 14:02:27 +0200 Subject: [PATCH] Added stokes mode to gated_cli --- .../effelsberg/edd/DadaBufferLayout.hpp | 2 + .../effelsberg/edd/GatedSpectrometer.cuh | 138 +++++++---- .../edd/detail/GatedSpectrometer.cu | 171 +++++++------ .../effelsberg/edd/src/DadaBufferLayout.cpp | 12 +- .../effelsberg/edd/src/GatedSpectrometer.cu | 96 ++++++-- .../edd/src/GatedSpectrometer_cli.cu | 187 +++++++++----- .../edd/test/src/GatedSpectrometerTest.cu | 230 +++++++++--------- 7 files changed, 512 insertions(+), 324 deletions(-) diff --git a/psrdada_cpp/effelsberg/edd/DadaBufferLayout.hpp b/psrdada_cpp/effelsberg/edd/DadaBufferLayout.hpp index 96cf5980..02e0ca3e 100644 --- a/psrdada_cpp/effelsberg/edd/DadaBufferLayout.hpp +++ b/psrdada_cpp/effelsberg/edd/DadaBufferLayout.hpp @@ -27,7 +27,9 @@ class DadaBufferLayout // input key of the dadad buffer // size of spead heaps in bytes // number of side channels + DadaBufferLayout(); DadaBufferLayout(key_t input_key , size_t heapSize, size_t nSideChannels); + void intitialize(key_t input_key , size_t heapSize, size_t nSideChannels); key_t getInputkey() const; diff --git a/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh b/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh index aa578153..68d4c6e1 100644 --- a/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh +++ b/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh @@ -44,14 +44,8 @@ typedef float IntegratedPowerType; /// Input data and intermediate processing data for one polarization struct PolarizationData { - DadaBufferLayout _dadaBufferLayout; - size_t _fft_length; - size_t _batch; - - // a buffer contains batch * fft_length samples - PolarizationData(size_t fft_length, size_t batch, const DadaBufferLayout - &dadaBufferLayout); - + size_t _nbits; + PolarizationData(size_t nbits): _nbits(nbits) {}; /// Raw ADC Voltage DoubleDeviceBuffer<RawVoltageType> _raw_voltage; /// Side channel data @@ -75,25 +69,49 @@ struct PolarizationData /// Swaps input buffers void swap(); + ///resize the internal buffers + void resize(size_t rawVolttageBufferBytes, size_t nsidechannelitems, size_t channelized_samples); +}; + + +struct SinglePolarizationInput : public PolarizationData +{ + DadaBufferLayout _dadaBufferLayout; + size_t _fft_length; + size_t _batch; + + // a buffer contains batch * fft_length samples + SinglePolarizationInput(size_t fft_length, size_t _batch, size_t nbits, + const DadaBufferLayout &dadaBufferLayout); + void getFromBlock(RawBytes &block, cudaStream_t &_h2d_stream); + size_t getSamplesPerInputPolarization() + { + return _dadaBufferLayout.sizeOfData() * 8 / _nbits; + } }; /// Input data and intermediate processing data for two polarizations -struct DualPolarizationData +struct DualPolarizationInput { + DadaBufferLayout _dadaBufferLayout; + size_t _fft_length; + size_t _batch; + PolarizationData polarization0, polarization1; - DualPolarizationData(size_t fft_length, size_t batch, const DadaBufferLayout + DualPolarizationInput(size_t fft_length, size_t batch, size_t nbit, const DadaBufferLayout &dadaBufferLayout); - DadaBufferLayout _dadaBufferLayout; - - PolarizationData polarization0, polarization1; void swap(); void getFromBlock(RawBytes &block, cudaStream_t &_h2d_stream); + size_t getSamplesPerInputPolarization() + { + return _dadaBufferLayout.sizeOfData() * 8 / polarization0._nbits / 2; + } }; @@ -190,7 +208,7 @@ struct FullStokesOutput } /// Resize all buffers - void resize(size_t size, size_t blocks) + FullStokesOutput(size_t size, size_t blocks) { I.resize(size * blocks); Q.resize(size * blocks); @@ -200,10 +218,17 @@ struct FullStokesOutput } }; -/* + struct GatedFullStokesOutput: public OutputDataStream { + GatedFullStokesOutput(size_t nchans, size_t blocks): OutputDataStream(nchans, blocks / 2), G0(nchans, blocks / 2), +G1(nchans, blocks / 2) + { + BOOST_LOG_TRIVIAL(debug) << "Output with " << _blocks << " blocks a " << _nchans << " channels"; + _host_power.resize( 8 * ( _nchans * sizeof(IntegratedPowerType) + sizeof(size_t) ) * _blocks); + BOOST_LOG_TRIVIAL(debug) << "Allocated " << _host_power.size() << " bytes."; + }; FullStokesOutput G0, G1; void reset(cudaStream_t &_proc_stream) @@ -217,108 +242,104 @@ struct GatedFullStokesOutput: public OutputDataStream { G0.swap(); G1.swap(); + _host_power.swap(); } - /// Resize all buffers - void resize(size_t size, size_t blocks) - { - G0.resize(size, blocks); - G1.resize(size, blocks); - } + void data2Host(cudaStream_t &_d2h_stream) { for (size_t i = 0; i < G0._noOfBitSets.size(); i++) { size_t memslicesize = (_nchans * sizeof(IntegratedPowerType)); - size_t memOffset = 8 * i * (memslicesize + + sizeof(size_t)); + size_t memOffset = 8 * i * (memslicesize + sizeof(size_t)); // Copy II QQ UU VV CUDA_ERROR_CHECK( cudaMemcpyAsync(static_cast<void *>(_host_power.a_ptr() + memOffset) , - static_cast<void *>(G0.I.b_ptr() + i * memslicesize), + static_cast<void *>(G0.I.b_ptr() + i * _nchans), _nchans * sizeof(IntegratedPowerType), cudaMemcpyDeviceToHost, _d2h_stream)); CUDA_ERROR_CHECK( - cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset + 1 * memslicesize) , - static_cast<void *>(G1.I.b_ptr() + i * memslicesize), + cudaMemcpyAsync(static_cast<void *>(_host_power.a_ptr() + memOffset + 1 * memslicesize) , + static_cast<void *>(G1.I.b_ptr() + i * _nchans), _nchans * sizeof(IntegratedPowerType), cudaMemcpyDeviceToHost, _d2h_stream)); CUDA_ERROR_CHECK( - cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset + 2 * memslicesize) , - static_cast<void *>(G0.Q.b_ptr() + i * memslicesize), + cudaMemcpyAsync(static_cast<void *>(_host_power.a_ptr() + memOffset + 2 * memslicesize) , + static_cast<void *>(G0.Q.b_ptr() + i * _nchans), _nchans * sizeof(IntegratedPowerType), cudaMemcpyDeviceToHost, _d2h_stream)); CUDA_ERROR_CHECK( - cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset + 3 * memslicesize) , - static_cast<void *>(G1.Q.b_ptr() + i * memslicesize), + cudaMemcpyAsync(static_cast<void *>(_host_power.a_ptr() + memOffset + 3 * memslicesize) , + static_cast<void *>(G1.Q.b_ptr() + i * _nchans), _nchans * sizeof(IntegratedPowerType), cudaMemcpyDeviceToHost, _d2h_stream)); CUDA_ERROR_CHECK( - cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset + 4 * memslicesize) , - static_cast<void *>(G0.U.b_ptr() + i * memslicesize), + cudaMemcpyAsync(static_cast<void *>(_host_power.a_ptr() + memOffset + 4 * memslicesize) , + static_cast<void *>(G0.U.b_ptr() + i * _nchans), _nchans * sizeof(IntegratedPowerType), cudaMemcpyDeviceToHost, _d2h_stream)); CUDA_ERROR_CHECK( - cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset + 5 * memslicesize) , - static_cast<void *>(G1.U.b_ptr() + i * memslicesize), + cudaMemcpyAsync(static_cast<void *>(_host_power.a_ptr() + memOffset + 5 * memslicesize) , + static_cast<void *>(G1.U.b_ptr() + i * _nchans), _nchans * sizeof(IntegratedPowerType), cudaMemcpyDeviceToHost, _d2h_stream)); CUDA_ERROR_CHECK( - cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset + 6 * memslicesize) , - static_cast<void *>(G0.V.b_ptr() + i * memslicesize), + cudaMemcpyAsync(static_cast<void *>(_host_power.a_ptr() + memOffset + 6 * memslicesize) , + static_cast<void *>(G0.V.b_ptr() + i * _nchans), _nchans * sizeof(IntegratedPowerType), cudaMemcpyDeviceToHost, _d2h_stream)); CUDA_ERROR_CHECK( - cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset + 7 * memslicesize) , - static_cast<void *>(G1.V.b_ptr() + i * memslicesize), + cudaMemcpyAsync(static_cast<void *>(_host_power.a_ptr() + memOffset + 7 * memslicesize) , + static_cast<void *>(G1.V.b_ptr() + i * _nchans), _nchans * sizeof(IntegratedPowerType), cudaMemcpyDeviceToHost, _d2h_stream)); // Copy SCI CUDA_ERROR_CHECK( - cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize), + cudaMemcpyAsync( static_cast<void *>(_host_power.a_ptr() + memOffset + 8 * memslicesize), static_cast<void *>(G0._noOfBitSets.b_ptr() + i ), 1 * sizeof(size_t), cudaMemcpyDeviceToHost, _d2h_stream)); CUDA_ERROR_CHECK( - cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize + 1 * sizeof(size_t)), + cudaMemcpyAsync( static_cast<void *>(_host_power.a_ptr() + memOffset + 8 * memslicesize + 1 * sizeof(size_t)), static_cast<void *>(G1._noOfBitSets.b_ptr() + i ), 1 * sizeof(size_t), cudaMemcpyDeviceToHost, _d2h_stream)); CUDA_ERROR_CHECK( - cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize + 2 * sizeof(size_t)), + cudaMemcpyAsync( static_cast<void *>(_host_power.a_ptr() + memOffset + 8 * memslicesize + 2 * sizeof(size_t)), static_cast<void *>(G0._noOfBitSets.b_ptr() + i ), 1 * sizeof(size_t), cudaMemcpyDeviceToHost, _d2h_stream)); CUDA_ERROR_CHECK( - cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize + 3 * sizeof(size_t)), + cudaMemcpyAsync( static_cast<void *>(_host_power.a_ptr() + memOffset + 8 * memslicesize + 3 * sizeof(size_t)), static_cast<void *>(G1._noOfBitSets.b_ptr() + i ), 1 * sizeof(size_t), cudaMemcpyDeviceToHost, _d2h_stream)); CUDA_ERROR_CHECK( - cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize + 4 * sizeof(size_t)), + cudaMemcpyAsync( static_cast<void *>(_host_power.a_ptr() + memOffset + 8 * memslicesize + 4 * sizeof(size_t)), static_cast<void *>(G0._noOfBitSets.b_ptr() + i ), 1 * sizeof(size_t), cudaMemcpyDeviceToHost, _d2h_stream)); CUDA_ERROR_CHECK( - cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize + 5 * sizeof(size_t)), + cudaMemcpyAsync( static_cast<void *>(_host_power.a_ptr() + memOffset + 8 * memslicesize + 5 * sizeof(size_t)), static_cast<void *>(G1._noOfBitSets.b_ptr() + i ), 1 * sizeof(size_t), cudaMemcpyDeviceToHost, _d2h_stream)); CUDA_ERROR_CHECK( - cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize + 6 * sizeof(size_t)), + cudaMemcpyAsync( static_cast<void *>(_host_power.a_ptr() + memOffset + 8 * memslicesize + 6 * sizeof(size_t)), static_cast<void *>(G0._noOfBitSets.b_ptr() + i ), 1 * sizeof(size_t), cudaMemcpyDeviceToHost, _d2h_stream)); CUDA_ERROR_CHECK( - cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize + 7 * sizeof(size_t)), + cudaMemcpyAsync( static_cast<void *>(_host_power.a_ptr() + memOffset + 8 * memslicesize + 7 * sizeof(size_t)), static_cast<void *>(G1._noOfBitSets.b_ptr() + i ), 1 * sizeof(size_t), cudaMemcpyDeviceToHost, _d2h_stream)); @@ -330,7 +351,7 @@ struct GatedFullStokesOutput: public OutputDataStream } }; -*/ + @@ -392,16 +413,16 @@ public: */ bool operator()(RawBytes &block); +private: // Processing strategy for single pol mode - void process(PolarizationData *inputDataStream, GatedPowerSpectrumOutput *outputDataStream); + void process(SinglePolarizationInput *inputDataStream, GatedPowerSpectrumOutput *outputDataStream); // Processing strategy for dual pol power mode - //void process(DualPolarizationData*inputDataStream, GatedPowerSpectrumOutput *outputDataStream); + //void process(DualPolarizationInput*inputDataStream, GatedPowerSpectrumOutput *outputDataStream); // Processing strategy for full stokes mode - //void process(DualPolarizationData*inputDataStream, FullStokesOutput *outputDataStream); + void process(DualPolarizationInput *inputDataStream, GatedFullStokesOutput *outputDataStream); -private: // gate the data from the input stream and fft data per gate. Number of bit // sets in every gate is stored in the output datasets void gated_fft(PolarizationData &data, @@ -412,12 +433,9 @@ private: DadaBufferLayout _dadaBufferLayout; std::size_t _fft_length; std::size_t _naccumulate; - std::size_t _nbits; std::size_t _selectedSideChannel; std::size_t _selectedBit; std::size_t _batch; - std::size_t _nsamps_per_output_spectra; - std::size_t _nsamps_per_buffer; std::size_t _nsamps_per_heap; HandlerType &_handler; @@ -478,6 +496,20 @@ __global__ void gating(float *G0, float *G1, const int64_t *sideChannelData, uint64_cu* stats_G1); +/** + * @brief calculate stokes IQUV from two complex valuies for each polarization + */ +__host__ __device__ void stokes_IQUV(const float2 &p1, const float2 &p2, float &I, float &Q, float &U, float &V); + + +/** + * @brief calculate stokes IQUV spectra pol1, pol2 are arrays of naccumulate + * complex spectra for individual polarizations + */ +__global__ void stokes_accumulate(float2 const __restrict__ *pol1, + float2 const __restrict__ *pol2, float *I, float* Q, float *U, float*V, + int nchans, int naccumulate); + } // edd diff --git a/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu b/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu index ad42df3d..d3df8cbd 100644 --- a/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu +++ b/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu @@ -12,6 +12,7 @@ #include <iomanip> #include <cstring> #include <sstream> +#include <typeinfo> namespace psrdada_cpp { namespace effelsberg { @@ -69,12 +70,12 @@ GatedSpectrometer<HandlerType, InputType, OutputType>::GatedSpectrometer( std::size_t nbits, float input_level, float output_level, HandlerType &handler) : _dadaBufferLayout(dadaBufferLayout), _selectedSideChannel(selectedSideChannel), _selectedBit(selectedBit), - _fft_length(fft_length), _naccumulate(naccumulate), _nbits(nbits), + _fft_length(fft_length), _naccumulate(naccumulate), _handler(handler), _fft_plan(0), _call_count(0), _nsamps_per_heap(4096) { // Sanity checks - assert(((_nbits == 12) || (_nbits == 8))); + assert(((nbits == 12) || (nbits == 8))); assert(_naccumulate > 0); // check for any device errors @@ -94,28 +95,10 @@ GatedSpectrometer<HandlerType, InputType, OutputType>::GatedSpectrometer( (selectedSideChannel < _dadaBufferLayout.getNSideChannels())); // Sanity check of side channel value assert(selectedBit < 64); // Sanity check of selected bit - _nsamps_per_buffer = _dadaBufferLayout.sizeOfData() * 8 / nbits; - _nsamps_per_output_spectra = fft_length * naccumulate; - if (_nsamps_per_output_spectra <= _nsamps_per_buffer) - { // one buffer block is used for one or multiple output spectra - size_t N = _nsamps_per_buffer / _nsamps_per_output_spectra; - // All data in one block has to be used - assert(N * _nsamps_per_output_spectra == _nsamps_per_buffer); - _nBlocks = 1; - } - else - { // multiple blocks are integrated intoone output - size_t N = _nsamps_per_output_spectra / _nsamps_per_buffer; - // All data in multiple blocks has to be used - assert(N * _nsamps_per_buffer == _nsamps_per_output_spectra); - _nBlocks = N; - } - BOOST_LOG_TRIVIAL(debug) << "Integrating " << _nsamps_per_output_spectra << - " samples from " << _nBlocks << " into one output spectrum."; _nchans = _fft_length / 2 + 1; - int batch = _nsamps_per_buffer / _fft_length; + // Calculate the scaling parameters for 8 bit output float dof = 2 * _naccumulate; float scale = std::pow(input_level * std::sqrt(static_cast<float>(_nchans)), 2); @@ -125,26 +108,45 @@ GatedSpectrometer<HandlerType, InputType, OutputType>::GatedSpectrometer( << "Correction factors for 8-bit conversion: offset = " << offset << ", scaling = " << scaling; + // plan the FFT + size_t nsamps_per_buffer = _dadaBufferLayout.sizeOfData() * 8 / nbits; + int batch = nsamps_per_buffer / _fft_length; int n[] = {static_cast<int>(_fft_length)}; BOOST_LOG_TRIVIAL(debug) << "Generating FFT plan: \n" << " fft_length = " << _fft_length << "\n" << " n[0] = " << n[0] << "\n" << " _nchans = " << _nchans << "\n" << " batch = " << batch << "\n"; - - - ; CUFFT_ERROR_CHECK(cufftPlanMany(&_fft_plan, 1, n, NULL, 1, _fft_length, NULL, 1, _nchans, CUFFT_R2C, batch)); - BOOST_LOG_TRIVIAL(debug) << "Allocating memory"; + inputDataStream = new InputType(fft_length, batch, nbits, _dadaBufferLayout); - inputDataStream = new InputType(fft_length, batch, _dadaBufferLayout); + //How many output spectra per input block? + size_t nsamps_per_output_spectra = fft_length * naccumulate; + + size_t nsamps_per_pol = inputDataStream->getSamplesPerInputPolarization(); + if (nsamps_per_output_spectra <= nsamps_per_pol) + { // one buffer block is used for one or multiple output spectra + size_t N = nsamps_per_pol / nsamps_per_output_spectra; + // All data in one block has to be used + assert(N * nsamps_per_output_spectra == nsamps_per_pol); + _nBlocks = 1; + } + else + { // multiple blocks are integrated intoone output + size_t N = nsamps_per_output_spectra / nsamps_per_pol; + // All data in multiple blocks has to be used + assert(N * nsamps_per_pol == nsamps_per_output_spectra); + _nBlocks = N; + } + BOOST_LOG_TRIVIAL(debug) << "Integrating " << nsamps_per_output_spectra << + " samples from " << _nBlocks << "blocks into one output spectrum."; - _unpacked_voltage_G0.resize(_nsamps_per_buffer); - _unpacked_voltage_G1.resize(_nsamps_per_buffer); - BOOST_LOG_TRIVIAL(debug) << " Unpacked voltages size (in samples): " - << _unpacked_voltage_G0.size(); + // We unpack one pol at a time + _unpacked_voltage_G0.resize(nsamps_per_pol); + _unpacked_voltage_G1.resize(nsamps_per_pol); + BOOST_LOG_TRIVIAL(debug) << " Unpacked voltages size (in samples): " << _unpacked_voltage_G0.size(); outputDataStream = new OutputType(_nchans, batch / (_naccumulate / _nBlocks)); @@ -160,9 +162,12 @@ GatedSpectrometer<HandlerType, InputType, OutputType>::GatedSpectrometer( template <class HandlerType, class InputType, class OutputType> GatedSpectrometer<HandlerType, InputType, OutputType>::~GatedSpectrometer() { BOOST_LOG_TRIVIAL(debug) << "Destroying GatedSpectrometer"; - cudaDeviceSynchronize(); - if (!_fft_plan) + if (_fft_plan) cufftDestroy(_fft_plan); + + delete inputDataStream; + delete outputDataStream; + cudaStreamDestroy(_h2d_stream); cudaStreamDestroy(_proc_stream); cudaStreamDestroy(_d2h_stream); @@ -176,11 +181,20 @@ void GatedSpectrometer<HandlerType, InputType, OutputType>::init(RawBytes &block headerInfo << "\n" << "# Gated spectrometer parameters: \n" << "fft_length " << _fft_length << "\n" - << "nchannels " << _fft_length /2 + 1 << "\n" + << "nchannels " << _nchans << "\n" << "naccumulate " << _naccumulate << "\n" << "selected_side_channel " << _selectedSideChannel << "\n" << "selected_bit " << _selectedBit << "\n" - << "output_bit_depth " << sizeof(IntegratedPowerType) * 8; + << "output_bit_depth " << sizeof(IntegratedPowerType) * 8 << "\n" + << "full_stokes_output "; + if (typeid(OutputType) == typeid(GatedFullStokesOutput)) + { + headerInfo << "yes\n"; + } + else + { + headerInfo << "no\n"; + } size_t bEnd = std::strlen(block.ptr()); if (bEnd + headerInfo.str().size() < block.total_bytes()) @@ -208,7 +222,7 @@ void GatedSpectrometer<HandlerType, InputType, OutputType>::gated_fft( ) { BOOST_LOG_TRIVIAL(debug) << "Unpacking raw voltages"; - switch (_nbits) { + switch (data._nbits) { case 8: _unpacker->unpack<8>(data._raw_voltage.b(), _unpacked_voltage_G0); break; @@ -242,8 +256,6 @@ void GatedSpectrometer<HandlerType, InputType, OutputType>::gated_fft( ); } - CUDA_ERROR_CHECK(cudaDeviceSynchronize()); - CUDA_ERROR_CHECK(cudaDeviceSynchronize()); // only few output blocks per input block thus execution on only one thread. // Important is that the execution is async on the GPU. update_baselines<<<1,1,0, _proc_stream>>>( @@ -256,29 +268,23 @@ void GatedSpectrometer<HandlerType, InputType, OutputType>::gated_fft( _noOfBitSetsIn_G0.size() ); - CUDA_ERROR_CHECK(cudaDeviceSynchronize()); BOOST_LOG_TRIVIAL(debug) << "Performing FFT 1"; - CUDA_ERROR_CHECK(cudaDeviceSynchronize()); BOOST_LOG_TRIVIAL(debug) << "Accessing unpacked voltage"; UnpackedVoltageType *_unpacked_voltage_ptr = thrust::raw_pointer_cast(_unpacked_voltage_G0.data()); - CUDA_ERROR_CHECK(cudaDeviceSynchronize()); BOOST_LOG_TRIVIAL(debug) << "Accessing channelized voltage"; ChannelisedVoltageType *_channelised_voltage_ptr = thrust::raw_pointer_cast(data._channelised_voltage_G0.data()); - CUDA_ERROR_CHECK(cudaDeviceSynchronize()); CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr, (cufftComplex *)_channelised_voltage_ptr)); - CUDA_ERROR_CHECK(cudaDeviceSynchronize()); BOOST_LOG_TRIVIAL(debug) << "Performing FFT 2"; _unpacked_voltage_ptr = thrust::raw_pointer_cast(_unpacked_voltage_G1.data()); _channelised_voltage_ptr = thrust::raw_pointer_cast(data._channelised_voltage_G1.data()); CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr, (cufftComplex *)_channelised_voltage_ptr)); - CUDA_ERROR_CHECK(cudaDeviceSynchronize()); CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream)); BOOST_LOG_TRIVIAL(debug) << "Exit processing"; } // process @@ -314,21 +320,21 @@ bool GatedSpectrometer<HandlerType, InputType, OutputType>::operator()(RawBytes // process data // check if new outblock is started: _call_count -1 because this is the block number on the device - bool newBlock = (((_call_count-1) * _nsamps_per_buffer) % _nsamps_per_output_spectra == 0); + bool newBlock = ((((_call_count-1) * inputDataStream->getSamplesPerInputPolarization()) % (_fft_length * _naccumulate) ) == 0); // only if a newblock is started the output buffer is swapped. Otherwise the // new data is added to it if (newBlock) { BOOST_LOG_TRIVIAL(debug) << "Starting new output block."; + CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream)); outputDataStream->swap(); outputDataStream->reset(_proc_stream); } BOOST_LOG_TRIVIAL(debug) << "Processing block."; - cudaDeviceSynchronize(); process(inputDataStream, outputDataStream); - cudaDeviceSynchronize(); + CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream)); BOOST_LOG_TRIVIAL(debug) << "Processing block finished."; /// For one pol input and power out /// ToDo: For two pol input and power out @@ -339,36 +345,12 @@ bool GatedSpectrometer<HandlerType, InputType, OutputType>::operator()(RawBytes return false; } - outputDataStream->data2Host(_d2h_stream); + outputDataStream->data2Host(_d2h_stream); if (_call_count == 3) { return false; } -// // calculate off value -// BOOST_LOG_TRIVIAL(info) << "Buffer block: " << _call_count-3 << " with " << _noOfBitSetsIn_G0.size() << "x2 output heaps:"; -// size_t total_samples_lost = 0; -// for (size_t i = 0; i < _noOfBitSetsIn_G0.size(); i++) -// { -// size_t memOffset = 2 * i * (_nchans * sizeof(IntegratedPowerType) + sizeof(size_t)); -// -// size_t* on_values = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType)); -// size_t* off_values = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType) + sizeof(size_t)); -// -// size_t samples_lost = _nsamps_per_output_spectra - (*on_values) - (*off_values); -// total_samples_lost += samples_lost; -// -// BOOST_LOG_TRIVIAL(info) << " Heap " << i << ":\n" -// <<" Samples with bit set : " << *on_values << std::endl -// <<" Samples without bit set: " << *off_values << std::endl -// <<" Samples lost : " << samples_lost << " out of " << _nsamps_per_output_spectra << std::endl; -// } -// double efficiency = 1. - double(total_samples_lost) / (_nsamps_per_output_spectra * _noOfBitSetsIn_G0.size()); -// double prev_average = _processing_efficiency / (_call_count- 3 - 1); -// _processing_efficiency += efficiency; -// double average = _processing_efficiency / (_call_count-3); -// BOOST_LOG_TRIVIAL(info) << "Total processing efficiency of this buffer block:" << std::setprecision(6) << efficiency << ". Run average: " << average << " (Trend: " << std::showpos << (average - prev_average) << ")"; -// -// // Wrap in a RawBytes object here; + // Wrap in a RawBytes object here; RawBytes bytes(reinterpret_cast<char *>(outputDataStream->_host_power.b_ptr()), outputDataStream->_host_power.size(), outputDataStream->_host_power.size()); @@ -384,7 +366,7 @@ bool GatedSpectrometer<HandlerType, InputType, OutputType>::operator()(RawBytes template <class HandlerType, class InputType, class OutputType> -void GatedSpectrometer<HandlerType, InputType, OutputType>::process(PolarizationData *inputDataStream, GatedPowerSpectrumOutput *outputDataStream) +void GatedSpectrometer<HandlerType, InputType, OutputType>::process(SinglePolarizationInput *inputDataStream, GatedPowerSpectrumOutput *outputDataStream) { gated_fft(*inputDataStream, outputDataStream->G0._noOfBitSets.a(), outputDataStream->G1._noOfBitSets.a()); @@ -404,10 +386,51 @@ void GatedSpectrometer<HandlerType, InputType, OutputType>::process(Polarization _naccumulate / _nBlocks, 1, 0., 1, 0); - CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream)); } +template <class HandlerType, class InputType, class OutputType> +void GatedSpectrometer<HandlerType, InputType, OutputType>::process(DualPolarizationInput *inputDataStream, GatedFullStokesOutput *outputDataStream) +{ + mergeSideChannels<<<1024, 1024, 0, _proc_stream>>>(thrust::raw_pointer_cast(inputDataStream->polarization0._sideChannelData.a().data()), + thrust::raw_pointer_cast(inputDataStream->polarization1._sideChannelData.a().data()), inputDataStream->polarization1._sideChannelData.a().size()); + + gated_fft(inputDataStream->polarization0, outputDataStream->G0._noOfBitSets.a(), outputDataStream->G1._noOfBitSets.a()); + gated_fft(inputDataStream->polarization1, outputDataStream->G0._noOfBitSets.a(), outputDataStream->G1._noOfBitSets.a()); + + stokes_accumulate<<<1024, 1024, 0, _proc_stream>>>( + thrust::raw_pointer_cast(inputDataStream->polarization0._channelised_voltage_G0.data()), + thrust::raw_pointer_cast(inputDataStream->polarization1._channelised_voltage_G0.data()), + thrust::raw_pointer_cast(outputDataStream->G0.I.a().data()), + thrust::raw_pointer_cast(outputDataStream->G0.Q.a().data()), + thrust::raw_pointer_cast(outputDataStream->G0.U.a().data()), + thrust::raw_pointer_cast(outputDataStream->G0.V.a().data()), + _nchans, _naccumulate + ); + + stokes_accumulate<<<1024, 1024, 0, _proc_stream>>>( + thrust::raw_pointer_cast(inputDataStream->polarization0._channelised_voltage_G1.data()), + thrust::raw_pointer_cast(inputDataStream->polarization1._channelised_voltage_G1.data()), + thrust::raw_pointer_cast(outputDataStream->G1.I.a().data()), + thrust::raw_pointer_cast(outputDataStream->G1.Q.a().data()), + thrust::raw_pointer_cast(outputDataStream->G1.U.a().data()), + thrust::raw_pointer_cast(outputDataStream->G1.V.a().data()), + _nchans, _naccumulate + ); + + //thrust::fill(thrust::cuda::par.on(_proc_stream),outputDataStream->G0.I.a().begin(), outputDataStream->G0.I.a().end(), _call_count); + //thrust::fill(thrust::cuda::par.on(_proc_stream),outputDataStream->G0.Q.a().begin(), outputDataStream->G0.Q.a().end(), _call_count); + //thrust::fill(thrust::cuda::par.on(_proc_stream),outputDataStream->G0.U.a().begin(), outputDataStream->G0.U.a().end(), _call_count); + //thrust::fill(thrust::cuda::par.on(_proc_stream),outputDataStream->G0.V.a().begin(), outputDataStream->G0.V.a().end(), _call_count); + + + // thrust::fill(thrust::cuda::par.on(_proc_stream),outputDataStream->G1.I.a().begin(), outputDataStream->G1.I.a().end(), _call_count); + //thrust::fill(thrust::cuda::par.on(_proc_stream),outputDataStream->G1.Q.a().begin(), outputDataStream->G1.Q.a().end(), _call_count); + //thrust::fill(thrust::cuda::par.on(_proc_stream),outputDataStream->G1.U.a().begin(), outputDataStream->G1.U.a().end(), _call_count); + //thrust::fill(thrust::cuda::par.on(_proc_stream),outputDataStream->G1.V.a().begin(), outputDataStream->G1.V.a().end(), _call_count); + + cudaDeviceSynchronize(); +} } // edd } // effelsberg diff --git a/psrdada_cpp/effelsberg/edd/src/DadaBufferLayout.cpp b/psrdada_cpp/effelsberg/edd/src/DadaBufferLayout.cpp index 12acc268..88a2fb14 100644 --- a/psrdada_cpp/effelsberg/edd/src/DadaBufferLayout.cpp +++ b/psrdada_cpp/effelsberg/edd/src/DadaBufferLayout.cpp @@ -7,9 +7,19 @@ namespace psrdada_cpp { namespace effelsberg { namespace edd { +DadaBufferLayout::DadaBufferLayout() {}; -DadaBufferLayout::DadaBufferLayout(key_t input_key, size_t heapSize, size_t nSideChannels) : _input_key(input_key), _heapSize(heapSize), _nSideChannels(nSideChannels) +DadaBufferLayout::DadaBufferLayout(key_t input_key, size_t heapSize, size_t nSideChannels) { + intitialize(input_key, heapSize, nSideChannels); +} + +void DadaBufferLayout::intitialize(key_t input_key, size_t heapSize, size_t nSideChannels) +{ + _input_key = input_key; + _heapSize = heapSize; + _nSideChannels = nSideChannels; + MultiLog log("DadaBufferLayout"); DadaClientBase client(input_key, log); _bufferSize = client.data_buffer_size(); diff --git a/psrdada_cpp/effelsberg/edd/src/GatedSpectrometer.cu b/psrdada_cpp/effelsberg/edd/src/GatedSpectrometer.cu index a86a2ee4..5924c3fb 100644 --- a/psrdada_cpp/effelsberg/edd/src/GatedSpectrometer.cu +++ b/psrdada_cpp/effelsberg/edd/src/GatedSpectrometer.cu @@ -119,24 +119,76 @@ __global__ void update_baselines(float* __restrict__ baseLineG0, +/** + * @brief calculate stokes IQUV from two complex valuies for each polarization + */ +__host__ __device__ void stokes_IQUV(const float2 &p1, const float2 &p2, float &I, float &Q, float &U, float &V) +{ + I = fabs(p1.x*p1.x + p1.y * p1.y) + fabs(p2.x*p2.x + p2.y * p2.y); + Q = fabs(p1.x*p1.x + p1.y * p1.y) - fabs(p2.x*p2.x + p2.y * p2.y); + U = 2 * (p1.x*p2.x + p1.y * p2.y); + V = -2 * (p1.y*p2.x - p1.x * p2.y); +} + + + +/** + * @brief calculate stokes IQUV spectra pol1, pol2 are arrays of naccumulate + * complex spectra for individual polarizations + */ +__global__ void stokes_accumulate(float2 const __restrict__ *pol1, + float2 const __restrict__ *pol2, float *I, float* Q, float *U, float*V, + int nchans, int naccumulate) +{ + + for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < nchans); + i += blockDim.x * gridDim.x) + { + float rI = 0; + float rQ = 0; + float rU = 0; + float rV = 0; + + for (int k=0; k < naccumulate; k++) + { + const float2 p1 = pol1[i + k * nchans]; + const float2 p2 = pol2[i + k * nchans]; + + rI += fabs(p1.x * p1.x + p1.y * p1.y) + fabs(p2.x * p2.x + p2.y * p2.y); + rQ += fabs(p1.x * p1.x + p1.y * p1.y) - fabs(p2.x * p2.x + p2.y * p2.y); + rU += 2.f * (p1.x * p2.x + p1.y * p2.y); + rV += -2.f * (p1.y * p2.x - p1.x * p2.y); + } + I[i] += rI; + Q[i] += rQ; + U[i] += rU; + V[i] += rV; + } +} -PolarizationData::PolarizationData(size_t fft_length, size_t batch, const DadaBufferLayout - &dadaBufferLayout) : _fft_length(fft_length), _batch(batch), _dadaBufferLayout(dadaBufferLayout) +void PolarizationData::resize(size_t rawVolttageBufferBytes, size_t nsidechannelitems, size_t channelized_samples) { - _raw_voltage.resize(_dadaBufferLayout.sizeOfData() / sizeof(uint64_t)); + _raw_voltage.resize(rawVolttageBufferBytes / sizeof(uint64_t)); BOOST_LOG_TRIVIAL(debug) << " Input voltages size (in 64-bit words): " << _raw_voltage.size(); _baseLineG0.resize(1); _baseLineG0_update.resize(1); _baseLineG1.resize(1); _baseLineG1_update.resize(1); - _channelised_voltage_G0.resize((_fft_length / 2 + 1) * _batch); - _channelised_voltage_G1.resize((_fft_length / 2 + 1) * _batch); - _sideChannelData.resize(_dadaBufferLayout.getNSideChannels() * _dadaBufferLayout.getNHeaps()); + _channelised_voltage_G0.resize(channelized_samples); + _channelised_voltage_G1.resize(channelized_samples); + _sideChannelData.resize(nsidechannelitems); BOOST_LOG_TRIVIAL(debug) << " Channelised voltages size: " << _channelised_voltage_G0.size(); +} + + +SinglePolarizationInput::SinglePolarizationInput(size_t fft_length, size_t batch, size_t nbits, const DadaBufferLayout + &dadaBufferLayout) : PolarizationData(nbits), _fft_length(fft_length), _batch(batch), _dadaBufferLayout(dadaBufferLayout) +{ + resize(_dadaBufferLayout.sizeOfData(), _dadaBufferLayout.getNSideChannels() * _dadaBufferLayout.getNHeaps(), (_fft_length / 2 + 1) * _batch); }; @@ -147,7 +199,7 @@ void PolarizationData::swap() } -void PolarizationData::getFromBlock(RawBytes &block, cudaStream_t &_h2d_stream) +void SinglePolarizationInput::getFromBlock(RawBytes &block, cudaStream_t &_h2d_stream) { BOOST_LOG_TRIVIAL(debug) << " block.used_bytes() = " << block.used_bytes() << ", dataBlockBytes = " << _dadaBufferLayout.sizeOfData() << "\n"; @@ -168,19 +220,26 @@ void PolarizationData::getFromBlock(RawBytes &block, cudaStream_t &_h2d_stream) } -DualPolarizationData::DualPolarizationData(size_t fft_length, size_t batch, const DadaBufferLayout - &dadaBufferLayout) : polarization0(fft_length, batch, dadaBufferLayout), - polarization1(fft_length, batch, dadaBufferLayout), - _dadaBufferLayout(dadaBufferLayout) +DualPolarizationInput::DualPolarizationInput(size_t fft_length, size_t batch, size_t nbits, const DadaBufferLayout + &dadaBufferLayout) : _fft_length(fft_length), + _batch(batch), + polarization0(nbits), + polarization1(nbits), + _dadaBufferLayout(dadaBufferLayout) { + polarization0.resize(_dadaBufferLayout.sizeOfData() / 2, _dadaBufferLayout.getNSideChannels() * _dadaBufferLayout.getNHeaps() / 2, (_fft_length / 2 + 1) * _batch); + polarization1.resize(_dadaBufferLayout.sizeOfData() / 2, _dadaBufferLayout.getNSideChannels() * _dadaBufferLayout.getNHeaps() / 2, (_fft_length / 2 + 1) * _batch); }; -void DualPolarizationData::swap() + +void DualPolarizationInput::swap() { - polarization0.swap(); polarization1.swap(); + polarization0.swap(); + polarization1.swap(); } -void DualPolarizationData::getFromBlock(RawBytes &block, cudaStream_t &_h2d_stream) + +void DualPolarizationInput::getFromBlock(RawBytes &block, cudaStream_t &_h2d_stream) { // Copy the data with stride to the GPU: // CPU: P1P2P1P2P1P2 ... @@ -199,7 +258,7 @@ CUDA_ERROR_CHECK(cudaMemcpy2DAsync( CUDA_ERROR_CHECK(cudaMemcpy2DAsync( static_cast<void *>(polarization1._raw_voltage.a_ptr()), heapsize_bytes, - static_cast<void *>(block.ptr()) + heapsize_bytes, + static_cast<void *>(block.ptr() + heapsize_bytes), 2 * heapsize_bytes, heapsize_bytes, _dadaBufferLayout.sizeOfData() / heapsize_bytes/ 2, cudaMemcpyHostToDevice, _h2d_stream)); @@ -292,13 +351,13 @@ void GatedPowerSpectrumOutput::data2Host(cudaStream_t &_d2h_stream) // copy 2x channel data CUDA_ERROR_CHECK( cudaMemcpyAsync(static_cast<void *>(_host_power.a_ptr() + memOffset) , - static_cast<void *>(G0.data.b_ptr() + i * memslicesize), + static_cast<void *>(G0.data.b_ptr() + i * _nchans), _nchans * sizeof(IntegratedPowerType), cudaMemcpyDeviceToHost, _d2h_stream)); CUDA_ERROR_CHECK( cudaMemcpyAsync(static_cast<void *>(_host_power.a_ptr() + memOffset + 1 * memslicesize) , - static_cast<void *>(G1.data.b_ptr() + i * memslicesize), + static_cast<void *>(G1.data.b_ptr() + i * _nchans), _nchans * sizeof(IntegratedPowerType), cudaMemcpyDeviceToHost, _d2h_stream)); @@ -321,4 +380,7 @@ void GatedPowerSpectrumOutput::data2Host(cudaStream_t &_d2h_stream) + + + }}} // namespace diff --git a/psrdada_cpp/effelsberg/edd/src/GatedSpectrometer_cli.cu b/psrdada_cpp/effelsberg/edd/src/GatedSpectrometer_cli.cu index 4abf9426..8b0606f2 100644 --- a/psrdada_cpp/effelsberg/edd/src/GatedSpectrometer_cli.cu +++ b/psrdada_cpp/effelsberg/edd/src/GatedSpectrometer_cli.cu @@ -24,51 +24,68 @@ const size_t ERROR_UNHANDLED_EXCEPTION = 2; } // namespace -template<typename T> -void launchSpectrometer(const effelsberg::edd::DadaBufferLayout - &dadaBufferLayout, const std::string &output_type, - const std::string &filename, - size_t selectedSideChannel, size_t selectedBit, - size_t fft_length, size_t naccumulate, unsigned int nbits, - float input_level, float output_level) +struct GatedSpectrometerInputParameters +{ + effelsberg::edd::DadaBufferLayout dadaBufferLayout; + size_t selectedSideChannel; + size_t speadHeapSize; + size_t nSideChannels; + size_t selectedBit; + size_t fft_length; + size_t naccumulate; + unsigned int nbits; + float input_level; + float output_level; + std::string filename; + std::string output_type; +}; + + + +template<typename T, + class InputType, + class OutputType + > +void launchSpectrometer(const GatedSpectrometerInputParameters &i) { MultiLog log("DadaBufferLayout"); - std::cout << "Running with output_type: " << output_type << std::endl; - if (output_type == "file") + std::cout << "Running with output_type: " << i.output_type << std::endl; + if (i.output_type == "file") { - SimpleFileWriter sink(filename); - effelsberg::edd::GatedSpectrometer<decltype(sink), effelsberg::edd::PolarizationData, effelsberg::edd::GatedPowerSpectrumOutput> spectrometer(dadaBufferLayout, - selectedSideChannel, selectedBit, - fft_length, naccumulate, nbits, input_level, - output_level, sink); + SimpleFileWriter sink(i.filename); + effelsberg::edd::GatedSpectrometer<decltype(sink), InputType, OutputType> + spectrometer(i.dadaBufferLayout, + i.selectedSideChannel, i.selectedBit, + i.fft_length, i.naccumulate, i.nbits, i.input_level, + i.output_level, sink); - DadaInputStream<decltype(spectrometer)> istream(dadaBufferLayout.getInputkey(), log, + DadaInputStream<decltype(spectrometer)> istream(i.dadaBufferLayout.getInputkey(), log, spectrometer); istream.start(); } - else if (output_type == "dada") + else if (i.output_type == "dada") { - DadaOutputStream sink(string_to_key(filename), log); - effelsberg::edd::GatedSpectrometer<decltype(sink), effelsberg::edd::PolarizationData, effelsberg::edd::GatedPowerSpectrumOutput> spectrometer(dadaBufferLayout, - selectedSideChannel, selectedBit, - fft_length, naccumulate, nbits, input_level, - output_level, sink); + DadaOutputStream sink(string_to_key(i.filename), log); + effelsberg::edd::GatedSpectrometer<decltype(sink), InputType, OutputType> spectrometer(i.dadaBufferLayout, + i.selectedSideChannel, i.selectedBit, + i.fft_length, i.naccumulate, i.nbits, i.input_level, + i.output_level, sink); - DadaInputStream<decltype(spectrometer)> istream(dadaBufferLayout.getInputkey(), log, + DadaInputStream<decltype(spectrometer)> istream(i.dadaBufferLayout.getInputkey(), log, spectrometer); istream.start(); } - else if (output_type == "profile") + else if (i.output_type == "profile") { NullSink sink; - effelsberg::edd::GatedSpectrometer<decltype(sink), effelsberg::edd::PolarizationData, effelsberg::edd::GatedPowerSpectrumOutput> spectrometer(dadaBufferLayout, - selectedSideChannel, selectedBit, - fft_length, naccumulate, nbits, input_level, - output_level, sink); + effelsberg::edd::GatedSpectrometer<decltype(sink), InputType, OutputType> spectrometer(i.dadaBufferLayout, + i.selectedSideChannel, i.selectedBit, + i.fft_length, i.naccumulate, i.nbits, i.input_level, + i.output_level, sink); - std::vector<char> buffer(dadaBufferLayout.getBufferSize()); - cudaHostRegister(buffer.data(), buffer.size(),cudaHostRegisterPortable); + std::vector<char> buffer(i.dadaBufferLayout.getBufferSize()); + cudaHostRegister(buffer.data(), buffer.size(), cudaHostRegisterPortable); RawBytes ib(buffer.data(), buffer.size(), buffer.size()); spectrometer.init(ib); for (int i =0; i< 10; i++) @@ -85,25 +102,45 @@ void launchSpectrometer(const effelsberg::edd::DadaBufferLayout } +template<typename T> void io_eval(const GatedSpectrometerInputParameters &inputParameters, const std::string &input_polarizations, const std::string &output_format) +{ + if (input_polarizations == "Single" && output_format == "Power") + { + launchSpectrometer<T, effelsberg::edd::SinglePolarizationInput, + effelsberg::edd::GatedPowerSpectrumOutput>(inputParameters); + } + else if (input_polarizations == "Dual" && output_format == "Power") + { + throw std::runtime_error("Not implemented yet."); + } + else if (input_polarizations == "Dual" && output_format == "Stokes") + { + launchSpectrometer<T, effelsberg::edd::DualPolarizationInput, + effelsberg::edd::GatedFullStokesOutput>(inputParameters); + } + else + { + throw std::runtime_error("Not implemented yet."); + } + +} + + + + int main(int argc, char **argv) { try { key_t input_key; - int fft_length; - int naccumulate; - unsigned int nbits; - size_t nSideChannels; - size_t selectedSideChannel; - size_t selectedBit; - size_t speadHeapSize; - float input_level; - float output_level; + + GatedSpectrometerInputParameters ip; std::time_t now = std::time(NULL); std::tm *ptm = std::localtime(&now); - char buffer[32]; - std::strftime(buffer, 32, "%Y-%m-%d-%H:%M:%S.bp", ptm); - std::string filename(buffer); - std::string output_type = "file"; + char default_filename[32]; + std::strftime(default_filename, 32, "%Y-%m-%d-%H:%M:%S.bp", ptm); + + std::string input_polarizations = "Single"; + std::string output_format = "Power"; unsigned int output_bit_depth; /** Define and parse the program options @@ -119,54 +156,62 @@ int main(int argc, char **argv) { "The shared memory key for the dada buffer to connect to (hex " "string)"); desc.add_options()( - "output_type", po::value<std::string>(&output_type)->default_value(output_type), + "output_type", po::value<std::string>(&ip.output_type)->default_value("file"), "output type [dada, file, profile]. Default is file. Profile executes the spectrometer 10x on random data and passes the ouput to a null sink." ); desc.add_options()( - "output_bit_depth", po::value<unsigned int>(&output_bit_depth)->default_value(8), + "output_bit_depth", po::value<unsigned int>(&output_bit_depth)->default_value(32), "output_bit_depth [8, 32]. Default is 32." ); desc.add_options()( - "output_key,o", po::value<std::string>(&filename)->default_value(filename), + "output_key,o", po::value<std::string>(&ip.filename)->default_value(default_filename), "The key of the output bnuffer / name of the output file to write spectra " "to"); + desc.add_options()( + "input_polarizations,p", po::value<std::string>(&input_polarizations)->default_value(input_polarizations), + "Single, Dual"); + desc.add_options()( + "output_format,f", po::value<std::string>(&output_format)->default_value(output_format), + "Power, Stokes (requires dual poalriation input)"); + + desc.add_options()("nsidechannelitems,s", po::value<size_t>()->default_value(1)->notifier( - [&nSideChannels](size_t in) { nSideChannels = in; }), + [&ip](size_t in) { ip.nSideChannels = in; }), "Number of side channel items ( s >= 1)"); desc.add_options()( "selected_sidechannel,e", po::value<size_t>()->default_value(0)->notifier( - [&selectedSideChannel](size_t in) { selectedSideChannel = in; }), + [&ip](size_t in) { ip.selectedSideChannel = in; }), "Side channel selected for evaluation."); desc.add_options()("selected_bit,B", po::value<size_t>()->default_value(0)->notifier( - [&selectedBit](size_t in) { selectedBit = in; }), + [&ip](size_t in) { ip.selectedBit = in; }), "Side channel selected for evaluation."); desc.add_options()("speadheap_size", po::value<size_t>()->default_value(4096)->notifier( - [&speadHeapSize](size_t in) { speadHeapSize = in; }), + [&ip](size_t in) { ip.speadHeapSize = in; }), "size of the spead data heaps. The number of the " "heaps in the dada block depends on the number of " "side channel items."); - desc.add_options()("nbits,b", po::value<unsigned int>(&nbits)->required(), + + desc.add_options()("nbits,b", po::value<unsigned int>(&ip.nbits)->required(), "The number of bits per sample in the " "packetiser output (8 or 12)"); - - - - desc.add_options()("fft_length,n", po::value<int>(&fft_length)->required(), + desc.add_options()("fft_length,n", po::value<size_t>(&ip.fft_length)->required(), "The length of the FFT to perform on the data"); desc.add_options()("naccumulate,a", - po::value<int>(&naccumulate)->required(), + po::value<size_t>(&ip.naccumulate)->required(), "The number of samples to integrate in each channel"); desc.add_options()("input_level", - po::value<float>(&input_level)->required(), + po::value<float>()->default_value(100.)->notifier( + [&ip](float in) { ip.input_level = in; }), "The input power level (standard " "deviation, used for 8-bit conversion)"); desc.add_options()("output_level", - po::value<float>(&output_level)->required(), + po::value<float>()->default_value(100.)->notifier( + [&ip](float in) { ip.output_level = in; }), "The output power level (standard " "deviation, used for 8-bit " "conversion)"); @@ -191,12 +236,22 @@ int main(int argc, char **argv) { } po::notify(vm); - if (vm.count("output_type") && (!(output_type == "dada" || output_type == "file" || output_type== "profile") )) + if (vm.count("output_type") && (!(ip.output_type == "dada" || ip.output_type == "file" || ip.output_type== "profile") )) { - throw po::validation_error(po::validation_error::invalid_option_value, "output_type", output_type); + throw po::validation_error(po::validation_error::invalid_option_value, "output_type", ip.output_type); } - if (!(nSideChannels >= 1)) + if (vm.count("input_polarizations") && (!(input_polarizations == "Single" || input_polarizations == "Dual") )) + { + throw po::validation_error(po::validation_error::invalid_option_value, "input_polarizations", input_polarizations); + } + + if (vm.count("output_format") && (!(output_format == "Power" || output_format == "Stokes") )) + { + throw po::validation_error(po::validation_error::invalid_option_value, "output_format", output_format); + } + + if (!(ip.nSideChannels >= 1)) { throw po::validation_error(po::validation_error::invalid_option_value, "Number of side channels must be 1 or larger!"); } @@ -207,14 +262,18 @@ int main(int argc, char **argv) { return ERROR_IN_COMMAND_LINE; } - effelsberg::edd::DadaBufferLayout bufferLayout(input_key, speadHeapSize, nSideChannels); + if ((output_format == "Stokes") && (input_polarizations != "Dual")) + { + throw po::validation_error(po::validation_error::invalid_option_value, "Stokes output requires dual polarization input!"); + } + + ip.dadaBufferLayout.intitialize(input_key, ip.speadHeapSize, ip.nSideChannels); // ToDo: Supprot only single output depth if (output_bit_depth == 32) { - launchSpectrometer<float>(bufferLayout, output_type, filename, - selectedSideChannel, selectedBit, - fft_length, naccumulate, nbits, input_level, output_level); + + io_eval<float>(ip, input_polarizations, output_format); } else { diff --git a/psrdada_cpp/effelsberg/edd/test/src/GatedSpectrometerTest.cu b/psrdada_cpp/effelsberg/edd/test/src/GatedSpectrometerTest.cu index 9983d260..495375dd 100644 --- a/psrdada_cpp/effelsberg/edd/test/src/GatedSpectrometerTest.cu +++ b/psrdada_cpp/effelsberg/edd/test/src/GatedSpectrometerTest.cu @@ -22,121 +22,121 @@ TEST(GatedSpectrometer, BitManipulationMacros) { } } -// -//TEST(GatedSpectrometer, stokes_IQUV) -//{ -// float I,Q,U,V; -// // No field -// psrdada_cpp::effelsberg::edd::stokes_IQUV((float2){0.0f,0.0f}, (float2){0.0f,0.0f}, I, Q, U, V); -// EXPECT_FLOAT_EQ(I, 0); -// EXPECT_FLOAT_EQ(Q, 0); -// EXPECT_FLOAT_EQ(U, 0); -// EXPECT_FLOAT_EQ(V, 0); -// -// // For p1 = Ex, p2 = Ey -// // horizontal polarized -// psrdada_cpp::effelsberg::edd::stokes_IQUV((float2){1.0f,0.0f}, (float2){0.0f,0.0f}, I, Q, U, V); -// EXPECT_FLOAT_EQ(I, 1); -// EXPECT_FLOAT_EQ(Q, 1); -// EXPECT_FLOAT_EQ(U, 0); -// EXPECT_FLOAT_EQ(V, 0); -// -// // vertical polarized -// psrdada_cpp::effelsberg::edd::stokes_IQUV((float2){0.0f,0.0f}, (float2){1.0f,0.0f}, I, Q, U, V); -// EXPECT_FLOAT_EQ(I, 1); -// EXPECT_FLOAT_EQ(Q, -1); -// EXPECT_FLOAT_EQ(U, 0); -// EXPECT_FLOAT_EQ(V, 0); -// -// //linear +45 deg. -// psrdada_cpp::effelsberg::edd::stokes_IQUV((float2){1.0f/std::sqrt(2),0.0f}, (float2){1.0f/std::sqrt(2),0.0f}, I, Q, U, V); -// EXPECT_FLOAT_EQ(I, 1); -// EXPECT_FLOAT_EQ(Q, 0); -// EXPECT_FLOAT_EQ(U, 1); -// EXPECT_FLOAT_EQ(V, 0); -// -// //linear -45 deg. -// psrdada_cpp::effelsberg::edd::stokes_IQUV((float2){-1.0f/std::sqrt(2),0.0f}, (float2){1.0f/std::sqrt(2),0.0f}, I, Q, U, V); -// EXPECT_FLOAT_EQ(I, 1); -// EXPECT_FLOAT_EQ(Q, 0); -// EXPECT_FLOAT_EQ(U, -1); -// EXPECT_FLOAT_EQ(V, 0); -// -// //left circular -// psrdada_cpp::effelsberg::edd::stokes_IQUV((float2){.0f,1.0f/std::sqrt(2)}, (float2){1.0f/std::sqrt(2),.0f}, I, Q, U, V); -// EXPECT_FLOAT_EQ(I, 1); -// EXPECT_FLOAT_EQ(Q, 0); -// EXPECT_FLOAT_EQ(U, 0); -// EXPECT_FLOAT_EQ(V, -1); -// -// // right circular -// psrdada_cpp::effelsberg::edd::stokes_IQUV((float2){.0f,-1.0f/std::sqrt(2)}, (float2){1.0f/std::sqrt(2),.0f}, I, Q, U, V); -// EXPECT_FLOAT_EQ(I, 1); -// EXPECT_FLOAT_EQ(Q, 0); -// EXPECT_FLOAT_EQ(U, 0); -// EXPECT_FLOAT_EQ(V, 1); -//} -// -// -//TEST(GatedSpectrometer, stokes_accumulate) -//{ -// CUDA_ERROR_CHECK(cudaDeviceSynchronize()); -// size_t nchans = 8 * 1024 * 1024 + 1; -// size_t naccumulate = 5; -// -// thrust::device_vector<float2> P0(nchans * naccumulate); -// thrust::device_vector<float2> P1(nchans * naccumulate); -// thrust::fill(P0.begin(), P0.end(), (float2){0, 0}); -// thrust::fill(P1.begin(), P1.end(), (float2){0, 0}); -// thrust::device_vector<float> I(nchans); -// thrust::device_vector<float> Q(nchans); -// thrust::device_vector<float> U(nchans); -// thrust::device_vector<float> V(nchans); -// thrust::fill(I.begin(), I.end(), 0); -// thrust::fill(Q.begin(), Q.end(), 0); -// thrust::fill(U.begin(), U.end(), 0); -// thrust::fill(V.begin(), V.end(), 0); -// -// // This channel should be left circular polarized -// size_t idx0 = 23; -// for (int k = 0; k< naccumulate; k++) -// { -// size_t idx = idx0 + k * nchans; -// P0[idx] = (float2){0.0f, 1.0f/std::sqrt(2)}; -// P1[idx] = (float2){1.0f/std::sqrt(2),0.0f}; -// } -// -// psrdada_cpp::effelsberg::edd::stokes_accumulate<<<1024, 1024>>>( -// thrust::raw_pointer_cast(P0.data()), -// thrust::raw_pointer_cast(P1.data()), -// thrust::raw_pointer_cast(I.data()), -// thrust::raw_pointer_cast(Q.data()), -// thrust::raw_pointer_cast(U.data()), -// thrust::raw_pointer_cast(V.data()), -// nchans, -// naccumulate -// ); -// -// CUDA_ERROR_CHECK(cudaDeviceSynchronize()); -// thrust::pair<thrust::device_vector<float>::iterator, thrust::device_vector<float>::iterator> minmax; -// -// minmax = thrust::minmax_element(I.begin(), I.end()); -// EXPECT_FLOAT_EQ(*minmax.first, 0); -// EXPECT_FLOAT_EQ(*minmax.second, naccumulate); -// -// minmax = thrust::minmax_element(Q.begin(), Q.end()); -// EXPECT_FLOAT_EQ(*minmax.first, 0); -// EXPECT_FLOAT_EQ(*minmax.second, 0); -// -// minmax = thrust::minmax_element(U.begin(), U.end()); -// EXPECT_FLOAT_EQ(*minmax.first, 0); -// EXPECT_FLOAT_EQ(*minmax.second, 0); -// -// minmax = thrust::minmax_element(V.begin(), V.end()); -// EXPECT_FLOAT_EQ(*minmax.first, -1. * naccumulate); -// EXPECT_FLOAT_EQ(*minmax.second, 0); -//} -// + +TEST(GatedSpectrometer, stokes_IQUV) +{ + float I,Q,U,V; + // No field + psrdada_cpp::effelsberg::edd::stokes_IQUV((float2){0.0f,0.0f}, (float2){0.0f,0.0f}, I, Q, U, V); + EXPECT_FLOAT_EQ(I, 0); + EXPECT_FLOAT_EQ(Q, 0); + EXPECT_FLOAT_EQ(U, 0); + EXPECT_FLOAT_EQ(V, 0); + + // For p1 = Ex, p2 = Ey + // horizontal polarized + psrdada_cpp::effelsberg::edd::stokes_IQUV((float2){1.0f,0.0f}, (float2){0.0f,0.0f}, I, Q, U, V); + EXPECT_FLOAT_EQ(I, 1); + EXPECT_FLOAT_EQ(Q, 1); + EXPECT_FLOAT_EQ(U, 0); + EXPECT_FLOAT_EQ(V, 0); + + // vertical polarized + psrdada_cpp::effelsberg::edd::stokes_IQUV((float2){0.0f,0.0f}, (float2){1.0f,0.0f}, I, Q, U, V); + EXPECT_FLOAT_EQ(I, 1); + EXPECT_FLOAT_EQ(Q, -1); + EXPECT_FLOAT_EQ(U, 0); + EXPECT_FLOAT_EQ(V, 0); + + //linear +45 deg. + psrdada_cpp::effelsberg::edd::stokes_IQUV((float2){1.0f/std::sqrt(2),0.0f}, (float2){1.0f/std::sqrt(2),0.0f}, I, Q, U, V); + EXPECT_FLOAT_EQ(I, 1); + EXPECT_FLOAT_EQ(Q, 0); + EXPECT_FLOAT_EQ(U, 1); + EXPECT_FLOAT_EQ(V, 0); + + //linear -45 deg. + psrdada_cpp::effelsberg::edd::stokes_IQUV((float2){-1.0f/std::sqrt(2),0.0f}, (float2){1.0f/std::sqrt(2),0.0f}, I, Q, U, V); + EXPECT_FLOAT_EQ(I, 1); + EXPECT_FLOAT_EQ(Q, 0); + EXPECT_FLOAT_EQ(U, -1); + EXPECT_FLOAT_EQ(V, 0); + + //left circular + psrdada_cpp::effelsberg::edd::stokes_IQUV((float2){.0f,1.0f/std::sqrt(2)}, (float2){1.0f/std::sqrt(2),.0f}, I, Q, U, V); + EXPECT_FLOAT_EQ(I, 1); + EXPECT_FLOAT_EQ(Q, 0); + EXPECT_FLOAT_EQ(U, 0); + EXPECT_FLOAT_EQ(V, -1); + + // right circular + psrdada_cpp::effelsberg::edd::stokes_IQUV((float2){.0f,-1.0f/std::sqrt(2)}, (float2){1.0f/std::sqrt(2),.0f}, I, Q, U, V); + EXPECT_FLOAT_EQ(I, 1); + EXPECT_FLOAT_EQ(Q, 0); + EXPECT_FLOAT_EQ(U, 0); + EXPECT_FLOAT_EQ(V, 1); +} + + +TEST(GatedSpectrometer, stokes_accumulate) +{ + CUDA_ERROR_CHECK(cudaDeviceSynchronize()); + size_t nchans = 8 * 1024 * 1024 + 1; + size_t naccumulate = 5; + + thrust::device_vector<float2> P0(nchans * naccumulate); + thrust::device_vector<float2> P1(nchans * naccumulate); + thrust::fill(P0.begin(), P0.end(), (float2){0, 0}); + thrust::fill(P1.begin(), P1.end(), (float2){0, 0}); + thrust::device_vector<float> I(nchans); + thrust::device_vector<float> Q(nchans); + thrust::device_vector<float> U(nchans); + thrust::device_vector<float> V(nchans); + thrust::fill(I.begin(), I.end(), 0); + thrust::fill(Q.begin(), Q.end(), 0); + thrust::fill(U.begin(), U.end(), 0); + thrust::fill(V.begin(), V.end(), 0); + + // This channel should be left circular polarized + size_t idx0 = 23; + for (int k = 0; k< naccumulate; k++) + { + size_t idx = idx0 + k * nchans; + P0[idx] = (float2){0.0f, 1.0f/std::sqrt(2)}; + P1[idx] = (float2){1.0f/std::sqrt(2),0.0f}; + } + + psrdada_cpp::effelsberg::edd::stokes_accumulate<<<1024, 1024>>>( + thrust::raw_pointer_cast(P0.data()), + thrust::raw_pointer_cast(P1.data()), + thrust::raw_pointer_cast(I.data()), + thrust::raw_pointer_cast(Q.data()), + thrust::raw_pointer_cast(U.data()), + thrust::raw_pointer_cast(V.data()), + nchans, + naccumulate + ); + + CUDA_ERROR_CHECK(cudaDeviceSynchronize()); + thrust::pair<thrust::device_vector<float>::iterator, thrust::device_vector<float>::iterator> minmax; + + minmax = thrust::minmax_element(I.begin(), I.end()); + EXPECT_FLOAT_EQ(*minmax.first, 0); + EXPECT_FLOAT_EQ(*minmax.second, naccumulate); + + minmax = thrust::minmax_element(Q.begin(), Q.end()); + EXPECT_FLOAT_EQ(*minmax.first, 0); + EXPECT_FLOAT_EQ(*minmax.second, 0); + + minmax = thrust::minmax_element(U.begin(), U.end()); + EXPECT_FLOAT_EQ(*minmax.first, 0); + EXPECT_FLOAT_EQ(*minmax.second, 0); + + minmax = thrust::minmax_element(V.begin(), V.end()); + EXPECT_FLOAT_EQ(*minmax.first, -1. * naccumulate); + EXPECT_FLOAT_EQ(*minmax.second, 0); +} + TEST(GatedSpectrometer, GatingKernel) -- GitLab