EDDPolnMerge.cpp 4.83 KB
Newer Older
root's avatar
root committed
1
2
3
4
5
#include "psrdada_cpp/effelsberg/edd/EDDPolnMerge.hpp"
#include "ascii_header.h"
#include <immintrin.h>
#include <time.h>
#include <iomanip>
6
7
#include <cmath>

root's avatar
root committed
8
9
10
11
namespace psrdada_cpp {
namespace effelsberg {
namespace edd {

12
13
14
15
16
17
uint64_t interleave(uint32_t x, uint32_t y) {
    __m128i xvec = _mm_cvtsi32_si128(x);
    __m128i yvec = _mm_cvtsi32_si128(y);
    __m128i interleaved = _mm_unpacklo_epi8(yvec, xvec);
    return _mm_cvtsi128_si64(interleaved);
}
root's avatar
root committed
18

19
void merge2pol(char const *buf, char *out)
20
21
22
23
24
{
    uint8_t *qword0 = (uint8_t*)(buf);
    uint8_t *qword1 = (uint8_t*)(buf) + 4096;
    uint64_t* D = reinterpret_cast<uint64_t*>(out);
    for (int i = 0; i < 4096 / 4; i++)
25
    {
26
27
        uint32_t* S0 = reinterpret_cast<uint32_t*>(qword0);
        uint32_t* S1 = reinterpret_cast<uint32_t*>(qword1);
28
        *D++ = interleave(*S1++, *S0++);
29
30
31
32
        qword0 += 4;
        qword1 += 4;
    }
}
33

34
EDDPolnMerge::EDDPolnMerge(std::size_t nsamps_per_heap, std::size_t npol, int nthreads, DadaWriteClient& writer)
root's avatar
root committed
35
36
    : _nsamps_per_heap(nsamps_per_heap)
    , _npol(npol)
37
    , _nthreads(nthreads)
root's avatar
root committed
38
    , _writer(writer)
39
40
{
}
root's avatar
root committed
41

42
43
44
EDDPolnMerge::~EDDPolnMerge()
{
}
root's avatar
root committed
45

46
47
48
49
void EDDPolnMerge::init(RawBytes& block)
{
    RawBytes& oblock = _writer.header_stream().next();
    if (block.used_bytes() > oblock.total_bytes())
root's avatar
root committed
50
    {
51
        _writer.header_stream().release();
52
        throw std::runtime_error("Output DADA buffer does not have enough space for header");
53
    }
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
    std::memcpy(oblock.ptr(), block.ptr(), block.used_bytes());
    char buffer[1024];
    ascii_header_get(block.ptr(), "SAMPLE_CLOCK_START", "%s", buffer);
    std::size_t sample_clock_start = std::strtoul(buffer, NULL, 0);
    ascii_header_get(block.ptr(), "CLOCK_SAMPLE", "%s", buffer);
    long double sample_clock = std::strtold(buffer, NULL);
    ascii_header_get(block.ptr(), "SYNC_TIME", "%s", buffer);
    long double sync_time = std::strtold(buffer, NULL);
    BOOST_LOG_TRIVIAL(debug) << "this is sample_clock_start " << sample_clock_start;
    BOOST_LOG_TRIVIAL(debug) << "this is sample_clock " << sample_clock;
    BOOST_LOG_TRIVIAL(debug) << "this is sync_time " << sync_time;
    BOOST_LOG_TRIVIAL(debug) << "this is sample_clock_start / sample_clock " << sample_clock_start / sample_clock;
    long double unix_time = sync_time + (sample_clock_start / sample_clock);
    long double mjd_time = unix_time / 86400 - 40587.5;
    char time_buffer[80];
    std::time_t unix_time_int;
    struct std::tm * timeinfo;
    double fractpart, intpart;
    fractpart = std::modf (static_cast<double>(unix_time) , &intpart);
    unix_time_int = static_cast<std::time_t>(intpart);
    timeinfo = std::gmtime (&unix_time_int);
    std::strftime(time_buffer, 80, "%Y-%m-%d-%H:%M:%S", timeinfo);
    std::stringstream utc_time_stamp;
    BOOST_LOG_TRIVIAL(debug) << "unix_time" << unix_time;
    BOOST_LOG_TRIVIAL(debug) << "fractional part " << fractpart;
    //BOOST_LOG_TRIVIAL(debug) << "fractional part ." << static_cast<std::size_t>(fractpart*10000000000);
    //utc_time_stamp<< time_buffer << "." <<fractpart;
    utc_time_stamp << time_buffer << "." << std::setw(10) << std::setfill('0') << std::size_t(fractpart * 10000000000) << std::setfill(' ');
    //BOOST_LOG_TRIVIAL(debug) << "fractional part" <<static_cast<std::size_t>(fractpart * 10000000000);
    //utc_time_stamp<< time_buffer << "." << static_cast<std::size_t>(fractpart * 10000000000);
    BOOST_LOG_TRIVIAL(debug) << "this is start time in utc " << utc_time_stamp.str().c_str() << "\n";
//  std::cout << "this is sync_time MJD "<< mjd_time<< "\n";
    ascii_header_set(oblock.ptr(), "UTC_START", "%s", utc_time_stamp.str().c_str());
    ascii_header_set(oblock.ptr(), "UNIX_TIME", "%Lf", unix_time);
    oblock.used_bytes(oblock.total_bytes());
    _writer.header_stream().release();
}
root's avatar
root committed
91

92
93
94
95
96
97
98
99
100
101
102
103
104
bool EDDPolnMerge::operator()(RawBytes& block)
{
std: size_t nheap_groups = block.used_bytes() / _npol / _nsamps_per_heap;
    /**
            if (block.used_bytes() < block.total_bytes())
            {
                BOOST_LOG_TRIVIAL (debug) << "Reach end of data";
                _writer.data_stream().next();
                _writer.data_stream().release();
                return true;
            }
    **/
    RawBytes& oblock = _writer.data_stream().next();
root's avatar
root committed
105

106
107
108
109
110
    if (block.used_bytes() > oblock.total_bytes())
    {
        _writer.data_stream().release();
        throw std::runtime_error("Output DADA buffer does not match with the input dada buffer");
    }
111

112
113
114
    #pragma omp parallel for schedule(dynamic, _nthreads) num_threads(_nthreads)
    for (std::size_t kk = 0; kk < block.used_bytes() / _nsamps_per_heap / _npol ; ++kk)
    {
115
116
        char *buffer = block.ptr() + _nsamps_per_heap * _npol * kk;
        merge2pol(buffer, oblock.ptr() + kk * _npol * _nsamps_per_heap);
root's avatar
root committed
117
    }
118
119
120
121
    oblock.used_bytes(block.used_bytes());
    _writer.data_stream().release();
    return false;
}
root's avatar
root committed
122
123
124
125
}//edd
}//effelsberg
}//psrdada_cpp