diff --git a/psrdada_cpp/dada_null_sink.hpp b/psrdada_cpp/dada_null_sink.hpp index 31ee7652e0a4e6eb2811f25c6dae8cbfe6a0ab39..e4d4a9eb67d4852836a0cebb358dd6f08e19658d 100644 --- a/psrdada_cpp/dada_null_sink.hpp +++ b/psrdada_cpp/dada_null_sink.hpp @@ -9,10 +9,10 @@ namespace psrdada_cpp class NullSink { public: - NullSink() {}; - ~NullSink() {}; - void init(RawBytes&) {}; - bool operator()(RawBytes&) {}; + NullSink(); + ~NullSink(); + void init(RawBytes&); + bool operator()(RawBytes&); }; } //namespace psrdada_cpp #endif //PSRDADA_CPP_DADA_NULL_SINK_HPP diff --git a/psrdada_cpp/effelsberg/edd/CMakeLists.txt b/psrdada_cpp/effelsberg/edd/CMakeLists.txt index fdffa58265c7753de7debdf7c9281dc3aa575ea9..a5ebab55676343d59f25d97e0f16520c08049f76 100644 --- a/psrdada_cpp/effelsberg/edd/CMakeLists.txt +++ b/psrdada_cpp/effelsberg/edd/CMakeLists.txt @@ -9,7 +9,6 @@ set(PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES ${DEPENDENCY_LIBRARIES}) set(psrdada_cpp_effelsberg_edd_src - src/Channeliser.cu src/DadaBufferLayout.cpp src/DetectorAccumulator.cu src/EDDPolnMerge.cpp @@ -23,7 +22,6 @@ set(psrdada_cpp_effelsberg_edd_src set(psrdada_cpp_effelsberg_edd_inc - Channeliser.cuh DadaBufferLayout.hpp DetectorAccumulator.cuh EDDPolnMerge.hpp diff --git a/psrdada_cpp/effelsberg/edd/Channeliser.cuh b/psrdada_cpp/effelsberg/edd/Channeliser.cuh deleted file mode 100644 index 9568ce3c05eed0e0fac29d80b6d45f1d617c39bf..0000000000000000000000000000000000000000 --- a/psrdada_cpp/effelsberg/edd/Channeliser.cuh +++ /dev/null @@ -1,84 +0,0 @@ -#ifndef PSRDADA_CPP_EFFELSBERG_EDD_CHANNELISER_HPP -#define PSRDADA_CPP_EFFELSBERG_EDD_CHANNELISER_HPP - -#include "psrdada_cpp/effelsberg/edd/Unpacker.cuh" -#include "psrdada_cpp/effelsberg/edd/ScaledTransposeTFtoTFT.cuh" -#include "psrdada_cpp/raw_bytes.hpp" -#include "psrdada_cpp/double_device_buffer.cuh" -#include "psrdada_cpp/double_host_buffer.cuh" -#include "psrdada_cpp/dada_write_client.hpp" -#include "thrust/device_vector.h" -#include "cufft.h" - -namespace psrdada_cpp { -namespace effelsberg { -namespace edd { - -class Channeliser -{ -public: - typedef uint64_t RawVoltageType; - typedef float UnpackedVoltageType; - typedef float2 ChannelisedVoltageType; - typedef char2 PackedChannelisedVoltageType; - -public: - Channeliser( - std::size_t buffer_bytes, - std::size_t fft_length, - std::size_t nbits, - float input_level, - float output_level, - DadaWriteClient& client); - ~Channeliser(); - - /** - * @brief A callback to be called on connection - * to a ring buffer. - * - * @detail The first available header block in the - * in the ring buffer is provided as an argument. - * It is here that header parameters could be read - * if desired. - * - * @param block A RawBytes object wrapping a DADA header buffer - */ - void init(RawBytes& block); - - /** - * @brief A callback to be called on acqusition of a new - * data block. - * - * @param block A RawBytes object wrapping a DADA data buffer - */ - bool operator()(RawBytes& block); - -private: - void process(thrust::device_vector<RawVoltageType> const& digitiser_raw, - thrust::device_vector<PackedChannelisedVoltageType>& packed_channelised); - -private: - std::size_t _buffer_bytes; - std::size_t _fft_length; - std::size_t _nbits; - DadaWriteClient& _client; - cufftHandle _fft_plan; - int _nchans; - int _call_count; - std::unique_ptr<Unpacker> _unpacker; - std::unique_ptr<ScaledTransposeTFtoTFT> _transposer; - DoubleDeviceBuffer<RawVoltageType> _raw_voltage_db; - DoubleDeviceBuffer<PackedChannelisedVoltageType> _packed_channelised_voltage; - thrust::device_vector<UnpackedVoltageType> _unpacked_voltage; - thrust::device_vector<ChannelisedVoltageType> _channelised_voltage; - cudaStream_t _h2d_stream; - cudaStream_t _proc_stream; - cudaStream_t _d2h_stream; -}; - - -} //edd -} //effelsberg -} //psrdada_cpp - -#endif //PSRDADA_CPP_EFFELSBERG_EDD_CHANNELISER_HPP diff --git a/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh b/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh index 9fcfaae3faa8bfdc87080acb854928ac71231817..847106d09b148a8590c97d922ee1623e6ed564dc 100644 --- a/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh +++ b/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh @@ -27,73 +27,81 @@ namespace edd { typedef unsigned long long int uint64_cu; static_assert(sizeof(uint64_cu) == sizeof(uint64_t), "Long long int not of 64 bit! This is problematic for CUDA!"); - typedef uint64_t RawVoltageType; - typedef float UnpackedVoltageType; - typedef float2 ChannelisedVoltageType; +typedef uint64_t RawVoltageType; +typedef float UnpackedVoltageType; +typedef float2 ChannelisedVoltageType; - typedef float IntegratedPowerType; - //typedef int8_t IntegratedPowerType; +typedef float IntegratedPowerType; +//typedef int8_t IntegratedPowerType; - // Input data and intermediate processing data for one polarization - struct PolarizationData +/// Input data and intermediate processing data for one polarization +struct PolarizationData +{ + /// Raw ADC Voltage + DoubleDeviceBuffer<RawVoltageType> _raw_voltage; + /// Side channel data + DoubleDeviceBuffer<uint64_t> _sideChannelData; + + /// Baseline in gate 0 state + thrust::device_vector<UnpackedVoltageType> _baseLineG0; + /// Baseline in gate 1 state + thrust::device_vector<UnpackedVoltageType> _baseLineG1; + /// Channelized voltage in gate 0 state + thrust::device_vector<ChannelisedVoltageType> _channelised_voltage_G0; + /// Channelized voltage in gate 1 state + thrust::device_vector<ChannelisedVoltageType> _channelised_voltage_G1; + + /// Swaps input buffers + void swap() { - DoubleDeviceBuffer<RawVoltageType> _raw_voltage; - DoubleDeviceBuffer<uint64_t> _sideChannelData; + _raw_voltage.swap(); + _sideChannelData.swap(); + } +}; - thrust::device_vector<UnpackedVoltageType> _baseLineG0; - thrust::device_vector<UnpackedVoltageType> _baseLineG1; - thrust::device_vector<ChannelisedVoltageType> _channelised_voltage_G0; - thrust::device_vector<ChannelisedVoltageType> _channelised_voltage_G1; - void swap() - { - _raw_voltage.swap(); - _sideChannelData.swap(); - } - }; +// Output data for one gate +struct StokesOutput +{ + /// Stokes parameters + DoubleDeviceBuffer<IntegratedPowerType> I; + DoubleDeviceBuffer<IntegratedPowerType> Q; + DoubleDeviceBuffer<IntegratedPowerType> U; + DoubleDeviceBuffer<IntegratedPowerType> V; + /// Number of samples integrated in this output block + DoubleDeviceBuffer<uint64_cu> _noOfBitSets; - // Outptu data for one gate - class StokesOutput + /// Reset outptu for new integration + void reset(cudaStream_t &_proc_stream) { - public: - DoubleDeviceBuffer<IntegratedPowerType> I; - DoubleDeviceBuffer<IntegratedPowerType> Q; - DoubleDeviceBuffer<IntegratedPowerType> U; - DoubleDeviceBuffer<IntegratedPowerType> V; - - DoubleDeviceBuffer<uint64_cu> _noOfBitSets; - - void reset(cudaStream_t &_proc_stream) - { - thrust::fill(thrust::cuda::par.on(_proc_stream),I.a().begin(), I.a().end(), 0.); - thrust::fill(thrust::cuda::par.on(_proc_stream),Q.a().begin(), Q.a().end(), 0.); - thrust::fill(thrust::cuda::par.on(_proc_stream),U.a().begin(), U.a().end(), 0.); - thrust::fill(thrust::cuda::par.on(_proc_stream),V.a().begin(), V.a().end(), 0.); - - thrust::fill(thrust::cuda::par.on(_proc_stream), _noOfBitSets.a().begin(), _noOfBitSets.a().end(), 0L); - } - - void swap() - { - I.swap(); - Q.swap(); - U.swap(); - V.swap(); - _noOfBitSets.swap(); - - } - - void resize(size_t size, size_t blocks) - { - I.resize(size * blocks); - Q.resize(size * blocks); - U.resize(size * blocks); - V.resize(size * blocks); - - _noOfBitSets.resize(blocks); - } - }; + thrust::fill(thrust::cuda::par.on(_proc_stream),I.a().begin(), I.a().end(), 0.); + thrust::fill(thrust::cuda::par.on(_proc_stream),Q.a().begin(), Q.a().end(), 0.); + thrust::fill(thrust::cuda::par.on(_proc_stream),U.a().begin(), U.a().end(), 0.); + thrust::fill(thrust::cuda::par.on(_proc_stream),V.a().begin(), V.a().end(), 0.); + thrust::fill(thrust::cuda::par.on(_proc_stream), _noOfBitSets.a().begin(), _noOfBitSets.a().end(), 0L); + } + + /// Swap output buffers + void swap() + { + I.swap(); + Q.swap(); + U.swap(); + V.swap(); + _noOfBitSets.swap(); + } + + /// Resize all buffers + void resize(size_t size, size_t blocks) + { + I.resize(size * blocks); + Q.resize(size * blocks); + U.resize(size * blocks); + V.resize(size * blocks); + _noOfBitSets.resize(blocks); + } +}; @@ -291,23 +299,6 @@ __global__ void stokes_accumulate(float2 const __restrict__ *pol1, } -/** - * @brief calculate stokes IQUV from two complex valuies for each polarization - */ -__device__ stokes_IQUV(const float2 &p1, const float2 &p2, float &I, float &Q, float &U, float &V) -{ -// I = fabs(p1.x*p1.x + p1.y * p1.y) + fabs(p2.x*p2.x + p2.y * p2.y); -// Q = fabs(p1.x*p1.x + p1.y * p1.y) - fabs(p2.x*p2.x + p2.y * p2.y); -// U = p1.x*p2.x + p1.y * p2.y; -// V = p1.y*p2.x - p1.x * p2.y; -} - - - -// __global__ void stokes_accumulate(float2 const __restrict__ *pol1, float2 const __restrict__ *pol2, float ... output int nchans, int naccumulate) - - - } // edd diff --git a/psrdada_cpp/effelsberg/edd/detail/FftSpectrometer.cu b/psrdada_cpp/effelsberg/edd/detail/FftSpectrometer.cu index d88d6fc5b2622d45abcaa77f392ae143d7653ce3..1f17862a8795a72b3c6177ab9f3e3cac12817401 100644 --- a/psrdada_cpp/effelsberg/edd/detail/FftSpectrometer.cu +++ b/psrdada_cpp/effelsberg/edd/detail/FftSpectrometer.cu @@ -52,7 +52,7 @@ FftSpectrometer<HandlerType>::FftSpectrometer( _raw_voltage_db.resize(n64bit_words); BOOST_LOG_TRIVIAL(debug) << "Input voltages size (in 64-bit words): " << _raw_voltage_db.size(); //_unpacked_voltage.resize(nsamps_per_buffer); - _unpacked_voltage.resize(_nchans * batch * 2); + _unpacked_voltage.resize(nsamps_per_buffer); //BOOST_LOG_TRIVIAL(debug) << "Unpacked voltages size (in samples): " << _unpacked_voltage.size(); //_channelised_voltage.resize(_nchans * batch); //BOOST_LOG_TRIVIAL(debug) << "Channelised voltages size: " << _channelised_voltage.size(); @@ -161,19 +161,19 @@ bool FftSpectrometer<HandlerType>::operator()(RawBytes& block) _power_db.size() * sizeof(IntegratedPowerType), cudaMemcpyDeviceToHost, _d2h_stream)); - + if (_call_count == 3) { return false; - } + } //Wrap _detected_host_previous in a RawBytes object here; RawBytes bytes(reinterpret_cast<char*>(_host_power_db.b_ptr()), _host_power_db.size() * sizeof(IntegratedPowerType), _host_power_db.size() * sizeof(IntegratedPowerType)); BOOST_LOG_TRIVIAL(debug) << "Calling handler"; - // The handler can't do anything asynchronously without a copy here - // as it would be unsafe (given that it does not own the memory it + // The handler can't do anything asynchronously without a copy here + // as it would be unsafe (given that it does not own the memory it // is being passed). return _handler(bytes); } diff --git a/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu b/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu index 48d6af14a9845477080bf6726cadec42e996e467..23e1647bc272d88c272b109b79430153b0d71d24 100644 --- a/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu +++ b/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu @@ -86,26 +86,30 @@ __global__ void gating(float* __restrict__ G0, // Reduce G0, G1 sum_reduce<uint32_t>(x, _G0stats); - if(threadIdx.x == 0) + if(threadIdx.x == 0) { atomicAdd(stats_G0, (uint64_cu) x[threadIdx.x]); + } __syncthreads(); sum_reduce<uint32_t>(x, _G1stats); - if(threadIdx.x == 0) + if(threadIdx.x == 0) { atomicAdd(stats_G1, (uint64_cu) x[threadIdx.x]); + } __syncthreads(); //reuse shared array float *y = (float*) x; //update the baseline array sum_reduce<float>(y, baselineUpdateG0); - if(threadIdx.x == 0) + if(threadIdx.x == 0) { atomicAdd(baseLineNG0, y[threadIdx.x]); + } __syncthreads(); sum_reduce<float>(y, baselineUpdateG1); - if(threadIdx.x == 0) + if(threadIdx.x == 0) { atomicAdd(baseLineNG1, y[threadIdx.x]); + } __syncthreads(); } @@ -289,8 +293,9 @@ void GatedSpectrometer<HandlerType>::gated_fft( uint64_t NG0 = 0; uint64_t NG1 = 0; - // Loop over outputblocks, for case of multiple output blocks per input block +// Loop over outputblocks, for case of multiple output blocks per input block int step = data._sideChannelData.b().size() / _noOfBitSetsIn_G0.size(); + for (size_t i = 0; i < _noOfBitSetsIn_G0.size(); i++) { // ToDo: Should be in one kernel call gating<<<1024, 1024, 0, _proc_stream>>>( diff --git a/psrdada_cpp/effelsberg/edd/src/Channeliser.cu b/psrdada_cpp/effelsberg/edd/src/Channeliser.cu deleted file mode 100644 index c5a13b7d157c91383546328e8cea0f926f718e90..0000000000000000000000000000000000000000 --- a/psrdada_cpp/effelsberg/edd/src/Channeliser.cu +++ /dev/null @@ -1,156 +0,0 @@ -#include "psrdada_cpp/effelsberg/edd/Channeliser.cuh" -#include "psrdada_cpp/common.hpp" -#include "psrdada_cpp/cuda_utils.hpp" -#include "psrdada_cpp/raw_bytes.hpp" -#include "thrust/functional.h" -#include "thrust/transform.h" -#include <cuda.h> - -namespace psrdada_cpp { -namespace effelsberg { -namespace edd { - -Channeliser::Channeliser( - std::size_t buffer_bytes, - std::size_t fft_length, - std::size_t nbits, - float input_level, - float output_level, - DadaWriteClient& client) - : _buffer_bytes(buffer_bytes) - , _fft_length(fft_length) - , _nbits(nbits) - , _client(client) - , _fft_plan(0) - , _call_count(0) -{ - - assert(((_nbits == 12) || (_nbits == 8))); - BOOST_LOG_TRIVIAL(debug) - << "Creating new Channeliser instance with parameters: \n" - << "fft_length = " << _fft_length << "\n" - << "nbits = " << _nbits; - std::size_t nsamps_per_buffer = buffer_bytes * 8 / nbits; - assert(nsamps_per_buffer % _fft_length == 0 /*Number of samples is not multiple of FFT size*/); - std::size_t n64bit_words = buffer_bytes / sizeof(uint64_t); - _nchans = _fft_length / 2 + 1; - int batch = nsamps_per_buffer/_fft_length; - std::size_t packed_channelised_voltage_bytes = _nchans * batch * sizeof(PackedChannelisedVoltageType); - BOOST_LOG_TRIVIAL(debug) << "Output buffer bytes: " << packed_channelised_voltage_bytes; - assert(_client.data_buffer_size() == packed_channelised_voltage_bytes /* Incorrect output DADA buffer size */); - BOOST_LOG_TRIVIAL(debug) << "Calculating scales and offsets"; - float scale = std::sqrt(_nchans) * input_level; - BOOST_LOG_TRIVIAL(debug) << "Correction factors for 8-bit conversion: scaling = " << scale; - BOOST_LOG_TRIVIAL(debug) << "Generating FFT plan"; - int n[] = {static_cast<int>(_fft_length)}; - //Not we put this into transposed output order, so the inner dimension will be time. - CUFFT_ERROR_CHECK(cufftPlanMany(&_fft_plan, 1, n, NULL, 1, _fft_length, - NULL, 1, _nchans, CUFFT_R2C, batch)); - cufftSetStream(_fft_plan, _proc_stream); - BOOST_LOG_TRIVIAL(debug) << "Allocating memory"; - _raw_voltage_db.resize(n64bit_words); - BOOST_LOG_TRIVIAL(debug) << "Input voltages size (in 64-bit words): " << _raw_voltage_db.size(); - _unpacked_voltage.resize(nsamps_per_buffer); - BOOST_LOG_TRIVIAL(debug) << "Unpacked voltages size (in samples): " << _unpacked_voltage.size(); - _channelised_voltage.resize(_nchans * batch); - BOOST_LOG_TRIVIAL(debug) << "Channelised voltages size: " << _channelised_voltage.size(); - _packed_channelised_voltage.resize(_channelised_voltage.size()); - BOOST_LOG_TRIVIAL(debug) << "Packed channelised voltages size: " << _packed_channelised_voltage.size(); - CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream)); - CUDA_ERROR_CHECK(cudaStreamCreate(&_proc_stream)); - CUDA_ERROR_CHECK(cudaStreamCreate(&_d2h_stream)); - CUFFT_ERROR_CHECK(cufftSetStream(_fft_plan, _proc_stream)); - _unpacker.reset(new Unpacker(_proc_stream)); - _transposer.reset(new ScaledTransposeTFtoTFT(_nchans, 8192, scale, 0.0, _proc_stream)); -} - -Channeliser::~Channeliser() -{ - BOOST_LOG_TRIVIAL(debug) << "Destroying Channeliser"; - if (!_fft_plan) - cufftDestroy(_fft_plan); - cudaStreamDestroy(_h2d_stream); - cudaStreamDestroy(_proc_stream); - cudaStreamDestroy(_d2h_stream); -} - -void Channeliser::init(RawBytes& block) -{ - BOOST_LOG_TRIVIAL(debug) << "Channeliser init called"; - auto& header_block = _client.header_stream().next(); - /* Populate new header */ - std::memcpy(header_block.ptr(), block.ptr(), block.total_bytes()); - header_block.used_bytes(header_block.total_bytes()); - _client.header_stream().release(); -} - -void Channeliser::process( - thrust::device_vector<RawVoltageType> const& digitiser_raw, - thrust::device_vector<PackedChannelisedVoltageType>& packed_channelised) -{ - BOOST_LOG_TRIVIAL(debug) << "Unpacking raw voltages"; - switch (_nbits) - { - case 8: _unpacker->unpack<8>(digitiser_raw, _unpacked_voltage); break; - case 12: _unpacker->unpack<12>(digitiser_raw, _unpacked_voltage); break; - default: throw std::runtime_error("Unsupported number of bits"); - } - BOOST_LOG_TRIVIAL(debug) << "Performing FFT"; - UnpackedVoltageType* _unpacked_voltage_ptr = thrust::raw_pointer_cast(_unpacked_voltage.data()); - ChannelisedVoltageType* _channelised_voltage_ptr = thrust::raw_pointer_cast(_channelised_voltage.data()); - CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, - (cufftReal*) _unpacked_voltage_ptr, - (cufftComplex*) _channelised_voltage_ptr)); - CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream)); - _transposer->transpose(_channelised_voltage, packed_channelised); -} - -bool Channeliser::operator()(RawBytes& block) -{ - ++_call_count; - BOOST_LOG_TRIVIAL(debug) << "Channeliser operator() called (count = " << _call_count << ")"; - assert(block.used_bytes() == _buffer_bytes /* Unexpected buffer size */); - - CUDA_ERROR_CHECK(cudaStreamSynchronize(_h2d_stream)); - _raw_voltage_db.swap(); - - CUDA_ERROR_CHECK(cudaMemcpyAsync(static_cast<void*>(_raw_voltage_db.a_ptr()), - static_cast<void*>(block.ptr()), block.used_bytes(), - cudaMemcpyHostToDevice, _h2d_stream)); - - if (_call_count == 1) - { - return false; - } - - // Synchronize all streams - CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream)); - _packed_channelised_voltage.swap(); - process(_raw_voltage_db.b(), _packed_channelised_voltage.a()); - - if (_call_count == 2) - { - return false; - } - CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream)); - - if (_call_count > 3) - { - _client.data_stream().release(); - } - auto& data_block = _client.data_stream().next(); - CUDA_ERROR_CHECK(cudaMemcpyAsync( - static_cast<void*>(data_block.ptr()), - static_cast<void*>(_packed_channelised_voltage.b_ptr()), - _packed_channelised_voltage.size() * sizeof(PackedChannelisedVoltageType), - cudaMemcpyDeviceToHost, - _d2h_stream)); - data_block.used_bytes(data_block.total_bytes()); - return false; -} - -} //edd -} //effelsberg -} //psrdada_cpp - - diff --git a/psrdada_cpp/effelsberg/edd/test/CMakeLists.txt b/psrdada_cpp/effelsberg/edd/test/CMakeLists.txt index 07a067a75f68ed7710fdc2ab7ebec33bde7a7106..bce6adee58fb4de23ef26366e9271897abcd0859 100644 --- a/psrdada_cpp/effelsberg/edd/test/CMakeLists.txt +++ b/psrdada_cpp/effelsberg/edd/test/CMakeLists.txt @@ -4,7 +4,6 @@ link_directories(${GTEST_LIBRARY_DIR}) set(gtest_edd_src gtest_edd_src.cu - src/ChanneliserTester.cu src/DetectorAccumulatorTester.cu src/FftSpectrometerTester.cu src/GatedSpectrometerTest.cu diff --git a/psrdada_cpp/effelsberg/edd/test/src/ChanneliserTester.cu b/psrdada_cpp/effelsberg/edd/test/src/ChanneliserTester.cu deleted file mode 100644 index 2dee6300563dfb856363ba13ac40950523c084f1..0000000000000000000000000000000000000000 --- a/psrdada_cpp/effelsberg/edd/test/src/ChanneliserTester.cu +++ /dev/null @@ -1,83 +0,0 @@ -#include "psrdada_cpp/effelsberg/edd/test/ChanneliserTester.cuh" -#include "psrdada_cpp/effelsberg/edd/Channeliser.cuh" -#include "psrdada_cpp/dada_null_sink.hpp" -#include "psrdada_cpp/dada_db.hpp" -#include "psrdada_cpp/dada_write_client.hpp" -#include "psrdada_cpp/dada_input_stream.hpp" -#include "psrdada_cpp/double_host_buffer.cuh" -#include "psrdada_cpp/raw_bytes.hpp" -#include "psrdada_cpp/cuda_utils.hpp" -#include <vector> -#include <thread> -#include <chrono> -#include <exception> - -namespace psrdada_cpp { -namespace effelsberg { -namespace edd { -namespace test { - -ChanneliserTester::ChanneliserTester() - : ::testing::Test() -{ - -} - -ChanneliserTester::~ChanneliserTester() -{ - -} - -void ChanneliserTester::SetUp() -{ -} - -void ChanneliserTester::TearDown() -{ -} - -void ChanneliserTester::performance_test(std::size_t fft_length, std::size_t nbits) -{ - std::size_t input_block_bytes = fft_length * 8192 * 1024 * nbits / 8; - std::size_t output_block_bytes = (fft_length/2 + 1) * 8192 * 1024 * sizeof(Channeliser::PackedChannelisedVoltageType); - DadaDB output_buffer(8, output_block_bytes); - output_buffer.create(); - MultiLog log("test_log"); - NullSink null_sink; - DadaInputStream<NullSink> consumer(output_buffer.key(), log, null_sink); - std::thread consumer_thread( [&](){ - try { - consumer.start(); - } catch (std::exception& e) { - BOOST_LOG_TRIVIAL(error) << e.what(); - } - }); - DadaWriteClient client(output_buffer.key(), log); - DoublePinnedHostBuffer<char> input_block; - input_block.resize(input_block_bytes); - RawBytes input_raw_bytes(input_block.a_ptr(), input_block_bytes, input_block_bytes); - std::vector<char> header_block(4096); - RawBytes header_raw_bytes(header_block.data(), 4096, 4096); - Channeliser channeliser(input_block_bytes, fft_length, nbits, 16.0f, 16.0f, client); - channeliser.init(header_raw_bytes); - for (int ii = 0; ii < 100; ++ii) - { - channeliser(input_raw_bytes); - } - consumer.stop(); - std::this_thread::sleep_for(std::chrono::seconds(1)); - channeliser(input_raw_bytes); - consumer_thread.join(); -} - - -TEST_F(ChanneliserTester, simple_exec_test) -{ - performance_test(16, 12); - performance_test(32, 8); -} - -} //namespace test -} //namespace edd -} //namespace meerkat -} //namespace psrdada_cpp