diff --git a/psrdada_cpp/dada_read_client.hpp b/psrdada_cpp/dada_read_client.hpp index 22e8cf28ed7f51734dc7c373a8bf4deb7d902786..4bb5335ea57fa172f27d3cd96ca19bbffbdd07d7 100644 --- a/psrdada_cpp/dada_read_client.hpp +++ b/psrdada_cpp/dada_read_client.hpp @@ -56,7 +56,7 @@ namespace psrdada_cpp { /** * @brief Check if a slot is open e.g. not released yet / marked as cleared */ - bool is_open() const; + bool has_block() const; /** * @brief Realease all full header blocks in the buffer @@ -111,7 +111,7 @@ namespace psrdada_cpp { /** * @brief Check if a slot is open e.g. not released yet / marked as cleared */ - bool is_open() const; + bool has_block() const; /** * @brief Return the index of the currently open block diff --git a/psrdada_cpp/dada_write_client.hpp b/psrdada_cpp/dada_write_client.hpp index 511af0d04d5418775c8af74064fa6327212dff09..d3fef37937f78c1d370540836c84899f43a2a1ee 100644 --- a/psrdada_cpp/dada_write_client.hpp +++ b/psrdada_cpp/dada_write_client.hpp @@ -63,6 +63,11 @@ namespace psrdada_cpp { * readable by reading client. */ void release(); + + /** + * Is there a currently active block e.g. acquired a block but not released + */ + bool has_block() const; }; class DataStream @@ -106,7 +111,7 @@ namespace psrdada_cpp { std::size_t block_idx() const; /** - * Is there a currently active block + * Is there a currently active block e.g. acquired a block but not released */ bool has_block() const; diff --git a/psrdada_cpp/dadaflow/DadaAutoGen.hpp b/psrdada_cpp/dadaflow/DadaAutoGen.hpp index a9e55d4a90064fd9a0de4b8a3ba0e31971feb984..05cdd5c85d4a56475010451ff492669502362f5e 100644 --- a/psrdada_cpp/dadaflow/DadaAutoGen.hpp +++ b/psrdada_cpp/dadaflow/DadaAutoGen.hpp @@ -20,6 +20,17 @@ namespace psrdada_cpp { struct DefaultKeyFormatter { + + /** + * Create a range from the first to the last vector item avoiding long string in header file. + * ToDo: The function is very stupid and is not analysing the range, it just takes the first and last element. + * Write some parsing here which parses the stepwidth and the actual first and last vector. + */ + static std::string range(std::vector<std::string> const& vec) + { + return vec.front() + ":" + vec.back(); + } + static std::string format(std::string const& key) { return key; } @@ -223,12 +234,7 @@ struct DimensionParser<BaselineDimension, KeyFormatter> static void to_header(BaselineDimension const& dim, DadaHeader& header) { header.set(KeyFormatter::format("NBASELINES"), dim.nbaselines()); - std::vector<std::string> baselines; - baselines.reserve(dim.nbaselines()); - for (auto const& baseline: dim.baselines()) { - baselines.emplace_back(baseline); - } - header.set(KeyFormatter::format("BASELINES"), baselines); + header.set(KeyFormatter::format("BASELINES"), KeyFormatter::range(dim.baselines())); } }; diff --git a/psrdada_cpp/dadaflow/processors/AsyncDadaSink.hpp b/psrdada_cpp/dadaflow/processors/AsyncDadaSink.hpp index afcd209a44e6b50df11aa81f8da365a948a0e5a5..4e87214e88cf8494bf80e920a3f6f5c6461b2908 100644 --- a/psrdada_cpp/dadaflow/processors/AsyncDadaSink.hpp +++ b/psrdada_cpp/dadaflow/processors/AsyncDadaSink.hpp @@ -45,6 +45,9 @@ GenericHeaderFormatter formatter([](auto const& a, auto const& b) { class GenericDataFormatter { public: + + ~GenericDataFormatter(){}; + template <typename DataStream, typename DataType> requires Iterable<DataType> void operator()(DataStream& stream, DataType const& data) @@ -148,6 +151,11 @@ class AsyncDadaSink *(std::forward<DataPtrs>(data_ptrs))...); } + DadaWriteClient& client() + { + return *_client; + } + protected: std::unique_ptr<DadaWriteClient> _client; HeaderFormatter _header_formatter; diff --git a/psrdada_cpp/src/dada_read_client.cpp b/psrdada_cpp/src/dada_read_client.cpp index fad224d6461a29a5587775ef2674b65fd1fd1b48..7b91b12278249c5d0b3d9b999b6e6e6d14a4727b 100644 --- a/psrdada_cpp/src/dada_read_client.cpp +++ b/psrdada_cpp/src/dada_read_client.cpp @@ -69,11 +69,11 @@ namespace psrdada_cpp { void DadaReadClient::disconnect() { - if(_data_stream.is_open()) + if(_data_stream.has_block()) { _data_stream.release(); } - if(_header_stream.is_open()) + if(_header_stream.has_block()) { _header_stream.release(); } @@ -164,7 +164,7 @@ namespace psrdada_cpp { return (bool) ipcbuf_eod(_parent._hdu->header_block); } - bool DadaReadClient::HeaderStream::is_open() const + bool DadaReadClient::HeaderStream::has_block() const { return (_current_block != nullptr); } @@ -259,7 +259,7 @@ namespace psrdada_cpp { return (bool) ipcbuf_eod((ipcbuf_t *)(_parent._hdu->data_block)); } - bool DadaReadClient::DataStream::is_open() const + bool DadaReadClient::DataStream::has_block() const { return (_current_block != nullptr); } diff --git a/psrdada_cpp/src/dada_write_client.cpp b/psrdada_cpp/src/dada_write_client.cpp index 9c7a44f809e4025c43ce823b63e370b6e9b04f66..00557981b5dfd15fc616d5bafc49438b08ed839c 100644 --- a/psrdada_cpp/src/dada_write_client.cpp +++ b/psrdada_cpp/src/dada_write_client.cpp @@ -42,7 +42,7 @@ namespace psrdada_cpp { { throw std::runtime_error("Release requested on unlocked HDU\n"); } - BOOST_LOG_TRIVIAL(debug) << this->id() << "Releasing writing lock on dada buffer"; + BOOST_LOG_TRIVIAL(info) << this->id() << "Releasing writing lock on dada buffer"; if (dada_hdu_unlock_write (_hdu) < 0) { _log.write(LOG_ERR, "open_hdu: could not release write\n"); @@ -60,14 +60,16 @@ namespace psrdada_cpp { void DadaWriteClient::disconnect() { - // if(_data_stream.is_open()) - // { - // _data_stream.release(); - // } - // if(_header_stream.is_open()) - // { - // _header_stream.release(); - // } + if(_data_stream.has_block()) + { + BOOST_LOG_TRIVIAL(warning) << "Disconnecting while data block is acquired but not released, realsing it now"; + _data_stream.release(); + } + if(_header_stream.has_block()) + { + BOOST_LOG_TRIVIAL(warning) << "Disconnecting while header block is acquired but not released, realsing it now"; + _header_stream.release(); + } if(_locked) { release(); @@ -141,6 +143,12 @@ namespace psrdada_cpp { _current_block.reset(); } + bool DadaWriteClient::HeaderStream::has_block() const + { + return bool(_current_block); + } + + DadaWriteClient::DataStream::DataStream(DadaWriteClient& parent) : _parent(parent) , _current_block(nullptr) diff --git a/psrdada_cpp/test/dada_client_tester.hpp b/psrdada_cpp/test/dada_client_tester.hpp index 6898ab38bf3c21b0ea6b9224b148a9c3e7631355..be8a5adaf8ff4c8174f1654dabc366de20e65d7e 100644 --- a/psrdada_cpp/test/dada_client_tester.hpp +++ b/psrdada_cpp/test/dada_client_tester.hpp @@ -36,7 +36,6 @@ public: void test_buffer_is_empty(); - void test_buffer_nfree(); void test_buffer_nfull(); @@ -45,27 +44,23 @@ public: void test_buffer_count(); - - - void test_reader_is_open(); + void test_reader_has_block(); void test_reader_connect(); void test_reader_disconnect_while_acquired(); void test_reader_stop_next(); + + void test_reader_stop_next_from_thread(); void test_writer_connect(); - void test_disconnect(); - - void test_reconnect(); - - void test_stop_next(); + void test_writer_disconnect_while_acquired(); - void test_cuda_register_memory(); + void test_writer_stop_next(); - void test_id(); + void test_writer_stop_next_from_thread(); private: void write_header_slot(); diff --git a/psrdada_cpp/test/src/dada_client_tester.cpp b/psrdada_cpp/test/src/dada_client_tester.cpp index 6ef595dbf3157796eb9125feabc60d7051514fa9..432761b310729dde752d6a1e719f7a9bb4ca4c0d 100644 --- a/psrdada_cpp/test/src/dada_client_tester.cpp +++ b/psrdada_cpp/test/src/dada_client_tester.cpp @@ -134,7 +134,7 @@ void DadaClientTester::test_buffer_count() ASSERT_EQ(config.nbufs, test_reader->data_buffer_count()); } -void DadaClientTester::test_reader_is_open() +void DadaClientTester::test_reader_has_block() { auto& hstream = test_reader->header_stream(); auto& dstream = test_reader->data_stream(); @@ -142,12 +142,12 @@ void DadaClientTester::test_reader_is_open() this->write_header_slot(); // Write one slot, so it can be read hstream.next(); // Acquire the header slot dstream.next(); // Acquire the data slot - ASSERT_TRUE(dstream.is_open()); - ASSERT_TRUE(hstream.is_open()); + ASSERT_TRUE(dstream.has_block()); + ASSERT_TRUE(hstream.has_block()); dstream.release(); // Release the read header slot hstream.release(); // Release the read data slot - ASSERT_FALSE(dstream.is_open()); - ASSERT_FALSE(hstream.is_open()); + ASSERT_FALSE(dstream.has_block()); + ASSERT_FALSE(hstream.has_block()); } void DadaClientTester::test_reader_stop_next() @@ -159,6 +159,22 @@ void DadaClientTester::test_reader_stop_next() EXPECT_THROW(dstream.next(), DadaInterruptException); } +void DadaClientTester::test_reader_stop_next_from_thread() +{ + auto& hstream = test_reader->header_stream(); + auto& dstream = test_reader->data_stream(); + std::thread hthread([&hstream]() { + EXPECT_THROW(hstream.next(), DadaInterruptException); + }); + test_reader->stop_next(); + hthread.join(); + std::thread dthread([&dstream]() { + EXPECT_THROW(dstream.next(), DadaInterruptException); + }); + test_reader->stop_next(); + dthread.join(); +} + void DadaClientTester::test_reader_connect() { ASSERT_TRUE(test_reader->is_connected()); @@ -172,13 +188,13 @@ void DadaClientTester::test_reader_disconnect_while_acquired() { this->write_data_slot(); // Write one slot, so it can be read test_reader->data_stream().next(); // Acquire the slot - ASSERT_TRUE(test_reader->data_stream().is_open()); + ASSERT_TRUE(test_reader->data_stream().has_block()); test_reader->disconnect(); // Disconnect while acquired ASSERT_FALSE(test_reader->is_connected()); this->write_data_slot(); test_reader->connect(); test_reader->data_stream().next(); - ASSERT_TRUE(test_reader->data_stream().is_open()); + ASSERT_TRUE(test_reader->data_stream().has_block()); } void DadaClientTester::test_writer_connect() @@ -190,23 +206,48 @@ void DadaClientTester::test_writer_connect() ASSERT_TRUE(test_writer->is_connected()); } -// void DadaClientTester::test_writer_disconnect() -// { - -// } -// void DadaClientTester::test_reconnect() -// { +void DadaClientTester::test_writer_disconnect_while_acquired() +{ + test_writer->data_stream().next(); // Acquire the slot + test_writer->header_stream().next(); // Acquire the slot + ASSERT_TRUE(test_writer->data_stream().has_block()); + ASSERT_TRUE(test_writer->header_stream().has_block()); + test_writer->disconnect(); // Disconnect while acquired + ASSERT_EQ(test_reader->data_buffer_full_slots(), 1); // Use reader to get full slots as writer is disconnected + ASSERT_EQ(test_reader->header_buffer_full_slots(), 1); // Use reader to get full slots as writer is disconnected + ASSERT_FALSE(test_writer->is_connected()); + test_writer->connect(); +} -// } -// void DadaClientTester::test_cuda_register_memory() -// { +void DadaClientTester::test_writer_stop_next() +{ + auto& hstream = test_writer->header_stream(); + auto& dstream = test_writer->data_stream(); + test_writer->stop_next(); // Set before next call + EXPECT_THROW(hstream.next(), DadaInterruptException); + EXPECT_THROW(dstream.next(), DadaInterruptException); +} -// } -// void DadaClientTester::test_id() -// { +void DadaClientTester::test_writer_stop_next_from_thread() +{ + auto& hstream = test_writer->header_stream(); + auto& dstream = test_writer->data_stream(); + std::thread hthread([&hstream]() { + EXPECT_THROW(hstream.next(), DadaInterruptException); + }); + test_writer->stop_next(); + hthread.join(); + std::thread dthread([&dstream]() { + EXPECT_THROW(dstream.next(), DadaInterruptException); + }); + test_writer->stop_next(); + dthread.join(); +} -// } +// ---------------- +// HELPER FUNCTIONS +// ---------------- void DadaClientTester::write_header_slot() { auto& hstream = test_writer->header_stream(); @@ -278,15 +319,16 @@ TEST_P(DadaClientTester, test_buffer_nfull){ test_buffer_nfull(); } TEST_P(DadaClientTester, test_buffer_size){ test_buffer_size(); } TEST_P(DadaClientTester, test_buffer_count){ test_buffer_count(); } -TEST_P(DadaClientTester, test_reader_is_open){ test_reader_is_open(); } +TEST_P(DadaClientTester, test_reader_has_block){ test_reader_has_block(); } TEST_P(DadaClientTester, test_reader_connect){ test_reader_connect(); } TEST_P(DadaClientTester, test_reader_disconnect_while_acquired){ test_reader_disconnect_while_acquired(); } TEST_P(DadaClientTester, test_reader_interrupt_when_acquire){ test_reader_stop_next(); } +TEST_P(DadaClientTester, test_reader_stop_next_from_thread){ test_reader_stop_next_from_thread(); } TEST_P(DadaClientTester, test_writer_connect){ test_writer_connect(); } -// TEST_P(DadaClientTester, test_disconnect){ test_writer_disconnect(); } -// TEST_P(DadaClientTester, test_reconnect){ test_reconnect(); } -// TEST_P(DadaClientTester, test_stop_next){ test_stop_next(); } +TEST_P(DadaClientTester, test_writer_disconnect_while_acquired){ test_writer_disconnect_while_acquired(); } +TEST_P(DadaClientTester, test_writer_stop_next){ test_writer_stop_next(); } +TEST_P(DadaClientTester, test_writer_stop_next_from_thread){ test_writer_stop_next_from_thread(); } INSTANTIATE_TEST_SUITE_P(DadaClientBaseTesterInstantiation, DadaClientTester, ::testing::Values( // in_key | out_key | device_id | heap_size | n_sample | n_channel | n_element | n_pol | n_acc | n_bit