hammurapy.py 7.31 KB
Newer Older
1 2 3 4 5
# -*- coding: utf-8 -*-

import abc
import os
import tempfile
6
import subprocess
7 8

import healpy
9
import numpy as np
10

11
from d2o import distributed_data_object
12 13

from imagine.observers.observer import Observer
14
from imagine.magnetic_fields.magnetic_field import MagneticField
15 16


17
class Hammurapy(Observer):
18
    def __init__(self, hammurabi_executable, conf_directory='./confs',
19
                 working_directory_base='.', nside=128):
20 21
        self.hammurabi_executable = os.path.abspath(hammurabi_executable)
        self.conf_directory = os.path.abspath(conf_directory)
22
        self.working_directory_base = os.path.abspath(working_directory_base)
23

24
        self.nside = int(nside)
25

26 27
        self.last_call_log = ""

28 29 30 31 32
        sync_template_fname = os.path.join(self.conf_directory,
                                           'IQU_sync.fits')
        dust_template_fname = os.path.join(self.conf_directory,
                                           'IQU_dust.fits')

33
        self.basic_parameters = {'obs_shell_index_numb': '1',
34 35
                                 'total_shell_numb': '3',
                                 'vec_size_R': '500',
36 37 38 39 40 41 42
                                 'max_radius': '35',
                                 'B_field_transform_lon': '-999',
                                 'B_field_transform_lat': '-999',
                                 'TE_grid_filename': 'negrid_n400.bin',
                                 'TE_nx': '400',
                                 'TE_ny': '400',
                                 'TE_nz': '80',
Theo Steininger's avatar
Theo Steininger committed
43
                                 'TE_interp': 'T',
44 45 46 47 48 49
                                 'do_sync_emission': 'F',
                                 'do_rm': 'F',
                                 'do_dm': 'F',
                                 'do_dust': 'F',
                                 'do_tau': 'F',
                                 'do_ff': 'F',
50 51 52
                                 'obs_freq_GHz': '23',
                                 'sync_template_fname': sync_template_fname,
                                 'dust_template_fname': dust_template_fname
53
                                 }
54 55

    @abc.abstractproperty
56 57
    def magnetic_field_class(self):
        return MagneticField
58 59

    def _make_temp_folder(self):
60
        prefix = os.path.join(self.working_directory_base, 'temp_hammurabi_')
61 62
        return tempfile.mkdtemp(prefix=prefix)

63 64 65 66 67 68 69 70 71 72 73 74 75 76
    def _remove_folder(self, path):
        # Try multiple times in order to overcome 'Directory not empty' errors
        # Hopefully open file handles get closed in the meantime
        n = 0
        while n < 10:
            temp_process = subprocess.Popen(['rm', '-rf', path],
                                            stderr=subprocess.PIPE)
            # wait until process is finished
            errlog = temp_process.communicate()[1]
            # check if there were some errors
            if errlog == '':
                break
        else:
            self.logger.warning('Could not delete %s' % path)
77

78
    def _read_fits_file(self, path, name, nside):
79 80 81 82 83 84 85
        map_path = os.path.join(path, name)
        result_list = []
        i = 0
        while True:
            try:
                loaded_map = healpy.read_map(map_path, verbose=False,
                                             field=i)
86
                loaded_map = healpy.ud_grade(loaded_map, nside_out=nside)
87 88 89 90 91 92 93 94 95 96 97 98 99 100
                result_list += [loaded_map]
                i += 1
            except IndexError:
                break
        return result_list

    def _call_hammurabi(self, path):
        parameters_file_path = os.path.join(path, 'parameters.txt')
        temp_process = subprocess.Popen([self.hammurabi_executable,
                                         parameters_file_path],
                                        stdout=subprocess.PIPE,
                                        cwd=self.conf_directory)
        self.last_call_log = temp_process.communicate()[0]

101 102
    def _initialize_observable_dict(self, observable_dict, magnetic_field):
        pass
103

104 105 106 107 108
    def _build_parameter_dict(self, parameter_dict, magnetic_field,
                              working_directory, local_ensemble_index):
        grid_space = magnetic_field.domain[1]
        lx, ly, lz = np.array(grid_space.shape)*np.array(grid_space.distances)
        nx, ny, nz = grid_space.shape
109
        random_seed = magnetic_field.random_seed[local_ensemble_index]
110 111 112 113 114 115 116 117

        parameter_dict.update({'B_field_lx': lx,
                               'B_field_ly': ly,
                               'B_field_lz': lz,
                               'B_field_nx': nx,
                               'B_field_ny': ny,
                               'B_field_nz': nz,
                               })
118
        parameter_dict.update({'obs_NSIDE': self.nside})
119
        parameter_dict.update({'B_field_seed': random_seed})
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136

    def _write_parameter_dict(self, parameter_dict, working_directory):
        parameters_string = ''
        for item in parameter_dict:
            parameters_string += item + '=' + str(parameter_dict[item]) + '\n'

        parameters_file_path = os.path.join(working_directory,
                                            'parameters.txt')
        with open(parameters_file_path, 'wb') as config_file:
            config_file.write(parameters_string)

    def _fill_observable_dict(self, observable_dict, working_directory,
                              ensemble_index):
        return observable_dict

    def __call__(self, magnetic_field):

137 138 139 140
        if not isinstance(magnetic_field, self.magnetic_field_class):
            raise ValueError("Given magnetic field is not a subclass of" +
                             " %s" % str(self.magnetic_field_class))

141 142 143 144 145 146 147 148
        observable_dict = {}
        self._initialize_observable_dict(observable_dict=observable_dict,
                                         magnetic_field=magnetic_field)

        # iterate over ensemble and put result into result_observable
        # get the local shape by creating a dummy d2o
        ensemble_number = magnetic_field.shape[0]
        dummy = distributed_data_object(global_shape=(ensemble_number,),
149 150
                                        distribution_strategy='equal',
                                        dtype=np.float)
151 152 153

        local_length = dummy.distributor.local_length
        for local_ensemble_index in xrange(local_length):
Theo Steininger's avatar
Theo Steininger committed
154 155
            self.logger.debug("Processing local_ensemble_index %i." %
                              local_ensemble_index)
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
            # create a temporary folder
            working_directory = self._make_temp_folder()

            # create dictionary for parameter file
            parameter_dict = self.basic_parameters.copy()
            self._build_parameter_dict(
                                    parameter_dict=parameter_dict,
                                    magnetic_field=magnetic_field,
                                    working_directory=working_directory,
                                    local_ensemble_index=local_ensemble_index)

            self._write_parameter_dict(parameter_dict=parameter_dict,
                                       working_directory=working_directory)

            self._call_hammurabi(working_directory)
            self._fill_observable_dict(observable_dict,
                                       working_directory,
                                       local_ensemble_index)
            self._remove_folder(working_directory)

        return observable_dict