Skip to content
Snippets Groups Projects
Commit 5047a7a2 authored by Tobias Winchen's avatar Tobias Winchen
Browse files

Refactored counting of samples per gate to include lost heaps.

parent 2c082719
No related branches found
No related tags found
No related merge requests found
......@@ -24,7 +24,8 @@ namespace edd {
#define CLEAR_BIT(value, bit) ((value) &= ~BIT_MASK(bit))
#define TEST_BIT(value, bit) (((value)&BIT_MASK(bit)) ? 1 : 0)
typedef unsigned long long int uint64_cu;
static_assert(sizeof(uint64_cu) == sizeof(uint64_t), "Long long int not of 64 bit! This is problematic for CUDA!");
/**
@class GatedSpectrometer
@brief Split data into two streams and create integrated spectra depending on
......@@ -36,6 +37,7 @@ public:
typedef uint64_t RawVoltageType;
typedef float UnpackedVoltageType;
typedef float2 ChannelisedVoltageType;
// typedef float IntegratedPowerType;
//typedef int8_t IntegratedPowerType;
......@@ -91,7 +93,8 @@ private:
void process(thrust::device_vector<RawVoltageType> const &digitiser_raw,
thrust::device_vector<uint64_t> const &sideChannelData,
thrust::device_vector<IntegratedPowerType> &detected,
thrust::device_vector<size_t> &noOfBitSet);
thrust::device_vector<uint64_cu> &noOfBitSetsIn_G0,
thrust::device_vector<uint64_cu> &noOfBitSetsIn_G1);
private:
DadaBufferLayout _dadaBufferLayout;
......@@ -109,6 +112,8 @@ private:
cufftHandle _fft_plan;
uint64_t _nchans;
uint64_t _call_count;
double _processing_efficiency;
std::unique_ptr<Unpacker> _unpacker;
std::unique_ptr<DetectorAccumulator<IntegratedPowerType> > _detector;
......@@ -118,7 +123,10 @@ private:
// Output data
DoubleDeviceBuffer<IntegratedPowerType> _power_db;
DoubleDeviceBuffer<size_t> _noOfBitSetsInSideChannel;
DoubleDeviceBuffer<uint64_cu> _noOfBitSetsIn_G0;
DoubleDeviceBuffer<uint64_cu> _noOfBitSetsIn_G1;
DoublePinnedHostBuffer<char> _host_power_db;
// Intermediate process steps
......@@ -153,10 +161,16 @@ private:
* data.
* @param selectedSideChannel No. of side channel item to be eveluated.
0 <= selectedSideChannel < noOfSideChannels.
* @param stats_G0 No. of sampels contributing to G0, accounting also
* for loat heaps
* @param stats_G1 No. of sampels contributing to G1, accounting also
* for loat heaps
*/
__global__ void gating(float *G0, float *G1, const int64_t *sideChannelData,
size_t N, size_t heapSize, size_t bitpos,
size_t noOfSideChannels, size_t selectedSideChannel, const float *_baseLine);
size_t noOfSideChannels, size_t selectedSideChannel,
const float *_baseLine, uint64_cu* stats_G0, uint64_cu*
stats_G1);
......
......@@ -19,8 +19,14 @@ namespace edd {
__global__ void gating(float* __restrict__ G0, float* __restrict__ G1, const uint64_t* __restrict__ sideChannelData,
size_t N, size_t heapSize, size_t bitpos,
size_t noOfSideChannels, size_t selectedSideChannel, const float* __restrict__ _baseLineN) {
size_t noOfSideChannels, size_t selectedSideChannel,
const float* __restrict__ _baseLineN, uint64_cu* stats_G0, uint64_cu* stats_G1) {
float baseLine = (*_baseLineN) / N;
// statistics values for samopels to G0, G1
uint32_t _G0stats = 0;
uint32_t _G1stats = 0;
for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N);
i += blockDim.x * gridDim.x) {
const float w = G0[i] - baseLine;
......@@ -31,9 +37,41 @@ __global__ void gating(float* __restrict__ G0, float* __restrict__ G1, const uin
// threads, but maybe efficiently
// handled by cache?
const int bit_set = TEST_BIT(sideChannelItem, bitpos);
G1[i] = w * bit_set + baseLine;
G0[i] = w * (!bit_set) + baseLine;
const unsigned int bit_set = TEST_BIT(sideChannelItem, bitpos);
const unsigned int heap_lost = TEST_BIT(sideChannelItem, 63);
G1[i] = w * bit_set * (!heap_lost) + baseLine;
G0[i] = w * (!bit_set) *(!heap_lost) + baseLine;
_G0stats += (!bit_set) *(!heap_lost);
_G1stats += bit_set * (!heap_lost);
}
__shared__ uint32_t x[1024];
// Reduce G0, G1
x[threadIdx.x] = _G0stats;
__syncthreads();
for(int s = blockDim.x / 2; s > 0; s = s / 2)
{
if (threadIdx.x < s)
x[threadIdx.x] += x[threadIdx.x + s];
__syncthreads();
}
if(threadIdx.x == 0)
atomicAdd(stats_G0, (uint64_cu) x[threadIdx.x]);
x[threadIdx.x] = _G1stats;
__syncthreads();
for(int s = blockDim.x / 2; s > 0; s = s / 2)
{
if (threadIdx.x < s)
x[threadIdx.x] += x[threadIdx.x + s];
__syncthreads();
}
if(threadIdx.x == 0)
{
uint64_cu y = x[threadIdx.x];
atomicAdd(stats_G1, y) ;
}
}
......@@ -75,7 +113,7 @@ GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer(
_selectedSideChannel(selectedSideChannel), _selectedBit(selectedBit),
_fft_length(fft_length),
_naccumulate(naccumulate), _nbits(nbits), _handler(handler), _fft_plan(0),
_call_count(0), _nsamps_per_heap(4096) {
_call_count(0), _nsamps_per_heap(4096), _processing_efficiency(0.){
// Sanity checks
assert(((_nbits == 12) || (_nbits == 8)));
......@@ -154,14 +192,17 @@ GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer(
thrust::fill(_power_db.b().begin(), _power_db.b().end(), 0.);
BOOST_LOG_TRIVIAL(debug) << " Powers size: " << _power_db.size() / 2;
_noOfBitSetsInSideChannel.resize( batch / (_naccumulate / nBlocks));
thrust::fill(_noOfBitSetsInSideChannel.a().begin(), _noOfBitSetsInSideChannel.a().end(), 0L);
thrust::fill(_noOfBitSetsInSideChannel.b().begin(), _noOfBitSetsInSideChannel.b().end(), 0L);
BOOST_LOG_TRIVIAL(debug) << " Bit set counter size: " << _noOfBitSetsInSideChannel.size();
_noOfBitSetsIn_G0.resize( batch / (_naccumulate / nBlocks));
_noOfBitSetsIn_G1.resize( batch / (_naccumulate / nBlocks));
thrust::fill(_noOfBitSetsIn_G0.a().begin(), _noOfBitSetsIn_G0.a().end(), 0L);
thrust::fill(_noOfBitSetsIn_G0.b().begin(), _noOfBitSetsIn_G0.b().end(), 0L);
thrust::fill(_noOfBitSetsIn_G1.a().begin(), _noOfBitSetsIn_G1.a().end(), 0L);
thrust::fill(_noOfBitSetsIn_G1.b().begin(), _noOfBitSetsIn_G1.b().end(), 0L);
BOOST_LOG_TRIVIAL(debug) << " Bit set counter size: " << _noOfBitSetsIn_G0.size();
// on the host both power are stored in the same data buffer together with
// the number of bit sets
_host_power_db.resize( _power_db.size() * sizeof(IntegratedPowerType) + 2 * sizeof(size_t) * _noOfBitSetsInSideChannel.size());
_host_power_db.resize( _power_db.size() * sizeof(IntegratedPowerType) + 2 * sizeof(size_t) * _noOfBitSetsIn_G0.size());
CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream));
CUDA_ERROR_CHECK(cudaStreamCreate(&_proc_stream));
......@@ -219,7 +260,7 @@ template <class HandlerType, typename IntegratedPowerType>
void GatedSpectrometer<HandlerType, IntegratedPowerType>::process(
thrust::device_vector<RawVoltageType> const &digitiser_raw,
thrust::device_vector<uint64_t> const &sideChannelData,
thrust::device_vector<IntegratedPowerType> &detected, thrust::device_vector<size_t> &noOfBitSet) {
thrust::device_vector<IntegratedPowerType> &detected, thrust::device_vector<uint64_cu> &noOfBitSetsIn_G0, thrust::device_vector<uint64_cu> &noOfBitSetsIn_G1) {
BOOST_LOG_TRIVIAL(debug) << "Unpacking raw voltages";
switch (_nbits) {
case 8:
......@@ -236,21 +277,19 @@ void GatedSpectrometer<HandlerType, IntegratedPowerType>::process(
psrdada_cpp::effelsberg::edd::array_sum<<<1, array_sum_Nthreads, 0, _proc_stream>>>(thrust::raw_pointer_cast(_baseLineN.data()), _baseLineN.size(), thrust::raw_pointer_cast(_baseLineN.data()));
BOOST_LOG_TRIVIAL(debug) << "Perform gating";
gating<<<1024, 1024, 0, _proc_stream>>>(
thrust::raw_pointer_cast(_unpacked_voltage_G0.data()),
thrust::raw_pointer_cast(_unpacked_voltage_G1.data()),
thrust::raw_pointer_cast(sideChannelData.data()),
_unpacked_voltage_G0.size(), _dadaBufferLayout.getHeapSize(), _selectedBit, _dadaBufferLayout.getNSideChannels(),
_selectedSideChannel, thrust::raw_pointer_cast(_baseLineN.data()));
for (size_t i = 0; i < _noOfBitSetsInSideChannel.size(); i++)
for (size_t i = 0; i < noOfBitSetsIn_G0.size(); i++)
{ // ToDo: Should be in one kernel call
countBitSet<<<1, 1024, 0, _proc_stream>>>(thrust::raw_pointer_cast(sideChannelData.data() + i * sideChannelData.size() / _noOfBitSetsInSideChannel.size() ),
sideChannelData.size() / _noOfBitSetsInSideChannel.size(), _selectedBit,
_dadaBufferLayout.getNSideChannels(), _selectedBit,
thrust::raw_pointer_cast(noOfBitSet.data() + i));
CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
gating<<<1024, 1024, 0, _proc_stream>>>(
thrust::raw_pointer_cast(_unpacked_voltage_G0.data() + i * sideChannelData.size() / noOfBitSetsIn_G0.size()),
thrust::raw_pointer_cast(_unpacked_voltage_G1.data() + i * sideChannelData.size() / noOfBitSetsIn_G0.size()),
thrust::raw_pointer_cast(sideChannelData.data() + i * sideChannelData.size() / noOfBitSetsIn_G0.size()),
_unpacked_voltage_G0.size() / noOfBitSetsIn_G0.size(), _dadaBufferLayout.getHeapSize(), _selectedBit, _dadaBufferLayout.getNSideChannels(),
_selectedSideChannel, thrust::raw_pointer_cast(_baseLineN.data()),
thrust::raw_pointer_cast(noOfBitSetsIn_G0.data() + i),
thrust::raw_pointer_cast(noOfBitSetsIn_G1.data() + i)
);
}
BOOST_LOG_TRIVIAL(debug) << "Performing FFT 1";
......@@ -303,7 +342,7 @@ bool GatedSpectrometer<HandlerType, IntegratedPowerType>::operator()(RawBytes &b
static_cast<void *>(_sideChannelData_db.a_ptr()),
static_cast<void *>(block.ptr() + _dadaBufferLayout.sizeOfData() + _dadaBufferLayout.sizeOfGap()),
_dadaBufferLayout.sizeOfSideChannelData(), cudaMemcpyHostToDevice, _h2d_stream));
BOOST_LOG_TRIVIAL(debug) << "First side channel item: 0x" << std::setw(12) << std::setfill('0') << std::hex << (reinterpret_cast<uint64_t*>(block.ptr() + _dadaBufferLayout.sizeOfData() + _dadaBufferLayout.sizeOfGap()))[0] << std::dec;
BOOST_LOG_TRIVIAL(debug) << "First side channel item: 0x" << std::setw(16) << std::setfill('0') << std::hex << (reinterpret_cast<uint64_t*>(block.ptr() + _dadaBufferLayout.sizeOfData() + _dadaBufferLayout.sizeOfGap()))[0] << std::dec;
if (_call_count == 1) {
......@@ -319,13 +358,15 @@ bool GatedSpectrometer<HandlerType, IntegratedPowerType>::operator()(RawBytes &b
BOOST_LOG_TRIVIAL(debug) << "Starting new output block.";
newBlock = true;
_power_db.swap();
_noOfBitSetsInSideChannel.swap();
_noOfBitSetsIn_G0.swap();
_noOfBitSetsIn_G1.swap();
// move to specific stream!
thrust::fill(thrust::cuda::par.on(_proc_stream),_power_db.a().begin(), _power_db.a().end(), 0.);
thrust::fill(thrust::cuda::par.on(_proc_stream), _noOfBitSetsInSideChannel.a().begin(), _noOfBitSetsInSideChannel.a().end(), 0L);
thrust::fill(thrust::cuda::par.on(_proc_stream), _noOfBitSetsIn_G0.a().begin(), _noOfBitSetsIn_G0.a().end(), 0L);
thrust::fill(thrust::cuda::par.on(_proc_stream), _noOfBitSetsIn_G1.a().begin(), _noOfBitSetsIn_G1.a().end(), 0L);
}
process(_raw_voltage_db.b(), _sideChannelData_db.b(), _power_db.a(), _noOfBitSetsInSideChannel.a());
process(_raw_voltage_db.b(), _sideChannelData_db.b(), _power_db.a(), _noOfBitSetsIn_G0.a(), _noOfBitSetsIn_G1.a());
CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
if ((_call_count == 2) || (!newBlock)) {
......@@ -336,7 +377,7 @@ bool GatedSpectrometer<HandlerType, IntegratedPowerType>::operator()(RawBytes &b
CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream));
_host_power_db.swap();
for (size_t i = 0; i < _noOfBitSetsInSideChannel.size(); i++)
for (size_t i = 0; i < _noOfBitSetsIn_G0.size(); i++)
{
size_t memOffset = 2 * i * (_nchans * sizeof(IntegratedPowerType) + sizeof(size_t));
// copy 2x channel data
......@@ -345,10 +386,17 @@ bool GatedSpectrometer<HandlerType, IntegratedPowerType>::operator()(RawBytes &b
static_cast<void *>(_power_db.b_ptr() + 2 * i * _nchans),
2 * _nchans * sizeof(IntegratedPowerType),
cudaMemcpyDeviceToHost, _d2h_stream));
// copy noOf bit set data
CUDA_ERROR_CHECK(
cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType)),
static_cast<void *>(_noOfBitSetsInSideChannel.b_ptr() + i ),
static_cast<void *>(_noOfBitSetsIn_G0.b_ptr() + i ),
1 * sizeof(size_t),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType) + sizeof(size_t)),
static_cast<void *>(_noOfBitSetsIn_G1.b_ptr() + i ),
1 * sizeof(size_t),
cudaMemcpyDeviceToHost, _d2h_stream));
}
......@@ -360,18 +408,28 @@ bool GatedSpectrometer<HandlerType, IntegratedPowerType>::operator()(RawBytes &b
}
// calculate off value
BOOST_LOG_TRIVIAL(info) << "Buffer block: " << _call_count-3 << " with " << _noOfBitSetsInSideChannel.size() << "x2 output heaps:";
for (size_t i = 0; i < _noOfBitSetsInSideChannel.size(); i++)
BOOST_LOG_TRIVIAL(info) << "Buffer block: " << _call_count-3 << " with " << _noOfBitSetsIn_G0.size() << "x2 output heaps:";
size_t total_samples_lost = 0;
for (size_t i = 0; i < _noOfBitSetsIn_G0.size(); i++)
{
size_t memOffset = 2 * i * (_nchans * sizeof(IntegratedPowerType) + sizeof(size_t));
size_t* on_values = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType));
*on_values *= _nsamps_per_heap;
size_t* off_values = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType) + sizeof(size_t));
*off_values = _nsamps_per_output_spectra - (*on_values);
BOOST_LOG_TRIVIAL(info) << " " << i << ": No of samples wo/w. bit set in side channel: " << *on_values << " / " << *off_values << std::endl;
size_t samples_lost = _nsamps_per_output_spectra - (*on_values) - (*off_values);
total_samples_lost += samples_lost;
BOOST_LOG_TRIVIAL(info) << " Heap " << i << ":\n"
<<" Samples with bit set : " << *on_values << std::endl
<<" Samples without bit set: " << *off_values << std::endl
<<" Samples lost : " << samples_lost << " out of " << _nsamps_per_output_spectra << std::endl;
}
double efficiency = 1. - double(total_samples_lost) / (_nsamps_per_output_spectra * _noOfBitSetsIn_G0.size());
double prev_average = _processing_efficiency / (_call_count- 3 - 1);
_processing_efficiency += efficiency;
double average = _processing_efficiency / (_call_count-3);
BOOST_LOG_TRIVIAL(info) << "Total processing efficiency of this buffer block:" << std::setprecision(6) << efficiency << ". Run average: " << average << " (Trend: " << std::showpos << (average - prev_average) << ")";
// Wrap in a RawBytes object here;
RawBytes bytes(reinterpret_cast<char *>(_host_power_db.b_ptr()),
......
......@@ -58,6 +58,8 @@ TEST(GatedSpectrometer, GatingKernel)
thrust::device_vector<float> G0(blockSize * nBlocks);
thrust::device_vector<float> G1(blockSize * nBlocks);
thrust::device_vector<uint64_t> _sideChannelData(nBlocks);
thrust::device_vector<psrdada_cpp::effelsberg::edd::uint64_cu> _nG0(1);
thrust::device_vector<psrdada_cpp::effelsberg::edd::uint64_cu> _nG1(1);
thrust::device_vector<float> baseLine(1);
thrust::fill(G0.begin(), G0.end(), 42);
......@@ -73,7 +75,10 @@ TEST(GatedSpectrometer, GatingKernel)
thrust::raw_pointer_cast(G0.data()),
thrust::raw_pointer_cast(G1.data()), sideCD,
G0.size(), blockSize, 0, 1,
0, thrust::raw_pointer_cast(baseLine.data()));
0, thrust::raw_pointer_cast(baseLine.data()),
thrust::raw_pointer_cast(_nG0.data()),
thrust::raw_pointer_cast(_nG1.data())
);
thrust::pair<thrust::device_vector<float>::iterator, thrust::device_vector<float>::iterator> minmax;
minmax = thrust::minmax_element(G0.begin(), G0.end());
EXPECT_EQ(*minmax.first, 42);
......@@ -82,6 +87,9 @@ TEST(GatedSpectrometer, GatingKernel)
minmax = thrust::minmax_element(G1.begin(), G1.end());
EXPECT_EQ(*minmax.first, 0);
EXPECT_EQ(*minmax.second, 0);
EXPECT_EQ(_nG0[0], G0.size());
EXPECT_EQ(_nG1[0], 0);
}
// everything to G1 // with baseline -5
......@@ -94,7 +102,10 @@ TEST(GatedSpectrometer, GatingKernel)
thrust::raw_pointer_cast(G0.data()),
thrust::raw_pointer_cast(G1.data()), sideCD,
G0.size(), blockSize, 0, 1,
0, thrust::raw_pointer_cast(baseLine.data()));
0, thrust::raw_pointer_cast(baseLine.data()),
thrust::raw_pointer_cast(_nG0.data()),
thrust::raw_pointer_cast(_nG1.data())
);
thrust::pair<thrust::device_vector<float>::iterator, thrust::device_vector<float>::iterator> minmax;
minmax = thrust::minmax_element(G0.begin(), G0.end());
EXPECT_EQ(*minmax.first, -5.);
......@@ -103,6 +114,10 @@ TEST(GatedSpectrometer, GatingKernel)
minmax = thrust::minmax_element(G1.begin(), G1.end());
EXPECT_EQ(*minmax.first, 42);
EXPECT_EQ(*minmax.second, 42);
EXPECT_EQ(_nG0[0], G0.size());
EXPECT_EQ(_nG1[0], 0);
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment