Commit 09928fff authored by Tobias Winchen's avatar Tobias Winchen
Browse files

Fixed fft_length, improved documentation

parent 281f2a39
......@@ -25,7 +25,7 @@ namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
/// Macro to manipulate single bits of an 64-bit type.
#define BIT_MASK(bit) (1uL << (bit))
#define SET_BIT(value, bit) ((value) |= BIT_MASK(bit))
#define CLEAR_BIT(value, bit) ((value) &= ~BIT_MASK(bit))
......@@ -39,12 +39,18 @@ typedef float UnpackedVoltageType;
typedef float2 ChannelisedVoltageType;
typedef float IntegratedPowerType;
/// Input data and intermediate processing data for one polarization
/**
@class PolarizationData
@brief Device data arrays for raw voltage input and intermediate processing data for one polarization
*/
struct PolarizationData
{
size_t _nbits;
/**
* @brief Constructor
*
* @param nbits Bit-depth of the input data.
*/
PolarizationData(size_t nbits): _nbits(nbits) {};
/// Raw ADC Voltage
DoubleDeviceBuffer<RawVoltageType> _raw_voltage;
......@@ -74,62 +80,116 @@ struct PolarizationData
};
struct SinglePolarizationInput : public PolarizationData
/**
@class SinglePolarizationInput
@brief Input data for a buffer containing one polarization
*/
class SinglePolarizationInput : public PolarizationData
{
DadaBufferLayout _dadaBufferLayout;
size_t _fft_length;
size_t _batch;
// a buffer contains batch * fft_length samples
SinglePolarizationInput(size_t fft_length, size_t _batch, size_t nbits,
public:
/**
* @brief Constructor
*
* @param fft_length length of the fft.
* @param nbits bit-depth of the input data.
* @param dadaBufferLayout layout of the input dada buffer
*/
SinglePolarizationInput(size_t fft_length, size_t nbits,
const DadaBufferLayout &dadaBufferLayout);
/// Copy data from input block to input dubble buffer
void getFromBlock(RawBytes &block, cudaStream_t &_h2d_stream);
size_t getSamplesPerInputPolarization()
{
return _dadaBufferLayout.sizeOfData() * 8 / _nbits;
}
/// Number of samples per input polarization
size_t getSamplesPerInputPolarization();
};
/// Input data and intermediate processing data for two polarizations
struct DualPolarizationInput
/**
@class SinglePolarizationInput
@brief Input data for a buffer containing two polarizations
*/
class DualPolarizationInput
{
DadaBufferLayout _dadaBufferLayout;
size_t _fft_length;
size_t _batch;
public:
PolarizationData polarization0, polarization1;
DualPolarizationInput(size_t fft_length, size_t batch, size_t nbit, const DadaBufferLayout
/**
* @brief Constructor
*
* @param fft_length length of the fft.
* @param nbits bit-depth of the input data.
* @param dadaBufferLayout layout of the input dada buffer
*/
DualPolarizationInput(size_t fft_length, size_t nbits, const DadaBufferLayout
&dadaBufferLayout);
/// Swaps input buffers for both polarizations
void swap();
/// Copy data from input block to input dubble buffer
void getFromBlock(RawBytes &block, cudaStream_t &_h2d_stream);
size_t getSamplesPerInputPolarization()
{
return _dadaBufferLayout.sizeOfData() * 8 / polarization0._nbits / 2;
}
/// Number of samples per input polarization
size_t getSamplesPerInputPolarization();
};
/// INterface for the processed output data stream
/**
@class PowerSpectrumOutput
@brief Output data for one gate, single power spectrum
*/
struct PowerSpectrumOutput
{
/**
* @brief Constructor
*
* @param size size of the output, i.e. number of channels.
* @param blocks number of blocks in the output.
*/
PowerSpectrumOutput(size_t size, size_t blocks);
/// spectrum data
DoubleDeviceBuffer<IntegratedPowerType> data;
/// Number of samples integrated in this output block
DoubleDeviceBuffer<uint64_cu> _noOfBitSets;
/// Swap output buffers and reset the buffer in given stream for new integration
void swap(cudaStream_t &_proc_stream);
};
/**
@class OutputDataStream
@brief Interface for the processed output data stream
*/
struct OutputDataStream
{
size_t _nchans;
size_t _blocks;
/**
* @brief Constructor
*
* @param nchans number of channels.
* @param blocks number of blocks in the output.
*/
OutputDataStream(size_t nchans, size_t blocks) : _nchans(nchans), _blocks(blocks)
{
}
/// Reset output to for new integration
virtual void reset(cudaStream_t &_proc_stream) = 0;
/// Swap output buffers
virtual void swap() = 0;
virtual void swap(cudaStream_t &_proc_stream) = 0;
// output buffer on the host
DoublePinnedHostBuffer<char> _host_power;
......@@ -140,216 +200,63 @@ struct OutputDataStream
};
// Output data for one gate, single power spectrum
struct PowerSpectrumOutput
{
PowerSpectrumOutput(size_t size, size_t blocks);
/// spectrum data
DoubleDeviceBuffer<IntegratedPowerType> data;
/// Number of samples integrated in this output block
DoubleDeviceBuffer<uint64_cu> _noOfBitSets;
/// Reset outptu for new integration
void reset(cudaStream_t &_proc_stream);
/// Swap output buffers
void swap();
};
/**
@class GatedPowerSpectrumOutput
@brief Output Stream for power spectrum output
*/
struct GatedPowerSpectrumOutput : public OutputDataStream
{
GatedPowerSpectrumOutput(size_t nchans, size_t blocks);
/// Power spectrum for off and on gate
PowerSpectrumOutput G0, G1;
void reset(cudaStream_t &_proc_stream);
/// Swap output buffers
void swap();
void swap(cudaStream_t &_proc_stream);
void data2Host(cudaStream_t &_d2h_stream);
};
};
// Output data for one gate full stokes
/**
@class FullStokesOutput
@brief Output data for one gate full stokes
*/
struct FullStokesOutput
{
/// Stokes parameters
DoubleDeviceBuffer<IntegratedPowerType> I;
DoubleDeviceBuffer<IntegratedPowerType> Q;
DoubleDeviceBuffer<IntegratedPowerType> U;
DoubleDeviceBuffer<IntegratedPowerType> V;
/**
* @brief Constructor
*
* @param size size of the output, i.e. number of channels.
* @param blocks number of blocks in the output.
*/
FullStokesOutput(size_t size, size_t blocks);
/// Buffer for Stokes Parameters
DoubleDeviceBuffer<IntegratedPowerType> I, Q, U, V;
/// Number of samples integrated in this output block
DoubleDeviceBuffer<uint64_cu> _noOfBitSets;
/// Reset outptu for new integration
void reset(cudaStream_t &_proc_stream)
{
thrust::fill(thrust::cuda::par.on(_proc_stream), I.a().begin(), I.a().end(), 0.);
thrust::fill(thrust::cuda::par.on(_proc_stream), Q.a().begin(), Q.a().end(), 0.);
thrust::fill(thrust::cuda::par.on(_proc_stream), U.a().begin(), U.a().end(), 0.);
thrust::fill(thrust::cuda::par.on(_proc_stream), V.a().begin(), V.a().end(), 0.);
thrust::fill(thrust::cuda::par.on(_proc_stream), _noOfBitSets.a().begin(), _noOfBitSets.a().end(), 0L);
}
/// Swap output buffers
void swap()
{
I.swap();
Q.swap();
U.swap();
V.swap();
_noOfBitSets.swap();
}
/// Resize all buffers
FullStokesOutput(size_t size, size_t blocks)
{
I.resize(size * blocks);
Q.resize(size * blocks);
U.resize(size * blocks);
V.resize(size * blocks);
_noOfBitSets.resize(blocks);
}
void swap(cudaStream_t &_proc_stream);
};
/**
@class GatedPowerSpectrumOutput
@brief Output Stream for power spectrum output
*/
struct GatedFullStokesOutput: public OutputDataStream
{
GatedFullStokesOutput(size_t nchans, size_t blocks): OutputDataStream(nchans, blocks / 2), G0(nchans, blocks / 2),
G1(nchans, blocks / 2)
{
BOOST_LOG_TRIVIAL(debug) << "Output with " << _blocks << " blocks a " << _nchans << " channels";
_host_power.resize( 8 * ( _nchans * sizeof(IntegratedPowerType) + sizeof(size_t) ) * _blocks);
BOOST_LOG_TRIVIAL(debug) << "Allocated " << _host_power.size() << " bytes.";
};
/// stokes output for on/off gate
FullStokesOutput G0, G1;
void reset(cudaStream_t &_proc_stream)
{
G0.reset(_proc_stream);
G1.reset(_proc_stream);
}
/// Swap output buffers
void swap()
{
G0.swap();
G1.swap();
_host_power.swap();
}
void data2Host(cudaStream_t &_d2h_stream)
{
for (size_t i = 0; i < G0._noOfBitSets.size(); i++)
{
size_t memslicesize = (_nchans * sizeof(IntegratedPowerType));
size_t memOffset = 8 * i * (memslicesize + sizeof(size_t));
// Copy II QQ UU VV
CUDA_ERROR_CHECK(
cudaMemcpyAsync(static_cast<void *>(_host_power.a_ptr() + memOffset) ,
static_cast<void *>(G0.I.b_ptr() + i * _nchans),
_nchans * sizeof(IntegratedPowerType),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync(static_cast<void *>(_host_power.a_ptr() + memOffset + 1 * memslicesize) ,
static_cast<void *>(G1.I.b_ptr() + i * _nchans),
_nchans * sizeof(IntegratedPowerType),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync(static_cast<void *>(_host_power.a_ptr() + memOffset + 2 * memslicesize) ,
static_cast<void *>(G0.Q.b_ptr() + i * _nchans),
_nchans * sizeof(IntegratedPowerType),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync(static_cast<void *>(_host_power.a_ptr() + memOffset + 3 * memslicesize) ,
static_cast<void *>(G1.Q.b_ptr() + i * _nchans),
_nchans * sizeof(IntegratedPowerType),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync(static_cast<void *>(_host_power.a_ptr() + memOffset + 4 * memslicesize) ,
static_cast<void *>(G0.U.b_ptr() + i * _nchans),
_nchans * sizeof(IntegratedPowerType),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync(static_cast<void *>(_host_power.a_ptr() + memOffset + 5 * memslicesize) ,
static_cast<void *>(G1.U.b_ptr() + i * _nchans),
_nchans * sizeof(IntegratedPowerType),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync(static_cast<void *>(_host_power.a_ptr() + memOffset + 6 * memslicesize) ,
static_cast<void *>(G0.V.b_ptr() + i * _nchans),
_nchans * sizeof(IntegratedPowerType),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync(static_cast<void *>(_host_power.a_ptr() + memOffset + 7 * memslicesize) ,
static_cast<void *>(G1.V.b_ptr() + i * _nchans),
_nchans * sizeof(IntegratedPowerType),
cudaMemcpyDeviceToHost, _d2h_stream));
// Copy SCI
CUDA_ERROR_CHECK(
cudaMemcpyAsync( static_cast<void *>(_host_power.a_ptr() + memOffset + 8 * memslicesize),
static_cast<void *>(G0._noOfBitSets.b_ptr() + i ),
1 * sizeof(size_t),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync( static_cast<void *>(_host_power.a_ptr() + memOffset + 8 * memslicesize + 1 * sizeof(size_t)),
static_cast<void *>(G1._noOfBitSets.b_ptr() + i ),
1 * sizeof(size_t),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync( static_cast<void *>(_host_power.a_ptr() + memOffset + 8 * memslicesize + 2 * sizeof(size_t)),
static_cast<void *>(G0._noOfBitSets.b_ptr() + i ),
1 * sizeof(size_t),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync( static_cast<void *>(_host_power.a_ptr() + memOffset + 8 * memslicesize + 3 * sizeof(size_t)),
static_cast<void *>(G1._noOfBitSets.b_ptr() + i ),
1 * sizeof(size_t),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync( static_cast<void *>(_host_power.a_ptr() + memOffset + 8 * memslicesize + 4 * sizeof(size_t)),
static_cast<void *>(G0._noOfBitSets.b_ptr() + i ),
1 * sizeof(size_t),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync( static_cast<void *>(_host_power.a_ptr() + memOffset + 8 * memslicesize + 5 * sizeof(size_t)),
static_cast<void *>(G1._noOfBitSets.b_ptr() + i ),
1 * sizeof(size_t),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync( static_cast<void *>(_host_power.a_ptr() + memOffset + 8 * memslicesize + 6 * sizeof(size_t)),
static_cast<void *>(G0._noOfBitSets.b_ptr() + i ),
1 * sizeof(size_t),
cudaMemcpyDeviceToHost, _d2h_stream));
CUDA_ERROR_CHECK(
cudaMemcpyAsync( static_cast<void *>(_host_power.a_ptr() + memOffset + 8 * memslicesize + 7 * sizeof(size_t)),
static_cast<void *>(G1._noOfBitSets.b_ptr() + i ),
1 * sizeof(size_t),
cudaMemcpyDeviceToHost, _d2h_stream));
}
GatedFullStokesOutput(size_t nchans, size_t blocks);
/// Swap output buffers
void swap(cudaStream_t &_proc_stream);
}
void data2Host(cudaStream_t &_d2h_stream);
};
......@@ -359,7 +266,6 @@ G1(nchans, blocks / 2)
@class GatedSpectrometer
@brief Split data into two streams and create integrated spectra depending on
bit set in side channel data.
*/
template <class HandlerType,
class InputType,
......
......@@ -108,19 +108,7 @@ GatedSpectrometer<HandlerType, InputType, OutputType>::GatedSpectrometer(
<< "Correction factors for 8-bit conversion: offset = " << offset
<< ", scaling = " << scaling;
// plan the FFT
size_t nsamps_per_buffer = _dadaBufferLayout.sizeOfData() * 8 / nbits;
int batch = nsamps_per_buffer / _fft_length;
int n[] = {static_cast<int>(_fft_length)};
BOOST_LOG_TRIVIAL(debug) << "Generating FFT plan: \n"
<< " fft_length = " << _fft_length << "\n"
<< " n[0] = " << n[0] << "\n"
<< " _nchans = " << _nchans << "\n"
<< " batch = " << batch << "\n";
CUFFT_ERROR_CHECK(cufftPlanMany(&_fft_plan, 1, n, NULL, 1, _fft_length, NULL,
1, _nchans, CUFFT_R2C, batch));
inputDataStream = new InputType(fft_length, batch, nbits, _dadaBufferLayout);
inputDataStream = new InputType(fft_length, nbits, _dadaBufferLayout);
//How many output spectra per input block?
size_t nsamps_per_output_spectra = fft_length * naccumulate;
......@@ -143,6 +131,20 @@ GatedSpectrometer<HandlerType, InputType, OutputType>::GatedSpectrometer(
BOOST_LOG_TRIVIAL(debug) << "Integrating " << nsamps_per_output_spectra <<
" samples from " << _nBlocks << "blocks into one output spectrum.";
// plan the FFT
size_t nsamps_per_buffer = _dadaBufferLayout.sizeOfData() * 8 / nbits;
int batch = nsamps_per_pol / _fft_length;
int n[] = {static_cast<int>(_fft_length)};
BOOST_LOG_TRIVIAL(debug) << "Generating FFT plan: \n"
<< " fft_length = " << _fft_length << "\n"
<< " n[0] = " << n[0] << "\n"
<< " _nchans = " << _nchans << "\n"
<< " batch = " << batch << "\n";
CUFFT_ERROR_CHECK(cufftPlanMany(&_fft_plan, 1, n, NULL, 1, _fft_length, NULL,
1, _nchans, CUFFT_R2C, batch));
// We unpack one pol at a time
_unpacked_voltage_G0.resize(nsamps_per_pol);
_unpacked_voltage_G1.resize(nsamps_per_pol);
......@@ -328,8 +330,7 @@ bool GatedSpectrometer<HandlerType, InputType, OutputType>::operator()(RawBytes
{
BOOST_LOG_TRIVIAL(debug) << "Starting new output block.";
CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream));
outputDataStream->swap();
outputDataStream->reset(_proc_stream);
outputDataStream->swap(_proc_stream);
}
BOOST_LOG_TRIVIAL(debug) << "Processing block.";
......
......@@ -185,13 +185,23 @@ void PolarizationData::resize(size_t rawVolttageBufferBytes, size_t nsidechannel
}
SinglePolarizationInput::SinglePolarizationInput(size_t fft_length, size_t batch, size_t nbits, const DadaBufferLayout
&dadaBufferLayout) : PolarizationData(nbits), _fft_length(fft_length), _batch(batch), _dadaBufferLayout(dadaBufferLayout)
SinglePolarizationInput::SinglePolarizationInput(size_t fft_length, size_t nbits, const DadaBufferLayout
&dadaBufferLayout) : PolarizationData(nbits), _fft_length(fft_length), _dadaBufferLayout(dadaBufferLayout)
{
size_t nsamps_per_buffer = _dadaBufferLayout.sizeOfData() * 8 / nbits;
size_t _batch = nsamps_per_buffer / _fft_length;
resize(_dadaBufferLayout.sizeOfData(), _dadaBufferLayout.getNSideChannels() * _dadaBufferLayout.getNHeaps(), (_fft_length / 2 + 1) * _batch);
};
size_t SinglePolarizationInput::getSamplesPerInputPolarization()
{
return _dadaBufferLayout.sizeOfData() * 8 / _nbits;
}
void PolarizationData::swap()
{
_raw_voltage.swap();
......@@ -220,13 +230,16 @@ void SinglePolarizationInput::getFromBlock(RawBytes &block, cudaStream_t &_h2d_s
}
DualPolarizationInput::DualPolarizationInput(size_t fft_length, size_t batch, size_t nbits, const DadaBufferLayout
DualPolarizationInput::DualPolarizationInput(size_t fft_length, size_t nbits, const DadaBufferLayout
&dadaBufferLayout) : _fft_length(fft_length),
_batch(batch),
polarization0(nbits),
polarization1(nbits),
_dadaBufferLayout(dadaBufferLayout)
{
size_t nsamps_per_buffer = _dadaBufferLayout.sizeOfData() * 8 / nbits;
size_t _batch = nsamps_per_buffer / _fft_length / 2;
polarization0.resize(_dadaBufferLayout.sizeOfData() / 2, _dadaBufferLayout.getNSideChannels() * _dadaBufferLayout.getNHeaps() / 2, (_fft_length / 2 + 1) * _batch);
polarization1.resize(_dadaBufferLayout.sizeOfData() / 2, _dadaBufferLayout.getNSideChannels() * _dadaBufferLayout.getNHeaps() / 2, (_fft_length / 2 + 1) * _batch);
};
......@@ -239,6 +252,12 @@ void DualPolarizationInput::swap()
}
size_t DualPolarizationInput::getSamplesPerInputPolarization()
{
return _dadaBufferLayout.sizeOfData() * 8 / polarization0._nbits / 2;
}
void DualPolarizationInput::getFromBlock(RawBytes &block, cudaStream_t &_h2d_stream)
{
// Copy the data with stride to the GPU:
......@@ -296,17 +315,12 @@ PowerSpectrumOutput::PowerSpectrumOutput(size_t size, size_t blocks)
}
void PowerSpectrumOutput::reset(cudaStream_t &_proc_stream)
{
thrust::fill(thrust::cuda::par.on(_proc_stream), data.a().begin(), data.a().end(), 0.);
thrust::fill(thrust::cuda::par.on(_proc_stream), _noOfBitSets.a().begin(), _noOfBitSets.a().end(), 0L);
}
void PowerSpectrumOutput::swap()
void PowerSpectrumOutput::swap(cudaStream_t &_proc_stream)
{
data.swap();
_noOfBitSets.swap();
thrust::fill(thrust::cuda::par.on(_proc_stream), data.a().begin(), data.a().end(), 0.);
thrust::fill(thrust::cuda::par.on(_proc_stream), _noOfBitSets.a().begin(), _noOfBitSets.a().end(), 0L);
}
......@@ -320,18 +334,11 @@ G1(nchans, blocks)
}
void GatedPowerSpectrumOutput::reset(cudaStream_t &_proc_stream)
{
G0.reset(_proc_stream);
G1.reset(_proc_stream);
}
/// Swap output buffers
void GatedPowerSpectrumOutput::swap()
void GatedPowerSpectrumOutput::swap(cudaStream_t &_proc_stream)
{
G0.swap();
G1.swap();
G0.swap(_proc_stream);
G1.swap(_proc_stream);
_host_power.swap();
}
......@@ -377,10 +384,147 @@ void GatedPowerSpectrumOutput::data2Host(cudaStream_t &_d2h_stream)
}
void FullStokesOutput::swap(cudaStream_t &_proc_stream)
{
I.swap();
Q.swap();
U.swap();
V.swap();
_noOfBitSets.swap();
thrust::fill(thrust::cuda::par.on(_proc_stream), I.a().begin(), I.a().end(), 0.);
thrust::fill(thrust::cuda::par.on(_proc_stream), Q.a().begin(), Q.a().end(), 0.);
thrust::fill(thrust::cuda::par.on(_proc_stream), U.a().begin(), U.a().end(), 0.);
thrust::fill(thrust::cuda::par.on(_proc_stream), V.a().begin(), V.a().end(), 0.);
thrust::fill(thrust::cuda::par.on(_proc_stream), _noOfBitSets.a().begin(), _noOfBitSets.a().end(), 0L);
}
FullStokesOutput::FullStokesOutput(size_t size, size_t blocks)
{
I.resize(size * blocks);