-
Cristian Lalescu authoredCristian Lalescu authored
PP.py 36.30 KiB
#######################################################################
# #
# Copyright 2015 Max Planck Institute #
# for Dynamics and Self-Organization #
# #
# This file is part of bfps. #
# #
# bfps is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published #
# by the Free Software Foundation, either version 3 of the License, #
# or (at your option) any later version. #
# #
# bfps is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with bfps. If not, see <http://www.gnu.org/licenses/> #
# #
# Contact: Cristian.Lalescu@ds.mpg.de #
# #
#######################################################################
import os
import sys
import shutil
import subprocess
import argparse
import h5py
import math
import numpy as np
import warnings
import bfps
from ._code import _code
from bfps import tools
class PP(_code):
"""This class is meant to stitch together the C++ code into a final source file,
compile it, and handle all job launching.
"""
def __init__(
self,
work_dir = './',
simname = 'test'):
_code.__init__(
self,
work_dir = work_dir,
simname = simname)
self.host_info = {'type' : 'cluster',
'environment' : None,
'deltanprocs' : 1,
'queue' : '',
'mail_address': '',
'mail_events' : None}
self.generate_default_parameters()
return None
def set_precision(
self,
fluid_dtype):
if fluid_dtype in [np.float32, np.float64]:
self.fluid_dtype = fluid_dtype
elif fluid_dtype in ['single', 'double']:
if fluid_dtype == 'single':
self.fluid_dtype = np.dtype(np.float32)
elif fluid_dtype == 'double':
self.fluid_dtype = np.dtype(np.float64)
self.rtype = self.fluid_dtype
if self.rtype == np.float32:
self.ctype = np.dtype(np.complex64)
self.C_field_dtype = 'float'
self.fluid_precision = 'single'
elif self.rtype == np.float64:
self.ctype = np.dtype(np.complex128)
self.C_field_dtype = 'double'
self.fluid_precision = 'double'
return None
def write_src(self):
self.version_message = (
'/***********************************************************************\n' +
'* this code automatically generated by bfps\n' +
'* version {0}\n'.format(bfps.__version__) +
'***********************************************************************/\n\n\n')
self.include_list = [
'"base.hpp"',
'"scope_timer.hpp"',
'"fftw_interface.hpp"',
'"full_code/main_code.hpp"',
'<cmath>',
'<iostream>',
'<hdf5.h>',
'<string>',
'<cstring>',
'<fftw3-mpi.h>',
'<omp.h>',
'<cfenv>',
'<cstdlib>',
'"full_code/{0}.hpp"\n'.format(self.dns_type)]
self.main = """
int main(int argc, char *argv[])
{{
bool fpe = (
(getenv("BFPS_FPE_OFF") == nullptr) ||
(getenv("BFPS_FPE_OFF") != std::string("TRUE")));
return main_code< {0} >(argc, argv, fpe);
}}
""".format(self.dns_type + '<{0}>'.format(self.C_field_dtype))
self.includes = '\n'.join(
['#include ' + hh
for hh in self.include_list])
with open(self.name + '.cpp', 'w') as outfile:
outfile.write(self.version_message + '\n\n')
outfile.write(self.includes + '\n\n')
outfile.write(self.main + '\n')
return None
def generate_default_parameters(self):
# these parameters are relevant for all PP classes
self.parameters['dealias_type'] = int(1)
self.parameters['dkx'] = float(1.0)
self.parameters['dky'] = float(1.0)
self.parameters['dkz'] = float(1.0)
self.parameters['nu'] = float(0.1)
self.parameters['fmode'] = int(1)
self.parameters['famplitude'] = float(0.5)
self.parameters['fk0'] = float(2.0)
self.parameters['fk1'] = float(4.0)
self.parameters['forcing_type'] = 'linear'
self.pp_parameters = {}
self.pp_parameters['iteration_list'] = np.zeros(1).astype(np.int)
return None
def extra_postprocessing_parameters(
self,
dns_type = 'joint_acc_vel_stats'):
pars = {}
if dns_type == 'joint_acc_vel_stats':
pars['max_acceleration_estimate'] = float(10)
pars['max_velocity_estimate'] = float(1)
pars['histogram_bins'] = int(129)
return pars
def get_data_file_name(self):
return os.path.join(self.work_dir, self.simname + '.h5')
def get_data_file(self):
return h5py.File(self.get_data_file_name(), 'r')
def get_particle_file_name(self):
return os.path.join(self.work_dir, self.simname + '_particles.h5')
def get_particle_file(self):
return h5py.File(self.get_particle_file_name(), 'r')
def get_postprocess_file_name(self):
return os.path.join(self.work_dir, self.simname + '_postprocess.h5')
def get_postprocess_file(self):
return h5py.File(self.get_postprocess_file_name(), 'r')
def compute_statistics(self, iter0 = 0, iter1 = None):
"""Run basic postprocessing on raw data.
The energy spectrum :math:`E(t, k)` and the enstrophy spectrum
:math:`\\frac{1}{2}\omega^2(t, k)` are computed from the
.. math::
\sum_{k \\leq \\|\\mathbf{k}\\| \\leq k+dk}\\hat{u_i} \\hat{u_j}^*, \\hskip .5cm
\sum_{k \\leq \\|\\mathbf{k}\\| \\leq k+dk}\\hat{\omega_i} \\hat{\\omega_j}^*
tensors, and the enstrophy spectrum is also used to
compute the dissipation :math:`\\varepsilon(t)`.
These basic quantities are stored in a newly created HDF5 file,
``simname_postprocess.h5``.
"""
if len(list(self.statistics.keys())) > 0:
return None
self.read_parameters()
with self.get_data_file() as data_file:
if 'moments' not in data_file['statistics'].keys():
return None
iter0 = min((data_file['statistics/moments/velocity'].shape[0] *
self.parameters['niter_stat']-1),
iter0)
if type(iter1) == type(None):
iter1 = data_file['iteration'].value
else:
iter1 = min(data_file['iteration'].value, iter1)
ii0 = iter0 // self.parameters['niter_stat']
ii1 = iter1 // self.parameters['niter_stat']
self.statistics['kshell'] = data_file['kspace/kshell'].value
self.statistics['kM'] = data_file['kspace/kM'].value
self.statistics['dk'] = data_file['kspace/dk'].value
computation_needed = True
pp_file = h5py.File(self.get_postprocess_file_name(), 'a')
if 'ii0' in pp_file.keys():
computation_needed = not (ii0 == pp_file['ii0'].value and
ii1 == pp_file['ii1'].value)
if computation_needed:
for k in pp_file.keys():
del pp_file[k]
if computation_needed:
pp_file['iter0'] = iter0
pp_file['iter1'] = iter1
pp_file['ii0'] = ii0
pp_file['ii1'] = ii1
pp_file['t'] = (self.parameters['dt']*
self.parameters['niter_stat']*
(np.arange(ii0, ii1+1).astype(np.float)))
pp_file['energy(t, k)'] = (
data_file['statistics/spectra/velocity_velocity'][ii0:ii1+1, :, 0, 0] +
data_file['statistics/spectra/velocity_velocity'][ii0:ii1+1, :, 1, 1] +
data_file['statistics/spectra/velocity_velocity'][ii0:ii1+1, :, 2, 2])/2
pp_file['enstrophy(t, k)'] = (
data_file['statistics/spectra/vorticity_vorticity'][ii0:ii1+1, :, 0, 0] +
data_file['statistics/spectra/vorticity_vorticity'][ii0:ii1+1, :, 1, 1] +
data_file['statistics/spectra/vorticity_vorticity'][ii0:ii1+1, :, 2, 2])/2
pp_file['vel_max(t)'] = data_file['statistics/moments/velocity'] [ii0:ii1+1, 9, 3]
pp_file['renergy(t)'] = data_file['statistics/moments/velocity'][ii0:ii1+1, 2, 3]/2
for k in ['t',
'energy(t, k)',
'enstrophy(t, k)',
'vel_max(t)',
'renergy(t)']:
if k in pp_file.keys():
self.statistics[k] = pp_file[k].value
self.compute_time_averages()
return None
def compute_time_averages(self):
"""Compute easy stats.
Further computation of statistics based on the contents of
``simname_postprocess.h5``.
Standard quantities are as follows
(consistent with [Ishihara]_):
.. math::
U_{\\textrm{int}}(t) = \\sqrt{\\frac{2E(t)}{3}}, \\hskip .5cm
L_{\\textrm{int}}(t) = \\frac{\pi}{2U_{int}^2(t)} \\int \\frac{dk}{k} E(t, k), \\hskip .5cm
T_{\\textrm{int}}(t) =
\\frac{L_{\\textrm{int}}(t)}{U_{\\textrm{int}}(t)}
\\eta_K = \\left(\\frac{\\nu^3}{\\varepsilon}\\right)^{1/4}, \\hskip .5cm
\\tau_K = \\left(\\frac{\\nu}{\\varepsilon}\\right)^{1/2}, \\hskip .5cm
\\lambda = \\sqrt{\\frac{15 \\nu U_{\\textrm{int}}^2}{\\varepsilon}}
Re = \\frac{U_{\\textrm{int}} L_{\\textrm{int}}}{\\nu}, \\hskip
.5cm
R_{\\lambda} = \\frac{U_{\\textrm{int}} \\lambda}{\\nu}
.. [Ishihara] T. Ishihara et al,
*Small-scale statistics in high-resolution direct numerical
simulation of turbulence: Reynolds number dependence of
one-point velocity gradient statistics*.
J. Fluid Mech.,
**592**, 335-366, 2007
"""
for key in ['energy', 'enstrophy']:
self.statistics[key + '(t)'] = (self.statistics['dk'] *
np.sum(self.statistics[key + '(t, k)'], axis = 1))
self.statistics['Uint(t)'] = np.sqrt(2*self.statistics['energy(t)'] / 3)
self.statistics['Lint(t)'] = ((self.statistics['dk']*np.pi /
(2*self.statistics['Uint(t)']**2)) *
np.nansum(self.statistics['energy(t, k)'] /
self.statistics['kshell'][None, :], axis = 1))
for key in ['energy',
'enstrophy',
'vel_max',
'Uint',
'Lint']:
if key + '(t)' in self.statistics.keys():
self.statistics[key] = np.average(self.statistics[key + '(t)'], axis = 0)
for suffix in ['', '(t)']:
self.statistics['diss' + suffix] = (self.parameters['nu'] *
self.statistics['enstrophy' + suffix]*2)
self.statistics['etaK' + suffix] = (self.parameters['nu']**3 /
self.statistics['diss' + suffix])**.25
self.statistics['tauK' + suffix] = (self.parameters['nu'] /
self.statistics['diss' + suffix])**.5
self.statistics['Re' + suffix] = (self.statistics['Uint' + suffix] *
self.statistics['Lint' + suffix] /
self.parameters['nu'])
self.statistics['lambda' + suffix] = (15 * self.parameters['nu'] *
self.statistics['Uint' + suffix]**2 /
self.statistics['diss' + suffix])**.5
self.statistics['Rlambda' + suffix] = (self.statistics['Uint' + suffix] *
self.statistics['lambda' + suffix] /
self.parameters['nu'])
self.statistics['kMeta' + suffix] = (self.statistics['kM'] *
self.statistics['etaK' + suffix])
if self.parameters['dealias_type'] == 1:
self.statistics['kMeta' + suffix] *= 0.8
self.statistics['Tint'] = self.statistics['Lint'] / self.statistics['Uint']
self.statistics['Taylor_microscale'] = self.statistics['lambda']
return None
def set_plt_style(
self,
style = {'dashes' : (None, None)}):
self.style.update(style)
return None
def convert_complex_from_binary(
self,
field_name = 'vorticity',
iteration = 0,
file_name = None):
"""read the Fourier representation of a vector field.
Read the binary file containing iteration ``iteration`` of the
field ``field_name``, and write it in a ``.h5`` file.
"""
data = np.memmap(
os.path.join(self.work_dir,
self.simname + '_{0}_i{1:0>5x}'.format('c' + field_name, iteration)),
dtype = self.ctype,
mode = 'r',
shape = (self.parameters['ny'],
self.parameters['nz'],
self.parameters['nx']//2+1,
3))
if type(file_name) == type(None):
file_name = self.simname + '_{0}_i{1:0>5x}.h5'.format('c' + field_name, iteration)
file_name = os.path.join(self.work_dir, file_name)
f = h5py.File(file_name, 'a')
f[field_name + '/complex/{0}'.format(iteration)] = data
f.close()
return None
def job_parser_arguments(
self,
parser):
parser.add_argument(
'--ncpu',
type = int,
dest = 'ncpu',
default = -1)
parser.add_argument(
'--np', '--nprocesses',
metavar = 'NPROCESSES',
help = 'number of mpi processes to use',
type = int,
dest = 'nb_processes',
default = 4)
parser.add_argument(
'--ntpp', '--nthreads-per-process',
type = int,
dest = 'nb_threads_per_process',
metavar = 'NTHREADS_PER_PROCESS',
help = 'number of threads to use per MPI process',
default = 1)
parser.add_argument(
'--no-submit',
action = 'store_true',
dest = 'no_submit')
parser.add_argument(
'--environment',
type = str,
dest = 'environment',
default = None)
parser.add_argument(
'--minutes',
type = int,
dest = 'minutes',
default = 5,
help = 'If environment supports it, this is the requested wall-clock-limit.')
parser.add_argument(
'--njobs',
type = int, dest = 'njobs',
default = 1)
return None
def simulation_parser_arguments(
self,
parser):
parser.add_argument(
'--simname',
type = str, dest = 'simname',
default = 'test')
parser.add_argument(
'--wd',
type = str, dest = 'work_dir',
default = './')
parser.add_argument(
'--precision',
choices = ['single', 'double'],
type = str,
default = 'single')
parser.add_argument(
'--iter0',
type = int,
dest = 'iter0',
default = 0)
parser.add_argument(
'--iter1',
type = int,
dest = 'iter1',
default = 0)
return None
def particle_parser_arguments(
self,
parser):
parser.add_argument(
'--particle-rand-seed',
type = int,
dest = 'particle_rand_seed',
default = None)
parser.add_argument(
'--pclouds',
type = int,
dest = 'pclouds',
default = 1,
help = ('number of particle clouds. Particle "clouds" '
'consist of particles distributed according to '
'pcloud-type.'))
parser.add_argument(
'--pcloud-type',
choices = ['random-cube',
'regular-cube'],
dest = 'pcloud_type',
default = 'random-cube')
parser.add_argument(
'--particle-cloud-size',
type = float,
dest = 'particle_cloud_size',
default = 2*np.pi)
return None
def add_parser_arguments(
self,
parser):
subparsers = parser.add_subparsers(
dest = 'DNS_class',
help = 'type of simulation to run')
subparsers.required = True
parser_native_binary_to_hdf5 = subparsers.add_parser(
'native_binary_to_hdf5',
help = 'convert native binary to hdf5')
self.simulation_parser_arguments(parser_native_binary_to_hdf5)
self.job_parser_arguments(parser_native_binary_to_hdf5)
self.parameters_to_parser_arguments(parser_native_binary_to_hdf5)
parser_get_rfields = subparsers.add_parser(
'get_rfields',
help = 'get real space velocity field')
self.simulation_parser_arguments(parser_get_rfields)
self.job_parser_arguments(parser_get_rfields)
self.parameters_to_parser_arguments(parser_get_rfields)
parser_joint_acc_vel_stats = subparsers.add_parser(
'joint_acc_vel_stats',
help = 'get joint acceleration and velocity statistics')
self.simulation_parser_arguments(parser_joint_acc_vel_stats)
self.job_parser_arguments(parser_joint_acc_vel_stats)
self.parameters_to_parser_arguments(parser_joint_acc_vel_stats)
self.parameters_to_parser_arguments(
parser_joint_acc_vel_stats,
parameters = self.extra_postprocessing_parameters('joint_acc_vel_stats'))
return None
def prepare_launch(
self,
args = []):
"""Set up reasonable parameters.
With the default Lundgren forcing applied in the band [2, 4],
we can estimate the dissipation, therefore we can estimate
:math:`k_M \\eta_K` and constrain the viscosity.
In brief, the command line parameter :math:`k_M \\eta_K` is
used in the following formula for :math:`\\nu` (:math:`N` is the
number of real space grid points per coordinate):
.. math::
\\nu = \\left(\\frac{2 k_M \\eta_K}{N} \\right)^{4/3}
With this choice, the average dissipation :math:`\\varepsilon`
will be close to 0.4, and the integral scale velocity will be
close to 0.77, yielding the approximate value for the Taylor
microscale and corresponding Reynolds number:
.. math::
\\lambda \\approx 4.75\\left(\\frac{2 k_M \\eta_K}{N} \\right)^{4/6}, \\hskip .5in
R_\\lambda \\approx 3.7 \\left(\\frac{N}{2 k_M \\eta_K} \\right)^{4/6}
"""
opt = _code.prepare_launch(self, args = args)
self.set_precision(opt.precision)
self.dns_type = opt.DNS_class
self.name = self.dns_type + '-' + self.fluid_precision + '-v' + bfps.__version__
# merge parameters if needed
for k in self.pp_parameters.keys():
self.parameters[k] = self.pp_parameters[k]
self.pars_from_namespace(opt)
niter_out = self.get_data_file()['parameters/niter_out'].value
assert(opt.iter0 % niter_out == 0)
self.pp_parameters['iteration_list'] = np.arange(
opt.iter0, opt.iter1+niter_out, niter_out, dtype = np.int)
return opt
def launch(
self,
args = [],
**kwargs):
opt = self.prepare_launch(args = args)
self.launch_jobs(opt = opt, **kwargs)
return None
def get_checkpoint_0_fname(self):
return os.path.join(
self.work_dir,
self.simname + '_checkpoint_0.h5')
def generate_tracer_state(
self,
rseed = None,
species = 0):
with h5py.File(self.get_checkpoint_0_fname(), 'a') as data_file:
dset = data_file[
'tracers{0}/state/0'.format(species)]
if not type(rseed) == type(None):
np.random.seed(rseed)
nn = self.parameters['nparticles']
cc = int(0)
batch_size = int(1e6)
while nn > 0:
if nn > batch_size:
dset[cc*batch_size:(cc+1)*batch_size] = np.random.random(
(batch_size, 3))*2*np.pi
nn -= batch_size
else:
dset[cc*batch_size:cc*batch_size+nn] = np.random.random(
(nn, 3))*2*np.pi
nn = 0
cc += 1
return None
def generate_vector_field(
self,
rseed = 7547,
spectra_slope = 1.,
amplitude = 1.,
iteration = 0,
field_name = 'vorticity',
write_to_file = False,
# to switch to constant field, use generate_data_3D_uniform
# for scalar_generator
scalar_generator = tools.generate_data_3D):
"""generate vector field.
The generated field is not divergence free, but it has the proper
shape.
:param rseed: seed for random number generator
:param spectra_slope: spectrum of field will look like k^(-p)
:param amplitude: all amplitudes are multiplied with this value
:param iteration: the field is written at this iteration
:param field_name: the name of the field being generated
:param write_to_file: should we write the field to file?
:param scalar_generator: which function to use for generating the
individual components.
Possible values: bfps.tools.generate_data_3D,
bfps.tools.generate_data_3D_uniform
:type rseed: int
:type spectra_slope: float
:type amplitude: float
:type iteration: int
:type field_name: str
:type write_to_file: bool
:type scalar_generator: function
:returns: ``Kdata``, a complex valued 4D ``numpy.array`` that uses the
transposed FFTW layout.
Kdata[ky, kz, kx, i] is the amplitude of mode (kx, ky, kz) for
the i-th component of the field.
(i.e. x is the fastest index and z the slowest index in the
real-space representation).
"""
np.random.seed(rseed)
Kdata00 = scalar_generator(
self.parameters['nz']//2,
self.parameters['ny']//2,
self.parameters['nx']//2,
p = spectra_slope,
amplitude = amplitude).astype(self.ctype)
Kdata01 = scalar_generator(
self.parameters['nz']//2,
self.parameters['ny']//2,
self.parameters['nx']//2,
p = spectra_slope,
amplitude = amplitude).astype(self.ctype)
Kdata02 = scalar_generator(
self.parameters['nz']//2,
self.parameters['ny']//2,
self.parameters['nx']//2,
p = spectra_slope,
amplitude = amplitude).astype(self.ctype)
Kdata0 = np.zeros(
Kdata00.shape + (3,),
Kdata00.dtype)
Kdata0[..., 0] = Kdata00
Kdata0[..., 1] = Kdata01
Kdata0[..., 2] = Kdata02
Kdata1 = tools.padd_with_zeros(
Kdata0,
self.parameters['nz'],
self.parameters['ny'],
self.parameters['nx'])
if write_to_file:
Kdata1.tofile(
os.path.join(self.work_dir,
self.simname + "_c{0}_i{1:0>5x}".format(field_name, iteration)))
return Kdata1
def copy_complex_field(
self,
src_file_name,
src_dset_name,
dst_file,
dst_dset_name,
make_link = True):
# I define a min_shape thingie, but for now I only trust this method for
# the case of increasing/decreasing by the same factor in all directions.
# in principle we could write something more generic, but i'm not sure
# how complicated that would be
dst_shape = (self.parameters['nz'],
self.parameters['ny'],
(self.parameters['nx']+2) // 2,
3)
src_file = h5py.File(src_file_name, 'r')
if (src_file[src_dset_name].shape == dst_shape):
if make_link and (src_file[src_dset_name].dtype == self.ctype):
dst_file[dst_dset_name] = h5py.ExternalLink(
src_file_name,
src_dset_name)
else:
dst_file.create_dataset(
dst_dset_name,
shape = dst_shape,
dtype = self.ctype,
fillvalue = 0.0)
for kz in range(src_file[src_dset_name].shape[0]):
dst_file[dst_dset_name][kz] = src_file[src_dset_name][kz]
else:
print('aloha')
min_shape = (min(dst_shape[0], src_file[src_dset_name].shape[0]),
min(dst_shape[1], src_file[src_dset_name].shape[1]),
min(dst_shape[2], src_file[src_dset_name].shape[2]),
3)
print(self.ctype)
dst_file.create_dataset(
dst_dset_name,
shape = dst_shape,
dtype = np.dtype(self.ctype),
fillvalue = complex(0))
for kz in range(min_shape[0]):
dst_file[dst_dset_name][kz,:min_shape[1], :min_shape[2]] = \
src_file[src_dset_name][kz, :min_shape[1], :min_shape[2]]
return None
def prepare_post_file(
self,
opt = None):
self.pp_parameters.update(
self.extra_postprocessing_parameters(self.dns_type))
self.pars_from_namespace(
opt,
parameters = self.pp_parameters,
get_sim_info = False)
for kk in ['nx', 'ny', 'nz']:
self.parameters[kk] = self.get_data_file()['parameters/' + kk].value
n = self.parameters['nx']
if self.dns_type in ['filtered_slices',
'filtered_acceleration']:
if type(opt.klist_kmax) == type(None):
opt.klist_kmax = n / 3.
if type(opt.klist_kmin) == type(None):
opt.klist_kmin = 6.
kvals = bfps_addons.tools.power_space_array(
power = opt.klist_power,
size = opt.klist_size,
vmin = opt.klist_kmin,
vmax = opt.klist_kmax)
if opt.test_klist:
for i in range(opt.klist_size):
print('kcut{0} = {1}, ell{0} = {2:.3e}'.format(
i, kvals[i], 2*np.pi / kvals[i]))
opt.no_submit = True
self.pp_parameters['kcut'] = kvals
self.rewrite_par(
group = self.dns_type + '/parameters',
parameters = self.pp_parameters,
file_name = os.path.join(self.work_dir, self.simname + '_post.h5'))
if 'histogram_bins' in opt.__dict__.keys():
histogram_bins = opt.histogram_bins
if (type(histogram_bins) == type(None) and
'histogram_bins' in self.pp_parameters.keys()):
histogram_bins = self.pp_parameters['histogram_bins']
with h5py.File(os.path.join(self.work_dir, self.simname + '_post.h5'), 'r+') as ofile:
group = ofile[self.dns_type]
group.require_group('histograms')
group.require_group('moments')
group.require_group('spectra')
vec_spectra_stats = []
vec4_rspace_stats = []
scal_rspace_stats = []
if self.dns_type == 'joint_acc_vel_stats':
vec_spectra_stats.append('velocity')
vec4_rspace_stats.append('velocity')
vec_spectra_stats.append('acceleration')
vec4_rspace_stats.append('acceleration')
for quantity in scal_rspace_stats:
if quantity not in group['histograms'].keys():
time_chunk = 2**20 // (8*histogram_bins)
time_chunk = max(time_chunk, 1)
group['histograms'].create_dataset(
quantity,
(1, histogram_bins),
chunks = (time_chunk, histogram_bins),
maxshape = (None, histogram_bins),
dtype = np.int64)
else:
assert(histogram_bins ==
group['histograms/' + quantity].shape[1])
if quantity not in group['moments'].keys():
time_chunk = 2**20 // (8*10)
time_chunk = max(time_chunk, 1)
group['moments'].create_dataset(
quantity,
(1, 10),
chunks = (time_chunk, 10),
maxshape = (None, 10),
dtype = np.float64)
if self.dns_type == 'joint_acc_vel_stats':
quantity = 'acceleration_and_velocity_components'
if quantity not in group['histograms'].keys():
time_chunk = 2**20 // (8*9*histogram_bins**2)
time_chunk = max(time_chunk, 1)
group['histograms'].create_dataset(
quantity,
(1, histogram_bins, histogram_bins, 3, 3),
chunks = (time_chunk, histogram_bins, histogram_bins, 3, 3),
maxshape = (None, histogram_bins, histogram_bins, 3, 3),
dtype = np.int64)
quantity = 'acceleration_and_velocity_magnitudes'
if quantity not in group['histograms'].keys():
time_chunk = 2**20 // (8*histogram_bins**2)
time_chunk = max(time_chunk, 1)
group['histograms'].create_dataset(
quantity,
(1, histogram_bins, histogram_bins),
chunks = (time_chunk, histogram_bins, histogram_bins),
maxshape = (None, histogram_bins, histogram_bins),
dtype = np.int64)
ncomps = 4
for quantity in vec4_rspace_stats:
if quantity not in group['histograms'].keys():
time_chunk = 2**20 // (8*histogram_bins*ncomps)
time_chunk = max(time_chunk, 1)
group['histograms'].create_dataset(
quantity,
(1, histogram_bins, ncomps),
chunks = (time_chunk, histogram_bins, ncomps),
maxshape = (None, histogram_bins, ncomps),
dtype = np.int64)
if quantity not in group['moments'].keys():
time_chunk = 2**20 // (8*10*ncomps)
time_chunk = max(time_chunk, 1)
group['moments'].create_dataset(
quantity,
(1, 10, ncomps),
chunks = (time_chunk, 10, ncomps),
maxshape = (None, 10, ncomps),
dtype = np.float64)
time_chunk = 2**20 // (
4*3*
self.parameters['nx']*self.parameters['ny'])
time_chunk = max(time_chunk, 1)
for quantity in vec_spectra_stats:
df = self.get_data_file()
if quantity + '_' + quantity not in group['spectra'].keys():
spec_chunks = df['statistics/spectra/velocity_velocity'].chunks
spec_shape = df['statistics/spectra/velocity_velocity'].shape
spec_maxshape = df['statistics/spectra/velocity_velocity'].maxshape
group['spectra'].create_dataset(
quantity + '_' + quantity,
spec_shape,
chunks = spec_chunks,
maxshape = spec_maxshape,
dtype = np.float64)
df.close()
return None
def prepare_field_file(self, iter0 = 0):
df = self.get_data_file()
if 'field_dtype' in df.keys():
# we don't need to do anything, raw binary files are used
return None
last_iteration = df['iteration'].value
cppf = df['parameters/checkpoints_per_file'].value
niter_out = df['parameters/niter_out'].value
with h5py.File(os.path.join(self.work_dir, self.simname + '_fields.h5'), 'a') as ff:
ff.require_group('vorticity')
ff.require_group('vorticity/complex')
checkpoint = (iter0 // niter_out) // cppf
while True:
cpf_name = os.path.join(
self.work_dir,
self.simname + '_checkpoint_{0}.h5'.format(checkpoint))
if os.path.exists(cpf_name):
cpf = h5py.File(cpf_name, 'r')
for iter_name in cpf['vorticity/complex'].keys():
if iter_name not in ff['vorticity/complex'].keys():
ff['vorticity/complex/' + iter_name] = h5py.ExternalLink(
cpf_name,
'vorticity/complex/' + iter_name)
checkpoint += 1
else:
break
return None
def launch_jobs(
self,
opt = None,
particle_initial_condition = None):
self.prepare_post_file(opt)
self.prepare_field_file(iter0 = opt.iter0)
self.run(
nb_processes = opt.nb_processes,
nb_threads_per_process = opt.nb_threads_per_process,
njobs = opt.njobs,
hours = opt.minutes // 60,
minutes = opt.minutes % 60,
no_submit = opt.no_submit,
err_file = 'err_' + self.dns_type,
out_file = 'out_' + self.dns_type)
return None