Commit 0fadc5fd authored by Lukas Platz's avatar Lukas Platz

LMSpace: get_conv_kernel_from_func, SphericalFuncConvolutionOperator

parent 1bb2c55c
Pipeline #43890 passed with stages
in 8 minutes and 7 seconds
import numpy as np
import nifty5 as ift
# Define domains
nside = 64
npix = 12 * nside * nside
domain = ift.HPSpace(nside)
dom_tuple = ift.DomainTuple.make(domain)
codom = domain.get_default_codomain()
# Define test signal (some point sources)
signal_vals = np.zeros(npix, dtype=np.float64)
for i in range(0, npix, npix//12 + 27):
signal_vals[i] = 1.
signal = ift.from_global_data(dom_tuple, signal_vals)
# Define kernel function
def func(theta):
ct = np.cos(theta)
return 1. * np.logical_and(ct > 0.7, ct <= 0.8)
# Create Convolution Operator
conv_op = ift.SphericalFuncConvolutionOperator(dom_tuple, func)
# Convolve, Adjoint-Convolve
conv_signal = conv_op(signal)
cac_signal = conv_op.adjoint_times(conv_signal)
print(signal.integrate(), conv_signal.integrate(), cac_signal.integrate())
# Define delta signal, generate kernel image
delta_vals = np.zeros(npix, dtype=np.float64)
delta_vals[0] = 1.0
delta = ift.from_global_data(domain, delta_vals)
conv_delta = conv_op(delta)
# Plot results
plot = ift.Plot()
plot.add(signal, title='Signal')
plot.add(conv_signal, title='Signal Convolved')
plot.add(cac_signal, title='Signal, Conv, Adj-Conv')
plot.add(conv_delta, title='Kernel')
plot.output()
......@@ -50,6 +50,8 @@ from .operators.value_inserter import ValueInserter
from .operators.energy_operators import (
EnergyOperator, GaussianEnergy, PoissonianEnergy, InverseGammaLikelihood,
BernoulliEnergy, StandardHamiltonian, AveragedEnergy)
from .operators.convolution_operators import (
SphericalConvolutionOperator, SphericalFuncConvolutionOperator)
from .probing import probe_with_posterior_samples, probe_diagonal, \
StatCalculator
......
......@@ -103,6 +103,29 @@ class LMSpace(StructuredDomain):
def get_fft_smoothing_kernel_function(self, sigma):
return lambda x: self._kernel(x, sigma)
def get_conv_kernel_from_func(self, func):
"""Creates a convolution kernel defined by a function.
Assumes the function to be radially symmetric,
e.g. only dependant on theta in radians"""
from .gl_space import GLSpace
from ..operators.harmonic_operators import HarmonicTransformOperator
import pyHealpix
# define azimuthally symmetric spaces for kernel transform
gl = GLSpace(self.lmax + 1, 1)
lm0 = gl.get_default_codomain()
theta = pyHealpix.GL_thetas(gl.nlat)
# evaluate the kernel function at the required thetas
kernel_sphere = Field.from_global_data(gl, func(theta))
# normalize the kernel such that the integral over the sphere is 4pi
kernel_sphere = kernel_sphere * (4 * np.pi / kernel_sphere.integrate())
# compute the spherical harmonic coefficients of the kernel
op = HarmonicTransformOperator(lm0, gl)
kernel_lm = op.adjoint_times(kernel_sphere.weight(1)).to_global_data()
# evaluate the k lengths of the harmonic space
k_lengths = self.get_k_length_array().to_global_data().astype(np.int)
return Field.from_global_data(self, kernel_lm[k_lengths])
@property
def lmax(self):
"""int : maximum allowed :math:`l`
......@@ -147,40 +170,3 @@ class LMSpace(StructuredDomain):
from ..domains.hp_space import HPSpace
if not isinstance(codomain, (GLSpace, HPSpace)):
raise TypeError("codomain must be a GLSpace or HPSpace.")
def apply_spherical_convolution(inp, func):
"""Convolves `inp` with a kernel defined by `func`
which is assumed to be radially symmetric around theta==0."""
import pyHealpix
from .gl_space import GLSpace
from ..operators.harmonic_operators import HarmonicTransformOperator
if len(inp.domain) != 1:
raise ValueError("need exactly one domain")
sph = inp.domain[0]
# define an appropriate harmonic partner space
lm = sph.get_default_codomain()
if not isinstance(lm, LMSpace):
raise TypeError("need a spherical domain")
# define azimuthally symmetric spaces for kernel transform
gl = GLSpace(lm.lmax + 1, 1)
lm0 = gl.get_default_codomain()
theta = pyHealpix.GL_thetas(gl.nlat)
# evaluate the kernel function at the required thetas
kernel_sphere = Field.from_global_data(gl, func(theta))
# normalize the kernel such that the integral over the sphere is 4pi
kernel_sphere = kernel_sphere * (4 * np.pi / kernel_sphere.integrate())
# compute the spherical harmonic coefficients of the kernel
op = HarmonicTransformOperator(lm0, gl)
kernel_lm = op.adjoint_times(kernel_sphere.weight(1)).to_global_data()
# evaluate the k lengths of the harmonic space
k_lengths = lm.get_k_length_array().to_global_data().astype(np.int)
op = HarmonicTransformOperator(lm, sph)
# "inverse" transform to harmonic space
inp_lm = op.adjoint_times(inp.weight(1)).to_global_data()
# multiply the kernel to the coefficients and adjust normalization
inp_lm = inp_lm * kernel_lm[k_lengths] * 4 * np.pi
out = Field.from_global_data(lm, inp_lm)
# back to the original space
return op(out)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Copyright(C) 2013-2019 Max-Planck-Society
#
# NIFTy is being developed at the Max-Planck-Institut fuer Astrophysik.
import numpy as np
from ..domains.lm_space import LMSpace
from ..domains.hp_space import HPSpace
from ..domains.gl_space import GLSpace
from .endomorphic_operator import EndomorphicOperator
from .harmonic_operators import HarmonicTransformOperator
from ..domain_tuple import DomainTuple
from ..field import Field
class SphericalFuncConvolutionOperator(EndomorphicOperator):
"""Convolves input with a radially symmetric kernel defined by `func`
Parameters
----------
domain: domain of the operator
func: function defining the sperical convolution kernel
dependant only on theta in radians
"""
def __init__(self, domain, func):
if len(domain) != 1:
raise ValueError("need exactly one domain")
if not isinstance(domain[0], (HPSpace, GLSpace)):
raise TypeError("need a spherical domain")
self._domain = domain
self.lm = domain[0].get_default_codomain()
self.kernel = self.lm.get_conv_kernel_from_func(func)
self.HT = HarmonicTransformOperator(self.lm, domain[0])
self._capability = self.TIMES | self.ADJOINT_TIMES
def apply(self, x, mode):
self._check_input(x, mode)
x_lm = self.HT.adjoint_times(x.weight(1))
x_lm = x_lm * self.kernel * (4. * np.pi)
return self.HT(x_lm)
class SphericalConvolutionOperator(EndomorphicOperator):
"""Convolves with kernel living on the appropriate LMSpace"""
def __init__(self, domain, kernel):
if len(domain) != 1:
raise ValueError("need exactly one domain")
if len(kernel.domain) != 1:
raise ValueError("kernel needs exactly one domain")
if not isinstance(domain[0], (HPSpace, GLSpace)):
raise TypeError("need a spherical domain")
self._domain = domain
self.lm = domain[0].get_default_codomain()
if self.lm != kernel.domain[0]:
raise ValueError("Input domain and kernel are incompatible")
self.kernel = kernel
self.HT = HarmonicTransformOperator(self.lm, domain[0])
self._capability = self.TIMES | self.ADJOINT_TIMES
def apply(self, x, mode):
self._check_input(x, mode)
x_lm = self.HT.adjoint_times(x.weight(1))
x_lm = x_lm * self.kernel * (4. * np.pi)
return self.HT(x_lm)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment