-
Martin Reinecke authoredMartin Reinecke authored
nonlinear_critical_filter.py 4.25 KiB
import nifty4 as ift
from nifty4.library.nonlinearities import Exponential
import numpy as np
np.random.seed(42)
def adjust_zero_mode(m0, t0):
mtmp = m0.to_global_data().copy()
zero_position = len(m0.shape)*(0,)
zero_mode = mtmp[zero_position]
mtmp[zero_position] = zero_mode / abs(zero_mode)
ttmp = t0.to_global_data().copy()
ttmp[0] += 2 * np.log(abs(zero_mode))
return (ift.Field.from_global_data(m0.domain, mtmp),
ift.Field.from_global_data(t0.domain, ttmp))
if __name__ == "__main__":
noise_level = 1.
p_spec = (lambda k: (1. / (k + 1) ** 2))
# nonlinearity = Linear()
nonlinearity = Exponential()
# Set up position space
# s_space = ift.RGSpace([1024])
s_space = ift.HPSpace(32)
h_space = s_space.get_default_codomain()
# Define harmonic transformation and associated harmonic space
HT = ift.HarmonicTransformOperator(h_space, target=s_space)
# Setting up power space
p_space = ift.PowerSpace(h_space,
binbounds=ift.PowerSpace.useful_binbounds(
h_space, logarithmic=True))
# Choosing the prior correlation structure and defining
# correlation operator
p = ift.PS_field(p_space, p_spec)
log_p = ift.log(p)
S = ift.create_power_operator(h_space, lambda k: 1.)
# Drawing a sample sh from the prior distribution in harmonic space
sh = S.draw_sample()
# Choosing the measurement instrument
# Instrument = SmoothingOperator(s_space, sigma=0.01)
mask = np.ones(s_space.shape)
mask[6000:8000] = 0.
mask = ift.Field.from_global_data(s_space, mask)
MaskOperator = ift.DiagonalOperator(mask)
R = ift.GeometryRemover(s_space)
R = R*MaskOperator
# R = R*HT
# R = R * ift.create_harmonic_smoothing_operator((harmonic_space,), 0,
# response_sigma)
MeasurementOperator = R
d_space = MeasurementOperator.target
Distributor = ift.PowerDistributor(target=h_space, power_space=p_space)
power = Distributor(ift.exp(0.5*log_p))
# Creating the mock data
true_sky = nonlinearity(HT(power*sh))
noiseless_data = MeasurementOperator(true_sky)
noise_amplitude = noiseless_data.val.std()*noise_level
N = ift.ScalingOperator(noise_amplitude**2, d_space)
n = N.draw_sample()
# Creating the mock data
d = noiseless_data + n
m0 = ift.full(h_space, 1e-7)
t0 = ift.full(p_space, -4.)
power0 = Distributor.times(ift.exp(0.5 * t0))
IC1 = ift.GradientNormController(name="IC1", iteration_limit=100,
tol_abs_gradnorm=1e-3)
LS = ift.LineSearchStrongWolfe(c2=0.02)
minimizer = ift.RelaxedNewton(IC1, line_searcher=LS)
ICI = ift.GradientNormController(iteration_limit=500,
tol_abs_gradnorm=1e-3)
inverter = ift.ConjugateGradient(controller=ICI)
for i in range(20):
power0 = Distributor(ift.exp(0.5*t0))
map0_energy = ift.library.NonlinearWienerFilterEnergy(
m0, d, MeasurementOperator, nonlinearity, HT, power0, N, S,
inverter=inverter)
# Minimization with chosen minimizer
map0_energy, convergence = minimizer(map0_energy)
m0 = map0_energy.position
# Updating parameters for correlation structure reconstruction
D0 = map0_energy.curvature
# Initializing the power energy with updated parameters
power0_energy = ift.library.NonlinearPowerEnergy(
position=t0, d=d, N=N, xi=m0, D=D0, ht=HT,
Instrument=MeasurementOperator, nonlinearity=nonlinearity,
Distributor=Distributor, sigma=1., samples=2, inverter=inverter)
power0_energy = minimizer(power0_energy)[0]
# Setting new power spectrum
t0 = power0_energy.position
# break degeneracy between amplitude and excitation by setting
# excitation monopole to 1
m0, t0 = adjust_zero_mode(m0, t0)
plotdict = {"colormap": "Planck-like"}
ift.plot(true_sky, name="true_sky.png", **plotdict)
ift.plot(nonlinearity(HT(power0*m0)),
name="reconstructed_sky.png", **plotdict)
ift.plot(MeasurementOperator.adjoint_times(d), name="data.png", **plotdict)
ift.plot([ift.exp(t0), p], name="ps.png")