GatedSpectrometer.cu 16.3 KB
Newer Older
1
#include "psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh"
2
#include "psrdada_cpp/effelsberg/edd/Tools.cuh"
3
4
5
#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/cuda_utils.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
6

7
#include <cuda.h>
8
#include <cuda_profiler_api.h>
9
#include <thrust/system/cuda/execution_policy.h>
10
11

#include <iostream>
12
#include <iomanip>
13
14
#include <cstring>
#include <sstream>
15
16
17
18
19

namespace psrdada_cpp {
namespace effelsberg {
namespace edd {

20
// Reduce thread local vatiable v in shared array x, so that x[0] contains sum
21
template<typename T>
22
__device__ void sum_reduce(T *x, const T &v)
23
24
25
26
27
28
29
30
31
{
  x[threadIdx.x] = v;
  __syncthreads();
  for(int s = blockDim.x / 2; s > 0; s = s / 2)
  {
    if (threadIdx.x < s)
      x[threadIdx.x] += x[threadIdx.x + s];
    __syncthreads();
  }
32
}
33
34


35
// If one of the side channel items is lost, then both are considered as lost
36
// here
37
__global__ void mergeSideChannels(uint64_t* __restrict__ A, uint64_t*
38
        __restrict__ B, size_t N);
39
40


41
42
43
44
45
46
47
48
49
__global__ void gating(float* __restrict__ G0,
        float* __restrict__ G1,
        const uint64_t* __restrict__ sideChannelData,
        size_t N, size_t heapSize, size_t bitpos,
        size_t noOfSideChannels, size_t selectedSideChannel,
        const float*  __restrict__ _baseLineG0,
        const float*  __restrict__ _baseLineG1,
        float* __restrict__ baseLineNG0,
        float* __restrict__ baseLineNG1,
50
        uint64_cu* stats_G0, uint64_cu* stats_G1);
51
52
53
54
55
56
57
58
59
60
61


// Updates the baselines of the gates for the polarization set for the next
// block
// only few output blocks per input block thus execution on only one thread.
// Important is that the execution is async on the GPU.
__global__ void update_baselines(float*  __restrict__ baseLineG0,
        float*  __restrict__ baseLineG1,
        float* __restrict__ baseLineNG0,
        float* __restrict__ baseLineNG1,
        uint64_cu* stats_G0, uint64_cu* stats_G1,
62
        size_t N);
63

64

65
66
67
68
69
70
71
72
73
74
template <class HandlerType, class InputType, class OutputType>
GatedSpectrometer<HandlerType, InputType, OutputType>::GatedSpectrometer(
    const DadaBufferLayout &dadaBufferLayout, std::size_t selectedSideChannel,
    std::size_t selectedBit, std::size_t fft_length, std::size_t naccumulate,
    std::size_t nbits, float input_level, float output_level, HandlerType
    &handler) : _dadaBufferLayout(dadaBufferLayout),
    _selectedSideChannel(selectedSideChannel), _selectedBit(selectedBit),
    _fft_length(fft_length), _naccumulate(naccumulate), _nbits(nbits),
    _handler(handler), _fft_plan(0), _call_count(0), _nsamps_per_heap(4096)
{
75
76

  // Sanity checks
77
  assert(((_nbits == 12) || (_nbits == 8)));
78
79
80
81
82
  assert(_naccumulate > 0);

  // check for any device errors
  CUDA_ERROR_CHECK(cudaDeviceSynchronize());

83
  BOOST_LOG_TRIVIAL(info)
84
      << "Creating new GatedSpectrometer instance with parameters: \n"
85
86
      << "  fft_length           " << _fft_length << "\n"
      << "  naccumulate          " << _naccumulate << "\n"
87
88
      << "  nSideChannels        " << _dadaBufferLayout.getNSideChannels() << "\n"
      << "  speadHeapSize        " << _dadaBufferLayout.getHeapSize() << " byte\n"
89
90
91
      << "  selectedSideChannel  " << _selectedSideChannel << "\n"
      << "  selectedBit          " << _selectedBit << "\n"
      << "  output bit depth     " << sizeof(IntegratedPowerType) * 8;
92

93
94
  assert((_dadaBufferLayout.getNSideChannels() == 0) ||
         (selectedSideChannel < _dadaBufferLayout.getNSideChannels()));  // Sanity check of side channel value
95
96
  assert(selectedBit < 64); // Sanity check of selected bit

97
   _nsamps_per_buffer = _dadaBufferLayout.sizeOfData() * 8 / nbits;
98
99
100
101
102
103
  _nsamps_per_output_spectra = fft_length * naccumulate;
  if (_nsamps_per_output_spectra <= _nsamps_per_buffer)
  { // one buffer block is used for one or multiple output spectra
    size_t N = _nsamps_per_buffer / _nsamps_per_output_spectra;
    // All data in one block has to be used
    assert(N * _nsamps_per_output_spectra == _nsamps_per_buffer);
104
    _nBlocks = 1;
105
106
107
108
109
110
  }
  else
  { // multiple blocks are integrated intoone output
    size_t N =  _nsamps_per_output_spectra /  _nsamps_per_buffer;
    // All data in multiple blocks has to be used
    assert(N * _nsamps_per_buffer == _nsamps_per_output_spectra);
111
    _nBlocks = N;
112
  }
113
114
  BOOST_LOG_TRIVIAL(debug) << "Integrating  " << _nsamps_per_output_spectra <<
      " samples from " << _nBlocks << " into one output spectrum.";
115

116
  _nchans = _fft_length / 2 + 1;
117
  int batch = _nsamps_per_buffer / _fft_length;
118

119
120
121
122
123
124
125
126
127
128
  float dof = 2 * _naccumulate;
  float scale =
      std::pow(input_level * std::sqrt(static_cast<float>(_nchans)), 2);
  float offset = scale * dof;
  float scaling = scale * std::sqrt(2 * dof) / output_level;
  BOOST_LOG_TRIVIAL(debug)
      << "Correction factors for 8-bit conversion: offset = " << offset
      << ", scaling = " << scaling;

  int n[] = {static_cast<int>(_fft_length)};
129
130
131
132
133
134
135
136
  BOOST_LOG_TRIVIAL(debug) << "Generating FFT plan: \n"
      << "   fft_length = " << _fft_length << "\n"
      << "   n[0] = " << n[0] << "\n"
      << "   _nchans = " << _nchans << "\n"
      << "   batch = " << batch << "\n";


      ;
137
138
139
140
  CUFFT_ERROR_CHECK(cufftPlanMany(&_fft_plan, 1, n, NULL, 1, _fft_length, NULL,
                                  1, _nchans, CUFFT_R2C, batch));

  BOOST_LOG_TRIVIAL(debug) << "Allocating memory";
141

142
  inputDataStream = new InputType(fft_length, batch, _dadaBufferLayout);
143

144
145
  _unpacked_voltage_G0.resize(_nsamps_per_buffer);
  _unpacked_voltage_G1.resize(_nsamps_per_buffer);
146
147
  BOOST_LOG_TRIVIAL(debug) << "  Unpacked voltages size (in samples): "
                           << _unpacked_voltage_G0.size();
148

149
  outputDataStream = new OutputType(_nchans, batch / (_naccumulate / _nBlocks));
150
151
152
153
154
155
156
157
158
159

  CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream));
  CUDA_ERROR_CHECK(cudaStreamCreate(&_proc_stream));
  CUDA_ERROR_CHECK(cudaStreamCreate(&_d2h_stream));
  CUFFT_ERROR_CHECK(cufftSetStream(_fft_plan, _proc_stream));

  _unpacker.reset(new Unpacker(_proc_stream));
} // constructor


160
161
template <class HandlerType, class InputType, class OutputType>
GatedSpectrometer<HandlerType, InputType, OutputType>::~GatedSpectrometer() {
162
  BOOST_LOG_TRIVIAL(debug) << "Destroying GatedSpectrometer";
163
  cudaDeviceSynchronize();
164
165
166
167
168
169
170
171
  if (!_fft_plan)
    cufftDestroy(_fft_plan);
  cudaStreamDestroy(_h2d_stream);
  cudaStreamDestroy(_proc_stream);
  cudaStreamDestroy(_d2h_stream);
}


172
173
template <class HandlerType, class InputType, class OutputType>
void GatedSpectrometer<HandlerType, InputType, OutputType>::init(RawBytes &block) {
174
  BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer init called";
175
176
177
178
  std::stringstream headerInfo;
  headerInfo << "\n"
      << "# Gated spectrometer parameters: \n"
      << "fft_length               " << _fft_length << "\n"
179
      << "nchannels                " << _fft_length /2 + 1 << "\n"
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
      << "naccumulate              " << _naccumulate << "\n"
      << "selected_side_channel    " << _selectedSideChannel << "\n"
      << "selected_bit             " << _selectedBit << "\n"
      << "output_bit_depth         " << sizeof(IntegratedPowerType) * 8;

  size_t bEnd = std::strlen(block.ptr());
  if (bEnd + headerInfo.str().size() < block.total_bytes())
  {
    std::strcpy(block.ptr() + bEnd, headerInfo.str().c_str());
  }
  else
  {
    BOOST_LOG_TRIVIAL(warning) << "Header of size " << block.total_bytes()
      << " bytes already contains " << bEnd
      << "bytes. Cannot add gated spectrometer info of size "
      << headerInfo.str().size() << " bytes.";
  }

198
199
200
201
  _handler.init(block);
}


202

203
204
template <class HandlerType, class InputType, class OutputType>
void GatedSpectrometer<HandlerType, InputType, OutputType>::gated_fft(
205
206
207
208
209
  PolarizationData &data,
  thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G0,
  thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G1
        )
{
210
211
212
  BOOST_LOG_TRIVIAL(debug) << "Unpacking raw voltages";
  switch (_nbits) {
  case 8:
213
    _unpacker->unpack<8>(data._raw_voltage.b(), _unpacked_voltage_G0);
214
215
    break;
  case 12:
216
    _unpacker->unpack<12>(data._raw_voltage.b(), _unpacked_voltage_G0);
217
218
219
220
    break;
  default:
    throw std::runtime_error("Unsupported number of bits");
  }
Tobias Winchen's avatar
Tobias Winchen committed
221
222

  // Loop over outputblocks, for case of multiple output blocks per input block
223
224
225
  int step = data._sideChannelData.b().size() / _noOfBitSetsIn_G0.size();

  for (size_t i = 0; i < _noOfBitSetsIn_G0.size(); i++)
226
  { // ToDo: Should be in one kernel call
227
  gating<<<1024, 1024, 0, _proc_stream>>>(
228
229
230
231
232
233
234
      thrust::raw_pointer_cast(_unpacked_voltage_G0.data() + i * step * _nsamps_per_heap),
      thrust::raw_pointer_cast(_unpacked_voltage_G1.data() + i * step * _nsamps_per_heap),
      thrust::raw_pointer_cast(data._sideChannelData.b().data() + i * step),
      _unpacked_voltage_G0.size() / _noOfBitSetsIn_G0.size(),
      _dadaBufferLayout.getHeapSize(),
      _selectedBit,
      _dadaBufferLayout.getNSideChannels(),
235
      _selectedSideChannel,
236
237
238
239
240
241
      thrust::raw_pointer_cast(data._baseLineG0.data()),
      thrust::raw_pointer_cast(data._baseLineG1.data()),
      thrust::raw_pointer_cast(data._baseLineG0_update.data()),
      thrust::raw_pointer_cast(data._baseLineG1_update.data()),
      thrust::raw_pointer_cast(_noOfBitSetsIn_G0.data() + i),
      thrust::raw_pointer_cast(_noOfBitSetsIn_G1.data() + i)
242
      );
243
  }
244

245
246
    CUDA_ERROR_CHECK(cudaDeviceSynchronize());
    CUDA_ERROR_CHECK(cudaDeviceSynchronize());
247
248
249
250
251
252
253
254
255
256
257
    // only few output blocks per input block thus execution on only one thread.
    // Important is that the execution is async on the GPU.
    update_baselines<<<1,1,0, _proc_stream>>>(
        thrust::raw_pointer_cast(data._baseLineG0.data()),
        thrust::raw_pointer_cast(data._baseLineG1.data()),
        thrust::raw_pointer_cast(data._baseLineG0_update.data()),
        thrust::raw_pointer_cast(data._baseLineG1_update.data()),
        thrust::raw_pointer_cast(_noOfBitSetsIn_G0.data()),
        thrust::raw_pointer_cast(_noOfBitSetsIn_G1.data()),
        _noOfBitSetsIn_G0.size()
            );
258

259
    CUDA_ERROR_CHECK(cudaDeviceSynchronize());
260
  BOOST_LOG_TRIVIAL(debug) << "Performing FFT 1";
261
262
    CUDA_ERROR_CHECK(cudaDeviceSynchronize());
  BOOST_LOG_TRIVIAL(debug) << "Accessing unpacked voltage";
263
264
  UnpackedVoltageType *_unpacked_voltage_ptr =
      thrust::raw_pointer_cast(_unpacked_voltage_G0.data());
265
266
    CUDA_ERROR_CHECK(cudaDeviceSynchronize());
  BOOST_LOG_TRIVIAL(debug) << "Accessing channelized voltage";
267
  ChannelisedVoltageType *_channelised_voltage_ptr =
268
      thrust::raw_pointer_cast(data._channelised_voltage_G0.data());
269
270
    CUDA_ERROR_CHECK(cudaDeviceSynchronize());

271
272
273
  CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr,
                                 (cufftComplex *)_channelised_voltage_ptr));

274
    CUDA_ERROR_CHECK(cudaDeviceSynchronize());
275
276
  BOOST_LOG_TRIVIAL(debug) << "Performing FFT 2";
  _unpacked_voltage_ptr = thrust::raw_pointer_cast(_unpacked_voltage_G1.data());
277
  _channelised_voltage_ptr = thrust::raw_pointer_cast(data._channelised_voltage_G1.data());
278
279
280
  CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr,
                                 (cufftComplex *)_channelised_voltage_ptr));

281
    CUDA_ERROR_CHECK(cudaDeviceSynchronize());
Tobias Winchen's avatar
Tobias Winchen committed
282
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
283
  BOOST_LOG_TRIVIAL(debug) << "Exit processing";
284
285
286
} // process


287
288
289
290




291
292
template <class HandlerType, class InputType, class OutputType>
bool GatedSpectrometer<HandlerType, InputType, OutputType>::operator()(RawBytes &block) {
Tobias Winchen's avatar
Tobias Winchen committed
293
294
295
296
297
298
299
300
301
302
303
304
305
306
  ++_call_count;
  BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer operator() called (count = "
                           << _call_count << ")";
  if (block.used_bytes() != _dadaBufferLayout.getBufferSize()) { /* Unexpected buffer size */
    BOOST_LOG_TRIVIAL(error) << "Unexpected Buffer Size - Got "
                             << block.used_bytes() << " byte, expected "
                             << _dadaBufferLayout.getBufferSize() << " byte)";
    CUDA_ERROR_CHECK(cudaDeviceSynchronize());
    cudaProfilerStop();
    return true;
  }

  // Copy data to device
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_h2d_stream));
307
  inputDataStream->swap();
308
  inputDataStream->getFromBlock(block, _h2d_stream);
309

310

311
312
313
  if (_call_count == 1) {
    return false;
  }
314
  // process data
315

316
317
318
  // check if new outblock is started:  _call_count -1 because this is the block number on the device
  bool newBlock = (((_call_count-1) * _nsamps_per_buffer) % _nsamps_per_output_spectra == 0);

319
320
  // only if  a newblock is started the output buffer is swapped. Otherwise the
  // new data is added to it
321
  if (newBlock)
322
  {
Tobias Winchen's avatar
Tobias Winchen committed
323
    BOOST_LOG_TRIVIAL(debug) << "Starting new output block.";
324
325
    outputDataStream->swap();
    outputDataStream->reset(_proc_stream);
326
  }
327

328
329
330
331
332
  BOOST_LOG_TRIVIAL(debug) << "Processing block.";
  cudaDeviceSynchronize();
  process(inputDataStream, outputDataStream);
  cudaDeviceSynchronize();
  BOOST_LOG_TRIVIAL(debug) << "Processing block finished.";
333
334
335
336
  /// For one pol input and power out
  /// ToDo: For two pol input and power out
  /// ToDo: For two pol input and stokes out

Tobias Winchen's avatar
Tobias Winchen committed
337

338
  if ((_call_count == 2) || (!newBlock)) {
339
340
341
    return false;
  }

342
    outputDataStream->data2Host(_d2h_stream);
343
344
345
346
  if (_call_count == 3) {
    return false;
  }

347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
//  // calculate off value
//  BOOST_LOG_TRIVIAL(info) << "Buffer block: " << _call_count-3 << " with " << _noOfBitSetsIn_G0.size() << "x2 output heaps:";
//  size_t total_samples_lost = 0;
//  for (size_t i = 0; i < _noOfBitSetsIn_G0.size(); i++)
//  {
//    size_t memOffset = 2 * i * (_nchans * sizeof(IntegratedPowerType) + sizeof(size_t));
//
//    size_t* on_values = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType));
//    size_t* off_values = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType) + sizeof(size_t));
//
//    size_t samples_lost = _nsamps_per_output_spectra - (*on_values) - (*off_values);
//    total_samples_lost += samples_lost;
//
//    BOOST_LOG_TRIVIAL(info) << "    Heap " << i << ":\n"
//      <<"                            Samples with  bit set  : " << *on_values << std::endl
//      <<"                            Samples without bit set: " << *off_values << std::endl
//      <<"                            Samples lost           : " << samples_lost << " out of " << _nsamps_per_output_spectra << std::endl;
//  }
//  double efficiency = 1. - double(total_samples_lost) / (_nsamps_per_output_spectra * _noOfBitSetsIn_G0.size());
//  double prev_average = _processing_efficiency / (_call_count- 3 - 1);
//  _processing_efficiency += efficiency;
//  double average = _processing_efficiency / (_call_count-3);
//  BOOST_LOG_TRIVIAL(info) << "Total processing efficiency of this buffer block:" << std::setprecision(6) << efficiency << ". Run average: " << average << " (Trend: " << std::showpos << (average - prev_average) << ")";
//
//  // Wrap in a RawBytes object here;
372
373
374
  RawBytes bytes(reinterpret_cast<char *>(outputDataStream->_host_power.b_ptr()),
                 outputDataStream->_host_power.size(),
                 outputDataStream->_host_power.size());
375
376
377
378
  BOOST_LOG_TRIVIAL(debug) << "Calling handler";
  // The handler can't do anything asynchronously without a copy here
  // as it would be unsafe (given that it does not own the memory it
  // is being passed).
Tobias Winchen's avatar
Tobias Winchen committed
379
380
381

  _handler(bytes);
  return false; //
382
383
} // operator ()

384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411


template <class HandlerType, class InputType, class OutputType>
void GatedSpectrometer<HandlerType, InputType, OutputType>::process(PolarizationData *inputDataStream, GatedPowerSpectrumOutput *outputDataStream)
{
  gated_fft(*inputDataStream, outputDataStream->G0._noOfBitSets.a(), outputDataStream->G1._noOfBitSets.a());

  kernels::detect_and_accumulate<IntegratedPowerType> <<<1024, 1024, 0, _proc_stream>>>(
            thrust::raw_pointer_cast(inputDataStream->_channelised_voltage_G0.data()),
            thrust::raw_pointer_cast(outputDataStream->G0.data.a().data()),
            _nchans,
            inputDataStream->_channelised_voltage_G0.size() / _nchans,
            _naccumulate / _nBlocks,
            1, 0., 1, 0);

  kernels::detect_and_accumulate<IntegratedPowerType> <<<1024, 1024, 0, _proc_stream>>>(
            thrust::raw_pointer_cast(inputDataStream->_channelised_voltage_G1.data()),
            thrust::raw_pointer_cast(outputDataStream->G1.data.a().data()),
            _nchans,
            inputDataStream->_channelised_voltage_G1.size() / _nchans,
            _naccumulate / _nBlocks,
            1, 0., 1, 0);

  CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
}



412
413
414
415
} // edd
} // effelsberg
} // psrdada_cpp