Commit a8c78eb9 authored by Philipp Arras

Merge branch 'deterministic_mpi_kl' into 'NIFTy_6'

Deterministic MPI KL

See merge request !510
parents 747f275d b34559db
Pipeline #75611 passed with stages in 14 minutes and 17 seconds
@@ -36,27 +36,6 @@ def _shareRange(nwork, nshares, myshare):
     return lo, hi


-def _np_allreduce_sum(comm, arr):
-    if comm is None:
-        return arr
-    from mpi4py import MPI
-    arr = np.array(arr)
-    res = np.empty_like(arr)
-    comm.Allreduce(arr, res, MPI.SUM)
-    return res
-
-
-def _allreduce_sum_field(comm, fld):
-    if comm is None:
-        return fld
-    if isinstance(fld, Field):
-        return Field(fld.domain, _np_allreduce_sum(comm, fld.val))
-    res = tuple(
-        Field(f.domain, _np_allreduce_sum(comm, f.val))
-        for f in fld.values())
-    return MultiField(fld.domain, res)
-
-
 class _KLMetric(EndomorphicOperator):
     def __init__(self, KL):
         self._KL = KL
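The two removed helpers summed contributions with MPI.Allreduce, whose floating-point result may depend on how the reduction is ordered. The replacement code instead relies on _shareRange, which hands each MPI task a contiguous block of the global sample indices. Its body lies outside this hunk; a minimal sketch of such an even split, as an illustration rather than the exact NIFTy implementation, could look like this:

    def share_range(nwork, nshares, myshare):
        # Split nwork items into nshares contiguous chunks whose sizes differ
        # by at most one; return the half-open index range [lo, hi) owned by
        # chunk number myshare.
        nbase, extra = divmod(nwork, nshares)
        lo = myshare*nbase + min(myshare, extra)
        hi = lo + nbase + (1 if myshare < extra else 0)
        return lo, hi

    # 10 samples over 3 tasks -> (0, 4), (4, 7), (7, 10)
    assert [share_range(10, 3, i) for i in range(3)] == [(0, 4), (4, 7), (7, 10)]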
@@ -185,26 +164,21 @@ class MetricGaussianKL(Energy):
                 raise ValueError("# of samples mismatch")
         self._local_samples = _local_samples
         self._lin = Linearization.make_partial_var(mean, self._constants)
-        v, g = None, None
-        if len(self._local_samples) == 0:  # hack if there are too many MPI tasks
-            tmp = self._hamiltonian(self._lin)
-            v = 0. * tmp.val.val
-            g = 0. * tmp.gradient
-        else:
-            for s in self._local_samples:
-                tmp = self._hamiltonian(self._lin+s)
-                if self._mirror_samples:
-                    tmp = tmp + self._hamiltonian(self._lin-s)
-                if v is None:
-                    v = tmp.val.val_rw()
-                    g = tmp.gradient
-                else:
-                    v += tmp.val.val
-                    g = g + tmp.gradient
-        self._val = _np_allreduce_sum(self._comm, v)[()] / self._n_eff_samples
+        v, g = [], []
+        for s in self._local_samples:
+            tmp = self._hamiltonian(self._lin+s)
+            tv = tmp.val.val
+            tg = tmp.gradient
+            if self._mirror_samples:
+                tmp = self._hamiltonian(self._lin-s)
+                tv = tv + tmp.val.val
+                tg = tg + tmp.gradient
+            v.append(tv)
+            g.append(tg)
+        self._val = self._sumup(v)[()]/self._n_eff_samples
         if np.isnan(self._val) and self._mitigate_nans:
             self._val = np.inf
-        self._grad = _allreduce_sum_field(self._comm, g) / self._n_eff_samples
+        self._grad = self._sumup(g)/self._n_eff_samples
         self._metric = None

     def at(self, position):
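The rewritten accumulation keeps one entry per sample in the lists v and g and defers the summation to _sumup. The reason is that floating-point addition is not associative, so the grouping implied by in-place accumulation followed by Allreduce changes with the number of MPI tasks. A quick standalone illustration of the effect:

    import numpy as np

    # Same three terms, different grouping, different double-precision result.
    a, b, c = np.float64(1e16), np.float64(-1e16), np.float64(1.0)
    print((a + b) + c)   # 1.0
    print(a + (b + c))   # 0.0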
@@ -221,24 +195,15 @@ class MetricGaussianKL(Energy):
     def gradient(self):
         return self._grad

-    def _get_metric(self):
-        lin = self._lin.with_want_metric()
-        if self._metric is None:
-            if len(self._local_samples) == 0:  # hack if there are too many MPI tasks
-                self._metric = self._hamiltonian(lin).metric.scale(0.)
-            else:
-                mymap = map(lambda v: self._hamiltonian(lin+v).metric,
-                            self._local_samples)
-                unscaled_metric = utilities.my_sum(mymap)
-                if self._mirror_samples:
-                    mymap = map(lambda v: self._hamiltonian(lin-v).metric,
-                                self._local_samples)
-                    unscaled_metric = unscaled_metric + utilities.my_sum(mymap)
-                self._metric = unscaled_metric.scale(1./self._n_eff_samples)
-
     def apply_metric(self, x):
-        self._get_metric()
-        return _allreduce_sum_field(self._comm, self._metric(x))
+        lin = self._lin.with_want_metric()
+        res = []
+        for s in self._local_samples:
+            tmp = self._hamiltonian(lin+s).metric(x)
+            if self._mirror_samples:
+                tmp = tmp + self._hamiltonian(lin-s).metric(x)
+            res.append(tmp)
+        return self._sumup(res)/self._n_eff_samples

     @property
     def metric(self):
@@ -263,15 +228,35 @@ class MetricGaussianKL(Energy):
            if self._mirror_samples:
                yield -s

+    def _sumup(self, obj):
+        # This is a deterministic implementation of MPI allreduce in the sense
+        # that it takes into account that floating point operations are not
+        # associative.
+        res = None
+        if self._comm is None:
+            for o in obj:
+                res = o if res is None else res + o
+        else:
+            ntask = self._comm.Get_size()
+            rank = self._comm.Get_rank()
+            rank_lo_hi = [_shareRange(self._n_samples, ntask, i) for i in range(ntask)]
+            for itask, (l, h) in enumerate(rank_lo_hi):
+                for i in range(l, h):
+                    o = obj[i-self._lo] if rank == itask else None
+                    o = self._comm.bcast(o, root=itask)
+                    res = o if res is None else res + o
+        return res
+
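_sumup fixes the reduction order as a function of the global sample index: the owning task broadcasts each contribution in turn and every task adds the terms in the same sequence, so the result is bit-identical on all tasks and independent of how the samples are distributed. A self-contained sketch of the same idea in plain mpi4py (hypothetical helper, not NIFTy code):

    import numpy as np
    from mpi4py import MPI

    def deterministic_sum(comm, local_terms):
        # Broadcast every term, task by task, and add them in one global order
        # that does not depend on the number of tasks.
        counts = comm.allgather(len(local_terms))
        rank, res = comm.Get_rank(), None
        for itask, n in enumerate(counts):
            for i in range(n):
                t = local_terms[i] if rank == itask else None
                t = comm.bcast(t, root=itask)
                res = t if res is None else res + t
        return res

    comm = MPI.COMM_WORLD
    rng = np.random.default_rng(comm.Get_rank())
    local = list(rng.normal(size=3))
    print(deterministic_sum(comm, local))  # identical digits on every rank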
     def _metric_sample(self, from_inverse=False):
         if from_inverse:
             raise NotImplementedError()
         lin = self._lin.with_want_metric()
-        samp = full(self._hamiltonian.domain, 0.)
+        samp = []
         sseq = random.spawn_sseq(self._n_samples)
         for i, v in enumerate(self._local_samples):
             with random.Context(sseq[self._lo+i]):
-                samp = samp + self._hamiltonian(lin+v).metric.draw_sample(from_inverse=False)
+                tmp = self._hamiltonian(lin+v).metric.draw_sample(from_inverse=False)
                 if self._mirror_samples:
-                    samp = samp + self._hamiltonian(lin-v).metric.draw_sample(from_inverse=False)
-        return _allreduce_sum_field(self._comm, samp)/self._n_eff_samples
+                    tmp = tmp + self._hamiltonian(lin-v).metric.draw_sample(from_inverse=False)
+                samp.append(tmp)
+        return self._sumup(samp)/self._n_eff_samples
@@ -18,7 +18,7 @@
 import numpy as np
 import pytest
 from mpi4py import MPI
-from numpy.testing import assert_, assert_allclose
+from numpy.testing import assert_, assert_equal

 import nifty6 as ift
@@ -76,33 +76,35 @@ def test_kl(constants, point_estimates, mirror_samples, mode, mf):
     assert_(len(tuple(kl1.samples)) == expected_nsamps)

     # Test value
-    assert_allclose(kl0.value, kl1.value)
+    assert_equal(kl0.value, kl1.value)

     # Test gradient
-    if not mf:
-        ift.extra.assert_allclose(kl0.gradient, kl1.gradient, 0, 1e-14)
-        return
-    for kk in h.domain.keys():
-        res0 = kl0.gradient[kk].val
-        if kk in constants:
-            res0 = 0*res0
-        res1 = kl1.gradient[kk].val
-        assert_allclose(res0, res1)
+    if mf:
+        for kk in h.domain.keys():
+            res0 = kl0.gradient[kk].val
+            if kk in constants:
+                res0 = 0*res0
+            res1 = kl1.gradient[kk].val
+            assert_equal(res0, res1)
+    else:
+        assert_equal(kl0.gradient.val, kl1.gradient.val)

     # Test point_estimates (after drawing samples)
-    for kk in point_estimates:
-        for ss in kl0.samples:
-            ss = ss[kk].val
-            assert_allclose(ss, 0*ss)
-        for ss in kl1.samples:
-            ss = ss[kk].val
-            assert_allclose(ss, 0*ss)
+    if mf:
+        for kk in point_estimates:
+            for ss in kl0.samples:
+                ss = ss[kk].val
+                assert_equal(ss, 0*ss)
+            for ss in kl1.samples:
+                ss = ss[kk].val
+                assert_equal(ss, 0*ss)

     # Test constants (after some minimization)
-    cg = ift.GradientNormController(iteration_limit=5)
-    minimizer = ift.NewtonCG(cg)
-    for e in [kl0, kl1]:
-        e, _ = minimizer(e)
-        diff = (mean0 - e.position).to_dict()
-        for kk in constants:
-            assert_allclose(diff[kk].val, 0*diff[kk].val)
+    if mf:
+        cg = ift.GradientNormController(iteration_limit=5)
+        minimizer = ift.NewtonCG(cg)
+        for e in [kl0, kl1]:
+            e, _ = minimizer(e)
+            diff = (mean0 - e.position).to_dict()
+            for kk in constants:
+                assert_equal(diff[kk].val, 0*diff[kk].val)
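The switch from assert_allclose to assert_equal reflects the point of the change: with the deterministic reduction, the KL value, gradient and samples computed with and without MPI are expected to agree bitwise, not merely within a tolerance. A small reminder of the difference between the two assertions:

    import numpy as np
    from numpy.testing import assert_allclose, assert_equal

    a = np.float64(0.1) + np.float64(0.2)
    b = np.float64(0.3)
    assert_allclose(a, b)    # passes: equal up to the default tolerance
    # assert_equal(a, b)     # would fail: the two bit patterns differ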