diff --git a/psrdada_cpp/effelsberg/edd/CMakeLists.txt b/psrdada_cpp/effelsberg/edd/CMakeLists.txt
index ca34dd724cb6d8c2f0f1ac125931b24f5a0aeb5a..12320d16cbf70b4f91134195107ba96c59e0abd4 100644
--- a/psrdada_cpp/effelsberg/edd/CMakeLists.txt
+++ b/psrdada_cpp/effelsberg/edd/CMakeLists.txt
@@ -19,6 +19,7 @@ set(psrdada_cpp_effelsberg_edd_src
src/EDDRoach_merge.cpp
src/EDDRoach_merge_leap.cpp
src/dada_disk_sink_leap.cpp
+ src/dada_disk_sink_multithread.cpp
src/ScaledTransposeTFtoTFT.cu
src/SKRfiReplacementCuda.cu
src/SpectralKurtosisCuda.cu
@@ -38,6 +39,7 @@ set(psrdada_cpp_effelsberg_edd_inc
EDDRoach_merge.hpp
EDDRoach_merge_leap.hpp
dada_disk_sink_leap.hpp
+ dada_disk_sink_multithread.hpp
FftSpectrometer.cuh
GatedSpectrometer.cuh
Packer.cuh
@@ -107,5 +109,12 @@ add_executable(dbdiskleap src/dbdisk_leap.cpp)
target_link_libraries (dbdiskleap ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES})
install(TARGETS dbdiskleap DESTINATION bin)
+#dbdisk_multithread
+add_executable(dbdisk_multithread src/dbdisk_multithread.cpp)
+target_link_libraries (dbdisk_multithread ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES})
+install(TARGETS dbdisk_multithread DESTINATION bin)
+
+
+
add_subdirectory(test)
endif(ENABLE_CUDA)
diff --git a/psrdada_cpp/effelsberg/edd/dada_disk_sink_multithread.hpp b/psrdada_cpp/effelsberg/edd/dada_disk_sink_multithread.hpp
new file mode 100644
index 0000000000000000000000000000000000000000..c360f38c94c7dc6e3f7f14cb053cb9384adbed73
--- /dev/null
+++ b/psrdada_cpp/effelsberg/edd/dada_disk_sink_multithread.hpp
@@ -0,0 +1,35 @@
+#ifndef PSRDADA_CPP_EFFELSBERG_EDD_DADA_DISK_SINK_MULTITHREAD_HPP
+#define PSRDADA_CPP_EFFELSBERG_EDD_DADA_DISK_SINK_MULTITHREAD_HPP
+#define HEADER_SIZE 4096
+#define START_TIME 1024
+#define HEAP_SIZE 32000
+#include "psrdada_cpp/raw_bytes.hpp"
+#include "psrdada_cpp/common.hpp"
+#include "psrdada_cpp/dada_write_client.hpp"
+#include <fstream>
+#include <vector>
+namespace psrdada_cpp {
+namespace effelsberg {
+namespace edd {
+class DiskSinkMultithread
+{
+public:
+ DiskSinkMultithread(std::string prefix, int nthread);
+ ~DiskSinkMultithread();
+ void init(RawBytes& block);
+ bool operator()(RawBytes& block);
+
+public:
+ std::string _prefix;
+ std::size_t _counter;
+ int _nthread;
+ char _header[HEADER_SIZE];
+ char _start_time[START_TIME];
+ bool first_block;
+ std::vector<char> _transpose;
+ std::vector<std::ofstream> _output_streams;
+};
+} // edd
+} // effelsberg
+} //namespace psrdada_cpp
+#endif //PSRDADA_CPP_EFFELSBERG_EDD_DADA_DISK_SINK_MULTITHREAD_HPP
diff --git a/psrdada_cpp/effelsberg/edd/src/dada_disk_sink_multithread.cpp b/psrdada_cpp/effelsberg/edd/src/dada_disk_sink_multithread.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..aeea32aacbfb129a6721818ca81900d3ded71f63
--- /dev/null
+++ b/psrdada_cpp/effelsberg/edd/src/dada_disk_sink_multithread.cpp
@@ -0,0 +1,58 @@
+#include "psrdada_cpp/effelsberg/edd/dada_disk_sink_multithread.hpp"
+#include "ascii_header.h"
+#include <chrono>
+#include <iostream>
+#include <iomanip>
+using namespace std;
+using namespace std::chrono;
+namespace psrdada_cpp {
+namespace effelsberg {
+namespace edd {
+DiskSinkMultithread::DiskSinkMultithread(std::string prefix, int nthread)
+ : _prefix(prefix)
+ , _counter(0)
+ , _output_streams(nthread)
+ , _nthread(nthread)
+{
+}
+DiskSinkMultithread::~DiskSinkMultithread()
+{
+}
+void DiskSinkMultithread::init(RawBytes& block)
+{
+ for (auto& of : _output_streams) {
+ if (of.is_open()) {
+ of.close();
+ }
+ }
+ std::memcpy(&_header, block.ptr(), block.used_bytes());
+ ascii_header_get(_header, "UTC_START", "%s", _start_time);
+ BOOST_LOG_TRIVIAL(debug) << "UTC_START = " << _start_time;
+}
+bool DiskSinkMultithread::operator()(RawBytes& block)
+{
+ for (auto& of : _output_streams) {
+ if (of.is_open()) {
+ of.close();
+ }
+ }
+ #pragma omp parallel for num_threads(_nthread)
+ for (std::size_t ii = 0; ii < _nthread; ++ii) {
+ std::size_t index = ii * block.used_bytes()/_nthread;
+ char _loop_header[HEADER_SIZE];
+ std::memcpy(&_loop_header, &_header, HEADER_SIZE);
+ ascii_header_set(_loop_header, "OBS_OFFSET", "%ld", _counter + ii * block.used_bytes() / _nthread);
+ ascii_header_set(_loop_header, "FILE_SIZE", "%ld", block.used_bytes() / _nthread);
+ std::stringstream fname;
+ fname << _start_time << std::setw(20) << "_" << std::setfill('0') << _counter + ii * block.used_bytes() / _nthread << ".dada";
+ BOOST_LOG_TRIVIAL(debug) << "filename" << fname.str();
+ _output_streams[ii].open(fname.str().c_str(), std::ios::out | std::ios::app | std::ios::binary);
+ _output_streams[ii].write((char*) _loop_header, HEADER_SIZE);
+ _output_streams[ii].write(block.ptr()+index, block.used_bytes()/_nthread);
+ }
+ _counter += block.used_bytes();
+ return false;
+}
+}//edd
+}//effelsberg
+} //namespace psrdada_cpp
diff --git a/psrdada_cpp/effelsberg/edd/src/dbdisk_multithread.cpp b/psrdada_cpp/effelsberg/edd/src/dbdisk_multithread.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..6e39d57da5e40533be649f53383e2b3d45611603
--- /dev/null
+++ b/psrdada_cpp/effelsberg/edd/src/dbdisk_multithread.cpp
@@ -0,0 +1,96 @@
+#include "psrdada_cpp/multilog.hpp"
+#include "psrdada_cpp/raw_bytes.hpp"
+#include "psrdada_cpp/dada_input_stream.hpp"
+#include "psrdada_cpp/effelsberg/edd/dada_disk_sink_multithread.hpp"
+#include "psrdada_cpp/cli_utils.hpp"
+#include "psrdada_cpp/dada_write_client.hpp"
+#include "boost/program_options.hpp"
+
+#include <sys/types.h>
+#include <iostream>
+#include <string>
+#include <sstream>
+#include <ios>
+#include <algorithm>
+
+using namespace psrdada_cpp;
+
+namespace
+{
+const size_t ERROR_IN_COMMAND_LINE = 1;
+const size_t SUCCESS = 0;
+const size_t ERROR_UNHANDLED_EXCEPTION = 2;
+} // namespace
+
+int main(int argc, char** argv)
+{
+ try
+ {
+ key_t key;
+ std::string prefix;
+ int nthread;
+ /** Define and parse the program options
+ */
+ namespace po = boost::program_options;
+ po::options_description desc("Options");
+ desc.add_options()
+ ("help,h", "Print help messages")
+ ("key,k", po::value<std::string>()
+ ->default_value("dadc")
+ ->notifier([&key](std::string in)
+ {
+ key = string_to_key(in);
+ }),
+ "The shared memory key for the dada buffer to connect to (hex string)")
+ ("prefix,p", po::value<std::string>(&prefix)
+ ->default_value("dbdisk_dump"),
+ "Prefix for the filename to write to")
+ ("nthread,n", po::value<int>(&nthread)
+ ->default_value(2),
+ "number of threads to write")
+ ("log_level", po::value<std::string>()
+ ->default_value("info")
+ ->notifier([](std::string level)
+ {
+ set_log_level(level);
+ }),
+ "The logging level to use (debug, info, warning, error)");
+ po::variables_map vm;
+ try
+ {
+ po::store(po::parse_command_line(argc, argv, desc), vm);
+ if ( vm.count("help") )
+ {
+ std::cout << "DbDiskMultithread -- read from DADA ring buffer and write to disk with multiple thread" << std::endl
+ << desc << std::endl;
+ return SUCCESS;
+ }
+ po::notify(vm);
+ }
+ catch (po::error& e)
+ {
+ std::cerr << "ERROR: " << e.what() << std::endl << std::endl;
+ std::cerr << desc << std::endl;
+ return ERROR_IN_COMMAND_LINE;
+ }
+
+ /**
+ * All the application code goes here
+ */
+ MultiLog log("dbdixk");
+ effelsberg::edd::DiskSinkMultithread sink(prefix, nthread);
+ DadaInputStream<decltype(sink)> stream(key, log, sink);
+ stream.start();
+ /**
+ * End of application code
+ */
+ }
+ catch (std::exception& e)
+ {
+ std::cerr << "Unhandled Exception reached the top of main: "
+ << e.what() << ", application will now exit" << std::endl;
+ return ERROR_UNHANDLED_EXCEPTION;
+ }
+ return SUCCESS;
+
+}