// vdif_send.cu -- send VDIF frames stored in a DADA ring buffer to a remote host via UDP.
#include "psrdada_cpp/cli_utils.hpp"
#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/dada_client_base.hpp"
#include "psrdada_cpp/dada_input_stream.hpp"
#include "psrdada_cpp/dada_null_sink.hpp"
#include "psrdada_cpp/dada_output_stream.hpp"
#include "psrdada_cpp/multilog.hpp"
#include "psrdada_cpp/effelsberg/edd/VLBI.cuh"

#include <boost/asio.hpp>
#include <boost/program_options.hpp>

#include <chrono>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <string>
#include <thread>

using namespace psrdada_cpp;

namespace {
// Process exit codes returned by main().
// Normal termination; also returned after printing the help text.
constexpr size_t SUCCESS = 0;
// The command line could not be parsed or validated.
constexpr size_t ERROR_IN_COMMAND_LINE = 1;
// A std::exception propagated up to the top level of main().
constexpr size_t ERROR_UNHANDLED_EXCEPTION = 2;
} // namespace

namespace psrdada_cpp {
namespace effelsberg {
namespace edd {

class VDIF_Sender
{
  private:
33
    std::string destination_ip, source_ip;
Tobias Winchen's avatar
Tobias Winchen committed
34
    int port;
35
    double max_rate;
Tobias Winchen's avatar
Tobias Winchen committed
36
37
38
39
40
41
42
43
44
45

    boost::asio::ip::udp::socket socket;
    boost::asio::ip::udp::endpoint remote_endpoint;

  public:
  /**
     * @brief      Constructor.
     *
     * @param      destination_ip Address to send the udo packages to.
     * @param      port           Port to use.
46
     * @param      max_rate       Output data rate - Usefull to avoid burst
Tobias Winchen's avatar
Tobias Winchen committed
47
48
49
     * @param      io_service     boost::asio::io_Service instance to use for
     *                            communication.
     */
50
51
    VDIF_Sender(const std::string &source_ip, const std::string &destination_ip, int port, double max_rate, boost::asio::io_service&
        io_service): socket(io_service), source_ip(source_ip), destination_ip(destination_ip), port(port), max_rate(max_rate)
Tobias Winchen's avatar
Tobias Winchen committed
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
    {

    }

  /**
   * @brief      A callback to be called on connection
   *             to a ring buffer.
   *
   * @detail     The first available header block in the
   *             in the ring buffer is provided as an argument.
   *             It is here that header parameters could be read
   *             if desired.
   *
   * @param      block  A RawBytes object wrapping a DADA header buffer
   */
  void init(RawBytes &block)
  {
69
    BOOST_LOG_TRIVIAL(debug) << "Preparing socket for communication from " << source_ip << " to " << destination_ip << " port: " << port;
Tobias Winchen's avatar
Tobias Winchen committed
70
71
    // drop header as not needed, only open socket.
    remote_endpoint = boost::asio::ip::udp::endpoint(boost::asio::ip::address::from_string(destination_ip), port);
72
73
74
75
    boost::asio::ip::address_v4 local_interface =
      boost::asio::ip::address_v4::from_string(source_ip);
    boost::asio::ip::multicast::outbound_interface option(local_interface);

Tobias Winchen's avatar
Tobias Winchen committed
76
    socket.open(boost::asio::ip::udp::v4());
77
    socket.set_option(option);
Tobias Winchen's avatar
Tobias Winchen committed
78
79
80
81
82
83
84
85
86
87
88
89
90
  };

  /**
   * @brief      A callback to be called on acqusition of a new
   *             data block.
   *
   * @param      block  A RawBytes object wrapping a DADA data buffer
   */
  bool operator()(RawBytes &block)
  {
    if (block.used_bytes() == 0)
    {
      BOOST_LOG_TRIVIAL(info) << "Received empty block, exiting.";
91
      return false;
Tobias Winchen's avatar
Tobias Winchen committed
92
93
94
95
96
97
98
99
    }
    boost::system::error_code err;
    VDIFHeaderView vdifHeader(reinterpret_cast<uint32_t*>(block.ptr()));

    size_t blockSize = block.used_bytes();

    BOOST_LOG_TRIVIAL(debug) << " Length of first frame: " << vdifHeader.getDataFrameLength() * 8  << " bytes";
    size_t counter = 0;
100
101

    auto start = std::chrono::high_resolution_clock::now();
Tobias Winchen's avatar
Tobias Winchen committed
102
103
104
    for(char* frame_start = block.ptr(); frame_start < block.ptr() + blockSize; frame_start += vdifHeader.getDataFrameLength() * 8)
    {
      vdifHeader.setDataLocation(reinterpret_cast<uint32_t*>(frame_start));
105
106
107
108
			// skip invalid blocks
			if (!vdifHeader.isValid())
				continue;

Tobias Winchen's avatar
Tobias Winchen committed
109
110
111
112
      uint32_t frameLength = vdifHeader.getDataFrameLength() * 8; // in units of 8 bytes

      socket.send_to(boost::asio::buffer(frame_start, frameLength), remote_endpoint, 0, err);
      counter++;
113
114
115
116
117
118
119
120
121
122
123
124
125
126

      size_t processed_bytes = (frame_start - block.ptr()) + frameLength;
      auto elapsed_time = std::chrono::high_resolution_clock::now() - start;

      double current_rate = processed_bytes / (std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed_time).count() * 1E-9);
      if (current_rate > max_rate)
      {
        std::chrono::duration<double, std::nano> expected_time(processed_bytes / max_rate * 1E9);

        auto delay = expected_time  - elapsed_time;
        std::this_thread::sleep_for(delay);

        //BOOST_LOG_TRIVIAL(debug) << counter << " Set delay to " << delay.count()<< " ns. Current rate " << current_rate << ", processed_bytes: " << processed_bytes;
      }
Tobias Winchen's avatar
Tobias Winchen committed
127
128
			if (counter < 5)
	      BOOST_LOG_TRIVIAL(debug) << counter << " Send - FN: " << vdifHeader.getDataFrameNumber() <<  ", Sec f. E.: " << vdifHeader.getSecondsFromReferenceEpoch() << " Get TS.: " << vdifHeader.getTimestamp();
129

Tobias Winchen's avatar
Tobias Winchen committed
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
    }
    BOOST_LOG_TRIVIAL(info) << "Send " << counter << " frames of " << block.used_bytes() << " bytes total size.";
    return false;
  }
};



}
}
} // namespaces



int main(int argc, char **argv) {
  try {
    key_t input_key;

    int destination_port;
149
    std::string destination_ip, source_ip;
150
    double max_rate;
Tobias Winchen's avatar
Tobias Winchen committed
151
152
153
154
155
156
157
158
159
160
161
162
163

    /** Define and parse the program options
    */
    namespace po = boost::program_options;
    po::options_description desc("Options");

    desc.add_options()("help,h", "Print help messages");
    desc.add_options()(
        "input_key,i",
        po::value<std::string>()->default_value("dada")->notifier(
            [&input_key](std::string in) { input_key = string_to_key(in); }),
        "The shared memory key for the dada buffer to connect to (hex "
        "string)");
164
    desc.add_options()("dest_ip",
Tobias Winchen's avatar
Tobias Winchen committed
165
166
                       po::value<std::string>(&destination_ip)->required(),
                       "Destination IP");
167
168
169
    desc.add_options()("if_ip",
                       po::value<std::string>(&source_ip)->required(),
                       "IP of the interface to use");
Tobias Winchen's avatar
Tobias Winchen committed
170
171
172
173
174
    desc.add_options()("port",
                       po::value<int>()->default_value(8125)->notifier(
                           [&destination_port](int in) { destination_port = in; }),
                       "Destination PORT");

175
176
177
178
    desc.add_options()("max_rate",
                       po::value<double>()->default_value(1024*1024*5)->notifier(
                           [&max_rate](double in) { max_rate = in; }),
                       "Limit the output rate to [max_rate] byte/s");
Tobias Winchen's avatar
Tobias Winchen committed
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208

    desc.add_options()(
        "log_level", po::value<std::string>()->default_value("info")->notifier(
                         [](std::string level) { set_log_level(level); }),
        "The logging level to use "
        "(trace, debug, info, warning, "
        "error)");

    po::variables_map vm;
    try {
      po::store(po::parse_command_line(argc, argv, desc), vm);
      if (vm.count("help")) {
        std::cout << "vdif_Send -- send vidf frames in a dada buffer to an ip via UDP."
                  << std::endl << desc << std::endl;
        return SUCCESS;
      }
      po::notify(vm);

    } catch (po::error &e) {
      std::cerr << "ERROR: " << e.what() << std::endl << std::endl;
      std::cerr << desc << std::endl;
      return ERROR_IN_COMMAND_LINE;
    }

    MultiLog log("edd::vdif_send");
    DadaClientBase client(input_key, log);
    std::size_t buffer_bytes = client.data_buffer_size();


    boost::asio::io_service io_service;
209
    effelsberg::edd::VDIF_Sender vdif_sender(source_ip, destination_ip, destination_port, max_rate, io_service);
Tobias Winchen's avatar
Tobias Winchen committed
210
211
212
213
214
215
216
217
218
219
220
221
222

    DadaInputStream<decltype(vdif_sender)> istream(input_key, log, vdif_sender);
    istream.start();


  } catch (std::exception &e) {
    std::cerr << "Unhandled Exception reached the top of main: " << e.what()
              << ", application will now exit" << std::endl;
    return ERROR_UNHANDLED_EXCEPTION;
  }
  return SUCCESS;
}