diff --git a/psrdada_cpp/effelsberg/edd/CMakeLists.txt b/psrdada_cpp/effelsberg/edd/CMakeLists.txt index 057fd1d3437e2b320502cc2674e2dd5f1e778d47..ca34dd724cb6d8c2f0f1ac125931b24f5a0aeb5a 100644 --- a/psrdada_cpp/effelsberg/edd/CMakeLists.txt +++ b/psrdada_cpp/effelsberg/edd/CMakeLists.txt @@ -14,6 +14,7 @@ set(psrdada_cpp_effelsberg_edd_src src/GatedSpectrometer.cu src/EDDPolnMerge.cpp src/EDDPolnMerge10to8.cpp + src/EDDPolnMerge10to8_1pol.cpp src/EDDRoach.cpp src/EDDRoach_merge.cpp src/EDDRoach_merge_leap.cpp @@ -32,6 +33,7 @@ set(psrdada_cpp_effelsberg_edd_inc DetectorAccumulator.cuh EDDPolnMerge.hpp EDDPolnMerge10to8.hpp + EDDPolnMerge10to8_1pol.hpp EDDRoach.hpp EDDRoach_merge.hpp EDDRoach_merge_leap.hpp @@ -84,6 +86,10 @@ add_executable(edd_merge_10to8 src/EDDPolnMerge10to8_cli.cpp) target_link_libraries(edd_merge_10to8 ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES}) install(TARGETS edd_merge_10to8 DESTINATION bin) +add_executable(edd_merge_10to8_1pol src/EDDPolnMerge10to8_1pol_cli.cpp) +target_link_libraries(edd_merge_10to8_1pol ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES}) +install(TARGETS edd_merge_10to8_1pol DESTINATION bin) + add_executable(edd_roach src/EDDRoach_cli.cpp) target_link_libraries(edd_roach ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES}) install(TARGETS edd_roach DESTINATION bin) diff --git a/psrdada_cpp/effelsberg/edd/EDDPolnMerge10to8_1pol.hpp b/psrdada_cpp/effelsberg/edd/EDDPolnMerge10to8_1pol.hpp new file mode 100644 index 0000000000000000000000000000000000000000..05ab36ab1c06d3e4f3ee941b677dd0a946e59878 --- /dev/null +++ b/psrdada_cpp/effelsberg/edd/EDDPolnMerge10to8_1pol.hpp @@ -0,0 +1,51 @@ +#ifndef PSRDADA_CPP_EFFELSBERG_EDD_EDDPOLNMERGE10TO8_1POL_HPP +#define PSRDADA_CPP_EFFELSBERG_EDD_EDDPOLNMERGE10TO8_1POL_HPP +#define HEAP_SIZE_10BIT 5120 +#include "psrdada_cpp/dada_write_client.hpp" +#include "psrdada_cpp/raw_bytes.hpp" +#include "psrdada_cpp/common.hpp" +#include <vector> + +namespace psrdada_cpp { +namespace effelsberg { +namespace edd { + +class EDDPolnMerge10to8_1pol +{ +public: + EDDPolnMerge10to8_1pol(std::size_t nsamps_per_heap, std::size_t npol, int nthreads, DadaWriteClient& writer); + ~EDDPolnMerge10to8_1pol(); + + /** + * @brief A callback to be called on connection + * to a ring buffer. + * + * @detail The first available header block in the + * in the ring buffer is provided as an argument. + * It is here that header parameters could be read + * if desired. + * + * @param block A RawBytes object wrapping a DADA header buffer + */ + void init(RawBytes& block); + + /** + * @brief A callback to be called on acqusition of a new + * data block. + * + * @param block A RawBytes object wrapping a DADA data buffer + */ + bool operator()(RawBytes& block); + +private: + std::size_t _nsamps_per_heap; + std::size_t _npol; + int _nthreads; + DadaWriteClient& _writer; +}; + +} // edd +} // effelsberg +} // psrdada_cpp + +#endif //PSRDADA_CPP_EFFELSBERG_EDD_EDDPOLNMERGE10TO8_1POL_HPP diff --git a/psrdada_cpp/effelsberg/edd/src/EDDPolnMerge10to8_1pol.cpp b/psrdada_cpp/effelsberg/edd/src/EDDPolnMerge10to8_1pol.cpp new file mode 100644 index 0000000000000000000000000000000000000000..ccfa356a6dcf540f2e6f180418fc19eb27b93fa6 --- /dev/null +++ b/psrdada_cpp/effelsberg/edd/src/EDDPolnMerge10to8_1pol.cpp @@ -0,0 +1,178 @@ +#include "psrdada_cpp/effelsberg/edd/EDDPolnMerge10to8_1pol.hpp" +#include "ascii_header.h" +#include <immintrin.h> +#include <time.h> +#include <iomanip> +#include <cmath> + +namespace psrdada_cpp { +namespace effelsberg { +namespace edd { + +uint64_t interleave(uint32_t x, uint32_t y) { + __m128i xvec = _mm_cvtsi32_si128(x); + __m128i yvec = _mm_cvtsi32_si128(y); + __m128i interleaved = _mm_unpacklo_epi8(yvec, xvec); + return _mm_cvtsi128_si64(interleaved); +} + +uint64_t *unpack5(uint64_t *qword, uint8_t *out) +{ + uint64_t val, rest; + val = be64toh(*qword); + //printf("0x%016lX\n",val); + qword++; + out[0] = ((int64_t)(( 0xFFC0000000000000 & val) << 0) >> 54) & 0xFF; + out[1] = ((int64_t)(( 0x003FF00000000000 & val) << 10) >> 54) & 0xFF; + out[2] = ((int64_t)(( 0x00000FFC00000000 & val) << 20) >> 54) & 0xFF; + out[3] = ((int64_t)(( 0x00000003FF000000 & val) << 30) >> 54) & 0xFF; + out[4] = ((int64_t)(( 0x0000000000FFC000 & val) << 40) >> 54) & 0xFF; + out[5] = ((int64_t)(( 0x0000000000003FF0 & val) << 50) >> 54) & 0xFF; + rest = ( 0x000000000000000F & val) << 60; // 4 bits rest. + // 2nd: + val = be64toh(*qword); + //printf("0x%016lX\n",val); + qword++; + out[6] = ((int64_t)(((0xFC00000000000000 & val) >> 4) | rest) >> 54) & 0xFF; + out[7] = ((int64_t)(( 0x03FF000000000000 & val) << 6) >> 54) & 0xFF; + out[8] = ((int64_t)(( 0x0000FFC000000000 & val) << 16) >> 54) & 0xFF; + out[9] = ((int64_t)(( 0x0000003FF0000000 & val) << 26) >> 54) & 0xFF; + out[10] = ((int64_t)(( 0x000000000FFC0000 & val) << 36) >> 54) & 0xFF; + out[11] = ((int64_t)(( 0x000000000003FF00 & val) << 46) >> 54) & 0xFF; + rest = ( 0x00000000000000FF & val) << 56; // 8 bits rest. + // 3rd: + val = be64toh(*qword); + //printf("0x%016lX\n",val); + qword++; + out[12] = ((int64_t)(((0xC000000000000000 & val) >> 8) | rest) >> 54) & 0xFF; + out[13] = ((int64_t)(( 0x3FF0000000000000 & val) << 2) >> 54) & 0xFF; + out[14] = ((int64_t)(( 0x000FFC0000000000 & val) << 12) >> 54) & 0xFF; + out[15] = ((int64_t)(( 0x000003FF00000000 & val) << 22) >> 54) & 0xFF; + out[16] = ((int64_t)(( 0x00000000FFC00000 & val) << 32) >> 54) & 0xFF; + out[17] = ((int64_t)(( 0x00000000003FF000 & val) << 42) >> 54) & 0xFF; + out[18] = ((int64_t)(( 0x0000000000000FFC & val) << 52) >> 54) & 0xFF; + rest = ( 0x0000000000000003 & val) << 62; // 2 bits rest. + // 4th: + val = be64toh(*qword); + //printf("0x%016lX\n",val); + qword++; + out[19] = ((int64_t)(((0xFF00000000000000 & val) >> 2) | rest) >> 54) & 0xFF; + out[20] = ((int64_t)(( 0x00FFC00000000000 & val) << 8) >> 54) & 0xFF; + out[21] = ((int64_t)(( 0x00003FF000000000 & val) << 18) >> 54) & 0xFF; + out[22] = ((int64_t)(( 0x0000000FFC000000 & val) << 28) >> 54) & 0xFF; + out[23] = ((int64_t)(( 0x0000000003FF0000 & val) << 38) >> 54) & 0xFF; + out[24] = ((int64_t)(( 0x000000000000FFC0 & val) << 48) >> 54) & 0xFF; + rest = ( 0x000000000000003F & val) << 58; // 6 bits rest. + // 5th: + val = be64toh(*qword); + //printf("0x%016lX\n",val); + qword++; + out[25] = ((int64_t)(((0xF000000000000000 & val) >> 6) | rest) >> 54) & 0xFF; + out[26] = ((int64_t)(( 0x0FFC000000000000 & val) << 4) >> 54) & 0xFF; + out[27] = ((int64_t)(( 0x0003FF0000000000 & val) << 14) >> 54) & 0xFF; + out[28] = ((int64_t)(( 0x000000FFC0000000 & val) << 24) >> 54) & 0xFF; + out[29] = ((int64_t)(( 0x000000003FF00000 & val) << 34) >> 54) & 0xFF; + out[30] = ((int64_t)(( 0x00000000000FFC00 & val) << 44) >> 54) & 0xFF; + out[31] = ((int64_t)(( 0x00000000000003FF & val) << 54) >> 54) & 0xFF; + rest = 0; // No rest. + return qword; +} + +void handle_packet_numbers_4096x10_s(char const *buf, char *out) +{ // Print 4096 numbers of 10 bit signed integers. + + uint64_t val, rest; + uint64_t *qword0 = (uint64_t*)(buf); + uint8_t* D = reinterpret_cast<uint8_t*>(out); + for (int i = 0; i < 640 / 5; i++) + { + qword0 = unpack5(qword0, D); + *D += 32; + } +} + +EDDPolnMerge10to8_1pol::EDDPolnMerge10to8_1pol(std::size_t nsamps_per_heap, std::size_t npol, int nthreads, DadaWriteClient& writer) + : _nsamps_per_heap(nsamps_per_heap) + , _npol(npol) + , _nthreads(nthreads) + , _writer(writer) +{ +} + +EDDPolnMerge10to8_1pol::~EDDPolnMerge10to8_1pol() +{ +} + +void EDDPolnMerge10to8_1pol::init(RawBytes& block) +{ + RawBytes& oblock = _writer.header_stream().next(); + if (block.used_bytes() > oblock.total_bytes()) + { + _writer.header_stream().release(); + throw std::runtime_error("Output DADA buffer does not have enough space for header"); + } + std::memcpy(oblock.ptr(), block.ptr(), block.used_bytes()); + char buffer[1024]; + ascii_header_get(block.ptr(), "SAMPLE_CLOCK_START", "%s", buffer); + std::size_t sample_clock_start = std::strtoul(buffer, NULL, 0); + ascii_header_get(block.ptr(), "CLOCK_SAMPLE", "%s", buffer); + long double sample_clock = std::strtold(buffer, NULL); + ascii_header_get(block.ptr(), "SYNC_TIME", "%s", buffer); + long double sync_time = std::strtold(buffer, NULL); + BOOST_LOG_TRIVIAL(debug) << "this is sample_clock_start " << sample_clock_start; + BOOST_LOG_TRIVIAL(debug) << "this is sample_clock " << sample_clock; + BOOST_LOG_TRIVIAL(debug) << "this is sync_time " << sync_time; + BOOST_LOG_TRIVIAL(debug) << "this is sample_clock_start / sample_clock " << sample_clock_start / sample_clock; + long double unix_time = sync_time + (sample_clock_start / sample_clock); + long double mjd_time = unix_time / 86400 - 40587.5; + char time_buffer[80]; + std::time_t unix_time_int; + struct std::tm * timeinfo; + double fractpart, intpart; + fractpart = std::modf (static_cast<double>(unix_time) , &intpart); + unix_time_int = static_cast<std::time_t>(intpart); + timeinfo = std::gmtime (&unix_time_int); + std::strftime(time_buffer, 80, "%Y-%m-%d-%H:%M:%S", timeinfo); + std::stringstream utc_time_stamp; + BOOST_LOG_TRIVIAL(debug) << "unix_time" << unix_time; + BOOST_LOG_TRIVIAL(debug) << "fractional part " << fractpart; + utc_time_stamp << time_buffer << "." << std::setw(10) << std::setfill('0') << std::size_t(fractpart * 10000000000) << std::setfill(' '); + BOOST_LOG_TRIVIAL(debug) << "this is start time in utc " << utc_time_stamp.str().c_str() << "\n"; + // std::cout << "this is sync_time MJD "<< mjd_time<< "\n"; + ascii_header_set(oblock.ptr(), "UTC_START", "%s", utc_time_stamp.str().c_str()); + ascii_header_set(oblock.ptr(), "UNIX_TIME", "%Lf", unix_time); + oblock.used_bytes(oblock.total_bytes()); + _writer.header_stream().release(); + BOOST_LOG_TRIVIAL(info) << "Output header released" << "\n"; +} + +bool EDDPolnMerge10to8_1pol::operator()(RawBytes& block) +{ + std::cout << "Beginning of the operator" << std::endl; + std: size_t nheap_groups = block.used_bytes() / HEAP_SIZE_10BIT; + RawBytes& oblock = _writer.data_stream().next(); + +// if (block.used_bytes() > oblock.total_bytes()) +// { +// _writer.data_stream().release(); +// throw std::runtime_error("Output DADA buffer does not match with the input dada buffer"); +// } + + /* convert 10 bit to 8 bit data here */ + BOOST_LOG_TRIVIAL(debug) << "block.used_bytes() = " << block.used_bytes(); + BOOST_LOG_TRIVIAL(debug) << "Entering unpack loop"; + + #pragma omp parallel for schedule(dynamic, _nthreads) num_threads(_nthreads) + for (std::size_t kk = 0; kk < block.used_bytes() / HEAP_SIZE_10BIT ; ++kk) + { + char *buffer = block.ptr() + HEAP_SIZE_10BIT * kk; + handle_packet_numbers_4096x10_s(buffer, oblock.ptr() + kk * _nsamps_per_heap ); + } + oblock.used_bytes(block.used_bytes() * _nsamps_per_heap / HEAP_SIZE_10BIT); + //oblock.used_bytes(block.used_bytes()); + _writer.data_stream().release(); + return false; +} +}//edd +}//effelsberg +}//psrdada_cpp diff --git a/psrdada_cpp/effelsberg/edd/src/EDDPolnMerge10to8_1pol_cli.cpp b/psrdada_cpp/effelsberg/edd/src/EDDPolnMerge10to8_1pol_cli.cpp new file mode 100644 index 0000000000000000000000000000000000000000..c47601b7ca26aaa6823697d201a66feae9361bbc --- /dev/null +++ b/psrdada_cpp/effelsberg/edd/src/EDDPolnMerge10to8_1pol_cli.cpp @@ -0,0 +1,108 @@ +#include "psrdada_cpp/multilog.hpp" +#include "psrdada_cpp/cli_utils.hpp" +#include "psrdada_cpp/common.hpp" +#include "psrdada_cpp/dada_input_stream.hpp" +#include "psrdada_cpp/dada_write_client.hpp" +#include "psrdada_cpp/effelsberg/edd/EDDPolnMerge10to8_1pol.hpp" +#include "boost/program_options.hpp" + + +using namespace psrdada_cpp; + +namespace +{ +const size_t ERROR_IN_COMMAND_LINE = 1; +const size_t SUCCESS = 0; +const size_t ERROR_UNHANDLED_EXCEPTION = 2; +} // namespace + + +int main(int argc, char** argv) +{ + try + { + key_t input_key; + key_t output_key; + std::size_t npol; + std::size_t nsamps_per_heap; + int nthreads; + + /** Define and parse the program options + */ + namespace po = boost::program_options; + po::options_description desc("Options"); + desc.add_options() + + ("help,h", "Print help messages") + ("input_key,i", po::value<std::string>() + ->default_value("dada") + ->notifier([&input_key](std::string in) + { + input_key = string_to_key(in); + }), + "The shared memory key for the dada buffer to connect to (hex string)") + + ("output_key,o", po::value<std::string>() + ->default_value("dadc") + ->notifier([&output_key](std::string out) + { + output_key = string_to_key(out); + }), + "The shared memory key for the dada buffer to connect to (hex string)") + + ("npol,p", po::value<std::size_t>(&npol)->default_value(2), + "Value of number of pol") + + ("nthreads,n", po::value<int>(&nthreads)->default_value(4), + "Value of number of threads") + + ("nsamps_per_heap,b", po::value<std::size_t>(&nsamps_per_heap)->default_value(4096), + "Value of samples per heap") + + ("log_level", po::value<std::string>() + ->default_value("info") + ->notifier([](std::string level) + { + set_log_level(level); + }), + "The logging level to use (debug, info, warning, error)"); + + po::variables_map vm; + try + { + po::store(po::parse_command_line(argc, argv, desc), vm); + if ( vm.count("help") ) + { + std::cout << "EDDPolnMerge10to8_1pol -- Read EDD data from a DADA buffer and merge the polarizations" + << std::endl << desc << std::endl; + return SUCCESS; + } + po::notify(vm); + } + catch (po::error& e) + { + std::cerr << "ERROR: " << e.what() << std::endl << std::endl; + std::cerr << desc << std::endl; + return ERROR_IN_COMMAND_LINE; + } + /** + * All the application code goes here + */ + MultiLog log("edd::EDDPolnMerge10to8_1pol"); + DadaWriteClient output(output_key, log); + effelsberg::edd::EDDPolnMerge10to8_1pol merger(nsamps_per_heap, npol, nthreads, output); + DadaInputStream <decltype(merger)> input(input_key, log, merger); + input.start(); + /** + * End of application code + */ + } + catch (std::exception& e) + { + std::cerr << "Unhandled Exception reached the top of main: " + << e.what() << ", application will now exit" << std::endl; + return ERROR_UNHANDLED_EXCEPTION; + } + return SUCCESS; + +}