# -
# Chichi Lalescu authored
# TEST.py 16.62 KiB
#######################################################################
# #
# Copyright 2015 Max Planck Institute #
# for Dynamics and Self-Organization #
# #
# This file is part of bfps. #
# #
# bfps is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published #
# by the Free Software Foundation, either version 3 of the License, #
# or (at your option) any later version. #
# #
# bfps is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with bfps. If not, see <http://www.gnu.org/licenses/> #
# #
# Contact: Cristian.Lalescu@ds.mpg.de #
# #
#######################################################################
import os
import sys
import shutil
import subprocess
import argparse
import h5py
import math
import numpy as np
import warnings
import bfps
from ._code import _code
from bfps import tools
from bfps import DNS
class TEST(_code):
    """This class is meant to stitch together the C++ code into a final source file,
    compile it, and handle all job launching.
    """
    def __init__(
            self,
            work_dir = './',
            simname = 'test'):
        """Initialize base code generator and default host/job information."""
        _code.__init__(
                self,
                work_dir = work_dir,
                simname = simname)
        # scheduler description used by _code when submitting jobs
        self.host_info = {'type'        : 'cluster',
                          'environment' : None,
                          'deltanprocs' : 1,
                          'queue'       : '',
                          'mail_address': '',
                          'mail_events' : None}
        self.generate_default_parameters()
        return None
    def set_precision(
            self,
            fluid_dtype):
        """Set the floating point precision of the generated C++ code.

        :param fluid_dtype: either a numpy dtype (``np.float32``/``np.float64``)
            or one of the strings ``'single'``/``'double'``.
        :raises ValueError: if `fluid_dtype` is not one of the accepted values
            (the original code silently ignored bad values and crashed later
            with an opaque ``AttributeError``).
        """
        if fluid_dtype in [np.float32, np.float64]:
            self.fluid_dtype = fluid_dtype
        elif fluid_dtype in ['single', 'double']:
            if fluid_dtype == 'single':
                self.fluid_dtype = np.dtype(np.float32)
            elif fluid_dtype == 'double':
                self.fluid_dtype = np.dtype(np.float64)
        else:
            raise ValueError(
                    'invalid fluid_dtype: {0}'.format(fluid_dtype))
        self.rtype = self.fluid_dtype
        # derive the complex counterpart and the C type name used when
        # instantiating the C++ template
        if self.rtype == np.float32:
            self.ctype = np.dtype(np.complex64)
            self.C_field_dtype = 'float'
            self.fluid_precision = 'single'
        elif self.rtype == np.float64:
            self.ctype = np.dtype(np.complex128)
            self.C_field_dtype = 'double'
            self.fluid_precision = 'double'
        return None
    def write_src(self):
        """Write the C++ source file (``<name>.cpp``) for the selected test.

        The file consists of a version banner, the include list, and a
        ``main`` that dispatches to ``main_code`` with the templated test
        class selected via ``self.dns_type``.
        """
        self.version_message = (
                '/***********************************************************************\n' +
                '* this code automatically generated by bfps\n' +
                '* version {0}\n'.format(bfps.__version__) +
                '***********************************************************************/\n\n\n')
        self.include_list = [
                '"base.hpp"',
                '"scope_timer.hpp"',
                '"fftw_interface.hpp"',
                '"full_code/main_code.hpp"',
                '<cmath>',
                '<iostream>',
                '<hdf5.h>',
                '<string>',
                '<cstring>',
                '<fftw3-mpi.h>',
                '<omp.h>',
                '<cfenv>',
                '<cstdlib>',
                '"full_code/{0}.hpp"\n'.format(self.dns_type)]
        self.main = """
            int main(int argc, char *argv[])
            {{
                bool fpe = (
                    (getenv("BFPS_FPE_OFF") == nullptr) ||
                    (getenv("BFPS_FPE_OFF") != std::string("TRUE")));
                return main_code< {0} >(argc, argv, fpe);
            }}
            """.format(self.dns_type + '<{0}>'.format(self.C_field_dtype))
        self.includes = '\n'.join(
                ['#include ' + hh
                 for hh in self.include_list])
        with open(self.name + '.cpp', 'w') as outfile:
            outfile.write(self.version_message + '\n\n')
            outfile.write(self.includes + '\n\n')
            outfile.write(self.main + '\n')
        return None
    def generate_default_parameters(self):
        """Populate ``self.parameters`` with values relevant for all TEST classes."""
        self.parameters['fftw_plan_rigor'] = 'FFTW_ESTIMATE'
        self.parameters['dealias_type'] = int(1)
        self.parameters['dkx'] = float(1.0)
        self.parameters['dky'] = float(1.0)
        self.parameters['dkz'] = float(1.0)
        self.parameters['filter_length'] = float(1.0)
        self.parameters['random_seed'] = int(1)
        return None
    def generate_extra_parameters(
            self,
            dns_type = None):
        """Return extra parameters specific to a given test type.

        Only ``'test_interpolation'`` has extra (particle) parameters; any
        other value yields an empty dict.
        """
        pars = {}
        if dns_type == 'test_interpolation':
            pars['nparticles'] = 3
            pars['tracers0_integration_steps'] = int(4)
            pars['tracers0_neighbours'] = int(1)
            pars['tracers0_smoothness'] = int(1)
        return pars
    def get_kspace(self):
        """Build a description of the Fourier grid.

        Returns a dict with the maximum wavenumber ``kM``, the shell width
        ``dk``, empty shell-statistics arrays, and the ``kx``/``ky``/``kz``
        wavenumber arrays laid out in FFTW order (``ky``/``kz`` rolled so
        negative frequencies come last).
        """
        kspace = {}
        # dealias_type 1 uses the half-grid cutoff, otherwise the 2/3 rule
        if self.parameters['dealias_type'] == 1:
            kMx = self.parameters['dkx']*(self.parameters['nx']//2 - 1)
            kMy = self.parameters['dky']*(self.parameters['ny']//2 - 1)
            kMz = self.parameters['dkz']*(self.parameters['nz']//2 - 1)
        else:
            kMx = self.parameters['dkx']*(self.parameters['nx']//3 - 1)
            kMy = self.parameters['dky']*(self.parameters['ny']//3 - 1)
            kMz = self.parameters['dkz']*(self.parameters['nz']//3 - 1)
        kspace['kM'] = max(kMx, kMy, kMz)
        kspace['dk'] = min(self.parameters['dkx'],
                           self.parameters['dky'],
                           self.parameters['dkz'])
        nshells = int(kspace['kM'] / kspace['dk']) + 2
        kspace['nshell'] = np.zeros(nshells, dtype = np.int64)
        kspace['kshell'] = np.zeros(nshells, dtype = np.float64)
        kspace['kx'] = np.arange( 0,
                                  self.parameters['nx']//2 + 1).astype(np.float64)*self.parameters['dkx']
        kspace['ky'] = np.arange(-self.parameters['ny']//2 + 1,
                                  self.parameters['ny']//2 + 1).astype(np.float64)*self.parameters['dky']
        kspace['ky'] = np.roll(kspace['ky'], self.parameters['ny']//2+1)
        kspace['kz'] = np.arange(-self.parameters['nz']//2 + 1,
                                  self.parameters['nz']//2 + 1).astype(np.float64)*self.parameters['dkz']
        kspace['kz'] = np.roll(kspace['kz'], self.parameters['nz']//2+1)
        return kspace
    def get_data_file_name(self):
        """Return the path of the main simulation HDF5 file."""
        return os.path.join(self.work_dir, self.simname + '.h5')
    def get_data_file(self):
        """Open the main simulation HDF5 file read-only."""
        return h5py.File(self.get_data_file_name(), 'r')
    def write_par(
            self,
            iter0 = 0,
            particle_ic = None):
        """Write run parameters and the kspace description to the data file.

        :param iter0: must be 0 (tests always start from scratch).
        :param particle_ic: accepted for interface compatibility; not used here.
        """
        assert (iter0 == 0)
        _code.write_par(self, iter0 = iter0)
        with h5py.File(self.get_data_file_name(), 'r+') as ofile:
            ofile['bfps_info/exec_name'] = self.name
            # NOTE: the original computed get_kspace() and nshells twice;
            # the duplicate (unused) computation was removed.
            kspace = self.get_kspace()
            for k in kspace.keys():
                ofile['kspace/' + k] = kspace[k]
            ofile['checkpoint'] = int(0)
        return None
    def job_parser_arguments(
            self,
            parser):
        """Add job-scheduling related options to `parser`."""
        parser.add_argument(
                '--ncpu',
                type = int,
                dest = 'ncpu',
                default = -1)
        parser.add_argument(
                '--np', '--nprocesses',
                metavar = 'NPROCESSES',
                help = 'number of mpi processes to use',
                type = int,
                dest = 'nb_processes',
                default = 4)
        parser.add_argument(
                '--ntpp', '--nthreads-per-process',
                type = int,
                dest = 'nb_threads_per_process',
                metavar = 'NTHREADS_PER_PROCESS',
                help = 'number of threads to use per MPI process',
                default = 1)
        parser.add_argument(
                '--no-submit',
                action = 'store_true',
                dest = 'no_submit')
        parser.add_argument(
                '--environment',
                type = str,
                dest = 'environment',
                default = None)
        parser.add_argument(
                '--minutes',
                type = int,
                dest = 'minutes',
                default = 5,
                help = 'If environment supports it, this is the requested wall-clock-limit.')
        parser.add_argument(
                '--njobs',
                type = int, dest = 'njobs',
                default = 1)
        return None
    def simulation_parser_arguments(
            self,
            parser):
        """Add simulation-setup related options to `parser`."""
        parser.add_argument(
                '--simname',
                type = str, dest = 'simname',
                default = 'test')
        parser.add_argument(
                '-n', '--grid-size',
                type = int,
                dest = 'n',
                default = 32,
                metavar = 'N',
                help = 'code is run by default in a grid of NxNxN')
        for coord in ['x', 'y', 'z']:
            parser.add_argument(
                    '--L{0}'.format(coord), '--box-length-{0}'.format(coord),
                    type = float,
                    dest = 'L{0}'.format(coord),
                    default = 2.0,
                    metavar = 'length{0}'.format(coord),
                    help = 'length of the box in the {0} direction will be `length{0} x pi`'.format(coord))
        parser.add_argument(
                '--wd',
                type = str, dest = 'work_dir',
                default = './')
        parser.add_argument(
                '--precision',
                choices = ['single', 'double'],
                type = str,
                default = 'single')
        return None
    def add_parser_arguments(
            self,
            parser):
        """Register one subcommand per available test, with all its options."""
        subparsers = parser.add_subparsers(
                dest = 'TEST_class',
                help = 'type of simulation to run')
        subparsers.required = True
        # BUGFIX: the original built the sub-parsers in individual variables
        # and configured them through `eval` on the *variable names*, which
        # also meant generate_extra_parameters received e.g.
        # 'parser_test_interpolation' instead of 'test_interpolation', so the
        # extra particle parameters were never exposed as CLI arguments.
        for dns_type, help_text in [
                ('filter_test', 'plain filter test'),
                ('field_test', 'plain field test'),
                ('symmetrize_test', 'plain symmetrize test'),
                ('field_output_test', 'plain field output test'),
                ('test_interpolation', 'test velocity gradient interpolation')]:
            subparser = subparsers.add_parser(
                    dns_type,
                    help = help_text)
            self.simulation_parser_arguments(subparser)
            self.job_parser_arguments(subparser)
            self.parameters_to_parser_arguments(subparser)
            self.parameters_to_parser_arguments(
                    subparser,
                    parameters = self.generate_extra_parameters(dns_type = dns_type))
        return None
    def prepare_launch(
            self,
            args = None):
        """Parse command line arguments and configure the run accordingly.

        :param args: list of command line tokens; ``None`` means an empty
            list (avoids the mutable-default-argument pitfall of the
            original ``args = []``).
        """
        opt = _code.prepare_launch(self, args = [] if args is None else args)
        self.set_precision(opt.precision)
        self.dns_type = opt.TEST_class
        self.name = self.dns_type + '-' + self.fluid_precision + '-v' + bfps.__version__
        # merge parameters if needed
        self.pars_from_namespace(opt)
        return opt
    def launch(
            self,
            args = None,
            **kwargs):
        """Prepare, then launch, the selected test."""
        opt = self.prepare_launch(args = args)
        self.parameters.update(
                self.generate_extra_parameters(dns_type = self.dns_type))
        self.launch_jobs(opt = opt, **kwargs)
        return None
    def launch_jobs(
            self,
            opt = None,
            particle_initial_condition = None):
        """Generate the input HDF5 files required by the test, then run it.

        :param opt: parsed argparse namespace (from :meth:`prepare_launch`).
        :param particle_initial_condition: optional array of tracer positions
            with last axis of size 3; ``None`` means random positions.
        """
        if not os.path.exists(os.path.join(self.work_dir, self.simname + '.h5')):
            self.write_par(
                    particle_ic = particle_initial_condition)
        if self.dns_type == 'test_interpolation':
            if particle_initial_condition is None:
                pbase_shape = (self.parameters['nparticles'],)
            else:
                pbase_shape = particle_initial_condition.shape[:-1]
                assert(particle_initial_condition.shape[-1] == 3)
            ncomponents = 3
            with h5py.File(os.path.join(self.work_dir, self.simname + '_input.h5'), 'a') as ofile:
                s = 0
                ofile.create_group('tracers{0}'.format(s))
                ofile.create_group('tracers{0}/rhs'.format(s))
                ofile.create_group('tracers{0}/state'.format(s))
                # np.float was removed in numpy 1.24; np.float64 is what it
                # aliased, so the files are unchanged
                ofile['tracers{0}/rhs'.format(s)].create_dataset(
                        '0',
                        shape = (
                            (self.parameters['tracers{0}_integration_steps'.format(s)],) +
                            pbase_shape +
                            (ncomponents,)),
                        dtype = np.float64)
                ofile['tracers{0}/state'.format(s)].create_dataset(
                        '0',
                        shape = (
                            pbase_shape +
                            (ncomponents,)),
                        dtype = np.float64)
                if particle_initial_condition is None:
                    ofile['tracers0/state/0'][:] = np.random.random(pbase_shape + (ncomponents,))*2*np.pi
                else:
                    ofile['tracers0/state/0'][:] = particle_initial_condition
            with h5py.File(os.path.join(self.work_dir, self.simname + '_input.h5'), 'a') as ofile:
                data = DNS.generate_vector_field(self,
                                                 write_to_file = False,
                                                 spectra_slope = 1.0,
                                                 amplitude = 0.05)
                #data[:] = 0.0
                ## ABC
                #data[0, 0, 1, 1] = -0.5*(1j)
                #data[0, 0, 1, 2] = 0.5*(1j)
                #data[0, 1, 0, 0] = -0.5*(1j)
                #data[0, self.parameters['nz'] - 1, 0, 0] = 0.5*(1j)
                #data[0, 1, 0, 1] = 0.5
                #data[0, self.parameters['nz'] - 1, 0, 1] = 0.5
                #data[1, 0, 0, 0] = 0.5
                #data[self.parameters['ny'] - 1, 0, 0, 0] = 0.5
                #data[1, 0, 0, 2] = -0.5*(1j)
                #data[self.parameters['ny'] - 1, 0, 0, 2] = 0.5*(1j)
                ofile['vorticity/complex/{0}'.format(0)] = data
            with h5py.File(os.path.join(self.work_dir, self.simname + '_output.h5'), 'a') as ofile:
                ofile.require_group('tracers0')
                for kk in ['position', 'velocity', 'vorticity', 'velocity_gradient']:
                    ofile['tracers0'].require_group(kk)
        self.run(
                nb_processes = opt.nb_processes,
                nb_threads_per_process = opt.nb_threads_per_process,
                njobs = opt.njobs,
                hours = opt.minutes // 60,
                minutes = opt.minutes % 60,
                no_submit = opt.no_submit)
        return None