Unverified Commit f4f4ac60 authored by Tobias Winchen, committed by GitHub

Merge pull request #10 from TobiasWinchen/devel

Fix for VLBI Timestamps and gated baseline calculation
parents 58f9e6ac bfbe9db7
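
The gating kernel now carries one baseline per gate: each sample is routed to exactly one gate, the other gate receives its baseline value, and the per-gate sample sums and counts are accumulated to form the baselines for the next block. The following self-contained host-side C++ sketch mirrors that arithmetic; the values, the one-word-per-heap side-channel layout, and the TEST_BIT definition are illustrative assumptions, not part of the diff.

// Host-side sketch of the per-gate baseline gating introduced in this merge.
// Assumptions (illustration only): one side-channel word per heap, bit `bitpos`
// selects the gate, bit 63 marks a lost heap, as in the gating kernel below.
#include <cstdint>
#include <cstdio>
#include <vector>

#define TEST_BIT(value, bit) (((value) >> (bit)) & 1)

int main() {
    const std::size_t heapSize = 4;                        // samples per heap
    std::vector<float> G0 = {40, 41, 42, 43, 44, 45, 46, 47};
    std::vector<float> G1(G0.size());
    const std::vector<uint64_t> sideChannelData = {0, 1};  // one word per heap
    const std::size_t bitpos = 0;
    const float baseLineG0 = -3.f, baseLineG1 = -4.f;      // per-gate means of the previous block
    float baselineUpdateG0 = 0.f, baselineUpdateG1 = 0.f;
    uint64_t statsG0 = 0, statsG1 = 0;

    for (std::size_t i = 0; i < G0.size(); ++i) {
        const float v = G0[i];
        const uint64_t item = sideChannelData[i / heapSize];
        const unsigned bit_set = TEST_BIT(item, bitpos);
        const unsigned heap_lost = TEST_BIT(item, 63);
        // A sample goes to exactly one gate; the other gate gets its baseline.
        G1[i] = (v - baseLineG1) * bit_set * (!heap_lost) + baseLineG1;
        G0[i] = (v - baseLineG0) * (!bit_set) * (!heap_lost) + baseLineG0;
        statsG0 += (!bit_set) * (!heap_lost);
        statsG1 += bit_set * (!heap_lost);
        baselineUpdateG0 += v * (!bit_set) * (!heap_lost);
        baselineUpdateG1 += v * bit_set * (!heap_lost);
    }
    // process() divides the accumulated sums by the counts after the loop over
    // output blocks, so the next block is gated with this block's per-gate means.
    std::printf("G0: %llu samples, next baseline %.2f\n", (unsigned long long)statsG0,
                statsG0 ? baselineUpdateG0 / statsG0 : baseLineG0);
    std::printf("G1: %llu samples, next baseline %.2f\n", (unsigned long long)statsG1,
                statsG1 ? baselineUpdateG1 / statsG1 : baseLineG1);
    return 0;
}
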
......@@ -133,7 +133,8 @@ private:
thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G0;
thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G1;
thrust::device_vector<ChannelisedVoltageType> _channelised_voltage;
thrust::device_vector<UnpackedVoltageType> _baseLineN;
thrust::device_vector<UnpackedVoltageType> _baseLineNG0;
thrust::device_vector<UnpackedVoltageType> _baseLineNG1;
cudaStream_t _h2d_stream;
cudaStream_t _proc_stream;
......@@ -169,8 +170,12 @@ private:
__global__ void gating(float *G0, float *G1, const int64_t *sideChannelData,
size_t N, size_t heapSize, size_t bitpos,
size_t noOfSideChannels, size_t selectedSideChannel,
const float *_baseLine, uint64_cu* stats_G0, uint64_cu*
stats_G1);
const float baseLineG0,
const float baseLineG1,
float* __restrict__ baseLineNG0,
float* __restrict__ baseLineNG1,
uint64_cu* stats_G0,
uint64_cu* stats_G1);
......
......@@ -141,6 +141,7 @@ private:
std::size_t _speadHeapSize;
std::size_t _outputBlockSize;
double _sampleRate, _digitizer_threshold;
size_t _samples_to_skip;
VDIFHeader _vdifHeader;
HandlerType &_handler;
......
......@@ -17,19 +17,44 @@ namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
// Block-wide sum: every thread contributes v through the shared array x (which
// must hold at least blockDim.x elements); afterwards x[0] holds the block
// total. Assumes blockDim.x is a power of two.
template<typename T>
__device__ void reduce(T *x, const T &v)
{
x[threadIdx.x] = v;
__syncthreads();
for(int s = blockDim.x / 2; s > 0; s = s / 2)
{
if (threadIdx.x < s)
x[threadIdx.x] += x[threadIdx.x + s];
__syncthreads();
}
}
__global__ void gating(float* __restrict__ G0, float* __restrict__ G1, const uint64_t* __restrict__ sideChannelData,
size_t N, size_t heapSize, size_t bitpos,
size_t noOfSideChannels, size_t selectedSideChannel,
const float* __restrict__ _baseLineN, uint64_cu* stats_G0, uint64_cu* stats_G1) {
float baseLine = (*_baseLineN) / N;
// statistics values for samples to G0, G1
const float baseLineG0,
const float baseLineG1,
float* __restrict__ baseLineNG0,
float* __restrict__ baseLineNG1,
uint64_cu* stats_G0, uint64_cu* stats_G1) {
// float baseLineG0 = (*_baseLineNG0) / N;
// float baseLineG1 = (*_baseLineNG1) / N;
// statistics values for samples to G0, G1
uint32_t _G0stats = 0;
uint32_t _G1stats = 0;
float baselineUpdateG0 = 0;
float baselineUpdateG1 = 0;
for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N);
i += blockDim.x * gridDim.x) {
const float w = G0[i] - baseLine;
const float v = G0[i];
const uint64_t sideChannelItem =
sideChannelData[((i / heapSize) * (noOfSideChannels)) +
selectedSideChannel]; // Probably not optimal access as
......@@ -39,71 +64,43 @@ __global__ void gating(float* __restrict__ G0, float* __restrict__ G1, const uin
const unsigned int bit_set = TEST_BIT(sideChannelItem, bitpos);
const unsigned int heap_lost = TEST_BIT(sideChannelItem, 63);
G1[i] = w * bit_set * (!heap_lost) + baseLine;
G0[i] = w * (!bit_set) *(!heap_lost) + baseLine;
G1[i] = (v - baseLineG1) * bit_set * (!heap_lost) + baseLineG1;
G0[i] = (v - baseLineG0) * (!bit_set) *(!heap_lost) + baseLineG0;
_G0stats += (!bit_set) *(!heap_lost);
_G1stats += bit_set * (!heap_lost);
baselineUpdateG1 += v * bit_set * (!heap_lost);
baselineUpdateG0 += v * (!bit_set) *(!heap_lost);
}
__shared__ uint32_t x[1024];
// Reduce G0, G1
x[threadIdx.x] = _G0stats;
__syncthreads();
for(int s = blockDim.x / 2; s > 0; s = s / 2)
{
if (threadIdx.x < s)
x[threadIdx.x] += x[threadIdx.x + s];
__syncthreads();
}
reduce<uint32_t>(x, _G0stats);
if(threadIdx.x == 0)
atomicAdd(stats_G0, (uint64_cu) x[threadIdx.x]);
x[threadIdx.x] = _G1stats;
__syncthreads();
for(int s = blockDim.x / 2; s > 0; s = s / 2)
{
if (threadIdx.x < s)
x[threadIdx.x] += x[threadIdx.x + s];
__syncthreads();
}
reduce<uint32_t>(x, _G1stats);
if(threadIdx.x == 0)
{
uint64_cu y = x[threadIdx.x];
atomicAdd(stats_G1, y) ;
}
}
atomicAdd(stats_G1, (uint64_cu) x[threadIdx.x]);
//reuse shared array
float *y = (float*) x;
__global__ void countBitSet(const uint64_t *sideChannelData, size_t N, size_t
bitpos, size_t noOfSideChannels, size_t selectedSideChannel, size_t
*nBitsSet)
{
// not an optimized reduction, but only trivial array sizes occur here.
// run only in one block!
__shared__ uint64_t x[1024];
uint64_t ls = 0;
for (uint64_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N);
i += blockDim.x * gridDim.x) {
ls += TEST_BIT(sideChannelData[i * noOfSideChannels + selectedSideChannel], bitpos);
}
x[threadIdx.x] = ls;
//update the baseline array
reduce<float>(y, baselineUpdateG0);
if(threadIdx.x == 0)
atomicAdd(baseLineNG0, y[threadIdx.x]);
__syncthreads();
for(int s = blockDim.x / 2; s > 0; s = s / 2)
{
if (threadIdx.x < s)
x[threadIdx.x] += x[threadIdx.x + s];
__syncthreads();
}
reduce<float>(y, baselineUpdateG1);
if(threadIdx.x == 0)
nBitsSet[0] += x[threadIdx.x];
atomicAdd(baseLineNG1, y[threadIdx.x]);
}
template <class HandlerType, typename IntegratedPowerType>
GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer(
const DadaBufferLayout &dadaBufferLayout,
......@@ -181,7 +178,8 @@ GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer(
_unpacked_voltage_G0.resize(_nsamps_per_buffer);
_unpacked_voltage_G1.resize(_nsamps_per_buffer);
_baseLineN.resize(array_sum_Nthreads);
_baseLineNG0.resize(1);
_baseLineNG1.resize(1);
BOOST_LOG_TRIVIAL(debug) << " Unpacked voltages size (in samples): "
<< _unpacked_voltage_G0.size();
_channelised_voltage.resize(_nchans * batch);
......@@ -273,12 +271,17 @@ void GatedSpectrometer<HandlerType, IntegratedPowerType>::process(
throw std::runtime_error("Unsupported number of bits");
}
BOOST_LOG_TRIVIAL(debug) << "Calculate baseline";
psrdada_cpp::effelsberg::edd::array_sum<<<64, array_sum_Nthreads, 0, _proc_stream>>>(thrust::raw_pointer_cast(_unpacked_voltage_G0.data()), _unpacked_voltage_G0.size(), thrust::raw_pointer_cast(_baseLineN.data()));
psrdada_cpp::effelsberg::edd::array_sum<<<1, array_sum_Nthreads, 0, _proc_stream>>>(thrust::raw_pointer_cast(_baseLineN.data()), _baseLineN.size(), thrust::raw_pointer_cast(_baseLineN.data()));
// calculate baseline from previous block
BOOST_LOG_TRIVIAL(debug) << "Perform gating";
float baseLineG0 = _baseLineNG0[0];
float baseLineG1 = _baseLineNG1[0];
uint64_t NG0 = 0;
uint64_t NG1 = 0;
// Loop over outputblocks, for case of multiple output blocks per input block
for (size_t i = 0; i < noOfBitSetsIn_G0.size(); i++)
{ // ToDo: Should be in one kernel call
gating<<<1024, 1024, 0, _proc_stream>>>(
......@@ -286,11 +289,20 @@ void GatedSpectrometer<HandlerType, IntegratedPowerType>::process(
thrust::raw_pointer_cast(_unpacked_voltage_G1.data() + i * sideChannelData.size() / noOfBitSetsIn_G0.size()),
thrust::raw_pointer_cast(sideChannelData.data() + i * sideChannelData.size() / noOfBitSetsIn_G0.size()),
_unpacked_voltage_G0.size() / noOfBitSetsIn_G0.size(), _dadaBufferLayout.getHeapSize(), _selectedBit, _dadaBufferLayout.getNSideChannels(),
_selectedSideChannel, thrust::raw_pointer_cast(_baseLineN.data()),
_selectedSideChannel,
baseLineG0, baseLineG1,
thrust::raw_pointer_cast(_baseLineNG0.data()),
thrust::raw_pointer_cast(_baseLineNG1.data()),
thrust::raw_pointer_cast(noOfBitSetsIn_G0.data() + i),
thrust::raw_pointer_cast(noOfBitSetsIn_G1.data() + i)
);
NG0 += noOfBitSetsIn_G0[i];
NG1 += noOfBitSetsIn_G1[i];
}
_baseLineNG0[0] /= NG0;
_baseLineNG1[0] /= NG1;
BOOST_LOG_TRIVIAL(debug) << "Updating Baselines\n G0: " << baseLineG0 << " -> " << _baseLineNG0[0] << ", " << baseLineG1 << " -> " << _baseLineNG1[0] ;
BOOST_LOG_TRIVIAL(debug) << "Performing FFT 1";
UnpackedVoltageType *_unpacked_voltage_ptr =
......@@ -342,7 +354,11 @@ bool GatedSpectrometer<HandlerType, IntegratedPowerType>::operator()(RawBytes &b
static_cast<void *>(_sideChannelData_db.a_ptr()),
static_cast<void *>(block.ptr() + _dadaBufferLayout.sizeOfData() + _dadaBufferLayout.sizeOfGap()),
_dadaBufferLayout.sizeOfSideChannelData(), cudaMemcpyHostToDevice, _h2d_stream));
BOOST_LOG_TRIVIAL(debug) << "First side channel item: 0x" << std::setw(16) << std::setfill('0') << std::hex << (reinterpret_cast<uint64_t*>(block.ptr() + _dadaBufferLayout.sizeOfData() + _dadaBufferLayout.sizeOfGap()))[0] << std::dec;
BOOST_LOG_TRIVIAL(debug) << "First side channel item: 0x" << std::setw(16)
<< std::setfill('0') << std::hex <<
(reinterpret_cast<uint64_t*>(block.ptr() + _dadaBufferLayout.sizeOfData()
+ _dadaBufferLayout.sizeOfGap()))[0] <<
std::dec;
if (_call_count == 1) {
......@@ -420,7 +436,7 @@ bool GatedSpectrometer<HandlerType, IntegratedPowerType>::operator()(RawBytes &b
size_t samples_lost = _nsamps_per_output_spectra - (*on_values) - (*off_values);
total_samples_lost += samples_lost;
BOOST_LOG_TRIVIAL(info) << " Heap " << i << ":\n"
BOOST_LOG_TRIVIAL(info) << " Heap " << i << ":\n"
<<" Samples with bit set : " << *on_values << std::endl
<<" Samples without bit set: " << *off_values << std::endl
<<" Samples lost : " << samples_lost << " out of " << _nsamps_per_output_spectra << std::endl;
......
......@@ -99,8 +99,15 @@ template <class HandlerType> void VLBI<HandlerType>::init(RawBytes &block) {
return;
}
size_t timestamp = sync_time + sample_clock_start / _sampleRate;
BOOST_LOG_TRIVIAL(info) << "POSIX timestamp captured from header: " << timestamp << " = " << sync_time << " + " << sample_clock_start << " / " << _sampleRate << " = SYNC_TIME + SAMPLE_CLOCK_START/SAMPLERATE" ;
// timestamp of first sample
double fractional_timestamp = sync_time + sample_clock_start / _sampleRate;
// VDIF starts only at full seconds
size_t timestamp = (size_t) fractional_timestamp + 1;
// We thus have to skip some samples at the beginning
_samples_to_skip = (timestamp - fractional_timestamp) * _sampleRate;
BOOST_LOG_TRIVIAL(info) << "POSIX timestamp for first VDIF package: " << timestamp << " = " << sync_time << " + " << sample_clock_start << " / " << _sampleRate + 1 << " = SYNC_TIME + SAMPLE_CLOCK_START/SAMPLERATE + 1";
BOOST_LOG_TRIVIAL(info) << "Fractional timestamp: " << fractional_timestamp << ", sampels to skip: " << _samples_to_skip;
_vdifHeader.setTimeReferencesFromTimestamp(timestamp);
std::stringstream headerInfo;
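
A worked example of the second-alignment arithmetic above; the numbers are invented, only the formulas follow the code.

// Hypothetical values, for illustration only; the formulas mirror init() above.
#include <cstddef>
#include <cstdio>

int main() {
    const std::size_t sync_time          = 1500000000;  // POSIX second of the sync epoch
    const std::size_t sample_clock_start = 2500000;     // samples elapsed since sync_time
    const double sampleRate              = 1e6;         // samples per second

    // Timestamp of the first sample, generally between two full seconds.
    const double fractional_timestamp = sync_time + sample_clock_start / sampleRate;  // 1500000002.5

    // VDIF starts only at full seconds, so use the next full second ...
    const std::size_t timestamp = (std::size_t)fractional_timestamp + 1;              // 1500000003

    // ... and skip the samples between the first sample and that second.
    const std::size_t samples_to_skip = (timestamp - fractional_timestamp) * sampleRate;  // 500000

    std::printf("timestamp = %zu, samples_to_skip = %zu\n", timestamp, samples_to_skip);
    return 0;
}
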
......@@ -223,9 +230,15 @@ bool VLBI<HandlerType>::operator()(RawBytes &block) {
////////////////////////////////////////////////////////////////////////
BOOST_LOG_TRIVIAL(debug) << " - Copy Data back to host";
CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream));
if (_samples_to_skip > _packed_voltage.size())
{
BOOST_LOG_TRIVIAL(debug) << " - Skipping full block for second alignemnt";
_samples_to_skip -= _packed_voltage.size();
return false;
}
const size_t outputBlockSize = _vdifHeader.getDataFrameLength() * 8 - vlbiHeaderSize;
const size_t totalSizeOfData = _packed_voltage.size() + _spillOver.size(); // current array + remaining of previous
const size_t totalSizeOfData = _packed_voltage.size() + _spillOver.size() - _samples_to_skip; // current array + remaining of previous - samples to skip at the beginning
size_t numberOfBlocksInOutput = totalSizeOfData / outputBlockSize;
size_t remainingBytes = outputBlockSize - _spillOver.size();
BOOST_LOG_TRIVIAL(debug) << " - Number of blocks in output "
......@@ -242,7 +255,7 @@ bool VLBI<HandlerType>::operator()(RawBytes &block) {
BOOST_LOG_TRIVIAL(debug) << " - Copying remaining " << remainingBytes
<< " bytes for first block";
CUDA_ERROR_CHECK(cudaMemcpyAsync(static_cast<void *>(_outputBuffer.a_ptr() + vlbiHeaderSize + _spillOver.size()),
static_cast<void *>(_packed_voltage.a_ptr()),
static_cast<void *>(_packed_voltage.a_ptr() + _samples_to_skip),
remainingBytes, cudaMemcpyDeviceToHost,
_d2h_stream));
......@@ -255,7 +268,7 @@ bool VLBI<HandlerType>::operator()(RawBytes &block) {
<< " blocks of " << outputBlockSize << " bytes";
CUDA_ERROR_CHECK(cudaMemcpy2DAsync(
(void *)(_outputBuffer.a_ptr() + outputBlockSize + 2 * vlbiHeaderSize),
dpitch, (void *)thrust::raw_pointer_cast(_packed_voltage.a_ptr() +
dpitch, (void *)thrust::raw_pointer_cast(_packed_voltage.a_ptr() + _samples_to_skip +
remainingBytes),
spitch, width, height, cudaMemcpyDeviceToHost, _d2h_stream));
......@@ -266,9 +279,12 @@ bool VLBI<HandlerType>::operator()(RawBytes &block) {
<< " bytes with offset " << offset;
CUDA_ERROR_CHECK(cudaMemcpyAsync(
static_cast<void *>(thrust::raw_pointer_cast(_spillOver.data())),
static_cast<void *>(_packed_voltage.a_ptr() + offset),
static_cast<void *>(_packed_voltage.a_ptr() + offset + _samples_to_skip),
_spillOver.size(), cudaMemcpyDeviceToHost, _d2h_stream));
// Only skip samples in the first output block to achieve alignment
_samples_to_skip = 0;
// fill in header data
const uint32_t samplesPerDataFrame = outputBlockSize * 8 / _output_bitDepth;
const uint32_t dataFramesPerSecond = _sampleRate / samplesPerDataFrame;
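
A worked example of the output-frame bookkeeping in this operator(); the sizes are invented, only the formulas follow the code, and the alignment skip is consumed by the first output block only.

// Hypothetical sizes, for illustration only; the formulas mirror the code above.
#include <cstddef>
#include <cstdio>

int main() {
    const std::size_t vlbiHeaderSize  = 32;     // bytes of VDIF header per frame
    const std::size_t dataFrameLength = 1004;   // VDIF data frame length, in 8-byte units
    const std::size_t outputBlockSize = dataFrameLength * 8 - vlbiHeaderSize;  // 8000 payload bytes per frame
    const std::size_t packedVoltage   = 64000;  // packed data arriving in this block
    const std::size_t spillOver       = 3000;   // left over from the previous block
    std::size_t samples_to_skip       = 500;    // alignment skip from init()

    // current array + remaining of previous - skip at the beginning
    const std::size_t totalSizeOfData = packedVoltage + spillOver - samples_to_skip;  // 66500
    const std::size_t numberOfBlocksInOutput = totalSizeOfData / outputBlockSize;     // 8 full frames
    const std::size_t remainingBytes  = outputBlockSize - spillOver;                  // 5000 bytes finish frame 1

    samples_to_skip = 0;  // only the first output block after init() is shifted

    std::printf("%zu frames of %zu payload bytes; %zu bytes fill the first frame\n",
                numberOfBlocksInOutput, outputBlockSize, remainingBytes);
    return 0;
}
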
......
......@@ -52,15 +52,16 @@ TEST(GatedSpectrometer, BitManipulationMacros) {
TEST(GatedSpectrometer, GatingKernel)
{
size_t blockSize = 1024;
size_t nBlocks = 16 * 1024;
const size_t blockSize = 1024;
const size_t nBlocks = 16 * 1024;
thrust::device_vector<float> G0(blockSize * nBlocks);
thrust::device_vector<float> G1(blockSize * nBlocks);
thrust::device_vector<uint64_t> _sideChannelData(nBlocks);
thrust::device_vector<psrdada_cpp::effelsberg::edd::uint64_cu> _nG0(1);
thrust::device_vector<psrdada_cpp::effelsberg::edd::uint64_cu> _nG1(1);
thrust::device_vector<float> baseLine(1);
thrust::device_vector<psrdada_cpp::effelsberg::edd::uint64_cu> _nG0(nBlocks);
thrust::device_vector<psrdada_cpp::effelsberg::edd::uint64_cu> _nG1(nBlocks);
thrust::device_vector<float> baseLineG0(1);
thrust::device_vector<float> baseLineG1(1);
thrust::fill(G0.begin(), G0.end(), 42);
thrust::fill(G1.begin(), G1.end(), 23);
......@@ -70,50 +71,62 @@ TEST(GatedSpectrometer, GatingKernel)
{
thrust::fill(_nG0.begin(), _nG0.end(), 0);
thrust::fill(_nG1.begin(), _nG1.end(), 0);
baseLine[0] = 0.;
baseLineG0[0] = 0.;
baseLineG1[0] = 0.;
const uint64_t *sideCD =
(uint64_t *)(thrust::raw_pointer_cast(_sideChannelData.data()));
psrdada_cpp::effelsberg::edd::gating<<<1024, 1024>>>(
psrdada_cpp::effelsberg::edd::gating<<<1024 , 1024>>>(
thrust::raw_pointer_cast(G0.data()),
thrust::raw_pointer_cast(G1.data()), sideCD,
G0.size(), blockSize, 0, 1,
0, thrust::raw_pointer_cast(baseLine.data()),
G0.size(), G0.size(), 0, 1,
0,
-3., -4,
thrust::raw_pointer_cast(baseLineG0.data()),
thrust::raw_pointer_cast(baseLineG1.data()),
thrust::raw_pointer_cast(_nG0.data()),
thrust::raw_pointer_cast(_nG1.data())
);
thrust::pair<thrust::device_vector<float>::iterator, thrust::device_vector<float>::iterator> minmax;
minmax = thrust::minmax_element(G0.begin(), G0.end());
EXPECT_EQ(*minmax.first, 42);
EXPECT_EQ(*minmax.second, 42);
minmax = thrust::minmax_element(G1.begin(), G1.end());
EXPECT_EQ(*minmax.first, 0);
EXPECT_EQ(*minmax.second, 0);
EXPECT_EQ(*minmax.first, -4.);
EXPECT_EQ(*minmax.second, -4.);
EXPECT_EQ(_nG0[0], G0.size());
EXPECT_EQ(_nG1[0], 0u);
EXPECT_FLOAT_EQ(baseLineG0[0] / (_nG0[0] + 1E-127), 42.f);
EXPECT_FLOAT_EQ(baseLineG1[0] / (_nG1[0] + 1E-127), 0.f);
}
// everything to G1 // with baseline -5
{
thrust::fill(_nG0.begin(), _nG0.end(), 0);
thrust::fill(_nG1.begin(), _nG1.end(), 0);
baseLine[0] = -5. * G0.size();
baseLineG0[0] = 0.;
baseLineG1[0] = 0.;
thrust::fill(_sideChannelData.begin(), _sideChannelData.end(), 1L);
const uint64_t *sideCD =
(uint64_t *)(thrust::raw_pointer_cast(_sideChannelData.data()));
psrdada_cpp::effelsberg::edd::gating<<<1024, 1024>>>(
thrust::raw_pointer_cast(G0.data()),
thrust::raw_pointer_cast(G1.data()), sideCD,
G0.size(), blockSize, 0, 1,
0, thrust::raw_pointer_cast(baseLine.data()),
G0.size(), G0.size(), 0, 1,
0,
5., -2.,
thrust::raw_pointer_cast(baseLineG0.data()),
thrust::raw_pointer_cast(baseLineG1.data()),
thrust::raw_pointer_cast(_nG0.data()),
thrust::raw_pointer_cast(_nG1.data())
);
thrust::pair<thrust::device_vector<float>::iterator, thrust::device_vector<float>::iterator> minmax;
minmax = thrust::minmax_element(G0.begin(), G0.end());
EXPECT_EQ(*minmax.first, -5.);
EXPECT_EQ(*minmax.second, -5.);
EXPECT_EQ(*minmax.first, 5.);
EXPECT_EQ(*minmax.second, 5.);
minmax = thrust::minmax_element(G1.begin(), G1.end());
EXPECT_EQ(*minmax.first, 42);
......@@ -121,54 +134,12 @@ TEST(GatedSpectrometer, GatingKernel)
EXPECT_EQ(_nG0[0], 0u);
EXPECT_EQ(_nG1[0], G1.size());
EXPECT_FLOAT_EQ(baseLineG0[0] / (_nG0[0] + 1E-127), 0.);
EXPECT_FLOAT_EQ(baseLineG1[0] / (_nG1[0] + 1E-127), 42.);
}
}
TEST(GatedSpectrometer, countBitSet) {
size_t nBlocks = 100000;
int nSideChannels = 4;
thrust::device_vector<uint64_t> _sideChannelData(nBlocks * nSideChannels);
thrust::fill(_sideChannelData.begin(), _sideChannelData.end(), 0L);
const uint64_t *sideCD =
(uint64_t *)(thrust::raw_pointer_cast(_sideChannelData.data()));
thrust::device_vector<size_t> res(1);
// test 1 side channel
res[0] = 0;
psrdada_cpp::effelsberg::edd::
countBitSet<<<1, 1024>>>(
sideCD, nBlocks, 0, 1, 0, thrust::raw_pointer_cast(res.data()));
EXPECT_EQ(res[0], 0u);
res[0] = 0;
thrust::fill(_sideChannelData.begin(), _sideChannelData.end(), 1L);
psrdada_cpp::effelsberg::edd::countBitSet<<<1, 1024>>>(
sideCD, nBlocks, 0, 1, 0, thrust::raw_pointer_cast(res.data()));
EXPECT_EQ(res[0], nBlocks);
// test mult side channels w stride.
res[0] = 0;
thrust::fill(_sideChannelData.begin(), _sideChannelData.end(), 0L);
for (size_t i = 2; i < _sideChannelData.size(); i += nSideChannels)
_sideChannelData[i] = 1L;
psrdada_cpp::effelsberg::edd::countBitSet<<<1, 1024>>>(
sideCD, nBlocks, 0, nSideChannels, 3,
thrust::raw_pointer_cast(res.data()));
cudaDeviceSynchronize();
EXPECT_EQ(0U, res[0]);
res[0] = 0;
psrdada_cpp::effelsberg::edd::countBitSet<<<1, 1024>>>(
sideCD, nBlocks, 0, nSideChannels, 2,
thrust::raw_pointer_cast(res.data()));
cudaDeviceSynchronize();
EXPECT_EQ(nBlocks, res[0]);
}
TEST(GatedSpectrometer, array_sum) {
......