VLBI.cu 13.8 KB
Newer Older
1
#include "psrdada_cpp/effelsberg/edd/VLBI.cuh"
2
#include "psrdada_cpp/effelsberg/edd/Packer.cuh"
3
#include "psrdada_cpp/effelsberg/edd/Tools.cuh"
4
5
#include "psrdada_cpp/cuda_utils.hpp"

6
7
#include "ascii_header.h" // dada

8
9
10
#include <cuda.h>
#include <cuda_profiler_api.h>
#include <thrust/system/cuda/execution_policy.h>
11
#include <thrust/extrema.h>
12
13

#include <cstring>
14
#include <iostream>
15
16
17
18
19
20
21
22
#include <sstream>

namespace psrdada_cpp {
namespace effelsberg {
namespace edd {


template <class HandlerType>
23
VLBI<HandlerType>::VLBI(std::size_t buffer_bytes, std::size_t input_bitDepth,
24
25
26
                        std::size_t speadHeapSize, double sampleRate,
                        double digitizer_threshold,
                        const VDIFHeader &vdifHeader, HandlerType &handler)
27
    : _buffer_bytes(buffer_bytes), _input_bitDepth(input_bitDepth),
28
29
30
      _sampleRate(sampleRate), _digitizer_threshold(digitizer_threshold),
      _vdifHeader(vdifHeader), _output_bitDepth(2),
      _speadHeapSize(speadHeapSize), _handler(handler), _call_count(0) {
31
32
33
34
35
36

  // Sanity checks
  // check for any device errors
  CUDA_ERROR_CHECK(cudaDeviceSynchronize());

  BOOST_LOG_TRIVIAL(info) << "Creating new VLBI instance";
37
  BOOST_LOG_TRIVIAL(info) << "   Output data in VDIF format with "
38
                          << vlbiHeaderSize << "bytes header info and "
39
40
                          << _vdifHeader.getDataFrameLength() * 8
                          << " bytes data frame length";
41
42
  BOOST_LOG_TRIVIAL(debug) << "   Expecting speadheaps of size "
                           << speadHeapSize << "   byte";
43

44
  BOOST_LOG_TRIVIAL(debug) << "   Sample rate " << _sampleRate << " Hz";
45
46
47
48

  std::size_t n64bit_words = _buffer_bytes / sizeof(uint64_t);
  BOOST_LOG_TRIVIAL(debug) << "Allocating memory";
  _raw_voltage_db.resize(n64bit_words);
49
50
51
  BOOST_LOG_TRIVIAL(debug) << "   Input voltages size : "
                           << _raw_voltage_db.size() << " 64-bit words,"
                           << _raw_voltage_db.size() * 64 / 8 << " bytes";
52
  _unpacked_voltage.resize(n64bit_words * 64 / input_bitDepth );
53
54
55
  _packed_voltage.resize(n64bit_words * 64 / input_bitDepth * _output_bitDepth /
                         8);
  BOOST_LOG_TRIVIAL(debug) << "   Output voltages size: "
56
                           << _packed_voltage.size() << " byte";
57
58
  _spillOver.reserve(vdifHeader.getDataFrameLength() * 8 - vlbiHeaderSize);

59
60
61
  // number of vlbi frames per input block
  size_t nSamplesPerInputBlock = _packed_voltage.size() * 8 / _output_bitDepth;
  size_t frames_per_block = _packed_voltage.size() / (vdifHeader.getDataFrameLength() * 8 - vlbiHeaderSize);
62
63
  BOOST_LOG_TRIVIAL(debug) << "   this correspoonds to " << frames_per_block << " - " << frames_per_block + 1 << " frames";

64
65
  _outputBuffer.resize((frames_per_block+1) * vdifHeader.getDataFrameLength() * 8 );
  // potetnitally invalidating the last frame
66
67
68
  BOOST_LOG_TRIVIAL(info) << "   Output data in VDIF format with " << _outputBuffer.size() << " bytes per buffer";


69
70
71
72
73
74
75

  CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream));
  CUDA_ERROR_CHECK(cudaStreamCreate(&_proc_stream));
  CUDA_ERROR_CHECK(cudaStreamCreate(&_d2h_stream));

  _unpacker.reset(new Unpacker(_proc_stream));

76
  _vdifHeader.setBitsPerSample(_output_bitDepth - 1); // bits per sample - 1
77
78
79
80
  _vdifHeader.setRealDataType();
} // constructor


81
template <class HandlerType> VLBI<HandlerType>::~VLBI() {
82
83
84
85
86
87
88
  BOOST_LOG_TRIVIAL(debug) << "Destroying VLBI";
  cudaStreamDestroy(_h2d_stream);
  cudaStreamDestroy(_proc_stream);
  cudaStreamDestroy(_d2h_stream);
}


89
template <class HandlerType> void VLBI<HandlerType>::init(RawBytes &block) {
90
  BOOST_LOG_TRIVIAL(debug) << "VLBI init called";
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105

  size_t sync_time = 0;
  if (ascii_header_get(block.ptr(), "SYNC_TIME", "%ld", &sync_time) != 1)
  {
    BOOST_LOG_TRIVIAL(warning) << "No or multiple SYNC_TIME parameters in header stream! Not setting reference";
    return;
  }
  size_t sample_clock_start = 0;
  if (ascii_header_get(block.ptr(), "SAMPLE_CLOCK_START", "%ld", &sample_clock_start) != 1)
  {
    BOOST_LOG_TRIVIAL(warning) << "No or multiple SAMPLE_CLOCK_START in header stream! Not setting reference time";
    return;
  }

  size_t timestamp = sync_time + sample_clock_start / _sampleRate;
106
  BOOST_LOG_TRIVIAL(info) << "POSIX timestamp  captured from header: " << timestamp << " = " << sync_time << " + " << sample_clock_start << " / " << _sampleRate << " = SYNC_TIME + SAMPLE_CLOCK_START/SAMPLERATE" ;
107
108
  _vdifHeader.setTimeReferencesFromTimestamp(timestamp);

109
  std::stringstream headerInfo;
110
111
  headerInfo << "\n"
             << "# VLBI parameters: \n";
112
113

  size_t bEnd = std::strlen(block.ptr());
114
  if (bEnd + headerInfo.str().size() < block.total_bytes()) {
115
    std::strcpy(block.ptr() + bEnd, headerInfo.str().c_str());
116
  } else {
117
118
119
120
    BOOST_LOG_TRIVIAL(warning) << "Header of size " << block.total_bytes()
                               << " bytes already contains " << bEnd
                               << "bytes. Cannot add VLBI info of size "
                               << headerInfo.str().size() << " bytes.";
121
122
  }

123
  _baseLineN.resize(array_sum_Nthreads);
124
  _stdDevN.resize(array_sum_Nthreads);
125

126
127
128
129
130
131
132
  _handler.init(block);
}


template <class HandlerType>
bool VLBI<HandlerType>::operator()(RawBytes &block) {
  ++_call_count;
133
134
  BOOST_LOG_TRIVIAL(debug) << "VLBI operator() called (count = " << _call_count
                           << ")";
135
136
137
138
139
140
141
142
143
144
145
146
147
148
  if (block.used_bytes() != _buffer_bytes) { /* Unexpected buffer size */
    BOOST_LOG_TRIVIAL(error) << "Unexpected Buffer Size - Got "
                             << block.used_bytes() << " byte, expected "
                             << _buffer_bytes << " byte)";
    CUDA_ERROR_CHECK(cudaDeviceSynchronize());
    cudaProfilerStop();
    return true;
  }
  ////////////////////////////////////////////////////////////////////////
  // Copy data to device
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_h2d_stream));
  _raw_voltage_db.swap();

  BOOST_LOG_TRIVIAL(debug) << "   block.used_bytes() = " << block.used_bytes()
149
                           << ", dataBlockBytes = " << _buffer_bytes << "\n";
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174

  CUDA_ERROR_CHECK(cudaMemcpyAsync(static_cast<void *>(_raw_voltage_db.a_ptr()),
                                   static_cast<void *>(block.ptr()),
                                   _buffer_bytes, cudaMemcpyHostToDevice,
                                   _h2d_stream));
  if (_call_count == 1) {
    return false;
  }
  ////////////////////////////////////////////////////////////////////////
  // Process data
  _packed_voltage.swap();

  BOOST_LOG_TRIVIAL(debug) << "Unpacking raw voltages";
  switch (_input_bitDepth) {
  case 8:
    _unpacker->unpack<8>(_raw_voltage_db.b(), _unpacked_voltage);
    break;
  case 12:
    _unpacker->unpack<12>(_raw_voltage_db.b(), _unpacked_voltage);
    break;
  default:
    throw std::runtime_error("Unsupported number of bits");
  }


175
  BOOST_LOG_TRIVIAL(debug) << "Calculate baseline";
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
  psrdada_cpp::effelsberg::edd::
      array_sum<<<64, array_sum_Nthreads, 0, _proc_stream>>>(
          thrust::raw_pointer_cast(_unpacked_voltage.data()),
          _unpacked_voltage.size(),
          thrust::raw_pointer_cast(_baseLineN.data()));
  psrdada_cpp::effelsberg::edd::
      array_sum<<<1, array_sum_Nthreads, 0, _proc_stream>>>(
          thrust::raw_pointer_cast(_baseLineN.data()), _baseLineN.size(),
          thrust::raw_pointer_cast(_baseLineN.data()));

  BOOST_LOG_TRIVIAL(debug) << "Calculate std-dev";
  psrdada_cpp::effelsberg::edd::
      scaled_square_residual_sum<<<64, array_sum_Nthreads, 0, _proc_stream>>>(
          thrust::raw_pointer_cast(_unpacked_voltage.data()),
          _unpacked_voltage.size(), thrust::raw_pointer_cast(_baseLineN.data()),
          thrust::raw_pointer_cast(_stdDevN.data()));
  psrdada_cpp::effelsberg::edd::
      array_sum<<<1, array_sum_Nthreads, 0, _proc_stream>>>(
          thrust::raw_pointer_cast(_stdDevN.data()), _stdDevN.size(),
          thrust::raw_pointer_cast(_stdDevN.data()));
196
197
198


  // non linear packing
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
  BOOST_LOG_TRIVIAL(debug) << "Packing data with non linear 2-bit packaging "
                              "using levels -v*sigma, 0, v*sigma with v = "
                           << _digitizer_threshold;
  _packed_voltage.b().resize(_unpacked_voltage.size() * 2 / 8);
  BOOST_LOG_TRIVIAL(debug) << "Input size: " << _unpacked_voltage.size()
                           << " elements";
  BOOST_LOG_TRIVIAL(debug) << "Resizing output buffer to "
                           << _packed_voltage.b().size() << " byte";

  pack2bit_nonLinear<<<128, 1024, 0, _proc_stream>>>(
      thrust::raw_pointer_cast(_unpacked_voltage.data()),
      (uint32_t *)thrust::raw_pointer_cast(_packed_voltage.b().data()),
      _unpacked_voltage.size(), _digitizer_threshold,
      thrust::raw_pointer_cast(_stdDevN.data()),
      thrust::raw_pointer_cast(_baseLineN.data()));
214
215

  CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
216
217
218
219
  BOOST_LOG_TRIVIAL(trace) << " Standard Deviation squared: " << _stdDevN[0]
                           << " "
                           << "Mean Value: "
                           << _baseLineN[0] / _unpacked_voltage.size();
220
221
222
223
224
225
226
227
228
229

  if ((_call_count == 2)) {
    return false;
  }
  _outputBuffer.swap();

  ////////////////////////////////////////////////////////////////////////
  BOOST_LOG_TRIVIAL(debug) << "Copy Data back to host";
  CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream));

230
231
232
  const size_t outputBlockSize = _vdifHeader.getDataFrameLength() * 8 - vlbiHeaderSize;

  const size_t totalSizeOfData = _packed_voltage.size() + _spillOver.size(); // current array + remaining of previous
233

234
  size_t numberOfBlocksInOutput = totalSizeOfData / outputBlockSize;
235
236

  size_t remainingBytes = outputBlockSize - _spillOver.size();
Tobias Winchen's avatar
Tobias Winchen committed
237
  BOOST_LOG_TRIVIAL(debug) << "   Number of blocks in output "
238
                           << numberOfBlocksInOutput;
239

240
241
  //_outputBuffer.a().resize(numberOfBlocksInOutput *
   //                        (outputBlockSize + vlbiHeaderSize));
242

Tobias Winchen's avatar
Tobias Winchen committed
243
  BOOST_LOG_TRIVIAL(debug) << "   Copying " << _spillOver.size()
244
                           << " bytes spill over";
245
  // leave room for header and fill first block of output with spill over
246
247
  std::copy(_spillOver.begin(), _spillOver.end(),
            _outputBuffer.a().begin() + vlbiHeaderSize);
248

Tobias Winchen's avatar
Tobias Winchen committed
249
  BOOST_LOG_TRIVIAL(debug) << "   Copying remaining " << remainingBytes
250
                           << " bytes for first block";
251
  // cuda memcopy remainder of first block
Tobias Winchen's avatar
Tobias Winchen committed
252
253
  CUDA_ERROR_CHECK(cudaMemcpyAsync(static_cast<void *>(_outputBuffer.a_ptr() + vlbiHeaderSize + _spillOver.size()),
                                   static_cast<void *>(_packed_voltage.a_ptr()),
254
255
                                   remainingBytes, cudaMemcpyDeviceToHost,
                                   _d2h_stream));
256

257
258
259
  const size_t dpitch = outputBlockSize + vlbiHeaderSize;
  const size_t spitch = outputBlockSize;
  const size_t width = outputBlockSize;
260
  size_t height = numberOfBlocksInOutput-1;
261

Tobias Winchen's avatar
Tobias Winchen committed
262
  BOOST_LOG_TRIVIAL(debug) << "   Copying " << height
263
                           << " blocks a " << outputBlockSize << " bytes";
264
  // we now have a full first block, pitch copy rest leaving room for the header
265
266
267
268
269
  CUDA_ERROR_CHECK(cudaMemcpy2DAsync(
      (void *)(_outputBuffer.a_ptr() + outputBlockSize + 2 * vlbiHeaderSize),
      dpitch, (void *)thrust::raw_pointer_cast(_packed_voltage.a_ptr() +
                                               remainingBytes),
      spitch, width, height, cudaMemcpyDeviceToHost, _d2h_stream));
270
271
272


  // new spill over
273
  _spillOver.resize(totalSizeOfData - numberOfBlocksInOutput * outputBlockSize);
274

275
  size_t offset = (numberOfBlocksInOutput-1) * outputBlockSize + remainingBytes;
276
277
  BOOST_LOG_TRIVIAL(debug) << " New spill over size " << _spillOver.size()
                           << " bytes with offset " << offset;
278

279
280
  CUDA_ERROR_CHECK(cudaMemcpyAsync(
      static_cast<void *>(thrust::raw_pointer_cast(_spillOver.data())),
Tobias Winchen's avatar
Tobias Winchen committed
281
      static_cast<void *>(_packed_voltage.a_ptr() + offset),
282
      _spillOver.size(), cudaMemcpyDeviceToHost, _d2h_stream));
283
284

  // fill in header data
Tobias Winchen's avatar
Tobias Winchen committed
285
286
  const uint32_t samplesPerDataFrame = outputBlockSize * 8 / _output_bitDepth;
  const uint32_t dataFramesPerSecond = _sampleRate / samplesPerDataFrame;
287

288
289
  BOOST_LOG_TRIVIAL(debug) << " Samples per data frame: " << samplesPerDataFrame;
  BOOST_LOG_TRIVIAL(debug) << " Dataframes per second: " << dataFramesPerSecond;
Tobias Winchen's avatar
Tobias Winchen committed
290

291
  for (uint32_t ib = 0; ib < _outputBuffer.a().size(); ib += _vdifHeader.getDataFrameLength() * 8)
292
  {
Tobias Winchen's avatar
Tobias Winchen committed
293
294
     // copy header to correct position
    std::copy(reinterpret_cast<uint8_t *>(_vdifHeader.getData()),
295
        reinterpret_cast<uint8_t *>(_vdifHeader.getData()) + vlbiHeaderSize,
296
        _outputBuffer.a().begin() + ib);
297
298
299
300
301
302
303
304
305
306
307
308
309
    size_t i = ib / _vdifHeader.getDataFrameLength() / 8;

    // invalidate rest of data so it can be dropped later.
    // Needed so that the outpuitbuffer can have always the same size
    if (i < numberOfBlocksInOutput)
    {
      _vdifHeader.setValid();
    }
    else
    {
      _vdifHeader.setInvalid();
      continue;
    }
310

311
    // update header
Tobias Winchen's avatar
Tobias Winchen committed
312
313
314
    uint32_t dataFrame = _vdifHeader.getDataFrameNumber();
    if (i < 5)
      BOOST_LOG_TRIVIAL(debug) << i << " Dataframe Number: " << dataFrame;
315
316
317
318
319
320
321
    if (dataFrame < dataFramesPerSecond)
    {
      _vdifHeader.setDataFrameNumber(dataFrame + 1);
    }
    else
    {
      _vdifHeader.setDataFrameNumber(0);
Tobias Winchen's avatar
Tobias Winchen committed
322
      _vdifHeader.setSecondsFromReferenceEpoch(_vdifHeader.getSecondsFromReferenceEpoch() + 1);
323
    }
324
325
326
327
328
329
330
331
  }

  if (_call_count == 3) {
    return false;
  }

  // Wrap in a RawBytes object here;
  RawBytes bytes(reinterpret_cast<char *>(_outputBuffer.b_ptr()),
332
333
334
                 _outputBuffer.b().size(), _outputBuffer.b().size());
  BOOST_LOG_TRIVIAL(debug) << "Calling handler, processing "
                           << _outputBuffer.b().size() << " bytes";
335
336
337
338
339
340
341
342
343
344
345
346
  // The handler can't do anything asynchronously without a copy here
  // as it would be unsafe (given that it does not own the memory it
  // is being passed).

  _handler(bytes);
  return false; //
} // operator ()

} // edd
} // effelsberg
} // psrdada_cpp