Skip to content
Snippets Groups Projects
Commit 209068e0 authored by Gordian Edenhofer's avatar Gordian Edenhofer
Browse files

Move `density_estimator` from demos/ to sugar.py

parent 74e620f8
No related branches found
No related tags found
1 merge request!620Density estimator sugar
Pipeline #101631 passed
......@@ -30,62 +30,6 @@ import numpy as np
import nifty7 as ift
def density_estimator(domain, pad=1.0, cf_fluctuations=None, cf_azm_uniform=None):
cf_azm_uniform_sane_default = (1e-4, 1.0)
cf_fluctuations_sane_default = {
"scale": (0.5, 0.3),
"cutoff": (4.0, 3.0),
"loglogslope": (-6.0, 3.0)
}
domain = ift.DomainTuple.make(domain)
dom_scaling = 1. + np.broadcast_to(pad, (len(domain.axes), ))
if cf_fluctuations is None:
cf_fluctuations = cf_fluctuations_sane_default
if cf_azm_uniform is None:
cf_azm_uniform = cf_azm_uniform_sane_default
domain_padded = []
for d_scl, d in zip(dom_scaling, domain):
if not isinstance(d, ift.RGSpace) or d.harmonic:
te = [f"unexpected domain encountered in `domain`: {domain}"]
te += "expected a non-harmonic `ift.RGSpace`"
raise TypeError("\n".join(te))
shape_padded = tuple((d_scl * np.array(d.shape)).astype(int))
domain_padded.append(ift.RGSpace(shape_padded, distances=d.distances))
domain_padded = ift.DomainTuple.make(domain_padded)
# Set up the signal model
azm_offset_mean = 0.0 # The zero-mode should be inferred only from the data
cfmaker = ift.CorrelatedFieldMaker("")
for i, d in enumerate(domain_padded):
if isinstance(cf_fluctuations, (list, tuple)):
cf_fl = cf_fluctuations[i]
else:
cf_fl = cf_fluctuations
cfmaker.add_fluctuations_matern(d, **cf_fl, prefix=f"ax{i}")
scalar_domain = ift.DomainTuple.scalar_domain()
uniform = ift.UniformOperator(scalar_domain, *cf_azm_uniform)
azm = uniform.ducktape("zeromode")
cfmaker.set_amplitude_total_offset(azm_offset_mean, azm)
correlated_field = cfmaker.finalize(0).clip(-10., 10.)
normalized_amplitudes = cfmaker.get_normalized_amplitudes()
domain_shape = tuple(d.shape for d in domain)
slc = ift.SliceOperator(correlated_field.target, domain_shape)
signal = ift.exp(slc @ correlated_field)
model_operators = {
"correlated_field": correlated_field,
"select_subset": slc,
"amplitude_total_offset": azm,
"normalized_amplitudes": normalized_amplitudes
}
return signal, model_operators
if __name__ == "__main__":
filename = "getting_started_density_{}.png"
ift.random.push_sseq_from_seed(42)
......@@ -97,7 +41,7 @@ if __name__ == "__main__":
sp2 = ift.RGSpace(npix2)
position_space = ift.DomainTuple.make((sp1, sp2))
signal, ops = density_estimator(position_space)
signal, ops = ift.density_estimator(position_space)
correlated_field = ops["correlated_field"]
data_space = signal.target
......
......@@ -32,14 +32,15 @@ from .operators.diagonal_operator import DiagonalOperator
from .operators.distributors import PowerDistributor
from .operators.operator import Operator
from .operators.sampling_enabler import SamplingDtypeSetter
from .operators.selection_operators import SliceOperator
from .operators.scaling_operator import ScalingOperator
from .plot import Plot
__all__ = ['PS_field', 'power_analyze', 'create_power_operator',
'create_harmonic_smoothing_operator', 'from_random',
'full', 'makeField',
'is_fieldlike', 'is_linearization', 'is_operator',
'makeDomain', 'get_signal_variance', 'makeOp', 'domain_union',
'density_estimator', 'create_harmonic_smoothing_operator',
'from_random', 'full', 'makeField', 'is_fieldlike',
'is_linearization', 'is_operator', 'makeDomain',
'get_signal_variance', 'makeOp', 'domain_union',
'get_default_codomain', 'single_plot', 'exec_time',
'calculate_position'] + list(pointwise.ptw_dict.keys())
......@@ -214,6 +215,66 @@ def create_power_operator(domain, power_spectrum, space=None):
return DiagonalOperator(field, domain, space)
def density_estimator(domain, pad=1.0, cf_fluctuations=None,
cf_azm_uniform=None):
from .domains.rg_space import RGSpace
from .library.correlated_fields import CorrelatedFieldMaker
from .library.special_distributions import UniformOperator
cf_azm_uniform_sane_default = (1e-4, 1.0)
cf_fluctuations_sane_default = {
"scale": (0.5, 0.3),
"cutoff": (4.0, 3.0),
"loglogslope": (-6.0, 3.0)
}
domain = DomainTuple.make(domain)
dom_scaling = 1. + np.broadcast_to(pad, (len(domain.axes), ))
if cf_fluctuations is None:
cf_fluctuations = cf_fluctuations_sane_default
if cf_azm_uniform is None:
cf_azm_uniform = cf_azm_uniform_sane_default
domain_padded = []
for d_scl, d in zip(dom_scaling, domain):
if not isinstance(d, RGSpace) or d.harmonic:
te = [f"unexpected domain encountered in `domain`: {domain}"]
te += "expected a non-harmonic `RGSpace`"
raise TypeError("\n".join(te))
shape_padded = tuple((d_scl * np.array(d.shape)).astype(int))
domain_padded.append(RGSpace(shape_padded, distances=d.distances))
domain_padded = DomainTuple.make(domain_padded)
# Set up the signal model
azm_offset_mean = 0.0 # The zero-mode should be inferred only from the data
cfmaker = CorrelatedFieldMaker("")
for i, d in enumerate(domain_padded):
if isinstance(cf_fluctuations, (list, tuple)):
cf_fl = cf_fluctuations[i]
else:
cf_fl = cf_fluctuations
cfmaker.add_fluctuations_matern(d, **cf_fl, prefix=f"ax{i}")
scalar_domain = DomainTuple.scalar_domain()
uniform = UniformOperator(scalar_domain, *cf_azm_uniform)
azm = uniform.ducktape("zeromode")
cfmaker.set_amplitude_total_offset(azm_offset_mean, azm)
correlated_field = cfmaker.finalize(0).clip(-10., 10.)
normalized_amplitudes = cfmaker.get_normalized_amplitudes()
domain_shape = tuple(d.shape for d in domain)
slc = SliceOperator(correlated_field.target, domain_shape)
signal = (slc @ correlated_field).exp()
model_operators = {
"correlated_field": correlated_field,
"select_subset": slc,
"amplitude_total_offset": azm,
"normalized_amplitudes": normalized_amplitudes
}
return signal, model_operators
def create_harmonic_smoothing_operator(domain, space, sigma):
"""Creates an operator which smoothes a subspace of a harmonic domain.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment