// GatedSpectrometer.cu -- implementation of the Effelsberg EDD gated spectrometer.
#include "psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh"
2
#include "psrdada_cpp/effelsberg/edd/Tools.cuh"
3
4
5
#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/cuda_utils.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
6

7
#include <cuda.h>
8
#include <cuda_profiler_api.h>
9
#include <thrust/system/cuda/execution_policy.h>
10
11

#include <iostream>
12
#include <iomanip>
13
14
#include <cstring>
#include <sstream>
15
16
17
18
19

namespace psrdada_cpp {
namespace effelsberg {
namespace edd {

20
// Reduce thread local vatiable v in shared array x, so that x[0] contains sum
21
template<typename T>
22
__device__ void sum_reduce(T *x, const T &v)
23
24
25
26
27
28
29
30
31
{
  x[threadIdx.x] = v;
  __syncthreads();
  for(int s = blockDim.x / 2; s > 0; s = s / 2)
  {
    if (threadIdx.x < s)
      x[threadIdx.x] += x[threadIdx.x + s];
    __syncthreads();
  }
32
}
33
34


35
// If one of the side channel items is lost, then both are considered as lost
36
// here
37
38
__global__ void mergeSideChannels(uint64_t* __restrict__ A, uint64_t*
        __restrict__ B, size_t N)
39
40
41
42
43
44
45
46
{
  for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N);
       i += blockDim.x * gridDim.x)
  {
    uint64_t v = A[i] || B[i];
    A[i] = v;
    B[i] = v;
  }
47
48
49
}


50
51
52
53
54
55
56
57
58
59
__global__ void gating(float* __restrict__ G0,
        float* __restrict__ G1,
        const uint64_t* __restrict__ sideChannelData,
        size_t N, size_t heapSize, size_t bitpos,
        size_t noOfSideChannels, size_t selectedSideChannel,
        const float*  __restrict__ _baseLineG0,
        const float*  __restrict__ _baseLineG1,
        float* __restrict__ baseLineNG0,
        float* __restrict__ baseLineNG1,
        uint64_cu* stats_G0, uint64_cu* stats_G1) {
60
  // statistics values for samopels to G0, G1
61
62
63
  uint32_t _G0stats = 0;
  uint32_t _G1stats = 0;

64
65
66
  const float baseLineG0 = _baseLineG0[0];
  const float baseLineG1 = _baseLineG1[0];

67
68
69
  float baselineUpdateG0 = 0;
  float baselineUpdateG1 = 0;

Tobias Winchen's avatar
Tobias Winchen committed
70
  for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N);
71
       i += blockDim.x * gridDim.x) {
72
73
    const float v = G0[i];

74
75
    const uint64_t sideChannelItem = sideChannelData[((i / heapSize) * (noOfSideChannels)) +
                        selectedSideChannel];
76

77
78
    const unsigned int bit_set = TEST_BIT(sideChannelItem, bitpos);
    const unsigned int heap_lost = TEST_BIT(sideChannelItem, 63);
79
80
81
    G1[i] = (v - baseLineG1) * bit_set * (!heap_lost) + baseLineG1;
    G0[i] = (v - baseLineG0) * (!bit_set) *(!heap_lost) + baseLineG0;

82
83
    _G0stats += (!bit_set) *(!heap_lost);
    _G1stats += bit_set * (!heap_lost);
84
85
86

    baselineUpdateG1 += v * bit_set * (!heap_lost);
    baselineUpdateG0 += v * (!bit_set) *(!heap_lost);
87
  }
88

89
90
91
  __shared__ uint32_t x[1024];

  // Reduce G0, G1
92
93
  sum_reduce<uint32_t>(x, _G0stats);
  if(threadIdx.x == 0) {
94
    atomicAdd(stats_G0,  (uint64_cu) x[threadIdx.x]);
95
  }
96
  __syncthreads();
97
98
99

  sum_reduce<uint32_t>(x, _G1stats);
  if(threadIdx.x == 0) {
Tobias Winchen's avatar
Tobias Winchen committed
100
    atomicAdd(stats_G1,  (uint64_cu) x[threadIdx.x]);
101
102
  }
  __syncthreads();
103

104
105
106
  //reuse shared array
  float *y = (float*) x;
  //update the baseline array
107
108
  sum_reduce<float>(y, baselineUpdateG0);
  if(threadIdx.x == 0) {
109
    atomicAdd(baseLineNG0, y[threadIdx.x]);
110
  }
111
  __syncthreads();
112
113
114

  sum_reduce<float>(y, baselineUpdateG1);
  if(threadIdx.x == 0) {
Tobias Winchen's avatar
Tobias Winchen committed
115
    atomicAdd(baseLineNG1, y[threadIdx.x]);
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
  }
  __syncthreads();
}



// Updates the baselines of the gates for the polarization set for the next
// block
// only few output blocks per input block thus execution on only one thread.
// Important is that the execution is async on the GPU.
__global__ void update_baselines(float*  __restrict__ baseLineG0,
        float*  __restrict__ baseLineG1,
        float* __restrict__ baseLineNG0,
        float* __restrict__ baseLineNG1,
        uint64_cu* stats_G0, uint64_cu* stats_G1,
        size_t N)
{
    size_t NG0 = 0;
    size_t NG1 = 0;

    for (size_t i =0; i < N; i++)
    {
       NG0 += stats_G0[i];
       NG1 += stats_G1[i];
    }

    baseLineG0[0] = baseLineNG0[0] / NG0;
    baseLineG1[0] = baseLineNG1[0] / NG1;
    baseLineNG0[0] = 0;
    baseLineNG1[0] = 0;
Tobias Winchen's avatar
Tobias Winchen committed
146
}
147

148

149

150
151
152
153
154
155
156
157
158
159
template <class HandlerType, class InputType, class OutputType>
GatedSpectrometer<HandlerType, InputType, OutputType>::GatedSpectrometer(
    const DadaBufferLayout &dadaBufferLayout, std::size_t selectedSideChannel,
    std::size_t selectedBit, std::size_t fft_length, std::size_t naccumulate,
    std::size_t nbits, float input_level, float output_level, HandlerType
    &handler) : _dadaBufferLayout(dadaBufferLayout),
    _selectedSideChannel(selectedSideChannel), _selectedBit(selectedBit),
    _fft_length(fft_length), _naccumulate(naccumulate), _nbits(nbits),
    _handler(handler), _fft_plan(0), _call_count(0), _nsamps_per_heap(4096)
{
160
161

  // Sanity checks
162
  assert(((_nbits == 12) || (_nbits == 8)));
163
164
165
166
167
  assert(_naccumulate > 0);

  // check for any device errors
  CUDA_ERROR_CHECK(cudaDeviceSynchronize());

168
  BOOST_LOG_TRIVIAL(info)
169
      << "Creating new GatedSpectrometer instance with parameters: \n"
170
171
      << "  fft_length           " << _fft_length << "\n"
      << "  naccumulate          " << _naccumulate << "\n"
172
173
      << "  nSideChannels        " << _dadaBufferLayout.getNSideChannels() << "\n"
      << "  speadHeapSize        " << _dadaBufferLayout.getHeapSize() << " byte\n"
174
175
176
      << "  selectedSideChannel  " << _selectedSideChannel << "\n"
      << "  selectedBit          " << _selectedBit << "\n"
      << "  output bit depth     " << sizeof(IntegratedPowerType) * 8;
177

178
179
  assert((_dadaBufferLayout.getNSideChannels() == 0) ||
         (selectedSideChannel < _dadaBufferLayout.getNSideChannels()));  // Sanity check of side channel value
180
181
  assert(selectedBit < 64); // Sanity check of selected bit

182
   _nsamps_per_buffer = _dadaBufferLayout.sizeOfData() * 8 / nbits;
183
184
185
186
187
188
  _nsamps_per_output_spectra = fft_length * naccumulate;
  if (_nsamps_per_output_spectra <= _nsamps_per_buffer)
  { // one buffer block is used for one or multiple output spectra
    size_t N = _nsamps_per_buffer / _nsamps_per_output_spectra;
    // All data in one block has to be used
    assert(N * _nsamps_per_output_spectra == _nsamps_per_buffer);
189
    _nBlocks = 1;
190
191
192
193
194
195
  }
  else
  { // multiple blocks are integrated intoone output
    size_t N =  _nsamps_per_output_spectra /  _nsamps_per_buffer;
    // All data in multiple blocks has to be used
    assert(N * _nsamps_per_buffer == _nsamps_per_output_spectra);
196
    _nBlocks = N;
197
  }
198
199
  BOOST_LOG_TRIVIAL(debug) << "Integrating  " << _nsamps_per_output_spectra <<
      " samples from " << _nBlocks << " into one output spectrum.";
200

201
  _nchans = _fft_length / 2 + 1;
202
  int batch = _nsamps_per_buffer / _fft_length;
203

204
205
206
207
208
209
210
211
212
213
  float dof = 2 * _naccumulate;
  float scale =
      std::pow(input_level * std::sqrt(static_cast<float>(_nchans)), 2);
  float offset = scale * dof;
  float scaling = scale * std::sqrt(2 * dof) / output_level;
  BOOST_LOG_TRIVIAL(debug)
      << "Correction factors for 8-bit conversion: offset = " << offset
      << ", scaling = " << scaling;

  int n[] = {static_cast<int>(_fft_length)};
214
215
216
217
218
219
220
221
  BOOST_LOG_TRIVIAL(debug) << "Generating FFT plan: \n"
      << "   fft_length = " << _fft_length << "\n"
      << "   n[0] = " << n[0] << "\n"
      << "   _nchans = " << _nchans << "\n"
      << "   batch = " << batch << "\n";


      ;
222
223
224
225
  CUFFT_ERROR_CHECK(cufftPlanMany(&_fft_plan, 1, n, NULL, 1, _fft_length, NULL,
                                  1, _nchans, CUFFT_R2C, batch));

  BOOST_LOG_TRIVIAL(debug) << "Allocating memory";
226

227
  inputDataStream = new InputType(fft_length, batch, _dadaBufferLayout);
228

229
230
  _unpacked_voltage_G0.resize(_nsamps_per_buffer);
  _unpacked_voltage_G1.resize(_nsamps_per_buffer);
231
232
  BOOST_LOG_TRIVIAL(debug) << "  Unpacked voltages size (in samples): "
                           << _unpacked_voltage_G0.size();
233

234
  outputDataStream = new OutputType(_nchans, batch / (_naccumulate / _nBlocks));
235
236
237
238
239
240
241
242
243
244

  CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream));
  CUDA_ERROR_CHECK(cudaStreamCreate(&_proc_stream));
  CUDA_ERROR_CHECK(cudaStreamCreate(&_d2h_stream));
  CUFFT_ERROR_CHECK(cufftSetStream(_fft_plan, _proc_stream));

  _unpacker.reset(new Unpacker(_proc_stream));
} // constructor


245
246
template <class HandlerType, class InputType, class OutputType>
GatedSpectrometer<HandlerType, InputType, OutputType>::~GatedSpectrometer() {
247
  BOOST_LOG_TRIVIAL(debug) << "Destroying GatedSpectrometer";
248
  cudaDeviceSynchronize();
249
250
251
252
253
254
255
256
  if (!_fft_plan)
    cufftDestroy(_fft_plan);
  cudaStreamDestroy(_h2d_stream);
  cudaStreamDestroy(_proc_stream);
  cudaStreamDestroy(_d2h_stream);
}


257
258
template <class HandlerType, class InputType, class OutputType>
void GatedSpectrometer<HandlerType, InputType, OutputType>::init(RawBytes &block) {
259
  BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer init called";
260
261
262
263
  std::stringstream headerInfo;
  headerInfo << "\n"
      << "# Gated spectrometer parameters: \n"
      << "fft_length               " << _fft_length << "\n"
264
      << "nchannels                " << _fft_length /2 + 1 << "\n"
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
      << "naccumulate              " << _naccumulate << "\n"
      << "selected_side_channel    " << _selectedSideChannel << "\n"
      << "selected_bit             " << _selectedBit << "\n"
      << "output_bit_depth         " << sizeof(IntegratedPowerType) * 8;

  size_t bEnd = std::strlen(block.ptr());
  if (bEnd + headerInfo.str().size() < block.total_bytes())
  {
    std::strcpy(block.ptr() + bEnd, headerInfo.str().c_str());
  }
  else
  {
    BOOST_LOG_TRIVIAL(warning) << "Header of size " << block.total_bytes()
      << " bytes already contains " << bEnd
      << "bytes. Cannot add gated spectrometer info of size "
      << headerInfo.str().size() << " bytes.";
  }

283
284
285
286
  _handler.init(block);
}


287

288
289
template <class HandlerType, class InputType, class OutputType>
void GatedSpectrometer<HandlerType, InputType, OutputType>::gated_fft(
290
291
292
293
294
  PolarizationData &data,
  thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G0,
  thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G1
        )
{
295
296
297
  BOOST_LOG_TRIVIAL(debug) << "Unpacking raw voltages";
  switch (_nbits) {
  case 8:
298
    _unpacker->unpack<8>(data._raw_voltage.b(), _unpacked_voltage_G0);
299
300
    break;
  case 12:
301
    _unpacker->unpack<12>(data._raw_voltage.b(), _unpacked_voltage_G0);
302
303
304
305
    break;
  default:
    throw std::runtime_error("Unsupported number of bits");
  }
Tobias Winchen's avatar
Tobias Winchen committed
306
307

  // Loop over outputblocks, for case of multiple output blocks per input block
308
309
310
  int step = data._sideChannelData.b().size() / _noOfBitSetsIn_G0.size();

  for (size_t i = 0; i < _noOfBitSetsIn_G0.size(); i++)
311
  { // ToDo: Should be in one kernel call
312
  gating<<<1024, 1024, 0, _proc_stream>>>(
313
314
315
316
317
318
319
      thrust::raw_pointer_cast(_unpacked_voltage_G0.data() + i * step * _nsamps_per_heap),
      thrust::raw_pointer_cast(_unpacked_voltage_G1.data() + i * step * _nsamps_per_heap),
      thrust::raw_pointer_cast(data._sideChannelData.b().data() + i * step),
      _unpacked_voltage_G0.size() / _noOfBitSetsIn_G0.size(),
      _dadaBufferLayout.getHeapSize(),
      _selectedBit,
      _dadaBufferLayout.getNSideChannels(),
320
      _selectedSideChannel,
321
322
323
324
325
326
      thrust::raw_pointer_cast(data._baseLineG0.data()),
      thrust::raw_pointer_cast(data._baseLineG1.data()),
      thrust::raw_pointer_cast(data._baseLineG0_update.data()),
      thrust::raw_pointer_cast(data._baseLineG1_update.data()),
      thrust::raw_pointer_cast(_noOfBitSetsIn_G0.data() + i),
      thrust::raw_pointer_cast(_noOfBitSetsIn_G1.data() + i)
327
      );
328
  }
329

330
331
    CUDA_ERROR_CHECK(cudaDeviceSynchronize());
    CUDA_ERROR_CHECK(cudaDeviceSynchronize());
332
333
334
335
336
337
338
339
340
341
342
    // only few output blocks per input block thus execution on only one thread.
    // Important is that the execution is async on the GPU.
    update_baselines<<<1,1,0, _proc_stream>>>(
        thrust::raw_pointer_cast(data._baseLineG0.data()),
        thrust::raw_pointer_cast(data._baseLineG1.data()),
        thrust::raw_pointer_cast(data._baseLineG0_update.data()),
        thrust::raw_pointer_cast(data._baseLineG1_update.data()),
        thrust::raw_pointer_cast(_noOfBitSetsIn_G0.data()),
        thrust::raw_pointer_cast(_noOfBitSetsIn_G1.data()),
        _noOfBitSetsIn_G0.size()
            );
343

344
    CUDA_ERROR_CHECK(cudaDeviceSynchronize());
345
  BOOST_LOG_TRIVIAL(debug) << "Performing FFT 1";
346
347
    CUDA_ERROR_CHECK(cudaDeviceSynchronize());
  BOOST_LOG_TRIVIAL(debug) << "Accessing unpacked voltage";
348
349
  UnpackedVoltageType *_unpacked_voltage_ptr =
      thrust::raw_pointer_cast(_unpacked_voltage_G0.data());
350
351
    CUDA_ERROR_CHECK(cudaDeviceSynchronize());
  BOOST_LOG_TRIVIAL(debug) << "Accessing channelized voltage";
352
  ChannelisedVoltageType *_channelised_voltage_ptr =
353
      thrust::raw_pointer_cast(data._channelised_voltage_G0.data());
354
355
    CUDA_ERROR_CHECK(cudaDeviceSynchronize());

356
357
358
  CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr,
                                 (cufftComplex *)_channelised_voltage_ptr));

359
    CUDA_ERROR_CHECK(cudaDeviceSynchronize());
360
361
  BOOST_LOG_TRIVIAL(debug) << "Performing FFT 2";
  _unpacked_voltage_ptr = thrust::raw_pointer_cast(_unpacked_voltage_G1.data());
362
  _channelised_voltage_ptr = thrust::raw_pointer_cast(data._channelised_voltage_G1.data());
363
364
365
  CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr,
                                 (cufftComplex *)_channelised_voltage_ptr));

366
    CUDA_ERROR_CHECK(cudaDeviceSynchronize());
Tobias Winchen's avatar
Tobias Winchen committed
367
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
368
  BOOST_LOG_TRIVIAL(debug) << "Exit processing";
369
370
371
} // process


372
373
374
375




376
377
template <class HandlerType, class InputType, class OutputType>
bool GatedSpectrometer<HandlerType, InputType, OutputType>::operator()(RawBytes &block) {
Tobias Winchen's avatar
Tobias Winchen committed
378
379
380
381
382
383
384
385
386
387
388
389
390
391
  ++_call_count;
  BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer operator() called (count = "
                           << _call_count << ")";
  if (block.used_bytes() != _dadaBufferLayout.getBufferSize()) { /* Unexpected buffer size */
    BOOST_LOG_TRIVIAL(error) << "Unexpected Buffer Size - Got "
                             << block.used_bytes() << " byte, expected "
                             << _dadaBufferLayout.getBufferSize() << " byte)";
    CUDA_ERROR_CHECK(cudaDeviceSynchronize());
    cudaProfilerStop();
    return true;
  }

  // Copy data to device
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_h2d_stream));
392
  inputDataStream->swap();
393
  inputDataStream->getFromBlock(block, _h2d_stream);
394

395

396
397
398
  if (_call_count == 1) {
    return false;
  }
399
  // process data
400

401
402
403
  // check if new outblock is started:  _call_count -1 because this is the block number on the device
  bool newBlock = (((_call_count-1) * _nsamps_per_buffer) % _nsamps_per_output_spectra == 0);

404
405
  // only if  a newblock is started the output buffer is swapped. Otherwise the
  // new data is added to it
406
  if (newBlock)
407
  {
Tobias Winchen's avatar
Tobias Winchen committed
408
    BOOST_LOG_TRIVIAL(debug) << "Starting new output block.";
409
410
    outputDataStream->swap();
    outputDataStream->reset(_proc_stream);
411
  }
412

413
414
415
416
417
  BOOST_LOG_TRIVIAL(debug) << "Processing block.";
  cudaDeviceSynchronize();
  process(inputDataStream, outputDataStream);
  cudaDeviceSynchronize();
  BOOST_LOG_TRIVIAL(debug) << "Processing block finished.";
418
419
420
421
  /// For one pol input and power out
  /// ToDo: For two pol input and power out
  /// ToDo: For two pol input and stokes out

Tobias Winchen's avatar
Tobias Winchen committed
422

423
  if ((_call_count == 2) || (!newBlock)) {
424
425
426
    return false;
  }

427
    outputDataStream->data2Host(_d2h_stream);
428
429
430
431
  if (_call_count == 3) {
    return false;
  }

432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
//  // calculate off value
//  BOOST_LOG_TRIVIAL(info) << "Buffer block: " << _call_count-3 << " with " << _noOfBitSetsIn_G0.size() << "x2 output heaps:";
//  size_t total_samples_lost = 0;
//  for (size_t i = 0; i < _noOfBitSetsIn_G0.size(); i++)
//  {
//    size_t memOffset = 2 * i * (_nchans * sizeof(IntegratedPowerType) + sizeof(size_t));
//
//    size_t* on_values = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType));
//    size_t* off_values = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType) + sizeof(size_t));
//
//    size_t samples_lost = _nsamps_per_output_spectra - (*on_values) - (*off_values);
//    total_samples_lost += samples_lost;
//
//    BOOST_LOG_TRIVIAL(info) << "    Heap " << i << ":\n"
//      <<"                            Samples with  bit set  : " << *on_values << std::endl
//      <<"                            Samples without bit set: " << *off_values << std::endl
//      <<"                            Samples lost           : " << samples_lost << " out of " << _nsamps_per_output_spectra << std::endl;
//  }
//  double efficiency = 1. - double(total_samples_lost) / (_nsamps_per_output_spectra * _noOfBitSetsIn_G0.size());
//  double prev_average = _processing_efficiency / (_call_count- 3 - 1);
//  _processing_efficiency += efficiency;
//  double average = _processing_efficiency / (_call_count-3);
//  BOOST_LOG_TRIVIAL(info) << "Total processing efficiency of this buffer block:" << std::setprecision(6) << efficiency << ". Run average: " << average << " (Trend: " << std::showpos << (average - prev_average) << ")";
//
//  // Wrap in a RawBytes object here;
457
458
459
  RawBytes bytes(reinterpret_cast<char *>(outputDataStream->_host_power.b_ptr()),
                 outputDataStream->_host_power.size(),
                 outputDataStream->_host_power.size());
460
461
462
463
  BOOST_LOG_TRIVIAL(debug) << "Calling handler";
  // The handler can't do anything asynchronously without a copy here
  // as it would be unsafe (given that it does not own the memory it
  // is being passed).
Tobias Winchen's avatar
Tobias Winchen committed
464
465
466

  _handler(bytes);
  return false; //
467
468
} // operator ()

469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496


template <class HandlerType, class InputType, class OutputType>
void GatedSpectrometer<HandlerType, InputType, OutputType>::process(PolarizationData *inputDataStream, GatedPowerSpectrumOutput *outputDataStream)
{
  gated_fft(*inputDataStream, outputDataStream->G0._noOfBitSets.a(), outputDataStream->G1._noOfBitSets.a());

  kernels::detect_and_accumulate<IntegratedPowerType> <<<1024, 1024, 0, _proc_stream>>>(
            thrust::raw_pointer_cast(inputDataStream->_channelised_voltage_G0.data()),
            thrust::raw_pointer_cast(outputDataStream->G0.data.a().data()),
            _nchans,
            inputDataStream->_channelised_voltage_G0.size() / _nchans,
            _naccumulate / _nBlocks,
            1, 0., 1, 0);

  kernels::detect_and_accumulate<IntegratedPowerType> <<<1024, 1024, 0, _proc_stream>>>(
            thrust::raw_pointer_cast(inputDataStream->_channelised_voltage_G1.data()),
            thrust::raw_pointer_cast(outputDataStream->G1.data.a().data()),
            _nchans,
            inputDataStream->_channelised_voltage_G1.size() / _nchans,
            _naccumulate / _nBlocks,
            1, 0., 1, 0);

  CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
}



497
498
499
500
} // edd
} // effelsberg
} // psrdada_cpp