Commit c92db8ac by Martin Reinecke

### tweaks

parent 35e632b0
Pipeline #24368 passed with stage
in 24 minutes and 15 seconds
 %% Cell type:markdown id: tags: # A NIFTy demonstration %% Cell type:markdown id: tags: ## IFT: Big Picture IFT starting point: $$d = Rs+n$$ Typically, $s$ continuous field, $d$ discrete data vector. Particularily, $R$ is not invertible. Typically, $s$ is a continuous field, $d$ a discrete data vector. Particularly, $R$ is not invertible. IFT aims at **inverting** the above uninvertible problem in the **best possible way** using Bayesian statistics. ## NIFTy NIFTy (Numerical Information Field Theory, en. raffiniert) is a Python framework in which IFT problems can be tackeled easily. NIFTy (Numerical Information Field Theory) is a Python framework in which IFT problems can be tackled easily. Main Interfaces: - **Spaces**: Cartesian, 2-Spheres (Healpix, Gauss-Legendre) and their respective harmonic spaces. - **Fields**: Defined on spaces. - **Operators**: Acting on spaces. - **Operators**: Acting on fields. %% Cell type:markdown id: tags: ## Wiener Filter: Formulae ### Assumptions - $d=Rs+n$, $R$ linear operator. - $\mathcal P (s) = \mathcal G (s,S)$, $\mathcal P (n) = \mathcal G (n,N)$ where $S, N$ are positive definite matrices. ### Posterior The Posterior is given by: $$\mathcal P (s|d) \propto P(s,d) = \mathcal G(d-Rs,N) \,\mathcal G(s,S) \propto \mathcal G (m,D)$$ where \begin{align} m &= Dj \\ D^{-1}&= (S^{-1} +R^\dagger N^{-1} R )\\ j &= R^\dagger N^{-1} d \end{align} Let us implement this in NIFTy! %% Cell type:markdown id: tags: ## Wiener Filter: Example - One-dimensional signal with powerspectrum: $$P(k) = P_0\,\left(1+\left(\frac{k}{k_0}\right)^2\right)^{-\gamma /2},$$ - One-dimensional signal with power spectrum: $$P(k) = P_0\,\left(1+\left(\frac{k}{k_0}\right)^2\right)^{-\gamma /2},$$ with $P_0 = 0.2, k_0 = 5, \gamma = 4$. Recall: $P(k)$ defines an isotropic and homogeneous $S$. - $N = 0.5 \cdot \text{id}$. - Number data points $N_{pix} = 512$. - Response operator: $$R_x=\begin{pmatrix} \delta(x-0)\\\delta(x-1)\\\ldots\\ \delta(x-511) \end{pmatrix}.$$ However, the signal space is also discrete on the computer and $R = \text{id}$. %% Cell type:code id: tags:  python N_pixels = 512 # Number of pixels def pow_spec(k): P0, k0, gamma = [.2, 5, 6] return P0 * (1. + (k/k0)**2)**(- gamma / 2) P0, k0, gamma = [.2, 5, 4] return P0 / ((1. + (k/k0)**2)**(gamma / 2))  %% Cell type:markdown id: tags: ## Wiener Filter: Implementation %% Cell type:markdown id: tags: ### Import Modules %% Cell type:code id: tags:  python import numpy as np np.random.seed(42) np.random.seed(40) import nifty4 as ift import matplotlib.pyplot as plt %matplotlib inline  %% Cell type:markdown id: tags: ### Implement Propagator %% Cell type:code id: tags:  python def PropagatorOperator(R, N, Sh): IC = ift.GradientNormController(name="inverter", iteration_limit=50000, tol_abs_gradnorm=0.1) inverter = ift.ConjugateGradient(controller=IC) D = (R.adjoint*N.inverse*R + Sh.inverse).inverse # MR FIXME: we can/should provide a preconditioner here as well! return ift.InversionEnabler(D, inverter) #return ift.library.wiener_filter_curvature.WienerFilterCurvature(R,N,Sh,inverter).inverse  %% Cell type:markdown id: tags: ### Conjugate Gradient Preconditioning - $D$ is defined via: $$D^{-1} = \mathcal F^\dagger S_h^{-1}\mathcal F + R^\dagger N^{-1} R.$$ In the end, we want to apply $D$ to $j$, i.e. we need the inverse action of $D^{-1}$. This is done numerically (algorithm: *Conjugate Gradient*). - One can define the *condition number* of a non-singular and normal matrix $A$: $$\kappa (A) := \frac{|\lambda_{\text{max}}|}{|\lambda_{\text{min}}|},$$ where $\lambda_{\text{max}}$ and $\lambda_{\text{min}}$ are the largest and smallest eigenvalue of $A$, respectively. - The larger $\kappa$ the slower Conjugate Gradient. - By default, conjugate gradient solves: $D^{-1} m = j$ for $m$, where $D^{-1}$ can be bad conditioned. If one knows a non-singular matrix $T$ for which $TD^{-1}$ is better conditioned, one can solve the equivalent problem: - By default, conjugate gradient solves: $D^{-1} m = j$ for $m$, where $D^{-1}$ can be badly conditioned. If one knows a non-singular matrix $T$ for which $TD^{-1}$ is better conditioned, one can solve the equivalent problem: $$\tilde A m = \tilde j,$$ where $\tilde A = T D^{-1}$ and $\tilde j = Tj$. - In our case $S^{-1}$ is responsible for the bad conditioning of $D$ depending on the chosen power spectrum. Thus, we choose $$T = \mathcal F^\dagger S_h^{-1} \mathcal F.$$ %% Cell type:markdown id: tags: ### Generate Mock data - Generate a field $s$ and $n$ with given covariances. - Calculate $d$. %% Cell type:code id: tags:  python s_space = ift.RGSpace(N_pixels) h_space = s_space.get_default_codomain() HT = ift.HarmonicTransformOperator(h_space, target=s_space) p_space = ift.PowerSpace(h_space) # Operators Sh = ift.create_power_operator(h_space, power_spectrum=pow_spec) R = HT #*ift.create_harmonic_smoothing_operator((h_space,), 0, 0.02) # Fields and data sh = ift.power_synthesize(ift.PS_field(p_space, pow_spec),real_signal=True) noiseless_data=R(sh) signal_to_noise = 5 noise_amplitude = noiseless_data.std()/signal_to_noise noise_amplitude = np.sqrt(0.05) N = ift.ScalingOperator(noise_amplitude**2, s_space) n = ift.Field.from_random(domain=s_space, random_type='normal', std=noise_amplitude, mean=0) ift.plot(n) std=noise_amplitude, mean=0) d = noiseless_data + n ift.plot(d) j = R.adjoint_times(N.inverse_times(d)) ift.plot(HT(j)) D = PropagatorOperator(R=R, N=N, Sh=Sh)  %% Cell type:markdown id: tags: ### Run Wiener Filter %% Cell type:code id: tags:  python m = D(j)  %% Cell type:markdown id: tags: ### Create Power Spectra of Signal and Reconstruction %% Cell type:code id: tags:  python s_power = ift.power_analyze(sh) m_power = ift.power_analyze(m) s_power_data = s_power.val.real m_power_data = m_power.val.real # Get signal data and reconstruction data s_data = HT(sh).val.real m_data = HT(m).val.real d_data = d.val.real  %% Cell type:markdown id: tags: ### Signal Reconstruction %% Cell type:code id: tags:  python plt.plot(s_data, 'k', label="Signal", alpha=.5, linewidth=.5) plt.plot(s_data, 'g', label="Signal") plt.plot(d_data, 'k+', label="Data") plt.plot(m_data, 'r', label="Reconstruction") plt.title("Reconstruction") plt.legend() plt.show()  %% Cell type:code id: tags:  python plt.figure() plt.plot(s_data - s_data, 'k', label="Signal", alpha=.5, linewidth=.5) plt.plot(s_data - s_data, 'g', label="Signal") plt.plot(d_data - s_data, 'k+', label="Data") plt.plot(m_data - s_data, 'r', label="Reconstruction") plt.axhspan(-noise_amplitude,noise_amplitude, facecolor='0.9', alpha=.5) plt.title("Residuals") plt.legend() plt.show()  %% Cell type:markdown id: tags: ### Power Spectrum %% Cell type:code id: tags:  python plt.loglog() plt.xlim(1, int(N_pixels/2)) ymin = min(m_power_data) plt.ylim(ymin, 1) xs = np.arange(1,int(N_pixels/2),.1) plt.plot(xs, pow_spec(xs), label="True Power Spectrum", linewidth=.7, color='k') plt.plot(s_power_data, 'k', label="Signal", alpha=.5, linewidth=.5) plt.plot(m_power_data, 'r', label="Reconstruction") plt.axhline(noise_amplitude**2 / N_pixels, color="k", linestyle='--', label="Noise level", alpha=.5) plt.axhspan(noise_amplitude**2 / N_pixels, ymin, facecolor='0.9', alpha=.5) plt.title("Power Spectrum") plt.legend() plt.show()  %% Cell type:markdown id: tags: ## Wiener Filter on Incomplete Data %% Cell type:code id: tags:  python # Operators Sh = ift.create_power_operator(h_space, power_spectrum=pow_spec) N = ift.ScalingOperator(noise_amplitude**2,s_space) # R is defined below # Fields sh = ift.power_synthesize(ift.PS_field(p_space,pow_spec),real_signal=True) s = HT(sh) n = ift.Field.from_random(domain=s_space, random_type='normal', std=noise_amplitude, mean=0)  %% Cell type:markdown id: tags: ### Partially Lose Data %% Cell type:code id: tags:  python l = int(N_pixels * 0.2) h = int(N_pixels * 0.2 * 2) mask = ift.Field(s_space, val=1) mask.val[ l : h] = 0 R = ift.DiagonalOperator(mask)*HT n.val[l:h] = 0 d = R(sh) + n  %% Cell type:code id: tags:  python D = PropagatorOperator(R=R, N=N, Sh=Sh) j = R.adjoint_times(N.inverse_times(d)) m = D(j)  %% Cell type:markdown id: tags: ### Compute Uncertainty %% Cell type:code id: tags:  python sc = ift.probing.utils.StatCalculator() IC = ift.GradientNormController(name="inverter", iteration_limit=50000, tol_abs_gradnorm=0.1) inverter = ift.ConjugateGradient(controller=IC) curv = ift.library.wiener_filter_curvature.WienerFilterCurvature(R,N,Sh,inverter) for i in range(200): sc.add(HT(curv.generate_posterior_sample())) m_var = sc.var  %% Cell type:markdown id: tags: ### Get data %% Cell type:code id: tags:  python s_power = ift.power_analyze(sh) m_power = ift.power_analyze(m) s_power_data = s_power.val.real m_power_data = m_power.val.real # Get signal data and reconstruction data s_data = s.val.real m_data = HT(m).val.real m_var_data = m_var.val.real uncertainty = np.sqrt(np.abs(m_var_data)) ift.plot(ift.sqrt(m_var)) d_data = d.val.real # Set lost data to NaN for proper plotting d_data[d_data == 0] = np.nan  %% Cell type:code id: tags:  python fig = plt.figure(figsize=(15,10)) plt.plot(s_data, 'k', label="Signal", alpha=.5, linewidth=1) plt.plot(d_data, 'k+', label="Data", alpha=1) plt.axvspan(l, h, facecolor='0.8', alpha=.5) plt.title("Incomplete Data") plt.legend()  %% Cell type:code id: tags:  python fig  %% Cell type:code id: tags:  python fig = plt.figure(figsize=(15,10)) plt.plot(s_data, 'k', label="Signal", alpha=1, linewidth=1) plt.plot(d_data, 'k+', label="Data", alpha=.5) plt.plot(m_data, 'r', label="Reconstruction") plt.axvspan(l, h, facecolor='0.8', alpha=.5) plt.fill_between(range(N_pixels), m_data - uncertainty, m_data + uncertainty, facecolor='0') plt.title("Reconstruction of incomplete data") plt.legend()  %% Cell type:code id: tags:  python fig  %% Cell type:markdown id: tags: # 2d Example %% Cell type:code id: tags:  python N_pixels = 256 # Number of pixels sigma2 = 1000 # Noise variance def pow_spec(k): P0, k0, gamma = [.2, 20, 4] return P0 * (1. + (k/k0)**2)**(- gamma / 2) s_space = ift.RGSpace([N_pixels, N_pixels])  %% Cell type:code id: tags:  python fft = FFTOperator(s_space) h_space = fft.target[0] p_space = PowerSpace(h_space) h_space = s_space.get_default_codomain() fft = ift.FFTOperator(s_space,h_space) p_space = ift.PowerSpace(h_space) # Operators Sh = create_power_operator(h_space, power_spectrum=pow_spec) N = DiagonalOperator(s_space, diagonal=sigma2, bare=True) R = SmoothingOperator(s_space, sigma=.01) D = PropagatorOperator(R=R, N=N, Sh=Sh) Sh = ift.create_power_operator(h_space, power_spectrum=pow_spec) N = ift.ScalingOperator(sigma2,s_space) R = ift.FFTSmoothingOperator(s_space, sigma=.01) #D = PropagatorOperator(R=R, N=N, Sh=Sh) # Fields and data sh = Field(p_space, val=pow_spec).power_synthesize(real_signal=True) sh = ift.power_synthesize(ift.PS_field(p_space,pow_spec),real_signal=True) s = fft.adjoint_times(sh) n = Field.from_random(domain=s_space, random_type='normal', n = ift.Field.from_random(domain=s_space, random_type='normal', std=np.sqrt(sigma2), mean=0) # Lose some data l = int(N_pixels * 0.2) h = int(N_pixels * 0.2 * 2) mask = Field(s_space, val=1) mask = ift.Field(s_space, val=1) mask.val[l:h,l:h] = 0 R = DiagonalOperator(s_space, diagonal = mask) R = ift.DiagonalOperator(mask) n.val[l:h, l:h] = 0 D = PropagatorOperator(R=R, N=N, Sh=Sh) D = PropagatorOperator(R=R, N=N, Sh=fft.inverse*Sh*fft) d = R(s) + n j = R.adjoint_times(N.inverse_times(d)) # Run Wiener filter m = D(j) # Uncertainty sc = ift.probing.utils.StatCalculator() IC = ift.GradientNormController(name="inverter", iteration_limit=50000, tol_abs_gradnorm=0.1) inverter = ift.ConjugateGradient(controller=IC) curv = ift.library.wiener_filter_curvature.WienerFilterCurvature(R,N,fft.inverse*Sh*fft,inverter) for i in range(20): sc.add(HT(curv.generate_posterior_sample())) m_var = sc.var diagProber = DiagonalProber(domain=s_space, probe_dtype=np.complex, probe_count=10) diagProber(D) m_var = Field(s_space, val=diagProber.diagonal.val).weight(-1) # Get data s_power = sh.power_analyze() m_power = fft(m).power_analyze() s_power_data = s_power.val.get_full_data().real m_power_data = m_power.val.get_full_data().real s_data = s.val.get_full_data().real m_data = m.val.get_full_data().real m_var_data = m_var.val.get_full_data().real d_data = d.val.get_full_data().real uncertainty = np.sqrt(np.abs(m_var_data))  %% Cell type:code id: tags:  python cm = ['magma', 'inferno', 'plasma', 'viridis'][1] mi = np.min(s_data) ma = np.max(s_data) fig, axes = plt.subplots(1, 2, figsize=(15, 7)) data = [s_data, d_data] caption = ["Signal", "Data"] for ax in axes.flat: im = ax.imshow(data.pop(0), interpolation='nearest', cmap=cm, vmin=mi, vmax=ma) ax.set_title(caption.pop(0)) fig.subplots_adjust(right=0.8) cbar_ax = fig.add_axes([0.85, 0.15, 0.05, 0.7]) fig.colorbar(im, cax=cbar_ax)  %% Cell type:code id: tags:  python fig  %% Cell type:code id: tags:  python mi = np.min(s_data) ma = np.max(s_data) fig, axes = plt.subplots(2, 2, figsize=(15, 15)) data = [s_data, m_data, s_data - m_data, uncertainty] caption = ["Signal", "Reconstruction", "Residuals", "Uncertainty Map"] for ax in axes.flat: im = ax.imshow(data.pop(0), interpolation='nearest', cmap=cm, vmin=mi, vmax=ma) ax.set_title(caption.pop(0)) fig.subplots_adjust(right=0.8) cbar_ax = fig.add_axes([.85, 0.15, 0.05, 0.7]) fig.colorbar(im, cax=cbar_ax)  %% Cell type:code id: tags:  python fig  %% Cell type:markdown id: tags: ### Is the uncertainty map reliable? %% Cell type:code id: tags:  python precise = (np.abs(s_data-m_data) < uncertainty ) print("Error within uncertainty map bounds: " + str(np.sum(precise) * 100 / N_pixels**2) + "%") fig = plt.figure() plt.imshow(precise.astype(float), cmap="brg") plt.colorbar() fig  %% Cell type:markdown id: tags: # Start Coding ## NIFTy Repository + Installation guide https://gitlab.mpcdf.mpg.de/ift/NIFTy commit 1d10be4674a42945f8548f3b68688bf0f0d753fe NIFTy v3 **not (yet) stable!** ... ...
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!