diff --git a/psrdada_cpp/merger/cli/edd_merge_cli.cpp b/psrdada_cpp/merger/cli/edd_merge_cli.cpp index ed6377daed423504b20f80ebb3e63164aeb9bbf1..c63c4896a90f76fe32622c9ae9e30de262e0184e 100644 --- a/psrdada_cpp/merger/cli/edd_merge_cli.cpp +++ b/psrdada_cpp/merger/cli/edd_merge_cli.cpp @@ -57,7 +57,7 @@ int main(int argc, char** argv) "Value of number of threads") ("heap_size,b", po::value<std::size_t>(&heap_size)->default_value(8000), "Size of a heap") ("npol,p", po::value<std::size_t>(&npol)->default_value(2), "Size of a heap") - ("merge_type,t", po::value<std::string>(&mtype)->default_value("pfb"), "The merge type, either 'pfb' or 'pol'") + ("merge_type,t", po::value<std::string>(&mtype)->default_value("pfb"), "The merge type, either 'pfb', '10' or '8'") ("log_level", po::value<std::string>() ->default_value("info") ->notifier([](std::string level) @@ -94,7 +94,13 @@ int main(int argc, char** argv) merger::Pipeline<decltype(merger), decltype(output)> pipeline(merger, output); DadaInputStream <decltype(pipeline)> input(input_key, log, pipeline); input.start(); - }else{ + }else if (mtype == "10"){ + merger::PolnMerger10to8 merger(npol, nthreads); + merger::Pipeline<decltype(merger), decltype(output)> pipeline(merger, output); + DadaInputStream <decltype(pipeline)> input(input_key, log, pipeline); + input.start(); + } + else{ merger::PolnMerger merger(npol, nthreads); merger::Pipeline<decltype(merger), decltype(output)> pipeline(merger, output); DadaInputStream <decltype(pipeline)> input(input_key, log, pipeline); diff --git a/psrdada_cpp/merger/merger.hpp b/psrdada_cpp/merger/merger.hpp index 94fd6b6d0f73ca5d73b3c78c6c08e56a7bf4f5cd..0212c1dcd1a33aececfed1b921f8be5e40ec5f13 100644 --- a/psrdada_cpp/merger/merger.hpp +++ b/psrdada_cpp/merger/merger.hpp @@ -12,7 +12,8 @@ namespace psrdada_cpp { namespace merger { -struct merge_conf{ +// Configuration structure remains the same. +struct merge_conf { std::size_t npol; std::size_t nchunk; std::size_t nthreads; @@ -22,34 +23,62 @@ struct merge_conf{ } }; -class PFBMerger -{ +// New common base class, which is polymorphic. +// Any merger class will inherit from this. +class Merger { +public: + virtual ~Merger() {} + virtual void process(char* idata, char* odata, std::size_t size) = 0; +}; + +//--------------------------------------------------------------------- +// PFBMerger now inherits from Merger. +//--------------------------------------------------------------------- +class PFBMerger : public Merger { public: PFBMerger(std::size_t nchunk, std::size_t nthreads, std::size_t heap_size); PFBMerger(merge_conf conf); - ~PFBMerger(); + virtual ~PFBMerger(); - void process(char* idata, char* odata, std::size_t size); + virtual void process(char* idata, char* odata, std::size_t size) override; private: std::size_t _nchunk; std::size_t _nthreads; std::size_t _heap_size; }; -class PolnMerger -{ +//--------------------------------------------------------------------- +// PolnMerger now inherits from Merger. +//--------------------------------------------------------------------- +class PolnMerger : public Merger { public: PolnMerger(std::size_t npol, std::size_t nthreads); PolnMerger(merge_conf conf); - ~PolnMerger(); + virtual ~PolnMerger(); + + virtual void process(char* idata, char* odata, std::size_t size) override; +private: + std::size_t _npol; + std::size_t _nthreads; +}; + +//--------------------------------------------------------------------- +// PolnMerger10to8 now inherits from Merger. +//--------------------------------------------------------------------- +class PolnMerger10to8 : public Merger { +public: + PolnMerger10to8(std::size_t npol, std::size_t nthreads); + PolnMerger10to8(merge_conf conf); + + virtual ~PolnMerger10to8(); - void process(char* idata, char* odata, std::size_t size); + virtual void process(char* idata, char* odata, std::size_t size) override; private: std::size_t _npol; std::size_t _nthreads; }; -} -} \ No newline at end of file +} // namespace merger +} // namespace psrdada_cpp diff --git a/psrdada_cpp/merger/src/merger.cpp b/psrdada_cpp/merger/src/merger.cpp index ec15e2801b460fcff331742fce235f092dbe2668..7d844abb96108775d82d882b8a823c39160c421e 100644 --- a/psrdada_cpp/merger/src/merger.cpp +++ b/psrdada_cpp/merger/src/merger.cpp @@ -1,27 +1,114 @@ #include "psrdada_cpp/merger/merger.hpp" #include <cstdint> +#include <cstring> +#include <stdexcept> +#include <vector> +#include <omp.h> +#include <immintrin.h> +#include <arpa/inet.h> // for be64toh + +// Anonymous namespace for helper routines needed for 10-bit unpacking. +namespace { + + // Interleave two 32-bit values using SSE instructions. + uint64_t interleave(uint32_t x, uint32_t y) { + __m128i xvec = _mm_cvtsi32_si128(x); + __m128i yvec = _mm_cvtsi32_si128(y); + __m128i interleaved = _mm_unpacklo_epi8(yvec, xvec); + return _mm_cvtsi128_si64(interleaved); + } + + // Unpacks 32 10-bit values from 5 packed 64-bit words. + // The input pointer 'qword' advances by 5 words on output. + uint64_t* unpack5(uint64_t* qword, uint8_t* out) + { + uint64_t val, rest; + // 1st word + val = be64toh(*qword); + qword++; + out[0] = (int8_t)(((int64_t)(( 0xFFC0000000000000ULL & val) << 0) >> 54) >> 2); + out[1] = (int8_t)(((int64_t)(( 0x003FF00000000000ULL & val) << 10) >> 54) >> 2); + out[2] = (int8_t)(((int64_t)(( 0x00000FFC00000000ULL & val) << 20) >> 54) >> 2); + out[3] = (int8_t)(((int64_t)(( 0x00000003FF000000ULL & val) << 30) >> 54) >> 2); + out[4] = (int8_t)(((int64_t)(( 0x0000000000FFC000ULL & val) << 40) >> 54) >> 2); + out[5] = (int8_t)(((int64_t)(( 0x0000000000003FF0ULL & val) << 50) >> 54) >> 2); + rest = ( 0x000000000000000FULL & val) << 60; // 4 bits rest. + + // 2nd word + val = be64toh(*qword); + qword++; + out[6] = (int8_t)(((int64_t)(((0xFC00000000000000ULL & val) >> 4) | rest) >> 54) >> 2); + out[7] = (int8_t)(((int64_t)(( 0x03FF000000000000ULL & val) << 6) >> 54) >> 2); + out[8] = (int8_t)(((int64_t)(( 0x0000FFC000000000ULL & val) << 16) >> 54) >> 2); + out[9] = (int8_t)(((int64_t)(( 0x0000003FF0000000ULL & val) << 26) >> 54) >> 2); + out[10] = (int8_t)(((int64_t)(( 0x000000000FFC0000ULL & val) << 36) >> 54) >> 2); + out[11] = (int8_t)(((int64_t)(( 0x000000000003FF00ULL & val) << 46) >> 54) >> 2); + rest = ( 0x00000000000000FFULL & val) << 56; // 8 bits rest. + + // 3rd word + val = be64toh(*qword); + qword++; + out[12] = (int8_t)(((int64_t)(((0xC000000000000000ULL & val) >> 8) | rest) >> 54) >> 2); + out[13] = (int8_t)(((int64_t)(( 0x3FF0000000000000ULL & val) << 2) >> 54) >> 2); + out[14] = (int8_t)(((int64_t)(( 0x000FFC0000000000ULL & val) << 12) >> 54) >> 2); + out[15] = (int8_t)(((int64_t)(( 0x000003FF00000000ULL & val) << 22) >> 54) >> 2); + out[16] = (int8_t)(((int64_t)(( 0x00000000FFC00000ULL & val) << 32) >> 54) >> 2); + out[17] = (int8_t)(((int64_t)(( 0x00000000003FF000ULL & val) << 42) >> 54) >> 2); + out[18] = (int8_t)(((int64_t)(( 0x0000000000000FFCULL & val) << 52) >> 54) >> 2); + rest = ( 0x0000000000000003ULL & val) << 62; // 2 bits rest. + + // 4th word + val = be64toh(*qword); + qword++; + out[19] = (int8_t)(((int64_t)(((0xFF00000000000000ULL & val) >> 2) | rest) >> 54) >> 2); + out[20] = (int8_t)(((int64_t)(( 0x00FFC00000000000ULL & val) << 8) >> 54) >> 2); + out[21] = (int8_t)(((int64_t)(( 0x00003FF000000000ULL & val) << 18) >> 54) >> 2); + out[22] = (int8_t)(((int64_t)(( 0x0000000FFC000000ULL & val) << 28) >> 54) >> 2); + out[23] = (int8_t)(((int64_t)(( 0x0000000003FF0000ULL & val) << 38) >> 54) >> 2); + out[24] = (int8_t)(((int64_t)(( 0x000000000000FFC0ULL & val) << 48) >> 54) >> 2); + rest = ( 0x000000000000003FULL & val) << 58; // 6 bits rest. + + // 5th word + val = be64toh(*qword); + qword++; + out[25] = (int8_t)(((int64_t)(((0xF000000000000000ULL & val) >> 6) | rest) >> 54) >> 2); + out[26] = (int8_t)(((int64_t)(( 0x0FFC000000000000ULL & val) << 4) >> 54) >> 2); + out[27] = (int8_t)(((int64_t)(( 0x0003FF0000000000ULL & val) << 14) >> 54) >> 2); + out[28] = (int8_t)(((int64_t)(( 0x000000FFC0000000ULL & val) << 24) >> 54) >> 2); + out[29] = (int8_t)(((int64_t)(( 0x000000003FF00000ULL & val) << 34) >> 54) >> 2); + out[30] = (int8_t)(((int64_t)(( 0x00000000000FFC00ULL & val) << 44) >> 54) >> 2); + out[31] = (int8_t)(((int64_t)(( 0x00000000000003FFULL & val) << 54) >> 54) >> 2); + return qword; + } + +} // end anonymous namespace namespace psrdada_cpp { namespace merger { +//==================================================================== +// PFBMerger (unchanged from before) +//==================================================================== PFBMerger::PFBMerger(std::size_t nchunk, std::size_t nthreads, std::size_t heap_size) : _nchunk(nchunk), _nthreads(nthreads), _heap_size(heap_size){ } + PFBMerger::PFBMerger(merge_conf conf) : _nchunk(conf.nchunk), _nthreads(conf.nthreads), _heap_size(conf.heap_size){ } PFBMerger::~PFBMerger(){ - } void PFBMerger::process(char* idata, char* odata, std::size_t size) { - const std::size_t bytes_per_chunk = 4; + const std::size_t bytes_per_chunk = 4; const std::size_t heap_group = _heap_size * _nchunk; const std::size_t num_chunks = size / heap_group; - if (size % heap_group != 0){ - throw std::runtime_error("Size is " + std::to_string(size) + " and not a multiple of heap group size " + std::to_string(heap_group)); + if (size % heap_group != 0) { + throw std::runtime_error("Size is " + std::to_string(size) + + " and not a multiple of heap group size " + + std::to_string(heap_group)); } #pragma omp parallel for num_threads(_nthreads) for (std::size_t xx = 0; xx < num_chunks; ++xx) @@ -45,11 +132,13 @@ void PFBMerger::process(char* idata, char* odata, std::size_t size) } } - - +//==================================================================== +// PolnMerger: Original 8-bit merger (unchanged) +//==================================================================== PolnMerger::PolnMerger(std::size_t npol, std::size_t nthreads) : _npol(npol), _nthreads(nthreads){ } + PolnMerger::PolnMerger(merge_conf conf) : _npol(conf.npol), _nthreads(conf.nthreads){ } @@ -59,31 +148,91 @@ PolnMerger::~PolnMerger(){ void PolnMerger::process(char* idata, char* odata, std::size_t size) { - const std::size_t _heap_size = 4096; + const std::size_t heap_size = 4096; // 8-bit input: each heap has 4096 bytes per polarization. #pragma omp parallel for schedule(dynamic, _nthreads) num_threads(_nthreads) - for (std::size_t kk = 0; kk < size / _heap_size / _npol ; ++kk) + for (std::size_t kk = 0; kk < size / heap_size / _npol; ++kk) { - char *buffer = idata + _heap_size * _npol * kk; + char *buffer = idata + heap_size * _npol * kk; uint8_t const* qword0 = reinterpret_cast<uint8_t const*>(buffer); - uint8_t const* qword1 = reinterpret_cast<uint8_t const*>(buffer) + _heap_size; - uint64_t* tmp = reinterpret_cast<uint64_t*>(odata + kk * _npol * _heap_size); - + uint8_t const* qword1 = reinterpret_cast<uint8_t const*>(buffer) + heap_size; + uint64_t* tmp = reinterpret_cast<uint64_t*>(odata + kk * _npol * heap_size); __m128i xvec, yvec, interleaved; - for (size_t i = 0; i < _heap_size / sizeof(__m128i); ++i) + for (std::size_t i = 0; i < heap_size / sizeof(__m128i); ++i) { xvec = _mm_loadu_si128(reinterpret_cast<__m128i const*>(qword0)); yvec = _mm_loadu_si128(reinterpret_cast<__m128i const*>(qword1)); interleaved = _mm_unpacklo_epi8(xvec, yvec); _mm_storeu_si128(reinterpret_cast<__m128i*>(tmp), interleaved); - tmp+=2; + tmp += 2; interleaved = _mm_unpackhi_epi8(xvec, yvec); _mm_storeu_si128(reinterpret_cast<__m128i*>(tmp), interleaved); - tmp+=2; + tmp += 2; qword0 += 16; qword1 += 16; } } } +//==================================================================== +// PolnMerger10to8: New method for 10-bit to 8-bit conversion. +//==================================================================== +PolnMerger10to8::PolnMerger10to8(std::size_t npol, std::size_t nthreads) + : _npol(npol), _nthreads(nthreads){ } + +PolnMerger10to8::PolnMerger10to8(merge_conf conf) + : _npol(conf.npol), _nthreads(conf.nthreads){ } + +PolnMerger10to8::~PolnMerger10to8(){ +} + +void PolnMerger10to8::process(char* idata, char* odata, std::size_t size) +{ + // For 10-bit data, each heap contains 4096 samples per polarization. + // 4096 samples x 10 bits = 40960 bits = 5120 bytes per polarization. + const std::size_t heap_samples = 4096; // number of samples per polarization (after conversion) + const std::size_t heap_size_in = 640 * sizeof(uint64_t); // 640 64-bit words = 5120 bytes per polarization + const std::size_t num_heaps = size / (heap_size_in * _npol); + if (size % (heap_size_in * _npol) != 0) { + throw std::runtime_error("Size is " + std::to_string(size) + + " and not a multiple of the expected input heap size " + + std::to_string(heap_size_in * _npol)); + } + + #pragma omp parallel for schedule(dynamic, _nthreads) num_threads(_nthreads) + for (std::size_t kk = 0; kk < num_heaps; ++kk) + { + // Each heap for _npol polarizations is stored consecutively. + char *buffer = idata + (heap_size_in * _npol) * kk; + // The output buffer is assumed to contain 8-bit samples with the same sample count. + char *out_ptr = odata + (heap_samples * _npol) * kk; + + // For the 10-bit unpacker, this code currently assumes _npol == 2. + uint64_t* qword0 = reinterpret_cast<uint64_t*>(buffer); + uint64_t* qword1 = reinterpret_cast<uint64_t*>(buffer + heap_size_in); + uint64_t* D = reinterpret_cast<uint64_t*>(out_ptr); + + // Temporary buffers to hold 32 unpacked bytes (representing eight 32-bit values). + uint8_t S0_8bit[32]; + uint8_t S1_8bit[32]; + + // There are 640/5 = 128 iterations per heap. + for (int i = 0; i < 128; i++) + { + qword0 = unpack5(qword0, S0_8bit); + qword1 = unpack5(qword1, S1_8bit); + + // Reinterpret the temporary buffers as arrays of eight 32-bit integers. + uint32_t* S0 = reinterpret_cast<uint32_t*>(S0_8bit); + uint32_t* S1 = reinterpret_cast<uint32_t*>(S1_8bit); + for (std::size_t ii = 0; ii < 8; ++ii) + { + *D++ = interleave(S1[ii], S0[ii]); + } + } + } +} + +} // namespace merger +} // namespace psrdada_cpp diff --git a/psrdada_cpp/merger/src/pipeline.cpp b/psrdada_cpp/merger/src/pipeline.cpp index ee64dfe723caa1d957126940b92b41610e4d35bf..1c54881d6af57946951e2644a193a59b95d13bec 100644 --- a/psrdada_cpp/merger/src/pipeline.cpp +++ b/psrdada_cpp/merger/src/pipeline.cpp @@ -44,20 +44,53 @@ bool Pipeline<MergerType, Handler>::operator()(RawBytes& block) { RawBytes& oblock = _handler.data_stream().next(); BOOST_LOG_TRIVIAL(debug) << "block.used_bytes: " << block.used_bytes() - << " oblock.used_bytes " << oblock.used_bytes(); - if (block.used_bytes() > oblock.total_bytes()) + << " oblock.used_bytes " << oblock.used_bytes(); + + // By default, expected output size equals input size (for 8-bit mode) + std::size_t expected_output = block.used_bytes(); + + // Check if we're in 10-to-8 conversion mode. +if (dynamic_cast<psrdada_cpp::merger::PolnMerger10to8*>(&_merger) != nullptr) +{ + BOOST_LOG_TRIVIAL(debug) << "10-to-8 merger mode detected."; + + // Define conversion parameters. + const std::size_t heap_size_in = 640 * sizeof(uint64_t); // 5120 bytes per polarization input heap + const std::size_t heap_samples = 4096; // 4096 bytes per polarization output + const std::size_t npol = 2; // Adjust if using a different number of polarizations or via a getter + + BOOST_LOG_TRIVIAL(debug) << "Conversion parameters: " + << "heap_size_in = " << heap_size_in + << ", heap_samples = " << heap_samples + << ", npol = " << npol; + + // Compute the number of complete heaps in the input buffer. + std::size_t num_heaps = block.used_bytes() / (heap_size_in * npol); + BOOST_LOG_TRIVIAL(debug) << "block.used_bytes() = " << block.used_bytes() + << ", computed num_heaps = " << num_heaps; + + // Calculate the expected output size. + expected_output = num_heaps * (heap_samples * npol); + BOOST_LOG_TRIVIAL(debug) << "Expected output size = " << expected_output + << " bytes based on " << num_heaps << " heaps."; +} + + // Verify that the output buffer is large enough to hold the expected output. + if (expected_output > oblock.total_bytes()) { - _handler.data_stream().release(); // <- Does the release makes sense here? - throw std::runtime_error("Output DADA buffer does not match with the input dada buffer"); + _handler.data_stream().release(); + throw std::runtime_error("Output DADA buffer does not have enough space for the conversion"); } + // Run the merger process. _merger.process(block.ptr(), oblock.ptr(), block.used_bytes()); - oblock.used_bytes(block.used_bytes()); + // Set the actual output size. + oblock.used_bytes(expected_output); _handler.data_stream().release(); + return false; } }//merger }//psrdada_cpp - diff --git a/psrdada_cpp/merger/test/src/merge_tester.cpp b/psrdada_cpp/merger/test/src/merge_tester.cpp index d6653e896f681f2c1ec4b3571fa1adc47dcb6879..ff69cb26d504b284c96db36abcd6a475aaabd99c 100644 --- a/psrdada_cpp/merger/test/src/merge_tester.cpp +++ b/psrdada_cpp/merger/test/src/merge_tester.cpp @@ -97,19 +97,19 @@ std::size_t PolnMergeTester::getBufferSize() { std::vector<char> PolnMergeTester::generateTestVector() { - std::vector<char> test_vector(getBufferSize()); - for (std::size_t ii = 0; ii < params.nheap_groups; ii++) - { - for (std::size_t jj = 0; jj < params.npol; jj++) - { - for (std::size_t kk = 0; kk < params.nsamps_per_heaps; kk++) - { + std::vector<char> test_vector(getBufferSize()); + for (std::size_t ii = 0; ii < params.nheap_groups; ii++) + { + for (std::size_t jj = 0; jj < params.npol; jj++) + { + for (std::size_t kk = 0; kk < params.nsamps_per_heaps; kk++) + { std::size_t idx = ii * params.npol * params.nsamps_per_heaps + jj * params.nsamps_per_heaps + kk; - test_vector[idx] = (ii * 2 + jj) % 128; - } - } - } + test_vector[idx] = (ii * 2 + jj) % 128; + } + } + } return test_vector; } diff --git a/psrdada_cpp/merger/test/src/pipeline_tester.cpp b/psrdada_cpp/merger/test/src/pipeline_tester.cpp index 39fb90513c3cf71e24fbcb799da208e742134ce5..5da6334e78f5aef44b433636402bfcf37f67722e 100644 --- a/psrdada_cpp/merger/test/src/pipeline_tester.cpp +++ b/psrdada_cpp/merger/test/src/pipeline_tester.cpp @@ -43,4 +43,4 @@ INSTANTIATE_TEST_SUITE_P(ParameterizedTest, ProvideParamsForPipelineTester, ::te } } -} \ No newline at end of file +}