Commit afaa5679 authored by Tobias Winchen's avatar Tobias Winchen
Browse files

Resolved merge conflict

parents 57e8a47c 2e9cefef
......@@ -6,12 +6,13 @@ set(PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES
${DEPENDENCY_LIBRARIES})
set(psrdada_cpp_effelsberg_edd_src
src/Unpacker.cu
src/Channeliser.cu
src/DadaBufferLayout.cpp
src/DetectorAccumulator.cu
src/ScaledTransposeTFtoTFT.cu
src/VLBI.cu
src/Tools.cu
src/DadaBufferLayout.cpp
src/Unpacker.cu
src/VLBI.cu
)
cuda_add_library(${CMAKE_PROJECT_NAME}_effelsberg_edd ${psrdada_cpp_effelsberg_edd_src})
......
......@@ -6,6 +6,7 @@
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/double_device_buffer.cuh"
#include "psrdada_cpp/double_host_buffer.cuh"
#include "psrdada_cpp/dada_write_client.hpp"
#include "thrust/device_vector.h"
#include "cufft.h"
......@@ -13,7 +14,6 @@ namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
template <class HandlerType>
class Channeliser
{
public:
......@@ -29,7 +29,7 @@ public:
std::size_t nbits,
float input_level,
float output_level,
HandlerType& handler);
DadaWriteClient& client);
~Channeliser();
/**
......@@ -61,7 +61,7 @@ private:
std::size_t _buffer_bytes;
std::size_t _fft_length;
std::size_t _nbits;
HandlerType& _handler;
DadaWriteClient& _client;
cufftHandle _fft_plan;
int _nchans;
int _call_count;
......@@ -71,7 +71,6 @@ private:
DoubleDeviceBuffer<PackedChannelisedVoltageType> _packed_channelised_voltage;
thrust::device_vector<UnpackedVoltageType> _unpacked_voltage;
thrust::device_vector<ChannelisedVoltageType> _channelised_voltage;
DoublePinnedHostBuffer<PackedChannelisedVoltageType> _host_packed_channelised_voltage;
cudaStream_t _h2d_stream;
cudaStream_t _proc_stream;
cudaStream_t _d2h_stream;
......@@ -82,5 +81,4 @@ private:
} //effelsberg
} //psrdada_cpp
#include "psrdada_cpp/effelsberg/edd/detail/Channeliser.cu"
#endif //PSRDADA_CPP_EFFELSBERG_EDD_CHANNELISER_HPP
......@@ -10,18 +10,17 @@ namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
template <class HandlerType>
Channeliser<HandlerType>::Channeliser(
Channeliser::Channeliser(
std::size_t buffer_bytes,
std::size_t fft_length,
std::size_t nbits,
float input_level,
float output_level,
HandlerType& handler)
DadaWriteClient& client)
: _buffer_bytes(buffer_bytes)
, _fft_length(fft_length)
, _nbits(nbits)
, _handler(handler)
, _client(client)
, _fft_plan(0)
, _call_count(0)
{
......@@ -36,6 +35,9 @@ Channeliser<HandlerType>::Channeliser(
std::size_t n64bit_words = buffer_bytes / sizeof(uint64_t);
_nchans = _fft_length / 2 + 1;
int batch = nsamps_per_buffer/_fft_length;
std::size_t packed_channelised_voltage_bytes = _nchans * batch * sizeof(PackedChannelisedVoltageType);
BOOST_LOG_TRIVIAL(debug) << "Output buffer bytes: " << packed_channelised_voltage_bytes;
assert(_client.data_buffer_size() == packed_channelised_voltage_bytes /* Incorrect output DADA buffer size */);
BOOST_LOG_TRIVIAL(debug) << "Calculating scales and offsets";
float scale = std::sqrt(_nchans) * input_level;
BOOST_LOG_TRIVIAL(debug) << "Correction factors for 8-bit conversion: scaling = " << scale;
......@@ -54,7 +56,6 @@ Channeliser<HandlerType>::Channeliser(
BOOST_LOG_TRIVIAL(debug) << "Channelised voltages size: " << _channelised_voltage.size();
_packed_channelised_voltage.resize(_channelised_voltage.size());
BOOST_LOG_TRIVIAL(debug) << "Packed channelised voltages size: " << _packed_channelised_voltage.size();
_host_packed_channelised_voltage.resize(_packed_channelised_voltage.size());
CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream));
CUDA_ERROR_CHECK(cudaStreamCreate(&_proc_stream));
CUDA_ERROR_CHECK(cudaStreamCreate(&_d2h_stream));
......@@ -63,8 +64,7 @@ Channeliser<HandlerType>::Channeliser(
_transposer.reset(new ScaledTransposeTFtoTFT(_nchans, 8192, scale, 0.0, _proc_stream));
}
template <class HandlerType>
Channeliser<HandlerType>::~Channeliser()
Channeliser::~Channeliser()
{
BOOST_LOG_TRIVIAL(debug) << "Destroying Channeliser";
if (!_fft_plan)
......@@ -74,15 +74,17 @@ Channeliser<HandlerType>::~Channeliser()
cudaStreamDestroy(_d2h_stream);
}
template <class HandlerType>
void Channeliser<HandlerType>::init(RawBytes& block)
void Channeliser::init(RawBytes& block)
{
BOOST_LOG_TRIVIAL(debug) << "Channeliser init called";
_handler.init(block);
auto& header_block = _client.header_stream().next();
/* Populate new header */
std::memcpy(header_block.ptr(), block.ptr(), block.total_bytes());
header_block.used_bytes(header_block.total_bytes());
_client.header_stream().release();
}
template <class HandlerType>
void Channeliser<HandlerType>::process(
void Channeliser::process(
thrust::device_vector<RawVoltageType> const& digitiser_raw,
thrust::device_vector<PackedChannelisedVoltageType>& packed_channelised)
{
......@@ -103,8 +105,7 @@ void Channeliser<HandlerType>::process(
_transposer->transpose(_channelised_voltage, packed_channelised);
}
template <class HandlerType>
bool Channeliser<HandlerType>::operator()(RawBytes& block)
bool Channeliser::operator()(RawBytes& block)
{
++_call_count;
BOOST_LOG_TRIVIAL(debug) << "Channeliser operator() called (count = " << _call_count << ")";
......@@ -131,30 +132,21 @@ bool Channeliser<HandlerType>::operator()(RawBytes& block)
{
return false;
}
CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream));
_host_packed_channelised_voltage.swap();
if (_call_count > 3)
{
_client.data_stream().release();
}
auto& data_block = _client.data_stream().next();
CUDA_ERROR_CHECK(cudaMemcpyAsync(
static_cast<void*>(_host_packed_channelised_voltage.a_ptr()),
static_cast<void*>(data_block.ptr()),
static_cast<void*>(_packed_channelised_voltage.b_ptr()),
_packed_channelised_voltage.size() * sizeof(PackedChannelisedVoltageType),
cudaMemcpyDeviceToHost,
_d2h_stream));
if (_call_count == 3)
{
return false;
}
//Wrap _detected_host_previous in a RawBytes object here;
RawBytes bytes(reinterpret_cast<char*>(_host_packed_channelised_voltage.b_ptr()),
_host_packed_channelised_voltage.size() * sizeof(PackedChannelisedVoltageType),
_host_packed_channelised_voltage.size() * sizeof(PackedChannelisedVoltageType));
BOOST_LOG_TRIVIAL(debug) << "Calling handler";
// The handler can't do anything asynchronously without a copy here
// as it would be unsafe (given that it does not own the memory it
// is being passed).
return _handler(bytes);
data_block.used_bytes(data_block.total_bytes());
return false;
}
} //edd
......
#include "psrdada_cpp/effelsberg/edd/test/ChanneliserTester.cuh"
#include "psrdada_cpp/effelsberg/edd/Channeliser.cuh"
#include "psrdada_cpp/dada_null_sink.hpp"
#include "psrdada_cpp/dada_db.hpp"
#include "psrdada_cpp/dada_write_client.hpp"
#include "psrdada_cpp/dada_input_stream.hpp"
#include "psrdada_cpp/double_host_buffer.cuh"
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/cuda_utils.hpp"
#include <vector>
#include <thread>
#include <chrono>
#include <exception>
namespace psrdada_cpp {
namespace effelsberg {
......@@ -33,19 +39,35 @@ void ChanneliserTester::TearDown()
void ChanneliserTester::performance_test(std::size_t fft_length, std::size_t nbits)
{
std::size_t input_block_bytes = fft_length * 8192 * 1024 * nbits / 8;
std::size_t output_block_bytes = (fft_length/2 + 1) * 8192 * 1024 * sizeof(Channeliser::PackedChannelisedVoltageType);
DadaDB output_buffer(8, output_block_bytes);
output_buffer.create();
MultiLog log("test_log");
NullSink null_sink;
DadaInputStream<NullSink> consumer(output_buffer.key(), log, null_sink);
std::thread consumer_thread( [&](){
try {
consumer.start();
} catch (std::exception& e) {
BOOST_LOG_TRIVIAL(error) << e.what();
}
});
DadaWriteClient client(output_buffer.key(), log);
DoublePinnedHostBuffer<char> input_block;
input_block.resize(input_block_bytes);
RawBytes input_raw_bytes(input_block.a_ptr(), input_block_bytes, input_block_bytes);
std::vector<char> header_block(4096);
RawBytes header_raw_bytes(header_block.data(), 4096, 4096);
NullSink null_sink;
Channeliser<NullSink> channeliser(input_block_bytes, fft_length, nbits, 16.0f, 16.0f, null_sink);
Channeliser channeliser(input_block_bytes, fft_length, nbits, 16.0f, 16.0f, client);
channeliser.init(header_raw_bytes);
for (int ii = 0; ii < 100; ++ii)
{
channeliser(input_raw_bytes);
}
consumer.stop();
std::this_thread::sleep_for(std::chrono::seconds(1));
channeliser(input_raw_bytes);
consumer_thread.join();
}
......
......@@ -79,7 +79,7 @@ TEST_F(ScaledTransposeTFtoTFTTester, counter_test)
int nsamps_per_packet = 8192;
float stdev = 64.0f;
float scale = 4.0f;
int nsamps = nsamps_per_packet * 1024;
int nsamps = nsamps_per_packet * 32;
int n = nchans * nsamps;
std::default_random_engine generator;
std::normal_distribution<float> distribution(0.0, stdev);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment