GatedSpectrometer.cu 19.3 KB
Newer Older
1
#include "psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh"
2
#include "psrdada_cpp/effelsberg/edd/Tools.cuh"
3
4
5
#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/cuda_utils.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
6

7
#include <cuda.h>
8
#include <cuda_profiler_api.h>
9
#include <thrust/system/cuda/execution_policy.h>
10
11

#include <iostream>
12
#include <iomanip>
13
14
#include <cstring>
#include <sstream>
15
16
17
18
19

namespace psrdada_cpp {
namespace effelsberg {
namespace edd {

20
/**
 * @brief Split the samples in G0 into two gated streams (G0 and G1) using one
 *        bit of the side-channel item that belongs to each sample.
 *
 * For every sample i the side-channel item at
 * sideChannelData[(i / heapSize) * noOfSideChannels + selectedSideChannel]
 * is inspected:
 *   - bit `bitpos` selects the gate (set -> G1, not set -> G0),
 *   - bit 63 is treated as a "heap lost" flag; such samples go to neither gate.
 * A sample that is not assigned to a gate is replaced there by the baseline
 * (*_baseLineN / N), so both outputs keep the same mean level.
 *
 * The number of samples assigned to each gate is added atomically to
 * *stats_G0 and *stats_G1 (one atomicAdd per block and counter).
 *
 * Launch requirements visible from the code: blockDim.x must not exceed 1024
 * (size of the shared scratch array) and the tree reduction only sums every
 * thread's contribution when blockDim.x is a power of two.
 *
 * @param G0                  in: all samples, out: samples without the bit set
 * @param G1                  out: samples with the bit set
 * @param sideChannelData     side-channel items
 * @param N                   number of samples to process
 * @param heapSize            number of consecutive samples sharing one side-channel item
 * @param bitpos              bit of the side-channel item used for gating
 * @param noOfSideChannels    side-channel items stored per heap
 * @param selectedSideChannel index of the side-channel item to evaluate
 * @param _baseLineN          pointer to the baseline value multiplied by N
 * @param stats_G0            counter incremented by the samples kept in G0
 * @param stats_G1            counter incremented by the samples kept in G1
 */
__global__ void gating(float* __restrict__ G0, float* __restrict__ G1, const uint64_t* __restrict__ sideChannelData,
                       size_t N, size_t heapSize, size_t bitpos,
                       size_t noOfSideChannels, size_t selectedSideChannel,
                       const float* __restrict__ _baseLineN, uint64_cu* stats_G0, uint64_cu* stats_G1) {
  const float mean = (*_baseLineN) / N;

  // per-thread number of samples assigned to G0 / G1
  uint32_t keptInG0 = 0;
  uint32_t keptInG1 = 0;

  const size_t first = blockIdx.x * blockDim.x + threadIdx.x;
  const size_t stride = blockDim.x * gridDim.x;
  for (size_t idx = first; idx < N; idx += stride) {
    const float deviation = G0[idx] - mean;

    // All samples of one heap read the same item; this relies on the cache
    // to avoid repeated global loads.
    const uint64_t item =
        sideChannelData[((idx / heapSize) * (noOfSideChannels)) + selectedSideChannel];

    const unsigned int bit_set = TEST_BIT(item, bitpos);
    const unsigned int heap_lost = TEST_BIT(item, 63);

    G1[idx] = deviation * bit_set * (!heap_lost) + mean;
    G0[idx] = deviation * (!bit_set) * (!heap_lost) + mean;

    keptInG0 += (!bit_set) * (!heap_lost);
    keptInG1 += bit_set * (!heap_lost);
  }

  // Block-wide tree reduction of both counters, one after the other, reusing
  // the same shared scratch space. The barrier that ends each reduction step
  // also protects the scratch array before it is overwritten for the next
  // counter.
  __shared__ uint32_t scratch[1024];
  const uint32_t counts[2] = {keptInG0, keptInG1};
  uint64_cu* const totals[2] = {stats_G0, stats_G1};

  for (int k = 0; k < 2; ++k) {
    scratch[threadIdx.x] = counts[k];
    __syncthreads();

    for (int half = blockDim.x / 2; half > 0; half = half / 2) {
      if (threadIdx.x < half)
        scratch[threadIdx.x] += scratch[threadIdx.x + half];
      __syncthreads();
    }

    // thread 0 publishes the block total
    if (threadIdx.x == 0)
      atomicAdd(totals[k], static_cast<uint64_cu>(scratch[0]));
  }
}

Tobias Winchen's avatar
Tobias Winchen committed
78

79
/**
 * @brief Count in how many of N side-channel items the bit `bitpos` is set
 *        and add the result to nBitsSet[0].
 *
 * Item i is read from sideChannelData[i * noOfSideChannels + selectedSideChannel].
 *
 * Deliberately simple reduction, intended for small arrays only. The kernel
 * must be launched with exactly ONE block: the final update of nBitsSet[0]
 * is a plain (non-atomic) read-modify-write. blockDim.x must not exceed 1024
 * and the tree reduction only covers every thread when blockDim.x is a power
 * of two. nBitsSet[0] is accumulated into, so the caller has to initialise it.
 */
__global__ void countBitSet(const uint64_t *sideChannelData, size_t N, size_t
    bitpos, size_t noOfSideChannels, size_t selectedSideChannel, size_t
    *nBitsSet)
{
  __shared__ size_t partial[1024];

  // every thread counts the items it visits
  const size_t stride = blockDim.x * gridDim.x;
  size_t count = 0;
  for (size_t item = blockIdx.x * blockDim.x + threadIdx.x; item < N; item += stride)
  {
    count += TEST_BIT(sideChannelData[item * noOfSideChannels + selectedSideChannel], bitpos);
  }
  partial[threadIdx.x] = count;
  __syncthreads();

  // pairwise tree reduction in shared memory
  for (int width = blockDim.x / 2; width > 0; width = width / 2)
  {
    if (threadIdx.x < width)
      partial[threadIdx.x] += partial[threadIdx.x + width];
    __syncthreads();
  }

  // thread 0 holds the block total
  if (threadIdx.x == 0)
    nBitsSet[0] += partial[0];
}
105

106

107
108
/**
 * @brief Set up a gated spectrometer: validate the configuration, create the
 *        FFT plan and CUDA streams and allocate all device and host buffers.
 *
 * @param dadaBufferLayout    layout of the input buffer (data, gap, side channels)
 * @param selectedSideChannel index of the side-channel item used for gating
 * @param selectedBit         bit of that item used for gating (must be < 64)
 * @param fft_length          length of the real-to-complex FFT
 * @param naccumulate         number of spectra accumulated into one output
 * @param nbits               bit depth of the input samples (8 or 12)
 * @param input_level         input level used to derive the output scaling
 * @param output_level        output level used to derive the output scaling
 * @param handler             handler that receives header and output blocks
 *
 * The sanity checks are `assert`s and therefore inactive when NDEBUG is set.
 */
template <class HandlerType, typename IntegratedPowerType>
GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer(
    const DadaBufferLayout &dadaBufferLayout,
    std::size_t selectedSideChannel, std::size_t selectedBit, std::size_t fft_length, std::size_t naccumulate,
    std::size_t nbits, float input_level, float output_level,
    HandlerType &handler) : _dadaBufferLayout(dadaBufferLayout),
      _selectedSideChannel(selectedSideChannel), _selectedBit(selectedBit),
      _fft_length(fft_length),
      _naccumulate(naccumulate), _nbits(nbits), _handler(handler), _fft_plan(0),
      _call_count(0), _nsamps_per_heap(4096), _processing_efficiency(0.){

  // Sanity checks
  assert(((_nbits == 12) || (_nbits == 8)));
  assert(_naccumulate > 0);

  // check for any device errors
  CUDA_ERROR_CHECK(cudaDeviceSynchronize());

  BOOST_LOG_TRIVIAL(info)
      << "Creating new GatedSpectrometer instance with parameters: \n"
      << "  fft_length           " << _fft_length << "\n"
      << "  naccumulate          " << _naccumulate << "\n"
      << "  nSideChannels        " << _dadaBufferLayout.getNSideChannels() << "\n"
      << "  speadHeapSize        " << _dadaBufferLayout.getHeapSize() << " byte\n"
      << "  selectedSideChannel  " << _selectedSideChannel << "\n"
      << "  selectedBit          " << _selectedBit << "\n"
      << "  output bit depth     " << sizeof(IntegratedPowerType) * 8;

  assert((_dadaBufferLayout.getNSideChannels() == 0) ||
         (selectedSideChannel < _dadaBufferLayout.getNSideChannels()));  // Sanity check of side channel value
  assert(selectedBit < 64); // Sanity check of selected bit

  _nsamps_per_buffer = _dadaBufferLayout.sizeOfData() * 8 / nbits;

  // Relation between input buffers and output spectra: either one buffer
  // yields an integer number of output spectra, or an integer number of
  // buffers (nBlocks) is integrated into one output spectrum.
  _nsamps_per_output_spectra = fft_length * naccumulate;
  int nBlocks;
  if (_nsamps_per_output_spectra <= _nsamps_per_buffer)
  { // one buffer block is used for one or multiple output spectra
    size_t N = _nsamps_per_buffer / _nsamps_per_output_spectra;
    // All data in one block has to be used
    assert(N * _nsamps_per_output_spectra == _nsamps_per_buffer);
    nBlocks = 1;
  }
  else
  { // multiple blocks are integrated into one output
    size_t N =  _nsamps_per_output_spectra /  _nsamps_per_buffer;
    // All data in multiple blocks has to be used
    assert(N * _nsamps_per_buffer == _nsamps_per_output_spectra);
    nBlocks = N;
  }
  BOOST_LOG_TRIVIAL(debug) << "Integrating  " << _nsamps_per_output_spectra << " samples from " << nBlocks << " into one spectra.";

  _nchans = _fft_length / 2 + 1;
  int batch = _nsamps_per_buffer / _fft_length;
  float dof = 2 * _naccumulate;
  float scale =
      std::pow(input_level * std::sqrt(static_cast<float>(_nchans)), 2);
  float offset = scale * dof;
  float scaling = scale * std::sqrt(2 * dof) / output_level;
  BOOST_LOG_TRIVIAL(debug)
      << "Correction factors for 8-bit conversion: offset = " << offset
      << ", scaling = " << scaling;

  BOOST_LOG_TRIVIAL(debug) << "Generating FFT plan";
  int n[] = {static_cast<int>(_fft_length)};
  CUFFT_ERROR_CHECK(cufftPlanMany(&_fft_plan, 1, n, NULL, 1, _fft_length, NULL,
                                  1, _nchans, CUFFT_R2C, batch));
  // The plan is bound to _proc_stream further below, once the stream exists.
  // Binding it here would pass an uninitialised stream handle to cuFFT.

  BOOST_LOG_TRIVIAL(debug) << "Allocating memory";
  _raw_voltage_db.resize(_dadaBufferLayout.sizeOfData() / sizeof(uint64_t));
  _sideChannelData_db.resize(_dadaBufferLayout.getNSideChannels() * _dadaBufferLayout.getNHeaps());
  BOOST_LOG_TRIVIAL(debug) << "  Input voltages size (in 64-bit words): "
                           << _raw_voltage_db.size();
  _unpacked_voltage_G0.resize(_nsamps_per_buffer);
  _unpacked_voltage_G1.resize(_nsamps_per_buffer);

  _baseLineN.resize(array_sum_Nthreads);
  BOOST_LOG_TRIVIAL(debug) << "  Unpacked voltages size (in samples): "
                           << _unpacked_voltage_G0.size();
  _channelised_voltage.resize(_nchans * batch);
  BOOST_LOG_TRIVIAL(debug) << "  Channelised voltages size: "
                           << _channelised_voltage.size();
  _power_db.resize(_nchans * batch / (_naccumulate / nBlocks) * 2);  // hold on and off spectra to simplify output
  thrust::fill(_power_db.a().begin(), _power_db.a().end(), 0.);
  thrust::fill(_power_db.b().begin(), _power_db.b().end(), 0.);
  BOOST_LOG_TRIVIAL(debug) << "  Powers size: " << _power_db.size() / 2;

  // one pair of counters (G0, G1) per output spectrum
  _noOfBitSetsIn_G0.resize( batch / (_naccumulate / nBlocks));
  _noOfBitSetsIn_G1.resize( batch / (_naccumulate / nBlocks));
  thrust::fill(_noOfBitSetsIn_G0.a().begin(), _noOfBitSetsIn_G0.a().end(), 0L);
  thrust::fill(_noOfBitSetsIn_G0.b().begin(), _noOfBitSetsIn_G0.b().end(), 0L);
  thrust::fill(_noOfBitSetsIn_G1.a().begin(), _noOfBitSetsIn_G1.a().end(), 0L);
  thrust::fill(_noOfBitSetsIn_G1.b().begin(), _noOfBitSetsIn_G1.b().end(), 0L);
  BOOST_LOG_TRIVIAL(debug) << "  Bit set counter size: " << _noOfBitSetsIn_G0.size();

  // on the host both power are stored in the same data buffer together with
  // the number of bit sets
  _host_power_db.resize( _power_db.size() * sizeof(IntegratedPowerType) + 2 * sizeof(size_t) * _noOfBitSetsIn_G0.size());

  CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream));
  CUDA_ERROR_CHECK(cudaStreamCreate(&_proc_stream));
  CUDA_ERROR_CHECK(cudaStreamCreate(&_d2h_stream));
  CUFFT_ERROR_CHECK(cufftSetStream(_fft_plan, _proc_stream));

  _unpacker.reset(new Unpacker(_proc_stream));
  _detector.reset(new DetectorAccumulator<IntegratedPowerType>(_nchans, _naccumulate / nBlocks, scaling,
                                          offset, _proc_stream));
} // constructor


218
219
/**
 * @brief Release the cuFFT plan and the three CUDA streams owned by the object.
 *
 * Return codes are deliberately not checked: a destructor must not throw.
 */
template <class HandlerType, typename IntegratedPowerType>
GatedSpectrometer<HandlerType, IntegratedPowerType>::~GatedSpectrometer() {
  BOOST_LOG_TRIVIAL(debug) << "Destroying GatedSpectrometer";
  // The constructor always creates the plan, so it is always destroyed here.
  // The previous guard `if (!_fft_plan)` was inverted and only called
  // cufftDestroy for a zero handle, leaking every real plan.
  cufftDestroy(_fft_plan);
  cudaStreamDestroy(_h2d_stream);
  cudaStreamDestroy(_proc_stream);
  cudaStreamDestroy(_d2h_stream);
}


229
230
/**
 * @brief Append the spectrometer parameters to the header block and forward
 *        the header to the handler.
 *
 * The header is treated as a NUL-terminated string inside `block`. The
 * parameters are appended only if they fit (including the terminator);
 * otherwise a warning is logged and the header is passed on unchanged.
 *
 * @param block header block; modified in place
 */
template <class HandlerType, typename IntegratedPowerType>
void GatedSpectrometer<HandlerType, IntegratedPowerType>::init(RawBytes &block) {
  BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer init called";
  std::stringstream headerInfo;
  headerInfo << "\n"
      << "# Gated spectrometer parameters: \n"
      << "fft_length               " << _fft_length << "\n"
      // number of output channels of the real-to-complex FFT, not the FFT length
      << "nchannels                " << _nchans << "\n"
      << "naccumulate              " << _naccumulate << "\n"
      << "selected_side_channel    " << _selectedSideChannel << "\n"
      << "selected_bit             " << _selectedBit << "\n"
      << "output_bit_depth         " << sizeof(IntegratedPowerType) * 8;
  const std::string info = headerInfo.str();

  // Find the end of the existing header without reading beyond the block.
  // A block without terminator is treated as completely full.
  const void *terminator = std::memchr(block.ptr(), '\0', block.total_bytes());
  const size_t bEnd = terminator
      ? static_cast<size_t>(static_cast<const char *>(terminator) - block.ptr())
      : block.total_bytes();

  if (bEnd + info.size() < block.total_bytes())
  {
    std::strcpy(block.ptr() + bEnd, info.c_str());
  }
  else
  {
    BOOST_LOG_TRIVIAL(warning) << "Header of size " << block.total_bytes()
      << " bytes already contains " << bEnd
      << "bytes. Cannot add gated spectrometer info of size "
      << info.size() << " bytes.";
  }

  _handler.init(block);
}


259
260
/**
 * @brief Process one input buffer on the device: unpack, gate, FFT and detect.
 *
 * Steps, all issued on _proc_stream:
 *   1. unpack the raw voltages into _unpacked_voltage_G0,
 *   2. sum all unpacked samples into _baseLineN[0],
 *   3. split the samples into G0 / G1 with the `gating` kernel, one launch per
 *      output spectrum contained in the buffer,
 *   4. FFT each gate and hand the result to the detector
 *      (`detect(..., 2, 0)` for G0 and `detect(..., 2, 1)` for G1).
 * The function returns after _proc_stream has been synchronised.
 *
 * @param digitiser_raw     packed input samples
 * @param sideChannelData   side-channel items belonging to the buffer
 * @param detected          output powers, accumulated by the detector
 * @param noOfBitSetsIn_G0  per output spectrum: samples assigned to G0
 * @param noOfBitSetsIn_G1  per output spectrum: samples assigned to G1
 *
 * @throws std::runtime_error if _nbits is neither 8 nor 12
 */
template <class HandlerType, typename IntegratedPowerType>
void GatedSpectrometer<HandlerType, IntegratedPowerType>::process(
    thrust::device_vector<RawVoltageType> const &digitiser_raw,
    thrust::device_vector<uint64_t> const &sideChannelData,
    thrust::device_vector<IntegratedPowerType> &detected, thrust::device_vector<uint64_cu> &noOfBitSetsIn_G0, thrust::device_vector<uint64_cu> &noOfBitSetsIn_G1) {
  BOOST_LOG_TRIVIAL(debug) << "Unpacking raw voltages";
  switch (_nbits) {
  case 8:
    _unpacker->unpack<8>(digitiser_raw, _unpacked_voltage_G0);
    break;
  case 12:
    _unpacker->unpack<12>(digitiser_raw, _unpacked_voltage_G0);
    break;
  default:
    throw std::runtime_error("Unsupported number of bits");
  }

  BOOST_LOG_TRIVIAL(debug) << "Calculate baseline";
  // two-stage sum: partial sums first, then the partial sums are summed in place
  psrdada_cpp::effelsberg::edd::array_sum<<<64, array_sum_Nthreads, 0, _proc_stream>>>(thrust::raw_pointer_cast(_unpacked_voltage_G0.data()), _unpacked_voltage_G0.size(), thrust::raw_pointer_cast(_baseLineN.data()));
  psrdada_cpp::effelsberg::edd::array_sum<<<1, array_sum_Nthreads, 0, _proc_stream>>>(thrust::raw_pointer_cast(_baseLineN.data()), _baseLineN.size(), thrust::raw_pointer_cast(_baseLineN.data()));
  CUDA_ERROR_CHECK(cudaGetLastError());

  BOOST_LOG_TRIVIAL(debug) << "Perform gating";

  // NOTE(review): the baseline is summed over the whole buffer, while `gating`
  // divides it by the per-segment sample count. With more than one segment per
  // buffer this looks inconsistent - confirm against array_sum / intended use.
  const size_t nSegments = noOfBitSetsIn_G0.size();
  for (size_t i = 0; i < nSegments; i++)
  { // ToDo: Should be in one kernel call
    // Each launch handles one contiguous segment of the buffer. Voltage
    // pointers advance in units of samples, the side-channel pointer in units
    // of side-channel items; the voltage offset must match the segment length
    // that is passed to the kernel as N.
    const size_t samplesPerSegment = _unpacked_voltage_G0.size() / nSegments;
    const size_t voltageOffset = i * samplesPerSegment;
    const size_t sideChannelOffset = i * sideChannelData.size() / nSegments;

    gating<<<1024, 1024, 0, _proc_stream>>>(
        thrust::raw_pointer_cast(_unpacked_voltage_G0.data() + voltageOffset),
        thrust::raw_pointer_cast(_unpacked_voltage_G1.data() + voltageOffset),
        thrust::raw_pointer_cast(sideChannelData.data() + sideChannelOffset),
        samplesPerSegment, _dadaBufferLayout.getHeapSize(), _selectedBit, _dadaBufferLayout.getNSideChannels(),
        _selectedSideChannel, thrust::raw_pointer_cast(_baseLineN.data()),
        thrust::raw_pointer_cast(noOfBitSetsIn_G0.data() + i),
        thrust::raw_pointer_cast(noOfBitSetsIn_G1.data() + i)
        );
    // surface launch-configuration errors immediately
    CUDA_ERROR_CHECK(cudaGetLastError());
  }

  BOOST_LOG_TRIVIAL(debug) << "Performing FFT 1";
  UnpackedVoltageType *_unpacked_voltage_ptr =
      thrust::raw_pointer_cast(_unpacked_voltage_G0.data());
  ChannelisedVoltageType *_channelised_voltage_ptr =
      thrust::raw_pointer_cast(_channelised_voltage.data());
  CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr,
                                 (cufftComplex *)_channelised_voltage_ptr));
  _detector->detect(_channelised_voltage, detected, 2, 0);

  BOOST_LOG_TRIVIAL(debug) << "Performing FFT 2";
  // _channelised_voltage is reused for the second gate
  _unpacked_voltage_ptr = thrust::raw_pointer_cast(_unpacked_voltage_G1.data());
  CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr,
                                 (cufftComplex *)_channelised_voltage_ptr));
  _detector->detect(_channelised_voltage, detected, 2, 1);

  CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
  BOOST_LOG_TRIVIAL(debug) << "Exit processing";
} // process


315
316
/**
 * @brief Consume one input block and drive the three-stage pipeline
 *        (host-to-device copy, processing, device-to-host copy + handler).
 *
 * The stages run on data of different calls, so output lags input:
 *   - call 1 only uploads data,
 *   - call 2 additionally processes the previously uploaded buffer,
 *   - call 3 additionally starts the download of finished spectra,
 *   - from call 4 on the handler receives downloaded results.
 * Results are downloaded and passed on only when a new output block is
 * started; otherwise the buffer is accumulated into the current output.
 *
 * Host output layout per output spectrum i (as written by the copies below):
 * 2 * _nchans values of IntegratedPowerType, followed by the G0 counter and
 * the G1 counter (sizeof(size_t) bytes each).
 *
 * @param block input block; must contain exactly getBufferSize() bytes
 * @return true if the block has an unexpected size (processing should stop),
 *         false otherwise
 */
template <class HandlerType, typename IntegratedPowerType>
bool GatedSpectrometer<HandlerType, IntegratedPowerType>::operator()(RawBytes &block) {
  ++_call_count;
  BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer operator() called (count = "
                           << _call_count << ")";
  if (block.used_bytes() != _dadaBufferLayout.getBufferSize()) { /* Unexpected buffer size */
    BOOST_LOG_TRIVIAL(error) << "Unexpected Buffer Size - Got "
                             << block.used_bytes() << " byte, expected "
                             << _dadaBufferLayout.getBufferSize() << " byte)";
    CUDA_ERROR_CHECK(cudaDeviceSynchronize());
    cudaProfilerStop();
    return true;
  }

  // Copy data to device
  // wait for the previous upload before its target becomes the processing input
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_h2d_stream));
  _raw_voltage_db.swap();
  _sideChannelData_db.swap();

  BOOST_LOG_TRIVIAL(debug) << "   block.used_bytes() = " << block.used_bytes()
                           << ", dataBlockBytes = " << _dadaBufferLayout.sizeOfData() << "\n";

  CUDA_ERROR_CHECK(cudaMemcpyAsync(static_cast<void *>(_raw_voltage_db.a_ptr()),
                                   static_cast<void *>(block.ptr()),
                                   _dadaBufferLayout.sizeOfData() , cudaMemcpyHostToDevice,
                                   _h2d_stream));
  // side-channel data follows the voltage data and a gap inside the block
  CUDA_ERROR_CHECK(cudaMemcpyAsync(
      static_cast<void *>(_sideChannelData_db.a_ptr()),
      static_cast<void *>(block.ptr() + _dadaBufferLayout.sizeOfData() + _dadaBufferLayout.sizeOfGap()),
      _dadaBufferLayout.sizeOfSideChannelData(), cudaMemcpyHostToDevice, _h2d_stream));
  BOOST_LOG_TRIVIAL(debug) << "First side channel item: 0x" <<   std::setw(16) << std::setfill('0') << std::hex <<  (reinterpret_cast<uint64_t*>(block.ptr() + _dadaBufferLayout.sizeOfData() + _dadaBufferLayout.sizeOfGap()))[0] << std::dec;

  if (_call_count == 1) {
    return false;
  }
  // process data

  // only if  a newblock is started the output buffer is swapped. Otherwise the
  // new data is added to it
  bool newBlock = false;
  if (((_call_count-1) * _nsamps_per_buffer) % _nsamps_per_output_spectra == 0) // _call_count -1 because this is the block number on the device
  {
    BOOST_LOG_TRIVIAL(debug) << "Starting new output block.";
    newBlock = true;
    _power_db.swap();
    _noOfBitSetsIn_G0.swap();
    _noOfBitSetsIn_G1.swap();
    // move to specific stream!
    thrust::fill(thrust::cuda::par.on(_proc_stream),_power_db.a().begin(), _power_db.a().end(), 0.);
    thrust::fill(thrust::cuda::par.on(_proc_stream), _noOfBitSetsIn_G0.a().begin(), _noOfBitSetsIn_G0.a().end(), 0L);
    thrust::fill(thrust::cuda::par.on(_proc_stream), _noOfBitSetsIn_G1.a().begin(), _noOfBitSetsIn_G1.a().end(), 0L);
  }

  process(_raw_voltage_db.b(), _sideChannelData_db.b(), _power_db.a(), _noOfBitSetsIn_G0.a(), _noOfBitSetsIn_G1.a());
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));

  if ((_call_count == 2) || (!newBlock)) {
    return false;
  }

  // copy data to host if block is finished
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream));
  _host_power_db.swap();

  for (size_t i = 0; i < _noOfBitSetsIn_G0.size(); i++)
  {
    size_t memOffset = 2 * i * (_nchans * sizeof(IntegratedPowerType) + sizeof(size_t));
    // copy 2x channel data
    CUDA_ERROR_CHECK(
        cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset) ,
                        static_cast<void *>(_power_db.b_ptr() + 2 * i * _nchans),
                        2 * _nchans * sizeof(IntegratedPowerType),
                        cudaMemcpyDeviceToHost, _d2h_stream));

    // copy noOf bit set data
    CUDA_ERROR_CHECK(
        cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType)),
          static_cast<void *>(_noOfBitSetsIn_G0.b_ptr() + i ),
            1 * sizeof(size_t),
            cudaMemcpyDeviceToHost, _d2h_stream));

    CUDA_ERROR_CHECK(
        cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType) + sizeof(size_t)),
          static_cast<void *>(_noOfBitSetsIn_G1.b_ptr() + i ),
            1 * sizeof(size_t),
            cudaMemcpyDeviceToHost, _d2h_stream));
  }

  BOOST_LOG_TRIVIAL(debug) << "Copy Data back to host";

  if (_call_count == 3) {
    return false;
  }

  // Report statistics of the results downloaded during an earlier call
  BOOST_LOG_TRIVIAL(info) << "Buffer block: " << _call_count-3 << " with " << _noOfBitSetsIn_G0.size() << "x2 output heaps:";
  size_t total_samples_lost = 0;
  for (size_t i = 0; i < _noOfBitSetsIn_G0.size(); i++)
  {
    size_t memOffset = 2 * i * (_nchans * sizeof(IntegratedPowerType) + sizeof(size_t));

    // The first counter is the G0 count (samples WITHOUT the bit set), the
    // second the G1 count (samples WITH the bit set) - see the gating kernel
    // and the copy order above.
    size_t* g0_count = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType));
    size_t* g1_count = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType) + sizeof(size_t));

    size_t samples_lost = _nsamps_per_output_spectra - (*g0_count) - (*g1_count);
    total_samples_lost += samples_lost;

    BOOST_LOG_TRIVIAL(info) << "    Heap " << i << ":\n" 
      <<"                            Samples with  bit set  : " << *g1_count << std::endl
      <<"                            Samples without bit set: " << *g0_count << std::endl
      <<"                            Samples lost           : " << samples_lost << " out of " << _nsamps_per_output_spectra << std::endl;
  }

  double efficiency = 1. - double(total_samples_lost) / (_nsamps_per_output_spectra * _noOfBitSetsIn_G0.size());
  // For the first reported block there is no previous average; use the
  // current value so that the trend is 0 instead of NaN (0 / 0).
  const double previous_blocks = static_cast<double>(_call_count - 3 - 1);
  double prev_average = (previous_blocks > 0.) ? _processing_efficiency / previous_blocks : efficiency;
  _processing_efficiency += efficiency;
  double average = _processing_efficiency / (_call_count-3);
  BOOST_LOG_TRIVIAL(info) << "Total processing efficiency of this buffer block:" << std::setprecision(6) << efficiency << ". Run average: " << average << " (Trend: " << std::showpos << (average - prev_average) << ")";

  // Wrap in a RawBytes object here;
  RawBytes bytes(reinterpret_cast<char *>(_host_power_db.b_ptr()),
                 _host_power_db.size(),
                 _host_power_db.size());
  BOOST_LOG_TRIVIAL(debug) << "Calling handler";
  // The handler can't do anything asynchronously without a copy here
  // as it would be unsafe (given that it does not own the memory it
  // is being passed).

  _handler(bytes);
  return false; //
} // operator ()

} // edd
} // effelsberg
} // psrdada_cpp