diff --git a/cmake/cuda.cmake b/cmake/cuda.cmake index 05a79711ff1adb3c67175b11cf52986bfa8b35a8..395fd5e4e16178165127d355cfd3653c4889741a 100644 --- a/cmake/cuda.cmake +++ b/cmake/cuda.cmake @@ -16,7 +16,7 @@ if(ENABLE_CUDA) # Pass options to NVCC ( -ccbin /path --compiler-options -lfftw3f --compiler-options -lm --verbose) list(APPEND CUDA_NVCC_FLAGS -DENABLE_CUDA --std c++11 -Wno-deprecated-gpu-targets --ptxas-options=-v) - list(APPEND CUDA_NVCC_FLAGS_DEBUG --debug; --device-debug; --generate-line-info -Xcompiler "-Werror") + list(APPEND CUDA_NVCC_FLAGS_DEBUG --debug; --device-debug; --generate-line-info -Xcompiler "-Wextra" -Xcompiler "-Werror") list(APPEND CUDA_NVCC_FLAGS_PROFILE --generate-line-info) #list(APPEND CUDA_NVCC_FLAGS -arch compute_35) # minumum compute level (Sps restriction) string(TOUPPER "${CMAKE_BUILD_TYPE}" uppercase_CMAKE_BUILD_TYPE) diff --git a/psrdada_cpp/CMakeLists.txt b/psrdada_cpp/CMakeLists.txt index aebf807c6e3ea0ad4e8939058326f7499ec69155..18daba2f300ebcffbbd1ee97419bdea28599efe2 100644 --- a/psrdada_cpp/CMakeLists.txt +++ b/psrdada_cpp/CMakeLists.txt @@ -69,6 +69,7 @@ install (TARGETS ${CMAKE_PROJECT_NAME} LIBRARY DESTINATION lib ARCHIVE DESTINATION lib) install(FILES ${psrdada_cpp_inc} DESTINATION include/psrdada_cpp) +install(DIRECTORY detail DESTINATION include/psrdada_cpp) add_subdirectory(meerkat) add_subdirectory(effelsberg) diff --git a/psrdada_cpp/effelsberg/edd/CMakeLists.txt b/psrdada_cpp/effelsberg/edd/CMakeLists.txt index 6c3d25256e5bbe47bfd64f88653a16bd2cda78fd..a27a6a88f4cf407f11333ec998b4c33a460c88c4 100644 --- a/psrdada_cpp/effelsberg/edd/CMakeLists.txt +++ b/psrdada_cpp/effelsberg/edd/CMakeLists.txt @@ -23,5 +23,10 @@ cuda_add_executable(VLBI_prof src/VLBI_prof.cu) target_link_libraries(VLBI_prof ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES} ${CUDA_CUFFT_LIBRARIES}) install(TARGETS VLBI_prof DESTINATION bin) +#Gated FFT spectrometer interface +cuda_add_executable(gated_spectrometer src/GatedSpectrometer_cli.cu) +target_link_libraries(gated_spectrometer ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES} ${CUDA_CUFFT_LIBRARIES} -lcublas) +install(TARGETS gated_spectrometer DESTINATION bin) + add_subdirectory(test) endif(ENABLE_CUDA) diff --git a/psrdada_cpp/effelsberg/edd/DetectorAccumulator.cuh b/psrdada_cpp/effelsberg/edd/DetectorAccumulator.cuh index 0a95d045f0cbc6a1d5671c8d6eef8f43c1460e2a..6302d105d6dd02c35460e3130648b4ee3322effa 100644 --- a/psrdada_cpp/effelsberg/edd/DetectorAccumulator.cuh +++ b/psrdada_cpp/effelsberg/edd/DetectorAccumulator.cuh @@ -9,23 +9,103 @@ namespace effelsberg { namespace edd { namespace kernels { + +// template argument unused but needed as nvcc gets otherwise confused. +template <typename T> __global__ void detect_and_accumulate(float2 const* __restrict__ in, int8_t* __restrict__ out, - int nchans, int nsamps, int naccumulate, float scale, float offset); + int nchans, int nsamps, int naccumulate, float scale, float offset, int stride, int out_offset) +{ + // grid stride loop over output array to keep + for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < nsamps * nchans / naccumulate); i += blockDim.x * gridDim.x) + { + double sum = 0.0f; + size_t currentOutputSpectra = i / nchans; + size_t currentChannel = i % nchans; + + for (size_t j = 0; j < naccumulate; j++) + { + float2 tmp = in[ j * nchans + currentOutputSpectra * nchans * naccumulate + currentChannel]; + double x = tmp.x * tmp.x; + double y = tmp.y * tmp.y; + sum += x + y; + } + size_t toff = out_offset * nchans + currentOutputSpectra * nchans *stride; + out[toff + i] += (int8_t) ((sum - offset)/scale); + } + +} + +template <typename T> +__global__ +void detect_and_accumulate(float2 const* __restrict__ in, float* __restrict__ out, + int nchans, int nsamps, int naccumulate, float scale, float offset, int stride, int out_offset) +{ + for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < nsamps * nchans / naccumulate); i += blockDim.x * gridDim.x) + { + double sum = 0; + size_t currentOutputSpectra = i / nchans; + size_t currentChannel = i % nchans; + + for (size_t j = 0; j < naccumulate; j++) + { + float2 tmp = in[ j * nchans + currentOutputSpectra * nchans * naccumulate + currentChannel]; + double x = tmp.x * tmp.x; + double y = tmp.y * tmp.y; + sum += x + y; + } + size_t toff = out_offset * nchans + currentOutputSpectra * nchans * stride; + out[i + toff] += sum; + } } +} // namespace kernels + + + +template <typename T> class DetectorAccumulator { public: typedef thrust::device_vector<float2> InputType; - typedef thrust::device_vector<int8_t> OutputType; + typedef thrust::device_vector<T> OutputType; public: - DetectorAccumulator(int nchans, int tscrunch, float scale, float offset, cudaStream_t stream); - ~DetectorAccumulator(); DetectorAccumulator(DetectorAccumulator const&) = delete; - void detect(InputType const& input, OutputType& output); + + + DetectorAccumulator( + int nchans, int tscrunch, float scale, + float offset, cudaStream_t stream) + : _nchans(nchans) + , _tscrunch(tscrunch) + , _scale(scale) + , _offset(offset) + , _stream(stream) + { + + } + + ~DetectorAccumulator() + { + + } + + // stride sets an offset of _nChans * stride to the detection in the output + // to allow multiple spectra in one output + void detect(InputType const& input, OutputType& output, int stride = 0, int stoff = 0) + { + assert(input.size() % (_nchans * _tscrunch) == 0 /* Input is not a multiple of _nchans * _tscrunch*/); + //output.resize(input.size()/_tscrunch); + int nsamps = input.size() / _nchans; + float2 const* input_ptr = thrust::raw_pointer_cast(input.data()); + T * output_ptr = thrust::raw_pointer_cast(output.data()); + kernels::detect_and_accumulate<T> <<<1024, 1024, 0, _stream>>>( + input_ptr, output_ptr, _nchans, nsamps, _tscrunch, _scale, _offset, stride, stoff); + } + + private: int _nchans; diff --git a/psrdada_cpp/effelsberg/edd/FftSpectrometer.cuh b/psrdada_cpp/effelsberg/edd/FftSpectrometer.cuh index 8f168cb9fcbcf1a10a3ba420bceb6a3e0fa7f9fa..df9f755270147a91c82fef402aa59eb1557db684 100644 --- a/psrdada_cpp/effelsberg/edd/FftSpectrometer.cuh +++ b/psrdada_cpp/effelsberg/edd/FftSpectrometer.cuh @@ -68,7 +68,7 @@ private: int _nchans; int _call_count; std::unique_ptr<Unpacker> _unpacker; - std::unique_ptr<DetectorAccumulator> _detector; + std::unique_ptr<DetectorAccumulator<int8_t> > _detector; DoubleDeviceBuffer<RawVoltageType> _raw_voltage_db; DoubleDeviceBuffer<IntegratedPowerType> _power_db; thrust::device_vector<UnpackedVoltageType> _unpacked_voltage; diff --git a/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh b/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh new file mode 100644 index 0000000000000000000000000000000000000000..56e6ff7c00416f952eb98a453f9de78791bb1345 --- /dev/null +++ b/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh @@ -0,0 +1,185 @@ +#ifndef PSRDADA_CPP_EFFELSBERG_EDD_GATEDSPECTROMETER_HPP +#define PSRDADA_CPP_EFFELSBERG_EDD_GATEDSPECTROMETER_HPP + +#include "psrdada_cpp/effelsberg/edd/Unpacker.cuh" +#include "psrdada_cpp/raw_bytes.hpp" +#include "psrdada_cpp/cuda_utils.hpp" +#include "psrdada_cpp/double_device_buffer.cuh" +#include "psrdada_cpp/double_host_buffer.cuh" +#include "psrdada_cpp/effelsberg/edd/DetectorAccumulator.cuh" + +#include "thrust/device_vector.h" +#include "cufft.h" + +#include "cublas_v2.h" + +namespace psrdada_cpp { +namespace effelsberg { +namespace edd { + + +#define BIT_MASK(bit) (1L << (bit)) +#define SET_BIT(value, bit) ((value) |= BIT_MASK(bit)) +#define CLEAR_BIT(value, bit) ((value) &= ~BIT_MASK(bit)) +#define TEST_BIT(value, bit) (((value)&BIT_MASK(bit)) ? 1 : 0) + + +/** + @class GatedSpectrometer + @brief Split data into two streams and create integrated spectra depending on + bit set in side channel data. + + */ +template <class HandlerType, typename IntegratedPowerType> class GatedSpectrometer { +public: + typedef uint64_t RawVoltageType; + typedef float UnpackedVoltageType; + typedef float2 ChannelisedVoltageType; +// typedef float IntegratedPowerType; + //typedef int8_t IntegratedPowerType; + +public: + /** + * @brief Constructor + * + * @param buffer_bytes A RawBytes object wrapping a DADA header buffer + * @param nSideChannels Number of side channel items in the data stream, + * @param selectedSideChannel Side channel item used for gating + * @param selectedBit bit of side channel item used for gating + * @param speadHeapSize Size of the spead heap block. + * @param fftLength Size of the FFT + * @param naccumulate Number of samples to integrate in the individual + * FFT bins + * @param nbits Bit depth of the sampled signal + * @param input_level Normalization level of the input signal + * @param output_level Normalization level of the output signal + * @param handler Output handler + * + */ + GatedSpectrometer(std::size_t buffer_bytes, std::size_t nSideChannels, + std::size_t selectedSideChannel, std::size_t selectedBit, + std::size_t speadHeapSize, std::size_t fft_length, + std::size_t naccumulate, std::size_t nbits, + float input_level, float output_level, + HandlerType &handler); + ~GatedSpectrometer(); + + /** + * @brief A callback to be called on connection + * to a ring buffer. + * + * @detail The first available header block in the + * in the ring buffer is provided as an argument. + * It is here that header parameters could be read + * if desired. + * + * @param block A RawBytes object wrapping a DADA header buffer + */ + void init(RawBytes &block); + + /** + * @brief A callback to be called on acqusition of a new + * data block. + * + * @param block A RawBytes object wrapping a DADA data buffer output + * are the integrated specttra with/without bit set. + */ + bool operator()(RawBytes &block); + +private: + void process(thrust::device_vector<RawVoltageType> const &digitiser_raw, + thrust::device_vector<int64_t> const &sideChannelData, + thrust::device_vector<IntegratedPowerType> &detected, + thrust::device_vector<size_t> &noOfBitSet); + +private: + std::size_t _buffer_bytes; + std::size_t _fft_length; + std::size_t _naccumulate; + std::size_t _nbits; + std::size_t _nSideChannels; + std::size_t _selectedSideChannel; + std::size_t _selectedBit; + std::size_t _speadHeapSize; + std::size_t _sideChannelSize; + std::size_t _totalHeapSize; + std::size_t _nHeaps; + std::size_t _gapSize; + std::size_t _dataBlockBytes; + std::size_t _batch; + std::size_t _nsamps_per_output_spectra; + std::size_t _nsamps_per_buffer; + + HandlerType &_handler; + cufftHandle _fft_plan; + int _nchans; + int _call_count; + std::unique_ptr<Unpacker> _unpacker; + std::unique_ptr<DetectorAccumulator<IntegratedPowerType> > _detector; + + // Input data + DoubleDeviceBuffer<RawVoltageType> _raw_voltage_db; + DoubleDeviceBuffer<int64_t> _sideChannelData_db; + + // Output data + DoubleDeviceBuffer<IntegratedPowerType> _power_db; + DoubleDeviceBuffer<size_t> _noOfBitSetsInSideChannel; + DoublePinnedHostBuffer<char> _host_power_db; + + // Intermediate process steps + thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G0; + thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G1; + thrust::device_vector<ChannelisedVoltageType> _channelised_voltage; + thrust::device_vector<UnpackedVoltageType> _baseLineN; + + cudaStream_t _h2d_stream; + cudaStream_t _proc_stream; + cudaStream_t _d2h_stream; +}; + + +/** + * @brief Splits the input data depending on a bit set into two arrays. + * + * @detail The resulting gaps are filled with zeros in the other stream. + * + * @param GO Input data. Data is set to the baseline value if corresponding + * sideChannelData bit at bitpos os set. + * @param G1 Data in this array is set to the baseline value if corresponding + * sideChannelData bit at bitpos is not set. + * @param sideChannelData noOfSideChannels items per block of heapSize + * bytes in the input data. + * @param N lebgth of the input/output arrays G0. + * @param heapsize Size of the blocks for which there is an entry in the + sideChannelData. + * @param bitpos Position of the bit to evaluate for processing. + * @param noOfSideChannels Number of side channels items per block of + * data. + * @param selectedSideChannel No. of side channel item to be eveluated. + 0 <= selectedSideChannel < noOfSideChannels. + */ +__global__ void gating(float *G0, float *G1, const int64_t *sideChannelData, + size_t N, size_t heapSize, size_t bitpos, + size_t noOfSideChannels, size_t selectedSideChannel, const float *_baseLine); + + +/** + * @brief Sums all elements of an input array. + * + * @detail The results is stored in an array with one value per launch + * block. Full reuction thus requires two kernel launches. + * + * @param in. Input array. + * @param N. Size of input array. + * @param out. Output array. + * */ +__global__ void array_sum(float *in, size_t N, float *out); + + + +} // edd +} // effelsberg +} // psrdada_cpp + +#include "psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu" +#endif //PSRDADA_CPP_EFFELSBERG_EDD_GATEDSPECTROMETER_HPP diff --git a/psrdada_cpp/effelsberg/edd/detail/FftSpectrometer.cu b/psrdada_cpp/effelsberg/edd/detail/FftSpectrometer.cu index a14c9b06423363e5a0e9bc22fb907aec52a8a232..a0108d0dd22aa58e4ef5f88a27fb792fcc3b535a 100644 --- a/psrdada_cpp/effelsberg/edd/detail/FftSpectrometer.cu +++ b/psrdada_cpp/effelsberg/edd/detail/FftSpectrometer.cu @@ -2,6 +2,7 @@ #include "psrdada_cpp/common.hpp" #include "psrdada_cpp/cuda_utils.hpp" #include "psrdada_cpp/raw_bytes.hpp" +#include <cuda_profiler_api.h> #include <cuda.h> namespace psrdada_cpp { @@ -62,7 +63,7 @@ FftSpectrometer<HandlerType>::FftSpectrometer( CUDA_ERROR_CHECK(cudaStreamCreate(&_d2h_stream)); CUFFT_ERROR_CHECK(cufftSetStream(_fft_plan, _proc_stream)); _unpacker.reset(new Unpacker(_proc_stream)); - _detector.reset(new DetectorAccumulator(_nchans, _naccumulate, + _detector.reset(new DetectorAccumulator<int8_t>(_nchans, _naccumulate, scaling, offset, _proc_stream)); } @@ -111,7 +112,15 @@ bool FftSpectrometer<HandlerType>::operator()(RawBytes& block) { ++_call_count; BOOST_LOG_TRIVIAL(debug) << "FftSpectrometer operator() called (count = " << _call_count << ")"; - assert(block.used_bytes() == _buffer_bytes /* Unexpected buffer size */); + if (block.used_bytes() != _buffer_bytes) { /* Unexpected buffer size */ + BOOST_LOG_TRIVIAL(error) << "Unexpected Buffer Size - Got " + << block.used_bytes() << " byte, expected " + << _buffer_bytes << " byte)"; + cudaDeviceSynchronize(); + cudaProfilerStop(); + return true; + } + CUDA_ERROR_CHECK(cudaStreamSynchronize(_h2d_stream)); _raw_voltage_db.swap(); diff --git a/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu b/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu new file mode 100644 index 0000000000000000000000000000000000000000..cba8928945eaac85636948ed9579440fb2ba125f --- /dev/null +++ b/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu @@ -0,0 +1,438 @@ +#include "psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh" +#include "psrdada_cpp/common.hpp" +#include "psrdada_cpp/cuda_utils.hpp" +#include "psrdada_cpp/raw_bytes.hpp" + +#include <cuda.h> +#include <cuda_profiler_api.h> +#include <thrust/system/cuda/execution_policy.h> + +#include <iostream> +#include <cstring> +#include <sstream> + +namespace psrdada_cpp { +namespace effelsberg { +namespace edd { + +__global__ void gating(float* __restrict__ G0, float* __restrict__ G1, const int64_t* __restrict__ sideChannelData, + size_t N, size_t heapSize, size_t bitpos, + size_t noOfSideChannels, size_t selectedSideChannel, const float* __restrict__ _baseLineN) { + float baseLine = (*_baseLineN) / N; + for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N); + i += blockDim.x * gridDim.x) { + const float w = G0[i] - baseLine; + const int64_t sideChannelItem = + sideChannelData[((i / heapSize) * (noOfSideChannels)) + + selectedSideChannel]; // Probably not optimal access as + // same data is copied for several + // threads, but maybe efficiently + // handled by cache? + + const int bit_set = TEST_BIT(sideChannelItem, bitpos); + G1[i] = w * bit_set + baseLine; + G0[i] = w * (!bit_set) + baseLine; + } +} + + +__global__ void countBitSet(const int64_t *sideChannelData, size_t N, size_t + bitpos, size_t noOfSideChannels, size_t selectedSideChannel, size_t + *nBitsSet) +{ + // really not optimized reduction, but here only trivial array sizes. + int i = blockIdx.x * blockDim.x + threadIdx.x; + __shared__ unsigned int x[256]; + + if (i == 0) + nBitsSet[0] = 0; + + if (i * noOfSideChannels + selectedSideChannel < N) + x[threadIdx.x] = TEST_BIT(sideChannelData[i * noOfSideChannels + selectedSideChannel], bitpos); + else + x[threadIdx.x] = 0; + __syncthreads(); + + for(int s = blockDim.x / 2; s > 0; s = s / 2) + { + if (threadIdx.x < s) + x[threadIdx.x] += x[threadIdx.x + s]; + __syncthreads(); + } + + if(threadIdx.x == 0) + nBitsSet[0] += x[threadIdx.x]; +} + + +// blocksize for the array sum kernel +#define array_sum_Nthreads 1024 + +__global__ void array_sum(float *in, size_t N, float *out) { + extern __shared__ float data[]; + + size_t tid = threadIdx.x; + + float ls = 0; + + for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N); + i += blockDim.x * gridDim.x) { + ls += in[i]; // + in[i + blockDim.x]; // loading two elements increase the used bandwidth by ~10% but requires matching blocksize and size of input array + } + + data[tid] = ls; + __syncthreads(); + + for (size_t i = blockDim.x / 2; i > 0; i /= 2) { + if (tid < i) { + data[tid] += data[tid + i]; + } + __syncthreads(); + } + + // unroll last warp + // if (tid < 32) + //{ + // warpReduce(data, tid); + //} + + if (tid == 0) { + out[blockIdx.x] = data[0]; + } +} + + +template <class HandlerType, typename IntegratedPowerType> +GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer( + std::size_t buffer_bytes, std::size_t nSideChannels, + std::size_t selectedSideChannel, std::size_t selectedBit, + std::size_t speadHeapSize, std::size_t fft_length, std::size_t naccumulate, + std::size_t nbits, float input_level, float output_level, + HandlerType &handler) + : _buffer_bytes(buffer_bytes), _nSideChannels(nSideChannels), + _selectedSideChannel(selectedSideChannel), _selectedBit(selectedBit), + _speadHeapSize(speadHeapSize), _fft_length(fft_length), + _naccumulate(naccumulate), _nbits(nbits), _handler(handler), _fft_plan(0), + _call_count(0) { + + // Sanity checks + assert(((_nbits == 12) || (_nbits == 8))); + assert(_naccumulate > 0); + + // check for any device errors + CUDA_ERROR_CHECK(cudaDeviceSynchronize()); + + BOOST_LOG_TRIVIAL(info) + << "Creating new GatedSpectrometer instance with parameters: \n" + << " fft_length " << _fft_length << "\n" + << " naccumulate " << _naccumulate << "\n" + << " nSideChannels " << _nSideChannels << "\n" + << " speadHeapSize " << _speadHeapSize << " byte\n" + << " selectedSideChannel " << _selectedSideChannel << "\n" + << " selectedBit " << _selectedBit << "\n" + << " output bit depth " << sizeof(IntegratedPowerType) * 8; + + _sideChannelSize = nSideChannels * sizeof(int64_t); + _totalHeapSize = _speadHeapSize + _sideChannelSize; + _nHeaps = buffer_bytes / _totalHeapSize; + _gapSize = (buffer_bytes - _nHeaps * _totalHeapSize); + _dataBlockBytes = _nHeaps * _speadHeapSize; + + assert((nSideChannels == 0) || + (selectedSideChannel < + nSideChannels)); // Sanity check of side channel value + assert(selectedBit < 64); // Sanity check of selected bit + BOOST_LOG_TRIVIAL(info) << "Resulting memory configuration: \n" + << " totalSizeOfHeap: " << _totalHeapSize << " byte\n" + << " number of heaps per buffer: " << _nHeaps << "\n" + << " resulting gap: " << _gapSize << " byte\n" + << " datablock size in buffer: " << _dataBlockBytes << " byte\n"; + + _nsamps_per_buffer = _dataBlockBytes * 8 / nbits; + + _nsamps_per_output_spectra = fft_length * naccumulate; + int nBlocks; + if (_nsamps_per_output_spectra <= _nsamps_per_buffer) + { // one buffer block is used for one or multiple output spectra + size_t N = _nsamps_per_buffer / _nsamps_per_output_spectra; + // All data in one block has to be used + assert(N * _nsamps_per_output_spectra == _nsamps_per_buffer); + nBlocks = 1; + } + else + { // multiple blocks are integrated intoone output + size_t N = _nsamps_per_output_spectra / _nsamps_per_buffer; + // All data in multiple blocks has to be used + assert(N * _nsamps_per_buffer == _nsamps_per_output_spectra); + nBlocks = N; + } + BOOST_LOG_TRIVIAL(debug) << "Integrating " << _nsamps_per_output_spectra << " samples from " << nBlocks << " into one spectra."; + + std::size_t n64bit_words = _dataBlockBytes / sizeof(uint64_t); + _nchans = _fft_length / 2 + 1; + int batch = _nsamps_per_buffer / _fft_length; + float dof = 2 * _naccumulate; + float scale = + std::pow(input_level * std::sqrt(static_cast<float>(_nchans)), 2); + float offset = scale * dof; + float scaling = scale * std::sqrt(2 * dof) / output_level; + BOOST_LOG_TRIVIAL(debug) + << "Correction factors for 8-bit conversion: offset = " << offset + << ", scaling = " << scaling; + + BOOST_LOG_TRIVIAL(debug) << "Generating FFT plan"; + int n[] = {static_cast<int>(_fft_length)}; + CUFFT_ERROR_CHECK(cufftPlanMany(&_fft_plan, 1, n, NULL, 1, _fft_length, NULL, + 1, _nchans, CUFFT_R2C, batch)); + cufftSetStream(_fft_plan, _proc_stream); + + BOOST_LOG_TRIVIAL(debug) << "Allocating memory"; + _raw_voltage_db.resize(n64bit_words); + _sideChannelData_db.resize(_sideChannelSize * _nHeaps); + BOOST_LOG_TRIVIAL(debug) << " Input voltages size (in 64-bit words): " + << _raw_voltage_db.size(); + _unpacked_voltage_G0.resize(_nsamps_per_buffer); + _unpacked_voltage_G1.resize(_nsamps_per_buffer); + + _baseLineN.resize(array_sum_Nthreads); + BOOST_LOG_TRIVIAL(debug) << " Unpacked voltages size (in samples): " + << _unpacked_voltage_G0.size(); + _channelised_voltage.resize(_nchans * batch); + BOOST_LOG_TRIVIAL(debug) << " Channelised voltages size: " + << _channelised_voltage.size(); + _power_db.resize(_nchans * batch / (_naccumulate / nBlocks) * 2); // hold on and off spectra to simplify output + thrust::fill(_power_db.a().begin(), _power_db.a().end(), 0.); + thrust::fill(_power_db.b().begin(), _power_db.b().end(), 0.); + BOOST_LOG_TRIVIAL(debug) << " Powers size: " << _power_db.size() / 2; + + _noOfBitSetsInSideChannel.resize( batch / (_naccumulate / nBlocks)); + thrust::fill(_noOfBitSetsInSideChannel.a().begin(), _noOfBitSetsInSideChannel.a().end(), 0.); + thrust::fill(_noOfBitSetsInSideChannel.b().begin(), _noOfBitSetsInSideChannel.b().end(), 0.); + + // on the host both power are stored in the same data buffer together with + // the number of bit sets + _host_power_db.resize( _power_db.size() * sizeof(IntegratedPowerType) + 2 * sizeof(size_t) * _noOfBitSetsInSideChannel.size()); + + CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream)); + CUDA_ERROR_CHECK(cudaStreamCreate(&_proc_stream)); + CUDA_ERROR_CHECK(cudaStreamCreate(&_d2h_stream)); + CUFFT_ERROR_CHECK(cufftSetStream(_fft_plan, _proc_stream)); + + _unpacker.reset(new Unpacker(_proc_stream)); + _detector.reset(new DetectorAccumulator<IntegratedPowerType>(_nchans, _naccumulate / nBlocks, scaling, + offset, _proc_stream)); +} // constructor + + +template <class HandlerType, typename IntegratedPowerType> +GatedSpectrometer<HandlerType, IntegratedPowerType>::~GatedSpectrometer() { + BOOST_LOG_TRIVIAL(debug) << "Destroying GatedSpectrometer"; + if (!_fft_plan) + cufftDestroy(_fft_plan); + cudaStreamDestroy(_h2d_stream); + cudaStreamDestroy(_proc_stream); + cudaStreamDestroy(_d2h_stream); +} + + +template <class HandlerType, typename IntegratedPowerType> +void GatedSpectrometer<HandlerType, IntegratedPowerType>::init(RawBytes &block) { + BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer init called"; + std::stringstream headerInfo; + headerInfo << "\n" + << "# Gated spectrometer parameters: \n" + << "fft_length " << _fft_length << "\n" + << "nchannels " << _fft_length << "\n" + << "naccumulate " << _naccumulate << "\n" + << "selected_side_channel " << _selectedSideChannel << "\n" + << "selected_bit " << _selectedBit << "\n" + << "output_bit_depth " << sizeof(IntegratedPowerType) * 8; + + size_t bEnd = std::strlen(block.ptr()); + if (bEnd + headerInfo.str().size() < block.total_bytes()) + { + std::strcpy(block.ptr() + bEnd, headerInfo.str().c_str()); + } + else + { + BOOST_LOG_TRIVIAL(warning) << "Header of size " << block.total_bytes() + << " bytes already contains " << bEnd + << "bytes. Cannot add gated spectrometer info of size " + << headerInfo.str().size() << " bytes."; + } + + _handler.init(block); +} + + +template <class HandlerType, typename IntegratedPowerType> +void GatedSpectrometer<HandlerType, IntegratedPowerType>::process( + thrust::device_vector<RawVoltageType> const &digitiser_raw, + thrust::device_vector<int64_t> const &sideChannelData, + thrust::device_vector<IntegratedPowerType> &detected, thrust::device_vector<size_t> &noOfBitSet) { + BOOST_LOG_TRIVIAL(debug) << "Unpacking raw voltages"; + switch (_nbits) { + case 8: + _unpacker->unpack<8>(digitiser_raw, _unpacked_voltage_G0); + break; + case 12: + _unpacker->unpack<12>(digitiser_raw, _unpacked_voltage_G0); + break; + default: + throw std::runtime_error("Unsupported number of bits"); + } + BOOST_LOG_TRIVIAL(debug) << "Calculate baseline"; + psrdada_cpp::effelsberg::edd::array_sum<<<64, array_sum_Nthreads, array_sum_Nthreads * sizeof(float), _proc_stream>>>(thrust::raw_pointer_cast(_unpacked_voltage_G0.data()), _unpacked_voltage_G0.size(), thrust::raw_pointer_cast(_baseLineN.data())); + psrdada_cpp::effelsberg::edd::array_sum<<<1, array_sum_Nthreads, array_sum_Nthreads * sizeof(float), _proc_stream>>>(thrust::raw_pointer_cast(_baseLineN.data()), _baseLineN.size(), thrust::raw_pointer_cast(_baseLineN.data())); + + BOOST_LOG_TRIVIAL(debug) << "Perform gating"; + gating<<<1024, 1024, 0, _proc_stream>>>( + thrust::raw_pointer_cast(_unpacked_voltage_G0.data()), + thrust::raw_pointer_cast(_unpacked_voltage_G1.data()), + thrust::raw_pointer_cast(sideChannelData.data()), + _unpacked_voltage_G0.size(), _speadHeapSize, _selectedBit, _nSideChannels, + _selectedSideChannel, thrust::raw_pointer_cast(_baseLineN.data())); + + for (size_t i = 0; i < _noOfBitSetsInSideChannel.size(); i++) + { // ToDo: Should be in one kernel call + countBitSet<<<(sideChannelData.size()+255)/256, 256, 0, + _proc_stream>>>(thrust::raw_pointer_cast(sideChannelData.data() + i * sideChannelData.size() / _noOfBitSetsInSideChannel.size() ), + sideChannelData.size() / _noOfBitSetsInSideChannel.size(), _selectedBit, + _nSideChannels, _selectedBit, + thrust::raw_pointer_cast(noOfBitSet.data() + i)); + } + + BOOST_LOG_TRIVIAL(debug) << "Performing FFT 1"; + UnpackedVoltageType *_unpacked_voltage_ptr = + thrust::raw_pointer_cast(_unpacked_voltage_G0.data()); + ChannelisedVoltageType *_channelised_voltage_ptr = + thrust::raw_pointer_cast(_channelised_voltage.data()); + CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr, + (cufftComplex *)_channelised_voltage_ptr)); + _detector->detect(_channelised_voltage, detected, 2, 0); + + BOOST_LOG_TRIVIAL(debug) << "Performing FFT 2"; + _unpacked_voltage_ptr = thrust::raw_pointer_cast(_unpacked_voltage_G1.data()); + CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr, + (cufftComplex *)_channelised_voltage_ptr)); + + _detector->detect(_channelised_voltage, detected, 2, 1); + CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream)); + BOOST_LOG_TRIVIAL(debug) << "Exit processing"; +} // process + + +template <class HandlerType, typename IntegratedPowerType> +bool GatedSpectrometer<HandlerType, IntegratedPowerType>::operator()(RawBytes &block) { + ++_call_count; + BOOST_LOG_TRIVIAL(debug) << "GatedSpectrometer operator() called (count = " + << _call_count << ")"; + if (block.used_bytes() != _buffer_bytes) { /* Unexpected buffer size */ + BOOST_LOG_TRIVIAL(error) << "Unexpected Buffer Size - Got " + << block.used_bytes() << " byte, expected " + << _buffer_bytes << " byte)"; + CUDA_ERROR_CHECK(cudaDeviceSynchronize()); + cudaProfilerStop(); + return true; + } + + // Copy data to device + CUDA_ERROR_CHECK(cudaStreamSynchronize(_h2d_stream)); + _raw_voltage_db.swap(); + _sideChannelData_db.swap(); + + BOOST_LOG_TRIVIAL(debug) << " block.used_bytes() = " << block.used_bytes() + << ", dataBlockBytes = " << _dataBlockBytes << "\n"; + + CUDA_ERROR_CHECK(cudaMemcpyAsync(static_cast<void *>(_raw_voltage_db.a_ptr()), + static_cast<void *>(block.ptr()), + _dataBlockBytes, cudaMemcpyHostToDevice, + _h2d_stream)); + CUDA_ERROR_CHECK(cudaMemcpyAsync( + static_cast<void *>(_sideChannelData_db.a_ptr()), + static_cast<void *>(block.ptr() + _dataBlockBytes + _gapSize), + _sideChannelSize * _nHeaps, cudaMemcpyHostToDevice, _h2d_stream)); + + if (_call_count == 1) { + return false; + } + // process data + + // only if a newblock is started the output buffer is swapped. Otherwise the + // new data is added to it + bool newBlock = false; + if (((_call_count-1) * _nsamps_per_buffer) % _nsamps_per_output_spectra == 0) // _call_count -1 because this is the block number on the device + { + BOOST_LOG_TRIVIAL(debug) << "Starting new output block."; + newBlock = true; + _power_db.swap(); + _noOfBitSetsInSideChannel.swap(); + // move to specific stream! + thrust::fill(thrust::cuda::par.on(_proc_stream),_power_db.a().begin(), _power_db.a().end(), 0.); + thrust::fill(thrust::cuda::par.on(_proc_stream), _noOfBitSetsInSideChannel.a().begin(), _noOfBitSetsInSideChannel.a().end(), 0.); + } + + process(_raw_voltage_db.b(), _sideChannelData_db.b(), _power_db.a(), _noOfBitSetsInSideChannel.a()); + CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream)); + + if ((_call_count == 2) || (!newBlock)) { + return false; + } + + // copy data to host if block is finished + CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream)); + _host_power_db.swap(); + + for (size_t i = 0; i < _noOfBitSetsInSideChannel.size(); i++) + { + size_t memOffset = 2 * i * (_nchans * sizeof(IntegratedPowerType) + sizeof(size_t)); + // copy 2x channel data + CUDA_ERROR_CHECK( + cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset) , + static_cast<void *>(_power_db.b_ptr() + 2 * i * _nchans), + 2 * _nchans * sizeof(IntegratedPowerType), + cudaMemcpyDeviceToHost, _d2h_stream)); + // copy noOf bit set data + CUDA_ERROR_CHECK( + cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 2 * _nchans ), + static_cast<void *>(_noOfBitSetsInSideChannel.b_ptr() + i ), + 1 * sizeof(size_t), + cudaMemcpyDeviceToHost, _d2h_stream)); + } + + BOOST_LOG_TRIVIAL(debug) << "Copy Data back to host"; + + if (_call_count == 3) { + return false; + } + + // calculate off value + BOOST_LOG_TRIVIAL(info) << "Buffer block: " << _call_count << " with " << _noOfBitSetsInSideChannel.size() << " output heaps:"; + for (size_t i = 0; i < _noOfBitSetsInSideChannel.size(); i++) + { + size_t memOffset = 2 * i * (_nchans * sizeof(IntegratedPowerType) + sizeof(size_t)); + + size_t* on_values = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType)); + size_t* off_values = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType) + sizeof(size_t)); + *off_values = _nHeaps - (*on_values); + + BOOST_LOG_TRIVIAL(info) << " " << i << ": No of bit set in side channel: " << *on_values << " / " << *off_values << std::endl; + } + + // Wrap in a RawBytes object here; + RawBytes bytes(reinterpret_cast<char *>(_host_power_db.b_ptr()), + _host_power_db.size(), + _host_power_db.size()); + BOOST_LOG_TRIVIAL(debug) << "Calling handler"; + // The handler can't do anything asynchronously without a copy here + // as it would be unsafe (given that it does not own the memory it + // is being passed). + + _handler(bytes); + return false; // +} // operator () + +} // edd +} // effelsberg +} // psrdada_cpp + diff --git a/psrdada_cpp/effelsberg/edd/src/DetectorAccumulator.cu b/psrdada_cpp/effelsberg/edd/src/DetectorAccumulator.cu index e4c1b08f7671ca4d2f49c7316a2b38a5dbe5f8f5..eb6f88b9fc26e6af7ea9fcf68e08cd6afcabef6b 100644 --- a/psrdada_cpp/effelsberg/edd/src/DetectorAccumulator.cu +++ b/psrdada_cpp/effelsberg/edd/src/DetectorAccumulator.cu @@ -6,60 +6,9 @@ namespace effelsberg { namespace edd { namespace kernels { -__global__ -void detect_and_accumulate(float2 const* __restrict__ in, int8_t* __restrict__ out, - int nchans, int nsamps, int naccumulate, float scale, float offset) -{ - for (int block_idx = blockIdx.x; block_idx < nsamps/naccumulate; block_idx += gridDim.x) - { - int read_offset = block_idx * naccumulate * nchans; - int write_offset = block_idx * nchans; - for (int chan_idx = threadIdx.x; chan_idx < nchans; chan_idx += blockDim.x) - { - float sum = 0.0f; - for (int ii=0; ii < naccumulate; ++ii) - { - float2 tmp = in[read_offset + chan_idx + ii*nchans]; - float x = tmp.x * tmp.x; - float y = tmp.y * tmp.y; - sum += x + y; - } - out[write_offset + chan_idx] = (int8_t) ((sum - offset)/scale); - } - } -} } //namespace kernels -DetectorAccumulator::DetectorAccumulator( - int nchans, int tscrunch, float scale, - float offset, cudaStream_t stream) - : _nchans(nchans) - , _tscrunch(tscrunch) - , _scale(scale) - , _offset(offset) - , _stream(stream) -{ - -} - -DetectorAccumulator::~DetectorAccumulator() -{ - -} - -void DetectorAccumulator::detect(InputType const& input, OutputType& output) -{ - assert(input.size() % (_nchans * _tscrunch) == 0 /* Input is not a multiple of _nchans * _tscrunch*/); - output.resize(input.size()/_tscrunch); - int nsamps = input.size() / _nchans; - float2 const* input_ptr = thrust::raw_pointer_cast(input.data()); - int8_t* output_ptr = thrust::raw_pointer_cast(output.data()); - kernels::detect_and_accumulate<<<1024, 1024, 0, _stream>>>( - input_ptr, output_ptr, _nchans, nsamps, _tscrunch, _scale, _offset); - CUDA_ERROR_CHECK(cudaStreamSynchronize(_stream)); -} - } //namespace edd } //namespace effelsberg } //namespace psrdada_cpp diff --git a/psrdada_cpp/effelsberg/edd/src/GatedSpectrometer_cli.cu b/psrdada_cpp/effelsberg/edd/src/GatedSpectrometer_cli.cu new file mode 100644 index 0000000000000000000000000000000000000000..85873e1c277ca4801d6951be443af22374b9defb --- /dev/null +++ b/psrdada_cpp/effelsberg/edd/src/GatedSpectrometer_cli.cu @@ -0,0 +1,211 @@ +#include "boost/program_options.hpp" +#include "psrdada_cpp/cli_utils.hpp" +#include "psrdada_cpp/common.hpp" +#include "psrdada_cpp/dada_client_base.hpp" +#include "psrdada_cpp/dada_input_stream.hpp" +#include "psrdada_cpp/dada_null_sink.hpp" +#include "psrdada_cpp/dada_output_stream.hpp" +#include "psrdada_cpp/multilog.hpp" +#include "psrdada_cpp/simple_file_writer.hpp" + +#include "psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh" + +#include <ctime> +#include <iostream> +#include <time.h> + + +using namespace psrdada_cpp; + + +namespace { +const size_t ERROR_IN_COMMAND_LINE = 1; +const size_t SUCCESS = 0; +const size_t ERROR_UNHANDLED_EXCEPTION = 2; +} // namespace + + +template<typename T> +void launchSpectrometer(std::string const &output_type, key_t input_key, std::string const &filename, size_t nSideChannels, size_t selectedSideChannel, size_t selectedBit, size_t speadHeapSize, size_t fft_length, size_t naccumulate, unsigned int nbits, float input_level, float output_level) +{ + MultiLog log("edd::GatedSpectrometer"); + DadaClientBase client(input_key, log); + std::size_t buffer_bytes = client.data_buffer_size(); + + std::cout << "Running with output_type: " << output_type << std::endl; + if (output_type == "file") + { + SimpleFileWriter sink(filename); + effelsberg::edd::GatedSpectrometer<decltype(sink), T> spectrometer( + buffer_bytes, nSideChannels, selectedSideChannel, selectedBit, + speadHeapSize, fft_length, naccumulate, nbits, input_level, + output_level, sink); + DadaInputStream<decltype(spectrometer)> istream(input_key, log, + spectrometer); + istream.start(); + } + else if (output_type == "dada") + { + DadaOutputStream sink(string_to_key(filename), log); + effelsberg::edd::GatedSpectrometer<decltype(sink), T> spectrometer( + buffer_bytes, nSideChannels, selectedSideChannel, selectedBit, + speadHeapSize, fft_length, naccumulate, nbits, input_level, + output_level, sink); + DadaInputStream<decltype(spectrometer)> istream(input_key, log, + spectrometer); + istream.start(); + } + else + { + throw std::runtime_error("Unknown oputput-type"); + } +} + + + +int main(int argc, char **argv) { + try { + key_t input_key; + int fft_length; + int naccumulate; + unsigned int nbits; + size_t nSideChannels; + size_t selectedSideChannel; + size_t selectedBit; + size_t speadHeapSize; + float input_level; + float output_level; + std::time_t now = std::time(NULL); + std::tm *ptm = std::localtime(&now); + char buffer[32]; + std::strftime(buffer, 32, "%Y-%m-%d-%H:%M:%S.bp", ptm); + std::string filename(buffer); + std::string output_type = "file"; + unsigned int output_bit_depth; + + /** Define and parse the program options + */ + namespace po = boost::program_options; + po::options_description desc("Options"); + + desc.add_options()("help,h", "Print help messages"); + desc.add_options()( + "input_key,i", + po::value<std::string>()->default_value("dada")->notifier( + [&input_key](std::string in) { input_key = string_to_key(in); }), + "The shared memory key for the dada buffer to connect to (hex " + "string)"); + desc.add_options()( + "output_type", po::value<std::string>(&output_type)->default_value(output_type), + "output type [dada, file]. Default is file." + ); + desc.add_options()( + "output_bit_depth", po::value<unsigned int>(&output_bit_depth)->default_value(8), + "output_bit_depth [8, 32]. Default is 32." + ); + desc.add_options()( + "output_key,o", po::value<std::string>(&filename)->default_value(filename), + "The key of the output bnuffer / name of the output file to write spectra " + "to"); + + desc.add_options()("nsidechannelitems,s", + po::value<size_t>()->default_value(1)->notifier( + [&nSideChannels](size_t in) { nSideChannels = in; }), + "Number of side channel items ( s >= 1)"); + desc.add_options()( + "selected_sidechannel,e", + po::value<size_t>()->default_value(0)->notifier( + [&selectedSideChannel](size_t in) { selectedSideChannel = in; }), + "Side channel selected for evaluation."); + desc.add_options()("selected_bit,B", + po::value<size_t>()->default_value(63)->notifier( + [&selectedBit](size_t in) { selectedBit = in; }), + "Side channel selected for evaluation."); + desc.add_options()("speadheap_size", + po::value<size_t>()->default_value(4096)->notifier( + [&speadHeapSize](size_t in) { speadHeapSize = in; }), + "size of the spead data heaps. The number of the " + "heaps in the dada block depends on the number of " + "side channel items."); + desc.add_options()("nbits,b", po::value<unsigned int>(&nbits)->required(), + "The number of bits per sample in the " + "packetiser output (8 or 12)"); + + + + desc.add_options()("fft_length,n", po::value<int>(&fft_length)->required(), + "The length of the FFT to perform on the data"); + desc.add_options()("naccumulate,a", + po::value<int>(&naccumulate)->required(), + "The number of samples to integrate in each channel"); + desc.add_options()("input_level", + po::value<float>(&input_level)->required(), + "The input power level (standard " + "deviation, used for 8-bit conversion)"); + desc.add_options()("output_level", + po::value<float>(&output_level)->required(), + "The output power level (standard " + "deviation, used for 8-bit " + "conversion)"); + desc.add_options()( + "log_level", po::value<std::string>()->default_value("info")->notifier( + [](std::string level) { set_log_level(level); }), + "The logging level to use " + "(debug, info, warning, " + "error)"); + + po::variables_map vm; + try { + po::store(po::parse_command_line(argc, argv, desc), vm); + if (vm.count("help")) { + std::cout << "GatedSpectrometer -- Read EDD data from a DADA buffer " + "and split the data into two streams depending on a bit " + "set in the side channel data. On each stream a simple " + "FFT spectrometer is performed." + << std::endl + << desc << std::endl; + return SUCCESS; + } + + po::notify(vm); + if (vm.count("output_type") && (!(output_type == "dada" || output_type == "file") )) + { + throw po::validation_error(po::validation_error::invalid_option_value, "output_type", output_type); + } + + if (!(nSideChannels >= 1)) + { + throw po::validation_error(po::validation_error::invalid_option_value, "Number of side channels must be 1 or larger!"); + } + + } catch (po::error &e) { + std::cerr << "ERROR: " << e.what() << std::endl << std::endl; + std::cerr << desc << std::endl; + return ERROR_IN_COMMAND_LINE; + } + + if (output_bit_depth == 8) + { + launchSpectrometer<int8_t>(output_type, input_key, filename, + nSideChannels, selectedSideChannel, selectedBit, speadHeapSize, + fft_length, naccumulate, nbits, input_level, output_level); + } + else if (output_bit_depth == 32) + { + launchSpectrometer<float>(output_type, input_key, filename, + nSideChannels, selectedSideChannel, selectedBit, speadHeapSize, + fft_length, naccumulate, nbits, input_level, output_level); + } + else + { + throw po::validation_error(po::validation_error::invalid_option_value, "Output bit depth must be 8 or 32"); + } + + } catch (std::exception &e) { + std::cerr << "Unhandled Exception reached the top of main: " << e.what() + << ", application will now exit" << std::endl; + return ERROR_UNHANDLED_EXCEPTION; + } + return SUCCESS; +} + diff --git a/psrdada_cpp/effelsberg/edd/src/Unpacker.cu b/psrdada_cpp/effelsberg/edd/src/Unpacker.cu index 3aa6cdc5d81c7b983cd307b9e6f5e0ccdb3788e0..dbb4a7917be5adecff2348c51ab19ea38693dd3c 100644 --- a/psrdada_cpp/effelsberg/edd/src/Unpacker.cu +++ b/psrdada_cpp/effelsberg/edd/src/Unpacker.cu @@ -133,7 +133,6 @@ void Unpacker::unpack<12>(InputType const& input, OutputType& output) OutputType::value_type* output_ptr = thrust::raw_pointer_cast(output.data()); kernels::unpack_edd_12bit_to_float32<<< nblocks, EDD_NTHREADS_UNPACK, 0, _stream>>>( input_ptr, output_ptr, input.size()); - CUDA_ERROR_CHECK(cudaStreamSynchronize(_stream)); } template <> @@ -148,7 +147,6 @@ void Unpacker::unpack<8>(InputType const& input, OutputType& output) OutputType::value_type* output_ptr = thrust::raw_pointer_cast(output.data()); kernels::unpack_edd_8bit_to_float32<<< nblocks, EDD_NTHREADS_UNPACK, 0, _stream>>>( input_ptr, output_ptr, input.size()); - CUDA_ERROR_CHECK(cudaStreamSynchronize(_stream)); } } //namespace edd diff --git a/psrdada_cpp/effelsberg/edd/test/CMakeLists.txt b/psrdada_cpp/effelsberg/edd/test/CMakeLists.txt index 2d06dc86f930dc5167a808d100053e1aa9d93ba8..c00af677d2c5da97f530bd22e31261bd253254fb 100644 --- a/psrdada_cpp/effelsberg/edd/test/CMakeLists.txt +++ b/psrdada_cpp/effelsberg/edd/test/CMakeLists.txt @@ -10,7 +10,8 @@ set( src/UnpackerTester.cu src/ScaledTransposeTFtoTFTTester.cu src/VLBITest.cu + src/GatedSpectrometerTest.cu ) cuda_add_executable(gtest_edd ${gtest_edd_src} ) -target_link_libraries(gtest_edd ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES} ${CUDA_CUFFT_LIBRARIES}) +target_link_libraries(gtest_edd ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES} ${CUDA_CUFFT_LIBRARIES} -lcublas) add_test(gtest_edd gtest_edd --test_data "${CMAKE_CURRENT_LIST_DIR}/data") diff --git a/psrdada_cpp/effelsberg/edd/test/DetectorAccumulatorTester.cuh b/psrdada_cpp/effelsberg/edd/test/DetectorAccumulatorTester.cuh index 5f2dfea94f7a8f97f34397460c70c1bc1ebb3e2c..f472e4336b97a5b8e2f6bde4ff398a9c54856bba 100644 --- a/psrdada_cpp/effelsberg/edd/test/DetectorAccumulatorTester.cuh +++ b/psrdada_cpp/effelsberg/edd/test/DetectorAccumulatorTester.cuh @@ -35,7 +35,7 @@ protected: float offset); void compare_against_host( - DetectorAccumulator::OutputType const& gpu_output, + DetectorAccumulator<int8_t>::OutputType const& gpu_output, OutputType const& host_output); protected: diff --git a/psrdada_cpp/effelsberg/edd/test/src/DetectorAccumulatorTester.cu b/psrdada_cpp/effelsberg/edd/test/src/DetectorAccumulatorTester.cu index 97e7381971c881aa09fe881111b262086cfe5577..fdcacff20a361028cb758a81326e0bf79d33ba02 100644 --- a/psrdada_cpp/effelsberg/edd/test/src/DetectorAccumulatorTester.cu +++ b/psrdada_cpp/effelsberg/edd/test/src/DetectorAccumulatorTester.cu @@ -47,7 +47,7 @@ void DetectorAccumulatorTester::detect_c_reference( output_sample_idx < nsamples_out; ++output_sample_idx) { - float value = 0.0f; + double value = 0.0f; for (int input_sample_offset=0; input_sample_offset < tscrunch; ++input_sample_offset) @@ -62,7 +62,7 @@ void DetectorAccumulatorTester::detect_c_reference( } void DetectorAccumulatorTester::compare_against_host( - DetectorAccumulator::OutputType const& gpu_output, + DetectorAccumulator<int8_t>::OutputType const& gpu_output, OutputType const& host_output) { OutputType copy_from_gpu = gpu_output; @@ -88,12 +88,14 @@ TEST_F(DetectorAccumulatorTester, noise_test) host_input[ii].x = distribution(generator); host_input[ii].y = distribution(generator); } - DetectorAccumulator::InputType gpu_input = host_input; - DetectorAccumulator::OutputType gpu_output; + DetectorAccumulator<int8_t>::InputType gpu_input = host_input; + DetectorAccumulator<int8_t>::OutputType gpu_output; + gpu_output.resize(gpu_input.size() / tscrunch ); OutputType host_output; - DetectorAccumulator detector(nchans, tscrunch, scale, 0.0, _stream); + DetectorAccumulator<int8_t> detector(nchans, tscrunch, scale, 0.0, _stream); detector.detect(gpu_input, gpu_output); detect_c_reference(host_input, host_output, nchans, tscrunch, scale, 0.0); + CUDA_ERROR_CHECK(cudaStreamSynchronize(_stream)); compare_against_host(gpu_output, host_output); } diff --git a/psrdada_cpp/effelsberg/edd/test/src/GatedSpectrometerTest.cu b/psrdada_cpp/effelsberg/edd/test/src/GatedSpectrometerTest.cu new file mode 100644 index 0000000000000000000000000000000000000000..22babc11922bbab55b9fa06a31da15ca411ad6b4 --- /dev/null +++ b/psrdada_cpp/effelsberg/edd/test/src/GatedSpectrometerTest.cu @@ -0,0 +1,179 @@ +#include "psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh" + +#include "psrdada_cpp/dada_null_sink.hpp" +#include "psrdada_cpp/multilog.hpp" +#include "gtest/gtest.h" + +#include "thrust/device_vector.h" +#include "thrust/extrema.h" + +namespace { + +TEST(GatedSpectrometer, BitManipulationMacros) { + for (int i = 0; i < 64; i++) { + int64_t v = 0; + SET_BIT(v, i); + + for (int j = 0; j < 64; j++) { + if (j == i) + EXPECT_EQ(TEST_BIT(v, j), 1); + else + EXPECT_EQ(TEST_BIT(v, j), 0); + } + } +} + +// +//TEST(GatedSpectrometer, ParameterSanity) { +// ::testing::FLAGS_gtest_death_test_style = "threadsafe"; +// psrdada_cpp::NullSink sink; +// +// // 8 or 12 bit sampling +// EXPECT_DEATH(psrdada_cpp::effelsberg::edd::GatedSpectrometer<decltype(sink),int8_t > (0, 0, 0, 0, 4096, 0, 0, 0, 0, 0, sink), +// "_nbits == 8"); +// // naccumulate > 0 +// EXPECT_DEATH(psrdada_cpp::effelsberg::edd::GatedSpectrometer<decltype(sink),int8_t > (0, 0, 0, 0, 4096, 0, 0, 8, 0, 0, sink), +// "_naccumulate"); +// +// // selected side channel +// EXPECT_DEATH(psrdada_cpp::effelsberg::edd::GatedSpectrometer<decltype(sink),int8_t > (0, 1, 2, 0, 4096, 0, 1, 8, 0, 0, sink), +// "nSideChannels"); +// +// // selected bit +// EXPECT_DEATH(psrdada_cpp::effelsberg::edd::GatedSpectrometer<decltype(sink),int8_t > (0, 2, 1, 65, 4096, 0, 1, 8, 0, 0, sink), +// "selectedBit"); +// +// // valid construction +// psrdada_cpp::effelsberg::edd::GatedSpectrometer<decltype(sink), int8_t> a( +// 4096 * 4096, 2, 1, 63, 4096, 1024, 1, 8, 100., 100., sink); +//} +} // namespace + + +TEST(GatedSpectrometer, GatingKernel) +{ + size_t blockSize = 1024; + size_t nBlocks = 16; + + thrust::device_vector<float> G0(blockSize * nBlocks); + thrust::device_vector<float> G1(blockSize * nBlocks); + thrust::device_vector<int64_t> _sideChannelData(nBlocks); + thrust::device_vector<float> baseLine(1); + + thrust::fill(G0.begin(), G0.end(), 42); + thrust::fill(G1.begin(), G1.end(), 23); + thrust::fill(_sideChannelData.begin(), _sideChannelData.end(), 0); + + // everything to G0 + { + baseLine[0] = 0.; + const int64_t *sideCD = + (int64_t *)(thrust::raw_pointer_cast(_sideChannelData.data())); + psrdada_cpp::effelsberg::edd::gating<<<1024, 1024>>>( + thrust::raw_pointer_cast(G0.data()), + thrust::raw_pointer_cast(G1.data()), sideCD, + G0.size(), blockSize, 0, 1, + 0, thrust::raw_pointer_cast(baseLine.data())); + thrust::pair<thrust::device_vector<float>::iterator, thrust::device_vector<float>::iterator> minmax; + minmax = thrust::minmax_element(G0.begin(), G0.end()); + EXPECT_EQ(*minmax.first, 42); + EXPECT_EQ(*minmax.second, 42); + + minmax = thrust::minmax_element(G1.begin(), G1.end()); + EXPECT_EQ(*minmax.first, 0); + EXPECT_EQ(*minmax.second, 0); + } + + // everything to G1 // with baseline -5 + { + baseLine[0] = -5. * G0.size(); + thrust::fill(_sideChannelData.begin(), _sideChannelData.end(), 1L); + const int64_t *sideCD = + (int64_t *)(thrust::raw_pointer_cast(_sideChannelData.data())); + psrdada_cpp::effelsberg::edd::gating<<<1024, 1024>>>( + thrust::raw_pointer_cast(G0.data()), + thrust::raw_pointer_cast(G1.data()), sideCD, + G0.size(), blockSize, 0, 1, + 0, thrust::raw_pointer_cast(baseLine.data())); + thrust::pair<thrust::device_vector<float>::iterator, thrust::device_vector<float>::iterator> minmax; + minmax = thrust::minmax_element(G0.begin(), G0.end()); + EXPECT_EQ(*minmax.first, -5.); + EXPECT_EQ(*minmax.second, -5.); + + minmax = thrust::minmax_element(G1.begin(), G1.end()); + EXPECT_EQ(*minmax.first, 42); + EXPECT_EQ(*minmax.second, 42); + } +} + + +TEST(GatedSpectrometer, countBitSet) { + size_t nBlocks = 16; + thrust::device_vector<int64_t> _sideChannelData(nBlocks); + thrust::fill(_sideChannelData.begin(), _sideChannelData.end(), 0); + const int64_t *sideCD = + (int64_t *)(thrust::raw_pointer_cast(_sideChannelData.data())); + + thrust::device_vector<size_t> res(1); + + // test 1 side channel + res[0] = 0; + psrdada_cpp::effelsberg::edd:: + countBitSet<<<(_sideChannelData.size() + 255) / 256, 256>>>( + sideCD, nBlocks, 0, 1, 0, thrust::raw_pointer_cast(res.data())); + EXPECT_EQ(res[0], 0u); + + res[0] = 0; + thrust::fill(_sideChannelData.begin(), _sideChannelData.end(), 1L); + psrdada_cpp::effelsberg::edd::countBitSet<<<(_sideChannelData.size() + 255) / 256, 256>>>( + sideCD, nBlocks, 0, 1, 0, thrust::raw_pointer_cast(res.data())); + EXPECT_EQ(res[0], nBlocks); + + // test mult side channels w stride. + res[0] = 0; + int nSideChannels = 4; + thrust::fill(_sideChannelData.begin(), _sideChannelData.end(), 0); + for (size_t i = 2; i < _sideChannelData.size(); i += nSideChannels) + _sideChannelData[i] = 1L; + psrdada_cpp::effelsberg::edd::countBitSet<<<(_sideChannelData.size() + 255) / 256, 256>>>( + sideCD, nBlocks, 0, nSideChannels, 3, + thrust::raw_pointer_cast(res.data())); + EXPECT_EQ(res[0], 0u); + + res[0] = 0; + psrdada_cpp::effelsberg::edd::countBitSet<<<(_sideChannelData.size() + 255) / 256, 256>>>( + sideCD, nBlocks, 0, nSideChannels, 2, + thrust::raw_pointer_cast(res.data())); + EXPECT_EQ(res[0], nBlocks / nSideChannels); +} + + +TEST(GatedSpectrometer, array_sum) { + + const size_t NBLOCKS = 16 * 32; + const size_t NTHREADS = 1024; + + size_t inputLength = 1 << 22 + 1 ; + size_t dataLength = inputLength; + ////zero pad input array + //if (inputLength % (2 * NTHREADS) != 0) + // dataLength = (inputLength / (2 * NTHREADS) + 1) * 2 * NTHREADS; + thrust::device_vector<float> data(dataLength); + thrust::fill(data.begin(), data.begin() + inputLength, 1); + //thrust::fill(data.begin() + inputLength, data.end(), 0); + thrust::device_vector<float> blr(NTHREADS * 2); + + thrust::fill(blr.begin(), blr.end(), 0); + + psrdada_cpp::effelsberg::edd::array_sum<<<NBLOCKS, NTHREADS, NTHREADS* sizeof(float)>>>(thrust::raw_pointer_cast(data.data()), data.size(), thrust::raw_pointer_cast(blr.data())); + psrdada_cpp::effelsberg::edd::array_sum<<<1, NTHREADS, NTHREADS* sizeof(float)>>>(thrust::raw_pointer_cast(blr.data()), blr.size(), thrust::raw_pointer_cast(blr.data())); + + EXPECT_EQ(size_t(blr[0]), inputLength); +} + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + + return RUN_ALL_TESTS(); +} + diff --git a/psrdada_cpp/effelsberg/edd/test/src/UnpackerTester.cu b/psrdada_cpp/effelsberg/edd/test/src/UnpackerTester.cu index 92d371ba88e8c4c62cba3ad069e8d6ef155dcfc9..093b625bff4db00125a4114047727f76fe32df73 100644 --- a/psrdada_cpp/effelsberg/edd/test/src/UnpackerTester.cu +++ b/psrdada_cpp/effelsberg/edd/test/src/UnpackerTester.cu @@ -218,7 +218,7 @@ TEST_F(UnpackerTester, 12_bit_unpack_test) std::default_random_engine generator; std::uniform_int_distribution<int> distribution(1,1<<31); InputType host_input(n); - for (int ii = 0; ii < n; ++ii) + for (size_t ii = 0; ii < n; ++ii) { host_input[ii] = distribution(generator); } @@ -237,7 +237,7 @@ TEST_F(UnpackerTester, 8_bit_unpack_test) std::default_random_engine generator; std::uniform_int_distribution<int> distribution(1,1<<31); InputType host_input(n); - for (int ii = 0; ii < n; ++ii) + for (size_t ii = 0; ii < n; ++ii) { host_input[ii] = distribution(generator); } diff --git a/psrdada_cpp/meerkat/fbfuse/test/src/CoherentBeamformerTester.cu b/psrdada_cpp/meerkat/fbfuse/test/src/CoherentBeamformerTester.cu index c0224c472967ea1a7223321824dd64ccedab457c..d443dd17b902f507a0af72c349f490b58050c399 100644 --- a/psrdada_cpp/meerkat/fbfuse/test/src/CoherentBeamformerTester.cu +++ b/psrdada_cpp/meerkat/fbfuse/test/src/CoherentBeamformerTester.cu @@ -139,7 +139,7 @@ void CoherentBeamformerTester::compare_against_host( _config.npol(), _config.cb_power_scaling(), _config.cb_power_offset()); - for (int ii = 0; ii < btf_powers_host.size(); ++ii) + for (size_t ii = 0; ii < btf_powers_host.size(); ++ii) { ASSERT_TRUE(std::abs(static_cast<int>(btf_powers_host[ii]) - btf_powers_cuda[ii]) <= 1); } @@ -164,13 +164,13 @@ TEST_F(CoherentBeamformerTester, representative_noise_test) int nsamples = _config.nsamples_per_heap() * ntimestamps; std::size_t weights_size = _config.cb_nantennas() * _config.nchans() * _config.cb_nbeams(); HostVoltageVectorType ftpa_voltages_host(input_size); - for (int ii = 0; ii < ftpa_voltages_host.size(); ++ii) + for (size_t ii = 0; ii < ftpa_voltages_host.size(); ++ii) { ftpa_voltages_host[ii].x = static_cast<int8_t>(std::lround(normal_dist(generator))); ftpa_voltages_host[ii].y = static_cast<int8_t>(std::lround(normal_dist(generator))); } HostWeightsVectorType fbpa_weights_host(weights_size); - for (int ii = 0; ii < fbpa_weights_host.size(); ++ii) + for (size_t ii = 0; ii < fbpa_weights_host.size(); ++ii) { // Build complex weight as C * exp(i * theta). std::complex<double> val = 127.0f * std::exp(std::complex<float>(0.0f, uniform_dist(generator))); diff --git a/psrdada_cpp/meerkat/fbfuse/test/src/IncoherentBeamformerTester.cu b/psrdada_cpp/meerkat/fbfuse/test/src/IncoherentBeamformerTester.cu index 65bf753c78e32487ea48df9d4aee87bf0719c127..00c816e0d76f1c5ed31de2ec83e98daa7ecb65eb 100644 --- a/psrdada_cpp/meerkat/fbfuse/test/src/IncoherentBeamformerTester.cu +++ b/psrdada_cpp/meerkat/fbfuse/test/src/IncoherentBeamformerTester.cu @@ -115,10 +115,10 @@ void IncoherentBeamformerTester::compare_against_host( _config.nsamples_per_heap(), _config.ib_power_scaling(), _config.ib_power_offset()); - for (int ii = 0; ii < tf_powers_host.size(); ++ii) + for (size_t ii = 0; ii < tf_powers_host.size(); ++ii) { ASSERT_TRUE(std::abs(static_cast<int>(tf_powers_host[ii]) - tf_powers_cuda[ii]) <= 1); - } + } } TEST_F(IncoherentBeamformerTester, ib_representative_noise_test) @@ -133,7 +133,7 @@ TEST_F(IncoherentBeamformerTester, ib_representative_noise_test) std::size_t input_size = (ntimestamps * _config.ib_nantennas() * _config.nchans() * _config.nsamples_per_heap() * _config.npol()); HostVoltageVectorType taftp_voltages_host(input_size); - for (int ii = 0; ii < taftp_voltages_host.size(); ++ii) + for (size_t ii = 0; ii < taftp_voltages_host.size(); ++ii) { taftp_voltages_host[ii].x = static_cast<int8_t>(std::lround(normal_dist(generator))); taftp_voltages_host[ii].y = static_cast<int8_t>(std::lround(normal_dist(generator))); diff --git a/psrdada_cpp/meerkat/fbfuse/test/src/SplitTransposeTester.cu b/psrdada_cpp/meerkat/fbfuse/test/src/SplitTransposeTester.cu index 1c353cfdbb3e5b63407c79e4f5f8b58ed7dd2138..046e901e858cc027d317f941859230c865377d5d 100644 --- a/psrdada_cpp/meerkat/fbfuse/test/src/SplitTransposeTester.cu +++ b/psrdada_cpp/meerkat/fbfuse/test/src/SplitTransposeTester.cu @@ -57,9 +57,9 @@ void SplitTransposeTester::transpose_c_reference( int input_antenna_idx = antenna_idx + start_antenna; for (int chan_idx = 0; chan_idx < nchans; ++chan_idx) { - for (int samp_idx = 0; samp_idx < _config.nsamples_per_heap(); ++samp_idx) + for (size_t samp_idx = 0; samp_idx < _config.nsamples_per_heap(); ++samp_idx) { - for (int pol_idx = 0; pol_idx < _config.npol(); ++pol_idx) + for (size_t pol_idx = 0; pol_idx < _config.npol(); ++pol_idx) { int input_idx = (timestamp_idx * aftp + input_antenna_idx * ftp + chan_idx * tp + samp_idx * _config.npol() + pol_idx); @@ -86,7 +86,7 @@ void SplitTransposeTester::compare_against_host( _config.total_nantennas(), _config.cb_nantennas(), _config.cb_antenna_offset(), _config.nchans(), ntimestamps); - for (int ii = 0; ii < host_output.size(); ++ii) + for (size_t ii = 0; ii < host_output.size(); ++ii) { ASSERT_EQ(host_output[ii].x, cuda_output[ii].x); ASSERT_EQ(host_output[ii].y, cuda_output[ii].y); @@ -101,7 +101,7 @@ TEST_F(SplitTransposeTester, cycling_prime_test) * _config.nchans() * _config.nsamples_per_heap() * _config.npol()); HostVoltageType host_gpu_input(input_size); - for (int ii = 0; ii < input_size; ++ii) + for (size_t ii = 0; ii < input_size; ++ii) { host_gpu_input[ii].x = (ii % 113); host_gpu_input[ii].y = (ii % 107); diff --git a/psrdada_cpp/meerkat/fbfuse/test/src/WeightsManagerTester.cu b/psrdada_cpp/meerkat/fbfuse/test/src/WeightsManagerTester.cu index 14264f6cca9a6917d441187de3072cd98754feea..014b23431e2471a31bb553031510c346813c4394 100644 --- a/psrdada_cpp/meerkat/fbfuse/test/src/WeightsManagerTester.cu +++ b/psrdada_cpp/meerkat/fbfuse/test/src/WeightsManagerTester.cu @@ -85,7 +85,7 @@ void WeightsManagerTester::compare_against_host( _config.channel_frequencies(), _config.cb_nantennas(), _config.cb_nbeams(), _config.channel_frequencies().size(), epoch, 0.0, 1); - for (int ii = 0; ii < cuda_weights.size(); ++ii) + for (size_t ii = 0; ii < cuda_weights.size(); ++ii) { ASSERT_EQ(c_weights[ii].x, cuda_weights[ii].x); ASSERT_EQ(c_weights[ii].y, cuda_weights[ii].y); diff --git a/psrdada_cpp/src/dada_read_client.cpp b/psrdada_cpp/src/dada_read_client.cpp index e25ebca4d7df997b1f26de2c13b426e06f533ede..3fb504f25a99f654f081a6c07ae1c73c769dd545 100644 --- a/psrdada_cpp/src/dada_read_client.cpp +++ b/psrdada_cpp/src/dada_read_client.cpp @@ -10,7 +10,7 @@ namespace psrdada_cpp { { BOOST_LOG_TRIVIAL(debug) << this->id() << "Pinning dada buffers for CUDA memcpy"; - dada_cuda_dbregister(_hdu); + cuda_register_memory(); lock(); }