From b6c735c02a377bb6091e0ca9558e5f2bc9dc1164 Mon Sep 17 00:00:00 2001
From: Jason Wu <jwu@mpifr-bonn.mpg.de>
Date: Thu, 8 Apr 2021 17:12:09 +0200
Subject: [PATCH] multithread dbdisk implementation

---
 psrdada_cpp/effelsberg/edd/CMakeLists.txt     |  9 ++
 .../edd/dada_disk_sink_multithread.hpp        | 35 +++++++
 .../edd/src/dada_disk_sink_multithread.cpp    | 58 +++++++++++
 .../effelsberg/edd/src/dbdisk_multithread.cpp | 96 +++++++++++++++++++
 4 files changed, 198 insertions(+)
 create mode 100644 psrdada_cpp/effelsberg/edd/dada_disk_sink_multithread.hpp
 create mode 100644 psrdada_cpp/effelsberg/edd/src/dada_disk_sink_multithread.cpp
 create mode 100644 psrdada_cpp/effelsberg/edd/src/dbdisk_multithread.cpp

diff --git a/psrdada_cpp/effelsberg/edd/CMakeLists.txt b/psrdada_cpp/effelsberg/edd/CMakeLists.txt
index ca34dd72..12320d16 100644
--- a/psrdada_cpp/effelsberg/edd/CMakeLists.txt
+++ b/psrdada_cpp/effelsberg/edd/CMakeLists.txt
@@ -19,6 +19,7 @@ set(psrdada_cpp_effelsberg_edd_src
     src/EDDRoach_merge.cpp
     src/EDDRoach_merge_leap.cpp
     src/dada_disk_sink_leap.cpp
+    src/dada_disk_sink_multithread.cpp
     src/ScaledTransposeTFtoTFT.cu
     src/SKRfiReplacementCuda.cu
     src/SpectralKurtosisCuda.cu
@@ -38,6 +39,7 @@ set(psrdada_cpp_effelsberg_edd_inc
     EDDRoach_merge.hpp
     EDDRoach_merge_leap.hpp
     dada_disk_sink_leap.hpp
+    dada_disk_sink_multithread.hpp
     FftSpectrometer.cuh
     GatedSpectrometer.cuh
     Packer.cuh
@@ -107,5 +109,12 @@ add_executable(dbdiskleap src/dbdisk_leap.cpp)
 target_link_libraries (dbdiskleap ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES})
 install(TARGETS dbdiskleap DESTINATION bin)
 
+# dbdisk_multithread: executable built from src/dbdisk_multithread.cpp, installed to bin
+add_executable(dbdisk_multithread src/dbdisk_multithread.cpp)
+target_link_libraries (dbdisk_multithread ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES})
+install(TARGETS dbdisk_multithread DESTINATION bin)
+
+
+
 add_subdirectory(test)
 endif(ENABLE_CUDA)
diff --git a/psrdada_cpp/effelsberg/edd/dada_disk_sink_multithread.hpp b/psrdada_cpp/effelsberg/edd/dada_disk_sink_multithread.hpp
new file mode 100644
index 00000000..c360f38c
--- /dev/null
+++ b/psrdada_cpp/effelsberg/edd/dada_disk_sink_multithread.hpp
@@ -0,0 +1,35 @@
+#ifndef PSRDADA_CPP_EFFELSBERG_EDD_DADA_DISK_SINK_MULTITHREAD_HPP
+#define PSRDADA_CPP_EFFELSBERG_EDD_DADA_DISK_SINK_MULTITHREAD_HPP
+#define HEADER_SIZE 4096
+#define START_TIME 1024
+#define HEAP_SIZE 32000
+#include "psrdada_cpp/raw_bytes.hpp"
+#include "psrdada_cpp/common.hpp"
+#include "psrdada_cpp/dada_write_client.hpp"
+#include <fstream>
+#include <vector>
+namespace psrdada_cpp {
+namespace effelsberg {
+namespace edd {
+/// Sink that writes every data block to disk as one DADA file per writer thread.
+/// init() stores the header block; operator() writes one data block.
+class DiskSinkMultithread
+{
+public:
+    DiskSinkMultithread(std::string prefix, int nthread);
+    ~DiskSinkMultithread();
+    void init(RawBytes& block);        ///< Stores the header and reads UTC_START from it
+    bool operator()(RawBytes& block);  ///< Writes the block as _nthread files; always returns false
+    std::string _prefix;                         ///< Prefix passed to the constructor
+    std::size_t _counter = 0;                    ///< Data bytes consumed so far; base for OBS_OFFSET
+    int _nthread = 0;                            ///< Number of files/threads used per block
+    char _header[HEADER_SIZE] = {};              ///< Copy of the header block, zeroed until init()
+    char _start_time[START_TIME] = {};           ///< UTC_START string read by init()
+    bool first_block = true;                     ///< Not referenced by dada_disk_sink_multithread.cpp
+    std::vector<char> _transpose;                ///< Not referenced by dada_disk_sink_multithread.cpp
+    std::vector<std::ofstream> _output_streams;  ///< One output stream per thread
+};
+} // edd
+} // effelsberg
+} //namespace psrdada_cpp
+#endif //PSRDADA_CPP_EFFELSBERG_EDD_DADA_DISK_SINK_MULTITHREAD_HPP
diff --git a/psrdada_cpp/effelsberg/edd/src/dada_disk_sink_multithread.cpp b/psrdada_cpp/effelsberg/edd/src/dada_disk_sink_multithread.cpp
new file mode 100644
index 00000000..9d5765e4
--- /dev/null
+++ b/psrdada_cpp/effelsberg/edd/src/dada_disk_sink_multithread.cpp
@@ -0,0 +1,58 @@
+#include "psrdada_cpp/effelsberg/edd/dada_disk_sink_multithread.hpp"
+#include "ascii_header.h"
+#include <chrono>
+#include <iostream>
+#include <iomanip>
+using namespace std;
+using namespace std::chrono;
+namespace psrdada_cpp {
+namespace effelsberg {
+namespace edd {
+// Stores the prefix and thread count and creates nthread (still closed) output streams.
+DiskSinkMultithread::DiskSinkMultithread(std::string prefix, int nthread)
+  : _prefix(prefix)
+  , _counter(0)
+  , _nthread(nthread)  // listed in declaration order, the order members are really initialised in
+  , _output_streams(nthread)
+{}
+// Nothing to release by hand: every std::ofstream in _output_streams closes
+// its own file when the vector is destroyed.
+DiskSinkMultithread::~DiskSinkMultithread() = default;
+// Stores the DADA header: closes stale files, copies the header (bounded), reads UTC_START.
+void DiskSinkMultithread::init(RawBytes& block)
+{
+  for (auto& of : _output_streams) { if (of.is_open()) { of.close(); } }
+  const std::size_t nbytes = block.used_bytes() < HEADER_SIZE ? block.used_bytes() : HEADER_SIZE - 1; // never overrun _header
+  std::memset(_header, 0, HEADER_SIZE); // short headers stay NUL-padded and terminated
+  std::memcpy(_header, block.ptr(), nbytes);
+  _start_time[0] = '\0'; // stays empty when UTC_START is absent
+  ascii_header_get(_header, "UTC_START", "%s", _start_time);
+  BOOST_LOG_TRIVIAL(debug) << "UTC_START = " << _start_time;
+}
+// Writes one data block as _nthread DADA files in parallel; each file holds the stored
+// header (OBS_OFFSET/FILE_SIZE patched) followed by one contiguous slice of the block.
+bool DiskSinkMultithread::operator()(RawBytes& block)
+{
+  for (auto& of : _output_streams) { if (of.is_open()) { of.close(); } }
+  const std::size_t total = block.used_bytes();
+  #pragma omp parallel for num_threads(_nthread)
+  for (int ii = 0; ii < _nthread; ++ii) {
+    const std::size_t begin = ii * total / _nthread;
+    const std::size_t end = (ii + 1) * total / _nthread;  // adjacent slices share a boundary: no byte is dropped
+    char _loop_header[HEADER_SIZE];
+    std::memcpy(_loop_header, _header, HEADER_SIZE);
+    ascii_header_set(_loop_header, "OBS_OFFSET", "%zu", _counter + begin);  // %zu: the values are std::size_t
+    ascii_header_set(_loop_header, "FILE_SIZE", "%zu", end - begin);
+    std::stringstream fname;  // NOTE(review): _prefix is not part of the name — confirm this is intended
+    fname << _start_time << std::setw(20) << std::setfill('0') << _counter + begin << ".dada";
+    BOOST_LOG_TRIVIAL(debug) << "filename" << fname.str();
+    _output_streams[ii].open(fname.str().c_str(), std::ios::out | std::ios::app | std::ios::binary);
+    _output_streams[ii].write(_loop_header, HEADER_SIZE);
+    _output_streams[ii].write(block.ptr() + begin, end - begin);
+  }
+  _counter += total;
+  return false;
+}
+}//edd
+}//effelsberg
+} //namespace psrdada_cpp
diff --git a/psrdada_cpp/effelsberg/edd/src/dbdisk_multithread.cpp b/psrdada_cpp/effelsberg/edd/src/dbdisk_multithread.cpp
new file mode 100644
index 00000000..6e39d57d
--- /dev/null
+++ b/psrdada_cpp/effelsberg/edd/src/dbdisk_multithread.cpp
@@ -0,0 +1,96 @@
+#include "psrdada_cpp/multilog.hpp"
+#include "psrdada_cpp/raw_bytes.hpp"
+#include "psrdada_cpp/dada_input_stream.hpp"
+#include "psrdada_cpp/effelsberg/edd/dada_disk_sink_multithread.hpp"
+#include "psrdada_cpp/cli_utils.hpp"
+#include "psrdada_cpp/dada_write_client.hpp"
+#include "boost/program_options.hpp"
+
+#include <sys/types.h>
+#include <iostream>
+#include <string>
+#include <sstream>
+#include <ios>
+#include <algorithm>
+
+using namespace psrdada_cpp;
+
+namespace {
+// Process exit codes returned by main(); declared int to match main's return type.
+const int SUCCESS = 0;
+const int ERROR_IN_COMMAND_LINE = 1;
+const int ERROR_UNHANDLED_EXCEPTION = 2;
+} // namespace
+
+// Entry point: parses the command line, then attaches a DiskSinkMultithread to the
+// DADA buffer selected by --key through DadaInputStream and calls start() on it.
+int main(int argc, char** argv)
+{
+    try
+    {
+        key_t key;
+        std::string prefix;
+        int nthread;
+        // Define and parse the program options
+        namespace po = boost::program_options;
+        po::options_description desc("Options");
+        desc.add_options()
+        ("help,h", "Print help messages")
+        ("key,k", po::value<std::string>()
+         ->default_value("dadc")
+         ->notifier([&key](std::string in)
+        {
+            key = string_to_key(in);
+        }),
+        "The shared memory key for the dada buffer to connect to (hex string)")
+        ("prefix,p", po::value<std::string>(&prefix)
+         ->default_value("dbdisk_dump"),
+         "Prefix for the filename to write to")
+        ("nthread,n", po::value<int>(&nthread)
+         ->default_value(2),
+         "number of threads to write")
+        ("log_level", po::value<std::string>()
+         ->default_value("info")
+         ->notifier([](std::string level)
+        {
+            set_log_level(level);
+        }),
+        "The logging level to use (debug, info, warning, error)");
+        po::variables_map vm;
+        try
+        {
+            po::store(po::parse_command_line(argc, argv, desc), vm);
+            if ( vm.count("help")  )
+            {
+                std::cout << "DbDiskMultithread -- read from DADA ring buffer and write to disk with multiple thread" << std::endl
+                          << desc << std::endl;
+                return SUCCESS;
+            }
+            po::notify(vm);
+            // Zero threads would silently discard every block and a negative count
+            // cannot size the stream vector, so treat both as a usage error.
+            if (nthread < 1)
+            {
+                throw po::error("--nthread must be at least 1");
+            }
+        }
+        catch (const po::error& e)
+        {
+            std::cerr << "ERROR: " << e.what() << std::endl << std::endl;
+            std::cerr << desc << std::endl;
+            return ERROR_IN_COMMAND_LINE;
+        }
+        // Application code: connect the sink to the ring buffer and run the stream.
+        MultiLog log("dbdixk"); // NOTE(review): "dbdixk" looks like a typo for "dbdisk" — confirm
+        effelsberg::edd::DiskSinkMultithread sink(prefix, nthread);
+        DadaInputStream<decltype(sink)> stream(key, log, sink);
+        stream.start();
+    }
+    catch (const std::exception& e)
+    {
+        std::cerr << "Unhandled Exception reached the top of main: "
+                  << e.what() << ", application will now exit" << std::endl;
+        return ERROR_UNHANDLED_EXCEPTION;
+    }
+    return SUCCESS;
+}
-- 
GitLab