Commit 5ac007e8 authored by Tobias Winchen's avatar Tobias Winchen
Browse files

Merged

parents 8d803261 6be835f5
root = true
[*]
indent_style = space
indent_size = 4
trim_trailing_whitespace=true
......@@ -18,7 +18,7 @@ namespace edd {
const size_t vlbiHeaderSize = 8 * 32 / 8; // bytes [8 words a 32 bit]
/// class VDIFHeaderView provides interprets a data block as VDIF compliant header
/// class VDIFHeaderView provides interprets a data block as VDIF compliant header
/// See https://vlbi.org/vlbi-standards/vdif/specification 1.1.1 from June 2014 for details.
class VDIFHeaderView
{
......@@ -29,21 +29,20 @@ class VDIFHeaderView
void setDataLocation(const uint32_t* _data);
const uint32_t* getDataLocation() const;
uint32_t getVersionNumber() const;
bool isValid() const;
uint32_t getSecondsFromReferenceEpoch() const;
uint32_t getReferenceEpoch() const;
bool isValid() const;
uint32_t getSecondsFromReferenceEpoch() const;
uint32_t getReferenceEpoch() const;
size_t getTimestamp() const;
uint32_t getDataFrameNumber() const;
// Length of the data frame including the header, in units of 8 bytes
uint32_t getDataFrameLength() const;
uint32_t getNumberOfChannels() const;
uint32_t getDataFrameNumber() const;
// Length of the data frame including the header, in units of 8 bytes
uint32_t getDataFrameLength() const;
uint32_t getNumberOfChannels() const;
bool isRealDataType() const;
bool isComplexDataType() const;
// Number of bits per sample -1 (max 32/bits per sample)
uint32_t getBitsPerSample() const;
uint32_t getThreadId() const;
uint32_t getStationId() const;
bool isComplexDataType() const;
// Number of bits per sample -1 (max 32/bits per sample)
uint32_t getBitsPerSample() const;
uint32_t getThreadId() const;
uint32_t getStationId() const;
};
......@@ -52,33 +51,33 @@ class VDIFHeaderView
/// specification 1.1.1 from June 2014 for details.
class VDIFHeader : public VDIFHeaderView
{
private:
uint32_t data[8];
private:
uint32_t data[8];
public:
VDIFHeader();
VDIFHeader(const VDIFHeader &v);
VDIFHeader& operator=(const VDIFHeader& other);
public:
VDIFHeader();
VDIFHeader(const VDIFHeader &v);
VDIFHeader& operator=(const VDIFHeader& other);
// return pointer to the data block for low level manipulation
uint32_t* getData();
void setInvalid();
void setValid();
void setSecondsFromReferenceEpoch(uint32_t value);
void setReferenceEpoch(uint32_t value);
uint32_t* getData();
void setInvalid();
void setValid();
void setSecondsFromReferenceEpoch(uint32_t value);
void setReferenceEpoch(uint32_t value);
/// set reference epoch and seconds from reference epoch from POSIX time
/// stamp
void setTimeReferencesFromTimestamp(size_t);
/// converts time reference data to POSIX time
void setDataFrameNumber(uint32_t value);
void setDataFrameLength(uint32_t value);
void setNumberOfChannels(uint32_t value);
void setComplexDataType();
void setRealDataType();
void setBitsPerSample(uint32_t value);
void setThreadId(uint32_t value);
void setStationId(uint32_t value);
void setDataFrameNumber(uint32_t value);
void setDataFrameLength(uint32_t value);
void setNumberOfChannels(uint32_t value);
void setComplexDataType();
void setRealDataType();
void setBitsPerSample(uint32_t value);
void setThreadId(uint32_t value);
void setStationId(uint32_t value);
};
......
......@@ -37,14 +37,13 @@ VLBI<HandlerType>::VLBI(std::size_t buffer_bytes, std::size_t input_bitDepth,
BOOST_LOG_TRIVIAL(info) << " Output data in VDIF format with "
<< vlbiHeaderSize << "bytes header info and "
<< _vdifHeader.getDataFrameLength() * 8
<< " bytes data frame length";
<< " bytes total data frame length";
BOOST_LOG_TRIVIAL(debug) << " Expecting speadheaps of size "
<< speadHeapSize << " byte";
BOOST_LOG_TRIVIAL(debug) << " Sample rate " << _sampleRate << " Hz";
std::size_t n64bit_words = _buffer_bytes / sizeof(uint64_t);
BOOST_LOG_TRIVIAL(debug) << "Allocating memory";
BOOST_LOG_TRIVIAL(debug) << "Allocating memory:";
_raw_voltage_db.resize(n64bit_words);
BOOST_LOG_TRIVIAL(debug) << " Input voltages size : "
<< _raw_voltage_db.size() << " 64-bit words,"
......@@ -56,17 +55,15 @@ VLBI<HandlerType>::VLBI(std::size_t buffer_bytes, std::size_t input_bitDepth,
<< _packed_voltage.size() << " byte";
_spillOver.reserve(vdifHeader.getDataFrameLength() * 8 - vlbiHeaderSize);
// number of vlbi frames per input block
size_t nSamplesPerInputBlock = _packed_voltage.size() * 8 / _output_bitDepth;
size_t frames_per_block = _packed_voltage.size() / (vdifHeader.getDataFrameLength() * 8 - vlbiHeaderSize);
BOOST_LOG_TRIVIAL(debug) << " this correspoonds to " << frames_per_block << " - " << frames_per_block + 1 << " frames";
// number of vlbi frames per input block
size_t nSamplesPerInputBlock = _packed_voltage.size() * 8 / _output_bitDepth;
size_t frames_per_block = _packed_voltage.size() / (vdifHeader.getDataFrameLength() * 8 - vlbiHeaderSize);
BOOST_LOG_TRIVIAL(debug) << " this correspoonds to " << frames_per_block << " - " << frames_per_block + 1 << " frames.";
_outputBuffer.resize((frames_per_block+1) * vdifHeader.getDataFrameLength() * 8 );
// potetnitally invalidating the last frame
_outputBuffer.resize((frames_per_block+1) * vdifHeader.getDataFrameLength() * 8 );
// potetnitally invalidating the last frame
BOOST_LOG_TRIVIAL(info) << " Output data in VDIF format with " << _outputBuffer.size() << " bytes per buffer";
CUDA_ERROR_CHECK(cudaStreamCreate(&_h2d_stream));
CUDA_ERROR_CHECK(cudaStreamCreate(&_proc_stream));
CUDA_ERROR_CHECK(cudaStreamCreate(&_d2h_stream));
......@@ -145,7 +142,7 @@ bool VLBI<HandlerType>::operator()(RawBytes &block) {
CUDA_ERROR_CHECK(cudaStreamSynchronize(_h2d_stream));
_raw_voltage_db.swap();
BOOST_LOG_TRIVIAL(debug) << " block.used_bytes() = " << block.used_bytes()
BOOST_LOG_TRIVIAL(debug) << " - block.used_bytes() = " << block.used_bytes()
<< ", dataBlockBytes = " << _buffer_bytes << "\n";
CUDA_ERROR_CHECK(cudaMemcpyAsync(static_cast<void *>(_raw_voltage_db.a_ptr()),
......@@ -159,7 +156,7 @@ bool VLBI<HandlerType>::operator()(RawBytes &block) {
// Process data
_packed_voltage.swap();
BOOST_LOG_TRIVIAL(debug) << "Unpacking raw voltages";
BOOST_LOG_TRIVIAL(debug) << " - Unpacking raw voltages";
switch (_input_bitDepth) {
case 8:
_unpacker->unpack<8>(_raw_voltage_db.b(), _unpacked_voltage);
......@@ -172,7 +169,7 @@ bool VLBI<HandlerType>::operator()(RawBytes &block) {
}
BOOST_LOG_TRIVIAL(debug) << "Calculate baseline";
BOOST_LOG_TRIVIAL(debug) << " - Calculate baseline";
psrdada_cpp::effelsberg::edd::
array_sum<<<64, array_sum_Nthreads, 0, _proc_stream>>>(
thrust::raw_pointer_cast(_unpacked_voltage.data()),
......@@ -183,7 +180,7 @@ bool VLBI<HandlerType>::operator()(RawBytes &block) {
thrust::raw_pointer_cast(_baseLineN.data()), _baseLineN.size(),
thrust::raw_pointer_cast(_baseLineN.data()));
BOOST_LOG_TRIVIAL(debug) << "Calculate std-dev";
BOOST_LOG_TRIVIAL(debug) << " - Calculate std-dev";
psrdada_cpp::effelsberg::edd::
scaled_square_residual_sum<<<64, array_sum_Nthreads, 0, _proc_stream>>>(
thrust::raw_pointer_cast(_unpacked_voltage.data()),
......@@ -196,13 +193,13 @@ bool VLBI<HandlerType>::operator()(RawBytes &block) {
// non linear packing
BOOST_LOG_TRIVIAL(debug) << "Packing data with non linear 2-bit packaging "
BOOST_LOG_TRIVIAL(debug) << " - Packing data with non linear 2-bit packaging "
"using levels -v*sigma, 0, v*sigma with v = "
<< _digitizer_threshold;
_packed_voltage.b().resize(_unpacked_voltage.size() * 2 / 8);
BOOST_LOG_TRIVIAL(debug) << "Input size: " << _unpacked_voltage.size()
BOOST_LOG_TRIVIAL(debug) << " - Input size: " << _unpacked_voltage.size()
<< " elements";
BOOST_LOG_TRIVIAL(debug) << "Resizing output buffer to "
BOOST_LOG_TRIVIAL(debug) << " - Resizing output buffer to "
<< _packed_voltage.b().size() << " byte";
pack2bit_nonLinear<<<128, 1024, 0, _proc_stream>>>(
......@@ -213,7 +210,7 @@ bool VLBI<HandlerType>::operator()(RawBytes &block) {
thrust::raw_pointer_cast(_baseLineN.data()));
CUDA_ERROR_CHECK(cudaStreamSynchronize(_proc_stream));
BOOST_LOG_TRIVIAL(trace) << " Standard Deviation squared: " << _stdDevN[0]
BOOST_LOG_TRIVIAL(trace) << " - Standard Deviation squared: " << _stdDevN[0]
<< " "
<< "Mean Value: "
<< _baseLineN[0] / _unpacked_voltage.size();
......@@ -224,58 +221,49 @@ bool VLBI<HandlerType>::operator()(RawBytes &block) {
_outputBuffer.swap();
////////////////////////////////////////////////////////////////////////
BOOST_LOG_TRIVIAL(debug) << "Copy Data back to host";
BOOST_LOG_TRIVIAL(debug) << " - Copy Data back to host";
CUDA_ERROR_CHECK(cudaStreamSynchronize(_d2h_stream));
const size_t outputBlockSize = _vdifHeader.getDataFrameLength() * 8 - vlbiHeaderSize;
const size_t totalSizeOfData = _packed_voltage.size() + _spillOver.size(); // current array + remaining of previous
size_t numberOfBlocksInOutput = totalSizeOfData / outputBlockSize;
size_t remainingBytes = outputBlockSize - _spillOver.size();
BOOST_LOG_TRIVIAL(debug) << " Number of blocks in output "
BOOST_LOG_TRIVIAL(debug) << " - Number of blocks in output "
<< numberOfBlocksInOutput;
//_outputBuffer.a().resize(numberOfBlocksInOutput *
// (outputBlockSize + vlbiHeaderSize));
BOOST_LOG_TRIVIAL(debug) << " Copying " << _spillOver.size()
// First copy spill-over from last block for first frame, leaving room for
// header of course
BOOST_LOG_TRIVIAL(debug) << " - Copying " << _spillOver.size()
<< " bytes spill over";
// leave room for header and fill first block of output with spill over
std::copy(_spillOver.begin(), _spillOver.end(),
_outputBuffer.a().begin() + vlbiHeaderSize);
BOOST_LOG_TRIVIAL(debug) << " Copying remaining " << remainingBytes
<< " bytes for first block";
// cuda memcopy remainder of first block
BOOST_LOG_TRIVIAL(debug) << " - Copying remaining " << remainingBytes
<< " bytes for first block";
CUDA_ERROR_CHECK(cudaMemcpyAsync(static_cast<void *>(_outputBuffer.a_ptr() + vlbiHeaderSize + _spillOver.size()),
static_cast<void *>(_packed_voltage.a_ptr()),
remainingBytes, cudaMemcpyDeviceToHost,
_d2h_stream));
// cuda memcopy rest with pitch, leaving room for the header
const size_t dpitch = outputBlockSize + vlbiHeaderSize;
const size_t spitch = outputBlockSize;
const size_t width = outputBlockSize;
size_t height = numberOfBlocksInOutput-1;
BOOST_LOG_TRIVIAL(debug) << " Copying " << height
<< " blocks a " << outputBlockSize << " bytes";
// we now have a full first block, pitch copy rest leaving room for the header
BOOST_LOG_TRIVIAL(debug) << " - Copying " << height
<< " blocks of " << outputBlockSize << " bytes";
CUDA_ERROR_CHECK(cudaMemcpy2DAsync(
(void *)(_outputBuffer.a_ptr() + outputBlockSize + 2 * vlbiHeaderSize),
dpitch, (void *)thrust::raw_pointer_cast(_packed_voltage.a_ptr() +
remainingBytes),
spitch, width, height, cudaMemcpyDeviceToHost, _d2h_stream));
// new spill over
// copy remaing part of device to spill over
_spillOver.resize(totalSizeOfData - numberOfBlocksInOutput * outputBlockSize);
size_t offset = (numberOfBlocksInOutput-1) * outputBlockSize + remainingBytes;
BOOST_LOG_TRIVIAL(debug) << " New spill over size " << _spillOver.size()
BOOST_LOG_TRIVIAL(debug) << " - New spill over size " << _spillOver.size()
<< " bytes with offset " << offset;
CUDA_ERROR_CHECK(cudaMemcpyAsync(
static_cast<void *>(thrust::raw_pointer_cast(_spillOver.data())),
static_cast<void *>(_packed_voltage.a_ptr() + offset),
......@@ -285,8 +273,8 @@ bool VLBI<HandlerType>::operator()(RawBytes &block) {
const uint32_t samplesPerDataFrame = outputBlockSize * 8 / _output_bitDepth;
const uint32_t dataFramesPerSecond = _sampleRate / samplesPerDataFrame;
BOOST_LOG_TRIVIAL(debug) << " Samples per data frame: " << samplesPerDataFrame;
BOOST_LOG_TRIVIAL(debug) << " Dataframes per second: " << dataFramesPerSecond;
BOOST_LOG_TRIVIAL(debug) << " Samples per data frame: " << samplesPerDataFrame;
BOOST_LOG_TRIVIAL(debug) << " Dataframes per second: " << dataFramesPerSecond;
for (uint32_t ib = 0; ib < _outputBuffer.a().size(); ib += _vdifHeader.getDataFrameLength() * 8)
{
......@@ -294,32 +282,32 @@ bool VLBI<HandlerType>::operator()(RawBytes &block) {
std::copy(reinterpret_cast<uint8_t *>(_vdifHeader.getData()),
reinterpret_cast<uint8_t *>(_vdifHeader.getData()) + vlbiHeaderSize,
_outputBuffer.a().begin() + ib);
size_t i = ib / _vdifHeader.getDataFrameLength() / 8;
// invalidate rest of data so it can be dropped later.
// Needed so that the outpuitbuffer can have always the same size
if (i < numberOfBlocksInOutput)
{
_vdifHeader.setValid();
}
else
{
_vdifHeader.setInvalid();
continue;
}
size_t i = ib / _vdifHeader.getDataFrameLength() / 8;
// update header
uint32_t dataFrame = _vdifHeader.getDataFrameNumber();
if (i < 5)
BOOST_LOG_TRIVIAL(debug) << i << " Dataframe Number: " << dataFrame;
if (dataFrame < dataFramesPerSecond)
BOOST_LOG_TRIVIAL(debug) << i << " Dataframe Number: " << _vdifHeader.getDataFrameNumber();
// invalidate rest of data so it can be dropped later.
// Needed so that the outpuitbuffer can have always the same size
if (i < numberOfBlocksInOutput)
{
_vdifHeader.setDataFrameNumber(dataFrame + 1);
_vdifHeader.setValid();
}
else
{
_vdifHeader.setInvalid();
continue;
}
// increase dataframenumber
_vdifHeader.setDataFrameNumber(_vdifHeader.getDataFrameNumber() + 1);
// update header
if (_vdifHeader.getDataFrameNumber() >= dataFramesPerSecond)
{
_vdifHeader.setDataFrameNumber(0);
_vdifHeader.setSecondsFromReferenceEpoch(_vdifHeader.getSecondsFromReferenceEpoch() + 1);
BOOST_LOG_TRIVIAL(debug) << i << " Beginning new second after epoch: " << _vdifHeader.getSecondsFromReferenceEpoch();
}
}
......
......@@ -15,12 +15,13 @@ namespace edd {
VDIFHeaderView::VDIFHeaderView(const uint32_t* data) : data(data) {};
void VDIFHeaderView::setDataLocation(const uint32_t* _data) {
data = _data;
};
const uint32_t* VDIFHeaderView::getDataLocation() const {
return data;
return data;
};
bool VDIFHeaderView::isValid() const {
......@@ -97,18 +98,18 @@ VDIFHeader::VDIFHeader() : VDIFHeaderView(data)
VDIFHeader::VDIFHeader(const VDIFHeader &v): VDIFHeaderView(data)
{
for (int i = 0; i < 8; i++) {
for (int i = 0; i < 8; i++) {
data[i] = v.getDataLocation()[i];
}
setDataLocation(data);
setDataLocation(data);
}
VDIFHeader& VDIFHeader::operator=(const VDIFHeader& other)
{
for (int i = 0; i < 8; i++) {
for (int i = 0; i < 8; i++) {
data[i] = other.getDataLocation()[i];
}
return *this;
return *this;
}
uint32_t *VDIFHeader::getData() { return data; }
......
......@@ -34,6 +34,9 @@ class VDIF_Sender
int port;
double max_rate;
uint32_t currentSecondFromReferenceEpoch;
size_t noOfSendFrames; // frames in last second
boost::asio::ip::udp::socket socket;
boost::asio::ip::udp::endpoint remote_endpoint;
......@@ -48,7 +51,7 @@ class VDIF_Sender
* communication.
*/
VDIF_Sender(const std::string &source_ip, const std::string &destination_ip, int port, double max_rate, boost::asio::io_service&
io_service): socket(io_service), source_ip(source_ip), destination_ip(destination_ip), port(port), max_rate(max_rate)
io_service): socket(io_service), source_ip(source_ip), destination_ip(destination_ip), port(port), max_rate(max_rate), currentSecondFromReferenceEpoch(0), noOfSendFrames(0)
{
}
......@@ -97,18 +100,31 @@ class VDIF_Sender
BOOST_LOG_TRIVIAL(debug) << " Length of first frame: " << vdifHeader.getDataFrameLength() * 8 << " bytes";
size_t counter = 0;
size_t invalidFrames = 0;
auto start = std::chrono::high_resolution_clock::now();
for(char* frame_start = block.ptr(); frame_start < block.ptr() + blockSize; frame_start += vdifHeader.getDataFrameLength() * 8)
{
vdifHeader.setDataLocation(reinterpret_cast<uint32_t*>(frame_start));
// skip invalid blocks
if (!vdifHeader.isValid())
continue;
// skip invalid blocks
if (!vdifHeader.isValid())
{
invalidFrames++;
continue;
}
if (vdifHeader.getSecondsFromReferenceEpoch() > currentSecondFromReferenceEpoch)
{
BOOST_LOG_TRIVIAL(info) << " New second frome reference epoch: " << vdifHeader.getSecondsFromReferenceEpoch() << ", send " << noOfSendFrames << " in previous second.";
BOOST_LOG_TRIVIAL(debug) << " Previous second from refEpoch " << currentSecondFromReferenceEpoch << " delta = " << vdifHeader.getSecondsFromReferenceEpoch() - currentSecondFromReferenceEpoch;
currentSecondFromReferenceEpoch = vdifHeader.getSecondsFromReferenceEpoch();
noOfSendFrames = 0;
}
uint32_t frameLength = vdifHeader.getDataFrameLength() * 8; // in units of 8 bytes
socket.send_to(boost::asio::buffer(frame_start, frameLength), remote_endpoint, 0, err);
noOfSendFrames++;
counter++;
size_t processed_bytes = (frame_start - block.ptr()) + frameLength;
......@@ -124,11 +140,11 @@ class VDIF_Sender
//BOOST_LOG_TRIVIAL(debug) << counter << " Set delay to " << delay.count()<< " ns. Current rate " << current_rate << ", processed_bytes: " << processed_bytes;
}
if (counter < 5)
BOOST_LOG_TRIVIAL(debug) << counter << " Send - FN: " << vdifHeader.getDataFrameNumber() << ", Sec f. E.: " << vdifHeader.getSecondsFromReferenceEpoch() << " Get TS.: " << vdifHeader.getTimestamp();
if (counter < 5)
BOOST_LOG_TRIVIAL(debug) << counter << " Send - FN: " << vdifHeader.getDataFrameNumber() << ", Sec f. E.: " << vdifHeader.getSecondsFromReferenceEpoch() << " Get TS.: " << vdifHeader.getTimestamp();
}
BOOST_LOG_TRIVIAL(info) << "Send " << counter << " frames of " << block.used_bytes() << " bytes total size.";
BOOST_LOG_TRIVIAL(debug) << "Send " << counter << " frames of " << block.used_bytes() << " bytes total size. " << invalidFrames << " invalid frames in block.";
return false;
}
};
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment