Skip to content
Snippets Groups Projects
Commit 4cecf1b8 authored by Tobias Winchen's avatar Tobias Winchen
Browse files

Merged + Fixed unused return value error

parents 4252c7a9 875c4977
Branches
No related tags found
No related merge requests found
Showing
with 1356 additions and 47 deletions
......@@ -34,4 +34,4 @@ ENDIF(NOT PSRDADA_FOUND)
LIST(APPEND PSRDADA_INCLUDE_DIR "${PSRDADA_INCLUDE_DIR}")
MARK_AS_ADVANCED(PSRDADA_LIBRARIES PSRDADA_TEST_LIBRARIES PSRDADA_INCLUDE_DIR)
\ No newline at end of file
MARK_AS_ADVANCED(PSRDADA_LIBRARIES PSRDADA_TEST_LIBRARIES PSRDADA_INCLUDE_DIR)
......@@ -19,6 +19,7 @@ set(psrdada_cpp_src
src/simple_file_writer.cpp
src/sigprocheader.cpp
src/psrdadaheader.cpp
src/file_to_dada_cli.cpp
)
set(psrdada_cpp_inc
......@@ -38,6 +39,7 @@ set(psrdada_cpp_inc
sigprocheader.hpp
psrdadaheader.hpp
psrdada_to_sigproc_header.hpp
file_input_stream.hpp
)
# -- the main library target
......@@ -55,6 +57,11 @@ target_link_libraries (syncdb ${PSRDADA_CPP_LIBRARIES})
add_executable(fbfuse_output_db examples/fbfuse_output_db.cpp)
target_link_libraries (fbfuse_output_db ${PSRDADA_CPP_LIBRARIES})
#file_to_dada_cli
add_executable(file_to_dada src/file_to_dada_cli.cpp)
target_link_libraries (file_to_dada ${PSRDADA_CPP_LIBRARIES})
#dbnull
add_executable(dbnull examples/dbnull.cpp)
target_link_libraries (dbnull ${PSRDADA_CPP_LIBRARIES})
......
......@@ -23,9 +23,9 @@ namespace psrdada_cpp
RawBytes header(junk_header.data(),header_size,header_size,false);
handler.init(header);
std::size_t bytes_written = 0;
std::vector<char> junk_block(total_bytes,0);
std::vector<char> junk_block(nbytes_per_write,0);
srand(0);
for (std::uint32_t ii=0; ii < total_bytes; ++ii)
for (std::uint32_t ii=0; ii < nbytes_per_write; ++ii)
{
junk_block[ii] = rand()%255 + 1;
}
......
#include "psrdada_cpp/file_input_stream.hpp"
namespace psrdada_cpp {
template <class HandlerType>
FileInputStream<HandlerType>::FileInputStream(std::string fileName, std::size_t headersize, std::size_t nbytes, HandlerType& handler)
: _headersize(headersize)
, _nbytes(nbytes)
, _handler(handler)
, _stop(false)
, _running(false)
{
const char *filename = fileName.c_str();
_filestream.exceptions(std::ifstream::failbit | std::ifstream::badbit);
_filestream.open(filename, std::ifstream::in | std::ifstream::binary);
if (!_filestream.is_open())
{
throw std::runtime_error("File could not be opened");
}
}
template <class HandlerType>
FileInputStream<HandlerType>::~FileInputStream()
{
_filestream.close();
}
template <class HandlerType>
void FileInputStream<HandlerType>::start()
{
if (_running)
{
throw std::runtime_error("Stream is already running");
}
_running = true;
// Get the header
char* header_ptr = new char[4096];
char* data_ptr = new char[_nbytes];
RawBytes header_block(header_ptr, 4096, 0, false);
_filestream.read(header_ptr, _headersize);
header_block.used_bytes(4096);
_handler.init(header_block);
// Continue to stream data until the end
while (!_stop)
{
BOOST_LOG_TRIVIAL(info) << "Reading data from the file";
// Read data from file here
RawBytes data_block(data_ptr, _nbytes, 0, false);
while (!_stop)
{
if (_filestream.eof())
{
BOOST_LOG_TRIVIAL(info) << "Reached end of file";
_filestream.close();
break;
}
_filestream.read(data_ptr, _nbytes);
data_block.used_bytes(data_block.total_bytes());
_handler(data_block);
}
data_block.~RawBytes();
}
_running = false;
}
template <class HandlerType>
void FileInputStream<HandlerType>::stop()
{
_stop = true;
}
} //psrdada_cpp
#ifndef PSRDADA_CPP_FILE_INPUT_STREAM_HPP
#define PSRDADA_CPP_FILE_INPUT_STREAM_HPP
#include <fstream>
#include <cstdlib>
#include "psrdada_cpp/raw_bytes.hpp"
/**
* @detail: A simple file input stream. Will go through one entire file.
* Will assume there is some amount of header to the file.
*/
namespace psrdada_cpp
{
template <class HandlerType>
class FileInputStream
{
public:
FileInputStream(std::string filename, std::size_t headersize, std::size_t nbytes, HandlerType& handler);
~FileInputStream();
void start();
void stop();
private:
std::size_t _headersize;
std::size_t _nbytes;
std::ifstream _filestream;
HandlerType& _handler;
bool _stop;
bool _running;
};
} //psrdada_cpp
#include "psrdada_cpp/detail/file_input_stream.cpp"
#endif //PSRDADA_CPP_FILE_INPUT_STREAM_HPP
......@@ -20,6 +20,11 @@ target_link_libraries (transpose_to_file_cli ${PSRDADA_CPP_MEERKAT_TUSE_LIBRARIE
#transpose_to_null_cli
add_executable(transpose_to_null_cli src/transpose_to_null_cli.cpp)
target_link_libraries (transpose_to_null_cli ${PSRDADA_CPP_MEERKAT_TUSE_LIBRARIES})
#dada_dbevent
add_executable(dada_dbevent src/dada_dbevent.cpp)
target_link_libraries (dada_dbevent ${PSRDADA_CPP_MEERKAT_TUSE_LIBRARIES})
install (TARGETS transpose_to_null_cli transpose_to_file_cli transpose_to_dada_cli DESTINATION bin)
install(FILES transpose_to_dada.hpp DESTINATION include/psrdada_cpp/meerkat/tuse)
add_subdirectory(test)
/***************************************************************************
*
* Copyright (C) 2012 by Andrew Jameson
* Licensed under the Academic Free License version 2.1
*
****************************************************************************/
/*
* Attaches to in input data block as a viewer, and opens a socket to listen
* for requests to write temporal events to the output data block. Can seek
* back in time over cleared data blocks
*/
#include "ascii_header.h"
#include "dada_hdu.h"
#include "dada_def.h"
#include "node_array.h"
#include "multilog.h"
#include "diff_time.h"
#include "sock.h"
#include "tmutil.h"
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <assert.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include <math.h>
using namespace std;
const int DADA_DBEVENT_DEFAULT_PORT= 30000;
const int DADA_DBEVENT_DEFAULT_INPUT_BUFFER = 80;
const int DADA_DBEVENT_DEFAULT_INPUT_DELAY= 60;
#define DADA_DBEVENT_TIMESTR "%Y-%m-%d-%H:%M:%S"
int quit = 0;
typedef struct {
// input HDU
dada_hdu_t * in_hdu;
// output HDU
dada_hdu_t * out_hdu;
// multilog
multilog_t * log;
// input data block's UTC_START
time_t utc_start;
// input data block's BYTES_PER_SECOND
uint64_t bytes_per_second;
time_t input_maximum_delay;
char * header;
size_t header_size;
void * work_buffer;
void * event_buffer;
size_t event_buffer_size;
size_t work_buffer_size;
uint64_t in_nbufs;
uint64_t in_bufsz;
uint64_t resolution;
// verbosity
int verbose;
} dada_dbevent_t;
typedef struct {
uint64_t start_byte;
uint64_t end_byte;
float snr;
float dm;
float width;
unsigned beam;
float tsamp;
uint32_t nchans;
float f1;
float f2;
} event_t;
#define DADA_DBEVENT_INIT { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }
static int sort_events (const void *p1, const void *p2)
{
event_t A = *(event_t *) p1;
event_t B = *(event_t *) p2;
if (A.start_byte < B.start_byte) return -1;
if (A.start_byte > B.start_byte) return +1;
return 0;
}
int check_read_offset (dada_dbevent_t * dbevent);
//int check_write_timestamps (dada_dbevent_t * dbevent);
int64_t calculate_byte_offset (dada_dbevent_t * dbevent, char * time_str_secs, char * time_str_frac);
int receive_events (dada_dbevent_t * dbevent, int listen_fd);
int dump_event(dada_dbevent_t * dbevent, double event_start_utc, double event_end_utc, float event_snr, float event_dm);
void usage();
void usage()
{
fprintf (stdout,
"dada_dbevent [options] inkey outkey\n"
" inkey input hexadecimal shared memory key\n"
" outkey input hexadecimal shared memory key\n"
" -b percent delay procesing of the input buffer up to this amount [default %d %%]\n"
" -t delay maximum delay (s) to retain data for [default %ds]\n"
" -h print this help text\n"
" -p port port to listen for event commands [default %d]\n"
" -v be verbose\n",
DADA_DBEVENT_DEFAULT_INPUT_BUFFER,
DADA_DBEVENT_DEFAULT_INPUT_DELAY,
DADA_DBEVENT_DEFAULT_PORT);
}
void signal_handler(int signalValue)
{
fprintf(stderr, "dada_dbevent: SIGINT/TERM\n");
fprintf(stderr,"value of signal interrupt is: %d\n", signalValue);
quit = 1;
}
double dm_delay(float DM, double freq1, double freq2)
{
double freqghz1 = freq1/1e9;
double freqghz2 = freq2/1e9;
double delta_t = 4.15 * 1e-3 * DM * ((1/pow(freqghz1,2)) - 1/(pow(freqghz2,2))); // in seconds
return delta_t;
}
int calcbytes(double seconds, int nchans, double tsamp, int bytespersamp)
{
return (int) bytespersamp*(seconds/tsamp) * nchans;
}
int main (int argc, char **argv)
{
// core dbevent data struct
dada_dbevent_t dbevent = DADA_DBEVENT_INIT;
// DADA Logger
multilog_t* log = 0;
// flag set in verbose mode
char verbose = 0;
// port to listen for event requests
int port = DADA_DBEVENT_DEFAULT_PORT;
// input hexadecimal shared memory key
key_t in_dada_key;
// output hexadecimal shared memory key
key_t out_dada_key;
float input_data_block_threshold = DADA_DBEVENT_DEFAULT_INPUT_BUFFER;
int input_maximum_delay = DADA_DBEVENT_DEFAULT_INPUT_DELAY;
int arg = 0;
while ((arg=getopt(argc,argv,"b:hp:t:v")) != -1)
{
switch (arg)
{
case 'b':
if (sscanf (optarg, "%f", &input_data_block_threshold) != 1)
{
fprintf (stderr, "dada_dbevent: could not parse input buffer level from %s\n", optarg);
return EXIT_FAILURE;
}
break;
case 'h':
usage();
return EXIT_SUCCESS;
case 'v':
verbose++;
break;
case 'p':
if (sscanf (optarg, "%d", &port) != 1)
{
fprintf (stderr, "dada_dbevent: could not parse port from %s\n", optarg);
return EXIT_FAILURE;
}
break;
case 't':
if (sscanf (optarg, "%d", &input_maximum_delay) != 1)
{
fprintf (stderr, "dada_dbevent: could not parse maximum input delay from %s\n", optarg);
return EXIT_FAILURE;
}
break;
default:
usage ();
return EXIT_SUCCESS;
}
}
if (argc - optind != 2)
{
fprintf (stderr, "dada_dbevent: expected 2 command line arguments\n");
usage();
return EXIT_FAILURE;
}
if (sscanf (argv[optind], "%x",(unsigned int*) &in_dada_key) != 1)
{
fprintf (stderr,"dada_dbevent: could not parse in_key from %s\n", argv[optind]);
return EXIT_FAILURE;
}
if (sscanf (argv[optind+1], "%x",(unsigned int*) &out_dada_key) != 1)
{
fprintf (stderr,"dada_dbevent: could not parse out_key from %s\n", argv[optind+1]);
return EXIT_FAILURE;
}
// install some signal handlers
signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);
log = multilog_open ("dada_dbevent", 0);
multilog_add (log, stderr);
dbevent.verbose = verbose;
dbevent.log = log;
dbevent.input_maximum_delay = (time_t) input_maximum_delay;
dbevent.work_buffer_size = 1024 * 1024;
dbevent.work_buffer = malloc (dbevent.work_buffer_size);
if (!dbevent.work_buffer)
{
multilog(log, LOG_INFO, "could not allocate memory for work buffer\n");
return EXIT_FAILURE;
}
if (verbose)
multilog(log, LOG_INFO, "connecting to data blocks\n");
dbevent.in_hdu = dada_hdu_create (log);
dada_hdu_set_key (dbevent.in_hdu, in_dada_key);
if (dada_hdu_connect (dbevent.in_hdu) < 0)
{
multilog(log, LOG_ERR, "could not connect to input HDU\n");
return EXIT_FAILURE;
}
if (dada_hdu_lock_read(dbevent.in_hdu) < 0)
{
multilog (log, LOG_ERR, "could not open input HDU as viewer\n");
return EXIT_FAILURE;
}
dbevent.out_hdu = dada_hdu_create (log);
dada_hdu_set_key (dbevent.out_hdu, out_dada_key);
if (dada_hdu_connect (dbevent.out_hdu) < 0)
{
multilog(log, LOG_ERR, "could not connect to output HDU\n");
return EXIT_FAILURE;
}
if (dada_hdu_lock_write (dbevent.out_hdu) < 0)
{
multilog (log, LOG_ERR, "could not open output HDU as writer\n");
return EXIT_FAILURE;
}
// open listening socket
if (verbose)
multilog (log, LOG_INFO, "main: sock_create(%d)\n", port);
int listen_fd = sock_create (&port);
if (listen_fd < 0)
{
multilog (log, LOG_ERR, "could not open socket: %s\n", strerror(errno));
quit = 2;
}
else
{
if (verbose)
multilog (log, LOG_INFO, "listening on port %d for dump requests\n", port);
}
fd_set fds;
struct timeval timeout;
int fds_read;
// now get the header from the input data block
if (verbose)
multilog(log, LOG_INFO, "waiting for input header\n");
if (dada_hdu_open (dbevent.in_hdu) < 0)
{
multilog (log, LOG_ERR, "could not get input header\n");
quit = 1;
}
else
{
if (verbose > 1)
{
fprintf (stderr, "==========\n");
fprintf (stderr, "%s", dbevent.in_hdu->header);
fprintf (stderr, "==========\n");
}
dbevent.header_size = ipcbuf_get_bufsz (dbevent.in_hdu->header_block);
dbevent.header = (char *) malloc (dbevent.header_size);
if (!dbevent.header)
{
multilog (log, LOG_ERR, "failed to allocate memory for header\n");
quit = 1;
}
else
{
// make a local copy of the header
memcpy (dbevent.header, dbevent.in_hdu->header, dbevent.header_size);
// immediately mark this header as cleared
ipcbuf_mark_cleared (dbevent.in_hdu->header_block);
char utc_buffer[64];
// get the UTC_START and TSAMP / BYTES_PER_SECOND for this observation
if (ascii_header_get (dbevent.header, "UTC_START", "%s", utc_buffer) < 0)
{
multilog (log, LOG_ERR, "could not extract UTC_START from input datablock header\n");
quit = 2;
}
else
{
if (verbose)
multilog(log, LOG_INFO, "input UTC_START=%s\n", utc_buffer);
dbevent.utc_start = str2utctime (utc_buffer);
if (dbevent.utc_start == (time_t)-1)
{
multilog (log, LOG_ERR, "could not parse UTC_START from '%s'\n", utc_buffer);
quit = 2;
}
}
if (ascii_header_get (dbevent.header, "BYTES_PER_SECOND", "%" PRIu64, &(dbevent.bytes_per_second)) < 0)
{
multilog (log, LOG_ERR, "could not extract BYTES_PER_SECOND from input datablock header\n");
quit = 2;
}
else
{
if (verbose)
multilog(log, LOG_INFO, "input BYTES_PER_SECOND=%" PRIu64"\n", dbevent.bytes_per_second);
}
if (ascii_header_get (dbevent.header, "RESOLUTION", "%" PRIu64, &(dbevent.resolution)) < 0)
dbevent.resolution = 1;
if (verbose)
multilog(log, LOG_INFO, "input RESOLUTION=%" PRIu64"\n", dbevent.resolution);
}
}
ipcbuf_t * db = (ipcbuf_t *) dbevent.in_hdu->data_block;
// get the number and size of buffers in the input data block
dbevent.in_nbufs = ipcbuf_get_nbufs (db);
dbevent.in_bufsz = ipcbuf_get_bufsz (db);
while (!quit)
{
// setup file descriptor set for listening
FD_ZERO(&fds);
FD_SET(listen_fd, &fds);
timeout.tv_sec = 0;
timeout.tv_usec = 1000000;
fds_read = select(listen_fd+1, &fds, (fd_set *) 0, (fd_set *) 0, &timeout);
// problem with select call
if (fds_read < 0)
{
multilog (log, LOG_ERR, "select failed: %s\n", strerror(errno));
quit = 2;
break;
}
// select timed out, check input HDU for end of data
else if (fds_read == 0)
{
if (verbose > 1)
multilog (log, LOG_INFO, "main: check_read_offset ()\n");
int64_t n_skipped = check_read_offset (&dbevent);
if (n_skipped < 0)
multilog (log, LOG_WARNING, "check_db_times failed\n");
if (verbose > 1)
multilog (log, LOG_INFO, "main: check_db_times skipped %" PRIi64" events\n", n_skipped);
}
// we received a new connection on our listening FD, process comand
else
{
if (verbose)
multilog (log, LOG_INFO, "main: receiving events on socket\n");
int events_recorded = receive_events (&dbevent, listen_fd);
if (events_recorded < 0)
{
multilog (log, LOG_INFO, "main: quit requested via socket\n");
quit = 1;
}
if (verbose)
multilog (log, LOG_INFO, "main: received %d events\n", events_recorded);
}
// check how full the input datablock is
float percent_full = ipcio_percent_full (dbevent.in_hdu->data_block) * 100;
if (verbose > 1)
multilog (log, LOG_INFO, "input datablock %5.2f percent full\n", percent_full);
int64_t read_offset, remainder, seek_byte, seeked_byte;
while (!quit && percent_full > input_data_block_threshold)
{
if (verbose)
multilog (log, LOG_INFO, "percent_full=%5.2f threshold=%5.2f\n", percent_full, input_data_block_threshold);
// since we are too full, seek forward 1 block
read_offset = ipcio_tell (dbevent.in_hdu->data_block);
// if the current read offset is a full block, make it so
remainder = read_offset % dbevent.in_bufsz;
if (remainder != 0)
seek_byte = (int64_t) dbevent.in_bufsz - remainder;
else
seek_byte = (int64_t) dbevent.in_bufsz;
if ((seek_byte < 0) || (seek_byte > (int) dbevent.in_bufsz))
multilog (log, LOG_WARNING, "main: seek_byte limits warning: %" PRIi64"\n", seek_byte);
if (verbose)
multilog (log, LOG_INFO, "main: ipcio_seek(%" PRIu64", SEEK_CUR)\n", seek_byte);
// seek forward (from curr pos) to the next full block boundary
seeked_byte = ipcio_seek (dbevent.in_hdu->data_block, seek_byte, SEEK_CUR);
if (seeked_byte < 0)
{
multilog (log, LOG_INFO, "main: ipcio_seek failed\n");
quit = 1;
}
// sleep a short space to allow writer to write!
usleep(10000);
// update the percentage full for the data block
percent_full = ipcio_percent_full (dbevent.in_hdu->data_block) * 100;
if (verbose)
multilog (log, LOG_INFO, "input datablock reduced to %5.2f percent full\n", percent_full);
}
if (ipcbuf_eod (db))
{
if (verbose)
multilog (log, LOG_INFO, "EOD now true\n");
quit = 1;
}
}
if (quit)
{
if (!ipcbuf_eod (db))
{
multilog (log, LOG_INFO, "quit requested, trying to clear all remaining data blocks\n");
uint64_t bytes_written = ipcbuf_get_write_byte_xfer ((ipcbuf_t *) dbevent.in_hdu->data_block);
multilog (log, LOG_INFO, "quit requested: total bytes written=%" PRIu64"\n", bytes_written);
int64_t bytes_seeked = ipcio_seek (dbevent.in_hdu->data_block, (int64_t) bytes_written, SEEK_SET);
multilog (log, LOG_INFO, "quit requested: seeked to byte %" PRIi64"\n", bytes_seeked);
}
}
free (dbevent.work_buffer);
if (dbevent.header)
free (dbevent.header);
dbevent.header = 0;
if (dada_hdu_disconnect (dbevent.in_hdu) < 0)
{
fprintf (stderr, "dada_dbevent: disconnect from input data block failed\n");
return EXIT_FAILURE;
}
if (dada_hdu_unlock_write (dbevent.out_hdu) < 0)
{
fprintf (stderr, "dada_dbevent: unlock write on output data block failed\n");
return EXIT_FAILURE;
}
if (dada_hdu_disconnect (dbevent.out_hdu) < 0)
{
fprintf (stderr, "dada_dbevent: disconnect from output data block failed\n");
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
int check_read_offset (dada_dbevent_t * dbevent)
{
const uint64_t max_delay_bytes = dbevent->input_maximum_delay * dbevent->bytes_per_second;
unsigned have_old_buffers = 1;
// check for end of data before doing anything
if (ipcbuf_eod ((ipcbuf_t *) dbevent->in_hdu->data_block))
have_old_buffers = 0;
// get the time and byte offset for the time
const time_t now = time(0);
const time_t obs_offset = (now - dbevent->utc_start);
if (obs_offset <= 0)
{
if (dbevent->verbose)
multilog (dbevent->log, LOG_INFO, "check_read_offset: now <= obs_offset\n");
return (0);
}
if (dbevent->verbose)
multilog (dbevent->log, LOG_INFO, "check_read_offset: now=%ld utc_start=%ld obs_offset=%ld\n", now, dbevent->utc_start, obs_offset);
const uint64_t now_byte_offset = (uint64_t) obs_offset * dbevent->bytes_per_second;
uint64_t read_offset, remainder, seek_byte;
int64_t seeked_byte;
unsigned n_skipped = 0;
while (have_old_buffers)
{
// get the current read offset in bytes
read_offset = ipcio_tell (dbevent->in_hdu->data_block);
// if the current read offset + the maximum allowed delay is still less than
// the current byte offset, then we MUST read while it is not
if (read_offset + max_delay_bytes < now_byte_offset)
{
if (dbevent->verbose)
multilog (dbevent->log, LOG_INFO, "check_read_offset: read_offset[%" PRIu64"] + max_delay_bytes[%" PRIu64"] < now_byte_offset[%" PRIu64"]\n", read_offset, max_delay_bytes, now_byte_offset);
remainder = read_offset % dbevent->in_bufsz;
seek_byte = 0;
if (dbevent->verbose > 1)
multilog (dbevent->log, LOG_INFO, "check_read_offset: read_offset=%" PRIu64" remainder=%" PRIu64"\n", read_offset, remainder);
// check for a partially read block
if (remainder != 0)
{
seek_byte = dbevent->in_bufsz - remainder;
}
// otherwise we will seek forward 1 full block
else
{
seek_byte = dbevent->in_bufsz;
}
if (((int) seek_byte < 0) || (seek_byte > dbevent->in_bufsz))
{
multilog (dbevent->log, LOG_WARNING, "check_read_offset: seek_byte limits warning: %" PRIi64"\n", seek_byte);
}
if (dbevent->verbose)
multilog (dbevent->log, LOG_INFO, "check_read_offset: ipcio_seek(%" PRIu64", SEEK_CUR)\n", seek_byte);
seeked_byte = ipcio_seek (dbevent->in_hdu->data_block, seek_byte, SEEK_CUR);
if (seeked_byte < 0)
{
multilog (dbevent->log, LOG_INFO, "check_read_offset: ipcio_seek failed\n");
return -1;
}
n_skipped ++;
}
else
have_old_buffers = 0;
// also check for end of data
if (ipcbuf_eod ((ipcbuf_t *) dbevent->in_hdu->data_block))
{
have_old_buffers = 0;
}
}
if (dbevent->verbose)
multilog (dbevent->log, LOG_INFO, "check_read_offset: skipped %" PRIi64" blocks\n", n_skipped);
return n_skipped;
}
int receive_events (dada_dbevent_t * dbevent, int listen_fd)
{
multilog_t * log = dbevent->log;
int fd = 0;
FILE * sockin = 0;
unsigned more_events = 1;
unsigned buffer_size = 1024;
char *buffer = new char[buffer_size];
char * event_start;
char * event_start_fractional;
char * event_end;
char * event_end_fractional;
char * event_snr_str;
char * event_dm_str;
char * event_width_str;
char * event_beam_str;
// arrays for n_events
uint64_t n_events = 0;
event_t * events = NULL;
int events_recorded = 0;
int events_missed = 0;
if (dbevent->verbose)
multilog (log, LOG_INFO, "receive_events: sock_accept(listen_fd)\n");
fd = sock_accept (listen_fd);
if (fd < 0)
{
multilog(log, LOG_WARNING, "error accepting connection %s\n", strerror(errno));
return -1;
}
sockin = fdopen(fd,"r");
if (!sockin)
{
multilog(log, LOG_WARNING, "error creating input stream %s\n", strerror(errno));
close(fd);
return -1;
}
setbuf (sockin, 0);
// first line on the socket should be the number of events
if (fgets (buffer, buffer_size, sockin) == NULL)
{
fprintf(stderr, "Error: Null pointer from socket\n");
exit(1);
}
if (sscanf (buffer, "N_EVENTS %" PRIu64, &n_events) != 1)
{
multilog(log, LOG_WARNING, "failed to parse N_EVENTS\n");
more_events = 0;
}
else
{
events = (event_t *) malloc (sizeof(event_t) * n_events);
}
// second line on the socket should be the UTC_START of the obsevation
if (fgets (buffer, buffer_size, sockin) == NULL)
{
fprintf(stderr, "Error: Null pointer from socket\n");
exit(1);
}
time_t event_utc_start = str2utctime (buffer);
char * comment = 0;
unsigned i = 0;
const char * sep_time = ". \t";
const char * sep_float = " \t";
int64_t offset;
uint64_t remainder;
while (more_events > 0 && !feof(sockin))
{
if (dbevent->verbose > 1)
multilog (log, LOG_INFO, "getting new line\n");
char * saveptr = 0;
if (fgets (buffer, buffer_size, sockin) == NULL)
{
fprintf(stderr, "Error: Null pointer from socket\n");
exit(1);
}
//if (dbevent->verbose > 1)
multilog (log, LOG_INFO, " <- %s", buffer);
// ignore comments
comment = strchr( buffer, '#' );
if (comment)
*comment = '\0';
comment = strchr( buffer, '\r' );
if (comment)
*comment = '\0';
if (dbevent->verbose)
multilog (log, LOG_INFO, "< - %s\n", buffer);
if (strlen(buffer) < 10)
continue;
if (strcmp(buffer, "QUIT") == 0)
{
multilog (log, LOG_WARNING, "receive_events: QUIT event received\n");
more_events = -1;
continue;
}
// extract START_UTC string excluding sub-second components
event_start = strtok_r (buffer, sep_time, &saveptr);
if (event_start == NULL)
{
multilog (log, LOG_WARNING, "receive_events: problem extracting event_start\n");
more_events = 0;
continue;
}
event_start_fractional = strtok_r (NULL, sep_time, &saveptr);
if (dbevent->verbose)
multilog (log, LOG_INFO, "event_start=%s event_start_fractional=%s\n", event_start, event_start_fractional);
offset = calculate_byte_offset (dbevent, event_start, event_start_fractional);
if (offset >= 0)
{
remainder = offset % dbevent->resolution;
if (remainder != 0)
{
if (offset > (int) dbevent->resolution)
events[i].start_byte = (uint64_t) (offset - remainder);
else
events[i].start_byte = dbevent->resolution;
}
else
events[i].start_byte = (uint64_t) offset;
}
else
events[i].start_byte = 0;
// extract END_UTC string excluding sub-second components
event_end = strtok_r (NULL, sep_time, &saveptr);
if (event_end == NULL)
{
multilog (log, LOG_WARNING, "receive_events: problem extracting event_end\n");
more_events = 0;
continue;
}
event_end_fractional = strtok_r (NULL, sep_time, &saveptr);
if (dbevent->verbose)
multilog (log, LOG_INFO, "event_end=%s event_end_fractional=%s\n", event_start, event_start_fractional);
offset = calculate_byte_offset (dbevent, event_end, event_end_fractional);
if (offset >= 0)
{
events[i].end_byte = (uint64_t) offset;
remainder = offset % dbevent->resolution;
if (remainder != 0)
events[i].end_byte += (dbevent->resolution - remainder);
}
else
events[i].end_byte = 0;
event_dm_str = strtok_r (NULL, sep_float, &saveptr);
sscanf(event_dm_str, "%f", &(events[i].dm));
event_snr_str = strtok_r (NULL, sep_float, &saveptr);
sscanf(event_snr_str, "%f", &(events[i].snr));
event_width_str = strtok_r (NULL, sep_float, &saveptr);
sscanf(event_width_str, "%f", &(events[i].width));
event_beam_str = strtok_r (NULL, sep_float, &saveptr);
sscanf(event_beam_str, "%u", &(events[i].beam));
if (dbevent->verbose)
multilog (dbevent->log, LOG_INFO, "event: %" PRIi64" - %" PRIi64" SNR=%f, DM=%f WIDTH=%f beam=%u\n",
events[i].start_byte, events[i].end_byte, events[i].snr, events[i].dm,
events[i].width, events[i].beam);
i++;
if (i >= n_events)
more_events = 0;
}
if (n_events > 0)
{
if (event_utc_start != dbevent->utc_start)
{
multilog (dbevent->log, LOG_WARNING, "Event UTC_START [%d] != Obs UTC_START [%d]\n", event_utc_start, dbevent->utc_start);
}
else
{
// sort the events based on event start time
qsort (events, n_events, sizeof (event_t), sort_events);
// now check for overlapping events
for (i=1; i<n_events; i++)
{
// start overlap
if (events[i].start_byte < events[i-1].end_byte)
{
if (dbevent->verbose)
multilog (dbevent->log, LOG_INFO, "amalgamating event idx %d into %d\n", i-1, i);
events[i].start_byte = events[i-1].start_byte;
events[i-1].start_byte = 0;
if (events[i-1].end_byte > events[i].end_byte)
events[i].end_byte = events[i-1].end_byte;
events[i-1].end_byte = 0;
}
}
// for each event, check that its in the future, and if so, seek forward to it
uint64_t current_byte = 0;
int64_t seeked_byte = 0;
for (i=0; i<n_events; i++)
{
if ((events[i].start_byte == 0) && (events[i].end_byte == 0))
{
if (dbevent->verbose)
multilog (dbevent->log, LOG_INFO, "ignoring event[%d], start_byte == end_byte == 0\n", i);
continue;
}
current_byte = ipcio_tell (dbevent->in_hdu->data_block);
//multilog (dbevent->log, LOG_INFO, "current_byte=%"PRIu64"\n", current_byte);
if (events[i].start_byte < current_byte)
{
multilog (dbevent->log, LOG_WARNING, "skipping events[%d], current_byte [%" PRIu64"] past event start_byte [%" PRIu64"]\n", i, current_byte, events[i].start_byte);
events_missed++;
continue;
}
// seek forward to the relevant point in the datablock
if (dbevent->verbose)
multilog (dbevent->log, LOG_INFO, "seeking forward %" PRIu64" bytes from start of obs\n", events[i].start_byte);
seeked_byte = ipcio_seek (dbevent->in_hdu->data_block, (int64_t) events[i].start_byte, SEEK_SET);
if (seeked_byte < 0)
{
multilog (dbevent->log, LOG_WARNING, "could not seek to byte %" PRIu64"\n", events[i].start_byte);
events_missed++;
continue;
}
if (dbevent->verbose)
multilog (dbevent->log, LOG_INFO, "seeked_byte=%" PRIi64"\n", seeked_byte);
// determine how much to read
size_t to_read = events[i].end_byte - events[i].start_byte;
multilog (dbevent->log, LOG_INFO, "to read = %d [%" PRIu64" - %" PRIu64"]\n", to_read, events[i].end_byte, events[i].start_byte);
if (dbevent->work_buffer_size < to_read)
{
dbevent->work_buffer_size = to_read;
if (dbevent->verbose)
multilog (dbevent->log, LOG_INFO, "reallocating work_buffer [%p] to %d bytes\n",
dbevent->work_buffer, dbevent->work_buffer_size);
dbevent->work_buffer = realloc (dbevent->work_buffer, dbevent->work_buffer_size);
if (dbevent->verbose)
multilog (dbevent->log, LOG_INFO, "reallocated work_buffer [%p]\n", dbevent->work_buffer);
}
// read the event from the input buffer
if (dbevent->verbose)
multilog (dbevent->log, LOG_INFO, "reading %d bytes from input HDU into work buffer\n", to_read);
ssize_t bytes_read = ipcio_read (dbevent->in_hdu->data_block, (char *) dbevent->work_buffer, to_read);
if (dbevent->verbose)
multilog (dbevent->log, LOG_INFO, "read %d bytes from input HDU into work buffer\n", bytes_read);
if (bytes_read < 0)
{
multilog (dbevent->log, LOG_WARNING, "receive_events: ipcio_read on input HDU failed\n");
return -1;
}
/*// save cut-outs of the event here
dbevent->event_buffer_size = ((2.0 + events[i].width)/events[i].tsamp) * events[i].nchans * 2; // 16 bytes per sample
dbevent->event_buffer = malloc(dbevent->event_buffer_size * sizeof(char));
int first_offset = 0; //calc_offset();
(char*) dbevent->work_buffer += first_offset;
for (int ii=0; ii < nchans; ++ii)
{
int ftemp1 = f1;
int ftemp2 = f1 + ((f1 - f2)/events[i].nchans);
double delay = dm_delay(events[i].f1, events[i].f2, events[i].dm);
memcpy(dbevent->event_buffer, dbevent->work_buffer, dbevent->event_buffer_size);
}*/
events_recorded++;
char * header = ipcbuf_get_next_write (dbevent->out_hdu->header_block);
uint64_t header_size = ipcbuf_get_bufsz (dbevent->out_hdu->header_block);
if (header_size < dbevent->header_size)
{
multilog (log, LOG_ERR, "receive_events: output header too small for input header\n");
return -1;
}
// copy the input header to the output
memcpy (header, dbevent->header, dbevent->header_size);
// now write some relevant data to the header
ascii_header_set (header, "OBS_OFFSET", "%" PRIu64, events[i].start_byte);
ascii_header_set (header, "FILE_SIZE", "%ld", to_read);
ascii_header_set (header, "EVENT_SNR", "%f", events[i].snr);
ascii_header_set (header, "EVENT_DM", "%f", events[i].dm);
ascii_header_set (header, "EVENT_WIDTH", "%f", events[i].width);
ascii_header_set (header, "EVENT_BEAM", "%u", events[i].beam);
ascii_header_set (header, "EVENT_FIRST_FREQUENCY", "%f", events[i].f1);
ascii_header_set (header, "EVENT_LAST_FREQUENCY", "%f", events[i].f2);
ascii_header_set (header, "EVENT_TSAMP", "%f", events[i].tsamp);
ascii_header_set (header, "EVENT_NCHANS", "%d", events[i].nchans);
// tag this header as filled
ipcbuf_mark_filled (dbevent->out_hdu->header_block, header_size);
// write the specified amount to the output data block
ipcio_write (dbevent->out_hdu->data_block, (char *) dbevent->event_buffer, to_read);
// close the data block to ensure EOD is written
if (dada_hdu_unlock_write (dbevent->out_hdu) < 0)
{
multilog (log, LOG_ERR, "could not close output HDU as writer\n");
return -1;
}
// close the data block to ensure EOD is written
// lock write again to re-open for the next event
if (dada_hdu_lock_write (dbevent->out_hdu) < 0)
{
multilog (log, LOG_ERR, "could not open output HDU as writer\n");
return -1;
}
}
multilog (dbevent->log, LOG_INFO, "recorded=%d missed=%d\n", events_recorded, events_missed);
delete[] buffer;
}
}
fclose(sockin);
close (fd);
if (events)
free(events);
events = 0;
return events_recorded;
}
int64_t calculate_byte_offset (dada_dbevent_t * dbevent, char * time_str_secs, char * time_str_frac)
{
time_t time_secs; // integer time in seconds
uint64_t time_frac_numer; // numerator of fractional time
uint64_t time_frac_denom; // denominator of fractional time
int64_t event_byte_offset = -1;
uint64_t event_byte;
uint64_t event_byte_frac;
time_secs = str2utctime (time_str_secs);
sscanf (time_str_frac, "%" PRIu64, &time_frac_numer);
time_frac_denom = (uint64_t) powf(10,strlen(time_str_frac));
if (dbevent->verbose > 1)
multilog (dbevent->log, LOG_INFO, "calculate_byte_offset: time_secs=%d, time_frac_numer=%" PRIu64", time_frac_denom=%" PRIu64"\n",
time_secs, time_frac_numer, time_frac_denom);
// check we have utc_start and that this event is in the future
if (dbevent->utc_start && (time_secs >= dbevent->utc_start))
{
event_byte = (time_secs - dbevent->utc_start) * dbevent->bytes_per_second;
event_byte_frac = time_frac_numer * dbevent->bytes_per_second;
event_byte_frac /= time_frac_denom;
event_byte_offset = (int64_t) event_byte + event_byte_frac;
if (dbevent->verbose > 1)
multilog (dbevent->log, LOG_INFO, "calculate_byte_offset: byte_offset [%" PRIi64"]=event_byte [%" PRIu64"] + event_byte_frac [%" PRIu64"]\n",
event_byte_offset, event_byte, event_byte_frac);
}
else
{
multilog (dbevent->log, LOG_ERR, "calculate_byte_offset: time_secs=%d >= dbevent->utc_start=%" PRIu64"\n", time_secs, dbevent->utc_start);
}
return event_byte_offset;
}
/*
* dump the specified event to the output datablock */
int dump_event (dada_dbevent_t * dbevent, double event_start_utc,
double event_end_utc, float event_snr, float event_dm)
{
multilog (dbevent->log, LOG_INFO, "event time: %lf - %lf [seconds]\n", event_start_utc, event_end_utc);
multilog (dbevent->log, LOG_INFO, "event SNR: %f\n", event_snr);
multilog (dbevent->log, LOG_INFO, "event DM: %f\n", event_dm);
return 0;
}
......@@ -90,7 +90,7 @@ int main(int argc, char** argv)
{
std::string filename = "beam" + std::to_string(ii) + ".fil";
files.emplace_back(std::make_shared<SimpleFileWriter>(filename));
}
}
meerkat::tuse::TransposeToDada<SimpleFileWriter> transpose(nbeams,std::move(files));
transpose.set_nsamples(nsamples);
transpose.set_nchans(nchans);
......@@ -109,7 +109,7 @@ int main(int argc, char** argv)
std::cerr << "Unhandled Exception reached the top of main: "
<< e.what() << ", application will now exit" << std::endl;
return ERROR_UNHANDLED_EXCEPTION;
}
return SUCCESS;
}
return SUCCESS;
}
......@@ -99,13 +99,13 @@ int main(int argc, char** argv)
{
nullsinks.emplace_back(std::make_shared<NullSink>());
}
meerkat::tuse::TransposeToDada<NullSink> transpose(nbeams,std::move(nullsinks));
transpose.set_nsamples(nsamples);
transpose.set_nchans(nchans);
transpose.set_nfreq(nfreq);
transpose.set_ngroups(ngroups);
transpose.set_nbeams(nbeams);
PsrDadaToSigprocHeader<decltype(transpose)> ptos(transpose);
MultiLog log1("instream");
DadaInputStream<decltype(ptos)> input(input_key,log1,ptos);
......
#ifndef PSRDADA_CPP_MEERKAT_TUSE_TRANSPOSETEST_H
#define PSRDADA_CPP_MEERKAT_TUSE_TRANSPOSETEST_H
#include "psrdada_cpp/meerkat/tuse/transpose_to_dada.hpp"
#include <gtest/gtest.h>
namespace psrdada_cpp {
namespace meerkat {
namespace tuse {
namespace test {
class TransposeTest: public ::testing::Test
{
protected:
void SetUp() override;
void TearDown() override;
public:
TransposeTest();
~TransposeTest();
};
} //namespace test
} //namespace tuse
} //namespace meerkat
} //namespace psrdada_cpp
#endif //PSRDADA_CPP_MEERKAT_TUSE_TRANSPOSETEST_H
#include "psrdada_cpp/meerkat/tuse/test/TransposeTest.h"
#include "psrdada_cpp/meerkat/tuse/transpose_to_dada.h"
namespace psrdada_cpp {
namespace meerkat {
namespace tuse {
namespace test {
TransposeTest::TransposeTest()
: ::testing::Test()
{
}
TransposeTest::~TransposeTest()
{
}
void TransposeTest::SetUp()
{
}
void TransposeTest::TearDown()
{
}
TEST_F(TransposeTest, test_attributes)
{
}
} //namespace test
} //namespace tuse
} //namespace meerkat
} //namespace psrdada_cpp
......@@ -58,53 +58,53 @@ public:
void set_ngroups(const int ngroups);
/**
* @brief Setter for frequency channels
*/
/**
* @brief Setter for frequency channels
*/
void set_nchans(const int nchans);
void set_nchans(const int nchans);
/**
* @brief Setter of number of time samples
*/
/**
* @brief Setter of number of time samples
*/
void set_nsamples(const int nsamples);
void set_nsamples(const int nsamples);
/**
* @brief Setter for number of frequency blocks
*/
/**
* @brief Setter for number of frequency blocks
*/
void set_nfreq(const int _nfreq);
void set_nfreq(const int _nfreq);
/**
* @brief getter for number of channels
*/
/**
* @brief getter for number of channels
*/
std::uint32_t nchans();
std::uint32_t nchans();
/**
* @brief getter for number of time samples
*/
/**
* @brief getter for number of time samples
*/
std::uint32_t nsamples();
std::uint32_t nsamples();
/**
* @brief getter for frequency blocks
*/
/**
* @brief getter for frequency blocks
*/
std::uint32_t nfreq();
std::uint32_t nfreq();
/**
*@brief: getter for ngroups
*/
/**
*@brief: getter for ngroups
*/
std::uint32_t ngroups();
std::uint32_t ngroups();
/**
*@brief: getter for nbeams
*/
/**
*@brief: getter for nbeams
*/
std::uint32_t nbeams();
std::uint32_t nbeams();
private:
std::uint32_t _numbeams;
......
#include "psrdada_cpp/cli_utils.hpp"
#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/file_input_stream.hpp"
#include "psrdada_cpp/dada_output_stream.hpp"
#include <memory>
#include <fstream>
#include "boost/program_options.hpp"
#include <ctime>
using namespace psrdada_cpp;
namespace
{
const size_t ERROR_IN_COMMAND_LINE = 1;
const size_t SUCCESS = 0;
const size_t ERROR_UNHANDLED_EXCEPTION = 2;
} // namespace
int main(int argc, char** argv)
{
try
{
key_t output_key;
std::size_t headersize;
std::size_t nbytes;
std::string filename;
/** Define and parse the program options
* */
namespace po = boost::program_options;
po::options_description desc("Options");
desc.add_options()
("output_key,k", po::value<std::string>()
->default_value("dada")
->notifier([&output_key](std::string in)
{
output_key = string_to_key(in);
}),
"The shared memory key for the output dada buffer to connect to (hex string)")
("help,h", "Print help messages")
("input_file,f", po::value<std::string>(&filename)->required(),
"Input file to read")
("nbytes,n", po::value<std::size_t>(&nbytes)->required(),
"Number of bytes to read in one DADA block")
("header_size,s", po::value<std::size_t>(&headersize)->required(),
"size of header to read");
/* Catch Error and program description */
po::variables_map vm;
try
{
po::store(po::parse_command_line(argc, argv, desc), vm);
if ( vm.count("help") )
{
std::cout << "Transpose2Dada -- read MeerKAT beamformed dada from DADA buffer, transpose per beam and write to an output DADA buffer"
<< std::endl << desc << std::endl;
return SUCCESS;
}
po::notify(vm);
}
catch(po::error& e)
{
std::cerr << "ERROR: " << e.what() << std::endl << std::endl;
std::cerr << desc << std::endl;
return ERROR_IN_COMMAND_LINE;
}
/* Application Code */
MultiLog log("outstream");
/* Setting up the pipeline based on the type of sink*/
DadaOutputStream outstream(output_key, log);
FileInputStream<decltype(outstream)> input(filename, headersize, nbytes, outstream);
input.start();
/* End Application Code */
}
catch(std::exception& e)
{
std::cerr << "Unhandled Exception reached the top of main: "
<< e.what() << ", application will now exit" << std::endl;
return ERROR_UNHANDLED_EXCEPTION;
}
return SUCCESS;
}
......@@ -21,20 +21,20 @@ namespace psrdada_cpp
{
int len = str.size();
std::memcpy(ptr,(char*)&len,sizeof(len));
ptr += sizeof(len);
ptr += sizeof(len);
std::copy(str.begin(),str.end(),ptr);
ptr += len;
ptr += len;
}
void SigprocHeader::header_write(char*& ptr, std::string const& str, std::string const& name)
{
header_write(ptr,str);
header_write(ptr,name);
header_write(ptr,str);
header_write(ptr,name);
}
void SigprocHeader::write_header(RawBytes& block, PsrDadaHeader ph)
{
auto ptr = block.ptr();
auto ptr = block.ptr();
header_write(ptr,"HEADER_START");
header_write<std::uint32_t>(ptr,"telescope_id",0);
header_write<std::uint32_t>(ptr,"machine_id",11);
......@@ -44,8 +44,8 @@ namespace psrdada_cpp
// RA DEC
auto ra_val = ph.ra();
auto dec_val =ph.dec();
std::vector<std::string> ra_s;
std::vector<std::string> dec_s;
std::vector<std::string> ra_s;
std::vector<std::string> dec_s;
boost::split(ra_s,ra_val,boost::is_any_of(":"));
boost::split(dec_s,dec_val,boost::is_any_of(":"));
double ra = stod(boost::join(ra_s,""));
......@@ -65,7 +65,7 @@ namespace psrdada_cpp
std::size_t SigprocHeader::header_size() const
{
return _header_size;
return _header_size;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment