Commit dd4919e7 authored by Markus Scheidgen

Better fixtures without arbitrary 'sleep' fixes.

parent 78247f73
......@@ -58,13 +58,13 @@
]
},
{
"name": "Python: tests/test_parsing.py fhi",
"name": "Python: tests/processing/test_base.py",
"type": "python",
"request": "launch",
"cwd": "${workspaceFolder}",
"program": "${workspaceFolder}/.pyenv/bin/pytest",
"args": [
"-sv", "tests/test_parsing.py::test_parser[parsers/fhi-aims-.dependencies/parsers/fhi-aims/test/examples/Au2_non-periodic_geometry_optimization.out]"
"-sv", "tests/processing/test_base.py"
]
},
{
......
......@@ -281,7 +281,7 @@ class Proc(Document, metaclass=ProcMetaclass):
else:
return updated_raw[field], [updated_raw[field] for field in other_fields]
def block_until_complete(self, interval=0.1):
def block_until_complete(self, interval=0.01):
"""
Reloads the process constrantly until it sees a completed process. Should be
used with care as it can block indefinetly. Just intended for testing purposes.
......
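For context, block_until_complete is essentially a polling loop; a minimal sketch, assuming the Proc document exposes a completed flag (a hypothetical name here, not confirmed by this diff), could look like this:

import time

def block_until_complete(self, interval=0.01):
    # Hypothetical sketch: reload the mongoengine document until the
    # process reports completion; `completed` is an assumed property name.
    self.reload()
    while not self.completed:
        time.sleep(interval)
        self.reload()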
......@@ -163,17 +163,19 @@ class Calc(Proc):
@task
def archiving(self):
upload_hash, calc_hash = self.archive_id.split('/')
# persist to elastic search
RepoCalc.create_from_backend(
self._parser_backend,
upload_hash=upload_hash,
calc_hash=calc_hash,
upload_id=self.upload_id,
additional = dict(
mainfile=self.mainfile,
upload_time=self._upload.upload_time,
staging=True,
restricted=False,
user_id=self._upload.user_id)
# persist to elastic search
RepoCalc.create_from_backend(
self._parser_backend,
additional=additional,
upload_hash=upload_hash,
calc_hash=calc_hash,
upload_id=self.upload_id)
# persist the archive
with files.write_archive_json(self.archive_id) as out:
......
......@@ -23,6 +23,7 @@ is an elasticsearch_dsl document that is used to represent repository index entr
:members:
"""
from typing import Dict, Any
import sys
from elasticsearch.exceptions import ConflictError, RequestError, ConnectionTimeout
from elasticsearch_dsl import Document as ElasticDocument, Search, Date, Keyword, Boolean, \
......@@ -88,7 +89,8 @@ class RepoCalc(ElasticDocument):
@classmethod
def create_from_backend(
cls, backend: LocalBackend, upload_id: str, upload_hash: str, calc_hash: str,
cls, backend: LocalBackend, additional: Dict[str, Any],
upload_id: str, upload_hash: str, calc_hash: str,
**kwargs) -> 'RepoCalc':
"""
Create a new calculation instance in elastic search. The data from the given backend
......@@ -97,11 +99,12 @@ class RepoCalc(ElasticDocument):
Arguments:
backend: The parsing/normalizing backend that contains the calculation data.
additional: Additional arguments not stored in the backend. E.g. ``user_id``,
``staging``, ``restricted``
upload_hash: The upload hash of the originating upload.
upload_id: The upload id of the originating upload.
calc_hash: The upload unique hash for this calculation.
kwargs: Additional arguments not stored in the backend. E.g. ``user_id``,
``staging``, ``restricted``
kwargs: Arguments that are passed to the elasticsearch index operation.
Raises:
AlreadyExists: If the calculation already exists in elastic search. We use
......@@ -109,15 +112,15 @@ class RepoCalc(ElasticDocument):
``archive_id``.
"""
assert upload_hash is not None and calc_hash is not None and upload_id is not None
kwargs.update(dict(upload_hash=upload_hash, calc_hash=calc_hash, upload_id=upload_id))
additional.update(dict(upload_hash=upload_hash, calc_hash=calc_hash, upload_id=upload_id))
# prepare the entry with all necessary properties from the backend
calc = cls(meta=dict(id='%s/%s' % (upload_hash, calc_hash)))
for property in cls._doc_type.mapping:
property = key_mappings.get(property, property)
if property in kwargs:
value = kwargs[property]
if property in additional:
value = additional[property]
else:
try:
value = backend.get_value(property, 0)
......@@ -140,7 +143,7 @@ class RepoCalc(ElasticDocument):
e_after_retries = None
for _ in range(0, 2):
try:
calc.save(op_type='create')
calc.save(op_type='create', **kwargs)
e_after_retries = None
break
except ConnectionTimeout as e:
......
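With the revised signature, a call mirrors the usage in the archiving task and the tests in this commit: additional carries the repository metadata that is not in the backend, while extra keyword arguments such as refresh='true' are forwarded via **kwargs to the elasticsearch save operation. A usage sketch with placeholder values:

from datetime import datetime

# Usage sketch based on the tests in this commit; `backend` is a parsed and
# normalized LocalBackend, and all values below are placeholders.
entry = RepoCalc.create_from_backend(
    backend,
    additional=dict(
        mainfile='/test/mainfile',
        upload_time=datetime.now(),
        staging=True, restricted=False, user_id='me'),
    upload_hash='test_upload_hash',
    calc_hash='test_calc_hash',
    upload_id='test_upload_id',
    refresh='true')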
......@@ -107,24 +107,20 @@ if not _logging_is_configured:
def logger_factory(*args):
logger = default_factory(*args)
if 'pytest' in sys.modules:
logger.setLevel(logging.WARNING)
else:
logger.setLevel(logging.DEBUG)
logger.setLevel(logging.DEBUG)
return logger
structlog.configure(processors=log_processors, logger_factory=logger_factory)
# configure logging in general
logging.basicConfig(level=logging.WARNING)
logging.basicConfig(level=logging.WARNING, stream=sys.stdout)
root = logging.getLogger()
root.setLevel(config.logstash.level)
for handler in root.handlers:
handler.setLevel(logging.WARNING if 'pytest' not in sys.modules else logging.CRITICAL)
# configure logstash
if config.logstash.enabled:
root = logging.getLogger()
for handler in root.handlers:
handler.setLevel(logging.WARNING)
root.setLevel(config.logstash.level)
if config.logstash.enabled and 'pytest' not in sys.modules:
add_logstash_handler(root)
root.info('Structlog configured for logstash')
......
import pytest
from mongoengine import connect
from mongoengine.connection import disconnect
import time
from nomad import config
......@@ -19,15 +18,42 @@ def celery_config():
@pytest.fixture(scope='function')
def worker(celery_session_worker):
def purged_queue(celery_app):
"""
Extension of the built-in celery_session_worker fixture that adds a sleep to consume
bleeding tasks.
Processes might be completed (and therefore the test itself) before child
processes are finished. Therefore, open task requests might bleed into the next test.
Purges all pending tasks of the celery app before each test. This is necessary to
remove tasks from the queue that might be 'left over' from prior tests.
"""
celery_app.control.purge()
yield
@pytest.fixture(scope='function')
def patched_celery(monkeypatch):
# There is a bug in celery that prevents using the celery_worker fixture for multiple
# tests: https://github.com/celery/celery/issues/4088
# The bug has a fix from Aug 2018, but it is not yet released (TODO).
# We monkeypatch a similar solution here.
def add_reader(self, fds, callback, *args):
from kombu.utils.eventio import ERR, READ, WRITE, poll
if self.poller is None:
self.poller = poll()
return self.add(fds, callback, READ | ERR, args)
monkeypatch.setattr('kombu.asynchronous.hub.Hub.add_reader', add_reader)
yield
@pytest.fixture(scope='function')
def worker(patched_celery, purged_queue, celery_worker):
"""
Extension of the celery_worker fixture that ensures a clean task queue before yielding.
"""
# This won't work with the session_worker: it will already have old/unexecuted tasks
# taken from the queue and might resubmit them. Therefore, purging the queue won't
# help much.
yield
time.sleep(0.5)
@pytest.fixture(scope='function', autouse=True)
......
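Tests then simply request the function-scoped worker fixture and wait via block_until_complete, with no arbitrary sleep; this mirrors test_simple_process further down in this commit:

# Usage sketch mirroring test_simple_process below: the worker fixture starts
# a celery worker on a purged queue, so the test can block until completion.
def test_simple_process(worker):
    p = SimpleProc.create()
    p.process()
    p.block_until_complete()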
......@@ -2,6 +2,8 @@ import pytest
from mongoengine import connect, IntField, ReferenceField
from mongoengine.connection import disconnect
import time
import logging
import json
from nomad import config
from nomad.processing.base import Proc, process, task, SUCCESS, FAILURE, RUNNING, PENDING
......@@ -52,11 +54,18 @@ class FailTasks(Proc):
self.fail('fail fail fail')
def test_fail():
def test_fail(caplog):
caplog.set_level(logging.CRITICAL, logger='nomad.processing.base')
p = FailTasks.create()
p.will_fail()
assert_proc(p, 'will_fail', FAILURE, errors=1)
has_log = False
for record in caplog.records:
if record.levelname == 'ERROR':
has_log = True
assert json.loads(record.msg)['event'] == 'task failed'
assert has_log
class SimpleProc(Proc):
......@@ -74,7 +83,7 @@ class SimpleProc(Proc):
pass
def test_simple_process(celery_session_worker):
def test_simple_process(worker):
p = SimpleProc.create()
p.process()
p.block_until_complete()
......@@ -88,7 +97,8 @@ class TaskInProc(Proc):
pass
def test_task_as_proc(celery_session_worker):
# @pytest.mark.timeout(5)
def test_task_as_proc(worker):
p = TaskInProc.create()
p.process()
p.block_until_complete()
......@@ -122,9 +132,12 @@ class ChildProc(Proc):
self.parent.on_child_complete()
def test_counter(worker):
# @pytest.mark.timeout(5)
def test_counter(worker, caplog):
p = ParentProc.create()
p.spawn_children()
p.block_until_complete()
assert_proc(p, 'after_children')
for record in caplog.records:
assert record.levelname not in ['WARNING', 'ERROR', 'CRITICAL']
......@@ -163,7 +163,7 @@ def test_upload_to_upload(client, file, test_user_auth):
handle_uploads_thread = Thread(target=handle_uploads)
handle_uploads_thread.start()
time.sleep(1)
time.sleep(0.1)
upload_url = upload['presigned_url']
cmd = files.create_curl_upload_cmd(upload_url).replace('<ZIPFILE>', file)
subprocess.call(shlex.split(cmd))
......@@ -183,7 +183,7 @@ def test_processing(client, file, worker, mocksearch, test_user_auth):
assert rv.status_code == 200
upload = assert_upload(rv.data)
time.sleep(1)
time.sleep(0.1)
upload_url = upload['presigned_url']
cmd = files.create_curl_upload_cmd(upload_url).replace('<ZIPFILE>', file)
subprocess.call(shlex.split(cmd))
......
......@@ -148,7 +148,7 @@ def test_upload_notification(upload_id):
handle_uploads_thread = Thread(target=handle_uploads)
handle_uploads_thread.start()
time.sleep(1)
time.sleep(0.1)
test_presigned_url(upload_id)
handle_uploads_thread.join()
......
......@@ -18,6 +18,7 @@ from nomad.parsing import LocalBackend
from nomad.normalizing import normalizers
from tests.test_parsing import parsed_vasp_example # pylint: disable=unused-import
from tests.test_parsing import parsed_template_example # pylint: disable=unused-import
from tests.test_parsing import parsed_example # pylint: disable=unused-import
......@@ -43,6 +44,11 @@ def normalized_example(parsed_example: LocalBackend) -> LocalBackend:
return run_normalize(parsed_example)
@pytest.fixture
def normalized_template_example(parsed_template_example) -> LocalBackend:
return run_normalize(parsed_template_example)
def assert_normalized(backend):
assert backend.get_value('atom_species', 0) is not None
assert backend.get_value('system_type', 0) is not None
......
......@@ -203,6 +203,12 @@ def parsed_vasp_example() -> LocalBackend:
'parsers/vasp', '.dependencies/parsers/vasp/test/examples/xml/perovskite.xml')
@pytest.fixture
def parsed_template_example() -> LocalBackend:
return run_parser(
'parsers/template', 'tests/data/parsers/template.json')
@pytest.fixture(params=parser_examples, ids=lambda spec: '%s-%s' % spec)
def parsed_example(request) -> LocalBackend:
parser_name, mainfile = request.param
......
......@@ -23,13 +23,13 @@ from nomad import config
from nomad.parsing import LocalBackend
from nomad.repo import AlreadyExists, RepoCalc, key_mappings
from tests.test_normalizing import normalized_vasp_example # pylint: disable=unused-import
from tests.test_parsing import parsed_vasp_example # pylint: disable=unused-import
from tests.test_normalizing import normalized_template_example # pylint: disable=unused-import
from tests.test_parsing import parsed_template_example # pylint: disable=unused-import
from tests.test_files import assert_not_exists
@pytest.fixture(scope='function')
def example_elastic_calc(normalized_vasp_example: LocalBackend, caplog) \
def example_elastic_calc(normalized_template_example: LocalBackend, caplog) \
-> Generator[RepoCalc, None, None]:
try:
caplog.set_level(logging.ERROR)
......@@ -40,14 +40,15 @@ def example_elastic_calc(normalized_vasp_example: LocalBackend, caplog) \
caplog.set_level(logging.WARNING)
entry = RepoCalc.create_from_backend(
normalized_vasp_example,
normalized_template_example,
upload_hash='test_upload_hash',
calc_hash='test_calc_hash',
upload_id='test_upload_id',
mainfile='/test/mainfile',
upload_time=datetime.now(),
staging=True, restricted=False, user_id='me')
time.sleep(1) # eventually consistent?
additional=dict(
mainfile='/test/mainfile',
upload_time=datetime.now(),
staging=True, restricted=False, user_id='me'),
refresh='true')
yield entry
......@@ -77,17 +78,19 @@ def test_create_elasitc_calc(example_elastic_calc: RepoCalc):
def test_create_existing_elastic_calc(
example_elastic_calc: RepoCalc, normalized_vasp_example, caplog):
example_elastic_calc: RepoCalc, normalized_template_example, caplog):
try:
caplog.set_level(logging.ERROR)
RepoCalc.create_from_backend(
normalized_vasp_example,
normalized_template_example,
upload_hash='test_upload_hash',
calc_hash='test_calc_hash',
upload_id='test_upload_id',
mainfile='/test/mainfile',
upload_time=datetime.now(),
staging=True, restricted=False, user_id='me')
additional=dict(
mainfile='/test/mainfile',
upload_time=datetime.now(),
staging=True, restricted=False, user_id='me'),
refresh='true')
assert False
except AlreadyExists:
caplog.set_level(logging.WARNING)
......