test_correlated_fields.py 6.61 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# Copyright(C) 2013-2020 Max-Planck-Society
#
# NIFTy is being developed at the Max-Planck-Institut fuer Astrophysik.

18
import numpy as np
19
import pytest
Philipp Arras's avatar
Philipp Arras committed
20
from numpy.testing import assert_, assert_allclose
21

Martin Reinecke's avatar
Martin Reinecke committed
22
import nifty7 as ift
Philipp Arras's avatar
Philipp Arras committed
23

24
from ..common import setup_function, teardown_function
25

Philipp Arras's avatar
Philipp Arras committed
26
# Short alias so the parametrize decorators below stay compact.
pmp = pytest.mark.parametrize

# Domains over which the parametrized tests in this module are run
# (used as the value list for the 'sspace' and 'domain' parameters).
spaces = [
    ift.RGSpace(4),
    ift.RGSpace((4, 4), (0.123, 0.4)),
    ift.HPSpace(8),
    ift.GLSpace(4),
]
33

Philipp Arras's avatar
Philipp Arras committed
34

Philipp Arras's avatar
Philipp Arras committed
35
36
37
38
39
40
def _stats(op, samples):
    """Accumulate `op` over `samples` and return its mean and std values.

    Each sample is first restricted to `op.domain` via `extract`, then `op`
    is applied and the result is fed to an `ift.StatCalculator`.

    Returns
    -------
    tuple
        ``(mean.val, sqrt(var).val)`` of the accumulated operator outputs.
    """
    calculator = ift.StatCalculator()
    for sample in samples:
        restricted = sample.extract(op.domain)
        calculator.add(op(restricted))
    mean_val = calculator.mean.val
    std_val = calculator.var.ptw("sqrt").val
    return mean_val, std_val

Philipp Arras's avatar
Philipp Arras committed
41

42
43
44
45
46
47
48
49
def _rand():
    """Draw one number via `normal()` from NIFTy's currently active RNG."""
    rng = ift.random.current_rng()
    return rng.normal()


def _posrand():
    """Return a strictly positive random number: the exponential of `_rand()`."""
    exponent = _rand()
    return np.exp(exponent)


50
51
52
53
54
55
56
57
58
59
60
@pmp('dofdex', [[0, 0], [0, 1]])
@pmp('seed', [12, 3])
def testDistributor(dofdex, seed):
    """Run the linear-operator consistency check on `_Distributor`.

    The operator is built for the given `dofdex` between two domains that
    share an `RGSpace(3)` and differ only in the size of the leading
    unstructured axis: ``max(dofdex) + 1`` entries versus ``len(dofdex)``
    entries. Everything happens inside a seeded `ift.random.Context`.
    """
    with ift.random.Context(seed):
        base_space = ift.RGSpace(3)
        n_copies = 1 + max(dofdex)
        compact_dom = ift.makeDomain(
            (ift.UnstructuredDomain(n_copies), base_space))
        distributed_dom = ift.makeDomain(
            (ift.UnstructuredDomain(len(dofdex)), base_space))
        distributor = ift.library.correlated_fields._Distributor(
            dofdex, compact_dom, distributed_dom)
        ift.extra.check_linear_operator(distributor)
62
63


64
@pmp('sspace', spaces)
@pmp('N', [0, 2])
def testAmplitudesInvariants(sspace, N):
    """Check fluctuation bookkeeping of `CorrelatedFieldMaker`.

    Part 1: build a maker with a 'spatial' (`sspace`) and a 'freq' axis,
    draw 100 random positions and compare, with ``rtol=0.5``, the sample
    mean of each fluctuation operator of the maker against the matching
    ``*_realized`` estimate computed from the field samples.

    Part 2: build a second maker whose spatial fluctuation amplitude comes
    from ``moment_slice_to_average(3.)`` and check that the sample mean of
    ``slice_fluctuation(0)`` is close to 3. Afterwards verify the order of
    the last two target spaces and run `check_operator` on the normalized
    amplitudes and on the full operator.

    For ``N == 2`` explicit `dofdex` lists are passed, otherwise `None`.
    """
    fsspace = ift.RGSpace((12,), (0.4,))
    if N == 2:
        dofdex1, dofdex2, dofdex3 = [0, 0], [1, 0], [1, 1]
    else:
        dofdex1 = dofdex2 = dofdex3 = None

    # ---- Part 1: prior statistics vs. realized statistics ----
    astds = 0.2, 1.2
    offset_std_mean = 1.3
    fa = ift.CorrelatedFieldMaker.make(1.2, offset_std_mean, 1e-2, '', N,
                                       dofdex1)
    fa.add_fluctuations(sspace, astds[0], 1e-2, 1.1, 2., 2.1, .5, -2, 1.,
                        'spatial', dofdex=dofdex2)
    fa.add_fluctuations(fsspace, astds[1], 1e-2, 3.1, 1., .5, .1, -4, 1.,
                        'freq', dofdex=dofdex3)
    op = fa.finalize()

    samples = [ift.from_random(op.domain) for _ in range(100)]

    def sample_mean(operator):
        # Only the mean returned by `_stats` is needed for the comparisons.
        return _stats(operator, samples)[0]

    from_operators = {
        'total': sample_mean(fa.total_fluctuation),
        'offset': sample_mean(fa.amplitude_total_offset),
        'avg_space': sample_mean(fa.average_fluctuation(0)),
        'avg_freq': sample_mean(fa.average_fluctuation(1)),
        'slice_space': sample_mean(fa.slice_fluctuation(0)),
        'slice_freq': sample_mean(fa.slice_fluctuation(1)),
    }

    sams = [op(position) for position in samples]
    from_fields = {
        'total': fa.total_fluctuation_realized(sams),
        'avg_space': fa.average_fluctuation_realized(sams, 0),
        'avg_freq': fa.average_fluctuation_realized(sams, 1),
        'offset': fa.offset_amplitude_realized(sams),
        'slice_space': fa.slice_fluctuation_realized(sams, 0),
        'slice_freq': fa.slice_fluctuation_realized(sams, 1),
    }

    comparison_order = ('offset', 'avg_space', 'avg_freq', 'total',
                        'slice_space', 'slice_freq')
    for quantity in comparison_order:
        assert_allclose(from_operators[quantity], from_fields[quantity],
                        rtol=0.5)

    # ---- Part 2: slice moment -> average moment conversion ----
    fa = ift.CorrelatedFieldMaker.make(0., offset_std_mean, .1, '', N, dofdex1)
    fa.add_fluctuations(fsspace, astds[1], 1., 3.1, 1., .5, .1, -4, 1., 'freq',
                        dofdex=dofdex3)
    slice_moment = 3.
    average_moment = fa.moment_slice_to_average(slice_moment)
    # NOTE(review): the positional 0 after 'spatial' presumably is the
    # insertion index; the target order asserted below is consistent with it.
    fa.add_fluctuations(sspace, average_moment, 1.5, 1.1, 2., 2.1, .5, -2, 1.,
                        'spatial', 0, dofdex=dofdex2)
    op = fa.finalize()
    measured_moment, _ = _stats(fa.slice_fluctuation(0), samples)

    assert_allclose(slice_moment, measured_moment, rtol=0.5)
    assert_(op.target[-2] == sspace)
    assert_(op.target[-1] == fsspace)

    for ampl in fa.normalized_amplitudes:
        ampl_position = 0.1*ift.from_random(ampl.domain)
        ift.extra.check_operator(ampl, ampl_position, ntries=10)
    op_position = 0.1*ift.from_random(op.domain)
    ift.extra.check_operator(op, op_position, ntries=10)
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141


@pmp('seed', [42, 31])
@pmp('domain', spaces)
def test_complicated_vs_simple(seed, domain):
    """`SimpleCorrelatedField` must agree with a one-axis `CorrelatedFieldMaker`.

    Both operators are built from the same randomly drawn parameters (inside
    a seeded `ift.random.Context`). The test asserts that they share the very
    same domain object and produce matching output for one random input.
    """
    with ift.random.Context(seed):
        offset_mean = _rand()
        offset_std_mean = _posrand()
        offset_std_std = _posrand()
        # Eight positive draws, in this order:
        #   fluctuations (mean, stddev), flexibility (mean, stddev),
        #   asperity (mean, stddev), loglogavgslope (mean, stddev).
        amplitude_params = [_posrand() for _ in range(8)]
        prefix = 'foobar'
        hspace = domain.get_default_codomain()

        op0 = ift.SimpleCorrelatedField(
            domain,
            offset_mean,
            offset_std_mean,
            offset_std_std,
            *amplitude_params,
            prefix=prefix,
            harmonic_partner=hspace)

        cfm = ift.CorrelatedFieldMaker.make(offset_mean, offset_std_mean,
                                            offset_std_std, prefix)
        cfm.add_fluctuations(
            domain,
            *amplitude_params,
            prefix='',
            harmonic_partner=hspace)
        op1 = cfm.finalize()

        assert_(op0.domain is op1.domain)
        inp = ift.from_random(op0.domain)
        ift.extra.assert_allclose(op0(inp), op1(inp))