hammurapy.py 9.82 KB
Newer Older
1
2
3
4
5
# -*- coding: utf-8 -*-

import abc
import os
import tempfile
6
import subprocess
7
8

import healpy
9
import numpy as np
10

11
from d2o import distributed_data_object
12
13

from imagine.observers.observer import Observer
14
from imagine.magnetic_fields.magnetic_field import MagneticField
15
16


17
class Hammurapy(Observer):
18
    def __init__(self, hammurabi_executable, conf_directory='./confs',
19
20
                 working_directory_base='.', nside=128,
                 analytic_ensemble_mean=False):
21
22
        self.hammurabi_executable = os.path.abspath(hammurabi_executable)
        self.conf_directory = os.path.abspath(conf_directory)
23
        self.working_directory_base = os.path.abspath(working_directory_base)
24

25
26
        self.analytic_ensemble_mean = bool(analytic_ensemble_mean)

27
        self.nside = int(nside)
28

29
30
        self.last_call_log = ""

31
32
33
34
35
        sync_template_fname = os.path.join(self.conf_directory,
                                           'IQU_sync.fits')
        dust_template_fname = os.path.join(self.conf_directory,
                                           'IQU_dust.fits')

36
        self.basic_parameters = {'B_ran_mem_lim': '6',
37
                                 'obs_shell_index_numb': '1',
38
39
                                 'total_shell_numb': '3',
                                 'vec_size_R': '500',
40
41
42
43
44
45
46
                                 'max_radius': '35',
                                 'B_field_transform_lon': '-999',
                                 'B_field_transform_lat': '-999',
                                 'TE_grid_filename': 'negrid_n400.bin',
                                 'TE_nx': '400',
                                 'TE_ny': '400',
                                 'TE_nz': '80',
47
                                 'TE_interp': 'F',
48
49
50
51
52
53
                                 'do_sync_emission': 'F',
                                 'do_rm': 'F',
                                 'do_dm': 'F',
                                 'do_dust': 'F',
                                 'do_tau': 'F',
                                 'do_ff': 'F',
54
55
56
                                 'obs_freq_GHz': '23',
                                 'sync_template_fname': sync_template_fname,
                                 'dust_template_fname': dust_template_fname
57
                                 }
58
59

    @abc.abstractproperty
60
61
    def magnetic_field_class(self):
        return MagneticField
62
63

    def _make_temp_folder(self):
64
        prefix = os.path.join(self.working_directory_base, 'temp_hammurabi_')
65
66
        return tempfile.mkdtemp(prefix=prefix)

67
68
69
70
71
72
73
74
75
76
77
78
79
80
    def _remove_folder(self, path):
        # Try multiple times in order to overcome 'Directory not empty' errors
        # Hopefully open file handles get closed in the meantime
        n = 0
        while n < 10:
            temp_process = subprocess.Popen(['rm', '-rf', path],
                                            stderr=subprocess.PIPE)
            # wait until process is finished
            errlog = temp_process.communicate()[1]
            # check if there were some errors
            if errlog == '':
                break
        else:
            self.logger.warning('Could not delete %s' % path)
81

82
    def _read_fits_file(self, path, name, nside):
83
84
85
86
87
88
89
        map_path = os.path.join(path, name)
        result_list = []
        i = 0
        while True:
            try:
                loaded_map = healpy.read_map(map_path, verbose=False,
                                             field=i)
90
                loaded_map = healpy.ud_grade(loaded_map, nside_out=nside)
91
92
93
94
95
96
97
98
99
100
101
102
103
104
                result_list += [loaded_map]
                i += 1
            except IndexError:
                break
        return result_list

    def _call_hammurabi(self, path):
        parameters_file_path = os.path.join(path, 'parameters.txt')
        temp_process = subprocess.Popen([self.hammurabi_executable,
                                         parameters_file_path],
                                        stdout=subprocess.PIPE,
                                        cwd=self.conf_directory)
        self.last_call_log = temp_process.communicate()[0]

105
106
    def _initialize_observable_dict(self, observable_dict, magnetic_field):
        pass
107

108
109
110
111
112
    def _build_parameter_dict(self, parameter_dict, magnetic_field,
                              working_directory, local_ensemble_index):
        grid_space = magnetic_field.domain[1]
        lx, ly, lz = np.array(grid_space.shape)*np.array(grid_space.distances)
        nx, ny, nz = grid_space.shape
113
114
115
        if local_ensemble_index is not None:
            random_seed = magnetic_field.random_seed[local_ensemble_index]
            parameter_dict.update({'B_field_seed': random_seed})
116
117
118
119
120
121
122
123

        parameter_dict.update({'B_field_lx': lx,
                               'B_field_ly': ly,
                               'B_field_lz': lz,
                               'B_field_nx': nx,
                               'B_field_ny': ny,
                               'B_field_nz': nz,
                               })
124
        parameter_dict.update({'obs_NSIDE': self.nside})
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139

    def _write_parameter_dict(self, parameter_dict, working_directory):
        parameters_string = ''
        for item in parameter_dict:
            parameters_string += item + '=' + str(parameter_dict[item]) + '\n'

        parameters_file_path = os.path.join(working_directory,
                                            'parameters.txt')
        with open(parameters_file_path, 'wb') as config_file:
            config_file.write(parameters_string)

    def _fill_observable_dict(self, observable_dict, working_directory,
                              ensemble_index):
        return observable_dict

140
141
142
    def _add_ensemble_mean(self, observable_dict, working_directory):
        pass

143
144
    def __call__(self, magnetic_field):

145
146
147
148
        if not isinstance(magnetic_field, self.magnetic_field_class):
            raise ValueError("Given magnetic field is not a subclass of" +
                             " %s" % str(self.magnetic_field_class))

149
150
151
152
153
154
155
156
        observable_dict = {}
        self._initialize_observable_dict(observable_dict=observable_dict,
                                         magnetic_field=magnetic_field)

        # iterate over ensemble and put result into result_observable
        # get the local shape by creating a dummy d2o
        ensemble_number = magnetic_field.shape[0]
        dummy = distributed_data_object(global_shape=(ensemble_number,),
157
158
                                        distribution_strategy='equal',
                                        dtype=np.float)
159
160
161

        local_length = dummy.distributor.local_length
        for local_ensemble_index in xrange(local_length):
Theo Steininger's avatar
Theo Steininger committed
162
163
            self.logger.debug("Processing local_ensemble_index %i." %
                              local_ensemble_index)
164
165
166
167
168
            # create a temporary folder
            working_directory = self._make_temp_folder()

            # create dictionary for parameter file
            parameter_dict = self.basic_parameters.copy()
169
170
171
172
173

            # set the parameters for a numerical run
            parameter_dict['B_field_interp'] = 'T'
            parameter_dict['use_B_analytic'] = 'F'

174
175
176
177
178
179
180
181
182
            self._build_parameter_dict(
                                    parameter_dict=parameter_dict,
                                    magnetic_field=magnetic_field,
                                    working_directory=working_directory,
                                    local_ensemble_index=local_ensemble_index)

            self._write_parameter_dict(parameter_dict=parameter_dict,
                                       working_directory=working_directory)

183
            # call hammurabi
184
            self._call_hammurabi(working_directory)
185
186
187
188
189
190
191
192
193
194
195
196

            # if hammurabi failed, _fill_observable_dict will fail
            try:
                self._fill_observable_dict(observable_dict,
                                           working_directory,
                                           local_ensemble_index)
            except:
                self.logger.critical("Hammurabi failed! Last call log:\n" +
                                     self.last_call_log)
                raise
            finally:
                self._remove_folder(working_directory)
197

198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
        if self.analytic_ensemble_mean:
            # compute ensemble_mean on last node and add it to the observables
            rank = dummy.comm.rank
            size = dummy.comm.size

            if rank + 1 == size:
                self.logger.debug("Processing ensemble mean.")
                # create a temporary folder
                working_directory = self._make_temp_folder()

                # create dictionary for parameter file
                parameter_dict = self.basic_parameters.copy()

                # set the parameters for an analytic run
                parameter_dict['use_B_analytic'] = 'T'

                self._build_parameter_dict(
                                        parameter_dict=parameter_dict,
                                        magnetic_field=magnetic_field,
                                        working_directory=working_directory,
                                        local_ensemble_index=None)

                self._write_parameter_dict(parameter_dict=parameter_dict,
                                           working_directory=working_directory)

                # call hammurabi
                self._call_hammurabi(working_directory)

                # if hammurabi failed, _add_ensemble_mean will fail
                try:
                    self._add_ensemble_mean(observable_dict, working_directory)
                except:
                    self.logger.critical("Hammurabi failed! Last call log:\n" +
                                         self.last_call_log)
                    raise
                finally:
                    self._remove_folder(working_directory)
            else:
                working_directory = None
237
238
                self._add_ensemble_mean(observable_dict, working_directory)

239
        return observable_dict