Commit 3d3e41dc authored by Philipp Frank's avatar Philipp Frank

utility functions for fluctuations

parent e01ad3a5
Pipeline #63362 passed with stages
in 5 minutes and 48 seconds
...@@ -5,9 +5,11 @@ np.random.seed(42) ...@@ -5,9 +5,11 @@ np.random.seed(42)
def testAmplitudesConsistency(seed, sspace): def testAmplitudesConsistency(seed, sspace):
offset_std = 40 offset_std = 30
intergated_fluct_std0 = 10. intergated_fluct_std0 = .003
intergated_fluct_std1 = 2. intergated_fluct_std1 = 0.1
nsam = 1000
hspace = sspace.get_default_codomain() hspace = sspace.get_default_codomain()
target0 = ift.PowerSpace(hspace) target0 = ift.PowerSpace(hspace)
...@@ -23,51 +25,29 @@ def testAmplitudesConsistency(seed, sspace): ...@@ -23,51 +25,29 @@ def testAmplitudesConsistency(seed, sspace):
-4, 1., 'freq') -4, 1., 'freq')
op = fa.finalize(offset_std, 1E-8, '') op = fa.finalize(offset_std, 1E-8, '')
flucts = [intergated_fluct_std0, intergated_fluct_std1] samples = [ift.from_random('normal',op.domain) for _ in range(nsam)]
tot_flm, totflsig = fa.effective_total_fluctuation(flucts, [1E-8, 1E-8]) tot_flm, _ = fa.stats(fa.total_fluctuation,samples)
offset_std,_ = fa.stats(fa.amplitude_total_offset,samples)
space = op.target intergated_fluct_std0,_ = fa.stats(fa.average_fluctuation(0),samples)
totaltoalvol = 1. intergated_fluct_std1,_ = fa.stats(fa.average_fluctuation(1),samples)
for s in space[:]:
totaltoalvol *= s.total_volume slice_fluct_std0,_ = fa.stats(fa.slice_fluctuation(0),samples)
slice_fluct_std1,_ = fa.stats(fa.slice_fluctuation(1),samples)
nsam = 1000
sams = [op(s) for s in samples]
zm_std_mean = 0. fluct_total = fa.total_fluctuation_realized(sams)
fluct_space = 0. fluct_space = fa.average_fluctuation_realized(sams,0)
fluct_freq = 0. fluct_freq = fa.average_fluctuation_realized(sams,1)
fluct_total = 0. zm_std_mean = fa.offset_amplitude_realized(sams)
sl_fluct_space = fa.slice_fluctuation_realized(sams,0)
for i in range(nsam): sl_fluct_freq = fa.slice_fluctuation_realized(sams,1)
x = ift.from_random('normal', op.domain)
res = op(x)
zm = res.integrate()/totaltoalvol
zm2 = res.mean()
fl = ((res - zm)**2).integrate()/totaltoalvol
zm_std_mean += zm**2
fluct_total += fl
r = res.integrate(1)/fsspace.total_volume
r0 = r.integrate()/sspace.total_volume
tm = ((r - r0)**2).integrate()/sspace.total_volume
fluct_space += tm
fr = res.integrate(0)/sspace.total_volume
fr0 = fr.integrate()/fsspace.total_volume
ftm = ((fr - fr0)**2).integrate()/fsspace.total_volume
fluct_freq += ftm
fluct_total = np.sqrt(fluct_total/nsam)
fluct_space = np.sqrt(fluct_space/nsam)
fluct_freq = np.sqrt(fluct_freq/nsam)
zm_std_mean = np.sqrt(zm_std_mean/nsam)
np.testing.assert_allclose(offset_std, zm_std_mean, rtol=0.5) np.testing.assert_allclose(offset_std, zm_std_mean, rtol=0.5)
np.testing.assert_allclose(intergated_fluct_std0, fluct_space, rtol=0.5) np.testing.assert_allclose(intergated_fluct_std0, fluct_space, rtol=0.5)
np.testing.assert_allclose(intergated_fluct_std1, fluct_freq, rtol=0.5)
np.testing.assert_allclose(tot_flm, fluct_total, rtol=0.5) np.testing.assert_allclose(tot_flm, fluct_total, rtol=0.5)
np.testing.assert_allclose(slice_fluct_std0, sl_fluct_space, rtol=0.5)
np.testing.assert_allclose(slice_fluct_std1, sl_fluct_freq, rtol=0.5)
print("Expected offset Std: " + str(offset_std)) print("Expected offset Std: " + str(offset_std))
print("Estimated offset Std: " + str(zm_std_mean)) print("Estimated offset Std: " + str(zm_std_mean))
...@@ -79,6 +59,14 @@ def testAmplitudesConsistency(seed, sspace): ...@@ -79,6 +59,14 @@ def testAmplitudesConsistency(seed, sspace):
print("Expected integrated fluct. frequency Std: " + print("Expected integrated fluct. frequency Std: " +
str(intergated_fluct_std1)) str(intergated_fluct_std1))
print("Estimated integrated fluct. frequency Std: " + str(fluct_freq)) print("Estimated integrated fluct. frequency Std: " + str(fluct_freq))
print("Expected slice fluct. space Std: " +
str(slice_fluct_std0))
print("Estimated slice fluct. space Std: " + str(sl_fluct_space))
print("Expected slice fluct. frequency Std: " +
str(slice_fluct_std1))
print("Estimated slice fluct. frequency Std: " + str(sl_fluct_freq))
print("Expected total fluct. Std: " + str(tot_flm)) print("Expected total fluct. Std: " + str(tot_flm))
print("Estimated total fluct. Std: " + str(fluct_total)) print("Estimated total fluct. Std: " + str(fluct_total))
......
...@@ -32,6 +32,7 @@ from ..operators.operator import Operator ...@@ -32,6 +32,7 @@ from ..operators.operator import Operator
from ..operators.simple_linear_operators import VdotOperator, ducktape from ..operators.simple_linear_operators import VdotOperator, ducktape
from ..operators.value_inserter import ValueInserter from ..operators.value_inserter import ValueInserter
from ..sugar import from_global_data, full, makeDomain from ..sugar import from_global_data, full, makeDomain
from ..probing import StatCalculator
def _lognormal_moments(mean, sig): def _lognormal_moments(mean, sig):
...@@ -207,12 +208,14 @@ class _Amplitude(Operator): ...@@ -207,12 +208,14 @@ class _Amplitude(Operator):
adder = Adder(from_global_data(target, mask)) adder = Adder(from_global_data(target, mask))
op = adder @ ((expander @ fluctuations)*normal_ampl) op = adder @ ((expander @ fluctuations)*normal_ampl)
self.apply = op.apply self.apply = op.apply
self.fluctuation_amplitude = fluctuations
self._domain, self._target = op.domain, op.target self._domain, self._target = op.domain, op.target
class CorrelatedFieldMaker: class CorrelatedFieldMaker:
def __init__(self): def __init__(self):
self._a = [] self._a = []
self._azm = None
def add_fluctuations(self, def add_fluctuations(self,
target, target,
...@@ -256,6 +259,7 @@ class CorrelatedFieldMaker: ...@@ -256,6 +259,7 @@ class CorrelatedFieldMaker:
def finalize_from_op(self, zeromode, prefix=''): def finalize_from_op(self, zeromode, prefix=''):
assert isinstance(zeromode, Operator) assert isinstance(zeromode, Operator)
self._azm = zeromode
hspace = makeDomain([dd.target[0].harmonic_partner for dd in self._a]) hspace = makeDomain([dd.target[0].harmonic_partner for dd in self._a])
foo = np.ones(hspace.shape) foo = np.ones(hspace.shape)
zeroind = len(hspace.shape)*(0,) zeroind = len(hspace.shape)*(0,)
...@@ -305,17 +309,91 @@ class CorrelatedFieldMaker: ...@@ -305,17 +309,91 @@ class CorrelatedFieldMaker:
def amplitudes(self): def amplitudes(self):
return self._a return self._a
def effective_total_fluctuation(self, @property
fluctuations_means, def amplitude_total_offset(self):
fluctuations_stddevs, return self._azm
nsamples=100):
namps = len(fluctuations_means) @property
xis = np.random.normal(size=namps*nsamples).reshape((namps, nsamples)) def total_fluctuation(self):
q = np.ones(nsamples) if len(self._a) == 0:
for i in range(len(fluctuations_means)): raise(NotImplementedError)
m, sig = _lognormal_moments(fluctuations_means[i], if len(self._a) == 1:
fluctuations_stddevs[i]) return self._a[0].fluctuation_amplitude
f = np.exp(m + sig*xis[i]) q = 1.
q *= (1. + f**2) for a in self._a:
q = np.sqrt(q - 1.) fl = a.fluctuation_amplitude
return np.mean(q), np.std(q) q = q * (Adder(full(fl.target,1.)) @ fl**2)
return (Adder(full(q.target,-1.)) @ q).sqrt()
def slice_fluctuation(self,space):
if len(self._a) == 0:
raise(NotImplementedError)
assert space < len(self._a)
if len(self._a) == 1:
return self._a[0].fluctuation_amplitude
q = 1.
for j in range(len(self._a)):
fl = self._a[j].fluctuation_amplitude
if j == space:
q = q * fl**2
else:
q = q * (Adder(full(fl.target,1.)) @ fl**2)
return q.sqrt()
def average_fluctuation(self,space):
if len(self._a) == 0:
raise(NotImplementedError)
assert space < len(self._a)
if len(self._a) == 1:
return self._a[0].fluctuation_amplitude
return self._a[space].fluctuation_amplitude
def offset_amplitude_realized(self,samples):
res = 0.
for s in samples:
res += s.mean()**2
return np.sqrt(res/len(samples))
def total_fluctuation_realized(self,samples):
res = 0.
for s in samples:
res = res + (s-s.mean())**2
res = res/len(samples)
return np.sqrt(res.mean())
def average_fluctuation_realized(self,samples,space):
ldom = len(samples[0].domain)
assert space < ldom
if ldom == 1:
return self.total_fluctuation_realized(samples)
spaces=()
for i in range(ldom):
if i != space:
spaces += (i,)
res = 0.
for s in samples:
r = s.mean(spaces)
res = res + (r-r.mean())**2
res = res/len(samples)
return np.sqrt(res.mean())
def slice_fluctuation_realized(self,samples,space):
ldom = len(samples[0].domain)
assert space < ldom
if ldom == 1:
return self.total_fluctuation_realized(samples)
res1 = 0.
res2 = 0.
for s in samples:
res1 = res1 + s**2
res2 = res2 + s.mean(space)**2
res1 = res1/len(samples)
res2 = res2/len(samples)
res = res1.mean() - res2.mean()
return np.sqrt(res)
def stats(self,op,samples):
sc = StatCalculator()
for s in samples:
sc.add(op(s.extract(op.domain)))
return sc.mean.to_global_data(), sc.var.sqrt().to_global_data()
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment