Commit c673c39a authored by Tobias Winchen's avatar Tobias Winchen
Browse files

Merge branch 'gated_spectrometer' into 'devel'

Gated spectrometer low channel numbers and saturated samples

See merge request !9
parents a59e9f3b 2601e6a4
Pipeline #100382 passed with stages
in 16 minutes and 11 seconds
IndentWidth: 4
UseTab: Never:w
......@@ -24,7 +24,7 @@ build_cuda:
script:
- mkdir build
- cd build
- cmake .. -DPSRDADA_INCLUDE_DIR=/usr/local/include/psrdada -DENABLE_CUDA=True
- cmake .. -DPSRDADA_INCLUDE_DIR=/usr/local/include/psrdada -DENABLE_CUDA=True -DCMAKE_BUILD_TYPE=DEBUG
- make -j8
artifacts:
paths:
......
......@@ -15,22 +15,23 @@ if(ENABLE_CUDA)
# Pass options to NVCC ( -ccbin /path --compiler-options -lfftw3f --compiler-options -lm --verbose)
list(APPEND CUDA_NVCC_FLAGS -DENABLE_CUDA --std c++${CMAKE_CXX_STANDARD} -Wno-deprecated-gpu-targets --ptxas-options=-v)
list(APPEND CUDA_NVCC_FLAGS_DEBUG --debug; --device-debug; --generate-line-info -Xcompiler "-Wextra" -Xcompiler "-Werror")
list(APPEND CUDA_NVCC_FLAGS_DEBUG --debug --generate-line-info -Xcompiler "-Wextra" --Werror all-warnings ) # Do not use Xcompiler -Werror here as it prevents some kernels from execution
#list(APPEND CUDA_NVCC_FLAGS_DEBUG --debug --device-debug -Xcompiler "-Wextra" -Xcompiler "-Werror")
list(APPEND CUDA_NVCC_FLAGS_PROFILE --generate-line-info)
#list(APPEND CUDA_NVCC_FLAGS -arch compute_35) # minumum compute level (Sps restriction)
string(TOUPPER "${CMAKE_BUILD_TYPE}" uppercase_CMAKE_BUILD_TYPE)
if(NOT uppercase_CMAKE_BUILD_TYPE MATCHES "DEBUG")
message("Enabling device specific compilation as not in DEBUG mode")
message("Enabling device specific compilation as not in DEBUG mode")
list(APPEND CUDA_NVCC_FLAGS -gencode arch=compute_61,code=sm_61) # GTX1080Ti, Titan Xp
list(APPEND CUDA_NVCC_FLAGS -gencode arch=compute_61,code=compute_61) # Compatibility
#list(APPEND CUDA_NVCC_FLAGS -gencode arch=compute_52,code=sm_52) # TitanX
#list(APPEND CUDA_NVCC_FLAGS -gencode arch=compute_50,code=sm_50) # Maxwell
#list(APPEND CUDA_NVCC_FLAGS -gencode arch=compute_37,code=sm_37) # K80
if(CUDA_VERSION VERSION_GREATER 10.0)
list(APPEND CUDA_NVCC_FLAGS -gencode arch=compute_75,code=sm_75) # RTX2080, CUDA10
endif()
endif(NOT uppercase_CMAKE_BUILD_TYPE MATCHES "DEBUG")
if(CUDA_VERSION VERSION_GREATER 10.0)
list(APPEND CUDA_NVCC_FLAGS -gencode arch=compute_75,code=sm_75) # RTX2080, CUDA10
endif()
list(APPEND CUDA_NVCC_FLAGS -O3 -use_fast_math -restrict)
......
......@@ -16,7 +16,7 @@ __global__
void detect_and_accumulate(float2 const* __restrict__ in, int8_t* __restrict__ out,
int nchans, int nsamps, int naccumulate, float scale, float offset, int stride, int out_offset)
{
// grid stride loop over output array to keep
// grid stride loop over output array to keep
for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < nsamps * nchans / naccumulate); i += blockDim.x * gridDim.x)
{
double sum = 0.0f;
......@@ -30,36 +30,52 @@ void detect_and_accumulate(float2 const* __restrict__ in, int8_t* __restrict__ o
double y = tmp.y * tmp.y;
sum += x + y;
}
size_t toff = out_offset * nchans + currentOutputSpectra * nchans *stride;
size_t toff = out_offset * nchans + currentOutputSpectra * nchans *stride;
out[toff + i] += (int8_t) ((sum - offset)/scale);
// no atomic add for int8, thus no optimized version here.A tomic add can be
// implemented using an in32 atomicAdd and bit shifting, but this needs more effort.
}
}
template <typename T>
__global__
void detect_and_accumulate(float2 const* __restrict__ in, float* __restrict__ out,
int nchans, int nsamps, int naccumulate, float scale, float offset, int stride, int out_offset)
{
for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < nsamps * nchans / naccumulate); i += blockDim.x * gridDim.x)
const int nb = naccumulate / blockDim.x + 1;
const int bs = blockDim.x;
const int number_of_spectra = nsamps /( nchans * naccumulate);
for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < nsamps * nchans / naccumulate * nb); i += blockDim.x * gridDim.x)
{
double sum = 0;
size_t currentOutputSpectra = i / nchans;
size_t currentChannel = i % nchans;
const size_t bn = i / nchans / number_of_spectra;
const size_t currentOutputSpectra = i / nchans;
const size_t currentChannel = i % nchans;
for (size_t j = 0; j < naccumulate; j++)
double sum = 0;
for (size_t k = 0; k < bs; k++)
{
size_t j = k + bn * bs;
if (j >= naccumulate)
break;
float2 tmp = in[ j * nchans + currentOutputSpectra * nchans * naccumulate + currentChannel];
double x = tmp.x * tmp.x;
double y = tmp.y * tmp.y;
sum += x + y;
}
size_t toff = out_offset * nchans + currentOutputSpectra * nchans * stride;
out[i + toff] += sum;
atomicAdd(&out[toff + currentChannel], ((sum - offset)/scale));
}
}
} // namespace kernels
......
......@@ -56,6 +56,7 @@ struct PolarizationData
DoubleDeviceBuffer<RawVoltageType> _raw_voltage;
/// Side channel data
DoubleDeviceBuffer<uint64_t> _sideChannelData;
DoubleHostBuffer<uint64_t> _sideChannelData_h;
/// Baseline in gate 0 state
thrust::device_vector<UnpackedVoltageType> _baseLineG0;
......@@ -164,6 +165,10 @@ struct PowerSpectrumOutput
/// Number of samples integrated in this output block
DoubleDeviceBuffer<uint64_cu> _noOfBitSets;
/// Number of samples overflowed
DoubleHostBuffer<uint64_t> _noOfOverflowed;
/// Swap output buffers and reset the buffer in given stream for new integration
void swap(cudaStream_t &_proc_stream);
};
......@@ -237,13 +242,16 @@ struct FullStokesOutput
/// Number of samples integrated in this output block
DoubleDeviceBuffer<uint64_cu> _noOfBitSets;
/// Number of samples overflowed
DoubleHostBuffer<uint64_t> _noOfOverflowed;
/// Swap output buffers
void swap(cudaStream_t &_proc_stream);
};
/**
@class GatedPowerSpectrumOutput
@class GatedFullStokesOutput
@brief Output Stream for power spectrum output
*/
struct GatedFullStokesOutput: public OutputDataStream
......@@ -284,8 +292,6 @@ public:
* @param naccumulate Number of samples to integrate in the individual
* FFT bins
* @param nbits Bit depth of the sampled signal
* @param input_level Normalization level of the input signal
* @param output_level Normalization level of the output signal
* @param handler Output handler
*
*/
......@@ -293,7 +299,6 @@ public:
std::size_t selectedSideChannel, std::size_t selectedBit,
std::size_t fft_length,
std::size_t naccumulate, std::size_t nbits,
float input_level, float output_level,
HandlerType &handler);
~GatedSpectrometer();
......@@ -319,23 +324,27 @@ public:
*/
bool operator()(RawBytes &block);
private:
// Processing strategy for single pol mode
// make the relevant processing methods proteceed only for testing
protected:
/// Processing strategy for single pol mode
void process(SinglePolarizationInput *inputDataStream, GatedPowerSpectrumOutput *outputDataStream);
// Processing strategy for dual pol power mode
/// Processing strategy for dual pol power mode
//void process(DualPolarizationInput*inputDataStream, GatedPowerSpectrumOutput *outputDataStream);
// Processing strategy for full stokes mode
/// Processing strategy for full stokes mode
void process(DualPolarizationInput *inputDataStream, GatedFullStokesOutput *outputDataStream);
OutputType* outputDataStream;
InputType* inputDataStream;
private:
// gate the data from the input stream and fft data per gate. Number of bit
// sets in every gate is stored in the output datasets
void gated_fft(PolarizationData &data,
thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G0,
thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G1);
private:
DadaBufferLayout _dadaBufferLayout;
std::size_t _fft_length;
std::size_t _naccumulate;
......@@ -347,14 +356,11 @@ private:
HandlerType &_handler;
cufftHandle _fft_plan;
uint64_t _nchans;
uint64_t _nBlocks;
uint64_t _nBlocks; // Number of dada blocks to integrate into one spectrum
uint64_t _call_count;
std::unique_ptr<Unpacker> _unpacker;
OutputType* outputDataStream;
InputType* inputDataStream;
// Temporary processing block
// ToDo: Use inplace FFT to avoid temporary voltage array
thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G0;
......
#ifndef PSRDADA_CPP_EFFELSBERG_EDD_GATEDSPECTROMETER_HPP
#define PSRDADA_CPP_EFFELSBERG_EDD_GATEDSPECTROMETER_HPP
#include "psrdada_cpp/effelsberg/edd/Unpacker.cuh"
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/cuda_utils.hpp"
#include "psrdada_cpp/double_device_buffer.cuh"
#include "psrdada_cpp/double_host_buffer.cuh"
#include "psrdada_cpp/effelsberg/edd/DetectorAccumulator.cuh"
#include "psrdada_cpp/effelsberg/edd/DadaBufferLayout.hpp"
#include "thrust/device_vector.h"
#include "cufft.h"
#include "cublas_v2.h"
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
#define BIT_MASK(bit) (1uL << (bit))
#define SET_BIT(value, bit) ((value) |= BIT_MASK(bit))
#define CLEAR_BIT(value, bit) ((value) &= ~BIT_MASK(bit))
#define TEST_BIT(value, bit) (((value)&BIT_MASK(bit)) ? 1 : 0)
typedef unsigned long long int uint64_cu;
static_assert(sizeof(uint64_cu) == sizeof(uint64_t), "Long long int not of 64 bit! This is problematic for CUDA!");
/**
@class GatedStokesSpectrometer
@brief Split data into two streams and create integrated spectra depending on
bit set in side channel data.
*/
template <class HandlerType> class GatedStokesSpectrometer {
public:
public:
/**
* @brief Constructor
*
* @param buffer_bytes A RawBytes object wrapping a DADA header buffer
* @param nSideChannels Number of side channel items in the data stream,
* @param selectedSideChannel Side channel item used for gating
* @param selectedBit bit of side channel item used for gating
* @param speadHeapSize Size of the spead heap block.
* @param fftLength Size of the FFT
* @param naccumulate Number of samples to integrate in the individual
* FFT bins
* @param nbits Bit depth of the sampled signal
* @param input_level Normalization level of the input signal
* @param output_level Normalization level of the output signal
* @param handler Output handler
*
*/
GatedStokesSpectrometer(const DadaBufferLayout &bufferLayout,
std::size_t selectedSideChannel, std::size_t selectedBit,
std::size_t fft_length,
std::size_t naccumulate, std::size_t nbits,
float input_level, float output_level,
HandlerType &handler);
~GatedStokesSpectrometer();
/**
* @brief A callback to be called on connection
* to a ring buffer.
*
* @details The first available header block in the
* in the ring buffer is provided as an argument.
* It is here that header parameters could be read
* if desired.
*
* @param block A RawBytes object wrapping a DADA header buffer
*/
void init(RawBytes &block);
/**
* @brief A callback to be called on acqusition of a new
* data block.
*
* @param block A RawBytes object wrapping a DADA data buffer output
* are the integrated specttra with/without bit set.
*/
bool operator()(RawBytes &block);
private:
// gate the data and fft data per gate
void gated_fft(PolarizationData &data,
thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G0,
thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G1);
private:
DadaBufferLayout _dadaBufferLayout;
std::size_t _fft_length;
std::size_t _naccumulate;
std::size_t _nbits;
std::size_t _selectedSideChannel;
std::size_t _selectedBit;
std::size_t _batch;
std::size_t _nsamps_per_output_spectra;
std::size_t _nsamps_per_buffer;
std::size_t _nsamps_per_heap;
HandlerType &_handler;
cufftHandle _fft_plan;
uint64_t _nchans;
uint64_t _call_count;
double _processing_efficiency;
std::unique_ptr<Unpacker> _unpacker;
// Input data and per pol intermediate data
PolarizationData polarization0, polarization1;
// Output data
StokesOutput stokes_G0, stokes_G1;
DoublePinnedHostBuffer<char> _host_power_db;
// Temporary processing block
// ToDo: Use inplace FFT to avoid temporary coltage array
thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G0;
thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G1;
cudaStream_t _h2d_stream;
cudaStream_t _proc_stream;
cudaStream_t _d2h_stream;
};
/**
* @brief Splits the input data depending on a bit set into two arrays.
*
* @details The resulting gaps are filled with a given baseline value in the other stream.
*
* @param GO Input data. Data is set to the baseline value if corresponding
* sideChannelData bit at bitpos os set.
* @param G1 Data in this array is set to the baseline value if corresponding
* sideChannelData bit at bitpos is not set.
* @param sideChannelData noOfSideChannels items per block of heapSize
* bytes in the input data.
* @param N lebgth of the input/output arrays G0.
* @param heapsize Size of the blocks for which there is an entry in the
sideChannelData.
* @param bitpos Position of the bit to evaluate for processing.
* @param noOfSideChannels Number of side channels items per block of
* data.
* @param selectedSideChannel No. of side channel item to be eveluated.
0 <= selectedSideChannel < noOfSideChannels.
* @param stats_G0 No. of sampels contributing to G0, accounting also
* for loat heaps
* @param stats_G1 No. of sampels contributing to G1, accounting also
* for loat heaps
*/
__global__ void gating(float *G0, float *G1, const int64_t *sideChannelData,
size_t N, size_t heapSize, size_t bitpos,
size_t noOfSideChannels, size_t selectedSideChannel,
const float* __restrict__ _baseLineG0,
const float* __restrict__ _baseLineG1,
float* __restrict__ baseLineNG0,
float* __restrict__ baseLineNG1,
uint64_cu* stats_G0,
uint64_cu* stats_G1);
/**
* @brief calculate stokes IQUV from two complex valuies for each polarization
*/
//__host__ __device__ void stokes_IQUV(const float2 &p1, const float2 &p2, float &I, float &Q, float &U, float &V);
__host__ __device__ void stokes_IQUV(const float2 &p1, const float2 &p2, float &I, float &Q, float &U, float &V)
{
I = fabs(p1.x*p1.x + p1.y * p1.y) + fabs(p2.x*p2.x + p2.y * p2.y);
Q = fabs(p1.x*p1.x + p1.y * p1.y) - fabs(p2.x*p2.x + p2.y * p2.y);
U = 2 * (p1.x*p2.x + p1.y * p2.y);
V = -2 * (p1.y*p2.x - p1.x * p2.y);
}
/**
* @brief calculate stokes IQUV spectra pol1, pol2 are arrays of naccumulate
* complex spectra for individual polarizations
*/
__global__ void stokes_accumulate(float2 const __restrict__ *pol1,
float2 const __restrict__ *pol2, float *I, float* Q, float *U, float*V,
int nchans, int naccumulate)
{
for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < nchans);
i += blockDim.x * gridDim.x)
{
float rI = 0;
float rQ = 0;
float rU = 0;
float rV = 0;
for (int k=0; k < naccumulate; k++)
{
const float2 p1 = pol1[i + k * nchans];
const float2 p2 = pol2[i + k * nchans];
rI += fabs(p1.x * p1.x + p1.y * p1.y) + fabs(p2.x * p2.x + p2.y * p2.y);
rQ += fabs(p1.x * p1.x + p1.y * p1.y) - fabs(p2.x * p2.x + p2.y * p2.y);
rU += 2.f * (p1.x * p2.x + p1.y * p2.y);
rV += -2.f * (p1.y * p2.x - p1.x * p2.y);
}
I[i] += rI;
Q[i] += rQ;
U[i] += rU;
V[i] += rV;
}
}
} // edd
} // effelsberg
} // psrdada_cpp
#include "psrdada_cpp/effelsberg/edd/detail/GatedStokesSpectrometer.cu"
#endif //PSRDADA_CPP_EFFELSBERG_EDD_GATEDSPECTROMETER_HPP
......@@ -19,15 +19,15 @@ public:
void init(RawBytes& block);
bool operator()(RawBytes& block);
public:
private:
std::string _prefix;
std::size_t _counter;
int _nchan;
std::vector<std::ofstream> _output_streams;
size_t _nchan;
char _header[HEADER_SIZE];
char _start_time[START_TIME];
bool first_block;
std::vector<char> _transpose;
std::vector<std::ofstream> _output_streams;
};
} // edd
} // effelsberg
......
......@@ -19,15 +19,15 @@ public:
void init(RawBytes& block);
bool operator()(RawBytes& block);
public:
private:
std::string _prefix;
std::size_t _counter;
int _nthread;
std::vector<std::ofstream> _output_streams;
unsigned int _nthread;
char _header[HEADER_SIZE];
char _start_time[START_TIME];
bool first_block;
std::vector<char> _transpose;
std::vector<std::ofstream> _output_streams;
};
} // edd
} // effelsberg
......
......@@ -67,7 +67,7 @@ template <class HandlerType, class InputType, class OutputType>
GatedSpectrometer<HandlerType, InputType, OutputType>::GatedSpectrometer(
const DadaBufferLayout &dadaBufferLayout, std::size_t selectedSideChannel,
std::size_t selectedBit, std::size_t fft_length, std::size_t naccumulate,
std::size_t nbits, float input_level, float output_level, HandlerType
std::size_t nbits, HandlerType
&handler) : _dadaBufferLayout(dadaBufferLayout),
_selectedSideChannel(selectedSideChannel), _selectedBit(selectedBit),
_fft_length(fft_length), _naccumulate(naccumulate),
......@@ -82,7 +82,7 @@ GatedSpectrometer<HandlerType, InputType, OutputType>::GatedSpectrometer(
CUDA_ERROR_CHECK(cudaDeviceSynchronize());
BOOST_LOG_TRIVIAL(info)
<< "Creating new GatedSpectrometer instance with parameters: \n"
<< "Creating new GatedSpectrometer instance with parameters:\n"
<< " fft_length " << _fft_length << "\n"
<< " naccumulate " << _naccumulate << "\n"
<< " nSideChannels " << _dadaBufferLayout.getNSideChannels() << "\n"
......@@ -98,45 +98,38 @@ GatedSpectrometer<HandlerType, InputType, OutputType>::GatedSpectrometer(
_nchans = _fft_length / 2 + 1;
// Calculate the scaling parameters for 8 bit output
float dof = 2 * _naccumulate;
float scale =
std::pow(input_level * std::sqrt(static_cast<float>(_nchans)), 2);
float offset = scale * dof;
float scaling = scale * std::sqrt(2 * dof) / output_level;
BOOST_LOG_TRIVIAL(debug)
<< "Correction factors for 8-bit conversion: offset = " << offset
<< ", scaling = " << scaling;
inputDataStream = new InputType(fft_length, nbits, _dadaBufferLayout);
//How many output spectra per input block?
size_t nsamps_per_output_spectra = fft_length * naccumulate;
size_t nsamps_per_pol = inputDataStream->getSamplesPerInputPolarization();
BOOST_LOG_TRIVIAL(debug) << "Samples per input polarization: " << nsamps_per_pol;
if (nsamps_per_output_spectra <= nsamps_per_pol)
{ // one buffer block is used for one or multiple output spectra
{ //
BOOST_LOG_TRIVIAL(debug) << "One buffer block is used for one or multiple output spectra";
size_t N = nsamps_per_pol / nsamps_per_output_spectra;
// All data in one block has to be used
assert(N * nsamps_per_output_spectra == nsamps_per_pol);
_nBlocks = 1;
}
else
{ // multiple blocks are integrated intoone output
{
BOOST_LOG_TRIVIAL(debug) << "Multiple blocks are integrated into one output spectrum";
size_t N = nsamps_per_output_spectra / nsamps_per_pol;
// All data in multiple blocks has to be used
assert(N * nsamps_per_pol == nsamps_per_output_spectra);
_nBlocks = N;
}
BOOST_LOG_TRIVIAL(debug) << "Integrating " << nsamps_per_output_spectra <<
" samples from " << _nBlocks << "blocks into one output spectrum.";
" samples from " << _nBlocks << " blocks into one output spectrum.";
// plan the FFT
size_t nsamps_per_buffer = _dadaBufferLayout.sizeOfData() * 8 / nbits;
int batch = nsamps_per_pol / _fft_length;
int n[] = {static_cast<int>(_fft_length)};
BOOST_LOG_TRIVIAL(debug) << "Generating FFT plan: \n"
BOOST_LOG_TRIVIAL(debug) << "Generating FFT plan:\n"
<< " fft_length = " << _fft_length << "\n"
<< " n[0] = " << n[0] << "\n"
<< " _nchans = " << _nchans << "\n"
......@@ -374,8 +367,6 @@ void GatedSpectrometer<HandlerType, InputType, OutputType>::process(SinglePolari
{
gated_fft(*inputDataStream, outputDataStream->G0._noOfBitSets.a(), outputDataStream->G1._noOfBitSets.a());
kernels::detect_and_accumulate<IntegratedPowerType> <<<1024, 1024, 0, _proc_stream>>>(
thrust::raw_pointer_cast(inputDataStream->_channelised_voltage_G0.data()),
thrust::raw_pointer_cast(outputDataStream->G0.data.a().data()),
......@@ -392,6 +383,38 @@ void GatedSpectrometer<HandlerType, InputType, OutputType>::process(SinglePolari
_naccumulate / _nBlocks,
1, 0., 1, 0);
// count saturated samples
for(size_t output_block_number = 0; output_block_number < outputDataStream->G0._noOfOverflowed.size(); output_block_number++)
{
outputDataStream->G0._noOfOverflowed.a().data()[output_block_number] = 0;
outputDataStream->G1._noOfOverflowed.a().data()[output_block_number] = 0;
size_t lostHeaps = 0;
const int heaps_per_output_spectra = inputDataStream->_sideChannelData_h.size() / outputDataStream->G0._noOfOverflowed.size();
for (size_t j = output_block_number * heaps_per_output_spectra ; j < (output_block_number+1) * heaps_per_output_spectra * _dadaBufferLayout.getNSideChannels(); j+=_dadaBufferLayout.getNSideChannels())
{
if (TEST_BIT(inputDataStream->_sideChannelData_h.a().data()[j], 3))
{ // heap was lost
lostHeaps++;
continue;
}
uint16_t n_saturated = (inputDataStream->_sideChannelData_h.a().data()[j] >> 32);
if (TEST_BIT(inputDataStream->_sideChannelData_h.a().data()[