nifty_mpi_data.py 108 KB
Newer Older
ultimanet's avatar
ultimanet committed
1
# -*- coding: utf-8 -*-
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# NIFTY (Numerical Information Field Theory) has been developed at the
# Max-Planck-Institute for Astrophysics.
#
# Copyright (C) 2015 Max-Planck-Society
#
# Author: Theo Steininger
# Project homepage: <http://www.mpa-garching.mpg.de/ift/nifty/>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
22
23


ultimanet's avatar
ultimanet committed
24
import numpy as np
25
from weakref import WeakValueDictionary as weakdict
ultimanet's avatar
ultimanet committed
26

27
28
29
from keepers import about,\
                    global_configuration as gc,\
                    global_dependency_injector as gdi
ultimanet's avatar
ultimanet committed
30

31
32
33
MPI = gdi[gc['mpi_module']]
h5py = gdi.get('h5py')
pyfftw = gdi.get('pyfftw')
ultimanet's avatar
ultimanet committed
34

35
36
37
38
39
40
41
42
43
_maybe_fftw = ['fftw'] if ('pyfftw' in gdi) else []
STRATEGIES = {
                'all': ['not', 'equal', 'freeform'] + _maybe_fftw,
                'global': ['not', 'equal'] + _maybe_fftw,
                'local': ['freeform'],
                'slicing': ['equal', 'freeform'] + _maybe_fftw,
                'not': ['not'],
                'hdf5': ['equal'] + _maybe_fftw,
             }
44

45

ultimanet's avatar
ultimanet committed
46
47
48
49
50
51
52
53
class distributed_data_object(object):
    """

        NIFTY class for distributed data

        Parameters
        ----------
        global_data : {tuple, list, numpy.ndarray} *at least 1-dimensional*
54
            Initial data which will be casted to a numpy.ndarray and then
ultimanet's avatar
ultimanet committed
55
56
57
58
59
60
            stored according to the distribution strategy. The global_data's
            shape overwrites global_shape.
        global_shape : tuple of ints, *optional*
            If no global_data is supplied, global_shape can be used to
            initialize an empty distributed_data_object
        dtype : type, *optional*
61
62
            If an explicit dtype is supplied, the given global_data will be
            casted to it.
ultimanet's avatar
ultimanet committed
63
        distribution_strategy : {'fftw' (default), 'not'}, *optional*
64
65
            Specifies the way, how global_data will be distributed to the
            individual nodes.
ultimanet's avatar
ultimanet committed
66
            'fftw' follows the distribution strategy of pyfftw.
67
68
            'not' does not distribute the data at all.

ultimanet's avatar
ultimanet committed
69
70
71
72
73
74
75
76
77
78

        Attributes
        ----------
        data : numpy.ndarray
            The numpy.ndarray in which the individual node's data is stored.
        dtype : type
            Data type of the data object.
        distribution_strategy : string
            Name of the used distribution_strategy
        distributor : distributor
79
80
            The distributor object which takes care of all distribution and
            consolidation of the data.
ultimanet's avatar
ultimanet committed
81
82
        shape : tuple of int
            The global shape of the data
83

ultimanet's avatar
ultimanet committed
84
85
        Raises
        ------
86
87
88
        TypeError :
            If the supplied distribution strategy is not known.

ultimanet's avatar
ultimanet committed
89
    """
90

91
    def __init__(self, global_data=None, global_shape=None, dtype=None,
Ultima's avatar
Ultima committed
92
                 local_data=None, local_shape=None,
93
                 distribution_strategy='fftw', hermitian=False,
94
95
96
97
98
                 alias=None, path=None, comm=MPI.COMM_WORLD,
                 copy=True, *args, **kwargs):

        # TODO: allow init with empty shape

99
100
101
102
        if isinstance(global_data, tuple) or isinstance(global_data, list):
            global_data = np.array(global_data, copy=False)
        if isinstance(local_data, tuple) or isinstance(local_data, list):
            local_data = np.array(local_data, copy=False)
103

104
        self.distributor = distributor_factory.get_distributor(
105
106
107
108
109
110
111
112
113
                                distribution_strategy=distribution_strategy,
                                comm=comm,
                                global_data=global_data,
                                global_shape=global_shape,
                                local_data=local_data,
                                local_shape=local_shape,
                                alias=alias,
                                path=path,
                                dtype=dtype,
114
                                **kwargs)
115

ultimanet's avatar
ultimanet committed
116
117
118
        self.distribution_strategy = distribution_strategy
        self.dtype = self.distributor.dtype
        self.shape = self.distributor.global_shape
Ultima's avatar
Ultima committed
119
120
        self.local_shape = self.distributor.local_shape
        self.comm = self.distributor.comm
121
122

        self.init_args = args
123
        self.init_kwargs = kwargs
124

Ultima's avatar
Ultima committed
125
        (self.data, self.hermitian) = self.distributor.initialize_data(
126
127
128
129
130
131
            global_data=global_data,
            local_data=local_data,
            alias=alias,
            path=path,
            hermitian=hermitian,
            copy=copy)
132
        self.index = d2o_librarian.register(self)
133

134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
    @property
    def real(self):
        new_data = self.get_local_data().real
        new_dtype = new_data.dtype
        new_d2o = self.copy_empty(dtype=new_dtype)
        new_d2o.set_local_data(data=new_data,
                               hermitian=self.hermitian)
        return new_d2o

    @property
    def imag(self):
        new_data = self.get_local_data().imag
        new_dtype = new_data.dtype
        new_d2o = self.copy_empty(dtype=new_dtype)
        new_d2o.set_local_data(data=new_data,
                               hermitian=self.hermitian)
        return new_d2o

Ultimanet's avatar
Ultimanet committed
152
    def copy(self, dtype=None, distribution_strategy=None, **kwargs):
153
154
155
        temp_d2o = self.copy_empty(dtype=dtype,
                                   distribution_strategy=distribution_strategy,
                                   **kwargs)
Ultima's avatar
Ultima committed
156
        if distribution_strategy is None or \
157
                distribution_strategy == self.distribution_strategy:
Ultimanet's avatar
Ultimanet committed
158
159
            temp_d2o.set_local_data(self.get_local_data(), copy=True)
        else:
Ultima's avatar
Ultima committed
160
            temp_d2o.inject((slice(None),), self, (slice(None),))
161
        temp_d2o.hermitian = self.hermitian
162
        return temp_d2o
163
164

    def copy_empty(self, global_shape=None, local_shape=None, dtype=None,
165
                   distribution_strategy=None, **kwargs):
Ultima's avatar
Ultima committed
166
        if self.distribution_strategy == 'not' and \
167
                distribution_strategy in STRATEGIES['local'] and \
168
169
170
171
172
                local_shape is None:
            result = self.copy_empty(global_shape=global_shape,
                                     local_shape=local_shape,
                                     dtype=dtype,
                                     distribution_strategy='equal',
Ultima's avatar
Ultima committed
173
                                     **kwargs)
174
            return result.copy_empty(distribution_strategy='freeform')
175

Ultima's avatar
Ultima committed
176
        if global_shape is None:
177
            global_shape = self.shape
Ultima's avatar
Ultima committed
178
179
180
        if local_shape is None:
            local_shape = self.local_shape
        if dtype is None:
181
            dtype = self.dtype
Ultima's avatar
Ultima committed
182
        if distribution_strategy is None:
183
184
185
            distribution_strategy = self.distribution_strategy

        kwargs.update(self.init_kwargs)
186

187
188
189
190
191
192
193
194
        temp_d2o = distributed_data_object(
                                   global_shape=global_shape,
                                   local_shape=local_shape,
                                   dtype=dtype,
                                   distribution_strategy=distribution_strategy,
                                   comm=self.comm,
                                   *self.init_args,
                                   **kwargs)
195
        return temp_d2o
196

197
    def apply_scalar_function(self, function, inplace=False, dtype=None):
198
        remember_hermitianQ = self.hermitian
199

200
        if inplace is True:
Ultimanet's avatar
Ultimanet committed
201
            temp = self
Ultima's avatar
Ultima committed
202
            if dtype is not None and self.dtype != np.dtype(dtype):
203
204
205
                about.warnings.cprint(
                    "WARNING: Inplace dtype conversion is not possible!")

Ultimanet's avatar
Ultimanet committed
206
        else:
207
            temp = self.copy_empty(dtype=dtype)
Ultimanet's avatar
Ultimanet committed
208

Ultima's avatar
Ultima committed
209
        if np.prod(self.local_shape) != 0:
210
            try:
Ultima's avatar
Ultima committed
211
212
213
214
                temp.data[:] = function(self.data)
            except:
                temp.data[:] = np.vectorize(function)(self.data)
        else:
215
216
            # Noting to do here. The value-empty array
            # is also geometrically empty
Ultima's avatar
Ultima committed
217
            pass
218

219
220
221
222
        if function in (np.exp, np.log):
            temp.hermitian = remember_hermitianQ
        else:
            temp.hermitian = False
Ultimanet's avatar
Ultimanet committed
223
        return temp
224

Ultimanet's avatar
Ultimanet committed
225
226
227
    def apply_generator(self, generator):
        self.set_local_data(generator(self.distributor.local_shape))
        self.hermitian = False
228

ultimanet's avatar
ultimanet committed
229
230
    def __str__(self):
        return self.data.__str__()
231

ultimanet's avatar
ultimanet committed
232
    def __repr__(self):
233
        return '<distributed_data_object>\n' + self.data.__repr__()
234

235
    def _compare_helper(self, other, op):
236
        result = self.copy_empty(dtype=np.bool_)
237
238
        # Case 1: 'other' is a scalar
        # -> make point-wise comparison
Ultimanet's avatar
Ultimanet committed
239
        if np.isscalar(other):
240
            result.set_local_data(
241
                getattr(self.get_local_data(copy=False), op)(other))
242
            return result
Ultimanet's avatar
Ultimanet committed
243

244
245
        # Case 2: 'other' is a numpy array or a distributed_data_object
        # -> extract the local data and make point-wise comparison
Ultimanet's avatar
Ultimanet committed
246
        elif isinstance(other, np.ndarray) or\
247
                isinstance(other, distributed_data_object):
Ultimanet's avatar
Ultimanet committed
248
            temp_data = self.distributor.extract_local_data(other)
249
250
            result.set_local_data(
                getattr(self.get_local_data(copy=False), op)(temp_data))
Ultimanet's avatar
Ultimanet committed
251
            return result
252
253

        # Case 3: 'other' is None
Ultima's avatar
Ultima committed
254
        elif other is None:
Ultimanet's avatar
Ultimanet committed
255
            return False
256
257
258

        # Case 4: 'other' is something different
        # -> make a numpy casting and make a recursive call
Ultimanet's avatar
Ultimanet committed
259
260
        else:
            temp_other = np.array(other)
261
            return getattr(self, op)(temp_other)
262

263
264
    def __ne__(self, other):
        return self._compare_helper(other, '__ne__')
265

266
267
    def __lt__(self, other):
        return self._compare_helper(other, '__lt__')
268

269
270
271
272
273
274
    def __le__(self, other):
        return self._compare_helper(other, '__le__')

    def __eq__(self, other):

        return self._compare_helper(other, '__eq__')
275

276
277
278
279
280
281
    def __ge__(self, other):
        return self._compare_helper(other, '__ge__')

    def __gt__(self, other):
        return self._compare_helper(other, '__gt__')

Ultimanet's avatar
Ultimanet committed
282
    def equal(self, other):
Ultimanet's avatar
Ultimanet committed
283
284
285
286
287
288
289
290
291
        if other is None:
            return False
        try:
            assert(self.dtype == other.dtype)
            assert(self.shape == other.shape)
            assert(self.init_args == other.init_args)
            assert(self.init_kwargs == other.init_kwargs)
            assert(self.distribution_strategy == other.distribution_strategy)
            assert(np.all(self.data == other.data))
Ultimanet's avatar
Ultimanet committed
292
        except(AssertionError, AttributeError):
Ultimanet's avatar
Ultimanet committed
293
294
295
296
            return False
        else:
            return True

297
    def __pos__(self):
298
        temp_d2o = self.copy_empty()
299
        temp_d2o.set_local_data(data=self.get_local_data(), copy=True)
300
        return temp_d2o
301

ultimanet's avatar
ultimanet committed
302
    def __neg__(self):
303
        temp_d2o = self.copy_empty()
304
305
        temp_d2o.set_local_data(data=self.get_local_data().__neg__(),
                                copy=True)
ultimanet's avatar
ultimanet committed
306
        return temp_d2o
307

308
    def __abs__(self):
309
        # translate complex dtypes
310
311
312
313
314
315
        if self.dtype == np.dtype('complex64'):
            new_dtype = np.dtype('float32')
        elif self.dtype == np.dtype('complex128'):
            new_dtype = np.dtype('float64')
        elif issubclass(self.dtype.type, np.complexfloating):
            new_dtype = np.dtype('float')
Ultimanet's avatar
Ultimanet committed
316
317
        else:
            new_dtype = self.dtype
318
319
320
        temp_d2o = self.copy_empty(dtype=new_dtype)
        temp_d2o.set_local_data(data=self.get_local_data().__abs__(),
                                copy=True)
321
        return temp_d2o
322

Ultima's avatar
Ultima committed
323
324
325
326
327
    def _builtin_helper(self, operator, other, inplace=False):
        if isinstance(other, distributed_data_object):
            other_is_real = other.isreal()
        else:
            other_is_real = np.isreal(other)
328
329

        # Case 1: other is not a scalar
Ultimanet's avatar
Ultimanet committed
330
        if not (np.isscalar(other) or np.shape(other) == (1,)):
331
            try:
332
                hermitian_Q = (other.hermitian and self.hermitian)
333
334
            except(AttributeError):
                hermitian_Q = False
335
            # extract the local data from the 'other' object
Ultimanet's avatar
Ultimanet committed
336
337
            temp_data = self.distributor.extract_local_data(other)
            temp_data = operator(temp_data)
338
339

        # Case 2: other is a real scalar -> preserve hermitianity
Ultima's avatar
Ultima committed
340
341
        elif other_is_real or (self.dtype not in (np.dtype('complex128'),
                                                  np.dtype('complex256'))):
342
            hermitian_Q = self.hermitian
ultimanet's avatar
ultimanet committed
343
            temp_data = operator(other)
344
        # Case 3: other is complex
345
346
        else:
            hermitian_Q = False
347
348
            temp_data = operator(other)
        # write the new data into a new distributed_data_object
349
        if inplace is True:
350
351
            temp_d2o = self
        else:
352
            # use common datatype for self and other
353
            new_dtype = np.dtype(np.find_common_type((self.dtype,),
354
                                                     (temp_data.dtype,)))
355
            temp_d2o = self.copy_empty(
356
                dtype=new_dtype)
ultimanet's avatar
ultimanet committed
357
        temp_d2o.set_local_data(data=temp_data)
358
        temp_d2o.hermitian = hermitian_Q
ultimanet's avatar
ultimanet committed
359
        return temp_d2o
360

ultimanet's avatar
ultimanet committed
361
    def __add__(self, other):
Ultima's avatar
Ultima committed
362
        return self._builtin_helper(self.get_local_data().__add__, other)
ultimanet's avatar
ultimanet committed
363
364

    def __radd__(self, other):
Ultima's avatar
Ultima committed
365
        return self._builtin_helper(self.get_local_data().__radd__, other)
Ultimanet's avatar
Ultimanet committed
366
367

    def __iadd__(self, other):
368
        return self._builtin_helper(self.get_local_data().__iadd__,
369
370
                                    other,
                                    inplace=True)
Ultimanet's avatar
Ultimanet committed
371

ultimanet's avatar
ultimanet committed
372
    def __sub__(self, other):
Ultima's avatar
Ultima committed
373
        return self._builtin_helper(self.get_local_data().__sub__, other)
374

ultimanet's avatar
ultimanet committed
375
    def __rsub__(self, other):
Ultima's avatar
Ultima committed
376
        return self._builtin_helper(self.get_local_data().__rsub__, other)
377

ultimanet's avatar
ultimanet committed
378
    def __isub__(self, other):
379
        return self._builtin_helper(self.get_local_data().__isub__,
380
381
                                    other,
                                    inplace=True)
382

ultimanet's avatar
ultimanet committed
383
    def __div__(self, other):
Ultima's avatar
Ultima committed
384
        return self._builtin_helper(self.get_local_data().__div__, other)
385

386
387
    def __truediv__(self, other):
        return self.__div__(other)
388

ultimanet's avatar
ultimanet committed
389
    def __rdiv__(self, other):
Ultima's avatar
Ultima committed
390
        return self._builtin_helper(self.get_local_data().__rdiv__, other)
391

392
393
    def __rtruediv__(self, other):
        return self.__rdiv__(other)
ultimanet's avatar
ultimanet committed
394

Ultimanet's avatar
Ultimanet committed
395
    def __idiv__(self, other):
396
        return self._builtin_helper(self.get_local_data().__idiv__,
397
398
399
                                    other,
                                    inplace=True)

400
    def __itruediv__(self, other):
401
        return self.__idiv__(other)
402

ultimanet's avatar
ultimanet committed
403
    def __floordiv__(self, other):
404
        return self._builtin_helper(self.get_local_data().__floordiv__,
405
406
                                    other)

ultimanet's avatar
ultimanet committed
407
    def __rfloordiv__(self, other):
408
        return self._builtin_helper(self.get_local_data().__rfloordiv__,
409
410
                                    other)

Ultimanet's avatar
Ultimanet committed
411
    def __ifloordiv__(self, other):
Ultima's avatar
Ultima committed
412
        return self._builtin_helper(
413
414
            self.get_local_data().__ifloordiv__, other,
            inplace=True)
415

ultimanet's avatar
ultimanet committed
416
    def __mul__(self, other):
Ultima's avatar
Ultima committed
417
        return self._builtin_helper(self.get_local_data().__mul__, other)
418

ultimanet's avatar
ultimanet committed
419
    def __rmul__(self, other):
Ultima's avatar
Ultima committed
420
        return self._builtin_helper(self.get_local_data().__rmul__, other)
ultimanet's avatar
ultimanet committed
421
422

    def __imul__(self, other):
423
        return self._builtin_helper(self.get_local_data().__imul__,
424
425
                                    other,
                                    inplace=True)
Ultimanet's avatar
Ultimanet committed
426

ultimanet's avatar
ultimanet committed
427
    def __pow__(self, other):
Ultima's avatar
Ultima committed
428
        return self._builtin_helper(self.get_local_data().__pow__, other)
429

ultimanet's avatar
ultimanet committed
430
    def __rpow__(self, other):
Ultima's avatar
Ultima committed
431
        return self._builtin_helper(self.get_local_data().__rpow__, other)
ultimanet's avatar
ultimanet committed
432
433

    def __ipow__(self, other):
434
        return self._builtin_helper(self.get_local_data().__ipow__,
435
436
437
                                    other,
                                    inplace=True)

Ultima's avatar
Ultima committed
438
439
    def __mod__(self, other):
        return self._builtin_helper(self.get_local_data().__mod__, other)
440

Ultima's avatar
Ultima committed
441
    def __rmod__(self, other):
442
        return self._builtin_helper(self.get_local_data().__rmod__, other)
443

Ultima's avatar
Ultima committed
444
    def __imod__(self, other):
445
        return self._builtin_helper(self.get_local_data().__imod__,
446
447
448
                                    other,
                                    inplace=True)

449
450
    def __len__(self):
        return self.shape[0]
451

452
    def get_dim(self):
453
        return np.prod(self.shape)
454

455
    def vdot(self, other):
456
        other = self.distributor.extract_local_data(other)
457
458
459
460
        local_vdot = np.vdot(self.get_local_data(), other)
        local_vdot_list = self.distributor._allgather(local_vdot)
        global_vdot = np.sum(local_vdot_list)
        return global_vdot
Ultimanet's avatar
Ultimanet committed
461

ultimanet's avatar
ultimanet committed
462
    def __getitem__(self, key):
Ultima's avatar
Ultima committed
463
        return self.get_data(key)
464

ultimanet's avatar
ultimanet committed
465
466
    def __setitem__(self, key, data):
        self.set_data(data, key)
467

468
    def _contraction_helper(self, function, **kwargs):
Ultima's avatar
Ultima committed
469
470
471
472
473
474
475
        if np.prod(self.data.shape) == 0:
            local = 0
            include = False
        else:
            local = function(self.data, **kwargs)
            include = True

476
        local_list = self.distributor._allgather(local)
477
        local_list = np.array(local_list, dtype=np.dtype(local_list[0]))
Ultima's avatar
Ultima committed
478
479
480
        include_list = np.array(self.distributor._allgather(include))
        work_list = local_list[include_list]
        if work_list.shape[0] == 0:
481
            raise ValueError("ERROR: Zero-size array to reduction operation " +
Ultima's avatar
Ultima committed
482
                             "which has no identity")
483
        else:
Ultima's avatar
Ultima committed
484
485
            result = function(work_list, axis=0)
            return result
486

487
488
489
    def min(self, **kwargs):
        return self.amin(**kwargs)

490
    def amin(self, **kwargs):
491
        return self._contraction_helper(np.amin, **kwargs)
492
493

    def nanmin(self, **kwargs):
494
        return self._contraction_helper(np.nanmin, **kwargs)
495

496
497
498
    def max(self, **kwargs):
        return self.amax(**kwargs)

499
    def amax(self, **kwargs):
500
        return self._contraction_helper(np.amax, **kwargs)
501

502
    def nanmax(self, **kwargs):
503
        return self._contraction_helper(np.nanmax, **kwargs)
504

505
506
507
508
    def sum(self, **kwargs):
        return self._contraction_helper(np.sum, **kwargs)

    def prod(self, **kwargs):
509
510
        return self._contraction_helper(np.prod, **kwargs)

511
    def mean(self, power=1):
512
        # compute the local means and the weights for the mean-mean.
Ultima's avatar
Ultima committed
513
514
515
516
517
518
        if np.prod(self.data.shape) == 0:
            local_mean = 0
            include = False
        else:
            local_mean = np.mean(self.data**power)
            include = True
519

520
        local_weight = np.prod(self.data.shape)
521
        # collect the local means and cast the result to a ndarray
Ultima's avatar
Ultima committed
522
523
        local_mean_list = self.distributor._allgather(local_mean)
        local_weight_list = self.distributor._allgather(local_weight)
524

525
526
        local_mean_list = np.array(local_mean_list,
                                   dtype=np.dtype(local_mean_list[0]))
527
528
        local_weight_list = np.array(local_weight_list)
        # extract the parts from the non-empty nodes
Ultima's avatar
Ultima committed
529
530
531
532
533
        include_list = np.array(self.distributor._allgather(include))
        work_mean_list = local_mean_list[include_list]
        work_weight_list = local_weight_list[include_list]
        if work_mean_list.shape[0] == 0:
            raise ValueError("ERROR:  Mean of empty slice.")
534
535
        else:
            # compute the denominator for the weighted mean-mean
Ultima's avatar
Ultima committed
536
            global_weight = np.sum(work_weight_list)
537
            # compute the numerator
Ultima's avatar
Ultima committed
538
            numerator = np.sum(work_mean_list * work_weight_list)
539
            global_mean = numerator / global_weight
Ultima's avatar
Ultima committed
540
            return global_mean
541
542
543
544
545

    def var(self):
        mean_of_the_square = self.mean(power=2)
        square_of_the_mean = self.mean()**2
        return mean_of_the_square - square_of_the_mean
546

547
548
    def std(self):
        return np.sqrt(self.var())
549

Ultima's avatar
Ultima committed
550
551
552
553
554
555
556
    def argmin(self):
        if np.prod(self.data.shape) == 0:
            local_argmin = np.nan
            local_argmin_value = np.nan
            globalized_local_argmin = np.nan
        else:
            local_argmin = np.argmin(self.data)
557
            local_argmin_value = self.data[np.unravel_index(local_argmin,
Ultima's avatar
Ultima committed
558
                                                            self.data.shape)]
559

Ultima's avatar
Ultima committed
560
            globalized_local_argmin = self.distributor.globalize_flat_index(
561
562
563
564
                local_argmin)
        local_argmin_list = self.distributor._allgather(
                                                    (local_argmin_value,
                                                     globalized_local_argmin))
565
        local_argmin_list = np.array(local_argmin_list, dtype=[
566
567
            ('value', np.dtype('complex128')),
            ('index', np.dtype('float'))])
568
569
        local_argmin_list = np.sort(local_argmin_list,
                                    order=['value', 'index'])
Ultima's avatar
Ultima committed
570
        return np.int(local_argmin_list[0][1])
571

Ultima's avatar
Ultima committed
572
573
574
575
576
577
578
    def argmax(self):
        if np.prod(self.data.shape) == 0:
            local_argmax = np.nan
            local_argmax_value = np.nan
            globalized_local_argmax = np.nan
        else:
            local_argmax = np.argmax(self.data)
579
            local_argmax_value = -self.data[np.unravel_index(local_argmax,
580
                                                             self.data.shape)]
Ultima's avatar
Ultima committed
581
            globalized_local_argmax = self.distributor.globalize_flat_index(
582
583
584
585
                local_argmax)
        local_argmax_list = self.distributor._allgather(
                                                  (local_argmax_value,
                                                   globalized_local_argmax))
586
        local_argmax_list = np.array(local_argmax_list, dtype=[
587
588
            ('value', np.dtype('complex128')),
            ('index', np.dtype('float'))])
589
590
        local_argmax_list = np.sort(local_argmax_list,
                                    order=['value', 'index'])
Ultima's avatar
Ultima committed
591
        return np.int(local_argmax_list[0][1])
592

593
    def argmin_nonflat(self):
Ultima's avatar
Ultima committed
594
        return np.unravel_index(self.argmin(), self.shape)
595

Ultima's avatar
Ultima committed
596
597
    def argmax_nonflat(self):
        return np.unravel_index(self.argmax(), self.shape)
598

599
600
601
602
603
604
605
    def conjugate(self):
        temp_d2o = self.copy_empty()
        temp_data = np.conj(self.get_local_data())
        temp_d2o.set_local_data(temp_data)
        return temp_d2o

    def conj(self):
606
607
        return self.conjugate()

608
    def median(self):
609
        about.warnings.cprint(
610
611
612
            "WARNING: The current implementation of median is very expensive!")
        median = np.median(self.get_full_data())
        return median
613

614
    def _is_helper(self, function):
Ultima's avatar
Ultima committed
615
        temp_d2o = self.copy_empty(dtype=np.dtype('bool'))
616
        temp_d2o.set_local_data(function(self.data))
617
        return temp_d2o
618

619
620
621
    def iscomplex(self):
        return self._is_helper(np.iscomplex)

622
    def isreal(self):
623
624
625
626
627
628
629
630
631
632
633
634
635
636
        return self._is_helper(np.isreal)

    def isnan(self):
        return self._is_helper(np.isnan)

    def isinf(self):
        return self._is_helper(np.isinf)

    def isfinite(self):
        return self._is_helper(np.isfinite)

    def nan_to_num(self):
        temp_d2o = self.copy_empty()
        temp_d2o.set_local_data(np.nan_to_num(self.data))
637
        return temp_d2o
638

639
640
641
642
643
644
645
646
    def all(self):
        local_all = np.all(self.get_local_data())
        global_all = self.distributor._allgather(local_all)
        return np.all(global_all)

    def any(self):
        local_any = np.any(self.get_local_data())
        global_any = self.distributor._allgather(local_any)
647
        return np.any(global_any)
648

649
650
651
652
653
    def unique(self):
        local_unique = np.unique(self.get_local_data())
        global_unique = self.distributor._allgather(local_unique)
        global_unique = np.concatenate(global_unique)
        return np.unique(global_unique)
654

655
    def bincount(self, weights=None, minlength=None):
656
        if self.dtype not in [np.dtype('int16'), np.dtype('int32'),
657
658
                              np.dtype('int64'),  np.dtype('uint16'),
                              np.dtype('uint32'), np.dtype('uint64')]:
659
            raise TypeError(about._errors.cstring(
660
661
                "ERROR: Distributed-data-object must be of integer datatype!"))

662
        minlength = max(self.amax() + 1, minlength)
663

664
665
        if weights is not None:
            local_weights = self.distributor.extract_local_data(weights).\
666
                                flatten()
667
668
        else:
            local_weights = None
669

670
        local_counts = np.bincount(self.get_local_data().flatten(),
671
672
                                   weights=local_weights,
                                   minlength=minlength)
Ultima's avatar
Ultima committed
673
674
675
676
        if self.distribution_strategy == 'not':
            return local_counts
        else:
            list_of_counts = self.distributor._allgather(local_counts)
677
            counts = np.sum(list_of_counts, axis=0)
Ultima's avatar
Ultima committed
678
            return counts
679

Ultima's avatar
Ultima committed
680
681
    def where(self):
        return self.distributor.where(self.data)
682

683
    def set_local_data(self, data, hermitian=False, copy=True):
ultimanet's avatar
ultimanet committed
684
        """
685
            Stores data directly in the local data attribute. No distribution
ultimanet's avatar
ultimanet committed
686
687
688
689
690
            is done. The shape of the data must fit the local data attributes
            shape.

            Parameters
            ----------
691
            data : tuple, list, numpy.ndarray
ultimanet's avatar
ultimanet committed
692
                The data which should be stored in the local data attribute.
693

ultimanet's avatar
ultimanet committed
694
695
696
            Returns
            -------
            None
697

ultimanet's avatar
ultimanet committed
698
        """
Ultimanet's avatar
Ultimanet committed
699
        self.hermitian = hermitian
700
        if copy is True:
Ultima's avatar
Ultima committed
701
702
            self.data[:] = data
        else:
703
704
705
706
            self.data = np.array(data,
                                 dtype=self.dtype,
                                 copy=False,
                                 order='C').reshape(self.local_shape)
707

Ultima's avatar
Ultima committed
708
    def set_data(self, data, to_key, from_key=None, local_keys=False,
709
                 hermitian=False, copy=True, **kwargs):
ultimanet's avatar
ultimanet committed
710
        """
711
            Stores the supplied data in the region which is specified by key.
ultimanet's avatar
ultimanet committed
712
            The data is distributed according to the distribution strategy. If
713
            the individual nodes get different key-arguments. Their data is
ultimanet's avatar
ultimanet committed
714
            processed one-by-one.
715

ultimanet's avatar
ultimanet committed
716
717
            Parameters
            ----------
718
            data : tuple, list, numpy.ndarray
ultimanet's avatar
ultimanet committed
719
720
                The data which should be distributed.
            key : int, slice, tuple of int or slice
721
722
723
                The key is the object which specifies the region, where data
                will be stored in.

ultimanet's avatar
ultimanet committed
724
725
726
            Returns
            -------
            None
727

ultimanet's avatar
ultimanet committed
728
        """
Ultimanet's avatar
Ultimanet committed
729
        self.hermitian = hermitian
730
731
732
733
734
735
        self.distributor.disperse_data(data=self.data,
                                       to_key=to_key,
                                       data_update=data,
                                       from_key=from_key,
                                       local_keys=local_keys,
                                       copy=copy,
Ultima's avatar
Ultima committed
736
                                       **kwargs)
737

738
    def set_full_data(self, data, hermitian=False, copy=True, **kwargs):
ultimanet's avatar
ultimanet committed
739
        """
740
            Distributes the supplied data to the nodes. The shape of data must
ultimanet's avatar
ultimanet committed
741
            match the shape of the distributed_data_object.
742

ultimanet's avatar
ultimanet committed
743
744
            Parameters
            ----------
745
            data : tuple, list, numpy.ndarray
ultimanet's avatar
ultimanet committed
746
                The data which should be distributed.
747

ultimanet's avatar
ultimanet committed
748
749
            Notes
            -----
750
            set_full_data(foo) is equivalent to set_data(foo,slice(None)) but
ultimanet's avatar
ultimanet committed
751
            faster.
752

ultimanet's avatar
ultimanet committed
753
754
755
            Returns
            -------
            None
756

ultimanet's avatar
ultimanet committed
757
        """
Ultimanet's avatar
Ultimanet committed
758
        self.hermitian = hermitian
759
        self.data = self.distributor.distribute_data(data=data, copy=copy,
760
                                                     **kwargs)
ultimanet's avatar
ultimanet committed
761

Ultimanet's avatar
Ultimanet committed
762
    def get_local_data(self, key=(slice(None),), copy=True):
ultimanet's avatar
ultimanet committed
763
        """
764
765
            Loads data directly from the local data attribute. No consolidation
            is done.
ultimanet's avatar
ultimanet committed
766
767
768
769

            Parameters
            ----------
            key : int, slice, tuple of int or slice
770
771
                The key which will be used to access the data.

ultimanet's avatar
ultimanet committed
772
773
774
            Returns
            -------
            self.data[key] : numpy.ndarray
775

Ultimanet's avatar
Ultimanet committed
776
        """
777
        if copy is True:
778
            return self.data[key]
779
        if copy is False:
Ultimanet's avatar
Ultimanet committed
780
            return self.data
781

782
    def get_data(self, key, local_keys=False, **kwargs):
ultimanet's avatar
ultimanet committed
783
        """
784
785
            Loads data from the region which is specified by key. The data is
            consolidated according to the distribution strategy. If the
ultimanet's avatar
ultimanet committed
786
            individual nodes get different key-arguments, they get individual
787
788
            data.

ultimanet's avatar
ultimanet committed
789
790
            Parameters
            ----------
791

ultimanet's avatar
ultimanet committed
792
            key : int, slice, tuple of int or slice
793
794
795
                The key is the object which specifies the region, where data
                will be loaded from.

ultimanet's avatar
ultimanet committed
796
797
798
            Returns
            -------
            global_data[key] : numpy.ndarray
799

ultimanet's avatar
ultimanet committed
800
        """
Ultima's avatar
Ultima committed
801
802
803
804
805
806
807
808
809
810
811
812
        if key is None:
            return self.copy()
        elif isinstance(key, slice):
            if key == slice(None):
                return self.copy()
        elif isinstance(key, tuple):
            try:
                if all(x == slice(None) for x in key):
                    return self.copy()
            except(ValueError):
                pass

813
814
        return self.distributor.collect_data(self.data,
                                             key,
815
                                             local_keys=local_keys,
816
                                             **kwargs)
817

ultimanet's avatar
ultimanet committed
818
819
    def get_full_data(self, target_rank='all'):
        """
820
821
            Fully consolidates the distributed data.

ultimanet's avatar
ultimanet committed
822
823
824
            Parameters
            ----------
            target_rank : 'all' (default), int *optional*
825
                If only one node should recieve the full data, it can be
ultimanet's avatar
ultimanet committed
826
                specified here.
827

ultimanet's avatar
ultimanet committed
828
829
            Notes
            -----
830
            get_full_data() is equivalent to get_data(slice(None)) but
ultimanet's avatar
ultimanet committed
831
            faster.
832

ultimanet's avatar
ultimanet committed
833
834
835
836
837
            Returns
            -------
            None
        """

838
        return self.distributor.consolidate_data(self.data,
839
                                                 target_rank=target_rank)
ultimanet's avatar
ultimanet committed
840

841
    def inject(self, to_key=(slice(None),), data=None,
Ultima's avatar
Ultima committed
842
843
               from_key=(slice(None),)):
        if data is None:
Ultimanet's avatar
Ultimanet committed
844
            return self
Ultima's avatar
Ultima committed
845
        self.distributor.inject(self.data, to_key, data, from_key)
846

847
848
    def flatten(self, inplace=False):
        flat_data = self.distributor.flatten(self.data, inplace=inplace)
849

Ultima's avatar
Ultima committed
850
851
        flat_global_shape = (np.prod(self.shape),)
        flat_local_shape = np.shape(flat_data)
852
853
854

        # Try to keep the distribution strategy. Therefore
        # create an empty copy of self which has the new shape
855
856
        temp_d2o = self.copy_empty(global_shape=flat_global_shape,
                                   local_shape=flat_local_shape)
857
        # Check if the local shapes match.
Ultima's avatar
Ultima committed
858
859
        if temp_d2o.local_shape == flat_local_shape:
            work_d2o = temp_d2o
860