GatedSpectrometer.cu 20.5 KB
Newer Older
1
#include "psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh"
2
#include "psrdada_cpp/effelsberg/edd/Tools.cuh"
3
4
5
#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/cuda_utils.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
6

7
#include <cuda.h>
8
#include <cuda_profiler_api.h>
9
#include <thrust/system/cuda/execution_policy.h>
10
11

#include <iostream>
12
#include <iomanip>
13
14
#include <cstring>
#include <sstream>
15
#include <typeinfo>
16
17
18
19
20

namespace psrdada_cpp {
namespace effelsberg {
namespace edd {

21
// Reduce thread local vatiable v in shared array x, so that x[0] contains sum
22
template<typename T>
23
__device__ void sum_reduce(T *x, const T &v)
24
25
26
27
28
29
30
31
32
{
  x[threadIdx.x] = v;
  __syncthreads();
  for(int s = blockDim.x / 2; s > 0; s = s / 2)
  {
    if (threadIdx.x < s)
      x[threadIdx.x] += x[threadIdx.x + s];
    __syncthreads();
  }
33
}
34
35


36
// If one of the side channel items is lost, then both are considered as lost
37
// here
38
__global__ void mergeSideChannels(uint64_t* __restrict__ A, uint64_t*
39
        __restrict__ B, size_t N);
40
41


42
43
44
45
46
47
48
49
50
__global__ void gating(float* __restrict__ G0,
        float* __restrict__ G1,
        const uint64_t* __restrict__ sideChannelData,
        size_t N, size_t heapSize, size_t bitpos,
        size_t noOfSideChannels, size_t selectedSideChannel,
        const float*  __restrict__ _baseLineG0,
        const float*  __restrict__ _baseLineG1,
        float* __restrict__ baseLineNG0,
        float* __restrict__ baseLineNG1,
51
        uint64_cu* stats_G0, uint64_cu* stats_G1);
52
53
54
55
56
57
58
59
60
61
62


// Updates the baselines of the gates for the polarization set for the next
// block
// only few output blocks per input block thus execution on only one thread.
// Important is that the execution is async on the GPU.
__global__ void update_baselines(float*  __restrict__ baseLineG0,
        float*  __restrict__ baseLineG1,
        float* __restrict__ baseLineNG0,
        float* __restrict__ baseLineNG1,
        uint64_cu* stats_G0, uint64_cu* stats_G1,
63
        size_t N);
64

65

66
67
68
69
template <class HandlerType, class InputType, class OutputType>
GatedSpectrometer<HandlerType, InputType, OutputType>::GatedSpectrometer(
    const DadaBufferLayout &dadaBufferLayout, std::size_t selectedSideChannel,
    std::size_t selectedBit, std::size_t fft_length, std::size_t naccumulate,
Tobias Winchen's avatar
Tobias Winchen committed
70
    std::size_t nbits, HandlerType
71
72
    &handler) : _dadaBufferLayout(dadaBufferLayout),
    _selectedSideChannel(selectedSideChannel), _selectedBit(selectedBit),
73
    _fft_length(fft_length), _naccumulate(naccumulate),
74
75
    _handler(handler), _fft_plan(0), _call_count(0), _nsamps_per_heap(4096)
{
76
77

  // Sanity checks
Tobias Winchen's avatar
Tobias Winchen committed
78
  assert(((nbits == 12) || (nbits == 8) || (nbits == 10)));
79
80
81
82
83
  assert(_naccumulate > 0);

  // check for any device errors
  CUDA_ERROR_CHECK(cudaDeviceSynchronize());

84
  BOOST_LOG_TRIVIAL(info)
85
      << "Creating new GatedSpectrometer instance with parameters:\n"
86
87
      << "  fft_length           " << _fft_length << "\n"
      << "  naccumulate          " << _naccumulate << "\n"
88
89
      << "  nSideChannels        " << _dadaBufferLayout.getNSideChannels() << "\n"
      << "  speadHeapSize        " << _dadaBufferLayout.getHeapSize() << " byte\n"
90
91
92
      << "  selectedSideChannel  " << _selectedSideChannel << "\n"
      << "  selectedBit          " << _selectedBit << "\n"
      << "  output bit depth     " << sizeof(IntegratedPowerType) * 8;
93

94
95
  assert((_dadaBufferLayout.getNSideChannels() == 0) ||
         (selectedSideChannel < _dadaBufferLayout.getNSideChannels()));  // Sanity check of side channel value
96
97
  assert(selectedBit < 64); // Sanity check of selected bit

98

99
  _nchans = _fft_length / 2 + 1;
100

101
  inputDataStream = new InputType(fft_length, nbits, _dadaBufferLayout);
102

103
104
105
106
  //How many output spectra per input block?
  size_t nsamps_per_output_spectra = fft_length * naccumulate;

  size_t nsamps_per_pol = inputDataStream->getSamplesPerInputPolarization();
107
  BOOST_LOG_TRIVIAL(debug) << "Samples per input polarization: " << nsamps_per_pol;
108
  if (nsamps_per_output_spectra <= nsamps_per_pol)
109
110
  { //
    BOOST_LOG_TRIVIAL(debug) << "One buffer block is used for one or multiple output spectra";
111
112
113
114
115
116
    size_t N = nsamps_per_pol / nsamps_per_output_spectra;
    // All data in one block has to be used
    assert(N * nsamps_per_output_spectra == nsamps_per_pol);
    _nBlocks = 1;
  }
  else
117
118
  {
    BOOST_LOG_TRIVIAL(debug) << "Multiple blocks are integrated into one output spectrum";
119
120
121
122
123
124
    size_t N =  nsamps_per_output_spectra /  nsamps_per_pol;
    // All data in multiple blocks has to be used
    assert(N * nsamps_per_pol == nsamps_per_output_spectra);
    _nBlocks = N;
  }
  BOOST_LOG_TRIVIAL(debug) << "Integrating  " << nsamps_per_output_spectra <<
125
      " samples from " << _nBlocks << " blocks into one output spectrum.";
126

Tobias Winchen's avatar
Tobias Winchen committed
127

128
129
130
131
  // plan the FFT
  size_t nsamps_per_buffer = _dadaBufferLayout.sizeOfData() * 8 / nbits;
  int batch = nsamps_per_pol / _fft_length;
  int n[] = {static_cast<int>(_fft_length)};
132
  BOOST_LOG_TRIVIAL(debug) << "Generating FFT plan:\n"
133
134
135
136
137
138
139
140
      << "   fft_length = " << _fft_length << "\n"
      << "   n[0] = " << n[0] << "\n"
      << "   _nchans = " << _nchans << "\n"
      << "   batch = " << batch << "\n";
  CUFFT_ERROR_CHECK(cufftPlanMany(&_fft_plan, 1, n, NULL, 1, _fft_length, NULL,
                                  1, _nchans, CUFFT_R2C, batch));


141
  // We unpack and fft one pol at a time
142
143
144
  _unpacked_voltage_G0.resize(nsamps_per_pol);
  _unpacked_voltage_G1.resize(nsamps_per_pol);
  BOOST_LOG_TRIVIAL(debug) << "  Unpacked voltages size (in samples): " << _unpacked_voltage_G0.size();
145

146
  outputDataStream = new OutputType(_nchans, batch / (_naccumulate / _nBlocks));
147
148
149
150
151
152
153
154
155
156

  CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream));
  CUDA_ERROR_CHECK(cudaStreamCreate(&_proc_stream));
  CUDA_ERROR_CHECK(cudaStreamCreate(&_d2h_stream));
  CUFFT_ERROR_CHECK(cufftSetStream(_fft_plan, _proc_stream));

  _unpacker.reset(new Unpacker(_proc_stream));
} // constructor


157
158
template <class HandlerType, class InputType, class OutputType>
GatedSpectrometer<HandlerType, InputType, OutputType>::~GatedSpectrometer() {
159
  BOOST_LOG_TRIVIAL(debug) << "Destroying GatedSpectrometer";
160
  if (_fft_plan)
161
    cufftDestroy(_fft_plan);
162
163
164
165

  delete inputDataStream;
  delete outputDataStream;

166
167
168
169
170
171
  cudaStreamDestroy(_h2d_stream);
  cudaStreamDestroy(_proc_stream);
  cudaStreamDestroy(_d2h_stream);
}


172
173
template <class HandlerType, class InputType, class OutputType>
void GatedSpectrometer<HandlerType, InputType, OutputType>::init(RawBytes &block) {
174
  BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer init called";
175
176
177
178
  std::stringstream headerInfo;
  headerInfo << "\n"
      << "# Gated spectrometer parameters: \n"
      << "fft_length               " << _fft_length << "\n"
179
      << "nchannels                " << _nchans << "\n"
180
181
182
      << "naccumulate              " << _naccumulate << "\n"
      << "selected_side_channel    " << _selectedSideChannel << "\n"
      << "selected_bit             " << _selectedBit << "\n"
183
184
185
186
187
188
189
190
191
192
      << "output_bit_depth         " << sizeof(IntegratedPowerType) * 8 << "\n"
      << "full_stokes_output       ";
  if (typeid(OutputType) == typeid(GatedFullStokesOutput))
  {
          headerInfo << "yes\n";
  }
  else
  {
          headerInfo << "no\n";
  }
193
194
195
196
197
198
199
200
201
202
203
204
205
206

  size_t bEnd = std::strlen(block.ptr());
  if (bEnd + headerInfo.str().size() < block.total_bytes())
  {
    std::strcpy(block.ptr() + bEnd, headerInfo.str().c_str());
  }
  else
  {
    BOOST_LOG_TRIVIAL(warning) << "Header of size " << block.total_bytes()
      << " bytes already contains " << bEnd
      << "bytes. Cannot add gated spectrometer info of size "
      << headerInfo.str().size() << " bytes.";
  }

207
208
209
210
  _handler.init(block);
}


211

212
213
template <class HandlerType, class InputType, class OutputType>
void GatedSpectrometer<HandlerType, InputType, OutputType>::gated_fft(
214
215
216
217
218
  PolarizationData &data,
  thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G0,
  thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G1
        )
{
219
  BOOST_LOG_TRIVIAL(debug) << "Unpacking raw voltages";
220
  switch (data._nbits) {
221
  case 8:
222
    _unpacker->unpack<8>(data._raw_voltage.b(), _unpacked_voltage_G0);
223
    break;
Tobias Winchen's avatar
Tobias Winchen committed
224
225
226
  case 10:
    _unpacker->unpack<10>(data._raw_voltage.b(), _unpacked_voltage_G0);
    break;
227
  case 12:
228
    _unpacker->unpack<12>(data._raw_voltage.b(), _unpacked_voltage_G0);
229
230
231
232
    break;
  default:
    throw std::runtime_error("Unsupported number of bits");
  }
Tobias Winchen's avatar
Tobias Winchen committed
233
234

  // Loop over outputblocks, for case of multiple output blocks per input block
235
236
237
  int step = data._sideChannelData.b().size() / _noOfBitSetsIn_G0.size();

  for (size_t i = 0; i < _noOfBitSetsIn_G0.size(); i++)
238
  { // ToDo: Should be in one kernel call
239
  gating<<<1024, 1024, 0, _proc_stream>>>(
240
241
242
243
244
245
246
      thrust::raw_pointer_cast(_unpacked_voltage_G0.data() + i * step * _nsamps_per_heap),
      thrust::raw_pointer_cast(_unpacked_voltage_G1.data() + i * step * _nsamps_per_heap),
      thrust::raw_pointer_cast(data._sideChannelData.b().data() + i * step),
      _unpacked_voltage_G0.size() / _noOfBitSetsIn_G0.size(),
      _dadaBufferLayout.getHeapSize(),
      _selectedBit,
      _dadaBufferLayout.getNSideChannels(),
247
      _selectedSideChannel,
248
249
250
251
252
253
      thrust::raw_pointer_cast(data._baseLineG0.data()),
      thrust::raw_pointer_cast(data._baseLineG1.data()),
      thrust::raw_pointer_cast(data._baseLineG0_update.data()),
      thrust::raw_pointer_cast(data._baseLineG1_update.data()),
      thrust::raw_pointer_cast(_noOfBitSetsIn_G0.data() + i),
      thrust::raw_pointer_cast(_noOfBitSetsIn_G1.data() + i)
254
      );
255
  }
256

257
258
259
260
261
262
263
264
265
266
267
    // only few output blocks per input block thus execution on only one thread.
    // Important is that the execution is async on the GPU.
    update_baselines<<<1,1,0, _proc_stream>>>(
        thrust::raw_pointer_cast(data._baseLineG0.data()),
        thrust::raw_pointer_cast(data._baseLineG1.data()),
        thrust::raw_pointer_cast(data._baseLineG0_update.data()),
        thrust::raw_pointer_cast(data._baseLineG1_update.data()),
        thrust::raw_pointer_cast(_noOfBitSetsIn_G0.data()),
        thrust::raw_pointer_cast(_noOfBitSetsIn_G1.data()),
        _noOfBitSetsIn_G0.size()
            );
268
269

  BOOST_LOG_TRIVIAL(debug) << "Performing FFT 1";
270
  BOOST_LOG_TRIVIAL(debug) << "Accessing unpacked voltage";
271
272
  UnpackedVoltageType *_unpacked_voltage_ptr =
      thrust::raw_pointer_cast(_unpacked_voltage_G0.data());
273
  BOOST_LOG_TRIVIAL(debug) << "Accessing channelized voltage";
274
  ChannelisedVoltageType *_channelised_voltage_ptr =
275
      thrust::raw_pointer_cast(data._channelised_voltage_G0.data());
276

277
278
279
280
281
  CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr,
                                 (cufftComplex *)_channelised_voltage_ptr));

  BOOST_LOG_TRIVIAL(debug) << "Performing FFT 2";
  _unpacked_voltage_ptr = thrust::raw_pointer_cast(_unpacked_voltage_G1.data());
282
  _channelised_voltage_ptr = thrust::raw_pointer_cast(data._channelised_voltage_G1.data());
283
284
285
  CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr,
                                 (cufftComplex *)_channelised_voltage_ptr));

Tobias Winchen's avatar
Tobias Winchen committed
286
287
//  CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
//  BOOST_LOG_TRIVIAL(debug) << "Exit processing";
288
289
290
} // process


291
292
293
294




295
296
template <class HandlerType, class InputType, class OutputType>
bool GatedSpectrometer<HandlerType, InputType, OutputType>::operator()(RawBytes &block) {
Tobias Winchen's avatar
Tobias Winchen committed
297
298
299
300
301
302
303
304
305
306
307
308
309
310
  ++_call_count;
  BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer operator() called (count = "
                           << _call_count << ")";
  if (block.used_bytes() != _dadaBufferLayout.getBufferSize()) { /* Unexpected buffer size */
    BOOST_LOG_TRIVIAL(error) << "Unexpected Buffer Size - Got "
                             << block.used_bytes() << " byte, expected "
                             << _dadaBufferLayout.getBufferSize() << " byte)";
    CUDA_ERROR_CHECK(cudaDeviceSynchronize());
    cudaProfilerStop();
    return true;
  }

  // Copy data to device
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_h2d_stream));
311
  inputDataStream->swap();
312
  inputDataStream->getFromBlock(block, _h2d_stream);
313

314

315
316
317
  if (_call_count == 1) {
    return false;
  }
318
  // process data
319

320
  // check if new outblock is started:  _call_count -1 because this is the block number on the device
321
  bool newBlock = (((_call_count-1)  % (_nBlocks)) == 0);
322

323
324
  // only if  a newblock is started the output buffer is swapped. Otherwise the
  // new data is added to it
325
  if (newBlock)
326
  {
Tobias Winchen's avatar
Tobias Winchen committed
327
    BOOST_LOG_TRIVIAL(debug) << "Starting new output block.";
328
    CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream));
329
    outputDataStream->swap(_proc_stream);
330
  }
331

332
333
  BOOST_LOG_TRIVIAL(debug) << "Processing block.";
  process(inputDataStream, outputDataStream);
334
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
335
  BOOST_LOG_TRIVIAL(debug) << "Processing block finished.";
336
337
338
339
  /// For one pol input and power out
  /// ToDo: For two pol input and power out
  /// ToDo: For two pol input and stokes out

Tobias Winchen's avatar
Tobias Winchen committed
340

341
  if ((_call_count == 2) || (!newBlock)) {
342
343
344
    return false;
  }

345
  outputDataStream->data2Host(_d2h_stream);
346
347
348
349
  if (_call_count == 3) {
    return false;
  }

350
  // Wrap in a RawBytes object here;
351
352
353
  RawBytes bytes(reinterpret_cast<char *>(outputDataStream->_host_power.b_ptr()),
                 outputDataStream->_host_power.size(),
                 outputDataStream->_host_power.size());
354
355
356
357
  BOOST_LOG_TRIVIAL(debug) << "Calling handler";
  // The handler can't do anything asynchronously without a copy here
  // as it would be unsafe (given that it does not own the memory it
  // is being passed).
Tobias Winchen's avatar
Tobias Winchen committed
358
359
360

  _handler(bytes);
  return false; //
361
362
} // operator ()

363
364
365


template <class HandlerType, class InputType, class OutputType>
366
void GatedSpectrometer<HandlerType, InputType, OutputType>::process(SinglePolarizationInput *inputDataStream, GatedPowerSpectrumOutput *outputDataStream)
367
368
369
370
371
372
373
{
  gated_fft(*inputDataStream, outputDataStream->G0._noOfBitSets.a(), outputDataStream->G1._noOfBitSets.a());

  kernels::detect_and_accumulate<IntegratedPowerType> <<<1024, 1024, 0, _proc_stream>>>(
            thrust::raw_pointer_cast(inputDataStream->_channelised_voltage_G0.data()),
            thrust::raw_pointer_cast(outputDataStream->G0.data.a().data()),
            _nchans,
Tobias Winchen's avatar
Tobias Winchen committed
374
375
            inputDataStream->_channelised_voltage_G0.size(),
            _naccumulate,
376
377
378
379
380
381
            1, 0., 1, 0);

  kernels::detect_and_accumulate<IntegratedPowerType> <<<1024, 1024, 0, _proc_stream>>>(
            thrust::raw_pointer_cast(inputDataStream->_channelised_voltage_G1.data()),
            thrust::raw_pointer_cast(outputDataStream->G1.data.a().data()),
            _nchans,
Tobias Winchen's avatar
Tobias Winchen committed
382
383
            inputDataStream->_channelised_voltage_G1.size(),
            _naccumulate,
384
385
            1, 0., 1, 0);

386
    // count saturated samples
387
    for(size_t output_block_number = 0; output_block_number < outputDataStream->G0._noOfOverflowed.size(); output_block_number++)
388
    {
389
390
        outputDataStream->G0._noOfOverflowed.a().data()[output_block_number] = 0;
        outputDataStream->G1._noOfOverflowed.a().data()[output_block_number] = 0;
391

392
        size_t lostHeaps = 0;
393
        const int heaps_per_output_spectra = inputDataStream->_sideChannelData_h.size() / outputDataStream->G0._noOfOverflowed.size();
394
        for (size_t j = output_block_number * heaps_per_output_spectra ; j < (output_block_number+1) * heaps_per_output_spectra * _dadaBufferLayout.getNSideChannels(); j+=_dadaBufferLayout.getNSideChannels())
395
        {
396
            if (TEST_BIT(inputDataStream->_sideChannelData_h.a().data()[j], 3))
397
            { // heap was lost
398
                lostHeaps++;
399
400
401
402
403
404
                continue;
            }

            uint16_t n_saturated = (inputDataStream->_sideChannelData_h.a().data()[j] >> 32);
            if (TEST_BIT(inputDataStream->_sideChannelData_h.a().data()[j], _selectedBit))
            {
405
                outputDataStream->G1._noOfOverflowed.a().data()[output_block_number] += n_saturated;
406
407
408
            }
            else
            {
409
                outputDataStream->G0._noOfOverflowed.a().data()[output_block_number] += n_saturated;
410
411
            }
        }
412
413
414
415
        BOOST_LOG_TRIVIAL(debug) << "Number of saturated samples G0: " << outputDataStream->G0._noOfOverflowed.a().data()[output_block_number]
                    << ", G1: " << outputDataStream->G1._noOfOverflowed.a().data()[output_block_number];

        BOOST_LOG_TRIVIAL(debug) << "Lost heaps: " << lostHeaps;
416
417
    }

418
419
420
}


421
422
423
424
425
426
427
428
429
template <class HandlerType, class InputType, class OutputType>
void GatedSpectrometer<HandlerType, InputType, OutputType>::process(DualPolarizationInput *inputDataStream, GatedFullStokesOutput *outputDataStream)
{
  mergeSideChannels<<<1024, 1024, 0, _proc_stream>>>(thrust::raw_pointer_cast(inputDataStream->polarization0._sideChannelData.a().data()),
          thrust::raw_pointer_cast(inputDataStream->polarization1._sideChannelData.a().data()), inputDataStream->polarization1._sideChannelData.a().size());

  gated_fft(inputDataStream->polarization0, outputDataStream->G0._noOfBitSets.a(), outputDataStream->G1._noOfBitSets.a());
  gated_fft(inputDataStream->polarization1, outputDataStream->G0._noOfBitSets.a(), outputDataStream->G1._noOfBitSets.a());

430
  for(size_t output_block_number = 0; output_block_number < outputDataStream->G0._noOfBitSets.size(); output_block_number++)
Tobias Winchen's avatar
Tobias Winchen committed
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
  {
      size_t input_offset = output_block_number * inputDataStream->polarization0._channelised_voltage_G0.size() / outputDataStream->G0._noOfBitSets.size();
      size_t output_offset = output_block_number * outputDataStream->G0.I.a().size() / outputDataStream->G0._noOfBitSets.size();
      BOOST_LOG_TRIVIAL(debug) << "Accumulating data for output block " << output_block_number << " with input offset " << input_offset << " and output_offset " << output_offset;
      stokes_accumulate<<<1024, 1024, 0, _proc_stream>>>(
              thrust::raw_pointer_cast(inputDataStream->polarization0._channelised_voltage_G0.data() + input_offset),
              thrust::raw_pointer_cast(inputDataStream->polarization1._channelised_voltage_G0.data() + input_offset),
              thrust::raw_pointer_cast(outputDataStream->G0.I.a().data() + output_offset),
              thrust::raw_pointer_cast(outputDataStream->G0.Q.a().data() + output_offset),
              thrust::raw_pointer_cast(outputDataStream->G0.U.a().data() + output_offset),
              thrust::raw_pointer_cast(outputDataStream->G0.V.a().data() + output_offset),
              _nchans, _naccumulate / _nBlocks
              );

      stokes_accumulate<<<1024, 1024, 0, _proc_stream>>>(
              thrust::raw_pointer_cast(inputDataStream->polarization0._channelised_voltage_G1.data() + input_offset),
              thrust::raw_pointer_cast(inputDataStream->polarization1._channelised_voltage_G1.data() + input_offset),
              thrust::raw_pointer_cast(outputDataStream->G1.I.a().data() + output_offset),
              thrust::raw_pointer_cast(outputDataStream->G1.Q.a().data() + output_offset),
              thrust::raw_pointer_cast(outputDataStream->G1.U.a().data() + output_offset),
              thrust::raw_pointer_cast(outputDataStream->G1.V.a().data() + output_offset),
              _nchans, _naccumulate / _nBlocks
              );
454

455
456
457
        // count saturated samples
        outputDataStream->G0._noOfOverflowed.a().data()[output_block_number] = 0;
        outputDataStream->G1._noOfOverflowed.a().data()[output_block_number] = 0;
458
459
460

        size_t lostHeaps = 0;

461
        const int heaps_per_output_spectra = inputDataStream->polarization0._sideChannelData_h.size() / outputDataStream->G0._noOfBitSets.size();
462
        for (size_t j = output_block_number * heaps_per_output_spectra ; j < (output_block_number+1) * heaps_per_output_spectra * _dadaBufferLayout.getNSideChannels(); j+=_dadaBufferLayout.getNSideChannels())
463
        {
464
            if (TEST_BIT(inputDataStream->polarization0._sideChannelData_h.a().data()[j], 3) || TEST_BIT(inputDataStream->polarization1._sideChannelData_h.a().data()[j], 3))
465
466
467
468
469
470
471
472
473
474
475
            {
                lostHeaps++;
               // thiss heap was lost, do not count
               continue;
            }

            uint16_t n_saturated = (inputDataStream->polarization0._sideChannelData_h.a().data()[j] >> 32) + (inputDataStream->polarization1._sideChannelData_h.a().data()[j] >> 32);


            if (TEST_BIT(inputDataStream->polarization0._sideChannelData_h.a().data()[j], _selectedBit))
            {
476
                outputDataStream->G1._noOfOverflowed.a().data()[output_block_number] += n_saturated;
477
478
479
            }
            else
            {
480
                outputDataStream->G0._noOfOverflowed.a().data()[output_block_number] += n_saturated;
481
482
483
484
            }

        }

485
486
        BOOST_LOG_TRIVIAL(debug) << "Number of saturated samples G0: " << outputDataStream->G0._noOfOverflowed.a().data()[output_block_number]
                    << ", G1: " << outputDataStream->G1._noOfOverflowed.a().data()[output_block_number];
487
488
489
490

        BOOST_LOG_TRIVIAL(debug) << "Lost heaps: " << lostHeaps;


Tobias Winchen's avatar
Tobias Winchen committed
491
  }
492

Tobias Winchen's avatar
Tobias Winchen committed
493
  //  cudaDeviceSynchronize();
494
}
495

496
497
498
499
} // edd
} // effelsberg
} // psrdada_cpp