Commit d3fe4057 authored by Theo Steininger's avatar Theo Steininger

Rearranged operators from library. Refactored wiener_filter operators.

parent ab2a8289
Pipeline #14550 failed with stage
in 4 minutes and 48 seconds
from operator_library import *
from energy_library import *
from critical_filter import *
from wiener_filter import *
from wiener_filter_curvature import WienerFilterCurvature
# -*- coding: utf-8 -*-
from critical_power_curvature import CriticalPowerCurvature
from smoothness_operator import SmoothnessOperator
\ No newline at end of file
from critical_power_energy import CriticalPowerEnergy
......@@ -5,6 +5,7 @@ from nifty.library.operator_library import CriticalPowerCurvature,\
from nifty.sugar import generate_posterior_sample
from nifty import Field, exp
class CriticalPowerEnergy(Energy):
"""The Energy of the power spectrum according to the critical filter.
......
# -*- coding: utf-8 -*-
from wiener_filter_curvature import WienerFilterCurvature
from wiener_filter_energy import WienerFilterEnergy
from critical_power_energy import CriticalPowerEnergy
\ No newline at end of file
......@@ -30,8 +30,10 @@ class WienerFilterCurvature(InvertibleOperatorMixin, EndomorphicOperator):
if preconditioner is None:
preconditioner = self.S.times
self._domain = self.S.domain
super(WienerFilterCurvature, self).__init__(inverter=inverter,
super(WienerFilterCurvature, self).__init__(
inverter=inverter,
preconditioner=preconditioner)
@property
def domain(self):
return self._domain
......@@ -47,5 +49,5 @@ class WienerFilterCurvature(InvertibleOperatorMixin, EndomorphicOperator):
# ---Added properties and methods---
def _times(self, x, spaces):
return self.R.adjoint_times(self.N.inverse_times(self.R(x)))\
+ self.S.inverse_times(x)
return (self.R.adjoint_times(self.N.inverse_times(self.R(x))) +
self.S.inverse_times(x))
......@@ -2,10 +2,11 @@ from nifty.energies.energy import Energy
from nifty.energies.memoization import memo
from nifty.library.operator_library import WienerFilterCurvature
class WienerFilterEnergy(Energy):
"""The Energy for the Wiener filter.
It describes the situation of linear measurement with
It covers the case of linear measurement with
Gaussian noise and Gaussain signal prior with known covariance.
Parameters
......@@ -23,7 +24,7 @@ class WienerFilterEnergy(Energy):
"""
def __init__(self, position, d, R, N, S, inverter=None):
super(WienerFilterEnergy, self).__init__(position = position)
super(WienerFilterEnergy, self).__init__(position=position)
self.d = d
self.R = R
self.N = N
......@@ -31,30 +32,31 @@ class WienerFilterEnergy(Energy):
self.inverter = inverter
def at(self, position):
return self.__class__(position, self.d, self.R, self.N, self.S)
return self.__class__(position=position, d=self.d, R=self.R, N=self.N,
S=self.S, inverter=self.inverter)
@property
@memo
def value(self):
residual = self._residual()
energy = 0.5 * self.position.vdot(self.S.inverse_times(self.position))
energy += 0.5 * (residual).vdot(self.N.inverse_times(residual))
return energy.real
return 0.5*self.position.vdot(self._Dx) - self._j.vdot(self.position)
@property
@memo
def gradient(self):
residual = self._residual()
gradient = self.S.inverse_times(self.position)
gradient -= self.R.adjoint_times(
self.N.inverse_times(residual))
return gradient
return self._Dx - self._j
@property
@memo
def curvature(self):
curvature = WienerFilterCurvature(R=self.R, N=self.N, S=self.S, inverter=self.inverter)
return curvature
return WienerFilterCurvature(R=self.R, N=self.N, S=self.S,
inverter=self.inverter)
@property
@memo
def _residual(self):
residual = self.d - self.R(self.position)
return residual
def _Dx(self):
return self.curvature(self.position)
@property
@memo
def _j(self):
return self.R.adjoint_times(self.N.inverse_times(self.d))
......@@ -37,3 +37,5 @@ from composed_operator import ComposedOperator
from response_operator import ResponseOperator
from laplace_operator import LaplaceOperator
from smoothness_operator import SmoothnessOperator
......@@ -17,9 +17,10 @@
# and financially supported by the Studienstiftung des deutschen Volkes.
import numpy as np
from nifty import Field,\
EndomorphicOperator,\
PowerSpace
from nifty.field import Field
from nifty.spaces.power_space import PowerSpace
from nifty.operators.endomorphic_operator import EndomorphicOperator
import nifty.nifty_utilities as utilities
......
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Copyright(C) 2013-2017 Max-Planck-Society
#
# NIFTy is being developed at the Max-Planck-Institut fuer Astrophysik
# and financially supported by the Studienstiftung des deutschen Volkes.
from propagator_operator import PropagatorOperator
from harmonic_propagator_operator import HarmonicPropagatorOperator
\ No newline at end of file
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Copyright(C) 2013-2017 Max-Planck-Society
#
# NIFTy is being developed at the Max-Planck-Institut fuer Astrophysik
# and financially supported by the Studienstiftung des deutschen Volkes.
from nifty.operators import EndomorphicOperator,\
FFTOperator,\
InvertibleOperatorMixin
class HarmonicPropagatorOperator(InvertibleOperatorMixin, EndomorphicOperator):
""" NIFTY Harmonic Propagator Operator D.
The propagator operator D, is known from the Wiener Filter.
Its inverse functional form might look like:
D = (S^(-1) + M)^(-1)
D = (S^(-1) + N^(-1))^(-1)
D = (S^(-1) + R^(\dagger) N^(-1) R)^(-1)
In contrast to the PropagatorOperator the inference is done in the
harmonic space.
Parameters
----------
S : LinearOperator
Covariance of the signal prior.
M : LinearOperator
Likelihood contribution.
R : LinearOperator
Response operator translating signal to (noiseless) data.
N : LinearOperator
Covariance of the noise prior or the likelihood, respectively.
inverter : class to invert explicitly defined operators
(default:ConjugateGradient)
preconditioner : Field
numerical preconditioner to speed up convergence
default_spaces : tuple of ints *optional*
Defines on which space(s) of a given field the Operator acts by
default (default: None)
Attributes
----------
domain : tuple of DomainObjects, i.e. Spaces and FieldTypes
The domain on which the Operator's input Field lives.
target : tuple of DomainObjects, i.e. Spaces and FieldTypes
The domain in which the outcome of the operator lives. As the Operator
is endomorphic this is the same as its domain.
unitary : boolean
Indicates whether the Operator is unitary or not.
self_adjoint : boolean
Indicates whether the operator is self_adjoint or not.
Raises
------
ValueError
is raised if
* neither N nor M is given
Notes
-----
Examples
--------
See Also
--------
Scientific reference
https://arxiv.org/abs/0806.3474
"""
# ---Overwritten properties and methods---
def __init__(self, S, M=None, R=None, N=None, inverter=None,
preconditioner=None):
"""
Sets the standard operator properties and `codomain`, `_A1`, `_A2`,
and `RN` if required.
Parameters
----------
S : operator
Covariance of the signal prior.
M : operator
Likelihood contribution.
R : operator
Response operator translating signal to (noiseless) data.
N : operator
Covariance of the noise prior or the likelihood, respectively.
"""
# infer domain, and target
if M is not None:
self._codomain = M.domain
self._likelihood = M.times
elif N is None:
raise ValueError("Either M or N must be given!")
elif R is not None:
self._codomain = R.domain
self._likelihood = \
lambda z: R.adjoint_times(N.inverse_times(R.times(z)))
else:
self._codomain = N.domain
self._likelihood = lambda z: N.inverse_times(z)
self._domain = S.domain
self._S = S
self._fft_S = FFTOperator(self._domain, target=self._codomain)
super(HarmonicPropagatorOperator, self).__init__(inverter=inverter,
preconditioner=preconditioner)
# ---Mandatory properties and methods---
@property
def domain(self):
return self._domain
@property
def self_adjoint(self):
return True
@property
def unitary(self):
return False
# ---Added properties and methods---
def _likelihood_times(self, x, spaces=None):
transformed_x = self._fft_S.times(x, spaces=spaces)
y = self._likelihood(transformed_x)
transformed_y = self._fft_S.adjoint_times(y, spaces=spaces)
result = x.copy_empty()
result.set_val(transformed_y, copy=False)
return result
def _inverse_times(self, x, spaces):
pre_result = self._S.inverse_times(x, spaces)
pre_result += self._likelihood_times(x)
result = x.copy_empty()
result.set_val(pre_result, copy=False)
return result
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Copyright(C) 2013-2017 Max-Planck-Society
#
# NIFTy is being developed at the Max-Planck-Institut fuer Astrophysik
# and financially supported by the Studienstiftung des deutschen Volkes.
from nifty.operators import EndomorphicOperator,\
FFTOperator,\
InvertibleOperatorMixin
class PropagatorOperator(InvertibleOperatorMixin, EndomorphicOperator):
""" NIFTY Propagator Operator D.
The propagator operator D, is known from the Wiener Filter.
Its inverse functional form might look like:
D = (S^(-1) + M)^(-1)
D = (S^(-1) + N^(-1))^(-1)
D = (S^(-1) + R^(\dagger) N^(-1) R)^(-1)
Parameters
----------
S : LinearOperator
Covariance of the signal prior.
M : LinearOperator
Likelihood contribution.
R : LinearOperator
Response operator translating signal to (noiseless) data.
N : LinearOperator
Covariance of the noise prior or the likelihood, respectively.
inverter : class to invert explicitly defined operators
(default:ConjugateGradient)
preconditioner : Field
numerical preconditioner to speed up convergence
default_spaces : tuple of ints *optional*
Defines on which space(s) of a given field the Operator acts by
default (default: None)
Attributes
----------
domain : tuple of DomainObjects, i.e. Spaces and FieldTypes
The domain on which the Operator's input Field lives.
target : tuple of DomainObjects, i.e. Spaces and FieldTypes
The domain in which the outcome of the operator lives. As the Operator
is endomorphic this is the same as its domain.
unitary : boolean
Indicates whether the Operator is unitary or not.
self_adjoint : boolean
Indicates whether the operator is self_adjoint or not.
Raises
------
ValueError
is raised if
* neither N nor M is given
Notes
-----
Examples
--------
>>> x_space = RGSpace(4)
>>> k_space = RGRGTransformation.get_codomain(x_space)
>>> f = Field(x_space, val=[2, 4, 6, 8])
>>> S = create_power_operator(k_space, spec=1)
>>> N = DiagonalOperaor(f.domain, diag=1)
>>> D = PropagatorOperator(S=S, N=N) # D^{-1} = S^{-1} + N^{-1}
>>> D(f).val
<distributed_data_object>
array([ 1., 2., 3., 4.]
See Also
--------
Scientific reference
https://arxiv.org/abs/0806.3474
"""
# ---Overwritten properties and methods---
def __init__(self, S=None, M=None, R=None, N=None, inverter=None,
preconditioner=None, default_spaces=None):
"""
Sets the standard operator properties and `codomain`, `_A1`, `_A2`,
and `RN` if required.
Parameters
----------
S : operator
Covariance of the signal prior.
M : operator
Likelihood contribution.
R : operator
Response operator translating signal to (noiseless) data.
N : operator
Covariance of the noise prior or the likelihood, respectively.
"""
# infer domain, and target
if M is not None:
self._domain = M.domain
self._likelihood_times = M.times
elif N is None:
raise ValueError("Either M or N must be given!")
elif R is not None:
self._domain = R.domain
self._likelihood_times = \
lambda z: R.adjoint_times(N.inverse_times(R.times(z)))
else:
self._domain = N.domain
self._likelihood_times = lambda z: N.inverse_times(z)
self._S = S
self._fft_S = FFTOperator(self._domain, target=self._S.domain)
if preconditioner is None:
preconditioner = self._S_times
super(PropagatorOperator, self).__init__(inverter=inverter,
preconditioner=preconditioner,
default_spaces=default_spaces)
# ---Mandatory properties and methods---
@property
def domain(self):
return self._domain
@property
def self_adjoint(self):
return True
@property
def unitary(self):
return False
# ---Added properties and methods---
def _S_times(self, x, spaces=None):
transformed_x = self._fft_S(x, spaces=spaces)
y = self._S(transformed_x, spaces=spaces)
transformed_y = self._fft_S.inverse_times(y, spaces=spaces)
result = x.copy_empty()
result.set_val(transformed_y, copy=False)
return result
def _S_inverse_times(self, x, spaces=None):
transformed_x = self._fft_S(x, spaces=spaces)
y = self._S.inverse_times(transformed_x, spaces=spaces)
transformed_y = self._fft_S.inverse_times(y, spaces=spaces)
result = x.copy_empty()
result.set_val(transformed_y, copy=False)
return result
def _inverse_times(self, x, spaces):
pre_result = self._S_inverse_times(x, spaces)
pre_result += self._likelihood_times(x)
result = x.copy_empty()
result.set_val(pre_result, copy=False)
return result
# -*- coding: utf-8 -*-
from smoothness_operator import SmoothnessOperator
from nifty import EndomorphicOperator,\
PowerSpace
import nifty.nifty_utilities as utilities
from laplace_operator import LaplaceOperator
from nifty.spaces.power_space import PowerSpace
from nifty.operators.endomorphic_operator import EndomorphicOperator
from nifty.operators.laplace_operator import LaplaceOperator
class SmoothnessOperator(EndomorphicOperator):
......@@ -25,28 +24,28 @@ class SmoothnessOperator(EndomorphicOperator):
default : True
"""
def __init__(self, domain, sigma ,logarithmic = True,
default_spaces=None):
# ---Overwritten properties and methods---
super(SmoothnessOperator, self).__init__(default_spaces)
def __init__(self, domain, sigma, logarithmic=True, default_spaces=None):
if (not isinstance(domain, PowerSpace)):
raise TypeError("The domain has to live over a PowerSpace")
super(SmoothnessOperator, self).__init__(default_spaces=default_spaces)
self._domain = self._parse_domain(domain)
if len(self.domain) != 0:
raise ValueError("The domain must contain exactly one PowerSpace.")
if (sigma <= 0):
if not isinstance(self.domain[0], PowerSpace):
raise TypeError("The domain must contain exactly one PowerSpace.")
if sigma <= 0:
raise ValueError("ERROR: invalid sigma.")
self._sigma = sigma
self._Laplace = LaplaceOperator(domain=domain, logarithmic=logarithmic)
self._laplace = LaplaceOperator(domain=self.domain,
logarithmic=logarithmic)
@property
def sigma(self):
return self._sigma
# ---Mandatory properties and methods---
@property
def target(self):
......@@ -69,6 +68,11 @@ class SmoothnessOperator(EndomorphicOperator):
return False
def _times(self, x, spaces):
res = self._Laplace.adjoint_times(self._Laplace.times(x,spaces), spaces)
res = self._aplace.adjoint_times(self._laplace(x, spaces), spaces)
return (1./self.sigma)**2*res
# ---Added properties and methods---
@property
def sigma(self):
return self._sigma
......@@ -24,7 +24,6 @@ from nifty.minimization.conjugate_gradient import ConjugateGradient
__all__ = ['create_power_operator']
def create_power_operator(domain, power_spectrum, dtype=None,
distribution_strategy='not'):
""" Creates a diagonal operator with the given power spectrum.
......@@ -54,11 +53,9 @@ def create_power_operator(domain, power_spectrum, dtype=None,
if isinstance(power_spectrum, Field):
power_domain = power_spectrum.domain
else :
else:
power_domain = PowerSpace(domain,
distribution_strategy=distribution_strategy)
distribution_strategy=distribution_strategy)
fp = Field(power_domain, val=power_spectrum, dtype=dtype,
distribution_strategy=distribution_strategy)
......@@ -66,7 +63,8 @@ def create_power_operator(domain, power_spectrum, dtype=None,
f **= 2
return DiagonalOperator(domain, diagonal=f, bare=True)
def generate_posterior_sample(mean, covariance, inverter = None):
def generate_posterior_sample(mean, covariance, inverter=None):
""" Generates a posterior sample from a Gaussian distribution with given mean and covariance
This method generates samples by setting up the observation and reconstruction of a mock signal
......@@ -99,15 +97,13 @@ def generate_posterior_sample(mean, covariance, inverter = None):
power = S.diagonal().power_analyze()**.5
mock_signal = power.power_synthesize(real_signal=True)
noise = N.diagonal(bare=True).val
mock_noise = Field.from_random(random_type="normal", domain=N.domain,
std = sqrt(noise), dtype = noise.dtype)
std=sqrt(noise), dtype=noise.dtype)
mock_data = R(mock_signal) + mock_noise
mock_j = R.adjoint_times(N.inverse_times(mock_data))
mock_m = covariance.inverse_times(mock_j)
sample = mock_signal - mock_m + mean
return sample
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment