Unverified Commit 94761ee4 authored by Tobias Winchen's avatar Tobias Winchen Committed by GitHub
Browse files

Merge pull request #22 from MPIfR-BDG/sk_tests

Spectral Kurtosis Feature
parents bf1daac9 0aef88f5
......@@ -16,6 +16,8 @@ set(psrdada_cpp_effelsberg_edd_src
src/EDDRoach.cpp
src/EDDRoach_merge.cpp
src/ScaledTransposeTFtoTFT.cu
src/SKRfiReplacementCuda.cu
src/SpectralKurtosisCuda.cu
src/Tools.cu
src/Unpacker.cu
src/VLBI.cu
......
#ifndef PSRDADA_CPP_EFFELSBERG_EDD_SKRFIREPLACEMENTCUDA_CUH
#define PSRDADA_CPP_EFFELSBERG_EDD_SKRFIREPLACEMENTCUDA_CUH
#include "psrdada_cpp/common.hpp"
#include <thrust/device_vector.h>
#include <thrust/complex.h>
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
class SKRfiReplacementCuda{
public:
/**
* @brief constructor
*/
SKRfiReplacementCuda();
/**
* @brief destructor
*/
~SKRfiReplacementCuda();
/**
* @brief Replaces RFI data with data generated using statistics of data from chosen number of clean_windows.
*
* @param(in) rfi_status rfi_status of input data stream
* @param(in & out) data Data on which RFI has to be replaced. Returns the same but with RFI replaced.
* @param(in) clean_windows number of clean windows used for computing data statistics.
*/
void replace_rfi_data(const thrust::device_vector<int> &rfi_status,
thrust::device_vector<thrust::complex<float>> &data,
std::size_t clean_windows = 5, cudaStream_t stream=0);
private:
thrust::device_vector<int> _rfi_window_indices;
thrust::device_vector<int> _clean_window_indices;
thrust::device_vector<thrust::complex <float>> _clean_data;
};
} //edd
} //effelsberg
} //psrdada_cpp
#endif //PSRDADA_CPP_EFFELSBERG_EDD_SKRFIREPLACEMENTCUDA_CUH
#ifndef PSRDADA_CPP_EFFELSBERG_EDD_SPECTRALKURTOSISCUDA_CUH
#define PSRDADA_CPP_EFFELSBERG_EDD_SPECTRALKURTOSISCUDA_CUH
#include "psrdada_cpp/common.hpp"
#include <thrust/device_vector.h>
#include <thrust/complex.h>
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
#define BLOCK_DIM 1024
struct RFIStatistics{
thrust::device_vector<int> rfi_status;
float rfi_fraction;
};
class SpectralKurtosisCuda{
public:
/**
* @brief constructor
*
* @param(in) nchannels number of channels.
* @param(in) window_size number of samples per window.
* @param(in) sk_min minimum value of spectral kurtosis.
* @param(in) sk_max maximum value of spectral kurtosis.
*/
SpectralKurtosisCuda(std::size_t nchannels, std::size_t window_size, float sk_min = 0.8,
float sk_max = 1.2);
~SpectralKurtosisCuda();
/**
* @brief computes spectral kurtosis for the given data using thrust calls and returns its rfi statistics.
* This is used to compare and verify the results generated by compute_sk - an optimized kernel implementation.
*
* @param(in) data input data
* @param(out) stats RFI statistics
*
*/
void compute_sk_thrust(thrust::device_vector<thrust::complex<float>> const& data, RFIStatistics &stats, cudaStream_t _stream = 0);
/**
* @brief computes spectral kurtosis (using optimized kernel function) for the given data and returns its rfi statistics.
*
* @param(in) data input data
* @param(out) stats RFI statistics
*
*/
void compute_sk(thrust::device_vector<thrust::complex<float>> &data, RFIStatistics &stats, cudaStream_t _stream = 0);
private:
/**
* @brief initializes the data members of the class.
*
* */
void init();
std::size_t _nchannels; //number of channels
std::size_t _window_size; //window size
std::size_t _nwindows; //number of windows
std::size_t _sample_size; //size of input data
float _sk_min, _sk_max;
thrust::device_vector<float> _d_s1, _d_s2;
};
} //edd
} //effelsberg
} //psrdada_cpp
#endif //PSRDADA_CPP_EFFELSBERG_EDD_SPECTRALKURTOSISCUDA_CUH
#include "psrdada_cpp/effelsberg/edd/SKRfiReplacementCuda.cuh"
#include <thrust/reduce.h>
#include <thrust/transform.h>
#include <thrust/copy.h>
#include <thrust/random/normal_distribution.h>
#include <thrust/random/linear_congruential_engine.h>
#include <nvToolsExt.h>
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
struct mean_subtraction_square{
const float mean;
mean_subtraction_square(float _mean) :mean(_mean) {}
__host__ __device__
float operator() (thrust::complex<float> val) const{
float x = val.real() - mean;
return (x * x);
}
};
struct generate_replacement_data{
float normal_dist_mean, normal_dist_std;
generate_replacement_data(float mean, float std)
: normal_dist_mean(mean),
normal_dist_std(std)
{};
__host__ __device__
thrust::complex<float> operator() (unsigned int n) const{
thrust::minstd_rand gen;
thrust::random::normal_distribution<float> dist(normal_dist_mean, normal_dist_std);
gen.discard(n);
return thrust::complex<float> (dist(gen), dist(gen));
}
};
SKRfiReplacementCuda::SKRfiReplacementCuda() {
BOOST_LOG_TRIVIAL(debug) << "Creating new SKRfiReplacementCuda instance..\n";
}
SKRfiReplacementCuda::~SKRfiReplacementCuda() {
BOOST_LOG_TRIVIAL(debug) << "Destroying SKRfiReplacementCuda instance..\n";
}
void SKRfiReplacementCuda::replace_rfi_data(const thrust::device_vector<int> &rfi_status,
thrust::device_vector<thrust::complex<float>> &data,
std::size_t clean_windows, cudaStream_t stream) {
nvtxRangePushA("replace_rfi_data");
thrust::cuda::par.on(stream);
thrust::device_vector<thrust::complex<float>> replacement_data;
//initialize data members of the class
BOOST_LOG_TRIVIAL(debug) << "getting RFI window indices..\n";
_rfi_window_indices.resize(thrust::count(rfi_status.begin(), rfi_status.end(), 1));
thrust::copy_if(thrust::make_counting_iterator<int>(0),
thrust::make_counting_iterator<int>(rfi_status.size()),
rfi_status.begin(),
_rfi_window_indices.begin(),
thrust::placeholders::_1 == 1);
BOOST_LOG_TRIVIAL(debug) << "getting clean window indices..\n";
size_t _nclean_windows = thrust::count(rfi_status.begin(), rfi_status.end(), 0);
_clean_window_indices.resize(_nclean_windows);
thrust::copy_if(thrust::make_counting_iterator<int>(0),
thrust::make_counting_iterator<int>(rfi_status.size()),
rfi_status.begin(),
_clean_window_indices.begin(),
thrust::placeholders::_1 == 0);
if(_nclean_windows < rfi_status.size()){
//RFI present and not in all windows
if (_nclean_windows < clean_windows) {
clean_windows = _nclean_windows;
}
BOOST_LOG_TRIVIAL(debug) << "collecting clean data from chosen number of clean windows..\n";
std::size_t _window_size = data.size() / rfi_status.size();
_clean_data.resize(clean_windows * _window_size);
for(std::size_t ii = 0; ii < clean_windows; ii++){
std::size_t window_index = _clean_window_indices[ii];
std::size_t ibegin = window_index * _window_size;
std::size_t iend = ibegin + _window_size - 1;
std::size_t jj = ii * _window_size;
thrust::copy((data.begin() + ibegin), (data.begin() + iend), (_clean_data.begin() + jj));
BOOST_LOG_TRIVIAL(debug) <<"clean_win_index = " << window_index
<< " ibegin = " << ibegin << " iend = " << iend;
}
BOOST_LOG_TRIVIAL(debug) << "computing statistics of clean data..\n";
//The distribution of both real and imag are expected to ahve same mean and standard deviation.
//Therefore computing _ref_mean, _ref_sd for real distribution only.
std::size_t length = _clean_data.size();
float _ref_mean = (thrust::reduce(_clean_data.begin(), _clean_data.end(), thrust::complex<float> (0.0f, 0.0f))). real() / length;
float _ref_sd = std::sqrt(thrust::transform_reduce(_clean_data.begin(), _clean_data.end(), mean_subtraction_square(_ref_mean),
0.0f, thrust::plus<float> ()) / length);
BOOST_LOG_TRIVIAL(debug) << "DataStatistics mean = " << _ref_mean
<< " sd = " << _ref_sd;
//Replacing RFI
thrust::counting_iterator<unsigned int> sequence_index_begin(0);
nvtxRangePushA("replace_rfi_datai_loop");
for(std::size_t ii = 0; ii < _rfi_window_indices.size(); ii++){
std::size_t index = _rfi_window_indices[ii] * _window_size;
thrust::transform(sequence_index_begin, (sequence_index_begin + _window_size),
(data.begin() + index), generate_replacement_data(_ref_mean, _ref_sd));
}
nvtxRangePop();
}
nvtxRangePop();
}
} //edd
} //effelsberg
} //psrdada_cpp
#include "psrdada_cpp/effelsberg/edd/SpectralKurtosisCuda.cuh"
#include <thrust/transform.h>
#include <thrust/iterator/discard_iterator.h>
#include <nvToolsExt.h>
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
struct compute_power{
__host__ __device__
float operator()(thrust::complex<float> z)
{
return (thrust::abs(z) * thrust::abs(z));
}
};
struct power_square{
__host__ __device__
float operator()(thrust::complex<float> z)
{
float abs = thrust::abs(z);
float power = abs * abs;
return (power * power);
}
};
struct check_rfi{
const std::size_t M; //_window_size
const float sk_min;
const float sk_max;
check_rfi(std::size_t m, float min, float max)
: M(m),
sk_min(min),
sk_max(max)
{}
__host__ __device__
float operator() (float s1, float s2) const {
float sk = ((M + 1) / (M - 1)) * (((M * s2) / (s1 * s1)) - 1);
return ((sk < sk_min) || (sk > sk_max)) ;
}
};
__device__ int rfi_count = 0;
__device__ void warpReduce(volatile float *shmem_ptr1, volatile float *shmem_ptr2, int tid){
shmem_ptr1[tid] += shmem_ptr1[tid + 32];
shmem_ptr2[tid] += shmem_ptr2[tid + 32];
shmem_ptr1[tid] += shmem_ptr1[tid + 16];
shmem_ptr2[tid] += shmem_ptr2[tid + 16];
shmem_ptr1[tid] += shmem_ptr1[tid + 8];
shmem_ptr2[tid] += shmem_ptr2[tid + 8];
shmem_ptr1[tid] += shmem_ptr1[tid + 4];
shmem_ptr2[tid] += shmem_ptr2[tid + 4];
shmem_ptr1[tid] += shmem_ptr1[tid + 2];
shmem_ptr2[tid] += shmem_ptr2[tid + 2];
shmem_ptr1[tid] += shmem_ptr1[tid + 1];
shmem_ptr2[tid] += shmem_ptr2[tid + 1];
}
__global__ void compute_sk_kernel(const thrust::complex<float>* __restrict__ data, std::size_t sample_size, std::size_t window_size,
float sk_max, float sk_min, int* __restrict__ rfi_status)
{
volatile __shared__ float s1[BLOCK_DIM];
volatile __shared__ float s2[BLOCK_DIM];
int l_index = threadIdx.x;
float pow = 0.0f;
float pow_sq = 0.0f;
s1[l_index] = 0.0f;
s2[l_index] = 0.0f;
for(int thread_offset = l_index; thread_offset < window_size; thread_offset += blockDim.x){
int g_index = thread_offset + blockIdx.x * window_size;
pow = thrust::abs(data[g_index]) * thrust::abs(data[g_index]);
pow_sq = pow * pow;
s1[l_index] += pow;
s2[l_index] += pow_sq;
}
__syncthreads();
for(int s = blockDim.x / 2; s > 32; s >>= 1){
if(l_index < s){
s1[l_index] += s1[l_index + s];
s2[l_index] += s2[l_index + s];
}
__syncthreads();
}
if(l_index < 32) warpReduce(s1, s2, l_index);
if(l_index == 0){
float sk = ((window_size + 1) / (window_size - 1)) *(((window_size * s2[0]) / (s1[0] * s1[0])) - 1);
rfi_status[blockIdx.x] = (int) ((sk < sk_min) || (sk > sk_max));
if (rfi_status[blockIdx.x] == 1) atomicAdd(&rfi_count, 1);
}
}
SpectralKurtosisCuda::SpectralKurtosisCuda(std::size_t nchannels, std::size_t window_size, float sk_min, float sk_max)
: _nchannels(nchannels),
_window_size(window_size),
_sk_min(sk_min),
_sk_max(sk_max)
{
BOOST_LOG_TRIVIAL(debug) << "Creating new SpectralKurtosisCuda instance... \n";
}
SpectralKurtosisCuda::~SpectralKurtosisCuda()
{
BOOST_LOG_TRIVIAL(debug) << "Destroying SpectralKurtosisCuda instance... \n";
}
void SpectralKurtosisCuda::init()
{
if((_sample_size % _window_size) != 0){
BOOST_LOG_TRIVIAL(error) << "Sample(data) size " << _sample_size <<" is not a multiple of _window_size "
<< _window_size <<". Give different window size.\n";
throw std::runtime_error("Data(sample) size is not a multiple of window_size. Give different window size. \n");
}
_nwindows = _sample_size /_window_size;
}
void SpectralKurtosisCuda::compute_sk_thrust(const thrust::device_vector<thrust::complex<float>> &data, RFIStatistics &stats, cudaStream_t _stream){
nvtxRangePushA("compute_sk");
thrust::cuda::par.on(_stream);
_sample_size = data.size();
BOOST_LOG_TRIVIAL(debug) << "Computing SK (thrust version) for sample_size " << _sample_size
<< " and window_size " << _window_size <<".\n";
//initializing class variables
init();
_d_s1.resize(_nwindows);
_d_s2.resize(_nwindows);
//computing _d_s1 for all windows
nvtxRangePushA("compute_sk_reduce_by_key_call");
thrust::reduce_by_key(thrust::device,
thrust::make_transform_iterator(thrust::counting_iterator<int> (0), (thrust::placeholders::_1 / _window_size)),
thrust::make_transform_iterator(thrust::counting_iterator<int> (_sample_size - 1), (thrust::placeholders::_1 / _window_size)),
thrust::make_transform_iterator(data.begin(), compute_power()),
thrust::discard_iterator<int>(),
_d_s1.begin());
//computing _d_s2 for all windows
thrust::reduce_by_key(thrust::device,
thrust::make_transform_iterator(thrust::counting_iterator<int> (0), (thrust::placeholders::_1 / _window_size)),
thrust::make_transform_iterator(thrust::counting_iterator<int> (_sample_size - 1), (thrust::placeholders::_1 / _window_size)),
thrust::make_transform_iterator(data.begin(), power_square()),
thrust::discard_iterator<int>(),
_d_s2.begin());
nvtxRangePop();
//computes SK and checks the threshold to detect RFI.
stats.rfi_status.resize(_nwindows);
nvtxRangePushA("compute_sk_thrust_transform_reduce");
thrust::transform(_d_s1.begin(), _d_s1.end(), _d_s2.begin(), stats.rfi_status.begin(), check_rfi(_window_size, _sk_min, _sk_max));
stats.rfi_fraction = thrust::reduce(stats.rfi_status.begin(), stats.rfi_status.end(), 0.0f) / _nwindows;
nvtxRangePop();
BOOST_LOG_TRIVIAL(info) << "RFI fraction: " << stats.rfi_fraction;
nvtxRangePop();
}
void SpectralKurtosisCuda::compute_sk(thrust::device_vector<thrust::complex<float>> &data, RFIStatistics &stats, cudaStream_t _stream){
nvtxRangePushA("compute_sk_kernel");
_sample_size = data.size();
BOOST_LOG_TRIVIAL(debug) << "Computing SK (kernel version) for sample_size " << _sample_size
<< " and window_size " << _window_size <<".\n";
//initializing variables
init();
stats.rfi_status.resize(_nwindows);
int *k_rfi_status = thrust::raw_pointer_cast(stats.rfi_status.data());
thrust::complex<float> *k_data = thrust::raw_pointer_cast(data.data());
nvtxRangePushA("compute_sk_kernel_call");
compute_sk_kernel<<<_nwindows, BLOCK_DIM, 0, _stream>>> (k_data, _sample_size, _window_size, _sk_max, _sk_min, k_rfi_status);
cudaDeviceSynchronize();
nvtxRangePop();
nvtxRangePushA("compute_sk_kernel_rfi_fraction");
int nrfiwindows = 0;
cudaMemcpyFromSymbol(&nrfiwindows, rfi_count, sizeof(int));
stats.rfi_fraction = (float)nrfiwindows / _nwindows;
int reset_rfi_count = 0;
cudaMemcpyToSymbol(rfi_count, &reset_rfi_count, sizeof(int));
nvtxRangePop();
BOOST_LOG_TRIVIAL(info) << "RFI fraction: " << stats.rfi_fraction;
nvtxRangePop();
}
} //edd
} //effelsberg
} //psrdada_cpp
......@@ -12,8 +12,13 @@ set(gtest_edd_src
src/ScaledTransposeTFtoTFTTester.cu
src/VLBITest.cu
src/EDDPolnMergeTester.cpp
src/SKTestVector.cpp
src/SpectralKurtosis.cpp
src/SKRfiReplacement.cpp
src/SpectralKurtosisTester.cpp
src/SpectralKurtosisCudaTester.cu
)
cuda_add_executable(gtest_edd ${gtest_edd_src} )
target_link_libraries(gtest_edd ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES} ${CUDA_CUFFT_LIBRARIES} -lcublas)
target_link_libraries(gtest_edd ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES} ${CUDA_CUFFT_LIBRARIES} -lcublas -lnvToolsExt -L/usr/local/cuda-11.0/lib64/)
add_test(gtest_edd gtest_edd --test_data "${CMAKE_CURRENT_LIST_DIR}/data")
#ifndef PSRDADA_CPP_EFFELSBERG_EDD_SKRFIREPLACEMENT_HPP
#define PSRDADA_CPP_EFFELSBERG_EDD_SKRFIREPLACEMENT_HPP
#include "psrdada_cpp/common.hpp"
#include <vector>
#include <complex>
#include <algorithm>
#include <numeric>
#include <random>
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
#define DEFAULT_NUM_CLEAN_WINDOWS 1 //number of clean windows used for computing DataStatistics
struct DataStatistics
{
float r_mean, r_sd, i_mean, i_sd;
};
class SKRfiReplacement{
public:
/**
* @brief constructor
*
* @param(in) rfi_status rfi_status of input data
*
*/
SKRfiReplacement(const std::vector<int> &rfi_status);
/**
* @brief destructor
*/
~SKRfiReplacement();
/**
* @brief Replaces data in rfi_windows with replacement data (generated using statistics of data from clean_windows).
*
* @param(in & out) data Data on which RFI has to be replaced. Returns the same but with RFI replaced.
*/
void replace_rfi_data(std::vector<std::complex<float>> &data);
private:
/**
* @brief Initializes data members of the class
*/
void init();
/**
* @brief Gets indices of clean windows, _clean_window_indices
*/
void get_clean_window_indices();
/**
* @brief Gets indices of RFI windows, _rfi_window_indices
*/
void get_rfi_window_indices();
/**
* @brief Computes statistics for the given input data. Here it is the data from few clean windows.
*
* @param(in) data data from default number of clean windows.
* @param(out) stats DataStatistics of input data
*/
void compute_data_statistics(const std::vector<std::complex<float>> &data, DataStatistics &stats);
/**
* @brief Gathers data from DEFAULT_NUM_CLEAN_WINDOW number of clean windows and computes its statistics
*
* @param(in) data actual data
* @param(out) ref_data_statistics Statistics of data from clean windows
*/
void get_clean_data_statistics(const std::vector<std::complex<float>> &data,
DataStatistics &ref_data_statistics);
/**
* @brief Generates replacement data using clean window data statistics
*
* @param(in) stats data statistics
* @param(out) replacement_data replacement data of size = _window_size generated using input stats.
*/
void generate_replacement_data(const DataStatistics &stats, std::vector<std::complex<float>> &replacement_data);
std::vector<int> _rfi_status;
std::size_t _window_size;
std::size_t _nwindows, _nrfi_windows, _nclean_windows;
std::vector<int> _rfi_window_indices;
std::vector<int> _clean_window_indices;
};
} //edd
} //effelsberg
} //psrdada_cpp
#endif //PSRDADA_CPP_EFFELSBERG_EDD_SKRFIREPLACEMENT_HPP
#ifndef PSRDADA_CPP_EFFELSBERG_EDD_SKTESTVECTOR_HPP
#define PSRDADA_CPP_EFFELSBERG_EDD_SKTESTVECTOR_HPP
#include "psrdada_cpp/common.hpp"
#include <complex>
#include <vector>
#include <functional>
#include <numeric>
#include <random>
#include <algorithm>
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
namespace test {
#define DEFAULT_MEAN 0 //default mean for normal ditribution test vector
#define DEFAULT_STD 0.5 //default standard deviation for normal ditribution test vector
class SKTestVector{
public:
/**
* @param sample_size size of test vector
* window_size number of samples in a window
* with_rfi Flag to include RFI in test vector