Commit fb5726c1 authored by Tobias Winchen's avatar Tobias Winchen
Browse files

Calculate baseline

parent f4d097c8
......@@ -20,7 +20,7 @@ install(TARGETS fft_spectrometer DESTINATION bin)
#Gated FFT spectrometer interface
cuda_add_executable(gated_spectrometer src/GatedSpectrometer_cli.cu)
target_link_libraries(gated_spectrometer ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES} ${CUDA_CUFFT_LIBRARIES})
target_link_libraries(gated_spectrometer ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES} ${CUDA_CUFFT_LIBRARIES} -lcublas)
install(TARGETS gated_spectrometer DESTINATION bin)
add_subdirectory(test)
......
......@@ -11,6 +11,8 @@
#include "thrust/device_vector.h"
#include "cufft.h"
#include "cublas_v2.h"
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
......@@ -122,6 +124,8 @@ private:
thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G1;
thrust::device_vector<ChannelisedVoltageType> _channelised_voltage;
thrust::device_vector<UnpackedVoltageType> _baseLineN;
DoublePinnedHostBuffer<IntegratedPowerType> _host_power_db;
cudaStream_t _h2d_stream;
......@@ -135,9 +139,9 @@ private:
*
* @detail The resulting gaps are filled with zeros in the other stream.
*
* @param GO Input data. Data is set to zero if corresponding
* @param GO Input data. Data is set to the baseline value if corresponding
* sideChannelData bit at bitpos os set.
* @param G1 Data in this array is set to zero if corresponding
* @param G1 Data in this array is set to the baseline value if corresponding
* sideChannelData bit at bitpos is not set.
* @param sideChannelData noOfSideChannels items per block of heapSize
* bytes in the input data.
......@@ -149,11 +153,24 @@ private:
* data.
* @param selectedSideChannel No. of side channel item to be eveluated.
0 <= selectedSideChannel < noOfSideChannels.
*
*/
__global__ void gating(float *G0, float *G1, const int64_t *sideChannelData,
size_t N, size_t heapSize, size_t bitpos,
size_t noOfSideChannels, size_t selectedSideChannel);
size_t noOfSideChannels, size_t selectedSideChannel, const float *_baseLine);
/**
* @brief Sums all elements of an input array.
*
* @detail The results is stored in an array with one value per launch
* block. Full reuction thus requires two kernel launches.
*
* @param in. Input array.
* @param N. Size of input array.
* @param out. Output array.
* */
__global__ void array_sum(float *in, size_t N, float *out);
} // edd
......
......@@ -11,13 +11,13 @@ namespace psrdada_cpp {
namespace effelsberg {
namespace edd {
__global__ void gating(float *G0, float *G1, const int64_t *sideChannelData,
__global__ void gating(float* __restrict__ G0, float* __restrict__ G1, const int64_t* __restrict__ sideChannelData,
size_t N, size_t heapSize, size_t bitpos,
size_t noOfSideChannels, size_t selectedSideChannel) {
for (int i = blockIdx.x * blockDim.x + threadIdx.x; (i < N);
size_t noOfSideChannels, size_t selectedSideChannel, const float* __restrict__ _baseLineN) {
float baseLine = *_baseLineN / N;
for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N);
i += blockDim.x * gridDim.x) {
const float w = G0[i];
const float w = G0[i] - baseLine;
const int64_t sideChannelItem =
sideChannelData[((i / heapSize) * (noOfSideChannels)) +
selectedSideChannel]; // Probably not optimal access as
......@@ -26,8 +26,8 @@ __global__ void gating(float *G0, float *G1, const int64_t *sideChannelData,
// handled by cache?
const int bit_set = TEST_BIT(sideChannelItem, bitpos);
G1[i] = w * bit_set;
G0[i] = w * (!bit_set);
G1[i] = w * bit_set + baseLine;
G0[i] = w * (!bit_set) + baseLine;
}
}
......@@ -61,6 +61,43 @@ __global__ void countBitSet(const int64_t *sideChannelData, size_t N, size_t
}
// blocksize for the array sum kernel
#define array_sum_Nthreads 1024
__global__ void array_sum(float *in, size_t N, float *out) {
extern __shared__ float data[];
size_t tid = threadIdx.x;
float ls = 0;
for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N);
i += blockDim.x * gridDim.x) {
ls += in[i]; // + in[i + blockDim.x]; // loading two elements increase the used bandwidth by ~10% but requires matching blocksize and size of input array
}
data[tid] = ls;
__syncthreads();
for (size_t i = blockDim.x / 2; i > 0; i /= 2) {
if (tid < i) {
data[tid] += data[tid + i];
}
__syncthreads();
}
// unroll last warp
// if (tid < 32)
//{
// warpReduce(data, tid);
//}
if (tid == 0) {
out[blockIdx.x] = data[0];
}
}
template <class HandlerType>
GatedSpectrometer<HandlerType>::GatedSpectrometer(
std::size_t buffer_bytes, std::size_t nSideChannels,
......@@ -128,6 +165,8 @@ GatedSpectrometer<HandlerType>::GatedSpectrometer(
<< _raw_voltage_db.size();
_unpacked_voltage_G0.resize(nsamps_per_buffer);
_unpacked_voltage_G1.resize(nsamps_per_buffer);
_baseLineN.resize(array_sum_Nthreads);
BOOST_LOG_TRIVIAL(debug) << " Unpacked voltages size (in samples): "
<< _unpacked_voltage_G0.size();
_channelised_voltage.resize(_nchans * batch);
......@@ -187,8 +226,9 @@ void GatedSpectrometer<HandlerType>::process(
default:
throw std::runtime_error("Unsupported number of bits");
}
// raw voltage buffer is free again
//CUDA_ERROR_CHECK(cudaEventRecord(_procB, _proc_stream));
BOOST_LOG_TRIVIAL(debug) << "Calculate baseline";
psrdada_cpp::effelsberg::edd::array_sum<<<64, array_sum_Nthreads, array_sum_Nthreads * sizeof(float), _proc_stream>>>(thrust::raw_pointer_cast(_unpacked_voltage_G0.data()), _unpacked_voltage_G0.size(), thrust::raw_pointer_cast(_baseLineN.data()));
psrdada_cpp::effelsberg::edd::array_sum<<<1, array_sum_Nthreads, array_sum_Nthreads * sizeof(float), _proc_stream>>>(thrust::raw_pointer_cast(_baseLineN.data()), _baseLineN.size(), thrust::raw_pointer_cast(_baseLineN.data()));
BOOST_LOG_TRIVIAL(debug) << "Perform gating";
gating<<<1024, 1024, 0, _proc_stream>>>(
......@@ -196,7 +236,7 @@ void GatedSpectrometer<HandlerType>::process(
thrust::raw_pointer_cast(_unpacked_voltage_G1.data()),
thrust::raw_pointer_cast(sideChannelData.data()),
_unpacked_voltage_G0.size(), _speadHeapSize, _selectedBit, _nSideChannels,
_selectedSideChannel);
_selectedSideChannel, thrust::raw_pointer_cast(_baseLineN.data()));
countBitSet<<<(sideChannelData.size()+255)/256, 256, 0,
_proc_stream>>>(thrust::raw_pointer_cast(sideChannelData.data()),
......@@ -306,7 +346,9 @@ bool GatedSpectrometer<HandlerType>::operator()(RawBytes &block) {
// The handler can't do anything asynchronously without a copy here
// as it would be unsafe (given that it does not own the memory it
// is being passed).
return _handler(bytes);
_handler(bytes);
return false; //
} // operator ()
} // edd
......
......@@ -25,6 +25,7 @@ const size_t ERROR_UNHANDLED_EXCEPTION = 2;
} // namespace
int main(int argc, char **argv) {
try {
key_t input_key;
......@@ -42,6 +43,7 @@ int main(int argc, char **argv) {
char buffer[32];
std::strftime(buffer, 32, "%Y-%m-%d-%H:%M:%S.bp", ptm);
std::string filename(buffer);
std::string output_type = "file";
/** Define and parse the program options
*/
......@@ -55,25 +57,29 @@ int main(int argc, char **argv) {
[&input_key](std::string in) { input_key = string_to_key(in); }),
"The shared memory key for the dada buffer to connect to (hex "
"string)");
desc.add_options()("fft_length,n", po::value<int>(&fft_length)->required(),
"The length of the FFT to perform on the data");
desc.add_options()("naccumulate,a",
po::value<int>(&naccumulate)->required(),
"The number of samples to integrate in each channel");
desc.add_options()("nsidechannelitems,n",
desc.add_options()(
"output_type", po::value<std::string>(&output_type)->default_value(output_type),
"output type [dada, file]. Default is file."
);
desc.add_options()(
"output_key,o", po::value<std::string>(&filename)->default_value(filename),
"The key of the output bnuffer / name of the output file to write spectra "
"to");
desc.add_options()("nsidechannelitems,s",
po::value<size_t>()->default_value(0)->notifier(
[&nSideChannels](size_t in) { nSideChannels = in; }),
"Number of side channel items");
desc.add_options()(
"selected_sidechannel",
"selected_sidechannel,e",
po::value<size_t>()->default_value(0)->notifier(
[&selectedSideChannel](size_t in) { selectedSideChannel = in; }),
"Side channel selected for evaluation.");
desc.add_options()("selected_bit",
desc.add_options()("selected_bit,B",
po::value<size_t>()->default_value(63)->notifier(
[&selectedBit](size_t in) { selectedBit = in; }),
"Side channel selected for evaluation.");
desc.add_options()("speadheap_size,s",
desc.add_options()("speadheap_size",
po::value<size_t>()->default_value(4096)->notifier(
[&speadHeapSize](size_t in) { speadHeapSize = in; }),
"size of the spead data heaps. The number of the "
......@@ -82,6 +88,14 @@ int main(int argc, char **argv) {
desc.add_options()("nbits,b", po::value<int>(&nbits)->required(),
"The number of bits per sample in the "
"packetiser output (8 or 12)");
desc.add_options()("fft_length,n", po::value<int>(&fft_length)->required(),
"The length of the FFT to perform on the data");
desc.add_options()("naccumulate,a",
po::value<int>(&naccumulate)->required(),
"The number of samples to integrate in each channel");
desc.add_options()("input_level",
po::value<float>(&input_level)->required(),
"The input power level (standard "
......@@ -91,10 +105,6 @@ int main(int argc, char **argv) {
"The output power level (standard "
"deviation, used for 8-bit "
"conversion)");
desc.add_options()(
"outfile,o", po::value<std::string>(&filename)->default_value(filename),
"The output file to write spectra "
"to");
desc.add_options()(
"log_level", po::value<std::string>()->default_value("info")->notifier(
[](std::string level) { set_log_level(level); }),
......@@ -114,7 +124,12 @@ int main(int argc, char **argv) {
<< desc << std::endl;
return SUCCESS;
}
po::notify(vm);
if (vm .count("output_type") && (!(output_type == "dada" || output_type == "file") ))
{
throw po::validation_error(po::validation_error::invalid_option_value, "output_type", output_type);
}
} catch (po::error &e) {
std::cerr << "ERROR: " << e.what() << std::endl << std::endl;
std::cerr << desc << std::endl;
......@@ -129,14 +144,36 @@ int main(int argc, char **argv) {
//client.cuda_register_memory();
std::size_t buffer_bytes = client.data_buffer_size();
SimpleFileWriter sink(filename);
effelsberg::edd::GatedSpectrometer<decltype(sink)> spectrometer(
buffer_bytes, nSideChannels, selectedSideChannel, selectedBit,
speadHeapSize, fft_length, naccumulate, nbits, input_level,
output_level, sink);
DadaInputStream<decltype(spectrometer)> istream(input_key, log,
spectrometer);
istream.start();
std::cout << "Running with output_type: " << output_type << std::endl;
if (output_type == "file")
{
SimpleFileWriter sink(filename);
effelsberg::edd::GatedSpectrometer<decltype(sink)> spectrometer(
buffer_bytes, nSideChannels, selectedSideChannel, selectedBit,
speadHeapSize, fft_length, naccumulate, nbits, input_level,
output_level, sink);
DadaInputStream<decltype(spectrometer)> istream(input_key, log,
spectrometer);
istream.start();
}
else if (output_type == "dada")
{
DadaOutputStream sink(string_to_key(filename), log);
effelsberg::edd::GatedSpectrometer<decltype(sink)> spectrometer(
buffer_bytes, nSideChannels, selectedSideChannel, selectedBit,
speadHeapSize, fft_length, naccumulate, nbits, input_level,
output_level, sink);
DadaInputStream<decltype(spectrometer)> istream(input_key, log,
spectrometer);
istream.start();
}
else
{
throw std::runtime_error("Unknown oputput-type");
}
/**
* End of application code
*/
......
......@@ -12,5 +12,5 @@ set(
src/GatedSpectrometerTest.cu
)
cuda_add_executable(gtest_edd ${gtest_edd_src} )
target_link_libraries(gtest_edd ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES} ${CUDA_CUFFT_LIBRARIES})
target_link_libraries(gtest_edd ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES} ${CUDA_CUFFT_LIBRARIES} -lcublas)
add_test(gtest_edd gtest_edd --test_data "${CMAKE_CURRENT_LIST_DIR}/data")
......@@ -9,7 +9,7 @@
namespace {
TEST(BitManipulationMacros, SetBit_TestBit) {
TEST(GatedSpectrometer, BitManipulationMacros) {
for (int i = 0; i < 64; i++) {
int64_t v = 0;
SET_BIT(v, i);
......@@ -62,6 +62,7 @@ TEST(GatedSpectrometer, GatingKernel)
thrust::device_vector<float> G0(blockSize * nBlocks);
thrust::device_vector<float> G1(blockSize * nBlocks);
thrust::device_vector<int64_t> _sideChannelData(nBlocks);
thrust::device_vector<float> baseLine(1);
thrust::fill(G0.begin(), G0.end(), 42);
thrust::fill(G1.begin(), G1.end(), 23);
......@@ -69,13 +70,14 @@ TEST(GatedSpectrometer, GatingKernel)
// everything to G0
{
baseLine[0] = 0.;
const int64_t *sideCD =
(int64_t *)(thrust::raw_pointer_cast(_sideChannelData.data()));
psrdada_cpp::effelsberg::edd::gating<<<1024, 1024>>>(
thrust::raw_pointer_cast(G0.data()),
thrust::raw_pointer_cast(G1.data()), sideCD,
G0.size(), blockSize, 0, 1,
0);
0, thrust::raw_pointer_cast(baseLine.data()));
thrust::pair<thrust::device_vector<float>::iterator, thrust::device_vector<float>::iterator> minmax;
minmax = thrust::minmax_element(G0.begin(), G0.end());
EXPECT_EQ(*minmax.first, 42);
......@@ -86,8 +88,9 @@ TEST(GatedSpectrometer, GatingKernel)
EXPECT_EQ(*minmax.second, 0);
}
// everything to G1
// everything to G1 // with baseline -5
{
baseLine[0] = -5. * G0.size();
thrust::fill(_sideChannelData.begin(), _sideChannelData.end(), 1L);
const int64_t *sideCD =
(int64_t *)(thrust::raw_pointer_cast(_sideChannelData.data()));
......@@ -95,11 +98,11 @@ TEST(GatedSpectrometer, GatingKernel)
thrust::raw_pointer_cast(G0.data()),
thrust::raw_pointer_cast(G1.data()), sideCD,
G0.size(), blockSize, 0, 1,
0);
0, thrust::raw_pointer_cast(baseLine.data()));
thrust::pair<thrust::device_vector<float>::iterator, thrust::device_vector<float>::iterator> minmax;
minmax = thrust::minmax_element(G0.begin(), G0.end());
EXPECT_EQ(*minmax.first, 0);
EXPECT_EQ(*minmax.second, 0);
EXPECT_EQ(*minmax.first, -5.);
EXPECT_EQ(*minmax.second, -5.);
minmax = thrust::minmax_element(G1.begin(), G1.end());
EXPECT_EQ(*minmax.first, 42);
......@@ -149,6 +152,29 @@ TEST(GatedSpectrometer, countBitSet) {
}
TEST(GatedSpectrometer, array_sum) {
const size_t NBLOCKS = 16 * 32;
const size_t NTHREADS = 1024;
size_t inputLength = 1 << 22 + 1 ;
size_t dataLength = inputLength;
////zero pad input array
//if (inputLength % (2 * NTHREADS) != 0)
// dataLength = (inputLength / (2 * NTHREADS) + 1) * 2 * NTHREADS;
thrust::device_vector<float> data(dataLength);
thrust::fill(data.begin(), data.begin() + inputLength, 1);
//thrust::fill(data.begin() + inputLength, data.end(), 0);
thrust::device_vector<float> blr(NTHREADS * 2);
thrust::fill(blr.begin(), blr.end(), 0);
psrdada_cpp::effelsberg::edd::array_sum<<<NBLOCKS, NTHREADS, NTHREADS* sizeof(float)>>>(thrust::raw_pointer_cast(data.data()), data.size(), thrust::raw_pointer_cast(blr.data()));
psrdada_cpp::effelsberg::edd::array_sum<<<1, NTHREADS, NTHREADS* sizeof(float)>>>(thrust::raw_pointer_cast(blr.data()), blr.size(), thrust::raw_pointer_cast(blr.data()));
EXPECT_EQ(size_t(blr[0]), inputLength);
}
int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment