hammurapy.py 9.55 KB
Newer Older
1
2
3
4
5
# -*- coding: utf-8 -*-

import abc
import os
import tempfile
6
import subprocess
7
8

import healpy
9
import numpy as np
10

11
from d2o import distributed_data_object
12
13

from imagine.observers.observer import Observer
14
from imagine.magnetic_fields.magnetic_field import MagneticField
15
16


17
class Hammurapy(Observer):
18
    def __init__(self, hammurabi_executable, conf_directory='./confs',
19
                 working_directory_base='.', nside=128):
20
21
        self.hammurabi_executable = os.path.abspath(hammurabi_executable)
        self.conf_directory = os.path.abspath(conf_directory)
22
        self.working_directory_base = os.path.abspath(working_directory_base)
23

24
        self.nside = int(nside)
25

26
27
        self.last_call_log = ""

28
29
30
31
32
        sync_template_fname = os.path.join(self.conf_directory,
                                           'IQU_sync.fits')
        dust_template_fname = os.path.join(self.conf_directory,
                                           'IQU_dust.fits')

33
34
        self.basic_parameters = {'B_ran_mem_lim': '4',
                                 'obs_shell_index_numb': '1',
35
36
                                 'total_shell_numb': '3',
                                 'vec_size_R': '500',
37
38
39
40
41
42
43
                                 'max_radius': '35',
                                 'B_field_transform_lon': '-999',
                                 'B_field_transform_lat': '-999',
                                 'TE_grid_filename': 'negrid_n400.bin',
                                 'TE_nx': '400',
                                 'TE_ny': '400',
                                 'TE_nz': '80',
Theo Steininger's avatar
Theo Steininger committed
44
                                 'TE_interp': 'T',
45
46
47
48
49
50
                                 'do_sync_emission': 'F',
                                 'do_rm': 'F',
                                 'do_dm': 'F',
                                 'do_dust': 'F',
                                 'do_tau': 'F',
                                 'do_ff': 'F',
51
52
53
                                 'obs_freq_GHz': '23',
                                 'sync_template_fname': sync_template_fname,
                                 'dust_template_fname': dust_template_fname
54
                                 }
55
56

    @abc.abstractproperty
57
58
    def magnetic_field_class(self):
        return MagneticField
59
60

    def _make_temp_folder(self):
61
        prefix = os.path.join(self.working_directory_base, 'temp_hammurabi_')
62
63
        return tempfile.mkdtemp(prefix=prefix)

64
65
66
67
68
69
70
71
72
73
74
75
76
77
    def _remove_folder(self, path):
        # Try multiple times in order to overcome 'Directory not empty' errors
        # Hopefully open file handles get closed in the meantime
        n = 0
        while n < 10:
            temp_process = subprocess.Popen(['rm', '-rf', path],
                                            stderr=subprocess.PIPE)
            # wait until process is finished
            errlog = temp_process.communicate()[1]
            # check if there were some errors
            if errlog == '':
                break
        else:
            self.logger.warning('Could not delete %s' % path)
78

79
    def _read_fits_file(self, path, name, nside):
80
81
82
83
84
85
86
        map_path = os.path.join(path, name)
        result_list = []
        i = 0
        while True:
            try:
                loaded_map = healpy.read_map(map_path, verbose=False,
                                             field=i)
87
                loaded_map = healpy.ud_grade(loaded_map, nside_out=nside)
88
89
90
91
92
93
94
95
96
97
98
99
100
101
                result_list += [loaded_map]
                i += 1
            except IndexError:
                break
        return result_list

    def _call_hammurabi(self, path):
        parameters_file_path = os.path.join(path, 'parameters.txt')
        temp_process = subprocess.Popen([self.hammurabi_executable,
                                         parameters_file_path],
                                        stdout=subprocess.PIPE,
                                        cwd=self.conf_directory)
        self.last_call_log = temp_process.communicate()[0]

102
103
    def _initialize_observable_dict(self, observable_dict, magnetic_field):
        pass
104

105
106
107
108
109
    def _build_parameter_dict(self, parameter_dict, magnetic_field,
                              working_directory, local_ensemble_index):
        grid_space = magnetic_field.domain[1]
        lx, ly, lz = np.array(grid_space.shape)*np.array(grid_space.distances)
        nx, ny, nz = grid_space.shape
110
111
112
        if local_ensemble_index is not None:
            random_seed = magnetic_field.random_seed[local_ensemble_index]
            parameter_dict.update({'B_field_seed': random_seed})
113
114
115
116
117
118
119
120

        parameter_dict.update({'B_field_lx': lx,
                               'B_field_ly': ly,
                               'B_field_lz': lz,
                               'B_field_nx': nx,
                               'B_field_ny': ny,
                               'B_field_nz': nz,
                               })
121
        parameter_dict.update({'obs_NSIDE': self.nside})
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136

    def _write_parameter_dict(self, parameter_dict, working_directory):
        parameters_string = ''
        for item in parameter_dict:
            parameters_string += item + '=' + str(parameter_dict[item]) + '\n'

        parameters_file_path = os.path.join(working_directory,
                                            'parameters.txt')
        with open(parameters_file_path, 'wb') as config_file:
            config_file.write(parameters_string)

    def _fill_observable_dict(self, observable_dict, working_directory,
                              ensemble_index):
        return observable_dict

137
138
139
    def _add_ensemble_mean(self, observable_dict, working_directory):
        pass

140
141
    def __call__(self, magnetic_field):

142
143
144
145
        if not isinstance(magnetic_field, self.magnetic_field_class):
            raise ValueError("Given magnetic field is not a subclass of" +
                             " %s" % str(self.magnetic_field_class))

146
147
148
149
150
151
152
153
        observable_dict = {}
        self._initialize_observable_dict(observable_dict=observable_dict,
                                         magnetic_field=magnetic_field)

        # iterate over ensemble and put result into result_observable
        # get the local shape by creating a dummy d2o
        ensemble_number = magnetic_field.shape[0]
        dummy = distributed_data_object(global_shape=(ensemble_number,),
154
155
                                        distribution_strategy='equal',
                                        dtype=np.float)
156
157
158

        local_length = dummy.distributor.local_length
        for local_ensemble_index in xrange(local_length):
Theo Steininger's avatar
Theo Steininger committed
159
160
            self.logger.debug("Processing local_ensemble_index %i." %
                              local_ensemble_index)
161
162
163
164
165
            # create a temporary folder
            working_directory = self._make_temp_folder()

            # create dictionary for parameter file
            parameter_dict = self.basic_parameters.copy()
166
167
168
169
170

            # set the parameters for a numerical run
            parameter_dict['B_field_interp'] = 'T'
            parameter_dict['use_B_analytic'] = 'F'

171
172
173
174
175
176
177
178
179
            self._build_parameter_dict(
                                    parameter_dict=parameter_dict,
                                    magnetic_field=magnetic_field,
                                    working_directory=working_directory,
                                    local_ensemble_index=local_ensemble_index)

            self._write_parameter_dict(parameter_dict=parameter_dict,
                                       working_directory=working_directory)

180
            # call hammurabi
181
            self._call_hammurabi(working_directory)
182
183
184
185
186
187
188
189
190
191
192
193

            # if hammurabi failed, _fill_observable_dict will fail
            try:
                self._fill_observable_dict(observable_dict,
                                           working_directory,
                                           local_ensemble_index)
            except:
                self.logger.critical("Hammurabi failed! Last call log:\n" +
                                     self.last_call_log)
                raise
            finally:
                self._remove_folder(working_directory)
194

195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
        # compute ensemble_mean on the last node and add it to the observables
        rank = dummy.comm.rank
        size = dummy.comm.size

        if rank + 1 == size:
            self.logger.debug("Processing ensemble mean.")
            # create a temporary folder
            working_directory = self._make_temp_folder()

            # create dictionary for parameter file
            parameter_dict = self.basic_parameters.copy()

            # set the parameters for an analytic run
            parameter_dict['use_B_analytic'] = 'T'

            self._build_parameter_dict(
                                    parameter_dict=parameter_dict,
                                    magnetic_field=magnetic_field,
                                    working_directory=working_directory,
                                    local_ensemble_index=None)

            self._write_parameter_dict(parameter_dict=parameter_dict,
                                       working_directory=working_directory)

            # call hammurabi
            self._call_hammurabi(working_directory)

            # if hammurabi failed, _add_ensemble_mean will fail
            try:
                self._add_ensemble_mean(observable_dict, working_directory)
            except:
                self.logger.critical("Hammurabi failed! Last call log:\n" +
                                     self.last_call_log)
                raise
            finally:
                self._remove_folder(working_directory)
        else:
            working_directory = None
            self._add_ensemble_mean(observable_dict, working_directory)

235
        return observable_dict