GatedSpectrometer.cu 17 KB
Newer Older
1
#include "psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh"
2
#include "psrdada_cpp/effelsberg/edd/Tools.cuh"
3
4
5
#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/cuda_utils.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
6

7
#include <cuda.h>
8
#include <cuda_profiler_api.h>
9
#include <thrust/system/cuda/execution_policy.h>
10
11

#include <iostream>
12
#include <iomanip>
13
14
#include <cstring>
#include <sstream>
15
16
17
18
19

namespace psrdada_cpp {
namespace effelsberg {
namespace edd {

20
__global__ void gating(float* __restrict__ G0, float* __restrict__ G1, const uint64_t* __restrict__ sideChannelData,
21
                       size_t N, size_t heapSize, size_t bitpos,
Tobias Winchen's avatar
Tobias Winchen committed
22
                       size_t noOfSideChannels, size_t selectedSideChannel, const float* __restrict__ _baseLineN) {
23
  float baseLine = (*_baseLineN) / N;
Tobias Winchen's avatar
Tobias Winchen committed
24
  for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N);
25
       i += blockDim.x * gridDim.x) {
Tobias Winchen's avatar
Tobias Winchen committed
26
    const float w = G0[i] - baseLine;
27
    const uint64_t sideChannelItem =
28
29
30
31
32
33
34
        sideChannelData[((i / heapSize) * (noOfSideChannels)) +
                        selectedSideChannel]; // Probably not optimal access as
                                              // same data is copied for several
                                              // threads, but maybe efficiently
                                              // handled by cache?

    const int bit_set = TEST_BIT(sideChannelItem, bitpos);
Tobias Winchen's avatar
Tobias Winchen committed
35
36
    G1[i] = w * bit_set + baseLine;
    G0[i] = w * (!bit_set) + baseLine;
37
38
39
  }
}

Tobias Winchen's avatar
Tobias Winchen committed
40

41
__global__ void countBitSet(const uint64_t *sideChannelData, size_t N, size_t
42
    bitpos, size_t noOfSideChannels, size_t selectedSideChannel, size_t
43
    *nBitsSet)
44
{
Tobias Winchen's avatar
Tobias Winchen committed
45
  // really not optimized reduction, but here only trivial array sizes.
46
47
48
  // run only in one block!
  __shared__ size_t x[1024];
  size_t ls = 0;
Tobias Winchen's avatar
Tobias Winchen committed
49

50
51
52
53
54
  for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N);
       i += blockDim.x * gridDim.x) {
    ls += TEST_BIT(sideChannelData[i * noOfSideChannels + selectedSideChannel], bitpos);
  }
  x[threadIdx.x] = ls;
Tobias Winchen's avatar
Tobias Winchen committed
55
56
57
58
59
60
61
62

  __syncthreads();
  for(int s = blockDim.x / 2; s > 0; s = s / 2)
  {
    if (threadIdx.x < s)
      x[threadIdx.x] += x[threadIdx.x + s];
    __syncthreads();
  }
63

Tobias Winchen's avatar
Tobias Winchen committed
64
  if(threadIdx.x == 0)
65
   nBitsSet[0] += x[threadIdx.x];
Tobias Winchen's avatar
Tobias Winchen committed
66
}
67

68

69
70
template <class HandlerType, typename IntegratedPowerType>
GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer(
71
72
    const DadaBufferLayout &dadaBufferLayout,
    std::size_t selectedSideChannel, std::size_t selectedBit, std::size_t fft_length, std::size_t naccumulate,
73
    std::size_t nbits, float input_level, float output_level,
74
    HandlerType &handler) : _dadaBufferLayout(dadaBufferLayout),
75
      _selectedSideChannel(selectedSideChannel), _selectedBit(selectedBit),
76
      _fft_length(fft_length),
77
      _naccumulate(naccumulate), _nbits(nbits), _handler(handler), _fft_plan(0),
78
      _call_count(0), _nsamps_per_heap(4096) {
79
80

  // Sanity checks
81
  assert(((_nbits == 12) || (_nbits == 8)));
82
83
84
85
86
  assert(_naccumulate > 0);

  // check for any device errors
  CUDA_ERROR_CHECK(cudaDeviceSynchronize());

87
  BOOST_LOG_TRIVIAL(info)
88
      << "Creating new GatedSpectrometer instance with parameters: \n"
89
90
      << "  fft_length           " << _fft_length << "\n"
      << "  naccumulate          " << _naccumulate << "\n"
91
92
      << "  nSideChannels        " << _dadaBufferLayout.getNSideChannels() << "\n"
      << "  speadHeapSize        " << _dadaBufferLayout.getHeapSize() << " byte\n"
93
94
95
      << "  selectedSideChannel  " << _selectedSideChannel << "\n"
      << "  selectedBit          " << _selectedBit << "\n"
      << "  output bit depth     " << sizeof(IntegratedPowerType) * 8;
96

97
98
  assert((_dadaBufferLayout.getNSideChannels() == 0) ||
         (selectedSideChannel < _dadaBufferLayout.getNSideChannels()));  // Sanity check of side channel value
99
100
  assert(selectedBit < 64); // Sanity check of selected bit

101
   _nsamps_per_buffer = _dadaBufferLayout.sizeOfData() * 8 / nbits;
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120

  _nsamps_per_output_spectra = fft_length * naccumulate;
  int nBlocks;
  if (_nsamps_per_output_spectra <= _nsamps_per_buffer)
  { // one buffer block is used for one or multiple output spectra
    size_t N = _nsamps_per_buffer / _nsamps_per_output_spectra;
    // All data in one block has to be used
    assert(N * _nsamps_per_output_spectra == _nsamps_per_buffer);
    nBlocks = 1;
  }
  else
  { // multiple blocks are integrated intoone output
    size_t N =  _nsamps_per_output_spectra /  _nsamps_per_buffer;
    // All data in multiple blocks has to be used
    assert(N * _nsamps_per_buffer == _nsamps_per_output_spectra);
    nBlocks = N;
  }
  BOOST_LOG_TRIVIAL(debug) << "Integrating  " << _nsamps_per_output_spectra << " samples from " << nBlocks << " into one spectra.";

121
  _nchans = _fft_length / 2 + 1;
122
  int batch = _nsamps_per_buffer / _fft_length;
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
  float dof = 2 * _naccumulate;
  float scale =
      std::pow(input_level * std::sqrt(static_cast<float>(_nchans)), 2);
  float offset = scale * dof;
  float scaling = scale * std::sqrt(2 * dof) / output_level;
  BOOST_LOG_TRIVIAL(debug)
      << "Correction factors for 8-bit conversion: offset = " << offset
      << ", scaling = " << scaling;

  BOOST_LOG_TRIVIAL(debug) << "Generating FFT plan";
  int n[] = {static_cast<int>(_fft_length)};
  CUFFT_ERROR_CHECK(cufftPlanMany(&_fft_plan, 1, n, NULL, 1, _fft_length, NULL,
                                  1, _nchans, CUFFT_R2C, batch));
  cufftSetStream(_fft_plan, _proc_stream);

  BOOST_LOG_TRIVIAL(debug) << "Allocating memory";
139
140
  _raw_voltage_db.resize(_dadaBufferLayout.sizeOfData() / sizeof(uint64_t));
  _sideChannelData_db.resize(_dadaBufferLayout.getNSideChannels() * _dadaBufferLayout.getNHeaps());
141
142
  BOOST_LOG_TRIVIAL(debug) << "  Input voltages size (in 64-bit words): "
                           << _raw_voltage_db.size();
143
144
  _unpacked_voltage_G0.resize(_nsamps_per_buffer);
  _unpacked_voltage_G1.resize(_nsamps_per_buffer);
Tobias Winchen's avatar
Tobias Winchen committed
145
146

  _baseLineN.resize(array_sum_Nthreads);
147
148
  BOOST_LOG_TRIVIAL(debug) << "  Unpacked voltages size (in samples): "
                           << _unpacked_voltage_G0.size();
149
  _channelised_voltage.resize(_nchans * batch);
150
  BOOST_LOG_TRIVIAL(debug) << "  Channelised voltages size: "
151
                           << _channelised_voltage.size();
152
153
154
155
156
157
  _power_db.resize(_nchans * batch / (_naccumulate / nBlocks) * 2);  // hold on and off spectra to simplify output
  thrust::fill(_power_db.a().begin(), _power_db.a().end(), 0.);
  thrust::fill(_power_db.b().begin(), _power_db.b().end(), 0.);
  BOOST_LOG_TRIVIAL(debug) << "  Powers size: " << _power_db.size() / 2;

  _noOfBitSetsInSideChannel.resize( batch / (_naccumulate / nBlocks));
158
159
  thrust::fill(_noOfBitSetsInSideChannel.a().begin(), _noOfBitSetsInSideChannel.a().end(), 0L);
  thrust::fill(_noOfBitSetsInSideChannel.b().begin(), _noOfBitSetsInSideChannel.b().end(), 0L);
Tobias Winchen's avatar
Tobias Winchen committed
160
  BOOST_LOG_TRIVIAL(debug) << "  Bit set counter size: " << _noOfBitSetsInSideChannel.size();
161

162
163
164
  // on the host both power are stored in the same data buffer together with
  // the number of bit sets
  _host_power_db.resize( _power_db.size() * sizeof(IntegratedPowerType) + 2 * sizeof(size_t) * _noOfBitSetsInSideChannel.size());
165
166
167
168
169
170
171

  CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream));
  CUDA_ERROR_CHECK(cudaStreamCreate(&_proc_stream));
  CUDA_ERROR_CHECK(cudaStreamCreate(&_d2h_stream));
  CUFFT_ERROR_CHECK(cufftSetStream(_fft_plan, _proc_stream));

  _unpacker.reset(new Unpacker(_proc_stream));
172
  _detector.reset(new DetectorAccumulator<IntegratedPowerType>(_nchans, _naccumulate / nBlocks, scaling,
173
174
175
176
                                          offset, _proc_stream));
} // constructor


177
178
template <class HandlerType, typename IntegratedPowerType>
GatedSpectrometer<HandlerType, IntegratedPowerType>::~GatedSpectrometer() {
179
180
181
182
183
184
185
186
187
  BOOST_LOG_TRIVIAL(debug) << "Destroying GatedSpectrometer";
  if (!_fft_plan)
    cufftDestroy(_fft_plan);
  cudaStreamDestroy(_h2d_stream);
  cudaStreamDestroy(_proc_stream);
  cudaStreamDestroy(_d2h_stream);
}


188
189
template <class HandlerType, typename IntegratedPowerType>
void GatedSpectrometer<HandlerType, IntegratedPowerType>::init(RawBytes &block) {
190
  BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer init called";
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
  std::stringstream headerInfo;
  headerInfo << "\n"
      << "# Gated spectrometer parameters: \n"
      << "fft_length               " << _fft_length << "\n"
      << "nchannels                " << _fft_length << "\n"
      << "naccumulate              " << _naccumulate << "\n"
      << "selected_side_channel    " << _selectedSideChannel << "\n"
      << "selected_bit             " << _selectedBit << "\n"
      << "output_bit_depth         " << sizeof(IntegratedPowerType) * 8;

  size_t bEnd = std::strlen(block.ptr());
  if (bEnd + headerInfo.str().size() < block.total_bytes())
  {
    std::strcpy(block.ptr() + bEnd, headerInfo.str().c_str());
  }
  else
  {
    BOOST_LOG_TRIVIAL(warning) << "Header of size " << block.total_bytes()
      << " bytes already contains " << bEnd
      << "bytes. Cannot add gated spectrometer info of size "
      << headerInfo.str().size() << " bytes.";
  }

214
215
216
217
  _handler.init(block);
}


218
219
template <class HandlerType, typename IntegratedPowerType>
void GatedSpectrometer<HandlerType, IntegratedPowerType>::process(
220
    thrust::device_vector<RawVoltageType> const &digitiser_raw,
221
    thrust::device_vector<uint64_t> const &sideChannelData,
222
    thrust::device_vector<IntegratedPowerType> &detected, thrust::device_vector<size_t> &noOfBitSet) {
223
224
225
226
227
228
229
230
231
232
233
  BOOST_LOG_TRIVIAL(debug) << "Unpacking raw voltages";
  switch (_nbits) {
  case 8:
    _unpacker->unpack<8>(digitiser_raw, _unpacked_voltage_G0);
    break;
  case 12:
    _unpacker->unpack<12>(digitiser_raw, _unpacked_voltage_G0);
    break;
  default:
    throw std::runtime_error("Unsupported number of bits");
  }
Tobias Winchen's avatar
Tobias Winchen committed
234
  BOOST_LOG_TRIVIAL(debug) << "Calculate baseline";
235
236
  psrdada_cpp::effelsberg::edd::array_sum<<<64, array_sum_Nthreads, 0, _proc_stream>>>(thrust::raw_pointer_cast(_unpacked_voltage_G0.data()), _unpacked_voltage_G0.size(), thrust::raw_pointer_cast(_baseLineN.data()));
  psrdada_cpp::effelsberg::edd::array_sum<<<1, array_sum_Nthreads, 0, _proc_stream>>>(thrust::raw_pointer_cast(_baseLineN.data()), _baseLineN.size(), thrust::raw_pointer_cast(_baseLineN.data()));
237

238
  BOOST_LOG_TRIVIAL(debug) << "Perform gating";
239
240
  gating<<<1024, 1024, 0, _proc_stream>>>(
      thrust::raw_pointer_cast(_unpacked_voltage_G0.data()),
241
242
      thrust::raw_pointer_cast(_unpacked_voltage_G1.data()),
      thrust::raw_pointer_cast(sideChannelData.data()),
243
      _unpacked_voltage_G0.size(), _dadaBufferLayout.getHeapSize(), _selectedBit, _dadaBufferLayout.getNSideChannels(),
Tobias Winchen's avatar
Tobias Winchen committed
244
      _selectedSideChannel, thrust::raw_pointer_cast(_baseLineN.data()));
245

246
247
  for (size_t i = 0; i < _noOfBitSetsInSideChannel.size(); i++)
  { // ToDo: Should be in one kernel call
248
    countBitSet<<<1, 1024, 0, _proc_stream>>>(thrust::raw_pointer_cast(sideChannelData.data() + i * sideChannelData.size() / _noOfBitSetsInSideChannel.size() ),
249
          sideChannelData.size() / _noOfBitSetsInSideChannel.size(), _selectedBit,
250
          _dadaBufferLayout.getNSideChannels(), _selectedBit,
251
          thrust::raw_pointer_cast(noOfBitSet.data() + i));
252
253
    
    CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
254
  }
255
256
257
258
259

  BOOST_LOG_TRIVIAL(debug) << "Performing FFT 1";
  UnpackedVoltageType *_unpacked_voltage_ptr =
      thrust::raw_pointer_cast(_unpacked_voltage_G0.data());
  ChannelisedVoltageType *_channelised_voltage_ptr =
260
      thrust::raw_pointer_cast(_channelised_voltage.data());
261
262
  CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr,
                                 (cufftComplex *)_channelised_voltage_ptr));
263
  _detector->detect(_channelised_voltage, detected, 2, 0);
264
265
266
267
268
269

  BOOST_LOG_TRIVIAL(debug) << "Performing FFT 2";
  _unpacked_voltage_ptr = thrust::raw_pointer_cast(_unpacked_voltage_G1.data());
  CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr,
                                 (cufftComplex *)_channelised_voltage_ptr));

270
  _detector->detect(_channelised_voltage, detected, 2, 1);
Tobias Winchen's avatar
Tobias Winchen committed
271
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
272
  BOOST_LOG_TRIVIAL(debug) << "Exit processing";
273
274
275
} // process


276
277
template <class HandlerType, typename IntegratedPowerType>
bool GatedSpectrometer<HandlerType, IntegratedPowerType>::operator()(RawBytes &block) {
278
279
280
  ++_call_count;
  BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer operator() called (count = "
                           << _call_count << ")";
281
  if (block.used_bytes() != _dadaBufferLayout.getBufferSize()) { /* Unexpected buffer size */
282
283
    BOOST_LOG_TRIVIAL(error) << "Unexpected Buffer Size - Got "
                             << block.used_bytes() << " byte, expected "
284
                             << _dadaBufferLayout.getBufferSize() << " byte)";
285
    CUDA_ERROR_CHECK(cudaDeviceSynchronize());
Tobias Winchen's avatar
Tobias Winchen committed
286
287
    cudaProfilerStop();
    return true;
288
289
  }

290
  // Copy data to device
291
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_h2d_stream));
292
293
294
295
  _raw_voltage_db.swap();
  _sideChannelData_db.swap();

  BOOST_LOG_TRIVIAL(debug) << "   block.used_bytes() = " << block.used_bytes()
296
                           << ", dataBlockBytes = " << _dadaBufferLayout.sizeOfData() << "\n";
Tobias Winchen's avatar
Tobias Winchen committed
297

298
299
  CUDA_ERROR_CHECK(cudaMemcpyAsync(static_cast<void *>(_raw_voltage_db.a_ptr()),
                                   static_cast<void *>(block.ptr()),
300
                                   _dadaBufferLayout.sizeOfData() , cudaMemcpyHostToDevice,
301
302
303
                                   _h2d_stream));
  CUDA_ERROR_CHECK(cudaMemcpyAsync(
      static_cast<void *>(_sideChannelData_db.a_ptr()),
304
305
      static_cast<void *>(block.ptr() + _dadaBufferLayout.sizeOfData() + _dadaBufferLayout.sizeOfGap()),
      _dadaBufferLayout.sizeOfSideChannelData(), cudaMemcpyHostToDevice, _h2d_stream));
306
307
  BOOST_LOG_TRIVIAL(debug) << "First side channel item: 0x" <<   std::setw(12) << std::setfill('0') << std::hex <<  (reinterpret_cast<uint64_t*>(block.ptr() + _dadaBufferLayout.sizeOfData() + _dadaBufferLayout.sizeOfGap()))[0] << std::dec;

308
309
310
311

  if (_call_count == 1) {
    return false;
  }
312
  // process data
313

314
315
316
317
318
319
320
321
322
323
  // only if  a newblock is started the output buffer is swapped. Otherwise the
  // new data is added to it
  bool newBlock = false;
  if (((_call_count-1) * _nsamps_per_buffer) % _nsamps_per_output_spectra == 0) // _call_count -1 because this is the block number on the device
  {
    BOOST_LOG_TRIVIAL(debug) << "Starting new output block.";
    newBlock = true;
    _power_db.swap();
    _noOfBitSetsInSideChannel.swap();
    // move to specific stream!
324
    thrust::fill(thrust::cuda::par.on(_proc_stream),_power_db.a().begin(), _power_db.a().end(), 0.);
325
    thrust::fill(thrust::cuda::par.on(_proc_stream), _noOfBitSetsInSideChannel.a().begin(), _noOfBitSetsInSideChannel.a().end(), 0L);
326
  }
327

328
  process(_raw_voltage_db.b(), _sideChannelData_db.b(), _power_db.a(), _noOfBitSetsInSideChannel.a());
329
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
Tobias Winchen's avatar
Tobias Winchen committed
330

331
  if ((_call_count == 2) || (!newBlock)) {
332
333
334
    return false;
  }

335
  // copy data to host if block is finished
336
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream));
337
  _host_power_db.swap();
338
339
340
341
342
343
344
345
346
347
348
349

  for (size_t i = 0; i < _noOfBitSetsInSideChannel.size(); i++)
  {
    size_t memOffset = 2 * i * (_nchans * sizeof(IntegratedPowerType) + sizeof(size_t));
    // copy 2x channel data
    CUDA_ERROR_CHECK(
        cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset) ,
                        static_cast<void *>(_power_db.b_ptr() + 2 * i * _nchans),
                        2 * _nchans * sizeof(IntegratedPowerType),
                        cudaMemcpyDeviceToHost, _d2h_stream));
    // copy noOf bit set data
    CUDA_ERROR_CHECK(
350
        cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType)),
351
352
353
          static_cast<void *>(_noOfBitSetsInSideChannel.b_ptr() + i ),
            1 * sizeof(size_t),
            cudaMemcpyDeviceToHost, _d2h_stream));
354
  }
355
356

  BOOST_LOG_TRIVIAL(debug) << "Copy Data back to host";
357

358
359
360
361
  if (_call_count == 3) {
    return false;
  }

362
  // calculate off value
Tobias Winchen's avatar
Tobias Winchen committed
363
  BOOST_LOG_TRIVIAL(info) << "Buffer block: " << _call_count-3 << " with " << _noOfBitSetsInSideChannel.size() << "x2 output heaps:";
364
365
366
367
368
  for (size_t i = 0; i < _noOfBitSetsInSideChannel.size(); i++)
  {
    size_t memOffset = 2 * i * (_nchans * sizeof(IntegratedPowerType) + sizeof(size_t));

    size_t* on_values = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType));
369
    *on_values *= _nsamps_per_heap;
370
    size_t* off_values = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType) + sizeof(size_t));
371
    *off_values =  _nsamps_per_output_spectra - (*on_values);
372

373
    BOOST_LOG_TRIVIAL(info) << "    " << i << ": No of samples wo/w. bit set in side channel: " << *on_values << " / " << *off_values << std::endl;
374
375
376
  }

  // Wrap in a RawBytes object here;
377
  RawBytes bytes(reinterpret_cast<char *>(_host_power_db.b_ptr()),
378
379
                 _host_power_db.size(),
                 _host_power_db.size());
380
381
382
383
  BOOST_LOG_TRIVIAL(debug) << "Calling handler";
  // The handler can't do anything asynchronously without a copy here
  // as it would be unsafe (given that it does not own the memory it
  // is being passed).
Tobias Winchen's avatar
Tobias Winchen committed
384
385
386

  _handler(bytes);
  return false; //
387
388
389
390
391
392
} // operator ()

} // edd
} // effelsberg
} // psrdada_cpp