vdif_send.cu 8.41 KB
Newer Older
Tobias Winchen's avatar
Tobias Winchen committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
#include "psrdada_cpp/cli_utils.hpp"
#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/dada_client_base.hpp"
#include "psrdada_cpp/dada_input_stream.hpp"
#include "psrdada_cpp/dada_null_sink.hpp"
#include "psrdada_cpp/dada_output_stream.hpp"
#include "psrdada_cpp/multilog.hpp"
#include "psrdada_cpp/effelsberg/edd/VLBI.cuh"

#include <boost/program_options.hpp>
#include <boost/asio.hpp>

#include <iostream>
#include <string>
15
16
#include <chrono>
#include <thread>
Tobias Winchen's avatar
Tobias Winchen committed
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

using namespace psrdada_cpp;

namespace {
const size_t ERROR_IN_COMMAND_LINE = 1;
const size_t SUCCESS = 0;
const size_t ERROR_UNHANDLED_EXCEPTION = 2;
} // namespace

namespace psrdada_cpp {
namespace effelsberg {
namespace edd {

class VDIF_Sender
{
  private:
33
    std::string destination_ip, source_ip;
Tobias Winchen's avatar
Tobias Winchen committed
34
    int port;
35
    double max_rate;
Tobias Winchen's avatar
Tobias Winchen committed
36

37
38
39
    uint32_t currentSecondFromReferenceEpoch;
    size_t noOfSendFrames;      // frames in last second

Tobias Winchen's avatar
Tobias Winchen committed
40
41
42
43
44
45
46
47
48
    boost::asio::ip::udp::socket socket;
    boost::asio::ip::udp::endpoint remote_endpoint;

  public:
  /**
     * @brief      Constructor.
     *
     * @param      destination_ip Address to send the udo packages to.
     * @param      port           Port to use.
49
     * @param      max_rate       Output data rate - Usefull to avoid burst
Tobias Winchen's avatar
Tobias Winchen committed
50
51
52
     * @param      io_service     boost::asio::io_Service instance to use for
     *                            communication.
     */
53
    VDIF_Sender(const std::string &source_ip, const std::string &destination_ip, int port, double max_rate, boost::asio::io_service&
54
        io_service): socket(io_service), source_ip(source_ip), destination_ip(destination_ip), port(port), max_rate(max_rate), currentSecondFromReferenceEpoch(0), noOfSendFrames(0)
Tobias Winchen's avatar
Tobias Winchen committed
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
    {

    }

  /**
   * @brief      A callback to be called on connection
   *             to a ring buffer.
   *
   * @detail     The first available header block in the
   *             in the ring buffer is provided as an argument.
   *             It is here that header parameters could be read
   *             if desired.
   *
   * @param      block  A RawBytes object wrapping a DADA header buffer
   */
  void init(RawBytes &block)
  {
72
    BOOST_LOG_TRIVIAL(debug) << "Preparing socket for communication from " << source_ip << " to " << destination_ip << " port: " << port;
Tobias Winchen's avatar
Tobias Winchen committed
73
74
    // drop header as not needed, only open socket.
    remote_endpoint = boost::asio::ip::udp::endpoint(boost::asio::ip::address::from_string(destination_ip), port);
75
76
77
78
    boost::asio::ip::address_v4 local_interface =
      boost::asio::ip::address_v4::from_string(source_ip);
    boost::asio::ip::multicast::outbound_interface option(local_interface);

Tobias Winchen's avatar
Tobias Winchen committed
79
    socket.open(boost::asio::ip::udp::v4());
80
    socket.set_option(option);
Tobias Winchen's avatar
Tobias Winchen committed
81
82
83
84
85
86
87
88
89
90
91
92
93
  };

  /**
   * @brief      A callback to be called on acqusition of a new
   *             data block.
   *
   * @param      block  A RawBytes object wrapping a DADA data buffer
   */
  bool operator()(RawBytes &block)
  {
    if (block.used_bytes() == 0)
    {
      BOOST_LOG_TRIVIAL(info) << "Received empty block, exiting.";
94
      return false;
Tobias Winchen's avatar
Tobias Winchen committed
95
96
97
98
99
100
101
102
    }
    boost::system::error_code err;
    VDIFHeaderView vdifHeader(reinterpret_cast<uint32_t*>(block.ptr()));

    size_t blockSize = block.used_bytes();

    BOOST_LOG_TRIVIAL(debug) << " Length of first frame: " << vdifHeader.getDataFrameLength() * 8  << " bytes";
    size_t counter = 0;
103
    size_t invalidFrames = 0;
104
105

    auto start = std::chrono::high_resolution_clock::now();
Tobias Winchen's avatar
Tobias Winchen committed
106
107
108
    for(char* frame_start = block.ptr(); frame_start < block.ptr() + blockSize; frame_start += vdifHeader.getDataFrameLength() * 8)
    {
      vdifHeader.setDataLocation(reinterpret_cast<uint32_t*>(frame_start));
109
110
111
112
113
114
115
116
117
118
119
120
121
122
      // skip invalid blocks
      if (!vdifHeader.isValid())
      {
        invalidFrames++;
        continue;
      }
      if (vdifHeader.getSecondsFromReferenceEpoch() > currentSecondFromReferenceEpoch)
      {
        BOOST_LOG_TRIVIAL(info) << " New second frome reference epoch: " << vdifHeader.getSecondsFromReferenceEpoch() << ", send " << noOfSendFrames << " in previous second.";

        BOOST_LOG_TRIVIAL(debug) <<     "  Previous second from refEpoch " << currentSecondFromReferenceEpoch << " delta = " << vdifHeader.getSecondsFromReferenceEpoch() - currentSecondFromReferenceEpoch;
        currentSecondFromReferenceEpoch = vdifHeader.getSecondsFromReferenceEpoch();
        noOfSendFrames = 0;
      }
123

Tobias Winchen's avatar
Tobias Winchen committed
124
125
126
      uint32_t frameLength = vdifHeader.getDataFrameLength() * 8; // in units of 8 bytes

      socket.send_to(boost::asio::buffer(frame_start, frameLength), remote_endpoint, 0, err);
127
      noOfSendFrames++;
Tobias Winchen's avatar
Tobias Winchen committed
128
      counter++;
129
130
131
132
133
134
135
136
137
138
139
140
141
142

      size_t processed_bytes = (frame_start - block.ptr()) + frameLength;
      auto elapsed_time = std::chrono::high_resolution_clock::now() - start;

      double current_rate = processed_bytes / (std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed_time).count() * 1E-9);
      if (current_rate > max_rate)
      {
        std::chrono::duration<double, std::nano> expected_time(processed_bytes / max_rate * 1E9);

        auto delay = expected_time  - elapsed_time;
        std::this_thread::sleep_for(delay);

        //BOOST_LOG_TRIVIAL(debug) << counter << " Set delay to " << delay.count()<< " ns. Current rate " << current_rate << ", processed_bytes: " << processed_bytes;
      }
143
144
      if (counter < 5)
        BOOST_LOG_TRIVIAL(debug) << counter << " Send - FN: " << vdifHeader.getDataFrameNumber() <<  ", Sec f. E.: " << vdifHeader.getSecondsFromReferenceEpoch() << " Get TS.: " << vdifHeader.getTimestamp();
145

Tobias Winchen's avatar
Tobias Winchen committed
146
    }
147
    BOOST_LOG_TRIVIAL(debug) << "Send " << counter << " frames of " << block.used_bytes() << " bytes total size. " << invalidFrames << " invalid frames in block.";
Tobias Winchen's avatar
Tobias Winchen committed
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
    return false;
  }
};



}
}
} // namespaces



int main(int argc, char **argv) {
  try {
    key_t input_key;

    int destination_port;
165
    std::string destination_ip, source_ip;
166
    double max_rate;
Tobias Winchen's avatar
Tobias Winchen committed
167
168
169
170
171
172
173
174
175
176
177
178
179

    /** Define and parse the program options
    */
    namespace po = boost::program_options;
    po::options_description desc("Options");

    desc.add_options()("help,h", "Print help messages");
    desc.add_options()(
        "input_key,i",
        po::value<std::string>()->default_value("dada")->notifier(
            [&input_key](std::string in) { input_key = string_to_key(in); }),
        "The shared memory key for the dada buffer to connect to (hex "
        "string)");
180
    desc.add_options()("dest_ip",
Tobias Winchen's avatar
Tobias Winchen committed
181
182
                       po::value<std::string>(&destination_ip)->required(),
                       "Destination IP");
183
184
185
    desc.add_options()("if_ip",
                       po::value<std::string>(&source_ip)->required(),
                       "IP of the interface to use");
Tobias Winchen's avatar
Tobias Winchen committed
186
187
188
189
190
    desc.add_options()("port",
                       po::value<int>()->default_value(8125)->notifier(
                           [&destination_port](int in) { destination_port = in; }),
                       "Destination PORT");

191
192
193
194
    desc.add_options()("max_rate",
                       po::value<double>()->default_value(1024*1024*5)->notifier(
                           [&max_rate](double in) { max_rate = in; }),
                       "Limit the output rate to [max_rate] byte/s");
Tobias Winchen's avatar
Tobias Winchen committed
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224

    desc.add_options()(
        "log_level", po::value<std::string>()->default_value("info")->notifier(
                         [](std::string level) { set_log_level(level); }),
        "The logging level to use "
        "(trace, debug, info, warning, "
        "error)");

    po::variables_map vm;
    try {
      po::store(po::parse_command_line(argc, argv, desc), vm);
      if (vm.count("help")) {
        std::cout << "vdif_Send -- send vidf frames in a dada buffer to an ip via UDP."
                  << std::endl << desc << std::endl;
        return SUCCESS;
      }
      po::notify(vm);

    } catch (po::error &e) {
      std::cerr << "ERROR: " << e.what() << std::endl << std::endl;
      std::cerr << desc << std::endl;
      return ERROR_IN_COMMAND_LINE;
    }

    MultiLog log("edd::vdif_send");
    DadaClientBase client(input_key, log);
    std::size_t buffer_bytes = client.data_buffer_size();


    boost::asio::io_service io_service;
225
    effelsberg::edd::VDIF_Sender vdif_sender(source_ip, destination_ip, destination_port, max_rate, io_service);
Tobias Winchen's avatar
Tobias Winchen committed
226
227
228
229
230
231
232
233
234
235
236
237
238

    DadaInputStream<decltype(vdif_sender)> istream(input_key, log, vdif_sender);
    istream.start();


  } catch (std::exception &e) {
    std::cerr << "Unhandled Exception reached the top of main: " << e.what()
              << ", application will now exit" << std::endl;
    return ERROR_UNHANDLED_EXCEPTION;
  }
  return SUCCESS;
}