Commit 57962d7f authored by Markus Scheidgen

Processing now stores results to archive.

parent fa106adf
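In outline, each successfully parsed mainfile now ends up in the archive roughly like this (a condensed sketch of the parse task in the diff below, using only names that appear there; processing, parser and mainfile come from the surrounding task):

    from nomad import files, utils
    from nomad.parsing import parser_dict

    archive_id = '%s/%s' % (processing.upload_hash, utils.hash(mainfile))  # <upload hash>/<mainfile hash>
    parser_backend = parser_dict[parser].run(mainfile)                     # run() now returns the LocalBackend
    with files.write_archive_json(archive_id) as out:                      # gzip-compressed archive json
        parser_backend.write_json(out, pretty=True)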
@@ -38,7 +38,7 @@ Uploads
 .. autoclass:: Upload
 """
-from typing import Callable, List, Any, Generator, IO, TextIO
+from typing import Callable, List, Any, Generator, IO, TextIO, cast
 import sys
 import os
 from os.path import join
@@ -268,10 +268,10 @@ class Upload():
 @contextmanager
-def write_archive_json(archive_id) -> Generator[IO, None, None]:
+def write_archive_json(archive_id) -> Generator[TextIO, None, None]:
     """ Context manager that yields a file-like to write the archive json. """
     binary_out = io.BytesIO()
-    gzip_wrapper = gzip.open(binary_out, 'wt')
+    gzip_wrapper = cast(TextIO, gzip.open(binary_out, 'wt'))
     try:
         yield gzip_wrapper
...
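The cast to TextIO only satisfies the type checker; gzip.open on an in-memory buffer with mode 'wt' already returns a text-mode writer. A small standalone check of that behaviour, standard library only (not project code):

    import gzip
    import io
    import json

    binary_out = io.BytesIO()
    with gzip.open(binary_out, 'wt') as gzip_wrapper:    # text-mode wrapper over the gzip stream
        json.dump({'example': True}, gzip_wrapper)       # any json-serializable payload

    # the buffer now holds the gzip-compressed json
    print(json.loads(gzip.decompress(binary_out.getvalue()).decode('utf-8')))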
@@ -414,18 +414,16 @@ class Parser():
         return False

-    def run(self, mainfile: str, out: TextIO=None) -> Tuple[str, List[str]]:
+    def run(self, mainfile: str) -> LocalBackend:
        """
        Runs the parser on the given mainfile. It uses :class:`LocalBackend` as
        a backend. The meta-info access is handled by the underlying NOMAD-coe parser.

        Args:
            mainfile: A path to a mainfile that this parser can parse.
-            out: Optional file like that is used to write the 'archive'.json results
-                to.

        Returns:
-            The parser status from the backend, see :property:`LocalBackend.status`.
+            The used :class:`LocalBackend` with status information and result data.
        """
        def create_backend(meta_info):
            return LocalBackend(meta_info, debug=False)
@@ -438,10 +436,7 @@ class Parser():
        parser.parse(mainfile)
        backend = parser.parser_context.super_backend

-        if out is not None:
-            backend.write_json(out, pretty=True)
-
-        return backend.status
+        return backend

    def __repr__(self):
        return self.python_git.__repr__()
...
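For callers, run() no longer writes anything itself; status and results are read off the returned backend. A rough usage sketch (the (status, errors) tuple shape is taken from the updated test at the end of this diff, not verified against LocalBackend directly):

    parser_backend = parser_dict['parsers/vasp'].run(mainfile)   # mainfile: path to a parsable file
    status, errors = parser_backend.status                       # unpacked the same way as in the test below
    assert status == 'ParseSuccess' and not errors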
@@ -41,6 +41,7 @@ import json
 import nomad.config as config
 from nomad.files import Upload, UploadError
+from nomad import files, utils
 from nomad.parsing import parsers, parser_dict

 # The legacy nomad code uses a logger called 'nomad'. We do not want that this
@@ -110,6 +111,7 @@ class UploadProcessing():
        self.parse_specs: List[Tuple[str, str]] = None
        self.parse_results: List[bool] = None
+        self.upload_hash: str = None

        self.status: str = 'PENDING'
        self.task_name: str = None
@@ -246,6 +248,12 @@ def open_upload(task: Task, processing: UploadProcessing) -> UploadProcessing:
        logger.debug('Could not open upload %s: %s' % (processing.upload_id, e))
        return processing.fail(e)

+    try:
+        processing.upload_hash = upload.hash()
+    except UploadError as e:
+        logger.error('Could not create an upload hash %s: %s' % (processing.upload_id, e))
+        return processing.fail(e)
+
    try:
        processing.parse_specs = list()
        for filename in upload.filelist:
@@ -284,18 +292,20 @@ def distributed_parse(
        chord([])(close_upload.clone(args=(processing,)))
        return processing

-    parses = group(parse.s(parse_spec) for parse_spec in processing.parse_specs)
+    parses = group(parse.s(processing, parse_spec) for parse_spec in processing.parse_specs)
    chord(parses)(close_upload.clone(args=(processing,)))

    return processing


@app.task()
-def parse(parse_spec: Tuple[str, str]) -> Any:
+def parse(processing: UploadProcessing, parse_spec: Tuple[str, str]) -> Any:
+    assert processing.upload_hash is not None
+
    parser, mainfile = parse_spec
    logger.debug('Start %s for %s.' % parse_spec)

    try:
-        parser_dict[parser].run(mainfile)
+        parser_backend = parser_dict[parser].run(mainfile)
    except ValueError as e:
        logger.warning('%s stopped on %s: %s' % (parser, mainfile, e))
        return e
@@ -303,4 +313,10 @@ def parse(parse_spec: Tuple[str, str]) -> Any:
        logger.warning('%s stopped on %s: %s' % (parser, mainfile, e), exc_info=e)
        return e

-    return True  # TODO some other value?
+    archive_id = '%s/%s' % (processing.upload_hash, utils.hash(mainfile))
+    logger.debug('Written results of %s for %s to %s.' % (parser, mainfile, archive_id))
+    with files.write_archive_json(archive_id) as out:
+        parser_backend.write_json(out, pretty=True)
+
+    return parser_backend.status
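The group/chord pattern in distributed_parse above fans the parse tasks out in parallel and invokes close_upload exactly once after all of them finish, with their return values collected into a list. A minimal illustration of that pattern with hypothetical toy tasks and an assumed broker/backend URL (not the project's tasks; executing it needs a running broker and result backend):

    from celery import Celery, chord, group

    app = Celery('example', broker='redis://localhost', backend='redis://localhost')

    @app.task
    def parse_one(spec):
        return 'parsed %s' % spec

    @app.task
    def close_all(results):
        # the chord body receives the list of results from every task in the header group
        return len(results)

    # the header group runs in parallel; close_all runs once afterwards with all results
    chord(group(parse_one.s(spec) for spec in ['a', 'b', 'c']))(close_all.s())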
+from typing import Union, IO, cast
+import hashlib
+import base64
+
+
+def hash(obj: Union[IO, str]) -> str:
+    """ First 28 characters of a URL-safe base64 encoded sha512 digest. """
+    hash = hashlib.sha512()
+    if getattr(obj, 'read', None) is not None:
+        for data in iter(lambda: cast(IO, obj).read(65536), b''):
+            hash.update(data)
+    elif isinstance(obj, str):
+        hash.update(obj.encode('utf-8'))
+
+    return base64.b64encode(hash.digest(), altchars=b'-_')[0:28].decode('utf-8')
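Note that the parse task above calls utils.hash(mainfile) with the path string, so the archive id is derived from the mainfile path, not the file contents (a str falls into the second branch). A short usage sketch, assuming the helper lands in nomad.utils as the `from nomad import files, utils` import above suggests:

    import io
    from nomad import utils   # assumed module location

    print(utils.hash('some/upload/OUTCAR'))               # a str is hashed directly: 28-character, URL-safe id
    print(utils.hash(io.BytesIO(b'raw file contents')))   # a file-like must yield bytes; it is read in 64 KiB chunks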
@@ -64,7 +64,8 @@ def test_stream_generator(pretty):
 def test_vasp_parser():
    vasp_parser = parser_dict['parsers/vasp']
    example_mainfile = '.dependencies/parsers/vasp/test/examples/xml/perovskite.xml'
-    status, errors = vasp_parser.run(example_mainfile)
+    parser_backend = vasp_parser.run(example_mainfile)
+    status, errors = parser_backend.status
    assert status == 'ParseSuccess'
    assert errors is None or len(errors) == 0