GatedSpectrometer.cuh 13.7 KB
Newer Older
1
2
3
4
5
6
7
8
9
#ifndef PSRDADA_CPP_EFFELSBERG_EDD_GATEDSPECTROMETER_HPP
#define PSRDADA_CPP_EFFELSBERG_EDD_GATEDSPECTROMETER_HPP

#include "psrdada_cpp/effelsberg/edd/Unpacker.cuh"
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/cuda_utils.hpp"
#include "psrdada_cpp/double_device_buffer.cuh"
#include "psrdada_cpp/double_host_buffer.cuh"
#include "psrdada_cpp/effelsberg/edd/DetectorAccumulator.cuh"
10
#include "psrdada_cpp/effelsberg/edd/DadaBufferLayout.hpp"
11
12
13
14

#include "thrust/device_vector.h"
#include "cufft.h"

Tobias Winchen's avatar
Tobias Winchen committed
15
16
#include "cublas_v2.h"

17
#include <bitset>
18
19
20
21
22
23
#include <iostream>
#include <iomanip>
#include <cstring>
#include <sstream>


24
25
26
27
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {

28
/// Macro to manipulate single bits of an 64-bit type.
29
#define BIT_MASK(bit) (1uL << (bit))
30
31
32
33
#define SET_BIT(value, bit) ((value) |= BIT_MASK(bit))
#define CLEAR_BIT(value, bit) ((value) &= ~BIT_MASK(bit))
#define TEST_BIT(value, bit) (((value)&BIT_MASK(bit)) ? 1 : 0)

34
35
typedef unsigned long long int uint64_cu;
static_assert(sizeof(uint64_cu) == sizeof(uint64_t), "Long long int not of 64 bit! This is problematic for CUDA!");
36
37
38
39
40
41

typedef uint64_t RawVoltageType;
typedef float UnpackedVoltageType;
typedef float2 ChannelisedVoltageType;
typedef float IntegratedPowerType;

42
43
44
45
/**
 @class PolarizationData
 @brief Device data arrays for raw voltage input and intermediate processing data for one polarization
 */
46
struct PolarizationData
47
{
48
    size_t _nbits;
49
50
51
52
53
    /**
    * @brief      Constructor
    *
    * @param      nbits Bit-depth of the input data.
    */
54
    PolarizationData(size_t nbits): _nbits(nbits) {};
55
56
57
58
    /// Raw ADC Voltage
    DoubleDeviceBuffer<RawVoltageType> _raw_voltage;
    /// Side channel data
    DoubleDeviceBuffer<uint64_t> _sideChannelData;
59
    DoubleHostBuffer<uint64_t> _sideChannelData_h;
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76

    /// Baseline in gate 0 state
    thrust::device_vector<UnpackedVoltageType> _baseLineG0;
    /// Baseline in gate 1 state
    thrust::device_vector<UnpackedVoltageType> _baseLineG1;

    /// Baseline in gate 0 state after update
    thrust::device_vector<UnpackedVoltageType> _baseLineG0_update;
    /// Baseline in gate 1 state after update
    thrust::device_vector<UnpackedVoltageType> _baseLineG1_update;

    /// Channelized voltage in gate 0 state
    thrust::device_vector<ChannelisedVoltageType> _channelised_voltage_G0;
    /// Channelized voltage in gate 1 state
    thrust::device_vector<ChannelisedVoltageType> _channelised_voltage_G1;

    /// Swaps input buffers
77
78
    void swap();

79
80
81
82
83
    ///resize the internal buffers
    void resize(size_t rawVolttageBufferBytes, size_t nsidechannelitems, size_t channelized_samples);
};


84
85
86
87
88
/**
 @class SinglePolarizationInput
 @brief Input data for a buffer containing one polarization
 */
class SinglePolarizationInput : public PolarizationData
89
90
91
92
{
    DadaBufferLayout _dadaBufferLayout;
    size_t _fft_length;

93
94
95
96
97
98
99
100
101
102
public:

    /**
    * @brief      Constructor
    *
    * @param      fft_length length of the fft.
    * @param      nbits bit-depth of the input data.
    * @param      dadaBufferLayout layout of the input dada buffer
    */
    SinglePolarizationInput(size_t fft_length, size_t nbits,
103
104
            const DadaBufferLayout &dadaBufferLayout);

105
    /// Copy data from input block to input dubble buffer
106
    void getFromBlock(RawBytes &block, cudaStream_t &_h2d_stream);
107

108
109
    /// Number of samples per input polarization
    size_t getSamplesPerInputPolarization();
110
};
111
112


113
114
115
116
117
/**
 @class SinglePolarizationInput
 @brief Input data for a buffer containing two polarizations
 */
class DualPolarizationInput
118
{
119
120
    DadaBufferLayout _dadaBufferLayout;
    size_t _fft_length;
121
122

    public:
123
    PolarizationData polarization0, polarization1;
124

125
126
127
128
129
130
131
132
    /**
    * @brief      Constructor
    *
    * @param      fft_length length of the fft.
    * @param      nbits bit-depth of the input data.
    * @param      dadaBufferLayout layout of the input dada buffer
    */
    DualPolarizationInput(size_t fft_length, size_t nbits, const DadaBufferLayout
133
134
            &dadaBufferLayout);

135
    /// Swaps input buffers for both polarizations
136
    void swap();
137

138
    /// Copy data from input block to input dubble buffer
139
    void getFromBlock(RawBytes &block, cudaStream_t &_h2d_stream);
140

141
142
    /// Number of samples per input polarization
    size_t getSamplesPerInputPolarization();
143
};
144
145
146



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167

/**
 @class PowerSpectrumOutput
 @brief Output data for one gate, single power spectrum
 */
struct PowerSpectrumOutput
{
    /**
    * @brief      Constructor
    *
    * @param      size size of the output, i.e. number of channels.
    * @param      blocks number of blocks in the output.
    */
    PowerSpectrumOutput(size_t size, size_t blocks);

    /// spectrum data
    DoubleDeviceBuffer<IntegratedPowerType> data;

    /// Number of samples integrated in this output block
    DoubleDeviceBuffer<uint64_cu> _noOfBitSets;

168
    /// Number of samples overflowed
Tobias Winchen's avatar
Tobias Winchen committed
169
    DoubleHostBuffer<uint64_t> _noOfOverflowed;
170
171


172
173
174
175
176
177
178
179
180
    /// Swap output buffers and reset the buffer in given stream for new integration
    void swap(cudaStream_t &_proc_stream);
};


/**
 @class OutputDataStream
 @brief Interface for the processed output data stream
 */
181
182
struct OutputDataStream
{
183
184
185
    size_t _nchans;
    size_t _blocks;

186
187
188
189
190
191
    /**
    * @brief      Constructor
    *
    * @param      nchans number of channels.
    * @param      blocks number of blocks in the output.
    */
192
193
194
195
196
    OutputDataStream(size_t nchans, size_t blocks) : _nchans(nchans), _blocks(blocks)
    {
    }

    /// Swap output buffers
197
    virtual void swap(cudaStream_t &_proc_stream) = 0;
198

199
    // output buffer on the host
200
    DoublePinnedHostBuffer<char> _host_power;
201
202
203
204

    // copy data from internal buffers of the implementation to the host output
    // buffer
    virtual void data2Host(cudaStream_t &_d2h_stream) = 0;
205
206
207
};


208
209
210
211
/**
 @class GatedPowerSpectrumOutput
 @brief Output Stream for power spectrum output
 */
212
213
struct GatedPowerSpectrumOutput : public OutputDataStream
{
214
    GatedPowerSpectrumOutput(size_t nchans, size_t blocks);
215

216
    /// Power spectrum for off and on gate
217
218
    PowerSpectrumOutput G0, G1;

219
    void swap(cudaStream_t &_proc_stream);
220

221
    void data2Host(cudaStream_t &_d2h_stream);
222
};
223
224


225
226
227
228
/**
 @class FullStokesOutput
 @brief Output data for one gate full stokes
 */
229
struct FullStokesOutput
230
{
231
232
233
234
235
236
237
238
239
240
    /**
    * @brief      Constructor
    *
    * @param      size size of the output, i.e. number of channels.
    * @param      blocks number of blocks in the output.
    */
    FullStokesOutput(size_t size, size_t blocks);

    /// Buffer for Stokes Parameters
    DoubleDeviceBuffer<IntegratedPowerType> I, Q, U, V;
241
242
243
244

    /// Number of samples integrated in this output block
    DoubleDeviceBuffer<uint64_cu> _noOfBitSets;

245
    /// Number of samples overflowed
246
    DoubleHostBuffer<uint64_t> _noOfOverflowed;
247

248
    /// Swap output buffers
249
    void swap(cudaStream_t &_proc_stream);
250
251
};

252

253
/**
254
 @class GatedFullStokesOutput
255
256
 @brief Output Stream for power spectrum output
 */
257
258
struct GatedFullStokesOutput: public OutputDataStream
{
259
    /// stokes output for on/off gate
260
    FullStokesOutput G0, G1;
261

262
    GatedFullStokesOutput(size_t nchans, size_t blocks);
263

264
265
    /// Swap output buffers
    void swap(cudaStream_t &_proc_stream);
266

267
    void data2Host(cudaStream_t &_d2h_stream);
268
};
269

270
271


Tobias Winchen's avatar
Tobias Winchen committed
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
struct GatedSpectrometerInputParameters
    {
        /**
         * @brief      GatedSpectrometerInputParameters defining behavior of GatedSpectrometer
         *
         * @param      buffer_bytes A RawBytes object wrapping a DADA header buffer
         * @param      nSideChannels Number of side channel items in the data stream,
         * @param      selectedSideChannel Side channel item used for gating
         * @param      selectedBit bit of side channel item used for gating
         * @param      speadHeapSize Size of the spead heap block.
         * @param      fftLength Size of the FFT
         * @param      naccumulate Number of samples to integrate in the individual
         *             FFT bins
         * @param      nbits Bit depth of the sampled signal
         * @param      handler Output handler
         *
         */

        effelsberg::edd::DadaBufferLayout dadaBufferLayout;
        size_t selectedSideChannel;
        size_t speadHeapSize;
        size_t nSideChannels;
        size_t selectedBit;
        size_t fft_length;
        size_t naccumulate;
        unsigned int nbits;
298
299
300
        std::bitset<2> active_gates;

        GatedSpectrometerInputParameters(): selectedSideChannel(0), speadHeapSize(0), nSideChannels(0), selectedBit(0), fft_length(0), naccumulate(0), nbits(0), active_gates(3) {}; // Default both gates active
Tobias Winchen's avatar
Tobias Winchen committed
301
302
303
304
    };



305

306
307
308
309
310
/**
 @class GatedSpectrometer
 @brief Split data into two streams and create integrated spectra depending on
 bit set in side channel data.
 */
311
312
313
314
template <class HandlerType,
         class InputType,
         class OutputType
         > class GatedSpectrometer {
315
public:
Tobias Winchen's avatar
Tobias Winchen committed
316

317
318
  /**
   * @brief      Constructor
Tobias Winchen's avatar
Tobias Winchen committed
319

320
   */
Tobias Winchen's avatar
Tobias Winchen committed
321
  GatedSpectrometer(const GatedSpectrometerInputParameters &ip,
322
323
324
325
326
327
328
                    HandlerType &handler);
  ~GatedSpectrometer();

  /**
   * @brief      A callback to be called on connection
   *             to a ring buffer.
   *
329
   * @details     The first available header block in the
330
331
332
333
334
335
336
337
338
339
340
341
342
   *             in the ring buffer is provided as an argument.
   *             It is here that header parameters could be read
   *             if desired.
   *
   * @param      block  A RawBytes object wrapping a DADA header buffer
   */
  void init(RawBytes &block);

  /**
   * @brief      A callback to be called on acqusition of a new
   *             data block.
   *
   * @param      block  A RawBytes object wrapping a DADA data buffer output
343
   *             are the integrated specttra with/without bit set.
344
345
346
   */
  bool operator()(RawBytes &block);

347
348
349
  // make the relevant processing methods proteceed only for testing
protected:
  /// Processing strategy for single pol mode
350
  void process(SinglePolarizationInput *inputDataStream, GatedPowerSpectrumOutput *outputDataStream);
351

352
  /// Processing strategy for dual pol  power mode
353
  //void process(DualPolarizationInput*inputDataStream, GatedPowerSpectrumOutput *outputDataStream);
354

355
  /// Processing strategy for full stokes mode
356
  void process(DualPolarizationInput *inputDataStream, GatedFullStokesOutput *outputDataStream);
357

358
359
360
361
  OutputType* outputDataStream;
  InputType* inputDataStream;

private:
362
363
364
365
366
  // gate the data from the input stream and fft data per gate. Number of bit
  // sets in every gate is stored in the output datasets
  void gated_fft(PolarizationData &data,
    thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G0,
    thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G1);
367

368
  DadaBufferLayout _dadaBufferLayout;
Tobias Winchen's avatar
Tobias Winchen committed
369

370
371
372
373
  std::size_t _fft_length;
  std::size_t _naccumulate;
  std::size_t _selectedSideChannel;
  std::size_t _selectedBit;
374
  std::size_t _batch;
375
  std::size_t _nsamps_per_heap;
376
  std::bitset<2> _active_gates;
377
378
379

  HandlerType &_handler;
  cufftHandle _fft_plan;
Tobias Winchen's avatar
Tobias Winchen committed
380
  uint64_t _nchans;
381
  uint64_t _nBlocks;            // Number of dada blocks to integrate into one spectrum
Tobias Winchen's avatar
Tobias Winchen committed
382
  uint64_t _call_count;
383

384
  std::unique_ptr<Unpacker> _unpacker;
385

386
  // Temporary processing block
387
  // ToDo: Use inplace FFT to avoid temporary voltage array
388
389
  thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G0;
  thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G1;
390

391
392
393
394
395
396
  cudaStream_t _h2d_stream;
  cudaStream_t _proc_stream;
  cudaStream_t _d2h_stream;
};


Tobias Winchen's avatar
Tobias Winchen committed
397

398
399
400
/**
   * @brief      Splits the input data depending on a bit set into two arrays.
   *
401
   * @details     The resulting gaps are filled with zeros in the other stream.
402
   *
Tobias Winchen's avatar
Tobias Winchen committed
403
   * @param      GO Input data. Data is set to the baseline value if corresponding
404
   *             sideChannelData bit at bitpos os set.
Tobias Winchen's avatar
Tobias Winchen committed
405
   * @param      G1 Data in this array is set to the baseline value if corresponding
406
407
408
409
410
411
412
413
414
415
416
   *             sideChannelData bit at bitpos is not set.
   * @param      sideChannelData noOfSideChannels items per block of heapSize
   *             bytes in the input data.
   * @param      N lebgth of the input/output arrays G0.
   * @param      heapsize Size of the blocks for which there is an entry in the
                 sideChannelData.
   * @param      bitpos Position of the bit to evaluate for processing.
   * @param      noOfSideChannels Number of side channels items per block of
   *             data.
   * @param      selectedSideChannel No. of side channel item to be eveluated.
                 0 <= selectedSideChannel < noOfSideChannels.
417
418
419
420
   * @param      stats_G0 No. of sampels contributing to G0, accounting also
   *             for loat heaps
   * @param      stats_G1 No. of sampels contributing to G1, accounting also
   *             for loat heaps
421
   */
422
__global__ void gating(float *G0, float *G1, const int64_t *sideChannelData,
423
                       size_t N, size_t heapSize, size_t bitpos,
424
                       size_t noOfSideChannels, size_t selectedSideChannel,
425
426
                       const float*  __restrict__ _baseLineG0,
                       const float*  __restrict__ _baseLineG1,
427
428
429
430
                       float* __restrict__ baseLineNG0,
                       float* __restrict__ baseLineNG1,
                       uint64_cu* stats_G0,
                       uint64_cu* stats_G1);
Tobias Winchen's avatar
Tobias Winchen committed
431

432

433
434
435
436
437
438
439
440
441
442
443
444
445
446
/**
 * @brief calculate stokes IQUV from two complex valuies for each polarization
 */
__host__ __device__ void stokes_IQUV(const float2 &p1, const float2 &p2, float &I, float &Q, float &U, float &V);


/**
 * @brief calculate stokes IQUV spectra pol1, pol2 are arrays of naccumulate
 * complex spectra for individual polarizations
 */
__global__ void stokes_accumulate(float2 const __restrict__ *pol1,
        float2 const __restrict__ *pol2, float *I, float* Q, float *U, float*V,
        int nchans, int naccumulate);

447
448
449
450
451
452
453
454


} // edd
} // effelsberg
} // psrdada_cpp

#include "psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu"
#endif //PSRDADA_CPP_EFFELSBERG_EDD_GATEDSPECTROMETER_HPP