From c6976d79ff4cce14496071a32e87ce3cc8a95b1b Mon Sep 17 00:00:00 2001 From: nesser <nesser@mpifr-bonn.mpg.de> Date: Wed, 6 Sep 2023 12:21:00 +0000 Subject: [PATCH] Added zerodb CLI - acquires and releases DADA ring buffer without copies --- .gitignore | 1 + psrdada_cpp/CMakeLists.txt | 6 +- psrdada_cpp/examples/zerodb.cpp | 140 ++++++++++++++++++++++++++++++++ 3 files changed, 146 insertions(+), 1 deletion(-) create mode 100644 psrdada_cpp/examples/zerodb.cpp diff --git a/.gitignore b/.gitignore index 1a2f1cc6..f3f96b0c 100644 --- a/.gitignore +++ b/.gitignore @@ -33,6 +33,7 @@ # Directories build/ +.vscode/ # Editor files *~ diff --git a/psrdada_cpp/CMakeLists.txt b/psrdada_cpp/CMakeLists.txt index 083bac8d..0968c043 100644 --- a/psrdada_cpp/CMakeLists.txt +++ b/psrdada_cpp/CMakeLists.txt @@ -89,7 +89,11 @@ target_link_libraries (diskdb ${PSRDADA_CPP_LIBRARIES}) add_executable(dbsplit examples/dbsplit.cpp) target_link_libraries (dbsplit ${PSRDADA_CPP_LIBRARIES}) -install (TARGETS junkdb dbnull syncdb dbreset fbfuse_output_db diskdb dbsplit DESTINATION bin) +#zerodb +add_executable(zerodb examples/zerodb.cpp) +target_link_libraries (zerodb ${PSRDADA_CPP_LIBRARIES}) + +install (TARGETS junkdb dbnull syncdb dbreset fbfuse_output_db diskdb dbsplit zerodb DESTINATION bin) install (TARGETS ${CMAKE_PROJECT_NAME} RUNTIME DESTINATION bin LIBRARY DESTINATION lib diff --git a/psrdada_cpp/examples/zerodb.cpp b/psrdada_cpp/examples/zerodb.cpp new file mode 100644 index 00000000..0ae8b823 --- /dev/null +++ b/psrdada_cpp/examples/zerodb.cpp @@ -0,0 +1,140 @@ +#include "psrdada_cpp/multilog.hpp" +#include "psrdada_cpp/raw_bytes.hpp" +#include "psrdada_cpp/dada_write_client.hpp" +#include "psrdada_cpp/cli_utils.hpp" + +#include "boost/program_options.hpp" + +#include <sys/types.h> +#include <iostream> +#include <string> +#include <ios> +#include <vector> +#include <fstream> +#include <chrono> +#include <thread> + +#define NANO 1000000000 + +using namespace psrdada_cpp; + +namespace +{ + const size_t ERROR_IN_COMMAND_LINE = 1; + const size_t SUCCESS = 0; + const size_t ERROR_UNHANDLED_EXCEPTION = 2; +} // namespace + +namespace psrdada_cpp +{ + template <class Handler> + void zerodb(Handler& writer, + std::string header_file, + float duration, + float sample_rate) + { + std::size_t header_size = writer.header_buffer_size(); + std::size_t block_size = writer.data_buffer_size(); + std::vector<char> header(header_size); + std::ifstream hinput; + // Read header + hinput.open(header_file, std::ios::in | std::ios::binary); + if(!hinput.is_open()){ + std::cerr << "File " << header_file << " not found!" << std::endl; + return; + } + hinput.read(header.data(), header_size); + RawBytes hdr(header.data(), header_size, header_size, false); + // Copy header file to header buffer + auto& hdr_buf = writer.header_stream().next(); + memcpy(hdr_buf.ptr(), hdr.ptr(), hdr.used_bytes()); + hdr_buf.used_bytes(hdr.used_bytes()); + writer.header_stream().release(); + + double sample_step = 1/sample_rate; + std::chrono::time_point<std::chrono::system_clock> begin; + std::chrono::time_point<std::chrono::system_clock> start; + std::chrono::duration<float> time_duration(duration); + std::chrono::duration<double, std::ratio<1,NANO>> block_step_ns = std::chrono::duration<double>(sample_step * block_size); + begin = std::chrono::system_clock::now(); + start = begin; + + BOOST_LOG_TRIVIAL(info) << "Running for " << time_duration.count() << " s"; + while(time_duration > start - begin) + { + start = std::chrono::system_clock::now(); + auto& no_use = writer.data_stream().next(); + writer.data_stream().release(); + auto elapsed = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now() - start); + auto sleep_time = std::chrono::duration_cast<std::chrono::nanoseconds>(block_step_ns - elapsed); + std::this_thread::sleep_for(sleep_time); + auto run_time = start - begin; + BOOST_LOG_TRIVIAL(debug) << "Released " << block_size << " bytes in " << block_step_ns.count()/NANO + << " s (" << block_size / (block_step_ns.count()/NANO) / (1000000) << "(MB/s))."; + } + } +} + +int main(int argc, char** argv) +{ + try + { + key_t key; + std::string header_file; + float duration; + float sample_rate; + /** Define and parse the program options + */ + namespace po = boost::program_options; + po::options_description desc("Options"); + desc.add_options() + ("help,h", "Print help messages") + ("header_file,f", po::value<std::string>(&header_file)->required(), "Header file to write header block") + ("rate,s", po::value<float>(&sample_rate)->default_value(64000000), "Data rate in bytes. If set to 0, it reads as fast as can") + ("duration,t", po::value<float>(&duration)->default_value(60), "Seconds to run the process") + ("key,k", po::value<std::string>()->default_value("dada")->notifier([&key](std::string in){key = string_to_key(in);}), + "The shared memory key for the dada buffer to connect to (hex string)") + ("log_level", po::value<std::string>()->default_value("info")->notifier([](std::string level){set_log_level(level);}),"The logging level to use (debug, info, warning, error)"); + po::variables_map vm; + try + { + po::store(po::parse_command_line(argc, argv, desc), vm); + if ( vm.count("help") ) + { + std::cout << "zerodb -- acquires and releases DADA ring buffer without copies" << std::endl + << desc << std::endl; + return SUCCESS; + } + po::notify(vm); + } + catch(po::error& e) + { + std::cerr << "ERROR: " << e.what() << std::endl << std::endl; + std::cerr << desc << std::endl; + return ERROR_IN_COMMAND_LINE; + } + + /** + * All the application code goes here + */ + + MultiLog log("zerodb"); + DadaWriteClient writer(key, log); + zerodb<decltype(writer)>( + writer, + header_file, + duration, + sample_rate); + /** + * End of application code + */ + } + catch(std::exception& e) + { + std::cerr << "Unhandled Exception reached the top of main: " + << e.what() << ", application will now exit" << std::endl; + return ERROR_UNHANDLED_EXCEPTION; + } + return SUCCESS; + +} -- GitLab