Commit 61678df5 authored by root's avatar root
Browse files

EDDPolnMerge added

parent 99e4b6e0
......@@ -37,13 +37,15 @@ namespace psrdada_cpp {
auto& data_stream = client.data_stream();
while (!_stop && !handler_stop_request)
{
handler_stop_request = _handler(data_stream.next());
data_stream.release();
if (data_stream.at_end())
{
BOOST_LOG_TRIVIAL(info) << "Reached end of data";
break;
}
handler_stop_request = _handler(data_stream.next());
data_stream.release();
//handler_stop_request = _handler(data_stream.next());
//data_stream.release();
}
}
_running = false;
......
# OpenMP (optional for shared memory multiprocessing)
option(ENABLE_OPENMP "OpenMP for multithreading" ON)
if(ENABLE_OPENMP)
include(FindOpenMP)
if(OPENMP_FOUND)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${OpenMP_C_FLAGS}")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${OpenMP_CXX_FLAGS}")
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} ${OpenMP_EXE_LINKER_FLAGS}")
endif(OPENMP_FOUND)
endif(ENABLE_OPENMP)
if(ENABLE_CUDA)
set(PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES
......@@ -9,6 +20,9 @@ set(psrdada_cpp_effelsberg_edd_src
src/Channeliser.cu
src/DadaBufferLayout.cpp
src/DetectorAccumulator.cu
src/EDDPolnMerge.cpp
src/EDDRoach.cpp
src/EDDRoach_merge.cpp
src/ScaledTransposeTFtoTFT.cu
src/Tools.cu
src/Unpacker.cu
......@@ -39,5 +53,17 @@ cuda_add_executable(dada_dummy_data src/dummy_data_generator.cu)
target_link_libraries(dada_dummy_data ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES} ${CUDA_CUFFT_LIBRARIES})
install(TARGETS VLBI DESTINATION bin)
add_executable(edd_merge src/EDDPolnMerge_cli.cpp)
target_link_libraries(edd_merge ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES})
install(TARGETS edd_merge DESTINATION bin)
add_executable(edd_roach src/EDDRoach_cli.cpp)
target_link_libraries(edd_roach ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES})
install(TARGETS edd_roach DESTINATION bin)
add_executable(edd_roach_merge src/EDDRoach_merge_cli.cpp)
target_link_libraries(edd_roach_merge ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES})
install(TARGETS edd_roach_merge DESTINATION bin)
add_subdirectory(test)
endif(ENABLE_CUDA)
#ifndef PSRDADA_CPP_EFFELSBERG_EDD_EDDPOLNMERGE_HPP
#define PSRDADA_CPP_EFFELSBERG_EDD_EDDPOLNMERGE_HPP
#include "psrdada_cpp/dada_write_client.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/common.hpp"
#include <vector>
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
class EDDPolnMerge
{
public:
EDDPolnMerge(std::size_t nsamps_per_heap, std::size_t npol, DadaWriteClient& writer);
~EDDPolnMerge();
/**
* @brief A callback to be called on connection
* to a ring buffer.
*
* @detail The first available header block in the
* in the ring buffer is provided as an argument.
* It is here that header parameters could be read
* if desired.
*
* @param block A RawBytes object wrapping a DADA header buffer
*/
void init(RawBytes& block);
/**
* @brief A callback to be called on acqusition of a new
* data block.
*
* @param block A RawBytes object wrapping a DADA data buffer
*/
bool operator()(RawBytes& block);
private:
std::size_t _nsamps_per_heap;
std::size_t _npol;
DadaWriteClient& _writer;
};
} // edd
} // effelsberg
} // psrdada_cpp
#endif //PSRDADA_CPP_EFFELSBERG_EDD_EDDPOLNMERGE_HPP
#ifndef PSRDADA_CPP_EFFELSBERG_EDD_EDDPOLNMERGE_HPP
#define PSRDADA_CPP_EFFELSBERG_EDD_EDDPOLNMERGE_HPP
#include "psrdada_cpp/dada_write_client.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/common.hpp"
#include <vector>
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
class EDDRoach
{
public:
EDDRoach(std::size_t nsamps_per_heap, std::size_t npol, DadaWriteClient& writer);
~EDDRoach();
/**
* @brief A callback to be called on connection
* to a ring buffer.
*
* @detail The first available header block in the
* in the ring buffer is provided as an argument.
* It is here that header parameters could be read
* if desired.
*
* @param block A RawBytes object wrapping a DADA header buffer
*/
void init(RawBytes& block);
/**
* @brief A callback to be called on acqusition of a new
* data block.
*
* @param block A RawBytes object wrapping a DADA data buffer
*/
bool operator()(RawBytes& block);
private:
std::size_t _nsamps_per_heap;
std::size_t _npol;
DadaWriteClient& _writer;
};
} // edd
} // effelsberg
} // psrdada_cpp
#endif //PSRDADA_CPP_EFFELSBERG_EDD_EDDPOLNMERGE_HPP
#ifndef PSRDADA_CPP_EFFELSBERG_EDD_EDDPOLNMERGE_HPP
#define PSRDADA_CPP_EFFELSBERG_EDD_EDDPOLNMERGE_HPP
#include "psrdada_cpp/dada_write_client.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/common.hpp"
#include <vector>
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
class EDDRoach_merge
{
public:
EDDRoach_merge(std::size_t nsamps_per_heap, std::size_t npol, DadaWriteClient& writer);
~EDDRoach_merge();
/**
* @brief A callback to be called on connection
* to a ring buffer.
*
* @detail The first available header block in the
* in the ring buffer is provided as an argument.
* It is here that header parameters could be read
* if desired.
*
* @param block A RawBytes object wrapping a DADA header buffer
*/
void init(RawBytes& block);
/**
* @brief A callback to be called on acqusition of a new
* data block.
*
* @param block A RawBytes object wrapping a DADA data buffer
*/
bool operator()(RawBytes& block);
private:
std::size_t _nsamps_per_heap;
std::size_t _npol;
DadaWriteClient& _writer;
};
} // edd
} // effelsberg
} // psrdada_cpp
#endif //PSRDADA_CPP_EFFELSBERG_EDD_EDDPOLNMERGE_HPP
#include "psrdada_cpp/effelsberg/edd/EDDPolnMerge.hpp"
#include "ascii_header.h"
#include <immintrin.h>
#include <time.h>
#include <iomanip>
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
uint64_t interleave(uint32_t x, uint32_t y) {
__m128i xvec = _mm_cvtsi32_si128(x);
__m128i yvec = _mm_cvtsi32_si128(y);
__m128i interleaved = _mm_unpacklo_epi8(yvec, xvec);
return _mm_cvtsi128_si64(interleaved);
}
EDDPolnMerge::EDDPolnMerge(std::size_t nsamps_per_heap, std::size_t npol, DadaWriteClient& writer)
: _nsamps_per_heap(nsamps_per_heap)
, _npol(npol)
, _writer(writer)
{
}
EDDPolnMerge::~EDDPolnMerge()
{
}
void EDDPolnMerge::init(RawBytes& block)
{
RawBytes& oblock = _writer.header_stream().next();
if (block.used_bytes() > oblock.total_bytes())
{
_writer.header_stream().release();
throw std::runtime_error("Output DADA buffer does not have enough space for header");
}
std::memcpy(oblock.ptr(), block.ptr(), block.used_bytes());
char buffer[1024];
ascii_header_get(block.ptr(), "SAMPLE_CLOCK_START", "%s", buffer);
std::size_t sample_clock_start = std::strtoul(buffer, NULL, 0);
ascii_header_get(block.ptr(), "CLOCK_SAMPLE", "%s", buffer);
long double sample_clock = std::strtold(buffer, NULL);
ascii_header_get(block.ptr(), "SYNC_TIME", "%s", buffer);
long double sync_time = std::strtold(buffer, NULL);
BOOST_LOG_TRIVIAL(debug) << "this is sample_clock_start "<< sample_clock_start;
BOOST_LOG_TRIVIAL(debug)<< "this is sample_clock "<< sample_clock;
BOOST_LOG_TRIVIAL(debug) << "this is sync_time "<< sync_time;
BOOST_LOG_TRIVIAL(debug) << "this is sample_clock_start / sample_clock "<< sample_clock_start / sample_clock;
long double unix_time = sync_time + (sample_clock_start / sample_clock);
long double mjd_time = unix_time / 86400 - 40587.5;
char time_buffer[80];
std::time_t unix_time_int;
struct std::tm * timeinfo;
double fractpart, intpart;
fractpart = modf (static_cast<double>(unix_time) , &intpart);
unix_time_int = static_cast<std::time_t>(intpart);
timeinfo = std::gmtime (&unix_time_int);
std::strftime(time_buffer, 80, "%Y-%m-%d-%H:%M:%S", timeinfo);
std::stringstream utc_time_stamp;
BOOST_LOG_TRIVIAL(debug) << "unix_time" << unix_time;
BOOST_LOG_TRIVIAL(debug) << "fractional part " << fractpart;
//BOOST_LOG_TRIVIAL(debug) << "fractional part ." << static_cast<std::size_t>(fractpart*10000000000);
//utc_time_stamp<< time_buffer << "." <<fractpart;
utc_time_stamp<< time_buffer << "." << std::setw(10) << std::setfill('0') << std::size_t(fractpart*10000000000) << std::setfill(' ');
//BOOST_LOG_TRIVIAL(debug) << "fractional part" <<static_cast<std::size_t>(fractpart * 10000000000);
//utc_time_stamp<< time_buffer << "." << static_cast<std::size_t>(fractpart * 10000000000);
BOOST_LOG_TRIVIAL(debug) << "this is start time in utc "<< utc_time_stamp.str().c_str()<< "\n";
// std::cout << "this is sync_time MJD "<< mjd_time<< "\n";
ascii_header_set(oblock.ptr(), "UTC_START", "%s", utc_time_stamp.str().c_str());
ascii_header_set(oblock.ptr(), "UNIX_TIME", "%Lf", unix_time);
oblock.used_bytes(oblock.total_bytes());
_writer.header_stream().release();
}
bool EDDPolnMerge::operator()(RawBytes& block)
{
std:size_t nheap_groups = block.used_bytes()/_npol/_nsamps_per_heap;
/**
if (block.used_bytes() < block.total_bytes())
{
BOOST_LOG_TRIVIAL (debug) << "Reach end of data";
_writer.data_stream().next();
_writer.data_stream().release();
return true;
}
**/
RawBytes& oblock = _writer.data_stream().next();
if (block.used_bytes() > oblock.total_bytes())
{
_writer.data_stream().release();
throw std::runtime_error("Output DADA buffer does not match with the input dada buffer");
}
uint32_t* S0 = reinterpret_cast<uint32_t*>(block.ptr());
uint32_t* S1 = reinterpret_cast<uint32_t*>(block.ptr() + _nsamps_per_heap);
uint64_t* D = reinterpret_cast<uint64_t*>(oblock.ptr());
for (std::size_t jj = 0; jj < nheap_groups; ++jj)
{
for (std::size_t ii = 0; ii < _nsamps_per_heap/sizeof(uint32_t); ++ii)
{
*D++ = interleave(*S1++, *S0++);
}
S0 += _nsamps_per_heap/sizeof(uint32_t);
S1 += _nsamps_per_heap/sizeof(uint32_t);
}
oblock.used_bytes(block.used_bytes());
_writer.data_stream().release();
return false;
}
}//edd
}//effelsberg
}//psrdada_cpp
#include "psrdada_cpp/multilog.hpp"
#include "psrdada_cpp/cli_utils.hpp"
#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/dada_input_stream.hpp"
#include "psrdada_cpp/dada_write_client.hpp"
#include "psrdada_cpp/effelsberg/edd/EDDPolnMerge.hpp"
#include "boost/program_options.hpp"
using namespace psrdada_cpp;
namespace
{
const size_t ERROR_IN_COMMAND_LINE = 1;
const size_t SUCCESS = 0;
const size_t ERROR_UNHANDLED_EXCEPTION = 2;
} // namespace
int main(int argc, char** argv)
{
try
{
key_t input_key;
key_t output_key;
std::size_t npol;
std::size_t nsamps_per_heap;
/** Define and parse the program options
*/
namespace po = boost::program_options;
po::options_description desc("Options");
desc.add_options()
("help,h", "Print help messages")
("input_key,i", po::value<std::string>()
->default_value("dada")
->notifier([&input_key](std::string in)
{
input_key = string_to_key(in);
}),
"The shared memory key for the dada buffer to connect to (hex string)")
("output_key,o", po::value<std::string>()
->default_value("dadc")
->notifier([&output_key](std::string out)
{
output_key = string_to_key(out);
}),
"The shared memory key for the dada buffer to connect to (hex string)")
("npol,p", po::value<std::size_t>(&npol)->default_value(2),
"Value of number of pol")
("nsamps_per_heap,n", po::value<std::size_t>(&nsamps_per_heap)->default_value(4096),
"Value of samples per heap")
("log_level", po::value<std::string>()
->default_value("info")
->notifier([](std::string level)
{
set_log_level(level);
}),
"The logging level to use (debug, info, warning, error)");
po::variables_map vm;
try
{
po::store(po::parse_command_line(argc, argv, desc), vm);
if ( vm.count("help") )
{
std::cout << "EDDPolnMerge -- Read EDD data from a DADA buffer and merge the polarizations"
<< std::endl << desc << std::endl;
return SUCCESS;
}
po::notify(vm);
}
catch(po::error& e)
{
std::cerr << "ERROR: " << e.what() << std::endl << std::endl;
std::cerr << desc << std::endl;
return ERROR_IN_COMMAND_LINE;
}
/**
* All the application code goes here
*/
MultiLog log("edd::EDDPolnMerge");
DadaWriteClient output(output_key, log);
effelsberg::edd::EDDPolnMerge merger(nsamps_per_heap, npol, output);
DadaInputStream <decltype(merger)> input(input_key, log, merger);
input.start();
/**
* End of application code
*/
}
catch(std::exception& e)
{
std::cerr << "Unhandled Exception reached the top of main: "
<< e.what() << ", application will now exit" << std::endl;
return ERROR_UNHANDLED_EXCEPTION;
}
return SUCCESS;
}
#include "psrdada_cpp/effelsberg/edd/EDDRoach.hpp"
#include "ascii_header.h"
#include <immintrin.h>
#include <time.h>
#include <iomanip>
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
uint64_t interleave(uint32_t x, uint32_t y) {
__m128i xvec = _mm_cvtsi32_si128(x);
__m128i yvec = _mm_cvtsi32_si128(y);
__m128i interleaved = _mm_unpacklo_epi8(yvec, xvec);
return _mm_cvtsi128_si64(interleaved);
}
EDDRoach::EDDRoach(std::size_t nsamps_per_heap, std::size_t npol, DadaWriteClient& writer)
: _nsamps_per_heap(nsamps_per_heap)
, _npol(npol)
, _writer(writer)
{
}
EDDRoach::~EDDRoach()
{
}
void EDDRoach::init(RawBytes& block)
{
RawBytes& oblock = _writer.header_stream().next();
if (block.used_bytes() > oblock.total_bytes())
{
_writer.header_stream().release();
throw std::runtime_error("Output DADA buffer does not have enough space for header");
}
std::memcpy(oblock.ptr(), block.ptr(), block.used_bytes());
char buffer[1024];
ascii_header_get(block.ptr(), "SAMPLE_CLOCK_START", "%s", buffer);
std::size_t sample_clock_start = std::strtoul(buffer, NULL, 0);
ascii_header_get(block.ptr(), "CLOCK_SAMPLE", "%s", buffer);
long double sample_clock = std::strtold(buffer, NULL);
ascii_header_get(block.ptr(), "SYNC_TIME", "%s", buffer);
long double sync_time = std::strtold(buffer, NULL);
BOOST_LOG_TRIVIAL(debug) << "this is sample_clock_start "<< sample_clock_start;
BOOST_LOG_TRIVIAL(debug)<< "this is sample_clock "<< sample_clock;
BOOST_LOG_TRIVIAL(debug) << "this is sync_time "<< sync_time;
BOOST_LOG_TRIVIAL(debug) << "this is sample_clock_start / sample_clock "<< sample_clock_start / sample_clock;
long double unix_time = sync_time + (sample_clock_start / sample_clock);
long double mjd_time = unix_time / 86400 - 40587.5;
char time_buffer[80];
std::time_t unix_time_int;
struct std::tm * timeinfo;
double fractpart, intpart;
fractpart = modf (static_cast<double>(unix_time) , &intpart);
unix_time_int = static_cast<std::time_t>(intpart);
timeinfo = std::gmtime (&unix_time_int);
std::strftime(time_buffer, 80, "%Y-%m-%d-%H:%M:%S", timeinfo);
std::stringstream utc_time_stamp;
BOOST_LOG_TRIVIAL(debug) << "unix_time" << unix_time;
BOOST_LOG_TRIVIAL(debug) << "fractional part " << fractpart;
//BOOST_LOG_TRIVIAL(debug) << "fractional part ." << static_cast<std::size_t>(fractpart*10000000000);
//utc_time_stamp<< time_buffer << "." <<fractpart;
utc_time_stamp<< time_buffer << "." << std::setw(10) << std::setfill('0') << std::size_t(fractpart*10000000000) << std::setfill(' ');
//BOOST_LOG_TRIVIAL(debug) << "fractional part" <<static_cast<std::size_t>(fractpart * 10000000000);
//utc_time_stamp<< time_buffer << "." << static_cast<std::size_t>(fractpart * 10000000000);
BOOST_LOG_TRIVIAL(info) << "this is start time in utc "<< utc_time_stamp.str().c_str()<< "\n";
// std::cout << "this is sync_time MJD "<< mjd_time<< "\n";
ascii_header_set(oblock.ptr(), "UTC_START", "%s", utc_time_stamp.str().c_str());
ascii_header_set(oblock.ptr(), "UNIX_TIME", "%Lf", unix_time);
oblock.used_bytes(oblock.total_bytes());
_writer.header_stream().release();
}
bool EDDRoach::operator()(RawBytes& block)
{
// std:size_t nheap_groups = block.used_bytes()/_npol/_nsamps_per_heap;
/**
if (block.used_bytes() < block.total_bytes())
{
BOOST_LOG_TRIVIAL (debug) << "Reach end of data";
_writer.data_stream().next();
_writer.data_stream().release();
return true;
}
**/
RawBytes& oblock = _writer.data_stream().next();
std::memcpy(oblock.ptr(), block.ptr(), block.used_bytes());
oblock.used_bytes(block.used_bytes());
_writer.data_stream().release();
return false;
}
}//edd
}//effelsberg
}//psrdada_cpp
#include "psrdada_cpp/multilog.hpp"
#include "psrdada_cpp/cli_utils.hpp"
#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/dada_input_stream.hpp"
#include "psrdada_cpp/dada_write_client.hpp"
#include "psrdada_cpp/effelsberg/edd/EDDRoach.hpp"
#include "boost/program_options.hpp"
using namespace psrdada_cpp;
namespace
{
const size_t ERROR_IN_COMMAND_LINE = 1;
const size_t SUCCESS = 0;
const size_t ERROR_UNHANDLED_EXCEPTION = 2;
} // namespace
int main(int argc, char** argv)
{
try
{
key_t input_key;
key_t output_key;
std::size_t npol;
std::size_t nsamps_per_heap;
/** Define and parse the program options
*/
namespace po = boost::program_options;
po::options_description desc("Options");
desc.add_options()
("help,h", "Print help messages")
("input_key,i", po::value<std::string>()
->default_value("dada")
->notifier([&input_key](std::string in)
{
input_key = string_to_key(in);
}),
"The shared memory key for the dada buffer to connect to (hex string)")
("output_key,o", po::value<std::string>()
->default_value("dadc")
->notifier([&output_key](std::string out)
{
output_key = string_to_key(out);
}),
"The shared memory key for the dada buffer to connect to (hex string)")
("npol,p", po::value<std::size_t>(&npol)->default_value(2),
"Value of number of pol")
("nsamps_per_heap,n", po::value<std::size_t>(&nsamps_per_heap)->default_value(4096),
"Value of samples per heap")
("log_level", po::value<std::string>()
->default_value("info")
->notifier([](std::string level)
{
set_log_level(level);
}),
"The logging level to use (debug, info, warning, error)");
po::variables_map vm;
try