...
 
Commits (163)
......@@ -5,6 +5,7 @@ from .domains import *
from .domain_tuple import DomainTuple
from .field import Field
from .operators import *
from .symbolic import *
from .probing.utils import probe_with_posterior_samples, probe_diagonal, \
StatCalculator
from .minimization import *
......
......@@ -15,10 +15,14 @@
#
# NIFTy is being developed at the Max-Planck-Institut fuer Astrophysik
# and financially supported by the Studienstiftung des deutschen Volkes.
from .wiener_filter_curvature import WienerFilterCurvature
from ..utilities import memo
from ..field import Field
from ..sugar import sqrt
from ..minimization.energy import Energy
from ..operators import DiagonalOperator
from ..operators.inversion_enabler import InversionEnabler
from ..symbolic import (SymbolicCABF, SymbolicAdd, SymbolicConstant,
SymbolicQuad, SymbolicVariable, Tensor, fromNiftyNL)
from ..utilities import memo
class NonlinearWienerFilterEnergy(Energy):
......@@ -30,17 +34,33 @@ class NonlinearWienerFilterEnergy(Energy):
self.nonlinearity = nonlinearity
self.ht = ht
self.power = power
m = ht(power*position)
residual = d - Instrument(nonlinearity(m))
self.N = N
self.S = S
self.inverter = inverter
t1 = S.inverse_times(position)
t2 = N.inverse_times(residual)
self._value = 0.5 * (position.vdot(t1) + residual.vdot(t2)).real
self.R = Instrument * nonlinearity.derivative(m) * ht * power
self._gradient = (t1 - self.R.adjoint_times(t2)).lock()
pos_nl = SymbolicVariable(position.domain)
Ninv_nextgen = DiagonalOperator(sqrt(N.inverse(Field.full(N.target, 1.))))
Ninv_nextgen_nl = SymbolicConstant(Tensor(Ninv_nextgen, 2, name='Ninv_nextgen'), (-1, -1))
Sinv_nextgen = DiagonalOperator(sqrt(S.inverse(Field.full(S.target, 1.))))
Sinv_nextgen_nl = SymbolicConstant(Tensor(Sinv_nextgen, 2, name='Sinv_nextgen'), (-1, -1))
mh_nl = SymbolicCABF(SymbolicConstant(Tensor(DiagonalOperator(power), 2, name='power'), (1, -1)), pos_nl)
m_nl = SymbolicCABF(SymbolicConstant(Tensor(ht, 2, name='HT'), (1, -1)), mh_nl)
sky_nl = fromNiftyNL("nonlin", nonlinearity, m_nl)
d_nl = SymbolicConstant(Tensor(d, 1, name='d'), (1,))
MinusR_nl = SymbolicConstant(Tensor((-1) * Instrument, 2, name='-R'), (1, -1))
rec_nl = SymbolicCABF(MinusR_nl, sky_nl)
residual_nl = SymbolicAdd(d_nl, rec_nl)
residual_nextgen_nl = SymbolicCABF(Ninv_nextgen_nl, residual_nl)
pos_nextgen_nl = SymbolicCABF(Sinv_nextgen_nl, pos_nl)
likelihood_nl = SymbolicQuad(residual_nextgen_nl)
prior_nl = SymbolicQuad(pos_nextgen_nl)
energy_nl = SymbolicAdd(likelihood_nl, prior_nl)
self._value = energy_nl.eval(position)
self._gradient = energy_nl.derivative.eval(position)
self._curvature = InversionEnabler(energy_nl.curvature.eval(position),
inverter, S.inverse)
def at(self, position):
return self.__class__(position, self.d, self.Instrument,
......@@ -58,4 +78,4 @@ class NonlinearWienerFilterEnergy(Energy):
@property
@memo
def curvature(self):
return WienerFilterCurvature(self.R, self.N, self.S, self.inverter)
return self._curvature
......@@ -27,6 +27,8 @@ class ChainOperator(LinearOperator):
if not _callingfrommake:
raise NotImplementedError
super(ChainOperator, self).__init__()
if len(ops) == 0:
raise ValueError("ops is empty")
self._ops = ops
self._capability = self._all_ops
for op in ops:
......
......@@ -94,6 +94,10 @@ class DiagonalOperator(EndomorphicOperator):
self._ldiag = diagonal.local_data
self._update_diagmin()
def __str__(self):
return 'Diag({})'.format(self(ift.full(self._domain, 1.))
.to_global_data())
def _update_diagmin(self):
self._ldiag.flags.writeable = False
if not np.issubdtype(self._ldiag.dtype, np.complexfloating):
......
......@@ -161,6 +161,8 @@ class LinearOperator(NiftyMetaBase()):
def __rsub__(self, other):
from .sum_operator import SumOperator
if np.isscalar(other) and other == 0.:
return -self
other = self._toOperator(other, self.domain)
return SumOperator.make([other, self], [False, True])
......
......@@ -57,6 +57,9 @@ class ScalingOperator(EndomorphicOperator):
self._factor = factor
self._domain = makeDomain(domain)
def __str__(self):
return 'Scale({})'.format(self._factor)
def apply(self, x, mode):
self._check_input(x, mode)
......
from .add import SymbolicAdd
from .adjoint import SymbolicAdjoint
from .chain import SymbolicChain
from .constant import SymbolicConstant
from .contractions import (SymbolicCABF, SymbolicCABL, SymbolicApplyForm, SymbolicChainLinOps,
SymbolicChainLinOps11, SymbolicOuterProd, SymbolicQuad, SymbolicSandwich,
SymbolicVdot)
from .diag import SymbolicDiag
from .pointwise_nonlinearity import SymbolicNonlinear, fromNiftyNL
from .scalar_mul import SymbolicScalarMul
from .symbolic_tensor import SymbolicTensor
from .variable import SymbolicVariable
from .zero import SymbolicZero
from .tensor import Tensor
# Public API of the symbolic subpackage (what `from .symbolic import *` exports).
__all__ = ['SymbolicTensor', 'SymbolicAdd', 'SymbolicAdjoint', 'SymbolicConstant', 'SymbolicChain',
           'SymbolicChainLinOps', 'SymbolicChainLinOps11', 'SymbolicCABF', 'SymbolicCABL', 'SymbolicVdot',
           'SymbolicOuterProd', 'SymbolicApplyForm', 'SymbolicDiag', 'SymbolicScalarMul',
           'SymbolicVariable', 'SymbolicZero', 'SymbolicSandwich',
           'SymbolicQuad', 'Tensor', 'SymbolicNonlinear', 'fromNiftyNL']
from .symbolic_tensor import SymbolicTensor
class SymbolicAdd(SymbolicTensor):
    """Symbolic sum of two tensors that share the same index signature."""

    def __init__(self, fst, snd):
        assert fst.indices == snd.indices
        super(SymbolicAdd, self).__init__(fst.indices)
        self._fst = fst
        self._snd = snd

    @staticmethod
    def make(fst, snd):
        # FIXME: add special cases for two constants, for zeros etc.?
        return SymbolicAdd(fst, snd)

    def __str__(self):
        return '{} + {}'.format(self._fst, self._snd)

    def __call__(self, x):
        return self._fst(x) + self._snd(x)

    def eval(self, x):
        # Evaluate both summands; a plain float 0. is the framework's
        # sentinel for a zero tensor and short-circuits the addition.
        left = self._fst.eval(x)
        right = self._snd.eval(x)
        if isinstance(left, float) and left == 0.:
            return right
        if isinstance(right, float) and right == 0.:
            return left
        return left + right

    @property
    def derivative(self):
        # d(A + B) = dA + dB
        return self.__class__(self._fst.derivative, self._snd.derivative)

    @property
    def curvature(self):
        return self.__class__(self._fst.curvature, self._snd.curvature)
from .symbolic_tensor import SymbolicTensor
class SymbolicAdjoint(SymbolicTensor):
    """Symbolic adjoint: conjugation for rank-1, operator adjoint for rank-2."""

    def __init__(self, thing):
        # Only vectors (rank 1) and linear operators (rank 2) are supported.
        assert thing.rank in [1, 2]
        if thing.rank == 1:
            # Conjugation flips the single co-/contravariant index.
            indices = (-1 * thing.indices[0],)
        else:
            # Operator adjoint swaps the two index slots.
            indices = thing.indices[::-1]
        super(SymbolicAdjoint, self).__init__(indices)
        self._thing = thing

    @staticmethod
    def make(thing):
        # Adjoint is an involution: unwrap a double adjoint instead of nesting.
        if isinstance(thing, SymbolicAdjoint):
            return thing._thing
        return SymbolicAdjoint(thing)

    def __str__(self):
        return '{}^dagger'.format(self._thing)

    def eval(self, x):
        res = self._thing.eval(x)
        # Float 0. is the framework's sentinel for a zero tensor.
        if isinstance(res, float) and res == 0.:
            return 0.
        if self.rank == 2:
            return res.adjoint
        elif self.rank == 1:
            return res.conjugate()
        else:
            raise NotImplementedError

    @property
    def derivative(self):
        if self.rank == 1:
            # NOTE(review): this call looks broken — __init__ accepts a single
            # argument, so passing the extra index tuple raises TypeError at
            # runtime. The index bookkeeping for d(conj(v)) needs rework;
            # left unchanged because the intended semantics are unclear.
            return self.__class__(self._thing.derivative, self._indices + (-1,))
        raise NotImplementedError
from .symbolic_tensor import SymbolicTensor
class SymbolicChain(SymbolicTensor):
    """Composition outer(inner) of two rank-1 symbolic maps."""

    def __init__(self, outer, inner):
        assert outer.rank == 1 and inner.rank == 1
        super(SymbolicChain, self).__init__(outer.indices)
        self._outer = outer
        self._inner = inner

    def __str__(self):
        return '{}({})'.format(self._outer, self._inner)

    def eval(self, x):
        # Evaluate inside-out.
        inner_value = self._inner.eval(x)
        return self._outer.eval(inner_value)

    @property
    def derivative(self):
        return self.__class__(self._outer, self._inner.derivative)
from .tensor import Tensor
from .symbolic_tensor import SymbolicTensor
from .zero import SymbolicZero
class SymbolicConstant(SymbolicTensor):
    # Position-independent symbolic node wrapping a concrete Tensor.
    def __init__(self, tensor, indices):
        """
        Takes a tensor object and wraps it into a Nonlinear Object.
        """
        assert isinstance(tensor, Tensor)
        assert len(indices) == tensor.rank
        super(SymbolicConstant, self).__init__(indices)
        self._tensor = tensor

    def __call__(self, x):
        # A constant ignores its argument.
        return self

    def __str__(self):
        return str(self._tensor)

    def eval(self, x):
        # Unwrap the underlying NIFTy object (operator, field or scalar).
        return self._tensor._thing

    @property
    def derivative(self):
        # A constant differentiates to zero, with one extra covariant index.
        return SymbolicZero(self.indices + (-1,), self._tensor.domain)
from ..extra.operator_tests import consistency_check
from ..operators import EndomorphicOperator, DiagonalOperator, SandwichOperator
from ..field import Field
from .add import SymbolicAdd
from .constant import SymbolicZero
from .symbolic_tensor import SymbolicTensor
from ..domain_tuple import DomainTuple
class SymbolicChainLinOps(SymbolicTensor):
    """Composition of two rank-2 (linear-operator valued) symbolic tensors."""

    def __init__(self, op1, op2):
        assert op1.rank == 2 and op2.rank == 2
        # Second slot of op1 must contract against the first slot of op2.
        assert op1.indices[1] == -op2.indices[0]
        super(SymbolicChainLinOps, self).__init__((op1.indices[0], op2.indices[1]))
        self._op1 = op1
        self._op2 = op2

    def _has_zero_factor(self):
        # One-line helper: the chain collapses if either factor is Zero.
        return isinstance(self._op1, SymbolicZero) or isinstance(self._op2, SymbolicZero)

    def __str__(self):
        if self._has_zero_factor():
            return 'Zero'
        return '({} {})'.format(self._op1, self._op2)

    def eval(self, x):
        if self._has_zero_factor():
            return 0.
        return self._op1.eval(x) * self._op2.eval(x)

    @property
    def derivative(self):
        raise NotImplementedError
class SymbolicSandwich(SymbolicTensor):
    """Symbolic sandwich operator built from a rank-2 tensor (the "bun")."""

    def __init__(self, bun):
        assert bun.rank == 2
        super(SymbolicSandwich, self).__init__(2*(bun.indices[1],))
        self._bun = bun

    def __str__(self):
        return 'Sandwich({})'.format(self._bun)

    def eval(self, x):
        bun_op = self._bun.eval(x)
        return SandwichOperator.make(bun_op)

    @property
    def derivative(self):
        raise NotImplementedError
class SymbolicChainLinOps11(SymbolicTensor):
    """Contract the *first* index of op1 with the *first* index of op2.

    Only defined for rank 2; op1 is assumed to be real.
    FIXME Also for rank 1 (unify with ApplyForm)
    """

    def __init__(self, op1, op2):
        assert op1.rank == 2 and op2.rank == 2
        assert op1.indices[0] == - op2.indices[0]
        super(SymbolicChainLinOps11, self).__init__((op1.indices[1], op2.indices[1]))
        self._op1 = op1
        self._op2 = op2

    def __str__(self):
        return '{} _11_ {}'.format(self._op1, self._op2)

    def eval(self, x):
        def _is_zero(val):
            # Float 0. is the framework's zero-tensor sentinel.
            return isinstance(val, float) and val == 0.

        second = self._op2.eval(x)
        first = self._op1.eval(x)
        if _is_zero(second) or _is_zero(first):
            return 0.
        # Contracting first-with-first amounts to op2^dagger op1.
        return second.adjoint * first

    @property
    def derivative(self):
        raise NotImplementedError
class SymbolicCABF(SymbolicTensor):
    """CABF = Contract All But First: feed vectors into every slot of a
    tensor except its first, leaving a rank-1 result."""

    def __init__(self, nltensor, *nlvectors):
        super(SymbolicCABF, self).__init__(nltensor.indices[0:1])
        self._args = nlvectors
        self._nltensor = nltensor
        assert len(self._nltensor.indices) == len(self._args) + 1
        # Each argument's index must contract with the matching tensor slot.
        for slot, arg in enumerate(self._args):
            assert arg.indices[0] == - self._nltensor.indices[slot + 1]

    def __str__(self):
        s = '{} '.format(self._nltensor)
        for vv in self._args:
            s += '{}, '.format(vv)
        return s[:-2]

    def eval(self, x):
        if len(self._args) == 1:
            operator = self._nltensor.eval(x)
            vector = self._args[0].eval(x)
            return operator(vector)
        raise NotImplementedError

    @property
    def derivative(self):
        tensor_deriv = self._nltensor.derivative
        if isinstance(tensor_deriv, SymbolicZero) and len(self._args) == 1:
            # Constant tensor: only the argument's derivative survives.
            return SymbolicChainLinOps(self._nltensor, self._args[0].derivative)
        # TODO Implement more general case
        raise NotImplementedError
class SymbolicCABL(SymbolicTensor):
    """CABL = Contract All But Last: feed vectors into every slot of a
    tensor except its last, leaving a rank-1 result."""

    def __init__(self, nltensor, *nlvectors):
        super(SymbolicCABL, self).__init__(nltensor.indices[-1:])
        self._args = nlvectors
        self._nltensor = nltensor
        assert len(self._nltensor.indices) == len(self._args) + 1
        for ii in range(len(self._args)):
            assert self._args[ii].indices[0] == - self._nltensor.indices[ii]

    def __str__(self):
        s = '{}CABL('.format(self._nltensor)
        for vv in self._args:
            s += '{}, '.format(vv)
        s = s[:-2]
        s += ')'
        return s

    def eval(self, x):
        if isinstance(self._nltensor, SymbolicZero):
            return 0.
        # Fix: evaluate the tensor only once. The original evaluated it in
        # the zero test AND again below; eval recursively builds operator
        # trees, so the duplicate evaluation was pure waste.
        operator = self._nltensor.eval(x)
        if operator == 0.:
            return 0.
        if len(self._args) == 1:
            vector = self._args[0].eval(x)
            return operator.adjoint(vector).conjugate()
        raise NotImplementedError

    @property
    def derivative(self):
        # FIXME This is not the general case!!!
        assert len(self._args) == 1
        return SymbolicChainLinOps11(self._nltensor, self._args[0].derivative)
class SymbolicVdot(SymbolicTensor):
    """Scalar product of two symbolic vectors carrying identical indices."""

    def __init__(self, vector1, vector2):
        assert vector1.indices == vector2.indices
        assert vector1.indices in [(1,), (-1,)]
        super(SymbolicVdot, self).__init__(())
        self._vector1 = vector1
        self._vector2 = vector2

    def __str__(self):
        return '({}).vdot({})'.format(self._vector1, self._vector2)

    def eval(self, x):
        lhs = self._vector1.eval(x)
        rhs = self._vector2.eval(x)
        return lhs.vdot(rhs)

    @property
    def derivative(self):
        # Product rule: d<v1, v2> = <dv1, v2> + <v1, dv2>.
        first_term = SymbolicCABL(self._vector1.derivative, self._vector2.adjoint)
        second_term = SymbolicCABL(self._vector2.derivative, self._vector1.adjoint)
        return SymbolicAdd(first_term, second_term)
class SymbolicQuad(SymbolicTensor):
    """Quadratic energy 0.5 * |thing|^2 of a rank-1 symbolic tensor."""

    def __init__(self, thing):
        assert thing.rank == 1
        super(SymbolicQuad, self).__init__(())
        self._thing = thing

    def __str__(self):
        return '0.5 * |{}|**2'.format(self._thing)

    def eval(self, x):
        vec = self._thing.eval(x)
        return 0.5 * vec.vdot(vec)

    @property
    def derivative(self):
        return SymbolicCABL(self._thing.derivative, self._thing.adjoint)

    @property
    def curvature(self):
        # This is Jakob's curvature
        return SymbolicSandwich(self._thing.derivative)
class RowOperator(EndomorphicOperator):
    """Operator whose TIMES fills the target with <field, x>; its adjoint
    scales the stored field by sum(x)."""

    def __init__(self, field):
        super(RowOperator, self).__init__()
        if not isinstance(field, Field):
            raise TypeError("Field object required")
        self._field = field
        self._domain = DomainTuple.make(field.domain)

    def apply(self, x, mode):
        self._check_input(x, mode)
        if mode != self.TIMES:
            # ADJOINT_TIMES (the only other capability).
            return self._field * x.sum()
        return Field.full(self.target, self._field.vdot(x))

    @property
    def domain(self):
        return self._domain

    @property
    def capability(self):
        return self.TIMES | self.ADJOINT_TIMES
def OuterOperator(field, row_operator):
    """Compose diag(field) with *row_operator*, yielding an outer-product-like map."""
    scaler = DiagonalOperator(field)
    return scaler * row_operator
class SymbolicOuterProd(SymbolicTensor):
    """Symbolic outer product of two rank-1 tensors."""

    def __init__(self, snd, fst):
        """ Computes A = fst*snd """
        assert snd.indices == (-1,)
        assert fst.indices in [(1,), (-1,)]
        super(SymbolicOuterProd, self).__init__(fst.indices + snd.indices)
        self._snd = snd
        self._fst = fst

    def __str__(self):
        return '{} outer {}'.format(self._snd, self._fst)

    def eval(self, x):
        factors_zero = isinstance(self._snd, SymbolicZero) or isinstance(self._fst, SymbolicZero)
        if factors_zero:
            return 0.
        row = RowOperator(self._snd.eval(x))
        op = OuterOperator(self._fst.eval(x), row)
        # NOTE(review): running the adjoint consistency check on every
        # evaluation is costly — presumably a debugging aid; consider
        # removing once the operator is trusted.
        consistency_check(op)
        return op

    @property
    def derivative(self):
        raise NotImplementedError
class SymbolicApplyForm(SymbolicTensor):
    """Scalar obtained by applying a linear form (indices (-1,)) to a
    vector (indices (1,))."""

    def __init__(self, form, vector):
        assert vector.indices == (1,)
        assert form.indices == (-1,)
        super(SymbolicApplyForm, self).__init__(())
        self._vector = vector
        self._form = form
        # (Removed a redundant `self._indices = ()` — the super().__init__
        # call above already sets it.)

    def __str__(self):
        return '{}.applyForm({})'.format(self._form, self._vector)

    def eval(self, x):
        return self._form.eval(x).vdot(self._vector.eval(x))

    @property
    def derivative(self):
        # Product rule over form and vector.
        A = SymbolicCABL(self._form.derivative, self._vector)
        B = SymbolicCABL(self._vector.derivative, self._form)
        return SymbolicAdd(A, B)
from ..operators import DiagonalOperator
from .symbolic_tensor import SymbolicTensor
class SymbolicDiag(SymbolicTensor):
    """Promote a rank-1 symbolic tensor to the diagonal operator it induces."""

    def __init__(self, diag):
        assert diag.rank == 1
        super(SymbolicDiag, self).__init__((diag.indices[0], -diag.indices[0]))
        self._diag = diag

    def __call__(self, x):
        raise NotImplementedError

    def __str__(self):
        return 'diag({})'.format(self._diag)

    def eval(self, x):
        diag_field = self._diag.eval(x)
        return DiagonalOperator(diag_field)

    @property
    def derivative(self):
        raise NotImplementedError
from ..sugar import exp, tanh
from .contractions import SymbolicChainLinOps
from .diag import SymbolicDiag
from .symbolic_tensor import SymbolicTensor
class PointwiseNonlinearity(SymbolicTensor):
    """Base class for pointwise nonlinearities acting on a rank-1 tensor."""

    def __init__(self, inner):
        assert inner.rank == 1
        super(PointwiseNonlinearity, self).__init__(inner.indices)
        self._inner = inner

    def __call__(self, x):
        raise NotImplementedError
class SymbolicNonlinear(PointwiseNonlinearity):
    """Pointwise application of *func* (with optional derivative *deriv*)
    to an inner rank-1 symbolic tensor."""

    def __init__(self, name, func, inner, deriv=None):
        super(SymbolicNonlinear, self).__init__(inner)
        self._name = name
        self._func = func
        self._deriv = deriv

    def __str__(self):
        return '{}({})'.format(self._name, self._inner)

    def eval(self, x):
        return self._func(self._inner.eval(x))

    @property
    def derivative(self):
        if self._deriv is None:
            # Bug fix: the original *returned* the NotImplementedError class
            # instead of raising it, silently handing callers an exception
            # type as if it were a tensor.
            raise NotImplementedError
        tmp = SymbolicNonlinear(self._name+"'", self._deriv, self._inner)
        # Chain rule: diag(f'(inner)) composed with inner's derivative.
        return SymbolicChainLinOps(SymbolicDiag(tmp), self._inner.derivative)
def fromNiftyNL(name, NL, inner):
    """Wrap a NIFTy nonlinearity *NL* (a callable object exposing a
    ``derivative`` attribute) into a SymbolicNonlinear acting on *inner*."""
    return SymbolicNonlinear(name, NL.__call__, inner, NL.derivative)
from ..field import Field
from .add import SymbolicAdd
from .contractions import SymbolicOuterProd
from .symbolic_tensor import SymbolicTensor
class SymbolicScalarMul(SymbolicTensor):
    """Multiply a symbolic tensor by a symbolic scalar (rank-0 tensor)."""

    def __init__(self, nltensor, nlscalar):
        assert isinstance(nltensor, SymbolicTensor)
        assert isinstance(nlscalar, SymbolicTensor)
        assert nlscalar.rank == 0
        super(SymbolicScalarMul, self).__init__(nltensor._indices)
        self._nltensor = nltensor
        self._nlscalar = nlscalar

    def __str__(self):
        return '({}) x ({})'.format(self._nlscalar, self._nltensor)

    def eval(self, x):
        scalar = self._nlscalar.eval(x)
        # Unwrap a zero-dimensional Field into a plain number.
        if isinstance(scalar, Field) and len(scalar.domain) == 0:
            scalar = scalar.to_global_data()[()]
        return scalar * self._nltensor.eval(x)

    @property
    def derivative(self):
        # Product rule; the cross term differs by the tensor's rank.
        tensor_rank = self._nltensor.rank
        if tensor_rank == 0 and self._nlscalar.rank == 0:
            first = self.__class__(self._nltensor.derivative, self._nlscalar)
            second = self.__class__(self._nlscalar.derivative, self._nltensor)
        elif tensor_rank == 1:
            first = self.__class__(self._nltensor.derivative, self._nlscalar)
            second = SymbolicOuterProd(self._nlscalar.derivative, self._nltensor)
        else:
            raise NotImplementedError
        return SymbolicAdd(first, second)
class SymbolicTensor(object):
    """Abstract base for symbolic tensor expressions.

    An instance carries only its index signature (a tuple of +1/-1 entries,
    one per tensor slot); subclasses implement ``eval`` and ``derivative``.
    """

    def __init__(self, indices):
        self._indices = indices

    def __call__(self, x):
        # Bug fix: SymbolicChain is defined in the `.chain` module — the
        # original imported it from `.contractions`, which neither defines
        # nor re-exports it, so every call raised ImportError.
        from .chain import SymbolicChain
        return SymbolicChain(self, x)

    @property
    def indices(self):
        return self._indices

    @property
    def rank(self):
        # Rank equals the number of index slots.
        return len(self._indices)

    def eval(self, x):
        raise NotImplementedError

    @property
    def derivative(self):
        raise NotImplementedError

    @property
    def adjoint(self):
        # Only vectors and operators have a well-defined adjoint.
        if self.rank in [1, 2]:
            from .adjoint import SymbolicAdjoint
            return SymbolicAdjoint.make(self)
        raise NotImplementedError
from ..operators.linear_operator import LinearOperator
from ..operators.scaling_operator import ScalingOperator
from ..operators.diagonal_operator import DiagonalOperator
from ..field import Field
class Tensor(object):
    """Uniform wrapper turning NIFTy LinearOperators, Fields and scalars
    into tensors of a declared rank.

    thing: Can be a LinearOperator, Field or a Scalar.
    rank: 2 (operator), 1 (field) or 0 (scalar).
    domain: target domain used to promote scalars or scalar-domain Fields.
    name: optional display name used by __str__.
    """

    def __init__(self, thing, rank, domain=None, name=None):
        assert isinstance(thing, LinearOperator) or isinstance(thing, Field) or isinstance(thing, (int, float))
        self.rank = rank
        self._name = name
        # Rank 2
        if self.rank == 2:
            if isinstance(thing, LinearOperator):
                self._thing = thing
            elif isinstance(thing, Field) and thing.domain == domain:
                self._thing = DiagonalOperator(thing)
            elif isinstance(thing, Field) and thing.domain != domain and len(thing.domain) == 0:
                assert domain is not None
                # NOTE(review): thing[()] assumes scalar Fields support item
                # access — confirm against the Field API.
                self._thing = ScalingOperator(thing[()], domain)
            elif isinstance(thing, (float, int)) and domain is not None:
                self._thing = ScalingOperator(thing, domain)
            else:
                raise ValueError
        # Rank 1
        elif self.rank == 1:
            # Bug fix: the original accessed thing.domain before checking that
            # thing is a Field (AttributeError for scalars) and, on the
            # scalar-domain broadcast path, rebound `thing` without ever
            # assigning self._thing, crashing later with AttributeError.
            if not isinstance(thing, Field):
                raise ValueError
            if domain is not None and thing.domain != domain and len(thing.domain) == 0:
                # Broadcast a scalar-domain Field onto the requested domain.
                thing = Field(domain, thing.val)
            self._thing = thing
        # Rank 0
        elif self.rank == 0:
            if isinstance(thing, Field) and len(thing.domain) == 0:
                self._thing = thing
            elif isinstance(thing, (float, int)):
                self._thing = Field((), thing)
            else:
                raise ValueError
        else:
            raise NotImplementedError

    @property
    def domain(self):
        return self._thing.domain

    def __str__(self):
        return str(self._thing) if self._name is None else self._name

    def __add__(self, other):
        assert self.rank == other.rank
        return self.__class__(self._thing + other._thing, self.rank)
from .tensor import Tensor
from .constant import SymbolicConstant
from .symbolic_tensor import SymbolicTensor
class SymbolicVariable(SymbolicTensor):
    """The free variable of a symbolic expression; ``eval`` is the identity."""

    def __init__(self, domain):
        super(SymbolicVariable, self).__init__((1,))
        self._domain = domain

    def __call__(self, x):
        raise ValueError

    def __str__(self):
        return 'var'

    def eval(self, x):
        # The variable evaluates to the position itself.
        return x

    @property
    def derivative(self):
        # d(var)/d(var) is the identity operator on the variable's domain.
        identity = Tensor(1., 2, self._domain)
        return SymbolicConstant(identity, self._indices + (-1, ))
from .symbolic_tensor import SymbolicTensor
class SymbolicZero(SymbolicTensor):
    """The zero tensor of a given index signature; evaluates to the float 0."""

    def __init__(self, indices, domain=None):
        super(SymbolicZero, self).__init__(indices)
        self._domain = domain

    def __call__(self, x):
        return self

    def __str__(self):
        return 'Zero'

    def eval(self, x):
        return 0.
        # FIXME This situation here is suboptimal. It would be much better to
        # return the actual NIFTy objects.

    @property
    def derivative(self):
        # Fix: propagate the stored domain so the information is not lost on
        # differentiation (the original silently dropped it).
        return self.__class__(self.indices + (-1,), self._domain)
import unittest
import nifty4 as ift
import numpy as np
from numpy.testing import assert_allclose
class NonlinearTests(unittest.TestCase):
    """Unit tests for the symbolic tensor framework: values, gradients and
    curvatures of small hand-built energies on a two-pixel space."""

    def make(self):
        # Shared fixture: position x = (2, 5), free variable a, and S = 2*Id.
        space = ift.RGSpace(2)
        self.x = ift.from_global_data(space, np.array([2., 5.]))
        self.a = ift.SymbolicVariable(space)
        self.S = ift.DiagonalOperator(ift.Field(space, 2.))

    @staticmethod
    def takeOp1D1D(op, at, out):
        # Materialize the 2x2 Jacobian of `op` at `at` by probing with the
        # two unit vectors, then compare against the expected matrix `out`.
        gradient = op.derivative.eval(at)
        dom = gradient.domain
        grad1 = gradient(ift.from_global_data(dom, np.array([1., 0.]))).to_global_data()
        grad2 = gradient(ift.from_global_data(dom, np.array([0., 1.]))).to_global_data()
        grad = np.array([grad1, grad2])
        assert_allclose(grad, out)

    def test_const(self):
        # E = a
        self.make()
        E = self.a
        res = E.eval(self.x).local_data
        assert_allclose(res, self.x.local_data)
        # Derivative of the identity map is the unit matrix.
        self.takeOp1D1D(E, self.x, np.diagflat(np.ones(2)))

    def test_const2(self):
        # E = (2*Id)(a)
        self.make()
        A = ift.SymbolicConstant(ift.Tensor(self.S, 2), (-1, -1))
        E = ift.SymbolicCABF(A, self.a)
        res = E.eval(self.x).local_data
        Sdiag = self.S(ift.Field.full(self.S.domain, 1.))
        true_res = (Sdiag * self.x).local_data
        # Test function evaluation
        assert_allclose(res, true_res)
        # Test gradient
        self.takeOp1D1D(E, self.x, np.diagflat(Sdiag.to_global_data()))

    def test_with_crossterms(self):
        # E = a * |a|**2
        self.make()
        E = ift.SymbolicScalarMul(self.a, ift.SymbolicVdot(self.a, self.a))
        res = E.eval(self.x).local_data
        res_true = (self.x * self.x.vdot(self.x)).local_data
        assert_allclose(res, res_true)
        x1, x2 = self.x.to_global_data()
        # Analytic Jacobian of x * |x|^2 in two dimensions.
        self.takeOp1D1D(E, self.x, np.array([[3*x1**2+x2**2, 2*x1*x2], [2*x1*x2, 3*x2**2+x1**2]]))

    def test_priorEnergy(self):
        # E = a^dagger (2*Id)(a)
        self.make()
        A = ift.SymbolicConstant(ift.Tensor(self.S, 2), (-1, -1))
        E = ift.SymbolicApplyForm(ift.SymbolicCABF(A, self.a), self.a)
        res = E.eval(self.x)
        # x = (2, 5): 2*(4 + 25) = 58.
        assert_allclose(res, 58)
        gradient = E.derivative.eval(self.x)
        assert_allclose(gradient.local_data, 4 * self.x.local_data)
        curv = E.derivative.derivative
        curv = curv.eval(self.x)
        curv_true = 2 * self.S
        # Compare the two operators via their action on a random field.
        assert_allclose((curv-curv_true)(ift.Field.from_random('normal', curv.domain)).local_data, ift.Field.full(curv.domain, 0.).local_data)

    def test_nonlinearity(self):
        # E = exp(a)
        self.make()
        E = ift.SymbolicNonlinear("exp", ift.exp, self.a, ift.exp)
        res = E.eval(self.x).to_global_data()
        assert_allclose(res, np.exp(self.x.to_global_data()))
        # Jacobian of pointwise exp is diag(exp(x)).
        self.takeOp1D1D(E, self.x, np.diagflat(res))

    def test_nonlinearpriorEnergy(self):
        # E = 0.5 * exp(a)^dagger S exp(a)
        self.make()
        exp_a = ift.SymbolicNonlinear("exp", ift.exp, self.a, ift.exp)
        A = ift.SymbolicConstant(ift.Tensor(0.5 * self.S, 2), (-1, -1))
        E = ift.SymbolicApplyForm(ift.SymbolicCABF(A, exp_a), exp_a)
        res = E.eval(self.x)
        res_true = 0.5 * ift.exp(self.x).vdot(self.S(ift.exp(self.x)))
        assert_allclose(res, res_true)
        gradient = E.derivative.eval(self.x)
        gradient_true = (ift.DiagonalOperator(ift.exp(self.x)) * self.S)(ift.exp(self.x)).local_data
        assert_allclose(gradient.local_data, gradient_true)
        curv = E.derivative.derivative
        curv = curv.eval(self.x)
        curv_true = ift.DiagonalOperator(ift.exp(self.x)) * self.S * ift.DiagonalOperator(ift.exp(self.x))
        assert_allclose((curv-curv_true)(ift.Field.from_random('normal', curv.domain)).local_data, ift.Field.full(curv.domain, 0.).local_data)

    def test_quad(self):
        # E = 0.5 * |S a|^2
        self.make()
        S = ift.SymbolicConstant(ift.Tensor(self.S, 2), (1, -1))
        E = ift.SymbolicQuad(ift.SymbolicCABF(S, self.a))
        res = E.eval(self.x)
        assert_allclose(res, 0.5 * self.x.vdot((self.S.adjoint*self.S)(self.x)))
        gradient = E.derivative.eval(self.x)
        assert_allclose(gradient.local_data, (self.S.adjoint*self.S)(self.x).local_data)
        curv = E.curvature
        curv = curv.eval(self.x)
        curv_true = self.S.adjoint*self.S
        assert_allclose((curv-curv_true)(ift.Field.from_random('normal', curv.domain)).local_data, ift.Field.full(curv.domain, 0.).local_data)