Skip to content
Snippets Groups Projects
Commit 73dc0fbf authored by Niclas Esser's avatar Niclas Esser
Browse files

Interruptable AsyncDadaReadClient

parent 90bc6d61
Branches
Tags
1 merge request!41Avoid acquiring dada buffers when the buffer is full (writer) or when it is...
Pipeline #241198 passed
...@@ -41,7 +41,7 @@ namespace psrdada_cpp { ...@@ -41,7 +41,7 @@ namespace psrdada_cpp {
*/ */
DadaClientBase(key_t key, MultiLog& log); DadaClientBase(key_t key, MultiLog& log);
DadaClientBase(DadaClientBase const&) = delete; DadaClientBase(DadaClientBase const&) = delete;
~DadaClientBase(); virtual ~DadaClientBase();
/** /**
* @brief Get the sizes of each data block in the ring buffer * @brief Get the sizes of each data block in the ring buffer
......
...@@ -107,6 +107,7 @@ class HeaderStream: public AsyncStream<HeaderStream> ...@@ -107,6 +107,7 @@ class HeaderStream: public AsyncStream<HeaderStream>
} }
protected: protected:
bool is_stream_empty() const;
char* next_impl(uint64_t& total_bytes, uint64_t& used_bytes); char* next_impl(uint64_t& total_bytes, uint64_t& used_bytes);
void release_impl(); void release_impl();
bool at_end_impl() const; bool at_end_impl() const;
...@@ -128,6 +129,7 @@ class DataStream: public AsyncStream<DataStream> ...@@ -128,6 +129,7 @@ class DataStream: public AsyncStream<DataStream>
} }
protected: protected:
bool is_stream_empty() const;
char* next_impl(uint64_t& total_bytes, uint64_t& used_bytes); char* next_impl(uint64_t& total_bytes, uint64_t& used_bytes);
void release_impl(); void release_impl();
bool at_end_impl() const; bool at_end_impl() const;
...@@ -141,6 +143,8 @@ class AsyncDadaReadClient: public DadaClientBase ...@@ -141,6 +143,8 @@ class AsyncDadaReadClient: public DadaClientBase
{ {
friend DataStream; friend DataStream;
friend HeaderStream; friend HeaderStream;
friend AsyncStream<DataStream>;
friend AsyncStream<HeaderStream>;
private: private:
bool _locked; bool _locked;
......
...@@ -23,12 +23,29 @@ std::pair<RawBytes, std::function<void()>> ...@@ -23,12 +23,29 @@ std::pair<RawBytes, std::function<void()>>
AsyncStream<DerivedAsyncStream>::next() AsyncStream<DerivedAsyncStream>::next()
{ {
std::unique_lock<std::mutex> lock(mutex); std::unique_lock<std::mutex> lock(mutex);
_cv.wait(lock, [this]() { return !_has_block; }); _cv.wait(lock, [this]() {
return !_has_block; });
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)
<< _parent.id() << "Acquiring block " << _block_counter; << _parent.id() << "Acquiring block " << _block_counter;
uint64_t total_bytes{}, used_bytes{}; uint64_t total_bytes{}, used_bytes{};
char* tmp = static_cast<DerivedAsyncStream*>(this)->next_impl(total_bytes, char * tmp;
while(true)
{
if(_parent._stop.load())
{
throw DadaInterruptException("Interrupt called in AsyncDadaReadClient::AsyncStream::next()");
}
if(static_cast<DerivedAsyncStream*>(this)->is_stream_empty())
{
usleep(100); // With very small buffers and high rates this might not be suffcient
}
else
{
tmp = static_cast<DerivedAsyncStream*>(this)->next_impl(total_bytes,
used_bytes); used_bytes);
break;
}
}
_has_block = true; _has_block = true;
RawBytes block(tmp, total_bytes, used_bytes); RawBytes block(tmp, total_bytes, used_bytes);
auto release_callback = [this]() { this->block_release_callback(); }; auto release_callback = [this]() { this->block_release_callback(); };
...@@ -73,6 +90,11 @@ char* HeaderStream::next_impl(uint64_t& total_bytes, uint64_t& used_bytes) ...@@ -73,6 +90,11 @@ char* HeaderStream::next_impl(uint64_t& total_bytes, uint64_t& used_bytes)
return tmp; return tmp;
} }
bool HeaderStream::is_stream_empty() const
{
return _parent.header_buffer_is_empty();
}
void HeaderStream::release_impl() void HeaderStream::release_impl()
{ {
BOOST_LOG_TRIVIAL(debug) << _parent.id() << "Releasing header block"; BOOST_LOG_TRIVIAL(debug) << _parent.id() << "Releasing header block";
...@@ -105,6 +127,12 @@ char* DataStream::next_impl(uint64_t& total_bytes, uint64_t& used_bytes) ...@@ -105,6 +127,12 @@ char* DataStream::next_impl(uint64_t& total_bytes, uint64_t& used_bytes)
return tmp; return tmp;
} }
bool DataStream::is_stream_empty() const
{
return _parent.data_buffer_is_empty();
}
void DataStream::release_impl() void DataStream::release_impl()
{ {
BOOST_LOG_TRIVIAL(debug) << _parent.id() << "Releasing data block"; BOOST_LOG_TRIVIAL(debug) << _parent.id() << "Releasing data block";
......
#pragma once
#include <memory>
#include <thread>
#include <gtest/gtest.h>
#include "psrdada_cpp/dada_db.hpp"
#include "psrdada_cpp/multilog.hpp"
#include "psrdada_cpp/dada_client_base.hpp"
#include "psrdada_cpp/dada_write_client.hpp"
#include "psrdada_cpp/dadaflow/AsyncDadaReadClient.hpp"
namespace psrdada_cpp {
namespace dadaflow {
namespace test {
struct dada_buffer_config
{
uint64_t nbufs;
uint64_t bufsz;
uint64_t nhdrs;
uint64_t hdrsz;
};
class DadaClientTester : public ::testing::TestWithParam<dada_buffer_config>
{
public:
DadaClientTester();
~DadaClientTester(){}
void SetUp() override;
void TearDown() override;
void test_buffer_is_full();
// void test_buffer_is_empty();
// void test_buffer_nfree();
// void test_buffer_nfull();
// void test_buffer_size();
// void test_buffer_count();
// void test_cuda_register_unregister_memory();
// void test_reader_has_block();
// void test_reader_connect();
// void test_reader_disconnect_while_acquired();
// void test_reader_stop_next();
// void test_reader_stop_next_from_thread();
// void test_writer_connect();
// void test_writer_disconnect_while_acquired();
// void test_writer_stop_next();
// void test_writer_stop_next_from_thread();
private:
void write_header_slot();
void write_data_slot();
// void read_header_slot();
// void read_data_slot();
void write_header_until_full();
void write_data_until_full();
// void read_header_until_empty();
// void read_data_until_empty();
private:
MultiLog wlog;
MultiLog rlog;
dada_buffer_config config;
std::unique_ptr<DadaWriteClient> test_writer;
std::unique_ptr<AsyncDadaReadClient> test_reader;
std::unique_ptr<DadaDB> test_buffer;
};
}
}
}
\ No newline at end of file
...@@ -4,6 +4,7 @@ link_directories(${GTEST_LIBRARY_DIR}) ...@@ -4,6 +4,7 @@ link_directories(${GTEST_LIBRARY_DIR})
set(gtest_dadaflow set(gtest_dadaflow
src/gtest_dadaflow.cu src/gtest_dadaflow.cu
src/DadaAutoGenTester.cpp src/DadaAutoGenTester.cpp
src/AsyncDadaClientTester.cpp
src/DadaHeaderTester.cpp src/DadaHeaderTester.cpp
src/DadaNodeTester.cpp src/DadaNodeTester.cpp
src/DescribedDataTester.cu src/DescribedDataTester.cu
......
#include "psrdada_cpp/dadaflow/test/AsyncDadaClientTester.hpp"
namespace psrdada_cpp {
namespace dadaflow {
namespace test {
DadaClientTester::DadaClientTester()
: ::testing::TestWithParam<dada_buffer_config>(),
wlog("DadaWriteClient"),
rlog("AsyncDadaReadClient"),
config(GetParam()){}
void DadaClientTester::SetUp()
{
test_buffer = std::make_unique<DadaDB>(config.nbufs, config.bufsz, config.nhdrs, config.hdrsz);
test_buffer->create();
test_writer = std::make_unique<DadaWriteClient>(test_buffer->key(), wlog);
test_reader = std::make_unique<AsyncDadaReadClient>(test_buffer->key(), rlog);
}
void DadaClientTester::TearDown()
{
test_writer.reset();
test_reader.reset();
test_buffer->destroy();
}
void DadaClientTester::test_buffer_is_full()
{
ASSERT_FALSE(test_writer->header_buffer_is_full());
ASSERT_FALSE(test_reader->header_buffer_is_full());
ASSERT_FALSE(test_reader->data_buffer_is_full());
ASSERT_FALSE(test_writer->data_buffer_is_full());
this->write_header_until_full();
this->write_data_until_full();
ASSERT_TRUE(test_writer->header_buffer_is_full());
ASSERT_TRUE(test_reader->header_buffer_is_full());
ASSERT_TRUE(test_writer->data_buffer_is_full());
ASSERT_TRUE(test_reader->data_buffer_is_full());
auto [hblock, hreleaser] = test_reader->header_stream().next();
hreleaser();
ASSERT_FALSE(test_writer->header_buffer_is_full());
ASSERT_FALSE(test_reader->header_buffer_is_full());
auto [dblock, dreleaser] = test_reader->data_stream().next();
dreleaser();
ASSERT_FALSE(test_writer->data_buffer_is_full());
ASSERT_FALSE(test_reader->data_buffer_is_full());
}
// void DadaClientTester::test_buffer_is_empty()
// {
// ASSERT_TRUE(test_writer->header_buffer_is_empty());
// ASSERT_TRUE(test_reader->header_buffer_is_empty());
// ASSERT_TRUE(test_reader->data_buffer_is_empty());
// ASSERT_TRUE(test_writer->data_buffer_is_empty());
// this->write_header_until_full();
// this->write_data_until_full();
// ASSERT_FALSE(test_writer->header_buffer_is_empty());
// ASSERT_FALSE(test_reader->header_buffer_is_empty());
// ASSERT_FALSE(test_writer->data_buffer_is_empty());
// ASSERT_FALSE(test_reader->data_buffer_is_empty());
// // Read a single header and data slot
// read_data_slot();
// read_header_slot();
// ASSERT_FALSE(test_writer->header_buffer_is_empty());
// ASSERT_FALSE(test_reader->header_buffer_is_empty());
// ASSERT_FALSE(test_writer->data_buffer_is_empty());
// ASSERT_FALSE(test_reader->data_buffer_is_empty());
// // read until header and data buffer are empty
// this->read_header_until_empty();
// this->read_data_until_empty();
// ASSERT_TRUE(test_writer->header_buffer_is_empty());
// ASSERT_TRUE(test_reader->header_buffer_is_empty());
// ASSERT_TRUE(test_reader->data_buffer_is_empty());
// ASSERT_TRUE(test_writer->data_buffer_is_empty());
// }
// void DadaClientTester::test_buffer_nfree()
// {
// for(std::size_t i = config.nhdrs; i > 1; i--)
// {
// ASSERT_EQ(i, test_writer->header_buffer_free_slots());
// ASSERT_EQ(i, test_reader->header_buffer_free_slots());
// write_header_slot();
// }
// for(std::size_t i = config.nbufs; i > 1; i--)
// {
// ASSERT_EQ(i, test_writer->data_buffer_free_slots());
// ASSERT_EQ(i, test_reader->data_buffer_free_slots());
// write_data_slot();
// }
// }
// void DadaClientTester::test_buffer_nfull()
// {
// auto& hstream = test_writer->header_stream();
// for(std::size_t i = 0; i > config.nhdrs; i++)
// {
// ASSERT_EQ(i, test_writer->header_buffer_full_slots());
// ASSERT_EQ(i, test_reader->header_buffer_full_slots());
// auto& hbuffer = hstream.next();
// hbuffer.used_bytes(config.hdrsz);
// hstream.release();
// }
// auto& dstream = test_writer->data_stream();
// for(std::size_t i = 0; i > config.nbufs; i++)
// {
// ASSERT_EQ(i, test_writer->data_buffer_full_slots());
// ASSERT_EQ(i, test_reader->data_buffer_full_slots());
// auto& dbuffer = dstream.next();
// dbuffer.used_bytes(config.bufsz);
// dstream.release();
// }
// }
// void DadaClientTester::test_buffer_size()
// {
// ASSERT_EQ(config.hdrsz, test_writer->header_buffer_size());
// ASSERT_EQ(config.hdrsz, test_reader->header_buffer_size());
// ASSERT_EQ(config.bufsz, test_writer->data_buffer_size());
// ASSERT_EQ(config.bufsz, test_reader->data_buffer_size());
// }
// void DadaClientTester::test_buffer_count()
// {
// ASSERT_EQ(config.nhdrs, test_writer->header_buffer_count());
// ASSERT_EQ(config.nhdrs, test_reader->header_buffer_count());
// ASSERT_EQ(config.nbufs, test_writer->data_buffer_count());
// ASSERT_EQ(config.nbufs, test_reader->data_buffer_count());
// }
// void DadaClientTester::test_reader_has_block()
// {
// auto& hstream = test_reader->header_stream();
// auto& dstream = test_reader->data_stream();
// this->write_data_slot(); // Write one slot, so it can be read
// this->write_header_slot(); // Write one slot, so it can be read
// hstream.next(); // Acquire the header slot
// dstream.next(); // Acquire the data slot
// ASSERT_TRUE(dstream.has_block());
// ASSERT_TRUE(hstream.has_block());
// dstream.release(); // Release the read header slot
// hstream.release(); // Release the read data slot
// ASSERT_FALSE(dstream.has_block());
// ASSERT_FALSE(hstream.has_block());
// }
// void DadaClientTester::test_reader_stop_next()
// {
// auto& hstream = test_reader->header_stream();
// auto& dstream = test_reader->data_stream();
// test_reader->stop_next();
// EXPECT_THROW(hstream.next(), DadaInterruptException);
// EXPECT_THROW(dstream.next(), DadaInterruptException);
// }
// void DadaClientTester::test_reader_stop_next_from_thread()
// {
// auto& hstream = test_reader->header_stream();
// auto& dstream = test_reader->data_stream();
// std::thread hthread([&hstream]() {
// EXPECT_THROW(hstream.next(), DadaInterruptException);
// });
// test_reader->stop_next();
// hthread.join();
// std::thread dthread([&dstream]() {
// EXPECT_THROW(dstream.next(), DadaInterruptException);
// });
// test_reader->stop_next();
// dthread.join();
// }
// void DadaClientTester::test_reader_connect()
// {
// ASSERT_TRUE(test_reader->is_connected());
// test_reader->disconnect();
// ASSERT_FALSE(test_reader->is_connected());
// test_reader->connect();
// ASSERT_TRUE(test_reader->is_connected());
// }
// void DadaClientTester::test_reader_disconnect_while_acquired()
// {
// this->write_data_slot(); // Write one slot, so it can be read
// test_reader->data_stream().next(); // Acquire the slot
// ASSERT_TRUE(test_reader->data_stream().has_block());
// test_reader->disconnect(); // Disconnect while acquired
// ASSERT_FALSE(test_reader->is_connected());
// this->write_data_slot();
// test_reader->connect();
// test_reader->data_stream().next();
// ASSERT_TRUE(test_reader->data_stream().has_block());
// }
// void DadaClientTester::test_writer_connect()
// {
// ASSERT_TRUE(test_writer->is_connected());
// test_writer->disconnect();
// ASSERT_FALSE(test_writer->is_connected());
// test_writer->connect();
// ASSERT_TRUE(test_writer->is_connected());
// }
// void DadaClientTester::test_writer_disconnect_while_acquired()
// {
// test_writer->data_stream().next(); // Acquire the slot
// test_writer->header_stream().next(); // Acquire the slot
// ASSERT_TRUE(test_writer->data_stream().has_block());
// ASSERT_TRUE(test_writer->header_stream().has_block());
// test_writer->disconnect(); // Disconnect while acquired
// ASSERT_EQ(test_reader->data_buffer_full_slots(), 1); // Use reader to get full slots as writer is disconnected
// ASSERT_EQ(test_reader->header_buffer_full_slots(), 1); // Use reader to get full slots as writer is disconnected
// ASSERT_FALSE(test_writer->is_connected());
// test_writer->connect();
// }
// void DadaClientTester::test_writer_stop_next()
// {
// auto& hstream = test_writer->header_stream();
// auto& dstream = test_writer->data_stream();
// test_writer->stop_next(); // Set before next call
// EXPECT_THROW(hstream.next(), DadaInterruptException);
// EXPECT_THROW(dstream.next(), DadaInterruptException);
// }
// void DadaClientTester::test_writer_stop_next_from_thread()
// {
// auto& hstream = test_writer->header_stream();
// auto& dstream = test_writer->data_stream();
// std::thread hthread([&hstream]() {
// EXPECT_THROW(hstream.next(), DadaInterruptException);
// });
// test_writer->stop_next();
// hthread.join();
// std::thread dthread([&dstream]() {
// EXPECT_THROW(dstream.next(), DadaInterruptException);
// });
// test_writer->stop_next();
// dthread.join();
// }
// ----------------
// HELPER FUNCTIONS
// ----------------
void DadaClientTester::write_header_slot()
{
auto& hstream = test_writer->header_stream();
auto& hbuffer = hstream.next();
hbuffer.used_bytes(config.hdrsz);
hstream.release();
}
void DadaClientTester::write_data_slot()
{
auto& dstream = test_writer->data_stream();
auto& dbuffer = dstream.next();
dbuffer.used_bytes(config.bufsz);
dstream.release();
}
// void DadaClientTester::read_header_slot()
// {
// auto& hstream = test_reader->header_stream();
// hstream.next();
// hstream.release();
// }
// void DadaClientTester::read_data_slot()
// {
// auto& dstream = test_reader->data_stream();
// dstream.next();
// dstream.release();
// }
void DadaClientTester::write_header_until_full()
{
while(test_writer->header_buffer_free_slots())
{
write_header_slot();
}
}
void DadaClientTester::write_data_until_full()
{
while(test_writer->data_buffer_free_slots())
{
write_data_slot();
}
}
// void DadaClientTester::read_header_until_empty()
// {
// while(test_reader->header_buffer_full_slots())
// {
// read_header_slot();
// }
// }
// void DadaClientTester::read_data_until_empty()
// {
// while(test_reader->data_buffer_full_slots())
// {
// read_data_slot();
// }
// }
TEST_P(DadaClientTester, test_buffer_is_full){ test_buffer_is_full(); }
// TEST_P(DadaClientTester, test_buffer_is_empty){ test_buffer_is_empty(); }
// TEST_P(DadaClientTester, test_buffer_nfree){ test_buffer_nfree(); }
// TEST_P(DadaClientTester, test_buffer_nfull){ test_buffer_nfull(); }
// TEST_P(DadaClientTester, test_buffer_size){ test_buffer_size(); }
// TEST_P(DadaClientTester, test_buffer_count){ test_buffer_count(); }
// TEST_P(DadaClientTester, test_cuda_register_unregister_memory){ test_cuda_register_unregister_memory(); }
// TEST_P(DadaClientTester, test_reader_has_block){ test_reader_has_block(); }
// TEST_P(DadaClientTester, test_reader_connect){ test_reader_connect(); }
// TEST_P(DadaClientTester, test_reader_disconnect_while_acquired){ test_reader_disconnect_while_acquired(); }
// TEST_P(DadaClientTester, test_reader_interrupt_when_acquire){ test_reader_stop_next(); }
// TEST_P(DadaClientTester, test_reader_stop_next_from_thread){ test_reader_stop_next_from_thread(); }
// TEST_P(DadaClientTester, test_writer_connect){ test_writer_connect(); }
// TEST_P(DadaClientTester, test_writer_disconnect_while_acquired){ test_writer_disconnect_while_acquired(); }
// TEST_P(DadaClientTester, test_writer_stop_next){ test_writer_stop_next(); }
// TEST_P(DadaClientTester, test_writer_stop_next_from_thread){ test_writer_stop_next_from_thread(); }
INSTANTIATE_TEST_SUITE_P(DadaClientBaseTesterInstantiation, DadaClientTester, ::testing::Values(
// in_key | out_key | device_id | heap_size | n_sample | n_channel | n_element | n_pol | n_acc | n_bit
dada_buffer_config{16, 2048, 8, 4096},
dada_buffer_config{11, 2345, 12, 1111},
dada_buffer_config{123, 24, 2, 1337},
dada_buffer_config{9, 111222, 19, 32}
));
}
}
}
\ No newline at end of file
...@@ -119,7 +119,7 @@ namespace psrdada_cpp { ...@@ -119,7 +119,7 @@ namespace psrdada_cpp {
} }
if(_parent.header_buffer_is_empty()) if(_parent.header_buffer_is_empty())
{ {
usleep(1000); usleep(100); // With very small buffers and high rates this might not be suffcient
} }
else else
{ {
......
...@@ -177,7 +177,7 @@ namespace psrdada_cpp { ...@@ -177,7 +177,7 @@ namespace psrdada_cpp {
} }
if(_parent.data_buffer_is_full()) if(_parent.data_buffer_is_full())
{ {
usleep(1000); usleep(100); // With very small buffers and high rates this might not be suffcient
} }
else else
{ {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment