GatedSpectrometer.cuh 9.89 KB
Newer Older
1
2
3
4
5
6
7
8
9
#ifndef PSRDADA_CPP_EFFELSBERG_EDD_GATEDSPECTROMETER_HPP
#define PSRDADA_CPP_EFFELSBERG_EDD_GATEDSPECTROMETER_HPP

#include "psrdada_cpp/effelsberg/edd/Unpacker.cuh"
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/cuda_utils.hpp"
#include "psrdada_cpp/double_device_buffer.cuh"
#include "psrdada_cpp/double_host_buffer.cuh"
#include "psrdada_cpp/effelsberg/edd/DetectorAccumulator.cuh"
10
#include "psrdada_cpp/effelsberg/edd/DadaBufferLayout.hpp"
11
12
13
14

#include "thrust/device_vector.h"
#include "cufft.h"

Tobias Winchen's avatar
Tobias Winchen committed
15
16
#include "cublas_v2.h"

17
18
19
20
21
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {


22
#define BIT_MASK(bit) (1uL << (bit))
23
24
25
26
#define SET_BIT(value, bit) ((value) |= BIT_MASK(bit))
#define CLEAR_BIT(value, bit) ((value) &= ~BIT_MASK(bit))
#define TEST_BIT(value, bit) (((value)&BIT_MASK(bit)) ? 1 : 0)

27
28
typedef unsigned long long int uint64_cu;
static_assert(sizeof(uint64_cu) == sizeof(uint64_t), "Long long int not of 64 bit! This is problematic for CUDA!");
29

30
31
32
typedef uint64_t RawVoltageType;
typedef float UnpackedVoltageType;
typedef float2 ChannelisedVoltageType;
33

34
35
typedef float IntegratedPowerType;
//typedef int8_t IntegratedPowerType;
36

37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/// Input data and intermediate processing data for one polarization
struct PolarizationData
{
    /// Raw ADC Voltage
    DoubleDeviceBuffer<RawVoltageType> _raw_voltage;
    /// Side channel data
    DoubleDeviceBuffer<uint64_t> _sideChannelData;

    /// Baseline in gate 0 state
    thrust::device_vector<UnpackedVoltageType> _baseLineG0;
    /// Baseline in gate 1 state
    thrust::device_vector<UnpackedVoltageType> _baseLineG1;
    /// Channelized voltage in gate 0 state
    thrust::device_vector<ChannelisedVoltageType> _channelised_voltage_G0;
    /// Channelized voltage in gate 1 state
    thrust::device_vector<ChannelisedVoltageType> _channelised_voltage_G1;

    /// Swaps input buffers
    void swap()
56
    {
57
58
59
60
        _raw_voltage.swap();
        _sideChannelData.swap();
    }
};
61
62


63
64
65
66
67
68
69
70
// Output data for one gate
struct StokesOutput
{
    /// Stokes parameters
    DoubleDeviceBuffer<IntegratedPowerType> I;
    DoubleDeviceBuffer<IntegratedPowerType> Q;
    DoubleDeviceBuffer<IntegratedPowerType> U;
    DoubleDeviceBuffer<IntegratedPowerType> V;
71

72
73
    /// Number of samples integrated in this output block
    DoubleDeviceBuffer<uint64_cu> _noOfBitSets;
74

75
76
    /// Reset outptu for new integration
    void reset(cudaStream_t &_proc_stream)
77
    {
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
      thrust::fill(thrust::cuda::par.on(_proc_stream),I.a().begin(), I.a().end(), 0.);
      thrust::fill(thrust::cuda::par.on(_proc_stream),Q.a().begin(), Q.a().end(), 0.);
      thrust::fill(thrust::cuda::par.on(_proc_stream),U.a().begin(), U.a().end(), 0.);
      thrust::fill(thrust::cuda::par.on(_proc_stream),V.a().begin(), V.a().end(), 0.);
      thrust::fill(thrust::cuda::par.on(_proc_stream), _noOfBitSets.a().begin(), _noOfBitSets.a().end(), 0L);
    }

    /// Swap output buffers
    void swap()
    {
      I.swap();
      Q.swap();
      U.swap();
      V.swap();
      _noOfBitSets.swap();
    }

    /// Resize all buffers
    void resize(size_t size, size_t blocks)
    {
      I.resize(size * blocks);
      Q.resize(size * blocks);
      U.resize(size * blocks);
      V.resize(size * blocks);
      _noOfBitSets.resize(blocks);
    }
};
105
106
107
108
109
110
111







112
113
114
115
116
117
/**
 @class GatedSpectrometer
 @brief Split data into two streams and create integrated spectra depending on
 bit set in side channel data.

 */
118
template <class HandlerType> class GatedSpectrometer {
119
public:
120

121

122
123
124
125
126
127
128
129
130

public:
  /**
   * @brief      Constructor
   *
   * @param      buffer_bytes A RawBytes object wrapping a DADA header buffer
   * @param      nSideChannels Number of side channel items in the data stream,
   * @param      selectedSideChannel Side channel item used for gating
   * @param      selectedBit bit of side channel item used for gating
131
132
133
134
135
136
137
138
139
   * @param      speadHeapSize Size of the spead heap block.
   * @param      fftLength Size of the FFT
   * @param      naccumulate Number of samples to integrate in the individual
   *             FFT bins
   * @param      nbits Bit depth of the sampled signal
   * @param      input_level Normalization level of the input signal
   * @param      output_level Normalization level of the output signal
   * @param      handler Output handler
   *
140
   */
141
  GatedSpectrometer(const DadaBufferLayout &bufferLayout,
142
                    std::size_t selectedSideChannel, std::size_t selectedBit,
143
                     std::size_t fft_length,
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
                    std::size_t naccumulate, std::size_t nbits,
                    float input_level, float output_level,
                    HandlerType &handler);
  ~GatedSpectrometer();

  /**
   * @brief      A callback to be called on connection
   *             to a ring buffer.
   *
   * @detail     The first available header block in the
   *             in the ring buffer is provided as an argument.
   *             It is here that header parameters could be read
   *             if desired.
   *
   * @param      block  A RawBytes object wrapping a DADA header buffer
   */
  void init(RawBytes &block);

  /**
   * @brief      A callback to be called on acqusition of a new
   *             data block.
   *
   * @param      block  A RawBytes object wrapping a DADA data buffer output
167
   *             are the integrated specttra with/without bit set.
168
169
170
171
   */
  bool operator()(RawBytes &block);

private:
172
173
174
175
  // gate the data and fft data per gate
  void gated_fft(PolarizationData &data,
  thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G0,
  thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G1);
176
177

private:
178
  DadaBufferLayout _dadaBufferLayout;
179
180
181
182
183
  std::size_t _fft_length;
  std::size_t _naccumulate;
  std::size_t _nbits;
  std::size_t _selectedSideChannel;
  std::size_t _selectedBit;
184
  std::size_t _batch;
185
186
  std::size_t _nsamps_per_output_spectra;
  std::size_t _nsamps_per_buffer;
187
  std::size_t _nsamps_per_heap;
188
189
190

  HandlerType &_handler;
  cufftHandle _fft_plan;
Tobias Winchen's avatar
Tobias Winchen committed
191
192
  uint64_t _nchans;
  uint64_t _call_count;
193
194
  double _processing_efficiency;

195
196
  std::unique_ptr<Unpacker> _unpacker;

197
198
  // Input data and per pol intermediate data
  PolarizationData polarization0, polarization1;
199
200

  // Output data
201
  StokesOutput stokes_G0, stokes_G1;
202

203
  DoublePinnedHostBuffer<char> _host_power_db;
204

205
206
  // Temporary processing block
  // ToDo: Use inplace FFT to avoid temporary coltage array
207
208
  thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G0;
  thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G1;
209

Tobias Winchen's avatar
Tobias Winchen committed
210

211
212
213
214
215
216
  cudaStream_t _h2d_stream;
  cudaStream_t _proc_stream;
  cudaStream_t _d2h_stream;
};


217
218
219
/**
   * @brief      Splits the input data depending on a bit set into two arrays.
   *
220
   * @detail     The resulting gaps are filled with a given baseline value in the other stream.
221
   *
Tobias Winchen's avatar
Tobias Winchen committed
222
   * @param      GO Input data. Data is set to the baseline value if corresponding
223
   *             sideChannelData bit at bitpos os set.
Tobias Winchen's avatar
Tobias Winchen committed
224
   * @param      G1 Data in this array is set to the baseline value if corresponding
225
226
227
228
229
230
231
232
233
234
235
   *             sideChannelData bit at bitpos is not set.
   * @param      sideChannelData noOfSideChannels items per block of heapSize
   *             bytes in the input data.
   * @param      N lebgth of the input/output arrays G0.
   * @param      heapsize Size of the blocks for which there is an entry in the
                 sideChannelData.
   * @param      bitpos Position of the bit to evaluate for processing.
   * @param      noOfSideChannels Number of side channels items per block of
   *             data.
   * @param      selectedSideChannel No. of side channel item to be eveluated.
                 0 <= selectedSideChannel < noOfSideChannels.
236
237
238
239
   * @param      stats_G0 No. of sampels contributing to G0, accounting also
   *             for loat heaps
   * @param      stats_G1 No. of sampels contributing to G1, accounting also
   *             for loat heaps
240
   */
241
__global__ void gating(float *G0, float *G1, const int64_t *sideChannelData,
242
                       size_t N, size_t heapSize, size_t bitpos,
243
                       size_t noOfSideChannels, size_t selectedSideChannel,
244
245
246
247
248
249
                       const float  baseLineG0,
                       const float  baseLineG1,
                       float* __restrict__ baseLineNG0,
                       float* __restrict__ baseLineNG1,
                       uint64_cu* stats_G0,
                       uint64_cu* stats_G1);
Tobias Winchen's avatar
Tobias Winchen committed
250

251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
/**
 * @brief calculate stokes IQUV from two complex valuies for each polarization
 */
//__host__ __device__ void stokes_IQUV(const float2 &p1, const float2 &p2, float &I, float &Q, float &U, float &V);
__host__ __device__ void stokes_IQUV(const float2 &p1, const float2 &p2, float &I, float &Q, float &U, float &V)
{
    I = fabs(p1.x*p1.x + p1.y * p1.y) + fabs(p2.x*p2.x + p2.y * p2.y);
    Q = fabs(p1.x*p1.x + p1.y * p1.y) - fabs(p2.x*p2.x + p2.y * p2.y);
    U = 2 * (p1.x*p2.x + p1.y * p2.y);
    V = -2 * (p1.y*p2.x - p1.x * p2.y);
}




/**
 * @brief calculate stokes IQUV spectra pol1, pol2 are arrays of naccumulate
 * complex spectra for individual polarizations
 */
__global__ void stokes_accumulate(float2 const __restrict__ *pol1,
        float2 const __restrict__ *pol2, float *I, float* Q, float *U, float*V,
        int nchans, int naccumulate)
{

  for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < nchans);
       i += blockDim.x * gridDim.x)
  {
      float rI = 0;
      float rQ = 0;
      float rU = 0;
      float rV = 0;

      for (int k=0; k < naccumulate; k++)
      {
        const float2 p1 = pol1[i + k * nchans];
        const float2 p2 = pol2[i + k * nchans];

        rI += fabs(p1.x * p1.x + p1.y * p1.y) + fabs(p2.x * p2.x + p2.y * p2.y);
        rQ += fabs(p1.x * p1.x + p1.y * p1.y) - fabs(p2.x * p2.x + p2.y * p2.y);
        rU += 2.f * (p1.x * p2.x + p1.y * p2.y);
        rV += -2.f * (p1.y * p2.x - p1.x * p2.y);
      }
      I[i] += rI;
      Q[i] += rQ;
      U[i] += rU;
      V[i] += rV;
  }

}


302
303
304
305
306
307
308
309


} // edd
} // effelsberg
} // psrdada_cpp

#include "psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu"
#endif //PSRDADA_CPP_EFFELSBERG_EDD_GATEDSPECTROMETER_HPP