diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 4f38e130e8d99dd7c0c936734241561566a563ea..2c3a1314800e59d4f1eaeac2a957bf97c27fea48 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -34,9 +34,18 @@ build_docker_from_cache:
 mytest:
   stage: testing
   script:
-    - pytest-3 -q --cov=resolve test.py
+    - pip3 install .
+    - pytest-3 -q --cov=resolve test
   coverage: '/^TOTAL.+?(\d+\%)$/'
 
+test_mpi:
+  stage: testing
+  variables:
+    OMPI_MCA_btl_vader_single_copy_mechanism: none
+  script:
+    - pip3 install .
+    - mpiexec -n 2 --bind-to none pytest-3 -q test/test_mpi
+
 # staticchecks:
 #   stage: testing
 #   script:
diff --git a/Dockerfile b/Dockerfile
index 692c2a43c218c55671a2ed60a4fb5a54f52e2bbd..49c98916ba192c15fa86f3dbc439c536d04c2b2c 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -10,6 +10,12 @@ RUN pip3 install scipy git+https://gitlab.mpcdf.mpg.de/ift/nifty.git@NIFTy_7
 RUN pip3 install git+https://gitlab.mpcdf.mpg.de/mtr/ducc.git@ducc0
 # Optional dependencies
 RUN pip3 install astropy
+RUN apt-get install -qq python3-mpi4py
 # Testing dependencies
 RUN apt-get install -qq python3-pytest-cov
 RUN pip3 install flake8
+
+# Create user (openmpi does not like to be run as root)
+RUN useradd -ms /bin/bash testinguser
+USER testinguser
+WORKDIR /home/testinguser
diff --git a/resolve/__init__.py b/resolve/__init__.py
index 42ff985e6824c5b311ee28c02c039329f5feb0d5..19433247bf696d0c8a817aaab98975edd4c01566 100644
--- a/resolve/__init__.py
+++ b/resolve/__init__.py
@@ -5,6 +5,7 @@ from .global_config import *
 from .likelihood import *
 from .minimization import Minimization, MinimizationState, simple_minimize
 from .mpi import onlymaster
+from .mpi_operators import *
 from .ms_import import ms2observations, ms_n_spectral_windows
 from .multi_frequency.irg_space import IRGSpace
 from .multi_frequency.operators import (
@@ -17,7 +18,7 @@ from .plotter import MfPlotter, Plotter
 from .points import PointInserter
 from .polarization import polarization_matrix_exponential
 from .primary_beam import vla_beam
-from .response import MfResponse, StokesIResponse, ResponseDistributor
+from .response import MfResponse, ResponseDistributor, StokesIResponse
 from .simple_operators import *
 from .util import (
     Reshaper,
diff --git a/resolve/mpi_operators.py b/resolve/mpi_operators.py
new file mode 100644
index 0000000000000000000000000000000000000000..93c261a373a0c11180b15d0402e6d727badc4786
--- /dev/null
+++ b/resolve/mpi_operators.py
@@ -0,0 +1,74 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+# Copyright(C) 2021 Max-Planck-Society
+# Author: Philipp Arras
+
+import numpy as np
+
+import nifty7 as ift
+
+
+class AllreduceSum(ift.Operator):
+    def __init__(self, oplist, comm):
+        self._oplist, self._comm = oplist, comm
+        self._domain = ift.makeDomain(
+            _get_global_unique(oplist, lambda op: op.domain, comm)
+        )
+        self._target = ift.makeDomain(
+            _get_global_unique(oplist, lambda op: op.target, comm)
+        )
+
+    def apply(self, x):
+        self._check_input(x)
+        if not ift.is_linearization(x):
+            return ift.utilities.allreduce_sum(
+                [op(x) for op in self._oplist], self._comm
+            )
+        opx = [op(x) for op in self._oplist]
+        val = ift.utilities.allreduce_sum([lin.val for lin in opx], self._comm)
+        jac = AllreduceSumLinear([lin.jac for lin in opx], self._comm)
+        if _get_global_unique(opx, lambda op: op.metric is None, self._comm):
+            return x.new(val, jac)
+        met = AllreduceSumLinear([lin.metric for lin in opx], self._comm)
+        return x.new(val, jac, met)
+
+
+class AllreduceSumLinear(ift.LinearOperator):
+    def __init__(self, oplist, comm=None):
+        assert all(isinstance(oo, ift.LinearOperator) for oo in oplist)
+        self._domain = ift.makeDomain(
+            _get_global_unique(oplist, lambda op: op.domain, comm)
+        )
+        self._target = ift.makeDomain(
+            _get_global_unique(oplist, lambda op: op.target, comm)
+        )
+        cap = _get_global_unique(oplist, lambda op: op._capability, comm)
+        self._capability = (self.TIMES | self.ADJOINT_TIMES) & cap
+        self._oplist = oplist
+        self._comm = comm
+        local_nwork = [len(oplist)] if comm is None else comm.allgather(len(oplist))
+        size, rank, _ = ift.utilities.get_MPI_params_from_comm(comm)
+        self._nwork = sum(local_nwork)
+        self._lo = ([0] + list(np.cumsum(local_nwork)))[rank]
+
+    def apply(self, x, mode):
+        self._check_input(x, mode)
+        lst = [op.apply(x, mode) for op in self._oplist]
+        return ift.utilities.allreduce_sum(lst, self._comm)
+
+    def draw_sample(self, from_inverse=False):
+        sseq = ift.random.spawn_sseq(self._nwork)
+        local_samples = []
+        for ii, op in enumerate(self._oplist):
+            with ift.random.Context(sseq[self._lo + ii]):
+                local_samples.append(op.draw_sample(from_inverse))
+        return ift.utilities.allreduce_sum(local_samples, self._comm)
+
+
+def _get_global_unique(lst, f, comm):
+    caps = [f(oo) for oo in lst]
+    if comm is not None:
+        caps = comm.allgather(caps)
+        caps = [aa for cc in caps for aa in cc]
+    cap = caps[0]
+    assert all(cc == cap for cc in caps)
+    return cap
diff --git a/test/__init__.py b/test/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391
diff --git a/test.py b/test/test_general.py
similarity index 100%
rename from test.py
rename to test/test_general.py
diff --git a/test/test_mpi/__init__.py b/test/test_mpi/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391
diff --git a/test/test_mpi/test_add.py b/test/test_mpi/test_add.py
new file mode 100644
index 0000000000000000000000000000000000000000..c4a073e139c4c50fc9106eaaa4250cb906ad063d
--- /dev/null
+++ b/test/test_mpi/test_add.py
@@ -0,0 +1,118 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+# Copyright(C) 2021 Max-Planck-Society
+# Author: Philipp Arras
+
+from functools import reduce
+from operator import add
+from types import GeneratorType
+
+import numpy as np
+
+import nifty7 as ift
+import resolve as rve
+
+
+def getop(comm):
+    """Return energy operator that maps the full multi-frequency sky onto
+    the log-likelihood value for a frequency slice."""
+
+    d = np.load("data.npy")
+    invcov = np.load("invcov.npy")
+    if comm == -1:
+        nwork = d.shape[0]
+        ddom = ift.UnstructuredDomain(d[0].shape)
+        ops = [
+            ift.GaussianEnergy(
+                ift.makeField(ddom, d[ii]), ift.makeOp(ift.makeField(ddom, invcov[ii]))
+            )
+            for ii in range(nwork)
+        ]
+        op = reduce(add, ops)
+    else:
+        nwork = d.shape[0]
+        size, rank, _ = ift.utilities.get_MPI_params_from_comm(comm)
+        lo, hi = ift.utilities.shareRange(nwork, size, rank)
+        local_indices = range(lo, hi)
+        lst = []
+        for ii in local_indices:
+            ddom = ift.UnstructuredDomain(d[ii].shape)
+            dd = ift.makeField(ddom, d[ii])
+            iicc = ift.makeOp(ift.makeField(ddom, invcov[ii]))
+            ee = ift.GaussianEnergy(dd, iicc)
+            lst.append(ee)
+        op = rve.AllreduceSum(lst, comm)
+    ift.extra.check_operator(op, ift.from_random(op.domain))
+    sky = ift.FieldAdapter(op.domain, "sky")
+    return op @ sky.exp()
+
+
+def allclose(gen):
+    ref = next(gen) if isinstance(gen, GeneratorType) else gen[0]
+    for aa in gen:
+        ift.extra.assert_allclose(ref, aa)
+
+
+def test_mpi_adder():
+    ddomain = ift.UnstructuredDomain(4), ift.UnstructuredDomain(1)
+    comm, size, rank, master = ift.utilities.get_MPI_params()
+    data = ift.from_random(ddomain)
+    invcov = ift.from_random(ddomain).exp()
+    if master:
+        np.save("data.npy", data.val)
+        np.save("invcov.npy", invcov.val)
+    if comm is not None:
+        comm.Barrier()
+
+    lhs = getop(-1), getop(None), getop(comm)
+    hams = tuple(
+        ift.StandardHamiltonian(lh, ift.GradientNormController(iteration_limit=10))
+        for lh in lhs
+    )
+    lhs_for_sampling = lhs[1:]
+    hams_for_sampling = hams[1:]
+
+    # Evaluate Field
+    dom, tgt = lhs[0].domain, lhs[0].target
+    pos = ift.from_random(dom)
+    allclose(op(pos) for op in lhs)
+
+    # Evaluate Linearization
+    for wm in [False, True]:
+        lin = ift.Linearization.make_var(pos, wm)
+        allclose(lh(lin).val for lh in lhs)
+        allclose(lh(lin).gradient for lh in lhs)
+
+        for _ in range(10):
+            foo = ift.Linearization.make_var(ift.from_random(dom), wm)
+            bar = ift.from_random(dom)
+            allclose(lh(foo).jac(bar) for lh in lhs)
+            if wm:
+                allclose(lh(foo).metric(bar) for lh in lhs)
+            bar = ift.from_random(tgt)
+            allclose(lh(foo).jac.adjoint(bar) for lh in lhs)
+
+        # Minimization
+        pos = ift.from_random(dom)
+        es = tuple(ift.EnergyAdapter(pos, ham, want_metric=wm) for ham in hams)
+        ic = ift.GradientNormController(iteration_limit=5)
+        mini = ift.NewtonCG(ic) if wm else ift.SteepestDescent(ic)
+        allclose(mini(e)[0].position for e in es)
+
+        # Draw samples
+        lin = ift.Linearization.make_var(ift.from_random(dom), True)
+        samps_lh, samps_ham = [], []
+        for ii, (llhh, hh) in enumerate(zip(lhs_for_sampling, hams_for_sampling)):
+            with ift.random.Context(42):
+                samps_lh.append(llhh(lin).metric.draw_sample())
+            with ift.random.Context(42):
+                samps_ham.append(hh(lin).metric.draw_sample())
+        allclose(samps_lh)
+        allclose(samps_ham)
+
+        mini_results = []
+        for ham in hams_for_sampling:
+            with ift.random.Context(42):
+                mini_results.append(
+                    mini(ift.MetricGaussianKL.make(pos, ham, 3, True))[0].position
+                )
+        allclose(mini_results)
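
Usage note (not part of the patch): the sketch below illustrates how the new rve.AllreduceSum might be used to spread a sum of Gaussian likelihood terms over MPI ranks. It reuses only calls that appear in the patch (ift.GaussianEnergy, ift.utilities.shareRange, ift.utilities.get_MPI_params_from_comm); the domain size, the dummy data, and the script name are assumptions for illustration.

    # Minimal sketch: run e.g. with `mpiexec -n 2 python3 allreduce_sketch.py`.
    # Each rank builds only its local likelihood terms; AllreduceSum sums the
    # contributions of all ranks whenever the operator is applied.
    import numpy as np
    from mpi4py import MPI

    import nifty7 as ift
    import resolve as rve

    comm = MPI.COMM_WORLD
    nwork = 4  # total number of likelihood terms (e.g. frequency slices)
    size, rank, _ = ift.utilities.get_MPI_params_from_comm(comm)
    lo, hi = ift.utilities.shareRange(nwork, size, rank)  # this rank's share

    ddom = ift.UnstructuredDomain(10)
    local_ops = []
    for ii in range(lo, hi):
        data = ift.makeField(ddom, np.full(10, float(ii)))     # dummy data
        invcov = ift.makeOp(ift.makeField(ddom, np.ones(10)))  # unit inverse covariance
        local_ops.append(ift.GaussianEnergy(data, invcov))

    lh = rve.AllreduceSum(local_ops, comm)
    # All ranks obtain the same global value although each holds only a subset.
    total = lh(ift.full(lh.domain, 0.0))
    print(rank, total.val)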