diff --git a/__init__.py b/__init__.py
index d6e72ebd69cb7703d935ee07f0b2a3386901b1c6..1defacdef6d9ccf4fb2e825a5ec2a75d1ed81141 100644
--- a/__init__.py
+++ b/__init__.py
@@ -31,7 +31,7 @@
 from nifty_core import space,\
                        nested_space,\
                        field
-from nifty_mpi_data import distributed_data_object
+from nifty_mpi_data import distributed_data_object, d2o_librarian
 from nifty_power import *
 from nifty_random import random
 from nifty_simple_math import *
diff --git a/nifty_mpi_data.py b/nifty_mpi_data.py
index 630ad67e6c5990ba56cfe96c1c327601ed29f020..6585fa389f88738c41d827dcff365b897150ae58 100644
--- a/nifty_mpi_data.py
+++ b/nifty_mpi_data.py
@@ -28,6 +28,7 @@ FOUND = {}
 import numpy as np
 from nifty_about import about
+from weakref import WeakValueDictionary as weakdict
 
 try:
     from mpi4py import MPI
@@ -51,6 +52,9 @@ except(ImportError):
     FOUND['h5py_parallel'] = False
 
+
+COMM = MPI.COMM_WORLD
+
 class distributed_data_object(object):
     """
@@ -167,8 +171,15 @@ class distributed_data_object(object):
 ##            global_shape = global_data.shape
         ## TODO: allow init with empty shape
+
+        if isinstance(global_data, tuple) or isinstance(global_data, list):
+            global_data = np.array(global_data, copy=False)
+        if isinstance(local_data, tuple) or isinstance(local_data, list):
+            local_data = np.array(local_data, copy=False)
+
         self.distributor = distributor_factory.get_distributor(
                                 distribution_strategy = distribution_strategy,
+                                comm = comm,
                                 global_data = global_data,
                                 global_shape = global_shape,
                                 local_data = local_data,
@@ -176,7 +187,6 @@ class distributed_data_object(object):
                                 alias = alias,
                                 path = path,
                                 dtype = dtype,
-                                comm = comm,
                                 **kwargs)
 
         self.distribution_strategy = distribution_strategy
@@ -193,6 +203,7 @@ class distributed_data_object(object):
                           path = alias,
                           hermitian = hermitian,
                           copy = copy)
+        self.index = d2o_librarian.register(self)
 #        ## If a hdf5 path was given, load the data
 #        if FOUND['h5py'] == True and alias is not None:
 #            self.load(alias = alias, path = path)
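The `d2o_librarian.register` call added above hands every new distributed_data_object a session-unique index while the librarian keeps only a weak reference, so bookkeeping never keeps a dead d2o alive. A standalone sketch of that registry pattern, separate from the patch (`Librarian` and `Payload` are made-up names; prompt reclamation as in CPython's reference counting is assumed):

import weakref

class Librarian(object):
    ## holds weak references only: entries vanish once the object dies
    def __init__(self):
        self.library = weakref.WeakValueDictionary()
        self.counter = 0

    def register(self, obj):
        self.counter += 1
        self.library[self.counter] = obj
        return self.counter

    def __getitem__(self, index):
        return self.library[index]

class Payload(object):
    pass

librarian = Librarian()
p = Payload()
index = librarian.register(p)
assert librarian[index] is p
del p                      ## last strong reference gone
try:
    librarian[index]       ## the weak entry has been dropped
except KeyError:
    print("entry released")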
@@ -222,7 +233,7 @@ class distributed_data_object(object):
             temp_d2o.hermitian = self.hermitian
         return temp_d2o
 
-    def copy_empty(self, global_shape=None, dtype=None,
+    def copy_empty(self, global_shape=None, local_shape=None, dtype=None,
                    distribution_strategy=None, **kwargs):
         if global_shape == None:
             global_shape = self.shape
@@ -234,10 +245,11 @@ class distributed_data_object(object):
             kwargs.update(self.init_kwargs)
 
         temp_d2o = distributed_data_object(global_shape=global_shape,
-                               dtype=dtype,
-                               distribution_strategy=distribution_strategy,
-                               *self.init_args,
-                               **kwargs)
+                               local_shape = local_shape,
+                               dtype = dtype,
+                               distribution_strategy = distribution_strategy,
+                               *self.init_args,
+                               **kwargs)
         return temp_d2o
 
     def apply_scalar_function(self, function, inplace=False, dtype=None):
@@ -245,7 +257,7 @@ class distributed_data_object(object):
         if inplace == True:
             temp = self
-            if dtype != None and self.dtype != dtype:
+            if dtype != None and self.dtype != np.dtype(dtype):
                 about.warnings.cprint(\
                     "WARNING: Inplace dtype conversion is not possible!")
 
@@ -352,14 +364,12 @@ class distributed_data_object(object):
 
     def __abs__(self):
         ## translate complex dtypes
-        if self.dtype == np.complex64:
-            new_dtype = np.float32
-        elif self.dtype == np.complex128:
-            new_dtype = np.float64
-        elif self.dtype == np.complex:
-            new_dtype = np.float
-        elif issubclass(self.dtype, np.complexfloating):
-            new_dtype = np.float
+        if self.dtype == np.dtype('complex64'):
+            new_dtype = np.dtype('float32')
+        elif self.dtype == np.dtype('complex128'):
+            new_dtype = np.dtype('float64')
+        elif issubclass(self.dtype.type, np.complexfloating):
+            new_dtype = np.dtype('float')
         else:
             new_dtype = self.dtype
         temp_d2o = self.copy_empty(dtype = new_dtype)
@@ -382,8 +392,9 @@ class distributed_data_object(object):
             temp_data = operator(temp_data)
         ## Case 2: other is a real scalar -> preserve hermitianity
-        elif np.isreal(other) or (self.dtype not in (np.complex, np.complex128,
-                                                     np.complex256)):
+        elif np.isreal(other) or (self.dtype not in (
+                                                np.dtype('complex128'),
+                                                np.dtype('complex256'))):
             hermitian_Q = self.hermitian
             temp_data = operator(other)
         ## Case 3: other is complex
@@ -396,7 +407,7 @@ class distributed_data_object(object):
         else:
             ## use common datatype for self and other
             new_dtype = np.dtype(np.find_common_type((self.dtype,),
-                                                     (temp_data.dtype,))).type
+                                                     (temp_data.dtype,)))
             temp_d2o = self.copy_empty(
                                 dtype = new_dtype)
             temp_d2o.set_local_data(data=temp_data)
@@ -570,7 +581,7 @@ class distributed_data_object(object):
             local_mean_weight_list = self.distributor._allgather((local_mean,
                                                                 local_weight))
             local_mean_weight_list =np.array(local_mean_weight_list)
-            ## compute the denominator for the weighted mean-mean
+            ## compute the denominator for the weighted mean-mean
             global_weight = np.sum(local_mean_weight_list[:,1])
             ## compute the numerator
             numerator = np.sum(local_mean_weight_list[:,0]*\
@@ -590,7 +601,7 @@ class distributed_data_object(object):
 #        local_argmin = function(self.data)
 #        local_argmin_value = self.data[np.unravel_index(local_argmin,
 #                                                        self.data.shape)]
-#        globalized_local_argmin = self.distributor.globalize_flat_index(local_argmin)
+#        globalized_local_argmin = self.distributor.globalize_flat_index(local_argmin)
 #        local_argmin_list = self.distributor._allgather((local_argmin_value,
 #                                                    globalized_local_argmin))
 #        local_argmin_list = np.array(local_argmin_list, dtype=[('value', int),
@@ -602,7 +613,7 @@ class distributed_data_object(object):
         local_argmin_value = self.data[np.unravel_index(local_argmin,
                                                         self.data.shape)]
         globalized_local_argmin = self.distributor.globalize_flat_index(
-                                                            local_argmin)
+                                                                local_argmin)
         local_argmin_list = self.distributor._allgather((local_argmin_value,
                                                     globalized_local_argmin))
         local_argmin_list = np.array(local_argmin_list, dtype=[
@@ -617,7 +628,7 @@ class distributed_data_object(object):
         local_argmax_value = -self.data[np.unravel_index(local_argmax,
                                                          self.data.shape)]
         globalized_local_argmax = self.distributor.globalize_flat_index(
-                                                            local_argmax)
+                                                                local_argmax)
         local_argmax_list = self.distributor._allgather((local_argmax_value,
                                                     globalized_local_argmax))
         local_argmax_list = np.array(local_argmax_list, dtype=[
@@ -678,8 +689,9 @@ class distributed_data_object(object):
         return np.unique(global_unique)
 
     def bincount(self, weights = None, minlength = None):
-        if np.dtype(self.dtype).type not in [np.int8, np.int16, np.int32,
-                    np.int64, np.uint8, np.uint16, np.uint32, np.uint64]:
+        if self.dtype not in [np.dtype('int16'), np.dtype('int32'),
+                              np.dtype('int64'), np.dtype('uint16'),
+                              np.dtype('uint32'), np.dtype('uint64')]:
             raise TypeError(about._errors.cstring(
                 "ERROR: Distributed-data-object must be of integer datatype!"))
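Throughout these hunks the patch replaces comparisons against type objects like `np.complex64` with comparisons against `np.dtype` instances: every spelling of a dtype (string, type object, dtype) normalizes through `np.dtype`, so dtype-keyed logic no longer depends on how the caller spelled it. A small standalone sketch of the point, mirroring the `__abs__` translation above (`real_dtype_for` is an invented helper name):

import numpy as np

## all spellings normalize to the same dtype object
assert np.dtype('complex64') == np.dtype(np.complex64)

def real_dtype_for(dtype):
    ## mirror of the complex-to-real translation used in __abs__
    dtype = np.dtype(dtype)
    if dtype == np.dtype('complex64'):
        return np.dtype('float32')
    elif dtype == np.dtype('complex128'):
        return np.dtype('float64')
    elif issubclass(dtype.type, np.complexfloating):
        return np.dtype('float')
    return dtype

assert real_dtype_for('complex64') == np.dtype('float32')
assert real_dtype_for(np.complex128) == np.dtype('float64')
assert real_dtype_for(np.int32) == np.dtype('int32')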
@@ -718,7 +730,8 @@ class distributed_data_object(object):
         self.hermitian = hermitian
         self.data = np.array(data, dtype=self.dtype, copy=copy, order='C')
 
-    def set_data(self, data, key, hermitian=False, copy=True, **kwargs):
+    def set_data(self, data, to_key, from_key=None, local_to_keys=False,
+                 hermitian=False, copy=True, **kwargs):
         """
             Stores the supplied data in the region which is specified by key.
             The data is distributed according to the distribution strategy. If
@@ -740,8 +753,10 @@ class distributed_data_object(object):
         """
         self.hermitian = hermitian
         self.distributor.disperse_data(data = self.data,
-                                       to_key = key,
+                                       to_key = to_key,
                                        data_update = data,
+                                       from_key = from_key,
+                                       local_to_keys = local_to_keys,
                                        copy = copy,
                                        **kwargs)
 #
@@ -796,7 +811,7 @@ class distributed_data_object(object):
         if copy == False:
             return self.data
 
-    def get_data(self, key, **kwargs):
+    def get_data(self, key, local_keys=False, **kwargs):
         """
             Loads data from the region which is specified by key. The data is
             consolidated according to the distribution strategy. If the
@@ -815,7 +830,10 @@ class distributed_data_object(object):
             global_data[key] : numpy.ndarray
 
         """
-        return self.distributor.collect_data(self.data, key, **kwargs)
+        return self.distributor.collect_data(self.data,
+                                             key,
+                                             local_keys = local_keys,
+                                             **kwargs)
 #        (slices, sliceified) = self.__sliceify__(key)
 #        result = self.distributor.collect_data(self.data, slices, **kwargs)
 #        return self.__defold__(result, sliceified)
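With `local_keys=True` each rank may pass its own, rank-dependent key; the distributor evaluates all keys collectively and hands each rank its own answer as the local chunk of a 'freeform' d2o. A hedged usage sketch, not part of the patch (the file name and two-process launch are illustrative; the patched module must be importable):

## run with e.g.: mpirun -n 2 python sketch.py
import numpy as np
from mpi4py import MPI
from nifty.nifty_mpi_data import distributed_data_object

comm = MPI.COMM_WORLD
obj = distributed_data_object(global_data=np.arange(16),
                              distribution_strategy='equal')

## every rank requests a different slice of the global array ...
my_key = slice(comm.rank, comm.rank + 3)
local_view = obj.get_data(my_key, local_keys=True)

## ... and receives its own result inside a 'freeform' d2o
print(comm.rank, local_view.get_local_data())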
@@ -918,14 +936,18 @@ class _distributor_factory(object):
 #
 #        return return_dict
 
-    def parse_kwargs(self, distribution_strategy,
+    def parse_kwargs(self, distribution_strategy, comm,
                      global_data = None, global_shape = None,
                      local_data = None, local_shape = None,
                      alias = None, path = None,
-                     dtype = None, comm = None, **kwargs):
+                     dtype = None, **kwargs):
 
         return_dict = {}
+
+        ## Check that all nodes got the same distribution_strategy
+        strat_list = comm.allgather(distribution_strategy)
+        assert(all(x == strat_list[0] for x in strat_list))
+
         ## Check for an hdf5 file and open it if given
         if FOUND['h5py'] == True and alias is not None:
             ## set file path
@@ -942,43 +964,47 @@ class _distributor_factory(object):
 
         ## Parse the MPI communicator
-        if distribution_strategy in ['equal', 'fftw', 'freeform']:
-            if comm is None:
-                raise ValueError(about._errors.cstring(
-        "ERROR: The distributor needs a MPI communicator object comm!"))
-            else:
-                return_dict['comm'] = comm
+        if comm is None:
+            raise ValueError(about._errors.cstring(
+        "ERROR: The distributor needs a MPI communicator object comm!"))
+        else:
+            return_dict['comm'] = comm
 
         ## Parse the datatype
         if distribution_strategy in ['not', 'equal', 'fftw'] and \
             (dset is not None):
-            dtype = dset.dtype.type
+            dtype = dset.dtype
 
-        elif distribution_strategy in ['not', 'equal', 'fftw', 'freeform']:
+        elif distribution_strategy in ['not', 'equal', 'fftw']:
             if dtype is None:
-                if global_data is None and local_data is None:
+                if global_data is None:
                     raise ValueError(about._errors.cstring(
-        "ERROR: Neither global_data nor local_data nor dtype supplied!"))
-                elif global_data is not None:
+                        "ERROR: Neither global_data nor dtype supplied!"))
+                else:
                     try:
-                        dtype = global_data.dtype.type
+                        dtype = global_data.dtype
                     except(AttributeError):
-                        try:
-                            dtype = global_data.dtype
-                        except(AttributeError):
-                            dtype = np.array(global_data).dtype.type
-                elif local_data is not None:
+                        dtype = np.array(global_data).dtype
+            else:
+                dtype = np.dtype(dtype)
+
+        elif distribution_strategy in ['freeform']:
+            if dtype is None:
+                if global_data is None and local_data is None:
+                    raise ValueError(about._errors.cstring(
+                        "ERROR: Neither local_data nor dtype supplied!"))
+                else:
                     try:
-                        dtype = local_data.dtype.type
+                        dtype = local_data.dtype
                     except(AttributeError):
-                        try:
-                            dtype = local_data.dtype
-                        except(AttributeError):
-                            dtype = np.array(local_data).dtype.type
+                        dtype = np.array(local_data).dtype
             else:
-                dtype = np.dtype(dtype).type
+                dtype = np.dtype(dtype)
+
+        dtype_list = comm.allgather(dtype)
+        assert(all(x == dtype_list[0] for x in dtype_list))
         return_dict['dtype'] = dtype
-        
+
         ## Parse the shape
         ## Case 1: global-type slicer
         if distribution_strategy in ['not', 'equal', 'fftw']:
@@ -995,10 +1021,9 @@ class _distributor_factory(object):
             if global_shape == ():
                 raise ValueError(about._errors.cstring(
                     "ERROR: global_shape == () is not valid shape!"))
-            if np.any(np.array(global_shape) == 0):
-                raise ValueError(about._errors.cstring(
-                    "ERROR: Dimension of size 0 occurred!"))
 
+            global_shape_list = comm.allgather(global_shape)
+            assert(all(x == global_shape_list[0] for x in global_shape_list))
             return_dict['global_shape'] = global_shape
 
         ## Case 2: local-type slicer
@@ -1013,6 +1038,9 @@ class _distributor_factory(object):
                         "local_shape supplied!"))
             return_dict['local_shape'] = local_shape
 
+        ## Add the name of the distributor if needed
+        if distribution_strategy in ['equal', 'fftw', 'freeform']:
+            return_dict['name'] = distribution_strategy
 
         ## close the file-handle
         if dset is not None:
@@ -1023,27 +1051,31 @@ class _distributor_factory(object):
 
     def hash_arguments(self, distribution_strategy, **kwargs):
         kwargs = kwargs.copy()
-        if kwargs.has_key('comm'):
-            kwargs['comm'] = id(kwargs['comm'])
+
+        comm = kwargs['comm']
+        kwargs['comm'] = id(comm)
 
         if kwargs.has_key('global_shape'):
             kwargs['global_shape'] = kwargs['global_shape']
         if kwargs.has_key('local_shape'):
-            kwargs['local_shape'] = kwargs['local_shape']
+            local_shape = kwargs['local_shape']
+            local_shape_list = comm.allgather(local_shape)
+            kwargs['local_shape'] = tuple(local_shape_list)
 
         kwargs['dtype'] = self.dictionize_np(kwargs['dtype'])
         kwargs['distribution_strategy'] = distribution_strategy
+
         return frozenset(kwargs.items())
 
     def dictionize_np(self, x):
-        dic = x.__dict__.items()
+        dic = x.type.__dict__.items()
         if x is np.float:
             dic[24] = 0
             dic[29] = 0
             dic[37] = 0
         return frozenset(dic)
 
-    def get_distributor(self, distribution_strategy, **kwargs):
+    def get_distributor(self, distribution_strategy, comm, **kwargs):
 
         ## check if the distribution strategy is known
         known_distribution_strategies = ['not', 'equal', 'freeform']
@@ -1056,15 +1088,15 @@ class _distributor_factory(object):
         ## parse the kwargs
         parsed_kwargs = self.parse_kwargs(
                             distribution_strategy = distribution_strategy,
+                            comm = comm,
                             **kwargs)
 
         hashed_kwargs = self.hash_arguments(distribution_strategy,
                                             **parsed_kwargs)
-        #print hashed_arguments
         ## check if the distributors has already been produced in the past
         if self.distributor_store.has_key(hashed_kwargs):
             return self.distributor_store[hashed_kwargs]
-        else:
+        else:
             ## produce new distributor
             if distribution_strategy == 'not':
                 produced_distributor = _not_distributor(**parsed_kwargs)
@@ -1072,21 +1104,18 @@ class _distributor_factory(object):
             elif distribution_strategy == 'equal':
                 produced_distributor = _slicing_distributor(
                                                 slicer = _equal_slicer,
-                                                name = distribution_strategy,
                                                 **parsed_kwargs)
             elif distribution_strategy == 'fftw':
                 produced_distributor = _slicing_distributor(
                                                 slicer = _fftw_slicer,
-                                                name = distribution_strategy,
                                                 **parsed_kwargs)
             elif distribution_strategy == 'freeform':
                 produced_distributor = _slicing_distributor(
                                                 slicer = _freeform_slicer,
-                                                name = distribution_strategy,
                                                 **parsed_kwargs)
 
-        self.distributor_store[hashed_kwargs] = produced_distributor
+            self.distributor_store[hashed_kwargs] = produced_distributor
         return self.distributor_store[hashed_kwargs]
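`parse_kwargs` now verifies that every rank entered the factory with the same strategy, dtype, and global_shape by allgathering the value and asserting uniformity — cheap insurance against deadlocks from diverging collective calls. The bare pattern in isolation (a sketch; `assert_same_on_all_ranks` is an invented name):

from mpi4py import MPI

def assert_same_on_all_ranks(comm, value):
    ## allgather is collective, so every rank sees every rank's value
    value_list = comm.allgather(value)
    assert all(x == value_list[0] for x in value_list), \
        "ranks disagree: %r" % (value_list,)
    return value_list[0]

if __name__ == '__main__':
    comm = MPI.COMM_WORLD
    ## passes because every rank supplies the same strategy string
    assert_same_on_all_ranks(comm, 'equal')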
@@ -1109,22 +1138,23 @@ class _slicing_distributor(object):
 
         self.comm = comm
         self.distribution_strategy = name
-        if comm.rank == 0:
-            if dtype is None:
-                raise TypeError(about._errors.cstring(
-                "ERROR: Failed setting datatype! No datatype supplied."))
-            else:
-                self.dtype = dtype
-        else:
-            self.dtype=None
-        self.dtype = comm.bcast(self.dtype, root=0)
+#        if comm.rank == 0:
+#            if dtype is None:
+#                raise TypeError(about._errors.cstring(
+#                "ERROR: Failed setting datatype! No datatype supplied."))
+#            else:
+#                self.dtype = np.dtype(dtype).type
+#        else:
+#            self.dtype=None
+#        self.dtype = comm.bcast(self.dtype, root=0)
+        self.dtype = np.dtype(dtype)
 
-        self._my_dtype_converter = _global_dtype_converter
+        self._my_dtype_converter = global_dtype_converter
 
         if not self._my_dtype_converter.known_np_Q(self.dtype):
             raise TypeError(about._errors.cstring(\
-            "ERROR: The datatype "+str(self.dtype)+" is not known to mpi4py."))
+            "ERROR: The datatype "+str(self.dtype.__repr__())+" is not known to mpi4py."))
 
         self.mpi_dtype = self._my_dtype_converter.to_mpi(self.dtype)
@@ -1170,12 +1200,13 @@ class _slicing_distributor(object):
                                             copy = copy)
         elif self.distribution_strategy in ['freeform']:
             if np.isscalar(local_data):
-                local_data = np.empty(self.local_shape, dtype = self.dtype)
-                local_data.fill(global_data)
+                temp_local_data = np.empty(self.local_shape,
+                                           dtype = self.dtype)
+                temp_local_data.fill(local_data)
+                local_data = temp_local_data
                 hermitian = True
             elif local_data is None:
                 local_data = np.empty(self.local_shape, dtype = self.dtype)
-                hermitian = False
             else:
                 local_data = np.array(local_data).astype(
                             self.dtype, copy=copy).reshape(self.local_shape)
@@ -1210,19 +1241,19 @@ class _slicing_distributor(object):
         gathered_things = comm.allgather(thing)
         return gathered_things
 
-    def distribute_data(self, data=None, comm = None, alias=None,
+    def distribute_data(self, data=None, alias=None,
                         path=None, copy=True, **kwargs):
         '''
         distribute data checks
        - whether the data is located on all nodes or only on node 0
        - that the shape of 'data' matches the global_shape
         '''
-        if comm == None:
-            comm = self.comm
+
+        comm = self.comm
         rank = comm.Get_rank()
         size = comm.Get_size()
         local_data_available_Q = np.array((int(data is not None), ))
-        data_available_Q = np.empty(size,dtype=int)
+        data_available_Q = np.empty(size, dtype=int)
         comm.Allgather([local_data_available_Q, MPI.INT],
                        [data_available_Q, MPI.INT])
 
@@ -1235,7 +1266,7 @@ class _slicing_distributor(object):
                 f= h5py.File(file_path, 'r')
                 dset = f[alias]
                 if dset.shape == self.global_shape and \
-                    dset.dtype.type == self.dtype:
+                    dset.dtype == self.dtype:
                     temp_data = dset[self.local_start:self.local_end]
                     f.close()
                     return temp_data
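The distributor now stores `self.dtype = np.dtype(dtype)` and resolves the matching MPI type through the converter before any communication. A reduced, self-contained version of that lookup — the table below is a small subset keyed directly by `np.dtype`, whereas the patch routes keys through `dictionize_np` (presumably for hashability on the numpy versions of the time):

import numpy as np
from mpi4py import MPI

## a reduced version of the converter table, keyed by np.dtype
_np_to_mpi = {
    np.dtype('int32'):      MPI.INT,
    np.dtype('int64'):      MPI.LONG_LONG,
    np.dtype('float32'):    MPI.FLOAT,
    np.dtype('float64'):    MPI.DOUBLE,
    np.dtype('complex128'): MPI.DOUBLE_COMPLEX,
}

def to_mpi(dtype):
    dtype = np.dtype(dtype)  ## accept strings, type objects, dtypes
    try:
        return _np_to_mpi[dtype]
    except KeyError:
        raise TypeError("datatype %r is not known to mpi4py" % dtype)

assert to_mpi('float64') is MPI.DOUBLE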
@@ -1276,35 +1307,246 @@ class _slicing_distributor(object):
 
     def disperse_data(self, data, to_key, data_update, from_key=None,
-                      comm=None, copy=True, **kwargs):
+                      local_to_keys=False, copy=True, **kwargs):
+
+        ## Check which keys we got:
+        (to_found, to_found_boolean) = self._infer_key_type(to_key)
+        (from_found, from_found_boolean) = self._infer_key_type(from_key)
+
+        comm = self.comm
+        if local_to_keys == False:
+            return self._disperse_data_primitive(data = data,
+                                 to_key = to_key,
+                                 data_update = data_update,
+                                 from_key = from_key,
+                                 copy = copy,
+                                 to_found = to_found,
+                                 to_found_boolean = to_found_boolean,
+                                 from_found = from_found,
+                                 from_found_boolean = from_found_boolean,
+                                 **kwargs)
+
+        else:
+            ## assert that all to_keys are from same type
+            to_found_list = comm.allgather(to_found)
+            assert(all(x == to_found_list[0] for x in to_found_list))
+            to_found_boolean_list = comm.allgather(to_found_boolean)
+            assert(all(x == to_found_boolean_list[0] for x in
+                       to_found_boolean_list))
+            ## gather the local_keys into a global key_list
+            to_key_list = comm.allgather(to_key)
+
+            i = 0
+            for temp_to_key in to_key_list:
+                ## build a temporary freeform d2o which only contains data
+                ## from node i
+                if comm.rank == i:
+                    temp_shape = list(np.shape(data_update))
+                    try:
+                        temp_dtype = np.dtype(data_update).type
+                    except(TypeError):
+                        temp_dtype = np.array(data_update).dtype.type
+                else:
+                    temp_shape = None
+                    temp_dtype = None
+                temp_shape = comm.bcast(temp_shape, root=i)
+                temp_dtype = comm.bcast(temp_dtype, root=i)
+
+                if comm.rank != i:
+                    temp_shape[0] = 0
+                    temp_data = np.empty(temp_shape, dtype = temp_dtype)
+                else:
+                    temp_data = data_update
+                temp_d2o = distributed_data_object(local_data = temp_data,
+                                     distribution_strategy = 'freeform')
+                ## disperse the data one after another
+                self._disperse_data_primitive(data = data,
+                                 to_key = temp_to_key,
+                                 data_update = temp_d2o,
+                                 from_key = from_key,
+                                 copy = copy,
+                                 to_found = to_found,
+                                 to_found_boolean = to_found_boolean,
+                                 from_found = from_found,
+                                 from_found_boolean = from_found_boolean,
+                                 **kwargs)
+                i += 1
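The `local_to_keys` branch above serializes the per-rank updates: for each rank i, the shape and dtype of rank i's update are broadcast, every other rank builds a zero-length stand-in block, and the pieces form a 'freeform' d2o that the primitive disperser can process collectively. The broadcast skeleton in isolation (a sketch using mpi4py only; the payloads are invented):

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD

## every rank owns a differently sized update
my_update = np.arange(comm.rank + 1, dtype=np.float64)

for i in range(comm.size):
    ## rank i announces shape and dtype of its update ...
    if comm.rank == i:
        shape, dtype = list(np.shape(my_update)), my_update.dtype
    else:
        shape, dtype = None, None
    shape = comm.bcast(shape, root=i)
    dtype = comm.bcast(dtype, root=i)

    ## ... all other ranks contribute a zero-length block of that shape
    if comm.rank != i:
        shape[0] = 0
        block = np.empty(shape, dtype=dtype)
    else:
        block = my_update
    ## 'block' would now feed a 'freeform' d2o in the patch
    print(comm.rank, i, block.shape)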
+    def _disperse_data_primitive(self, data, to_key, data_update, from_key,
+                                 copy, to_found, to_found_boolean, from_found,
+                                 from_found_boolean, **kwargs):
-
+        ## Case 1: to_key is a slice-tuple. Hence, the basic indexing/slicing
+        ## machinery will be used
+        if to_found == 'slicetuple':
+            if from_found == 'slicetuple':
+                return self.disperse_data_to_slices(data = data,
                                                 to_slices = to_key,
                                                 data_update = data_update,
                                                 from_slices = from_key,
-                                                comm = comm,
                                                 copy = copy,
                                                 **kwargs)
+            else:
+                if from_key is not None:
+                    about.infos.cprint(
+                        "INFO: Advanced injection is not available for this "+
+                        "combination of to_key and from_key.")
+                    prepared_data_update = data_update[from_key]
+                else:
+                    prepared_data_update = data_update
+                return self.disperse_data_to_slices(data = data,
+                                      to_slices = to_key,
+                                      data_update = prepared_data_update,
+                                      copy = copy,
+                                      **kwargs)
+
-    def disperse_data_from_slices(self, data, to_slices, data_update,
-                                  from_slices=None, comm=None, copy = True,
+        ## Case 2: key is an array
+        elif (to_found == 'ndarray' or to_found == 'd2o'):
+            ## Case 2.1: The array is boolean.
+            if to_found_boolean == True:
+                if from_key is not None:
+                    about.infos.cprint(
+                        "INFO: Advanced injection is not available for this "+
+                        "combination of to_key and from_key.")
+                    prepared_data_update = data_update[from_key]
+                else:
+                    prepared_data_update = data_update
+                return self.disperse_data_to_bool(data = data,
+                                      to_boolean_key = to_key,
+                                      data_update = prepared_data_update,
+                                      copy = copy,
+                                      **kwargs)
+            ## Case 2.2: The array is not boolean. Only 1-dimensional
+            ## advanced slicing is supported.
+            else:
+                if len(to_key.shape) != 1:
+                    raise ValueError(about._errors.cstring(
+                        "WARNING: Only one-dimensional advanced indexing " +
+                        "is supported"))
+                ## Make a recursive call in order to trigger the
+                ## 'list'-section
+                return self.disperse_data(data = data, to_key = [to_key],
+                                          data_update = data_update,
+                                          from_key = from_key, copy = copy,
+                                          **kwargs)
+
+        ## Case 3 : to_key is a list. This list is interpreted as
+        ## one-dimensional advanced indexing list.
+        elif to_found == 'indexinglist':
+            if from_key is not None:
+                about.infos.cprint(
+                    "INFO: Advanced injection is not available for this "+
+                    "combination of to_key and from_key.")
+                prepared_data_update = data_update[from_key]
+            else:
+                prepared_data_update = data_update
+            return self.disperse_data_to_list(data = data,
+                                      to_list_key = to_key,
+                                      data_update = prepared_data_update,
+                                      copy = copy,
+                                      **kwargs)
+
+
+    def disperse_data_to_list(self, data, to_list_key, data_update,
+                              copy = True, **kwargs):
+
+        if to_list_key == []:
+            return data
+
+        ## Check if the key list is properly formatted:
+        ## Case 1: Flat list full of scalars
+        if np.all(map(np.isscalar, to_list_key)):
+            ## The scalars are interpreted as the indices of the first
+            ## dimension.
+            ## Decycle and shift the indices to the local slice
+            local_to_list_key = self._advandced_index_decycler(to_list_key)
+            ## if the list is not sorted, an mpirun will yield randomly
+            ## unsorted results
+            l = np.array(local_to_list_key).flatten()
+            if not all(l[i] <= l[i+1] for i in xrange(len(l)-1)):
+                raise ValueError(about._errors.cstring(
+                    "ERROR: The first dimension of list_key must be sorted!"))
+        ## Case 2: Nested list:
+        ## The length of the list must be smaller or equal the number of
+        ## dimensions of the d2o.
+        elif len(to_list_key) <= len(self.global_shape):
+            ## apply the index decycler to every element in the list
+            local_to_list_key = map(self._advandced_index_decycler,
+                                    to_list_key)
+            ## if the list is not sorted, an mpirun will yield randomly
+            ## unsorted results
+            l = np.array(local_to_list_key[0]).flatten()
+            if not all(l[i] <= l[i+1] for i in xrange(len(l)-1)):
+                raise ValueError(about._errors.cstring(
+                    "ERROR: The first dimension of list_key must be sorted!"))
+        else:
+            raise ValueError(about._errors.cstring(
+                "ERROR: too many indices!"))
+        return self._disperse_data_to_list_and_bool_helper(
+                                        data = data,
+                                        local_to_key = local_to_list_key,
+                                        data_update = data_update,
+                                        copy = copy,
+                                        **kwargs)
+
+
+    def disperse_data_to_bool(self, data, to_boolean_key, data_update,
+                              copy = True, **kwargs):
+        ## Extract the part of the to_boolean_key which corresponds to the
+        ## local data
+        local_to_boolean_key = self.extract_local_data(to_boolean_key)
+        return self._disperse_data_to_list_and_bool_helper(
+                                        data = data,
+                                        local_to_key = local_to_boolean_key,
+                                        data_update = data_update,
+                                        copy = copy,
+                                        **kwargs)
+
+    def _disperse_data_to_list_and_bool_helper(self, data, local_to_key,
+                                               data_update, copy, **kwargs):
+        comm = self.comm
+        rank = comm.rank
+        size = comm.size
+        ## Infer the length and offset of the locally affected data
+        locally_affected_data = data[local_to_key]
+        data_length = np.shape(locally_affected_data)[0]
+        data_length_list = np.empty(size, dtype = np.int_)
+        comm.Allgather(
+            [np.array(data_length, dtype=np.int), MPI.INT],
+            [data_length_list, MPI.INT])
+        data_length_offset_list = np.append([0],
+                                        np.cumsum(data_length_list)[:-1])
+
+        ## Update the local data object with its very own portion
+        o = data_length_offset_list
+        l = data_length
+
+        if isinstance(data_update, distributed_data_object):
+            data[local_to_key] = data_update[o[rank]:o[rank]+l].\
+                                    get_full_data().astype(self.dtype)
+        else:
+            data[local_to_key] = np.array(data_update[o[rank]:o[rank]+l],
+                                          copy=copy).astype(self.dtype)
+        return data
+    def disperse_data_to_slices(self, data, to_slices, data_update,
+                                from_slices=None, copy = True,
                                 **kwargs):
 
         (to_slices, sliceified) = self._sliceify(to_slices)
         data_update = self._enfold(data_update, sliceified)
 
-        if comm == None:
-            comm = self.comm
+        comm = self.comm
         to_slices_list = comm.allgather(to_slices)
         ## check if all slices are the same.
         if all(x == to_slices_list[0] for x in to_slices_list):
             ## in this case, the _disperse_data_primitive can simply be called
             ##with target_rank = 'all'
-            self._disperse_data_from_slices_primitive(data = data,
+            self._disperse_data_to_slices_primitive(data = data,
                                                 to_slices = to_slices,
                                                 data_update=data_update,
                                                 from_slices=from_slices,
                                                 source_rank='all',
-                                                comm=comm,
                                                 copy = copy)
         ## if the different nodes got different slices, disperse the data
         ## individually
@@ -1312,12 +1554,11 @@ class _slicing_distributor(object):
             i = 0
             for temp_to_slices in to_slices_list:
                 ## make the collect_data call on all nodes
-                self._disperse_data_from_slices_primitive(data=data,
+                self._disperse_data_to_slices_primitive(data=data,
                                                 to_slices=temp_to_slices,
                                                 data_update=data_update,
                                                 from_slices=from_slices,
                                                 source_rank=i,
-                                                comm=comm,
                                                 copy = copy)
                 i += 1
 
@@ -1344,11 +1585,9 @@ class _slicing_distributor(object):
 #
 #
 
-    def _disperse_data_from_slices_primitive(self, data, to_slices,
-            data_update, from_slices, source_rank='all', comm=None,
-            copy=True):
-        if comm == None:
-            comm = self.comm
+    def _disperse_data_to_slices_primitive(self, data, to_slices,
+            data_update, from_slices, source_rank='all', copy=True):
+        comm = self.comm
 
 #        if to_slices[0].step is not None and to_slices[0].step < -1:
 #            raise ValueError(about._errors.cstring(
 #                "ERROR: Negative stepsizes other than -1 are not supported!"))
@@ -1410,7 +1649,7 @@ class _slicing_distributor(object):
                     [local_affected_data_length_list, MPI.INT])
             local_affected_data_length_offset_list = np.append([0],\
                             np.cumsum(
-                local_affected_data_length_list[::order])[:-1])[::order]
+                    local_affected_data_length_list[::order])[:-1])[::order]
 
             ## construct the locally adapted from_slice object
             r = comm.rank
@@ -1427,7 +1666,14 @@ class _slicing_distributor(object):
                                            from_step),)
             update_slice = localized_from_slice + from_slices[1:]
 
-            data[local_to_slice] = np.array(data_update[update_slice],\
+
+            if isinstance(data_update, distributed_data_object):
+                data[local_to_slice] = data_update.get_data(
+                                        key = update_slice,
+                                        local_keys = True
+                                    ).get_local_data().astype(self.dtype)
+            else:
+                data[local_to_slice] = np.array(data_update[update_slice],\
                                 copy=copy).astype(self.dtype)
 
@@ -1460,14 +1706,11 @@ class _slicing_distributor(object):
                      [local_dispersed_data, self.mpi_dtype],
                      root=source_rank)
             data[local_to_slice] = local_dispersed_data
-        return None
+        return data
 
-    def collect_data(self, data, key, **kwargs):
-#        return self.collect_data_from_slices(data = data,
-#                                             slice_objects = key,
-#                                             **kwargs)
-
+
+    def collect_data(self, data, key, local_keys = False, **kwargs):
         ## collect_data supports three types of keys
         ## Case 1: key is a slicing/index tuple
         ## Case 2: key is a boolean-array of the same shape as self
         ## Case 3: ...
         ##         the same for all of the lists. This is essentially
         ##         numpy advanced indexing in one dimension, only.
-        ## TODO: Distinguishing between tuple and list is not sufficient.
-        ## TODO: -> If a tuple contains anything other than scalars and
-        ## TODO: slices, advanced indexing is triggered.
         ## Check which case we got:
-        if isinstance(key, tuple) or isinstance(key,slice) or np.isscalar(key):
-            found = 'tuple'
-        elif isinstance(key, np.ndarray):
-            found = 'ndarray'
-            found_boolean = (key.dtype.type == np.bool_)
-        elif isinstance(key, distributed_data_object):
-            found = 'd2o'
-            found_boolean = (key.dtype == np.bool_)
-        elif isinstance(key, list):
-            found = 'list'
+        (found, found_boolean) = self._infer_key_type(key)
+
+        comm = self.comm
 
-        ## Case 1: key is a tuple. Hence, the basic indexing/slicing machinery
-        ## will be used
-        if found == 'tuple':
+        if local_keys == False:
+            return self._collect_data_primitive(data, key, found,
+                                                found_boolean, **kwargs)
+        else:
+            ## assert that all keys are from same type
+            found_list = comm.allgather(found)
+            assert(all(x == found_list[0] for x in found_list))
+            found_boolean_list = comm.allgather(found_boolean)
+            assert(all(x == found_boolean_list[0] for x in found_boolean_list))
+
+            ## gather the local_keys into a global key_list
+            ## Case 1: the keys are no distributed_data_objects
+            ## -> allgather does the job
+            if found != 'd2o':
+                key_list = comm.allgather(key)
+            ## Case 2: if the keys are distributed_data_objects, gather
+            ## the index of the array and build the key_list with help
+            ## from the librarian
+            else:
+                index_list = comm.allgather(key.index)
+                key_list = map(lambda z: d2o_librarian[z], index_list)
+
+            i = 0
+            for temp_key in key_list:
+                ## build the locally fed d2o
+                temp_d2o = self._collect_data_primitive(data, temp_key, found,
+                                                        found_boolean,
+                                                        **kwargs)
+                ## collect the data stored in the d2o to the individual target
+                ## rank
+                temp_data = temp_d2o.get_full_data(target_rank = i)
+                if comm.rank == i:
+                    individual_data = temp_data
+                i += 1
+            return_d2o = distributed_data_object(
+                                    local_data = individual_data,
+                                    distribution_strategy = 'freeform')
+            return return_d2o
+
+
+    def _collect_data_primitive(self, data, key, found, found_boolean,
+                                **kwargs):
+
+        ## Case 1: key is a slice-tuple. Hence, the basic indexing/slicing
+        ## machinery will be used
+        if found == 'slicetuple':
             return self.collect_data_from_slices(data = data,
                                                  slice_objects = key,
                                                  **kwargs)
@@ -1505,8 +1785,8 @@ class _slicing_distributor(object):
             return self.collect_data_from_bool(data = data,
                                                boolean_key = key,
                                                **kwargs)
-            ## Case 2.2: The array is not boolean. As only 1-dimensional
-            ## advanced slicing is supported, make the input-array flat.
+            ## Case 2.2: The array is not boolean. Only 1-dimensional
+            ## advanced slicing is supported.
             else:
                 if len(key.shape) != 1:
                     raise ValueError(about._errors.cstring(
                 ## Make a recursive call in order to trigger the
                 ## 'list'-section
                 return self.collect_data(data = data, key = [key], **kwargs)
@@ -1515,9 +1795,9 @@ class _slicing_distributor(object):
-        ## Case 3 : key is an list. This list is interpreted as one-dimensional
-        ## advanced indexing list. Therefore
-        elif found == 'list':
+        ## Case 3 : key is a list. This list is interpreted as one-dimensional
+        ## advanced indexing list.
+        elif found == 'indexinglist':
             return self.collect_data_from_list(data = data,
                                                list_key = key,
                                                **kwargs)
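Keys that are themselves d2os are not allgathered directly; only their small librarian indices travel, and every rank resolves the indices against its own library. That works because a d2o is constructed collectively, so the same index denotes the same distributed object on every rank. A self-contained sketch of the idea with a stand-in `Key` class and a plain weak-value registry (both invented names):

import weakref
from mpi4py import MPI

registry = weakref.WeakValueDictionary()

class Key(object):
    _counter = [0]
    def __init__(self, payload):
        self.payload = payload
        ## constructed collectively, so counters agree across ranks
        Key._counter[0] += 1
        self.index = Key._counter[0]
        registry[self.index] = self

comm = MPI.COMM_WORLD
my_key = Key(payload=range(comm.rank + 1))

## the object stays put; only its integer index travels
index_list = comm.allgather(my_key.index)
key_list = [registry[i] for i in index_list]
print(comm.rank, [k.payload for k in key_list])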
@@ -1535,6 +1815,12 @@ class _slicing_distributor(object):
             ## dimension.
             ## Decycle and shift the indices to the local slice
             local_list_key = self._advandced_index_decycler(list_key)
+            ## if the list is not sorted, an mpirun will yield randomly
+            ## unsorted results
+            l = np.array(local_list_key).flatten()
+            if not all(l[i] <= l[i+1] for i in xrange(len(l)-1)):
+                raise ValueError(about._errors.cstring(
+                    "ERROR: The first dimension of list_key must be sorted!"))
             ## Extract from the local data
             local_result = data[local_list_key]
         ## Case 2: Nested list:
@@ -1543,6 +1829,13 @@ class _slicing_distributor(object):
         elif len(list_key) <= len(self.global_shape):
             ## apply the index decycler to every element in the list
             local_list_key = map(self._advandced_index_decycler, list_key)
+            ## if the list is not sorted, an mpirun will yield randomly
+            ## unsorted results
+            l = np.array(local_list_key[0]).flatten()
+            if not all(l[i] <= l[i+1] for i in xrange(len(l)-1)):
+                raise ValueError(about._errors.cstring(
+                    "ERROR: The first dimension of list_key must be sorted!"))
+
             ## Extract from the local data
             local_result = data[local_list_key]
         else:
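Both sortedness guards above reject index lists whose first dimension is unsorted, because each rank extracts its local hits independently and rank-order concatenation would silently permute an unsorted request. The check reduced to its core (a sketch; the function name is invented):

import numpy as np

def assert_first_dimension_sorted(list_key):
    l = np.array(list_key).flatten()
    ## pairwise comparison of adjacent entries, exactly as in the patch
    if not all(l[i] <= l[i + 1] for i in range(len(l) - 1)):
        raise ValueError("the first dimension of list_key must be sorted")

assert_first_dimension_sorted([0, 2, 5, 5, 9])   ## fine
try:
    assert_first_dimension_sorted([3, 1, 2])
except ValueError as e:
    print(e)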
@@ -1583,6 +1876,35 @@ class _slicing_distributor(object):
         else:
             return local_list_key
 
+
+    def _infer_key_type(self, key):
+        if key is None:
+            return (None, None)
+        found_boolean = False
+        ## Check which case we got:
+        if isinstance(key, tuple) or isinstance(key,slice) or np.isscalar(key):
+            ## Check if there is something different in the array than
+            ## scalars and slices
+            if isinstance(key, slice) or np.isscalar(key):
+                key = [key]
+
+            scalarQ = np.array(map(np.isscalar, key))
+            sliceQ = np.array(map(lambda z: isinstance(z, slice), key))
+            if np.all(scalarQ + sliceQ):
+                found = 'slicetuple'
+            else:
+                found = 'indexinglist'
+        elif isinstance(key, np.ndarray):
+            found = 'ndarray'
+            found_boolean = (key.dtype.type == np.bool_)
+        elif isinstance(key, distributed_data_object):
+            found = 'd2o'
+            found_boolean = (key.dtype == np.bool_)
+        elif isinstance(key, list):
+            found = 'indexinglist'
+        return (found, found_boolean)
+
+
     def collect_data_from_bool(self, data, boolean_key, **kwargs):
         local_boolean_key = self.extract_local_data(boolean_key)
         local_result = data[local_boolean_key]
@@ -1590,94 +1912,129 @@ class _slicing_distributor(object):
                                     distribution_strategy = 'freeform')
         return global_result
 
-    def collect_data_from_slices(self, data, slice_objects, **kwargs):
-
-        (slice_objects, sliceified) = self._sliceify(slice_objects)
-        comm = self.comm
-        slice_objects_list = comm.allgather(slice_objects)
-        ## check if all slices are the same.
-        if all(x == slice_objects_list[0] for x in slice_objects_list):
-            ## in this case, the _collect_data_primitive can simply be called
-            ##with target_rank = 'all'
-            result = self._collect_data_from_slices_primitive(data=data,
-                                            slice_objects=slice_objects,
-                                            target_rank='all')
-        ## if the different nodes got different slices, collect the data individually
-        else:
-            i = 0
-            for temp_slices in slice_objects_list:
-                ## make the collect_data call on all nodes
-                temp_data = self._collect_data_from_slices_primitive(data=data,
-                                            slice_objects=temp_slices,
-                                            target_rank=i)
-                ## save the result only on the pulling node
-                if comm.rank == i:
-                    individual_data = temp_data
-                i += 1
-            result = individual_data
+#    def collect_data_from_slices(self, data, slice_objects, **kwargs):
+#
+#        (slice_objects, sliceified) = self._sliceify(slice_objects)
+##        comm = self.comm
+##        slice_objects_list = comm.allgather(slice_objects)
+##        ## check if all slices are the same.
+##        if all(x == slice_objects_list[0] for x in slice_objects_list):
+##            ## in this case, the _collect_data_primitive can simply be called
+##            ##with target_rank = 'all'
+#        result = self._collect_data_from_slices_primitive(data=data,
+#                                            slice_objects=slice_objects,
+#                                            target_rank='all')
+##        ## if the different nodes got different slices, collect the data individually
+##        else:
+##            i = 0
+##            for temp_slices in slice_objects_list:
+##                ## make the collect_data call on all nodes
+##                temp_data = self._collect_data_from_slices_primitive(data=data,
+##                                            slice_objects=temp_slices,
+##                                            target_rank=i)
+##                ## save the result only on the pulling node
+##                if comm.rank == i:
+##                    individual_data = temp_data
+##                i += 1
+##            result = individual_data
+#
+#        return self._defold(result, sliceified)
+
+
+    def _invert_mpi_data_ordering(self, data):
+        comm = self.comm
+        s = comm.size
+        r = comm.rank
+        if s == 1:
+            return data
 
-        return self._defold(result, sliceified)
+        partner = s-1-r
+        new_data = comm.sendrecv(sendobj = data,
+                                 dest = partner,
+                                 source = partner)
+        comm.barrier()
+        return new_data
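For negative strides the slice hits land on the ranks in reverse order, so `_invert_mpi_data_ordering` pairs rank r with rank size-1-r and swaps their blocks in a single `sendrecv`; with an odd communicator size the middle rank exchanges with itself, which MPI permits. Standalone (a sketch; payloads are invented):

import numpy as np
from mpi4py import MPI

def invert_mpi_data_ordering(comm, data):
    ## pair rank r with its mirror rank and swap payloads
    if comm.size == 1:
        return data
    partner = comm.size - 1 - comm.rank
    return comm.sendrecv(sendobj=data, dest=partner, source=partner)

if __name__ == '__main__':
    comm = MPI.COMM_WORLD
    block = np.full(2, comm.rank)
    print(comm.rank, invert_mpi_data_ordering(comm, block))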
-    def _collect_data_from_slices_primitive(self, data, slice_objects,
-                                            target_rank='all'):
-        comm = self.comm
+    def collect_data_from_slices(self, data, slice_objects,
+                                 target_rank='all', directly_to_np_Q = False):
 
 #        if slice_objects[0].step is not None and slice_objects[0].step < -1:
 #            raise ValueError(about._errors.cstring(
 #                "ERROR: Negative stepsizes other than -1 are not supported!"))
 
+        (slice_objects, sliceified) = self._sliceify(slice_objects)
+
+
         localized_start, localized_stop = self._backshift_and_decycle(
             slice_objects[0], self.local_start, self.local_end,
             self.global_shape[0])
+        first_step = slice_objects[0].step
         local_slice = (slice(localized_start, localized_stop,
-                             slice_objects[0].step),) + slice_objects[1:]
-
-        ## This is the bad guy, which makes slicing slower than native numpy
-        local_collected_data = np.ascontiguousarray(data[local_slice])
+                             first_step),) + slice_objects[1:]
 
-        local_collected_data_length = local_collected_data.shape[0]
-        local_collected_data_length_list=np.empty(comm.size, dtype=np.int)
-        comm.Allgather(
+
+        if directly_to_np_Q == False:
+            local_result = data[local_slice]
+            if (first_step != None) and (first_step < 0):
+                local_result = self._invert_mpi_data_ordering(local_result)
+
+            global_result = distributed_data_object(local_data = local_result,
+                                        distribution_strategy = 'freeform')
+
+
+        else:
+            comm = self.comm
+            ## This is the bad guy, which makes slicing slower than native numpy
+            local_collected_data = np.ascontiguousarray(data[local_slice])
+
+            local_collected_data_length = local_collected_data.shape[0]
+            local_collected_data_length_list=np.empty(comm.size, dtype=np.int)
+            comm.Allgather(
                 [np.array(local_collected_data_length, dtype=np.int), MPI.INT],
                 [local_collected_data_length_list, MPI.INT])
-
-        collected_data_length = np.sum(local_collected_data_length_list)
-        collected_data_shape = (collected_data_length,) + \
+
+            collected_data_length = np.sum(local_collected_data_length_list)
+            collected_data_shape = (collected_data_length,) + \
                                 local_collected_data.shape[1:]
-        local_collected_data_dim_list =\
-            np.array(local_collected_data_length_list) *\
-            np.product(local_collected_data.shape[1:])
-
-        ## if the first slice object has a negative step size, the ordering
-        ## of the Gatherv functions must be reversed
-        order = slice_objects[0].step
-        if order == None:
-            order = 1
-        else:
-            order = np.sign(order)
+            local_collected_data_dim_list =\
+                np.array(local_collected_data_length_list) *\
+                np.product(local_collected_data.shape[1:])
+
+            ## if the first slice object has a negative step size, the ordering
+            ## of the Gatherv functions must be reversed
+            order = slice_objects[0].step
+            if order == None:
+                order = 1
+            else:
+                order = np.sign(order)
+
+            local_collected_data_dim_offset_list = np.append([0],
+                np.cumsum(local_collected_data_dim_list[::order])[:-1])[::order]
+
+            local_collected_data_dim_offset_list =\
+                local_collected_data_dim_offset_list
+            collected_data = np.empty(collected_data_shape, dtype=self.dtype)
 
-        local_collected_data_dim_offset_list = np.append([0],
-            np.cumsum(local_collected_data_dim_list[::order])[:-1])[::order]
+            if target_rank == 'all':
+                comm.Allgatherv([local_collected_data, self.mpi_dtype],
+                    [collected_data, local_collected_data_dim_list,
+                     local_collected_data_dim_offset_list, self.mpi_dtype])
+            else:
+                comm.Gatherv([local_collected_data, self.mpi_dtype],
+                    [collected_data, local_collected_data_dim_list,
+                     local_collected_data_dim_offset_list,
+                     self.mpi_dtype],
+                    root=target_rank)
+            global_result = collected_data
+
+        return self._defold(global_result, sliceified)
 
-        local_collected_data_dim_offset_list =\
-            local_collected_data_dim_offset_list
-        collected_data = np.empty(collected_data_shape, dtype=self.dtype)
-        if target_rank == 'all':
-            comm.Allgatherv([local_collected_data, self.mpi_dtype],
-                [collected_data, local_collected_data_dim_list,
-                 local_collected_data_dim_offset_list, self.mpi_dtype])
-        else:
-            comm.Gatherv([local_collected_data, self.mpi_dtype],
-                [collected_data, local_collected_data_dim_list,
-                 local_collected_data_dim_offset_list,
-                 self.mpi_dtype],
-                root=target_rank)
-        return collected_data
-
     def _backshift_and_decycle(self, slice_object, shifted_start,
                                shifted_stop, global_length):
 #        ## Crop the start value
@@ -1793,7 +2150,7 @@ class _slicing_distributor(object):
 #            local_stop = local_length
         return (local_start, local_stop)
 
-    def inject(self, data, to_slices, data_update, from_slices, comm=None,
+    def inject(self, data, to_slices, data_update, from_slices,
                **kwargs):
         ## check if to_key and from_key are completely built of slices
         if not np.all(
@@ -1814,7 +2171,6 @@ class _slicing_distributor(object):
                             to_key = to_slices,
                             data_update = data_update,
                             from_key = from_slices,
-                            comm=comm,
                             **kwargs)
 
     def extract_local_data(self, data_object):
@@ -1866,7 +2222,9 @@ class _slicing_distributor(object):
                 ## Case 2: no. All nodes extract their local slice from the
                 ## data_object
                 extracted_data =\
-                    data_object[self.local_start:self.local_end]
+                    data_object.get_data(slice(self.local_start,
                                                self.local_end),
+                                         local_keys = True)
 
         ## Case 3: First dimension fits directly and data_object is an generic
         ## array
@@ -1920,10 +2278,12 @@ class _slicing_distributor(object):
         _dim_offset_list = self.all_local_slices[:,4]
         if target_rank == 'all':
             comm.Allgatherv([data, self.mpi_dtype],
-                [_gathered_data, _dim_list, _dim_offset_list, self.mpi_dtype])
+                            [_gathered_data, _dim_list, _dim_offset_list,
+                             self.mpi_dtype])
         else:
             comm.Gatherv([data, self.mpi_dtype],
-                [_gathered_data, _dim_list, _dim_offset_list, self.mpi_dtype],
+                         [_gathered_data, _dim_list, _dim_offset_list,
+                          self.mpi_dtype],
                          root=target_rank)
         return _gathered_data
 
@@ -1979,46 +2339,115 @@ class _slicing_distributor(object):
 
     def _enfold(self, in_data, sliceified):
-        data = np.array(in_data, copy=False)
-        temp_shape = ()
+        ## TODO: Implement a reshape functionality in order to avoid this
+        ## low level mess!!
+        if isinstance(in_data, distributed_data_object):
+            local_data = in_data.data
+        elif isinstance(in_data, np.ndarray) == False:
+            local_data = np.array(in_data, copy=False)
+            in_data = local_data
+        else:
+            local_data = in_data
+
+        temp_local_shape = ()
+        temp_global_shape = ()
         j=0
         for i in sliceified:
-            if i == True:
-                temp_shape += (1,)
+            if i == False:
+                try:
+                    temp_local_shape += (local_data.shape[j],)
+                    temp_global_shape += (in_data.shape[j],)
+                except(IndexError):
+                    temp_local_shape += (1,)
+                    temp_global_shape += (1,)
+                j += 1
+            else:
+                temp_local_shape += (1,)
+                temp_global_shape += (1,)
                 try:
-                    if data.shape[j] == 1:
+                    if in_data.shape[j] == 1:
                         j +=1
                 except(IndexError):
                     pass
+
+        ## take into account that the sliceified tuple may be too short,
+        ## because of a non-exhaustive list of slices
+        for i in range(len(local_data.shape)-j):
+            temp_local_shape += (local_data.shape[j],)
+            temp_global_shape += (in_data.shape[j],)
+            j += 1
+
+        if isinstance(in_data, distributed_data_object) == True:
+            if in_data.distribution_strategy != 'freeform':
+                new_data = in_data.copy_empty(global_shape = temp_global_shape)
+                new_data.data[:] = local_data.reshape(temp_local_shape)
             else:
+                reshaped_data = local_data.reshape(temp_local_shape)
+                new_data = distributed_data_object(local_data = reshaped_data,
+                                        distribution_strategy = 'freeform')
+            return new_data
+        else:
+            return local_data.reshape(temp_local_shape)
+
+    def _defold(self, in_data, sliceified):
+        ## TODO: Implement a reshape functionality in order to avoid this
+        ## low level mess!!
+        if isinstance(in_data, distributed_data_object):
+            local_data = in_data.data
+        elif isinstance(in_data, np.ndarray) == False:
+            local_data = np.array(in_data, copy=False)
+            in_data = local_data
+        else:
+            local_data = in_data
+        temp_local_shape = ()
+        temp_global_shape = ()
+        j=0
+        for i in sliceified:
+            if i == False:
                 try:
-                    temp_shape += (data.shape[j],)
+                    temp_local_shape += (local_data.shape[j],)
+                    temp_global_shape += (in_data.shape[j],)
                 except(IndexError):
-                    temp_shape += (1,)
-            j += 1
-        ## take into account that the sliceified tuple may be too short, because
-        ## of a non-exaustive list of slices
-        for i in range(len(data.shape)-j):
-            temp_shape += (data.shape[j],)
+                    temp_local_shape += (1,)
+                    temp_global_shape += (1,)
+                j += 1
+
+        ## take into account that the sliceified tuple may be too short,
+        ## because of a non-exhaustive list of slices
+        for i in range(len(local_data.shape)-j):
+            temp_local_shape += (local_data.shape[j],)
+            temp_global_shape += (in_data.shape[j],)
             j += 1
 
-        return data.reshape(temp_shape)
-
-    def _defold(self, data, sliceified):
-        temp_slice = ()
-        for i in sliceified:
-            if i == True:
-                temp_slice += (0,)
+        if isinstance(in_data, distributed_data_object) == True:
+            if in_data.distribution_strategy != 'freeform':
+                new_data = in_data.copy_empty(global_shape = temp_global_shape)
+                if np.any(np.array(local_data.shape)[np.array(sliceified)]==0):
+                    new_data.data[:] = np.empty((0,)*len(temp_local_shape),
+                                                dtype = in_data.dtype)
+                else:
+                    new_data.data[:] = local_data.reshape(temp_local_shape)
             else:
-                temp_slice += (slice(None),)
-        return data[temp_slice]
+                if np.any(np.array(local_data.shape)[np.array(sliceified)]==0):
+                    reshaped_data = np.empty((0,)*len(temp_local_shape),
+                                             dtype = in_data.dtype)
+                else:
+                    reshaped_data = local_data.reshape(temp_local_shape)
+
+                new_data = distributed_data_object(local_data = reshaped_data,
+                                        distribution_strategy = 'freeform')
+            return new_data
+        else:
+            return local_data.reshape(temp_local_shape)
+
+
     if FOUND['h5py']:
-        def save_data(self, data, alias, path=None, overwriteQ=True, comm=None):
-            if comm == None:
-                comm = self.comm
-            ## if no path and therefore no filename was given, use the alias as filename
+        def save_data(self, data, alias, path=None, overwriteQ=True):
+            comm = self.comm
+            ## if no path and therefore no filename was given, use the alias
+            ## as filename
             use_path = alias if path==None else path
 
             ## create the file-handle
@@ -2029,23 +2458,26 @@ class _slicing_distributor(object):
             ## check if dataset with name == alias already exists
             try:
                 f[alias]
-                if overwriteQ == False: #if yes, and overwriteQ is set to False, raise an Error
-                    raise KeyError(about._errors.cstring("ERROR: overwriteQ == False, but alias already in use!"))
+                #if yes, and overwriteQ is set to False, raise an Error
+                if overwriteQ == False:
+                    raise KeyError(about._errors.cstring(
+                        "ERROR: overwriteQ == False, but alias already in use!"))
                 else: # if yes, remove the existing dataset
                     del f[alias]
             except(KeyError):
                 pass
 
             ## create dataset
-            dset = f.create_dataset(alias, shape=self.global_shape, dtype=self.dtype)
+            dset = f.create_dataset(alias,
+                                    shape=self.global_shape,
+                                    dtype=self.dtype)
             ## write the data
             dset[self.local_start:self.local_end] = data
             ## close the file
             f.close()
 
-        def load_data(self, alias, path, comm=None):
-            if comm == None:
-                comm = self.comm
+        def load_data(self, alias, path):
+            comm = self.comm
             ## parse the path
             file_path = path if (path is not None) else alias
             ## create the file-handle
@@ -2060,7 +2492,7 @@ class _slicing_distributor(object):
                     "ERROR: The shape of the given dataset does not match "+
                     "the distributed_data_object."))
             ## check dtype
-            if dset.dtype.type != self.dtype:
+            if dset.dtype != self.dtype:
                 raise TypeError(about._errors.cstring(
                     "ERROR: The datatype of the given dataset does not match "+
                     "the distributed_data_object."))
@@ -2102,6 +2534,7 @@ def _equal_slicer(comm, global_shape):
 
 def _freeform_slicer(comm, local_shape):
     rank = comm.rank
+    size = comm.size
     ## Check that all but the first dimensions of local_shape are the same
     local_sub_shape = local_shape[1:]
     local_sub_shape_list = comm.allgather(local_sub_shape)
@@ -2118,7 +2551,7 @@ def _freeform_slicer(comm, local_shape):
     first_shape_index_list = comm.allgather(first_shape_index)
     first_shape_index_cumsum = (0,) + tuple(np.cumsum(first_shape_index_list))
     local_offset = first_shape_index_cumsum[rank]
-    global_shape = (first_shape_index_cumsum[rank+1],) + local_shape[1:]
+    global_shape = (first_shape_index_cumsum[size],) + local_shape[1:]
 
     return (local_offset, local_offset+first_shape_index, global_shape)
 
@@ -2126,7 +2559,18 @@ if FOUND['pyfftw'] == True:
     def _fftw_slicer(comm, global_shape):
         if FOUND['MPI'] == False:
             comm = None
-        local_size = pyfftw.local_size(global_shape, comm = comm)
+        ## pyfftw.local_size crashes if any of the entries of global_shape
+        ## is 0
+        working_shape = np.array(global_shape)
+        mask = (working_shape == 0)
+        if mask[0] == True:
+            start = 0
+            end = 0
+            return (start, end, global_shape)
+
+        if np.any(mask):
+            working_shape[mask] = 1
+
+        local_size = pyfftw.local_size(working_shape, comm = comm)
         start = local_size[2]
         end = start + local_size[1]
         return (start, end, global_shape)
@@ -2134,8 +2578,8 @@ if FOUND['pyfftw'] == True:
 
 class _not_distributor(object):
     def __init__(self, global_shape=None, dtype=None, *args, **kwargs):
-        if dtype != None:
-            self.dtype = dtype
+        if dtype is not None:
+            self.dtype = np.dtype(dtype)
        else:
            raise ValueError(about._errors.cstring(
                "ERROR: No datatype supplied!"))
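The `_freeform_slicer` fix above takes the cumulative sum over *all* ranks (`first_shape_index_cumsum[size]`) as the global first-dimension length, instead of the previous rank-local partial sum. The offset arithmetic in isolation (a sketch; ranks contributing 1, 2, 3, ... rows are an invented example):

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD

## each rank owns a different number of rows
local_len = comm.rank + 1
lengths = comm.allgather(local_len)
cumsum = (0,) + tuple(np.cumsum(lengths))

local_offset = cumsum[comm.rank]   ## where this rank's rows start globally
global_len = cumsum[comm.size]     ## total rows over *all* ranks
print(comm.rank, local_offset, local_offset + local_len, global_len)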
@@ -2180,21 +2624,31 @@ class _not_distributor(object):
 
     def disperse_data(self, data, to_key, data_update, from_key=None,
                       copy = True, **kwargs):
-        data[to_key] = np.array(data_update[from_key],
-                                copy=copy).astype(self.dtype)
+        if isinstance(data_update, distributed_data_object):
+            update = data_update[from_key].get_full_data()
+        else:
+            update = np.array(data_update, copy = copy)[from_key]
+        update = update.astype(self.dtype)
+        data[to_key] = update
 
     def collect_data(self, data, slice_objects, **kwargs):
-        return data[slice_objects]
+        new_data = data[slice_objects]
+        return distributed_data_object(global_data = new_data,
+                                       distribution_strategy = 'not')
 
     def consolidate_data(self, data, **kwargs):
-        return data
+        return data.copy()
 
     def inject(self, data, to_slices = (slice(None),), data_update = None,
                from_slices = (slice(None),), **kwargs):
-        data[to_slices] = data_update[from_slices]
+        data[to_slices] = self.extract_local_data(data_update[from_slices]).\
+                            astype(self.dtype)
 
     def extract_local_data(self, data_object):
-        return data_object[:]
+        if isinstance(data_object, distributed_data_object):
+            return data_object[:].get_full_data()
+        else:
+            return np.array(data_object)[:]
 
     def flatten(self, data, inplace = False):
         if inplace == False:
@@ -2212,35 +2666,63 @@ class _not_distributor(object):
 
 
 
-class dtype_converter:
+class _dtype_converter(object):
     """
         NIFTY class for dtype conversion between python/numpy dtypes and MPI
         dtypes.
     """
 
     def __init__(self):
+#        pre_dict = [
+#                    #[, MPI_CHAR],
+#                    #[, MPI_SIGNED_CHAR],
+#                    #[, MPI_UNSIGNED_CHAR],
+#                    [np.bool_, MPI.BYTE],
+#                    [np.int16, MPI.SHORT],
+#                    [np.uint16, MPI.UNSIGNED_SHORT],
+#                    [np.uint32, MPI.UNSIGNED_INT],
+#                    [np.int32, MPI.INT],
+#                    [np.int, MPI.LONG],
+#                    [np.int_, MPI.LONG],
+#                    [np.int64, MPI.LONG],
+#                    [np.long, MPI.LONG],
+#                    [np.longlong, MPI.LONG_LONG],
+#                    [np.uint64, MPI.UNSIGNED_LONG],
+#                    [np.ulonglong, MPI.UNSIGNED_LONG_LONG],
+#                    [np.int64, MPI.LONG_LONG],
+#                    [np.uint64, MPI.UNSIGNED_LONG_LONG],
+#                    [np.float32, MPI.FLOAT],
+#                    [np.float, MPI.DOUBLE],
+#                    [np.float_, MPI.DOUBLE],
+#                    [np.float64, MPI.DOUBLE],
+#                    [np.float128, MPI.LONG_DOUBLE],
+#                    [np.complex64, MPI.COMPLEX],
+#                    [np.complex, MPI.DOUBLE_COMPLEX],
+#                    [np.complex_, MPI.DOUBLE_COMPLEX],
+#                    [np.complex128, MPI.DOUBLE_COMPLEX]]
         pre_dict = [
                     #[, MPI_CHAR],
                     #[, MPI_SIGNED_CHAR],
                     #[, MPI_UNSIGNED_CHAR],
-                    [np.bool_, MPI.BYTE],
-                    [np.int16, MPI.SHORT],
-                    [np.uint16, MPI.UNSIGNED_SHORT],
-                    [np.uint32, MPI.UNSIGNED_INT],
-                    [np.int32, MPI.INT],
-                    [np.int, MPI.LONG],
-                    [np.int64, MPI.LONG],
-                    [np.uint64, MPI.UNSIGNED_LONG],
-                    [np.int64, MPI.LONG_LONG],
-                    [np.uint64, MPI.UNSIGNED_LONG_LONG],
-                    [np.float32, MPI.FLOAT],
-                    [np.float, MPI.DOUBLE],
-                    [np.float64, MPI.DOUBLE],
-                    [np.float128, MPI.LONG_DOUBLE],
-                    [np.complex64, MPI.COMPLEX],
-                    [np.complex, MPI.DOUBLE_COMPLEX],
-                    [np.complex128, MPI.DOUBLE_COMPLEX]]
-
+                    [np.dtype('bool'), MPI.BYTE],
+                    [np.dtype('int16'), MPI.SHORT],
+                    [np.dtype('uint16'), MPI.UNSIGNED_SHORT],
+                    [np.dtype('uint32'), MPI.UNSIGNED_INT],
+                    [np.dtype('int32'), MPI.INT],
+                    [np.dtype('int'), MPI.LONG],
+                    [np.dtype(np.long), MPI.LONG],
+                    [np.dtype('int64'), MPI.LONG_LONG],
+                    [np.dtype('longlong'), MPI.LONG],
+                    [np.dtype('uint'), MPI.UNSIGNED_LONG],
+                    [np.dtype('uint64'), MPI.UNSIGNED_LONG_LONG],
+                    [np.dtype('ulonglong'), MPI.UNSIGNED_LONG_LONG],
+                    [np.dtype('float32'), MPI.FLOAT],
+                    [np.dtype('float64'), MPI.DOUBLE],
+                    [np.dtype('float128'), MPI.LONG_DOUBLE],
+                    [np.dtype('complex64'), MPI.COMPLEX],
+                    [np.dtype('complex128'), MPI.DOUBLE_COMPLEX]]
+
+        to_mpi_pre_dict = np.array(pre_dict)
         to_mpi_pre_dict[:,0] = map(self.dictionize_np, to_mpi_pre_dict[:,0])
         self._to_mpi_dict = dict(to_mpi_pre_dict)
@@ -2250,8 +2732,8 @@ class dtype_converter:
         self._to_np_dict = dict(to_np_pre_dict)
 
     def dictionize_np(self, x):
-        dic = x.__dict__.items()
-        if x is np.float:
+        dic = x.type.__dict__.items()
+        if x.type is np.float:
             dic[24] = 0
             dic[29] = 0
             dic[37] = 0
@@ -2270,7 +2752,68 @@ class dtype_converter:
         return self._to_np_dict.has_key(self.dictionize_mpi(dtype))
 
     def known_np_Q(self, dtype):
-        return self._to_mpi_dict.has_key(self.dictionize_np(dtype))
+        return self._to_mpi_dict.has_key(self.dictionize_np(np.dtype(dtype)))
+
+global_dtype_converter = _dtype_converter()
+
+
+class _d2o_librarian(object):
+    def __init__(self):
+        self.library = weakdict()
+        self.counter = 0
+
+    def register(self, d2o):
+        self.counter += 1
+        self.library[self.counter] = d2o
+        return self.counter
+
+    def __getitem__(self, key):
+        return self.library[key]
+
+d2o_librarian = _d2o_librarian()
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-_global_dtype_converter = dtype_converter()
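The 'freeform' strategy that the following test module exercises assembles the global array from per-rank blocks whose first dimensions may differ (trailing dimensions must agree). A usage sketch under mpirun, separate from the patch and assuming the patched module is importable:

import numpy as np
from mpi4py import MPI
from nifty.nifty_mpi_data import distributed_data_object

comm = MPI.COMM_WORLD

## each rank contributes a different number of rows
local_block = np.full((comm.rank + 1, 3), comm.rank, dtype=np.float64)
obj = distributed_data_object(local_data=local_block,
                              distribution_strategy='freeform')

## the global first dimension is the sum over ranks: 1 + 2 + ... + size
expected_rows = comm.size * (comm.size + 1) // 2
assert obj.shape == (expected_rows, 3)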
diff --git a/test/test_nifty_mpi_data.py b/test/test_nifty_mpi_data.py
index a7fd2e6e4f9fdf275c6fe0809202b8a29901e5bb..82b617978b1f40a36b7ba88c739e17d762ee8939 100644
--- a/test/test_nifty_mpi_data.py
+++ b/test/test_nifty_mpi_data.py
@@ -1,28 +1,60 @@
 # -*- coding: utf-8 -*-
 
-from numpy.testing import assert_equal, assert_raises
+from numpy.testing import assert_equal, assert_array_equal, assert_raises
 from nose_parameterized import parameterized
 import unittest
+from time import sleep
 import itertools
-
 import os
+import imp
 
 import numpy as np
 import nifty
-from nifty.nifty_mpi_data import distributed_data_object
+from nifty.nifty_mpi_data import distributed_data_object, d2o_librarian
 
-found = {}
+FOUND = {}
 try:
-    import h5py
-    found['h5py'] = True
+    imp.find_module('h5py')
+    FOUND['h5py'] = True
 except(ImportError):
-    found['h5py'] = False
+    FOUND['h5py'] = False
+
+try:
+    from mpi4py import MPI
+    FOUND['MPI'] = True
+except(ImportError):
+    import mpi_dummy as MPI
+    FOUND['MPI'] = False
+
+###############################################################################
+
+comm = MPI.COMM_WORLD
+rank = comm.rank
+size = comm.size
+
+###############################################################################
+
+np.random.seed(123)
+
+###############################################################################
 
+#all_datatypes = [np.bool_, np.int16, np.uint16, np.uint32, np.int32, np.int_,
+#                 np.int, np.int64, np.uint64, np.float32, np.float_, np.float,
+#                 np.float64, np.float128, np.complex64, np.complex_,
+#                 np.complex, np.complex128]
+all_datatypes = [np.dtype('bool'), np.dtype('int16'), np.dtype('uint16'),
+                 np.dtype('uint32'), np.dtype('int32'), np.dtype('int'),
+                 np.dtype('int64'), np.dtype('uint'), np.dtype('uint64'),
+                 np.dtype('float32'), np.dtype('float64'),
+                 np.dtype('float128'), np.dtype('complex64'),
+                 np.dtype('complex128')]
+
+###############################################################################
 
-all_datatypes = [np.bool_, np.int16, np.uint16, np.uint32, np.int32, np.int_,
-                np.int64, np.uint64, np.int64, np.uint64, np.float32, np.float_,
-                np.float64, np.float128, np.complex64, np.complex_, np.complex128]
-all_distribution_strategies = ['not', 'equal', 'fftw']
+all_distribution_strategies = ['not', 'equal', 'fftw', 'freeform']
+global_distribution_strategies = ['not', 'equal', 'fftw']
+local_distribution_strategies = ['freeform']
+
+###############################################################################
 
 binary_non_inplace_operators = ['__add__', '__radd__', '__sub__', '__rsub__',
                                 '__div__', '__truediv__', '__rdiv__',
@@ -32,23 +64,34 @@ binary_non_inplace_operators = ['__add__', '__radd__', '__sub__', '__rsub__',
 binary_inplace_operators = ['__iadd__', '__isub__', '__idiv__',
                             '__itruediv__', '__ifloordiv__', '__imul__',
                             '__ipow__']
 
+###############################################################################
+
 hdf5_test_paths = [#('hdf5_init_test.hdf5', None),
         ('hdf5_init_test.hdf5', os.path.join(os.path.dirname(nifty.__file__),
                                 'test/hdf5_init_test.hdf5')),
         ('hdf5_init_test.hdf5', os.path.join(os.path.dirname(nifty.__file__),
                         'test//hdf5_test_folder/hdf5_init_test.hdf5'))]
 
+
+###############################################################################
 
 def custom_name_func(testcase_func, param_num, param):
     return "%s_%s" %(
         testcase_func.__name__,
         parameterized.to_safe_name("_".join(str(x) for x in param.args)),
     )
-
-class Test_Initialization(unittest.TestCase):
+
+
+###############################################################################
+###############################################################################
+
+
+class Test_Globaltype_Initialization(unittest.TestCase):
+
+###############################################################################
 
     @parameterized.expand(
         itertools.product([(1,), (7,), (78,11), (256,256)],
                           all_datatypes,
-                          all_distribution_strategies,
+                          global_distribution_strategies,
                           [True, False]),
         testcase_func_name=custom_name_func)
     def test_successful_init_via_global_shape_and_dtype(self,
@@ -57,9 +100,9 @@ class Test_Initialization(unittest.TestCase):
                                                         distribution_strategy,
                                                         hermitian):
         obj = distributed_data_object(global_shape = global_shape,
-                              dtype = dtype,
+                              dtype = dtype,
                               distribution_strategy = distribution_strategy,
-                              hermitian = hermitian)
+                              hermitian = hermitian)
 
         assert_equal(obj.dtype, dtype)
         assert_equal(obj.shape, global_shape)
@@ -67,37 +110,19 @@ class Test_Initialization(unittest.TestCase):
         assert_equal(obj.hermitian, hermitian)
         assert_equal(obj.data.dtype, np.dtype(dtype))
 
-
-    @parameterized.expand([
-                (None, None, None),
-                (None, (8,8), None),
-                (None, None, np.int),
-                (1, None, None)
-                ],
-        testcase_func_name=custom_name_func)
-    def test_failed_init_on_unsufficient_parameters(self,
-                                                    global_data,
-                                                    global_shape,
-                                                    dtype):
-        assert_raises(ValueError,
-                      lambda: distributed_data_object(
-                                            global_data = global_data,
-                                            global_shape = global_shape,
-                                            dtype = dtype))
+
+###############################################################################
 
     @parameterized.expand(
-        itertools.product([(1,),(7,),(77,11),(256,256)],
-                  [np.bool, np.int16, np.uint16, np.uint32, np.int32,
-                   np.int, np.int64, np.uint64, np.int64, np.uint64,
-                   np.float32, np.float, np.float64, np.float128,
-                   np.complex64, np.complex, np.complex128],
-                  ['not', 'equal', 'fftw']),
-        testcase_func_name=custom_name_func)
-
+        itertools.product([(1,), (7,), (77,11), (256,256)],
+                          all_datatypes,
+                          global_distribution_strategies),
+        testcase_func_name=custom_name_func)
     def test_successful_init_via_global_data(self,
                                              global_shape,
                                              dtype,
                                              distribution_strategy):
+
         a = (np.random.rand(*global_shape)*100-50).astype(dtype)
         obj = distributed_data_object(global_data = a,
                               distribution_strategy = distribution_strategy)
@@ -106,12 +131,40 @@ class Test_Initialization(unittest.TestCase):
         assert_equal(obj.distribution_strategy, distribution_strategy)
         assert_equal(obj.data.dtype, np.dtype(dtype))
+###############################################################################
+
+    @parameterized.expand(
+        itertools.product([(1,), (7,), (77,11), (256,256)],
+                          ['tuple', 'list'],
+                          all_datatypes,
+                          global_distribution_strategies),
+                          testcase_func_name=custom_name_func)
+    def test_successful_init_via_tuple_and_list(self,
+                                                global_shape,
+                                                global_data_type,
+                                                dtype,
+                                                distribution_strategy):
+
+        a = (np.random.rand(*global_shape)*100-50).astype(dtype)
+        if global_data_type == 'list':
+            a = a.tolist()
+        elif global_data_type == 'tuple':
+            a = tuple(a.tolist())
+
+        obj = distributed_data_object(global_data = a,
+                                      distribution_strategy = distribution_strategy)
+        assert_equal(obj.shape, global_shape)
+        assert_equal(obj.distribution_strategy, distribution_strategy)
+
+
+
+###############################################################################
 
     @parameterized.expand(itertools.product([
         [1, (13,7), np.float64, (13,7), np.float64],
         [np.array([1]), (13,7), np.float64, (1,), np.float64],
-        [np.array([[1.,2.],[3.,4.]]), (13,7), np.int_, (2,2), np.int_],
-        ], ['not', 'equal', 'fftw']),
+        [np.array([[1.,2.],[3.,4.]]), (13,7), np.int_, (2,2), np.int_]
+        ], global_distribution_strategies),
         testcase_func_name=custom_name_func)
     def test_special_init_cases(self,
                                 (global_data,
@@ -127,17 +180,249 @@ class Test_Initialization(unittest.TestCase):
         assert_equal(obj.shape, expected_shape)
         assert_equal(obj.dtype, expected_dtype)
 
+###############################################################################
 
-    if found['h5py'] == True:
+    if FOUND['h5py'] == True:
         @parameterized.expand(itertools.product(hdf5_test_paths,
-                                                all_distribution_strategies),
+                                                global_distribution_strategies),
                               testcase_func_name=custom_name_func)
         def test_hdf5_init(self, (alias, path), distribution_strategies):
-            obj = distributed_data_object(alias = alias,
+            obj = distributed_data_object(global_data = 1.,
+                                          global_shape = (12,6),
+                                          alias = alias,
                                           path = path)
             assert_equal(obj.dtype, np.complex128)
             assert_equal(obj.shape, (13,7))
+
+###############################################################################
+
+    @parameterized.expand(
+        itertools.product(
+            [(None, None, None, None, None),
+             (None, (8,8), None, None, None),
+             (None, None, np.int_, None, None),
+             (1, None, None, None, None),
+             (None, None, None, np.array([1,2,3]), (3,)),
+             (None, (3*size,), None, np.array([1,2,3]), None),
+             (None, None, np.int_, None, (3,)),],
+            global_distribution_strategies),
+        testcase_func_name=custom_name_func)
+    def test_failed_init_on_unsufficient_parameters(self,
+            (global_data, global_shape, dtype, local_data, local_shape),
+            distribution_strategy):
+        assert_raises(ValueError,
+                      lambda: distributed_data_object(
+                                  global_data = global_data,
+                                  global_shape = global_shape,
+                                  dtype = dtype,
+                                  local_data = local_data,
+                                  local_shape = local_shape,
+                                  distribution_strategy = distribution_strategy))
+
+###############################################################################
+
+    @parameterized.expand(
+        itertools.product([(0,), (1,0), (0,1), (25,0,10), (0,0)],
+                          global_distribution_strategies),
+                          testcase_func_name=custom_name_func)
+    def test_init_with_zero_type_shape(self, global_shape,
+                                       distribution_strategy):
+        obj = distributed_data_object(global_shape = global_shape,
+                                      dtype = np.int,
+                                      distribution_strategy = distribution_strategy)
+        assert_equal(obj.shape, global_shape)
+
+
+
+###############################################################################
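## Of the rejected parameter combinations above, two are easy to reproduce
## directly: a shape without a dtype (and no data to infer one from), and
## purely local information handed to a global strategy. A minimal sketch,
## assuming an installed nifty:

import numpy as np
from numpy.testing import assert_raises
from nifty.nifty_mpi_data import distributed_data_object

## a global_shape alone carries no dtype and no data
assert_raises(ValueError,
              lambda: distributed_data_object(global_shape=(8, 8),
                                              distribution_strategy='equal'))

## local_data/local_shape cannot feed a global distribution strategy
assert_raises(ValueError,
              lambda: distributed_data_object(local_data=np.array([1, 2, 3]),
                                              local_shape=(3,),
                                              distribution_strategy='equal'))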
+###############################################################################
+
+class Test_Localtype_Initialization(unittest.TestCase):
+
+###############################################################################
+
+    @parameterized.expand(
+        itertools.product([(1,), (7,), (78,11), (256,256)],
+                          [False, True],
+                          all_datatypes,
+                          local_distribution_strategies,
+                          [True, False]),
+                          testcase_func_name=custom_name_func)
+    def test_successful_init_via_local_shape_and_dtype(self,
+                                                       local_shape,
+                                                       different_shapes,
+                                                       dtype,
+                                                       distribution_strategy,
+                                                       hermitian):
+
+        if different_shapes == True:
+            expected_global_shape = np.array(local_shape)
+            expected_global_shape[0] *= size*(size-1)/2
+            expected_global_shape = tuple(expected_global_shape)
+            local_shape = list(local_shape)
+            local_shape[0] *= rank
+            local_shape = tuple(local_shape)
+        else:
+            expected_global_shape = np.array(local_shape)
+            expected_global_shape[0] *= size
+            expected_global_shape = tuple(expected_global_shape)
+
+        obj = distributed_data_object(local_shape = local_shape,
+                                      dtype = dtype,
+                                      distribution_strategy = distribution_strategy,
+                                      hermitian = hermitian)
+
+        assert_equal(obj.dtype, dtype)
+        assert_equal(obj.shape, expected_global_shape)
+        assert_equal(obj.distribution_strategy, distribution_strategy)
+        assert_equal(obj.hermitian, hermitian)
+        assert_equal(obj.data.dtype, np.dtype(dtype))
+
+
+###############################################################################
+
+    @parameterized.expand(
+        itertools.product([(1,), (7,), (77,11), (256,256)],
+                          [False, True],
+                          all_datatypes,
+                          local_distribution_strategies),
+                          testcase_func_name=custom_name_func)
+    def test_successful_init_via_local_data(self,
+                                            local_shape,
+                                            different_shapes,
+                                            dtype,
+                                            distribution_strategy):
+        if different_shapes == True:
+            expected_global_shape = np.array(local_shape)
+            expected_global_shape[0] *= size*(size-1)/2
+            expected_global_shape = tuple(expected_global_shape)
+            local_shape = list(local_shape)
+            local_shape[0] *= rank
+            local_shape = tuple(local_shape)
+        else:
+            expected_global_shape = np.array(local_shape)
+            expected_global_shape[0] *= size
+            expected_global_shape = tuple(expected_global_shape)
+
+        a = (np.random.rand(*local_shape)*100-50).astype(dtype)
+        obj = distributed_data_object(local_data = a,
+                                      distribution_strategy = distribution_strategy)
+
+        assert_equal(obj.dtype, np.dtype(dtype))
+        assert_equal(obj.shape, expected_global_shape)
+        assert_equal(obj.distribution_strategy, distribution_strategy)
+        assert_equal(obj.data.dtype, np.dtype(dtype))
+
+###############################################################################
+
+    @parameterized.expand(
+        itertools.product([(1,)],#, (7,), (77,11), (256,256)],
+                          ['tuple', 'list'],
+                          all_datatypes,
+                          local_distribution_strategies),
+                          testcase_func_name=custom_name_func)
+    def test_successful_init_via_tuple_and_list(self,
+                                                local_shape,
+                                                local_data_type,
+                                                dtype,
+                                                distribution_strategy):
+
+        a = (np.random.rand(*local_shape)*100).astype(dtype)
+        if local_data_type == 'list':
+            a = a.tolist()
+        elif local_data_type == 'tuple':
+            a = tuple(a.tolist())
+        sleep(0.01)
+        obj = distributed_data_object(local_data = a,
+                                      distribution_strategy = distribution_strategy)
+
+        expected_global_shape = np.array(local_shape)
+        expected_global_shape[0] *= size
+        expected_global_shape = tuple(expected_global_shape)
+
+        assert_equal(obj.shape, expected_global_shape)
+        assert_equal(obj.distribution_strategy, distribution_strategy)
+
+
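## Why size*(size-1)/2 shows up in the expected shapes above: under the
## 'freeform' strategy the global array is the per-rank local arrays stacked
## along the first axis. When rank r contributes r*n rows, the total is
## n*(0 + 1 + ... + (size-1)) = n*size*(size-1)/2. A self-contained check of
## that arithmetic:

def expected_first_axis(n, size, scaled_by_rank):
    if scaled_by_rank:
        ## ranks contribute 0*n, 1*n, ..., (size-1)*n rows
        return n * size * (size - 1) // 2
    ## otherwise every rank contributes the same n rows
    return n * size

assert expected_first_axis(7, 4, True) == sum(r * 7 for r in range(4))   ## 42
assert expected_first_axis(7, 4, False) == 7 * 4                         ## 28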
+###############################################################################
+
+    @parameterized.expand(itertools.product([
+        [1, (13,7), np.float64, (13*size,7), np.float64],
+        [np.array([1]), (13,7), np.float64, (1*size,), np.float64],
+        [np.array([[1.,2.],[3.,4.]]), (13,7), np.int, (2*size,2), np.int]
+        ], local_distribution_strategies),
+        testcase_func_name=custom_name_func)
+    def test_special_init_cases(self,
+                                (local_data,
+                                 local_shape,
+                                 dtype,
+                                 expected_shape,
+                                 expected_dtype),
+                                distribution_strategy):
+        obj = distributed_data_object(local_data = local_data,
+                                      local_shape = local_shape,
+                                      dtype = dtype,
+                                      distribution_strategy = distribution_strategy)
+
+        assert_equal(obj.shape, expected_shape)
+        assert_equal(obj.dtype, expected_dtype)
+
+###############################################################################
+#
+#    if FOUND['h5py'] == True:
+#        @parameterized.expand(itertools.product(hdf5_test_paths,
+#                                                global_distribution_strategies),
+#                              testcase_func_name=custom_name_func)
+#        def test_hdf5_init(self, (alias, path), distribution_strategies):
+#            obj = distributed_data_object(global_data = 1.,
+#                                          global_shape = (12,6),
+#                                          alias = alias,
+#                                          path = path)
+#            assert_equal(obj.dtype, np.complex128)
+#            assert_equal(obj.shape, (13,7))
+#
+###############################################################################
+#
+#    @parameterized.expand(
+#        itertools.product(
+#            [(None, None, None, None, None),
+#             (None, (8,8), None, None, None),
+#             (None, None, np.int_, None, None),
+#             (1, None, None, None, None),
+#             (None, None, None, np.array([1,2,3]), (3,)),
+#             (None, (3*size,), None, np.array([1,2,3]), None),
+#             (None, None, np.int_, None, (3,)),],
+#            global_distribution_strategies),
+#        testcase_func_name=custom_name_func)
+#    def test_failed_init_on_unsufficient_parameters(self,
+#            (global_data, global_shape, dtype, local_data, local_shape),
+#            distribution_strategy):
+#        assert_raises(ValueError,
+#                      lambda: distributed_data_object(
+#                                  global_data = global_data,
+#                                  global_shape = global_shape,
+#                                  dtype = dtype,
+#                                  local_data = local_data,
+#                                  local_shape = local_shape,
+#                                  distribution_strategy = distribution_strategy))
+#
+###############################################################################
+#
+#    @parameterized.expand(
+#        itertools.product([(0,), (1,0), (0,1), (25,0,10), (0,0)],
+#                          global_distribution_strategies),
+#                          testcase_func_name=custom_name_func)
+#    def test_init_with_zero_type_shape(self, global_shape,
+#                                       distribution_strategy):
+#        obj = distributed_data_object(global_shape = global_shape,
+#                                      dtype = np.int,
+#                                      distribution_strategy = distribution_strategy)
+#        assert_equal(obj.shape, global_shape)
+#
+
+###############################################################################
+###############################################################################
 
 class Test_set_get_full_and_local_data(unittest.TestCase):
     @parameterized.expand(
@@ -155,7 +440,7 @@ class Test_set_get_full_and_local_data(unittest.TestCase):
                                       distribution_strategy = distribution_strategy)
         assert_equal(obj.get_full_data(), a)
 
-    if found['h5py']:
+    if FOUND['h5py']:
         @parameterized.expand(hdf5_test_paths)
         def test_loading_hdf5_file(self, alias, path):
             a = np.arange(13*7).reshape((13,7)).astype(np.float)
@@ -199,14 +484,18 @@ class Test_set_get_full_and_local_data(unittest.TestCase):
         b = obj.get_local_data()
         c = (np.random.random(b.shape)*100).astype(np.dtype(dtype))
         obj.set_local_data(data = c)
-        assert_equal(obj.get_local_data(), c)
+        assert_equal(obj.get_local_data(), c)
+
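## The get/set pair above acts on the per-rank view of a d2o. A minimal
## round-trip sketch, assuming an installed nifty (run under mpirun, or
## serially, where 'equal' degenerates to a single slab):

import numpy as np
from nifty.nifty_mpi_data import distributed_data_object

obj = distributed_data_object(global_shape=(8, 8),
                              dtype=np.dtype('float64'),
                              distribution_strategy='equal')

local = obj.get_local_data()                    ## this rank's slab
obj.set_local_data(data=np.ones_like(local))    ## overwrite only this slab
assert (obj.get_local_data() == 1).all()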
+###############################################################################
+###############################################################################
 
 class Test_slicing_get_set_data(unittest.TestCase):
     @parameterized.expand(
-        itertools.product([(4,4),(20,21), (77,11), (256,256)],
+        itertools.product(
+            [(4,4),(20,21), (77,11), (256,256)],
             all_datatypes,
-            all_distribution_strategies,
+            all_distribution_strategies,
             [slice(None, None, None),
              slice(5, 18),
              slice(5, 18, 4),
@@ -245,7 +534,8 @@ class Test_slicing_get_set_data(unittest.TestCase):
                 reshape(global_shape)
         obj = distributed_data_object(global_data = a,
                                       distribution_strategy = distribution_strategy)
-        assert_equal(obj[slice_tuple], a[slice_tuple])
+
+        assert_array_equal(obj[slice_tuple].get_full_data(), a[slice_tuple])
 
         b = 100*np.copy(a)
         obj[slice_tuple] = b[slice_tuple]
@@ -253,6 +543,8 @@ class Test_slicing_get_set_data(unittest.TestCase):
         assert_equal(obj.get_full_data(), a)
 
 
+###############################################################################
+###############################################################################
 
 class Test_inject(unittest.TestCase):
 
@@ -284,7 +576,7 @@ class Test_inject(unittest.TestCase):
                    data = p,
                    from_slices = slice_tuple_2)
         a[slice_tuple_1] = b[slice_tuple_2]
-        assert_equal(obj, a)
+        assert_equal(obj.get_full_data(), a)
 
 
@@ -293,7 +585,9 @@ def scalar_only_square(x):
         return x*x
     else:
         raise ValueError
-
+###############################################################################
+###############################################################################
+
 class Test_copy_and_copy_empty(unittest.TestCase):
 
     @parameterized.expand([
@@ -388,7 +682,8 @@ class Test_copy_and_copy_empty(unittest.TestCase):
         assert_equal(obj.get_full_data(), np.ones(shape=temp_shape))
         assert_equal(obj.dtype, desired_dtype)
 
-
+###############################################################################
+###############################################################################
 
 class Test_unary_and_binary_operations(unittest.TestCase):
 
@@ -460,7 +755,7 @@ class Test_unary_and_binary_operations(unittest.TestCase):
         temp_shape = (8,8)
         a = np.arange(np.prod(temp_shape)).reshape(temp_shape)
         obj = distributed_data_object(a)
-        assert_equal(obj+1j, a+1j)
+        assert_equal((obj+1j).get_full_data(), a+1j)
 
     @parameterized.expand(binary_non_inplace_operators,
                           testcase_func_name=custom_name_func)
@@ -516,7 +811,10 @@ class Test_unary_and_binary_operations(unittest.TestCase):
         assert_equal(obj.equal(p), True)
         assert_equal(obj.equal(p+1), False)
         assert_equal(obj.equal(None), False)
-
+
+###############################################################################
+###############################################################################
+
 class Test_contractions(unittest.TestCase):
     def test_vdot(self):
         temp_shape = (8,8)
@@ -570,7 +868,6 @@ class Test_contractions(unittest.TestCase):
                      np.all(np.isreal(a)))
         assert_equal(obj.isreal().any(),
                      np.any(np.isreal(a)))
-
 
 ## Todo: Assert that data is copied, when copy flag is set
 ## Todo: Assert that set, get and injection work, if there is different data
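## The assertion changes above reflect that slicing a d2o yields another
## d2o rather than a plain ndarray, which is why comparisons now go through
## get_full_data(). A minimal sketch, assuming an installed nifty:

import numpy as np
from numpy.testing import assert_array_equal
from nifty.nifty_mpi_data import distributed_data_object

a = np.arange(20).reshape((4, 5))
obj = distributed_data_object(global_data=a,
                              distribution_strategy='equal')

sub = obj[1:3]                  ## slicing returns a distributed_data_object
assert_array_equal(sub.get_full_data(), a[1:3])

obj[1:3] = 10 * a[1:3]          ## sliced assignment works in place
assert_array_equal(obj[1:3].get_full_data(), 10 * a[1:3])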