GatedSpectrometer.cu 17.4 KB
Newer Older
1
2
3
4
#include "psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh"
#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/cuda_utils.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
5

6
#include <cuda.h>
7
#include <cuda_profiler_api.h>
8
#include <thrust/system/cuda/execution_policy.h>
9
10

#include <iostream>
11
12
#include <cstring>
#include <sstream>
13
14
15
16
17

namespace psrdada_cpp {
namespace effelsberg {
namespace edd {

Tobias Winchen's avatar
Tobias Winchen committed
18
__global__ void gating(float* __restrict__ G0, float* __restrict__ G1, const int64_t* __restrict__ sideChannelData,
19
                       size_t N, size_t heapSize, size_t bitpos,
Tobias Winchen's avatar
Tobias Winchen committed
20
                       size_t noOfSideChannels, size_t selectedSideChannel, const float* __restrict__ _baseLineN) {
21
  float baseLine = (*_baseLineN) / N;
Tobias Winchen's avatar
Tobias Winchen committed
22
  for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N);
23
       i += blockDim.x * gridDim.x) {
Tobias Winchen's avatar
Tobias Winchen committed
24
    const float w = G0[i] - baseLine;
25
26
27
28
29
30
31
32
    const int64_t sideChannelItem =
        sideChannelData[((i / heapSize) * (noOfSideChannels)) +
                        selectedSideChannel]; // Probably not optimal access as
                                              // same data is copied for several
                                              // threads, but maybe efficiently
                                              // handled by cache?

    const int bit_set = TEST_BIT(sideChannelItem, bitpos);
Tobias Winchen's avatar
Tobias Winchen committed
33
34
    G1[i] = w * bit_set + baseLine;
    G0[i] = w * (!bit_set) + baseLine;
35
36
37
  }
}

Tobias Winchen's avatar
Tobias Winchen committed
38

39
__global__ void countBitSet(const int64_t *sideChannelData, size_t N, size_t
40
    bitpos, size_t noOfSideChannels, size_t selectedSideChannel, size_t
41
    *nBitsSet)
42
{
Tobias Winchen's avatar
Tobias Winchen committed
43
44
  // really not optimized reduction, but here only trivial array sizes.
  int i = blockIdx.x * blockDim.x + threadIdx.x;
45
  __shared__ unsigned int x[256];
Tobias Winchen's avatar
Tobias Winchen committed
46
47

  if (i == 0)
48
    nBitsSet[0] = 0;
Tobias Winchen's avatar
Tobias Winchen committed
49

50
  if (i * noOfSideChannels + selectedSideChannel < N)
Tobias Winchen's avatar
Tobias Winchen committed
51
52
53
54
55
56
57
58
59
60
61
    x[threadIdx.x] = TEST_BIT(sideChannelData[i * noOfSideChannels + selectedSideChannel], bitpos);
  else
    x[threadIdx.x] = 0;
  __syncthreads();

  for(int s = blockDim.x / 2; s > 0; s = s / 2)
  {
    if (threadIdx.x < s)
      x[threadIdx.x] += x[threadIdx.x + s];
    __syncthreads();
  }
62

Tobias Winchen's avatar
Tobias Winchen committed
63
  if(threadIdx.x == 0)
64
   nBitsSet[0] += x[threadIdx.x];
Tobias Winchen's avatar
Tobias Winchen committed
65
}
66

67

Tobias Winchen's avatar
Tobias Winchen committed
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
// blocksize for the array sum kernel
#define array_sum_Nthreads 1024

__global__ void array_sum(float *in, size_t N, float *out) {
  extern __shared__ float data[];

  size_t tid = threadIdx.x;

  float ls = 0;

  for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N);
       i += blockDim.x * gridDim.x) {
    ls += in[i]; // + in[i + blockDim.x];   // loading two elements increase the used bandwidth by ~10% but requires matching blocksize and size of input array
  }

  data[tid] = ls;
  __syncthreads();

  for (size_t i = blockDim.x / 2; i > 0; i /= 2) {
    if (tid < i) {
      data[tid] += data[tid + i];
    }
    __syncthreads();
  }

  // unroll last warp
  // if (tid < 32)
  //{
  //  warpReduce(data, tid);
  //}

  if (tid == 0) {
    out[blockIdx.x] = data[0];
  }
}


105
106
template <class HandlerType, typename IntegratedPowerType>
GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer(
107
108
    const DadaBufferLayout &dadaBufferLayout,
    std::size_t selectedSideChannel, std::size_t selectedBit, std::size_t fft_length, std::size_t naccumulate,
109
    std::size_t nbits, float input_level, float output_level,
110
    HandlerType &handler) : _dadaBufferLayout(dadaBufferLayout),
111
      _selectedSideChannel(selectedSideChannel), _selectedBit(selectedBit),
112
      _fft_length(fft_length),
113
114
      _naccumulate(naccumulate), _nbits(nbits), _handler(handler), _fft_plan(0),
      _call_count(0) {
115
116

  // Sanity checks
117
  assert(((_nbits == 12) || (_nbits == 8)));
118
119
120
121
122
  assert(_naccumulate > 0);

  // check for any device errors
  CUDA_ERROR_CHECK(cudaDeviceSynchronize());

123
  BOOST_LOG_TRIVIAL(info)
124
      << "Creating new GatedSpectrometer instance with parameters: \n"
125
126
      << "  fft_length           " << _fft_length << "\n"
      << "  naccumulate          " << _naccumulate << "\n"
127
128
      << "  nSideChannels        " << _dadaBufferLayout.getNSideChannels() << "\n"
      << "  speadHeapSize        " << _dadaBufferLayout.getHeapSize() << " byte\n"
129
130
131
      << "  selectedSideChannel  " << _selectedSideChannel << "\n"
      << "  selectedBit          " << _selectedBit << "\n"
      << "  output bit depth     " << sizeof(IntegratedPowerType) * 8;
132

133
134
  assert((_dadaBufferLayout.getNSideChannels() == 0) ||
         (selectedSideChannel < _dadaBufferLayout.getNSideChannels()));  // Sanity check of side channel value
135
136
  assert(selectedBit < 64); // Sanity check of selected bit

137
   _nsamps_per_buffer = _dadaBufferLayout.sizeOfData() * 8 / nbits;
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156

  _nsamps_per_output_spectra = fft_length * naccumulate;
  int nBlocks;
  if (_nsamps_per_output_spectra <= _nsamps_per_buffer)
  { // one buffer block is used for one or multiple output spectra
    size_t N = _nsamps_per_buffer / _nsamps_per_output_spectra;
    // All data in one block has to be used
    assert(N * _nsamps_per_output_spectra == _nsamps_per_buffer);
    nBlocks = 1;
  }
  else
  { // multiple blocks are integrated intoone output
    size_t N =  _nsamps_per_output_spectra /  _nsamps_per_buffer;
    // All data in multiple blocks has to be used
    assert(N * _nsamps_per_buffer == _nsamps_per_output_spectra);
    nBlocks = N;
  }
  BOOST_LOG_TRIVIAL(debug) << "Integrating  " << _nsamps_per_output_spectra << " samples from " << nBlocks << " into one spectra.";

157
  _nchans = _fft_length / 2 + 1;
158
  int batch = _nsamps_per_buffer / _fft_length;
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
  float dof = 2 * _naccumulate;
  float scale =
      std::pow(input_level * std::sqrt(static_cast<float>(_nchans)), 2);
  float offset = scale * dof;
  float scaling = scale * std::sqrt(2 * dof) / output_level;
  BOOST_LOG_TRIVIAL(debug)
      << "Correction factors for 8-bit conversion: offset = " << offset
      << ", scaling = " << scaling;

  BOOST_LOG_TRIVIAL(debug) << "Generating FFT plan";
  int n[] = {static_cast<int>(_fft_length)};
  CUFFT_ERROR_CHECK(cufftPlanMany(&_fft_plan, 1, n, NULL, 1, _fft_length, NULL,
                                  1, _nchans, CUFFT_R2C, batch));
  cufftSetStream(_fft_plan, _proc_stream);

  BOOST_LOG_TRIVIAL(debug) << "Allocating memory";
175
176
  _raw_voltage_db.resize(_dadaBufferLayout.sizeOfData() / sizeof(uint64_t));
  _sideChannelData_db.resize(_dadaBufferLayout.getNSideChannels() * _dadaBufferLayout.getNHeaps());
177
178
  BOOST_LOG_TRIVIAL(debug) << "  Input voltages size (in 64-bit words): "
                           << _raw_voltage_db.size();
179
180
  _unpacked_voltage_G0.resize(_nsamps_per_buffer);
  _unpacked_voltage_G1.resize(_nsamps_per_buffer);
Tobias Winchen's avatar
Tobias Winchen committed
181
182

  _baseLineN.resize(array_sum_Nthreads);
183
184
  BOOST_LOG_TRIVIAL(debug) << "  Unpacked voltages size (in samples): "
                           << _unpacked_voltage_G0.size();
185
  _channelised_voltage.resize(_nchans * batch);
186
  BOOST_LOG_TRIVIAL(debug) << "  Channelised voltages size: "
187
                           << _channelised_voltage.size();
188
189
190
191
192
193
  _power_db.resize(_nchans * batch / (_naccumulate / nBlocks) * 2);  // hold on and off spectra to simplify output
  thrust::fill(_power_db.a().begin(), _power_db.a().end(), 0.);
  thrust::fill(_power_db.b().begin(), _power_db.b().end(), 0.);
  BOOST_LOG_TRIVIAL(debug) << "  Powers size: " << _power_db.size() / 2;

  _noOfBitSetsInSideChannel.resize( batch / (_naccumulate / nBlocks));
194
195
  thrust::fill(_noOfBitSetsInSideChannel.a().begin(), _noOfBitSetsInSideChannel.a().end(), 0.);
  thrust::fill(_noOfBitSetsInSideChannel.b().begin(), _noOfBitSetsInSideChannel.b().end(), 0.);
196

197
198
199
  // on the host both power are stored in the same data buffer together with
  // the number of bit sets
  _host_power_db.resize( _power_db.size() * sizeof(IntegratedPowerType) + 2 * sizeof(size_t) * _noOfBitSetsInSideChannel.size());
200
201
202
203
204
205
206

  CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream));
  CUDA_ERROR_CHECK(cudaStreamCreate(&_proc_stream));
  CUDA_ERROR_CHECK(cudaStreamCreate(&_d2h_stream));
  CUFFT_ERROR_CHECK(cufftSetStream(_fft_plan, _proc_stream));

  _unpacker.reset(new Unpacker(_proc_stream));
207
  _detector.reset(new DetectorAccumulator<IntegratedPowerType>(_nchans, _naccumulate / nBlocks, scaling,
208
209
210
211
                                          offset, _proc_stream));
} // constructor


212
213
template <class HandlerType, typename IntegratedPowerType>
GatedSpectrometer<HandlerType, IntegratedPowerType>::~GatedSpectrometer() {
214
215
216
217
218
219
220
221
222
  BOOST_LOG_TRIVIAL(debug) << "Destroying GatedSpectrometer";
  if (!_fft_plan)
    cufftDestroy(_fft_plan);
  cudaStreamDestroy(_h2d_stream);
  cudaStreamDestroy(_proc_stream);
  cudaStreamDestroy(_d2h_stream);
}


223
224
template <class HandlerType, typename IntegratedPowerType>
void GatedSpectrometer<HandlerType, IntegratedPowerType>::init(RawBytes &block) {
225
  BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer init called";
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
  std::stringstream headerInfo;
  headerInfo << "\n"
      << "# Gated spectrometer parameters: \n"
      << "fft_length               " << _fft_length << "\n"
      << "nchannels                " << _fft_length << "\n"
      << "naccumulate              " << _naccumulate << "\n"
      << "selected_side_channel    " << _selectedSideChannel << "\n"
      << "selected_bit             " << _selectedBit << "\n"
      << "output_bit_depth         " << sizeof(IntegratedPowerType) * 8;

  size_t bEnd = std::strlen(block.ptr());
  if (bEnd + headerInfo.str().size() < block.total_bytes())
  {
    std::strcpy(block.ptr() + bEnd, headerInfo.str().c_str());
  }
  else
  {
    BOOST_LOG_TRIVIAL(warning) << "Header of size " << block.total_bytes()
      << " bytes already contains " << bEnd
      << "bytes. Cannot add gated spectrometer info of size "
      << headerInfo.str().size() << " bytes.";
  }

249
250
251
252
  _handler.init(block);
}


253
254
template <class HandlerType, typename IntegratedPowerType>
void GatedSpectrometer<HandlerType, IntegratedPowerType>::process(
255
    thrust::device_vector<RawVoltageType> const &digitiser_raw,
256
    thrust::device_vector<int64_t> const &sideChannelData,
257
    thrust::device_vector<IntegratedPowerType> &detected, thrust::device_vector<size_t> &noOfBitSet) {
258
259
260
261
262
263
264
265
266
267
268
  BOOST_LOG_TRIVIAL(debug) << "Unpacking raw voltages";
  switch (_nbits) {
  case 8:
    _unpacker->unpack<8>(digitiser_raw, _unpacked_voltage_G0);
    break;
  case 12:
    _unpacker->unpack<12>(digitiser_raw, _unpacked_voltage_G0);
    break;
  default:
    throw std::runtime_error("Unsupported number of bits");
  }
Tobias Winchen's avatar
Tobias Winchen committed
269
270
271
  BOOST_LOG_TRIVIAL(debug) << "Calculate baseline";
  psrdada_cpp::effelsberg::edd::array_sum<<<64, array_sum_Nthreads, array_sum_Nthreads * sizeof(float), _proc_stream>>>(thrust::raw_pointer_cast(_unpacked_voltage_G0.data()), _unpacked_voltage_G0.size(), thrust::raw_pointer_cast(_baseLineN.data()));
  psrdada_cpp::effelsberg::edd::array_sum<<<1, array_sum_Nthreads, array_sum_Nthreads * sizeof(float), _proc_stream>>>(thrust::raw_pointer_cast(_baseLineN.data()), _baseLineN.size(), thrust::raw_pointer_cast(_baseLineN.data()));
272

273
  BOOST_LOG_TRIVIAL(debug) << "Perform gating";
274
275
  gating<<<1024, 1024, 0, _proc_stream>>>(
      thrust::raw_pointer_cast(_unpacked_voltage_G0.data()),
276
277
      thrust::raw_pointer_cast(_unpacked_voltage_G1.data()),
      thrust::raw_pointer_cast(sideChannelData.data()),
278
      _unpacked_voltage_G0.size(), _dadaBufferLayout.getHeapSize(), _selectedBit, _dadaBufferLayout.getNSideChannels(),
Tobias Winchen's avatar
Tobias Winchen committed
279
      _selectedSideChannel, thrust::raw_pointer_cast(_baseLineN.data()));
280

281
282
283
284
285
  for (size_t i = 0; i < _noOfBitSetsInSideChannel.size(); i++)
  { // ToDo: Should be in one kernel call
    countBitSet<<<(sideChannelData.size()+255)/256, 256, 0,
      _proc_stream>>>(thrust::raw_pointer_cast(sideChannelData.data() + i * sideChannelData.size() / _noOfBitSetsInSideChannel.size() ),
          sideChannelData.size() / _noOfBitSetsInSideChannel.size(), _selectedBit,
286
          _dadaBufferLayout.getNSideChannels(), _selectedBit,
287
288
          thrust::raw_pointer_cast(noOfBitSet.data() + i));
  }
289
290
291
292
293

  BOOST_LOG_TRIVIAL(debug) << "Performing FFT 1";
  UnpackedVoltageType *_unpacked_voltage_ptr =
      thrust::raw_pointer_cast(_unpacked_voltage_G0.data());
  ChannelisedVoltageType *_channelised_voltage_ptr =
294
      thrust::raw_pointer_cast(_channelised_voltage.data());
295
296
  CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr,
                                 (cufftComplex *)_channelised_voltage_ptr));
297
  _detector->detect(_channelised_voltage, detected, 2, 0);
298
299
300
301
302
303

  BOOST_LOG_TRIVIAL(debug) << "Performing FFT 2";
  _unpacked_voltage_ptr = thrust::raw_pointer_cast(_unpacked_voltage_G1.data());
  CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr,
                                 (cufftComplex *)_channelised_voltage_ptr));

304
  _detector->detect(_channelised_voltage, detected, 2, 1);
Tobias Winchen's avatar
Tobias Winchen committed
305
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
306
  BOOST_LOG_TRIVIAL(debug) << "Exit processing";
307
308
309
} // process


310
311
template <class HandlerType, typename IntegratedPowerType>
bool GatedSpectrometer<HandlerType, IntegratedPowerType>::operator()(RawBytes &block) {
312
313
314
  ++_call_count;
  BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer operator() called (count = "
                           << _call_count << ")";
315
  if (block.used_bytes() != _dadaBufferLayout.getBufferSize()) { /* Unexpected buffer size */
316
317
    BOOST_LOG_TRIVIAL(error) << "Unexpected Buffer Size - Got "
                             << block.used_bytes() << " byte, expected "
318
                             << _dadaBufferLayout.getBufferSize() << " byte)";
319
    CUDA_ERROR_CHECK(cudaDeviceSynchronize());
Tobias Winchen's avatar
Tobias Winchen committed
320
321
    cudaProfilerStop();
    return true;
322
323
  }

324
  // Copy data to device
325
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_h2d_stream));
326
327
328
329
  _raw_voltage_db.swap();
  _sideChannelData_db.swap();

  BOOST_LOG_TRIVIAL(debug) << "   block.used_bytes() = " << block.used_bytes()
330
                           << ", dataBlockBytes = " << _dadaBufferLayout.sizeOfData() << "\n";
Tobias Winchen's avatar
Tobias Winchen committed
331

332
333
  CUDA_ERROR_CHECK(cudaMemcpyAsync(static_cast<void *>(_raw_voltage_db.a_ptr()),
                                   static_cast<void *>(block.ptr()),
334
                                   _dadaBufferLayout.sizeOfData() , cudaMemcpyHostToDevice,
335
336
337
                                   _h2d_stream));
  CUDA_ERROR_CHECK(cudaMemcpyAsync(
      static_cast<void *>(_sideChannelData_db.a_ptr()),
338
339
      static_cast<void *>(block.ptr() + _dadaBufferLayout.sizeOfData() + _dadaBufferLayout.sizeOfGap()),
      _dadaBufferLayout.sizeOfSideChannelData(), cudaMemcpyHostToDevice, _h2d_stream));
340
341
342
343

  if (_call_count == 1) {
    return false;
  }
344
  // process data
345

346
347
348
349
350
351
352
353
354
355
  // only if  a newblock is started the output buffer is swapped. Otherwise the
  // new data is added to it
  bool newBlock = false;
  if (((_call_count-1) * _nsamps_per_buffer) % _nsamps_per_output_spectra == 0) // _call_count -1 because this is the block number on the device
  {
    BOOST_LOG_TRIVIAL(debug) << "Starting new output block.";
    newBlock = true;
    _power_db.swap();
    _noOfBitSetsInSideChannel.swap();
    // move to specific stream!
356
357
    thrust::fill(thrust::cuda::par.on(_proc_stream),_power_db.a().begin(), _power_db.a().end(), 0.);
    thrust::fill(thrust::cuda::par.on(_proc_stream), _noOfBitSetsInSideChannel.a().begin(), _noOfBitSetsInSideChannel.a().end(), 0.);
358
  }
359

360
  process(_raw_voltage_db.b(), _sideChannelData_db.b(), _power_db.a(), _noOfBitSetsInSideChannel.a());
361
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
Tobias Winchen's avatar
Tobias Winchen committed
362

363
  if ((_call_count == 2) || (!newBlock)) {
364
365
366
    return false;
  }

367
  // copy data to host if block is finished
368
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream));
369
  _host_power_db.swap();
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385

  for (size_t i = 0; i < _noOfBitSetsInSideChannel.size(); i++)
  {
    size_t memOffset = 2 * i * (_nchans * sizeof(IntegratedPowerType) + sizeof(size_t));
    // copy 2x channel data
    CUDA_ERROR_CHECK(
        cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset) ,
                        static_cast<void *>(_power_db.b_ptr() + 2 * i * _nchans),
                        2 * _nchans * sizeof(IntegratedPowerType),
                        cudaMemcpyDeviceToHost, _d2h_stream));
    // copy noOf bit set data
    CUDA_ERROR_CHECK(
        cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 2 * _nchans ),
          static_cast<void *>(_noOfBitSetsInSideChannel.b_ptr() + i ),
            1 * sizeof(size_t),
            cudaMemcpyDeviceToHost, _d2h_stream));
386
  }
387
388

  BOOST_LOG_TRIVIAL(debug) << "Copy Data back to host";
389

390
391
392
393
  if (_call_count == 3) {
    return false;
  }

394
  // calculate off value
395
  BOOST_LOG_TRIVIAL(info) << "Buffer block: " << _call_count << " with " << _noOfBitSetsInSideChannel.size() << " output heaps:";
396
397
398
399
400
401
  for (size_t i = 0; i < _noOfBitSetsInSideChannel.size(); i++)
  {
    size_t memOffset = 2 * i * (_nchans * sizeof(IntegratedPowerType) + sizeof(size_t));

    size_t* on_values = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType));
    size_t* off_values = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType) + sizeof(size_t));
402
    *off_values =  _dadaBufferLayout.getNHeaps() - (*on_values);
403
404
405
406
407

    BOOST_LOG_TRIVIAL(info) << "    " << i << ": No of bit set in side channel: " << *on_values << " / " << *off_values << std::endl;
  }

  // Wrap in a RawBytes object here;
408
  RawBytes bytes(reinterpret_cast<char *>(_host_power_db.b_ptr()),
409
410
                 _host_power_db.size(),
                 _host_power_db.size());
411
412
413
414
  BOOST_LOG_TRIVIAL(debug) << "Calling handler";
  // The handler can't do anything asynchronously without a copy here
  // as it would be unsafe (given that it does not own the memory it
  // is being passed).
Tobias Winchen's avatar
Tobias Winchen committed
415
416
417

  _handler(bytes);
  return false; //
418
419
420
421
422
423
} // operator ()

} // edd
} // effelsberg
} // psrdada_cpp