Commit 9471c4da authored by Cristian Lalescu
Browse files

Merge branch 'develop' into bugfix/psample-memory-leak

parents 1b387500 79e0f520
Pipeline #43842 failed with stage
......@@ -130,7 +130,7 @@ enough).
./configure --prefix=PREFIX --enable-single --enable-sse --enable-mpi --enable-openmp --enable-threads
make
make install
./configure --prefix=PREFIX --enable-sse --enable-sse2 --enable-mpi --enable-openmp --enable-threads
./configure --prefix=PREFIX --enable-sse2 --enable-mpi --enable-openmp --enable-threads
make
make install
......
......@@ -430,9 +430,7 @@ class DNS(_code):
return None
def write_par(
self,
iter0 = 0,
particle_ic = None,
particles_off = False):
iter0 = 0):
assert (self.parameters['niter_todo'] % self.parameters['niter_stat'] == 0)
assert (self.parameters['niter_todo'] % self.parameters['niter_out'] == 0)
assert (self.parameters['niter_out'] % self.parameters['niter_stat'] == 0)
......@@ -479,40 +477,8 @@ class DNS(_code):
4),
dtype = np.int64)
ofile['checkpoint'] = int(0)
if (self.dns_type in ['NSVE', 'NSVE_no_output']) or particles_off:
if (self.dns_type in ['NSVE', 'NSVE_no_output']):
return None
if type(particle_ic) == type(None):
pbase_shape = (self.parameters['nparticles'],)
number_of_particles = self.parameters['nparticles']
else:
pbase_shape = particle_ic.shape[:-1]
assert(particle_ic.shape[-1] == 3)
number_of_particles = 1
for val in pbase_shape[1:]:
number_of_particles *= val
ncomponents = 3
if self.dns_type in ['NSVEcomplex_particles']:
ncomponents = 6
with h5py.File(self.get_checkpoint_0_fname(), 'a') as ofile:
s = 0
if not 'tracers{0}'.format(s) in ofile.keys():
ofile.create_group('tracers{0}'.format(s))
ofile.create_group('tracers{0}/rhs'.format(s))
ofile.create_group('tracers{0}/state'.format(s))
ofile['tracers{0}/rhs'.format(s)].create_dataset(
'0',
shape = (
(self.parameters['tracers{0}_integration_steps'.format(s)],) +
pbase_shape +
(ncomponents,)),
dtype = np.float)
ofile['tracers{0}/state'.format(s)].create_dataset(
'0',
shape = (
pbase_shape +
(ncomponents,)),
dtype = np.float)
return None
def job_parser_arguments(
self,
......@@ -818,35 +784,56 @@ class DNS(_code):
def generate_tracer_state(
        self,
        rseed = None,
        species = 0,
        integration_steps = None,
        ncomponents = 3):
    """Create and fill the initial particle datasets in the checkpoint 0 file.

    Creates ``tracers<species>/rhs/0`` and ``tracers<species>/state/0``
    and fills the state with random positions (uniform in [0, 2 pi)^3)
    and, for 6-component particles, random unit orientation vectors.

    :param rseed: optional seed for numpy's RNG, for reproducible states
    :param species: index of the tracer family to initialize
    :param integration_steps: number of stored rhs time levels; defaults
        to ``tracers0_integration_steps`` from the NSVEp extra parameters,
        overridden by a per-species entry in ``self.parameters`` if present
    :param ncomponents: state components per particle; forced to 6 for
        species 0 of ``NSVEcomplex_particles`` (position + orientation)
    """
    try:
        if integration_steps is None:
            integration_steps = self.NSVEp_extra_parameters['tracers0_integration_steps']
        # a per-species parameter, when present, takes precedence
        steps_key = 'tracers{0}_integration_steps'.format(species)
        if steps_key in self.parameters.keys():
            integration_steps = self.parameters[steps_key]
        if self.dns_type == 'NSVEcomplex_particles' and species == 0:
            ncomponents = 6
        with h5py.File(self.get_checkpoint_0_fname(), 'a') as data_file:
            nn = self.parameters['nparticles']
            if 'tracers{0}'.format(species) in data_file.keys():
                # datasets already initialized; leave existing state alone
                # (previously this path raised and printed UnboundLocalError)
                return None
            data_file.create_group('tracers{0}'.format(species))
            data_file.create_group('tracers{0}/rhs'.format(species))
            data_file.create_group('tracers{0}/state'.format(species))
            # np.float was a deprecated alias of the builtin float
            # (removed in modern numpy); np.float64 is the same dtype
            data_file['tracers{0}/rhs'.format(species)].create_dataset(
                    '0',
                    shape = (integration_steps, nn, ncomponents,),
                    dtype = np.float64)
            dset = data_file['tracers{0}/state'.format(species)].create_dataset(
                    '0',
                    shape = (nn, ncomponents,),
                    dtype = np.float64)
            if rseed is not None:
                np.random.seed(rseed)
            cc = int(0)
            # fill in batches so arbitrarily many particles fit in memory
            batch_size = int(1e6)
            def get_random_phases(npoints):
                # uniform positions in [0, 2 pi)^3
                return np.random.random(
                        (npoints, 3))*2*np.pi
            def get_random_versors(npoints):
                # isotropically distributed unit vectors
                bla = np.random.normal(
                        size = (npoints, 3))
                bla /= np.sum(bla**2, axis = 1)[:, None]**.5
                return bla
            while nn > 0:
                if nn > batch_size:
                    dset[cc*batch_size:(cc+1)*batch_size, :3] = get_random_phases(batch_size)
                    if dset.shape[1] == 6:
                        dset[cc*batch_size:(cc+1)*batch_size, 3:] = get_random_versors(batch_size)
                    nn -= batch_size
                else:
                    dset[cc*batch_size:cc*batch_size+nn, :3] = get_random_phases(nn)
                    if dset.shape[1] == 6:
                        dset[cc*batch_size:cc*batch_size+nn, 3:] = get_random_versors(nn)
                    nn = 0
                cc += 1
    except Exception as e:
        # best effort, matching original behavior: report and continue
        print(e)
    return None
def generate_vector_field(
self,
......@@ -995,66 +982,61 @@ class DNS(_code):
particle_file.create_group('tracers0/pressure_gradient')
particle_file.create_group('tracers0/pressure_Hessian')
return None
def generate_initial_condition(
        self,
        opt = None):
    """Generate field (and, where relevant, particle) initial conditions.

    The vorticity field is (re)generated in the checkpoint 0 file when it
    is missing or has the wrong shape, either by copying it from a source
    simulation (``opt.src_simname``) or by synthesizing a random field.

    :param opt: parsed command-line options (``src_simname``,
        ``src_work_dir``, ``src_iteration`` are read here)
    """
    # take care of fields' initial condition
    # first, check if initial field exists with the expected shape
    need_field = False
    if not os.path.exists(self.get_checkpoint_0_fname()):
        need_field = True
    else:
        with h5py.File(self.get_checkpoint_0_fname(), 'r') as f:
            if 'vorticity/complex/0' not in f:
                need_field = True
            else:
                # regenerate only when the stored field has the WRONG
                # shape; the original `==` inverted this test, forcing a
                # regeneration (and a failing dataset assignment) exactly
                # when the field was already correct
                need_field = (f['vorticity/complex/0'].shape != (
                        self.parameters['ny'],
                        self.parameters['nz'],
                        self.parameters['nx']//2+1,
                        3))
    if need_field:
        with h5py.File(self.get_checkpoint_0_fname(), 'a') as f:
            # drop a stale, wrongly shaped dataset so it can be rewritten
            if 'vorticity/complex/0' in f:
                del f['vorticity/complex/0']
            if len(opt.src_simname) > 0:
                # scan the source checkpoints for the requested iteration
                source_cp = 0
                src_file = 'not_a_file'
                while True:
                    src_file = os.path.join(
                            os.path.realpath(opt.src_work_dir),
                            opt.src_simname + '_checkpoint_{0}.h5'.format(source_cp))
                    f0 = h5py.File(src_file, 'r')
                    found = ('{0}'.format(opt.src_iteration)
                             in f0['vorticity/complex'].keys())
                    # close the handle on every pass (it used to leak
                    # whenever the iteration was not in this checkpoint)
                    f0.close()
                    if found:
                        break
                    source_cp += 1
                self.copy_complex_field(
                        src_file,
                        'vorticity/complex/{0}'.format(opt.src_iteration),
                        f,
                        'vorticity/complex/{0}'.format(0))
            else:
                data = self.generate_vector_field(
                        write_to_file = False,
                        spectra_slope = 2.0,
                        amplitude = 0.05)
                f['vorticity/complex/{0}'.format(0)] = data
    # now take care of particles' initial condition
    if self.dns_type in ['NSVEparticles', 'NSVEcomplex_particles', 'NSVEparticles_no_output', 'NSVEp_extra_sampling']:
        self.generate_particle_data(opt = opt)
    return None
def launch_jobs(
self,
opt = None,
particle_initial_condition = None):
if not os.path.exists(os.path.join(self.work_dir, self.simname + '.h5')):
# take care of fields' initial condition
if not os.path.exists(self.get_checkpoint_0_fname()):
f = h5py.File(self.get_checkpoint_0_fname(), 'w')
if len(opt.src_simname) > 0:
source_cp = 0
src_file = 'not_a_file'
while True:
src_file = os.path.join(
os.path.realpath(opt.src_work_dir),
opt.src_simname + '_checkpoint_{0}.h5'.format(source_cp))
f0 = h5py.File(src_file, 'r')
if '{0}'.format(opt.src_iteration) in f0['vorticity/complex'].keys():
f0.close()
break
source_cp += 1
self.copy_complex_field(
src_file,
'vorticity/complex/{0}'.format(opt.src_iteration),
f,
'vorticity/complex/{0}'.format(0))
else:
data = self.generate_vector_field(
write_to_file = False,
spectra_slope = 2.0,
amplitude = 0.05)
f['vorticity/complex/{0}'.format(0)] = data
f.close()
## take care of particles' initial condition
#if self.dns_type in ['NSVEparticles', 'NSVEparticles_no_output']:
# if opt.pclouds > 1:
# np.random.seed(opt.particle_rand_seed)
# if opt.pcloud_type == 'random-cube':
# particle_initial_condition = (
# np.random.random((opt.pclouds, 1, 3))*2*np.pi +
# np.random.random((1, self.parameters['nparticles'], 3))*opt.particle_cloud_size)
# elif opt.pcloud_type == 'regular-cube':
# onedarray = np.linspace(
# -opt.particle_cloud_size/2,
# opt.particle_cloud_size/2,
# self.parameters['nparticles'])
# particle_initial_condition = np.zeros(
# (opt.pclouds,
# self.parameters['nparticles'],
# self.parameters['nparticles'],
# self.parameters['nparticles'], 3),
# dtype = np.float64)
# particle_initial_condition[:] = \
# np.random.random((opt.pclouds, 1, 1, 1, 3))*2*np.pi
# particle_initial_condition[..., 0] += onedarray[None, None, None, :]
# particle_initial_condition[..., 1] += onedarray[None, None, :, None]
# particle_initial_condition[..., 2] += onedarray[None, :, None, None]
self.write_par(
particle_ic = None)
if self.dns_type in ['NSVEparticles', 'NSVEcomplex_particles', 'NSVEparticles_no_output', 'NSVEp_extra_sampling']:
self.generate_particle_data(opt = opt)
opt = None):
if not os.path.exists(self.get_data_file_name()):
self.generate_initial_condition(opt = opt)
self.write_par()
self.run(
nb_processes = opt.nb_processes,
nb_threads_per_process = opt.nb_threads_per_process,
......
......@@ -33,6 +33,7 @@ import h5py
import math
import numpy as np
import warnings
import glob
import bfps
from ._code import _code
......@@ -804,21 +805,15 @@ class PP(_code):
with h5py.File(os.path.join(self.work_dir, self.simname + '_fields.h5'), 'a') as ff:
ff.require_group('vorticity')
ff.require_group('vorticity/complex')
checkpoint = (iter0 // niter_out) // cppf
while True:
cpf_name = os.path.join(
self.work_dir,
self.simname + '_checkpoint_{0}.h5'.format(checkpoint))
if os.path.exists(cpf_name):
cpf = h5py.File(cpf_name, 'r')
for iter_name in cpf['vorticity/complex'].keys():
if iter_name not in ff['vorticity/complex'].keys():
ff['vorticity/complex/' + iter_name] = h5py.ExternalLink(
cpf_name,
'vorticity/complex/' + iter_name)
checkpoint += 1
else:
break
checkpoint_file_list = glob.glob(self.simname + '_checkpoint_*.h5')
for cpf_name in checkpoint_file_list:
cpf = h5py.File(cpf_name, 'r')
for iter_name in cpf['vorticity/complex'].keys():
if iter_name not in ff['vorticity/complex'].keys():
ff['vorticity/complex/' + iter_name] = h5py.ExternalLink(
cpf_name,
'vorticity/complex/' + iter_name)
cpf.close()
return None
def launch_jobs(
self,
......
......@@ -33,7 +33,7 @@ from .PP import PP
from .TEST import TEST
def main():
parser = argparse.ArgumentParser(prog = 'bfps')
parser = argparse.ArgumentParser(prog = 'bfps', conflict_handler = 'resolve')
parser.add_argument(
'-v', '--version',
action = 'version',
......
......@@ -262,10 +262,16 @@ class _code(_base):
current_dir = os.getcwd()
os.chdir(self.work_dir)
os.chdir(current_dir)
if not 'MPI' in self.host_info.keys():
self.host_info['MPI'] = 'openmpi'
if self.host_info['MPI'] == 'openmpi':
mpirun_environment_set = 'x'
else:
mpirun_environment_set = 'env'
command_atoms = ['mpirun',
'-np',
'{0}'.format(nb_processes),
'-x',
'-' + mpirun_environment_set,
'OMP_NUM_THREADS={0}'.format(nb_threads_per_process),
'./' + self.name,
self.simname]
......
......@@ -26,7 +26,7 @@
#include <mpi.h>
#include <fftw3-mpi.h>
#include "field_descriptor.hpp"
#include <map>
#ifndef FFTW_TOOLS
......
......@@ -1387,7 +1387,6 @@ void field<rnumber, be, fc>::compute_stats(
// what follows gave me a headache until I found this link:
// http://stackoverflow.com/questions/8256636/expected-primary-expression-error-on-template-method-using
kk->template cospectrum<rnumber, fc>(
(typename fftw_interface<rnumber>::complex*)this->data,
(typename fftw_interface<rnumber>::complex*)this->data,
group,
dset_name + "_" + dset_name,
......
/**********************************************************************
* *
* Copyright 2019 Max Planck Institute *
* for Dynamics and Self-Organization *
* *
* This file is part of bfps. *
* *
* bfps is free software: you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published *
* by the Free Software Foundation, either version 3 of the License, *
* or (at your option) any later version. *
* *
* bfps is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with bfps. If not, see <http://www.gnu.org/licenses/> *
* *
* Contact: Cristian.Lalescu@ds.mpg.de *
* *
**********************************************************************/
......@@ -39,6 +61,7 @@ int NSVEparticles<rnumber>::initialize(void)
"tracers0",
nparticles,
tracers0_integration_steps);
this->particles_output_writer_mpi->setParticleFileLayout(this->ps->getParticleFileLayout());
this->particles_sample_writer_mpi = new particles_output_sampling_hdf5<
long long int, double, 3>(
MPI_COMM_WORLD,
......@@ -46,6 +69,7 @@ int NSVEparticles<rnumber>::initialize(void)
(this->simname + "_particles.h5"),
"tracers0",
"position/0");
this->particles_sample_writer_mpi->setParticleFileLayout(this->ps->getParticleFileLayout());
return EXIT_SUCCESS;
}
......
......@@ -525,7 +525,7 @@ void kspace<be, dt>::cospectrum(
const std::string dset_name,
const hsize_t toffset)
{
TIMEZONE("field::cospectrum");
TIMEZONE("field::cospectrum2");
shared_array<double> spec_local_thread(this->nshells*ncomp(fc)*ncomp(fc),[&](double* spec_local){
std::fill_n(spec_local, this->nshells*ncomp(fc)*ncomp(fc), 0);
});
......@@ -593,6 +593,84 @@ void kspace<be, dt>::cospectrum(
}
}
template <field_backend be,
          kspace_dealias_type dt>
template <typename rnumber,
          field_components fc>
/* Single-field overload: compute the auto-spectrum of the complex field
 * `a`, binned in wavenumber shells of width dk, and write the result to
 * the "spectra/<dset_name>" dataset of `group` at time slot `toffset`.
 * Counterpart of the two-field cospectrum overload. */
void kspace<be, dt>::cospectrum(
        const rnumber(* __restrict a)[2],
        const hid_t group,
        const std::string dset_name,
        const hsize_t toffset)
{
    TIMEZONE("field::cospectrum1");
    // per-thread accumulator: ncomp x ncomp correlations for each shell,
    // zero-initialized before the k-space loop
    shared_array<double> spec_local_thread(this->nshells*ncomp(fc)*ncomp(fc),[&](double* spec_local){
        std::fill_n(spec_local, this->nshells*ncomp(fc)*ncomp(fc), 0);
        });
    this->CLOOP_K2_NXMODES(
            [&](ptrdiff_t cindex,
                ptrdiff_t xindex,
                ptrdiff_t yindex,
                ptrdiff_t zindex,
                double k2,
                int nxmodes){
            if (k2 <= this->kM2)
            {
                double* spec_local = spec_local_thread.getMine();
                // base offset of the shell that |k| falls into
                int tmp_int = int(sqrt(k2) / this->dk)*ncomp(fc)*ncomp(fc);
                for (hsize_t i=0; i<ncomp(fc); i++)
                for (hsize_t j=0; j<ncomp(fc); j++){
                    // Re(a_i) Re(a_j) + Im(a_i) Im(a_j) = Re(a_i conj(a_j));
                    // nxmodes accounts for the Hermitian-symmetric x half
                    spec_local[tmp_int + i*ncomp(fc)+j] += nxmodes * (
                    (a[ncomp(fc)*cindex + i][0] * a[ncomp(fc)*cindex + j][0]) +
                    (a[ncomp(fc)*cindex + i][1] * a[ncomp(fc)*cindex + j][1]));
                }
            }
            });
    spec_local_thread.mergeParallel();
    std::vector<double> spec;
    spec.resize(this->nshells*ncomp(fc)*ncomp(fc), 0);
    // reduce thread-merged partial spectra across all MPI ranks
    MPI_Allreduce(
            spec_local_thread.getMasterData(),
            &spec.front(),
            spec.size(),
            MPI_DOUBLE, MPI_SUM, this->layout->comm);
    if (this->layout->myrank == 0)
    {
        // only rank 0 writes the reduced spectrum into the HDF5 dataset
        hid_t dset, wspace, mspace;
        hsize_t count[(ndim(fc)-2)*2], offset[(ndim(fc)-2)*2], dims[(ndim(fc)-2)*2];
        dset = H5Dopen(group, ("spectra/" + dset_name).c_str(), H5P_DEFAULT);
        wspace = H5Dget_space(dset);
        H5Sget_simple_extent_dims(wspace, dims, NULL);
        switch (fc)
        {
            // NOTE: deliberate fallthrough. Each case sets the hyperslab
            // extents for its extra tensor dimensions, then falls into
            // the common (time, shell) dimensions in the default case.
            case THREExTHREE:
                offset[4] = 0;
                offset[5] = 0;
                count[4] = 3;
                count[5] = 3;
            case THREE:
                offset[2] = 0;
                offset[3] = 0;
                count[2] = 3;
                count[3] = 3;
            default:
                offset[0] = toffset;
                offset[1] = 0;
                count[0] = 1;
                count[1] = this->nshells;
        }
        mspace = H5Screate_simple((ndim(fc)-2)*2, count, NULL);
        H5Sselect_hyperslab(wspace, H5S_SELECT_SET, offset, NULL, count, NULL);
        H5Dwrite(dset, H5T_NATIVE_DOUBLE, mspace, wspace, H5P_DEFAULT, &spec.front());
        H5Sclose(wspace);
        H5Sclose(mspace);
        H5Dclose(dset);
    }
}
template <field_backend be,
kspace_dealias_type dt>
template <typename rnumber,
......@@ -837,6 +915,68 @@ template void kspace<FFTW, SMOOTH>::cospectrum<double, THREExTHREE>(
const std::string dset_name,
const hsize_t toffset);
/* Explicit instantiations of the single-field cospectrum overload for
 * every supported (dealiasing, scalar type, field components) combination. */
template void kspace<FFTW, TWO_THIRDS>::cospectrum<float, ONE>(
        const typename fftw_interface<float>::complex *__restrict__ a,
        const hid_t group,
        const std::string dset_name,
        const hsize_t toffset);
template void kspace<FFTW, TWO_THIRDS>::cospectrum<float, THREE>(
        const typename fftw_interface<float>::complex *__restrict__ a,
        const hid_t group,
        const std::string dset_name,
        const hsize_t toffset);
template void kspace<FFTW, TWO_THIRDS>::cospectrum<float, THREExTHREE>(
        const typename fftw_interface<float>::complex *__restrict__ a,
        const hid_t group,
        const std::string dset_name,
        const hsize_t toffset);
template void kspace<FFTW, TWO_THIRDS>::cospectrum<double, ONE>(
        const typename fftw_interface<double>::complex *__restrict__ a,
        const hid_t group,
        const std::string dset_name,
        const hsize_t toffset);
template void kspace<FFTW, TWO_THIRDS>::cospectrum<double, THREE>(
        const typename fftw_interface<double>::complex *__restrict__ a,
        const hid_t group,
        const std::string dset_name,
        const hsize_t toffset);
template void kspace<FFTW, TWO_THIRDS>::cospectrum<double, THREExTHREE>(
        const typename fftw_interface<double>::complex *__restrict__ a,
        const hid_t group,
        const std::string dset_name,
        const hsize_t toffset);
template void kspace<FFTW, SMOOTH>::cospectrum<float, ONE>(
        const typename fftw_interface<float>::complex *__restrict__ a,
        const hid_t group,
        const std::string dset_name,
        const hsize_t toffset);
template void kspace<FFTW, SMOOTH>::cospectrum<float, THREE>(
        const typename fftw_interface<float>::complex *__restrict__ a,
        const hid_t group,
        const std::string dset_name,
        const hsize_t toffset);
template void kspace<FFTW, SMOOTH>::cospectrum<float, THREExTHREE>(
        const typename fftw_interface<float>::complex *__restrict__ a,
        const hid_t group,
        const std::string dset_name,
        const hsize_t toffset);
template void kspace<FFTW, SMOOTH>::cospectrum<double, ONE>(
        const typename fftw_interface<double>::complex *__restrict__ a,
        const hid_t group,
        const std::string dset_name,
        const hsize_t toffset);
template void kspace<FFTW, SMOOTH>::cospectrum<double, THREE>(
        const typename fftw_interface<double>::complex *__restrict__ a,
        const hid_t group,
        const std::string dset_name,
        const hsize_t toffset);
template void kspace<FFTW, SMOOTH>::cospectrum<double, THREExTHREE>(
        const typename fftw_interface<double>::complex *__restrict__ a,
        const hid_t group,
        const std::string dset_name,
        const hsize_t toffset);
template double kspace<FFTW, TWO_THIRDS>::L2norm<float, ONE>(
const typename fftw_interface<float>::complex *__restrict__ a);
template double kspace<FFTW, TWO_THIRDS>::L2norm<float, THREE>(
......
......@@ -114,6 +114,14 @@ class kspace
const std::string dset_name,
const hsize_t toffset);
template <typename rnumber,
field_components fc>
void cospectrum(
const rnumber(* __restrict__ a)[2],
const hid_t group,
const std::string dset_name,
const hsize_t toffset);
template <typename rnumber,
field_components fc>
double L2norm(
......
......@@ -80,7 +80,7 @@ public:
void completeLoopWithExtraField(
const real_number dt,
const field<rnumber, be, fc>& in_field) {
static_assert(fc == THREE || THREExTHREE, "only THREE or THREExTHREE is supported for now");
static_assert((fc == THREE) || (fc == THREExTHREE), "only THREE or THREExTHREE is supported for now");
if (fc == THREE)
{
std::unique_ptr<real_number[]> extra_rhs(new real_number[getLocalNbParticles()*3]());
......@@ -96,6 +96,9 @@ public:
completeLoopWithVelocityGradient(dt, extra_rhs.get());
}
}
virtual int setParticleFileLayout(std::vector<hsize_t>) = 0;
virtual std::vector<hsize_t> getParticleFileLayout() = 0;
};
#endif