pipeline.py 6.83 KB
Newer Older
1
2
# -*- coding: utf-8 -*-

3
4
5
6
7
import os
import numpy as np

from mpi4py import MPI

8
9
10
from keepers import Loggable

from likelihoods import Likelihood
11
from magnetic_fields import MagneticFieldFactory
12
13
from observers import Observer
from priors import Prior
14
from imagine import pymultinest
15
16
17
18

comm = MPI.COMM_WORLD
size = comm.size
rank = comm.rank
19

20
21
22
WORK_TAG = 0
DIE_TAG = 1

23
24

class Pipeline(Loggable, object):
25
26
27
28
29
30
31
32
33
34
35
36
    """
    The pipeline
    - posses all the building blocks: magnetic_field, observer,
        likelihood and prior.
    - if multiple log-likelihoods and log-priors are given: sum the result
    - coordinates the repeated observation in order to compute an ensemble
    - controls which parameters of the magnetic field are tested
        (active parameters)


    """
    def __init__(self, magnetic_field_factory, observer, likelihood, prior,
37
38
                 active_variables=[], ensemble_size=1,
                 pymultinest_parameters={}):
39
        self.logger.debug("Setting up pipeline.")
40
        self.magnetic_field_factory = magnetic_field_factory
41
42
43
        self.observer = observer
        self.likelihood = likelihood
        self.prior = prior
44
        self.active_variables = active_variables
45
46
        self.ensemble_size = ensemble_size

47
48
49
50
51
52
        # setting defaults
        self.pymultinest_parameters = {'verbose': True,
                                       'n_iter_before_update': 1,
                                       'n_live_points': 100}
        self.pymultinest_parameters.update(pymultinest_parameters)

53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
    @property
    def observer(self):
        return self._observer

    @observer.setter
    def observer(self, observer):
        if not isinstance(observer, Observer):
            raise TypeError("observer must be an instance of Observer-class.")
        self.logger.debug("Setting observer.")
        self._observer = observer

    @property
    def likelihood(self):
        return self._likelihood

    @likelihood.setter
    def likelihood(self, likelihood):
70
        self.logger.debug("Setting likelihood.")
71
        self._likelihood = ()
72
73
74
        if not (isinstance(likelihood, list) and
                isinstance(likelihood, tuple)):
            likelihood = [likelihood]
75
76
77
78
79
        for l in likelihood:
            if not isinstance(l, Likelihood):
                raise TypeError(
                    "likelihood must be an instance of Likelihood-class.")
            self._likelihood += (l,)
80
81
82
83
84
85
86

    @property
    def prior(self):
        return self._prior

    @prior.setter
    def prior(self, prior):
87
        self.logger.debug("Setting prior.")
88
89
90
91
        if not isinstance(prior, Prior):
            raise TypeError(
                "prior must be an instance of Prior-class.")
        self._prior = prior
92
93

    @property
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
    def magnetic_field_factory(self):
        return self._magnetic_field_factory

    @magnetic_field_factory.setter
    def magnetic_field_factory(self, magnetic_field_factory):
        if not isinstance(magnetic_field_factory, MagneticFieldFactory):
            raise TypeError(
                "magnetic_field_factory must be an instance of the "
                "MagneticFieldFactory-class.")
        self.logger.debug("Setting magnetic_field_factory.")
        self._magnetic_field_factory = magnetic_field_factory

    @property
    def active_variables(self):
        return self._active_variables

    @active_variables.setter
    def active_variables(self, active_variables):
        if not isinstance(active_variables, list):
            raise TypeError(
                    "active_variables must be a list.")
        self.logger.debug("Resetting active_variables to %s" %
                          str(active_variables))
        new_active = []
        for av in active_variables:
            new_active += [str(av)]
        self._active_variables = new_active
121
122
123
124
125
126
127
128
129
130
131
132
133
134

    @property
    def ensemble_size(self):
        return self._ensemble_size

    @ensemble_size.setter
    def ensemble_size(self, ensemble_size):

        ensemble_size = int(ensemble_size)
        if ensemble_size <= 0:
            raise ValueError("ensemble_size must be positive!")
        self.logger.debug("Setting ensemble size to %i." % ensemble_size)
        self._ensemble_size = ensemble_size

135
136
137
138
139
140
141
142
    def _multinest_likelihood(self, cube, ndim, nparams):
        cube_content = np.empty(ndim)
        for i in xrange(ndim):
            cube_content[i] = cube[i]
        if rank != 0:
            raise RuntimeError("_multinest_likelihood must only be called on "
                               "rank==0.")
        for i in xrange(1, size):
143
            comm.send(cube_content, dest=i, tag=WORK_TAG)
Theo Steininger's avatar
Theo Steininger committed
144
        self.logger.debug("Sent multinest-cube to nodes with rank > 0.")
145
146
147
148

        return self._core_likelihood(cube_content)

    def _listen_for_likelihood_calls(self):
149
150
151
152
153
154
155
156
        status = MPI.Status()
        while True:
            cube = comm.recv(source=0, tag=MPI.ANY_TAG, status=status)
            if status == DIE_TAG:
                self.logger.debug("Received DIE_TAG from rank 0.")
                break
            self.logger.debug("Received cube from rank 0.")
            self._core_likelihood(cube)
157
158

    def _core_likelihood(self, cube):
159
160
        self.logger.debug("Beginning Likelihood-calculation for %s." %
                          str(cube))
161
162
163
164
165
166
        # translate cube to variables
        variables = {}
        for i, av in enumerate(self.active_variables):
            variables[av] = cube[i]

        # create magnetic field
167
168
169
170
171
        self.logger.debug("Creating magnetic field.")
        b_field = self.magnetic_field_factory.generate(
                                              variables=variables,
                                              ensemble_size=self.ensemble_size)

172
        # create observables
173
        self.logger.debug("Creating observables.")
174
175
176
        observables = self.observer(b_field)

        # add up individual log-likelihood terms
177
        self.logger.debug("Evaluating likelihood(s).")
178
179
180
181
        likelihood = 0
        for like in self.likelihood:
            likelihood += like(observables)

Theo Steininger's avatar
Theo Steininger committed
182
183
        self.logger.info("Evaluated likelihood: %f for %s" %
                         (likelihood, str(cube)))
184
185
        return likelihood

186
    def __call__(self, variables):
187
188
189

        if rank == 0:
            # kickstart pymultinest
Theo Steininger's avatar
Theo Steininger committed
190
            self.logger.info("Starting pymultinest.")
191
192
193
194
195
            if not os.path.exists("chains"):
                os.mkdir("chains")
            pymultinest.run(self._multinest_likelihood,
                            self.prior,
                            len(self.active_variables),
196
                            **self.pymultinest_parameters)
197
198
199
200
            self.logger.info("pymultinest finished.")
            for i in xrange(1, size):
                self.logger.debug("Sending DIE_TAG to rank %i." % i)
                comm.send(None, dest=i, tag=DIE_TAG)
201
202
203
        else:
            # let all other nodes listen for likelihood evaluations
            self._listen_for_likelihood_calls()