diff --git a/README.md b/README.md
index 46611e343a26e390b5a326e0ee4da278f576d10d..673561792174a2706db4daf7cf41eb27371d3bf0 100644
--- a/README.md
+++ b/README.md
@@ -55,6 +55,13 @@ your browser.
 
 ## Change log
 
+### v0.4.3
+- more flexible celery routing
+- config via nomad.yml
+- repo_db can be disabled
+- publishing of calculations with failed processing
+- cli for managing running processing tasks
+
 ### v0.4.2
 - bugfixes regarding the migration
 - better migration configurability and reproducibility
diff --git a/nomad/coe_repo/upload.py b/nomad/coe_repo/upload.py
index 54b69292ecc372af4409e7637a4d3990ad8e8ab8..13dd0de49b9ec62a5672d9d1d4b3452e55a27469 100644
--- a/nomad/coe_repo/upload.py
+++ b/nomad/coe_repo/upload.py
@@ -155,16 +155,17 @@ class Upload(Base):  # type: ignore
 
         retries = 0
         while True:
-            publish_filelock = filelock.FileLock(
-                os.path.join(config.fs.tmp, 'publish.lock'))
-            logger.info('waiting for filelock')
-            while True:
-                try:
-                    publish_filelock.acquire(timeout=15 * 60, poll_intervall=1)
-                    logger.info('acquired filelock')
-                    break
-                except filelock.Timeout:
-                    logger.warning('could not acquire publish lock after generous timeout')
+            if config.repository_db.sequential_publish:
+                publish_filelock = filelock.FileLock(
+                    os.path.join(config.fs.tmp, 'publish.lock'))
+                logger.info('waiting for filelock')
+                while True:
+                    try:
+                        publish_filelock.acquire(timeout=15 * 60, poll_intervall=1)
+                        logger.info('acquired filelock')
+                        break
+                    except filelock.Timeout:
+                        logger.warning('could not acquire publish lock after generous timeout')
 
             repo_db = infrastructure.repository_db
             repo_db.expunge_all()
@@ -223,5 +224,6 @@ class Upload(Base):  # type: ignore
             logger.error('Unexpected exception.', exc_info=e)
             raise e
         finally:
-            publish_filelock.release()
-            logger.info('released filelock')
+            if config.repository_db.sequential_publish:
+                publish_filelock.release()
+                logger.info('released filelock')
diff --git a/nomad/config.py b/nomad/config.py
index 336830b472bbe84ca3cc4304ffe13a8f582fd590..d302819e208cca2663419db89961f127e03367f5 100644
--- a/nomad/config.py
+++ b/nomad/config.py
@@ -17,6 +17,7 @@ import os
 import os.path
 import yaml
 import warnings
+from kombu import Queue
 
 from nomad import utils
 
@@ -55,7 +56,11 @@ celery = NomadConfig(
     max_memory=64e6,  # 64 GB
     timeout=1800,  # 1/2 h
     acks_late=True,
-    routing=CELERY_QUEUE_ROUTING
+    routing=CELERY_QUEUE_ROUTING,
+    task_queues=[
+        Queue('calcs', routing_key='calcs', queue_arguments={'x-max-priority': 10}),
+        Queue('uploads', routing_key='uploads', queue_arguments={'x-max-priority': 100})
+    ]
 )
 
 fs = NomadConfig(
@@ -72,6 +77,8 @@ elastic = NomadConfig(
 )
 
 repository_db = NomadConfig(
+    sequential_publish=False,
+    publish_enabled=True,
     host='localhost',
     port=5432,
     dbname='nomad_fairdi_repo_db',
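Both queues above are declared with RabbitMQ's `x-max-priority` argument, which only matters once producers actually attach a priority to messages. A minimal sketch of what this enables, assuming a hypothetical task name (the real task names live in `nomad.processing`):

```python
from nomad.processing.base import app

# Send work to one of the priority-enabled queues declared in config.py.
# 'nomad.processing.data.process' and the priority value are illustrative.
app.send_task(
    'nomad.processing.data.process',
    args=('<upload_id>',),
    queue='uploads',
    routing_key='uploads',
    priority=50)  # honored because the queue was created with x-max-priority
```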
diff --git a/nomad/processing/base.py b/nomad/processing/base.py
index bf7aa238a75c1e61a38cb9688353b5706a3eb66b..a94f50f659ff332cc5fe0323a4fae1ac2f5c299a 100644
--- a/nomad/processing/base.py
+++ b/nomad/processing/base.py
@@ -54,6 +54,8 @@ app.conf.update(worker_max_memory_per_child=config.celery.max_memory)
 if config.celery.routing == config.CELERY_WORKER_ROUTING:
     app.conf.update(worker_direct=True)
 
+app.conf.task_queues = config.celery.task_queues
+
 CREATED = 'CREATED'
 PENDING = 'PENDING'
 RUNNING = 'RUNNING'
diff --git a/nomad/processing/data.py b/nomad/processing/data.py
index 7f626bff744d48332e5ece3be4bb0dc9ef1e90e2..66c86a38aefb0a5309d6ce07b18d803a38bcfcf7 100644
--- a/nomad/processing/data.py
+++ b/nomad/processing/data.py
@@ -412,37 +412,42 @@ class Upload(Proc):
         processing state db.
         """
         logger = self.get_logger()
-        logger.info('started to publish', step='publish')
+        logger.info('started to publish')
 
         with utils.lnr(logger, 'publish failed'):
             upload_with_metadata = self.to_upload_with_metadata()
 
+            if config.repository_db.publish_enabled:
+                with utils.timer(
+                        logger, 'upload added to repository', step='repo',
+                        upload_size=self.upload_files.size):
+                    coe_repo.Upload.publish(upload_with_metadata)
+
+                coe_upload = coe_repo.Upload.from_upload_id(upload_with_metadata.upload_id)
+                if coe_upload is not None:
+                    calcs = [coe_calc.to_calc_with_metadata() for coe_calc in coe_upload.calcs]
+                else:
+                    calcs = []
+            else:
+                calcs = upload_with_metadata.calcs
+
             with utils.timer(
-                    logger, 'upload added to repository', step='repo',
+                    logger, 'upload metadata updated', step='metadata',
                     upload_size=self.upload_files.size):
-                coe_repo.Upload.publish(upload_with_metadata)
+                for calc in calcs:
+                    calc.published = True
+                    self.upload_files.metadata.update(
+                        calc_id=calc.calc_id, updates=calc.to_dict())
 
             with utils.timer(
                     logger, 'staged upload files packed', step='pack',
                     upload_size=self.upload_files.size):
-                coe_upload = coe_repo.Upload.from_upload_id(upload_with_metadata.upload_id)
-                if coe_upload is not None:
-                    for coe_calc in coe_upload.calcs:
-                        calc_metadata = coe_calc.to_calc_with_metadata()
-                        calc_metadata.published = True
-                        self.upload_files.metadata.update(
-                            calc_id=calc_metadata.calc_id, updates=calc_metadata.to_dict())
-                    logger.info('metadata updated after publish to coe repo', step='publish')
-
                 self.upload_files.pack()
 
             with utils.timer(
                     logger, 'index updated', step='index',
                     upload_size=self.upload_files.size):
-                coe_upload = coe_repo.Upload.from_upload_id(upload_with_metadata.upload_id)
-                if coe_upload is not None:
-                    search.publish(
-                        [coe_calc.to_calc_with_metadata() for coe_calc in coe_upload.calcs])
+                search.publish(calcs)
 
             with utils.timer(
                     logger, 'staged upload deleted', step='delete staged',
@@ -572,7 +578,7 @@ class Upload(Proc):
                 '',
                 'your data %suploaded %s has completed processing.' % (
                     self.name if self.name else '', self.upload_time.isoformat()),
-                'You can review your data on your upload page: %s' % config.services.upload_url
+                'You can review your data on your upload page: %s' % config.upload_url()
             ])
             try:
                 infrastructure.send_mail(
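With `publish_enabled=False`, the publish step above skips the CoE repository entirely: calculation metadata is taken straight from the staged upload, files are still packed, and the search index is still updated, but no repository entry and hence no `pid` is created. A minimal sketch of turning it off (the tests below do the same via `monkeypatch`):

```python
from nomad import config

# Run without a CoE repository database: publishing still packs files and
# updates the search index, but creates no repository entry and no pid.
config.repository_db.publish_enabled = False
```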
diff --git a/nomad/search.py b/nomad/search.py
index ff13d4fca4878d75de8b76ae86b5679316bad998..f96ced395ac5e6e640bb822d4aa6794f69c49d95 100644
--- a/nomad/search.py
+++ b/nomad/search.py
@@ -123,7 +123,7 @@ class Entry(Document):
         self.upload_time = source.upload_time
         self.calc_id = source.calc_id
         self.calc_hash = source.calc_hash
-        self.pid = str(source.pid)
+        self.pid = None if source.pid is None else str(source.pid)
         self.mainfile = source.mainfile
 
         if source.files is None:
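The one-line fix above matters because `str(None)` is the truthy string `'None'`, so unpublished entries used to be indexed with a bogus pid that the `int(...) > 0` check in the tests below would then trip over:

```python
source_pid = None

print(str(source_pid))                                  # 'None' - bogus, truthy pid
print(None if source_pid is None else str(source_pid))  # None - field stays empty
```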
""" return test_processing.run_processing(uploaded, test_user) + + +@pytest.fixture(scope='function', params=[False, True]) +def with_publish_to_coe_repo(monkeypatch, request): + monkeypatch.setattr('nomad.config.repository_db.publish_enabled', request.param) + return request.param diff --git a/tests/processing/test_data.py b/tests/processing/test_data.py index 6341b1cc1606aab1797fd130cc1c12d244b5518d..040baf3f989ae9f2f48c9a1ec7ca384a89c3060c 100644 --- a/tests/processing/test_data.py +++ b/tests/processing/test_data.py @@ -20,10 +20,14 @@ import json import re from nomad import utils, infrastructure -from nomad.files import UploadFiles, StagingUploadFiles +from nomad.files import UploadFiles, StagingUploadFiles, PublicUploadFiles from nomad.processing import Upload, Calc from nomad.processing.base import task as task_decorator, FAILURE, SUCCESS +from tests.test_search import assert_search_upload +from tests.test_files import assert_upload_files +from tests.test_coe_repo import assert_coe_upload + def test_send_mail(mails, monkeypatch): monkeypatch.setattr('nomad.config.mail.enabled', True) @@ -98,6 +102,28 @@ def test_processing(processed, no_warn, mails, monkeypatch): assert re.search(r'Processing completed', mails.messages[0].data.decode('utf-8')) is not None +def test_publish(processed: Upload, no_warn, example_user_metadata, monkeypatch, with_publish_to_coe_repo): + processed.metadata = example_user_metadata + + n_calcs = processed.total_calcs + additional_keys = ['with_embargo'] + if with_publish_to_coe_repo: + additional_keys.append('pid') + + processed.publish_upload() + try: + processed.block_until_complete(interval=.01) + except Exception: + pass + + assert_coe_upload(processed.upload_id, user_metadata=example_user_metadata) + + assert_upload_files( + processed.upload_id, PublicUploadFiles, n_calcs, additional_keys, published=True) + + assert_search_upload(processed.upload_id, n_calcs, additional_keys, published=True) + + @pytest.mark.timeout(10) def test_processing_with_warning(proc_infra, test_user, with_warn): example_file = 'tests/data/proc/examples_with_warning_template.zip' diff --git a/tests/test_api.py b/tests/test_api.py index 2191667ccc407251f5d340ec6f456c809b22cc2c..cf85b84860e1af5d998e9b20825025104ab80312 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -225,7 +225,7 @@ class TestUploads: assert_upload_files(upload_id, files.StagingUploadFiles, n_calcs) assert_search_upload(upload_id, n_calcs) - def assert_unstage(self, client, test_user_auth, upload_id, proc_infra, metadata={}): + def assert_published(self, client, test_user_auth, upload_id, proc_infra, with_pid=True, metadata={}): rv = client.get('/uploads/%s' % upload_id, headers=test_user_auth) upload = self.assert_upload(rv.data) n_calcs = upload['calcs']['pagination']['total'] @@ -240,10 +240,13 @@ class TestUploads: assert upload['current_process'] == 'publish_upload' assert upload['process_running'] + additional_keys = ['with_embargo'] + if with_pid: + additional_keys.append('pid') self.assert_upload_does_not_exist(client, upload_id, test_user_auth) assert_coe_upload(upload_id, user_metadata=metadata) - assert_upload_files(upload_id, files.PublicUploadFiles, n_calcs, additional_keys=['with_embargo', 'pid'], published=True) - assert_search_upload(upload_id, n_calcs, additional_keys=['with_embargo', 'pid'], published=True) + assert_upload_files(upload_id, files.PublicUploadFiles, n_calcs, additional_keys=additional_keys, published=True) + assert_search_upload(upload_id, n_calcs, 
diff --git a/tests/conftest.py b/tests/conftest.py
index b0667d4d0ad8705574460cb34587d96d7b64c843..498ff2c987b98abbe9b17946a9d8dfdc58a66d59 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -96,7 +96,8 @@ def celery_includes():
 @pytest.fixture(scope='session')
 def celery_config():
     return {
-        'broker_url': config.rabbitmq_url()
+        'broker_url': config.rabbitmq_url(),
+        'task_queues': config.celery.task_queues
     }
 
 
@@ -517,3 +518,9 @@ def processed(uploaded: Tuple[str, str], test_user: coe_repo.User, proc_infra) -> processing.Upload:
     Provides a processed upload. Upload was uploaded with test_user.
     """
     return test_processing.run_processing(uploaded, test_user)
+
+
+@pytest.fixture(scope='function', params=[False, True])
+def with_publish_to_coe_repo(monkeypatch, request):
+    monkeypatch.setattr('nomad.config.repository_db.publish_enabled', request.param)
+    return request.param
diff --git a/tests/processing/test_data.py b/tests/processing/test_data.py
index 6341b1cc1606aab1797fd130cc1c12d244b5518d..040baf3f989ae9f2f48c9a1ec7ca384a89c3060c 100644
--- a/tests/processing/test_data.py
+++ b/tests/processing/test_data.py
@@ -20,10 +20,14 @@ import json
 import re
 
 from nomad import utils, infrastructure
-from nomad.files import UploadFiles, StagingUploadFiles
+from nomad.files import UploadFiles, StagingUploadFiles, PublicUploadFiles
 from nomad.processing import Upload, Calc
 from nomad.processing.base import task as task_decorator, FAILURE, SUCCESS
 
+from tests.test_search import assert_search_upload
+from tests.test_files import assert_upload_files
+from tests.test_coe_repo import assert_coe_upload
+
 
 def test_send_mail(mails, monkeypatch):
     monkeypatch.setattr('nomad.config.mail.enabled', True)
@@ -98,6 +102,29 @@ def test_processing(processed, no_warn, mails, monkeypatch):
     assert re.search(r'Processing completed', mails.messages[0].data.decode('utf-8')) is not None
 
 
+def test_publish(processed: Upload, no_warn, example_user_metadata, monkeypatch, with_publish_to_coe_repo):
+    processed.metadata = example_user_metadata
+
+    n_calcs = processed.total_calcs
+    additional_keys = ['with_embargo']
+    if with_publish_to_coe_repo:
+        additional_keys.append('pid')
+
+    processed.publish_upload()
+    try:
+        processed.block_until_complete(interval=.01)
+    except Exception:
+        pass
+
+    if with_publish_to_coe_repo:
+        assert_coe_upload(processed.upload_id, user_metadata=example_user_metadata)
+
+    assert_upload_files(
+        processed.upload_id, PublicUploadFiles, n_calcs, additional_keys, published=True)
+
+    assert_search_upload(processed.upload_id, n_calcs, additional_keys, published=True)
+
+
 @pytest.mark.timeout(10)
 def test_processing_with_warning(proc_infra, test_user, with_warn):
     example_file = 'tests/data/proc/examples_with_warning_template.zip'
diff --git a/tests/test_api.py b/tests/test_api.py
index 2191667ccc407251f5d340ec6f456c809b22cc2c..cf85b84860e1af5d998e9b20825025104ab80312 100644
--- a/tests/test_api.py
+++ b/tests/test_api.py
@@ -225,7 +225,7 @@ class TestUploads:
         assert_upload_files(upload_id, files.StagingUploadFiles, n_calcs)
         assert_search_upload(upload_id, n_calcs)
 
-    def assert_unstage(self, client, test_user_auth, upload_id, proc_infra, metadata={}):
+    def assert_published(self, client, test_user_auth, upload_id, proc_infra, with_pid=True, metadata={}):
         rv = client.get('/uploads/%s' % upload_id, headers=test_user_auth)
         upload = self.assert_upload(rv.data)
         n_calcs = upload['calcs']['pagination']['total']
@@ -240,10 +240,13 @@ class TestUploads:
         assert upload['current_process'] == 'publish_upload'
         assert upload['process_running']
 
+        additional_keys = ['with_embargo']
+        if with_pid:
+            additional_keys.append('pid')
         self.assert_upload_does_not_exist(client, upload_id, test_user_auth)
         assert_coe_upload(upload_id, user_metadata=metadata)
-        assert_upload_files(upload_id, files.PublicUploadFiles, n_calcs, additional_keys=['with_embargo', 'pid'], published=True)
-        assert_search_upload(upload_id, n_calcs, additional_keys=['with_embargo', 'pid'], published=True)
+        assert_upload_files(upload_id, files.PublicUploadFiles, n_calcs, additional_keys=additional_keys, published=True)
+        assert_search_upload(upload_id, n_calcs, additional_keys=additional_keys, published=True)
 
     def assert_upload_does_not_exist(self, client, upload_id: str, test_user_auth):
         # poll until publish/delete completed
@@ -329,11 +332,11 @@ class TestUploads:
             yield True
             monkeypatch.setattr('nomad.processing.data.Upload.cleanup', old_cleanup)
 
-    def test_delete_unstaged(self, client, test_user_auth, proc_infra, no_warn):
+    def test_delete_published(self, client, test_user_auth, proc_infra, no_warn, with_publish_to_coe_repo):
         rv = client.put('/uploads/?local_path=%s' % example_file, headers=test_user_auth)
         upload = self.assert_upload(rv.data)
         self.assert_processing(client, test_user_auth, upload['upload_id'])
-        self.assert_unstage(client, test_user_auth, upload['upload_id'], proc_infra)
+        self.assert_published(client, test_user_auth, upload['upload_id'], proc_infra, with_pid=with_publish_to_coe_repo)
         rv = client.delete('/uploads/%s' % upload['upload_id'], headers=test_user_auth)
         assert rv.status_code == 404
 
@@ -345,11 +348,11 @@ class TestUploads:
         assert rv.status_code == 200
         self.assert_upload_does_not_exist(client, upload['upload_id'], test_user_auth)
 
-    def test_post(self, client, test_user_auth, example_upload, proc_infra, no_warn):
+    def test_post(self, client, test_user_auth, example_upload, proc_infra, no_warn, with_publish_to_coe_repo):
         rv = client.put('/uploads/?local_path=%s' % example_upload, headers=test_user_auth)
         upload = self.assert_upload(rv.data)
         self.assert_processing(client, test_user_auth, upload['upload_id'])
-        self.assert_unstage(client, test_user_auth, upload['upload_id'], proc_infra)
+        self.assert_published(client, test_user_auth, upload['upload_id'], proc_infra, with_pid=with_publish_to_coe_repo)
 
     def test_post_metadata(
             self, client, proc_infra, admin_user_auth, test_user_auth, test_user,
@@ -359,7 +362,7 @@ class TestUploads:
         self.assert_processing(client, test_user_auth, upload['upload_id'])
         metadata = dict(**example_user_metadata)
         metadata['_upload_time'] = datetime.now().isoformat()
-        self.assert_unstage(client, admin_user_auth, upload['upload_id'], proc_infra, metadata)
+        self.assert_published(client, admin_user_auth, upload['upload_id'], proc_infra, metadata=metadata)
 
     def test_post_metadata_forbidden(self, client, proc_infra, test_user_auth, no_warn):
         rv = client.put('/uploads/?local_path=%s' % example_file, headers=test_user_auth)
diff --git a/tests/test_search.py b/tests/test_search.py
index 4a754331b0c08180f4da90aa3dfc361386761ec8..6e017b76f458f7e27768393607c66f6dc8828daa 100644
--- a/tests/test_search.py
+++ b/tests/test_search.py
@@ -132,10 +132,11 @@ def assert_search_upload(upload_id, n_calcs: int, additional_keys: List[str] = [], **kwargs):
     for hit in search:
         hit = hit.to_dict()
         for key, value in kwargs.items():
-            if key == 'published':
-                assert int(hit.get('pid')) > 0
             assert hit.get(key, None) == value
 
+        if 'pid' in hit:
+            assert int(hit.get('pid')) > 0
+
         for key in keys:
             assert key in hit
 