nifty_mpi_data.py 109 KB
Newer Older
ultimanet's avatar
ultimanet committed
1
# -*- coding: utf-8 -*-
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# NIFTY (Numerical Information Field Theory) has been developed at the
# Max-Planck-Institute for Astrophysics.
#
# Copyright (C) 2015 Max-Planck-Society
#
# Author: Theo Steininger
# Project homepage: <http://www.mpa-garching.mpg.de/ift/nifty/>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
22
23


ultimanet's avatar
ultimanet committed
24
import numpy as np
Ultimanet's avatar
Ultimanet committed
25
from nifty_about import about
26
from weakref import WeakValueDictionary as weakdict
ultimanet's avatar
ultimanet committed
27

28
29
# initialize the 'FOUND-packages'-dictionary
FOUND = {}
ultimanet's avatar
ultimanet committed
30
try:
31
    from mpi4py import MPI
Ultima's avatar
Ultima committed
32
    FOUND['MPI'] = True
33
except(ImportError):
34
    import mpi_dummy as MPI
Ultima's avatar
Ultima committed
35
    FOUND['MPI'] = False
ultimanet's avatar
ultimanet committed
36
37
38

try:
    import pyfftw
Ultima's avatar
Ultima committed
39
    FOUND['pyfftw'] = True
40
except(ImportError):
Ultima's avatar
Ultima committed
41
    FOUND['pyfftw'] = False
ultimanet's avatar
ultimanet committed
42
43

try:
44
    import h5py
Ultima's avatar
Ultima committed
45
46
    FOUND['h5py'] = True
    FOUND['h5py_parallel'] = h5py.get_config().mpi
ultimanet's avatar
ultimanet committed
47
except(ImportError):
Ultima's avatar
Ultima committed
48
49
    FOUND['h5py'] = False
    FOUND['h5py_parallel'] = False
ultimanet's avatar
ultimanet committed
50
51


Ultima's avatar
Ultima committed
52
53
54
55
# Names of every distribution strategy this module refers to.
ALL_DISTRIBUTION_STRATEGIES = ['not', 'equal', 'fftw', 'freeform']
# NOTE(review): presumably the strategies whose layout follows from the
# global shape alone -- confirm against the distributor classes.
GLOBAL_DISTRIBUTION_STRATEGIES = ['not', 'equal', 'fftw']
# Strategies that distributed_data_object.copy_empty special-cases when
# no local_shape is supplied.
LOCAL_DISTRIBUTION_STRATEGIES = ['freeform']
# NOTE(review): presumably the strategies usable for HDF5 I/O -- confirm.
HDF5_DISTRIBUTION_STRATEGIES = ['equal', 'fftw']
56
57
58

# Module-level communicator: COMM_WORLD of whichever MPI module was imported
# above (mpi4py if available, otherwise the mpi_dummy fallback).
COMM = MPI.COMM_WORLD

59

ultimanet's avatar
ultimanet committed
60
61
62
63
64
65
66
67
class distributed_data_object(object):
    """

        NIFTY class for distributed data

        Parameters
        ----------
        global_data : {tuple, list, numpy.ndarray} *at least 1-dimensional*
68
            Initial data which will be cast to a numpy.ndarray and then
ultimanet's avatar
ultimanet committed
69
70
71
72
73
74
            stored according to the distribution strategy. The global_data's
            shape overwrites global_shape.
        global_shape : tuple of ints, *optional*
            If no global_data is supplied, global_shape can be used to
            initialize an empty distributed_data_object
        dtype : type, *optional*
75
76
            If an explicit dtype is supplied, the given global_data will be
            cast to it.
ultimanet's avatar
ultimanet committed
77
        distribution_strategy : {'fftw' (default), 'equal', 'freeform', 'not'}, *optional*
78
79
            Specifies how global_data will be distributed to the
            individual nodes.
ultimanet's avatar
ultimanet committed
80
            'fftw' follows the distribution strategy of pyfftw.
81
82
            'not' does not distribute the data at all.

ultimanet's avatar
ultimanet committed
83
84
85
86
87
88
89
90
91
92

        Attributes
        ----------
        data : numpy.ndarray
            The numpy.ndarray in which the individual node's data is stored.
        dtype : type
            Data type of the data object.
        distribution_strategy : string
            Name of the used distribution_strategy
        distributor : distributor
93
94
            The distributor object which takes care of all distribution and
            consolidation of the data.
ultimanet's avatar
ultimanet committed
95
96
        shape : tuple of int
            The global shape of the data
97

ultimanet's avatar
ultimanet committed
98
99
        Raises
        ------
100
101
102
        TypeError :
            If the supplied distribution strategy is not known.

ultimanet's avatar
ultimanet committed
103
    """
104

105
    def __init__(self, global_data=None, global_shape=None, dtype=None,
                 local_data=None, local_shape=None,
                 distribution_strategy='fftw', hermitian=False,
                 alias=None, path=None, comm=MPI.COMM_WORLD,
                 copy=True, *args, **kwargs):
        """
        Build the distributor, initialize the local data and register.

        Tuple/list inputs are converted to numpy arrays first. A distributor
        is then requested from ``distributor_factory.get_distributor`` and
        its ``dtype``, ``global_shape``, ``local_shape`` and ``comm`` are
        mirrored on this object. The local array and the hermitian flag
        come from ``distributor.initialize_data``. Finally the object is
        registered with ``d2o_librarian`` and the returned value is stored
        as ``self.index``.

        Parameters
        ----------
        global_data, local_data : tuple, list, numpy.ndarray, *optional*
            Initial data; forwarded to the distributor.
        global_shape, local_shape, dtype, alias, path, comm :
            Forwarded unchanged to the distributor factory (``alias`` and
            ``path`` also to ``initialize_data``).
        distribution_strategy : string
            Name of the distribution strategy.
        hermitian : bool
            Forwarded to ``initialize_data``.
        copy : bool
            Forwarded to ``initialize_data``.
        *args, **kwargs
            Stored as ``init_args`` / ``init_kwargs`` so that ``copy_empty``
            can replay them; ``kwargs`` is also passed to the factory.
        """
        # TODO: allow init with empty shape

        # np.asarray never forces an extra copy of existing arrays and, in
        # contrast to np.array(..., copy=False), also works on NumPy >= 2
        # where copy=False raises if a copy is unavoidable (as for lists).
        if isinstance(global_data, (tuple, list)):
            global_data = np.asarray(global_data)
        if isinstance(local_data, (tuple, list)):
            local_data = np.asarray(local_data)

        self.distributor = distributor_factory.get_distributor(
                                distribution_strategy=distribution_strategy,
                                comm=comm,
                                global_data=global_data,
                                global_shape=global_shape,
                                local_data=local_data,
                                local_shape=local_shape,
                                alias=alias,
                                path=path,
                                dtype=dtype,
                                **kwargs)

        self.distribution_strategy = distribution_strategy
        self.dtype = self.distributor.dtype
        self.shape = self.distributor.global_shape
        self.local_shape = self.distributor.local_shape
        self.comm = self.distributor.comm

        # remembered so that copy_empty can rebuild an equivalent object
        self.init_args = args
        self.init_kwargs = kwargs

        (self.data, self.hermitian) = self.distributor.initialize_data(
            global_data=global_data,
            local_data=local_data,
            alias=alias,
            path=path,
            hermitian=hermitian,
            copy=copy)
        self.index = d2o_librarian.register(self)
147

Ultimanet's avatar
Ultimanet committed
148
    def copy(self, dtype=None, distribution_strategy=None, **kwargs):
149
150
151
        temp_d2o = self.copy_empty(dtype=dtype,
                                   distribution_strategy=distribution_strategy,
                                   **kwargs)
Ultima's avatar
Ultima committed
152
        if distribution_strategy is None or \
153
                distribution_strategy == self.distribution_strategy:
Ultimanet's avatar
Ultimanet committed
154
155
            temp_d2o.set_local_data(self.get_local_data(), copy=True)
        else:
Ultima's avatar
Ultima committed
156
            temp_d2o.inject((slice(None),), self, (slice(None),))
157
        temp_d2o.hermitian = self.hermitian
158
        return temp_d2o
159
160

    def copy_empty(self, global_shape=None, local_shape=None, dtype=None,
                   distribution_strategy=None, **kwargs):
        """
        Create a new distributed_data_object without copying the data.

        Every argument left at ``None`` is taken from this object. The
        stored ``init_args`` / ``init_kwargs`` are replayed; stored keyword
        arguments overwrite equally named entries of ``kwargs``.

        A 'not'-distributed object that should become a local-strategy
        object without an explicit ``local_shape`` is first rebuilt with
        the 'equal' strategy, and that intermediate object is then
        converted to 'freeform'.
        """
        takes_detour = (self.distribution_strategy == 'not' and
                        distribution_strategy in
                        LOCAL_DISTRIBUTION_STRATEGIES and
                        local_shape is None)
        if takes_detour:
            intermediate = self.copy_empty(global_shape=global_shape,
                                           local_shape=local_shape,
                                           dtype=dtype,
                                           distribution_strategy='equal',
                                           **kwargs)
            return intermediate.copy_empty(distribution_strategy='freeform')

        # fall back to this object's own properties
        global_shape = self.shape if global_shape is None else global_shape
        local_shape = self.local_shape if local_shape is None else local_shape
        dtype = self.dtype if dtype is None else dtype
        if distribution_strategy is None:
            distribution_strategy = self.distribution_strategy

        kwargs.update(self.init_kwargs)

        return distributed_data_object(
            *self.init_args,
            global_shape=global_shape,
            local_shape=local_shape,
            dtype=dtype,
            distribution_strategy=distribution_strategy,
            comm=self.comm,
            **kwargs)
192

193
    def apply_scalar_function(self, function, inplace=False, dtype=None):
194
        remember_hermitianQ = self.hermitian
195

196
        if inplace is True:
Ultimanet's avatar
Ultimanet committed
197
            temp = self
Ultima's avatar
Ultima committed
198
            if dtype is not None and self.dtype != np.dtype(dtype):
199
200
201
                about.warnings.cprint(
                    "WARNING: Inplace dtype conversion is not possible!")

Ultimanet's avatar
Ultimanet committed
202
        else:
203
            temp = self.copy_empty(dtype=dtype)
Ultimanet's avatar
Ultimanet committed
204

Ultima's avatar
Ultima committed
205
        if np.prod(self.local_shape) != 0:
206
            try:
Ultima's avatar
Ultima committed
207
208
209
210
                temp.data[:] = function(self.data)
            except:
                temp.data[:] = np.vectorize(function)(self.data)
        else:
211
212
            # Noting to do here. The value-empty array
            # is also geometrically empty
Ultima's avatar
Ultima committed
213
            pass
214

215
216
217
218
        if function in (np.exp, np.log):
            temp.hermitian = remember_hermitianQ
        else:
            temp.hermitian = False
Ultimanet's avatar
Ultimanet committed
219
        return temp
220

Ultimanet's avatar
Ultimanet committed
221
222
223
    def apply_generator(self, generator):
        self.set_local_data(generator(self.distributor.local_shape))
        self.hermitian = False
224

ultimanet's avatar
ultimanet committed
225
226
    def __str__(self):
        return self.data.__str__()
227

ultimanet's avatar
ultimanet committed
228
    def __repr__(self):
229
        return '<distributed_data_object>\n' + self.data.__repr__()
230

231
    def _compare_helper(self, other, op):
232
        result = self.copy_empty(dtype=np.bool_)
233
234
        # Case 1: 'other' is a scalar
        # -> make point-wise comparison
Ultimanet's avatar
Ultimanet committed
235
        if np.isscalar(other):
236
            result.set_local_data(
237
                getattr(self.get_local_data(copy=False), op)(other))
238
            return result
Ultimanet's avatar
Ultimanet committed
239

240
241
        # Case 2: 'other' is a numpy array or a distributed_data_object
        # -> extract the local data and make point-wise comparison
Ultimanet's avatar
Ultimanet committed
242
        elif isinstance(other, np.ndarray) or\
243
                isinstance(other, distributed_data_object):
Ultimanet's avatar
Ultimanet committed
244
            temp_data = self.distributor.extract_local_data(other)
245
246
            result.set_local_data(
                getattr(self.get_local_data(copy=False), op)(temp_data))
Ultimanet's avatar
Ultimanet committed
247
            return result
248
249

        # Case 3: 'other' is None
Ultima's avatar
Ultima committed
250
        elif other is None:
Ultimanet's avatar
Ultimanet committed
251
            return False
252
253
254

        # Case 4: 'other' is something different
        # -> make a numpy casting and make a recursive call
Ultimanet's avatar
Ultimanet committed
255
256
        else:
            temp_other = np.array(other)
257
            return getattr(self, op)(temp_other)
258

259
260
    def __ne__(self, other):
        return self._compare_helper(other, '__ne__')
261

262
263
    def __lt__(self, other):
        return self._compare_helper(other, '__lt__')
264

265
266
267
268
269
270
    def __le__(self, other):
        return self._compare_helper(other, '__le__')

    def __eq__(self, other):

        return self._compare_helper(other, '__eq__')
271

272
273
274
275
276
277
    def __ge__(self, other):
        return self._compare_helper(other, '__ge__')

    def __gt__(self, other):
        return self._compare_helper(other, '__gt__')

Ultimanet's avatar
Ultimanet committed
278
    def equal(self, other):
Ultimanet's avatar
Ultimanet committed
279
280
281
282
283
284
285
286
287
        if other is None:
            return False
        try:
            assert(self.dtype == other.dtype)
            assert(self.shape == other.shape)
            assert(self.init_args == other.init_args)
            assert(self.init_kwargs == other.init_kwargs)
            assert(self.distribution_strategy == other.distribution_strategy)
            assert(np.all(self.data == other.data))
Ultimanet's avatar
Ultimanet committed
288
        except(AssertionError, AttributeError):
Ultimanet's avatar
Ultimanet committed
289
290
291
292
            return False
        else:
            return True

293
    def __pos__(self):
294
        temp_d2o = self.copy_empty()
295
        temp_d2o.set_local_data(data=self.get_local_data(), copy=True)
296
        return temp_d2o
297

ultimanet's avatar
ultimanet committed
298
    def __neg__(self):
299
        temp_d2o = self.copy_empty()
300
301
        temp_d2o.set_local_data(data=self.get_local_data().__neg__(),
                                copy=True)
ultimanet's avatar
ultimanet committed
302
        return temp_d2o
303

304
    def __abs__(self):
305
        # translate complex dtypes
306
307
308
309
310
311
        if self.dtype == np.dtype('complex64'):
            new_dtype = np.dtype('float32')
        elif self.dtype == np.dtype('complex128'):
            new_dtype = np.dtype('float64')
        elif issubclass(self.dtype.type, np.complexfloating):
            new_dtype = np.dtype('float')
Ultimanet's avatar
Ultimanet committed
312
313
        else:
            new_dtype = self.dtype
314
315
316
        temp_d2o = self.copy_empty(dtype=new_dtype)
        temp_d2o.set_local_data(data=self.get_local_data().__abs__(),
                                copy=True)
317
        return temp_d2o
318

Ultima's avatar
Ultima committed
319
320
321
322
323
    def _builtin_helper(self, operator, other, inplace=False):
        if isinstance(other, distributed_data_object):
            other_is_real = other.isreal()
        else:
            other_is_real = np.isreal(other)
324
325

        # Case 1: other is not a scalar
Ultimanet's avatar
Ultimanet committed
326
        if not (np.isscalar(other) or np.shape(other) == (1,)):
327
            try:
328
                hermitian_Q = (other.hermitian and self.hermitian)
329
330
            except(AttributeError):
                hermitian_Q = False
331
            # extract the local data from the 'other' object
Ultimanet's avatar
Ultimanet committed
332
333
            temp_data = self.distributor.extract_local_data(other)
            temp_data = operator(temp_data)
334
335

        # Case 2: other is a real scalar -> preserve hermitianity
Ultima's avatar
Ultima committed
336
337
        elif other_is_real or (self.dtype not in (np.dtype('complex128'),
                                                  np.dtype('complex256'))):
338
            hermitian_Q = self.hermitian
ultimanet's avatar
ultimanet committed
339
            temp_data = operator(other)
340
        # Case 3: other is complex
341
342
        else:
            hermitian_Q = False
343
344
            temp_data = operator(other)
        # write the new data into a new distributed_data_object
345
        if inplace is True:
346
347
            temp_d2o = self
        else:
348
            # use common datatype for self and other
349
            new_dtype = np.dtype(np.find_common_type((self.dtype,),
350
                                                     (temp_data.dtype,)))
351
            temp_d2o = self.copy_empty(
352
                dtype=new_dtype)
ultimanet's avatar
ultimanet committed
353
        temp_d2o.set_local_data(data=temp_data)
354
        temp_d2o.hermitian = hermitian_Q
ultimanet's avatar
ultimanet committed
355
        return temp_d2o
356

ultimanet's avatar
ultimanet committed
357
    def __add__(self, other):
Ultima's avatar
Ultima committed
358
        return self._builtin_helper(self.get_local_data().__add__, other)
ultimanet's avatar
ultimanet committed
359
360

    def __radd__(self, other):
Ultima's avatar
Ultima committed
361
        return self._builtin_helper(self.get_local_data().__radd__, other)
Ultimanet's avatar
Ultimanet committed
362
363

    def __iadd__(self, other):
364
        return self._builtin_helper(self.get_local_data().__iadd__,
365
366
                                    other,
                                    inplace=True)
Ultimanet's avatar
Ultimanet committed
367

ultimanet's avatar
ultimanet committed
368
    def __sub__(self, other):
Ultima's avatar
Ultima committed
369
        return self._builtin_helper(self.get_local_data().__sub__, other)
370

ultimanet's avatar
ultimanet committed
371
    def __rsub__(self, other):
Ultima's avatar
Ultima committed
372
        return self._builtin_helper(self.get_local_data().__rsub__, other)
373

ultimanet's avatar
ultimanet committed
374
    def __isub__(self, other):
375
        return self._builtin_helper(self.get_local_data().__isub__,
376
377
                                    other,
                                    inplace=True)
378

ultimanet's avatar
ultimanet committed
379
    def __div__(self, other):
Ultima's avatar
Ultima committed
380
        return self._builtin_helper(self.get_local_data().__div__, other)
381

382
383
    def __truediv__(self, other):
        return self.__div__(other)
384

ultimanet's avatar
ultimanet committed
385
    def __rdiv__(self, other):
Ultima's avatar
Ultima committed
386
        return self._builtin_helper(self.get_local_data().__rdiv__, other)
387

388
389
    def __rtruediv__(self, other):
        return self.__rdiv__(other)
ultimanet's avatar
ultimanet committed
390

Ultimanet's avatar
Ultimanet committed
391
    def __idiv__(self, other):
392
        return self._builtin_helper(self.get_local_data().__idiv__,
393
394
395
                                    other,
                                    inplace=True)

396
    def __itruediv__(self, other):
397
        return self.__idiv__(other)
398

ultimanet's avatar
ultimanet committed
399
    def __floordiv__(self, other):
400
        return self._builtin_helper(self.get_local_data().__floordiv__,
401
402
                                    other)

ultimanet's avatar
ultimanet committed
403
    def __rfloordiv__(self, other):
404
        return self._builtin_helper(self.get_local_data().__rfloordiv__,
405
406
                                    other)

Ultimanet's avatar
Ultimanet committed
407
    def __ifloordiv__(self, other):
Ultima's avatar
Ultima committed
408
        return self._builtin_helper(
409
410
            self.get_local_data().__ifloordiv__, other,
            inplace=True)
411

ultimanet's avatar
ultimanet committed
412
    def __mul__(self, other):
Ultima's avatar
Ultima committed
413
        return self._builtin_helper(self.get_local_data().__mul__, other)
414

ultimanet's avatar
ultimanet committed
415
    def __rmul__(self, other):
Ultima's avatar
Ultima committed
416
        return self._builtin_helper(self.get_local_data().__rmul__, other)
ultimanet's avatar
ultimanet committed
417
418

    def __imul__(self, other):
419
        return self._builtin_helper(self.get_local_data().__imul__,
420
421
                                    other,
                                    inplace=True)
Ultimanet's avatar
Ultimanet committed
422

ultimanet's avatar
ultimanet committed
423
    def __pow__(self, other):
Ultima's avatar
Ultima committed
424
        return self._builtin_helper(self.get_local_data().__pow__, other)
425

ultimanet's avatar
ultimanet committed
426
    def __rpow__(self, other):
Ultima's avatar
Ultima committed
427
        return self._builtin_helper(self.get_local_data().__rpow__, other)
ultimanet's avatar
ultimanet committed
428
429

    def __ipow__(self, other):
430
        return self._builtin_helper(self.get_local_data().__ipow__,
431
432
433
                                    other,
                                    inplace=True)

Ultima's avatar
Ultima committed
434
435
    def __mod__(self, other):
        return self._builtin_helper(self.get_local_data().__mod__, other)
436

Ultima's avatar
Ultima committed
437
    def __rmod__(self, other):
438
        return self._builtin_helper(self.get_local_data().__rmod__, other)
439

Ultima's avatar
Ultima committed
440
    def __imod__(self, other):
441
        return self._builtin_helper(self.get_local_data().__imod__,
442
443
444
                                    other,
                                    inplace=True)

445
446
    def __len__(self):
        return self.shape[0]
447

448
    def get_dim(self):
449
        return np.prod(self.shape)
450

451
    def vdot(self, other):
452
        other = self.distributor.extract_local_data(other)
453
454
455
456
        local_vdot = np.vdot(self.get_local_data(), other)
        local_vdot_list = self.distributor._allgather(local_vdot)
        global_vdot = np.sum(local_vdot_list)
        return global_vdot
Ultimanet's avatar
Ultimanet committed
457

ultimanet's avatar
ultimanet committed
458
    def __getitem__(self, key):
Ultima's avatar
Ultima committed
459
        return self.get_data(key)
460

ultimanet's avatar
ultimanet committed
461
462
    def __setitem__(self, key, data):
        self.set_data(data, key)
463

464
    def _contraction_helper(self, function, **kwargs):
Ultima's avatar
Ultima committed
465
466
467
468
469
470
471
        if np.prod(self.data.shape) == 0:
            local = 0
            include = False
        else:
            local = function(self.data, **kwargs)
            include = True

472
        local_list = self.distributor._allgather(local)
473
        local_list = np.array(local_list, dtype=np.dtype(local_list[0]))
Ultima's avatar
Ultima committed
474
475
476
        include_list = np.array(self.distributor._allgather(include))
        work_list = local_list[include_list]
        if work_list.shape[0] == 0:
477
            raise ValueError("ERROR: Zero-size array to reduction operation " +
Ultima's avatar
Ultima committed
478
                             "which has no identity")
479
        else:
Ultima's avatar
Ultima committed
480
481
            result = function(work_list, axis=0)
            return result
482

483
    def amin(self, **kwargs):
484
        return self._contraction_helper(np.amin, **kwargs)
485
486

    def nanmin(self, **kwargs):
487
        return self._contraction_helper(np.nanmin, **kwargs)
488

489
    def amax(self, **kwargs):
490
        return self._contraction_helper(np.amax, **kwargs)
491

492
    def nanmax(self, **kwargs):
493
        return self._contraction_helper(np.nanmax, **kwargs)
494

495
496
497
498
    def sum(self, **kwargs):
        return self._contraction_helper(np.sum, **kwargs)

    def prod(self, **kwargs):
499
500
        return self._contraction_helper(np.prod, **kwargs)

501
    def mean(self, power=1):
502
        # compute the local means and the weights for the mean-mean.
Ultima's avatar
Ultima committed
503
504
505
506
507
508
        if np.prod(self.data.shape) == 0:
            local_mean = 0
            include = False
        else:
            local_mean = np.mean(self.data**power)
            include = True
509

510
        local_weight = np.prod(self.data.shape)
511
        # collect the local means and cast the result to a ndarray
Ultima's avatar
Ultima committed
512
513
        local_mean_list = self.distributor._allgather(local_mean)
        local_weight_list = self.distributor._allgather(local_weight)
514

515
516
        local_mean_list = np.array(local_mean_list,
                                   dtype=np.dtype(local_mean_list[0]))
517
518
        local_weight_list = np.array(local_weight_list)
        # extract the parts from the non-empty nodes
Ultima's avatar
Ultima committed
519
520
521
522
523
        include_list = np.array(self.distributor._allgather(include))
        work_mean_list = local_mean_list[include_list]
        work_weight_list = local_weight_list[include_list]
        if work_mean_list.shape[0] == 0:
            raise ValueError("ERROR:  Mean of empty slice.")
524
525
        else:
            # compute the denominator for the weighted mean-mean
Ultima's avatar
Ultima committed
526
            global_weight = np.sum(work_weight_list)
527
            # compute the numerator
Ultima's avatar
Ultima committed
528
            numerator = np.sum(work_mean_list * work_weight_list)
529
            global_mean = numerator / global_weight
Ultima's avatar
Ultima committed
530
            return global_mean
531
532
533
534
535

    def var(self):
        mean_of_the_square = self.mean(power=2)
        square_of_the_mean = self.mean()**2
        return mean_of_the_square - square_of_the_mean
536

537
538
    def std(self):
        return np.sqrt(self.var())
539

Ultima's avatar
Ultima committed
540
541
542
543
544
545
546
    def argmin(self):
        if np.prod(self.data.shape) == 0:
            local_argmin = np.nan
            local_argmin_value = np.nan
            globalized_local_argmin = np.nan
        else:
            local_argmin = np.argmin(self.data)
547
            local_argmin_value = self.data[np.unravel_index(local_argmin,
Ultima's avatar
Ultima committed
548
                                                            self.data.shape)]
549

Ultima's avatar
Ultima committed
550
            globalized_local_argmin = self.distributor.globalize_flat_index(
551
552
553
554
                local_argmin)
        local_argmin_list = self.distributor._allgather(
                                                    (local_argmin_value,
                                                     globalized_local_argmin))
555
        local_argmin_list = np.array(local_argmin_list, dtype=[
556
557
            ('value', np.dtype('complex128')),
            ('index', np.dtype('float'))])
558
559
        local_argmin_list = np.sort(local_argmin_list,
                                    order=['value', 'index'])
Ultima's avatar
Ultima committed
560
        return np.int(local_argmin_list[0][1])
561

Ultima's avatar
Ultima committed
562
563
564
565
566
567
568
    def argmax(self):
        if np.prod(self.data.shape) == 0:
            local_argmax = np.nan
            local_argmax_value = np.nan
            globalized_local_argmax = np.nan
        else:
            local_argmax = np.argmax(self.data)
569
            local_argmax_value = -self.data[np.unravel_index(local_argmax,
570
                                                             self.data.shape)]
Ultima's avatar
Ultima committed
571
            globalized_local_argmax = self.distributor.globalize_flat_index(
572
573
574
575
                local_argmax)
        local_argmax_list = self.distributor._allgather(
                                                  (local_argmax_value,
                                                   globalized_local_argmax))
576
        local_argmax_list = np.array(local_argmax_list, dtype=[
577
578
            ('value', np.dtype('complex128')),
            ('index', np.dtype('float'))])
579
580
        local_argmax_list = np.sort(local_argmax_list,
                                    order=['value', 'index'])
Ultima's avatar
Ultima committed
581
        return np.int(local_argmax_list[0][1])
582

583
    def argmin_nonflat(self):
Ultima's avatar
Ultima committed
584
        return np.unravel_index(self.argmin(), self.shape)
585

Ultima's avatar
Ultima committed
586
587
    def argmax_nonflat(self):
        return np.unravel_index(self.argmax(), self.shape)
588

589
590
591
592
593
594
595
    def conjugate(self):
        temp_d2o = self.copy_empty()
        temp_data = np.conj(self.get_local_data())
        temp_d2o.set_local_data(temp_data)
        return temp_d2o

    def conj(self):
596
597
        return self.conjugate()

598
    def median(self):
        """
        Return ``np.median`` of the full (consolidated) data.

        A warning is printed on every call, since the whole data is
        fetched with ``get_full_data``.
        """
        warning_text = \
            "WARNING: The current implementation of median is very expensive!"
        about.warnings.cprint(warning_text)
        full_data = self.get_full_data()
        return np.median(full_data)
603

604
    def iscomplex(self):
Ultima's avatar
Ultima committed
605
        temp_d2o = self.copy_empty(dtype=np.dtype('bool'))
606
607
        temp_d2o.set_local_data(np.iscomplex(self.data))
        return temp_d2o
608

609
    def isreal(self):
Ultima's avatar
Ultima committed
610
        temp_d2o = self.copy_empty(dtype=np.dtype('bool'))
611
612
        temp_d2o.set_local_data(np.isreal(self.data))
        return temp_d2o
613

614
615
616
617
618
619
620
621
    def all(self):
        local_all = np.all(self.get_local_data())
        global_all = self.distributor._allgather(local_all)
        return np.all(global_all)

    def any(self):
        local_any = np.any(self.get_local_data())
        global_any = self.distributor._allgather(local_any)
622
        return np.any(global_any)
623

624
625
626
627
628
    def unique(self):
        local_unique = np.unique(self.get_local_data())
        global_unique = self.distributor._allgather(local_unique)
        global_unique = np.concatenate(global_unique)
        return np.unique(global_unique)
629

630
    def bincount(self, weights=None, minlength=None):
631
        if self.dtype not in [np.dtype('int16'), np.dtype('int32'),
632
633
                              np.dtype('int64'),  np.dtype('uint16'),
                              np.dtype('uint32'), np.dtype('uint64')]:
634
            raise TypeError(about._errors.cstring(
635
636
                "ERROR: Distributed-data-object must be of integer datatype!"))

637
        minlength = max(self.amax() + 1, minlength)
638

639
640
        if weights is not None:
            local_weights = self.distributor.extract_local_data(weights).\
641
                flatten()
642
643
        else:
            local_weights = None
644

645
        local_counts = np.bincount(self.get_local_data().flatten(),
646
647
                                   weights=local_weights,
                                   minlength=minlength)
Ultima's avatar
Ultima committed
648
649
650
651
        if self.distribution_strategy == 'not':
            return local_counts
        else:
            list_of_counts = self.distributor._allgather(local_counts)
652
            counts = np.sum(list_of_counts, axis=0)
Ultima's avatar
Ultima committed
653
            return counts
654

Ultima's avatar
Ultima committed
655
656
    def where(self):
        return self.distributor.where(self.data)
657

658
    def set_local_data(self, data, hermitian=False, copy=True):
        """
            Stores data directly in the local data attribute. No distribution
            is done. The shape of the data must fit the local data attributes
            shape.

            Parameters
            ----------
            data : tuple, list, numpy.ndarray
                The data which should be stored in the local data attribute.
            hermitian : boolean, *optional*
                Value which is stored in the `hermitian` attribute.
            copy : boolean, *optional*
                If True (default), the values of `data` are written into the
                already existing local array. Otherwise the local data
                attribute is rebound to an array built from `data` (without
                copying if `data` already is a fitting numpy array), which
                gets reshaped to the local shape.

            Returns
            -------
            None

        """
        self.hermitian = hermitian
        if copy is True:
            self.data[:] = data
        else:
            # np.asarray copies only if it is unavoidable. In contrast,
            # np.array(..., copy=False) raises a ValueError on NumPy >= 2
            # whenever a copy is needed, e.g. for list or tuple input.
            self.data = np.asarray(data,
                                   dtype=self.dtype,
                                   order='C').reshape(self.local_shape)
682

Ultima's avatar
Ultima committed
683
    def set_data(self, data, to_key, from_key=None, local_keys=False,
                 hermitian=False, copy=True, **kwargs):
        """
            Stores the supplied data in the region of the object which is
            specified by `to_key`. The actual work is delegated to the
            distributor's `disperse_data` method.

            Parameters
            ----------
            data : tuple, list, numpy.ndarray
                The data which should be distributed.
            to_key : int, slice, tuple of int or slice
                Specifies the region, where the data will be stored in.
            from_key : int, slice, tuple of int or slice, *optional*
                Passed on to `disperse_data`; presumably selects the part
                of `data` that is used -- verify against the distributor.
            local_keys : boolean, *optional*
                Passed on to `disperse_data`; presumably states whether the
                individual nodes supply individual keys -- verify against
                the distributor.
            hermitian : boolean, *optional*
                Value which is stored in the `hermitian` attribute.
            copy : boolean, *optional*
                Passed on to `disperse_data`.
            **kwargs
                Further keyword arguments are passed on to `disperse_data`.

            Returns
            -------
            None

        """
        self.hermitian = hermitian
        disperse = self.distributor.disperse_data
        # the local array is the target; `data` carries the new values
        disperse(data=self.data,
                 data_update=data,
                 to_key=to_key,
                 from_key=from_key,
                 local_keys=local_keys,
                 copy=copy,
                 **kwargs)
712

713
    def set_full_data(self, data, hermitian=False, copy=True, **kwargs):
        """
            Replaces the local data by the result of the distributor's
            `distribute_data` method applied to the supplied data. The shape
            of data must match the shape of the distributed_data_object.

            Parameters
            ----------
            data : tuple, list, numpy.ndarray
                The data which should be distributed.
            hermitian : boolean, *optional*
                Value which is stored in the `hermitian` attribute.
            copy : boolean, *optional*
                Passed on to `distribute_data`.
            **kwargs
                Further keyword arguments are passed on to
                `distribute_data`.

            Notes
            -----
            set_full_data(foo) is equivalent to set_data(foo,slice(None)) but
            faster.

            Returns
            -------
            None

        """
        self.hermitian = hermitian
        distribute = self.distributor.distribute_data
        self.data = distribute(data=data, copy=copy, **kwargs)
ultimanet's avatar
ultimanet committed
736

Ultimanet's avatar
Ultimanet committed
737
    def get_local_data(self, key=(slice(None),), copy=True):
        """
            Gives access to the local data attribute. No consolidation is
            done.

            Parameters
            ----------
            key : int, slice, tuple of int or slice
                The key which will be used to index the local data. It is
                only used if `copy` is True.
            copy : boolean, *optional*
                If True (default), `self.data[key]` is returned. If False,
                the local data array itself is returned and `key` is
                ignored.
                NOTE(review): no explicit copy is made in either case.

            Returns
            -------
            self.data[key] or self.data : numpy.ndarray
                None is returned if `copy` is neither True nor False.

        """
        if copy is False:
            return self.data
        if copy is True:
            return self.data[key]
        # `copy` is neither True nor False
        return None
756

757
    def get_data(self, key, local_keys=False, **kwargs):
        """
            Loads data from the region which is specified by key. If the key
            selects everything, a copy of the object is returned. Otherwise
            the work is delegated to the distributor's `collect_data`
            method.

            Parameters
            ----------
            key : None, int, slice, tuple of int or slice
                The key is the object which specifies the region, where data
                will be loaded from. None, slice(None) and tuples which
                solely consist of slice(None) select everything.
            local_keys : boolean, *optional*
                Passed on to `collect_data`; presumably states whether the
                individual nodes supply individual keys -- verify against
                the distributor.
            **kwargs
                Further keyword arguments are passed on to `collect_data`.

            Returns
            -------
            result
                The return value of `self.copy()` if the key selects
                everything, the return value of `collect_data` otherwise.

        """
        full_slice = slice(None)
        if key is None:
            wants_everything = True
        elif isinstance(key, slice):
            wants_everything = (key == full_slice)
        elif isinstance(key, tuple):
            # Only the comparison is guarded: e.g. numpy arrays as key
            # entries yield element-wise results whose truth value raises a
            # ValueError. An error raised by self.copy() must not be
            # swallowed here.
            try:
                wants_everything = all(x == full_slice for x in key)
            except ValueError:
                wants_everything = False
        else:
            wants_everything = False

        if wants_everything:
            return self.copy()

        return self.distributor.collect_data(self.data,
                                             key,
                                             local_keys=local_keys,
                                             **kwargs)
792

ultimanet's avatar
ultimanet committed
793
794
    def get_full_data(self, target_rank='all'):
        """
            Fully consolidates the distributed data by delegating to the
            distributor's `consolidate_data` method.

            Parameters
            ----------
            target_rank : 'all' (default), int *optional*
                If only one node should receive the full data, it can be
                specified here.

            Notes
            -----
            get_full_data() is equivalent to get_data(slice(None)) but
            faster.

            Returns
            -------
            full_data
                The return value of `consolidate_data`.

        """
        consolidate = self.distributor.consolidate_data
        return consolidate(self.data, target_rank=target_rank)
ultimanet's avatar
ultimanet committed
815

816
    def inject(self, to_key=(slice(None),), data=None,
               from_key=(slice(None),)):
        """
            Hands the supplied data over to the distributor's `inject`
            method together with the local data and both keys.

            Parameters
            ----------
            to_key : tuple of int or slice, *optional*
                Passed on to the distributor; presumably specifies the
                target region in this object -- verify against the
                distributor.
            data : object, *optional*
                The data which should be injected. If None, nothing is
                done.
            from_key : tuple of int or slice, *optional*
                Passed on to the distributor; presumably specifies the
                source region in `data` -- verify against the distributor.

            Returns
            -------
            self if `data` is None, otherwise None.

        """
        if data is not None:
            self.distributor.inject(self.data, to_key, data, from_key)
            return None
        # nothing to inject
        return self
821

822
823
    def flatten(self, inplace=False):
        flat_data = self.distributor.flatten(self.data, inplace=inplace)
824

Ultima's avatar
Ultima committed
825
826
        flat_global_shape = (np.prod(self.shape),)
        flat_local_shape = np.shape(flat_data)
827
828
829

        # Try to keep the distribution strategy. Therefore
        # create an empty copy of self which has the new shape
830
831
        temp_d2o = self.copy_empty(global_shape=flat_global_shape,
                                   local_shape=flat_local_shape)
832
        # Check if the local shapes match.
Ultima's avatar
Ultima committed
833
834
        if temp_d2o.local_shape == flat_local_shape:
            work_d2o = temp_d2o