Skip to content
Snippets Groups Projects

WIP: Issue173 wiener filter demos

Closed Philipp Arras requested to merge (removed):issue173_wienerFilterDemos into master
Files
2
import numpy as np
from nifty import RGSpace, PowerSpace, Field, FFTOperator, ComposedOperator,\
DiagonalOperator, ResponseOperator, plotting,\
create_power_operator
import nifty as ift
from nifty.library import WienerFilterCurvature
if __name__ == "__main__":
#####################
# Set up parameters #
#####################
distribution_strategy = 'not'
# Typical distance over which the field is correlated
correlation_length = 0.01
# Variance of field in position space sqrt(<|s_x|^2>)
field_variance = 2.
# smoothing length of response (in same unit as L)
response_sigma = 0.1
# The signal to noise ratio
signal_to_noise = 0.7
# Setting up variable parameters
# Typical distance over which the field is correlated
correlation_length = 0.01
# Variance of field in position space sqrt(<|s_x|^2>)
field_variance = 2.
# smoothing length of response (in same unit as L)
response_sigma = 0.1
# The signal to noise ratio
signal_to_noise = 0.7
def power_spectrum(k):
a = 4 * correlation_length * field_variance**2
return a / (1 + k * correlation_length) ** 4
# note that field_variance**2 = a*k_0/4. for this analytic form of power
# spectrum
def power_spectrum(k):
a = 4 * correlation_length * field_variance**2
return a / (1 + k * correlation_length) ** 4
# Setting up the geometry
###################
# Set up geometry #
###################
# Total side-length of the domain
L = 2.
# Grid resolution (pixels per axis)
N_pixels = 512
# Total side-length of the domain
L = 2.
# Grid resolution (pixels per axis)
N_pixels = 512
signal_space = RGSpace([N_pixels, N_pixels], distances=L/N_pixels)
harmonic_space = FFTOperator.get_default_codomain(signal_space)
fft = FFTOperator(harmonic_space, target=signal_space,
signal_space = ift.RGSpace([N_pixels, N_pixels], distances=L/N_pixels)
harmonic_space = ift.FFTOperator.get_default_codomain(signal_space)
fft = ift.FFTOperator(harmonic_space, target=signal_space,
domain_dtype=np.complex, target_dtype=np.float)
power_space = PowerSpace(harmonic_space,
distribution_strategy=distribution_strategy)
# Creating the mock data
S = create_power_operator(harmonic_space, power_spectrum=power_spectrum,
distribution_strategy=distribution_strategy)
mock_power = Field(power_space, val=power_spectrum,
distribution_strategy=distribution_strategy)
np.random.seed(43)
mock_harmonic = mock_power.power_synthesize(real_signal=True)
mock_signal = fft(mock_harmonic)
R = ResponseOperator(signal_space, sigma=(response_sigma,))
data_domain = R.target[0]
R_harmonic = ComposedOperator([fft, R], default_spaces=[0, 0])
N = DiagonalOperator(data_domain,
diagonal=mock_signal.var()/signal_to_noise,
bare=True)
noise = Field.from_random(domain=data_domain,
random_type='normal',
power_space = ift.PowerSpace(harmonic_space)
####################
# Create mock data #
####################
# Signal
S = ift.create_power_operator(harmonic_space, power_spectrum=power_spectrum)
mock_power = ift.Field(power_space, val=power_spectrum)
mock_harmonic = mock_power.power_synthesize(real_signal=True)
mock_signal = fft(mock_harmonic)
# Response Operator
R = ift.ResponseOperator(signal_space, sigma=[response_sigma, ])
data_domain = R.target[0]
# The response operator is defined acting on harmonic space. On RGSpace this is
# not necessary. However, on the sphere the signal space and its conjugate
# harmonic space do not the same number of degrees of freedom. Therefore, one
# should always reconstruct in harmonic space and transform back to signal
# space after the reconstruction.
R_harmonic = ift.ComposedOperator([fft, R], default_spaces=[0, 0])
# Noise
N = ift.DiagonalOperator(data_domain,
diagonal=mock_signal.var()/signal_to_noise, bare=True)
noise = ift.Field.from_random(domain=data_domain, random_type='normal',
std=mock_signal.std()/np.sqrt(signal_to_noise),
mean=0)
data = R(mock_signal) + noise
# Wiener filter
# Data
data = R(mock_signal) + noise
j = R_harmonic.adjoint_times(N.inverse_times(data))
#######################
# Apply Wiener filter #
#######################
# Note that the constructor of WienerFilterCurvature takes S acting on harmonic
# space, N acting on data space and R mapping from harmonic space to data
# space.
wiener_curvature = WienerFilterCurvature(S=S, N=N, R=R_harmonic)
# m lives in harmonic space.
m = wiener_curvature.inverse_times(j)
# Finally, transform back to signal space.
m_s = fft(m)
###########################
# Compute uncertainty map #
###########################
j = R_harmonic.adjoint_times(N.inverse_times(data))
wiener_curvature = WienerFilterCurvature(S=S, N=N, R=R_harmonic)
m = wiener_curvature.inverse_times(j)
m_s = fft(m)
class DiagonalProber(ift.DiagonalProberMixin, ift.Prober):
def __init__(self, *args, **kwargs):
super(DiagonalProber, self).__init__(*args, **kwargs)
plotter = plotting.RG2DPlotter()
plotter.title = 'mock_signal.html';
plotter(mock_signal)
plotter.title = 'data.html'
plotter(Field(signal_space,
diagProber = DiagonalProber(domain=signal_space, probe_dtype=np.complex)
diagProber(lambda x: fft(wiener_curvature.inverse_times(fft.adjoint_times(x))))
m_var = ift.Field(signal_space, val=diagProber.diagonal.val.real).weight(-1)
m_error = ift.sqrt(m_var)
################
# Plot results #
################
plotter = ift.plotting.RG2DPlotter(path="signal.html")
plotter.title = 'Signal'
plotter(mock_signal)
plotter = ift.plotting.RG2DPlotter(path="data.html")
plotter.title = 'Data'
plotter(ift.Field(signal_space,
val=data.val.get_full_data().reshape(signal_space.shape)))
plotter.title = 'map.html'; plotter(m_s)
\ No newline at end of file
plotter = ift.plotting.RG2DPlotter(path="reconstruction.html")
plotter.title = 'Reconstruction'
plotter(m_s)
plotter = ift.plotting.RG2DPlotter(path="uncertainty.html")
plotter.title = 'Uncertainty map'
plotter(m_error)
Loading