Skip to content
Snippets Groups Projects
Commit a346222c authored by Jason Wu's avatar Jason Wu
Browse files

refactor to improve performance

parent 42c0808e
Branches
Tags
1 merge request!25Alveo merge code heap size fix
Pipeline #160715 failed
...@@ -9,28 +9,32 @@ namespace psrdada_cpp { ...@@ -9,28 +9,32 @@ namespace psrdada_cpp {
namespace effelsberg { namespace effelsberg {
namespace edd { namespace edd {
uint64_t interleave(uint32_t x, uint32_t y) {
__m128i xvec = _mm_cvtsi32_si128(x);
__m128i yvec = _mm_cvtsi32_si128(y);
__m128i interleaved = _mm_unpacklo_epi8(yvec, xvec);
return _mm_cvtsi128_si64(interleaved);
}
void merge2pol(char const *buf, char *out) void merge2pol(char const *buf, char *out)
{ {
uint8_t *qword0 = (uint8_t*)(buf); uint8_t const* qword0 = reinterpret_cast<uint8_t const*>(buf);
uint8_t *qword1 = (uint8_t*)(buf) + HEAP_SIZE; uint8_t const* qword1 = reinterpret_cast<uint8_t const*>(buf) + HEAP_SIZE;
uint64_t* D = reinterpret_cast<uint64_t*>(out); uint64_t* D = reinterpret_cast<uint64_t*>(out);
for (size_t i = 0; i < HEAP_SIZE / sizeof(uint32_t); i++)
__m128i xvec, yvec, interleaved;
for (size_t i = 0; i < HEAP_SIZE / sizeof(uint32_t); i += 8)
{ {
uint32_t* S0 = reinterpret_cast<uint32_t*>(qword0); xvec = _mm_loadu_si128(reinterpret_cast<__m128i const*>(qword0));
uint32_t* S1 = reinterpret_cast<uint32_t*>(qword1); yvec = _mm_loadu_si128(reinterpret_cast<__m128i const*>(qword1));
*D++ = interleave(*S1++, *S0++); interleaved = _mm_unpacklo_epi8(yvec, xvec);
qword0 += sizeof(uint32_t); _mm_storeu_si128(reinterpret_cast<__m128i*>(D), interleaved);
qword1 += sizeof(uint32_t);
xvec = _mm_loadu_si128(reinterpret_cast<__m128i const*>(qword0 + 16));
yvec = _mm_loadu_si128(reinterpret_cast<__m128i const*>(qword1 + 16));
interleaved = _mm_unpacklo_epi8(yvec, xvec);
_mm_storeu_si128(reinterpret_cast<__m128i*>(D + 2), interleaved);
qword0 += 32;
qword1 += 32;
D += 4;
} }
} }
EDDPolnMerge::EDDPolnMerge(std::size_t npol, int nthreads, DadaWriteClient& writer) EDDPolnMerge::EDDPolnMerge(std::size_t npol, int nthreads, DadaWriteClient& writer)
: _npol(npol) : _npol(npol)
, _nthreads(nthreads) , _nthreads(nthreads)
......
...@@ -9,13 +9,6 @@ namespace psrdada_cpp { ...@@ -9,13 +9,6 @@ namespace psrdada_cpp {
namespace effelsberg { namespace effelsberg {
namespace edd { namespace edd {
uint64_t interleave(uint32_t x, uint32_t y) {
__m128i xvec = _mm_cvtsi32_si128(x);
__m128i yvec = _mm_cvtsi32_si128(y);
__m128i interleaved = _mm_unpacklo_epi8(yvec, xvec);
return _mm_cvtsi128_si64(interleaved);
}
EDDRoach_merge_leap::EDDRoach_merge_leap(std::size_t nchunck, int nthreads, int heap_size, DadaWriteClient& writer) EDDRoach_merge_leap::EDDRoach_merge_leap(std::size_t nchunck, int nthreads, int heap_size, DadaWriteClient& writer)
: _nchunck(nchunck) : _nchunck(nchunck)
, _heap_size(heap_size) , _heap_size(heap_size)
...@@ -28,7 +21,7 @@ EDDRoach_merge_leap::~EDDRoach_merge_leap() ...@@ -28,7 +21,7 @@ EDDRoach_merge_leap::~EDDRoach_merge_leap()
{ {
} }
void EDDRoach_merge_leap::init(RawBytes& block) void EDDRoach_merge_leap::init(RawBytes & block)
{ {
RawBytes& oblock = _writer.header_stream().next(); RawBytes& oblock = _writer.header_stream().next();
if (block.used_bytes() > oblock.total_bytes()) if (block.used_bytes() > oblock.total_bytes())
...@@ -37,47 +30,48 @@ void EDDRoach_merge_leap::init(RawBytes& block) ...@@ -37,47 +30,48 @@ void EDDRoach_merge_leap::init(RawBytes& block)
throw std::runtime_error("Output DADA buffer does not have enough space for header"); throw std::runtime_error("Output DADA buffer does not have enough space for header");
} }
std::memcpy(oblock.ptr(), block.ptr(), block.used_bytes()); std::memcpy(oblock.ptr(), block.ptr(), block.used_bytes());
char buffer[1024]; char buffer[1024];
ascii_header_get(block.ptr(), "SAMPLE_CLOCK_START", "%s", buffer); ascii_header_get(block.ptr(), "SAMPLE_CLOCK_START", "%s", buffer);
std::size_t sample_clock_start = std::strtoul(buffer, NULL, 0); std::size_t sample_clock_start = std::strtoul(buffer, NULL, 0);
ascii_header_get(block.ptr(), "CLOCK_SAMPLE", "%s", buffer); ascii_header_get(block.ptr(), "CLOCK_SAMPLE", "%s", buffer);
long double sample_clock = std::strtold(buffer, NULL); long double sample_clock = std::strtold(buffer, NULL);
ascii_header_get(block.ptr(), "SYNC_TIME", "%s", buffer); ascii_header_get(block.ptr(), "SYNC_TIME", "%s", buffer);
long double sync_time = std::strtold(buffer, NULL); long double sync_time = std::strtold(buffer, NULL);
long double unix_time = sync_time + (sample_clock_start / sample_clock); long double unix_time = sync_time + (sample_clock_start / sample_clock);
long double mjd_time = (unix_time / 86400.0 ) + 40587; long double mjd_time = (unix_time / 86400.0 ) + 40587;
std::ostringstream mjd_start; std::ostringstream mjd_start;
mjd_start << std::fixed; mjd_start << std::fixed;
mjd_start << std::setprecision(12); mjd_start << std::setprecision(12);
mjd_start << mjd_time; mjd_start << mjd_time;
ascii_header_set(oblock.ptr(), "MJD_START", "%s", mjd_start.str().c_str()); ascii_header_set(oblock.ptr(), "MJD_START", "%s", mjd_start.str().c_str());
ascii_header_set(oblock.ptr(), "UNIX_TIME", "%Lf", unix_time); ascii_header_set(oblock.ptr(), "UNIX_TIME", "%Lf", unix_time);
oblock.used_bytes(oblock.total_bytes()); oblock.used_bytes(oblock.total_bytes());
_writer.header_stream().release(); _writer.header_stream().release();
} }
bool EDDRoach_merge_leap::operator()(RawBytes & block)
bool EDDRoach_merge_leap::operator()(RawBytes& block)
{ {
BOOST_LOG_TRIVIAL(info) << "nchucnk " << _nchunck << "\n"; BOOST_LOG_TRIVIAL(info) << "nchucnk " << _nchunck << "\n";
RawBytes& oblock = _writer.data_stream().next(); RawBytes& oblock = _writer.data_stream().next();
std::size_t bytes_per_chunk = 4; const std::size_t bytes_per_chunk = 4;
std::size_t heap_group = _heap_size * _nchunck; const std::size_t heap_group = _heap_size * _nchunck;
const std::size_t num_chunks = block.used_bytes() / heap_group;
#pragma omp parallel for num_threads(_nthreads) #pragma omp parallel for num_threads(_nthreads)
for (std::size_t xx = 0; xx < block.used_bytes() / heap_group; xx++) for (std::size_t xx = 0; xx < num_chunks; ++xx)
{ {
std::vector<char*> ptrs(_nchunck); std::vector<const char*> chunk_ptrs(_nchunck);
for (std::size_t ii = 0; ii < _nchunck; ++ii) for (std::size_t ii = 0; ii < _nchunck; ++ii)
{ {
ptrs[ii] = block.ptr() + xx * heap_group + ii * heap_group / _nchunck; const std::size_t offset = xx * heap_group + ii * heap_group / _nchunck;
chunk_ptrs[ii] = block.ptr() + offset;
} }
const char *target = oblock.ptr() + xx * heap_group; const char *target = oblock.ptr() + xx * heap_group;
for (std::size_t yy = 0; yy < heap_group / _nchunck / bytes_per_chunk; yy++) for (std::size_t yy = 0; yy < heap_group / _nchunck / bytes_per_chunk; ++yy)
{ {
for (std::size_t ii = 0; ii < _nchunck; ++ii) for (std::size_t ii = 0; ii < _nchunck; ++ii)
{ {
std::memcpy((void*)target, (void*)ptrs[ii], bytes_per_chunk); std::memcpy((void*)target, chunk_ptrs[ii], bytes_per_chunk);
ptrs[ii] += bytes_per_chunk; chunk_ptrs[ii] += bytes_per_chunk;
target += bytes_per_chunk; target += bytes_per_chunk;
} }
} }
...@@ -86,6 +80,7 @@ bool EDDRoach_merge_leap::operator()(RawBytes& block) ...@@ -86,6 +80,7 @@ bool EDDRoach_merge_leap::operator()(RawBytes& block)
_writer.data_stream().release(); _writer.data_stream().release();
return false; return false;
} }
}//edd }//edd
}//effelsberg }//effelsberg
}//psrdada_cpp }//psrdada_cpp
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment