Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
MPIfR-BDG
psrdada_cpp
Commits
0c789044
Commit
0c789044
authored
Jan 27, 2021
by
Jason Wu
Browse files
fix formatting and adding nthreads option
parent
c2ecee6f
Pipeline
#92250
failed with stages
in 1 minute and 27 seconds
Changes
16
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
psrdada_cpp/effelsberg/edd/EDDPolnMerge.hpp
View file @
0c789044
...
...
@@ -39,7 +39,7 @@ public:
private:
std
::
size_t
_nsamps_per_heap
;
std
::
size_t
_npol
;
int
_nthreads
;
int
_nthreads
;
DadaWriteClient
&
_writer
;
};
...
...
psrdada_cpp/effelsberg/edd/EDDPolnMerge10to8.hpp
View file @
0c789044
...
...
@@ -12,7 +12,7 @@ namespace edd {
class
EDDPolnMerge10to8
{
public:
EDDPolnMerge10to8
(
std
::
size_t
nsamps_per_heap
,
std
::
size_t
npol
,
DadaWriteClient
&
writer
);
EDDPolnMerge10to8
(
std
::
size_t
nsamps_per_heap
,
std
::
size_t
npol
,
int
nthreads
,
DadaWriteClient
&
writer
);
~
EDDPolnMerge10to8
();
/**
...
...
@@ -38,7 +38,8 @@ public:
private:
std
::
size_t
_nsamps_per_heap
;
std
::
size_t
_npol
;
std
::
size_t
_npol
;
int
_nthreads
;
DadaWriteClient
&
_writer
;
};
...
...
psrdada_cpp/effelsberg/edd/EDDRoach_merge.hpp
View file @
0c789044
...
...
@@ -12,7 +12,7 @@ namespace edd {
class
EDDRoach_merge
{
public:
EDDRoach_merge
(
std
::
size_t
nsamps_per_heap
,
std
::
size_t
nchunck
,
DadaWriteClient
&
writer
);
EDDRoach_merge
(
std
::
size_t
nsamps_per_heap
,
std
::
size_t
nchunck
,
int
nthreads
,
DadaWriteClient
&
writer
);
~
EDDRoach_merge
();
/**
...
...
@@ -38,7 +38,8 @@ public:
private:
std
::
size_t
_nsamps_per_heap
;
std
::
size_t
_nchunck
;
std
::
size_t
_nchunck
;
int
_nthreads
,
DadaWriteClient
&
_writer
;
};
...
...
psrdada_cpp/effelsberg/edd/EDDRoach_merge_leap.hpp
View file @
0c789044
...
...
@@ -12,7 +12,7 @@ namespace edd {
class
EDDRoach_merge_leap
{
public:
EDDRoach_merge_leap
(
std
::
size_t
nsamps_per_heap
,
std
::
size_t
nchunck
,
DadaWriteClient
&
writer
);
EDDRoach_merge_leap
(
std
::
size_t
nsamps_per_heap
,
std
::
size_t
nchunck
,
int
nthreads
,
DadaWriteClient
&
writer
);
~
EDDRoach_merge_leap
();
/**
...
...
@@ -38,7 +38,8 @@ public:
private:
std
::
size_t
_nsamps_per_heap
;
std
::
size_t
_nchunck
;
std
::
size_t
_nchunck
;
int
_nthreads
;
DadaWriteClient
&
_writer
;
};
...
...
psrdada_cpp/effelsberg/edd/dada_disk_sink_leap.hpp
View file @
0c789044
...
...
@@ -6,27 +6,27 @@
#include
"psrdada_cpp/dada_write_client.hpp"
#include
<fstream>
#include
<vector>
namespace
psrdada_cpp
{
namespace
psrdada_cpp
{
namespace
effelsberg
{
namespace
edd
{
class
DiskSinkLeap
{
public:
DiskSinkLeap
(
std
::
string
prefix
,
std
::
size_
t
nchan
);
~
DiskSinkLeap
();
void
init
(
RawBytes
&
block
);
bool
operator
()(
RawBytes
&
block
);
class
DiskSinkLeap
{
public:
DiskSinkLeap
(
std
::
string
prefix
,
in
t
nchan
);
~
DiskSinkLeap
();
void
init
(
RawBytes
&
block
);
bool
operator
()(
RawBytes
&
block
);
public:
std
::
string
_prefix
;
std
::
size_t
_counter
;
std
::
size_
t
_nchan
;
char
_header
[
4096
];
char
_start_time
[
1024
];
bool
first_block
;
std
::
vector
<
char
>
_transpose
;
std
::
vector
<
std
::
ofstream
>
_output_streams
;
};
public:
std
::
string
_prefix
;
std
::
size_t
_counter
;
in
t
_nchan
;
char
_header
[
4096
];
char
_start_time
[
1024
];
bool
first_block
;
std
::
vector
<
char
>
_transpose
;
std
::
vector
<
std
::
ofstream
>
_output_streams
;
};
}
// edd
}
// effelsberg
}
//namespace psrdada_cpp
...
...
psrdada_cpp/effelsberg/edd/src/EDDPolnMerge.cpp
View file @
0c789044
...
...
@@ -9,116 +9,116 @@ namespace psrdada_cpp {
namespace
effelsberg
{
namespace
edd
{
uint64_t
interleave
(
uint32_t
x
,
uint32_t
y
)
{
__m128i
xvec
=
_mm_cvtsi32_si128
(
x
);
__m128i
yvec
=
_mm_cvtsi32_si128
(
y
);
__m128i
interleaved
=
_mm_unpacklo_epi8
(
yvec
,
xvec
);
return
_mm_cvtsi128_si64
(
interleaved
);
}
uint64_t
interleave
(
uint32_t
x
,
uint32_t
y
)
{
__m128i
xvec
=
_mm_cvtsi32_si128
(
x
);
__m128i
yvec
=
_mm_cvtsi32_si128
(
y
);
__m128i
interleaved
=
_mm_unpacklo_epi8
(
yvec
,
xvec
);
return
_mm_cvtsi128_si64
(
interleaved
);
}
void
merge2pol
(
char
const
*
buf
,
char
*
out
)
{
uint8_t
*
qword0
=
(
uint8_t
*
)(
buf
);
uint8_t
*
qword1
=
(
uint8_t
*
)(
buf
)
+
4096
;
uint64_t
*
D
=
reinterpret_cast
<
uint64_t
*>
(
out
);
for
(
int
i
=
0
;
i
<
4096
/
4
;
i
++
)
{
uint8_t
*
qword0
=
(
uint8_t
*
)(
buf
);
uint8_t
*
qword1
=
(
uint8_t
*
)(
buf
)
+
4096
;
uint64_t
*
D
=
reinterpret_cast
<
uint64_t
*>
(
out
);
for
(
int
i
=
0
;
i
<
4096
/
4
;
i
++
)
{
uint32_t
*
S0
=
reinterpret_cast
<
uint32_t
*>
(
qword0
);
uint32_t
*
S1
=
reinterpret_cast
<
uint32_t
*>
(
qword1
);
uint32_t
*
S0
=
reinterpret_cast
<
uint32_t
*>
(
qword0
);
uint32_t
*
S1
=
reinterpret_cast
<
uint32_t
*>
(
qword1
);
*
D
++
=
interleave
(
*
S1
++
,
*
S0
++
);
qword0
+=
4
;
qword1
+=
4
;
}
}
qword0
+=
4
;
qword1
+=
4
;
}
}
EDDPolnMerge
::
EDDPolnMerge
(
std
::
size_t
nsamps_per_heap
,
std
::
size_t
npol
,
int
nthreads
,
DadaWriteClient
&
writer
)
EDDPolnMerge
::
EDDPolnMerge
(
std
::
size_t
nsamps_per_heap
,
std
::
size_t
npol
,
int
nthreads
,
DadaWriteClient
&
writer
)
:
_nsamps_per_heap
(
nsamps_per_heap
)
,
_npol
(
npol
)
,
_nthreads
(
nthreads
)
,
_nthreads
(
nthreads
)
,
_writer
(
writer
)
{
}
{
}
EDDPolnMerge
::~
EDDPolnMerge
()
{
}
EDDPolnMerge
::~
EDDPolnMerge
()
{
}
void
EDDPolnMerge
::
init
(
RawBytes
&
block
)
void
EDDPolnMerge
::
init
(
RawBytes
&
block
)
{
RawBytes
&
oblock
=
_writer
.
header_stream
().
next
();
if
(
block
.
used_bytes
()
>
oblock
.
total_bytes
())
{
RawBytes
&
oblock
=
_writer
.
header_stream
().
next
();
if
(
block
.
used_bytes
()
>
oblock
.
total_bytes
())
{
_writer
.
header_stream
().
release
();
throw
std
::
runtime_error
(
"Output DADA buffer does not have enough space for header"
);
}
std
::
memcpy
(
oblock
.
ptr
(),
block
.
ptr
(),
block
.
used_bytes
());
char
buffer
[
1024
];
ascii_header_get
(
block
.
ptr
(),
"SAMPLE_CLOCK_START"
,
"%s"
,
buffer
);
std
::
size_t
sample_clock_start
=
std
::
strtoul
(
buffer
,
NULL
,
0
);
ascii_header_get
(
block
.
ptr
(),
"CLOCK_SAMPLE"
,
"%s"
,
buffer
);
long
double
sample_clock
=
std
::
strtold
(
buffer
,
NULL
);
ascii_header_get
(
block
.
ptr
(),
"SYNC_TIME"
,
"%s"
,
buffer
);
long
double
sync_time
=
std
::
strtold
(
buffer
,
NULL
);
BOOST_LOG_TRIVIAL
(
debug
)
<<
"this is sample_clock_start "
<<
sample_clock_start
;
BOOST_LOG_TRIVIAL
(
debug
)
<<
"this is sample_clock "
<<
sample_clock
;
BOOST_LOG_TRIVIAL
(
debug
)
<<
"this is sync_time "
<<
sync_time
;
BOOST_LOG_TRIVIAL
(
debug
)
<<
"this is sample_clock_start / sample_clock "
<<
sample_clock_start
/
sample_clock
;
long
double
unix_time
=
sync_time
+
(
sample_clock_start
/
sample_clock
);
long
double
mjd_time
=
unix_time
/
86400
-
40587.5
;
char
time_buffer
[
80
];
std
::
time_t
unix_time_int
;
struct
std
::
tm
*
timeinfo
;
double
fractpart
,
intpart
;
fractpart
=
std
::
modf
(
static_cast
<
double
>
(
unix_time
)
,
&
intpart
);
unix_time_int
=
static_cast
<
std
::
time_t
>
(
intpart
);
timeinfo
=
std
::
gmtime
(
&
unix_time_int
);
std
::
strftime
(
time_buffer
,
80
,
"%Y-%m-%d-%H:%M:%S"
,
timeinfo
);
std
::
stringstream
utc_time_stamp
;
BOOST_LOG_TRIVIAL
(
debug
)
<<
"unix_time"
<<
unix_time
;
BOOST_LOG_TRIVIAL
(
debug
)
<<
"fractional part "
<<
fractpart
;
//BOOST_LOG_TRIVIAL(debug) << "fractional part ." << static_cast<std::size_t>(fractpart*10000000000);
//utc_time_stamp<< time_buffer << "." <<fractpart;
utc_time_stamp
<<
time_buffer
<<
"."
<<
std
::
setw
(
10
)
<<
std
::
setfill
(
'0'
)
<<
std
::
size_t
(
fractpart
*
10000000000
)
<<
std
::
setfill
(
' '
);
//BOOST_LOG_TRIVIAL(debug) << "fractional part" <<static_cast<std::size_t>(fractpart * 10000000000);
//utc_time_stamp<< time_buffer << "." << static_cast<std::size_t>(fractpart * 10000000000);
BOOST_LOG_TRIVIAL
(
debug
)
<<
"this is start time in utc "
<<
utc_time_stamp
.
str
().
c_str
()
<<
"
\n
"
;
// std::cout << "this is sync_time MJD "<< mjd_time<< "\n";
ascii_header_set
(
oblock
.
ptr
(),
"UTC_START"
,
"%s"
,
utc_time_stamp
.
str
().
c_str
());
ascii_header_set
(
oblock
.
ptr
(),
"UNIX_TIME"
,
"%Lf"
,
unix_time
);
oblock
.
used_bytes
(
oblock
.
total_bytes
());
_writer
.
header_stream
().
release
();
throw
std
::
runtime_error
(
"Output DADA buffer does not have enough space for header"
);
}
std
::
memcpy
(
oblock
.
ptr
(),
block
.
ptr
(),
block
.
used_bytes
());
char
buffer
[
1024
];
ascii_header_get
(
block
.
ptr
(),
"SAMPLE_CLOCK_START"
,
"%s"
,
buffer
);
std
::
size_t
sample_clock_start
=
std
::
strtoul
(
buffer
,
NULL
,
0
);
ascii_header_get
(
block
.
ptr
(),
"CLOCK_SAMPLE"
,
"%s"
,
buffer
);
long
double
sample_clock
=
std
::
strtold
(
buffer
,
NULL
);
ascii_header_get
(
block
.
ptr
(),
"SYNC_TIME"
,
"%s"
,
buffer
);
long
double
sync_time
=
std
::
strtold
(
buffer
,
NULL
);
BOOST_LOG_TRIVIAL
(
debug
)
<<
"this is sample_clock_start "
<<
sample_clock_start
;
BOOST_LOG_TRIVIAL
(
debug
)
<<
"this is sample_clock "
<<
sample_clock
;
BOOST_LOG_TRIVIAL
(
debug
)
<<
"this is sync_time "
<<
sync_time
;
BOOST_LOG_TRIVIAL
(
debug
)
<<
"this is sample_clock_start / sample_clock "
<<
sample_clock_start
/
sample_clock
;
long
double
unix_time
=
sync_time
+
(
sample_clock_start
/
sample_clock
);
long
double
mjd_time
=
unix_time
/
86400
-
40587.5
;
char
time_buffer
[
80
];
std
::
time_t
unix_time_int
;
struct
std
::
tm
*
timeinfo
;
double
fractpart
,
intpart
;
fractpart
=
std
::
modf
(
static_cast
<
double
>
(
unix_time
)
,
&
intpart
);
unix_time_int
=
static_cast
<
std
::
time_t
>
(
intpart
);
timeinfo
=
std
::
gmtime
(
&
unix_time_int
);
std
::
strftime
(
time_buffer
,
80
,
"%Y-%m-%d-%H:%M:%S"
,
timeinfo
);
std
::
stringstream
utc_time_stamp
;
BOOST_LOG_TRIVIAL
(
debug
)
<<
"unix_time"
<<
unix_time
;
BOOST_LOG_TRIVIAL
(
debug
)
<<
"fractional part "
<<
fractpart
;
//BOOST_LOG_TRIVIAL(debug) << "fractional part ." << static_cast<std::size_t>(fractpart*10000000000);
//utc_time_stamp<< time_buffer << "." <<fractpart;
utc_time_stamp
<<
time_buffer
<<
"."
<<
std
::
setw
(
10
)
<<
std
::
setfill
(
'0'
)
<<
std
::
size_t
(
fractpart
*
10000000000
)
<<
std
::
setfill
(
' '
);
//BOOST_LOG_TRIVIAL(debug) << "fractional part" <<static_cast<std::size_t>(fractpart * 10000000000);
//utc_time_stamp<< time_buffer << "." << static_cast<std::size_t>(fractpart * 10000000000);
BOOST_LOG_TRIVIAL
(
debug
)
<<
"this is start time in utc "
<<
utc_time_stamp
.
str
().
c_str
()
<<
"
\n
"
;
// std::cout << "this is sync_time MJD "<< mjd_time<< "\n";
ascii_header_set
(
oblock
.
ptr
(),
"UTC_START"
,
"%s"
,
utc_time_stamp
.
str
().
c_str
());
ascii_header_set
(
oblock
.
ptr
(),
"UNIX_TIME"
,
"%Lf"
,
unix_time
);
oblock
.
used_bytes
(
oblock
.
total_bytes
());
_writer
.
header_stream
().
release
();
}
bool
EDDPolnMerge
::
operator
()(
RawBytes
&
block
)
{
std:
size_t
nheap_groups
=
block
.
used_bytes
()
/
_npol
/
_nsamps_per_heap
;
/**
if (block.used_bytes() < block.total_bytes())
{
BOOST_LOG_TRIVIAL (debug) << "Reach end of data";
_writer.data_stream().next();
_writer.data_stream().release();
return true;
}
**/
RawBytes
&
oblock
=
_writer
.
data_stream
().
next
();
bool
EDDPolnMerge
::
operator
()(
RawBytes
&
block
)
{
std:
size_t
nheap_groups
=
block
.
used_bytes
()
/
_npol
/
_nsamps_per_heap
;
/**
if (block.used_bytes() < block.total_bytes())
{
BOOST_LOG_TRIVIAL (debug) << "Reach end of data";
_writer.data_stream().next();
_writer.data_stream().release();
return true;
}
**/
RawBytes
&
oblock
=
_writer
.
data_stream
().
next
();
if
(
block
.
used_bytes
()
>
oblock
.
total_bytes
())
{
_writer
.
data_stream
().
release
();
throw
std
::
runtime_error
(
"Output DADA buffer does not match with the input dada buffer"
);
}
if
(
block
.
used_bytes
()
>
oblock
.
total_bytes
())
{
_writer
.
data_stream
().
release
();
throw
std
::
runtime_error
(
"Output DADA buffer does not match with the input dada buffer"
);
}
#pragma omp parallel for schedule(dynamic, _nthreads) num_threads(_nthreads)
for
(
std
::
size_t
kk
=
0
;
kk
<
block
.
used_bytes
()
/
_nsamps_per_heap
/
_npol
;
++
kk
)
{
#pragma omp parallel for schedule(dynamic, _nthreads) num_threads(_nthreads)
for
(
std
::
size_t
kk
=
0
;
kk
<
block
.
used_bytes
()
/
_nsamps_per_heap
/
_npol
;
++
kk
)
{
char
*
buffer
=
block
.
ptr
()
+
_nsamps_per_heap
*
_npol
*
kk
;
merge2pol
(
buffer
,
oblock
.
ptr
()
+
kk
*
_npol
*
_nsamps_per_heap
);
}
oblock
.
used_bytes
(
block
.
used_bytes
());
_writer
.
data_stream
().
release
();
return
false
;
}
oblock
.
used_bytes
(
block
.
used_bytes
());
_writer
.
data_stream
().
release
();
return
false
;
}
}
//edd
}
//effelsberg
}
//psrdada_cpp
...
...
psrdada_cpp/effelsberg/edd/src/EDDPolnMerge10to8.cpp
View file @
0c789044
...
...
@@ -13,12 +13,12 @@ uint64_t interleave(uint32_t x, uint32_t y) {
__m128i
xvec
=
_mm_cvtsi32_si128
(
x
);
__m128i
yvec
=
_mm_cvtsi32_si128
(
y
);
__m128i
interleaved
=
_mm_unpacklo_epi8
(
yvec
,
xvec
);
return
_mm_cvtsi128_si64
(
interleaved
);
}
return
_mm_cvtsi128_si64
(
interleaved
);
}
uint64_t
*
unpack5
(
uint64_t
*
qword
,
uint8_t
*
out
)
{
uint64_t
val
,
rest
;
uint64_t
val
,
rest
;
val
=
be64toh
(
*
qword
);
//printf("0x%016lX\n",val);
qword
++
;
...
...
@@ -37,7 +37,7 @@ uint64_t *unpack5(uint64_t *qword, uint8_t *out)
out
[
7
]
=
((
int64_t
)((
0x03FF000000000000
&
val
)
<<
6
)
>>
54
)
&
0xFF
;
out
[
8
]
=
((
int64_t
)((
0x0000FFC000000000
&
val
)
<<
16
)
>>
54
)
&
0xFF
;
out
[
9
]
=
((
int64_t
)((
0x0000003FF0000000
&
val
)
<<
26
)
>>
54
)
&
0xFF
;
out
[
10
]
=
((
int64_t
)((
0x000000000FFC0000
&
val
)
<<
36
)
>>
54
)
&
0xFF
;
out
[
10
]
=
((
int64_t
)((
0x000000000FFC0000
&
val
)
<<
36
)
>>
54
)
&
0xFF
;
out
[
11
]
=
((
int64_t
)((
0x000000000003FF00
&
val
)
<<
46
)
>>
54
)
&
0xFF
;
rest
=
(
0x00000000000000FF
&
val
)
<<
56
;
// 8 bits rest.
// 3rd:
...
...
@@ -76,90 +76,91 @@ uint64_t *unpack5(uint64_t *qword, uint8_t *out)
out
[
31
]
=
((
int64_t
)((
0x00000000000003FF
&
val
)
<<
54
)
>>
54
)
&
0xFF
;
rest
=
0
;
// No rest.
return
qword
;
}
}
void
handle_packet_numbers_4096x10_s
(
char
const
*
buf
,
char
*
out
)
{
// Print 4096 numbers of 10 bit signed integers.
{
// Print 4096 numbers of 10 bit signed integers.
uint64_t
val
,
rest
;
uint8_t
S0_8bit
[
32
];
uint8_t
S1_8bit
[
32
];
uint64_t
*
qword0
=
(
uint64_t
*
)(
buf
);
uint64_t
*
qword1
=
(
uint64_t
*
)(
buf
)
+
640
;
uint64_t
*
D
=
reinterpret_cast
<
uint64_t
*>
(
out
);
for
(
int
i
=
0
;
i
<
640
/
5
;
i
++
)
uint64_t
val
,
rest
;
uint8_t
S0_8bit
[
32
];
uint8_t
S1_8bit
[
32
];
uint64_t
*
qword0
=
(
uint64_t
*
)(
buf
);
uint64_t
*
qword1
=
(
uint64_t
*
)(
buf
)
+
640
;
uint64_t
*
D
=
reinterpret_cast
<
uint64_t
*>
(
out
);
for
(
int
i
=
0
;
i
<
640
/
5
;
i
++
)
{
qword0
=
unpack5
(
qword0
,
S0_8bit
);
qword1
=
unpack5
(
qword1
,
S1_8bit
);
uint32_t
*
S0
=
reinterpret_cast
<
uint32_t
*>
(
S0_8bit
);
uint32_t
*
S1
=
reinterpret_cast
<
uint32_t
*>
(
S1_8bit
);
qword0
=
unpack5
(
qword0
,
S0_8bit
);
qword1
=
unpack5
(
qword1
,
S1_8bit
);
uint32_t
*
S0
=
reinterpret_cast
<
uint32_t
*>
(
S0_8bit
);
uint32_t
*
S1
=
reinterpret_cast
<
uint32_t
*>
(
S1_8bit
);
for
(
std
::
size_t
ii
=
0
;
ii
<
8
;
++
ii
)
{
{
*
D
++
=
interleave
(
*
S1
++
,
*
S0
++
);
}
}
}
EDDPolnMerge10to8
::
EDDPolnMerge10to8
(
std
::
size_t
nsamps_per_heap
,
std
::
size_t
npol
,
DadaWriteClient
&
writer
)
}
}
}
EDDPolnMerge10to8
::
EDDPolnMerge10to8
(
std
::
size_t
nsamps_per_heap
,
std
::
size_t
npol
,
int
nthreads
,
DadaWriteClient
&
writer
)
:
_nsamps_per_heap
(
nsamps_per_heap
)
,
_npol
(
npol
)
,
_nthreads
(
nthreads
)
,
_writer
(
writer
)
{
}
{
}
EDDPolnMerge10to8
::~
EDDPolnMerge10to8
()
{
}
EDDPolnMerge10to8
::~
EDDPolnMerge10to8
()
{
}
void
EDDPolnMerge10to8
::
init
(
RawBytes
&
block
)
{
RawBytes
&
oblock
=
_writer
.
header_stream
().
next
();
if
(
block
.
used_bytes
()
>
oblock
.
total_bytes
())
{
_writer
.
header_stream
().
release
();
throw
std
::
runtime_error
(
"Output DADA buffer does not have enough space for header"
);
}
std
::
memcpy
(
oblock
.
ptr
(),
block
.
ptr
(),
block
.
used_bytes
());
char
buffer
[
1024
];
ascii_header_get
(
block
.
ptr
(),
"SAMPLE_CLOCK_START"
,
"%s"
,
buffer
);
std
::
size_t
sample_clock_start
=
std
::
strtoul
(
buffer
,
NULL
,
0
);
ascii_header_get
(
block
.
ptr
(),
"CLOCK_SAMPLE"
,
"%s"
,
buffer
);
long
double
sample_clock
=
std
::
strtold
(
buffer
,
NULL
);
ascii_header_get
(
block
.
ptr
(),
"SYNC_TIME"
,
"%s"
,
buffer
);
long
double
sync_time
=
std
::
strtold
(
buffer
,
NULL
);
BOOST_LOG_TRIVIAL
(
debug
)
<<
"this is sample_clock_start "
<<
sample_clock_start
;
BOOST_LOG_TRIVIAL
(
debug
)
<<
"this is sample_clock "
<<
sample_clock
;
BOOST_LOG_TRIVIAL
(
debug
)
<<
"this is sync_time "
<<
sync_time
;
BOOST_LOG_TRIVIAL
(
debug
)
<<
"this is sample_clock_start / sample_clock "
<<
sample_clock_start
/
sample_clock
;
long
double
unix_time
=
sync_time
+
(
sample_clock_start
/
sample_clock
);
long
double
mjd_time
=
unix_time
/
86400
-
40587.5
;
char
time_buffer
[
80
];
std
::
time_t
unix_time_int
;
struct
std
::
tm
*
timeinfo
;
double
fractpart
,
intpart
;
fractpart
=
std
::
modf
(
static_cast
<
double
>
(
unix_time
)
,
&
intpart
);
unix_time_int
=
static_cast
<
std
::
time_t
>
(
intpart
);
timeinfo
=
std
::
gmtime
(
&
unix_time_int
);
std
::
strftime
(
time_buffer
,
80
,
"%Y-%m-%d-%H:%M:%S"
,
timeinfo
);
std
::
stringstream
utc_time_stamp
;
BOOST_LOG_TRIVIAL
(
debug
)
<<
"unix_time"
<<
unix_time
;
BOOST_LOG_TRIVIAL
(
debug
)
<<
"fractional part "
<<
fractpart
;
utc_time_stamp
<<
time_buffer
<<
"."
<<
std
::
setw
(
10
)
<<
std
::
setfill
(
'0'
)
<<
std
::
size_t
(
fractpart
*
10000000000
)
<<
std
::
setfill
(
' '
);
BOOST_LOG_TRIVIAL
(
debug
)
<<
"this is start time in utc "
<<
utc_time_stamp
.
str
().
c_str
()
<<
"
\n
"
;
// std::cout << "this is sync_time MJD "<< mjd_time<< "\n";
ascii_header_set
(
oblock
.
ptr
(),
"UTC_START"
,
"%s"
,
utc_time_stamp
.
str
().
c_str
());
ascii_header_set
(
oblock
.
ptr
(),
"UNIX_TIME"
,
"%Lf"
,
unix_time
);
oblock
.
used_bytes
(
oblock
.
total_bytes
());
void
EDDPolnMerge10to8
::
init
(
RawBytes
&
block
)
{
RawBytes
&
oblock
=
_writer
.
header_stream
().
next
();
if
(
block
.
used_bytes
()
>
oblock
.
total_bytes
())
{
_writer
.
header_stream
().
release
();
BOOST_LOG_TRIVIAL
(
info
)
<<
"Output header released"
<<
"
\n
"
;
}
throw
std
::
runtime_error
(
"Output DADA buffer does not have enough space for header"
);
}
std
::
memcpy
(
oblock
.
ptr
(),
block
.
ptr
(),
block
.
used_bytes
());
char
buffer
[
1024
];
ascii_header_get
(
block
.
ptr
(),
"SAMPLE_CLOCK_START"
,
"%s"
,
buffer
);
std
::
size_t
sample_clock_start
=
std
::
strtoul
(
buffer
,
NULL
,
0
);
ascii_header_get
(
block
.
ptr
(),
"CLOCK_SAMPLE"
,
"%s"
,
buffer
);
long
double
sample_clock
=
std
::
strtold
(
buffer
,
NULL
);
ascii_header_get
(
block
.
ptr
(),
"SYNC_TIME"
,
"%s"
,
buffer
);
long
double
sync_time
=
std
::
strtold
(
buffer
,
NULL
);
BOOST_LOG_TRIVIAL
(
debug
)
<<
"this is sample_clock_start "
<<
sample_clock_start
;
BOOST_LOG_TRIVIAL
(
debug
)
<<
"this is sample_clock "
<<
sample_clock
;
BOOST_LOG_TRIVIAL
(
debug
)
<<
"this is sync_time "
<<
sync_time
;
BOOST_LOG_TRIVIAL
(
debug
)
<<
"this is sample_clock_start / sample_clock "
<<
sample_clock_start
/
sample_clock
;
long
double
unix_time
=
sync_time
+
(
sample_clock_start
/
sample_clock
);
long
double
mjd_time
=
unix_time
/
86400
-
40587.5
;
char
time_buffer
[
80
];
std
::
time_t
unix_time_int
;
struct
std
::
tm
*
timeinfo
;
double
fractpart
,
intpart
;
fractpart
=
std
::
modf
(
static_cast
<
double
>
(
unix_time
)
,
&
intpart
);
unix_time_int
=
static_cast
<
std
::
time_t
>
(
intpart
);
timeinfo
=
std
::
gmtime
(
&
unix_time_int
);
std
::
strftime
(
time_buffer
,
80
,
"%Y-%m-%d-%H:%M:%S"
,
timeinfo
);
std
::
stringstream
utc_time_stamp
;
BOOST_LOG_TRIVIAL
(
debug
)
<<
"unix_time"
<<
unix_time
;
BOOST_LOG_TRIVIAL
(
debug
)
<<
"fractional part "
<<
fractpart
;
utc_time_stamp
<<
time_buffer
<<
"."
<<
std
::
setw
(
10
)
<<
std
::
setfill
(
'0'
)
<<
std
::
size_t
(
fractpart
*
10000000000
)
<<
std
::
setfill
(
' '
);
BOOST_LOG_TRIVIAL
(
debug
)
<<
"this is start time in utc "
<<
utc_time_stamp
.
str
().
c_str
()
<<
"
\n
"
;
// std::cout << "this is sync_time MJD "<< mjd_time<< "\n";
ascii_header_set
(
oblock
.
ptr
(),
"UTC_START"
,
"%s"
,
utc_time_stamp
.
str
().
c_str
());
ascii_header_set
(
oblock
.
ptr
(),
"UNIX_TIME"
,
"%Lf"
,
unix_time
);
oblock
.
used_bytes
(
oblock
.
total_bytes
());
_writer
.
header_stream
().
release
();
BOOST_LOG_TRIVIAL
(
info
)
<<
"Output header released"
<<
"
\n
"
;
}
bool
EDDPolnMerge10to8
::
operator
()(
RawBytes
&
block
)
{
std
::
cout
<<
"Beginning of the operator"
<<
std
::
endl
;
std:
size_t
nheap_groups
=
0.8
*
block
.
used_bytes
()
/
_npol
/
_nsamps_per_heap
;
RawBytes
&
oblock
=
_writer
.
data_stream
().
next
();
bool
EDDPolnMerge10to8
::
operator
()(
RawBytes
&
block
)
{
std
::
cout
<<
"Beginning of the operator"
<<
std
::
endl
;
std:
size_t
nheap_groups
=
0.8
*
block
.
used_bytes
()
/
_npol
/
_nsamps_per_heap
;
RawBytes
&
oblock
=
_writer
.
data_stream
().
next
();
// if (block.used_bytes() > oblock.total_bytes())
// {
...
...
@@ -167,21 +168,21 @@ void handle_packet_numbers_4096x10_s(char const *buf, char *out)
// throw std::runtime_error("Output DADA buffer does not match with the input dada buffer");
// }
/* convert 10 bit to 8 bit data here */
BOOST_LOG_TRIVIAL
(
debug
)
<<
"block.used_bytes() = "
<<
block
.
used_bytes
();
/* convert 10 bit to 8 bit data here */
BOOST_LOG_TRIVIAL
(
debug
)
<<
"block.used_bytes() = "
<<
block
.
used_bytes
();
BOOST_LOG_TRIVIAL
(
debug
)
<<
"Entering unpack loop"
;
#pragma omp parallel for schedule(dynamic,
4
) num_threads(
4
)
for
(
std
::
size_t
kk
=
0
;
kk
<
block
.
used_bytes
()
/
5120
/
2
;
++
kk
)
{
#pragma omp parallel for schedule(dynamic,
_nthreads
) num_threads(
_nthreads
)
for
(
std
::
size_t
kk
=
0
;
kk
<
block
.
used_bytes
()
/
5120
/
2
;
++
kk
)
{
char
*
buffer
=
block
.
ptr
()
+
5120
*
2
*
kk
;
handle_packet_numbers_4096x10_s
(
buffer
,
oblock
.
ptr
()
+
kk
*
8192
);
}
oblock
.
used_bytes
(
block
.
used_bytes
()
*
0.8
);
}
oblock
.
used_bytes
(
block
.
used_bytes
()
*
0.8
);
//oblock.used_bytes(block.used_bytes());
_writer
.
data_stream
().
release
();
return
false
;
}
return
false
;
}
}
//edd
}
//effelsberg
}
//psrdada_cpp
psrdada_cpp/effelsberg/edd/src/EDDPolnMerge10to8_cli.cpp
View file @
0c789044
...
...
@@ -11,9 +11,9 @@ using namespace psrdada_cpp;
namespace
{
const
size_t
ERROR_IN_COMMAND_LINE
=
1
;
const
size_t
SUCCESS
=
0
;
const
size_t
ERROR_UNHANDLED_EXCEPTION
=
2
;
const
size_t
ERROR_IN_COMMAND_LINE
=
1
;
const
size_t
SUCCESS
=
0
;
const
size_t
ERROR_UNHANDLED_EXCEPTION
=
2
;
}
// namespace
...
...
@@ -22,9 +22,10 @@ int main(int argc, char** argv)
try
{
key_t
input_key
;
key_t
output_key
;
std
::
size_t
npol
;
std
::
size_t
nsamps_per_heap
;
key_t
output_key
;
std
::
size_t
npol
;
std
::
size_t
nsamps_per_heap
;
int
nthreads
;
/** Define and parse the program options