ift / NIFTy / Commits / 533fcc7e

Commit 533fcc7e, authored May 30, 2020 by Martin Reinecke

Merge branch 'mpi_utils' into 'NIFTy_6'

Provide some MPI utilities in utilities.py

See merge request !519

Parents: 2249183a 94ec206a
Pipeline #75812 passed with stages in 25 minutes and 3 seconds
Changes: 2    Pipelines: 1
nifty6/minimization/metric_gaussian_kl.py
@@ -28,14 +28,6 @@ from ..sugar import full, makeOp
 from .energy import Energy
 
 
-def _shareRange(nwork, nshares, myshare):
-    nbase = nwork//nshares
-    additional = nwork % nshares
-    lo = myshare*nbase + min(myshare, additional)
-    hi = lo + nbase + int(myshare < additional)
-    return lo, hi
-
-
 class _KLMetric(EndomorphicOperator):
     def __init__(self, KL):
         self._KL = KL
@@ -138,7 +130,7 @@ class MetricGaussianKL(Energy):
             self._comm = comm
             ntask = self._comm.Get_size()
             rank = self._comm.Get_rank()
-            self._lo, self._hi = _shareRange(self._n_samples, ntask, rank)
+            self._lo, self._hi = utilities.shareRange(self._n_samples, ntask, rank)
         else:
             self._comm = None
             self._lo, self._hi = 0, self._n_samples
@@ -175,10 +167,10 @@ class MetricGaussianKL(Energy):
                 tg = tg + tmp.gradient
             v.append(tv)
             g.append(tg)
-        self._val = self._sumup(v)[()]/self._n_eff_samples
+        self._val = utilities.allreduce_sum(v, self._comm)[()]/self._n_eff_samples
        if np.isnan(self._val) and self._mitigate_nans:
            self._val = np.inf
-        self._grad = self._sumup(g)/self._n_eff_samples
+        self._grad = utilities.allreduce_sum(g, self._comm)/self._n_eff_samples

    def at(self, position):
        return MetricGaussianKL(
@@ -202,7 +194,7 @@ class MetricGaussianKL(Energy):
            if self._mirror_samples:
                tmp = tmp + self._hamiltonian(lin-s).metric(x)
            res.append(tmp)
-        return self._sumup(res)/self._n_eff_samples
+        return utilities.allreduce_sum(res, self._comm)/self._n_eff_samples

    @property
    def metric(self):
@@ -218,7 +210,7 @@ class MetricGaussianKL(Energy):
        else:
            ntask = self._comm.Get_size()
            rank = self._comm.Get_rank()
-            rank_lo_hi = [_shareRange(self._n_samples, ntask, i) for i in range(ntask)]
+            rank_lo_hi = [utilities.shareRange(self._n_samples, ntask, i) for i in range(ntask)]
            for itask, (l, h) in enumerate(rank_lo_hi):
                for i in range(l, h):
                    data = self._local_samples[i-self._lo] if rank == itask else None
@@ -227,63 +219,6 @@ class MetricGaussianKL(Energy):
                    if self._mirror_samples:
                        yield -s

-    def _sumup(self, obj):
-        """ This is a deterministic implementation of MPI allreduce
-
-        Numeric addition is not associative due to rounding errors.
-        Therefore we provide our own implementation that is consistent
-        no matter if MPI is used and how many tasks there are.
-
-        At the beginning, a list `who` is constructed, that states which obj can
-        be found on which MPI task.
-        Then elements are added pairwise, with increasing pair distance.
-        In the first round, the distance between pair members is 1:
-        v[0] := v[0] + v[1]
-        v[2] := v[2] + v[3]
-        v[4] := v[4] + v[5]
-        Entries whose summation partner lies beyond the end of the array
-        stay unchanged.
-        When both summation partners are not located on the same MPI task,
-        the second summand is sent to the task holding the first summand and
-        the operation is carried out there.
-        For the next round, the distance is doubled:
-        v[0] := v[0] + v[2]
-        v[4] := v[4] + v[6]
-        v[8] := v[8] + v[10]
-        This is repeated until the distance exceeds the length of the array.
-        At this point v[0] contains the sum of all entries, which is then
-        broadcast to all tasks.
-        """
-        if self._comm is None:
-            who = np.zeros(self._n_samples, dtype=np.int32)
-            rank = 0
-            vals = list(obj)  # necessary since we don't want to modify `obj`
-        else:
-            ntask = self._comm.Get_size()
-            rank = self._comm.Get_rank()
-            rank_lo_hi = [_shareRange(self._n_samples, ntask, i) for i in range(ntask)]
-            lo, hi = rank_lo_hi[rank]
-            vals = [None]*lo + list(obj) + [None]*(self._n_samples-hi)
-            who = [t for t, (l, h) in enumerate(rank_lo_hi) for cnt in range(h-l)]
-
-        step = 1
-        while step < self._n_samples:
-            for j in range(0, self._n_samples, 2*step):
-                if j+step < self._n_samples:  # summation partner found
-                    if rank == who[j]:
-                        if who[j] == who[j+step]:  # no communication required
-                            vals[j] = vals[j] + vals[j+step]
-                            vals[j+step] = None
-                        else:
-                            vals[j] = vals[j] + self._comm.recv(source=who[j+step])
-                    elif rank == who[j+step]:
-                        self._comm.send(vals[j+step], dest=who[j])
-                        vals[j+step] = None
-            step *= 2
-        if self._comm is None:
-            return vals[0]
-        return self._comm.bcast(vals[0], root=who[0])
-
    def _metric_sample(self, from_inverse=False):
        if from_inverse:
            raise NotImplementedError()
@@ -296,4 +231,4 @@ class MetricGaussianKL(Energy):
            if self._mirror_samples:
                tmp = tmp + self._hamiltonian(lin-v).metric.draw_sample(from_inverse=False)
            samp.append(tmp)
-        return self._sumup(samp)/self._n_eff_samples
+        return utilities.allreduce_sum(samp, self._comm)/self._n_eff_samples
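The net effect of the changes above: the class-private `_shareRange` and `_sumup` helpers are dropped in favour of `utilities.shareRange` and `utilities.allreduce_sum`, which the second file adds below. A hypothetical standalone sketch of that distribute-and-reduce pattern (not part of the commit; assumes this NIFTy branch is installed, mpi4py optional, and uses made-up per-sample values):

```python
# Sketch only: split work with utilities.shareRange, compute per-sample
# values locally, then combine them with utilities.allreduce_sum.
import numpy as np
from nifty6 import utilities

comm, ntask, rank, master = utilities.get_MPI_params()

n_samples = 10
lo, hi = utilities.shareRange(n_samples, ntask, rank)  # this task's slice
local_vals = [np.sin(i) for i in range(lo, hi)]         # stand-in per-sample results

# Deterministic total, identical on every task regardless of task count.
total = utilities.allreduce_sum(local_vals, comm)
if master:
    print(total / n_samples)
```

Whether the script runs serially or under `mpirun`, `total` comes out identical, which is the point of the deterministic reduction.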
nifty6/utilities.py

@@ -248,3 +248,121 @@ def iscomplextype(dtype):
 
 def indent(inp):
     return "\n".join(((" "+s).rstrip() for s in inp.splitlines()))
+
+
+def shareRange(nwork, nshares, myshare):
+    """Divides a number of work items as fairly as possible into a given
+    number of shares.
+
+    Parameters
+    ----------
+    nwork: int
+        number of work items
+    nshares: int
+        number of shares among which the work should be distributed
+    myshare: int
+        the share for which the range of work items is requested
+
+    Returns
+    -------
+    lo, hi: int
+        index range of work items for this share
+    """
+    nbase = nwork//nshares
+    additional = nwork % nshares
+    lo = myshare*nbase + min(myshare, additional)
+    hi = lo + nbase + int(myshare < additional)
+    return lo, hi
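As a quick illustration of the split rule (not from the commit; assumes the module above is importable), ten items over three shares give the one leftover item to the lowest-numbered share:

```python
from nifty6.utilities import shareRange

# 10 work items, 3 shares: nbase = 3, the single remainder item goes to share 0.
print([shareRange(10, 3, i) for i in range(3)])  # [(0, 4), (4, 7), (7, 10)]
```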
+
+
+def get_MPI_params():
+    """Returns basic information about the MPI setup of the running script.
+
+    Returns
+    -------
+    comm: MPI communicator or None
+        if MPI is detected _and_ more than one task is active, returns
+        the world communicator, else returns None
+    size: int
+        the number of tasks running in total
+    rank: int
+        the rank of this task
+    master: bool
+        True if rank == 0, else False
+    """
+    try:
+        from mpi4py import MPI
+        comm = MPI.COMM_WORLD
+        size = comm.Get_size()
+        if size == 1:
+            return None, 1, 0, True
+        rank = comm.Get_rank()
+        return comm, size, rank, rank == 0
+    except ImportError:
+        return None, 1, 0, True
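A hedged usage sketch (not part of the commit): because `get_MPI_params` falls back to `(None, 1, 0, True)` when mpi4py is missing or only one task runs, the same script works serially and under MPI.

```python
from nifty6.utilities import get_MPI_params

comm, size, rank, master = get_MPI_params()
if master:
    print(f"{size} task(s); communicator: {comm}")
```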
+
+
+def allreduce_sum(obj, comm):
+    """ This is a deterministic implementation of MPI allreduce
+
+    Numeric addition is not associative due to rounding errors.
+    Therefore we provide our own implementation that is consistent
+    no matter if MPI is used and how many tasks there are.
+
+    At the beginning, a list `who` is constructed, that states which obj can
+    be found on which MPI task.
+    Then elements are added pairwise, with increasing pair distance.
+    In the first round, the distance between pair members is 1:
+    v[0] := v[0] + v[1]
+    v[2] := v[2] + v[3]
+    v[4] := v[4] + v[5]
+    Entries whose summation partner lies beyond the end of the array
+    stay unchanged.
+    When both summation partners are not located on the same MPI task,
+    the second summand is sent to the task holding the first summand and
+    the operation is carried out there.
+    For the next round, the distance is doubled:
+    v[0] := v[0] + v[2]
+    v[4] := v[4] + v[6]
+    v[8] := v[8] + v[10]
+    This is repeated until the distance exceeds the length of the array.
+    At this point v[0] contains the sum of all entries, which is then
+    broadcast to all tasks.
+    """
+    vals = list(obj)
+    if comm is None:
+        nobj = len(vals)
+        who = np.zeros(nobj, dtype=np.int32)
+        rank = 0
+    else:
+        ntask = comm.Get_size()
+        rank = comm.Get_rank()
+        nobj_list = comm.allgather(len(vals))
+        all_hi = list(np.cumsum(nobj_list))
+        all_lo = [0] + all_hi[:-1]
+        nobj = all_hi[-1]
+        rank_lo_hi = [(l, h) for l, h in zip(all_lo, all_hi)]
+        lo, hi = rank_lo_hi[rank]
+        vals = [None]*lo + vals + [None]*(nobj-hi)
+        who = [t for t, (l, h) in enumerate(rank_lo_hi) for cnt in range(h-l)]
+
+    step = 1
+    while step < nobj:
+        for j in range(0, nobj, 2*step):
+            if j+step < nobj:  # summation partner found
+                if rank == who[j]:
+                    if who[j] == who[j+step]:  # no communication required
+                        vals[j] = vals[j] + vals[j+step]
+                        vals[j+step] = None
+                    else:
+                        vals[j] = vals[j] + comm.recv(source=who[j+step])
+                elif rank == who[j+step]:
+                    comm.send(vals[j+step], dest=who[j])
+                    vals[j+step] = None
+        step *= 2
+    if comm is None:
+        return vals[0]
+    return comm.bcast(vals[0], root=who[0])
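A small serial illustration of the reduction (not from the commit; the example arrays are made up). With `comm=None` the whole list lives on one task; under MPI each task would pass only its local slice and every task receives the same broadcast total.

```python
import numpy as np
from nifty6.utilities import allreduce_sum

vals = [np.full(3, 0.1), np.full(3, 0.2), np.full(3, 0.3)]
# Summed in a fixed pairwise order: (vals[0]+vals[1]), then + vals[2].
print(allreduce_sum(vals, None))  # [0.6 0.6 0.6]
```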