Commit 5d9b5045 authored by Jakob Knollmueller's avatar Jakob Knollmueller

nonlinear_wiener_filter demo added

parent 996ea442
Pipeline #12535 passed with stage
in 6 minutes and 5 seconds
from nifty import *
import plotly.offline as pl
import plotly.graph_objs as go
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.rank
np.random.seed(42)
class NonlinearResponse(LinearOperator):
def __init__(self, FFT, Instrument, function, derivative, default_spaces=None):
super(NonlinearResponse, self).__init__(default_spaces)
self._domain = FFT.target
self._target = Instrument.target
self.function = function
self.derivative = derivative
self.I = Instrument
self.FFT = FFT
def _times(self, x, spaces=None):
return self.I(self.function(self.FFT.adjoint_times(x)))
def _adjoint_times(self, x, spaces=None):
return self.FFT(self.function(self.I.adjoint_times(x)))
def derived_times(self, x, position):
position_derivative = self.derivative(self.FFT.adjoint_times(position))
return self.I(position_derivative * self.FFT.adjoint_times(x))
def derived_adjoint_times(self, x, position):
position_derivative = self.derivative(self.FFT.adjoint_times(position))
return self.FFT(position_derivative * self.I.adjoint_times(x))
@property
def domain(self):
return self._domain
@property
def target(self):
return self._target
@property
def unitary(self):
return False
if __name__ == "__main__":
distribution_strategy = 'not'
# Set up position space
s_space = RGSpace([128,128])
# s_space = HPSpace(32)
# Define harmonic transformation and associated harmonic space
fft = FFTOperator(s_space)
h_space = fft.target[0]
# Setting up power space
p_space = PowerSpace(h_space, distribution_strategy=distribution_strategy)
# Choosing the prior correlation structure and defining correlation operator
pow_spec = (lambda k: (0.42 / (k + 1) ** 3))
S = create_power_operator(h_space, power_spectrum=pow_spec,
distribution_strategy=distribution_strategy)
# Drawing a sample sh from the prior distribution in harmonic space
sp = Field(p_space, val=lambda z: pow_spec(z)**(1./2),
distribution_strategy=distribution_strategy)
sh = sp.power_synthesize(real_signal=True)
# Choosing the measurement instrument
# Instrument = SmoothingOperator(s_space, sigma=0.01)
Instrument = DiagonalOperator(s_space, diagonal=1.)
# Instrument._diagonal.val[200:400, 200:400] = 0
# Choosing nonlinearity
# function = exp
# derivative = exp
def function(x):
return 0.5 * tanh(x) + 0.5
def derivative(x):
return 0.5*(1 - tanh(x)**2)
# def function(x):
# return x
# def derivative(x):
# return 1
#
# def function(x):
# return 0.5*x**2 + x
# def derivative(x):
# return x + 1
#
# def function(x):
# return 0.9*x**4 +0.2*x**2 + x
# def derivative(x):
# return 0.9*4*x**3 + 0.4*x +1
#
#Adding a harmonic transformation to the instrument
R = NonlinearResponse(fft, Instrument, function, derivative)
noise = 0.01
N = DiagonalOperator(s_space, diagonal=noise, bare=True)
n = Field.from_random(domain=s_space,
random_type='normal',
std=sqrt(noise),
mean=0)
# Creating the mock data
d = R(sh) + n
# Choosing the minimization strategy
def convergence_measure(energy, iteration): # returns current energy
x = energy.value
print (x, iteration)
# minimizer = SteepestDescent(convergence_tolerance=0,
# iteration_limit=50,
# callback=convergence_measure)
#
minimizer = RelaxedNewton(convergence_tolerance=0,
convergence_level=1,
iteration_limit=10,
callback=convergence_measure)
# minimizer = VL_BFGS(convergence_tolerance=0,
# iteration_limit=50,
# callback=convergence_measure,
# max_history_length=3)
#
#
# Setting starting position
m0 = Field.from_random(random_type="normal",domain = h_space, std=0.001)
# Initializing the Wiener Filter energy
energy = NonlinearWienerFilterEnergy(position=m0, d=d, R=R, N=N, S=S)
# Solving the problem with chosen minimization strategy
(energy, convergence) = minimizer(energy)
# Transforming fields to position space for plotting
ss = fft.adjoint_times(sh)
m = fft.adjoint_times(energy.position)
# Plotting
d_data = d.val.get_full_data().real
if rank == 0:
pl.plot([go.Heatmap(z=d_data)], filename='data.html')
ss_data = ss.val.get_full_data().real
if rank == 0:
pl.plot([go.Heatmap(z=ss_data)], filename='ss.html')
sh_data = sh.val.get_full_data().real
if rank == 0:
pl.plot([go.Heatmap(z=sh_data)], filename='sh.html')
m_data = m.val.get_full_data().real
if rank == 0:
pl.plot([go.Heatmap(z=m_data)], filename='map.html')
......@@ -19,7 +19,7 @@ if __name__ == "__main__":
correlation_length = 0.1
#variance of field in position space sqrt(<|s_x|^2>) (in unit of s)
field_variance = 2.
#smoothing length that response (in same unit as L)
#smoothing length of response (in same unit as L)
response_sigma = 0.1
#defining resolution (pixels per dimension)
......@@ -62,6 +62,7 @@ if __name__ == "__main__":
d = R(ss) + n
# Wiener filter
j = R.adjoint_times(N.inverse_times(d))
D = PropagatorOperator(S=S, N=N, R=R)
......
from wiener_filter_energy import WienerFilterEnergy
\ No newline at end of file
from wiener_filter_energy import WienerFilterEnergy
from nonlinear_wiener_filter_energy import NonlinearWienerFilterEnergy
\ No newline at end of file
from nifty.energies.energy import Energy
from nifty.library.operator_library import NonlinearWienerFilterCurvature
class NonlinearWienerFilterEnergy(Energy):
"""The Energy for the Gaussian lognormal case.
It describes the situation of linear measurement of a
lognormal signal with Gaussian noise and Gaussain signal prior.
Parameters
----------
d : Field,
the data.
R : Operator,
The nonlinear response operator, describtion of the measurement process.
N : EndomorphicOperator,
The noise covariance in data space.
S : EndomorphicOperator,
The prior signal covariance in harmonic space.
"""
def __init__(self, position, d, R, N, S):
super(NonlinearWienerFilterEnergy, self).__init__(position = position)
self.d = d
self.R = R
self.N = N
self.S = S
def at(self, position):
return self.__class__(position, self.d, self.R, self.N, self.S)
@property
def value(self):
energy = 0.5 * self.position.dot(self.S.inverse_times(self.position))
energy += 0.5 * (self.d - self.R(self.position)).dot(
self.N.inverse_times(self.d - self.R(self.position)))
return energy.real
@property
def gradient(self):
gradient = self.S.inverse_times(self.position)
gradient -= self.R.derived_adjoint_times(
self.N.inverse_times(self.d - self.R(self.position)), self.position)
return gradient
@property
def curvature(self):
curvature = NonlinearWienerFilterCurvature(R=self.R,
N=self.N,
S=self.S,
position=self.position)
return curvature
from wiener_filter_curvature import WienerFilterCurvature
\ No newline at end of file
from wiener_filter_curvature import WienerFilterCurvature
from nonlinear_wiener_filter_curvature import NonlinearWienerFilterCurvature
\ No newline at end of file
from nifty.operators import EndomorphicOperator,\
InvertibleOperatorMixin
class NonlinearWienerFilterCurvature(InvertibleOperatorMixin, EndomorphicOperator):
def __init__(self, R, N, S, position, inverter=None, preconditioner=None):
self.R = R
self.N = N
self.S = S
self.position = position
if preconditioner is None:
preconditioner = self.S.times
self._domain = self.S.domain
super(NonlinearWienerFilterCurvature, self).__init__(inverter=inverter,
preconditioner=preconditioner)
@property
def domain(self):
return self._domain
@property
def self_adjoint(self):
return True
@property
def unitary(self):
return False
# ---Added properties and methods---
def _times(self, x, spaces):
return self.R.derived_adjoint_times(
self.N.inverse_times(self.R.derived_times(
x, self.position)), self.position)\
+ self.S.inverse_times(x)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment