Commit a5489844 authored by Tobias Winchen's avatar Tobias Winchen

Restructured VLBI code, calculate std dev and added options

parent 57a55b04
......@@ -10,6 +10,7 @@ set(psrdada_cpp_effelsberg_edd_src
src/DetectorAccumulator.cu
src/ScaledTransposeTFtoTFT.cu
src/VLBI.cu
src/Tools.cu
src/DadaBufferLayout.cpp
)
......
......@@ -159,18 +159,6 @@ __global__ void gating(float *G0, float *G1, const int64_t *sideChannelData,
size_t noOfSideChannels, size_t selectedSideChannel, const float *_baseLine);
/**
* @brief Sums all elements of an input array.
*
* @detail The result is stored in an array with one value per launch
* block. A full reduction thus requires two kernel launches.
*
* @param in. Input array.
* @param N. Size of input array.
* @param out. Output array.
* */
__global__ void array_sum(float *in, size_t N, float *out);
} // edd
......
......@@ -11,7 +11,7 @@ namespace edd {
namespace kernels {
// pack float to 2,4,8,16 bit integers
// pack float to 2,4,8,16 bit integers with linear scaling
template <unsigned int input_bit_depth>
__global__ void packNbit(const float *__restrict__ input,
uint32_t *__restrict__ output, size_t inputSize,
......
#ifndef PSRDADA_CPP_EFFELSBERG_EDD_TOOLS_CUH
#define PSRDADA_CPP_EFFELSBERG_EDD_TOOLS_CUH
// collection of multi purpose kernels
// blocksize for the array sum kernel
#define array_sum_Nthreads 1024
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
/**
* @brief Sums all elements of an input array.
*
* @detail The result is stored in an array with one value per launch
* block. Full reduction thus requires two kernel launches.
*
* @param in. Input array.
* @param N. Size of input array.
* @param out. Output array.
* */
__global__ void array_sum(float *in, size_t N, float *out);
/// Calculates 1/N \sum (x_i - offset/N)^2 per launch block.
/// To calculate the std dev, sum the partial results of all blocks using array_sum.
__global__ void scaled_square_offset_sum(float *in, size_t N, float* offset, float *out);
} // edd
} // effelsberg
} // psrdada_cpp
#endif //PSRDADA_CPP_EFFELSBERG_EDD_TOOLS_CUH
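The two kernels declared above compose into a two-pass reduction: array_sum writes one partial sum per launch block, and scaled_square_offset_sum writes per-block partial sums of the squared deviations from the mean, so the full sum and the variance each need a second single-block array_sum launch. A minimal host-side sketch of this composition (the helper name, scratch buffer and launch configuration are illustrative, not part of this commit):

```cpp
// Hedged sketch only: combines the kernels declared in Tools.cuh the same way
// the VLBI pipeline does, using an illustrative scratch buffer.
#include "psrdada_cpp/effelsberg/edd/Tools.cuh"
#include <thrust/device_vector.h>
#include <thrust/fill.h>
#include <cmath>

namespace psrdada_cpp { namespace effelsberg { namespace edd {

float std_dev_estimate(thrust::device_vector<float> &input,
                       thrust::device_vector<float> &scratch,
                       cudaStream_t stream)
{
  scratch.resize(array_sum_Nthreads);
  thrust::fill(scratch.begin(), scratch.end(), 0.f);
  float *in = thrust::raw_pointer_cast(input.data());
  float *tmp = thrust::raw_pointer_cast(scratch.data());

  // Pass 1: one partial sum per block; pass 2: total sum ends up in tmp[0].
  array_sum<<<64, array_sum_Nthreads, 0, stream>>>(in, input.size(), tmp);
  array_sum<<<1, array_sum_Nthreads, 0, stream>>>(tmp, scratch.size(), tmp);

  // Per-block partial sums of (x_i - mean)^2 / N, then reduce to the variance.
  scaled_square_offset_sum<<<64, array_sum_Nthreads, 0, stream>>>(
      in, input.size(), tmp, tmp);
  array_sum<<<1, array_sum_Nthreads, 0, stream>>>(tmp, scratch.size(), tmp);

  cudaStreamSynchronize(stream);
  return std::sqrt(static_cast<float>(scratch[0]));  // scratch[0] holds the variance
}

}}}  // namespace psrdada_cpp::effelsberg::edd
```

This mirrors the launch pattern used by the GatedSpectrometer and VLBI pipelines in this commit: 64 blocks for the first pass, then a single block of array_sum_Nthreads threads for the second.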
......@@ -149,6 +149,7 @@ private:
// process tmp
thrust::device_vector<float> _unpacked_voltage;
thrust::device_vector<float> _baseLineN;
// Output data
DoubleDeviceBuffer<uint32_t> _packed_voltage;
......@@ -163,6 +164,15 @@ private:
};
// pack floats to 2-bit integers with VLBI non-linear scaling using the levels
// -n * sigma, -sigma, sigma, n * sigma with sigma = \sqrt(sigma2)
__global__ void pack2bit_nonLinear(const float *__restrict__ input,
uint32_t *__restrict__ output, size_t inputSize,
float n, float *sigma2);
} //namespace edd
} //namespace effelsberg
} //namespace psrdada_cpp
......
#include "psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh"
#include "psrdada_cpp/effelsberg/edd/Tools.cuh"
#include "psrdada_cpp/common.hpp"
#include "psrdada_cpp/cuda_utils.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
......@@ -65,43 +66,6 @@ __global__ void countBitSet(const uint64_t *sideChannelData, size_t N, size_t
}
// blocksize for the array sum kernel
#define array_sum_Nthreads 1024
__global__ void array_sum(float *in, size_t N, float *out) {
extern __shared__ float data[];
size_t tid = threadIdx.x;
float ls = 0;
for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N);
i += blockDim.x * gridDim.x) {
ls += in[i]; // + in[i + blockDim.x]; // loading two elements increases the used bandwidth by ~10% but requires matching block size and size of input array
}
data[tid] = ls;
__syncthreads();
for (size_t i = blockDim.x / 2; i > 0; i /= 2) {
if (tid < i) {
data[tid] += data[tid + i];
}
__syncthreads();
}
// unroll last warp
// if (tid < 32)
//{
// warpReduce(data, tid);
//}
if (tid == 0) {
out[blockIdx.x] = data[0];
}
}
template <class HandlerType, typename IntegratedPowerType>
GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer(
const DadaBufferLayout &dadaBufferLayout,
......@@ -268,8 +232,8 @@ void GatedSpectrometer<HandlerType, IntegratedPowerType>::process(
throw std::runtime_error("Unsupported number of bits");
}
BOOST_LOG_TRIVIAL(debug) << "Calculate baseline";
psrdada_cpp::effelsberg::edd::array_sum<<<64, array_sum_Nthreads, array_sum_Nthreads * sizeof(float), _proc_stream>>>(thrust::raw_pointer_cast(_unpacked_voltage_G0.data()), _unpacked_voltage_G0.size(), thrust::raw_pointer_cast(_baseLineN.data()));
psrdada_cpp::effelsberg::edd::array_sum<<<1, array_sum_Nthreads, array_sum_Nthreads * sizeof(float), _proc_stream>>>(thrust::raw_pointer_cast(_baseLineN.data()), _baseLineN.size(), thrust::raw_pointer_cast(_baseLineN.data()));
psrdada_cpp::effelsberg::edd::array_sum<<<64, array_sum_Nthreads, 0, _proc_stream>>>(thrust::raw_pointer_cast(_unpacked_voltage_G0.data()), _unpacked_voltage_G0.size(), thrust::raw_pointer_cast(_baseLineN.data()));
psrdada_cpp::effelsberg::edd::array_sum<<<1, array_sum_Nthreads, 0, _proc_stream>>>(thrust::raw_pointer_cast(_baseLineN.data()), _baseLineN.size(), thrust::raw_pointer_cast(_baseLineN.data()));
BOOST_LOG_TRIVIAL(debug) << "Perform gating";
gating<<<1024, 1024, 0, _proc_stream>>>(
......
#include "psrdada_cpp/effelsberg/edd/VLBI.cuh"
#include "psrdada_cpp/effelsberg/edd/Packer.cuh"
//#include "psrdada_cpp/effelsberg/edd/Packer.cuh"
#include "psrdada_cpp/effelsberg/edd/Tools.cuh"
//#include "psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh"
#include "psrdada_cpp/cuda_utils.hpp"
#include <cuda.h>
......@@ -17,6 +17,9 @@ namespace effelsberg {
namespace edd {
template <class HandlerType>
VLBI<HandlerType>::VLBI(std::size_t buffer_bytes, std::size_t input_bitDepth,
std::size_t speadHeapSize,
......@@ -48,7 +51,7 @@ VLBI<HandlerType>::VLBI(std::size_t buffer_bytes, std::size_t input_bitDepth,
_packed_voltage.resize(n64bit_words * 64 / input_bitDepth / 16);
_spillOver.reserve(5000);
_spillOver.reserve(vdifHeader.getDataFrameLength() * 8);
BOOST_LOG_TRIVIAL(debug) << " Output voltages size: "
<< _packed_voltage.size() << " byte";
......@@ -78,6 +81,8 @@ template <class HandlerType> void VLBI<HandlerType>::init(RawBytes &block) {
headerInfo << "\n"
<< "# VLBI parameters: \n";
size_t bEnd = std::strlen(block.ptr());
if (bEnd + headerInfo.str().size() < block.total_bytes()) {
std::strcpy(block.ptr() + bEnd, headerInfo.str().c_str());
......@@ -85,10 +90,12 @@ template <class HandlerType> void VLBI<HandlerType>::init(RawBytes &block) {
BOOST_LOG_TRIVIAL(warning)
<< "Header of size " << block.total_bytes()
<< " bytes already contains " << bEnd
<< "bytes. Cannot add gated spectrometer info of size "
<< "bytes. Cannot add VLBI info of size "
<< headerInfo.str().size() << " bytes.";
}
_baseLineN.resize(array_sum_Nthreads);
_handler.init(block);
}
......@@ -137,12 +144,26 @@ bool VLBI<HandlerType>::operator()(RawBytes &block) {
throw std::runtime_error("Unsupported number of bits");
}
// ToDo: Eventually calculate minV, maxV from mean and std.
float minV = -2;
float maxV = 2;
pack<2>(_unpacked_voltage, _packed_voltage.b(), minV, maxV, _proc_stream);
BOOST_LOG_TRIVIAL(debug) << "Calculate baseline";
psrdada_cpp::effelsberg::edd::array_sum<<<64, array_sum_Nthreads, 0, _proc_stream>>>(thrust::raw_pointer_cast(_unpacked_voltage.data()), _unpacked_voltage.size(), thrust::raw_pointer_cast(_baseLineN.data()));
psrdada_cpp::effelsberg::edd::array_sum<<<1, array_sum_Nthreads, 0, _proc_stream>>>(thrust::raw_pointer_cast(_baseLineN.data()), _baseLineN.size(), thrust::raw_pointer_cast(_baseLineN.data()));
BOOST_LOG_TRIVIAL(debug) << "Calculate std dev";
psrdada_cpp::effelsberg::edd::scaled_square_offset_sum<<<64, array_sum_Nthreads, 0, _proc_stream>>>(thrust::raw_pointer_cast(_unpacked_voltage.data()), _unpacked_voltage.size(), thrust::raw_pointer_cast(_baseLineN.data()), thrust::raw_pointer_cast(_baseLineN.data()));
psrdada_cpp::effelsberg::edd::array_sum<<<1, array_sum_Nthreads, 0, _proc_stream>>>(thrust::raw_pointer_cast(_baseLineN.data()), _baseLineN.size(), thrust::raw_pointer_cast(_baseLineN.data()));
// non linear packing
float v = 0.981599; // L. Kogan, Correction functions for digital correlators
// with two and four quantization levels, Radio Science 33 (1998), 1289-1296
BOOST_LOG_TRIVIAL(debug) << "Packing data with non linear 2-bit packaging using levels -v*sigma, 0, v*sigma with n = " << v;
_packed_voltage.b().resize(_unpacked_voltage.size() / 16);
BOOST_LOG_TRIVIAL(debug) << "Input size: " << _unpacked_voltage.size() << " elements";
BOOST_LOG_TRIVIAL(debug) << "Resizing output buffer to " << _packed_voltage.b().size() << " elements";
pack2bit_nonLinear<<<128, 1024, 0, _proc_stream>>>(thrust::raw_pointer_cast(_unpacked_voltage.data()),
thrust::raw_pointer_cast(_packed_voltage.b().data()),
_unpacked_voltage.size(), v, thrust::raw_pointer_cast(_baseLineN.data()));
CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
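After the two reduction passes above, _baseLineN[0] holds the per-sample variance, which pack2bit_nonLinear converts to sigma on the device. A hedged debug-only sketch (not part of the commit, assuming the member names used above and that <cmath> is available) for reading the value back and logging the estimated standard deviation:

```cpp
// Debug read-back sketch: copies the variance from _baseLineN[0] to the host
// and logs the derived standard deviation.
float variance = 0.0f;
CUDA_ERROR_CHECK(cudaMemcpyAsync(&variance,
    thrust::raw_pointer_cast(_baseLineN.data()), sizeof(float),
    cudaMemcpyDeviceToHost, _proc_stream));
CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
BOOST_LOG_TRIVIAL(debug) << "Estimated std dev: " << std::sqrt(variance);
```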
......
#include "psrdada_cpp/effelsberg/edd/Tools.cuh"
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
__global__ void array_sum(float *in, size_t N, float *out)
{
__shared__ float data[array_sum_Nthreads];
size_t tid = threadIdx.x;
float ls = 0;
for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N);
i += blockDim.x * gridDim.x) {
ls += in[i]; // + in[i + blockDim.x]; // loading two elements increases the used bandwidth by ~10% but requires matching block size and size of input array
}
data[tid] = ls;
__syncthreads();
for (size_t i = blockDim.x / 2; i > 0; i /= 2) {
if (tid < i) {
data[tid] += data[tid + i];
}
__syncthreads();
}
// unroll last warp
// if (tid < 32)
//{
// warpReduce(data, tid);
//}
if (tid == 0) {
out[blockIdx.x] = data[0];
}
}
__global__ void scaled_square_offset_sum(float *in, size_t N, float* offset, float *out) {
__shared__ float data[array_sum_Nthreads];
size_t tid = threadIdx.x;
double ls = 0;
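// offset holds the full sum over the input, so (*offset) / N is the mean;
// each thread accumulates squared deviations from it in a grid-stride loop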
for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N);
i += blockDim.x * gridDim.x) {
const double t = (in[i] - (*offset) / N);
ls += t * t;
}
data[tid] = ls / N;
__syncthreads();
for (size_t i = blockDim.x / 2; i > 0; i /= 2) {
if (tid < i) {
data[tid] += data[tid + i];
}
__syncthreads();
}
if (tid == 0) {
out[blockIdx.x] = data[0];
}
}
} // edd
} // effelsberg
} // psrdada_cpp
#include "psrdada_cpp/effelsberg/edd/VLBI.cuh"
#include "psrdada_cpp/cuda_utils.hpp"
#include <cuda.h>
#include <cstdint>
#define EDD_NTHREADS_PACK 1024
#define NPACK 16
#define EDD_NTHREADS_PACK 1024
namespace psrdada_cpp {
namespace effelsberg {
......@@ -41,7 +42,7 @@ void setBitsWithValue(uint32_t &target, uint32_t firstBit, uint32_t lastBit, uin
target |= value;
}
/// get numerical value from the specified bits in the target
uint32_t getBitsValue(const uint32_t &target, uint32_t firstBit, uint32_t lastBit)
{
uint32_t mask = bitMask(firstBit, lastBit);
......@@ -193,6 +194,61 @@ uint32_t VDIFHeader::getStationId() const
}
// pack floats to 2-bit integers with VLBI non-linear scaling using the levels
// -n * sigma, -sigma, sigma, n * sigma with sigma = \sqrt(sigma2)
__global__ void pack2bit_nonLinear(const float *__restrict__ input,
uint32_t *__restrict__ output, size_t inputSize,
float v, float *sigma2) {
// number of values to pack into one output element, use 32 bit here to
// maximize number of threads
const uint8_t NPACK = 32 / 2;
__shared__ uint32_t tmp[1024];
const float vsigma = v * sqrt(*sigma2);
for (uint32_t i = NPACK * blockIdx.x * blockDim.x + threadIdx.x;
(i < inputSize); i += blockDim.x * gridDim.x * NPACK) {
tmp[threadIdx.x] = 0;
for (uint8_t j = 0; j < NPACK; j++) {
// Load new input value, clip and convert to Nbit integer
const float inp = input[i + j * blockDim.x];
uint32_t p = 0;
p += inp > ( - 1. * vsigma);
p += inp > ( 0);
p += inp > ( 1. * vsigma);
// this is more efficient than fmin, fmax for clamp and cast.
// store in shared memory with linear access
tmp[threadIdx.x] += p << (2 * j);
}
__syncthreads();
// load values from shared memory and rearrange to the output - the read value is
// reused per warp
uint32_t out = 0;
// bit mask: thread 0 always takes the first 2 bits, thread 1 always
// the second 2 bits, ...
const uint32_t mask = ((1 << 2) - 1) << (2 * (threadIdx.x % NPACK));
#pragma unroll
for (uint32_t j = 0; j < NPACK; j++) {
uint32_t v = tmp[(threadIdx.x / NPACK) * NPACK + j] & mask;
// retrieve correct bits
v = v >> (2 * (threadIdx.x % NPACK));
v = v << (2 * j);
out += v;
}
size_t oidx = threadIdx.x / NPACK + (threadIdx.x % NPACK) * (blockDim.x / NPACK) + (i - threadIdx.x) / NPACK;
output[oidx] = out;
__syncthreads();
}
}
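For reference, the per-sample thresholding applied by the kernel can be written as a small host-side helper; the sketch below (a hypothetical test aid, not part of the commit) produces the same 2-bit code from the thresholds -v*sigma, 0 and v*sigma, ignoring the shared-memory re-ordering the kernel performs for coalesced 32-bit writes:

```cpp
// Hedged reference quantizer for verifying packed output.
#include <cmath>
#include <cstdint>

inline uint32_t quantize2bit_reference(float value, float v, float sigma2)
{
  const float vsigma = v * std::sqrt(sigma2);
  uint32_t code = 0;
  code += value > -vsigma;  // value <= -v*sigma      -> 0
  code += value > 0.f;      // -v*sigma < value <= 0  -> 1
  code += value > vsigma;   // 0 < value <= v*sigma   -> 2
  return code;              // value > v*sigma        -> 3
}
```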
......
......@@ -39,6 +39,9 @@ int main(int argc, char **argv) {
std::string filename(buffer);
std::string output_type = "file";
int thread_id, station_id, payload_size;
double sampleRate;
/** Define and parse the program options
*/
namespace po = boost::program_options;
......@@ -68,7 +71,23 @@ int main(int argc, char **argv) {
"size of the spead data heaps. The number of the "
"heaps in the dada block depends on the number of "
"side channel items.");
desc.add_options()("thread_id",
po::value<int>()->default_value(0)->notifier(
[&thread_id](int in) { thread_id = in; }),
"Thread ID for the VDIF header");
desc.add_options()("station_id",
po::value<int>()->default_value(0)->notifier(
[&station_id](int in) { station_id = in; }),
"Station ID for the VDIF header");
desc.add_options()("payload_size",
po::value<int>()->default_value(5000)->notifier(
[&payload_size](int in) { payload_size = in; }),
"Payload size [bytes]");
desc.add_options()("sample_rate",
po::value<double>()->default_value(2.6E9)->notifier(
[&sampleRate](double in) { sampleRate = in; }),
"Sample rate of the input signal");
desc.add_options()(
"log_level", po::value<std::string>()->default_value("info")->notifier(
[](std::string level) { set_log_level(level); }),
......@@ -105,11 +124,12 @@ int main(int argc, char **argv) {
// ToDo: Options to set values
effelsberg::edd::VDIFHeader vdifHeader;
vdifHeader.setThreadId(0);
vdifHeader.setStationId(0);
vdifHeader.setDataFrameLength(payload_size / 8);
vdifHeader.setThreadId(thread_id);
vdifHeader.setStationId(station_id);
BOOST_LOG_TRIVIAL(warning) << "SETTING FIXED REFERENCE EPOCH AND SECONDS FROM EPOCH!! Should be read from data stream!!";
vdifHeader.setReferenceEpoch(123);
vdifHeader.setSecondsFromReferenceEpoch(42); // for first block
double sampleRate = 2.6E9;
std::cout << "Running with output_type: " << output_type << std::endl;
......