-
Martin Reinecke authoredMartin Reinecke authored
power_space.py 11.40 KiB
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Copyright(C) 2013-2017 Max-Planck-Society
#
# NIFTy is being developed at the Max-Planck-Institut fuer Astrophysik
# and financially supported by the Studienstiftung des deutschen Volkes.
#from builtins import str
import ast
import numpy as np
from d2o import distributed_data_object
from ..space import Space
from functools import reduce
class PowerSpace(Space):
""" NIFTY class for spaces of power spectra.
Parameters
----------
harmonic_partner : Space
The harmonic Space of which this is the power space.
distribution_strategy : str *optional*
The distribution strategy used for the distributed_data_objects
derived from this PowerSpace, e.g. the pindex.
(default : 'not')
logarithmic : bool *optional*
True if logarithmic binning should be used (default : None).
nbin : {int, None} *optional*
The number of bins that should be used for power spectrum binning
(default : None).
if nbin == None, then nbin is set to the length of kindex.
binbounds : {list, array-like} *optional*
Boundaries between the power spectrum bins.
(If binbounds has n entries, there will be n+1 bins, the first bin
starting at -inf, the last bin ending at +inf.)
(default : None)
if binbounds == None:
Calculates the bounds from the kindex while applying the
logarithmic and nbin keywords.
Note: if "bindounds" is not None, both "logarithmic" and "nbin" must be
None!
Note: if "binbounds", "logarithmic", and "nbin" are all None, then
"natural" binning is performed, i.e. there will be one bin for every
distinct k-vector length.
Attributes
----------
pindex : distributed_data_object
This holds the information which pixel of the harmonic partner gets
mapped to which power bin
kindex : numpy.ndarray
Sorted array of all k-modes.
rho : numpy.ndarray
The amount of k-modes that get mapped to one power bin is given by
rho.
dim : np.int
Total number of dimensionality, i.e. the number of pixels.
harmonic : bool
Specifies whether the space is a signal or harmonic space.
total_volume : np.float
The total volume of the space.
shape : tuple of np.ints
The shape of the space's data array.
binbounds : tuple or None
Boundaries between the power spectrum bins
Notes
-----
A power space is the result of a projection of a harmonic space where
k-modes of equal length get mapped to one power index.
"""
_powerIndexCache = {}
# ---Overwritten properties and methods---
def __init__(self, harmonic_partner, distribution_strategy='not',
logarithmic=None, nbin=None, binbounds=None):
super(PowerSpace, self).__init__()
self._ignore_for_hash += ['_pindex', '_kindex', '_rho']
if not (isinstance(harmonic_partner, Space) and
harmonic_partner.harmonic):
raise ValueError("harmonic_partner must be a harmonic space.")
self._harmonic_partner = harmonic_partner
# sanity check
if binbounds is not None and not(nbin is None and logarithmic is None):
raise ValueError(
"if binbounds is defined, nbin and logarithmic must be None")
if binbounds is not None:
binbounds = tuple(binbounds)
key = (harmonic_partner, distribution_strategy, logarithmic, nbin,
binbounds)
if self._powerIndexCache.get(key) is None:
distance_array = \
self.harmonic_partner.get_distance_array(distribution_strategy)
temp_binbounds = self._compute_binbounds(
harmonic_partner=self.harmonic_partner,
distribution_strategy=distribution_strategy,
logarithmic=logarithmic,
nbin=nbin,
binbounds=binbounds)
temp_pindex = self._compute_pindex(
harmonic_partner=self.harmonic_partner,
distance_array=distance_array,
binbounds=temp_binbounds,
distribution_strategy=distribution_strategy)
temp_rho = temp_pindex.bincount().get_full_data()
temp_kindex = \
(temp_pindex.bincount(weights=distance_array).get_full_data() /
temp_rho)
self._powerIndexCache[key] = (temp_binbounds,
temp_pindex,
temp_kindex,
temp_rho)
(self._binbounds, self._pindex, self._kindex, self._rho) = \
self._powerIndexCache[key]
@staticmethod
def _compute_binbounds(harmonic_partner, distribution_strategy,
logarithmic, nbin, binbounds):
if logarithmic is None and nbin is None and binbounds is None:
result = None
else:
if binbounds is not None:
bb = np.sort(np.array(binbounds))
else:
if logarithmic is not None:
logarithmic = bool(logarithmic)
if nbin is not None:
nbin = int(nbin)
# equidistant binning (linear or log)
# MR FIXME: this needs to improve
kindex = harmonic_partner.get_unique_distances()
if (logarithmic):
k = np.r_[0, np.log(kindex[1:])]
else:
k = kindex
dk = np.max(k[2:] - k[1:-1]) # minimum dk to avoid empty bins
if(nbin is None):
nbin = int((k[-1] - 0.5 * (k[2] + k[1])) /
dk - 0.5) # maximal nbin
else:
nbin = min(int(nbin), int(
(k[-1] - 0.5 * (k[2] + k[1])) / dk + 2.5))
dk = (k[-1] - 0.5 * (k[2] + k[1])) / (nbin - 2.5)
bb = np.r_[0.5 * (3 * k[1] - k[2]),
0.5 * (k[1] + k[2]) + dk * np.arange(nbin-2)]
if(logarithmic):
bb = np.exp(bb)
result = tuple(bb)
return result
@staticmethod
def _compute_pindex(harmonic_partner, distance_array, binbounds,
distribution_strategy):
# Compute pindex, kindex and rho according to bb
pindex = distributed_data_object(
global_shape=distance_array.shape,
dtype=np.int,
distribution_strategy=distribution_strategy)
if binbounds is None:
binbounds = harmonic_partner.get_natural_binbounds()
pindex.set_local_data(
np.searchsorted(binbounds, distance_array.get_local_data()))
return pindex
def pre_cast(self, x, axes):
""" Casts power spectrum functions to discretized power spectra.
This function takes an array or a function. If it is an array it does
nothing, otherwise it interpretes the function as power spectrum and
evaluates it at every k-mode.
Parameters
----------
x : {array-like, function array-like -> array-like}
power spectrum given either in discretized form or implicitly as a
function
axes : tuple of ints
Specifies the axes of x which correspond to this space. For
explicifying the power spectrum function, this is ignored.
Returns
-------
array-like
discretized power spectrum
"""
return x(self.kindex) if callable(x) else x
# ---Mandatory properties and methods---
def __repr__(self):
return ("PowerSpace(harmonic_partner=%r, distribution_strategy=%r, "
"binbounds=%r)"
% (self.harmonic_partner, self.pindex.distribution_strategy,
self._binbounds))
@property
def harmonic(self):
return True
@property
def shape(self):
return self.kindex.shape
@property
def dim(self):
return self.shape[0]
@property
def total_volume(self):
# every power-pixel has a volume of 1
return float(reduce(lambda x, y: x*y, self.pindex.shape))
def copy(self):
distribution_strategy = self.pindex.distribution_strategy
return self.__class__(harmonic_partner=self.harmonic_partner,
distribution_strategy=distribution_strategy,
binbounds=self._binbounds)
def weight(self, x, power, axes, inplace=False):
reshaper = [1, ] * len(x.shape)
# we know len(axes) is always 1
reshaper[axes[0]] = self.shape[0]
weight = self.rho.reshape(reshaper)
if power != 1:
weight = weight ** np.float(power)
if inplace:
x *= weight
result_x = x
else:
result_x = x*weight
return result_x
def get_distance_array(self, distribution_strategy):
return distributed_data_object(
self.kindex, dtype=np.float64,
distribution_strategy=distribution_strategy)
def get_fft_smoothing_kernel_function(self, sigma):
raise NotImplementedError(
"There is no fft smoothing function for PowerSpace.")
# ---Added properties and methods---
@property
def harmonic_partner(self):
""" Returns the Space of which this is the power space.
"""
return self._harmonic_partner
@property
def binbounds(self):
return self._binbounds
@property
def pindex(self):
""" A distributed_data_object having the shape of the harmonic partner
space containing the indices of the power bin a pixel belongs to.
"""
return self._pindex
@property
def kindex(self):
""" Sorted array of all k-modes.
"""
return self._kindex
@property
def rho(self):
"""Degeneracy factor of the individual k-vectors.
"""
return self._rho
# ---Serialization---
def _to_hdf5(self, hdf5_group):
hdf5_group.attrs['binbounds'] = str(self._binbounds)
hdf5_group.attrs['distribution_strategy'] = \
self._pindex.distribution_strategy
return {
'harmonic_partner': self.harmonic_partner,
}
@classmethod
def _from_hdf5(cls, hdf5_group, repository):
hp = repository.get('harmonic_partner', hdf5_group)
bb = ast.literal_eval(hdf5_group.attrs['binbounds'])
ds = hdf5_group.attrs['distribution_strategy']
return PowerSpace(hp, ds, binbounds=bb)