diff --git a/cmake/psrdada.cmake b/cmake/psrdada.cmake index a09b4da84078b552f71e38243952e7d354345fa9..a011a2d2250f0cc2cce1a6de048c4a049f1d64d4 100644 --- a/cmake/psrdada.cmake +++ b/cmake/psrdada.cmake @@ -34,4 +34,4 @@ ENDIF(NOT PSRDADA_FOUND) LIST(APPEND PSRDADA_INCLUDE_DIR "${PSRDADA_INCLUDE_DIR}") -MARK_AS_ADVANCED(PSRDADA_LIBRARIES PSRDADA_TEST_LIBRARIES PSRDADA_INCLUDE_DIR) \ No newline at end of file +MARK_AS_ADVANCED(PSRDADA_LIBRARIES PSRDADA_TEST_LIBRARIES PSRDADA_INCLUDE_DIR) diff --git a/psrdada_cpp/CMakeLists.txt b/psrdada_cpp/CMakeLists.txt index 18daba2f300ebcffbbd1ee97419bdea28599efe2..40dc32489460c316e89f599c13308b73790b2107 100644 --- a/psrdada_cpp/CMakeLists.txt +++ b/psrdada_cpp/CMakeLists.txt @@ -19,6 +19,7 @@ set(psrdada_cpp_src src/simple_file_writer.cpp src/sigprocheader.cpp src/psrdadaheader.cpp + src/file_to_dada_cli.cpp ) set(psrdada_cpp_inc @@ -38,6 +39,7 @@ set(psrdada_cpp_inc sigprocheader.hpp psrdadaheader.hpp psrdada_to_sigproc_header.hpp + file_input_stream.hpp ) # -- the main library target @@ -55,6 +57,11 @@ target_link_libraries (syncdb ${PSRDADA_CPP_LIBRARIES}) add_executable(fbfuse_output_db examples/fbfuse_output_db.cpp) target_link_libraries (fbfuse_output_db ${PSRDADA_CPP_LIBRARIES}) +#file_to_dada_cli +add_executable(file_to_dada src/file_to_dada_cli.cpp) +target_link_libraries (file_to_dada ${PSRDADA_CPP_LIBRARIES}) + + #dbnull add_executable(dbnull examples/dbnull.cpp) target_link_libraries (dbnull ${PSRDADA_CPP_LIBRARIES}) diff --git a/psrdada_cpp/dada_junk_source.hpp b/psrdada_cpp/dada_junk_source.hpp index 931a14f49610fe798c72cc8a72af5ed9d5f3a374..cce6a77e2f43802c904ddcc6c2a2b7f0de495059 100644 --- a/psrdada_cpp/dada_junk_source.hpp +++ b/psrdada_cpp/dada_junk_source.hpp @@ -23,9 +23,9 @@ namespace psrdada_cpp RawBytes header(junk_header.data(),header_size,header_size,false); handler.init(header); std::size_t bytes_written = 0; - std::vector<char> junk_block(total_bytes,0); + std::vector<char> junk_block(nbytes_per_write,0); srand(0); - for (std::uint32_t ii=0; ii < total_bytes; ++ii) + for (std::uint32_t ii=0; ii < nbytes_per_write; ++ii) { junk_block[ii] = rand()%255 + 1; } diff --git a/psrdada_cpp/detail/file_input_stream.cpp b/psrdada_cpp/detail/file_input_stream.cpp new file mode 100644 index 0000000000000000000000000000000000000000..216aba75e83188533bc57ee9f9ef93e1d85f083e --- /dev/null +++ b/psrdada_cpp/detail/file_input_stream.cpp @@ -0,0 +1,74 @@ +#include "psrdada_cpp/file_input_stream.hpp" + +namespace psrdada_cpp { + + template <class HandlerType> + FileInputStream<HandlerType>::FileInputStream(std::string fileName, std::size_t headersize, std::size_t nbytes, HandlerType& handler) + : _headersize(headersize) + , _nbytes(nbytes) + , _handler(handler) + , _stop(false) + , _running(false) + { + const char *filename = fileName.c_str(); + _filestream.exceptions(std::ifstream::failbit | std::ifstream::badbit); + _filestream.open(filename, std::ifstream::in | std::ifstream::binary); + if (!_filestream.is_open()) + { + throw std::runtime_error("File could not be opened"); + } + } + + template <class HandlerType> + FileInputStream<HandlerType>::~FileInputStream() + { + _filestream.close(); + } + + template <class HandlerType> + void FileInputStream<HandlerType>::start() + { + if (_running) + { + throw std::runtime_error("Stream is already running"); + } + _running = true; + + // Get the header + char* header_ptr = new char[4096]; + char* data_ptr = new char[_nbytes]; + RawBytes header_block(header_ptr, 4096, 0, false); + _filestream.read(header_ptr, _headersize); + header_block.used_bytes(4096); + _handler.init(header_block); + + // Continue to stream data until the end + while (!_stop) + { + BOOST_LOG_TRIVIAL(info) << "Reading data from the file"; + // Read data from file here + RawBytes data_block(data_ptr, _nbytes, 0, false); + while (!_stop) + { + if (_filestream.eof()) + { + BOOST_LOG_TRIVIAL(info) << "Reached end of file"; + _filestream.close(); + break; + } + _filestream.read(data_ptr, _nbytes); + data_block.used_bytes(data_block.total_bytes()); + _handler(data_block); + } + data_block.~RawBytes(); + } + _running = false; + } + + template <class HandlerType> + void FileInputStream<HandlerType>::stop() + { + _stop = true; + } + +} //psrdada_cpp diff --git a/psrdada_cpp/file_input_stream.hpp b/psrdada_cpp/file_input_stream.hpp new file mode 100644 index 0000000000000000000000000000000000000000..bfa81ee3b5bdd624298c707f2debf197e860c32a --- /dev/null +++ b/psrdada_cpp/file_input_stream.hpp @@ -0,0 +1,36 @@ +#ifndef PSRDADA_CPP_FILE_INPUT_STREAM_HPP +#define PSRDADA_CPP_FILE_INPUT_STREAM_HPP +#include <fstream> +#include <cstdlib> +#include "psrdada_cpp/raw_bytes.hpp" +/** + * @detail: A simple file input stream. Will go through one entire file. + * Will assume there is some amount of header to the file. + */ + + +namespace psrdada_cpp +{ + + template <class HandlerType> + class FileInputStream + { + public: + FileInputStream(std::string filename, std::size_t headersize, std::size_t nbytes, HandlerType& handler); + ~FileInputStream(); + void start(); + void stop(); + + private: + std::size_t _headersize; + std::size_t _nbytes; + std::ifstream _filestream; + HandlerType& _handler; + bool _stop; + bool _running; + }; +} //psrdada_cpp + +#include "psrdada_cpp/detail/file_input_stream.cpp" + +#endif //PSRDADA_CPP_FILE_INPUT_STREAM_HPP diff --git a/psrdada_cpp/meerkat/tuse/CMakeLists.txt b/psrdada_cpp/meerkat/tuse/CMakeLists.txt index e36cbbdcf82e2bbd0ea63ca209136a21a4fdcb20..54ad8a594e6e05fd2e3e469d65408090150663ad 100644 --- a/psrdada_cpp/meerkat/tuse/CMakeLists.txt +++ b/psrdada_cpp/meerkat/tuse/CMakeLists.txt @@ -20,6 +20,11 @@ target_link_libraries (transpose_to_file_cli ${PSRDADA_CPP_MEERKAT_TUSE_LIBRARIE #transpose_to_null_cli add_executable(transpose_to_null_cli src/transpose_to_null_cli.cpp) target_link_libraries (transpose_to_null_cli ${PSRDADA_CPP_MEERKAT_TUSE_LIBRARIES}) + +#dada_dbevent +add_executable(dada_dbevent src/dada_dbevent.cpp) +target_link_libraries (dada_dbevent ${PSRDADA_CPP_MEERKAT_TUSE_LIBRARIES}) + install (TARGETS transpose_to_null_cli transpose_to_file_cli transpose_to_dada_cli DESTINATION bin) install(FILES transpose_to_dada.hpp DESTINATION include/psrdada_cpp/meerkat/tuse) add_subdirectory(test) diff --git a/psrdada_cpp/meerkat/tuse/src/dada_dbevent.cpp b/psrdada_cpp/meerkat/tuse/src/dada_dbevent.cpp new file mode 100644 index 0000000000000000000000000000000000000000..bacef9a4bb87d6aea95c8ab38044f8fa0e8f93e8 --- /dev/null +++ b/psrdada_cpp/meerkat/tuse/src/dada_dbevent.cpp @@ -0,0 +1,1025 @@ +/*************************************************************************** + * + * Copyright (C) 2012 by Andrew Jameson + * Licensed under the Academic Free License version 2.1 + * + ****************************************************************************/ + +/* + * Attaches to in input data block as a viewer, and opens a socket to listen + * for requests to write temporal events to the output data block. Can seek + * back in time over cleared data blocks + */ + +#include "ascii_header.h" +#include "dada_hdu.h" +#include "dada_def.h" +#include "node_array.h" +#include "multilog.h" +#include "diff_time.h" +#include "sock.h" +#include "tmutil.h" + +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <string.h> +#include <errno.h> +#include <assert.h> +#include <signal.h> + +#include <sys/types.h> +#include <sys/socket.h> + +#include <sys/ipc.h> +#include <sys/sem.h> +#include <sys/shm.h> +#include <math.h> + +using namespace std; + +const int DADA_DBEVENT_DEFAULT_PORT= 30000; +const int DADA_DBEVENT_DEFAULT_INPUT_BUFFER = 80; +const int DADA_DBEVENT_DEFAULT_INPUT_DELAY= 60; +#define DADA_DBEVENT_TIMESTR "%Y-%m-%d-%H:%M:%S" + + +int quit = 0; + +typedef struct { + + // input HDU + dada_hdu_t * in_hdu; + + // output HDU + dada_hdu_t * out_hdu; + + // multilog + multilog_t * log; + + // input data block's UTC_START + time_t utc_start; + + // input data block's BYTES_PER_SECOND + uint64_t bytes_per_second; + + time_t input_maximum_delay; + + char * header; + + size_t header_size; + + void * work_buffer; + + void * event_buffer; + + size_t event_buffer_size; + + size_t work_buffer_size; + + uint64_t in_nbufs; + + uint64_t in_bufsz; + + uint64_t resolution; + + // verbosity + int verbose; + +} dada_dbevent_t; + +typedef struct { + + uint64_t start_byte; + + uint64_t end_byte; + + float snr; + + float dm; + + float width; + + unsigned beam; + + float tsamp; + + uint32_t nchans; + + float f1; + + float f2; + +} event_t; + +#define DADA_DBEVENT_INIT { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 } + +static int sort_events (const void *p1, const void *p2) +{ + event_t A = *(event_t *) p1; + event_t B = *(event_t *) p2; + if (A.start_byte < B.start_byte) return -1; + if (A.start_byte > B.start_byte) return +1; + return 0; +} + +int check_read_offset (dada_dbevent_t * dbevent); +//int check_write_timestamps (dada_dbevent_t * dbevent); + +int64_t calculate_byte_offset (dada_dbevent_t * dbevent, char * time_str_secs, char * time_str_frac); + +int receive_events (dada_dbevent_t * dbevent, int listen_fd); + +int dump_event(dada_dbevent_t * dbevent, double event_start_utc, double event_end_utc, float event_snr, float event_dm); + +void usage(); + +void usage() +{ + fprintf (stdout, + "dada_dbevent [options] inkey outkey\n" + " inkey input hexadecimal shared memory key\n" + " outkey input hexadecimal shared memory key\n" + " -b percent delay procesing of the input buffer up to this amount [default %d %%]\n" + " -t delay maximum delay (s) to retain data for [default %ds]\n" + " -h print this help text\n" + " -p port port to listen for event commands [default %d]\n" + " -v be verbose\n", + DADA_DBEVENT_DEFAULT_INPUT_BUFFER, + DADA_DBEVENT_DEFAULT_INPUT_DELAY, + DADA_DBEVENT_DEFAULT_PORT); +} + +void signal_handler(int signalValue) +{ + fprintf(stderr, "dada_dbevent: SIGINT/TERM\n"); + fprintf(stderr,"value of signal interrupt is: %d\n", signalValue); + quit = 1; +} + +double dm_delay(float DM, double freq1, double freq2) +{ + double freqghz1 = freq1/1e9; + double freqghz2 = freq2/1e9; + double delta_t = 4.15 * 1e-3 * DM * ((1/pow(freqghz1,2)) - 1/(pow(freqghz2,2))); // in seconds + return delta_t; +} + +int calcbytes(double seconds, int nchans, double tsamp, int bytespersamp) +{ + return (int) bytespersamp*(seconds/tsamp) * nchans; +} + +int main (int argc, char **argv) +{ + // core dbevent data struct + dada_dbevent_t dbevent = DADA_DBEVENT_INIT; + + // DADA Logger + multilog_t* log = 0; + + // flag set in verbose mode + char verbose = 0; + + // port to listen for event requests + int port = DADA_DBEVENT_DEFAULT_PORT; + + // input hexadecimal shared memory key + key_t in_dada_key; + + // output hexadecimal shared memory key + key_t out_dada_key; + + float input_data_block_threshold = DADA_DBEVENT_DEFAULT_INPUT_BUFFER; + + int input_maximum_delay = DADA_DBEVENT_DEFAULT_INPUT_DELAY; + + int arg = 0; + + while ((arg=getopt(argc,argv,"b:hp:t:v")) != -1) + { + switch (arg) + { + case 'b': + if (sscanf (optarg, "%f", &input_data_block_threshold) != 1) + { + fprintf (stderr, "dada_dbevent: could not parse input buffer level from %s\n", optarg); + return EXIT_FAILURE; + } + break; + + case 'h': + usage(); + return EXIT_SUCCESS; + + case 'v': + verbose++; + break; + + case 'p': + if (sscanf (optarg, "%d", &port) != 1) + { + fprintf (stderr, "dada_dbevent: could not parse port from %s\n", optarg); + return EXIT_FAILURE; + } + break; + + case 't': + if (sscanf (optarg, "%d", &input_maximum_delay) != 1) + { + fprintf (stderr, "dada_dbevent: could not parse maximum input delay from %s\n", optarg); + return EXIT_FAILURE; + } + break; + + default: + usage (); + return EXIT_SUCCESS; + } + } + + if (argc - optind != 2) + { + fprintf (stderr, "dada_dbevent: expected 2 command line arguments\n"); + usage(); + return EXIT_FAILURE; + } + + if (sscanf (argv[optind], "%x",(unsigned int*) &in_dada_key) != 1) + { + fprintf (stderr,"dada_dbevent: could not parse in_key from %s\n", argv[optind]); + return EXIT_FAILURE; + } + + if (sscanf (argv[optind+1], "%x",(unsigned int*) &out_dada_key) != 1) + { + fprintf (stderr,"dada_dbevent: could not parse out_key from %s\n", argv[optind+1]); + return EXIT_FAILURE; + } + + // install some signal handlers + signal(SIGINT, signal_handler); + signal(SIGTERM, signal_handler); + + log = multilog_open ("dada_dbevent", 0); + multilog_add (log, stderr); + + dbevent.verbose = verbose; + dbevent.log = log; + dbevent.input_maximum_delay = (time_t) input_maximum_delay; + dbevent.work_buffer_size = 1024 * 1024; + dbevent.work_buffer = malloc (dbevent.work_buffer_size); + if (!dbevent.work_buffer) + { + multilog(log, LOG_INFO, "could not allocate memory for work buffer\n"); + return EXIT_FAILURE; + } + + if (verbose) + multilog(log, LOG_INFO, "connecting to data blocks\n"); + + dbevent.in_hdu = dada_hdu_create (log); + dada_hdu_set_key (dbevent.in_hdu, in_dada_key); + if (dada_hdu_connect (dbevent.in_hdu) < 0) + { + multilog(log, LOG_ERR, "could not connect to input HDU\n"); + return EXIT_FAILURE; + } + if (dada_hdu_lock_read(dbevent.in_hdu) < 0) + { + multilog (log, LOG_ERR, "could not open input HDU as viewer\n"); + return EXIT_FAILURE; + } + + dbevent.out_hdu = dada_hdu_create (log); + dada_hdu_set_key (dbevent.out_hdu, out_dada_key); + if (dada_hdu_connect (dbevent.out_hdu) < 0) + { + multilog(log, LOG_ERR, "could not connect to output HDU\n"); + return EXIT_FAILURE; + } + if (dada_hdu_lock_write (dbevent.out_hdu) < 0) + { + multilog (log, LOG_ERR, "could not open output HDU as writer\n"); + return EXIT_FAILURE; + } + + // open listening socket + if (verbose) + multilog (log, LOG_INFO, "main: sock_create(%d)\n", port); + int listen_fd = sock_create (&port); + if (listen_fd < 0) + { + multilog (log, LOG_ERR, "could not open socket: %s\n", strerror(errno)); + quit = 2; + } + else + { + if (verbose) + multilog (log, LOG_INFO, "listening on port %d for dump requests\n", port); + } + + + fd_set fds; + struct timeval timeout; + int fds_read; + + // now get the header from the input data block + if (verbose) + multilog(log, LOG_INFO, "waiting for input header\n"); + if (dada_hdu_open (dbevent.in_hdu) < 0) + { + multilog (log, LOG_ERR, "could not get input header\n"); + quit = 1; + } + else + { + if (verbose > 1) + { + fprintf (stderr, "==========\n"); + fprintf (stderr, "%s", dbevent.in_hdu->header); + fprintf (stderr, "==========\n"); + } + + dbevent.header_size = ipcbuf_get_bufsz (dbevent.in_hdu->header_block); + dbevent.header = (char *) malloc (dbevent.header_size); + if (!dbevent.header) + { + multilog (log, LOG_ERR, "failed to allocate memory for header\n"); + quit = 1; + } + else + { + // make a local copy of the header + memcpy (dbevent.header, dbevent.in_hdu->header, dbevent.header_size); + + // immediately mark this header as cleared + ipcbuf_mark_cleared (dbevent.in_hdu->header_block); + + char utc_buffer[64]; + // get the UTC_START and TSAMP / BYTES_PER_SECOND for this observation + if (ascii_header_get (dbevent.header, "UTC_START", "%s", utc_buffer) < 0) + { + multilog (log, LOG_ERR, "could not extract UTC_START from input datablock header\n"); + quit = 2; + } + else + { + if (verbose) + multilog(log, LOG_INFO, "input UTC_START=%s\n", utc_buffer); + dbevent.utc_start = str2utctime (utc_buffer); + if (dbevent.utc_start == (time_t)-1) + { + multilog (log, LOG_ERR, "could not parse UTC_START from '%s'\n", utc_buffer); + quit = 2; + } + } + + if (ascii_header_get (dbevent.header, "BYTES_PER_SECOND", "%" PRIu64, &(dbevent.bytes_per_second)) < 0) + { + multilog (log, LOG_ERR, "could not extract BYTES_PER_SECOND from input datablock header\n"); + quit = 2; + } + else + { + if (verbose) + multilog(log, LOG_INFO, "input BYTES_PER_SECOND=%" PRIu64"\n", dbevent.bytes_per_second); + } + + if (ascii_header_get (dbevent.header, "RESOLUTION", "%" PRIu64, &(dbevent.resolution)) < 0) + dbevent.resolution = 1; + + if (verbose) + multilog(log, LOG_INFO, "input RESOLUTION=%" PRIu64"\n", dbevent.resolution); + } + } + + ipcbuf_t * db = (ipcbuf_t *) dbevent.in_hdu->data_block; + + // get the number and size of buffers in the input data block + dbevent.in_nbufs = ipcbuf_get_nbufs (db); + dbevent.in_bufsz = ipcbuf_get_bufsz (db); + + while (!quit) + { + // setup file descriptor set for listening + FD_ZERO(&fds); + FD_SET(listen_fd, &fds); + timeout.tv_sec = 0; + timeout.tv_usec = 1000000; + fds_read = select(listen_fd+1, &fds, (fd_set *) 0, (fd_set *) 0, &timeout); + + // problem with select call + if (fds_read < 0) + { + multilog (log, LOG_ERR, "select failed: %s\n", strerror(errno)); + quit = 2; + break; + } + // select timed out, check input HDU for end of data + else if (fds_read == 0) + { + if (verbose > 1) + multilog (log, LOG_INFO, "main: check_read_offset ()\n"); + int64_t n_skipped = check_read_offset (&dbevent); + if (n_skipped < 0) + multilog (log, LOG_WARNING, "check_db_times failed\n"); + if (verbose > 1) + multilog (log, LOG_INFO, "main: check_db_times skipped %" PRIi64" events\n", n_skipped); + } + // we received a new connection on our listening FD, process comand + else + { + if (verbose) + multilog (log, LOG_INFO, "main: receiving events on socket\n"); + int events_recorded = receive_events (&dbevent, listen_fd); + if (events_recorded < 0) + { + multilog (log, LOG_INFO, "main: quit requested via socket\n"); + quit = 1; + } + if (verbose) + multilog (log, LOG_INFO, "main: received %d events\n", events_recorded); + } + + // check how full the input datablock is + float percent_full = ipcio_percent_full (dbevent.in_hdu->data_block) * 100; + if (verbose > 1) + multilog (log, LOG_INFO, "input datablock %5.2f percent full\n", percent_full); + + int64_t read_offset, remainder, seek_byte, seeked_byte; + + while (!quit && percent_full > input_data_block_threshold) + { + if (verbose) + multilog (log, LOG_INFO, "percent_full=%5.2f threshold=%5.2f\n", percent_full, input_data_block_threshold); + // since we are too full, seek forward 1 block + read_offset = ipcio_tell (dbevent.in_hdu->data_block); + + // if the current read offset is a full block, make it so + remainder = read_offset % dbevent.in_bufsz; + + if (remainder != 0) + seek_byte = (int64_t) dbevent.in_bufsz - remainder; + else + seek_byte = (int64_t) dbevent.in_bufsz; + + if ((seek_byte < 0) || (seek_byte > (int) dbevent.in_bufsz)) + multilog (log, LOG_WARNING, "main: seek_byte limits warning: %" PRIi64"\n", seek_byte); + + if (verbose) + multilog (log, LOG_INFO, "main: ipcio_seek(%" PRIu64", SEEK_CUR)\n", seek_byte); + + // seek forward (from curr pos) to the next full block boundary + seeked_byte = ipcio_seek (dbevent.in_hdu->data_block, seek_byte, SEEK_CUR); + if (seeked_byte < 0) + { + multilog (log, LOG_INFO, "main: ipcio_seek failed\n"); + quit = 1; + } + + // sleep a short space to allow writer to write! + usleep(10000); + + // update the percentage full for the data block + percent_full = ipcio_percent_full (dbevent.in_hdu->data_block) * 100; + if (verbose) + multilog (log, LOG_INFO, "input datablock reduced to %5.2f percent full\n", percent_full); + } + + if (ipcbuf_eod (db)) + { + if (verbose) + multilog (log, LOG_INFO, "EOD now true\n"); + quit = 1; + } + } + + if (quit) + { + if (!ipcbuf_eod (db)) + { + multilog (log, LOG_INFO, "quit requested, trying to clear all remaining data blocks\n"); + uint64_t bytes_written = ipcbuf_get_write_byte_xfer ((ipcbuf_t *) dbevent.in_hdu->data_block); + multilog (log, LOG_INFO, "quit requested: total bytes written=%" PRIu64"\n", bytes_written); + int64_t bytes_seeked = ipcio_seek (dbevent.in_hdu->data_block, (int64_t) bytes_written, SEEK_SET); + multilog (log, LOG_INFO, "quit requested: seeked to byte %" PRIi64"\n", bytes_seeked); + } + } + + free (dbevent.work_buffer); + if (dbevent.header) + free (dbevent.header); + dbevent.header = 0; + + if (dada_hdu_disconnect (dbevent.in_hdu) < 0) + { + fprintf (stderr, "dada_dbevent: disconnect from input data block failed\n"); + return EXIT_FAILURE; + } + + if (dada_hdu_unlock_write (dbevent.out_hdu) < 0) + { + fprintf (stderr, "dada_dbevent: unlock write on output data block failed\n"); + return EXIT_FAILURE; + } + if (dada_hdu_disconnect (dbevent.out_hdu) < 0) + { + fprintf (stderr, "dada_dbevent: disconnect from output data block failed\n"); + return EXIT_FAILURE; + } + + return EXIT_SUCCESS; +} + + +int check_read_offset (dada_dbevent_t * dbevent) +{ + const uint64_t max_delay_bytes = dbevent->input_maximum_delay * dbevent->bytes_per_second; + + unsigned have_old_buffers = 1; + + // check for end of data before doing anything + if (ipcbuf_eod ((ipcbuf_t *) dbevent->in_hdu->data_block)) + have_old_buffers = 0; + + // get the time and byte offset for the time + const time_t now = time(0); + const time_t obs_offset = (now - dbevent->utc_start); + if (obs_offset <= 0) + { + if (dbevent->verbose) + multilog (dbevent->log, LOG_INFO, "check_read_offset: now <= obs_offset\n"); + return (0); + } + + if (dbevent->verbose) + multilog (dbevent->log, LOG_INFO, "check_read_offset: now=%ld utc_start=%ld obs_offset=%ld\n", now, dbevent->utc_start, obs_offset); + const uint64_t now_byte_offset = (uint64_t) obs_offset * dbevent->bytes_per_second; + + uint64_t read_offset, remainder, seek_byte; + int64_t seeked_byte; + unsigned n_skipped = 0; + + while (have_old_buffers) + { + // get the current read offset in bytes + read_offset = ipcio_tell (dbevent->in_hdu->data_block); + + // if the current read offset + the maximum allowed delay is still less than + // the current byte offset, then we MUST read while it is not + if (read_offset + max_delay_bytes < now_byte_offset) + { + if (dbevent->verbose) + multilog (dbevent->log, LOG_INFO, "check_read_offset: read_offset[%" PRIu64"] + max_delay_bytes[%" PRIu64"] < now_byte_offset[%" PRIu64"]\n", read_offset, max_delay_bytes, now_byte_offset); + + remainder = read_offset % dbevent->in_bufsz; + seek_byte = 0; + + if (dbevent->verbose > 1) + multilog (dbevent->log, LOG_INFO, "check_read_offset: read_offset=%" PRIu64" remainder=%" PRIu64"\n", read_offset, remainder); + + // check for a partially read block + if (remainder != 0) + { + seek_byte = dbevent->in_bufsz - remainder; + } + // otherwise we will seek forward 1 full block + else + { + seek_byte = dbevent->in_bufsz; + } + + if (((int) seek_byte < 0) || (seek_byte > dbevent->in_bufsz)) + { + multilog (dbevent->log, LOG_WARNING, "check_read_offset: seek_byte limits warning: %" PRIi64"\n", seek_byte); + } + + if (dbevent->verbose) + multilog (dbevent->log, LOG_INFO, "check_read_offset: ipcio_seek(%" PRIu64", SEEK_CUR)\n", seek_byte); + seeked_byte = ipcio_seek (dbevent->in_hdu->data_block, seek_byte, SEEK_CUR); + if (seeked_byte < 0) + { + multilog (dbevent->log, LOG_INFO, "check_read_offset: ipcio_seek failed\n"); + return -1; + } + n_skipped ++; + } + else + have_old_buffers = 0; + + // also check for end of data + if (ipcbuf_eod ((ipcbuf_t *) dbevent->in_hdu->data_block)) + { + have_old_buffers = 0; + } + } + + if (dbevent->verbose) + multilog (dbevent->log, LOG_INFO, "check_read_offset: skipped %" PRIi64" blocks\n", n_skipped); + + return n_skipped; +} + +int receive_events (dada_dbevent_t * dbevent, int listen_fd) +{ + multilog_t * log = dbevent->log; + + int fd = 0; + FILE * sockin = 0; + unsigned more_events = 1; + + unsigned buffer_size = 1024; + char *buffer = new char[buffer_size]; + + char * event_start; + char * event_start_fractional; + char * event_end; + char * event_end_fractional; + char * event_snr_str; + char * event_dm_str; + char * event_width_str; + char * event_beam_str; + + // arrays for n_events + uint64_t n_events = 0; + event_t * events = NULL; + + int events_recorded = 0; + int events_missed = 0; + + if (dbevent->verbose) + multilog (log, LOG_INFO, "receive_events: sock_accept(listen_fd)\n"); + fd = sock_accept (listen_fd); + if (fd < 0) + { + multilog(log, LOG_WARNING, "error accepting connection %s\n", strerror(errno)); + return -1; + } + + sockin = fdopen(fd,"r"); + if (!sockin) + { + multilog(log, LOG_WARNING, "error creating input stream %s\n", strerror(errno)); + close(fd); + return -1; + } + + setbuf (sockin, 0); + + // first line on the socket should be the number of events + if (fgets (buffer, buffer_size, sockin) == NULL) + { + fprintf(stderr, "Error: Null pointer from socket\n"); + exit(1); + } + if (sscanf (buffer, "N_EVENTS %" PRIu64, &n_events) != 1) + { + multilog(log, LOG_WARNING, "failed to parse N_EVENTS\n"); + more_events = 0; + } + else + { + events = (event_t *) malloc (sizeof(event_t) * n_events); + } + + // second line on the socket should be the UTC_START of the obsevation + if (fgets (buffer, buffer_size, sockin) == NULL) + { + fprintf(stderr, "Error: Null pointer from socket\n"); + exit(1); + } + time_t event_utc_start = str2utctime (buffer); + + char * comment = 0; + unsigned i = 0; + const char * sep_time = ". \t"; + const char * sep_float = " \t"; + int64_t offset; + uint64_t remainder; + + while (more_events > 0 && !feof(sockin)) + { + if (dbevent->verbose > 1) + multilog (log, LOG_INFO, "getting new line\n"); + char * saveptr = 0; + + if (fgets (buffer, buffer_size, sockin) == NULL) + { + fprintf(stderr, "Error: Null pointer from socket\n"); + exit(1); + } + //if (dbevent->verbose > 1) + multilog (log, LOG_INFO, " <- %s", buffer); + + // ignore comments + comment = strchr( buffer, '#' ); + if (comment) + *comment = '\0'; + + comment = strchr( buffer, '\r' ); + if (comment) + *comment = '\0'; + + if (dbevent->verbose) + multilog (log, LOG_INFO, "< - %s\n", buffer); + + if (strlen(buffer) < 10) + continue; + + if (strcmp(buffer, "QUIT") == 0) + { + multilog (log, LOG_WARNING, "receive_events: QUIT event received\n"); + more_events = -1; + continue; + } + + // extract START_UTC string excluding sub-second components + event_start = strtok_r (buffer, sep_time, &saveptr); + if (event_start == NULL) + { + multilog (log, LOG_WARNING, "receive_events: problem extracting event_start\n"); + more_events = 0; + continue; + } + event_start_fractional = strtok_r (NULL, sep_time, &saveptr); + + if (dbevent->verbose) + multilog (log, LOG_INFO, "event_start=%s event_start_fractional=%s\n", event_start, event_start_fractional); + + offset = calculate_byte_offset (dbevent, event_start, event_start_fractional); + if (offset >= 0) + { + remainder = offset % dbevent->resolution; + if (remainder != 0) + { + if (offset > (int) dbevent->resolution) + events[i].start_byte = (uint64_t) (offset - remainder); + else + events[i].start_byte = dbevent->resolution; + } + else + events[i].start_byte = (uint64_t) offset; + } + else + events[i].start_byte = 0; + + // extract END_UTC string excluding sub-second components + event_end = strtok_r (NULL, sep_time, &saveptr); + if (event_end == NULL) + { + multilog (log, LOG_WARNING, "receive_events: problem extracting event_end\n"); + more_events = 0; + continue; + } + event_end_fractional = strtok_r (NULL, sep_time, &saveptr); + + if (dbevent->verbose) + multilog (log, LOG_INFO, "event_end=%s event_end_fractional=%s\n", event_start, event_start_fractional); + offset = calculate_byte_offset (dbevent, event_end, event_end_fractional); + if (offset >= 0) + { + events[i].end_byte = (uint64_t) offset; + remainder = offset % dbevent->resolution; + if (remainder != 0) + events[i].end_byte += (dbevent->resolution - remainder); + } + else + events[i].end_byte = 0; + + event_dm_str = strtok_r (NULL, sep_float, &saveptr); + sscanf(event_dm_str, "%f", &(events[i].dm)); + + event_snr_str = strtok_r (NULL, sep_float, &saveptr); + sscanf(event_snr_str, "%f", &(events[i].snr)); + + event_width_str = strtok_r (NULL, sep_float, &saveptr); + sscanf(event_width_str, "%f", &(events[i].width)); + + event_beam_str = strtok_r (NULL, sep_float, &saveptr); + sscanf(event_beam_str, "%u", &(events[i].beam)); + + if (dbevent->verbose) + multilog (dbevent->log, LOG_INFO, "event: %" PRIi64" - %" PRIi64" SNR=%f, DM=%f WIDTH=%f beam=%u\n", + events[i].start_byte, events[i].end_byte, events[i].snr, events[i].dm, + events[i].width, events[i].beam); + + i++; + + if (i >= n_events) + more_events = 0; + } + + if (n_events > 0) + { + if (event_utc_start != dbevent->utc_start) + { + multilog (dbevent->log, LOG_WARNING, "Event UTC_START [%d] != Obs UTC_START [%d]\n", event_utc_start, dbevent->utc_start); + } + else + { + // sort the events based on event start time + qsort (events, n_events, sizeof (event_t), sort_events); + + // now check for overlapping events + for (i=1; i<n_events; i++) + { + // start overlap + if (events[i].start_byte < events[i-1].end_byte) + { + if (dbevent->verbose) + multilog (dbevent->log, LOG_INFO, "amalgamating event idx %d into %d\n", i-1, i); + events[i].start_byte = events[i-1].start_byte; + events[i-1].start_byte = 0; + + if (events[i-1].end_byte > events[i].end_byte) + events[i].end_byte = events[i-1].end_byte; + events[i-1].end_byte = 0; + } + } + + // for each event, check that its in the future, and if so, seek forward to it + uint64_t current_byte = 0; + int64_t seeked_byte = 0; + for (i=0; i<n_events; i++) + { + if ((events[i].start_byte == 0) && (events[i].end_byte == 0)) + { + if (dbevent->verbose) + multilog (dbevent->log, LOG_INFO, "ignoring event[%d], start_byte == end_byte == 0\n", i); + continue; + } + + current_byte = ipcio_tell (dbevent->in_hdu->data_block); + //multilog (dbevent->log, LOG_INFO, "current_byte=%"PRIu64"\n", current_byte); + + if (events[i].start_byte < current_byte) + { + multilog (dbevent->log, LOG_WARNING, "skipping events[%d], current_byte [%" PRIu64"] past event start_byte [%" PRIu64"]\n", i, current_byte, events[i].start_byte); + events_missed++; + continue; + } + + // seek forward to the relevant point in the datablock + if (dbevent->verbose) + multilog (dbevent->log, LOG_INFO, "seeking forward %" PRIu64" bytes from start of obs\n", events[i].start_byte); + seeked_byte = ipcio_seek (dbevent->in_hdu->data_block, (int64_t) events[i].start_byte, SEEK_SET); + if (seeked_byte < 0) + { + multilog (dbevent->log, LOG_WARNING, "could not seek to byte %" PRIu64"\n", events[i].start_byte); + events_missed++; + continue; + } + + if (dbevent->verbose) + multilog (dbevent->log, LOG_INFO, "seeked_byte=%" PRIi64"\n", seeked_byte); + + // determine how much to read + size_t to_read = events[i].end_byte - events[i].start_byte; + multilog (dbevent->log, LOG_INFO, "to read = %d [%" PRIu64" - %" PRIu64"]\n", to_read, events[i].end_byte, events[i].start_byte); + + if (dbevent->work_buffer_size < to_read) + { + dbevent->work_buffer_size = to_read; + if (dbevent->verbose) + multilog (dbevent->log, LOG_INFO, "reallocating work_buffer [%p] to %d bytes\n", + dbevent->work_buffer, dbevent->work_buffer_size); + dbevent->work_buffer = realloc (dbevent->work_buffer, dbevent->work_buffer_size); + if (dbevent->verbose) + multilog (dbevent->log, LOG_INFO, "reallocated work_buffer [%p]\n", dbevent->work_buffer); + } + + // read the event from the input buffer + if (dbevent->verbose) + multilog (dbevent->log, LOG_INFO, "reading %d bytes from input HDU into work buffer\n", to_read); + ssize_t bytes_read = ipcio_read (dbevent->in_hdu->data_block, (char *) dbevent->work_buffer, to_read); + if (dbevent->verbose) + multilog (dbevent->log, LOG_INFO, "read %d bytes from input HDU into work buffer\n", bytes_read); + if (bytes_read < 0) + { + multilog (dbevent->log, LOG_WARNING, "receive_events: ipcio_read on input HDU failed\n"); + return -1; + } + + /*// save cut-outs of the event here + dbevent->event_buffer_size = ((2.0 + events[i].width)/events[i].tsamp) * events[i].nchans * 2; // 16 bytes per sample + dbevent->event_buffer = malloc(dbevent->event_buffer_size * sizeof(char)); + int first_offset = 0; //calc_offset(); + (char*) dbevent->work_buffer += first_offset; + for (int ii=0; ii < nchans; ++ii) + { + int ftemp1 = f1; + int ftemp2 = f1 + ((f1 - f2)/events[i].nchans); + double delay = dm_delay(events[i].f1, events[i].f2, events[i].dm); + memcpy(dbevent->event_buffer, dbevent->work_buffer, dbevent->event_buffer_size); + }*/ + + events_recorded++; + + char * header = ipcbuf_get_next_write (dbevent->out_hdu->header_block); + uint64_t header_size = ipcbuf_get_bufsz (dbevent->out_hdu->header_block); + if (header_size < dbevent->header_size) + { + multilog (log, LOG_ERR, "receive_events: output header too small for input header\n"); + return -1; + } + + // copy the input header to the output + memcpy (header, dbevent->header, dbevent->header_size); + + // now write some relevant data to the header + ascii_header_set (header, "OBS_OFFSET", "%" PRIu64, events[i].start_byte); + ascii_header_set (header, "FILE_SIZE", "%ld", to_read); + ascii_header_set (header, "EVENT_SNR", "%f", events[i].snr); + ascii_header_set (header, "EVENT_DM", "%f", events[i].dm); + ascii_header_set (header, "EVENT_WIDTH", "%f", events[i].width); + ascii_header_set (header, "EVENT_BEAM", "%u", events[i].beam); + ascii_header_set (header, "EVENT_FIRST_FREQUENCY", "%f", events[i].f1); + ascii_header_set (header, "EVENT_LAST_FREQUENCY", "%f", events[i].f2); + ascii_header_set (header, "EVENT_TSAMP", "%f", events[i].tsamp); + ascii_header_set (header, "EVENT_NCHANS", "%d", events[i].nchans); + + // tag this header as filled + ipcbuf_mark_filled (dbevent->out_hdu->header_block, header_size); + + // write the specified amount to the output data block + ipcio_write (dbevent->out_hdu->data_block, (char *) dbevent->event_buffer, to_read); + + // close the data block to ensure EOD is written + if (dada_hdu_unlock_write (dbevent->out_hdu) < 0) + { + multilog (log, LOG_ERR, "could not close output HDU as writer\n"); + return -1; + } + + // close the data block to ensure EOD is written + // lock write again to re-open for the next event + if (dada_hdu_lock_write (dbevent->out_hdu) < 0) + { + multilog (log, LOG_ERR, "could not open output HDU as writer\n"); + return -1; + } + } + + multilog (dbevent->log, LOG_INFO, "recorded=%d missed=%d\n", events_recorded, events_missed); + delete[] buffer; + } + } + + fclose(sockin); + close (fd); + + if (events) + free(events); + events = 0; + + return events_recorded; +} + +int64_t calculate_byte_offset (dada_dbevent_t * dbevent, char * time_str_secs, char * time_str_frac) +{ + time_t time_secs; // integer time in seconds + uint64_t time_frac_numer; // numerator of fractional time + uint64_t time_frac_denom; // denominator of fractional time + + int64_t event_byte_offset = -1; + uint64_t event_byte; + uint64_t event_byte_frac; + + time_secs = str2utctime (time_str_secs); + sscanf (time_str_frac, "%" PRIu64, &time_frac_numer); + time_frac_denom = (uint64_t) powf(10,strlen(time_str_frac)); + + if (dbevent->verbose > 1) + multilog (dbevent->log, LOG_INFO, "calculate_byte_offset: time_secs=%d, time_frac_numer=%" PRIu64", time_frac_denom=%" PRIu64"\n", + time_secs, time_frac_numer, time_frac_denom); + + // check we have utc_start and that this event is in the future + if (dbevent->utc_start && (time_secs >= dbevent->utc_start)) + { + event_byte = (time_secs - dbevent->utc_start) * dbevent->bytes_per_second; + event_byte_frac = time_frac_numer * dbevent->bytes_per_second; + event_byte_frac /= time_frac_denom; + event_byte_offset = (int64_t) event_byte + event_byte_frac; + if (dbevent->verbose > 1) + multilog (dbevent->log, LOG_INFO, "calculate_byte_offset: byte_offset [%" PRIi64"]=event_byte [%" PRIu64"] + event_byte_frac [%" PRIu64"]\n", + event_byte_offset, event_byte, event_byte_frac); + } + else + { + multilog (dbevent->log, LOG_ERR, "calculate_byte_offset: time_secs=%d >= dbevent->utc_start=%" PRIu64"\n", time_secs, dbevent->utc_start); + } + return event_byte_offset; +} + + +/* + * dump the specified event to the output datablock */ +int dump_event (dada_dbevent_t * dbevent, double event_start_utc, + double event_end_utc, float event_snr, float event_dm) +{ + multilog (dbevent->log, LOG_INFO, "event time: %lf - %lf [seconds]\n", event_start_utc, event_end_utc); + multilog (dbevent->log, LOG_INFO, "event SNR: %f\n", event_snr); + multilog (dbevent->log, LOG_INFO, "event DM: %f\n", event_dm); + return 0; +} diff --git a/psrdada_cpp/meerkat/tuse/src/transpose_to_file_cli.cpp b/psrdada_cpp/meerkat/tuse/src/transpose_to_file_cli.cpp index eed5c4288f6e96730d276e44fc76fdcc9dbbbd69..4aa7170bf465a35b2f2c71ac606f619dc553456f 100644 --- a/psrdada_cpp/meerkat/tuse/src/transpose_to_file_cli.cpp +++ b/psrdada_cpp/meerkat/tuse/src/transpose_to_file_cli.cpp @@ -90,7 +90,7 @@ int main(int argc, char** argv) { std::string filename = "beam" + std::to_string(ii) + ".fil"; files.emplace_back(std::make_shared<SimpleFileWriter>(filename)); - } + } meerkat::tuse::TransposeToDada<SimpleFileWriter> transpose(nbeams,std::move(files)); transpose.set_nsamples(nsamples); transpose.set_nchans(nchans); @@ -109,7 +109,7 @@ int main(int argc, char** argv) std::cerr << "Unhandled Exception reached the top of main: " << e.what() << ", application will now exit" << std::endl; return ERROR_UNHANDLED_EXCEPTION; - } - return SUCCESS; + } + return SUCCESS; } diff --git a/psrdada_cpp/meerkat/tuse/src/transpose_to_null_cli.cpp b/psrdada_cpp/meerkat/tuse/src/transpose_to_null_cli.cpp index 7e67792c9d811d458040ed5b2772a9a626ca5804..5c49e79f25374e4d566af5c574405e1ba9b28d8d 100644 --- a/psrdada_cpp/meerkat/tuse/src/transpose_to_null_cli.cpp +++ b/psrdada_cpp/meerkat/tuse/src/transpose_to_null_cli.cpp @@ -99,13 +99,13 @@ int main(int argc, char** argv) { nullsinks.emplace_back(std::make_shared<NullSink>()); } - meerkat::tuse::TransposeToDada<NullSink> transpose(nbeams,std::move(nullsinks)); transpose.set_nsamples(nsamples); transpose.set_nchans(nchans); transpose.set_nfreq(nfreq); transpose.set_ngroups(ngroups); transpose.set_nbeams(nbeams); + PsrDadaToSigprocHeader<decltype(transpose)> ptos(transpose); MultiLog log1("instream"); DadaInputStream<decltype(ptos)> input(input_key,log1,ptos); diff --git a/psrdada_cpp/meerkat/tuse/test/TransposeTest.h b/psrdada_cpp/meerkat/tuse/test/TransposeTest.h new file mode 100644 index 0000000000000000000000000000000000000000..f1d74df3110134262cbd55c8f93716699501dc55 --- /dev/null +++ b/psrdada_cpp/meerkat/tuse/test/TransposeTest.h @@ -0,0 +1,29 @@ +#ifndef PSRDADA_CPP_MEERKAT_TUSE_TRANSPOSETEST_H +#define PSRDADA_CPP_MEERKAT_TUSE_TRANSPOSETEST_H + +#include "psrdada_cpp/meerkat/tuse/transpose_to_dada.hpp" +#include <gtest/gtest.h> + +namespace psrdada_cpp { +namespace meerkat { +namespace tuse { +namespace test { + +class TransposeTest: public ::testing::Test +{ +protected: + void SetUp() override; + void TearDown() override; + +public: + TransposeTest(); + ~TransposeTest(); + +}; + +} //namespace test +} //namespace tuse +} //namespace meerkat +} //namespace psrdada_cpp + +#endif //PSRDADA_CPP_MEERKAT_TUSE_TRANSPOSETEST_H diff --git a/psrdada_cpp/meerkat/tuse/test/src/TransposeTest.cpp b/psrdada_cpp/meerkat/tuse/test/src/TransposeTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..3595ce8f2b6ff970b870e275b6a89c492f78d063 --- /dev/null +++ b/psrdada_cpp/meerkat/tuse/test/src/TransposeTest.cpp @@ -0,0 +1,35 @@ +#include "psrdada_cpp/meerkat/tuse/test/TransposeTest.h" +#include "psrdada_cpp/meerkat/tuse/transpose_to_dada.h" + +namespace psrdada_cpp { +namespace meerkat { +namespace tuse { +namespace test { + +TransposeTest::TransposeTest() + : ::testing::Test() +{ +} + +TransposeTest::~TransposeTest() +{ +} + +void TransposeTest::SetUp() +{ +} + +void TransposeTest::TearDown() +{ +} + +TEST_F(TransposeTest, test_attributes) +{ + +} + +} //namespace test +} //namespace tuse +} //namespace meerkat +} //namespace psrdada_cpp + diff --git a/psrdada_cpp/meerkat/tuse/transpose_to_dada.hpp b/psrdada_cpp/meerkat/tuse/transpose_to_dada.hpp index d2013a58ca230ca3a1fc19f5a0f35b985fce8004..ad6a61d298c26fb8b98e29ac152a83dfabc7092c 100644 --- a/psrdada_cpp/meerkat/tuse/transpose_to_dada.hpp +++ b/psrdada_cpp/meerkat/tuse/transpose_to_dada.hpp @@ -58,53 +58,53 @@ public: void set_ngroups(const int ngroups); - /** - * @brief Setter for frequency channels - */ + /** + * @brief Setter for frequency channels + */ - void set_nchans(const int nchans); + void set_nchans(const int nchans); - /** - * @brief Setter of number of time samples - */ + /** + * @brief Setter of number of time samples + */ - void set_nsamples(const int nsamples); + void set_nsamples(const int nsamples); - /** - * @brief Setter for number of frequency blocks - */ + /** + * @brief Setter for number of frequency blocks + */ - void set_nfreq(const int _nfreq); + void set_nfreq(const int _nfreq); - /** - * @brief getter for number of channels - */ + /** + * @brief getter for number of channels + */ - std::uint32_t nchans(); + std::uint32_t nchans(); - /** - * @brief getter for number of time samples - */ + /** + * @brief getter for number of time samples + */ - std::uint32_t nsamples(); + std::uint32_t nsamples(); - /** - * @brief getter for frequency blocks - */ + /** + * @brief getter for frequency blocks + */ - std::uint32_t nfreq(); + std::uint32_t nfreq(); - /** - *@brief: getter for ngroups - */ + /** + *@brief: getter for ngroups + */ - std::uint32_t ngroups(); + std::uint32_t ngroups(); - /** - *@brief: getter for nbeams - */ + /** + *@brief: getter for nbeams + */ - std::uint32_t nbeams(); + std::uint32_t nbeams(); private: std::uint32_t _numbeams; diff --git a/psrdada_cpp/src/file_to_dada_cli.cpp b/psrdada_cpp/src/file_to_dada_cli.cpp new file mode 100644 index 0000000000000000000000000000000000000000..a173262c62c736ea2aaff6fbb89775a6b8796259 --- /dev/null +++ b/psrdada_cpp/src/file_to_dada_cli.cpp @@ -0,0 +1,98 @@ +#include "psrdada_cpp/cli_utils.hpp" +#include "psrdada_cpp/common.hpp" +#include "psrdada_cpp/file_input_stream.hpp" +#include "psrdada_cpp/dada_output_stream.hpp" +#include <memory> +#include <fstream> + +#include "boost/program_options.hpp" + +#include <ctime> + + +using namespace psrdada_cpp; + +namespace +{ + const size_t ERROR_IN_COMMAND_LINE = 1; + const size_t SUCCESS = 0; + const size_t ERROR_UNHANDLED_EXCEPTION = 2; +} // namespace + + +int main(int argc, char** argv) +{ + try + { + key_t output_key; + std::size_t headersize; + std::size_t nbytes; + std::string filename; + /** Define and parse the program options + * */ + namespace po = boost::program_options; + po::options_description desc("Options"); + desc.add_options() + ("output_key,k", po::value<std::string>() + ->default_value("dada") + ->notifier([&output_key](std::string in) + { + output_key = string_to_key(in); + }), + "The shared memory key for the output dada buffer to connect to (hex string)") + ("help,h", "Print help messages") + ("input_file,f", po::value<std::string>(&filename)->required(), + "Input file to read") + ("nbytes,n", po::value<std::size_t>(&nbytes)->required(), + "Number of bytes to read in one DADA block") + ("header_size,s", po::value<std::size_t>(&headersize)->required(), + "size of header to read"); + + + +/* Catch Error and program description */ + po::variables_map vm; + try + { + po::store(po::parse_command_line(argc, argv, desc), vm); + if ( vm.count("help") ) + { + std::cout << "Transpose2Dada -- read MeerKAT beamformed dada from DADA buffer, transpose per beam and write to an output DADA buffer" + << std::endl << desc << std::endl; + return SUCCESS; + } + po::notify(vm); + } + catch(po::error& e) + { + std::cerr << "ERROR: " << e.what() << std::endl << std::endl; + std::cerr << desc << std::endl; + return ERROR_IN_COMMAND_LINE; + } + + + /* Application Code */ + + MultiLog log("outstream"); + + /* Setting up the pipeline based on the type of sink*/ + + DadaOutputStream outstream(output_key, log); + + FileInputStream<decltype(outstream)> input(filename, headersize, nbytes, outstream); + + input.start(); + + + /* End Application Code */ + + } + catch(std::exception& e) + { + std::cerr << "Unhandled Exception reached the top of main: " + << e.what() << ", application will now exit" << std::endl; + return ERROR_UNHANDLED_EXCEPTION; + } + return SUCCESS; + +} diff --git a/psrdada_cpp/src/sigprocheader.cpp b/psrdada_cpp/src/sigprocheader.cpp index cc6e266f5257a7dca7f4d1c262b202cbb0335d75..6d1246bae97dc71519973bd3a35baccb70889634 100644 --- a/psrdada_cpp/src/sigprocheader.cpp +++ b/psrdada_cpp/src/sigprocheader.cpp @@ -21,20 +21,20 @@ namespace psrdada_cpp { int len = str.size(); std::memcpy(ptr,(char*)&len,sizeof(len)); - ptr += sizeof(len); + ptr += sizeof(len); std::copy(str.begin(),str.end(),ptr); - ptr += len; + ptr += len; } void SigprocHeader::header_write(char*& ptr, std::string const& str, std::string const& name) { - header_write(ptr,str); - header_write(ptr,name); + header_write(ptr,str); + header_write(ptr,name); } void SigprocHeader::write_header(RawBytes& block, PsrDadaHeader ph) { - auto ptr = block.ptr(); + auto ptr = block.ptr(); header_write(ptr,"HEADER_START"); header_write<std::uint32_t>(ptr,"telescope_id",0); header_write<std::uint32_t>(ptr,"machine_id",11); @@ -44,8 +44,8 @@ namespace psrdada_cpp // RA DEC auto ra_val = ph.ra(); auto dec_val =ph.dec(); - std::vector<std::string> ra_s; - std::vector<std::string> dec_s; + std::vector<std::string> ra_s; + std::vector<std::string> dec_s; boost::split(ra_s,ra_val,boost::is_any_of(":")); boost::split(dec_s,dec_val,boost::is_any_of(":")); double ra = stod(boost::join(ra_s,"")); @@ -65,7 +65,7 @@ namespace psrdada_cpp std::size_t SigprocHeader::header_size() const { - return _header_size; + return _header_size; }