diff --git a/nomad/processing/tasks.py b/nomad/processing/tasks.py
index 35e1df36f71182ad9640b4367f8c5cf390aadeff..e7bccaa22b24e857b2c5ed8d526f7f69ad8d448c 100644
--- a/nomad/processing/tasks.py
+++ b/nomad/processing/tasks.py
@@ -57,6 +57,10 @@ def extracting_task(task: Task, proc: UploadProc) -> UploadProc:
         proc.fail(e)
         return proc
 
+    if search.Calc.upload_exists(proc.upload_hash):
+        proc.fail('The same file was already uploaded and processed.')
+        return proc
+
     try:
         # TODO: deal with multiple possible parser specs
         for filename in upload.filelist:
diff --git a/nomad/search.py b/nomad/search.py
index 0ddd9cb9a16a721d9e3b26d6b8af6ac701063122..7b0dd1d43ceb26d42799cbc8f112214ca672c19a 100644
--- a/nomad/search.py
+++ b/nomad/search.py
@@ -20,7 +20,7 @@ of search relevant properties.
 """
 
 import elasticsearch.exceptions
-from elasticsearch_dsl import Document, Date, Keyword, connections
+from elasticsearch_dsl import Document, Date, Keyword, Search, connections
 import logging
 import sys
 
@@ -71,6 +71,15 @@ class Calc(Document):
     def search(body):
         return client.search(index=config.elastic.calc_index, body=body)
 
+    @staticmethod
+    def upload_exists(upload_hash):
+        """ Returns true if there are already calcs from the given upload. """
+        search = Search(using=client, index=config.elastic.calc_index) \
+            .query('match', upload_hash=upload_hash) \
+            .execute()
+
+        return len(search) > 0
+
     @staticmethod
     def add_from_backend(backend: LocalBackend, **kwargs) -> 'Calc':
         """
diff --git a/tests/test_api.py b/tests/test_api.py
index 939d9bfff897cd486fa3269cb2aaaf2d713d6a2b..081b0960875de1c3aeff2c65ac4a009d3122415f 100644
--- a/tests/test_api.py
+++ b/tests/test_api.py
@@ -24,7 +24,7 @@ from tests.test_files import clear_files, archive_id  # noqa pylint: disable=unu
 from tests.test_normalizing import normalized_vasp_example  # noqa pylint: disable=unused-import
 from tests.test_parsing import parsed_vasp_example  # noqa pylint: disable=unused-import
 from tests.test_search import example_entry  # noqa pylint: disable=unused-import
-from tests.test_processing import celery_config, celery_includes  # noqa pylint: disable=unused-import
+from tests.test_processing import celery_config, celery_includes, mocksearch  # noqa pylint: disable=unused-import
 
 
 @pytest.fixture(scope='function')
diff --git a/tests/test_processing.py b/tests/test_processing.py
index 15d2ab692294c969a7f4909559deec433d04d0f5..7575806bedf78b89ab7b21ab46246a0b292e5bb7 100644
--- a/tests/test_processing.py
+++ b/tests/test_processing.py
@@ -22,8 +22,7 @@ from typing import Generator
 import pytest
 import time
 
-import nomad.config as config
-import nomad.files as files
+from nomad import config, files, search
 from nomad.processing import start_processing, ProcPipeline
 from tests.test_files import example_file, empty_file
 
@@ -33,19 +32,20 @@ from tests.test_files import clear_files  # pylint: disable=unused-import
 example_files = [empty_file, example_file]
 
 
-# @pytest.fixture(autouse=True)
-# def patch(monkeypatch):
-#     original = UploadProc.continue_with
+@pytest.fixture(scope='function', autouse=True)
+def mocksearch(monkeypatch):
+    uploads = []
 
-#     def continue_with(self: UploadProc, task):
-#         if self.upload_id.startswith('__fail_in_'):
-#             fail_in_task = self.upload_id.replace('__fail_in_', '')
-#             if fail_in_task == task:
-#                 raise Exception('fail for test')
+    def add_from_backend(_, **kwargs):
+        upload_hash = kwargs.get('upload_hash', None)
+        uploads.append(upload_hash)
+        return {}
 
-#         return original(self, task)
+    def upload_exists(upload_hash):
+        return upload_hash in uploads
 
-#     monkeypatch.setattr('nomad.processing.state.UploadProc.continue_with', continue_with)
+    monkeypatch.setattr('nomad.search.Calc.add_from_backend', add_from_backend)
+    monkeypatch.setattr('nomad.search.Calc.upload_exists', upload_exists)
 
 
 @pytest.fixture(scope='session')
@@ -102,6 +102,21 @@ def test_processing(uploaded_id, celery_session_worker):
     upload_proc.forget()
 
 
+@pytest.mark.parametrize('uploaded_id', [example_files[1]], indirect=True)
+def test_processing_doublets(uploaded_id, celery_session_worker):
+    upload_proc = start_processing(uploaded_id)
+    upload_proc.get()
+    assert upload_proc.status == 'SUCCESS'
+
+    assert search.Calc.upload_exists(upload_proc.upload_hash)
+
+    upload_proc = start_processing(uploaded_id)
+    upload_proc.get()
+    assert upload_proc.status == 'FAILURE'
+    assert len(upload_proc.errors) > 0
+    assert 'already' in upload_proc.errors[0]
+
+
 @pytest.mark.timeout(30)
 def test_process_non_existing(celery_session_worker):
     upload_proc = start_processing('__does_not_exist')
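
The diff rejects an upload whose upload_hash is already indexed: extracting_task asks Calc.upload_exists before parsing, and the mocksearch fixture swaps the Elasticsearch-backed methods for an in-memory list in the tests. The standalone sketch below mirrors that flow for illustration only; DuplicateGuardIndex and process_upload are hypothetical stand-ins, not nomad APIs, and the real upload_exists queries Elasticsearch as shown in nomad/search.py above.

# Illustrative sketch only; DuplicateGuardIndex and process_upload are
# hypothetical stand-ins for nomad.search.Calc and extracting_task.

class DuplicateGuardIndex:
    """In-memory stand-in for the search index used in the duplicate check."""

    def __init__(self):
        self._upload_hashes = set()

    def add_from_backend(self, upload_hash):
        # The real Calc.add_from_backend indexes a parsed calculation in
        # Elasticsearch; here we only record which upload it belongs to.
        self._upload_hashes.add(upload_hash)

    def upload_exists(self, upload_hash):
        # Mirrors Calc.upload_exists: true if calcs of this upload are indexed.
        return upload_hash in self._upload_hashes


def process_upload(index, upload_hash):
    # Mirrors the guard added to extracting_task: fail early on duplicates.
    if index.upload_exists(upload_hash):
        return 'FAILURE', ['The same file was already uploaded and processed.']
    index.add_from_backend(upload_hash)
    return 'SUCCESS', []


if __name__ == '__main__':
    index = DuplicateGuardIndex()
    assert process_upload(index, 'abc123') == ('SUCCESS', [])
    status, errors = process_upload(index, 'abc123')
    assert status == 'FAILURE' and 'already' in errors[0]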