Commit a27637a8 authored by Martin Reinecke's avatar Martin Reinecke
Browse files

introduce HarmonicTransformOperator

parent 8584dcbc
Pipeline #24091 passed with stage
in 4 minutes and 44 seconds
......@@ -19,6 +19,7 @@ from .operators.linear_operator import LinearOperator
from .operators.endomorphic_operator import EndomorphicOperator
from .operators.scaling_operator import ScalingOperator
from .operators.diagonal_operator import DiagonalOperator
from .operators.harmonic_transform_operator import HarmonicTransformOperator
from .operators.fft_operator import FFTOperator
from .operators.fft_smoothing_operator import FFTSmoothingOperator
from .operators.direct_smoothing_operator import DirectSmoothingOperator
......
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Copyright(C) 2013-2017 Max-Planck-Society
#
# NIFTy is being developed at the Max-Planck-Institut fuer Astrophysik
# and financially supported by the Studienstiftung des deutschen Volkes.
from __future__ import division
from .minimizer import Minimizer
from ..field import Field
from .. import dobj
class ScipyMinimizer(Minimizer):
"""Scipy-based minimizer
Parameters
----------
controller : IterationController
Object that decides when to terminate the minimization.
"""
def __init__(self, controller, method="trust-ncg"):
super(ScipyMinimizer, self).__init__()
if not dobj.is_numpy():
raise NotImplementedError
self._controller = controller
self._method = method
def __call__(self, energy):
class _MinimizationDone:
pass
class _MinHelper(object):
def __init__(self, controller, energy):
self._controller = controller
self._energy = energy
self._domain = energy.position.domain
def _update(self, x):
pos = Field(self._domain, x.reshape(self._domain.shape))
if (pos.val != self._energy.position.val).any():
self._energy = self._energy.at(pos)
status = self._controller.check(self._energy)
if status != self._controller.CONTINUE:
raise _MinimizationDone
def fun(self, x):
self._update(x)
return self._energy.value
def jac(self, x):
self._update(x)
return self._energy.gradient.val.reshape(-1)
def hessp(self, x, p):
self._update(x)
vec = Field(self._domain, p.reshape(self._domain.shape))
res = self._energy.curvature(vec)
return res.val.reshape(-1)
import scipy.optimize as opt
status = self._controller.start(energy)
if status != self._controller.CONTINUE:
return energy, status
hlp = _MinHelper(self._controller, energy)
options = {'disp': False,
'xtol': 1e-15,
'eps': 1.4901161193847656e-08,
'return_all': False,
'maxiter': None}
options = {'disp': False,
'ftol': 1e-15,
'gtol': 1e-15,
'eps': 1.4901161193847656e-08}
try:
opt.minimize(hlp.fun, energy.position.val.reshape(-1),
method=self._method, jac=hlp.jac,
hessp=hlp.hessp,
options=options)
except _MinimizationDone:
energy = hlp._energy
status = self._controller.check(energy)
return energy, status
return hlp._energy, self._controller.ERROR
......@@ -23,42 +23,22 @@ from .linear_operator import LinearOperator
from .. import dobj
from .. import utilities
from ..field import Field
from ..spaces.gl_space import GLSpace
class FFTOperator(LinearOperator):
"""Transforms between a pair of position and harmonic domains.
Built-in domain pairs are
- a harmonic and a non-harmonic RGSpace (with matching distances)
- a HPSpace and a LMSpace
- a GLSpace and a LMSpace
Within a domain pair, both orderings are possible.
For RGSpaces, the operator provides the full set of operations.
For the sphere-related domains, it only supports the transform from
harmonic to position space and its adjoint; if the operator domain is
harmonic, this will be times() and adjoint_times(), otherwise
inverse_times() and adjoint_inverse_times()
"""Transforms between a pair of position and harmonic RGSpaces.
Parameters
----------
domain: Space or single-element tuple of Spaces
domain: Space, tuple of Spaces or DomainObject
The domain of the data that is input by "times" and output by
"adjoint_times".
target: Space
The target space of the transform operation.
If omitted, a space will be chosen automatically.
space: the index of the space on which the operator should act
If None, it is set to 0 if domain contains exactly one space
target: Space or single-element tuple of Spaces (optional)
The domain of the data that is output by "times" and input by
"adjoint_times".
If omitted, a co-domain will be chosen automatically.
Whenever "domain" is an RGSpace, the codomain (and its parameters) are
uniquely determined.
For GLSpace, HPSpace, and LMSpace, a sensible (but not unique)
co-domain is chosen that should work satisfactorily in most situations,
but for full control, the user should explicitly specify a codomain.
If None, it is set to 0 if domain contains exactly one space.
domain[space] must be an RGSpace.
"""
def __init__(self, domain, target=None, space=None):
......@@ -69,6 +49,8 @@ class FFTOperator(LinearOperator):
self._space = utilities.infer_space(self._domain, space)
adom = self._domain[self._space]
if not isinstance(adom, RGSpace):
raise TypeError("FFTOperator only works on RGSpaces")
if target is None:
target = adom.get_default_codomain()
......@@ -78,47 +60,18 @@ class FFTOperator(LinearOperator):
adom.check_codomain(target)
target.check_codomain(adom)
if isinstance(adom, RGSpace):
self._applyfunc = self._apply_cartesian
self._capability = self._all_ops
import pyfftw
pyfftw.interfaces.cache.enable()
else:
from pyHealpix import sharpjob_d
self._applyfunc = self._apply_spherical
hspc = adom if adom.harmonic else target
pspc = target if adom.harmonic else adom
self.lmax = hspc.lmax
self.mmax = hspc.mmax
self.sjob = sharpjob_d()
self.sjob.set_triangular_alm_info(self.lmax, self.mmax)
if isinstance(pspc, GLSpace):
self.sjob.set_Gauss_geometry(pspc.nlat, pspc.nlon)
else:
self.sjob.set_Healpix_geometry(pspc.nside)
if adom.harmonic:
self._capability = self.TIMES | self.ADJOINT_TIMES
else:
self._capability = (self.INVERSE_TIMES |
self.INVERSE_ADJOINT_TIMES)
import pyfftw
pyfftw.interfaces.cache.enable()
def apply(self, x, mode):
self._check_input(x, mode)
if np.issubdtype(x.dtype, np.complexfloating):
return (self._applyfunc(x.real, mode) +
1j*self._applyfunc(x.imag, mode))
return (self._apply_cartesian(x.real, mode) +
1j*self._apply_cartesian(x.imag, mode))
else:
return self._applyfunc(x, mode)
return self._apply_cartesian(x, mode)
def _apply_cartesian(self, x, mode):
"""
RG -> RG transform method.
Parameters
----------
x : Field
The field to be transformed
"""
from pyfftw.interfaces.numpy_fft import fftn
axes = x.domain.axes[self._space]
tdom = self._target if x.domain == self._domain else self._domain
......@@ -171,47 +124,6 @@ class FFTOperator(LinearOperator):
return Tval
def _slice_p2h(self, inp):
rr = self.sjob.alm2map_adjoint(inp)
assert len(rr) == ((self.mmax+1)*(self.mmax+2))//2 + \
(self.mmax+1)*(self.lmax-self.mmax)
res = np.empty(2*len(rr)-self.lmax-1, dtype=rr[0].real.dtype)
res[0:self.lmax+1] = rr[0:self.lmax+1].real
res[self.lmax+1::2] = np.sqrt(2)*rr[self.lmax+1:].real
res[self.lmax+2::2] = np.sqrt(2)*rr[self.lmax+1:].imag
return res/np.sqrt(np.pi*4)
def _slice_h2p(self, inp):
res = np.empty((len(inp)+self.lmax+1)//2, dtype=(inp[0]*1j).dtype)
assert len(res) == ((self.mmax+1)*(self.mmax+2))//2 + \
(self.mmax+1)*(self.lmax-self.mmax)
res[0:self.lmax+1] = inp[0:self.lmax+1]
res[self.lmax+1:] = np.sqrt(0.5)*(inp[self.lmax+1::2] +
1j*inp[self.lmax+2::2])
res = self.sjob.alm2map(res)
return res/np.sqrt(np.pi*4)
def _apply_spherical(self, x, mode):
axes = x.domain.axes[self._space]
axis = axes[0]
tval = x.val
if dobj.distaxis(tval) == axis:
tval = dobj.redistribute(tval, nodist=(axis,))
distaxis = dobj.distaxis(tval)
p2h = not x.domain[self._space].harmonic
tdom = self._target if x.domain == self._domain else self._domain
func = self._slice_p2h if p2h else self._slice_h2p
idat = dobj.local_data(tval)
odat = np.empty(dobj.local_shape(tdom.shape, distaxis=distaxis),
dtype=x.dtype)
for slice in utilities.get_slice_list(idat.shape, axes):
odat[slice] = func(idat[slice])
odat = dobj.from_local_data(tdom.shape, odat, distaxis)
if distaxis != dobj.distaxis(x.val):
odat = dobj.redistribute(odat, dist=dobj.distaxis(x.val))
return Field(tdom, odat)
@property
def domain(self):
return self._domain
......@@ -222,4 +134,4 @@ class FFTOperator(LinearOperator):
@property
def capability(self):
return self._capability
return self._all_ops
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Copyright(C) 2013-2017 Max-Planck-Society
#
# NIFTy is being developed at the Max-Planck-Institut fuer Astrophysik
# and financially supported by the Studienstiftung des deutschen Volkes.
import numpy as np
from ..domain_tuple import DomainTuple
from ..spaces.rg_space import RGSpace
from .linear_operator import LinearOperator
from .. import dobj
from .. import utilities
from ..field import Field
from ..spaces.gl_space import GLSpace
class HarmonicTransformOperator(LinearOperator):
"""Transforms between a harmonic domain and a position space counterpart.
Built-in domain pairs are
- a harmonic and a non-harmonic RGSpace (with matching distances)
- an LMSpace and a LMSpace
- an LMSpace and a GLSpace
The supported operations are times() and adjoint_times().
Parameters
----------
domain: Space, tuple of Spaces or DomainObject
The domain of the data that is input by "times" and output by
"adjoint_times".
target: Space (optional)
The target space of the transform operation.
If omitted, a space will be chosen automatically.
Whenever the input space of the transform is an RGSpace, the codomain
(and its parameters) are uniquely determined.
For LMSpace, a GLSpace of sufficient resolution is chosen.
space: the index of the space on which the operator should act
If None, it is set to 0 if domain contains exactly one space.
domain[space] must be a harmonic space.
"""
def __init__(self, domain, target=None, space=None):
super(HarmonicTransformOperator, self).__init__()
# Initialize domain and target
self._domain = DomainTuple.make(domain)
self._space = utilities.infer_space(self._domain, space)
hspc = self._domain[self._space]
if not hspc.harmonic:
raise TypeError(
"HarmonicTransformOperator only works on a harmonic space")
if target is None:
target = hspc.get_default_codomain()
self._target = [dom for dom in self._domain]
self._target[self._space] = target
self._target = DomainTuple.make(self._target)
hspc.check_codomain(target)
target.check_codomain(hspc)
if isinstance(hspc, RGSpace):
self._applyfunc = self._apply_cartesian
import pyfftw
pyfftw.interfaces.cache.enable()
else:
from pyHealpix import sharpjob_d
self._applyfunc = self._apply_spherical
self.lmax = hspc.lmax
self.mmax = hspc.mmax
self.sjob = sharpjob_d()
self.sjob.set_triangular_alm_info(self.lmax, self.mmax)
if isinstance(target, GLSpace):
self.sjob.set_Gauss_geometry(target.nlat, target.nlon)
else:
self.sjob.set_Healpix_geometry(target.nside)
def apply(self, x, mode):
self._check_input(x, mode)
if np.issubdtype(x.dtype, np.complexfloating):
return (self._applyfunc(x.real, mode) +
1j*self._applyfunc(x.imag, mode))
else:
return self._applyfunc(x, mode)
def _apply_cartesian(self, x, mode):
from pyfftw.interfaces.numpy_fft import fftn
axes = x.domain.axes[self._space]
tdom = self._target if x.domain == self._domain else self._domain
oldax = dobj.distaxis(x.val)
if oldax not in axes: # straightforward, no redistribution needed
ldat = dobj.local_data(x.val)
ldat = utilities.hartley(ldat, axes=axes)
tmp = dobj.from_local_data(x.val.shape, ldat, distaxis=oldax)
elif len(axes) < len(x.shape) or len(axes) == 1:
# we can use one Hartley pass in between the redistributions
tmp = dobj.redistribute(x.val, nodist=axes)
newax = dobj.distaxis(tmp)
ldat = dobj.local_data(tmp)
ldat = utilities.hartley(ldat, axes=axes)
tmp = dobj.from_local_data(tmp.shape, ldat, distaxis=newax)
tmp = dobj.redistribute(tmp, dist=oldax)
else: # two separate, full FFTs needed
# ideal strategy for the moment would be:
# - do real-to-complex FFT on all local axes
# - fill up array
# - redistribute array
# - do complex-to-complex FFT on remaining axis
# - add re+im
# - redistribute back
rem_axes = tuple(i for i in axes if i != oldax)
tmp = x.val
ldat = dobj.local_data(tmp)
ldat = utilities.my_fftn_r2c(ldat, axes=rem_axes)
if oldax != 0:
raise ValueError("bad distribution")
ldat2 = ldat.reshape((ldat.shape[0],
np.prod(ldat.shape[1:])))
shp2d = (x.val.shape[0], np.prod(x.val.shape[1:]))
tmp = dobj.from_local_data(shp2d, ldat2, distaxis=0)
tmp = dobj.transpose(tmp)
ldat2 = dobj.local_data(tmp)
ldat2 = fftn(ldat2, axes=(1,))
ldat2 = ldat2.real+ldat2.imag
tmp = dobj.from_local_data(tmp.shape, ldat2, distaxis=0)
tmp = dobj.transpose(tmp)
ldat2 = dobj.local_data(tmp).reshape(ldat.shape)
tmp = dobj.from_local_data(x.val.shape, ldat2, distaxis=0)
Tval = Field(tdom, tmp)
fct = self._domain[self._space].scalar_dvol()
if fct != 1:
Tval *= fct
return Tval
def _slice_p2h(self, inp):
rr = self.sjob.alm2map_adjoint(inp)
assert len(rr) == ((self.mmax+1)*(self.mmax+2))//2 + \
(self.mmax+1)*(self.lmax-self.mmax)
res = np.empty(2*len(rr)-self.lmax-1, dtype=rr[0].real.dtype)
res[0:self.lmax+1] = rr[0:self.lmax+1].real
res[self.lmax+1::2] = np.sqrt(2)*rr[self.lmax+1:].real
res[self.lmax+2::2] = np.sqrt(2)*rr[self.lmax+1:].imag
return res/np.sqrt(np.pi*4)
def _slice_h2p(self, inp):
res = np.empty((len(inp)+self.lmax+1)//2, dtype=(inp[0]*1j).dtype)
assert len(res) == ((self.mmax+1)*(self.mmax+2))//2 + \
(self.mmax+1)*(self.lmax-self.mmax)
res[0:self.lmax+1] = inp[0:self.lmax+1]
res[self.lmax+1:] = np.sqrt(0.5)*(inp[self.lmax+1::2] +
1j*inp[self.lmax+2::2])
res = self.sjob.alm2map(res)
return res/np.sqrt(np.pi*4)
def _apply_spherical(self, x, mode):
axes = x.domain.axes[self._space]
axis = axes[0]
tval = x.val
if dobj.distaxis(tval) == axis:
tval = dobj.redistribute(tval, nodist=(axis,))
distaxis = dobj.distaxis(tval)
p2h = not x.domain[self._space].harmonic
tdom = self._target if x.domain == self._domain else self._domain
func = self._slice_p2h if p2h else self._slice_h2p
idat = dobj.local_data(tval)
odat = np.empty(dobj.local_shape(tdom.shape, distaxis=distaxis),
dtype=x.dtype)
for slice in utilities.get_slice_list(idat.shape, axes):
odat[slice] = func(idat[slice])
odat = dobj.from_local_data(tdom.shape, odat, distaxis)
if distaxis != dobj.distaxis(x.val):
odat = dobj.redistribute(odat, dist=dobj.distaxis(x.val))
return Field(tdom, odat)
@property
def domain(self):
return self._domain
@property
def target(self):
return self._target
@property
def capability(self):
return self.TIMES | self.ADJOINT_TIMES
......@@ -39,18 +39,17 @@ def _check_adjointness(op, dtype=np.float64):
op.inverse_adjoint_times(f1).vdot(f2),
rtol=1e-8)
_harmonic_spaces = [ift.RGSpace(7, distances=0.2, harmonic=True),
ift.RGSpace((12, 46), distances=(0.2, 0.3), harmonic=True),
ift.LMSpace(17)]
_h_RG_spaces = [ift.RGSpace(7, distances=0.2, harmonic=True),
ift.RGSpace((12, 46), distances=(.2, .3), harmonic=True)]
_h_spaces = _h_RG_spaces + [ift.LMSpace(17)]
_position_spaces = [ift.RGSpace(19, distances=0.7),
ift.RGSpace((1, 2, 3, 6), distances=(0.2, 0.25, 0.34, .8)),
ift.HPSpace(17),
ift.GLSpace(8, 13)]
_p_RG_spaces = [ift.RGSpace(19, distances=0.7),
ift.RGSpace((1, 2, 3, 6), distances=(0.2, 0.25, 0.34, .8))]
_p_spaces = _p_RG_spaces + [ift.HPSpace(17), ift.GLSpace(8, 13)]
class Adjointness_Tests(unittest.TestCase):
@expand(product(_harmonic_spaces, [np.float64, np.complex128]))
@expand(product(_h_spaces, [np.float64, np.complex128]))
def testPPO(self, sp, dtype):
op = ift.PowerProjectionOperator(sp)
_check_adjointness(op, dtype)
......@@ -63,10 +62,15 @@ class Adjointness_Tests(unittest.TestCase):
op = ift.PowerProjectionOperator(sp, ps)
_check_adjointness(op, dtype)
@expand(product(_harmonic_spaces+_position_spaces,
@expand(product(_h_RG_spaces+_p_RG_spaces,
[np.float64, np.complex128]))
def testFFT(self, sp, dtype):
op = ift.FFTOperator(sp)
_check_adjointness(op, dtype)
op = ift.FFTOperator(sp.get_default_codomain())
_check_adjointness(op, dtype)
@expand(product(_h_spaces, [np.float64, np.complex128]))
def testHarmonic(self, sp, dtype):
op = ift.HarmonicTransformOperator(sp)
_check_adjointness(op, dtype)
......@@ -83,6 +83,7 @@ class FFTOperatorTests(unittest.TestCase):
assert_allclose(ift.dobj.to_global_data(inp.val),
ift.dobj.to_global_data(out.val), rtol=tol, atol=tol)
@expand(product([0, 1, 2],
[np.float64, np.float32, np.complex64, np.complex128]))
def test_composed_fft(self, index, dtype):
......@@ -97,63 +98,10 @@ class FFTOperatorTests(unittest.TestCase):
assert_allclose(ift.dobj.to_global_data(inp.val),
ift.dobj.to_global_data(out.val), rtol=tol, atol=tol)
#@expand(product([0, 3, 6, 11, 30],
# [np.float64, np.float32, np.complex64, np.complex128]))
#def test_sht(self, lm, tp):
# tol = _get_rtol(tp)
# a = ift.LMSpace(lmax=lm)
# b = ift.GLSpace(nlat=lm+1)
# fft = ift.FFTOperator(domain=a, target=b)
# inp = ift.Field.from_random(domain=a, random_type='normal',
# std=7, mean=3, dtype=tp)
# out = fft.inverse_times(fft.times(inp))
# assert_allclose(ift.dobj.to_global_data(inp.val),
# ift.dobj.to_global_data(out.val), rtol=tol, atol=tol)
#@expand(product([128, 256],
# [np.float64, np.float32, np.complex64, np.complex128]))
#def test_sht2(self, lm, tp):
# a = ift.LMSpace(lmax=lm)
# b = ift.HPSpace(nside=lm//2)
# fft = ift.FFTOperator(domain=a, target=b)
# inp = ift.Field.from_random(domain=a, random_type='normal',
# std=1, mean=0, dtype=tp)
# out = fft.inverse_times(fft.times(inp))
# assert_allclose(ift.dobj.to_global_data(inp.val),
# ift.dobj.to_global_data(out.val), rtol=1e-3, atol=1e-1)
@expand(product([128, 256],
[np.float64, np.float32, np.complex64, np.complex128]))
def test_dotsht(self, lm, tp):
tol = 10 * _get_rtol(tp)
a = ift.LMSpace(lmax=lm)
b = ift.GLSpace(nlat=lm+1)
fft = ift.FFTOperator(domain=a, target=b)
inp = ift.Field.from_random(domain=a, random_type='normal',
std=1, mean=0, dtype=tp)
out = fft.times(inp)
v1 = np.sqrt(out.vdot(out))
v2 = np.sqrt(inp.vdot(fft.adjoint_times(out)))
assert_allclose(v1, v2, rtol=tol, atol=tol)
@expand(product([128, 256],
[np.float64, np.float32, np.complex64, np.complex128]))
def test_dotsht2(self, lm, tp):
tol = 10 * _get_rtol(tp)
a = ift.LMSpace(lmax=lm)
b = ift.HPSpace(nside=lm//2)
fft = ift.FFTOperator(domain=a, target=b)
inp = ift.Field.from_random(domain=a, random_type='normal',
std=1, mean=0, dtype=tp)
out = fft.times(inp)
v1 = np.sqrt(out.vdot(out))
v2 = np.sqrt(inp.vdot(fft.adjoint_times(out)))
assert_allclose(v1, v2, rtol=tol, atol=tol)
@expand(product([ift.RGSpace(128, distances=3.76, harmonic=True),
ift.RGSpace((15,27), distances=(1.7,3.33), harmonic=True),
ift.RGSpace(73, distances=0.5643),
ift.LMSpace(lmax=30, mmax=25)],
ift.RGSpace(73, distances=0.5643)],
[np.float64, np.float32, np.complex64, np.complex128]))
def test_normalisation(self, space, tp):
tol = 10 * _get_rtol(tp)
......
# This program is free software: you can redistribute it and/or modify