hammurapy.py 7.31 KB
Newer Older
1
2
3
4
5
# -*- coding: utf-8 -*-

import abc
import os
import tempfile
6
import subprocess
7
8

import healpy
9
import numpy as np
10

11
from d2o import distributed_data_object
12
13

from imagine.observers.observer import Observer
14
from imagine.magnetic_fields.magnetic_field import MagneticField
15
16


17
class Hammurapy(Observer):
18
    def __init__(self, hammurabi_executable, conf_directory='./confs',
19
                 working_directory_base='.', nside=128):
20
21
        self.hammurabi_executable = os.path.abspath(hammurabi_executable)
        self.conf_directory = os.path.abspath(conf_directory)
22
        self.working_directory_base = os.path.abspath(working_directory_base)
23

24
        self.nside = int(nside)
25

26
27
        self.last_call_log = ""

28
29
30
31
32
        sync_template_fname = os.path.join(self.conf_directory,
                                           'IQU_sync.fits')
        dust_template_fname = os.path.join(self.conf_directory,
                                           'IQU_dust.fits')

33
        self.basic_parameters = {'obs_shell_index_numb': '1',
34
35
                                 'total_shell_numb': '3',
                                 'vec_size_R': '500',
36
37
38
39
40
41
42
                                 'max_radius': '35',
                                 'B_field_transform_lon': '-999',
                                 'B_field_transform_lat': '-999',
                                 'TE_grid_filename': 'negrid_n400.bin',
                                 'TE_nx': '400',
                                 'TE_ny': '400',
                                 'TE_nz': '80',
Theo Steininger's avatar
Theo Steininger committed
43
                                 'TE_interp': 'T',
44
45
46
47
48
49
                                 'do_sync_emission': 'F',
                                 'do_rm': 'F',
                                 'do_dm': 'F',
                                 'do_dust': 'F',
                                 'do_tau': 'F',
                                 'do_ff': 'F',
50
51
52
                                 'obs_freq_GHz': '23',
                                 'sync_template_fname': sync_template_fname,
                                 'dust_template_fname': dust_template_fname
53
                                 }
54
55

    @abc.abstractproperty
56
57
    def magnetic_field_class(self):
        return MagneticField
58
59

    def _make_temp_folder(self):
60
        prefix = os.path.join(self.working_directory_base, 'temp_hammurabi_')
61
62
        return tempfile.mkdtemp(prefix=prefix)

63
64
65
66
67
68
69
70
71
72
73
74
75
76
    def _remove_folder(self, path):
        # Try multiple times in order to overcome 'Directory not empty' errors
        # Hopefully open file handles get closed in the meantime
        n = 0
        while n < 10:
            temp_process = subprocess.Popen(['rm', '-rf', path],
                                            stderr=subprocess.PIPE)
            # wait until process is finished
            errlog = temp_process.communicate()[1]
            # check if there were some errors
            if errlog == '':
                break
        else:
            self.logger.warning('Could not delete %s' % path)
77

78
    def _read_fits_file(self, path, name, nside):
79
80
81
82
83
84
85
        map_path = os.path.join(path, name)
        result_list = []
        i = 0
        while True:
            try:
                loaded_map = healpy.read_map(map_path, verbose=False,
                                             field=i)
86
                loaded_map = healpy.ud_grade(loaded_map, nside_out=nside)
87
88
89
90
91
92
93
94
95
96
97
98
99
100
                result_list += [loaded_map]
                i += 1
            except IndexError:
                break
        return result_list

    def _call_hammurabi(self, path):
        parameters_file_path = os.path.join(path, 'parameters.txt')
        temp_process = subprocess.Popen([self.hammurabi_executable,
                                         parameters_file_path],
                                        stdout=subprocess.PIPE,
                                        cwd=self.conf_directory)
        self.last_call_log = temp_process.communicate()[0]

101
102
    def _initialize_observable_dict(self, observable_dict, magnetic_field):
        pass
103

104
105
106
107
108
    def _build_parameter_dict(self, parameter_dict, magnetic_field,
                              working_directory, local_ensemble_index):
        grid_space = magnetic_field.domain[1]
        lx, ly, lz = np.array(grid_space.shape)*np.array(grid_space.distances)
        nx, ny, nz = grid_space.shape
109
        random_seed = magnetic_field.random_seed[local_ensemble_index]
110
111
112
113
114
115
116
117

        parameter_dict.update({'B_field_lx': lx,
                               'B_field_ly': ly,
                               'B_field_lz': lz,
                               'B_field_nx': nx,
                               'B_field_ny': ny,
                               'B_field_nz': nz,
                               })
118
        parameter_dict.update({'obs_NSIDE': self.nside})
119
        parameter_dict.update({'B_field_seed': random_seed})
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136

    def _write_parameter_dict(self, parameter_dict, working_directory):
        parameters_string = ''
        for item in parameter_dict:
            parameters_string += item + '=' + str(parameter_dict[item]) + '\n'

        parameters_file_path = os.path.join(working_directory,
                                            'parameters.txt')
        with open(parameters_file_path, 'wb') as config_file:
            config_file.write(parameters_string)

    def _fill_observable_dict(self, observable_dict, working_directory,
                              ensemble_index):
        return observable_dict

    def __call__(self, magnetic_field):

137
138
139
140
        if not isinstance(magnetic_field, self.magnetic_field_class):
            raise ValueError("Given magnetic field is not a subclass of" +
                             " %s" % str(self.magnetic_field_class))

141
142
143
144
145
146
147
148
        observable_dict = {}
        self._initialize_observable_dict(observable_dict=observable_dict,
                                         magnetic_field=magnetic_field)

        # iterate over ensemble and put result into result_observable
        # get the local shape by creating a dummy d2o
        ensemble_number = magnetic_field.shape[0]
        dummy = distributed_data_object(global_shape=(ensemble_number,),
149
150
                                        distribution_strategy='equal',
                                        dtype=np.float)
151
152
153

        local_length = dummy.distributor.local_length
        for local_ensemble_index in xrange(local_length):
Theo Steininger's avatar
Theo Steininger committed
154
155
            self.logger.debug("Processing local_ensemble_index %i." %
                              local_ensemble_index)
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
            # create a temporary folder
            working_directory = self._make_temp_folder()

            # create dictionary for parameter file
            parameter_dict = self.basic_parameters.copy()
            self._build_parameter_dict(
                                    parameter_dict=parameter_dict,
                                    magnetic_field=magnetic_field,
                                    working_directory=working_directory,
                                    local_ensemble_index=local_ensemble_index)

            self._write_parameter_dict(parameter_dict=parameter_dict,
                                       working_directory=working_directory)

            self._call_hammurabi(working_directory)
            self._fill_observable_dict(observable_dict,
                                       working_directory,
                                       local_ensemble_index)
            self._remove_folder(working_directory)

        return observable_dict