diff --git a/psrdada_cpp/effelsberg/edd/CMakeLists.txt b/psrdada_cpp/effelsberg/edd/CMakeLists.txt index d8a5b44dba827d0bf8eb6d888ed440ef3d472f92..29e89a136259620d40a2104cacdd3d66bbaf66ee 100644 --- a/psrdada_cpp/effelsberg/edd/CMakeLists.txt +++ b/psrdada_cpp/effelsberg/edd/CMakeLists.txt @@ -10,6 +10,7 @@ set(psrdada_cpp_effelsberg_edd_src src/DetectorAccumulator.cu src/ScaledTransposeTFtoTFT.cu src/VLBI.cu + src/Tools.cu src/DadaBufferLayout.cpp ) diff --git a/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh b/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh index a00bc7d30c9a73ccaa82a84740ed5edc1b42f26d..098fc51e3c66426d34a745ec7d221d6efbb4fe83 100644 --- a/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh +++ b/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh @@ -159,18 +159,6 @@ __global__ void gating(float *G0, float *G1, const int64_t *sideChannelData, size_t noOfSideChannels, size_t selectedSideChannel, const float *_baseLine); -/** - * @brief Sums all elements of an input array. - * - * @detail The results is stored in an array with one value per launch - * block. Full reuction thus requires two kernel launches. - * - * @param in. Input array. - * @param N. Size of input array. - * @param out. Output array. - * */ -__global__ void array_sum(float *in, size_t N, float *out); - } // edd diff --git a/psrdada_cpp/effelsberg/edd/Packer.cuh b/psrdada_cpp/effelsberg/edd/Packer.cuh index 97a3e213ad10c870e0c2f732afb3f996e6843f0d..f5607de63662f7318311b14601c8febafeaeaad6 100644 --- a/psrdada_cpp/effelsberg/edd/Packer.cuh +++ b/psrdada_cpp/effelsberg/edd/Packer.cuh @@ -11,7 +11,7 @@ namespace edd { namespace kernels { -// pack float to 2,4,8,16 bit integers +// pack float to 2,4,8,16 bit integers with linear scaling template <unsigned int input_bit_depth> __global__ void packNbit(const float *__restrict__ input, uint32_t *__restrict__ output, size_t inputSize, diff --git a/psrdada_cpp/effelsberg/edd/Tools.cuh b/psrdada_cpp/effelsberg/edd/Tools.cuh new file mode 100644 index 0000000000000000000000000000000000000000..1a434ea09cbe461f7d54384d5ce81d7ca29c1798 --- /dev/null +++ b/psrdada_cpp/effelsberg/edd/Tools.cuh @@ -0,0 +1,33 @@ +#ifndef PSRDADA_CPP_EFFELSBERG_EDD_TOOLS_CUH +#define PSRDADA_CPP_EFFELSBERG_EDD_TOOLS_CUH + +// collection of multi purpose kernels + +// blocksize for the array sum kernel +#define array_sum_Nthreads 1024 + +namespace psrdada_cpp { +namespace effelsberg { +namespace edd { +/** + * @brief Sums all elements of an input array. + * + * @detail The results is stored in an array with one value per launch + * block. Full reduction thus requires two kernel launches. + * + * @param in. Input array. + * @param N. Size of input array. + * @param out. Output array. + * */ +__global__ void array_sum(float *in, size_t N, float *out); + + +/// Calculates 1/N \sum (x_i - 1/N offset)**2 per block +/// To calculate the std dev sum partial results of block using array sum +__global__ void scaled_square_offset_sum(float *in, size_t N, float* offset, float *out); + +} // edd +} // effelsberg +} // psrdada_cpp + +#endif //PSRDADA_CPP_EFFELSBERG_EDD_TOOLS_CUH diff --git a/psrdada_cpp/effelsberg/edd/VLBI.cuh b/psrdada_cpp/effelsberg/edd/VLBI.cuh index 349882220a6d8cf8ed1ef841c078ac281fcca15f..dc451b97c5c37633d1f7e5595dc5b11040e9d91e 100644 --- a/psrdada_cpp/effelsberg/edd/VLBI.cuh +++ b/psrdada_cpp/effelsberg/edd/VLBI.cuh @@ -149,6 +149,7 @@ private: // process tmp thrust::device_vector<float> _unpacked_voltage; + thrust::device_vector<float> _baseLineN; // Output data DoubleDeviceBuffer<uint32_t> _packed_voltage; @@ -163,6 +164,15 @@ private: }; +// pack float to 2 bit integers with VLBI non linear scaling with levels +// sigma = \sqrt(sigma2)) +// -n * sigma, -1 signa, sigma, n * sigma +__global__ void pack2bit_nonLinear(const float *__restrict__ input, + uint32_t *__restrict__ output, size_t inputSize, + float n, float *sigma2); + + + } //namespace edd } //namespace effelsberg } //namespace psrdada_cpp diff --git a/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu b/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu index d750c4e71f0cb68ed4569f1d6ee49d1b14d72a4b..7883d7d791288c1ddc27ca71d0d53001118c316c 100644 --- a/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu +++ b/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu @@ -1,4 +1,5 @@ #include "psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh" +#include "psrdada_cpp/effelsberg/edd/Tools.cuh" #include "psrdada_cpp/common.hpp" #include "psrdada_cpp/cuda_utils.hpp" #include "psrdada_cpp/raw_bytes.hpp" @@ -65,43 +66,6 @@ __global__ void countBitSet(const uint64_t *sideChannelData, size_t N, size_t } -// blocksize for the array sum kernel -#define array_sum_Nthreads 1024 - -__global__ void array_sum(float *in, size_t N, float *out) { - extern __shared__ float data[]; - - size_t tid = threadIdx.x; - - float ls = 0; - - for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N); - i += blockDim.x * gridDim.x) { - ls += in[i]; // + in[i + blockDim.x]; // loading two elements increase the used bandwidth by ~10% but requires matching blocksize and size of input array - } - - data[tid] = ls; - __syncthreads(); - - for (size_t i = blockDim.x / 2; i > 0; i /= 2) { - if (tid < i) { - data[tid] += data[tid + i]; - } - __syncthreads(); - } - - // unroll last warp - // if (tid < 32) - //{ - // warpReduce(data, tid); - //} - - if (tid == 0) { - out[blockIdx.x] = data[0]; - } -} - - template <class HandlerType, typename IntegratedPowerType> GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer( const DadaBufferLayout &dadaBufferLayout, @@ -268,8 +232,8 @@ void GatedSpectrometer<HandlerType, IntegratedPowerType>::process( throw std::runtime_error("Unsupported number of bits"); } BOOST_LOG_TRIVIAL(debug) << "Calculate baseline"; - psrdada_cpp::effelsberg::edd::array_sum<<<64, array_sum_Nthreads, array_sum_Nthreads * sizeof(float), _proc_stream>>>(thrust::raw_pointer_cast(_unpacked_voltage_G0.data()), _unpacked_voltage_G0.size(), thrust::raw_pointer_cast(_baseLineN.data())); - psrdada_cpp::effelsberg::edd::array_sum<<<1, array_sum_Nthreads, array_sum_Nthreads * sizeof(float), _proc_stream>>>(thrust::raw_pointer_cast(_baseLineN.data()), _baseLineN.size(), thrust::raw_pointer_cast(_baseLineN.data())); + psrdada_cpp::effelsberg::edd::array_sum<<<64, array_sum_Nthreads, 0, _proc_stream>>>(thrust::raw_pointer_cast(_unpacked_voltage_G0.data()), _unpacked_voltage_G0.size(), thrust::raw_pointer_cast(_baseLineN.data())); + psrdada_cpp::effelsberg::edd::array_sum<<<1, array_sum_Nthreads, 0, _proc_stream>>>(thrust::raw_pointer_cast(_baseLineN.data()), _baseLineN.size(), thrust::raw_pointer_cast(_baseLineN.data())); BOOST_LOG_TRIVIAL(debug) << "Perform gating"; gating<<<1024, 1024, 0, _proc_stream>>>( diff --git a/psrdada_cpp/effelsberg/edd/detail/VLBI.cu b/psrdada_cpp/effelsberg/edd/detail/VLBI.cu index ff67862132119cdf78068e2099306add19680dca..164d1d1610901b32f788ae1f2a1809b4f2768590 100644 --- a/psrdada_cpp/effelsberg/edd/detail/VLBI.cu +++ b/psrdada_cpp/effelsberg/edd/detail/VLBI.cu @@ -1,7 +1,7 @@ #include "psrdada_cpp/effelsberg/edd/VLBI.cuh" -#include "psrdada_cpp/effelsberg/edd/Packer.cuh" +//#include "psrdada_cpp/effelsberg/edd/Packer.cuh" +#include "psrdada_cpp/effelsberg/edd/Tools.cuh" -//#include "psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh" #include "psrdada_cpp/cuda_utils.hpp" #include <cuda.h> @@ -17,6 +17,9 @@ namespace effelsberg { namespace edd { + + + template <class HandlerType> VLBI<HandlerType>::VLBI(std::size_t buffer_bytes, std::size_t input_bitDepth, std::size_t speadHeapSize, @@ -48,7 +51,7 @@ VLBI<HandlerType>::VLBI(std::size_t buffer_bytes, std::size_t input_bitDepth, _packed_voltage.resize(n64bit_words * 64 / input_bitDepth / 16); - _spillOver.reserve(5000); + _spillOver.reserve(vdifHeader.getDataFrameLength() * 8); BOOST_LOG_TRIVIAL(debug) << " Output voltages size: " << _packed_voltage.size() << " byte"; @@ -78,6 +81,8 @@ template <class HandlerType> void VLBI<HandlerType>::init(RawBytes &block) { headerInfo << "\n" << "# VLBI parameters: \n"; + + size_t bEnd = std::strlen(block.ptr()); if (bEnd + headerInfo.str().size() < block.total_bytes()) { std::strcpy(block.ptr() + bEnd, headerInfo.str().c_str()); @@ -85,10 +90,12 @@ template <class HandlerType> void VLBI<HandlerType>::init(RawBytes &block) { BOOST_LOG_TRIVIAL(warning) << "Header of size " << block.total_bytes() << " bytes already contains " << bEnd - << "bytes. Cannot add gated spectrometer info of size " + << "bytes. Cannot add VLBI info of size " << headerInfo.str().size() << " bytes."; } + _baseLineN.resize(array_sum_Nthreads); + _handler.init(block); } @@ -137,12 +144,26 @@ bool VLBI<HandlerType>::operator()(RawBytes &block) { throw std::runtime_error("Unsupported number of bits"); } - // ToDo: Eventually calulate minV, maxV from mean and std. - float minV = -2; - float maxV = 2; - pack<2>(_unpacked_voltage, _packed_voltage.b(), minV, maxV, _proc_stream); + BOOST_LOG_TRIVIAL(debug) << "Calculate baseline"; + psrdada_cpp::effelsberg::edd::array_sum<<<64, array_sum_Nthreads, 0, _proc_stream>>>(thrust::raw_pointer_cast(_unpacked_voltage.data()), _unpacked_voltage.size(), thrust::raw_pointer_cast(_baseLineN.data())); + psrdada_cpp::effelsberg::edd::array_sum<<<1, array_sum_Nthreads, 0, _proc_stream>>>(thrust::raw_pointer_cast(_baseLineN.data()), _baseLineN.size(), thrust::raw_pointer_cast(_baseLineN.data())); + + BOOST_LOG_TRIVIAL(debug) << "Calculate std dev"; + psrdada_cpp::effelsberg::edd::scaled_square_offset_sum<<<64, array_sum_Nthreads, 0, _proc_stream>>>(thrust::raw_pointer_cast(_unpacked_voltage.data()), _unpacked_voltage.size(), thrust::raw_pointer_cast(_baseLineN.data()), thrust::raw_pointer_cast(_baseLineN.data())); + psrdada_cpp::effelsberg::edd::array_sum<<<1, array_sum_Nthreads, 0, _proc_stream>>>(thrust::raw_pointer_cast(_baseLineN.data()), _baseLineN.size(), thrust::raw_pointer_cast(_baseLineN.data())); + + // non linear packing + float v = 0.981599; // L. Kogan, Correction functions for digital correlators + // with two and four quantization levels, Radio Science 33 (1998), 1289-1296 + BOOST_LOG_TRIVIAL(debug) << "Packing data with non linear 2-bit packaging using levels -v*sigma, 0, v*sigma with n = " << v; + _packed_voltage.b().resize(_unpacked_voltage.size() / 16); + BOOST_LOG_TRIVIAL(debug) << "Input size: " << _unpacked_voltage.size() << " elements"; + BOOST_LOG_TRIVIAL(debug) << "Resizing output buffer to " << _packed_voltage.b().size() << " elements"; + pack2bit_nonLinear<<<128, 1024, 0, _proc_stream>>>(thrust::raw_pointer_cast(_unpacked_voltage.data()), + thrust::raw_pointer_cast(_packed_voltage.b().data()), + _unpacked_voltage.size(), v, thrust::raw_pointer_cast(_baseLineN.data())); CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream)); diff --git a/psrdada_cpp/effelsberg/edd/src/Tools.cu b/psrdada_cpp/effelsberg/edd/src/Tools.cu new file mode 100644 index 0000000000000000000000000000000000000000..bcd3b53c3deaa31223dfa2d21159e40fb23cabb0 --- /dev/null +++ b/psrdada_cpp/effelsberg/edd/src/Tools.cu @@ -0,0 +1,74 @@ +#include "psrdada_cpp/effelsberg/edd/Tools.cuh" + +namespace psrdada_cpp { +namespace effelsberg { +namespace edd { + + __global__ void array_sum(float *in, size_t N, float *out) +{ + __shared__ float data[array_sum_Nthreads]; + + size_t tid = threadIdx.x; + + float ls = 0; + + for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N); + i += blockDim.x * gridDim.x) { + ls += in[i]; // + in[i + blockDim.x]; // loading two elements increase the used bandwidth by ~10% but requires matching blocksize and size of input array + } + + data[tid] = ls; + __syncthreads(); + + for (size_t i = blockDim.x / 2; i > 0; i /= 2) { + if (tid < i) { + data[tid] += data[tid + i]; + } + __syncthreads(); + } + + // unroll last warp + // if (tid < 32) + //{ + // warpReduce(data, tid); + //} + + if (tid == 0) { + out[blockIdx.x] = data[0]; + } +} + + + +__global__ void scaled_square_offset_sum(float *in, size_t N, float* offset, float *out) { + __shared__ float data[array_sum_Nthreads]; + + size_t tid = threadIdx.x; + + double ls = 0; + + for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N); + i += blockDim.x * gridDim.x) { + const double t = (in[i] - (*offset) / N); + ls += t * t; + } + + data[tid] = ls / N; + __syncthreads(); + + for (size_t i = blockDim.x / 2; i > 0; i /= 2) { + if (tid < i) { + data[tid] += data[tid + i]; + } + __syncthreads(); + } + + if (tid == 0) { + out[blockIdx.x] = data[0]; + } +} + + +} // edd +} // effelsberg +} // psrdada_cpp diff --git a/psrdada_cpp/effelsberg/edd/src/VLBI.cu b/psrdada_cpp/effelsberg/edd/src/VLBI.cu index b59b6f158d348215cd55690b174f2d8d7f2bb7fe..705e4e0f1fe2b0e37a629b3ea402ec140402650c 100644 --- a/psrdada_cpp/effelsberg/edd/src/VLBI.cu +++ b/psrdada_cpp/effelsberg/edd/src/VLBI.cu @@ -1,9 +1,10 @@ #include "psrdada_cpp/effelsberg/edd/VLBI.cuh" #include "psrdada_cpp/cuda_utils.hpp" +#include <cuda.h> +#include <cstdint> -#define EDD_NTHREADS_PACK 1024 -#define NPACK 16 +#define EDD_NTHREADS_PACK 1024 namespace psrdada_cpp { namespace effelsberg { @@ -41,7 +42,7 @@ void setBitsWithValue(uint32_t &target, uint32_t firstBit, uint32_t lastBit, uin target |= value; } -/// get numerical value from the specified bits in the target +/// get numerical value from the specified bits in the target uint32_t getBitsValue(const uint32_t &target, uint32_t firstBit, uint32_t lastBit) { uint32_t mask = bitMask(firstBit, lastBit); @@ -193,6 +194,61 @@ uint32_t VDIFHeader::getStationId() const } +// pack float to 2 bit integers with VLBI non linear scaling with levels +// sigma = \sqrt(sigma2)) +// -n * sigma, -1 signa, sigma, n * sigma +__global__ void pack2bit_nonLinear(const float *__restrict__ input, + uint32_t *__restrict__ output, size_t inputSize, + float v, float *sigma2) { + // number of values to pack into one output element, use 32 bit here to + // maximize number of threads + const uint8_t NPACK = 32 / 2; + + __shared__ uint32_t tmp[1024]; + const float vsigma = v * sqrt(*sigma2); + + for (uint32_t i = NPACK * blockIdx.x * blockDim.x + threadIdx.x; + (i < inputSize); i += blockDim.x * gridDim.x * NPACK) { + tmp[threadIdx.x] = 0; + + for (uint8_t j = 0; j < NPACK; j++) { + // Load new input value, clip and convert to Nbit integer + const float inp = input[i + j * blockDim.x]; + + uint32_t p = 0; + + p += inp > ( - 1. * vsigma); + p += inp > ( 0); + p += inp > ( 1. * vsigma); + // this is more efficient than fmin, fmax for clamp and cast. + + // store in shared memory with linear access + tmp[threadIdx.x] += p << (2 * j); + } + __syncthreads(); + + // load value from shared memory and rearange to output - the read value is + // reused per warp + uint32_t out = 0; + + // bit mask: Thread 0 always first input_bit_depth bits, thread 1 always + // second input_bit_depth bits, ... + const uint32_t mask = ((1 << 2) - 1) << (2 * (threadIdx.x % NPACK)); + #pragma unroll + for (uint32_t j = 0; j < NPACK; j++) { + uint32_t v = tmp[(threadIdx.x / NPACK) * NPACK + j] & mask; + // retrieve correct bits + v = v >> (2 * (threadIdx.x % NPACK)); + v = v << (2 * j); + out += v; + } + + size_t oidx = threadIdx.x / NPACK + (threadIdx.x % NPACK) * (blockDim.x / NPACK) + (i - threadIdx.x) / NPACK; + output[oidx] = out; + __syncthreads(); + } +} + diff --git a/psrdada_cpp/effelsberg/edd/src/VLBI_cli.cu b/psrdada_cpp/effelsberg/edd/src/VLBI_cli.cu index 61d11d82fa01d99ca311b137500ce94ffb03b5dc..8c92df1365f002ebb0e21db79a4062d996377db7 100644 --- a/psrdada_cpp/effelsberg/edd/src/VLBI_cli.cu +++ b/psrdada_cpp/effelsberg/edd/src/VLBI_cli.cu @@ -39,6 +39,9 @@ int main(int argc, char **argv) { std::string filename(buffer); std::string output_type = "file"; + int thread_id, station_id, payload_size; + double sampleRate; + /** Define and parse the program options */ namespace po = boost::program_options; @@ -68,7 +71,23 @@ int main(int argc, char **argv) { "size of the spead data heaps. The number of the " "heaps in the dada block depends on the number of " "side channel items."); - + desc.add_options()("thread_id", + po::value<int>()->default_value(0)->notifier( + [&thread_id](int in) { thread_id = in; }), + "Thread ID for the VDIF header"); + desc.add_options()("station_id", + po::value<int>()->default_value(0)->notifier( + [&station_id](int in) { station_id = in; }), + "Station ID for the VDIF header"); + desc.add_options()("payload_size", + po::value<int>()->default_value(5000)->notifier( + [&payload_size](int in) { payload_size = in; }), + "Payload size [bytes]"); + + desc.add_options()("sample_rate", + po::value<double>()->default_value(2.6E9)->notifier( + [&sampleRate](double in) { sampleRate = in; }), + "Sample rate of the input signal"); desc.add_options()( "log_level", po::value<std::string>()->default_value("info")->notifier( [](std::string level) { set_log_level(level); }), @@ -105,11 +124,12 @@ int main(int argc, char **argv) { // ToDo: Options to set values effelsberg::edd::VDIFHeader vdifHeader; - vdifHeader.setThreadId(0); - vdifHeader.setStationId(0); + vdifHeader.setDataFrameLength(payload_size / 8); + vdifHeader.setThreadId(thread_id); + vdifHeader.setStationId(station_id); + BOOST_LOG_TRIVIAL(warning) << "SETTING FIXED REFERENCE EPOCH AND SECONDS FROM EPOCH!! Should be read from data stream!!"; vdifHeader.setReferenceEpoch(123); vdifHeader.setSecondsFromReferenceEpoch(42); // for first block - double sampleRate = 2.6E9; std::cout << "Running with output_type: " << output_type << std::endl;