Commit 2ce453c2 authored by Martin Reinecke

fix FFT, cleanups

parent 4bed5bd5
Pipeline #21444 passed with stage in 5 minutes and 6 seconds
@@ -12,63 +12,66 @@ __all__ = ["ntask", "rank", "master", "local_shape", "data_object", "full",
_comm = MPI.COMM_WORLD
ntask = _comm.Get_size()
rank = _comm.Get_rank()
master = rank==0
master = rank == 0
def _shareSize(nwork, nshares, myshare):
nbase = nwork//nshares
return nbase if myshare>=nwork%nshares else nbase+1
return nbase if myshare >= nwork % nshares else nbase+1
def _shareRange(nwork, nshares, myshare):
nbase = nwork//nshares;
additional = nwork%nshares;
nbase = nwork//nshares
additional = nwork % nshares
lo = myshare*nbase + min(myshare, additional)
hi = lo+nbase+ (1 if myshare<additional else 0)
return lo,hi
hi = lo + nbase + (1 if myshare < additional else 0)
return lo, hi
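For orientation (not part of the commit): the two helpers above implement the usual balanced block distribution, where the first nwork % nshares shares receive one extra element. A quick serial check of that behaviour, using the functions exactly as defined above:

    # 10 items over 3 shares: sizes 4, 3, 3 covering [0,4), [4,7), [7,10)
    assert [_shareSize(10, 3, i) for i in range(3)] == [4, 3, 3]
    assert [_shareRange(10, 3, i) for i in range(3)] == [(0, 4), (4, 7), (7, 10)]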
def local_shape(shape, distaxis):
if len(shape)==0:
if len(shape) == 0:
distaxis = -1
if distaxis==-1:
if distaxis == -1:
return shape
shape2=list(shape)
shape2[distaxis]=_shareSize(shape[distaxis],ntask,rank)
shape2 = list(shape)
shape2[distaxis] = _shareSize(shape[distaxis], ntask, rank)
return tuple(shape2)
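local_shape then shrinks only the distributed axis to the calling task's share; everything else is untouched. An illustration, assuming 3 MPI tasks:

    # global shape (10, 6), distaxis=0, ntask=3:
    #   rank 0 sees (4, 6), ranks 1 and 2 see (3, 6)
    # distaxis == -1 means "not distributed": every rank keeps the full shape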
class data_object(object):
def __init__(self, shape, data, distaxis):
"""Must not be called directly by users"""
self._shape = tuple(shape)
if len(self._shape)==0:
if len(self._shape) == 0:
distaxis = -1
self._distaxis = distaxis
lshape = local_shape(self._shape, self._distaxis)
self._data = data
def _sanity_checks(self):
# check whether the distaxis is consistent
if self._distaxis<-1 or self._distaxis>=len(self._shape):
if self._distaxis < -1 or self._distaxis >= len(self._shape):
raise ValueError
itmp=np.array(self._distaxis)
otmp=np.empty(ntask,dtype=np.int)
_comm.Allgather(itmp,otmp)
if np.any(otmp!=self._distaxis):
itmp = np.array(self._distaxis)
otmp = np.empty(ntask, dtype=np.int)
_comm.Allgather(itmp, otmp)
if np.any(otmp != self._distaxis):
raise ValueError
# check whether the global shape is consistent
itmp=np.array(self._shape)
otmp=np.empty((ntask,len(self._shape)),dtype=np.int)
_comm.Allgather(itmp,otmp)
itmp = np.array(self._shape)
otmp = np.empty((ntask, len(self._shape)), dtype=np.int)
_comm.Allgather(itmp, otmp)
for i in range(ntask):
if np.any(otmp[i,:]!=self._shape):
if np.any(otmp[i, :] != self._shape):
raise ValueError
# check shape of local data
if self._distaxis<0:
if self._data.shape!=self._shape:
if self._distaxis < 0:
if self._data.shape != self._shape:
raise ValueError
else:
itmp=np.array(self._shape)
itmp[self._distaxis] = _shareSize(self._shape[self._distaxis],ntask,rank)
if np.any(self._data.shape!=itmp):
itmp = np.array(self._shape)
itmp[self._distaxis] = _shareSize(self._shape[self._distaxis],
ntask, rank)
if np.any(self._data.shape != itmp):
raise ValueError
@property
@@ -93,52 +96,50 @@ class data_object(object):
def _contraction_helper(self, op, mpiop, axis):
if axis is not None:
if len(axis)==len(self._data.shape):
if len(axis) == len(self._data.shape):
axis = None
if axis is None:
res = np.array(getattr(self._data, op)())
if (self._distaxis==-1):
if (self._distaxis == -1):
return res[()]
res2 = np.empty((),dtype=res.dtype)
_comm.Allreduce(res,res2,mpiop)
res2 = np.empty((), dtype=res.dtype)
_comm.Allreduce(res, res2, mpiop)
return res2[()]
if self._distaxis in axis:
res = getattr(self._data, op)(axis=axis)
res2 = np.empty_like(res)
_comm.Allreduce(res,res2,mpiop)
_comm.Allreduce(res, res2, mpiop)
return from_global_data(res2, distaxis=0)
else:
# perform the contraction on the local data
res = getattr(self._data, op)(axis=axis)
if self._distaxis == -1:
return from_global_data(res,distaxis=0)
return from_global_data(res, distaxis=0)
shp = list(res.shape)
shift=0
shift = 0
for ax in axis:
if ax<self._distaxis:
shift+=1
if ax < self._distaxis:
shift += 1
shp[self._distaxis-shift] = self.shape[self._distaxis]
return from_local_data(shp, res, self._distaxis-shift)
# check if the result is scalar or if a result_field must be constr.
if np.isscalar(data):
return data
else:
return data_object(data)
def sum(self, axis=None):
return self._contraction_helper("sum", MPI.SUM, axis)
def min(self, axis=None):
return self._contraction_helper("min", MPI.MIN, axis)
def max(self, axis=None):
return self._contraction_helper("max", MPI.MAX, axis)
# FIXME: to be improved!
def mean(self):
return self.sum()/self.size
def std(self):
return np.sqrt(self.var())
def var(self):
return (abs(self-self.mean())**2).mean()
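A sketch of what the three cases in _contraction_helper amount to, written against the module's own factory API (illustrative only; assumes an MPI run with several tasks):

    a = ones((8, 5), dtype=np.float64, distaxis=0)
    s0 = a.sum()           # reduce everything: local sum, then scalar Allreduce
    s1 = a.sum(axis=(0,))  # contracts the distributed axis: local sum over the
                           # slab, Allreduce, result redistributed along axis 0
    s2 = a.sum(axis=(1,))  # purely local; the distributed axis survives, with
                           # distaxis shifted down by the number of removed
                           # axes in front of it

Note that mean/std/var (the FIXME above) always reduce over all axes, since mean() takes no axis argument.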
@@ -157,7 +158,10 @@ class data_object(object):
b = other
tval = getattr(a, op)(b)
return self if tval is a else data_object(self._shape, tval, self._distaxis)
if tval is a:
return self
else:
return data_object(self._shape, tval, self._distaxis)
def __add__(self, other):
return self._binary_helper(other, op='__add__')
@@ -217,10 +221,10 @@ class data_object(object):
return self._binary_helper(other, op='__ne__')
def __neg__(self):
return data_object(self._shape,-self._data,self._distaxis)
return data_object(self._shape, -self._data, self._distaxis)
def __abs__(self):
return data_object(self._shape,np.abs(self._data),self._distaxis)
return data_object(self._shape, np.abs(self._data), self._distaxis)
def all(self):
return self._data.all()
@@ -230,19 +234,23 @@ class data_object(object):
def full(shape, fill_value, dtype=None, distaxis=0):
return data_object(shape, np.full(local_shape(shape, distaxis), fill_value, dtype), distaxis)
return data_object(shape, np.full(local_shape(shape, distaxis),
fill_value, dtype), distaxis)
def empty(shape, dtype=None, distaxis=0):
return data_object(shape, np.empty(local_shape(shape, distaxis), dtype), distaxis)
return data_object(shape, np.empty(local_shape(shape, distaxis),
dtype), distaxis)
def zeros(shape, dtype=None, distaxis=0):
return data_object(shape, np.zeros(local_shape(shape, distaxis), dtype), distaxis)
return data_object(shape, np.zeros(local_shape(shape, distaxis), dtype),
distaxis)
def ones(shape, dtype=None, distaxis=0):
return data_object(shape, np.ones(local_shape(shape, distaxis), dtype), distaxis)
return data_object(shape, np.ones(local_shape(shape, distaxis), dtype),
distaxis)
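These factories mirror their NumPy namesakes but allocate only the local slab of the global shape; a hedged usage sketch:

    z = zeros((1000, 200), dtype=np.float64, distaxis=0)
    # local_data(z).shape == (_shareSize(1000, ntask, rank), 200)
    # z.shape stays the global (1000, 200) on every task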
def empty_like(a, dtype=None):
@@ -251,8 +259,8 @@ def empty_like(a, dtype=None):
def vdot(a, b):
tmp = np.array(np.vdot(a._data, b._data))
res = np.empty((),dtype=tmp.dtype)
_comm.Allreduce(tmp,res,MPI.SUM)
res = np.empty((), dtype=tmp.dtype)
_comm.Allreduce(tmp, res, MPI.SUM)
return res[()]
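vdot relies on the slabs being disjoint and identically distributed in a and b: each task vdots its local pieces and a single scalar Allreduce(SUM) combines them (nothing verifies that a and b actually share the same distaxis). A small check that holds for any number of tasks:

    a = ones((6, 4), dtype=np.float64, distaxis=0)
    b = full((6, 4), 2.0, dtype=np.float64, distaxis=0)
    assert vdot(a, b) == 48.0   # 24 elements, each contributing 1*2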
@@ -261,7 +269,7 @@ def _math_helper(x, function, out):
function(x._data, out=out._data)
return out
else:
return data_object(x.shape,function(x._data),x._distaxis)
return data_object(x.shape, function(x._data), x._distaxis)
def abs(a, out=None):
@@ -288,28 +296,31 @@ def bincount(x, weights=None, minlength=None):
def from_object(object, dtype=None, copy=True):
return data_object(object._shape, np.array(object._data, dtype=dtype, copy=copy), distaxis=object._distaxis)
return data_object(object._shape, np.array(object._data, dtype=dtype,
copy=copy),
distaxis=object._distaxis)
def from_random(random_type, shape, dtype=np.float64, distaxis=0, **kwargs):
generator_function = getattr(Random, random_type)
#lshape = local_shape(shape, distaxis)
#return data_object(shape, generator_function(dtype=dtype, shape=lshape, **kwargs), distaxis=distaxis)
# lshape = local_shape(shape, distaxis)
# return data_object(shape, generator_function(dtype=dtype, shape=lshape, **kwargs), distaxis=distaxis)
return from_global_data(generator_function(dtype=dtype, shape=shape, **kwargs), distaxis=distaxis)
def local_data(arr):
return arr._data
def ibegin(arr):
res = [0] * arr._data.ndim
res[arr._distaxis] = _shareRange(arr._shape[arr._distaxis],ntask,rank)[0]
res[arr._distaxis] = _shareRange(arr._shape[arr._distaxis], ntask, rank)[0]
return tuple(res)
def np_allreduce_sum(arr):
res = np.empty_like(arr)
_comm.Allreduce(arr,res,MPI.SUM)
_comm.Allreduce(arr, res, MPI.SUM)
return res
@@ -317,97 +328,102 @@ def distaxis(arr):
return arr._distaxis
def from_local_data (shape, arr, distaxis):
def from_local_data(shape, arr, distaxis):
return data_object(shape, arr, distaxis)
def from_global_data (arr, distaxis=0):
if distaxis==-1:
def from_global_data(arr, distaxis=0):
if distaxis == -1:
return data_object(arr.shape, arr, distaxis)
lo, hi = _shareRange(arr.shape[distaxis],ntask,rank)
lo, hi = _shareRange(arr.shape[distaxis], ntask, rank)
sl = [slice(None)]*len(arr.shape)
sl[distaxis]=slice(lo,hi)
sl[distaxis] = slice(lo, hi)
return data_object(arr.shape, arr[sl], distaxis)
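from_global_data hands every task the full array and keeps only its slab along distaxis; to_global_data (next) is the inverse, via a redistribute to -1. One detail worth flagging: arr[sl] indexes with a Python list of slices, which newer NumPy versions deprecate (it would need tuple(sl)). Minimal sketch:

    glob = np.arange(12).reshape(6, 2)
    d = from_global_data(glob, distaxis=0)  # rank r keeps rows _shareRange(6, ntask, r)
    back = to_global_data(d)                # full (6, 2) array again on every rank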
def to_global_data (arr):
if arr._distaxis==-1:
def to_global_data(arr):
if arr._distaxis == -1:
return arr._data
tmp = redistribute(arr, dist=-1)
return tmp._data
def redistribute (arr, dist=None, nodist=None):
def redistribute(arr, dist=None, nodist=None):
if dist is not None:
if nodist is not None:
raise ValueError
if dist==arr._distaxis:
if dist == arr._distaxis:
return arr
else:
if nodist is None:
raise ValueError
if arr._distaxis not in nodist:
return arr
dist=-1
dist = -1
for i in range(len(arr.shape)):
if i not in nodist:
dist=i
dist = i
break
if arr._distaxis==-1: # just pick the proper subset
if arr._distaxis == -1: # just pick the proper subset
return from_global_data(arr._data, dist)
if dist==-1: # gather data
if dist == -1: # gather data
tmp = np.moveaxis(arr._data, arr._distaxis, 0)
slabsize=np.prod(tmp.shape[1:])*tmp.itemsize
sz=np.empty(ntask,dtype=np.int)
slabsize = np.prod(tmp.shape[1:])*tmp.itemsize
sz = np.empty(ntask, dtype=np.int)
for i in range(ntask):
sz[i]=slabsize*_shareSize(arr.shape[arr._distaxis],ntask,i)
disp=np.empty(ntask,dtype=np.int)
disp[0]=0
disp[1:]=np.cumsum(sz[:-1])
tmp=tmp.flatten()
out = np.empty(arr.size,dtype=arr.dtype)
_comm.Allgatherv(tmp,[out,sz,disp,MPI.BYTE])
sz[i] = slabsize*_shareSize(arr.shape[arr._distaxis], ntask, i)
disp = np.empty(ntask, dtype=np.int)
disp[0] = 0
disp[1:] = np.cumsum(sz[:-1])
tmp = tmp.flatten()
out = np.empty(arr.size, dtype=arr.dtype)
_comm.Allgatherv(tmp, [out, sz, disp, MPI.BYTE])
shp = np.array(arr._shape)
shp[1:arr._distaxis+1] = shp[0:arr._distaxis]
shp[0] = arr.shape[arr._distaxis]
out = out.reshape(shp)
out = np.moveaxis(out, 0, arr._distaxis)
return from_global_data (out, distaxis=-1)
return from_global_data(out, distaxis=-1)
# real redistribution via Alltoallv
# temporary slow, but simple solution for comparison purposes:
#return redistribute(redistribute(arr,dist=-1),dist=dist)
# return redistribute(redistribute(arr,dist=-1),dist=dist)
tmp = np.moveaxis(arr._data, (dist, arr._distaxis), (0, 1))
tshape = tmp.shape
slabsize=np.prod(tmp.shape[2:])*tmp.itemsize
ssz=np.empty(ntask,dtype=np.int)
rsz=np.empty(ntask,dtype=np.int)
slabsize = np.prod(tmp.shape[2:])*tmp.itemsize
ssz = np.empty(ntask, dtype=np.int)
rsz = np.empty(ntask, dtype=np.int)
for i in range(ntask):
ssz[i]=_shareSize(arr.shape[dist],ntask,i)*tmp.shape[1]*slabsize
rsz[i]=_shareSize(arr.shape[dist],ntask,rank)*_shareSize(arr.shape[arr._distaxis],ntask,i)*slabsize
sdisp=np.empty(ntask,dtype=np.int)
rdisp=np.empty(ntask,dtype=np.int)
sdisp[0]=0
rdisp[0]=0
sdisp[1:]=np.cumsum(ssz[:-1])
rdisp[1:]=np.cumsum(rsz[:-1])
tmp=tmp.flatten()
out = np.empty(np.prod(local_shape(arr.shape,dist)),dtype=arr.dtype)
ssz[i] = _shareSize(arr.shape[dist], ntask, i)*tmp.shape[1]*slabsize
rsz[i] = _shareSize(arr.shape[dist], ntask, rank) * \
_shareSize(arr.shape[arr._distaxis], ntask, i) * \
slabsize
sdisp = np.empty(ntask, dtype=np.int)
rdisp = np.empty(ntask, dtype=np.int)
sdisp[0] = 0
rdisp[0] = 0
sdisp[1:] = np.cumsum(ssz[:-1])
rdisp[1:] = np.cumsum(rsz[:-1])
tmp = tmp.flatten()
out = np.empty(np.prod(local_shape(arr.shape, dist)), dtype=arr.dtype)
s_msg = [tmp, (ssz, sdisp), MPI.BYTE]
r_msg = [out, (rsz, rdisp), MPI.BYTE]
_comm.Alltoallv(s_msg, r_msg)
out2 = np.empty([_shareSize(arr.shape[dist],ntask,rank), arr.shape[arr._distaxis]] +list(tshape[2:]), dtype=arr.dtype)
ofs=0
out2 = np.empty([_shareSize(arr.shape[dist], ntask, rank),
arr.shape[arr._distaxis]] + list(tshape[2:]),
dtype=arr.dtype)
ofs = 0
for i in range(ntask):
lsize = rsz[i]//tmp.itemsize
lo,hi = _shareRange(arr.shape[arr._distaxis],ntask,i)
out2[slice(None),slice(lo,hi)] = out[ofs:ofs+lsize].reshape([_shareSize(arr.shape[dist],ntask,rank),_shareSize(arr.shape[arr._distaxis],ntask,i)]+list(tshape[2:]))
lo, hi = _shareRange(arr.shape[arr._distaxis], ntask, i)
out2[slice(None), slice(lo, hi)] = \
out[ofs:ofs+lsize].reshape([_shareSize(arr.shape[dist], ntask, rank),_shareSize(arr.shape[arr._distaxis],ntask,i)]+list(tshape[2:]))
ofs += lsize
new_shape = [_shareSize(arr.shape[dist],ntask,rank), arr.shape[arr._distaxis]] +list(tshape[2:])
out2=out2.reshape(new_shape)
out2 = out2.reshape(new_shape)
out2 = np.moveaxis(out2, (0, 1), (dist, arr._distaxis))
return from_local_data (arr.shape, out2, dist)
return from_local_data(arr.shape, out2, dist)
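Summary of redistribute's three paths: if the data is currently undistributed it just slices (from_global_data); if the target is dist=-1 it gathers the slabs with Allgatherv; otherwise it does a true repartition with Alltoallv, where send counts follow the receiver's share of the new axis and receive counts the sender's share of the old one. A hedged usage sketch:

    a = ones((64, 32), dtype=np.float64, distaxis=0)
    b = redistribute(a, dist=1)        # repartition: split along axis 1 instead
    c = redistribute(a, nodist=(0,))   # "any distribution avoiding axis 0"
    g = redistribute(a, dist=-1)       # gather: every task holds all 64x32 values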
def default_distaxis():
......
@@ -6,17 +6,6 @@ class data_object(object):
def __init__(self, npdata):
self._data = np.asarray(npdata)
# FIXME: subscripting support will most likely go away
def __getitem__(self, key):
res = self._data[key]
return res if np.isscalar(res) else data_object(res)
def __setitem__(self, key, value):
if isinstance(value, data_object):
self._data[key] = value._data
else:
self._data[key] = value
@property
def dtype(self):
return self._data.dtype
@@ -39,7 +28,7 @@ class data_object(object):
def _contraction_helper(self, op, axis):
if axis is not None:
if len(axis)==len(self._data.shape):
if len(axis) == len(self._data.shape):
axis = None
if axis is None:
return getattr(self._data, op)()
@@ -221,21 +210,21 @@ def distaxis(arr):
return -1
def from_local_data (shape, arr, distaxis):
if shape!=arr.shape:
def from_local_data(shape, arr, distaxis):
if shape != arr.shape:
raise ValueError
return data_object(arr)
def from_global_data (arr, distaxis=-1):
def from_global_data(arr, distaxis=-1):
return data_object(arr)
def to_global_data (arr):
def to_global_data(arr):
return arr._data
def redistribute (arr, dist=None, nodist=None):
def redistribute(arr, dist=None, nodist=None):
return arr
......
@@ -2,9 +2,21 @@
import numpy as np
from numpy import ndarray as data_object
from numpy import full, empty, sqrt, ones, zeros, vdot, abs, bincount, exp, log
from numpy import full, empty, empty_like, sqrt, ones, zeros, vdot, abs, \
bincount, exp, log
from .random import Random
__all__ = ["ntask", "rank", "master", "local_shape", "data_object", "full",
"empty", "zeros", "ones", "empty_like", "vdot", "abs", "exp",
"log", "sqrt", "bincount", "from_object", "from_random",
"local_data", "ibegin", "np_allreduce_sum", "distaxis",
"from_local_data", "from_global_data", "to_global_data",
"redistribute", "default_distaxis"]
ntask = 1
rank = 0
master = True
def from_object(object, dtype=None, copy=True):
return np.array(object, dtype=dtype, copy=copy)
@@ -31,21 +43,21 @@ def distaxis(arr):
return -1
def from_local_data (shape, arr, distaxis):
if shape!=arr.shape:
def from_local_data(shape, arr, distaxis):
if shape != arr.shape:
raise ValueError
return arr
def from_global_data (arr, distaxis=-1):
def from_global_data(arr, distaxis=-1):
return arr
def to_global_data (arr):
def to_global_data(arr):
return arr
def redistribute (arr, dist=None, nodist=None):
def redistribute(arr, dist=None, nodist=None):
return arr
......
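Since data_object is plain numpy.ndarray in this single-task backend (see the imports above), the whole distribution API collapses to identities; the shared function names are what keep calling code backend-agnostic. A quick check of that contract:

    arr = full((4, 4), 3.0)              # just np.full here
    assert to_global_data(arr) is arr
    assert redistribute(arr, dist=0) is arr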
from .data_objects.distributed_do import *
#from .data_objects.my_own_do import *
#from .data_objects.numpy_do import *
#from __future__ import print_function
try:
from mpi4py import MPI
if MPI.COMM_WORLD.Get_size()==1:
#print ("MPI found, but only with one task, using numpy_do...")
from .data_objects.numpy_do import *
else:
#if MPI.COMM_WORLD.Get_rank() == 0:
# print ("MPI with multiple tasks found, using distributed_do...")
from .data_objects.distributed_do import *
except ImportError:
#print ("MPI not found, using numpy_do...")
from .data_objects.numpy_do import *
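Net effect of this dispatch: the rest of the package imports one module and gets whichever backend fits the runtime, chosen once at import time. A hedged sketch of backend-agnostic client code (the import path is hypothetical; it depends on where this __init__ lives in the package):

    from nifty import dobj  # hypothetical path

    a = dobj.full((8, 8), 1.0)   # distributed under mpirun, plain np.full otherwise
    if dobj.master:
        print(dobj.ntask, "task(s)")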
@@ -58,6 +58,7 @@ class RGRGTransformation(Transformation):
x : Field
The field to be transformed
"""
from pyfftw.interfaces.numpy_fft import fftn
axes = x.domain.axes[self.space]
p2h = x.domain == self.pdom
tdom = self.hdom if p2h else self.pdom
@@ -65,16 +66,25 @@
tmpax = (dobj.distaxis(x.val),)
tmp = dobj.redistribute(x.val, nodist=tmpax)
ldat = dobj.local_data(tmp)
tmp = dobj.from_local_data(tmp.shape,hartley(ldat,tmpax),distaxis=dobj.distaxis(tmp))
tmp = dobj.redistribute(tmp, dist=tmpax[0])
tmpax = tuple (i for i in axes if i not in tmpax)
if len(tmpax) > 0:
ldat = fftn(ldat, axes=tmpax)
if len(axes) ==1: # we are done
ldat = ldat.real+ldat.imag
tmp = dobj.from_local_data(tmp.shape,ldat,distaxis=dobj.distaxis(tmp))
tmp = dobj.redistribute(tmp, dist=tmpax[0])
else:
tmp = dobj.from_local_data(tmp.shape,ldat,distaxis=dobj.distaxis(tmp))
tmp = dobj.redistribute(tmp, dist=tmpax[0])
tmpax = tuple (i for i in axes if i not in tmpax)
ldat = dobj.local_data(tmp)
tmp = dobj.from_local_data(tmp.shape,hartley(ldat,tmpax),distaxis=dobj.distaxis(tmp))
ldat = fftn(ldat, axes=tmpax)
ldat = ldat.real+ldat.imag
tmp = dobj.from_local_data(tmp.shape,ldat,distaxis=dobj.distaxis(tmp))
Tval = Field(tdom, tmp)
else:
ldat = dobj.local_data(x.val)
tmp = dobj.from_local_data(x.val.shape,hartley(ldat,axes),distaxis=dobj.distaxis(x.val))
ldat = fftn(ldat,axes=axes)
ldat = ldat.real+ldat.imag
tmp = dobj.from_local_data(x.val.shape, ldat, distaxis=dobj.distaxis(x.val))
Tval = Field(tdom,tmp)
fct = self.fct_p2h if p2h else self.fct_h2p
if fct != 1:
......
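This hunk is the actual "fix FFT" of the commit message. The old code applied hartley() once per axis group; the new code runs the complex FFT over the (redistributed, now local) axis first, takes real+imag immediately only if that was the sole axis ("we are done"), and otherwise keeps the complex data, redistributes, finishes the FFT over the remaining axes, and projects to real+imag once at the end. The ordering matters because the Hartley transform does not factorize over axis groups. A standalone, NumPy-only demonstration of the failure mode (assuming, as in NIFTy, that hartley is Re(FFT) + Im(FFT)):

    import numpy as np

    def hartley(a, axes=None):
        # Hartley transform over the given axes: Re(FFT) + Im(FFT)
        tmp = np.fft.fftn(a, axes=axes)
        return tmp.real + tmp.imag

    x = np.random.rand(8, 8)
    good = hartley(x, axes=(0, 1))                    # what the new code computes
    bad = hartley(hartley(x, axes=(0,)), axes=(1,))   # the old per-group approach
    assert not np.allclose(good, bad)                 # they genuinely differ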
def probe_operation(soperation, domain, nprobes,
random_type, dtype):
for i in range(nprobes):
f = Field.from_random(random_type=random_type, domain=domain,
dtype=dtype)
tmp = operator(f)