Commit 809a153b authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Fixed bad processing state after re-pack from cli. #548

parent 69b78d65
......@@ -86,7 +86,7 @@ def __run_parallel(
def __run_processing(
uploads, parallel: int, process, label: str, reprocess_running: bool = False,
wait_for_tasks: bool = True):
wait_for_tasks: bool = True, reset_first: bool = False):
def run_process(upload, logger):
logger.info(
......@@ -99,19 +99,26 @@ def __run_processing(
current_process=upload.current_process,
current_task=upload.current_task, upload_id=upload.upload_id)
return False
else:
if reset_first:
upload.reset(force=True)
process(upload)
if wait_for_tasks:
upload.block_until_complete(interval=.5)
else:
upload.block_until_process_complete(interval=.5)
elif upload.process_running:
tasks_status = upload.tasks_status
if tasks_status == proc.RUNNING:
tasks_status = proc.FAILURE
upload.reset(force=True, tasks_status=tasks_status)
process(upload)
if wait_for_tasks:
upload.block_until_complete(interval=.5)
else:
upload.block_until_process_complete(interval=.5)
if upload.tasks_status == proc.FAILURE:
logger.info('%s with failure' % label, upload_id=upload.upload_id)
if upload.tasks_status == proc.FAILURE:
logger.info('%s with failure' % label, upload_id=upload.upload_id)
logger.info('%s complete' % label, upload_id=upload.upload_id)
return True
logger.info('%s complete' % label, upload_id=upload.upload_id)
return True
__run_parallel(uploads, parallel=parallel, callable=run_process, label=label)
......
......@@ -374,7 +374,7 @@ def re_process(ctx, uploads, parallel: int, reprocess_running: bool):
_, uploads = query_uploads(ctx, uploads)
__run_processing(
uploads, parallel, lambda upload: upload.re_process_upload(), 're-processing',
reprocess_running=reprocess_running)
reprocess_running=reprocess_running, reset_first=True)
@uploads.command(help='Repack selected uploads.')
......
......@@ -212,13 +212,16 @@ class Proc(Document, metaclass=ProcMetaclass):
return self
def reset(self, worker_hostname: str = None, force: bool = False):
def reset(
self, worker_hostname: str = None, force: bool = False,
tasks_status: str = PENDING):
''' Resets the task chain. Assumes there no current running process. '''
assert not self.process_running or force
self.current_task = None
self.process_status = None
self.tasks_status = PENDING
self.tasks_status = tasks_status
self.errors = []
self.warnings = []
self.worker_hostname = worker_hostname
......
......@@ -519,6 +519,9 @@ def test_re_pack(published: Upload, monkeypatch, with_failure):
with upload_files.read_archive(calc.calc_id) as archive:
archive[calc.calc_id].to_dict()
published.reload()
assert published.tasks_status == SUCCESS
def mock_failure(cls, task, monkeypatch):
def mock(self):
......
......@@ -27,6 +27,7 @@ from nomad import search, processing as proc, files
from nomad.cli import cli
from nomad.cli.cli import POPO
from nomad.processing import Upload, Calc
from nomad.processing.base import SUCCESS
from tests.app.flask.test_app import BlueprintClient
from tests.app.flask.conftest import ( # pylint: disable=unused-import
......@@ -253,6 +254,9 @@ class TestAdminUploads:
with upload_files.read_archive(calc.calc_id) as archive:
assert calc.calc_id in archive
published.reload()
assert published.tasks_status == SUCCESS
def test_chown(self, published, test_user, other_test_user):
upload_id = published.upload_id
calc = Calc.objects(upload_id=upload_id).first()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment