Commit f33aa43b authored by Simeon Doetsch's avatar Simeon Doetsch

Refactored most of the package

parent 6b30d360
import multiprocessing
import os
import subprocess
import numpy as np
import matplotlib.pyplot as plt
# local imports
from .plutodata import PlutoData
from .simulation import Simulation
def parameter_generator(sim: Simulation, plot_func, output_path,
plot_args: dict={'vmin': None, 'vmax': None},
save_args: dict={'bbox_inches': 'tight'}):
for i in range(sim.n):
yield (sim, i, plot_func, output_path, plot_args, save_args)
def generate_frame(sim, i, plot_func, output_path, plot_args, save_args):
fig = plot_func(sim[i], **plot_args)
fig.savefig(f"{output_path}{i:04d}.png", **save_args)
plt.close(fig)
del sim[i]
def render_frames_parallel(sim: Simulation, plot_func, output_path: str='',
plot_args: dict={'vmin': None, 'vmax': None},
save_args: dict={'bbox_inches': 'tight'}):
with multiprocessing.Pool() as p:
p.starmap(generate_frame, parameter_generator(sim, plot_func, output_path,
plot_args, save_args))
def generate_animation(sim: Simulation, plot_func, output_name: str='animation.mp4',
framerate: int=25,
plot_args: dict={'vmin': None, 'vmax': None},
save_args: dict={'bbox_inches': 'tight'}):
os.mkdir('tmp')
render_frames_parallel(sim, plot_func, 'tmp/', plot_args, save_args)
subprocess.run(['ffmpeg', '-framerate', f'{framerate:d}', '-i', 'tmp/%04d.png', output_name])
subprocess.run(['rm', '-r', 'tmp'])
import numpy as np
def generate_coord_mapping(coordinates: str) -> dict:
mappings = {
'cartesian': {
'x': 'x1',
'y': 'x2',
'z': 'x3',
},
'polar': {
'r': 'x1',
'z': 'x2'
},
'cylindrical': {
'r': 'x1',
'phi': 'x2',
'z': 'x3'
},
'spherical': {
'r': 'x1',
'theta': 'x2',
'phi': 'x3',
}
}
if coordinates not in mappings:
raise NotImplementedError(f'Coordinate system {coordinates} not implemented')
mapping = mappings[coordinates]
velocities = {}
for key, value in mapping.items():
velocities[f"v{key}"] = f"v{value}"
mapping.update(velocities)
return mapping
def generate_tex_mapping(coordinates: str) -> dict:
mappings = {
'cartesian': {
'x1': 'x',
'x2': 'y',
'x3': 'z'
},
'polar': {
'x1': 'r',
'x2': 'z'
},
'cylindrical': {
'x1': 'r',
'x2': r'\phi',
'x3': 'z'
},
'spherical': {
'x1': 'r',
'x2': r'\theta',
'x3': r'\phi'
}
}
if coordinates not in mappings:
raise NotImplementedError(f'Tex mappings for {coordinates} not implemented')
mapping = mappings[coordinates]
velocities = {}
for key, value in mapping:
velocities[f"v{key}"] = f"v_{value}"
mapping.update(velocities)
mapping['rho'] = r'\rho'
mapping['prs'] = 'p'
return mapping
def generate_coordinate_mesh(coordinates, x1, x2):
if coordinates == 'cartesian':
return x1, x2
elif coordinates == 'spherical':
r, theta = np.meshgrid(x1, x2)
x = r * np.sin(theta)
y = r * np.cos(theta)
return x, y
elif coordinates == 'cylindrical':
r, phi = np.meshgrid(x1, x2)
x = r * np.cos(phi)
y = r * np.sin(phi)
return x, y
\ No newline at end of file
import numpy as np
from collections import OrderedDict
from itertools import zip_longest
from .coordinates import generate_coordinate_mesh
class Grid:
"""
dims: dimensions
shape: shape of arrays
size: total cells
"""
def __init__(self, gridfile):
self.coordinates = None
self.mappings = {}
self.mappings_tex = {}
self.read_gridfile(gridfile)
def set_coordinate_system(self, coordinates, mappings={}, mappings_tex={}):
self.coordinates = coordinates
self.mappings = mappings
self.mappings_tex = mappings_tex
def read_gridfile(self, gridfile_path) -> None:
x = []
dims = []
with open(gridfile_path, 'r') as gf:
# read all dimensions
while True:
# read line by line, stop if EOF
line = gf.readline()
if not line:
break
# ignore comments
if line[0] == '#':
continue
# find line with resolution in dimension
splitted = line.split()
if len(splitted) == 1:
dim = int(splitted[0])
dims.append(dim)
# read all data from dimension, moves file pointer
data = np.fromfile(gf, sep=' ', count=dim*3).reshape(-1, 3)
# calculate center of cell, and difference between cells
x.append((np.sum(data[:, 1:], axis=1)/2, data[:, 2] - data[:, 1]))
# save in grid datastructure
for i, xn in enumerate(x, start=1):
setattr(self, f"x{i}", xn[0])
setattr(self, f"dx{i}", xn[1])
self.dims = tuple(dims)
shape = []
if self.dims[0] > 1:
shape.append(self.dims[0])
if self.dims[1] > 1:
shape.append(self.dims[1])
if self.dims[2] > 1:
shape.append(self.dims[2])
self.data_shape = tuple(shape)
self.size = np.product(self.dims)
def mesh(self):
return generate_coordinate_mesh(self.coordinates, self.x1, self.x2)
def __getattr__(self, name):
try:
return object.__getattribute__(self, self.mappings[name])
except KeyError:
raise AttributeError
def __str__(self):
return f"PLUTO Grid, Dimensions {self.dims}"
__repr__ = __str__
class SimulationMetadata:
def __init__(self, path, format) -> None:
self.read_vars(path, format)
def read_vars(self, path, format) -> None:
"""Read simulation step data and written variables"""
with open(path, 'r') as f:
lines = f.readlines()
self.n = len(lines)
# prepare arrays
self.t = np.empty(self.n, float)
self.dt = np.empty(self.n, float)
self.nstep = np.empty(self.n, int)
# information for all steps the same
file_mode, endianness, *self.vars = lines[0].split()[4:]
for i, line in enumerate(lines):
self.t[i], self.dt[i], self.nstep[i] = line.split()[1:4]
if file_mode == 'single_file':
self.file_mode = 'single'
elif file_mode == 'multiple_files':
self.file_mode = 'multiple'
self.charsize = 8 if format == 'dbl' else 4
endianness = '<' if endianness == 'little' else '>'
self.binformat = f"{endianness}f{self.charsize}"
class Pluto_ini(OrderedDict):
"""Parser for Plutocode initialization file pluto.ini"""
......@@ -114,12 +216,12 @@ class Definitions_h(OrderedDict):
# self.userdef = OrderedDict()
## TODO
def _parse_def(self, txt):
if not line:
continue
segments = line.split()
if segments[0] == "#define":
return segments[1].lower(), segments[2].lower()
# def _parse_def(self, txt):
# if not line:
# continue
# segments = line.split()
# if segments[0] == "#define":
# return segments[1].lower(), segments[2].lower()
def parse(self, txt: str=None) -> None:
if txt is None:
......
import multiprocessing
import os
import subprocess
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.ticker import ScalarFormatter
from mpl_toolkits.axes_grid1 import make_axes_locatable
# local imports
from .plutodata import PlutoData
from .simulation import Simulation
def parameter_generator(sim: Simulation, plot_func, output_path,
plot_args: dict={'vmin': None, 'vmax': None},
save_args: dict={'bbox_inches': 'tight'}):
for i in range(sim.n):
yield (sim, i, plot_func, output_path, plot_args, save_args)
from .io import Grid
def generate_frame(sim, i, plot_func, output_path, plot_args, save_args):
fig = plot_func(sim[i], **plot_args)
fig.savefig(f"{output_path}{i:04d}.png", **save_args)
plt.close(fig)
del sim[i]
def plot(data: np.ndarray, grid: Grid, ax=None, label: str=None, figsize=None,
cbar=True, vmin=None, vmax=None, cmap=None, projection: bool=True) -> None:
"""Simple colorplot for 2-dim data"""
def render_frames_parallel(sim: Simulation, plot_func, output_path: str='',
plot_args: dict={'vmin': None, 'vmax': None},
save_args: dict={'bbox_inches': 'tight'}):
with multiprocessing.Pool() as p:
p.starmap(generate_frame, parameter_generator(sim, plot_func, output_path,
plot_args, save_args))
if ax is None:
if figsize is None:
x_size = 6.4
y_size = x_size * grid.dims[1] / grid.dims[0] / 1.1
figsize = (x_size, y_size)
fig, ax = plt.subplots(figsize=figsize)
if projection:
X, Y = grid.mesh()
ax.set_xlabel('$x$')
ax.set_ylabel('$y$')
else:
X, Y = grid.x1, grid.x2
ax.set_xlabel(f"${grid.mappings_tex['x1']}$")
ax.set_ylabel(f"${grid.mappings_tex['x2']}$")
def generate_animation(sim: Simulation, plot_func, output_name: str='animation.mp4',
framerate: int=25,
plot_args: dict={'vmin': None, 'vmax': None},
save_args: dict={'bbox_inches': 'tight'}):
os.mkdir('tmp')
render_frames_parallel(sim, plot_func, 'tmp/', plot_args, save_args)
subprocess.run(['ffmpeg', '-framerate', f'{framerate:d}', '-i', 'tmp/%04d.png', output_name])
subprocess.run(['rm', '-r', 'tmp'])
im = ax.pcolormesh(X, Y, data.T, vmin=vmin, vmax=vmax, cmap=cmap)
ax.set_aspect(1)
if cbar:
formatter = ScalarFormatter()
formatter.set_powerlimits((-2,2))
divider = make_axes_locatable(ax)
cax = divider.append_axes("right", size="10%", pad=0.05)
plt.colorbar(im, label=label, format=formatter, cax=cax)
return fig, ax
import os
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.ticker import ScalarFormatter
from mpl_toolkits.axes_grid1 import make_axes_locatable
from .io import Grid
from .plotting import plot
class PlutoData(object):
_coordinate_systems = {'cartesian': ['x', 'y', 'z'],
'cylindrical': ['R', 'z'],
'polar': ['r', 'phi', 'z'],
'spherical': ['r', 'theta', 'phi']}
def __init__(self, n: int=-1, wdir: str="", format: str='dbl', coordinates: str='cartesian',
variables: dict=None, grid: dict=None, parent=None):
def __init__(self, n: int=-1, simulation=None):
"""
Read PLUTO output file
n: output step number. Default: -1, uses last picture
......@@ -19,144 +14,63 @@ class PlutoData(object):
coordinates: 'cartesian', 'cylindrical', 'polar', or 'spherical', only for names
vars [dict]: variables for manual construction
parent [Simulation]: parent Simulation, missing attributes are tried to fetch from there
simulation [Simulation]: parent Simulation, missing attributes are tried to fetch from there
"""
# construct object from simulation simulation
self.simulation = simulation
self.format = simulation.format
self.binformat = simulation.metadata.binformat
self.file_mode = simulation.metadata.file_mode
self.wdir, self.grid, self.vars = simulation.data_dir, simulation.grid, simulation.vars
self.n, self.t, self.dt, self.nstep = n, simulation.t[n], simulation.dt[n], simulation.nstep[n]
self.data = {}
self.format = format
# Read all information in if object is not an child to Simulation
if parent is None:
self.wdir = wdir
# read info about data file
self._read_vars(n)
try:
self.coordinate_system = coordinates
self.coord_names = self._coordinate_systems[coordinates]
except KeyError:
raise KeyError('Coordinate system not recognized')
# read grid data
self._read_grid()
else:
# construct object from parent simulation
self.parent = parent
self.wdir, self.grid, self.vars = parent.wdir, parent.grid, parent.vars
self.n, self.t, self.dt, self.nstep = n, parent.t[n], parent.dt[n], parent.nstep[n]
def __getattr__(self, name):
"""Get grid/data attributes from corresponding dict, or load it"""
# normal attributes
# grid
getattribute = object.__getattribute__
grid = getattribute(self, 'grid')
try:
return object.__getattribute__(self, 'grid')[name]
except KeyError:
return getattr(grid, name)
except AttributeError:
pass
# data
data = getattribute(self, 'data')
try:
return self.data[name]
return data[name]
except KeyError:
if name in self.vars:
self._load_var(name)
return self.data[name]
if name in getattribute(self, 'vars'):
getattribute(self, '_load_var')(name)
return data[name]
# parent
# simulation
try:
return getattr(self.parent, name)
return getattr(getattribute(self, 'simulation'), name)
except AttributeError:
pass
raise AttributeError(f"{type(self)} has no attribute '{name}'")
def _read_vars(self, n: int=-1) -> None:
"""Read simulation step data and written variables"""
with open(os.path.join(self.wdir, f'{self.format}.out'), 'r') as f:
lines = f.readlines()
# find last saved step
if n == -1:
n = int(lines[-1].split()[0])
# save/tranform into wanted variables
n, t, dt, nstep, file_mode, endianness, *self.vars = lines[n].split()
self.n, self.t, self.dt, self.nstep = int(n), float(t), float(dt), int(nstep)
if file_mode == 'single_file':
self._file_mode = 'single'
elif file_mode == 'multiple_files':
self._file_mode = 'multiple'
# format of binary files
if self.format in ['dbl', 'flt']:
self.charsize = 8 if self.format == 'dbl' else 4
endianness = '<' if endianness == 'little' else '>'
self._binformat = f"{endianness}f{self.charsize}"
def _read_grid(self) -> None:
"""
Read PLUTO gridfile and calculate center of cells
"""
x = []
self.dims = []
with open(os.path.join(self.wdir, 'grid.out'), 'r') as gf:
# read all dimensions
while True:
# read line by line, stop if EOF
line = gf.readline()
if not line:
break
# ignore comments
if line[0] == '#':
continue
# find line with resolution in dimension
splitted = line.split()
if len(splitted) == 1:
dim = int(splitted[0])
self.dims.append(dim)
# read all data from dimension, moves file pointer
data = np.fromfile(gf, sep=' ', count=dim*3).reshape(-1, 3)
# calculate center of cell, and difference between cells
x.append((np.sum(data[:, 1:], axis=1)/2, data[:, 2] - data[:, 1]))
# save in grid datastructure
self.grid = {}
for i, (xn, coord_name) in enumerate(zip(x, self.coord_names), start=1):
self.grid[f"x{i}"] = xn[0]
self.grid[f"dx{i}"] = xn[1]
self.grid[coord_name] = self.grid[f"x{i}"]
self.grid[f"d{coord_name}"] = self.grid[f"dx{i}"]
# find shape of data
self.shape = []
if self.dims[0] > 1:
self.shape.append(self.dims[0])
if self.dims[1] > 1:
self.shape.append(self.dims[1])
if self.dims[2] > 1:
self.shape.append(self.dims[2])
self.size = 1
for dim in self.dims: self.size *= dim
def _load_var(self, var):
"""Load data for var into memory. Read either var dbl file (multiple_files mode),
or, slice data from single dbl file"""
if self._file_mode == 'single':
if self.file_mode == 'single':
filename = f"data.{self.n:04d}.{self.format}"
# byte offset of variable in dbl file
offset = self.charsize * self.size * self.vars.index(var)
elif self._file_mode == 'multiple':
elif self.file_mode == 'multiple':
filename = f"{var}.{self.n:04d}.{self.format}"
offset = 0
with open(os.path.join(self.wdir, filename), 'rb') as f:
f.seek(offset)
shape = tuple(reversed(self.shape))
self.data[var] = np.fromfile(f, dtype=self._binformat, count=self.size).reshape(shape).T
shape = tuple(reversed(self.data_shape))
self.data[var] = np.fromfile(f, dtype=self.binformat, count=self.size).reshape(shape).T
def __getitem__(self, var: str) -> np.ndarray:
return self.data[var]
def _latex(self, coord: str, tags: bool=True) -> str:
if coord is None:
......@@ -176,9 +90,8 @@ class PlutoData(object):
return latex_map[coord]
except KeyError:
return coord
def plot(self, var=None, ax=None, figsize=None, cbar=True, vmin=None, vmax=None, cmap=None) -> None:
"""Simple colorplot for 2-dim data"""
def plot(self, var=None, **kwargs):
if var is None:
var = self.vars[0]
if isinstance(var, str):
......@@ -186,36 +99,8 @@ class PlutoData(object):
var = getattr(self, var)
else:
varname = None
if ax is None:
if figsize is None:
x_size = 6.4
y_size = x_size * self.dims[1] / self.dims[0] / 1.1
figsize = (x_size, y_size)
self.fig, self.ax = plt.subplots(figsize=figsize)
ax = self.ax
if self.coordinate_system == 'spherical':
R, THETA = np.meshgrid(self.r, self.theta)
X = R * np.sin(THETA)
Y = R * np.cos(THETA)
ax.set_xlabel(self._latex('$x$'))
ax.set_ylabel(self._latex('$z$'))
else:
X, Y = self.x1, self.x2
ax.set_xlabel(self._latex(self.coord_names[0]))
ax.set_ylabel(self._latex(self.coord_names[1]))
im = ax.pcolormesh(X, Y, var.T, vmin=vmin, vmax=vmax, cmap=cmap)
ax.set_aspect(1)
if cbar:
formatter = ScalarFormatter()
formatter.set_powerlimits((-2,2))
divider = make_axes_locatable(ax)
cax = divider.append_axes("right", size="10%", pad=0.05)
plt.colorbar(im, label=self._latex(varname), format=formatter, cax=cax)
return plot(var, self.grid, label=f"${self.grid.mappings_tex.get(varname, varname)}$", **kwargs)
def __str__(self) -> None:
return f"""PlutoData, wdir: '{self.wdir}'
......
import os
from os.path import join
import multiprocessing
import warnings
from typing import Generator, Tuple
......@@ -6,8 +7,10 @@ import numpy as np
import matplotlib.pyplot as plt
# local imports
from .plutodata import PlutoData
from .io import Grid, Pluto_ini, Definitions_h, SimulationMetadata
from .coordinates import generate_coord_mapping, generate_tex_mapping
warnings.simplefilter("always")
# warnings.simplefilter("always")
class Simulation:
"""
......@@ -16,70 +19,93 @@ class Simulation:
loads individual files when needed.
Simulation is subscriptable and iterable.
"""
def __init__(self, wdir: str='', format='dbl', coordinates: str='cartesian'):
self.wdir = wdir
self.format = format
try:
self.read_vars()
except FileNotFoundError:
self.wdir = os.path.join(wdir, 'data')
self.read_vars()
supported_formats = ('dbl', 'flt')
def __init__(self, sim_dir: str='', format: str=None, coordinates: str=None):
self.sim_dir = sim_dir
if os.path.exists(join(sim_dir, 'grid.out')):
self.data_dir = sim_dir
elif os.path.exists(join(sim_dir, 'data', 'grid.out')):
self.data_dir = join(sim_dir, 'data')
elif os.path.exists(join(sim_dir, self.ini['Static Grid Output']['output_dir'], 'grid.out')):
self.data_dir = join(join(sim_dir, self.ini['Static Grid Output']['output_dir']))
else:
raise FileNotFoundError("Gridfile not found")
self.grid = Grid(join(self.data_dir, 'grid.out'))
if format is None:
for f in self.supported_formats:
if os.path.exists(join(self.data_dir, f'{f}.out')):
self.format = f
break
try:
self.format
except AttributeError:
raise FileNotFoundError(f'No Metadata file for formats {self.supported_formats} found in {self.data_dir}')
else:
if format not in self.supported_formats:
raise NotImplementedError(f"Format '{format}' not supported")
if os.path.exists(join(self.data_dir, f'{format}.out')):
self.format = format
else:
raise FileNotFoundError(f"Metadata file {join(self.data_dir, f'{format}.out')} for format {format} not found")
try:
self.coordinate_system = coordinates
self.coord_names = PlutoData.