diff --git a/cmake/compiler_settings.cmake b/cmake/compiler_settings.cmake index 1db13939c9eaa6316c60d95217a6865fdf186c29..3ce03c95ed0346b6159601536c88c108aac67915 100644 --- a/cmake/compiler_settings.cmake +++ b/cmake/compiler_settings.cmake @@ -31,9 +31,9 @@ endif () set(CMAKE_CXX_FLAGS_RELEASE "-O3 -DNDEBUG -Wno-unused-local-typedefs") set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-O2 -g -Wall -Wextra -pedantic -Wno-unused-local-typedefs") -set(CMAKE_CXX_FLAGS_DEBUG "-O0 -g -Wall -Wextra -pedantic -Wno-unused-local-typedefs") +set(CMAKE_CXX_FLAGS_DEBUG "-O3 -g -pg -Wall -Wextra -pedantic -Wno-unused-local-typedefs") # Set include directories for dependencies include_directories( ${Boost_INCLUDE_DIRS} -) \ No newline at end of file +) diff --git a/psrdada_cpp/CMakeLists.txt b/psrdada_cpp/CMakeLists.txt index 3fb48db8931fe7f6c6e59cfe2a16986cbf76676c..a1dfaeca9b15f8603f606d5f4f7433a8ecd0a71b 100644 --- a/psrdada_cpp/CMakeLists.txt +++ b/psrdada_cpp/CMakeLists.txt @@ -56,6 +56,10 @@ target_link_libraries (junkdb ${PSRDADA_CPP_LIBRARIES}) add_executable(syncdb examples/syncdb.cpp) target_link_libraries (syncdb ${PSRDADA_CPP_LIBRARIES}) +#fbfuse_output_db +add_executable(fbfuse_output_db examples/fbfuse_output_db.cpp) +target_link_libraries (fbfuse_output_db ${PSRDADA_CPP_LIBRARIES}) + #transpose_to_dada_cli add_executable(transpose_to_dada_cli src/transpose_to_dada_cli.cpp) target_link_libraries (transpose_to_dada_cli ${PSRDADA_CPP_LIBRARIES}) @@ -71,7 +75,7 @@ target_link_libraries (transpose_to_null_cli ${PSRDADA_CPP_LIBRARIES}) #dbnull add_executable(dbnull examples/dbnull.cpp) target_link_libraries (dbnull ${PSRDADA_CPP_LIBRARIES}) -install (TARGETS junkdb dbnull syncdb transpose_to_dada_cli DESTINATION bin) +install (TARGETS junkdb dbnull syncdb fbfuse_output_db transpose_to_dada_cli DESTINATION bin) #install (FILES MathFunctions.h DESTINATION include) diff --git a/psrdada_cpp/detail/transpose_to_dada.cpp b/psrdada_cpp/detail/transpose_to_dada.cpp index a2b2982eeab59cacab6e738d689dcf690d8ed9f8..ebde458e2263f5a8023e6dbeb187b4e0edf73c4e 100644 --- a/psrdada_cpp/detail/transpose_to_dada.cpp +++ b/psrdada_cpp/detail/transpose_to_dada.cpp @@ -36,24 +36,26 @@ namespace psrdada_cpp { { std::uint32_t ii; - //std::vector<std::thread> threads; - char* o_data = new char[_nchans*_nsamples*_ntime*_nfreq*_ngroups]; + std::vector<std::thread> threads; + auto transpose_size = _nchans * _nsamples * _ntime * _nfreq * _ngroups; + char* o_data = new char[transpose_size]; for(ii=0; ii< _numbeams; ii++) { - //threads.emplace_back(std::thread([&]() - //{ - RawBytes transpose(o_data,std::size_t(_nchans*_nsamples*_ntime*_nfreq*_ngroups),std::size_t(0)); - transpose::do_transpose(transpose,block,_nchans,_nsamples,_ntime,_nfreq,ii,_numbeams,_ngroups); - transpose.used_bytes(transpose.total_bytes()); - (*_handler[ii])(transpose); - //})); + threads.emplace_back(std::thread([&, ii]() + { + RawBytes transpose(o_data,std::size_t(transpose_size),std::size_t(0)); + transpose::do_transpose(transpose, block, _nchans, _nsamples, _ntime, _nfreq, ii, _numbeams, _ngroups); + transpose.used_bytes(transpose.total_bytes()); + (*_handler[ii])(transpose); + } + )); } - /*for (ii=0; ii< _numbeams; ii++) + for (ii=0; ii< _numbeams; ii++) { threads[ii].join(); - }*/ + } return false; } diff --git a/psrdada_cpp/examples/fbfuse_output_db.cpp b/psrdada_cpp/examples/fbfuse_output_db.cpp new file mode 100644 index 0000000000000000000000000000000000000000..ffe0fca763157b77cf9776404a9922f139d74edc --- /dev/null +++ b/psrdada_cpp/examples/fbfuse_output_db.cpp @@ -0,0 +1,207 @@ +#include "psrdada_cpp/multilog.hpp" +#include "psrdada_cpp/raw_bytes.hpp" +#include "psrdada_cpp/dada_output_stream.hpp" +#include "psrdada_cpp/dada_sync_source.hpp" +#include "psrdada_cpp/cli_utils.hpp" + +#include "boost/program_options.hpp" + +#include <sys/types.h> +#include <iostream> +#include <string> +#include <sstream> +#include <ios> +#include <algorithm> + +using namespace psrdada_cpp; + +namespace +{ + const size_t ERROR_IN_COMMAND_LINE = 1; + const size_t SUCCESS = 0; + const size_t ERROR_UNHANDLED_EXCEPTION = 2; +} // namespace + +template<class HandlerType> +class FakeBeams +{ + +public: + FakeBeams(HandlerType& handler, std::size_t size); + ~FakeBeams(); + + void init(RawBytes& block); + + bool operator()(RawBytes& block); + + void beam_num_start(std::uint8_t beam_num_start); + + void beam_num_end(std::uint8_t beam_num_end); + +private: + HandlerType& _handler; + std::uint8_t _beam_num; + std::uint8_t _beam_num_start; + std::uint8_t _beam_num_end; + std::size_t _size; +}; + +template<class HandlerType> +FakeBeams<HandlerType>::FakeBeams(HandlerType& handler, std::size_t size): +_handler(handler), +_beam_num(1), +_beam_num_start(1), +_beam_num_end(6), +_size(size) +{ +} + +template<class HandlerType> +FakeBeams<HandlerType>::~FakeBeams() +{ +} + +template<class HandlerType> +void FakeBeams<HandlerType>::init(RawBytes& block) +{ + _handler.init(block); +} + +template<class HandlerType> +void FakeBeams<HandlerType>::beam_num_start(std::uint8_t beam_num_start) +{ + _beam_num_start = beam_num_start; +} + +template<class HandlerType> +void FakeBeams<HandlerType>::beam_num_end(std::uint8_t beam_num_end) +{ + _beam_num_end = beam_num_end; +} + +template<class HandlerType> +bool FakeBeams<HandlerType>::operator()(RawBytes& block) +{ + _beam_num = _beam_num_start; + char* ptr = block.ptr(); + std::size_t bytes_written = 0 ; + while (bytes_written <= block.total_bytes()) + { + std::vector<char> beam_data(_size,_beam_num); + std::copy(beam_data.begin(),beam_data.end(),ptr); + ptr += static_cast<int>(_size); + bytes_written = bytes_written + _size; + ++_beam_num; + if (_beam_num > _beam_num_end) + _beam_num = _beam_num_start; + } + block.used_bytes(block.total_bytes()); + _handler(block); + return false; +} + + + +int main(int argc, char** argv) +{ + try + { + std::size_t nbytes = 0; + key_t key; + std::time_t sync_epoch; + double period; + std::size_t ts_per_block; + std::uint8_t beam_num_start; + std::uint8_t beam_num_end; + std::size_t write_size; + /** Define and parse the program options + */ + namespace po = boost::program_options; + po::options_description desc("Options"); + desc.add_options() + ("help,h", "Print help messages") + ("nbytes,n", po::value<std::size_t>(&nbytes) + ->default_value(0), + "Total number of bytes to write") + ("key,k", po::value<std::string>() + ->default_value("dada") + ->notifier([&key](std::string in) + { + key = string_to_key(in); + }), + "The shared memory key for the dada buffer to connect to (hex string)") + ("sync_epoch,s", po::value<std::size_t>() + ->default_value(0) + ->notifier([&sync_epoch](std::size_t in) + { + sync_epoch = static_cast<std::time_t>(in); + }), + "The global sync time for all producing instances") + ("period,p", po::value<double>(&period) + ->default_value(1.0), + "The period (in seconds) at which dada blocks are produced") + ("ts_per_block,t", po::value<std::size_t>(&ts_per_block) + ->default_value(8192*128), + "The increment in timestamp between consecutive blocks") + ("beam_start,b", po::value<std::uint8_t>(&beam_num_start) + ->default_value(1), + "Starting beam id for the heap") + ("beam_end,e", po::value<std::uint8_t>(&beam_num_end) + ->default_value(1), + "Last beam id for the heap") + ("write_size, w", po::value<std::size_t>(&write_size) + ->default_value(0), + "bytes to write per cycle. Should be equal to the heap size.") + ("log_level", po::value<std::string>() + ->default_value("info") + ->notifier([](std::string level) + { + set_log_level(level); + }), + "The logging level to use (debug, info, warning, error)"); + po::variables_map vm; + try + { + po::store(po::parse_command_line(argc, argv, desc), vm); + if ( vm.count("help") ) + { + std::cout << "SyncDB -- write 1 into a DADA ring buffer at a synchronised and fixed rate" << std::endl + << desc << std::endl; + return SUCCESS; + } + po::notify(vm); + } + catch(po::error& e) + { + std::cerr << "ERROR: " << e.what() << std::endl << std::endl; + std::cerr << desc << std::endl; + return ERROR_IN_COMMAND_LINE; + } + + /** + * All the application code goes here + */ + + MultiLog log("syncdb"); + DadaOutputStream out_stream(key, log); + FakeBeams<decltype(out_stream)> fakebeams(out_stream, write_size); + fakebeams.beam_num_start(beam_num_start); + fakebeams.beam_num_end(beam_num_end); + sync_source<decltype(fakebeams)>( + fakebeams, out_stream.client().header_buffer_size(), + out_stream.client().data_buffer_size(), nbytes, + sync_epoch, period, ts_per_block); + + /** + * End of application code + */ + } + catch(std::exception& e) + { + std::cerr << "Unhandled Exception reached the top of main: " + << e.what() << ", application will now exit" << std::endl; + return ERROR_UNHANDLED_EXCEPTION; + } + return SUCCESS; + +} diff --git a/psrdada_cpp/sigprocheader.hpp b/psrdada_cpp/sigprocheader.hpp index 85cc59239dff21f9e5ccf28b7d95b68fd42a747e..7f482a53ca39d4785bff1e2577f50a63b7b7474f 100644 --- a/psrdada_cpp/sigprocheader.hpp +++ b/psrdada_cpp/sigprocheader.hpp @@ -41,9 +41,11 @@ class SigprocHeader SigprocHeader(); ~SigprocHeader(); void write_header(RawBytes& block,PsrDadaHeader ph); - + + std::size_t header_size() const; private: + std::size_t _header_size; /* * @brief write string to the header */ diff --git a/psrdada_cpp/simple_file_writer.hpp b/psrdada_cpp/simple_file_writer.hpp index 476950db780629c49876ac4a7b9ba70f88f262ba..7c65da1c9d64eb3b16b9844d78a3b3d4b261d103 100644 --- a/psrdada_cpp/simple_file_writer.hpp +++ b/psrdada_cpp/simple_file_writer.hpp @@ -15,6 +15,7 @@ namespace psrdada_cpp { SimpleFileWriter(SimpleFileWriter const&) = delete; ~SimpleFileWriter(); void init(RawBytes&); + void init(RawBytes&, std::size_t); bool operator()(RawBytes&); private: diff --git a/psrdada_cpp/src/sigprocheader.cpp b/psrdada_cpp/src/sigprocheader.cpp index 26ff40d4eff110de7568ae4df44a7df3c3dce757..cc6e266f5257a7dca7f4d1c262b202cbb0335d75 100644 --- a/psrdada_cpp/src/sigprocheader.cpp +++ b/psrdada_cpp/src/sigprocheader.cpp @@ -3,11 +3,13 @@ #include <string> #include <chrono> #include <boost/algorithm/string.hpp> - +#include <iterator> namespace psrdada_cpp { SigprocHeader::SigprocHeader() + : + _header_size(0) { } @@ -58,6 +60,12 @@ namespace psrdada_cpp header_write<double>(ptr,"tstart",ph.tstart()); header_write<double>(ptr,"tsamp",ph.tsamp()); header_write(ptr,"HEADER_END"); + _header_size = std::distance(block.ptr(),ptr); + } + + std::size_t SigprocHeader::header_size() const + { + return _header_size; } diff --git a/psrdada_cpp/src/simple_file_writer.cpp b/psrdada_cpp/src/simple_file_writer.cpp index 4c438c99df90f7599a8d42c4d5ef2bbece50cfde..5c3709efea7d06350047770bfc1712b03ade836f 100644 --- a/psrdada_cpp/src/simple_file_writer.cpp +++ b/psrdada_cpp/src/simple_file_writer.cpp @@ -27,6 +27,11 @@ namespace psrdada_cpp { _outfile.write(block.ptr(), 4096); } + void SimpleFileWriter::init(RawBytes& block, std::size_t size) + { + _outfile.write(block.ptr(), size); + } + bool SimpleFileWriter::operator()(RawBytes& block) { _outfile.write(block.ptr(), block.used_bytes()); @@ -35,4 +40,4 @@ namespace psrdada_cpp { return false; } -} //psrdada_cpp \ No newline at end of file +} //psrdada_cpp diff --git a/psrdada_cpp/src/transpose_to_dada.cpp b/psrdada_cpp/src/transpose_to_dada.cpp index 7aac894a5096f6c94cadf5a130313aa5bd155f3e..5b8d24e94532cac948748e8bc9e6c7aaa945a5c8 100644 --- a/psrdada_cpp/src/transpose_to_dada.cpp +++ b/psrdada_cpp/src/transpose_to_dada.cpp @@ -18,7 +18,7 @@ namespace transpose{ void do_transpose(RawBytes& transposed_data, RawBytes& input_data,std::uint32_t nchans, std::uint32_t nsamples, std::uint32_t ntime, std::uint32_t nfreq, std::uint32_t beamnum, std::uint32_t nbeams, std::uint32_t ngroups) { - std::lock_guard<std::mutex> guard(MyMutex); + //std::lock_guard<std::mutex> guard(MyMutex); size_t tocopy = ngroups * nsamples * ntime * nfreq * nchans; char *tmpindata = new char[tocopy / ngroups];