Skip to content
Snippets Groups Projects
Commit 8ad6400a authored by Jason Wu's avatar Jason Wu
Browse files

adding single polarisation 10 bit unpacker for dspsr

parent 64d92803
No related branches found
No related tags found
1 merge request!5adding single polarisation 10 bit unpacker for dspsr
Pipeline #92962 failed
...@@ -14,6 +14,7 @@ set(psrdada_cpp_effelsberg_edd_src ...@@ -14,6 +14,7 @@ set(psrdada_cpp_effelsberg_edd_src
src/GatedSpectrometer.cu src/GatedSpectrometer.cu
src/EDDPolnMerge.cpp src/EDDPolnMerge.cpp
src/EDDPolnMerge10to8.cpp src/EDDPolnMerge10to8.cpp
src/EDDPolnMerge10to8_1pol.cpp
src/EDDRoach.cpp src/EDDRoach.cpp
src/EDDRoach_merge.cpp src/EDDRoach_merge.cpp
src/EDDRoach_merge_leap.cpp src/EDDRoach_merge_leap.cpp
...@@ -32,6 +33,7 @@ set(psrdada_cpp_effelsberg_edd_inc ...@@ -32,6 +33,7 @@ set(psrdada_cpp_effelsberg_edd_inc
DetectorAccumulator.cuh DetectorAccumulator.cuh
EDDPolnMerge.hpp EDDPolnMerge.hpp
EDDPolnMerge10to8.hpp EDDPolnMerge10to8.hpp
EDDPolnMerge10to8_1pol.hpp
EDDRoach.hpp EDDRoach.hpp
EDDRoach_merge.hpp EDDRoach_merge.hpp
EDDRoach_merge_leap.hpp EDDRoach_merge_leap.hpp
...@@ -84,6 +86,10 @@ add_executable(edd_merge_10to8 src/EDDPolnMerge10to8_cli.cpp) ...@@ -84,6 +86,10 @@ add_executable(edd_merge_10to8 src/EDDPolnMerge10to8_cli.cpp)
target_link_libraries(edd_merge_10to8 ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES}) target_link_libraries(edd_merge_10to8 ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES})
install(TARGETS edd_merge_10to8 DESTINATION bin) install(TARGETS edd_merge_10to8 DESTINATION bin)
add_executable(edd_merge_10to8_1pol src/EDDPolnMerge10to8_1pol_cli.cpp)
target_link_libraries(edd_merge_10to8_1pol ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES})
install(TARGETS edd_merge_10to8_1pol DESTINATION bin)
add_executable(edd_roach src/EDDRoach_cli.cpp) add_executable(edd_roach src/EDDRoach_cli.cpp)
target_link_libraries(edd_roach ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES}) target_link_libraries(edd_roach ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES})
install(TARGETS edd_roach DESTINATION bin) install(TARGETS edd_roach DESTINATION bin)
......
#ifndef PSRDADA_CPP_EFFELSBERG_EDD_EDDPOLNMERGE10TO8_1POL_HPP
#define PSRDADA_CPP_EFFELSBERG_EDD_EDDPOLNMERGE10TO8_1POL_HPP
#define HEAP_SIZE_10BIT 5120
#include "psrdada_cpp/dada_write_client.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/common.hpp"
#include <vector>
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
class EDDPolnMerge10to8_1pol
{
public:
EDDPolnMerge10to8_1pol(std::size_t nsamps_per_heap, std::size_t npol, int nthreads, DadaWriteClient& writer);
~EDDPolnMerge10to8_1pol();
/**
* @brief A callback to be called on connection
* to a ring buffer.
*
* @detail The first available header block in the
* in the ring buffer is provided as an argument.
* It is here that header parameters could be read
* if desired.
*
* @param block A RawBytes object wrapping a DADA header buffer
*/
void init(RawBytes& block);
/**
* @brief A callback to be called on acqusition of a new
* data block.
*
* @param block A RawBytes object wrapping a DADA data buffer
*/
bool operator()(RawBytes& block);
private:
std::size_t _nsamps_per_heap;
std::size_t _npol;
int _nthreads;
DadaWriteClient& _writer;
};
} // edd
} // effelsberg
} // psrdada_cpp
#endif //PSRDADA_CPP_EFFELSBERG_EDD_EDDPOLNMERGE10TO8_1POL_HPP
#include "psrdada_cpp/effelsberg/edd/EDDPolnMerge10to8_1pol.hpp"
#include "ascii_header.h"
#include <immintrin.h>
#include <time.h>
#include <iomanip>
#include <cmath>
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
uint64_t interleave(uint32_t x, uint32_t y) {
__m128i xvec = _mm_cvtsi32_si128(x);
__m128i yvec = _mm_cvtsi32_si128(y);
__m128i interleaved = _mm_unpacklo_epi8(yvec, xvec);
return _mm_cvtsi128_si64(interleaved);
}
uint64_t *unpack5(uint64_t *qword, uint8_t *out)
{
uint64_t val, rest;
val = be64toh(*qword);
//printf("0x%016lX\n",val);
qword++;
out[0] = ((int64_t)(( 0xFFC0000000000000 & val) << 0) >> 54) & 0xFF;
out[1] = ((int64_t)(( 0x003FF00000000000 & val) << 10) >> 54) & 0xFF;
out[2] = ((int64_t)(( 0x00000FFC00000000 & val) << 20) >> 54) & 0xFF;
out[3] = ((int64_t)(( 0x00000003FF000000 & val) << 30) >> 54) & 0xFF;
out[4] = ((int64_t)(( 0x0000000000FFC000 & val) << 40) >> 54) & 0xFF;
out[5] = ((int64_t)(( 0x0000000000003FF0 & val) << 50) >> 54) & 0xFF;
rest = ( 0x000000000000000F & val) << 60; // 4 bits rest.
// 2nd:
val = be64toh(*qword);
//printf("0x%016lX\n",val);
qword++;
out[6] = ((int64_t)(((0xFC00000000000000 & val) >> 4) | rest) >> 54) & 0xFF;
out[7] = ((int64_t)(( 0x03FF000000000000 & val) << 6) >> 54) & 0xFF;
out[8] = ((int64_t)(( 0x0000FFC000000000 & val) << 16) >> 54) & 0xFF;
out[9] = ((int64_t)(( 0x0000003FF0000000 & val) << 26) >> 54) & 0xFF;
out[10] = ((int64_t)(( 0x000000000FFC0000 & val) << 36) >> 54) & 0xFF;
out[11] = ((int64_t)(( 0x000000000003FF00 & val) << 46) >> 54) & 0xFF;
rest = ( 0x00000000000000FF & val) << 56; // 8 bits rest.
// 3rd:
val = be64toh(*qword);
//printf("0x%016lX\n",val);
qword++;
out[12] = ((int64_t)(((0xC000000000000000 & val) >> 8) | rest) >> 54) & 0xFF;
out[13] = ((int64_t)(( 0x3FF0000000000000 & val) << 2) >> 54) & 0xFF;
out[14] = ((int64_t)(( 0x000FFC0000000000 & val) << 12) >> 54) & 0xFF;
out[15] = ((int64_t)(( 0x000003FF00000000 & val) << 22) >> 54) & 0xFF;
out[16] = ((int64_t)(( 0x00000000FFC00000 & val) << 32) >> 54) & 0xFF;
out[17] = ((int64_t)(( 0x00000000003FF000 & val) << 42) >> 54) & 0xFF;
out[18] = ((int64_t)(( 0x0000000000000FFC & val) << 52) >> 54) & 0xFF;
rest = ( 0x0000000000000003 & val) << 62; // 2 bits rest.
// 4th:
val = be64toh(*qword);
//printf("0x%016lX\n",val);
qword++;
out[19] = ((int64_t)(((0xFF00000000000000 & val) >> 2) | rest) >> 54) & 0xFF;
out[20] = ((int64_t)(( 0x00FFC00000000000 & val) << 8) >> 54) & 0xFF;
out[21] = ((int64_t)(( 0x00003FF000000000 & val) << 18) >> 54) & 0xFF;
out[22] = ((int64_t)(( 0x0000000FFC000000 & val) << 28) >> 54) & 0xFF;
out[23] = ((int64_t)(( 0x0000000003FF0000 & val) << 38) >> 54) & 0xFF;
out[24] = ((int64_t)(( 0x000000000000FFC0 & val) << 48) >> 54) & 0xFF;
rest = ( 0x000000000000003F & val) << 58; // 6 bits rest.
// 5th:
val = be64toh(*qword);
//printf("0x%016lX\n",val);
qword++;
out[25] = ((int64_t)(((0xF000000000000000 & val) >> 6) | rest) >> 54) & 0xFF;
out[26] = ((int64_t)(( 0x0FFC000000000000 & val) << 4) >> 54) & 0xFF;
out[27] = ((int64_t)(( 0x0003FF0000000000 & val) << 14) >> 54) & 0xFF;
out[28] = ((int64_t)(( 0x000000FFC0000000 & val) << 24) >> 54) & 0xFF;
out[29] = ((int64_t)(( 0x000000003FF00000 & val) << 34) >> 54) & 0xFF;
out[30] = ((int64_t)(( 0x00000000000FFC00 & val) << 44) >> 54) & 0xFF;
out[31] = ((int64_t)(( 0x00000000000003FF & val) << 54) >> 54) & 0xFF;
rest = 0; // No rest.
return qword;
}
void handle_packet_numbers_4096x10_s(char const *buf, char *out)
{ // Print 4096 numbers of 10 bit signed integers.
uint64_t val, rest;
uint64_t *qword0 = (uint64_t*)(buf);
uint8_t* D = reinterpret_cast<uint8_t*>(out);
for (int i = 0; i < 640 / 5; i++)
{
qword0 = unpack5(qword0, D);
*D += 32;
}
}
EDDPolnMerge10to8_1pol::EDDPolnMerge10to8_1pol(std::size_t nsamps_per_heap, std::size_t npol, int nthreads, DadaWriteClient& writer)
: _nsamps_per_heap(nsamps_per_heap)
, _npol(npol)
, _nthreads(nthreads)
, _writer(writer)
{
}
EDDPolnMerge10to8_1pol::~EDDPolnMerge10to8_1pol()
{
}
void EDDPolnMerge10to8_1pol::init(RawBytes& block)
{
RawBytes& oblock = _writer.header_stream().next();
if (block.used_bytes() > oblock.total_bytes())
{
_writer.header_stream().release();
throw std::runtime_error("Output DADA buffer does not have enough space for header");
}
std::memcpy(oblock.ptr(), block.ptr(), block.used_bytes());
char buffer[1024];
ascii_header_get(block.ptr(), "SAMPLE_CLOCK_START", "%s", buffer);
std::size_t sample_clock_start = std::strtoul(buffer, NULL, 0);
ascii_header_get(block.ptr(), "CLOCK_SAMPLE", "%s", buffer);
long double sample_clock = std::strtold(buffer, NULL);
ascii_header_get(block.ptr(), "SYNC_TIME", "%s", buffer);
long double sync_time = std::strtold(buffer, NULL);
BOOST_LOG_TRIVIAL(debug) << "this is sample_clock_start " << sample_clock_start;
BOOST_LOG_TRIVIAL(debug) << "this is sample_clock " << sample_clock;
BOOST_LOG_TRIVIAL(debug) << "this is sync_time " << sync_time;
BOOST_LOG_TRIVIAL(debug) << "this is sample_clock_start / sample_clock " << sample_clock_start / sample_clock;
long double unix_time = sync_time + (sample_clock_start / sample_clock);
long double mjd_time = unix_time / 86400 - 40587.5;
char time_buffer[80];
std::time_t unix_time_int;
struct std::tm * timeinfo;
double fractpart, intpart;
fractpart = std::modf (static_cast<double>(unix_time) , &intpart);
unix_time_int = static_cast<std::time_t>(intpart);
timeinfo = std::gmtime (&unix_time_int);
std::strftime(time_buffer, 80, "%Y-%m-%d-%H:%M:%S", timeinfo);
std::stringstream utc_time_stamp;
BOOST_LOG_TRIVIAL(debug) << "unix_time" << unix_time;
BOOST_LOG_TRIVIAL(debug) << "fractional part " << fractpart;
utc_time_stamp << time_buffer << "." << std::setw(10) << std::setfill('0') << std::size_t(fractpart * 10000000000) << std::setfill(' ');
BOOST_LOG_TRIVIAL(debug) << "this is start time in utc " << utc_time_stamp.str().c_str() << "\n";
// std::cout << "this is sync_time MJD "<< mjd_time<< "\n";
ascii_header_set(oblock.ptr(), "UTC_START", "%s", utc_time_stamp.str().c_str());
ascii_header_set(oblock.ptr(), "UNIX_TIME", "%Lf", unix_time);
oblock.used_bytes(oblock.total_bytes());
_writer.header_stream().release();
BOOST_LOG_TRIVIAL(info) << "Output header released" << "\n";
}
bool EDDPolnMerge10to8_1pol::operator()(RawBytes& block)
{
std::cout << "Beginning of the operator" << std::endl;
std: size_t nheap_groups = block.used_bytes() / HEAP_SIZE_10BIT;
RawBytes& oblock = _writer.data_stream().next();
// if (block.used_bytes() > oblock.total_bytes())
// {
// _writer.data_stream().release();
// throw std::runtime_error("Output DADA buffer does not match with the input dada buffer");
// }
/* convert 10 bit to 8 bit data here */
BOOST_LOG_TRIVIAL(debug) << "block.used_bytes() = " << block.used_bytes();
BOOST_LOG_TRIVIAL(debug) << "Entering unpack loop";
#pragma omp parallel for schedule(dynamic, _nthreads) num_threads(_nthreads)
for (std::size_t kk = 0; kk < block.used_bytes() / HEAP_SIZE_10BIT ; ++kk)
{
char *buffer = block.ptr() + HEAP_SIZE_10BIT * kk;
handle_packet_numbers_4096x10_s(buffer, oblock.ptr() + kk * _nsamps_per_heap );
}
oblock.used_bytes(block.used_bytes() * _nsamps_per_heap / HEAP_SIZE_10BIT);
//oblock.used_bytes(block.used_bytes());
_writer.data_stream().release();
return false;
}
}//edd
}//effelsberg
}//psrdada_cpp
#include "psrdada_cpp/multilog.hpp"
#include "psrdada_cpp/cli_utils.hpp"
#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/dada_input_stream.hpp"
#include "psrdada_cpp/dada_write_client.hpp"
#include "psrdada_cpp/effelsberg/edd/EDDPolnMerge10to8_1pol.hpp"
#include "boost/program_options.hpp"
using namespace psrdada_cpp;
namespace
{
const size_t ERROR_IN_COMMAND_LINE = 1;
const size_t SUCCESS = 0;
const size_t ERROR_UNHANDLED_EXCEPTION = 2;
} // namespace
int main(int argc, char** argv)
{
try
{
key_t input_key;
key_t output_key;
std::size_t npol;
std::size_t nsamps_per_heap;
int nthreads;
/** Define and parse the program options
*/
namespace po = boost::program_options;
po::options_description desc("Options");
desc.add_options()
("help,h", "Print help messages")
("input_key,i", po::value<std::string>()
->default_value("dada")
->notifier([&input_key](std::string in)
{
input_key = string_to_key(in);
}),
"The shared memory key for the dada buffer to connect to (hex string)")
("output_key,o", po::value<std::string>()
->default_value("dadc")
->notifier([&output_key](std::string out)
{
output_key = string_to_key(out);
}),
"The shared memory key for the dada buffer to connect to (hex string)")
("npol,p", po::value<std::size_t>(&npol)->default_value(2),
"Value of number of pol")
("nthreads,n", po::value<int>(&nthreads)->default_value(4),
"Value of number of threads")
("nsamps_per_heap,b", po::value<std::size_t>(&nsamps_per_heap)->default_value(4096),
"Value of samples per heap")
("log_level", po::value<std::string>()
->default_value("info")
->notifier([](std::string level)
{
set_log_level(level);
}),
"The logging level to use (debug, info, warning, error)");
po::variables_map vm;
try
{
po::store(po::parse_command_line(argc, argv, desc), vm);
if ( vm.count("help") )
{
std::cout << "EDDPolnMerge10to8_1pol -- Read EDD data from a DADA buffer and merge the polarizations"
<< std::endl << desc << std::endl;
return SUCCESS;
}
po::notify(vm);
}
catch (po::error& e)
{
std::cerr << "ERROR: " << e.what() << std::endl << std::endl;
std::cerr << desc << std::endl;
return ERROR_IN_COMMAND_LINE;
}
/**
* All the application code goes here
*/
MultiLog log("edd::EDDPolnMerge10to8_1pol");
DadaWriteClient output(output_key, log);
effelsberg::edd::EDDPolnMerge10to8_1pol merger(nsamps_per_heap, npol, nthreads, output);
DadaInputStream <decltype(merger)> input(input_key, log, merger);
input.start();
/**
* End of application code
*/
}
catch (std::exception& e)
{
std::cerr << "Unhandled Exception reached the top of main: "
<< e.what() << ", application will now exit" << std::endl;
return ERROR_UNHANDLED_EXCEPTION;
}
return SUCCESS;
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment