Commit 1c38af33 authored by Cristian Lalescu's avatar Cristian Lalescu
Browse files

Merge branch 'feature/joint-vec-statistics' into develop

parents 8f13c0c2 c71f2a04
Pipeline #16282 passed with stage
in 6 minutes and 12 seconds
......@@ -117,7 +117,7 @@ class PP(_code):
outfile.write(self.main + '\n')
return None
def generate_default_parameters(self):
# these parameters are relevant for all DNS classes
# these parameters are relevant for all PP classes
self.parameters['dealias_type'] = int(1)
self.parameters['dkx'] = float(1.0)
self.parameters['dky'] = float(1.0)
......@@ -131,6 +131,15 @@ class PP(_code):
self.pp_parameters = {}
self.pp_parameters['iteration_list'] = np.zeros(1).astype(np.int)
return None
def extra_postprocessing_parameters(
self,
dns_type = 'joint_acc_vel_stats'):
pars = {}
if dns_type == 'joint_acc_vel_stats':
pars['max_acceleration_estimate'] = float(10)
pars['max_velocity_estimate'] = float(1)
pars['histogram_bins'] = int(129)
return pars
def get_data_file_name(self):
return os.path.join(self.work_dir, self.simname + '.h5')
def get_data_file(self):
......@@ -426,6 +435,15 @@ class PP(_code):
self.simulation_parser_arguments(parser_get_rfields)
self.job_parser_arguments(parser_get_rfields)
self.parameters_to_parser_arguments(parser_get_rfields)
parser_joint_acc_vel_stats = subparsers.add_parser(
'joint_acc_vel_stats',
help = 'get joint acceleration and velocity statistics')
self.simulation_parser_arguments(parser_joint_acc_vel_stats)
self.job_parser_arguments(parser_joint_acc_vel_stats)
self.parameters_to_parser_arguments(parser_joint_acc_vel_stats)
self.parameters_to_parser_arguments(
parser_joint_acc_vel_stats,
parameters = self.extra_postprocessing_parameters('joint_acc_vel_stats'))
return None
def prepare_launch(
self,
......@@ -463,7 +481,8 @@ class PP(_code):
for k in self.pp_parameters.keys():
self.parameters[k] = self.pp_parameters[k]
self.pars_from_namespace(opt)
niter_out = self.get_data_file()['parameters/niter_todo'].value
niter_out = self.get_data_file()['parameters/niter_out'].value
assert(opt.iter0 % niter_out == 0)
self.pp_parameters['iteration_list'] = np.arange(
opt.iter0, opt.iter1+niter_out, niter_out, dtype = np.int)
return opt
......@@ -622,13 +641,170 @@ class PP(_code):
dst_file[dst_dset_name][kz,:min_shape[1], :min_shape[2]] = \
src_file[src_dset_name][kz, :min_shape[1], :min_shape[2]]
return None
def prepare_post_file(
self,
opt = None):
self.pp_parameters.update(
self.extra_postprocessing_parameters(self.dns_type))
self.pars_from_namespace(
opt,
parameters = self.pp_parameters,
get_sim_info = False)
for kk in ['nx', 'ny', 'nz']:
self.parameters[kk] = self.get_data_file()['parameters/' + kk].value
n = self.parameters['nx']
if self.dns_type in ['filtered_slices',
'filtered_acceleration']:
if type(opt.klist_kmax) == type(None):
opt.klist_kmax = n / 3.
if type(opt.klist_kmin) == type(None):
opt.klist_kmin = 6.
kvals = bfps_addons.tools.power_space_array(
power = opt.klist_power,
size = opt.klist_size,
vmin = opt.klist_kmin,
vmax = opt.klist_kmax)
if opt.test_klist:
for i in range(opt.klist_size):
print('kcut{0} = {1}, ell{0} = {2:.3e}'.format(
i, kvals[i], 2*np.pi / kvals[i]))
opt.no_submit = True
self.pp_parameters['kcut'] = kvals
self.rewrite_par(
group = self.dns_type + '/parameters',
parameters = self.pp_parameters,
file_name = os.path.join(self.work_dir, self.simname + '_post.h5'))
histogram_bins = opt.histogram_bins
if (type(histogram_bins) == type(None) and
'histogram_bins' in self.pp_parameters.keys()):
histogram_bins = self.pp_parameters['histogram_bins']
with h5py.File(os.path.join(self.work_dir, self.simname + '_post.h5'), 'r+') as ofile:
group = ofile[self.dns_type]
group.require_group('histograms')
group.require_group('moments')
group.require_group('spectra')
vec_spectra_stats = []
vec4_rspace_stats = []
scal_rspace_stats = []
if self.dns_type == 'joint_acc_vel_stats':
vec_spectra_stats.append('velocity')
vec4_rspace_stats.append('velocity')
vec_spectra_stats.append('acceleration')
vec4_rspace_stats.append('acceleration')
for quantity in scal_rspace_stats:
if quantity not in group['histograms'].keys():
time_chunk = 2**20 // (8*histogram_bins)
time_chunk = max(time_chunk, 1)
group['histograms'].create_dataset(
quantity,
(1, histogram_bins),
chunks = (time_chunk, histogram_bins),
maxshape = (None, histogram_bins),
dtype = np.int64)
else:
assert(histogram_bins ==
group['histograms/' + quantity].shape[1])
if quantity not in group['moments'].keys():
time_chunk = 2**20 // (8*10)
time_chunk = max(time_chunk, 1)
group['moments'].create_dataset(
quantity,
(1, 10),
chunks = (time_chunk, 10),
maxshape = (None, 10),
dtype = np.float64)
if self.dns_type == 'joint_acc_vel_stats':
quantity = 'acceleration_and_velocity_components'
if quantity not in group['histograms'].keys():
time_chunk = 2**20 // (8*9*histogram_bins**2)
time_chunk = max(time_chunk, 1)
group['histograms'].create_dataset(
quantity,
(1, histogram_bins, histogram_bins, 3, 3),
chunks = (time_chunk, histogram_bins, histogram_bins, 3, 3),
maxshape = (None, histogram_bins, histogram_bins, 3, 3),
dtype = np.int64)
quantity = 'acceleration_and_velocity_magnitudes'
if quantity not in group['histograms'].keys():
time_chunk = 2**20 // (8*histogram_bins**2)
time_chunk = max(time_chunk, 1)
group['histograms'].create_dataset(
quantity,
(1, histogram_bins, histogram_bins),
chunks = (time_chunk, histogram_bins, histogram_bins),
maxshape = (None, histogram_bins, histogram_bins),
dtype = np.int64)
ncomps = 4
for quantity in vec4_rspace_stats:
if quantity not in group['histograms'].keys():
time_chunk = 2**20 // (8*histogram_bins*ncomps)
time_chunk = max(time_chunk, 1)
group['histograms'].create_dataset(
quantity,
(1, histogram_bins, ncomps),
chunks = (time_chunk, histogram_bins, ncomps),
maxshape = (None, histogram_bins, ncomps),
dtype = np.int64)
if quantity not in group['moments'].keys():
time_chunk = 2**20 // (8*10*ncomps)
time_chunk = max(time_chunk, 1)
group['moments'].create_dataset(
quantity,
(1, 10, ncomps),
chunks = (time_chunk, 10, ncomps),
maxshape = (None, 10, ncomps),
dtype = np.float64)
time_chunk = 2**20 // (
4*3*
self.parameters['nx']*self.parameters['ny'])
time_chunk = max(time_chunk, 1)
for quantity in vec_spectra_stats:
df = self.get_data_file()
if quantity + '_' + quantity not in group['spectra'].keys():
spec_chunks = df['statistics/spectra/velocity_velocity'].chunks
spec_shape = df['statistics/spectra/velocity_velocity'].shape
spec_maxshape = df['statistics/spectra/velocity_velocity'].maxshape
group['spectra'].create_dataset(
quantity + '_' + quantity,
spec_shape,
chunks = spec_chunks,
maxshape = spec_maxshape,
dtype = np.float64)
df.close()
return None
def prepare_field_file(self):
df = self.get_data_file()
if 'field_dtype' in df.keys():
# we don't need to do anything, raw binary files are used
return None
last_iteration = df['iteration'].value
cppf = df['parameters/checkpoints_per_file'].value
niter_out = df['parameters/niter_out'].value
with h5py.File(os.path.join(self.work_dir, self.simname + '_fields.h5'), 'a') as ff:
ff.require_group('vorticity')
ff.require_group('vorticity/complex')
checkpoint = 0
while True:
cpf_name = os.path.join(
self.work_dir,
self.simname + '_checkpoint_{0}.h5'.format(checkpoint))
if os.path.exists(cpf_name):
cpf = h5py.File(cpf_name, 'r')
for iter_name in cpf['vorticity/complex'].keys():
if iter_name not in ff['vorticity/complex'].keys():
ff['vorticity/complex/' + iter_name] = h5py.ExternalLink(
cpf_name,
'vorticity/complex/' + iter_name)
checkpoint += 1
else:
break
return None
def launch_jobs(
self,
opt = None,
particle_initial_condition = None):
self.rewrite_par(
group = self.dns_type,
parameters = self.pp_parameters)
self.prepare_post_file(opt)
self.prepare_field_file()
self.run(
nb_processes = opt.nb_processes,
nb_threads_per_process = opt.nb_threads_per_process,
......
......@@ -1135,6 +1135,231 @@ int invert_curl(
return EXIT_SUCCESS;
}
template <typename rnumber,
field_backend be,
field_components fc>
int joint_rspace_PDF(
field<rnumber, be, fc> *f1,
field<rnumber, be, fc> *f2,
const hid_t group,
const std::string dset_name,
const hsize_t toffset,
const std::vector<double> max_f1_estimate,
const std::vector<double> max_f2_estimate)
{
TIMEZONE("joint_rspace_PDF");
assert(f1->real_space_representation);
assert(f2->real_space_representation);
if (fc == THREE)
{
assert(max_f1_estimate.size() == 4);
assert(max_f2_estimate.size() == 4);
}
else if (fc == ONE)
{
assert(max_f1_estimate.size() == 1);
assert(max_f2_estimate.size() == 1);
}
int nbins;
std::string dsetc, dsetm;
dsetc = "histograms/" + dset_name + "_components";
if (fc == THREE)
dsetm = "histograms/" + dset_name + "_magnitudes";
else
dsetm = "histograms/" + dset_name;
if (f1->myrank == 0)
{
hid_t dset, wspace;
hsize_t dims[5];
int ndims;
if (fc == THREE)
{
dset = H5Dopen(
group,
dsetc.c_str(),
H5P_DEFAULT);
wspace = H5Dget_space(dset);
ndims = H5Sget_simple_extent_dims(wspace, dims, NULL);
DEBUG_MSG("number of dimensions is %d\n", ndims);
assert(ndims == 5);
assert(dims[3] == 3);
assert(dims[4] == 3);
H5Sclose(wspace);
H5Dclose(dset);
}
dset = H5Dopen(
group,
dsetm.c_str(),
H5P_DEFAULT);
wspace = H5Dget_space(dset);
ndims = H5Sget_simple_extent_dims(wspace, dims, NULL);
assert(ndims == 3);
H5Sclose(wspace);
H5Dclose(dset);
nbins = dims[1];
}
{
TIMEZONE("MPI_Bcast");
MPI_Bcast(&nbins, 1, MPI_INT, 0, f1->comm);
}
/// histogram components
shared_array<ptrdiff_t> local_histc_threaded(
nbins*nbins*9,
[&](ptrdiff_t* local_hist){
std::fill_n(local_hist, nbins*nbins*9, 0);
});
/// histogram magnitudes
shared_array<ptrdiff_t> local_histm_threaded(
nbins*nbins,
[&](ptrdiff_t* local_hist){
std::fill_n(local_hist, nbins*nbins, 0);
});
/// set up bin sizes
std::vector<double> bin1size, bin2size;
bin1size.resize(4);
bin2size.resize(4);
if (fc == THREE)
{
for (unsigned int i=0; i<3; i++)
{
bin1size[i] = 2*max_f1_estimate[i] / nbins;
bin2size[i] = 2*max_f2_estimate[i] / nbins;
}
bin1size[3] = max_f1_estimate[3] / nbins;
bin2size[3] = max_f2_estimate[3] / nbins;
}
else if (fc == ONE)
{
for (unsigned int i=0; i<4; i++)
{
bin1size[i] = max_f1_estimate[0] / nbins;
bin2size[i] = max_f2_estimate[0] / nbins;
}
}
{
TIMEZONE("field::RLOOP");
f1->RLOOP(
[&](ptrdiff_t rindex,
ptrdiff_t xindex,
ptrdiff_t yindex,
ptrdiff_t zindex){
ptrdiff_t *local_histm = local_histm_threaded.getMine();
int bin1 = 0;
int bin2 = 0;
if (fc == THREE)
{
ptrdiff_t *local_histc = local_histc_threaded.getMine();
double mag1, mag2;
mag1 = 0.0;
mag2 = 0.0;
for (unsigned int i=0; i<3; i++)
{
double val1 = f1->rval(rindex, i);
mag1 += val1*val1;
int bin1 = int(floor((val1 + max_f1_estimate[i])/bin1size[i]));
mag2 = 0.0;
for (unsigned int j=0; j<3; j++)
{
double val2 = f2->rval(rindex, j);
mag2 += val2*val2;
int bin2 = int(floor((val2 + max_f2_estimate[j])/bin2size[j]));
if ((bin1 >= 0 && bin1 < nbins) &&
(bin2 >= 0 && bin2 < nbins))
local_histc[(bin1*nbins + bin2)*9 + i*3 + j]++;
}
}
bin1 = int(floor(sqrt(mag1)/bin1size[3]));
bin2 = int(floor(sqrt(mag2)/bin2size[3]));
}
else if (fc == ONE)
{
bin1 = int(floor(f1->rval(rindex)/bin1size[3]));
bin2 = int(floor(f2->rval(rindex)/bin2size[3]));
}
if ((bin1 >= 0 && bin1 < nbins) &&
(bin2 >= 0 && bin2 < nbins))
local_histm[bin1*nbins + bin2]++;
});
}
local_histm_threaded.mergeParallel();
ptrdiff_t *histm = new ptrdiff_t[nbins*nbins];
ptrdiff_t *histc = NULL;
if (fc == THREE)
{
local_histc_threaded.mergeParallel();
histc = new ptrdiff_t[nbins*nbins*9];
}
{
MPI_Allreduce(
(void*)local_histm_threaded.getMasterData(),
(void*)histm,
nbins*nbins,
MPI_INT64_T, MPI_SUM, f1->comm);
if (fc == THREE)
MPI_Allreduce(
(void*)local_histc_threaded.getMasterData(),
(void*)histc,
nbins*nbins*9,
MPI_INT64_T, MPI_SUM, f1->comm);
}
if (f1->myrank == 0)
{
TIMEZONE("root-work");
hid_t dset, wspace, mspace;
hsize_t count[5], offset[5];
if (fc == THREE)
{
dset = H5Dopen(group, dsetc.c_str(), H5P_DEFAULT);
assert(dset > 0);
wspace = H5Dget_space(dset);
offset[0] = toffset;
offset[1] = 0;
offset[2] = 0;
offset[3] = 0;
offset[4] = 0;
count[0] = 1;
count[1] = nbins;
count[2] = nbins;
count[3] = 3;
count[4] = 3;
mspace = H5Screate_simple(5, count, NULL);
H5Sselect_hyperslab(wspace, H5S_SELECT_SET, offset, NULL, count, NULL);
H5Dwrite(dset, H5T_NATIVE_INT64, mspace, wspace, H5P_DEFAULT, histc);
H5Sclose(wspace);
H5Sclose(mspace);
H5Dclose(dset);
}
dset = H5Dopen(group, dsetm.c_str(), H5P_DEFAULT);
assert(dset > 0);
offset[0] = toffset;
offset[1] = 0;
offset[2] = 0;
count[0] = 1;
count[1] = nbins;
count[2] = nbins;
mspace = H5Screate_simple(3, count, NULL);
wspace = H5Dget_space(dset);
H5Sselect_hyperslab(wspace, H5S_SELECT_SET, offset, NULL, count, NULL);
H5Dwrite(dset, H5T_NATIVE_INT64, mspace, wspace, H5P_DEFAULT, histm);
H5Sclose(wspace);
H5Sclose(mspace);
H5Dclose(dset);
}
delete[] histm;
if (fc == THREE)
delete[] histc;
return EXIT_SUCCESS;
}
template class field<float, FFTW, ONE>;
template class field<float, FFTW, THREE>;
template class field<float, FFTW, THREExTHREE>;
......@@ -1209,3 +1434,37 @@ template int invert_curl<double, FFTW, SMOOTH>(
field<double, FFTW, THREE> *,
field<double, FFTW, THREE> *);
template int joint_rspace_PDF<float, FFTW, THREE>(
field<float, FFTW, THREE> *,
field<float, FFTW, THREE> *,
const hid_t,
const std::string,
const hsize_t,
const std::vector<double>,
const std::vector<double>);
template int joint_rspace_PDF<double, FFTW, THREE>(
field<double, FFTW, THREE> *,
field<double, FFTW, THREE> *,
const hid_t,
const std::string,
const hsize_t,
const std::vector<double>,
const std::vector<double>);
template int joint_rspace_PDF<float, FFTW, ONE>(
field<float, FFTW, ONE> *,
field<float, FFTW, ONE> *,
const hid_t,
const std::string,
const hsize_t,
const std::vector<double>,
const std::vector<double>);
template int joint_rspace_PDF<double, FFTW, ONE>(
field<double, FFTW, ONE> *,
field<double, FFTW, ONE> *,
const hid_t,
const std::string,
const hsize_t,
const std::vector<double>,
const std::vector<double>);
......@@ -306,5 +306,17 @@ int invert_curl(
field<rnumber, be, THREE> *source,
field<rnumber, be, THREE> *destination);
template <typename rnumber,
field_backend be,
field_components fc>
int joint_rspace_PDF(
field<rnumber, be, fc> *f1,
field<rnumber, be, fc> *f2,
const hid_t group,
const std::string dset_name,
const hsize_t toffset,
const std::vector<double> max_f1_estimate,
const std::vector<double> max_f2_estimate);
#endif//FIELD_HPP
#include <string>
#include <cmath>
#include "joint_acc_vel_stats.hpp"
#include "scope_timer.hpp"
template <typename rnumber>
int joint_acc_vel_stats<rnumber>::initialize(void)
{
this->NSVE_field_stats<rnumber>::initialize();
this->kk = new kspace<FFTW, SMOOTH>(
this->vorticity->clayout, this->dkx, this->dky, this->dkz);
this->ve = new vorticity_equation<rnumber, FFTW>(
this->simname.c_str(),
this->nx,
this->ny,
this->nz,
this->dkx,
this->dky,
this->dkz,
this->vorticity->fftw_plan_rigor);
hid_t parameter_file = H5Fopen(
(this->simname + std::string(".h5")).c_str(),
H5F_ACC_RDONLY,
H5P_DEFAULT);
hid_t dset = H5Dopen(parameter_file, "/parameters/niter_out", H5P_DEFAULT);
H5Dread(dset, H5T_NATIVE_INT, H5S_ALL, H5S_ALL, H5P_DEFAULT, &this->niter_out);
H5Dclose(dset);
if (H5Lexists(parameter_file, "/parameters/checkpoints_per_file", H5P_DEFAULT))
{
dset = H5Dopen(parameter_file, "/parameters/checkpoints_per_file", H5P_DEFAULT);
H5Dread(dset, H5T_NATIVE_INT, H5S_ALL, H5S_ALL, H5P_DEFAULT, &this->checkpoints_per_file);
H5Dclose(dset);
}
else
this->checkpoints_per_file = 1;
H5Fclose(parameter_file);
parameter_file = H5Fopen(
(this->simname + std::string("_post.h5")).c_str(),
H5F_ACC_RDONLY,
H5P_DEFAULT);
this->iteration_list = hdf5_tools::read_vector<int>(
parameter_file,
"/joint_acc_vel_stats/parameters/iteration_list");
dset = H5Dopen(parameter_file, "joint_acc_vel_stats/parameters/max_acceleration_estimate", H5P_DEFAULT);
H5Dread(dset, H5T_NATIVE_DOUBLE, H5S_ALL, H5S_ALL, H5P_DEFAULT, &this->max_acceleration_estimate);
H5Dclose(dset);
dset = H5Dopen(parameter_file, "joint_acc_vel_stats/parameters/max_velocity_estimate", H5P_DEFAULT);
H5Dread(dset, H5T_NATIVE_DOUBLE, H5S_ALL, H5S_ALL, H5P_DEFAULT, &this->max_velocity_estimate);
H5Dclose(dset);
H5Fclose(parameter_file);
if (this->myrank == 0)
{
// set caching parameters
hid_t fapl = H5Pcreate(H5P_FILE_ACCESS);
herr_t cache_err = H5Pset_cache(fapl, 0, 521, 134217728, 1.0);
DEBUG_MSG("when setting stat_file cache I got %d\n", cache_err);
this->stat_file = H5Fopen(
(this->simname + "_post.h5").c_str(),
H5F_ACC_RDWR,
fapl);
}
else
{
this->stat_file = 0;
}
int data_file_problem;
if (this->myrank == 0)
data_file_problem = hdf5_tools::require_size_file_datasets(
this->stat_file,
"joint_acc_vel_stats",
(this->iteration_list.back() / this->niter_out) + 1);
MPI_Bcast(&data_file_problem, 1, MPI_INT, 0, this->comm);
if (data_file_problem > 0)
{
std::cerr <<
data_file_problem <<
" problems setting sizes of file datasets.\ntrying to exit now." <<
std::endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}