Commit f1057ce0 authored by theos's avatar theos
Browse files

Moved keepers and d2o into separate modules.

parent d06ba788
Pipeline #3785 skipped
......@@ -26,18 +26,20 @@ mpl.use('Agg')
import dummys
from keepers import about,\
global_dependency_injector,\
global_configuration
# it is important to import config before d2o such that NIFTy is able to
# pre-create d2o's configuration object with the corrected path
from config import about,\
dependency_injector,\
nifty_configuration,\
d2o_configuration
from d2o import distributed_data_object, d2o_librarian
from nifty_cmaps import ncmap
from nifty_core import space,\
point_space,\
field
from d2o import distributed_data_object, d2o_librarian
from nifty_random import random
from nifty_simple_math import *
from nifty_utilities import *
......
# -*- coding: utf-8 -*-
from nifty_about import *
from nifty_config import dependency_injector,\
nifty_configuration
from d2o_config import d2o_configuration
# -*- coding: utf-8 -*-
import os
import keepers
# pre-create the D2O configuration instance and set its path explicitly
d2o_configuration = keepers.get_Configuration(
'D2O',
path=os.path.expanduser('~') + "/.nifty/d2o_config")
......@@ -2,92 +2,89 @@
import os
from nifty_dependency_injector import dependency_injector
from nifty_configuration import variable,\
configuration
import keepers
global_dependency_injector = dependency_injector(
['h5py',
('mpi4py.MPI', 'MPI'),
('nifty.dummys.MPI_dummy', 'MPI_dummy'),
# Setup the dependency injector
dependency_injector = keepers.DependencyInjector(
[('mpi4py.MPI', 'MPI'),
'h5py',
'gfft',
('nifty.dummys.gfft_dummy', 'gfft_dummy'),
'healpy',
'libsharp_wrapper_gl'])
global_dependency_injector.register('pyfftw', lambda z: hasattr(z, 'FFTW_MPI'))
dependency_injector.register('pyfftw', lambda z: hasattr(z, 'FFTW_MPI'))
variable_fft_module = variable('fft_module',
['pyfftw', 'gfft', 'gfft_dummy'],
lambda z: z in global_dependency_injector)
# Initialize the variables
variable_fft_module = keepers.Variable(
'fft_module',
['pyfftw', 'gfft', 'gfft_dummy'],
lambda z: z in dependency_injector)
# gl_space needs libsharp
variable_lm2gl = variable('lm2gl',
variable_lm2gl = keepers.Variable(
'lm2gl',
[True, False],
lambda z: (('libsharp_wrapper_gl' in
global_dependency_injector)
dependency_injector)
if z else True) and isinstance(z, bool),
genus='boolean')
variable_use_healpy = variable(
variable_use_healpy = keepers.Variable(
'use_healpy',
[True, False],
lambda z: (('healpy' in global_dependency_injector)
lambda z: (('healpy' in dependency_injector)
if z else True) and isinstance(z, bool),
genus='boolean')
variable_use_libsharp = variable('use_libsharp',
variable_use_libsharp = keepers.Variable(
'use_libsharp',
[True, False],
lambda z: (('libsharp_wrapper_gl' in
global_dependency_injector)
dependency_injector)
if z else True) and
isinstance(z, bool),
genus='boolean')
variable_verbosity = variable('verbosity',
[1],
lambda z: z == abs(int(z)),
genus='int')
variable_mpi_module = variable('mpi_module',
['MPI', 'MPI_dummy'],
lambda z: z in global_dependency_injector)
variable_verbosity = keepers.Variable('verbosity',
[1],
lambda z: z == abs(int(z)),
genus='int')
variable_default_distribution_strategy = variable(
'default_distribution_strategy',
['fftw', 'equal', 'not'],
lambda z: (('pyfftw' in global_dependency_injector)
if (z == 'fftw') else True)
)
variable_d2o_init_checks = variable('d2o_init_checks',
[True, False],
lambda z: isinstance(z, bool),
genus='boolean')
global_configuration = configuration(
nifty_configuration = keepers.get_Configuration(
'NIFTy',
[variable_fft_module,
variable_lm2gl,
variable_use_healpy,
variable_use_libsharp,
variable_verbosity,
variable_mpi_module,
variable_default_distribution_strategy,
variable_d2o_init_checks
],
path=os.path.expanduser('~') + "/.nifty/global_config")
path=os.path.expanduser('~') + "/.nifty/nifty_config")
########
### Compatibility variables
########
variable_mpi_module = keepers.Variable('mpi_module',
['MPI'],
lambda z: z in dependency_injector)
nifty_configuration.register(variable_mpi_module)
variable_default_comm = variable(
# register the default comm variable as the 'mpi_module' variable is now
# available
variable_default_comm = keepers.Variable(
'default_comm',
['COMM_WORLD'],
lambda z: hasattr(global_dependency_injector[
global_configuration['mpi_module']], z))
lambda z: hasattr(dependency_injector[
nifty_configuration['mpi_module']], z))
nifty_configuration.register(variable_default_comm)
global_configuration.register(variable_default_comm)
########
########
try:
global_configuration.load()
nifty_configuration.load()
except:
pass
# -*- coding: utf-8 -*-
from __future__ import division
from distributed_data_object import distributed_data_object
from d2o_librarian import d2o_librarian
from strategies import STRATEGIES
\ No newline at end of file
# -*- coding: utf-8 -*-
import numpy as np
from nifty import about
def cast_axis_to_tuple(axis):
if axis is None:
return None
try:
axis = tuple([int(item) for item in axis])
except(TypeError):
if np.isscalar(axis):
axis = (int(axis), )
else:
raise TypeError(about._errors.cstring(
"ERROR: Could not convert axis-input to tuple of ints"))
return axis
# -*- coding: utf-8 -*-
import numpy as np
class d2o_iter(object):
def __init__(self, d2o):
self.d2o = d2o
self.i = 0
self.n = np.prod(self.d2o.shape)
self.initialize_current_local_data()
def __iter__(self):
return self
def next(self):
if self.n == 0:
raise StopIteration()
self.update_current_local_data()
if self.i < self.n:
i = self.i
self.i += 1
return self.current_local_data[i]
else:
raise StopIteration()
def initialize_current_local_data(self):
raise NotImplementedError
def update_current_local_data(self):
raise NotImplementedError
class d2o_not_iter(d2o_iter):
def initialize_current_local_data(self):
self.current_local_data = self.d2o.data.flatten()
def update_current_local_data(self):
pass
class d2o_slicing_iter(d2o_iter):
def __init__(self, d2o):
self.d2o = d2o
self.i = 0
self.n = np.prod(self.d2o.shape)
self.local_dim_offset_list = \
self.d2o.distributor.all_local_slices[:, 4]
self.active_node = None
self.initialize_current_local_data()
def initialize_current_local_data(self):
self.update_current_local_data()
def update_current_local_data(self):
new_active_node = np.searchsorted(self.local_dim_offset_list,
self.i,
side='right')-1
# new_active_node = min(new_active_node, self.d2o.comm.size-1)
if self.active_node != new_active_node:
self.active_node = new_active_node
self.current_local_data = self.d2o.comm.bcast(
self.d2o.get_local_data().flatten(),
root=self.active_node)
# -*- coding: utf-8 -*-
from weakref import WeakValueDictionary as weakdict
class _d2o_librarian(object):
def __init__(self):
self.library = weakdict()
self.counter = 0
def register(self, d2o):
self.counter += 1
self.library[self.counter] = d2o
return self.counter
def __getitem__(self, key):
return self.library[key]
d2o_librarian = _d2o_librarian()
This diff is collapsed.
This diff is collapsed.
# -*- coding: utf-8 -*-
import numpy as np
from nifty.keepers import global_configuration as gc,\
global_dependency_injector as gdi
MPI = gdi[gc['mpi_module']]
class _dtype_converter(object):
"""
NIFTY class for dtype conversion between python/numpy dtypes and MPI
dtypes.
"""
def __init__(self):
pre_dict = [
# [, MPI_CHAR],
# [, MPI_SIGNED_CHAR],
# [, MPI_UNSIGNED_CHAR],
[np.dtype('bool'), MPI.BYTE],
[np.dtype('int16'), MPI.SHORT],
[np.dtype('uint16'), MPI.UNSIGNED_SHORT],
[np.dtype('uint32'), MPI.UNSIGNED_INT],
[np.dtype('int32'), MPI.INT],
[np.dtype('int'), MPI.LONG],
[np.dtype(np.long), MPI.LONG],
[np.dtype('int64'), MPI.LONG_LONG],
[np.dtype('longlong'), MPI.LONG],
[np.dtype('uint'), MPI.UNSIGNED_LONG],
[np.dtype('uint64'), MPI.UNSIGNED_LONG_LONG],
[np.dtype('ulonglong'), MPI.UNSIGNED_LONG_LONG],
[np.dtype('float32'), MPI.FLOAT],
[np.dtype('float64'), MPI.DOUBLE],
[np.dtype('float128'), MPI.LONG_DOUBLE],
[np.dtype('complex64'), MPI.COMPLEX],
[np.dtype('complex128'), MPI.DOUBLE_COMPLEX]]
to_mpi_pre_dict = np.array(pre_dict)
to_mpi_pre_dict[:, 0] = map(self.dictionize_np, to_mpi_pre_dict[:, 0])
self._to_mpi_dict = dict(to_mpi_pre_dict)
to_np_pre_dict = np.array(pre_dict)[:, ::-1]
to_np_pre_dict[:, 0] = map(self.dictionize_mpi, to_np_pre_dict[:, 0])
self._to_np_dict = dict(to_np_pre_dict)
def dictionize_np(self, x):
dic = x.type.__dict__.items()
if x.type is np.float:
dic[24] = 0
dic[29] = 0
dic[37] = 0
return frozenset(dic)
def dictionize_mpi(self, x):
return x.name
def to_mpi(self, dtype):
return self._to_mpi_dict[self.dictionize_np(dtype)]
def to_np(self, dtype):
return self._to_np_dict[self.dictionize_mpi(dtype)]
def known_mpi_Q(self, dtype):
return (self.dictionize_mpi(dtype) in self._to_np_dict)
def known_np_Q(self, dtype):
return (self.dictionize_np(np.dtype(dtype)) in self._to_mpi_dict)
dtype_converter = _dtype_converter()
# -*- coding: utf-8 -*-
from nifty.keepers import global_dependency_injector as gdi
pyfftw = gdi.get('pyfftw')
_maybe_fftw = ['fftw'] if ('pyfftw' in gdi) else []
STRATEGIES = {
'all': ['not', 'equal', 'freeform'] + _maybe_fftw,
'global': ['not', 'equal'] + _maybe_fftw,
'local': ['freeform'],
'slicing': ['equal', 'freeform'] + _maybe_fftw,
'not': ['not'],
'hdf5': ['equal'] + _maybe_fftw,
}
# -*- coding: utf-8 -*-
import numpy as np
from nifty.keepers import global_configuration as gc,\
global_dependency_injector as gdi
MPI = gdi[gc['mpi_module']]
custom_MIN = MPI.Op.Create(lambda x, y, datatype:
np.amin(np.vstack((x, y)), axis=0)
if isinstance(x, np.ndarray) else
min(x, y))
custom_MAX = MPI.Op.Create(lambda x, y, datatype:
np.amax(np.vstack((x, y)), axis=0)
if isinstance(x, np.ndarray) else
max(x, y))
custom_NANMIN = MPI.Op.Create(lambda x, y, datatype:
np.nanmin(np.vstack((x, y)), axis=0))
custom_NANMAX = MPI.Op.Create(lambda x, y, datatype:
np.nanmax(np.vstack((x, y)), axis=0))
custom_UNIQUE = MPI.Op.Create(lambda x, y, datatype:
np.unique(np.concatenate([x, y])))
op_translate_dict = {}
# the value tuple contains the operator and a boolean which specifies
# if the operator is compatible to buffers (for Allreduce instead of allreduce)
op_translate_dict[np.sum] = (MPI.SUM, True)
op_translate_dict[np.prod] = (MPI.PROD, True)
op_translate_dict[np.amin] = (custom_MIN, False)
op_translate_dict[np.amax] = (custom_MAX, False)
op_translate_dict[np.all] = (MPI.BAND, True)
op_translate_dict[np.any] = (MPI.BOR, True)
op_translate_dict[np.nanmin] = (custom_NANMIN, False)
op_translate_dict[np.nanmax] = (custom_NANMAX, False)
op_translate_dict[np.unique] = (custom_UNIQUE, False)
# -*- coding: utf-8 -*-
import copy
import numpy as np
class Op(object):
@classmethod
def Create(cls, function, commute=False):
pass
MIN = Op()
MAX = Op()
SUM = Op()
PROD = Op()
LAND = Op()
LOR = Op()
BAND = Op()
BOR = Op()
class Comm(object):
pass
class Intracomm(Comm):
def __init__(self, name):
if not running_single_threadedQ():
raise RuntimeError("ERROR: MPI_dummy module is running in a " +
"mpirun with n>1.")
self.name = name
self.rank = 0
self.size = 1
def Get_rank(self):
return self.rank
def Get_size(self):
return self.size
def _scattergather_helper(self, sendbuf, recvbuf=None, **kwargs):
sendbuf = self._unwrapper(sendbuf)
recvbuf = self._unwrapper(recvbuf)
if recvbuf is not None:
recvbuf[:] = sendbuf
return recvbuf
else:
recvbuf = np.copy(sendbuf)
return recvbuf
def bcast(self, sendbuf, *args, **kwargs):
return sendbuf
def Bcast(self, sendbuf, *args, **kwargs):
return sendbuf
def scatter(self, sendbuf, *args, **kwargs):
return sendbuf[0]
def Scatter(self, *args, **kwargs):
return self._scattergather_helper(*args, **kwargs)
def Scatterv(self, *args, **kwargs):
return self._scattergather_helper(*args, **kwargs)
def gather(self, sendbuf, *args, **kwargs):
return [sendbuf]
def Gather(self, *args, **kwargs):
return self._scattergather_helper(*args, **kwargs)
def Gatherv(self, *args, **kwargs):
return self._scattergather_helper(*args, **kwargs)
def allgather(self, sendbuf, *args, **kwargs):
return [sendbuf]
def Allgather(self, *args, **kwargs):
return self._scattergather_helper(*args, **kwargs)
def Allgatherv(self, *args, **kwargs):
return self._scattergather_helper(*args, **kwargs)
def Allreduce(self, sendbuf, recvbuf, op, **kwargs):
sendbuf = self._unwrapper(sendbuf)
recvbuf = self._unwrapper(recvbuf)
recvbuf[:] = sendbuf
return recvbuf
def allreduce(self, sendobj, op=SUM, **kwargs):
if np.isscalar(sendobj):
return sendobj
return copy.copy(sendobj)
def sendrecv(self, sendobj, **kwargs):
return sendobj
def _unwrapper(self, x):
if isinstance(x, list):
return x[0]
else:
return x
def Barrier(self):
pass
class _datatype():
def __init__(self, name):
self.name = str(name)
def running_single_threadedQ():
try:
from mpi4py import MPI
except ImportError:
return True
else:
if MPI.COMM_WORLD.size != 1:
return False
else:
return True
BYTE = _datatype('MPI_BYTE')
SHORT = _datatype('MPI_SHORT')
UNSIGNED_SHORT = _datatype("MPI_UNSIGNED_SHORT")
UNSIGNED_INT = _datatype("MPI_UNSIGNED_INT")
INT = _datatype("MPI_INT")
LONG = _datatype("MPI_LONG")
UNSIGNED_LONG = _datatype("MPI_UNSIGNED_LONG")
LONG_LONG = _datatype("MPI_LONG_LONG")
UNSIGNED_LONG_LONG = _datatype("MPI_UNSIGNED_LONG_LONG")
FLOAT = _datatype("MPI_FLOAT")
DOUBLE = _datatype("MPI_DOUBLE")
LONG_DOUBLE = _datatype("MPI_LONG_DOUBLE")
COMPLEX = _datatype("MPI_COMPLEX")
DOUBLE_COMPLEX = _datatype("MPI_DOUBLE_COMPLEX")
class _comm_wrapper(Intracomm):
def __init__(self, name):
self.cache = None
self.name = name
self.size = 1
self.rank = 0
@property
def comm(self):
if self.cache is None:
self.cache = Intracomm(self.name)
return self.cache
def __getattr__(self, x):
return self.comm.__getattribute__(x)
COMM_WORLD = _comm_wrapper('MPI_dummy_COMM_WORLD')
#COMM_WORLD.__class__ = COMM_WORLD.comm.__class__
# -*- coding: utf-8 -*-
import gfft_dummy
import MPI_dummy
\ No newline at end of file
# -*- coding: utf-8 -*-
from nifty_about import *
from nifty_default_config import global_dependency_injector,\