Commit f2332816 authored by Tobias Winchen's avatar Tobias Winchen
Browse files

Added stokes mode to gated_cli

parent 8ebc2515
......@@ -27,7 +27,9 @@ class DadaBufferLayout
// input key of the dadad buffer
// size of spead heaps in bytes
// number of side channels
DadaBufferLayout();
DadaBufferLayout(key_t input_key , size_t heapSize, size_t nSideChannels);
void intitialize(key_t input_key , size_t heapSize, size_t nSideChannels);
key_t getInputkey() const;
......
......@@ -44,14 +44,8 @@ typedef float IntegratedPowerType;
/// Input data and intermediate processing data for one polarization
struct PolarizationData
{
DadaBufferLayout _dadaBufferLayout;
size_t _fft_length;
size_t _batch;
// a buffer contains batch * fft_length samples
PolarizationData(size_t fft_length, size_t batch, const DadaBufferLayout
&dadaBufferLayout);
size_t _nbits;
PolarizationData(size_t nbits): _nbits(nbits) {};
/// Raw ADC Voltage
DoubleDeviceBuffer<RawVoltageType> _raw_voltage;
/// Side channel data
......@@ -75,25 +69,49 @@ struct PolarizationData
/// Swaps input buffers
void swap();
///resize the internal buffers
void resize(size_t rawVolttageBufferBytes, size_t nsidechannelitems, size_t channelized_samples);
};
struct SinglePolarizationInput : public PolarizationData
{
DadaBufferLayout _dadaBufferLayout;
size_t _fft_length;
size_t _batch;
// a buffer contains batch * fft_length samples
SinglePolarizationInput(size_t fft_length, size_t _batch, size_t nbits,
const DadaBufferLayout &dadaBufferLayout);
void getFromBlock(RawBytes &block, cudaStream_t &_h2d_stream);
size_t getSamplesPerInputPolarization()
{
return _dadaBufferLayout.sizeOfData() * 8 / _nbits;
}
};
/// Input data and intermediate processing data for two polarizations
struct DualPolarizationData
struct DualPolarizationInput
{
DadaBufferLayout _dadaBufferLayout;
size_t _fft_length;
size_t _batch;
PolarizationData polarization0, polarization1;
DualPolarizationData(size_t fft_length, size_t batch, const DadaBufferLayout
DualPolarizationInput(size_t fft_length, size_t batch, size_t nbit, const DadaBufferLayout
&dadaBufferLayout);
DadaBufferLayout _dadaBufferLayout;
PolarizationData polarization0, polarization1;
void swap();
void getFromBlock(RawBytes &block, cudaStream_t &_h2d_stream);
size_t getSamplesPerInputPolarization()
{
return _dadaBufferLayout.sizeOfData() * 8 / polarization0._nbits / 2;
}
};
......@@ -190,7 +208,7 @@ struct FullStokesOutput
}
/// Resize all buffers
void resize(size_t size, size_t blocks)
FullStokesOutput(size_t size, size_t blocks)
{
I.resize(size * blocks);
Q.resize(size * blocks);
......@@ -200,10 +218,17 @@ struct FullStokesOutput
}
};
/*
struct GatedFullStokesOutput: public OutputDataStream
{
GatedFullStokesOutput(size_t nchans, size_t blocks): OutputDataStream(nchans, blocks / 2), G0(nchans, blocks / 2),
G1(nchans, blocks / 2)
{
BOOST_LOG_TRIVIAL(debug) << "Output with " << _blocks << " blocks a " << _nchans << " channels";
_host_power.resize( 8 * ( _nchans * sizeof(IntegratedPowerType) + sizeof(size_t) ) * _blocks);
BOOST_LOG_TRIVIAL(debug) << "Allocated " << _host_power.size() << " bytes.";
};
FullStokesOutput G0, G1;
void reset(cudaStream_t &_proc_stream)
......@@ -217,108 +242,104 @@ struct GatedFullStokesOutput: public OutputDataStream
{
G0.swap();
G1.swap();
_host_power.swap();
}
/// Resize all buffers
void resize(size_t size, size_t blocks)
{
G0.resize(size, blocks);
G1.resize(size, blocks);
}
void data2Host(cudaStream_t &_d2h_stream)
{
for (size_t i = 0; i < G0._noOfBitSets.size(); i++)
{
size_t memslicesize = (_nchans * sizeof(IntegratedPowerType));
size_t memOffset = 8 * i * (memslicesize + + sizeof(size_t));
size_t memOffset = 8 * i * (memslicesize + sizeof(size_t));
// Copy II QQ UU VV
CUDA_ERROR_CHECK(
cudaMemcpyAsync(static_cast<void *>(_host_power.a_ptr() + memOffset) ,
static_cast<void *>(G0.I.b_ptr() + i * memslicesize),
static_cast<void *>(G0.I.b_ptr() + i * _nchans),
_nchans * sizeof(IntegratedPowerType),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset + 1 * memslicesize) ,
static_cast<void *>(G1.I.b_ptr() + i * memslicesize),
cudaMemcpyAsync(static_cast<void *>(_host_power.a_ptr() + memOffset + 1 * memslicesize) ,
static_cast<void *>(G1.I.b_ptr() + i * _nchans),
_nchans * sizeof(IntegratedPowerType),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset + 2 * memslicesize) ,
static_cast<void *>(G0.Q.b_ptr() + i * memslicesize),
cudaMemcpyAsync(static_cast<void *>(_host_power.a_ptr() + memOffset + 2 * memslicesize) ,
static_cast<void *>(G0.Q.b_ptr() + i * _nchans),
_nchans * sizeof(IntegratedPowerType),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset + 3 * memslicesize) ,
static_cast<void *>(G1.Q.b_ptr() + i * memslicesize),
cudaMemcpyAsync(static_cast<void *>(_host_power.a_ptr() + memOffset + 3 * memslicesize) ,
static_cast<void *>(G1.Q.b_ptr() + i * _nchans),
_nchans * sizeof(IntegratedPowerType),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset + 4 * memslicesize) ,
static_cast<void *>(G0.U.b_ptr() + i * memslicesize),
cudaMemcpyAsync(static_cast<void *>(_host_power.a_ptr() + memOffset + 4 * memslicesize) ,
static_cast<void *>(G0.U.b_ptr() + i * _nchans),
_nchans * sizeof(IntegratedPowerType),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset + 5 * memslicesize) ,
static_cast<void *>(G1.U.b_ptr() + i * memslicesize),
cudaMemcpyAsync(static_cast<void *>(_host_power.a_ptr() + memOffset + 5 * memslicesize) ,
static_cast<void *>(G1.U.b_ptr() + i * _nchans),
_nchans * sizeof(IntegratedPowerType),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset + 6 * memslicesize) ,
static_cast<void *>(G0.V.b_ptr() + i * memslicesize),
cudaMemcpyAsync(static_cast<void *>(_host_power.a_ptr() + memOffset + 6 * memslicesize) ,
static_cast<void *>(G0.V.b_ptr() + i * _nchans),
_nchans * sizeof(IntegratedPowerType),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync(static_cast<void *>(_host_power_db.a_ptr() + memOffset + 7 * memslicesize) ,
static_cast<void *>(G1.V.b_ptr() + i * memslicesize),
cudaMemcpyAsync(static_cast<void *>(_host_power.a_ptr() + memOffset + 7 * memslicesize) ,
static_cast<void *>(G1.V.b_ptr() + i * _nchans),
_nchans * sizeof(IntegratedPowerType),
cudaMemcpyDeviceToHost, _d2h_stream));
// Copy SCI
CUDA_ERROR_CHECK(
cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize),
cudaMemcpyAsync( static_cast<void *>(_host_power.a_ptr() + memOffset + 8 * memslicesize),
static_cast<void *>(G0._noOfBitSets.b_ptr() + i ),
1 * sizeof(size_t),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize + 1 * sizeof(size_t)),
cudaMemcpyAsync( static_cast<void *>(_host_power.a_ptr() + memOffset + 8 * memslicesize + 1 * sizeof(size_t)),
static_cast<void *>(G1._noOfBitSets.b_ptr() + i ),
1 * sizeof(size_t),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize + 2 * sizeof(size_t)),
cudaMemcpyAsync( static_cast<void *>(_host_power.a_ptr() + memOffset + 8 * memslicesize + 2 * sizeof(size_t)),
static_cast<void *>(G0._noOfBitSets.b_ptr() + i ),
1 * sizeof(size_t),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize + 3 * sizeof(size_t)),
cudaMemcpyAsync( static_cast<void *>(_host_power.a_ptr() + memOffset + 8 * memslicesize + 3 * sizeof(size_t)),
static_cast<void *>(G1._noOfBitSets.b_ptr() + i ),
1 * sizeof(size_t),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize + 4 * sizeof(size_t)),
cudaMemcpyAsync( static_cast<void *>(_host_power.a_ptr() + memOffset + 8 * memslicesize + 4 * sizeof(size_t)),
static_cast<void *>(G0._noOfBitSets.b_ptr() + i ),
1 * sizeof(size_t),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize + 5 * sizeof(size_t)),
cudaMemcpyAsync( static_cast<void *>(_host_power.a_ptr() + memOffset + 8 * memslicesize + 5 * sizeof(size_t)),
static_cast<void *>(G1._noOfBitSets.b_ptr() + i ),
1 * sizeof(size_t),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize + 6 * sizeof(size_t)),
cudaMemcpyAsync( static_cast<void *>(_host_power.a_ptr() + memOffset + 8 * memslicesize + 6 * sizeof(size_t)),
static_cast<void *>(G0._noOfBitSets.b_ptr() + i ),
1 * sizeof(size_t),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync( static_cast<void *>(_host_power_db.a_ptr() + memOffset + 8 * memslicesize + 7 * sizeof(size_t)),
cudaMemcpyAsync( static_cast<void *>(_host_power.a_ptr() + memOffset + 8 * memslicesize + 7 * sizeof(size_t)),
static_cast<void *>(G1._noOfBitSets.b_ptr() + i ),
1 * sizeof(size_t),
cudaMemcpyDeviceToHost, _d2h_stream));
......@@ -330,7 +351,7 @@ struct GatedFullStokesOutput: public OutputDataStream
}
};
*/
......@@ -392,16 +413,16 @@ public:
*/
bool operator()(RawBytes &block);
private:
// Processing strategy for single pol mode
void process(PolarizationData *inputDataStream, GatedPowerSpectrumOutput *outputDataStream);
void process(SinglePolarizationInput *inputDataStream, GatedPowerSpectrumOutput *outputDataStream);
// Processing strategy for dual pol power mode
//void process(DualPolarizationData*inputDataStream, GatedPowerSpectrumOutput *outputDataStream);
//void process(DualPolarizationInput*inputDataStream, GatedPowerSpectrumOutput *outputDataStream);
// Processing strategy for full stokes mode
//void process(DualPolarizationData*inputDataStream, FullStokesOutput *outputDataStream);
void process(DualPolarizationInput *inputDataStream, GatedFullStokesOutput *outputDataStream);
private:
// gate the data from the input stream and fft data per gate. Number of bit
// sets in every gate is stored in the output datasets
void gated_fft(PolarizationData &data,
......@@ -412,12 +433,9 @@ private:
DadaBufferLayout _dadaBufferLayout;
std::size_t _fft_length;
std::size_t _naccumulate;
std::size_t _nbits;
std::size_t _selectedSideChannel;
std::size_t _selectedBit;
std::size_t _batch;
std::size_t _nsamps_per_output_spectra;
std::size_t _nsamps_per_buffer;
std::size_t _nsamps_per_heap;
HandlerType &_handler;
......@@ -478,6 +496,20 @@ __global__ void gating(float *G0, float *G1, const int64_t *sideChannelData,
uint64_cu* stats_G1);
/**
* @brief calculate stokes IQUV from two complex valuies for each polarization
*/
__host__ __device__ void stokes_IQUV(const float2 &p1, const float2 &p2, float &I, float &Q, float &U, float &V);
/**
* @brief calculate stokes IQUV spectra pol1, pol2 are arrays of naccumulate
* complex spectra for individual polarizations
*/
__global__ void stokes_accumulate(float2 const __restrict__ *pol1,
float2 const __restrict__ *pol2, float *I, float* Q, float *U, float*V,
int nchans, int naccumulate);
} // edd
......
......@@ -12,6 +12,7 @@
#include <iomanip>
#include <cstring>
#include <sstream>
#include <typeinfo>
namespace psrdada_cpp {
namespace effelsberg {
......@@ -69,12 +70,12 @@ GatedSpectrometer<HandlerType, InputType, OutputType>::GatedSpectrometer(
std::size_t nbits, float input_level, float output_level, HandlerType
&handler) : _dadaBufferLayout(dadaBufferLayout),
_selectedSideChannel(selectedSideChannel), _selectedBit(selectedBit),
_fft_length(fft_length), _naccumulate(naccumulate), _nbits(nbits),
_fft_length(fft_length), _naccumulate(naccumulate),
_handler(handler), _fft_plan(0), _call_count(0), _nsamps_per_heap(4096)
{
// Sanity checks
assert(((_nbits == 12) || (_nbits == 8)));
assert(((nbits == 12) || (nbits == 8)));
assert(_naccumulate > 0);
// check for any device errors
......@@ -94,28 +95,10 @@ GatedSpectrometer<HandlerType, InputType, OutputType>::GatedSpectrometer(
(selectedSideChannel < _dadaBufferLayout.getNSideChannels())); // Sanity check of side channel value
assert(selectedBit < 64); // Sanity check of selected bit
_nsamps_per_buffer = _dadaBufferLayout.sizeOfData() * 8 / nbits;
_nsamps_per_output_spectra = fft_length * naccumulate;
if (_nsamps_per_output_spectra <= _nsamps_per_buffer)
{ // one buffer block is used for one or multiple output spectra
size_t N = _nsamps_per_buffer / _nsamps_per_output_spectra;
// All data in one block has to be used
assert(N * _nsamps_per_output_spectra == _nsamps_per_buffer);
_nBlocks = 1;
}
else
{ // multiple blocks are integrated intoone output
size_t N = _nsamps_per_output_spectra / _nsamps_per_buffer;
// All data in multiple blocks has to be used
assert(N * _nsamps_per_buffer == _nsamps_per_output_spectra);
_nBlocks = N;
}
BOOST_LOG_TRIVIAL(debug) << "Integrating " << _nsamps_per_output_spectra <<
" samples from " << _nBlocks << " into one output spectrum.";
_nchans = _fft_length / 2 + 1;
int batch = _nsamps_per_buffer / _fft_length;
// Calculate the scaling parameters for 8 bit output
float dof = 2 * _naccumulate;
float scale =
std::pow(input_level * std::sqrt(static_cast<float>(_nchans)), 2);
......@@ -125,26 +108,45 @@ GatedSpectrometer<HandlerType, InputType, OutputType>::GatedSpectrometer(
<< "Correction factors for 8-bit conversion: offset = " << offset
<< ", scaling = " << scaling;
// plan the FFT
size_t nsamps_per_buffer = _dadaBufferLayout.sizeOfData() * 8 / nbits;
int batch = nsamps_per_buffer / _fft_length;
int n[] = {static_cast<int>(_fft_length)};
BOOST_LOG_TRIVIAL(debug) << "Generating FFT plan: \n"
<< " fft_length = " << _fft_length << "\n"
<< " n[0] = " << n[0] << "\n"
<< " _nchans = " << _nchans << "\n"
<< " batch = " << batch << "\n";
;
CUFFT_ERROR_CHECK(cufftPlanMany(&_fft_plan, 1, n, NULL, 1, _fft_length, NULL,
1, _nchans, CUFFT_R2C, batch));
BOOST_LOG_TRIVIAL(debug) << "Allocating memory";
inputDataStream = new InputType(fft_length, batch, nbits, _dadaBufferLayout);
inputDataStream = new InputType(fft_length, batch, _dadaBufferLayout);
//How many output spectra per input block?
size_t nsamps_per_output_spectra = fft_length * naccumulate;
size_t nsamps_per_pol = inputDataStream->getSamplesPerInputPolarization();
if (nsamps_per_output_spectra <= nsamps_per_pol)
{ // one buffer block is used for one or multiple output spectra
size_t N = nsamps_per_pol / nsamps_per_output_spectra;
// All data in one block has to be used
assert(N * nsamps_per_output_spectra == nsamps_per_pol);
_nBlocks = 1;
}
else
{ // multiple blocks are integrated intoone output
size_t N = nsamps_per_output_spectra / nsamps_per_pol;
// All data in multiple blocks has to be used
assert(N * nsamps_per_pol == nsamps_per_output_spectra);
_nBlocks = N;
}
BOOST_LOG_TRIVIAL(debug) << "Integrating " << nsamps_per_output_spectra <<
" samples from " << _nBlocks << "blocks into one output spectrum.";
_unpacked_voltage_G0.resize(_nsamps_per_buffer);
_unpacked_voltage_G1.resize(_nsamps_per_buffer);
BOOST_LOG_TRIVIAL(debug) << " Unpacked voltages size (in samples): "
<< _unpacked_voltage_G0.size();
// We unpack one pol at a time
_unpacked_voltage_G0.resize(nsamps_per_pol);
_unpacked_voltage_G1.resize(nsamps_per_pol);
BOOST_LOG_TRIVIAL(debug) << " Unpacked voltages size (in samples): " << _unpacked_voltage_G0.size();
outputDataStream = new OutputType(_nchans, batch / (_naccumulate / _nBlocks));
......@@ -160,9 +162,12 @@ GatedSpectrometer<HandlerType, InputType, OutputType>::GatedSpectrometer(
template <class HandlerType, class InputType, class OutputType>
GatedSpectrometer<HandlerType, InputType, OutputType>::~GatedSpectrometer() {
BOOST_LOG_TRIVIAL(debug) << "Destroying GatedSpectrometer";
cudaDeviceSynchronize();
if (!_fft_plan)
if (_fft_plan)
cufftDestroy(_fft_plan);
delete inputDataStream;
delete outputDataStream;
cudaStreamDestroy(_h2d_stream);
cudaStreamDestroy(_proc_stream);
cudaStreamDestroy(_d2h_stream);
......@@ -176,11 +181,20 @@ void GatedSpectrometer<HandlerType, InputType, OutputType>::init(RawBytes &block
headerInfo << "\n"
<< "# Gated spectrometer parameters: \n"
<< "fft_length " << _fft_length << "\n"
<< "nchannels " << _fft_length /2 + 1 << "\n"
<< "nchannels " << _nchans << "\n"
<< "naccumulate " << _naccumulate << "\n"
<< "selected_side_channel " << _selectedSideChannel << "\n"
<< "selected_bit " << _selectedBit << "\n"
<< "output_bit_depth " << sizeof(IntegratedPowerType) * 8;
<< "output_bit_depth " << sizeof(IntegratedPowerType) * 8 << "\n"
<< "full_stokes_output ";
if (typeid(OutputType) == typeid(GatedFullStokesOutput))
{
headerInfo << "yes\n";
}
else
{
headerInfo << "no\n";
}
size_t bEnd = std::strlen(block.ptr());
if (bEnd + headerInfo.str().size() < block.total_bytes())
......@@ -208,7 +222,7 @@ void GatedSpectrometer<HandlerType, InputType, OutputType>::gated_fft(
)
{
BOOST_LOG_TRIVIAL(debug) << "Unpacking raw voltages";
switch (_nbits) {
switch (data._nbits) {
case 8:
_unpacker->unpack<8>(data._raw_voltage.b(), _unpacked_voltage_G0);
break;
......@@ -242,8 +256,6 @@ void GatedSpectrometer<HandlerType, InputType, OutputType>::gated_fft(
);
}
CUDA_ERROR_CHECK(cudaDeviceSynchronize());
CUDA_ERROR_CHECK(cudaDeviceSynchronize());
// only few output blocks per input block thus execution on only one thread.
// Important is that the execution is async on the GPU.
update_baselines<<<1,1,0, _proc_stream>>>(
......@@ -256,29 +268,23 @@ void GatedSpectrometer<HandlerType, InputType, OutputType>::gated_fft(
_noOfBitSetsIn_G0.size()
);
CUDA_ERROR_CHECK(cudaDeviceSynchronize());
BOOST_LOG_TRIVIAL(debug) << "Performing FFT 1";
CUDA_ERROR_CHECK(cudaDeviceSynchronize());
BOOST_LOG_TRIVIAL(debug) << "Accessing unpacked voltage";
UnpackedVoltageType *_unpacked_voltage_ptr =
thrust::raw_pointer_cast(_unpacked_voltage_G0.data());
CUDA_ERROR_CHECK(cudaDeviceSynchronize());
BOOST_LOG_TRIVIAL(debug) << "Accessing channelized voltage";
ChannelisedVoltageType *_channelised_voltage_ptr =
thrust::raw_pointer_cast(data._channelised_voltage_G0.data());
CUDA_ERROR_CHECK(cudaDeviceSynchronize());
CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr,
(cufftComplex *)_channelised_voltage_ptr));
CUDA_ERROR_CHECK(cudaDeviceSynchronize());
BOOST_LOG_TRIVIAL(debug) << "Performing FFT 2";
_unpacked_voltage_ptr = thrust::raw_pointer_cast(_unpacked_voltage_G1.data());
_channelised_voltage_ptr = thrust::raw_pointer_cast(data._channelised_voltage_G1.data());
CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr,
(cufftComplex *)_channelised_voltage_ptr));
CUDA_ERROR_CHECK(cudaDeviceSynchronize());
CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
BOOST_LOG_TRIVIAL(debug) << "Exit processing";
} // process
......@@ -314,21 +320,21 @@ bool GatedSpectrometer<HandlerType, InputType, OutputType>::operator()(RawBytes
// process data
// check if new outblock is started: _call_count -1 because this is the block number on the device
bool newBlock = (((_call_count-1) * _nsamps_per_buffer) % _nsamps_per_output_spectra == 0);
bool newBlock = ((((_call_count-1) * inputDataStream->getSamplesPerInputPolarization()) % (_fft_length * _naccumulate) ) == 0);
// only if a newblock is started the output buffer is swapped. Otherwise the
// new data is added to it
if (newBlock)
{
BOOST_LOG_TRIVIAL(debug) << "Starting new output block.";
CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream));
outputDataStream->swap();
outputDataStream->reset(_proc_stream);
}
BOOST_LOG_TRIVIAL(debug) << "Processing block.";
cudaDeviceSynchronize();
process(inputDataStream, outputDataStream);
cudaDeviceSynchronize();
CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
BOOST_LOG_TRIVIAL(debug) << "Processing block finished.";
/// For one pol input and power out
/// ToDo: For two pol input and power out
......@@ -339,36 +345,12 @@ bool GatedSpectrometer<HandlerType, InputType, OutputType>::operator()(RawBytes
return false;
}
outputDataStream->data2Host(_d2h_stream);
outputDataStream->data2Host(_d2h_stream);
if (_call_count == 3) {
return false;
}
// // calculate off value
// BOOST_LOG_TRIVIAL(info) << "Buffer block: " << _call_count-3 << " with " << _noOfBitSetsIn_G0.size() << "x2 output heaps:";
// size_t total_samples_lost = 0;
// for (size_t i = 0; i < _noOfBitSetsIn_G0.size(); i++)
// {
// size_t memOffset = 2 * i * (_nchans * sizeof(IntegratedPowerType) + sizeof(size_t));
//
// size_t* on_values = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType));
// size_t* off_values = reinterpret_cast<size_t*> (_host_power_db.b_ptr() + memOffset + 2 * _nchans * sizeof(IntegratedPowerType) + sizeof(size_t));
//
// size_t samples_lost = _nsamps_per_output_spectra - (*on_values) - (*off_values);
// total_samples_lost += samples_lost;
//
// BOOST_LOG_TRIVIAL(info) << " Heap " << i << ":\n"
// <<" Samples with bit set : " << *on_values << std::endl
// <<" Samples without bit set: " << *off_values << std::endl
// <<" Samples lost : " << samples_lost << " out of " << _nsamps_per_output_spectra << std::endl;
// }
// double efficiency = 1. - double(total_samples_lost) / (_nsamps_per_output_spectra * _noOfBitSetsIn_G0.size());
// double prev_average = _processing_efficiency / (_call_count- 3 - 1);
// _processing_efficiency += efficiency;
// double average = _processing_efficiency / (_call_count-3);
// BOOST_LOG_TRIVIAL(info) << "Total processing efficiency of this buffer block:" << std::setprecision(6) << efficiency << ". Run average: " << average << " (Trend: " << std::showpos << (average - prev_average) << ")";
//
// // Wrap in a RawBytes object here;
// Wrap in a RawBytes object here;
RawBytes bytes(reinterpret_cast<char *>(outputDataStream->_host_power.b_ptr()),
outputDataStream->_host_power.size(),
outputDataStream->_host_power.size());
......@@ -384,7 +366,7 @@ bool GatedSpectrometer<HandlerType, InputType, OutputType>::operator()(RawBytes
template <class HandlerType, class InputType, class OutputType>
void GatedSpectrometer<HandlerType, InputType, OutputType>::process(PolarizationData *inputDataStream, GatedPowerSpectrumOutput *outputDataStream)
void GatedSpectrometer<HandlerType, InputType, OutputType>::process(SinglePolarizationInput *inputDataStream, GatedPowerSpectrumOutput *outputDataStream)
{
gated_fft(*inputDataStream, outputDataStream->G0._noOfBitSets.a(), outputDataStream->G1._noOfBitSets.a());
......@@ -404,10 +386,51 @@ void GatedSpectrometer<HandlerType, InputType, OutputType>::process(Polarization
_naccumulate / _nBlocks,
1, 0., 1, 0);
CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
}
template <class HandlerType, class InputType, class OutputType>
void GatedSpectrometer<HandlerType, InputType, OutputType>::process(DualPolarizationInput *inputDataStream, GatedFullStokesOutput *outputDataStream)
{
mergeSideChannels<<<1024, 1024, 0, _proc_stream>>>(thrust::raw_pointer_cast(inputDataStream->polarization0._sideChannelData.a().data()),
thrust::raw_pointer_cast(inputDataStream->polarization1._sideChannelData.a().data()), inputDataStream->polarization1._sideChannelData.a().size());
gated_fft(inputDataStream->polarization0, outputDataStream->G0._noOfBitSets.a(), outputDataStream->G1._noOfBitSets.a());
gated_fft(inputDataStream->polarization1, outputDataStream->G0._noOfBitSets.a(), outputDataStream->G1._noOfBitSets.a());
stokes_accumulate<<<1024, 1024, 0, _proc_stream>>>(
thrust::raw_pointer_cast(inputDataStream->polarization0._channelised_voltage_G0.data()),
thrust::raw_pointer_cast(inputDataStream->polarization1._channelised_voltage_G0.data()),
thrust::raw_pointer_cast(outputDataStream->G0.I.a().data()),
thrust::raw_pointer_cast(outputDataStream->G0.Q.a().data()),