diff --git a/.vscode/launch.json b/.vscode/launch.json index 47f932328c8fe792a191e055c09a824d95935548..72dc9af2a2459cfcafab2161e8511075501bf688 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -51,6 +51,16 @@ "-sv", "tests/test_search.py" ] }, + { + "name": "Python: tests/test_utils.py", + "type": "python", + "request": "launch", + "cwd": "${workspaceFolder}", + "program": "${workspaceFolder}/.pyenv/bin/pytest", + "args": [ + "-sv", "tests/test_utils.py" + ] + }, { "name": "Python: Current File", "type": "python", diff --git a/nomad/api.py b/nomad/api.py index e020e92b38f48b6c9e948da82903bae2b3af246d..d0c8982a098153bc269fcf7105cba05ecf5a2103 100644 --- a/nomad/api.py +++ b/nomad/api.py @@ -1,12 +1,13 @@ from flask import Flask, Response from flask_restful import Resource, Api, abort from datetime import datetime -from threading import Thread import mongoengine.errors from flask_cors import CORS import logging +import json -from nomad import users, files, processing +from nomad import users, files +from nomad.processing import UploadProc from nomad.utils import get_logger app = Flask(__name__) @@ -25,31 +26,21 @@ class Uploads(Resource): @staticmethod def _render(upload: users.Upload): + if upload.proc: + proc = UploadProc(**upload.proc) + proc.update_from_backend() + else: + proc = None + data = { - 'id': upload.upload_id, # deprecated 'upload_id': upload.upload_id, 'presigned_url': upload.presigned_url, 'create_time': upload.create_time.isoformat() if upload.create_time is not None else None, 'upload_time': upload.upload_time.isoformat() if upload.upload_time is not None else None, - 'upload_hash': upload.upload_hash, - 'tasks': processing.upload_task_names + 'proc_time': upload.proc_time.isoformat() if upload.proc_time is not None else None, + 'proc': proc } - # TODO this should partially be done in processing.UploadProcessing.to_dict - if upload.proc_results is not None: - data['processing'] = upload.proc_results - elif upload.proc_task is not None: - proc = processing.UploadProcessing.from_result_backend(upload.upload_id, upload.proc_task) - data['processing'] = proc.to_dict() - data['task'] = proc.task_name - - if upload.upload_time is None: - data['status'] = 'UPLOADING' - elif 'processing' in data: - data['status'] = data['processing']['status'] - if data['status'] == 'PENDING': - data['status'] == 'EXTRACTING' - return {key: value for key, value in data.items() if value is not None} def get(self): diff --git a/nomad/processing.py b/nomad/processing.py deleted file mode 100644 index 2d376853a47a02ae96c4c8513a37a87705924d75..0000000000000000000000000000000000000000 --- a/nomad/processing.py +++ /dev/null @@ -1,530 +0,0 @@ -# Copyright 2018 Markus Scheidgen -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an"AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -This modules allows to (1) run a celery worker that can perform all upload processing -tasks, (2) allows to start, manage, get status of upload processings -(i.e. celery canvas, i.e. workflow of tasks), (3) contains the implementation of said -celery tasks. 
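For context on the reworked Uploads._render above: clients now find the processing state under the 'proc' key (with celery-style status values) instead of the old 'processing'/'status' fields. The following sketch is not part of the patch; it only illustrates how a client could poll the endpoint until processing reaches a terminal state. The base URL and the use of the requests library are assumptions; the field names follow _render and the polling loop in tests/test_api.py.

import time
import requests  # assumed to be available in the client environment

def wait_for_processing(base_url, upload_id, interval=1.0):
    """Poll GET /uploads/<id> until the embedded proc state reaches SUCCESS or FAILURE."""
    while True:
        upload = requests.get('%s/uploads/%s' % (base_url, upload_id)).json()
        proc = upload.get('proc')
        if proc is not None and proc.get('status') in ('SUCCESS', 'FAILURE'):
            return upload
        time.sleep(interval)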
- -We make use of celery. It is a widely popular module for running complex distributed -task workflows on a variety of task and result backends. We are using the popular -rabbitmq, redis combination. Rabbitmq allows for a very scalable distribution of tasks in -clouds and clusters, while redis provides a more centralized, reliable temporary storage -for task stati and results. - -The class :class:`UploadProcessing` allows to start, manage, and read the status of -a upload processing in a serializable form. - -.. autoclass:: nomad.processing.UploadProcessing -""" - -from typing import List, Any, Dict -from celery import Celery, Task, chord, group -from celery.result import AsyncResult, result_from_tuple -from celery.signals import after_setup_task_logger, after_setup_logger -from celery.utils.log import get_task_logger -from celery.canvas import Signature -import logging -import logstash -from datetime import datetime - -import nomad.config as config -from nomad.files import Upload, UploadError -from nomad import files, utils, users -from nomad.parsing import parsers, parser_dict -from nomad.normalizing import normalizers -from nomad import search -import nomad.patch # pylint: disable=unused-import - -# The legacy nomad code uses a logger called 'nomad'. We do not want that this -# logger becomes a child of this logger due to its module name starting with 'nomad.' -logger = get_task_logger(__name__.replace('nomad', 'nomad-xt')) -logger.setLevel(logging.DEBUG) - -if config.logstash.enabled: - def initialize_logstash(logger=None, loglevel=logging.INFO, **kwargs): - handler = logstash.TCPLogstashHandler( - config.logstash.host, config.logstash.tcp_port, - tags=['celery'], message_type='celery', version=1) - handler.setLevel(loglevel) - logger.addHandler(handler) - return logger - - after_setup_task_logger.connect(initialize_logstash) - after_setup_logger.connect(initialize_logstash) - - -app = Celery('nomad.processing', backend=config.celery.backend_url, broker=config.celery.broker_url) -app.add_defaults(dict( - accept_content=['json', 'pickle'], - task_serializer=config.celery.serializer, - result_serializer=config.celery.serializer, -)) - - -class CalcProcessing(dict): - """ - Represents the processing of a singular calculation. It is used as argument and - results for the task that processes an individual calculation from a mainfile. - - Arguments: - upload_hash: The hash that identifies the upload in the archive. - mainfile: The path to the mainfile in the upload. - parser_name: The name of the parser to use/used. - tmp_mainfile: The full path to the mainfile in the local fs. - - Attributes: - pipeline: A list of sub processings for all parser, normalizers, indexing, storage. - calc_hash: The mainfile hash that identifies the calc in the archive. - archive_id: The id that identifies the archive via `upload_hash/calc_hash`. 
- """ - def __init__(self, upload_hash, mainfile, parser_name, tmp_mainfile): - self['upload_hash'] = upload_hash - self['parser_name'] = parser_name - self['mainfile'] = mainfile - self['calc_hash'] = utils.hash(mainfile) - self.tmp_mainfile = tmp_mainfile - - def append(self, task, status, errors=[]): - if errors is None: - errors = [] - stage = dict(task=task, status=status, errors=errors) - self.setdefault('pipeline', []).append(stage) - - @property - def parser_name(self): - return self['parser_name'] - - @property - def mainfile(self): - return self['mainfile'] - - @property - def pipeline(self): - return self.get('pipeline', []) - - @property - def upload_hash(self): - return self.get('upload_hash', None) - - @property - def calc_hash(self): - return self.get('calc_hash', None) - - @property - def archive_id(self): - return '%s/%s' % (self.upload_hash, self.calc_hash) - - -class UploadProcessing(): - """ - Represents the processing of an uploaded file. It allows to start and manage a - processing run, acts itself as an client accesible and serializable state of the - processing. - - It is serializable (JSON, pickle). Iternaly stores - :class:`~celery.results.AsyncResults` instance in serialized *tuple* form to - keep connected to the results backend. - - Instances of this class represent the state of a processing and are handed - from task to task through the processing workflow. - - Warning: - You have to call :func:`forget` eventually to free all resources and the celery - results backend. - - Anyhow, results will be deleted after 1 day, depending on `configuration - <http://docs.celeryproject.org/en/latest/userguide/configuration.html#result-expires>`_. - - Arguments: - upload_id: The id of the uploaded file in the object storage, - see also :mod:`nomad.files`. - - Attributes: - calc_processings: Information about identified calcs and their processing. - upload_hash: The hash of the uploaded file. E.g., used for archive/repo ids. - status: Aggregated celery status for the whole process. - task_name: Name of the currently running task. - task_id: Id of the currently running task. - cause: *None* or *Exception* that caused failure. - result_tuple: Serialized form of the celery async_results tree for the processing. - """ - def __init__(self, upload_id: str) -> None: - self.upload_id = upload_id - - self.calc_processing_task_ids: List[str] = [] - self.calc_processings: List[CalcProcessing] = None - self.upload_hash: str = None - - self.status: str = 'PENDING' - self.task_name: str = None - self.task_id: str = None - self.cause: Exception = None - self.result_tuple: Any = None - - self._main = None - - @staticmethod - def from_result_backend(upload_id, result_tuple): - """ Loads the processing data from the results backend and returnes an updated instance. """ - processing = UploadProcessing(upload_id) - processing.result_tuple = result_tuple - return processing.updated() - - def start(self): - """ Initiates the processing tasks via celery canvas. """ - assert not self._is_started, 'Cannot start a started or used processing.' - - # Keep the results of the last task is the workflow. - # The last task is started by another task, therefore it - # is not the end of the main task chain. - finalize = close_upload.s() - finalize_result = finalize.freeze() - - # start the main chain - main_chain = open_upload.s(self) | distributed_parse.s(finalize) - main_chain_result = main_chain.delay() - - # Create a singular result tree. This might not be the right way to do it. 
- finalize_result.parent = main_chain_result - - # Keep the result as tuple that also includes all parents, i.e. the whole - # serializable tree - self.result_tuple = finalize_result.as_tuple() - - @property - def _async_result(self) -> AsyncResult: - """ - The celery async_result in its regular usable, but not serializable form. - - We use the tuple form to allow serialization (i.e. storage). Keep in mind - that the sheer `task_id` is not enough, because it does not contain - the parent tasks, i.e. result tree. - See `third comment <https://github.com/celery/celery/issues/1328>`_ - for details. - """ - return result_from_tuple(self.result_tuple, app=app) - - @property - def _is_started(self) -> bool: - """ True, if the task is started. """ - return self.result_tuple is not None - - def _update(self, other: 'UploadProcessing') -> 'UploadProcessing': - """ Updates all attributes from another instance. Returns itself. """ - self.calc_processing_task_ids = other.calc_processing_task_ids - self.calc_processings = other.calc_processings - self.upload_hash = other.upload_hash - - self.status = other.status - self.task_name = other.task_name - self.task_id = other.task_id - self.cause = other.cause - - return self - - def updated(self) -> 'UploadProcessing': - """ Consults the result backend and updates itself with the available results. """ - assert self._is_started, 'Run is not yet started.' - - async_result = self._async_result - is_last_task = True - - while async_result is not None: - if async_result.ready(): - status = async_result.status - if status == 'SUCCESS' and not is_last_task: - status = 'PROGRESS' - async_result.result.status = status - self._update(async_result.result) - break - else: - is_last_task = False - async_result = async_result.parent - - self.calc_processings = [] - for calc_task_id in self.calc_processing_task_ids: - calc_task_result = parse.AsyncResult(calc_task_id) - if calc_task_result.ready() and calc_task_result.status == 'SUCCESS': - self.calc_processings.append(calc_task_result.result) - elif calc_task_result.state == 'PROGRESS': - self.calc_processings.append(calc_task_result.info['processing']) - - return self - - def forget(self) -> None: - """ Forget the results of a completed run; free all resources in the results backend. """ - assert self.ready(), 'Run is not completed.' - - async_result = self._async_result - while async_result is not None: - async_result.forget() - async_result = async_result.parent - - def ready(self) -> bool: - """ Returns: True if the task has been executed. """ - assert self._is_started, 'Run is not yet started.' - - return self._async_result.ready() - - def get(self, *args, **kwargs) -> 'UploadProcessing': - """ - Blocks until the processing has finished. Forwards args, kwargs to - *celery.result.get* for timeouts, etc. - - Returns: An upadted instance of itself with all the results. - """ - assert self._is_started, 'Run is not yet started.' - - self._async_result.get(*args, **kwargs) - return self.updated() - - def fail(self, e: Exception) -> 'UploadProcessing': - """ Allows tasks to mark this processing as failed. All following task will do nothing. """ - self.cause = e - self.status = 'FAILURE' - return self - - def continue_with(self, task: Task) -> bool: - """ Upadtes itself with information about the new current task. """ - assert self.status != 'SUCCESS', 'Cannot continue on completed workflow.' 
- - if self.status == 'FAILURE': - return False - else: - self.status = 'STARTED' - self.task_name = task.name - self.task_id = task.request.id - return True - - def to_dict(self) -> Dict[str, Any]: - """ Render processing information into a serializable dict. """ - result = { - 'status': self.status, - 'calcs': self.calc_processings, - 'current_task': self.task_name, - 'error': self.cause.__str__() - } - return {key: value for key, value in result.items() if value is not None} - - -@app.task(bind=True) -def open_upload(task: Task, processing: UploadProcessing) -> UploadProcessing: - if not processing.continue_with(task): - return processing - - try: - upload = Upload(processing.upload_id) - upload.open() - except KeyError as e: - logger.debug('Process request for non existing upload %s.' % processing.upload_id) - return processing.fail(e) - except UploadError as e: - logger.debug('Could not open upload %s: %s' % (processing.upload_id, e)) - return processing.fail(e) - - logger.debug('Opened upload %s' % processing.upload_id) - - try: - processing.upload_hash = upload.hash() - except UploadError as e: - logger.error('Could not create an upload hash %s: %s' % (processing.upload_id, e)) - return processing.fail(e) - - try: - # TODO: deal with multiple possible parser specs - processing.calc_processings = list() - for filename in upload.filelist: - for parser in parsers: - if parser.is_mainfile(upload, filename): - calc_processing = CalcProcessing( - processing.upload_hash, filename, parser.name, - upload.get_path(filename)) - - processing.calc_processings.append(calc_processing) - except UploadError as e: - logger.warning('Could find parse specs in open upload %s: %s' % (processing.upload_id, e)) - return processing.fail(e) - - return processing - - -@app.task(bind=True) -def close_upload( - task, calc_processings: List[CalcProcessing], processing: UploadProcessing) \ - -> UploadProcessing: - - if processing.continue_with(task): - processing.calc_processings = calc_processings - - try: - upload = Upload(processing.upload_id) - except KeyError as e: - logger.warning('No upload %s' % processing.upload_id) - return processing.fail(e) - - try: - upload.close() - except Exception as e: - logger.error('Could not close upload %s: %s' % (processing.upload_id, e)) - return processing.fail(e) - - logger.debug('Closed upload %s' % processing.upload_id) - - return processing - - -def _report_progress(task, **kwargs): - if not task.request.called_directly: - task.update_state(state='PROGRESS', meta=kwargs) - - -@app.task(bind=True) -def distributed_parse( - task: Task, processing: UploadProcessing, close_upload: Signature) -> UploadProcessing: - if not processing.continue_with(task): - chord([])(close_upload.clone(args=(processing,))) - return processing - - # prepare the group of parallel calc processings - parses = group(parse.s(calc_processing) for calc_processing in processing.calc_processings) - # save the calc processing task ids to the overall processing - processing.calc_processing_task_ids = list(child.task_id for child in parses.freeze().children) - # initiate the chord that runs calc processings first, and close_upload afterwards - chord(parses)(close_upload.clone(args=(processing,))) - - return processing - - -@app.task(bind=True) -def parse(self, processing: CalcProcessing) -> CalcProcessing: - assert processing.upload_hash is not None - - upload_hash = processing.upload_hash - parser, mainfile = processing.parser_name, processing.mainfile - - # parsing - logger.debug('Start %s for %s/%s.' 
% (parser, upload_hash, mainfile)) - try: - parser_backend = parser_dict[parser].run(processing.tmp_mainfile) - processing.append(parser, *parser_backend.status) - _report_progress(self, processing=processing) - except Exception as e: - logger.warning( - '%s stopped on %s/%s: %s' % - (parser, upload_hash, mainfile, e), exc_info=e) - processing.append(parser, 'ParseFailed', [e.__str__()]) - return processing - - # normalization - for normalizer in normalizers: - normalizer_name = normalizer.__name__ - logger.debug('Start %s for %s/%s.' % (normalizer, upload_hash, mainfile)) - try: - normalizer(parser_backend).normalize() - processing.append(normalizer_name, *parser_backend.status) - _report_progress(self, processing=processing) - except Exception as e: - logger.warning( - '%s stopped on %s/%s: %s' % - (normalizer, upload_hash, mainfile, e), exc_info=e) - processing.append(normalizer_name, 'NormalizeFailed', [e.__str__()]) - return normalizer_name - - # update search - try: - search.Calc.add_from_backend( - parser_backend, - upload_hash=upload_hash, - calc_hash=processing.calc_hash, - mainfile=mainfile, - upload_time=datetime.now()) - processing.append('Indexer', 'IndexSuccess') - _report_progress(self, processing=processing) - except Exception as e: - logger.error( - 'Could not add %s/%s to search index: %s.' % - (upload_hash, mainfile, e), exc_info=e) - processing.append('Indexer', 'IndexFailed', [e.__str__()]) - - # calc data persistence - archive_id = processing.archive_id - try: - with files.write_archive_json(archive_id) as out: - parser_backend.write_json(out, pretty=True) - processing.append('Storage', 'PersistenceSuccess') - _report_progress(self, processing=processing) - except Exception as e: - logger.error( - 'Could not write archive %s for paring %s with %s.' % - (archive_id, mainfile, parser), exc_info=e) - processing.append('Storage', 'PersistenceFailed', [e.__str__()]) - return processing - - logger.debug('Written results of %s for %s to %s.' % (parser, mainfile, archive_id)) - - return processing - - -upload_task_names = [ - open_upload.name, - distributed_parse.name, - close_upload.name -] - - -def handle_uploads(quit=False): - """ - Starts a daemon that will listen to files for new uploads. For each new - upload it will initiate the processing and save the task in the upload user data, - it will wait for processing to be completed and store the results in the upload - user data. - - Arguments: - quit: If true, will only handling one event and stop. Otherwise run forever. 
- """ - - @files.upload_put_handler - def handle_upload_put(received_upload_id: str): - logger = utils.get_logger(__name__, upload_id=received_upload_id) - logger.debug('Initiate upload processing') - try: - upload = users.Upload.objects(id=received_upload_id).first() - if upload is None: - logger.error('Upload does not exist') - raise Exception() - - if upload.upload_time is not None: - logger.warn('Ignore upload notification, since file is already uploaded') - raise StopIteration - - with logger.lnr_error('Save upload time'): - upload.upload_time = datetime.now() - upload.save() - - with logger.lnr_error('Start processing'): - proc = UploadProcessing(received_upload_id) - proc.start() - upload.proc_task = proc.result_tuple - upload.save() - - except Exception: - pass - - if quit: - raise StopIteration - - logger.debug('Start upload put notification handler.') - handle_upload_put(received_upload_id='provided by decorator') diff --git a/nomad/processing/__init__.py b/nomad/processing/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..d95601d72ab3b9ee2ee93f7cf1b5e0b209a7a8e0 --- /dev/null +++ b/nomad/processing/__init__.py @@ -0,0 +1,33 @@ +# Copyright 2018 Markus Scheidgen +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an"AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Processing comprises everything that is necessary to take a file uploaded by a user +and processes it until we have all data necessary for repository, archive, and +potentially further services. This includes storing respective data and information +in the data services (e.g. *minio*, *mongo*, or *elastic*). + +A further responsiblity of this module is to provide state information about +running and completed processings. It also needs to provide static information about +the existing processing steps and tasks. + +This module does not contain the functions to do the actual work. Those are encapsulated +in :module:`nomad.files`, :module:`nomad.search`, :module:`nomad.users`, +:module:`nomad.parsing`, and :module:`nomad.normalizing`. +""" + +from nomad.processing.app import app +from nomad.processing import tasks +from nomad.processing.state import UploadProc, CalcProc +from nomad.processing.handler import handle_uploads, handle_uploads_thread, start_processing diff --git a/nomad/processing/app.py b/nomad/processing/app.py new file mode 100644 index 0000000000000000000000000000000000000000..0831e0c72b40208eb680524e477808bd2be50c44 --- /dev/null +++ b/nomad/processing/app.py @@ -0,0 +1,48 @@ +# Copyright 2018 Markus Scheidgen +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an"AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +from celery import Celery +from celery.signals import after_setup_task_logger, after_setup_logger +from celery.utils.log import get_task_logger +import logging +import logstash + +import nomad.config as config +import nomad.patch # pylint: disable=unused-import + + +# The legacy nomad code uses a logger called 'nomad'. We do not want that this +# logger becomes a child of this logger due to its module name starting with 'nomad.' +logger = get_task_logger(__name__.replace('nomad', 'nomad-xt')) +logger.setLevel(logging.DEBUG) + +if config.logstash.enabled: + def initialize_logstash(logger=None, loglevel=logging.INFO, **kwargs): + handler = logstash.TCPLogstashHandler( + config.logstash.host, config.logstash.tcp_port, + tags=['celery'], message_type='celery', version=1) + handler.setLevel(loglevel) + logger.addHandler(handler) + return logger + + after_setup_task_logger.connect(initialize_logstash) + after_setup_logger.connect(initialize_logstash) + + +app = Celery('nomad.processing', backend=config.celery.backend_url, broker=config.celery.broker_url) +app.add_defaults(dict( + accept_content=['json', 'pickle'], + task_serializer=config.celery.serializer, + result_serializer=config.celery.serializer, +)) diff --git a/nomad/processing/handler.py b/nomad/processing/handler.py new file mode 100644 index 0000000000000000000000000000000000000000..72bdba4fdeefc6b9f28e930b2606790543f18c99 --- /dev/null +++ b/nomad/processing/handler.py @@ -0,0 +1,110 @@ +# Copyright 2018 Markus Scheidgen +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an"AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import datetime +import logging +from threading import Thread + +from nomad import files, utils, users + +from nomad.processing.tasks import extracting_task, cleanup_task, parse_all_task +from nomad.processing.state import UploadProc + + +def start_processing(upload_id) -> UploadProc: + """ Starts the processing tasks via celery canvas. """ + + task_names = [ + extracting_task.name, + parse_all_task.name, + cleanup_task.name + ] + + proc = UploadProc(upload_id, task_names) + + # Keep the results of the last task is the workflow. + # The last task is started by another task, therefore it + # is not the end of the main task chain. + finalize = cleanup_task.s() + finalize_result = finalize.freeze() + + # start the main chain + main_chain = extracting_task.s(proc) | parse_all_task.s(finalize) + main_chain_result = main_chain.delay() + + # Create a singular result tree. This might not be the right way to do it. + finalize_result.parent = main_chain_result + + # Keep the result as tuple that also includes all parents, i.e. the whole + # serializable tree + proc.celery_task_ids = finalize_result.as_tuple() + proc.status = 'STARTED' + + return proc + + +def handle_uploads(quit=False): + """ + Starts a daemon that will listen to files for new uploads. 
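The canvas wiring in start_processing above is subtle: the final cleanup task is frozen before the chain starts so its result id is known up front, its result is re-parented onto the chain, and the whole tree is kept as a serializable tuple. The sketch below is not part of the patch; it isolates that pattern with placeholder tasks. The broker/backend URLs and the task bodies are made up, and the commented rehydration step mirrors what UploadProc does later in this patch via result_from_tuple.

from celery import Celery
from celery.result import result_from_tuple

app = Celery('canvas_example', broker='pyamqp://', backend='redis://')  # made-up URLs

@app.task
def first(state):
    return state

@app.task
def middle(state, finalize_sig):
    # the real parse_all_task starts a chord of parse tasks whose callback is finalize_sig
    return state

@app.task
def finalize(results, state):
    return state

def start(state):
    finalize_sig = finalize.s()
    finalize_result = finalize_sig.freeze()       # fix the result id before anything runs
    chain_result = (first.s(state) | middle.s(finalize_sig)).delay()
    finalize_result.parent = chain_result         # stitch everything into one result tree
    return finalize_result.as_tuple()             # serializable handle, parents included

# later, possibly in a different process:
# handle = result_from_tuple(stored_tuple, app=app)
# handle.ready()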
For each new + upload it will initiate the processing and save the task in the upload user data, + it will wait for processing to be completed and store the results in the upload + user data. + + Arguments: + quit: If true, will only handling one event and stop. Otherwise run forever. + """ + + @files.upload_put_handler + def handle_upload_put(received_upload_id: str): + logger = utils.get_logger(__name__, upload_id=received_upload_id) + logger.debug('Initiate upload processing') + try: + upload = users.Upload.objects(id=received_upload_id).first() + if upload is None: + logger.error('Upload does not exist') + raise Exception() + + if upload.upload_time is not None: + logger.warn('Ignore upload notification, since file is already uploaded') + raise StopIteration + + with logger.lnr_error('Save upload time'): + upload.upload_time = datetime.now() + upload.save() + + with logger.lnr_error('Start processing'): + proc = start_processing(received_upload_id) + assert proc.is_started + upload.proc = proc + upload.save() + + except Exception: + pass + + if quit: + raise StopIteration + + utils.get_logger(__name__).debug('Start upload put notification handler.') + handle_upload_put(received_upload_id='provided by decorator') + + +def handle_uploads_thread(quit=True): + thread = Thread(target=lambda: handle_uploads(quit)) + thread.start() + return thread + + +if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG) + handle_uploads() diff --git a/nomad/processing/state.py b/nomad/processing/state.py new file mode 100644 index 0000000000000000000000000000000000000000..b11b3e5ffcec895eb12426cd30583531c559b67f --- /dev/null +++ b/nomad/processing/state.py @@ -0,0 +1,234 @@ +# Copyright 2018 Markus Scheidgen +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an"AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import List, Any, Union, cast +from celery.result import AsyncResult, result_from_tuple +import itertools + +from nomad.normalizing import normalizers +from nomad.utils import DataObject +from nomad.processing.app import app + + +class ProcPipeline(DataObject): + """ + Arguments: + task_names: A list of task names in pipeline order. + + Attributes: + current_task_name: Name of the currently running task. + status: Aggregated status for the whole process. Celery status names as convention. + errors: A list of potential error that caused failure. + """ + def __init__(self, task_names: List[str], *args, **kwargs) -> None: + super().__init__(*args) + self.task_names: List[str] = task_names + self.current_task_name: str = None + self.status: str = 'PENDING' + self.errors: List[str] = [] + + self.update(kwargs) + + def continue_with(self, task_name: str) -> bool: + """ Upadtes itself with information about the new current task. """ + + assert self.status != 'SUCCESS', 'Cannot continue on completed workflow.' 
+ + if self.status == 'FAILURE': + return False + else: + self.status = 'PROGRESS' + self.current_task_name = task_name + return True + + def success(self) -> None: + self.status = 'SUCCESS' + + @property + def is_started(self) -> bool: + """ True, if the task is started. """ + return self.status is not 'PENDING' + + def fail(self, e: Union[List[str], List[Exception], Exception, str]): + """ Allows tasks to mark this processing as failed. All following task will do nothing. """ + raw_errors: Union[List[str], List[Exception]] = None + if isinstance(e, list): + raw_errors = e + else: + raw_errors = cast(Union[List[str], List[Exception]], [e]) + + for error in raw_errors: + if isinstance(error, str): + self.errors.append(error) + elif isinstance(error, Exception): + self.errors.append(error.__str__()) + else: + assert False, 'Unknown error' + + self.status = 'FAILURE' + + +class CalcProc(ProcPipeline): + """ + Used to represent the state of an calc processing. It is used to provide + information to the user (via api, users) and act as a state object within the + more complex calc processing task. Keep in mind that this task might become several + celery tasks in the future. + + Arguments: + upload_hash: The hash that identifies the upload in the archive. + mainfile: The path to the mainfile in the upload. + parser_name: The name of the parser to use/used. + tmp_mainfile: The full path to the mainfile in the local fs. + + Attributes: + calc_hash: The mainfile hash that identifies the calc in the archive. + archive_id: The id that identifies the archive via `upload_hash/calc_hash`. + celery_task_id: The celery task id for the calc parse celery task. + """ + def __init__(self, upload_hash, mainfile, parser_name, tmp_mainfile, *args, **kwargs): + task_names = [ + [parser_name], + [n.__name__ for n in normalizers], + ['indexing', 'storage'] + ] + + super().__init__(task_names=list(itertools.chain(*task_names)), *args) + + self.upload_hash = upload_hash + self.mainfile = mainfile + self.parser_name = parser_name + self.tmp_mainfile = tmp_mainfile + + self.calc_hash = hash(mainfile) + self.archive_id = '%s/%s' % (self.upload_hash, self.calc_hash) + + self.celery_task_id: str = None + + self.update(kwargs) + + def update_from_backend(self): + assert self.celery_task_id is not None + + celery_task_result = AsyncResult(self.celery_task_id, app=app) + if celery_task_result.ready(): + self.update(celery_task_result.result) + else: + info = celery_task_result.info + if info is not None: + self.update(info) + + +class UploadProc(ProcPipeline): + """ + Used to represent the state of an upload processing. It is used to provide + information to the user (via api, users) and act as a state object that is passed + from celery task to celery task. + + It is serializable (JSON, pickle). Iternaly stores + :class:`~celery.results.AsyncResults` instance in serialized *tuple* form to + keep connected to the results backend. + + Warning: + You have to call :func:`forget` eventually to free all resources and the celery + results backend. + + Anyhow, results will be deleted after 1 day, depending on `configuration + <http://docs.celeryproject.org/en/latest/userguide/configuration.html#result-expires>`_. + + Arguments: + upload_id: The id of the uploaded file in the object storage, + see also :mod:`nomad.files`. + task_name: The names of all task in pipeline order. + + Attributes: + upload_hash: The hash of the uploaded file. E.g., used for archive/repo ids. + calc_procs: The state data for all child calc processings. 
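To make the contract between tasks and ProcPipeline explicit, here is a minimal sketch (not from the patch) of how a task implementation is expected to drive the state object. The task names match the real celery task names defined in nomad.processing.tasks; the actual work is omitted.

from nomad.processing.state import ProcPipeline

pipeline = ProcPipeline(task_names=['extracting', 'parse_all', 'cleanup'])
assert pipeline.status == 'PENDING'

if pipeline.continue_with('extracting'):     # returns False once a prior step failed
    try:
        pass                                 # the task's actual work would happen here
    except Exception as e:
        pipeline.fail(e)                     # status -> 'FAILURE', message appended to errors

if pipeline.continue_with('cleanup'):        # does nothing after a failure
    pipeline.success()                       # status -> 'SUCCESS'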
+ celery_task_ids: Serialized form of the celery async_results tree for the processing. + """ + def __init__(self, upload_id: str, task_names: List[str], *args, **kwargs) -> None: + assert upload_id is not None + super().__init__(task_names, *args) + + self.upload_id = upload_id + self.upload_hash: str = None + + self.calc_procs: List[CalcProc] = [] + + self.celery_task_ids: Any = None + + self.update(kwargs) + + @property + def _celery_task_result(self) -> AsyncResult: + """ + The celery async_result in its regular usable, but not serializable form. + + We use the tuple form to allow serialization (i.e. storage). Keep in mind + that the sheer `task_id` is not enough, because it does not contain + the parent tasks, i.e. result tree. + See `third comment <https://github.com/celery/celery/issues/1328>`_ + for details. + """ + assert self.celery_task_ids is not None + + return result_from_tuple(self.celery_task_ids, app=app) + + def update_from_backend(self) -> 'UploadProc': + """ Consults the result backend and updates itself with the available results. """ + assert self.is_started, 'Run is not yet started.' + + celery_task_result = self._celery_task_result + + while celery_task_result is not None: + if celery_task_result.ready(): + self.update(celery_task_result.result) + break + else: + celery_task_result = celery_task_result.parent + + if self.calc_procs is not None: + for calc_proc in self.calc_procs: + calc_proc.update_from_backend() + + return self + + def forget(self) -> None: + """ Forget the results of a completed run; free all resources in the results backend. """ + # TODO, this is not forgetting the parse task in the parse_all header, right? + assert self.ready(), 'Run is not completed.' + + celery_task_result = self._celery_task_result + while celery_task_result is not None: + celery_task_result.forget() + celery_task_result = celery_task_result.parent + + def ready(self) -> bool: + """ Returns: True if the task has been executed. """ + assert self.is_started, 'Run is not yet started.' + + return self._celery_task_result.ready() + + def get(self, *args, **kwargs) -> 'UploadProc': + """ + Blocks until the processing has finished. Forwards args, kwargs to + *celery.result.get* for timeouts, etc. + + Returns: An upadted instance of itself with all the results. + """ + assert self.is_started, 'Run is not yet started.' + + self._celery_task_result.get(*args, **kwargs) + self.update_from_backend() + return self diff --git a/nomad/processing/tasks.py b/nomad/processing/tasks.py new file mode 100644 index 0000000000000000000000000000000000000000..5298f0ecf347afda407f39bad3951d43fa6b4d5e --- /dev/null +++ b/nomad/processing/tasks.py @@ -0,0 +1,196 @@ +# Copyright 2018 Markus Scheidgen +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an"AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from typing import List +from celery import Task, chord, group +from celery.canvas import Signature +from datetime import datetime + +from nomad import files, utils, search +from nomad.parsing import parsers, parser_dict +from nomad.normalizing import normalizers +import nomad.patch # pylint: disable=unused-import + +from nomad.processing.app import app +from nomad.processing.state import UploadProc, CalcProc + +import json + + +@app.task(bind=True, name='extracting') +def extracting_task(task: Task, proc: UploadProc) -> UploadProc: + logger = utils.get_logger(__name__, task=task.name, upload_id=proc.upload_id) + if not proc.continue_with(task.name): + return proc + + try: + upload = files.Upload(proc.upload_id) + upload.open() + except KeyError as e: + logger.debug('Process request for non existing upload') + proc.fail(e) + return proc + except files.UploadError as e: + logger.debug('Could not open upload, %s' % e) + proc.fail(e) + return proc + except Exception as e: + logger.error('Unknown exception %s', exc_info=e) + proc.fail(e) + return proc + + logger.debug('Upload opened') + + try: + proc.upload_hash = upload.hash() + except files.UploadError as e: + logger.error('Could not create upload hash', exc_info=e) + proc.fail(e) + return proc + + try: + # TODO: deal with multiple possible parser specs + for filename in upload.filelist: + for parser in parsers: + if parser.is_mainfile(upload, filename): + tmp_mainfile = upload.get_path(filename) + calc_processing = CalcProc( + proc.upload_hash, filename, parser.name, tmp_mainfile) + + proc.calc_procs.append(calc_processing) + except files.UploadError as e: + logger.warn('Could find parse specs in open upload', exc_info=e) + proc.fail(e) + return proc + + return proc + + +@app.task(bind=True, name='cleanup') +def cleanup_task(task, calc_procs: List[CalcProc], upload_proc: UploadProc) -> UploadProc: + logger = utils.get_logger(__name__, task=task.name, upload_id=upload_proc.upload_id) + if upload_proc.continue_with(task.name): + try: + upload = files.Upload(upload_proc.upload_id) + except KeyError as e: + logger.warn('Upload does not exist') + upload_proc.fail(e) + return upload_proc + + try: + upload.close() + except Exception as e: + logger.error('Could not close upload', exc_info=e) + upload_proc.fail(e) + return upload_proc + + logger.debug('Closed upload') + upload_proc.success() + + return upload_proc + + +def _report_progress(task, dct): + if not task.request.called_directly: + task.update_state(state='PROGRESS', meta=dct) + + +@app.task(bind=True, name='parse_all') +def parse_all_task(task: Task, upload_proc: UploadProc, cleanup: Signature) -> UploadProc: + if not upload_proc.continue_with(task.name): + chord([])(cleanup.clone(args=(upload_proc,))) + return upload_proc + + # prepare the group of parallel calc processings + parses = group(parse_task.s(calc_proc) for calc_proc in upload_proc.calc_procs) + + # save the calc processing task ids to the overall processing + i = 0 + for child in parses.freeze().children: + upload_proc.calc_procs[i].celery_task_id = child.task_id + i = i + 1 + + # initiate the chord that runs calc processings first, and close_upload afterwards + chord(parses)(cleanup.clone(args=(upload_proc,))) + + return upload_proc + + +@app.task(bind=True, name='parse') +def parse_task(self, proc: CalcProc) -> CalcProc: + assert proc.upload_hash is not None + + upload_hash, parser, mainfile = proc.upload_hash, proc.parser_name, proc.mainfile + logger = utils.get_logger(__name__, upload_hash=upload_hash, 
mainfile=mainfile) + + # parsing + proc.continue_with(parser) + logger.debug('Start parsing with %s' % parser) + try: + parser_backend = parser_dict[parser].run(proc.tmp_mainfile) + if parser_backend.status[0] != 'ParseSuccess': + proc.fail(parser_backend.status[1]) + return proc + except Exception as e: + logger.warn('Exception wile parsing', exc_info=e) + proc.fail(e) + return proc + _report_progress(self, proc) + + # normalization + for normalizer in normalizers: + normalizer_name = normalizer.__name__ + proc.continue_with(normalizer_name) + logger.debug('Start %s.' % normalizer) + try: + normalizer(parser_backend).normalize() + if parser_backend.status[0] != 'ParseSuccess': + proc.fail(parser_backend.status[1]) + return proc + except Exception as e: + logger.warn('Exception wile normalizing with %s' % normalizer, exc_info=e) + proc.fail(e) + return proc + _report_progress(self, proc) + + # update search + proc.continue_with('index') + try: + search.Calc.add_from_backend( + parser_backend, + upload_hash=upload_hash, + calc_hash=proc.calc_hash, + mainfile=mainfile, + upload_time=datetime.now()) + except Exception as e: + logger.error('Could not index', exc_info=e) + proc.fail(e) + return proc + _report_progress(self, proc) + + # calc data persistence + proc.continue_with('archiving') + archive_id = proc.archive_id + try: + with files.write_archive_json(archive_id) as out: + parser_backend.write_json(out, pretty=True) + except Exception as e: + logger.error('Could not write archive', exc_info=e) + proc.fail(e) + return proc + + logger.debug('Completed') + + proc.success() + return proc diff --git a/nomad/users.py b/nomad/users.py index 5c1561f5588bc2b0b779284c61255b9bc2a938a5..8e8a65706ea9ed4db60d56eb752f27436d41a0f0 100644 --- a/nomad/users.py +++ b/nomad/users.py @@ -44,15 +44,15 @@ class Upload(Document): and processing system. Attributes: - upload_hash: The UUID hash of the uploaded file. Used to id the extracted upload - in the repository files storage. + upload_id: The upload id. Generated by the database. in_staging: True if the upload is still in staging and can be edited by the uploader. is_private: True if the upload and its derivitaves are only visible to the uploader. - procssing: The serialized instance of :class:`nomad.processing.UploadProcessing`. + proc: The :class:`nomad.processing.UploadProc` that holds the processing state. + created_time: The timestamp this upload was created. upload_time: The timestamp when the system realised the upload. + proc_time: The timestamp when the processing realised finished by the system. """ - upload_hash = StringField() in_staging = BooleanField(default=True) is_private = BooleanField(default=False) @@ -61,14 +61,13 @@ class Upload(Document): create_time = DateTimeField() proc_time = DateTimeField() - proc_task = ListField(default=None) - proc_results = DictField(default=None) + proc = DictField() user = ReferenceField(User, required=True) meta = { 'indexes': [ - 'upload_hash', + 'proc.upload_hash', 'user' ] } diff --git a/nomad/utils.py b/nomad/utils.py index e13ce4f8da60217342884d738ee0488e8ecdc2ea..05b231f2b9e59c34099ac07fc738fb2a95484bf9 100644 --- a/nomad/utils.py +++ b/nomad/utils.py @@ -66,3 +66,25 @@ def get_logger(name, *args, **kwargs): log output. Allowing more structured logging. """ return DataLogger(logging.getLogger(name), *args, **kwargs) + + +class DataObject(dict): + """ + A simpe data class base that allows to create javascript style objects. 
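The Upload.proc DictField above and the DataObject base introduced here are what let the API layer rebuild processing state straight from mongo, as Uploads._render does with UploadProc(**upload.proc). A minimal sketch, not part of the patch; the object id is a placeholder and a configured mongo connection is assumed:

from nomad import users
from nomad.processing import UploadProc

upload = users.Upload.objects(id='some-upload-id').first()  # placeholder id
if upload is not None and upload.proc:
    proc = UploadProc(**upload.proc)   # rebuild the state object from the raw dict
    proc.update_from_backend()         # refresh status from the celery result backend
    print(proc.status, proc.current_task_name, proc.errors)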
+ Is also json serializable, if you only but json serializable contents into it. + """ + def __getattr__(self, name): + try: + return self.__getitem__(name) + except KeyError: + raise AttributeError + + def __setattr__(self, name, val): + return self.__setitem__(name, val) + + def __delattr__(self, name): + assert name != 'data' + return self.__delitem__(name) + + def update(self, dct): + return super().update({key: value for key, value in dct.items() if value is not None}) diff --git a/tests/test_api.py b/tests/test_api.py index 0e48f5b4139c76558e5fa71926ec84bfaef3a11e..9a93e51462f7de3d524d32a806ddc96165944503 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -38,9 +38,9 @@ def assert_uploads(upload_json_str, count=0, **kwargs): def assert_upload(upload_json_str, id=None): data = json.loads(upload_json_str) - assert 'id' in data + assert 'upload_id' in data if id is not None: - assert id == data['id'] + assert id == data['upload_id'] assert 'create_time' in data assert 'presigned_url' in data @@ -68,7 +68,7 @@ def test_create_upload(client): rv = client.post('/uploads') assert rv.status_code == 200 - upload_id = assert_upload(rv.data)['id'] + upload_id = assert_upload(rv.data)['upload_id'] rv = client.get('/uploads/%s' % upload_id) assert rv.status_code == 200 @@ -80,7 +80,7 @@ def test_create_upload(client): @pytest.mark.parametrize("file", example_files) -@pytest.mark.timeout(10) +@pytest.mark.timeout(30) def test_upload_to_upload(client, file): rv = client.post('/uploads') assert rv.status_code == 200 @@ -88,7 +88,7 @@ def test_upload_to_upload(client, file): @files.upload_put_handler def handle_upload_put(received_upload_id: str): - assert upload['id'] == received_upload_id + assert upload['upload_id'] == received_upload_id raise StopIteration def handle_uploads(): @@ -104,14 +104,13 @@ def test_upload_to_upload(client, file): handle_uploads_thread.join() - assert_exists(config.files.uploads_bucket, upload['id']) + assert_exists(config.files.uploads_bucket, upload['upload_id']) @pytest.mark.parametrize("file", example_files) -@pytest.mark.timeout(10) +@pytest.mark.timeout(30) def test_processing(client, file, celery_session_worker): - handle_uploads_thread = Thread(target=lambda: processing.handle_uploads(quit=True)) - handle_uploads_thread.start() + handle_uploads_thread = processing.handle_uploads_thread(quit=True) rv = client.post('/uploads') assert rv.status_code == 200 @@ -127,21 +126,22 @@ def test_processing(client, file, celery_session_worker): while True: time.sleep(1) - rv = client.get('/uploads/%s' % upload['id']) + rv = client.get('/uploads/%s' % upload['upload_id']) assert rv.status_code == 200 upload = assert_upload(rv.data) assert 'upload_time' in upload - assert 'processing' in upload - if upload['processing']['status'] in ['SUCCESS', 'FAILURE']: - break + if 'proc' in upload: + assert 'status' in upload['proc'] + if upload['proc']['status'] in ['SUCCESS', 'FAILURE']: + break - proc = upload['processing'] + proc = upload['proc'] assert proc['status'] == 'SUCCESS' - assert 'calcs' in proc - assert proc['calcs'] is not None - assert proc['current_task'] == 'nomad.processing.close_upload' - assert_exists(config.files.uploads_bucket, upload['id']) + assert 'calc_procs' in proc + assert proc['calc_procs'] is not None + assert proc['current_task_name'] == 'cleanup' + assert_exists(config.files.uploads_bucket, upload['upload_id']) def test_get_archive(client, archive_id): diff --git a/tests/test_processing.py b/tests/test_processing.py index 
d32eed2be7b5e78e0abe41b894609924ffef6d61..2b8b87414d4f49fc1f61df2ed76454fc9c161d0b 100644 --- a/tests/test_processing.py +++ b/tests/test_processing.py @@ -24,7 +24,7 @@ import time import nomad.config as config import nomad.files as files -from nomad.processing import UploadProcessing +from nomad.processing import start_processing from tests.test_files import example_file, empty_file # import fixtures @@ -36,7 +36,7 @@ example_files = [empty_file, example_file] @pytest.fixture(scope='session') def celery_includes(): - return ['nomad.processing'] + return ['nomad.processing.tasks'] @pytest.fixture(scope='session') @@ -59,50 +59,43 @@ def uploaded_id(request, clear_files) -> Generator[str, None, None]: yield example_upload_id -@pytest.mark.timeout(10) +@pytest.mark.timeout(30) def test_processing(uploaded_id, celery_session_worker): - run = UploadProcessing(uploaded_id) - run.start() + upload_proc = start_processing(uploaded_id) - # test that the instance can be reinstantiated from a persistable representation - run = UploadProcessing.from_result_backend(uploaded_id, run.result_tuple) + upload_proc.update_from_backend() - assert run.status in ['PENDING', 'PROGRESS'] + assert upload_proc.status in ['PENDING', 'STARTED', 'PROGRESS'] - while not run.ready(): + while not upload_proc.ready(): time.sleep(1) - run.updated() - - assert run.ready() - assert run.task_name == 'nomad.processing.close_upload' - assert run.upload_hash is not None - assert run.cause is None - assert run.status == 'SUCCESS' - for calc_proc in run.calc_processings: - assert 'parser_name' in calc_proc - assert 'mainfile' in calc_proc - assert 'pipeline' in calc_proc - assert 'upload_hash' in calc_proc - assert 'calc_hash' in calc_proc - for stage in calc_proc['pipeline']: - assert 'task' in stage - assert 'status' in stage - assert stage['status'] in ['ParseSuccess', 'NormalizeSuccess', 'IndexSuccess', 'PersistenceSuccess'] - assert 'errors' in stage and len(stage['errors']) == 0 - - run.forget() + upload_proc.update_from_backend() + + assert upload_proc.ready() + assert upload_proc.current_task_name == 'cleanup' + assert upload_proc.upload_hash is not None + assert len(upload_proc.errors) == 0 + assert upload_proc.status == 'SUCCESS' + for calc_proc in upload_proc.calc_procs: + assert calc_proc.parser_name is not None + assert calc_proc.mainfile is not None + assert calc_proc.upload_hash is not None + assert calc_proc.calc_hash is not None + assert calc_proc.archive_id is not None + assert calc_proc.status == 'SUCCESS' + assert len(calc_proc.errors) == 0 + + upload_proc.forget() def test_process_non_existing(celery_session_worker): - run = UploadProcessing('__does_not_exist') - run.start() + upload_proc = start_processing('__does_not_exist') - run.get(timeout=10) + upload_proc.get(timeout=30) - assert run.ready() - run.forget() + assert upload_proc.ready() + upload_proc.forget() - assert run.task_name == 'nomad.processing.open_upload' - assert run.status == 'SUCCESS' - assert run.cause is not None - assert isinstance(run.cause, KeyError) + assert upload_proc.current_task_name == 'extracting' + assert upload_proc.status == 'FAILURE' + assert len(upload_proc.errors) > 0 diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000000000000000000000000000000000000..ac91e0bd81fdfaccd8d9d23fc0d5e27c023cfee9 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,34 @@ +import json +import pickle + +from nomad.utils import DataObject + + +class ExampleClass(DataObject): + def __init__(self, *args, 
**kwargs): + super().__init__(*args, **kwargs) + self.msg = 'Hello' + + +def test_data_objects(): + example = ExampleClass() + assert example['msg'] == 'Hello' + assert example.msg == 'Hello' + json.dumps(example) + pickled = pickle.dumps(example) + example = pickle.loads(pickled) + assert example['msg'] == 'Hello' + assert example.msg == 'Hello' + + +def test_data_object_update(): + example = ExampleClass() + example.update(ExampleClass(more='Hi')) + assert json.loads(json.dumps(example)).get('more', None) == 'Hi' + + example = ExampleClass() + example.update({'more': 'Hi'}) + assert json.loads(json.dumps(example)).get('more', None) == 'Hi' + + example.update({'more': None}) + assert example.more == 'Hi'
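For completeness, the new public entry points fit together roughly as tests/test_processing.py above exercises them. The following is an illustrative sketch only, using a placeholder upload id and assuming a running celery worker and an already uploaded file:

from nomad.processing import start_processing

upload_proc = start_processing('some-upload-id')   # placeholder id
upload_proc.get(timeout=30)                        # block until the celery workflow finished
assert upload_proc.status in ('SUCCESS', 'FAILURE')
for calc_proc in upload_proc.calc_procs:
    print(calc_proc.archive_id, calc_proc.status, calc_proc.errors)
upload_proc.forget()                               # free the entries in the result backend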