Commit 10e8ffb7 authored by Tobias Winchen's avatar Tobias Winchen
Browse files

Added GatedSpectrometer implementation

parent 7f44fd00
......@@ -18,5 +18,10 @@ cuda_add_executable(fft_spectrometer src/fft_spectrometer_cli.cu)
target_link_libraries(fft_spectrometer ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES} ${CUDA_CUFFT_LIBRARIES})
install(TARGETS fft_spectrometer DESTINATION bin)
#Gated FFT spectrometer interface
cuda_add_executable(gated_spectrometer src/GatedSpectrometer_cli.cu)
target_link_libraries(gated_spectrometer ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES} ${CUDA_CUFFT_LIBRARIES})
install(TARGETS gated_spectrometer DESTINATION bin)
add_subdirectory(test)
endif(ENABLE_CUDA)
#ifndef PSRDADA_CPP_EFFELSBERG_EDD_GATEDSPECTROMETER_HPP
#define PSRDADA_CPP_EFFELSBERG_EDD_GATEDSPECTROMETER_HPP
#include "psrdada_cpp/effelsberg/edd/Unpacker.cuh"
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/cuda_utils.hpp"
#include "psrdada_cpp/double_device_buffer.cuh"
#include "psrdada_cpp/double_host_buffer.cuh"
#include "psrdada_cpp/effelsberg/edd/DetectorAccumulator.cuh"
#include "thrust/device_vector.h"
#include "cufft.h"
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
#define BIT_MASK(bit) (1L << (bit))
#define SET_BIT(value, bit) ((value) |= BIT_MASK(bit))
#define CLEAR_BIT(value, bit) ((value) &= ~BIT_MASK(bit))
#define TEST_BIT(value, bit) (((value)&BIT_MASK(bit)) ? 1 : 0)
/**
@class GatedSpectrometer
@brief Split data into two streams and create integrated spectra depending on
bit set in side channel data.
*/
template <class HandlerType> class GatedSpectrometer {
public:
typedef uint64_t RawVoltageType;
typedef float UnpackedVoltageType;
typedef float2 ChannelisedVoltageType;
typedef int8_t IntegratedPowerType;
public:
/**
* @brief Constructor
*
* @param buffer_bytes A RawBytes object wrapping a DADA header buffer
* @param nSideChannels Number of side channel items in the data stream,
* @param selectedSideChannel Side channel item used for gating
* @param selectedBit bit of side channel item used for gating
*/
GatedSpectrometer(std::size_t buffer_bytes, std::size_t nSideChannels,
std::size_t selectedSideChannel, std::size_t selectedBit,
std::size_t speadHeapSize, std::size_t fft_length,
std::size_t naccumulate, std::size_t nbits,
float input_level, float output_level,
HandlerType &handler);
~GatedSpectrometer();
/**
* @brief A callback to be called on connection
* to a ring buffer.
*
* @detail The first available header block in the
* in the ring buffer is provided as an argument.
* It is here that header parameters could be read
* if desired.
*
* @param block A RawBytes object wrapping a DADA header buffer
*/
void init(RawBytes &block);
/**
* @brief A callback to be called on acqusition of a new
* data block.
*
* @param block A RawBytes object wrapping a DADA data buffer output
* are the integrated specttra with/without bit set.
*/
bool operator()(RawBytes &block);
private:
void process(thrust::device_vector<RawVoltageType> const &digitiser_raw,
thrust::device_vector<RawVoltageType> const &sideChannelData,
thrust::device_vector<IntegratedPowerType> &detected_G0,
thrust::device_vector<IntegratedPowerType> &detected_G1);
private:
std::size_t _buffer_bytes;
std::size_t _fft_length;
std::size_t _naccumulate;
std::size_t _nbits;
std::size_t _nSideChannels;
std::size_t _selectedSideChannel;
std::size_t _selectedBit;
std::size_t _speadHeapSize;
std::size_t _sideChannelSize;
std::size_t _totalHeapSize;
std::size_t _nHeaps;
std::size_t _gapSize;
std::size_t _dataBlockBytes;
HandlerType &_handler;
cufftHandle _fft_plan;
int _nchans;
int _call_count;
std::unique_ptr<Unpacker> _unpacker;
std::unique_ptr<DetectorAccumulator> _detector;
DoubleDeviceBuffer<RawVoltageType> _raw_voltage_db;
DoubleDeviceBuffer<IntegratedPowerType> _power_db_G0;
DoubleDeviceBuffer<IntegratedPowerType> _power_db_G1;
DoubleDeviceBuffer<RawVoltageType> _sideChannelData_db;
thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G0;
thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G1;
thrust::device_vector<ChannelisedVoltageType> _channelised_voltage_G0;
thrust::device_vector<ChannelisedVoltageType> _channelised_voltage_G1;
DoublePinnedHostBuffer<IntegratedPowerType> _host_power_db;
cudaStream_t _h2d_stream;
cudaStream_t _proc_stream;
cudaStream_t _d2h_stream;
};
/// Route the data in G0 to G1 if corresponding sideChannelData bit at bitpos is
/// set to 1.
/// The data in the other stream is set to 0.
__global__ void gating(float *G0, float *G1, const int64_t *sideChannelData,
size_t N, size_t heapSize, int64_t bitpos,
int64_t noOfSideChannels, int64_t selectedSideChannel);
} // edd
} // effelsberg
} // psrdada_cpp
#include "psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu"
#endif //PSRDADA_CPP_EFFELSBERG_EDD_GATEDSPECTROMETER_HPP
#include "psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh"
#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/cuda_utils.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
#include <cuda.h>
#include <iostream>
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
__global__ void gating(float *G0, float *G1, const int64_t *sideChannelData,
size_t N, size_t heapSize, int64_t bitpos,
int64_t noOfSideChannels, int64_t selectedSideChannel) {
for (int i = blockIdx.x * blockDim.x + threadIdx.x; (i < N);
i += blockDim.x * gridDim.x) {
const float w = G0[i];
const int64_t sideChannelItem =
sideChannelData[((i / heapSize) * (noOfSideChannels)) +
selectedSideChannel]; // Probably not optimal access as
// same data is copied for several
// threads, but maybe efficiently
// handled by cache?
const int bit_set = TEST_BIT(sideChannelItem, bitpos);
G1[i] = w * bit_set;
G0[i] = w * (!bit_set);
}
}
template <class HandlerType>
GatedSpectrometer<HandlerType>::GatedSpectrometer(
std::size_t buffer_bytes, std::size_t nSideChannels,
std::size_t selectedSideChannel, std::size_t selectedBit,
std::size_t speadHeapSize, std::size_t fft_length, std::size_t naccumulate,
std::size_t nbits, float input_level, float output_level,
HandlerType &handler)
: _buffer_bytes(buffer_bytes), _nSideChannels(nSideChannels),
_selectedSideChannel(selectedSideChannel), _selectedBit(selectedBit),
_speadHeapSize(speadHeapSize), _fft_length(fft_length),
_naccumulate(naccumulate), _nbits(nbits), _handler(handler), _fft_plan(0),
_call_count(0) {
assert(((_nbits == 12) || (_nbits == 8)));
assert(_naccumulate > 0); // Sanity check
BOOST_LOG_TRIVIAL(debug)
<< "Creating new GatedSpectrometer instance with parameters: \n"
<< " fft_length = " << _fft_length << "\n"
<< " naccumulate = " << _naccumulate << "\n"
<< " nSideChannels = " << _nSideChannels << "\n"
<< " speadHeapSize = " << _speadHeapSize << " byte\n"
<< " selectedSideChannel = " << _selectedSideChannel
<< " selectedBit = " << _selectedBit;
_sideChannelSize = nSideChannels * sizeof(int64_t);
_totalHeapSize = _speadHeapSize + _sideChannelSize;
_nHeaps = buffer_bytes / _totalHeapSize;
_gapSize = (buffer_bytes - _nHeaps * _totalHeapSize);
_dataBlockBytes = _nHeaps * _speadHeapSize;
assert((nSideChannels == 0) ||
(selectedSideChannel <
nSideChannels)); // Sanity check of side channel value
assert(selectedBit < 64); // Sanity check of selected bit
BOOST_LOG_TRIVIAL(debug) << "Resulting memory configuration: \n"
<< " totalSizeOfHeap: " << _totalHeapSize
<< " byte\n"
<< " number of heaps per buffer: " << _nHeaps
<< "\n"
<< " resulting gap: " << _gapSize << " byte\n"
<< " datablock size in buffer: " << _dataBlockBytes
<< " byte\n";
std::size_t nsamps_per_buffer = _dataBlockBytes * 8 / nbits;
assert(nsamps_per_buffer % _fft_length ==
0 /*Number of samples is not multiple of FFT size*/);
std::size_t n64bit_words = _dataBlockBytes / sizeof(uint64_t);
_nchans = _fft_length / 2 + 1;
int batch = nsamps_per_buffer / _fft_length;
float dof = 2 * _naccumulate;
float scale =
std::pow(input_level * std::sqrt(static_cast<float>(_nchans)), 2);
float offset = scale * dof;
float scaling = scale * std::sqrt(2 * dof) / output_level;
BOOST_LOG_TRIVIAL(debug)
<< "Correction factors for 8-bit conversion: offset = " << offset
<< ", scaling = " << scaling;
BOOST_LOG_TRIVIAL(debug) << "Generating FFT plan";
int n[] = {static_cast<int>(_fft_length)};
CUFFT_ERROR_CHECK(cufftPlanMany(&_fft_plan, 1, n, NULL, 1, _fft_length, NULL,
1, _nchans, CUFFT_R2C, batch));
cufftSetStream(_fft_plan, _proc_stream);
BOOST_LOG_TRIVIAL(debug) << "Allocating memory";
_raw_voltage_db.resize(n64bit_words);
_sideChannelData_db.resize(_sideChannelSize * _nHeaps);
BOOST_LOG_TRIVIAL(debug) << " Input voltages size (in 64-bit words): "
<< _raw_voltage_db.size();
_unpacked_voltage_G0.resize(nsamps_per_buffer);
_unpacked_voltage_G1.resize(nsamps_per_buffer);
BOOST_LOG_TRIVIAL(debug) << " Unpacked voltages size (in samples): "
<< _unpacked_voltage_G0.size();
_channelised_voltage_G0.resize(_nchans * batch);
_channelised_voltage_G1.resize(_nchans * batch);
BOOST_LOG_TRIVIAL(debug) << " Channelised voltages size: "
<< _channelised_voltage_G0.size();
_power_db_G0.resize(_nchans * batch / _naccumulate);
_power_db_G1.resize(_nchans * batch / _naccumulate);
BOOST_LOG_TRIVIAL(debug) << " Powers size: " << _power_db_G0.size() << ", "
<< _power_db_G1.size();
// on the host both power are stored in the same data buffer
_host_power_db.resize( _power_db_G0.size() + _power_db_G1 .size());
CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream));
CUDA_ERROR_CHECK(cudaStreamCreate(&_proc_stream));
CUDA_ERROR_CHECK(cudaStreamCreate(&_d2h_stream));
CUFFT_ERROR_CHECK(cufftSetStream(_fft_plan, _proc_stream));
_unpacker.reset(new Unpacker(_proc_stream));
_detector.reset(new DetectorAccumulator(_nchans, _naccumulate, scaling,
offset, _proc_stream));
} // constructor
template <class HandlerType>
GatedSpectrometer<HandlerType>::~GatedSpectrometer() {
BOOST_LOG_TRIVIAL(debug) << "Destroying GatedSpectrometer";
if (!_fft_plan)
cufftDestroy(_fft_plan);
cudaStreamDestroy(_h2d_stream);
cudaStreamDestroy(_proc_stream);
cudaStreamDestroy(_d2h_stream);
}
template <class HandlerType>
void GatedSpectrometer<HandlerType>::init(RawBytes &block) {
BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer init called";
_handler.init(block);
}
template <class HandlerType>
void GatedSpectrometer<HandlerType>::process(
thrust::device_vector<RawVoltageType> const &digitiser_raw,
thrust::device_vector<RawVoltageType> const &sideChannelData,
thrust::device_vector<IntegratedPowerType> &detected_G0,
thrust::device_vector<IntegratedPowerType> &detected_G1) {
BOOST_LOG_TRIVIAL(debug) << "Unpacking raw voltages";
switch (_nbits) {
case 8:
_unpacker->unpack<8>(digitiser_raw, _unpacked_voltage_G0);
break;
case 12:
_unpacker->unpack<12>(digitiser_raw, _unpacked_voltage_G0);
break;
default:
throw std::runtime_error("Unsupported number of bits");
}
BOOST_LOG_TRIVIAL(debug) << "Perfore gating";
const int64_t *sideCD =
(int64_t *)(thrust::raw_pointer_cast(sideChannelData.data()));
gating<<<1024, 1024, 0, _proc_stream>>>(
thrust::raw_pointer_cast(_unpacked_voltage_G0.data()),
thrust::raw_pointer_cast(_unpacked_voltage_G1.data()), sideCD,
_unpacked_voltage_G0.size(), _speadHeapSize, _selectedBit, _nSideChannels,
_selectedSideChannel);
// CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
BOOST_LOG_TRIVIAL(debug) << "Performing FFT 1";
UnpackedVoltageType *_unpacked_voltage_ptr =
thrust::raw_pointer_cast(_unpacked_voltage_G0.data());
ChannelisedVoltageType *_channelised_voltage_ptr =
thrust::raw_pointer_cast(_channelised_voltage_G0.data());
CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr,
(cufftComplex *)_channelised_voltage_ptr));
BOOST_LOG_TRIVIAL(debug) << "Performing FFT 2";
_unpacked_voltage_ptr = thrust::raw_pointer_cast(_unpacked_voltage_G1.data());
_channelised_voltage_ptr =
thrust::raw_pointer_cast(_channelised_voltage_G1.data());
CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr,
(cufftComplex *)_channelised_voltage_ptr));
// CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
_detector->detect(_channelised_voltage_G0, detected_G0);
_detector->detect(_channelised_voltage_G1, detected_G1);
} // process
template <class HandlerType>
bool GatedSpectrometer<HandlerType>::operator()(RawBytes &block) {
++_call_count;
BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer operator() called (count = "
<< _call_count << ")";
if (block.used_bytes() != _buffer_bytes) { /* Unexpected buffer size */
BOOST_LOG_TRIVIAL(error) << "Unexpected Buffer Size - Got "
<< block.used_bytes() << " byte, expected "
<< _buffer_bytes << " byte)";
exit(-1);
}
// CUDA_ERROR_CHECK(cudaStreamSynchronize(_h2d_stream));
_raw_voltage_db.swap();
_sideChannelData_db.swap();
BOOST_LOG_TRIVIAL(debug) << " block.used_bytes() = " << block.used_bytes()
<< ", dataBlockBytes = " << _dataBlockBytes << "\n";
CUDA_ERROR_CHECK(cudaMemcpyAsync(static_cast<void *>(_raw_voltage_db.a_ptr()),
static_cast<void *>(block.ptr()),
_dataBlockBytes, cudaMemcpyHostToDevice,
_h2d_stream));
CUDA_ERROR_CHECK(cudaMemcpyAsync(
static_cast<void *>(_sideChannelData_db.a_ptr()),
static_cast<void *>(block.ptr() + _dataBlockBytes + _gapSize),
_sideChannelSize * _nHeaps, cudaMemcpyHostToDevice, _h2d_stream));
if (_call_count == 1) {
return false;
}
// Synchronize all streams
//CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
_power_db_G0.swap();
_power_db_G1.swap();
process(_raw_voltage_db.b(), _sideChannelData_db.b(), _power_db_G0.a(),
_power_db_G1.a());
if (_call_count == 2) {
return false;
}
//CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream));
_host_power_db.swap();
CUDA_ERROR_CHECK(
cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr()),
static_cast<void *>(_power_db_G0.b_ptr()),
_power_db_G0.size() * sizeof(IntegratedPowerType),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(cudaMemcpyAsync(
static_cast<void *>(_host_power_db.a_ptr() +
(_power_db_G0.size() * sizeof(IntegratedPowerType))),
static_cast<void *>(_power_db_G1.b_ptr()),
_power_db_G1.size() * sizeof(IntegratedPowerType), cudaMemcpyDeviceToHost,
_d2h_stream));
if (_call_count == 3) {
return false;
}
// Wrap _detected_host_previous in a RawBytes object here;
RawBytes bytes(reinterpret_cast<char *>(_host_power_db.b_ptr()),
_host_power_db.size() * sizeof(IntegratedPowerType),
_host_power_db.size() * sizeof(IntegratedPowerType));
BOOST_LOG_TRIVIAL(debug) << "Calling handler";
// The handler can't do anything asynchronously without a copy here
// as it would be unsafe (given that it does not own the memory it
// is being passed).
return _handler(bytes);
} // operator ()
} // edd
} // effelsberg
} // psrdada_cpp
#include "boost/program_options.hpp"
#include "psrdada_cpp/cli_utils.hpp"
#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/dada_client_base.hpp"
#include "psrdada_cpp/dada_input_stream.hpp"
#include "psrdada_cpp/dada_null_sink.hpp"
#include "psrdada_cpp/dada_output_stream.hpp"
#include "psrdada_cpp/multilog.hpp"
#include "psrdada_cpp/simple_file_writer.hpp"
#include "psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh"
#include <ctime>
#include <iostream>
#include <time.h>
using namespace psrdada_cpp;
namespace {
const size_t ERROR_IN_COMMAND_LINE = 1;
const size_t SUCCESS = 0;
const size_t ERROR_UNHANDLED_EXCEPTION = 2;
} // namespace
int main(int argc, char **argv) {
try {
key_t input_key;
int fft_length;
int naccumulate;
int nbits;
size_t nSideChannels;
size_t selectedSideChannel;
size_t selectedBit;
size_t speadHeapSize;
float input_level;
float output_level;
std::time_t now = std::time(NULL);
std::tm *ptm = std::localtime(&now);
char buffer[32];
std::strftime(buffer, 32, "%Y-%m-%d-%H:%M:%S.bp", ptm);
std::string filename(buffer);
/** Define and parse the program options
*/
namespace po = boost::program_options;
po::options_description desc("Options");
desc.add_options()("help,h", "Print help messages");
desc.add_options()(
"input_key,i",
po::value<std::string>()->default_value("dada")->notifier(
[&input_key](std::string in) { input_key = string_to_key(in); }),
"The shared memory key for the dada buffer to connect to (hex "
"string)");
desc.add_options()("fft_length,n", po::value<int>(&fft_length)->required(),
"The length of the FFT to perform on the data");
desc.add_options()("naccumulate,a",
po::value<int>(&naccumulate)->required(),
"The number of samples to integrate in each channel");
desc.add_options()("nsidechannelitems,n",
po::value<size_t>()->default_value(0)->notifier(
[&nSideChannels](size_t in) { nSideChannels = in; }),
"Number of side channel items");
desc.add_options()(
"selected_sidechannel",
po::value<size_t>()->default_value(0)->notifier(
[&selectedSideChannel](size_t in) { selectedSideChannel = in; }),
"Side channel selected for evaluation.");
desc.add_options()("selected_bit",
po::value<size_t>()->default_value(63)->notifier(
[&selectedBit](size_t in) { selectedBit = in; }),
"Side channel selected for evaluation.");
desc.add_options()("speadheap_size,s",
po::value<size_t>()->default_value(4096)->notifier(
[&speadHeapSize](size_t in) { speadHeapSize = in; }),
"size of the spead data heaps. The number of the "
"heaps in the dada block depends on the number of "
"side channel items.");
desc.add_options()("nbits,b", po::value<int>(&nbits)->required(),
"The number of bits per sample in the "
"packetiser output (8 or 12)");
desc.add_options()("input_level",
po::value<float>(&input_level)->required(),
"The input power level (standard "
"deviation, used for 8-bit conversion)");
desc.add_options()("output_level",
po::value<float>(&output_level)->required(),
"The output power level (standard "
"deviation, used for 8-bit "
"conversion)");
desc.add_options()(
"outfile,o", po::value<std::string>(&filename)->default_value(filename),
"The output file to write spectra "
"to");
desc.add_options()(
"log_level", po::value<std::string>()->default_value("info")->notifier(
[](std::string level) { set_log_level(level); }),
"The logging level to use "
"(debug, info, warning, "
"error)");
po::variables_map vm;
try {
po::store(po::parse_command_line(argc, argv, desc), vm);
if (vm.count("help")) {
std::cout << "GatedSpectrometer -- Read EDD data from a DADA buffer "
"and split the data into two streams depending on a bit "
"set in the side channel data. On each stream a simple "
"FFT spectrometer is performed."
<< std::endl
<< desc << std::endl;
return SUCCESS;
}
po::notify(vm);
} catch (po::error &e) {
std::cerr << "ERROR: " << e.what() << std::endl << std::endl;
std::cerr << desc << std::endl;
return ERROR_IN_COMMAND_LINE;
}
/**
* All the application code goes here
*/
MultiLog log("edd::GatedSpectrometer");
DadaClientBase client(input_key, log);
std::size_t buffer_bytes = client.data_buffer_size();
SimpleFileWriter sink(filename);
effelsberg::edd::GatedSpectrometer<decltype(sink)> spectrometer(
buffer_bytes, nSideChannels, selectedSideChannel, selectedBit,
speadHeapSize, fft_length, naccumulate, nbits, input_level,
output_level, sink);
DadaInputStream<decltype(spectrometer)> istream(input_key, log,
spectrometer);
istream.start();
/**
* End of application code
*/
} catch (std::exception &e) {
std::cerr << "Unhandled Exception reached the top of main: " << e.what()
<< ", application will now exit" << std::endl;
return ERROR_UNHANDLED_EXCEPTION;
}
return SUCCESS;
}
......@@ -9,6 +9,7 @@ set(
src/FftSpectrometerTester.cu
src/UnpackerTester.cu
src/ScaledTransposeTFtoTFTTester.cu
src/GatedSpectrometerTest.cu
)
cuda_add_executable(gtest_edd ${gtest_edd_src} )
target_link_libraries(gtest_edd ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES} ${CUDA_CUFFT_LIBRARIES})
......
#include "psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh"
#include "psrdada_cpp/dada_null_sink.hpp"
#include "psrdada_cpp/multilog.hpp"
#include "gtest/gtest.h"
#include "thrust/device_vector.h"
#include "thrust/extrema.h"