# This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # # Copyright(C) 2013-2019 Max-Planck-Society # # NIFTy is being developed at the Max-Planck-Institut fuer Astrophysik. import numpy as np from .. import utilities from ..domain_tuple import DomainTuple from ..field import Field from ..linearization import Linearization from ..sugar import makeDomain, makeOp from .linear_operator import LinearOperator from .operator import Operator from .sampling_enabler import SamplingEnabler from .sandwich_operator import SandwichOperator from .simple_linear_operators import VdotOperator class EnergyOperator(Operator): """Operator which has a scalar domain as target domain. It is intended as an objective function for field inference. Examples -------- - Information Hamiltonian, i.e. negative-log-probabilities. - Gibbs free energy, i.e. an averaged Hamiltonian, aka Kullbach-Leibler divergence. """ _target = DomainTuple.scalar_domain() class SquaredNormOperator(EnergyOperator): """Computes the L2-norm of the output of an operator. Parameters ---------- domain : Domain, DomainTuple or tuple of Domain Target domain of the operator in which the L2-norm shall be computed. """ def __init__(self, domain): self._domain = domain def apply(self, x): self._check_input(x) if isinstance(x, Linearization): val = Field.scalar(x.val.vdot(x.val)) jac = VdotOperator(2*x.val)(x.jac) return x.new(val, jac) return Field.scalar(x.vdot(x)) class QuadraticFormOperator(EnergyOperator): """Computes the L2-norm of a Field or MultiField with respect to a specific metric `endo`. .. math :: E(f) = \\frac12 f^\\dagger \\text{endo}(f) Parameters ---------- endo : EndomorphicOperator Kernel of quadratic form. """ def __init__(self, endo): from .endomorphic_operator import EndomorphicOperator if not isinstance(endo, EndomorphicOperator): raise TypeError("op must be an EndomorphicOperator") self._op = endo self._domain = endo.domain def apply(self, x): self._check_input(x) if isinstance(x, Linearization): t1 = self._op(x.val) jac = VdotOperator(t1)(x.jac) val = Field.scalar(0.5*x.val.vdot(t1)) return x.new(val, jac) return Field.scalar(0.5*x.vdot(self._op(x))) class GaussianEnergy(EnergyOperator): """Class for energies of fields with Gaussian probability distribution. Represents up to constants in :math:`m`: .. math :: E(f) = - \\log G(f-m, D) = 0.5 (f-m)^\\dagger D^{-1} (f-m), an information energy for a Gaussian distribution with mean m and covariance D. Parameters ---------- mean : Field Mean of the Gaussian. Default is 0. covariance : LinearOperator Covariance of the Gaussian. Default is the identity operator. domain : Domain, DomainTuple, tuple of Domain or MultiDomain Operator domain. By default it is inferred from `mean` or `covariance` if specified Note ---- At least one of the arguments has to be provided. """ def __init__(self, mean=None, covariance=None, domain=None): if mean is not None and not isinstance(mean, Field): raise TypeError if covariance is not None and not isinstance(covariance, LinearOperator): raise TypeError self._domain = None if mean is not None: self._checkEquivalence(mean.domain) if covariance is not None: self._checkEquivalence(covariance.domain) if domain is not None: self._checkEquivalence(domain) if self._domain is None: raise ValueError("no domain given") self._mean = mean if covariance is None: self._op = SquaredNormOperator(self._domain).scale(0.5) else: self._op = QuadraticFormOperator(covariance.inverse) self._icov = None if covariance is None else covariance.inverse def _checkEquivalence(self, newdom): newdom = makeDomain(newdom) if self._domain is None: self._domain = newdom else: if self._domain != newdom: raise ValueError("domain mismatch") def apply(self, x): self._check_input(x) residual = x if self._mean is None else x - self._mean res = self._op(residual).real if not isinstance(x, Linearization) or not x.want_metric: return res metric = SandwichOperator.make(x.jac, self._icov) return res.add_metric(metric) class PoissonianEnergy(EnergyOperator): """Class for likelihood Hamiltonians of expected count field constrained by Poissonian count data. Represents up to an f-independent term :math:`log(d!)`: .. math :: E(f) = -\\log \\text{Poisson}(d|f) = \\sum f - d^\\dagger \\log(f), where f is a :class:`Field` in data space with the expectation values for the counts. Parameters ---------- d : Field Data field with counts. Needs to have integer dtype and all field values need to be non-negative. """ def __init__(self, d): if not isinstance(d, Field) or not np.issubdtype(d.dtype, np.integer): raise TypeError if np.any(d.local_data < 0): raise ValueError self._d = d self._domain = DomainTuple.make(d.domain) def apply(self, x): self._check_input(x) res = x.sum() - x.log().vdot(self._d) if not isinstance(x, Linearization): return Field.scalar(res) if not x.want_metric: return res metric = SandwichOperator.make(x.jac, makeOp(1./x.val)) return res.add_metric(metric) class InverseGammaLikelihood(EnergyOperator): """ FIXME """ def __init__(self, d): if not isinstance(d, Field): raise TypeError self._d = d self._domain = DomainTuple.make(d.domain) def apply(self, x): self._check_input(x) res = 0.5*(x.log().sum() + (1./x).vdot(self._d)) if not isinstance(x, Linearization): return Field.scalar(res) if not x.want_metric: return res metric = SandwichOperator.make(x.jac, makeOp(0.5/(x.val**2))) return res.add_metric(metric) class BernoulliEnergy(EnergyOperator): """Computes likelihood energy of expected event frequency constrained by event data. .. math :: E(f) = -\\log \\text{Bernoulli}(d|f) = -d^\\dagger \\log f - (1-d)^\\dagger \\log(1-f), where f is a field defined on `d.domain` with the expected frequencies of events. Parameters ---------- d : Field Data field with events (1) or non-events (0). """ def __init__(self, d): print(d.dtype) if not isinstance(d, Field) or not np.issubdtype(d.dtype, np.integer): raise TypeError if not np.all(np.logical_or(d.local_data == 0, d.local_data == 1)): raise ValueError self._d = d self._domain = DomainTuple.make(d.domain) def apply(self, x): self._check_input(x) v = -(x.log().vdot(self._d) + (1. - x).log().vdot(1. - self._d)) if not isinstance(x, Linearization): return Field.scalar(v) if not x.want_metric: return v met = makeOp(1./(x.val*(1. - x.val))) met = SandwichOperator.make(x.jac, met) return v.add_metric(met) class Hamiltonian(EnergyOperator): """Computes an information Hamiltonian in its standard form, i.e. with the prior being a Gaussian with unit covariance. Let the likelihood energy be :math:`E_{lh}`. Then this operator computes: .. math :: H(f) = 0.5 f^\\dagger f + E_{lh}(f): Other field priors can be represented via transformations of a white Gaussian field into a field with the desired prior probability structure. By implementing prior information this way, the field prior is represented by a generative model, from which NIFTy can draw samples and infer a field using the Maximum a Posteriori (MAP) or the Variational Bayes (VB) method. The metric of this operator can be used as covariance for drawing Gaussian samples. Parameters ---------- lh : EnergyOperator The likelihood energy. ic_samp : IterationController Tells an internal :class:`SamplingEnabler` which convergence criterium to use to draw Gaussian samples. See also -------- `Encoding prior knowledge in the structure of the likelihood`, Jakob Knollmüller, Torsten A. Ensslin, ``_ """ def __init__(self, lh, ic_samp=None): self._lh = lh self._prior = GaussianEnergy(domain=lh.domain) self._ic_samp = ic_samp self._domain = lh.domain def apply(self, x): self._check_input(x) if (self._ic_samp is None or not isinstance(x, Linearization) or not x.want_metric): return self._lh(x) + self._prior(x) else: lhx, prx = self._lh(x), self._prior(x) mtr = SamplingEnabler(lhx.metric, prx.metric.inverse, self._ic_samp, prx.metric.inverse) return (lhx + prx).add_metric(mtr) def __repr__(self): subs = 'Likelihood:\n{}'.format(utilities.indent(self._lh.__repr__())) subs += '\nPrior: Quadratic{}'.format(self._lh.domain.keys()) return 'Hamiltonian:\n' + utilities.indent(subs) class AveragedEnergy(EnergyOperator): """Computes Kullback-Leibler (KL) divergence or Gibbs free energies. A sample-averaged energy, e.g. an Hamiltonian, approximates the relevant part of a KL to be used in Variational Bayes inference if the samples are drawn from the approximating Gaussian: .. math :: \\text{KL}(m) = \\frac1{\\#\{v_i\}} \\sum_{v_i} H(m+v_i), where :math:`v_i` are the residual samples and :math:`m` is the mean field around which the samples are drawn. Parameters ---------- h: Hamiltonian The energy to be averaged. res_samples : iterable of Fields Set of residual sample points to be added to mean field for approximate estimation of the KL. Note ---- Having symmetrized residual samples, with both v_i and -v_i being present ensures that the distribution mean is exactly represented. This reduces sampling noise and helps the numerics of the KL minimization process in the variational Bayes inference. See also -------- Let :math:`Q(f) = G(f-m,D)` be the Gaussian distribution which is used to approximate the accurate posterior :math:`P(f|d)` with information Hamiltonian :math:`H(d,f) = -\\log P(d,f) = -\\log P(f|d) + \\text{const}`. In Variational Bayes one needs to optimize the KL divergence between those two distributions for m. It is: :math:`KL(Q,P) = \\int Df Q(f) \\log Q(f)/P(f)\\\\ = \\left< \\log Q(f) \\right>_Q(f) - \\left< \\log P(f) \\right>_Q(f)\\\\ = \\text{const} + \\left< H(f) \\right>_G(f-m,D)` in essence the information Hamiltonian averaged over a Gaussian distribution centered on the mean m. :class:`AveragedEnergy(h)` approximates :math:`\\left< H(f) \\right>_{G(f-m,D)}` if the residuals :math:`f-m` are drawn from a Gaussian distribution with covariance :math:`D`. """ def __init__(self, h, res_samples): self._h = h self._domain = h.domain self._res_samples = tuple(res_samples) def apply(self, x): self._check_input(x) mymap = map(lambda v: self._h(x + v), self._res_samples) return utilities.my_sum(mymap)*(1./len(self._res_samples))