Skip to content
Snippets Groups Projects
Commit ce40f30f authored by Jakob Knollmueller's avatar Jakob Knollmueller
Browse files

all ingredients for critical filtering, nothing tested

parent 39263e56
No related branches found
No related tags found
1 merge request!120Working on demos
Pipeline #
......@@ -258,7 +258,145 @@ class Field(Loggable, Versionable, object):
return result_field
def _calculate_power_spectrum(self, x, pindex, rho, axes=None):
def project_power(self, spaces=None, logarithmic=False, nbin=None,
binbounds=None, decompose_power=True):
""" Computes the quadratic power projection for a subspace of the Field.
Creates a PowerSpace for the space addressed by `spaces` with the given
binning and computes the quadratic power projection as a Field over this
PowerSpace. This can only be done if the subspace to be analyzed is a
harmonic space.
Parameters
----------
spaces : int *optional*
The subspace for which the power projection shall be computed
(default : None).
if spaces==None : Tries to synthesize for the whole domain
logarithmic : boolean *optional*
True if the output PowerSpace should use logarithmic binning.
{default : False}
nbin : int *optional*
The number of bins the resulting PowerSpace shall have
(default : None).
if nbin==None : maximum number of bins is used
binbounds : array-like *optional*
Inner bounds of the bins (default : None).
if binbounds==None : bins are inferred. Overwrites nbins and log
decompose_power : boolean, *optional*
Whether the analysed signal-space Field is intrinsically real or
complex and if the projected power shall therefore be computed
for the real and the imaginary part of the Field separately
(default : True).
Raise
-----
ValueError
Raised if
*len(domain) is != 1 when spaces==None
*len(spaces) is != 1 if not None
*the analyzed space is not harmonic
Returns
-------
out : Field
The output object. It's domain is a PowerSpace and it contains
the quadratic power projection of 'self's field.
See Also
--------
power_synthesize, PowerSpace, power_analyze
"""
# check if all spaces in `self.domain` are either harmonic or
# power_space instances
for sp in self.domain:
if not sp.harmonic and not isinstance(sp, PowerSpace):
self.logger.info(
"Field has a space in `domain` which is neither "
"harmonic nor a PowerSpace.")
# check if the `spaces` input is valid
spaces = utilities.cast_axis_to_tuple(spaces, len(self.domain))
if spaces is None:
if len(self.domain) == 1:
spaces = (0,)
else:
raise ValueError(
"Field has multiple spaces as domain "
"but `spaces` is None.")
if len(spaces) == 0:
raise ValueError(
"No space for analysis specified.")
elif len(spaces) > 1:
raise ValueError(
"Conversion of only one space at a time is allowed.")
space_index = spaces[0]
if not self.domain[space_index].harmonic:
raise ValueError(
"The analyzed space must be harmonic.")
# Create the target PowerSpace instance:
# If the associated signal-space field was real, we extract the
# hermitian and anti-hermitian parts of `self` and put them
# into the real and imaginary parts of the projected power.
# If it was complex, all the power is put into a real power projection
distribution_strategy = \
self.val.get_axes_local_distribution_strategy(
self.domain_axes[space_index])
harmonic_domain = self.domain[space_index]
power_domain = PowerSpace(harmonic_partner=harmonic_domain,
distribution_strategy=distribution_strategy,
logarithmic=logarithmic, nbin=nbin,
binbounds=binbounds)
# extract pindex and rho from power_domain
pindex = power_domain.pindex
if decompose_power:
hermitian_part, anti_hermitian_part = \
harmonic_domain.hermitian_decomposition(
self.val,
axes=self.domain_axes[space_index])
[hermitian_power, anti_hermitian_power] = \
[self._calculate_projected_power(
x=part,
pindex=pindex,
axes=self.domain_axes[space_index])
for part in [hermitian_part, anti_hermitian_part]]
power_projection = hermitian_power + 1j * anti_hermitian_power
else:
power_projection = self._calculate_projected_power(
x=self.val,
pindex=pindex,
axes=self.domain_axes[space_index])
# create the result field and put power_spectrum into it
result_domain = list(self.domain)
result_domain[space_index] = power_domain
if decompose_power:
result_dtype = np.complex
else:
result_dtype = np.float
result_field = self.copy_empty(
domain=result_domain,
dtype=result_dtype,
distribution_strategy=power_projection.distribution_strategy)
result_field.set_val(new_val=power_projection, copy=False)
return result_field
def _calculate_projected_power(self, x, pindex, axes=None):
fieldabs = abs(x)
fieldabs **= 2
......@@ -268,8 +406,12 @@ class Field(Loggable, Versionable, object):
target_shape=x.shape,
target_strategy=x.distribution_strategy,
axes=axes)
power_spectrum = pindex.bincount(weights=fieldabs,
projected_power = pindex.bincount(weights=fieldabs,
axis=axes)
return projected_power
def _calculate_power_spectrum(self, x, pindex, rho, axes=None):
power_spectrum = self._calculate_projected_power(x, pindex, axes)
if axes is not None:
new_rho_shape = [1, ] * len(power_spectrum.shape)
new_rho_shape[axes[0]] = len(rho)
......
from nifty.energies.energy import Energy
from nifty.library.operator_library import CriticalPowerCurvature
from nifty.library.operator_library import CriticalPowerCurvature,\
SmoothnessOperator
from nifty.sugar import generate_posterior_sample
from nifty import Field
from nifty import Field, exp
class CriticalPowerEnergy(Energy):
"""The Energy for the Gaussian lognormal case.
......@@ -21,35 +22,44 @@ class CriticalPowerEnergy(Energy):
The prior signal covariance in harmonic space.
"""
def __init__(self, position, m, D=None, alpha =1.0, q=0, w=None, samples=3):
def __init__(self, position, m, D=None, alpha =1.0, q=0, sigma=0, w=None, samples=3):
super(CriticalPowerEnergy, self).__init__(position = position)
self.m = m
self.D = D
self.samples = samples
self.sigma = sigma
self.alpha = alpha
self.q = q
self.T = SmoothnessOperator(domain=self.position.domain, sigma=self.sigma)
self.rho = self.position.domain.rho
if w is None:
self._calculate_w(self.m, D)
self.w = self._calculate_w(self.m, self.D, self.samples)
self.theta = exp(-self.position) * (self.q + w / 2.)
def at(self, position):
return self.__class__(position, self.d, self.R, self.N, self.S)
return self.__class__(position, self.m, D=self.D,
alpha =self.alpha,
q=self.q,
sigma=self.sigma, w=self.w,
samples=self.samples)
@property
def value(self):
energy = 0.5 * self.position.dot(self.S.inverse_times(self.position))
energy += 0.5 * (self.d - self.R(self.position)).dot(
self.N.inverse_times(self.d - self.R(self.position)))
energy = self.theta.sum()
energy += self.position.dot(self.alpha - 1 + self.rho / 2.)
energy += 0.5 * self.position.dot(self.T(self.position))
return energy.real
@property
def gradient(self):
gradient = self.S.inverse_times(self.position)
gradient -= self.R.derived_adjoint_times(
self.N.inverse_times(self.d - self.R(self.position)), self.position)
gradient = - self.theta
gradient += self.alpha - 1 + self.rho / 2.
gradient += self.T(self.position)
return gradient
@property
def curvature(self):
curvature =CriticalPowerCurvature(R=self.R,
N=self.N,
S=self.S,
position=self.position)
curvature = CriticalPowerCurvature(theta=self.theta, T = self.T)
return curvature
def _calculate_w(self, m, D, samples):
......@@ -57,13 +67,13 @@ class CriticalPowerEnergy(Energy):
if D is not None:
for i in range(samples):
posterior_sample = generate_posterior_sample(m, D)
projected_sample =posterior_sample.power_analyze()**2
projected_sample =posterior_sample.project_power(domain=self.position.domain)
w += projected_sample
w /= float(samples)
else:
pass
w = m.project_power(domain=self.position.domain)
return w / float(samples)
return w
from wiener_filter_curvature import WienerFilterCurvature
from nonlinear_wiener_filter_curvature import NonlinearWienerFilterCurvature
from critical_power_curvature import CriticalPowerCurvature
\ No newline at end of file
from critical_power_curvature import CriticalPowerCurvature
from laplace_operator import LaplaceOperator
from smoothness_operator import SmoothnessOperator
\ No newline at end of file
......@@ -3,15 +3,13 @@ from nifty.operators import EndomorphicOperator,\
class CriticalPowerCurvature(InvertibleOperatorMixin, EndomorphicOperator):
def __init__(self, R, N, S, position, inverter=None, preconditioner=None):
def __init__(self, theta, T, inverter=None, preconditioner=None):
self.R = R
self.N = N
self.S = S
self.position = position
self.theta = theta
self.T = T
if preconditioner is None:
preconditioner = self.S.times
self._domain = self.S.domain
preconditioner = self.T.times
self._domain = self.T.domain
super(CriticalPowerCurvature, self).__init__(inverter=inverter,
preconditioner=preconditioner)
@property
......@@ -29,7 +27,4 @@ class CriticalPowerCurvature(InvertibleOperatorMixin, EndomorphicOperator):
# ---Added properties and methods---
def _times(self, x, spaces):
return self.R.derived_adjoint_times(
self.N.inverse_times(self.R.derived_times(
x, self.position)), self.position)\
+ self.S.inverse_times(x)
return self.T(x) + self.theta * x
import numpy as np
from nifty import Field,\
EndomorphicOperator,\
PowerSpace
class LaplaceOperator(EndomorphicOperator):
def __init__(self, domain,
default_spaces=None):
super(LaplaceOperator, self).__init__(default_spaces)
if (domain is not None):
if(not isinstance(domain, PowerSpace)):
raise TypeError("The domain has to live over a PowerSpace")
self._domain = self._parse_domain(domain)
"""
input parameters:
domain- can only live over one domain
to do:
correct implementation of D20 object
"""
@property
def target(self):
return self._domain
@property
def domain(self):
return self._domain
@property
def unitary(self):
return False
@property
def symmetric(self):
return False
@property
def self_adjoint(self):
return False
def _times(self, t, spaces):
if t.val.distribution_strategy != 'not':
self.logger.warn("distribution_strategy should be 'not' but is %s"
% t.val.distribution_strategy)
array = t.val.get_full_data()
else:
array = t.val.get_local_data(copy=False)
ret = 2 * array - np.append(0, array[:-1]) - np.append(array[1:], 0)
ret[0] = 0
ret[1] = 0
ret[-1] = 0
return Field(self.domain, val=ret)
def _adjoint_times(self, t, spaces):
if t.val.distribution_strategy != 'not':
self.logger.warn("distribution_strategy should be 'not' but is %s"
% t.val.distribution_strategy)
array = t.val.get_full_data()
else:
array = t.val.get_local_data(copy=False)
ret = 2 * array - np.append(0, array[:-1]) - np.append(array[1:], 0)
ret[0] = 0
ret[1] = -array[2]
ret[2] = 2*array[2]-array[3]
ret[-1] = -array[-2]
ret[-2] = -array[-3]+2*array[-2]
return Field(self.domain, val=ret)
\ No newline at end of file
from nifty import EndomorphicOperator,\
PowerSpace
from laplace_operator import LaplaceOperator
class SmoothnessOperator(EndomorphicOperator):
def __init__(self, domain, sigma=1.,
default_spaces=None):
super(SmoothnessOperator, self).__init__(default_spaces)
if (not isinstance(domain, PowerSpace)):
raise TypeError("The domain has to live over a PowerSpace")
self._domain = self._parse_domain(domain)
if (sigma <= 0):
raise ValueError("ERROR: invalid sigma.")
self._sigma = sigma
self._Laplace = LaplaceOperator(domain=self._domain[0])
"""
SmoothnessOperator
input parameters:
domain: Domain of the field, has to be a single PowerSpace
sigma: specifying the expected roughness, has to be a positive float
"""
@property
def sigma(self):
return self._sigma
@property
def target(self):
return self._domain
@property
def domain(self):
return self._domain
@property
def unitary(self):
return False
@property
def symmetric(self):
return False
@property
def self_adjoint(self):
return False
def _times(self, t, spaces):
res = self._Laplace.adjoint_times(self._Laplace.times(t))
return (1/self.sigma)**2*res
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment