diff --git a/.vscode/launch.json b/.vscode/launch.json
index f77216f8aa9ae392b0dbf001e9b236afba7a7830..f3f852c5026c09f65afbcc4813bf44d760fce28b 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -58,13 +58,13 @@
             ]
         },
         {
-            "name": "Python: tests/test_parsing.py fhi",
+            "name": "Python: tests/processing/test_base.py",
             "type": "python",
             "request": "launch",
             "cwd": "${workspaceFolder}",
             "program": "${workspaceFolder}/.pyenv/bin/pytest",
             "args": [
-                "-sv", "tests/test_parsing.py::test_parser[parsers/fhi-aims-.dependencies/parsers/fhi-aims/test/examples/Au2_non-periodic_geometry_optimization.out]"
+                "-sv", "tests/processing/test_base.py"
             ]
         },
         {
diff --git a/nomad/processing/base.py b/nomad/processing/base.py
index c9ba8e30582c5a15d4ae79496ce6934c4cc994a6..c896ba7b2a4ebe433e2cfc3fcb7479ac2938c619 100644
--- a/nomad/processing/base.py
+++ b/nomad/processing/base.py
@@ -281,7 +281,7 @@ class Proc(Document, metaclass=ProcMetaclass):
         else:
             return updated_raw[field], [updated_raw[field] for field in other_fields]
 
-    def block_until_complete(self, interval=0.1):
+    def block_until_complete(self, interval=0.01):
         """ Reloads the process constrantly until it sees a completed process. Should be
         used with care as it can block indefinetly. Just intended for testing purposes.
         """
diff --git a/nomad/processing/data.py b/nomad/processing/data.py
index 9c17c0cb3acd52785fffb4fa4f69fb2524e881e7..13e7b35ffc10424b5e56424464073b9b4d99ac68 100644
--- a/nomad/processing/data.py
+++ b/nomad/processing/data.py
@@ -163,17 +163,19 @@ class Calc(Proc):
     @task
     def archiving(self):
         upload_hash, calc_hash = self.archive_id.split('/')
-        # persist to elastic search
-        RepoCalc.create_from_backend(
-            self._parser_backend,
-            upload_hash=upload_hash,
-            calc_hash=calc_hash,
-            upload_id=self.upload_id,
+        additional = dict(
             mainfile=self.mainfile,
             upload_time=self._upload.upload_time,
             staging=True, restricted=False, user_id=self._upload.user_id)
 
+        # persist to elastic search
+        RepoCalc.create_from_backend(
+            self._parser_backend,
+            additional=additional,
+            upload_hash=upload_hash,
+            calc_hash=calc_hash,
+            upload_id=self.upload_id)
+
         # persist the archive
         with files.write_archive_json(self.archive_id) as out:
diff --git a/nomad/repo.py b/nomad/repo.py
index 5a913366eae499cb9e638f5230da95175479b028..e07c85facd1bd20875f8071cf5bbd6b7ee45bfc7 100644
--- a/nomad/repo.py
+++ b/nomad/repo.py
@@ -23,6 +23,7 @@ is an elasticsearch_dsl document that is used to represent repository index entr
 :members:
 """
 
+from typing import Dict, Any
 import sys
 from elasticsearch.exceptions import ConflictError, RequestError, ConnectionTimeout
 from elasticsearch_dsl import Document as ElasticDocument, Search, Date, Keyword, Boolean, \
@@ -88,7 +89,8 @@ class RepoCalc(ElasticDocument):
 
     @classmethod
     def create_from_backend(
-            cls, backend: LocalBackend, upload_id: str, upload_hash: str, calc_hash: str,
+            cls, backend: LocalBackend, additional: Dict[str, Any],
+            upload_id: str, upload_hash: str, calc_hash: str,
             **kwargs) -> 'RepoCalc':
         """
         Create a new calculation instance in elastic search. The data from the given backend
@@ -97,11 +99,12 @@
 
         Arguments:
             backend: The parsing/normalizing backend that contains the calculation data.
+            additional: Additional arguments not stored in the backend. E.g. ``user_id``,
+                ``staging``, ``restricted``
             upload_hash: The upload hash of the originating upload.
             upload_id: The upload id of the originating upload.
             calc_hash: The upload unique hash for this calculation.
-            kwargs: Additional arguments not stored in the backend. E.g. ``user_id``,
-                ``staging``, ``restricted``
+            kwargs: Arguments are passed to elasticsearch index operation.
 
         Raises:
             AlreadyExists: If the calculation already exists in elastic search. We use
@@ -109,15 +112,15 @@
                 ``archive_id``.
         """
         assert upload_hash is not None and calc_hash is not None and upload_id is not None
-        kwargs.update(dict(upload_hash=upload_hash, calc_hash=calc_hash, upload_id=upload_id))
+        additional.update(dict(upload_hash=upload_hash, calc_hash=calc_hash, upload_id=upload_id))
 
         # prepare the entry with all necessary properties from the backend
         calc = cls(meta=dict(id='%s/%s' % (upload_hash, calc_hash)))
         for property in cls._doc_type.mapping:
             property = key_mappings.get(property, property)
 
-            if property in kwargs:
-                value = kwargs[property]
+            if property in additional:
+                value = additional[property]
             else:
                 try:
                     value = backend.get_value(property, 0)
@@ -140,7 +143,7 @@
         e_after_retries = None
         for _ in range(0, 2):
             try:
-                calc.save(op_type='create')
+                calc.save(op_type='create', **kwargs)
                 e_after_retries = None
                 break
             except ConnectionTimeout as e:
diff --git a/nomad/utils.py b/nomad/utils.py
index 6489a14ae1f3a06b992ed1614892ecb84172fa6a..4a2461745daf354a01732c919e49a2479a4e4e9b 100644
--- a/nomad/utils.py
+++ b/nomad/utils.py
@@ -107,24 +107,20 @@ if not _logging_is_configured:
     def logger_factory(*args):
         logger = default_factory(*args)
-        if 'pytest' in sys.modules:
-            logger.setLevel(logging.WARNING)
-        else:
-            logger.setLevel(logging.DEBUG)
+        logger.setLevel(logging.DEBUG)
         return logger
 
     structlog.configure(processors=log_processors, logger_factory=logger_factory)
 
     # configure logging in general
-    logging.basicConfig(level=logging.WARNING)
+    logging.basicConfig(level=logging.WARNING, stream=sys.stdout)
+    root = logging.getLogger()
+    root.setLevel(config.logstash.level)
+    for handler in root.handlers:
+        handler.setLevel(logging.WARNING if 'pytest' not in sys.modules else logging.CRITICAL)
 
     # configure logstash
-    if config.logstash.enabled:
-        root = logging.getLogger()
-        for handler in root.handlers:
-            handler.setLevel(logging.WARNING)
-        root.setLevel(config.logstash.level)
-
+    if config.logstash.enabled and 'pytest' not in sys.modules:
         add_logstash_handler(root)
         root.info('Structlog configured for logstash')
diff --git a/tests/conftest.py b/tests/conftest.py
index 04fefe18574e7c091fcaa313442b00a4adbe3f91..f3e423c2ad18e2501fb0279f3d84eb5199cbc2d1 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,7 +1,6 @@
 import pytest
 from mongoengine import connect
 from mongoengine.connection import disconnect
-import time
 
 from nomad import config
 
@@ -19,15 +18,42 @@ def celery_config():
 
 
 @pytest.fixture(scope='function')
-def worker(celery_session_worker):
+def purged_queue(celery_app):
     """
-    Extension of the buildin celery_session_worker fixture that adds sleep to consume
-    bleeding tasks.
-    Processes might be completed (and therefore the test it self) before child
-    processes are finished. Therefore open task request might bleed into the next test.
+    Purges all pending tasks of the celery app before test. This is necessary to
+    remove tasks from the queue that might be 'left over' from prior tests.
     """
+    celery_app.control.purge()
+    yield
+
+
+@pytest.fixture(scope='function')
+def patched_celery(monkeypatch):
+    # There is a bug in celery, which prevents to use the celery_worker for multiple
+    # tests: https://github.com/celery/celery/issues/4088
+    # The bug has a fix from Aug 2018, but it is not yet released (TODO).
+    # We monkeypatch a similar solution here.
+    def add_reader(self, fds, callback, *args):
+        from kombu.utils.eventio import ERR, READ, WRITE, poll
+
+        if self.poller is None:
+            self.poller = poll()
+
+        return self.add(fds, callback, READ | ERR, args)
+
+    monkeypatch.setattr('kombu.asynchronous.hub.Hub.add_reader', add_reader)
+    yield
+
+
+@pytest.fixture(scope='function')
+def worker(patched_celery, purged_queue, celery_worker):
+    """
+    Extension of the celery_worker fixture that ensures a clean task queue before yielding.
+    """
+    # This wont work with the session_worker, it will already have old/unexecuted tasks
+    # taken from the queue and might resubmit them. Therefore, purging the queue won't
+    # help much.
     yield
-    time.sleep(0.5)
 
 
 @pytest.fixture(scope='function', autouse=True)
diff --git a/tests/processing/test_base.py b/tests/processing/test_base.py
index a015ed6fd0c26e8399b90394cccfdaf831e40e8f..ab8a11d48ef4f043ff8ac72106f5da53922e79e4 100644
--- a/tests/processing/test_base.py
+++ b/tests/processing/test_base.py
@@ -2,6 +2,8 @@ import pytest
 from mongoengine import connect, IntField, ReferenceField
 from mongoengine.connection import disconnect
 import time
+import logging
+import json
 
 from nomad import config
 from nomad.processing.base import Proc, process, task, SUCCESS, FAILURE, RUNNING, PENDING
@@ -52,11 +54,18 @@ class FailTasks(Proc):
         self.fail('fail fail fail')
 
 
-def test_fail():
+def test_fail(caplog):
+    caplog.set_level(logging.CRITICAL, logger='nomad.processing.base')
     p = FailTasks.create()
     p.will_fail()
 
     assert_proc(p, 'will_fail', FAILURE, errors=1)
+    has_log = False
+    for record in caplog.records:
+        if record.levelname == 'ERROR':
+            has_log = True
+            assert json.loads(record.msg)['event'] == 'task failed'
+    assert has_log
 
 
 class SimpleProc(Proc):
@@ -74,7 +83,7 @@ class SimpleProc(Proc):
         pass
 
 
-def test_simple_process(celery_session_worker):
+def test_simple_process(worker):
     p = SimpleProc.create()
     p.process()
     p.block_until_complete()
@@ -88,7 +97,8 @@ class TaskInProc(Proc):
         pass
 
 
-def test_task_as_proc(celery_session_worker):
+# @pytest.mark.timeout(5)
+def test_task_as_proc(worker):
     p = TaskInProc.create()
     p.process()
     p.block_until_complete()
@@ -122,9 +132,12 @@ class ChildProc(Proc):
         self.parent.on_child_complete()
 
 
-def test_counter(worker):
+# @pytest.mark.timeout(5)
+def test_counter(worker, caplog):
     p = ParentProc.create()
     p.spawn_children()
     p.block_until_complete()
 
     assert_proc(p, 'after_children')
+    for record in caplog.records:
+        assert record.levelname not in ['WARNING', 'ERROR', 'CRITICAL']
diff --git a/tests/test_api.py b/tests/test_api.py
index 6a712f82de22b1ef467f5c64a4ecc0c5aee6697c..e38809b9ad17bf5ef84111206058bfc8a97c8d1e 100644
--- a/tests/test_api.py
+++ b/tests/test_api.py
@@ -163,7 +163,7 @@ def test_upload_to_upload(client, file, test_user_auth):
     handle_uploads_thread = Thread(target=handle_uploads)
     handle_uploads_thread.start()
 
-    time.sleep(1)
+    time.sleep(0.1)
     upload_url = upload['presigned_url']
     cmd = files.create_curl_upload_cmd(upload_url).replace('<ZIPFILE>', file)
     subprocess.call(shlex.split(cmd))
@@ -183,7 +183,7 @@ def test_processing(client, file, worker, mocksearch, test_user_auth):
     assert rv.status_code == 200
     upload = assert_upload(rv.data)
 
-    time.sleep(1)
+    time.sleep(0.1)
     upload_url = upload['presigned_url']
     cmd = files.create_curl_upload_cmd(upload_url).replace('<ZIPFILE>', file)
     subprocess.call(shlex.split(cmd))
diff --git a/tests/test_files.py b/tests/test_files.py
index 46324a4ab3b217bf5e4ac95833e9da87fc052045..6ee8f01cacc0871cd5b15f8538e2e81bdf13c9c2 100644
--- a/tests/test_files.py
+++ b/tests/test_files.py
@@ -148,7 +148,7 @@ def test_upload_notification(upload_id):
     handle_uploads_thread = Thread(target=handle_uploads)
     handle_uploads_thread.start()
 
-    time.sleep(1)
+    time.sleep(0.1)
     test_presigned_url(upload_id)
 
     handle_uploads_thread.join()
diff --git a/tests/test_normalizing.py b/tests/test_normalizing.py
index 79faeca9b3b892605b32141f52b58265348d8b9c..7f940884cf2fa4c26653eb9ccb1e65ea5000d10a 100644
--- a/tests/test_normalizing.py
+++ b/tests/test_normalizing.py
@@ -18,6 +18,7 @@ from nomad.parsing import LocalBackend
 from nomad.normalizing import normalizers
 
 from tests.test_parsing import parsed_vasp_example  # pylint: disable=unused-import
+from tests.test_parsing import parsed_template_example  # pylint: disable=unused-import
 from tests.test_parsing import parsed_example  # pylint: disable=unused-import
 
 
@@ -43,6 +44,11 @@ def normalized_example(parsed_example: LocalBackend) -> LocalBackend:
     return run_normalize(parsed_example)
 
 
+@pytest.fixture
+def normalized_template_example(parsed_template_example) -> LocalBackend:
+    return run_normalize(parsed_template_example)
+
+
 def assert_normalized(backend):
     assert backend.get_value('atom_species', 0) is not None
     assert backend.get_value('system_type', 0) is not None
diff --git a/tests/test_parsing.py b/tests/test_parsing.py
index e68be92990b08d49231c37614f5d1e124d62a082..9bb62b746f2e44a26ff44466221aaa8531a63a79 100644
--- a/tests/test_parsing.py
+++ b/tests/test_parsing.py
@@ -203,6 +203,12 @@ def parsed_vasp_example() -> LocalBackend:
         'parsers/vasp', '.dependencies/parsers/vasp/test/examples/xml/perovskite.xml')
 
 
+@pytest.fixture
+def parsed_template_example() -> LocalBackend:
+    return run_parser(
+        'parsers/template', 'tests/data/parsers/template.json')
+
+
 @pytest.fixture(params=parser_examples, ids=lambda spec: '%s-%s' % spec)
 def parsed_example(request) -> LocalBackend:
     parser_name, mainfile = request.param
diff --git a/tests/test_repo.py b/tests/test_repo.py
index 1f70f71b5774a57e0f3d3855b893c0a14945077c..246fca6572b3fd29a920ff8222af64afeddcd42c 100644
--- a/tests/test_repo.py
+++ b/tests/test_repo.py
@@ -23,13 +23,13 @@ from nomad import config
 from nomad.parsing import LocalBackend
 from nomad.repo import AlreadyExists, RepoCalc, key_mappings
 
-from tests.test_normalizing import normalized_vasp_example  # pylint: disable=unused-import
-from tests.test_parsing import parsed_vasp_example  # pylint: disable=unused-import
+from tests.test_normalizing import normalized_template_example  # pylint: disable=unused-import
+from tests.test_parsing import parsed_template_example  # pylint: disable=unused-import
 from tests.test_files import assert_not_exists
 
 
 @pytest.fixture(scope='function')
-def example_elastic_calc(normalized_vasp_example: LocalBackend, caplog) \
+def example_elastic_calc(normalized_template_example: LocalBackend, caplog) \
         -> Generator[RepoCalc, None, None]:
     try:
         caplog.set_level(logging.ERROR)
@@ -40,14 +40,15 @@ def example_elastic_calc(normalized_vasp_example: LocalBackend, caplog) \
         caplog.set_level(logging.WARNING)
 
     entry = RepoCalc.create_from_backend(
-        normalized_vasp_example,
+        normalized_template_example,
         upload_hash='test_upload_hash',
         calc_hash='test_calc_hash',
         upload_id='test_upload_id',
-        mainfile='/test/mainfile',
-        upload_time=datetime.now(),
-        staging=True, restricted=False, user_id='me')
-    time.sleep(1)  # eventually consistent?
+        additional=dict(
+            mainfile='/test/mainfile',
+            upload_time=datetime.now(),
+            staging=True, restricted=False, user_id='me'),
+        refresh='true')
 
     yield entry
 
@@ -77,17 +78,19 @@ def test_create_elasitc_calc(example_elastic_calc: RepoCalc):
 
 
 def test_create_existing_elastic_calc(
-        example_elastic_calc: RepoCalc, normalized_vasp_example, caplog):
+        example_elastic_calc: RepoCalc, normalized_template_example, caplog):
     try:
         caplog.set_level(logging.ERROR)
         RepoCalc.create_from_backend(
-            normalized_vasp_example,
+            normalized_template_example,
             upload_hash='test_upload_hash',
             calc_hash='test_calc_hash',
             upload_id='test_upload_id',
-            mainfile='/test/mainfile',
-            upload_time=datetime.now(),
-            staging=True, restricted=False, user_id='me')
+            additional=dict(
+                mainfile='/test/mainfile',
+                upload_time=datetime.now(),
+                staging=True, restricted=False, user_id='me'),
+            refresh='true')
         assert False
     except AlreadyExists:
         caplog.set_level(logging.WARNING)
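
A minimal usage sketch (not part of the diff) of the reworked RepoCalc.create_from_backend signature: values that are not available from the parser backend now go into the explicit additional dict, while any remaining keyword arguments (here refresh='true') are forwarded to the elasticsearch save operation. The backend variable and the placeholder hashes and user id are assumptions for illustration, mirroring the updated fixture in tests/test_repo.py.

# Sketch only: assumes `backend` is a parsed/normalized LocalBackend and that
# mongo/elasticsearch are configured as in the test fixtures.
from datetime import datetime

from nomad.repo import RepoCalc

entry = RepoCalc.create_from_backend(
    backend,                               # parsing/normalizing backend with the calc data
    additional=dict(                       # values not stored in the backend
        mainfile='/test/mainfile',
        upload_time=datetime.now(),
        staging=True, restricted=False, user_id='me'),
    upload_hash='test_upload_hash',
    calc_hash='test_calc_hash',
    upload_id='test_upload_id',
    refresh='true')                        # remaining kwargs go to the elasticsearch save()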