Commit a65cc4fa authored by Martin Reinecke's avatar Martin Reinecke

allow also RGSpace convolution

parent 6fcd6fb6
Pipeline #44236 passed with stages
in 8 minutes and 46 seconds
import numpy as np
import nifty5 as ift
def convtest(test_signal, delta, func):
domain = test_signal.domain
codom = domain[0].get_default_codomain()
# Create Convolution Operator
conv_op = ift.FuncConvolutionOperator(domain, func)
# Convolve, Adjoint-Convolve
conv_signal = conv_op(signal)
cac_signal = conv_op.adjoint_times(conv_signal)
print(signal.integrate(), conv_signal.integrate(), cac_signal.integrate())
# generate kernel image
conv_delta = conv_op(delta)
# Plot results
plot = ift.Plot()
plot.add(signal, title='Signal')
plot.add(conv_signal, title='Signal Convolved')
plot.add(cac_signal, title='Signal, Conv, Adj-Conv')
plot.add(conv_delta, title='Kernel')
plot.output()
# Healpix test
nside = 64
npix = 12 * nside * nside
domain = ift.HPSpace(nside)
# Define test signal (some point sources)
signal_vals = np.zeros(npix, dtype=np.float64)
for i in range(0, npix, npix//12 + 27):
signal_vals[i] = 1.
signal = ift.from_global_data(domain, signal_vals)
delta_vals = np.zeros(npix, dtype=np.float64)
delta_vals[0] = 1.0
delta = ift.from_global_data(domain, delta_vals)
# Define kernel function
def func(theta):
ct = np.cos(theta)
return 1. * np.logical_and(ct > 0.7, ct <= 0.8)
convtest(signal, delta, func)
domain = ift.RGSpace((100, 100))
# Define test signal (some point sources)
signal_vals = np.zeros(domain.shape, dtype=np.float64)
signal_vals[35, 70] = 1
signal_vals[45, 8] = 1
signal = ift.from_global_data(domain, signal_vals)
# Define delta signal, generate kernel image
delta_vals = np.zeros(domain.shape, dtype=np.float64)
delta_vals[0, 0] = 1.0
delta = ift.from_global_data(domain, delta_vals)
# Define kernel function
def func(dist):
return 1. * np.logical_and(dist > 0.1, dist <= 0.2)
convtest(signal, delta, func)
import numpy as np
import nifty5 as ift
# Define domains
nside = 64
npix = 12 * nside * nside
domain = ift.HPSpace(nside)
dom_tuple = ift.DomainTuple.make(domain)
codom = domain.get_default_codomain()
# Define test signal (some point sources)
signal_vals = np.zeros(npix, dtype=np.float64)
for i in range(0, npix, npix//12 + 27):
signal_vals[i] = 1.
signal = ift.from_global_data(dom_tuple, signal_vals)
# Define kernel function
def func(theta):
ct = np.cos(theta)
return 1. * np.logical_and(ct > 0.7, ct <= 0.8)
# Create Convolution Operator
conv_op = ift.SphericalFuncConvolutionOperator(dom_tuple, func)
# Convolve, Adjoint-Convolve
conv_signal = conv_op(signal)
cac_signal = conv_op.adjoint_times(conv_signal)
print(signal.integrate(), conv_signal.integrate(), cac_signal.integrate())
# Define delta signal, generate kernel image
delta_vals = np.zeros(npix, dtype=np.float64)
delta_vals[0] = 1.0
delta = ift.from_global_data(domain, delta_vals)
conv_delta = conv_op(delta)
# Plot results
plot = ift.Plot()
plot.add(signal, title='Signal')
plot.add(conv_signal, title='Signal Convolved')
plot.add(cac_signal, title='Signal, Conv, Adj-Conv')
plot.add(conv_delta, title='Kernel')
plot.output()
......@@ -50,7 +50,7 @@ from .operators.value_inserter import ValueInserter
from .operators.energy_operators import (
EnergyOperator, GaussianEnergy, PoissonianEnergy, InverseGammaLikelihood,
BernoulliEnergy, StandardHamiltonian, AveragedEnergy)
from .operators.convolution_operators import SphericalFuncConvolutionOperator
from .operators.convolution_operators import FuncConvolutionOperator
from .probing import probe_with_posterior_samples, probe_diagonal, \
StatCalculator
......
......@@ -90,9 +90,7 @@ class RGSpace(StructuredDomain):
def scalar_dvol(self):
return self._dvol
def get_k_length_array(self):
if (not self.harmonic):
raise NotImplementedError
def _get_dist_array(self):
ibegin = dobj.ibegin_from_shape(self._shape)
res = np.arange(self.local_shape[0], dtype=np.float64) + ibegin[0]
res = np.minimum(res, self.shape[0]-res)*self.distances[0]
......@@ -106,6 +104,11 @@ class RGSpace(StructuredDomain):
res = np.add.outer(res, tmp)
return Field.from_local_data(self, np.sqrt(res))
def get_k_length_array(self):
if (not self.harmonic):
raise NotImplementedError
return self._get_dist_array()
def get_unique_k_lengths(self):
if (not self.harmonic):
raise NotImplementedError
......@@ -146,6 +149,27 @@ class RGSpace(StructuredDomain):
raise NotImplementedError
return lambda x: self._kernel(x, sigma)
def get_conv_kernel_from_func(self, func):
"""Creates a convolution kernel defined by a function.
Parameters
----------
func: function
This function needs to take exactly one argument, which is
distance from center (in the same units as the RGSpace distances),
and return the kernel amplitude at that distance.
Assumes the function to be radially symmetric,
e.g. only dependant on distance"""
from ..operators.harmonic_operators import HarmonicTransformOperator
if (not self.harmonic):
raise NotImplementedError
op = HarmonicTransformOperator(self, self.get_default_codomain())
dist = op.target[0]._get_dist_array()
kernel = Field.from_local_data(op.target, func(dist.local_data))
kernel = kernel / kernel.integrate()
return op.adjoint_times(kernel.weight(1))
def get_default_codomain(self):
"""Returns a :class:`RGSpace` object representing the (position or
harmonic) partner domain of `self`, depending on `self.harmonic`.
......
......@@ -17,56 +17,57 @@
import numpy as np
from ..domains.rg_space import RGSpace
from ..domains.lm_space import LMSpace
from ..domains.hp_space import HPSpace
from ..domains.gl_space import GLSpace
from .endomorphic_operator import EndomorphicOperator
from .harmonic_operators import HarmonicTransformOperator
from .diagonal_operator import DiagonalOperator
from .simple_linear_operators import WeightApplier
from ..domain_tuple import DomainTuple
from ..field import Field
from .. import utilities
def SphericalFuncConvolutionOperator(domain, func):
def FuncConvolutionOperator(domain, func, space=None):
"""Convolves input with a radially symmetric kernel defined by `func`
Parameters
----------
domain: DomainTuple
Domain of the operator. Must have exactly one entry, which is
of type `HPSpace` or `GLSpace`.
Domain of the operator.
func: function
This function needs to take exactly one argument, which is
colatitude in radians, and return the kernel amplitude at that
colatitude.
space: int, optional
The index of the subdomain on which the operator should act
If None, it is set to 0 if `domain` contains exactly one space.
`domain[space]` must be of type `RGSpace`, `HPSpace`, or `GLSpace`.
"""
if len(domain) != 1:
raise ValueError("need exactly one domain")
if not isinstance(domain[0], (HPSpace, GLSpace)):
raise TypeError("need a spherical domain")
kernel = domain[0].get_default_codomain().get_conv_kernel_from_func(func)
return _SphericalConvolutionOperator(domain, kernel)
domain = DomainTuple.make(domain)
space = utilities.infer_space(domain, space)
if not isinstance(domain[space], (RGSpace, HPSpace, GLSpace)):
raise TypeError("unsupported domain")
codomain = domain[space].get_default_codomain()
kernel = codomain.get_conv_kernel_from_func(func)
return _ConvolutionOperator(domain, kernel, space)
class _SphericalConvolutionOperator(EndomorphicOperator):
"""Convolves with kernel living on the appropriate LMSpace"""
def __init__(self, domain, kernel):
if len(domain) != 1:
raise ValueError("need exactly one domain")
if len(kernel.domain) != 1:
raise ValueError("kernel needs exactly one domain")
if not isinstance(domain[0], (HPSpace, GLSpace)):
raise TypeError("need a spherical domain")
self._domain = domain
self.lm = domain[0].get_default_codomain()
if self.lm != kernel.domain[0]:
raise ValueError("Input domain and kernel are incompatible")
self.kernel = kernel
self.HT = HarmonicTransformOperator(self.lm, domain[0])
self._capability = self.TIMES | self.ADJOINT_TIMES
def apply(self, x, mode):
self._check_input(x, mode)
x_lm = self.HT.adjoint_times(x.weight(1))
x_lm = x_lm * self.kernel * (4. * np.pi)
return self.HT(x_lm)
def _ConvolutionOperator(domain, kernel, space=None):
domain = DomainTuple.make(domain)
space = utilities.infer_space(domain, space)
if len(kernel.domain) != 1:
raise ValueError("kernel needs exactly one domain")
if not isinstance(domain[space], (HPSpace, GLSpace, RGSpace)):
raise TypeError("need RGSpace, HPSpace, or GLSpace")
lm = [d for d in domain]
lm[space] = lm[space].get_default_codomain()
lm = DomainTuple.make(lm)
if lm[space] != kernel.domain[0]:
raise ValueError("Input domain and kernel are incompatible")
HT = HarmonicTransformOperator(lm, domain[space], space)
diag = DiagonalOperator(kernel*domain[space].total_volume, lm, (space,))
wgt = WeightApplier(domain, space, 1)
return HT(diag(HT.adjoint(wgt)))
......@@ -63,6 +63,35 @@ class ConjugationOperator(EndomorphicOperator):
return x.conjugate()
class WeightApplier(EndomorphicOperator):
"""Operator multiplying its input by a given power of dvol.
Parameters
----------
domain: Domain, tuple of domains or DomainTuple
domain of the input field
spaces: list or tuple of int
indices of subdomains for which the weights shall be applied
power: int
the power of to be used for the volume factors
"""
def __init__(self, domain, spaces, power):
from .. import utilities
self._domain = DomainTuple.make(domain)
if spaces is None:
self._spaces = None
else:
self._spaces = utilities.parse_spaces(spaces, len(self._domain))
self._power = int(power)
self._capability = self._all_ops
def apply(self, x, mode):
self._check_input(x, mode)
power = self._power if (mode & 3) else -self._power
return x.weight(power, spaces=self._spaces)
class Realizer(EndomorphicOperator):
"""Operator returning the real component of its input.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment