# NIFTY (Numerical Information Field Theory) has been developed at the # Max-Planck-Institute for Astrophysics. # # Copyright (C) 2015 Max-Planck-Society # # Author: Marco Selig # Project homepage: # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. # See the GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . from __future__ import division import numpy as np from nifty.keepers import about from nifty.nifty_core import space, \ point_space, \ field from nifty_minimization import conjugate_gradient from nifty_probing import trace_prober,\ inverse_trace_prober,\ diagonal_prober,\ inverse_diagonal_prober import nifty.nifty_utilities as utilities import nifty.nifty_simple_math as nifty_simple_math # ============================================================================= class operator(object): """ .. __ .. / /_ .. ______ ______ _______ _____ ____ __ / _/ ______ _____ .. / _ | / _ | / __ / / __/ / _ / / / / _ | / __/ .. / /_/ / / /_/ / / /____/ / / / /_/ / / /_ / /_/ / / / .. \______/ / ____/ \______/ /__/ \______| \___/ \______/ /__/ class .. /__/ NIFTY base class for (linear) operators The base NIFTY operator class is an abstract class from which other specific operator subclasses, including those preimplemented in NIFTY (e.g. the diagonal operator class) must be derived. Parameters ---------- domain : space The space wherein valid arguments live. sym : bool, *optional* Indicates whether the operator is self-adjoint or not (default: False) uni : bool, *optional* Indicates whether the operator is unitary or not (default: False) imp : bool, *optional* Indicates whether the incorporation of volume weights in multiplications is already implemented in the `multiply` instance methods or not (default: False) target : space, *optional* The space wherein the operator output lives (default: domain) para : {single object, list of objects}, *optional* This is a freeform list of parameters that derivatives of the operator class can use. Not used in the base operators. (default: None) See Also -------- diagonal_operator : An operator class for handling purely diagonal operators. power_operator : Similar to diagonal_operator but with handy features for dealing with diagonal operators whose diagonal consists of a power spectrum. vecvec_operator : Operators constructed from the outer product of two fields response_operator : Implements a modeled instrument response which translates a signal into data space. projection_operator : An operator that projects out one or more components in a basis, e.g. a spectral band of Fourier components. Attributes ---------- domain : space The space wherein valid arguments live. sym : bool Indicates whether the operator is self-adjoint or not uni : bool Indicates whether the operator is unitary or not imp : bool Indicates whether the incorporation of volume weights in multiplications is already implemented in the `multiply` instance methods or not target : space The space wherein the operator output lives para : {single object, list of objects} This is a freeform list of parameters that derivatives of the operator class can use. Not used in the base operators. """ def __init__(self, domain, codomain=None, sym=False, uni=False, imp=False, target=None, cotarget=None, bare=False): """ Sets the attributes for an operator class instance. Parameters ---------- domain : space The space wherein valid arguments live. sym : bool, *optional* Indicates whether the operator is self-adjoint or not (default: False) uni : bool, *optional* Indicates whether the operator is unitary or not (default: False) imp : bool, *optional* Indicates whether the incorporation of volume weights in multiplications is already implemented in the `multiply` instance methods or not (default: False) target : space, *optional* The space wherein the operator output lives (default: domain) para : {object, list of objects}, *optional* This is a freeform list of parameters that derivatives of the operator class can use. Not used in the base operators. (default: None) Returns ------- None """ # Check if the domain is realy a space if not isinstance(domain, space): raise TypeError(about._errors.cstring( "ERROR: invalid input. domain is not a space.")) self.domain = domain # Parse codomain if self.domain.check_codomain(codomain) == True: self.codomain = codomain else: self.codomain = self.domain.get_codomain() # Cast the symmetric and unitary input self.sym = bool(sym) self.uni = bool(uni) self.bare = bool(bare) # If no target is supplied, we assume that the operator is square # If the operator is symmetric or unitary, we know that the operator # must be square if self.sym or self.uni: target = self.domain cotarget = self.codomain if target is not None: about.warnings.cprint("WARNING: Ignoring target.") elif target is None: target = self.domain cotarget = self.codomain elif isinstance(target, space): self.target = target # Parse cotarget if self.target.check_codomain(cotarget) == True: self.codomain = codomain else: self.codomain = self.domain.get_codomain() else: raise TypeError(about._errors.cstring( "ERROR: invalid input. Target is not a space.")) if self.domain.discrete and self.target.discrete: self.imp = True else: self.imp = bool(imp) def set_val(self, new_val): """ Resets the field values. Parameters ---------- new_val : {scalar, ndarray} New field values either as a constant or an arbitrary array. """ self.val = new_val return self.val def get_val(self): return self.val def _multiply(self, x, **kwargs): # > applies the operator to a given field raise NotImplementedError(about._errors.cstring( "ERROR: no generic instance method 'multiply'.")) def _adjoint_multiply(self, x, **kwargs): # > applies the adjoint operator to a given field raise NotImplementedError(about._errors.cstring( "ERROR: no generic instance method 'adjoint_multiply'.")) def _inverse_multiply(self, x, **kwargs): # > applies the inverse operator to a given field raise NotImplementedError(about._errors.cstring( "ERROR: no generic instance method 'inverse_multiply'.")) def _adjoint_inverse_multiply(self, x, **kwargs): # > applies the inverse adjoint operator to a given field raise NotImplementedError(about._errors.cstring( "ERROR: no generic instance method 'adjoint_inverse_multiply'.")) def _inverse_adjoint_multiply(self, x, **kwargs): # > applies the adjoint inverse operator to a given field raise NotImplementedError(about._errors.cstring( "ERROR: no generic instance method 'inverse_adjoint_multiply'.")) def _briefing(self, x, domain, codomain, inverse): # make sure, that the result_field of the briefing lives in the # given domain and codomain result_field = field(domain=domain, val=x, codomain=codomain, copy=False) # weight if necessary if (not self.imp) and (not domain.discrete) and (not inverse): result_field = result_field.weight(power=1) return result_field def _debriefing(self, x, y, target, cotarget, inverse): # The debriefing takes care that the result field lives in the same # fourier-type domain as the input field assert(isinstance(y, field)) # weight if necessary if (not self.imp) and (not target.discrete) and inverse: y = y.weight(power=-1) # if the operators domain as well as the target have the harmonic # attribute, try to match the result_field to the input_field if hasattr(self.domain, 'harmonic') and \ hasattr(self.target, 'harmonic'): if x.domain.harmonic != y.domain.harmonic: y = y.transform() return y def times(self, x, **kwargs): """ Applies the operator to a given object Parameters ---------- x : {scalar, ndarray, field} Scalars are interpreted as constant arrays, and an array will be interpreted as a field on the domain of the operator. Returns ------- Ox : field Mapped field on the target domain of the operator. """ # prepare y = self._briefing(x, self.domain, self.codomain, inverse=False) # apply operator y = self._multiply(y, **kwargs) # evaluate return self._debriefing(x, y, self.target, self.cotarget, inverse=False) def __call__(self, x, **kwargs): return self.times(x, **kwargs) def adjoint_times(self, x, **kwargs): """ Applies the adjoint operator to a given object. Parameters ---------- x : {scalar, ndarray, field} Scalars are interpreted as constant arrays, and an array will be interpreted as a field on the target space of the operator. Returns ------- OAx : field Mapped field on the domain of the operator. """ # check whether self-adjoint if self.sym: return self.times(x, **kwargs) # check whether unitary if self.uni: return self.inverse_times(x, **kwargs) # prepare y = self._briefing(x, self.target, self.cotarget, inverse=False) # apply operator y = self._adjoint_multiply(y, **kwargs) # evaluate return self._debriefing(x, y, self.domain, self.codomain, inverse=False) def inverse_times(self, x, **kwargs): """ Applies the inverse operator to a given object. Parameters ---------- x : {scalar, ndarray, field} Scalars are interpreted as constant arrays, and an array will be interpreted as a field on the domain space of the operator. Returns ------- OIx : field Mapped field on the target space of the operator. """ # check whether self-inverse if self.sym and self.uni: return self.times(x, **kwargs) # prepare y = self._briefing(x, self.target, self.cotarget, inverse=True) # apply operator y = self._inverse_multiply(y, **kwargs) # evaluate return self._debriefing(x, y, self.domain, self.codomain, inverse=True) def adjoint_inverse_times(self, x, **kwargs): """ Applies the inverse adjoint operator to a given object. Parameters ---------- x : {scalar, ndarray, field} Scalars are interpreted as constant arrays, and an array will be interpreted as a field on the target space of the operator. Returns ------- OAIx : field Mapped field on the domain of the operator. """ # check whether self-adjoint if self.sym: return self.inverse_times(x, **kwargs) # check whether unitary if self.uni: return self.times(x, **kwargs) # prepare y = self._briefing(x, self.domain, self.codomain, inverse=True) # apply operator y = self._adjoint_inverse_multiply(y, **kwargs) # evaluate return self._debriefing(x, y, self.target, self.cotarget, inverse=True) def inverse_adjoint_times(self, x, **kwargs): """ Applies the adjoint inverse operator to a given object. Parameters ---------- x : {scalar, ndarray, field} Scalars are interpreted as constant arrays, and an array will be interpreted as a field on the target space of the operator. Returns ------- OIAx : field Mapped field on the domain of the operator. """ # check whether self-adjoint if self.sym: return self.inverse_times(x, **kwargs) # check whether unitary if self.uni: return self.times(x, **kwargs) # prepare y = self._briefing(x, self.domain, self.codomain, inverse=True) # apply operator y = self._inverse_adjoint_multiply(y, **kwargs) # evaluate return self._debriefing(x, y, self.target, self.cotarget, inverse=True) def tr(self, domain=None, codomain=None, random="pm1", nrun=8, varQ=False, **kwargs): """ Computes the trace of the operator Parameters ---------- domain : space, *optional* space wherein the probes live (default: self.domain) target : space, *optional* space wherein the transform of the probes live (default: None, applies target of the domain) random : string, *optional* Specifies the pseudo random number generator. Valid options are "pm1" for a random vector of +/-1, or "gau" for a random vector with entries drawn from a Gaussian distribution with zero mean and unit variance. (default: "pm1") ncpu : int, *optional* number of used CPUs to use. (default: 2) nrun : int, *optional* total number of probes (default: 8) nper : int, *optional* number of tasks performed by one process (default: 1) var : bool, *optional* Indicates whether to additionally return the probing variance or not (default: False). loop : bool, *optional* Indicates whether or not to perform a loop i.e., to parallelise (default: False) Returns ------- tr : float Trace of the operator delta : float, *optional* Probing variance of the trace. Returned if `var` is True in of probing case. See Also -------- probing : The class used to perform probing operations """ return trace_prober(operator=self, domain=domain, codomain=codomain, random=random, nrun=nrun, varQ=varQ, **kwargs )() def inverse_tr(self, domain=None, codomain=None, random="pm1", nrun=8, varQ=False, **kwargs): """ Computes the trace of the inverse operator Parameters ---------- domain : space, *optional* space wherein the probes live (default: self.domain) target : space, *optional* space wherein the transform of the probes live (default: None, applies target of the domain) random : string, *optional* Specifies the pseudo random number generator. Valid options are "pm1" for a random vector of +/-1, or "gau" for a random vector with entries drawn from a Gaussian distribution with zero mean and unit variance. (default: "pm1") nrun : int, *optional* total number of probes (default: 8) varQ : bool, *optional* Indicates whether to additionally return the probing variance or not (default: False). Returns ------- tr : float Trace of the inverse operator delta : float, *optional* Probing variance of the trace. Returned if `var` is True in of probing case. See Also -------- probing : The class used to perform probing operations """ return inverse_trace_prober(operator=self, domain=domain, codomain=codomain, random=random, nrun=nrun, varQ=varQ, **kwargs )() def diag(self, domain=None, codomain=None, random="pm1", nrun=8, varQ=False, bare=False, **kwargs): """ Computes the diagonal of the operator via probing. Parameters ---------- bare : bool, *optional* Indicatese whether the diagonal entries are `bare` or not (mandatory for the correct incorporation of volume weights) (default: False) domain : space, *optional* space wherein the probes live (default: self.domain) target : space, *optional* space wherein the transform of the probes live (default: None, applies target of the domain) random : string, *optional* Specifies the pseudo random number generator. Valid options are "pm1" for a random vector of +/-1, or "gau" for a random vector with entries drawn from a Gaussian distribution with zero mean and unit variance. (default: "pm1") ncpu : int, *optional* number of used CPUs to use. (default: 2) nrun : int, *optional* total number of probes (default: 8) nper : int, *optional* number of tasks performed by one process (default: 1) var : bool, *optional* Indicates whether to additionally return the probing variance or not (default: False). save : bool, *optional* whether all individual probing results are saved or not (default: False) path : string, *optional* path wherein the results are saved (default: "tmp") prefix : string, *optional* prefix for all saved files (default: "") loop : bool, *optional* Indicates whether or not to perform a loop i.e., to parallelise (default: False) Returns ------- diag : ndarray The matrix diagonal delta : float, *optional* Probing variance of the trace. Returned if `var` is True in of probing case. See Also -------- probing : The class used to perform probing operations Notes ----- The ambiguity of `bare` or non-bare diagonal entries is based on the choice of a matrix representation of the operator in question. The naive choice of absorbing the volume weights into the matrix leads to a matrix-vector calculus with the non-bare entries which seems intuitive, though. The choice of keeping matrix entries and volume weights separate deals with the bare entries that allow for correct interpretation of the matrix entries; e.g., as variance in case of an covariance operator. """ diag = diagonal_prober(operator=self, domain=domain, codomain=codomain, random=random, nrun=nrun, varQ=varQ, **kwargs )() if diag is None: about.warnings.cprint("WARNING: forwarding 'NoneType'.") return None if domain is None: domain = diag.domain # weight if ... if (not domain.discrete) and bare: if(isinstance(diag, tuple)): # diag == (diag,variance) return (diag[0].weight(power=-1), diag[1].weight(power=-1)) else: return diag.weight(power=-1) else: return diag def inverse_diag(self, domain=None, codomain=None, random="pm1", nrun=8, varQ=False, bare=False, **kwargs): """ Computes the diagonal of the inverse operator via probing. Parameters ---------- bare : bool, *optional* Indicatese whether the diagonal entries are `bare` or not (mandatory for the correct incorporation of volume weights) (default: False) domain : space, *optional* space wherein the probes live (default: self.domain) target : space, *optional* space wherein the transform of the probes live (default: None, applies target of the domain) random : string, *optional* Specifies the pseudo random number generator. Valid options are "pm1" for a random vector of +/-1, or "gau" for a random vector with entries drawn from a Gaussian distribution with zero mean and unit variance. (default: "pm1") ncpu : int, *optional* number of used CPUs to use. (default: 2) nrun : int, *optional* total number of probes (default: 8) nper : int, *optional* number of tasks performed by one process (default: 1) var : bool, *optional* Indicates whether to additionally return the probing variance or not (default: False). save : bool, *optional* whether all individual probing results are saved or not (default: False) path : string, *optional* path wherein the results are saved (default: "tmp") prefix : string, *optional* prefix for all saved files (default: "") loop : bool, *optional* Indicates whether or not to perform a loop i.e., to parallelise (default: False) Returns ------- diag : ndarray The diagonal of the inverse matrix delta : float, *optional* Probing variance of the trace. Returned if `var` is True in of probing case. See Also -------- probing : The class used to perform probing operations Notes ----- The ambiguity of `bare` or non-bare diagonal entries is based on the choice of a matrix representation of the operator in question. The naive choice of absorbing the volume weights into the matrix leads to a matrix-vector calculus with the non-bare entries which seems intuitive, though. The choice of keeping matrix entries and volume weights separate deals with the bare entries that allow for correct interpretation of the matrix entries; e.g., as variance in case of an covariance operator. """ if(domain is None): domain = self.target diag = inverse_diagonal_prober(operator=self, domain=domain, codomain=codomain, random=random, nrun=nrun, varQ=varQ, **kwargs )() if(diag is None): about.infos.cprint("INFO: forwarding 'NoneType'.") return None if domain is None: domain = diag.codomain # weight if ... if not domain.discrete and bare: if(isinstance(diag, tuple)): # diag == (diag,variance) return (diag[0].weight(power=-1), diag[1].weight(power=-1)) else: return diag.weight(power=-1) else: return diag def det(self): """ Computes the determinant of the operator. Returns ------- det : float The determinant """ raise NotImplementedError(about._errors.cstring( "ERROR: no generic instance method 'det'.")) def inverse_det(self): """ Computes the determinant of the inverse operator. Returns ------- det : float The determinant """ raise NotImplementedError(about._errors.cstring( "ERROR: no generic instance method 'inverse_det'.")) def log_det(self): """ Computes the logarithm of the determinant of the operator (if applicable). Returns ------- logdet : float The logarithm of the determinant """ raise NotImplementedError(about._errors.cstring( "ERROR: no generic instance method 'log_det'.")) def tr_log(self): """ Computes the trace of the logarithm of the operator (if applicable). Returns ------- logdet : float The trace of the logarithm """ return self.log_det() def hat(self, bare=False, domain=None, codomain=None, **kwargs): """ Translates the operator's diagonal into a field Parameters ---------- bare : bool, *optional* Indicatese whether the diagonal entries are `bare` or not (mandatory for the correct incorporation of volume weights) (default: False) domain : space, *optional* space wherein the probes live (default: self.domain) target : space, *optional* space wherein the transform of the probes live (default: None, applies target of the domain) random : string, *optional* Specifies the pseudo random number generator. Valid options are "pm1" for a random vector of +/-1, or "gau" for a random vector with entries drawn from a Gaussian distribution with zero mean and unit variance. (default: "pm1") ncpu : int, *optional* number of used CPUs to use. (default: 2) nrun : int, *optional* total number of probes (default: 8) nper : int, *optional* number of tasks performed by one process (default: 1) save : bool, *optional* whether all individual probing results are saved or not (default: False) path : string, *optional* path wherein the results are saved (default: "tmp") prefix : string, *optional* prefix for all saved files (default: "") loop : bool, *optional* Indicates whether or not to perform a loop i.e., to parallelise (default: False) Returns ------- x : field The matrix diagonal as a field living on the operator domain space See Also -------- probing : The class used to perform probing operations Notes ----- The ambiguity of `bare` or non-bare diagonal entries is based on the choice of a matrix representation of the operator in question. The naive choice of absorbing the volume weights into the matrix leads to a matrix-vector calculus with the non-bare entries which seems intuitive, though. The choice of keeping matrix entries and volume weights separate deals with the bare entries that allow for correct interpretation of the matrix entries; e.g., as variance in case of an covariance operator. """ if domain is None: domain = self.domain if codomain is None: codomain = self.codomain diag = self.diag(bare=bare, domain=domain, codomain=codomain, var=False, **kwargs) if diag is None: about.infos.cprint("WARNING: forwarding 'NoneType'.") return None return diag def inverse_hat(self, bare=False, domain=None, codomain=None, **kwargs): """ Translates the inverse operator's diagonal into a field Parameters ---------- bare : bool, *optional* Indicatese whether the diagonal entries are `bare` or not (mandatory for the correct incorporation of volume weights) (default: False) domain : space, *optional* space wherein the probes live (default: self.domain) target : space, *optional* space wherein the transform of the probes live (default: None, applies target of the domain) random : string, *optional* Specifies the pseudo random number generator. Valid options are "pm1" for a random vector of +/-1, or "gau" for a random vector with entries drawn from a Gaussian distribution with zero mean and unit variance. (default: "pm1") ncpu : int, *optional* number of used CPUs to use. (default: 2) nrun : int, *optional* total number of probes (default: 8) nper : int, *optional* number of tasks performed by one process (default: 1) save : bool, *optional* whether all individual probing results are saved or not (default: False) path : string, *optional* path wherein the results are saved (default: "tmp") prefix : string, *optional* prefix for all saved files (default: "") loop : bool, *optional* Indicates whether or not to perform a loop i.e., to parallelise (default: False) Returns ------- x : field The matrix diagonal as a field living on the operator domain space See Also -------- probing : The class used to perform probing operations Notes ----- The ambiguity of `bare` or non-bare diagonal entries is based on the choice of a matrix representation of the operator in question. The naive choice of absorbing the volume weights into the matrix leads to a matrix-vector calculus with the non-bare entries which seems intuitive, though. The choice of keeping matrix entries and volume weights separate deals with the bare entries that allow for correct interpretation of the matrix entries; e.g., as variance in case of an covariance operator. """ if domain is None: domain = self.target if codomain is None: codomain = self.cotarget diag = self.inverse_diag(bare=bare, domain=domain, codomain=codomain, var=False, **kwargs) if diag is None: about.infos.cprint("WARNING: forwarding 'NoneType'.") return None return diag def hathat(self, domain=None, codomain=None, **kwargs): """ Translates the operator's diagonal into a diagonal operator Parameters ---------- domain : space, *optional* space wherein the probes live (default: self.domain) target : space, *optional* space wherein the transform of the probes live (default: None, applies target of the domain) random : string, *optional* Specifies the pseudo random number generator. Valid options are "pm1" for a random vector of +/-1, or "gau" for a random vector with entries drawn from a Gaussian distribution with zero mean and unit variance. (default: "pm1") ncpu : int, *optional* number of used CPUs to use. (default: 2) nrun : int, *optional* total number of probes (default: 8) nper : int, *optional* number of tasks performed by one process (default: 1) save : bool, *optional* whether all individual probing results are saved or not (default: False) path : string, *optional* path wherein the results are saved (default: "tmp") prefix : string, *optional* prefix for all saved files (default: "") loop : bool, *optional* Indicates whether or not to perform a loop i.e., to parallelise (default: False) Returns ------- D : diagonal_operator The matrix diagonal as an operator See Also -------- probing : The class used to perform probing operations Notes ----- The ambiguity of `bare` or non-bare diagonal entries is based on the choice of a matrix representation of the operator in question. The naive choice of absorbing the volume weights into the matrix leads to a matrix-vector calculus with the non-bare entries which seems intuitive, though. The choice of keeping matrix entries and volume weights separate deals with the bare entries that allow for correct interpretation of the matrix entries; e.g., as variance in case of an covariance operator. """ if domain is None: domain = self.domain if codomain is None: codomain = self.codomain diag = self.diag(bare=False, domain=domain, codomain=codomain, var=False, **kwargs) if diag is None: about.infos.cprint("WARNING: forwarding 'NoneType'.") return None return diagonal_operator(domain=domain, codomain=codomain, diag=diag, bare=False) def inverse_hathat(self, domain=None, codomain=None, **kwargs): """ Translates the inverse operator's diagonal into a diagonal operator Parameters ---------- domain : space, *optional* space wherein the probes live (default: self.domain) target : space, *optional* space wherein the transform of the probes live (default: None, applies target of the domain) random : string, *optional* Specifies the pseudo random number generator. Valid options are "pm1" for a random vector of +/-1, or "gau" for a random vector with entries drawn from a Gaussian distribution with zero mean and unit variance. (default: "pm1") ncpu : int, *optional* number of used CPUs to use. (default: 2) nrun : int, *optional* total number of probes (default: 8) nper : int, *optional* number of tasks performed by one process (default: 1) save : bool, *optional* whether all individual probing results are saved or not (default: False) path : string, *optional* path wherein the results are saved (default: "tmp") prefix : string, *optional* prefix for all saved files (default: "") loop : bool, *optional* Indicates whether or not to perform a loop i.e., to parallelise (default: False) Returns ------- D : diagonal_operator The diagonal of the inverse matrix as an operator See Also -------- probing : The class used to perform probing operations Notes ----- The ambiguity of `bare` or non-bare diagonal entries is based on the choice of a matrix representation of the operator in question. The naive choice of absorbing the volume weights into the matrix leads to a matrix-vector calculus with the non-bare entries which seems intuitive, though. The choice of keeping matrix entries and volume weights separate deals with the bare entries that allow for correct interpretation of the matrix entries; e.g., as variance in case of an covariance operator. """ if domain is None: domain = self.target if codomain is None: codomain = self.cotarget diag = self.inverse_diag(bare=False, domain=domain, codomain=codomain, var=False, **kwargs) if diag is None: about.infos.cprint("WARNING: forwarding 'NoneType'.") return None return diagonal_operator(domain=domain, codomain=codomain, diag=diag, bare=False) def __repr__(self): return "" # ============================================================================= class diagonal_operator(operator): """ .. __ __ __ .. / / /__/ / / .. ____/ / __ ____ __ ____ __ ______ __ ___ ____ __ / / .. / _ / / / / _ / / _ / / _ | / _ | / _ / / / .. / /_/ / / / / /_/ / / /_/ / / /_/ / / / / / / /_/ / / /_ .. \______| /__/ \______| \___ / \______/ /__/ /__/ \______| \___/ operator class .. /______/ NIFTY subclass for diagonal operators Parameters ---------- domain : space, *optional* The space wherein valid arguments live. If no domain is given then the diag parameter *must* be a field and the domain of that field is assumed. (default: None) diag : {scalar, ndarray, field} The diagonal entries of the operator. For a scalar, a constant diagonal is defined having the value provided. If no domain is given, diag must be a field. (default: 1) bare : bool, *optional* whether the diagonal entries are `bare` or not (mandatory for the correct incorporation of volume weights) (default: False) Notes ----- The ambiguity of `bare` or non-bare diagonal entries is based on the choice of a matrix representation of the operator in question. The naive choice of absorbing the volume weights into the matrix leads to a matrix-vector calculus with the non-bare entries which seems intuitive, though. The choice of keeping matrix entries and volume weights separate deals with the bare entries that allow for correct interpretation of the matrix entries; e.g., as variance in case of an covariance operator. The inverse applications of the diagonal operator feature a ``pseudo`` flag indicating if zero divison shall be ignored and return zero instead of causing an error. Attributes ---------- domain : space The space wherein valid arguments live. val : ndarray A field containing the diagonal entries of the matrix. sym : bool Indicates whether the operator is self-adjoint or not uni : bool Indicates whether the operator is unitary or not imp : bool Indicates whether the incorporation of volume weights in multiplications is already implemented in the `multiply` instance methods or not target : space The space wherein the operator output lives """ def __init__(self, domain=None, codomain=None, diag=1, bare=False): """ Sets the standard operator properties and `values`. Parameters ---------- domain : space, *optional* The space wherein valid arguments live. If no domain is given then the diag parameter *must* be a field and the domain of that field is assumed. (default: None) diag : {scalar, ndarray, field}, *optional* The diagonal entries of the operator. For a scalar, a constant diagonal is defined having the value provided. If no domain is given, diag must be a field. (default: 1) bare : bool, *optional* whether the diagonal entries are `bare` or not (mandatory for the correct incorporation of volume weights) (default: False) Returns ------- None Notes ----- The ambiguity of `bare` or non-bare diagonal entries is based on the choice of a matrix representation of the operator in question. The naive choice of absorbing the volume weights into the matrix leads to a matrix-vector calculus with the non-bare entries which seems intuitive, though. The choice of keeping matrix entries and volume weights separate deals with the bare entries that allow for correct interpretation of the matrix entries; e.g., as variance in case of an covariance operator. """ # Set the domain if domain is None: try: self.domain = diag.domain except(AttributeError): raise TypeError(about._errors.cstring( "ERROR: Explicit or implicit, i.e. via diag domain " + "inupt needed!")) else: self.domain = domain if self.domain.check_codomain(codomain): self.codomain = codomain else: self.codomain = self.domain.get_codomain() self.target = self.domain self.cotarget = self.codomain self.imp = True self.set_diag(new_diag=diag, bare=bare) def set_diag(self, new_diag, bare=False): """ Sets the diagonal of the diagonal operator Parameters ---------- new_diag : {scalar, ndarray, field} The new diagonal entries of the operator. For a scalar, a constant diagonal is defined having the value provided. If no domain is given, diag must be a field. bare : bool, *optional* whether the diagonal entries are `bare` or not (mandatory for the correct incorporation of volume weights) (default: False) Returns ------- None """ # Set the diag-val self.val = self.domain.cast(new_diag) # Set the bare-val #TODO Check with Theo self.bare = bare # Weight if necessary if not self.domain.discrete and bare: self.val = self.domain.calc_weight(self.val, power=1) # Check complexity attributes if self.domain.calc_real_Q(self.val) == True: self.sym = True else: self.sym = False # Check if unitary, i.e. identity if (self.val == 1).all() == True: self.uni = True else: self.uni = False def _multiply(self, x, **kwargs): # applies the operator to a given field y = x.copy(domain=self.target, codomain=self.cotarget) y *= self.get_val() return y def _adjoint_multiply(self, x, **kwargs): # applies the adjoint operator to a given field y = x.copy(domain=self.domain, codomain=self.codomain) y *= self.get_val().conjugate() return y def _inverse_multiply(self, x, pseudo=False, **kwargs): # applies the inverse operator to a given field y = x.copy(domain=self.domain, codomain=self.codomain) if (self.get_val() == 0).any(): if not pseudo: raise AttributeError(about._errors.cstring( "ERROR: singular operator.")) else: # raise NotImplementedError( # "ERROR: function not yet implemented!") y /= self.get_val() # TODO: implement this # the following code does not work. np.isnan is needed, # but on a level of fields # y[y == np.nan] = 0 # y[y == np.inf] = 0 else: y /= self.get_val() return y def _adjoint_inverse_multiply(self, x, pseudo=False, **kwargs): # > applies the inverse adjoint operator to a given field y = x.copy(domain=self.target, codomain=self.cotarget) if (self.get_val() == 0).any(): if not pseudo: raise AttributeError(about._errors.cstring( "ERROR: singular operator.")) else: raise NotImplementedError( "ERROR: function not yet implemented!") # TODO: implement this # the following code does not work. np.isnan is needed, # but on a level of fields y /= self.get_val().conjugate() y[y == np.nan] = 0 y[y == np.inf] = 0 else: y /= self.get_val().conjugate() return y def _inverse_adjoint_multiply(self, x, pseudo=False, **kwargs): # > applies the adjoint inverse operator to a given field return self._adjoint_inverse_multiply(x, pseudo=pseudo, **kwargs) def tr(self, varQ=False, **kwargs): """ Computes the trace of the operator Parameters ---------- domain : space, *optional* space wherein the probes live (default: self.domain) target : space, *optional* space wherein the transform of the probes live (default: None, applies target of the domain) random : string, *optional* Specifies the pseudo random number generator. Valid options are "pm1" for a random vector of +/-1, or "gau" for a random vector with entries drawn from a Gaussian distribution with zero mean and unit variance. (default: "pm1") ncpu : int, *optional* number of used CPUs to use. (default: 2) nrun : int, *optional* total number of probes (default: 8) nper : int, *optional* number of tasks performed by one process (default: 1) var : bool, *optional* Indicates whether to additionally return the probing variance or not (default: False). loop : bool, *optional* Indicates whether or not to perform a loop i.e., to parallelise (default: False) Returns ------- tr : float Trace of the operator delta : float, *optional* Probing variance of the trace. Returned if `var` is True in of probing case. """ tr = self.domain.unary_operation(self.val, 'sum') if varQ: return (tr, 1) else: return tr def inverse_tr(self, varQ=False, **kwargs): """ Computes the trace of the inverse operator Parameters ---------- domain : space, *optional* space wherein the probes live (default: self.domain) target : space, *optional* space wherein the transform of the probes live (default: None, applies target of the domain) random : string, *optional* Specifies the pseudo random number generator. Valid options are "pm1" for a random vector of +/-1, or "gau" for a random vector with entries drawn from a Gaussian distribution with zero mean and unit variance. (default: "pm1") ncpu : int, *optional* number of used CPUs to use. (default: 2) nrun : int, *optional* total number of probes (default: 8) nper : int, *optional* number of tasks performed by one process (default: 1) var : bool, *optional* Indicates whether to additionally return the probing variance or not (default: False). loop : bool, *optional* Indicates whether or not to perform a loop i.e., to parallelise (default: False) Returns ------- tr : float Trace of the inverse operator delta : float, *optional* Probing variance of the trace. Returned if `var` is True in of probing case. """ if (self.get_val() == 0).any(): raise AttributeError(about._errors.cstring( "ERROR: singular operator.")) inverse_tr = self.domain.unary_operation( self.domain.binary_operation(self.val, 1, 'rdiv', cast=0), 'sum') if varQ: return (inverse_tr, 1) else: return inverse_tr def diag(self, bare=False, domain=None, codomain=None, varQ=False, **kwargs): """ Computes the diagonal of the operator. Parameters ---------- bare : bool, *optional* Indicatese whether the diagonal entries are `bare` or not (mandatory for the correct incorporation of volume weights) (default: False) domain : space, *optional* space wherein the probes live (default: self.domain) target : space, *optional* space wherein the transform of the probes live (default: None, applies target of the domain) random : string, *optional* Specifies the pseudo random number generator. Valid options are "pm1" for a random vector of +/-1, or "gau" for a random vector with entries drawn from a Gaussian distribution with zero mean and unit variance. (default: "pm1") ncpu : int, *optional* number of used CPUs to use. (default: 2) nrun : int, *optional* total number of probes (default: 8) nper : int, *optional* number of tasks performed by one process (default: 1) var : bool, *optional* Indicates whether to additionally return the probing variance or not (default: False). save : bool, *optional* whether all individual probing results are saved or not (default: False) path : string, *optional* path wherein the results are saved (default: "tmp") prefix : string, *optional* prefix for all saved files (default: "") loop : bool, *optional* Indicates whether or not to perform a loop i.e., to parallelise (default: False) Returns ------- diag : ndarray The matrix diagonal delta : float, *optional* Probing variance of the trace. Returned if `var` is True in of probing case. Notes ----- The ambiguity of `bare` or non-bare diagonal entries is based on the choice of a matrix representation of the operator in question. The naive choice of absorbing the volume weights into the matrix leads to a matrix-vector calculus with the non-bare entries which seems intuitive, though. The choice of keeping matrix entries and volume weights separate deals with the bare entries that allow for correct interpretation of the matrix entries; e.g., as variance in case of an covariance operator. """ if (domain is None) or (domain == self.domain): if not self.domain.discrete and bare: diag_val = self.domain.calc_weight(self.val, power=-1) else: diag_val = self.val diag = field(self.domain, codomain=self.codomain, val=diag_val) else: diag = super(diagonal_operator, self).diag(bare=bare, domain=domain, codomain=codomain, nrun=1, random='pm1', varQ=False, **kwargs) if varQ: return (diag, diag.domain.cast(1)) else: return diag def inverse_diag(self, bare=False, domain=None, codomain=None, varQ=False, **kwargs): """ Computes the diagonal of the inverse operator. Parameters ---------- bare : bool, *optional* Indicatese whether the diagonal entries are `bare` or not (mandatory for the correct incorporation of volume weights) (default: False) domain : space, *optional* space wherein the probes live (default: self.domain) target : space, *optional* space wherein the transform of the probes live (default: None, applies target of the domain) random : string, *optional* Specifies the pseudo random number generator. Valid options are "pm1" for a random vector of +/-1, or "gau" for a random vector with entries drawn from a Gaussian distribution with zero mean and unit variance. (default: "pm1") ncpu : int, *optional* number of used CPUs to use. (default: 2) nrun : int, *optional* total number of probes (default: 8) nper : int, *optional* number of tasks performed by one process (default: 1) var : bool, *optional* Indicates whether to additionally return the probing variance or not (default: False). save : bool, *optional* whether all individual probing results are saved or not (default: False) path : string, *optional* path wherein the results are saved (default: "tmp") prefix : string, *optional* prefix for all saved files (default: "") loop : bool, *optional* Indicates whether or not to perform a loop i.e., to parallelise (default: False) Returns ------- diag : ndarray The diagonal of the inverse matrix delta : float, *optional* Probing variance of the trace. Returned if `var` is True in of probing case. See Also -------- probing : The class used to perform probing operations Notes ----- The ambiguity of `bare` or non-bare diagonal entries is based on the choice of a matrix representation of the operator in question. The naive choice of absorbing the volume weights into the matrix leads to a matrix-vector calculus with the non-bare entries which seems intuitive, though. The choice of keeping matrix entries and volume weights separate deals with the bare entries that allow for correct interpretation of the matrix entries; e.g., as variance in case of an covariance operator. """ if (domain is None) or (domain == self.domain): inverse_val = 1. / self.val if not self.domain.discrete and bare: inverse_diag_val = self.domain.calc_weight(inverse_val, power=-1) else: inverse_diag_val = inverse_val inverse_diag = field(self.domain, codomain=self.codomain, val=inverse_diag_val) else: inverse_diag = super(diagonal_operator, self).inverse_diag(bare=bare, domain=domain, nrun=1, random='pm1', varQ=False, **kwargs) if varQ: return (inverse_diag, inverse_diag.domain.cast(1)) else: return inverse_diag def det(self): """ Computes the determinant of the matrix. Returns ------- det : float The determinant """ if self.uni: # identity return 1. else: return self.domain.unary_operation(self.val, op='prod') def inverse_det(self): """ Computes the determinant of the inverse operator. Returns ------- det : float The determinant """ if self.uni: # identity return 1. det = self.det() if det != 0: return 1. / det else: raise ValueError(about._errors.cstring( "ERROR: singular operator.")) def log_det(self): """ Computes the logarithm of the determinant of the operator. Returns ------- logdet : float The logarithm of the determinant """ if self.uni: # identity return 0 else: return self.domain.unary_operation( nifty_simple_math.log(self.val), op='sum') def get_random_field(self, domain=None, codomain=None): """ Generates a Gaussian random field with variance equal to the diagonal. Parameters ---------- domain : space, *optional* space wherein the field lives (default: None, indicates to use self.domain) target : space, *optional* space wherein the transform of the field lives (default: None, indicates to use target of domain) Returns ------- x : field Random field. """ temp_field = field(domain=self.domain, codomain=self.codomain, random='gau', std=nifty_simple_math.sqrt( self.diag(bare=True).get_val())) if domain is None: domain = self.domain if domain.check_codomain(codomain): codomain = codomain elif domain.check_codomain(self.codomain): codomain = self.codomain else: codomain = domain.get_codomain() return field(domain=domain, val=temp_field, codomain=codomain) # if domain.harmonic != self.domain.harmonic: # temp_field = temp_field.transform(new_domain=domain) # # if self.domain == domain and self.codomain == codomain: # return temp_field # else: # return temp_field.copy(domain=domain, # codomain=codomain) def __repr__(self): return "" class identity_operator(diagonal_operator): def __init__(self, domain, codomain=None, bare=False): super(identity_operator, self).__init__(domain=domain, codomain=codomain, diag=1, bare=bare) def identity(domain, codomain=None): """ Returns an identity operator. The identity operator is represented by a `diagonal_operator` instance, which is applicable to a field-like object; i.e., a scalar, list, array or field. (The identity operator is unrelated to PYTHON's built-in function :py:func:`id`.) Parameters ---------- domain : space The space wherein valid arguments live. Returns ------- id : diagonal_operator The identity operator as a `diagonal_operator` instance. See Also -------- diagonal_operator Examples -------- >>> I = identity(rg_space(8,dist=0.2)) >>> I.diag() array([ 1., 1., 1., 1., 1., 1., 1., 1.]) >>> I.diag(bare=True) array([ 5., 5., 5., 5., 5., 5., 5., 5.]) >>> I.tr() 8.0 >>> I(3) >>> I(3).val array([ 3., 3., 3., 3., 3., 3., 3., 3.]) >>> I(np.arange(8))[:] array([ 0., 1., 2., 3., 4., 5., 6., 7.]) >>> f = I.get_random_field() >>> print(I(f) - f) nifty.field instance - domain = - val = [...] - min.,max. = [0.0, 0.0] - med.,mean = [0.0, 0.0] - target = >>> I.times(f) ## equal to I(f) >>> I.inverse_times(f) """ about.warnings.cprint('WARNING: The identity function is deprecated. ' + 'Use the identity_operator class.') return diagonal_operator(domain=domain, codomain=codomain, diag=1, bare=False) class power_operator(diagonal_operator): """ .. ______ ______ __ __ _______ _____ .. / _ | / _ | | |/\/ / / __ / / __/ .. / /_/ / / /_/ / | / / /____/ / / .. / ____/ \______/ |__/\__/ \______/ /__/ operator class .. /__/ NIFTY subclass for (signal-covariance-type) diagonal operators containing a power spectrum Parameters ---------- domain : space, *optional* The space wherein valid arguments live. If no domain is given then the diag parameter *must* be a field and the domain of that field is assumed. (default: None) spec : {scalar, list, array, field, function} The power spectrum. For a scalar, a constant power spectrum is defined having the value provided. If no domain is given, diag must be a field. (default: 1) bare : bool, *optional* whether the entries are `bare` or not (mandatory for the correct incorporation of volume weights) (default: True) pindex : ndarray, *optional* indexing array, obtainable from domain.get_power_indices (default: None) Other Parameters ---------------- log : bool, *optional* Flag specifying if the spectral binning is performed on logarithmic scale or not; if set, the number of used bins is set automatically (if not given otherwise); by default no binning is done (default: None). nbin : integer, *optional* Number of used spectral bins; if given `log` is set to ``False``; integers below the minimum of 3 induce an automatic setting; by default no binning is done (default: None). binbounds : {list, array}, *optional* User specific inner boundaries of the bins, which are preferred over the above parameters; by default no binning is done (default: None). vmin : {scalar, list, ndarray, field}, *optional* Lower limit of the uniform distribution if ``random == "uni"`` (default: 0). Notes ----- The ambiguity of `bare` or non-bare diagonal entries is based on the choice of a matrix representation of the operator in question. The naive choice of absorbing the volume weights into the matrix leads to a matrix-vector calculus with the non-bare entries which seems intuitive, though. The choice of keeping matrix entries and volume weights separate deals with the bare entries that allow for correct interpretation of the matrix entries; e.g., as variance in case of an covariance operator. Attributes ---------- domain : space The space wherein valid arguments live. val : ndarray A field containing the diagonal entries of the matrix. sym : bool Indicates whether the operator is self-adjoint or not uni : bool Indicates whether the operator is unitary or not imp : bool Indicates whether the incorporation of volume weights in multiplications is already implemented in the `multiply` instance methods or not target : space The space wherein the operator output lives """ def __init__(self, domain, codomain=None, spec=1, bare=True, **kwargs): """ Sets the diagonal operator's standard properties Parameters ---------- domain : space, *optional* The space wherein valid arguments live. If no domain is given then the diag parameter *must* be a field and the domain of that field is assumed. (default: None) spec : {scalar, list, array, field, function} The power spectrum. For a scalar, a constant power spectrum is defined having the value provided. If no domain is given, diag must be a field. (default: 1) bare : bool, *optional* whether the entries are `bare` or not (mandatory for the correct incorporation of volume weights) (default: True) pindex : ndarray, *optional* indexing array, obtainable from domain.get_power_indices (default: None) Returns ------- None Other Parameters ---------------- log : bool, *optional* Flag specifying if the spectral binning is performed on logarithmic scale or not; if set, the number of used bins is set automatically (if not given otherwise); by default no binning is done (default: None). nbin : integer, *optional* Number of used spectral bins; if given `log` is set to ``False``; integers below the minimum of 3 induce an automatic setting; by default no binning is done (default: None). binbounds : {list, array}, *optional* User specific inner boundaries of the bins, which are preferred over the above parameters; by default no binning is done (default: None). vmin : {scalar, list, ndarray, field}, *optional* Lower limit of the uniform distribution if ``random == "uni"`` (default: 0). """ # Set the domain if not isinstance(domain, space): raise TypeError(about._errors.cstring( "ERROR: The given domain is not a nifty space.")) self.domain = domain if self.domain.check_codomain(codomain): self.codomain = codomain else: self.codomain = self.domain.get_codomain() # Set the target self.target = self.domain # Set the cotarget self.cotarget = self.codomain # Set imp self.imp = True # Save the kwargs self.kwargs = kwargs # Set the diag self.set_power(new_spec=spec, bare=bare, **kwargs) self.sym = True # check whether identity if(np.all(spec == 1)): self.uni = True else: self.uni = False # The domain is used for calculations of the power-spectrum, not for # actual field values. Therefore the casting of self.val must be switched # off. def set_val(self, new_val): """ Resets the field values. Parameters ---------- new_val : {scalar, ndarray} New field values either as a constant or an arbitrary array. """ self.val = new_val return self.val def get_val(self): return self.val def set_power(self, new_spec, bare=True, pindex=None, **kwargs): """ Sets the power spectrum of the diagonal operator Parameters ---------- newspec : {scalar, list, array, field, function} The entries of the operator. For a scalar, a constant diagonal is defined having the value provided. If no domain is given, diag must be a field. (default: 1) bare : bool whether the entries are `bare` or not (mandatory for the correct incorporation of volume weights) pindex : ndarray, *optional* indexing array, obtainable from domain.get_power_indices (default: None) Returns ------- None Other Parameters ---------------- log : bool, *optional* Flag specifying if the spectral binning is performed on logarithmic scale or not; if set, the number of used bins is set automatically (if not given otherwise); by default no binning is done (default: None). nbin : integer, *optional* Number of used spectral bins; if given `log` is set to ``False``; integers below the minimum of 3 induce an automatic setting; by default no binning is done (default: None). binbounds : {list, array}, *optional* User specific inner boundaries of the bins, which are preferred over the above parameters; by default no binning is done (default: None). """ # Cast the pontentially given pindex. If no pindex was given, # extract it from self.domain using the supplied kwargs. pindex = self._cast_pindex(pindex, **kwargs) # Cast the new powerspectrum function temp_spec = self.domain.enforce_power(new_spec) # Calculate the diagonal try: diag = pindex.apply_scalar_function(lambda x: temp_spec[x], dtype=temp_spec.dtype.type) diag.hermitian = True except(AttributeError): # TODO: update all pindices to d2o's diag = temp_spec[pindex] if self.domain.datamodel == 'np': try: diag = diag.get_full_data() except(AttributeError): pass # Weight if necessary if not self.domain.discrete and bare: self.val = self.domain.calc_weight(diag, power=1) else: self.val = diag # check whether identity if (self.val == 1).all() == True: self.uni = True else: self.uni = False return self.val def _cast_pindex(self, pindex=None, **kwargs): # Update the internal kwargs dict with the given one: temp_kwargs = self.kwargs temp_kwargs.update(kwargs) # Case 1: no pindex given if pindex is None: pindex = self.domain.power_indices.get_index_dict( **temp_kwargs)['pindex'] # Case 2: explicit pindex given else: # TODO: Pindex casting could be done here. No must-have. assert(np.all(np.array(pindex.shape) == np.array(self.domain.get_shape()))) return pindex def get_power(self, bare=True, **kwargs): """ Computes the power spectrum Parameters ---------- bare : bool, *optional* whether the entries are `bare` or not (mandatory for the correct incorporation of volume weights) (default: True) pundex : ndarray, *optional* unindexing array, obtainable from domain.get_power_indices (default: None) pindex : ndarray, *optional* indexing array, obtainable from domain.get_power_indices (default: None) Returns ------- spec : ndarray The power spectrum Other Parameters ---------------- log : bool, *optional* Flag specifying if the spectral binning is performed on logarithmic scale or not; if set, the number of used bins is set automatically (if not given otherwise); by default no binning is done (default: None). nbin : integer, *optional* Number of used spectral bins; if given `log` is set to ``False``; integers below the minimum of 3 induce an automatic setting; by default no binning is done (default: None). binbounds : {list, array}, *optional* User specific inner boundaries of the bins, which are preferred over the above parameters; by default no binning is done (default: None). vmin : {scalar, list, ndarray, field}, *optional* Lower limit of the uniform distribution if ``random == "uni"`` (default: 0). """ temp_kwargs = self.kwargs temp_kwargs.update(kwargs) # Weight the diagonal values if necessary if not self.domain.discrete and bare: diag = self.domain.calc_weight(self.val, power=-1) else: diag = self.val # Use the calc_power routine of the domain in order to to stay # independent of the implementation diag = diag**(0.5) power = self.domain.calc_power(diag, **temp_kwargs) return power def get_projection_operator(self, pindex=None, **kwargs): """ Generates a spectral projection operator Parameters ---------- pindex : ndarray indexing array obtainable from domain.get_power_indices (default: None) Returns ------- P : projection_operator Other Parameters ---------------- log : bool, *optional* Flag specifying if the spectral binning is performed on logarithmic scale or not; if set, the number of used bins is set automatically (if not given otherwise); by default no binning is done (default: None). nbin : integer, *optional* Number of used spectral bins; if given `log` is set to ``False``; integers below the minimum of 3 induce an automatic setting; by default no binning is done (default: None). binbounds : {list, array}, *optional* User specific inner boundaries of the bins, which are preferred over the above parameters; by default no binning is done (default: None). vmin : {scalar, list, ndarray, field}, *optional* Lower limit of the uniform distribution if ``random == "uni"`` (default: 0). """ pindex = self._cast_pindex(pindex, **kwargs) return projection_operator(self.domain, codomain=self.codomain, assign=pindex) def __repr__(self): return "" class projection_operator(operator): """ .. __ __ __ .. /__/ / /_ /__/ .. ______ _____ ______ __ _______ _______ / _/ __ ______ __ ___ .. / _ | / __/ / _ | / / / __ / / ____/ / / / / / _ | / _ | .. / /_/ / / / / /_/ / / / / /____/ / /____ / /_ / / / /_/ / / / / / .. / ____/ /__/ \______/ / / \______/ \______/ \___/ /__/ \______/ /__/ /__/ operator class .. /__/ /___/ NIFTY subclass for projection operators Parameters ---------- domain : space The space wherein valid arguments live. assign : ndarray, *optional* Assignments of domain items to projection bands. An array of integers, negative integers are associated with the nullspace of the projection. (default: None) Other Parameters ---------------- log : bool, *optional* Flag specifying if the spectral binning is performed on logarithmic scale or not; if set, the number of used bins is set automatically (if not given otherwise); by default no binning is done (default: None). nbin : integer, *optional* Number of used spectral bins; if given `log` is set to ``False``; integers below the minimum of 3 induce an automatic setting; by default no binning is done (default: None). binbounds : {list, array}, *optional* User specific inner boundaries of the bins, which are preferred over the above parameters; by default no binning is done (default: None). vmin : {scalar, list, ndarray, field}, *optional* Lower limit of the uniform distribution if ``random == "uni"`` (default: 0). Notes ----- The application of the projection operator features a ``band`` keyword specifying a single projection band (see examples), a ``bandsup`` keyword specifying which projection bands to sum up, and a ``split`` keyword. Examples -------- >>> space = point_space(3) >>> P = projection_operator(space, assign=[0, 1, 0]) >>> P.bands() 2 >>> P([1, 2, 3], band=0) # equal to P.times(field(space,val=[1, 2, 3])) >>> P([1, 2, 3], band=0).domain >>> P([1, 2, 3], band=0).val # projection on band 0 (items 0 and 2) array([ 1., 0., 3.]) >>> P([1, 2, 3], band=1).val # projection on band 1 (item 1) array([ 0., 2., 0.]) >>> P([1, 2, 3]) >>> P([1, 2, 3]).domain >>> P([1, 2, 3]).val # projection on all bands array([[ 1., 0., 3.], [ 0., 2., 0.]]) Attributes ---------- domain : space The space wherein valid arguments live. ind : ndarray Assignments of domain items to projection bands. sym : bool Indicates whether the operator is self-adjoint or not uni : bool Indicates whether the operator is unitary or not imp : bool Indicates whether the incorporation of volume weights in multiplications is already implemented in the `multiply` instance methods or not target : space The space wherein the operator output lives """ def __init__(self, domain, assign=None, codomain=None, **kwargs): """ Sets the standard operator properties and `indexing`. Parameters ---------- domain : space The space wherein valid arguments live. assign : ndarray, *optional* Assignments of domain items to projection bands. An array of integers, negative integers are associated with the nullspace of the projection. (default: None) Returns ------- None Other Parameters ---------------- log : bool, *optional* Flag specifying if the spectral binning is performed on logarithmic scale or not; if set, the number of used bins is set automatically (if not given otherwise); by default no binning is done (default: None). nbin : integer, *optional* Number of used spectral bins; if given `log` is set to ``False``; integers below the minimum of 3 induce an automatic setting; by default no binning is done (default: None). binbounds : {list, array}, *optional* User specific inner boundaries of the bins, which are preferred over the above parameters; by default no binning is done (default: None). vmin : {scalar, list, ndarray, field}, *optional* Lower limit of the uniform distribution if ``random == "uni"`` (default: 0). """ # Check the domain if not isinstance(domain, space): raise TypeError(about._errors.cstring( "ERROR: The supplied domain is not a nifty space instance.")) self.domain = domain # Parse codomain if self.domain.check_codomain(codomain): self.codomain = codomain else: self.codomain = self.domain.get_codomain() self.target = self.domain self.cotarget = self.codomain # Cast the assignment if assign is None: try: self.domain.power_indices['pindex'] except AttributeError: assign = np.arange(self.domain.get_dim(), dtype=np.int) self.assign = self.domain.cast(assign, dtype=np.dtype('int'), hermitianize=False) # build indexing self.ind = self.domain.unary_operation(self.assign, op='unique') self.sym = True self.uni = False self.imp = True def bands(self): about.warnings.cprint( "WARNING: projection_operator.bands() is deprecated. " + "Use get_band_num() instead.") return self.get_band_num() def get_band_num(self): """ Computes the number of projection bands Returns ------- bands : int The number of projection bands """ return len(self.ind) def rho(self): """ Computes the number of degrees of freedom per projection band. Returns ------- rho : ndarray The number of degrees of freedom per projection band. """ # Check if the space has some meta-degrees of freedom if self.domain.get_dim() == self.domain.get_dof(): # If not, compute the degeneracy factor directly rho = self.domain.calc_bincount(self.assign) else: meta_mask = self.domain.calc_weight( self.domain.get_meta_volume(split=True), power=-1) rho = self.domain.calc_bincount(self.assign, weights=meta_mask) return rho def _multiply(self, x, bands=None, bandsum=None, **kwargs): """ Applies the operator to a given field. Parameters ---------- x : field Valid input field. band : int, *optional* Projection band whereon to project (default: None). bandsup: {integer, list/array of integers}, *optional* List of projection bands whereon to project and which to sum up. The `band` keyword is prefered over `bandsup` (default: None). Returns ------- Px : field projected field(!) """ if bands is not None: # cast the band if np.isscalar(bands): bands_was_scalar = True else: bands_was_scalar = False bands = np.array(bands, dtype=np.int).flatten() # check for consistency if np.any(bands > self.get_band_num() - 1) or np.any(bands < 0): raise TypeError(about._errors.cstring("ERROR: Invalid bands.")) if bands_was_scalar: new_field = fx * (self.assign == bands[0]) else: # build up the projection results # prepare the projector-carrier carrier = np.empty((len(bands),), dtype=np.object_) for i in xrange(len(bands)): current_band = bands[i] projector = (self.assign == current_band) carrier[i] = projector # Use the carrier and tensor dot to do the projection new_field = x.tensor_product(carrier) return new_field elif bandsum is not None: if np.isscalar(bandsum): bandsum = np.arange(int(bandsum) + 1) else: bandsum = np.array(bandsum, dtype=np.int_).flatten() # check for consistency if np.any(bandsum > self.get_band_num() - 1) or \ np.any(bandsum < 0): raise TypeError(about._errors.cstring( "ERROR: Invalid bandsum.")) new_field = x.copy_empty() # Initialize the projector array, completely projector_sum = (self.assign != self.assign) for i in xrange(len(bandsum)): current_band = bandsum[i] projector = self.domain.binary_operation(self.assign, current_band, 'eq') projector_sum += projector new_field = x * projector_sum return new_field else: return self._multiply(x, bands=self.ind) def _inverse_multiply(self, x, **kwargs): raise AttributeError(about._errors.cstring( "ERROR: singular operator.")) def pseudo_tr(self, x, axis=(), **kwargs): """ Computes the pseudo trace of a given object for all projection bands Parameters ---------- x : {field, operator} The object whose pseudo-trace is to be computed. If the input is a field, the pseudo trace equals the trace of the projection operator mutliplied by a vector-vector operator corresponding to the input field. This is also equal to the pseudo inner product of the field with projected field itself. If the input is a operator, the pseudo trace equals the trace of the projection operator multiplied by the input operator. random : string, *optional* Specifies the pseudo random number generator. Valid options are "pm1" for a random vector of +/-1, or "gau" for a random vector with entries drawn from a Gaussian distribution with zero mean and unit variance. (default: "pm1") ncpu : int, *optional* number of used CPUs to use. (default: 2) nrun : int, *optional* total number of probes (default: 8) nper : int, *optional* number of tasks performed by one process (default: 1) save : bool, *optional* whether all individual probing results are saved or not (default: False) path : string, *optional* path wherein the results are saved (default: "tmp") prefix : string, *optional* prefix for all saved files (default: "") loop : bool, *optional* Indicates whether or not to perform a loop i.e., to parallelise (default: False) Returns ------- tr : float Pseudo trace for all projection bands """ # Parse the input # Case 1: x is a field # -> Compute the diagonal of the corresponding vecvec-operator: # x * x^dagger if isinstance(x, field): # check if field is in the same signal/harmonic space as the # domain of the projection operator if self.domain != x.domain: x = x.transform(new_domain=self.domain) vecvec = vecvec_operator(val=x) return self.pseudo_tr(x=vecvec, axis=axis, **kwargs) # Case 2: x is an operator # -> take the diagonal elif isinstance(x, operator): working_field = x.diag(bare=False, domain=self.domain, codomain=self.codomain) if self.domain != working_field.domain: working_field = working_field.transform(new_domain=self.domain) # Case 3: x is something else else: raise TypeError(about._errors.cstring( "ERROR: x must be a field or an operator.")) # Check for hidden degrees of freedom and compensate the trace # accordingly if self.domain.get_dim() != self.domain.get_dof(): working_field *= self.domain.calc_weight( self.domain.get_meta_volume(split=True), power=-1) # prepare the result object projection_result = utilities.field_map( working_field.ishape, lambda z: self.domain.calc_bincount(self.assign, weights=z), working_field.get_val()) projection_result = np.sum(projection_result, axis=axis) return projection_result def __repr__(self): return "" class vecvec_operator(operator): """ .. __ .. __/ /__ .. __ __ _______ _______ __ __ _______ _______ /__ __/ .. | |/ / / __ / / ____/ | |/ / / __ / / ____/ /__/ .. | / / /____/ / /____ | / / /____/ / /____ .. |____/ \______/ \______/ |____/ \______/ \______/ .. operator class NIFTY subclass for vector-vector operators Parameters ---------- domain : space, *optional* The space wherein valid arguments live. If none is given, the space of the field given in val is used. (default: None) val : {scalar, ndarray, field}, *optional* The field from which to construct the operator. For a scalar, a constant field is defined having the value provided. If no domain is given, val must be a field. (default: 1) Attributes ---------- domain : space The space wherein valid arguments live. val : ndarray The field from which the operator is derived. sym : bool Indicates whether the operator is self-adjoint or not uni : bool Indicates whether the operator is unitary or not imp : bool Indicates whether the incorporation of volume weights in multiplications is already implemented in the `multiply` instance methods or not target : space The space wherein the operator output lives. """ def __init__(self, domain=None, codomain=None, val=1): """ Sets the standard operator properties and `values`. Parameters ---------- domain : space, *optional* The space wherein valid arguments live. If none is given, the space of the field given in val is used. (default: None) val : {scalar, ndarray, field}, *optional* The field from which to construct the operator. For a scalar, a constant field is defined having the value provided. If no domain is given, val must be a field. (default: 1) Returns ------- None """ if isinstance(val, field): if domain is None: domain = val.domain if codomain is None: codomain = val.codomain if not isinstance(domain, space): raise TypeError(about._errors.cstring("ERROR: invalid input.")) self.domain = domain if self.domain.check_codomain(codomain): self.codomain = codomain else: self.codomain = self.domain.get_codomain() self.target = self.domain self.cotarget = self.codomain self.val = field(domain=self.domain, codomain=self.codomain, val=val) self.sym = True self.uni = False self.imp = True def set_val(self, new_val, copy=False): """ Sets the field values of the operator Parameters ---------- newval : {scalar, ndarray, field} The new field values. For a scalar, a constant diagonal is defined having the value provided. If no domain is given, diag must be a field. (default: 1) Returns ------- None """ self.val = self.val.set_val(new_val=new_val, copy=copy) def _multiply(self, x, **kwargs): y = x.copy_empty(domain=self.target, codomain=self.cotarget) y.set_val(new_val=self.val, copy=True) y *= self.val.dot(x, axis=()) return y def _inverse_multiply(self, x, **kwargs): raise AttributeError(about._errors.cstring( "ERROR: singular operator.")) def tr(self, domain=None, axis=None, **kwargs): """ Computes the trace of the operator Parameters ---------- domain : space, *optional* space wherein the probes live (default: self.domain) target : space, *optional* space wherein the transform of the probes live (default: None, applies target of the domain) random : string, *optional* Specifies the pseudo random number generator. Valid options are "pm1" for a random vector of +/-1, or "gau" for a random vector with entries drawn from a Gaussian distribution with zero mean and unit variance. (default: "pm1") ncpu : int, *optional* number of used CPUs to use. (default: 2) nrun : int, *optional* total number of probes (default: 8) nper : int, *optional* number of tasks performed by one process (default: 1) var : bool, *optional* Indicates whether to additionally return the probing variance or not (default: False). loop : bool, *optional* Indicates whether or not to perform a loop i.e., to parallelise (default: False) Returns ------- tr : float Trace of the operator delta : float, *optional* Probing variance of the trace. Returned if `var` is True in of probing case. """ if domain is None or domain == self.domain: return self.val.dot(self.val, axis=axis) else: return super(vecvec_operator, self).tr(domain=domain, **kwargs) def inverse_tr(self): """ Inverse is ill-defined for this operator. """ raise AttributeError(about._errors.cstring( "ERROR: singular operator.")) def diag(self, bare=False, domain=None, **kwargs): """ Computes the diagonal of the operator. Parameters ---------- bare : bool, *optional* Indicatese whether the diagonal entries are `bare` or not (mandatory for the correct incorporation of volume weights) (default: False) domain : space, *optional* space wherein the probes live (default: self.domain) target : space, *optional* space wherein the transform of the probes live (default: None, applies target of the domain) random : string, *optional* Specifies the pseudo random number generator. Valid options are "pm1" for a random vector of +/-1, or "gau" for a random vector with entries drawn from a Gaussian distribution with zero mean and unit variance. (default: "pm1") ncpu : int, *optional* number of used CPUs to use. (default: 2) nrun : int, *optional* total number of probes (default: 8) nper : int, *optional* number of tasks performed by one process (default: 1) var : bool, *optional* Indicates whether to additionally return the probing variance or not (default: False). save : bool, *optional* whether all individual probing results are saved or not (default: False) path : string, *optional* path wherein the results are saved (default: "tmp") prefix : string, *optional* prefix for all saved files (default: "") loop : bool, *optional* Indicates whether or not to perform a loop i.e., to parallelise (default: False) Returns ------- diag : ndarray The matrix diagonal delta : float, *optional* Probing variance of the trace. Returned if `var` is True in of probing case. Notes ----- The ambiguity of `bare` or non-bare diagonal entries is based on the choice of a matrix representation of the operator in question. The naive choice of absorbing the volume weights into the matrix leads to a matrix-vector calculus with the non-bare entries which seems intuitive, though. The choice of keeping matrix entries and volume weights separate deals with the bare entries that allow for correct interpretation of the matrix entries; e.g., as variance in case of an covariance operator. """ if domain is None or (domain == self.domain): diag_val = self.val * self.val.conjugate() # bare diagonal # weight if ... if not self.domain.discrete and not bare: diag_val = diag_val.weight(power=1, overwrite=True) return diag_val else: return super(vecvec_operator, self).diag(bare=bare, domain=domain, **kwargs) def inverse_diag(self): """ Inverse is ill-defined for this operator. """ raise AttributeError(about._errors.cstring( "ERROR: singular operator.")) def det(self): """ Computes the determinant of the operator. Returns ------- det : 0 The determinant """ return 0 def inverse_det(self): """ Inverse is ill-defined for this operator. """ raise AttributeError(about._errors.cstring( "ERROR: singular operator.")) def log_det(self): """ Logarithm of the determinant is ill-defined for this singular operator. """ raise AttributeError(about._errors.cstring( "ERROR: singular operator.")) def __repr__(self): return "" class response_operator(operator): """ .. _____ _______ _______ ______ ______ __ ___ _______ _______ .. / __/ / __ / / _____/ / _ | / _ | / _ | / _____/ / __ / .. / / / /____/ /_____ / / /_/ / / /_/ / / / / / /_____ / / /____/ .. /__/ \______/ /_______/ / ____/ \______/ /__/ /__/ /_______/ \______/ operator class .. /__/ NIFTY subclass for response operators (of a certain family) Any response operator handles Gaussian convolutions, itemwise masking, and selective mappings. Parameters ---------- domain : space The space wherein valid arguments live. sigma : float, *optional* The standard deviation of the Gaussian kernel. Zero indicates no convolution. (default: 0) mask : {scalar, ndarray}, *optional* Masking values for arguments (default: 1) assign : {list, ndarray}, *optional* Assignments of codomain items to domain items. A list of indices/ index tuples or a one/ two-dimensional array. (default: None) den : bool, *optional* Whether to consider the arguments as densities or not. Mandatory for the correct incorporation of volume weights. (default: False) target : space, *optional* The space wherein the operator output lives (default: domain) Attributes ---------- domain : space The space wherein valid arguments live. sym : bool Indicates whether the operator is self-adjoint or not. uni : bool Indicates whether the operator is unitary or not. imp : bool Indicates whether the incorporation of volume weights in multiplications is already implemented in the `multiply` instance methods or not. target : space The space wherein the operator output lives sigma : float The standard deviation of the Gaussian kernel. Zero indicates no convolution. mask : {scalar, ndarray} Masking values for arguments assign : {list, ndarray} Assignments of codomain items to domain items. A list of indices/ index tuples or a one/ two-dimensional array. den : bool Whether to consider the arguments as densities or not. Mandatory for the correct incorporation of volume weights. """ def __init__(self, domain, codomain=None, sigma=0, mask=1, assign=None, den=False, target=None, cotarget=None): """ Sets the standard properties and `density`, `sigma`, `mask` and `assignment(s)`. Parameters ---------- domain : space The space wherein valid arguments live. sigma : float, *optional* The standard deviation of the Gaussian kernel. Zero indicates no convolution. (default: 0) mask : {scalar, ndarray}, *optional* Masking values for arguments (default: 1) assign : {list, ndarray}, *optional* Assignments of codomain items to domain items. A list of indices/ index tuples or a one/ two-dimensional array. (default: None) den : bool, *optional* Whether to consider the arguments as densities or not. Mandatory for the correct incorporation of volume weights. (default: False) target : space, *optional* The space wherein the operator output lives (default: domain) Returns ------- None """ if not isinstance(domain, space): raise TypeError(about._errors.cstring( "ERROR: The domain must be a space instance.")) self.domain = domain if self.domain.check_codomain(codomain): self.codomain = codomain else: self.codomain = self.domain.get_codomain() self.sym = False self.uni = False self.imp = False self.den = bool(den) self.set_mask(new_mask=mask) # check sigma self.sigma = np.float(sigma) if sigma < 0: raise ValueError(about._errors.cstring("ERROR: invalid sigma.")) # check assignment(s) if assign is None: assignments = self.domain.get_dim() self.assign = None elif isinstance(assign, list): # check that the advanced indexing entries are either scalar # or all have the same size shape_list = map(np.shape, assign) shape_list.remove(()) if len(self.domain.get_shape()) == 1: if len(shape_list) == 0: assignments = len(assign) elif len(shape_list) == 1: assignments = len(assign[0]) else: raise ValueError(about._errors.cstring( "ERROR: Wrong number of indices!")) else: if len(assign) != len(self.domain.get_shape()): raise ValueError(about._errors.cstring( "ERROR: Wrong number of indices!")) elif shape_list == []: raise ValueError(about._errors.cstring( "ERROR: Purely scalar entries in the assign list " + "are only valid for one-dimensional fields!")) elif not all([x == shape_list[0] for x in shape_list]): raise ValueError(about._errors.cstring( "ERROR: Non-scalar entries of assign list all must " + "have the same shape!")) else: assignments = np.prod(shape_list[0]) self.assign = assign else: raise ValueError(about._errors.cstring( "ERROR: assign must be None or list of arrays!")) if target is None: # set target target = point_space(assignments, dtype=self.domain.dtype, datamodel=self.domain.datamodel) else: # check target if not isinstance(target, space): raise TypeError(about._errors.cstring( "ERROR: Given target is not a nifty space")) elif not target.discrete: raise ValueError(about._errors.cstring( "ERROR: Given target must be a discrete space!")) elif len(target.get_shape()) > 1: raise ValueError(about._errors.cstring( "ERROR: Given target must be a one-dimensional space.")) elif assignments != target.get_dim(): raise ValueError(about._errors.cstring( "ERROR: dimension mismatch ( " + str(assignments) + " <> " + str(target.get_dim(split=False)) + " ).")) self.target = target if self.target.check_codomain(cotarget): self.cotarget = cotarget else: self.cotarget = self.target.get_codomain() def set_sigma(self, new_sigma): """ Sets the standard deviation of the response operator, indicating the amount of convolution. Parameters ---------- sigma : float The standard deviation of the Gaussian kernel. Zero indicates no convolution. Returns ------- None """ # check sigma if new_sigma < 0: raise ValueError(about._errors.cstring("ERROR: invalid sigma.")) self.sigma = np.float(new_sigma) def set_mask(self, new_mask): """ Sets the masking values of the response operator Parameters ---------- newmask : {scalar, ndarray} masking values for arguments Returns ------- None """ if np.isscalar(new_mask): self.mask = np.bool(new_mask) else: self.mask = self.domain.cast(new_mask, dtype=np.dtype('bool'), hermitianize=False) def _multiply(self, x, **kwargs): # smooth y = self.domain.calc_smooth(x.val, sigma=self.sigma) # mask y *= self.mask # assign and return if self.assign is not None: val = y[self.assign] else: try: val = y.flatten(inplace=True) except TypeError: val = y.flatten() return field(self.target, val=val, codomain=self.cotarget) def _adjoint_multiply(self, x, **kwargs): if self.assign is None: y = self.domain.cast(x.val) else: y = self.domain.cast(0) y[self.assign] = x.val y *= self.mask y = self.domain.calc_smooth(y, sigma=self.sigma) return field(self.domain, val=y, codomain=self.codomain) def _briefing(self, x, domain, codomain, inverse): # make sure, that the result_field of the briefing lives in the # given domain and codomain result_field = field(domain=domain, val=x, codomain=codomain, copy=False) # weight if necessary if (not self.imp) and (not domain.discrete) and (not inverse) and \ self.den: result_field = result_field.weight(power=1) return result_field def _debriefing(self, x, y, target, cotarget, inverse): # The debriefing takes care that the result field lives in the same # fourier-type domain as the input field assert(isinstance(y, field)) # weight if necessary if (not self.imp) and (not target.discrete) and \ (not self.den ^ inverse): y = y.weight(power=-1) return y # # # # > evaluates x and y after `multiply` # if y is None: # return None # else: # # inspect y # if not isinstance(y, field): # y = field(target, codomain=cotarget, val=y) # elif y.domain != target: # raise ValueError(about._errors.cstring( # "ERROR: invalid output domain.")) # # weight if ... # if (not self.imp) and (not target.discrete) and \ # (not self.den ^ inverse): # y = y.weight(power=-1) # # inspect x # if isinstance(x, field): # # repair if the originally field was living in the codomain # # of the operators domain # if self.domain == self.target == x.codomain: # y = y.transform(new_domain=x.domain) # if x.domain == y.domain and (x.codomain != y.codomain): # y.set_codomain(new_codomain=x.codomain) # return y def __repr__(self): return "" class invertible_operator(operator): """ .. __ __ __ __ __ .. /__/ / /_ /__/ / / / / .. __ __ ___ __ __ _______ _____ / _/ __ / /___ / / _______ .. / / / _ || |/ / / __ / / __/ / / / / / _ | / / / __ / .. / / / / / / | / / /____/ / / / /_ / / / /_/ / / /_ / /____/ .. /__/ /__/ /__/ |____/ \______/ /__/ \___/ /__/ \______/ \___/ \______/ operator class NIFTY subclass for invertible, self-adjoint (linear) operators The invertible operator class is an abstract class for self-adjoint or symmetric (linear) operators from which other more specific operator subclassescan be derived. Such operators inherit an automated inversion routine, namely conjugate gradient. Parameters ---------- domain : space The space wherein valid arguments live. uni : bool, *optional* Indicates whether the operator is unitary or not. (default: False) imp : bool, *optional* Indicates whether the incorporation of volume weights in multiplications is already implemented in the `multiply` instance methods or not (default: False). para : {single object, tuple/list of objects}, *optional* This is a freeform tuple/list of parameters that derivatives of the operator class can use (default: None). See Also -------- operator Notes ----- This class is not meant to be instantiated. Operator classes derived from this one only need a `_multiply` or `_inverse_multiply` instance method to perform the other. However, one of them needs to be defined. Attributes ---------- domain : space The space wherein valid arguments live. sym : bool Indicates whether the operator is self-adjoint or not. uni : bool Indicates whether the operator is unitary or not. imp : bool Indicates whether the incorporation of volume weights in multiplications is already implemented in the `multiply` instance methods or not. target : space The space wherein the operator output lives. para : {single object, list of objects} This is a freeform tuple/list of parameters that derivatives of the operator class can use. Not used in the base operators. """ def __init__(self, domain, codomain=None, uni=False, imp=False): """ Sets the standard operator properties. Parameters ---------- domain : space The space wherein valid arguments live. uni : bool, *optional* Indicates whether the operator is unitary or not. (default: False) imp : bool, *optional* Indicates whether the incorporation of volume weights in multiplications is already implemented in the `multiply` instance methods or not (default: False). para : {single object, tuple/list of objects}, *optional* This is a freeform tuple/list of parameters that derivatives of the operator class can use (default: None). """ if not isinstance(domain, space): raise TypeError(about._errors.cstring("ERROR: invalid input.")) self.domain = domain if self.domain.check_codomain(codomain): self.codomain = codomain else: self.codomain = self.domain.get_codomain() self.sym = True self.uni = bool(uni) if(self.domain.discrete): self.imp = True else: self.imp = bool(imp) self.target = self.domain self.cotarget = self.codomain def _multiply(self, x, force=False, W=None, spam=None, reset=None, note=False, x0=None, tol=1E-4, clevel=1, limii=None, **kwargs): """ Applies the invertible operator to a given field by invoking a conjugate gradient. Parameters ---------- x : field Valid input field. force : bool Indicates wheter to return a field instead of ``None`` in case the conjugate gradient fails. Returns ------- OIIx : field Mapped field with suitable domain. See Also -------- conjugate_gradient Other Parameters ---------------- W : {operator, function}, *optional* Operator `W` that is a preconditioner on `A` and is applicable to a field (default: None). spam : function, *optional* Callback function which is given the current `x` and iteration counter each iteration (default: None). reset : integer, *optional* Number of iterations after which to restart; i.e., forget previous conjugated directions (default: sqrt(b.get_dim())). note : bool, *optional* Indicates whether notes are printed or not (default: False). x0 : field, *optional* Starting guess for the minimization. tol : scalar, *optional* Tolerance specifying convergence; measured by current relative residual (default: 1E-4). clevel : integer, *optional* Number of times the tolerance should be undershot before exiting (default: 1). limii : integer, *optional* Maximum number of iterations performed (default: 10 * b.get_dim()). """ x_, convergence = conjugate_gradient(self.inverse_times, x, W=W, spam=spam, reset=reset, note=note)(x0=x0, tol=tol, clevel=clevel, limii=limii) # check convergence if not convergence: if not force or x_ is None: return None about.warnings.cprint("WARNING: conjugate gradient failed.") # TODO: A weighting here shoud be wrong, as this is done by # the (de)briefing methods -> Check! # # weight if ... # if not self.imp: # continiuos domain/target # x_.weight(power=-1, overwrite=True) return x_ def _inverse_multiply(self, x, force=False, W=None, spam=None, reset=None, note=False, x0=None, tol=1E-4, clevel=1, limii=None, **kwargs): """ Applies the inverse of the invertible operator to a given field by invoking a conjugate gradient. Parameters ---------- x : field Valid input field. force : bool Indicates wheter to return a field instead of ``None`` in case the conjugate gradient fails. Returns ------- OIx : field Mapped field with suitable domain. See Also -------- conjugate_gradient Other Parameters ---------------- W : {operator, function}, *optional* Operator `W` that is a preconditioner on `A` and is applicable to a field (default: None). spam : function, *optional* Callback function which is given the current `x` and iteration counter each iteration (default: None). reset : integer, *optional* Number of iterations after which to restart; i.e., forget previous conjugated directions (default: sqrt(b.get_dim())). note : bool, *optional* Indicates whether notes are printed or not (default: False). x0 : field, *optional* Starting guess for the minimization. tol : scalar, *optional* Tolerance specifying convergence; measured by current relative residual (default: 1E-4). clevel : integer, *optional* Number of times the tolerance should be undershot before exiting (default: 1). limii : integer, *optional* Maximum number of iterations performed (default: 10 * b.get_dim()). """ x_, convergence = conjugate_gradient(self.times, x, W=W, spam=spam, reset=reset, note=note)(x0=x0, tol=tol, clevel=clevel, limii=limii) # check convergence if not convergence: if not force or x_ is None: return None about.warnings.cprint("WARNING: conjugate gradient failed.") # TODO: A weighting here shoud be wrong, as this is done by # the (de)briefing methods -> Check! # # weight if ... # if not self.imp: # continiuos domain/target # x_.weight(power=1, overwrite=True) return x_ def __repr__(self): return "" class propagator_operator(operator): """ .. __ .. / /_ .. _______ _____ ______ ______ ____ __ ____ __ ____ __ / _/ ______ _____ .. / _ / / __/ / _ | / _ | / _ / / _ / / _ / / / / _ | / __/ .. / /_/ / / / / /_/ / / /_/ / / /_/ / / /_/ / / /_/ / / /_ / /_/ / / / .. / ____/ /__/ \______/ / ____/ \______| \___ / \______| \___/ \______/ /__/ operator class .. /__/ /__/ /______/ NIFTY subclass for propagator operators (of a certain family) The propagator operators :math:`D` implemented here have an inverse formulation like :math:`(S^{-1} + M)`, :math:`(S^{-1} + N^{-1})`, or :math:`(S^{-1} + R^\dagger N^{-1} R)` as appearing in Wiener filter theory. Parameters ---------- S : operator Covariance of the signal prior. M : operator Likelihood contribution. R : operator Response operator translating signal to (noiseless) data. N : operator Covariance of the noise prior or the likelihood, respectively. See Also -------- conjugate_gradient Notes ----- The propagator will puzzle the operators `S` and `M` or `R`, `N` or only `N` together in the predefined from, a domain is set automatically. The application of the inverse is done by invoking a conjugate gradient. Note that changes to `S`, `M`, `R` or `N` auto-update the propagator. Examples -------- >>> f = field(rg_space(4), val=[2, 4, 6, 8]) >>> S = power_operator(f.target, spec=1) >>> N = diagonal_operator(f.domain, diag=1) >>> D = propagator_operator(S=S, N=N) # D^{-1} = S^{-1} + N^{-1} >>> D(f).val array([ 1., 2., 3., 4.]) Attributes ---------- domain : space A space wherein valid arguments live. codomain : space An alternative space wherein valid arguments live; commonly the codomain of the `domain` attribute. sym : bool Indicates that the operator is self-adjoint. uni : bool Indicates that the operator is not unitary. imp : bool Indicates that volume weights are implemented in the `multiply` instance methods. target : space The space wherein the operator output lives. _A1 : {operator, function} Application of :math:`S^{-1}` to a field. _A2 : {operator, function} Application of all operations not included in `A1` to a field. RN : {2-tuple of operators}, *optional* Contains `R` and `N` if given. """ def __init__(self, S=None, M=None, R=None, N=None): """ Sets the standard operator properties and `codomain`, `_A1`, `_A2`, and `RN` if required. Parameters ---------- S : operator Covariance of the signal prior. M : operator Likelihood contribution. R : operator Response operator translating signal to (noiseless) data. N : operator Covariance of the noise prior or the likelihood, respectively. """ # parse the signal prior covariance if not isinstance(S, operator): raise ValueError(about._errors.cstring( "ERROR: The given S is not an operator.")) self.S = S self.S_inverse_times = self.S.inverse_times # take signal-space domain from S as the domain for D S_is_harmonic = False if hasattr(S.domain, 'harmonic'): if S.domain.harmonic: S_is_harmonic = True if S_is_harmonic: self.domain = S.codomain self.codomain = S.domain else: self.domain = S.domain self.codomain = S.codomain self.target = self.domain self.cotarget = self.codomain # build up the likelihood contribution (self.M_times, M_domain, M_codomain, M_target, M_cotarget) = self._build_likelihood_contribution(M, R, N) # assert that S and M have matching domains if not (self.domain == M_domain and self.codomain == M_codomain and self.target == M_target and self.cotarget == M_cotarget): raise ValueError(about._errors.cstring( "ERROR: The (co)domains and (co)targets of the prior " + "signal covariance and the likelihood contribution must be " + "the same in the sense of '=='.")) self.sym = True self.uni = False self.imp = True def _build_likelihood_contribution(self, M, R, N): # if a M is given, return its times method and its domains # supplier and discard R and N if M is not None: return (M.times, M.domain, M.codomain, M.target, M.cotarget) if N is not None: if R is not None: return (lambda z: R.adjoint_times(N.inverse_times(R.times(z))), R.domain, R.codomain, R.domain, R.codomain) else: return (N.inverse_times, N.domain, N.codomain, N.target, N.cotarget) else: raise ValueError(about._errors.cstring( "ERROR: At least M or N must be given.")) def _multiply(self, x, W=None, spam=None, reset=None, note=False, x0=None, tol=1E-4, clevel=1, limii=None, **kwargs): if W is None: W = self.S (result, convergence) = conjugate_gradient(self._inverse_multiply, x, W=W, spam=spam, reset=reset, note=note)(x0=x0, tol=tol, clevel=clevel, limii=limii) # evaluate if not convergence: about.warnings.cprint("WARNING: conjugate gradient failed.") return result def _inverse_multiply(self, x, **kwargs): result = self.S_inverse_times(x) result += self.M_times(x) return result class propagator_operator_old(operator): """ .. __ .. / /_ .. _______ _____ ______ ______ ____ __ ____ __ ____ __ / _/ ______ _____ .. / _ / / __/ / _ | / _ | / _ / / _ / / _ / / / / _ | / __/ .. / /_/ / / / / /_/ / / /_/ / / /_/ / / /_/ / / /_/ / / /_ / /_/ / / / .. / ____/ /__/ \______/ / ____/ \______| \___ / \______| \___/ \______/ /__/ operator class .. /__/ /__/ /______/ NIFTY subclass for propagator operators (of a certain family) The propagator operators :math:`D` implemented here have an inverse formulation like :math:`(S^{-1} + M)`, :math:`(S^{-1} + N^{-1})`, or :math:`(S^{-1} + R^\dagger N^{-1} R)` as appearing in Wiener filter theory. Parameters ---------- S : operator Covariance of the signal prior. M : operator Likelihood contribution. R : operator Response operator translating signal to (noiseless) data. N : operator Covariance of the noise prior or the likelihood, respectively. See Also -------- conjugate_gradient Notes ----- The propagator will puzzle the operators `S` and `M` or `R`, `N` or only `N` together in the predefined from, a domain is set automatically. The application of the inverse is done by invoking a conjugate gradient. Note that changes to `S`, `M`, `R` or `N` auto-update the propagator. Examples -------- >>> f = field(rg_space(4), val=[2, 4, 6, 8]) >>> S = power_operator(f.target, spec=1) >>> N = diagonal_operator(f.domain, diag=1) >>> D = propagator_operator(S=S, N=N) # D^{-1} = S^{-1} + N^{-1} >>> D(f).val array([ 1., 2., 3., 4.]) Attributes ---------- domain : space A space wherein valid arguments live. codomain : space An alternative space wherein valid arguments live; commonly the codomain of the `domain` attribute. sym : bool Indicates that the operator is self-adjoint. uni : bool Indicates that the operator is not unitary. imp : bool Indicates that volume weights are implemented in the `multiply` instance methods. target : space The space wherein the operator output lives. _A1 : {operator, function} Application of :math:`S^{-1}` to a field. _A2 : {operator, function} Application of all operations not included in `A1` to a field. RN : {2-tuple of operators}, *optional* Contains `R` and `N` if given. """ def __init__(self, S=None, M=None, R=None, N=None): """ Sets the standard operator properties and `codomain`, `_A1`, `_A2`, and `RN` if required. Parameters ---------- S : operator Covariance of the signal prior. M : operator Likelihood contribution. R : operator Response operator translating signal to (noiseless) data. N : operator Covariance of the noise prior or the likelihood, respectively. """ # check signal prior covariance if(S is None): raise Exception(about._errors.cstring( "ERROR: insufficient input.")) elif(not isinstance(S, operator)): raise ValueError(about._errors.cstring("ERROR: invalid input.")) space1 = S.domain # check likelihood (pseudo) covariance if(M is None): if(N is None): raise Exception(about._errors.cstring( "ERROR: insufficient input.")) elif(not isinstance(N, operator)): raise ValueError(about._errors.cstring( "ERROR: invalid input.")) if(R is None): space2 = N.domain elif(not isinstance(R, operator)): raise ValueError(about._errors.cstring( "ERROR: invalid input.")) else: space2 = R.domain elif(not isinstance(M, operator)): raise ValueError(about._errors.cstring("ERROR: invalid input.")) else: space2 = M.domain # set spaces self.domain = space2 if(self.domain.check_codomain(space1)): self.codomain = space1 else: raise ValueError(about._errors.cstring("ERROR: invalid input.")) self.target = self.domain self.cotarget = self.codomain # define A1 == S_inverse self.S = S if(isinstance(S, diagonal_operator)): self._A1 = S._inverse_multiply # S.imp == True else: self._A1 = S.inverse_times # define A2 == M == R_adjoint N_inverse R == N_inverse if(M is None): if(R is not None): self.RN = (R, N) if(isinstance(N, diagonal_operator)): self._A2 = self._standard_M_times_1 else: self._A2 = self._standard_M_times_2 elif(isinstance(N, diagonal_operator)): self._A2 = N._inverse_multiply # N.imp == True else: self._A2 = N.inverse_times elif(isinstance(M, diagonal_operator)): self._A2 = M._multiply # M.imp == True else: self._A2 = M.times self.sym = True self.uni = False self.imp = True # applies > R_adjoint N_inverse R assuming N is diagonal def _standard_M_times_1(self, x, **kwargs): # N.imp = True return self.RN[0].adjoint_times(self.RN[1]._inverse_multiply(self.RN[0].times(x))) def _standard_M_times_2(self, x, **kwargs): # applies > R_adjoint N_inverse R return self.RN[0].adjoint_times(self.RN[1].inverse_times(self.RN[0].times(x))) # > applies A1 + A2 in self.codomain def _inverse_multiply_1(self, x, **kwargs): return self._A1(x, pseudo=True) + self._A2(x.transform(self.domain)).transform(self.codomain) def _inverse_multiply_2(self, x, **kwargs): # > applies A1 + A2 in self.domain transformed_x = x.transform(self.codomain) aed_x = self._A1(transformed_x, pseudo=True) #print (vars(aed_x),) transformed_aed = aed_x.transform(self.domain) #print (vars(transformed_aed),) temp_to_add = self._A2(x) added = transformed_aed + temp_to_add return added # return # self._A1(x.transform(self.codomain),pseudo=True).transform(self.domain)+self._A2(x) def _briefing(self, x): # > prepares x for `multiply` # inspect x if not isinstance(x, field): return (field(self.domain, codomain=self.codomain, val=x), False) # check x.domain elif x.domain == self.domain: return (x, False) elif x.domain == self.codomain: return (x, True) # transform else: return (x.transform(new_domain=self.codomain, overwrite=False), True) def _debriefing(self, x, x_, in_codomain): # > evaluates x and x_ after `multiply` if x_ is None: return None # inspect x elif isinstance(x, field): # repair ... if in_codomain == True and x.domain != self.codomain: x_ = x_.transform(new_domain=x.domain) # ... domain if x_.codomain != x.codomain: x_.set_codomain(new_codomain=x.codomain) # ... codomain return x_ def times(self, x, W=None, spam=None, reset=None, note=False, x0=None, tol=1E-4, clevel=1, limii=None, **kwargs): """ Applies the propagator to a given object by invoking a conjugate gradient. Parameters ---------- x : {scalar, list, array, field} Scalars are interpreted as constant arrays, and an array will be interpreted as a field on the domain of the operator. force : bool Indicates wheter to return a field instead of ``None`` in case the conjugate gradient fails. Returns ------- Dx : field Mapped field with suitable domain. See Also -------- conjugate_gradient Other Parameters ---------------- W : {operator, function}, *optional* Operator `W` that is a preconditioner on `A` and is applicable to a field (default: None). spam : function, *optional* Callback function which is given the current `x` and iteration counter each iteration (default: None). reset : integer, *optional* Number of iterations after which to restart; i.e., forget previous conjugated directions (default: sqrt(b.get_dim())). note : bool, *optional* Indicates whether notes are printed or not (default: False). x0 : field, *optional* Starting guess for the minimization. tol : scalar, *optional* Tolerance specifying convergence; measured by current relative residual (default: 1E-4). clevel : integer, *optional* Number of times the tolerance should be undershot before exiting (default: 1). limii : integer, *optional* Maximum number of iterations performed (default: 10 * b.get_dim()). """ if W is None: W = self.S # prepare x_, in_codomain = self._briefing(x) # apply operator if(in_codomain): A = self._inverse_multiply_1 else: A = self._inverse_multiply_2 x_, convergence = conjugate_gradient(A, x_, W=W, spam=spam, reset=reset, note=note)( x0=x0, tol=tol, clevel=clevel, limii=limii) # evaluate if not convergence: about.warnings.cprint("WARNING: conjugate gradient failed.") return self._debriefing(x, x_, in_codomain) def inverse_times(self, x, **kwargs): """ Applies the inverse propagator to a given object. Parameters ---------- x : {scalar, list, array, field} Scalars are interpreted as constant arrays, and an array will be interpreted as a field on the domain of the operator. Returns ------- DIx : field Mapped field with suitable domain. """ # prepare x_, in_codomain = self._briefing(x) # apply operator if(in_codomain): x_ = self._inverse_multiply_1(x_) else: x_ = self._inverse_multiply_2(x_) # evaluate return self._debriefing(x, x_, in_codomain) def __repr__(self): return ""