Commit 46f2d697 authored by Philipp Arras

Merge branch 'nifty6tonifty7' into 'NIFTy_7'

Nifty6tonifty7

See merge request !526
parents 1299dd5e e47bd9ed
Pipeline #76111 passed with stages in 13 minutes and 42 seconds
@@ -248,7 +248,8 @@ def _linearization_value_consistency(op, loc):
     assert_allclose(fld0, fld1, 0, 1e-7)


-def check_jacobian_consistency(op, loc, tol=1e-8, ntries=100, perf_check=True):
+def check_jacobian_consistency(op, loc, tol=1e-8, ntries=100, perf_check=True,
+                               only_r_differentiable=True):
     """
     Checks the Jacobian of an operator against its finite difference
     approximation.
@@ -267,6 +268,9 @@ def check_jacobian_consistency(op, loc, tol=1e-8, ntries=100, perf_check=True):
         Tolerance for the check.
     perf_check : Boolean
         Do performance check. May be disabled for very unimportant operators.
+    only_r_differentiable : Boolean
+        Jacobians of C-differentiable operators need to be C-linear.
+        Default: True
     """
     _domain_check(op)
     _actual_domain_check_nonlinear(op, loc)
@@ -297,10 +301,7 @@ def check_jacobian_consistency(op, loc, tol=1e-8, ntries=100, perf_check=True):
             raise ValueError("gradient and value seem inconsistent")
         loc = locnext

-    # FIXME The following code shows that we need proper tests for complex
-    # derivatives
     ddtype = loc.values()[0].dtype if isinstance(loc, MultiField) else loc.dtype
     tdtype = dirder.values()[0].dtype if isinstance(dirder, MultiField) else dirder.dtype
-    only_r_linear = ddtype != tdtype
     consistency_check(linmid.jac, domain_dtype=ddtype, target_dtype=tdtype,
-                      only_r_linear=only_r_linear)
+                      only_r_linear=only_r_differentiable)
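For illustration, the extended check might be invoked like this in a test. The operator and position below are placeholders chosen for this sketch, and `ift.extra.check_jacobian_consistency` is assumed to be the public test entry point:

import nifty7 as ift

dom = ift.RGSpace(16)
op = ift.ScalingOperator(dom, 1.).ptw("exp")  # simple nonlinear real operator
loc = ift.full(dom, 0.5)                      # position at which to linearize

# Real-valued operators keep the default; for a holomorphic operator one
# would pass only_r_differentiable=False so that the Jacobian is also
# checked for C-linearity.
ift.extra.check_jacobian_consistency(op, loc, tol=1e-8, ntries=10)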
@@ -129,6 +129,7 @@ class _LognormalMomentMatching(Operator):
         op = _normal(logmean, logsig, key, N_copies).ptw("exp")
         self._domain, self._target = op.domain, op.target
         self.apply = op.apply
+        self._repr_str = "_LognormalMomentMatching: " + op.__repr__()

     @property
     def mean(self):
@@ -138,6 +139,9 @@ class _LognormalMomentMatching(Operator):
     def std(self):
         return self._sig

+    def __repr__(self):
+        return self._repr_str
+

 class _SlopeRemover(EndomorphicOperator):
     def __init__(self, domain, space=0):
@@ -346,11 +350,15 @@ class _Amplitude(Operator):
         self.apply = op.apply
         self._domain, self._target = op.domain, op.target
         self._space = space
+        self._repr_str = "_Amplitude: " + op.__repr__()

     @property
     def fluctuation_amplitude(self):
         return self._fluc

+    def __repr__(self):
+        return self._repr_str
+

 class CorrelatedFieldMaker:
     """Construction helper for hierarchical correlated field models.
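Both `__repr__` additions follow the same pattern: the wrapped operator's description is rendered once at construction time and replayed on demand, which is safe because the wrapped operator is effectively immutable once built. A stripped-down sketch of the idea (the class name here is illustrative, not from this merge request):

class _Delegating:
    def __init__(self, op):
        # Freeze the wrapped operator's description at construction time.
        self._repr_str = "_Delegating: " + op.__repr__()

    def __repr__(self):
        # Replay the cached description instead of re-walking the operator tree.
        return self._repr_str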
@@ -17,7 +17,7 @@
 import numpy as np

-from .. import random
+from .. import random, utilities
 from ..linearization import Linearization
 from ..logger import logger
 from ..multi_field import MultiField
@@ -28,14 +28,6 @@ from ..sugar import makeDomain, makeOp
 from .energy import Energy


-def _shareRange(nwork, nshares, myshare):
-    nbase = nwork//nshares
-    additional = nwork % nshares
-    lo = myshare*nbase + min(myshare, additional)
-    hi = lo + nbase + int(myshare < additional)
-    return lo, hi
-
-
 class _KLMetric(EndomorphicOperator):
     def __init__(self, KL):
         self._KL = KL
@@ -144,7 +136,7 @@ class MetricGaussianKL(Energy):
             self._comm = comm
             ntask = self._comm.Get_size()
             rank = self._comm.Get_rank()
-            self._lo, self._hi = _shareRange(self._n_samples, ntask, rank)
+            self._lo, self._hi = utilities.shareRange(self._n_samples, ntask, rank)
         else:
             self._comm = None
             self._lo, self._hi = 0, self._n_samples
@@ -186,10 +178,10 @@ class MetricGaussianKL(Energy):
             tg = tg + tmp.gradient
             v.append(tv)
             g.append(tg)
-        self._val = self._sumup(v)[()]/self._n_eff_samples
-        if self._mitigate_nans and np.isnan(self._val):
+        self._val = utilities.allreduce_sum(v, self._comm)[()]/self._n_eff_samples
+        if np.isnan(self._val) and self._mitigate_nans:
             self._val = np.inf
-        self._grad = self._sumup(g)/self._n_eff_samples
+        self._grad = utilities.allreduce_sum(g, self._comm)/self._n_eff_samples

     def at(self, position):
         return MetricGaussianKL(
@@ -213,7 +205,7 @@ class MetricGaussianKL(Energy):
             if self._mirror_samples:
                 tmp = tmp + self._hamiltonian(lin-s).metric(x)
             res.append(tmp)
-        return self._sumup(res)/self._n_eff_samples
+        return utilities.allreduce_sum(res, self._comm)/self._n_eff_samples

     @property
     def metric(self):
@@ -229,7 +221,7 @@ class MetricGaussianKL(Energy):
         else:
             ntask = self._comm.Get_size()
             rank = self._comm.Get_rank()
-            rank_lo_hi = [_shareRange(self._n_samples, ntask, i) for i in range(ntask)]
+            rank_lo_hi = [utilities.shareRange(self._n_samples, ntask, i) for i in range(ntask)]
             for itask, (l, h) in enumerate(rank_lo_hi):
                 for i in range(l, h):
                     data = self._local_samples[i-self._lo] if rank == itask else None
@@ -238,63 +230,6 @@ class MetricGaussianKL(Energy):
         if self._mirror_samples:
             yield -s

-    def _sumup(self, obj):
-        """This is a deterministic implementation of MPI allreduce.
-
-        Numeric addition is not associative due to rounding errors.
-        Therefore we provide our own implementation that is consistent
-        no matter if MPI is used and how many tasks there are.
-
-        At the beginning, a list `who` is constructed, stating which obj
-        can be found on which MPI task.
-        Then elements are added pairwise, with increasing pair distance.
-        In the first round, the distance between pair members is 1:
-        v[0] := v[0] + v[1]
-        v[2] := v[2] + v[3]
-        v[4] := v[4] + v[5]
-        Entries whose summation partner lies beyond the end of the array
-        stay unchanged.
-        When both summation partners are not located on the same MPI task,
-        the second summand is sent to the task holding the first summand and
-        the operation is carried out there.
-        For the next round, the distance is doubled:
-        v[0] := v[0] + v[2]
-        v[4] := v[4] + v[6]
-        v[8] := v[8] + v[10]
-        This is repeated until the distance exceeds the length of the array.
-        At this point v[0] contains the sum of all entries, which is then
-        broadcast to all tasks.
-        """
-        if self._comm is None:
-            who = np.zeros(self._n_samples, dtype=np.int32)
-            rank = 0
-            vals = list(obj)  # necessary since we don't want to modify `obj`
-        else:
-            ntask = self._comm.Get_size()
-            rank = self._comm.Get_rank()
-            rank_lo_hi = [_shareRange(self._n_samples, ntask, i) for i in range(ntask)]
-            lo, hi = rank_lo_hi[rank]
-            vals = [None]*lo + list(obj) + [None]*(self._n_samples-hi)
-            who = [t for t, (l, h) in enumerate(rank_lo_hi) for cnt in range(h-l)]
-
-        step = 1
-        while step < self._n_samples:
-            for j in range(0, self._n_samples, 2*step):
-                if j+step < self._n_samples:  # summation partner found
-                    if rank == who[j]:
-                        if who[j] == who[j+step]:  # no communication required
-                            vals[j] = vals[j] + vals[j+step]
-                            vals[j+step] = None
-                        else:
-                            vals[j] = vals[j] + self._comm.recv(source=who[j+step])
-                    elif rank == who[j+step]:
-                        self._comm.send(vals[j+step], dest=who[j])
-                        vals[j+step] = None
-            step *= 2
-        if self._comm is None:
-            return vals[0]
-        return self._comm.bcast(vals[0], root=who[0])
-
     def _metric_sample(self, from_inverse=False):
         if from_inverse:
             raise NotImplementedError()
@@ -311,4 +246,4 @@ class MetricGaussianKL(Energy):
             if self._mirror_samples:
                 tmp = tmp + self._hamiltonian(lin-v).metric.draw_sample(from_inverse=False)
             samp.append(tmp)
-        return self._sumup(samp)/self._n_eff_samples
+        return utilities.allreduce_sum(samp, self._comm)/self._n_eff_samples
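The net effect of this hunk is that per-task sample ranges now come from `utilities.shareRange` and every reduction goes through `utilities.allreduce_sum`. A minimal sketch of the distribution logic in isolation, with made-up numbers and assuming the package is importable as `nifty7` (no fields or MPI needed):

from nifty7 import utilities

# 10 samples split over 3 tasks: the ranks get (0, 4), (4, 7) and (7, 10).
for rank in range(3):
    print(rank, utilities.shareRange(10, 3, rank))

# With comm=None the reduction is a deterministic pairwise sum over the
# local list, matching what an MPI run would produce.
print(utilities.allreduce_sum([1.0, 2.0, 3.0], comm=None))  # 6.0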
@@ -325,6 +325,9 @@ class _FunctionApplier(Operator):
         self._check_input(x)
         return x.ptw(self._funcname, *self._args, **self._kwargs)

+    def __repr__(self):
+        return f"_FunctionApplier ('{self._funcname}')"
+

 class _CombinedOperator(Operator):
     def __init__(self, ops, _callingfrommake=False):
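With this addition, the representation of a point-wise operator names the function it applies. Roughly (the exact surrounding output depends on the operator chain; this sketch assumes `ptw` builds a `_FunctionApplier` internally, as the hunk above suggests):

import nifty7 as ift

op = ift.ScalingOperator(ift.RGSpace(4), 1.).ptw("exp")
print(repr(op))  # the chain's repr now contains a line like: _FunctionApplier ('exp')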
@@ -248,3 +248,121 @@ def iscomplextype(dtype):

 def indent(inp):
     return "\n".join(((" "+s).rstrip() for s in inp.splitlines()))
+
+
+def shareRange(nwork, nshares, myshare):
+    """Divides a number of work items as fairly as possible into a given
+    number of shares.
+
+    Parameters
+    ----------
+    nwork: int
+        number of work items
+    nshares: int
+        number of shares among which the work should be distributed
+    myshare: int
+        the share for which the range of work items is requested
+
+    Returns
+    -------
+    lo, hi: int
+        index range of work items for this share
+    """
+    nbase = nwork//nshares
+    additional = nwork % nshares
+    lo = myshare*nbase + min(myshare, additional)
+    hi = lo + nbase + int(myshare < additional)
+    return lo, hi
+
+
+def get_MPI_params():
+    """Returns basic information about the MPI setup of the running script.
+
+    Returns
+    -------
+    comm: MPI communicator or None
+        if MPI is detected _and_ more than one task is active, returns
+        the world communicator, else returns None
+    size: int
+        the number of tasks running in total
+    rank: int
+        the rank of this task
+    master: bool
+        True if rank == 0, else False
+    """
+    try:
+        from mpi4py import MPI
+        comm = MPI.COMM_WORLD
+        size = comm.Get_size()
+        if size == 1:
+            return None, 1, 0, True
+        rank = comm.Get_rank()
+        return comm, size, rank, rank == 0
+    except ImportError:
+        return None, 1, 0, True
+
+
+def allreduce_sum(obj, comm):
+    """This is a deterministic implementation of MPI allreduce.
+
+    Numeric addition is not associative due to rounding errors.
+    Therefore we provide our own implementation that is consistent
+    no matter if MPI is used and how many tasks there are.
+
+    At the beginning, a list `who` is constructed, stating which obj
+    can be found on which MPI task.
+    Then elements are added pairwise, with increasing pair distance.
+    In the first round, the distance between pair members is 1:
+    v[0] := v[0] + v[1]
+    v[2] := v[2] + v[3]
+    v[4] := v[4] + v[5]
+    Entries whose summation partner lies beyond the end of the array
+    stay unchanged.
+    When both summation partners are not located on the same MPI task,
+    the second summand is sent to the task holding the first summand and
+    the operation is carried out there.
+    For the next round, the distance is doubled:
+    v[0] := v[0] + v[2]
+    v[4] := v[4] + v[6]
+    v[8] := v[8] + v[10]
+    This is repeated until the distance exceeds the length of the array.
+    At this point v[0] contains the sum of all entries, which is then
+    broadcast to all tasks.
+    """
+    vals = list(obj)
+    if comm is None:
+        nobj = len(vals)
+        who = np.zeros(nobj, dtype=np.int32)
+        rank = 0
+    else:
+        ntask = comm.Get_size()
+        rank = comm.Get_rank()
+        nobj_list = comm.allgather(len(vals))
+        all_hi = list(np.cumsum(nobj_list))
+        all_lo = [0] + all_hi[:-1]
+        nobj = all_hi[-1]
+        rank_lo_hi = [(l, h) for l, h in zip(all_lo, all_hi)]
+        lo, hi = rank_lo_hi[rank]
+        vals = [None]*lo + vals + [None]*(nobj-hi)
+        who = [t for t, (l, h) in enumerate(rank_lo_hi) for cnt in range(h-l)]
+
+    step = 1
+    while step < nobj:
+        for j in range(0, nobj, 2*step):
+            if j+step < nobj:  # summation partner found
+                if rank == who[j]:
+                    if who[j] == who[j+step]:  # no communication required
+                        vals[j] = vals[j] + vals[j+step]
+                        vals[j+step] = None
+                    else:
+                        vals[j] = vals[j] + comm.recv(source=who[j+step])
+                elif rank == who[j+step]:
+                    comm.send(vals[j+step], dest=who[j])
+                    vals[j+step] = None
+        step *= 2
+    if comm is None:
+        return vals[0]
+    return comm.bcast(vals[0], root=who[0])
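Taken together, the three helpers cover the usual MPI boilerplate: detect the setup, slice the work, reduce deterministically. A sketch of a driver script using them (the work items are made up; the code also runs without mpi4py installed, in which case comm is None):

from nifty7 import utilities

comm, size, rank, master = utilities.get_MPI_params()

# Each task processes only its own slice of the 100 work items ...
lo, hi = utilities.shareRange(100, size, rank)
local_results = [float(i)**2 for i in range(lo, hi)]

# ... and the reduced result is identical for any number of tasks, since
# allreduce_sum fixes the order in which the additions happen.
total = utilities.allreduce_sum(local_results, comm)
if master:
    print(total)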