hammurapy.py 9.87 KB
Newer Older
1
2
3
4
5
# -*- coding: utf-8 -*-

import abc
import os
import tempfile
6
import subprocess
7
8

import healpy
9
import numpy as np
10

11
from d2o import distributed_data_object
12
13

from imagine.observers.observer import Observer
14
from imagine.magnetic_fields.magnetic_field import MagneticField
15
16


17
class Hammurapy(Observer):
18
    def __init__(self, hammurabi_executable, conf_directory='./confs',
19
20
                 working_directory_base='.', nside=128,
                 analytic_ensemble_mean=False):
21
22
        self.hammurabi_executable = os.path.abspath(hammurabi_executable)
        self.conf_directory = os.path.abspath(conf_directory)
23
        self.working_directory_base = os.path.abspath(working_directory_base)
24

25
26
        self.analytic_ensemble_mean = bool(analytic_ensemble_mean)

27
        self.nside = int(nside)
28

29
30
        self.last_call_log = ""

31
32
33
34
35
        sync_template_fname = os.path.join(self.conf_directory,
                                           'IQU_sync.fits')
        dust_template_fname = os.path.join(self.conf_directory,
                                           'IQU_dust.fits')

36
        self.basic_parameters = {'B_ran_mem_lim': '6',
37
                                 'obs_shell_index_numb': '1',
38
39
                                 'total_shell_numb': '3',
                                 'vec_size_R': '500',
40
41
                                 'max_radius': '30',
                                 'max_z': '15',
42
43
44
45
46
47
                                 'B_field_transform_lon': '-999',
                                 'B_field_transform_lat': '-999',
                                 'TE_grid_filename': 'negrid_n400.bin',
                                 'TE_nx': '400',
                                 'TE_ny': '400',
                                 'TE_nz': '80',
48
                                 'TE_interp': 'F',
49
50
51
52
53
54
                                 'do_sync_emission': 'F',
                                 'do_rm': 'F',
                                 'do_dm': 'F',
                                 'do_dust': 'F',
                                 'do_tau': 'F',
                                 'do_ff': 'F',
55
56
57
                                 'obs_freq_GHz': '23',
                                 'sync_template_fname': sync_template_fname,
                                 'dust_template_fname': dust_template_fname
58
                                 }
59
60

    @abc.abstractproperty
61
62
    def magnetic_field_class(self):
        return MagneticField
63
64

    def _make_temp_folder(self):
65
        prefix = os.path.join(self.working_directory_base, 'temp_hammurabi_')
66
67
        return tempfile.mkdtemp(prefix=prefix)

68
69
70
71
72
73
74
75
76
77
78
79
80
81
    def _remove_folder(self, path):
        # Try multiple times in order to overcome 'Directory not empty' errors
        # Hopefully open file handles get closed in the meantime
        n = 0
        while n < 10:
            temp_process = subprocess.Popen(['rm', '-rf', path],
                                            stderr=subprocess.PIPE)
            # wait until process is finished
            errlog = temp_process.communicate()[1]
            # check if there were some errors
            if errlog == '':
                break
        else:
            self.logger.warning('Could not delete %s' % path)
82

83
    def _read_fits_file(self, path, name, nside):
84
85
86
87
88
89
90
        map_path = os.path.join(path, name)
        result_list = []
        i = 0
        while True:
            try:
                loaded_map = healpy.read_map(map_path, verbose=False,
                                             field=i)
91
                loaded_map = healpy.ud_grade(loaded_map, nside_out=nside)
92
93
94
95
96
97
98
99
100
101
102
103
104
105
                result_list += [loaded_map]
                i += 1
            except IndexError:
                break
        return result_list

    def _call_hammurabi(self, path):
        parameters_file_path = os.path.join(path, 'parameters.txt')
        temp_process = subprocess.Popen([self.hammurabi_executable,
                                         parameters_file_path],
                                        stdout=subprocess.PIPE,
                                        cwd=self.conf_directory)
        self.last_call_log = temp_process.communicate()[0]

106
107
    def _initialize_observable_dict(self, observable_dict, magnetic_field):
        pass
108

109
110
111
112
113
    def _build_parameter_dict(self, parameter_dict, magnetic_field,
                              working_directory, local_ensemble_index):
        grid_space = magnetic_field.domain[1]
        lx, ly, lz = np.array(grid_space.shape)*np.array(grid_space.distances)
        nx, ny, nz = grid_space.shape
114
115
116
        if local_ensemble_index is not None:
            random_seed = magnetic_field.random_seed[local_ensemble_index]
            parameter_dict.update({'B_field_seed': random_seed})
117
118
119
120
121
122
123
124

        parameter_dict.update({'B_field_lx': lx,
                               'B_field_ly': ly,
                               'B_field_lz': lz,
                               'B_field_nx': nx,
                               'B_field_ny': ny,
                               'B_field_nz': nz,
                               })
125
        parameter_dict.update({'obs_NSIDE': self.nside})
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140

    def _write_parameter_dict(self, parameter_dict, working_directory):
        parameters_string = ''
        for item in parameter_dict:
            parameters_string += item + '=' + str(parameter_dict[item]) + '\n'

        parameters_file_path = os.path.join(working_directory,
                                            'parameters.txt')
        with open(parameters_file_path, 'wb') as config_file:
            config_file.write(parameters_string)

    def _fill_observable_dict(self, observable_dict, working_directory,
                              ensemble_index):
        return observable_dict

141
142
143
    def _add_ensemble_mean(self, observable_dict, working_directory):
        pass

144
145
    def __call__(self, magnetic_field):

146
147
148
149
        if not isinstance(magnetic_field, self.magnetic_field_class):
            raise ValueError("Given magnetic field is not a subclass of" +
                             " %s" % str(self.magnetic_field_class))

150
151
152
153
154
155
156
157
        observable_dict = {}
        self._initialize_observable_dict(observable_dict=observable_dict,
                                         magnetic_field=magnetic_field)

        # iterate over ensemble and put result into result_observable
        # get the local shape by creating a dummy d2o
        ensemble_number = magnetic_field.shape[0]
        dummy = distributed_data_object(global_shape=(ensemble_number,),
158
159
                                        distribution_strategy='equal',
                                        dtype=np.float)
160
161
162

        local_length = dummy.distributor.local_length
        for local_ensemble_index in xrange(local_length):
Theo Steininger's avatar
Theo Steininger committed
163
164
            self.logger.debug("Processing local_ensemble_index %i." %
                              local_ensemble_index)
165
166
167
168
169
            # create a temporary folder
            working_directory = self._make_temp_folder()

            # create dictionary for parameter file
            parameter_dict = self.basic_parameters.copy()
170
171

            # set the parameters for a numerical run
Theo Steininger's avatar
Theo Steininger committed
172
            parameter_dict['B_field_interp'] = 'F'
173
174
            parameter_dict['use_B_analytic'] = 'F'

175
176
177
178
179
180
181
182
183
            self._build_parameter_dict(
                                    parameter_dict=parameter_dict,
                                    magnetic_field=magnetic_field,
                                    working_directory=working_directory,
                                    local_ensemble_index=local_ensemble_index)

            self._write_parameter_dict(parameter_dict=parameter_dict,
                                       working_directory=working_directory)

184
            # call hammurabi
185
            self._call_hammurabi(working_directory)
186
187
188
189
190
191
192
193
194
195
196
197

            # if hammurabi failed, _fill_observable_dict will fail
            try:
                self._fill_observable_dict(observable_dict,
                                           working_directory,
                                           local_ensemble_index)
            except:
                self.logger.critical("Hammurabi failed! Last call log:\n" +
                                     self.last_call_log)
                raise
            finally:
                self._remove_folder(working_directory)
198

199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
        if self.analytic_ensemble_mean:
            # compute ensemble_mean on last node and add it to the observables
            rank = dummy.comm.rank
            size = dummy.comm.size

            if rank + 1 == size:
                self.logger.debug("Processing ensemble mean.")
                # create a temporary folder
                working_directory = self._make_temp_folder()

                # create dictionary for parameter file
                parameter_dict = self.basic_parameters.copy()

                # set the parameters for an analytic run
                parameter_dict['use_B_analytic'] = 'T'

                self._build_parameter_dict(
                                        parameter_dict=parameter_dict,
                                        magnetic_field=magnetic_field,
                                        working_directory=working_directory,
                                        local_ensemble_index=None)

                self._write_parameter_dict(parameter_dict=parameter_dict,
                                           working_directory=working_directory)

                # call hammurabi
                self._call_hammurabi(working_directory)

                # if hammurabi failed, _add_ensemble_mean will fail
                try:
                    self._add_ensemble_mean(observable_dict, working_directory)
                except:
                    self.logger.critical("Hammurabi failed! Last call log:\n" +
                                         self.last_call_log)
                    raise
                finally:
                    self._remove_folder(working_directory)
            else:
                working_directory = None
238
239
                self._add_ensemble_mean(observable_dict, working_directory)

240
        return observable_dict