Commit c09199ba authored by theos's avatar theos
Browse files

Added axis functionality to d2o methods.

-> Therefore modified the MPI_dummy.py

Fixed bincount method of _slicing_distributor (now uses allreduce instead of allgather+sum)
parent 680d82c0
Pipeline #1698 skipped
# -*- coding: utf-8 -*-
import numbers
import numpy as np
from nifty import about
def cast_axis_to_tuple(axis):
if axis is None:
return None
try:
axis = tuple([int(item) for item in axis])
except(TypeError):
if isinstance(axis, numbers.Number):
axis = (int(axis), )
else:
raise TypeError(about._errors.cstring(
"ERROR: Could not convert axis-input to tuple of ints"))
return axis
......@@ -8,6 +8,7 @@ from nifty.keepers import about,\
global_dependency_injector as gdi
from d2o_librarian import d2o_librarian
from cast_axis_to_tuple import cast_axis_to_tuple
from strategies import STRATEGIES
......@@ -1025,10 +1026,12 @@ class distributed_data_object(object):
"""
other = self.distributor.extract_local_data(other)
local_vdot = np.array([np.vdot(self.get_local_data(), other)])
local_vdot = np.array([np.vdot(self.get_local_data(copy=False),
other)])
global_vdot = np.empty_like(local_vdot)
self.distributor._Allreduce_sum(sendbuf=local_vdot,
recvbuf=global_vdot)
self.distributor._Allreduce_helper(sendbuf=local_vdot,
recvbuf=global_vdot,
op=MPI.SUM)
# local_vdot = np.vdot(self.get_local_data(), other)
# local_vdot_list = self.distributor._allgather(local_vdot)
......@@ -1044,199 +1047,135 @@ class distributed_data_object(object):
""" x.__setitem__(i, y) <==> x[i]=y <==> x.set_data(y, i) """
self.set_data(data, key)
def _contraction_helper(self, function, **kwargs):
""" Used for various operations like min, max, sum, prod, mean,...
_builtin_helper checks whether the local node's data array is empty,
then applies the given function on the local data, collects the
results, and then applies the function to this with the keyword axis=0.
Parameters
----------
function : callable
This object will be applied to the local data array.
**kwargs
Additional keyword arguments will be passed to the first `function`
call.
Returns
-------
out :
The return object of `function`.
Raises
------
ValueError
Raised if the d2o's shape equals (0,).
"""
if self.shape == (0,):
raise ValueError("ERROR: Zero-size array to reduction operation " +
"which has no identity")
if self.data.size == 0:
local = 0
include = False
else:
local = function(self.data, **kwargs)
include = True
local_list = self.distributor._allgather(local)
local_list = np.array(local_list, dtype=np.dtype(local_list[0]))
include_list = np.array(self.distributor._allgather(include))
work_list = local_list[include_list]
if work_list.shape[0] == 0:
raise ValueError("ERROR: Zero-size array to reduction operation " +
"which has no identity")
else:
result = function(work_list, axis=0)
return result
def min(self, **kwargs):
def min(self, axis=None, **kwargs):
""" x.min() <==> x.amin() """
return self.amin(**kwargs)
return self.amin(axis=axis, **kwargs)
def amin(self, **kwargs):
def amin(self, axis=None, **kwargs):
""" Returns the minimum of an array.
See Also
--------
numpy.amin
"""
return self.distributor._contraction_helper(self,
np.amin,
axis=axis,
**kwargs)
return self._contraction_helper(np.amin, **kwargs)
def nanmin(self, **kwargs):
def nanmin(self, axis=None, **kwargs):
""" Returns the minimum of an array ignoring all NaNs.
See Also
--------
numpy.nanmin
"""
return self.distributor._contraction_helper(self,
np.nanmin,
axis=axis,
**kwargs)
return self._contraction_helper(np.nanmin, **kwargs)
def max(self, **kwargs):
def max(self, axis=None, **kwargs):
""" x.max() <==> x.amax() """
return self.amax(**kwargs)
return self.amax(axis=axis, **kwargs)
def amax(self, **kwargs):
def amax(self, axis=None, **kwargs):
""" Returns the maximum of an array.
See Also
--------
numpy.amax
"""
return self.distributor._contraction_helper(self,
np.amax,
axis=axis,
**kwargs)
return self._contraction_helper(np.amax, **kwargs)
def nanmax(self, **kwargs):
def nanmax(self, axis=None, **kwargs):
""" Returns the maximum of an array ignoring all NaNs.
See Also
--------
numpy.nanmax
"""
return self.distributor._contraction_helper(self,
np.nanmax,
axis=axis,
**kwargs)
return self._contraction_helper(np.nanmax, **kwargs)
def sum(self, **kwargs):
def sum(self, axis=None, **kwargs):
""" Sums the array elements.
See Also
--------
numpy.sum
"""
return self.distributor._contraction_helper(self,
np.sum,
axis=axis,
**kwargs)
if self.shape == (0,):
return self.dtype.type(0)
return self._contraction_helper(np.sum, **kwargs)
def prod(self, **kwargs):
def prod(self, axis=None, **kwargs):
""" Multiplies the array elements.
See Also
--------
numpy.prod
"""
if self.shape == (0,):
return self.dtype.type(1)
return self._contraction_helper(np.prod, **kwargs)
def mean(self, power=1):
""" Returns the mean of the d2o's elements.
Parameters
----------
power : scalar
Used for point-wise exponentiation of the array's elements before
computing the mean: mean(data**power)
Returns
-------
mean : scalar
The (pre-powered) mean-value of the array.
"""
if self.shape == (0,):
return np.mean(np.array([], dtype=self.dtype))
# compute the local means and the weights for the mean-mean.
if np.prod(self.data.shape) == 0:
local_mean = 0
include = False
return self.distributor._contraction_helper(self,
np.prod,
axis=axis,
**kwargs)
def all(self, axis=None, **kwargs):
return self.distributor._contraction_helper(self,
np.all,
axis=axis,
**kwargs)
def any(self, axis=None, **kwargs):
return self.distributor._contraction_helper(self,
np.any,
axis=axis,
**kwargs)
def mean(self, axis=None, **kwargs):
# infer, which axes will be collapsed
axis = cast_axis_to_tuple(axis)
if axis is None:
collapsed_shapes = self.shape
else:
if power != 1:
local_mean = np.mean(self.data**power)
else:
local_mean = np.mean(self.data)
include = True
local_weight = np.prod(self.data.shape)
# collect the local means and cast the result to a ndarray
local_mean_list = self.distributor._allgather(local_mean)
local_weight_list = self.distributor._allgather(local_weight)
local_mean_list = np.array(local_mean_list,
dtype=np.dtype(local_mean_list[0]))
local_weight_list = np.array(local_weight_list)
# extract the parts from the non-empty nodes
include_list = np.array(self.distributor._allgather(include))
work_mean_list = local_mean_list[include_list]
work_weight_list = local_weight_list[include_list]
if work_mean_list.shape[0] == 0:
raise ValueError("ERROR: Mean of empty slice.")
collapsed_shapes = [self.shape[i] for i in axis]
# np.prod(()) returns 1.0 which is needed here
collapsed_dimensions = np.prod(collapsed_shapes)
result = self.sum(axis=axis, **kwargs)
# the following may produce division by 0 warnings, but np.mean also
# does. So, this is by purpose.
if np.issubdtype(result.dtype, np.integer) or \
result.dtype == np.dtype('bool'):
result = result/np.float(collapsed_dimensions)
else:
# compute the denominator for the weighted mean-mean
global_weight = np.sum(work_weight_list)
# compute the numerator
numerator = np.sum(work_mean_list * work_weight_list)
global_mean = numerator / global_weight
return global_mean
def var(self):
result /= collapsed_dimensions
return result
def var(self, axis=None):
""" Returns the variance of the d2o's elements.
Internally the formula <x**2> - <x>**2 is used.
"""
if self.shape == (0,):
return np.var(np.array([], dtype=self.dtype))
if issubclass(self.dtype.type, np.complexfloating):
mean_of_the_square = abs(self**2).mean()
square_of_the_mean = abs(self.mean())**2
mean_of_the_square = abs(self**2).mean(axis=axis)
square_of_the_mean = abs(self.mean(axis=axis))**2
else:
mean_of_the_square = self.mean(power=2)
square_of_the_mean = self.mean()**2
mean_of_the_square = (self**2).mean(axis=axis)
square_of_the_mean = self.mean(axis=axis)**2
return mean_of_the_square - square_of_the_mean
def std(self):
def std(self, axis=None):
""" Returns the standard deviation of the d2o's elements. """
if self.shape == (0,):
return np.std(np.array([], dtype=self.dtype))
if self.shape == (0,):
return np.nan
return np.sqrt(self.var())
return np.sqrt(self.var(axis=axis))
def argmin(self):
""" Returns the (flat) index of the d2o's smallest value.
......@@ -1245,10 +1184,10 @@ class distributed_data_object(object):
argmax, argmin_nonflat, argmax_nonflat
"""
if self.shape == (0,):
if 0 in self.shape:
raise ValueError(
"ERROR: attempt to get argmin of an empty object")
if np.prod(self.data.shape) == 0:
if 0 in self.local_shape:
local_argmin = np.nan
local_argmin_value = np.nan
globalized_local_argmin = np.nan
......@@ -1276,10 +1215,10 @@ class distributed_data_object(object):
argmin, argmin_nonflat, argmax_nonflat
"""
if self.shape == (0,):
if 0 in self.shape:
raise ValueError(
"ERROR: attempt to get argmax of an empty object")
if np.prod(self.data.shape) == 0:
if 0 in self.local_shape:
local_argmax = np.nan
local_argmax_value = np.nan
globalized_local_argmax = np.nan
......@@ -1321,8 +1260,8 @@ class distributed_data_object(object):
""" Returns the element-wise complex conjugate. """
temp_d2o = self.copy_empty()
temp_data = np.conj(self.get_local_data())
temp_d2o.set_local_data(temp_data)
temp_data = np.conj(self.get_local_data(copy=False))
temp_d2o.set_local_data(temp_data, copy=False)
temp_d2o.hermitian = self.hermitian
return temp_d2o
......@@ -1334,7 +1273,7 @@ class distributed_data_object(object):
return self.conjugate()
def median(self):
def median(self, axis=None, **kwargs):
""" Returns the d2o element's median.
The median is computed by collecting the full d2o data and then passing
......@@ -1344,7 +1283,7 @@ class distributed_data_object(object):
about.warnings.cprint(
"WARNING: The current implementation of median is very expensive!")
median = np.median(self.get_full_data())
median = np.median(self.get_full_data(), axis=axis, **kwargs)
return median
def _is_helper(self, function):
......@@ -1363,7 +1302,7 @@ class distributed_data_object(object):
"""
temp_d2o = self.copy_empty(dtype=np.dtype('bool'))
temp_d2o.set_local_data(function(self.data))
temp_d2o.set_local_data(function(self.data), copy=False)
return temp_d2o
def iscomplex(self):
......@@ -1431,23 +1370,10 @@ class distributed_data_object(object):
isfinite
"""
temp_d2o = self.copy_empty()
temp_d2o.set_local_data(np.nan_to_num(self.data))
temp_d2o.set_local_data(np.nan_to_num(self.get_local_data(copy=False)),
copy=False)
return temp_d2o
def all(self):
""" Returns True if all elements of an array evaluate to True. """
local_all = np.all(self.get_local_data())
global_all = self.distributor._allgather(local_all)
return np.all(global_all)
def any(self):
""" Returns True if any element of an array evaluate to True. """
local_any = np.any(self.get_local_data())
global_any = self.distributor._allgather(local_any)
return np.any(global_any)
def unique(self):
""" Returns a `numpy.ndarray` holding the d2o's unique elements. """
......@@ -1497,22 +1423,11 @@ class distributed_data_object(object):
else:
local_weights = None
local_counts = np.bincount(self.get_local_data(copy=False).flatten(),
weights=local_weights,
minlength=minlength)
if self.distribution_strategy == 'not':
return local_counts
else:
counts = np.empty_like(local_counts)
# self.distributor._Allreduce_sum(local_counts, counts)
# Potentially faster, but buggy. <- If np.binbount yields
# inconsistent datatypes because of empty arrays on certain nodes,
# the Allreduce produces non-sense results.
list_of_counts = self.distributor._allgather(local_counts)
counts = np.sum(list_of_counts, axis=0)
return counts
local_data = self.get_local_data(copy=False).flatten()
counts = self.distributor.bincount(local_data=local_data,
local_weights=local_weights,
minlength=minlength)
return counts
def where(self):
""" Return the indices where `self` is True.
......
# -*- coding: utf-8 -*-
import numbers
import numpy as np
from nifty.keepers import about,\
......@@ -12,6 +14,8 @@ from d2o_iter import d2o_slicing_iter,\
d2o_not_iter
from d2o_librarian import d2o_librarian
from dtype_converter import dtype_converter
from cast_axis_to_tuple import cast_axis_to_tuple
from translate_to_mpi_operator import op_translate_dict
from strategies import STRATEGIES
......@@ -278,31 +282,6 @@ def _infer_key_type(key):
class distributor(object):
#def inject(self, data, to_slices, data_update, from_slices,
# **kwargs):
# # check if to_key and from_key are completely built of slices
# if not np.all(
# np.vectorize(lambda x: isinstance(x, slice))(to_slices)):
# raise ValueError(about._errors.cstring(
# "ERROR: The to_slices argument must be a list or " +
# "tuple of slices!")
# )
# if not np.all(
# np.vectorize(lambda x: isinstance(x, slice))(from_slices)):
# raise ValueError(about._errors.cstring(
# "ERROR: The from_slices argument must be a list or " +
# "tuple of slices!")
# )
# to_slices = tuple(to_slices)
# from_slices = tuple(from_slices)
# self.disperse_data(data=data,
# to_key=to_slices,
# data_update=data_update,
# from_key=from_slices,
# **kwargs)
def disperse_data(self, data, to_key, data_update, from_key=None,
local_keys=False, copy=True, **kwargs):
# Check which keys we got:
......@@ -464,7 +443,7 @@ class _slicing_distributor(distributor):
copy=copy)
elif self.distribution_strategy in ['freeform']:
if isinstance(global_data, distributed_data_object):
local_data = global_data.get_local_data(copy=copy)
local_data = global_data.get_local_data()
elif np.isscalar(local_data):
temp_local_data = np.empty(self.local_shape,
dtype=self.dtype)
......@@ -510,14 +489,64 @@ class _slicing_distributor(distributor):
gathered_things = comm.allgather(thing)
return gathered_things
def _Allreduce_sum(self, sendbuf, recvbuf):
def _Allreduce_helper(self, sendbuf, recvbuf, op):
send_dtype = self._my_dtype_converter.to_mpi(sendbuf.dtype)
recv_dtype = self._my_dtype_converter.to_mpi(recvbuf.dtype)
self.comm.Allreduce([sendbuf, send_dtype],
[recvbuf, recv_dtype],
op=MPI.SUM)
op=op)
return recvbuf
def _contraction_helper(self, parent, function, axis=None, **kwargs):
if axis == ():
return parent.copy()
old_shape = parent.shape
axis = cast_axis_to_tuple(axis)
if axis is None:
new_shape = ()
else:
new_shape = tuple([old_shape[i] for i in xrange(len(old_shape))
if i not in axis])
# do the contraction on the node's local data
local_data = parent.data
contracted_local_data = function(local_data, axis=axis, **kwargs)
new_dtype = contracted_local_data.dtype
# check if additional contraction along the first axis must be done
if axis is None or 0 in axis:
(mpi_op, bufferQ) = op_translate_dict[function]
contracted_local_data = self.comm.allreduce(contracted_local_data,
op=mpi_op)
new_dist_strategy = 'not'
else:
new_dist_strategy = parent.distribution_strategy
if new_shape == ():
result = contracted_local_data
else:
# try to store the result in a distributed_data_object with the
# distribution_strategy as parent
result = parent.copy_empty(global_shape=new_shape,
dtype=new_dtype,
distribution_strategy=new_dist_strategy)
# However, there are cases where the contracted data does not any
# longer follow the prior distribution scheme.
# Example: FFTW distribution on 4 MPI processes
# Contracting (4, 4) to (4,).
# (4, 4) was distributed (1, 4)...(1, 4)
# (4, ) is not distributed like (1,)...(1,) but like (2,)(2,)()()!
if result.local_shape != contracted_local_data.shape:
result = parent.copy_empty(
local_shape=contracted_local_data.shape,
dtype=new_dtype,
distribution_strategy='freeform')
result.set_local_data(contracted_local_data, copy=False)
return result
def distribute_data(self, data=None, alias=None,
path=None, copy=True, **kwargs):
'''
......@@ -1053,8 +1082,8 @@ class _slicing_distributor(distributor):
localized_stop,
first_step),) + slice_objects[1:]
# if directly_to_np_Q == False:
local_result = data[local_slice]
if (first_step is not None) and (first_step < 0):
local_result = self._invert_mpi_data_ordering(local_result)
......@@ -1327,6 +1356,17 @@ class _slicing_distributor(distributor):
local_where)
return global_where
def bincount(self, local_data, local_weights, minlength):
local_counts = np.bincount(local_data,
weights=local_weights,
minlength=minlength)
global_counts = np.empty_like(local_counts)
self._Allreduce_helper(local_counts,
global_counts,
MPI.SUM)
return global_counts
def cumsum(self, data, axis):
# compute the local np.cumsum
local_cumsum = np.cumsum(data, axis=axis)
......@@ -1414,7 +1454,7 @@ class _slicing_distributor(distributor):
if sliceified[0]:
# Case 1: The in_data d2o has more than one dimension
if len(in_data.shape) > 1 and \
in_data.distribution_strategy in STRATEGIES['slicing']:
in_data.distribution_strategy in STRATEGIES['slicing']:
local_in_data = in_data.get_local_data(copy=False)
local_has_data = (np.prod(local_in_data.shape) != 0)
local_has_data_list = np.array(
......@@ -1702,10 +1742,25 @@ class _not_distributor(distributor):
def _allgather(self, thing):
return [thing, ]
def _Allreduce_sum(self, sendbuf, recvbuf):
def _Allreduce_helper(self, sendbuf, recvbuf, op):
recvbuf[:] = sendbuf
return recvbuf
def _contraction_helper(self, parent, function, axis=None, **kwargs):
if axis == ():
return parent.copy()
local_result = function(parent.data, axis=axis, **kwargs)
if isinstance(local_result, np.ndarray):
result_object = parent.copy_empty(global_shape=local_result.shape,
dtype=local_result.dtype)
result_object.set_local_data(local_result, copy=False)
else:
result_object = local_result