#######################################################################
# #
# Copyright 2015 Max Planck Institute #
# for Dynamics and Self-Organization #
# #
# This file is part of bfps. #
# #
# bfps is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published #
# by the Free Software Foundation, either version 3 of the License, #
# or (at your option) any later version. #
# #
# bfps is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with bfps. If not, see <http://www.gnu.org/licenses/> #
# #
# Contact: Cristian.Lalescu@ds.mpg.de #
# #
#######################################################################
import os
import sys
import shutil
import subprocess
import argparse
import h5py
from datetime import datetime
import math
import bfps
from ._base import _base
class _code(_base):
"""This class is meant to stitch together the C++ code into a final source file,
compile it, and handle all job launching.
"""
def __init__(
self,
work_dir = './',
simname = 'test'):
_base.__init__(self, work_dir = work_dir, simname = simname)
self.version_message = ('/***********************************************************************\n' +
'* this code automatically generated by bfps\n' +
'* version {0}\n'.format(bfps.__version__) +
'***********************************************************************/\n\n\n')
self.includes = """
//begincpp
#include "base.hpp"
#include "fluid_solver.hpp"
#include "scope_timer.hpp"
#include <iostream>
#include <hdf5.h>
#include <string>
#include <cstring>
#include <fftw3-mpi.h>
#include <omp.h>
//endcpp
"""
self.variables = 'int myrank, nprocs;\n'
self.variables += 'int iteration;\n'
self.variables += 'char simname[256], fname[256];\n'
        self.variables += 'hid_t parameter_file, stat_file, Cdset;\n'
self.definitions = ''
self.main_start = """
//begincpp
int main(int argc, char *argv[])
{
int mpiprovided;
MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &mpiprovided);
assert(mpiprovided >= MPI_THREAD_FUNNELED);
MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
fftw_init_threads();
fftwf_init_threads();
fftw_mpi_init();
fftwf_mpi_init();
const int nbThreads = omp_get_max_threads();
DEBUG_MSG("Number of threads for the FFTW = %d\\n", nbThreads);
std::cout << "There are " << nprocs << " processes and " << nbThreads << " threads" << std::endl;
fftw_plan_with_nthreads(nbThreads);
fftwf_plan_with_nthreads(nbThreads);
if (argc != 2)
{
std::cerr << "Wrong number of command line arguments. Stopping." << std::endl;
MPI_Finalize();
            return EXIT_FAILURE;
}
strcpy(simname, argv[1]);
sprintf(fname, "%s.h5", simname);
parameter_file = H5Fopen(fname, H5F_ACC_RDONLY, H5P_DEFAULT);
Cdset = H5Dopen(parameter_file, "iteration", H5P_DEFAULT);
H5Dread(Cdset, H5T_NATIVE_INT, H5S_ALL, H5S_ALL, H5P_DEFAULT, &iteration);
DEBUG_MSG("simname is %s and iteration is %d\\n", simname, iteration);
H5Dclose(Cdset);
H5Fclose(parameter_file);
read_parameters();
if (myrank == 0)
{
// set caching parameters
hid_t fapl = H5Pcreate(H5P_FILE_ACCESS);
herr_t cache_err = H5Pset_cache(fapl, 0, 521, 134217728, 1.0);
DEBUG_MSG("when setting stat_file cache I got %d\\n", cache_err);
stat_file = H5Fopen(fname, H5F_ACC_RDWR, fapl);
}
{
TIMEZONE("code::main_start");
//endcpp
"""
        for ostream in ['cout', 'cerr']:
            self.main_start += (
                    'if (myrank == 0) std::{1} << "{0}" << std::endl;'.format(
                        self.version_message, ostream).replace('\n', '\\n') +
                    '\n')
self.main_end = """
//begincpp
}
// clean up
if (myrank == 0)
{
Cdset = H5Dopen(stat_file, "iteration", H5P_DEFAULT);
H5Dwrite(Cdset, H5T_NATIVE_INT, H5S_ALL, H5S_ALL, H5P_DEFAULT, &iteration);
H5Dclose(Cdset);
H5Fclose(stat_file);
}
fftwf_mpi_cleanup();
fftw_mpi_cleanup();
#ifdef USE_TIMINGOUTPUT
global_timer_manager.show(MPI_COMM_WORLD);
global_timer_manager.showMpi(MPI_COMM_WORLD);
global_timer_manager.showHtml(MPI_COMM_WORLD);
#endif
MPI_Finalize();
return EXIT_SUCCESS;
}
//endcpp
"""
self.host_info = {'type' : 'cluster',
'environment' : None,
'deltanprocs' : 1,
'queue' : '',
'mail_address': '',
'mail_events' : None}
self.main = ''
return None
def write_src(self):
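        """Stitch the version banner, includes, parameter handling code,
        definitions and main together into ``<self.name>.cpp``.
        """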
with open(self.name + '.cpp', 'w') as outfile:
outfile.write(self.version_message)
outfile.write(self.includes)
outfile.write(self.cdef_pars())
outfile.write(self.variables)
outfile.write(self.cread_pars())
outfile.write(self.definitions)
outfile.write(self.main_start)
outfile.write(self.main)
outfile.write(self.main_end)
return None
def compile_code(self):
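        """Compile the generated source into an executable called
        ``self.name``, using the compiler and flags recorded in
        ``bfps.install_info``. Returns the compiler's exit code.
        """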
if not os.path.isfile(os.path.join(bfps.header_dir, 'base.hpp')):
raise IOError('header not there:\n' +
'{0}\n'.format(os.path.join(bfps.header_dir, 'base.hpp')) +
'{0}\n'.format(bfps.dist_loc))
libraries = ['bfps']
libraries += bfps.install_info['libraries']
libraries += ['fftw3_omp']
libraries += ['fftw3f_omp']
command_strings = [bfps.install_info['compiler']]
command_strings += [self.name + '.cpp', '-o', self.name]
command_strings += bfps.install_info['extra_compile_args']
command_strings += ['-I' + idir for idir in bfps.install_info['include_dirs']]
command_strings.append('-I' + bfps.header_dir)
command_strings += ['-L' + ldir for ldir in bfps.install_info['library_dirs']]
command_strings.append('-L' + bfps.lib_dir)
for libname in libraries:
command_strings += ['-l' + libname]
command_strings += ['-fopenmp']
self.write_src()
print('compiling code with command\n' + ' '.join(command_strings))
return subprocess.call(command_strings)
def set_host_info(
self,
host_info = {}):
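        """Update ``self.host_info`` with the given entries."""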
self.host_info.update(host_info)
return None
def run(self,
ncpu = 2,
out_file = 'out_file',
err_file = 'err_file',
hours = 0,
minutes = 10,
njobs = 1):
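        """Compile if needed, then launch ``njobs`` successive runs,
        dispatching on ``self.host_info['type']``: 'cluster' (SGE),
        'SLURM', 'IBMLoadLeveler' or 'pc' (direct mpirun).
        """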
self.read_parameters()
with h5py.File(os.path.join(self.work_dir, self.simname + '.h5'), 'r') as data_file:
            iter0 = data_file['iteration'][()]
if not os.path.isdir(self.work_dir):
os.makedirs(self.work_dir)
if not os.path.exists(os.path.join(self.work_dir, self.name)):
need_to_compile = True
else:
need_to_compile = (datetime.fromtimestamp(os.path.getctime(os.path.join(self.work_dir, self.name))) <
bfps.install_info['install_date'])
if need_to_compile:
assert(self.compile_code() == 0)
        if os.path.realpath(self.work_dir) != os.path.realpath(os.getcwd()):
            shutil.copy(self.name, self.work_dir)
        current_dir = os.getcwd()
        # base command line; the scheduler branches below reuse only the
        # executable and simname, i.e. command_atoms[3:]
command_atoms = ['mpirun',
'-np',
'{0}'.format(ncpu),
'./' + self.name,
self.simname]
if self.host_info['type'] == 'cluster':
job_name_list = []
for j in range(njobs):
suffix = self.simname + '_{0}'.format(iter0 + j*self.parameters['niter_todo'])
qsub_script_name = 'run_' + suffix + '.sh'
self.write_sge_file(
file_name = os.path.join(self.work_dir, qsub_script_name),
nprocesses = ncpu,
name_of_run = suffix,
command_atoms = command_atoms[3:],
hours = hours,
minutes = minutes,
out_file = out_file + '_' + suffix,
err_file = err_file + '_' + suffix)
os.chdir(self.work_dir)
qsub_atoms = ['qsub']
if len(job_name_list) >= 1:
qsub_atoms += ['-hold_jid', job_name_list[-1]]
subprocess.call(qsub_atoms + [qsub_script_name])
os.chdir(current_dir)
job_name_list.append(suffix)
        elif self.host_info['type'] == 'SLURM':
job_id_list = []
for j in range(njobs):
suffix = self.simname + '_{0}'.format(iter0 + j*self.parameters['niter_todo'])
qsub_script_name = 'run_' + suffix + '.sh'
self.write_slurm_file(
file_name = os.path.join(self.work_dir, qsub_script_name),
nprocesses = ncpu,
name_of_run = suffix,
command_atoms = command_atoms[3:],
hours = hours,
minutes = minutes,
out_file = out_file + '_' + suffix,
err_file = err_file + '_' + suffix)
os.chdir(self.work_dir)
qsub_atoms = ['sbatch']
if len(job_id_list) >= 1:
qsub_atoms += ['--dependency=afterok:{0}'.format(job_id_list[-1])]
                p = subprocess.Popen(
                        qsub_atoms + [qsub_script_name],
                        stdout = subprocess.PIPE)
                out, err = p.communicate()
                # sbatch prints "Submitted batch job <jobid>";
                # remember the id so the next job can depend on it
                job_id_list.append(int(out.split()[-1]))
os.chdir(current_dir)
elif self.host_info['type'] == 'IBMLoadLeveler':
suffix = self.simname + '_{0}'.format(iter0)
job_script_name = 'run_' + suffix + '.sh'
if (njobs == 1):
self.write_IBMLoadLeveler_file_single_job(
file_name = os.path.join(self.work_dir, job_script_name),
nprocesses = ncpu,
name_of_run = suffix,
command_atoms = command_atoms[3:],
hours = hours,
minutes = minutes,
out_file = out_file + '_' + suffix,
err_file = err_file + '_' + suffix)
else:
self.write_IBMLoadLeveler_file_many_job(
file_name = os.path.join(self.work_dir, job_script_name),
nprocesses = ncpu,
name_of_run = suffix,
command_atoms = command_atoms[3:],
hours = hours,
minutes = minutes,
out_file = out_file + '_' + suffix,
err_file = err_file + '_' + suffix,
njobs = njobs)
submit_atoms = ['llsubmit']
subprocess.call(submit_atoms + [os.path.join(self.work_dir, job_script_name)])
elif self.host_info['type'] == 'pc':
os.chdir(self.work_dir)
            os.environ['LD_LIBRARY_PATH'] = (
                    os.environ.get('LD_LIBRARY_PATH', '') +
                    ':{0}'.format(bfps.lib_dir))
print('added to LD_LIBRARY_PATH the location {0}'.format(bfps.lib_dir))
for j in range(njobs):
suffix = self.simname + '_{0}'.format(iter0 + j*self.parameters['niter_todo'])
print('running code with command\n' + ' '.join(command_atoms))
                with open(out_file + '_' + suffix, 'w') as stdout_file, \
                     open(err_file + '_' + suffix, 'w') as stderr_file:
                    subprocess.call(command_atoms,
                                    stdout = stdout_file,
                                    stderr = stderr_file)
os.chdir(current_dir)
return None
def write_IBMLoadLeveler_file_single_job(
self,
file_name = None,
nprocesses = None,
name_of_run = None,
command_atoms = [],
hours = None,
minutes = None,
out_file = None,
err_file = None):
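        """Write a LoadLeveler submission script for a single job step."""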
script_file = open(file_name, 'w')
script_file.write('# @ shell=/bin/bash\n')
# error file
        if err_file is None:
            err_file = 'err.job.$(jobid)'
        script_file.write('# @ error = ' + os.path.join(self.work_dir, err_file) + '\n')
        # output file
        if out_file is None:
            out_file = 'out.job.$(jobid)'
        script_file.write('# @ output = ' + os.path.join(self.work_dir, out_file) + '\n')
script_file.write('# @ job_type = parallel\n')
script_file.write('# @ node_usage = not_shared\n')
        nb_cpus_per_node = 20
        try:
            nb_process_per_node = int(os.environ['NB_PROC_PER_NODE'])
        except (KeyError, ValueError):
            nb_process_per_node = nb_cpus_per_node
        print('nb_process_per_node = {0} (NB_PROC_PER_NODE)'.format(nb_process_per_node))
        nb_cpus_per_task = int(nb_cpus_per_node / nb_process_per_node)
        if nb_cpus_per_task*nb_process_per_node != nb_cpus_per_node:
            raise Exception(
                    'nb_cpus_per_node {0} should be divisible by '
                    'nb_process_per_node {1} (NB_PROC_PER_NODE)'.format(
                        nb_cpus_per_node, nb_process_per_node))
        nb_tasks_per_node = int(nb_cpus_per_node / nb_cpus_per_task)
        # round up to enough nodes for nprocesses MPI processes
        number_of_nodes = int((nprocesses + nb_process_per_node - 1) / nb_process_per_node)
        first_node_tasks = nprocesses - (number_of_nodes - 1)*nb_process_per_node
script_file.write('# @ resources = ConsumableCpus({})\n'.format(nb_cpus_per_task))
script_file.write('# @ network.MPI = sn_all,not_shared,us\n')
script_file.write('# @ wall_clock_limit = {0}:{1:0>2d}:00\n'.format(hours, minutes))
        assert self.host_info['environment'] is not None
script_file.write('# @ node = {0}\n'.format(number_of_nodes))
script_file.write('# @ tasks_per_node = {0}\n'.format(nb_process_per_node))
if (first_node_tasks > 0):
script_file.write('# @ first_node_tasks = {0}\n'.format(first_node_tasks))
script_file.write('# @ queue\n')
        script_file.write('export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:' +
':'.join([bfps.lib_dir] + bfps.install_info['library_dirs']) +
'\n')
script_file.write('echo "Start time is `date`"\n')
script_file.write('cd ' + self.work_dir + '\n')
# script_file.write('cp -s ../*.h5 ./\n')
script_file.write('poe ' +
os.path.join(
self.work_dir,
command_atoms[0]) +
' ' +
' '.join(command_atoms[1:]) +
'\n')
script_file.write('echo "End time is `date`"\n')
script_file.write('exit 0\n')
script_file.close()
return None
def write_IBMLoadLeveler_file_many_job(
self,
file_name = None,
nprocesses = None,
name_of_run = None,
command_atoms = [],
hours = None,
minutes = None,
out_file = None,
err_file = None,
njobs = 2):
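        """Write a LoadLeveler submission script with one step per job,
        so that the ``njobs`` runs execute back to back.
        """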
        assert self.host_info['environment'] is not None
script_file = open(file_name, 'w')
script_file.write('# @ shell=/bin/bash\n')
# error file
        if err_file is None:
            err_file = 'err.job.$(jobid).$(stepid)'
        script_file.write('# @ error = ' + os.path.join(self.work_dir, err_file) + '\n')
        # output file
        if out_file is None:
            out_file = 'out.job.$(jobid).$(stepid)'
        script_file.write('# @ output = ' + os.path.join(self.work_dir, out_file) + '\n')
script_file.write('# @ job_type = parallel\n')
script_file.write('# @ node_usage = not_shared\n')
script_file.write('#\n')
        nb_cpus_per_node = 20
        try:
            nb_process_per_node = int(os.environ['NB_PROC_PER_NODE'])
        except (KeyError, ValueError):
            nb_process_per_node = nb_cpus_per_node
        print('nb_process_per_node = {0} (NB_PROC_PER_NODE)'.format(nb_process_per_node))
        nb_cpus_per_task = int(nb_cpus_per_node / nb_process_per_node)
        if nb_cpus_per_task*nb_process_per_node != nb_cpus_per_node:
            raise Exception(
                    'nb_cpus_per_node {0} should be divisible by '
                    'nb_process_per_node {1} (NB_PROC_PER_NODE)'.format(
                        nb_cpus_per_node, nb_process_per_node))
        nb_tasks_per_node = int(nb_cpus_per_node / nb_cpus_per_task)
        # round up to enough nodes for nprocesses MPI processes
        number_of_nodes = int((nprocesses + nb_process_per_node - 1) / nb_process_per_node)
        first_node_tasks = nprocesses - (number_of_nodes - 1)*nb_process_per_node
for job in range(njobs):
script_file.write('# @ step_name = {0}.$(stepid)\n'.format(self.simname))
script_file.write('# @ resources = ConsumableCpus({})\n'.format(nb_cpus_per_task))
script_file.write('# @ network.MPI = sn_all,not_shared,us\n')
script_file.write('# @ wall_clock_limit = {0}:{1:0>2d}:00\n'.format(hours, minutes))
script_file.write('# @ node = {0}\n'.format(number_of_nodes))
script_file.write('# @ tasks_per_node = {0}\n'.format(nb_tasks_per_node))
if (first_node_tasks > 0):
script_file.write('# @ first_node_tasks = {0}\n'.format(first_node_tasks))
script_file.write('# @ queue\n')
            script_file.write('export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:' +
':'.join([bfps.lib_dir] + bfps.install_info['library_dirs']) +
'\n')
script_file.write('echo "This is step $LOADL_STEP_ID out of {0}"\n'.format(njobs))
script_file.write('echo "Start time is `date`"\n')
# script_file.write('cp -s ../*.h5 ./\n')
script_file.write('cd ' + self.work_dir + '\n')
script_file.write('poe ' +
os.path.join(
self.work_dir,
command_atoms[0]) +
' ' +
' '.join(command_atoms[1:]) +
'\n')
script_file.write('echo "End time is `date`"\n')
script_file.write('exit 0\n')
script_file.close()
return None
def write_sge_file(
self,
file_name = None,
nprocesses = None,
name_of_run = None,
command_atoms = [],
hours = None,
minutes = None,
out_file = None,
err_file = None):
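        """Write an SGE (qsub) submission script for one run."""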
script_file = open(file_name, 'w')
script_file.write('#!/bin/bash\n')
# export all environment variables
script_file.write('#$ -V\n')
# job name
script_file.write('#$ -N {0}\n'.format(name_of_run))
# use current working directory
script_file.write('#$ -cwd\n')
# error file
        if err_file is not None:
            script_file.write('#$ -e ' + err_file + '\n')
        # output file
        if out_file is not None:
            script_file.write('#$ -o ' + out_file + '\n')
        if self.host_info['environment'] is not None:
            # round nprocesses up to a full multiple of deltanprocs,
            # since the parallel environment allocates slots in such blocks
            envprocs = self.host_info['deltanprocs'] * int(math.ceil(nprocesses * 1.0 / self.host_info['deltanprocs']))
            script_file.write('#$ -pe {0} {1}\n'.format(
                    self.host_info['environment'],
                    envprocs))
script_file.write('echo "got $NSLOTS slots."\n')
script_file.write('echo "Start time is `date`"\n')
script_file.write('mpiexec -machinefile $TMPDIR/machines ' +
'-genv LD_LIBRARY_PATH ' +
'"' +
':'.join([bfps.lib_dir] + bfps.install_info['library_dirs']) +
'" ' +
'-n {0} {1}\n'.format(nprocesses, ' '.join(command_atoms)))
script_file.write('echo "End time is `date`"\n')
script_file.write('exit 0\n')
script_file.close()
return None
def write_slurm_file(
self,
file_name = None,
nprocesses = None,
name_of_run = None,
command_atoms = [],
hours = None,
minutes = None,
out_file = None,
err_file = None):
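        """Write a SLURM (sbatch) submission script for one run."""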
script_file = open(file_name, 'w')
script_file.write('#!/bin/bash -l\n')
# job name
script_file.write('#SBATCH -J {0}\n'.format(name_of_run))
# use current working directory
script_file.write('#SBATCH -D ./\n')
# error file
        if err_file is not None:
            script_file.write('#SBATCH -e ' + err_file + '\n')
        # output file
        if out_file is not None:
            script_file.write('#SBATCH -o ' + out_file + '\n')
script_file.write('#SBATCH --partition={0}\n'.format(
self.host_info['environment']))
nodes = nprocesses // self.host_info['deltanprocs']
if (nodes == 0):
nodes = 1
tasks_per_node = nprocesses
else:
assert(nprocesses % self.host_info['deltanprocs'] == 0)
tasks_per_node = self.host_info['deltanprocs']
script_file.write('#SBATCH --nodes={0}\n'.format(nodes))
        # split the deltanprocs cores of each node between MPI tasks
        # and OpenMP threads, based on NB_PROC_PER_NODE
        nbprocesspernode = int(os.environ['NB_PROC_PER_NODE'])
        print('NB_PROC_PER_NODE ', nbprocesspernode)
        deltanprocs = int(self.host_info['deltanprocs'])
        if nbprocesspernode == 1:
            script_file.write('#SBATCH --ntasks-per-node=1\n')
            script_file.write('#SBATCH --cpus-per-task={0}\n'.format(deltanprocs))
        elif deltanprocs % nbprocesspernode == 0:
            script_file.write('#SBATCH --ntasks-per-node={0}\n'.format(nbprocesspernode))
            script_file.write('#SBATCH --cpus-per-task={0}\n'.format(deltanprocs // nbprocesspernode))
        else:
            print('Bad NB_PROC_PER_NODE ', nbprocesspernode)
            sys.exit(99)
        script_file.write('#SBATCH --mail-type=none\n')
        script_file.write('#SBATCH --time={0}:{1:0>2d}:00\n'.format(hours, minutes))
        script_file.write('export OMP_NUM_THREADS={0}\n'.format(deltanprocs // nbprocesspernode))
script_file.write('export OMP_PLACES=cores\n')
        script_file.write('export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:' +
':'.join([bfps.lib_dir] + bfps.install_info['library_dirs']) +
'\n')
script_file.write('echo "Start time is `date`"\n')
script_file.write('cd ' + self.work_dir + '\n')
script_file.write('export HTMLOUTPUT={}.html\n'.format(command_atoms[-1]))
script_file.write('srun {0}\n'.format(' '.join(command_atoms)))
script_file.write('echo "End time is `date`"\n')
script_file.write('exit 0\n')
script_file.close()
return None
def prepare_launch(
self,
args = [],
**kwargs):
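        """Parse command line arguments, update the host information from
        ``bfps.host_info`` and return the parsed options.
        """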
parser = argparse.ArgumentParser('bfps ' + type(self).__name__)
self.add_parser_arguments(parser)
opt = parser.parse_args(args)
self.set_host_info(bfps.host_info)
        if opt.environment is not None:
self.host_info['environment'] = opt.environment
return opt