Skip to content
Snippets Groups Projects
Commit 4424434c authored by Jason Wu's avatar Jason Wu
Browse files

Merge branch 'edd_merge_10to8' into 'devel'

Edd merge 10to8

See merge request !43
parents 59a29e95 321e445b
Branches
Tags
1 merge request!43Edd merge 10to8
Pipeline #256124 passed
...@@ -57,7 +57,7 @@ int main(int argc, char** argv) ...@@ -57,7 +57,7 @@ int main(int argc, char** argv)
"Value of number of threads") "Value of number of threads")
("heap_size,b", po::value<std::size_t>(&heap_size)->default_value(8000), "Size of a heap") ("heap_size,b", po::value<std::size_t>(&heap_size)->default_value(8000), "Size of a heap")
("npol,p", po::value<std::size_t>(&npol)->default_value(2), "Size of a heap") ("npol,p", po::value<std::size_t>(&npol)->default_value(2), "Size of a heap")
("merge_type,t", po::value<std::string>(&mtype)->default_value("pfb"), "The merge type, either 'pfb' or 'pol'") ("merge_type,t", po::value<std::string>(&mtype)->default_value("pfb"), "The merge type, either 'pfb', '10' or '8'")
("log_level", po::value<std::string>() ("log_level", po::value<std::string>()
->default_value("info") ->default_value("info")
->notifier([](std::string level) ->notifier([](std::string level)
...@@ -94,7 +94,13 @@ int main(int argc, char** argv) ...@@ -94,7 +94,13 @@ int main(int argc, char** argv)
merger::Pipeline<decltype(merger), decltype(output)> pipeline(merger, output); merger::Pipeline<decltype(merger), decltype(output)> pipeline(merger, output);
DadaInputStream <decltype(pipeline)> input(input_key, log, pipeline); DadaInputStream <decltype(pipeline)> input(input_key, log, pipeline);
input.start(); input.start();
}else{ }else if (mtype == "10"){
merger::PolnMerger10to8 merger(npol, nthreads);
merger::Pipeline<decltype(merger), decltype(output)> pipeline(merger, output);
DadaInputStream <decltype(pipeline)> input(input_key, log, pipeline);
input.start();
}
else{
merger::PolnMerger merger(npol, nthreads); merger::PolnMerger merger(npol, nthreads);
merger::Pipeline<decltype(merger), decltype(output)> pipeline(merger, output); merger::Pipeline<decltype(merger), decltype(output)> pipeline(merger, output);
DadaInputStream <decltype(pipeline)> input(input_key, log, pipeline); DadaInputStream <decltype(pipeline)> input(input_key, log, pipeline);
......
...@@ -12,7 +12,8 @@ ...@@ -12,7 +12,8 @@
namespace psrdada_cpp { namespace psrdada_cpp {
namespace merger { namespace merger {
struct merge_conf{ // Configuration structure remains the same.
struct merge_conf {
std::size_t npol; std::size_t npol;
std::size_t nchunk; std::size_t nchunk;
std::size_t nthreads; std::size_t nthreads;
...@@ -22,34 +23,62 @@ struct merge_conf{ ...@@ -22,34 +23,62 @@ struct merge_conf{
} }
}; };
class PFBMerger // New common base class, which is polymorphic.
{ // Any merger class will inherit from this.
class Merger {
public:
virtual ~Merger() {}
virtual void process(char* idata, char* odata, std::size_t size) = 0;
};
//---------------------------------------------------------------------
// PFBMerger now inherits from Merger.
//---------------------------------------------------------------------
class PFBMerger : public Merger {
public: public:
PFBMerger(std::size_t nchunk, std::size_t nthreads, std::size_t heap_size); PFBMerger(std::size_t nchunk, std::size_t nthreads, std::size_t heap_size);
PFBMerger(merge_conf conf); PFBMerger(merge_conf conf);
~PFBMerger(); virtual ~PFBMerger();
void process(char* idata, char* odata, std::size_t size); virtual void process(char* idata, char* odata, std::size_t size) override;
private: private:
std::size_t _nchunk; std::size_t _nchunk;
std::size_t _nthreads; std::size_t _nthreads;
std::size_t _heap_size; std::size_t _heap_size;
}; };
class PolnMerger //---------------------------------------------------------------------
{ // PolnMerger now inherits from Merger.
//---------------------------------------------------------------------
class PolnMerger : public Merger {
public: public:
PolnMerger(std::size_t npol, std::size_t nthreads); PolnMerger(std::size_t npol, std::size_t nthreads);
PolnMerger(merge_conf conf); PolnMerger(merge_conf conf);
~PolnMerger(); virtual ~PolnMerger();
virtual void process(char* idata, char* odata, std::size_t size) override;
private:
std::size_t _npol;
std::size_t _nthreads;
};
//---------------------------------------------------------------------
// PolnMerger10to8 now inherits from Merger.
//---------------------------------------------------------------------
class PolnMerger10to8 : public Merger {
public:
PolnMerger10to8(std::size_t npol, std::size_t nthreads);
PolnMerger10to8(merge_conf conf);
virtual ~PolnMerger10to8();
void process(char* idata, char* odata, std::size_t size); virtual void process(char* idata, char* odata, std::size_t size) override;
private: private:
std::size_t _npol; std::size_t _npol;
std::size_t _nthreads; std::size_t _nthreads;
}; };
} } // namespace merger
} } // namespace psrdada_cpp
\ No newline at end of file
#include "psrdada_cpp/merger/merger.hpp" #include "psrdada_cpp/merger/merger.hpp"
#include <cstdint> #include <cstdint>
#include <cstring>
#include <stdexcept>
#include <vector>
#include <omp.h>
#include <immintrin.h>
#include <arpa/inet.h> // for be64toh
// Anonymous namespace for helper routines needed for 10-bit unpacking.
namespace {
// Interleave two 32-bit values using SSE instructions.
uint64_t interleave(uint32_t x, uint32_t y) {
__m128i xvec = _mm_cvtsi32_si128(x);
__m128i yvec = _mm_cvtsi32_si128(y);
__m128i interleaved = _mm_unpacklo_epi8(yvec, xvec);
return _mm_cvtsi128_si64(interleaved);
}
// Unpacks 32 10-bit values from 5 packed 64-bit words.
// The input pointer 'qword' advances by 5 words on output.
uint64_t* unpack5(uint64_t* qword, uint8_t* out)
{
uint64_t val, rest;
// 1st word
val = be64toh(*qword);
qword++;
out[0] = (int8_t)(((int64_t)(( 0xFFC0000000000000ULL & val) << 0) >> 54) >> 2);
out[1] = (int8_t)(((int64_t)(( 0x003FF00000000000ULL & val) << 10) >> 54) >> 2);
out[2] = (int8_t)(((int64_t)(( 0x00000FFC00000000ULL & val) << 20) >> 54) >> 2);
out[3] = (int8_t)(((int64_t)(( 0x00000003FF000000ULL & val) << 30) >> 54) >> 2);
out[4] = (int8_t)(((int64_t)(( 0x0000000000FFC000ULL & val) << 40) >> 54) >> 2);
out[5] = (int8_t)(((int64_t)(( 0x0000000000003FF0ULL & val) << 50) >> 54) >> 2);
rest = ( 0x000000000000000FULL & val) << 60; // 4 bits rest.
// 2nd word
val = be64toh(*qword);
qword++;
out[6] = (int8_t)(((int64_t)(((0xFC00000000000000ULL & val) >> 4) | rest) >> 54) >> 2);
out[7] = (int8_t)(((int64_t)(( 0x03FF000000000000ULL & val) << 6) >> 54) >> 2);
out[8] = (int8_t)(((int64_t)(( 0x0000FFC000000000ULL & val) << 16) >> 54) >> 2);
out[9] = (int8_t)(((int64_t)(( 0x0000003FF0000000ULL & val) << 26) >> 54) >> 2);
out[10] = (int8_t)(((int64_t)(( 0x000000000FFC0000ULL & val) << 36) >> 54) >> 2);
out[11] = (int8_t)(((int64_t)(( 0x000000000003FF00ULL & val) << 46) >> 54) >> 2);
rest = ( 0x00000000000000FFULL & val) << 56; // 8 bits rest.
// 3rd word
val = be64toh(*qword);
qword++;
out[12] = (int8_t)(((int64_t)(((0xC000000000000000ULL & val) >> 8) | rest) >> 54) >> 2);
out[13] = (int8_t)(((int64_t)(( 0x3FF0000000000000ULL & val) << 2) >> 54) >> 2);
out[14] = (int8_t)(((int64_t)(( 0x000FFC0000000000ULL & val) << 12) >> 54) >> 2);
out[15] = (int8_t)(((int64_t)(( 0x000003FF00000000ULL & val) << 22) >> 54) >> 2);
out[16] = (int8_t)(((int64_t)(( 0x00000000FFC00000ULL & val) << 32) >> 54) >> 2);
out[17] = (int8_t)(((int64_t)(( 0x00000000003FF000ULL & val) << 42) >> 54) >> 2);
out[18] = (int8_t)(((int64_t)(( 0x0000000000000FFCULL & val) << 52) >> 54) >> 2);
rest = ( 0x0000000000000003ULL & val) << 62; // 2 bits rest.
// 4th word
val = be64toh(*qword);
qword++;
out[19] = (int8_t)(((int64_t)(((0xFF00000000000000ULL & val) >> 2) | rest) >> 54) >> 2);
out[20] = (int8_t)(((int64_t)(( 0x00FFC00000000000ULL & val) << 8) >> 54) >> 2);
out[21] = (int8_t)(((int64_t)(( 0x00003FF000000000ULL & val) << 18) >> 54) >> 2);
out[22] = (int8_t)(((int64_t)(( 0x0000000FFC000000ULL & val) << 28) >> 54) >> 2);
out[23] = (int8_t)(((int64_t)(( 0x0000000003FF0000ULL & val) << 38) >> 54) >> 2);
out[24] = (int8_t)(((int64_t)(( 0x000000000000FFC0ULL & val) << 48) >> 54) >> 2);
rest = ( 0x000000000000003FULL & val) << 58; // 6 bits rest.
// 5th word
val = be64toh(*qword);
qword++;
out[25] = (int8_t)(((int64_t)(((0xF000000000000000ULL & val) >> 6) | rest) >> 54) >> 2);
out[26] = (int8_t)(((int64_t)(( 0x0FFC000000000000ULL & val) << 4) >> 54) >> 2);
out[27] = (int8_t)(((int64_t)(( 0x0003FF0000000000ULL & val) << 14) >> 54) >> 2);
out[28] = (int8_t)(((int64_t)(( 0x000000FFC0000000ULL & val) << 24) >> 54) >> 2);
out[29] = (int8_t)(((int64_t)(( 0x000000003FF00000ULL & val) << 34) >> 54) >> 2);
out[30] = (int8_t)(((int64_t)(( 0x00000000000FFC00ULL & val) << 44) >> 54) >> 2);
out[31] = (int8_t)(((int64_t)(( 0x00000000000003FFULL & val) << 54) >> 54) >> 2);
return qword;
}
} // end anonymous namespace
namespace psrdada_cpp { namespace psrdada_cpp {
namespace merger { namespace merger {
//====================================================================
// PFBMerger (unchanged from before)
//====================================================================
PFBMerger::PFBMerger(std::size_t nchunk, std::size_t nthreads, std::size_t heap_size) PFBMerger::PFBMerger(std::size_t nchunk, std::size_t nthreads, std::size_t heap_size)
: _nchunk(nchunk), _nthreads(nthreads), _heap_size(heap_size){ : _nchunk(nchunk), _nthreads(nthreads), _heap_size(heap_size){
} }
PFBMerger::PFBMerger(merge_conf conf) PFBMerger::PFBMerger(merge_conf conf)
: _nchunk(conf.nchunk), _nthreads(conf.nthreads), _heap_size(conf.heap_size){ : _nchunk(conf.nchunk), _nthreads(conf.nthreads), _heap_size(conf.heap_size){
} }
PFBMerger::~PFBMerger(){ PFBMerger::~PFBMerger(){
} }
void PFBMerger::process(char* idata, char* odata, std::size_t size) void PFBMerger::process(char* idata, char* odata, std::size_t size)
{ {
const std::size_t bytes_per_chunk = 4; const std::size_t bytes_per_chunk = 4;
const std::size_t heap_group = _heap_size * _nchunk; const std::size_t heap_group = _heap_size * _nchunk;
const std::size_t num_chunks = size / heap_group; const std::size_t num_chunks = size / heap_group;
if (size % heap_group != 0){ if (size % heap_group != 0) {
throw std::runtime_error("Size is " + std::to_string(size) + " and not a multiple of heap group size " + std::to_string(heap_group)); throw std::runtime_error("Size is " + std::to_string(size) +
" and not a multiple of heap group size " +
std::to_string(heap_group));
} }
#pragma omp parallel for num_threads(_nthreads) #pragma omp parallel for num_threads(_nthreads)
for (std::size_t xx = 0; xx < num_chunks; ++xx) for (std::size_t xx = 0; xx < num_chunks; ++xx)
...@@ -45,11 +132,13 @@ void PFBMerger::process(char* idata, char* odata, std::size_t size) ...@@ -45,11 +132,13 @@ void PFBMerger::process(char* idata, char* odata, std::size_t size)
} }
} }
//====================================================================
// PolnMerger: Original 8-bit merger (unchanged)
//====================================================================
PolnMerger::PolnMerger(std::size_t npol, std::size_t nthreads) PolnMerger::PolnMerger(std::size_t npol, std::size_t nthreads)
: _npol(npol), _nthreads(nthreads){ : _npol(npol), _nthreads(nthreads){
} }
PolnMerger::PolnMerger(merge_conf conf) PolnMerger::PolnMerger(merge_conf conf)
: _npol(conf.npol), _nthreads(conf.nthreads){ : _npol(conf.npol), _nthreads(conf.nthreads){
} }
...@@ -59,31 +148,91 @@ PolnMerger::~PolnMerger(){ ...@@ -59,31 +148,91 @@ PolnMerger::~PolnMerger(){
void PolnMerger::process(char* idata, char* odata, std::size_t size) void PolnMerger::process(char* idata, char* odata, std::size_t size)
{ {
const std::size_t _heap_size = 4096; const std::size_t heap_size = 4096; // 8-bit input: each heap has 4096 bytes per polarization.
#pragma omp parallel for schedule(dynamic, _nthreads) num_threads(_nthreads) #pragma omp parallel for schedule(dynamic, _nthreads) num_threads(_nthreads)
for (std::size_t kk = 0; kk < size / _heap_size / _npol ; ++kk) for (std::size_t kk = 0; kk < size / heap_size / _npol; ++kk)
{ {
char *buffer = idata + _heap_size * _npol * kk; char *buffer = idata + heap_size * _npol * kk;
uint8_t const* qword0 = reinterpret_cast<uint8_t const*>(buffer); uint8_t const* qword0 = reinterpret_cast<uint8_t const*>(buffer);
uint8_t const* qword1 = reinterpret_cast<uint8_t const*>(buffer) + _heap_size; uint8_t const* qword1 = reinterpret_cast<uint8_t const*>(buffer) + heap_size;
uint64_t* tmp = reinterpret_cast<uint64_t*>(odata + kk * _npol * _heap_size); uint64_t* tmp = reinterpret_cast<uint64_t*>(odata + kk * _npol * heap_size);
__m128i xvec, yvec, interleaved; __m128i xvec, yvec, interleaved;
for (size_t i = 0; i < _heap_size / sizeof(__m128i); ++i) for (std::size_t i = 0; i < heap_size / sizeof(__m128i); ++i)
{ {
xvec = _mm_loadu_si128(reinterpret_cast<__m128i const*>(qword0)); xvec = _mm_loadu_si128(reinterpret_cast<__m128i const*>(qword0));
yvec = _mm_loadu_si128(reinterpret_cast<__m128i const*>(qword1)); yvec = _mm_loadu_si128(reinterpret_cast<__m128i const*>(qword1));
interleaved = _mm_unpacklo_epi8(xvec, yvec); interleaved = _mm_unpacklo_epi8(xvec, yvec);
_mm_storeu_si128(reinterpret_cast<__m128i*>(tmp), interleaved); _mm_storeu_si128(reinterpret_cast<__m128i*>(tmp), interleaved);
tmp+=2; tmp += 2;
interleaved = _mm_unpackhi_epi8(xvec, yvec); interleaved = _mm_unpackhi_epi8(xvec, yvec);
_mm_storeu_si128(reinterpret_cast<__m128i*>(tmp), interleaved); _mm_storeu_si128(reinterpret_cast<__m128i*>(tmp), interleaved);
tmp+=2; tmp += 2;
qword0 += 16; qword0 += 16;
qword1 += 16; qword1 += 16;
} }
} }
} }
//====================================================================
// PolnMerger10to8: New method for 10-bit to 8-bit conversion.
//====================================================================
PolnMerger10to8::PolnMerger10to8(std::size_t npol, std::size_t nthreads)
: _npol(npol), _nthreads(nthreads){
} }
PolnMerger10to8::PolnMerger10to8(merge_conf conf)
: _npol(conf.npol), _nthreads(conf.nthreads){
} }
PolnMerger10to8::~PolnMerger10to8(){
}
void PolnMerger10to8::process(char* idata, char* odata, std::size_t size)
{
// For 10-bit data, each heap contains 4096 samples per polarization.
// 4096 samples x 10 bits = 40960 bits = 5120 bytes per polarization.
const std::size_t heap_samples = 4096; // number of samples per polarization (after conversion)
const std::size_t heap_size_in = 640 * sizeof(uint64_t); // 640 64-bit words = 5120 bytes per polarization
const std::size_t num_heaps = size / (heap_size_in * _npol);
if (size % (heap_size_in * _npol) != 0) {
throw std::runtime_error("Size is " + std::to_string(size) +
" and not a multiple of the expected input heap size " +
std::to_string(heap_size_in * _npol));
}
#pragma omp parallel for schedule(dynamic, _nthreads) num_threads(_nthreads)
for (std::size_t kk = 0; kk < num_heaps; ++kk)
{
// Each heap for _npol polarizations is stored consecutively.
char *buffer = idata + (heap_size_in * _npol) * kk;
// The output buffer is assumed to contain 8-bit samples with the same sample count.
char *out_ptr = odata + (heap_samples * _npol) * kk;
// For the 10-bit unpacker, this code currently assumes _npol == 2.
uint64_t* qword0 = reinterpret_cast<uint64_t*>(buffer);
uint64_t* qword1 = reinterpret_cast<uint64_t*>(buffer + heap_size_in);
uint64_t* D = reinterpret_cast<uint64_t*>(out_ptr);
// Temporary buffers to hold 32 unpacked bytes (representing eight 32-bit values).
uint8_t S0_8bit[32];
uint8_t S1_8bit[32];
// There are 640/5 = 128 iterations per heap.
for (int i = 0; i < 128; i++)
{
qword0 = unpack5(qword0, S0_8bit);
qword1 = unpack5(qword1, S1_8bit);
// Reinterpret the temporary buffers as arrays of eight 32-bit integers.
uint32_t* S0 = reinterpret_cast<uint32_t*>(S0_8bit);
uint32_t* S1 = reinterpret_cast<uint32_t*>(S1_8bit);
for (std::size_t ii = 0; ii < 8; ++ii)
{
*D++ = interleave(S1[ii], S0[ii]);
}
}
}
}
} // namespace merger
} // namespace psrdada_cpp
...@@ -44,20 +44,53 @@ bool Pipeline<MergerType, Handler>::operator()(RawBytes& block) ...@@ -44,20 +44,53 @@ bool Pipeline<MergerType, Handler>::operator()(RawBytes& block)
{ {
RawBytes& oblock = _handler.data_stream().next(); RawBytes& oblock = _handler.data_stream().next();
BOOST_LOG_TRIVIAL(debug) << "block.used_bytes: " << block.used_bytes() BOOST_LOG_TRIVIAL(debug) << "block.used_bytes: " << block.used_bytes()
<< " oblock.used_bytes " << oblock.used_bytes(); << " oblock.used_bytes " << oblock.used_bytes();
if (block.used_bytes() > oblock.total_bytes())
// By default, expected output size equals input size (for 8-bit mode)
std::size_t expected_output = block.used_bytes();
// Check if we're in 10-to-8 conversion mode.
if (dynamic_cast<psrdada_cpp::merger::PolnMerger10to8*>(&_merger) != nullptr)
{
BOOST_LOG_TRIVIAL(debug) << "10-to-8 merger mode detected.";
// Define conversion parameters.
const std::size_t heap_size_in = 640 * sizeof(uint64_t); // 5120 bytes per polarization input heap
const std::size_t heap_samples = 4096; // 4096 bytes per polarization output
const std::size_t npol = 2; // Adjust if using a different number of polarizations or via a getter
BOOST_LOG_TRIVIAL(debug) << "Conversion parameters: "
<< "heap_size_in = " << heap_size_in
<< ", heap_samples = " << heap_samples
<< ", npol = " << npol;
// Compute the number of complete heaps in the input buffer.
std::size_t num_heaps = block.used_bytes() / (heap_size_in * npol);
BOOST_LOG_TRIVIAL(debug) << "block.used_bytes() = " << block.used_bytes()
<< ", computed num_heaps = " << num_heaps;
// Calculate the expected output size.
expected_output = num_heaps * (heap_samples * npol);
BOOST_LOG_TRIVIAL(debug) << "Expected output size = " << expected_output
<< " bytes based on " << num_heaps << " heaps.";
}
// Verify that the output buffer is large enough to hold the expected output.
if (expected_output > oblock.total_bytes())
{ {
_handler.data_stream().release(); // <- Does the release makes sense here? _handler.data_stream().release();
throw std::runtime_error("Output DADA buffer does not match with the input dada buffer"); throw std::runtime_error("Output DADA buffer does not have enough space for the conversion");
} }
// Run the merger process.
_merger.process(block.ptr(), oblock.ptr(), block.used_bytes()); _merger.process(block.ptr(), oblock.ptr(), block.used_bytes());
oblock.used_bytes(block.used_bytes()); // Set the actual output size.
oblock.used_bytes(expected_output);
_handler.data_stream().release(); _handler.data_stream().release();
return false; return false;
} }
}//merger }//merger
}//psrdada_cpp }//psrdada_cpp
...@@ -97,19 +97,19 @@ std::size_t PolnMergeTester::getBufferSize() { ...@@ -97,19 +97,19 @@ std::size_t PolnMergeTester::getBufferSize() {
std::vector<char> PolnMergeTester::generateTestVector() std::vector<char> PolnMergeTester::generateTestVector()
{ {
std::vector<char> test_vector(getBufferSize()); std::vector<char> test_vector(getBufferSize());
for (std::size_t ii = 0; ii < params.nheap_groups; ii++) for (std::size_t ii = 0; ii < params.nheap_groups; ii++)
{ {
for (std::size_t jj = 0; jj < params.npol; jj++) for (std::size_t jj = 0; jj < params.npol; jj++)
{ {
for (std::size_t kk = 0; kk < params.nsamps_per_heaps; kk++) for (std::size_t kk = 0; kk < params.nsamps_per_heaps; kk++)
{ {
std::size_t idx = ii * params.npol * params.nsamps_per_heaps std::size_t idx = ii * params.npol * params.nsamps_per_heaps
+ jj * params.nsamps_per_heaps + kk; + jj * params.nsamps_per_heaps + kk;
test_vector[idx] = (ii * 2 + jj) % 128; test_vector[idx] = (ii * 2 + jj) % 128;
} }
} }
} }
return test_vector; return test_vector;
} }
......
...@@ -43,4 +43,4 @@ INSTANTIATE_TEST_SUITE_P(ParameterizedTest, ProvideParamsForPipelineTester, ::te ...@@ -43,4 +43,4 @@ INSTANTIATE_TEST_SUITE_P(ParameterizedTest, ProvideParamsForPipelineTester, ::te
} }
} }
} }
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment