Skip to content
Snippets Groups Projects
Commit 90deed77 authored by Ewan Barr's avatar Ewan Barr
Browse files

merged transpose

parents 70213c03 98fdc191
Branches
Tags
No related merge requests found
Showing
with 600 additions and 22 deletions
......@@ -31,7 +31,7 @@ endif ()
set(CMAKE_CXX_FLAGS_RELEASE "-O3 -DNDEBUG -Wno-unused-local-typedefs")
set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-O2 -g -Wall -Wextra -pedantic -Wno-unused-local-typedefs")
set(CMAKE_CXX_FLAGS_DEBUG "-O0 -g -Wall -Wextra -pedantic -Wno-unused-local-typedefs")
set(CMAKE_CXX_FLAGS_DEBUG "-O3 -g -pg -Wall -Wextra -pedantic -Wno-unused-local-typedefs")
# Set include directories for dependencies
include_directories(
......
......@@ -75,7 +75,7 @@ ENV PACKAGES $PSRDADA_BUILD
WORKDIR $PSRHOME
RUN git clone https://github.com/ewanbarr/psrdada_cpp.git && \
cd psrdada_cpp/ &&\
git checkout fbfuse &&\
git checkout transpose &&\
git submodule init &&\
git submodule update &&\
mkdir build/ &&\
......
......@@ -17,19 +17,27 @@ set(psrdada_cpp_src
src/multilog.cpp
src/raw_bytes.cpp
src/simple_file_writer.cpp
src/sigprocheader.cpp
src/psrdadaheader.cpp
)
set(psrdada_cpp_inc
cli_utils.hpp
dada_client_base.hpp
dada_io_loop.hpp
dada_io_loop_writer.hpp
dada_input_stream.hpp
dada_output_stream.hpp
dada_read_client.hpp
dada_junk_source.hpp
dada_null_sink.hpp
multilog.hpp
common.hpp
dada_io_loop_reader.hpp
dada_write_client.hpp
raw_bytes.hpp
cuda_utils.hpp
simple_file_writer.hpp
sigprocheader.hpp
psrdadaheader.hpp
psrdada_to_sigproc_header.hpp
)
# -- the main library target
......@@ -39,6 +47,14 @@ add_library(${CMAKE_PROJECT_NAME} ${psrdada_cpp_src})
add_executable(junkdb examples/junkdb.cpp)
target_link_libraries (junkdb ${PSRDADA_CPP_LIBRARIES})
#syncdb
add_executable(syncdb examples/syncdb.cpp)
target_link_libraries (syncdb ${PSRDADA_CPP_LIBRARIES})
#fbfuse_output_db
add_executable(fbfuse_output_db examples/fbfuse_output_db.cpp)
target_link_libraries (fbfuse_output_db ${PSRDADA_CPP_LIBRARIES})
#dbnull
add_executable(dbnull examples/dbnull.cpp)
target_link_libraries (dbnull ${PSRDADA_CPP_LIBRARIES})
......@@ -47,9 +63,14 @@ target_link_libraries (dbnull ${PSRDADA_CPP_LIBRARIES})
add_executable(dbdisk examples/dbdisk.cpp)
target_link_libraries (dbnull ${PSRDADA_CPP_LIBRARIES})
install (TARGETS junkdb dbnull dbdisk DESTINATION bin)
#include subdirs
add_subdirectory(meerkat)
add_subdirectory(effelsberg)
install (TARGETS junkdb dbnull dbdisk syncdb fbfuse_output_db DESTINATION bin)
install (TARGETS ${CMAKE_PROJECT_NAME}
RUNTIME DESTINATION bin
LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib)
install(FILES ${psrdada_cpp_inc} DESTINATION include/psrdada_cpp)
add_subdirectory(meerkat)
......@@ -4,6 +4,8 @@
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/common.hpp"
#include <vector>
#include <cstdlib>
#include <fstream>
namespace psrdada_cpp
{
......@@ -11,13 +13,23 @@ namespace psrdada_cpp
void junk_source(Handler& handler,
std::size_t header_size,
std::size_t nbytes_per_write,
std::size_t total_bytes)
std::size_t total_bytes,
std::string header_file)
{
std::vector<char> junk_header(header_size);
std::vector<char> junk_block(nbytes_per_write);
std::ifstream input;
input.open(header_file,std::ios::in | std::ios::binary);
input.read(junk_header.data(),header_size);
RawBytes header(junk_header.data(),header_size,header_size,false);
handler.init(header);
std::size_t bytes_written = 0;
std::vector<char> junk_block(total_bytes,0);
srand(0);
for (std::uint32_t ii=0; ii < total_bytes; ++ii)
{
junk_block[ii] = rand()%255 + 1;
}
while (bytes_written < total_bytes)
{
RawBytes data(junk_block.data(),nbytes_per_write,nbytes_per_write,false);
......
#ifndef PSRDADA_CPP_DADA_SYNC_SOURCE_HPP
#define PSRDADA_CPP_DADA_SYNC_SOURCE_HPP
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/common.hpp"
#include "ascii_header.h"
#include <vector>
#include <chrono>
#include <iomanip> // std::put_time
#include <thread> // std::this_thread::sleep_until
#include <chrono> // std::chrono::system_clock
#include <ctime>
#include <cmath>
namespace psrdada_cpp
{
template <class Handler>
void sync_source(Handler& handler,
std::size_t header_size,
std::size_t nbytes_per_write,
std::size_t total_bytes,
std::time_t sync_epoch,
double block_duration,
std::size_t ts_per_block)
{
std::vector<char> sync_header(header_size, 0);
std::vector<char> sync_block(nbytes_per_write, 1);
RawBytes header(sync_header.data(), header_size, header_size, false);
RawBytes data(sync_block.data(), nbytes_per_write, nbytes_per_write, false);
std::size_t sample_clock_start;
auto sync_epoch_tp = std::chrono::system_clock::from_time_t(sync_epoch);
auto curr_epoch_tp = std::chrono::system_clock::now();
auto diff = std::chrono::duration_cast<std::chrono::microseconds>(curr_epoch_tp - sync_epoch_tp).count();
std::size_t next_block_idx;
if (sync_epoch_tp > curr_epoch_tp)
{
BOOST_LOG_TRIVIAL(info) << "The sync epoch is " << static_cast<float>(diff)/1e6
<< " seconds in the future";
next_block_idx = 0;
sample_clock_start = 0;
}
else
{
next_block_idx = static_cast<std::size_t>((std::ceil(static_cast<double>(diff)/1e6) / block_duration));
sample_clock_start = next_block_idx * ts_per_block;
}
BOOST_LOG_TRIVIAL(info) << "Setting SAMPLE_CLOCK_START to " << sample_clock_start;
ascii_header_set(sync_header.data(), "SAMPLE_CLOCK_START", "%ld", sample_clock_start);
handler.init(header);
std::size_t bytes_written = 0;
bool infinite = (total_bytes == 0);
while (true)
{
auto epoch_of_wait = sync_epoch_tp + std::chrono::duration<double>(next_block_idx * block_duration);
std::this_thread::sleep_until(epoch_of_wait);
handler(data);
bytes_written += data.used_bytes();
++next_block_idx;
if (!infinite && bytes_written >= total_bytes)
{
break;
}
}
}
} //namespace psrdada_cpp
#endif //PSRDADA_CPP_DADA_SYNC_SOURCE_HPP
#include "psrdada_cpp/psrdada_to_sigproc_header.hpp"
#include "psrdada_cpp/psrdadaheader.hpp"
#include "psrdada_cpp/sigprocheader.hpp"
#include "psrdada_cpp/cli_utils.hpp"
#include <thread>
namespace psrdada_cpp {
template <class HandlerType>
PsrDadaToSigprocHeader<HandlerType>::PsrDadaToSigprocHeader(HandlerType& handler)
: _handler(handler)
{
}
template <class HandlerType>
PsrDadaToSigprocHeader<HandlerType>::~PsrDadaToSigprocHeader()
{
}
template <class HandlerType>
void PsrDadaToSigprocHeader<HandlerType>::init(RawBytes& block)
{
SigprocHeader h;
PsrDadaHeader ph;
ph.from_bytes(block);
std::memset(block.ptr(), 0, block.total_bytes());
h.write_header(block,ph);
block.used_bytes(block.total_bytes());
_handler.init(block);
}
template <class HandlerType>
bool PsrDadaToSigprocHeader<HandlerType>::operator()(RawBytes& block)
{
_handler(block);
return false;
}
} //psrdada_cpp
#include "psrdada_cpp/sigprocheader.hpp"
#include "psrdada_cpp/psrdadaheader.hpp"
#include <string>
namespace psrdada_cpp
{
template<typename NumericT>
void SigprocHeader::header_write(char*& ptr, std::string const& name, NumericT val)
{
header_write(ptr,name);
std::memcpy(ptr,(char*)&val,sizeof(val));
ptr += sizeof(val);
}
} // namespace psrdada_cpp
#include "psrdada_cpp/transpose_client.hpp"
#include<boost/process.hpp>
{ //namespace psrdada_cpp
namespace bp = boost::process;
int main()
{
// Make a large dadabuffer
{
std::string key = "dada";
boost::filesystem::path p = "/home/krajwade/libpsrdada/bin/dada_db";
bp::child c0(p, "-k", key);
co.wait();
//Write Data into a dada buffer
{
std::vector<unsigned char> input_data(64*128*64*32*6);
std::string logname = "Logger";
Multilog log(logname);
DadaWriteClient writer0(string_to_key(key),log);
in = input_data.data();
auto stream = writer0.data_stream();
auto out = stream.next();
std::memcpy(out.ptr(),in,input_data.size());
out.used_bytes(input_data.size());
stream.release();
writer0::~DadaWriteClient();
std::string* keys[];
*(keys) = "dadc";
*(keys + 1) = "dade"
*(keys + 2) = "dae0"
*(keys + 3) = "dae2"
*(keys + 4) = "dae4"
*(keys + 5) = "dae6"
// Running the psrdada to create 6 buffers
bp::child c1(p, "-k", *keys[0]);
bp::child c2(p, "-k", *keys[1]);
bp::child c3(p, "-k", *keys[2]);
bp::child c4(p, "-k", *keys[3]);
bp::child c5(p, "-k", *keys[4]);
bp::child c6(p, "-k", *keys[5]);
c1.wait();
c2.wait();
c3.wait();
c4.wait();
c5.wait();
c6.wait();
// Read to transpose
RawBytes& input;
RawBytes& transposed, transposed1,transposed2,transposed3,transposed4,transposed5;
DadaReadClient reader(,log);
DadaWriteClient* writer1[];
TransposeClient transpose(reader,writer1[],keys,6);
input = transpose.read_to_transpose();
// Do the transpose for all beams
transposed = transpose.do_transpose(input,0);
transposed1 = transpose.do_transpose(input,1);
transposed2 = transpose.do_transpose(input,2);
transposed3 = transpose.do_transpose(input,3);
transposed4 = transpose.do_transpose(input,4);
transposed5 = transpose.do_transpose(input,5);
// Writing out each beam to 6 dada buffers as 6 threads
std::thread t1(transpose.write_transpose(),std::ref(transposed),std::ref(*writer1[0]));
std::thread t2(transpose.write_transpose(),std::ref(transposed1),std::ref(*writer1[1]));
std::thread t3(transpose.write_transpose(),std::ref(transposed2),std::ref(*writer1[2]));
std::thread t4(transpose.write_transpose(),std::ref(transposed3),std::ref(*writer1[3]));
std::thread t5(transpose.write_transpose(),std::ref(transposed4),std::ref(*writer1[4]));
std::thread t6(transpose.write_transpose(),std::ref(transposed5),std::ref(*writer1[5]));
t1.join();
t2.join();
t3.join();
t4.join();
t5.join();
t6.join();
}
// Destroy the buffers
}
}
}
#include "psrdada_cpp/multilog.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/dada_output_stream.hpp"
#include "psrdada_cpp/dada_sync_source.hpp"
#include "psrdada_cpp/cli_utils.hpp"
#include "boost/program_options.hpp"
#include <sys/types.h>
#include <iostream>
#include <string>
#include <sstream>
#include <ios>
#include <algorithm>
using namespace psrdada_cpp;
namespace
{
const size_t ERROR_IN_COMMAND_LINE = 1;
const size_t SUCCESS = 0;
const size_t ERROR_UNHANDLED_EXCEPTION = 2;
} // namespace
template<class HandlerType>
class FakeBeams
{
public:
FakeBeams(HandlerType& handler, std::size_t size);
~FakeBeams();
void init(RawBytes& block);
bool operator()(RawBytes& block);
void beam_num_start(std::uint8_t beam_num_start);
void beam_num_end(std::uint8_t beam_num_end);
private:
HandlerType& _handler;
std::uint8_t _beam_num;
std::uint8_t _beam_num_start;
std::uint8_t _beam_num_end;
std::size_t _size;
};
template<class HandlerType>
FakeBeams<HandlerType>::FakeBeams(HandlerType& handler, std::size_t size):
_handler(handler),
_beam_num(1),
_beam_num_start(1),
_beam_num_end(6),
_size(size)
{
}
template<class HandlerType>
FakeBeams<HandlerType>::~FakeBeams()
{
}
template<class HandlerType>
void FakeBeams<HandlerType>::init(RawBytes& block)
{
_handler.init(block);
}
template<class HandlerType>
void FakeBeams<HandlerType>::beam_num_start(std::uint8_t beam_num_start)
{
_beam_num_start = beam_num_start;
}
template<class HandlerType>
void FakeBeams<HandlerType>::beam_num_end(std::uint8_t beam_num_end)
{
_beam_num_end = beam_num_end;
}
template<class HandlerType>
bool FakeBeams<HandlerType>::operator()(RawBytes& block)
{
_beam_num = _beam_num_start;
char* ptr = block.ptr();
std::size_t bytes_written = 0 ;
while (bytes_written <= block.total_bytes())
{
std::vector<char> beam_data(_size,_beam_num);
std::copy(beam_data.begin(),beam_data.end(),ptr);
ptr += _size;
bytes_written = bytes_written + _size;
++_beam_num;
if (_beam_num > _beam_num_end)
_beam_num = _beam_num_start;
}
block.used_bytes(block.total_bytes());
_handler(block);
return false;
}
int main(int argc, char** argv)
{
try
{
std::size_t nbytes = 0;
key_t key;
std::time_t sync_epoch;
double period;
std::size_t ts_per_block;
std::uint8_t beam_num_start;
std::uint8_t beam_num_end;
std::size_t write_size;
/** Define and parse the program options
*/
namespace po = boost::program_options;
po::options_description desc("Options");
desc.add_options()
("help,h", "Print help messages")
("nbytes,n", po::value<std::size_t>(&nbytes)
->default_value(0),
"Total number of bytes to write")
("key,k", po::value<std::string>()
->default_value("dada")
->notifier([&key](std::string in)
{
key = string_to_key(in);
}),
"The shared memory key for the dada buffer to connect to (hex string)")
("sync_epoch,s", po::value<std::size_t>()
->default_value(0)
->notifier([&sync_epoch](std::size_t in)
{
sync_epoch = static_cast<std::time_t>(in);
}),
"The global sync time for all producing instances")
("period,p", po::value<double>(&period)
->default_value(1.0),
"The period (in seconds) at which dada blocks are produced")
("ts_per_block,t", po::value<std::size_t>(&ts_per_block)
->default_value(8192*128),
"The increment in timestamp between consecutive blocks")
("beam_start,b", po::value<std::uint8_t>(&beam_num_start)
->default_value(1),
"Starting beam id for the heap")
("beam_end,e", po::value<std::uint8_t>(&beam_num_end)
->default_value(1),
"Last beam id for the heap")
("write_size, w", po::value<std::size_t>(&write_size)
->default_value(0),
"bytes to write per cycle. Should be equal to the heap size.")
("log_level", po::value<std::string>()
->default_value("info")
->notifier([](std::string level)
{
set_log_level(level);
}),
"The logging level to use (debug, info, warning, error)");
po::variables_map vm;
try
{
po::store(po::parse_command_line(argc, argv, desc), vm);
if ( vm.count("help") )
{
std::cout << "SyncDB -- write 1 into a DADA ring buffer at a synchronised and fixed rate" << std::endl
<< desc << std::endl;
return SUCCESS;
}
po::notify(vm);
}
catch(po::error& e)
{
std::cerr << "ERROR: " << e.what() << std::endl << std::endl;
std::cerr << desc << std::endl;
return ERROR_IN_COMMAND_LINE;
}
/**
* All the application code goes here
*/
MultiLog log("syncdb");
DadaOutputStream out_stream(key, log);
FakeBeams<decltype(out_stream)> fakebeams(out_stream, write_size);
fakebeams.beam_num_start(beam_num_start);
fakebeams.beam_num_end(beam_num_end);
sync_source<decltype(fakebeams)>(
fakebeams, out_stream.client().header_buffer_size(),
out_stream.client().data_buffer_size(), nbytes,
sync_epoch, period, ts_per_block);
/**
* End of application code
*/
}
catch(std::exception& e)
{
std::cerr << "Unhandled Exception reached the top of main: "
<< e.what() << ", application will now exit" << std::endl;
return ERROR_UNHANDLED_EXCEPTION;
}
return SUCCESS;
}
......@@ -28,6 +28,7 @@ int main(int argc, char** argv)
{
std::size_t nbytes = 0;
key_t key;
std::string header_file;
/** Define and parse the program options
*/
namespace po = boost::program_options;
......@@ -37,6 +38,8 @@ int main(int argc, char** argv)
("nbytes,n", po::value<std::size_t>(&nbytes)
->default_value(0),
"Total number of bytes to write")
("header_file,f", po::value<std::string>(&header_file),
"Header file to write header block")
("key,k", po::value<std::string>()
->default_value("dada")
->notifier([&key](std::string in)
......@@ -78,7 +81,7 @@ int main(int argc, char** argv)
DadaOutputStream out_stream(key, log);
junk_source<decltype(out_stream)>(
out_stream, out_stream.client().header_buffer_size(),
out_stream.client().data_buffer_size(), nbytes);
out_stream.client().data_buffer_size(), nbytes, header_file);
/**
* End of application code
......
#include "psrdada_cpp/multilog.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/dada_output_stream.hpp"
#include "psrdada_cpp/dada_sync_source.hpp"
#include "psrdada_cpp/cli_utils.hpp"
#include "boost/program_options.hpp"
#include <sys/types.h>
#include <iostream>
#include <string>
#include <sstream>
#include <ios>
#include <algorithm>
using namespace psrdada_cpp;
namespace
{
const size_t ERROR_IN_COMMAND_LINE = 1;
const size_t SUCCESS = 0;
const size_t ERROR_UNHANDLED_EXCEPTION = 2;
} // namespace
int main(int argc, char** argv)
{
try
{
std::size_t nbytes = 0;
key_t key;
std::time_t sync_epoch;
double period;
std::size_t ts_per_block;
/** Define and parse the program options
*/
namespace po = boost::program_options;
po::options_description desc("Options");
desc.add_options()
("help,h", "Print help messages")
("nbytes,n", po::value<std::size_t>(&nbytes)
->default_value(0),
"Total number of bytes to write")
("key,k", po::value<std::string>()
->default_value("dada")
->notifier([&key](std::string in)
{
key = string_to_key(in);
}),
"The shared memory key for the dada buffer to connect to (hex string)")
("sync_epoch,s", po::value<std::size_t>()
->default_value(0)
->notifier([&sync_epoch](std::size_t in)
{
sync_epoch = static_cast<std::time_t>(in);
}),
"The global sync time for all producing instances")
("period,p", po::value<double>(&period)
->default_value(1.0),
"The period (in seconds) at which dada blocks are produced")
("ts_per_block,t", po::value<std::size_t>(&ts_per_block)
->default_value(8192*128),
"The increment in timestamp between consecutive blocks")
("log_level", po::value<std::string>()
->default_value("info")
->notifier([](std::string level)
{
set_log_level(level);
}),
"The logging level to use (debug, info, warning, error)");
po::variables_map vm;
try
{
po::store(po::parse_command_line(argc, argv, desc), vm);
if ( vm.count("help") )
{
std::cout << "SyncDB -- write 1 into a DADA ring buffer at a synchronised and fixed rate" << std::endl
<< desc << std::endl;
return SUCCESS;
}
po::notify(vm);
}
catch(po::error& e)
{
std::cerr << "ERROR: " << e.what() << std::endl << std::endl;
std::cerr << desc << std::endl;
return ERROR_IN_COMMAND_LINE;
}
/**
* All the application code goes here
*/
MultiLog log("syncdb");
DadaOutputStream out_stream(key, log);
sync_source<decltype(out_stream)>(
out_stream, out_stream.client().header_buffer_size(),
out_stream.client().data_buffer_size(), nbytes,
sync_epoch, period, ts_per_block);
/**
* End of application code
*/
}
catch(std::exception& e)
{
std::cerr << "Unhandled Exception reached the top of main: "
<< e.what() << ", application will now exit" << std::endl;
return ERROR_UNHANDLED_EXCEPTION;
}
return SUCCESS;
}
#add_subdirectory(tools)
add_subdirectory(fbfuse)
add_subdirectory(tuse)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment