Commit 0d52a7fc authored by Tobias Winchen's avatar Tobias Winchen
Browse files

Prototype for vdif_send

parent b4db0b3a
......@@ -30,6 +30,10 @@ cuda_add_executable(VLBI src/VLBI_cli.cu)
target_link_libraries(VLBI ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES} ${CUDA_CUFFT_LIBRARIES} -lcublas)
install(TARGETS VLBI DESTINATION bin)
cuda_add_executable(vdif_send src/vdif_send.cu)
target_link_libraries(vdif_send ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES} ${CUDA_CUFFT_LIBRARIES})
install(TARGETS VLBI DESTINATION bin)
cuda_add_executable(dada_dummy_data src/dummy_data_generator.cu)
target_link_libraries(dada_dummy_data ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES} ${CUDA_CUFFT_LIBRARIES})
install(TARGETS VLBI DESTINATION bin)
......
#include "psrdada_cpp/cli_utils.hpp"
#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/dada_client_base.hpp"
#include "psrdada_cpp/dada_input_stream.hpp"
#include "psrdada_cpp/dada_null_sink.hpp"
#include "psrdada_cpp/dada_output_stream.hpp"
#include "psrdada_cpp/multilog.hpp"
#include "psrdada_cpp/effelsberg/edd/VLBI.cuh"
#include <boost/program_options.hpp>
#include <boost/asio.hpp>
#include <iostream>
#include <string>
using namespace psrdada_cpp;
namespace {
const size_t ERROR_IN_COMMAND_LINE = 1;
const size_t SUCCESS = 0;
const size_t ERROR_UNHANDLED_EXCEPTION = 2;
} // namespace
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
class VDIF_Sender
{
private:
std::string destination_ip;
int port;
boost::asio::ip::udp::socket socket;
boost::asio::ip::udp::endpoint remote_endpoint;
public:
/**
* @brief Constructor.
*
* @param destination_ip Address to send the udo packages to.
* @param port Port to use.
* @param io_service boost::asio::io_Service instance to use for
* communication.
*/
VDIF_Sender(std::string destination_ip, int port, boost::asio::io_service&
io_service): socket(io_service), destination_ip(destination_ip), port(port)
{
}
/**
* @brief A callback to be called on connection
* to a ring buffer.
*
* @detail The first available header block in the
* in the ring buffer is provided as an argument.
* It is here that header parameters could be read
* if desired.
*
* @param block A RawBytes object wrapping a DADA header buffer
*/
void init(RawBytes &block)
{
// drop header as not needed, only open socket.
remote_endpoint = boost::asio::ip::udp::endpoint(boost::asio::ip::address::from_string(destination_ip), port);
socket.open(boost::asio::ip::udp::v4());
};
/**
* @brief A callback to be called on acqusition of a new
* data block.
*
* @param block A RawBytes object wrapping a DADA data buffer
*/
bool operator()(RawBytes &block)
{
if (block.used_bytes() == 0)
{
BOOST_LOG_TRIVIAL(info) << "Received empty block, exiting.";
return true;
}
boost::system::error_code err;
VDIFHeaderView vdifHeader(reinterpret_cast<uint32_t*>(block.ptr()));
size_t blockSize = block.used_bytes();
BOOST_LOG_TRIVIAL(debug) << " Length of first frame: " << vdifHeader.getDataFrameLength() * 8 << " bytes";
size_t counter = 0;
for(char* frame_start = block.ptr(); frame_start < block.ptr() + blockSize; frame_start += vdifHeader.getDataFrameLength() * 8)
{
vdifHeader.setDataLocation(reinterpret_cast<uint32_t*>(frame_start));
uint32_t frameLength = vdifHeader.getDataFrameLength() * 8; // in units of 8 bytes
socket.send_to(boost::asio::buffer(frame_start, frameLength), remote_endpoint, 0, err);
counter++;
}
BOOST_LOG_TRIVIAL(info) << "Send " << counter << " frames of " << block.used_bytes() << " bytes total size.";
return false;
}
};
}
}
} // namespaces
int main(int argc, char **argv) {
try {
key_t input_key;
int destination_port;
std::string destination_ip;
double rate;
/** Define and parse the program options
*/
namespace po = boost::program_options;
po::options_description desc("Options");
desc.add_options()("help,h", "Print help messages");
desc.add_options()(
"input_key,i",
po::value<std::string>()->default_value("dada")->notifier(
[&input_key](std::string in) { input_key = string_to_key(in); }),
"The shared memory key for the dada buffer to connect to (hex "
"string)");
desc.add_options()("ip",
po::value<std::string>(&destination_ip)->required(),
"Destination IP");
desc.add_options()("port",
po::value<int>()->default_value(8125)->notifier(
[&destination_port](int in) { destination_port = in; }),
"Destination PORT");
// desc.add_options()("rate",
// po::value<double>()->default_value(1024*1024*5)->notifier(
// [&rate](double in) { rate = in; }),
// "Limit the output rate to [rate] byte/s");
desc.add_options()(
"log_level", po::value<std::string>()->default_value("info")->notifier(
[](std::string level) { set_log_level(level); }),
"The logging level to use "
"(trace, debug, info, warning, "
"error)");
po::variables_map vm;
try {
po::store(po::parse_command_line(argc, argv, desc), vm);
if (vm.count("help")) {
std::cout << "vdif_Send -- send vidf frames in a dada buffer to an ip via UDP."
<< std::endl << desc << std::endl;
return SUCCESS;
}
po::notify(vm);
} catch (po::error &e) {
std::cerr << "ERROR: " << e.what() << std::endl << std::endl;
std::cerr << desc << std::endl;
return ERROR_IN_COMMAND_LINE;
}
MultiLog log("edd::vdif_send");
DadaClientBase client(input_key, log);
std::size_t buffer_bytes = client.data_buffer_size();
boost::asio::io_service io_service;
effelsberg::edd::VDIF_Sender vdif_sender(destination_ip, destination_port, io_service);
DadaInputStream<decltype(vdif_sender)> istream(input_key, log, vdif_sender);
istream.start();
} catch (std::exception &e) {
std::cerr << "Unhandled Exception reached the top of main: " << e.what()
<< ", application will now exit" << std::endl;
return ERROR_UNHANDLED_EXCEPTION;
}
return SUCCESS;
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment