#include "psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh"
#include "psrdada_cpp/effelsberg/edd/Tools.cuh"

#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/cuda_utils.hpp"
#include "psrdada_cpp/raw_bytes.hpp"

#include <cuda.h>
#include <cuda_profiler_api.h>
#include <thrust/system/cuda/execution_policy.h>

#include <iostream>
#include <iomanip>
#include <cstring>
#include <sstream>

namespace psrdada_cpp {
namespace effelsberg {
namespace edd {

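// Block-wide tree reduction: every thread stores v in the shared array x; after
// the loop, x[0] holds the sum over all threads of the block. Assumes blockDim.x
// is a power of two.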
template<typename T>
__device__ void reduce(T *x, const T &v)
{
  x[threadIdx.x] = v;
  __syncthreads();
  for(int s = blockDim.x / 2; s > 0; s = s / 2)
  {
    if (threadIdx.x < s)
      x[threadIdx.x] += x[threadIdx.x + s];
    __syncthreads();
  }


}


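// Gating kernel: depending on the selected bit of the side-channel item of the
// corresponding heap, each input sample either stays in gate G0 or is moved to
// gate G1; the respective other gate (and samples of lost heaps, flagged by bit
// 63) receives the baseline value instead. The number of samples ending up in
// each gate is accumulated into stats_G0 / stats_G1.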
__global__ void gating(float* __restrict__ G0, float* __restrict__ G1, const uint64_t* __restrict__ sideChannelData,
                       size_t N, size_t heapSize, size_t bitpos,
                       size_t noOfSideChannels, size_t selectedSideChannel,
                       const float* __restrict__ _baseLineN, uint64_cu* stats_G0, uint64_cu* stats_G1) {
  float baseLine = (*_baseLineN) / N;

  // statistics values for samples in G0 and G1
  uint32_t _G0stats = 0;
  uint32_t _G1stats = 0;

  for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N);
       i += blockDim.x * gridDim.x) {
    const float w = G0[i] - baseLine;
    const uint64_t sideChannelItem =
        sideChannelData[((i / heapSize) * (noOfSideChannels)) +
                        selectedSideChannel]; // Probably not optimal access as
                                              // same data is copied for several
                                              // threads, but maybe efficiently
                                              // handled by cache?

    const unsigned int bit_set = TEST_BIT(sideChannelItem, bitpos);
    const unsigned int heap_lost = TEST_BIT(sideChannelItem, 63);
    G1[i] = w * bit_set * (!heap_lost) + baseLine;
    G0[i] = w * (!bit_set) *(!heap_lost) + baseLine;
    _G0stats += (!bit_set) *(!heap_lost);
    _G1stats += bit_set * (!heap_lost);
  }
  __shared__ uint32_t x[1024];

  // Reduce G0, G1
  reduce<uint32_t>(x, _G0stats);
  if(threadIdx.x == 0)
    atomicAdd(stats_G0,  (uint64_cu) x[threadIdx.x]);

  reduce<uint32_t>(x, _G1stats);
  if(threadIdx.x == 0)
    atomicAdd(stats_G1,  (uint64_cu) x[threadIdx.x]);
}


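// Counts how many of the N side-channel items have the selected bit set and adds
// the result to nBitsSet[0].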
__global__ void countBitSet(const uint64_t *sideChannelData, size_t N,
                            size_t bitpos, size_t noOfSideChannels,
                            size_t selectedSideChannel, size_t *nBitsSet)
{
  // Not an optimized reduction, but only trivial array sizes occur here.
  // Must be run in a single block!
  __shared__ uint64_t x[1024];
  uint64_t ls = 0;

  for (uint64_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N);
       i += blockDim.x * gridDim.x) {
    ls += TEST_BIT(sideChannelData[i * noOfSideChannels + selectedSideChannel], bitpos);
  }
  x[threadIdx.x] = ls;

  __syncthreads();
  for(int s = blockDim.x / 2; s > 0; s = s / 2)
  {
    if (threadIdx.x < s)
      x[threadIdx.x] += x[threadIdx.x + s];
    __syncthreads();
  }

  if(threadIdx.x == 0)
   nBitsSet[0] += x[threadIdx.x];
}


template <class HandlerType, typename IntegratedPowerType>
GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer(
    const DadaBufferLayout &dadaBufferLayout,
    std::size_t selectedSideChannel, std::size_t selectedBit, std::size_t fft_length, std::size_t naccumulate,
    std::size_t nbits, float input_level, float output_level,
    HandlerType &handler) : _dadaBufferLayout(dadaBufferLayout),
      _selectedSideChannel(selectedSideChannel), _selectedBit(selectedBit),
      _fft_length(fft_length),
      _naccumulate(naccumulate), _nbits(nbits), _handler(handler), _fft_plan(0),
      _call_count(0), _nsamps_per_heap(4096), _processing_efficiency(0.){

  // Sanity checks
  assert(((_nbits == 12) || (_nbits == 8)));
  assert(_naccumulate > 0);

  // check for any device errors
  CUDA_ERROR_CHECK(cudaDeviceSynchronize());

  BOOST_LOG_TRIVIAL(info)
      << "Creating new GatedSpectrometer instance with parameters: \n"
      << "  fft_length           " << _fft_length << "\n"
      << "  naccumulate          " << _naccumulate << "\n"
      << "  nSideChannels        " << _dadaBufferLayout.getNSideChannels() << "\n"
      << "  speadHeapSize        " << _dadaBufferLayout.getHeapSize() << " byte\n"
      << "  selectedSideChannel  " << _selectedSideChannel << "\n"
      << "  selectedBit          " << _selectedBit << "\n"
      << "  output bit depth     " << sizeof(IntegratedPowerType) * 8;

  assert((_dadaBufferLayout.getNSideChannels() == 0) ||
         (selectedSideChannel < _dadaBufferLayout.getNSideChannels()));  // Sanity check of side channel value
  assert(selectedBit < 64); // Sanity check of selected bit

   _nsamps_per_buffer = _dadaBufferLayout.sizeOfData() * 8 / nbits;

  _nsamps_per_output_spectra = fft_length * naccumulate;
  int nBlocks;
  if (_nsamps_per_output_spectra <= _nsamps_per_buffer)
  { // one buffer block is used for one or multiple output spectra
    size_t N = _nsamps_per_buffer / _nsamps_per_output_spectra;
    // All data in one block has to be used
    assert(N * _nsamps_per_output_spectra == _nsamps_per_buffer);
    nBlocks = 1;
  }
  else
  { // multiple buffer blocks are integrated into one output spectrum
    size_t N =  _nsamps_per_output_spectra /  _nsamps_per_buffer;
    // All data in multiple blocks has to be used
    assert(N * _nsamps_per_buffer == _nsamps_per_output_spectra);
    nBlocks = N;
  }
  BOOST_LOG_TRIVIAL(debug) << "Integrating " << _nsamps_per_output_spectra << " samples from " << nBlocks << " blocks into one output spectrum.";

  _nchans = _fft_length / 2 + 1;
  int batch = _nsamps_per_buffer / _fft_length;
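  // Factors for the conversion to the output bit depth, assuming chi-squared
  // statistics of the accumulated power with dof = 2 * naccumulate degrees of
  // freedom: the expectation value is scale * dof, the standard deviation
  // scale * sqrt(2 * dof).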
  float dof = 2 * _naccumulate;
  float scale =
      std::pow(input_level * std::sqrt(static_cast<float>(_nchans)), 2);
  float offset = scale * dof;
  float scaling = scale * std::sqrt(2 * dof) / output_level;
  BOOST_LOG_TRIVIAL(debug)
      << "Correction factors for 8-bit conversion: offset = " << offset
      << ", scaling = " << scaling;

  BOOST_LOG_TRIVIAL(debug) << "Generating FFT plan";
  int n[] = {static_cast<int>(_fft_length)};
  CUFFT_ERROR_CHECK(cufftPlanMany(&_fft_plan, 1, n, NULL, 1, _fft_length, NULL,
                                  1, _nchans, CUFFT_R2C, batch));

  BOOST_LOG_TRIVIAL(debug) << "Allocating memory";
  _raw_voltage_db.resize(_dadaBufferLayout.sizeOfData() / sizeof(uint64_t));
  _sideChannelData_db.resize(_dadaBufferLayout.getNSideChannels() * _dadaBufferLayout.getNHeaps());
  BOOST_LOG_TRIVIAL(debug) << "  Input voltages size (in 64-bit words): "
                           << _raw_voltage_db.size();
  _unpacked_voltage_G0.resize(_nsamps_per_buffer);
  _unpacked_voltage_G1.resize(_nsamps_per_buffer);

  _baseLineN.resize(array_sum_Nthreads);
  BOOST_LOG_TRIVIAL(debug) << "  Unpacked voltages size (in samples): "
                           << _unpacked_voltage_G0.size();
  _channelised_voltage.resize(_nchans * batch);
  BOOST_LOG_TRIVIAL(debug) << "  Channelised voltages size: "
                           << _channelised_voltage.size();
  _power_db.resize(_nchans * batch / (_naccumulate / nBlocks) * 2);  // hold on and off spectra to simplify output
  thrust::fill(_power_db.a().begin(), _power_db.a().end(), 0.);
  thrust::fill(_power_db.b().begin(), _power_db.b().end(), 0.);
  BOOST_LOG_TRIVIAL(debug) << "  Powers size: " << _power_db.size() / 2;

  _noOfBitSetsIn_G0.resize( batch / (_naccumulate / nBlocks));
  _noOfBitSetsIn_G1.resize( batch / (_naccumulate / nBlocks));
  thrust::fill(_noOfBitSetsIn_G0.a().begin(), _noOfBitSetsIn_G0.a().end(), 0L);
  thrust::fill(_noOfBitSetsIn_G0.b().begin(), _noOfBitSetsIn_G0.b().end(), 0L);
  thrust::fill(_noOfBitSetsIn_G1.a().begin(), _noOfBitSetsIn_G1.a().end(), 0L);
  thrust::fill(_noOfBitSetsIn_G1.b().begin(), _noOfBitSetsIn_G1.b().end(), 0L);
  BOOST_LOG_TRIVIAL(debug) << "  Bit set counter size: " << _noOfBitSetsIn_G0.size();

  // on the host, the powers of both gates are stored in the same data buffer,
  // together with the number of bit sets
  _host_power_db.resize( _power_db.size() * sizeof(IntegratedPowerType) + 2 * sizeof(size_t) * _noOfBitSetsIn_G0.size());

  CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream));
  CUDA_ERROR_CHECK(cudaStreamCreate(&_proc_stream));
  CUDA_ERROR_CHECK(cudaStreamCreate(&_d2h_stream));
  CUFFT_ERROR_CHECK(cufftSetStream(_fft_plan, _proc_stream));

  _unpacker.reset(new Unpacker(_proc_stream));
  _detector.reset(new DetectorAccumulator<IntegratedPowerType>(_nchans, _naccumulate / nBlocks, scaling,
                                          offset, _proc_stream));
} // constructor


template <class HandlerType, typename IntegratedPowerType>
GatedSpectrometer<HandlerType, IntegratedPowerType>::~GatedSpectrometer() {
  BOOST_LOG_TRIVIAL(debug) << "Destroying GatedSpectrometer";
  if (_fft_plan)
    cufftDestroy(_fft_plan);
  cudaStreamDestroy(_h2d_stream);
  cudaStreamDestroy(_proc_stream);
  cudaStreamDestroy(_d2h_stream);
}


template <class HandlerType, typename IntegratedPowerType>
void GatedSpectrometer<HandlerType, IntegratedPowerType>::init(RawBytes &block) {
  BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer init called";
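  // Append the spectrometer parameters to the DADA header (if there is space left).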
  std::stringstream headerInfo;
  headerInfo << "\n"
      << "# Gated spectrometer parameters: \n"
      << "fft_length               " << _fft_length << "\n"
      << "nchannels                " << _fft_length << "\n"
      << "naccumulate              " << _naccumulate << "\n"
      << "selected_side_channel    " << _selectedSideChannel << "\n"
      << "selected_bit             " << _selectedBit << "\n"
      << "output_bit_depth         " << sizeof(IntegratedPowerType) * 8;

  size_t bEnd = std::strlen(block.ptr());
  if (bEnd + headerInfo.str().size() < block.total_bytes())
  {
    std::strcpy(block.ptr() + bEnd, headerInfo.str().c_str());
  }
  else
  {
    BOOST_LOG_TRIVIAL(warning) << "Header of size " << block.total_bytes()
      << " bytes already contains " << bEnd
      << " bytes. Cannot add gated spectrometer info of size "
      << headerInfo.str().size() << " bytes.";
  }

  _handler.init(block);
}


template <class HandlerType, typename IntegratedPowerType>
void GatedSpectrometer<HandlerType, IntegratedPowerType>::process(
    thrust::device_vector<RawVoltageType> const &digitiser_raw,
    thrust::device_vector<uint64_t> const &sideChannelData,
    thrust::device_vector<IntegratedPowerType> &detected, thrust::device_vector<uint64_cu> &noOfBitSetsIn_G0, thrust::device_vector<uint64_cu> &noOfBitSetsIn_G1) {
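  // Processing chain: unpack the raw voltages, estimate the baseline, split the
  // samples into the two gates, then FFT and detect/accumulate each gate.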
  BOOST_LOG_TRIVIAL(debug) << "Unpacking raw voltages";
  switch (_nbits) {
  case 8:
    _unpacker->unpack<8>(digitiser_raw, _unpacked_voltage_G0);
    break;
  case 12:
    _unpacker->unpack<12>(digitiser_raw, _unpacked_voltage_G0);
    break;
  default:
    throw std::runtime_error("Unsupported number of bits");
  }
  BOOST_LOG_TRIVIAL(debug) << "Calculate baseline";
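  // Sum of the unpacked data in two array_sum passes (partial sums, then a final
  // reduction into _baseLineN[0]); the gating kernel divides by N to get the mean.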
  psrdada_cpp::effelsberg::edd::array_sum<<<64, array_sum_Nthreads, 0, _proc_stream>>>(thrust::raw_pointer_cast(_unpacked_voltage_G0.data()), _unpacked_voltage_G0.size(), thrust::raw_pointer_cast(_baseLineN.data()));
  psrdada_cpp::effelsberg::edd::array_sum<<<1, array_sum_Nthreads, 0, _proc_stream>>>(thrust::raw_pointer_cast(_baseLineN.data()), _baseLineN.size(), thrust::raw_pointer_cast(_baseLineN.data()));

  BOOST_LOG_TRIVIAL(debug) << "Perform gating";


  for (size_t i = 0; i < noOfBitSetsIn_G0.size(); i++)
  { // ToDo: Should be in one kernel call
  gating<<<1024, 1024, 0, _proc_stream>>>(
      thrust::raw_pointer_cast(_unpacked_voltage_G0.data() + i * sideChannelData.size() / noOfBitSetsIn_G0.size()),
      thrust::raw_pointer_cast(_unpacked_voltage_G1.data() + i * sideChannelData.size() / noOfBitSetsIn_G0.size()),
      thrust::raw_pointer_cast(sideChannelData.data() + i * sideChannelData.size() / noOfBitSetsIn_G0.size()),
      _unpacked_voltage_G0.size() / noOfBitSetsIn_G0.size(), _dadaBufferLayout.getHeapSize(), _selectedBit, _dadaBufferLayout.getNSideChannels(),
      _selectedSideChannel, thrust::raw_pointer_cast(_baseLineN.data()),
      thrust::raw_pointer_cast(noOfBitSetsIn_G0.data() + i),
      thrust::raw_pointer_cast(noOfBitSetsIn_G1.data() + i)
      );
  }

  BOOST_LOG_TRIVIAL(debug) << "Performing FFT 1";
  UnpackedVoltageType *_unpacked_voltage_ptr =
      thrust::raw_pointer_cast(_unpacked_voltage_G0.data());
  ChannelisedVoltageType *_channelised_voltage_ptr =
      thrust::raw_pointer_cast(_channelised_voltage.data());
  CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr,
                                 (cufftComplex *)_channelised_voltage_ptr));
  _detector->detect(_channelised_voltage, detected, 2, 0);

  BOOST_LOG_TRIVIAL(debug) << "Performing FFT 2";
  _unpacked_voltage_ptr = thrust::raw_pointer_cast(_unpacked_voltage_G1.data());
  CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr,
                                 (cufftComplex *)_channelised_voltage_ptr));

  _detector->detect(_channelised_voltage, detected, 2, 1);
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
  BOOST_LOG_TRIVIAL(debug) << "Exit processing";
} // process


template <class HandlerType, typename IntegratedPowerType>
bool GatedSpectrometer<HandlerType, IntegratedPowerType>::operator()(RawBytes &block) {
  ++_call_count;
  BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer operator() called (count = "
                           << _call_count << ")";
  if (block.used_bytes() != _dadaBufferLayout.getBufferSize()) { /* Unexpected buffer size */
    BOOST_LOG_TRIVIAL(error) << "Unexpected buffer size - got "
                             << block.used_bytes() << " byte, expected "
                             << _dadaBufferLayout.getBufferSize() << " byte";
    CUDA_ERROR_CHECK(cudaDeviceSynchronize());
    cudaProfilerStop();
    return true;
  }

  // Copy data to device
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_h2d_stream));
  _raw_voltage_db.swap();
  _sideChannelData_db.swap();

  BOOST_LOG_TRIVIAL(debug) << "   block.used_bytes() = " << block.used_bytes()
                           << ", dataBlockBytes = " << _dadaBufferLayout.sizeOfData() << "\n";

  CUDA_ERROR_CHECK(cudaMemcpyAsync(static_cast<void *>(_raw_voltage_db.a_ptr()),
                                   static_cast<void *>(block.ptr()),
                                   _dadaBufferLayout.sizeOfData() , cudaMemcpyHostToDevice,
                                   _h2d_stream));
  CUDA_ERROR_CHECK(cudaMemcpyAsync(
      static_cast<void *>(_sideChannelData_db.a_ptr()),
      static_cast<void *>(block.ptr() + _dadaBufferLayout.sizeOfData() + _dadaBufferLayout.sizeOfGap()),
      _dadaBufferLayout.sizeOfSideChannelData(), cudaMemcpyHostToDevice, _h2d_stream));
  BOOST_LOG_TRIVIAL(debug) << "First side channel item: 0x" <<   std::setw(16) << std::setfill('0') << std::hex <<  (reinterpret_cast<uint64_t*>(block.ptr() + _dadaBufferLayout.sizeOfData() + _dadaBufferLayout.sizeOfGap()))[0] << std::dec;


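  // Three-stage pipeline (H2D copy, processing, D2H copy): the first call only
  // uploads data, so there is nothing to process or output yet.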
  if (_call_count == 1) {
    return false;
  }
  // process data

  // The output buffer is only swapped when a new output block is started; otherwise
  // the new data is accumulated into the current one.
  bool newBlock = false;
  if (((_call_count-1) * _nsamps_per_buffer) % _nsamps_per_output_spectra == 0) // _call_count -1 because this is the block number on the device
  {
    BOOST_LOG_TRIVIAL(debug) << "Starting new output block.";
    newBlock = true;
    _power_db.swap();
    _noOfBitSetsIn_G0.swap();
    _noOfBitSetsIn_G1.swap();
    // move to specific stream!
    thrust::fill(thrust::cuda::par.on(_proc_stream),_power_db.a().begin(), _power_db.a().end(), 0.);
    thrust::fill(thrust::cuda::par.on(_proc_stream), _noOfBitSetsIn_G0.a().begin(), _noOfBitSetsIn_G0.a().end(), 0L);
    thrust::fill(thrust::cuda::par.on(_proc_stream), _noOfBitSetsIn_G1.a().begin(), _noOfBitSetsIn_G1.a().end(), 0L);
  }

  process(_raw_voltage_db.b(), _sideChannelData_db.b(), _power_db.a(), _noOfBitSetsIn_G0.a(), _noOfBitSetsIn_G1.a());
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));

  if ((_call_count == 2) || (!newBlock)) {
    return false;
  }

  // copy data to host if block is finished
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream));
  _host_power_db.swap();

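  // Each output item consists of the two gated spectra followed by the two
  // per-gate sample counters (see memOffset below).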
  for (size_t i = 0; i < _noOfBitSetsIn_G0.size(); i++)
  {
    size_t memOffset = 2 * i * (_nchans * sizeof(IntegratedPowerType) + sizeof(size_t));
    // copy 2x channel data
    CUDA_ERROR_CHECK(
        cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset) ,
                        static_cast<void *>(_power_db.b_ptr() + 2 * i * _nchans),
                        2 * _nchans * sizeof(IntegratedPowerType),
                        cudaMemcpyDeviceToHost, _d2h_stream));

    // copy the per-gate sample counters
    CUDA_ERROR_CHECK(
        cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType)),
          static_cast<void *>(_noOfBitSetsIn_G0.b_ptr() + i ),
            1 * sizeof(size_t),
            cudaMemcpyDeviceToHost, _d2h_stream));

    CUDA_ERROR_CHECK(
        cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType) + sizeof(size_t)),
          static_cast<void *>(_noOfBitSetsIn_G1.b_ptr() + i ),
            1 * sizeof(size_t),
            cudaMemcpyDeviceToHost, _d2h_stream));
  }

  BOOST_LOG_TRIVIAL(debug) << "Copy Data back to host";

  if (_call_count == 3) {
    return false;
  }

  // compute and log the per-spectrum gate statistics and the processing efficiency
  BOOST_LOG_TRIVIAL(info) << "Buffer block: " << _call_count-3 << " with " << _noOfBitSetsIn_G0.size() << "x2 output heaps:";
  size_t total_samples_lost = 0;
  for (size_t i = 0; i < _noOfBitSetsIn_G0.size(); i++)
  {
    size_t memOffset = 2 * i * (_nchans * sizeof(IntegratedPowerType) + sizeof(size_t));

    size_t* on_values = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType));
    size_t* off_values = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType) + sizeof(size_t));

    size_t samples_lost = _nsamps_per_output_spectra - (*on_values) - (*off_values);
    total_samples_lost += samples_lost;

    BOOST_LOG_TRIVIAL(info) << "    Heap " << i << ":\n"
      <<"                            Samples with  bit set  : " << *on_values << std::endl
      <<"                            Samples without bit set: " << *off_values << std::endl
      <<"                            Samples lost           : " << samples_lost << " out of " << _nsamps_per_output_spectra << std::endl;
  }
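  // Efficiency: fraction of samples that were assigned to either gate; a running
  // average is kept over all processed output blocks.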
  double efficiency = 1. - double(total_samples_lost) / (_nsamps_per_output_spectra * _noOfBitSetsIn_G0.size());
  double prev_average = _processing_efficiency / (_call_count- 3 - 1);
  _processing_efficiency += efficiency;
  double average = _processing_efficiency / (_call_count-3);
  BOOST_LOG_TRIVIAL(info) << "Total processing efficiency of this buffer block: " << std::setprecision(6) << efficiency << ". Run average: " << average << " (Trend: " << std::showpos << (average - prev_average) << ")";

  // Wrap in a RawBytes object here;
  RawBytes bytes(reinterpret_cast<char *>(_host_power_db.b_ptr()),
                 _host_power_db.size(),
                 _host_power_db.size());
  BOOST_LOG_TRIVIAL(debug) << "Calling handler";
  // The handler can't do anything asynchronously without a copy here
  // as it would be unsafe (given that it does not own the memory it
  // is being passed).

  _handler(bytes);
  return false;
} // operator ()

} // edd
} // effelsberg
} // psrdada_cpp