Commit e852bb6d authored by Jason Wu's avatar Jason Wu
Browse files

clean up and moved dbdiskleap to effelsberg/edd

parent ad85223c
Pipeline #92179 failed with stages
in 8 seconds
......@@ -9,7 +9,6 @@ set(psrdada_cpp_src
src/dada_client_base.cpp
src/dada_db.cpp
src/dada_disk_sink.cpp
src/dada_disk_sink_leap.cpp
src/dada_null_sink.cpp
src/dada_output_stream.cpp
src/dada_read_client.cpp
......@@ -29,7 +28,6 @@ set(psrdada_cpp_inc
dada_client_base.hpp
dada_db.hpp
dada_disk_sink.hpp
dada_disk_sink_leap.hpp
dada_input_stream.hpp
dada_junk_source.hpp
dada_null_sink.hpp
......@@ -78,15 +76,10 @@ add_executable(dbreset examples/dbreset.cpp)
target_link_libraries (dbreset ${PSRDADA_CPP_LIBRARIES})
#dbdisk
add_executable(dbdisk examples/dbdisk.cpp)
target_link_libraries (dbdisk ${PSRDADA_CPP_LIBRARIES})
#add_executable(dbdisk examples/dbdisk.cpp)
#target_link_libraries (dbdisk ${PSRDADA_CPP_LIBRARIES})
#dbdiskleap
add_executable(dbdiskleap dbdisk_leap.cpp)
target_link_libraries (dbdiskleap ${PSRDADA_CPP_LIBRARIES})
install (TARGETS junkdb dbnull syncdb dbreset fbfuse_output_db dbdisk dbdiskleap DESTINATION bin)
install (TARGETS junkdb dbnull syncdb dbreset fbfuse_output_db dbdisk DESTINATION bin)
install (TARGETS ${CMAKE_PROJECT_NAME}
RUNTIME DESTINATION bin
LIBRARY DESTINATION lib
......
#ifndef PSRDADA_CPP_DADA_DISK_SINK_LEAP_HPP
#define PSRDADA_CPP_DADA_DISK_SINK_LEAP_HPP
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/dada_write_client.hpp"
#include <fstream>
#include <vector>
namespace psrdada_cpp{
class DiskSinkLeap
{
public:
DiskSinkLeap(std::string prefix, std::size_t nchan);
~DiskSinkLeap();
void init(RawBytes& block);
bool operator()(RawBytes& block);
public:
std::string _prefix;
std::size_t _counter;
std::size_t _nchan;
//RawBytes& _header;
//char _buffer[1024];
char _header[4096];
char _start_time[1024];
bool first_block;
std::vector<char> _transpose;
std::vector<std::ofstream> _output_streams;
};
} //namespace psrdada_cpp
#endif //PSRDADA_CPP__EFFELSBERG_EDD_DADA_DISK_SINK_LEAP_HPP
#include "psrdada_cpp/multilog.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/dada_input_stream.hpp"
#include "psrdada_cpp/dada_disk_sink_leap.hpp"
#include "psrdada_cpp/cli_utils.hpp"
#include "psrdada_cpp/dada_write_client.hpp"
#include "boost/program_options.hpp"
#include <sys/types.h>
#include <iostream>
#include <string>
#include <sstream>
#include <ios>
#include <algorithm>
using namespace psrdada_cpp;
namespace
{
const size_t ERROR_IN_COMMAND_LINE = 1;
const size_t SUCCESS = 0;
const size_t ERROR_UNHANDLED_EXCEPTION = 2;
} // namespace
int main(int argc, char** argv)
{
try
{
key_t key;
std::string prefix;
std::size_t nchan;
/** Define and parse the program options
*/
namespace po = boost::program_options;
po::options_description desc("Options");
desc.add_options()
("help,h", "Print help messages")
("key,k", po::value<std::string>()
->default_value("dada")
->notifier([&key](std::string in)
{
key = string_to_key(in);
}),
"The shared memory key for the dada buffer to connect to (hex string)")
("prefix,p", po::value<std::string>(&prefix)
->default_value("dbdisk_dump"),
"Prefix for the filename to write to")
("nchan,n", po::value<std::size_t>(&nchan)
->default_value(8),
"number of channels to split")
("log_level", po::value<std::string>()
->default_value("info")
->notifier([](std::string level)
{
set_log_level(level);
}),
"The logging level to use (debug, info, warning, error)");
po::variables_map vm;
try
{
po::store(po::parse_command_line(argc, argv, desc), vm);
if ( vm.count("help") )
{
std::cout << "DbDiskLeap -- read from DADA ring buffer and write to disk in LEAP spec" << std::endl
<< desc << std::endl;
return SUCCESS;
}
po::notify(vm);
}
catch(po::error& e)
{
std::cerr << "ERROR: " << e.what() << std::endl << std::endl;
std::cerr << desc << std::endl;
return ERROR_IN_COMMAND_LINE;
}
/**
* All the application code goes here
*/
MultiLog log("dbdixk");
DiskSinkLeap sink(prefix, nchan);
DadaInputStream<decltype(sink)> stream(key, log, sink);
stream.start();
/**
* End of application code
*/
}
catch(std::exception& e)
{
std::cerr << "Unhandled Exception reached the top of main: "
<< e.what() << ", application will now exit" << std::endl;
return ERROR_UNHANDLED_EXCEPTION;
}
return SUCCESS;
}
# OpenMP (optional for shared memory multiprocessing)
# OpenMP (optional for shared memory multiprocessing)
if(ENABLE_CUDA)
......@@ -17,6 +17,7 @@ set(psrdada_cpp_effelsberg_edd_src
src/EDDRoach.cpp
src/EDDRoach_merge.cpp
src/EDDRoach_merge_leap.cpp
src/dada_disk_sink_leap.cpp
src/ScaledTransposeTFtoTFT.cu
src/SKRfiReplacementCuda.cu
src/SpectralKurtosisCuda.cu
......@@ -33,7 +34,8 @@ set(psrdada_cpp_effelsberg_edd_inc
EDDPolnMerge10to8.hpp
EDDRoach.hpp
EDDRoach_merge.hpp
EDDRoach_merge_leap.hpp
EDDRoach_merge_leap.hpp
dada_disk_sink_leap.hpp
FftSpectrometer.cuh
GatedSpectrometer.cuh
Packer.cuh
......@@ -52,8 +54,6 @@ install (TARGETS ${CMAKE_PROJECT_NAME}_effelsberg_edd
LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib)
#simple FFT spectrometer interface
cuda_add_executable(fft_spectrometer src/fft_spectrometer_cli.cu)
target_link_libraries(fft_spectrometer ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES} ${CUDA_CUFFT_LIBRARIES})
......@@ -96,6 +96,10 @@ add_executable(edd_roach_merge_leap src/EDDRoach_merge_leap_cli.cpp)
target_link_libraries(edd_roach_merge_leap ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES})
install(TARGETS edd_roach_merge_leap DESTINATION bin)
#dbdiskleap
add_executable(dbdiskleap dbdisk_leap.cpp)
target_link_libraries (dbdiskleap ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES})
install(TARGETS dbdiskleap DESTINATION bin)
add_subdirectory(test)
endif(ENABLE_CUDA)
......@@ -3,35 +3,31 @@
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/dada_write_client.hpp"
#include <fstream>
#include <vector>
namespace psrdada_cpp{
namespace effelsberg {
namespace edd {
{
class DiskSinkLeap
{
public:
DiskSinkLeap(std::string prefix, DadaWriteClient& writer);
DiskSinkLeap(std::string prefix, std::size_t nchan);
~DiskSinkLeap();
void init(RawBytes&);
bool operator()(RawBytes&);
void init(RawBytes& block);
bool operator()(RawBytes& block);
public:
std::string _prefix;
std::size_t _counter;
DadaWriteClient& _writer;
std::ofstream _current_file;
std::ofstream _current_file_1;
std::ofstream _current_file_2;
std::ofstream _current_file_3;
std::ofstream _current_file_4;
std::ofstream _current_file_5;
std::ofstream _current_file_6;
std::ofstream _current_file_7;
std::size_t _nchan;
char _header[4096];
char _start_time[1024];
bool first_block;
std::vector<char> _transpose;
std::vector<std::ofstream> _output_streams;
};
} // edd
} // effelsberg
} // effelsberg
} //namespace psrdada_cpp
#endif //PSRDADA_CPP__EFFELSBERG_EDD_DADA_DISK_SINK_LEAP_HPP
\ No newline at end of file
#endif //PSRDADA_CPP__EFFELSBERG_EDD_DADA_DISK_SINK_LEAP_HPP
#include "psrdada_cpp/multilog.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/dada_input_stream.hpp"
#include "psrdada_cpp/dada_disk_sink.hpp"
#include "psrdada_cpp/cli_utils.hpp"
#include "boost/program_options.hpp"
#include <sys/types.h>
#include <iostream>
#include <string>
#include <sstream>
#include <ios>
#include <algorithm>
using namespace psrdada_cpp;
namespace
{
const size_t ERROR_IN_COMMAND_LINE = 1;
const size_t SUCCESS = 0;
const size_t ERROR_UNHANDLED_EXCEPTION = 2;
} // namespace
int main(int argc, char** argv)
{
try
{
key_t key;
std::string prefix;
/** Define and parse the program options
*/
namespace po = boost::program_options;
po::options_description desc("Options");
desc.add_options()
("help,h", "Print help messages")
("key,k", po::value<std::string>()
->default_value("dada")
->notifier([&key](std::string in)
{
key = string_to_key(in);
}),
"The shared memory key for the dada buffer to connect to (hex string)")
("prefix,p", po::value<std::string>(&prefix)
->default_value("dbdisk_dump"),
"Prefix for the filename to write to")
("log_level", po::value<std::string>()
->default_value("info")
->notifier([](std::string level)
{
set_log_level(level);
}),
"The logging level to use (debug, info, warning, error)");
po::variables_map vm;
try
{
po::store(po::parse_command_line(argc, argv, desc), vm);
if ( vm.count("help") )
{
std::cout << "DbDiskLeap -- read from DADA ring buffer and write to disk in LEAP spec" << std::endl
<< desc << std::endl;
return SUCCESS;
}
po::notify(vm);
}
catch(po::error& e)
{
std::cerr << "ERROR: " << e.what() << std::endl << std::endl;
std::cerr << desc << std::endl;
return ERROR_IN_COMMAND_LINE;
}
/**
* All the application code goes here
*/
MultiLog log("dbdixk");
DadaWriteClient output(key, log);
DiskSinkLeap sink(prefix, output);
DadaInputStream<decltype(sink)> stream(key, log, sink);
stream.start();
/**
* End of application code
*/
}
catch(std::exception& e)
{
std::cerr << "Unhandled Exception reached the top of main: "
<< e.what() << ", application will now exit" << std::endl;
return ERROR_UNHANDLED_EXCEPTION;
}
return SUCCESS;
}
\ No newline at end of file
......@@ -37,54 +37,41 @@ namespace edd {
}
std::memcpy(oblock.ptr(), block.ptr(), block.used_bytes());
char buffer[1024];
ascii_header_get(block.ptr(), "SAMPLE_CLOCK_START", "%s", buffer);
std::size_t sample_clock_start = std::strtoul(buffer, NULL, 0);
ascii_header_get(block.ptr(), "CLOCK_SAMPLE", "%s", buffer);
long double sample_clock = std::strtold(buffer, NULL);
ascii_header_get(block.ptr(), "SYNC_TIME", "%s", buffer);
long double sync_time = std::strtold(buffer, NULL);
BOOST_LOG_TRIVIAL(debug) << "this is sample_clock_start "<< sample_clock_start;
BOOST_LOG_TRIVIAL(debug)<< "this is sample_clock "<< sample_clock;
BOOST_LOG_TRIVIAL(debug) << "this is sync_time "<< sync_time;
BOOST_LOG_TRIVIAL(debug) << "this is sample_clock_start / sample_clock "<< sample_clock_start / sample_clock;
long double unix_time = sync_time + (sample_clock_start / sample_clock);
long double mjd_time = unix_time / 86400 - 40587.5;
char time_buffer[80];
std::time_t unix_time_int;
struct std::tm * timeinfo;
double fractpart, intpart;
fractpart = std::modf (static_cast<double>(unix_time) , &intpart);
unix_time_int = static_cast<std::time_t>(intpart);
timeinfo = std::gmtime (&unix_time_int);
std::strftime(time_buffer, 80, "%Y-%m-%d-%H:%M:%S", timeinfo);
std::stringstream utc_time_stamp;
BOOST_LOG_TRIVIAL(debug) << "unix_time" << unix_time;
BOOST_LOG_TRIVIAL(debug) << "fractional part " << fractpart;
//BOOST_LOG_TRIVIAL(debug) << "fractional part ." << static_cast<std::size_t>(fractpart*10000000000);
//utc_time_stamp<< time_buffer << "." <<fractpart;
utc_time_stamp<< time_buffer << "." << std::setw(10) << std::setfill('0') << std::size_t(fractpart*10000000000) << std::setfill(' ');
//BOOST_LOG_TRIVIAL(debug) << "fractional part" <<static_cast<std::size_t>(fractpart * 10000000000);
//utc_time_stamp<< time_buffer << "." << static_cast<std::size_t>(fractpart * 10000000000);
BOOST_LOG_TRIVIAL(debug) << "this is start time in utc "<< utc_time_stamp.str().c_str()<< "\n";
// std::cout << "this is sync_time MJD "<< mjd_time<< "\n";
ascii_header_set(oblock.ptr(), "UTC_START", "%s", utc_time_stamp.str().c_str());
ascii_header_set(oblock.ptr(), "UNIX_TIME", "%Lf", unix_time);
oblock.used_bytes(oblock.total_bytes());
_writer.header_stream().release();
}
ascii_header_get(block.ptr(), "SAMPLE_CLOCK_START", "%s", buffer);
std::size_t sample_clock_start = std::strtoul(buffer, NULL, 0);
ascii_header_get(block.ptr(), "CLOCK_SAMPLE", "%s", buffer);
long double sample_clock = std::strtold(buffer, NULL);
ascii_header_get(block.ptr(), "SYNC_TIME", "%s", buffer);
long double sync_time = std::strtold(buffer, NULL);
BOOST_LOG_TRIVIAL(debug) << "Sample_clock_start "<< sample_clock_start;
BOOST_LOG_TRIVIAL(debug)<< "Sample_clock "<< sample_clock;
BOOST_LOG_TRIVIAL(debug) << "Sync_time "<< sync_time;
BOOST_LOG_TRIVIAL(debug) << "Sample_clock_start / sample_clock "<< sample_clock_start / sample_clock;
long double unix_time = sync_time + (sample_clock_start / sample_clock);
long double mjd_time = unix_time / 86400 - 40587.5;
char time_buffer[80];
std::time_t unix_time_int;
struct std::tm * timeinfo;
double fractpart, intpart;
fractpart = std::modf (static_cast<double>(unix_time) , &intpart);
unix_time_int = static_cast<std::time_t>(intpart);
timeinfo = std::gmtime (&unix_time_int);
std::strftime(time_buffer, 80, "%Y-%m-%d-%H:%M:%S", timeinfo);
std::stringstream utc_time_stamp;
BOOST_LOG_TRIVIAL(debug) << "unix_time" << unix_time;
BOOST_LOG_TRIVIAL(debug) << "fractional part " << fractpart;
utc_time_stamp<< time_buffer << "." << std::setw(10) << std::setfill('0') << std::size_t(fractpart*10000000000) << std::setfill(' ');
BOOST_LOG_TRIVIAL(debug) << "this is start time in utc "<< utc_time_stamp.str().c_str()<< "\n";
// std::cout << "this is sync_time MJD "<< mjd_time<< "\n";
ascii_header_set(oblock.ptr(), "UTC_START", "%s", utc_time_stamp.str().c_str());
ascii_header_set(oblock.ptr(), "UNIX_TIME", "%Lf", unix_time);
oblock.used_bytes(oblock.total_bytes());
_writer.header_stream().release();
}
bool EDDPolnMerge::operator()(RawBytes& block)
{
std:size_t nheap_groups = block.used_bytes()/_npol/_nsamps_per_heap;
/**
if (block.used_bytes() < block.total_bytes())
{
BOOST_LOG_TRIVIAL (debug) << "Reach end of data";
_writer.data_stream().next();
_writer.data_stream().release();
return true;
}
**/
RawBytes& oblock = _writer.data_stream().next();
if (block.used_bytes() > oblock.total_bytes())
......@@ -92,10 +79,9 @@ namespace edd {
_writer.data_stream().release();
throw std::runtime_error("Output DADA buffer does not match with the input dada buffer");
}
uint32_t* S0 = reinterpret_cast<uint32_t*>(block.ptr());
uint32_t* S0 = reinterpret_cast<uint32_t*>(block.ptr());
uint32_t* S1 = reinterpret_cast<uint32_t*>(block.ptr() + _nsamps_per_heap);
uint64_t* D = reinterpret_cast<uint64_t*>(oblock.ptr());
uint64_t* D = reinterpret_cast<uint64_t*>(oblock.ptr());
for (std::size_t jj = 0; jj < nheap_groups; ++jj)
{
......@@ -103,9 +89,9 @@ namespace edd {
{
*D++ = interleave(*S1++, *S0++);
}
S0 += _nsamps_per_heap/sizeof(uint32_t);
S1 += _nsamps_per_heap/sizeof(uint32_t);
}
S0 += _nsamps_per_heap/sizeof(uint32_t);
S1 += _nsamps_per_heap/sizeof(uint32_t);
}
oblock.used_bytes(block.used_bytes());
_writer.data_stream().release();
......
......@@ -135,7 +135,7 @@ void handle_packet_numbers_4096x10_s(char const *buf, char *out)
ascii_header_set(oblock.ptr(), "UNIX_TIME", "%Lf", unix_time);
oblock.used_bytes(oblock.total_bytes());
_writer.header_stream().release();
BOOST_LOG_TRIVIAL(info) << "Output header released"<< "\n";
BOOST_LOG_TRIVIAL(info) << "Output header released"<< "\n";
}
bool EDDPolnMerge10to8::operator()(RawBytes& block)
......@@ -153,9 +153,9 @@ void handle_packet_numbers_4096x10_s(char const *buf, char *out)
{
char *buffer_pol0 = block.ptr() + 5120 * kk * 2;
handle_packet_numbers_4096x10_s(buffer_pol0, oblock.ptr() + kk* 2 * 4096);
handle_packet_numbers_4096x10_s(buffer_pol0, oblock.ptr() + kk* 2 * 4096);
char *buffer_pol1 = block.ptr() + (5120 * kk * 2)+ 5120;
handle_packet_numbers_4096x10_s(buffer_pol1, oblock.ptr() + kk* 2 * 4096 + 4096);
handle_packet_numbers_4096x10_s(buffer_pol1, oblock.ptr() + kk* 2 * 4096 + 4096);
}
oblock.used_bytes(block.used_bytes()*0.8);
......
#include "psrdada_cpp/effelsberg/edd/dada_disk_sink_leap.hpp"
#include "ascii_header.h"
#include <chrono>
#include <iostream>
#include <iomanip>
using namespace std;
using namespace std::chrono;
namespace psrdada_cpp{
namespace effelsberg {
namespace edd {
DiskSinkLeap::DiskSinkLeap(std::string prefix, DadaWriteClient& writer)
DiskSinkLeap::DiskSinkLeap(std::string prefix, std::size_t nchan)
: _prefix(prefix)
, _counter(0)
, _writer(writer)
, _output_streams(nchan)
, _nchan(nchan)
{
}
}
DiskSinkLeap::~DiskSinkLeap()
{
}
void DiskSinkLeap::init(RawBytes& block)
{
if (_current_file.is_open())
{
_current_file.close();
}
if (_current_file_1.is_open())
{
_current_file_1.close();
}
if (_current_file_2.is_open())
{
_current_file_2.close();
}
if (_current_file_3.is_open())
{
_current_file_3.close();
}
if (_current_file_4.is_open())
{
_current_file_4.close();
}
if (_current_file_5.is_open())
{
_current_file_5.close();
}
if (_current_file_6.is_open())
{
_current_file_6.close();
}
if (_current_file_7.is_open())
{
_current_file_7.close();
}
std::stringstream fname;
fname << _prefix << "_1340_" << _counter << ".dada";
std::stringstream fname_1;
fname_1 << _prefix << "_1356_" << _counter << ".dada";
std::stringstream fname_2;
fname_2 << _prefix << "_1372_" << _counter << ".dada";
std::stringstream fname_3;
fname_3 << _prefix << "_1388_" << _counter << ".dada";
std::stringstream fname_4;
fname_4 << _prefix << "_1404_" << _counter << ".dada";
std::stringstream fname_5;
fname_5 << _prefix << "_1420_" << _counter << ".dada";
std::stringstream fname_6;
fname_6 << _prefix << "_1436_" << _counter << ".dada";
std::stringstream fname_7;
fname_7 << _prefix << "_1452_" << _counter << ".dada";
_current_file.open(fname.str().c_str(), std::ios::out | std::ios::app | std::ios::binary);
_current_file_1.open(fname_1.str().c_str(), std::ios::out | std::ios::app | std::ios::binary);
_current_file_2.open(fname_2.str().c_str(), std::ios::out | std::ios::app | std::ios::binary);
_current_file_3.open(fname_3.str().c_str(), std::ios::out | std::ios::app | std::ios::binary);
_current_file_4.open(fname_4.str().c_str(), std::ios::out | std::ios::app | std::ios::binary);
_current_file_5.open(fname_5.str().c_str(), std::ios::out | std::ios::app | std::ios::binary);
_current_file_6.open(fname_6.str().c_str(), std::ios::out | std::ios::app | std::ios::binary);
_current_file_7.open(fname_7.str().c_str(), std::ios::out | std::ios::app | std::ios::binary);
size_t heap_size = 32768;
size_t nchan = 8;
size_t nheap_groups = block.used_bytes()/heap_size/nchan;
RawBytes& oblock = _writer.header_stream().next();
if (block.used_bytes() > oblock.total_bytes())
{
_writer.header_stream().release();
throw std::runtime_error("Output DADA buffer does not have enough space for header");
}
std::memcpy(oblock.ptr(), block.ptr(), block.used_bytes());
ascii_header_set(oblock.ptr(), "NCHAN", "%s", "1");
ascii_header_set(oblock.ptr(), "BW", "%s", "16");
ascii_header_set(oblock.ptr(), "FREQ", "%s", "1340");
_current_file.write((char*) oblock.ptr(), block.used_bytes());
ascii_header_set(oblock.ptr(), "FREQ", "%s", "1356");
_current_file_1.write((char*) oblock.ptr(), block.used_bytes());
ascii_header_set(oblock.ptr(), "FREQ", "%s", "1372");
_current_file_2.write((char*) oblock.ptr(), block.used_bytes());
ascii_header_set(oblock.ptr(), "FREQ", "%s", "1388");
_current_file_3.write((char*) oblock.ptr(), block.used_bytes());