Commit 6b18bbdf authored by Martin Reinecke's avatar Martin Reinecke
Browse files

replace InvertibleOperatorMixin with InversionEnabler; move...

replace InvertibleOperatorMixin with InversionEnabler; move PowerSpectrum-related functionality out of Field
parent 42b1e9e8
Pipeline #20029 passed with stage
in 4 minutes and 22 seconds
......@@ -19,13 +19,13 @@
from __future__ import division, print_function
from builtins import range
import numpy as np
from .spaces.power_space import PowerSpace
from . import nifty_utilities as utilities
from .random import Random
from .domain_tuple import DomainTuple
from functools import reduce
from . import dobj
class Field(object):
""" The discrete representation of a continuous field over multiple spaces.
......@@ -80,7 +80,8 @@ class Field(object):
raise ValueError("Domain mismatch")
self._val = dobj.from_object(val.val, dtype=dtype, copy=copy)
elif (np.isscalar(val)):
self._val = dobj.full(self.domain.shape, dtype=dtype, fill_value=val)
self._val = dobj.full(self.domain.shape, dtype=dtype,
fill_value=val)
elif isinstance(val, dobj.data_object):
if self.domain.shape == val.shape:
self._val = dobj.from_object(val, dtype=dtype, copy=copy)
......@@ -180,10 +181,6 @@ class Field(object):
-------
out : Field
The output object.
See Also
--------
power_synthesize
"""
domain = DomainTuple.make(domain)
......@@ -191,182 +188,6 @@ class Field(object):
return Field(domain=domain, val=generator_function(dtype=dtype,
shape=domain.shape, **kwargs))
# ---Powerspectral methods---
def power_analyze(self, spaces=None, binbounds=None,
keep_phase_information=False):
""" Computes the square root power spectrum for a subspace of `self`.
Creates a PowerSpace for the space addressed by `spaces` with the given
binning and computes the power spectrum as a Field over this
PowerSpace. This can only be done if the subspace to be analyzed is a
harmonic space. The resulting field has the same units as the initial
field, corresponding to the square root of the power spectrum.
Parameters
----------
spaces : int *optional*
The subspace for which the powerspectrum shall be computed.
(default : None).
binbounds : array-like *optional*
Inner bounds of the bins (default : None).
if binbounds==None : bins are inferred.
keep_phase_information : boolean, *optional*
If False, return a real-valued result containing the power spectrum
of the input Field.
If True, return a complex-valued result whose real component
contains the power spectrum computed from the real part of the
input Field, and whose imaginary component contains the power
spectrum computed from the imaginary part of the input Field.
The absolute value of this result should be identical to the output
of power_analyze with keep_phase_information=False.
(default : False).
Raise
-----
TypeError
Raised if any of the input field's domains is not harmonic
Returns
-------
out : Field
The output object. Its domain is a PowerSpace and it contains
the power spectrum of 'self's field.
See Also
--------
power_synthesize, PowerSpace
"""
# check if all spaces in `self.domain` are either harmonic or
# power_space instances
for sp in self.domain:
if not sp.harmonic and not isinstance(sp, PowerSpace):
print("WARNING: Field has a space in `domain` which is "
"neither harmonic nor a PowerSpace.")
# check if the `spaces` input is valid
if spaces is None:
spaces = range(len(self.domain))
else:
spaces = utilities.cast_iseq_to_tuple(spaces)
if len(spaces) == 0:
raise ValueError("No space for analysis specified.")
if keep_phase_information:
parts = [self.real*self.real, self.imag*self.imag]
else:
parts = [self.real*self.real + self.imag*self.imag]
parts = [ part.weight(1,spaces) for part in parts ]
for space_index in spaces:
parts = [self._single_power_analyze(field=part,
idx=space_index,
binbounds=binbounds)
for part in parts]
return parts[0] + 1j*parts[1] if keep_phase_information else parts[0]
@staticmethod
def _single_power_analyze(field, idx, binbounds):
from .operators.power_projection_operator import PowerProjectionOperator
power_domain = PowerSpace(field.domain[idx], binbounds)
ppo = PowerProjectionOperator(field.domain, power_domain, idx)
return ppo(field.weight(-1))
def _compute_spec(self, spaces):
from .operators.power_projection_operator import PowerProjectionOperator
from .basic_arithmetics import sqrt
if spaces is None:
spaces = range(len(self.domain))
else:
spaces = utilities.cast_iseq_to_tuple(spaces)
# create the result domain
result_domain = list(self.domain)
spec = sqrt(self)
for i in spaces:
result_domain[i] = self.domain[i].harmonic_partner
ppo = PowerProjectionOperator(result_domain, self.domain[i], i)
spec = ppo.adjoint_times(spec)
return spec
def power_synthesize(self, spaces=None, real_power=True, real_signal=True):
""" Yields a sampled field with `self`**2 as its power spectrum.
This method draws a Gaussian random field in the harmonic partner
domain of this field's domains, using this field as power spectrum.
Parameters
----------
spaces : {tuple, int, None} *optional*
Specifies the subspace containing all the PowerSpaces which
should be converted (default : None).
if spaces==None : Tries to convert the whole domain.
real_power : boolean *optional*
Determines whether the power spectrum is treated as intrinsically
real or complex (default : True).
real_signal : boolean *optional*
True will result in a purely real signal-space field
(default : True).
Returns
-------
out : Field
The output object. A random field created with the power spectrum
stored in the `spaces` in `self`.
Notes
-----
For this the spaces specified by `spaces` must be a PowerSpace.
This expects this field to be the square root of a power spectrum, i.e.
to have the unit of the field to be sampled.
See Also
--------
power_analyze
Raises
------
ValueError : If domain specified by `spaces` is not a PowerSpace.
"""
spec = self._compute_spec(spaces)
# create random samples: one or two, depending on whether the
# power spectrum is real or complex
result = [self.from_random('normal', mean=0., std=1.,
domain=spec.domain,
dtype=np.float if real_signal
else np.complex)
for x in range(1 if real_power else 2)]
# MR: dummy call - will be removed soon
if real_signal:
self.from_random('normal', mean=0., std=1.,
domain=spec.domain, dtype=np.float)
# apply the rescaler to the random fields
result[0] *= spec.real
if not real_power:
result[1] *= spec.imag
return result[0] if real_power else result[0] + 1j*result[1]
def power_synthesize_special(self, spaces=None):
spec = self._compute_spec(spaces)
# MR: dummy call - will be removed soon
self.from_random('normal', mean=0., std=1.,
domain=spec.domain, dtype=np.complex)
return spec.real
# ---Properties---
@property
......
from ...operators.endomorphic_operator import EndomorphicOperator
from ...operators.invertible_operator_mixin import InvertibleOperatorMixin
from ...operators.diagonal_operator import DiagonalOperator
class CriticalPowerCurvature(InvertibleOperatorMixin, EndomorphicOperator):
class CriticalPowerCurvature(EndomorphicOperator):
"""The curvature of the CriticalPowerEnergy.
This operator implements the second derivative of the
CriticalPowerEnergy used in some minimization algorithms or
for error estimates of the powerspectrum.
for error estimates of the power spectrum.
Parameters
......@@ -21,15 +20,14 @@ class CriticalPowerCurvature(InvertibleOperatorMixin, EndomorphicOperator):
# ---Overwritten properties and methods---
def __init__(self, theta, T, inverter, preconditioner=None, **kwargs):
def __init__(self, theta, T):
self.theta = DiagonalOperator(theta)
self.T = T
if preconditioner is None:
preconditioner = self.theta.inverse_times
super(CriticalPowerCurvature, self).__init__(
inverter=inverter,
preconditioner=preconditioner,
**kwargs)
super(CriticalPowerCurvature, self).__init__()
@property
def preconditioner(self):
return self.theta.inverse_times
def _times(self, x):
return self.T(x) + self.theta(x)
......
from ...energies.energy import Energy
from ...operators.smoothness_operator import SmoothnessOperator
from ...operators.inversion_enabler import InversionEnabler
from . import CriticalPowerCurvature
from ...memoization import memo
from ...sugar import generate_posterior_sample
from ...sugar import generate_posterior_sample, power_analyze
from ... import Field, exp
......@@ -95,8 +96,9 @@ class CriticalPowerEnergy(Energy):
@property
def curvature(self):
curvature = CriticalPowerCurvature(theta=self._theta.weight(-1),
T=self.T, inverter=self._inverter)
curvature = InversionEnabler(CriticalPowerCurvature(
theta=self._theta.weight(-1),
T=self.T), inverter=self._inverter)
return curvature
# ---Added properties and methods---
......@@ -119,7 +121,8 @@ class CriticalPowerEnergy(Energy):
# self.logger.info("Drawing sample %i" % i)
posterior_sample = generate_posterior_sample(
self.m, self.D)
projected_sample = posterior_sample.power_analyze(
projected_sample = power_analyze(
posterior_sample,
binbounds=self.position.domain[0].binbounds)
w += (projected_sample) * self.rho
w /= float(self.samples)
......
from ...operators import EndomorphicOperator,\
InvertibleOperatorMixin
from ...operators import EndomorphicOperator
from ...memoization import memo
from ...basic_arithmetics import exp
from ...sugar import create_composed_fft_operator
class LogNormalWienerFilterCurvature(InvertibleOperatorMixin,
EndomorphicOperator):
class LogNormalWienerFilterCurvature(EndomorphicOperator):
"""The curvature of the LogNormalWienerFilterEnergy.
This operator implements the second derivative of the
......@@ -24,7 +22,7 @@ class LogNormalWienerFilterCurvature(InvertibleOperatorMixin,
The prior signal covariance
"""
def __init__(self, R, N, S, d, position, inverter, fft4exp=None, **kwargs):
def __init__(self, R, N, S, d, position, fft4exp=None):
self.R = R
self.N = N
self.S = S
......@@ -37,9 +35,7 @@ class LogNormalWienerFilterCurvature(InvertibleOperatorMixin,
else:
self._fft = fft4exp
super(LogNormalWienerFilterCurvature, self).__init__(
inverter=inverter,
**kwargs)
super(LogNormalWienerFilterCurvature, self).__init__()
@property
def domain(self):
......
......@@ -2,7 +2,7 @@ from ...energies.energy import Energy
from ...memoization import memo
from . import LogNormalWienerFilterCurvature
from ...sugar import create_composed_fft_operator
from ...operators.inversion_enabler import InversionEnabler
class LogNormalWienerFilterEnergy(Energy):
"""The Energy for the log-normal Wiener filter.
......@@ -47,19 +47,20 @@ class LogNormalWienerFilterEnergy(Energy):
@memo
def value(self):
return 0.5*(self.position.vdot(self._Sp) +
self.curvature._Rexppd.vdot(self.curvature._NRexppd))
self.curvature.op._Rexppd.vdot(self.curvature.op._NRexppd))
@property
@memo
def gradient(self):
return self._Sp + self.curvature._exppRNRexppd
return self._Sp + self.curvature.op._exppRNRexppd
@property
@memo
def curvature(self):
return LogNormalWienerFilterCurvature(R=self.R, N=self.N, S=self.S,
return InversionEnabler(
LogNormalWienerFilterCurvature(R=self.R, N=self.N, S=self.S,
d=self.d, position=self.position,
fft4exp=self._fft,
fft4exp=self._fft),
inverter=self._inverter)
@property
......
from ...operators import EndomorphicOperator,\
InvertibleOperatorMixin
from ...operators import EndomorphicOperator
class WienerFilterCurvature(InvertibleOperatorMixin, EndomorphicOperator):
class WienerFilterCurvature(EndomorphicOperator):
"""The curvature of the WienerFilterEnergy.
This operator implements the second derivative of the
......@@ -20,16 +19,15 @@ class WienerFilterCurvature(InvertibleOperatorMixin, EndomorphicOperator):
The prior signal covariance
"""
def __init__(self, R, N, S, inverter, preconditioner=None, **kwargs):
def __init__(self, R, N, S):
self.R = R
self.N = N
self.S = S
if preconditioner is None:
preconditioner = self.S.times
super(WienerFilterCurvature, self).__init__(
inverter=inverter,
preconditioner=preconditioner,
**kwargs)
super(WienerFilterCurvature, self).__init__()
@property
def preconditioner(self):
return self.S.times
@property
def domain(self):
......
from ...energies.energy import Energy
from ...memoization import memo
from ...operators.inversion_enabler import InversionEnabler
from . import WienerFilterCurvature
......@@ -49,7 +50,8 @@ class WienerFilterEnergy(Energy):
@property
@memo
def curvature(self):
return WienerFilterCurvature(R=self.R, N=self.N, S=self.S,
return InversionEnabler(WienerFilterCurvature(R=self.R, N=self.N,
S=self.S),
inverter=self._inverter)
@property
......
......@@ -11,7 +11,7 @@ from .direct_smoothing_operator import DirectSmoothingOperator
from .fft_operator import FFTOperator
from .invertible_operator_mixin import InvertibleOperatorMixin
from .inversion_enabler import InversionEnabler
from .composed_operator import ComposedOperator
......
......@@ -16,60 +16,82 @@
# NIFTy is being developed at the Max-Planck-Institut fuer Astrophysik
# and financially supported by the Studienstiftung des deutschen Volkes.
from builtins import object
from ..energies import QuadraticEnergy
from ..field import Field
from .linear_operator import LinearOperator
class InvertibleOperatorMixin(object):
""" Mixin class to invert implicit defined operators.
class InversionEnabler(LinearOperator):
This class provides the functionality necessary to invert the application
of a given implicitly defined operator on a field. Inheriting
functionality from this class provides the derived class with the
operations inverse to the defined operator applications
(e.g. .inverse_times if .times is defined and
.adjoint_times if .adjoint_inverse_times is defined)
def __init__(self, op, inverter, preconditioner=None):
self._op = op
self._inverter = inverter
if preconditioner is None and hasattr(op, "preconditioner"):
self._preconditioner = op.preconditioner
else:
self._preconditioner = preconditioner
super(InversionEnabler, self).__init__()
Parameters
----------
inverter : Inverter
An instance of an Inverter class.
"""
@property
def domain(self):
return self._op.domain
def __init__(self, inverter, preconditioner=None, *args, **kwargs):
self.__inverter = inverter
self._preconditioner = preconditioner
super(InvertibleOperatorMixin, self).__init__(*args, **kwargs)
@property
def target(self):
return self._op.target
@property
def unitary(self):
return self._op.unitary
@property
def op(self):
return self._op
def _times(self, x):
try:
res = self._op._times(x)
except NotImplementedError:
x0 = Field.zeros(self.target, dtype=x.dtype)
(result, convergence) = self.__inverter(QuadraticEnergy(
A=self.inverse_times,
(result, convergence) = self._inverter(QuadraticEnergy(
A=self._op.inverse_times,
b=x, position=x0),
preconditioner=self._preconditioner)
return result.position
res = result.position
return res
def _adjoint_times(self, x):
try:
res = self._op._adjoint_times(x)
except NotImplementedError:
x0 = Field.zeros(self.domain, dtype=x.dtype)
(result, convergence) = self.__inverter(QuadraticEnergy(
(result, convergence) = self._inverter(QuadraticEnergy(
A=self.adjoint_inverse_times,
b=x, position=x0),
preconditioner=self._preconditioner)
return result.position
res = result.position
return res
def _inverse_times(self, x):
try:
res = self._op._inverse_times(x)
except NotImplementedError:
x0 = Field.zeros(self.domain, dtype=x.dtype)
(result, convergence) = self.__inverter(QuadraticEnergy(
(result, convergence) = self._inverter(QuadraticEnergy(
A=self.times,
b=x, position=x0),
preconditioner=self._preconditioner)
return result.position
res = result.position
return res
def _adjoint_inverse_times(self, x):
try:
res = self._op._adjoint_inverse_times(x)
except NotImplementedError:
x0 = Field.zeros(self.target, dtype=x.dtype)
(result, convergence) = self.__inverter(QuadraticEnergy(
(result, convergence) = self._inverter(QuadraticEnergy(
A=self.adjoint_times,
b=x, position=x0),
preconditioner=self._preconditioner)
return result.position
res = result.position
return res
......@@ -20,7 +20,6 @@ from builtins import str
import abc
from ..nifty_meta import NiftyMeta
from ..field import Field
from .. import nifty_utilities as utilities
from future.utils import with_metaclass
......@@ -110,8 +109,8 @@ class LinearOperator(with_metaclass(
self._check_input_compatibility(x, inverse=True)
try:
y = self._inverse_times(x)
except(NotImplementedError):
if (self.unitary):
except NotImplementedError:
if self.unitary:
y = self._adjoint_times(x)
else:
raise
......@@ -136,8 +135,8 @@ class LinearOperator(with_metaclass(
self._check_input_compatibility(x, inverse=True)
try:
y = self._adjoint_times(x)
except(NotImplementedError):
if (self.unitary):
except NotImplementedError:
if self.unitary:
y = self._inverse_times(x)
else:
raise
......@@ -164,7 +163,7 @@ class LinearOperator(with_metaclass(
self._check_input_compatibility(x)
try:
y = self._adjoint_inverse_times(x)
except(NotImplementedError):
except NotImplementedError:
if self.unitary:
y = self._times(x)
else:
......
......@@ -25,13 +25,190 @@ from . import Space,\
DiagonalOperator,\
FFTOperator,\
sqrt
from . import nifty_utilities as utilities
__all__ = ['create_power_field',
__all__ = ['power_analyze',
'power_synthesize',
'power_synthesize_special',
'create_power_field',
'create_power_operator',
'generate_posterior_sample',
'create_composed_fft_operator']
def _single_power_analyze(field, idx, binbounds):
from .operators.power_projection_operator import PowerProjectionOperator
power_domain = PowerSpace(field.domain[idx], binbounds)
ppo = PowerProjectionOperator(field.domain, power_domain, idx)
return ppo(field.weight(-1))
def power_analyze(field, spaces=None, binbounds=None,
keep_phase_information=False):
""" Computes the square root power spectrum for a subspace of `field`.
Creates a PowerSpace for the space addressed by `spaces` with the given
binning and computes the power spectrum as a Field over this
PowerSpace. This can only be done if the subspace to be analyzed is a
harmonic space. The resulting field has the same units as the initial
field, corresponding to the square root of the power spectrum.
Parameters
----------
field : Field
The field to be analyzed
spaces : int *optional*
The subspace for which the powerspectrum shall be computed.