Skip to content
Snippets Groups Projects
Commit dfd48dff authored by Tobias Winchen's avatar Tobias Winchen
Browse files

Added limitation for output rate to avoid bursts

parent d17ea1c6
No related branches found
No related tags found
No related merge requests found
...@@ -12,6 +12,8 @@ ...@@ -12,6 +12,8 @@
#include <iostream> #include <iostream>
#include <string> #include <string>
#include <chrono>
#include <thread>
using namespace psrdada_cpp; using namespace psrdada_cpp;
...@@ -30,6 +32,7 @@ class VDIF_Sender ...@@ -30,6 +32,7 @@ class VDIF_Sender
private: private:
std::string destination_ip; std::string destination_ip;
int port; int port;
double max_rate;
boost::asio::ip::udp::socket socket; boost::asio::ip::udp::socket socket;
boost::asio::ip::udp::endpoint remote_endpoint; boost::asio::ip::udp::endpoint remote_endpoint;
...@@ -40,11 +43,12 @@ class VDIF_Sender ...@@ -40,11 +43,12 @@ class VDIF_Sender
* *
* @param destination_ip Address to send the udo packages to. * @param destination_ip Address to send the udo packages to.
* @param port Port to use. * @param port Port to use.
* @param max_rate Output data rate - Usefull to avoid burst
* @param io_service boost::asio::io_Service instance to use for * @param io_service boost::asio::io_Service instance to use for
* communication. * communication.
*/ */
VDIF_Sender(std::string destination_ip, int port, boost::asio::io_service& VDIF_Sender(std::string destination_ip, int port, double max_rate, boost::asio::io_service&
io_service): socket(io_service), destination_ip(destination_ip), port(port) io_service): socket(io_service), destination_ip(destination_ip), port(port), max_rate(max_rate)
{ {
} }
...@@ -87,6 +91,8 @@ class VDIF_Sender ...@@ -87,6 +91,8 @@ class VDIF_Sender
BOOST_LOG_TRIVIAL(debug) << " Length of first frame: " << vdifHeader.getDataFrameLength() * 8 << " bytes"; BOOST_LOG_TRIVIAL(debug) << " Length of first frame: " << vdifHeader.getDataFrameLength() * 8 << " bytes";
size_t counter = 0; size_t counter = 0;
auto start = std::chrono::high_resolution_clock::now();
for(char* frame_start = block.ptr(); frame_start < block.ptr() + blockSize; frame_start += vdifHeader.getDataFrameLength() * 8) for(char* frame_start = block.ptr(); frame_start < block.ptr() + blockSize; frame_start += vdifHeader.getDataFrameLength() * 8)
{ {
vdifHeader.setDataLocation(reinterpret_cast<uint32_t*>(frame_start)); vdifHeader.setDataLocation(reinterpret_cast<uint32_t*>(frame_start));
...@@ -94,6 +100,21 @@ class VDIF_Sender ...@@ -94,6 +100,21 @@ class VDIF_Sender
socket.send_to(boost::asio::buffer(frame_start, frameLength), remote_endpoint, 0, err); socket.send_to(boost::asio::buffer(frame_start, frameLength), remote_endpoint, 0, err);
counter++; counter++;
size_t processed_bytes = (frame_start - block.ptr()) + frameLength;
auto elapsed_time = std::chrono::high_resolution_clock::now() - start;
double current_rate = processed_bytes / (std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed_time).count() * 1E-9);
if (current_rate > max_rate)
{
std::chrono::duration<double, std::nano> expected_time(processed_bytes / max_rate * 1E9);
auto delay = expected_time - elapsed_time;
std::this_thread::sleep_for(delay);
//BOOST_LOG_TRIVIAL(debug) << counter << " Set delay to " << delay.count()<< " ns. Current rate " << current_rate << ", processed_bytes: " << processed_bytes;
}
} }
BOOST_LOG_TRIVIAL(info) << "Send " << counter << " frames of " << block.used_bytes() << " bytes total size."; BOOST_LOG_TRIVIAL(info) << "Send " << counter << " frames of " << block.used_bytes() << " bytes total size.";
return false; return false;
...@@ -114,7 +135,7 @@ int main(int argc, char **argv) { ...@@ -114,7 +135,7 @@ int main(int argc, char **argv) {
int destination_port; int destination_port;
std::string destination_ip; std::string destination_ip;
double rate; double max_rate;
/** Define and parse the program options /** Define and parse the program options
*/ */
...@@ -136,10 +157,10 @@ int main(int argc, char **argv) { ...@@ -136,10 +157,10 @@ int main(int argc, char **argv) {
[&destination_port](int in) { destination_port = in; }), [&destination_port](int in) { destination_port = in; }),
"Destination PORT"); "Destination PORT");
// desc.add_options()("rate", desc.add_options()("max_rate",
// po::value<double>()->default_value(1024*1024*5)->notifier( po::value<double>()->default_value(1024*1024*5)->notifier(
// [&rate](double in) { rate = in; }), [&max_rate](double in) { max_rate = in; }),
// "Limit the output rate to [rate] byte/s"); "Limit the output rate to [max_rate] byte/s");
desc.add_options()( desc.add_options()(
"log_level", po::value<std::string>()->default_value("info")->notifier( "log_level", po::value<std::string>()->default_value("info")->notifier(
...@@ -170,7 +191,7 @@ int main(int argc, char **argv) { ...@@ -170,7 +191,7 @@ int main(int argc, char **argv) {
boost::asio::io_service io_service; boost::asio::io_service io_service;
effelsberg::edd::VDIF_Sender vdif_sender(destination_ip, destination_port, io_service); effelsberg::edd::VDIF_Sender vdif_sender(destination_ip, destination_port, max_rate, io_service);
DadaInputStream<decltype(vdif_sender)> istream(input_key, log, vdif_sender); DadaInputStream<decltype(vdif_sender)> istream(input_key, log, vdif_sender);
istream.start(); istream.start();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment