hammurapy.py 10.1 KB
Newer Older
1
2
3
4
5
# -*- coding: utf-8 -*-

import abc
import os
import tempfile
6
import subprocess
7
8

import healpy
9
import numpy as np
10

11
from d2o import distributed_data_object
12
13

from imagine.observers.observer import Observer
14
from imagine.magnetic_fields.magnetic_field import MagneticField
15
16


17
class Hammurapy(Observer):
18
    def __init__(self, hammurabi_executable, conf_directory='./confs',
19
20
                 working_directory_base='.', nside=128,
                 analytic_ensemble_mean=False):
21
22
        self.hammurabi_executable = os.path.abspath(hammurabi_executable)
        self.conf_directory = os.path.abspath(conf_directory)
23
        self.working_directory_base = os.path.abspath(working_directory_base)
24

25
26
        self.analytic_ensemble_mean = bool(analytic_ensemble_mean)

27
        self.nside = int(nside)
28

29
30
        self.last_call_log = ""

31
32
33
34
35
        sync_template_fname = os.path.join(self.conf_directory,
                                           'IQU_sync.fits')
        dust_template_fname = os.path.join(self.conf_directory,
                                           'IQU_dust.fits')

36
        self.basic_parameters = {'B_ran_mem_lim': '6',
37
                                 'obs_shell_index_numb': '1',
Theo Steininger's avatar
Theo Steininger committed
38
                                 'total_shell_numb': '1',
39
                                 'vec_size_R': '400',
40
                                 'max_radius': '30',
41
                                 'max_z': '10',
42
43
44
45
46
47
                                 'B_field_transform_lon': '-999',
                                 'B_field_transform_lat': '-999',
                                 'TE_grid_filename': 'negrid_n400.bin',
                                 'TE_nx': '400',
                                 'TE_ny': '400',
                                 'TE_nz': '80',
48
                                 'TE_interp': 'T',
49
50
51
52
53
54
                                 'do_sync_emission': 'F',
                                 'do_rm': 'F',
                                 'do_dm': 'F',
                                 'do_dust': 'F',
                                 'do_tau': 'F',
                                 'do_ff': 'F',
55
56
57
                                 'obs_freq_GHz': '23',
                                 'sync_template_fname': sync_template_fname,
                                 'dust_template_fname': dust_template_fname
58
                                 }
59
60

    @abc.abstractproperty
61
62
    def magnetic_field_class(self):
        return MagneticField
63
64

    def _make_temp_folder(self):
65
        prefix = os.path.join(self.working_directory_base, 'temp_hammurabi_')
66
67
        return tempfile.mkdtemp(prefix=prefix)

68
69
70
71
72
73
74
75
76
77
78
    def _remove_folder(self, path):
        # Try multiple times in order to overcome 'Directory not empty' errors
        # Hopefully open file handles get closed in the meantime
        n = 0
        while n < 10:
            temp_process = subprocess.Popen(['rm', '-rf', path],
                                            stderr=subprocess.PIPE)
            # wait until process is finished
            errlog = temp_process.communicate()[1]
            # check if there were some errors
            if errlog == '':
Theo Steininger's avatar
Theo Steininger committed
79
                self.logger.debug("Successfully removed temporary folder.")
80
81
82
                break
        else:
            self.logger.warning('Could not delete %s' % path)
83

84
    def _read_fits_file(self, path, name, nside):
85
86
87
88
89
90
91
        map_path = os.path.join(path, name)
        result_list = []
        i = 0
        while True:
            try:
                loaded_map = healpy.read_map(map_path, verbose=False,
                                             field=i)
92
                loaded_map = healpy.ud_grade(loaded_map, nside_out=nside)
93
94
95
96
97
98
99
100
101
102
103
104
105
106
                result_list += [loaded_map]
                i += 1
            except IndexError:
                break
        return result_list

    def _call_hammurabi(self, path):
        parameters_file_path = os.path.join(path, 'parameters.txt')
        temp_process = subprocess.Popen([self.hammurabi_executable,
                                         parameters_file_path],
                                        stdout=subprocess.PIPE,
                                        cwd=self.conf_directory)
        self.last_call_log = temp_process.communicate()[0]

107
108
    def _initialize_observable_dict(self, observable_dict, magnetic_field):
        pass
109

110
111
112
113
114
    def _build_parameter_dict(self, parameter_dict, magnetic_field,
                              working_directory, local_ensemble_index):
        grid_space = magnetic_field.domain[1]
        lx, ly, lz = np.array(grid_space.shape)*np.array(grid_space.distances)
        nx, ny, nz = grid_space.shape
115
        if local_ensemble_index is not None:
116
117
118
119
            # access the magnetic-field's random-seed d2o directly, since we
            # know that the distribution strategy is the same for the
            # randam samples and the magnetic field itself
            random_seed = magnetic_field.random_seed.data[local_ensemble_index]
120
            parameter_dict.update({'B_field_seed': random_seed})
121
122
123
124
125
126
127
128

        parameter_dict.update({'B_field_lx': lx,
                               'B_field_ly': ly,
                               'B_field_lz': lz,
                               'B_field_nx': nx,
                               'B_field_ny': ny,
                               'B_field_nz': nz,
                               })
129
        parameter_dict.update({'obs_NSIDE': self.nside})
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144

    def _write_parameter_dict(self, parameter_dict, working_directory):
        parameters_string = ''
        for item in parameter_dict:
            parameters_string += item + '=' + str(parameter_dict[item]) + '\n'

        parameters_file_path = os.path.join(working_directory,
                                            'parameters.txt')
        with open(parameters_file_path, 'wb') as config_file:
            config_file.write(parameters_string)

    def _fill_observable_dict(self, observable_dict, working_directory,
                              ensemble_index):
        return observable_dict

145
146
147
    def _add_ensemble_mean(self, observable_dict, working_directory):
        pass

148
149
    def __call__(self, magnetic_field):

150
151
152
153
        if not isinstance(magnetic_field, self.magnetic_field_class):
            raise ValueError("Given magnetic field is not a subclass of" +
                             " %s" % str(self.magnetic_field_class))

154
155
156
157
158
159
160
161
        observable_dict = {}
        self._initialize_observable_dict(observable_dict=observable_dict,
                                         magnetic_field=magnetic_field)

        # iterate over ensemble and put result into result_observable
        # get the local shape by creating a dummy d2o
        ensemble_number = magnetic_field.shape[0]
        dummy = distributed_data_object(global_shape=(ensemble_number,),
162
163
                                        distribution_strategy='equal',
                                        dtype=np.float)
164
165
166

        local_length = dummy.distributor.local_length
        for local_ensemble_index in xrange(local_length):
Theo Steininger's avatar
Theo Steininger committed
167
168
            self.logger.debug("Processing local_ensemble_index %i." %
                              local_ensemble_index)
169
170
171
172
173
            # create a temporary folder
            working_directory = self._make_temp_folder()

            # create dictionary for parameter file
            parameter_dict = self.basic_parameters.copy()
174
175

            # set the parameters for a numerical run
Theo Steininger's avatar
Theo Steininger committed
176
            parameter_dict['B_field_interp'] = 'T'
177
178
            parameter_dict['use_B_analytic'] = 'F'

179
180
181
182
183
184
185
186
187
            self._build_parameter_dict(
                                    parameter_dict=parameter_dict,
                                    magnetic_field=magnetic_field,
                                    working_directory=working_directory,
                                    local_ensemble_index=local_ensemble_index)

            self._write_parameter_dict(parameter_dict=parameter_dict,
                                       working_directory=working_directory)

188
            # call hammurabi
189
            self._call_hammurabi(working_directory)
190
191
192
193
194
195
196
197
198
199
200
201

            # if hammurabi failed, _fill_observable_dict will fail
            try:
                self._fill_observable_dict(observable_dict,
                                           working_directory,
                                           local_ensemble_index)
            except:
                self.logger.critical("Hammurabi failed! Last call log:\n" +
                                     self.last_call_log)
                raise
            finally:
                self._remove_folder(working_directory)
202

203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
        if self.analytic_ensemble_mean:
            # compute ensemble_mean on last node and add it to the observables
            rank = dummy.comm.rank
            size = dummy.comm.size

            if rank + 1 == size:
                self.logger.debug("Processing ensemble mean.")
                # create a temporary folder
                working_directory = self._make_temp_folder()

                # create dictionary for parameter file
                parameter_dict = self.basic_parameters.copy()

                # set the parameters for an analytic run
                parameter_dict['use_B_analytic'] = 'T'

                self._build_parameter_dict(
                                        parameter_dict=parameter_dict,
                                        magnetic_field=magnetic_field,
                                        working_directory=working_directory,
                                        local_ensemble_index=None)

                self._write_parameter_dict(parameter_dict=parameter_dict,
                                           working_directory=working_directory)

                # call hammurabi
                self._call_hammurabi(working_directory)

                # if hammurabi failed, _add_ensemble_mean will fail
                try:
                    self._add_ensemble_mean(observable_dict, working_directory)
                except:
                    self.logger.critical("Hammurabi failed! Last call log:\n" +
                                         self.last_call_log)
                    raise
                finally:
                    self._remove_folder(working_directory)
            else:
                working_directory = None
242
243
                self._add_ensemble_mean(observable_dict, working_directory)

244
        return observable_dict