Commit 2e1a1b7a authored by theos's avatar theos

Initial commit.

parents
Pipeline #3783 skipped
# custom
setup.cfg
.idea
# from https://github.com/github/gitignore/blob/master/Python.gitignore
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
.hypothesis/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# IPython Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# dotenv
.env
# virtualenv
venv/
ENV/
# Spyder project settings
.spyderproject
# Rope project settings
.ropeproject
\ No newline at end of file
D2O is a Python module for cluster-distributed multi-dimensional numerical
arrays. It can be regarded as a layer of abstraction between abstract algorithm
code and actual data distribution logic. The main goal is to achieve usability
without losing numerical performance and scalability. Therefore D2O's global
interface is similar to the one of a numpy.ndarray, whereas the cluster node's
local data is directly accessible, allowing usage in specialized numerical
high-performance modules. D2O is written in Python which makes it portable and
easy to use and modify, whereas expensive operations are carried out by
dedicated external libraries like numpy and mpi4py. Performance-wise in most of
the serial cases, D2O is comparable to numpy's ndarray but is superior when
moving to an MPI cluster.
# -*- coding: utf-8 -*-
from __future__ import division
from distributed_data_object import distributed_data_object
from d2o_librarian import d2o_librarian
from strategies import STRATEGIES
\ No newline at end of file
# -*- coding: utf-8 -*-
import numpy as np
from nifty import about
def cast_axis_to_tuple(axis):
    """Normalize an axis specifier to a tuple of ints (or None).

    Parameters
    ----------
    axis : None, scalar, or iterable of scalars
        Axis/axes specification as accepted by numpy-style reductions.

    Returns
    -------
    tuple of int or None
        None is passed through unchanged; a scalar becomes a 1-tuple;
        an iterable is converted element-wise via int().

    Raises
    ------
    TypeError
        If `axis` is neither None, a scalar, nor an iterable of scalars.
    """
    if axis is None:
        return None
    try:
        # Iterable case: convert every entry to a plain int.
        axis = tuple(int(item) for item in axis)
    except TypeError:
        # Not iterable: a bare scalar is wrapped into a 1-tuple.
        if np.isscalar(axis):
            axis = (int(axis),)
        else:
            raise TypeError(about._errors.cstring(
                "ERROR: Could not convert axis-input to tuple of ints"))
    return axis
# -*- coding: utf-8 -*-
from d2o_config import dependency_injector,\
configuration
# -*- coding: utf-8 -*-
import keepers
# Set up the dependency injector: it resolves optional third-party modules
# at runtime -- real mpi4py under the alias 'MPI', and the bundled
# single-process stand-in under 'MPI_dummy'.
dependency_injector = keepers.DependencyInjector(
    [('mpi4py.MPI', 'MPI'),
     ('mpi_dummy', 'MPI_dummy')]
)
# pyfftw only qualifies for injection if it was built with MPI support
# (the FFTW_MPI attribute is present on the module).
dependency_injector.register('pyfftw', lambda z: hasattr(z, 'FFTW_MPI'))

# Initialize the configuration variables.
# Which MPI backend to use; only values resolvable by the injector are valid.
variable_mpi_module = keepers.Variable('mpi_module',
                                       ['MPI', 'MPI_dummy'],
                                       lambda z: z in dependency_injector)

# Default data-distribution strategy; 'fftw' is only valid when pyfftw
# could actually be injected.
variable_default_distribution_strategy = keepers.Variable(
    'default_distribution_strategy',
    ['fftw', 'equal', 'not'],
    lambda z: (('pyfftw' in dependency_injector)
               if (z == 'fftw') else True)
)

# Whether to perform sanity checks on MPI initialization.
variable_mpi_init_checks = keepers.Variable('mpi_init_checks',
                                            [True, False],
                                            lambda z: isinstance(z, bool),
                                            genus='boolean')

# Construct the configuration object holding all three variables.
configuration = keepers.get_Configuration(
    'D2O',
    [variable_mpi_module,
     variable_default_distribution_strategy,
     variable_mpi_init_checks])
\ No newline at end of file
# -*- coding: utf-8 -*-
import numpy as np
class d2o_iter(object):
def __init__(self, d2o):
self.d2o = d2o
self.i = 0
self.n = np.prod(self.d2o.shape)
self.initialize_current_local_data()
def __iter__(self):
return self
def next(self):
if self.n == 0:
raise StopIteration()
self.update_current_local_data()
if self.i < self.n:
i = self.i
self.i += 1
return self.current_local_data[i]
else:
raise StopIteration()
def initialize_current_local_data(self):
raise NotImplementedError
def update_current_local_data(self):
raise NotImplementedError
class d2o_not_iter(d2o_iter):
    # Iterator for 'not'-distributed d2os: the full data is locally
    # available, so a single flattened copy serves the whole iteration.
    def initialize_current_local_data(self):
        self.current_local_data = self.d2o.data.flatten()

    def update_current_local_data(self):
        # The flat copy made at initialization stays valid for every step.
        pass
class d2o_slicing_iter(d2o_iter):
    """Iterator for slicing-distributed d2os.

    Each MPI node owns a contiguous slab of the flat data. The slab that
    contains the current flat index is broadcast to all nodes and cached
    until the iteration crosses into the next node's slab.
    """

    def __init__(self, d2o):
        self.d2o = d2o
        self.i = 0
        self.n = np.prod(self.d2o.shape)
        # Flat start offset of every node's local slab
        # (column 4 of all_local_slices -- TODO confirm layout).
        self.local_dim_offset_list = \
            self.d2o.distributor.all_local_slices[:, 4]
        self.active_node = None
        self.initialize_current_local_data()

    def initialize_current_local_data(self):
        self.update_current_local_data()

    def update_current_local_data(self):
        # Find the node whose slab contains flat index self.i: the last
        # offset that is <= self.i.
        owner = np.searchsorted(self.local_dim_offset_list,
                                self.i,
                                side='right') - 1
        if owner != self.active_node:
            # Crossed a slab boundary: broadcast the new owner's data.
            self.active_node = owner
            self.current_local_data = self.d2o.comm.bcast(
                self.d2o.get_local_data().flatten(),
                root=owner)
# -*- coding: utf-8 -*-
from weakref import WeakValueDictionary as weakdict
class _d2o_librarian(object):
def __init__(self):
self.library = weakdict()
self.counter = 0
def register(self, d2o):
self.counter += 1
self.library[self.counter] = d2o
return self.counter
def __getitem__(self, key):
return self.library[key]
d2o_librarian = _d2o_librarian()
This diff is collapsed.
This diff is collapsed.
# -*- coding: utf-8 -*-
import numpy as np
from nifty.keepers import global_configuration as gc,\
global_dependency_injector as gdi
MPI = gdi[gc['mpi_module']]
class _dtype_converter(object):
    """
    NIFTY class for dtype conversion between python/numpy dtypes and MPI
    dtypes.
    """
    def __init__(self):
        # Table of (numpy dtype, MPI datatype) pairs; the commented-out
        # char entries were never mapped.
        pre_dict = [
            # [, MPI_CHAR],
            # [, MPI_SIGNED_CHAR],
            # [, MPI_UNSIGNED_CHAR],
            [np.dtype('bool'), MPI.BYTE],
            [np.dtype('int16'), MPI.SHORT],
            [np.dtype('uint16'), MPI.UNSIGNED_SHORT],
            [np.dtype('uint32'), MPI.UNSIGNED_INT],
            [np.dtype('int32'), MPI.INT],
            [np.dtype('int'), MPI.LONG],
            [np.dtype(np.long), MPI.LONG],
            [np.dtype('int64'), MPI.LONG_LONG],
            # NOTE(review): 'longlong' maps to MPI.LONG while 'int64' maps
            # to MPI.LONG_LONG -- looks inconsistent; confirm intended.
            [np.dtype('longlong'), MPI.LONG],
            [np.dtype('uint'), MPI.UNSIGNED_LONG],
            [np.dtype('uint64'), MPI.UNSIGNED_LONG_LONG],
            [np.dtype('ulonglong'), MPI.UNSIGNED_LONG_LONG],
            [np.dtype('float32'), MPI.FLOAT],
            [np.dtype('float64'), MPI.DOUBLE],
            [np.dtype('float128'), MPI.LONG_DOUBLE],
            [np.dtype('complex64'), MPI.COMPLEX],
            [np.dtype('complex128'), MPI.DOUBLE_COMPLEX]]

        # Forward map: hashable numpy-dtype key -> MPI datatype.
        # NOTE: relies on Python-2 semantics (`map` returns a list).
        to_mpi_pre_dict = np.array(pre_dict)
        to_mpi_pre_dict[:, 0] = map(self.dictionize_np, to_mpi_pre_dict[:, 0])
        self._to_mpi_dict = dict(to_mpi_pre_dict)

        # Reverse map: MPI datatype name -> numpy dtype.
        to_np_pre_dict = np.array(pre_dict)[:, ::-1]
        to_np_pre_dict[:, 0] = map(self.dictionize_mpi, to_np_pre_dict[:, 0])
        self._to_np_dict = dict(to_np_pre_dict)

    def dictionize_np(self, x):
        # Use the attribute items of the scalar type as a hashable key.
        # NOTE: Python-2 only -- items() must return a list for the index
        # assignments below to work.
        dic = x.type.__dict__.items()
        if x.type is np.float:
            # Zero out entries that differ between otherwise-equivalent
            # float types so they hash identically. The magic indices
            # 24/29/37 are positions inside the __dict__ item list --
            # presumably CPython/numpy-version specific; TODO confirm.
            dic[24] = 0
            dic[29] = 0
            dic[37] = 0
        return frozenset(dic)

    def dictionize_mpi(self, x):
        # MPI datatypes are keyed simply by their name string.
        return x.name

    def to_mpi(self, dtype):
        # Translate a numpy dtype into the matching MPI datatype.
        return self._to_mpi_dict[self.dictionize_np(dtype)]

    def to_np(self, dtype):
        # Translate an MPI datatype into the matching numpy dtype.
        return self._to_np_dict[self.dictionize_mpi(dtype)]

    def known_mpi_Q(self, dtype):
        # True if the given MPI datatype has a numpy counterpart.
        return (self.dictionize_mpi(dtype) in self._to_np_dict)

    def known_np_Q(self, dtype):
        # True if the given numpy dtype has an MPI counterpart.
        return (self.dictionize_np(np.dtype(dtype)) in self._to_mpi_dict)


# Module-level singleton converter.
dtype_converter = _dtype_converter()
# -*- coding: utf-8 -*-
import mpi_dummy
\ No newline at end of file
# -*- coding: utf-8 -*-
import copy
import numpy as np
class Op(object):
    """Stand-in for mpi4py's MPI.Op in single-process (dummy) mode."""

    @classmethod
    def Create(cls, function, commute=False):
        """Mirror MPI.Op.Create's signature; the dummy needs no real op,
        so None is returned (matching the original's bare `pass`)."""
        return None


# Placeholder operator singletons mirroring mpi4py's module-level constants.
MIN, MAX, SUM, PROD, LAND, LOR, BAND, BOR = [Op() for _ in range(8)]
class Comm(object):
    # Abstract marker base mirroring mpi4py's MPI.Comm class hierarchy.
    pass
class Intracomm(Comm):
    """Single-process drop-in for mpi4py's MPI.Intracomm.

    Every collective operation degenerates to a local identity or copy,
    so code written against mpi4py keeps working without a real MPI run.
    """

    def __init__(self, name):
        # Refuse to act as the dummy inside a genuine multi-rank MPI job.
        if not running_single_threadedQ():
            raise RuntimeError("ERROR: MPI_dummy module is running in a "
                               "mpirun with n>1.")
        self.name = name
        self.rank = 0
        self.size = 1

    def Get_rank(self):
        return self.rank

    def Get_size(self):
        return self.size

    def _scattergather_helper(self, sendbuf, recvbuf=None, **kwargs):
        # Shared implementation of the buffer-based collectives: copy the
        # send buffer into the receive buffer (or into a fresh array).
        source = self._unwrapper(sendbuf)
        target = self._unwrapper(recvbuf)
        if target is None:
            return np.copy(source)
        target[:] = source
        return target

    def bcast(self, sendbuf, *args, **kwargs):
        return sendbuf

    def Bcast(self, sendbuf, *args, **kwargs):
        return sendbuf

    def scatter(self, sendbuf, *args, **kwargs):
        # Rank 0 receives the first chunk -- and is the only rank here.
        return sendbuf[0]

    def Scatter(self, *args, **kwargs):
        return self._scattergather_helper(*args, **kwargs)

    def Scatterv(self, *args, **kwargs):
        return self._scattergather_helper(*args, **kwargs)

    def gather(self, sendbuf, *args, **kwargs):
        # A one-rank gather is just a singleton list.
        return [sendbuf]

    def Gather(self, *args, **kwargs):
        return self._scattergather_helper(*args, **kwargs)

    def Gatherv(self, *args, **kwargs):
        return self._scattergather_helper(*args, **kwargs)

    def allgather(self, sendbuf, *args, **kwargs):
        return [sendbuf]

    def Allgather(self, *args, **kwargs):
        return self._scattergather_helper(*args, **kwargs)

    def Allgatherv(self, *args, **kwargs):
        return self._scattergather_helper(*args, **kwargs)

    def Allreduce(self, sendbuf, recvbuf, op, **kwargs):
        # Reduction over a single rank is the identity: copy send -> recv.
        source = self._unwrapper(sendbuf)
        target = self._unwrapper(recvbuf)
        target[:] = source
        return target

    def allreduce(self, sendobj, op=SUM, **kwargs):
        # Scalars are immutable and may be returned as-is; containers get
        # a shallow copy so the caller cannot alias the sender's object.
        if np.isscalar(sendobj):
            return sendobj
        return copy.copy(sendobj)

    def sendrecv(self, sendobj, **kwargs):
        return sendobj

    def _unwrapper(self, x):
        # mpi4py allows [buffer, datatype]-style arguments; strip the list.
        return x[0] if isinstance(x, list) else x

    def Barrier(self):
        # Nothing to synchronize with.
        pass
class _datatype():
def __init__(self, name):
self.name = str(name)
def running_single_threadedQ():
    """Return True unless a real mpi4py run with more than one rank is live.

    Without mpi4py installed there can be no multi-rank run, so the
    answer is trivially True.
    """
    try:
        from mpi4py import MPI
    except ImportError:
        return True
    return MPI.COMM_WORLD.size == 1
# Datatype singletons mirroring mpi4py's module-level MPI datatype
# constants; each carries only its canonical MPI name (see _datatype).
BYTE = _datatype('MPI_BYTE')
SHORT = _datatype('MPI_SHORT')
UNSIGNED_SHORT = _datatype("MPI_UNSIGNED_SHORT")
UNSIGNED_INT = _datatype("MPI_UNSIGNED_INT")
INT = _datatype("MPI_INT")
LONG = _datatype("MPI_LONG")
UNSIGNED_LONG = _datatype("MPI_UNSIGNED_LONG")
LONG_LONG = _datatype("MPI_LONG_LONG")
UNSIGNED_LONG_LONG = _datatype("MPI_UNSIGNED_LONG_LONG")
FLOAT = _datatype("MPI_FLOAT")
DOUBLE = _datatype("MPI_DOUBLE")
LONG_DOUBLE = _datatype("MPI_LONG_DOUBLE")
COMPLEX = _datatype("MPI_COMPLEX")
DOUBLE_COMPLEX = _datatype("MPI_DOUBLE_COMPLEX")
class _comm_wrapper(Intracomm):
    # Lazy proxy around a singleton Intracomm. Construction of the real
    # Intracomm -- and therefore the single-threadedness check in its
    # __init__ -- is deferred until the `comm` property is first used.
    def __init__(self, name):
        # Deliberately does NOT call Intracomm.__init__, so importing this
        # module never raises even inside an mpirun.
        self.cache = None
        self.name = name
        self.size = 1
        self.rank = 0

    @property
    def comm(self):
        # Build the wrapped Intracomm on first access and memoize it.
        if self.cache is None:
            self.cache = Intracomm(self.name)
        return self.cache

    def __getattr__(self, x):
        # Only reached for attributes NOT found through normal lookup;
        # methods inherited from Intracomm therefore bypass this
        # delegation and run on the wrapper itself.
        return self.comm.__getattribute__(x)


# The dummy world communicator, mirroring mpi4py's MPI.COMM_WORLD.
COMM_WORLD = _comm_wrapper('MPI_dummy_COMM_WORLD')
# COMM_WORLD.__class__ = COMM_WORLD.comm.__class__
# -*- coding: utf-8 -*-
from nifty.keepers import global_dependency_injector as gdi
# Fetch pyfftw through the dependency injector; None when unavailable.
pyfftw = gdi.get('pyfftw')

# The 'fftw' distribution strategy is only offered when pyfftw is present.
_maybe_fftw = ['fftw'] if ('pyfftw' in gdi) else []

# Groups of distribution strategies, keyed by capability class.
STRATEGIES = {
    'all': ['not', 'equal', 'freeform'] + _maybe_fftw,
    'global': ['not', 'equal'] + _maybe_fftw,
    'local': ['freeform'],
    'slicing': ['equal', 'freeform'] + _maybe_fftw,
    'not': ['not'],
    'hdf5': ['equal'] + _maybe_fftw,
}
# -*- coding: utf-8 -*-
import numpy as np
from nifty.keepers import global_configuration as gc,\
global_dependency_injector as gdi
# Resolve the configured MPI implementation (real mpi4py or the dummy).
MPI = gdi[gc['mpi_module']]

# Custom element-wise reduction operators. For arrays the two operands are
# stacked and reduced along axis 0; scalars fall back to the builtins.
# NOTE(review): with the dummy MPI module, Op.Create returns None -- these
# ops are then placeholders that must never reach a real reduction call.
custom_MIN = MPI.Op.Create(lambda x, y, datatype:
                           np.amin(np.vstack((x, y)), axis=0)
                           if isinstance(x, np.ndarray) else
                           min(x, y))

custom_MAX = MPI.Op.Create(lambda x, y, datatype:
                           np.amax(np.vstack((x, y)), axis=0)
                           if isinstance(x, np.ndarray) else
                           max(x, y))

# NaN-ignoring variants (array inputs only; no scalar fallback here).
custom_NANMIN = MPI.Op.Create(lambda x, y, datatype:
                              np.nanmin(np.vstack((x, y)), axis=0))

custom_NANMAX = MPI.Op.Create(lambda x, y, datatype:
                              np.nanmax(np.vstack((x, y)), axis=0))

custom_UNIQUE = MPI.Op.Create(lambda x, y, datatype:
                              np.unique(np.concatenate([x, y])))

# Translation table: numpy reduction function -> (MPI operator, buffer_Q).
op_translate_dict = {}
# the value tuple contains the operator and a boolean which specifies
# if the operator is compatible to buffers (for Allreduce instead of allreduce)
op_translate_dict[np.sum] = (MPI.SUM, True)
op_translate_dict[np.prod] = (MPI.PROD, True)
op_translate_dict[np.amin] = (custom_MIN, False)
op_translate_dict[np.amax] = (custom_MAX, False)
op_translate_dict[np.all] = (MPI.BAND, True)
op_translate_dict[np.any] = (MPI.BOR, True)
op_translate_dict[np.nanmin] = (custom_NANMIN, False)
op_translate_dict[np.nanmax] = (custom_NANMAX, False)
op_translate_dict[np.unique] = (custom_UNIQUE, False)
# -*- coding: utf-8 -*-
import os
from setuptools import setup
# Utility function to read the README file.
# Used for the long_description. It's nice, because now 1) we have a top level
# README file and 2) it's easier to type in the README file than to put a raw
# string in below ...
def read(fname):
    """Return the contents of `fname`, resolved relative to this file.

    Used to feed the README into `long_description`. The context manager
    closes the file handle deterministically (the original leaked it).
    """
    with open(os.path.join(os.path.dirname(__file__), fname)) as f:
        return f.read()
# Package metadata and build configuration; calling setup() at import time
# is the standard setup.py entry point. Keyword arguments use PEP 8
# spacing (no spaces around '=' in keyword arguments).
setup(
    name="D2O",
    version="1.0.0rc1",
    author="Theo Steininger",
    author_email="theos@mpa-garching.mpg.de",
    description=("A distributed data object for parallel high-performance "
                 "computing in Python"),
    license="",
    keywords="parallelization, numerics, MPI",
    url="https://gitlab.mpcdf.mpg.de/ift/D2O",
    packages=['d2o', 'd2o.config', 'd2o.mpi_dummy', 'tests'],
    zip_safe=False,
    # keepers is hosted on the MPCDF GitLab, not on PyPI.
    dependency_links=[
        "git+https://gitlab.mpcdf.mpg.de/ift/keepers.git#egg=keepers"],
    install_requires=["keepers"],
    long_description=read('README'),
    classifiers=[
        "Development Status :: 4 - Beta",
        "Topic :: Utilities",
        # "License :: OSI Approved :: BSD License",
    ],
)
\ No newline at end of file
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment