Skip to content
Snippets Groups Projects
Commit 73e43faa authored by Niclas Esser's avatar Niclas Esser
Browse files

All tests passing for DadaClients, Allow AsyncDadaSink to return client to allow for stop_next()

parent 0ccafdfe
No related branches found
No related tags found
1 merge request!41Avoid acquiring dada buffers when the buffer is full (writer) or when it is...
Pipeline #231177 passed
......@@ -56,7 +56,7 @@ namespace psrdada_cpp {
/**
* @brief Check if a slot is open e.g. not released yet / marked as cleared
*/
bool is_open() const;
bool has_block() const;
/**
* @brief Realease all full header blocks in the buffer
......@@ -111,7 +111,7 @@ namespace psrdada_cpp {
/**
* @brief Check if a slot is open e.g. not released yet / marked as cleared
*/
bool is_open() const;
bool has_block() const;
/**
* @brief Return the index of the currently open block
......
......@@ -63,6 +63,11 @@ namespace psrdada_cpp {
* readable by reading client.
*/
void release();
/**
* Is there a currently active block e.g. acquired a block but not released
*/
bool has_block() const;
};
class DataStream
......@@ -106,7 +111,7 @@ namespace psrdada_cpp {
std::size_t block_idx() const;
/**
* Is there a currently active block
* Is there a currently active block e.g. acquired a block but not released
*/
bool has_block() const;
......
......@@ -20,6 +20,17 @@ namespace psrdada_cpp {
struct DefaultKeyFormatter {
/**
* Create a range from the first to the last vector item avoiding long string in header file.
* ToDo: The function is very stupid and is not analysing the range, it just takes the first and last element.
* Write some parsing here which parses the stepwidth and the actual first and last vector.
*/
static std::string range(std::vector<std::string> const& vec)
{
return vec.front() + ":" + vec.back();
}
static std::string format(std::string const& key) {
return key;
}
......@@ -223,12 +234,7 @@ struct DimensionParser<BaselineDimension, KeyFormatter>
static void to_header(BaselineDimension const& dim, DadaHeader& header) {
header.set(KeyFormatter::format("NBASELINES"), dim.nbaselines());
std::vector<std::string> baselines;
baselines.reserve(dim.nbaselines());
for (auto const& baseline: dim.baselines()) {
baselines.emplace_back(baseline);
}
header.set(KeyFormatter::format("BASELINES"), baselines);
header.set(KeyFormatter::format("BASELINES"), KeyFormatter::range(dim.baselines()));
}
};
......
......@@ -45,6 +45,9 @@ GenericHeaderFormatter formatter([](auto const& a, auto const& b) {
class GenericDataFormatter
{
public:
~GenericDataFormatter(){};
template <typename DataStream, typename DataType>
requires Iterable<DataType>
void operator()(DataStream& stream, DataType const& data)
......@@ -148,6 +151,11 @@ class AsyncDadaSink
*(std::forward<DataPtrs>(data_ptrs))...);
}
DadaWriteClient& client()
{
return *_client;
}
protected:
std::unique_ptr<DadaWriteClient> _client;
HeaderFormatter _header_formatter;
......
......@@ -69,11 +69,11 @@ namespace psrdada_cpp {
void DadaReadClient::disconnect()
{
if(_data_stream.is_open())
if(_data_stream.has_block())
{
_data_stream.release();
}
if(_header_stream.is_open())
if(_header_stream.has_block())
{
_header_stream.release();
}
......@@ -164,7 +164,7 @@ namespace psrdada_cpp {
return (bool) ipcbuf_eod(_parent._hdu->header_block);
}
bool DadaReadClient::HeaderStream::is_open() const
bool DadaReadClient::HeaderStream::has_block() const
{
return (_current_block != nullptr);
}
......@@ -259,7 +259,7 @@ namespace psrdada_cpp {
return (bool) ipcbuf_eod((ipcbuf_t *)(_parent._hdu->data_block));
}
bool DadaReadClient::DataStream::is_open() const
bool DadaReadClient::DataStream::has_block() const
{
return (_current_block != nullptr);
}
......
......@@ -42,7 +42,7 @@ namespace psrdada_cpp {
{
throw std::runtime_error("Release requested on unlocked HDU\n");
}
BOOST_LOG_TRIVIAL(debug) << this->id() << "Releasing writing lock on dada buffer";
BOOST_LOG_TRIVIAL(info) << this->id() << "Releasing writing lock on dada buffer";
if (dada_hdu_unlock_write (_hdu) < 0)
{
_log.write(LOG_ERR, "open_hdu: could not release write\n");
......@@ -60,14 +60,16 @@ namespace psrdada_cpp {
void DadaWriteClient::disconnect()
{
// if(_data_stream.is_open())
// {
// _data_stream.release();
// }
// if(_header_stream.is_open())
// {
// _header_stream.release();
// }
if(_data_stream.has_block())
{
BOOST_LOG_TRIVIAL(warning) << "Disconnecting while data block is acquired but not released, realsing it now";
_data_stream.release();
}
if(_header_stream.has_block())
{
BOOST_LOG_TRIVIAL(warning) << "Disconnecting while header block is acquired but not released, realsing it now";
_header_stream.release();
}
if(_locked)
{
release();
......@@ -141,6 +143,12 @@ namespace psrdada_cpp {
_current_block.reset();
}
bool DadaWriteClient::HeaderStream::has_block() const
{
return bool(_current_block);
}
DadaWriteClient::DataStream::DataStream(DadaWriteClient& parent)
: _parent(parent)
, _current_block(nullptr)
......
......@@ -36,7 +36,6 @@ public:
void test_buffer_is_empty();
void test_buffer_nfree();
void test_buffer_nfull();
......@@ -45,9 +44,7 @@ public:
void test_buffer_count();
void test_reader_is_open();
void test_reader_has_block();
void test_reader_connect();
......@@ -55,17 +52,15 @@ public:
void test_reader_stop_next();
void test_writer_connect();
void test_disconnect();
void test_reader_stop_next_from_thread();
void test_reconnect();
void test_writer_connect();
void test_stop_next();
void test_writer_disconnect_while_acquired();
void test_cuda_register_memory();
void test_writer_stop_next();
void test_id();
void test_writer_stop_next_from_thread();
private:
void write_header_slot();
......
......@@ -134,7 +134,7 @@ void DadaClientTester::test_buffer_count()
ASSERT_EQ(config.nbufs, test_reader->data_buffer_count());
}
void DadaClientTester::test_reader_is_open()
void DadaClientTester::test_reader_has_block()
{
auto& hstream = test_reader->header_stream();
auto& dstream = test_reader->data_stream();
......@@ -142,12 +142,12 @@ void DadaClientTester::test_reader_is_open()
this->write_header_slot(); // Write one slot, so it can be read
hstream.next(); // Acquire the header slot
dstream.next(); // Acquire the data slot
ASSERT_TRUE(dstream.is_open());
ASSERT_TRUE(hstream.is_open());
ASSERT_TRUE(dstream.has_block());
ASSERT_TRUE(hstream.has_block());
dstream.release(); // Release the read header slot
hstream.release(); // Release the read data slot
ASSERT_FALSE(dstream.is_open());
ASSERT_FALSE(hstream.is_open());
ASSERT_FALSE(dstream.has_block());
ASSERT_FALSE(hstream.has_block());
}
void DadaClientTester::test_reader_stop_next()
......@@ -159,6 +159,22 @@ void DadaClientTester::test_reader_stop_next()
EXPECT_THROW(dstream.next(), DadaInterruptException);
}
void DadaClientTester::test_reader_stop_next_from_thread()
{
auto& hstream = test_reader->header_stream();
auto& dstream = test_reader->data_stream();
std::thread hthread([&hstream]() {
EXPECT_THROW(hstream.next(), DadaInterruptException);
});
test_reader->stop_next();
hthread.join();
std::thread dthread([&dstream]() {
EXPECT_THROW(dstream.next(), DadaInterruptException);
});
test_reader->stop_next();
dthread.join();
}
void DadaClientTester::test_reader_connect()
{
ASSERT_TRUE(test_reader->is_connected());
......@@ -172,13 +188,13 @@ void DadaClientTester::test_reader_disconnect_while_acquired()
{
this->write_data_slot(); // Write one slot, so it can be read
test_reader->data_stream().next(); // Acquire the slot
ASSERT_TRUE(test_reader->data_stream().is_open());
ASSERT_TRUE(test_reader->data_stream().has_block());
test_reader->disconnect(); // Disconnect while acquired
ASSERT_FALSE(test_reader->is_connected());
this->write_data_slot();
test_reader->connect();
test_reader->data_stream().next();
ASSERT_TRUE(test_reader->data_stream().is_open());
ASSERT_TRUE(test_reader->data_stream().has_block());
}
void DadaClientTester::test_writer_connect()
......@@ -190,23 +206,48 @@ void DadaClientTester::test_writer_connect()
ASSERT_TRUE(test_writer->is_connected());
}
// void DadaClientTester::test_writer_disconnect()
// {
// }
// void DadaClientTester::test_reconnect()
// {
void DadaClientTester::test_writer_disconnect_while_acquired()
{
test_writer->data_stream().next(); // Acquire the slot
test_writer->header_stream().next(); // Acquire the slot
ASSERT_TRUE(test_writer->data_stream().has_block());
ASSERT_TRUE(test_writer->header_stream().has_block());
test_writer->disconnect(); // Disconnect while acquired
ASSERT_EQ(test_reader->data_buffer_full_slots(), 1); // Use reader to get full slots as writer is disconnected
ASSERT_EQ(test_reader->header_buffer_full_slots(), 1); // Use reader to get full slots as writer is disconnected
ASSERT_FALSE(test_writer->is_connected());
test_writer->connect();
}
// }
// void DadaClientTester::test_cuda_register_memory()
// {
void DadaClientTester::test_writer_stop_next()
{
auto& hstream = test_writer->header_stream();
auto& dstream = test_writer->data_stream();
test_writer->stop_next(); // Set before next call
EXPECT_THROW(hstream.next(), DadaInterruptException);
EXPECT_THROW(dstream.next(), DadaInterruptException);
}
// }
// void DadaClientTester::test_id()
// {
void DadaClientTester::test_writer_stop_next_from_thread()
{
auto& hstream = test_writer->header_stream();
auto& dstream = test_writer->data_stream();
std::thread hthread([&hstream]() {
EXPECT_THROW(hstream.next(), DadaInterruptException);
});
test_writer->stop_next();
hthread.join();
std::thread dthread([&dstream]() {
EXPECT_THROW(dstream.next(), DadaInterruptException);
});
test_writer->stop_next();
dthread.join();
}
// }
// ----------------
// HELPER FUNCTIONS
// ----------------
void DadaClientTester::write_header_slot()
{
auto& hstream = test_writer->header_stream();
......@@ -278,15 +319,16 @@ TEST_P(DadaClientTester, test_buffer_nfull){ test_buffer_nfull(); }
TEST_P(DadaClientTester, test_buffer_size){ test_buffer_size(); }
TEST_P(DadaClientTester, test_buffer_count){ test_buffer_count(); }
TEST_P(DadaClientTester, test_reader_is_open){ test_reader_is_open(); }
TEST_P(DadaClientTester, test_reader_has_block){ test_reader_has_block(); }
TEST_P(DadaClientTester, test_reader_connect){ test_reader_connect(); }
TEST_P(DadaClientTester, test_reader_disconnect_while_acquired){ test_reader_disconnect_while_acquired(); }
TEST_P(DadaClientTester, test_reader_interrupt_when_acquire){ test_reader_stop_next(); }
TEST_P(DadaClientTester, test_reader_stop_next_from_thread){ test_reader_stop_next_from_thread(); }
TEST_P(DadaClientTester, test_writer_connect){ test_writer_connect(); }
// TEST_P(DadaClientTester, test_disconnect){ test_writer_disconnect(); }
// TEST_P(DadaClientTester, test_reconnect){ test_reconnect(); }
// TEST_P(DadaClientTester, test_stop_next){ test_stop_next(); }
TEST_P(DadaClientTester, test_writer_disconnect_while_acquired){ test_writer_disconnect_while_acquired(); }
TEST_P(DadaClientTester, test_writer_stop_next){ test_writer_stop_next(); }
TEST_P(DadaClientTester, test_writer_stop_next_from_thread){ test_writer_stop_next_from_thread(); }
INSTANTIATE_TEST_SUITE_P(DadaClientBaseTesterInstantiation, DadaClientTester, ::testing::Values(
// in_key | out_key | device_id | heap_size | n_sample | n_channel | n_element | n_pol | n_acc | n_bit
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment