hammurapy.py 9.94 KB
Newer Older
1
2
3
4
5
# -*- coding: utf-8 -*-

import abc
import os
import tempfile
6
import subprocess
7
8

import healpy
9
import numpy as np
10

11
from d2o import distributed_data_object
12
13

from imagine.observers.observer import Observer
14
from imagine.magnetic_fields.magnetic_field import MagneticField
15
16


17
class Hammurapy(Observer):
18
    def __init__(self, hammurabi_executable, conf_directory='./confs',
19
20
                 working_directory_base='.', nside=128,
                 analytic_ensemble_mean=False):
21
22
        self.hammurabi_executable = os.path.abspath(hammurabi_executable)
        self.conf_directory = os.path.abspath(conf_directory)
23
        self.working_directory_base = os.path.abspath(working_directory_base)
24

25
26
        self.analytic_ensemble_mean = bool(analytic_ensemble_mean)

27
        self.nside = int(nside)
28

29
30
        self.last_call_log = ""

31
32
33
34
35
        sync_template_fname = os.path.join(self.conf_directory,
                                           'IQU_sync.fits')
        dust_template_fname = os.path.join(self.conf_directory,
                                           'IQU_dust.fits')

36
        self.basic_parameters = {'B_ran_mem_lim': '6',
37
                                 'obs_shell_index_numb': '1',
Theo Steininger's avatar
Theo Steininger committed
38
                                 'total_shell_numb': '1',
39
                                 'vec_size_R': '500',
40
41
                                 'max_radius': '30',
                                 'max_z': '15',
42
43
44
45
46
47
                                 'B_field_transform_lon': '-999',
                                 'B_field_transform_lat': '-999',
                                 'TE_grid_filename': 'negrid_n400.bin',
                                 'TE_nx': '400',
                                 'TE_ny': '400',
                                 'TE_nz': '80',
48
                                 'TE_interp': 'F',
49
50
51
52
53
54
                                 'do_sync_emission': 'F',
                                 'do_rm': 'F',
                                 'do_dm': 'F',
                                 'do_dust': 'F',
                                 'do_tau': 'F',
                                 'do_ff': 'F',
55
56
57
                                 'obs_freq_GHz': '23',
                                 'sync_template_fname': sync_template_fname,
                                 'dust_template_fname': dust_template_fname
58
                                 }
59
60

    @abc.abstractproperty
61
62
    def magnetic_field_class(self):
        return MagneticField
63
64

    def _make_temp_folder(self):
65
        prefix = os.path.join(self.working_directory_base, 'temp_hammurabi_')
66
67
        return tempfile.mkdtemp(prefix=prefix)

68
69
70
71
72
73
74
75
76
77
78
    def _remove_folder(self, path):
        # Try multiple times in order to overcome 'Directory not empty' errors
        # Hopefully open file handles get closed in the meantime
        n = 0
        while n < 10:
            temp_process = subprocess.Popen(['rm', '-rf', path],
                                            stderr=subprocess.PIPE)
            # wait until process is finished
            errlog = temp_process.communicate()[1]
            # check if there were some errors
            if errlog == '':
Theo Steininger's avatar
Theo Steininger committed
79
                self.logger.debug("Successfully removed temporary folder.")
80
81
82
                break
        else:
            self.logger.warning('Could not delete %s' % path)
83

84
    def _read_fits_file(self, path, name, nside):
85
86
87
88
89
90
91
        map_path = os.path.join(path, name)
        result_list = []
        i = 0
        while True:
            try:
                loaded_map = healpy.read_map(map_path, verbose=False,
                                             field=i)
92
                loaded_map = healpy.ud_grade(loaded_map, nside_out=nside)
93
94
95
96
97
98
99
100
101
102
103
104
105
106
                result_list += [loaded_map]
                i += 1
            except IndexError:
                break
        return result_list

    def _call_hammurabi(self, path):
        parameters_file_path = os.path.join(path, 'parameters.txt')
        temp_process = subprocess.Popen([self.hammurabi_executable,
                                         parameters_file_path],
                                        stdout=subprocess.PIPE,
                                        cwd=self.conf_directory)
        self.last_call_log = temp_process.communicate()[0]

107
108
    def _initialize_observable_dict(self, observable_dict, magnetic_field):
        pass
109

110
111
112
113
114
    def _build_parameter_dict(self, parameter_dict, magnetic_field,
                              working_directory, local_ensemble_index):
        grid_space = magnetic_field.domain[1]
        lx, ly, lz = np.array(grid_space.shape)*np.array(grid_space.distances)
        nx, ny, nz = grid_space.shape
115
116
117
        if local_ensemble_index is not None:
            random_seed = magnetic_field.random_seed[local_ensemble_index]
            parameter_dict.update({'B_field_seed': random_seed})
118
119
120
121
122
123
124
125

        parameter_dict.update({'B_field_lx': lx,
                               'B_field_ly': ly,
                               'B_field_lz': lz,
                               'B_field_nx': nx,
                               'B_field_ny': ny,
                               'B_field_nz': nz,
                               })
126
        parameter_dict.update({'obs_NSIDE': self.nside})
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141

    def _write_parameter_dict(self, parameter_dict, working_directory):
        parameters_string = ''
        for item in parameter_dict:
            parameters_string += item + '=' + str(parameter_dict[item]) + '\n'

        parameters_file_path = os.path.join(working_directory,
                                            'parameters.txt')
        with open(parameters_file_path, 'wb') as config_file:
            config_file.write(parameters_string)

    def _fill_observable_dict(self, observable_dict, working_directory,
                              ensemble_index):
        return observable_dict

142
143
144
    def _add_ensemble_mean(self, observable_dict, working_directory):
        pass

145
146
    def __call__(self, magnetic_field):

147
148
149
150
        if not isinstance(magnetic_field, self.magnetic_field_class):
            raise ValueError("Given magnetic field is not a subclass of" +
                             " %s" % str(self.magnetic_field_class))

151
152
153
154
155
156
157
158
        observable_dict = {}
        self._initialize_observable_dict(observable_dict=observable_dict,
                                         magnetic_field=magnetic_field)

        # iterate over ensemble and put result into result_observable
        # get the local shape by creating a dummy d2o
        ensemble_number = magnetic_field.shape[0]
        dummy = distributed_data_object(global_shape=(ensemble_number,),
159
160
                                        distribution_strategy='equal',
                                        dtype=np.float)
161
162
163

        local_length = dummy.distributor.local_length
        for local_ensemble_index in xrange(local_length):
Theo Steininger's avatar
Theo Steininger committed
164
165
            self.logger.debug("Processing local_ensemble_index %i." %
                              local_ensemble_index)
166
167
168
169
170
            # create a temporary folder
            working_directory = self._make_temp_folder()

            # create dictionary for parameter file
            parameter_dict = self.basic_parameters.copy()
171
172

            # set the parameters for a numerical run
Theo Steininger's avatar
Theo Steininger committed
173
            parameter_dict['B_field_interp'] = 'T'
174
175
            parameter_dict['use_B_analytic'] = 'F'

176
177
178
179
180
181
182
183
184
            self._build_parameter_dict(
                                    parameter_dict=parameter_dict,
                                    magnetic_field=magnetic_field,
                                    working_directory=working_directory,
                                    local_ensemble_index=local_ensemble_index)

            self._write_parameter_dict(parameter_dict=parameter_dict,
                                       working_directory=working_directory)

185
            # call hammurabi
186
            self._call_hammurabi(working_directory)
187
188
189
190
191
192
193
194
195
196
197
198

            # if hammurabi failed, _fill_observable_dict will fail
            try:
                self._fill_observable_dict(observable_dict,
                                           working_directory,
                                           local_ensemble_index)
            except:
                self.logger.critical("Hammurabi failed! Last call log:\n" +
                                     self.last_call_log)
                raise
            finally:
                self._remove_folder(working_directory)
199

200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
        if self.analytic_ensemble_mean:
            # compute ensemble_mean on last node and add it to the observables
            rank = dummy.comm.rank
            size = dummy.comm.size

            if rank + 1 == size:
                self.logger.debug("Processing ensemble mean.")
                # create a temporary folder
                working_directory = self._make_temp_folder()

                # create dictionary for parameter file
                parameter_dict = self.basic_parameters.copy()

                # set the parameters for an analytic run
                parameter_dict['use_B_analytic'] = 'T'

                self._build_parameter_dict(
                                        parameter_dict=parameter_dict,
                                        magnetic_field=magnetic_field,
                                        working_directory=working_directory,
                                        local_ensemble_index=None)

                self._write_parameter_dict(parameter_dict=parameter_dict,
                                           working_directory=working_directory)

                # call hammurabi
                self._call_hammurabi(working_directory)

                # if hammurabi failed, _add_ensemble_mean will fail
                try:
                    self._add_ensemble_mean(observable_dict, working_directory)
                except:
                    self.logger.critical("Hammurabi failed! Last call log:\n" +
                                         self.last_call_log)
                    raise
                finally:
                    self._remove_folder(working_directory)
            else:
                working_directory = None
239
240
                self._add_ensemble_mean(observable_dict, working_directory)

241
        return observable_dict