Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
MPIfR-BDG
psrdada_cpp
Commits
265c8527
Commit
265c8527
authored
Mar 11, 2020
by
Tobias Winchen
Browse files
Added stokes component calculation
parent
7beb2222
Changes
4
Hide whitespace changes
Inline
Side-by-side
psrdada_cpp/effelsberg/edd/GatedSpectrometer.cuh
View file @
265c8527
...
...
@@ -26,20 +26,91 @@ namespace edd {
typedef
unsigned
long
long
int
uint64_cu
;
static_assert
(
sizeof
(
uint64_cu
)
==
sizeof
(
uint64_t
),
"Long long int not of 64 bit! This is problematic for CUDA!"
);
typedef
uint64_t
RawVoltageType
;
typedef
float
UnpackedVoltageType
;
typedef
float2
ChannelisedVoltageType
;
typedef
float
IntegratedPowerType
;
//typedef int8_t IntegratedPowerType;
// Input data and intermediate processing data for one polarization
struct
PolarizationData
{
DoubleDeviceBuffer
<
RawVoltageType
>
_raw_voltage
;
DoubleDeviceBuffer
<
uint64_t
>
_sideChannelData
;
thrust
::
device_vector
<
UnpackedVoltageType
>
_baseLineG0
;
thrust
::
device_vector
<
UnpackedVoltageType
>
_baseLineG1
;
thrust
::
device_vector
<
ChannelisedVoltageType
>
_channelised_voltage_G0
;
thrust
::
device_vector
<
ChannelisedVoltageType
>
_channelised_voltage_G1
;
void
swap
()
{
_raw_voltage
.
swap
();
_sideChannelData
.
swap
();
}
};
// Outptu data for one gate
class
StokesOutput
{
public:
DoubleDeviceBuffer
<
IntegratedPowerType
>
I
;
DoubleDeviceBuffer
<
IntegratedPowerType
>
Q
;
DoubleDeviceBuffer
<
IntegratedPowerType
>
U
;
DoubleDeviceBuffer
<
IntegratedPowerType
>
V
;
DoubleDeviceBuffer
<
uint64_cu
>
_noOfBitSets
;
void
reset
(
cudaStream_t
&
_proc_stream
)
{
thrust
::
fill
(
thrust
::
cuda
::
par
.
on
(
_proc_stream
),
I
.
a
().
begin
(),
I
.
a
().
end
(),
0.
);
thrust
::
fill
(
thrust
::
cuda
::
par
.
on
(
_proc_stream
),
Q
.
a
().
begin
(),
Q
.
a
().
end
(),
0.
);
thrust
::
fill
(
thrust
::
cuda
::
par
.
on
(
_proc_stream
),
U
.
a
().
begin
(),
U
.
a
().
end
(),
0.
);
thrust
::
fill
(
thrust
::
cuda
::
par
.
on
(
_proc_stream
),
V
.
a
().
begin
(),
V
.
a
().
end
(),
0.
);
thrust
::
fill
(
thrust
::
cuda
::
par
.
on
(
_proc_stream
),
_noOfBitSets
.
a
().
begin
(),
_noOfBitSets
.
a
().
end
(),
0L
);
}
void
swap
()
{
I
.
swap
();
Q
.
swap
();
U
.
swap
();
V
.
swap
();
_noOfBitSets
.
swap
();
}
void
resize
(
size_t
size
,
size_t
blocks
)
{
I
.
resize
(
size
*
blocks
);
Q
.
resize
(
size
*
blocks
);
U
.
resize
(
size
*
blocks
);
V
.
resize
(
size
*
blocks
);
_noOfBitSets
.
resize
(
blocks
);
}
};
/**
@class GatedSpectrometer
@brief Split data into two streams and create integrated spectra depending on
bit set in side channel data.
*/
template
<
class
HandlerType
,
typename
IntegratedPowerType
>
class
GatedSpectrometer
{
template
<
class
HandlerType
>
class
GatedSpectrometer
{
public:
typedef
uint64_t
RawVoltageType
;
typedef
float
UnpackedVoltageType
;
typedef
float2
ChannelisedVoltageType
;
// typedef float IntegratedPowerType;
//typedef int8_t IntegratedPowerType;
public:
/**
...
...
@@ -90,11 +161,10 @@ public:
bool
operator
()(
RawBytes
&
block
);
private:
void
process
(
thrust
::
device_vector
<
RawVoltageType
>
const
&
digitiser_raw
,
thrust
::
device_vector
<
uint64_t
>
const
&
sideChannelData
,
thrust
::
device_vector
<
IntegratedPowerType
>
&
detected
,
thrust
::
device_vector
<
uint64_cu
>
&
noOfBitSetsIn_G0
,
thrust
::
device_vector
<
uint64_cu
>
&
noOfBitSetsIn_G1
);
// gate the data and fft data per gate
void
gated_fft
(
PolarizationData
&
data
,
thrust
::
device_vector
<
uint64_cu
>
&
_noOfBitSetsIn_G0
,
thrust
::
device_vector
<
uint64_cu
>
&
_noOfBitSetsIn_G1
);
private:
DadaBufferLayout
_dadaBufferLayout
;
...
...
@@ -115,26 +185,20 @@ private:
double
_processing_efficiency
;
std
::
unique_ptr
<
Unpacker
>
_unpacker
;
std
::
unique_ptr
<
DetectorAccumulator
<
IntegratedPowerType
>
>
_detector
;
// Input data
DoubleDeviceBuffer
<
RawVoltageType
>
_raw_voltage_db
;
DoubleDeviceBuffer
<
uint64_t
>
_sideChannelData_db
;
// Input data and per pol intermediate data
PolarizationData
polarization0
,
polarization1
;
// Output data
DoubleDeviceBuffer
<
IntegratedPowerType
>
_power_db
;
StokesOutput
stokes_G0
,
stokes_G1
;
DoubleDeviceBuffer
<
uint64_cu
>
_noOfBitSetsIn_G0
;
DoubleDeviceBuffer
<
uint64_cu
>
_noOfBitSetsIn_G1
;
DoublePinnedHostBuffer
<
char
>
_host_power_db
;
// Intermediate process steps
// Temporary processing block
// ToDo: Use inplace FFT to avoid temporary coltage array
thrust
::
device_vector
<
UnpackedVoltageType
>
_unpacked_voltage_G0
;
thrust
::
device_vector
<
UnpackedVoltageType
>
_unpacked_voltage_G1
;
thrust
::
device_vector
<
ChannelisedVoltageType
>
_channelised_voltage
;
thrust
::
device_vector
<
UnpackedVoltageType
>
_baseLineNG0
;
thrust
::
device_vector
<
UnpackedVoltageType
>
_baseLineNG1
;
cudaStream_t
_h2d_stream
;
cudaStream_t
_proc_stream
;
...
...
@@ -142,11 +206,10 @@ private:
};
/**
* @brief Splits the input data depending on a bit set into two arrays.
*
* @detail The resulting gaps are filled with
zeros
in the other stream.
* @detail The resulting gaps are filled with
a given baseline value
in the other stream.
*
* @param GO Input data. Data is set to the baseline value if corresponding
* sideChannelData bit at bitpos os set.
...
...
@@ -177,6 +240,57 @@ __global__ void gating(float *G0, float *G1, const int64_t *sideChannelData,
uint64_cu
*
stats_G0
,
uint64_cu
*
stats_G1
);
/**
* @brief calculate stokes IQUV from two complex valuies for each polarization
*/
//__host__ __device__ void stokes_IQUV(const float2 &p1, const float2 &p2, float &I, float &Q, float &U, float &V);
__host__
__device__
void
stokes_IQUV
(
const
float2
&
p1
,
const
float2
&
p2
,
float
&
I
,
float
&
Q
,
float
&
U
,
float
&
V
)
{
I
=
fabs
(
p1
.
x
*
p1
.
x
+
p1
.
y
*
p1
.
y
)
+
fabs
(
p2
.
x
*
p2
.
x
+
p2
.
y
*
p2
.
y
);
Q
=
fabs
(
p1
.
x
*
p1
.
x
+
p1
.
y
*
p1
.
y
)
-
fabs
(
p2
.
x
*
p2
.
x
+
p2
.
y
*
p2
.
y
);
U
=
2
*
(
p1
.
x
*
p2
.
x
+
p1
.
y
*
p2
.
y
);
V
=
-
2
*
(
p1
.
y
*
p2
.
x
-
p1
.
x
*
p2
.
y
);
}
/**
* @brief calculate stokes IQUV spectra pol1, pol2 are arrays of naccumulate
* complex spectra for individual polarizations
*/
__global__
void
stokes_accumulate
(
float2
const
__restrict__
*
pol1
,
float2
const
__restrict__
*
pol2
,
float
*
I
,
float
*
Q
,
float
*
U
,
float
*
V
,
int
nchans
,
int
naccumulate
)
{
for
(
size_t
i
=
blockIdx
.
x
*
blockDim
.
x
+
threadIdx
.
x
;
(
i
<
nchans
);
i
+=
blockDim
.
x
*
gridDim
.
x
)
{
float
rI
=
0
;
float
rQ
=
0
;
float
rU
=
0
;
float
rV
=
0
;
for
(
int
k
=
0
;
k
<
naccumulate
;
k
++
)
{
const
float2
p1
=
pol1
[
i
+
k
*
nchans
];
const
float2
p2
=
pol2
[
i
+
k
*
nchans
];
rI
+=
fabs
(
p1
.
x
*
p1
.
x
+
p1
.
y
*
p1
.
y
)
+
fabs
(
p2
.
x
*
p2
.
x
+
p2
.
y
*
p2
.
y
);
rQ
+=
fabs
(
p1
.
x
*
p1
.
x
+
p1
.
y
*
p1
.
y
)
-
fabs
(
p2
.
x
*
p2
.
x
+
p2
.
y
*
p2
.
y
);
rU
+=
2.
f
*
(
p1
.
x
*
p2
.
x
+
p1
.
y
*
p2
.
y
);
rV
+=
-
2.
f
*
(
p1
.
y
*
p2
.
x
-
p1
.
x
*
p2
.
y
);
}
I
[
i
]
+=
rI
;
Q
[
i
]
+=
rQ
;
U
[
i
]
+=
rU
;
V
[
i
]
+=
rV
;
}
}
...
...
psrdada_cpp/effelsberg/edd/detail/GatedSpectrometer.cu
View file @
265c8527
...
...
@@ -17,8 +17,9 @@ namespace psrdada_cpp {
namespace
effelsberg
{
namespace
edd
{
// Reduce thread local vatiable v in shared array x, so that x[0]
template
<
typename
T
>
__device__
void
reduce
(
T
*
x
,
const
T
&
v
)
__device__
void
sum_
reduce
(
T
*
x
,
const
T
&
v
)
{
x
[
threadIdx
.
x
]
=
v
;
__syncthreads
();
...
...
@@ -28,22 +29,33 @@ __device__ void reduce(T *x, const T &v)
x
[
threadIdx
.
x
]
+=
x
[
threadIdx
.
x
+
s
];
__syncthreads
();
}
}
// If one of the side channel items is lsot, then both are considered as lost
// here
__global__
void
mergeSideChannels
(
uint64_t
*
__restrict__
A
,
uint64_t
*
__restrict__
B
,
size_t
N
)
{
for
(
size_t
i
=
blockIdx
.
x
*
blockDim
.
x
+
threadIdx
.
x
;
(
i
<
N
);
i
+=
blockDim
.
x
*
gridDim
.
x
)
{
uint64_t
v
=
A
[
i
]
||
B
[
i
];
A
[
i
]
=
v
;
B
[
i
]
=
v
;
}
}
__global__
void
gating
(
float
*
__restrict__
G0
,
float
*
__restrict__
G1
,
const
uint64_t
*
__restrict__
sideChannelData
,
size_t
N
,
size_t
heapSize
,
size_t
bitpos
,
size_t
noOfSideChannels
,
size_t
selectedSideChannel
,
const
float
baseLineG0
,
const
float
baseLineG1
,
float
*
__restrict__
baseLineNG0
,
float
*
__restrict__
baseLineNG1
,
uint64_cu
*
stats_G0
,
uint64_cu
*
stats_G1
)
{
// float baseLineG0 = (*_baseLineNG0) / N;
// float baseLineG1 = (*_baseLineNG1) / N;
__global__
void
gating
(
float
*
__restrict__
G0
,
float
*
__restrict__
G1
,
const
uint64_t
*
__restrict__
sideChannelData
,
size_t
N
,
size_t
heapSize
,
size_t
bitpos
,
size_t
noOfSideChannels
,
size_t
selectedSideChannel
,
const
float
baseLineG0
,
const
float
baseLineG1
,
float
*
__restrict__
baseLineNG0
,
float
*
__restrict__
baseLineNG1
,
uint64_cu
*
stats_G0
,
uint64_cu
*
stats_G1
)
{
// statistics values for samopels to G0, G1
uint32_t
_G0stats
=
0
;
uint32_t
_G1stats
=
0
;
...
...
@@ -55,12 +67,8 @@ __global__ void gating(float* __restrict__ G0, float* __restrict__ G1, const uin
i
+=
blockDim
.
x
*
gridDim
.
x
)
{
const
float
v
=
G0
[
i
];
const
uint64_t
sideChannelItem
=
sideChannelData
[((
i
/
heapSize
)
*
(
noOfSideChannels
))
+
selectedSideChannel
];
// Probably not optimal access as
// same data is copied for several
// threads, but maybe efficiently
// handled by cache?
const
uint64_t
sideChannelItem
=
sideChannelData
[((
i
/
heapSize
)
*
(
noOfSideChannels
))
+
selectedSideChannel
];
const
unsigned
int
bit_set
=
TEST_BIT
(
sideChannelItem
,
bitpos
);
const
unsigned
int
heap_lost
=
TEST_BIT
(
sideChannelItem
,
63
);
...
...
@@ -73,36 +81,38 @@ __global__ void gating(float* __restrict__ G0, float* __restrict__ G1, const uin
baselineUpdateG1
+=
v
*
bit_set
*
(
!
heap_lost
);
baselineUpdateG0
+=
v
*
(
!
bit_set
)
*
(
!
heap_lost
);
}
__shared__
uint32_t
x
[
1024
];
// Reduce G0, G1
reduce
<
uint32_t
>
(
x
,
_G0stats
);
sum_
reduce
<
uint32_t
>
(
x
,
_G0stats
);
if
(
threadIdx
.
x
==
0
)
atomicAdd
(
stats_G0
,
(
uint64_cu
)
x
[
threadIdx
.
x
]);
__syncthreads
();
reduce
<
uint32_t
>
(
x
,
_G1stats
);
sum_reduce
<
uint32_t
>
(
x
,
_G1stats
);
if
(
threadIdx
.
x
==
0
)
atomicAdd
(
stats_G1
,
(
uint64_cu
)
x
[
threadIdx
.
x
]);
__syncthreads
();
//reuse shared array
float
*
y
=
(
float
*
)
x
;
//update the baseline array
reduce
<
float
>
(
y
,
baselineUpdateG0
);
sum_
reduce
<
float
>
(
y
,
baselineUpdateG0
);
if
(
threadIdx
.
x
==
0
)
atomicAdd
(
baseLineNG0
,
y
[
threadIdx
.
x
]);
__syncthreads
();
reduce
<
float
>
(
y
,
baselineUpdateG1
);
sum_reduce
<
float
>
(
y
,
baselineUpdateG1
);
if
(
threadIdx
.
x
==
0
)
atomicAdd
(
baseLineNG1
,
y
[
threadIdx
.
x
]);
__syncthreads
();
}
template
<
class
HandlerType
,
typename
IntegratedPowerType
>
GatedSpectrometer
<
HandlerType
,
IntegratedPowerType
>::
GatedSpectrometer
(
template
<
class
HandlerType
>
GatedSpectrometer
<
HandlerType
>::
GatedSpectrometer
(
const
DadaBufferLayout
&
dadaBufferLayout
,
std
::
size_t
selectedSideChannel
,
std
::
size_t
selectedBit
,
std
::
size_t
fft_length
,
std
::
size_t
naccumulate
,
std
::
size_t
nbits
,
float
input_level
,
float
output_level
,
...
...
@@ -171,36 +181,34 @@ GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer(
cufftSetStream
(
_fft_plan
,
_proc_stream
);
BOOST_LOG_TRIVIAL
(
debug
)
<<
"Allocating memory"
;
_raw_voltage_db
.
resize
(
_dadaBufferLayout
.
sizeOfData
()
/
sizeof
(
uint64_t
));
_sideChannelData_db
.
resize
(
_dadaBufferLayout
.
getNSideChannels
()
*
_dadaBufferLayout
.
getNHeaps
());
polarization0
.
_raw_voltage
.
resize
(
_dadaBufferLayout
.
sizeOfData
()
/
sizeof
(
uint64_t
));
polarization1
.
_raw_voltage
.
resize
(
_dadaBufferLayout
.
sizeOfData
()
/
sizeof
(
uint64_t
));
polarization0
.
_sideChannelData
.
resize
(
_dadaBufferLayout
.
getNSideChannels
()
*
_dadaBufferLayout
.
getNHeaps
());
polarization1
.
_sideChannelData
.
resize
(
_dadaBufferLayout
.
getNSideChannels
()
*
_dadaBufferLayout
.
getNHeaps
());
BOOST_LOG_TRIVIAL
(
debug
)
<<
" Input voltages size (in 64-bit words): "
<<
_raw_voltage
_db
.
size
();
<<
polarization0
.
_raw_voltage
.
size
();
_unpacked_voltage_G0
.
resize
(
_nsamps_per_buffer
);
_unpacked_voltage_G1
.
resize
(
_nsamps_per_buffer
);
_baseLineNG0
.
resize
(
1
);
_baseLineNG1
.
resize
(
1
);
polarization0
.
_baseLineG0
.
resize
(
1
);
polarization0
.
_baseLineG1
.
resize
(
1
);
polarization1
.
_baseLineG0
.
resize
(
1
);
polarization1
.
_baseLineG1
.
resize
(
1
);
BOOST_LOG_TRIVIAL
(
debug
)
<<
" Unpacked voltages size (in samples): "
<<
_unpacked_voltage_G0
.
size
();
_channelised_voltage
.
resize
(
_nchans
*
batch
);
polarization0
.
_channelised_voltage_G0
.
resize
(
_nchans
*
batch
);
polarization0
.
_channelised_voltage_G1
.
resize
(
_nchans
*
batch
);
polarization1
.
_channelised_voltage_G0
.
resize
(
_nchans
*
batch
);
polarization1
.
_channelised_voltage_G1
.
resize
(
_nchans
*
batch
);
BOOST_LOG_TRIVIAL
(
debug
)
<<
" Channelised voltages size: "
<<
_channelised_voltage
.
size
();
_power_db
.
resize
(
_nchans
*
batch
/
(
_naccumulate
/
nBlocks
)
*
2
);
// hold on and off spectra to simplify output
thrust
::
fill
(
_power_db
.
a
().
begin
(),
_power_db
.
a
().
end
(),
0.
);
thrust
::
fill
(
_power_db
.
b
().
begin
(),
_power_db
.
b
().
end
(),
0.
);
BOOST_LOG_TRIVIAL
(
debug
)
<<
" Powers size: "
<<
_power_db
.
size
()
/
2
;
_noOfBitSetsIn_G0
.
resize
(
batch
/
(
_naccumulate
/
nBlocks
));
_noOfBitSetsIn_G1
.
resize
(
batch
/
(
_naccumulate
/
nBlocks
));
thrust
::
fill
(
_noOfBitSetsIn_G0
.
a
().
begin
(),
_noOfBitSetsIn_G0
.
a
().
end
(),
0L
);
thrust
::
fill
(
_noOfBitSetsIn_G0
.
b
().
begin
(),
_noOfBitSetsIn_G0
.
b
().
end
(),
0L
);
thrust
::
fill
(
_noOfBitSetsIn_G1
.
a
().
begin
(),
_noOfBitSetsIn_G1
.
a
().
end
(),
0L
);
thrust
::
fill
(
_noOfBitSetsIn_G1
.
b
().
begin
(),
_noOfBitSetsIn_G1
.
b
().
end
(),
0L
);
BOOST_LOG_TRIVIAL
(
debug
)
<<
" Bit set counter size: "
<<
_noOfBitSetsIn_G0
.
size
();
// on the host both power are stored in the same data buffer together with
// the number of bit sets
_host_power_db
.
resize
(
_power_db
.
size
()
*
sizeof
(
IntegratedPowerType
)
+
2
*
sizeof
(
size_t
)
*
_noOfBitSetsIn_G0
.
size
());
<<
polarization0
.
_channelised_voltage_G0
.
size
();
stokes_G0
.
resize
(
_nchans
,
batch
/
(
_naccumulate
/
nBlocks
));
stokes_G1
.
resize
(
_nchans
,
batch
/
(
_naccumulate
/
nBlocks
));
// on the host full output is stored together with sci data in one buffer
_host_power_db
.
resize
(
8
*
(
_nchans
*
sizeof
(
IntegratedPowerType
)
+
sizeof
(
size_t
))
*
batch
/
(
_naccumulate
/
nBlocks
));
CUDA_ERROR_CHECK
(
cudaStreamCreate
(
&
_h2d_stream
));
CUDA_ERROR_CHECK
(
cudaStreamCreate
(
&
_proc_stream
));
...
...
@@ -208,13 +216,12 @@ GatedSpectrometer<HandlerType, IntegratedPowerType>::GatedSpectrometer(
CUFFT_ERROR_CHECK
(
cufftSetStream
(
_fft_plan
,
_proc_stream
));
_unpacker
.
reset
(
new
Unpacker
(
_proc_stream
));
_detector
.
reset
(
new
DetectorAccumulator
<
IntegratedPowerType
>
(
_nchans
,
_naccumulate
/
nBlocks
,
scaling
,
offset
,
_proc_stream
));
}
// constructor
template
<
class
HandlerType
,
typename
IntegratedPowerType
>
GatedSpectrometer
<
HandlerType
,
IntegratedPowerType
>::~
GatedSpectrometer
()
{
template
<
class
HandlerType
>
GatedSpectrometer
<
HandlerType
>::~
GatedSpectrometer
()
{
BOOST_LOG_TRIVIAL
(
debug
)
<<
"Destroying GatedSpectrometer"
;
if
(
!
_fft_plan
)
cufftDestroy
(
_fft_plan
);
...
...
@@ -224,8 +231,9 @@ GatedSpectrometer<HandlerType, IntegratedPowerType>::~GatedSpectrometer() {
}
template
<
class
HandlerType
,
typename
IntegratedPowerType
>
void
GatedSpectrometer
<
HandlerType
,
IntegratedPowerType
>::
init
(
RawBytes
&
block
)
{
template
<
class
HandlerType
>
void
GatedSpectrometer
<
HandlerType
>::
init
(
RawBytes
&
block
)
{
BOOST_LOG_TRIVIAL
(
debug
)
<<
"GatedSpectrometer init called"
;
std
::
stringstream
headerInfo
;
headerInfo
<<
"
\n
"
...
...
@@ -254,78 +262,82 @@ void GatedSpectrometer<HandlerType, IntegratedPowerType>::init(RawBytes &block)
}
template
<
class
HandlerType
,
typename
IntegratedPowerType
>
void
GatedSpectrometer
<
HandlerType
,
IntegratedPowerType
>::
process
(
thrust
::
device_vector
<
RawVoltageType
>
const
&
digitiser_raw
,
thrust
::
device_vector
<
uint64_t
>
const
&
sideChannelData
,
thrust
::
device_vector
<
IntegratedPowerType
>
&
detected
,
thrust
::
device_vector
<
uint64_cu
>
&
noOfBitSetsIn_G0
,
thrust
::
device_vector
<
uint64_cu
>
&
noOfBitSetsIn_G1
)
{
template
<
class
HandlerType
>
void
GatedSpectrometer
<
HandlerType
>::
gated_fft
(
PolarizationData
&
thispol
,
PolarizationData
&
otherpol
,
thrust
::
device_vector
<
uint64_cu
>
&
_noOfBitSetsIn_G0
,
thrust
::
device_vector
<
uint64_cu
>
&
_noOfBitSetsIn_G1
)
{
BOOST_LOG_TRIVIAL
(
debug
)
<<
"Unpacking raw voltages"
;
switch
(
_nbits
)
{
case
8
:
_unpacker
->
unpack
<
8
>
(
d
igitiser_raw
,
_unpacked_voltage_G0
);
_unpacker
->
unpack
<
8
>
(
d
ata
.
_raw_voltage
.
b
()
,
_unpacked_voltage_G0
);
break
;
case
12
:
_unpacker
->
unpack
<
12
>
(
d
igitiser_raw
,
_unpacked_voltage_G0
);
_unpacker
->
unpack
<
12
>
(
d
ata
.
_raw_voltage
.
b
()
,
_unpacked_voltage_G0
);
break
;
default:
throw
std
::
runtime_error
(
"Unsupported number of bits"
);
}
BOOST_LOG_TRIVIAL
(
debug
)
<<
"Calculate baseline"
;
//calculate baseline from previos block
BOOST_LOG_TRIVIAL
(
debug
)
<<
"Perform gating"
;
float
baseLineG0
=
_baseLineNG0
[
0
];
float
baseLineG1
=
_baseLineNG1
[
0
];
// Get baseline from previous block
float
previous_baseLineG0
=
data
.
_baseLineG0
[
0
];
float
previous_baseLineG1
=
data
.
_baseLineG1
[
0
];
uint64_t
NG0
=
0
;
uint64_t
NG1
=
0
;
// Loop over outputblocks, for case of multiple output blocks per input block
for
(
size_t
i
=
0
;
i
<
noOfBitSetsIn_G0
.
size
();
i
++
)
int
step
=
data
.
_sideChannelData
.
size
()
/
_noOfBitSetsIn_G0
.
size
();
for
(
size_t
i
=
0
;
i
<
_noOfBitSetsIn_G0
.
size
();
i
++
)
{
// ToDo: Should be in one kernel call
gating
<<<
1024
,
1024
,
0
,
_proc_stream
>>>
(
thrust
::
raw_pointer_cast
(
_unpacked_voltage_G0
.
data
()
+
i
*
sideChannelData
.
size
()
/
noOfBitSetsIn_G0
.
size
()),
thrust
::
raw_pointer_cast
(
_unpacked_voltage_G1
.
data
()
+
i
*
sideChannelData
.
size
()
/
noOfBitSetsIn_G0
.
size
()),
thrust
::
raw_pointer_cast
(
sideChannelData
.
data
()
+
i
*
sideChannelData
.
size
()
/
noOfBitSetsIn_G0
.
size
()),
_unpacked_voltage_G0
.
size
()
/
noOfBitSetsIn_G0
.
size
(),
_dadaBufferLayout
.
getHeapSize
(),
_selectedBit
,
_dadaBufferLayout
.
getNSideChannels
(),
thrust
::
raw_pointer_cast
(
_unpacked_voltage_G0
.
data
()
+
i
*
step
*
_nsamps_per_heap
),
thrust
::
raw_pointer_cast
(
_unpacked_voltage_G1
.
data
()
+
i
*
step
*
_nsamps_per_heap
),
thrust
::
raw_pointer_cast
(
data
.
_sideChannelData
.
b
().
data
()
+
i
*
step
),
_unpacked_voltage_G0
.
size
()
/
_noOfBitSetsIn_G0
.
size
(),
_dadaBufferLayout
.
getHeapSize
(),
_selectedBit
,
_dadaBufferLayout
.
getNSideChannels
(),
_selectedSideChannel
,
baseLineG0
,
baseLineG1
,
thrust
::
raw_pointer_cast
(
_baseLine
N
G0
.
data
()),
thrust
::
raw_pointer_cast
(
_baseLine
N
G1
.
data
()),
thrust
::
raw_pointer_cast
(
noOfBitSetsIn_G0
.
data
()
+
i
),
thrust
::
raw_pointer_cast
(
noOfBitSetsIn_G1
.
data
()
+
i
)
previous_
baseLineG0
,
previous_
baseLineG1
,
thrust
::
raw_pointer_cast
(
data
.
_baseLineG0
.
data
()),
thrust
::
raw_pointer_cast
(
data
.
_baseLineG1
.
data
()),
thrust
::
raw_pointer_cast
(
_
noOfBitSetsIn_G0
.
data
()
+
i
),
thrust
::
raw_pointer_cast
(
_
noOfBitSetsIn_G1
.
data
()
+
i
)
);
NG0
+=
noOfBitSetsIn_G0
[
i
];
NG1
+=
noOfBitSetsIn_G1
[
i
];
NG0
+=
_
noOfBitSetsIn_G0
[
i
];
NG1
+=
_
noOfBitSetsIn_G1
[
i
];
}
_baseLine
N
G0
[
0
]
/=
NG0
;
_baseLine
N
G1
[
0
]
/=
NG1
;
BOOST_LOG_TRIVIAL
(
debug
)
<<
"Updating Baselines
\n
G0: "
<<
baseLineG0
<<
" -> "
<<
_baseLine
N
G0
[
0
]
<<
", "
<<
baseLineG1
<<
" -> "
<<
_baseLine
N
G1
[
0
]
;
data
.
_baseLineG0
[
0
]
/=
NG0
;
data
.
_baseLineG1
[
0
]
/=
NG1
;
BOOST_LOG_TRIVIAL
(
debug
)
<<
"Updating Baselines
\n
G0: "
<<
previous_
baseLineG0
<<
" -> "
<<
data
.
_baseLineG0
[
0
]
<<
", "
<<
previous_
baseLineG1
<<
" -> "
<<
data
.
_baseLineG1
[
0
]
;
BOOST_LOG_TRIVIAL
(
debug
)
<<
"Performing FFT 1"
;
UnpackedVoltageType
*
_unpacked_voltage_ptr
=
thrust
::
raw_pointer_cast
(
_unpacked_voltage_G0
.
data
());
ChannelisedVoltageType
*
_channelised_voltage_ptr
=
thrust
::
raw_pointer_cast
(
_channelised_voltage
.
data
());
thrust
::
raw_pointer_cast
(
data
.
_channelised_voltage
_G0
.
data
());
CUFFT_ERROR_CHECK
(
cufftExecR2C
(
_fft_plan
,
(
cufftReal
*
)
_unpacked_voltage_ptr
,
(
cufftComplex
*
)
_channelised_voltage_ptr
));
_detector
->
detect
(
_channelised_voltage
,
detected
,
2
,
0
);
BOOST_LOG_TRIVIAL
(
debug
)
<<
"Performing FFT 2"
;
_unpacked_voltage_ptr
=
thrust
::
raw_pointer_cast
(
_unpacked_voltage_G1
.
data
());
_channelised_voltage_ptr
=
thrust
::
raw_pointer_cast
(
data
.
_channelised_voltage_G1
.
data
());
CUFFT_ERROR_CHECK
(
cufftExecR2C
(
_fft_plan
,
(
cufftReal
*
)
_unpacked_voltage_ptr
,
(
cufftComplex
*
)
_channelised_voltage_ptr
));
_detector
->
detect
(
_channelised_voltage
,
detected
,
2
,
1
);
CUDA_ERROR_CHECK
(
cudaStreamSynchronize
(
_proc_stream
));
BOOST_LOG_TRIVIAL
(
debug
)
<<
"Exit processing"
;
}
// process
template
<
class
HandlerType
,
typename
IntegratedPowerType
>
bool
GatedSpectrometer
<
HandlerType
,
IntegratedPowerType
>::
operator
()(
RawBytes
&
block
)
{
template
<
class
HandlerType
>
bool
GatedSpectrometer
<
HandlerType
>::
operator
()(
RawBytes
&
block
)
{
++
_call_count
;
BOOST_LOG_TRIVIAL
(
debug
)
<<
"GatedSpectrometer operator() called (count = "
<<
_call_count
<<
")"
;
...
...
@@ -340,20 +352,43 @@ bool GatedSpectrometer<HandlerType, IntegratedPowerType>::operator()(RawBytes &b
// Copy data to device
CUDA_ERROR_CHECK
(
cudaStreamSynchronize
(
_h2d_stream
));
_raw_voltage_db
.
swap
();
_sideChannelData_db
.
swap
();
polarization0
.
swap
();
polarization1
.
swap
();
BOOST_LOG_TRIVIAL
(
debug
)
<<
" block.used_bytes() = "
<<
block
.
used_bytes
()
<<
", dataBlockBytes = "
<<
_dadaBufferLayout
.
sizeOfData
()
<<
"
\n
"
;
CUDA_ERROR_CHECK
(
cudaMemcpyAsync
(
static_cast
<
void
*>
(
_raw_voltage_db
.
a_ptr
()),
static_cast
<
void
*>
(
block
.
ptr
()),
_dadaBufferLayout
.
sizeOfData
()
,
cudaMemcpyHostToDevice
,
_h2d_stream
));
CUDA_ERROR_CHECK
(
cudaMemcpyAsync
(
static_cast
<
void
*>
(
_sideChannelData_db
.
a_ptr
()),
static_cast
<
void
*>
(
block
.
ptr
()
+
_dadaBufferLayout
.
sizeOfData
()
+
_dadaBufferLayout
.
sizeOfGap
()),
_dadaBufferLayout
.
sizeOfSideChannelData
(),
cudaMemcpyHostToDevice
,
_h2d_stream
));
// Copy the data with stride to the GPU:
// CPU: P1P2P1P2P1P2 ...
// GPU: P1P1P1 ... P2P2P2 ...
int
heapsize_bytes
=
_nsamps_per_heap
*
_nbits
/
8
;
CUDA_ERROR_CHECK
(
cudaMemcpy2DAsync
(
static_cast
<
void
*>
(
polarization0
.
_raw_voltage
.
a_ptr
()),
heapsize_bytes
,
static_cast
<
void
*>
(
block
.
ptr
()),
2
*
heapsize_bytes
,
heapsize_bytes
,
_dadaBufferLayout
.
sizeOfData
()
/
heapsize_bytes
/
2
,
cudaMemcpyHostToDevice
,
_h2d_stream
));
CUDA_ERROR_CHECK
(
cudaMemcpy2DAsync
(
static_cast
<
void
*>
(
polarization1
.
_raw_voltage
.
a_ptr
()),
heapsize_bytes
,
static_cast
<
void
*>
(
block
.
ptr
())
+
heapsize_bytes
,
2
*
heapsize_bytes
,
heapsize_bytes
,
_dadaBufferLayout
.
sizeOfData
()
/
heapsize_bytes
/
2
,
cudaMemcpyHostToDevice
,
_h2d_stream
));
// ToDo: Strided copy of side channel data
// CUDA_ERROR_CHECK(cudaMemcpyAsync(
// static_cast<void *>(polarization0._sideChannelData.a_ptr()),
// static_cast<void *>(block.ptr() + _dadaBufferLayout.sizeOfData() + _dadaBufferLayout.sizeOfGap()),
// _dadaBufferLayout.sizeOfSideChannelData(), cudaMemcpyHostToDevice, _h2d_stream));
//
// CUDA_ERROR_CHECK(cudaMemcpyAsync(