# This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # # Copyright(C) 2013-2019 Max-Planck-Society # # NIFTy is being developed at the Max-Planck-Institut fuer Astrophysik. import numpy as np from .. import utilities from ..domain_tuple import DomainTuple from ..field import Field from ..linearization import Linearization from ..sugar import makeDomain, makeOp from .linear_operator import LinearOperator from .operator import Operator from .sampling_enabler import SamplingEnabler from .sandwich_operator import SandwichOperator from .simple_linear_operators import VdotOperator class EnergyOperator(Operator): """Operator which has a scalar domain as target domain. It is intended as an objective function for field inference. Examples -------- - Information Hamiltonian, i.e. negative-log-probabilities. - Gibbs free energy, i.e. an averaged Hamiltonian, aka Kullbach-Leibler divergence. """ _target = DomainTuple.scalar_domain() class SquaredNormOperator(EnergyOperator): """Computes the L2-norm of the output of an operator. Parameters ---------- domain : Domain, DomainTuple or tuple of Domain Target domain of the operator in which the L2-norm shall be computed. """ def __init__(self, domain): self._domain = domain def apply(self, x): self._check_input(x) if isinstance(x, Linearization): val = Field.scalar(x.val.vdot(x.val)) jac = VdotOperator(2*x.val)(x.jac) return x.new(val, jac) return Field.scalar(x.vdot(x)) class QuadraticFormOperator(EnergyOperator): """Computes the L2-norm of a Field or MultiField with respect to a specific metric endo. .. math :: E(f) = \\frac12 f^\\dagger \\text{endo}(f) Parameters ---------- endo : EndomorphicOperator Kernel of quadratic form. """ def __init__(self, endo): from .endomorphic_operator import EndomorphicOperator if not isinstance(endo, EndomorphicOperator): raise TypeError("op must be an EndomorphicOperator") self._op = endo self._domain = endo.domain def apply(self, x): self._check_input(x) if isinstance(x, Linearization): t1 = self._op(x.val) jac = VdotOperator(t1)(x.jac) val = Field.scalar(0.5*x.val.vdot(t1)) return x.new(val, jac) return Field.scalar(0.5*x.vdot(self._op(x))) class GaussianEnergy(EnergyOperator): """Class for energies of fields with Gaussian probability distribution. Represents up to constants in :math:m: .. math :: E(f) = - \\log G(f-m, D) = 0.5 (f-m)^\\dagger D^{-1} (f-m), an information energy for a Gaussian distribution with mean m and covariance D. Parameters ---------- mean : Field Mean of the Gaussian. Default is 0. covariance : LinearOperator Covariance of the Gaussian. Default is the identity operator. domain : Domain, DomainTuple of tuple of Domain Operator domain. By default it is inferred from mean or covariance if specified Note ---- At least one of the arguments has to be provided. """ def __init__(self, mean=None, covariance=None, domain=None): if mean is not None and not isinstance(mean, Field): raise TypeError if covariance is not None and not isinstance(covariance, LinearOperator): raise TypeError if domain is not None: domain = DomainTuple.make(domain) self._domain = None if mean is not None: self._checkEquivalence(mean.domain) if covariance is not None: self._checkEquivalence(covariance.domain) if domain is not None: self._checkEquivalence(domain) if self._domain is None: raise ValueError("no domain given") self._mean = mean if covariance is None: self._op = SquaredNormOperator(self._domain).scale(0.5) else: self._op = QuadraticFormOperator(covariance.inverse) self._icov = None if covariance is None else covariance.inverse def _checkEquivalence(self, newdom): newdom = makeDomain(newdom) if self._domain is None: self._domain = newdom else: if self._domain != newdom: raise ValueError("domain mismatch") def apply(self, x): self._check_input(x) residual = x if self._mean is None else x - self._mean res = self._op(residual).real if not isinstance(x, Linearization) or not x.want_metric: return res metric = SandwichOperator.make(x.jac, self._icov) return res.add_metric(metric) class PoissonianEnergy(EnergyOperator): """Class for likelihood Hamiltonians of expected count field constrained by Poissonian count data. Represents up to an f-independent term :math:log(d!): .. math :: E(f) = -\\log \\text{Poisson}(d|f) = \\sum f - d^\\dagger \\log(f), where f is a :class:Field in data space with the expectation values for the counts. Parameters ---------- d : Field Data field with counts. Needs to have integer dtype and all field values need to be non-negative. """ def __init__(self, d): if not isinstance(d, Field) or not np.issubdtype(d.dtype, np.integer): raise TypeError if np.any(d.local_data < 0): raise ValueError self._d = d self._domain = DomainTuple.make(d.domain) def apply(self, x): self._check_input(x) res = x.sum() - x.log().vdot(self._d) if not isinstance(x, Linearization): return Field.scalar(res) if not x.want_metric: return res metric = SandwichOperator.make(x.jac, makeOp(1./x.val)) return res.add_metric(metric) class InverseGammaLikelihood(EnergyOperator): """Special class for inverse Gamma distributed covariances. RL FIXME: To be documented. """ def __init__(self, d): if not isinstance(d, Field): raise TypeError self._d = d self._domain = DomainTuple.make(d.domain) def apply(self, x): self._check_input(x) res = 0.5*(x.log().sum() + (1./x).vdot(self._d)) if not isinstance(x, Linearization): return Field.scalar(res) if not x.want_metric: return res metric = SandwichOperator.make(x.jac, makeOp(0.5/(x.val**2))) return res.add_metric(metric) class BernoulliEnergy(EnergyOperator): """Class for likelihood-energies of expected event frequency constrained by event data. Parameters ---------- d : Field data field with events (=1) or non-events (=0) Notes ----- E = BernoulliEnergy(d) represents :math:E(f) = -\\log \\text{Bernoulli}(d|f) = -d^\\dagger \\log f - (1-d)^\\dagger \\log(1-f), where f is a field in data space (d.domain) with the expected frequencies of events. """ def __init__(self, d): self._d = d self._domain = DomainTuple.make(d.domain) def apply(self, x): self._check_input(x) v = x.log().vdot(-self._d) - (1. - x).log().vdot(1. - self._d) if not isinstance(x, Linearization): return Field.scalar(v) if not x.want_metric: return v met = makeOp(1./(x.val*(1. - x.val))) met = SandwichOperator.make(x.jac, met) return v.add_metric(met) class Hamiltonian(EnergyOperator): """Class for information Hamiltonians. Parameters ---------- lh : EnergyOperator a likelihood energy ic_samp : IterationController is passed to SamplingEnabler to draw Gaussian distributed samples with covariance = metric of Hamiltonian (= Hessian without terms that generate negative eigenvalues) Notes ----- H = Hamiltonian(E_lh) represents :math:H(f) = 0.5 f^\\dagger f + E_{lh}(f) an information Hamiltonian for a field f with a white Gaussian prior (unit covariance) and the likelihood energy :math:E_{lh}. Other field priors can be represented via transformations of a white Gaussian field into a field with the desired prior probability structure. By implementing prior information this way, the field prior is represented by a generative model, from which NIFTy can draw samples and infer a field using the Maximum a Posteriori (MAP) or the Variational Bayes (VB) method. For more details see: "Encoding prior knowledge in the structure of the likelihood" Jakob Knollmüller, Torsten A. Ensslin, submitted, arXiv:1812.04403 _ """ def __init__(self, lh, ic_samp=None): self._lh = lh self._prior = GaussianEnergy(domain=lh.domain) self._ic_samp = ic_samp self._domain = lh.domain def apply(self, x): self._check_input(x) if (self._ic_samp is None or not isinstance(x, Linearization) or not x.want_metric): return self._lh(x) + self._prior(x) else: lhx, prx = self._lh(x), self._prior(x) mtr = SamplingEnabler(lhx.metric, prx.metric.inverse, self._ic_samp, prx.metric.inverse) return (lhx + prx).add_metric(mtr) def __repr__(self): subs = 'Likelihood:\n{}'.format(utilities.indent(self._lh.__repr__())) subs += '\nPrior: Quadratic{}'.format(self._lh.domain.keys()) return 'Hamiltonian:\n' + utilities.indent(subs) class AveragedEnergy(EnergyOperator): """Class for Kullbach-Leibler (KL) Divergence or Gibbs free energies Precisely a sample averaged Hamiltonian (or other energy) that represents approximatively the relevant part of a KL to be used in Variational Bayes inference if the samples are drawn from the approximating Gaussian. Let :math:Q(f) = G(f-m,D) Gaussian used to approximate :math:P(f|d), the correct posterior with information Hamiltonian :math:H(d,f) = -\\log P(d,f) = -\\log P(f|d) + \\text{const.} The KL divergence between those should then be optimized for m. It is :math:KL(Q,P) = \\int Df Q(f) \\log Q(f)/P(f)\\\\ = \\left< \\log Q(f) \\right>_Q(f) - \\left< \\log P(f) \\right>_Q(f)\\\\ = \\text{const} + \\left< H(f) \\right>_G(f-m,D) in essence the information Hamiltonian averaged over a Gaussian distribution centered on the mean m. AveragedEnergy(H) approximates :math:\\left< H(f) \\right>_{G(f-m,D)} if the residuals :math:f-m are drawn from covariance :math:D. Parameters ---------- h: Hamiltonian the Hamiltonian/energy to be averaged res_samples : iterable of Fields set of residual sample points to be added to mean field for approximate estimation of the KL Notes ----- KL = AveragedEnergy(H, samples) represents :math:\\text{KL}(m) = \\sum_i H(m+v_i) / N, where :math:v_i are the residual samples, :math:N is their number, and :math:m is the mean field around which the samples are drawn. Having symmetrized residual samples, with both v_i and -v_i being present ensures that the distribution mean is exactly represented. This reduces sampling noise and helps the numerics of the KL minimization process in the variational Bayes inference. """ def __init__(self, h, res_samples): self._h = h self._domain = h.domain self._res_samples = tuple(res_samples) def apply(self, x): self._check_input(x) mymap = map(lambda v: self._h(x + v), self._res_samples) return utilities.my_sum(mymap)*(1./len(self._res_samples))