# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
#
# Copyright(C) 2013-2019 Max-Planck-Society
#
# NIFTy is being developed at the Max-Planck-Institut fuer Astrophysik.
from functools import reduce
import numpy as np
from . import dobj, utilities
from .domain_tuple import DomainTuple
class Field(object):
"""The discrete representation of a continuous field over multiple spaces.
Stores data arrays and carries all the needed metainformation (i.e. the
domain) for operators to be able to operate on them.
Parameters
----------
domain : DomainTuple
the domain of the new Field
val : data_object
This object's global shape must match the domain shape
After construction, the object will no longer be writeable!
Notes
-----
If possible, do not invoke the constructor directly, but use one of the
many convenience functions for instantiation!
"""
_scalar_dom = DomainTuple.scalar_domain()
def __init__(self, domain, val):
if not isinstance(domain, DomainTuple):
raise TypeError("domain must be of type DomainTuple")
if type(val) is not dobj.data_object:
if np.isscalar(val):
val = dobj.full(domain.shape, val)
else:
raise TypeError("val must be of type dobj.data_object")
if domain.shape != val.shape:
raise ValueError("shape mismatch between val and domain")
self._domain = domain
self._val = val
dobj.lock(self._val)
@staticmethod
def scalar(val):
return Field(Field._scalar_dom, val)
# prevent implicit conversion to bool
def __nonzero__(self):
raise TypeError("Field does not support implicit conversion to bool")
def __bool__(self):
raise TypeError("Field does not support implicit conversion to bool")
@staticmethod
def full(domain, val):
"""Creates a Field with a given domain, filled with a constant value.
Parameters
----------
domain : Domain, tuple of Domain, or DomainTuple
domain of the new Field
val : float/complex/int scalar
fill value. Data type of the field is inferred from val.
Returns
-------
Field
the newly created field
"""
if not np.isscalar(val):
raise TypeError("val must be a scalar")
if not (np.isreal(val) or np.iscomplex(val)):
raise TypeError("need arithmetic scalar")
domain = DomainTuple.make(domain)
return Field(domain, val)
@staticmethod
def from_global_data(domain, arr, sum_up=False):
"""Returns a Field constructed from `domain` and `arr`.
Parameters
----------
domain : DomainTuple, tuple of Domain, or Domain
the domain of the new Field
arr : numpy.ndarray
The data content to be used for the new Field.
Its shape must match the shape of `domain`.
sum_up : bool, optional
If True, the contents of `arr` are summed up over all MPI tasks
(if any), and the sum is used as data content.
If False, the contens of `arr` are used directly, and must be
identical on all MPI tasks.
"""
return Field(DomainTuple.make(domain),
dobj.from_global_data(arr, sum_up))
@staticmethod
def from_local_data(domain, arr):
return Field(DomainTuple.make(domain),
dobj.from_local_data(domain.shape, arr))
def to_global_data(self):
"""Returns an array containing the full data of the field.
Returns
-------
numpy.ndarray : array containing all field entries.
Its shape is identical to `self.shape`.
"""
return dobj.to_global_data(self._val)
def to_global_data_rw(self):
"""Returns a modifiable array containing the full data of the field.
Returns
-------
numpy.ndarray : array containing all field entries, which can be
modified. Its shape is identical to `self.shape`.
"""
return dobj.to_global_data_rw(self._val)
@property
def local_data(self):
"""numpy.ndarray : locally residing field data
Returns a handle to the part of the array data residing on the local
task (or to the entore array if MPI is not active).
"""
return dobj.local_data(self._val)
def cast_domain(self, new_domain):
"""Returns a field with the same data, but a different domain
Parameters
----------
new_domain : Domain, tuple of Domain, or DomainTuple
The domain for the returned field. Must be shape-compatible to
`self`.
Returns
-------
Field
Field defined on `new_domain`, but with the same data as `self`.
"""
return Field(DomainTuple.make(new_domain), self._val)
@staticmethod
def from_random(random_type, domain, dtype=np.float64, **kwargs):
"""Draws a random field with the given parameters.
Parameters
----------
random_type : 'pm1', 'normal', or 'uniform'
The random distribution to use.
domain : DomainTuple
The domain of the output random field
dtype : type
The datatype of the output random field
Returns
-------
Field
The newly created Field.
"""
domain = DomainTuple.make(domain)
return Field(domain=domain,
val=dobj.from_random(random_type, dtype=dtype,
shape=domain.shape, **kwargs))
@property
def val(self):
"""dobj.data_object : the data object storing the field's entries
Notes
-----
This property is intended for low-level, internal use only. Do not use
from outside of NIFTy's core; there should be better alternatives.
"""
return self._val
@property
def dtype(self):
"""type : the data type of the field's entries"""
return self._val.dtype
@property
def domain(self):
"""DomainTuple : the field's domain"""
return self._domain
@property
def shape(self):
"""tuple of int : the concatenated shapes of all sub-domains"""
return self._domain.shape
@property
def size(self):
"""int : total number of pixels in the field"""
return self._domain.size
@property
def real(self):
"""Field : The real part of the field"""
if utilities.iscomplextype(self.dtype):
return Field(self._domain, self._val.real)
return self
@property
def imag(self):
"""Field : The imaginary part of the field"""
if not utilities.iscomplextype(self.dtype):
raise ValueError(".imag called on a non-complex Field")
return Field(self._domain, self._val.imag)
def scalar_weight(self, spaces=None):
"""Returns the uniform volume element for a sub-domain of `self`.
Parameters
----------
spaces : int, tuple of int or None
indices of the sub-domains of the field's domain to be considered.
If `None`, the entire domain is used.
Returns
-------
float or None
if the requested sub-domain has a uniform volume element, it is
returned. Otherwise, `None` is returned.
"""
if np.isscalar(spaces):
return self._domain[spaces].scalar_dvol
if spaces is None:
spaces = range(len(self._domain))
res = 1.
for i in spaces:
tmp = self._domain[i].scalar_dvol
if tmp is None:
return None
res *= tmp
return res
def total_volume(self, spaces=None):
"""Returns the total volume of a sub-domain of `self`.
Parameters
----------
spaces : int, tuple of int or None
indices of the sub-domains of the field's domain to be considered.
If `None`, the entire domain is used.
Returns
-------
float
the total volume of the requested sub-domain.
"""
if np.isscalar(spaces):
return self._domain[spaces].total_volume
if spaces is None:
spaces = range(len(self._domain))
res = 1.
for i in spaces:
res *= self._domain[i].total_volume
return res
def weight(self, power=1, spaces=None):
"""Weights the pixels of `self` with their invidual pixel-volume.
Parameters
----------
power : number
The pixels get weighted with the volume-factor**power.
spaces : None, int or tuple of int
Determines on which sub-domain the operation takes place.
If None, the entire domain is used.
Returns
-------
Field
The weighted field.
"""
aout = self.local_data.copy()
spaces = utilities.parse_spaces(spaces, len(self._domain))
fct = 1.
for ind in spaces:
wgt = self._domain[ind].dvol
if np.isscalar(wgt):
fct *= wgt
else:
new_shape = np.ones(len(self.shape), dtype=np.int)
new_shape[self._domain.axes[ind][0]:
self._domain.axes[ind][-1]+1] = wgt.shape
wgt = wgt.reshape(new_shape)
if dobj.distaxis(self._val) >= 0 and ind == 0:
# we need to distribute the weights along axis 0
wgt = dobj.local_data(dobj.from_global_data(wgt))
aout *= wgt**power
fct = fct**power
if fct != 1.:
aout *= fct
return Field.from_local_data(self._domain, aout)
def outer(self, x):
"""Computes the outer product of 'self' with x.
Parameters
----------
x : Field
Returns
----------
Field, defined on the product space of self.domain and x.domain
"""
if not isinstance(x, Field):
raise TypeError("The multiplier must be an instance of " +
"the Field class")
from .operators.outer_product_operator import OuterProduct
return OuterProduct(self, x.domain)(x)
def vdot(self, x=None, spaces=None):
"""Computes the dot product of 'self' with x.
Parameters
----------
x : Field
x must be defined on the same domain as `self`.
spaces : None, int or tuple of int
The dot product is only carried out over the sub-domains in this
tuple. If None, it is carried out over all sub-domains.
Default: None.
Returns
-------
float, complex, either scalar (for full dot products) or Field (for partial dot products).
"""
if not isinstance(x, Field):
raise TypeError("The dot-partner must be an instance of " +
"the Field class")
if x._domain != self._domain:
raise ValueError("Domain mismatch")
ndom = len(self._domain)
spaces = utilities.parse_spaces(spaces, ndom)
if len(spaces) == ndom:
return dobj.vdot(self._val, x._val)
# If we arrive here, we have to do a partial dot product.
# For the moment, do this the explicit, non-optimized way
return (self.conjugate()*x).sum(spaces=spaces)
def norm(self, ord=2):
"""Computes the L2-norm of the field values.
Parameters
----------
ord : int
Accepted values: 1, 2, ..., np.inf. Default: 2.
Returns
-------
float
The L2-norm of the field values.
"""
return dobj.norm(self._val, ord)
def conjugate(self):
"""Returns the complex conjugate of the field.
Returns
-------
Field
The complex conjugated field.
"""
if utilities.iscomplextype(self._val.dtype):
return Field(self._domain, self._val.conjugate())
return self
# ---General unary/contraction methods---
def __pos__(self):
return self
def __neg__(self):
return Field(self._domain, -self._val)
def __abs__(self):
return Field(self._domain, abs(self._val))
def _contraction_helper(self, op, spaces):
if spaces is None:
return getattr(self._val, op)()
spaces = utilities.parse_spaces(spaces, len(self._domain))
axes_list = tuple(self._domain.axes[sp_index] for sp_index in spaces)
if len(axes_list) > 0:
axes_list = reduce(lambda x, y: x+y, axes_list)
# perform the contraction on the data
data = getattr(self._val, op)(axis=axes_list)
# check if the result is scalar or if a result_field must be constr.
if np.isscalar(data):
return data
else:
return_domain = tuple(dom
for i, dom in enumerate(self._domain)
if i not in spaces)
return Field(DomainTuple.make(return_domain), data)
def sum(self, spaces=None):
"""Sums up over the sub-domains given by `spaces`.
Parameters
----------
spaces : None, int or tuple of int
The summation is only carried out over the sub-domains in this
tuple. If None, it is carried out over all sub-domains.
Returns
-------
Field or scalar
The result of the summation. If it is carried out over the entire
domain, this is a scalar, otherwise a Field.
"""
return self._contraction_helper('sum', spaces)
def integrate(self, spaces=None):
"""Integrates over the sub-domains given by `spaces`.
Integration is performed by summing over `self` multiplied by its
volume factors.
Parameters
----------
spaces : None, int or tuple of int
The summation is only carried out over the sub-domains in this
tuple. If None, it is carried out over all sub-domains.
Returns
-------
Field or scalar
The result of the integration. If it is carried out over the
entire domain, this is a scalar, otherwise a Field.
"""
swgt = self.scalar_weight(spaces)
if swgt is not None:
res = self.sum(spaces)
res = res*swgt
return res
tmp = self.weight(1, spaces=spaces)
return tmp.sum(spaces)
def prod(self, spaces=None):
"""Computes the product over the sub-domains given by `spaces`.
Parameters
----------
spaces : None, int or tuple of int
The operation is only carried out over the sub-domains in this
tuple. If None, it is carried out over all sub-domains.
Default: None.
Returns
-------
Field or scalar
The result of the product. If it is carried out over the entire
domain, this is a scalar, otherwise a Field.
"""
return self._contraction_helper('prod', spaces)
def all(self, spaces=None):
return self._contraction_helper('all', spaces)
def any(self, spaces=None):
return self._contraction_helper('any', spaces)
# def min(self, spaces=None):
# """Determines the minimum over the sub-domains given by `spaces`.
#
# Parameters
# ----------
# spaces : None, int or tuple of int (default: None)
# The operation is only carried out over the sub-domains in this
# tuple. If None, it is carried out over all sub-domains.
#
# Returns
# -------
# Field or scalar
# The result of the operation. If it is carried out over the entire
# domain, this is a scalar, otherwise a Field.
# """
# return self._contraction_helper('min', spaces)
#
# def max(self, spaces=None):
# """Determines the maximum over the sub-domains given by `spaces`.
#
# Parameters
# ----------
# spaces : None, int or tuple of int (default: None)
# The operation is only carried out over the sub-domains in this
# tuple. If None, it is carried out over all sub-domains.
#
# Returns
# -------
# Field or scalar
# The result of the operation. If it is carried out over the entire
# domain, this is a scalar, otherwise a Field.
# """
# return self._contraction_helper('max', spaces)
def mean(self, spaces=None):
"""Determines the mean over the sub-domains given by `spaces`.
``x.mean(spaces)`` is equivalent to
``x.integrate(spaces)/x.total_volume(spaces)``.
Parameters
----------
spaces : None, int or tuple of int
The operation is only carried out over the sub-domains in this
tuple. If None, it is carried out over all sub-domains.
Returns
-------
Field or scalar
The result of the operation. If it is carried out over the entire
domain, this is a scalar, otherwise a Field.
"""
if self.scalar_weight(spaces) is not None:
return self._contraction_helper('mean', spaces)
# MR FIXME: not very efficient
# MR FIXME: do we need "spaces" here?
tmp = self.weight(1, spaces)
return tmp.sum(spaces)*(1./tmp.total_volume(spaces))
def var(self, spaces=None):
"""Determines the variance over the sub-domains given by `spaces`.
Parameters
----------
spaces : None, int or tuple of int
The operation is only carried out over the sub-domains in this
tuple. If None, it is carried out over all sub-domains.
Default: None.
Returns
-------
Field or scalar
The result of the operation. If it is carried out over the entire
domain, this is a scalar, otherwise a Field.
"""
if self.scalar_weight(spaces) is not None:
return self._contraction_helper('var', spaces)
# MR FIXME: not very efficient or accurate
m1 = self.mean(spaces)
if utilities.iscomplextype(self.dtype):
sq = abs(self-m1)**2
else:
sq = (self-m1)**2
return sq.mean(spaces)
def std(self, spaces=None):
"""Determines the standard deviation over the sub-domains given by
`spaces`.
``x.std(spaces)`` is equivalent to ``sqrt(x.var(spaces))``.
Parameters
----------
spaces : None, int or tuple of int
The operation is only carried out over the sub-domains in this
tuple. If None, it is carried out over all sub-domains.
Default: None.
Returns
-------
Field or scalar
The result of the operation. If it is carried out over the entire
domain, this is a scalar, otherwise a Field.
"""
from .sugar import sqrt
if self.scalar_weight(spaces) is not None:
return self._contraction_helper('std', spaces)
return sqrt(self.var(spaces))
def __repr__(self):
return ""
def __str__(self):
return "nifty5.Field instance\n- domain = " + \
self._domain.__str__() + \
"\n- val = " + repr(self._val)
def extract(self, dom):
if dom != self._domain:
raise ValueError("domain mismatch")
return self
def unite(self, other):
return self+other
def flexible_addsub(self, other, neg):
return self-other if neg else self+other
def sigmoid(self):
return 0.5*(1.+self.tanh())
def clip(self, min=None, max=None):
return Field(self._domain, dobj.clip(self._val, min, max))
def one_over(self):
return 1/self
def _binary_op(self, other, op):
# if other is a field, make sure that the domains match
f = getattr(self._val, op)
if isinstance(other, Field):
if other._domain != self._domain:
raise ValueError("domains are incompatible.")
return Field(self._domain, f(other._val))
if np.isscalar(other):
return Field(self._domain, f(other))
return NotImplemented
for op in ["__add__", "__radd__",
"__sub__", "__rsub__",
"__mul__", "__rmul__",
"__div__", "__rdiv__",
"__truediv__", "__rtruediv__",
"__floordiv__", "__rfloordiv__",
"__pow__", "__rpow__",
"__lt__", "__le__", "__gt__", "__ge__", "__eq__", "__ne__"]:
def func(op):
def func2(self, other):
return self._binary_op(other, op)
return func2
setattr(Field, op, func(op))
for op in ["__iadd__", "__isub__", "__imul__", "__idiv__",
"__itruediv__", "__ifloordiv__", "__ipow__"]:
def func(op):
def func2(self, other):
raise TypeError(
"In-place operations are deliberately not supported")
return func2
setattr(Field, op, func(op))
for f in ["sqrt", "exp", "log", "tanh",
"sin", "cos", "tan", "cosh", "sinh",
"absolute", "sinc", "sign"]:
def func(f):
def func2(self):
return Field(self._domain, getattr(dobj, f)(self.val))
return func2
setattr(Field, f, func(f))