Commit 133d4984 authored by Tobias Winchen

Merge with gated_spectrometer branch

parents ca7b19a0 d17de137
......@@ -16,7 +16,7 @@ if(ENABLE_CUDA)
# Pass options to NVCC ( -ccbin /path --compiler-options -lfftw3f --compiler-options -lm --verbose)
list(APPEND CUDA_NVCC_FLAGS -DENABLE_CUDA --std c++11 -Wno-deprecated-gpu-targets --ptxas-options=-v)
list(APPEND CUDA_NVCC_FLAGS_DEBUG --debug; --device-debug; --generate-line-info -Xcompiler "-Werror")
list(APPEND CUDA_NVCC_FLAGS_DEBUG --debug; --device-debug; --generate-line-info -Xcompiler "-Wextra" -Xcompiler "-Werror")
list(APPEND CUDA_NVCC_FLAGS_PROFILE --generate-line-info)
#list(APPEND CUDA_NVCC_FLAGS -arch compute_35) # minimum compute level (Sps restriction)
string(TOUPPER "${CMAKE_BUILD_TYPE}" uppercase_CMAKE_BUILD_TYPE)
......
......@@ -69,6 +69,7 @@ install (TARGETS ${CMAKE_PROJECT_NAME}
LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib)
install(FILES ${psrdada_cpp_inc} DESTINATION include/psrdada_cpp)
install(DIRECTORY detail DESTINATION include/psrdada_cpp)
add_subdirectory(meerkat)
add_subdirectory(effelsberg)
......@@ -23,5 +23,10 @@ cuda_add_executable(VLBI_prof src/VLBI_prof.cu)
target_link_libraries(VLBI_prof ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES} ${CUDA_CUFFT_LIBRARIES})
install(TARGETS VLBI_prof DESTINATION bin)
#Gated FFT spectrometer interface
cuda_add_executable(gated_spectrometer src/GatedSpectrometer_cli.cu)
target_link_libraries(gated_spectrometer ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES} ${CUDA_CUFFT_LIBRARIES} -lcublas)
install(TARGETS gated_spectrometer DESTINATION bin)
add_subdirectory(test)
endif(ENABLE_CUDA)
......@@ -9,23 +9,103 @@ namespace effelsberg {
namespace edd {
namespace kernels {
// The template argument is unused but needed, as nvcc otherwise gets confused.
template <typename T>
__global__
void detect_and_accumulate(float2 const* __restrict__ in, int8_t* __restrict__ out,
int nchans, int nsamps, int naccumulate, float scale, float offset);
int nchans, int nsamps, int naccumulate, float scale, float offset, int stride, int out_offset)
{
// grid-stride loop over the output array so a fixed launch configuration covers any output size
for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < nsamps * nchans / naccumulate); i += blockDim.x * gridDim.x)
{
double sum = 0.0;
size_t currentOutputSpectra = i / nchans;
size_t currentChannel = i % nchans;
for (size_t j = 0; j < naccumulate; j++)
{
float2 tmp = in[ j * nchans + currentOutputSpectra * nchans * naccumulate + currentChannel];
double x = tmp.x * tmp.x;
double y = tmp.y * tmp.y;
sum += x + y;
}
size_t toff = out_offset * nchans + currentOutputSpectra * nchans *stride;
out[toff + i] += (int8_t) ((sum - offset)/scale);
}
}
template <typename T>
__global__
void detect_and_accumulate(float2 const* __restrict__ in, float* __restrict__ out,
int nchans, int nsamps, int naccumulate, float scale, float offset, int stride, int out_offset)
{
for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < nsamps * nchans / naccumulate); i += blockDim.x * gridDim.x)
{
double sum = 0;
size_t currentOutputSpectra = i / nchans;
size_t currentChannel = i % nchans;
for (size_t j = 0; j < naccumulate; j++)
{
float2 tmp = in[ j * nchans + currentOutputSpectra * nchans * naccumulate + currentChannel];
double x = tmp.x * tmp.x;
double y = tmp.y * tmp.y;
sum += x + y;
}
size_t toff = out_offset * nchans + currentOutputSpectra * nchans * stride;
out[i + toff] += sum;
}
}
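// Output layout sketch (derived from the index arithmetic above; the concrete
// stride/out_offset values are illustrative): with s = i / nchans and
// c = i % nchans, the accumulated power for output spectrum s and channel c
// ends up in
//
//   out[(out_offset + s * (stride + 1)) * nchans + c]
//
// so stride = 0, out_offset = 0 packs the spectra densely, while stride = 1
// with out_offset = 0 or 1 interleaves two sets of spectra (e.g. the on/off
// gates of the GatedSpectrometer) in a single output buffer.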
} // namespace kernels
template <typename T>
class DetectorAccumulator
{
public:
typedef thrust::device_vector<float2> InputType;
typedef thrust::device_vector<int8_t> OutputType;
typedef thrust::device_vector<T> OutputType;
public:
DetectorAccumulator(int nchans, int tscrunch, float scale, float offset, cudaStream_t stream);
~DetectorAccumulator();
DetectorAccumulator(DetectorAccumulator const&) = delete;
void detect(InputType const& input, OutputType& output);
DetectorAccumulator(
int nchans, int tscrunch, float scale,
float offset, cudaStream_t stream)
: _nchans(nchans)
, _tscrunch(tscrunch)
, _scale(scale)
, _offset(offset)
, _stream(stream)
{
}
~DetectorAccumulator()
{
}
// stride adds an extra offset of _nchans * stride between consecutive output spectra
// to allow storing multiple spectra in one output buffer
void detect(InputType const& input, OutputType& output, int stride = 0, int stoff = 0)
{
assert(input.size() % (_nchans * _tscrunch) == 0 /* Input is not a multiple of _nchans * _tscrunch*/);
//output.resize(input.size()/_tscrunch);
int nsamps = input.size() / _nchans;
float2 const* input_ptr = thrust::raw_pointer_cast(input.data());
T * output_ptr = thrust::raw_pointer_cast(output.data());
kernels::detect_and_accumulate<T> <<<1024, 1024, 0, _stream>>>(
input_ptr, output_ptr, _nchans, nsamps, _tscrunch, _scale, _offset, stride, stoff);
}
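// Hedged usage sketch (names and values are illustrative, not taken from this
// diff). Note that detect() does not resize or zero the output, so the buffer
// has to be allocated and initialised by the caller:
//
//   DetectorAccumulator<float> detector(nchans, tscrunch, scale, offset, stream);
//   thrust::device_vector<float> power(2 * nspectra * nchans, 0.f);
//   detector.detect(channelised_on,  power, /*stride*/ 1, /*stoff*/ 0);
//   detector.detect(channelised_off, power, /*stride*/ 1, /*stoff*/ 1);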
private:
int _nchans;
......
......@@ -68,7 +68,7 @@ private:
int _nchans;
int _call_count;
std::unique_ptr<Unpacker> _unpacker;
std::unique_ptr<DetectorAccumulator> _detector;
std::unique_ptr<DetectorAccumulator<int8_t> > _detector;
DoubleDeviceBuffer<RawVoltageType> _raw_voltage_db;
DoubleDeviceBuffer<IntegratedPowerType> _power_db;
thrust::device_vector<UnpackedVoltageType> _unpacked_voltage;
......
#ifndef PSRDADA_CPP_EFFELSBERG_EDD_GATEDSPECTROMETER_HPP
#define PSRDADA_CPP_EFFELSBERG_EDD_GATEDSPECTROMETER_HPP
#include "psrdada_cpp/effelsberg/edd/Unpacker.cuh"
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/cuda_utils.hpp"
#include "psrdada_cpp/double_device_buffer.cuh"
#include "psrdada_cpp/double_host_buffer.cuh"
#include "psrdada_cpp/effelsberg/edd/DetectorAccumulator.cuh"
#include "thrust/device_vector.h"
#include "cufft.h"
#include "cublas_v2.h"
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
#define BIT_MASK(bit) (1L << (bit))
#define SET_BIT(value, bit) ((value) |= BIT_MASK(bit))
#define CLEAR_BIT(value, bit) ((value) &= ~BIT_MASK(bit))
#define TEST_BIT(value, bit) (((value)&BIT_MASK(bit)) ? 1 : 0)
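// Example: the gating kernel below uses TEST_BIT to extract the selected flag
// from a 64-bit side channel word, e.g.
//   const int bit_set = TEST_BIT(sideChannelItem, bitpos); // 1 if the bit is set, else 0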
/**
@class GatedSpectrometer
@brief Split the data into two streams and create integrated spectra depending on
a bit set in the side channel data.
*/
template <class HandlerType, typename IntegratedPowerType> class GatedSpectrometer {
public:
typedef uint64_t RawVoltageType;
typedef float UnpackedVoltageType;
typedef float2 ChannelisedVoltageType;
// typedef float IntegratedPowerType;
//typedef int8_t IntegratedPowerType;
public:
/**
* @brief Constructor
*
* @param buffer_bytes A RawBytes object wrapping a DADA header buffer
* @param nSideChannels Number of side channel items in the data stream,
* @param selectedSideChannel Side channel item used for gating
* @param selectedBit bit of side channel item used for gating
* @param speadHeapSize Size of the spead heap block.
* @param fftLength Size of the FFT
* @param naccumulate Number of samples to integrate in the individual
* FFT bins
* @param nbits Bit depth of the sampled signal
* @param input_level Normalization level of the input signal
* @param output_level Normalization level of the output signal
* @param handler Output handler
*
*/
GatedSpectrometer(std::size_t buffer_bytes, std::size_t nSideChannels,
std::size_t selectedSideChannel, std::size_t selectedBit,
std::size_t speadHeapSize, std::size_t fft_length,
std::size_t naccumulate, std::size_t nbits,
float input_level, float output_level,
HandlerType &handler);
~GatedSpectrometer();
/**
* @brief A callback to be called on connection
* to a ring buffer.
*
* @detail The first available header block in the
* ring buffer is provided as an argument.
* It is here that header parameters could be read
* if desired.
*
* @param block A RawBytes object wrapping a DADA header buffer
*/
void init(RawBytes &block);
/**
* @brief A callback to be called on acquisition of a new
* data block.
*
* @param block A RawBytes object wrapping a DADA data buffer; the output
* is the pair of integrated spectra with and without the gating bit set.
*/
bool operator()(RawBytes &block);
private:
void process(thrust::device_vector<RawVoltageType> const &digitiser_raw,
thrust::device_vector<int64_t> const &sideChannelData,
thrust::device_vector<IntegratedPowerType> &detected,
thrust::device_vector<size_t> &noOfBitSet);
private:
std::size_t _buffer_bytes;
std::size_t _fft_length;
std::size_t _naccumulate;
std::size_t _nbits;
std::size_t _nSideChannels;
std::size_t _selectedSideChannel;
std::size_t _selectedBit;
std::size_t _speadHeapSize;
std::size_t _sideChannelSize;
std::size_t _totalHeapSize;
std::size_t _nHeaps;
std::size_t _gapSize;
std::size_t _dataBlockBytes;
std::size_t _batch;
std::size_t _nsamps_per_output_spectra;
std::size_t _nsamps_per_buffer;
HandlerType &_handler;
cufftHandle _fft_plan;
int _nchans;
int _call_count;
std::unique_ptr<Unpacker> _unpacker;
std::unique_ptr<DetectorAccumulator<IntegratedPowerType> > _detector;
// Input data
DoubleDeviceBuffer<RawVoltageType> _raw_voltage_db;
DoubleDeviceBuffer<int64_t> _sideChannelData_db;
// Output data
DoubleDeviceBuffer<IntegratedPowerType> _power_db;
DoubleDeviceBuffer<size_t> _noOfBitSetsInSideChannel;
DoublePinnedHostBuffer<char> _host_power_db;
// Intermediate process steps
thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G0;
thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G1;
thrust::device_vector<ChannelisedVoltageType> _channelised_voltage;
thrust::device_vector<UnpackedVoltageType> _baseLineN;
cudaStream_t _h2d_stream;
cudaStream_t _proc_stream;
cudaStream_t _d2h_stream;
};
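/**
 * Usage sketch (hedged): `MyHandler` is a placeholder for the output handler
 * type (HandlerType); the parameter values are illustrative only and not taken
 * from this diff.
 * @code
 * MyHandler handler;  // hypothetical output handler
 * GatedSpectrometer<MyHandler, float> spectrometer(
 *     buffer_bytes,  // size of one DADA buffer in bytes
 *     2,             // nSideChannels
 *     0,             // selectedSideChannel
 *     63,            // selectedBit used for gating
 *     4096,          // speadHeapSize in bytes
 *     1024,          // fft_length
 *     16,            // naccumulate
 *     8,             // nbits
 *     100.f, 100.f,  // input_level, output_level
 *     handler);
 * @endcode
 */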
/**
* @brief Splits the input data depending on a bit set into two arrays.
*
* @detail The resulting gaps are filled with zeros in the other stream.
*
* @param G0 Input data. Data is set to the baseline value if the corresponding
* sideChannelData bit at bitpos is set.
* @param G1 Data in this array is set to the baseline value if corresponding
* sideChannelData bit at bitpos is not set.
* @param sideChannelData noOfSideChannels items per block of heapSize
* bytes in the input data.
* @param N Length of the input/output arrays G0 and G1.
* @param heapsize Size of the blocks for which there is an entry in the
sideChannelData.
* @param bitpos Position of the bit to evaluate for processing.
* @param noOfSideChannels Number of side channels items per block of
* data.
* @param selectedSideChannel No. of the side channel item to be evaluated.
0 <= selectedSideChannel < noOfSideChannels.
*/
__global__ void gating(float *G0, float *G1, const int64_t *sideChannelData,
size_t N, size_t heapSize, size_t bitpos,
size_t noOfSideChannels, size_t selectedSideChannel, const float *_baseLine);
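/**
 * Launch sketch (hedged: the pointer names and the conversion of the heap size
 * from bytes to samples are assumptions; the real call site sits in
 * GatedSpectrometer::process, which is not part of this header):
 * @code
 * // G0_d, G1_d: unpacked voltages, side_d: side channel words, base_d: baseline sum
 * gating<<<1024, 1024, 0, proc_stream>>>(G0_d, G1_d, side_d,
 *     nsamps_per_buffer, speadHeapSize * 8 / nbits, selectedBit,
 *     nSideChannels, selectedSideChannel, base_d);
 * @endcode
 */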
/**
* @brief Sums all elements of an input array.
*
* @detail The result is stored in an array with one value per launch
* block. A full reduction thus requires two kernel launches.
*
* @param in. Input array.
* @param N. Size of input array.
* @param out. Output array.
* */
__global__ void array_sum(float *in, size_t N, float *out);
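/**
 * Launch sketch of the two-stage reduction described above (hedged; grid and
 * block sizes are illustrative): the first launch writes one partial sum per
 * block, the second launch reduces the partial sums to a single value.
 * @code
 * float *d_in, *d_tmp, *d_out;  // device buffers, d_tmp holds 64 partial sums
 * array_sum<<<64, 1024, 1024 * sizeof(float), stream>>>(d_in, N, d_tmp);
 * array_sum<<<1, 64, 64 * sizeof(float), stream>>>(d_tmp, 64, d_out);
 * @endcode
 */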
} // edd
} // effelsberg
} // psrdada_cpp
#include "psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu"
#endif //PSRDADA_CPP_EFFELSBERG_EDD_GATEDSPECTROMETER_HPP
......@@ -2,6 +2,7 @@
#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/cuda_utils.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
#include <cuda_profiler_api.h>
#include <cuda.h>
namespace psrdada_cpp {
......@@ -62,7 +63,7 @@ FftSpectrometer<HandlerType>::FftSpectrometer(
CUDA_ERROR_CHECK(cudaStreamCreate(&_d2h_stream));
CUFFT_ERROR_CHECK(cufftSetStream(_fft_plan, _proc_stream));
_unpacker.reset(new Unpacker(_proc_stream));
_detector.reset(new DetectorAccumulator(_nchans, _naccumulate,
_detector.reset(new DetectorAccumulator<int8_t>(_nchans, _naccumulate,
scaling, offset, _proc_stream));
}
......@@ -111,7 +112,15 @@ bool FftSpectrometer<HandlerType>::operator()(RawBytes& block)
{
++_call_count;
BOOST_LOG_TRIVIAL(debug) << "FftSpectrometer operator() called (count = " << _call_count << ")";
assert(block.used_bytes() == _buffer_bytes /* Unexpected buffer size */);
if (block.used_bytes() != _buffer_bytes) { /* Unexpected buffer size */
BOOST_LOG_TRIVIAL(error) << "Unexpected buffer size - got "
<< block.used_bytes() << " bytes, expected "
<< _buffer_bytes << " bytes";
cudaDeviceSynchronize();
cudaProfilerStop();
return true;
}
CUDA_ERROR_CHECK(cudaStreamSynchronize(_h2d_stream));
_raw_voltage_db.swap();
......
#include "psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh"
#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/cuda_utils.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
#include <cuda.h>
#include <cuda_profiler_api.h>
#include <thrust/system/cuda/execution_policy.h>
#include <iostream>
#include <cstring>
#include <sstream>
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
__global__ void gating(float* __restrict__ G0, float* __restrict__ G1, const int64_t* __restrict__ sideChannelData,
size_t N, size_t heapSize, size_t bitpos,
size_t noOfSideChannels, size_t selectedSideChannel, const float* __restrict__ _baseLineN) {
float baseLine = (*_baseLineN) / N;
for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N);
i += blockDim.x * gridDim.x) {
const float w = G0[i] - baseLine;
const int64_t sideChannelItem =
sideChannelData[((i / heapSize) * (noOfSideChannels)) +
selectedSideChannel]; // Probably not optimal access as
// same data is copied for several
// threads, but maybe efficiently
// handled by cache?
const int bit_set = TEST_BIT(sideChannelItem, bitpos);
G1[i] = w * bit_set + baseLine;
G0[i] = w * (!bit_set) + baseLine;
}
}
__global__ void countBitSet(const int64_t *sideChannelData, size_t N, size_t
bitpos, size_t noOfSideChannels, size_t selectedSideChannel, size_t
*nBitsSet)
{
// not an optimized reduction, but only trivial array sizes are handled here.
int i = blockIdx.x * blockDim.x + threadIdx.x;
__shared__ unsigned int x[256];
if (i == 0)
nBitsSet[0] = 0;
if (i * noOfSideChannels + selectedSideChannel < N)
x[threadIdx.x] = TEST_BIT(sideChannelData[i * noOfSideChannels + selectedSideChannel], bitpos);
else
x[threadIdx.x] = 0;
__syncthreads();
for(int s = blockDim.x / 2; s > 0; s = s / 2)
{
if (threadIdx.x < s)
x[threadIdx.x] += x[threadIdx.x + s];
__syncthreads();
}
if(threadIdx.x == 0)
nBitsSet[0] += x[threadIdx.x];
}
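// Launch sketch (hedged, not taken from this diff): the shared array is fixed
// to 256 entries and the per-block result is added without atomics, so a
// single block of 256 threads covering all heaps of a buffer is the simple
// safe configuration:
//
//   countBitSet<<<1, 256, 0, stream>>>(side_d, nHeaps * nSideChannels,
//       selectedBit, nSideChannels, selectedSideChannel, nBitsSet_d);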
// blocksize for the array sum kernel
#define array_sum_Nthreads 1024
__global__ void array_sum(float *in, size_t N, float *out) {
extern __shared__ float data[];
size_t tid = threadIdx.x;
float ls = 0;
for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N);
i += blockDim.x * gridDim.x) {
ls += in[i]; // + in[i + blockDim.x]; // loading two elements increases the used bandwidth by ~10% but requires matching blocksize and size of input array
}
data[tid] = ls;
__syncthreads();
for (size_t i = blockDim.x / 2; i > 0; i /= 2) {
if (tid < i) {
data[tid] += data[tid + i];
}
__syncthreads();
}
// unroll last warp
// if (tid < 32)
//{
// warpReduce(data, tid);
//}
if (tid == 0) {
out[blockIdx.x] = data[0];
}
}
template <class HandlerType, typename IntegratedPowerType>
GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer(
std::size_t buffer_bytes, std::size_t nSideChannels,
std::size_t selectedSideChannel, std::size_t selectedBit,
std::size_t speadHeapSize, std::size_t fft_length, std::size_t naccumulate,
std::size_t nbits, float input_level, float output_level,
HandlerType &handler)
: _buffer_bytes(buffer_bytes), _nSideChannels(nSideChannels),
_selectedSideChannel(selectedSideChannel), _selectedBit(selectedBit),
_speadHeapSize(speadHeapSize), _fft_length(fft_length),
_naccumulate(naccumulate), _nbits(nbits), _handler(handler), _fft_plan(0),
_call_count(0) {
// Sanity checks
assert(((_nbits == 12) || (_nbits == 8)));
assert(_naccumulate > 0);
// check for any device errors
CUDA_ERROR_CHECK(cudaDeviceSynchronize());
BOOST_LOG_TRIVIAL(info)
<< "Creating new GatedSpectrometer instance with parameters: \n"
<< " fft_length " << _fft_length << "\n"
<< " naccumulate " << _naccumulate << "\n"
<< " nSideChannels " << _nSideChannels << "\n"
<< " speadHeapSize " << _speadHeapSize << " byte\n"
<< " selectedSideChannel " << _selectedSideChannel << "\n"
<< " selectedBit " << _selectedBit << "\n"
<< " output bit depth " << sizeof(IntegratedPowerType) * 8;
_sideChannelSize = nSideChannels * sizeof(int64_t);
_totalHeapSize = _speadHeapSize + _sideChannelSize;
_nHeaps = buffer_bytes / _totalHeapSize;
_gapSize = (buffer_bytes - _nHeaps * _totalHeapSize);
_dataBlockBytes = _nHeaps * _speadHeapSize;
assert((nSideChannels == 0) ||
(selectedSideChannel <
nSideChannels)); // Sanity check of side channel value
assert(selectedBit < 64); // Sanity check of selected bit
BOOST_LOG_TRIVIAL(info) << "Resulting memory configuration: \n"
<< " totalSizeOfHeap: " << _totalHeapSize << " byte\n"
<< " number of heaps per buffer: " << _nHeaps << "\n"
<< " resulting gap: " << _gapSize << " byte\n"
<< " datablock size in buffer: " << _dataBlockBytes << " byte\n";
_nsamps_per_buffer = _dataBlockBytes * 8 / nbits;
_nsamps_per_output_spectra = fft_length * naccumulate;
int nBlocks;
if (_nsamps_per_output_spectra <= _nsamps_per_buffer)
{ // one buffer block is used for one or multiple output spectra
size_t N = _nsamps_per_buffer / _nsamps_per_output_spectra;
// All data in one block has to be used
assert(N * _nsamps_per_output_spectra == _nsamps_per_buffer);
nBlocks = 1;
}
else
{ // multiple blocks are integrated into one output
size_t N = _nsamps_per_output_spectra / _nsamps_per_buffer;
// All data in multiple blocks has to be used
assert(N * _nsamps_per_buffer == _nsamps_per_output_spectra);
nBlocks = N;
}
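// Worked example (hypothetical numbers): with _nsamps_per_buffer = 2^20,
// fft_length = 1024 and naccumulate = 64, one output spectrum needs
// 1024 * 64 = 65536 samples, so N = 16 spectra fit into one buffer and
// nBlocks = 1. With naccumulate = 4096 instead, one spectrum needs 2^22
// samples, i.e. N = nBlocks = 4 buffers are integrated into one spectrum.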
BOOST_LOG_TRIVIAL(debug) << "Integrating " << _nsamps_per_output_spectra << " samples from " << nBlocks << " into one spectra.";
std::size_t n64bit_words = _dataBlockBytes / sizeof(uint64_t);
_nchans = _fft_length / 2 + 1;
int batch = _nsamps_per_buffer / _fft_length;
float dof = 2 * _naccumulate;
float scale =
std::pow(input_level * std::sqrt(static_cast<float>(_nchans)), 2);
float offset = scale * dof;
float scaling = scale * std::sqrt(2 * dof) / output_level;
BOOST_LOG_TRIVIAL(debug)
<< "Correction factors for 8-bit conversion: offset = " << offset
<< ", scaling = " << scaling;
BOOST_LOG_TRIVIAL(debug) << "Generating FFT plan";
int n[] = {static_cast<int>(_fft_length)};
CUFFT_ERROR_CHECK(cufftPlanMany(&_fft_plan, 1, n, NULL, 1, _fft_length, NULL,
1, _nchans, CUFFT_R2C, batch));
cufftSetStream(_fft_plan, _proc_stream);
BOOST_LOG_TRIVIAL(debug) << "Allocating memory";
_raw_voltage_db.resize(n64bit_words);
_sideChannelData_db.resize(_sideChannelSize * _nHeaps);
BOOST_LOG_TRIVIAL(debug) << " Input voltages size (in 64-bit words): "
<< _raw_voltage_db.size();
_unpacked_voltage_G0.resize(_nsamps_per_buffer);
_unpacked_voltage_G1.resize(_nsamps_per_buffer);
_baseLineN.resize(array_sum_Nthreads);
BOOST_LOG_TRIVIAL(debug) << " Unpacked voltages size (in samples): "
<< _unpacked_voltage_G0.size();
_channelised_voltage.resize(_nchans * batch);
BOOST_LOG_TRIVIAL(debug) << " Channelised voltages size: "
<< _channelised_voltage.size();
_power_db.resize(_nchans * batch / (_naccumulate / nBlocks) * 2); // hold on and off spectra to simplify output
thrust::fill(_power_db.a().begin(), _power_db.a().end(), 0.);
thrust::fill(_power_db.b().begin(), _power_db.b().end(), 0.);
BOOST_LOG_TRIVIAL(debug) << " Powers size: " << _power_db.size() / 2;
_noOfBitSetsInSideChannel.resize( batch / (_naccumulate / nBlocks));
thrust::fill(_noOfBitSetsInSideChannel.a().begin(), _noOfBitSetsInSideChannel.a().end(), 0.);
thrust::fill(_noOfBitSetsInSideChannel.b().begin(), _noOfBitSetsInSideChannel.b().end(), 0.);
// on the host, both power spectra are stored in the same data buffer together
// with the number of bits set
_host_power_db.resize( _power_db.size() * sizeof(IntegratedPowerType) + 2 * sizeof(size_t) * _noOfBitSetsInSideChannel.size());
CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream));
CUDA_ERROR_CHECK(cudaStreamCreate(&_proc_stream));