diff --git a/.vscode/launch.json b/.vscode/launch.json
index f4d81103203c434c1290d071ba825b9cbfac903c..1cc5a5ab83830bceb0dccfc2584fe8a2adcfa9d6 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -55,6 +55,16 @@
         "-sv", "tests/test_search.py"
       ]
     },
+    {
+      "name": "Python: tests/test_celery.py",
+      "type": "python",
+      "request": "launch",
+      "cwd": "${workspaceFolder}",
+      "program": "${workspaceFolder}/.pyenv/bin/pytest",
+      "args": [
+        "-sv", "tests/test_celery.py"
+      ]
+    },
     {
       "name": "Python: Current File",
       "type": "python",
diff --git a/nomad/config.py b/nomad/config.py
index 8205132e174b8e12a10757e87dc73ce1c9c9550a..69984077847b72527effabd2de837daa0a1b6b7f 100644
--- a/nomad/config.py
+++ b/nomad/config.py
@@ -24,8 +24,7 @@ FilesConfig = namedtuple(
     'FilesConfig', ['uploads_bucket', 'repository_bucket', 'archive_bucket', 'compress_archive'])
 """ API independent configuration for the object storage. """
 
-CeleryConfig = namedtuple('Celery', [
-    'rabbit_host', 'rabbit_port', 'rabbit_user', 'rabbit_password', 'redis_host'])
+CeleryConfig = namedtuple('Celery', ['broker_url', 'backend_url', 'serializer'])
 """ Used to configure the RabbitMQ and Redis backends for celery. """
 
 MinioConfig = namedtuple('Minio', ['host', 'port', 'accesskey', 'secret'])
@@ -49,13 +48,22 @@ files = FilesConfig(
     archive_bucket='archive',
     compress_archive=False
 )
+
+rabbit_host = os.environ.get('NOMAD_RABBITMQ_HOST', 'localhost')
+rabbit_port = os.environ.get('NOMAD_RABBITMQ_PORT', None)
+rabbit_user = 'rabbitmq'
+rabbit_password = 'rabbitmq'
+redis_host = os.environ.get('NOMAD_REDIS_HOST', 'localhost')
+
+rabbit_url = 'pyamqp://%s:%s@%s//' % (rabbit_user, rabbit_password, rabbit_host)
+redis_url = 'redis://%s/0' % redis_host
+
 celery = CeleryConfig(
-    rabbit_host=os.environ.get('NOMAD_RABBITMQ_HOST', 'localhost'),
-    rabbit_port=os.environ.get('NOMAD_RABBITMQ_PORT', None),
-    rabbit_user='rabbitmq',
-    rabbit_password='rabbitmq',
-    redis_host=os.environ.get('NOMAD_REDIS_HOST', 'localhost'),
+    broker_url=redis_url,
+    backend_url=redis_url,
+    serializer='pickle'
 )
+
 minio = MinioConfig(
     host=os.environ.get('NOMAD_MINIO_HOST', 'localhost'),
     port=int(os.environ.get('NOMAD_MINIO_PORT', '9007')),
diff --git a/nomad/parsing.py b/nomad/parsing.py
index 86151fbe7a1ad33e88c75e8721d127bbab2597cc..ec469362761d6382c0131c8e44999eb24eba1a4d 100644
--- a/nomad/parsing.py
+++ b/nomad/parsing.py
@@ -634,13 +634,8 @@ parsers = [
     Parser(
         python_git=dependencies['parsers/exciting'],
         parser_class_name='vaspparser.VASPParser',
-        main_file_re=r'^.*\.xml$',
-        main_contents_re=(
-            r'^\s*<\?xml version="1\.0" encoding="ISO-8859-1"\?>\s*'
-            r'?\s*<modeling>'
-            r'?\s*<generator>'
-            r'?\s*<i name="program" type="string">\s*vasp\s*</i>'
-            r'?')
+        main_file_re=r'^.*\.todo$',
+        main_contents_re=(r'^todo')
     ),
 ]
 """ Instanciation and constructor based config of all parsers. """
diff --git a/nomad/processing.py b/nomad/processing.py
index 5fb54537e43be937ad5ae5eda82c81c3f98477eb..0a0aa5f7b49d33453cc3f4c76fcdaead3dc2d780 100644
--- a/nomad/processing.py
+++ b/nomad/processing.py
@@ -46,8 +46,8 @@ from nomad.files import Upload, UploadError
 from nomad import files, utils
 from nomad.parsing import parsers, parser_dict
 from nomad.normalizing import normalizers
-from nomad import search, users
-import nomad.patch  # pylint: disable=ununsed-import
+from nomad import search
+import nomad.patch  # pylint: disable=unused-import
 
 # The legacy nomad code uses a logger called 'nomad'. We do not want that this
 # logger becomes a child of this logger due to its module name starting with 'nomad.'
@@ -67,15 +67,12 @@ if config.logstash.enabled:
 
     after_setup_logger.connect(initialize_logstash)
 
-broker_url = 'pyamqp://%s:%s@%s//' % (
-    config.celery.rabbit_user, config.celery.rabbit_password, config.celery.rabbit_host)
-backend_url = 'redis://%s/0' % config.celery.redis_host
-app = Celery('nomad.processing', backend=backend_url, broker=broker_url)
-app.conf.update(
-    accept_content=['pickle'],
-    task_serializer='pickle',
-    result_serializer='pickle',
-)
+app = Celery('nomad.processing', backend=config.celery.backend_url, broker=config.celery.broker_url)
+app.add_defaults(dict(
+    accept_content=['json', 'pickle'],
+    task_serializer=config.celery.serializer,
+    result_serializer=config.celery.serializer,
+))
 
 ProcessingTaskResult = List[Tuple[str, List[str]]]
 """ A list of parser/normalizer (status, errors) tuples. """
@@ -261,6 +258,8 @@ def open_upload(task: Task, processing: UploadProcessing) -> UploadProcessing:
         logger.debug('Could not open upload %s: %s' % (processing.upload_id, e))
         return processing.fail(e)
 
+    logger.debug('Opened upload %s' % processing.upload_id)
+
     try:
         processing.upload_hash = upload.hash()
     except UploadError as e:
@@ -268,6 +267,7 @@
         return processing.fail(e)
 
     try:
+        # TODO: deal with multiple possible parser specs
         processing.parse_specs = list()
         for filename in upload.filelist:
             for parser in parsers:
@@ -303,6 +303,8 @@ def close_upload(
         logger.error('Could not close upload %s: %s' % (processing.upload_id, e))
         return processing.fail(e)
 
+    logger.debug('Closed upload %s' % processing.upload_id)
+
     return processing
 
 
@@ -368,10 +370,8 @@ def parse(processing: UploadProcessing, parse_spec: ParseSpec) -> ProcessingTask
                      (upload_hash, mainfile, e), exc_info=e)
         results.append(('IndexFailed', [e.__str__()]))
 
-    archive_id = '%s/%s' % (upload_hash, calc_hash)
-    logger.debug('Written results of %s for %s to %s.' % (parser, mainfile, archive_id))
-
     # calc data persistence
+    archive_id = '%s/%s' % (upload_hash, calc_hash)
     try:
         with files.write_archive_json(archive_id) as out:
             parser_backend.write_json(out, pretty=True)
@@ -382,4 +382,11 @@
                      (archive_id, mainfile, parser), exc_info=e)
         results.append(('PersistenceFailed', [e.__str__()]))
 
+    logger.debug('Written results of %s for %s to %s.' % (parser, mainfile, archive_id))
+
     return results
+
+
+@app.task()
+def mul(x, y):
+    return x * y
diff --git a/tests/test_processing.py b/tests/test_processing.py
index a3834718b1b91770bb3c04029bc7b8b848f018fc..a127a5d8ff5b8cf191541b4edcdbe81085c7d2e5 100644
--- a/tests/test_processing.py
+++ b/tests/test_processing.py
@@ -12,6 +12,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+"""
+To run this test, a celery worker must be running. The test worker provided by
+the celery pytest plugin is currently not working. It results in a timeout when
+reading from the redis result backend, even though all tasks apparently ended successfully.
+"""
+
 from typing import Generator
 import pytest
 from minio import ResponseError
@@ -27,6 +33,29 @@ from tests.test_search import index  # pylint: disable=unused-import
 
 example_files = ['data/examples_vasp.zip', 'data/empty.zip']
 
+
+# disable the test worker for now, see docstring above;
+# all further celery_* fixtures become effectively moot.
+@pytest.fixture(scope='session')
+def celery_session_worker():
+    return None
+
+
+@pytest.fixture(scope='session')
+def celery_includes():
+    return ['nomad.processing']
+
+
+@pytest.fixture(scope='session')
+def celery_config():
+    return {
+        'broker_url': config.celery.broker_url,
+        'result_backend': config.celery.backend_url,
+        'accept_content': ['json', 'pickle'],
+        'task_serializer': config.celery.serializer,
+        'result_serializer': config.celery.serializer
+    }
+
 
 @pytest.fixture(scope='function', params=example_files)
 def uploaded_id(request) -> Generator[str, None, None]:
     example_file = request.param
@@ -51,10 +80,11 @@ def uploaded_id(request) -> Generator[str, None, None]:
         pass
 
 
-def test_processing(uploaded_id):
+def test_processing(uploaded_id, celery_session_worker):
     run = UploadProcessing(uploaded_id)
     run.start()
-    run.get(timeout=30)
+
+    run.get(timeout=10)
 
     assert run.ready()
     assert run.task_name == 'nomad.processing.close_upload'
@@ -69,10 +99,12 @@
     run.forget()
 
 
-def test_process_non_existing():
+def test_process_non_existing(celery_session_worker):
    run = UploadProcessing('__does_not_exist')
     run.start()
-    run.get(timeout=30)
+
+    run.get(timeout=10)
+
     assert run.ready()
     run.forget()
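
Note: a quick way to sanity-check the new redis broker/backend wiring independent of the full
upload pipeline is a round trip through the mul task added in nomad/processing.py. This is only
a sketch, not part of the change set; it assumes a worker was started locally, e.g. with
"celery -A nomad.processing worker":

    # Sketch: verify the broker/result-backend round trip via the new mul task.
    # Assumes a running worker (celery -A nomad.processing worker).
    from nomad.processing import mul

    result = mul.delay(4, 4)               # dispatched through config.celery.broker_url
    assert result.get(timeout=10) == 16    # read back from config.celery.backend_url

If this times out, the result backend (config.celery.backend_url) is the first thing to check,
since the test_processing.py docstring reports exactly that failure mode with the pytest plugin's
test worker.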