"This program is free software: you can redistribute it and/or modify\n",
"it under the terms of the GNU General Public License as published by\n",
"the Free Software Foundation, either version 3 of the License, or\n",
"(at your option) any later version.\n",
"\n",
"This program is distributed in the hope that it will be useful,\n",
"but WITHOUT ANY WARRANTY; without even the implied warranty of\n",
"MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the\n",
"GNU General Public License for more details.\n",
"\n",
"You should have received a copy of the GNU General Public License\n",
"along with this program. If not, see <http://www.gnu.org/licenses/>.\n",
"\n",
"Copyright(C) 2013-2022 Max-Planck-Society\n",
"\n",
"NIFTy is being developed at the Max-Planck-Institut fuer Astrophysik."
]
},
{
"cell_type": "markdown",
"id": "40ff38e3",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"# Code example: Wiener filter"
]
},
{
"cell_type": "markdown",
"id": "751fb241",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
"## Introduction to Information Field Theory(IFT)\n",
"We start with the measurement equation\n",
"$$d_i = (Rs)_i+n_i$$\n",
"Here, $s$ is a continuous field, $d$ a discrete data vector, $n$ is the discrete noise on each data point and $R$ is the instrument response.\n",
"In most cases, $R$ is not invertible.\n",
"IFT aims at **inverting** the above uninvertible problem in the **best possible way** using Bayesian statistics.\n",
"\n",
"NIFTy (Numerical Information Field Theory) is a Python framework in which IFT problems can be tackled easily.\n",
"\n",
"Its main interfaces are:\n",
"\n",
"- **Spaces**: Cartesian, 2-Spheres (Healpix, Gauss-Legendre) and their respective harmonic spaces.\n",
"- **Fields**: Defined on spaces.\n",
"- **Operators**: Acting on fields.\n"
]
},
{
"cell_type": "markdown",
"id": "1c53d3d5",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
"## Wiener filter on 1D- fields in IFT\n",
"\n",
"### Assumptions\n",
"- We consider a linear response R in the measurement equation $d=Rs+n$.\n",
"- We assume the **signal** and the **noise** prior to be **Gaussian** $\\mathcal P (s) = \\mathcal G (s,S)$, $\\mathcal P (n) = \\mathcal G (n,N)$\n",
"- Here $S, N$ are signal and noise covariances. Therefore they are positive definite matrices.\n",
"\n",
"### Wiener filter solution\n",
"- Making use of Bayes' theorem, the posterior is proportional to the joint probability and is given by:\n",
"\n",
"$$\\mathcal P (s|d) \\propto P(s,d) = \\mathcal G(d-Rs,N) \\,\\mathcal G(s,S) \\propto \\mathcal G (s-m,D)$$\n",
"\n",
"- Here, $m$ is the posterior mean , $D$ is the information propagator and are defined as follows:\n",
"- There, $j$ is the information source defined as $$ j = R^\\dagger N^{-1} d.$$\n",
"\n",
"Let us implement this in **NIFTy!** So let's import all the packages we need."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "73573037",
"metadata": {},
"outputs": [],
"source": [
"%matplotlib inline\n",
"import numpy as np\n",
"import nifty8 as ift\n",
"import matplotlib.pyplot as plt\n",
"plt.rcParams['figure.dpi'] = 100\n",
"plt.style.use(\"seaborn-notebook\")"
]
},
{
"cell_type": "markdown",
"id": "1b447738",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
"### Implementation in NIFTy\n",
"\n",
"We assume statistical **homogeneity** and **isotropy**, so the signal covariance $S$ is **translation invariant** and only depends on the **absolute value** of the distance.\n",
"According to Wiener-Khinchin theorem, the signal covariance $S$ is diagonal in harmonic space, $$S_{kk^{\\prime}} = 2 \\pi \\delta(k-k^{\\prime}) P(k)= \\text{diag}(S) \\equiv \\widehat{S_k}$$ and is described by a one-dimensional power spectrum.\n",
"We assume the power spectrum to follow a power-law, $$P(k) = P_0\\,\\left(1+\\left(\\frac{k}{k_0}\\right)^2\\right)^{-\\gamma /2},$$ with $P_0 = 2 \\cdot 10^4, \\ k_0 = 5, \\ \\gamma = 4$, thus the reconstruction starts in harmonic space."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c3a935c1",
"metadata": {
"slideshow": {
"slide_type": "-"
}
},
"outputs": [],
"source": [
"def pow_spec(k):\n",
" P0, k0, gamma = [2e4, 5, 4]\n",
" return P0 / ((1. + (k/k0)**2)**(gamma / 2))"
]
},
{
"cell_type": "markdown",
"id": "5ce7c229",
"metadata": {},
"source": [
"### Spaces and harmonic transformations\n",
"- We define our non-harmonic signal space to be Cartesian with $N_{pix} = 512$ being the number of grid cells.\n",
"- To connect harmonic and non-harmonic spaces we introduce the Hartley transform $H$ that is closely related to the Fourier transform but maps $\\mathbb{R}\\rightarrow\\mathbb{R}$.\n",
"- The covariance S in non-harmonic space is given by $$S = H^{\\dagger}\\widehat{S_k} H \\ .$$"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f754e1b4",
"metadata": {},
"outputs": [],
"source": [
"# Signal space is a regular Cartesian grid space\n",
"N_pix = 512\n",
"s_space = ift.RGSpace(N_pix)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6c25ac41",
"metadata": {},
"outputs": [],
"source": [
"# k_space is the harmonic conjugate space of s_space\n",
"- In order to demonstrate the Wiener filter, we use **synthetic data**. Therefore, we draw a sample $\\tilde{s}$ from $S$. (see Sampling)\n",
"- For simplicity we define the response operator as a unit matrix, $R = \\mathbb{1}$.\n",
"- We assume the noise covariance to be uncorrelated and constant, $N = 0.2 \\cdot \\mathbb{1}$ and draw a sample $\\tilde{n}$.\n",
"- Thus the synthetic data $d = R(\\tilde{s}) + \\tilde{n}$."
]
},
{
"cell_type": "markdown",
"id": "387bd2c5",
"metadata": {},
"source": [
"### Sampling\n",
"\n",
"- Assuming we have a distribution $\\mathcal{G}(b,B)$ we can sample from and we want to draw a sample from a distritbution $\\mathcal{G}(c,C)$ with covariance $C$. The two distributions are connected via the relation $C = ABA^{\\dagger}.$ One can show that $c= Ab$ with $b \\curvearrowleft \\mathcal{G}(b,B)$\thas a probability distribution with covariance $C$ as desired.\n",
"- This is also true for the case that $B = \\mathbb{1}$, meaning that $\\mathcal{G}(b,\\mathbb{1})$ Thus $C = AA^{\\dagger}$ .\n",
"- Note that, if $C$ is diagonal, $A$ is diagonal as well.\n",
"- It can be shown that if $C = A + B$, then $c = a + b$ with $b \\curvearrowleft \\mathcal{G}(b,B)$ and $a \\curvearrowleft \\mathcal{G}(a,A)$ has a probability distribution with covariance $C$ as desired.\n",
"- If we can draw samples from $\\mathcal{G}(a,A)$, and we want to draw a sample with the covariance $A^{-1}$, one can simply show that $c = A^{-1}a$ has a a probability distribution with covariance $A^{-1}$.\n",
"# Enable .inverse to invert D via Conjugate Gradient.\n",
"D_inv = ift.InversionEnabler(D_inv, ic)\n",
"D = D_inv.inverse"
]
},
{
"cell_type": "markdown",
"id": "c2ab51d4",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
"### Apply Wiener Filter\n",
"\n",
"After defining the information source and propagator, we are able to apply the Wiener filter in order to get the posterior mean $m = \\langle s \\rangle_{\\mathcal{P}(s|d)}$ that is our reconstruction of the signal:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8e7616c8",
"metadata": {
"slideshow": {
"slide_type": "-"
}
},
"outputs": [],
"source": [
"m = D(j)"
]
},
{
"cell_type": "markdown",
"id": "60b7922d",
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"source": [
"### Results"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3d4fdd46",
"metadata": {
"slideshow": {
"slide_type": "-"
}
},
"outputs": [],
"source": [
"# `.val` retrieves the underlying numpy array from a NIFTy Field.\n",
"Now we consider a case that the data is not complete.\n",
"This might be the case in real situations as the instrument might not be able to receive data.\n",
"In order to apply the Wiener filter to this case, we first need to build the response corresponding to the incomplete measurement in NIFTy!"
]
},
{
"cell_type": "markdown",
"id": "dae020f7",
"metadata": {},
"source": [
"### Incomplete Measuring / Masking\n",
"We need to build mask operator which cuts out all the unobserved parts of the signal.\n",
"Lets assume that we first observe the signal for some time, but then something goes wrong with our instrument and we don't collect data for a while.\n",
"After fixing the instrument we can collect data again.\n",
"This means that data lives on an unstructured domain as the there is data missing for the period of time $t_{\\text{off}}$ when the instrument was offline.\n",
"In order to implement this incomplete measurement we need to define a new response operator $R$ which masks the signal for the time $t_{\\text{off}}$.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "14726428",
"metadata": {
"slideshow": {
"slide_type": "-"
}
},
"outputs": [],
"source": [
"# Whole observation time\n",
"npix = s_space.size\n",
"# Time when the instrument is turned off\n",
"l = int(npix * 0.2)\n",
"# Time when the instrument is turned on again\n",
"h = int(npix * 0.4)\n",
"# Initialise a new array for the whole time frame\n",
"mask = np.zeros(s_space.shape, bool)\n",
"# Define the mask\n",
"mask[l:h] = 1\n",
"# Turn the numpy array into a nifty field\n",
"mask = ift.makeField(s_space, mask)\n",
"# Define the response operator which masks the places where mask == 1\n",
"R = ift.MaskOperator(mask)"
]
},
{
"cell_type": "markdown",
"id": "dd3e8af1",
"metadata": {},
"source": [
"### Synthetic Data\n",
"As in the Wiener filter example with complete data, we are generating some synthetic data now by propagating the previously drawn prior sample through the incomplete measurement response and adding a noise sample."
"# Measure the signal sample with additional noise\n",
"d = R(s) + n"
]
},
{
"cell_type": "markdown",
"id": "798f5ca8",
"metadata": {},
"source": [
"### Sampling from D\n",
"Since we have an incomplete measurement we want to know how uncertain we are about our Wiener filter solution. We can easily obtain both, the mean and the standard deviation by sampling from $D$ and computing them directly from the drawn samples.\n",
"In order to enable NIFTy to sample from $D$ we need to use some helper functions."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "53b10df1",
"metadata": {
"slideshow": {
"slide_type": "skip"
}
},
"outputs": [],
"source": [
"# This implements the rule how to sample from a sum of covariances\n",
"# Number of samples to calculate the posterior standard deviation\n",
"n_samples = 200\n",
"\n",
"# Helper function that calculates the mean and the variance from a set of samples efficiently\n",
"sc = ift.StatCalculator()\n",
"for _ in range(n_samples):\n",
" # Draw a sample of G(s,D) and shifting it by m -> G(s-m,D)\n",
" sample = m + D.draw_sample()\n",
" # Add it to the StatCalculator\n",
" sc.add(sample)\n",
"# Standard deviation from samples\n",
"samples_std = sc.var.sqrt()\n",
"# Mean from samples that converges to m in the limit of infinitely many samples\n",
"samples_mean = sc.mean"
]
},
{
"cell_type": "markdown",
"id": "7c1cb854",
"metadata": {},
"source": [
"### Plots\n",
"Let us visualize the results of the Wiener filter $m$, the sampled standard deviation and mean, as well as the true signal (ground truth) and the data.\n",
"Since the data lives in data space, we first need to project it back into the signal space via $R^{\\dagger}d$."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "09c0d576",
"metadata": {
"slideshow": {
"slide_type": "skip"
}
},
"outputs": [],
"source": [
"plt.axvspan(l, h, facecolor='0.8',alpha=0.3, label=\"masked area\") # Shading the masked interval\n",
- There, $j$ is the information source defined as $$ j = R^\dagger N^{-1} d.$$
Let us implement this in **NIFTy!** So let's import all the packages we need.
%% Cell type:code id:73573037 tags:
``` python
%matplotlibinline
importnumpyasnp
importnifty8asift
importmatplotlib.pyplotasplt
plt.rcParams['figure.dpi']=100
plt.style.use("seaborn-notebook")
```
%% Cell type:markdown id:1b447738 tags:
### Implementation in NIFTy
We assume statistical **homogeneity** and **isotropy**, so the signal covariance $S$ is **translation invariant** and only depends on the **absolute value** of the distance.
According to Wiener-Khinchin theorem, the signal covariance $S$ is diagonal in harmonic space, $$S_{kk^{\prime}} = 2 \pi \delta(k-k^{\prime}) P(k)= \text{diag}(S) \equiv \widehat{S_k}$$ and is described by a one-dimensional power spectrum.
We assume the power spectrum to follow a power-law, $$P(k) = P_0\,\left(1+\left(\frac{k}{k_0}\right)^2\right)^{-\gamma /2},$$ with $P_0 = 2 \cdot 10^4, \ k_0 = 5, \ \gamma = 4$, thus the reconstruction starts in harmonic space.
%% Cell type:code id:c3a935c1 tags:
``` python
defpow_spec(k):
P0,k0,gamma=[2e4,5,4]
returnP0/((1.+(k/k0)**2)**(gamma/2))
```
%% Cell type:markdown id:5ce7c229 tags:
### Spaces and harmonic transformations
- We define our non-harmonic signal space to be Cartesian with $N_{pix} = 512$ being the number of grid cells.
- To connect harmonic and non-harmonic spaces we introduce the Hartley transform $H$ that is closely related to the Fourier transform but maps $\mathbb{R}\rightarrow\mathbb{R}$.
- The covariance S in non-harmonic space is given by $$S = H^{\dagger}\widehat{S_k} H \ .$$
%% Cell type:code id:f754e1b4 tags:
``` python
# Signal space is a regular Cartesian grid space
N_pix=512
s_space=ift.RGSpace(N_pix)
```
%% Cell type:code id:6c25ac41 tags:
``` python
# k_space is the harmonic conjugate space of s_space
# Sandwich Operator implements S = HT.adjoint @ S_k @ HT and enables NIFTy to sample from S
S=ift.SandwichOperator.make(bun=HT,cheese=S_k)
```
%% Cell type:markdown id:a84f4b80 tags:
### Synthetic Data
- In order to demonstrate the Wiener filter, we use **synthetic data**. Therefore, we draw a sample $\tilde{s}$ from $S$. (see Sampling)
- For simplicity we define the response operator as a unit matrix, $R = \mathbb{1}$.
- We assume the noise covariance to be uncorrelated and constant, $N = 0.2 \cdot \mathbb{1}$ and draw a sample $\tilde{n}$.
- Thus the synthetic data $d = R(\tilde{s}) + \tilde{n}$.
%% Cell type:markdown id:387bd2c5 tags:
### Sampling
- Assuming we have a distribution $\mathcal{G}(b,B)$ we can sample from and we want to draw a sample from a distritbution $\mathcal{G}(c,C)$ with covariance $C$. The two distributions are connected via the relation $C = ABA^{\dagger}.$ One can show that $c= Ab$ with $b \curvearrowleft \mathcal{G}(b,B)$ has a probability distribution with covariance $C$ as desired.
- This is also true for the case that $B = \mathbb{1}$, meaning that $\mathcal{G}(b,\mathbb{1})$ Thus $C = AA^{\dagger}$ .
- Note that, if $C$ is diagonal, $A$ is diagonal as well.
- It can be shown that if $C = A + B$, then $c = a + b$ with $b \curvearrowleft \mathcal{G}(b,B)$ and $a \curvearrowleft \mathcal{G}(a,A)$ has a probability distribution with covariance $C$ as desired.
- If we can draw samples from $\mathcal{G}(a,A)$, and we want to draw a sample with the covariance $A^{-1}$, one can simply show that $c = A^{-1}a$ has a a probability distribution with covariance $A^{-1}$.
# Enable .inverse to invert D via Conjugate Gradient.
D_inv=ift.InversionEnabler(D_inv,ic)
D=D_inv.inverse
```
%% Cell type:markdown id:c2ab51d4 tags:
### Apply Wiener Filter
After defining the information source and propagator, we are able to apply the Wiener filter in order to get the posterior mean $m = \langle s \rangle_{\mathcal{P}(s|d)}$ that is our reconstruction of the signal:
%% Cell type:code id:8e7616c8 tags:
``` python
m=D(j)
```
%% Cell type:markdown id:60b7922d tags:
### Results
%% Cell type:code id:3d4fdd46 tags:
``` python
# `.val` retrieves the underlying numpy array from a NIFTy Field.
Now we consider a case that the data is not complete.
This might be the case in real situations as the instrument might not be able to receive data.
In order to apply the Wiener filter to this case, we first need to build the response corresponding to the incomplete measurement in NIFTy!
%% Cell type:markdown id:dae020f7 tags:
### Incomplete Measuring / Masking
We need to build mask operator which cuts out all the unobserved parts of the signal.
Lets assume that we first observe the signal for some time, but then something goes wrong with our instrument and we don't collect data for a while.
After fixing the instrument we can collect data again.
This means that data lives on an unstructured domain as the there is data missing for the period of time $t_{\text{off}}$ when the instrument was offline.
In order to implement this incomplete measurement we need to define a new response operator $R$ which masks the signal for the time $t_{\text{off}}$.
%% Cell type:code id:14726428 tags:
``` python
# Whole observation time
npix=s_space.size
# Time when the instrument is turned off
l=int(npix*0.2)
# Time when the instrument is turned on again
h=int(npix*0.4)
# Initialise a new array for the whole time frame
mask=np.zeros(s_space.shape,bool)
# Define the mask
mask[l:h]=1
# Turn the numpy array into a nifty field
mask=ift.makeField(s_space,mask)
# Define the response operator which masks the places where mask == 1
R=ift.MaskOperator(mask)
```
%% Cell type:markdown id:dd3e8af1 tags:
### Synthetic Data
As in the Wiener filter example with complete data, we are generating some synthetic data now by propagating the previously drawn prior sample through the incomplete measurement response and adding a noise sample.
Since we have an incomplete measurement we want to know how uncertain we are about our Wiener filter solution. We can easily obtain both, the mean and the standard deviation by sampling from $D$ and computing them directly from the drawn samples.
In order to enable NIFTy to sample from $D$ we need to use some helper functions.
%% Cell type:code id:53b10df1 tags:
``` python
# This implements the rule how to sample from a sum of covariances
# Number of samples to calculate the posterior standard deviation
n_samples=200
# Helper function that calculates the mean and the variance from a set of samples efficiently
sc=ift.StatCalculator()
for_inrange(n_samples):
# Draw a sample of G(s,D) and shifting it by m -> G(s-m,D)
sample=m+D.draw_sample()
# Add it to the StatCalculator
sc.add(sample)
# Standard deviation from samples
samples_std=sc.var.sqrt()
# Mean from samples that converges to m in the limit of infinitely many samples
samples_mean=sc.mean
```
%% Cell type:markdown id:7c1cb854 tags:
### Plots
Let us visualize the results of the Wiener filter $m$, the sampled standard deviation and mean, as well as the true signal (ground truth) and the data.
Since the data lives in data space, we first need to project it back into the signal space via $R^{\dagger}d$.
%% Cell type:code id:09c0d576 tags:
``` python
plt.axvspan(l,h,facecolor='0.8',alpha=0.3,label="masked area")# Shading the masked interval