Skip to content
Snippets Groups Projects
Commit b6c735c0 authored by Jason Wu's avatar Jason Wu
Browse files

multithread dbdisk implementation

parent 6a287b3c
No related branches found
No related tags found
No related merge requests found
...@@ -19,6 +19,7 @@ set(psrdada_cpp_effelsberg_edd_src ...@@ -19,6 +19,7 @@ set(psrdada_cpp_effelsberg_edd_src
src/EDDRoach_merge.cpp src/EDDRoach_merge.cpp
src/EDDRoach_merge_leap.cpp src/EDDRoach_merge_leap.cpp
src/dada_disk_sink_leap.cpp src/dada_disk_sink_leap.cpp
src/dada_disk_sink_multithread.cpp
src/ScaledTransposeTFtoTFT.cu src/ScaledTransposeTFtoTFT.cu
src/SKRfiReplacementCuda.cu src/SKRfiReplacementCuda.cu
src/SpectralKurtosisCuda.cu src/SpectralKurtosisCuda.cu
...@@ -38,6 +39,7 @@ set(psrdada_cpp_effelsberg_edd_inc ...@@ -38,6 +39,7 @@ set(psrdada_cpp_effelsberg_edd_inc
EDDRoach_merge.hpp EDDRoach_merge.hpp
EDDRoach_merge_leap.hpp EDDRoach_merge_leap.hpp
dada_disk_sink_leap.hpp dada_disk_sink_leap.hpp
dada_disk_sink_multithread.hpp
FftSpectrometer.cuh FftSpectrometer.cuh
GatedSpectrometer.cuh GatedSpectrometer.cuh
Packer.cuh Packer.cuh
...@@ -107,5 +109,12 @@ add_executable(dbdiskleap src/dbdisk_leap.cpp) ...@@ -107,5 +109,12 @@ add_executable(dbdiskleap src/dbdisk_leap.cpp)
target_link_libraries (dbdiskleap ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES}) target_link_libraries (dbdiskleap ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES})
install(TARGETS dbdiskleap DESTINATION bin) install(TARGETS dbdiskleap DESTINATION bin)
#dbdisk_multithread
add_executable(dbdisk_multithread src/dbdisk_multithread.cpp)
target_link_libraries (dbdisk_multithread ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES})
install(TARGETS dbdisk_multithread DESTINATION bin)
add_subdirectory(test) add_subdirectory(test)
endif(ENABLE_CUDA) endif(ENABLE_CUDA)
#ifndef PSRDADA_CPP_EFFELSBERG_EDD_DADA_DISK_SINK_MULTITHREAD_HPP
#define PSRDADA_CPP_EFFELSBERG_EDD_DADA_DISK_SINK_MULTITHREAD_HPP
#define HEADER_SIZE 4096
#define START_TIME 1024
#define HEAP_SIZE 32000
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/dada_write_client.hpp"
#include <fstream>
#include <vector>
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
class DiskSinkMultithread
{
public:
DiskSinkMultithread(std::string prefix, int nthread);
~DiskSinkMultithread();
void init(RawBytes& block);
bool operator()(RawBytes& block);
public:
std::string _prefix;
std::size_t _counter;
int _nthread;
char _header[HEADER_SIZE];
char _start_time[START_TIME];
bool first_block;
std::vector<char> _transpose;
std::vector<std::ofstream> _output_streams;
};
} // edd
} // effelsberg
} //namespace psrdada_cpp
#endif //PSRDADA_CPP_EFFELSBERG_EDD_DADA_DISK_SINK_MULTITHREAD_HPP
#include "psrdada_cpp/effelsberg/edd/dada_disk_sink_multithread.hpp"
#include "ascii_header.h"
#include <chrono>
#include <iostream>
#include <iomanip>
using namespace std;
using namespace std::chrono;
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
DiskSinkMultithread::DiskSinkMultithread(std::string prefix, int nthread)
: _prefix(prefix)
, _counter(0)
, _output_streams(nthread)
, _nthread(nthread)
{
}
DiskSinkMultithread::~DiskSinkMultithread()
{
}
void DiskSinkMultithread::init(RawBytes& block)
{
for (auto& of : _output_streams) {
if (of.is_open()) {
of.close();
}
}
std::memcpy(&_header, block.ptr(), block.used_bytes());
ascii_header_get(_header, "UTC_START", "%s", _start_time);
BOOST_LOG_TRIVIAL(debug) << "UTC_START = " << _start_time;
}
bool DiskSinkMultithread::operator()(RawBytes& block)
{
for (auto& of : _output_streams) {
if (of.is_open()) {
of.close();
}
}
#pragma omp parallel for num_threads(_nthread)
for (std::size_t ii = 0; ii < _nthread; ++ii) {
std::size_t index = ii * block.used_bytes()/_nthread;
char _loop_header[HEADER_SIZE];
std::memcpy(&_loop_header, &_header, HEADER_SIZE);
ascii_header_set(_loop_header, "OBS_OFFSET", "%ld", _counter + ii * block.used_bytes() / _nthread);
ascii_header_set(_loop_header, "FILE_SIZE", "%ld", block.used_bytes() / _nthread);
std::stringstream fname;
fname << _start_time << std::setw(20) << std::setfill('0') << _counter + ii * block.used_bytes() / _nthread << ".dada";
BOOST_LOG_TRIVIAL(debug) << "filename" << fname.str();
_output_streams[ii].open(fname.str().c_str(), std::ios::out | std::ios::app | std::ios::binary);
_output_streams[ii].write((char*) _loop_header, HEADER_SIZE);
_output_streams[ii].write(block.ptr()+index, block.used_bytes()/_nthread);
}
_counter += block.used_bytes();
return false;
}
}//edd
}//effelsberg
} //namespace psrdada_cpp
#include "psrdada_cpp/multilog.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/dada_input_stream.hpp"
#include "psrdada_cpp/effelsberg/edd/dada_disk_sink_multithread.hpp"
#include "psrdada_cpp/cli_utils.hpp"
#include "psrdada_cpp/dada_write_client.hpp"
#include "boost/program_options.hpp"
#include <sys/types.h>
#include <iostream>
#include <string>
#include <sstream>
#include <ios>
#include <algorithm>
using namespace psrdada_cpp;
namespace
{
const size_t ERROR_IN_COMMAND_LINE = 1;
const size_t SUCCESS = 0;
const size_t ERROR_UNHANDLED_EXCEPTION = 2;
} // namespace
int main(int argc, char** argv)
{
try
{
key_t key;
std::string prefix;
int nthread;
/** Define and parse the program options
*/
namespace po = boost::program_options;
po::options_description desc("Options");
desc.add_options()
("help,h", "Print help messages")
("key,k", po::value<std::string>()
->default_value("dadc")
->notifier([&key](std::string in)
{
key = string_to_key(in);
}),
"The shared memory key for the dada buffer to connect to (hex string)")
("prefix,p", po::value<std::string>(&prefix)
->default_value("dbdisk_dump"),
"Prefix for the filename to write to")
("nthread,n", po::value<int>(&nthread)
->default_value(2),
"number of threads to write")
("log_level", po::value<std::string>()
->default_value("info")
->notifier([](std::string level)
{
set_log_level(level);
}),
"The logging level to use (debug, info, warning, error)");
po::variables_map vm;
try
{
po::store(po::parse_command_line(argc, argv, desc), vm);
if ( vm.count("help") )
{
std::cout << "DbDiskMultithread -- read from DADA ring buffer and write to disk with multiple thread" << std::endl
<< desc << std::endl;
return SUCCESS;
}
po::notify(vm);
}
catch (po::error& e)
{
std::cerr << "ERROR: " << e.what() << std::endl << std::endl;
std::cerr << desc << std::endl;
return ERROR_IN_COMMAND_LINE;
}
/**
* All the application code goes here
*/
MultiLog log("dbdixk");
effelsberg::edd::DiskSinkMultithread sink(prefix, nthread);
DadaInputStream<decltype(sink)> stream(key, log, sink);
stream.start();
/**
* End of application code
*/
}
catch (std::exception& e)
{
std::cerr << "Unhandled Exception reached the top of main: "
<< e.what() << ", application will now exit" << std::endl;
return ERROR_UNHANDLED_EXCEPTION;
}
return SUCCESS;
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment