Commit 7fc43923 authored by Markus Scheidgen's avatar Markus Scheidgen

Added archive processing information to archive data.

parent b099d11e
Pipeline #38764 failed with stages in 6 minutes and 6 seconds
......@@ -44,7 +44,7 @@
"cwd": "${workspaceFolder}",
"program": "${workspaceFolder}/.pyenv/bin/pytest",
"args": [
"-sv", "tests/test_utils::test_timer"
"-sv", "tests/test_parsing.py::TestLocalBackend::test_two_sections"
]
},
{
......
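The launch configuration above points the debugger at the new backend test; outside VS Code, the same selection can be run with pytest's node-id syntax:

pytest -sv tests/test_parsing.py::TestLocalBackend::test_two_sections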
......@@ -95,6 +95,10 @@ class TemplateParser(ArtificalParser):
def run(self, mainfile: str, logger=None) -> LocalBackend:
self.init_backend()
if 'warning' in mainfile:
self.backend.pwarn('A test warning.')
template_json = json.load(open(mainfile, 'r'))
section = template_json['section_run'][0]
self.add_section(section)
......
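A minimal sketch (not part of the commit) of how the new hook fires, assuming an already constructed TemplateParser and a hypothetical template file whose path contains the substring 'warning':

backend = parser.run('tests/data/templates/warning_template.json')
# run() sees 'warning' in the mainfile path and emits backend.pwarn('A test warning.')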
......@@ -327,8 +327,7 @@ class LocalBackend(LegacyParserBackend):
delegate = LegacyLocalBackend(*args, **kwargs)
super().__init__(delegate)
self._status = 'none'
self._errors = None
self.reset_status()
self._open_context: Tuple[str, int] = None
self._context_section = None
......@@ -351,6 +350,10 @@ class LocalBackend(LegacyParserBackend):
def pwarn(self, msg):
self.logger.warn(msg)
if len(self._warnings) < 10:
self._warnings.append(msg)
elif len(self._warnings) == 10:
self._warnings.append('There are more warnings, check the processing logs.')
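pwarn stores at most ten warnings and then appends a single overflow marker; a sketch of the resulting behavior, assuming a LocalBackend instance named backend:

for i in range(15):
    backend.pwarn('warning %d' % i)
assert len(backend._warnings) == 11  # ten warnings plus the overflow marker
assert backend._warnings[-1] == 'There are more warnings, check the processing logs.'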
def _parse_context_uri(self, context_uri: str) -> Tuple[str, int]:
"""
......@@ -491,6 +494,11 @@ class LocalBackend(LegacyParserBackend):
""" Returns status and potential errors. """
return (self._status, self._errors)
def reset_status(self) -> None:
self._status = 'ParseSuccess'
self._errors = None
self._warnings: List[str] = []
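reset_status re-arms the backend before each processing step (it is also what __init__ now delegates to); a sketch of the status lifecycle, assuming a backend whose previous step had failed:

backend.reset_status()
assert backend.status == ('ParseSuccess', None)  # the status property returns (status, errors)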
def write_json(self, out: TextIO, pretty=True, filter: Callable[[str, Any], Any] = None):
"""
Writes the results stored in the backend after parsing in an 'archive'.json
......@@ -504,19 +512,13 @@ class LocalBackend(LegacyParserBackend):
json_writer = JSONStreamWriter(out, pretty=pretty)
json_writer.open_object()
json_writer.key_value('parser_status', self._status)
if self._errors is not None and len(self._errors) > 0:
json_writer.key('parser_errors')
json_writer.open_array
for error in self._errors:
json_writer.value(error)
json_writer.close_array
json_writer.key('section_run')
json_writer.open_array()
for run in self._delegate.results['section_run']:
LocalBackend._write(json_writer, run, filter=filter)
json_writer.close_array()
# TODO the root sections should be determined programmatically
for root_section in ['section_run', 'section_calculation_info']:
json_writer.key(root_section)
json_writer.open_array()
for run in self._delegate.results[root_section]:
LocalBackend._write(json_writer, run, filter=filter)
json_writer.close_array()
json_writer.close_object()
json_writer.close()
......
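With this change write_json emits every root section instead of only section_run; a sketch (abridged, field values assumed) of the archive layout it now produces, given the calculation-info values written by the processing code further down:

archive = {
    'section_run': [...],  # the parser's output
    'section_calculation_info': [{
        'upload_id': '...',
        'archive_id': '...',
        'main_file': '...',
        'parser_name': '...',
        'parse_status': 'ParseSuccess',
        'section_archive_processing_info': [{
            'archive_processor_name': '...',
            'archive_processor_status': 'Success',
        }],
    }],
}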
......@@ -32,6 +32,7 @@ import logging
import base64
import time
from structlog import wrap_logger
from contextlib import contextmanager
from nomad import config, utils
from nomad.files import UploadFile, ArchiveFile, ArchiveLogFile, File
......@@ -197,10 +198,49 @@ class Calc(Proc):
with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
self._parser_backend = parser.run(self.mainfile_tmp_path, logger=logger)
self._parser_backend.openNonOverlappingSection('section_calculation_info')
self._parser_backend.addValue('upload_id', self.upload_id)
self._parser_backend.addValue('archive_id', self.archive_id)
self._parser_backend.addValue('main_file', self.mainfile)
self._parser_backend.addValue('parser_name', self.parser)
if self._parser_backend.status[0] != 'ParseSuccess':
logger.error(self._parser_backend.status[1])
error = self._parser_backend.status[1]
self._parser_backend.addValue('parse_status', 'ParseFailure')
self.fail(error, level=logging.DEBUG, **context)
else:
self._parser_backend.addValue('parse_status', 'ParseSuccess')
self._parser_backend.closeNonOverlappingSection('section_calculation_info')
self.add_processor_info(self.parser)
@contextmanager
def use_parser_backend(self, processor_name):
self._parser_backend.reset_status()
yield self._parser_backend
self.add_processor_info(processor_name)
def add_processor_info(self, processor_name: str) -> None:
self._parser_backend.openContext('/section_calculation_info/0')
self._parser_backend.openNonOverlappingSection('section_archive_processing_info')
self._parser_backend.addValue('archive_processor_name', processor_name)
if self._parser_backend.status[0] == 'ParseSuccess':
warnings = getattr(self._parser_backend, '_warnings', [])
if len(warnings) > 0:
self._parser_backend.addValue('archive_processor_status', 'WithWarnings')
self._parser_backend.addValue('archive_processor_warning_number', len(warnings))
self._parser_backend.addArrayValues('archive_processor_warnings', [str(warning) for warning in warnings])
else:
self._parser_backend.addValue('archive_processor_status', 'Success')
else:
errors = self._parser_backend.status[1]
self._parser_backend.addValue('archive_processor_error', str(errors))
self._parser_backend.closeNonOverlappingSection('section_archive_processing_info')
self._parser_backend.closeContext('/section_calculation_info/0')
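use_parser_backend brackets each processing step: it resets the backend's status on entry and records the step's outcome in section_archive_processing_info on exit. A minimal sketch of the pattern, assuming a Calc instance calc and a hypothetical processor name:

with calc.use_parser_backend('some_normalizer') as backend:
    backend.pwarn('something looked off')  # counted against 'some_normalizer'
# on exit, add_processor_info('some_normalizer') stores the status and
# any warnings under /section_calculation_info/0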
@task
def normalizing(self):
......@@ -211,15 +251,18 @@ class Calc(Proc):
with utils.timer(
logger, 'normalizer executed', input_size=self.mainfile_file.size):
normalizer(self._parser_backend).normalize(logger=logger)
with self.use_parser_backend(normalizer_name) as backend:
normalizer(backend).normalize(logger=logger)
if self._parser_backend.status[0] != 'ParseSuccess':
failed = self._parser_backend.status[0] != 'ParseSuccess'
if failed:
logger.error(self._parser_backend.status[1])
error = self._parser_backend.status[1]
self.fail(error, level=logging.WARNING, **context)
return
logger.debug(
'completed normalizer successfully', normalizer=normalizer_name)
break
else:
logger.debug(
'completed normalizer successfully', normalizer=normalizer_name)
@task
def archiving(self):
......@@ -479,7 +522,7 @@ class Upload(Chord):
@task
def parse_all(self):
"""
Identified mainfail/parser combinations among the upload's files, creates
Identifies mainfile/parser combinations among the upload's files, creates
respective :class:`Calc` instances, and triggers their processing.
"""
logger = self.get_logger()
......
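A hypothetical sketch of the matching loop the docstring describes (is_mainfile and the Calc fields are assumed here, not taken from the diff):

for filename in upload_filenames:
    for parser in parsers:
        if parser.is_mainfile(filename):
            Calc.create(mainfile=filename, parser=parser.name,
                        upload_id=self.upload_id).process()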
......@@ -23,6 +23,7 @@ import pytest
from datetime import datetime
import shutil
import os.path
import json
from nomad import user, utils
from nomad.files import UploadFile, ArchiveFile, ArchiveLogFile
......@@ -53,6 +54,16 @@ def uploaded_id(request, clear_files) -> Generator[str, None, None]:
yield example_upload_id
@pytest.fixture
def uploaded_id_with_warning(request, clear_files) -> Generator[str, None, None]:
example_file = 'tests/data/proc/examples_with_warning_template.zip'
example_upload_id = os.path.basename(example_file).replace('.zip', '')
upload_file = UploadFile(example_upload_id).os_path
shutil.copyfile(example_file, upload_file)
yield example_upload_id
def run_processing(uploaded_id: str) -> Upload:
upload = Upload.create(upload_id=uploaded_id, user=user.me)
upload.upload_time = datetime.now()
......@@ -77,7 +88,14 @@ def assert_processing(upload: Upload, mocksearch=None):
assert calc.parser is not None
assert calc.mainfile is not None
assert calc.status == 'SUCCESS', calc.archive_id
assert ArchiveFile(calc.archive_id).exists()
archive_file = ArchiveFile(calc.archive_id)
assert archive_file.exists()
with archive_file.read_archive_json() as archive_json:
archive = json.load(archive_json)
assert 'section_run' in archive
assert 'section_calculation_info' in archive
assert ArchiveLogFile(calc.archive_id).exists()
with ArchiveLogFile(calc.archive_id).open('rt') as f:
assert 'a test' in f.read()
......@@ -95,6 +113,12 @@ def test_processing(uploaded_id, worker, mocksearch, no_warn):
assert_processing(upload, mocksearch)
@pytest.mark.timeout(30)
def test_processing_with_warning(uploaded_id_with_warning, worker, mocksearch):
upload = run_processing(uploaded_id_with_warning)
assert_processing(upload, mocksearch)
@pytest.mark.parametrize('uploaded_id', [example_files[1]], indirect=True)
def test_processing_doublets(uploaded_id, worker, with_error):
......
......@@ -72,6 +72,26 @@ class TestLocalBackend(object):
for i in range(0, 3):
assert backend.get_value('program_name', i) == 't%d' % i
def test_two_sections(self, backend, no_warn):
g_index = backend.openSection('section_run')
assert g_index == 0
backend.addValue('program_name', 't0')
backend.closeSection('section_run', 0)
g_index = backend.openSection('section_calculation_info')
assert g_index == 0
backend.addValue('parser_name', 'p0')
backend.closeSection('section_calculation_info', 0)
assert backend.get_sections('section_run') == [0]
assert backend.get_sections('section_calculation_info') == [0]
output = StringIO()
backend.write_json(output)
archive = json.loads(output.getvalue())
assert 'section_run' in archive
assert 'section_calculation_info' in archive
def test_subsection(self, backend: LocalBackend, no_warn):
backend.openSection('section_run')
backend.openSection('section_method')
......