Commit 7255049f authored by Tobias Winchen's avatar Tobias Winchen
Browse files

Deleted obsolete Channeliser and removed whitespace

parent 4ffa6a44
......@@ -9,10 +9,10 @@ namespace psrdada_cpp
class NullSink
{
public:
NullSink() {};
~NullSink() {};
void init(RawBytes&) {};
bool operator()(RawBytes&) {};
NullSink();
~NullSink();
void init(RawBytes&);
bool operator()(RawBytes&);
};
} //namespace psrdada_cpp
#endif //PSRDADA_CPP_DADA_NULL_SINK_HPP
......@@ -9,7 +9,6 @@ set(PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES
${DEPENDENCY_LIBRARIES})
set(psrdada_cpp_effelsberg_edd_src
src/Channeliser.cu
src/DadaBufferLayout.cpp
src/DetectorAccumulator.cu
src/EDDPolnMerge.cpp
......@@ -23,7 +22,6 @@ set(psrdada_cpp_effelsberg_edd_src
set(psrdada_cpp_effelsberg_edd_inc
Channeliser.cuh
DadaBufferLayout.hpp
DetectorAccumulator.cuh
EDDPolnMerge.hpp
......
#ifndef PSRDADA_CPP_EFFELSBERG_EDD_CHANNELISER_HPP
#define PSRDADA_CPP_EFFELSBERG_EDD_CHANNELISER_HPP
#include "psrdada_cpp/effelsberg/edd/Unpacker.cuh"
#include "psrdada_cpp/effelsberg/edd/ScaledTransposeTFtoTFT.cuh"
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/double_device_buffer.cuh"
#include "psrdada_cpp/double_host_buffer.cuh"
#include "psrdada_cpp/dada_write_client.hpp"
#include "thrust/device_vector.h"
#include "cufft.h"
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
class Channeliser
{
public:
typedef uint64_t RawVoltageType;
typedef float UnpackedVoltageType;
typedef float2 ChannelisedVoltageType;
typedef char2 PackedChannelisedVoltageType;
public:
Channeliser(
std::size_t buffer_bytes,
std::size_t fft_length,
std::size_t nbits,
float input_level,
float output_level,
DadaWriteClient& client);
~Channeliser();
/**
* @brief A callback to be called on connection
* to a ring buffer.
*
* @detail The first available header block in the
* in the ring buffer is provided as an argument.
* It is here that header parameters could be read
* if desired.
*
* @param block A RawBytes object wrapping a DADA header buffer
*/
void init(RawBytes& block);
/**
* @brief A callback to be called on acqusition of a new
* data block.
*
* @param block A RawBytes object wrapping a DADA data buffer
*/
bool operator()(RawBytes& block);
private:
void process(thrust::device_vector<RawVoltageType> const& digitiser_raw,
thrust::device_vector<PackedChannelisedVoltageType>& packed_channelised);
private:
std::size_t _buffer_bytes;
std::size_t _fft_length;
std::size_t _nbits;
DadaWriteClient& _client;
cufftHandle _fft_plan;
int _nchans;
int _call_count;
std::unique_ptr<Unpacker> _unpacker;
std::unique_ptr<ScaledTransposeTFtoTFT> _transposer;
DoubleDeviceBuffer<RawVoltageType> _raw_voltage_db;
DoubleDeviceBuffer<PackedChannelisedVoltageType> _packed_channelised_voltage;
thrust::device_vector<UnpackedVoltageType> _unpacked_voltage;
thrust::device_vector<ChannelisedVoltageType> _channelised_voltage;
cudaStream_t _h2d_stream;
cudaStream_t _proc_stream;
cudaStream_t _d2h_stream;
};
} //edd
} //effelsberg
} //psrdada_cpp
#endif //PSRDADA_CPP_EFFELSBERG_EDD_CHANNELISER_HPP
......@@ -27,73 +27,81 @@ namespace edd {
typedef unsigned long long int uint64_cu;
static_assert(sizeof(uint64_cu) == sizeof(uint64_t), "Long long int not of 64 bit! This is problematic for CUDA!");
typedef uint64_t RawVoltageType;
typedef float UnpackedVoltageType;
typedef float2 ChannelisedVoltageType;
typedef uint64_t RawVoltageType;
typedef float UnpackedVoltageType;
typedef float2 ChannelisedVoltageType;
typedef float IntegratedPowerType;
//typedef int8_t IntegratedPowerType;
typedef float IntegratedPowerType;
//typedef int8_t IntegratedPowerType;
// Input data and intermediate processing data for one polarization
struct PolarizationData
/// Input data and intermediate processing data for one polarization
struct PolarizationData
{
/// Raw ADC Voltage
DoubleDeviceBuffer<RawVoltageType> _raw_voltage;
/// Side channel data
DoubleDeviceBuffer<uint64_t> _sideChannelData;
/// Baseline in gate 0 state
thrust::device_vector<UnpackedVoltageType> _baseLineG0;
/// Baseline in gate 1 state
thrust::device_vector<UnpackedVoltageType> _baseLineG1;
/// Channelized voltage in gate 0 state
thrust::device_vector<ChannelisedVoltageType> _channelised_voltage_G0;
/// Channelized voltage in gate 1 state
thrust::device_vector<ChannelisedVoltageType> _channelised_voltage_G1;
/// Swaps input buffers
void swap()
{
DoubleDeviceBuffer<RawVoltageType> _raw_voltage;
DoubleDeviceBuffer<uint64_t> _sideChannelData;
_raw_voltage.swap();
_sideChannelData.swap();
}
};
thrust::device_vector<UnpackedVoltageType> _baseLineG0;
thrust::device_vector<UnpackedVoltageType> _baseLineG1;
thrust::device_vector<ChannelisedVoltageType> _channelised_voltage_G0;
thrust::device_vector<ChannelisedVoltageType> _channelised_voltage_G1;
void swap()
{
_raw_voltage.swap();
_sideChannelData.swap();
}
};
// Output data for one gate
struct StokesOutput
{
/// Stokes parameters
DoubleDeviceBuffer<IntegratedPowerType> I;
DoubleDeviceBuffer<IntegratedPowerType> Q;
DoubleDeviceBuffer<IntegratedPowerType> U;
DoubleDeviceBuffer<IntegratedPowerType> V;
/// Number of samples integrated in this output block
DoubleDeviceBuffer<uint64_cu> _noOfBitSets;
// Outptu data for one gate
class StokesOutput
/// Reset outptu for new integration
void reset(cudaStream_t &_proc_stream)
{
public:
DoubleDeviceBuffer<IntegratedPowerType> I;
DoubleDeviceBuffer<IntegratedPowerType> Q;
DoubleDeviceBuffer<IntegratedPowerType> U;
DoubleDeviceBuffer<IntegratedPowerType> V;
DoubleDeviceBuffer<uint64_cu> _noOfBitSets;
void reset(cudaStream_t &_proc_stream)
{
thrust::fill(thrust::cuda::par.on(_proc_stream),I.a().begin(), I.a().end(), 0.);
thrust::fill(thrust::cuda::par.on(_proc_stream),Q.a().begin(), Q.a().end(), 0.);
thrust::fill(thrust::cuda::par.on(_proc_stream),U.a().begin(), U.a().end(), 0.);
thrust::fill(thrust::cuda::par.on(_proc_stream),V.a().begin(), V.a().end(), 0.);
thrust::fill(thrust::cuda::par.on(_proc_stream), _noOfBitSets.a().begin(), _noOfBitSets.a().end(), 0L);
}
void swap()
{
I.swap();
Q.swap();
U.swap();
V.swap();
_noOfBitSets.swap();
}
void resize(size_t size, size_t blocks)
{
I.resize(size * blocks);
Q.resize(size * blocks);
U.resize(size * blocks);
V.resize(size * blocks);
_noOfBitSets.resize(blocks);
}
};
thrust::fill(thrust::cuda::par.on(_proc_stream),I.a().begin(), I.a().end(), 0.);
thrust::fill(thrust::cuda::par.on(_proc_stream),Q.a().begin(), Q.a().end(), 0.);
thrust::fill(thrust::cuda::par.on(_proc_stream),U.a().begin(), U.a().end(), 0.);
thrust::fill(thrust::cuda::par.on(_proc_stream),V.a().begin(), V.a().end(), 0.);
thrust::fill(thrust::cuda::par.on(_proc_stream), _noOfBitSets.a().begin(), _noOfBitSets.a().end(), 0L);
}
/// Swap output buffers
void swap()
{
I.swap();
Q.swap();
U.swap();
V.swap();
_noOfBitSets.swap();
}
/// Resize all buffers
void resize(size_t size, size_t blocks)
{
I.resize(size * blocks);
Q.resize(size * blocks);
U.resize(size * blocks);
V.resize(size * blocks);
_noOfBitSets.resize(blocks);
}
};
......@@ -291,23 +299,6 @@ __global__ void stokes_accumulate(float2 const __restrict__ *pol1,
}
/**
* @brief calculate stokes IQUV from two complex valuies for each polarization
*/
__device__ stokes_IQUV(const float2 &p1, const float2 &p2, float &I, float &Q, float &U, float &V)
{
// I = fabs(p1.x*p1.x + p1.y * p1.y) + fabs(p2.x*p2.x + p2.y * p2.y);
// Q = fabs(p1.x*p1.x + p1.y * p1.y) - fabs(p2.x*p2.x + p2.y * p2.y);
// U = p1.x*p2.x + p1.y * p2.y;
// V = p1.y*p2.x - p1.x * p2.y;
}
// __global__ void stokes_accumulate(float2 const __restrict__ *pol1, float2 const __restrict__ *pol2, float ... output int nchans, int naccumulate)
} // edd
......
......@@ -52,7 +52,7 @@ FftSpectrometer<HandlerType>::FftSpectrometer(
_raw_voltage_db.resize(n64bit_words);
BOOST_LOG_TRIVIAL(debug) << "Input voltages size (in 64-bit words): " << _raw_voltage_db.size();
//_unpacked_voltage.resize(nsamps_per_buffer);
_unpacked_voltage.resize(_nchans * batch * 2);
_unpacked_voltage.resize(nsamps_per_buffer);
//BOOST_LOG_TRIVIAL(debug) << "Unpacked voltages size (in samples): " << _unpacked_voltage.size();
//_channelised_voltage.resize(_nchans * batch);
//BOOST_LOG_TRIVIAL(debug) << "Channelised voltages size: " << _channelised_voltage.size();
......@@ -161,19 +161,19 @@ bool FftSpectrometer<HandlerType>::operator()(RawBytes& block)
_power_db.size() * sizeof(IntegratedPowerType),
cudaMemcpyDeviceToHost,
_d2h_stream));
if (_call_count == 3)
{
return false;
}
}
//Wrap _detected_host_previous in a RawBytes object here;
RawBytes bytes(reinterpret_cast<char*>(_host_power_db.b_ptr()),
_host_power_db.size() * sizeof(IntegratedPowerType),
_host_power_db.size() * sizeof(IntegratedPowerType));
BOOST_LOG_TRIVIAL(debug) << "Calling handler";
// The handler can't do anything asynchronously without a copy here
// as it would be unsafe (given that it does not own the memory it
// The handler can't do anything asynchronously without a copy here
// as it would be unsafe (given that it does not own the memory it
// is being passed).
return _handler(bytes);
}
......
......@@ -86,26 +86,30 @@ __global__ void gating(float* __restrict__ G0,
// Reduce G0, G1
sum_reduce<uint32_t>(x, _G0stats);
if(threadIdx.x == 0)
if(threadIdx.x == 0) {
atomicAdd(stats_G0, (uint64_cu) x[threadIdx.x]);
}
__syncthreads();
sum_reduce<uint32_t>(x, _G1stats);
if(threadIdx.x == 0)
if(threadIdx.x == 0) {
atomicAdd(stats_G1, (uint64_cu) x[threadIdx.x]);
}
__syncthreads();
//reuse shared array
float *y = (float*) x;
//update the baseline array
sum_reduce<float>(y, baselineUpdateG0);
if(threadIdx.x == 0)
if(threadIdx.x == 0) {
atomicAdd(baseLineNG0, y[threadIdx.x]);
}
__syncthreads();
sum_reduce<float>(y, baselineUpdateG1);
if(threadIdx.x == 0)
if(threadIdx.x == 0) {
atomicAdd(baseLineNG1, y[threadIdx.x]);
}
__syncthreads();
}
......@@ -289,8 +293,9 @@ void GatedSpectrometer<HandlerType>::gated_fft(
uint64_t NG0 = 0;
uint64_t NG1 = 0;
// Loop over outputblocks, for case of multiple output blocks per input block
// Loop over outputblocks, for case of multiple output blocks per input block
int step = data._sideChannelData.b().size() / _noOfBitSetsIn_G0.size();
for (size_t i = 0; i < _noOfBitSetsIn_G0.size(); i++)
{ // ToDo: Should be in one kernel call
gating<<<1024, 1024, 0, _proc_stream>>>(
......
#include "psrdada_cpp/effelsberg/edd/Channeliser.cuh"
#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/cuda_utils.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
#include "thrust/functional.h"
#include "thrust/transform.h"
#include <cuda.h>
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
Channeliser::Channeliser(
std::size_t buffer_bytes,
std::size_t fft_length,
std::size_t nbits,
float input_level,
float output_level,
DadaWriteClient& client)
: _buffer_bytes(buffer_bytes)
, _fft_length(fft_length)
, _nbits(nbits)
, _client(client)
, _fft_plan(0)
, _call_count(0)
{
assert(((_nbits == 12) || (_nbits == 8)));
BOOST_LOG_TRIVIAL(debug)
<< "Creating new Channeliser instance with parameters: \n"
<< "fft_length = " << _fft_length << "\n"
<< "nbits = " << _nbits;
std::size_t nsamps_per_buffer = buffer_bytes * 8 / nbits;
assert(nsamps_per_buffer % _fft_length == 0 /*Number of samples is not multiple of FFT size*/);
std::size_t n64bit_words = buffer_bytes / sizeof(uint64_t);
_nchans = _fft_length / 2 + 1;
int batch = nsamps_per_buffer/_fft_length;
std::size_t packed_channelised_voltage_bytes = _nchans * batch * sizeof(PackedChannelisedVoltageType);
BOOST_LOG_TRIVIAL(debug) << "Output buffer bytes: " << packed_channelised_voltage_bytes;
assert(_client.data_buffer_size() == packed_channelised_voltage_bytes /* Incorrect output DADA buffer size */);
BOOST_LOG_TRIVIAL(debug) << "Calculating scales and offsets";
float scale = std::sqrt(_nchans) * input_level;
BOOST_LOG_TRIVIAL(debug) << "Correction factors for 8-bit conversion: scaling = " << scale;
BOOST_LOG_TRIVIAL(debug) << "Generating FFT plan";
int n[] = {static_cast<int>(_fft_length)};
//Not we put this into transposed output order, so the inner dimension will be time.
CUFFT_ERROR_CHECK(cufftPlanMany(&_fft_plan, 1, n, NULL, 1, _fft_length,
NULL, 1, _nchans, CUFFT_R2C, batch));
cufftSetStream(_fft_plan, _proc_stream);
BOOST_LOG_TRIVIAL(debug) << "Allocating memory";
_raw_voltage_db.resize(n64bit_words);
BOOST_LOG_TRIVIAL(debug) << "Input voltages size (in 64-bit words): " << _raw_voltage_db.size();
_unpacked_voltage.resize(nsamps_per_buffer);
BOOST_LOG_TRIVIAL(debug) << "Unpacked voltages size (in samples): " << _unpacked_voltage.size();
_channelised_voltage.resize(_nchans * batch);
BOOST_LOG_TRIVIAL(debug) << "Channelised voltages size: " << _channelised_voltage.size();
_packed_channelised_voltage.resize(_channelised_voltage.size());
BOOST_LOG_TRIVIAL(debug) << "Packed channelised voltages size: " << _packed_channelised_voltage.size();
CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream));
CUDA_ERROR_CHECK(cudaStreamCreate(&_proc_stream));
CUDA_ERROR_CHECK(cudaStreamCreate(&_d2h_stream));
CUFFT_ERROR_CHECK(cufftSetStream(_fft_plan, _proc_stream));
_unpacker.reset(new Unpacker(_proc_stream));
_transposer.reset(new ScaledTransposeTFtoTFT(_nchans, 8192, scale, 0.0, _proc_stream));
}
Channeliser::~Channeliser()
{
BOOST_LOG_TRIVIAL(debug) << "Destroying Channeliser";
if (!_fft_plan)
cufftDestroy(_fft_plan);
cudaStreamDestroy(_h2d_stream);
cudaStreamDestroy(_proc_stream);
cudaStreamDestroy(_d2h_stream);
}
void Channeliser::init(RawBytes& block)
{
BOOST_LOG_TRIVIAL(debug) << "Channeliser init called";
auto& header_block = _client.header_stream().next();
/* Populate new header */
std::memcpy(header_block.ptr(), block.ptr(), block.total_bytes());
header_block.used_bytes(header_block.total_bytes());
_client.header_stream().release();
}
void Channeliser::process(
thrust::device_vector<RawVoltageType> const& digitiser_raw,
thrust::device_vector<PackedChannelisedVoltageType>& packed_channelised)
{
BOOST_LOG_TRIVIAL(debug) << "Unpacking raw voltages";
switch (_nbits)
{
case 8: _unpacker->unpack<8>(digitiser_raw, _unpacked_voltage); break;
case 12: _unpacker->unpack<12>(digitiser_raw, _unpacked_voltage); break;
default: throw std::runtime_error("Unsupported number of bits");
}
BOOST_LOG_TRIVIAL(debug) << "Performing FFT";
UnpackedVoltageType* _unpacked_voltage_ptr = thrust::raw_pointer_cast(_unpacked_voltage.data());
ChannelisedVoltageType* _channelised_voltage_ptr = thrust::raw_pointer_cast(_channelised_voltage.data());
CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan,
(cufftReal*) _unpacked_voltage_ptr,
(cufftComplex*) _channelised_voltage_ptr));
CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
_transposer->transpose(_channelised_voltage, packed_channelised);
}
bool Channeliser::operator()(RawBytes& block)
{
++_call_count;
BOOST_LOG_TRIVIAL(debug) << "Channeliser operator() called (count = " << _call_count << ")";
assert(block.used_bytes() == _buffer_bytes /* Unexpected buffer size */);
CUDA_ERROR_CHECK(cudaStreamSynchronize(_h2d_stream));
_raw_voltage_db.swap();
CUDA_ERROR_CHECK(cudaMemcpyAsync(static_cast<void*>(_raw_voltage_db.a_ptr()),
static_cast<void*>(block.ptr()), block.used_bytes(),
cudaMemcpyHostToDevice, _h2d_stream));
if (_call_count == 1)
{
return false;
}
// Synchronize all streams
CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
_packed_channelised_voltage.swap();
process(_raw_voltage_db.b(), _packed_channelised_voltage.a());
if (_call_count == 2)
{
return false;
}
CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream));
if (_call_count > 3)
{
_client.data_stream().release();
}
auto& data_block = _client.data_stream().next();
CUDA_ERROR_CHECK(cudaMemcpyAsync(
static_cast<void*>(data_block.ptr()),
static_cast<void*>(_packed_channelised_voltage.b_ptr()),
_packed_channelised_voltage.size() * sizeof(PackedChannelisedVoltageType),
cudaMemcpyDeviceToHost,
_d2h_stream));
data_block.used_bytes(data_block.total_bytes());
return false;
}
} //edd
} //effelsberg
} //psrdada_cpp
......@@ -4,7 +4,6 @@ link_directories(${GTEST_LIBRARY_DIR})
set(gtest_edd_src
gtest_edd_src.cu
src/ChanneliserTester.cu
src/DetectorAccumulatorTester.cu
src/FftSpectrometerTester.cu
src/GatedSpectrometerTest.cu
......
#include "psrdada_cpp/effelsberg/edd/test/ChanneliserTester.cuh"
#include "psrdada_cpp/effelsberg/edd/Channeliser.cuh"
#include "psrdada_cpp/dada_null_sink.hpp"
#include "psrdada_cpp/dada_db.hpp"
#include "psrdada_cpp/dada_write_client.hpp"
#include "psrdada_cpp/dada_input_stream.hpp"
#include "psrdada_cpp/double_host_buffer.cuh"
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/cuda_utils.hpp"
#include <vector>
#include <thread>
#include <chrono>
#include <exception>
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
namespace test {
ChanneliserTester::ChanneliserTester()
: ::testing::Test()
{
}
ChanneliserTester::~ChanneliserTester()
{
}
void ChanneliserTester::SetUp()
{
}
void ChanneliserTester::TearDown()
{
}
void ChanneliserTester::performance_test(std::size_t fft_length, std::size_t nbits)
{
std::size_t input_block_bytes = fft_length * 8192 * 1024 * nbits / 8;
std::size_t output_block_bytes = (fft_length/2 + 1) * 8192 * 1024 * sizeof(Channeliser::PackedChannelisedVoltageType);
DadaDB output_buffer(8, output_block_bytes);
output_buffer.create();
MultiLog log("test_log");
NullSink null_sink;
DadaInputStream<NullSink> consumer(output_buffer.key(), log, null_sink);
std::thread consumer_thread( [&](){
try {
consumer.start();
} catch (std::exception& e) {
BOOST_LOG_TRIVIAL(error) << e.what();
}
});
DadaWriteClient client(output_buffer.key(), log);
DoublePinnedHostBuffer<char> input_block;
input_block.resize(input_block_bytes);
RawBytes input_raw_bytes(input_block.a_ptr(), input_block_bytes, input_block_bytes);
std::vector<char> header_block(4096);
RawBytes header_raw_bytes(header_block.data(), 4096, 4096);
Channeliser channeliser(input_block_bytes, fft_length, nbits, 16.0f, 16.0f, client);
channeliser.init(header_raw_bytes);
for (int ii = 0; ii < 100; ++ii)
{
channeliser(input_raw_bytes);
}
consumer.stop();
std::this_thread::sleep_for(std::chrono::seconds(1));