diff --git a/psrdada_cpp/effelsberg/edd/FftSpectrometer.cuh b/psrdada_cpp/effelsberg/edd/FftSpectrometer.cuh index 465e33b9042aa0483ef59a6f4bbe0144f9875578..d70a438779b3beeec098742a8a22d4bb75f94a59 100644 --- a/psrdada_cpp/effelsberg/edd/FftSpectrometer.cuh +++ b/psrdada_cpp/effelsberg/edd/FftSpectrometer.cuh @@ -28,8 +28,6 @@ public: std::size_t fft_length, std::size_t naccumulate, std::size_t nbits, - float input_level, - float output_level, HandlerType& handler); ~FftSpectrometer(); diff --git a/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh b/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh index 3870f0fbd62297e13e0f1ee7d1fe4ffab45c2bbe..d6cf309f54ec8e8f90afa5f6482098cf9496628a 100644 --- a/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh +++ b/psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh @@ -14,7 +14,7 @@ #include "cublas_v2.h" - +#include <bitset> #include <iostream> #include <iomanip> #include <cstring> @@ -269,6 +269,39 @@ struct GatedFullStokesOutput: public OutputDataStream +struct GatedSpectrometerInputParameters + { + /** + * @brief GatedSpectrometerInputParameters defining behavior of GatedSpectrometer + * + * @param buffer_bytes A RawBytes object wrapping a DADA header buffer + * @param nSideChannels Number of side channel items in the data stream, + * @param selectedSideChannel Side channel item used for gating + * @param selectedBit bit of side channel item used for gating + * @param speadHeapSize Size of the spead heap block. + * @param fftLength Size of the FFT + * @param naccumulate Number of samples to integrate in the individual + * FFT bins + * @param nbits Bit depth of the sampled signal + * @param handler Output handler + * + */ + + effelsberg::edd::DadaBufferLayout dadaBufferLayout; + size_t selectedSideChannel; + size_t speadHeapSize; + size_t nSideChannels; + size_t selectedBit; + size_t fft_length; + size_t naccumulate; + unsigned int nbits; + std::bitset<2> active_gates; + + GatedSpectrometerInputParameters(): selectedSideChannel(0), speadHeapSize(0), nSideChannels(0), selectedBit(0), fft_length(0), naccumulate(0), nbits(0), active_gates(3) {}; // Default both gates active + }; + + + /** @class GatedSpectrometer @@ -280,25 +313,12 @@ template <class HandlerType, class OutputType > class GatedSpectrometer { public: + /** * @brief Constructor - * - * @param buffer_bytes A RawBytes object wrapping a DADA header buffer - * @param nSideChannels Number of side channel items in the data stream, - * @param selectedSideChannel Side channel item used for gating - * @param selectedBit bit of side channel item used for gating - * @param speadHeapSize Size of the spead heap block. - * @param fftLength Size of the FFT - * @param naccumulate Number of samples to integrate in the individual - * FFT bins - * @param nbits Bit depth of the sampled signal - * @param handler Output handler - * + */ - GatedSpectrometer(const DadaBufferLayout &bufferLayout, - std::size_t selectedSideChannel, std::size_t selectedBit, - std::size_t fft_length, - std::size_t naccumulate, std::size_t nbits, + GatedSpectrometer(const GatedSpectrometerInputParameters &ip, HandlerType &handler); ~GatedSpectrometer(); @@ -346,12 +366,14 @@ private: thrust::device_vector<uint64_cu> &_noOfBitSetsIn_G1); DadaBufferLayout _dadaBufferLayout; + std::size_t _fft_length; std::size_t _naccumulate; std::size_t _selectedSideChannel; std::size_t _selectedBit; std::size_t _batch; std::size_t _nsamps_per_heap; + std::bitset<2> _active_gates; HandlerType &_handler; cufftHandle _fft_plan; diff --git a/psrdada_cpp/effelsberg/edd/detail/FftSpectrometer.cu b/psrdada_cpp/effelsberg/edd/detail/FftSpectrometer.cu index 1f17862a8795a72b3c6177ab9f3e3cac12817401..2e825b535419ea24c20528d89e4674b8b64f78a2 100644 --- a/psrdada_cpp/effelsberg/edd/detail/FftSpectrometer.cu +++ b/psrdada_cpp/effelsberg/edd/detail/FftSpectrometer.cu @@ -15,8 +15,6 @@ FftSpectrometer<HandlerType>::FftSpectrometer( std::size_t fft_length, std::size_t naccumulate, std::size_t nbits, - float input_level, - float output_level, HandlerType& handler) : _buffer_bytes(buffer_bytes) , _fft_length(fft_length) @@ -25,9 +23,12 @@ FftSpectrometer<HandlerType>::FftSpectrometer( , _handler(handler) , _fft_plan(0) , _call_count(0) + , _scaling(1.) + , _offset(0.) + { - assert(((_nbits == 12) || (_nbits == 8))); + assert(((_nbits == 12) || (_nbits == 8) || (_nbits == 10))); BOOST_LOG_TRIVIAL(debug) << "Creating new FftSpectrometer instance with parameters: \n" << "fft_length = " << _fft_length << "\n" @@ -38,11 +39,12 @@ FftSpectrometer<HandlerType>::FftSpectrometer( _nchans = _fft_length / 2 + 1; int batch = nsamps_per_buffer/_fft_length; BOOST_LOG_TRIVIAL(debug) << "Calculating scales and offsets"; - float dof = 2 * _naccumulate; - float scale = std::pow(input_level * std::sqrt(static_cast<float>(_nchans)), 2); - _offset = scale * dof; - _scaling = scale * std::sqrt(2 * dof) / output_level; - BOOST_LOG_TRIVIAL(debug) << "Correction factors for 8-bit conversion: offset = " << _offset << ", scaling = " << _scaling; + + //float dof = 2 * _naccumulate; + //float scale = std::pow(input_level * std::sqrt(static_cast<float>(_nchans)), 2); + //_offset = scale * dof; + //_scaling = scale * std::sqrt(2 * dof) / output_level; + //BOOST_LOG_TRIVIAL(debug) << "Correction factors for 8-bit conversion: offset = " << _offset << ", scaling = " << _scaling; BOOST_LOG_TRIVIAL(debug) << "Generating FFT plan"; int n[] = {static_cast<int>(_fft_length)}; CUFFT_ERROR_CHECK(cufftPlanMany(&_fft_plan, 1, n, NULL, 1, _fft_length, @@ -95,6 +97,7 @@ void FftSpectrometer<HandlerType>::process( switch (_nbits) { case 8: _unpacker->unpack<8>(digitiser_raw, _unpacked_voltage); break; + case 10: _unpacker->unpack<10>(digitiser_raw, _unpacked_voltage); break; case 12: _unpacker->unpack<12>(digitiser_raw, _unpacked_voltage); break; default: throw std::runtime_error("Unsupported number of bits"); } @@ -175,7 +178,8 @@ bool FftSpectrometer<HandlerType>::operator()(RawBytes& block) // The handler can't do anything asynchronously without a copy here // as it would be unsafe (given that it does not own the memory it // is being passed). - return _handler(bytes); + _handler(bytes); + return false; } } //edd diff --git a/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu b/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu index f32326097a9100ec512b4d7b43c75485814cf52c..7a33bebab460b8ae91a0ba0cde6b985677a26421 100644 --- a/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu +++ b/psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu @@ -65,17 +65,14 @@ __global__ void update_baselines(float* __restrict__ baseLineG0, template <class HandlerType, class InputType, class OutputType> GatedSpectrometer<HandlerType, InputType, OutputType>::GatedSpectrometer( - const DadaBufferLayout &dadaBufferLayout, std::size_t selectedSideChannel, - std::size_t selectedBit, std::size_t fft_length, std::size_t naccumulate, - std::size_t nbits, HandlerType - &handler) : _dadaBufferLayout(dadaBufferLayout), - _selectedSideChannel(selectedSideChannel), _selectedBit(selectedBit), - _fft_length(fft_length), _naccumulate(naccumulate), + const GatedSpectrometerInputParameters &ip, HandlerType &handler) : _dadaBufferLayout(ip.dadaBufferLayout), + _selectedSideChannel(ip.selectedSideChannel), _selectedBit(ip.selectedBit), _active_gates(ip.active_gates), + _fft_length(ip.fft_length), _naccumulate(ip.naccumulate), _handler(handler), _fft_plan(0), _call_count(0), _nsamps_per_heap(4096) { // Sanity checks - assert(((nbits == 12) || (nbits == 8) || (nbits == 10))); + assert(((ip.nbits == 12) || (ip.nbits == 8) || (ip.nbits == 10))); assert(_naccumulate > 0); // check for any device errors @@ -92,16 +89,16 @@ GatedSpectrometer<HandlerType, InputType, OutputType>::GatedSpectrometer( << " output bit depth " << sizeof(IntegratedPowerType) * 8; assert((_dadaBufferLayout.getNSideChannels() == 0) || - (selectedSideChannel < _dadaBufferLayout.getNSideChannels())); // Sanity check of side channel value - assert(selectedBit < 64); // Sanity check of selected bit + (_selectedSideChannel < _dadaBufferLayout.getNSideChannels())); // Sanity check of side channel value + assert(_selectedBit < 64); // Sanity check of selected bit _nchans = _fft_length / 2 + 1; - inputDataStream = new InputType(fft_length, nbits, _dadaBufferLayout); + inputDataStream = new InputType(_fft_length, ip.nbits, _dadaBufferLayout); //How many output spectra per input block? - size_t nsamps_per_output_spectra = fft_length * naccumulate; + size_t nsamps_per_output_spectra = _fft_length * _naccumulate; size_t nsamps_per_pol = inputDataStream->getSamplesPerInputPolarization(); BOOST_LOG_TRIVIAL(debug) << "Samples per input polarization: " << nsamps_per_pol; @@ -126,7 +123,7 @@ GatedSpectrometer<HandlerType, InputType, OutputType>::GatedSpectrometer( // plan the FFT - size_t nsamps_per_buffer = _dadaBufferLayout.sizeOfData() * 8 / nbits; + size_t nsamps_per_buffer = _dadaBufferLayout.sizeOfData() * 8 / ip.nbits; int batch = nsamps_per_pol / _fft_length; int n[] = {static_cast<int>(_fft_length)}; BOOST_LOG_TRIVIAL(debug) << "Generating FFT plan:\n" @@ -266,23 +263,29 @@ void GatedSpectrometer<HandlerType, InputType, OutputType>::gated_fft( _noOfBitSetsIn_G0.size() ); - BOOST_LOG_TRIVIAL(debug) << "Performing FFT 1"; - BOOST_LOG_TRIVIAL(debug) << "Accessing unpacked voltage"; - UnpackedVoltageType *_unpacked_voltage_ptr = - thrust::raw_pointer_cast(_unpacked_voltage_G0.data()); - BOOST_LOG_TRIVIAL(debug) << "Accessing channelized voltage"; - ChannelisedVoltageType *_channelised_voltage_ptr = - thrust::raw_pointer_cast(data._channelised_voltage_G0.data()); - - CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr, - (cufftComplex *)_channelised_voltage_ptr)); + if (_active_gates[0]) + { - BOOST_LOG_TRIVIAL(debug) << "Performing FFT 2"; - _unpacked_voltage_ptr = thrust::raw_pointer_cast(_unpacked_voltage_G1.data()); - _channelised_voltage_ptr = thrust::raw_pointer_cast(data._channelised_voltage_G1.data()); - CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr, - (cufftComplex *)_channelised_voltage_ptr)); + BOOST_LOG_TRIVIAL(debug) << "Performing FFT 1"; + BOOST_LOG_TRIVIAL(debug) << "Accessing unpacked voltage"; + UnpackedVoltageType *_unpacked_voltage_ptr = thrust::raw_pointer_cast(_unpacked_voltage_G0.data()); + BOOST_LOG_TRIVIAL(debug) << "Accessing channelized voltage"; + ChannelisedVoltageType *_channelised_voltage_ptr = + thrust::raw_pointer_cast(data._channelised_voltage_G0.data()); + CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr, + (cufftComplex *)_channelised_voltage_ptr)); + } + if (_active_gates[1]) + { + BOOST_LOG_TRIVIAL(debug) << "Performing FFT 2"; + BOOST_LOG_TRIVIAL(debug) << "Accessing unpacked voltage"; + UnpackedVoltageType *_unpacked_voltage_ptr = thrust::raw_pointer_cast(_unpacked_voltage_G1.data()); + BOOST_LOG_TRIVIAL(debug) << "Accessing channelized voltage"; + ChannelisedVoltageType *_channelised_voltage_ptr = thrust::raw_pointer_cast(data._channelised_voltage_G1.data()); + CUFFT_ERROR_CHECK(cufftExecR2C(_fft_plan, (cufftReal *)_unpacked_voltage_ptr, + (cufftComplex *)_channelised_voltage_ptr)); + } // CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream)); // BOOST_LOG_TRIVIAL(debug) << "Exit processing"; } // process @@ -367,21 +370,27 @@ void GatedSpectrometer<HandlerType, InputType, OutputType>::process(SinglePolari { gated_fft(*inputDataStream, outputDataStream->G0._noOfBitSets.a(), outputDataStream->G1._noOfBitSets.a()); - kernels::detect_and_accumulate<IntegratedPowerType> <<<1024, 1024, 0, _proc_stream>>>( - thrust::raw_pointer_cast(inputDataStream->_channelised_voltage_G0.data()), - thrust::raw_pointer_cast(outputDataStream->G0.data.a().data()), - _nchans, - inputDataStream->_channelised_voltage_G0.size(), - _naccumulate / _nBlocks, - 1, 0., 1, 0); - - kernels::detect_and_accumulate<IntegratedPowerType> <<<1024, 1024, 0, _proc_stream>>>( - thrust::raw_pointer_cast(inputDataStream->_channelised_voltage_G1.data()), - thrust::raw_pointer_cast(outputDataStream->G1.data.a().data()), - _nchans, - inputDataStream->_channelised_voltage_G1.size(), - _naccumulate / _nBlocks, - 1, 0., 1, 0); + if (_active_gates[0]) + { + kernels::detect_and_accumulate<IntegratedPowerType> <<<1024, 1024, 0, _proc_stream>>>( + thrust::raw_pointer_cast(inputDataStream->_channelised_voltage_G0.data()), + thrust::raw_pointer_cast(outputDataStream->G0.data.a().data()), + _nchans, + inputDataStream->_channelised_voltage_G0.size(), + _naccumulate / _nBlocks, + 1, 0., 1, 0); + } + + if (_active_gates[1]) + { + kernels::detect_and_accumulate<IntegratedPowerType> <<<1024, 1024, 0, _proc_stream>>>( + thrust::raw_pointer_cast(inputDataStream->_channelised_voltage_G1.data()), + thrust::raw_pointer_cast(outputDataStream->G1.data.a().data()), + _nchans, + inputDataStream->_channelised_voltage_G1.size(), + _naccumulate / _nBlocks, + 1, 0., 1, 0); + } // count saturated samples for(size_t output_block_number = 0; output_block_number < outputDataStream->G0._noOfOverflowed.size(); output_block_number++) @@ -432,6 +441,9 @@ void GatedSpectrometer<HandlerType, InputType, OutputType>::process(DualPolariza size_t input_offset = output_block_number * inputDataStream->polarization0._channelised_voltage_G0.size() / outputDataStream->G0._noOfBitSets.size(); size_t output_offset = output_block_number * outputDataStream->G0.I.a().size() / outputDataStream->G0._noOfBitSets.size(); BOOST_LOG_TRIVIAL(debug) << "Accumulating data for output block " << output_block_number << " with input offset " << input_offset << " and output_offset " << output_offset; + + if (_active_gates[0]) + { stokes_accumulate<<<1024, 1024, 0, _proc_stream>>>( thrust::raw_pointer_cast(inputDataStream->polarization0._channelised_voltage_G0.data() + input_offset), thrust::raw_pointer_cast(inputDataStream->polarization1._channelised_voltage_G0.data() + input_offset), @@ -441,16 +453,19 @@ void GatedSpectrometer<HandlerType, InputType, OutputType>::process(DualPolariza thrust::raw_pointer_cast(outputDataStream->G0.V.a().data() + output_offset), _nchans, _naccumulate / _nBlocks ); + } - stokes_accumulate<<<1024, 1024, 0, _proc_stream>>>( - thrust::raw_pointer_cast(inputDataStream->polarization0._channelised_voltage_G1.data() + input_offset), - thrust::raw_pointer_cast(inputDataStream->polarization1._channelised_voltage_G1.data() + input_offset), - thrust::raw_pointer_cast(outputDataStream->G1.I.a().data() + output_offset), - thrust::raw_pointer_cast(outputDataStream->G1.Q.a().data() + output_offset), - thrust::raw_pointer_cast(outputDataStream->G1.U.a().data() + output_offset), - thrust::raw_pointer_cast(outputDataStream->G1.V.a().data() + output_offset), - _nchans, _naccumulate / _nBlocks - ); + if (_active_gates[1]){ + stokes_accumulate<<<1024, 1024, 0, _proc_stream>>>( + thrust::raw_pointer_cast(inputDataStream->polarization0._channelised_voltage_G1.data() + input_offset), + thrust::raw_pointer_cast(inputDataStream->polarization1._channelised_voltage_G1.data() + input_offset), + thrust::raw_pointer_cast(outputDataStream->G1.I.a().data() + output_offset), + thrust::raw_pointer_cast(outputDataStream->G1.Q.a().data() + output_offset), + thrust::raw_pointer_cast(outputDataStream->G1.U.a().data() + output_offset), + thrust::raw_pointer_cast(outputDataStream->G1.V.a().data() + output_offset), + _nchans, _naccumulate / _nBlocks + ); + } // count saturated samples outputDataStream->G0._noOfOverflowed.a().data()[output_block_number] = 0; diff --git a/psrdada_cpp/effelsberg/edd/src/GatedSpectrometer_cli.cu b/psrdada_cpp/effelsberg/edd/src/GatedSpectrometer_cli.cu index be8bed6c8ee845c5d73099dce261cae36ae87093..1279307bece3f1bfe81880197e05cadb9f8de7eb 100644 --- a/psrdada_cpp/effelsberg/edd/src/GatedSpectrometer_cli.cu +++ b/psrdada_cpp/effelsberg/edd/src/GatedSpectrometer_cli.cu @@ -24,60 +24,42 @@ const size_t ERROR_UNHANDLED_EXCEPTION = 2; } // namespace -struct GatedSpectrometerInputParameters -{ - effelsberg::edd::DadaBufferLayout dadaBufferLayout; - size_t selectedSideChannel; - size_t speadHeapSize; - size_t nSideChannels; - size_t selectedBit; - size_t fft_length; - size_t naccumulate; - unsigned int nbits; - std::string filename; - std::string output_type; -}; - template<typename T, class InputType, class OutputType > -void launchSpectrometer(const GatedSpectrometerInputParameters &i) +void launchSpectrometer(const effelsberg::edd::GatedSpectrometerInputParameters &i, const std::string &filename, const std::string &output_type) { MultiLog log("DadaBufferLayout"); - std::cout << "Running with output_type: " << i.output_type << std::endl; - if (i.output_type == "file") + std::cout << "Running with output_type: " << output_type << std::endl; + if (output_type == "file") { - SimpleFileWriter sink(i.filename); + SimpleFileWriter sink(filename); effelsberg::edd::GatedSpectrometer<decltype(sink), InputType, OutputType> - spectrometer(i.dadaBufferLayout, - i.selectedSideChannel, i.selectedBit, - i.fft_length, i.naccumulate, i.nbits, sink); + spectrometer(i, sink); DadaInputStream<decltype(spectrometer)> istream(i.dadaBufferLayout.getInputkey(), log, spectrometer); istream.start(); } - else if (i.output_type == "dada") + else if (output_type == "dada") { - DadaOutputStream sink(string_to_key(i.filename), log); - effelsberg::edd::GatedSpectrometer<decltype(sink), InputType, OutputType> spectrometer(i.dadaBufferLayout, - i.selectedSideChannel, i.selectedBit, - i.fft_length, i.naccumulate, i.nbits, sink); + DadaOutputStream sink(string_to_key(filename), log); + effelsberg::edd::GatedSpectrometer<decltype(sink), InputType, OutputType> + spectrometer(i, sink); DadaInputStream<decltype(spectrometer)> istream(i.dadaBufferLayout.getInputkey(), log, spectrometer); istream.start(); } - else if (i.output_type == "profile") + else if (output_type == "profile") { NullSink sink; - effelsberg::edd::GatedSpectrometer<decltype(sink), InputType, OutputType> spectrometer(i.dadaBufferLayout, - i.selectedSideChannel, i.selectedBit, - i.fft_length, i.naccumulate, i.nbits, sink); + effelsberg::edd::GatedSpectrometer<decltype(sink), InputType, OutputType> + spectrometer(i, sink); std::vector<char> buffer(i.dadaBufferLayout.getBufferSize()); cudaHostRegister(buffer.data(), buffer.size(), cudaHostRegisterPortable); @@ -97,12 +79,12 @@ void launchSpectrometer(const GatedSpectrometerInputParameters &i) } -template<typename T> void io_eval(const GatedSpectrometerInputParameters &inputParameters, const std::string &input_polarizations, const std::string &output_format) +template<typename T> void io_eval(const effelsberg::edd::GatedSpectrometerInputParameters &inputParameters, const std::string &input_polarizations, const std::string &output_format, const std::string &filename, const std::string &output_type) { if (input_polarizations == "Single" && output_format == "Power") { launchSpectrometer<T, effelsberg::edd::SinglePolarizationInput, - effelsberg::edd::GatedPowerSpectrumOutput>(inputParameters); + effelsberg::edd::GatedPowerSpectrumOutput>(inputParameters, filename, output_type); } else if (input_polarizations == "Dual" && output_format == "Power") { @@ -111,7 +93,7 @@ template<typename T> void io_eval(const GatedSpectrometerInputParameters &inputP else if (input_polarizations == "Dual" && output_format == "Stokes") { launchSpectrometer<T, effelsberg::edd::DualPolarizationInput, - effelsberg::edd::GatedFullStokesOutput>(inputParameters); + effelsberg::edd::GatedFullStokesOutput>(inputParameters, filename, output_type); } else { @@ -128,7 +110,7 @@ int main(int argc, char **argv) { try { key_t input_key; - GatedSpectrometerInputParameters ip; + effelsberg::edd::GatedSpectrometerInputParameters ip; std::time_t now = std::time(NULL); std::tm *ptm = std::localtime(&now); char default_filename[32]; @@ -136,7 +118,8 @@ int main(int argc, char **argv) { std::string input_polarizations = "Single"; std::string output_format = "Power"; - + std::string filename; + std::string output_type; /** Define and parse the program options */ namespace po = boost::program_options; @@ -150,11 +133,11 @@ int main(int argc, char **argv) { "The shared memory key for the dada buffer to connect to (hex " "string)"); desc.add_options()( - "output_type", po::value<std::string>(&ip.output_type)->default_value("file"), + "output_type", po::value<std::string>(&output_type)->default_value("file"), "output type [dada, file, profile]. Default is file. Profile executes the spectrometer 10x on random data and passes the ouput to a null sink." ); desc.add_options()( - "output_key,o", po::value<std::string>(&ip.filename)->default_value(default_filename), + "output_key,o", po::value<std::string>(&filename)->default_value(default_filename), "The key of the output bnuffer / name of the output file to write spectra " "to"); @@ -194,6 +177,11 @@ int main(int argc, char **argv) { desc.add_options()("naccumulate,a", po::value<size_t>(&ip.naccumulate)->required(), "The number of samples to integrate in each channel"); + desc.add_options()("disable_gate,d", + po::value<uint8_t>()->notifier( + [&ip](size_t in) { ip.active_gates.set(in, false); }), + "Disable processing of ND state 0 or 1."); + desc.add_options()( "log_level", po::value<std::string>()->default_value("info")->notifier( [](std::string level) { set_log_level(level); }), @@ -215,9 +203,9 @@ int main(int argc, char **argv) { } po::notify(vm); - if (vm.count("output_type") && (!(ip.output_type == "dada" || ip.output_type == "file" || ip.output_type== "profile") )) + if (vm.count("output_type") && (!(output_type == "dada" || output_type == "file" || output_type== "profile") )) { - throw po::validation_error(po::validation_error::invalid_option_value, "output_type", ip.output_type); + throw po::validation_error(po::validation_error::invalid_option_value, "output_type", output_type); } if (vm.count("input_polarizations") && (!(input_polarizations == "Single" || input_polarizations == "Dual") )) @@ -248,7 +236,7 @@ int main(int argc, char **argv) { ip.dadaBufferLayout.intitialize(input_key, ip.speadHeapSize, ip.nSideChannels); - io_eval<float>(ip, input_polarizations, output_format); + io_eval<float>(ip, input_polarizations, output_format, filename, output_type ); } catch (std::exception &e) { std::cerr << "Unhandled Exception reached the top of main: " << e.what() diff --git a/psrdada_cpp/effelsberg/edd/src/fft_spectrometer_cli.cu b/psrdada_cpp/effelsberg/edd/src/fft_spectrometer_cli.cu index 6bab2402935dfed64a7875867d5fe6928d65debe..aace99b85c6afdced8896d98eefe9dad4b9bc3ca 100644 --- a/psrdada_cpp/effelsberg/edd/src/fft_spectrometer_cli.cu +++ b/psrdada_cpp/effelsberg/edd/src/fft_spectrometer_cli.cu @@ -26,51 +26,56 @@ int main(int argc, char** argv) try { key_t input_key; + int fft_length; int naccumulate; int nbits; - float input_level; - float output_level; + std::time_t now = std::time(NULL); std::tm * ptm = std::localtime(&now); - char buffer[32]; - std::strftime(buffer, 32, "%Y-%m-%d-%H:%M:%S.bp", ptm); - std::string filename(buffer); + char default_filename[32]; + std::strftime(default_filename, 32, "%Y-%m-%d-%H:%M:%S.bp", ptm); + std::string filename(default_filename); + std::string output_type; /** Define and parse the program options */ namespace po = boost::program_options; po::options_description desc("Options"); desc.add_options() + ("help,h", "Print help messages"); - ("help,h", "Print help messages") - ("input_key,i", po::value<std::string>() + desc.add_options()("input_key,i", po::value<std::string>() ->default_value("dada") ->notifier([&input_key](std::string in) { input_key = string_to_key(in); }), - "The shared memory key for the dada buffer to connect to (hex string)") + "The shared memory key for the dada buffer to connect to (hex string)"); + desc.add_options() ("fft_length,n", po::value<int>(&fft_length)->required(), - "The length of the FFT to perform on the data") + "The length of the FFT to perform on the data"); + desc.add_options() ("naccumulate,a", po::value<int>(&naccumulate)->required(), - "The number of samples to integrate in each channel") + "The number of samples to integrate in each channel"); + desc.add_options() ("nbits,b", po::value<int>(&nbits)->required(), - "The number of bits per sample in the packetiser output (8 or 12)") - - ("input_level", po::value<float>(&input_level)->required(), - "The input power level (standard deviation, used for 8-bit conversion)") + "The number of bits per sample in the packetiser output (8, 10, or 12)"); - ("output_level", po::value<float>(&output_level)->required(), - "The output power level (standard deviation, used for 8-bit conversion)") - - ("outfile,o", po::value<std::string>(&filename) - ->default_value(filename), - "The output file to write spectra to") + desc.add_options()( + "output_type", po::value<std::string>(&output_type)->default_value("file"), + "output type [dada, file, profile]. Default is file. Profile executes the " + " spectrometer 10x on random data and passes the ouput to a null sink." + ); + desc.add_options()( + "output_key,o", po::value<std::string>(&filename)->default_value(default_filename), + "The key of the output bnuffer / name of the output file to write spectra " + "to"); + desc.add_options() ("log_level", po::value<std::string>() ->default_value("info") ->notifier([](std::string level) @@ -90,6 +95,11 @@ int main(int argc, char** argv) return SUCCESS; } po::notify(vm); + if (vm.count("output_type") && (!(output_type == "dada" || output_type == "file" || output_type== "profile") )) + { + throw po::validation_error(po::validation_error::invalid_option_value, "output_type", output_type); + } + } catch(po::error& e) { @@ -103,11 +113,41 @@ int main(int argc, char** argv) MultiLog log("edd::FftSpectrometer"); DadaClientBase client(input_key, log); std::size_t buffer_bytes = client.data_buffer_size(); - SimpleFileWriter sink(filename); - //NullSink sink; - effelsberg::edd::FftSpectrometer<decltype(sink)> spectrometer(buffer_bytes, fft_length, naccumulate, nbits, input_level, output_level, sink); - DadaInputStream<decltype(spectrometer)> istream(input_key, log, spectrometer); - istream.start(); + + + if (output_type == "file"){ + + SimpleFileWriter sink(filename); + //NullSink sink; + effelsberg::edd::FftSpectrometer<decltype(sink)> spectrometer(buffer_bytes, fft_length, naccumulate, nbits, sink); + + DadaInputStream<decltype(spectrometer)> istream(input_key, log, spectrometer); + istream.start(); + } else if (output_type == "dada"){ + DadaOutputStream sink(string_to_key(filename), log); + + effelsberg::edd::FftSpectrometer<decltype(sink)> spectrometer(buffer_bytes, fft_length, naccumulate, nbits, sink); + + DadaInputStream<decltype(spectrometer)> istream(input_key, log, spectrometer); + istream.start(); + } else if (output_type == "profile") { + + NullSink sink; + effelsberg::edd::FftSpectrometer<decltype(sink)> spectrometer(buffer_bytes, fft_length, naccumulate, nbits, sink); + std::vector<char> buffer(fft_length * naccumulate * nbits / 8); + cudaHostRegister(buffer.data(), buffer.size(), cudaHostRegisterPortable); + RawBytes ib(buffer.data(), buffer.size(), buffer.size()); + spectrometer.init(ib); + for (int i =0; i< 10; i++) + { + std::cout << "Profile Block: "<< i +1 << std::endl; + spectrometer(ib); + } + + DadaInputStream<decltype(spectrometer)> istream(input_key, log, spectrometer); + istream.start(); + } + /** * End of application code */ diff --git a/psrdada_cpp/effelsberg/edd/test/src/FftSpectrometerTester.cu b/psrdada_cpp/effelsberg/edd/test/src/FftSpectrometerTester.cu index c82f3ebe9e5efa5ca7b82ff4d8ceb67e592b5be1..2ccb98ccc02b6facb30069cf8ac365629216b5b5 100644 --- a/psrdada_cpp/effelsberg/edd/test/src/FftSpectrometerTester.cu +++ b/psrdada_cpp/effelsberg/edd/test/src/FftSpectrometerTester.cu @@ -31,18 +31,18 @@ void FftSpectrometerTester::TearDown() } void FftSpectrometerTester::performance_test( - std::size_t fft_length, std::size_t tscrunch, + std::size_t fft_length, std::size_t tscrunch, std::size_t nsamps_out, std::size_t nbits) { std::size_t input_block_bytes = tscrunch * fft_length * nsamps_out * nbits/8; - + DoublePinnedHostBuffer<char> input_block; - input_block.resize(input_block_bytes); + input_block.resize(input_block_bytes); RawBytes input_raw_bytes(input_block.a_ptr(), input_block_bytes, input_block_bytes); std::vector<char> header_block(4096); RawBytes header_raw_bytes(header_block.data(), 4096, 4096); NullSink null_sink; - FftSpectrometer<NullSink> spectrometer(input_block_bytes, fft_length, tscrunch, nbits, 16.0f, 16.0f, null_sink); + FftSpectrometer<NullSink> spectrometer(input_block_bytes, fft_length, tscrunch, nbits, null_sink); spectrometer.init(header_raw_bytes); for (int ii = 0; ii < 100; ++ii) { @@ -54,6 +54,8 @@ void FftSpectrometerTester::performance_test( TEST_F(FftSpectrometerTester, simple_exec_test) { performance_test(1024, 16, 128, 12); + performance_test(1024, 16, 128, 8); + performance_test(1024, 16, 128, 10); } } //namespace test diff --git a/psrdada_cpp/effelsberg/edd/test/src/GatedSpectrometerTest.cu b/psrdada_cpp/effelsberg/edd/test/src/GatedSpectrometerTest.cu index e352aea3580735c76d4d2d301417c97aa1610e48..b1cfbf77d771d17f1ecb74ba04937d8343a8a442 100644 --- a/psrdada_cpp/effelsberg/edd/test/src/GatedSpectrometerTest.cu +++ b/psrdada_cpp/effelsberg/edd/test/src/GatedSpectrometerTest.cu @@ -313,29 +313,29 @@ class GatedTestSinkSinglePol{ TEST(GatedSpectrometer, processingSinglePol) { - const size_t nbits = 8; const size_t nHeaps = 1024; - const size_t fft_length = 1024 * 64; - const size_t naccumulate = 4096 * nHeaps / fft_length; - const size_t heapSize = 4096 * nbits / 8; - const size_t inputBufferSize = nHeaps * (heapSize + 64 / 8); + psrdada_cpp::effelsberg::edd::GatedSpectrometerInputParameters ip; + ip.nbits = 8; + ip.fft_length = 1024 * 64; + ip.naccumulate = 4096 * nHeaps / ip.fft_length; + ip.selectedBit = 0; + ip.selectedSideChannel = 0; + ip.speadHeapSize = 4096 * ip.nbits / 8; + + const size_t inputBufferSize = nHeaps * (ip.speadHeapSize + 64 / 8); psrdada_cpp::DadaDB idbuffer(5, inputBufferSize, 1, 4096); idbuffer.create(); - psrdada_cpp::effelsberg::edd::DadaBufferLayout bufferLayout(idbuffer.key(), heapSize, 1); + ip.dadaBufferLayout = psrdada_cpp::effelsberg::edd::DadaBufferLayout(idbuffer.key(), ip.speadHeapSize, 1); - GatedTestSinkSinglePol sink(fft_length, nHeaps, naccumulate); + GatedTestSinkSinglePol sink(ip.fft_length, nHeaps, ip.naccumulate); psrdada_cpp::effelsberg::edd::GatedSpectrometer< GatedTestSinkSinglePol, psrdada_cpp::effelsberg::edd::SinglePolarizationInput, psrdada_cpp::effelsberg::edd::GatedPowerSpectrumOutput> - spectrometer(bufferLayout, - 0, 0, - fft_length, - naccumulate, nbits, - sink); + spectrometer(ip, sink); char *raw_buffer = new char[inputBufferSize]; @@ -402,29 +402,30 @@ class GatedTestSinkFullStokes{ TEST(GatedSpectrometer, processingFullStokes) { - const size_t nbits = 8; const size_t nHeaps = 1024; - const size_t fft_length = 1024 * 64; - const size_t naccumulate = 4096 * nHeaps / fft_length; - const size_t heapSize = 4096 * nbits / 8; - const size_t inputBufferSize = 2 * nHeaps * (heapSize + 64 / 8) ; + psrdada_cpp::effelsberg::edd::GatedSpectrometerInputParameters ip; + ip.nbits = 8; + ip.fft_length = 1024 * 64; + ip.naccumulate = 4096 * nHeaps / ip.fft_length; + ip.selectedBit = 0; + ip.selectedSideChannel = 0; + ip.speadHeapSize = 4096 * ip.nbits / 8; + + + const size_t inputBufferSize = 2 * nHeaps * (ip.speadHeapSize + 64 / 8) ; psrdada_cpp::DadaDB idbuffer(5, inputBufferSize, 1, 4096); idbuffer.create(); - psrdada_cpp::effelsberg::edd::DadaBufferLayout bufferLayout(idbuffer.key(), heapSize, 1); + psrdada_cpp::effelsberg::edd::DadaBufferLayout bufferLayout(idbuffer.key(), ip.speadHeapSize, 1); - GatedTestSinkFullStokes sink(fft_length, nHeaps, naccumulate); + GatedTestSinkFullStokes sink(ip.fft_length, nHeaps, ip.naccumulate); psrdada_cpp::effelsberg::edd::GatedSpectrometer< GatedTestSinkFullStokes, psrdada_cpp::effelsberg::edd::DualPolarizationInput, psrdada_cpp::effelsberg::edd::GatedFullStokesOutput> - spectrometer(bufferLayout, - 0, 0, - fft_length, - naccumulate, nbits, - sink); + spectrometer(ip, sink); char *raw_buffer = new char[inputBufferSize]; @@ -472,6 +473,19 @@ struct gated_params std::size_t nbits; std::size_t nHeaps; std::string msg; + + + psrdada_cpp::effelsberg::edd::GatedSpectrometerInputParameters getInputParams() + { + psrdada_cpp::effelsberg::edd::GatedSpectrometerInputParameters ip; + ip.nbits = nbits; + ip.fft_length = fft_length; + ip.naccumulate = naccumulate; + ip.selectedBit = 0; + ip.selectedSideChannel = 0; + ip.speadHeapSize = 4096 * nbits / 8; + return ip; + } }; @@ -479,22 +493,24 @@ class ExecutionTests: public testing::TestWithParam<gated_params> { // Test that the spectrometers execute without error in certain parameter // settings + + }; TEST_P(ExecutionTests, SinglePolOutput) { gated_params params = GetParam(); + psrdada_cpp::effelsberg::edd::GatedSpectrometerInputParameters ip = params.getInputParams(); - const size_t heapSize = 4096 * params.nbits / 8; - const size_t inputBufferSize = params.nHeaps * (heapSize + 64 / 8); + const size_t inputBufferSize = params.nHeaps * (ip.speadHeapSize + 64 / 8); psrdada_cpp::DadaDB buffer(5, inputBufferSize, 1, 4096); buffer.create(); - psrdada_cpp::effelsberg::edd::DadaBufferLayout bufferLayout(buffer.key(), heapSize, 1); + ip.dadaBufferLayout = psrdada_cpp::effelsberg::edd::DadaBufferLayout(buffer.key(), ip.speadHeapSize, 1); // Test buffer consistency with input parameters - EXPECT_EQ(bufferLayout.getNHeaps(), params.nHeaps); + EXPECT_EQ(ip.dadaBufferLayout.getNHeaps(), params.nHeaps); struct output_pointer { @@ -564,11 +580,7 @@ TEST_P(ExecutionTests, SinglePolOutput) TestSink, psrdada_cpp::effelsberg::edd::SinglePolarizationInput, psrdada_cpp::effelsberg::edd::GatedPowerSpectrumOutput> - spectrometer(bufferLayout, - 0, 0, - params.fft_length, - params.naccumulate, params.nbits, - sink); + spectrometer(ip, sink); char *raw_buffer = new char[inputBufferSize]; @@ -584,7 +596,7 @@ TEST_P(ExecutionTests, SinglePolOutput) // 3 Block, set value and SCI raw_buffer[122] = int8_t(13); - uint64_t* sc_items = reinterpret_cast<uint64_t*>(raw_buffer + params.nHeaps * heapSize); + uint64_t* sc_items = reinterpret_cast<uint64_t*>(raw_buffer + params.nHeaps * ip.speadHeapSize); for (size_t i = 0; i < params.nHeaps; i++) SET_BIT(sc_items[i], 0); EXPECT_NO_THROW(spectrometer(buff)) << params.msg; @@ -601,16 +613,16 @@ TEST_P(ExecutionTests, SinglePolOutput) TEST_P(ExecutionTests, FullStokesOutput) { gated_params params = GetParam(); + psrdada_cpp::effelsberg::edd::GatedSpectrometerInputParameters ip = params.getInputParams(); - const size_t heapSize = 4096 * params.nbits / 8; - const size_t inputBufferSize = params.nHeaps * (heapSize + 64 / 8); + const size_t inputBufferSize = params.nHeaps * (ip.speadHeapSize + 64 / 8); psrdada_cpp::DadaDB buffer(5, inputBufferSize, 1, 4096); buffer.create(); - psrdada_cpp::effelsberg::edd::DadaBufferLayout bufferLayout(buffer.key(), heapSize, 1); + ip.dadaBufferLayout = psrdada_cpp::effelsberg::edd::DadaBufferLayout(buffer.key(), ip.speadHeapSize, 1); // Test buffer consistency with input parameters - EXPECT_EQ(bufferLayout.getNHeaps(), params.nHeaps); + EXPECT_EQ(ip.dadaBufferLayout.getNHeaps(), params.nHeaps); psrdada_cpp::NullSink sink; @@ -618,11 +630,8 @@ TEST_P(ExecutionTests, FullStokesOutput) psrdada_cpp::NullSink, psrdada_cpp::effelsberg::edd::DualPolarizationInput, psrdada_cpp::effelsberg::edd::GatedFullStokesOutput> - spectrometer(bufferLayout, - 0, 0, - params.fft_length, - params.naccumulate, params.nbits, - sink); + spectrometer(ip, sink); + char *raw_buffer = new char[inputBufferSize]; memset(raw_buffer, 0, inputBufferSize);