# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import TextIO, Tuple, List, Any, Callable, Dict, Iterable
from abc import ABCMeta, abstractmethod
from io import StringIO
import json
import re
import io
from nomadcore.local_backend import LocalBackend as LegacyLocalBackend
from nomadcore.local_backend import Section, Results
from nomad.utils import get_logger
from nomad.metainfo import MSection, Section as MI2Section
logger = get_logger(__name__)
ParserStatus = Tuple[str, List[str]]
class DelegatingMeta(ABCMeta):
def __new__(meta, name, bases, dct):
abstract_method_names = frozenset.union(*(base.__abstractmethods__ for base in bases))
        # generate a delegating implementation for every abstract method that the
        # concrete class does not implement itself
        for method_name in abstract_method_names:
            if method_name not in dct:
                dct[method_name] = DelegatingMeta._make_delegator_method(method_name)
return super(DelegatingMeta, meta).__new__(meta, name, bases, dct)
@staticmethod
def _make_delegator_method(name):
def delegator(self, *args, **kwargs):
return getattr(self._delegate, name)(*args, **kwargs)
return delegator
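

def _delegating_meta_example():
    """
    Illustrative sketch, not part of the original module: shows how
    :class:`DelegatingMeta` forwards abstract methods that a subclass does not
    implement itself to ``self._delegate``. All names inside this function are
    made up for illustration only.
    """
    class AbstractGreeter(metaclass=ABCMeta):
        @abstractmethod
        def greet(self, name):
            pass

    class PrintGreeter(AbstractGreeter):
        def greet(self, name):
            return 'hello %s' % name

    class DelegatingGreeter(AbstractGreeter, metaclass=DelegatingMeta):
        # no ``greet`` defined here; DelegatingMeta generates one that calls
        # ``self._delegate.greet(...)``
        def __init__(self, delegate):
            self._delegate = delegate

    assert DelegatingGreeter(PrintGreeter()).greet('world') == 'hello world'
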
class BadContextURI(Exception):
pass
class WrongContextState(Exception):
pass
class AbstractParserBackend(metaclass=ABCMeta):
"""
This ABS provides the parser backend interface used by the NOMAD-coe parsers
and normalizers.
"""
@abstractmethod
def metaInfoEnv(self):
""" Returns the meta info used by this backend. """
pass
@abstractmethod
def startedParsingSession(
self, mainFileUri, parserInfo, parserStatus=None, parserErrors=None):
"""
Should be called when the parsing starts.
ParserInfo should be a valid json dictionary.
"""
pass
@abstractmethod
def finishedParsingSession(
self, parserStatus, parserErrors, mainFileUri=None, parserInfo=None,
parsingStats=None):
""" Called when the parsing finishes. """
pass
@abstractmethod
def openContext(self, contextUri: str):
""" Open existing archive data to introduce new data into an existing section. """
pass
@abstractmethod
def closeContext(self, contextUri: str):
""" Close priorly opened existing archive data again. """
pass
@abstractmethod
def openSection(self, metaName):
""" Opens a new section and returns its new unique gIndex. """
pass
@abstractmethod
def closeSection(self, metaName, gIndex):
"""
Closes the section with the given meta name and index. After this, no more
value can be added to this section.
"""
pass
@abstractmethod
def openNonOverlappingSection(self, metaName):
""" Opens a new non overlapping section. """
pass
@abstractmethod
def setSectionInfo(self, metaName, gIndex, references):
"""
Sets info values of an open section references should be a dictionary with the
gIndexes of the root sections this section refers to.
"""
pass
@abstractmethod
def closeNonOverlappingSection(self, metaName):
"""
Closes the current non overlapping section for the given meta name. After
this, no more value can be added to this section.
"""
pass
@abstractmethod
def openSections(self):
""" Returns the sections that are still open as metaName, gIndex tuples. """
pass
@abstractmethod
def addValue(self, metaName, value, gIndex=-1):
"""
Adds a json value for the given metaName. The gIndex is used to identify
the right parent section.
"""
pass
@abstractmethod
def addRealValue(self, metaName, value, gIndex=-1):
"""
Adds a float value for the given metaName. The gIndex is used to identify
the right parent section.
"""
pass
@abstractmethod
def addArray(self, metaName, shape, gIndex=-1):
"""
        Adds an uninitialized array of the given shape for the given metaName.
        The gIndex is used to identify the right parent section.
        This is necessary before array values can be set with :func:`setArrayValues`.
"""
@abstractmethod
def setArrayValues(self, metaName, values, offset=None, gIndex=-1):
"""
Adds values of the given numpy array to the last array added for the given
metaName and parent gIndex.
"""
pass
@abstractmethod
def addArrayValues(self, metaName, values, gIndex=-1, override: bool = False):
"""
Adds an array with the given numpy array values for the given metaName and
        parent section gIndex. Override determines whether to rewrite existing values
in the backend.
"""
pass
@abstractmethod
def pwarn(self, msg):
""" Used to catch parser warnings. """
pass
    # The following are extensions to the original NOMAD-coe parser backend that allow
    # access to existing data.
@property
@abstractmethod
def data(self) -> Results:
pass
@abstractmethod
def get_sections(self, meta_name: str, g_index: int = -1) -> List[int]:
""" Return all gIndices for existing sections of the given meta_name and parent section index. """
pass
@abstractmethod
def get_value(self, metaName: str, g_index=-1) -> Any:
"""
Return the value set to the given meta_name in its parent section of the given index.
An index of -1 (default) is only allowed if there is exactly one parent section.
"""
pass
def write_json(
self, out: TextIO, pretty=True, filter: Callable[[str, Any], Any] = None,
root_sections: List[str] = ['section_run', 'section_entry_info']):
""" Writes the backend contents. """
pass
def add_mi2_section(self, section: MSection):
""" Allows to mix a metainfo2 style section into backend. """
pass
def get_mi2_section(self, section_def: MI2Section):
""" Allows to mix a metainfo2 style section into backend. """
pass
def traverse(self, *args, **kwargs) -> Iterable[Tuple[str, str, Any]]:
""" Traverses the backend data and yiels tuples with metainfo name, event type,
and value """
pass
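

def _abstract_backend_usage_sketch(backend: AbstractParserBackend):
    """
    Illustrative sketch, not part of the original module: the typical sequence of
    calls a parser performs against :class:`AbstractParserBackend`. The metainfo
    names used here ('program_name', 'atom_positions', ...) are examples and have
    to exist in the backend's metainfo environment.
    """
    import numpy as np  # numpy is only needed for this sketch

    backend.startedParsingSession(
        mainFileUri='file:///path/to/mainfile', parserInfo=dict(name='example_parser'))

    backend.openNonOverlappingSection('section_run')
    backend.addValue('program_name', 'EXAMPLE CODE')

    system_index = backend.openSection('section_system')
    backend.addArrayValues('atom_positions', np.zeros((2, 3)), gIndex=system_index)
    backend.closeSection('section_system', system_index)

    backend.closeNonOverlappingSection('section_run')
    backend.finishedParsingSession('ParseSuccess', None)
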
class JSONStreamWriter():
START = 0
OBJECT = 1
ARRAY = 2
KEY_VALUE = 3
"""
A generator that allows to output JSON based on calling 'event' functions.
Its pure python and could be replaced by some faster implementation, e.g. yajl-py.
It uses standard json decode to write values. This allows to mix streaming with
normal encoding. Furthermore, this writer understands numpy values and encodes
them properly.
Arguments:
file: A file like to write to.
pretty: True to indent and use separators.
Raises:
AssertionError: If methods were called in a non JSON fashion. Call :func:`close`
to make sure everything was closed properly.
"""
def __init__(self, file, pretty=False):
self._fp = file
self._pretty = pretty
self._indent = '' # the current indent
self._separators = [''] # a stack of the next necessary separator
        self._states = [JSONStreamWriter.START]  # a stack of what is currently open
def _write(self, str):
self._fp.write(str)
def _write_seperator(self):
self._write(self._separators.pop())
def _seperator_with_newline(self, base=None):
pretty_ext = ('\n%s' % self._indent) if self._pretty else ''
if base is None:
return pretty_ext
else:
return '%s%s' % (base, pretty_ext)
def _open(self, open_char):
self._write_seperator()
self._write(open_char)
self._indent = '%s ' % self._indent
self._separators.append(self._seperator_with_newline())
def _close(self, close_char):
self._separators.pop()
self._indent = self._indent[:-2]
self._write(self._seperator_with_newline())
self._write(close_char)
self._separators.append(self._seperator_with_newline(','))
def open_object(self):
assert self._states[-1] != JSONStreamWriter.OBJECT, "Cannot open object in object."
if self._states[-1] == JSONStreamWriter.KEY_VALUE:
self._states.pop()
self._open('{')
self._states.append(JSONStreamWriter.OBJECT)
def close_object(self):
assert self._states.pop() == JSONStreamWriter.OBJECT, "Can only close object in object."
self._close('}')
def open_array(self):
assert self._states[-1] != JSONStreamWriter.OBJECT, "Cannot open array in object."
if self._states[-1] == JSONStreamWriter.KEY_VALUE:
self._states.pop()
self._open('[')
self._states.append(JSONStreamWriter.ARRAY)
def close_array(self):
assert self._states.pop() == JSONStreamWriter.ARRAY, "Can only close array in array."
self._close(']')
def key_value(self, key, value):
self.key(key)
self.value(value)
def key(self, key: str):
assert self._states[-1] == JSONStreamWriter.OBJECT, "Key can only be in objects."
self._write_seperator()
json.dump(key, self._fp)
self._separators.append(': ' if self._pretty else ':')
self._states.append(JSONStreamWriter.KEY_VALUE)
@staticmethod
def _json_serializable_value(value):
if hasattr(value, 'tolist'):
            # run tolist on potential numpy array types
return value.tolist()
else:
return value
def write(self, str):
if self._pretty:
str = str.replace('\n', '\n%s' % self._indent)
self._fp.write(str)
def value(self, value):
assert self._states[-1] != JSONStreamWriter.OBJECT, "Values can not be in objects."
if self._states[-1] == JSONStreamWriter.KEY_VALUE:
self._states.pop()
self._write_seperator()
json.dump(
JSONStreamWriter._json_serializable_value(value), self,
indent=2 if self._pretty else 1,
separators=(', ', ': ') if self._pretty else (',', ':'))
self._separators.append(self._seperator_with_newline(','))
def close(self):
assert self._states[-1] == JSONStreamWriter.START, "Something was not closed."
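

def _json_stream_writer_example():
    """
    Illustrative sketch, not part of the original module: writes the JSON object
    {"name": "example", "values": [1.0, 2.0]} to an in-memory file by calling the
    writer's event functions.
    """
    out = StringIO()
    writer = JSONStreamWriter(out, pretty=True)
    writer.open_object()
    writer.key_value('name', 'example')
    writer.key('values')
    writer.open_array()
    writer.value(1.0)
    writer.value(2.0)
    writer.close_array()
    writer.close_object()
    writer.close()  # raises AssertionError if something was left open
    return out.getvalue()
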
class LegacyParserBackend(AbstractParserBackend):
"""
Partial implementation of :class:`AbstractParserBackend` that implements some
methods that are independent from the core backend implementation.
"""
def __init__(self, logger):
self.logger = logger if logger is not None else get_logger(__name__)
self.reset_status()
        # things that have no real purpose, but are required by some legacy code
self._unknown_attributes = {}
self._known_attributes = ['results']
self.fileOut = io.StringIO()
def startedParsingSession(
self, mainFileUri, parserInfo, parserStatus=None, parserErrors=None):
self.reset_status()
def finishedParsingSession(self, parserStatus, parserErrors, *args, **kwargs):
self._status = parserStatus
self._errors = parserErrors
def pwarn(self, msg):
self.logger.warn(msg)
if len(self._warnings) < 10:
self._warnings.append(msg)
elif len(self._warnings) == 10:
self._warnings.append('There are more warnings, check the processing logs.')
def _parse_context_uri(self, context_uri: str) -> Tuple[str, int]:
"""
Returns the last segment of the given context uri, i.e. the section that
constitutes the context.
"""
path_str = re.sub(r'^(nmd://[^/]+/[^/]+)?/', '', context_uri, count=1)
path = path_str.split('/')[::-1] # reversed path via extended slice syntax
if len(path) == 0:
            raise BadContextURI('Uri %s has no path.' % context_uri)
while len(path) > 0:
meta_name = path.pop()
potential_index = path[-1] if len(path) > 0 else 'none'
try:
index = int(potential_index)
path.pop()
except ValueError:
index = 0
return meta_name, index
@property
def status(self) -> ParserStatus:
""" Returns status and potential errors. """
return (self._status, self._errors)
def reset_status(self) -> None:
self._status = 'ParseSuccess'
self._errors = None
self._warnings: List[str] = []
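

# Illustrative note, not part of the original module: the context URIs handled by
# LegacyParserBackend._parse_context_uri (and thus openContext/closeContext) consist
# of an optional 'nmd://<upload>/<calc>' prefix followed by a path of section names
# with optional indices. According to the docstring, the last segment determines the
# context, e.g.:
#
#     '/section_run/0'                            -> ('section_run', 0)
#     '/section_run/0/section_method/1'           -> ('section_method', 1)
#     'nmd://some-upload/some-calc/section_run/0' -> ('section_run', 0)
#
# A segment without a trailing numeric index defaults to index 0.
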
class LocalBackend(LegacyParserBackend, metaclass=DelegatingMeta):
"""
    This implementation of :class:`AbstractParserBackend` is an extended version of
    NOMAD-coe's ``LocalBackend`` that allows writing the results in an *archive*-style .json.
    It can be used like the original, but also allows outputting archive JSON after
    parsing via :func:`write_json`. An end-to-end usage sketch can be found at the
    end of this module.
"""
def __init__(self, *args, **kwargs):
logger = kwargs.pop('logger', None)
super().__init__(logger=logger)
self._delegate = LegacyLocalBackend(*args, **kwargs)
self.mi2_data: Dict[str, MSection] = {}
self._open_context: Tuple[str, int] = None
self._context_section = None
def __getattr__(self, name):
""" Support for unimplemented and unexpected methods. """
if name not in self._known_attributes and self._unknown_attributes.get(name) is None:
self.logger.debug('Access of unexpected backend attribute/method', attribute=name)
self._unknown_attributes[name] = name
return getattr(self._delegate, name)
def add_mi2_section(self, section: MSection):
""" Allows to mix a metainfo2 style section into backend. """
self.mi2_data[section.m_def.name] = section
def get_mi2_section(self, section_def: MI2Section):
""" Allows to mix a metainfo2 style section into backend. """
return self.mi2_data.get(section_def.name, None)
def finishedParsingSession(self, *args, **kwargs):
super().finishedParsingSession(*args, **kwargs)
self._delegate.finishedParsingSession(*args, **kwargs)
def openSection(self, metaName: str) -> int:
if self._open_context is None:
return self._delegate.openSection(metaName)
else:
assert self._context_section is not None
child_sections = list()
def find_child_sections(section):
for subsections in section.subsections.values():
for subsection in subsections:
if subsection.name == metaName:
child_sections.append(subsection)
find_child_sections(subsection)
find_child_sections(self._context_section)
if len(child_sections) == 0:
return self._delegate.openSection(metaName)
elif len(child_sections) == 1:
index = child_sections[0].gIndex # TODO this also needs to be reversed, on closing sections
self._delegate.sectionManagers[metaName].lastSectionGIndex = index
return index
else:
raise WrongContextState(
'You cannot re-open %s with multiple instances in the context.' % metaName)
def openContext(self, contextUri: str):
if self._open_context is not None:
raise WrongContextState('There is already an open context on this backend.')
meta_name, index = self._parse_context_uri(contextUri)
try:
section_manager = self._delegate.sectionManagers[meta_name]
except KeyError:
raise BadContextURI('The section %s does not exist.' % meta_name)
if section_manager.lastSectionGIndex < index:
raise BadContextURI(
'Last index of section %s is %d, cannot open %d.' %
(meta_name, section_manager.lastSectionGIndex, index))
self._context_section = section_manager.openSections[index]
self._open_context = meta_name, section_manager.lastSectionGIndex
section_manager.lastSectionGIndex = index
def closeContext(self, contextUri):
if self._open_context is None:
raise WrongContextState('There is no context to close on this backend.')
meta_name, old_index = self._open_context
context_meta_name, _ = self._parse_context_uri(contextUri)
if context_meta_name != meta_name:
raise BadContextURI(
                '%s is not the URI that this context was opened with.' % contextUri)
self._delegate.sectionManagers[context_meta_name].lastSectionGIndex = old_index
self._open_context = None
self._context_section = None
@property
def data(self) -> Results:
return self._delegate.results
# def get_value(self, meta_name, g_index=-1):
# return self._delegate.results._datamanagers[meta_name].get_value(meta_name, g_index)
# def get_sections(self, meta_name, g_index=-1):
# sections = self._delegate.results[meta_name]
# return [
# section.gIndex for section in sections
# if g_index == -1 or section.parents[0].gIndex == g_index]
def _write(
self, json_writer: JSONStreamWriter, value: Any,
filter: Callable[[str, Any], Any] = None):
if isinstance(value, list):
if len(value) == 1 and isinstance(value[0], Section) and \
not self._delegate.metaInfoEnv().infoKindEl(value[0].name).repeats:
self._write(json_writer, value[0], filter=filter)
else:
json_writer.open_array()
for item in value:
self._write(json_writer, item, filter=filter)
json_writer.close_array()
elif isinstance(value, Section):
section = value
json_writer.open_object()
json_writer.key_value('_name', section.name)
json_writer.key_value('_gIndex', section.gIndex)
for name, value in section.items():
if filter is not None:
value = filter(name, value)
if value is not None:
json_writer.key(name)
self._write(json_writer, value, filter=filter)
json_writer.close_object()
else:
json_writer.value(value)
def _obj(self, value: Any, filter: Callable[[str, Any], Any] = None) -> Any:
if isinstance(value, list):
if len(value) == 1 and isinstance(value[0], Section) and \
not self._delegate.metaInfoEnv().infoKindEl(value[0].name).repeats:
return self._obj(value[0], filter=filter)
else:
return [self._obj(item, filter=filter) for item in value]
elif isinstance(value, Section):
section = value
obj = dict(_name=section.name, _gIndex=section.gIndex)
for name, value in section.items():
if filter is not None:
value = filter(name, value)
if value is not None:
obj[name] = self._obj(value, filter=filter)
return obj
else:
return JSONStreamWriter._json_serializable_value(value)
def write_json(
self, out: TextIO, pretty=True, filter: Callable[[str, Any], Any] = None,
root_sections: List[str] = ['section_run', 'section_entry_info']):
"""
Writes the results stored in the backend after parsing in an 'archive'.json
style format.
Arguments:
out: The file-like that is used to write the json to.
pretty: Format the json or not.
filter: Optional filter that takes metaname, value pairs and returns a new value.
"""
json_writer = JSONStreamWriter(out, pretty=pretty)
json_writer.open_object()
        # TODO the root sections should be determined programmatically
for root_section in root_sections:
json_writer.key(root_section)
self._write(json_writer, self._delegate.results[root_section], filter=filter)
for name, section in self.mi2_data.items():
json_writer.key_value(name, section.m_to_dict())
json_writer.close_object()
json_writer.close()
def traverse(self, *args, **kwargs):
return self._delegate.results.traverse(*args, **kwargs)
def __repr__(self):
def filter(name, value):
if name.startswith('section_'):
return value
if name.startswith('x_'):
return None
if getattr(value, 'tolist', None) or isinstance(value, list):
return '<some array>'
else:
return value
out = StringIO()
self.write_json(JSONStreamWriter(out), filter=filter)
return out.getvalue()
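

def _local_backend_usage_sketch(meta_info_env):
    """
    Illustrative sketch, not part of the original module: end-to-end use of
    :class:`LocalBackend`. ``meta_info_env`` stands for a loaded legacy metainfo
    environment; the metainfo names below are examples and must be defined in
    that environment.
    """
    backend = LocalBackend(meta_info_env)

    # a parser fills the backend ...
    backend.startedParsingSession('file:///path/to/mainfile', dict(name='example_parser'))
    run_index = backend.openSection('section_run')
    backend.addValue('program_name', 'EXAMPLE CODE')
    backend.closeSection('section_run', run_index)
    backend.finishedParsingSession('ParseSuccess', None)

    # ... a normalizer can later re-open an existing section via a context URI ...
    backend.openContext('/section_run/%d' % run_index)
    backend.addValue('program_basis_set_type', 'plane waves')
    backend.closeContext('/section_run/%d' % run_index)

    # ... and the collected data can be serialized in archive-style JSON
    out = StringIO()
    backend.write_json(out, pretty=True)
    return out.getvalue()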