Commit 4e3b1201 authored by Lauri Himanen's avatar Lauri Himanen

Added parsing of input file if found, added support for more functionals,...

Added parsing of input file if found, added support for more functionals, corrected the formatting of functionals, restructured the package a bit.
parent 75773e82
......@@ -70,12 +70,18 @@ easily readable formatting is also provided for the log messages.
## Testing
The parsers can become quite complicated and maintaining them without
systematic testing can become troublesome. Unit tests provide one way to
test each parseable quantity and python has a very good [library for
unit testing](https://docs.python.org/2/library/unittest.html). When the parser
supports a new quantity it is quite fast to create unit tests for it. These
tests will validate the parsing, and also easily detect bugs that may arise when
the code is modified in the future.
systematic testing is impossible. There are general tests that are
performed automatically in the scala layer for all parsers. This is essential,
but can only test that the data is outputted in the correct format and
according to some general rules. These tests cannot verify that the contents
are correct.
In order to truly test the parser output, unit testing is needed. Unit tests
provide one way to test each parseable quantity and python has a very good
[library for unit testing](https://docs.python.org/2/library/unittest.html).
When the parser supports a new quantity it is quite fast to create unit tests
for it. These tests will validate the parsing, and also easily detect bugs that
may arise when the code is modified in the future.
## Unit conversion
You can find unit conversion tools from the python-common repository and its
......
import cp2kparser.utils.logconfig
from cp2kparser.parsing.parser import CP2KParser
from cp2kparser.parser import CP2KParser
import re
import logging
from cp2kparser.utils.baseclasses import ParserInterface
from cp2kparser.parsing.versions.versionsetup import get_main_parser
logger = logging.getLogger(__name__)
#===============================================================================
class CP2KParser(ParserInterface):
    """This class handles the initial setup before any parsing can happen. It
    determines which version of CP2K was used to generate the output and then
    sets up a correct implementation.

    After the implementation has been setup, you can parse the files with
    parse().
    """
    def __init__(self, main_file, metainfo_to_keep=None, backend=None, default_units=None, metainfo_units=None):
        super(CP2KParser, self).__init__(main_file, metainfo_to_keep, backend, default_units, metainfo_units)

    def setup_version(self):
        """Setups the version by looking at the output file and the version
        specified in it.

        Raises:
            RuntimeError: If no version specification could be found within
                the first lines of the main output file.
        """
        # Search for the version specification and initialize a correct
        # implementation for this version. The version banner is printed
        # near the top of the output, so only the first lines are scanned.
        regex = re.compile(r" CP2K\| version string:\s+CP2K version ([\d\.]+)")
        n_lines = 30
        version_id = None
        with open(self.parser_context.main_file, 'r') as outputfile:
            # Iterating the file object instead of calling next() inside
            # xrange() avoids an unhandled StopIteration when the file has
            # fewer than n_lines lines, and works on both python 2 and 3.
            for i_line, line in enumerate(outputfile):
                if i_line >= n_lines:
                    break
                result = regex.match(line)
                if result:
                    version_id = result.group(1).replace('.', '')
                    break
        if version_id is None:
            # Previously this only logged the error and then crashed with a
            # NameError on the undefined version_id; fail explicitly instead.
            logger.error("Could not find a version specification from the given main file.")
            raise RuntimeError("Could not determine the CP2K version from the main output file.")
        self.parser_context.file_storage.setup_file_id(self.parser_context.main_file, "output")
        self.main_parser = get_main_parser(version_id)(self.parser_context.main_file, self.parser_context)

    def get_metainfo_filename(self):
        """Return the filename of the metainfo definitions used by CP2K."""
        return "cp2k.nomadmetainfo.json"

    def get_parser_info(self):
        """Return the name and version of this parser as a dictionary."""
        return {'name': 'cp2k-parser', 'version': '1.0'}
import re
from nomadcore.simple_parser import SimpleMatcher as SM
from nomadcore.caching_backend import CachingLevel
from cp2kparser.utils.baseclasses import MainParser
import numpy as np
import logging
logger = logging.getLogger("nomad.CP2KParser")
#===============================================================================
class CP2KMainParser(MainParser):
"""The main parser class.
"""
def __init__(self, file_path, parser_context):
"""Initialize an output parser.

Args:
    file_path: Path to the main CP2K output file to parse.
    parser_context: The shared parsing context object.
"""
super(CP2KMainParser, self).__init__(file_path, parser_context)
# Regex fragments reused throughout the matcher tree below.
self.regex_f = "-?\d+\.\d+(?:E(?:\+|-)\d+)?" # Regex for a floating point value
self.regex_i = "-?\d+" # Regex for an integer
# Define the output parsing tree for this version
# The SimpleMatcher (SM) tree mirrors the order in which CP2K prints its
# output; the order of the subMatchers is therefore significant.
self.root_matcher = SM("",
forwardMatch=True,
sections=['section_run', "section_system_description"],
subMatchers=[
# Header: DBCSR library information.
SM( r" DBCSR\| Multiplication driver",
sections=['cp2k_section_dbcsr'],
),
# Run start date and time from the program banner.
SM( r" \*\*\*\* \*\*\*\* \*\*\*\*\*\* \*\* PROGRAM STARTED AT\s+(?P<cp2k_run_start_date>\d{4}-\d{2}-\d{2}) (?P<cp2k_run_start_time>\d{2}:\d{2}:\d{2}.\d{3})",
sections=['cp2k_section_startinformation'],
),
# CP2K program version string and svn revision.
SM( r" CP2K\|",
sections=['cp2k_section_programinformation'],
forwardMatch=True,
subMatchers=[
SM( r" CP2K\| version string:\s+(?P<program_version>[\w\d\W\s]+)"),
SM( r" CP2K\| source code revision number:\s+svn:(?P<cp2k_svn_revision>\d+)"),
]
),
# Names of the input, basis set, potential and coordinate files.
SM( r" CP2K\| Input file name\s+(?P<cp2k_input_filename>.+$)",
sections=['cp2k_section_filenames'],
subMatchers=[
SM( r" GLOBAL\| Basis set file name\s+(?P<cp2k_basis_set_filename>.+$)"),
SM( r" GLOBAL\| Geminal file name\s+(?P<cp2k_geminal_filename>.+$)"),
SM( r" GLOBAL\| Potential file name\s+(?P<cp2k_potential_filename>.+$)"),
SM( r" GLOBAL\| MM Potential file name\s+(?P<cp2k_mm_potential_filename>.+$)"),
SM( r" GLOBAL\| Coordinate file name\s+(?P<cp2k_coordinate_filename>.+$)"),
]
),
# Simulation cell vectors, read by an adHoc table reader.
SM( " CELL\|",
adHoc=self.adHoc_cp2k_section_cell(),
otherMetaInfo=["simulation_cell"]
),
# DFT settings: multiplicity, charge, SIC and the XC functionals.
SM( " DFT\|",
sections=["section_method"],
otherMetaInfo=["XC_functional", "self_interaction_correction_method"],
forwardMatch=True,
subMatchers=[
SM( " DFT\| Multiplicity\s+(?P<target_multiplicity>{})".format(self.regex_i)),
SM( " DFT\| Charge\s+(?P<total_charge>{})".format(self.regex_i)),
SM( " DFT\| Self-interaction correction \(SIC\)\s+(?P<self_interaction_correction_method>[^\n]+)"),
SM( " FUNCTIONAL\| ([\w\d\W\s]+):",
forwardMatch=True,
repeats=True,
sections=["section_XC_functionals"],
adHoc=self.adHoc_section_XC_functionals()
)
]
),
# Summary table with the system size numbers.
SM( " TOTAL NUMBERS AND MAXIMUM NUMBERS",
sections=["cp2k_section_total_numbers"],
subMatchers=[
SM( "\s+- Atoms:\s+(?P<number_of_atoms>\d+)"),
SM( "\s+- Shell sets:\s+(?P<cp2k_shell_sets>\d+)")
]
),
# Initial atomic labels and coordinates (adHoc table reader).
SM( " MODULE QUICKSTEP: ATOMIC COORDINATES IN angstrom",
adHoc=self.adHoc_cp2k_section_quickstep_atom_information(),
otherMetaInfo=["atom_label", "atom_position"]
),
# SCF loop: per-iteration energies, total energy and atomic forces.
SM( " SCF WAVEFUNCTION OPTIMIZATION",
sections=["section_single_configuration_calculation"],
subMatchers=[
SM( r"\s+\d+\s+\S+\s+{0}\s+{0}\s+{0}\s+(?P<energy_total_scf_iteration__hartree>{0})\s+{0}".format(self.regex_f),
sections=["section_scf_iteration"],
repeats=True,
),
SM( r" ENERGY\| Total FORCE_EVAL \( \w+ \) energy \(a\.u\.\):\s+(?P<energy_total__hartree>{0})".format(self.regex_f)),
SM( r" ATOMIC FORCES in \[a\.u\.\]"),
SM( r" # Atom Kind Element X Y Z",
adHoc=self.adHoc_atom_forces()
),
]
),
# Molecular dynamics: per-step forces, coordinates and state.
# NOTE(review): the "|" in " MD|" is not escaped, so this regex also
# matches a line starting with " Molecular Dynamics Protocol" alone --
# confirm that this is intended.
SM( " MD| Molecular Dynamics Protocol",
sections=["cp2k_section_md"],
forwardMatch=True,
subMatchers=[
SM( " ENERGY\| Total FORCE_EVAL",
repeats=True,
sections=["cp2k_section_md_step"],
subMatchers=[
SM( " ATOMIC FORCES in \[a\.u\.\]",
sections=["cp2k_section_md_forces"],
subMatchers=[
SM( "\s+\d+\s+\d+\s+[\w\W\d]+\s+(?P<cp2k_md_force_atom_string>{0}\s+{0}\s+{0})".format(self.regex_f),
sections=["cp2k_section_md_force_atom"],
repeats=True,
)
]
),
SM( " STEP NUMBER\s+=\s+(?P<cp2k_md_step_number>\d+)"),
SM( " TIME \[fs\]\s+=\s+(?P<cp2k_md_step_time>\d+\.\d+)"),
SM( " TEMPERATURE \[K\]\s+=\s+(?P<cp2k_md_temperature_instantaneous>{0})\s+(?P<cp2k_md_temperature_average>{0})".format(self.regex_f)),
SM( " i =",
sections=["cp2k_section_md_coordinates"],
otherMetaInfo=["cp2k_md_coordinates"],
dependencies={"cp2k_md_coordinates": ["cp2k_md_coordinate_atom_string"]},
subMatchers=[
SM( " \w+\s+(?P<cp2k_md_coordinate_atom_string>{0}\s+{0}\s+{0})".format(self.regex_f),
endReStr="\n",
sections=["cp2k_section_md_coordinate_atom"],
repeats=True,
)
]
)
]
)
]
)
]
)
#=======================================================================
# The cache settings
# Metainfos listed here are cached instead of being forwarded directly;
# they are post-processed in the onClose_* callbacks below before being
# pushed to the real backend.
self.caching_level_for_metaname = {
'section_XC_functionals': CachingLevel.ForwardAndCache,
'self_interaction_correction_method': CachingLevel.Cache,
'cp2k_section_md_coordinates': CachingLevel.Cache,
'cp2k_section_md_coordinate_atom': CachingLevel.Cache,
'cp2k_md_coordinate_atom_string': CachingLevel.Cache,
'cp2k_md_coordinate_atom_float': CachingLevel.Cache,
'cp2k_section_md_forces': CachingLevel.Cache,
'cp2k_section_md_force_atom': CachingLevel.Cache,
'cp2k_md_force_atom_string': CachingLevel.Cache,
'cp2k_md_force_atom_float': CachingLevel.Cache,
}
#===========================================================================
# The functions that trigger when sections are closed
def onClose_section_method(self, backend, gIndex, section):
    """When all the functional definitions have been gathered, matches them
    with the nomad correspondents and combines into one single string which
    is put into the backend.

    Args:
        backend: The backend where the processed results are pushed.
        gIndex: The index of the closed section.
        section: The cached contents of the closed section.
    """
    # Combine the functional names into a one big string that is placed
    # into XC_functional. Guard against the case where no "FUNCTIONAL|"
    # lines were matched and nothing was cached (previously this raised
    # when iterating a missing value).
    functional_names = []
    section_XC_functionals = section["section_XC_functionals"]
    if section_XC_functionals:
        for functional in section_XC_functionals:
            functional_name = functional["XC_functional_name"][0]
            functional_names.append(functional_name)
    functionals = "_".join(sorted(functional_names))
    backend.addValue('XC_functional', functionals)

    # Transform the CP2K self-interaction correction string to the NOMAD
    # correspondent, and push directly to the superBackend to avoid caching
    sic_values = section["self_interaction_correction_method"]
    if not sic_values:
        # No SIC line was parsed; nothing to translate.
        return
    sic_cp2k = sic_values[0]
    sic_map = {
        "NO": "",
        "AD SIC": "SIC_AD",
        "Explicit Orbital SIC": "SIC_EXPLICIT_ORBITALS",
        "SPZ/MAURI SIC": "SIC_MAURI_SPZ",
        "US/MAURI SIC": "SIC_MAURI_US",
    }
    sic_nomad = sic_map.get(sic_cp2k)
    if sic_nomad is not None:
        backend.superBackend.addValue('self_interaction_correction_method', sic_nomad)
    else:
        logger.warning("Unknown self-interaction correction method used.")
def onClose_cp2k_section_md_coordinate_atom(self, backend, gIndex, section):
    """Convert the string that holds the coordinate components of a single
    atom into a numpy array and cache it for later concatenation.
    """
    coordinate_string = section["cp2k_md_coordinate_atom_string"][0]
    components = np.array(coordinate_string.split(), dtype=float)
    backend.addArrayValues("cp2k_md_coordinate_atom_float", components)
def onClose_cp2k_section_md_coordinates(self, backend, gIndex, section):
    """Concatenate the cached per-atom coordinate arrays into a single
    array and forward it to the backend.
    """
    per_atom_coordinates = section["cp2k_md_coordinate_atom_float"]
    backend.addArrayValues("cp2k_md_coordinates", np.array(per_atom_coordinates))
def onClose_cp2k_section_md_force_atom(self, backend, gIndex, section):
    """Convert the string that holds the force components of a single atom
    into a numpy array and cache it for later concatenation.
    """
    raw_forces = section["cp2k_md_force_atom_string"][0]
    backend.addArrayValues("cp2k_md_force_atom_float", np.array(raw_forces.split(), dtype=float))
def onClose_cp2k_section_md_forces(self, backend, gIndex, section):
    """Concatenate the cached per-atom force arrays into a single array and
    forward it to the backend in atomic units.
    """
    per_atom_forces = section["cp2k_md_force_atom_float"]
    backend.addArrayValues("cp2k_md_forces", np.array(per_atom_forces), unit="forceAu")
#===========================================================================
# adHoc functions that are used to do custom parsing. Primarily these
# functions are used for data that is formatted as a table or a list.
def adHoc_section_XC_functionals(self):
    """Used to extract the functional information.

    Returns:
        A callable that the SimpleMatcher framework invokes with the
        parser as its only argument.
    """
    # Hoisted out of the wrapper so that the regex and the mapping are
    # built once, not on every matched "FUNCTIONAL|" line.
    regex_compiled = re.compile(" FUNCTIONAL\| ([\w\d\W\s]+):")
    # Mapping from the CP2K functional names to the NOMAD names.
    functional_map = {
        "LYP": "GGA_C_LYP",
        "BECKE88": "GGA_X_B88",
        "PADE": "LDA_XC_TETER93",
        "LDA": "LDA_XC_TETER93",
        # NOTE(review): BLYP is a pure GGA (B88 exchange + LYP
        # correlation), while HYB_GGA_XC_B3LYP denotes the hybrid B3LYP
        # functional -- this mapping looks wrong, confirm against the
        # metainfo definitions.
        "BLYP": "HYB_GGA_XC_B3LYP",
    }

    def wrapper(parser):
        # Parse out the functional name from the current line
        line = parser.fIn.readline()
        result = regex_compiled.match(line)
        if not result:
            return
        functional_name = result.groups()[0]
        # If a mapping is found, add the functional definition to the
        # backend; previously unknown names were dropped silently.
        nomad_name = functional_map.get(functional_name)
        if nomad_name is not None:
            parser.backend.addValue('XC_functional_name', nomad_name)
        else:
            logger.warning("Could not map the functional '%s' to a NOMAD name.", functional_name)
    return wrapper
def adHoc_cp2k_section_cell(self):
    """Used to extract the cell information.

    Returns:
        A callable that the SimpleMatcher framework invokes with the
        parser as its only argument.
    """
    def wrapper(parser):
        # Read the three lines containing the cell vectors
        vector_lines = [parser.fIn.readline() for _ in range(3)]
        # Define the regex that extracts the components and apply it to the lines
        regex_string = r" CELL\| Vector \w \[angstrom\]:\s+({0})\s+({0})\s+({0})".format(self.regex_f)
        regex_compiled = re.compile(regex_string)
        results = [regex_compiled.match(line) for line in vector_lines]
        # Previously a failed match caused an AttributeError on .groups();
        # log an error and bail out instead of crashing.
        if any(result is None for result in results):
            logger.error("Could not parse the simulation cell vectors.")
            return
        # Convert the string results into a 3x3 numpy array
        cell = np.zeros((3, 3))
        for i_vector, result in enumerate(results):
            cell[i_vector, :] = [float(x) for x in result.groups()]
        # Push the results to the correct section
        parser.backend.addArrayValues("simulation_cell", cell, unit="angstrom")
    return wrapper
def adHoc_cp2k_section_quickstep_atom_information(self):
    """Used to extract the initial atomic coordinates and names in the
    Quickstep module.
    """
    def wrapper(parser):
        # Define the regex that extracts one table row
        row_regex = re.compile(
            r"\s+\d+\s+\d+\s+(\w+)\s+\d+\s+({0})\s+({0})\s+({0})".format(self.regex_f)
        )
        # Currently these three header lines are not processed
        for _ in range(3):
            parser.fIn.readline()
        labels = []
        coordinates = []
        # Consume table rows until a line no longer matches
        while True:
            row = row_regex.match(parser.fIn.readline())
            if row is None:
                break
            labels.append(row.groups()[0])
            coordinates.append([float(x) for x in row.groups()[1:]])
        # If anything was found, push the results to the correct section
        if coordinates:
            parser.backend.addArrayValues("atom_position", np.array(coordinates), unit="angstrom")
            parser.backend.addArrayValues("atom_label", np.array(labels))
    return wrapper
def adHoc_atom_forces(self):
    """Used to extract the final atomic forces printed at the end of an
    ENERGY_FORCE calculation if the PRINT setting is on.

    Returns:
        A callable that the SimpleMatcher framework invokes with the
        parser as its only argument.
    """
    def wrapper(parser):
        end_str = " SUM OF ATOMIC FORCES"
        force_array = []
        # Loop through coordinates until the sum of forces is read
        while True:
            line = parser.fIn.readline()
            if not line:
                # EOF reached before the terminator line; previously this
                # looped forever because readline() keeps returning "".
                logger.error("The atomic force listing was not terminated properly.")
                break
            if line.startswith(end_str):
                break
            # The force components are the last three columns of the row
            forces = [float(x) for x in line.split()[-3:]]
            force_array.append(forces)
        force_array = np.array(force_array)
        # If anything found, push the results to the correct section
        if len(force_array) != 0:
            parser.backend.addArrayValues("atom_forces", force_array, unit="forceAu")
    return wrapper
"""Returns the implementation classes based on the given version identifier.
The different version are grouped into subpackages.
"""
import importlib
import logging
logger = logging.getLogger(__name__)
def get_main_parser(version_id):
    """Return the main parser class for the given CP2K version identifier.

    The version id is a pure integer, so it maps directly into a package
    name. When no implementation exists for the requested version, the
    base implementation (based on CP2K 2.6.2) is used instead.
    """
    module_name = "cp2kparser.parsing.versions.cp2k{}.".format(version_id) + "mainparser"
    try:
        version_module = importlib.import_module(module_name)
    except ImportError:
        logger.debug("A parser with the version id '{}' could not be found. Defaulting to the base implementation based on CP2K 2.6.2.".format(version_id))
        version_module = importlib.import_module("cp2kparser.parsing.versions.cp2k262." + "mainparser")
    return version_module.CP2KMainParser
This diff is collapsed.
"""
This module is used to control the logging of the parser.
Each module in the package can have its own logger, so that you can control
the logging on a modular level easily.
If you want to use a logger on a module simply add the following in the module
preamble:
import logging
logger = logging.getLogger(__name__)
This creates a logger with a hierarchical name. The hierarchical name allows
the logger to inherit logger properties from a parent logger, but also allows
module level control for logging.
A custom formatting is also used for the log messages. The formatting is done
by the LogFormatter class and is different for different levels.
"""
import logging
import textwrap
#===============================================================================
class LogFormatter(logging.Formatter):
    """Formats log records with a level-dependent layout: INFO and DEBUG
    get a compact titled line, more severe levels get a boxed message.
    """
    def format(self, record):
        level = record.levelname
        if level in ("INFO", "DEBUG"):
            return make_titled_message("{}:{}".format(level, record.module), record.msg)
        # Warnings and errors are rendered as an indented, boxed message.
        boxed_title = make_title(level, width=64)
        boxed_body = make_message(record.msg, width=64, spaces=8)
        return "\n " + boxed_title + "\n" + boxed_body + "\n"
#===============================================================================
def make_titled_message(title, message, width=80):
    """Styles a message to be printed into console.

    The first wrapped line gets a " >> title: " prefix; continuation
    lines are indented by five spaces.

    Args:
        title: Title text prepended to the first line.
        message: Message body to wrap.
        width: Target total width in characters.

    Returns:
        The styled, possibly multi-line, message string.
    """
    wrapped = textwrap.TextWrapper(width=width - 5).wrap(message)
    styled_lines = []
    for i_line, text in enumerate(wrapped):
        if i_line == 0:
            styled_lines.append(" >> {}: ".format(title) + text)
        else:
            styled_lines.append(5 * " " + text)
    return "\n".join(styled_lines)
#===============================================================================
def make_message(message, width=80, spaces=0):
    """Styles a message to be printed into console.

    Each wrapped line is rendered between "|" borders padded to a fixed
    width, and the block is closed with a dashed bottom border.

    Args:
        message: Message body to wrap inside the box.
        width: Total width of the box in characters.
        spaces: Number of spaces used to indent every line.

    Returns:
        The styled message string, terminated by the bottom border.
    """
    indent = spaces * " "
    body_lines = textwrap.TextWrapper(width=width - 6).wrap(message)
    rows = [
        indent + "| " + text + (width - 6 - len(text)) * " " + " |"
        for text in body_lines
    ]
    footer = indent + "|" + (width - 2) * "-" + "|"
    return "\n".join(rows) + "\n" + footer
#===============================================================================
def make_title(title, width=80):
    """Styles a title to be printed into console.

    The title is centered inside a "|=== title ===|" banner.

    Args:
        title: The title text placed in the middle of the banner.
        width: Total width of the banner in characters.

    Returns:
        The banner string, exactly `width` characters when the title fits.
    """
    space = width - len(title) - 4
    # Integer division: the old "space/2" yields a float under python 3,
    # and multiplying a string by a float raises a TypeError. The
    # redundant str() wrappers around the string products are dropped.
    pre_space = space // 2 - 1
    post_space = space - pre_space
    line = "|" + pre_space * "=" + " "
    line += title
    line += " " + post_space * "=" + "|"
    return line
#===============================================================================
# The highest level logger setup
# All parser modules create child loggers of this "cp2kparser" logger, so
# the level and handler configured here apply package-wide.
root_logger = logging.getLogger("cp2kparser")
root_logger.setLevel(logging.INFO)
# Create console handler and set level to debug
# NOTE: the handler level (DEBUG) is lower than the logger level (INFO),
# so the logger level is what effectively filters the records.
root_console_handler = logging.StreamHandler()
root_console_handler.setLevel(logging.DEBUG)
root_console_formatter = LogFormatter()
root_console_handler.setFormatter(root_console_formatter)
root_logger.addHandler(root_console_handler)
"""
Tools for testing a nomad parser.
"""
import os
import sys
import json
import numpy as np
#===============================================================================
def get_parser(path, metainfopath, parserbuilderclass, metainfo_to_keep=[], metainfo_to_skip=[], stream=sys.stdout):
"""Initialize a parser that is able to parse the contents in the given path.
Args:
path: String pointing to a path where all the calculation files are
stream: The stream where the results are dumped.
metainfopath: The metainfo filepath as a Get string
parserbuilder: An object that inherits the ParserBuilder class and can
create optimized parsers.
"""
# Scan the given path for all files
files = {}
for filename in os.listdir(path):
files[os.path.join(path, filename)] = ""
json_input = {
"version": "nomadparsein.json 1.0",
"metaInfoFile": metainfopath,
"metainfoToKeep": metainfo_to_keep,