Commit 80be4762 authored by Jason Wu's avatar Jason Wu
Browse files

moved file directory

parent c33feae7
Pipeline #92141 failed with stages
in 2 minutes and 8 seconds
#include "psrdada_cpp/multilog.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/dada_input_stream.hpp"
#include "psrdada_cpp/dada_disk_sink_leap.hpp"
#include "psrdada_cpp/cli_utils.hpp"
#include "psrdada_cpp/dada_write_client.hpp"
#include "boost/program_options.hpp"
#include <sys/types.h>
#include <iostream>
#include <string>
#include <sstream>
#include <ios>
#include <algorithm>
using namespace psrdada_cpp;
namespace
{
const size_t ERROR_IN_COMMAND_LINE = 1;
const size_t SUCCESS = 0;
const size_t ERROR_UNHANDLED_EXCEPTION = 2;
} // namespace
int main(int argc, char** argv)
{
try
{
key_t key;
std::string prefix;
std::size_t nchan;
/** Define and parse the program options
*/
namespace po = boost::program_options;
po::options_description desc("Options");
desc.add_options()
("help,h", "Print help messages")
("key,k", po::value<std::string>()
->default_value("dada")
->notifier([&key](std::string in)
{
key = string_to_key(in);
}),
"The shared memory key for the dada buffer to connect to (hex string)")
("prefix,p", po::value<std::string>(&prefix)
->default_value("dbdisk_dump"),
"Prefix for the filename to write to")
("nchan,n", po::value<std::size_t>(&nchan)
->default_value(8),
"number of channels to split")
("log_level", po::value<std::string>()
->default_value("info")
->notifier([](std::string level)
{
set_log_level(level);
}),
"The logging level to use (debug, info, warning, error)");
po::variables_map vm;
try
{
po::store(po::parse_command_line(argc, argv, desc), vm);
if ( vm.count("help") )
{
std::cout << "DbDiskLeap -- read from DADA ring buffer and write to disk in LEAP spec" << std::endl
<< desc << std::endl;
return SUCCESS;
}
po::notify(vm);
}
catch(po::error& e)
{
std::cerr << "ERROR: " << e.what() << std::endl << std::endl;
std::cerr << desc << std::endl;
return ERROR_IN_COMMAND_LINE;
}
/**
* All the application code goes here
*/
MultiLog log("dbdixk");
DiskSinkLeap sink(prefix, nchan);
DadaInputStream<decltype(sink)> stream(key, log, sink);
stream.start();
/**
* End of application code
*/
}
catch(std::exception& e)
{
std::cerr << "Unhandled Exception reached the top of main: "
<< e.what() << ", application will now exit" << std::endl;
return ERROR_UNHANDLED_EXCEPTION;
}
return SUCCESS;
}
#include "psrdada_cpp/dada_disk_sink_leap.hpp"
#include "ascii_header.h"
#include <chrono>
#include <iostream>
#include <iomanip>
using namespace std;
using namespace std::chrono;
namespace psrdada_cpp{
DiskSinkLeap::DiskSinkLeap(std::string prefix, std::size_t nchan)
: _prefix(prefix)
, _counter(0)
, _output_streams(nchan)
, _nchan(nchan)
{
}
DiskSinkLeap::~DiskSinkLeap()
{
}
void DiskSinkLeap::init(RawBytes& block)
{
for (auto& of:_output_streams){
if (of.is_open()){
of.close();
}
}
std::memcpy(&_header, block.ptr(), block.used_bytes());
ascii_header_set(_header, "NCHAN", "%s", "1");
ascii_header_set(_header, "BW", "%s", "16");
ascii_header_get(_header, "UTC_START", "%s", _start_time);
//std::size_t _sample_clock_start_time = 0;
//ascii_header_get(_header, "SAMPLE_CLOCK_START", "%s", _sample_clock_start_time);
//cout <<"SAMPLE_CLOCK_START "<<_sample_clock_start_time<< endl;
cout << "UTC_START = " <<_start_time<< endl;
}
bool DiskSinkLeap::operator()(RawBytes& block)
{
for (auto& of:_output_streams){
if (of.is_open()){
of.close();
}
}
std::size_t heap_size = 32000;
std::size_t nchan = _nchan;
int fstart = 1340;
std::size_t nheap_groups = block.used_bytes()/heap_size/nchan;
_transpose.resize(block.used_bytes());
// auto start_ical = high_resolution_clock::now();
#pragma omp parallel for num_threads(nchan)
//#pragma omp parallel for num_threads(8)
for (std::size_t ii = 0; ii < nchan; ++ii){
#pragma omp parallel for num_threads(nchan)
for (std::size_t jj = 0; jj < nheap_groups; ++jj){
std::size_t in_index = ii * heap_size + jj * heap_size * nchan; //TFTP
std::size_t out_index = ii * heap_size * nheap_groups + jj * heap_size; //FTTP
std::memcpy(&_transpose[out_index], block.ptr() + in_index, heap_size);
}
}
// auto stop_ical = high_resolution_clock::now();
// auto duration_ical = duration_cast<microseconds>(stop_ical - start_ical);
// cout << "INDEX CALCULATION TOOK " <<duration_ical.count() << endl;
// auto start = high_resolution_clock::now();
#pragma omp parallel for num_threads(nchan)
for (std::size_t ii = 0; ii < nchan; ++ii){
std::size_t index = ii * heap_size * nheap_groups;
char _loop_header[4096];
std::memcpy(&_loop_header, &_header, 4096);
ascii_header_set(_loop_header, "FREQ", "%d", fstart + 16 * ii);
ascii_header_set(_loop_header, "OBS_OFFSET", "%ld", _counter);
ascii_header_set(_loop_header, "FILE_SIZE", "%ld", 640000000);
std::stringstream fname;
fname << _start_time << "_"<< fstart + 16 * ii << "_" << std::setw(20) << std::setfill('0') << _counter << ".dada";
//std::size_t freq = fstart + 16 * ii;
cout << fname.str() << endl;
_output_streams[ii].open(fname.str().c_str(), std::ios::out | std::ios::app | std::ios::binary);
//ascii_header_set(_header, "FREQ", "%d", freq);
_output_streams[ii].write((char*) _loop_header, 4096);
_output_streams[ii].write(&_transpose[index], heap_size * nheap_groups);
}
_counter += heap_size * nheap_groups;
//cout << "COUNTER " <<_counter << endl;
//cout << "heap_size " <<heap_size << endl;
//cout << "nheap_groups " <<nheap_groups << endl;
return false;
}
} //namespace psrdada_cpp
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment