From 2383cb2634dfcc66f7626b0bd7865e7c5a0ecfee Mon Sep 17 00:00:00 2001
From: ewanbarr <ewan.d.barr@googlemail.com>
Date: Wed, 19 Feb 2020 08:24:02 +0100
Subject: [PATCH] Rfi chamber (#7)

* moved rfi chamber development from ewanbarr/psrdada_cpp to MPIfR-BDG/psrdada_cpp

* added global OpenMP linker flag (temporary fix)

* RS sepctrometer running with copy overlap

* added code for writing output spectrum to disk

* added code for writing output spectrum to disk

* file writer running

* added CLI for rsspectrometer

* added FFT shift on output write

* fixed missing commas

* added FFT shifting on write

* added test for dc power

* typo resulting in excess spectra being accumulated

* added missing sychrnonise on proc stream

* fixed possible bug with inplace FFT when using advanced data layout

* typo fix

* added automatic read back of output and check for DC bin power in test

* automatic DC power test runs and passing

* generalised the spectrometer test

* fixed error where first channel block was not being processed

* changed logging level for some messages

* fixes for handling correct processing of channel blocks

* added network reorder on short2 to float2 conversion

* fixed BE to LE conversion in short2 to float2 cast

* added host to network order conversion in tests

* removed unused variable

* Added some doc strings to RSSpectrometer.cuh header
---
 psrdada_cpp/detail/double_buffer.cu           |   1 +
 psrdada_cpp/effelsberg/CMakeLists.txt         |   3 +-
 .../effelsberg/rfi_chamber/CMakeLists.txt     |  26 ++
 .../effelsberg/rfi_chamber/RSSpectrometer.cuh | 104 ++++++
 .../rfi_chamber/src/RSSpectrometer.cu         | 320 ++++++++++++++++++
 .../rfi_chamber/src/rfi_chamber_cli.cu        | 116 +++++++
 .../rfi_chamber/test/CMakeLists.txt           |  12 +
 .../rfi_chamber/test/RSSpectrometerTester.cuh |  29 ++
 .../test/src/RSSpectrometerTester.cu          | 123 +++++++
 .../rfi_chamber/test/src/gtest_rfi_chamber.cu |   7 +
 10 files changed, 740 insertions(+), 1 deletion(-)
 create mode 100644 psrdada_cpp/effelsberg/rfi_chamber/CMakeLists.txt
 create mode 100644 psrdada_cpp/effelsberg/rfi_chamber/RSSpectrometer.cuh
 create mode 100644 psrdada_cpp/effelsberg/rfi_chamber/src/RSSpectrometer.cu
 create mode 100644 psrdada_cpp/effelsberg/rfi_chamber/src/rfi_chamber_cli.cu
 create mode 100644 psrdada_cpp/effelsberg/rfi_chamber/test/CMakeLists.txt
 create mode 100644 psrdada_cpp/effelsberg/rfi_chamber/test/RSSpectrometerTester.cuh
 create mode 100644 psrdada_cpp/effelsberg/rfi_chamber/test/src/RSSpectrometerTester.cu
 create mode 100644 psrdada_cpp/effelsberg/rfi_chamber/test/src/gtest_rfi_chamber.cu

diff --git a/psrdada_cpp/detail/double_buffer.cu b/psrdada_cpp/detail/double_buffer.cu
index 8324bc91..8852f0cf 100644
--- a/psrdada_cpp/detail/double_buffer.cu
+++ b/psrdada_cpp/detail/double_buffer.cu
@@ -1,4 +1,5 @@
 #include "psrdada_cpp/double_buffer.cuh"
+#include "thrust/device_vector.h"
 #include <utility>
 
 namespace psrdada_cpp {
diff --git a/psrdada_cpp/effelsberg/CMakeLists.txt b/psrdada_cpp/effelsberg/CMakeLists.txt
index 4dd63723..ddfa62be 100644
--- a/psrdada_cpp/effelsberg/CMakeLists.txt
+++ b/psrdada_cpp/effelsberg/CMakeLists.txt
@@ -1,2 +1,3 @@
 add_subdirectory(edd)
-add_subdirectory(paf)
\ No newline at end of file
+add_subdirectory(paf)
+add_subdirectory(rfi_chamber)
diff --git a/psrdada_cpp/effelsberg/rfi_chamber/CMakeLists.txt b/psrdada_cpp/effelsberg/rfi_chamber/CMakeLists.txt
new file mode 100644
index 00000000..bd3d39e9
--- /dev/null
+++ b/psrdada_cpp/effelsberg/rfi_chamber/CMakeLists.txt
@@ -0,0 +1,26 @@
+set(PSRDADA_CPP_EFFELSBERG_RFI_CHAMBER_LIBRARIES
+    ${CMAKE_PROJECT_NAME}_effelsberg_rfi_chamber
+    ${CMAKE_PROJECT_NAME}
+    ${DEPENDENCY_LIBRARIES})
+
+if(ENABLE_CUDA)
+
+   set(psrdada_cpp_effelsberg_rfi_chamber_src
+      src/RSSpectrometer.cu
+      )
+
+   cuda_add_library(${CMAKE_PROJECT_NAME}_effelsberg_rfi_chamber ${psrdada_cpp_effelsberg_rfi_chamber_src})
+   cuda_add_executable(rsspectrometer src/rfi_chamber_cli.cu)
+   target_link_libraries(rsspectrometer ${PSRDADA_CPP_EFFELSBERG_RFI_CHAMBER_LIBRARIES} ${CUDA_CUFFT_LIBRARIES})
+   install(TARGETS rsspectrometer DESTINATION bin)
+
+else()
+   set(psrdada_cpp_effelsberg_rfi_chamber_src
+      )
+
+   add_library(${CMAKE_PROJECT_NAME}_effelsberg_rfi_chamber ${psrdada_cpp_effelsberg_rfi_chamber_src})
+endif(ENABLE_CUDA)
+
+add_subdirectory(test)
+
+
diff --git a/psrdada_cpp/effelsberg/rfi_chamber/RSSpectrometer.cuh b/psrdada_cpp/effelsberg/rfi_chamber/RSSpectrometer.cuh
new file mode 100644
index 00000000..770ceec3
--- /dev/null
+++ b/psrdada_cpp/effelsberg/rfi_chamber/RSSpectrometer.cuh
@@ -0,0 +1,104 @@
+#ifndef PSRDADA_CPP_EFFELSBERG_RFI_CHAMBER_RSSPECTROMETER_CUH
+#define PSRDADA_CPP_EFFELSBERG_RFI_CHAMBER_RSSPECTROMETER_CUH
+
+#include "psrdada_cpp/raw_bytes.hpp"
+#include "psrdada_cpp/double_device_buffer.cuh"
+#include "psrdada_cpp/common.hpp"
+#include "thrust/device_vector.h"
+#include "thrust/host_vector.h"
+#include "cufft.h"
+#include <string>
+
+namespace psrdada_cpp {
+namespace effelsberg {
+namespace rfi_chamber {
+
+/**
+ * @brief      Pipeline for processing single polarisation channelised
+ *             data in TF order.
+ *
+ * @detail     Pipeline has been developed to handle the output of an FPGA
+ *             attached to a Rohde & Schwarz spectrum analyser running in
+ *             IQ sampling mode.
+ *
+ *             Data passed to the operator() method of the class is first converted
+ *             from network-order shorts to host-order single precision floats. The
+ *             floating point data is then FFT'd along the T axis of the data before
+ *             the power is detected and the data is integrated into an accumulation
+ *             buffer. After a certain number of spectra have been accumulated the
+ *             integrated spectrum will be written to disk and the object will be
+ *             destroyed.
+ */
+class RSSpectrometer
+{
+public:
+    typedef short2 InputType;
+    typedef float2 FftType;
+    typedef float OutputType;
+
+public:
+    /**
+     * @brief      Constructs a new instance.
+     *
+     * @param[in]  input_nchans  The number of input nchans
+     * @param[in]  fft_length    The length of the FFT to apply to each channel
+     * @param[in]  naccumulate   The number of detected spectra to accumulate
+     * @param[in]  nskip         The number of DADA blocks to skip before accumulating
+     *                           (to allow network settle time)
+     * @param[in]  filename      The name of the output file to write to
+     */
+    RSSpectrometer(
+        std::size_t input_nchans, std::size_t fft_length,
+        std::size_t naccumulate, std::size_t nskip,
+        std::string filename);
+    RSSpectrometer(RSSpectrometer const&) = delete;
+    ~RSSpectrometer();
+
+    /**
+     * @brief      Handle the DADA header.
+     *
+     * @param      header  The header in DADA format
+     *
+     * @detail     Currently a NO-OP, as no information is required from the header.
+     */
+    void init(RawBytes &header);
+
+    /**
+     * @brief      Invoke the pipeline for a block of valid TF data
+     *
+     * @param      block  A RawBytes block containing network order shorts in TF[IQ] order
+     *
+     * @return     Flag indicating if the complete number of spectra has been accumulated already
+     */
+    bool operator()(RawBytes &block);
+
+private:
+    void process(std::size_t chan_block_idx);
+    void copy(RawBytes& block, std::size_t spec_idx, std::size_t chan_block_idx, std::size_t nspectra_in);
+    void write_output();
+
+private:
+    DoubleDeviceBuffer<InputType> _copy_buffer;
+    thrust::device_vector<FftType> _fft_input_buffer;
+    thrust::device_vector<FftType> _fft_output_buffer;
+    thrust::device_vector<OutputType> _accumulation_buffer;
+    thrust::host_vector<OutputType> _h_accumulation_buffer;
+    std::size_t _input_nchans;
+    std::size_t _fft_length;
+    std::size_t _naccumulate;
+    std::size_t _nskip;
+    std::string _filename;
+    std::size_t _output_nchans;
+    std::size_t _bytes_per_input_spectrum;
+    std::size_t _chans_per_copy;
+    cufftHandle _fft_plan;
+    cudaStream_t _copy_stream;
+    cudaStream_t _proc_stream;
+};
+
+
+} //namespace rfi_chamber
+} //namespace effelsberg
+} //namespace psrdada_cpp
+
+#endif //PSRDADA_CPP_EFFELSBERG_RFI_CHAMBER_RSSPECTROMETER_CUH
diff --git a/psrdada_cpp/effelsberg/rfi_chamber/src/RSSpectrometer.cu b/psrdada_cpp/effelsberg/rfi_chamber/src/RSSpectrometer.cu
new file mode 100644
index 00000000..26832895
--- /dev/null
+++ b/psrdada_cpp/effelsberg/rfi_chamber/src/RSSpectrometer.cu
@@ -0,0 +1,320 @@
+#include "psrdada_cpp/effelsberg/rfi_chamber/RSSpectrometer.cuh"
+#include "psrdada_cpp/cuda_utils.hpp"
+#include <thrust/system/cuda/execution_policy.h>
+#include "thrust/functional.h"
+#include "thrust/transform.h"
+#include "thrust/fill.h"
+#include <cassert>
+#include <fstream>
+#include <iomanip>
+
+namespace psrdada_cpp {
+namespace effelsberg {
+namespace rfi_chamber {
+namespace kernels {
+
+struct short2_be_to_float2_le
+    : public thrust::unary_function<short2, float2>
+{
+    __host__ __device__
+    float2 operator()(short2 in)
+    {
+        char4 swap;
+        char4* in_ptr = (char4*)(&in);
+        swap.x = in_ptr->y;
+        swap.y = in_ptr->x;
+        swap.z = in_ptr->w;
+        swap.w = in_ptr->z;
+        short2* swap_as_short2 = (short2*)(&swap);
+        float2 out;
+        out.x = (float) swap_as_short2->x;
+        out.y = (float) swap_as_short2->y;
+        return out;
+    }
+};
+
+struct detect_accumulate
+    : public thrust::binary_function<float2, float, float>
+{
+    __host__ __device__
+    float operator()(float2 voltage, float power_accumulator)
+    {
+        float power = voltage.x * voltage.x + voltage.y * voltage.y;
+        return power_accumulator + power;
+    }
+};
+
+} // namespace kernels
+
+
+RSSpectrometer::RSSpectrometer(
+    std::size_t input_nchans, std::size_t fft_length,
+    std::size_t naccumulate, std::size_t nskip,
+    std::string filename)
+    : _input_nchans(input_nchans)
+    , _fft_length(fft_length)
+    , _naccumulate(naccumulate)
+    , _nskip(nskip)
+    , _filename(filename)
+    , _output_nchans(_fft_length * _input_nchans)
+    , _bytes_per_input_spectrum(_input_nchans * sizeof(InputType))
+{
+
+    BOOST_LOG_TRIVIAL(info) << "Initialising RSSpectrometer";
+    BOOST_LOG_TRIVIAL(info) << "Number of input channels: " << _input_nchans;
+    BOOST_LOG_TRIVIAL(info) << "FFT length: " << _fft_length;
+    BOOST_LOG_TRIVIAL(info) << "Number of spectra to accumulate: " << _naccumulate;
+    BOOST_LOG_TRIVIAL(info) << "Number of DADA blocks to skip: " << _nskip;
+    BOOST_LOG_TRIVIAL(info) << "Number of output channels: " << _output_nchans;
+
+    std::size_t total_mem, free_mem;
+    CUDA_ERROR_CHECK(cudaMemGetInfo(&free_mem, &total_mem));
+    BOOST_LOG_TRIVIAL(debug) << "Total GPU memory: " << total_mem << " bytes";
+    BOOST_LOG_TRIVIAL(debug) << "Free GPU memory: " << free_mem << " bytes";
+
+    // Memory required for accumulation buffer
+    std::size_t accumulator_buffer_bytes = _output_nchans * sizeof(OutputType);
+    BOOST_LOG_TRIVIAL(debug) << "Memory required for accumulator buffer: " << accumulator_buffer_bytes << " bytes";
+    if (accumulator_buffer_bytes > free_mem)
+    {
+        throw std::runtime_error("The requested FFT length exceeds the free GPU memory");
+    }
+    std::size_t mem_budget = static_cast<std::size_t>((free_mem - accumulator_buffer_bytes) * 0.8) ; // Make only 80% of memory available
+    BOOST_LOG_TRIVIAL(debug) << "Memory budget: " << mem_budget << " bytes";
+    // Memory required per input channel
+    std::size_t mem_per_input_channel = (_fft_length *  (sizeof(FftType) * 2 + 2 * sizeof(InputType)));
+    BOOST_LOG_TRIVIAL(debug) << "Memory required per input channel: " << mem_per_input_channel << " bytes";
+    _chans_per_copy = min(_input_nchans, mem_budget / mem_per_input_channel);
+    if (mem_per_input_channel > mem_budget)
+    {
+	 throw std::runtime_error("The requested FFT length exceeds the free GPU memory");
+    }
+    BOOST_LOG_TRIVIAL(debug) << "Max possible Nchans per copy: " << mem_budget / mem_per_input_channel;
+    while (_input_nchans % _chans_per_copy != 0)
+    {
+        _chans_per_copy -= 1;
+    }
+    assert(_chans_per_copy > 0); /** Must be able to process at least 1 channel */
+    BOOST_LOG_TRIVIAL(debug) << "Nchannels per GPU transfer: " << _chans_per_copy;
+    mem_budget -= _chans_per_copy * mem_per_input_channel;
+    BOOST_LOG_TRIVIAL(debug) << "Remaining memory budget: " << mem_budget << " bytes";
+
+    // Resize all buffers.
+    BOOST_LOG_TRIVIAL(debug) << "Resizing all memory buffers";
+    CUDA_ERROR_CHECK(cudaMemGetInfo(&free_mem, &total_mem));
+    BOOST_LOG_TRIVIAL(debug) << "Free GPU memory: " << free_mem << " bytes";
+    _accumulation_buffer.resize(_output_nchans, 0.0f);
+    CUDA_ERROR_CHECK(cudaMemGetInfo(&free_mem, &total_mem));
+    BOOST_LOG_TRIVIAL(debug) << "Free GPU memory after acc buffer: " << free_mem << " bytes";
+    _h_accumulation_buffer.resize(_output_nchans, 0.0f);
+    BOOST_LOG_TRIVIAL(debug) << "Allocating " << _chans_per_copy * _fft_length * 8  * 2 << " bytes for FFT buffers";
+    _fft_input_buffer.resize(_chans_per_copy * _fft_length);
+    _fft_output_buffer.resize(_chans_per_copy * _fft_length);
+    CUDA_ERROR_CHECK(cudaMemGetInfo(&free_mem, &total_mem));
+    BOOST_LOG_TRIVIAL(debug) << "Free GPU memory after FFT buffer: " << free_mem << " bytes";
+    _copy_buffer.resize(_chans_per_copy * _fft_length);
+    CUDA_ERROR_CHECK(cudaMemGetInfo(&free_mem, &total_mem));
+    BOOST_LOG_TRIVIAL(debug) << "Free GPU memory after copy buffer: " << free_mem << " bytes";
+
+    // Allocate streams
+    BOOST_LOG_TRIVIAL(debug) << "Allocating CUDA streams";
+    CUDA_ERROR_CHECK(cudaStreamCreate(&_copy_stream));
+    CUDA_ERROR_CHECK(cudaStreamCreate(&_proc_stream));
+
+    CUDA_ERROR_CHECK(cudaMemGetInfo(&free_mem, &total_mem));
+    BOOST_LOG_TRIVIAL(debug) << "Free GPU memory pre-cufft: " << free_mem << " bytes";
+    // Configure CUFFT for FFT execution
+
+    BOOST_LOG_TRIVIAL(debug) << "Generating CUFFT plan";
+    int n[] = {static_cast<int>(_fft_length)};
+    int inembed[] = {static_cast<int>(_chans_per_copy)};
+    int onembed[] = {static_cast<int>(_fft_length)};
+    CUFFT_ERROR_CHECK(cufftPlanMany(&_fft_plan, 1, n, inembed, _chans_per_copy, 1, onembed, 1, _fft_length,
+        CUFFT_C2C, _chans_per_copy));
+
+    BOOST_LOG_TRIVIAL(debug) << "Setting CUFFT stream";
+    CUFFT_ERROR_CHECK(cufftSetStream(_fft_plan, _proc_stream));
+    CUDA_ERROR_CHECK(cudaMemGetInfo(&free_mem, &total_mem));
+    BOOST_LOG_TRIVIAL(debug) << "Free GPU memory post-cufft: " << free_mem << " bytes";
+    BOOST_LOG_TRIVIAL(debug) << "RSSpectrometer instance initialised";
+}
+
+RSSpectrometer::~RSSpectrometer()
+{
+    BOOST_LOG_TRIVIAL(debug) << "Destroying RSSpectrometer instance";
+    BOOST_LOG_TRIVIAL(debug) << "Destroying CUDA streams";
+    CUDA_ERROR_CHECK(cudaStreamDestroy(_copy_stream));
+    CUDA_ERROR_CHECK(cudaStreamDestroy(_proc_stream));
+    BOOST_LOG_TRIVIAL(debug) << "Destroying CUFFT plan";
+    CUFFT_ERROR_CHECK(cufftDestroy(_fft_plan));
+    BOOST_LOG_TRIVIAL(info) << "RSSpectrometer destroyed";
+}
+
+void RSSpectrometer::init(RawBytes &header)
+{
+    BOOST_LOG_TRIVIAL(debug) << "RSSpectrometer received header block";
+}
+
+bool RSSpectrometer::operator()(RawBytes &block)
+{
+    BOOST_LOG_TRIVIAL(debug) << "RSSpectrometer received data block";
+    if (_nskip > 0)
+    {
+        BOOST_LOG_TRIVIAL(debug) << "Skipping block while stream stabilizes";
+        --_nskip;
+        return false;
+    }
+    assert(block.used_bytes() % _bytes_per_input_spectrum == 0); /** Block is not a multiple of the heap group size */
+    std::size_t nspectra_in = block.used_bytes() / _bytes_per_input_spectrum;
+    BOOST_LOG_TRIVIAL(debug) << "Number of input spectra per block: " << nspectra_in;
+    assert(block.used_bytes() % _output_nchans * sizeof(InputType) == 0); /** Block is not a multiple of the spectrum size */
+    std::size_t nspectra_out = block.used_bytes() / (_output_nchans * sizeof(InputType));
+    BOOST_LOG_TRIVIAL(debug) << "Number of output spectra per block: " << nspectra_out;
+
+    std::size_t n_to_accumulate;
+    if (nspectra_out > _naccumulate)
+    {
+        n_to_accumulate = _naccumulate;
+    }
+    else
+    {
+        n_to_accumulate = nspectra_out;
+    }
+    BOOST_LOG_TRIVIAL(debug) << "Number of spectra to accumulate in current block: " << n_to_accumulate;
+    BOOST_LOG_TRIVIAL(debug) << "Entering processing loop";
+    std::size_t nchan_blocks = _input_nchans / _chans_per_copy;
+    for (std::size_t spec_idx = 0; spec_idx < n_to_accumulate; ++spec_idx)
+    {
+        copy(block, spec_idx, 0, nspectra_in);
+        for (std::size_t chan_block_idx = 1;
+            chan_block_idx < nchan_blocks;
+            ++chan_block_idx)
+        {
+            copy(block, spec_idx, chan_block_idx, nspectra_in);
+            process(chan_block_idx - 1);
+        }
+        CUDA_ERROR_CHECK(cudaStreamSynchronize(_copy_stream));
+        _copy_buffer.swap();
+        process(nchan_blocks - 1);
+    }
+    CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
+    BOOST_LOG_TRIVIAL(debug) << "Processing loop complete";
+    _naccumulate -= n_to_accumulate;
+    BOOST_LOG_TRIVIAL(info) << "Accumulated " << n_to_accumulate
+    << " spectra ("<< _naccumulate << " remaining)";
+    if (_naccumulate == 0)
+    {
+        BOOST_LOG_TRIVIAL(debug) << "Processing loop complete";
+        write_output();
+        return true;
+    }
+    return false;
+}
+
+void RSSpectrometer::process(std::size_t chan_block_idx)
+{
+    /** Note streams do not actually work as expected
+     *  with Thrust. The code is synchronous with respect
+     *  to the host. The Thrust 1.9.4 (CUDA 10.1) release
+     *  includes thrust::async which alleviates this problem.
+     *  This can be included here if need be, but as it is the
+     *  H2D copy should still run in parallel to the FFT, so
+     *  there is no performance cost to blocking on the host.
+     */
+    CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
+    // Convert shorts to floats
+    BOOST_LOG_TRIVIAL(debug) << "Performing short2 to float2 conversion";
+
+    thrust::transform(
+        thrust::cuda::par.on(_proc_stream),
+        _copy_buffer.b().begin(),
+        _copy_buffer.b().end(),
+        _fft_input_buffer.begin(),
+        kernels::short2_be_to_float2_le());
+
+    // Perform forward C2C transform
+    BOOST_LOG_TRIVIAL(debug) << "Executing FFT";
+    cufftComplex* in_ptr = static_cast<cufftComplex*>(
+        thrust::raw_pointer_cast(_fft_input_buffer.data()));
+    cufftComplex* out_ptr = static_cast<cufftComplex*>(
+        thrust::raw_pointer_cast(_fft_output_buffer.data()));
+    CUFFT_ERROR_CHECK(cufftExecC2C(
+        _fft_plan, in_ptr, out_ptr, CUFFT_FORWARD));
+    std::size_t chan_offset = chan_block_idx * _chans_per_copy * _fft_length;
+    // Detect FFT output and accumulate
+    BOOST_LOG_TRIVIAL(debug) << "Detecting and accumulating";
+
+    thrust::transform(
+        thrust::cuda::par.on(_proc_stream),
+        _fft_output_buffer.begin(),
+        _fft_output_buffer.end(),
+        _accumulation_buffer.begin() + chan_offset,
+        _accumulation_buffer.begin() + chan_offset,
+        kernels::detect_accumulate());
+
+}
+
+void RSSpectrometer::copy(RawBytes& block, std::size_t spec_idx, std::size_t chan_block_idx, std::size_t nspectra_in)
+{
+    BOOST_LOG_TRIVIAL(debug) << "Copying block to GPU";
+    std::size_t spitch = _input_nchans * sizeof(short2); // Width of a row in bytes (so number of channels total)
+    std::size_t width = _chans_per_copy * sizeof(short2);; // Total number of samples in the input
+    std::size_t dpitch = _chans_per_copy * sizeof(short2); // Width of row in bytes in the output
+    std::size_t height = _fft_length; // Total number of samples to copy
+    CUDA_ERROR_CHECK(cudaStreamSynchronize(_copy_stream));
+    _copy_buffer.swap();
+    char* src = block.ptr() + spec_idx * spitch * height + chan_block_idx * width;
+    BOOST_LOG_TRIVIAL(debug) << "Calling cudaMemcpy2DAsync with args: "
+	    << "dest=" << _copy_buffer.a_ptr() << ", "
+	    << "dpitch=" << dpitch << ", "
+	    << "src=" << (void*) src << ", "
+	    << "spitch=" << spitch << ", "
+	    << "width=" << width << ", "
+	    << "height=" << height << ", "
+	    << cudaMemcpyHostToDevice << ", "
+	    << _copy_stream;
+    CUDA_ERROR_CHECK(cudaMemcpy2DAsync(_copy_buffer.a_ptr(),
+        dpitch, src, spitch, width, height,
+        cudaMemcpyHostToDevice, _copy_stream));
+}
+
+void RSSpectrometer::write_output()
+{
+    // Copy accumulation buffer to host
+    BOOST_LOG_TRIVIAL(debug) << "Copying accumulated spectrum to host";
+    _h_accumulation_buffer = _accumulation_buffer;
+    BOOST_LOG_TRIVIAL(debug) << "Perparing output file";
+    std::ofstream outfile;
+    outfile.open(_filename.c_str(),std::ifstream::out | std::ifstream::binary);
+    if (outfile.is_open())
+    {
+        BOOST_LOG_TRIVIAL(debug) << "Opened file " << _filename;
+    }
+    else
+    {
+        std::stringstream stream;
+        stream << "Could not open file " << _filename;
+        throw std::runtime_error(stream.str().c_str());
+    }
+    BOOST_LOG_TRIVIAL(debug) << "Writing output to file with FFT shift included";
+    //char const* ptr = reinterpret_cast<char*>(thrust::raw_pointer_cast(_h_accumulation_buffer.data()));
+
+    for (std::size_t subband_idx=0; subband_idx < _h_accumulation_buffer.size() / _fft_length; ++subband_idx)
+    {
+        std::size_t offset = subband_idx * _fft_length;
+        //First write upper half of the band
+        char* back = reinterpret_cast<char*>(&_h_accumulation_buffer[offset + _fft_length/2]);
+        char* front = reinterpret_cast<char*>(&_h_accumulation_buffer[offset]);
+        outfile.write(back, (_fft_length/2) * sizeof(decltype(_h_accumulation_buffer)::value_type));
+        outfile.write(front, (_fft_length/2) * sizeof(decltype(_h_accumulation_buffer)::value_type));
+    }
+    outfile.flush();
+    outfile.close();
+    BOOST_LOG_TRIVIAL(debug) << "File write complete";
+}
+
+
+} //namespace rfi_chamber
+} //namespace effelsberg
+} //namespace psrdada_cpp
+
diff --git a/psrdada_cpp/effelsberg/rfi_chamber/src/rfi_chamber_cli.cu b/psrdada_cpp/effelsberg/rfi_chamber/src/rfi_chamber_cli.cu
new file mode 100644
index 00000000..6e7acac3
--- /dev/null
+++ b/psrdada_cpp/effelsberg/rfi_chamber/src/rfi_chamber_cli.cu
@@ -0,0 +1,116 @@
+#include "psrdada_cpp/effelsberg/rfi_chamber/RSSpectrometer.cuh"
+#include "psrdada_cpp/multilog.hpp"
+#include "psrdada_cpp/raw_bytes.hpp"
+#include "psrdada_cpp/dada_input_stream.hpp"
+#include "psrdada_cpp/cli_utils.hpp"
+
+#include "boost/program_options.hpp"
+
+#include <sys/types.h>
+#include <iostream>
+#include <string>
+#include <sstream>
+#include <ios>
+#include <algorithm>
+
+using namespace psrdada_cpp;
+
+namespace
+{
+  const size_t ERROR_IN_COMMAND_LINE = 1;
+  const size_t SUCCESS = 0;
+  const size_t ERROR_UNHANDLED_EXCEPTION = 2;
+} // namespace
+
+int main(int argc, char** argv)
+{
+
+    key_t dada_key;
+    std::size_t input_nchans;
+    std::size_t fft_length;
+    std::size_t naccumulate;
+    std::size_t nskip;
+    std::string filename;
+
+    try
+    {
+        /** Define and parse the program options
+        */
+        namespace po = boost::program_options;
+        po::options_description desc("Options");
+        desc.add_options()
+        ("help,h", "Print help messages")
+        ("key", po::value<std::string>()
+            ->default_value("dada")
+            ->notifier([&dada_key](std::string key)
+                {
+                    dada_key = string_to_key(key);
+                }),
+           "The shared memory key (hex string) for the dada buffer containing input data")
+        ("input-nchans", po::value<std::size_t>(&input_nchans)
+            ->default_value(1<<15),
+            "The number of PFB channels in the input data")
+        ("fft-length", po::value<std::size_t>(&fft_length)
+            ->default_value(1<<12),
+            "The length of FFT to perform on the data")
+        ("naccumulate", po::value<std::size_t>(&naccumulate)
+            ->default_value(10),
+            "The number of spectra to accumulate before writing to disk")
+        ("nskip", po::value<std::size_t>(&nskip)
+            ->default_value(2),
+            "The number of DADA blocks to skip before recording starts (this allows time for the stream to settle)")
+        ("output,o", po::value<std::string>(&filename)
+            ->required(),
+            "The full path of the output file to which the final accumulated spectrum will be written")
+        ("log-level", po::value<std::string>()
+            ->default_value("info")
+            ->notifier([](std::string level)
+                {
+                    set_log_level(level);
+                }),
+            "The logging level to use (debug, info, warning, error)");
+
+        po::variables_map vm;
+        try
+        {
+            po::store(po::parse_command_line(argc, argv, desc), vm);
+            if ( vm.count("help")  )
+            {
+                std::cout << "rsspectrometer -- Spectrometer for FSW IQ output" << std::endl
+                << desc << std::endl;
+                return SUCCESS;
+            }
+            po::notify(vm);
+        }
+        catch(po::error& e)
+        {
+            std::cerr << "ERROR: " << e.what() << std::endl << std::endl;
+            std::cerr << desc << std::endl;
+            return ERROR_IN_COMMAND_LINE;
+        }
+
+        //
+
+        /**
+         * All the application code goes here
+         */
+        MultiLog log("rs_spectro");
+        DadaClientBase client(dada_key, log);
+        client.cuda_register_memory();
+        effelsberg::rfi_chamber::RSSpectrometer spectrometer(
+            input_nchans, fft_length, naccumulate, nskip, filename);
+        DadaInputStream<decltype(spectrometer)> stream(dada_key, log, spectrometer);
+        stream.start();
+        /**
+         * End of application code
+         */
+    }
+    catch(std::exception& e)
+    {
+        std::cerr << "Unhandled Exception reached the top of main: "
+        << e.what() << ", application will now exit" << std::endl;
+        return ERROR_UNHANDLED_EXCEPTION;
+    }
+    return SUCCESS;
+
+}
diff --git a/psrdada_cpp/effelsberg/rfi_chamber/test/CMakeLists.txt b/psrdada_cpp/effelsberg/rfi_chamber/test/CMakeLists.txt
new file mode 100644
index 00000000..e0d1e293
--- /dev/null
+++ b/psrdada_cpp/effelsberg/rfi_chamber/test/CMakeLists.txt
@@ -0,0 +1,12 @@
+include_directories(${GTEST_INCLUDE_DIR})
+
+link_directories(${GTEST_LIBRARY_DIR})
+
+set(
+    gtest_rfi_chamber_src
+    src/RSSpectrometerTester.cu
+    src/gtest_rfi_chamber.cu
+)
+cuda_add_executable(gtest_rfi_chamber ${gtest_rfi_chamber_src} )
+target_link_libraries(gtest_rfi_chamber ${PSRDADA_CPP_EFFELSBERG_RFI_CHAMBER_LIBRARIES} ${CUDA_CUFFT_LIBRARIES})
+add_test(gtest_rfi_chamber gtest_rfi_chamber --test_data "${CMAKE_CURRENT_LIST_DIR}/data")
diff --git a/psrdada_cpp/effelsberg/rfi_chamber/test/RSSpectrometerTester.cuh b/psrdada_cpp/effelsberg/rfi_chamber/test/RSSpectrometerTester.cuh
new file mode 100644
index 00000000..472c9b42
--- /dev/null
+++ b/psrdada_cpp/effelsberg/rfi_chamber/test/RSSpectrometerTester.cuh
@@ -0,0 +1,29 @@
+#ifndef PSRDADA_CPP_EFFELSBERG_RFI_CHAMBER_RSSPECTROMETERTESTER_CUH
+#define PSRDADA_CPP_EFFELSBERG_RFI_CHAMBER_RSSPECTROMETERTESTER_CUH
+
+#include <gtest/gtest.h>
+
+namespace psrdada_cpp {
+namespace effelsberg {
+namespace rfi_chamber {
+namespace test {
+
+class RSSpectrometerTester: public ::testing::Test
+{
+protected:
+    void SetUp() override;
+    void TearDown() override;
+
+public:
+    RSSpectrometerTester();
+    ~RSSpectrometerTester();
+
+    void run_dc_power_test(std::size_t input_nchans, std::size_t fft_length, std::size_t naccumulate);
+};
+
+} //namespace test
+} //namespace rfi_chamber
+} //namespace effeslberg
+} //namespace psrdada_cpp
+
+#endif //PSRDADA_CPP_EFFELSBERG_RFI_CHAMBER_RSSPECTROMETERTESTER_CUH
diff --git a/psrdada_cpp/effelsberg/rfi_chamber/test/src/RSSpectrometerTester.cu b/psrdada_cpp/effelsberg/rfi_chamber/test/src/RSSpectrometerTester.cu
new file mode 100644
index 00000000..af8d6f16
--- /dev/null
+++ b/psrdada_cpp/effelsberg/rfi_chamber/test/src/RSSpectrometerTester.cu
@@ -0,0 +1,123 @@
+#include "psrdada_cpp/effelsberg/rfi_chamber/test/RSSpectrometerTester.cuh"
+#include "psrdada_cpp/effelsberg/rfi_chamber/RSSpectrometer.cuh"
+#include "psrdada_cpp/common.hpp"
+#include "psrdada_cpp/cuda_utils.hpp"
+#include <thrust/host_vector.h>
+#include <thrust/system/cuda/experimental/pinned_allocator.h>
+#include <arpa/inet.h>
+#include <random>
+#include <cmath>
+#include <complex>
+#include <fstream>
+#include <iomanip>
+
+namespace psrdada_cpp {
+namespace effelsberg {
+namespace rfi_chamber {
+namespace test {
+
+RSSpectrometerTester::RSSpectrometerTester()
+    : ::testing::Test()
+{
+
+}
+
+RSSpectrometerTester::~RSSpectrometerTester()
+{
+
+
+}
+
+void RSSpectrometerTester::SetUp()
+{
+
+}
+
+void RSSpectrometerTester::TearDown()
+{
+
+}
+
+void RSSpectrometerTester::run_dc_power_test(std::size_t input_nchans, std::size_t fft_length, std::size_t naccumulate)
+{
+    std::size_t nskip = 0;
+    std::size_t nspec_per_block = 4;
+    std::size_t block_bytes = input_nchans * fft_length * nspec_per_block * sizeof(short2);
+    std::size_t nblocks = naccumulate / nspec_per_block + 1;
+    std::vector<char> vheader(4096);
+    char* ptr;
+    cudaMallocHost((void**)&ptr, block_bytes);
+    RawBytes header_block(vheader.data(), vheader.size(), vheader.size());
+    RawBytes data_block((char*) ptr, block_bytes, block_bytes);
+
+
+    short2* s2ptr = reinterpret_cast<short2*>(ptr);
+    for (std::size_t ii = 0;
+        ii < input_nchans * fft_length * nspec_per_block;
+        ii += input_nchans)
+    {
+        for (std::size_t chan_idx = 0; chan_idx < input_nchans; ++chan_idx)
+        {
+            s2ptr[ii + chan_idx].x = htons(static_cast<unsigned short>(chan_idx));
+            s2ptr[ii + chan_idx].y = 0;
+        }
+    }
+    RSSpectrometer spectrometer(input_nchans, fft_length, naccumulate, nskip, "/tmp/dc_power_test.bin");
+    spectrometer.init(header_block);
+    for (int ii=0; ii < nblocks; ++ii)
+    {
+        if (spectrometer(data_block))
+        {
+            break;
+        }
+    }
+    cudaFreeHost(ptr);
+
+
+    std::vector<float> acc_spec(input_nchans * fft_length);
+
+    std::ifstream infile;
+    infile.open("/tmp/dc_power_test.bin", std::ifstream::in | std::ifstream::binary);
+    if (!infile.is_open())
+    {
+        std::stringstream stream;
+        stream << "Could not open file " << "/tmp/dc_power_test.bin";
+        throw std::runtime_error(stream.str().c_str());
+    }
+    infile.read(reinterpret_cast<char*>(acc_spec.data()), input_nchans * fft_length * sizeof(float));
+    infile.close();
+
+    for (std::size_t input_chan_idx = 0; input_chan_idx < input_nchans; ++input_chan_idx)
+    {
+        float expected_peak = naccumulate * pow(input_chan_idx * fft_length, 2);
+        for (std::size_t fft_idx = 0; fft_idx < fft_length; ++fft_idx)
+        {
+            if (fft_idx == fft_length/2)
+            {
+                ASSERT_NEAR(acc_spec[input_chan_idx*fft_length + fft_idx], expected_peak, expected_peak * 0.00001f);
+            }
+            else
+            {
+                ASSERT_NEAR(acc_spec[input_chan_idx*fft_length + fft_idx], 0.0f, 0.00001f);
+            }
+
+        }
+    }
+}
+
+TEST_F(RSSpectrometerTester, test_dc_power_1024_chan)
+{
+    run_dc_power_test(1<<15, 1<<10, 10);
+}
+
+
+TEST_F(RSSpectrometerTester, test_dc_power_16384_chan)
+{
+    run_dc_power_test(1<<15, 1<<14, 10);
+}
+
+} //namespace test
+} //namespace rfi_chamber
+} //namespace effeslberg
+} //namespace psrdada_cpp
+
diff --git a/psrdada_cpp/effelsberg/rfi_chamber/test/src/gtest_rfi_chamber.cu b/psrdada_cpp/effelsberg/rfi_chamber/test/src/gtest_rfi_chamber.cu
new file mode 100644
index 00000000..96d7200c
--- /dev/null
+++ b/psrdada_cpp/effelsberg/rfi_chamber/test/src/gtest_rfi_chamber.cu
@@ -0,0 +1,7 @@
+#include <gtest/gtest.h>
+
+int main(int argc, char **argv)
+{
+    ::testing::InitGoogleTest(&argc, argv);
+    return RUN_ALL_TESTS();
+}
-- 
GitLab