Commit 50e511a8 authored by Cristian Lalescu's avatar Cristian Lalescu
Browse files

use separate particle file

also, set "allocate_time" to early for particle datasets. it does seem
to have some negligible effect...
parent 73d3d465
......@@ -24,6 +24,7 @@
import sys
import os
import numpy as np
import h5py
......@@ -338,6 +339,7 @@ class NavierStokes(_fluid_particle_base):
self.fluid_includes += '#include <cstring>\n'
self.fluid_variables += ('fluid_solver<{0}> *fs;\n'.format(self.C_dtype) +
'int *kindices;\n' +
'hid_t particle_file;\n' +
'hid_t H5T_field_complex;\n')
self.fluid_definitions += """
typedef struct {{
......@@ -373,6 +375,15 @@ class NavierStokes(_fluid_particle_base):
H5Tinsert(H5T_field_complex, "r", HOFFSET(tmp_complex_type, re), {2});
H5Tinsert(H5T_field_complex, "i", HOFFSET(tmp_complex_type, im), {2});
}}
if (myrank == 0)
{{
// set caching parameters
hid_t fapl = H5Pcreate(H5P_FILE_ACCESS);
herr_t cache_err = H5Pset_cache(fapl, 0, 521, 134217728, 1.0);
DEBUG_MSG("when setting cache for particles I got %d\\n", cache_err);
sprintf(fname, "%s_particles.h5", simname);
particle_file = H5Fopen(fname, H5F_ACC_RDWR, fapl);
}}
//endcpp
""".format(self.C_dtype, self.fftw_plan_rigor, field_H5T)
if not self.frozen_fields:
......@@ -480,9 +491,9 @@ class NavierStokes(_fluid_particle_base):
self.parameters['tracers{0}_integration_steps'.format(s0 + s)] = integration_steps[s]
self.file_datasets_grow += """
//begincpp
temp_string = (std::string("/particles/") +
temp_string = (std::string("/") +
std::string(ps{0}->name));
group = H5Gopen(stat_file, temp_string.c_str(), H5P_DEFAULT);
group = H5Gopen(particle_file, temp_string.c_str(), H5P_DEFAULT);
grow_particle_datasets(group, temp_string.c_str(), NULL, NULL);
H5Gclose(group);
//endcpp
......@@ -519,10 +530,10 @@ class NavierStokes(_fluid_particle_base):
if (myrank == 0)
{{
//VELOCITY begin
std::string temp_string = (std::string("/particles/") +
std::string temp_string = (std::string("/") +
std::string(ps{0}->name) +
std::string("/velocity"));
hid_t Cdset = H5Dopen(stat_file, temp_string.c_str(), H5P_DEFAULT);
hid_t Cdset = H5Dopen(particle_file, temp_string.c_str(), H5P_DEFAULT);
hid_t mspace, wspace;
int ndims;
hsize_t count[3], offset[3];
......@@ -540,10 +551,10 @@ class NavierStokes(_fluid_particle_base):
if not type(acc_name) == type(None):
output_vel_acc += """
//ACCELERATION begin
temp_string = (std::string("/particles/") +
temp_string = (std::string("/") +
std::string(ps{0}->name) +
std::string("/acceleration"));
Cdset = H5Dopen(stat_file, temp_string.c_str(), H5P_DEFAULT);
Cdset = H5Dopen(particle_file, temp_string.c_str(), H5P_DEFAULT);
H5Dwrite(Cdset, H5T_NATIVE_DOUBLE, mspace, wspace, H5P_DEFAULT, acceleration);
H5Sclose(mspace);
H5Sclose(wspace);
......@@ -570,7 +581,7 @@ class NavierStokes(_fluid_particle_base):
for s in range(nspecies):
neighbours = self.parameters[interpolator[s] + '_neighbours']
self.particle_start += 'sprintf(fname, "tracers{0}");\n'.format(s0 + s)
self.particle_end += ('ps{0}->write(stat_file);\n' +
self.particle_end += ('ps{0}->write(particle_file);\n' +
'delete ps{0};\n').format(s0 + s)
self.particle_variables += 'rFFTW_particles<VELOCITY_TRACER, {0}, {1}> *ps{2};\n'.format(
self.C_dtype,
......@@ -586,7 +597,7 @@ class NavierStokes(_fluid_particle_base):
interpolator[s])
self.particle_start += ('ps{0}->dt = dt;\n' +
'ps{0}->iteration = iteration;\n' +
'ps{0}->read(stat_file);\n').format(s0 + s)
'ps{0}->read(particle_file);\n').format(s0 + s)
if not frozen_particles:
if type(kcut) == list:
update_field = ('fs->low_pass_Fourier(fs->cvelocity, 3, {0});\n'.format(kcut[s]) +
......@@ -594,7 +605,7 @@ class NavierStokes(_fluid_particle_base):
self.particle_loop += update_field
self.particle_loop += '{0}->field = fs->rvelocity;\n'.format(interpolator[s])
self.particle_loop += 'ps{0}->step();\n'.format(s0 + s)
self.particle_stat_src += 'ps{0}->write(stat_file, false);\n'.format(s0 + s)
self.particle_stat_src += 'ps{0}->write(particle_file, false);\n'.format(s0 + s)
self.particle_start += output_vel_acc
self.particle_stat_src += output_vel_acc
self.particle_stat_src += '}\n'
......@@ -604,6 +615,10 @@ class NavierStokes(_fluid_particle_base):
return os.path.join(self.work_dir, self.simname + '.h5')
def get_data_file(self):
return h5py.File(self.get_data_file_name(), 'r')
def get_particle_file_name(self):
return os.path.join(self.work_dir, self.simname + '_particles.h5')
def get_particle_file(self):
return h5py.File(self.get_particle_file_name(), 'r')
def get_postprocess_file_name(self):
return os.path.join(self.work_dir, self.simname + '_postprocess.h5')
def get_postprocess_file(self):
......@@ -627,11 +642,11 @@ class NavierStokes(_fluid_particle_base):
self.statistics['kshell'] = data_file['kspace/kshell'].value
self.statistics['kM'] = data_file['kspace/kM'].value
self.statistics['dk'] = data_file['kspace/dk'].value
if self.particle_species > 0:
self.trajectories = [data_file['particles/' + key + '/state'][
iter0//self.parameters['niter_part'] :
iter1//self.parameters['niter_part']+1]
for key in data_file['particles'].keys()]
#if self.particle_species > 0:
# self.trajectories = [data_file['particles/' + key + '/state'][
# iter0//self.parameters['niter_part'] :
# iter1//self.parameters['niter_part']+1]
# for key in data_file['particles'].keys()]
computation_needed = True
pp_file = h5py.File(self.get_postprocess_file_name(), 'a')
if 'ii0' in pp_file.keys():
......@@ -746,7 +761,7 @@ class NavierStokes(_fluid_particle_base):
3))
def write_par(self, iter0 = 0):
_fluid_particle_base.write_par(self, iter0 = iter0)
with h5py.File(os.path.join(self.work_dir, self.simname + '.h5'), 'r+') as ofile:
with h5py.File(self.get_data_file_name(), 'r+') as ofile:
kspace = self.get_kspace()
nshells = kspace['nshell'].shape[0]
for k in ['velocity', 'vorticity']:
......@@ -788,44 +803,6 @@ class NavierStokes(_fluid_particle_base):
4),
dtype = np.int64,
compression = 'gzip')
for s in range(self.particle_species):
time_chunk = 2**20 // (8*3*
self.parameters['nparticles']*
self.parameters['tracers{0}_integration_steps'.format(s)])
time_chunk = max(time_chunk, 1)
ofile.create_dataset('particles/tracers{0}/rhs'.format(s),
(1,
self.parameters['tracers{0}_integration_steps'.format(s)],
self.parameters['nparticles'],
3),
maxshape = (None,
self.parameters['tracers{0}_integration_steps'.format(s)],
self.parameters['nparticles'],
3),
chunks = (time_chunk,
self.parameters['tracers{0}_integration_steps'.format(s)],
self.parameters['nparticles'],
3),
dtype = np.float64)
time_chunk = 2**20 // (8*3*self.parameters['nparticles'])
time_chunk = max(time_chunk, 1)
ofile.create_dataset(
'/particles/tracers{0}/velocity'.format(s),
(1,
self.parameters['nparticles'],
3),
chunks = (time_chunk, self.parameters['nparticles'], 3),
maxshape = (None, self.parameters['nparticles'], 3),
dtype = np.float64)
if self.parameters['tracers{0}_acc_on'.format(s)]:
ofile.create_dataset(
'/particles/tracers{0}/acceleration'.format(s),
(1,
self.parameters['nparticles'],
3),
chunks = (time_chunk, self.parameters['nparticles'], 3),
maxshape = (None, self.parameters['nparticles'], 3),
dtype = np.float64)
if self.QR_stats_on:
time_chunk = 2**20//(8*3*self.parameters['histogram_bins'])
time_chunk = max(time_chunk, 1)
......@@ -888,6 +865,80 @@ class NavierStokes(_fluid_particle_base):
self.parameters['QR2D_histogram_bins']),
dtype = np.int64,
compression = 'gzip')
if self.particle_species == 0:
return None
def create_particle_dataset(
data_file,
dset_name,
dset_shape,
dset_maxshape,
dset_chunks,
# maybe something more general can be used here
dset_dtype = h5py.h5t.IEEE_F64LE):
# create the dataspace.
space_id = h5py.h5s.create_simple(
dset_shape,
dset_maxshape)
# create the dataset creation property list.
dcpl = h5py.h5p.create(h5py.h5p.DATASET_CREATE)
# set the allocation time to "early".
dcpl.set_alloc_time(h5py.h5d.ALLOC_TIME_EARLY)
dcpl.set_chunk(dset_chunks)
# and now create dataset
if sys.version_info[0] == 3:
dset_name = dset_name.encode()
return h5py.h5d.create(
data_file.id,
dset_name,
dset_dtype,
space_id,
dcpl,
h5py.h5p.DEFAULT)
with h5py.File(self.get_particle_file_name(), 'a') as ofile:
for s in range(self.particle_species):
ofile.create_group('tracers{0}'.format(s))
time_chunk = 2**20 // (8*3*
self.parameters['nparticles']*
self.parameters['tracers{0}_integration_steps'.format(s)])
time_chunk = max(time_chunk, 1)
dims = (1,
self.parameters['tracers{0}_integration_steps'.format(s)],
self.parameters['nparticles'],
3)
maxshape = (h5py.h5s.UNLIMITED,
self.parameters['tracers{0}_integration_steps'.format(s)],
self.parameters['nparticles'],
3)
chunks = (time_chunk,
self.parameters['tracers{0}_integration_steps'.format(s)],
self.parameters['nparticles'],
3)
create_particle_dataset(
ofile,
'/tracers{0}/rhs'.format(s),
dims, maxshape, chunks)
time_chunk = 2**20 // (8*3*self.parameters['nparticles'])
time_chunk = max(time_chunk, 1)
create_particle_dataset(
ofile,
'/tracers{0}/state'.format(s),
(1, self.parameters['nparticles'], 3),
(h5py.h5s.UNLIMITED, self.parameters['nparticles'], 3),
(time_chunk, self.parameters['nparticles'], 3))
create_particle_dataset(
ofile,
'/tracers{0}/velocity'.format(s),
(1, self.parameters['nparticles'], 3),
(h5py.h5s.UNLIMITED, self.parameters['nparticles'], 3),
(time_chunk, self.parameters['nparticles'], 3))
if self.parameters['tracers{0}_acc_on'.format(s)]:
create_particle_dataset(
ofile,
'/tracers{0}/acceleration'.format(s),
(1, self.parameters['nparticles'], 3),
(h5py.h5s.UNLIMITED, self.parameters['nparticles'], 3),
(time_chunk, self.parameters['nparticles'], 3))
return None
def add_particle_fields(
self,
......
......@@ -97,7 +97,7 @@ class _code(_base):
// set caching parameters
hid_t fapl = H5Pcreate(H5P_FILE_ACCESS);
herr_t cache_err = H5Pset_cache(fapl, 0, 521, 134217728, 1.0);
DEBUG_MSG("when setting cache I got %d\\n", cache_err);
DEBUG_MSG("when setting stat_file cache I got %d\\n", cache_err);
stat_file = H5Fopen(fname, H5F_ACC_RDWR, fapl);
}
//endcpp
......@@ -113,6 +113,7 @@ class _code(_base):
H5Dwrite(Cdset, H5T_NATIVE_INT, H5S_ALL, H5S_ALL, H5P_DEFAULT, &iteration);
H5Dclose(Cdset);
H5Fclose(stat_file);
H5Fclose(particle_file);
}
fftwf_mpi_cleanup();
fftw_mpi_cleanup();
......
......@@ -328,19 +328,8 @@ class _fluid_particle_base(_code):
if testing:
#data[0] = np.array([3.26434, 4.24418, 3.12157])
data[0] = np.array([ 0.72086101, 2.59043666, 6.27501953])
with h5py.File(os.path.join(self.work_dir, self.simname + '.h5'), 'r+') as data_file:
time_chunk = 2**20 // (8*ncomponents*
self.parameters['nparticles'])
time_chunk = max(time_chunk, 1)
dset = data_file.create_dataset(
'/particles/tracers{0}/state'.format(species),
(1,
self.parameters['nparticles'],
ncomponents),
chunks = (time_chunk, self.parameters['nparticles'], ncomponents),
maxshape = (None, self.parameters['nparticles'], ncomponents),
dtype = np.float64)
dset[0] = data
with h5py.File(self.get_particle_file_name(), 'r+') as data_file:
data_file['tracers{0}/state'.format(species)][0] = data
if write_to_file:
data.tofile(
os.path.join(
......
......@@ -198,7 +198,7 @@ void rFFTW_particles<particle_type, rnumber, interp_neighbours>::read(hid_t data
{
if (this->myrank == 0)
{
std::string temp_string = (std::string("/particles/") +
std::string temp_string = (std::string("/") +
std::string(this->name) +
std::string("/state"));
hid_t dset = H5Dopen(data_file_id, temp_string.c_str(), H5P_DEFAULT);
......@@ -218,7 +218,7 @@ void rFFTW_particles<particle_type, rnumber, interp_neighbours>::read(hid_t data
H5Dclose(dset);
if (this->iteration > 0)
{
temp_string = (std::string("/particles/") +
temp_string = (std::string("/") +
std::string(this->name) +
std::string("/rhs"));
dset = H5Dopen(data_file_id, temp_string.c_str(), H5P_DEFAULT);
......@@ -261,7 +261,7 @@ void rFFTW_particles<particle_type, rnumber, interp_neighbours>::write(hid_t dat
{
if (this->myrank == 0)
{
std::string temp_string = (std::string("/particles/") +
std::string temp_string = (std::string("/") +
std::string(this->name) +
std::string("/state"));
hid_t dset = H5Dopen(data_file_id, temp_string.c_str(), H5P_DEFAULT);
......@@ -281,7 +281,7 @@ void rFFTW_particles<particle_type, rnumber, interp_neighbours>::write(hid_t dat
H5Dclose(dset);
if (write_rhs)
{
temp_string = (std::string("/particles/") +
temp_string = (std::string("/") +
std::string(this->name) +
std::string("/rhs"));
dset = H5Dopen(data_file_id, temp_string.c_str(), H5P_DEFAULT);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment