diff --git a/nifty6/minimization/metric_gaussian_kl.py b/nifty6/minimization/metric_gaussian_kl.py
index 77a8120866efe8bb52f4e6e9143ab95cfe68965d..4491bf617e32c3e2c1da89284d9b7a272c327f73 100644
--- a/nifty6/minimization/metric_gaussian_kl.py
+++ b/nifty6/minimization/metric_gaussian_kl.py
@@ -28,14 +28,6 @@ from ..sugar import full, makeOp
 from .energy import Energy
 
 
-def _shareRange(nwork, nshares, myshare):
-    nbase = nwork//nshares
-    additional = nwork % nshares
-    lo = myshare*nbase + min(myshare, additional)
-    hi = lo + nbase + int(myshare < additional)
-    return lo, hi
-
-
 class _KLMetric(EndomorphicOperator):
     def __init__(self, KL):
         self._KL = KL
@@ -138,7 +130,7 @@ class MetricGaussianKL(Energy):
             self._comm = comm
             ntask = self._comm.Get_size()
             rank = self._comm.Get_rank()
-            self._lo, self._hi = _shareRange(self._n_samples, ntask, rank)
+            self._lo, self._hi = utilities.shareRange(self._n_samples, ntask, rank)
         else:
             self._comm = None
             self._lo, self._hi = 0, self._n_samples
@@ -175,10 +167,10 @@ class MetricGaussianKL(Energy):
                 tg = tg + tmp.gradient
             v.append(tv)
             g.append(tg)
-        self._val = self._sumup(v)[()]/self._n_eff_samples
+        self._val = utilities.allreduce_sum(v, self._comm)[()]/self._n_eff_samples
         if np.isnan(self._val) and self._mitigate_nans:
             self._val = np.inf
-        self._grad = self._sumup(g)/self._n_eff_samples
+        self._grad = utilities.allreduce_sum(g, self._comm)/self._n_eff_samples
 
     def at(self, position):
         return MetricGaussianKL(
@@ -202,7 +194,7 @@ class MetricGaussianKL(Energy):
             if self._mirror_samples:
                 tmp = tmp + self._hamiltonian(lin-s).metric(x)
             res.append(tmp)
-        return self._sumup(res)/self._n_eff_samples
+        return utilities.allreduce_sum(res, self._comm)/self._n_eff_samples
 
     @property
     def metric(self):
@@ -218,7 +210,7 @@ class MetricGaussianKL(Energy):
         else:
             ntask = self._comm.Get_size()
             rank = self._comm.Get_rank()
-            rank_lo_hi = [_shareRange(self._n_samples, ntask, i) for i in range(ntask)]
+            rank_lo_hi = [utilities.shareRange(self._n_samples, ntask, i) for i in range(ntask)]
             for itask, (l, h) in enumerate(rank_lo_hi):
                 for i in range(l, h):
                     data = self._local_samples[i-self._lo] if rank == itask else None
@@ -227,63 +219,6 @@ class MetricGaussianKL(Energy):
                 if self._mirror_samples:
                     yield -s
 
-    def _sumup(self, obj):
-        """ This is a deterministic implementation of MPI allreduce
-
-        Numeric addition is not associative due to rounding errors.
-        Therefore we provide our own implementation that is consistent
-        no matter if MPI is used and how many tasks there are.
-
-        At the beginning, a list `who` is constructed, that states which obj can
-        be found on which MPI task.
-        Then elements are added pairwise, with increasing pair distance.
-        In the first round, the distance between pair members is 1:
-        v[0] := v[0] + v[1]
-        v[2] := v[2] + v[3]
-        v[4] := v[4] + v[5]
-        Entries whose summation partner lies beyond the end of the array
-        stay unchanged.
-        When both summation partners are not located on the same MPI task,
-        the second summand is sent to the task holding the first summand and
-        the operation is carried out there.
-        For the next round, the distance is doubled:
-        v[0] := v[0] + v[2]
-        v[4] := v[4] + v[6]
-        v[8] := v[8] + v[10]
-        This is repeated until the distance exceeds the length of the array.
-        At this point v[0] contains the sum of all entries, which is then
-        broadcast to all tasks.
-        """
-        if self._comm is None:
-            who = np.zeros(self._n_samples, dtype=np.int32)
-            rank = 0
-            vals = list(obj)  # necessary since we don't want to modify `obj`
-        else:
-            ntask = self._comm.Get_size()
-            rank = self._comm.Get_rank()
-            rank_lo_hi = [_shareRange(self._n_samples, ntask, i) for i in range(ntask)]
-            lo, hi = rank_lo_hi[rank]
-            vals = [None]*lo + list(obj) + [None]*(self._n_samples-hi)
-            who = [t for t, (l, h) in enumerate(rank_lo_hi) for cnt in range(h-l)]
-
-        step = 1
-        while step < self._n_samples:
-            for j in range(0, self._n_samples, 2*step):
-                if j+step < self._n_samples:  # summation partner found
-                    if rank == who[j]:
-                        if who[j] == who[j+step]:  # no communication required
-                            vals[j] = vals[j] + vals[j+step]
-                            vals[j+step] = None
-                        else:
-                            vals[j] = vals[j] + self._comm.recv(source=who[j+step])
-                    elif rank == who[j+step]:
-                        self._comm.send(vals[j+step], dest=who[j])
-                        vals[j+step] = None
-            step *= 2
-        if self._comm is None:
-            return vals[0]
-        return self._comm.bcast(vals[0], root=who[0])
-
     def _metric_sample(self, from_inverse=False):
         if from_inverse:
             raise NotImplementedError()
@@ -296,4 +231,4 @@ class MetricGaussianKL(Energy):
             if self._mirror_samples:
                 tmp = tmp + self._hamiltonian(lin-v).metric.draw_sample(from_inverse=False)
             samp.append(tmp)
-        return self._sumup(samp)/self._n_eff_samples
+        return utilities.allreduce_sum(samp, self._comm)/self._n_eff_samples
diff --git a/nifty6/utilities.py b/nifty6/utilities.py
index 008fa0ec9f51e824e26d2f5ba75b348f03d9fa13..678c8ee86d03c330efe3696d850eed15bbd3954c 100644
--- a/nifty6/utilities.py
+++ b/nifty6/utilities.py
@@ -248,3 +248,121 @@ def iscomplextype(dtype):
 
 def indent(inp):
     return "\n".join(((" "+s).rstrip() for s in inp.splitlines()))
+
+
+def shareRange(nwork, nshares, myshare):
+    """Divides a number of work items as fairly as possible into a given number
+    of shares.
+
+    Parameters
+    ----------
+    nwork: int
+        number of work items
+    nshares: int
+        number of shares among which the work should be distributed
+    myshare: int
+        the share for which the range of work items is requested
+
+
+    Returns
+    -------
+    lo, hi: int
+        half-open index range [lo, hi) of the work items assigned to this share
+    """
+
+    nbase = nwork//nshares
+    additional = nwork % nshares
+    lo = myshare*nbase + min(myshare, additional)
+    hi = lo + nbase + int(myshare < additional)
+    return lo, hi
+
+
+def get_MPI_params():
+    """Returns basic information about the MPI setup of the running script.
+
+    Returns
+    -------
+    comm: MPI communicator or None
+        if MPI is detected _and_ more than one task is active, returns
+        the world communicator, else returns None
+    size: int
+        the number of tasks running in total
+    rank: int
+        the rank of this task
+    master: bool
+        True if rank == 0, else False
+    """
+
+    try:
+        from mpi4py import MPI
+        comm = MPI.COMM_WORLD
+        size = comm.Get_size()
+        if size == 1:
+            return None, 1, 0, True
+        rank = comm.Get_rank()
+        return comm, size, rank, rank == 0
+    except ImportError:
+        return None, 1, 0, True
+
+
+def allreduce_sum(obj, comm):
+    """ This is a deterministic implementation of MPI allreduce.
+
+    Numeric addition is not associative due to rounding errors.
+    Therefore we provide our own implementation that is consistent
+    no matter if MPI is used and how many tasks there are.
+
+    At the beginning, a list `who` is constructed that states on which
+    MPI task each entry of `obj` can be found.
+    Then elements are added pairwise, with increasing pair distance.
+    In the first round, the distance between pair members is 1:
+    v[0] := v[0] + v[1]
+    v[2] := v[2] + v[3]
+    v[4] := v[4] + v[5]
+    Entries whose summation partner lies beyond the end of the array
+    stay unchanged.
+    When both summation partners are not located on the same MPI task,
+    the second summand is sent to the task holding the first summand and
+    the operation is carried out there.
+    For the next round, the distance is doubled:
+    v[0] := v[0] + v[2]
+    v[4] := v[4] + v[6]
+    v[8] := v[8] + v[10]
+    This is repeated until the distance exceeds the length of the array.
+    At this point v[0] contains the sum of all entries, which is then
+    broadcast to all tasks.
+    """
+    vals = list(obj)
+    if comm is None:
+        nobj = len(vals)
+        who = np.zeros(nobj, dtype=np.int32)
+        rank = 0
+    else:
+        ntask = comm.Get_size()
+        rank = comm.Get_rank()
+        nobj_list = comm.allgather(len(vals))
+        all_hi = list(np.cumsum(nobj_list))
+        all_lo = [0] + all_hi[:-1]
+        nobj = all_hi[-1]
+        rank_lo_hi = [(l, h) for l, h in zip(all_lo, all_hi)]
+        lo, hi = rank_lo_hi[rank]
+        vals = [None]*lo + vals + [None]*(nobj-hi)
+        who = [t for t, (l, h) in enumerate(rank_lo_hi) for cnt in range(h-l)]
+
+    step = 1
+    while step < nobj:
+        for j in range(0, nobj, 2*step):
+            if j+step < nobj:  # summation partner found
+                if rank == who[j]:
+                    if who[j] == who[j+step]:  # no communication required
+                        vals[j] = vals[j] + vals[j+step]
+                        vals[j+step] = None
+                    else:
+                        vals[j] = vals[j] + comm.recv(source=who[j+step])
+                elif rank == who[j+step]:
+                    comm.send(vals[j+step], dest=who[j])
+                    vals[j+step] = None
+        step *= 2
+    if comm is None:
+        return vals[0]
+    return comm.bcast(vals[0], root=who[0])
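The patch moves `shareRange` into `nifty6/utilities.py` without showing its behaviour, so here is a small usage sketch for review purposes; it is not part of the patch and only assumes that the module is importable as `nifty6.utilities`.

from nifty6.utilities import shareRange

# Distribute 10 work items among 3 shares: the sizes come out as 4, 3, 3,
# and the half-open ranges tile [0, 10) without gaps or overlaps.
nwork, nshares = 10, 3
ranges = [shareRange(nwork, nshares, i) for i in range(nshares)]
print(ranges)  # [(0, 4), (4, 7), (7, 10)]
assert ranges[0][0] == 0 and ranges[-1][1] == nwork
assert all(ranges[i][1] == ranges[i+1][0] for i in range(nshares - 1))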
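`get_MPI_params` is new in this patch and is not yet called anywhere in the diff. The sketch below shows how it could plausibly be combined with `shareRange` in a driver script; it is not part of the patch, the work-item count of 100 is arbitrary, and the squaring stands in for whatever the real per-item work would be.

from nifty6.utilities import get_MPI_params, shareRange

comm, size, rank, master = get_MPI_params()   # comm is None for serial runs
lo, hi = shareRange(100, size, rank)          # this task's slice of the work
local = [i*i for i in range(lo, hi)]          # placeholder for real per-item work
if master:
    print("{} task(s); task 0 handles items [{}, {})".format(size, lo, hi))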
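For completeness, a sketch of `allreduce_sum` in the serial case (comm=None); again not part of the patch. Because the pairwise reduction order is fixed, the docstring's guarantee is that the same result is obtained regardless of how the list entries are distributed over MPI tasks.

import numpy as np
from nifty6.utilities import allreduce_sum

chunks = [np.full(3, 0.1), np.full(3, 0.2), np.full(3, 0.3), np.full(3, 0.4)]
total = allreduce_sum(chunks, None)   # reduction order: (c0 + c1) + (c2 + c3)
print(total)                          # approx. [1. 1. 1.]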