Commit 8e3cc058 authored by Tobias Winchen's avatar Tobias Winchen
Browse files

Refaktor dada buffer layout calculation

parent 33acdb93
......@@ -10,6 +10,7 @@ set(psrdada_cpp_effelsberg_edd_src
src/DetectorAccumulator.cu
src/ScaledTransposeTFtoTFT.cu
src/VLBI.cu
src/DadaBufferLayout.cpp
)
cuda_add_library(${CMAKE_PROJECT_NAME}_effelsberg_edd ${psrdada_cpp_effelsberg_edd_src})
......
#include "dada_hdu.h"
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
/**
@class DadaBufferLayout
@brief Calculate the data layout of the buffer
*/
class DadaBufferLayout
{
private:
size_t _bufferSize;
size_t _heapSize;
size_t _nSideChannels;
key_t _input_key;
size_t _sideChannelSize;
size_t _nHeaps;
size_t _gapSize;
size_t _dataBlockBytes;
public:
DadaBufferLayout(key_t input_key , size_t heapSize, size_t nSideChannels);
key_t getInputkey() const;
size_t getBufferSize() const;
size_t getHeapSize() const;
size_t getNSideChannels() const;
// returns size of data in buffer block in bytes
size_t sizeOfData() const;
// return size of gap between data and side channel
size_t sizeOfGap() const;
// returns size of side channelm data in buffer block in bytes
size_t sizeOfSideChannelData() const;
// number of heaps stored in one block of the buffer
size_t getNHeaps() const;
};
} // edd
} // effelsberg
} // psrdada_cpp
......@@ -7,6 +7,7 @@
#include "psrdada_cpp/double_device_buffer.cuh"
#include "psrdada_cpp/double_host_buffer.cuh"
#include "psrdada_cpp/effelsberg/edd/DetectorAccumulator.cuh"
#include "psrdada_cpp/effelsberg/edd/DadaBufferLayout.hpp"
#include "thrust/device_vector.h"
#include "cufft.h"
......@@ -56,9 +57,9 @@ public:
* @param handler Output handler
*
*/
GatedSpectrometer(std::size_t buffer_bytes, std::size_t nSideChannels,
GatedSpectrometer(const DadaBufferLayout &bufferLayout,
std::size_t selectedSideChannel, std::size_t selectedBit,
std::size_t speadHeapSize, std::size_t fft_length,
std::size_t fft_length,
std::size_t naccumulate, std::size_t nbits,
float input_level, float output_level,
HandlerType &handler);
......@@ -93,19 +94,12 @@ private:
thrust::device_vector<size_t> &noOfBitSet);
private:
std::size_t _buffer_bytes;
DadaBufferLayout _dadaBufferLayout;
std::size_t _fft_length;
std::size_t _naccumulate;
std::size_t _nbits;
std::size_t _nSideChannels;
std::size_t _selectedSideChannel;
std::size_t _selectedBit;
std::size_t _speadHeapSize;
std::size_t _sideChannelSize;
std::size_t _totalHeapSize;
std::size_t _nHeaps;
std::size_t _gapSize;
std::size_t _dataBlockBytes;
std::size_t _batch;
std::size_t _nsamps_per_output_spectra;
std::size_t _nsamps_per_buffer;
......@@ -138,6 +132,7 @@ private:
};
/**
* @brief Splits the input data depending on a bit set into two arrays.
*
......
......@@ -104,14 +104,12 @@ __global__ void array_sum(float *in, size_t N, float *out) {
template <class HandlerType, typename IntegratedPowerType>
GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer(
std::size_t buffer_bytes, std::size_t nSideChannels,
std::size_t selectedSideChannel, std::size_t selectedBit,
std::size_t speadHeapSize, std::size_t fft_length, std::size_t naccumulate,
const DadaBufferLayout &dadaBufferLayout,
std::size_t selectedSideChannel, std::size_t selectedBit, std::size_t fft_length, std::size_t naccumulate,
std::size_t nbits, float input_level, float output_level,
HandlerType &handler)
: _buffer_bytes(buffer_bytes), _nSideChannels(nSideChannels),
HandlerType &handler) : _dadaBufferLayout(dadaBufferLayout),
_selectedSideChannel(selectedSideChannel), _selectedBit(selectedBit),
_speadHeapSize(speadHeapSize), _fft_length(fft_length),
_fft_length(fft_length),
_naccumulate(naccumulate), _nbits(nbits), _handler(handler), _fft_plan(0),
_call_count(0) {
......@@ -126,29 +124,17 @@ GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer(
<< "Creating new GatedSpectrometer instance with parameters: \n"
<< " fft_length " << _fft_length << "\n"
<< " naccumulate " << _naccumulate << "\n"
<< " nSideChannels " << _nSideChannels << "\n"
<< " speadHeapSize " << _speadHeapSize << " byte\n"
<< " nSideChannels " << _dadaBufferLayout.getNSideChannels() << "\n"
<< " speadHeapSize " << _dadaBufferLayout.getHeapSize() << " byte\n"
<< " selectedSideChannel " << _selectedSideChannel << "\n"
<< " selectedBit " << _selectedBit << "\n"
<< " output bit depth " << sizeof(IntegratedPowerType) * 8;
_sideChannelSize = nSideChannels * sizeof(int64_t);
_totalHeapSize = _speadHeapSize + _sideChannelSize;
_nHeaps = buffer_bytes / _totalHeapSize;
_gapSize = (buffer_bytes - _nHeaps * _totalHeapSize);
_dataBlockBytes = _nHeaps * _speadHeapSize;
assert((nSideChannels == 0) ||
(selectedSideChannel <
nSideChannels)); // Sanity check of side channel value
assert((_dadaBufferLayout.getNSideChannels() == 0) ||
(selectedSideChannel < _dadaBufferLayout.getNSideChannels())); // Sanity check of side channel value
assert(selectedBit < 64); // Sanity check of selected bit
BOOST_LOG_TRIVIAL(info) << "Resulting memory configuration: \n"
<< " totalSizeOfHeap: " << _totalHeapSize << " byte\n"
<< " number of heaps per buffer: " << _nHeaps << "\n"
<< " resulting gap: " << _gapSize << " byte\n"
<< " datablock size in buffer: " << _dataBlockBytes << " byte\n";
_nsamps_per_buffer = _dataBlockBytes * 8 / nbits;
_nsamps_per_buffer = _dadaBufferLayout.sizeOfData() * 8 / nbits;
_nsamps_per_output_spectra = fft_length * naccumulate;
int nBlocks;
......@@ -168,7 +154,6 @@ GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer(
}
BOOST_LOG_TRIVIAL(debug) << "Integrating " << _nsamps_per_output_spectra << " samples from " << nBlocks << " into one spectra.";
std::size_t n64bit_words = _dataBlockBytes / sizeof(uint64_t);
_nchans = _fft_length / 2 + 1;
int batch = _nsamps_per_buffer / _fft_length;
float dof = 2 * _naccumulate;
......@@ -187,8 +172,8 @@ GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer(
cufftSetStream(_fft_plan, _proc_stream);
BOOST_LOG_TRIVIAL(debug) << "Allocating memory";
_raw_voltage_db.resize(n64bit_words);
_sideChannelData_db.resize(_sideChannelSize * _nHeaps);
_raw_voltage_db.resize(_dadaBufferLayout.sizeOfData() / sizeof(uint64_t));
_sideChannelData_db.resize(_dadaBufferLayout.getNSideChannels() * _dadaBufferLayout.getNHeaps());
BOOST_LOG_TRIVIAL(debug) << " Input voltages size (in 64-bit words): "
<< _raw_voltage_db.size();
_unpacked_voltage_G0.resize(_nsamps_per_buffer);
......@@ -290,7 +275,7 @@ void GatedSpectrometer<HandlerType, IntegratedPowerType>::process(
thrust::raw_pointer_cast(_unpacked_voltage_G0.data()),
thrust::raw_pointer_cast(_unpacked_voltage_G1.data()),
thrust::raw_pointer_cast(sideChannelData.data()),
_unpacked_voltage_G0.size(), _speadHeapSize, _selectedBit, _nSideChannels,
_unpacked_voltage_G0.size(), _dadaBufferLayout.getHeapSize(), _selectedBit, _dadaBufferLayout.getNSideChannels(),
_selectedSideChannel, thrust::raw_pointer_cast(_baseLineN.data()));
for (size_t i = 0; i < _noOfBitSetsInSideChannel.size(); i++)
......@@ -298,7 +283,7 @@ void GatedSpectrometer<HandlerType, IntegratedPowerType>::process(
countBitSet<<<(sideChannelData.size()+255)/256, 256, 0,
_proc_stream>>>(thrust::raw_pointer_cast(sideChannelData.data() + i * sideChannelData.size() / _noOfBitSetsInSideChannel.size() ),
sideChannelData.size() / _noOfBitSetsInSideChannel.size(), _selectedBit,
_nSideChannels, _selectedBit,
_dadaBufferLayout.getNSideChannels(), _selectedBit,
thrust::raw_pointer_cast(noOfBitSet.data() + i));
}
......@@ -327,10 +312,10 @@ bool GatedSpectrometer<HandlerType, IntegratedPowerType>::operator()(RawBytes &b
++_call_count;
BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer operator() called (count = "
<< _call_count << ")";
if (block.used_bytes() != _buffer_bytes) { /* Unexpected buffer size */
if (block.used_bytes() != _dadaBufferLayout.getBufferSize()) { /* Unexpected buffer size */
BOOST_LOG_TRIVIAL(error) << "Unexpected Buffer Size - Got "
<< block.used_bytes() << " byte, expected "
<< _buffer_bytes << " byte)";
<< _dadaBufferLayout.getBufferSize() << " byte)";
CUDA_ERROR_CHECK(cudaDeviceSynchronize());
cudaProfilerStop();
return true;
......@@ -342,16 +327,16 @@ bool GatedSpectrometer<HandlerType, IntegratedPowerType>::operator()(RawBytes &b
_sideChannelData_db.swap();
BOOST_LOG_TRIVIAL(debug) << " block.used_bytes() = " << block.used_bytes()
<< ", dataBlockBytes = " << _dataBlockBytes << "\n";
<< ", dataBlockBytes = " << _dadaBufferLayout.sizeOfData() << "\n";
CUDA_ERROR_CHECK(cudaMemcpyAsync(static_cast<void *>(_raw_voltage_db.a_ptr()),
static_cast<void *>(block.ptr()),
_dataBlockBytes, cudaMemcpyHostToDevice,
_dadaBufferLayout.sizeOfData() , cudaMemcpyHostToDevice,
_h2d_stream));
CUDA_ERROR_CHECK(cudaMemcpyAsync(
static_cast<void *>(_sideChannelData_db.a_ptr()),
static_cast<void *>(block.ptr() + _dataBlockBytes + _gapSize),
_sideChannelSize * _nHeaps, cudaMemcpyHostToDevice, _h2d_stream));
static_cast<void *>(block.ptr() + _dadaBufferLayout.sizeOfData() + _dadaBufferLayout.sizeOfGap()),
_dadaBufferLayout.sizeOfSideChannelData(), cudaMemcpyHostToDevice, _h2d_stream));
if (_call_count == 1) {
return false;
......@@ -414,7 +399,7 @@ bool GatedSpectrometer<HandlerType, IntegratedPowerType>::operator()(RawBytes &b
size_t* on_values = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType));
size_t* off_values = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType) + sizeof(size_t));
*off_values = _nHeaps - (*on_values);
*off_values = _dadaBufferLayout.getNHeaps() - (*on_values);
BOOST_LOG_TRIVIAL(info) << " " << i << ": No of bit set in side channel: " << *on_values << " / " << *off_values << std::endl;
}
......
#include "psrdada_cpp/effelsberg/edd/DadaBufferLayout.hpp"
#include "psrdada_cpp/multilog.hpp"
#include "psrdada_cpp/dada_client_base.hpp"
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
DadaBufferLayout::DadaBufferLayout(key_t input_key, size_t heapSize, size_t nSideChannels) : _input_key(input_key), _heapSize(heapSize), _nSideChannels(nSideChannels)
{
MultiLog log("DadaBufferLayout");
DadaClientBase client(input_key, log);
std::size_t _bufferSize = client.data_buffer_size();
_sideChannelSize = nSideChannels * sizeof(int64_t);
size_t totalHeapSize = _heapSize + _sideChannelSize;
_nHeaps = _bufferSize / totalHeapSize;
_gapSize = (_bufferSize - _nHeaps * totalHeapSize);
_dataBlockBytes = _nHeaps * _heapSize;
BOOST_LOG_TRIVIAL(debug) << "Memory configuration of dada buffer: \n"
<< " number of heaps per buffer: " << _nHeaps << "\n"
<< " resulting gap: " << _gapSize << " byte\n"
<< " datablock size in buffer: " << _dataBlockBytes << " byte\n";
}
key_t DadaBufferLayout::getInputkey() const
{
return _input_key;
}
size_t DadaBufferLayout::getBufferSize() const
{
return _bufferSize;
}
size_t DadaBufferLayout::getHeapSize() const
{
return _heapSize;
}
size_t DadaBufferLayout::getNSideChannels() const
{
return _nSideChannels;
}
size_t DadaBufferLayout::sizeOfData() const
{
return _dataBlockBytes;
}
size_t DadaBufferLayout::sizeOfGap() const
{
return _gapSize;
}
size_t DadaBufferLayout::sizeOfSideChannelData() const
{
return _sideChannelSize * _nHeaps * sizeof(uint64_t);
}
size_t DadaBufferLayout::getNHeaps() const
{
return _nHeaps;
}
} // edd
} // effelsberg
} // psrdada_cpp
#include "boost/program_options.hpp"
#include "psrdada_cpp/cli_utils.hpp"
#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/dada_client_base.hpp"
#include "psrdada_cpp/dada_input_stream.hpp"
#include "psrdada_cpp/dada_null_sink.hpp"
#include "psrdada_cpp/dada_output_stream.hpp"
......@@ -26,32 +25,32 @@ const size_t ERROR_UNHANDLED_EXCEPTION = 2;
template<typename T>
void launchSpectrometer(std::string const &output_type, key_t input_key, std::string const &filename, size_t nSideChannels, size_t selectedSideChannel, size_t selectedBit, size_t speadHeapSize, size_t fft_length, size_t naccumulate, unsigned int nbits, float input_level, float output_level)
void launchSpectrometer(const effelsberg::edd::DadaBufferLayout &dadaBufferLayout, const std::string &output_type, const std::string &filename, size_t selectedSideChannel, size_t selectedBit, size_t fft_length, size_t naccumulate, unsigned int nbits, float input_level, float output_level)
{
MultiLog log("edd::GatedSpectrometer");
DadaClientBase client(input_key, log);
std::size_t buffer_bytes = client.data_buffer_size();
MultiLog log("DadaBufferLayout");
std::cout << "Running with output_type: " << output_type << std::endl;
if (output_type == "file")
{
SimpleFileWriter sink(filename);
effelsberg::edd::GatedSpectrometer<decltype(sink), T> spectrometer(
buffer_bytes, nSideChannels, selectedSideChannel, selectedBit,
speadHeapSize, fft_length, naccumulate, nbits, input_level,
effelsberg::edd::GatedSpectrometer<decltype(sink), T> spectrometer(dadaBufferLayout,
selectedSideChannel, selectedBit,
fft_length, naccumulate, nbits, input_level,
output_level, sink);
DadaInputStream<decltype(spectrometer)> istream(input_key, log,
DadaInputStream<decltype(spectrometer)> istream(dadaBufferLayout.getInputkey(), log,
spectrometer);
istream.start();
}
else if (output_type == "dada")
{
DadaOutputStream sink(string_to_key(filename), log);
effelsberg::edd::GatedSpectrometer<decltype(sink), T> spectrometer(
buffer_bytes, nSideChannels, selectedSideChannel, selectedBit,
speadHeapSize, fft_length, naccumulate, nbits, input_level,
effelsberg::edd::GatedSpectrometer<decltype(sink), T> spectrometer(dadaBufferLayout,
selectedSideChannel, selectedBit,
fft_length, naccumulate, nbits, input_level,
output_level, sink);
DadaInputStream<decltype(spectrometer)> istream(input_key, log,
DadaInputStream<decltype(spectrometer)> istream(dadaBufferLayout.getInputkey(), log,
spectrometer);
istream.start();
}
......@@ -184,16 +183,18 @@ int main(int argc, char **argv) {
return ERROR_IN_COMMAND_LINE;
}
effelsberg::edd::DadaBufferLayout bufferLayout(input_key, speadHeapSize, nSideChannels);
if (output_bit_depth == 8)
{
launchSpectrometer<int8_t>(output_type, input_key, filename,
nSideChannels, selectedSideChannel, selectedBit, speadHeapSize,
launchSpectrometer<int8_t>(bufferLayout, output_type, filename,
selectedSideChannel, selectedBit,
fft_length, naccumulate, nbits, input_level, output_level);
}
else if (output_bit_depth == 32)
{
launchSpectrometer<float>(output_type, input_key, filename,
nSideChannels, selectedSideChannel, selectedBit, speadHeapSize,
launchSpectrometer<float>(bufferLayout, output_type, filename,
selectedSideChannel, selectedBit,
fft_length, naccumulate, nbits, input_level, output_level);
}
else
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment