Skip to content
Snippets Groups Projects
TEST.py 16.62 KiB
#######################################################################
#                                                                     #
#  Copyright 2015 Max Planck Institute                                #
#                 for Dynamics and Self-Organization                  #
#                                                                     #
#  This file is part of bfps.                                         #
#                                                                     #
#  bfps is free software: you can redistribute it and/or modify       #
#  it under the terms of the GNU General Public License as published  #
#  by the Free Software Foundation, either version 3 of the License,  #
#  or (at your option) any later version.                             #
#                                                                     #
#  bfps is distributed in the hope that it will be useful,            #
#  but WITHOUT ANY WARRANTY; without even the implied warranty of     #
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the      #
#  GNU General Public License for more details.                       #
#                                                                     #
#  You should have received a copy of the GNU General Public License  #
#  along with bfps.  If not, see <http://www.gnu.org/licenses/>       #
#                                                                     #
# Contact: Cristian.Lalescu@ds.mpg.de                                 #
#                                                                     #
#######################################################################



import os
import sys
import shutil
import subprocess
import argparse
import h5py
import math
import numpy as np
import warnings

import bfps
from ._code import _code
from bfps import tools
from bfps import DNS

class TEST(_code):
    """This class is meant to stitch together the C++ code into a final source file,
    compile it, and handle all job launching.
    """
    def __init__(
            self,
            work_dir = './',
            simname = 'test'):
        _code.__init__(
                self,
                work_dir = work_dir,
                simname = simname)
        self.host_info = {'type'        : 'cluster',
                          'environment' : None,
                          'deltanprocs' : 1,
                          'queue'       : '',
                          'mail_address': '',
                          'mail_events' : None}
        self.generate_default_parameters()
        return None
    def set_precision(
            self,
            fluid_dtype):
        if fluid_dtype in [np.float32, np.float64]:
            self.fluid_dtype = fluid_dtype
        elif fluid_dtype in ['single', 'double']:
            if fluid_dtype == 'single':
                self.fluid_dtype = np.dtype(np.float32)
            elif fluid_dtype == 'double':
                self.fluid_dtype = np.dtype(np.float64)
        self.rtype = self.fluid_dtype
        if self.rtype == np.float32:
            self.ctype = np.dtype(np.complex64)
            self.C_field_dtype = 'float'
            self.fluid_precision = 'single'
        elif self.rtype == np.float64:
            self.ctype = np.dtype(np.complex128)
            self.C_field_dtype = 'double'
            self.fluid_precision = 'double'
        return None
    def write_src(self):
        self.version_message = (
                '/***********************************************************************\n' +
                '* this code automatically generated by bfps\n' +
                '* version {0}\n'.format(bfps.__version__) +
                '***********************************************************************/\n\n\n')
        self.include_list = [
                '"base.hpp"',
                '"scope_timer.hpp"',
                '"fftw_interface.hpp"',
                '"full_code/main_code.hpp"',
                '<cmath>',
                '<iostream>',
                '<hdf5.h>',
                '<string>',
                '<cstring>',
                '<fftw3-mpi.h>',
                '<omp.h>',
                '<cfenv>',
                '<cstdlib>',
                '"full_code/{0}.hpp"\n'.format(self.dns_type)]
        self.main = """
            int main(int argc, char *argv[])
            {{
                bool fpe = (
                    (getenv("BFPS_FPE_OFF") == nullptr) ||
                    (getenv("BFPS_FPE_OFF") != std::string("TRUE")));
                return main_code< {0} >(argc, argv, fpe);
            }}
            """.format(self.dns_type + '<{0}>'.format(self.C_field_dtype))
        self.includes = '\n'.join(
                ['#include ' + hh
                 for hh in self.include_list])
        with open(self.name + '.cpp', 'w') as outfile:
            outfile.write(self.version_message + '\n\n')
            outfile.write(self.includes + '\n\n')
            outfile.write(self.main + '\n')
        return None
    def generate_default_parameters(self):
        # these parameters are relevant for all TEST classes
        self.parameters['fftw_plan_rigor'] = 'FFTW_ESTIMATE'
        self.parameters['dealias_type'] = int(1)
        self.parameters['dkx'] = float(1.0)
        self.parameters['dky'] = float(1.0)
        self.parameters['dkz'] = float(1.0)
        self.parameters['filter_length'] = float(1.0)
        self.parameters['random_seed'] = int(1)
        return None
    def generate_extra_parameters(
            self,
            dns_type = None):
        pars = {}
        if dns_type == 'test_interpolation':
            pars['nparticles'] = 3
            pars['tracers0_integration_steps'] = int(4)
            pars['tracers0_neighbours'] = int(1)
            pars['tracers0_smoothness'] = int(1)
        return pars
    def get_kspace(self):
        kspace = {}
        if self.parameters['dealias_type'] == 1:
            kMx = self.parameters['dkx']*(self.parameters['nx']//2 - 1)
            kMy = self.parameters['dky']*(self.parameters['ny']//2 - 1)
            kMz = self.parameters['dkz']*(self.parameters['nz']//2 - 1)
        else:
            kMx = self.parameters['dkx']*(self.parameters['nx']//3 - 1)
            kMy = self.parameters['dky']*(self.parameters['ny']//3 - 1)
            kMz = self.parameters['dkz']*(self.parameters['nz']//3 - 1)
        kspace['kM'] = max(kMx, kMy, kMz)
        kspace['dk'] = min(self.parameters['dkx'],
                           self.parameters['dky'],
                           self.parameters['dkz'])
        nshells = int(kspace['kM'] / kspace['dk']) + 2
        kspace['nshell'] = np.zeros(nshells, dtype = np.int64)
        kspace['kshell'] = np.zeros(nshells, dtype = np.float64)
        kspace['kx'] = np.arange( 0,
                                  self.parameters['nx']//2 + 1).astype(np.float64)*self.parameters['dkx']
        kspace['ky'] = np.arange(-self.parameters['ny']//2 + 1,
                                  self.parameters['ny']//2 + 1).astype(np.float64)*self.parameters['dky']
        kspace['ky'] = np.roll(kspace['ky'], self.parameters['ny']//2+1)
        kspace['kz'] = np.arange(-self.parameters['nz']//2 + 1,
                                  self.parameters['nz']//2 + 1).astype(np.float64)*self.parameters['dkz']
        kspace['kz'] = np.roll(kspace['kz'], self.parameters['nz']//2+1)
        return kspace
    def get_data_file_name(self):
        return os.path.join(self.work_dir, self.simname + '.h5')
    def get_data_file(self):
        return h5py.File(self.get_data_file_name(), 'r')
    def write_par(
            self,
            iter0 = 0,
            particle_ic = None):
        assert (iter0 == 0)
        _code.write_par(self, iter0 = iter0)
        with h5py.File(self.get_data_file_name(), 'r+') as ofile:
            ofile['bfps_info/exec_name'] = self.name
            kspace = self.get_kspace()
            for k in kspace.keys():
                ofile['kspace/' + k] = kspace[k]
            nshells = kspace['nshell'].shape[0]
            kspace = self.get_kspace()
            nshells = kspace['nshell'].shape[0]
            ofile['checkpoint'] = int(0)
        return None
    def job_parser_arguments(
            self,
            parser):
        parser.add_argument(
                '--ncpu',
                type = int,
                dest = 'ncpu',
                default = -1)
        parser.add_argument(
                '--np', '--nprocesses',
                metavar = 'NPROCESSES',
                help = 'number of mpi processes to use',
                type = int,
                dest = 'nb_processes',
                default = 4)
        parser.add_argument(
                '--ntpp', '--nthreads-per-process',
                type = int,
                dest = 'nb_threads_per_process',
                metavar = 'NTHREADS_PER_PROCESS',
                help = 'number of threads to use per MPI process',
                default = 1)
        parser.add_argument(
                '--no-submit',
                action = 'store_true',
                dest = 'no_submit')
        parser.add_argument(
                '--environment',
                type = str,
                dest = 'environment',
                default = None)
        parser.add_argument(
                '--minutes',
                type = int,
                dest = 'minutes',
                default = 5,
                help = 'If environment supports it, this is the requested wall-clock-limit.')
        parser.add_argument(
               '--njobs',
               type = int, dest = 'njobs',
               default = 1)
        return None
    def simulation_parser_arguments(
            self,
            parser):
        parser.add_argument(
                '--simname',
                type = str, dest = 'simname',
                default = 'test')
        parser.add_argument(
               '-n', '--grid-size',
               type = int,
               dest = 'n',
               default = 32,
               metavar = 'N',
               help = 'code is run by default in a grid of NxNxN')
        for coord in ['x', 'y', 'z']:
            parser.add_argument(
                   '--L{0}'.format(coord), '--box-length-{0}'.format(coord),
                   type = float,
                   dest = 'L{0}'.format(coord),
                   default = 2.0,
                   metavar = 'length{0}'.format(coord),
                   help = 'length of the box in the {0} direction will be `length{0} x pi`'.format(coord))
        parser.add_argument(
                '--wd',
                type = str, dest = 'work_dir',
                default = './')
        parser.add_argument(
                '--precision',
                choices = ['single', 'double'],
                type = str,
                default = 'single')
        return None
    def add_parser_arguments(
            self,
            parser):
        subparsers = parser.add_subparsers(
                dest = 'TEST_class',
                help = 'type of simulation to run')
        subparsers.required = True
        parser_filter_test = subparsers.add_parser(
                'filter_test',
                help = 'plain filter test')
        parser_field_test = subparsers.add_parser(
                'field_test',
                help = 'plain field test')
        parser_symmetrize_test = subparsers.add_parser(
                'symmetrize_test',
                help = 'plain symmetrize test')
        parser_field_output_test = subparsers.add_parser(
                'field_output_test',
                help = 'plain field output test')
        parser_test_interpolation = subparsers.add_parser(
                'test_interpolation',
                help = 'test velocity gradient interpolation')
        for parser in ['parser_filter_test',
                       'parser_field_test',
                       'parser_symmetrize_test',
                       'parser_field_output_test',
                       'parser_test_interpolation']:
            eval('self.simulation_parser_arguments(' + parser + ')')
            eval('self.job_parser_arguments(' + parser + ')')
            eval('self.parameters_to_parser_arguments(' + parser + ')')
            eval('self.parameters_to_parser_arguments(' + parser + ',' +
                    'parameters = self.generate_extra_parameters(dns_type = \'' + parser + '\'))')
        return None
    def prepare_launch(
            self,
            args = []):
        opt = _code.prepare_launch(self, args = args)
        self.set_precision(opt.precision)
        self.dns_type = opt.TEST_class
        self.name = self.dns_type + '-' + self.fluid_precision + '-v' + bfps.__version__
        # merge parameters if needed
        self.pars_from_namespace(opt)
        return opt
    def launch(
            self,
            args = [],
            **kwargs):
        opt = self.prepare_launch(args = args)
        self.parameters.update(
                self.generate_extra_parameters(dns_type = self.dns_type))
        self.launch_jobs(opt = opt, **kwargs)
        return None
    def launch_jobs(
            self,
            opt = None,
            particle_initial_condition = None):
        if not os.path.exists(os.path.join(self.work_dir, self.simname + '.h5')):
            self.write_par(
                    particle_ic = particle_initial_condition)
            if self.dns_type == 'test_interpolation':
                if type(particle_initial_condition) == type(None):
                    pbase_shape = (self.parameters['nparticles'],)
                    number_of_particles = self.parameters['nparticles']
                else:
                    pbase_shape = particle_initial_condition.shape[:-1]
                    assert(particle_initial_condition.shape[-1] == 3)
                    number_of_particles = 1
                    for val in pbase_shape[1:]:
                        number_of_particles *= val
                ncomponents = 3
                with h5py.File(os.path.join(self.work_dir, self.simname + '_input.h5'), 'a') as ofile:
                    s = 0
                    ofile.create_group('tracers{0}'.format(s))
                    ofile.create_group('tracers{0}/rhs'.format(s))
                    ofile.create_group('tracers{0}/state'.format(s))
                    ofile['tracers{0}/rhs'.format(s)].create_dataset(
                            '0',
                            shape = (
                                (self.parameters['tracers{0}_integration_steps'.format(s)],) +
                                pbase_shape +
                                (ncomponents,)),
                            dtype = np.float)
                    ofile['tracers{0}/state'.format(s)].create_dataset(
                            '0',
                            shape = (
                                pbase_shape +
                                (ncomponents,)),
                            dtype = np.float)
                    if type(particle_initial_condition) == type(None):
                        ofile['tracers0/state/0'][:] = np.random.random(pbase_shape + (ncomponents,))*2*np.pi
                    else:
                        ofile['tracers0/state/0'][:] = particle_initial_condition
                with h5py.File(os.path.join(self.work_dir, self.simname + '_input.h5'), 'a') as ofile:
                    data = DNS.generate_vector_field(self,
                           write_to_file = False,
                           spectra_slope = 1.0,
                           amplitude = 0.05)
                    #data[:] = 0.0
                    ## ABC
                    #data[0, 0, 1, 1] = -0.5*(1j)
                    #data[0, 0, 1, 2] =  0.5*(1j)
                    #data[0, 1, 0, 0]                         = -0.5*(1j)
                    #data[0, self.parameters['nz'] - 1, 0, 0] =  0.5*(1j)
                    #data[0, 1, 0, 1]                         =  0.5
                    #data[0, self.parameters['nz'] - 1, 0, 1] =  0.5
                    #data[1, 0, 0, 0]                         =  0.5
                    #data[self.parameters['ny'] - 1, 0, 0, 0] =  0.5
                    #data[1, 0, 0, 2]                         = -0.5*(1j)
                    #data[self.parameters['ny'] - 1, 0, 0, 2] =  0.5*(1j)
                    ofile['vorticity/complex/{0}'.format(0)] = data
                with h5py.File(os.path.join(self.work_dir, self.simname + '_output.h5'), 'a') as ofile:
                    ofile.require_group('tracers0')
                    for kk in ['position', 'velocity', 'vorticity', 'velocity_gradient']:
                        ofile['tracers0'].require_group(kk)
        self.run(
                nb_processes = opt.nb_processes,
                nb_threads_per_process = opt.nb_threads_per_process,
                njobs = opt.njobs,
                hours = opt.minutes // 60,
                minutes = opt.minutes % 60,
                no_submit = opt.no_submit)
        return None