Commit 13dc459c authored by root's avatar root
Browse files

EDDRoach_merge for combining roach2 PFB output

parent c873ca7d
......@@ -12,7 +12,7 @@ namespace edd {
class EDDRoach_merge
{
public:
EDDRoach_merge(std::size_t nsamps_per_heap, std::size_t npol, DadaWriteClient& writer);
EDDRoach_merge(std::size_t nsamps_per_heap, std::size_t nchunck, DadaWriteClient& writer);
~EDDRoach_merge();
/**
......@@ -38,7 +38,7 @@ public:
private:
std::size_t _nsamps_per_heap;
std::size_t _npol;
std::size_t _nchunck;
DadaWriteClient& _writer;
};
......
......@@ -14,9 +14,9 @@ namespace edd {
return _mm_cvtsi128_si64(interleaved);
}
EDDRoach_merge::EDDRoach_merge(std::size_t nsamps_per_heap, std::size_t npol, DadaWriteClient& writer)
EDDRoach_merge::EDDRoach_merge(std::size_t nsamps_per_heap, std::size_t nchunck, DadaWriteClient& writer)
: _nsamps_per_heap(nsamps_per_heap)
, _npol(npol)
, _nchunck(nchunck)
, _writer(writer)
{
}
......@@ -83,29 +83,491 @@ namespace edd {
return true;
}
**/
BOOST_LOG_TRIVIAL(info) << "nchucnk "<< _nchunck << "\n";
RawBytes& oblock = _writer.data_stream().next();
std::size_t bytes_per_chunk= 32;
std::size_t heap_size = 262144;
std::size_t nbands = 2;
std::size_t t_packet = 256;
const char *s0 = block.ptr();
const char *s1 = block.ptr() + heap_size;//that's size of a heap
const char *target = oblock.ptr();
for (std::size_t xx=0; xx< block.used_bytes()/nbands/heap_size; xx++)
if (_nchunck == 2 )
{
    // Merge two input streams: the input block is read as consecutive groups
    // of `nbands` heaps, and the output receives one `bytes_per_chunk` slice
    // from each heap of the group in turn (heap 0, heap 1, heap 0, ...).
    const std::size_t bytes_per_chunk = 32;
    const std::size_t heap_size = 262144; // size of one heap in bytes
    const std::size_t nbands = _nchunck;
    const std::size_t group_size = nbands * heap_size;
    const std::size_t ngroups = block.used_bytes() / nbands / heap_size;
    const std::size_t chunks_per_heap = heap_size / bytes_per_chunk;
    #pragma omp parallel for
    for (std::size_t xx = 0; xx < ngroups; xx++)
    {
        // The cursors are derived from xx inside the loop body so that each
        // OpenMP thread owns its own copies. Cursors declared outside the
        // parallel loop would be shared and concurrently advanced by all
        // threads (a data race), and no thread would start at the offset
        // belonging to its iteration.
        const char *src = block.ptr() + xx * group_size;
        const char *target = oblock.ptr() + xx * group_size;
        for (std::size_t yy = 0; yy < chunks_per_heap; yy++)
        {
            for (std::size_t band = 0; band < nbands; band++)
            {
                std::memcpy(const_cast<char*>(target),
                            src + band * heap_size + yy * bytes_per_chunk,
                            bytes_per_chunk);
                target += bytes_per_chunk;
            }
        }
    }
}
if (_nchunck == 0 )
{
    // A value of 0 is treated as a two-stream merge: nbands is fixed to 2
    // here rather than taken from _nchunck.
    // The input is read as consecutive groups of `band_count` heaps; for each
    // group the output receives one `chunk_bytes` slice from every heap in
    // turn.
    const std::size_t chunk_bytes = 32;
    const std::size_t heap_bytes = 262144; // size of one heap in bytes
    const std::size_t band_count = 2;
    const std::size_t group_bytes = band_count * heap_bytes;
    #pragma omp parallel for
    for (std::size_t group = 0; group < block.used_bytes()/band_count/heap_bytes; group++)
    {
        // All cursors are local to the iteration, so iterations are independent.
        const char *group_in = block.ptr() + group * group_bytes;
        const char *out_cursor = oblock.ptr() + group * group_bytes;
        for (std::size_t chunk = 0; chunk < heap_bytes/chunk_bytes; chunk++)
        {
            for (std::size_t band = 0; band < band_count; band++)
            {
                const char *in_cursor = group_in + band * heap_bytes + chunk * chunk_bytes;
                std::memcpy((void*)out_cursor, (void*)in_cursor, chunk_bytes);
                out_cursor += chunk_bytes;
            }
        }
    }
}
if (_nchunck == 8 )
{
    // Merge eight input streams. The input is read as consecutive groups of
    // eight heaps; for each group the output receives one `chunk_bytes` slice
    // from heap 0, then heap 1, ... up to heap 7, repeated until the heaps
    // are exhausted.
    const std::size_t chunk_bytes = 32;
    const std::size_t heap_bytes = 262144; // size of one heap in bytes
    const std::size_t band_count = 8;
    const std::size_t group_bytes = band_count * heap_bytes;
    #pragma omp parallel for
    for (std::size_t group = 0; group < block.used_bytes()/band_count/heap_bytes; group++)
    {
        // All cursors are local to the iteration, so iterations are independent.
        const char *group_in = block.ptr() + group * group_bytes;
        const char *out_cursor = oblock.ptr() + group * group_bytes;
        for (std::size_t chunk = 0; chunk < heap_bytes/chunk_bytes; chunk++)
        {
            for (std::size_t band = 0; band < band_count; band++)
            {
                const char *in_cursor = group_in + band * heap_bytes + chunk * chunk_bytes;
                std::memcpy((void*)out_cursor, (void*)in_cursor, chunk_bytes);
                out_cursor += chunk_bytes;
            }
        }
    }
}
if (_nchunck==3 )
{
    // Merge three input streams: the input block is read as consecutive
    // groups of `nbands` heaps, and the output receives one `bytes_per_chunk`
    // slice from each heap of the group in turn.
    const std::size_t bytes_per_chunk = 32;
    const std::size_t heap_size = 262144; // size of one heap in bytes
    const std::size_t nbands = _nchunck;
    const std::size_t group_size = nbands * heap_size;
    const std::size_t ngroups = block.used_bytes() / nbands / heap_size;
    const std::size_t chunks_per_heap = heap_size / bytes_per_chunk;
    #pragma omp parallel for
    for (std::size_t xx = 0; xx < ngroups; xx++)
    {
        // The cursors are derived from xx inside the loop body so that each
        // OpenMP thread owns its own copies. Cursors declared outside the
        // parallel loop would be shared and concurrently advanced by all
        // threads (a data race), and no thread would start at the offset
        // belonging to its iteration.
        const char *src = block.ptr() + xx * group_size;
        const char *target = oblock.ptr() + xx * group_size;
        for (std::size_t yy = 0; yy < chunks_per_heap; yy++)
        {
            for (std::size_t band = 0; band < nbands; band++)
            {
                std::memcpy(const_cast<char*>(target),
                            src + band * heap_size + yy * bytes_per_chunk,
                            bytes_per_chunk);
                target += bytes_per_chunk;
            }
        }
    }
}
if (_nchunck==4 )
{
    // Merge four input streams: the input block is read as consecutive
    // groups of `nbands` heaps, and the output receives one `bytes_per_chunk`
    // slice from each heap of the group in turn.
    const std::size_t bytes_per_chunk = 32;
    const std::size_t heap_size = 262144; // size of one heap in bytes
    const std::size_t nbands = _nchunck;
    const std::size_t group_size = nbands * heap_size;
    const std::size_t ngroups = block.used_bytes() / nbands / heap_size;
    const std::size_t chunks_per_heap = heap_size / bytes_per_chunk;
    #pragma omp parallel for
    for (std::size_t xx = 0; xx < ngroups; xx++)
    {
        // The cursors are derived from xx inside the loop body so that each
        // OpenMP thread owns its own copies. Cursors declared outside the
        // parallel loop would be shared and concurrently advanced by all
        // threads (a data race), and no thread would start at the offset
        // belonging to its iteration.
        const char *src = block.ptr() + xx * group_size;
        const char *target = oblock.ptr() + xx * group_size;
        for (std::size_t yy = 0; yy < chunks_per_heap; yy++)
        {
            for (std::size_t band = 0; band < nbands; band++)
            {
                std::memcpy(const_cast<char*>(target),
                            src + band * heap_size + yy * bytes_per_chunk,
                            bytes_per_chunk);
                target += bytes_per_chunk;
            }
        }
    }
}
if (_nchunck==5 )
{
    // Merge five input streams: the input block is read as consecutive
    // groups of `nbands` heaps, and the output receives one `bytes_per_chunk`
    // slice from each heap of the group in turn.
    const std::size_t bytes_per_chunk = 32;
    const std::size_t heap_size = 262144; // size of one heap in bytes
    const std::size_t nbands = _nchunck;
    const std::size_t group_size = nbands * heap_size;
    const std::size_t ngroups = block.used_bytes() / nbands / heap_size;
    const std::size_t chunks_per_heap = heap_size / bytes_per_chunk;
    #pragma omp parallel for
    for (std::size_t xx = 0; xx < ngroups; xx++)
    {
        // The cursors are derived from xx inside the loop body so that each
        // OpenMP thread owns its own copies. Cursors declared outside the
        // parallel loop would be shared and concurrently advanced by all
        // threads (a data race), and no thread would start at the offset
        // belonging to its iteration.
        const char *src = block.ptr() + xx * group_size;
        const char *target = oblock.ptr() + xx * group_size;
        for (std::size_t yy = 0; yy < chunks_per_heap; yy++)
        {
            for (std::size_t band = 0; band < nbands; band++)
            {
                std::memcpy(const_cast<char*>(target),
                            src + band * heap_size + yy * bytes_per_chunk,
                            bytes_per_chunk);
                target += bytes_per_chunk;
            }
        }
    }
}
if (_nchunck==6 )
{
    // Merge six input streams: the input block is read as consecutive
    // groups of `nbands` heaps, and the output receives one `bytes_per_chunk`
    // slice from each heap of the group in turn.
    const std::size_t bytes_per_chunk = 32;
    const std::size_t heap_size = 262144; // size of one heap in bytes
    const std::size_t nbands = _nchunck;
    const std::size_t group_size = nbands * heap_size;
    const std::size_t ngroups = block.used_bytes() / nbands / heap_size;
    const std::size_t chunks_per_heap = heap_size / bytes_per_chunk;
    #pragma omp parallel for
    for (std::size_t xx = 0; xx < ngroups; xx++)
    {
        // The cursors are derived from xx inside the loop body so that each
        // OpenMP thread owns its own copies. Cursors declared outside the
        // parallel loop would be shared and concurrently advanced by all
        // threads (a data race), and no thread would start at the offset
        // belonging to its iteration.
        const char *src = block.ptr() + xx * group_size;
        const char *target = oblock.ptr() + xx * group_size;
        for (std::size_t yy = 0; yy < chunks_per_heap; yy++)
        {
            for (std::size_t band = 0; band < nbands; band++)
            {
                std::memcpy(const_cast<char*>(target),
                            src + band * heap_size + yy * bytes_per_chunk,
                            bytes_per_chunk);
                target += bytes_per_chunk;
            }
        }
    }
}
if (_nchunck ==7)
{
    // Merge seven input streams: the input block is read as consecutive
    // groups of `nbands` heaps, and the output receives one `bytes_per_chunk`
    // slice from each heap of the group in turn.
    const std::size_t bytes_per_chunk = 32;
    const std::size_t heap_size = 262144; // size of one heap in bytes
    const std::size_t nbands = _nchunck;
    const std::size_t group_size = nbands * heap_size;
    const std::size_t ngroups = block.used_bytes() / nbands / heap_size;
    const std::size_t chunks_per_heap = heap_size / bytes_per_chunk;
    #pragma omp parallel for
    for (std::size_t xx = 0; xx < ngroups; xx++)
    {
        // The cursors are derived from xx inside the loop body so that each
        // OpenMP thread owns its own copies. Cursors declared outside the
        // parallel loop would be shared and concurrently advanced by all
        // threads (a data race), and no thread would start at the offset
        // belonging to its iteration.
        const char *src = block.ptr() + xx * group_size;
        const char *target = oblock.ptr() + xx * group_size;
        for (std::size_t yy = 0; yy < chunks_per_heap; yy++)
        {
            for (std::size_t band = 0; band < nbands; band++)
            {
                std::memcpy(const_cast<char*>(target),
                            src + band * heap_size + yy * bytes_per_chunk,
                            bytes_per_chunk);
                target += bytes_per_chunk;
            }
        }
    }
}
/**
if (_nchunck==8 )
{
//RawBytes& oblock = _writer.data_stream().next();
std::size_t bytes_per_chunk= 32;
std::size_t heap_size = 262144;
std::size_t nbands = _nchunck;
std::size_t t_packet = 256;
const char *s0 = block.ptr();
const char *s1 = block.ptr() + heap_size;//that's size of a heap
const char *s2 = block.ptr() + 2*heap_size;
const char *s3 = block.ptr() + 3*heap_size;//that's size of a heap
const char *s4 = block.ptr() + 4*heap_size;
const char *s5 = block.ptr() + 5*heap_size;
const char *s6 = block.ptr() + 6*heap_size;
const char *s7 = block.ptr() + 7*heap_size;
const char *target = oblock.ptr();
#pragma omp parallel for
for (std::size_t xx=0; xx< block.used_bytes()/nbands/heap_size; xx++)
{
for (std::size_t yy=0; yy< heap_size/bytes_per_chunk; yy++)
{
std::memcpy((void*)target, (void*)s0, bytes_per_chunk);
s0 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s1, bytes_per_chunk);
s1 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s2, bytes_per_chunk);
s2 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s3, bytes_per_chunk);
s3 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s4, bytes_per_chunk);
s4 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s5, bytes_per_chunk);
s5 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s6, bytes_per_chunk);
s6 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s7, bytes_per_chunk);
s7 += bytes_per_chunk;
target += bytes_per_chunk;
}
s0 += (nbands-1) * heap_size;
s1 += (nbands-1) * heap_size;
s2 += (nbands-1) * heap_size;
s3 += (nbands-1) * heap_size;
s4 += (nbands-1) * heap_size;
s5 += (nbands-1) * heap_size;
s6 += (nbands-1) * heap_size;
s7 += (nbands-1) * heap_size;
}
}
**/
/**
RawBytes& oblock = _writer.data_stream().next();
if (_nchunck = 2)
{
std::memcpy((void*)target, (void*)s0, bytes_per_chunk);
s0 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s1, bytes_per_chunk);
s1 += bytes_per_chunk;
target += bytes_per_chunk;
}
s0 += heap_size;
s1 += heap_size;
std::size_t bytes_per_chunk= 32;
std::size_t heap_size = 262144;
std::size_t nbands = _nchunck;
std::size_t t_packet = 256;
const char *s0 = block.ptr();
const char *s1 = block.ptr() + heap_size;//that's size of a heap
const char *target = oblock.ptr();
for (std::size_t xx=0; xx< block.used_bytes()/nbands/heap_size; xx++)
{
for (std::size_t yy=0; yy< heap_size/bytes_per_chunk; yy++)
{
std::memcpy((void*)target, (void*)s0, bytes_per_chunk);
s0 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s1, bytes_per_chunk);
s1 += bytes_per_chunk;
target += bytes_per_chunk;
}
s0 += heap_size;
s1 += heap_size;
}
}
if (_nchunck = 3)
{
std::size_t bytes_per_chunk= 32;
std::size_t heap_size = 262144;
std::size_t nbands = _nchunck;
std::size_t t_packet = 256;
const char *s0 = block.ptr();
const char *s1 = block.ptr() + heap_size;//that's size of a heap
const char *s2 = block.ptr() + 2*heap_size;//that's size of a heap
const char *target = oblock.ptr();
for (std::size_t xx=0; xx< block.used_bytes()/nbands/heap_size; xx++)
{
for (std::size_t yy=0; yy< heap_size/bytes_per_chunk; yy++)
{
std::memcpy((void*)target, (void*)s0, bytes_per_chunk);
s0 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s1, bytes_per_chunk);
s1 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s2, bytes_per_chunk);
s2 += bytes_per_chunk;
target += bytes_per_chunk;
}
s0 += heap_size;
s1 += heap_size;
s2 += heap_size;
}
}
if (_nchunck = 4)
{
std::size_t bytes_per_chunk= 32;
std::size_t heap_size = 262144;
std::size_t nbands = _nchunck;
std::size_t t_packet = 256;
const char *s0 = block.ptr();
const char *s1 = block.ptr() + heap_size;//that's size of a heap
const char *s2 = block.ptr() + 2*heap_size;//that's size of a heap
const char *s3 = block.ptr() + 3*heap_size;//that's size of a heap
const char *target = oblock.ptr();
for (std::size_t xx=0; xx< block.used_bytes()/nbands/heap_size; xx++)
{
for (std::size_t yy=0; yy< heap_size/bytes_per_chunk; yy++)
{
std::memcpy((void*)target, (void*)s0, bytes_per_chunk);
s0 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s1, bytes_per_chunk);
s1 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s2, bytes_per_chunk);
s2 += bytes_per_chunk;
target += bytes_per_chunk;
std::memcpy((void*)target, (void*)s3, bytes_per_chunk);
s3 += bytes_per_chunk;
target += bytes_per_chunk;
}
s0 += heap_size;
s1 += heap_size;
s2 += heap_size;
s3 += heap_size;
}
}
**/
//std::memcpy(oblock.ptr(), block.ptr(), block.used_bytes());
oblock.used_bytes(block.used_bytes());
_writer.data_stream().release();
......
......@@ -23,7 +23,7 @@ int main(int argc, char** argv)
{
key_t input_key;
key_t output_key;
std::size_t npol;
std::size_t nchunck;
std::size_t nsamps_per_heap;
/** Define and parse the program options
......@@ -49,8 +49,8 @@ int main(int argc, char** argv)
}),
"The shared memory key for the dada buffer to connect to (hex string)")
("npol,p", po::value<std::size_t>(&npol)->default_value(2),
"Value of number of pol")
("nchunck,p", po::value<std::size_t>(&nchunck)->default_value(2),
"number of incoming stream")
("nsamps_per_heap,n", po::value<std::size_t>(&nsamps_per_heap)->default_value(4096),
"Value of samples per heap")
......@@ -86,7 +86,7 @@ int main(int argc, char** argv)
*/
MultiLog log("edd::EDDRoach_merge");
DadaWriteClient output(output_key, log);
effelsberg::edd::EDDRoach_merge mod(nsamps_per_heap, npol, output);
effelsberg::edd::EDDRoach_merge mod(nsamps_per_heap, nchunck, output);
DadaInputStream <decltype(mod)> input(input_key, log, mod);
input.start();
/**
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment