GatedSpectrometer.cu 15.3 KB
Newer Older
1
2
3
4
5
#include "psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh"
#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/cuda_utils.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
#include <cuda.h>
6
#include <cuda_profiler_api.h>
7
8

#include <iostream>
9
10
#include <cstring>
#include <sstream>
11
12
13
14
15

namespace psrdada_cpp {
namespace effelsberg {
namespace edd {

Tobias Winchen's avatar
Tobias Winchen committed
16
17
18
19
20





Tobias Winchen's avatar
Tobias Winchen committed
21
__global__ void gating(float* __restrict__ G0, float* __restrict__ G1, const int64_t* __restrict__ sideChannelData,
22
                       size_t N, size_t heapSize, size_t bitpos,
Tobias Winchen's avatar
Tobias Winchen committed
23
                       size_t noOfSideChannels, size_t selectedSideChannel, const float* __restrict__ _baseLineN) {
24
  float baseLine = (*_baseLineN) / N;
Tobias Winchen's avatar
Tobias Winchen committed
25
  for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N);
26
       i += blockDim.x * gridDim.x) {
Tobias Winchen's avatar
Tobias Winchen committed
27
    const float w = G0[i] - baseLine;
28
29
30
31
32
33
34
35
    const int64_t sideChannelItem =
        sideChannelData[((i / heapSize) * (noOfSideChannels)) +
                        selectedSideChannel]; // Probably not optimal access as
                                              // same data is copied for several
                                              // threads, but maybe efficiently
                                              // handled by cache?

    const int bit_set = TEST_BIT(sideChannelItem, bitpos);
Tobias Winchen's avatar
Tobias Winchen committed
36
37
    G1[i] = w * bit_set + baseLine;
    G0[i] = w * (!bit_set) + baseLine;
38
39
40
  }
}

Tobias Winchen's avatar
Tobias Winchen committed
41

42
__global__ void countBitSet(const int64_t *sideChannelData, size_t N, size_t
43
    bitpos, size_t noOfSideChannels, size_t selectedSideChannel, size_t
44
    *nBitsSet)
45
{
Tobias Winchen's avatar
Tobias Winchen committed
46
47
  // really not optimized reduction, but here only trivial array sizes.
  int i = blockIdx.x * blockDim.x + threadIdx.x;
48
  __shared__ unsigned int x[256];
Tobias Winchen's avatar
Tobias Winchen committed
49
50

  if (i == 0)
51
    nBitsSet[0] = 0;
Tobias Winchen's avatar
Tobias Winchen committed
52

53
  if (i * noOfSideChannels + selectedSideChannel < N)
Tobias Winchen's avatar
Tobias Winchen committed
54
55
56
57
58
59
60
61
62
63
64
    x[threadIdx.x] = TEST_BIT(sideChannelData[i * noOfSideChannels + selectedSideChannel], bitpos);
  else
    x[threadIdx.x] = 0;
  __syncthreads();

  for(int s = blockDim.x / 2; s > 0; s = s / 2)
  {
    if (threadIdx.x < s)
      x[threadIdx.x] += x[threadIdx.x + s];
    __syncthreads();
  }
65

Tobias Winchen's avatar
Tobias Winchen committed
66
  if(threadIdx.x == 0)
67
   nBitsSet[0] += x[threadIdx.x];
Tobias Winchen's avatar
Tobias Winchen committed
68
}
69

70

Tobias Winchen's avatar
Tobias Winchen committed
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
// blocksize for the array sum kernel
#define array_sum_Nthreads 1024

__global__ void array_sum(float *in, size_t N, float *out) {
  extern __shared__ float data[];

  size_t tid = threadIdx.x;

  float ls = 0;

  for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N);
       i += blockDim.x * gridDim.x) {
    ls += in[i]; // + in[i + blockDim.x];   // loading two elements increase the used bandwidth by ~10% but requires matching blocksize and size of input array
  }

  data[tid] = ls;
  __syncthreads();

  for (size_t i = blockDim.x / 2; i > 0; i /= 2) {
    if (tid < i) {
      data[tid] += data[tid + i];
    }
    __syncthreads();
  }

  // unroll last warp
  // if (tid < 32)
  //{
  //  warpReduce(data, tid);
  //}

  if (tid == 0) {
    out[blockIdx.x] = data[0];
  }
}


108
109
template <class HandlerType, typename IntegratedPowerType>
GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer(
110
111
112
113
114
115
116
117
118
119
120
121
    std::size_t buffer_bytes, std::size_t nSideChannels,
    std::size_t selectedSideChannel, std::size_t selectedBit,
    std::size_t speadHeapSize, std::size_t fft_length, std::size_t naccumulate,
    std::size_t nbits, float input_level, float output_level,
    HandlerType &handler)
    : _buffer_bytes(buffer_bytes), _nSideChannels(nSideChannels),
      _selectedSideChannel(selectedSideChannel), _selectedBit(selectedBit),
      _speadHeapSize(speadHeapSize), _fft_length(fft_length),
      _naccumulate(naccumulate), _nbits(nbits), _handler(handler), _fft_plan(0),
      _call_count(0) {
  assert(((_nbits == 12) || (_nbits == 8)));
  assert(_naccumulate > 0); // Sanity check
122
  BOOST_LOG_TRIVIAL(info)
123
      << "Creating new GatedSpectrometer instance with parameters: \n"
124
125
126
127
128
129
130
      << "  fft_length           " << _fft_length << "\n"
      << "  naccumulate          " << _naccumulate << "\n"
      << "  nSideChannels        " << _nSideChannels << "\n"
      << "  speadHeapSize        " << _speadHeapSize << " byte\n"
      << "  selectedSideChannel  " << _selectedSideChannel << "\n"
      << "  selectedBit          " << _selectedBit << "\n"
      << "  output bit depth     " << sizeof(IntegratedPowerType) * 8;
131
132
133
134
135
136
137
138
139
140

  _sideChannelSize = nSideChannels * sizeof(int64_t);
  _totalHeapSize = _speadHeapSize + _sideChannelSize;
  _nHeaps = buffer_bytes / _totalHeapSize;
  _gapSize = (buffer_bytes - _nHeaps * _totalHeapSize);
  _dataBlockBytes = _nHeaps * _speadHeapSize;
  assert((nSideChannels == 0) ||
         (selectedSideChannel <
          nSideChannels));  // Sanity check of side channel value
  assert(selectedBit < 64); // Sanity check of selected bit
141
  BOOST_LOG_TRIVIAL(info) << "Resulting memory configuration: \n"
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
                           << "  totalSizeOfHeap: " << _totalHeapSize
                           << " byte\n"
                           << "  number of heaps per buffer: " << _nHeaps
                           << "\n"
                           << "  resulting gap: " << _gapSize << " byte\n"
                           << "  datablock size in buffer: " << _dataBlockBytes
                           << " byte\n";

  std::size_t nsamps_per_buffer = _dataBlockBytes * 8 / nbits;
  std::size_t n64bit_words = _dataBlockBytes / sizeof(uint64_t);
  _nchans = _fft_length / 2 + 1;
  int batch = nsamps_per_buffer / _fft_length;
  float dof = 2 * _naccumulate;
  float scale =
      std::pow(input_level * std::sqrt(static_cast<float>(_nchans)), 2);
  float offset = scale * dof;
  float scaling = scale * std::sqrt(2 * dof) / output_level;
  BOOST_LOG_TRIVIAL(debug)
      << "Correction factors for 8-bit conversion: offset = " << offset
      << ", scaling = " << scaling;

  BOOST_LOG_TRIVIAL(debug) << "Generating FFT plan";
  int n[] = {static_cast<int>(_fft_length)};
  CUFFT_ERROR_CHECK(cufftPlanMany(&_fft_plan, 1, n, NULL, 1, _fft_length, NULL,
                                  1, _nchans, CUFFT_R2C, batch));
  cufftSetStream(_fft_plan, _proc_stream);

  BOOST_LOG_TRIVIAL(debug) << "Allocating memory";
  _raw_voltage_db.resize(n64bit_words);
  _sideChannelData_db.resize(_sideChannelSize * _nHeaps);
  BOOST_LOG_TRIVIAL(debug) << "  Input voltages size (in 64-bit words): "
                           << _raw_voltage_db.size();
  _unpacked_voltage_G0.resize(nsamps_per_buffer);
  _unpacked_voltage_G1.resize(nsamps_per_buffer);
Tobias Winchen's avatar
Tobias Winchen committed
176
177

  _baseLineN.resize(array_sum_Nthreads);
178
179
  BOOST_LOG_TRIVIAL(debug) << "  Unpacked voltages size (in samples): "
                           << _unpacked_voltage_G0.size();
180
  _channelised_voltage.resize(_nchans * batch);
181
  BOOST_LOG_TRIVIAL(debug) << "  Channelised voltages size: "
182
                           << _channelised_voltage.size();
183
184
  _power_db.resize(_nchans * batch / _naccumulate * 2);  // hold on and off spectra to simplify output
  BOOST_LOG_TRIVIAL(debug) << "  Powers size: " << _power_db.size() / 2; 
185
  // on the host both power are stored in the same data buffer
186
  _host_power_db.resize( _power_db.size());
Tobias Winchen's avatar
Tobias Winchen committed
187
  _noOfBitSetsInSideChannel.resize(1);
188
189
190
191
192
193
194

  CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream));
  CUDA_ERROR_CHECK(cudaStreamCreate(&_proc_stream));
  CUDA_ERROR_CHECK(cudaStreamCreate(&_d2h_stream));
  CUFFT_ERROR_CHECK(cufftSetStream(_fft_plan, _proc_stream));

  _unpacker.reset(new Unpacker(_proc_stream));
Tobias Winchen's avatar
Tobias Winchen committed
195
  _detector.reset(new DetectorAccumulator<IntegratedPowerType>(_nchans, _naccumulate, scaling,
196
197
198
199
                                          offset, _proc_stream));
} // constructor


200
201
template <class HandlerType, typename IntegratedPowerType>
GatedSpectrometer<HandlerType, IntegratedPowerType>::~GatedSpectrometer() {
202
203
204
205
206
207
208
209
210
  BOOST_LOG_TRIVIAL(debug) << "Destroying GatedSpectrometer";
  if (!_fft_plan)
    cufftDestroy(_fft_plan);
  cudaStreamDestroy(_h2d_stream);
  cudaStreamDestroy(_proc_stream);
  cudaStreamDestroy(_d2h_stream);
}


211
212
template <class HandlerType, typename IntegratedPowerType>
void GatedSpectrometer<HandlerType, IntegratedPowerType>::init(RawBytes &block) {
213
  BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer init called";
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
  std::stringstream headerInfo;
  headerInfo << "\n"
      << "# Gated spectrometer parameters: \n"
      << "fft_length               " << _fft_length << "\n"
      << "nchannels                " << _fft_length << "\n"
      << "naccumulate              " << _naccumulate << "\n"
      << "selected_side_channel    " << _selectedSideChannel << "\n"
      << "selected_bit             " << _selectedBit << "\n"
      << "output_bit_depth         " << sizeof(IntegratedPowerType) * 8;

  size_t bEnd = std::strlen(block.ptr());
  if (bEnd + headerInfo.str().size() < block.total_bytes())
  {
    std::strcpy(block.ptr() + bEnd, headerInfo.str().c_str());
  }
  else
  {
    BOOST_LOG_TRIVIAL(warning) << "Header of size " << block.total_bytes()
      << " bytes already contains " << bEnd
      << "bytes. Cannot add gated spectrometer info of size "
      << headerInfo.str().size() << " bytes.";
  }

237
238
239
240
  _handler.init(block);
}


241
242
template <class HandlerType, typename IntegratedPowerType>
void GatedSpectrometer<HandlerType, IntegratedPowerType>::process(
243
    thrust::device_vector<RawVoltageType> const &digitiser_raw,
244
    thrust::device_vector<int64_t> const &sideChannelData,
245
    thrust::device_vector<IntegratedPowerType> &detected, thrust::device_vector<size_t> &noOfBitSet) {
246
247
248
249
250
251
252
253
254
255
256
  BOOST_LOG_TRIVIAL(debug) << "Unpacking raw voltages";
  switch (_nbits) {
  case 8:
    _unpacker->unpack<8>(digitiser_raw, _unpacked_voltage_G0);
    break;
  case 12:
    _unpacker->unpack<12>(digitiser_raw, _unpacked_voltage_G0);
    break;
  default:
    throw std::runtime_error("Unsupported number of bits");
  }
Tobias Winchen's avatar
Tobias Winchen committed
257
258
259
  BOOST_LOG_TRIVIAL(debug) << "Calculate baseline";
  psrdada_cpp::effelsberg::edd::array_sum<<<64, array_sum_Nthreads, array_sum_Nthreads * sizeof(float), _proc_stream>>>(thrust::raw_pointer_cast(_unpacked_voltage_G0.data()), _unpacked_voltage_G0.size(), thrust::raw_pointer_cast(_baseLineN.data()));
  psrdada_cpp::effelsberg::edd::array_sum<<<1, array_sum_Nthreads, array_sum_Nthreads * sizeof(float), _proc_stream>>>(thrust::raw_pointer_cast(_baseLineN.data()), _baseLineN.size(), thrust::raw_pointer_cast(_baseLineN.data()));
260

261
  BOOST_LOG_TRIVIAL(debug) << "Perform gating";
262
263
  gating<<<1024, 1024, 0, _proc_stream>>>(
      thrust::raw_pointer_cast(_unpacked_voltage_G0.data()),
264
265
      thrust::raw_pointer_cast(_unpacked_voltage_G1.data()),
      thrust::raw_pointer_cast(sideChannelData.data()),
266
      _unpacked_voltage_G0.size(), _speadHeapSize, _selectedBit, _nSideChannels,
Tobias Winchen's avatar
Tobias Winchen committed
267
      _selectedSideChannel, thrust::raw_pointer_cast(_baseLineN.data()));
268

Tobias Winchen's avatar
Tobias Winchen committed
269
  countBitSet<<<(sideChannelData.size()+255)/256, 256, 0,
270
271
    _proc_stream>>>(thrust::raw_pointer_cast(sideChannelData.data()),
        sideChannelData.size(), _selectedBit,
Tobias Winchen's avatar
Tobias Winchen committed
272
273
        _nSideChannels, _selectedBit,
        thrust::raw_pointer_cast(noOfBitSet.data()));
274
275
276
277
278

  BOOST_LOG_TRIVIAL(debug) << "Performing FFT 1";
  UnpackedVoltageType *_unpacked_voltage_ptr =
      thrust::raw_pointer_cast(_unpacked_voltage_G0.data());
  ChannelisedVoltageType *_channelised_voltage_ptr =
279
      thrust::raw_pointer_cast(_channelised_voltage.data());
280
281
  CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr,
                                 (cufftComplex *)_channelised_voltage_ptr));
282
  _detector->detect(_channelised_voltage, detected, 2, 0);
283
284
285
286
287
288

  BOOST_LOG_TRIVIAL(debug) << "Performing FFT 2";
  _unpacked_voltage_ptr = thrust::raw_pointer_cast(_unpacked_voltage_G1.data());
  CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr,
                                 (cufftComplex *)_channelised_voltage_ptr));

289
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
290
  _detector->detect(_channelised_voltage, detected, 2, 1);
Tobias Winchen's avatar
Tobias Winchen committed
291
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
292
  BOOST_LOG_TRIVIAL(debug) << "Exit processing";
293
294
295
} // process


296
297
template <class HandlerType, typename IntegratedPowerType>
bool GatedSpectrometer<HandlerType, IntegratedPowerType>::operator()(RawBytes &block) {
298
299
300
301
302
303
304
  ++_call_count;
  BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer operator() called (count = "
                           << _call_count << ")";
  if (block.used_bytes() != _buffer_bytes) { /* Unexpected buffer size */
    BOOST_LOG_TRIVIAL(error) << "Unexpected Buffer Size - Got "
                             << block.used_bytes() << " byte, expected "
                             << _buffer_bytes << " byte)";
Tobias Winchen's avatar
Tobias Winchen committed
305
306
307
    cudaDeviceSynchronize();
    cudaProfilerStop();
    return true;
308
309
  }

310
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_h2d_stream));
311
312
313
314
315
  _raw_voltage_db.swap();
  _sideChannelData_db.swap();

  BOOST_LOG_TRIVIAL(debug) << "   block.used_bytes() = " << block.used_bytes()
                           << ", dataBlockBytes = " << _dataBlockBytes << "\n";
Tobias Winchen's avatar
Tobias Winchen committed
316

317
318
319
320
321
322
323
324
325
326
327
328
329
330
  CUDA_ERROR_CHECK(cudaMemcpyAsync(static_cast<void *>(_raw_voltage_db.a_ptr()),
                                   static_cast<void *>(block.ptr()),
                                   _dataBlockBytes, cudaMemcpyHostToDevice,
                                   _h2d_stream));
  CUDA_ERROR_CHECK(cudaMemcpyAsync(
      static_cast<void *>(_sideChannelData_db.a_ptr()),
      static_cast<void *>(block.ptr() + _dataBlockBytes + _gapSize),
      _sideChannelSize * _nHeaps, cudaMemcpyHostToDevice, _h2d_stream));

  if (_call_count == 1) {
    return false;
  }

  // Synchronize all streams
331
  _power_db.swap();
Tobias Winchen's avatar
Tobias Winchen committed
332
  _noOfBitSetsInSideChannel.swap();
333

334
  process(_raw_voltage_db.b(), _sideChannelData_db.b(), _power_db.a(), _noOfBitSetsInSideChannel.a());
335

Tobias Winchen's avatar
Tobias Winchen committed
336
  // signal that data block has been processed
337
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
Tobias Winchen's avatar
Tobias Winchen committed
338

339
340
341
342
  if (_call_count == 2) {
    return false;
  }

343
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream));
344
  _host_power_db.swap();
345
  std::swap(_noOfBitSetsInSideChannel_host[0], _noOfBitSetsInSideChannel_host[1]);
346
347
  CUDA_ERROR_CHECK(
      cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr()),
348
349
                      static_cast<void *>(_power_db.b_ptr()),
                      _power_db.size() * sizeof(IntegratedPowerType),
350
                      cudaMemcpyDeviceToHost, _d2h_stream));
351
352
353
354
355
356
//  CUDA_ERROR_CHECK(cudaMemcpyAsync(
//        static_cast<void *>(_host_power_db.a_ptr() +
//                          (_power_db_G0.size())),           // as I am adding BEFORE the cast to void, I dont need the sizeof
//      static_cast<void *>(_power_db_G1.b_ptr()),
//      _power_db_G1.size() * sizeof(IntegratedPowerType), cudaMemcpyDeviceToHost,
//      _d2h_stream));
357

Tobias Winchen's avatar
Tobias Winchen committed
358
359
360
  CUDA_ERROR_CHECK(cudaMemcpyAsync(static_cast<void *>(&_noOfBitSetsInSideChannel_host[0]),
        static_cast<void *>(_noOfBitSetsInSideChannel.b_ptr()),
          1 * sizeof(size_t),cudaMemcpyDeviceToHost, _d2h_stream));
361

Tobias Winchen's avatar
Tobias Winchen committed
362
  BOOST_LOG_TRIVIAL(debug) << "Copy Data back to device";
363

364
365
366
367
  if (_call_count == 3) {
    return false;
  }

Tobias Winchen's avatar
Tobias Winchen committed
368
  BOOST_LOG_TRIVIAL(info) << _call_count << ": No of bit set in side channel: " << _noOfBitSetsInSideChannel_host[1] << std::endl;
369
370
371
372
373
374
375
376
  // Wrap _detected_host_previous in a RawBytes object here;
  RawBytes bytes(reinterpret_cast<char *>(_host_power_db.b_ptr()),
                 _host_power_db.size() * sizeof(IntegratedPowerType),
                 _host_power_db.size() * sizeof(IntegratedPowerType));
  BOOST_LOG_TRIVIAL(debug) << "Calling handler";
  // The handler can't do anything asynchronously without a copy here
  // as it would be unsafe (given that it does not own the memory it
  // is being passed).
Tobias Winchen's avatar
Tobias Winchen committed
377
378
379

  _handler(bytes);
  return false; //
380
381
382
383
384
385
} // operator ()

} // edd
} // effelsberg
} // psrdada_cpp