diff --git a/psrdada_cpp/effelsberg/edd/CMakeLists.txt b/psrdada_cpp/effelsberg/edd/CMakeLists.txt index 85972e96356eddfc7fe95b19965e0f5e3c1747af..7b5a0cb887f9edc765af79a8e28f2f5c7e511fc2 100644 --- a/psrdada_cpp/effelsberg/edd/CMakeLists.txt +++ b/psrdada_cpp/effelsberg/edd/CMakeLists.txt @@ -20,7 +20,7 @@ install(TARGETS fft_spectrometer DESTINATION bin) #Gated FFT spectrometer interface cuda_add_executable(gated_spectrometer src/GatedSpectrometer_cli.cu) -target_link_libraries(gated_spectrometer ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES} ${CUDA_CUFFT_LIBRARIES}) +target_link_libraries(gated_spectrometer ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES} ${CUDA_CUFFT_LIBRARIES} -lcublas) install(TARGETS gated_spectrometer DESTINATION bin) add_subdirectory(test) diff --git a/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh b/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh index 43b6306a092a854a2a4106760c5b07f5a0205ad8..7e13b494ea297247283b06edf908fb31cc5ce02c 100644 --- a/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh +++ b/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh @@ -11,6 +11,8 @@ #include "thrust/device_vector.h" #include "cufft.h" +#include "cublas_v2.h" + namespace psrdada_cpp { namespace effelsberg { namespace edd { @@ -122,6 +124,8 @@ private: thrust::device_vector<UnpackedVoltageType> _unpacked_voltage_G1; thrust::device_vector<ChannelisedVoltageType> _channelised_voltage; + thrust::device_vector<UnpackedVoltageType> _baseLineN; + DoublePinnedHostBuffer<IntegratedPowerType> _host_power_db; cudaStream_t _h2d_stream; @@ -135,9 +139,9 @@ private: * * @detail The resulting gaps are filled with zeros in the other stream. * - * @param GO Input data. Data is set to zero if corresponding + * @param GO Input data. Data is set to the baseline value if corresponding * sideChannelData bit at bitpos os set. - * @param G1 Data in this array is set to zero if corresponding + * @param G1 Data in this array is set to the baseline value if corresponding * sideChannelData bit at bitpos is not set. * @param sideChannelData noOfSideChannels items per block of heapSize * bytes in the input data. @@ -149,11 +153,24 @@ private: * data. * @param selectedSideChannel No. of side channel item to be eveluated. 0 <= selectedSideChannel < noOfSideChannels. - * */ __global__ void gating(float *G0, float *G1, const int64_t *sideChannelData, size_t N, size_t heapSize, size_t bitpos, - size_t noOfSideChannels, size_t selectedSideChannel); + size_t noOfSideChannels, size_t selectedSideChannel, const float *_baseLine); + + +/** + * @brief Sums all elements of an input array. + * + * @detail The results is stored in an array with one value per launch + * block. Full reuction thus requires two kernel launches. + * + * @param in. Input array. + * @param N. Size of input array. + * @param out. Output array. + * */ +__global__ void array_sum(float *in, size_t N, float *out); + } // edd diff --git a/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu b/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu index 5d1ab717e4b6bcff015f4ac267ab236f13f662b4..56e1247b8f79ecc6dce600230314d76f986b8b68 100644 --- a/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu +++ b/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu @@ -11,13 +11,13 @@ namespace psrdada_cpp { namespace effelsberg { namespace edd { - -__global__ void gating(float *G0, float *G1, const int64_t *sideChannelData, +__global__ void gating(float* __restrict__ G0, float* __restrict__ G1, const int64_t* __restrict__ sideChannelData, size_t N, size_t heapSize, size_t bitpos, - size_t noOfSideChannels, size_t selectedSideChannel) { - for (int i = blockIdx.x * blockDim.x + threadIdx.x; (i < N); + size_t noOfSideChannels, size_t selectedSideChannel, const float* __restrict__ _baseLineN) { + float baseLine = *_baseLineN / N; + for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N); i += blockDim.x * gridDim.x) { - const float w = G0[i]; + const float w = G0[i] - baseLine; const int64_t sideChannelItem = sideChannelData[((i / heapSize) * (noOfSideChannels)) + selectedSideChannel]; // Probably not optimal access as @@ -26,8 +26,8 @@ __global__ void gating(float *G0, float *G1, const int64_t *sideChannelData, // handled by cache? const int bit_set = TEST_BIT(sideChannelItem, bitpos); - G1[i] = w * bit_set; - G0[i] = w * (!bit_set); + G1[i] = w * bit_set + baseLine; + G0[i] = w * (!bit_set) + baseLine; } } @@ -61,6 +61,43 @@ __global__ void countBitSet(const int64_t *sideChannelData, size_t N, size_t } +// blocksize for the array sum kernel +#define array_sum_Nthreads 1024 + +__global__ void array_sum(float *in, size_t N, float *out) { + extern __shared__ float data[]; + + size_t tid = threadIdx.x; + + float ls = 0; + + for (size_t i = blockIdx.x * blockDim.x + threadIdx.x; (i < N); + i += blockDim.x * gridDim.x) { + ls += in[i]; // + in[i + blockDim.x]; // loading two elements increase the used bandwidth by ~10% but requires matching blocksize and size of input array + } + + data[tid] = ls; + __syncthreads(); + + for (size_t i = blockDim.x / 2; i > 0; i /= 2) { + if (tid < i) { + data[tid] += data[tid + i]; + } + __syncthreads(); + } + + // unroll last warp + // if (tid < 32) + //{ + // warpReduce(data, tid); + //} + + if (tid == 0) { + out[blockIdx.x] = data[0]; + } +} + + template <class HandlerType> GatedSpectrometer<HandlerType>::GatedSpectrometer( std::size_t buffer_bytes, std::size_t nSideChannels, @@ -128,6 +165,8 @@ GatedSpectrometer<HandlerType>::GatedSpectrometer( << _raw_voltage_db.size(); _unpacked_voltage_G0.resize(nsamps_per_buffer); _unpacked_voltage_G1.resize(nsamps_per_buffer); + + _baseLineN.resize(array_sum_Nthreads); BOOST_LOG_TRIVIAL(debug) << " Unpacked voltages size (in samples): " << _unpacked_voltage_G0.size(); _channelised_voltage.resize(_nchans * batch); @@ -187,8 +226,9 @@ void GatedSpectrometer<HandlerType>::process( default: throw std::runtime_error("Unsupported number of bits"); } - // raw voltage buffer is free again - //CUDA_ERROR_CHECK(cudaEventRecord(_procB, _proc_stream)); + BOOST_LOG_TRIVIAL(debug) << "Calculate baseline"; + psrdada_cpp::effelsberg::edd::array_sum<<<64, array_sum_Nthreads, array_sum_Nthreads * sizeof(float), _proc_stream>>>(thrust::raw_pointer_cast(_unpacked_voltage_G0.data()), _unpacked_voltage_G0.size(), thrust::raw_pointer_cast(_baseLineN.data())); + psrdada_cpp::effelsberg::edd::array_sum<<<1, array_sum_Nthreads, array_sum_Nthreads * sizeof(float), _proc_stream>>>(thrust::raw_pointer_cast(_baseLineN.data()), _baseLineN.size(), thrust::raw_pointer_cast(_baseLineN.data())); BOOST_LOG_TRIVIAL(debug) << "Perform gating"; gating<<<1024, 1024, 0, _proc_stream>>>( @@ -196,7 +236,7 @@ void GatedSpectrometer<HandlerType>::process( thrust::raw_pointer_cast(_unpacked_voltage_G1.data()), thrust::raw_pointer_cast(sideChannelData.data()), _unpacked_voltage_G0.size(), _speadHeapSize, _selectedBit, _nSideChannels, - _selectedSideChannel); + _selectedSideChannel, thrust::raw_pointer_cast(_baseLineN.data())); countBitSet<<<(sideChannelData.size()+255)/256, 256, 0, _proc_stream>>>(thrust::raw_pointer_cast(sideChannelData.data()), @@ -306,7 +346,9 @@ bool GatedSpectrometer<HandlerType>::operator()(RawBytes &block) { // The handler can't do anything asynchronously without a copy here // as it would be unsafe (given that it does not own the memory it // is being passed). - return _handler(bytes); + + _handler(bytes); + return false; // } // operator () } // edd diff --git a/psrdada_cpp/effelsberg/edd/src/GatedSpectrometer_cli.cu b/psrdada_cpp/effelsberg/edd/src/GatedSpectrometer_cli.cu index abc0939c48ff36df72298bc7a8319ecb7c886da7..5db96005e79d5c0df7ecb58125f2b886d244ff4d 100644 --- a/psrdada_cpp/effelsberg/edd/src/GatedSpectrometer_cli.cu +++ b/psrdada_cpp/effelsberg/edd/src/GatedSpectrometer_cli.cu @@ -25,6 +25,7 @@ const size_t ERROR_UNHANDLED_EXCEPTION = 2; } // namespace + int main(int argc, char **argv) { try { key_t input_key; @@ -42,6 +43,7 @@ int main(int argc, char **argv) { char buffer[32]; std::strftime(buffer, 32, "%Y-%m-%d-%H:%M:%S.bp", ptm); std::string filename(buffer); + std::string output_type = "file"; /** Define and parse the program options */ @@ -55,25 +57,29 @@ int main(int argc, char **argv) { [&input_key](std::string in) { input_key = string_to_key(in); }), "The shared memory key for the dada buffer to connect to (hex " "string)"); - desc.add_options()("fft_length,n", po::value<int>(&fft_length)->required(), - "The length of the FFT to perform on the data"); - desc.add_options()("naccumulate,a", - po::value<int>(&naccumulate)->required(), - "The number of samples to integrate in each channel"); - desc.add_options()("nsidechannelitems,n", + desc.add_options()( + "output_type", po::value<std::string>(&output_type)->default_value(output_type), + "output type [dada, file]. Default is file." + ); + desc.add_options()( + "output_key,o", po::value<std::string>(&filename)->default_value(filename), + "The key of the output bnuffer / name of the output file to write spectra " + "to"); + + desc.add_options()("nsidechannelitems,s", po::value<size_t>()->default_value(0)->notifier( [&nSideChannels](size_t in) { nSideChannels = in; }), "Number of side channel items"); desc.add_options()( - "selected_sidechannel", + "selected_sidechannel,e", po::value<size_t>()->default_value(0)->notifier( [&selectedSideChannel](size_t in) { selectedSideChannel = in; }), "Side channel selected for evaluation."); - desc.add_options()("selected_bit", + desc.add_options()("selected_bit,B", po::value<size_t>()->default_value(63)->notifier( [&selectedBit](size_t in) { selectedBit = in; }), "Side channel selected for evaluation."); - desc.add_options()("speadheap_size,s", + desc.add_options()("speadheap_size", po::value<size_t>()->default_value(4096)->notifier( [&speadHeapSize](size_t in) { speadHeapSize = in; }), "size of the spead data heaps. The number of the " @@ -82,6 +88,14 @@ int main(int argc, char **argv) { desc.add_options()("nbits,b", po::value<int>(&nbits)->required(), "The number of bits per sample in the " "packetiser output (8 or 12)"); + + + + desc.add_options()("fft_length,n", po::value<int>(&fft_length)->required(), + "The length of the FFT to perform on the data"); + desc.add_options()("naccumulate,a", + po::value<int>(&naccumulate)->required(), + "The number of samples to integrate in each channel"); desc.add_options()("input_level", po::value<float>(&input_level)->required(), "The input power level (standard " @@ -91,10 +105,6 @@ int main(int argc, char **argv) { "The output power level (standard " "deviation, used for 8-bit " "conversion)"); - desc.add_options()( - "outfile,o", po::value<std::string>(&filename)->default_value(filename), - "The output file to write spectra " - "to"); desc.add_options()( "log_level", po::value<std::string>()->default_value("info")->notifier( [](std::string level) { set_log_level(level); }), @@ -114,7 +124,12 @@ int main(int argc, char **argv) { << desc << std::endl; return SUCCESS; } + po::notify(vm); + if (vm .count("output_type") && (!(output_type == "dada" || output_type == "file") )) + { + throw po::validation_error(po::validation_error::invalid_option_value, "output_type", output_type); + } } catch (po::error &e) { std::cerr << "ERROR: " << e.what() << std::endl << std::endl; std::cerr << desc << std::endl; @@ -129,14 +144,36 @@ int main(int argc, char **argv) { //client.cuda_register_memory(); std::size_t buffer_bytes = client.data_buffer_size(); - SimpleFileWriter sink(filename); - effelsberg::edd::GatedSpectrometer<decltype(sink)> spectrometer( - buffer_bytes, nSideChannels, selectedSideChannel, selectedBit, - speadHeapSize, fft_length, naccumulate, nbits, input_level, - output_level, sink); - DadaInputStream<decltype(spectrometer)> istream(input_key, log, - spectrometer); - istream.start(); + std::cout << "Running with output_type: " << output_type << std::endl; + if (output_type == "file") + { + + SimpleFileWriter sink(filename); + effelsberg::edd::GatedSpectrometer<decltype(sink)> spectrometer( + buffer_bytes, nSideChannels, selectedSideChannel, selectedBit, + speadHeapSize, fft_length, naccumulate, nbits, input_level, + output_level, sink); + DadaInputStream<decltype(spectrometer)> istream(input_key, log, + spectrometer); + istream.start(); + } + else if (output_type == "dada") + { + DadaOutputStream sink(string_to_key(filename), log); + effelsberg::edd::GatedSpectrometer<decltype(sink)> spectrometer( + buffer_bytes, nSideChannels, selectedSideChannel, selectedBit, + speadHeapSize, fft_length, naccumulate, nbits, input_level, + output_level, sink); + DadaInputStream<decltype(spectrometer)> istream(input_key, log, + spectrometer); + istream.start(); + } + else + { + throw std::runtime_error("Unknown oputput-type"); + } + + /** * End of application code */ diff --git a/psrdada_cpp/effelsberg/edd/test/CMakeLists.txt b/psrdada_cpp/effelsberg/edd/test/CMakeLists.txt index c23c817492de209a2a06c2e94a7b5fe48f3e602c..839675860f0ed1ae6daca0414aff2f9c4b5e1dc5 100644 --- a/psrdada_cpp/effelsberg/edd/test/CMakeLists.txt +++ b/psrdada_cpp/effelsberg/edd/test/CMakeLists.txt @@ -12,5 +12,5 @@ set( src/GatedSpectrometerTest.cu ) cuda_add_executable(gtest_edd ${gtest_edd_src} ) -target_link_libraries(gtest_edd ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES} ${CUDA_CUFFT_LIBRARIES}) +target_link_libraries(gtest_edd ${PSRDADA_CPP_EFFELSBERG_EDD_LIBRARIES} ${CUDA_CUFFT_LIBRARIES} -lcublas) add_test(gtest_edd gtest_edd --test_data "${CMAKE_CURRENT_LIST_DIR}/data") diff --git a/psrdada_cpp/effelsberg/edd/test/src/GatedSpectrometerTest.cu b/psrdada_cpp/effelsberg/edd/test/src/GatedSpectrometerTest.cu index 6eb5e46ea3f9b7b7e11d6a2979e3248a23d05ed6..f5dee4bd1ae4d452e882e22e94c253a5faa5ec6b 100644 --- a/psrdada_cpp/effelsberg/edd/test/src/GatedSpectrometerTest.cu +++ b/psrdada_cpp/effelsberg/edd/test/src/GatedSpectrometerTest.cu @@ -9,7 +9,7 @@ namespace { -TEST(BitManipulationMacros, SetBit_TestBit) { +TEST(GatedSpectrometer, BitManipulationMacros) { for (int i = 0; i < 64; i++) { int64_t v = 0; SET_BIT(v, i); @@ -62,6 +62,7 @@ TEST(GatedSpectrometer, GatingKernel) thrust::device_vector<float> G0(blockSize * nBlocks); thrust::device_vector<float> G1(blockSize * nBlocks); thrust::device_vector<int64_t> _sideChannelData(nBlocks); + thrust::device_vector<float> baseLine(1); thrust::fill(G0.begin(), G0.end(), 42); thrust::fill(G1.begin(), G1.end(), 23); @@ -69,13 +70,14 @@ TEST(GatedSpectrometer, GatingKernel) // everything to G0 { + baseLine[0] = 0.; const int64_t *sideCD = (int64_t *)(thrust::raw_pointer_cast(_sideChannelData.data())); psrdada_cpp::effelsberg::edd::gating<<<1024, 1024>>>( thrust::raw_pointer_cast(G0.data()), thrust::raw_pointer_cast(G1.data()), sideCD, G0.size(), blockSize, 0, 1, - 0); + 0, thrust::raw_pointer_cast(baseLine.data())); thrust::pair<thrust::device_vector<float>::iterator, thrust::device_vector<float>::iterator> minmax; minmax = thrust::minmax_element(G0.begin(), G0.end()); EXPECT_EQ(*minmax.first, 42); @@ -86,8 +88,9 @@ TEST(GatedSpectrometer, GatingKernel) EXPECT_EQ(*minmax.second, 0); } - // everything to G1 + // everything to G1 // with baseline -5 { + baseLine[0] = -5. * G0.size(); thrust::fill(_sideChannelData.begin(), _sideChannelData.end(), 1L); const int64_t *sideCD = (int64_t *)(thrust::raw_pointer_cast(_sideChannelData.data())); @@ -95,11 +98,11 @@ TEST(GatedSpectrometer, GatingKernel) thrust::raw_pointer_cast(G0.data()), thrust::raw_pointer_cast(G1.data()), sideCD, G0.size(), blockSize, 0, 1, - 0); + 0, thrust::raw_pointer_cast(baseLine.data())); thrust::pair<thrust::device_vector<float>::iterator, thrust::device_vector<float>::iterator> minmax; minmax = thrust::minmax_element(G0.begin(), G0.end()); - EXPECT_EQ(*minmax.first, 0); - EXPECT_EQ(*minmax.second, 0); + EXPECT_EQ(*minmax.first, -5.); + EXPECT_EQ(*minmax.second, -5.); minmax = thrust::minmax_element(G1.begin(), G1.end()); EXPECT_EQ(*minmax.first, 42); @@ -149,6 +152,29 @@ TEST(GatedSpectrometer, countBitSet) { } +TEST(GatedSpectrometer, array_sum) { + + const size_t NBLOCKS = 16 * 32; + const size_t NTHREADS = 1024; + + size_t inputLength = 1 << 22 + 1 ; + size_t dataLength = inputLength; + ////zero pad input array + //if (inputLength % (2 * NTHREADS) != 0) + // dataLength = (inputLength / (2 * NTHREADS) + 1) * 2 * NTHREADS; + thrust::device_vector<float> data(dataLength); + thrust::fill(data.begin(), data.begin() + inputLength, 1); + //thrust::fill(data.begin() + inputLength, data.end(), 0); + thrust::device_vector<float> blr(NTHREADS * 2); + + thrust::fill(blr.begin(), blr.end(), 0); + + psrdada_cpp::effelsberg::edd::array_sum<<<NBLOCKS, NTHREADS, NTHREADS* sizeof(float)>>>(thrust::raw_pointer_cast(data.data()), data.size(), thrust::raw_pointer_cast(blr.data())); + psrdada_cpp::effelsberg::edd::array_sum<<<1, NTHREADS, NTHREADS* sizeof(float)>>>(thrust::raw_pointer_cast(blr.data()), blr.size(), thrust::raw_pointer_cast(blr.data())); + + EXPECT_EQ(size_t(blr[0]), inputLength); +} + int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv);