Unverified Commit 9a0102f3 authored by Tobias Winchen's avatar Tobias Winchen Committed by GitHub
Browse files

Merge pull request #19 from TobiasWinchen/gated_full_stokes

Full stokes version of gated spectrometer
parents e27e465e 4adcb11e
......@@ -7,6 +7,9 @@ enable_testing()
set (psrdada_cpp_VERSION_MAJOR 0)
set (psrdada_cpp_VERSION_MINOR 1)
set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
# cmake setup.
list(INSERT CMAKE_MODULE_PATH 0 ${CMAKE_SOURCE_DIR}/cmake)
......
......@@ -15,4 +15,4 @@ namespace psrdada_cpp
bool operator()(RawBytes&);
};
} //namespace psrdada_cpp
#endif //PSRDADA_CPP_DADA_NULL_SINK_HPP
\ No newline at end of file
#endif //PSRDADA_CPP_DADA_NULL_SINK_HPP
......@@ -9,9 +9,9 @@ set(PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES
${DEPENDENCY_LIBRARIES})
set(psrdada_cpp_effelsberg_edd_src
src/Channeliser.cu
src/DadaBufferLayout.cpp
src/DetectorAccumulator.cu
src/GatedSpectrometer.cu
src/EDDPolnMerge.cpp
src/EDDRoach.cpp
src/EDDRoach_merge.cpp
......@@ -23,7 +23,6 @@ set(psrdada_cpp_effelsberg_edd_src
set(psrdada_cpp_effelsberg_edd_inc
Channeliser.cuh
DadaBufferLayout.hpp
DetectorAccumulator.cuh
EDDPolnMerge.hpp
......
#ifndef PSRDADA_CPP_EFFELSBERG_EDD_CHANNELISER_HPP
#define PSRDADA_CPP_EFFELSBERG_EDD_CHANNELISER_HPP
#include "psrdada_cpp/effelsberg/edd/Unpacker.cuh"
#include "psrdada_cpp/effelsberg/edd/ScaledTransposeTFtoTFT.cuh"
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/double_device_buffer.cuh"
#include "psrdada_cpp/double_host_buffer.cuh"
#include "psrdada_cpp/dada_write_client.hpp"
#include "thrust/device_vector.h"
#include "cufft.h"
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
class Channeliser
{
public:
typedef uint64_t RawVoltageType;
typedef float UnpackedVoltageType;
typedef float2 ChannelisedVoltageType;
typedef char2 PackedChannelisedVoltageType;
public:
Channeliser(
std::size_t buffer_bytes,
std::size_t fft_length,
std::size_t nbits,
float input_level,
float output_level,
DadaWriteClient& client);
~Channeliser();
/**
* @brief A callback to be called on connection
* to a ring buffer.
*
* @detail The first available header block in the
* in the ring buffer is provided as an argument.
* It is here that header parameters could be read
* if desired.
*
* @param block A RawBytes object wrapping a DADA header buffer
*/
void init(RawBytes& block);
/**
* @brief A callback to be called on acqusition of a new
* data block.
*
* @param block A RawBytes object wrapping a DADA data buffer
*/
bool operator()(RawBytes& block);
private:
void process(thrust::device_vector<RawVoltageType> const& digitiser_raw,
thrust::device_vector<PackedChannelisedVoltageType>& packed_channelised);
private:
std::size_t _buffer_bytes;
std::size_t _fft_length;
std::size_t _nbits;
DadaWriteClient& _client;
cufftHandle _fft_plan;
int _nchans;
int _call_count;
std::unique_ptr<Unpacker> _unpacker;
std::unique_ptr<ScaledTransposeTFtoTFT> _transposer;
DoubleDeviceBuffer<RawVoltageType> _raw_voltage_db;
DoubleDeviceBuffer<PackedChannelisedVoltageType> _packed_channelised_voltage;
thrust::device_vector<UnpackedVoltageType> _unpacked_voltage;
thrust::device_vector<ChannelisedVoltageType> _channelised_voltage;
cudaStream_t _h2d_stream;
cudaStream_t _proc_stream;
cudaStream_t _d2h_stream;
};
} //edd
} //effelsberg
} //psrdada_cpp
#endif //PSRDADA_CPP_EFFELSBERG_EDD_CHANNELISER_HPP
......@@ -24,12 +24,18 @@ class DadaBufferLayout
public:
// input key of the dadad buffer
// size of spead heaps in bytes
// number of side channels
DadaBufferLayout();
DadaBufferLayout(key_t input_key , size_t heapSize, size_t nSideChannels);
void intitialize(key_t input_key , size_t heapSize, size_t nSideChannels);
key_t getInputkey() const;
size_t getBufferSize() const;
// get size of heaps in bytes
size_t getHeapSize() const;
size_t getNSideChannels() const;
......
......@@ -14,11 +14,18 @@
#include "cublas_v2.h"
#include <iostream>
#include <iomanip>
#include <cstring>
#include <sstream>
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
/// Macro to manipulate single bits of an 64-bit type.
#define BIT_MASK(bit) (1uL << (bit))
#define SET_BIT(value, bit) ((value) |= BIT_MASK(bit))
#define CLEAR_BIT(value, bit) ((value) &= ~BIT_MASK(bit))
......@@ -26,21 +33,244 @@ namespace edd {
typedef unsigned long long int uint64_cu;
static_assert(sizeof(uint64_cu) == sizeof(uint64_t), "Long long int not of 64 bit! This is problematic for CUDA!");
typedef uint64_t RawVoltageType;
typedef float UnpackedVoltageType;
typedef float2 ChannelisedVoltageType;
typedef float IntegratedPowerType;
/**
@class GatedSpectrometer
@brief Split data into two streams and create integrated spectra depending on
bit set in side channel data.
@class PolarizationData
@brief Device data arrays for raw voltage input and intermediate processing data for one polarization
*/
struct PolarizationData
{
size_t _nbits;
/**
* @brief Constructor
*
* @param nbits Bit-depth of the input data.
*/
PolarizationData(size_t nbits): _nbits(nbits) {};
/// Raw ADC Voltage
DoubleDeviceBuffer<RawVoltageType> _raw_voltage;
/// Side channel data
DoubleDeviceBuffer<uint64_t> _sideChannelData;
/// Baseline in gate 0 state
thrust::device_vector<UnpackedVoltageType> _baseLineG0;
/// Baseline in gate 1 state
thrust::device_vector<UnpackedVoltageType> _baseLineG1;
/// Baseline in gate 0 state after update
thrust::device_vector<UnpackedVoltageType> _baseLineG0_update;
/// Baseline in gate 1 state after update
thrust::device_vector<UnpackedVoltageType> _baseLineG1_update;
/// Channelized voltage in gate 0 state
thrust::device_vector<ChannelisedVoltageType> _channelised_voltage_G0;
/// Channelized voltage in gate 1 state
thrust::device_vector<ChannelisedVoltageType> _channelised_voltage_G1;
/// Swaps input buffers
void swap();
///resize the internal buffers
void resize(size_t rawVolttageBufferBytes, size_t nsidechannelitems, size_t channelized_samples);
};
/**
@class SinglePolarizationInput
@brief Input data for a buffer containing one polarization
*/
template <class HandlerType, typename IntegratedPowerType> class GatedSpectrometer {
class SinglePolarizationInput : public PolarizationData
{
DadaBufferLayout _dadaBufferLayout;
size_t _fft_length;
public:
typedef uint64_t RawVoltageType;
typedef float UnpackedVoltageType;
typedef float2 ChannelisedVoltageType;
// typedef float IntegratedPowerType;
//typedef int8_t IntegratedPowerType;
/**
* @brief Constructor
*
* @param fft_length length of the fft.
* @param nbits bit-depth of the input data.
* @param dadaBufferLayout layout of the input dada buffer
*/
SinglePolarizationInput(size_t fft_length, size_t nbits,
const DadaBufferLayout &dadaBufferLayout);
/// Copy data from input block to input dubble buffer
void getFromBlock(RawBytes &block, cudaStream_t &_h2d_stream);
/// Number of samples per input polarization
size_t getSamplesPerInputPolarization();
};
/**
@class SinglePolarizationInput
@brief Input data for a buffer containing two polarizations
*/
class DualPolarizationInput
{
DadaBufferLayout _dadaBufferLayout;
size_t _fft_length;
public:
PolarizationData polarization0, polarization1;
/**
* @brief Constructor
*
* @param fft_length length of the fft.
* @param nbits bit-depth of the input data.
* @param dadaBufferLayout layout of the input dada buffer
*/
DualPolarizationInput(size_t fft_length, size_t nbits, const DadaBufferLayout
&dadaBufferLayout);
/// Swaps input buffers for both polarizations
void swap();
/// Copy data from input block to input dubble buffer
void getFromBlock(RawBytes &block, cudaStream_t &_h2d_stream);
/// Number of samples per input polarization
size_t getSamplesPerInputPolarization();
};
/**
@class PowerSpectrumOutput
@brief Output data for one gate, single power spectrum
*/
struct PowerSpectrumOutput
{
/**
* @brief Constructor
*
* @param size size of the output, i.e. number of channels.
* @param blocks number of blocks in the output.
*/
PowerSpectrumOutput(size_t size, size_t blocks);
/// spectrum data
DoubleDeviceBuffer<IntegratedPowerType> data;
/// Number of samples integrated in this output block
DoubleDeviceBuffer<uint64_cu> _noOfBitSets;
/// Swap output buffers and reset the buffer in given stream for new integration
void swap(cudaStream_t &_proc_stream);
};
/**
@class OutputDataStream
@brief Interface for the processed output data stream
*/
struct OutputDataStream
{
size_t _nchans;
size_t _blocks;
/**
* @brief Constructor
*
* @param nchans number of channels.
* @param blocks number of blocks in the output.
*/
OutputDataStream(size_t nchans, size_t blocks) : _nchans(nchans), _blocks(blocks)
{
}
/// Swap output buffers
virtual void swap(cudaStream_t &_proc_stream) = 0;
// output buffer on the host
DoublePinnedHostBuffer<char> _host_power;
// copy data from internal buffers of the implementation to the host output
// buffer
virtual void data2Host(cudaStream_t &_d2h_stream) = 0;
};
/**
@class GatedPowerSpectrumOutput
@brief Output Stream for power spectrum output
*/
struct GatedPowerSpectrumOutput : public OutputDataStream
{
GatedPowerSpectrumOutput(size_t nchans, size_t blocks);
/// Power spectrum for off and on gate
PowerSpectrumOutput G0, G1;
void swap(cudaStream_t &_proc_stream);
void data2Host(cudaStream_t &_d2h_stream);
};
/**
@class FullStokesOutput
@brief Output data for one gate full stokes
*/
struct FullStokesOutput
{
/**
* @brief Constructor
*
* @param size size of the output, i.e. number of channels.
* @param blocks number of blocks in the output.
*/
FullStokesOutput(size_t size, size_t blocks);
/// Buffer for Stokes Parameters
DoubleDeviceBuffer<IntegratedPowerType> I, Q, U, V;
/// Number of samples integrated in this output block
DoubleDeviceBuffer<uint64_cu> _noOfBitSets;
/// Swap output buffers
void swap(cudaStream_t &_proc_stream);
};
/**
@class GatedPowerSpectrumOutput
@brief Output Stream for power spectrum output
*/
struct GatedFullStokesOutput: public OutputDataStream
{
/// stokes output for on/off gate
FullStokesOutput G0, G1;
GatedFullStokesOutput(size_t nchans, size_t blocks);
/// Swap output buffers
void swap(cudaStream_t &_proc_stream);
void data2Host(cudaStream_t &_d2h_stream);
};
/**
@class GatedSpectrometer
@brief Split data into two streams and create integrated spectra depending on
bit set in side channel data.
*/
template <class HandlerType,
class InputType,
class OutputType
> class GatedSpectrometer {
public:
/**
* @brief Constructor
......@@ -90,51 +320,45 @@ public:
bool operator()(RawBytes &block);
private:
void process(thrust::device_vector<RawVoltageType> const &digitiser_raw,
thrust::device_vector<uint64_t> const &sideChannelData,
thrust::device_vector<IntegratedPowerType> &detected,
thrust::device_vector<uint64_cu> &noOfBitSetsIn_G0,
thrust::device_vector<uint64_cu> &noOfBitSetsIn_G1);
// Processing strategy for single pol mode
void process(SinglePolarizationInput *inputDataStream, GatedPowerSpectrumOutput *outputDataStream);
// Processing strategy for dual pol power mode
//void process(DualPolarizationInput*inputDataStream, GatedPowerSpectrumOutput *outputDataStream);
// Processing strategy for full stokes mode
void process(DualPolarizationInput *inputDataStream, GatedFullStokesOutput *outputDataStream);
// gate the data from the input stream and fft data per gate. Number of bit
// sets in every gate is stored in the output datasets
void gated_fft(PolarizationData &data,
thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G0,
thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G1);
private:
DadaBufferLayout _dadaBufferLayout;
std::size_t _fft_length;
std::size_t _naccumulate;
std::size_t _nbits;
std::size_t _selectedSideChannel;
std::size_t _selectedBit;
std::size_t _batch;
std::size_t _nsamps_per_output_spectra;
std::size_t _nsamps_per_buffer;
std::size_t _nsamps_per_heap;
HandlerType &_handler;
cufftHandle _fft_plan;
uint64_t _nchans;
uint64_t _nBlocks;
uint64_t _call_count;
double _processing_efficiency;
std::unique_ptr<Unpacker> _unpacker;
std::unique_ptr<DetectorAccumulator<IntegratedPowerType> > _detector;
// Input data
DoubleDeviceBuffer<RawVoltageType> _raw_voltage_db;
DoubleDeviceBuffer<uint64_t> _sideChannelData_db;
// Output data
DoubleDeviceBuffer<IntegratedPowerType> _power_db;
OutputType* outputDataStream;
InputType* inputDataStream;
DoubleDeviceBuffer<uint64_cu> _noOfBitSetsIn_G0;
DoubleDeviceBuffer<uint64_cu> _noOfBitSetsIn_G1;
DoublePinnedHostBuffer<char> _host_power_db;
// Intermediate process steps
// Temporary processing block
// ToDo: Use inplace FFT to avoid temporary voltage array
thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G0;
thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G1;
thrust::device_vector<ChannelisedVoltageType> _channelised_voltage;
thrust::device_vector<UnpackedVoltageType> _baseLineNG0;
thrust::device_vector<UnpackedVoltageType> _baseLineNG1;
cudaStream_t _h2d_stream;
cudaStream_t _proc_stream;
......@@ -170,14 +394,28 @@ private:
__global__ void gating(float *G0, float *G1, const int64_t *sideChannelData,
size_t N, size_t heapSize, size_t bitpos,
size_t noOfSideChannels, size_t selectedSideChannel,
const float baseLineG0,
const float baseLineG1,
const float* __restrict__ _baseLineG0,
const float* __restrict__ _baseLineG1,
float* __restrict__ baseLineNG0,
float* __restrict__ baseLineNG1,
uint64_cu* stats_G0,
uint64_cu* stats_G1);
/**
* @brief calculate stokes IQUV from two complex valuies for each polarization
*/
__host__ __device__ void stokes_IQUV(const float2 &p1, const float2 &p2, float &I, float &Q, float &U, float &V);
/**
* @brief calculate stokes IQUV spectra pol1, pol2 are arrays of naccumulate
* complex spectra for individual polarizations
*/
__global__ void stokes_accumulate(float2 const __restrict__ *pol1,
float2 const __restrict__ *pol2, float *I, float* Q, float *U, float*V,
int nchans, int naccumulate);
} // edd
......
#ifndef PSRDADA_CPP_EFFELSBERG_EDD_GATEDSPECTROMETER_HPP
#define PSRDADA_CPP_EFFELSBERG_EDD_GATEDSPECTROMETER_HPP
#include "psrdada_cpp/effelsberg/edd/Unpacker.cuh"
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/cuda_utils.hpp"
#include "psrdada_cpp/double_device_buffer.cuh"
#include "psrdada_cpp/double_host_buffer.cuh"
#include "psrdada_cpp/effelsberg/edd/DetectorAccumulator.cuh"
#include "psrdada_cpp/effelsberg/edd/DadaBufferLayout.hpp"
#include "thrust/device_vector.h"
#include "cufft.h"
#include "cublas_v2.h"
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
#define BIT_MASK(bit) (1uL << (bit))
#define SET_BIT(value, bit) ((value) |= BIT_MASK(bit))
#define CLEAR_BIT(value, bit) ((value) &= ~BIT_MASK(bit))
#define TEST_BIT(value, bit) (((value)&BIT_MASK(bit)) ? 1 : 0)
typedef unsigned long long int uint64_cu;
static_assert(sizeof(uint64_cu) == sizeof(uint64_t), "Long long int not of 64 bit! This is problematic for CUDA!");
/**
@class GatedStokesSpectrometer
@brief Split data into two streams and create integrated spectra depending on
bit set in side channel data.
*/
template <class HandlerType> class GatedStokesSpectrometer {
public:
public:
/**
* @brief Constructor
*
* @param buffer_bytes A RawBytes object wrapping a DADA header buffer
* @param nSideChannels Number of side channel items in the data stream,
* @param selectedSideChannel Side channel item used for gating
* @param selectedBit bit of side channel item used for gating
* @param speadHeapSize Size of the spead heap block.
* @param fftLength Size of the FFT
* @param naccumulate Number of samples to integrate in the individual
* FFT bins
* @param nbits Bit depth of the sampled signal
* @param input_level Normalization level of the input signal
* @param output_level Normalization level of the output signal
* @param handler Output handler
*
*/
GatedStokesSpectrometer(const DadaBufferLayout &bufferLayout,
std::size_t selectedSideChannel, std::size_t selectedBit,
std::size_t fft_length,
std::size_t naccumulate, std::size_t nbits,
float input_level, float output_level,
HandlerType &handler);
~GatedStokesSpectrometer();
/**
* @brief A callback to be called on connection
* to a ring buffer.
*
* @detail The first available header block in the
* in the ring buffer is provided as an argument.
* It is here that header parameters could be read
* if desired.
*
* @param block A RawBytes object wrapping a DADA header buffer
*/
void init(RawBytes &block);
/**
* @brief A callback to be called on acqusition of a new
* data block.
*