Commit c8b3d3b6 authored by Niclas Esser's avatar Niclas Esser
Browse files

Merged cryopaf into devel

parent 6a287b3c
Pipeline #100153 passed with stages
in 17 minutes and 31 seconds
......@@ -88,3 +88,4 @@ install(DIRECTORY detail DESTINATION include/psrdada_cpp)
add_subdirectory(meerkat)
add_subdirectory(effelsberg)
add_subdirectory(cryopaf)
/*
* Beamformer.cuh
* Author: Niclas Esser <nesser@mpifr-bonn.mpg.de>
* Description:
* This file consists of a single class (Beamformer<ComputeType>). An object of Beamformer
* can be used to either perform a Stokes I detection or raw voltage beamforming
* on a GPU.
* Both beamforming kernels expect the same data product (linearly aligned in device memory)
* Input: F-P-T-E
* Weight: F-P-B-E
* Output: F-T-B-P (voltage beams)
* Output: F-T-B (Stokes I beams)
*/
#ifndef BEAMFORMER_CUH_
#define BEAMFORMER_CUH_
#include <cuda.h>
#include <cuda_fp16.h>
#include <thrust/device_vector.h>
#include "psrdada_cpp/cuda_utils.hpp"
#include "psrdada_cpp/multilog.hpp"
namespace psrdada_cpp{
namespace cryopaf{
// Constants for beamform kernels
#define NTHREAD 1024
#define TILE_SIZE 32
#define WARP_SIZE 32
/**
* @brief GPU kernel to perform Stokes I detection beamforming
*
* @detail Template type T has to be either T=float2 or T=__half2.
* According to T, U has to be either U=float or U=__half
* (U is the component type of T, see Beamformer::ResultType).
*
* @param T* idata pointer to input memory (format: F-P-T-E)
* @param T* wdata pointer to beam weight memory (format: F-P-B-E)
* @param U* odata pointer to output memory, type of U is equal to T::x (format: F-T-B)
* @param int time Width of time dimension (T)
* @param int elem Number of elements (E)
* @param int beam Number of beams (B)
* @param int integrate Integration time; currently limited to at most 32 and a power of 2
*
* @TODO: Allow greater integration time
*/
template<typename T, typename U>__global__
void beamformer_power_fpte_fpbe_ftb(
const T *idata,
const T* wdata,
U *odata,
int time,
int elem,
int beam,
int integrate);
/**
* @brief GPU kernel to perform raw voltage beamforming
*
* @detail Template type T has to be either T=float2 or T=__half2
*
* @param T* idata pointer to input memory (format: F-P-T-E)
* @param T* wdata pointer to beam weight memory (format: F-P-B-E)
* @param T* odata pointer to output memory
* (NOTE(review): original comment said F-T-B, but the kernel-name suffix
* "fptb" and the file header suggest F-P-T-B / F-T-B-P — confirm layout)
* @param int time Width of time dimension (T)
* @param int elem Number of elements (E)
* @param int beam Number of beams (B)
*/
template<typename T>__global__
void beamformer_voltage_fpte_fpbe_fptb(
const T *idata,
const T* wdata,
T *odata,
int time,
int elem,
int beam);
template<class ComputeType>
class Beamformer{
// Internal type definitions
private:
typedef decltype(ComputeType::x) ResultType; // Component type of ComputeType (float or __half); only needed for the Stokes I beamformer output
// Public functions
public:
/**
* @brief constructs an object of Beamformer<ComputeType> (ComputeType=float2 or ComputeType=__half2)
*
* @param cudaStream_t& stream Object of cudaStream_t to allow parallel copy + processing (has to be created and destroyed elsewhere)
* @param std::size_t sample Number of samples to process in one kernel launch (no restrictions)
* @param std::size_t channel Number of channels to process in one kernel launch (no restrictions)
* @param std::size_t element Number of elements to process in one kernel launch (no restrictions)
* @param std::size_t beam Number of beams to process in one kernel launch (no restrictions)
* @param std::size_t integration Samples to be integrated; has to be a power of 2 and at most 32 (see kernel doc above)
*/
Beamformer(
cudaStream_t& stream,
std::size_t sample,
std::size_t channel,
std::size_t element,
std::size_t beam,
std::size_t integration = 1);
/**
* @brief deconstructs an object of Beamformer<ComputeType> (ComputeType=float2 or ComputeType=__half2)
*/
~Beamformer();
/**
* @brief Launches voltage beamforming GPU kernel (beamformer_voltage_fpte_fpbe_fptb) on _stream
*
* @param ComputeType* input pointer to input memory (format: F-P-T-E)
* @param ComputeType* weights pointer to beam weight memory (format: F-P-B-E)
* @param ComputeType* output pointer to output memory (format: F-T-B-P)
*/
void process(
const ComputeType* input,
const ComputeType* weights,
ComputeType* output);
/**
* @brief Launches Stokes I beamforming GPU kernel (beamformer_power_fpte_fpbe_ftb) on _stream
*
* @param ComputeType* input pointer to input memory (format: F-P-T-E)
* @param ComputeType* weights pointer to beam weight memory (format: F-P-B-E)
* @param ResultType* output pointer to output memory (format: F-T-B)
*/
void process(
const ComputeType* input,
const ComputeType* weights,
ResultType* output);
/**
* @brief Prints the block and grid layout of a kernel (used for debugging purposes)
*/
void print_layout();
// Private attributes
private:
cudaStream_t& _stream; // Borrowed stream; owned by the caller
dim3 grid;  // Kernel grid layout, derived from the constructor dimensions
dim3 block; // Kernel block layout
std::size_t _sample;
std::size_t _channel;
std::size_t _element;
std::size_t _beam;
std::size_t _integration;
};
} // namespace cryopaf
} // namespace psrdada_cpp
#include "psrdada_cpp/cryopaf/details/utils.cu"
#include "psrdada_cpp/cryopaf/details/BeamformerKernels.cu"
#include "psrdada_cpp/cryopaf/src/Beamformer.cu"
#endif /* BEAMFORMER_CUH_ */
/*
* BufferTypes.cuh
* Author: Niclas Esser <nesser@mpifr-bonn.mpg.de>
* Description:
* This file contains classes for different kinds of buffer representation.
* All implemented classes inherit from DoubleBuffer<thrust::device_vector<T>>.
*/
#ifndef BUFFERTYPES_HPP_
#define BUFFERTYPES_HPP_
// boost::interprocess used to upload weights via POSIX shared memory
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/thread.hpp>
#include "psrdada_cpp/cuda_utils.hpp"
#include "psrdada_cpp/double_device_buffer.cuh"
#include "psrdada_cpp/double_host_buffer.cuh"
#include "psrdada_cpp/cryopaf/QueueHeader.hpp"
namespace psrdada_cpp {
namespace cryopaf{
/**
* @brief Class providing buffers for raw voltage data
*/
template<class T>
class RawVoltage : public DoubleBuffer<thrust::device_vector<T>>
{
public:
typedef T type;
public:
/**
* @brief Instantiates an object of RawVoltage
*
* @param std::size_t Number of items in buffer
*
* @detail Allocates twice the size in device memory as double device buffer
* (resize() is provided by the DoubleBuffer base class)
*/
RawVoltage(std::size_t size)
: DoubleBuffer<thrust::device_vector<T>>()
{
this->resize(size);
_bytes = size * sizeof(T);
}
/**
* @brief Destroys an object of RawVoltage
*/
~RawVoltage(){}
/**
* @brief Returns the number of bytes used for a single buffer
*
* @detail The total occupied device memory is twice this value (double buffer)
*/
std::size_t total_bytes(){return _bytes;}
private:
std::size_t _bytes; // Byte size of ONE of the two buffers
};
/**
* @brief Class providing buffers for beam data (is always the Output of the Pipeline)
* @detail An object of BeamOutput also contains an instance of DoublePinnedHostBuffer<T>
* to allow an asynchronous copy to the host memory.
*/
template<class T>
class BeamOutput : public DoubleBuffer<thrust::device_vector<T>>
{
public:
typedef T type;
DoublePinnedHostBuffer<T> host; // Pinned host-side double buffer; target of async_copy()
public:
/**
* @brief Instantiates an object of BeamOutput
*
* @param std::size_t Number of items in buffer
*
* @detail Allocates twice the size in device memory and in host memory as double buffers
*/
BeamOutput(std::size_t size)
: DoubleBuffer<thrust::device_vector<T>>()
{
this->resize(size);
host.resize(size);
_bytes = size * sizeof(T);
}
/**
* @brief Destroys an object of BeamOutput
*/
~BeamOutput(){}
/**
* @brief Asynchronous copy of the active device buffer to host memory
*
* @param cudaStream_t& stream Device to host stream
*
* @note Caller must synchronize the stream before reading host.a_ptr()
*/
void async_copy(cudaStream_t& stream)
{
CUDA_ERROR_CHECK(cudaMemcpyAsync(host.a_ptr(), this->a_ptr(), _bytes, cudaMemcpyDeviceToHost, stream));
}
/**
* @brief Returns the number of bytes used for a single buffer
*
* @detail The total occupied memory is twice this value (double buffer)
*/
std::size_t total_bytes(){return _bytes;}
private:
std::size_t _bytes; // Byte size of ONE of the two buffers
};
// Namespace alias for convenient access to boost::interprocess functionality; used only for weights
namespace bip = boost::interprocess;
/**
* @brief Class providing buffers for beam weights
* @detail An object of Weights as the ability to read out a POSIX shared memory namespace
* to load updated beam weights.
* @note The current state is not final and will change in future. The idea for future is
* to provide an update method which is called by a shared memory instance.
*/
template<class T>
class Weights : public DoubleBuffer<thrust::device_vector<T>>
{
public:
typedef T type;
public:
/**
* @brief Instantiates an object of Weights
*
* @param std::size_t size      Number of items in buffer
* @param std::string smem_name Name of the POSIX shared memory segment
*
* @detail Allocates twice the size in device memory as double device buffer.
* It also launches a boost::thread to create, read and write from shared
* memory.
*/
Weights(std::size_t size, std::string smem_name="SharedMemoryWeights")
: DoubleBuffer<thrust::device_vector<T>>()
, _smem_name(smem_name)
{
this->resize(size);
_bytes = size * sizeof(T);
t = new boost::thread(boost::bind(&Weights::run, this));
}
/**
* @brief Destroys an object of Weights
*
* @note The monitoring thread is intentionally not joined here: run() blocks
* until the writer process raises the stop flag, so joining could hang the
* destructor. TODO(review): add a shutdown signal, then join and delete t.
*/
~Weights(){}
/**
* @brief Creates, reads, writes and removes a POSIX shared memory space
*
* @detail This function is a temporary solution to update beam weights on-the-fly
* while the pipeline is operating. In the future a clean interface will be created
* that provides additional monitoring information (e.g. power level) besides the
* beam weight updating mechanism.
*/
void run()
{
// Use the configured segment name; it was stored by the constructor but
// previously ignored in favour of a hard-coded string, which broke
// multi-instance usage (default value keeps old behavior).
bip::shared_memory_object::remove(_smem_name.c_str());
bip::shared_memory_object smem(bip::create_only, _smem_name.c_str(), bip::read_write);
// Set size of shared memory including QueueHeader + payload
BOOST_LOG_TRIVIAL(info) << "Size of shared memory for weight uploading (IPC) " << sizeof(QueueHeader) + (this->size()) * sizeof(T);
smem.truncate(sizeof(QueueHeader) + (this->size()) * sizeof(T));
// Map shared memory to an addressable region
bip::mapped_region region(smem, bip::read_write);
void* smem_addr = region.get_address(); // get its address
QueueHeader* qheader = static_cast<QueueHeader*>(smem_addr); // Interpret first bytes as QueueHeader
// Payload starts sizeof(QueueHeader) BYTES after the base address.
// The previous code indexed a T* with sizeof(QueueHeader), advancing by
// sizeof(QueueHeader) * sizeof(T) bytes and overrunning the truncated
// region whenever sizeof(T) > 1.
// NOTE(review): the writer process must compute the same offset — verify.
T* ptr = reinterpret_cast<T*>(static_cast<char*>(smem_addr) + sizeof(QueueHeader));
qheader->stop = true;
// Busy-wait (1 ms steps) until a writer attaches and clears the stop flag
while(qheader->stop){usleep(1000);}
while(!qheader->stop)
{
bip::scoped_lock<bip::interprocess_mutex> lock(qheader->mutex);
if(!qheader->data_in)
{
BOOST_LOG_TRIVIAL(debug) << "Waiting for writing weights to shared memory";
qheader->ready_to_read.wait(lock); // Wait for read out
}
BOOST_LOG_TRIVIAL(debug) << "Reading new weights from shared memory";
CUDA_ERROR_CHECK(cudaMemcpy((void*)this->b_ptr(), (void*)ptr,
_bytes, cudaMemcpyHostToDevice));
// Swap double buffer, so next batch is calculated with new weights
this->swap();
//Notify the other process that the buffer is empty
qheader->data_in = false;
qheader->ready_to_write.notify_all();
}
bip::shared_memory_object::remove(_smem_name.c_str());
BOOST_LOG_TRIVIAL(info) << "Closed shared memory for weights uploading";
}
/**
* @brief Returns the number of bytes used for a single buffer (total is twice)
*/
std::size_t total_bytes(){return _bytes;}
private:
std::size_t _bytes;      // Byte size of ONE of the two buffers
std::string _smem_name;  // POSIX shared memory segment name
boost::thread *t;        // Monitoring thread running run(); see ~Weights note
};
}
}
#endif /* BUFFERTYPES_HPP_ */
# Cryopaf beamforming components are only built when CUDA support is enabled.
if(ENABLE_CUDA)
# Libraries linked into every cryopaf executable.
set(PSRDADA_CPP_CRYOPAF_LIBRARIES
${CMAKE_PROJECT_NAME}
${CMAKE_PROJECT_NAME}_cryopaf
${DEPENDENCY_LIBRARIES}
-lboost_system
-lpthread)
# Header-only CUDA sources of the cryopaf library.
set(psrdada_cpp_cryopaf_src
Unpacker.cuh
Beamformer.cuh
BufferTypes.cuh
Pipeline.cuh
PipelineInterface.cuh)
cuda_add_library(${CMAKE_PROJECT_NAME}_cryopaf ${psrdada_cpp_cryopaf_src})
# CLI running the beamforming pipeline.
cuda_add_executable(beamforming src/beamforming_cli.cu)
target_link_libraries(beamforming ${PSRDADA_CPP_CRYOPAF_LIBRARIES})
install(TARGETS beamforming DESTINATION bin)
# CLI uploading beam weights via POSIX shared memory (see BufferTypes.cuh).
cuda_add_executable(weightupdater src/weightupdater_cli.cu)
target_link_libraries(weightupdater ${PSRDADA_CPP_CRYOPAF_LIBRARIES})
install(TARGETS weightupdater DESTINATION bin)
add_subdirectory(profiling)
add_subdirectory(test)
endif(ENABLE_CUDA)
/*
* Pipeline.cuh
* Author: Niclas Esser <nesser@mpifr-bonn.mpg.de>
* Description:
* This files consists of a single class (Pipeline<HandlerType, ComputeType, ResultType>)
* and a configuration structure (PipelineConfig).
* An object of Pipeline is used to access data from psrdada buffers, unpacks them,
* performs beamforming and writes the results back to another psrdada buffer.
* TODO:
* - We need a packer
* - We need a monitoring interface
*/
#ifndef PIPELINE_CUH_
#define PIPELINE_CUH_
#include <thrust/device_vector.h>
#include <thrust/copy.h>
#include <cuda.h>
#include "psrdada_cpp/cuda_utils.hpp"
#include "psrdada_cpp/multilog.hpp"
#include "psrdada_cpp/raw_bytes.hpp"
#include "psrdada_cpp/cryopaf/Unpacker.cuh"
#include "psrdada_cpp/cryopaf/Beamformer.cuh"
#include "psrdada_cpp/cryopaf/BufferTypes.cuh"
namespace psrdada_cpp{
namespace cryopaf{
/**
* @brief Configuration of a Pipeline instance; passed to the Pipeline constructor.
*/
struct PipelineConfig{
key_t in_key;            // Key of the input psrdada buffer
key_t out_key;           // Key of the output psrdada buffer
int device_id;           // CUDA device to run on
std::string logname;     // Name used for logging
std::size_t n_samples;   // Samples per kernel launch
std::size_t n_channel;   // Frequency channels
std::size_t n_elements;  // Antenna elements
std::size_t n_beam;      // Beams to form
std::size_t integration; // Stokes I integration factor
std::string input_type;  // Raw input encoding
std::string mode;        // "power" or "voltage" beamforming (see Beamformer)
std::string protocol;    // Transport protocol of the input stream
const std::size_t n_pol = 2; // Number of polarizations (fixed)
/**
* @brief Prints the complete configuration to stdout (debugging aid)
*/
void print()
{
std::cout << "Pipeline configuration" << std::endl;
std::cout << "in_key: " << in_key << std::endl;
std::cout << "out_key: " << out_key << std::endl;
std::cout << "device_id: " << device_id << std::endl;
std::cout << "logname: " << logname << std::endl;
std::cout << "n_samples: " << n_samples << std::endl;
std::cout << "n_channel: " << n_channel << std::endl;
std::cout << "input_type: " << input_type << std::endl;
std::cout << "n_elements: " << n_elements << std::endl;
std::cout << "n_pol: " << n_pol << std::endl;
std::cout << "n_beam: " << n_beam << std::endl;
std::cout << "integration: " << integration << std::endl;
// Fix: protocol was the only member missing from the printout
std::cout << "protocol: " << protocol << std::endl;
std::cout << "mode: " << mode << std::endl;
}
};
template<class HandlerType, class ComputeType, class ResultType>
class Pipeline{
// Internal type definitions
private:
typedef RawVoltage<char> RawInputType; // Type for received raw input data (voltage)
typedef RawVoltage<ComputeType> InputType;// Type for unpacked raw input data (voltage)
typedef Weights<ComputeType> WeightType; // Type for beam weights
typedef BeamOutput<ResultType> OutputType;// Type for beamformed output data
public:
/**
* @brief Constructs an object of Pipeline
*
* @param PipelineConfig conf Pipeline configuration containing all necessary parameters (declared above in this file)
* @param MultiLog log Logging instance
* @param HandlerType handler Object for handling output data
*
* @detail Initializes the pipeline environment including device memory and processor objects.
*/
Pipeline(PipelineConfig& conf, MultiLog &log, HandlerType &handler);
/**
* @brief Deconstructs an object of Pipeline
*
* @detail Destroys all objects and allocated memory
*/
~Pipeline();
/**
* @brief Initialise the pipeline with a DADA header block
*
* @param header_block A RawBytes object wrapping the DADA header block
*/
void init(RawBytes &header_block);
/**
* @brief Process the data in a DADA data buffer
*
* @param dada_block A RawBytes object wrapping the DADA data block
*/
bool operator()(RawBytes &dada_block);
// Internal attributes
private:
HandlerType &_handler;
MultiLog &_log;
PipelineConfig& _conf;
// Processors
Unpacker<ComputeType>* unpacker = nullptr; // Object to unpack and transpose received input data on GPU to an expected format
Beamformer<ComputeType>* beamformer = nullptr; // Object to perform beamforming on GPU
// Buffers
RawInputType *_raw_input_buffer = nullptr; // Received input buffer
InputType *_input_buffer = nullptr; // Unpacked and transposed input buffer
WeightType *_weight_buffer = nullptr; // Beam weights, updated through shared memory
OutputType *_output_buffer = nullptr; // Output buffer containing processed beams
std::size_t _call_cnt = 0; // Internal dada block counter
cudaStream_t _h2d_stream; // Host to device cuda stream (used for async copies)
cudaStream_t _prc_stream; // Processing stream
cudaStream_t _d2h_stream; // Device to host cuda stream (used for async copies)
#ifdef DEBUG
// Time measurement variables (debugging only)
cudaEvent_t start, stop;
float ms;
#endif
};
} // namespace cryopaf
} // namespace psrdada_cpp
#include "psrdada_cpp/cryopaf/src/Pipeline.cu"
#endif /* PIPELINE_CUH_ */
#ifndef PIPELINE_INTERFACE_HPP
#define PIPELINE_INTERFACE_HPP
#include <vector>
#include <string>
#include <unistd.h>
#include <random>
#include <cmath>
#include <complex>
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/thread.hpp>
#include "psrdada_cpp/multilog.hpp"
#include "psrdada_cpp/cryopaf/QueueHeader.hpp"
namespace psrdada_cpp{
namespace cryopaf{
namespace bip = boost::interprocess;
/**
* @brief Configuration of a PipelineInterface instance.
*/
struct PipelineInterfaceConfig{
std::string logname;    // Name used for logging
std::size_t n_channel;  // Frequency channels
std::size_t n_elements; // Antenna elements
std::size_t n_pol;      // Polarizations
std::size_t n_beam;     // Beams to form
std::string mode;       // Beamforming mode
/**
* @brief Dumps the configuration to stdout (debugging aid)
*/
void print()
{
std::cout << "Pipeline interface configuration" << std::endl
          << "logname: " << logname << std::endl
          << "n_channel: " << n_channel << std::endl
          << "n_elements: " << n_elements << std::endl
          << "n_pol: " << n_pol << std::endl
          << "n_beam: " << n_beam << std::endl
          << "mode: " << mode << std::endl;
}
};
template<class T>
class PipelineInterface
{
public:
PipelineInterface(PipelineInterfaceConfig& config, MultiLog& logger);
~PipelineInterface();
void run();
virtual void update() = 0;