Commit a8513c50 authored by Markus Scheidgen

Backend refactor

parent 07effaf8
@@ -28,7 +28,8 @@ import os
import signal
from nomad import metainfo
from nomad.datamodel.metainfo import m_env as general_nomad_metainfo_env
from nomad.datamodel import EntryArchive
from nomad.datamodel.metainfo.common import section_run as Run
from .legacy import Backend
from .parser import Parser, MatchingParser
@@ -40,8 +41,8 @@ class ArtificalParser(Parser):
super().__init__()
self.backend = None
def init_backend(self):
self.backend = Backend(metainfo='vasp')
def init_backend(self, **kwargs):
self.backend = Backend(metainfo='vasp', **kwargs)
class EmptyParser(MatchingParser):
@@ -50,12 +51,9 @@ class EmptyParser(MatchingParser):
'''
name = "parsers/empty"
def run(self, mainfile: str, logger=None) -> Backend:
backend = Backend(metainfo=general_nomad_metainfo_env, domain=self.domain, logger=logger)
backend.openSection('section_run')
backend.addValue('program_name', self.code_name)
backend.closeSection('section_run', 0)
return backend
def parse(self, mainfile: str, archive: EntryArchive, logger=None) -> None:
run = archive.m_create(Run)
run.program_name = self.code_name
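A minimal sketch of how a caller drives the new interface: the caller owns the EntryArchive, passes it to parse(), and reads results from it afterwards (the constructor arguments and the mainfile path are assumptions for illustration):

    from nomad.datamodel import EntryArchive

    archive = EntryArchive()
    # Constructor arguments are assumed; the MatchingParser configuration may differ.
    parser = EmptyParser(name='parsers/empty', code_name='Example')
    parser.parse('/path/to/mainfile', archive)
    assert archive.section_run[0].program_name == 'Example'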
class TemplateParser(ArtificalParser):
@@ -109,12 +107,12 @@ class TemplateParser(ArtificalParser):
self.backend.closeSection(name, index)
def run(self, mainfile: str, logger=None) -> Backend:
def parse(self, mainfile: str, archive: EntryArchive, logger=None) -> None:
# tell tests about received logger
if logger is not None:
logger.debug('received logger')
self.init_backend()
self.init_backend(entry_archive=archive)
if 'warning' in mainfile:
self.backend.pwarn('A test warning.')
@@ -125,7 +123,6 @@ class TemplateParser(ArtificalParser):
self.add_section(template_json['section_workflow'])
self.backend.finishedParsingSession('ParseSuccess', [])
logger.debug('a test log entry')
return self.backend
class ChaosParser(ArtificalParser):
@@ -146,8 +143,8 @@ class ChaosParser(ArtificalParser):
compression: str = None) -> bool:
return filename.endswith('chaos.json')
def run(self, mainfile: str, logger=None) -> Backend:
self.init_backend()
def parse(self, mainfile: str, archive: EntryArchive, logger=None) -> None:
self.init_backend(entry_archive=archive)
chaos_json = json.load(open(mainfile, 'r'))
if isinstance(chaos_json, str):
@@ -242,16 +239,15 @@ class GenerateRandomParser(TemplateParser):
else:
return value
def run(self, mainfile: str, logger=None) -> Backend:
def parse(self, mainfile: str, archive: EntryArchive, logger=None) -> None:
# tell tests about received logger
if logger is not None:
logger.debug('received logger')
self.init_backend()
self.init_backend(entry_archive=archive)
seed = int(os.path.basename(mainfile).split('_')[1])
random.seed(seed)
numpy.random.seed(seed)
section = self.template['section_run'][0]
self.add_section(section)
self.backend.finishedParsingSession('ParseSuccess', [])
return self.backend
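The seed derivation above, isolated as a self-contained sketch (the path 'random_7' is a hypothetical mainfile name matching the expected pattern):

    import os
    import random
    import numpy

    mainfile = '/uploads/random_7'  # hypothetical; the basename encodes the seed
    seed = int(os.path.basename(mainfile).split('_')[1])
    random.seed(seed)
    numpy.random.seed(seed)
    assert seed == 7  # identical inputs now yield reproducible random sections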
@@ -14,7 +14,7 @@
'''
This module contains functionality to use old 'legacy' NOMAD CoE parsers with the
new nomad@fairdi infrastructure. This covers aspects like the new metainfo, a unifying
new nomad@fairdi infrastructure. This covers aspects like the old metainfo, a unifying
wrapper for parsers, parser logging, and a parser backend.
'''
@@ -41,136 +41,7 @@ class BackendError(Exception):
pass
class AbstractParserBackend(metaclass=ABCMeta):
'''
This ABC provides the parser backend interface used by the NOMAD-CoE parsers.
'''
@abstractmethod
def metaInfoEnv(self):
''' Returns the meta info used by this backend. '''
pass
@abstractmethod
def startedParsingSession(
self, mainFileUri, parserInfo, parserStatus=None, parserErrors=None):
'''
Should be called when the parsing starts.
ParserInfo should be a valid json dictionary.
'''
pass
@abstractmethod
def finishedParsingSession(
self, parserStatus, parserErrors, mainFileUri=None, parserInfo=None,
parsingStats=None):
''' Called when the parsing finishes. '''
pass
@abstractmethod
def openSection(self, metaName, parent_index=-1):
''' Opens a new section and returns its new unique gIndex. '''
pass
@abstractmethod
def closeSection(self, metaName, gIndex):
'''
Closes the section with the given meta name and index. After this, no more
values can be added to this section.
'''
pass
@abstractmethod
def openNonOverlappingSection(self, metaName):
''' Opens a new non overlapping section. '''
pass
@abstractmethod
def setSectionInfo(self, metaName, gIndex, references):
'''
Sets info values of an open section. The references argument should be a
dictionary with the gIndexes of the root sections this section refers to.
'''
pass
@abstractmethod
def closeNonOverlappingSection(self, metaName):
'''
Closes the current non overlapping section for the given meta name. After
this, no more values can be added to this section.
'''
pass
@abstractmethod
def openSections(self):
''' Returns the sections that are still open as metaName, gIndex tuples. '''
pass
@abstractmethod
def addValue(self, metaName, value, gIndex=-1):
'''
Adds a json value for the given metaName. The gIndex is used to identify
the right parent section.
'''
pass
@abstractmethod
def addRealValue(self, metaName, value, gIndex=-1):
'''
Adds a float value for the given metaName. The gIndex is used to identify
the right parent section.
'''
pass
@abstractmethod
def addArray(self, metaName, shape, gIndex=-1):
'''
Adds an uninitialized array of the given shape for the given metaName.
The gIndex is used to identify the right parent section.
This is necessary before array values can be set with :func:`setArrayValues`.
'''
@abstractmethod
def setArrayValues(self, metaName, values, offset=None, gIndex=-1):
'''
Adds values of the given numpy array to the last array added for the given
metaName and parent gIndex.
'''
pass
@abstractmethod
def addArrayValues(self, metaName, values, gIndex=-1, override: bool = False):
'''
Adds an array with the given numpy array values for the given metaName and
parent section gIndex. Override determines whether to rewrite existing values
in the backend.
'''
pass
@abstractmethod
def pwarn(self, msg):
''' Used to catch parser warnings. '''
pass
@abstractmethod
def get_sections(self, meta_name: str, g_index: int = -1) -> List[int]:
''' Return all gIndices for existing sections of the given meta_name and parent section index. '''
pass
@abstractmethod
def get_value(self, metaName: str, g_index=-1) -> Any:
'''
Return the value set to the given meta_name in its parent section of the given index.
An index of -1 (default) is only allowed if there is exactly one parent section.
'''
pass
@abstractmethod
def __getitem__(self, key):
pass
class Backend(AbstractParserBackend):
class Backend():
'''
A backend that uses the new metainfo to store all data.
@@ -195,7 +66,7 @@ class Backend(AbstractParserBackend):
def __init__(
self, metainfo: Union[str, LegacyMetainfoEnvironment], domain: str = None,
logger=None):
entry_archive: datamodel.EntryArchive = None, logger=None):
assert metainfo is not None
if logger is None:
@@ -212,7 +83,7 @@ class Backend(AbstractParserBackend):
self.env: LegacyMetainfoEnvironment = cast(LegacyMetainfoEnvironment, metainfo)
self.__legacy_env = None
self.resource = MResource(logger=logger)
self.entry_archive = datamodel.EntryArchive()
self.entry_archive = datamodel.EntryArchive() if entry_archive is None else entry_archive
self.resource.add(self.entry_archive)
self.strict = False # TODO
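The new entry_archive parameter lets a caller hand in a pre-existing archive instead of having the backend create one; a minimal sketch, reusing the 'vasp' metainfo name from ArtificalParser.init_backend above:

    from nomad import datamodel

    archive = datamodel.EntryArchive()
    backend = Backend(metainfo='vasp', entry_archive=archive)
    # The backend now writes into the caller's archive rather than its own.
    assert backend.entry_archive is archive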
@@ -255,12 +126,14 @@ class Backend(AbstractParserBackend):
return section.m_get_sub_sections(property_def)
def metaInfoEnv(self):
''' Returns the meta info used by this backend. '''
if self.__legacy_env is None:
self.__legacy_env = self.env.legacy_info_env()
return self.__legacy_env
def resolve_definition(self, name, section_cls: Type[MSectionBound]) -> MSectionBound:
definition = self.env.from_legacy_name(name, section_cls)
if definition:
return definition
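resolve_definition translates a legacy metainfo name into a definition of the new metainfo; a short sketch, assuming Section and Quantity as the definition classes to resolve against:

    from nomad.metainfo import Section, Quantity

    run_def = backend.resolve_definition('section_run', Section)     # a section definition
    name_def = backend.resolve_definition('program_name', Quantity)  # a quantity definition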
@@ -269,6 +142,8 @@ class Backend(AbstractParserBackend):
def openSection(self, name, parent_index: int = -1, return_section=False):
'''
Opens a new section and returns its new unique gIndex.
It will assume that there is a sub-section def with the given name.
It will use the latest opened section of the sub-sections parent as the parent
for the new section.
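For contrast, the legacy calling convention these methods keep emulating, mirroring the removed EmptyParser.run above:

    backend = Backend(metainfo='vasp')  # metainfo name as above
    g_index = backend.openSection('section_run')
    backend.addValue('program_name', 'Example')
    backend.closeSection('section_run', g_index)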
@@ -311,9 +186,14 @@ class Backend(AbstractParserBackend):
return section, quantity_def
def closeSection(self, name, g_index):
'''
Closes the section with the given meta name and index. After this, no more
value can be added to this section.
'''
pass
def openNonOverlappingSection(self, metaName):
''' Opens a new non overlapping section. '''
return self.openSection(metaName)
def setSectionInfo(self, metaName, gIndex, references):
@@ -325,6 +205,10 @@ class Backend(AbstractParserBackend):
pass
def closeNonOverlappingSection(self, name):
'''
Closes the current non overlapping section for the given meta name. After
this, no more values can be added to this section.
'''
return self.closeSection(name, -1)
def openSections(self):
@@ -335,6 +219,10 @@ class Backend(AbstractParserBackend):
# yield section_def.name, sub_section.m_parent_index
def addValue(self, name, value, g_index=-1):
'''
Adds a json value for the given metaName. The gIndex is used to identify
the right parent section.
'''
section, quantity_def = self.get_open_section_for_quantity(name, g_index)
if isinstance(quantity_def.type, Reference):
# quantity is a reference
@@ -349,6 +237,10 @@ class Backend(AbstractParserBackend):
setattr(section, name, value)
def addRealValue(self, name, value, g_index=-1):
'''
Adds a float value for the given metaName. The gIndex is used to identify
the right parent section.
'''
self.addValue(name, value, g_index)
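Since addRealValue now simply delegates to addValue, both go through the same code path; a sketch using public legacy metainfo names (energy_total is given in SI units):

    backend.openSection('section_run')
    g_index = backend.openSection('section_single_configuration_calculation')
    backend.addValue('energy_total', -1.4e-18, g_index)      # equivalent ...
    backend.addRealValue('energy_total', -1.4e-18, g_index)  # ... to this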
def addArray(self, name, shape, g_index=-1):
@@ -420,11 +312,16 @@ class Backend(AbstractParserBackend):
def startedParsingSession(
self, mainFileUri, parserInfo, parserStatus=None, parserErrors=None):
'''
Should be called when the parsing starts.
ParserInfo should be a valid json dictionary.
'''
self.reset_status()
def finishedParsingSession(
self, parserStatus, parserErrors, mainFileUri=None, parserInfo=None,
parsingStats=None):
''' Called when the parsing finishes. '''
self._status = parserStatus
self._errors = parserErrors
@@ -433,6 +330,7 @@ class Backend(AbstractParserBackend):
pass
def pwarn(self, msg):
''' Used to catch parser warnings. '''
self.logger.warn(msg)
if len(self._warnings) < 10:
self._warnings.append(msg)
@@ -488,7 +386,7 @@ class LegacyParser(MatchingParser):
return self.__parser_class
def run(self, mainfile: str, logger=None) -> Backend:
def parse(self, mainfile: str, archive: datamodel.EntryArchive, logger=None):
# TODO we need a homogeneous interface to parsers, but we don't have it right now.
# There are some hacks to distinguish between ParserInterface parser and simple_parser
# using hasattr, kwargs, etc.
@@ -497,30 +395,34 @@ class LegacyParser(MatchingParser):
if issubclass(self.parser_class, CoEParser):
# TODO reuse parser
# TODO remove, this whole mechanism is only used by wien2k
parser = self.parser_class() # pylint: disable=not-callable
return parser.run(mainfile, logger)
backend = parser.run(mainfile, logger=logger, entry_archive=archive)
def create_backend(meta_info):
if self.backend_factory is not None:
return self.backend_factory(meta_info, logger=logger)
else:
def create_backend(meta_info):
if self.backend_factory is not None:
return self.backend_factory(meta_info, logger=logger)
return Backend(meta_info, logger=logger, domain=self.domain)
return Backend(meta_info, logger=logger, domain=self.domain, entry_archive=archive)
init_signature = inspect.getargspec(self.parser_class.__init__)
kwargs = dict(backend=create_backend, log_level=logging.DEBUG, debug=True)
kwargs = {key: value for key, value in kwargs.items() if key in init_signature.args}
init_signature = inspect.getargspec(self.parser_class.__init__)
kwargs = dict(backend=create_backend, log_level=logging.DEBUG, debug=True)
kwargs = {key: value for key, value in kwargs.items() if key in init_signature.args}
with utils.legacy_logger(logger):
self.parser = self.parser_class(**kwargs) # pylint: disable=not-callable
with utils.legacy_logger(logger):
self.parser = self.parser_class(**kwargs) # pylint: disable=not-callable
with patch.object(sys, 'argv', []):
backend = self.parser.parse(mainfile)
os.chdir(config.fs.working_directory)
with patch.object(sys, 'argv', []):
backend = self.parser.parse(mainfile)
if backend is None or not hasattr(backend, 'status'):
backend = self.parser.parser_context.super_backend
os.chdir(config.fs.working_directory)
return backend
if backend is None or not hasattr(backend, 'status'):
backend = self.parser.parser_context.super_backend
if backend.status[0] != 'ParseSuccess':
raise Exception(backend.status[1])
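The constructor-introspection step above, isolated: only keyword arguments that the legacy parser class actually declares survive the filter. LegacyPluginParser is a made-up stand-in for self.parser_class:

    import inspect
    import logging

    class LegacyPluginParser:
        def __init__(self, backend):  # accepts backend, but not log_level or debug
            self.backend = backend

    candidates = dict(backend=object(), log_level=logging.DEBUG, debug=True)
    init_signature = inspect.getargspec(LegacyPluginParser.__init__)
    accepted = {key: value for key, value in candidates.items() if key in init_signature.args}
    parser = LegacyPluginParser(**accepted)  # only `backend` is passed through

Note that inspect.getargspec is deprecated; inspect.getfullargspec exposes the same args attribute on current Python versions.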
class CoEParser(metaclass=ABCMeta):
@@ -583,9 +485,9 @@ class CoESimpleMatcherParser(CoEParser):
def create_super_context(self):
pass
def simple_parser(self, mainfile, logger) -> Backend:
def simple_parser(self, mainfile, **kwargs) -> Backend:
from nomadcore.simple_parser import mainFunction
backend = Backend(self._metainfo_env, logger=logger)
backend = Backend(self._metainfo_env, **kwargs)
from unittest.mock import patch
with patch.object(sys, 'argv', ['<exe>', '--uri', 'nmd://uri', mainfile]):
mainFunction(
@@ -598,9 +500,9 @@ class CoESimpleMatcherParser(CoEParser):
return backend
def run(self, mainfile, logger) -> Backend:
def run(self, mainfile, logger, **kwargs) -> Backend:
with utils.legacy_logger(logger):
return self.simple_parser(mainfile, logger)
return self.simple_parser(mainfile, logger=logger, **kwargs)
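The sys.argv patching used by simple_parser, isolated: nomadcore's mainFunction reads the command line directly, so embedding it requires a faked argv (argv_driven_main is a stand-in):

    import sys
    from unittest.mock import patch

    def argv_driven_main():
        # Stand-in for CLI-style code that reads sys.argv directly.
        return list(sys.argv)

    with patch.object(sys, 'argv', ['<exe>', '--uri', 'nmd://uri', '/path/to/mainfile']):
        assert argv_driven_main()[1:3] == ['--uri', 'nmd://uri']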
class VaspOutcarParser(LegacyParser):
@@ -15,7 +15,6 @@
from typing import List
from abc import ABCMeta, abstractmethod
import re
import importlib
from nomad.metainfo import Environment
from nomad.datamodel import EntryArchive
@@ -25,11 +24,6 @@ class Parser(metaclass=ABCMeta):
'''
Instances specify a parser. It allows one to find *main files* among the given
uploaded and extracted files, and to run the parser on those 'main files'.
TODO: There are currently two "run" functions. :func:`run` and :func:`parse`.
Because we are in the middle of transitioning out of the backend dependence we currently
have both, where 'run' creates a backend and 'parse' simply gets an archive that the
parser is supposed to populate. Eventually, we will only have the 'parse' function.
'''
name = "parsers/parser"
@@ -57,19 +51,6 @@ class Parser(metaclass=ABCMeta):
pass
@abstractmethod
def run(self, mainfile: str, logger=None):
'''
Runs the parser on the given mainfile. It uses :class:`Backend` as
a backend. The meta-info access is handled by the underlying NOMAD-coe parser.
Args:
mainfile: A path to a mainfile that this parser can parse.
logger: An optional logger
Returns:
The used :class:`Backend` with status information and result data.
'''
def parse(self, mainfile: str, archive: EntryArchive, logger=None) -> None:
'''
Runs the parser on the given mainfile and populates the result in the given
@@ -110,7 +91,7 @@ class BrokenParser(Parser):
return False
def run(self, mainfile: str, logger=None):
def parse(self, mainfile: str, archive, logger=None):
raise Exception('Failed on purpose.')
@@ -175,14 +156,6 @@ class MatchingParser(Parser):
class FairdiParser(MatchingParser):
def run(self, mainfile: str, logger=None):
from .legacy import Backend
python_module = importlib.import_module(self.__module__ + '.metainfo')
metainfo = getattr(python_module, 'm_env')
backend = Backend(metainfo, domain=self.domain, logger=logger)
self.parse(mainfile, backend.entry_archive, logger=logger)
return backend
def parse(self, mainfile: str, archive: EntryArchive, logger=None):
raise NotImplementedError()
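A minimal new-style parser under this regime only implements parse; a sketch with a hypothetical name and the matching configuration left out:

    from nomad.datamodel import EntryArchive
    from nomad.datamodel.metainfo.common import section_run as Run

    class MinimalParser(FairdiParser):
        name = 'parsers/minimal'  # hypothetical parser name

        def parse(self, mainfile: str, archive: EntryArchive, logger=None):
            # Populate the caller's archive in place; nothing is returned.
            run = archive.m_create(Run)
            run.program_name = 'minimal'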
@@ -203,5 +176,5 @@ class MissingParser(MatchingParser):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def run(self, mainfile: str, logger=None):
def parse(self, mainfile: str, archive: EntryArchive, logger=None):
raise Exception('The code %s is not yet supported.' % self.code_name)
@@ -36,17 +36,15 @@ import hashlib
from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper
from nomad import utils, config, infrastructure, search, datamodel
from nomad.files import PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles, PublicUploadFiles, StagingUploadFiles
from nomad.files import (
PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles,
PublicUploadFiles, StagingUploadFiles)
from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing import Backend
from nomad.parsing.parsers import parser_dict, match_parser
from nomad.normalizing import normalizers
from nomad.datamodel import EntryArchive
from nomad.archive import query_archive
from nomad.datamodel.encyclopedia import (
EncyclopediaMetadata,
)
import phonopyparser.metainfo
from nomad.datamodel.encyclopedia import EncyclopediaMetadata
section_metadata = datamodel.EntryArchive.section_metadata.name
@@ -119,7 +117,7 @@ class Calc(Proc):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._parser_backend: Backend = None
self._parser_results: EntryArchive = None
self._upload: Upload = None
self._upload_files: ArchiveBasedStagingUploadFiles = None
self._calc_proc_logs: List[Any] = None
@@ -306,8 +304,8 @@ class Calc(Proc):
finally:
# close loghandler that was not closed due to failures
try:
if self._parser_backend and self._parser_backend.resource:
self._parser_backend.resource.unload()
if self._parser_results and self._parser_results.m_resource:
self._parser_results.m_resource.unload()
except Exception as e:
logger.error('could not unload processing results', exc_info=e)
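The unload guard above as a standalone sketch of the new lifecycle, where the results archive, not a backend, owns the metainfo resource:

    from nomad.datamodel import EntryArchive

    results = EntryArchive()
    # ... parsing and normalization populate `results` ...
    if results and results.m_resource:
        results.m_resource.unload()  # free memory held by the metainfo resource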
@@ -343,8 +341,8 @@ class Calc(Proc):
finally:
# close loghandler that was not closed due to failures
try:
if self._parser_backend and self._parser_backend.resource:
self._parser_backend.resource.unload()
if self._parser_results and self._parser_results.m_resource:
self._parser_results.m_resource.unload()
except Exception as e:
logger.error('could not unload processing results', exc_info=e)
@@ -358,11 +356,9 @@ class Calc(Proc):
self._entry_metadata.processed = False
self.apply_entry_metadata(self._entry_metadata)
if self._parser_backend and self._parser_backend.resource:
backend = self._parser_backend
else:
backend = None
self._entry_metadata.apply_domain_metadata(backend)
self._entry_metadata.apply_domain_metadata(self._parser_results)
if self._parser_results and self._parser_results.m_resource:
self._parser_results.m_resource.unload()
self._entry_metadata.a_elastic.index()
except Exception as e:
@@ -393,8 +389,10 @@ class Calc(Proc):
with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
try:
self._parser_backend = parser.run(
self.upload_files.raw_file_object(self.mainfile).os_path, logger=logger)
self._parser_results = datamodel.EntryArchive()
parser.parse(
self.upload_files.raw_file_object(self.mainfile).os_path,
self._parser_results, logger=logger)
except Exception as e:
self.fail('parser failed with exception', exc_info=e, error=str(e), **context)
@@ -403,10 +401,6 @@ class Calc(Proc):
self.fail('parser raised system exit', error='system exit', **context)
return
if self._parser_backend.status[0] != 'ParseSuccess':
error = self._parser_backend.status[1]
self.fail('parser failed', error=error, **context)
def process_phonon(self):
"""Function that is run for phonon calculation before cleanup.
This task is run by the celery process that is calling the join for the
@@ -427,15 +421,13 @@ class Calc(Proc):
self._entry_metadata = phonon_archive.section_metadata
self._calc_proc_logs = phonon_archive.processing_logs
# Re-create a backend
metainfo = phonopyparser.metainfo.m_env
self._parser_backend = Backend(metainfo, logger=logger, domain="dft")
self._parser_backend.entry_archive = phonon_archive
# Re-create the parse results
self._parser_results = phonon_archive
# Read in the first referenced calculation. The reference is given as
# an absolute path which needs to be converted into a path that is
# relative to upload root.
scc = self._parser_backend.entry_archive.section_run[0].section_single_configuration_calculation[0]
scc = self._parser_results.section_run[0].section_single_configuration_calculation[0]
relative_ref = scc.section_calculation_to_calculation_refs[0].calculation_to_calculation_external_url
ref_id = upload_files.calc_id(relative_ref)
with upload_files.read_archive(ref_id) as archive:
@@ -446,7 +438,7 @@ class Calc(Proc):
ref_enc_method = ref_archive.section_metadata.encyclopedia.method
if ref_enc_method is None or len(ref_enc_method) == 0 or ref_enc_method.functional_type is None:
raise ValueError("No method information available in referenced calculation.")
self._parser_backend.entry_archive.section_metadata.encyclopedia.method = ref_enc_method
self._parser_results.section_metadata.encyclopedia.method = ref_enc_method
# Overwrite old entry with new data. The metadata is updated with
# new timestamp and method details taken from the referenced
@@ -483,39 +475,15 @@ class Calc(Proc):
logger, 'archived', step='archive',
input_size=self.mainfile_file.size) as log_data: