diff --git a/nomad/processing/state.py b/nomad/processing/state.py index 64e8d8f6c0ba328b84bfae86db9ce2b1ab26976f..198b7d5cb28c0ca2daf52b0b51c15d2af1ba5fed 100644 --- a/nomad/processing/state.py +++ b/nomad/processing/state.py @@ -15,6 +15,7 @@ from typing import List, Any, Union, cast from celery.result import AsyncResult, result_from_tuple import itertools +import time from nomad import utils from nomad.normalizing import normalizers @@ -120,7 +121,7 @@ class CalcProc(ProcPipeline): def update_from_backend(self) -> bool: """ Consults results backend and updates. Returns if object might have changed. """ - if self.status in ['FAILED', 'SUCCESS']: + if self.status in ['FAILURE', 'SUCCESS']: return False if self.celery_task_id is None: return False @@ -225,17 +226,29 @@ class UploadProc(ProcPipeline): """ assert self.is_started, 'Run is not yet started.' - if self.status in ['SUCCESS', 'FAILED']: + if self.status in ['SUCCESS', 'FAILURE']: return False if self.celery_task_ids is None: return False celery_task_result = self._celery_task_result + task_index = len(self.task_names) might_have_changed = False while celery_task_result is not None: + task_index -= 1 if celery_task_result.ready(): - self.update(celery_task_result.result) + result = celery_task_result.result + if isinstance(result, Exception): + self.fail(result) + self.current_task_name = self.task_names[task_index] + logger = utils.get_logger( + __name__, + upload_id=self.upload_id, + current_task_name=self.current_task_name) + logger.error('Celery task raised exception.', exc_info=result) + else: + self.update(result) might_have_changed = True break else: @@ -260,21 +273,28 @@ class UploadProc(ProcPipeline): def ready(self) -> bool: """ Returns: True if the task has been executed. """ - assert self.is_started, 'Run is not yet started.' 
-
-        return self._celery_task_result.ready()
+        self.update_from_backend()
+        return self.status in ['FAILURE', 'SUCCESS']
 
-    def get(self, *args, **kwargs) -> 'UploadProc':
+    def get(self, interval=1, timeout=None) -> 'UploadProc':
         """
-        Blocks until the processing has finished. Forwards args, kwargs to
-        *celery.result.get* for timeouts, etc.
+        Blocks until the processing has finished. It uses the given interval
+        to continuously consult the results backend.
+
+        Arguments:
+            interval: a period to sleep between updates
+            timeout: a rough timeout to terminate, even if unfinished
 
         Returns: An upadted instance of itself with all the results.
         """
-        # TODO this is not a good idea, we wont catch failed parent processes and block
-        # forever
         assert self.is_started, 'Run is not yet started.'
 
-        self._celery_task_result.get(*args, **kwargs)
+        slept = 0
+        while not self.ready() and (timeout is None or slept < timeout):
+            time.sleep(interval)
+            slept += interval
+            self.update_from_backend()
+
+        self.update_from_backend()
+
+        return self
diff --git a/nomad/processing/tasks.py b/nomad/processing/tasks.py
index 4673d40d66672c12f63dd4c5138eb0665193f8ef..35e1df36f71182ad9640b4367f8c5cf390aadeff 100644
--- a/nomad/processing/tasks.py
+++ b/nomad/processing/tasks.py
@@ -25,8 +25,6 @@ import nomad.patch  # pylint: disable=unused-import
 from nomad.processing.app import app
 from nomad.processing.state import UploadProc, CalcProc
 
-import json
-
 
 @app.task(bind=True, name='extracting')
 def extracting_task(task: Task, proc: UploadProc) -> UploadProc:
diff --git a/tests/test_processing.py b/tests/test_processing.py
index 35a2e8001f0014a05a8ab60862b8cd03dc380bbd..15d2ab692294c969a7f4909559deec433d04d0f5 100644
--- a/tests/test_processing.py
+++ b/tests/test_processing.py
@@ -24,7 +24,7 @@ import time
 import nomad.config as config
 import nomad.files as files
-from nomad.processing import start_processing
+from nomad.processing import start_processing, ProcPipeline
 from tests.test_files import 
example_file, empty_file # import fixtures @@ -33,6 +33,21 @@ from tests.test_files import clear_files # pylint: disable=unused-import example_files = [empty_file, example_file] +# @pytest.fixture(autouse=True) +# def patch(monkeypatch): +# original = UploadProc.continue_with + +# def continue_with(self: UploadProc, task): +# if self.upload_id.startswith('__fail_in_'): +# fail_in_task = self.upload_id.replace('__fail_in_', '') +# if fail_in_task == task: +# raise Exception('fail for test') + +# return original(self, task) + +# monkeypatch.setattr('nomad.processing.state.UploadProc.continue_with', continue_with) + + @pytest.fixture(scope='session') def celery_includes(): return ['nomad.processing.tasks'] @@ -87,10 +102,11 @@ def test_processing(uploaded_id, celery_session_worker): upload_proc.forget() +@pytest.mark.timeout(30) def test_process_non_existing(celery_session_worker): upload_proc = start_processing('__does_not_exist') - upload_proc.get(timeout=30) + upload_proc.get() assert upload_proc.ready() upload_proc.forget() @@ -98,3 +114,34 @@ def test_process_non_existing(celery_session_worker): assert upload_proc.current_task_name == 'extracting' assert upload_proc.status == 'FAILURE' assert len(upload_proc.errors) > 0 + + +@pytest.mark.parametrize('task', ['extracting', 'parse_all', 'cleanup', 'parsers/vasp']) +def test_task_failure(monkeypatch, uploaded_id, celery_session_worker, task): + original_continue_with = ProcPipeline.continue_with + + def continue_with(self: ProcPipeline, current_task): + if task == current_task: + raise Exception('fail for test') + + return original_continue_with(self, current_task) + + monkeypatch.setattr('nomad.processing.state.ProcPipeline.continue_with', continue_with) + + upload_proc = start_processing(uploaded_id) + upload_proc.get() + + assert upload_proc.ready() + + if task != 'parsers/vasp': + assert upload_proc.status == 'FAILURE' + assert upload_proc.current_task_name == task + assert len(upload_proc.errors) > 0 + elif 
len(upload_proc.calc_procs) > 0:  # ignore the empty example upload
+        assert upload_proc.status == 'FAILURE'
+        assert upload_proc.current_task_name == 'cleanup'
+        assert len(upload_proc.errors) > 0
+        for calc_proc in upload_proc.calc_procs:
+            assert calc_proc.status == 'FAILURE'
+            assert calc_proc.current_task_name == 'parsers/vasp'
+            assert len(calc_proc.errors) > 0