Skip to content
Snippets Groups Projects
Commit 0ebeec98 authored by Lauri Himanen's avatar Lauri Himanen
Browse files

Added some convenience modules for reading trajectory files.

parent 60e87abe
No related branches found
No related tags found
No related merge requests found
......@@ -220,14 +220,13 @@ class FileService(object):
def get_absolute_path_to_file(self, relative_path):
    """Resolve a path relative to the service root folder.

    Args:
        relative_path: Path of the file relative to self.root_folder.

    Returns:
        If the given file is found, returns the absolute path to it. Return
        none if no file with the given path can be found.
    """
    path = os.path.join(self.root_folder, relative_path)
    if os.path.isfile(path):
        return path
    # Missing file is reported but not raised: callers are expected to
    # handle the None return value.
    logger.error("Could not open the file '{}'.".format(path))
    return None
def get_file_by_id(self, file_id):
......@@ -245,8 +244,9 @@ class FileService(object):
def set_file_id(self, path, file_id):
"""Used to map a simple identifier string to a file path. When a file
id has been setup, you can easily access the file by using the
functions get_file_handle() or get_file_contents()
id has been setup, you can easily access the filepath anywhere in the
code (FileService is shared via parser_context) by calling
get_file_by_id().
"""
if path is None:
return None
......
import ase.io
import ase.io.formats
import mdtraj as md
import mdtraj.formats
import numpy as np
import logging
logger = logging.getLogger("nomad")
#===============================================================================
def iread(filename, file_format=None):
    """Generator function that is used to read an atomic configuration file (MD
    trajectory, geometry optimization, static snapshot) from a file one frame
    at a time. Only the xyz positions are returned from the file, and no unit
    conversion is done, so you have to be careful with units.

    By using a generator pattern we can avoid loading the entire trajectory
    file into memory. This function will instead load a chunk of the file into
    memory (with MDTraj you can decide the chunk size, with ASE it seems to
    always be one frame), and serve individual frames from that chunk. Once
    the frames in one chunk are iterated, the chunk will be garbage collected
    and memory is freed.

    Args:
        filename: String for the file path.
        file_format: String for the file format. If not given the format is
            automatically detected from the extension.

    Yields:
        numpy array containing the atomic positions in one frame.
    """
    # If file format is not explicitly stated, determine the format from the
    # filename extension.
    if file_format is None:
        file_format = filename.split(".")[-1]

    # Try to open the file with MDTraj first. With a brief inspection it seems
    # that MDTraj is better performance wise, because it can iteratively load a
    # "chunk" of frames, and still serve the individual frames one by one. ASE
    # on the other hand will iteratively read frames one by one (unnecessary
    # IO).
    mdtraj_chunk = 100  # How many frames MDTraj will load at once

    # Must use the low level MDTraj API to open files without topology.
    class_format_map = {
        "dcd": mdtraj.formats.DCDTrajectoryFile,
        "xyz": mdtraj.formats.XYZTrajectoryFile,
        "pdb": mdtraj.formats.PDBTrajectoryFile,
    }
    traj_class = class_format_map.get(file_format)
    if traj_class is not None:
        try:
            with traj_class(filename, mode="r") as f:
                while True:
                    data = f.read(mdtraj_chunk)
                    # Some readers return a tuple (positions, ...), others
                    # just the positions array.
                    positions = data[0] if isinstance(data, tuple) else data
                    if len(positions) == 0:
                        break
                    for pos in positions:
                        yield pos
        except IOError:
            logger.warning("MDTraj could not read the file '{}' with format '{}'. The contents might be malformed or wrong format used.".format(filename, file_format))
        return

    # If MDTraj didn't support the format, try ASE instead.
    try:
        io = ase.io.formats.get_ioformat(file_format)
    except ValueError:
        # Fixed message: this branch means ASE (not MDTraj) has no reader
        # for the requested format.
        logger.error("ASE could not read the file '{}' with format '{}'. The format might be unsupported or the contents malformed.".format(filename, file_format))
        return
    else:
        # Return the positions in a numpy array instead of an ASE Atoms object
        generator = ase.io.iread(filename, format=file_format)
        for atoms in generator:
            yield atoms.positions
import numpy as np
import logging
import re
logger = logging.getLogger(__name__)
#===============================================================================
def iread(filepath, columns, delimiter=r"\s+", comments=r"#", start=None, end=None, n_conf=None):
    """Used to iterate a CSV-like file one configuration at a time.

    Only one configuration of the file is kept in memory at a time. Three
    modes are supported, selected by the start/end/n_conf arguments:

      * none of start/end/n_conf given: each data line is yielded as one
        configuration (a 1D numpy array).
      * both start and end given: the lines between a start match and an end
        match are collected and yielded as one 2D numpy array. The last
        configuration is yielded even if the end marker is missing at the
        end of the file.
      * only n_conf given: every n_conf data lines are grouped into one 2D
        numpy array. A trailing partial group (fewer than n_conf lines) is
        silently dropped.

    Args:
        filepath: Path to the CSV like file to be processed.
        columns: List of integers indicating the columns of interest in the CSV file.
        delimiter: Regex used to split a line into columns.
        comments: String or iterable of strings that are escaped and used to
            identify comments in the file that are ignored.
        start: A regex that is used to indicate the start of a new configuration.
        end: A regex that is used to indicate the end of a configuration.
        n_conf: Number of lines in a configuration. If you want to use multiple
            lines as a single configuration.

    Yields:
        numpy arrays containing the float values extracted from the requested
        columns.
    """
    # Precompile the different regexs before looping.
    compiled_delimiter = re.compile(delimiter)
    if comments:
        compiled_comments = re.compile('|'.join(re.escape(c) for c in comments))
    if end:
        compiled_end = re.compile(end)
    if start:
        compiled_start = re.compile(start)

    # Columns as list.
    if columns is not None:
        columns = list(columns)

    # Sentinel distinguishing "parse error, stop iterating" from "blank or
    # comment-only line, skip it" (None).
    parse_error = object()

    def split_line(line):
        """Chop off comments, strip, and split at delimiter."""
        if line.isspace():
            return None
        if comments:
            line = compiled_comments.split(line, maxsplit=1)[0]
        line = line.strip('\r\n ')
        if line:
            return compiled_delimiter.split(line)
        return None

    def is_end(line):
        """Check if the given line matches the end pattern."""
        if end:
            return compiled_end.search(line)
        return False

    def is_start(line):
        """Check if the given line matches the start pattern."""
        if start:
            return compiled_start.search(line)
        return False

    def parse_columns(line):
        """Extract the requested columns from one line as floats.

        Returns a list of floats, None for blank/comment-only lines, or the
        parse_error sentinel (after logging) when a column is missing or a
        value cannot be cast to float.
        """
        vals = split_line(line)
        if not vals:
            return None
        values = []
        for column in columns:
            try:
                value = vals[column]
            except IndexError:
                logger.warning("The given index '{}' could not be found on the line '{}'. The given delimiter or index could be wrong.".format(column, line))
                return parse_error
            try:
                values.append(float(value))
            except ValueError:
                logger.warning("Could not cast value '{}' to float. Currently only floating point values are accepted".format(value))
                return parse_error
        return values

    # If no starting and ending condition are provided, read configuration by
    # line.
    if start is None and end is None and n_conf is None:
        with open(filepath, "r") as f:
            # Reads line by line; only the current line is kept in memory.
            for line in f:
                values = parse_columns(line)
                if values is parse_error:
                    return
                if values is not None:
                    yield np.array(values)

    # If starting and ending condition are provided, after starting condition
    # is detected, add the values from lines to a new array that is yielded
    # when the end condition is met.
    elif start is not None and end is not None:
        configuration = []
        started = False
        with open(filepath, "r") as f:
            for line in f:
                if is_start(line):
                    started = True
                    continue
                if is_end(line):
                    started = False
                    if configuration:
                        yield np.array(configuration)
                        configuration = []
                elif started:
                    values = parse_columns(line)
                    if values is parse_error:
                        return
                    if values is not None:
                        configuration.append(values)
        # The last configuration is yielded even if the end marker is not
        # present at the end of the file.
        if configuration:
            yield np.array(configuration)

    # If n_conf is defined, read multiple lines as one configuration.
    elif start is None and end is None and n_conf is not None:
        conf = []
        i_line = 0
        with open(filepath, "r") as f:
            for line in f:
                values = parse_columns(line)
                if values is parse_error:
                    return
                if values is None:
                    continue
                conf.append(values)
                i_line += 1
                if i_line == n_conf:
                    # Yield a numpy array for consistency with the other
                    # modes (the original returned a plain list here).
                    yield np.array(conf)
                    conf = []
                    i_line = 0

    # Any other combination of the mode arguments is unsupported; previously
    # this silently yielded nothing.
    else:
        logger.error("Unsupported combination of 'start', 'end' and 'n_conf': provide both 'start' and 'end', or only 'n_conf', or none of them.")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment