Commit 083f6433 authored by Theo Steininger

Ported to keepers.Logging.

parent 8c7d198b
Pipeline #11901 passed with stage in 7 minutes and 9 seconds
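This commit replaces d2o's hand-rolled stdout helpers (about_cstring, about_warnings_cprint, about_infos_cprint) with the Loggable mixin from the keepers package, which equips every class with a self.logger attribute. The following is a minimal sketch of that mixin pattern using only the standard library; it illustrates the idea and is not keepers' actual implementation, whose handler setup and logger naming may differ:

import logging
import sys

class Loggable(object):
    """Mixin giving each subclass a logger named after the concrete class."""
    @property
    def logger(self):
        logger = logging.getLogger(self.__class__.__name__)
        if not logger.handlers:
            # Attach a stdout handler once. The formatter now supplies the
            # "WARNING:"/"INFO:" prefix that the old cprint helpers had to
            # hard-code into every message string.
            handler = logging.StreamHandler(sys.stdout)
            handler.setFormatter(
                logging.Formatter("%(levelname)s:%(name)s: %(message)s"))
            logger.addHandler(handler)
            logger.setLevel(logging.INFO)
        return logger

class distributed_data_object(Loggable, object):
    def demo(self):
        # formerly: about_warnings_cprint("WARNING: Trying to use np.vectorize!")
        self.logger.warning("Trying to use np.vectorize!")

distributed_data_object().demo()
# prints: WARNING:distributed_data_object: Trying to use np.vectorize!

Because the formatter adds the level prefix, the message strings in the diff below drop their hard-coded "ERROR:"/"WARNING:"/"INFO:" prefixes.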
@@ -17,7 +17,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import numpy as np
from keepers import Versionable
from keepers import Versionable,\
Loggable
from d2o.config import configuration as gc,\
dependency_injector as gdi
@@ -29,13 +30,8 @@ from strategies import STRATEGIES
MPI = gdi[gc['mpi_module']]
about_cstring = lambda z: z
from sys import stdout
about_warnings_cprint = lambda z: stdout.write(z + "\n"); stdout.flush()
about_infos_cprint = lambda z: stdout.write(z + "\n"); stdout.flush()
class distributed_data_object(Versionable, object):
class distributed_data_object(Loggable, Versionable, object):
"""A multidimensional array with modular MPI-based distribution schemes.
The purpose of a distributed_data_object (d2o) is to provide the user
@@ -429,8 +425,7 @@ class distributed_data_object(Versionable, object):
try:
result_data = function(local_data)
except:
about_warnings_cprint(
"WARNING: Trying to use np.vectorize!")
self.logger.warn("Trying to use np.vectorize!")
result_data = np.vectorize(function,
otypes=[local_data.dtype])(local_data)
@@ -1221,11 +1216,9 @@ class distributed_data_object(Versionable, object):
"""
if 0 in self.shape:
raise ValueError(
"ERROR: attempt to get argmin of an empty object")
raise ValueError("Attempt to get argmin of an empty object")
if axis is not None:
raise NotImplementedError("ERROR: argmin doesn't support axis "
"keyword")
raise NotImplementedError("argmin doesn't support axis keyword")
if self.shape == ():
return 0
@@ -1259,9 +1252,9 @@ class distributed_data_object(Versionable, object):
if 0 in self.shape:
raise ValueError(
"ERROR: attempt to get argmax of an empty object")
"Attempt to get argmax of an empty object")
if axis is not None:
raise NotImplementedError("ERROR: argmax doesn't support axis "
raise NotImplementedError("argmax doesn't support axis "
"keyword")
if self.shape == ():
return 0
@@ -1337,8 +1330,8 @@ class distributed_data_object(Versionable, object):
expensive.
"""
about_warnings_cprint(
"WARNING: The current implementation of median is very expensive!")
self.logger.warn("The current implementation of median is very "
"expensive!")
median = np.median(self.get_full_data(), axis=axis, **kwargs)
if np.isscalar(median):
return median
@@ -1473,8 +1466,8 @@ class distributed_data_object(Versionable, object):
if self.dtype not in [np.dtype('int16'), np.dtype('int32'),
np.dtype('int64'), np.dtype('uint16'),
np.dtype('uint32'), np.dtype('uint64')]:
raise TypeError(about_cstring(
"ERROR: Distributed-data-object must be of integer datatype!"))
raise TypeError("Distributed-data-object must be of integer "
"datatype!")
if axis is ():
return self.copy()
@@ -19,6 +19,8 @@
import numpy as np
from keepers import Loggable
from d2o.config import configuration as gc,\
dependency_injector as gdi
@@ -39,13 +41,7 @@ h5py = gdi.get('h5py')
pyfftw = gdi.get('pyfftw')
about_cstring = lambda z: z
from sys import stdout
about_infos_cprint = lambda z: stdout.write(z + "\n"); stdout.flush()
class _distributor_factory(object):
class _distributor_factory(Loggable, object):
def __init__(self):
self.distributor_store = {}
@@ -73,8 +69,8 @@ class _distributor_factory(object):
# Parse the MPI communicator
if comm is None:
raise ValueError(about_cstring(
"ERROR: The distributor needs MPI-communicator object comm!"))
raise ValueError("The distributor needs MPI-communicator object "
"comm!")
else:
return_dict['comm'] = comm
@@ -82,9 +78,9 @@ class _distributor_factory(object):
# Check that all nodes got the same distribution_strategy
strat_list = comm.allgather(distribution_strategy)
if not all(x == strat_list[0] for x in strat_list):
raise ValueError(about_cstring(
"ERROR: The distribution-strategy must be the same on " +
"all nodes! Got: %s") % distribution_strategy)
raise ValueError("The distribution-strategy must be the same "
"on all nodes! Got: %s" %
distribution_strategy)
# Check for an hdf5 file and open it if given
if 'h5py' in gdi and alias is not None:
@@ -109,7 +105,7 @@ class _distributor_factory(object):
if dtype is None:
if global_data is None:
dtype = np.dtype('float64')
about_infos_cprint('INFO: dtype was set to default.')
self.logger.info("dtype was set to default.")
else:
try:
dtype = global_data.dtype
@@ -129,15 +125,15 @@ class _distributor_factory(object):
dtype = np.array(local_data).dtype
else:
dtype = np.dtype('float64')
about_infos_cprint('INFO: dtype set was set to default.')
self.logger.info("INFO: dtype set was set to default.")
else:
dtype = np.dtype(dtype)
if expensive_checks:
dtype_list = comm.allgather(dtype)
if not all(x == dtype_list[0] for x in dtype_list):
raise ValueError(about_cstring(
"ERROR: The given dtype must be the same on all nodes!"))
raise ValueError("The given dtype must be the same on all "
"nodes!")
return_dict['dtype'] = dtype
# Parse the shape
@@ -152,20 +148,15 @@ class _distributor_factory(object):
elif global_data is not None:
global_shape = ()
else:
raise ValueError(about_cstring(
"ERROR: Neither global_data nor " +
"global_shape nor hdf5 file supplied!"))
# if global_shape == ():
# raise ValueError(about_cstring(
# "ERROR: global_shape == () is not a valid shape!"))
raise ValueError("Neither global_data nor global_shape nor "
"hdf5 file supplied!")
if expensive_checks:
global_shape_list = comm.allgather(global_shape)
if not all(x == global_shape_list[0]
for x in global_shape_list):
raise ValueError(about_cstring(
"ERROR: The global_shape must be the same on all " +
"nodes!"))
raise ValueError("The global_shape must be the same on "
"all nodes!")
return_dict['global_shape'] = global_shape
# Case 2: local-type slicer
@@ -177,21 +168,18 @@ class _distributor_factory(object):
elif local_shape is not None:
local_shape = tuple(local_shape)
else:
raise ValueError(about_cstring(
"ERROR: Neither non-0-dimensional local_data nor " +
"local_shape nor global d2o supplied!"))
raise ValueError("Neither non-0-dimensional local_data nor "
"local_shape nor global d2o supplied!")
if local_shape == ():
raise ValueError(about_cstring(
"ERROR: local_shape == () is not a valid shape!"))
raise ValueError("local_shape == () is not a valid shape!")
if expensive_checks:
local_shape_list = comm.allgather(local_shape[1:])
cleared_set = set(local_shape_list)
cleared_set.discard(())
if len(cleared_set) > 1:
raise ValueError(about_cstring(
"ERROR: All but the first entry of local_shape " +
"must be the same on all nodes!"))
raise ValueError("All but the first entry of local_shape "
"must be the same on all nodes!")
return_dict['local_shape'] = local_shape
# Add the name of the distributor if needed
@@ -233,8 +221,7 @@ class _distributor_factory(object):
def get_distributor(self, distribution_strategy, comm, **kwargs):
# check if the distribution strategy is known
if distribution_strategy not in STRATEGIES['all']:
raise ValueError(about_cstring(
"ERROR: Unknown distribution strategy supplied."))
raise ValueError("Unknown distribution strategy supplied.")
# parse the kwargs
parsed_kwargs = self.parse_kwargs(
@@ -244,8 +231,8 @@ class _distributor_factory(object):
if parsed_kwargs.get('global_shape') == ():
distribution_strategy = 'not'
about_infos_cprint("WARNING: Distribution strategy was set to "
"'not' because of global_shape == ()")
self.logger.warn("Distribution strategy was set to "
"'not' because of global_shape == ()")
hashed_kwargs = self.hash_arguments(distribution_strategy,
**parsed_kwargs)
@@ -301,7 +288,7 @@ def _infer_key_type(key):
found = 'd2o'
found_boolean = (key.dtype == np.bool_)
else:
raise ValueError(about_cstring("ERROR: Unknown keytype!"))
raise ValueError("Unknown keytype!")
return (found, found_boolean)
@@ -538,9 +525,8 @@ class _slicing_distributor(distributor):
self._my_dtype_converter = dtype_converter
if not self._my_dtype_converter.known_np_Q(self.dtype):
raise TypeError(about_cstring(
"ERROR: The datatype " + str(self.dtype.__repr__()) +
" is not known to mpi4py."))
raise TypeError("The datatype " + str(self.dtype.__repr__()) +
" is not known to mpi4py.")
self.mpi_dtype = self._my_dtype_converter.to_mpi(self.dtype)
@@ -600,8 +586,7 @@ class _slicing_distributor(distributor):
local_data = np.array(local_data).astype(
self.dtype, copy=copy).reshape(self.local_shape)
else:
raise TypeError(about_cstring(
"ERROR: Unknown istribution strategy"))
raise TypeError("Unknown istribution strategy")
return (local_data, hermitian)
def globalize_flat_index(self, index):
@@ -610,8 +595,8 @@ class _slicing_distributor(distributor):
def globalize_index(self, index):
index = np.array(index, dtype=np.int).flatten()
if index.shape != (len(self.global_shape),):
raise TypeError(about_cstring("ERROR: Length\
of index tuple does not match the array's shape!"))
raise TypeError("Length of index tuple does not match the array's "
"shape!")
globalized_index = index
globalized_index[0] = index[0] + self.local_start
# ensure that the globalized index list is within the bounds
@@ -620,7 +605,7 @@ class _slicing_distributor(distributor):
-np.array(self.global_shape),
np.array(self.global_shape) - 1)
if np.any(global_index_memory != globalized_index):
about_infos_cprint("WARNING: Indices were clipped!")
self.logger.warn("Indices were clipped!")
globalized_index = tuple(globalized_index)
return globalized_index
@@ -769,70 +754,6 @@ class _slicing_distributor(distributor):
return result
# def distribute_data(self, data=None, alias=None,
# path=None, copy=True, **kwargs):
# '''
# distribute data checks
# - whether the data is located on all nodes or only on node 0
# - that the shape of 'data' matches the global_shape
# '''
#
## comm = self.comm
#
# if 'h5py' in gdi and alias is not None:
# data = self.load_data(alias=alias, path=path)
#
# if data is None:
# return np.empty(self.global_shape, dtype=self.dtype)
# elif np.isscalar(data):
# return np.ones(self.global_shape, dtype=self.dtype)*data
# copy = False
# elif isinstance(data, np.ndarray) or \
# isinstance(data, distributed_data_object):
# data = self.extract_local_data(data)
#
# if data.shape is not self.local_shape:
# copy = True
#
# if copy:
# result_data = np.empty(self.local_shape, dtype=self.dtype)
# result_data[:] = data
# else:
# result_data = data
#
# return result_data
#
# else:
# new_data = np.array(data)
# return new_data.astype(self.dtype,
# copy=copy).reshape(self.global_shape)
#
#
## local_data_available_Q = (data is not None)
## data_available_Q = np.array(comm.allgather(local_data_available_Q))
##
## if np.all(data_available_Q == False):
## return np.empty(self.local_shape, dtype=self.dtype, order='C')
## # if all nodes got data, we assume that it is the right data and
## # store it individually.
## elif np.all(data_available_Q == True):
## if isinstance(data, distributed_data_object):
## temp_d2o = data.get_data((slice(self.local_start,
## self.local_end),),
## local_keys=True,
## copy=copy)
## return temp_d2o.get_local_data(copy=False).astype(self.dtype,
## copy=False)
## elif np.isscalar(data):
## return np.ones(self.local_shape, dtype=self.dtype)*data
## else:
## return data[self.local_start:self.local_end].astype(
## self.dtype,
## copy=copy)
## else:
## raise ValueError(
## "ERROR: distribute_data must get data on all nodes!")
def _disperse_data_primitive(self, data, to_key, data_update, from_key,
copy, to_found, to_found_boolean, from_found,
from_found_boolean, **kwargs):
@@ -851,9 +772,9 @@ class _slicing_distributor(distributor):
**kwargs)
else:
if from_key is not None:
about_infos_cprint(
"INFO: Advanced injection is not available for this " +
"combination of to_key and from_key.")
self.logger.info("Advanced injection is not available for "
"this combination of to_key and "
"from_key.")
prepared_data_update = data_update[from_key]
else:
prepared_data_update = data_update
@@ -870,9 +791,9 @@ class _slicing_distributor(distributor):
# Case 2.1: The array is boolean.
if to_found_boolean:
if from_key is not None:
about_infos_cprint(
"INFO: Advanced injection is not available for this " +
"combination of to_key and from_key.")
self.logger.info("Advanced injection is not available for "
"this combination of to_key and "
"from_key.")
prepared_data_update = data_update[from_key]
else:
prepared_data_update = data_update
@@ -886,9 +807,8 @@ class _slicing_distributor(distributor):
# advanced slicing is supported.
else:
if len(to_key.shape) != 1:
raise ValueError(about_cstring(
"WARNING: Only one-dimensional advanced indexing " +
"is supported"))
raise ValueError("Only one-dimensional advanced indexing "
"is supported")
# Make a recursive call in order to trigger the 'list'-section
return self.disperse_data(data=data, to_key=[to_key],
data_update=data_update,
@@ -899,9 +819,8 @@ class _slicing_distributor(distributor):
# one-dimensional advanced indexing list.
elif to_found == 'indexinglist':
if from_key is not None:
about_infos_cprint(
"INFO: Advanced injection is not available for this " +
"combination of to_key and from_key.")
self.logger.info("Advanced injection is not available for "
"this combination of to_key and from_key.")
prepared_data_update = data_update[from_key]
else:
prepared_data_update = data_update
@@ -984,8 +903,7 @@ class _slicing_distributor(distributor):
if to_step is None:
to_step = 1
elif to_step == 0:
raise ValueError(about_cstring(
"ERROR: to_step size == 0!"))
raise ValueError("to_step size == 0!")
# Compute the offset of the data the individual node will take.
# The offset is free of stepsizes. It is the offset in terms of
@@ -1022,17 +940,15 @@ class _slicing_distributor(distributor):
shifted_stop=data_update.shape[0],
global_length=data_update.shape[0])
if from_slices_start is None:
raise ValueError(about_cstring(
"ERROR: _backshift_and_decycle should never return " +
"None for local_start!"))
raise ValueError("_backshift_and_decycle should never return "
"None for local_start!")
# parse the step sizes
from_step = from_slices[0].step
if from_step is None:
from_step = 1
elif from_step == 0:
raise ValueError(about_cstring(
"ERROR: from_step size == 0!"))
raise ValueError("from_step size must not be equal to 0!")
localized_from_start = from_slices_start + from_step * o[r]
localized_from_stop = localized_from_start + from_step * l
@@ -1141,9 +1057,8 @@ class _slicing_distributor(distributor):
# advanced slicing is supported.
else:
if len(key.shape) != 1:
raise ValueError(about_cstring(
"WARNING: Only one-dimensional advanced indexing " +
"is supported"))
raise ValueError("Only one-dimensional advanced indexing "
"is supported")
# Make a recursive call in order to trigger the 'list'-section
return self.collect_data(data=data, key=[key], copy=copy,
**kwargs)
@@ -1158,8 +1073,7 @@ class _slicing_distributor(distributor):
def collect_data_from_list(self, data, list_key, copy=True, **kwargs):
if list_key == []:
raise ValueError(about_cstring(
"ERROR: key == [] is an unsupported key!"))
raise ValueError("key == [] is an unsupported key!")
local_list_key = self._advanced_index_decycler(list_key)
local_result = data[local_list_key]
global_result = distributed_data_object(
@@ -1184,8 +1098,7 @@ class _slicing_distributor(distributor):
# if the index is still negative, or it is greater than
# global_length, the index is ill-chosen
if zeroth_key < 0 or zeroth_key >= global_length:
raise ValueError(about_cstring(
"ERROR: Index out of bounds!"))
raise ValueError("Index out of bounds!")
# shift the index
local_zeroth_key = zeroth_key - shift
# if the index lies within the local nodes' data-range
@@ -1209,8 +1122,7 @@ class _slicing_distributor(distributor):
# if there are still negative indices, or indices greater than
# global_length, the indices are ill-chosen
if (zeroth_key < 0).any() or (zeroth_key >= global_length).any():
raise ValueError(about_cstring(
"ERROR: Index out of bounds!"))
raise ValueError("Index out of bounds!")
# shift the indices according to shift
shift_list = self.comm.allgather(shift)
local_zeroth_key_list = map(lambda z: zeroth_key - z, shift_list)
@@ -1264,8 +1176,7 @@ class _slicing_distributor(distributor):
# if there are still negative indices, or indices greater than
# global_length, the indices are ill-chosen
if (zeroth_key < 0).any() or (zeroth_key >= global_length).any():
raise ValueError(about_cstring(
"ERROR: Index out of bounds!"))
raise ValueError("Index out of bounds!")
# shift the indices according to shift
local_zeroth_key = zeroth_key - shift
# discard all entries where the indices are negative or larger
@@ -1483,9 +1394,8 @@ class _slicing_distributor(distributor):
if np.all(global_matchQ):
extracted_data = data_object[:]
else:
raise ValueError(about_cstring(
"ERROR: supplied shapes do neither match globally " +
"nor locally"))
raise ValueError("supplied shapes do neither match globally "
"nor locally")
# if shape-casting was successful, extract the data
else:
@@ -1576,8 +1486,7 @@ class _slicing_distributor(distributor):
# if this does not work, try to broadcast the shape
# check if the dimensions match
if len(self.global_shape) != len(foreign.shape):
raise ValueError(
about_cstring("ERROR: unequal number of dimensions!"))
raise ValueError("unequal number of dimensions!")
# check direct matches
direct_match = (np.array(self.global_shape) == np.array(foreign.shape))
# check broadcast compatibility
@@ -1586,8 +1495,7 @@ class _slicing_distributor(distributor):
# combine the matches and assert that all are true
combined_match = (direct_match | broadcast_match)
if not np.all(combined_match):
raise ValueError(
about_cstring("ERROR: incompatible shapes!"))
raise ValueError("incompatible shapes!")
matching_dimensions = tuple(direct_match)
return (foreign, matching_dimensions)
@@ -1978,9 +1886,8 @@ class _slicing_distributor(distributor):
f[alias]
# if yes, and overwriteQ is set to False, raise an Error
if overwriteQ is False:
raise ValueError(about_cstring(
"ERROR: overwriteQ is False, but alias already " +
"in use!"))
raise ValueError("overwriteQ is False, but alias already "
"in use!")
else: # if yes, remove the existing dataset
del f[alias]
except(KeyError):
@@ -2007,14 +1914,12 @@ class _slicing_distributor(distributor):
dset = f[alias]
# check shape
if dset.shape != self.global_shape:
raise TypeError(about_cstring(
"ERROR: The shape of the given dataset does not match " +
"the distributed_data_object."))
raise TypeError("The shape of the given dataset does not match "
"the distributed_data_object.")
# check dtype
if dset.dtype != self.dtype:
raise TypeError(about_cstring(
"ERROR: The datatype of the given dataset does not " +
"match the one of the distributed_data_object."))
raise TypeError("The datatype of the given dataset does not "
"match the one of the distributed_data_object.")
# if everything seems to fit, load the data
data = dset[self.local_start:self.local_end]
# close the file
@@ -2030,9 +1935,9 @@ class _slicing_distributor(distributor):
def get_axes_local_distribution_strategy(self, axes):
if 0 in axes:
if self.distribution_strategy in STRATEGIES['local']:
raise ValueError(about_cstring(
"ERROR: axes_local_distribution_strategy is not uniquely "
"defined for local-type distribution strategies."))
raise ValueError("axes_local_distribution_strategy is not "
"uniquely defined for local-type "
"distribution strategies.")
else:
return self.distribution_strategy
else:
@@ -2083,9 +1988,8 @@ def _freeform_slicer(comm, local_shape):
cleared_set.discard(())
if len(cleared_set) > 1:
raise ValueError(about_cstring("ERROR: All but the first " +
"dimensions of local_shape " +
"must be the same!"))
raise ValueError("All but the first dimensions of local_shape "
"must be the same!")
if local_shape == ():
first_shape_index = 0
else:
@@ -2172,43 +2076,6 @@ class _not_distributor(distributor):
return result_object
# def distribute_data(self, data, alias=None, path=None, copy=True,
# **kwargs):
# if 'h5py' in gdi and alias is not None:
# data = self.load_data(alias=alias, path=path)
#
# if data is None:
# return np.empty(self.global_shape, dtype=self.dtype)
# elif np.isscalar(data):
# return np.ones(self.global_shape, dtype=self.dtype)*data
# copy = False
# elif isinstance(data, np.ndarray) or \
# isinstance(data, distributed_data_object):
# data = self.extract_local_data(data)
# result_data = np.empty(self.local_shape, dtype=self.dtype)
# result_data[:] = data
# return result_data
#
# else:
# new_data = np.array(data)
# return new_data.astype(self.dtype,
# copy=copy).reshape(self.global_shape)
#
#
## if data is None:
## return np.empty(self.global_shape, dtype=self.dtype)
## elif isinstance(data, distributed_data_object):
## new_data = data.get_full_data()
## elif isinstance(data, np.ndarray):
## new_data = data
## elif np.isscalar(data):
## new_data = np.ones(self.global_shape, dtype=self.dtype)*data
## copy = False
## else:
## new_data = np.array(data)
## return new_data.astype(self.dtype,
## copy=copy).reshape(self.global_shape)
def _disperse_data_primitive(self, data, to_key, data_update, from_key,
copy, to_found, to_found_boolean, from_found,
from_found_boolean, **kwargs):
@@ -2332,9 +2199,8 @@ class _not_distributor(distributor):
f[alias]
# if yes, and overwriteQ is set to False, raise an Error
if overwriteQ is False:
raise ValueError(about_cstring(
"ERROR: overwriteQ == False, but alias already " +
"in use!"))
raise ValueError("overwriteQ == False, but alias already "
"in use!")
else: # if yes, remove the existing dataset
del f[alias]
except(KeyError):
@@ -2362,14 +2228,12 @@ class _not_distributor(distributor):
dset = f[alias]
# check shape
if dset.shape != self.global_shape:
raise TypeError(about_cstring(
"ERROR: The shape of the given dataset does not match " +
"the distributed_data_object."))
raise TypeError("The shape of the given dataset does not match "
"the distributed_data_object.")
# check dtype
if dset.dtype != self.dtype:
raise TypeError(about_cstring(
"ERROR: The datatype of the given dataset does not " +
"match the distributed_data_object."))
raise TypeError("The datatype of the given dataset does not "
"match the distributed_data_object.")
# if everything seems to fit, load the data
data = dset[:]
# close the file
@@ -20,4 +20,4 @@
# 1) we don't load dependencies by storing it in __init__.py
# 2) we can import it in setup.py for the same reason
# 3) we can import it into your module
__version__ = '1.0.8'
__version__ = '1.1.0'