Commit fb61bf9e authored by Philipp Arras's avatar Philipp Arras
Browse files

Just add files. Nothing working. Need to port.

parent c165f5cd
from ...operators.endomorphic_operator import EndomorphicOperator
from ...operators.invertible_operator_mixin import InvertibleOperatorMixin
class NoiseCurvature(InvertibleOperatorMixin, EndomorphicOperator):
pass
from ... import Field, exp
from ...energies.energy import Energy
from ...memoization import memo
from ...operators.diagonal_operator import DiagonalOperator
from ...sugar import generate_posterior_sample
class NoiseEnergy(Energy):
def __init__(self, position, d, m, D, t, FFT, Instrument, nonlinearity, alpha, q, Projection,
samples=3, sample_list=None, inverter=None):
super(NoiseEnergy, self).__init__(position=position.copy())
dummy = self.position.norm()
self.m = m
self.D = D
self.d = d
self.N = DiagonalOperator(
self.position.domain, diagonal=exp(self.position))
self.t = t
self.samples = samples
self.FFT = FFT
self.Instrument = Instrument
self.nonlinearity = nonlinearity
self.alpha = alpha
self.q = q
self.Projection = Projection
self.power = self.Projection.adjoint_times(exp(0.5 * self.t))
self.one = Field(self.position.domain, val=1.)
if sample_list is None:
sample_list = []
if samples is None:
sample_list.append(self.m)
else:
for i in range(samples):
sample = generate_posterior_sample(m, D)
sample = FFT(Field(FFT.domain, val=(
FFT.adjoint_times(sample).val)))
sample_list.append(sample)
self.sample_list = sample_list
self.inverter = inverter
def at(self, position):
return self.__class__(position, self.d, self.m,
self.D, self.t, self.FFT, self.Instrument, self.nonlinearity,
self.alpha,
self.q,
self.Projection,
sample_list=self.sample_list,
samples=self.samples, inverter=self.inverter)
@property
@memo
def value(self):
likelihood = 0.
for sample in self.sample_list:
likelihood += self._likelihood(sample)
return ((likelihood / float(len(self.sample_list))) + 0.5 * self.one.vdot(self.position)
+ (self.alpha - self.one).vdot(self.position) + self.q.vdot(exp(-self.position)))
def _likelihood(self, m):
residual = self.d - \
self.Instrument(self.nonlinearity(
self.FFT.adjoint_times(self.power * m)))
energy = 0.5 * residual.vdot(self.N.inverse_times(residual))
return energy.real
@property
@memo
def gradient(self):
likelihood_gradient = Field(self.position.domain, val=0.)
for sample in self.sample_list:
likelihood_gradient += self._likelihood_gradient(sample)
return (likelihood_gradient / float(len(self.sample_list))
+ 0.5 * self.one + (self.alpha - self.one) - self.q * (exp(-self.position)))
def _likelihood_gradient(self, m):
residual = self.d - \
self.Instrument(self.nonlinearity(
self.FFT.adjoint_times(self.power * m)))
gradient = - 0.5 * \
self.N.inverse_times(residual.conjugate() * residual)
return gradient
@property
@memo
def curvature(self):
pass
from ... import Field
from ...operators.endomorphic_operator import EndomorphicOperator
from ...operators.invertible_operator_mixin import InvertibleOperatorMixin
from .response_operators import LinearizedPowerResponse
class NonlinearPowerCurvature(InvertibleOperatorMixin, EndomorphicOperator):
def __init__(self, position, FFT, Instrument, nonlinearity,
Projection, N, T, sample_list, inverter=None):
self.N = N
self.FFT = FFT
self.Instrument = Instrument
self.T = T
self.sample_list = sample_list
self.samples = len(sample_list)
self.position = position
self.Projection = Projection
self.nonlinearity = nonlinearity
# if preconditioner is None:
# preconditioner = self.theta.inverse_times
self._domain = self.position.domain
super(NonlinearPowerCurvature, self).__init__(inverter=inverter)
@property
def domain(self):
return self._domain
@property
def self_adjoint(self):
return True
@property
def unitary(self):
return False
# ---Added properties and methods---
def _times(self, x, spaces):
result = Field(self.domain, val=0.)
for i in range(self.samples):
sample = self.sample_list[i]
result += self._sample_times(x, sample)
result /= self.samples
return (result + self.T(x))
def _sample_times(self, x, sample):
LinearizedResponse = LinearizedPowerResponse(self.Instrument, self.nonlinearity,
self.FFT, self.Projection, self.position, sample)
result = LinearizedResponse.adjoint_times(
self.N.inverse_times(LinearizedResponse(x)))
return result
from ... import Field, exp
from ...energies.energy import Energy
from ...memoization import memo
from ...operators.smoothness_operator import SmoothnessOperator
from ...sugar import generate_posterior_sample
from .nonlinear_power_curvature import NonlinearPowerCurvature
from .response_operators import LinearizedPowerResponse
class NonlinearPowerEnergy(Energy):
"""The Energy of the power spectrum according to the critical filter.
It describes the energy of the logarithmic amplitudes of the power spectrum of
a Gaussian random field under reconstruction uncertainty with smoothness and
inverse gamma prior. It is used to infer the correlation structure of a correlated signal.
Parameters
----------
position : Field,
The current position of this energy.
m : Field,
The map whichs power spectrum has to be inferred
D : EndomorphicOperator,
The curvature of the Gaussian encoding the posterior covariance.
If not specified, the map is assumed to be no reconstruction.
default : None
sigma : float
The parameter of the smoothness prior.
default : ??? None? ???????
samples : integer
Number of samples used for the estimation of the uncertainty corrections.
default : 3
"""
def __init__(self, position, d, N, m, D, FFT, Instrument, nonlinearity, Projection,
sigma=0., samples=3, sample_list=None, inverter=None):
super(NonlinearPowerEnergy, self).__init__(position=position.copy())
dummy = self.position.norm()
self.m = m
self.D = D
self.d = d
self.N = N
self.sigma = sigma
self.T = SmoothnessOperator(domain=self.position.domain[0], strength=self.sigma,
logarithmic=True)
self.samples = samples
self.FFT = FFT
self.Instrument = Instrument
self.nonlinearity = nonlinearity
self.Projection = Projection
self.LinearizedResponse = LinearizedPowerResponse(self.Instrument, self.nonlinearity,
self.FFT, self.Projection, self.position, self.m)
self.power = self.Projection.adjoint_times(
exp(0.5 * self.position))
if sample_list is None:
sample_list = []
if samples is None:
sample_list.append(self.m)
else:
for i in range(samples):
sample = generate_posterior_sample(m, D)
sample = FFT(Field(FFT.domain, val=(
FFT.adjoint_times(sample).val)))
sample_list.append(sample)
self.sample_list = sample_list
self.inverter = inverter
def at(self, position):
return self.__class__(position, self.d, self.N, self.m,
self.D, self.FFT, self.Instrument, self.nonlinearity,
self.Projection, sigma=self.sigma,
sample_list=self.sample_list,
samples=self.samples, inverter=self.inverter)
@property
@memo
def value(self):
likelihood = 0.
for sample in self.sample_list:
likelihood += self._likelihood(sample)
return 0.5 * self.position.vdot(self.T(self.position)) + likelihood / float(len(self.sample_list))
def _likelihood(self, m):
residual = self.d - \
self.Instrument(self.nonlinearity(
self.FFT.adjoint_times(self.power * m)))
energy = 0.5 * residual.vdot(self.N.inverse_times(residual))
return energy.real
@property
@memo
def gradient(self):
likelihood_gradient = Field(self.position.domain, val=0.)
for sample in self.sample_list:
likelihood_gradient += self._likelihood_gradient(sample)
return -likelihood_gradient / float(len(self.sample_list)) + self.T(self.position)
def _likelihood_gradient(self, m):
residual = self.d - \
self.Instrument(self.nonlinearity(
self.FFT.adjoint_times(self.power * m)))
LinearizedResponse = LinearizedPowerResponse(self.Instrument, self.nonlinearity,
self.FFT, self.Projection, self.position, m)
gradient = LinearizedResponse.adjoint_times(
self.N.inverse_times(residual))
return gradient
@property
@memo
def curvature(self):
curvature = NonlinearPowerCurvature(self.position, self.FFT, self.Instrument, self.nonlinearity,
self.Projection, self.N, self.T, self.sample_list, inverter=self.inverter)
return curvature
from ...operators.endomorphic_operator import EndomorphicOperator
from ...operators.invertible_operator_mixin import InvertibleOperatorMixin
class NonlinearSignalCurvature(InvertibleOperatorMixin, EndomorphicOperator):
def __init__(self, R, N, S, inverter=None):
self.R = R
self.N = N
self.S = S
# if preconditioner is None:
# preconditioner = self.S.times
self._domain = self.S.domain
super(NonlinearSignalCurvature, self).__init__(inverter=inverter)
@property
def domain(self):
return self._domain
@property
def self_adjoint(self):
return True
@property
def unitary(self):
return False
# ---Added properties and methods---
def _times(self, x, spaces):
return self.R.adjoint_times(self.N.inverse_times(self.R(x))) + self.S.inverse_times(x)
from ...energies.energy import Energy
from ...memoization import memo
from .nonlinear_signal_curvature import NonlinearSignalCurvature
from .response_operators import LinearizedSignalResponse
class NonlinearWienerFilterEnergy(Energy):
"""The Energy for the Gaussian lognormal case.
It describes the situation of linear measurement of a
lognormal signal with Gaussian noise and Gaussain signal prior.
Parameters
----------
d : Field,
the data.
R : Operator,
The nonlinear response operator, describtion of the measurement process.
N : EndomorphicOperator,
The noise covariance in data space.
S : EndomorphicOperator,
The prior signal covariance in harmonic space.
"""
def __init__(self, position, d, Instrument, nonlinearity, FFT, power, N, S, inverter=None):
super(NonlinearWienerFilterEnergy, self).__init__(position=position)
# print "init", position.norm()
self.d = d
self.Instrument = Instrument
self.nonlinearity = nonlinearity
self.FFT = FFT
self.power = power
self.LinearizedResponse = LinearizedSignalResponse(Instrument, nonlinearity,
FFT, power, self.position)
position_map = FFT.adjoint_times(self.power * self.position)
# position_map = (Field(FFT.domain,val=position_map.val.real+0j))
self.residual = d - Instrument(nonlinearity(position_map))
# Field(d.domain,
# val=self.residual.val.get_full_data().view(np.complex128).conjugate().view(np.float64))
self.N = N
self.S = S
self.inverter = inverter
self._t1 = self.S.inverse_times(self.position)
self._t2 = self.N.inverse_times(self.residual)
def at(self, position):
return self.__class__(position, self.d, self.Instrument,
self.nonlinearity, self.FFT, self.power, self.N, self.S, inverter=self.inverter)
@property
@memo
def value(self):
energy = 0.5 * self.position.vdot(self._t1)
energy += 0.5 * self.residual.vdot(self._t2)
return energy.real
@property
@memo
def gradient(self):
gradient = self._t1.copy()
gradient -= self.LinearizedResponse.adjoint_times(self._t2)
return gradient
@property
@memo
def curvature(self):
curvature = NonlinearSignalCurvature(R=self.LinearizedResponse,
N=self.N,
S=self.S, inverter=self.inverter)
return curvature
from numpy import logical_and, where
from ... import Field, exp, tanh
class Linear:
def __call__(self, x):
return x
def derivative(self, x):
return 1
class Exponential(object):
def __call__(self, x):
return exp(x)
def derivative(self, x):
return exp(x)
class Tanh(object):
def __call__(self, x):
return tanh(x)
def derivative(self, x):
return (1. - tanh(x)**2)
class PositiveTanh:
def __call__(self, x):
return 0.5 * tanh(x) + 0.5
def derivative(self, x):
return 0.5 * (1. - tanh(x)**2)
class LinearThenQuadraticWithJump(object):
def __call__(self, x):
dom = x.domain
x = x.copy().val.get_full_data()
cond = where(x > 0.)
not_cond = where(x <= 0.)
x[cond] **= 2
x[not_cond] -= 1
return Field(domain=dom, val=x)
def derivative(self, x):
dom = x.domain
x = x.copy().val.get_full_data()
cond = where(x > 0.)
not_cond = where(x <= 0.)
x[cond] *= 2
x[not_cond] = 1
return Field(domain=dom, val=x)
class ReallyStupidNonlinearity(object):
def __call__(self, x):
dom = x.domain
x = x.copy().val.get_full_data()
cond1 = where(logical_and(x > 0., x < .5))
cond2 = where(x >= .5)
not_cond = where(x <= 0.)
x[cond2] -= 0.5
x[cond2] **= 2
x[cond1] = 0.
x[not_cond] -= 1
return Field(domain=dom, val=x)
def derivative(self, x):
dom = x.domain
x = x.copy().val.get_full_data()
cond1 = where(logical_and(x > 0., x < 0.5))
cond2 = where(x > .5)
not_cond = where(x <= 0.)
x[cond2] -= 0.5
x[cond2] *= 2
x[cond1] = 0.
x[not_cond] = 1
return Field(domain=dom, val=x)
from ... import exp
from ...operators.linear_operator import LinearOperator
class AdjointFFTResponse(LinearOperator):
def __init__(self, FFT, R, default_spaces=None):
super(AdjointFFTResponse, self).__init__(default_spaces)
self._domain = FFT.target
self._target = R.target
self.R = R
self.FFT = FFT
def _times(self, x, spaces=None):
return self.R(self.FFT.adjoint_times(x))
def _adjoint_times(self, x, spaces=None):
return self.FFT(self.R.adjoint_times(x))
@property
def domain(self):
return self._domain
@property
def target(self):
return self._target
@property
def unitary(self):
return False
class LinearizedSignalResponse(LinearOperator):
def __init__(self, Instrument, nonlinearity, FFT, power, m, default_spaces=None):
super(LinearizedSignalResponse, self).__init__(default_spaces)
self._target = Instrument.target
self._domain = FFT.target
self.Instrument = Instrument
self.FFT = FFT
self.power = power
position = FFT.adjoint_times(self.power * m)
self.linearization = nonlinearity.derivative(position)
def _times(self, x, spaces):
return self.Instrument(self.linearization * self.FFT.adjoint_times(self.power * x))
def _adjoint_times(self, x, spaces):
return self.power * self.FFT(self.linearization * self.Instrument.adjoint_times(x))
@property
def domain(self):
return self._domain
@property
def target(self):
return self._target
@property
def unitary(self):
return False
class LinearizedPowerResponse(LinearOperator):
def __init__(self, Instrument, nonlinearity, FFT, Projection, t, m, default_spaces=None):
super(LinearizedPowerResponse, self).__init__(default_spaces)
self._target = Instrument.target
self._domain = t.domain
self.Instrument = Instrument
self.FFT = FFT
self.Projection = Projection
self.power = exp(0.5 * t)
self.m = m
position = FFT.adjoint_times(
self.Projection.adjoint_times(self.power) * self.m)
self.linearization = nonlinearity.derivative(position)
def _times(self, x, spaces):
return 0.5 * self.Instrument(self.linearization
* self.FFT.adjoint_times(self.m
* self.Projection.adjoint_times(self.power * x)))
def _adjoint_times(self, x, spaces):
return 0.5 * self.power * self.Projection(self.m.conjugate()
* self.FFT(self.linearization
* self.Instrument.adjoint_times(x))) # .weight(-1)
@property
def domain(self):
return self._domain
@property
def target(self):
return self._target
@property
def unitary(self):
return False
class SignalResponse(LinearOperator):
def __init__(self, t, FFT, R, default_spaces=None):
super(SignalResponse, self).__init__(default_spaces)
self._domain = FFT.target
self._target = R.target
self.power = exp(t).power_synthesize(
mean=1, std=0, real_signal=False)
self.R = R
self.FFT = FFT
def _times(self, x, spaces=None):
return self.R(self.FFT.adjoint_times(self.power * x))
def _adjoint_times(self, x, spaces=None):
return self.power * self.FFT(self.R.adjoint_times(x))
@property
def domain(self):
return self._domain
@property
def target(self):
return self._target
@property
def unitary(self):
return False
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment