GatedSpectrometer.cu 15.5 KB
Newer Older
1
2
3
4
5
#include "psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh"
#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/cuda_utils.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
#include <cuda.h>
6
#include <cuda_profiler_api.h>
7
8

#include <iostream>
9
10
#include <cstring>
#include <sstream>
11
12
13
14
15

namespace psrdada_cpp {
namespace effelsberg {
namespace edd {

Tobias Winchen's avatar
Tobias Winchen committed
16
17
18
19
20





Tobias Winchen's avatar
Tobias Winchen committed
21
__global__ void gating(float* __restrict__ G0, float* __restrict__ G1, const int64_t* __restrict__ sideChannelData,
22
                       size_t N, size_t heapSize, size_t bitpos,
Tobias Winchen's avatar
Tobias Winchen committed
23
                       size_t noOfSideChannels, size_t selectedSideChannel, const float* __restrict__ _baseLineN) {
24
  float baseLine = (*_baseLineN) / N;
Tobias Winchen's avatar
Tobias Winchen committed
25
  for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N);
26
       i += blockDim.x * gridDim.x) {
Tobias Winchen's avatar
Tobias Winchen committed
27
    const float w = G0[i] - baseLine;
28
29
30
31
32
33
34
35
    const int64_t sideChannelItem =
        sideChannelData[((i / heapSize) * (noOfSideChannels)) +
                        selectedSideChannel]; // Probably not optimal access as
                                              // same data is copied for several
                                              // threads, but maybe efficiently
                                              // handled by cache?

    const int bit_set = TEST_BIT(sideChannelItem, bitpos);
Tobias Winchen's avatar
Tobias Winchen committed
36
37
    G1[i] = w * bit_set + baseLine;
    G0[i] = w * (!bit_set) + baseLine;
38
39
40
  }
}

Tobias Winchen's avatar
Tobias Winchen committed
41

42
__global__ void countBitSet(const int64_t *sideChannelData, size_t N, size_t
43
    bitpos, size_t noOfSideChannels, size_t selectedSideChannel, size_t
44
    *nBitsSet)
45
{
Tobias Winchen's avatar
Tobias Winchen committed
46
47
  // really not optimized reduction, but here only trivial array sizes.
  int i = blockIdx.x * blockDim.x + threadIdx.x;
48
  __shared__ unsigned int x[256];
Tobias Winchen's avatar
Tobias Winchen committed
49
50

  if (i == 0)
51
    nBitsSet[0] = 0;
Tobias Winchen's avatar
Tobias Winchen committed
52

53
  if (i * noOfSideChannels + selectedSideChannel < N)
Tobias Winchen's avatar
Tobias Winchen committed
54
55
56
57
58
59
60
61
62
63
64
    x[threadIdx.x] = TEST_BIT(sideChannelData[i * noOfSideChannels + selectedSideChannel], bitpos);
  else
    x[threadIdx.x] = 0;
  __syncthreads();

  for(int s = blockDim.x / 2; s > 0; s = s / 2)
  {
    if (threadIdx.x < s)
      x[threadIdx.x] += x[threadIdx.x + s];
    __syncthreads();
  }
65

Tobias Winchen's avatar
Tobias Winchen committed
66
  if(threadIdx.x == 0)
67
   nBitsSet[0] += x[threadIdx.x];
Tobias Winchen's avatar
Tobias Winchen committed
68
}
69

70

Tobias Winchen's avatar
Tobias Winchen committed
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
// blocksize for the array sum kernel
#define array_sum_Nthreads 1024

__global__ void array_sum(float *in, size_t N, float *out) {
  extern __shared__ float data[];

  size_t tid = threadIdx.x;

  float ls = 0;

  for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N);
       i += blockDim.x * gridDim.x) {
    ls += in[i]; // + in[i + blockDim.x];   // loading two elements increase the used bandwidth by ~10% but requires matching blocksize and size of input array
  }

  data[tid] = ls;
  __syncthreads();

  for (size_t i = blockDim.x / 2; i > 0; i /= 2) {
    if (tid < i) {
      data[tid] += data[tid + i];
    }
    __syncthreads();
  }

  // unroll last warp
  // if (tid < 32)
  //{
  //  warpReduce(data, tid);
  //}

  if (tid == 0) {
    out[blockIdx.x] = data[0];
  }
}


108
109
template <class HandlerType, typename IntegratedPowerType>
GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer(
110
111
112
113
114
115
116
117
118
119
120
121
    std::size_t buffer_bytes, std::size_t nSideChannels,
    std::size_t selectedSideChannel, std::size_t selectedBit,
    std::size_t speadHeapSize, std::size_t fft_length, std::size_t naccumulate,
    std::size_t nbits, float input_level, float output_level,
    HandlerType &handler)
    : _buffer_bytes(buffer_bytes), _nSideChannels(nSideChannels),
      _selectedSideChannel(selectedSideChannel), _selectedBit(selectedBit),
      _speadHeapSize(speadHeapSize), _fft_length(fft_length),
      _naccumulate(naccumulate), _nbits(nbits), _handler(handler), _fft_plan(0),
      _call_count(0) {
  assert(((_nbits == 12) || (_nbits == 8)));
  assert(_naccumulate > 0); // Sanity check
122
  BOOST_LOG_TRIVIAL(info)
123
      << "Creating new GatedSpectrometer instance with parameters: \n"
124
125
126
127
128
129
130
      << "  fft_length           " << _fft_length << "\n"
      << "  naccumulate          " << _naccumulate << "\n"
      << "  nSideChannels        " << _nSideChannels << "\n"
      << "  speadHeapSize        " << _speadHeapSize << " byte\n"
      << "  selectedSideChannel  " << _selectedSideChannel << "\n"
      << "  selectedBit          " << _selectedBit << "\n"
      << "  output bit depth     " << sizeof(IntegratedPowerType) * 8;
131
132
133
134
135
136
137
138
139
140

  _sideChannelSize = nSideChannels * sizeof(int64_t);
  _totalHeapSize = _speadHeapSize + _sideChannelSize;
  _nHeaps = buffer_bytes / _totalHeapSize;
  _gapSize = (buffer_bytes - _nHeaps * _totalHeapSize);
  _dataBlockBytes = _nHeaps * _speadHeapSize;
  assert((nSideChannels == 0) ||
         (selectedSideChannel <
          nSideChannels));  // Sanity check of side channel value
  assert(selectedBit < 64); // Sanity check of selected bit
141
  BOOST_LOG_TRIVIAL(info) << "Resulting memory configuration: \n"
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
                           << "  totalSizeOfHeap: " << _totalHeapSize
                           << " byte\n"
                           << "  number of heaps per buffer: " << _nHeaps
                           << "\n"
                           << "  resulting gap: " << _gapSize << " byte\n"
                           << "  datablock size in buffer: " << _dataBlockBytes
                           << " byte\n";

  std::size_t nsamps_per_buffer = _dataBlockBytes * 8 / nbits;
  std::size_t n64bit_words = _dataBlockBytes / sizeof(uint64_t);
  _nchans = _fft_length / 2 + 1;
  int batch = nsamps_per_buffer / _fft_length;
  float dof = 2 * _naccumulate;
  float scale =
      std::pow(input_level * std::sqrt(static_cast<float>(_nchans)), 2);
  float offset = scale * dof;
  float scaling = scale * std::sqrt(2 * dof) / output_level;
  BOOST_LOG_TRIVIAL(debug)
      << "Correction factors for 8-bit conversion: offset = " << offset
      << ", scaling = " << scaling;

  BOOST_LOG_TRIVIAL(debug) << "Generating FFT plan";
  int n[] = {static_cast<int>(_fft_length)};
  CUFFT_ERROR_CHECK(cufftPlanMany(&_fft_plan, 1, n, NULL, 1, _fft_length, NULL,
                                  1, _nchans, CUFFT_R2C, batch));
  cufftSetStream(_fft_plan, _proc_stream);

  BOOST_LOG_TRIVIAL(debug) << "Allocating memory";
  _raw_voltage_db.resize(n64bit_words);
  _sideChannelData_db.resize(_sideChannelSize * _nHeaps);
  BOOST_LOG_TRIVIAL(debug) << "  Input voltages size (in 64-bit words): "
                           << _raw_voltage_db.size();
  _unpacked_voltage_G0.resize(nsamps_per_buffer);
  _unpacked_voltage_G1.resize(nsamps_per_buffer);
Tobias Winchen's avatar
Tobias Winchen committed
176
177

  _baseLineN.resize(array_sum_Nthreads);
178
179
  BOOST_LOG_TRIVIAL(debug) << "  Unpacked voltages size (in samples): "
                           << _unpacked_voltage_G0.size();
180
  _channelised_voltage.resize(_nchans * batch);
181
  BOOST_LOG_TRIVIAL(debug) << "  Channelised voltages size: "
182
                           << _channelised_voltage.size();
183
184
185
186
187
188
  _power_db_G0.resize(_nchans * batch / _naccumulate);
  _power_db_G1.resize(_nchans * batch / _naccumulate);
  BOOST_LOG_TRIVIAL(debug) << "  Powers size: " << _power_db_G0.size() << ", "
                           << _power_db_G1.size();
  // on the host both power are stored in the same data buffer
  _host_power_db.resize( _power_db_G0.size() + _power_db_G1 .size());
Tobias Winchen's avatar
Tobias Winchen committed
189
  _noOfBitSetsInSideChannel.resize(1);
190
191
192
193
194
195
196

  CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream));
  CUDA_ERROR_CHECK(cudaStreamCreate(&_proc_stream));
  CUDA_ERROR_CHECK(cudaStreamCreate(&_d2h_stream));
  CUFFT_ERROR_CHECK(cufftSetStream(_fft_plan, _proc_stream));

  _unpacker.reset(new Unpacker(_proc_stream));
Tobias Winchen's avatar
Tobias Winchen committed
197
  _detector.reset(new DetectorAccumulator<IntegratedPowerType>(_nchans, _naccumulate, scaling,
198
199
200
201
                                          offset, _proc_stream));
} // constructor


202
203
template <class HandlerType, typename IntegratedPowerType>
GatedSpectrometer<HandlerType, IntegratedPowerType>::~GatedSpectrometer() {
204
205
206
207
208
209
210
211
212
  BOOST_LOG_TRIVIAL(debug) << "Destroying GatedSpectrometer";
  if (!_fft_plan)
    cufftDestroy(_fft_plan);
  cudaStreamDestroy(_h2d_stream);
  cudaStreamDestroy(_proc_stream);
  cudaStreamDestroy(_d2h_stream);
}


213
214
template <class HandlerType, typename IntegratedPowerType>
void GatedSpectrometer<HandlerType, IntegratedPowerType>::init(RawBytes &block) {
215
  BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer init called";
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
  std::stringstream headerInfo;
  headerInfo << "\n"
      << "# Gated spectrometer parameters: \n"
      << "fft_length               " << _fft_length << "\n"
      << "nchannels                " << _fft_length << "\n"
      << "naccumulate              " << _naccumulate << "\n"
      << "selected_side_channel    " << _selectedSideChannel << "\n"
      << "selected_bit             " << _selectedBit << "\n"
      << "output_bit_depth         " << sizeof(IntegratedPowerType) * 8;

  size_t bEnd = std::strlen(block.ptr());
  if (bEnd + headerInfo.str().size() < block.total_bytes())
  {
    std::strcpy(block.ptr() + bEnd, headerInfo.str().c_str());
  }
  else
  {
    BOOST_LOG_TRIVIAL(warning) << "Header of size " << block.total_bytes()
      << " bytes already contains " << bEnd
      << "bytes. Cannot add gated spectrometer info of size "
      << headerInfo.str().size() << " bytes.";
  }

239
240
241
242
  _handler.init(block);
}


243
244
template <class HandlerType, typename IntegratedPowerType>
void GatedSpectrometer<HandlerType, IntegratedPowerType>::process(
245
    thrust::device_vector<RawVoltageType> const &digitiser_raw,
246
    thrust::device_vector<int64_t> const &sideChannelData,
247
    thrust::device_vector<IntegratedPowerType> &detected_G0,
248
    thrust::device_vector<IntegratedPowerType> &detected_G1, thrust::device_vector<size_t> &noOfBitSet) {
249
250
251
252
253
254
255
256
257
258
259
  BOOST_LOG_TRIVIAL(debug) << "Unpacking raw voltages";
  switch (_nbits) {
  case 8:
    _unpacker->unpack<8>(digitiser_raw, _unpacked_voltage_G0);
    break;
  case 12:
    _unpacker->unpack<12>(digitiser_raw, _unpacked_voltage_G0);
    break;
  default:
    throw std::runtime_error("Unsupported number of bits");
  }
Tobias Winchen's avatar
Tobias Winchen committed
260
261
262
  BOOST_LOG_TRIVIAL(debug) << "Calculate baseline";
  psrdada_cpp::effelsberg::edd::array_sum<<<64, array_sum_Nthreads, array_sum_Nthreads * sizeof(float), _proc_stream>>>(thrust::raw_pointer_cast(_unpacked_voltage_G0.data()), _unpacked_voltage_G0.size(), thrust::raw_pointer_cast(_baseLineN.data()));
  psrdada_cpp::effelsberg::edd::array_sum<<<1, array_sum_Nthreads, array_sum_Nthreads * sizeof(float), _proc_stream>>>(thrust::raw_pointer_cast(_baseLineN.data()), _baseLineN.size(), thrust::raw_pointer_cast(_baseLineN.data()));
263

264
  BOOST_LOG_TRIVIAL(debug) << "Perform gating";
265
266
  gating<<<1024, 1024, 0, _proc_stream>>>(
      thrust::raw_pointer_cast(_unpacked_voltage_G0.data()),
267
268
      thrust::raw_pointer_cast(_unpacked_voltage_G1.data()),
      thrust::raw_pointer_cast(sideChannelData.data()),
269
      _unpacked_voltage_G0.size(), _speadHeapSize, _selectedBit, _nSideChannels,
Tobias Winchen's avatar
Tobias Winchen committed
270
      _selectedSideChannel, thrust::raw_pointer_cast(_baseLineN.data()));
271

Tobias Winchen's avatar
Tobias Winchen committed
272
  countBitSet<<<(sideChannelData.size()+255)/256, 256, 0,
273
274
    _proc_stream>>>(thrust::raw_pointer_cast(sideChannelData.data()),
        sideChannelData.size(), _selectedBit,
Tobias Winchen's avatar
Tobias Winchen committed
275
276
        _nSideChannels, _selectedBit,
        thrust::raw_pointer_cast(noOfBitSet.data()));
277
278
279
280
281

  BOOST_LOG_TRIVIAL(debug) << "Performing FFT 1";
  UnpackedVoltageType *_unpacked_voltage_ptr =
      thrust::raw_pointer_cast(_unpacked_voltage_G0.data());
  ChannelisedVoltageType *_channelised_voltage_ptr =
282
      thrust::raw_pointer_cast(_channelised_voltage.data());
283
284
  CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr,
                                 (cufftComplex *)_channelised_voltage_ptr));
285
  _detector->detect(_channelised_voltage, detected_G0);
286
287
288
289
290
291

  BOOST_LOG_TRIVIAL(debug) << "Performing FFT 2";
  _unpacked_voltage_ptr = thrust::raw_pointer_cast(_unpacked_voltage_G1.data());
  CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr,
                                 (cufftComplex *)_channelised_voltage_ptr));

292
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
293
  _detector->detect(_channelised_voltage, detected_G1);
Tobias Winchen's avatar
Tobias Winchen committed
294
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
295
  BOOST_LOG_TRIVIAL(debug) << "Exit processing";
296
297
298
} // process


299
300
template <class HandlerType, typename IntegratedPowerType>
bool GatedSpectrometer<HandlerType, IntegratedPowerType>::operator()(RawBytes &block) {
301
302
303
304
305
306
307
  ++_call_count;
  BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer operator() called (count = "
                           << _call_count << ")";
  if (block.used_bytes() != _buffer_bytes) { /* Unexpected buffer size */
    BOOST_LOG_TRIVIAL(error) << "Unexpected Buffer Size - Got "
                             << block.used_bytes() << " byte, expected "
                             << _buffer_bytes << " byte)";
Tobias Winchen's avatar
Tobias Winchen committed
308
309
310
    cudaDeviceSynchronize();
    cudaProfilerStop();
    return true;
311
312
  }

313
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_h2d_stream));
314
315
316
317
318
  _raw_voltage_db.swap();
  _sideChannelData_db.swap();

  BOOST_LOG_TRIVIAL(debug) << "   block.used_bytes() = " << block.used_bytes()
                           << ", dataBlockBytes = " << _dataBlockBytes << "\n";
Tobias Winchen's avatar
Tobias Winchen committed
319

320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
  CUDA_ERROR_CHECK(cudaMemcpyAsync(static_cast<void *>(_raw_voltage_db.a_ptr()),
                                   static_cast<void *>(block.ptr()),
                                   _dataBlockBytes, cudaMemcpyHostToDevice,
                                   _h2d_stream));
  CUDA_ERROR_CHECK(cudaMemcpyAsync(
      static_cast<void *>(_sideChannelData_db.a_ptr()),
      static_cast<void *>(block.ptr() + _dataBlockBytes + _gapSize),
      _sideChannelSize * _nHeaps, cudaMemcpyHostToDevice, _h2d_stream));

  if (_call_count == 1) {
    return false;
  }

  // Synchronize all streams
  _power_db_G0.swap();
  _power_db_G1.swap();
Tobias Winchen's avatar
Tobias Winchen committed
336
  _noOfBitSetsInSideChannel.swap();
337
338

  process(_raw_voltage_db.b(), _sideChannelData_db.b(), _power_db_G0.a(),
339
          _power_db_G1.a(), _noOfBitSetsInSideChannel.a());
340

Tobias Winchen's avatar
Tobias Winchen committed
341
  // signal that data block has been processed
342
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
Tobias Winchen's avatar
Tobias Winchen committed
343

344
345
346
347
  if (_call_count == 2) {
    return false;
  }

348
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream));
349
  _host_power_db.swap();
350
  std::swap(_noOfBitSetsInSideChannel_host[0], _noOfBitSetsInSideChannel_host[1]);
351
352
353
354
355
356
357
  CUDA_ERROR_CHECK(
      cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr()),
                      static_cast<void *>(_power_db_G0.b_ptr()),
                      _power_db_G0.size() * sizeof(IntegratedPowerType),
                      cudaMemcpyDeviceToHost, _d2h_stream));
  CUDA_ERROR_CHECK(cudaMemcpyAsync(
      static_cast<void *>(_host_power_db.a_ptr() +
Tobias Winchen's avatar
Tobias Winchen committed
358
                          (_power_db_G0.size())),           // as I am adding BEFORE the cast to void, I dont need the sizeof
359
360
361
362
      static_cast<void *>(_power_db_G1.b_ptr()),
      _power_db_G1.size() * sizeof(IntegratedPowerType), cudaMemcpyDeviceToHost,
      _d2h_stream));

Tobias Winchen's avatar
Tobias Winchen committed
363
364
365
  CUDA_ERROR_CHECK(cudaMemcpyAsync(static_cast<void *>(&_noOfBitSetsInSideChannel_host[0]),
        static_cast<void *>(_noOfBitSetsInSideChannel.b_ptr()),
          1 * sizeof(size_t),cudaMemcpyDeviceToHost, _d2h_stream));
366

Tobias Winchen's avatar
Tobias Winchen committed
367
  BOOST_LOG_TRIVIAL(debug) << "Copy Data back to device";
368

369
370
371
372
  if (_call_count == 3) {
    return false;
  }

Tobias Winchen's avatar
Tobias Winchen committed
373
  BOOST_LOG_TRIVIAL(info) << _call_count << ": No of bit set in side channel: " << _noOfBitSetsInSideChannel_host[1] << std::endl;
374
375
376
377
378
379
380
381
  // Wrap _detected_host_previous in a RawBytes object here;
  RawBytes bytes(reinterpret_cast<char *>(_host_power_db.b_ptr()),
                 _host_power_db.size() * sizeof(IntegratedPowerType),
                 _host_power_db.size() * sizeof(IntegratedPowerType));
  BOOST_LOG_TRIVIAL(debug) << "Calling handler";
  // The handler can't do anything asynchronously without a copy here
  // as it would be unsafe (given that it does not own the memory it
  // is being passed).
Tobias Winchen's avatar
Tobias Winchen committed
382
383
384

  _handler(bytes);
  return false; //
385
386
387
388
389
390
} // operator ()

} // edd
} // effelsberg
} // psrdada_cpp