Commit da8dc217 authored by Cristian Lalescu

Merge branch 'develop-threaded-fftw' into develop

The new fluid solver NSVorticityEquation should be stable from now on.
The old fluid solver NavierStokes should work as before.
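The headline feature of the merged branch is hybrid MPI/OpenMP execution: the Python wrapper now asks for a number of MPI processes and a number of OpenMP threads per process (the new --np, --ntpp and --no-submit arguments appear in the _base diff below). A minimal editorial sketch of driving the new solver follows; the import path and the launch() entry point are assumptions about the surrounding package, while the class name, constructor arguments and command-line flags are taken from this merge.

    # editorial sketch, not part of the commit
    from bfps.NSVorticityEquation import NSVorticityEquation  # import path assumed

    c = NSVorticityEquation(
            fluid_precision = 'single',
            fftw_plan_rigor = 'FFTW_MEASURE')   # forwarded to the FFTW plan calls
    c.launch([                                  # launch() entry point assumed
            '--simname', 'test',
            '--np', '2',         # MPI processes (new in this merge)
            '--ntpp', '4',       # OpenMP threads per MPI process (new in this merge)
            '--no-submit'])      # only generate the job files, do not submit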
parents 981185dc 77d75c0b
......@@ -98,7 +98,7 @@ class FluidConvert(_fluid_particle_base):
nx, ny, nz,
dkx, dky, dkz,
dealias_type,
FFTW_ESTIMATE);
DEFAULT_FFTW_FLAG);
//endcpp
""".format(self.C_dtype)
self.fluid_loop += """
......
......@@ -44,6 +44,13 @@ class NSVorticityEquation(_fluid_particle_base):
fluid_precision = 'single',
fftw_plan_rigor = 'FFTW_MEASURE',
use_fftw_wisdom = True):
"""
This code uses checkpoints for DNS restarts, and it can be stopped
by creating the file "stop_<simname>" in the working directory.
For postprocessing of field snapshots, consider creating a separate
HDF5 file (from the Python wrapper) that contains links to all the
different snapshots; a sketch is given after this file's diff.
"""
self.fftw_plan_rigor = fftw_plan_rigor
_fluid_particle_base.__init__(
self,
......@@ -52,15 +59,16 @@ class NSVorticityEquation(_fluid_particle_base):
simname = simname,
dtype = fluid_precision,
use_fftw_wisdom = use_fftw_wisdom)
self.parameters['nu'] = 0.1
self.parameters['nu'] = float(0.1)
self.parameters['fmode'] = 1
self.parameters['famplitude'] = 0.5
self.parameters['fk0'] = 2.0
self.parameters['fk1'] = 4.0
self.parameters['famplitude'] = float(0.5)
self.parameters['fk0'] = float(2.0)
self.parameters['fk1'] = float(4.0)
self.parameters['forcing_type'] = 'linear'
self.parameters['histogram_bins'] = 256
self.parameters['max_velocity_estimate'] = 1.0
self.parameters['max_vorticity_estimate'] = 1.0
self.parameters['histogram_bins'] = int(256)
self.parameters['max_velocity_estimate'] = float(1)
self.parameters['max_vorticity_estimate'] = float(1)
self.parameters['checkpoints_per_file'] = int(1)
self.file_datasets_grow = """
//begincpp
hid_t group;
......@@ -72,15 +80,7 @@ class NSVorticityEquation(_fluid_particle_base):
self.style = {}
self.statistics = {}
self.fluid_output = """
{
char file_name[512];
fs->fill_up_filename("cvorticity", file_name);
fs->cvorticity->io(
file_name,
"vorticity",
fs->iteration,
false);
}
fs->io_checkpoint(false);
"""
# vorticity_equation specific things
self.includes += '#include "vorticity_equation.hpp"\n'
......@@ -206,6 +206,22 @@ class NSVorticityEquation(_fluid_particle_base):
'fs->rvorticity->get_rdata()',
data_type = field_H5T)
self.stat_src += '}\n'
## check for the stop file
self.stat_src += """
//begincpp
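// rank 0 looks for the file "stop_<simname>"; the result is broadcast
// so that all ranks leave the main loop together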
if (myrank == 0)
{
std::string fname = (
std::string("stop_") +
std::string(simname));
{
struct stat file_buffer;
stop_code_now = (stat(fname.c_str(), &file_buffer) == 0);
}
}
MPI_Bcast(&stop_code_now, 1, MPI_C_BOOL, 0, MPI_COMM_WORLD);
//endcpp
"""
return None
def fill_up_fluid_code(self):
self.fluid_includes += '#include <cstring>\n'
......@@ -224,6 +240,43 @@ class NSVorticityEquation(_fluid_particle_base):
field_H5T = 'H5T_NATIVE_FLOAT'
elif self.dtype == np.float64:
field_H5T = 'H5T_NATIVE_DOUBLE'
self.variables += 'int checkpoint;\n'
self.variables += 'bool stop_code_now;\n'
self.read_checkpoint = """
//begincpp
if (myrank == 0)
{
hid_t dset = H5Dopen(stat_file, "checkpoint", H5P_DEFAULT);
H5Dread(
dset,
H5T_NATIVE_INT,
H5S_ALL,
H5S_ALL,
H5P_DEFAULT,
&checkpoint);
H5Dclose(dset);
}
MPI_Bcast(&checkpoint, 1, MPI_INT, 0, MPI_COMM_WORLD);
fs->checkpoint = checkpoint;
//endcpp
"""
self.store_checkpoint = """
//begincpp
checkpoint = fs->checkpoint;
if (myrank == 0)
{
hid_t dset = H5Dopen(stat_file, "checkpoint", H5P_DEFAULT);
H5Dwrite(
dset,
H5T_NATIVE_INT,
H5S_ALL,
H5S_ALL,
H5P_DEFAULT,
&checkpoint);
H5Dclose(dset);
}
//endcpp
"""
self.fluid_start += """
//begincpp
char fname[512];
......@@ -240,6 +293,7 @@ class NSVorticityEquation(_fluid_particle_base):
nx, ny, nz,
MPI_COMM_WORLD,
{1});
fs->checkpoints_per_file = checkpoints_per_file;
fs->nu = nu;
fs->fmode = fmode;
fs->famplitude = famplitude;
......@@ -247,26 +301,29 @@ class NSVorticityEquation(_fluid_particle_base):
fs->fk1 = fk1;
strncpy(fs->forcing_type, forcing_type, 128);
fs->iteration = iteration;
{{
char file_name[512];
fs->fill_up_filename("cvorticity", file_name);
fs->cvorticity->real_space_representation = false;
fs->cvorticity->io(
file_name,
"vorticity",
fs->iteration,
true);
fs->kk->template low_pass<{0}, THREE>(fs->cvorticity->get_cdata(), fs->kk->kM);
fs->kk->template force_divfree<{0}>(fs->cvorticity->get_cdata());
}}
{2}
fs->cvorticity->real_space_representation = false;
fs->io_checkpoint();
//endcpp
""".format(self.C_dtype, self.fftw_plan_rigor, field_H5T)
""".format(
self.C_dtype,
self.fftw_plan_rigor,
self.read_checkpoint)
self.fluid_start += self.store_kspace
self.fluid_start += 'stop_code_now = false;\n'
self.fluid_loop = 'fs->step(dt);\n'
self.fluid_loop += ('if (fs->iteration % niter_out == 0)\n{\n' +
self.fluid_output + '\n}\n')
self.fluid_output +
self.store_checkpoint +
'\n}\n' +
'if (stop_code_now){\n' +
'iteration = fs->iteration;\n' +
'break;\n}\n')
self.fluid_end = ('if (fs->iteration % niter_out != 0)\n{\n' +
self.fluid_output + '\n}\n' +
self.fluid_output +
self.store_checkpoint +
'DEBUG_MSG("checkpoint value is %d\\n", checkpoint);\n' +
'\n}\n' +
'delete fs;\n' +
'delete tmp_vec_field;\n' +
'delete tmp_scal_field;\n')
......@@ -487,6 +544,7 @@ class NSVorticityEquation(_fluid_particle_base):
self.parameters['histogram_bins'],
4),
dtype = np.int64)
ofile['checkpoint'] = int(0)
return None
def specific_parser_arguments(
self,
......@@ -581,30 +639,38 @@ class NSVorticityEquation(_fluid_particle_base):
self.write_par()
init_condition_file = os.path.join(
self.work_dir,
self.simname + '_cvorticity_i{0:0>5x}.h5'.format(0))
self.simname + '_checkpoint_0.h5')
if not os.path.exists(init_condition_file):
f = h5py.File(init_condition_file, 'w')
if len(opt.src_simname) > 0:
src_file = os.path.join(
source_cp = 0
src_file = 'not_a_file'
while True:
src_file = os.path.join(
os.path.realpath(opt.src_work_dir),
opt.src_simname + '_cvorticity_i{0:0>5x}.h5'.format(opt.src_iteration))
f = h5py.File(init_condition_file, 'w')
opt.src_simname + '_checkpoint_{0}.h5'.format(source_cp))
f0 = h5py.File(src_file, 'r')
if '{0}'.format(opt.src_iteration) in f0['vorticity/complex'].keys():
f0.close()
break
source_cp += 1
f['vorticity/complex/{0}'.format(0)] = h5py.ExternalLink(
src_file,
'vorticity/complex/{0}'.format(opt.src_iteration))
f.close()
else:
data = self.generate_vector_field(
write_to_file = False,
spectra_slope = 2.0,
amplitude = 0.05)
f = h5py.File(init_condition_file, 'w')
f['vorticity/complex/{0}'.format(0)] = data
f.close()
f.close()
self.run(
ncpu = opt.ncpu,
nb_processes = opt.nb_processes,
nb_threads_per_process = opt.nb_threads_per_process,
njobs = opt.njobs,
hours = opt.minutes // 60,
minutes = opt.minutes % 60)
minutes = opt.minutes % 60,
no_submit = opt.no_submit)
return None
if __name__ == '__main__':
......
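The constructor docstring above describes two conveniences: stopping a running DNS by creating a "stop_<simname>" file, and collecting field snapshots in one HDF5 file of links. A minimal editorial sketch of both is given here; the file and dataset names (stop_<simname>, <simname>_checkpoint_<N>.h5, vorticity/complex/<iteration>) are taken from the diff above, while the output name postprocess_file.h5 and the glob-based scan are illustrative only.

    # editorial sketch, not part of the commit
    import glob
    import os
    import h5py

    simname = 'test'
    work_dir = '.'

    # ask a running DNS to stop cleanly; rank 0 checks for this file at every
    # statistics step and the main loop then breaks after the current time step
    open(os.path.join(work_dir, 'stop_' + simname), 'w').close()

    # gather all vorticity snapshots from the checkpoint files into a single
    # file of external links, as suggested in the docstring
    pp_file = h5py.File(os.path.join(work_dir, 'postprocess_file.h5'), 'w')
    for cp_fname in sorted(glob.glob(
            os.path.join(work_dir, simname + '_checkpoint_*.h5'))):
        with h5py.File(cp_fname, 'r') as cp:
            for iteration in cp['vorticity/complex'].keys():
                pp_file['vorticity/complex/' + iteration] = h5py.ExternalLink(
                        cp_fname, 'vorticity/complex/' + iteration)
    pp_file.close()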
......@@ -1179,10 +1179,12 @@ class NavierStokes(_fluid_particle_base):
write_to_file = True,
spectra_slope = 2.0,
amplitude = 0.05)
self.run(
ncpu = opt.ncpu,
self.run(
nb_processes = opt.nb_processes,
nb_threads_per_process = opt.nb_threads_per_process,
njobs = opt.njobs,
hours = opt.minutes // 60,
minutes = opt.minutes % 60)
minutes = opt.minutes % 60,
no_submit = opt.no_submit)
return None
......@@ -250,7 +250,19 @@ class _base(object):
parser.add_argument(
'--ncpu',
type = int, dest = 'ncpu',
default = 2)
default = -1)
parser.add_argument(
'--np', '--nprocesses',
type = int, dest = 'nb_processes',
default = 4)
parser.add_argument(
'--ntpp', '--nthreads-per-process',
type = int, dest = 'nb_threads_per_process',
default = 1)
parser.add_argument(
'--no-submit',
action = 'store_true',
dest = 'no_submit')
parser.add_argument(
'--simname',
type = str, dest = 'simname',
......@@ -270,6 +282,7 @@ class _base(object):
dest = 'minutes',
default = 5,
help = 'If environment supports it, this is the requested wall-clock-limit.')
return None
def parameters_to_parser_arguments(
self,
......
......@@ -264,8 +264,15 @@ class _fluid_particle_base(_code):
'<< time_difference/nprocs << " seconds" << std::endl;\n' +
'time0 = time1;\n')
if not postprocess_mode:
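# the loop now runs up to the next multiple of niter_todo (rather than a full
# niter_todo more), presumably so that a run restarted after an early stop
# finishes at the originally scheduled iteration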
self.main += 'for (int max_iter = iteration+niter_todo; iteration < max_iter; iteration++)\n'
self.main += 'for (int max_iter = iteration+niter_todo-iteration%niter_todo; iteration < max_iter; iteration++)\n'
self.main += '{\n'
self.main += """
#ifdef USE_TIMINGOUTPUT
const std::string loopLabel = "code::main_start::loop-" + std::to_string(iteration);
TIMEZONE(loopLabel.c_str());
#endif
"""
self.main += 'if (iteration % niter_stat == 0) do_stats();\n'
if self.particle_species > 0:
self.main += 'if (iteration % niter_part == 0) do_particle_stats();\n'
......@@ -279,6 +286,12 @@ class _fluid_particle_base(_code):
else:
self.main += 'for (int frame_index = iter0; frame_index <= iter1; frame_index += niter_out)\n'
self.main += '{\n'
self.main += """
#ifdef USE_TIMINGOUTPUT
const std::string loopLabel = "code::main_start::loop-" + std::to_string(frame_index);
TIMEZONE(loopLabel.c_str());
#endif
"""
if self.particle_species > 0:
self.main += self.particle_loop
self.main += self.fluid_loop
......
......@@ -133,5 +133,7 @@ inline void DEBUG_MSG_WAIT(MPI_Comm communicator, const char * format, ...)
#endif//NDEBUG
#define variable_used_only_in_assert(x) ((void)(x))
#endif//BASE
......@@ -27,6 +27,13 @@
#include <fftw3-mpi.h>
#ifdef USE_FFTWESTIMATE
#define DEFAULT_FFTW_FLAG FFTW_ESTIMATE
#warning You are using FFTW estimate
#else
#define DEFAULT_FFTW_FLAG FFTW_PATIENT
#endif
template <class realtype>
class fftw_interface;
......
......@@ -30,6 +30,7 @@
#include <cassert>
#include "field.hpp"
#include "scope_timer.hpp"
#include "shared_array.hpp"
......@@ -706,19 +707,24 @@ void field<rnumber, be, fc>::compute_rspace_stats(
MPI_Bcast(&nbins, 1, MPI_INT, 0, this->comm);
}
assert(nvals == int(max_estimate.size()));
double *moments = new double[nmoments*nvals];
double *local_moments = new double[nmoments*nvals];
double *val_tmp = new double[nvals];
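// per-thread copies of the accumulators: each OpenMP thread works on its own
// buffer (obtained via getMine()) and the buffers are reduced with
// mergeParallel() below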
shared_array<double> local_moments_threaded(nmoments*nvals, [&](double* local_moments){
std::fill_n(local_moments, nmoments*nvals, 0);
if (nvals == 4) local_moments[3] = max_estimate[3];
});
shared_array<double> val_tmp_threaded(nvals,[&](double *val_tmp){
std::fill_n(val_tmp, nvals, 0);
});
shared_array<ptrdiff_t> local_hist_threaded(nbins*nvals,[&](ptrdiff_t* local_hist){
std::fill_n(local_hist, nbins*nvals, 0);
});
double *binsize = new double[nvals];
double *pow_tmp = new double[nvals];
ptrdiff_t *hist = new ptrdiff_t[nbins*nvals];
ptrdiff_t *local_hist = new ptrdiff_t[nbins*nvals];
int bin;
for (int i=0; i<nvals; i++)
binsize[i] = 2*max_estimate[i] / nbins;
std::fill_n(local_hist, nbins*nvals, 0);
std::fill_n(local_moments, nmoments*nvals, 0);
if (nvals == 4) local_moments[3] = max_estimate[3];
{
TIMEZONE("field::RLOOP");
this->RLOOP(
......@@ -726,7 +732,10 @@ void field<rnumber, be, fc>::compute_rspace_stats(
ptrdiff_t xindex,
ptrdiff_t yindex,
ptrdiff_t zindex){
std::fill_n(pow_tmp, nvals, 1.0);
double *local_moments = local_moments_threaded.getMine();
double *val_tmp = val_tmp_threaded.getMine();
ptrdiff_t *local_hist = local_hist_threaded.getMine();
if (nvals == int(4)) val_tmp[3] = 0.0;
for (unsigned int i=0; i<ncomp(fc); i++)
{
......@@ -740,9 +749,10 @@ void field<rnumber, be, fc>::compute_rspace_stats(
local_moments[0*nvals+3] = val_tmp[3];
if (val_tmp[3] > local_moments[9*nvals+3])
local_moments[9*nvals+3] = val_tmp[3];
bin = int(floor(val_tmp[3]*2/binsize[3]));
if (bin >= 0 && bin < nbins)
int bin = int(floor(val_tmp[3]*2/binsize[3]));
if (bin >= 0 && bin < nbins){
local_hist[bin*nvals+3]++;
}
}
for (unsigned int i=0; i<ncomp(fc); i++)
{
......@@ -750,34 +760,58 @@ void field<rnumber, be, fc>::compute_rspace_stats(
local_moments[0*nvals+i] = val_tmp[i];
if (val_tmp[i] > local_moments[(nmoments-1)*nvals+i])
local_moments[(nmoments-1)*nvals+i] = val_tmp[i];
bin = int(floor((val_tmp[i] + max_estimate[i]) / binsize[i]));
int bin = int(floor((val_tmp[i] + max_estimate[i]) / binsize[i]));
if (bin >= 0 && bin < nbins)
local_hist[bin*nvals+i]++;
}
for (int n=1; n < int(nmoments)-1; n++)
for (int i=0; i<nvals; i++)
local_moments[n*nvals + i] += (pow_tmp[i] = val_tmp[i]*pow_tmp[i]);
for (int n=1; n < int(nmoments)-1; n++){
double pow_tmp = 1;
for (int i=0; i<nvals; i++){
local_moments[n*nvals + i] += (pow_tmp = val_tmp[i]*pow_tmp);
}
}
});
TIMEZONE("FIELD_RLOOP::Merge");
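// merge the per-thread buffers: minima (moment 0) with min, maxima (last
// moment) with max, everything else by summation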
local_moments_threaded.mergeParallel([&](const int idx, const double& v1, const double& v2) -> double {
if(nvals == int(4) && idx == 0*nvals+3){
return std::min(v1, v2);
}
if(nvals == int(4) && idx == 9*nvals+3){
return std::max(v1, v2);
}
if(idx < int(ncomp(fc))){
return std::min(v1, v2);
}
if(int(nmoments-1)*nvals <= idx && idx < int(int(nmoments-1)*nvals+ncomp(fc))){
return std::max(v1, v2);
}
return v1 + v2;
});
local_hist_threaded.mergeParallel();
}
ptrdiff_t *hist = new ptrdiff_t[nbins*nvals];
double *moments = new double[nmoments*nvals];
{
TIMEZONE("MPI_Allreduce");
MPI_Allreduce(
(void*)local_moments,
(void*)local_moments_threaded.getMasterData(),
(void*)moments,
nvals,
MPI_DOUBLE, MPI_MIN, this->comm);
MPI_Allreduce(
(void*)(local_moments + nvals),
(void*)(local_moments_threaded.getMasterData() + nvals),
(void*)(moments+nvals),
(nmoments-2)*nvals,
MPI_DOUBLE, MPI_SUM, this->comm);
MPI_Allreduce(
(void*)(local_moments + (nmoments-1)*nvals),
(void*)(local_moments_threaded.getMasterData() + (nmoments-1)*nvals),
(void*)(moments+(nmoments-1)*nvals),
nvals,
MPI_DOUBLE, MPI_MAX, this->comm);
MPI_Allreduce(
(void*)local_hist,
(void*)local_hist_threaded.getMasterData(),
(void*)hist,
nbins*nvals,
MPI_INT64_T, MPI_SUM, this->comm);
......@@ -785,11 +819,8 @@ void field<rnumber, be, fc>::compute_rspace_stats(
for (int n=1; n < int(nmoments)-1; n++)
for (int i=0; i<nvals; i++)
moments[n*nvals + i] /= this->npoints;
delete[] local_moments;
delete[] local_hist;
delete[] val_tmp;
delete[] binsize;
delete[] pow_tmp;
if (this->myrank == 0)
{
TIMEZONE("root-work");
......
......@@ -29,6 +29,7 @@
#include <vector>
#include <string>
#include "kspace.hpp"
#include "omputils.hpp"
#ifndef FIELD_HPP
......@@ -71,7 +72,7 @@ class field
const int ny,
const int nz,
const MPI_Comm COMM_TO_USE,
const unsigned FFTW_PLAN_RIGOR = FFTW_ESTIMATE);
const unsigned FFTW_PLAN_RIGOR = DEFAULT_FFTW_FLAG);
~field();
int io(
......@@ -206,16 +207,22 @@ class field
switch(be)
{
case FFTW:
for (hsize_t zindex = 0; zindex < this->rlayout->subsizes[0]; zindex++)
for (hsize_t yindex = 0; yindex < this->rlayout->subsizes[1]; yindex++)
#pragma omp parallel
{
ptrdiff_t rindex = (
zindex * this->rlayout->subsizes[1] + yindex)*(
this->rmemlayout->subsizes[2]);
for (hsize_t xindex = 0; xindex < this->rlayout->subsizes[2]; xindex++)
const hsize_t start = OmpUtils::ForIntervalStart(this->rlayout->subsizes[1]);
const hsize_t end = OmpUtils::ForIntervalEnd(this->rlayout->subsizes[1]);
for (hsize_t zindex = 0; zindex < this->rlayout->subsizes[0]; zindex++)
for (hsize_t yindex = start; yindex < end; yindex++)
{
expression(rindex, xindex, yindex, zindex);
rindex++;
ptrdiff_t rindex = (
zindex * this->rlayout->subsizes[1] + yindex)*(
this->rmemlayout->subsizes[2]);
for (hsize_t xindex = 0; xindex < this->rlayout->subsizes[2]; xindex++)
{
expression(rindex, xindex, yindex, zindex);
rindex++;
}
}
}
break;
......
......@@ -361,7 +361,7 @@ int field_descriptor<rnumber>::transpose(
this->sizes[0], this->slice_size,
input, output,
this->comm,
FFTW_ESTIMATE);
DEFAULT_FFTW_FLAG);
fftw_interface<rnumber>::execute(tplan);
fftw_interface<rnumber>::destroy_plan(tplan);
return EXIT_SUCCESS;
......@@ -389,7 +389,7 @@ int field_descriptor<rnumber>::transpose(
FFTW_MPI_DEFAULT_BLOCK,
(rnumber*)input, (rnumber*)output,
this->comm,
FFTW_ESTIMATE);
DEFAULT_FFTW_FLAG);
fftw_interface<rnumber>::execute(tplan);
fftw_interface<rnumber>::destroy_plan(tplan);
break;
......@@ -449,7 +449,7 @@ int field_descriptor<rnumber>::interleave(
a,
a,
/*kind*/nullptr,
FFTW_ESTIMATE);
DEFAULT_FFTW_FLAG);
fftw_interface<rnumber>::execute(tmp);
fftw_interface<rnumber>::destroy_plan(tmp);
return EXIT_SUCCESS;
......@@ -478,7 +478,7 @@ int field_descriptor<rnumber>::interleave(
a,
a,
+1,
FFTW_ESTIMATE);
DEFAULT_FFTW_FLAG);
fftw_interface<rnumber>::execute(tmp);
fftw_interface<rnumber>::destroy_plan(tmp);
return EXIT_SUCCESS;
......
......@@ -32,7 +32,7 @@
#include "fluid_solver.hpp"
#include "fftw_tools.hpp"
#include "scope_timer.hpp"
#include "shared_array.hpp"
template <class rnumber>
......@@ -195,11 +195,11 @@ template <class rnumber>
void fluid_solver<rnumber>::compute_vorticity()
{
TIMEZONE("fluid_solver::compute_vorticity");
ptrdiff_t tindex;
CLOOP_K2(
this,
[&](ptrdiff_t cindex, ptrdiff_t xindex, ptrdiff_t yindex, ptrdiff_t zindex, double k2){
tindex = 3*cindex;
// indexing via cindex (and hence tindex) is thread safe: each iteration writes to its own locations
ptrdiff_t tindex = 3*cindex;
if (k2 <= this->kM2)
{
this->cvorticity[tindex+0][0] = -(this->ky[yindex]*this->cu[tindex+2][1] - this->kz[zindex]*this->cu[tindex+1][1]);
......@@ -209,8 +209,9 @@ void fluid_solver<rnumber>::compute_vorticity()
this->cvorticity[tindex+1][1] = (this->kz[zindex]*this->cu[tindex+0][0] - this->kx[xindex]*this->cu[tindex+2][0]);
this->cvorticity[tindex+2][1] = (this->kx[xindex]*this->cu[tindex+1][0] - this->ky[yindex]*this->cu[tindex+0][0]);
}
else
else{
std::fill_n((rnumber*)(this->cvorticity+tindex), 6, 0.0);
}
}
);
this->symmetrize(this->cvorticity, 3);
......@@ -220,11 +221,11 @@ template <class rnumber>
void fluid_solver<rnumber>::compute_velocity(rnumber (*__restrict__ vorticity)[2])
{
TIMEZONE("fluid_solver::compute_velocity");
ptrdiff_t tindex;
CLOOP_K2(
this,
[&](ptrdiff_t cindex, ptrdiff_t xindex, ptrdiff_t yindex, ptrdiff_t zindex, double k2){