// GatedSpectrometer.cu
#include "psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh"
#include "psrdada_cpp/effelsberg/edd/Tools.cuh"

#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/cuda_utils.hpp"
#include "psrdada_cpp/raw_bytes.hpp"

#include <cuda.h>
#include <cuda_profiler_api.h>
#include <thrust/system/cuda/execution_policy.h>

#include <cassert>
#include <cmath>
#include <cstring>
#include <iomanip>
#include <iostream>
#include <sstream>
#include <stdexcept>
#include <string>

namespace psrdada_cpp {
namespace effelsberg {
namespace edd {

/**
 * Block-wide sum reduction in shared memory.
 *
 * Every thread of the block must call this function, because it contains
 * __syncthreads() barriers. On return, x[0] holds the sum of the values `v`
 * passed by all threads of the block. The other entries of x hold partial
 * sums. A final barrier makes x[0] visible to all threads.
 *
 * @param x  Shared-memory scratch array with at least blockDim.x elements.
 * @param v  This thread's contribution.
 *
 * Works for any blockDim.x, not only powers of two. Each step folds the upper
 * part of the still-active range onto its lower part. For an odd-sized range
 * the middle element is carried over unchanged. For power-of-two block sizes
 * the summation order is exactly that of the classic halving tree.
 */
template<typename T>
__device__ void reduce(T *x, const T &v)
{
  x[threadIdx.x] = v;
  __syncthreads();
  for (unsigned int active = blockDim.x; active > 1; )
  {
    // Elements [0, half) survive; elements [half, active) are folded onto them.
    const unsigned int half = (active + 1) / 2;
    if (threadIdx.x < active - half)
      x[threadIdx.x] += x[threadIdx.x + half];
    __syncthreads();
    active = half;
  }
}

/**
 * Splits unpacked voltages into two gates, G0 and G1, using one bit of the
 * per-heap side-channel data, and accumulates per-gate statistics.
 *
 * For every sample i in [0, N):
 *  - the side-channel word for heap (i / heapSize) is read at index
 *    (i / heapSize) * noOfSideChannels + selectedSideChannel;
 *  - bit `bitpos` of that word routes the sample to G1 (bit set) or G0
 *    (bit not set);
 *  - bit 63 marks a lost heap, and such samples are kept in neither gate;
 *  - a gate that does not keep the sample gets that gate's baseline value
 *    (baseLineG0 / baseLineG1) instead.
 *  (This assumes TEST_BIT yields 0 or 1, since the results are used as
 *  multiplicative masks. TODO confirm against Tools.cuh.)
 *
 * Once per thread block, the results are atomically added to:
 *  - stats_G0 / stats_G1: number of samples kept in G0 / G1;
 *  - baseLineNG0 / baseLineNG1: sum of the sample values kept in G0 / G1.
 *
 * @param G0  In: unpacked voltages. Out: gate-0 voltages (updated in place).
 * @param G1  Out: gate-1 voltages.
 * @param sideChannelData  Side-channel words, noOfSideChannels per heap.
 * @param N  Number of samples to process.
 * @param heapSize  Number of samples per heap; the sample index is divided by it.
 * @param bitpos  Bit of the side-channel word that selects the gate.
 *
 * Launch: 1D grid with a grid-stride loop, so any grid size covers all N
 * samples. blockDim.x must not exceed 1024, the size of the shared
 * reduction buffer.
 */
__global__ void gating(float* __restrict__ G0, float* __restrict__ G1, const uint64_t* __restrict__ sideChannelData,
                       size_t N, size_t heapSize, size_t bitpos,
                       size_t noOfSideChannels, size_t selectedSideChannel,
                       const float  baseLineG0,
                       const float  baseLineG1,
                       float* __restrict__ baseLineNG0,
                       float* __restrict__ baseLineNG1,
                       uint64_cu* stats_G0, uint64_cu* stats_G1) {
  // Per-thread number of samples kept in G0 / G1.
  uint32_t _G0stats = 0;
  uint32_t _G1stats = 0;

  // Per-thread sum of the sample values kept in G0 / G1 (baseline update).
  float baselineUpdateG0 = 0;
  float baselineUpdateG1 = 0;

  // 64-bit index arithmetic so that large grids cannot overflow the product.
  const size_t stride = static_cast<size_t>(blockDim.x) * gridDim.x;
  for (size_t i = static_cast<size_t>(blockIdx.x) * blockDim.x + threadIdx.x; i < N;
       i += stride) {
    const float v = G0[i];

    // All samples of one heap share a side-channel word, so neighbouring
    // threads load the same address. Presumably the cache absorbs this.
    const uint64_t sideChannelItem =
        sideChannelData[((i / heapSize) * (noOfSideChannels)) +
                        selectedSideChannel];

    const unsigned int bit_set = TEST_BIT(sideChannelItem, bitpos);
    const unsigned int heap_lost = TEST_BIT(sideChannelItem, 63);

    // Branchless gating: keep the sample or replace it by the gate baseline.
    G1[i] = (v - baseLineG1) * bit_set * (!heap_lost) + baseLineG1;
    G0[i] = (v - baseLineG0) * (!bit_set) *(!heap_lost) + baseLineG0;

    _G0stats += (!bit_set) *(!heap_lost);
    _G1stats += bit_set * (!heap_lost);

    baselineUpdateG1 += v * bit_set * (!heap_lost);
    baselineUpdateG0 += v * (!bit_set) *(!heap_lost);
  }

  __shared__ uint32_t x[1024];

  // Reduce the G0 / G1 sample counts; thread 0 publishes each block total.
  reduce<uint32_t>(x, _G0stats);
  if(threadIdx.x == 0)
    atomicAdd(stats_G0,  (uint64_cu) x[threadIdx.x]);

  __syncthreads();
  reduce<uint32_t>(x, _G1stats);
  if(threadIdx.x == 0)
    atomicAdd(stats_G1,  (uint64_cu) x[threadIdx.x]);

  // Reuse the same shared array (4-byte elements) for the float sums.
  float *y = (float*) x;

  // Reduce the baseline sums; thread 0 publishes each block total.
  reduce<float>(y, baselineUpdateG0);
  if(threadIdx.x == 0)
    atomicAdd(baseLineNG0, y[threadIdx.x]);

  __syncthreads();
  reduce<float>(y, baselineUpdateG1);
  if(threadIdx.x == 0)
    atomicAdd(baseLineNG1, y[threadIdx.x]);
}

/**
 * Creates a gated spectrometer and allocates all device and host buffers.
 *
 * @param dadaBufferLayout     Layout of the input DADA buffer (data, gap, side channels).
 * @param selectedSideChannel  Side channel whose bit selects the gate.
 * @param selectedBit          Bit (0..63) of that side channel selecting G1.
 * @param fft_length           Number of real samples per FFT.
 * @param naccumulate          Number of spectra integrated into one output spectrum.
 * @param nbits                Bit depth of the input samples (8 or 12).
 * @param input_level, output_level  Used to derive the detector offset/scaling.
 * @param handler              Downstream handler receiving the output blocks.
 *
 * An output spectrum may span a fraction of one input buffer or several whole
 * input buffers. In both cases the sizes must divide evenly, which is asserted.
 */
template <class HandlerType, typename IntegratedPowerType>
GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer(
    const DadaBufferLayout &dadaBufferLayout,
    std::size_t selectedSideChannel, std::size_t selectedBit, std::size_t fft_length, std::size_t naccumulate,
    std::size_t nbits, float input_level, float output_level,
    HandlerType &handler) : _dadaBufferLayout(dadaBufferLayout),
      _selectedSideChannel(selectedSideChannel), _selectedBit(selectedBit),
      _fft_length(fft_length),
      _naccumulate(naccumulate), _nbits(nbits), _handler(handler), _fft_plan(0),
      _call_count(0), _nsamps_per_heap(4096), _processing_efficiency(0.){

  // Sanity checks
  assert(((_nbits == 12) || (_nbits == 8)));
  assert(_naccumulate > 0);

  // check for any device errors
  CUDA_ERROR_CHECK(cudaDeviceSynchronize());

  BOOST_LOG_TRIVIAL(info)
      << "Creating new GatedSpectrometer instance with parameters: \n"
      << "  fft_length           " << _fft_length << "\n"
      << "  naccumulate          " << _naccumulate << "\n"
      << "  nSideChannels        " << _dadaBufferLayout.getNSideChannels() << "\n"
      << "  speadHeapSize        " << _dadaBufferLayout.getHeapSize() << " byte\n"
      << "  selectedSideChannel  " << _selectedSideChannel << "\n"
      << "  selectedBit          " << _selectedBit << "\n"
      << "  output bit depth     " << sizeof(IntegratedPowerType) * 8;

  assert((_dadaBufferLayout.getNSideChannels() == 0) ||
         (selectedSideChannel < _dadaBufferLayout.getNSideChannels()));  // Sanity check of side channel value
  assert(selectedBit < 64); // Sanity check of selected bit

  _nsamps_per_buffer = _dadaBufferLayout.sizeOfData() * 8 / nbits;

  _nsamps_per_output_spectra = fft_length * naccumulate;
  int nBlocks;
  if (_nsamps_per_output_spectra <= _nsamps_per_buffer)
  { // one buffer block is used for one or multiple output spectra
    size_t N = _nsamps_per_buffer / _nsamps_per_output_spectra;
    // All data in one block has to be used
    assert(N * _nsamps_per_output_spectra == _nsamps_per_buffer);
    nBlocks = 1;
  }
  else
  { // multiple blocks are integrated into one output
    size_t N =  _nsamps_per_output_spectra /  _nsamps_per_buffer;
    // All data in multiple blocks has to be used
    assert(N * _nsamps_per_buffer == _nsamps_per_output_spectra);
    nBlocks = N;
  }
  BOOST_LOG_TRIVIAL(debug) << "Integrating  " << _nsamps_per_output_spectra << " samples from " << nBlocks << " into one spectra.";

  _nchans = _fft_length / 2 + 1;
  int batch = _nsamps_per_buffer / _fft_length;

  float dof = 2 * _naccumulate;
  float scale =
      std::pow(input_level * std::sqrt(static_cast<float>(_nchans)), 2);
  float offset = scale * dof;
  float scaling = scale * std::sqrt(2 * dof) / output_level;
  BOOST_LOG_TRIVIAL(debug)
      << "Correction factors for 8-bit conversion: offset = " << offset
      << ", scaling = " << scaling;

  BOOST_LOG_TRIVIAL(debug) << "Generating FFT plan";
  int n[] = {static_cast<int>(_fft_length)};
  CUFFT_ERROR_CHECK(cufftPlanMany(&_fft_plan, 1, n, NULL, 1, _fft_length, NULL,
                                  1, _nchans, CUFFT_R2C, batch));
  // The plan is bound to _proc_stream further below, after that stream has
  // been created. Binding it here would pass an uninitialised stream handle.

  BOOST_LOG_TRIVIAL(debug) << "Allocating memory";
  _raw_voltage_db.resize(_dadaBufferLayout.sizeOfData() / sizeof(uint64_t));
  _sideChannelData_db.resize(_dadaBufferLayout.getNSideChannels() * _dadaBufferLayout.getNHeaps());
  BOOST_LOG_TRIVIAL(debug) << "  Input voltages size (in 64-bit words): "
                           << _raw_voltage_db.size();

  _unpacked_voltage_G0.resize(_nsamps_per_buffer);
  _unpacked_voltage_G1.resize(_nsamps_per_buffer);

  // Single-element accumulators for the per-gate baseline sums.
  _baseLineNG0.resize(1);
  _baseLineNG1.resize(1);
  BOOST_LOG_TRIVIAL(debug) << "  Unpacked voltages size (in samples): "
                           << _unpacked_voltage_G0.size();
  _channelised_voltage.resize(_nchans * batch);
  BOOST_LOG_TRIVIAL(debug) << "  Channelised voltages size: "
                           << _channelised_voltage.size();

  _power_db.resize(_nchans * batch / (_naccumulate / nBlocks) * 2);  // hold on and off spectra to simplify output
  thrust::fill(_power_db.a().begin(), _power_db.a().end(), 0.);
  thrust::fill(_power_db.b().begin(), _power_db.b().end(), 0.);
  BOOST_LOG_TRIVIAL(debug) << "  Powers size: " << _power_db.size() / 2;

  // One counter per output spectrum produced from a buffer.
  _noOfBitSetsIn_G0.resize( batch / (_naccumulate / nBlocks));
  _noOfBitSetsIn_G1.resize( batch / (_naccumulate / nBlocks));
  thrust::fill(_noOfBitSetsIn_G0.a().begin(), _noOfBitSetsIn_G0.a().end(), 0L);
  thrust::fill(_noOfBitSetsIn_G0.b().begin(), _noOfBitSetsIn_G0.b().end(), 0L);
  thrust::fill(_noOfBitSetsIn_G1.a().begin(), _noOfBitSetsIn_G1.a().end(), 0L);
  thrust::fill(_noOfBitSetsIn_G1.b().begin(), _noOfBitSetsIn_G1.b().end(), 0L);
  BOOST_LOG_TRIVIAL(debug) << "  Bit set counter size: " << _noOfBitSetsIn_G0.size();

  // on the host both power are stored in the same data buffer together with
  // the number of bit sets
  _host_power_db.resize( _power_db.size() * sizeof(IntegratedPowerType) + 2 * sizeof(size_t) * _noOfBitSetsIn_G0.size());

  CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream));
  CUDA_ERROR_CHECK(cudaStreamCreate(&_proc_stream));
  CUDA_ERROR_CHECK(cudaStreamCreate(&_d2h_stream));
  CUFFT_ERROR_CHECK(cufftSetStream(_fft_plan, _proc_stream));

  _unpacker.reset(new Unpacker(_proc_stream));
  _detector.reset(new DetectorAccumulator<IntegratedPowerType>(_nchans, _naccumulate / nBlocks, scaling,
                                          offset, _proc_stream));
} // constructor


/**
 * Releases the cuFFT plan and the three CUDA streams.
 *
 * The constructor always creates the plan with cufftPlanMany, so every
 * constructed instance owns one. The original `if (!_fft_plan)` guard was
 * inverted and leaked every non-zero plan handle. Return codes are ignored on
 * purpose: a destructor must not throw.
 */
template <class HandlerType, typename IntegratedPowerType>
GatedSpectrometer<HandlerType, IntegratedPowerType>::~GatedSpectrometer() {
  BOOST_LOG_TRIVIAL(debug) << "Destroying GatedSpectrometer";
  cufftDestroy(_fft_plan);
  cudaStreamDestroy(_h2d_stream);
  cudaStreamDestroy(_proc_stream);
  cudaStreamDestroy(_d2h_stream);
}


/**
 * Appends the gated spectrometer parameters to the (NUL-terminated) DADA
 * header in `block`, if there is room, and forwards the block to the
 * handler's init().
 *
 * The NUL terminator is only searched for inside the block. A header without
 * a terminator is treated as full, so the code never reads past the buffer.
 * `nchannels` reports the number of channels per output spectrum
 * (_nchans = fft_length / 2 + 1). This matches the layout that operator()
 * copies to the host.
 */
template <class HandlerType, typename IntegratedPowerType>
void GatedSpectrometer<HandlerType, IntegratedPowerType>::init(RawBytes &block) {
  BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer init called";

  std::stringstream headerInfo;
  headerInfo << "\n"
      << "# Gated spectrometer parameters: \n"
      << "fft_length               " << _fft_length << "\n"
      << "nchannels                " << _nchans << "\n"
      << "naccumulate              " << _naccumulate << "\n"
      << "selected_side_channel    " << _selectedSideChannel << "\n"
      << "selected_bit             " << _selectedBit << "\n"
      << "output_bit_depth         " << sizeof(IntegratedPowerType) * 8;
  const std::string info = headerInfo.str();

  // Bounded search for the end of the existing header text.
  const char *nul = static_cast<const char *>(
      std::memchr(block.ptr(), '\0', block.total_bytes()));
  const std::size_t bEnd = nul ? static_cast<std::size_t>(nul - block.ptr())
                               : static_cast<std::size_t>(block.total_bytes());

  // Strict '<' leaves room for the terminating NUL copied below.
  if (bEnd + info.size() < block.total_bytes())
  {
    std::memcpy(block.ptr() + bEnd, info.c_str(), info.size() + 1);
  }
  else
  {
    BOOST_LOG_TRIVIAL(warning) << "Header of size " << block.total_bytes()
      << " bytes already contains " << bEnd
      << " bytes. Cannot add gated spectrometer info of size "
      << info.size() << " bytes.";
  }

  _handler.init(block);
}


/**
 * Processes one input buffer that is already resident on the device.
 *
 *  1. Unpacks the raw voltages into _unpacked_voltage_G0.
 *  2. Runs the gating kernel once per output spectrum in the buffer. The
 *     kernel splits the samples into G0/G1, replaces gated-out samples by the
 *     baseline from the previous call, and adds the kept-sample counts to
 *     noOfBitSetsIn_G0 / noOfBitSetsIn_G1.
 *  3. Sets the per-gate baselines to the mean of the samples kept in this
 *     call. If a gate kept no samples, its previous baseline is retained, so
 *     a NaN can never be fed into later blocks.
 *  4. FFTs both gates and passes them to the detector:
 *     detect(..., 2, 0) for G0 and detect(..., 2, 1) for G1.
 *
 * All device work is issued on _proc_stream. The stream is synchronised
 * before host-side reads of device values and again before returning.
 *
 * @param digitiser_raw     Packed input samples (_nbits per sample).
 * @param sideChannelData   Side-channel words, getNSideChannels() per heap.
 * @param detected          Power output, accumulated by the detector.
 * @param noOfBitSetsIn_G0  Per-output-spectrum count of samples kept in G0.
 * @param noOfBitSetsIn_G1  Per-output-spectrum count of samples kept in G1.
 * @throws std::runtime_error for an unsupported _nbits.
 */
template <class HandlerType, typename IntegratedPowerType>
void GatedSpectrometer<HandlerType, IntegratedPowerType>::process(
    thrust::device_vector<RawVoltageType> const &digitiser_raw,
    thrust::device_vector<uint64_t> const &sideChannelData,
    thrust::device_vector<IntegratedPowerType> &detected, thrust::device_vector<uint64_cu> &noOfBitSetsIn_G0, thrust::device_vector<uint64_cu> &noOfBitSetsIn_G1) {
  BOOST_LOG_TRIVIAL(debug) << "Unpacking raw voltages";
  switch (_nbits) {
  case 8:
    _unpacker->unpack<8>(digitiser_raw, _unpacked_voltage_G0);
    break;
  case 12:
    _unpacker->unpack<12>(digitiser_raw, _unpacked_voltage_G0);
    break;
  default:
    throw std::runtime_error("Unsupported number of bits");
  }

  BOOST_LOG_TRIVIAL(debug) << "Calculate baseline";
  // Make sure the counter reset issued by the caller on _proc_stream has
  // completed before the counters are read from the host.
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));

  // Baselines (mean kept-sample value per gate) from the previous call.
  const float baseLineG0 = _baseLineNG0[0];
  const float baseLineG1 = _baseLineNG1[0];

  const size_t nOutputs = noOfBitSetsIn_G0.size();

  // The counters are only cleared when a new output spectrum starts. With
  // several input buffers per output they already contain earlier counts, so
  // remember them and use only this call's increment for the baseline mean.
  uint64_t countG0Before = 0;
  uint64_t countG1Before = 0;
  for (size_t i = 0; i < nOutputs; i++)
  {
    countG0Before += noOfBitSetsIn_G0[i];
    countG1Before += noOfBitSetsIn_G1[i];
  }

  // Start this call's baseline sums from zero (0.0f is all-zero bytes).
  CUDA_ERROR_CHECK(cudaMemsetAsync(thrust::raw_pointer_cast(_baseLineNG0.data()), 0,
                                   _baseLineNG0.size() * sizeof(float), _proc_stream));
  CUDA_ERROR_CHECK(cudaMemsetAsync(thrust::raw_pointer_cast(_baseLineNG1.data()), 0,
                                   _baseLineNG1.size() * sizeof(float), _proc_stream));

  BOOST_LOG_TRIVIAL(debug) << "Perform gating";

  // Each output spectrum covers an equal share of the samples and of the
  // side-channel words of this buffer.
  const size_t samplesPerOutput = _unpacked_voltage_G0.size() / nOutputs;
  const size_t sideChannelItemsPerOutput = sideChannelData.size() / nOutputs;
  // The kernel maps a sample index to its heap, so it needs samples per heap.
  // Assumes getHeapSize() is in bytes, as the constructor's log labels it.
  const size_t samplesPerHeap = _dadaBufferLayout.getHeapSize() * 8 / _nbits;

  // Loop over outputblocks, for case of multiple output blocks per input block
  for (size_t i = 0; i < nOutputs; i++)
  { // ToDo: Should be in one kernel call
    gating<<<1024, 1024, 0, _proc_stream>>>(
        thrust::raw_pointer_cast(_unpacked_voltage_G0.data() + i * samplesPerOutput),
        thrust::raw_pointer_cast(_unpacked_voltage_G1.data() + i * samplesPerOutput),
        thrust::raw_pointer_cast(sideChannelData.data() + i * sideChannelItemsPerOutput),
        samplesPerOutput, samplesPerHeap, _selectedBit, _dadaBufferLayout.getNSideChannels(),
        _selectedSideChannel,
        baseLineG0, baseLineG1,
        thrust::raw_pointer_cast(_baseLineNG0.data()),
        thrust::raw_pointer_cast(_baseLineNG1.data()),
        thrust::raw_pointer_cast(noOfBitSetsIn_G0.data() + i),
        thrust::raw_pointer_cast(noOfBitSetsIn_G1.data() + i)
        );
    CUDA_ERROR_CHECK(cudaGetLastError());
  }
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));

  uint64_t countG0After = 0;
  uint64_t countG1After = 0;
  for (size_t i = 0; i < nOutputs; i++)
  {
    countG0After += noOfBitSetsIn_G0[i];
    countG1After += noOfBitSetsIn_G1[i];
  }
  const uint64_t NG0 = countG0After - countG0Before;
  const uint64_t NG1 = countG1After - countG1Before;

  // New baseline = mean of this call's kept samples. If a gate kept nothing
  // (e.g. all heaps lost), keep the previous baseline instead of dividing by
  // zero and propagating NaN into every following block.
  const float sumG0 = _baseLineNG0[0];
  const float sumG1 = _baseLineNG1[0];
  const float newBaseLineG0 = (NG0 > 0) ? sumG0 / NG0 : baseLineG0;
  const float newBaseLineG1 = (NG1 > 0) ? sumG1 / NG1 : baseLineG1;
  _baseLineNG0[0] = newBaseLineG0;
  _baseLineNG1[0] = newBaseLineG1;
  BOOST_LOG_TRIVIAL(debug) << "Updating Baselines\n G0: " << baseLineG0 << " -> " << newBaseLineG0 << ", " << baseLineG1 << " -> " << newBaseLineG1 ;

  BOOST_LOG_TRIVIAL(debug) << "Performing FFT 1";
  UnpackedVoltageType *_unpacked_voltage_ptr =
      thrust::raw_pointer_cast(_unpacked_voltage_G0.data());
  ChannelisedVoltageType *_channelised_voltage_ptr =
      thrust::raw_pointer_cast(_channelised_voltage.data());
  CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr,
                                 (cufftComplex *)_channelised_voltage_ptr));
  _detector->detect(_channelised_voltage, detected, 2, 0);

  BOOST_LOG_TRIVIAL(debug) << "Performing FFT 2";
  _unpacked_voltage_ptr = thrust::raw_pointer_cast(_unpacked_voltage_G1.data());
  CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr,
                                 (cufftComplex *)_channelised_voltage_ptr));

  _detector->detect(_channelised_voltage, detected, 2, 1);
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
  BOOST_LOG_TRIVIAL(debug) << "Exit processing";
} // process


/**
 * Handles one input DADA block. Processing is pipelined over calls:
 *  - call n:   copy block n to the device (H2D, _h2d_stream);
 *  - call n+1: process block n on _proc_stream;
 *  - when an output spectrum is complete, copy it to the host (D2H,
 *    _d2h_stream) and pass the previously copied output to the handler.
 * The first three calls only fill the pipeline.
 *
 * Host output layout per output spectrum i: 2 * _nchans powers (G0 and G1,
 * as written by the detector), then the G0 sample count, then the G1 sample
 * count (one size_t each).
 *
 * @return true on an unexpected block size, which stops processing;
 *         false otherwise.
 */
template <class HandlerType, typename IntegratedPowerType>
bool GatedSpectrometer<HandlerType, IntegratedPowerType>::operator()(RawBytes &block) {
  ++_call_count;
  BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer operator() called (count = "
                           << _call_count << ")";
  if (block.used_bytes() != _dadaBufferLayout.getBufferSize()) { /* Unexpected buffer size */
    BOOST_LOG_TRIVIAL(error) << "Unexpected Buffer Size - Got "
                             << block.used_bytes() << " byte, expected "
                             << _dadaBufferLayout.getBufferSize() << " byte)";
    CUDA_ERROR_CHECK(cudaDeviceSynchronize());
    cudaProfilerStop();
    return true;
  }

  // Copy data to device. The previous H2D copy must be finished before the
  // buffers are swapped and the freed half is overwritten.
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_h2d_stream));
  _raw_voltage_db.swap();
  _sideChannelData_db.swap();

  BOOST_LOG_TRIVIAL(debug) << "   block.used_bytes() = " << block.used_bytes()
                           << ", dataBlockBytes = " << _dadaBufferLayout.sizeOfData() << "\n";

  CUDA_ERROR_CHECK(cudaMemcpyAsync(static_cast<void *>(_raw_voltage_db.a_ptr()),
                                   static_cast<void *>(block.ptr()),
                                   _dadaBufferLayout.sizeOfData() , cudaMemcpyHostToDevice,
                                   _h2d_stream));
  CUDA_ERROR_CHECK(cudaMemcpyAsync(
      static_cast<void *>(_sideChannelData_db.a_ptr()),
      static_cast<void *>(block.ptr() + _dadaBufferLayout.sizeOfData() + _dadaBufferLayout.sizeOfGap()),
      _dadaBufferLayout.sizeOfSideChannelData(), cudaMemcpyHostToDevice, _h2d_stream));
  BOOST_LOG_TRIVIAL(debug) << "First side channel item: 0x" <<   std::setw(16)
      << std::setfill('0') << std::hex <<
      (reinterpret_cast<uint64_t*>(block.ptr() + _dadaBufferLayout.sizeOfData()
                                   + _dadaBufferLayout.sizeOfGap()))[0] <<
      std::dec;

  // First call: nothing on the device to process yet.
  if (_call_count == 1) {
    return false;
  }

  // process data

  // only if  a newblock is started the output buffer is swapped. Otherwise the
  // new data is added to it
  bool newBlock = false;
  if (((_call_count-1) * _nsamps_per_buffer) % _nsamps_per_output_spectra == 0) // _call_count -1 because this is the block number on the device
  {
    BOOST_LOG_TRIVIAL(debug) << "Starting new output block.";
    newBlock = true;
    _power_db.swap();
    _noOfBitSetsIn_G0.swap();
    _noOfBitSetsIn_G1.swap();
    // Reset the accumulation targets on the processing stream, ordered
    // before the processing below.
    thrust::fill(thrust::cuda::par.on(_proc_stream),_power_db.a().begin(), _power_db.a().end(), 0.);
    thrust::fill(thrust::cuda::par.on(_proc_stream), _noOfBitSetsIn_G0.a().begin(), _noOfBitSetsIn_G0.a().end(), 0L);
    thrust::fill(thrust::cuda::par.on(_proc_stream), _noOfBitSetsIn_G1.a().begin(), _noOfBitSetsIn_G1.a().end(), 0L);
  }

  process(_raw_voltage_db.b(), _sideChannelData_db.b(), _power_db.a(), _noOfBitSetsIn_G0.a(), _noOfBitSetsIn_G1.a());
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));

  if ((_call_count == 2) || (!newBlock)) {
    return false;
  }

  // copy data to host if block is finished
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream));
  _host_power_db.swap();

  for (size_t i = 0; i < _noOfBitSetsIn_G0.size(); i++)
  {
    size_t memOffset = 2 * i * (_nchans * sizeof(IntegratedPowerType) + sizeof(size_t));
    // copy 2x channel data
    CUDA_ERROR_CHECK(
        cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset) ,
                        static_cast<void *>(_power_db.b_ptr() + 2 * i * _nchans),
                        2 * _nchans * sizeof(IntegratedPowerType),
                        cudaMemcpyDeviceToHost, _d2h_stream));

    // copy noOf bit set data
    CUDA_ERROR_CHECK(
        cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType)),
          static_cast<void *>(_noOfBitSetsIn_G0.b_ptr() + i ),
            1 * sizeof(size_t),
            cudaMemcpyDeviceToHost, _d2h_stream));

    CUDA_ERROR_CHECK(
        cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType) + sizeof(size_t)),
          static_cast<void *>(_noOfBitSetsIn_G1.b_ptr() + i ),
            1 * sizeof(size_t),
            cudaMemcpyDeviceToHost, _d2h_stream));
  }

  BOOST_LOG_TRIVIAL(debug) << "Copy Data back to host";

  // The host "b" buffer holds valid data only from the next finished output on.
  if (_call_count == 3) {
    return false;
  }

  // calculate off value
  BOOST_LOG_TRIVIAL(info) << "Buffer block: " << _call_count-3 << " with " << _noOfBitSetsIn_G0.size() << "x2 output heaps:";
  size_t total_samples_lost = 0;
  for (size_t i = 0; i < _noOfBitSetsIn_G0.size(); i++)
  {
    size_t memOffset = 2 * i * (_nchans * sizeof(IntegratedPowerType) + sizeof(size_t));

    size_t* on_values = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType));
    size_t* off_values = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType) + sizeof(size_t));

    size_t samples_lost = _nsamps_per_output_spectra - (*on_values) - (*off_values);
    total_samples_lost += samples_lost;

    BOOST_LOG_TRIVIAL(info) << "    Heap " << i << ":\n"
      <<"                            Samples with  bit set  : " << *on_values << std::endl
      <<"                            Samples without bit set: " << *off_values << std::endl
      <<"                            Samples lost           : " << samples_lost << " out of " << _nsamps_per_output_spectra << std::endl;
  }

  double efficiency = 1. - double(total_samples_lost) / (_nsamps_per_output_spectra * _noOfBitSetsIn_G0.size());
  // Running index of this report (1 for the first one).
  const double runIndex = static_cast<double>(_call_count - 3);
  // The first report has no previous average. Report a zero trend instead
  // of computing 0/0 = NaN.
  const double prev_average = (runIndex > 1.) ? _processing_efficiency / (runIndex - 1.) : efficiency;
  _processing_efficiency += efficiency;
  double average = _processing_efficiency / runIndex;
  BOOST_LOG_TRIVIAL(info) << "Total processing efficiency of this buffer block:" << std::setprecision(6) << efficiency << ". Run average: " << average << " (Trend: " << std::showpos << (average - prev_average) << ")";

  // Wrap in a RawBytes object here;
  RawBytes bytes(reinterpret_cast<char *>(_host_power_db.b_ptr()),
                 _host_power_db.size(),
                 _host_power_db.size());
  BOOST_LOG_TRIVIAL(debug) << "Calling handler";
  // The handler can't do anything asynchronously without a copy here
  // as it would be unsafe (given that it does not own the memory it
  // is being passed).
  _handler(bytes);
  return false; //
} // operator ()

} // edd
} // effelsberg
} // psrdada_cpp