diff --git a/psrdada_cpp/CMakeLists.txt b/psrdada_cpp/CMakeLists.txt index 264d0bf0e9938cc857d8ea9f1077ae155ed46fd1..083bac8dca90e22b2a2c1f75f8019238f64a554f 100644 --- a/psrdada_cpp/CMakeLists.txt +++ b/psrdada_cpp/CMakeLists.txt @@ -47,6 +47,7 @@ set(psrdada_cpp_inc raw_bytes.hpp sigprocheader.hpp simple_file_writer.hpp + testing/consumer.hpp ) # -- the main library target @@ -96,5 +97,5 @@ install (TARGETS ${CMAKE_PROJECT_NAME} install(FILES ${psrdada_cpp_inc} DESTINATION include/psrdada_cpp) install(DIRECTORY detail DESTINATION include/psrdada_cpp) -add_subdirectory(meerkat) +# add_subdirectory(meerkat) add_subdirectory(effelsberg) diff --git a/psrdada_cpp/effelsberg/edd/DadaBufferLayout.hpp b/psrdada_cpp/effelsberg/edd/DadaBufferLayout.hpp index 75d2472a2153d69e7a908adbcf52773dd7f3c724..31fa4189542625220f42324a7808b5f7d655462f 100644 --- a/psrdada_cpp/effelsberg/edd/DadaBufferLayout.hpp +++ b/psrdada_cpp/effelsberg/edd/DadaBufferLayout.hpp @@ -32,7 +32,7 @@ class DadaBufferLayout // number of side channels DadaBufferLayout(); DadaBufferLayout(key_t input_key , size_t heapSize, size_t nSideChannels); - void intitialize(key_t input_key , size_t heapSize, size_t nSideChannels); + void initialize(key_t input_key , size_t heapSize, size_t nSideChannels); key_t getInputkey() const; diff --git a/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh b/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh index d6cf309f54ec8e8f90afa5f6482098cf9496628a..676792be257976f1447fb918fbc22f5b81b55209 100644 --- a/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh +++ b/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh @@ -345,6 +345,9 @@ public: bool operator()(RawBytes &block); // make the relevant processing methods proteceed only for testing + + std::size_t call_count(){return _call_count;} + std::size_t nblocks(){return _nBlocks;} protected: /// Processing strategy for single pol mode void process(SinglePolarizationInput *inputDataStream, GatedPowerSpectrumOutput *outputDataStream); diff --git a/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu b/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu index 7a33bebab460b8ae91a0ba0cde6b985677a26421..9cd22b8384d42ac0fea2240c46cb16d35267543e 100644 --- a/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu +++ b/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu @@ -328,25 +328,24 @@ bool GatedSpectrometer<HandlerType, InputType, OutputType>::operator()(RawBytes if (newBlock) { BOOST_LOG_TRIVIAL(debug) << "Starting new output block."; - CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream)); + //CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream)); outputDataStream->swap(_proc_stream); } BOOST_LOG_TRIVIAL(debug) << "Processing block."; - process(inputDataStream, outputDataStream); CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream)); + process(inputDataStream, outputDataStream); BOOST_LOG_TRIVIAL(debug) << "Processing block finished."; /// For one pol input and power out /// ToDo: For two pol input and power out /// ToDo: For two pol input and stokes out - - if ((_call_count == 2) || (!newBlock)) { return false; } + CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream)); outputDataStream->data2Host(_d2h_stream); - if (_call_count == 3) { + if (_call_count < 3+_nBlocks) { return false; } diff --git a/psrdada_cpp/effelsberg/edd/src/DadaBufferLayout.cpp b/psrdada_cpp/effelsberg/edd/src/DadaBufferLayout.cpp index 1fd89346bc805481070e18205fc7baaad23b03d5..6e127fd7888814b20c39794411ce1c83ae4249b8 100644 --- a/psrdada_cpp/effelsberg/edd/src/DadaBufferLayout.cpp +++ b/psrdada_cpp/effelsberg/edd/src/DadaBufferLayout.cpp @@ -12,10 +12,10 @@ DadaBufferLayout::DadaBufferLayout() {} DadaBufferLayout::DadaBufferLayout(key_t input_key, size_t heapSize, size_t nSideChannels) { - intitialize(input_key, heapSize, nSideChannels); + initialize(input_key, heapSize, nSideChannels); } -void DadaBufferLayout::intitialize(key_t input_key, size_t heapSize, size_t nSideChannels) +void DadaBufferLayout::initialize(key_t input_key, size_t heapSize, size_t nSideChannels) { _input_key = input_key; _heapSize = heapSize; diff --git a/psrdada_cpp/effelsberg/edd/src/GatedSpectrometer_cli.cu b/psrdada_cpp/effelsberg/edd/src/GatedSpectrometer_cli.cu index 29e995fd612597096f007483200fb47056645f70..0f6996bbe0a502b129ae3813f7ed0c21dfb788d1 100644 --- a/psrdada_cpp/effelsberg/edd/src/GatedSpectrometer_cli.cu +++ b/psrdada_cpp/effelsberg/edd/src/GatedSpectrometer_cli.cu @@ -41,8 +41,7 @@ void launchSpectrometer(const effelsberg::edd::GatedSpectrometerInputParameters effelsberg::edd::GatedSpectrometer<decltype(sink), InputType, OutputType> spectrometer(i, sink); - DadaInputStream<decltype(spectrometer)> istream(i.dadaBufferLayout.getInputkey(), log, - spectrometer); + DadaInputStream<decltype(spectrometer)> istream(i.dadaBufferLayout.getInputkey(), log, spectrometer); istream.start(); } else if (output_type == "dada") @@ -51,8 +50,7 @@ void launchSpectrometer(const effelsberg::edd::GatedSpectrometerInputParameters effelsberg::edd::GatedSpectrometer<decltype(sink), InputType, OutputType> spectrometer(i, sink); - DadaInputStream<decltype(spectrometer)> istream(i.dadaBufferLayout.getInputkey(), log, - spectrometer); + DadaInputStream<decltype(spectrometer)> istream(i.dadaBufferLayout.getInputkey(), log, spectrometer); istream.start(); } else if (output_type == "profile") @@ -234,7 +232,7 @@ int main(int argc, char **argv) { throw po::validation_error(po::validation_error::invalid_option_value, "Stokes output requires dual polarization input!"); } - ip.dadaBufferLayout.intitialize(input_key, ip.speadHeapSize, ip.nSideChannels); + ip.dadaBufferLayout.initialize(input_key, ip.speadHeapSize, ip.nSideChannels); io_eval<float>(ip, input_polarizations, output_format, filename, output_type ); diff --git a/psrdada_cpp/effelsberg/edd/test/CMakeLists.txt b/psrdada_cpp/effelsberg/edd/test/CMakeLists.txt index b445b414fb9de552fd00469844f7e0e3e1d55c9b..9ab019ecc0e56574bfb1535adc06ecd0fea43d80 100644 --- a/psrdada_cpp/effelsberg/edd/test/CMakeLists.txt +++ b/psrdada_cpp/effelsberg/edd/test/CMakeLists.txt @@ -18,6 +18,7 @@ set(gtest_edd_src src/SKRfiReplacement.cpp src/SpectralKurtosisTester.cpp src/SpectralKurtosisCudaTester.cu + SpectrometerPipelineTester.cuh ) cuda_add_executable(gtest_edd ${gtest_edd_src} ) target_link_libraries(gtest_edd ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES} ${CUDA_CUFFT_LIBRARIES} -lcublas ${CUDA_DEPENDENCY_LIBRARIES} ${GTEST_LIBRARIES}) diff --git a/psrdada_cpp/effelsberg/edd/test/SpectrometerPipelineTester.cuh b/psrdada_cpp/effelsberg/edd/test/SpectrometerPipelineTester.cuh new file mode 100644 index 0000000000000000000000000000000000000000..1a09a289b522232a6971e0321fcee303fcd9c38d --- /dev/null +++ b/psrdada_cpp/effelsberg/edd/test/SpectrometerPipelineTester.cuh @@ -0,0 +1,41 @@ +#pragma once + +#include <gtest/gtest.h> +#include <string> +#include <vector> + +#include "psrdada_cpp/common.hpp" +#include "psrdada_cpp/cli_utils.hpp" +#include "psrdada_cpp/raw_bytes.hpp" +#include "psrdada_cpp/dada_db.hpp" +#include "psrdada_cpp/testing/consumer.hpp" +#include "psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh" + +namespace psrdada_cpp { +namespace effelsberg { +namespace edd { +namespace test { + +template<typename Handler, typename InputType, typename OutputType> +class SpectrometerPipelineTester +{ +public: + SpectrometerPipelineTester(GatedSpectrometerInputParameters &config, Handler &handle); + ~SpectrometerPipelineTester(); + void test_buffer_writing(std::size_t nblock=1, std::size_t runs=16); + void init(); + +private: + GatedSpectrometerInputParameters _config; + std::unique_ptr<psrdada_cpp::DadaDB> input_buffer; + std::unique_ptr<GatedSpectrometer<Handler, InputType, OutputType>> test_object; + Handler& _output_handler; +}; + + +} +} +} +} + +#include "psrdada_cpp/effelsberg/edd/test/src/SpectrometerPipelineTester.cu" \ No newline at end of file diff --git a/psrdada_cpp/effelsberg/edd/test/gtest_edd_src.cu b/psrdada_cpp/effelsberg/edd/test/gtest_edd_src.cu index b97f62a969d4fad4d0bc2ab81071763ab580bf29..ac90bdcde97af402b626f601e4065693507df830 100644 --- a/psrdada_cpp/effelsberg/edd/test/gtest_edd_src.cu +++ b/psrdada_cpp/effelsberg/edd/test/gtest_edd_src.cu @@ -2,6 +2,8 @@ #include "psrdada_cpp/cli_utils.hpp" #include <cstdlib> +#include "psrdada_cpp/effelsberg/edd/test/SpectrometerPipelineTester.cuh" + int main(int argc, char **argv) { char * val = getenv("LOG_LEVEL"); if ( val ) diff --git a/psrdada_cpp/effelsberg/edd/test/src/SpectrometerPipelineTester.cu b/psrdada_cpp/effelsberg/edd/test/src/SpectrometerPipelineTester.cu new file mode 100644 index 0000000000000000000000000000000000000000..dae2bc5e66bfe2018c455e30f54fc197008c5a39 --- /dev/null +++ b/psrdada_cpp/effelsberg/edd/test/src/SpectrometerPipelineTester.cu @@ -0,0 +1,155 @@ +#pragma once + + +namespace psrdada_cpp { +namespace effelsberg { +namespace edd { +namespace test { + + +template<typename Handler, typename InputType, typename OutputType> +SpectrometerPipelineTester<Handler, InputType, OutputType>::SpectrometerPipelineTester( + GatedSpectrometerInputParameters &config, + Handler &handle +) + : _config(config), _output_handler(handle) +{ + test_object.reset(new GatedSpectrometer<Handler, InputType, OutputType>(_config, handle)); +} + + +template<typename Handler, typename InputType, typename OutputType> +SpectrometerPipelineTester<Handler, InputType, OutputType>::~SpectrometerPipelineTester() +{ + BOOST_LOG_TRIVIAL(debug) << "Destroying SpectrometerPipelineTester object"; + test_object.reset(); +} + +template<typename Handler, typename InputType, typename OutputType> +void SpectrometerPipelineTester<Handler, InputType, OutputType>::init() +{ + std::vector<char> header(4096); + psrdada_cpp::RawBytes block(reinterpret_cast<char *>(header.data()), header.size()); + test_object->init(block); +} + +template<typename Handler, typename InputType, typename OutputType> +void SpectrometerPipelineTester<Handler, InputType, OutputType>::test_buffer_writing(std::size_t nblock, std::size_t runs) +{ + BOOST_LOG_TRIVIAL(debug) << "Testing buffer writing"; + std::size_t num_obuffers = 0; + std::vector<char> idata(_config.fft_length * _config.naccumulate/nblock + _config.nSideChannels * 8 * _config.dadaBufferLayout.getNHeaps()); + psrdada_cpp::RawBytes block(reinterpret_cast<char *>(idata.data()), idata.size(), idata.size()); + this->init(); + ASSERT_EQ(test_object->nblocks(), nblock); + for(std::size_t i = 0; i < runs; i++) + { + if(i%nblock==0){ + std::fill(idata.begin(), idata.begin() + _config.dadaBufferLayout.sizeOfData(), i/nblock); // Fill value in the block - incremented on each output spectra + } + ASSERT_FALSE(test_object->operator()(block)); + // If true, the consumer got a new output block + if(_output_handler.call_count() == num_obuffers + 1) + { + num_obuffers = _output_handler.call_count(); + // DO testing + // First buffer should contain only zeros - The DC bin is zero in the first output otherwise unequal to zero + if(_output_handler.call_count() == 1){ + ASSERT_EQ(_output_handler.data_ptr()[0], 0); + }else{ + ASSERT_NE(_output_handler.data_ptr()[0], 0); + } + } + } +} + + +TEST(SpectrometerPipelineTester, test_buffer_writing_single_power_1_1) +{ + GatedSpectrometerInputParameters config; + config.selectedSideChannel = 0; + config.speadHeapSize = 4096; + config.nSideChannels = 1; + config.selectedBit = 1; + config.fft_length = 65536; + config.naccumulate = 4096; + config.nbits = 8; + std::size_t n_sidechannel = config.fft_length * config.speadHeapSize / config.naccumulate; + std::size_t nblock = 1; // 1 input block per 1 output block (1:1) + psrdada_cpp::DadaDB buffer(16, (config.fft_length*config.naccumulate*config.nbits/8 + 8*n_sidechannel) / nblock); + buffer.create(); + config.dadaBufferLayout.initialize(buffer.key(), config.speadHeapSize, config.nSideChannels); + buffer.destroy(); + testing::DummyConsumer<float> consumer; // Create a consumer -> output handler + SpectrometerPipelineTester<decltype(consumer), SinglePolarizationInput, GatedPowerSpectrumOutput> tester(config, consumer); + tester.test_buffer_writing(nblock); +} + +TEST(SpectrometerPipelineTester, test_buffer_writing_single_power_1_2) +{ + GatedSpectrometerInputParameters config; + config.selectedSideChannel = 0; + config.speadHeapSize = 4096; + config.nSideChannels = 1; + config.selectedBit = 1; + config.fft_length = 65536; + config.naccumulate = 4096; + config.nbits = 8; + std::size_t n_sidechannel = config.fft_length * config.speadHeapSize / config.naccumulate; + std::size_t nblock = 2; // 2 input blocks per 1 output block (2:1) + psrdada_cpp::DadaDB buffer(16, (config.fft_length*config.naccumulate*config.nbits/8 + 8*n_sidechannel) / nblock); + buffer.create(); + config.dadaBufferLayout.initialize(buffer.key(), config.speadHeapSize, config.nSideChannels); + buffer.destroy(); + testing::DummyConsumer<float> consumer; // Create a consumer -> output handler + SpectrometerPipelineTester<decltype(consumer), SinglePolarizationInput, GatedPowerSpectrumOutput> tester(config, consumer); + tester.test_buffer_writing(nblock); +} + +TEST(SpectrometerPipelineTester, test_buffer_writing_single_power_1_4) +{ + GatedSpectrometerInputParameters config; + config.selectedSideChannel = 0; + config.speadHeapSize = 4096; + config.nSideChannels = 1; + config.selectedBit = 1; + config.fft_length = 65536; + config.naccumulate = 4096; + config.nbits = 8; + std::size_t n_sidechannel = config.fft_length * config.speadHeapSize / config.naccumulate; + std::size_t nblock = 4; // 4 input blocks per 1 output block (4:1) + psrdada_cpp::DadaDB buffer(16, (config.fft_length*config.naccumulate*config.nbits/8 + 8*n_sidechannel) / nblock); + buffer.create(); + config.dadaBufferLayout.initialize(buffer.key(), config.speadHeapSize, config.nSideChannels); + buffer.destroy(); + testing::DummyConsumer<float> consumer; // Create a consumer -> output handler + SpectrometerPipelineTester<decltype(consumer), SinglePolarizationInput, GatedPowerSpectrumOutput> tester(config, consumer); + tester.test_buffer_writing(nblock, 24); +} + +TEST(SpectrometerPipelineTester, test_buffer_writing_single_power_1_8) +{ + GatedSpectrometerInputParameters config; + config.selectedSideChannel = 0; + config.speadHeapSize = 4096; + config.nSideChannels = 1; + config.selectedBit = 1; + config.fft_length = 65536; + config.naccumulate = 4096; + config.nbits = 8; + std::size_t n_sidechannel = config.fft_length * config.speadHeapSize / config.naccumulate; + std::size_t nblock = 8; // 8 input blocks per 1 output block (8:1) + psrdada_cpp::DadaDB buffer(16, (config.fft_length*config.naccumulate*config.nbits/8 + 8*n_sidechannel) / nblock); + buffer.create(); + config.dadaBufferLayout.initialize(buffer.key(), config.speadHeapSize, config.nSideChannels); + buffer.destroy(); + testing::DummyConsumer<float> consumer; // Create a consumer -> output handler + SpectrometerPipelineTester<decltype(consumer), SinglePolarizationInput, GatedPowerSpectrumOutput> tester(config, consumer); + tester.test_buffer_writing(nblock, 32); +} + + +} +} +} +} \ No newline at end of file diff --git a/psrdada_cpp/testing/consumer.hpp b/psrdada_cpp/testing/consumer.hpp new file mode 100644 index 0000000000000000000000000000000000000000..5e46f04e0cedf10d0663ed2f72cdee16889162e0 --- /dev/null +++ b/psrdada_cpp/testing/consumer.hpp @@ -0,0 +1,81 @@ +#pragma once + +#include <random> +#include <cmath> +#include <cassert> +#include <vector> +#include <psrdada_cpp/raw_bytes.hpp> + +namespace psrdada_cpp{ +namespace testing{ + +class AbstractConsumer +{ +public: + virtual ~AbstractConsumer() {} + virtual void init(psrdada_cpp::RawBytes &block) = 0; + virtual bool operator()(psrdada_cpp::RawBytes &block) = 0; +}; + +template<typename T> +class DummyConsumer : public AbstractConsumer +{ +public: + DummyConsumer() + { + _header.resize(0); + _data.resize(0); + _call_count = 0; + }; + + ~DummyConsumer() + { + BOOST_LOG_TRIVIAL(debug) << "Destroyed DummyConsumer"; + }; + + void init(psrdada_cpp::RawBytes &block) override + { + if(_header.size() != block.used_bytes()){_header.resize(block.used_bytes());} + std::copy(block.ptr(), block.ptr() + block.used_bytes(), _header.data()); + }; + + bool operator()(psrdada_cpp::RawBytes &block) override + { + _call_count++; + BOOST_LOG_TRIVIAL(debug) << "DummyConsumer got new data block"; + if(_data.size() * sizeof(T) != block.used_bytes()){_data.resize(block.used_bytes() / sizeof(T));} + std::memcpy(_data.data(), block.ptr(), block.used_bytes()); + return false; + }; + + T* data_ptr(std::size_t position=0) + { + return &_data.data()[position]; + } + std::vector<T> data() + { + return _data; + } + + T* header_ptr(std::size_t position=0) + { + return &_header.data()[position]; + } + + std::vector<T> header() + { + return _header; + } + + std::size_t call_count(){return _call_count;} + +private: + std::size_t _header_bytes; + std::size_t _payload_bytes; + std::size_t _call_count; + std::vector<T> _header; + std::vector<T> _data; +}; + +} +} \ No newline at end of file