From 6bf356e34d83d6cfd724a0e8df1acda3a5458482 Mon Sep 17 00:00:00 2001
From: Markus Scheidgen <markus.scheidgen@gmail.com>
Date: Tue, 28 Aug 2018 15:15:11 +0200
Subject: [PATCH] Added failure for non empty upload with existing hash.

---
 nomad/processing/tasks.py |  4 ++++
 nomad/search.py           | 11 ++++++++++-
 tests/test_api.py         |  2 +-
 tests/test_processing.py  | 39 +++++++++++++++++++++++++++------------
 4 files changed, 42 insertions(+), 14 deletions(-)

diff --git a/nomad/processing/tasks.py b/nomad/processing/tasks.py
index 35e1df36f7..e7bccaa22b 100644
--- a/nomad/processing/tasks.py
+++ b/nomad/processing/tasks.py
@@ -57,6 +57,10 @@ def extracting_task(task: Task, proc: UploadProc) -> UploadProc:
         proc.fail(e)
         return proc
 
+    if search.Calc.upload_exists(proc.upload_hash):
+        proc.fail('The same file was already uploaded and processed.')
+        return proc
+
     try:
         # TODO: deal with multiple possible parser specs
         for filename in upload.filelist:
diff --git a/nomad/search.py b/nomad/search.py
index 0ddd9cb9a1..7b0dd1d43c 100644
--- a/nomad/search.py
+++ b/nomad/search.py
@@ -20,7 +20,7 @@ of search relevant properties.
 """
 
 import elasticsearch.exceptions
-from elasticsearch_dsl import Document, Date, Keyword, connections
+from elasticsearch_dsl import Document, Date, Keyword, Search, connections
 import logging
 import sys
 
@@ -71,6 +71,15 @@ class Calc(Document):
     def search(body):
         return client.search(index=config.elastic.calc_index, body=body)
 
+    @staticmethod
+    def upload_exists(upload_hash):
+        """ Returns true if there are already calcs from the given upload. """
+        search = Search(using=client, index=config.elastic.calc_index) \
+            .query('match', upload_hash=upload_hash) \
+            .execute()
+
+        return len(search) > 0
+
     @staticmethod
     def add_from_backend(backend: LocalBackend, **kwargs) -> 'Calc':
         """
diff --git a/tests/test_api.py b/tests/test_api.py
index 939d9bfff8..081b096087 100644
--- a/tests/test_api.py
+++ b/tests/test_api.py
@@ -24,7 +24,7 @@ from tests.test_files import clear_files, archive_id  # noqa pylint: disable=unu
 from tests.test_normalizing import normalized_vasp_example  # noqa pylint: disable=unused-import
 from tests.test_parsing import parsed_vasp_example  # noqa pylint: disable=unused-import
 from tests.test_search import example_entry  # noqa pylint: disable=unused-import
-from tests.test_processing import celery_config, celery_includes  # noqa pylint: disable=unused-import
+from tests.test_processing import celery_config, celery_includes, mocksearch  # noqa pylint: disable=unused-import
 
 
 @pytest.fixture(scope='function')
diff --git a/tests/test_processing.py b/tests/test_processing.py
index 15d2ab6922..7575806bed 100644
--- a/tests/test_processing.py
+++ b/tests/test_processing.py
@@ -22,8 +22,7 @@ from typing import Generator
 import pytest
 import time
 
-import nomad.config as config
-import nomad.files as files
+from nomad import config, files, search
 from nomad.processing import start_processing, ProcPipeline
 
 from tests.test_files import example_file, empty_file
@@ -33,19 +32,20 @@ from tests.test_files import clear_files  # pylint: disable=unused-import
 example_files = [empty_file, example_file]
 
 
-# @pytest.fixture(autouse=True)
-# def patch(monkeypatch):
-#     original = UploadProc.continue_with
+@pytest.fixture(scope='function', autouse=True)
+def mocksearch(monkeypatch):
+    uploads = []
 
-#     def continue_with(self: UploadProc, task):
-#         if self.upload_id.startswith('__fail_in_'):
-#             fail_in_task = self.upload_id.replace('__fail_in_', '')
-#             if fail_in_task == task:
-#                 raise Exception('fail for test')
+    def add_from_backend(_, **kwargs):
+        upload_hash = kwargs.get('upload_hash', None)
+        uploads.append(upload_hash)
+        return {}
 
-#         return original(self, task)
+    def upload_exists(upload_hash):
+        return upload_hash in uploads
 
-#     monkeypatch.setattr('nomad.processing.state.UploadProc.continue_with', continue_with)
+    monkeypatch.setattr('nomad.search.Calc.add_from_backend', add_from_backend)
+    monkeypatch.setattr('nomad.search.Calc.upload_exists', upload_exists)
 
 
 @pytest.fixture(scope='session')
@@ -102,6 +102,21 @@ def test_processing(uploaded_id, celery_session_worker):
         upload_proc.forget()
 
 
+@pytest.mark.parametrize('uploaded_id', [example_files[1]], indirect=True)
+def test_processing_doublets(uploaded_id, celery_session_worker):
+    upload_proc = start_processing(uploaded_id)
+    upload_proc.get()
+    assert upload_proc.status == 'SUCCESS'
+
+    assert search.Calc.upload_exists(upload_proc.upload_hash)
+
+    upload_proc = start_processing(uploaded_id)
+    upload_proc.get()
+    assert upload_proc.status == 'FAILURE'
+    assert len(upload_proc.errors) > 0
+    assert 'already' in upload_proc.errors[0]
+
+
 @pytest.mark.timeout(30)
 def test_process_non_existing(celery_session_worker):
     upload_proc = start_processing('__does_not_exist')
-- 
GitLab
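
For reference, here is a minimal standalone sketch of the duplicate-upload check that the new Calc.upload_exists performs. It is not part of the patch: the localhost:9200 connection and the 'calcs' index name are assumptions for illustration, while the patched code uses the shared client and config.elastic.calc_index defined in nomad/search.py.

    # Standalone sketch of the upload_exists() check added above.
    # Assumptions: a local Elasticsearch at localhost:9200 and an index named 'calcs'.
    from elasticsearch import Elasticsearch
    from elasticsearch_dsl import Search

    client = Elasticsearch(['http://localhost:9200'])


    def upload_exists(upload_hash: str, index: str = 'calcs') -> bool:
        """ True if the index already holds calcs for the given upload hash. """
        search = Search(using=client, index=index) \
            .query('match', upload_hash=upload_hash)

        # count() asks Elasticsearch only for the number of matching documents;
        # the patch instead calls execute() and checks len() of the response hits.
        return search.count() > 0


    if __name__ == '__main__':
        print(upload_exists('some-upload-hash'))

If upload_hash is mapped as a Keyword field, a term query would be an equivalent exact-match alternative to the match query used here and in the patch.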