Commit cc380606 authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Refactored celery config and processing tests.

parent 8199af7c
...@@ -55,6 +55,16 @@ ...@@ -55,6 +55,16 @@
"-sv", "tests/test_search.py" "-sv", "tests/test_search.py"
] ]
}, },
{
"name": "Python: tests/test_celery.py",
"type": "python",
"request": "launch",
"cwd": "${workspaceFolder}",
"program": "${workspaceFolder}/.pyenv/bin/pytest",
"args": [
"-sv", "tests/test_celery.py"
]
},
{ {
"name": "Python: Current File", "name": "Python: Current File",
"type": "python", "type": "python",
......
...@@ -24,8 +24,7 @@ FilesConfig = namedtuple( ...@@ -24,8 +24,7 @@ FilesConfig = namedtuple(
'FilesConfig', ['uploads_bucket', 'repository_bucket', 'archive_bucket', 'compress_archive']) 'FilesConfig', ['uploads_bucket', 'repository_bucket', 'archive_bucket', 'compress_archive'])
""" API independent configuration for the object storage. """ """ API independent configuration for the object storage. """
CeleryConfig = namedtuple('Celery', [ CeleryConfig = namedtuple('Celery', ['broker_url', 'backend_url', 'serializer'])
'rabbit_host', 'rabbit_port', 'rabbit_user', 'rabbit_password', 'redis_host'])
""" Used to configure the RabbitMQ and Redis backends for celery. """ """ Used to configure the RabbitMQ and Redis backends for celery. """
MinioConfig = namedtuple('Minio', ['host', 'port', 'accesskey', 'secret']) MinioConfig = namedtuple('Minio', ['host', 'port', 'accesskey', 'secret'])
...@@ -49,13 +48,22 @@ files = FilesConfig( ...@@ -49,13 +48,22 @@ files = FilesConfig(
archive_bucket='archive', archive_bucket='archive',
compress_archive=False compress_archive=False
) )
rabbit_host = os.environ.get('NOMAD_RABBITMQ_HOST', 'localhost')
rabbit_port = os.environ.get('NOMAD_RABBITMQ_PORT', None)
rabbit_user = 'rabbitmq'
rabbit_password = 'rabbitmq'
redis_host = os.environ.get('NOMAD_REDIS_HOST', 'localhost')
rabbit_url = 'pyamqp://%s:%s@%s//' % (rabbit_user, rabbit_password, rabbit_host)
redis_url = 'redis://%s/0' % redis_host
celery = CeleryConfig( celery = CeleryConfig(
rabbit_host=os.environ.get('NOMAD_RABBITMQ_HOST', 'localhost'), broker_url=redis_url,
rabbit_port=os.environ.get('NOMAD_RABBITMQ_PORT', None), backend_url=redis_url,
rabbit_user='rabbitmq', serializer='pickle'
rabbit_password='rabbitmq',
redis_host=os.environ.get('NOMAD_REDIS_HOST', 'localhost'),
) )
minio = MinioConfig( minio = MinioConfig(
host=os.environ.get('NOMAD_MINIO_HOST', 'localhost'), host=os.environ.get('NOMAD_MINIO_HOST', 'localhost'),
port=int(os.environ.get('NOMAD_MINIO_PORT', '9007')), port=int(os.environ.get('NOMAD_MINIO_PORT', '9007')),
......
...@@ -634,13 +634,8 @@ parsers = [ ...@@ -634,13 +634,8 @@ parsers = [
Parser( Parser(
python_git=dependencies['parsers/exciting'], python_git=dependencies['parsers/exciting'],
parser_class_name='vaspparser.VASPParser', parser_class_name='vaspparser.VASPParser',
main_file_re=r'^.*\.xml$', main_file_re=r'^.*\.todo$',
main_contents_re=( main_contents_re=(r'^todo')
r'^\s*<\?xml version="1\.0" encoding="ISO-8859-1"\?>\s*'
r'?\s*<modeling>'
r'?\s*<generator>'
r'?\s*<i name="program" type="string">\s*vasp\s*</i>'
r'?')
), ),
] ]
""" Instanciation and constructor based config of all parsers. """ """ Instanciation and constructor based config of all parsers. """
......
...@@ -46,8 +46,8 @@ from nomad.files import Upload, UploadError ...@@ -46,8 +46,8 @@ from nomad.files import Upload, UploadError
from nomad import files, utils from nomad import files, utils
from nomad.parsing import parsers, parser_dict from nomad.parsing import parsers, parser_dict
from nomad.normalizing import normalizers from nomad.normalizing import normalizers
from nomad import search, users from nomad import search
import nomad.patch # pylint: disable=ununsed-import import nomad.patch # pylint: disable=unused-import
# The legacy nomad code uses a logger called 'nomad'. We do not want that this # The legacy nomad code uses a logger called 'nomad'. We do not want that this
# logger becomes a child of this logger due to its module name starting with 'nomad.' # logger becomes a child of this logger due to its module name starting with 'nomad.'
...@@ -67,15 +67,12 @@ if config.logstash.enabled: ...@@ -67,15 +67,12 @@ if config.logstash.enabled:
after_setup_logger.connect(initialize_logstash) after_setup_logger.connect(initialize_logstash)
broker_url = 'pyamqp://%s:%s@%s//' % ( app = Celery('nomad.processing', backend=config.celery.backend_url, broker=config.celery.broker_url)
config.celery.rabbit_user, config.celery.rabbit_password, config.celery.rabbit_host) app.add_defaults(dict(
backend_url = 'redis://%s/0' % config.celery.redis_host accept_content=['json', 'pickle'],
app = Celery('nomad.processing', backend=backend_url, broker=broker_url) task_serializer=config.celery.serializer,
app.conf.update( result_serializer=config.celery.serializer,
accept_content=['pickle'], ))
task_serializer='pickle',
result_serializer='pickle',
)
ProcessingTaskResult = List[Tuple[str, List[str]]] ProcessingTaskResult = List[Tuple[str, List[str]]]
""" A list of parser/normalizer (status, errors) tuples. """ """ A list of parser/normalizer (status, errors) tuples. """
...@@ -261,6 +258,8 @@ def open_upload(task: Task, processing: UploadProcessing) -> UploadProcessing: ...@@ -261,6 +258,8 @@ def open_upload(task: Task, processing: UploadProcessing) -> UploadProcessing:
logger.debug('Could not open upload %s: %s' % (processing.upload_id, e)) logger.debug('Could not open upload %s: %s' % (processing.upload_id, e))
return processing.fail(e) return processing.fail(e)
logger.debug('Opened upload %s' % processing.upload_id)
try: try:
processing.upload_hash = upload.hash() processing.upload_hash = upload.hash()
except UploadError as e: except UploadError as e:
...@@ -268,6 +267,7 @@ def open_upload(task: Task, processing: UploadProcessing) -> UploadProcessing: ...@@ -268,6 +267,7 @@ def open_upload(task: Task, processing: UploadProcessing) -> UploadProcessing:
return processing.fail(e) return processing.fail(e)
try: try:
# TODO: deal with multiple possible parser specs
processing.parse_specs = list() processing.parse_specs = list()
for filename in upload.filelist: for filename in upload.filelist:
for parser in parsers: for parser in parsers:
...@@ -303,6 +303,8 @@ def close_upload( ...@@ -303,6 +303,8 @@ def close_upload(
logger.error('Could not close upload %s: %s' % (processing.upload_id, e)) logger.error('Could not close upload %s: %s' % (processing.upload_id, e))
return processing.fail(e) return processing.fail(e)
logger.debug('Closed upload %s' % processing.upload_id)
return processing return processing
...@@ -368,10 +370,8 @@ def parse(processing: UploadProcessing, parse_spec: ParseSpec) -> ProcessingTask ...@@ -368,10 +370,8 @@ def parse(processing: UploadProcessing, parse_spec: ParseSpec) -> ProcessingTask
(upload_hash, mainfile, e), exc_info=e) (upload_hash, mainfile, e), exc_info=e)
results.append(('IndexFailed', [e.__str__()])) results.append(('IndexFailed', [e.__str__()]))
archive_id = '%s/%s' % (upload_hash, calc_hash)
logger.debug('Written results of %s for %s to %s.' % (parser, mainfile, archive_id))
# calc data persistence # calc data persistence
archive_id = '%s/%s' % (upload_hash, calc_hash)
try: try:
with files.write_archive_json(archive_id) as out: with files.write_archive_json(archive_id) as out:
parser_backend.write_json(out, pretty=True) parser_backend.write_json(out, pretty=True)
...@@ -382,4 +382,11 @@ def parse(processing: UploadProcessing, parse_spec: ParseSpec) -> ProcessingTask ...@@ -382,4 +382,11 @@ def parse(processing: UploadProcessing, parse_spec: ParseSpec) -> ProcessingTask
(archive_id, mainfile, parser), exc_info=e) (archive_id, mainfile, parser), exc_info=e)
results.append(('PersistenceFailed', [e.__str__()])) results.append(('PersistenceFailed', [e.__str__()]))
logger.debug('Written results of %s for %s to %s.' % (parser, mainfile, archive_id))
return results return results
@app.task()
def mul(x, y):
return x * y
...@@ -12,6 +12,12 @@ ...@@ -12,6 +12,12 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
"""
To run this test, a celery worker must be running. The test worker provided by
the celery pytest plugin is currently not working. It results on a timeout when
reading from the redis result backend, even though all task apperently ended successfully.
"""
from typing import Generator from typing import Generator
import pytest import pytest
from minio import ResponseError from minio import ResponseError
...@@ -27,6 +33,29 @@ from tests.test_search import index # pylint: disable=unused-import ...@@ -27,6 +33,29 @@ from tests.test_search import index # pylint: disable=unused-import
example_files = ['data/examples_vasp.zip', 'data/empty.zip'] example_files = ['data/examples_vasp.zip', 'data/empty.zip']
# disable test worker for now, see docstring above
# all further celery_* fixtures become effectivly mute.
@pytest.fixture(scope='session')
def celery_session_worker():
return None
@pytest.fixture(scope='session')
def celery_includes():
return ['nomad.processing']
@pytest.fixture(scope='session')
def celery_config():
return {
'broker_url': config.celery.broker_url,
'result_backend': config.celery.backend_url,
'accept_content': ['json', 'pickle'],
'task_serializer': config.celery.serializer,
'result_serializer': config.celery.serializer
}
@pytest.fixture(scope='function', params=example_files) @pytest.fixture(scope='function', params=example_files)
def uploaded_id(request) -> Generator[str, None, None]: def uploaded_id(request) -> Generator[str, None, None]:
example_file = request.param example_file = request.param
...@@ -51,10 +80,11 @@ def uploaded_id(request) -> Generator[str, None, None]: ...@@ -51,10 +80,11 @@ def uploaded_id(request) -> Generator[str, None, None]:
pass pass
def test_processing(uploaded_id): def test_processing(uploaded_id, celery_session_worker):
run = UploadProcessing(uploaded_id) run = UploadProcessing(uploaded_id)
run.start() run.start()
run.get(timeout=30)
run.get(timeout=10)
assert run.ready() assert run.ready()
assert run.task_name == 'nomad.processing.close_upload' assert run.task_name == 'nomad.processing.close_upload'
...@@ -69,10 +99,12 @@ def test_processing(uploaded_id): ...@@ -69,10 +99,12 @@ def test_processing(uploaded_id):
run.forget() run.forget()
def test_process_non_existing(): def test_process_non_existing(celery_session_worker):
run = UploadProcessing('__does_not_exist') run = UploadProcessing('__does_not_exist')
run.start() run.start()
run.get(timeout=30)
run.get(timeout=10)
assert run.ready() assert run.ready()
run.forget() run.forget()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment