Commit 215a1934 authored by Markus Scheidgen

Merge branch 'parser-move' into 'v0.10.0'

Parser move

See merge request !280
parents d8984d35 373fd704
Pipeline #95607 passed with stages in 22 minutes and 48 seconds
......@@ -54,6 +54,6 @@ normalizers: Iterable[Type[Normalizer]] = [
# FhiAimsBaseNormalizer,
DosNormalizer,
BandStructureNormalizer,
EncyclopediaNormalizer,
WorkflowNormalizer,
EncyclopediaNormalizer,
]
......@@ -47,6 +47,23 @@ class EncyclopediaNormalizer(Normalizer):
calc_enums = Calculation.calculation_type.type
calc_type = calc_enums.unavailable
# Primarily try to determine the calculation type from workflow
# information
try:
workflow = self.entry_archive.section_workflow
workflow_map = {
"molecular_dynamics": calc_enums.molecular_dynamics,
"geometry_optimization": calc_enums.geometry_optimization,
"phonon": calc_enums.phonon_calculation,
}
workflow_enum = workflow_map.get(workflow.workflow_type)
if workflow_enum is not None:
calc.calculation_type = workflow_enum
return workflow_enum
except Exception:
pass
# Fall back to old frame sequence data
try:
sccs = self.section_run.section_single_configuration_calculation
except Exception:
......
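The calc-type hunk above swaps ad-hoc branching for a dictionary lookup over the workflow type, falling back to the old frame-sequence data only when no workflow section is present. A minimal, self-contained sketch of the same lookup pattern (the enum values here are illustrative stand-ins, not NOMAD's metainfo):

from enum import Enum

class CalcType(Enum):
    unavailable = 'unavailable'
    molecular_dynamics = 'molecular dynamics'
    geometry_optimization = 'geometry optimization'
    phonon_calculation = 'phonon calculation'

# Unknown workflow types fall through to `unavailable`.
workflow_map = {
    'molecular_dynamics': CalcType.molecular_dynamics,
    'geometry_optimization': CalcType.geometry_optimization,
    'phonon': CalcType.phonon_calculation,
}

def calc_type_from_workflow(workflow_type):
    return workflow_map.get(workflow_type, CalcType.unavailable)

assert calc_type_from_workflow('phonon') is CalcType.phonon_calculation
assert calc_type_from_workflow('single_point') is CalcType.unavailable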
......@@ -389,7 +389,11 @@ class MaterialBulkNormalizer(MaterialNormalizer):
std_atoms = symmetry_analyzer.get_conventional_system()
prim_atoms = symmetry_analyzer.get_primitive_system()
repr_atoms = sec_system.m_cache["representative_atoms"] # Temporary value stored by SystemNormalizer
wyckoff_sets = symmetry_analyzer.get_wyckoff_sets_conventional(return_parameters=True)
try:
wyckoff_sets = symmetry_analyzer.get_wyckoff_sets_conventional(return_parameters=True)
except Exception:
self.logger.error('Error resolving Wyckoff sets.')
wyckoff_sets = []
names, counts = atomutils.get_hill_decomposition(prim_atoms.get_chemical_symbols(), reduced=False)
greatest_common_divisor = reduce(gcd, counts)
context.greatest_common_divisor = greatest_common_divisor
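The greatest common divisor of the element counts is what later reduces a cell formula such as Si4O8 to SiO2. A short worked example of that reduction with the stdlib only (independent of atomutils):

from functools import reduce
from math import gcd

names, counts = ['Si', 'O'], [4, 8]
divisor = reduce(gcd, counts)                      # gcd(4, 8) == 4
reduced = {n: c // divisor for n, c in zip(names, counts)}
print(divisor, reduced)                            # 4 {'Si': 1, 'O': 2}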
......@@ -509,7 +513,12 @@ class Material2DNormalizer(MaterialNormalizer):
sec_enc = self.entry_archive.section_metadata.encyclopedia
material = sec_enc.material
repr_atoms = context.representative_system.m_cache["representative_atoms"] # Temporary value stored by SystemNormalizer
symmetry_analyzer = self.get_symmetry_analyzer(repr_atoms)
try:
symmetry_analyzer = self.get_symmetry_analyzer(repr_atoms)
except Exception:
self.logger.error('Error setting up symmetry analyzer.')
return
spg_number = symmetry_analyzer.get_space_group_number()
wyckoff_sets = symmetry_analyzer.get_wyckoff_sets_conventional(return_parameters=False)
std_atoms = symmetry_analyzer.get_conventional_system()
......@@ -560,6 +569,8 @@ class Material1DNormalizer(MaterialNormalizer):
def lattice_parameters(self, ideal: IdealizedStructure, std_atoms: Atoms, periodicity: NDArray) -> None:
# 1D systems only have one lattice parameter: length in periodic dimension
periodic_indices = np.where(np.array(periodicity) == True)[0] # noqa: E712
if len(periodic_indices) == 0:
return
cell = std_atoms.get_cell()
a = np.linalg.norm(cell[periodic_indices[0], :]) * 1e-10
params = ideal.m_create(LatticeParameters)
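A 1D system has a single lattice parameter: the length of the cell vector along the periodic direction, converted from Å to m by the 1e-10 factor. The new length-zero guard simply skips systems where no periodic direction was detected. A sketch of the computation with illustrative numbers:

import numpy as np

periodicity = np.array([False, True, False])       # only the second axis is periodic
cell = np.array([
    [20.0, 0.0, 0.0],                              # vacuum direction, in Å
    [0.0, 4.2, 0.0],                               # periodic direction, in Å
    [0.0, 0.0, 20.0],
])
periodic_indices = np.where(periodicity)[0]
if len(periodic_indices) != 0:
    a = np.linalg.norm(cell[periodic_indices[0], :]) * 1e-10
    print(a)                                       # 4.2e-10 m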
......@@ -575,7 +586,7 @@ class Material1DNormalizer(MaterialNormalizer):
# If one axis is not periodic, return. This only happens if the vacuum
# gap is not aligned with a cell vector.
if sum(periodicity) != 1:
raise ValueError("Could not detect the periodic dimensions in a 1D system.")
self.logger.error("Could not detect the periodic dimensions in a 1D system.")
ideal.periodicity = periodicity
......
......@@ -93,22 +93,34 @@ class SystemBasedNormalizer(Normalizer, metaclass=ABCMeta):
system = None
scc = None
# Try to find workflow information and select the representative system
# based on it
workflow = self.entry_archive.section_workflow
if workflow:
try:
iscc = workflow.calculation_result_ref
system = iscc.single_configuration_calculation_to_system_ref
if system is not None:
scc = iscc
except Exception:
pass
# Try to find a frame sequence, only first found is considered
try:
frame_seqs = self.section_run.section_frame_sequence
frame_seq = frame_seqs[0]
sec_sampling_method = frame_seq.frame_sequence_to_sampling_ref
sampling_method = sec_sampling_method.sampling_method
frames = frame_seq.frame_sequence_local_frames_ref
if sampling_method == "molecular_dynamics":
iscc = frames[0]
else:
iscc = frames[-1]
system = iscc.single_configuration_calculation_to_system_ref
if system is not None:
scc = iscc
except Exception:
pass
else:
try:
frame_seqs = self.section_run.section_frame_sequence
frame_seq = frame_seqs[0]
sec_sampling_method = frame_seq.frame_sequence_to_sampling_ref
sampling_method = sec_sampling_method.sampling_method
frames = frame_seq.frame_sequence_local_frames_ref
if sampling_method == "molecular_dynamics":
iscc = frames[0]
else:
iscc = frames[-1]
system = iscc.single_configuration_calculation_to_system_ref
if system is not None:
scc = iscc
except Exception:
pass
# If no frame sequences detected, try to find valid scc by looping all
# available in reverse order until a valid one is found.
......
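The selection above prefers the workflow's result calculation; only if no workflow section exists does it fall back to the first frame sequence, taking the first frame for molecular dynamics and the last frame otherwise, and finally to scanning all calculations in reverse. A schematic sketch of that fallback chain with plain Python stand-ins for the archive sections:

def pick_representative(workflow_result, frames, sampling_method):
    # 1. Workflow information wins if it points at a calculation.
    if workflow_result is not None:
        return workflow_result
    # 2. Frame sequence: first frame for MD, last frame otherwise.
    if frames:
        return frames[0] if sampling_method == 'molecular_dynamics' else frames[-1]
    # 3. No information: the caller scans all calculations in reverse order.
    return None

assert pick_representative(None, ['scc0', 'scc1', 'scc2'], 'geometry_optimization') == 'scc2'
assert pick_representative(None, ['scc0', 'scc1'], 'molecular_dynamics') == 'scc0'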
......@@ -134,6 +134,8 @@ class PhononNormalizer(Normalizer):
def _get_n_imaginary_frequencies(self):
scc = self.section_run.section_single_configuration_calculation
if not scc:
return
sec_band = scc[0].section_k_band
result = 0
for band_segment in sec_band[0].section_k_band_segment:
......@@ -303,7 +305,11 @@ class WorkflowNormalizer(Normalizer):
self._phonon_programs = ['phonopy']
def _resolve_workflow_type_vasp(self):
ibrion = self.section_run.section_method[0].x_vasp_incarOut_IBRION
try:
ibrion = self.section_run.section_method[0].x_vasp_incarOut_IBRION
except Exception:
ibrion = 1
if ibrion == 0:
workflow_type = "molecular_dynamics"
else:
......
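In VASP, IBRION = 0 selects molecular dynamics, so the resolver maps that value to an MD workflow; the new try/except supplies a default when the INCAR value is missing from the parsed output. A minimal sketch of the decision (the default of 1 mirrors the hunk above, and the non-MD label is an assumption about the elided else branch):

def resolve_vasp_workflow_type(incar_out):
    ibrion = incar_out.get('IBRION', 1)    # fall back when IBRION was not parsed
    return 'molecular_dynamics' if ibrion == 0 else 'geometry_optimization'

assert resolve_vasp_workflow_type({'IBRION': 0}) == 'molecular_dynamics'
assert resolve_vasp_workflow_type({}) == 'geometry_optimization'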
......@@ -17,6 +17,9 @@ import os
import logging
import pint
from typing import Any, Dict
import gzip
import bz2
import lzma
class FileParser:
......@@ -67,12 +70,27 @@ class FileParser:
self._file_handler = None
self._mainfile = os.path.abspath(val) if val is not None else val
@property
def open(self):
if self.mainfile.endswith('.gz'):
open_file = gzip.open
elif self.mainfile.endswith('.bz2'):
open_file = bz2.open
elif self.mainfile.endswith('.xz'):
open_file = lzma.open
else:
open_file = open
return open_file
def get(self, key: str, default: Any = None, unit: str = None, **kwargs):
'''
Returns the parsed result for the quantity named key. If the quantity is not in
the results, default is returned. A pint unit can be provided, which is attached
to the returned value.
'''
if self.mainfile is None:
return default
self._key = key
self._kwargs = kwargs
val = self.results.get(key, None)
......
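The new open property picks a file opener from the extension so the same parser reads gzip-, bzip2- and xz-compressed mainfiles transparently. The dispatch is plain stdlib and can be tried in isolation:

import bz2
import gzip
import lzma

def opener_for(path):
    '''Return the open function matching the compression suffix.'''
    if path.endswith('.gz'):
        return gzip.open
    if path.endswith('.bz2'):
        return bz2.open
    if path.endswith('.xz'):
        return lzma.open
    return open

# The compressed openers default to binary mode, so request text mode
# explicitly when the parser expects strings:
#   with opener_for('OUTCAR.gz')('OUTCAR.gz', 'rt') as f:
#       first_line = f.readline()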
......@@ -15,6 +15,7 @@
import logging
import mmap
import io
import re
import numpy as np
import pint
......@@ -103,7 +104,7 @@ class Quantity:
self.shape = None
elif isinstance(quantity, mQuantity):
self.name = quantity.name
self.dtype = quantity.type
self.dtype = quantity.type.type if isinstance(quantity.type, np.dtype) else quantity.type
self.unit = quantity.unit
# check if metainfo shape has dependencies
self.shape = quantity.shape
......@@ -177,7 +178,6 @@ class Quantity:
except Exception:
pass
self.shape = [] if self.shape is None else self.shape
return val
elif type(val) in [np.ndarray, list]:
......@@ -187,7 +187,6 @@ class Quantity:
if self.dtype is None:
if np.all(np.mod(val_test, 1) == 0):
val_test = np.array(val_test, dtype=int)
self.shape = list(np.shape(val)) if self.shape is None else self.shape
val = val_test
except Exception:
......@@ -197,13 +196,10 @@ class Quantity:
elif isinstance(val, dict):
for k, v in val.items():
self.dtype = None
val[k] = _convert(v)
return val
else:
self.dtype = type(val)
self.shape = [] if self.shape is None else self.shape
return val
if self.convert:
......@@ -342,12 +338,15 @@ class TextParser(FileParser):
Memory mapped representation of the file.
'''
if self._file_handler is None:
with open(self.mainfile) as f:
self._file_handler = mmap.mmap(
f.fileno(), self._file_length, access=mmap.ACCESS_COPY,
offset=self._file_offset)
# set the extra chunk loaded before the intended offset to empty
self._file_handler[:self._file_pad] = b' ' * self._file_pad
with self.open(self.mainfile) as f:
if isinstance(f, io.TextIOWrapper):
self._file_handler = mmap.mmap(
f.fileno(), self._file_length, access=mmap.ACCESS_COPY,
offset=self._file_offset)
# set the extra chunk loaded before the intended offset to empty
self._file_handler[:self._file_pad] = b' ' * self._file_pad
else:
self._file_handler = f.read()
self._file_pad = 0
return self._file_handler
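mmap needs the file descriptor of a regular, uncompressed file, so the hunk keeps the memory-mapped path when self.open yields an io.TextIOWrapper and otherwise reads the decompressed stream into memory. A standalone sketch of the same distinction, reusing the hypothetical opener_for helper from the earlier note:

import io
import mmap

def file_buffer(path):
    '''Memory-map plain files; read compressed streams into memory.'''
    with opener_for(path)(path) as f:
        if isinstance(f, io.TextIOWrapper):
            # A plain file: map it copy-on-write, as in the diff.
            return mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_COPY)
        # gzip/bz2/lzma objects expose the compressed file's descriptor,
        # which cannot be mapped as decompressed text, so read it instead.
        return f.read()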
......@@ -416,7 +415,7 @@ class TextParser(FileParser):
self._results[quantities[i].name] = value_processed
except Exception:
self.logger.warn('Error setting value for %s ' % quantities[i].name)
self.logger.warn('Error setting value', data=dict(quantity=quantities[i].name))
pass
def _parse_quantity(self, quantity):
......@@ -430,6 +429,7 @@ class TextParser(FileParser):
span = np.array(res.span()) + self.file_offset
sub_parser = quantity._sub_parser.copy()
sub_parser.mainfile = self.mainfile
sub_parser.logger = self.logger
if (span[1] - span[0]) < mmap.PAGESIZE or True:
# self.logger.warn(
# 'Cannot use sub parser on quantity %s with blocks with size <'
......@@ -452,6 +452,7 @@ class TextParser(FileParser):
span = np.array(res.span()) + self.file_offset
sub_parser = quantity._sub_parser.copy()
sub_parser.mainfile = self.mainfile
sub_parser.logger = self.logger
if (span[1] - span[0]) < mmap.PAGESIZE or True:
# self.logger.warn(
# 'Cannot use sub parser on quantity %s with blocks with size <'
......@@ -488,7 +489,7 @@ class TextParser(FileParser):
self._results[quantity.name] = value_processed
except Exception:
self.logger.warn('Error setting value for %s ' % quantity.name)
self.logger.warn('Error setting value', data=dict(quantity=quantity.name))
pass
def parse(self, key=None):
......
......@@ -17,6 +17,7 @@ import os
import re
import numpy as np
from xml.etree import ElementTree
from lxml import etree
from nomad.parsing.file_parser import FileParser
......@@ -49,7 +50,17 @@ class XMLParser(FileParser):
if self._file_handler is None:
if self.mainfile is None:
return
self._file_handler = ElementTree.parse(self.mainfile).getroot()
try:
self._file_handler = ElementTree.parse(self.open(self.mainfile)).getroot()
except Exception:
self.logger.error('failed to load xml file')
try:
# I cannot use the lxml XMLParser directly because it is not compatible with
# the ElementTree implementation.
xml = etree.parse(self.open(self.mainfile), parser=etree.XMLParser(recover=True))
self._file_handler = ElementTree.fromstring(etree.tostring(xml))
except Exception:
pass
self.init_parameters()
return self._file_handler
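Falling back to lxml with recover=True lets the parser salvage truncated or mildly malformed XML, for example from runs that were killed mid-write, and converting the result back to a standard ElementTree element keeps the rest of the parser unchanged. A self-contained sketch of that recovery path:

import io
from xml.etree import ElementTree
from lxml import etree

broken = b'<modeling><calculation><energy>1.0</energy>'  # truncated: no closing tags

try:
    root = ElementTree.parse(io.BytesIO(broken)).getroot()
except ElementTree.ParseError:
    # recover=True makes lxml close the open tags and drop unparsable trailing junk
    tree = etree.parse(io.BytesIO(broken), parser=etree.XMLParser(recover=True))
    root = ElementTree.fromstring(etree.tostring(tree))

print(root.find('./calculation/energy').text)  # 1.0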
......@@ -69,12 +80,13 @@ class XMLParser(FileParser):
Parse a quantity identified by key or an xpath-style path. Automatic conversion
can be switched off by setting convert to False.
'''
_convert = convert if convert is not None else self.convert
_convert = convert if convert is not None else self._kwargs.get('convert', None)
_convert = _convert if _convert is not None else self.convert
if self._results is None:
self._results = dict()
if not self.root:
return
return self
key_in = key
key = key.lstrip('/')
......@@ -100,7 +112,7 @@ class XMLParser(FileParser):
val.append(element.attrib)
if not val:
return
return self
def convert_value(val_in):
if isinstance(val_in, dict):
......@@ -146,3 +158,4 @@ class XMLParser(FileParser):
val = val[0] if len(val) == 1 else val
self._results[key_in] = val
return self
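Returning self instead of None from parse (also in the early-return branches above) makes the method fluent, so chained .parse(...).get(...) calls no longer fail when a key is missing or the file has no root. The pattern in isolation:

class ChainableParser:
    def __init__(self):
        self._results = {}

    def parse(self, key):
        # Always return self, even when nothing could be parsed,
        # so calls can be chained safely.
        if key == 'known':
            self._results[key] = 42
        return self

    def get(self, key, default=None):
        return self._results.get(key, default)

assert ChainableParser().parse('known').get('known') == 42
assert ChainableParser().parse('missing').get('missing', 0) == 0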
......@@ -21,7 +21,7 @@ import os.path
from nomad import config, datamodel
from .parser import MissingParser, BrokenParser, Parser, ArchiveParser
from .legacy import LegacyParser, VaspOutcarParser
from .legacy import LegacyParser
from .artificial import EmptyParser, GenerateRandomParser, TemplateParser, ChaosParser
from eelsdbconverter import EELSApiJsonConverter
......@@ -129,12 +129,6 @@ parsers = [
ChaosParser(),
PhonopyParser(),
VASPParser(),
VaspOutcarParser(
name='parsers/vasp-outcar', code_name='VASP', code_homepage='https://www.vasp.at/',
parser_class_name='vaspparser.VaspOutcarParser',
mainfile_name_re=r'(.*/)?OUTCAR(\.[^\.]*)?',
mainfile_contents_re=(r'^\svasp\.')
),
ExcitingParser(),
FHIAimsParser(),
LegacyParser(
......
......@@ -337,7 +337,7 @@ class Proc(Document, metaclass=ProcMetaclass):
self.get_logger().info('started process')
else:
self.current_task = task
self.get_logger().info('successfully completed task')
self.get_logger().info('task completed successfully')
self.save()
return True
......@@ -586,7 +586,8 @@ def proc_task(task, cls_name, self_id, func_attr):
try:
self.process_status = PROCESS_RUNNING
os.chdir(config.fs.working_directory)
deleted = func(self)
with utils.timer(logger, 'process executed on worker'):
deleted = func(self)
except SoftTimeLimitExceeded as e:
logger.error('exceeded the celery task soft time limit')
self.fail(e)
......
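Wrapping func(self) in utils.timer(...) logs how long the worker spent executing the process. utils.timer is NOMAD's own helper; a generic stdlib sketch of the same timing-context idea:

import logging
import time
from contextlib import contextmanager

logger = logging.getLogger(__name__)

@contextmanager
def timer(log, message):
    start = time.monotonic()
    try:
        yield
    finally:
        log.info('%s (%.3f s)', message, time.monotonic() - start)

# Usage mirrors the diff: time the body of the worker process.
with timer(logger, 'process executed on worker'):
    time.sleep(0.01)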
......@@ -45,7 +45,7 @@ from functools import lru_cache
import urllib.parse
import requests
from nomad import utils, config, infrastructure, search, datamodel, metainfo
from nomad import utils, config, infrastructure, search, datamodel, metainfo, parsing
from nomad.files import (
PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles,
PublicUploadFiles, StagingUploadFiles)
......@@ -55,7 +55,7 @@ from nomad.normalizing import normalizers
from nomad.datamodel import (
EntryArchive, EditableUserMetadata, OasisMetadata, UserProvidableMetadata)
from nomad.archive import (
query_archive, write_partial_archive_to_mongo, delete_partial_archives_from_mongo)
write_partial_archive_to_mongo, delete_partial_archives_from_mongo)
from nomad.datamodel.encyclopedia import EncyclopediaMetadata
......@@ -268,6 +268,11 @@ class Calc(Proc):
dump_dict.update(level=method_name.upper())
self._calc_proc_logs.append(dump_dict)
if method_name == 'error':
error = event_dict.get('event', None)
if error is not None:
self._entry_metadata.processing_errors.append(error)
except Exception:
# Exceptions here would cause an indefinite loop
pass
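The hook above copies every error-level log event into processing_errors so the entry metadata carries a searchable summary of what went wrong. A rough stdlib equivalent of that idea (NOMAD itself hooks a structlog processor, so the names here are illustrative):

import logging

class ErrorCollector(logging.Handler):
    '''Collect error messages for storage alongside the entry metadata.'''
    def __init__(self):
        super().__init__(level=logging.ERROR)
        self.processing_errors = []

    def emit(self, record):
        self.processing_errors.append(record.getMessage())

log = logging.getLogger('nomad.processing.example')
collector = ErrorCollector()
log.addHandler(collector)
log.error('no method information available in referenced calculation')
assert collector.processing_errors == ['no method information available in referenced calculation']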
......@@ -283,27 +288,34 @@ class Calc(Proc):
instead of creating it initially, we are just updating the existing
records.
'''
parser = match_parser(self.upload_files.raw_file_object(self.mainfile).os_path, strict=False)
logger = self.get_logger()
if parser is None and not config.reprocess_unmatched:
self.errors = ['no parser matches during re-process, will not re-process this calc']
if config.reprocess_rematch:
with utils.timer(logger, 'parser matching executed'):
parser = match_parser(
self.upload_files.raw_file_object(self.mainfile).os_path, strict=False)
try:
upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
with upload_files.read_archive(self.calc_id) as archive:
self.upload_files.write_archive(self.calc_id, archive[self.calc_id].to_dict())
if parser is None and not config.reprocess_unmatched:
self.errors = ['no parser matches during re-process, will not re-process this calc']
except Exception as e:
logger.error('could not copy archive for non matching, non reprocessed entry', exc_info=e)
raise e
# mock the steps of actual processing
self._continue_with('parsing')
self._continue_with('normalizing')
self._continue_with('archiving')
self._complete()
return
try:
upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
with upload_files.read_archive(self.calc_id) as archive:
self.upload_files.write_archive(self.calc_id, archive[self.calc_id].to_dict())
except Exception as e:
logger.error('could not copy archive for non matching, non reprocessed entry', exc_info=e)
raise e
# mock the steps of actual processing
self._continue_with('parsing')
self._continue_with('normalizing')
self._continue_with('archiving')
self._complete()
return
else:
parser = parser_dict.get(self.parser)
if parser is None:
self.get_logger().warn('no parser matches during re-process, use the old parser')
......@@ -326,6 +338,7 @@ class Calc(Proc):
self._entry_metadata.nomad_version = config.meta.version
self._entry_metadata.nomad_commit = config.meta.commit
self._entry_metadata.last_processing = datetime.utcnow()
self._entry_metadata.processing_errors = []
self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)
self.parsing()
......@@ -344,6 +357,7 @@ class Calc(Proc):
self._entry_metadata = self.create_metadata()
self._entry_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
self._entry_metadata.last_processing = datetime.utcnow()
self._entry_metadata.processing_errors = []
self._entry_metadata.files = self.upload_files.calc_files(self.mainfile)
self._entry_metadata.parser_name = self.parser
......@@ -428,6 +442,15 @@ class Calc(Proc):
self._entry_metadata.parser_name = self.parser
with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
if not config.process_reuse_parser:
if isinstance(parser, parsing.FairdiParser):
try:
parser = parser.__class__()
except Exception as e:
self.fail(
'could not re-create parser instance',
exc_info=e, error=str(e), **context)
return
try:
self._parser_results = EntryArchive()
# allow parsers to read/write metadata
......@@ -458,8 +481,8 @@ class Calc(Proc):
# Open the archive of the phonon calculation.
upload_files = StagingUploadFiles(self.upload_id, is_authorized=lambda: True)
with upload_files.read_archive(self.calc_id) as archive:
arch = query_archive(archive, {self.calc_id: self.calc_id})[self.calc_id]
phonon_archive = EntryArchive.m_from_dict(arch)
arch = archive[self.calc_id]
phonon_archive = EntryArchive.m_from_dict(arch.to_dict())
self._entry_metadata = phonon_archive.section_metadata
self._calc_proc_logs = phonon_archive.processing_logs
......@@ -470,16 +493,23 @@ class Calc(Proc):
# an absolute path which needs to be converted into a path that is
# relative to upload root.
scc = self._parser_results.section_run[0].section_single_configuration_calculation[0]
calculation_refs = scc.section_calculation_to_calculation_refs
if calculation_refs is None:
logger.error("No calculation_to_calculation references found")
return
relative_ref = scc.section_calculation_to_calculation_refs[0].calculation_to_calculation_external_url
ref_id = upload_files.calc_id(relative_ref)
with upload_files.read_archive(ref_id) as archive:
arch = query_archive(archive, {ref_id: ref_id})[ref_id]
ref_archive = EntryArchive.m_from_dict(arch)
arch = archive[ref_id]
ref_archive = EntryArchive.m_from_dict(arch.to_dict())
# Get encyclopedia method information directly from the referenced calculation.
ref_enc_method = ref_archive.section_metadata.encyclopedia.method
if ref_enc_method is None or len(ref_enc_method) == 0 or ref_enc_method.functional_type is None:
raise ValueError("No method information available in referenced calculation.")
logger.error("No method information available in referenced calculation.")
return
self._parser_results.section_metadata.encyclopedia.method = ref_enc_method
# Overwrite old entry with new data. The metadata is updated with
......@@ -505,16 +535,16 @@ class Calc(Proc):
finally:
# persist the calc metadata
with utils.timer(logger, 'saved calc metadata', step='metadata'):
with utils.timer(logger, 'calc metadata saved'):
self.apply_entry_metadata(self._entry_metadata)
# index in search
with utils.timer(logger, 'indexed', step='index'):
with utils.timer(logger, 'calc metadata indexed'):
self._entry_metadata.a_elastic.index()
# persist the archive
with utils.timer(
logger, 'archived', step='archive',
logger, 'calc archived',
input_size=self.mainfile_file.size) as log_data:
archive_size = self.write_archive(self._parser_results)
......@@ -540,7 +570,7 @@ class Calc(Proc):
with utils.timer(logger, 'normalizer executed', input_size=self.mainfile_file.size):
try:
normalizer(self._parser_results).normalize(logger=logger)
logger.info('processor completed successfully', **context)
logger.info('normalizer completed successfully', **context)
except Exception as e:
self.fail('normalizer failed with exception', exc_info=e, error=str(e), **context)
......@@ -617,16 +647,16 @@ class Calc(Proc):
self._read_metadata_from_file(logger)
# persist the calc metadata
with utils.timer(logger, 'saved calc metadata', step='metadata'):
with utils.timer(logger, 'calc metadata saved'):
self.apply_entry_metadata(self._entry_metadata)
# index in search
with utils.timer(logger, 'indexed', step='index'):
with utils.timer(logger, 'calc metadata indexed'):
self._entry_metadata.a_elastic.index()
# persist the archive
with utils.timer(
logger, 'archived', step='archive',
logger, 'calc archived',
input_size=self.mainfile_file.size) as log_data:
archive_size = self.write_archive(self._parser_results)
......@@ -829,24 +859,17 @@ class Upload(Proc):
Deletes the upload, including its processing state and
staging files. Local version without celery processing.
'''
logger = self.get_logger()
logger = self.get_logger(upload_size=self.upload_files.size)
with utils.lnr(logger, 'upload delete failed'):
with utils.timer(
logger, 'upload deleted from index', step='index',
upload_size=self.upload_files.size):
with utils.timer(logger, 'upload deleted from index'):
search.delete_upload(self.upload_id)
with utils.timer(
logger, 'upload partial archives', step='files',