Skip to content
Snippets Groups Projects
Commit 91b06bd3 authored by Jason Wu's avatar Jason Wu
Browse files

Merge branch 'PFB_merge_update' into 'devel'

update PFB merge code for pulsar

See merge request !27
parents 2a1fe218 56674b82
No related branches found
No related tags found
1 merge request!27update PFB merge code for pulsar
Pipeline #170211 passed
...@@ -17,7 +17,7 @@ set(psrdada_cpp_effelsberg_edd_src ...@@ -17,7 +17,7 @@ set(psrdada_cpp_effelsberg_edd_src
src/EDDPolnMerge10to8_1pol.cpp src/EDDPolnMerge10to8_1pol.cpp
src/EDDRoach.cpp src/EDDRoach.cpp
src/EDDRoach_merge.cpp src/EDDRoach_merge.cpp
src/EDDRoach_merge_leap.cpp src/EDDPfbMerge.cpp
src/dada_disk_sink_leap.cpp src/dada_disk_sink_leap.cpp
src/dada_disk_sink_multithread.cpp src/dada_disk_sink_multithread.cpp
src/ScaledTransposeTFtoTFT.cu src/ScaledTransposeTFtoTFT.cu
...@@ -37,7 +37,7 @@ set(psrdada_cpp_effelsberg_edd_inc ...@@ -37,7 +37,7 @@ set(psrdada_cpp_effelsberg_edd_inc
EDDPolnMerge10to8_1pol.hpp EDDPolnMerge10to8_1pol.hpp
EDDRoach.hpp EDDRoach.hpp
EDDRoach_merge.hpp EDDRoach_merge.hpp
EDDRoach_merge_leap.hpp EDDPfbMerge.hpp
dada_disk_sink_leap.hpp dada_disk_sink_leap.hpp
dada_disk_sink_multithread.hpp dada_disk_sink_multithread.hpp
FftSpectrometer.cuh FftSpectrometer.cuh
...@@ -100,9 +100,9 @@ add_executable(edd_roach_merge src/EDDRoach_merge_cli.cpp) ...@@ -100,9 +100,9 @@ add_executable(edd_roach_merge src/EDDRoach_merge_cli.cpp)
target_link_libraries(edd_roach_merge ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES}) target_link_libraries(edd_roach_merge ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES})
install(TARGETS edd_roach_merge DESTINATION bin) install(TARGETS edd_roach_merge DESTINATION bin)
add_executable(edd_roach_merge_leap src/EDDRoach_merge_leap_cli.cpp) add_executable(edd_pfb_merge src/EDDPfbMerge_cli.cpp)
target_link_libraries(edd_roach_merge_leap ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES}) target_link_libraries(edd_pfb_merge ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES})
install(TARGETS edd_roach_merge_leap DESTINATION bin) install(TARGETS edd_pfb_merge DESTINATION bin)
#dbdiskleap #dbdiskleap
add_executable(dbdiskleap src/dbdisk_leap.cpp) add_executable(dbdiskleap src/dbdisk_leap.cpp)
......
#ifndef PSRDADA_CPP_EFFELSBERG_EDD_EDDPFBMERGE_HPP
#define PSRDADA_CPP_EFFELSBERG_EDD_EDDPFBMERGE_HPP
#include "psrdada_cpp/dada_write_client.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/common.hpp"
#include <vector>
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
class EDDPfbMerge
{
public:
EDDPfbMerge(std::size_t nchunck, int nthreads, int heap_size, DadaWriteClient& writer);
~EDDPfbMerge();
/**
* @brief A callback to be called on connection
* to a ring buffer.
*
* @detail The first available header block in the
* in the ring buffer is provided as an argument.
* It is here that header parameters could be read
* if desired.
*
* @param block A RawBytes object wrapping a DADA header buffer
*/
void init(RawBytes& block);
/**
* @brief A callback to be called on acqusition of a new
* data block.
*
* @param block A RawBytes object wrapping a DADA data buffer
*/
bool operator()(RawBytes& block);
private:
std::size_t _nchunck;
int _nthreads;
int _heap_size;
DadaWriteClient& _writer;
};
} // edd
} // effelsberg
} // psrdada_cpp
#endif //PSRDADA_CPP_EFFELSBERG_EDD_EDDPFBMERGE_HPP
#include "psrdada_cpp/effelsberg/edd/EDDPfbMerge.hpp"
#include "ascii_header.h"
#include <immintrin.h>
#include <time.h>
#include <iomanip>
#include <cmath>
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
EDDPfbMerge::EDDPfbMerge(std::size_t nchunck, int nthreads, int heap_size, DadaWriteClient& writer)
: _nchunck(nchunck)
, _heap_size(heap_size)
, _nthreads(nthreads)
, _writer(writer)
{
}
EDDPfbMerge::~EDDPfbMerge()
{
}
void EDDPfbMerge::init(RawBytes & block)
{
RawBytes& oblock = _writer.header_stream().next();
if (block.used_bytes() > oblock.total_bytes())
{
_writer.header_stream().release();
throw std::runtime_error("Output DADA buffer does not have enough space for header");
}
std::memcpy(oblock.ptr(), block.ptr(), block.used_bytes());
char buffer[1024];
ascii_header_get(block.ptr(), "SAMPLE_CLOCK_START", "%s", buffer);
std::size_t sample_clock_start = std::strtoul(buffer, NULL, 0);
ascii_header_get(block.ptr(), "CLOCK_SAMPLE", "%s", buffer);
long double sample_clock = std::strtold(buffer, NULL);
ascii_header_get(block.ptr(), "SYNC_TIME", "%s", buffer);
long double sync_time = std::strtold(buffer, NULL);
long double unix_time = sync_time + (sample_clock_start / sample_clock);
long double mjd_time = (unix_time / 86400.0 ) + 40587;
std::ostringstream mjd_start;
mjd_start << std::fixed;
mjd_start << std::setprecision(12);
mjd_start << mjd_time;
ascii_header_set(oblock.ptr(), "MJD_START", "%s", mjd_start.str().c_str());
ascii_header_set(oblock.ptr(), "UNIX_TIME", "%Lf", unix_time);
oblock.used_bytes(oblock.total_bytes());
_writer.header_stream().release();
}
bool EDDPfbMerge::operator()(RawBytes & block)
{
BOOST_LOG_TRIVIAL(info) << "nchucnk " << _nchunck << "\n";
RawBytes& oblock = _writer.data_stream().next();
const std::size_t bytes_per_chunk = 4;
const std::size_t heap_group = _heap_size * _nchunck;
const std::size_t num_chunks = block.used_bytes() / heap_group;
if (block.used_bytes() % heap_group != 0)
{
throw std::runtime_error("num_chunks must be an integer.");
}
#pragma omp parallel for num_threads(_nthreads)
for (std::size_t xx = 0; xx < num_chunks; ++xx)
{
std::vector<const char*> chunk_ptrs(_nchunck);
for (std::size_t ii = 0; ii < _nchunck; ++ii)
{
const std::size_t offset = xx * heap_group + ii * heap_group / _nchunck;
chunk_ptrs[ii] = block.ptr() + offset;
}
const char *target = oblock.ptr() + xx * heap_group;
for (std::size_t yy = 0; yy < heap_group / _nchunck / bytes_per_chunk; ++yy)
{
for (std::size_t ii = 0; ii < _nchunck; ++ii)
{
std::memcpy((void*)target, chunk_ptrs[ii], bytes_per_chunk);
chunk_ptrs[ii] += bytes_per_chunk;
target += bytes_per_chunk;
}
}
}
oblock.used_bytes(block.used_bytes());
_writer.data_stream().release();
return false;
}
}//edd
}//effelsberg
}//psrdada_cpp
#include "psrdada_cpp/multilog.hpp"
#include "psrdada_cpp/cli_utils.hpp"
#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/dada_input_stream.hpp"
#include "psrdada_cpp/dada_write_client.hpp"
#include "psrdada_cpp/effelsberg/edd/EDDPfbMerge.hpp"
#include "boost/program_options.hpp"
using namespace psrdada_cpp;
namespace
{
const size_t ERROR_IN_COMMAND_LINE = 1;
const size_t SUCCESS = 0;
const size_t ERROR_UNHANDLED_EXCEPTION = 2;
} // namespace
int main(int argc, char** argv)
{
try
{
key_t input_key;
key_t output_key;
std::size_t nchunck;
int nthreads;
int heap_size;
/** Define and parse the program options
*/
namespace po = boost::program_options;
po::options_description desc("Options");
desc.add_options()
("help,h", "Print help messages")
("input_key,i", po::value<std::string>()
->default_value("dada")
->notifier([&input_key](std::string in)
{
input_key = string_to_key(in);
}),
"The shared memory key for the dada buffer to connect to (hex string)")
("output_key,o", po::value<std::string>()
->default_value("dadc")
->notifier([&output_key](std::string out)
{
output_key = string_to_key(out);
}),
"The shared memory key for the dada buffer to connect to (hex string)")
("nchunck,p", po::value<std::size_t>(&nchunck)->default_value(2),
"number of incoming stream")
("nthreads,n", po::value<int>(&nthreads)->default_value(4),
"Value of number of threads")
("heap_size,b", po::value<int>(&heap_size)->default_value(8000),
"Size of a heap")
("log_level", po::value<std::string>()
->default_value("info")
->notifier([](std::string level)
{
set_log_level(level);
}),
"The logging level to use (debug, info, warning, error)");
po::variables_map vm;
try
{
po::store(po::parse_command_line(argc, argv, desc), vm);
if ( vm.count("help") )
{
std::cout << "EDDPfbMerge -- Read EDD data from a DADA buffer and merge the PFB channels"
<< std::endl << desc << std::endl;
return SUCCESS;
}
po::notify(vm);
}
catch (po::error& e)
{
std::cerr << "ERROR: " << e.what() << std::endl << std::endl;
std::cerr << desc << std::endl;
return ERROR_IN_COMMAND_LINE;
}
/**
* All the application code goes here
*/
MultiLog log("edd::EDDPfbMerge");
DadaWriteClient output(output_key, log);
effelsberg::edd::EDDPfbMerge mod(nchunck, nthreads, heap_size, output);
DadaInputStream <decltype(mod)> input(input_key, log, mod);
input.start();
/**
* End of application code
*/
}
catch (std::exception& e)
{
std::cerr << "Unhandled Exception reached the top of main: "
<< e.what() << ", application will now exit" << std::endl;
return ERROR_UNHANDLED_EXCEPTION;
}
return SUCCESS;
}
...@@ -12,6 +12,7 @@ set(gtest_edd_src ...@@ -12,6 +12,7 @@ set(gtest_edd_src
src/ScaledTransposeTFtoTFTTester.cu src/ScaledTransposeTFtoTFTTester.cu
src/VLBITest.cu src/VLBITest.cu
src/EDDPolnMergeTester.cpp src/EDDPolnMergeTester.cpp
src/EDDPfbMergeTester.cpp
src/SKTestVector.cpp src/SKTestVector.cpp
src/SpectralKurtosis.cpp src/SpectralKurtosis.cpp
src/SKRfiReplacement.cpp src/SKRfiReplacement.cpp
......
#ifndef PSRDADA_CPP_EFFELSBERG_EDD_EDDPFBMERGETESTER_HPP
#define PSRDADA_CPP_EFFELSBERG_EDD_EDDPFBMERGETESTER_HPP
#include <gtest/gtest.h>
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
namespace test {
} //namespace test
} //namespace edd
} //namespace effelsberg
} //namespace psrdada_cpp
#endif //EFFELSBERG_EDD_EDDPFBMERGETESTER_HPP
#include "psrdada_cpp/effelsberg/edd/test/EDDPfbMergeTester.hpp"
#include "psrdada_cpp/effelsberg/edd/EDDPfbMerge.hpp"
#include "psrdada_cpp/dada_write_client.hpp"
#include "psrdada_cpp/dada_read_client.hpp"
#include "psrdada_cpp/dada_db.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/multilog.hpp"
#include "ascii_header.h"
#include <vector>
struct TestParams {
std::size_t byte_per_sample;
std::size_t freq_chunk;
std::size_t time;
int nthreads;
};
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
namespace test {
class EDDPfbMergeTester : public ::testing::TestWithParam<TestParams> {
protected:
void SetUp() override {}
void TearDown() override {}
std::size_t getBufferSize(const TestParams& params) const {
return params.byte_per_sample * params.freq_chunk * params.time;
}
std::vector<char> generateTestVector(const TestParams& params) const {
std::vector<char> test_vector(params.byte_per_sample * params.time * params.freq_chunk);
for (int j = 0; j < params.freq_chunk; j++) {
for (int i = 0; i < params.time; i++) {
int index = params.byte_per_sample * i + j * params.byte_per_sample * params.time;
char value = static_cast<char>(i % 128);
std::fill(
test_vector.begin() + index,
test_vector.begin() + index + params.byte_per_sample,
value
);
}
}
return test_vector;
}
std::vector<char> generateExpectedOutput(const TestParams& params) const {
std::vector<char> expected_output(params.byte_per_sample * params.time * params.freq_chunk, 'X');
for (int j = 0; j < params.time; j++) {
for (int i = 0; i < params.freq_chunk; i++) {
int index = j * params.freq_chunk * params.byte_per_sample + i * params.byte_per_sample;
char value = static_cast<char>(j % 128);
std::fill(
expected_output.begin() + index,
expected_output.begin() + index + params.byte_per_sample,
value
);
}
}
return expected_output;
}
};
TEST_P(EDDPfbMergeTester, wrong_buffer_size) {
const TestParams& params = GetParam();
std::size_t heap_size = 4 * params.time;
std::size_t buffer_size = getBufferSize(params) * 0.9;
DadaDB db(4, buffer_size, 8, 4096);
db.create();
MultiLog log("edd::EDDPfbMerge_test");
DadaReadClient reader(db.key(), log);
DadaWriteClient writer(db.key(), log);
std::vector<char> test_vector = generateTestVector(params);
std::vector<char> expected_output = generateExpectedOutput(params);
RawBytes input(test_vector.data(), buffer_size, buffer_size);
effelsberg::edd::EDDPfbMerge merger(params.freq_chunk, params.nthreads, heap_size, writer);
std::vector<char> test_header(4096);
RawBytes input_header(test_header.data(), 4096, 4096);
merger.init(input_header);
for (std::size_t yy = 0; yy < 1; yy++) {
EXPECT_THROW({
merger(input);
RawBytes& block = reader.data_stream().next();
for (std::size_t ii = 0; ii < block.used_bytes(); ii++) {
ASSERT_EQ(block.ptr()[ii], expected_output[ii]);
}
reader.data_stream().release();
}, std::runtime_error);
}
}
TEST_P(EDDPfbMergeTester, test_sequence) {
const TestParams& params = GetParam();
std::size_t buffer_size = getBufferSize(params);
DadaDB db(4, buffer_size, 8, 4096);
db.create();
MultiLog log("edd::EDDPfbMerge_test");
DadaReadClient reader(db.key(), log);
DadaWriteClient writer(db.key(), log);
std::vector<char> test_vector = generateTestVector(params);
std::vector<char> expected_output = generateExpectedOutput(params);
RawBytes input(test_vector.data(), buffer_size, buffer_size);
effelsberg::edd::EDDPfbMerge merger(params.freq_chunk, params.nthreads, params.time * params.byte_per_sample, writer);
std::vector<char> test_header(4096);
RawBytes input_header(test_header.data(), 4096, 4096);
ascii_header_set(input_header.ptr(), "CLOCK_SAMPLE", "%s", "3200000000");
ascii_header_set(input_header.ptr(), "SAMPLE_CLOCK_START", "%s", "1000000000");
ascii_header_set(input_header.ptr(), "SYNC_TIME", "%s", "1574694522.0");
merger.init(input_header);
for (std::size_t yy = 0; yy < 1; yy++) {
merger(input);
RawBytes& block = reader.data_stream().next();
for (std::size_t ii = 0; ii < block.used_bytes(); ii++) {
ASSERT_EQ(block.ptr()[ii], expected_output[ii]);
}
reader.data_stream().release();
}
}
INSTANTIATE_TEST_CASE_P(
ParameterizedTest,
EDDPfbMergeTester,
::testing::Values(
TestParams{4, 2, 4000, 3},
TestParams{4, 64, 16000, 3},
TestParams{4, 32, 16000, 3}
)
);
} // namespace test
} // namespace edd
} // namespace effelsberg
} // namespace psrdada_cpp
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment