Commit 9c5c3656 authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Celery testworker now works. Fixed processing init upload handler.

parent 44ef3cd3
...@@ -59,7 +59,7 @@ rabbit_url = 'pyamqp://%s:%s@%s//' % (rabbit_user, rabbit_password, rabbit_host) ...@@ -59,7 +59,7 @@ rabbit_url = 'pyamqp://%s:%s@%s//' % (rabbit_user, rabbit_password, rabbit_host)
redis_url = 'redis://%s/0' % redis_host redis_url = 'redis://%s/0' % redis_host
celery = CeleryConfig( celery = CeleryConfig(
broker_url=redis_url, broker_url=rabbit_url,
backend_url=redis_url, backend_url=redis_url,
serializer='pickle' serializer='pickle'
) )
......
import logging
from nomad.processing import handle_uploads
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
handle_uploads()
...@@ -470,11 +470,9 @@ def parse(self, processing: CalcProcessing) -> CalcProcessing: ...@@ -470,11 +470,9 @@ def parse(self, processing: CalcProcessing) -> CalcProcessing:
return processing return processing
def start_upload_handler(quit=False): def handle_uploads(quit=False):
""" """
Starts a notification handler for uploads in a different thread. This handler Listens for new uploads in files and initiates their processing.
will initiate processing for all received upload events. The processing status
will be saved to the users db.
Arguments: Arguments:
quit: If true, will only handling one event and stop. Otherwise run forever. quit: If true, will only handling one event and stop. Otherwise run forever.
...@@ -489,7 +487,6 @@ def start_upload_handler(quit=False): ...@@ -489,7 +487,6 @@ def start_upload_handler(quit=False):
logger.error('Upload does not exist') logger.error('Upload does not exist')
raise Exception() raise Exception()
logger.error('%s' % upload.upload_time)
if upload.upload_time is not None: if upload.upload_time is not None:
logger.warn('Ignore upload notification, since file is already uploaded') logger.warn('Ignore upload notification, since file is already uploaded')
raise StopIteration raise StopIteration
...@@ -510,11 +507,5 @@ def start_upload_handler(quit=False): ...@@ -510,11 +507,5 @@ def start_upload_handler(quit=False):
raise StopIteration raise StopIteration
logger.debug('Initiated upload processing') logger.debug('Initiated upload processing')
logger = logging.getLogger(__name__)
logger.debug('Start upload put notification handler.') logger.debug('Start upload put notification handler.')
handle_upload_put(received_upload_id='provided by decorator') handle_upload_put(received_upload_id='provided by decorator')
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
start_upload_handler()
...@@ -7,12 +7,13 @@ import json ...@@ -7,12 +7,13 @@ import json
from mongoengine import connect from mongoengine import connect
from mongoengine.connection import disconnect from mongoengine.connection import disconnect
from nomad import config, api, files from nomad import config, api, files, processing
from tests.test_processing import example_files from tests.test_processing import example_files
from tests.test_files import assert_exists from tests.test_files import assert_exists
# import fixtures # import fixtures
from tests.test_files import clear_files, archive_id # pylint: disable=unused-import from tests.test_files import clear_files, archive_id # pylint: disable=unused-import
from tests.test_processing import celery_config, celery_includes # pylint: disable=unused-import
@pytest.fixture @pytest.fixture
...@@ -108,8 +109,9 @@ def test_upload_to_upload(client, file): ...@@ -108,8 +109,9 @@ def test_upload_to_upload(client, file):
@pytest.mark.parametrize("file", example_files) @pytest.mark.parametrize("file", example_files)
@pytest.mark.timeout(10) @pytest.mark.timeout(10)
def test_processing(client, file): def test_processing(client, file, celery_session_worker):
handle_uploads_thread = api.start_upload_handler(quit=True) handle_uploads_thread = Thread(target=lambda: processing.handle_uploads(quit=True))
handle_uploads_thread.start()
rv = client.post('/uploads') rv = client.post('/uploads')
assert rv.status_code == 200 assert rv.status_code == 200
...@@ -134,11 +136,11 @@ def test_processing(client, file): ...@@ -134,11 +136,11 @@ def test_processing(client, file):
if upload['processing']['status'] in ['SUCCESS', 'FAILURE']: if upload['processing']['status'] in ['SUCCESS', 'FAILURE']:
break break
processing = upload['processing'] proc = upload['processing']
assert processing['status'] == 'SUCCESS' assert proc['status'] == 'SUCCESS'
assert 'results' in processing assert 'results' in proc
assert processing['results'] is not None assert proc['results'] is not None
assert processing['current_task'] == 'nomad.processing.close_upload' assert proc['current_task'] == 'nomad.processing.close_upload'
assert_exists(config.files.uploads_bucket, upload['id']) assert_exists(config.files.uploads_bucket, upload['id'])
......
...@@ -34,13 +34,6 @@ from tests.test_files import clear_files # pylint: disable=unused-import ...@@ -34,13 +34,6 @@ from tests.test_files import clear_files # pylint: disable=unused-import
example_files = [empty_file, example_file] example_files = [empty_file, example_file]
# disable test worker for now, see docstring above
# all further celery_* fixtures become effectivly mute.
@pytest.fixture(scope='session')
def celery_session_worker():
return None
@pytest.fixture(scope='session') @pytest.fixture(scope='session')
def celery_includes(): def celery_includes():
return ['nomad.processing'] return ['nomad.processing']
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment