GatedSpectrometer.cu 20.1 KB
Newer Older
1
#include "psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh"
2
#include "psrdada_cpp/effelsberg/edd/Tools.cuh"
3
4
5
#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/cuda_utils.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
6

7
#include <cuda.h>
8
#include <cuda_profiler_api.h>
9
#include <thrust/system/cuda/execution_policy.h>
10
11

#include <iostream>
12
#include <iomanip>
13
14
#include <cstring>
#include <sstream>
15
#include <typeinfo>
16
17
18
19
20

namespace psrdada_cpp {
namespace effelsberg {
namespace edd {

21
// Reduce thread local vatiable v in shared array x, so that x[0] contains sum
22
template<typename T>
23
__device__ void sum_reduce(T *x, const T &v)
24
25
26
27
28
29
30
31
32
{
  x[threadIdx.x] = v;
  __syncthreads();
  for(int s = blockDim.x / 2; s > 0; s = s / 2)
  {
    if (threadIdx.x < s)
      x[threadIdx.x] += x[threadIdx.x + s];
    __syncthreads();
  }
33
}
34
35


36
// If one of the side channel items is lost, then both are considered as lost
37
// here
38
__global__ void mergeSideChannels(uint64_t* __restrict__ A, uint64_t*
39
        __restrict__ B, size_t N);
40
41


42
43
44
45
46
47
48
49
50
__global__ void gating(float* __restrict__ G0,
        float* __restrict__ G1,
        const uint64_t* __restrict__ sideChannelData,
        size_t N, size_t heapSize, size_t bitpos,
        size_t noOfSideChannels, size_t selectedSideChannel,
        const float*  __restrict__ _baseLineG0,
        const float*  __restrict__ _baseLineG1,
        float* __restrict__ baseLineNG0,
        float* __restrict__ baseLineNG1,
51
        uint64_cu* stats_G0, uint64_cu* stats_G1);
52
53
54
55
56
57
58
59
60
61
62


// Updates the baselines of the gates for the polarization set for the next
// block
// only few output blocks per input block thus execution on only one thread.
// Important is that the execution is async on the GPU.
__global__ void update_baselines(float*  __restrict__ baseLineG0,
        float*  __restrict__ baseLineG1,
        float* __restrict__ baseLineNG0,
        float* __restrict__ baseLineNG1,
        uint64_cu* stats_G0, uint64_cu* stats_G1,
63
        size_t N);
64

65

66
67
68
69
template <class HandlerType, class InputType, class OutputType>
GatedSpectrometer<HandlerType, InputType, OutputType>::GatedSpectrometer(
    const DadaBufferLayout &dadaBufferLayout, std::size_t selectedSideChannel,
    std::size_t selectedBit, std::size_t fft_length, std::size_t naccumulate,
Tobias Winchen's avatar
Tobias Winchen committed
70
    std::size_t nbits, HandlerType
71
72
    &handler) : _dadaBufferLayout(dadaBufferLayout),
    _selectedSideChannel(selectedSideChannel), _selectedBit(selectedBit),
73
    _fft_length(fft_length), _naccumulate(naccumulate),
74
75
    _handler(handler), _fft_plan(0), _call_count(0), _nsamps_per_heap(4096)
{
76
77

  // Sanity checks
Tobias Winchen's avatar
Tobias Winchen committed
78
  assert(((nbits == 12) || (nbits == 8) || (nbits == 10)));
79
80
81
82
83
  assert(_naccumulate > 0);

  // check for any device errors
  CUDA_ERROR_CHECK(cudaDeviceSynchronize());

84
  BOOST_LOG_TRIVIAL(info)
85
      << "Creating new GatedSpectrometer instance with parameters: \n"
86
87
      << "  fft_length           " << _fft_length << "\n"
      << "  naccumulate          " << _naccumulate << "\n"
88
89
      << "  nSideChannels        " << _dadaBufferLayout.getNSideChannels() << "\n"
      << "  speadHeapSize        " << _dadaBufferLayout.getHeapSize() << " byte\n"
90
91
92
      << "  selectedSideChannel  " << _selectedSideChannel << "\n"
      << "  selectedBit          " << _selectedBit << "\n"
      << "  output bit depth     " << sizeof(IntegratedPowerType) * 8;
93

94
95
  assert((_dadaBufferLayout.getNSideChannels() == 0) ||
         (selectedSideChannel < _dadaBufferLayout.getNSideChannels()));  // Sanity check of side channel value
96
97
  assert(selectedBit < 64); // Sanity check of selected bit

98

99
  _nchans = _fft_length / 2 + 1;
100

101
  inputDataStream = new InputType(fft_length, nbits, _dadaBufferLayout);
102

103
104
105
106
107
108
109
110
111
112
113
114
  //How many output spectra per input block?
  size_t nsamps_per_output_spectra = fft_length * naccumulate;

  size_t nsamps_per_pol = inputDataStream->getSamplesPerInputPolarization();
  if (nsamps_per_output_spectra <= nsamps_per_pol)
  { // one buffer block is used for one or multiple output spectra
    size_t N = nsamps_per_pol / nsamps_per_output_spectra;
    // All data in one block has to be used
    assert(N * nsamps_per_output_spectra == nsamps_per_pol);
    _nBlocks = 1;
  }
  else
Tobias Winchen's avatar
Tobias Winchen committed
115
  { // multiple blocks are integrated into one output
116
117
118
119
120
121
122
    size_t N =  nsamps_per_output_spectra /  nsamps_per_pol;
    // All data in multiple blocks has to be used
    assert(N * nsamps_per_pol == nsamps_per_output_spectra);
    _nBlocks = N;
  }
  BOOST_LOG_TRIVIAL(debug) << "Integrating  " << nsamps_per_output_spectra <<
      " samples from " << _nBlocks << "blocks into one output spectrum.";
123

Tobias Winchen's avatar
Tobias Winchen committed
124

125
126
127
128
129
130
131
132
133
134
135
136
137
  // plan the FFT
  size_t nsamps_per_buffer = _dadaBufferLayout.sizeOfData() * 8 / nbits;
  int batch = nsamps_per_pol / _fft_length;
  int n[] = {static_cast<int>(_fft_length)};
  BOOST_LOG_TRIVIAL(debug) << "Generating FFT plan: \n"
      << "   fft_length = " << _fft_length << "\n"
      << "   n[0] = " << n[0] << "\n"
      << "   _nchans = " << _nchans << "\n"
      << "   batch = " << batch << "\n";
  CUFFT_ERROR_CHECK(cufftPlanMany(&_fft_plan, 1, n, NULL, 1, _fft_length, NULL,
                                  1, _nchans, CUFFT_R2C, batch));


138
  // We unpack and fft one pol at a time
139
140
141
  _unpacked_voltage_G0.resize(nsamps_per_pol);
  _unpacked_voltage_G1.resize(nsamps_per_pol);
  BOOST_LOG_TRIVIAL(debug) << "  Unpacked voltages size (in samples): " << _unpacked_voltage_G0.size();
142

143
  outputDataStream = new OutputType(_nchans, batch / (_naccumulate / _nBlocks));
144
145
146
147
148
149
150
151
152
153

  CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream));
  CUDA_ERROR_CHECK(cudaStreamCreate(&_proc_stream));
  CUDA_ERROR_CHECK(cudaStreamCreate(&_d2h_stream));
  CUFFT_ERROR_CHECK(cufftSetStream(_fft_plan, _proc_stream));

  _unpacker.reset(new Unpacker(_proc_stream));
} // constructor


154
155
template <class HandlerType, class InputType, class OutputType>
GatedSpectrometer<HandlerType, InputType, OutputType>::~GatedSpectrometer() {
156
  BOOST_LOG_TRIVIAL(debug) << "Destroying GatedSpectrometer";
157
  if (_fft_plan)
158
    cufftDestroy(_fft_plan);
159
160
161
162

  delete inputDataStream;
  delete outputDataStream;

163
164
165
166
167
168
  cudaStreamDestroy(_h2d_stream);
  cudaStreamDestroy(_proc_stream);
  cudaStreamDestroy(_d2h_stream);
}


169
170
template <class HandlerType, class InputType, class OutputType>
void GatedSpectrometer<HandlerType, InputType, OutputType>::init(RawBytes &block) {
171
  BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer init called";
172
173
174
175
  std::stringstream headerInfo;
  headerInfo << "\n"
      << "# Gated spectrometer parameters: \n"
      << "fft_length               " << _fft_length << "\n"
176
      << "nchannels                " << _nchans << "\n"
177
178
179
      << "naccumulate              " << _naccumulate << "\n"
      << "selected_side_channel    " << _selectedSideChannel << "\n"
      << "selected_bit             " << _selectedBit << "\n"
180
181
182
183
184
185
186
187
188
189
      << "output_bit_depth         " << sizeof(IntegratedPowerType) * 8 << "\n"
      << "full_stokes_output       ";
  if (typeid(OutputType) == typeid(GatedFullStokesOutput))
  {
          headerInfo << "yes\n";
  }
  else
  {
          headerInfo << "no\n";
  }
190
191
192
193
194
195
196
197
198
199
200
201
202
203

  size_t bEnd = std::strlen(block.ptr());
  if (bEnd + headerInfo.str().size() < block.total_bytes())
  {
    std::strcpy(block.ptr() + bEnd, headerInfo.str().c_str());
  }
  else
  {
    BOOST_LOG_TRIVIAL(warning) << "Header of size " << block.total_bytes()
      << " bytes already contains " << bEnd
      << "bytes. Cannot add gated spectrometer info of size "
      << headerInfo.str().size() << " bytes.";
  }

204
205
206
207
  _handler.init(block);
}


208

209
210
template <class HandlerType, class InputType, class OutputType>
void GatedSpectrometer<HandlerType, InputType, OutputType>::gated_fft(
211
212
213
214
215
  PolarizationData &data,
  thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G0,
  thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G1
        )
{
216
  BOOST_LOG_TRIVIAL(debug) << "Unpacking raw voltages";
217
  switch (data._nbits) {
218
  case 8:
219
    _unpacker->unpack<8>(data._raw_voltage.b(), _unpacked_voltage_G0);
220
    break;
Tobias Winchen's avatar
Tobias Winchen committed
221
222
223
  case 10:
    _unpacker->unpack<10>(data._raw_voltage.b(), _unpacked_voltage_G0);
    break;
224
  case 12:
225
    _unpacker->unpack<12>(data._raw_voltage.b(), _unpacked_voltage_G0);
226
227
228
229
    break;
  default:
    throw std::runtime_error("Unsupported number of bits");
  }
Tobias Winchen's avatar
Tobias Winchen committed
230
231

  // Loop over outputblocks, for case of multiple output blocks per input block
232
233
234
  int step = data._sideChannelData.b().size() / _noOfBitSetsIn_G0.size();

  for (size_t i = 0; i < _noOfBitSetsIn_G0.size(); i++)
235
  { // ToDo: Should be in one kernel call
236
  gating<<<1024, 1024, 0, _proc_stream>>>(
237
238
239
240
241
242
243
      thrust::raw_pointer_cast(_unpacked_voltage_G0.data() + i * step * _nsamps_per_heap),
      thrust::raw_pointer_cast(_unpacked_voltage_G1.data() + i * step * _nsamps_per_heap),
      thrust::raw_pointer_cast(data._sideChannelData.b().data() + i * step),
      _unpacked_voltage_G0.size() / _noOfBitSetsIn_G0.size(),
      _dadaBufferLayout.getHeapSize(),
      _selectedBit,
      _dadaBufferLayout.getNSideChannels(),
244
      _selectedSideChannel,
245
246
247
248
249
250
      thrust::raw_pointer_cast(data._baseLineG0.data()),
      thrust::raw_pointer_cast(data._baseLineG1.data()),
      thrust::raw_pointer_cast(data._baseLineG0_update.data()),
      thrust::raw_pointer_cast(data._baseLineG1_update.data()),
      thrust::raw_pointer_cast(_noOfBitSetsIn_G0.data() + i),
      thrust::raw_pointer_cast(_noOfBitSetsIn_G1.data() + i)
251
      );
252
  }
253

254
255
256
257
258
259
260
261
262
263
264
    // only few output blocks per input block thus execution on only one thread.
    // Important is that the execution is async on the GPU.
    update_baselines<<<1,1,0, _proc_stream>>>(
        thrust::raw_pointer_cast(data._baseLineG0.data()),
        thrust::raw_pointer_cast(data._baseLineG1.data()),
        thrust::raw_pointer_cast(data._baseLineG0_update.data()),
        thrust::raw_pointer_cast(data._baseLineG1_update.data()),
        thrust::raw_pointer_cast(_noOfBitSetsIn_G0.data()),
        thrust::raw_pointer_cast(_noOfBitSetsIn_G1.data()),
        _noOfBitSetsIn_G0.size()
            );
265
266

  BOOST_LOG_TRIVIAL(debug) << "Performing FFT 1";
267
  BOOST_LOG_TRIVIAL(debug) << "Accessing unpacked voltage";
268
269
  UnpackedVoltageType *_unpacked_voltage_ptr =
      thrust::raw_pointer_cast(_unpacked_voltage_G0.data());
270
  BOOST_LOG_TRIVIAL(debug) << "Accessing channelized voltage";
271
  ChannelisedVoltageType *_channelised_voltage_ptr =
272
      thrust::raw_pointer_cast(data._channelised_voltage_G0.data());
273

274
275
276
277
278
  CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr,
                                 (cufftComplex *)_channelised_voltage_ptr));

  BOOST_LOG_TRIVIAL(debug) << "Performing FFT 2";
  _unpacked_voltage_ptr = thrust::raw_pointer_cast(_unpacked_voltage_G1.data());
279
  _channelised_voltage_ptr = thrust::raw_pointer_cast(data._channelised_voltage_G1.data());
280
281
282
  CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr,
                                 (cufftComplex *)_channelised_voltage_ptr));

Tobias Winchen's avatar
Tobias Winchen committed
283
284
//  CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
//  BOOST_LOG_TRIVIAL(debug) << "Exit processing";
285
286
287
} // process


288
289
290
291




292
293
template <class HandlerType, class InputType, class OutputType>
bool GatedSpectrometer<HandlerType, InputType, OutputType>::operator()(RawBytes &block) {
Tobias Winchen's avatar
Tobias Winchen committed
294
295
296
297
298
299
300
301
302
303
304
305
306
307
  ++_call_count;
  BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer operator() called (count = "
                           << _call_count << ")";
  if (block.used_bytes() != _dadaBufferLayout.getBufferSize()) { /* Unexpected buffer size */
    BOOST_LOG_TRIVIAL(error) << "Unexpected Buffer Size - Got "
                             << block.used_bytes() << " byte, expected "
                             << _dadaBufferLayout.getBufferSize() << " byte)";
    CUDA_ERROR_CHECK(cudaDeviceSynchronize());
    cudaProfilerStop();
    return true;
  }

  // Copy data to device
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_h2d_stream));
308
  inputDataStream->swap();
309
  inputDataStream->getFromBlock(block, _h2d_stream);
310

311

312
313
314
  if (_call_count == 1) {
    return false;
  }
315
  // process data
316

317
  // check if new outblock is started:  _call_count -1 because this is the block number on the device
318
  bool newBlock = (((_call_count-1)  % (_nBlocks)) == 0);
319

320
321
  // only if  a newblock is started the output buffer is swapped. Otherwise the
  // new data is added to it
322
  if (newBlock)
323
  {
Tobias Winchen's avatar
Tobias Winchen committed
324
    BOOST_LOG_TRIVIAL(debug) << "Starting new output block.";
325
    CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream));
326
    outputDataStream->swap(_proc_stream);
327
  }
328

329
330
  BOOST_LOG_TRIVIAL(debug) << "Processing block.";
  process(inputDataStream, outputDataStream);
331
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
332
  BOOST_LOG_TRIVIAL(debug) << "Processing block finished.";
333
334
335
336
  /// For one pol input and power out
  /// ToDo: For two pol input and power out
  /// ToDo: For two pol input and stokes out

Tobias Winchen's avatar
Tobias Winchen committed
337

338
  if ((_call_count == 2) || (!newBlock)) {
339
340
341
    return false;
  }

342
  outputDataStream->data2Host(_d2h_stream);
343
344
345
346
  if (_call_count == 3) {
    return false;
  }

347
  // Wrap in a RawBytes object here;
348
349
350
  RawBytes bytes(reinterpret_cast<char *>(outputDataStream->_host_power.b_ptr()),
                 outputDataStream->_host_power.size(),
                 outputDataStream->_host_power.size());
351
352
353
354
  BOOST_LOG_TRIVIAL(debug) << "Calling handler";
  // The handler can't do anything asynchronously without a copy here
  // as it would be unsafe (given that it does not own the memory it
  // is being passed).
Tobias Winchen's avatar
Tobias Winchen committed
355
356
357

  _handler(bytes);
  return false; //
358
359
} // operator ()

360
361
362


template <class HandlerType, class InputType, class OutputType>
363
void GatedSpectrometer<HandlerType, InputType, OutputType>::process(SinglePolarizationInput *inputDataStream, GatedPowerSpectrumOutput *outputDataStream)
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
{
  gated_fft(*inputDataStream, outputDataStream->G0._noOfBitSets.a(), outputDataStream->G1._noOfBitSets.a());

  kernels::detect_and_accumulate<IntegratedPowerType> <<<1024, 1024, 0, _proc_stream>>>(
            thrust::raw_pointer_cast(inputDataStream->_channelised_voltage_G0.data()),
            thrust::raw_pointer_cast(outputDataStream->G0.data.a().data()),
            _nchans,
            inputDataStream->_channelised_voltage_G0.size() / _nchans,
            _naccumulate / _nBlocks,
            1, 0., 1, 0);

  kernels::detect_and_accumulate<IntegratedPowerType> <<<1024, 1024, 0, _proc_stream>>>(
            thrust::raw_pointer_cast(inputDataStream->_channelised_voltage_G1.data()),
            thrust::raw_pointer_cast(outputDataStream->G1.data.a().data()),
            _nchans,
            inputDataStream->_channelised_voltage_G1.size() / _nchans,
            _naccumulate / _nBlocks,
            1, 0., 1, 0);

383
    // count saturated samples
384
    for(size_t output_block_number = 0; output_block_number <_nBlocks; output_block_number++)
385
386
387
388
389
    {
        outputDataStream->G0._noOfOverflowed.a()[output_block_number] = 0;
        outputDataStream->G1._noOfOverflowed.a()[output_block_number] = 0;

        const int heaps_per_output_spectra = inputDataStream->_sideChannelData_h.size() / _naccumulate / _nBlocks;
390
        for (size_t j = output_block_number * heaps_per_output_spectra ; j < (output_block_number+1) * heaps_per_output_spectra * _dadaBufferLayout.getNSideChannels(); j+=_dadaBufferLayout.getNSideChannels())
391
        {
392
            if (TEST_BIT(inputDataStream->_sideChannelData_h.a().data()[j], 3))
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
            { // heap was lost
                continue;
            }

            uint16_t n_saturated = (inputDataStream->_sideChannelData_h.a().data()[j] >> 32);
            if (TEST_BIT(inputDataStream->_sideChannelData_h.a().data()[j], _selectedBit))
            {
                outputDataStream->G1._noOfOverflowed.a()[output_block_number] += n_saturated;
            }
            else
            {
                outputDataStream->G0._noOfOverflowed.a()[output_block_number] += n_saturated;
            }
        }
        BOOST_LOG_TRIVIAL(debug) << "Number of saturated samples G0: " << outputDataStream->G0._noOfOverflowed.a()[output_block_number]
                    << ", G1: " << outputDataStream->G1._noOfOverflowed.a()[output_block_number];
    }

411
412
413
}


414
415
416
417
418
419
420
421
422
template <class HandlerType, class InputType, class OutputType>
void GatedSpectrometer<HandlerType, InputType, OutputType>::process(DualPolarizationInput *inputDataStream, GatedFullStokesOutput *outputDataStream)
{
  mergeSideChannels<<<1024, 1024, 0, _proc_stream>>>(thrust::raw_pointer_cast(inputDataStream->polarization0._sideChannelData.a().data()),
          thrust::raw_pointer_cast(inputDataStream->polarization1._sideChannelData.a().data()), inputDataStream->polarization1._sideChannelData.a().size());

  gated_fft(inputDataStream->polarization0, outputDataStream->G0._noOfBitSets.a(), outputDataStream->G1._noOfBitSets.a());
  gated_fft(inputDataStream->polarization1, outputDataStream->G0._noOfBitSets.a(), outputDataStream->G1._noOfBitSets.a());

423
  for(size_t output_block_number = 0; output_block_number < outputDataStream->G0._noOfBitSets.size(); output_block_number++)
Tobias Winchen's avatar
Tobias Winchen committed
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
  {
      size_t input_offset = output_block_number * inputDataStream->polarization0._channelised_voltage_G0.size() / outputDataStream->G0._noOfBitSets.size();
      size_t output_offset = output_block_number * outputDataStream->G0.I.a().size() / outputDataStream->G0._noOfBitSets.size();
      BOOST_LOG_TRIVIAL(debug) << "Accumulating data for output block " << output_block_number << " with input offset " << input_offset << " and output_offset " << output_offset;
      stokes_accumulate<<<1024, 1024, 0, _proc_stream>>>(
              thrust::raw_pointer_cast(inputDataStream->polarization0._channelised_voltage_G0.data() + input_offset),
              thrust::raw_pointer_cast(inputDataStream->polarization1._channelised_voltage_G0.data() + input_offset),
              thrust::raw_pointer_cast(outputDataStream->G0.I.a().data() + output_offset),
              thrust::raw_pointer_cast(outputDataStream->G0.Q.a().data() + output_offset),
              thrust::raw_pointer_cast(outputDataStream->G0.U.a().data() + output_offset),
              thrust::raw_pointer_cast(outputDataStream->G0.V.a().data() + output_offset),
              _nchans, _naccumulate / _nBlocks
              );

      stokes_accumulate<<<1024, 1024, 0, _proc_stream>>>(
              thrust::raw_pointer_cast(inputDataStream->polarization0._channelised_voltage_G1.data() + input_offset),
              thrust::raw_pointer_cast(inputDataStream->polarization1._channelised_voltage_G1.data() + input_offset),
              thrust::raw_pointer_cast(outputDataStream->G1.I.a().data() + output_offset),
              thrust::raw_pointer_cast(outputDataStream->G1.Q.a().data() + output_offset),
              thrust::raw_pointer_cast(outputDataStream->G1.U.a().data() + output_offset),
              thrust::raw_pointer_cast(outputDataStream->G1.V.a().data() + output_offset),
              _nchans, _naccumulate / _nBlocks
              );
447
448
449
450
451
452
453
454

      // count saturated samples
        outputDataStream->G0._noOfOverflowed.a()[output_block_number] = 0;
        outputDataStream->G1._noOfOverflowed.a()[output_block_number] = 0;

        size_t lostHeaps = 0;

        const int heaps_per_output_spectra = inputDataStream->polarization0._sideChannelData_h.size() / _naccumulate / _nBlocks;
455
        for (size_t j = output_block_number * heaps_per_output_spectra ; j < (output_block_number+1) * heaps_per_output_spectra * _dadaBufferLayout.getNSideChannels(); j+=_dadaBufferLayout.getNSideChannels())
456
        {
457
            if (TEST_BIT(inputDataStream->polarization0._sideChannelData_h.a().data()[j], 3) || TEST_BIT(inputDataStream->polarization1._sideChannelData_h.a().data()[j], 3))
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
            {
                lostHeaps++;
               // thiss heap was lost, do not count
               continue;
            }

            uint16_t n_saturated = (inputDataStream->polarization0._sideChannelData_h.a().data()[j] >> 32) + (inputDataStream->polarization1._sideChannelData_h.a().data()[j] >> 32);


            if (TEST_BIT(inputDataStream->polarization0._sideChannelData_h.a().data()[j], _selectedBit))
            {
                outputDataStream->G1._noOfOverflowed.a()[output_block_number] += n_saturated;
            }
            else
            {
                outputDataStream->G0._noOfOverflowed.a()[output_block_number] += n_saturated;
            }

        }

        BOOST_LOG_TRIVIAL(debug) << "Number of saturated samples G0: " << outputDataStream->G0._noOfOverflowed.a()[output_block_number]
                    << ", G1: " << outputDataStream->G1._noOfOverflowed.a()[output_block_number];

        BOOST_LOG_TRIVIAL(debug) << "Lost heaps: " << lostHeaps;


Tobias Winchen's avatar
Tobias Winchen committed
484
  }
485

Tobias Winchen's avatar
Tobias Winchen committed
486
  //  cudaDeviceSynchronize();
487
}
488

489
490
491
492
} // edd
} // effelsberg
} // psrdada_cpp