Commit 857ee0f3 authored by Martin Reinecke

cleanups

parent af334cef
Pipeline #21521 passed in 4 minutes and 23 seconds
@@ -12,26 +12,23 @@ __all__ = ["ntask", "rank", "master", "local_shape", "data_object", "full",
 _comm = MPI.COMM_WORLD
 ntask = _comm.Get_size()
 rank = _comm.Get_rank()
-master = rank == 0
+master = (rank == 0)


 def _shareSize(nwork, nshares, myshare):
-    nbase = nwork//nshares
-    return nbase if myshare >= nwork % nshares else nbase+1
+    return (nwork//nshares) + int(myshare < nwork % nshares)


 def _shareRange(nwork, nshares, myshare):
     nbase = nwork//nshares
     additional = nwork % nshares
     lo = myshare*nbase + min(myshare, additional)
-    hi = lo + nbase + (1 if myshare < additional else 0)
+    hi = lo + nbase + int(myshare < additional)
     return lo, hi


 def local_shape(shape, distaxis):
-    if len(shape) == 0:
-        distaxis = -1
-    if distaxis == -1:
+    if len(shape) == 0 or distaxis == -1:
         return shape
     shape2 = list(shape)
     shape2[distaxis] = _shareSize(shape[distaxis], ntask, rank)
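A standalone sketch of the sharing logic above, for illustration only: the `_demo` helpers and the explicit `ntask`/`rank` parameters are assumptions of this sketch, whereas the module takes both from `MPI.COMM_WORLD`. Ten items over four shares give sizes 3, 3, 2, 2 and contiguous, non-overlapping ranges:

```python
def share_size_demo(nwork, nshares, myshare):
    return (nwork//nshares) + int(myshare < nwork % nshares)

def share_range_demo(nwork, nshares, myshare):
    nbase = nwork//nshares
    additional = nwork % nshares
    lo = myshare*nbase + min(myshare, additional)
    hi = lo + nbase + int(myshare < additional)
    return lo, hi

def local_shape_demo(shape, distaxis, ntask, rank):
    # shape of the local chunk when `shape` is split along `distaxis`
    if len(shape) == 0 or distaxis == -1:
        return shape
    shape2 = list(shape)
    shape2[distaxis] = share_size_demo(shape[distaxis], ntask, rank)
    return tuple(shape2)

assert [share_size_demo(10, 4, i) for i in range(4)] == [3, 3, 2, 2]
assert [share_range_demo(10, 4, i) for i in range(4)] == \
    [(0, 3), (3, 6), (6, 8), (8, 10)]
assert local_shape_demo((10, 6), 0, 4, 3) == (2, 6)
```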
@@ -40,7 +37,6 @@ def local_shape(shape, distaxis):
 class data_object(object):
     def __init__(self, shape, data, distaxis):
         """Must not be called directly by users"""
         self._shape = tuple(shape)
         if len(self._shape) == 0:
             distaxis = -1
......@@ -139,13 +135,13 @@ class data_object(object):
def max(self, axis=None):
return self._contraction_helper("max", MPI.MAX, axis)
# FIXME: to be improved!
def mean(self):
return self.sum()/self.size
def std(self):
return np.sqrt(self.var())
# FIXME: to be improved!
def var(self):
return (abs(self-self.mean())**2).mean()
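Regarding the FIXME: `var()` as written calls `mean()` twice, so the data makes several global reduction passes. One way it could be improved is to fuse count, sum and sum of squares into a single `Allreduce`. A sketch assuming mpi4py and real-valued data; `mean_and_var` is a hypothetical standalone helper, not part of this commit:

```python
import numpy as np
from mpi4py import MPI

def mean_and_var(local_data, comm=MPI.COMM_WORLD):
    # One Allreduce carries count, sum and sum of squares at once.
    partial = np.array([local_data.size,
                        local_data.sum(),
                        np.sum(local_data*local_data)], dtype=np.float64)
    total = np.empty(3, dtype=np.float64)
    comm.Allreduce(partial, total, op=MPI.SUM)
    n, s, s2 = total
    mean = s/n
    return mean, s2/n - mean*mean   # E[x^2] - E[x]^2
```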
@@ -307,11 +303,17 @@ def from_object(object, dtype=None, copy=True):
                        distaxis=object._distaxis)


-def from_random(random_type, shape, dtype=np.float64, distaxis=0, **kwargs):
+# This function draws all random numbers on all tasks, to produce the same
+# array independent of the number of tasks.
+def from_random(random_type, shape, dtype=np.float64, **kwargs):
     generator_function = getattr(Random, random_type)
-    # lshape = local_shape(shape, distaxis)
-    # return data_object(shape, generator_function(dtype=dtype, shape=lshape, **kwargs), distaxis=distaxis)
-    return from_global_data(generator_function(dtype=dtype, shape=shape, **kwargs), distaxis=distaxis)
+    for i in range(ntask):
+        lshape = list(shape)
+        lshape[0] = _shareSize(shape[0], ntask, i)
+        ldat = generator_function(dtype=dtype, shape=lshape, **kwargs)
+        if i == rank:
+            outdat = ldat
+    return from_local_data(shape, outdat, distaxis=0)
def local_data(arr):
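The new `from_random` trades extra work for reproducibility: every task draws the random numbers of all shares in the same order and keeps only its own, so the assembled global array does not depend on `ntask`. A standalone sketch of that idea, using `numpy.random` in place of the module's `Random` wrapper:

```python
import numpy as np

def draw_share(shape, ntask, rank, seed=42):
    rng = np.random.RandomState(seed)
    for i in range(ntask):
        lshape = list(shape)
        lshape[0] = shape[0]//ntask + int(i < shape[0] % ntask)
        ldat = rng.uniform(size=lshape)   # all tasks draw every share ...
        if i == rank:
            outdat = ldat                 # ... but keep only their own
    return outdat

# Concatenating the shares yields the same global array for any ntask:
full2 = np.concatenate([draw_share((10, 3), 2, r) for r in range(2)])
full5 = np.concatenate([draw_share((10, 3), 5, r) for r in range(5)])
assert np.allclose(full2, full5)
```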
@@ -371,9 +373,9 @@ def redistribute(arr, dist=None, nodist=None):
                dist = i
                break

-    if arr._distaxis == -1:  # just pick the proper subset
+    if arr._distaxis == -1:  # all data available, just pick the proper subset
        return from_global_data(arr._data, dist)
-    if dist == -1:  # gather data
+    if dist == -1:  # gather all data on all tasks
        tmp = np.moveaxis(arr._data, arr._distaxis, 0)
        slabsize = np.prod(tmp.shape[1:])*tmp.itemsize
        sz = np.empty(ntask, dtype=np.int)
@@ -382,7 +384,7 @@ def redistribute(arr, dist=None, nodist=None):
        disp = np.empty(ntask, dtype=np.int)
        disp[0] = 0
        disp[1:] = np.cumsum(sz[:-1])
-        tmp = tmp.flatten()
+        tmp = np.require(tmp, requirements="C")
        out = np.empty(arr.size, dtype=arr.dtype)
        _comm.Allgatherv(tmp, [out, sz, disp, MPI.BYTE])
        shp = np.array(arr._shape)
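The gather branch collects differently-sized slabs from all tasks via `Allgatherv`, using per-task byte counts `sz` and displacements `disp`. A minimal self-contained mpi4py sketch of the same pattern, run under `mpirun`; it uses element counts with `MPI.DOUBLE` instead of the byte counts above, for simplicity:

```python
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
ntask, rank = comm.Get_size(), comm.Get_rank()
nwork = 10

# Each task owns a contiguous, possibly differently-sized slab.
lo = rank*(nwork//ntask) + min(rank, nwork % ntask)
hi = lo + nwork//ntask + int(rank < nwork % ntask)
local = np.arange(lo, hi, dtype=np.float64)

sz = np.array([nwork//ntask + int(i < nwork % ntask)
               for i in range(ntask)])
disp = np.append(0, np.cumsum(sz[:-1]))
out = np.empty(nwork, dtype=np.float64)
comm.Allgatherv(local, [out, sz, disp, MPI.DOUBLE])
assert np.all(out == np.arange(nwork))   # full array on every task
```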
@@ -391,45 +393,55 @@ def redistribute(arr, dist=None, nodist=None):
        out = out.reshape(shp)
        out = np.moveaxis(out, 0, arr._distaxis)
        return from_global_data(out, distaxis=-1)

    # real redistribution via Alltoallv
+    # temporary slow, but simple solution for comparison purposes:
+    # return redistribute(redistribute(arr, dist=-1), dist=dist)
-    tmp = np.moveaxis(arr._data, (dist, arr._distaxis), (0, 1))
-    tshape = tmp.shape
-    slabsize = np.prod(tmp.shape[2:])*tmp.itemsize
+    ssz0 = arr._data.size//arr.shape[dist]
    ssz = np.empty(ntask, dtype=np.int)
+    rszall = arr.size//arr.shape[dist]*_shareSize(arr.shape[dist], ntask, rank)
+    rbuf = np.empty(rszall, dtype=arr.dtype)
+    rsz0 = rszall//arr.shape[arr._distaxis]
    rsz = np.empty(ntask, dtype=np.int)
-    for i in range(ntask):
-        ssz[i] = _shareSize(arr.shape[dist], ntask, i)*tmp.shape[1]*slabsize
-        rsz[i] = _shareSize(arr.shape[dist], ntask, rank) * \
-                 _shareSize(arr.shape[arr._distaxis], ntask, i) * \
-                 slabsize
-    sdisp = np.empty(ntask, dtype=np.int)
-    rdisp = np.empty(ntask, dtype=np.int)
-    sdisp[0] = 0
-    rdisp[0] = 0
-    sdisp[1:] = np.cumsum(ssz[:-1])
-    rdisp[1:] = np.cumsum(rsz[:-1])
-    tmp = tmp.flatten()
-    out = np.empty(np.prod(local_shape(arr.shape, dist)), dtype=arr.dtype)
-    s_msg = [tmp, (ssz, sdisp), MPI.BYTE]
-    r_msg = [out, (rsz, rdisp), MPI.BYTE]
+    if dist == 0:  # shortcut possible
+        sbuf = np.ascontiguousarray(arr._data)
+        for i in range(ntask):
+            lo, hi = _shareRange(arr.shape[dist], ntask, i)
+            ssz[i] = ssz0*(hi-lo)
+            rsz[i] = rsz0*_shareSize(arr.shape[arr._distaxis], ntask, i)
+    else:
+        sbuf = np.empty(arr._data.size, dtype=arr.dtype)
+        sslice = [slice(None)]*arr._data.ndim
+        ofs = 0
+        for i in range(ntask):
+            lo, hi = _shareRange(arr.shape[dist], ntask, i)
+            sslice[dist] = slice(lo, hi)
+            ssz[i] = ssz0*(hi-lo)
+            sbuf[ofs:ofs+ssz[i]] = arr._data[sslice].flat
+            ofs += ssz[i]
+            rsz[i] = rsz0*_shareSize(arr.shape[arr._distaxis], ntask, i)
+    ssz *= arr._data.itemsize
+    rsz *= arr._data.itemsize
+    sdisp = np.append(0, np.cumsum(ssz[:-1]))
+    rdisp = np.append(0, np.cumsum(rsz[:-1]))
+    s_msg = [sbuf, (ssz, sdisp), MPI.BYTE]
+    r_msg = [rbuf, (rsz, rdisp), MPI.BYTE]
    _comm.Alltoallv(s_msg, r_msg)
-    out2 = np.empty([_shareSize(arr.shape[dist], ntask, rank),
-                     arr.shape[arr._distaxis]] + list(tshape[2:]),
-                    dtype=arr.dtype)
-    ofs = 0
-    for i in range(ntask):
-        lsize = rsz[i]//tmp.itemsize
-        lo, hi = _shareRange(arr.shape[arr._distaxis], ntask, i)
-        out2[slice(None), slice(lo, hi)] = \
-            out[ofs:ofs+lsize].reshape(
-                [_shareSize(arr.shape[dist], ntask, rank),
-                 _shareSize(arr.shape[arr._distaxis], ntask, i)] + list(tshape[2:]))
-        ofs += lsize
-    new_shape = [_shareSize(arr.shape[dist], ntask, rank),
-                 arr.shape[arr._distaxis]] + list(tshape[2:])
-    out2 = out2.reshape(new_shape)
-    out2 = np.moveaxis(out2, (0, 1), (dist, arr._distaxis))
-    return from_local_data(arr.shape, out2, dist)
+    if arr._distaxis == 0:
+        rbuf = rbuf.reshape(local_shape(arr.shape, dist))
+        arrnew = from_local_data(arr.shape, rbuf, distaxis=dist)
+    else:
+        arrnew = empty(arr.shape, dtype=arr.dtype, distaxis=dist)
+        rslice = [slice(None)]*arr._data.ndim
+        ofs = 0
+        for i in range(ntask):
+            lo, hi = _shareRange(arr.shape[arr._distaxis], ntask, i)
+            rslice[arr._distaxis] = slice(lo, hi)
+            sz = rsz[i]//arr._data.itemsize
+            arrnew._data[rslice].flat = rbuf[ofs:ofs+sz]
+            ofs += sz
+    return arrnew
def default_distaxis():
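The rewritten redistribution packs a per-destination send buffer and moves everything in one `Alltoallv` call; the `dist == 0` branch skips the packing because the send data is already contiguous in that case. A minimal mpi4py sketch of the bare `Alltoallv` pattern, with equal block sizes instead of the per-task `ssz`/`rsz` computed above:

```python
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
ntask, rank = comm.Get_size(), comm.Get_rank()

# Task r sends block i of sbuf to task i and receives block r of
# every task, i.e. an ntask x ntask block transpose.
blocksize = 3
sbuf = (100*rank + np.arange(ntask*blocksize)).astype(np.float64)
ssz = np.full(ntask, blocksize)
rsz = np.full(ntask, blocksize)
sdisp = np.append(0, np.cumsum(ssz[:-1]))
rdisp = np.append(0, np.cumsum(rsz[:-1]))
rbuf = np.empty(ntask*blocksize, dtype=np.float64)
comm.Alltoallv([sbuf, (ssz, sdisp), MPI.DOUBLE],
               [rbuf, (rsz, rdisp), MPI.DOUBLE])
# rbuf[i*blocksize:(i+1)*blocksize] now holds the data sent by task i.
```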
@@ -18,6 +18,7 @@
 from __future__ import print_function
 from .iteration_controller import IterationController
+from ... import dobj


 class GradientNormController(IterationController):
@@ -64,7 +65,8 @@ class GradientNormController(IterationController):
            msg += " energy=" + str(energy.value)
            msg += " gradnorm=" + str(energy.gradient_norm)
            msg += " clvl=" + str(self._ccount)
-            print(msg)
+            if dobj.master:
+                print(msg)
            # self.logger.info(msg)

        # Are we done?
def probe_operation(soperation, domain, nprobes, random_type, dtype):
    # Estimates the component-wise mean and variance of `soperation`
    # applied to `nprobes` random fields.
    for i in range(nprobes):
        f = Field.from_random(random_type=random_type, domain=domain,
                              dtype=dtype)
        tmp = soperation(f)
        if i == 0:
            mean = [0]*len(tmp)
            var = [0]*len(tmp)
        for j in range(len(tmp)):
            mean[j] += tmp[j]
            var[j] += tmp[j]**2
    for j in range(len(tmp)):
        mean[j] *= 1./nprobes
        var[j] *= 1./nprobes
        var[j] -= mean[j]**2
    return mean, var
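A standalone numpy sketch of one classic use of such probing; the matrix `A` and the ±1 probes are illustrative assumptions, not part of this commit. For zero-mean, unit-variance probes `f`, the expectation of `f*(A @ f)` is the diagonal of `A`, so the probe mean estimates `diag(A)`:

```python
import numpy as np

rng = np.random.RandomState(0)
A = rng.standard_normal((50, 50))
nprobes = 10000

mean = np.zeros(50)
for _ in range(nprobes):
    f = rng.choice([-1.0, 1.0], size=50)   # cf. random_type="pm1"
    mean += f*(A @ f)                      # one probe of diag(A)
mean /= nprobes
assert np.allclose(mean, np.diag(A), atol=0.5)
```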