Commit ab97b34a authored by Philipp Arras's avatar Philipp Arras
Browse files

All energy test run through now :)

parent 43b247bb
Pipeline #24156 failed with stage
in 3 minutes and 59 seconds
......@@ -43,7 +43,7 @@ class LogNormalWienerFilterEnergy(Energy):
The prior signal covariance in harmonic space.
"""
def __init__(self, position, d, R, N, S, inverter, fft=None):
def __init__(self, position, d, R, N, S, inverter, ht=None):
super(LogNormalWienerFilterEnergy, self).__init__(position=position)
self.d = d
self.R = R
......@@ -51,21 +51,21 @@ class LogNormalWienerFilterEnergy(Energy):
self.S = S
self._inverter = inverter
if fft is None:
self._fft = create_composed_fft_operator(self.S.domain,
all_to='position')
if ht is None:
self._ht = create_composed_fft_operator(self.S.domain,
all_to='position')
else:
self._fft = fft
self._ht = ht
self._expp_sspace = exp(self._fft(self.position))
self._expp_sspace = exp(self._ht(self.position))
Sp = self.S.inverse_times(self.position)
expp = self._fft.adjoint_times(self._expp_sspace)
expp = self._ht.adjoint_times(self._expp_sspace)
Rexppd = self.R(expp) - self.d
NRexppd = self.N.inverse_times(Rexppd)
self._value = 0.5*(self.position.vdot(Sp) + Rexppd.vdot(NRexppd))
exppRNRexppd = self._fft.adjoint_times(
self._expp_sspace * self._fft(self.R.adjoint_times(NRexppd)))
exppRNRexppd = self._ht.adjoint_times(
self._expp_sspace * self._ht(self.R.adjoint_times(NRexppd)))
self._gradient = Sp + exppRNRexppd
def at(self, position):
......
......@@ -20,9 +20,11 @@ from .. import Field, exp
from ..operators.diagonal_operator import DiagonalOperator
from ..minimization.energy import Energy
# TODO Take only residual_sample_list as argument
class NoiseEnergy(Energy):
def __init__(self, position, d, m, D, t, FFT, Instrument, nonlinearity,
def __init__(self, position, d, m, D, t, ht, Instrument, nonlinearity,
alpha, q, Projection, samples=3, sample_list=None,
inverter=None):
super(NoiseEnergy, self).__init__(position=position)
......@@ -32,7 +34,7 @@ class NoiseEnergy(Energy):
self.N = DiagonalOperator(diagonal=exp(self.position))
self.t = t
self.samples = samples
self.FFT = FFT
self.ht = ht
self.Instrument = Instrument
self.nonlinearity = nonlinearity
......@@ -53,7 +55,7 @@ class NoiseEnergy(Energy):
def at(self, position):
return self.__class__(
position, self.d, self.m, self.D, self.t, self.FFT,
position, self.d, self.m, self.D, self.t, self.ht,
self.Instrument, self.nonlinearity, self.alpha, self.q,
self.Projection, sample_list=self.sample_list,
samples=self.samples, inverter=self.inverter)
......@@ -63,7 +65,7 @@ class NoiseEnergy(Energy):
for sample in self.sample_list:
residual = self.d - \
self.Instrument(self.nonlinearity(
self.FFT.adjoint_times(self.power*sample)))
self.ht(self.power*sample)))
lh = 0.5 * residual.vdot(self.N.inverse_times(residual))
grad = -0.5 * self.N.inverse_times(residual.conjugate() * residual)
if likelihood_gradient is None:
......@@ -74,7 +76,7 @@ class NoiseEnergy(Energy):
likelihood_gradient += grad
likelihood = ((likelihood / float(len(self.sample_list))) +
0.5 * self.position.integrate() +
0.5 * self.position.sum() +
(self.alpha - 1.).vdot(self.position) +
self.q.vdot(exp(-self.position)))
likelihood_gradient = (
......
......@@ -20,12 +20,12 @@ from ..operators.inversion_enabler import InversionEnabler
from .response_operators import LinearizedPowerResponse
def NonlinearPowerCurvature(position, FFT, Instrument, nonlinearity,
def NonlinearPowerCurvature(position, ht, Instrument, nonlinearity,
Projection, N, T, sample_list, inverter):
result = None
for sample in sample_list:
LinearizedResponse = LinearizedPowerResponse(
Instrument, nonlinearity, FFT, Projection, position, sample)
Instrument, nonlinearity, ht, Projection, position, sample)
op = LinearizedResponse.adjoint*N.inverse*LinearizedResponse
result = op if result is None else result + op
result = result*(1./len(sample_list)) + T
......
......@@ -51,7 +51,7 @@ class NonlinearPowerEnergy(Energy):
default : 3
"""
def __init__(self, position, d, N, m, D, FFT, Instrument, nonlinearity,
def __init__(self, position, d, N, m, D, ht, Instrument, nonlinearity,
Projection, sigma=0., samples=3, sample_list=None,
inverter=None):
super(NonlinearPowerEnergy, self).__init__(position)
......@@ -61,7 +61,7 @@ class NonlinearPowerEnergy(Energy):
self.N = N
self.T = SmoothnessOperator(domain=self.position.domain[0],
strength=sigma, logarithmic=True)
self.FFT = FFT
self.ht = ht
self.Instrument = Instrument
self.nonlinearity = nonlinearity
self.Projection = Projection
......@@ -80,7 +80,7 @@ class NonlinearPowerEnergy(Energy):
def at(self, position):
return self.__class__(position, self.d, self.N, self.m, self.D,
self.FFT, self.Instrument, self.nonlinearity,
self.ht, self.Instrument, self.nonlinearity,
self.Projection, sigma=self._sigma,
samples=len(self.sample_list),
sample_list=self.sample_list,
......@@ -91,10 +91,10 @@ class NonlinearPowerEnergy(Energy):
for sample in self.sample_list:
residual = self.d - \
self.Instrument(self.nonlinearity(
self.FFT.adjoint_times(self.power*sample)))
self.ht(self.power*sample)))
lh = 0.5 * residual.vdot(self.N.inverse_times(residual))
LinR = LinearizedPowerResponse(
self.Instrument, self.nonlinearity, self.FFT, self.Projection,
self.Instrument, self.nonlinearity, self.ht, self.Projection,
self.position, sample)
grad = LinR.adjoint_times(self.N.inverse_times(residual))
if likelihood_gradient is None:
......@@ -122,6 +122,6 @@ class NonlinearPowerEnergy(Energy):
@memo
def curvature(self):
return NonlinearPowerCurvature(
self.position, self.FFT, self.Instrument, self.nonlinearity,
self.position, self.ht, self.Instrument, self.nonlinearity,
self.Projection, self.N, self.T, self.sample_list,
inverter=self.inverter)
......@@ -23,19 +23,19 @@ from .response_operators import LinearizedSignalResponse
class NonlinearWienerFilterEnergy(Energy):
def __init__(self, position, d, Instrument, nonlinearity, FFT, power, N, S,
def __init__(self, position, d, Instrument, nonlinearity, ht, power, N, S,
inverter=None):
super(NonlinearWienerFilterEnergy, self).__init__(position=position)
self.d = d
self.Instrument = Instrument
self.nonlinearity = nonlinearity
self.FFT = FFT
self.ht = ht
self.power = power
self.LinearizedResponse = \
LinearizedSignalResponse(Instrument, nonlinearity, FFT, power,
LinearizedSignalResponse(Instrument, nonlinearity, ht, power,
self.position)
position_map = FFT.adjoint_times(self.power * self.position)
position_map = ht(self.power * self.position)
residual = d - Instrument(nonlinearity(position_map))
self.N = N
self.S = S
......@@ -48,7 +48,7 @@ class NonlinearWienerFilterEnergy(Energy):
def at(self, position):
return self.__class__(position, self.d, self.Instrument,
self.nonlinearity, self.FFT, self.power, self.N,
self.nonlinearity, self.ht, self.power, self.N,
self.S, inverter=self.inverter)
@property
......
......@@ -19,14 +19,15 @@
from ..field import exp
def LinearizedSignalResponse(Instrument, nonlinearity, FFT, power, m):
position = FFT.adjoint_times(power*m)
def LinearizedSignalResponse(Instrument, nonlinearity, ht, power, m):
position = ht(power*m)
return (Instrument * nonlinearity.derivative(position) *
FFT.adjoint * power)
ht * power)
def LinearizedPowerResponse(Instrument, nonlinearity, FFT, Projection, t, m):
def LinearizedPowerResponse(Instrument, nonlinearity, ht, Projection, t, m):
power = exp(0.5*t)
position = FFT.adjoint_times(Projection.adjoint_times(power) * m)
position = ht(Projection.adjoint_times(power) * m)
linearization = nonlinearity.derivative(position)
return (0.5 * Instrument * linearization * FFT.adjoint * m *
return (0.5 * Instrument * linearization * ht * m *
Projection.adjoint * power)
......@@ -25,13 +25,15 @@ from numpy.testing import assert_allclose
# TODO Add also other space types
# TODO Set tolerances to reasonable values
# TODO Set tolerances and eps to reasonable values
class Map_Energy_Tests(unittest.TestCase):
@expand(product([ift.RGSpace(64, distances=.789),
ift.RGSpace([32, 32], distances=.789)]))
def testLinearMap(self, space):
ift.RGSpace([32, 32], distances=.789)],
[4, 78, 23]))
def testLinearMap(self, space, seed):
np.random.seed(seed)
dim = len(space.shape)
hspace = space.get_default_codomain()
ht = ift.HarmonicTransformOperator(hspace, target=space)
......@@ -54,7 +56,7 @@ class Map_Energy_Tests(unittest.TestCase):
direction = ift.Field.from_random('normal', hspace)
direction /= np.sqrt(direction.var())
eps = 1e-10
eps = 1e-7
s1 = s0 + eps * direction
IC = ift.GradientNormController(
......@@ -72,12 +74,14 @@ class Map_Energy_Tests(unittest.TestCase):
a = (energy1.value - energy0.value) / eps
b = energy0.gradient.vdot(direction)
tol = 1e-3
tol = 1e-5
assert_allclose(a, b, rtol=tol, atol=tol)
@expand(product([ift.RGSpace(64, distances=.789),
ift.RGSpace([32, 32], distances=.789)]))
def testLognormalMap(self, space):
ift.RGSpace([32, 32], distances=.789)],
[4, 78, 23]))
def testLognormalMap(self, space, seed):
np.random.seed(seed)
dim = len(space.shape)
hspace = space.get_default_codomain()
ht = ift.HarmonicTransformOperator(hspace, target=space)
......@@ -101,7 +105,7 @@ class Map_Energy_Tests(unittest.TestCase):
direction = ift.Field.from_random('normal', hspace)
direction /= np.sqrt(direction.var())
eps = 1e-10
eps = 1e-6
sh1 = sh0 + eps * direction
IC = ift.GradientNormController(
......@@ -119,17 +123,19 @@ class Map_Energy_Tests(unittest.TestCase):
a = (energy1.value - energy0.value) / eps
b = energy0.gradient.vdot(direction)
tol = 1e-2
tol = 1e-3
assert_allclose(a, b, rtol=tol, atol=tol)
@expand(product([ift.RGSpace(64, distances=.789),
ift.RGSpace([32, 32], distances=.789)],
[ift.library.Exponential, ift.library.Linear]))
def testNonlinearMap(self, space, nonlinearity):
[ift.library.Exponential, ift.library.Linear],
[4, 78, 23]))
def testNonlinearMap(self, space, nonlinearity, seed):
np.random.seed(seed)
f = nonlinearity()
dim = len(space.shape)
fft = ift.FFTOperator(space)
hspace = fft.target[0]
hspace = space.get_default_codomain()
ht = ift.HarmonicTransformOperator(hspace, target=space)
binbounds = ift.PowerSpace.useful_binbounds(hspace, logarithmic=False)
pspace = ift.PowerSpace(hspace, binbounds=binbounds)
P = ift.PowerProjectionOperator(domain=hspace, power_space=pspace)
......@@ -139,7 +145,7 @@ class Map_Energy_Tests(unittest.TestCase):
pspec = ift.PS_field(pspace, pspec)
A = P.adjoint_times(ift.sqrt(pspec))
n = ift.Field.from_random(domain=space, random_type='normal')
s = fft.inverse_times(xi0 * A)
s = ht(xi0 * A)
diag = ift.Field.ones(space) * 10
R = ift.DiagonalOperator(diag)
diag = ift.Field.ones(space)
......@@ -148,16 +154,16 @@ class Map_Energy_Tests(unittest.TestCase):
direction = ift.Field.from_random('normal', hspace)
direction /= np.sqrt(direction.var())
eps = 1e-10
eps = 1e-7
xi1 = xi0 + eps * direction
S = ift.create_power_operator(hspace, power_spectrum=lambda k: 1.)
energy0 = ift.library.NonlinearWienerFilterEnergy(
position=xi0, d=d, Instrument=R, nonlinearity=f, FFT=fft, power=A, N=N, S=S)
position=xi0, d=d, Instrument=R, nonlinearity=f, ht=ht, power=A, N=N, S=S)
energy1 = ift.library.NonlinearWienerFilterEnergy(
position=xi1, d=d, Instrument=R, nonlinearity=f, FFT=fft, power=A, N=N, S=S)
position=xi1, d=d, Instrument=R, nonlinearity=f, ht=ht, power=A, N=N, S=S)
a = (energy1.value - energy0.value) / eps
b = energy0.gradient.vdot(direction)
tol = 1e-2
tol = 1e-4
assert_allclose(a, b, rtol=tol, atol=tol)
......@@ -30,12 +30,14 @@ from numpy.testing import assert_allclose
class Noise_Energy_Tests(unittest.TestCase):
@expand(product([ift.RGSpace(64, distances=.789),
ift.RGSpace([32, 32], distances=.789)],
[ift.library.Exponential, ift.library.Linear]))
def testNoise(self, space, nonlinearity):
[ift.library.Exponential, ift.library.Linear],
[23, 131, 32]))
def testNoise(self, space, nonlinearity, seed):
np.random.seed(seed)
f = nonlinearity()
dim = len(space.shape)
fft = ift.FFTOperator(space)
hspace = fft.target[0]
hspace = space.get_default_codomain()
ht = ift.HarmonicTransformOperator(hspace, target=space)
binbounds = ift.PowerSpace.useful_binbounds(hspace, logarithmic=False)
pspace = ift.PowerSpace(hspace, binbounds=binbounds)
P = ift.PowerProjectionOperator(domain=hspace, power_space=pspace)
......@@ -44,13 +46,15 @@ class Noise_Energy_Tests(unittest.TestCase):
def pspec(k): return 1 / (1 + k**2)**dim
tau = ift.PS_field(pspace, pspec)
A = P.adjoint_times(ift.sqrt(tau))
n = ift.Field.from_random(domain=space, random_type='normal')
s = fft.inverse_times(xi * A)
var = 1.
n = ift.Field.from_random(domain=space, random_type='normal', std=np.sqrt(var))
var = ift.Field(n.domain, val=var)
N = ift.DiagonalOperator(var)
eta0 = ift.log(var)
s = ht(xi * A)
diag = ift.Field.ones(space) * 10
R = ift.DiagonalOperator(diag)
diag = ift.Field.ones(space)
eta0 = ift.log(diag)
N = ift.DiagonalOperator(diag)
d = R(f(s)) + n
alpha = ift.Field(d.domain, val=2.)
......@@ -58,7 +62,7 @@ class Noise_Energy_Tests(unittest.TestCase):
direction = ift.Field.from_random('normal', d.domain)
direction /= np.sqrt(direction.var())
eps = 1e-10
eps = 1e-8
eta1 = eta0 + eps * direction
IC = ift.GradientNormController(
......@@ -74,22 +78,24 @@ class Noise_Energy_Tests(unittest.TestCase):
d=d,
Instrument=R,
nonlinearity=f,
FFT=fft,
ht=ht,
power=A,
N=N,
S=S,
inverter=inverter).curvature
Nsamples = 10
sample_list = [D.generate_posterior_sample() + xi for i in range(Nsamples)]
energy0 = ift.library.NoiseEnergy(
position=eta0, d=d, m=xi, D=D, t=tau, Instrument=R,
alpha=alpha, q=q, Projection=P, nonlinearity=f,
FFT=fft, samples=3)
ht=ht, sample_list=sample_list)
energy1 = ift.library.NoiseEnergy(
position=eta1, d=d, m=xi, D=D, t=tau, Instrument=R,
alpha=alpha, q=q, Projection=P, nonlinearity=f,
FFT=fft, samples=3)
ht=ht, sample_list=sample_list)
a = (energy1.value - energy0.value) / eps
b = energy0.gradient.vdot(direction)
tol = 1e-2
tol = 1e-5
assert_allclose(a, b, rtol=tol, atol=tol)
......@@ -29,8 +29,10 @@ from numpy.testing import assert_allclose
class Power_Energy_Tests(unittest.TestCase):
@expand(product([ift.RGSpace(64, distances=.789),
ift.RGSpace([32, 32], distances=.789)]))
def testLinearPower(self, space):
ift.RGSpace([32, 32], distances=.789)],
[132, 42, 3]))
def testLinearPower(self, space, seed):
np.random.seed(seed)
dim = len(space.shape)
hspace = space.get_default_codomain()
ht = ift.HarmonicTransformOperator(hspace, space)
......@@ -92,12 +94,14 @@ class Power_Energy_Tests(unittest.TestCase):
@expand(product([ift.RGSpace(64, distances=.789),
ift.RGSpace([32, 32], distances=.789)],
[ift.library.Exponential, ift.library.Linear]))
def testNonlinearPower(self, space, nonlinearity):
[ift.library.Exponential, ift.library.Linear],
[132, 42, 3]))
def testNonlinearPower(self, space, nonlinearity, seed):
np.random.seed(seed)
f = nonlinearity()
dim = len(space.shape)
fft = ift.FFTOperator(space)
hspace = fft.target[0]
hspace = space.get_default_codomain()
ht = ift.HarmonicTransformOperator(hspace, space)
binbounds = ift.PowerSpace.useful_binbounds(hspace, logarithmic=True)
pspace = ift.PowerSpace(hspace, binbounds=binbounds)
P = ift.PowerProjectionOperator(domain=hspace, power_space=pspace)
......@@ -107,7 +111,7 @@ class Power_Energy_Tests(unittest.TestCase):
tau0 = ift.PS_field(pspace, pspec)
A = P.adjoint_times(ift.sqrt(tau0))
n = ift.Field.from_random(domain=space, random_type='normal')
s = fft.inverse_times(xi * A)
s = ht(xi * A)
diag = ift.Field.ones(space) * 10
R = ift.DiagonalOperator(diag)
diag = ift.Field.ones(space)
......@@ -116,7 +120,7 @@ class Power_Energy_Tests(unittest.TestCase):
direction = ift.Field.from_random('normal', pspace)
direction /= np.sqrt(direction.var())
eps = 1e-10
eps = 1e-7
tau1 = tau0 + eps * direction
IC = ift.GradientNormController(
......@@ -132,11 +136,13 @@ class Power_Energy_Tests(unittest.TestCase):
d=d,
Instrument=R,
nonlinearity=f,
FFT=fft,
power=A,
N=N,
S=S,
ht=ht,
inverter=inverter).curvature
Nsamples = 10
sample_list = [D.generate_posterior_sample() + xi for _ in range(Nsamples)]
energy0 = ift.library.NonlinearPowerEnergy(
position=tau0,
......@@ -146,9 +152,9 @@ class Power_Energy_Tests(unittest.TestCase):
Instrument=R,
Projection=P,
nonlinearity=f,
FFT=fft,
ht=ht,
N=N,
inverter=inverter)
sample_list=sample_list)
energy1 = ift.library.NonlinearPowerEnergy(
position=tau1,
d=d,
......@@ -157,11 +163,11 @@ class Power_Energy_Tests(unittest.TestCase):
Instrument=R,
Projection=P,
nonlinearity=f,
FFT=fft,
ht=ht,
N=N,
inverter=inverter)
sample_list=sample_list)
a = (energy1.value - energy0.value) / eps
b = energy0.gradient.vdot(direction)
tol = 1e-2
tol = 1e-4
assert_allclose(a, b, rtol=tol, atol=tol)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment