Commit bdaf6a0c authored by Ewan Barr's avatar Ewan Barr
Browse files

added ScaledTranspose and started channeliser

parent a872e811
#ifndef PSRDADA_CPP_EFFELSBERG_EDD_CHANNELISER_HPP
#define PSRDADA_CPP_EFFELSBERG_EDD_CHANNELISER_HPP
#include "psrdada_cpp/effelsberg/edd/Unpacker.cuh"
#include "psrdada_cpp/effelsberg/edd/DetectorAccumulator.cuh"
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/double_device_buffer.cuh"
#include "psrdada_cpp/double_host_buffer.cuh"
#include "thrust/device_vector.h"
#include "cufft.h"
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
template <class HandlerType>
class Channeliser
{
public:
typedef uint64_t RawVoltageType;
typedef float UnpackedVoltageType;
typedef float2 ChannelisedVoltageType;
typedef char2 PackedChannelisedVoltageType;
public:
Channeliser(
std::size_t buffer_bytes,
std::size_t fft_length,
std::size_t nbits,
float input_level,
float output_level,
HandlerType& handler);
~Channeliser();
/**
* @brief A callback to be called on connection
* to a ring buffer.
*
* @detail The first available header block in the
* in the ring buffer is provided as an argument.
* It is here that header parameters could be read
* if desired.
*
* @param block A RawBytes object wrapping a DADA header buffer
*/
void init(RawBytes& block);
/**
* @brief A callback to be called on acqusition of a new
* data block.
*
* @param block A RawBytes object wrapping a DADA data buffer
*/
bool operator()(RawBytes& block);
private:
void process(thrust::device_vector<RawVoltageType> const& digitiser_raw,
thrust::device_vector<PackedChannelisedVoltageType>& packed_channelised);
private:
std::size_t _buffer_bytes;
std::size_t _fft_length;
std::size_t _nbits;
HandlerType& _handler;
cufftHandle _fft_plan;
int _nchans;
int _call_count;
std::unique_ptr<Unpacker> _unpacker;
std::unique_ptr<DetectorAccumulator> _detector;
DoubleDeviceBuffer<RawVoltageType> _raw_voltage_db;
DoubleDeviceBuffer<PackedChannelisedVoltageType> _packed_channelised_voltage;
thrust::device_vector<UnpackedVoltageType> _unpacked_voltage;
thrust::device_vector<ChannelisedVoltageType> _channelised_voltage;
DoublePinnedHostBuffer<PackedChannelisedVoltageType> _host_packed_channelised_voltage;
cudaStream_t _h2d_stream;
cudaStream_t _proc_stream;
cudaStream_t _d2h_stream;
};
} //edd
} //effelsberg
} //psrdada_cpp
#include "psrdada_cpp/effelsberg/edd/detail/Channeliser.cu"
#endif //PSRDADA_CPP_EFFELSBERG_EDD_CHANNELISER_HPP
#ifndef PSRDADA_CPP_EFFELSBERG_EDD_FftSpectrometer_HPP
#define PSRDADA_CPP_EFFELSBERG_EDD_FftSpectrometer_HPP
#ifndef PSRDADA_CPP_EFFELSBERG_EDD_FFTSPECTROMETER_HPP
#define PSRDADA_CPP_EFFELSBERG_EDD_FFTSPECTROMETER_HPP
#include "psrdada_cpp/effelsberg/edd/Unpacker.cuh"
#include "psrdada_cpp/effelsberg/edd/DetectorAccumulator.cuh"
......@@ -85,4 +85,4 @@ private:
} //psrdada_cpp
#include "psrdada_cpp/effelsberg/edd/detail/FftSpectrometer.cu"
#endif //PSRDADA_CPP_EFFELSBERG_EDD_FftSpectrometer_HPP
#endif //PSRDADA_CPP_EFFELSBERG_EDD_FFTSPECTROMETER_HPP
#ifndef PSRDADA_CPP_EFFELSBERG_EDD_SCALEDTRANSPOSETFTOTFT_CUH
#define PSRDADA_CPP_EFFELSBERG_EDD_SCALEDTRANSPOSETFTOTFT_CUH
#include "psrdada_cpp/common.hpp"
#include <thrust/device_vector.h>
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
namespace kernels {
__global__
void tf_to_tft_transpose(
float2 const* __restrict__ input,
char2* __restrict__ output,
const int nchans,
const int nsamps,
const int nsamps_per_packet,
const int nsamps_per_load,
const float scale,
const float offset);
} // namespace kernels
class ScaledTransposeTFtoTFT
{
public:
typedef thrust::device_vector<float2> InputType;
typedef thrust::device_vector<char2> OutputType;
public:
ScaledTransposeTFtoTFT(int nchans, int nsamps_per_packet, float scale, float offset, cudaStream_t stream);
~ScaledTransposeTFtoTFT();
ScaledTransposeTFtoTFT(ScaledTransposeTFtoTFT const&) = delete;
void transpose(InputType const& input, OutputType& output);
private:
int _nchans;
int _nsamps_per_packet;
float _scale;
float _offset;
cudaStream_t _stream;
};
} //namespace edd
} //namespace effelsberg
} //namespace psrdada_cpp
#endif // PSRDADA_CPP_EFFELSBERG_EDD_SCALEDTRANSPOSETFTOTFT_CUH
#include "psrdada_cpp/effelsberg/edd/Channeliser.cuh"
#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/cuda_utils.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
#include "thrust/functional.h"
#include "thrust/transform.h"
#include <cuda.h>
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
template <class HandlerType>
Channeliser<HandlerType>::Channeliser(
std::size_t buffer_bytes,
std::size_t fft_length,
std::size_t nbits,
float input_level,
float output_level,
HandlerType& handler)
: _buffer_bytes(buffer_bytes)
, _fft_length(fft_length)
, _nbits(nbits)
, _handler(handler)
, _fft_plan(0)
, _call_count(0)
{
assert(((_nbits == 12) || (_nbits == 8)));
BOOST_LOG_TRIVIAL(debug)
<< "Creating new Channeliser instance with parameters: \n"
<< "fft_length = " << _fft_length << "\n"
<< "naccumulate = " << _naccumulate;
std::size_t nsamps_per_buffer = buffer_bytes * 8 / nbits;
assert(nsamps_per_buffer % _fft_length == 0 /*Number of samples is not multiple of FFT size*/);
std::size_t n64bit_words = buffer_bytes / sizeof(uint64_t);
_nchans = _fft_length / 2 + 1;
int batch = nsamps_per_buffer/_fft_length;
BOOST_LOG_TRIVIAL(debug) << "Calculating scales and offsets";
float scale = std::sqrt(_nchans) * input_level;
BOOST_LOG_TRIVIAL(debug) << "Correction factors for 8-bit conversion: offset = " << offset << ", scaling = " << scaling;
BOOST_LOG_TRIVIAL(debug) << "Generating FFT plan";
int n[] = {static_cast<int>(_fft_length)};
//Not we put this into transposed output order, so the inner dimension will be time.
CUFFT_ERROR_CHECK(cufftPlanMany(&_fft_plan, 1, n, NULL, 1, _fft_length,
NULL, 1, _nchans, CUFFT_R2C, batch));
cufftSetStream(_fft_plan, _proc_stream);
BOOST_LOG_TRIVIAL(debug) << "Allocating memory";
_raw_voltage_db.resize(n64bit_words);
BOOST_LOG_TRIVIAL(debug) << "Input voltages size (in 64-bit words): " << _raw_voltage_db.size();
_unpacked_voltage.resize(nsamps_per_buffer);
BOOST_LOG_TRIVIAL(debug) << "Unpacked voltages size (in samples): " << _unpacked_voltage.size();
_channelised_voltage.resize(_nchans * batch);
BOOST_LOG_TRIVIAL(debug) << "Channelised voltages size: " << _channelised_voltage.size();
_packed_channelised_voltage.resize(_channelised_voltage.size());
BOOST_LOG_TRIVIAL(debug) << "Packed channelised voltages size: " << _packed_channelised_voltage.size();
_host_packed_channelised_voltage.resize(_packed_channelised_voltage.size());
CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream));
CUDA_ERROR_CHECK(cudaStreamCreate(&_proc_stream));
CUDA_ERROR_CHECK(cudaStreamCreate(&_d2h_stream));
CUFFT_ERROR_CHECK(cufftSetStream(_fft_plan, _proc_stream));
_unpacker.reset(new Unpacker(_proc_stream));
}
template <class HandlerType>
Channeliser<HandlerType>::~Channeliser()
{
BOOST_LOG_TRIVIAL(debug) << "Destroying Channeliser";
if (!_fft_plan)
cufftDestroy(_fft_plan);
cudaStreamDestroy(_h2d_stream);
cudaStreamDestroy(_proc_stream);
cudaStreamDestroy(_d2h_stream);
}
template <class HandlerType>
void Channeliser<HandlerType>::init(RawBytes& block)
{
BOOST_LOG_TRIVIAL(debug) << "Channeliser init called";
_handler.init(block);
}
template <class HandlerType>
void Channeliser<HandlerType>::process(
thrust::device_vector<RawVoltageType> const& digitiser_raw,
thrust::device_vector<PackedChannelisedVoltageType>& packed_channelised))
{
BOOST_LOG_TRIVIAL(debug) << "Unpacking raw voltages";
switch (_nbits)
{
case 8: _unpacker->unpack<8>(digitiser_raw, _unpacked_voltage); break;
case 12: _unpacker->unpack<12>(digitiser_raw, _unpacked_voltage); break;
default: throw std::runtime_error("Unsupported number of bits");
}
BOOST_LOG_TRIVIAL(debug) << "Performing FFT";
UnpackedVoltageType* _unpacked_voltage_ptr = thrust::raw_pointer_cast(_unpacked_voltage.data());
ChannelisedVoltageType* _channelised_voltage_ptr = thrust::raw_pointer_cast(_channelised_voltage.data());
CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan,
(cufftReal*) _unpacked_voltage_ptr,
(cufftComplex*) _channelised_voltage_ptr));
CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
}
template <class HandlerType>
bool Channeliser<HandlerType>::operator()(RawBytes& block)
{
++_call_count;
BOOST_LOG_TRIVIAL(debug) << "Channeliser operator() called (count = " << _call_count << ")";
assert(block.used_bytes() == _buffer_bytes /* Unexpected buffer size */);
CUDA_ERROR_CHECK(cudaStreamSynchronize(_h2d_stream));
_raw_voltage_db.swap();
CUDA_ERROR_CHECK(cudaMemcpyAsync(static_cast<void*>(_raw_voltage_db.a_ptr()),
static_cast<void*>(block.ptr()), block.used_bytes(),
cudaMemcpyHostToDevice, _h2d_stream));
if (_call_count == 1)
{
return false;
}
// Synchronize all streams
CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
_power_db.swap();
process(_raw_voltage_db.b(), _power_db.a());
if (_call_count == 2)
{
return false;
}
CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream));
_host_power_db.swap();
CUDA_ERROR_CHECK(cudaMemcpyAsync(
static_cast<void*>(_host_power_db.a_ptr()),
static_cast<void*>(_power_db.b_ptr()),
_power_db.size() * sizeof(IntegratedPowerType),
cudaMemcpyDeviceToHost,
_d2h_stream));
if (_call_count == 3)
{
return false;
}
//Wrap _detected_host_previous in a RawBytes object here;
RawBytes bytes(reinterpret_cast<char*>(_host_power_db.b_ptr()),
_host_power_db.size() * sizeof(IntegratedPowerType),
_host_power_db.size() * sizeof(IntegratedPowerType));
BOOST_LOG_TRIVIAL(debug) << "Calling handler";
// The handler can't do anything asynchronously without a copy here
// as it would be unsafe (given that it does not own the memory it
// is being passed).
return _handler(bytes);
}
} //edd
} //effelsberg
} //psrdada_cpp
......@@ -45,7 +45,7 @@ FftSpectrometer<HandlerType>::FftSpectrometer(
BOOST_LOG_TRIVIAL(debug) << "Generating FFT plan";
int n[] = {static_cast<int>(_fft_length)};
CUFFT_ERROR_CHECK(cufftPlanMany(&_fft_plan, 1, n, NULL, 1, _fft_length,
NULL, 1, _fft_length/2 + 1, CUFFT_R2C, batch));
NULL, 1, _nchans, CUFFT_R2C, batch));
cufftSetStream(_fft_plan, _proc_stream);
BOOST_LOG_TRIVIAL(debug) << "Allocating memory";
_raw_voltage_db.resize(n64bit_words);
......
#include "psrdada_cpp/effelsberg/edd/ScaledTransposeTFtoTFT.cuh"
#include "psrdada_cpp/cuda_utils.hpp"
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
namespace kernels {
__global__
void tf_to_tft_transpose(
float2 const* __restrict__ input,
char2* __restrict__ output,
const int nchans,
const int nsamps,
const int nsamps_per_packet,
const int nsamps_per_load,
const float scale,
const float offset)
{
extern __shared__ char2* temp; //nbytes = sizeof(char2) * nsamps_per_load * nchans;
const int load_offset = nsamps_per_packet * blockIdx.x * nchans;
for (int sub_samp_load_idx = 0;
sub_samp_load_idx < nsamps_per_packet/nsamps_per_load;
++sub_samp_load_idx)
{
for (int samp_load_idx = threadIdx.y;
samp_load_idx < nsamps_per_load;
samp_load_idx += blockDim.y)
{
for (int chan_load_idx = threadIdx.x;
chan_load_idx < nchans;
chan_load_idx += blockDim.x)
{
int load_idx = (load_offset + (sub_samp_load_idx * nsamps_per_load
+ samp_load_idx) * nchans + chan_load_idx);
float2 val = input[load_idx];
char2 store_val;
store_val.x = (char)((val.x - offset)/scale);
store_val.y = (char)((val.y - offset)/scale);
temp[samp_load_idx * nsamps_per_load + chan_load_idx] = store_val;
}
}
__syncthreads();
int store_offset = load_offset + nsamps_per_load * sub_samp_load_idx;
for (int chan_store_idx = threadIdx.y;
chan_store_idx < nchans;
chan_store_idx += blockDim.y)
{
for (int samp_store_idx = threadIdx.x;
samp_store_idx < nsamps_per_load;
samp_store_idx += blockDix.x)
{
int store_idx = (load_offset + chan_store_idx * nsamps_per_packet
+ samps_per_load * sub_samp_load_idx + samp_store_idx);
output[store_idx] = temp[samp_store_idx * nsamps_per_load + chan_store_idx];
}
}
__syncthreads();
}
}
} //namespace kernels
ScaledTransposeTFtoTFT::ScaledTransposeTFtoTFT(
int nchans, int nsamps_per_packet,
float scale, float offset, cudaStream_t stream)
: _nchans(nchans)
, _nsamps_per_packet(nsamps_per_packet)
, _scale(scale)
, _offset(offset)
, _stream(stream)
{
}
ScaledTransposeTFtoTFT::~ScaledTransposeTFtoTFT()
{
}
void ScaledTransposeTFtoTFT::transpose(InputType const& input, OutputType& output)
{
const int max_threads = 1024;
const int dim_x = std::min(_nchans, max_threads);
const int dim_y = max_threads/dim_x;
//assert sizes
assert(input.size() % (_nchans * _nsamps_per_packet) == 0 /* Input is not a multiple of _nchans * _nsamps_per_packet*/);
output.resize(input.size());
const int nsamps_per_load = 16;
assert(_nsamps_per_packet % nsamps_per_load) == 0;
const int nsamps = input.size() / _nchans;
int shared_mem_bytes = sizeof(OutputType::value_type) * _nchans * nsamps_per_load;
int nblocks = nsamps / _nsamps_per_packet;
dim3 grid(nblocks);
dim3 block(dim_x, dim_y);
InputType::value_type const* input_ptr = thrust::raw_pointer_cast(input.data());
OutputType::value_type* output_ptr = thrust::raw_pointer_cast(output.data());
kernels::tf_to_tft_transpose<<<grid, block, shared_mem_bytes, _stream>>>(
input_ptr, output_ptr, _nchans, nsamps, _nsamps_per_packet, nsamps_per_load, _scale, _offset);
CUDA_ERROR_CHECK(cudaStreamSynchronize(_stream));
}
} //namespace edd
} //namespace effelsberg
} //namespace psrdada_cpp
......@@ -4,9 +4,10 @@ link_directories(${GTEST_LIBRARY_DIR})
set(
gtest_edd_src
src/UnpackerTester.cu
src/DetectorAccumulatorTester.cu
src/FftSpectrometerTester.cu
src/UnpackerTester.cu
src/ScaledTransposeTFtoTFT.cu
)
cuda_add_executable(gtest_edd ${gtest_edd_src} )
target_link_libraries(gtest_edd ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES} ${CUDA_CUFFT_LIBRARIES})
......
#ifndef PSRDADA_CPP_EFFELSBERG_EDD_SCALEDTRANSPOSETFTOTFTTESTER_CUH
#define PSRDADA_CPP_EFFELSBERG_EDD_SCALEDTRANSPOSETFTOTFTTESTER_CUH
#include "psrdada_cpp/effelsberg/edd/ScaledTransposeTFtoTFT.cuh"
#include "thrust/host_vector.h"
#include <gtest/gtest.h>
#include <vector>
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
namespace test {
class ScaledTransposeTFtoTFTTester: public ::testing::Test
{
public:
typedef thrust::host_vector<float2> InputType;
typedef thrust::host_vector<char2> OutputType;
protected:
void SetUp() override;
void TearDown() override;
public:
ScaledTransposeTFtoTFTTester();
~ScaledTransposeTFtoTFTTester();
protected:
void transpose_c_reference(
InputType const& input,
OutputType& output,
const int nchans,
const int nsamps,
const int nsamps_per_packet,
const float scale,
const float offset);
void compare_against_host(
ScaledTransposeTFtoTFT::OutputType const& gpu_output,
OutputType const& host_output);
protected:
cudaStream_t _stream;
};
} //namespace test
} //namespace edd
} //namespace meerkat
} //namespace psrdada_cpp
#endif //PSRDADA_CPP_EFFELSBERG_EDD_SCALEDTRANSPOSETFTOTFTTESTER_CUH
#include "psrdada_cpp/effelsberg/edd/test/ScaledTransposeTFtoTFTTester.cuh"
#include "psrdada_cpp/cuda_utils.hpp"
#include <random>
#include <cmath>
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
namespace test {
ScaledTransposeTFtoTFTTester::ScaledTransposeTFtoTFTTester()
: ::testing::Test()
, _stream(0)
{
}
ScaledTransposeTFtoTFTTester::~ScaledTransposeTFtoTFTTester()
{
}
void ScaledTransposeTFtoTFTTester::SetUp()
{
CUDA_ERROR_CHECK(cudaStreamCreate(&_stream));
}
void ScaledTransposeTFtoTFTTester::TearDown()
{
CUDA_ERROR_CHECK(cudaStreamDestroy(_stream));
}
void ScaledTransposeTFtoTFTTester::transpose_c_reference(
InputType const& input,
OutputType& output,
const int nchans,
const int nsamps,
const int nsamps_per_packet,
const float scale,
const float offset);
{
int nsamples = input.size() / nchans;
int outer_t_dim = nsamps / nsamps_per_packet;
output.size(input.size());
for (int outer_t_idx = 0; outer_t_idx < outer_t_dim; ++outer_t_idx)
{
for (int chan_idx = 0; chan_idx < nchans; ++chan_idx)
{
for (int inner_t_idx = 0; inner_t_idx < nsamps_per_packet; ++inner_t_idx)
{
int load_idx = outer_t_idx * nchans * nsamps_per_packet + inner_t_idx * nchans + chan_idx;
float2 val = input[load_idx];
char2 out_val;
out_val.x = (char)((val.x - offset)/scale);
out_val.y = (char)((val.y - offset)/scale);
int store_idx = outer_t_idx * nchans * nsamps_per_packet + chan_idx * nsamps_per_packet + inner_t_idx;
output[store_idx] = out_val;
}
}
}
}
void ScaledTransposeTFtoTFTTester::compare_against_host(
ScaledTransposeTFtoTFT::OutputType const& gpu_output,
OutputType const& host_output)
{
OutputType copy_from_gpu = gpu_output;
ASSERT_EQ(host_output.size(), copy_from_gpu.size());
for (std::size_t ii = 0; ii < host_output.size(); ++ii)
{
ASSERT_EQ(host_output[ii], copy_from_gpu[ii]);
}
}
TEST_F(ScaledTransposeTFtoTFTTester, counter_test)
{
int nchans = 16;
int nsamps_per_packet = 8192/nchans;
float stdev = 64.0f;
float scale = 4.0f;
int nsamps = nsamps_per_packet * 1024
int n = nchans * nsamps;
std::default_random_engine generator;
std::normal_distribution<float> distribution(0.0, stdev);
InputType host_input(n);
for (int ii = 0; ii < n; ++ii)
{
host_input[ii].x = distribution(generator);
host_input[ii].y = distribution(generator);