Commit fdf2bec6 authored by Reimar H Leike's avatar Reimar H Leike

rewrote NonlinearWienerFilterEnergy and adjusted tests accordingly. Deleted...

rewrote NonlinearWienerFilterEnergy and adjusted tests accordingly. Deleted NoiseEnergy and PowerEnergy as they are obsolete
parent 67e96903
from .los_response import LOSResponse
from .noise_energy import NoiseEnergy
from .nonlinear_power_energy import NonlinearPowerEnergy
from .nonlinear_wiener_filter_energy import NonlinearWienerFilterEnergy
from .nonlinearities import Exponential, Linear, PositiveTanh, Tanh
from .poisson_energy import PoissonEnergy
......@@ -8,3 +6,4 @@ from .unit_log_gauss import UnitLogGauss
from .poisson_log_likelihood import PoissonLogLikelihood
from .wiener_filter_curvature import WienerFilterCurvature
from .wiener_filter_energy import WienerFilterEnergy
from .hamiltonian import Hamiltonian
from ..minimization.energy import Energy
from ..operators import InversionEnabler, SamplingEnabler
from ..models.variable import Variable
from ..utilities import memo
from .unit_log_gauss import UnitLogGauss
class Hamiltonian(Energy):
def __init__(self, lh, iteration_controller,
iteration_controller_sampling=None):
"""
lh: Likelihood (energy object)
prior:
"""
super(Hamiltonian, self).__init__(lh.position)
self._lh = lh
self._ic = iteration_controller
if iteration_controller_sampling is None:
self._ic_samp = iteration_controller
else:
self._ic_samp = iteration_controller_sampling
self._prior = UnitLogGauss(Variable(self.position))
self._precond = self._prior.curvature
def at(self, position):
return self.__class__(self._lh.at(position), self._ic, self._ic_samp)
@property
@memo
def value(self):
return self._lh.value + self._prior.value
@property
@memo
def gradient(self):
return self._lh.gradient + self._prior.gradient
@property
@memo
def curvature(self):
prior_curv = self._prior.curvature
c = SamplingEnabler(self._lh.curvature, prior_curv.inverse,
self._ic_samp, prior_curv.inverse)
return InversionEnabler(c, self._ic, self._precond)
def __str__(self):
res = 'Likelihood:\t{:.2E}\n'.format(self._lh.value)
res += 'Prior:\t\t{:.2E}'.format(self._prior.value)
return res
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Copyright(C) 2013-2018 Max-Planck-Society
#
# NIFTy is being developed at the Max-Planck-Institut fuer Astrophysik
# and financially supported by the Studienstiftung des deutschen Volkes.
from ..sugar import exp
from ..minimization.energy import Energy
from ..operators.diagonal_operator import DiagonalOperator
import numpy as np
class NoiseEnergy(Energy):
def __init__(self, position, alpha, q, res_sample_list):
super(NoiseEnergy, self).__init__(position)
self.N = DiagonalOperator(exp(position))
self.alpha = alpha
self.q = q
self.res_sample_list = res_sample_list
self._gradient = None
for s in res_sample_list:
lh = .5 * s.vdot(self.N.inverse_times(s))
grad = -.5 * self.N.inverse_times(s.conjugate()*s)
if self._gradient is None:
self._value = lh
self._gradient = grad
else:
self._value += lh
self._gradient += grad
expmpos = exp(-position)
self._value *= 1./len(res_sample_list)
possum = position.sum()
s1 = (alpha-1.)*possum if np.isscalar(alpha) \
else (alpha-1.).vdot(position)
s2 = q*expmpos.sum() if np.isscalar(q) else q.vdot(expmpos)
self._value += .5*possum + s1 + s2
self._gradient *= 1./len(res_sample_list)
self._gradient += (alpha-0.5) - q*expmpos
self._gradient.lock()
def at(self, position):
return self.__class__(position, self.alpha, self.q,
self.res_sample_list)
@property
def value(self):
return self._value
@property
def gradient(self):
return self._gradient
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Copyright(C) 2013-2018 Max-Planck-Society
#
# NIFTy is being developed at the Max-Planck-Institut fuer Astrophysik
# and financially supported by the Studienstiftung des deutschen Volkes.
from ..sugar import exp, makeOp
from ..minimization.energy import Energy
from ..operators.smoothness_operator import SmoothnessOperator
from ..operators.inversion_enabler import InversionEnabler
from ..utilities import memo
def _LinearizedPowerResponse(Instrument, nonlinearity, ht, Distributor, tau,
xi):
power = exp(0.5*tau)
position = ht(Distributor(power)*xi)
linearization = makeOp(nonlinearity.derivative(position))
return (0.5 * Instrument * linearization * ht * makeOp(xi) * Distributor *
makeOp(power))
class NonlinearPowerEnergy(Energy):
"""The Energy of the power spectrum according to the critical filter.
It describes the energy of the logarithmic amplitudes of the power spectrum
of a Gaussian random field under reconstruction uncertainty with smoothness
and inverse gamma prior. It is used to infer the correlation structure of a
correlated signal. The smoothness prior operates on logarithmic scale, i.e.
it prefers power-law-like power spectra.
Parameters
----------
position : Field
The current position of this energy.
xi : Field
The excitation field.
D : EndomorphicOperator
The curvature of the Gaussian encoding the posterior covariance.
If not specified, the map is assumed to be no reconstruction.
default : None
sigma : float
The parameter of the smoothness prior. Needs to be positive. A bigger
number means a stronger smoothness prior.
default : 0
samples : int
Number of samples used for the estimation of the uncertainty
corrections.
default : 3
"""
# MR FIXME: docstring incomplete and outdated
def __init__(self, position, d, N, xi, D, ht, Instrument, nonlinearity,
Distributor, sigma=0., samples=3, xi_sample_list=None,
iteration_controller=None):
super(NonlinearPowerEnergy, self).__init__(position)
self.xi = xi
self.D = D
self.d = d
self.N = N
self.T = SmoothnessOperator(domain=position.domain[0],
strength=sigma, logarithmic=True)
self.ht = ht
self.Instrument = Instrument
self.nonlinearity = nonlinearity
self.Distributor = Distributor
self.sigma = sigma
if xi_sample_list is None:
if samples is None or samples == 0:
xi_sample_list = [xi]
else:
xi_sample_list = [D.draw_sample(from_inverse=True) + xi
for _ in range(samples)]
self.xi_sample_list = xi_sample_list
self._ic = iteration_controller
A = Distributor(exp(.5 * position))
self._gradient = None
for xi_sample in self.xi_sample_list:
map_s = ht(A*xi_sample)
LinR = _LinearizedPowerResponse(Instrument, nonlinearity, ht,
Distributor, position, xi_sample)
residual = d - Instrument(nonlinearity(map_s))
tmp = N.inverse_times(residual)
lh = 0.5 * residual.vdot(tmp)
grad = LinR.adjoint_times(tmp)
if self._gradient is None:
self._value = lh
self._gradient = grad.copy()
else:
self._value += lh
self._gradient += grad
self._value *= 1. / len(self.xi_sample_list)
Tpos = self.T(position)
self._value += 0.5 * position.vdot(Tpos)
self._gradient *= -1. / len(self.xi_sample_list)
self._gradient += Tpos
self._gradient.lock()
def at(self, position):
return self.__class__(position, self.d, self.N, self.xi, self.D,
self.ht, self.Instrument, self.nonlinearity,
self.Distributor, sigma=self.sigma,
samples=len(self.xi_sample_list),
xi_sample_list=self.xi_sample_list,
iteration_controller=self._ic)
@property
def value(self):
return self._value
@property
def gradient(self):
return self._gradient
@property
@memo
def curvature(self):
result = None
for xi_sample in self.xi_sample_list:
LinearizedResponse = _LinearizedPowerResponse(
self.Instrument, self.nonlinearity, self.ht, self.Distributor,
self.position, xi_sample)
op = LinearizedResponse.adjoint*self.N.inverse*LinearizedResponse
result = op if result is None else result + op
result = result*(1./len(self.xi_sample_list)) + self.T
return InversionEnabler(result, self._ic)
......@@ -16,53 +16,14 @@
# NIFTy is being developed at the Max-Planck-Institut fuer Astrophysik
# and financially supported by the Studienstiftung des deutschen Volkes.
from .wiener_filter_curvature import WienerFilterCurvature
from ..utilities import memo
from ..minimization.energy import Energy
from ..sugar import makeOp
from ..models.constant import Constant
from .unit_log_gauss import UnitLogGauss
from .hamiltonian import Hamiltonian
class NonlinearWienerFilterEnergy(Energy):
def __init__(self, position, d, Instrument, nonlinearity, ht, power, N, S,
iteration_controller=None,
iteration_controller_sampling=None):
super(NonlinearWienerFilterEnergy, self).__init__(position=position)
self.d = d.lock()
self.Instrument = Instrument
self.nonlinearity = nonlinearity
self.ht = ht
self.power = power
m = ht(power*position)
def NonlinearWienerFilterEnergy(measured_data, data_model, sqrtN, iteration_controller):
d = measured_data.lock()
residual = Constant(data_model.position, d) - data_model
lh = UnitLogGauss(sqrtN.inverse(residual))
return Hamiltonian(lh, iteration_controller)
residual = d - Instrument(nonlinearity(m))
self.N = N
self.S = S
self._ic = iteration_controller
if iteration_controller_sampling is None:
iteration_controller_sampling = self._ic
self._ic_samp = iteration_controller_sampling
t1 = S.inverse_times(position)
t2 = N.inverse_times(residual)
self._value = 0.5 * (position.vdot(t1) + residual.vdot(t2)).real
self.R = (Instrument * makeOp(nonlinearity.derivative(m)) *
ht * makeOp(power))
self._gradient = (t1 - self.R.adjoint_times(t2)).lock()
def at(self, position):
return self.__class__(position, self.d, self.Instrument,
self.nonlinearity, self.ht, self.power, self.N,
self.S, self._ic, self._ic_samp)
@property
def value(self):
return self._value
@property
def gradient(self):
return self._gradient
@property
@memo
def curvature(self):
return WienerFilterCurvature(self.R, self.N, self.S, self._ic,
self._ic_samp)
......@@ -79,20 +79,22 @@ class Energy_Tests(unittest.TestCase):
pspace = ift.PowerSpace(hspace, binbounds=binbounds)
Dist = ift.PowerDistributor(target=hspace, power_space=pspace)
xi0 = ift.Field.from_random(domain=hspace, random_type='normal')
xi0_var = ift.Variable(ift.MultiField({'xi':xi0}))['xi']
def pspec(k): return 1 / (1 + k**2)**dim
pspec = ift.PS_field(pspace, pspec)
A = Dist(ift.sqrt(pspec))
n = ift.Field.from_random(domain=space, random_type='normal')
s = ht(xi0 * A)
s = ht(ift.makeOp(A)(xi0_var))
R = ift.ScalingOperator(10., space)
N = ift.ScalingOperator(1., space)
d = R(f(s)) + n
sqrtN = ift.ScalingOperator(1., space)
d_model = R(ift.LocalModel(s, nonlinearity()))
d = d_model.value + n
S = ift.create_power_operator(hspace, power_spectrum=_flat_PS)
IC = ift.GradientNormController(iteration_limit=100,
tol_abs_gradnorm=1e-5)
energy = ift.library.NonlinearWienerFilterEnergy(
position=xi0, d=d, Instrument=R, nonlinearity=f, ht=ht, power=A,
N=N, S=S)
d, d_model, sqrtN, IC)
if isinstance(nonlinearity, ift.library.Linear):
ift.extra.check_value_gradient_curvature_consistency(
energy, ntries=10)
......
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Copyright(C) 2013-2018 Max-Planck-Society
#
# NIFTy is being developed at the Max-Planck-Institut fuer Astrophysik
# and financially supported by the Studienstiftung des deutschen Volkes.
import unittest
import nifty5 as ift
import numpy as np
from itertools import product
from test.common import expand
from numpy.testing import assert_allclose
def _flat_PS(k):
return np.ones_like(k)
class Noise_Energy_Tests(unittest.TestCase):
@expand(product([ift.RGSpace(64, distances=.789),
ift.RGSpace([32, 32], distances=.789)],
[ift.library.Exponential, ift.library.Linear],
[23, 131, 32]))
def testNoise(self, space, nonlinearity, seed):
np.random.seed(seed)
f = nonlinearity()
dim = len(space.shape)
hspace = space.get_default_codomain()
ht = ift.HarmonicTransformOperator(hspace, target=space)
binbounds = ift.PowerSpace.useful_binbounds(hspace, logarithmic=False)
pspace = ift.PowerSpace(hspace, binbounds=binbounds)
Dist = ift.PowerDistributor(target=hspace, power_space=pspace)
xi = ift.Field.from_random(domain=hspace, random_type='normal')
def pspec(k): return 1 / (1 + k**2)**dim
tau = ift.PS_field(pspace, pspec)
A = Dist(ift.sqrt(tau))
var = 1.
n = ift.Field.from_random(
domain=space,
random_type='normal',
std=np.sqrt(var))
var = ift.Field.full(n.domain, var)
N = ift.DiagonalOperator(var)
eta0 = ift.log(var)
s = ht(xi * A)
R = ift.ScalingOperator(10., space)
d = R(f(s)) + n
alpha = ift.Field.full(d.domain, 2.)
q = ift.Field.full(d.domain, 1e-5)
IC = ift.GradientNormController(
iteration_limit=100,
tol_abs_gradnorm=1e-5)
S = ift.create_power_operator(hspace, power_spectrum=_flat_PS)
C = ift.library.NonlinearWienerFilterEnergy(
position=xi,
d=d,
Instrument=R,
nonlinearity=f,
ht=ht,
power=A,
N=N,
S=S,
iteration_controller=IC).curvature
res_sample_list = [d - R(f(ht(C.draw_sample(from_inverse=True) + xi)))
for _ in range(10)]
energy = ift.library.NoiseEnergy(eta0, alpha, q, res_sample_list)
ift.extra.check_value_gradient_consistency(energy, ntries=10)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Copyright(C) 2013-2018 Max-Planck-Society
#
# NIFTy is being developed at the Max-Planck-Institut fuer Astrophysik
# and financially supported by the Studienstiftung des deutschen Volkes.
import unittest
import nifty5 as ift
import numpy as np
from itertools import product
from test.common import expand
from numpy.testing import assert_allclose
def _flat_PS(k):
return np.ones_like(k)
class Energy_Tests(unittest.TestCase):
@expand(product([ift.RGSpace(64, distances=.789),
ift.RGSpace([32, 32], distances=.789)],
[ift.library.Exponential, ift.library.Linear],
[132, 42, 3]))
def testNonlinearPower(self, space, nonlinearity, seed):
np.random.seed(seed)
f = nonlinearity()
dim = len(space.shape)
hspace = space.get_default_codomain()
ht = ift.HarmonicTransformOperator(hspace, space)
binbounds = ift.PowerSpace.useful_binbounds(hspace, logarithmic=True)
pspace = ift.PowerSpace(hspace, binbounds=binbounds)
Dist = ift.PowerDistributor(target=hspace, power_space=pspace)
xi = ift.Field.from_random(domain=hspace, random_type='normal')
def pspec(k): return 1 / (1 + k**2)**dim
tau0 = ift.PS_field(pspace, pspec)
A = Dist(ift.sqrt(tau0))
n = ift.Field.from_random(domain=space, random_type='normal')
s = ht(xi * A)
R = ift.ScalingOperator(10., space)
diag = ift.full(space, 1.)
N = ift.DiagonalOperator(diag)
d = R(f(s)) + n
IC = ift.GradientNormController(
iteration_limit=100,
tol_abs_gradnorm=1e-5)
S = ift.create_power_operator(hspace, power_spectrum=_flat_PS)
D = ift.library.NonlinearWienerFilterEnergy(
position=xi,
d=d,
Instrument=R,
nonlinearity=f,
power=A,
N=N,
S=S,
ht=ht,
iteration_controller=IC).curvature
energy = ift.library.NonlinearPowerEnergy(
position=tau0,
d=d,
xi=xi,
D=D,
Instrument=R,
Distributor=Dist,
nonlinearity=f,
ht=ht,
N=N,
samples=10)
ift.extra.check_value_gradient_consistency(energy, ntries=10)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment