Commit 137c6446 authored by Tobias Winchen's avatar Tobias Winchen
Browse files

Allow integration over more than one block

parent 185248ee
......@@ -31,7 +31,7 @@ void detect_and_accumulate(float2 const* __restrict__ in, int8_t* __restrict__ o
sum += x + y;
}
size_t toff = out_offset * nchans + currentOutputSpectra * nchans *stride;
out[toff + i] = (int8_t) ((sum - offset)/scale);
out[toff + i] += (int8_t) ((sum - offset)/scale);
}
}
......@@ -56,7 +56,7 @@ void detect_and_accumulate(float2 const* __restrict__ in, float* __restrict__ ou
sum += x + y;
}
size_t toff = out_offset * nchans + currentOutputSpectra * nchans * stride;
out[i + toff] = sum;
out[i + toff] += sum;
}
}
......
......@@ -107,6 +107,8 @@ private:
std::size_t _gapSize;
std::size_t _dataBlockBytes;
std::size_t _batch;
std::size_t _nsamps_per_output_spectra;
std::size_t _nsamps_per_buffer;
HandlerType &_handler;
cufftHandle _fft_plan;
......@@ -115,20 +117,21 @@ private:
std::unique_ptr<Unpacker> _unpacker;
std::unique_ptr<DetectorAccumulator<IntegratedPowerType> > _detector;
// Input data
DoubleDeviceBuffer<RawVoltageType> _raw_voltage_db;
DoubleDeviceBuffer<IntegratedPowerType> _power_db;
DoubleDeviceBuffer<int64_t> _sideChannelData_db;
// Output data
DoubleDeviceBuffer<IntegratedPowerType> _power_db;
DoubleDeviceBuffer<size_t> _noOfBitSetsInSideChannel;
size_t _noOfBitSetsInSideChannel_host [2];
DoublePinnedHostBuffer<char> _host_power_db;
// Intermediate process steps
thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G0;
thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G1;
thrust::device_vector<ChannelisedVoltageType> _channelised_voltage;
thrust::device_vector<UnpackedVoltageType> _baseLineN;
DoublePinnedHostBuffer<char> _host_power_db;
cudaStream_t _h2d_stream;
cudaStream_t _proc_stream;
cudaStream_t _d2h_stream;
......
......@@ -2,8 +2,10 @@
#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/cuda_utils.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
#include <cuda.h>
#include <cuda_profiler_api.h>
#include <thrust/system/cuda/execution_policy.h>
#include <iostream>
#include <cstring>
......@@ -135,6 +137,7 @@ GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer(
_nHeaps = buffer_bytes / _totalHeapSize;
_gapSize = (buffer_bytes - _nHeaps * _totalHeapSize);
_dataBlockBytes = _nHeaps * _speadHeapSize;
assert((nSideChannels == 0) ||
(selectedSideChannel <
nSideChannels)); // Sanity check of side channel value
......@@ -145,10 +148,29 @@ GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer(
<< " resulting gap: " << _gapSize << " byte\n"
<< " datablock size in buffer: " << _dataBlockBytes << " byte\n";
std::size_t nsamps_per_buffer = _dataBlockBytes * 8 / nbits;
_nsamps_per_buffer = _dataBlockBytes * 8 / nbits;
_nsamps_per_output_spectra = fft_length * naccumulate;
int nBlocks;
if (_nsamps_per_output_spectra <= _nsamps_per_buffer)
{ // one buffer block is used for one or multiple output spectra
size_t N = _nsamps_per_buffer / _nsamps_per_output_spectra;
// All data in one block has to be used
assert(N * _nsamps_per_output_spectra == _nsamps_per_buffer);
nBlocks = 1;
}
else
{ // multiple blocks are integrated intoone output
size_t N = _nsamps_per_output_spectra / _nsamps_per_buffer;
// All data in multiple blocks has to be used
assert(N * _nsamps_per_buffer == _nsamps_per_output_spectra);
nBlocks = N;
}
BOOST_LOG_TRIVIAL(debug) << "Integrating " << _nsamps_per_output_spectra << " samples from " << nBlocks << " into one spectra.";
std::size_t n64bit_words = _dataBlockBytes / sizeof(uint64_t);
_nchans = _fft_length / 2 + 1;
int batch = nsamps_per_buffer / _fft_length;
int batch = _nsamps_per_buffer / _fft_length;
float dof = 2 * _naccumulate;
float scale =
std::pow(input_level * std::sqrt(static_cast<float>(_nchans)), 2);
......@@ -169,8 +191,8 @@ GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer(
_sideChannelData_db.resize(_sideChannelSize * _nHeaps);
BOOST_LOG_TRIVIAL(debug) << " Input voltages size (in 64-bit words): "
<< _raw_voltage_db.size();
_unpacked_voltage_G0.resize(nsamps_per_buffer);
_unpacked_voltage_G1.resize(nsamps_per_buffer);
_unpacked_voltage_G0.resize(_nsamps_per_buffer);
_unpacked_voltage_G1.resize(_nsamps_per_buffer);
_baseLineN.resize(array_sum_Nthreads);
BOOST_LOG_TRIVIAL(debug) << " Unpacked voltages size (in samples): "
......@@ -178,11 +200,17 @@ GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer(
_channelised_voltage.resize(_nchans * batch);
BOOST_LOG_TRIVIAL(debug) << " Channelised voltages size: "
<< _channelised_voltage.size();
_power_db.resize(_nchans * batch / _naccumulate * 2); // hold on and off spectra to simplify output
BOOST_LOG_TRIVIAL(debug) << " Powers size: " << _power_db.size() / 2;
_power_db.resize(_nchans * batch / (_naccumulate / nBlocks) * 2); // hold on and off spectra to simplify output
thrust::fill(_power_db.a().begin(), _power_db.a().end(), 0.);
thrust::fill(_power_db.b().begin(), _power_db.b().end(), 0.);
BOOST_LOG_TRIVIAL(debug) << " Powers size: " << _power_db.size() / 2;
_noOfBitSetsInSideChannel.resize( batch / (_naccumulate / nBlocks));
thrust::fill( _noOfBitSetsInSideChannel.a().begin(), _noOfBitSetsInSideChannel.a().end(), 0.);
thrust::fill( _noOfBitSetsInSideChannel.b().begin(), _noOfBitSetsInSideChannel.b().end(), 0.);
// on the host both power are stored in the same data buffer together with
// the number of bit sets
_noOfBitSetsInSideChannel.resize( batch / _naccumulate);
_host_power_db.resize( _power_db.size() * sizeof(IntegratedPowerType) + 2 * sizeof(size_t) * _noOfBitSetsInSideChannel.size());
CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream));
......@@ -191,7 +219,7 @@ GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer(
CUFFT_ERROR_CHECK(cufftSetStream(_fft_plan, _proc_stream));
_unpacker.reset(new Unpacker(_proc_stream));
_detector.reset(new DetectorAccumulator<IntegratedPowerType>(_nchans, _naccumulate, scaling,
_detector.reset(new DetectorAccumulator<IntegratedPowerType>(_nchans, _naccumulate / nBlocks, scaling,
offset, _proc_stream));
} // constructor
......@@ -329,22 +357,32 @@ bool GatedSpectrometer<HandlerType, IntegratedPowerType>::operator()(RawBytes &b
if (_call_count == 1) {
return false;
}
// process data
//process data
_power_db.swap();
_noOfBitSetsInSideChannel.swap();
// only if a newblock is started the output buffer is swapped. Otherwise the
// new data is added to it
bool newBlock = false;
if (((_call_count-1) * _nsamps_per_buffer) % _nsamps_per_output_spectra == 0) // _call_count -1 because this is the block number on the device
{
BOOST_LOG_TRIVIAL(debug) << "Starting new output block.";
newBlock = true;
_power_db.swap();
_noOfBitSetsInSideChannel.swap();
// move to specific stream!
thrust::fill(_power_db.a().begin(), _power_db.a().end(), 0.);
thrust::fill( _noOfBitSetsInSideChannel.a().begin(), _noOfBitSetsInSideChannel.a().end(), 0.);
}
process(_raw_voltage_db.b(), _sideChannelData_db.b(), _power_db.a(), _noOfBitSetsInSideChannel.a());
CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
if (_call_count == 2) {
if ((_call_count == 2) || (!newBlock)) {
return false;
}
//copy data to host
// copy data to host if block is finished
CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream));
_host_power_db.swap();
std::swap(_noOfBitSetsInSideChannel_host[0], _noOfBitSetsInSideChannel_host[1]);
for (size_t i = 0; i < _noOfBitSetsInSideChannel.size(); i++)
{
......@@ -361,7 +399,7 @@ bool GatedSpectrometer<HandlerType, IntegratedPowerType>::operator()(RawBytes &b
static_cast<void *>(_noOfBitSetsInSideChannel.b_ptr() + i ),
1 * sizeof(size_t),
cudaMemcpyDeviceToHost, _d2h_stream));
}
}
BOOST_LOG_TRIVIAL(debug) << "Copy Data back to host";
......@@ -370,7 +408,7 @@ bool GatedSpectrometer<HandlerType, IntegratedPowerType>::operator()(RawBytes &b
}
// calculate off value
BOOST_LOG_TRIVIAL(info) << "Buffer block: " << _call_count << " with " << _noOfBitSetsInSideChannel.size() << " output heaps:";
BOOST_LOG_TRIVIAL(info) << "Buffer block: " << _call_count << " with " << _noOfBitSetsInSideChannel.size() << " output heaps:";
for (size_t i = 0; i < _noOfBitSetsInSideChannel.size(); i++)
{
size_t memOffset = 2 * i * (_nchans * sizeof(IntegratedPowerType) + sizeof(size_t));
......@@ -382,10 +420,6 @@ bool GatedSpectrometer<HandlerType, IntegratedPowerType>::operator()(RawBytes &b
BOOST_LOG_TRIVIAL(info) << " " << i << ": No of bit set in side channel: " << *on_values << " / " << *off_values << std::endl;
}
// call handler
// Wrap in a RawBytes object here;
RawBytes bytes(reinterpret_cast<char *>(_host_power_db.b_ptr()),
_host_power_db.size(),
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment