GatedSpectrometer.cu 17.8 KB
Newer Older
1
2
3
4
#include "psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh"
#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/cuda_utils.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
5

6
#include <cuda.h>
7
#include <cuda_profiler_api.h>
8
#include <thrust/system/cuda/execution_policy.h>
9
10

#include <iostream>
11
12
#include <cstring>
#include <sstream>
13
14
15
16
17

namespace psrdada_cpp {
namespace effelsberg {
namespace edd {

Tobias Winchen's avatar
Tobias Winchen committed
18
__global__ void gating(float* __restrict__ G0, float* __restrict__ G1, const int64_t* __restrict__ sideChannelData,
19
                       size_t N, size_t heapSize, size_t bitpos,
Tobias Winchen's avatar
Tobias Winchen committed
20
                       size_t noOfSideChannels, size_t selectedSideChannel, const float* __restrict__ _baseLineN) {
21
  float baseLine = (*_baseLineN) / N;
Tobias Winchen's avatar
Tobias Winchen committed
22
  for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N);
23
       i += blockDim.x * gridDim.x) {
Tobias Winchen's avatar
Tobias Winchen committed
24
    const float w = G0[i] - baseLine;
25
26
27
28
29
30
31
32
    const int64_t sideChannelItem =
        sideChannelData[((i / heapSize) * (noOfSideChannels)) +
                        selectedSideChannel]; // Probably not optimal access as
                                              // same data is copied for several
                                              // threads, but maybe efficiently
                                              // handled by cache?

    const int bit_set = TEST_BIT(sideChannelItem, bitpos);
Tobias Winchen's avatar
Tobias Winchen committed
33
34
    G1[i] = w * bit_set + baseLine;
    G0[i] = w * (!bit_set) + baseLine;
35
36
37
  }
}

Tobias Winchen's avatar
Tobias Winchen committed
38

39
__global__ void countBitSet(const int64_t *sideChannelData, size_t N, size_t
40
    bitpos, size_t noOfSideChannels, size_t selectedSideChannel, size_t
41
    *nBitsSet)
42
{
Tobias Winchen's avatar
Tobias Winchen committed
43
44
  // really not optimized reduction, but here only trivial array sizes.
  int i = blockIdx.x * blockDim.x + threadIdx.x;
45
  __shared__ unsigned int x[256];
Tobias Winchen's avatar
Tobias Winchen committed
46
47

  if (i == 0)
48
    nBitsSet[0] = 0;
Tobias Winchen's avatar
Tobias Winchen committed
49

50
  if (i * noOfSideChannels + selectedSideChannel < N)
Tobias Winchen's avatar
Tobias Winchen committed
51
52
53
54
55
56
57
58
59
60
61
    x[threadIdx.x] = TEST_BIT(sideChannelData[i * noOfSideChannels + selectedSideChannel], bitpos);
  else
    x[threadIdx.x] = 0;
  __syncthreads();

  for(int s = blockDim.x / 2; s > 0; s = s / 2)
  {
    if (threadIdx.x < s)
      x[threadIdx.x] += x[threadIdx.x + s];
    __syncthreads();
  }
62

Tobias Winchen's avatar
Tobias Winchen committed
63
  if(threadIdx.x == 0)
64
   nBitsSet[0] += x[threadIdx.x];
Tobias Winchen's avatar
Tobias Winchen committed
65
}
66

67

Tobias Winchen's avatar
Tobias Winchen committed
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
// blocksize for the array sum kernel
#define array_sum_Nthreads 1024

__global__ void array_sum(float *in, size_t N, float *out) {
  extern __shared__ float data[];

  size_t tid = threadIdx.x;

  float ls = 0;

  for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N);
       i += blockDim.x * gridDim.x) {
    ls += in[i]; // + in[i + blockDim.x];   // loading two elements increase the used bandwidth by ~10% but requires matching blocksize and size of input array
  }

  data[tid] = ls;
  __syncthreads();

  for (size_t i = blockDim.x / 2; i > 0; i /= 2) {
    if (tid < i) {
      data[tid] += data[tid + i];
    }
    __syncthreads();
  }

  // unroll last warp
  // if (tid < 32)
  //{
  //  warpReduce(data, tid);
  //}

  if (tid == 0) {
    out[blockIdx.x] = data[0];
  }
}


105
106
template <class HandlerType, typename IntegratedPowerType>
GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer(
107
108
109
110
111
112
113
114
115
116
    std::size_t buffer_bytes, std::size_t nSideChannels,
    std::size_t selectedSideChannel, std::size_t selectedBit,
    std::size_t speadHeapSize, std::size_t fft_length, std::size_t naccumulate,
    std::size_t nbits, float input_level, float output_level,
    HandlerType &handler)
    : _buffer_bytes(buffer_bytes), _nSideChannels(nSideChannels),
      _selectedSideChannel(selectedSideChannel), _selectedBit(selectedBit),
      _speadHeapSize(speadHeapSize), _fft_length(fft_length),
      _naccumulate(naccumulate), _nbits(nbits), _handler(handler), _fft_plan(0),
      _call_count(0) {
117
118

  // Sanity checks
119
  assert(((_nbits == 12) || (_nbits == 8)));
120
121
122
123
124
  assert(_naccumulate > 0);

  // check for any device errors
  CUDA_ERROR_CHECK(cudaDeviceSynchronize());

125
  BOOST_LOG_TRIVIAL(info)
126
      << "Creating new GatedSpectrometer instance with parameters: \n"
127
128
129
130
131
132
133
      << "  fft_length           " << _fft_length << "\n"
      << "  naccumulate          " << _naccumulate << "\n"
      << "  nSideChannels        " << _nSideChannels << "\n"
      << "  speadHeapSize        " << _speadHeapSize << " byte\n"
      << "  selectedSideChannel  " << _selectedSideChannel << "\n"
      << "  selectedBit          " << _selectedBit << "\n"
      << "  output bit depth     " << sizeof(IntegratedPowerType) * 8;
134
135
136
137
138
139

  _sideChannelSize = nSideChannels * sizeof(int64_t);
  _totalHeapSize = _speadHeapSize + _sideChannelSize;
  _nHeaps = buffer_bytes / _totalHeapSize;
  _gapSize = (buffer_bytes - _nHeaps * _totalHeapSize);
  _dataBlockBytes = _nHeaps * _speadHeapSize;
140

141
142
143
144
  assert((nSideChannels == 0) ||
         (selectedSideChannel <
          nSideChannels));  // Sanity check of side channel value
  assert(selectedBit < 64); // Sanity check of selected bit
145
  BOOST_LOG_TRIVIAL(info) << "Resulting memory configuration: \n"
146
147
                           << "  totalSizeOfHeap: " << _totalHeapSize << " byte\n"
                           << "  number of heaps per buffer: " << _nHeaps << "\n"
148
                           << "  resulting gap: " << _gapSize << " byte\n"
149
                           << "  datablock size in buffer: " << _dataBlockBytes << " byte\n";
150

151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
   _nsamps_per_buffer = _dataBlockBytes * 8 / nbits;

  _nsamps_per_output_spectra = fft_length * naccumulate;
  int nBlocks;
  if (_nsamps_per_output_spectra <= _nsamps_per_buffer)
  { // one buffer block is used for one or multiple output spectra
    size_t N = _nsamps_per_buffer / _nsamps_per_output_spectra;
    // All data in one block has to be used
    assert(N * _nsamps_per_output_spectra == _nsamps_per_buffer);
    nBlocks = 1;
  }
  else
  { // multiple blocks are integrated intoone output
    size_t N =  _nsamps_per_output_spectra /  _nsamps_per_buffer;
    // All data in multiple blocks has to be used
    assert(N * _nsamps_per_buffer == _nsamps_per_output_spectra);
    nBlocks = N;
  }
  BOOST_LOG_TRIVIAL(debug) << "Integrating  " << _nsamps_per_output_spectra << " samples from " << nBlocks << " into one spectra.";

171
172
  std::size_t n64bit_words = _dataBlockBytes / sizeof(uint64_t);
  _nchans = _fft_length / 2 + 1;
173
  int batch = _nsamps_per_buffer / _fft_length;
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
  float dof = 2 * _naccumulate;
  float scale =
      std::pow(input_level * std::sqrt(static_cast<float>(_nchans)), 2);
  float offset = scale * dof;
  float scaling = scale * std::sqrt(2 * dof) / output_level;
  BOOST_LOG_TRIVIAL(debug)
      << "Correction factors for 8-bit conversion: offset = " << offset
      << ", scaling = " << scaling;

  BOOST_LOG_TRIVIAL(debug) << "Generating FFT plan";
  int n[] = {static_cast<int>(_fft_length)};
  CUFFT_ERROR_CHECK(cufftPlanMany(&_fft_plan, 1, n, NULL, 1, _fft_length, NULL,
                                  1, _nchans, CUFFT_R2C, batch));
  cufftSetStream(_fft_plan, _proc_stream);

  BOOST_LOG_TRIVIAL(debug) << "Allocating memory";
  _raw_voltage_db.resize(n64bit_words);
  _sideChannelData_db.resize(_sideChannelSize * _nHeaps);
  BOOST_LOG_TRIVIAL(debug) << "  Input voltages size (in 64-bit words): "
                           << _raw_voltage_db.size();
194
195
  _unpacked_voltage_G0.resize(_nsamps_per_buffer);
  _unpacked_voltage_G1.resize(_nsamps_per_buffer);
Tobias Winchen's avatar
Tobias Winchen committed
196
197

  _baseLineN.resize(array_sum_Nthreads);
198
199
  BOOST_LOG_TRIVIAL(debug) << "  Unpacked voltages size (in samples): "
                           << _unpacked_voltage_G0.size();
200
  _channelised_voltage.resize(_nchans * batch);
201
  BOOST_LOG_TRIVIAL(debug) << "  Channelised voltages size: "
202
                           << _channelised_voltage.size();
203
204
205
206
207
208
209
210
211
  _power_db.resize(_nchans * batch / (_naccumulate / nBlocks) * 2);  // hold on and off spectra to simplify output
  thrust::fill(_power_db.a().begin(), _power_db.a().end(), 0.);
  thrust::fill(_power_db.b().begin(), _power_db.b().end(), 0.);
  BOOST_LOG_TRIVIAL(debug) << "  Powers size: " << _power_db.size() / 2;

  _noOfBitSetsInSideChannel.resize( batch / (_naccumulate / nBlocks));
  thrust::fill( _noOfBitSetsInSideChannel.a().begin(), _noOfBitSetsInSideChannel.a().end(), 0.);
  thrust::fill( _noOfBitSetsInSideChannel.b().begin(), _noOfBitSetsInSideChannel.b().end(), 0.);

212
213
214
  // on the host both power are stored in the same data buffer together with
  // the number of bit sets
  _host_power_db.resize( _power_db.size() * sizeof(IntegratedPowerType) + 2 * sizeof(size_t) * _noOfBitSetsInSideChannel.size());
215
216
217
218
219
220
221

  CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream));
  CUDA_ERROR_CHECK(cudaStreamCreate(&_proc_stream));
  CUDA_ERROR_CHECK(cudaStreamCreate(&_d2h_stream));
  CUFFT_ERROR_CHECK(cufftSetStream(_fft_plan, _proc_stream));

  _unpacker.reset(new Unpacker(_proc_stream));
222
  _detector.reset(new DetectorAccumulator<IntegratedPowerType>(_nchans, _naccumulate / nBlocks, scaling,
223
224
225
226
                                          offset, _proc_stream));
} // constructor


227
228
template <class HandlerType, typename IntegratedPowerType>
GatedSpectrometer<HandlerType, IntegratedPowerType>::~GatedSpectrometer() {
229
230
231
232
233
234
235
236
237
  BOOST_LOG_TRIVIAL(debug) << "Destroying GatedSpectrometer";
  if (!_fft_plan)
    cufftDestroy(_fft_plan);
  cudaStreamDestroy(_h2d_stream);
  cudaStreamDestroy(_proc_stream);
  cudaStreamDestroy(_d2h_stream);
}


238
239
template <class HandlerType, typename IntegratedPowerType>
void GatedSpectrometer<HandlerType, IntegratedPowerType>::init(RawBytes &block) {
240
  BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer init called";
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
  std::stringstream headerInfo;
  headerInfo << "\n"
      << "# Gated spectrometer parameters: \n"
      << "fft_length               " << _fft_length << "\n"
      << "nchannels                " << _fft_length << "\n"
      << "naccumulate              " << _naccumulate << "\n"
      << "selected_side_channel    " << _selectedSideChannel << "\n"
      << "selected_bit             " << _selectedBit << "\n"
      << "output_bit_depth         " << sizeof(IntegratedPowerType) * 8;

  size_t bEnd = std::strlen(block.ptr());
  if (bEnd + headerInfo.str().size() < block.total_bytes())
  {
    std::strcpy(block.ptr() + bEnd, headerInfo.str().c_str());
  }
  else
  {
    BOOST_LOG_TRIVIAL(warning) << "Header of size " << block.total_bytes()
      << " bytes already contains " << bEnd
      << "bytes. Cannot add gated spectrometer info of size "
      << headerInfo.str().size() << " bytes.";
  }

264
265
266
267
  _handler.init(block);
}


268
269
template <class HandlerType, typename IntegratedPowerType>
void GatedSpectrometer<HandlerType, IntegratedPowerType>::process(
270
    thrust::device_vector<RawVoltageType> const &digitiser_raw,
271
    thrust::device_vector<int64_t> const &sideChannelData,
272
    thrust::device_vector<IntegratedPowerType> &detected, thrust::device_vector<size_t> &noOfBitSet) {
273
274
275
276
277
278
279
280
281
282
283
  BOOST_LOG_TRIVIAL(debug) << "Unpacking raw voltages";
  switch (_nbits) {
  case 8:
    _unpacker->unpack<8>(digitiser_raw, _unpacked_voltage_G0);
    break;
  case 12:
    _unpacker->unpack<12>(digitiser_raw, _unpacked_voltage_G0);
    break;
  default:
    throw std::runtime_error("Unsupported number of bits");
  }
Tobias Winchen's avatar
Tobias Winchen committed
284
285
286
  BOOST_LOG_TRIVIAL(debug) << "Calculate baseline";
  psrdada_cpp::effelsberg::edd::array_sum<<<64, array_sum_Nthreads, array_sum_Nthreads * sizeof(float), _proc_stream>>>(thrust::raw_pointer_cast(_unpacked_voltage_G0.data()), _unpacked_voltage_G0.size(), thrust::raw_pointer_cast(_baseLineN.data()));
  psrdada_cpp::effelsberg::edd::array_sum<<<1, array_sum_Nthreads, array_sum_Nthreads * sizeof(float), _proc_stream>>>(thrust::raw_pointer_cast(_baseLineN.data()), _baseLineN.size(), thrust::raw_pointer_cast(_baseLineN.data()));
287

288
  BOOST_LOG_TRIVIAL(debug) << "Perform gating";
289
290
  gating<<<1024, 1024, 0, _proc_stream>>>(
      thrust::raw_pointer_cast(_unpacked_voltage_G0.data()),
291
292
      thrust::raw_pointer_cast(_unpacked_voltage_G1.data()),
      thrust::raw_pointer_cast(sideChannelData.data()),
293
      _unpacked_voltage_G0.size(), _speadHeapSize, _selectedBit, _nSideChannels,
Tobias Winchen's avatar
Tobias Winchen committed
294
      _selectedSideChannel, thrust::raw_pointer_cast(_baseLineN.data()));
295

296
297
298
299
300
301
302
303
  for (size_t i = 0; i < _noOfBitSetsInSideChannel.size(); i++)
  { // ToDo: Should be in one kernel call
    countBitSet<<<(sideChannelData.size()+255)/256, 256, 0,
      _proc_stream>>>(thrust::raw_pointer_cast(sideChannelData.data() + i * sideChannelData.size() / _noOfBitSetsInSideChannel.size() ),
          sideChannelData.size() / _noOfBitSetsInSideChannel.size(), _selectedBit,
          _nSideChannels, _selectedBit,
          thrust::raw_pointer_cast(noOfBitSet.data() + i));
  }
304
305
306
307
308

  BOOST_LOG_TRIVIAL(debug) << "Performing FFT 1";
  UnpackedVoltageType *_unpacked_voltage_ptr =
      thrust::raw_pointer_cast(_unpacked_voltage_G0.data());
  ChannelisedVoltageType *_channelised_voltage_ptr =
309
      thrust::raw_pointer_cast(_channelised_voltage.data());
310
311
  CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr,
                                 (cufftComplex *)_channelised_voltage_ptr));
312
  _detector->detect(_channelised_voltage, detected, 2, 0);
313
314
315
316
317
318

  BOOST_LOG_TRIVIAL(debug) << "Performing FFT 2";
  _unpacked_voltage_ptr = thrust::raw_pointer_cast(_unpacked_voltage_G1.data());
  CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr,
                                 (cufftComplex *)_channelised_voltage_ptr));

319
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
320
  _detector->detect(_channelised_voltage, detected, 2, 1);
Tobias Winchen's avatar
Tobias Winchen committed
321
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
322
  BOOST_LOG_TRIVIAL(debug) << "Exit processing";
323
324
325
} // process


326
327
template <class HandlerType, typename IntegratedPowerType>
bool GatedSpectrometer<HandlerType, IntegratedPowerType>::operator()(RawBytes &block) {
328
329
330
331
332
333
334
  ++_call_count;
  BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer operator() called (count = "
                           << _call_count << ")";
  if (block.used_bytes() != _buffer_bytes) { /* Unexpected buffer size */
    BOOST_LOG_TRIVIAL(error) << "Unexpected Buffer Size - Got "
                             << block.used_bytes() << " byte, expected "
                             << _buffer_bytes << " byte)";
Tobias Winchen's avatar
Tobias Winchen committed
335
336
337
    cudaDeviceSynchronize();
    cudaProfilerStop();
    return true;
338
339
  }

340
  // Copy data to device
341
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_h2d_stream));
342
343
344
345
346
  _raw_voltage_db.swap();
  _sideChannelData_db.swap();

  BOOST_LOG_TRIVIAL(debug) << "   block.used_bytes() = " << block.used_bytes()
                           << ", dataBlockBytes = " << _dataBlockBytes << "\n";
Tobias Winchen's avatar
Tobias Winchen committed
347

348
349
350
351
352
353
354
355
356
357
358
359
  CUDA_ERROR_CHECK(cudaMemcpyAsync(static_cast<void *>(_raw_voltage_db.a_ptr()),
                                   static_cast<void *>(block.ptr()),
                                   _dataBlockBytes, cudaMemcpyHostToDevice,
                                   _h2d_stream));
  CUDA_ERROR_CHECK(cudaMemcpyAsync(
      static_cast<void *>(_sideChannelData_db.a_ptr()),
      static_cast<void *>(block.ptr() + _dataBlockBytes + _gapSize),
      _sideChannelSize * _nHeaps, cudaMemcpyHostToDevice, _h2d_stream));

  if (_call_count == 1) {
    return false;
  }
360
  // process data
361

362
363
364
365
366
367
368
369
370
371
372
373
374
  // only if  a newblock is started the output buffer is swapped. Otherwise the
  // new data is added to it
  bool newBlock = false;
  if (((_call_count-1) * _nsamps_per_buffer) % _nsamps_per_output_spectra == 0) // _call_count -1 because this is the block number on the device
  {
    BOOST_LOG_TRIVIAL(debug) << "Starting new output block.";
    newBlock = true;
    _power_db.swap();
    _noOfBitSetsInSideChannel.swap();
    // move to specific stream!
    thrust::fill(_power_db.a().begin(), _power_db.a().end(), 0.);
    thrust::fill( _noOfBitSetsInSideChannel.a().begin(), _noOfBitSetsInSideChannel.a().end(), 0.);
  }
375

376
  process(_raw_voltage_db.b(), _sideChannelData_db.b(), _power_db.a(), _noOfBitSetsInSideChannel.a());
377
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
Tobias Winchen's avatar
Tobias Winchen committed
378

379
  if ((_call_count == 2) || (!newBlock)) {
380
381
382
    return false;
  }

383
  // copy data to host if block is finished
384
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream));
385
  _host_power_db.swap();
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401

  for (size_t i = 0; i < _noOfBitSetsInSideChannel.size(); i++)
  {
    size_t memOffset = 2 * i * (_nchans * sizeof(IntegratedPowerType) + sizeof(size_t));
    // copy 2x channel data
    CUDA_ERROR_CHECK(
        cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset) ,
                        static_cast<void *>(_power_db.b_ptr() + 2 * i * _nchans),
                        2 * _nchans * sizeof(IntegratedPowerType),
                        cudaMemcpyDeviceToHost, _d2h_stream));
    // copy noOf bit set data
    CUDA_ERROR_CHECK(
        cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 2 * _nchans ),
          static_cast<void *>(_noOfBitSetsInSideChannel.b_ptr() + i ),
            1 * sizeof(size_t),
            cudaMemcpyDeviceToHost, _d2h_stream));
402
  }
403
404

  BOOST_LOG_TRIVIAL(debug) << "Copy Data back to host";
405

406
407
408
409
  if (_call_count == 3) {
    return false;
  }

410
  // calculate off value
411
  BOOST_LOG_TRIVIAL(info) << "Buffer block: " << _call_count << " with " << _noOfBitSetsInSideChannel.size() << " output heaps:";
412
413
414
415
416
417
418
419
420
421
422
423
  for (size_t i = 0; i < _noOfBitSetsInSideChannel.size(); i++)
  {
    size_t memOffset = 2 * i * (_nchans * sizeof(IntegratedPowerType) + sizeof(size_t));

    size_t* on_values = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType));
    size_t* off_values = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType) + sizeof(size_t));
    *off_values =  _nHeaps - (*on_values);

    BOOST_LOG_TRIVIAL(info) << "    " << i << ": No of bit set in side channel: " << *on_values << " / " << *off_values << std::endl;
  }

  // Wrap in a RawBytes object here;
424
  RawBytes bytes(reinterpret_cast<char *>(_host_power_db.b_ptr()),
425
426
                 _host_power_db.size(),
                 _host_power_db.size());
427
428
429
430
  BOOST_LOG_TRIVIAL(debug) << "Calling handler";
  // The handler can't do anything asynchronously without a copy here
  // as it would be unsafe (given that it does not own the memory it
  // is being passed).
Tobias Winchen's avatar
Tobias Winchen committed
431
432
433

  _handler(bytes);
  return false; //
434
435
436
437
438
439
} // operator ()

} // edd
} // effelsberg
} // psrdada_cpp