GatedSpectrometer.cu 17.5 KB
Newer Older
1
#include "psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh"
2
#include "psrdada_cpp/effelsberg/edd/Tools.cuh"
3
4
5
#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/cuda_utils.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
6

7
#include <cuda.h>
8
#include <cuda_profiler_api.h>
9
#include <thrust/system/cuda/execution_policy.h>
10
11

#include <iostream>
12
#include <iomanip>
13
14
#include <cstring>
#include <sstream>
15
#include <typeinfo>
16
17
18
19
20

namespace psrdada_cpp {
namespace effelsberg {
namespace edd {

21
// Reduce thread local vatiable v in shared array x, so that x[0] contains sum
22
template<typename T>
23
__device__ void sum_reduce(T *x, const T &v)
24
25
26
27
28
29
30
31
32
{
  x[threadIdx.x] = v;
  __syncthreads();
  for(int s = blockDim.x / 2; s > 0; s = s / 2)
  {
    if (threadIdx.x < s)
      x[threadIdx.x] += x[threadIdx.x + s];
    __syncthreads();
  }
33
}
34
35


36
// If one of the side channel items is lost, then both are considered as lost
37
// here
38
__global__ void mergeSideChannels(uint64_t* __restrict__ A, uint64_t*
39
        __restrict__ B, size_t N);
40
41


42
43
44
45
46
47
48
49
50
__global__ void gating(float* __restrict__ G0,
        float* __restrict__ G1,
        const uint64_t* __restrict__ sideChannelData,
        size_t N, size_t heapSize, size_t bitpos,
        size_t noOfSideChannels, size_t selectedSideChannel,
        const float*  __restrict__ _baseLineG0,
        const float*  __restrict__ _baseLineG1,
        float* __restrict__ baseLineNG0,
        float* __restrict__ baseLineNG1,
51
        uint64_cu* stats_G0, uint64_cu* stats_G1);
52
53
54
55
56
57
58
59
60
61
62


// Updates the baselines of the gates for the polarization set for the next
// block
// only few output blocks per input block thus execution on only one thread.
// Important is that the execution is async on the GPU.
__global__ void update_baselines(float*  __restrict__ baseLineG0,
        float*  __restrict__ baseLineG1,
        float* __restrict__ baseLineNG0,
        float* __restrict__ baseLineNG1,
        uint64_cu* stats_G0, uint64_cu* stats_G1,
63
        size_t N);
64

65

66
67
68
69
70
71
72
template <class HandlerType, class InputType, class OutputType>
GatedSpectrometer<HandlerType, InputType, OutputType>::GatedSpectrometer(
    const DadaBufferLayout &dadaBufferLayout, std::size_t selectedSideChannel,
    std::size_t selectedBit, std::size_t fft_length, std::size_t naccumulate,
    std::size_t nbits, float input_level, float output_level, HandlerType
    &handler) : _dadaBufferLayout(dadaBufferLayout),
    _selectedSideChannel(selectedSideChannel), _selectedBit(selectedBit),
73
    _fft_length(fft_length), _naccumulate(naccumulate),
74
75
    _handler(handler), _fft_plan(0), _call_count(0), _nsamps_per_heap(4096)
{
76
77

  // Sanity checks
78
  assert(((nbits == 12) || (nbits == 8)));
79
80
81
82
83
  assert(_naccumulate > 0);

  // check for any device errors
  CUDA_ERROR_CHECK(cudaDeviceSynchronize());

84
  BOOST_LOG_TRIVIAL(info)
85
      << "Creating new GatedSpectrometer instance with parameters: \n"
86
87
      << "  fft_length           " << _fft_length << "\n"
      << "  naccumulate          " << _naccumulate << "\n"
88
89
      << "  nSideChannels        " << _dadaBufferLayout.getNSideChannels() << "\n"
      << "  speadHeapSize        " << _dadaBufferLayout.getHeapSize() << " byte\n"
90
91
92
      << "  selectedSideChannel  " << _selectedSideChannel << "\n"
      << "  selectedBit          " << _selectedBit << "\n"
      << "  output bit depth     " << sizeof(IntegratedPowerType) * 8;
93

94
95
  assert((_dadaBufferLayout.getNSideChannels() == 0) ||
         (selectedSideChannel < _dadaBufferLayout.getNSideChannels()));  // Sanity check of side channel value
96
97
  assert(selectedBit < 64); // Sanity check of selected bit

98

99
  _nchans = _fft_length / 2 + 1;
100

101
  // Calculate the scaling parameters for 8 bit output
102
103
104
105
106
107
108
109
110
  float dof = 2 * _naccumulate;
  float scale =
      std::pow(input_level * std::sqrt(static_cast<float>(_nchans)), 2);
  float offset = scale * dof;
  float scaling = scale * std::sqrt(2 * dof) / output_level;
  BOOST_LOG_TRIVIAL(debug)
      << "Correction factors for 8-bit conversion: offset = " << offset
      << ", scaling = " << scaling;

111
  inputDataStream = new InputType(fft_length, nbits, _dadaBufferLayout);
112

113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
  //How many output spectra per input block?
  size_t nsamps_per_output_spectra = fft_length * naccumulate;

  size_t nsamps_per_pol = inputDataStream->getSamplesPerInputPolarization();
  if (nsamps_per_output_spectra <= nsamps_per_pol)
  { // one buffer block is used for one or multiple output spectra
    size_t N = nsamps_per_pol / nsamps_per_output_spectra;
    // All data in one block has to be used
    assert(N * nsamps_per_output_spectra == nsamps_per_pol);
    _nBlocks = 1;
  }
  else
  { // multiple blocks are integrated intoone output
    size_t N =  nsamps_per_output_spectra /  nsamps_per_pol;
    // All data in multiple blocks has to be used
    assert(N * nsamps_per_pol == nsamps_per_output_spectra);
    _nBlocks = N;
  }
  BOOST_LOG_TRIVIAL(debug) << "Integrating  " << nsamps_per_output_spectra <<
      " samples from " << _nBlocks << "blocks into one output spectrum.";
133

134
135
136
137
138
139
140
141
142
143
144
145
146
  // plan the FFT
  size_t nsamps_per_buffer = _dadaBufferLayout.sizeOfData() * 8 / nbits;
  int batch = nsamps_per_pol / _fft_length;
  int n[] = {static_cast<int>(_fft_length)};
  BOOST_LOG_TRIVIAL(debug) << "Generating FFT plan: \n"
      << "   fft_length = " << _fft_length << "\n"
      << "   n[0] = " << n[0] << "\n"
      << "   _nchans = " << _nchans << "\n"
      << "   batch = " << batch << "\n";
  CUFFT_ERROR_CHECK(cufftPlanMany(&_fft_plan, 1, n, NULL, 1, _fft_length, NULL,
                                  1, _nchans, CUFFT_R2C, batch));


147
  // We unpack and fft one pol at a time
148
149
150
  _unpacked_voltage_G0.resize(nsamps_per_pol);
  _unpacked_voltage_G1.resize(nsamps_per_pol);
  BOOST_LOG_TRIVIAL(debug) << "  Unpacked voltages size (in samples): " << _unpacked_voltage_G0.size();
151

152
  outputDataStream = new OutputType(_nchans, batch / (_naccumulate / _nBlocks));
153
154
155
156
157
158
159
160
161
162

  CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream));
  CUDA_ERROR_CHECK(cudaStreamCreate(&_proc_stream));
  CUDA_ERROR_CHECK(cudaStreamCreate(&_d2h_stream));
  CUFFT_ERROR_CHECK(cufftSetStream(_fft_plan, _proc_stream));

  _unpacker.reset(new Unpacker(_proc_stream));
} // constructor


163
164
template <class HandlerType, class InputType, class OutputType>
GatedSpectrometer<HandlerType, InputType, OutputType>::~GatedSpectrometer() {
165
  BOOST_LOG_TRIVIAL(debug) << "Destroying GatedSpectrometer";
166
  if (_fft_plan)
167
    cufftDestroy(_fft_plan);
168
169
170
171

  delete inputDataStream;
  delete outputDataStream;

172
173
174
175
176
177
  cudaStreamDestroy(_h2d_stream);
  cudaStreamDestroy(_proc_stream);
  cudaStreamDestroy(_d2h_stream);
}


178
179
template <class HandlerType, class InputType, class OutputType>
void GatedSpectrometer<HandlerType, InputType, OutputType>::init(RawBytes &block) {
180
  BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer init called";
181
182
183
184
  std::stringstream headerInfo;
  headerInfo << "\n"
      << "# Gated spectrometer parameters: \n"
      << "fft_length               " << _fft_length << "\n"
185
      << "nchannels                " << _nchans << "\n"
186
187
188
      << "naccumulate              " << _naccumulate << "\n"
      << "selected_side_channel    " << _selectedSideChannel << "\n"
      << "selected_bit             " << _selectedBit << "\n"
189
190
191
192
193
194
195
196
197
198
      << "output_bit_depth         " << sizeof(IntegratedPowerType) * 8 << "\n"
      << "full_stokes_output       ";
  if (typeid(OutputType) == typeid(GatedFullStokesOutput))
  {
          headerInfo << "yes\n";
  }
  else
  {
          headerInfo << "no\n";
  }
199
200
201
202
203
204
205
206
207
208
209
210
211
212

  size_t bEnd = std::strlen(block.ptr());
  if (bEnd + headerInfo.str().size() < block.total_bytes())
  {
    std::strcpy(block.ptr() + bEnd, headerInfo.str().c_str());
  }
  else
  {
    BOOST_LOG_TRIVIAL(warning) << "Header of size " << block.total_bytes()
      << " bytes already contains " << bEnd
      << "bytes. Cannot add gated spectrometer info of size "
      << headerInfo.str().size() << " bytes.";
  }

213
214
215
216
  _handler.init(block);
}


217

218
219
template <class HandlerType, class InputType, class OutputType>
void GatedSpectrometer<HandlerType, InputType, OutputType>::gated_fft(
220
221
222
223
224
  PolarizationData &data,
  thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G0,
  thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G1
        )
{
225
  BOOST_LOG_TRIVIAL(debug) << "Unpacking raw voltages";
226
  switch (data._nbits) {
227
  case 8:
228
    _unpacker->unpack<8>(data._raw_voltage.b(), _unpacked_voltage_G0);
229
230
    break;
  case 12:
231
    _unpacker->unpack<12>(data._raw_voltage.b(), _unpacked_voltage_G0);
232
233
234
235
    break;
  default:
    throw std::runtime_error("Unsupported number of bits");
  }
Tobias Winchen's avatar
Tobias Winchen committed
236
237

  // Loop over outputblocks, for case of multiple output blocks per input block
238
239
240
  int step = data._sideChannelData.b().size() / _noOfBitSetsIn_G0.size();

  for (size_t i = 0; i < _noOfBitSetsIn_G0.size(); i++)
241
  { // ToDo: Should be in one kernel call
242
  gating<<<1024, 1024, 0, _proc_stream>>>(
243
244
245
246
247
248
249
      thrust::raw_pointer_cast(_unpacked_voltage_G0.data() + i * step * _nsamps_per_heap),
      thrust::raw_pointer_cast(_unpacked_voltage_G1.data() + i * step * _nsamps_per_heap),
      thrust::raw_pointer_cast(data._sideChannelData.b().data() + i * step),
      _unpacked_voltage_G0.size() / _noOfBitSetsIn_G0.size(),
      _dadaBufferLayout.getHeapSize(),
      _selectedBit,
      _dadaBufferLayout.getNSideChannels(),
250
      _selectedSideChannel,
251
252
253
254
255
256
      thrust::raw_pointer_cast(data._baseLineG0.data()),
      thrust::raw_pointer_cast(data._baseLineG1.data()),
      thrust::raw_pointer_cast(data._baseLineG0_update.data()),
      thrust::raw_pointer_cast(data._baseLineG1_update.data()),
      thrust::raw_pointer_cast(_noOfBitSetsIn_G0.data() + i),
      thrust::raw_pointer_cast(_noOfBitSetsIn_G1.data() + i)
257
      );
258
  }
259

260
261
262
263
264
265
266
267
268
269
270
    // only few output blocks per input block thus execution on only one thread.
    // Important is that the execution is async on the GPU.
    update_baselines<<<1,1,0, _proc_stream>>>(
        thrust::raw_pointer_cast(data._baseLineG0.data()),
        thrust::raw_pointer_cast(data._baseLineG1.data()),
        thrust::raw_pointer_cast(data._baseLineG0_update.data()),
        thrust::raw_pointer_cast(data._baseLineG1_update.data()),
        thrust::raw_pointer_cast(_noOfBitSetsIn_G0.data()),
        thrust::raw_pointer_cast(_noOfBitSetsIn_G1.data()),
        _noOfBitSetsIn_G0.size()
            );
271
272

  BOOST_LOG_TRIVIAL(debug) << "Performing FFT 1";
273
  BOOST_LOG_TRIVIAL(debug) << "Accessing unpacked voltage";
274
275
  UnpackedVoltageType *_unpacked_voltage_ptr =
      thrust::raw_pointer_cast(_unpacked_voltage_G0.data());
276
  BOOST_LOG_TRIVIAL(debug) << "Accessing channelized voltage";
277
  ChannelisedVoltageType *_channelised_voltage_ptr =
278
      thrust::raw_pointer_cast(data._channelised_voltage_G0.data());
279

280
281
282
283
284
  CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr,
                                 (cufftComplex *)_channelised_voltage_ptr));

  BOOST_LOG_TRIVIAL(debug) << "Performing FFT 2";
  _unpacked_voltage_ptr = thrust::raw_pointer_cast(_unpacked_voltage_G1.data());
285
  _channelised_voltage_ptr = thrust::raw_pointer_cast(data._channelised_voltage_G1.data());
286
287
288
  CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr,
                                 (cufftComplex *)_channelised_voltage_ptr));

Tobias Winchen's avatar
Tobias Winchen committed
289
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
290
  BOOST_LOG_TRIVIAL(debug) << "Exit processing";
291
292
293
} // process


294
295
296
297




298
299
template <class HandlerType, class InputType, class OutputType>
bool GatedSpectrometer<HandlerType, InputType, OutputType>::operator()(RawBytes &block) {
Tobias Winchen's avatar
Tobias Winchen committed
300
301
302
303
304
305
306
307
308
309
310
311
312
313
  ++_call_count;
  BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer operator() called (count = "
                           << _call_count << ")";
  if (block.used_bytes() != _dadaBufferLayout.getBufferSize()) { /* Unexpected buffer size */
    BOOST_LOG_TRIVIAL(error) << "Unexpected Buffer Size - Got "
                             << block.used_bytes() << " byte, expected "
                             << _dadaBufferLayout.getBufferSize() << " byte)";
    CUDA_ERROR_CHECK(cudaDeviceSynchronize());
    cudaProfilerStop();
    return true;
  }

  // Copy data to device
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_h2d_stream));
314
  inputDataStream->swap();
315
  inputDataStream->getFromBlock(block, _h2d_stream);
316

317

318
319
320
  if (_call_count == 1) {
    return false;
  }
321
  // process data
322

323
  // check if new outblock is started:  _call_count -1 because this is the block number on the device
324
  bool newBlock = (((_call_count-1)  % (_nBlocks)) == 0);
325

326
327
  // only if  a newblock is started the output buffer is swapped. Otherwise the
  // new data is added to it
328
  if (newBlock)
329
  {
Tobias Winchen's avatar
Tobias Winchen committed
330
    BOOST_LOG_TRIVIAL(debug) << "Starting new output block.";
331
    CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream));
332
    outputDataStream->swap(_proc_stream);
333
  }
334

335
336
  BOOST_LOG_TRIVIAL(debug) << "Processing block.";
  process(inputDataStream, outputDataStream);
337
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
338
  BOOST_LOG_TRIVIAL(debug) << "Processing block finished.";
339
340
341
342
  /// For one pol input and power out
  /// ToDo: For two pol input and power out
  /// ToDo: For two pol input and stokes out

Tobias Winchen's avatar
Tobias Winchen committed
343

344
  if ((_call_count == 2) || (!newBlock)) {
345
346
347
    return false;
  }

348
  outputDataStream->data2Host(_d2h_stream);
349
350
351
352
  if (_call_count == 3) {
    return false;
  }

353
  // Wrap in a RawBytes object here;
354
355
356
  RawBytes bytes(reinterpret_cast<char *>(outputDataStream->_host_power.b_ptr()),
                 outputDataStream->_host_power.size(),
                 outputDataStream->_host_power.size());
357
358
359
360
  BOOST_LOG_TRIVIAL(debug) << "Calling handler";
  // The handler can't do anything asynchronously without a copy here
  // as it would be unsafe (given that it does not own the memory it
  // is being passed).
Tobias Winchen's avatar
Tobias Winchen committed
361
362
363

  _handler(bytes);
  return false; //
364
365
} // operator ()

366
367
368


template <class HandlerType, class InputType, class OutputType>
369
void GatedSpectrometer<HandlerType, InputType, OutputType>::process(SinglePolarizationInput *inputDataStream, GatedPowerSpectrumOutput *outputDataStream)
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
{
  gated_fft(*inputDataStream, outputDataStream->G0._noOfBitSets.a(), outputDataStream->G1._noOfBitSets.a());

  kernels::detect_and_accumulate<IntegratedPowerType> <<<1024, 1024, 0, _proc_stream>>>(
            thrust::raw_pointer_cast(inputDataStream->_channelised_voltage_G0.data()),
            thrust::raw_pointer_cast(outputDataStream->G0.data.a().data()),
            _nchans,
            inputDataStream->_channelised_voltage_G0.size() / _nchans,
            _naccumulate / _nBlocks,
            1, 0., 1, 0);

  kernels::detect_and_accumulate<IntegratedPowerType> <<<1024, 1024, 0, _proc_stream>>>(
            thrust::raw_pointer_cast(inputDataStream->_channelised_voltage_G1.data()),
            thrust::raw_pointer_cast(outputDataStream->G1.data.a().data()),
            _nchans,
            inputDataStream->_channelised_voltage_G1.size() / _nchans,
            _naccumulate / _nBlocks,
            1, 0., 1, 0);

}


392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
template <class HandlerType, class InputType, class OutputType>
void GatedSpectrometer<HandlerType, InputType, OutputType>::process(DualPolarizationInput *inputDataStream, GatedFullStokesOutput *outputDataStream)
{
  mergeSideChannels<<<1024, 1024, 0, _proc_stream>>>(thrust::raw_pointer_cast(inputDataStream->polarization0._sideChannelData.a().data()),
          thrust::raw_pointer_cast(inputDataStream->polarization1._sideChannelData.a().data()), inputDataStream->polarization1._sideChannelData.a().size());

  gated_fft(inputDataStream->polarization0, outputDataStream->G0._noOfBitSets.a(), outputDataStream->G1._noOfBitSets.a());
  gated_fft(inputDataStream->polarization1, outputDataStream->G0._noOfBitSets.a(), outputDataStream->G1._noOfBitSets.a());

  stokes_accumulate<<<1024, 1024, 0, _proc_stream>>>(
          thrust::raw_pointer_cast(inputDataStream->polarization0._channelised_voltage_G0.data()),
          thrust::raw_pointer_cast(inputDataStream->polarization1._channelised_voltage_G0.data()),
          thrust::raw_pointer_cast(outputDataStream->G0.I.a().data()),
          thrust::raw_pointer_cast(outputDataStream->G0.Q.a().data()),
          thrust::raw_pointer_cast(outputDataStream->G0.U.a().data()),
          thrust::raw_pointer_cast(outputDataStream->G0.V.a().data()),
408
          _nchans, _naccumulate / _nBlocks
409
410
411
412
413
414
415
416
417
          );

  stokes_accumulate<<<1024, 1024, 0, _proc_stream>>>(
          thrust::raw_pointer_cast(inputDataStream->polarization0._channelised_voltage_G1.data()),
          thrust::raw_pointer_cast(inputDataStream->polarization1._channelised_voltage_G1.data()),
          thrust::raw_pointer_cast(outputDataStream->G1.I.a().data()),
          thrust::raw_pointer_cast(outputDataStream->G1.Q.a().data()),
          thrust::raw_pointer_cast(outputDataStream->G1.U.a().data()),
          thrust::raw_pointer_cast(outputDataStream->G1.V.a().data()),
418
          _nchans, _naccumulate / _nBlocks
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
          );

  //thrust::fill(thrust::cuda::par.on(_proc_stream),outputDataStream->G0.I.a().begin(), outputDataStream->G0.I.a().end(), _call_count);
  //thrust::fill(thrust::cuda::par.on(_proc_stream),outputDataStream->G0.Q.a().begin(), outputDataStream->G0.Q.a().end(), _call_count);
  //thrust::fill(thrust::cuda::par.on(_proc_stream),outputDataStream->G0.U.a().begin(), outputDataStream->G0.U.a().end(), _call_count);
  //thrust::fill(thrust::cuda::par.on(_proc_stream),outputDataStream->G0.V.a().begin(), outputDataStream->G0.V.a().end(), _call_count);


 // thrust::fill(thrust::cuda::par.on(_proc_stream),outputDataStream->G1.I.a().begin(), outputDataStream->G1.I.a().end(), _call_count);
  //thrust::fill(thrust::cuda::par.on(_proc_stream),outputDataStream->G1.Q.a().begin(), outputDataStream->G1.Q.a().end(), _call_count);
  //thrust::fill(thrust::cuda::par.on(_proc_stream),outputDataStream->G1.U.a().begin(), outputDataStream->G1.U.a().end(), _call_count);
  //thrust::fill(thrust::cuda::par.on(_proc_stream),outputDataStream->G1.V.a().begin(), outputDataStream->G1.V.a().end(), _call_count);

    cudaDeviceSynchronize();
}
434

435
436
437
438
} // edd
} // effelsberg
} // psrdada_cpp