Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
MPIfR-BDG
psrdada_cpp
Commits
6ced6c36
Commit
6ced6c36
authored
Jan 26, 2021
by
Jason Wu
Browse files
multithread pol merge and 8 bit unpacker update
parent
8617011d
Pipeline
#92227
failed with stages
in 1 minute and 10 seconds
Changes
3
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
psrdada_cpp/effelsberg/edd/src/EDDPolnMerge.cpp
View file @
6ced6c36
...
...
@@ -16,9 +16,25 @@ namespace edd {
return
_mm_cvtsi128_si64
(
interleaved
);
}
EDDPolnMerge
::
EDDPolnMerge
(
std
::
size_t
nsamps_per_heap
,
std
::
size_t
npol
,
DadaWriteClient
&
writer
)
void
merge2pol
(
char
const
*
buf
,
char
*
out
)
{
uint8_t
*
qword0
=
(
uint8_t
*
)(
buf
);
uint8_t
*
qword1
=
(
uint8_t
*
)(
buf
)
+
4096
;
uint64_t
*
D
=
reinterpret_cast
<
uint64_t
*>
(
out
);
for
(
int
i
=
0
;
i
<
4096
/
4
;
i
++
)
{
uint32_t
*
S0
=
reinterpret_cast
<
uint32_t
*>
(
qword0
);
uint32_t
*
S1
=
reinterpret_cast
<
uint32_t
*>
(
qword1
);
*
D
++
=
interleave
(
*
S1
++
,
*
S0
++
);
qword0
+=
4
;
qword1
+=
4
;
}
}
EDDPolnMerge
::
EDDPolnMerge
(
std
::
size_t
nsamps_per_heap
,
std
::
size_t
npol
,
std
::
size_t
nthreads
,
DadaWriteClient
&
writer
)
:
_nsamps_per_heap
(
nsamps_per_heap
)
,
_npol
(
npol
)
,
_nthreads
(
nthreads
)
,
_writer
(
writer
)
{
}
...
...
@@ -37,41 +53,54 @@ namespace edd {
}
std
::
memcpy
(
oblock
.
ptr
(),
block
.
ptr
(),
block
.
used_bytes
());
char
buffer
[
1024
];
ascii_header_get
(
block
.
ptr
(),
"SAMPLE_CLOCK_START"
,
"%s"
,
buffer
);
std
::
size_t
sample_clock_start
=
std
::
strtoul
(
buffer
,
NULL
,
0
);
ascii_header_get
(
block
.
ptr
(),
"CLOCK_SAMPLE"
,
"%s"
,
buffer
);
long
double
sample_clock
=
std
::
strtold
(
buffer
,
NULL
);
ascii_header_get
(
block
.
ptr
(),
"SYNC_TIME"
,
"%s"
,
buffer
);
long
double
sync_time
=
std
::
strtold
(
buffer
,
NULL
);
BOOST_LOG_TRIVIAL
(
debug
)
<<
"Sample_clock_start "
<<
sample_clock_start
;
BOOST_LOG_TRIVIAL
(
debug
)
<<
"Sample_clock "
<<
sample_clock
;
BOOST_LOG_TRIVIAL
(
debug
)
<<
"Sync_time "
<<
sync_time
;
BOOST_LOG_TRIVIAL
(
debug
)
<<
"Sample_clock_start / sample_clock "
<<
sample_clock_start
/
sample_clock
;
long
double
unix_time
=
sync_time
+
(
sample_clock_start
/
sample_clock
);
long
double
mjd_time
=
unix_time
/
86400
-
40587.5
;
char
time_buffer
[
80
];
std
::
time_t
unix_time_int
;
struct
std
::
tm
*
timeinfo
;
double
fractpart
,
intpart
;
fractpart
=
std
::
modf
(
static_cast
<
double
>
(
unix_time
)
,
&
intpart
);
unix_time_int
=
static_cast
<
std
::
time_t
>
(
intpart
);
timeinfo
=
std
::
gmtime
(
&
unix_time_int
);
std
::
strftime
(
time_buffer
,
80
,
"%Y-%m-%d-%H:%M:%S"
,
timeinfo
);
std
::
stringstream
utc_time_stamp
;
BOOST_LOG_TRIVIAL
(
debug
)
<<
"unix_time"
<<
unix_time
;
BOOST_LOG_TRIVIAL
(
debug
)
<<
"fractional part "
<<
fractpart
;
utc_time_stamp
<<
time_buffer
<<
"."
<<
std
::
setw
(
10
)
<<
std
::
setfill
(
'0'
)
<<
std
::
size_t
(
fractpart
*
10000000000
)
<<
std
::
setfill
(
' '
);
BOOST_LOG_TRIVIAL
(
debug
)
<<
"this is start time in utc "
<<
utc_time_stamp
.
str
().
c_str
()
<<
"
\n
"
;
// std::cout << "this is sync_time MJD "<< mjd_time<< "\n";
ascii_header_set
(
oblock
.
ptr
(),
"UTC_START"
,
"%s"
,
utc_time_stamp
.
str
().
c_str
());
ascii_header_set
(
oblock
.
ptr
(),
"UNIX_TIME"
,
"%Lf"
,
unix_time
);
oblock
.
used_bytes
(
oblock
.
total_bytes
());
_writer
.
header_stream
().
release
();
}
ascii_header_get
(
block
.
ptr
(),
"SAMPLE_CLOCK_START"
,
"%s"
,
buffer
);
std
::
size_t
sample_clock_start
=
std
::
strtoul
(
buffer
,
NULL
,
0
);
ascii_header_get
(
block
.
ptr
(),
"CLOCK_SAMPLE"
,
"%s"
,
buffer
);
long
double
sample_clock
=
std
::
strtold
(
buffer
,
NULL
);
ascii_header_get
(
block
.
ptr
(),
"SYNC_TIME"
,
"%s"
,
buffer
);
long
double
sync_time
=
std
::
strtold
(
buffer
,
NULL
);
BOOST_LOG_TRIVIAL
(
debug
)
<<
"this is sample_clock_start "
<<
sample_clock_start
;
BOOST_LOG_TRIVIAL
(
debug
)
<<
"this is sample_clock "
<<
sample_clock
;
BOOST_LOG_TRIVIAL
(
debug
)
<<
"this is sync_time "
<<
sync_time
;
BOOST_LOG_TRIVIAL
(
debug
)
<<
"this is sample_clock_start / sample_clock "
<<
sample_clock_start
/
sample_clock
;
long
double
unix_time
=
sync_time
+
(
sample_clock_start
/
sample_clock
);
long
double
mjd_time
=
unix_time
/
86400
-
40587.5
;
char
time_buffer
[
80
];
std
::
time_t
unix_time_int
;
struct
std
::
tm
*
timeinfo
;
double
fractpart
,
intpart
;
fractpart
=
std
::
modf
(
static_cast
<
double
>
(
unix_time
)
,
&
intpart
);
unix_time_int
=
static_cast
<
std
::
time_t
>
(
intpart
);
timeinfo
=
std
::
gmtime
(
&
unix_time_int
);
std
::
strftime
(
time_buffer
,
80
,
"%Y-%m-%d-%H:%M:%S"
,
timeinfo
);
std
::
stringstream
utc_time_stamp
;
BOOST_LOG_TRIVIAL
(
debug
)
<<
"unix_time"
<<
unix_time
;
BOOST_LOG_TRIVIAL
(
debug
)
<<
"fractional part "
<<
fractpart
;
//BOOST_LOG_TRIVIAL(debug) << "fractional part ." << static_cast<std::size_t>(fractpart*10000000000);
//utc_time_stamp<< time_buffer << "." <<fractpart;
utc_time_stamp
<<
time_buffer
<<
"."
<<
std
::
setw
(
10
)
<<
std
::
setfill
(
'0'
)
<<
std
::
size_t
(
fractpart
*
10000000000
)
<<
std
::
setfill
(
' '
);
//BOOST_LOG_TRIVIAL(debug) << "fractional part" <<static_cast<std::size_t>(fractpart * 10000000000);
//utc_time_stamp<< time_buffer << "." << static_cast<std::size_t>(fractpart * 10000000000);
BOOST_LOG_TRIVIAL
(
debug
)
<<
"this is start time in utc "
<<
utc_time_stamp
.
str
().
c_str
()
<<
"
\n
"
;
// std::cout << "this is sync_time MJD "<< mjd_time<< "\n";
ascii_header_set
(
oblock
.
ptr
(),
"UTC_START"
,
"%s"
,
utc_time_stamp
.
str
().
c_str
());
ascii_header_set
(
oblock
.
ptr
(),
"UNIX_TIME"
,
"%Lf"
,
unix_time
);
oblock
.
used_bytes
(
oblock
.
total_bytes
());
_writer
.
header_stream
().
release
();
}
bool
EDDPolnMerge
::
operator
()(
RawBytes
&
block
)
{
std:
size_t
nheap_groups
=
block
.
used_bytes
()
/
_npol
/
_nsamps_per_heap
;
/**
if (block.used_bytes() < block.total_bytes())
{
BOOST_LOG_TRIVIAL (debug) << "Reach end of data";
_writer.data_stream().next();
_writer.data_stream().release();
return true;
}
**/
RawBytes
&
oblock
=
_writer
.
data_stream
().
next
();
if
(
block
.
used_bytes
()
>
oblock
.
total_bytes
())
...
...
@@ -79,20 +108,13 @@ namespace edd {
_writer
.
data_stream
().
release
();
throw
std
::
runtime_error
(
"Output DADA buffer does not match with the input dada buffer"
);
}
uint32_t
*
S0
=
reinterpret_cast
<
uint32_t
*>
(
block
.
ptr
());
uint32_t
*
S1
=
reinterpret_cast
<
uint32_t
*>
(
block
.
ptr
()
+
_nsamps_per_heap
);
uint64_t
*
D
=
reinterpret_cast
<
uint64_t
*>
(
oblock
.
ptr
());
for
(
std
::
size_t
jj
=
0
;
jj
<
nheap_groups
;
++
jj
)
{
for
(
std
::
size_t
ii
=
0
;
ii
<
_nsamps_per_heap
/
sizeof
(
uint32_t
);
++
ii
)
{
*
D
++
=
interleave
(
*
S1
++
,
*
S0
++
);
}
S0
+=
_nsamps_per_heap
/
sizeof
(
uint32_t
);
S1
+=
_nsamps_per_heap
/
sizeof
(
uint32_t
);
}
#pragma omp parallel for schedule(dynamic, _nthreads) num_threads(_nthreads)
for
(
std
::
size_t
kk
=
0
;
kk
<
block
.
used_bytes
()
/
_nsamps_per_heap
/
_npol
;
++
kk
)
{
char
*
buffer
=
block
.
ptr
()
+
_nsamps_per_heap
*
_npol
*
kk
;
merge2pol
(
buffer
,
oblock
.
ptr
()
+
kk
*
_npol
*
_nsamps_per_heap
);
}
oblock
.
used_bytes
(
block
.
used_bytes
());
_writer
.
data_stream
().
release
();
return
false
;
...
...
psrdada_cpp/effelsberg/edd/src/EDDPolnMerge10to8.cpp
View file @
6ced6c36
...
...
@@ -9,82 +9,98 @@ namespace psrdada_cpp {
namespace
effelsberg
{
namespace
edd
{
uint64_t
interleave
(
uint32_t
x
,
uint32_t
y
)
{
uint64_t
interleave
(
uint32_t
x
,
uint32_t
y
)
{
__m128i
xvec
=
_mm_cvtsi32_si128
(
x
);
__m128i
yvec
=
_mm_cvtsi32_si128
(
y
);
__m128i
interleaved
=
_mm_unpacklo_epi8
(
yvec
,
xvec
);
return
_mm_cvtsi128_si64
(
interleaved
);
}
void
handle_packet_numbers_4096x10_s
(
char
const
*
buf
,
char
*
out
)
{
// Print 4096 numbers of 10 bit signed integers.
int
i
;
int16_t
adc
;
uint64_t
val
,
rest
;
uint64_t
*
qword
;
qword
=
(
uint64_t
*
)(
buf
);
for
(
i
=
0
;
i
<
640
/
5
;
i
++
)
{
// 1st:
uint64_t
*
unpack5
(
uint64_t
*
qword
,
uint8_t
*
out
)
{
uint64_t
val
,
rest
;
val
=
be64toh
(
*
qword
);
//printf("0x%016lX\n",val);
qword
++
;
out
[
i
]
=
((
int64_t
)((
0xFFC0000000000000
&
val
)
<<
0
)
>>
54
)
&
0x
3FC
;
out
[
i
+
1
]
=
((
int64_t
)((
0x003FF00000000000
&
val
)
<<
10
)
>>
54
)
&
0x
3FC
;
out
[
i
+
2
]
=
((
int64_t
)((
0x00000FFC00000000
&
val
)
<<
20
)
>>
54
)
&
0x
3FC
;
out
[
i
+
3
]
=
((
int64_t
)((
0x00000003FF000000
&
val
)
<<
30
)
>>
54
)
&
0x
3FC
;
out
[
i
+
4
]
=
((
int64_t
)((
0x0000000000FFC000
&
val
)
<<
40
)
>>
54
)
&
0x
3FC
;
out
[
i
+
5
]
=
((
int64_t
)((
0x0000000000003FF0
&
val
)
<<
50
)
>>
54
)
&
0x
3FC
;
out
[
0
]
=
((
int64_t
)((
0xFFC0000000000000
&
val
)
<<
0
)
>>
54
)
&
0x
FF
;
out
[
1
]
=
((
int64_t
)((
0x003FF00000000000
&
val
)
<<
10
)
>>
54
)
&
0x
FF
;
out
[
2
]
=
((
int64_t
)((
0x00000FFC00000000
&
val
)
<<
20
)
>>
54
)
&
0x
FF
;
out
[
3
]
=
((
int64_t
)((
0x00000003FF000000
&
val
)
<<
30
)
>>
54
)
&
0x
FF
;
out
[
4
]
=
((
int64_t
)((
0x0000000000FFC000
&
val
)
<<
40
)
>>
54
)
&
0x
FF
;
out
[
5
]
=
((
int64_t
)((
0x0000000000003FF0
&
val
)
<<
50
)
>>
54
)
&
0x
FF
;
rest
=
(
0x000000000000000F
&
val
)
<<
60
;
// 4 bits rest.
// 2nd:
val
=
be64toh
(
*
qword
);
//printf("0x%016lX\n",val);
qword
++
;
out
[
i
+
6
]
=
((
int64_t
)(((
0xFC00000000000000
&
val
)
>>
4
)
|
rest
)
>>
54
)
&
0x
3FC
;
out
[
i
+
7
]
=
((
int64_t
)((
0x03FF000000000000
&
val
)
<<
6
)
>>
54
)
&
0x
3FC
;
out
[
i
+
8
]
=
((
int64_t
)((
0x0000FFC000000000
&
val
)
<<
16
)
>>
54
)
&
0x
3FC
;
out
[
i
+
9
]
=
((
int64_t
)((
0x0000003FF0000000
&
val
)
<<
26
)
>>
54
)
&
0x
3FC
;
out
[
i
+
10
]
=
((
int64_t
)((
0x000000000FFC0000
&
val
)
<<
36
)
>>
54
)
&
0x
3FC
;
out
[
i
+
11
]
=
((
int64_t
)((
0x000000000003FF00
&
val
)
<<
46
)
>>
54
)
&
0x
3FC
;
rest
=
(
0x00000000000000FF
&
val
)
<<
56
;
// 8 bits rest.
out
[
6
]
=
((
int64_t
)(((
0xFC00000000000000
&
val
)
>>
4
)
|
rest
)
>>
54
)
&
0x
FF
;
out
[
7
]
=
((
int64_t
)((
0x03FF000000000000
&
val
)
<<
6
)
>>
54
)
&
0x
FF
;
out
[
8
]
=
((
int64_t
)((
0x0000FFC000000000
&
val
)
<<
16
)
>>
54
)
&
0x
FF
;
out
[
9
]
=
((
int64_t
)((
0x0000003FF0000000
&
val
)
<<
26
)
>>
54
)
&
0x
FF
;
out
[
10
]
=
((
int64_t
)((
0x000000000FFC0000
&
val
)
<<
36
)
>>
54
)
&
0x
FF
;
out
[
11
]
=
((
int64_t
)((
0x000000000003FF00
&
val
)
<<
46
)
>>
54
)
&
0x
FF
;
rest
=
(
0x00000000000000FF
&
val
)
<<
56
;
// 8 bits rest.
// 3rd:
val
=
be64toh
(
*
qword
);
//printf("0x%016lX\n",val);
qword
++
;
out
[
i
+
12
]
=
((
int64_t
)(((
0xC000000000000000
&
val
)
>>
8
)
|
rest
)
>>
54
)
&
0x
3FC
;
out
[
i
+
13
]
=
((
int64_t
)((
0x3FF0000000000000
&
val
)
<<
2
)
>>
54
)
&
0x
3FC
;
out
[
i
+
14
]
=
((
int64_t
)((
0x000FFC0000000000
&
val
)
<<
12
)
>>
54
)
&
0x
3FC
;
out
[
i
+
15
]
=
((
int64_t
)((
0x000003FF00000000
&
val
)
<<
22
)
>>
54
)
&
0x
3FC
;
out
[
i
+
16
]
=
((
int64_t
)((
0x00000000FFC00000
&
val
)
<<
32
)
>>
54
)
&
0x
3FC
;
out
[
i
+
17
]
=
((
int64_t
)((
0x00000000003FF000
&
val
)
<<
42
)
>>
54
)
&
0x
3FC
;
out
[
i
+
18
]
=
((
int64_t
)((
0x0000000000000FFC
&
val
)
<<
52
)
>>
54
)
&
0x
3FC
;
out
[
12
]
=
((
int64_t
)(((
0xC000000000000000
&
val
)
>>
8
)
|
rest
)
>>
54
)
&
0x
FF
;
out
[
13
]
=
((
int64_t
)((
0x3FF0000000000000
&
val
)
<<
2
)
>>
54
)
&
0x
FF
;
out
[
14
]
=
((
int64_t
)((
0x000FFC0000000000
&
val
)
<<
12
)
>>
54
)
&
0x
FF
;
out
[
15
]
=
((
int64_t
)((
0x000003FF00000000
&
val
)
<<
22
)
>>
54
)
&
0x
FF
;
out
[
16
]
=
((
int64_t
)((
0x00000000FFC00000
&
val
)
<<
32
)
>>
54
)
&
0x
FF
;
out
[
17
]
=
((
int64_t
)((
0x00000000003FF000
&
val
)
<<
42
)
>>
54
)
&
0x
FF
;
out
[
18
]
=
((
int64_t
)((
0x0000000000000FFC
&
val
)
<<
52
)
>>
54
)
&
0x
FF
;
rest
=
(
0x0000000000000003
&
val
)
<<
62
;
// 2 bits rest.
// 4th:
val
=
be64toh
(
*
qword
);
//printf("0x%016lX\n",val);
qword
++
;
out
[
i
+
19
]
=
((
int64_t
)(((
0xFF00000000000000
&
val
)
>>
2
)
|
rest
)
>>
54
)
&
0x
3FC
;
out
[
i
+
20
]
=
((
int64_t
)((
0x00FFC00000000000
&
val
)
<<
8
)
>>
54
)
&
0x
3FC
;
out
[
i
+
21
]
=
((
int64_t
)((
0x00003FF000000000
&
val
)
<<
18
)
>>
54
)
&
0x
3FC
;
out
[
i
+
22
]
=
((
int64_t
)((
0x0000000FFC000000
&
val
)
<<
28
)
>>
54
)
&
0x
3FC
;
out
[
i
+
23
]
=
((
int64_t
)((
0x0000000003FF0000
&
val
)
<<
38
)
>>
54
)
&
0x
3FC
;
out
[
i
+
24
]
=
((
int64_t
)((
0x000000000000FFC0
&
val
)
<<
48
)
>>
54
)
&
0x
3FC
;
out
[
19
]
=
((
int64_t
)(((
0xFF00000000000000
&
val
)
>>
2
)
|
rest
)
>>
54
)
&
0x
FF
;
out
[
20
]
=
((
int64_t
)((
0x00FFC00000000000
&
val
)
<<
8
)
>>
54
)
&
0x
FF
;
out
[
21
]
=
((
int64_t
)((
0x00003FF000000000
&
val
)
<<
18
)
>>
54
)
&
0x
FF
;
out
[
22
]
=
((
int64_t
)((
0x0000000FFC000000
&
val
)
<<
28
)
>>
54
)
&
0x
FF
;
out
[
23
]
=
((
int64_t
)((
0x0000000003FF0000
&
val
)
<<
38
)
>>
54
)
&
0x
FF
;
out
[
24
]
=
((
int64_t
)((
0x000000000000FFC0
&
val
)
<<
48
)
>>
54
)
&
0x
FF
;
rest
=
(
0x000000000000003F
&
val
)
<<
58
;
// 6 bits rest.
// 5th:
val
=
be64toh
(
*
qword
);
//printf("0x%016lX\n",val);
qword
++
;
out
[
i
+
25
]
=
((
int64_t
)(((
0xF000000000000000
&
val
)
>>
6
)
|
rest
)
>>
54
)
&
0x
3FC
;
out
[
i
+
26
]
=
((
int64_t
)((
0x0FFC000000000000
&
val
)
<<
4
)
>>
54
)
&
0x
3FC
;
out
[
i
+
27
]
=
((
int64_t
)((
0x0003FF0000000000
&
val
)
<<
14
)
>>
54
)
&
0x
3FC
;
out
[
i
+
28
]
=
((
int64_t
)((
0x000000FFC0000000
&
val
)
<<
24
)
>>
54
)
&
0x
3FC
;
out
[
i
+
29
]
=
((
int64_t
)((
0x000000003FF00000
&
val
)
<<
34
)
>>
54
)
&
0x
3FC
;
out
[
i
+
30
]
=
((
int64_t
)((
0x00000000000FFC00
&
val
)
<<
44
)
>>
54
)
&
0x
3FC
;
out
[
i
+
31
]
=
((
int64_t
)((
0x00000000000003FF
&
val
)
<<
54
)
>>
54
)
&
0x
3FC
;
out
[
25
]
=
((
int64_t
)(((
0xF000000000000000
&
val
)
>>
6
)
|
rest
)
>>
54
)
&
0x
FF
;
out
[
26
]
=
((
int64_t
)((
0x0FFC000000000000
&
val
)
<<
4
)
>>
54
)
&
0x
FF
;
out
[
27
]
=
((
int64_t
)((
0x0003FF0000000000
&
val
)
<<
14
)
>>
54
)
&
0x
FF
;
out
[
28
]
=
((
int64_t
)((
0x000000FFC0000000
&
val
)
<<
24
)
>>
54
)
&
0x
FF
;
out
[
29
]
=
((
int64_t
)((
0x000000003FF00000
&
val
)
<<
34
)
>>
54
)
&
0x
FF
;
out
[
30
]
=
((
int64_t
)((
0x00000000000FFC00
&
val
)
<<
44
)
>>
54
)
&
0x
FF
;
out
[
31
]
=
((
int64_t
)((
0x00000000000003FF
&
val
)
<<
54
)
>>
54
)
&
0x
FF
;
rest
=
0
;
// No rest.
}
}
return
qword
;
}
void
handle_packet_numbers_4096x10_s
(
char
const
*
buf
,
char
*
out
)
{
// Print 4096 numbers of 10 bit signed integers.
uint64_t
val
,
rest
;
uint8_t
S0_8bit
[
32
];
uint8_t
S1_8bit
[
32
];
uint64_t
*
qword0
=
(
uint64_t
*
)(
buf
);
uint64_t
*
qword1
=
(
uint64_t
*
)(
buf
)
+
640
;
uint64_t
*
D
=
reinterpret_cast
<
uint64_t
*>
(
out
);
for
(
int
i
=
0
;
i
<
640
/
5
;
i
++
)
{
qword0
=
unpack5
(
qword0
,
S0_8bit
);
qword1
=
unpack5
(
qword1
,
S1_8bit
);
uint32_t
*
S0
=
reinterpret_cast
<
uint32_t
*>
(
S0_8bit
);
uint32_t
*
S1
=
reinterpret_cast
<
uint32_t
*>
(
S1_8bit
);
for
(
std
::
size_t
ii
=
0
;
ii
<
8
;
++
ii
)
{
*
D
++
=
interleave
(
*
S1
++
,
*
S0
++
);
}
}
}
EDDPolnMerge10to8
::
EDDPolnMerge10to8
(
std
::
size_t
nsamps_per_heap
,
std
::
size_t
npol
,
DadaWriteClient
&
writer
)
:
_nsamps_per_heap
(
nsamps_per_heap
)
,
_npol
(
npol
)
...
...
@@ -131,35 +147,39 @@ void handle_packet_numbers_4096x10_s(char const *buf, char *out)
BOOST_LOG_TRIVIAL
(
debug
)
<<
"fractional part "
<<
fractpart
;
utc_time_stamp
<<
time_buffer
<<
"."
<<
std
::
setw
(
10
)
<<
std
::
setfill
(
'0'
)
<<
std
::
size_t
(
fractpart
*
10000000000
)
<<
std
::
setfill
(
' '
);
BOOST_LOG_TRIVIAL
(
debug
)
<<
"this is start time in utc "
<<
utc_time_stamp
.
str
().
c_str
()
<<
"
\n
"
;
// std::cout << "this is sync_time MJD "<< mjd_time<< "\n";
ascii_header_set
(
oblock
.
ptr
(),
"UTC_START"
,
"%s"
,
utc_time_stamp
.
str
().
c_str
());
ascii_header_set
(
oblock
.
ptr
(),
"UNIX_TIME"
,
"%Lf"
,
unix_time
);
oblock
.
used_bytes
(
oblock
.
total_bytes
());
_writer
.
header_stream
().
release
();
BOOST_LOG_TRIVIAL
(
info
)
<<
"Output header released"
<<
"
\n
"
;
BOOST_LOG_TRIVIAL
(
info
)
<<
"Output header released"
<<
"
\n
"
;
}
bool
EDDPolnMerge10to8
::
operator
()(
RawBytes
&
block
)
{
std
::
cout
<<
"Beginning of the operator"
<<
std
::
endl
;
std:
size_t
nheap_groups
=
0.8
*
block
.
used_bytes
()
/
_npol
/
_nsamps_per_heap
;
RawBytes
&
oblock
=
_writer
.
data_stream
().
next
();
BOOST_LOG_TRIVIAL
(
debug
)
<<
"block.used_bytes() = "
<<
block
.
used_bytes
();
BOOST_LOG_TRIVIAL
(
debug
)
<<
"Entering unpack loop"
;
// if (block.used_bytes() > oblock.total_bytes())
// {
// _writer.data_stream().release();
// throw std::runtime_error("Output DADA buffer does not match with the input dada buffer");
// }
#pragma omp parallel for schedule(dynamic, 4) num_threads(4)
for
(
std
::
size_t
kk
=
0
;
kk
<
block
.
used_bytes
()
/
5120
/
2
;
++
kk
)
{
char
*
buffer_pol0
=
block
.
ptr
()
+
5120
*
kk
*
2
;
handle_packet_numbers_4096x10_s
(
buffer_pol0
,
oblock
.
ptr
()
+
kk
*
2
*
4096
);
char
*
buffer_pol1
=
block
.
ptr
()
+
(
5120
*
kk
*
2
)
+
5120
;
handle_packet_numbers_4096x10_s
(
buffer_pol1
,
oblock
.
ptr
()
+
kk
*
2
*
4096
+
4096
);
}
oblock
.
used_bytes
(
block
.
used_bytes
()
*
0.8
);
_writer
.
data_stream
().
release
();
/* convert 10 bit to 8 bit data here */
BOOST_LOG_TRIVIAL
(
debug
)
<<
"block.used_bytes() = "
<<
block
.
used_bytes
();
BOOST_LOG_TRIVIAL
(
debug
)
<<
"Entering unpack loop"
;
#pragma omp parallel for schedule(dynamic, 4) num_threads(4)
for
(
std
::
size_t
kk
=
0
;
kk
<
block
.
used_bytes
()
/
5120
/
2
;
++
kk
)
{
char
*
buffer
=
block
.
ptr
()
+
5120
*
2
*
kk
;
handle_packet_numbers_4096x10_s
(
buffer
,
oblock
.
ptr
()
+
kk
*
8192
);
}
oblock
.
used_bytes
(
block
.
used_bytes
()
*
0.8
);
//oblock.used_bytes(block.used_bytes());
_writer
.
data_stream
().
release
();
return
false
;
}
}
//edd
...
...
psrdada_cpp/effelsberg/edd/src/EDDPolnMerge_cli.cpp
View file @
6ced6c36
...
...
@@ -22,10 +22,10 @@ int main(int argc, char** argv)
try
{
key_t
input_key
;
key_t
output_key
;
std
::
size_t
npol
;
std
::
size_t
nsamps_per_heap
;
key_t
output_key
;
std
::
size_t
npol
;
std
::
size_t
nsamps_per_heap
;
std
::
size_t
nthreads
;
/** Define and parse the program options
*/
namespace
po
=
boost
::
program_options
;
...
...
@@ -52,6 +52,9 @@ int main(int argc, char** argv)
(
"npol,p"
,
po
::
value
<
std
::
size_t
>
(
&
npol
)
->
default_value
(
2
),
"Value of number of pol"
)
(
"nthreads,n"
,
po
::
value
<
std
::
size_t
>
(
&
nthreads
)
->
default_value
(
2
),
"Value of number of threads"
)
(
"nsamps_per_heap,n"
,
po
::
value
<
std
::
size_t
>
(
&
nsamps_per_heap
)
->
default_value
(
4096
),
"Value of samples per heap"
)
...
...
@@ -85,8 +88,8 @@ int main(int argc, char** argv)
* All the application code goes here
*/
MultiLog
log
(
"edd::EDDPolnMerge"
);
DadaWriteClient
output
(
output_key
,
log
);
effelsberg
::
edd
::
EDDPolnMerge
merger
(
nsamps_per_heap
,
npol
,
output
);
DadaWriteClient
output
(
output_key
,
log
);
effelsberg
::
edd
::
EDDPolnMerge
merger
(
nsamps_per_heap
,
npol
,
nthreads
,
output
);
DadaInputStream
<
decltype
(
merger
)
>
input
(
input_key
,
log
,
merger
);
input
.
start
();
/**
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment