Commit df578915 authored by Jakob Knollmueller's avatar Jakob Knollmueller

polishing

parent 3213a41c
import nifty5 as ift
import numpy as np
# from nifty5.library.nonlinearities import Exponential
#DEFINE THE SIGNAL:
#Define signal space as a regular grid
#s_space = ift.RGSpace(1024)
# s_space = ift.RGSpace([128,128])
s_space = ift.HPSpace(128)
#Define the harmonic space
h_space = s_space.get_default_codomain()
def get_2D_exposure():
x_shape, y_shape = position_space.shape
#Prepare Harmonic transformation between the two spaces
HT = ift.HarmonicTransformOperator(h_space, s_space)
exposure = np.ones(position_space.shape)
exposure[x_shape/3:x_shape/2,:] *= 2.
exposure[x_shape*4/5:x_shape,:] *= .1
exposure[x_shape/2:x_shape*3/2,:] *=3.
exposure[:,x_shape / 3:x_shape / 2] *= 2.
exposure[:,x_shape * 4 / 5:x_shape] *= .1
exposure[:,x_shape / 2:x_shape * 3 / 2] *= 3.
#Define domain
domain = ift.MultiDomain.make({'xi': h_space})
exposure = ift.Field(position_space, val=exposure)
return exposure
#Define positions from a Gaussian distribution
position = ift.from_random('normal', domain)
Nsamples = 5
#Define a power spectrum
def sqrtpspec(k):
return 10. / (20.+k**2)
#Define a power space
p_space = ift.PowerSpace(h_space)
if __name__ == '__main__':
# ABOUT THIS CODE
np.random.seed(42)
#Define the power distribution between the harmonic and the power spaces
pd = ift.PowerDistributor(h_space, p_space)
# Set up the position space of the signal
#Create a field with the defined power spectrum
a = ift.PS_field(p_space, sqrtpspec)
# # One dimensional regular grid with uniform exposure
# position_space = ift.RGSpace(1024)
# exposure = np.ones(position_space.shape)
#Define the amplitudes
A = pd(a)
#Unpack the positions xi from the Multifield
xi = ift.Variable(position)['xi']
# Two dimensional regular grid with inhomogeneous exposure
position_space = ift.RGSpace([512, 512])
exposure = get_2D_exposure()
#Multiply the positions by the amplitudes in the harmonic domain
logsky_h = xi * A
# # Sphere with with uniform exposure
# position_space = ift.HPSpace(128)
# exposure = np.ones(position_space.shape)
#Transform to the real domain
logsky = HT(logsky_h)
# Defining harmonic space and transform
harmonic_space = position_space.get_default_codomain()
HT = ift.HarmonicTransformOperator(harmonic_space, position_space)
#Create a sky model by applying the exponential (Poisson)
sky = ift.PointwiseExponential(logsky)
domain = ift.MultiDomain.make({'xi': harmonic_space})
position = ift.from_random('normal', domain)
#DEFINE THE RESPONSE OPERATOR:
# Define power spectrum and amplitudes
def sqrtpspec(k):
return 1. / (20. + k**2)
#Define a mask to cover a patch of the real space
exposure = 1*np.ones(s_space.shape)
# exposure[int(s_space.shape[0]/3):int(s_space.shape[0]/3+10)] = 10.
p_space = ift.PowerSpace(harmonic_space)
pd = ift.PowerDistributor(harmonic_space, p_space)
a = ift.PS_field(p_space, sqrtpspec)
A = pd(a)
#Convert the mask into a field
exposure = ift.Field(s_space,val=exposure)
# Set up a sky model
xi = ift.Variable(position)['xi']
logsky_h = xi * A
logsky = HT(logsky_h)
sky = ift.PointwiseExponential(logsky)
#Create a diagonal matrix corresponding to the mask
E = ift.DiagonalOperator(exposure)
exposure = ift.Field(position_space, val=exposure)
M = ift.DiagonalOperator(exposure)
GR = ift.GeometryRemover(position_space)
# Set up instrumental response
R = GR * M
#Create the response operator and apply the mask on it
R = ift.GeometryRemover(s_space) * E
# Generate mock data
d_space = R.target[0]
lamb = R(sky)
mock_position = ift.from_random('normal', lamb.position.domain)
data = np.random.poisson(lamb.at(mock_position).value.val.astype(np.float64))
data = ift.Field.from_local_data(d_space, data)
#CREATE THE MOCK DATA:
# Compute likelihood and Hamiltonian
position = ift.from_random('normal', lamb.position.domain)
likelihood = ift.PoissonianEnergy(lamb, data)
ic_cg = ift.GradientNormController(iteration_limit=50)
ic_newton = ift.GradientNormController(name='Newton',iteration_limit=50, tol_abs_gradnorm=1e-3)
minimizer = ift.RelaxedNewton(ic_newton)
#Define the data space
d_space = R.target[0]
# Minimize the Hamiltonian
H = ift.Hamiltonian(likelihood, ic_cg)
H, convergence = minimizer(H)
#Apply the response operator to the signal
#lamb corresponds to the mean in the Poisson distribution
lamb = R(sky)
# Plot results
result_sky = sky.at(H.position).value
#Draw coordinates of the mock data from a Gaussian distribution
mock_position = ift.from_random('normal', lamb.position.domain)
#Generate mock data from a Poisson distribution using lamb as a mean
data = np.random.poisson(lamb.at(mock_position).value.val.astype(np.float64))
#Store the data as a field
data = ift.Field.from_local_data(d_space, data)
#RECONSTRUCT THE SIGNAL:
#Define the positions where we perform the analysis from a Gaussian distribution
position = ift.from_random('normal', lamb.position.domain)
#Define the Poisson likelihood knowing the mean and the data
likelihood = ift.library.PoissonLogLikelihood(lamb, data)
#Define a iteration controller with a maximum number of iterations
ic_cg = ift.GradientNormController(iteration_limit=50)
#Define a iteration controller for the minimizer
ic_newton = ift.GradientNormController(name='Newton', tol_abs_gradnorm=1e-3)
minimizer = ift.RelaxedNewton(ic_newton)
#Build the Hamiltonian
H = ift.Hamiltonian(likelihood, ic_cg)
H, convergence = minimizer(H)
#PLOT RESULTS:
#Evaluate lambda at final position
lamb_recontructed = lamb.at(H.position).value
sky_reconstructed = sky.at(H.position).value
#Evaluate lambda at the data position for comparison
lamb_mock = lamb.at(mock_position).value
#Plot the data, reconstruction and underlying signal
ift.plot([data, lamb_mock, lamb_recontructed], name="poisson.png",
label=['Data', 'Mock signal', 'Reconstruction'],
alpha=[.5, 1, 1])
#Plot power spectrum for posterior test
a_mock = a
a_recon = a
ift.plot([a_mock**2, a_recon**2, ift.power_analyze(logsky_h.at(KL.position).value)],
name='power_spectrum.png', label=['Mock', 'Reconstruction', 'power_analyze'])
import nifty5 as ift
from nifty5.library.los_response import LOSResponse
from nifty5.library.apply_data import ApplyData
from nifty5.library.unit_log_gauss import UnitLogGauss
from nifty5.library.amplitude_model import make_amplitude_model
from nifty5.library.smooth_sky import make_correlated_field
import numpy as np
......@@ -18,33 +16,64 @@ if __name__ == '__main__':
np.random.seed(41)
position_space = ift.RGSpace([128,128])
A, __ = make_amplitude_model(position_space,16, 1, 10, -4., 1, 0., 1.)
log_signal, _ = make_correlated_field(position_space,A)
signal = ift.PointwisePositiveTanh(log_signal)
LOS_starts, LOS_ends = get_random_LOS(100)
# Setting up an amplitude model
A, amplitude_internals = make_amplitude_model(position_space,16, 1, 10, -4., 1, 0., 1.)
# Building the model for a correlated signal
harmonic_space = position_space.get_default_codomain()
ht = ift.HarmonicTransformOperator(harmonic_space, position_space)
power_space = A.value.domain[0]
power_distributor = ift.PowerDistributor(harmonic_space, power_space)
position = {}
position['xi'] = ift.Field.from_random('normal', harmonic_space)
position = ift.MultiField(position)
xi = ift.Variable(position)['xi']
Amp = power_distributor(A)
correlated_field_h = Amp * xi
correlated_field = ht(correlated_field_h)
# # alternatively to the block above one can do:
# correlated_field, _ = make_correlated_field(position_space,A)
# apply some nonlinearity
signal = ift.PointwisePositiveTanh(correlated_field)
# Building the Line of Sight response
LOS_starts, LOS_ends = get_random_LOS(1000)
R = LOSResponse(position_space, starts=LOS_starts, ends=LOS_ends)
data_space = R.target
# build signal response model and model likelihood
signal_response = R(signal)
# specify noise
data_space = R.target
noise = .001
N = ift.ScalingOperator(noise,data_space)
MOCK_POSITION = ift.from_random('normal', signal.position.domain)
# generate mock data
MOCK_POSITION = ift.from_random('normal', signal.position.domain)
data = signal_response.at(MOCK_POSITION).value + N.draw_sample()
NWR = ApplyData(data,ift.Field(data_space,val=noise), signal_response)
# set up model likelihood
likelihood = ift.GaussianEnergy(signal_response, mean=data, covariance=N)
# set up minimization and inversion schemes
ic_cg = ift.GradientNormController(iteration_limit=10)
ic_sampling = ift.GradientNormController(iteration_limit=100)
ic_newton = ift.GradientNormController(name='Newton', iteration_limit=100)
minimizer = ift.RelaxedNewton(ic_newton)
likelihood = UnitLogGauss(NWR,ic_cg)
# build model Hamiltonian
H = ift.Hamiltonian(likelihood,ic_cg,iteration_controller_sampling=ic_sampling)
N_samples = 20
IC2 = ift.GradientNormController(name='Newton', iteration_limit=100)
minimizer = ift.RelaxedNewton(IC2)
INITIAL_POSITION = ift.from_random('normal',H.position.domain)
position = INITIAL_POSITION
ift.plot(signal.at(MOCK_POSITION).value,name='truth.pdf')
ift.plot(R.adjoint_times(data),name='data.pdf')
ift.plot([ A.at(MOCK_POSITION).value], name='power.pdf')
# number of samples used to estimate the KL
N_samples = 10
for i in range(5):
H = H.at(position)
samples = [H.curvature.draw_sample(from_inverse=True) for _ in range(N_samples)]
......@@ -56,6 +85,7 @@ if __name__ == '__main__':
ift.plot(signal.at(position).value,name='reconstruction.pdf')
ift.plot([A.at(position).value, A.at(MOCK_POSITION).value],name='power.pdf')
avrg = 0.
va = 0.
powers = []
......
......@@ -3,5 +3,6 @@ from .apply_data import ApplyData
from .gaussian_energy import GaussianEnergy
from .los_response import LOSResponse
from .point_sources import PointSources
from .poissonian_energy import PoissonianEnergy
from .wiener_filter_curvature import WienerFilterCurvature
from .wiener_filter_energy import WienerFilterEnergy
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment