Commit cf0b0b7a authored by Tobias Winchen's avatar Tobias Winchen
Browse files

Merge branch 'multithread_dbdisk' into 'devel'

Adding multithread dbdisk

See merge request mpifr-bdg/psrdada_cpp!8
parents 980771c9 352ceaae
Pipeline #99750 failed with stages
in 39 seconds
......@@ -19,6 +19,7 @@ set(psrdada_cpp_effelsberg_edd_src
src/EDDRoach_merge.cpp
src/EDDRoach_merge_leap.cpp
src/dada_disk_sink_leap.cpp
src/dada_disk_sink_multithread.cpp
src/ScaledTransposeTFtoTFT.cu
src/SKRfiReplacementCuda.cu
src/SpectralKurtosisCuda.cu
......@@ -38,6 +39,7 @@ set(psrdada_cpp_effelsberg_edd_inc
EDDRoach_merge.hpp
EDDRoach_merge_leap.hpp
dada_disk_sink_leap.hpp
dada_disk_sink_multithread.hpp
FftSpectrometer.cuh
GatedSpectrometer.cuh
Packer.cuh
......@@ -107,5 +109,12 @@ add_executable(dbdiskleap src/dbdisk_leap.cpp)
target_link_libraries (dbdiskleap ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES})
install(TARGETS dbdiskleap DESTINATION bin)
#dbdisk_multithread
add_executable(dbdisk_multithread src/dbdisk_multithread.cpp)
target_link_libraries (dbdisk_multithread ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES})
install(TARGETS dbdisk_multithread DESTINATION bin)
add_subdirectory(test)
endif(ENABLE_CUDA)
#ifndef PSRDADA_CPP_EFFELSBERG_EDD_DADA_DISK_SINK_MULTITHREAD_HPP
#define PSRDADA_CPP_EFFELSBERG_EDD_DADA_DISK_SINK_MULTITHREAD_HPP
#define HEADER_SIZE 4096
#define START_TIME 1024
#define HEAP_SIZE 32000
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/dada_write_client.hpp"
#include <fstream>
#include <vector>
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
class DiskSinkMultithread
{
public:
DiskSinkMultithread(std::string prefix, int nthread);
~DiskSinkMultithread();
void init(RawBytes& block);
bool operator()(RawBytes& block);
public:
std::string _prefix;
std::size_t _counter;
int _nthread;
char _header[HEADER_SIZE];
char _start_time[START_TIME];
bool first_block;
std::vector<char> _transpose;
std::vector<std::ofstream> _output_streams;
};
} // edd
} // effelsberg
} //namespace psrdada_cpp
#endif //PSRDADA_CPP_EFFELSBERG_EDD_DADA_DISK_SINK_MULTITHREAD_HPP
#include "psrdada_cpp/effelsberg/edd/dada_disk_sink_multithread.hpp"
#include "ascii_header.h"
#include <chrono>
#include <iostream>
#include <iomanip>
using namespace std;
using namespace std::chrono;
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
DiskSinkMultithread::DiskSinkMultithread(std::string prefix, int nthread)
: _prefix(prefix)
, _counter(0)
, _output_streams(nthread)
, _nthread(nthread)
{
}
DiskSinkMultithread::~DiskSinkMultithread()
{
}
void DiskSinkMultithread::init(RawBytes& block)
{
for (auto& of : _output_streams) {
if (of.is_open()) {
of.close();
}
}
std::memcpy(&_header, block.ptr(), block.used_bytes());
ascii_header_get(_header, "UTC_START", "%s", _start_time);
BOOST_LOG_TRIVIAL(debug) << "UTC_START = " << _start_time;
}
bool DiskSinkMultithread::operator()(RawBytes& block)
{
for (auto& of : _output_streams) {
if (of.is_open()) {
of.close();
}
}
#pragma omp parallel for num_threads(_nthread)
for (std::size_t ii = 0; ii < _nthread; ++ii) {
std::size_t index = ii * block.used_bytes()/_nthread;
char _loop_header[HEADER_SIZE];
std::memcpy(&_loop_header, &_header, HEADER_SIZE);
ascii_header_set(_loop_header, "OBS_OFFSET", "%ld", _counter + ii * block.used_bytes() / _nthread);
ascii_header_set(_loop_header, "FILE_SIZE", "%ld", block.used_bytes() / _nthread);
std::stringstream fname;
fname << _start_time << std::setw(20) << "_" << std::setfill('0') << _counter + ii * block.used_bytes() / _nthread << ".dada";
BOOST_LOG_TRIVIAL(debug) << "filename" << fname.str();
_output_streams[ii].open(fname.str().c_str(), std::ios::out | std::ios::app | std::ios::binary);
_output_streams[ii].write((char*) _loop_header, HEADER_SIZE);
_output_streams[ii].write(block.ptr()+index, block.used_bytes()/_nthread);
}
_counter += block.used_bytes();
return false;
}
}//edd
}//effelsberg
} //namespace psrdada_cpp
#include "psrdada_cpp/multilog.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/dada_input_stream.hpp"
#include "psrdada_cpp/effelsberg/edd/dada_disk_sink_multithread.hpp"
#include "psrdada_cpp/cli_utils.hpp"
#include "psrdada_cpp/dada_write_client.hpp"
#include "boost/program_options.hpp"
#include <sys/types.h>
#include <iostream>
#include <string>
#include <sstream>
#include <ios>
#include <algorithm>
using namespace psrdada_cpp;
namespace
{
const size_t ERROR_IN_COMMAND_LINE = 1;
const size_t SUCCESS = 0;
const size_t ERROR_UNHANDLED_EXCEPTION = 2;
} // namespace
int main(int argc, char** argv)
{
try
{
key_t key;
std::string prefix;
int nthread;
/** Define and parse the program options
*/
namespace po = boost::program_options;
po::options_description desc("Options");
desc.add_options()
("help,h", "Print help messages")
("key,k", po::value<std::string>()
->default_value("dadc")
->notifier([&key](std::string in)
{
key = string_to_key(in);
}),
"The shared memory key for the dada buffer to connect to (hex string)")
("prefix,p", po::value<std::string>(&prefix)
->default_value("dbdisk_dump"),
"Prefix for the filename to write to")
("nthread,n", po::value<int>(&nthread)
->default_value(2),
"number of threads to write")
("log_level", po::value<std::string>()
->default_value("info")
->notifier([](std::string level)
{
set_log_level(level);
}),
"The logging level to use (debug, info, warning, error)");
po::variables_map vm;
try
{
po::store(po::parse_command_line(argc, argv, desc), vm);
if ( vm.count("help") )
{
std::cout << "DbDiskMultithread -- read from DADA ring buffer and write to disk with multiple thread" << std::endl
<< desc << std::endl;
return SUCCESS;
}
po::notify(vm);
}
catch (po::error& e)
{
std::cerr << "ERROR: " << e.what() << std::endl << std::endl;
std::cerr << desc << std::endl;
return ERROR_IN_COMMAND_LINE;
}
/**
* All the application code goes here
*/
MultiLog log("dbdixk");
effelsberg::edd::DiskSinkMultithread sink(prefix, nthread);
DadaInputStream<decltype(sink)> stream(key, log, sink);
stream.start();
/**
* End of application code
*/
}
catch (std::exception& e)
{
std::cerr << "Unhandled Exception reached the top of main: "
<< e.what() << ", application will now exit" << std::endl;
return ERROR_UNHANDLED_EXCEPTION;
}
return SUCCESS;
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment