Commit 7dcea1ce authored by Markus Scheidgen

Added the archive file ids to the processing results.

parent b5f20be0
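
Note: the central change in this commit is the shape of a parse result. Before, a parse task returned the backend status (or a bare exception); now every task returns a (status, errors, archive_id) triple so that callers can locate the written archive. A minimal sketch of the new contract, using the type aliases introduced below; the consumer function is hypothetical and not part of the commit:

    from typing import List, Tuple

    ParserStatus = Tuple[str, List[str]]          # (status, errors), as before
    ParseTaskResult = Tuple[str, List[str], str]  # (status, errors, archive_id)
    ParseSpec = Tuple[str, str]                   # (parser_name, mainfile)

    def successful_archives(results: List[ParseTaskResult]) -> List[str]:
        # Hypothetical helper: collect the archive ids of all successful parses.
        return [
            archive_id for status, errors, archive_id in results
            if status == 'ParseSuccess' and archive_id is not None]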
@@ -56,6 +56,8 @@ from nomad.dependencies import dependencies_dict as dependencies, PythonGit
 logger = logging.getLogger(__name__)
 
+ParserStatus = Tuple[str, List[str]]
+
 class DelegatingMeta(ABCMeta):
     def __new__(meta, name, bases, dct):
@@ -227,7 +229,7 @@ class LocalBackend(LegacyParserBackend):
         json_writer.value(value)
 
     @property
-    def status(self) -> Tuple[str, List[str]]:
+    def status(self) -> ParserStatus:
         """ Returns status and potential errors. """
         return (self._status, self._errors)
@@ -42,7 +42,7 @@ import json
 import nomad.config as config
 from nomad.files import Upload, UploadError
 from nomad import files, utils
-from nomad.parsing import parsers, parser_dict
+from nomad.parsing import parsers, parser_dict, ParserStatus
 
 # The legacy nomad code uses a logger called 'nomad'. We do not want this module's
 # logger to become a child of that logger, since this module's name starts with 'nomad.'.
@@ -72,6 +72,9 @@ app.conf.update(
     result_serializer='pickle',
 )
 
+ParseTaskResult = Tuple[str, List[str], str]
+ParseSpec = Tuple[str, str]
+
 class UploadProcessing():
     """
@@ -100,6 +103,7 @@ class UploadProcessing():
     Attributes:
         parse_specs: List of (parser_name, mainfile) tuples.
-        parse_results: Result of the parsers, currently bool indicating success.
+        parse_results: Results of the parsers, a list of (status, errors, archive_id) tuples.
+        upload_hash: The hash of the uploaded file. E.g., used for archive/repo ids.
         status: Aggregated celery status for the whole process.
         task_name: Name of the currently running task.
         task_id: Id of the currently running task.
@@ -109,8 +113,8 @@
     def __init__(self, upload_id: str) -> None:
         self.upload_id = upload_id
 
-        self.parse_specs: List[Tuple[str, str]] = None
-        self.parse_results: List[bool] = None
+        self.parse_specs: List[ParseSpec] = None
+        self.parse_results: List[ParseTaskResult] = None
+        self.upload_hash: str = None
 
         self.status: str = 'PENDING'
@@ -162,6 +166,7 @@
         """ Updates all attributes from another instance. Returns itself. """
         self.parse_specs = other.parse_specs
         self.parse_results = other.parse_results
+        self.upload_hash = other.upload_hash
         self.status = other.status
         self.task_name = other.task_name
@@ -269,17 +274,25 @@ def open_upload(task: Task, processing: UploadProcessing) -> UploadProcessing:
 @app.task(bind=True)
-def close_upload(task, parse_results: List[Any], processing: UploadProcessing) -> UploadProcessing:
+def close_upload(
+        task, parse_results: List[ParseTaskResult], processing: UploadProcessing) -> UploadProcessing:
     if not processing.continue_with(task):
         return processing
 
     processing.parse_results = parse_results
 
     try:
         upload = Upload(processing.upload_id)
     except KeyError as e:
         logger.warning('No upload %s' % processing.upload_id)
         return processing.fail(e)
 
-    upload.close()
+    try:
+        upload.close()
+    except Exception as e:
+        logger.error('Could not close upload %s: %s' % (processing.upload_id, e))
+        return processing.fail(e)
 
     return processing
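
The close_upload change applies the pipeline's fail-fast convention: exceptions inside a task are logged and folded into the processing state via fail() instead of escaping the celery worker. A sketch of that pattern, assuming fail() records the cause and flips the status as elsewhere in this file; the helper itself is hypothetical:

    def guarded(processing, action, description):
        # Hypothetical helper: run an action and convert any exception
        # into a failed UploadProcessing rather than raising.
        try:
            action()
        except Exception as e:
            logger.error('Could not %s upload %s: %s' % (description, processing.upload_id, e))
            return processing.fail(e)
        return processing

With such a helper, the guarded close above would reduce to guarded(processing, upload.close, 'close').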
@@ -298,7 +311,7 @@ def distributed_parse(
 @app.task()
-def parse(processing: UploadProcessing, parse_spec: Tuple[str, str]) -> Any:
+def parse(processing: UploadProcessing, parse_spec: ParseSpec) -> ParseTaskResult:
     assert processing.upload_hash is not None
 
     parser, mainfile = parse_spec
@@ -306,17 +319,21 @@ def parse(processing: UploadProcessing, parse_spec: Tuple[str, str]) -> Any:
     logger.debug('Start %s for %s.' % parse_spec)
 
     try:
         parser_backend = parser_dict[parser].run(mainfile)
-    except ValueError as e:
-        logger.warning('%s stopped on %s: %s' % (parser, mainfile, e))
-        return e
     except Exception as e:
         logger.warning('%s stopped on %s: %s' % (parser, mainfile, e), exc_info=e)
-        return e
+        return ('ParseFailed', [str(e)], None)
 
     archive_id = '%s/%s' % (processing.upload_hash, utils.hash(mainfile))
     logger.debug('Writing results of %s for %s to %s.' % (parser, mainfile, archive_id))
-    with files.write_archive_json(archive_id) as out:
-        parser_backend.write_json(out, pretty=True)
+    try:
+        with files.write_archive_json(archive_id) as out:
+            parser_backend.write_json(out, pretty=True)
+    except Exception as e:
+        logger.error(
+            'Could not write archive %s for parsing %s with %s.' %
+            (archive_id, mainfile, parser), exc_info=e)
+        return ('StorageFailed', [str(e)], archive_id)
 
-    return parser_backend.status
+    status = parser_backend.status
+    return (status[0], status[1], archive_id)
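
An archive id joins the upload hash with a hash of the mainfile path, so each (upload, mainfile) pair maps to exactly one archive. A rough illustration, assuming utils.hash yields a short url-safe digest (its exact implementation is not shown in this diff, and the file names are made up):

    import base64
    import hashlib

    def short_hash(s: str) -> str:
        # Stand-in for nomad.utils.hash: short, url-safe digest of a string.
        digest = hashlib.sha512(s.encode('utf-8')).digest()
        return base64.urlsafe_b64encode(digest)[:28].decode('utf-8')

    upload_hash = short_hash('examples_vasp.zip')  # hypothetical upload
    archive_id = '%s/%s' % (upload_hash, short_hash('examples/Si/vasprun.xml'))
    # archive_id is the key later passed to files.write_archive_json(...)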
@@ -51,13 +51,18 @@ def test_processing(uploaded_id):
     run = UploadProcessing(uploaded_id)
     run.start()
     run.get(timeout=30)
-    assert run.ready()
-    run.forget()
 
+    assert run.ready()
     assert run.task_name == 'nomad.processing.close_upload'
+    assert run.upload_hash is not None
     assert run.cause is None
     assert run.status == 'SUCCESS'
 
+    for status, errors, archive_id in run.parse_results:
+        assert errors is None or len(errors) == 0
+        assert status == 'ParseSuccess'
+        assert archive_id.startswith(run.upload_hash)
+
+    run.forget()
 
 
 def test_process_non_existing():