Commit 94ec206a authored by Martin Reinecke

provide some MPI utilities in utilities.py

parent 2249183a
Pipeline #75785 passed with stages in 13 minutes and 40 seconds
@@ -28,14 +28,6 @@ from ..sugar import full, makeOp
from .energy import Energy


def _shareRange(nwork, nshares, myshare):
    nbase = nwork//nshares
    additional = nwork % nshares
    lo = myshare*nbase + min(myshare, additional)
    hi = lo + nbase + int(myshare < additional)
    return lo, hi


class _KLMetric(EndomorphicOperator):
    def __init__(self, KL):
        self._KL = KL
@@ -138,7 +130,7 @@ class MetricGaussianKL(Energy):
            self._comm = comm
            ntask = self._comm.Get_size()
            rank = self._comm.Get_rank()
            self._lo, self._hi = _shareRange(self._n_samples, ntask, rank)
            self._lo, self._hi = utilities.shareRange(self._n_samples, ntask, rank)
        else:
            self._comm = None
            self._lo, self._hi = 0, self._n_samples
@@ -175,10 +167,10 @@ class MetricGaussianKL(Energy):
                tg = tg + tmp.gradient
            v.append(tv)
            g.append(tg)
        self._val = self._sumup(v)[()]/self._n_eff_samples
        self._val = utilities.allreduce_sum(v, self._comm)[()]/self._n_eff_samples
        if np.isnan(self._val) and self._mitigate_nans:
            self._val = np.inf
        self._grad = self._sumup(g)/self._n_eff_samples
        self._grad = utilities.allreduce_sum(g, self._comm)/self._n_eff_samples

    def at(self, position):
        return MetricGaussianKL(
@@ -202,7 +194,7 @@ class MetricGaussianKL(Energy):
            if self._mirror_samples:
                tmp = tmp + self._hamiltonian(lin-s).metric(x)
            res.append(tmp)
        return self._sumup(res)/self._n_eff_samples
        return utilities.allreduce_sum(res, self._comm)/self._n_eff_samples

    @property
    def metric(self):
@@ -218,7 +210,7 @@ class MetricGaussianKL(Energy):
        else:
            ntask = self._comm.Get_size()
            rank = self._comm.Get_rank()
            rank_lo_hi = [_shareRange(self._n_samples, ntask, i) for i in range(ntask)]
            rank_lo_hi = [utilities.shareRange(self._n_samples, ntask, i) for i in range(ntask)]
            for itask, (l, h) in enumerate(rank_lo_hi):
                for i in range(l, h):
                    data = self._local_samples[i-self._lo] if rank == itask else None
@@ -227,63 +219,6 @@ class MetricGaussianKL(Energy):
                    if self._mirror_samples:
                        yield -s

    def _sumup(self, obj):
        """ This is a deterministic implementation of MPI allreduce

        Numeric addition is not associative due to rounding errors.
        Therefore we provide our own implementation that is consistent
        no matter if MPI is used and how many tasks there are.

        At the beginning, a list `who` is constructed, that states which obj can
        be found on which MPI task.
        Then elements are added pairwise, with increasing pair distance.
        In the first round, the distance between pair members is 1:
          v[0] := v[0] + v[1]
          v[2] := v[2] + v[3]
          v[4] := v[4] + v[5]
        Entries whose summation partner lies beyond the end of the array
        stay unchanged.
        When both summation partners are not located on the same MPI task,
        the second summand is sent to the task holding the first summand and
        the operation is carried out there.
        For the next round, the distance is doubled:
          v[0] := v[0] + v[2]
          v[4] := v[4] + v[6]
          v[8] := v[8] + v[10]
        This is repeated until the distance exceeds the length of the array.
        At this point v[0] contains the sum of all entries, which is then
        broadcast to all tasks.
        """
        if self._comm is None:
            who = np.zeros(self._n_samples, dtype=np.int32)
            rank = 0
            vals = list(obj)  # necessary since we don't want to modify `obj`
        else:
            ntask = self._comm.Get_size()
            rank = self._comm.Get_rank()
            rank_lo_hi = [_shareRange(self._n_samples, ntask, i) for i in range(ntask)]
            lo, hi = rank_lo_hi[rank]
            vals = [None]*lo + list(obj) + [None]*(self._n_samples-hi)
            who = [t for t, (l, h) in enumerate(rank_lo_hi) for cnt in range(h-l)]

        step = 1
        while step < self._n_samples:
            for j in range(0, self._n_samples, 2*step):
                if j+step < self._n_samples:  # summation partner found
                    if rank == who[j]:
                        if who[j] == who[j+step]:  # no communication required
                            vals[j] = vals[j] + vals[j+step]
                            vals[j+step] = None
                        else:
                            vals[j] = vals[j] + self._comm.recv(source=who[j+step])
                    elif rank == who[j+step]:
                        self._comm.send(vals[j+step], dest=who[j])
                        vals[j+step] = None
            step *= 2

        if self._comm is None:
            return vals[0]
        return self._comm.bcast(vals[0], root=who[0])

    def _metric_sample(self, from_inverse=False):
        if from_inverse:
            raise NotImplementedError()
@@ -296,4 +231,4 @@ class MetricGaussianKL(Energy):
            if self._mirror_samples:
                tmp = tmp + self._hamiltonian(lin-v).metric.draw_sample(from_inverse=False)
            samp.append(tmp)
        return self._sumup(samp)/self._n_eff_samples
        return utilities.allreduce_sum(samp, self._comm)/self._n_eff_samples
@@ -248,3 +248,121 @@ def iscomplextype(dtype):
def indent(inp):
    return "\n".join(((" "+s).rstrip() for s in inp.splitlines()))


def shareRange(nwork, nshares, myshare):
    """Divides a number of work items as fairly as possible into a given number
    of shares.

    Parameters
    ----------
    nwork: int
        number of work items
    nshares: int
        number of shares among which the work should be distributed
    myshare: int
        the share for which the range of work items is requested

    Returns
    -------
    lo, hi: int
        index range of work items for this share
    """
    nbase = nwork//nshares
    additional = nwork % nshares
    lo = myshare*nbase + min(myshare, additional)
    hi = lo + nbase + int(myshare < additional)
    return lo, hi
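As a quick illustration (not part of the commit), splitting ten work items over three shares with shareRange yields contiguous, maximally even index ranges:

# illustrative sketch only, not part of the committed file
for myshare in range(3):
    lo, hi = shareRange(10, 3, myshare)
    print(myshare, (lo, hi))
# prints: 0 (0, 4)   1 (4, 7)   2 (7, 10)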
def get_MPI_params():
    """Returns basic information about the MPI setup of the running script.

    Returns
    -------
    comm: MPI communicator or None
        if MPI is detected _and_ more than one task is active, returns
        the world communicator, else returns None
    size: int
        the number of tasks running in total
    rank: int
        the rank of this task
    master: bool
        True if rank == 0, else False
    """
    try:
        from mpi4py import MPI
        comm = MPI.COMM_WORLD
        size = comm.Get_size()
        if size == 1:
            return None, 1, 0, True
        rank = comm.Get_rank()
        return comm, size, rank, rank == 0
    except ImportError:
        return None, 1, 0, True
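A minimal call-site sketch (again not part of the commit); the helper falls back to serial mode, returning comm=None, both when mpi4py is missing and when only a single task is running:

# illustrative sketch only
comm, size, rank, master = get_MPI_params()
if master:
    print("running on", size, "task(s)")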
def allreduce_sum(obj, comm):
    """Deterministic implementation of an MPI allreduce sum.

    Numeric addition is not associative due to rounding errors, so we provide
    our own implementation whose result is consistent regardless of whether
    MPI is used and of how many tasks there are.

    At the beginning, a list `who` is constructed that states on which MPI
    task each entry can be found.
    Then elements are added pairwise, with increasing pair distance.
    In the first round, the distance between pair members is 1:
      v[0] := v[0] + v[1]
      v[2] := v[2] + v[3]
      v[4] := v[4] + v[5]
    Entries whose summation partner lies beyond the end of the array
    stay unchanged.
    When the two summation partners are not located on the same MPI task,
    the second summand is sent to the task holding the first summand and
    the addition is carried out there.
    For the next round, the distance is doubled:
      v[0] := v[0] + v[2]
      v[4] := v[4] + v[6]
      v[8] := v[8] + v[10]
    This is repeated until the distance exceeds the length of the array.
    At this point v[0] contains the sum of all entries, which is then
    broadcast to all tasks.
    """
    vals = list(obj)
    if comm is None:
        nobj = len(vals)
        who = np.zeros(nobj, dtype=np.int32)
        rank = 0
    else:
        ntask = comm.Get_size()
        rank = comm.Get_rank()
        nobj_list = comm.allgather(len(vals))
        all_hi = list(np.cumsum(nobj_list))
        all_lo = [0] + all_hi[:-1]
        nobj = all_hi[-1]
        rank_lo_hi = [(l, h) for l, h in zip(all_lo, all_hi)]
        lo, hi = rank_lo_hi[rank]
        vals = [None]*lo + vals + [None]*(nobj-hi)
        who = [t for t, (l, h) in enumerate(rank_lo_hi) for cnt in range(h-l)]

    step = 1
    while step < nobj:
        for j in range(0, nobj, 2*step):
            if j+step < nobj:  # summation partner found
                if rank == who[j]:
                    if who[j] == who[j+step]:  # no communication required
                        vals[j] = vals[j] + vals[j+step]
                        vals[j+step] = None
                    else:
                        vals[j] = vals[j] + comm.recv(source=who[j+step])
                elif rank == who[j+step]:
                    comm.send(vals[j+step], dest=who[j])
                    vals[j+step] = None
        step *= 2

    if comm is None:
        return vals[0]
    return comm.bcast(vals[0], root=who[0])
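Finally, a hedged usage sketch (not part of the commit) tying the three helpers together; the serial case below can be checked by hand, while the commented MPI pattern mirrors how MetricGaussianKL uses the utilities (names such as n_items and work() are placeholders):

# illustrative sketch only
import numpy as np
local = [np.full(3, 1.0), np.full(3, 2.0)]
total = allreduce_sum(local, None)   # serial case: deterministic pairwise sum
# total == array([3., 3., 3.])

# Hypothetical MPI pattern (e.g. launched with `mpiexec -n 4 python script.py`):
# comm, size, rank, master = get_MPI_params()
# lo, hi = shareRange(n_items, size, rank)
# total = allreduce_sum([work(i) for i in range(lo, hi)], comm)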