diff --git a/cmake/boost.cmake b/cmake/boost.cmake index 7393624b4689f30239a3ecccea388f4924cdb0a9..2c6be676ee024694d09cc693a7b1fdd412c6359e 100644 --- a/cmake/boost.cmake +++ b/cmake/boost.cmake @@ -11,4 +11,4 @@ if(BOOST_ASIO_DEBUG) add_definitions(-DBOOST_ASIO_ENABLE_HANDLER_TRACKING) else(BOOST_ASIO_DEBUG) set(BOOST_ASIO_DEBUG false) -endif(BOOST_ASIO_DEBUG) \ No newline at end of file +endif(BOOST_ASIO_DEBUG) diff --git a/cmake/compiler_settings.cmake b/cmake/compiler_settings.cmake index 1db13939c9eaa6316c60d95217a6865fdf186c29..3ce03c95ed0346b6159601536c88c108aac67915 100644 --- a/cmake/compiler_settings.cmake +++ b/cmake/compiler_settings.cmake @@ -31,9 +31,9 @@ endif () set(CMAKE_CXX_FLAGS_RELEASE "-O3 -DNDEBUG -Wno-unused-local-typedefs") set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-O2 -g -Wall -Wextra -pedantic -Wno-unused-local-typedefs") -set(CMAKE_CXX_FLAGS_DEBUG "-O0 -g -Wall -Wextra -pedantic -Wno-unused-local-typedefs") +set(CMAKE_CXX_FLAGS_DEBUG "-O3 -g -pg -Wall -Wextra -pedantic -Wno-unused-local-typedefs") # Set include directories for dependencies include_directories( ${Boost_INCLUDE_DIRS} -) \ No newline at end of file +) diff --git a/docker/Dockerfile b/docker/Dockerfile index 6702f45bfb283d777bbe8a2348ef478915b5b706..99fb43c0142169733e5600f9cbd84d9d13973e35 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -75,7 +75,7 @@ ENV PACKAGES $PSRDADA_BUILD WORKDIR $PSRHOME RUN git clone https://github.com/ewanbarr/psrdada_cpp.git && \ cd psrdada_cpp/ &&\ - git checkout fbfuse &&\ + git checkout transpose &&\ git submodule init &&\ git submodule update &&\ mkdir build/ &&\ diff --git a/psrdada_cpp/CMakeLists.txt b/psrdada_cpp/CMakeLists.txt index 50c243990e9eeab27c26399c58096cd8aa2229e8..cbc70777963fd234b810eb4716e41a9ecaff4943 100644 --- a/psrdada_cpp/CMakeLists.txt +++ b/psrdada_cpp/CMakeLists.txt @@ -17,19 +17,27 @@ set(psrdada_cpp_src src/multilog.cpp src/raw_bytes.cpp src/simple_file_writer.cpp + src/sigprocheader.cpp + src/psrdadaheader.cpp ) set(psrdada_cpp_inc cli_utils.hpp dada_client_base.hpp - dada_io_loop.hpp - dada_io_loop_writer.hpp + dada_input_stream.hpp + dada_output_stream.hpp dada_read_client.hpp + dada_junk_source.hpp + dada_null_sink.hpp multilog.hpp common.hpp - dada_io_loop_reader.hpp dada_write_client.hpp raw_bytes.hpp + cuda_utils.hpp + simple_file_writer.hpp + sigprocheader.hpp + psrdadaheader.hpp + psrdada_to_sigproc_header.hpp ) # -- the main library target @@ -39,6 +47,14 @@ add_library(${CMAKE_PROJECT_NAME} ${psrdada_cpp_src}) add_executable(junkdb examples/junkdb.cpp) target_link_libraries (junkdb ${PSRDADA_CPP_LIBRARIES}) +#syncdb +add_executable(syncdb examples/syncdb.cpp) +target_link_libraries (syncdb ${PSRDADA_CPP_LIBRARIES}) + +#fbfuse_output_db +add_executable(fbfuse_output_db examples/fbfuse_output_db.cpp) +target_link_libraries (fbfuse_output_db ${PSRDADA_CPP_LIBRARIES}) + #dbnull add_executable(dbnull examples/dbnull.cpp) target_link_libraries (dbnull ${PSRDADA_CPP_LIBRARIES}) @@ -47,9 +63,14 @@ target_link_libraries (dbnull ${PSRDADA_CPP_LIBRARIES}) add_executable(dbdisk examples/dbdisk.cpp) target_link_libraries (dbnull ${PSRDADA_CPP_LIBRARIES}) -install (TARGETS junkdb dbnull dbdisk DESTINATION bin) - #include subdirs add_subdirectory(meerkat) add_subdirectory(effelsberg) +install (TARGETS junkdb dbnull dbdisk syncdb fbfuse_output_db DESTINATION bin) +install (TARGETS ${CMAKE_PROJECT_NAME} + RUNTIME DESTINATION bin + LIBRARY DESTINATION lib + ARCHIVE DESTINATION lib) +install(FILES ${psrdada_cpp_inc} DESTINATION include/psrdada_cpp) +add_subdirectory(meerkat) diff --git a/psrdada_cpp/dada_input_stream.hpp b/psrdada_cpp/dada_input_stream.hpp index 85084acdb2ca6c16d09eb3dcb6557578f5aab722..176e43408181cecbc753444ab67a79f6742315aa 100644 --- a/psrdada_cpp/dada_input_stream.hpp +++ b/psrdada_cpp/dada_input_stream.hpp @@ -28,4 +28,4 @@ namespace psrdada_cpp #include "psrdada_cpp/detail/dada_input_stream.cpp" -#endif //PSRDADA_CPP_DADA_INPUT_STREAM_HPP \ No newline at end of file +#endif //PSRDADA_CPP_DADA_INPUT_STREAM_HPP diff --git a/psrdada_cpp/dada_junk_source.hpp b/psrdada_cpp/dada_junk_source.hpp index 13eaaf82129448e9b0ead9311b76458e596ccdbf..931a14f49610fe798c72cc8a72af5ed9d5f3a374 100644 --- a/psrdada_cpp/dada_junk_source.hpp +++ b/psrdada_cpp/dada_junk_source.hpp @@ -4,6 +4,8 @@ #include "psrdada_cpp/raw_bytes.hpp" #include "psrdada_cpp/common.hpp" #include <vector> +#include <cstdlib> +#include <fstream> namespace psrdada_cpp { @@ -11,13 +13,23 @@ namespace psrdada_cpp void junk_source(Handler& handler, std::size_t header_size, std::size_t nbytes_per_write, - std::size_t total_bytes) + std::size_t total_bytes, + std::string header_file) { std::vector<char> junk_header(header_size); - std::vector<char> junk_block(nbytes_per_write); + std::ifstream input; + input.open(header_file,std::ios::in | std::ios::binary); + input.read(junk_header.data(),header_size); RawBytes header(junk_header.data(),header_size,header_size,false); handler.init(header); std::size_t bytes_written = 0; + std::vector<char> junk_block(total_bytes,0); + srand(0); + for (std::uint32_t ii=0; ii < total_bytes; ++ii) + { + junk_block[ii] = rand()%255 + 1; + } + while (bytes_written < total_bytes) { RawBytes data(junk_block.data(),nbytes_per_write,nbytes_per_write,false); @@ -26,4 +38,4 @@ namespace psrdada_cpp } } } //namespace psrdada_cpp -#endif //PSRDADA_CPP_DADA_JUNK_SOURCE_HPP \ No newline at end of file +#endif //PSRDADA_CPP_DADA_JUNK_SOURCE_HPP diff --git a/psrdada_cpp/dada_sync_source.hpp b/psrdada_cpp/dada_sync_source.hpp new file mode 100644 index 0000000000000000000000000000000000000000..ed4f110adf72329562961c00f41f647ee393d49b --- /dev/null +++ b/psrdada_cpp/dada_sync_source.hpp @@ -0,0 +1,71 @@ +#ifndef PSRDADA_CPP_DADA_SYNC_SOURCE_HPP +#define PSRDADA_CPP_DADA_SYNC_SOURCE_HPP + +#include "psrdada_cpp/raw_bytes.hpp" +#include "psrdada_cpp/common.hpp" +#include "ascii_header.h" +#include <vector> +#include <chrono> +#include <iomanip> // std::put_time +#include <thread> // std::this_thread::sleep_until +#include <chrono> // std::chrono::system_clock +#include <ctime> +#include <cmath> + +namespace psrdada_cpp +{ + template <class Handler> + void sync_source(Handler& handler, + std::size_t header_size, + std::size_t nbytes_per_write, + std::size_t total_bytes, + std::time_t sync_epoch, + double block_duration, + std::size_t ts_per_block) + { + std::vector<char> sync_header(header_size, 0); + std::vector<char> sync_block(nbytes_per_write, 1); + RawBytes header(sync_header.data(), header_size, header_size, false); + RawBytes data(sync_block.data(), nbytes_per_write, nbytes_per_write, false); + + std::size_t sample_clock_start; + auto sync_epoch_tp = std::chrono::system_clock::from_time_t(sync_epoch); + auto curr_epoch_tp = std::chrono::system_clock::now(); + auto diff = std::chrono::duration_cast<std::chrono::microseconds>(curr_epoch_tp - sync_epoch_tp).count(); + + std::size_t next_block_idx; + + if (sync_epoch_tp > curr_epoch_tp) + { + BOOST_LOG_TRIVIAL(info) << "The sync epoch is " << static_cast<float>(diff)/1e6 + << " seconds in the future"; + next_block_idx = 0; + sample_clock_start = 0; + } + else + { + next_block_idx = static_cast<std::size_t>((std::ceil(static_cast<double>(diff)/1e6) / block_duration)); + sample_clock_start = next_block_idx * ts_per_block; + } + + BOOST_LOG_TRIVIAL(info) << "Setting SAMPLE_CLOCK_START to " << sample_clock_start; + ascii_header_set(sync_header.data(), "SAMPLE_CLOCK_START", "%ld", sample_clock_start); + handler.init(header); + std::size_t bytes_written = 0; + + bool infinite = (total_bytes == 0); + while (true) + { + auto epoch_of_wait = sync_epoch_tp + std::chrono::duration<double>(next_block_idx * block_duration); + std::this_thread::sleep_until(epoch_of_wait); + handler(data); + bytes_written += data.used_bytes(); + ++next_block_idx; + if (!infinite && bytes_written >= total_bytes) + { + break; + } + } + } +} //namespace psrdada_cpp +#endif //PSRDADA_CPP_DADA_SYNC_SOURCE_HPP diff --git a/psrdada_cpp/dada_write_client.hpp b/psrdada_cpp/dada_write_client.hpp index f8c75a325de97392978bfc703e886e44d8a6c91b..23a47fa3280b640a47fa3043b19bd4278e903d45 100644 --- a/psrdada_cpp/dada_write_client.hpp +++ b/psrdada_cpp/dada_write_client.hpp @@ -132,4 +132,4 @@ namespace psrdada_cpp { } //namespace psrdada_cpp -#endif //PSRDADA_CPP_DADA_WRITE_CLIENT_HPP \ No newline at end of file +#endif //PSRDADA_CPP_DADA_WRITE_CLIENT_HPP diff --git a/psrdada_cpp/detail/dada_input_stream.cpp b/psrdada_cpp/detail/dada_input_stream.cpp index f55f7252134c59d0cfb5d65ed163b2e3ff16cf61..786145f0a4725b16736aec3dd1cd8d4b448ce251 100644 --- a/psrdada_cpp/detail/dada_input_stream.cpp +++ b/psrdada_cpp/detail/dada_input_stream.cpp @@ -55,4 +55,4 @@ namespace psrdada_cpp { _stop = true; } -} //psrdada_cpp \ No newline at end of file +} //psrdada_cpp diff --git a/psrdada_cpp/detail/psrdada_to_sigproc_header.cpp b/psrdada_cpp/detail/psrdada_to_sigproc_header.cpp new file mode 100644 index 0000000000000000000000000000000000000000..ab6cf4111c507dc9d519afa1c7d2bbd3686f5f05 --- /dev/null +++ b/psrdada_cpp/detail/psrdada_to_sigproc_header.cpp @@ -0,0 +1,40 @@ +#include "psrdada_cpp/psrdada_to_sigproc_header.hpp" +#include "psrdada_cpp/psrdadaheader.hpp" +#include "psrdada_cpp/sigprocheader.hpp" +#include "psrdada_cpp/cli_utils.hpp" +#include <thread> + +namespace psrdada_cpp { + + template <class HandlerType> + PsrDadaToSigprocHeader<HandlerType>::PsrDadaToSigprocHeader(HandlerType& handler) + : _handler(handler) + { + } + + template <class HandlerType> + PsrDadaToSigprocHeader<HandlerType>::~PsrDadaToSigprocHeader() + { + } + + template <class HandlerType> + void PsrDadaToSigprocHeader<HandlerType>::init(RawBytes& block) + { + SigprocHeader h; + PsrDadaHeader ph; + ph.from_bytes(block); + std::memset(block.ptr(), 0, block.total_bytes()); + h.write_header(block,ph); + block.used_bytes(block.total_bytes()); + _handler.init(block); + } + + template <class HandlerType> + bool PsrDadaToSigprocHeader<HandlerType>::operator()(RawBytes& block) + { + _handler(block); + return false; + } + + +} //psrdada_cpp diff --git a/psrdada_cpp/detail/sigprocheader.cpp b/psrdada_cpp/detail/sigprocheader.cpp new file mode 100644 index 0000000000000000000000000000000000000000..65173ddd9dccc1b3021ec6233a1e0225dc9f1184 --- /dev/null +++ b/psrdada_cpp/detail/sigprocheader.cpp @@ -0,0 +1,17 @@ +#include "psrdada_cpp/sigprocheader.hpp" +#include "psrdada_cpp/psrdadaheader.hpp" +#include <string> + + +namespace psrdada_cpp +{ + + template<typename NumericT> + void SigprocHeader::header_write(char*& ptr, std::string const& name, NumericT val) + { + header_write(ptr,name); + std::memcpy(ptr,(char*)&val,sizeof(val)); + ptr += sizeof(val); + } + +} // namespace psrdada_cpp diff --git a/psrdada_cpp/examples/Transpose_test.cpp b/psrdada_cpp/examples/Transpose_test.cpp new file mode 100644 index 0000000000000000000000000000000000000000..e710980c7fc0ee16be0668689cff454671e9286e --- /dev/null +++ b/psrdada_cpp/examples/Transpose_test.cpp @@ -0,0 +1,94 @@ +#include "psrdada_cpp/transpose_client.hpp" +#include<boost/process.hpp> + + +{ //namespace psrdada_cpp + +namespace bp = boost::process; +int main() +{ + + // Make a large dadabuffer + { + std::string key = "dada"; + boost::filesystem::path p = "/home/krajwade/libpsrdada/bin/dada_db"; + bp::child c0(p, "-k", key); + co.wait(); + //Write Data into a dada buffer + { + std::vector<unsigned char> input_data(64*128*64*32*6); + std::string logname = "Logger"; + Multilog log(logname); + DadaWriteClient writer0(string_to_key(key),log); + in = input_data.data(); + auto stream = writer0.data_stream(); + auto out = stream.next(); + std::memcpy(out.ptr(),in,input_data.size()); + out.used_bytes(input_data.size()); + stream.release(); + writer0::~DadaWriteClient(); + std::string* keys[]; + *(keys) = "dadc"; + *(keys + 1) = "dade" + *(keys + 2) = "dae0" + *(keys + 3) = "dae2" + *(keys + 4) = "dae4" + *(keys + 5) = "dae6" + + // Running the psrdada to create 6 buffers + bp::child c1(p, "-k", *keys[0]); + bp::child c2(p, "-k", *keys[1]); + bp::child c3(p, "-k", *keys[2]); + bp::child c4(p, "-k", *keys[3]); + bp::child c5(p, "-k", *keys[4]); + bp::child c6(p, "-k", *keys[5]); + + c1.wait(); + c2.wait(); + c3.wait(); + c4.wait(); + c5.wait(); + c6.wait(); + + // Read to transpose + RawBytes& input; + RawBytes& transposed, transposed1,transposed2,transposed3,transposed4,transposed5; + DadaReadClient reader(,log); + DadaWriteClient* writer1[]; + TransposeClient transpose(reader,writer1[],keys,6); + input = transpose.read_to_transpose(); + + // Do the transpose for all beams + transposed = transpose.do_transpose(input,0); + transposed1 = transpose.do_transpose(input,1); + transposed2 = transpose.do_transpose(input,2); + transposed3 = transpose.do_transpose(input,3); + transposed4 = transpose.do_transpose(input,4); + transposed5 = transpose.do_transpose(input,5); + + + // Writing out each beam to 6 dada buffers as 6 threads + + + std::thread t1(transpose.write_transpose(),std::ref(transposed),std::ref(*writer1[0])); + std::thread t2(transpose.write_transpose(),std::ref(transposed1),std::ref(*writer1[1])); + std::thread t3(transpose.write_transpose(),std::ref(transposed2),std::ref(*writer1[2])); + std::thread t4(transpose.write_transpose(),std::ref(transposed3),std::ref(*writer1[3])); + std::thread t5(transpose.write_transpose(),std::ref(transposed4),std::ref(*writer1[4])); + std::thread t6(transpose.write_transpose(),std::ref(transposed5),std::ref(*writer1[5])); + + t1.join(); + t2.join(); + t3.join(); + t4.join(); + t5.join(); + t6.join(); + + } + + // Destroy the buffers + } + +} + +} diff --git a/psrdada_cpp/examples/fbfuse_output_db.cpp b/psrdada_cpp/examples/fbfuse_output_db.cpp new file mode 100644 index 0000000000000000000000000000000000000000..a57b8f77f741a2254b0de78a6aae220a572a3fae --- /dev/null +++ b/psrdada_cpp/examples/fbfuse_output_db.cpp @@ -0,0 +1,207 @@ +#include "psrdada_cpp/multilog.hpp" +#include "psrdada_cpp/raw_bytes.hpp" +#include "psrdada_cpp/dada_output_stream.hpp" +#include "psrdada_cpp/dada_sync_source.hpp" +#include "psrdada_cpp/cli_utils.hpp" + +#include "boost/program_options.hpp" + +#include <sys/types.h> +#include <iostream> +#include <string> +#include <sstream> +#include <ios> +#include <algorithm> + +using namespace psrdada_cpp; + +namespace +{ + const size_t ERROR_IN_COMMAND_LINE = 1; + const size_t SUCCESS = 0; + const size_t ERROR_UNHANDLED_EXCEPTION = 2; +} // namespace + +template<class HandlerType> +class FakeBeams +{ + +public: + FakeBeams(HandlerType& handler, std::size_t size); + ~FakeBeams(); + + void init(RawBytes& block); + + bool operator()(RawBytes& block); + + void beam_num_start(std::uint8_t beam_num_start); + + void beam_num_end(std::uint8_t beam_num_end); + +private: + HandlerType& _handler; + std::uint8_t _beam_num; + std::uint8_t _beam_num_start; + std::uint8_t _beam_num_end; + std::size_t _size; +}; + +template<class HandlerType> +FakeBeams<HandlerType>::FakeBeams(HandlerType& handler, std::size_t size): +_handler(handler), +_beam_num(1), +_beam_num_start(1), +_beam_num_end(6), +_size(size) +{ +} + +template<class HandlerType> +FakeBeams<HandlerType>::~FakeBeams() +{ +} + +template<class HandlerType> +void FakeBeams<HandlerType>::init(RawBytes& block) +{ + _handler.init(block); +} + +template<class HandlerType> +void FakeBeams<HandlerType>::beam_num_start(std::uint8_t beam_num_start) +{ + _beam_num_start = beam_num_start; +} + +template<class HandlerType> +void FakeBeams<HandlerType>::beam_num_end(std::uint8_t beam_num_end) +{ + _beam_num_end = beam_num_end; +} + +template<class HandlerType> +bool FakeBeams<HandlerType>::operator()(RawBytes& block) +{ + _beam_num = _beam_num_start; + char* ptr = block.ptr(); + std::size_t bytes_written = 0 ; + while (bytes_written <= block.total_bytes()) + { + std::vector<char> beam_data(_size,_beam_num); + std::copy(beam_data.begin(),beam_data.end(),ptr); + ptr += _size; + bytes_written = bytes_written + _size; + ++_beam_num; + if (_beam_num > _beam_num_end) + _beam_num = _beam_num_start; + } + block.used_bytes(block.total_bytes()); + _handler(block); + return false; +} + + + +int main(int argc, char** argv) +{ + try + { + std::size_t nbytes = 0; + key_t key; + std::time_t sync_epoch; + double period; + std::size_t ts_per_block; + std::uint8_t beam_num_start; + std::uint8_t beam_num_end; + std::size_t write_size; + /** Define and parse the program options + */ + namespace po = boost::program_options; + po::options_description desc("Options"); + desc.add_options() + ("help,h", "Print help messages") + ("nbytes,n", po::value<std::size_t>(&nbytes) + ->default_value(0), + "Total number of bytes to write") + ("key,k", po::value<std::string>() + ->default_value("dada") + ->notifier([&key](std::string in) + { + key = string_to_key(in); + }), + "The shared memory key for the dada buffer to connect to (hex string)") + ("sync_epoch,s", po::value<std::size_t>() + ->default_value(0) + ->notifier([&sync_epoch](std::size_t in) + { + sync_epoch = static_cast<std::time_t>(in); + }), + "The global sync time for all producing instances") + ("period,p", po::value<double>(&period) + ->default_value(1.0), + "The period (in seconds) at which dada blocks are produced") + ("ts_per_block,t", po::value<std::size_t>(&ts_per_block) + ->default_value(8192*128), + "The increment in timestamp between consecutive blocks") + ("beam_start,b", po::value<std::uint8_t>(&beam_num_start) + ->default_value(1), + "Starting beam id for the heap") + ("beam_end,e", po::value<std::uint8_t>(&beam_num_end) + ->default_value(1), + "Last beam id for the heap") + ("write_size, w", po::value<std::size_t>(&write_size) + ->default_value(0), + "bytes to write per cycle. Should be equal to the heap size.") + ("log_level", po::value<std::string>() + ->default_value("info") + ->notifier([](std::string level) + { + set_log_level(level); + }), + "The logging level to use (debug, info, warning, error)"); + po::variables_map vm; + try + { + po::store(po::parse_command_line(argc, argv, desc), vm); + if ( vm.count("help") ) + { + std::cout << "SyncDB -- write 1 into a DADA ring buffer at a synchronised and fixed rate" << std::endl + << desc << std::endl; + return SUCCESS; + } + po::notify(vm); + } + catch(po::error& e) + { + std::cerr << "ERROR: " << e.what() << std::endl << std::endl; + std::cerr << desc << std::endl; + return ERROR_IN_COMMAND_LINE; + } + + /** + * All the application code goes here + */ + + MultiLog log("syncdb"); + DadaOutputStream out_stream(key, log); + FakeBeams<decltype(out_stream)> fakebeams(out_stream, write_size); + fakebeams.beam_num_start(beam_num_start); + fakebeams.beam_num_end(beam_num_end); + sync_source<decltype(fakebeams)>( + fakebeams, out_stream.client().header_buffer_size(), + out_stream.client().data_buffer_size(), nbytes, + sync_epoch, period, ts_per_block); + + /** + * End of application code + */ + } + catch(std::exception& e) + { + std::cerr << "Unhandled Exception reached the top of main: " + << e.what() << ", application will now exit" << std::endl; + return ERROR_UNHANDLED_EXCEPTION; + } + return SUCCESS; + +} diff --git a/psrdada_cpp/examples/junkdb.cpp b/psrdada_cpp/examples/junkdb.cpp index 5a4faba64c80c4a44a9535a63c8a615f47b4ab03..1a38b04d07a028344ed7ff8c4d50619ef1e64670 100644 --- a/psrdada_cpp/examples/junkdb.cpp +++ b/psrdada_cpp/examples/junkdb.cpp @@ -28,6 +28,7 @@ int main(int argc, char** argv) { std::size_t nbytes = 0; key_t key; + std::string header_file; /** Define and parse the program options */ namespace po = boost::program_options; @@ -36,7 +37,9 @@ int main(int argc, char** argv) ("help,h", "Print help messages") ("nbytes,n", po::value<std::size_t>(&nbytes) ->default_value(0), - "Total number of bytes to write") + "Total number of bytes to write") + ("header_file,f", po::value<std::string>(&header_file), + "Header file to write header block") ("key,k", po::value<std::string>() ->default_value("dada") ->notifier([&key](std::string in) @@ -78,7 +81,7 @@ int main(int argc, char** argv) DadaOutputStream out_stream(key, log); junk_source<decltype(out_stream)>( out_stream, out_stream.client().header_buffer_size(), - out_stream.client().data_buffer_size(), nbytes); + out_stream.client().data_buffer_size(), nbytes, header_file); /** * End of application code @@ -92,4 +95,4 @@ int main(int argc, char** argv) } return SUCCESS; -} \ No newline at end of file +} diff --git a/psrdada_cpp/examples/syncdb.cpp b/psrdada_cpp/examples/syncdb.cpp new file mode 100644 index 0000000000000000000000000000000000000000..f80bcf7f062153d1325234dd67b54d9cf3479403 --- /dev/null +++ b/psrdada_cpp/examples/syncdb.cpp @@ -0,0 +1,112 @@ +#include "psrdada_cpp/multilog.hpp" +#include "psrdada_cpp/raw_bytes.hpp" +#include "psrdada_cpp/dada_output_stream.hpp" +#include "psrdada_cpp/dada_sync_source.hpp" +#include "psrdada_cpp/cli_utils.hpp" + +#include "boost/program_options.hpp" + +#include <sys/types.h> +#include <iostream> +#include <string> +#include <sstream> +#include <ios> +#include <algorithm> + +using namespace psrdada_cpp; + +namespace +{ + const size_t ERROR_IN_COMMAND_LINE = 1; + const size_t SUCCESS = 0; + const size_t ERROR_UNHANDLED_EXCEPTION = 2; +} // namespace + +int main(int argc, char** argv) +{ + try + { + std::size_t nbytes = 0; + key_t key; + std::time_t sync_epoch; + double period; + std::size_t ts_per_block; + /** Define and parse the program options + */ + namespace po = boost::program_options; + po::options_description desc("Options"); + desc.add_options() + ("help,h", "Print help messages") + ("nbytes,n", po::value<std::size_t>(&nbytes) + ->default_value(0), + "Total number of bytes to write") + ("key,k", po::value<std::string>() + ->default_value("dada") + ->notifier([&key](std::string in) + { + key = string_to_key(in); + }), + "The shared memory key for the dada buffer to connect to (hex string)") + ("sync_epoch,s", po::value<std::size_t>() + ->default_value(0) + ->notifier([&sync_epoch](std::size_t in) + { + sync_epoch = static_cast<std::time_t>(in); + }), + "The global sync time for all producing instances") + ("period,p", po::value<double>(&period) + ->default_value(1.0), + "The period (in seconds) at which dada blocks are produced") + ("ts_per_block,t", po::value<std::size_t>(&ts_per_block) + ->default_value(8192*128), + "The increment in timestamp between consecutive blocks") + ("log_level", po::value<std::string>() + ->default_value("info") + ->notifier([](std::string level) + { + set_log_level(level); + }), + "The logging level to use (debug, info, warning, error)"); + po::variables_map vm; + try + { + po::store(po::parse_command_line(argc, argv, desc), vm); + if ( vm.count("help") ) + { + std::cout << "SyncDB -- write 1 into a DADA ring buffer at a synchronised and fixed rate" << std::endl + << desc << std::endl; + return SUCCESS; + } + po::notify(vm); + } + catch(po::error& e) + { + std::cerr << "ERROR: " << e.what() << std::endl << std::endl; + std::cerr << desc << std::endl; + return ERROR_IN_COMMAND_LINE; + } + + /** + * All the application code goes here + */ + + MultiLog log("syncdb"); + DadaOutputStream out_stream(key, log); + sync_source<decltype(out_stream)>( + out_stream, out_stream.client().header_buffer_size(), + out_stream.client().data_buffer_size(), nbytes, + sync_epoch, period, ts_per_block); + + /** + * End of application code + */ + } + catch(std::exception& e) + { + std::cerr << "Unhandled Exception reached the top of main: " + << e.what() << ", application will now exit" << std::endl; + return ERROR_UNHANDLED_EXCEPTION; + } + return SUCCESS; + +} diff --git a/psrdada_cpp/meerkat/CMakeLists.txt b/psrdada_cpp/meerkat/CMakeLists.txt index 835f938670bdc04521f877c8229bf9e52948dac7..634a3515ef26bb177eac3877beb4084347bf74ec 100644 --- a/psrdada_cpp/meerkat/CMakeLists.txt +++ b/psrdada_cpp/meerkat/CMakeLists.txt @@ -1,2 +1,3 @@ #add_subdirectory(tools) add_subdirectory(fbfuse) +add_subdirectory(tuse) diff --git a/psrdada_cpp/meerkat/tools/detail/feng_to_dada.cu b/psrdada_cpp/meerkat/tools/detail/feng_to_dada.cu index a0ca612d2db7630c682d1107ece406b0ff526ce8..a5d653e4e2f81f9cb33e3f4823e704d329e7430c 100644 --- a/psrdada_cpp/meerkat/tools/detail/feng_to_dada.cu +++ b/psrdada_cpp/meerkat/tools/detail/feng_to_dada.cu @@ -53,4 +53,4 @@ namespace tools { } //tools } //meerkat -} //psrdada_cpp \ No newline at end of file +} //psrdada_cpp diff --git a/psrdada_cpp/meerkat/tools/feng_to_dada.cuh b/psrdada_cpp/meerkat/tools/feng_to_dada.cuh index 051cf4b21b07ae36ac0025442e1372b657582454..6e1a091b8abc98f595811025f0edf40460796874 100644 --- a/psrdada_cpp/meerkat/tools/feng_to_dada.cuh +++ b/psrdada_cpp/meerkat/tools/feng_to_dada.cuh @@ -62,4 +62,4 @@ private: } //psrdada_cpp #include "psrdada_cpp/meerkat/tools/detail/feng_to_dada.cu" -#endif //PSRDADA_CPP_MEERKAT_TOOLS_FENG_TO_DADA_HPP \ No newline at end of file +#endif //PSRDADA_CPP_MEERKAT_TOOLS_FENG_TO_DADA_HPP diff --git a/psrdada_cpp/meerkat/tools/src/feng_to_bandpass_cli.cu b/psrdada_cpp/meerkat/tools/src/feng_to_bandpass_cli.cu index c6fd8c4a2a1ce89e0fe117df69a30c2d41f694b5..51602cd0aeebcb8c5422b969f3cb89b15fb346c1 100644 --- a/psrdada_cpp/meerkat/tools/src/feng_to_bandpass_cli.cu +++ b/psrdada_cpp/meerkat/tools/src/feng_to_bandpass_cli.cu @@ -98,4 +98,4 @@ int main(int argc, char** argv) } return SUCCESS; -} \ No newline at end of file +} diff --git a/psrdada_cpp/meerkat/tools/src/feng_to_dada.cu b/psrdada_cpp/meerkat/tools/src/feng_to_dada.cu index 38534af7d4f60873ff9c53716f32cbd9a444bfcc..d5d7beff9e1037c9119b69e168a6fee206f2ead2 100644 --- a/psrdada_cpp/meerkat/tools/src/feng_to_dada.cu +++ b/psrdada_cpp/meerkat/tools/src/feng_to_dada.cu @@ -87,4 +87,4 @@ namespace kernels { } //kernels } //tools } //meerkat -} //psrdada_cpp \ No newline at end of file +} //psrdada_cpp diff --git a/psrdada_cpp/meerkat/tools/src/feng_to_dada_cli.cu b/psrdada_cpp/meerkat/tools/src/feng_to_dada_cli.cu index 26300fff8af351d4fd18e2677c248040db4f1f94..eec9ebfe41fe613180ecbfd5217c15fa214ade8a 100644 --- a/psrdada_cpp/meerkat/tools/src/feng_to_dada_cli.cu +++ b/psrdada_cpp/meerkat/tools/src/feng_to_dada_cli.cu @@ -119,4 +119,4 @@ int main(int argc, char** argv) } return SUCCESS; -} \ No newline at end of file +} diff --git a/psrdada_cpp/meerkat/tuse/CMakeLists.txt b/psrdada_cpp/meerkat/tuse/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..e36cbbdcf82e2bbd0ea63ca209136a21a4fdcb20 --- /dev/null +++ b/psrdada_cpp/meerkat/tuse/CMakeLists.txt @@ -0,0 +1,25 @@ +set(PSRDADA_CPP_MEERKAT_TUSE_LIBRARIES + ${CMAKE_PROJECT_NAME} + ${CMAKE_PROJECT_NAME}_meerkat_tuse + ${DEPENDENCY_LIBRARIES}) + +set(psrdada_cpp_meerkat_tuse_src + src/transpose_to_dada.cpp + ) + +add_library(${CMAKE_PROJECT_NAME}_meerkat_tuse ${psrdada_cpp_meerkat_tuse_src}) + +#transpose_to_dada_cli +add_executable(transpose_to_dada_cli src/transpose_to_dada_cli.cpp) +target_link_libraries (transpose_to_dada_cli ${PSRDADA_CPP_MEERKAT_TUSE_LIBRARIES}) + +#transpose_to_files_cli +add_executable(transpose_to_file_cli src/transpose_to_file_cli.cpp) +target_link_libraries (transpose_to_file_cli ${PSRDADA_CPP_MEERKAT_TUSE_LIBRARIES}) + +#transpose_to_null_cli +add_executable(transpose_to_null_cli src/transpose_to_null_cli.cpp) +target_link_libraries (transpose_to_null_cli ${PSRDADA_CPP_MEERKAT_TUSE_LIBRARIES}) +install (TARGETS transpose_to_null_cli transpose_to_file_cli transpose_to_dada_cli DESTINATION bin) +install(FILES transpose_to_dada.hpp DESTINATION include/psrdada_cpp/meerkat/tuse) +add_subdirectory(test) diff --git a/psrdada_cpp/meerkat/tuse/detail/transpose_to_dada.cpp b/psrdada_cpp/meerkat/tuse/detail/transpose_to_dada.cpp new file mode 100644 index 0000000000000000000000000000000000000000..0f61bfb49df5f342c827b0d1edd5327493aca2a2 --- /dev/null +++ b/psrdada_cpp/meerkat/tuse/detail/transpose_to_dada.cpp @@ -0,0 +1,135 @@ +#include "psrdada_cpp/meerkat/tuse/transpose_to_dada.hpp" +#include "psrdada_cpp/cli_utils.hpp" +#include <ctime> +#include <thread> +namespace psrdada_cpp { +namespace meerkat { +namespace tuse { + + template <class HandlerType> + TransposeToDada<HandlerType>::TransposeToDada(std::size_t numbeams, std::vector<std::shared_ptr<HandlerType>> handler) + : _numbeams(numbeams) + , _handler(std::move(handler)) + , _nchans(128) + , _nsamples(64) + , _nfreq(32) + , _ngroups(10) + { + } + + template <class HandlerType> + TransposeToDada<HandlerType>::~TransposeToDada() + { + } + + template <class HandlerType> + void TransposeToDada<HandlerType>::init(RawBytes& block) + { + std::uint32_t ii; + for (ii = 0; ii < _numbeams; ii++ ) + { + (*_handler[ii]).init(block); + } + } + + template <class HandlerType> + bool TransposeToDada<HandlerType>::operator()(RawBytes& block) + { + std::uint32_t ii; + std::vector<std::thread> threads; + auto transpose_size = _nchans * _nsamples * _nfreq * _ngroups; + char* o_data = new char[transpose_size]; + try + { + for(ii=0; ii< _numbeams; ii++) + { + threads.emplace_back(std::thread([&, ii]() + { + RawBytes transpose(o_data,std::size_t(transpose_size),std::size_t(0)); + transpose::do_transpose(transpose, block, _nchans, _nsamples, _nfreq, ii, _numbeams, _ngroups); + transpose.used_bytes(transpose.total_bytes()); + (*_handler[ii])(transpose); + } + )); + + } + + for (ii=0; ii< _numbeams; ii++) + { + threads[ii].join(); + } + + delete [] o_data; + } + + catch(...) + { + delete [] o_data; + BOOST_LOG_TRIVIAL(debug) << "Unknown exception caught"; + } + return false; + } + + template <class HandlerType> + void TransposeToDada<HandlerType>::set_nchans(const int nchans) + { + _nchans = nchans; + } + + template <class HandlerType> + void TransposeToDada<HandlerType>::set_nbeams(const int nbeams) + { + _numbeams = nbeams; + } + + template <class HandlerType> + void TransposeToDada<HandlerType>::set_ngroups(const int ngroups) + { + _ngroups = ngroups; + } + + template <class HandlerType> + void TransposeToDada<HandlerType>::set_nsamples(const int nsamples) + { + _nsamples = nsamples; + } + + template <class HandlerType> + void TransposeToDada<HandlerType>::set_nfreq(const int nfreq) + { + _nfreq = nfreq; + } + + template <class HandlerType> + std::uint32_t TransposeToDada<HandlerType>::nchans() + { + return _nchans; + } + + template <class HandlerType> + std::uint32_t TransposeToDada<HandlerType>::nsamples() + { + return _nsamples; + } + + template <class HandlerType> + std::uint32_t TransposeToDada<HandlerType>::nfreq() + { + return _nfreq; + } + + template <class HandlerType> + std::uint32_t TransposeToDada<HandlerType>::nbeams() + { + return _numbeams; + } + + template <class HandlerType> + std::uint32_t TransposeToDada<HandlerType>::ngroups() + { + return _ngroups; + } + +} //tuse +} //meerkat +} //psrdada_cpp diff --git a/psrdada_cpp/meerkat/tuse/src/transpose_to_dada.cpp b/psrdada_cpp/meerkat/tuse/src/transpose_to_dada.cpp new file mode 100644 index 0000000000000000000000000000000000000000..ad256aa4f7f981e2c76a871b619505a4c0533955 --- /dev/null +++ b/psrdada_cpp/meerkat/tuse/src/transpose_to_dada.cpp @@ -0,0 +1,52 @@ +#include "psrdada_cpp/meerkat/tuse/transpose_to_dada.hpp" +#include "psrdada_cpp/cli_utils.hpp" +#include <ctime> +#include <mutex> +#include <iostream> + + +namespace psrdada_cpp { +namespace meerkat { +namespace tuse { + +namespace transpose{ + + /* + * @brief This is the actual block that performs the + * transpose. The format is based on the heap format + * of SPEAD2 packets. This can change in time + */ + std::mutex MyMutex; + void do_transpose(RawBytes& transposed_data, RawBytes& input_data,std::uint32_t nchans, std::uint32_t nsamples, std::uint32_t nfreq, std::uint32_t beamnum, std::uint32_t nbeams, std::uint32_t ngroups) + { + // make copies of arrays to be transposed + size_t tocopy = ngroups * nsamples * nfreq * nchans; + char *tmpindata = new char[tocopy / ngroups]; + char *tmpoutdata = new char[tocopy]; + size_t skipgroup = nchans * nsamples * nfreq * nbeams; + size_t skipbeam = beamnum * nchans * nsamples * nfreq; + size_t skipband = nchans * nsamples; + size_t skipallchans = nchans * nfreq; + // actual transpose + for (unsigned int igroup = 0; igroup < ngroups; ++igroup) + { + memcpy(tmpindata, input_data.ptr() + skipbeam + igroup * skipgroup, tocopy / ngroups); + + for (unsigned int isamp = 0; isamp < nsamples; ++isamp) + { + for (unsigned int iband = 0; iband < nfreq; ++iband) + { + memcpy(tmpoutdata + iband * nchans + isamp * skipallchans + igroup * tocopy/ngroups, + tmpindata + iband * skipband + isamp * nchans, + nchans * sizeof(char)); + } // BAND LOOP + } // SAMPLES LOOP + } // GROUP LOOP + memcpy(transposed_data.ptr(), tmpoutdata, tocopy); + delete [] tmpoutdata; + delete [] tmpindata; + } +} //transpose +} //tuse +} //meerkat +} //psrdada_cpp diff --git a/psrdada_cpp/meerkat/tuse/src/transpose_to_dada_cli.cpp b/psrdada_cpp/meerkat/tuse/src/transpose_to_dada_cli.cpp new file mode 100644 index 0000000000000000000000000000000000000000..a7ac9004cf0afefa95f1b32ac94e01b158bc10ec --- /dev/null +++ b/psrdada_cpp/meerkat/tuse/src/transpose_to_dada_cli.cpp @@ -0,0 +1,124 @@ +#include "psrdada_cpp/meerkat/tuse/transpose_to_dada.hpp" +#include "psrdada_cpp/multilog.hpp" +#include "psrdada_cpp/cli_utils.hpp" +#include "psrdada_cpp/common.hpp" +#include "psrdada_cpp/dada_input_stream.hpp" +#include "psrdada_cpp/dada_output_stream.hpp" +#include "psrdada_cpp/psrdada_to_sigproc_header.hpp" +#include "boost/program_options.hpp" +#include <memory> +#include <fstream> +#include <ctime> + +using namespace psrdada_cpp; + +namespace +{ + const size_t ERROR_IN_COMMAND_LINE = 1; + const size_t SUCCESS = 0; + const size_t ERROR_UNHANDLED_EXCEPTION = 2; +} // namespace + + +int main(int argc, char** argv) +{ + try + { + key_t input_key; + std::uint32_t nchans; + std::uint32_t nsamples; + std::uint32_t nfreq; + std::uint32_t nbeams; + std::uint32_t ngroups; + std::string filename; + std::fstream fkeys; + key_t* output_keys = new key_t[nbeams]; + /** + * Define and parse the program options + */ + namespace po = boost::program_options; + po::options_description desc("Options"); + desc.add_options() + ("help,h", "Print help messages") + ("input_key,i", po::value<std::string>() + ->default_value("dada") + ->notifier([&input_key](std::string in) + { + input_key = string_to_key(in); + }), + "The shared memory key for the input dada buffer to connect to (hex string)") + ("ngroups,g", po::value<std::uint32_t>(&ngroups)->required(), + "Number of heap groups in one DADA block") + ("nbeams,b", po::value<std::uint32_t>(&nbeams)->required(), + "The number of beams in the stream") + + ("key_file,o", po::value<std::string> (&filename)->required(), + "File containing the keys for each ouput dada buffer corresponding to each beam") + + ("nchannels,c", po::value<std::uint32_t>(&nchans)->required(), + "The number of frequency channels per heap in the stream") + ("nsamples,s", po::value<std::uint32_t>(&nsamples)->required(), + "The number of time samples per heap in the stream") + ("nfreq,f", po::value<std::uint32_t>(&nfreq)->required(), + "The number of frequency subbands in the stream"); + + /* Catch Error and program description */ + po::variables_map vm; + try + { + po::store(po::parse_command_line(argc, argv, desc), vm); + if ( vm.count("help") ) + { + std::cout << "Transpose2Dada -- read MeerKAT beamformed dada from DADA buffer, transpose per beam and write to an output DADA buffer" + << std::endl << desc << std::endl; + return SUCCESS; + } + po::notify(vm); + } + catch(po::error& e) + { + std::cerr << "ERROR: " << e.what() << std::endl << std::endl; + std::cerr << desc << std::endl; + return ERROR_IN_COMMAND_LINE; + } + + /* Open file to parse all values to the key_t object*/ + fkeys.open(filename,std::fstream::in); + std::uint32_t ii; + for (ii=0; ii < nbeams; ii++) + { + std::string key; + std::getline(fkeys,key); + output_keys[ii] = string_to_key(key); + } + fkeys.close(); + /* Application Code */ + MultiLog log("outstream"); + /* Setting up the pipeline based on the type of sink*/ + std::vector<std::shared_ptr<DadaOutputStream>> outstreams; + for (ii=0 ; ii < nbeams; ++ii) + { + outstreams.emplace_back(std::make_shared<DadaOutputStream>(output_keys[ii],log)); + } + + meerkat::tuse::TransposeToDada<DadaOutputStream> transpose(nbeams,std::move(outstreams)); + transpose.set_nsamples(nsamples); + transpose.set_nchans(nchans); + transpose.set_nfreq(nfreq); + transpose.set_ngroups(ngroups); + transpose.set_nbeams(nbeams); + PsrDadaToSigprocHeader<decltype(transpose)> ptos(transpose); + MultiLog log1("instream"); + DadaInputStream<decltype(ptos)> input(input_key,log1,ptos); + input.start(); + /* End Application Code */ + } + catch(std::exception& e) + { + std::cerr << "Unhandled Exception reached the top of main: " + << e.what() << ", application will now exit" << std::endl; + return ERROR_UNHANDLED_EXCEPTION; + } + return SUCCESS; + +} diff --git a/psrdada_cpp/meerkat/tuse/src/transpose_to_file_cli.cpp b/psrdada_cpp/meerkat/tuse/src/transpose_to_file_cli.cpp new file mode 100644 index 0000000000000000000000000000000000000000..eed5c4288f6e96730d276e44fc76fdcc9dbbbd69 --- /dev/null +++ b/psrdada_cpp/meerkat/tuse/src/transpose_to_file_cli.cpp @@ -0,0 +1,115 @@ +#include "psrdada_cpp/meerkat/tuse/transpose_to_dada.hpp" +#include "psrdada_cpp/multilog.hpp" +#include "psrdada_cpp/cli_utils.hpp" +#include "psrdada_cpp/common.hpp" +#include "psrdada_cpp/dada_input_stream.hpp" +#include "psrdada_cpp/simple_file_writer.hpp" +#include "psrdada_cpp/psrdada_to_sigproc_header.hpp" +#include "boost/program_options.hpp" +#include <memory> +#include <fstream> +#include <ctime> + +using namespace psrdada_cpp; + +namespace +{ + const size_t ERROR_IN_COMMAND_LINE = 1; + const size_t SUCCESS = 0; + const size_t ERROR_UNHANDLED_EXCEPTION = 2; +} // namespace + + +int main(int argc, char** argv) +{ + try + { + key_t input_key; + std::uint32_t nchans; + std::uint32_t nsamples; + std::uint32_t nfreq; + std::uint32_t nbeams; + std::uint32_t ngroups; + std::string filename; + /* + * Define and parse the program options + */ + namespace po = boost::program_options; + po::options_description desc("Options"); + desc.add_options() + ("help,h", "Print help messages") + ("input_key,i", po::value<std::string>() + ->default_value("dada") + ->notifier([&input_key](std::string in) + { + input_key = string_to_key(in); + }), + "The shared memory key for the input dada buffer to connect to (hex string)") + ("ngroups,g", po::value<std::uint32_t>(&ngroups)->required(), + "Number of heap groups in one DADA block") + ("nbeams,b", po::value<std::uint32_t>(&nbeams)->required(), + "The number of beams in the stream") + ("nchannels,c", po::value<std::uint32_t>(&nchans)->required(), + "The number of frequency channels per packet in the stream") + ("nsamples,s", po::value<std::uint32_t>(&nsamples)->required(), + "The number of time samples per heap in the stream") + ("nfreq,f", po::value<std::uint32_t>(&nfreq)->required(), + "The number of frequency blocks in the stream"); + + /* Catch Error and program description */ + po::variables_map vm; + try + { + po::store(po::parse_command_line(argc, argv, desc), vm); + if ( vm.count("help") ) + { + std::cout << "Transpose2files -- read MeerKAT beamformed dada from DADA buffer, transpose per beam and write to an output file" + << std::endl << desc << std::endl; + return SUCCESS; + } + po::notify(vm); + } + catch(po::error& e) + { + std::cerr << "ERROR: " << e.what() << std::endl << std::endl; + std::cerr << desc << std::endl; + return ERROR_IN_COMMAND_LINE; + } + + + /* Application Code */ + + MultiLog log("outstream"); + + /* Setting up the pipeline based on the type of sink*/ + + + std::uint32_t ii; + std::vector<std::shared_ptr<SimpleFileWriter>> files; + for (ii=0; ii < nbeams; ++ii) + { + std::string filename = "beam" + std::to_string(ii) + ".fil"; + files.emplace_back(std::make_shared<SimpleFileWriter>(filename)); + } + meerkat::tuse::TransposeToDada<SimpleFileWriter> transpose(nbeams,std::move(files)); + transpose.set_nsamples(nsamples); + transpose.set_nchans(nchans); + transpose.set_nfreq(nfreq); + transpose.set_ngroups(ngroups); + transpose.set_nbeams(nbeams); + PsrDadaToSigprocHeader<decltype(transpose)> ptos(transpose); + MultiLog log1("instream"); + DadaInputStream<decltype(ptos)> input(input_key,log1,ptos); + input.start(); + /* End Application Code */ + + } + catch(std::exception& e) + { + std::cerr << "Unhandled Exception reached the top of main: " + << e.what() << ", application will now exit" << std::endl; + return ERROR_UNHANDLED_EXCEPTION; + } + return SUCCESS; + +} diff --git a/psrdada_cpp/meerkat/tuse/src/transpose_to_null_cli.cpp b/psrdada_cpp/meerkat/tuse/src/transpose_to_null_cli.cpp new file mode 100644 index 0000000000000000000000000000000000000000..7e67792c9d811d458040ed5b2772a9a626ca5804 --- /dev/null +++ b/psrdada_cpp/meerkat/tuse/src/transpose_to_null_cli.cpp @@ -0,0 +1,125 @@ +#include "psrdada_cpp/meerkat/tuse/transpose_to_dada.hpp" +#include "psrdada_cpp/multilog.hpp" +#include "psrdada_cpp/cli_utils.hpp" +#include "psrdada_cpp/common.hpp" +#include "psrdada_cpp/dada_input_stream.hpp" +#include "psrdada_cpp/dada_null_sink.hpp" +#include "psrdada_cpp/psrdada_to_sigproc_header.hpp" +#include "boost/program_options.hpp" +#include <memory> +#include <fstream> +#include <dlfcn.h> +#include <unistd.h> +#include <csignal> +#include <ctime> + +using namespace psrdada_cpp; + +namespace +{ + const size_t ERROR_IN_COMMAND_LINE = 1; + const size_t SUCCESS = 0; + const size_t ERROR_UNHANDLED_EXCEPTION = 2; +} // namespace + + +namespace detail +{ + void SignalHandler(int signum) + { + exit(signum); + } +} + +int main(int argc, char** argv) +{ + try + { + key_t input_key; + std::uint32_t nchans; + std::uint32_t nsamples; + std::uint32_t nfreq; + std::uint32_t nbeams; + std::uint32_t ngroups; + /** + * Define and parse the program options + */ + namespace po = boost::program_options; + po::options_description desc("Options"); + desc.add_options() + ("help,h", "Print help messages") + ("input_key,i", po::value<std::string>() + ->default_value("dada") + ->notifier([&input_key](std::string in) + { + input_key = string_to_key(in); + }), + "The shared memory key for the input dada buffer to connect to (hex string)") + ("ngroups,g", po::value<std::uint32_t>(&ngroups)->required(), + "Number of heap groups in one DADA block") + ("nbeams,b", po::value<std::uint32_t>(&nbeams)->required(), + "The number of beams in the stream") + ("nchannels,c", po::value<std::uint32_t>(&nchans)->required(), + "The number of frequency channels per packet in the stream") + ("nsamples,s", po::value<std::uint32_t>(&nsamples)->required(), + "The number of time samples per heap in the stream") + ("nfreq,f", po::value<std::uint32_t>(&nfreq)->required(), + "The number of frequency blocks in the stream"); + + /* Catch Error and program description */ + po::variables_map vm; + try + { + po::store(po::parse_command_line(argc, argv, desc), vm); + if ( vm.count("help") ) + { + std::cout << "Transpose2sink -- read MeerKAT beamformed dada from DADA buffer, transpose per beam and write to a sink" + << std::endl << desc << std::endl; + return SUCCESS; + } + po::notify(vm); + } + catch(po::error& e) + { + std::cerr << "ERROR: " << e.what() << std::endl << std::endl; + std::cerr << desc << std::endl; + return ERROR_IN_COMMAND_LINE; + } + + + /* Application Code */ + + std::signal(SIGINT,detail::SignalHandler); + MultiLog log("outstream"); + + /* Setting up the pipeline based on the type of sink*/ + std::vector<std::shared_ptr<NullSink>> nullsinks; + std::uint32_t ii; + for (ii=0; ii < nbeams; ++ii) + { + nullsinks.emplace_back(std::make_shared<NullSink>()); + } + + meerkat::tuse::TransposeToDada<NullSink> transpose(nbeams,std::move(nullsinks)); + transpose.set_nsamples(nsamples); + transpose.set_nchans(nchans); + transpose.set_nfreq(nfreq); + transpose.set_ngroups(ngroups); + transpose.set_nbeams(nbeams); + PsrDadaToSigprocHeader<decltype(transpose)> ptos(transpose); + MultiLog log1("instream"); + DadaInputStream<decltype(ptos)> input(input_key,log1,ptos); + + input.start(); + + /* End Application Code */ + + } + catch(std::exception& e) + { + std::cerr << "Unhandled Exception reached the top of main: " + << e.what() << ", application will now exit" << std::endl; + return ERROR_UNHANDLED_EXCEPTION; + } + return SUCCESS; +} diff --git a/psrdada_cpp/meerkat/tuse/test/CMakeLists.txt b/psrdada_cpp/meerkat/tuse/test/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..39beec58427a171f5d4226a27aa445737fd73ce0 --- /dev/null +++ b/psrdada_cpp/meerkat/tuse/test/CMakeLists.txt @@ -0,0 +1,11 @@ +include_directories(${GTEST_INCLUDE_DIR}) +link_directories(${GTEST_LIBRARY_DIR}) + +set( + gtest_tuse_src + +) + +#cuda_add_executable(gtest_fbfuse ${gtest_tuse_src} ) +#target_link_libraries(gtest_tuse ${PSRDADA_CPP_MEERKAT_TUSE_LIBRARIES}) +#add_test(gtest_tuse gtest_tuse --test_data "${CMAKE_CURRENT_LIST_DIR}/data") diff --git a/psrdada_cpp/meerkat/tuse/transpose_to_dada.hpp b/psrdada_cpp/meerkat/tuse/transpose_to_dada.hpp new file mode 100644 index 0000000000000000000000000000000000000000..d2013a58ca230ca3a1fc19f5a0f35b985fce8004 --- /dev/null +++ b/psrdada_cpp/meerkat/tuse/transpose_to_dada.hpp @@ -0,0 +1,124 @@ +#ifndef PSRDADA_CPP_TRANSPOSE_TO_DADA_HPP +#define PSRDADA_CPP_TRANSPOSE_TO_DADA_HPP + +#include "psrdada_cpp/raw_bytes.hpp" +#include "psrdada_cpp/common.hpp" + +namespace psrdada_cpp { +namespace meerkat { +namespace tuse { + +namespace transpose{ + + /** + * @brief the method that does the actual transpose + */ + void do_transpose(RawBytes& transposed_data, RawBytes& input_data, std::uint32_t nchans, std::uint32_t nsamples, std::uint32_t nfreq, std::uint32_t beamnum, std::uint32_t numbeams, std::uint32_t ngroups); +} + +template <class HandlerType> +class TransposeToDada +{ + +public: + TransposeToDada(std::size_t numbeams, std::vector<std::shared_ptr<HandlerType>> handler); + ~TransposeToDada(); + + /** + * @brief A transpose method to be called on connection + * to a ring buffer. + * + * @detail The number of beams to process and a vector of + * shared pointers to open DADA blocks are given + * as arguments. The transpose is performed on + * a beam to beam basis. + * + * @param block A RawBytes object wrapping a DADA header buffer + */ + void init(RawBytes& block); + + /** + * @brief A callback to be called on acqusition of a new + * data block. + * + * @param block A RawBytes object wrapping a DADA data buffer + */ + bool operator()(RawBytes& block); + + + /** + *@brief: Setter for number of beams + */ + + void set_nbeams(const int nbeams); + + /** + *@brief: Setter for ngroups + */ + + void set_ngroups(const int ngroups); + + /** + * @brief Setter for frequency channels + */ + + void set_nchans(const int nchans); + + /** + * @brief Setter of number of time samples + */ + + void set_nsamples(const int nsamples); + + /** + * @brief Setter for number of frequency blocks + */ + + void set_nfreq(const int _nfreq); + + /** + * @brief getter for number of channels + */ + + std::uint32_t nchans(); + + /** + * @brief getter for number of time samples + */ + + std::uint32_t nsamples(); + + /** + * @brief getter for frequency blocks + */ + + std::uint32_t nfreq(); + + /** + *@brief: getter for ngroups + */ + + std::uint32_t ngroups(); + + /** + *@brief: getter for nbeams + */ + + std::uint32_t nbeams(); + +private: + std::uint32_t _numbeams; + std::vector<std::shared_ptr<HandlerType>> _handler; + std::uint32_t _nchans; + std::uint32_t _nsamples; + std::uint32_t _nfreq; + std::uint32_t _ngroups; + +}; + +} // namespace tuse +} // namespace meerkat +} // namespace psrdada_cpp + +#include "psrdada_cpp/meerkat/tuse/detail/transpose_to_dada.cpp" +#endif //PSRDADA_CPP_TRANSPOSE_TO_DADA_HPP diff --git a/psrdada_cpp/psrdada_to_sigproc_header.hpp b/psrdada_cpp/psrdada_to_sigproc_header.hpp new file mode 100644 index 0000000000000000000000000000000000000000..624024b35546ae6d598487118d3924b5a220a5da --- /dev/null +++ b/psrdada_cpp/psrdada_to_sigproc_header.hpp @@ -0,0 +1,49 @@ +#ifndef PSRDADA_CPP_PSRDADA_TO_SIGPROC_HEADER_HPP +#define PSRDADA_CPP_PSRDADA_TO_SIGPROC_HEADER_HPP + +#include "psrdada_cpp/raw_bytes.hpp" +#include "psrdada_cpp/common.hpp" + +namespace psrdada_cpp { + + +template <class HandlerType> +class PsrDadaToSigprocHeader +{ + +public: + PsrDadaToSigprocHeader(HandlerType& handler); + ~PsrDadaToSigprocHeader(); + + /** + * @brief A header manipulation method for PSRDADA and SIGPROC + * + * + * @detail A class that converts the PSRDADA header to a SIGPROC + * header before writing it out to the header block of the + * DADA buffer. This conversion is needed so that the down + * stream pipeline can handle the header format. + * + * @param block A RawBytes object wrapping a DADA header buffer + */ + void init(RawBytes& block); + + /** + * @brief A callback to be called on acqusition of a new + * data block. + * + * @param block A RawBytes object wrapping a DADA data buffer + */ + bool operator()(RawBytes& block); + + +private: + HandlerType _handler; + +}; + + +} //psrdada_cpp + +#include "psrdada_cpp/detail/psrdada_to_sigproc_header.cpp" +#endif //PSRDADA_CPP_PSRDADA_TO_SIGPROC_HEADER_HPP diff --git a/psrdada_cpp/psrdadaheader.hpp b/psrdada_cpp/psrdadaheader.hpp new file mode 100644 index 0000000000000000000000000000000000000000..04dc5ff6bc06a2704c5dadf4299119ea90e5fe61 --- /dev/null +++ b/psrdada_cpp/psrdadaheader.hpp @@ -0,0 +1,130 @@ +#ifndef PSRDADA_CPP_PSRDADAHEADER_HPP +#define PSRDADA_CPP_PSRDADAHEADER_HPP + +/* + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + + +#pragma once +#include <string> +#include <iostream> +#include <fstream> +#include <sstream> +#include <vector> +#include <stdlib.h> +#include"psrdada_cpp/raw_bytes.hpp" + +#define DADA_HDR_SIZE 4096L + +/* + * @ detail: PsrDada header storage class + * Reads the header block of DADA buffer and stores + * all the necessary metadata to private members + * which can be accessed via getters + */ + +namespace psrdada_cpp +{ +class PsrDadaHeader +{ +public: + + PsrDadaHeader(); + ~PsrDadaHeader(); + + /** + * @brief: Get values from given key words + */ + void from_bytes(RawBytes& block); + + /** + * @ All getters for Sigproc header + */ + std::uint32_t bw(); + + double freq(); + + std::uint32_t nbits(); + + double tsamp(); + + std::string ra(); + + std::string dec(); + + std::string telescope(); + + std::string instrument(); + + std::string source_name(); + + double tstart(); + + std::uint32_t nchans(); + + /** + * @brief: All the setters + */ + void set_bw(std::uint32_t bw); + + void set_freq(double freq); + + void set_nbits(std::uint32_t nbits); + + void set_tsamp(double tsamp); + + void set_ra(std::string ra); + + void set_dec(std::string dec); + + void set_telescope(std::string telescope); + + void set_instrument(std::string instrument); + + void set_source(std::string source); + + void set_tstart(double tstart); + + void set_nchans(std::uint32_t nchans); + + /** + * @brief: returns the value of the key word based on the keyword + */ + std::string get_value(std::string name, std::stringstream& header); + +private: + + /** + * @brief All standard PSRDADA header parameters (can add/subtract + * if needed) + */ + std::uint32_t _bw; + double _freq; + std::uint32_t _nchans; + std::uint32_t _ndim; + std::uint32_t _npol; + std::uint32_t _nbits; + double _tsamp; + std::string _source_name; + std::string _ra; + std::string _dec; + std::string _telescope; + std::string _instrument; + double _mjd; + +}; +} // namespace psrdada_cpp + +#endif // PSRDADA_CPP_PSRDADAHEADER_HPP diff --git a/psrdada_cpp/raw_bytes.hpp b/psrdada_cpp/raw_bytes.hpp index 266c75bc26c0b2ee52f09f52e163895de9be9781..6e12209b1d35300502e425e660f354374c251d38 100644 --- a/psrdada_cpp/raw_bytes.hpp +++ b/psrdada_cpp/raw_bytes.hpp @@ -69,4 +69,4 @@ namespace psrdada_cpp { }; } //namespace psrdada_cpp -#endif //PSRDADA_CPP_RAW_BYTES_HPP \ No newline at end of file +#endif //PSRDADA_CPP_RAW_BYTES_HPP diff --git a/psrdada_cpp/sigprocheader.hpp b/psrdada_cpp/sigprocheader.hpp new file mode 100644 index 0000000000000000000000000000000000000000..5965a7d44d38a8ecf4548b86798aa36981bf7fba --- /dev/null +++ b/psrdada_cpp/sigprocheader.hpp @@ -0,0 +1,48 @@ +#ifndef PSRDADA_CPP_SIGPROCHEADER_HPP +#define PSRDADA_CPP_SIGPROCHEADER_HPP + +#include <string> +#include <iostream> +#include <fstream> +#include <sstream> +#include <vector> +#include <stdlib.h> +#include"psrdada_cpp/psrdadaheader.hpp" +#include"psrdada_cpp/raw_bytes.hpp" + +/* @detail: A SigProc Header writer class. This class will parse values + * from a PSRDADA header object and write that out as a standard + * SigProc format. This is specific for PSRDADA stream. + */ + +namespace psrdada_cpp +{ + +class SigprocHeader +{ +public: + SigprocHeader(); + ~SigprocHeader(); + void write_header(RawBytes& block,PsrDadaHeader ph); + + std::size_t header_size() const; + +private: + std::size_t _header_size; + /* + * @brief write string to the header + */ + void header_write(char*& ptr, std::string const& str); + void header_write(char*& ptr, std::string const& str, std::string const& name); + + /* + * @brief write a value to the stream + */ + template<typename NumericT> + void header_write(char*& ptr, std::string const& name, NumericT val); + +}; + +} // namespace psrdada_cpp +#include "psrdada_cpp/detail/sigprocheader.cpp" +#endif //PSRDADA_CPP_SIGPROCHEADER_HPP diff --git a/psrdada_cpp/simple_file_writer.hpp b/psrdada_cpp/simple_file_writer.hpp index 476950db780629c49876ac4a7b9ba70f88f262ba..7c65da1c9d64eb3b16b9844d78a3b3d4b261d103 100644 --- a/psrdada_cpp/simple_file_writer.hpp +++ b/psrdada_cpp/simple_file_writer.hpp @@ -15,6 +15,7 @@ namespace psrdada_cpp { SimpleFileWriter(SimpleFileWriter const&) = delete; ~SimpleFileWriter(); void init(RawBytes&); + void init(RawBytes&, std::size_t); bool operator()(RawBytes&); private: diff --git a/psrdada_cpp/src/dada_write_client.cpp b/psrdada_cpp/src/dada_write_client.cpp index c6d2b3e77407353343db77b58865c74719358f4b..c978fc533d6392d3da5f6bade1ad64b422ef494e 100644 --- a/psrdada_cpp/src/dada_write_client.cpp +++ b/psrdada_cpp/src/dada_write_client.cpp @@ -101,7 +101,7 @@ namespace psrdada_cpp { _parent._log.write(LOG_ERR, "Could not mark filled header block\n"); throw std::runtime_error("Could not mark filled header block"); } - _current_block.reset(nullptr); + _current_block.reset(); } DadaWriteClient::DataStream::DataStream(DadaWriteClient& parent) @@ -151,7 +151,7 @@ namespace psrdada_cpp { _parent._log.write(LOG_ERR, "close_buffer: ipcio_close_block_write failed\n"); throw std::runtime_error("Could not close ipcio data block"); } - _current_block.reset(nullptr); + _current_block.reset(); } } diff --git a/psrdada_cpp/src/psrdadaheader.cpp b/psrdada_cpp/src/psrdadaheader.cpp new file mode 100644 index 0000000000000000000000000000000000000000..f09b4be1bbc2c2fda26d5a2921a9911617b72060 --- /dev/null +++ b/psrdada_cpp/src/psrdadaheader.cpp @@ -0,0 +1,163 @@ +#include"psrdada_cpp/psrdadaheader.hpp" +#include <iostream> +#include <chrono> + + + +namespace psrdada_cpp{ + +PsrDadaHeader::PsrDadaHeader() +{ +} + +PsrDadaHeader::~PsrDadaHeader() +{ +} + +void PsrDadaHeader::from_bytes(RawBytes& block) +{ + std::vector<char> buf(DADA_HDR_SIZE); + std::copy(block.ptr(),block.ptr()+block.total_bytes(),buf.begin()); + std::stringstream header; + header.rdbuf()->pubsetbuf(&buf[0],DADA_HDR_SIZE); + set_bw(atoi(get_value("BW ",header).c_str())); + set_freq(atof(get_value("FREQ ",header).c_str())); + set_nchans(atoi(get_value("NCHAN ",header).c_str())); + set_nbits(atoi(get_value("NBIT ",header).c_str())); + set_tsamp(atof(get_value("TSAMP ",header).c_str())); + set_source(get_value("SOURCE ",header)); + set_ra(get_value("RA ",header)); + set_dec(get_value("DEC ",header)); + set_telescope(get_value("TELESCOPE ",header)); + set_instrument(get_value("INSTRUMENT ",header)); + set_tstart(atof(get_value("MJD ",header).c_str())); + return; +} + +std::string PsrDadaHeader::get_value(std::string name,std::stringstream& header) +{ + size_t position = header.str().find(name); + if (position!=std::string::npos) + { + header.seekg(position+name.length()); + std::string value; + header >> value; + return value; + } + else + { + return ""; + } +} + +std::uint32_t PsrDadaHeader::bw() +{ + return _bw; +} + +double PsrDadaHeader::freq() +{ + return _freq; +} + +std::uint32_t PsrDadaHeader::nbits() +{ + return _nbits; +} + +double PsrDadaHeader::tsamp() +{ + return _tsamp; +} + +std::string PsrDadaHeader::ra() +{ + return _ra; +} + +std::string PsrDadaHeader::dec() +{ + return _dec; +} + +std::string PsrDadaHeader::telescope() +{ + return _telescope; +} + +std::string PsrDadaHeader::instrument() +{ + return _instrument; +} + +std::string PsrDadaHeader::source_name() +{ + return _source_name; +} + +std::uint32_t PsrDadaHeader::nchans() +{ + return _nchans; +} + +double PsrDadaHeader::tstart() +{ + return _mjd; +} + +void PsrDadaHeader::set_bw(std::uint32_t bw) +{ + _bw = bw; +} + +void PsrDadaHeader::set_freq(double freq) +{ + _freq=freq; +} + +void PsrDadaHeader::set_nbits(std::uint32_t nbits) +{ + _nbits=nbits; +} + +void PsrDadaHeader::set_tsamp(double tsamp) +{ + _tsamp = tsamp; +} + +void PsrDadaHeader::set_ra(std::string ra) +{ + _ra.assign(ra); +} + +void PsrDadaHeader::set_dec(std::string dec) +{ + _dec.assign(dec); +} + +void PsrDadaHeader::set_telescope(std::string telescope) +{ + _telescope = telescope; +} + +void PsrDadaHeader::set_instrument(std::string instrument) +{ + _instrument = instrument; +} + +void PsrDadaHeader::set_source(std::string source) +{ + _source_name=source; +} + +void PsrDadaHeader::set_tstart(double tstart) +{ + _mjd = tstart; +} + +void PsrDadaHeader::set_nchans(std::uint32_t nchans) +{ + _nchans=nchans; +} + +} // namespace psrdada_cpp diff --git a/psrdada_cpp/src/raw_bytes.cpp b/psrdada_cpp/src/raw_bytes.cpp index aeea23592f87775c6ad753a323beb5c8eb470c73..3a8a4812960b0ca79a39643fa33dd01a3e27c609 100644 --- a/psrdada_cpp/src/raw_bytes.cpp +++ b/psrdada_cpp/src/raw_bytes.cpp @@ -39,4 +39,4 @@ namespace psrdada_cpp { return _on_device; } -} //namespace psrdada_cpp \ No newline at end of file +} //namespace psrdada_cpp diff --git a/psrdada_cpp/src/sigprocheader.cpp b/psrdada_cpp/src/sigprocheader.cpp new file mode 100644 index 0000000000000000000000000000000000000000..cc6e266f5257a7dca7f4d1c262b202cbb0335d75 --- /dev/null +++ b/psrdada_cpp/src/sigprocheader.cpp @@ -0,0 +1,74 @@ +#include "psrdada_cpp/sigprocheader.hpp" +#include "psrdada_cpp/psrdadaheader.hpp" +#include <string> +#include <chrono> +#include <boost/algorithm/string.hpp> +#include <iterator> + +namespace psrdada_cpp +{ + SigprocHeader::SigprocHeader() + : + _header_size(0) + { + } + + SigprocHeader::~SigprocHeader() + { + } + + void SigprocHeader::header_write(char*& ptr, std::string const& str) + { + int len = str.size(); + std::memcpy(ptr,(char*)&len,sizeof(len)); + ptr += sizeof(len); + std::copy(str.begin(),str.end(),ptr); + ptr += len; + } + + void SigprocHeader::header_write(char*& ptr, std::string const& str, std::string const& name) + { + header_write(ptr,str); + header_write(ptr,name); + } + + void SigprocHeader::write_header(RawBytes& block, PsrDadaHeader ph) + { + auto ptr = block.ptr(); + header_write(ptr,"HEADER_START"); + header_write<std::uint32_t>(ptr,"telescope_id",0); + header_write<std::uint32_t>(ptr,"machine_id",11); + header_write<std::uint32_t>(ptr,"data_type",1); + header_write<std::uint32_t>(ptr,"barycentric",0); + header_write(ptr,"source_name",ph.source_name()); + // RA DEC + auto ra_val = ph.ra(); + auto dec_val =ph.dec(); + std::vector<std::string> ra_s; + std::vector<std::string> dec_s; + boost::split(ra_s,ra_val,boost::is_any_of(":")); + boost::split(dec_s,dec_val,boost::is_any_of(":")); + double ra = stod(boost::join(ra_s,"")); + double dec = stod(boost::join(dec_s,"")); + header_write<double>(ptr,"src_raj",ra); + header_write<double>(ptr, "src_dej",dec); + header_write<std::uint32_t>(ptr,"nbits",ph.nbits()); + header_write<std::uint32_t>(ptr,"nifs",1); + header_write<std::uint32_t>(ptr,"nchans",ph.nchans()); + header_write<double>(ptr,"fch1", ph.freq()); + header_write<double>(ptr,"foff",ph.bw()/ph.nchans()); + header_write<double>(ptr,"tstart",ph.tstart()); + header_write<double>(ptr,"tsamp",ph.tsamp()); + header_write(ptr,"HEADER_END"); + _header_size = std::distance(block.ptr(),ptr); + } + + std::size_t SigprocHeader::header_size() const + { + return _header_size; + } + + +} // namespace psrdada_cpp + + diff --git a/psrdada_cpp/src/simple_file_writer.cpp b/psrdada_cpp/src/simple_file_writer.cpp index 03d59e75c16d5ec7a0cf8a5f986a94f6552d85b0..bdca2979f6bb8fa056c6e2b70b68787f2bf7c123 100644 --- a/psrdada_cpp/src/simple_file_writer.cpp +++ b/psrdada_cpp/src/simple_file_writer.cpp @@ -27,6 +27,11 @@ namespace psrdada_cpp { _outfile.write(block.ptr(), 4096); } + void SimpleFileWriter::init(RawBytes& block, std::size_t size) + { + _outfile.write(block.ptr(), size); + } + bool SimpleFileWriter::operator()(RawBytes& block) { _outfile.write(block.ptr(), block.used_bytes()); @@ -36,4 +41,4 @@ namespace psrdada_cpp { return false; } -} //psrdada_cpp \ No newline at end of file +} //psrdada_cpp