Commit b3619f8d authored by Tobias Winchen's avatar Tobias Winchen
Browse files

Merge branch 'LEAP' into 'devel'

Updating leap mode disk writing, leap mode folding and MPG-SKA dish 10 bit merger

See merge request mpifr-bdg/psrdada_cpp!3
parents 9a7fc42e 71fc285b
......@@ -74,9 +74,9 @@ target_link_libraries (dbnull ${PSRDADA_CPP_LIBRARIES})
add_executable(dbreset examples/dbreset.cpp)
target_link_libraries (dbreset ${PSRDADA_CPP_LIBRARIES})
#dbisk
#dbdisk
#add_executable(dbdisk examples/dbdisk.cpp)
#target_link_libraries (dbisk ${PSRDADA_CPP_LIBRARIES})
#target_link_libraries (dbdisk ${PSRDADA_CPP_LIBRARIES})
install (TARGETS junkdb dbnull syncdb dbreset fbfuse_output_db DESTINATION bin)
install (TARGETS ${CMAKE_PROJECT_NAME}
......
......@@ -13,8 +13,11 @@ set(psrdada_cpp_effelsberg_edd_src
src/DetectorAccumulator.cu
src/GatedSpectrometer.cu
src/EDDPolnMerge.cpp
src/EDDPolnMerge10to8.cpp
src/EDDRoach.cpp
src/EDDRoach_merge.cpp
src/EDDRoach_merge_leap.cpp
src/dada_disk_sink_leap.cpp
src/ScaledTransposeTFtoTFT.cu
src/SKRfiReplacementCuda.cu
src/SpectralKurtosisCuda.cu
......@@ -28,8 +31,11 @@ set(psrdada_cpp_effelsberg_edd_inc
DadaBufferLayout.hpp
DetectorAccumulator.cuh
EDDPolnMerge.hpp
EDDPolnMerge10to8.hpp
EDDRoach.hpp
EDDRoach_merge.hpp
EDDRoach_merge_leap.hpp
dada_disk_sink_leap.hpp
FftSpectrometer.cuh
GatedSpectrometer.cuh
Packer.cuh
......@@ -48,8 +54,6 @@ install (TARGETS ${CMAKE_PROJECT_NAME}_effelsberg_edd
LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib)
#simple FFT spectrometer interface
cuda_add_executable(fft_spectrometer src/fft_spectrometer_cli.cu)
target_link_libraries(fft_spectrometer ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES} ${CUDA_CUFFT_LIBRARIES})
......@@ -76,6 +80,10 @@ add_executable(edd_merge src/EDDPolnMerge_cli.cpp)
target_link_libraries(edd_merge ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES})
install(TARGETS edd_merge DESTINATION bin)
add_executable(edd_merge_10to8 src/EDDPolnMerge10to8_cli.cpp)
target_link_libraries(edd_merge_10to8 ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES})
install(TARGETS edd_merge_10to8 DESTINATION bin)
add_executable(edd_roach src/EDDRoach_cli.cpp)
target_link_libraries(edd_roach ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES})
install(TARGETS edd_roach DESTINATION bin)
......@@ -84,5 +92,14 @@ add_executable(edd_roach_merge src/EDDRoach_merge_cli.cpp)
target_link_libraries(edd_roach_merge ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES})
install(TARGETS edd_roach_merge DESTINATION bin)
add_executable(edd_roach_merge_leap src/EDDRoach_merge_leap_cli.cpp)
target_link_libraries(edd_roach_merge_leap ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES})
install(TARGETS edd_roach_merge_leap DESTINATION bin)
#dbdiskleap
add_executable(dbdiskleap src/dbdisk_leap.cpp)
target_link_libraries (dbdiskleap ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES})
install(TARGETS dbdiskleap DESTINATION bin)
add_subdirectory(test)
endif(ENABLE_CUDA)
#ifndef PSRDADA_CPP_EFFELSBERG_EDD_EDDPOLNMERGE_HPP
#define PSRDADA_CPP_EFFELSBERG_EDD_EDDPOLNMERGE_HPP
#define HEAP_SIZE 4096
#include "psrdada_cpp/dada_write_client.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/common.hpp"
......@@ -12,7 +13,7 @@ namespace edd {
class EDDPolnMerge
{
public:
EDDPolnMerge(std::size_t nsamps_per_heap, std::size_t npol, DadaWriteClient& writer);
EDDPolnMerge(std::size_t npol, int nthreads, DadaWriteClient& writer);
~EDDPolnMerge();
/**
......@@ -37,8 +38,8 @@ public:
bool operator()(RawBytes& block);
private:
std::size_t _nsamps_per_heap;
std::size_t _npol;
std::size_t _npol;
int _nthreads;
DadaWriteClient& _writer;
};
......
#ifndef PSRDADA_CPP_EFFELSBERG_EDD_EDDPOLNMERGE10TO8_HPP
#define PSRDADA_CPP_EFFELSBERG_EDD_EDDPOLNMERGE10TO8_HPP
#define HEAP_SIZE_10BIT 5120
#include "psrdada_cpp/dada_write_client.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/common.hpp"
#include <vector>
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
class EDDPolnMerge10to8
{
public:
EDDPolnMerge10to8(std::size_t nsamps_per_heap, std::size_t npol, int nthreads, DadaWriteClient& writer);
~EDDPolnMerge10to8();
/**
* @brief A callback to be called on connection
* to a ring buffer.
*
* @detail The first available header block in the
* in the ring buffer is provided as an argument.
* It is here that header parameters could be read
* if desired.
*
* @param block A RawBytes object wrapping a DADA header buffer
*/
void init(RawBytes& block);
/**
* @brief A callback to be called on acqusition of a new
* data block.
*
* @param block A RawBytes object wrapping a DADA data buffer
*/
bool operator()(RawBytes& block);
private:
std::size_t _nsamps_per_heap;
std::size_t _npol;
int _nthreads;
DadaWriteClient& _writer;
};
} // edd
} // effelsberg
} // psrdada_cpp
#endif //PSRDADA_CPP_EFFELSBERG_EDD_EDDPOLNMERGE10TO8_HPP
#ifndef PSRDADA_CPP_EFFELSBERG_EDD_EDDPOLNMERGE_HPP
#define PSRDADA_CPP_EFFELSBERG_EDD_EDDPOLNMERGE_HPP
#define HEAP_SIZE 262144
#define BYTES_PER_CHUNK 32
#include "psrdada_cpp/dada_write_client.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/common.hpp"
......@@ -12,7 +14,7 @@ namespace edd {
class EDDRoach_merge
{
public:
EDDRoach_merge(std::size_t nsamps_per_heap, std::size_t nchunck, DadaWriteClient& writer);
EDDRoach_merge(std::size_t nchunck, int nthreads, DadaWriteClient& writer);
~EDDRoach_merge();
/**
......@@ -37,8 +39,8 @@ public:
bool operator()(RawBytes& block);
private:
std::size_t _nsamps_per_heap;
std::size_t _nchunck;
std::size_t _nchunck;
int _nthreads;
DadaWriteClient& _writer;
};
......
#ifndef PSRDADA_CPP_EFFELSBERG_EDD_EDDROACH_MERGE_LEAP_HPP
#define PSRDADA_CPP_EFFELSBERG_EDD_EDDROACH_MERGE_LEAP_HPP
#define HEAP_SIZE 32000
#include "psrdada_cpp/dada_write_client.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/common.hpp"
#include <vector>
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
class EDDRoach_merge_leap
{
public:
EDDRoach_merge_leap(std::size_t nchunck, int nthreads, DadaWriteClient& writer);
~EDDRoach_merge_leap();
/**
* @brief A callback to be called on connection
* to a ring buffer.
*
* @detail The first available header block in the
* in the ring buffer is provided as an argument.
* It is here that header parameters could be read
* if desired.
*
* @param block A RawBytes object wrapping a DADA header buffer
*/
void init(RawBytes& block);
/**
* @brief A callback to be called on acqusition of a new
* data block.
*
* @param block A RawBytes object wrapping a DADA data buffer
*/
bool operator()(RawBytes& block);
private:
std::size_t _nchunck;
int _nthreads;
DadaWriteClient& _writer;
};
} // edd
} // effelsberg
} // psrdada_cpp
#endif //PSRDADA_CPP_EFFELSBERG_EDD_EDDROACH_MERGE_LEAP_HPP
#ifndef PSRDADA_CPP_EFFELSBERG_EDD_DADA_DISK_SINK_LEAP_HPP
#define PSRDADA_CPP_EFFELSBERG_EDD_DADA_DISK_SINK_LEAP_HPP
#define HEADER_SIZE 4096
#define START_TIME 1024
#define HEAP_SIZE 32000
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/dada_write_client.hpp"
#include <fstream>
#include <vector>
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
class DiskSinkLeap
{
public:
DiskSinkLeap(std::string prefix, int nchan);
~DiskSinkLeap();
void init(RawBytes& block);
bool operator()(RawBytes& block);
public:
std::string _prefix;
std::size_t _counter;
int _nchan;
char _header[HEADER_SIZE];
char _start_time[START_TIME];
bool first_block;
std::vector<char> _transpose;
std::vector<std::ofstream> _output_streams;
};
} // edd
} // effelsberg
} //namespace psrdada_cpp
#endif //PSRDADA_CPP_EFFELSBERG_EDD_DADA_DISK_SINK_LEAP_HPP
......@@ -9,108 +9,115 @@ namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
uint64_t interleave(uint32_t x, uint32_t y) {
__m128i xvec = _mm_cvtsi32_si128(x);
__m128i yvec = _mm_cvtsi32_si128(y);
__m128i interleaved = _mm_unpacklo_epi8(yvec, xvec);
return _mm_cvtsi128_si64(interleaved);
}
uint64_t interleave(uint32_t x, uint32_t y) {
__m128i xvec = _mm_cvtsi32_si128(x);
__m128i yvec = _mm_cvtsi32_si128(y);
__m128i interleaved = _mm_unpacklo_epi8(yvec, xvec);
return _mm_cvtsi128_si64(interleaved);
}
EDDPolnMerge::EDDPolnMerge(std::size_t nsamps_per_heap, std::size_t npol, DadaWriteClient& writer)
: _nsamps_per_heap(nsamps_per_heap)
, _npol(npol)
, _writer(writer)
void merge2pol(char const *buf, char *out)
{
uint8_t *qword0 = (uint8_t*)(buf);
uint8_t *qword1 = (uint8_t*)(buf) + HEAP_SIZE;
uint64_t* D = reinterpret_cast<uint64_t*>(out);
for (int i = 0; i < HEAP_SIZE / sizeof(uint32_t); i++)
{
uint32_t* S0 = reinterpret_cast<uint32_t*>(qword0);
uint32_t* S1 = reinterpret_cast<uint32_t*>(qword1);
*D++ = interleave(*S1++, *S0++);
qword0 += sizeof(uint32_t);
qword1 += sizeof(uint32_t);
}
}
EDDPolnMerge::~EDDPolnMerge()
{
}
EDDPolnMerge::EDDPolnMerge(std::size_t npol, int nthreads, DadaWriteClient& writer)
: _npol(npol)
, _nthreads(nthreads)
, _writer(writer)
{
}
EDDPolnMerge::~EDDPolnMerge()
{
}
void EDDPolnMerge::init(RawBytes& block)
void EDDPolnMerge::init(RawBytes& block)
{
RawBytes& oblock = _writer.header_stream().next();
if (block.used_bytes() > oblock.total_bytes())
{
RawBytes& oblock = _writer.header_stream().next();
if (block.used_bytes() > oblock.total_bytes())
{
_writer.header_stream().release();
throw std::runtime_error("Output DADA buffer does not have enough space for header");
}
std::memcpy(oblock.ptr(), block.ptr(), block.used_bytes());
char buffer[1024];
ascii_header_get(block.ptr(), "SAMPLE_CLOCK_START", "%s", buffer);
std::size_t sample_clock_start = std::strtoul(buffer, NULL, 0);
ascii_header_get(block.ptr(), "CLOCK_SAMPLE", "%s", buffer);
long double sample_clock = std::strtold(buffer, NULL);
ascii_header_get(block.ptr(), "SYNC_TIME", "%s", buffer);
long double sync_time = std::strtold(buffer, NULL);
BOOST_LOG_TRIVIAL(debug) << "this is sample_clock_start "<< sample_clock_start;
BOOST_LOG_TRIVIAL(debug)<< "this is sample_clock "<< sample_clock;
BOOST_LOG_TRIVIAL(debug) << "this is sync_time "<< sync_time;
BOOST_LOG_TRIVIAL(debug) << "this is sample_clock_start / sample_clock "<< sample_clock_start / sample_clock;
long double unix_time = sync_time + (sample_clock_start / sample_clock);
long double mjd_time = unix_time / 86400 - 40587.5;
char time_buffer[80];
std::time_t unix_time_int;
struct std::tm * timeinfo;
double fractpart, intpart;
fractpart = std::modf (static_cast<double>(unix_time) , &intpart);
unix_time_int = static_cast<std::time_t>(intpart);
timeinfo = std::gmtime (&unix_time_int);
std::strftime(time_buffer, 80, "%Y-%m-%d-%H:%M:%S", timeinfo);
std::stringstream utc_time_stamp;
BOOST_LOG_TRIVIAL(debug) << "unix_time" << unix_time;
BOOST_LOG_TRIVIAL(debug) << "fractional part " << fractpart;
//BOOST_LOG_TRIVIAL(debug) << "fractional part ." << static_cast<std::size_t>(fractpart*10000000000);
//utc_time_stamp<< time_buffer << "." <<fractpart;
utc_time_stamp<< time_buffer << "." << std::setw(10) << std::setfill('0') << std::size_t(fractpart*10000000000) << std::setfill(' ');
//BOOST_LOG_TRIVIAL(debug) << "fractional part" <<static_cast<std::size_t>(fractpart * 10000000000);
//utc_time_stamp<< time_buffer << "." << static_cast<std::size_t>(fractpart * 10000000000);
BOOST_LOG_TRIVIAL(debug) << "this is start time in utc "<< utc_time_stamp.str().c_str()<< "\n";
// std::cout << "this is sync_time MJD "<< mjd_time<< "\n";
ascii_header_set(oblock.ptr(), "UTC_START", "%s", utc_time_stamp.str().c_str());
ascii_header_set(oblock.ptr(), "UNIX_TIME", "%Lf", unix_time);
oblock.used_bytes(oblock.total_bytes());
_writer.header_stream().release();
throw std::runtime_error("Output DADA buffer does not have enough space for header");
}
std::memcpy(oblock.ptr(), block.ptr(), block.used_bytes());
char buffer[1024];
ascii_header_get(block.ptr(), "SAMPLE_CLOCK_START", "%s", buffer);
std::size_t sample_clock_start = std::strtoul(buffer, NULL, 0);
ascii_header_get(block.ptr(), "CLOCK_SAMPLE", "%s", buffer);
long double sample_clock = std::strtold(buffer, NULL);
ascii_header_get(block.ptr(), "SYNC_TIME", "%s", buffer);
long double sync_time = std::strtold(buffer, NULL);
BOOST_LOG_TRIVIAL(debug) << "this is sample_clock_start " << sample_clock_start;
BOOST_LOG_TRIVIAL(debug) << "this is sample_clock " << sample_clock;
BOOST_LOG_TRIVIAL(debug) << "this is sync_time " << sync_time;
BOOST_LOG_TRIVIAL(debug) << "this is sample_clock_start / sample_clock " << sample_clock_start / sample_clock;
long double unix_time = sync_time + (sample_clock_start / sample_clock);
long double mjd_time = unix_time / 86400 - 40587.5;
char time_buffer[80];
std::time_t unix_time_int;
struct std::tm * timeinfo;
double fractpart, intpart;
fractpart = std::modf (static_cast<double>(unix_time) , &intpart);
unix_time_int = static_cast<std::time_t>(intpart);
timeinfo = std::gmtime (&unix_time_int);
std::strftime(time_buffer, 80, "%Y-%m-%d-%H:%M:%S", timeinfo);
std::stringstream utc_time_stamp;
BOOST_LOG_TRIVIAL(debug) << "unix_time" << unix_time;
BOOST_LOG_TRIVIAL(debug) << "fractional part " << fractpart;
//BOOST_LOG_TRIVIAL(debug) << "fractional part ." << static_cast<std::size_t>(fractpart*10000000000);
//utc_time_stamp<< time_buffer << "." <<fractpart;
utc_time_stamp << time_buffer << "." << std::setw(10) << std::setfill('0') << std::size_t(fractpart * 10000000000) << std::setfill(' ');
//BOOST_LOG_TRIVIAL(debug) << "fractional part" <<static_cast<std::size_t>(fractpart * 10000000000);
//utc_time_stamp<< time_buffer << "." << static_cast<std::size_t>(fractpart * 10000000000);
BOOST_LOG_TRIVIAL(debug) << "this is start time in utc " << utc_time_stamp.str().c_str() << "\n";
// std::cout << "this is sync_time MJD "<< mjd_time<< "\n";
ascii_header_set(oblock.ptr(), "UTC_START", "%s", utc_time_stamp.str().c_str());
ascii_header_set(oblock.ptr(), "UNIX_TIME", "%Lf", unix_time);
oblock.used_bytes(oblock.total_bytes());
_writer.header_stream().release();
}
bool EDDPolnMerge::operator()(RawBytes& block)
{
std:size_t nheap_groups = block.used_bytes()/_npol/_nsamps_per_heap;
/**
if (block.used_bytes() < block.total_bytes())
{
BOOST_LOG_TRIVIAL (debug) << "Reach end of data";
_writer.data_stream().next();
_writer.data_stream().release();
return true;
}
**/
RawBytes& oblock = _writer.data_stream().next();
bool EDDPolnMerge::operator()(RawBytes& block)
{
std: size_t nheap_groups = block.used_bytes() / _npol / HEAP_SIZE;
/**
if (block.used_bytes() < block.total_bytes())
{
BOOST_LOG_TRIVIAL (debug) << "Reach end of data";
_writer.data_stream().next();
_writer.data_stream().release();
return true;
}
**/
RawBytes& oblock = _writer.data_stream().next();
if (block.used_bytes() > oblock.total_bytes())
{
_writer.data_stream().release();
throw std::runtime_error("Output DADA buffer does not match with the input dada buffer");
}
uint32_t* S0 = reinterpret_cast<uint32_t*>(block.ptr());
uint32_t* S1 = reinterpret_cast<uint32_t*>(block.ptr() + _nsamps_per_heap);
uint64_t* D = reinterpret_cast<uint64_t*>(oblock.ptr());
for (std::size_t jj = 0; jj < nheap_groups; ++jj)
{
for (std::size_t ii = 0; ii < _nsamps_per_heap/sizeof(uint32_t); ++ii)
{
*D++ = interleave(*S1++, *S0++);
}
S0 += _nsamps_per_heap/sizeof(uint32_t);
S1 += _nsamps_per_heap/sizeof(uint32_t);
}
if (block.used_bytes() > oblock.total_bytes())
{
_writer.data_stream().release();
throw std::runtime_error("Output DADA buffer does not match with the input dada buffer");
}
oblock.used_bytes(block.used_bytes());
_writer.data_stream().release();
return false;
#pragma omp parallel for schedule(dynamic, _nthreads) num_threads(_nthreads)
for (std::size_t kk = 0; kk < block.used_bytes() / HEAP_SIZE / _npol ; ++kk)
{
char *buffer = block.ptr() + HEAP_SIZE * _npol * kk;
merge2pol(buffer, oblock.ptr() + kk * _npol * HEAP_SIZE);
}
oblock.used_bytes(block.used_bytes());
_writer.data_stream().release();
return false;
}
}//edd
}//effelsberg
}//psrdada_cpp
......
#include "psrdada_cpp/effelsberg/edd/EDDPolnMerge10to8.hpp"
#include "ascii_header.h"
#include <immintrin.h>
#include <time.h>
#include <iomanip>
#include <cmath>
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
uint64_t interleave(uint32_t x, uint32_t y) {
__m128i xvec = _mm_cvtsi32_si128(x);
__m128i yvec = _mm_cvtsi32_si128(y);
__m128i interleaved = _mm_unpacklo_epi8(yvec, xvec);
return _mm_cvtsi128_si64(interleaved);
}
uint64_t *unpack5(uint64_t *qword, uint8_t *out)
{
uint64_t val, rest;
val = be64toh(*qword);
//printf("0x%016lX\n",val);
qword++;
out[0] = ((int64_t)(( 0xFFC0000000000000 & val) << 0) >> 54) & 0xFF;
out[1] = ((int64_t)(( 0x003FF00000000000 & val) << 10) >> 54) & 0xFF;
out[2] = ((int64_t)(( 0x00000FFC00000000 & val) << 20) >> 54) & 0xFF;
out[3] = ((int64_t)(( 0x00000003FF000000 & val) << 30) >> 54) & 0xFF;
out[4] = ((int64_t)(( 0x0000000000FFC000 & val) << 40) >> 54) & 0xFF;
out[5] = ((int64_t)(( 0x0000000000003FF0 & val) << 50) >> 54) & 0xFF;
rest = ( 0x000000000000000F & val) << 60; // 4 bits rest.
// 2nd:
val = be64toh(*qword);
//printf("0x%016lX\n",val);
qword++;
out[6] = ((int64_t)(((0xFC00000000000000 & val) >> 4) | rest) >> 54) & 0xFF;
out[7] = ((int64_t)(( 0x03FF000000000000 & val) << 6) >> 54) & 0xFF;
out[8] = ((int64_t)(( 0x0000FFC000000000 & val) << 16) >> 54) & 0xFF;
out[9] = ((int64_t)(( 0x0000003FF0000000 & val) << 26) >> 54) & 0xFF;
out[10] = ((int64_t)(( 0x000000000FFC0000 & val) << 36) >> 54) & 0xFF;
out[11] = ((int64_t)(( 0x000000000003FF00 & val) << 46) >> 54) & 0xFF;
rest = ( 0x00000000000000FF & val) << 56; // 8 bits rest.
// 3rd:
val = be64toh(*qword);
//printf("0x%016lX\n",val);
qword++;
out[12] = ((int64_t)(((0xC000000000000000 & val) >> 8) | rest) >> 54) & 0xFF;
out[13] = ((int64_t)(( 0x3FF0000000000000 & val) << 2) >> 54) & 0xFF;
out[14] = ((int64_t)(( 0x000FFC0000000000 & val) << 12) >> 54) & 0xFF;
out[15] = ((int64_t)(( 0x000003FF00000000 & val) << 22) >> 54) & 0xFF;
out[16] = ((int64_t)(( 0x00000000FFC00000 & val) << 32) >> 54) & 0xFF;
out[17] = ((int64_t)(( 0x00000000003FF000 & val) << 42) >> 54) & 0xFF;
out[18] = ((int64_t)(( 0x0000000000000FFC & val) << 52) >> 54) & 0xFF;
rest = ( 0x0000000000000003 & val) << 62; // 2 bits rest.
// 4th:
val = be64toh(*qword);
//printf("0x%016lX\n",val);
qword++;
out[19] = ((int64_t)(((0xFF00000000000000 & val) >> 2) | rest) >> 54) & 0xFF;
out[20] = ((int64_t)(( 0x00FFC00000000000 & val) << 8) >> 54) & 0xFF;
out[21] = ((int64_t)(( 0x00003FF000000000 & val) << 18) >> 54) & 0xFF;
out[22] = ((int64_t)(( 0x0000000FFC000000 & val) << 28) >> 54) & 0xFF;
out[23] = ((int64_t)(( 0x0000000003FF0000 & val) << 38) >> 54) & 0xFF;
out[24] = ((int64_t)(( 0x000000000000FFC0 & val) << 48) >> 54) & 0xFF;
rest = ( 0x000000000000003F & val) << 58; // 6 bits rest.
// 5th:
val = be64toh(*qword);
//printf("0x%016lX\n",val);
qword++;
out[25] = ((int64_t)(((0xF000000000000000 & val) >> 6) | rest) >> 54) & 0xFF;
out[26] = ((int64_t)(( 0x0FFC000000000000 & val) << 4) >> 54) & 0xFF;
out[27] = ((int64_t)(( 0x0003FF0000000000 & val) << 14) >> 54) & 0xFF;
out[28] = ((int64_t)(( 0x000000FFC0000000 & val) << 24) >> 54) & 0xFF;
out[29] = ((int64_t)(( 0x000000003FF00000 & val) << 34) >> 54) & 0xFF;
out[30] = ((int64_t)(( 0x00000000000FFC00 & val) << 44) >> 54) & 0xFF;
out[31] = ((int64_t)(( 0x00000000000003FF & val) << 54) >> 54) & 0xFF;
rest = 0; // No rest.
return qword;
}
void handle_packet_numbers_4096x10_s(char const *buf, char *out)
{ // Print 4096 numbers of 10 bit signed integers.
uint64_t val, rest;
uint8_t S0_8bit[32];
uint8_t S1_8bit[32];
uint64_t *qword0 = (uint64_t*)(buf);
uint64_t *qword1 = (uint64_t*)(buf) + 640;
uint64_t* D = reinterpret_cast<uint64_t*>(out);
for (int i = 0; i < 640 / 5; i++)
{
qword0 = unpack5(qword0, S0_8bit);
qword1 = unpack5(qword1, S1_8bit);
uint32_t* S0 = reinterpret_cast<uint32_t*>(S0_8bit);
uint32_t* S1 = reinterpret_cast<uint32_t*>(S1_8bit);
for (std::size_t ii = 0; ii < 8; ++ii)
{
*D++ = interleave(*S1++, *S0++);
}
}
}
EDDPolnMerge10to8::EDDPolnMerge10to8(std::size_t nsamps_per_heap, std::size_t npol, int nthreads, DadaWriteClient& writer)
: _nsamps_per_heap(nsamps_per_heap)
, _npol(npol)
, _nthreads(nthreads)
, _writer(writer)
{
}
EDDPolnMerge10to8::~EDDPolnMerge10to8()
{