Commit dd0f2b58 authored by root's avatar root
Browse files

EDDRoach_merge.cpp

parent 13dc459c
......@@ -73,501 +73,32 @@ namespace edd {
bool EDDRoach_merge::operator()(RawBytes& block)
{
// std:size_t nheap_groups = block.used_bytes()/_npol/_nsamps_per_heap;
/**
if (block.used_bytes() < block.total_bytes())
{
BOOST_LOG_TRIVIAL (debug) << "Reach end of data";
_writer.data_stream().next();
_writer.data_stream().release();
return true;
}
**/
BOOST_LOG_TRIVIAL(info) << "nchucnk "<< _nchunck << "\n";
RawBytes& oblock = _writer.data_stream().next();
if (_nchunck == 2 )
{
std::size_t bytes_per_chunk= 32;
std::size_t heap_size = 262144;
std::size_t nbands = _nchunck;
std::size_t t_packet = 256;
const char *s0 = block.ptr();
const char *s1 = block.ptr() + heap_size;//that's size of a heap
const char *target = oblock.ptr();
#pragma omp parallel for
for (std::size_t xx=0; xx< block.used_bytes()/nbands/heap_size; xx++)
{
for (std::size_t yy=0; yy< heap_size/bytes_per_chunk; yy++)
{
std::memcpy((void*)target, (void*)s0, bytes_per_chunk);
s0 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s1, bytes_per_chunk);
s1 += bytes_per_chunk;
target += bytes_per_chunk;
}
s0 += (nbands-1) * heap_size;
s1 += (nbands-1) * heap_size;
}
}
if (_nchunck == 0 )
{
std::size_t bytes_per_chunk= 32;
std::size_t heap_size = 262144;
std::size_t nbands = 2;
std::size_t t_packet = 256;
#pragma omp parallel for
for (std::size_t xx=0; xx< block.used_bytes()/nbands/heap_size; xx++)
{
const char *s0 = block.ptr() + xx * nbands * heap_size;
const char *s1 = block.ptr() + xx * nbands * heap_size + heap_size;//that's size of a heap
const char *target = oblock.ptr() + xx * nbands * heap_size;
for (std::size_t yy=0; yy< heap_size/bytes_per_chunk; yy++)
{
std::memcpy((void*)target, (void*)s0, bytes_per_chunk);
s0 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s1, bytes_per_chunk);
s1 += bytes_per_chunk;
target += bytes_per_chunk;
}
//s0 += (nbands-1) * heap_size;
//s1 += (nbands-1) * heap_size;
}
}
if (_nchunck == 8 )
{
std::size_t bytes_per_chunk= 32;
std::size_t heap_size = 262144;
std::size_t nbands = 8;
std::size_t t_packet = 256;
#pragma omp parallel for
for (std::size_t xx=0; xx< block.used_bytes()/nbands/heap_size; xx++)
{
const char *s0 = block.ptr() + xx * nbands * heap_size;
const char *s1 = block.ptr() + xx * nbands * heap_size + heap_size;//that's size of a heap
const char *s2 = block.ptr() + xx * nbands * heap_size + 2*heap_size;
const char *s3 = block.ptr() + xx * nbands * heap_size + 3*heap_size;
const char *s4 = block.ptr() + xx * nbands * heap_size + 4*heap_size;
const char *s5 = block.ptr() + xx * nbands * heap_size + 5*heap_size;
const char *s6 = block.ptr() + xx * nbands * heap_size + 6*heap_size;
const char *s7 = block.ptr() + xx * nbands * heap_size + 7*heap_size;
const char *target = oblock.ptr() + xx * nbands * heap_size;
for (std::size_t yy=0; yy< heap_size/bytes_per_chunk; yy++)
{
std::memcpy((void*)target, (void*)s0, bytes_per_chunk);
s0 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s1, bytes_per_chunk);
s1 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s2, bytes_per_chunk);
s2 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s3, bytes_per_chunk);
s3 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s4, bytes_per_chunk);
s4 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s5, bytes_per_chunk);
s5 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s6, bytes_per_chunk);
s6 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s7, bytes_per_chunk);
s7 += bytes_per_chunk;
target += bytes_per_chunk;
}
//s0 += (nbands-1) * heap_size;
//s1 += (nbands-1) * heap_size;
}
}
if (_nchunck==3 )
{
std::size_t bytes_per_chunk= 32;
std::size_t heap_size = 262144;
std::size_t nbands = _nchunck;
std::size_t t_packet = 256;
const char *s0 = block.ptr();
const char *s1 = block.ptr() + heap_size;//that's size of a heap
const char *s2 = block.ptr() + 2*heap_size;
const char *target = oblock.ptr();
#pragma omp parallel for
for (std::size_t xx=0; xx< block.used_bytes()/nbands/heap_size; xx++)
{
for (std::size_t yy=0; yy< heap_size/bytes_per_chunk; yy++)
{
std::memcpy((void*)target, (void*)s0, bytes_per_chunk);
s0 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s1, bytes_per_chunk);
s1 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s2, bytes_per_chunk);
s2 += bytes_per_chunk;
target += bytes_per_chunk;
}
s0 += (nbands-1) * heap_size;
s1 += (nbands-1) * heap_size;
s2 += (nbands-1) * heap_size;
for (std::size_t xx=0; xx< block.used_bytes()/_nchunck/heap_size; xx++)
{
std::vector<char*> ptrs(_nchunck);
for (std::size_t ii=0; ii<_nchunck; ++ii)
{
ptrs[ii] = block.ptr() + xx * nbands * heap_size + ii * heap_size;
}
const char *target = oblock.ptr() + xx * nbands * heap_size;
for (std::size_t yy=0; yy< heap_size/bytes_per_chunk; yy++)
{
for (std::size_t ii=0; ii<_nchunck; ++ii)
{
std::memcpy((void*)target, (void*)ptrs[ii], bytes_per_chunk);
ptrs[ii] += bytes_per_chunk;
target += bytes_per_chunk;
}
}
}
}
if (_nchunck==4 )
{
std::size_t bytes_per_chunk= 32;
std::size_t heap_size = 262144;
std::size_t nbands = _nchunck;
std::size_t t_packet = 256;
const char *s0 = block.ptr();
const char *s1 = block.ptr() + heap_size;//that's size of a heap
const char *s2 = block.ptr() + 2*heap_size;
const char *s3 = block.ptr() + 3*heap_size;//that's size of a heap
const char *target = oblock.ptr();
#pragma omp parallel for
for (std::size_t xx=0; xx< block.used_bytes()/nbands/heap_size; xx++)
{
for (std::size_t yy=0; yy< heap_size/bytes_per_chunk; yy++)
{
std::memcpy((void*)target, (void*)s0, bytes_per_chunk);
s0 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s1, bytes_per_chunk);
s1 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s2, bytes_per_chunk);
s2 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s3, bytes_per_chunk);
s3 += bytes_per_chunk;
target += bytes_per_chunk;
}
s0 += (nbands-1) * heap_size;
s1 += (nbands-1) * heap_size;
s2 += (nbands-1) * heap_size;
s3 += (nbands-1) * heap_size;
}
}
if (_nchunck==5 )
{
std::size_t bytes_per_chunk= 32;
std::size_t heap_size = 262144;
std::size_t nbands = _nchunck;
std::size_t t_packet = 256;
const char *s0 = block.ptr();
const char *s1 = block.ptr() + heap_size;//that's size of a heap
const char *s2 = block.ptr() + 2*heap_size;
const char *s3 = block.ptr() + 3*heap_size;//that's size of a heap
const char *s4 = block.ptr() + 4*heap_size;
const char *target = oblock.ptr();
#pragma omp parallel for
for (std::size_t xx=0; xx< block.used_bytes()/nbands/heap_size; xx++)
{
for (std::size_t yy=0; yy< heap_size/bytes_per_chunk; yy++)
{
std::memcpy((void*)target, (void*)s0, bytes_per_chunk);
s0 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s1, bytes_per_chunk);
s1 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s2, bytes_per_chunk);
s2 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s3, bytes_per_chunk);
s3 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s4, bytes_per_chunk);
s4 += bytes_per_chunk;
target += bytes_per_chunk;
}
s0 += (nbands-1) * heap_size;
s1 += (nbands-1) * heap_size;
s2 += (nbands-1) * heap_size;
s3 += (nbands-1) * heap_size;
s4 += (nbands-1) * heap_size;
}
}
if (_nchunck==6 )
{
std::size_t bytes_per_chunk= 32;
std::size_t heap_size = 262144;
std::size_t nbands = _nchunck;
std::size_t t_packet = 256;
const char *s0 = block.ptr();
const char *s1 = block.ptr() + heap_size;//that's size of a heap
const char *s2 = block.ptr() + 2*heap_size;
const char *s3 = block.ptr() + 3*heap_size;//that's size of a heap
const char *s4 = block.ptr() + 4*heap_size;
const char *s5 = block.ptr() + 5*heap_size;
const char *target = oblock.ptr();
#pragma omp parallel for
for (std::size_t xx=0; xx< block.used_bytes()/nbands/heap_size; xx++)
{
for (std::size_t yy=0; yy< heap_size/bytes_per_chunk; yy++)
{
std::memcpy((void*)target, (void*)s0, bytes_per_chunk);
s0 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s1, bytes_per_chunk);
s1 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s2, bytes_per_chunk);
s2 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s3, bytes_per_chunk);
s3 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s4, bytes_per_chunk);
s4 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s5, bytes_per_chunk);
s5 += bytes_per_chunk;
target += bytes_per_chunk;
}
s0 += (nbands-1) * heap_size;
s1 += (nbands-1) * heap_size;
s2 += (nbands-1) * heap_size;
s3 += (nbands-1) * heap_size;
s4 += (nbands-1) * heap_size;
s5 += (nbands-1) * heap_size;
}
}
if (_nchunck ==7)
{
std::size_t bytes_per_chunk= 32;
std::size_t heap_size = 262144;
std::size_t nbands = _nchunck;
std::size_t t_packet = 256;
const char *s0 = block.ptr();
const char *s1 = block.ptr() + heap_size;//that's size of a heap
const char *s2 = block.ptr() + 2*heap_size;
const char *s3 = block.ptr() + 3*heap_size;//that's size of a heap
const char *s4 = block.ptr() + 4*heap_size;
const char *s5 = block.ptr() + 5*heap_size;
const char *s6 = block.ptr() + 6*heap_size;
const char *target = oblock.ptr();
#pragma omp parallel for
for (std::size_t xx=0; xx< block.used_bytes()/nbands/heap_size; xx++)
{
for (std::size_t yy=0; yy< heap_size/bytes_per_chunk; yy++)
{
std::memcpy((void*)target, (void*)s0, bytes_per_chunk);
s0 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s1, bytes_per_chunk);
s1 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s2, bytes_per_chunk);
s2 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s3, bytes_per_chunk);
s3 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s4, bytes_per_chunk);
s4 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s5, bytes_per_chunk);
s5 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s6, bytes_per_chunk);
s6 += bytes_per_chunk;
target += bytes_per_chunk;
}
s0 += (nbands-1) * heap_size;
s1 += (nbands-1) * heap_size;
s2 += (nbands-1) * heap_size;
s3 += (nbands-1) * heap_size;
s4 += (nbands-1) * heap_size;
s5 += (nbands-1) * heap_size;
s6 += (nbands-1) * heap_size;
}
}
/**
if (_nchunck==8 )
{
//RawBytes& oblock = _writer.data_stream().next();
std::size_t bytes_per_chunk= 32;
std::size_t heap_size = 262144;
std::size_t nbands = _nchunck;
std::size_t t_packet = 256;
const char *s0 = block.ptr();
const char *s1 = block.ptr() + heap_size;//that's size of a heap
const char *s2 = block.ptr() + 2*heap_size;
const char *s3 = block.ptr() + 3*heap_size;//that's size of a heap
const char *s4 = block.ptr() + 4*heap_size;
const char *s5 = block.ptr() + 5*heap_size;
const char *s6 = block.ptr() + 6*heap_size;
const char *s7 = block.ptr() + 7*heap_size;
const char *target = oblock.ptr();
#pragma omp parallel for
for (std::size_t xx=0; xx< block.used_bytes()/nbands/heap_size; xx++)
{
for (std::size_t yy=0; yy< heap_size/bytes_per_chunk; yy++)
{
std::memcpy((void*)target, (void*)s0, bytes_per_chunk);
s0 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s1, bytes_per_chunk);
s1 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s2, bytes_per_chunk);
s2 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s3, bytes_per_chunk);
s3 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s4, bytes_per_chunk);
s4 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s5, bytes_per_chunk);
s5 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s6, bytes_per_chunk);
s6 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s7, bytes_per_chunk);
s7 += bytes_per_chunk;
target += bytes_per_chunk;
}
s0 += (nbands-1) * heap_size;
s1 += (nbands-1) * heap_size;
s2 += (nbands-1) * heap_size;
s3 += (nbands-1) * heap_size;
s4 += (nbands-1) * heap_size;
s5 += (nbands-1) * heap_size;
s6 += (nbands-1) * heap_size;
s7 += (nbands-1) * heap_size;
}
}
**/
/**
RawBytes& oblock = _writer.data_stream().next();
if (_nchunck = 2)
{
std::size_t bytes_per_chunk= 32;
std::size_t heap_size = 262144;
std::size_t nbands = _nchunck;
std::size_t t_packet = 256;
const char *s0 = block.ptr();
const char *s1 = block.ptr() + heap_size;//that's size of a heap
const char *target = oblock.ptr();
for (std::size_t xx=0; xx< block.used_bytes()/nbands/heap_size; xx++)
{
for (std::size_t yy=0; yy< heap_size/bytes_per_chunk; yy++)
{
std::memcpy((void*)target, (void*)s0, bytes_per_chunk);
s0 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s1, bytes_per_chunk);
s1 += bytes_per_chunk;
target += bytes_per_chunk;
}
s0 += heap_size;
s1 += heap_size;
}
}
if (_nchunck = 3)
{
std::size_t bytes_per_chunk= 32;
std::size_t heap_size = 262144;
std::size_t nbands = _nchunck;
std::size_t t_packet = 256;
const char *s0 = block.ptr();
const char *s1 = block.ptr() + heap_size;//that's size of a heap
const char *s2 = block.ptr() + 2*heap_size;//that's size of a heap
const char *target = oblock.ptr();
for (std::size_t xx=0; xx< block.used_bytes()/nbands/heap_size; xx++)
{
for (std::size_t yy=0; yy< heap_size/bytes_per_chunk; yy++)
{
std::memcpy((void*)target, (void*)s0, bytes_per_chunk);
s0 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s1, bytes_per_chunk);
s1 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s2, bytes_per_chunk);
s2 += bytes_per_chunk;
target += bytes_per_chunk;
}
s0 += heap_size;
s1 += heap_size;
s2 += heap_size;
}
}
if (_nchunck = 4)
{
std::size_t bytes_per_chunk= 32;
std::size_t heap_size = 262144;
std::size_t nbands = _nchunck;
std::size_t t_packet = 256;
const char *s0 = block.ptr();
const char *s1 = block.ptr() + heap_size;//that's size of a heap
const char *s2 = block.ptr() + 2*heap_size;//that's size of a heap
const char *s3 = block.ptr() + 3*heap_size;//that's size of a heap
const char *target = oblock.ptr();
for (std::size_t xx=0; xx< block.used_bytes()/nbands/heap_size; xx++)
{
for (std::size_t yy=0; yy< heap_size/bytes_per_chunk; yy++)
{
std::memcpy((void*)target, (void*)s0, bytes_per_chunk);
s0 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s1, bytes_per_chunk);
s1 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s2, bytes_per_chunk);
s2 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s3, bytes_per_chunk);
s3 += bytes_per_chunk;
target += bytes_per_chunk;
}
s0 += heap_size;
s1 += heap_size;
s2 += heap_size;
s3 += heap_size;
}
}
**/
//std::memcpy(oblock.ptr(), block.ptr(), block.used_bytes());
oblock.used_bytes(block.used_bytes());
_writer.data_stream().release();
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment