Commit 76130897 authored by Torsten Ensslin

Merge branch 'do_cleanup' into 'NIFTy_6'

Remove standard MPI parallelization

See merge request !387
parents 5765b889 07ef6674
Pipeline #65084 passed with stages in 8 minutes and 17 seconds
@@ -39,17 +39,10 @@ test_serial:
script:
- pytest-3 -q --cov=nifty6 test
- >
python3 -m coverage report --omit "*plot*,*distributed_do*" | tee coverage.txt
python3 -m coverage report --omit "*plot*" | tee coverage.txt
- >
grep TOTAL coverage.txt | awk '{ print "TOTAL: "$4; }'
test_mpi:
stage: test
variables:
OMPI_MCA_btl_vader_single_copy_mechanism: none
script:
- mpiexec -n 2 --bind-to none pytest-3 -q test
pages:
stage: release
script:
......
@@ -16,3 +16,24 @@ but it is hard to make explicit tests since the two approaches cannot be mapped
onto each other exactly. We experienced that preconditioning in the `MetricGaussianKL`
via `napprox` breaks the inference scheme with the new model, so `napprox` may not
be used here.
Removal of the standard parallelization scheme:
===============================================
When several MPI tasks are present, NIFTy5 distributes every Field over these
tasks by splitting it along the first axis. This approach to parallelism is not
very efficient, and it has not been used by anyone for several years, so we
decided to remove it, which led to many simplifications within NIFTy.
User-visible changes (a short migration sketch follows this list):
- the methods `to_global_data`, `from_global_data`, `from_local_data` and
  the property `local_data` have been removed from `Field` and `MultiField`.
  Instead there are now the property `val` (returning a read-only
  numpy.ndarray for `Field` and a dictionary of read-only numpy.ndarrays for
  `MultiField`) and the method `val_rw()` (returning the same structures with
  writable copies of the arrays). Fields and MultiFields can be created from
  such structures using the static method `from_raw`.
- the functions `from_global_data` and `from_local_data` in `sugar` have been
  replaced by a single function called `makeField`.
- the property `local_shape` has been removed from `Domain` (and its
  subclasses) and from `DomainTuple`.
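
For orientation, a minimal migration sketch (assuming a NIFTy6 installation
with the interface described above; `dom` and `arr` are placeholder names):

    import numpy as np
    import nifty6 as ift

    dom = ift.RGSpace(16)
    arr = np.zeros(dom.shape)

    # NIFTy5: fld = ift.Field.from_global_data(dom, arr)
    fld = ift.Field.from_raw(dom, arr)   # or: ift.makeField(dom, arr)

    # NIFTy5: a = fld.to_global_data(); a_rw = fld.to_global_data_rw()
    a = fld.val          # read-only numpy.ndarray
    a_rw = fld.val_rw()  # writable copy
    a_rw[0] = 1.
    fld2 = ift.makeField(dom, a_rw)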
@@ -296,9 +296,9 @@
"outputs": [],
"source": [
"# Get signal data and reconstruction data\n",
"s_data = HT(sh).to_global_data()\n",
"m_data = HT(m).to_global_data()\n",
"d_data = d.to_global_data()\n",
"s_data = HT(sh).val\n",
"m_data = HT(m).val\n",
"d_data = d.val\n",
"\n",
"plt.figure(figsize=(15,10))\n",
"plt.plot(s_data, 'r', label=\"Signal\", linewidth=3)\n",
@@ -350,8 +350,8 @@
},
"outputs": [],
"source": [
"s_power_data = ift.power_analyze(sh).to_global_data()\n",
"m_power_data = ift.power_analyze(m).to_global_data()\n",
"s_power_data = ift.power_analyze(sh).val\n",
"m_power_data = ift.power_analyze(m).val\n",
"plt.figure(figsize=(15,10))\n",
"plt.loglog()\n",
"plt.xlim(1, int(N_pixels/2))\n",
@@ -427,12 +427,12 @@
"\n",
"mask = np.full(s_space.shape, 1.)\n",
"mask[l:h] = 0\n",
"mask = ift.Field.from_global_data(s_space, mask)\n",
"mask = ift.Field.from_raw(s_space, mask)\n",
"\n",
"R = ift.DiagonalOperator(mask)(HT)\n",
"n = n.to_global_data_rw()\n",
"n = n.val_rw()\n",
"n[l:h] = 0\n",
"n = ift.Field.from_global_data(s_space, n)\n",
"n = ift.Field.from_raw(s_space, n)\n",
"\n",
"d = R(sh) + n"
]
@@ -497,11 +497,11 @@
"outputs": [],
"source": [
"# Get signal data and reconstruction data\n",
"s_data = s.to_global_data()\n",
"m_data = HT(m).to_global_data()\n",
"m_var_data = m_var.to_global_data()\n",
"s_data = s.val\n",
"m_data = HT(m).val\n",
"m_var_data = m_var.val\n",
"uncertainty = np.sqrt(m_var_data)\n",
"d_data = d.to_global_data_rw()\n",
"d_data = d.val_rw()\n",
"\n",
"# Set lost data to NaN for proper plotting\n",
"d_data[d_data == 0] = np.nan"
@@ -583,12 +583,12 @@
"\n",
"mask = np.full(s_space.shape, 1.)\n",
"mask[l:h,l:h] = 0.\n",
"mask = ift.Field.from_global_data(s_space, mask)\n",
"mask = ift.Field.from_raw(s_space, mask)\n",
"\n",
"R = ift.DiagonalOperator(mask)(HT)\n",
"n = n.to_global_data_rw()\n",
"n = n.val_rw()\n",
"n[l:h, l:h] = 0\n",
"n = ift.Field.from_global_data(s_space, n)\n",
"n = ift.Field.from_raw(s_space, n)\n",
"curv = Curvature(R=R, N=N, Sh=Sh)\n",
"D = curv.inverse\n",
"\n",
@@ -602,10 +602,10 @@
"m_mean, m_var = ift.probe_with_posterior_samples(curv, HT, 20)\n",
"\n",
"# Get data\n",
"s_data = HT(sh).to_global_data()\n",
"m_data = HT(m).to_global_data()\n",
"m_var_data = m_var.to_global_data()\n",
"d_data = d.to_global_data()\n",
"s_data = HT(sh).val\n",
"m_data = HT(m).val\n",
"m_var_data = m_var.val\n",
"d_data = d.val\n",
"uncertainty = np.sqrt(np.abs(m_var_data))"
]
},
@@ -653,8 +653,8 @@
"ma = np.max(s_data)\n",
"\n",
"fig, axes = plt.subplots(3, 2, figsize=(15, 22.5))\n",
"sample = HT(curv.draw_sample(from_inverse=True)+m).to_global_data()\n",
"post_mean = (m_mean + HT(m)).to_global_data()\n",
"sample = HT(curv.draw_sample(from_inverse=True)+m).val\n",
"post_mean = (m_mean + HT(m)).val\n",
"\n",
"data = [s_data, m_data, post_mean, sample, s_data - m_data, uncertainty]\n",
"caption = [\"Signal\", \"Reconstruction\", \"Posterior mean\", \"Sample\", \"Residuals\", \"Uncertainty Map\"]\n",
@@ -731,7 +731,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.6"
"version": "3.7.5"
}
},
"nbformat": 4,
......
@@ -24,16 +24,16 @@ for ii in range(10, 26):
img = np.random.randn(nu*nv)
img = img.reshape((nu, nv))
img = ift.from_global_data(uvspace, img)
img = ift.makeField(uvspace, img)
t0 = time()
GM = ift.GridderMaker(uvspace, eps=1e-7, uv=uv)
vis = ift.from_global_data(visspace, vis)
vis = ift.makeField(visspace, vis)
op = GM.getFull().adjoint
t1 = time()
op(img).to_global_data()
op(img).val
t2 = time()
op.adjoint(vis).to_global_data()
op.adjoint(vis).val
t3 = time()
print(t2-t1, t3-t2)
N0s.append(N)
......
@@ -61,9 +61,9 @@ if __name__ == '__main__':
# Generate mock data
p = R(sky)
mock_position = ift.from_random('normal', harmonic_space)
tmp = p(mock_position).to_global_data().astype(np.float64)
tmp = p(mock_position).val.astype(np.float64)
data = np.random.binomial(1, tmp)
data = ift.Field.from_global_data(R.target, data)
data = ift.Field.from_raw(R.target, data)
# Compute likelihood and Hamiltonian
position = ift.from_random('normal', harmonic_space)
......
@@ -57,7 +57,7 @@ if __name__ == '__main__':
for _ in range(n_samps):
fld = pspec(ift.from_random('normal', pspec.domain))
klengths = fld.domain[0].k_lengths
ycoord = fld.to_global_data_rw()
ycoord = fld.val_rw()
ycoord[0] = ycoord[1]
ax.plot(klengths, ycoord, alpha=1)
@@ -80,7 +80,7 @@ if __name__ == '__main__':
foo = []
for ax in axs:
pos = ift.from_random('normal', correlated_field.domain)
fld = correlated_field(pos).to_global_data()
fld = correlated_field(pos).val
foo.append((ax, fld))
mi, ma = np.inf, -np.inf
for _, fld in foo:
@@ -106,7 +106,7 @@ if __name__ == '__main__':
flds = []
for _ in range(n_samps):
pos = ift.from_random('normal', correlated_field.domain)
ax.plot(correlated_field(pos).to_global_data())
ax.plot(correlated_field(pos).val)
plt.savefig('correlated_fields.png')
plt.close()
@@ -42,7 +42,7 @@ def make_random_mask():
# Random mask for spherical mode
mask = ift.from_random('pm1', position_space)
mask = (mask + 1)/2
return mask.to_global_data()
return mask.val
if __name__ == '__main__':
@@ -95,7 +95,7 @@ if __name__ == '__main__':
# and harmonic transformation
# Masking operator to model that parts of the field have not been observed
mask = ift.Field.from_global_data(position_space, mask)
mask = ift.Field.from_raw(position_space, mask)
Mask = ift.MaskOperator(mask)
# The response operator consists of
......
@@ -40,7 +40,7 @@ def exposure_2d():
exposure[:, x_shape*4//5:x_shape] *= .1
exposure[:, x_shape//2:x_shape*3//2] *= 3.
return ift.Field.from_global_data(position_space, exposure)
return ift.Field.from_raw(position_space, exposure)
if __name__ == '__main__':
@@ -94,8 +94,8 @@ if __name__ == '__main__':
lamb = R(sky)
mock_position = ift.from_random('normal', domain)
data = lamb(mock_position)
data = np.random.poisson(data.to_global_data().astype(np.float64))
data = ift.Field.from_global_data(d_space, data)
data = np.random.poisson(data.val.astype(np.float64))
data = ift.Field.from_raw(d_space, data)
likelihood = ift.PoissonianEnergy(data)(lamb)
# Settings for minimization
......
@@ -40,7 +40,7 @@ class SingleDomain(ift.LinearOperator):
def apply(self, x, mode):
self._check_input(x, mode)
return ift.from_global_data(self._tgt(mode), x.to_global_data())
return ift.makeField(self._tgt(mode), x.val)
def random_los(n_los):
......
@@ -38,11 +38,11 @@ signal_vals = np.zeros(npix, dtype=np.float64)
for i in range(0, npix, npix//12 + 27):
signal_vals[i] = 500.
signal = ift.from_global_data(domain, signal_vals)
signal = ift.makeField(domain, signal_vals)
delta_vals = np.zeros(npix, dtype=np.float64)
delta_vals[0] = 1.0
delta = ift.from_global_data(domain, delta_vals)
delta = ift.makeField(domain, delta_vals)
# Define kernel function
@@ -58,12 +58,12 @@ domain = ift.RGSpace((100, 100))
signal_vals = np.zeros(domain.shape, dtype=np.float64)
signal_vals[35, 70] = 5000.
signal_vals[45, 8] = 5000.
signal = ift.from_global_data(domain, signal_vals)
signal = ift.makeField(domain, signal_vals)
# Define delta signal, generate kernel image
delta_vals = np.zeros(domain.shape, dtype=np.float64)
delta_vals[0, 0] = 1.0
delta = ift.from_global_data(domain, delta_vals)
delta = ift.makeField(domain, delta_vals)
# Define kernel function
......
@@ -9,12 +9,12 @@ def testAmplitudesConsistency(seed, sspace):
sc = ift.StatCalculator()
for s in samples:
sc.add(op(s.extract(op.domain)))
return sc.mean.to_global_data(), sc.var.sqrt().to_global_data()
return sc.mean.val, sc.var.sqrt().val
np.random.seed(seed)
offset_std = .1
integrated_fluct_std0 = .003
integrated_fluct_std1 = 0.1
nsam = 1000
@@ -32,7 +32,7 @@ def testAmplitudesConsistency(seed, sspace):
offset_std, _ = stats(fa.amplitude_total_offset, samples)
integrated_fluct_std0, _ = stats(fa.average_fluctuation(0), samples)
integrated_fluct_std1, _ = stats(fa.average_fluctuation(1), samples)
slice_fluct_std0, _ = stats(fa.slice_fluctuation(0), samples)
slice_fluct_std1, _ = stats(fa.slice_fluctuation(1), samples)
@@ -54,7 +54,7 @@ def testAmplitudesConsistency(seed, sspace):
print("Expected integrated fluct. frequency Std: " +
      str(integrated_fluct_std1))
print("Estimated integrated fluct. frequency Std: " + str(fluct_freq))
print("Expected slice fluct. space Std: " +
str(slice_fluct_std0))
print("Estimated slice fluct. space Std: " + str(sl_fluct_space))
@@ -65,8 +65,8 @@ def testAmplitudesConsistency(seed, sspace):
print("Expected total fluct. Std: " + str(tot_flm))
print("Estimated total fluct. Std: " + str(fluct_total))
np.testing.assert_allclose(offset_std, zm_std_mean, rtol=0.5)
np.testing.assert_allclose(integrated_fluct_std0, fluct_space, rtol=0.5)
np.testing.assert_allclose(integrated_fluct_std1, fluct_freq, rtol=0.5)
@@ -74,7 +74,7 @@ def testAmplitudesConsistency(seed, sspace):
np.testing.assert_allclose(slice_fluct_std0, sl_fluct_space, rtol=0.5)
np.testing.assert_allclose(slice_fluct_std1, sl_fluct_freq, rtol=0.5)
fa = ift.CorrelatedFieldMaker.make(offset_std, .1, '')
fa.add_fluctuations(fsspace, integrated_fluct_std1, 1., 3.1, 1., .5, .1,
                    -4, 1., 'freq')
@@ -87,7 +87,7 @@ def testAmplitudesConsistency(seed, sspace):
print("Forced slice fluct. space Std: "+str(m))
print("Expected slice fluct. Std: " + str(em))
np.testing.assert_allclose(m, em, rtol=0.5)
assert op.target[0] == sspace
assert op.target[1] == fsspace
......
@@ -36,7 +36,7 @@ def polynomial(coefficients, sampling_points):
if not (isinstance(coefficients, ift.Field)
and isinstance(sampling_points, np.ndarray)):
raise TypeError
params = coefficients.to_global_data()
params = coefficients.val
out = np.zeros_like(sampling_points)
for ii in range(len(params)):
out += params[ii] * sampling_points**ii
@@ -71,14 +71,14 @@ class PolynomialResponse(ift.LinearOperator):
def apply(self, x, mode):
self._check_input(x, mode)
val = x.to_global_data_rw()
val = x.val_rw()
if mode == self.TIMES:
# FIXME Use polynomial() here
out = self._mat.dot(val)
else:
# FIXME Can this be optimized?
out = self._mat.conj().T.dot(val)
return ift.from_global_data(self._tgt(mode), out)
return ift.makeField(self._tgt(mode), out)
# Generate some mock data
@@ -99,8 +99,8 @@ R = PolynomialResponse(p_space, x)
ift.extra.consistency_check(R)
d_space = R.target
d = ift.from_global_data(d_space, y)
N = ift.DiagonalOperator(ift.from_global_data(d_space, var))
d = ift.makeField(d_space, y)
N = ift.DiagonalOperator(ift.makeField(d_space, var))
IC = ift.DeltaEnergyController(tol_rel_deltaE=1e-12, iteration_limit=200)
likelihood = ift.GaussianEnergy(d, N)(R)
@@ -136,9 +136,8 @@ plt.savefig('fit.png')
plt.close()
# Print parameters
mean = sc.mean.to_global_data()
sigma = np.sqrt(sc.var.to_global_data())
if ift.dobj.master:
for ii in range(len(mean)):
print('Coefficient x**{}: {:.2E} +/- {:.2E}'.format(ii, mean[ii],
mean = sc.mean.val
sigma = np.sqrt(sc.var.val)
for ii in range(len(mean)):
print('Coefficient x**{}: {:.2E} +/- {:.2E}'.format(ii, mean[ii],
sigma[ii]))
......
from .version import __version__
from . import dobj
from .domains.domain import Domain
from .domains.structured_domain import StructuredDomain
from .domains.unstructured_domain import UnstructuredDomain
@@ -93,10 +91,5 @@ from .linearization import Linearization
from .operator_spectrum import operator_spectrum
from . import internal_config
_scheme = internal_config.parallelization_scheme()
if _scheme == "Samples":
from .minimization.metric_gaussian_kl_mpi import MetricGaussianKL_MPI
# We deliberately don't set __all__ here, because we don't want people to do a
# "from nifty6 import *"; that would swamp the global namespace.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Copyright(C) 2013-2019 Max-Planck-Society
#
# NIFTy is being developed at the Max-Planck-Institut fuer Astrophysik.
import sys
from functools import reduce
import numpy as np
from mpi4py import MPI
from .random import Random
__all__ = ["ntask", "rank", "master", "local_shape", "data_object", "full",
"empty", "zeros", "ones", "empty_like", "vdot", "exp",
"log", "tanh", "sqrt", "from_object", "from_random",
"local_data", "ibegin", "ibegin_from_shape", "np_allreduce_sum",
"np_allreduce_min", "np_allreduce_max",
"distaxis", "from_local_data", "from_global_data", "to_global_data",
"redistribute", "default_distaxis", "is_numpy", "absmax", "norm",
"lock", "locked", "uniform_full", "transpose", "to_global_data_rw",
"ensure_not_distributed", "ensure_default_distributed",
"tanh", "conjugate", "sin", "cos", "tan", "log10", "log1p", "expm1",
"sinh", "cosh", "sinc", "absolute", "sign", "clip"]
_comm = MPI.COMM_WORLD
ntask = _comm.Get_size()
rank = _comm.Get_rank()
master = (rank == 0)
def is_numpy():
return False
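# Work-distribution helpers: nwork items are split as evenly as possible among
# nshares MPI tasks (the first nwork % nshares tasks receive one extra item).
# _shareSize returns the number of items held by task `myshare`; _shareRange
# returns the corresponding half-open index interval [lo, hi).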
def _shareSize(nwork, nshares, myshare):
return (nwork//nshares) + int(myshare < nwork % nshares)
def _shareRange(nwork, nshares, myshare):
nbase = nwork//nshares
additional = nwork % nshares
lo = myshare*nbase + min(myshare, additional)
hi = lo + nbase + int(myshare < additional)
return lo, hi
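# Map a global array shape to the shape of this task's local chunk: the
# distributed axis is shrunk to this task's share. distaxis == -1 (or a
# scalar shape) means the data is not distributed at all.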
def local_shape(shape, distaxis=0):
if len(shape) == 0 or distaxis == -1:
return shape
shape2 = list(shape)
shape2[distaxis] = _shareSize(shape[distaxis], ntask, rank)
return tuple(shape2)
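# numpy-like array whose data is split along the axis `distaxis` and spread
# over the MPI tasks; distaxis == -1 marks undistributed data, which every
# task holds in full.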
class data_object(object):
def __init__(self, shape, data, distaxis):
self._shape = tuple(shape)
if len(self._shape) == 0:
distaxis = -1
if not isinstance(data, np.ndarray):
data = np.full((), data)
self._distaxis = distaxis
self._data = data
if local_shape(self._shape, self._distaxis) != self._data.shape:
raise ValueError("shape mismatch")
def copy(self):
return data_object(self._shape, self._data.copy(), self._distaxis)
# def _sanity_checks(self):
# # check whether the distaxis is consistent
# if self._distaxis < -1 or self._distaxis >= len(self._shape):
# raise ValueError
# itmp = np.array(self._distaxis)
# otmp = np.empty(ntask, dtype=np.int)
# _comm.Allgather(itmp, otmp)
# if np.any(otmp != self._distaxis):
# raise ValueError
# # check whether the global shape is consistent
# itmp = np.array(self._shape)
# otmp = np.empty((ntask, len(self._shape)), dtype=np.int)
# _comm.Allgather(itmp, otmp)
# for i in range(ntask):
# if np.any(otmp[i, :] != self._shape):
# raise ValueError
# # check shape of local data
# if self._distaxis < 0:
# if self._data.shape != self._shape:
# raise ValueError
# else:
# itmp = np.array(self._shape)
# itmp[self._distaxis] = _shareSize(self._shape[self._distaxis],
# ntask, rank)
# if np.any(self._data.shape != itmp):
# raise ValueError
@property
def dtype(self):
return self._data.dtype
@property
def shape(self):
return self._shape
@property
def size(self):
return np.prod(self._shape)
@property
def real(self):
return data_object(self._shape, self._data.real, self._distaxis)
@property
def imag(self):
return data_object(self._shape, self._data.imag, self._distaxis)
def conj(self):
return data_object(self._shape, self._data.conj(), self._distaxis)
def conjugate(self):
return data_object(self._shape, self._data.conjugate(), self._distaxis)
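# Apply the reduction `op` (e.g. "sum") along `axis`. Per-task partial
# results are combined with an MPI Allreduce whenever the reduction involves
# the distributed axis; purely local reductions need no communication.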
def _contraction_helper(self, op, mpiop, axis):
if axis is not None:
if len(axis) == len(self._data.shape):
axis = None
if axis is None:
res = np.array(getattr(self._data, op)())
if (self._distaxis == -1):
return res[()]
res2 = np.empty((), dtype=res.dtype)
_comm.Allreduce(res, res2, mpiop)
return res2[()]
if self._distaxis in axis:
res = getattr(self._data, op)(axis=axis)
res2 = np.empty_like(res)
_comm.Allreduce(res, res2, mpiop)
return from_global_data(res2, distaxis=0)
else:
# perform the contraction on the local data
res = getattr(self._data, op)(axis=axis)
if self._distaxis == -1:
return from_global_data(res, distaxis=0)
shp = list(res.shape)
shift = 0
for ax in axis:
if ax < self._distaxis:
shift += 1
shp[self._distaxis-shift] = self.shape[self._distaxis]
return from_local_data(shp, res, self._distaxis-shift)
def sum(self, axis=None):
return self._contraction_helper("sum", MPI.SUM, axis)
def prod(self, axis=None):
return self._contraction_helper("prod", MPI.PROD, axis)
# def min(self, axis=None):
# return self._contraction_helper("min", MPI.MIN, axis)
# def max(self, axis=None):
# return self._contraction_helper("max", MPI.MAX, axis)
def mean(self, axis=None):
if axis is None:
sz = self.size
else:
sz = reduce(lambda x, y: x*y, [self.shape[i] for i in axis])
return self.sum(axis)/sz
def std(self, axis=None):
return np.sqrt(self.var(axis))
# FIXME: to be improved!
def var(self, axis=None):