From 27a37a8d82b34335fc4e89bf7f45e031cc7c62f6 Mon Sep 17 00:00:00 2001
From: Markus Scheidgen <markus.scheidgen@gmail.com>
Date: Wed, 20 Mar 2019 16:39:46 +0100
Subject: [PATCH] Publish to repo db is configurable now. Different priorities
 on celery queues.

---
 README.md                                     |  7 ++++
 nomad/coe_repo/upload.py                      | 26 ++++++------
 nomad/config.py                               |  9 ++++-
 nomad/processing/base.py                      |  2 +
 nomad/processing/data.py                      | 40 +++++++++++--------
 nomad/search.py                               |  2 +-
 ops/helm/nomad/templates/nomad-configmap.yml  |  2 +
 .../nomad/templates/worker-deployment.yaml    |  2 +-
 ops/helm/nomad/values.yaml                    |  2 +
 requirements.txt                              |  1 +
 tests/conftest.py                             |  9 ++++-
 tests/processing/test_data.py                 | 28 ++++++++++++-
 tests/test_api.py                             | 19 +++++----
 tests/test_search.py                          |  5 ++-
 14 files changed, 110 insertions(+), 44 deletions(-)

diff --git a/README.md b/README.md
index 46611e343a..6735617921 100644
--- a/README.md
+++ b/README.md
@@ -55,6 +55,13 @@ your browser.
 
 ## Change log
 
+### v0.4.3
+- more flexible celery routing
+- config via nomad.yml
+- repo_db can be disabled
+- publishing of calculations with failed processing
+- cli for managing running processing tasks
+
 ### v0.4.2
 - bugfixes regarding the migration
 - better migration configurability and reproducibility
diff --git a/nomad/coe_repo/upload.py b/nomad/coe_repo/upload.py
index 54b69292ec..13dd0de49b 100644
--- a/nomad/coe_repo/upload.py
+++ b/nomad/coe_repo/upload.py
@@ -155,16 +155,17 @@ class Upload(Base):  # type: ignore
         retries = 0
 
         while True:
-            publish_filelock = filelock.FileLock(
-                os.path.join(config.fs.tmp, 'publish.lock'))
-            logger.info('waiting for filelock')
-            while True:
-                try:
-                    publish_filelock.acquire(timeout=15 * 60, poll_intervall=1)
-                    logger.info('acquired filelock')
-                    break
-                except filelock.Timeout:
-                    logger.warning('could not acquire publish lock after generous timeout')
+            if config.repository_db.sequential_publish:
+                publish_filelock = filelock.FileLock(
+                    os.path.join(config.fs.tmp, 'publish.lock'))
+                logger.info('waiting for filelock')
+                while True:
+                    try:
+                        publish_filelock.acquire(timeout=15 * 60, poll_intervall=1)
+                        logger.info('acquired filelock')
+                        break
+                    except filelock.Timeout:
+                        logger.warning('could not acquire publish lock after generous timeout')
 
             repo_db = infrastructure.repository_db
             repo_db.expunge_all()
@@ -223,5 +224,6 @@ class Upload(Base):  # type: ignore
                     logger.error('Unexpected exception.', exc_info=e)
                     raise e
             finally:
-                publish_filelock.release()
-                logger.info('released filelock')
+                if config.repository_db.sequential_publish:
+                    publish_filelock.release()
+                    logger.info('released filelock')
diff --git a/nomad/config.py b/nomad/config.py
index 336830b472..d302819e20 100644
--- a/nomad/config.py
+++ b/nomad/config.py
@@ -17,6 +17,7 @@ import os
 import os.path
 import yaml
 import warnings
+from kombu import Queue
 
 from nomad import utils
 
@@ -55,7 +56,11 @@ celery = NomadConfig(
     max_memory=64e6,  # 64 GB
     timeout=1800,  # 1/2 h
     acks_late=True,
-    routing=CELERY_QUEUE_ROUTING
+    routing=CELERY_QUEUE_ROUTING,
+    task_queues=[
+        Queue('calcs', routing_key='calcs', queue_arguments={'x-max-priority': 10}),
+        Queue('uploads', routing_key='uploads', queue_arguments={'x-max-priority': 100})
+    ]
 )
 
 fs = NomadConfig(
@@ -72,6 +77,8 @@ elastic = NomadConfig(
 )
 
 repository_db = NomadConfig(
+    sequential_publish=False,
+    publish_enabled=True,
     host='localhost',
     port=5432,
     dbname='nomad_fairdi_repo_db',
diff --git a/nomad/processing/base.py b/nomad/processing/base.py
index bf7aa238a7..a94f50f659 100644
--- a/nomad/processing/base.py
+++ b/nomad/processing/base.py
@@ -54,6 +54,8 @@ app.conf.update(worker_max_memory_per_child=config.celery.max_memory)
 if config.celery.routing == config.CELERY_WORKER_ROUTING:
     app.conf.update(worker_direct=True)
 
+app.conf.task_queues = config.celery.task_queues
+
 CREATED = 'CREATED'
 PENDING = 'PENDING'
 RUNNING = 'RUNNING'
diff --git a/nomad/processing/data.py b/nomad/processing/data.py
index 7f626bff74..66c86a38ae 100644
--- a/nomad/processing/data.py
+++ b/nomad/processing/data.py
@@ -412,37 +412,43 @@ class Upload(Proc):
         processing state db.
         """
         logger = self.get_logger()
-        logger.info('started to publish', step='publish')
+        logger.info('started to publish')
 
         with utils.lnr(logger, 'publish failed'):
             upload_with_metadata = self.to_upload_with_metadata()
 
+            if config.repository_db.publish_enabled:
+                with utils.timer(
+                        logger, 'upload added to repository', step='repo',
+                        upload_size=self.upload_files.size):
+                    coe_repo.Upload.publish(upload_with_metadata)
+
+            if config.repository_db.publish_enabled:
+                coe_upload = coe_repo.Upload.from_upload_id(upload_with_metadata.upload_id)
+                if coe_upload is not None:
+                    calcs = [coe_calc.to_calc_with_metadata() for coe_calc in coe_upload.calcs]
+                else:
+                    calcs = []
+            else:
+                calcs = upload_with_metadata.calcs
+
             with utils.timer(
-                    logger, 'upload added to repository', step='repo',
+                    logger, 'upload metadata updated', step='metadata',
                     upload_size=self.upload_files.size):
-                coe_repo.Upload.publish(upload_with_metadata)
+                for calc in calcs:
+                    calc.published = True
+                    self.upload_files.metadata.update(
+                        calc_id=calc.calc_id, updates=calc.to_dict())
 
             with utils.timer(
                     logger, 'staged upload files packed', step='pack',
                     upload_size=self.upload_files.size):
-                coe_upload = coe_repo.Upload.from_upload_id(upload_with_metadata.upload_id)
-                if coe_upload is not None:
-                    for coe_calc in coe_upload.calcs:
-                        calc_metadata = coe_calc.to_calc_with_metadata()
-                        calc_metadata.published = True
-                        self.upload_files.metadata.update(
-                            calc_id=calc_metadata.calc_id, updates=calc_metadata.to_dict())
-                    logger.info('metadata updated after publish to coe repo', step='publish')
-
                 self.upload_files.pack()
 
             with utils.timer(
                     logger, 'index updated', step='index',
                     upload_size=self.upload_files.size):
-                coe_upload = coe_repo.Upload.from_upload_id(upload_with_metadata.upload_id)
-                if coe_upload is not None:
-                    search.publish(
-                        [coe_calc.to_calc_with_metadata() for coe_calc in coe_upload.calcs])
+                search.publish(calcs)
 
             with utils.timer(
                     logger, 'staged upload deleted', step='delete staged',
@@ -572,7 +578,7 @@ class Upload(Proc):
             '',
             'your data %suploaded %s has completed processing.' % (
                 self.name if self.name else '', self.upload_time.isoformat()),
-            'You can review your data on your upload page: %s' % config.services.upload_url
+            'You can review your data on your upload page: %s' % config.upload_url()
         ])
         try:
             infrastructure.send_mail(
diff --git a/nomad/search.py b/nomad/search.py
index ff13d4fca4..f96ced395a 100644
--- a/nomad/search.py
+++ b/nomad/search.py
@@ -123,7 +123,7 @@ class Entry(Document):
         self.upload_time = source.upload_time
         self.calc_id = source.calc_id
         self.calc_hash = source.calc_hash
-        self.pid = str(source.pid)
+        self.pid = None if source.pid is None else str(source.pid)
 
         self.mainfile = source.mainfile
         if source.files is None:
diff --git a/ops/helm/nomad/templates/nomad-configmap.yml b/ops/helm/nomad/templates/nomad-configmap.yml
index afb07e1da3..6786107e00 100644
--- a/ops/helm/nomad/templates/nomad-configmap.yml
+++ b/ops/helm/nomad/templates/nomad-configmap.yml
@@ -42,6 +42,8 @@ data:
       host: "{{ .Values.postgres.host }}"
       port: {{ .Values.postgres.port }}
       dbname: "{{ .Values.dbname }}"
+      sequential_publish: {{ .Values.postgres.sequential_publish }}
+      publish_enabled: {{ .Values.postgres.publish_enabled }}
     mail:
       host: "{{ .Values.mail.host }}"
       port: {{ .Values.mail.port }}
diff --git a/ops/helm/nomad/templates/worker-deployment.yaml b/ops/helm/nomad/templates/worker-deployment.yaml
index 1c47b9733d..bba64aae9a 100644
--- a/ops/helm/nomad/templates/worker-deployment.yaml
+++ b/ops/helm/nomad/templates/worker-deployment.yaml
@@ -43,7 +43,7 @@ spec:
           value: "{{ .Values.worker.console_loglevel }}"
         - name: NOMAD_LOGSTASH_LEVEL
           value: "{{ .Values.worker.logstash_loglevel }}"
-        command: ["python", "-m", "celery", "worker", "-A", "nomad.processing", "-Q", "celery,calcs,uploads"]
+        command: ["python", "-m", "celery", "worker", "-A", "nomad.processing", "-Q", "calcs,uploads"]
         livenessProbe:
           exec:
             command:
diff --git a/ops/helm/nomad/values.yaml b/ops/helm/nomad/values.yaml
index 668d919da9..dd33a9962f 100644
--- a/ops/helm/nomad/values.yaml
+++ b/ops/helm/nomad/values.yaml
@@ -96,6 +96,8 @@ elastic:
   port: 19200
 
 postgres:
+  sequential_publish: false
+  publish_enabled: true
   host: enc-preprocessing-nomad.esc
   port: 5432
 
diff --git a/requirements.txt b/requirements.txt
index b7c9bbb92d..0e0d841388 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -43,6 +43,7 @@ PyJWT
 jsonschema[format]
 python-magic
 runstats
+pyyaml
 
 # dev/ops related
 setuptools
diff --git a/tests/conftest.py b/tests/conftest.py
index b0667d4d0a..498ff2c987 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -96,7 +96,8 @@ def celery_includes():
 @pytest.fixture(scope='session')
 def celery_config():
     return {
-        'broker_url': config.rabbitmq_url()
+        'broker_url': config.rabbitmq_url(),
+        'task_queues': config.celery.task_queues
     }
 
 
@@ -517,3 +518,9 @@ def processed(uploaded: Tuple[str, str], test_user: coe_repo.User, proc_infra) -
     Provides a processed upload. Upload was uploaded with test_user.
     """
     return test_processing.run_processing(uploaded, test_user)
+
+
+@pytest.fixture(scope='function', params=[False, True])
+def with_publish_to_coe_repo(monkeypatch, request):
+    monkeypatch.setattr('nomad.config.repository_db.publish_enabled', request.param)
+    return request.param
diff --git a/tests/processing/test_data.py b/tests/processing/test_data.py
index 6341b1cc16..040baf3f98 100644
--- a/tests/processing/test_data.py
+++ b/tests/processing/test_data.py
@@ -20,10 +20,14 @@ import json
 import re
 
 from nomad import utils, infrastructure
-from nomad.files import UploadFiles, StagingUploadFiles
+from nomad.files import UploadFiles, StagingUploadFiles, PublicUploadFiles
 from nomad.processing import Upload, Calc
 from nomad.processing.base import task as task_decorator, FAILURE, SUCCESS
 
+from tests.test_search import assert_search_upload
+from tests.test_files import assert_upload_files
+from tests.test_coe_repo import assert_coe_upload
+
 
 def test_send_mail(mails, monkeypatch):
     monkeypatch.setattr('nomad.config.mail.enabled', True)
@@ -98,6 +102,28 @@ def test_processing(processed, no_warn, mails, monkeypatch):
     assert re.search(r'Processing completed', mails.messages[0].data.decode('utf-8')) is not None
 
 
+def test_publish(processed: Upload, no_warn, example_user_metadata, monkeypatch, with_publish_to_coe_repo):
+    processed.metadata = example_user_metadata
+
+    n_calcs = processed.total_calcs
+    additional_keys = ['with_embargo']
+    if with_publish_to_coe_repo:
+        additional_keys.append('pid')
+
+    processed.publish_upload()
+    try:
+        processed.block_until_complete(interval=.01)
+    except Exception:
+        pass
+
+    assert_coe_upload(processed.upload_id, user_metadata=example_user_metadata)
+
+    assert_upload_files(
+        processed.upload_id, PublicUploadFiles, n_calcs, additional_keys, published=True)
+
+    assert_search_upload(processed.upload_id, n_calcs, additional_keys, published=True)
+
+
 @pytest.mark.timeout(10)
 def test_processing_with_warning(proc_infra, test_user, with_warn):
     example_file = 'tests/data/proc/examples_with_warning_template.zip'
diff --git a/tests/test_api.py b/tests/test_api.py
index 2191667ccc..cf85b84860 100644
--- a/tests/test_api.py
+++ b/tests/test_api.py
@@ -225,7 +225,7 @@ class TestUploads:
         assert_upload_files(upload_id, files.StagingUploadFiles, n_calcs)
         assert_search_upload(upload_id, n_calcs)
 
-    def assert_unstage(self, client, test_user_auth, upload_id, proc_infra, metadata={}):
+    def assert_published(self, client, test_user_auth, upload_id, proc_infra, with_pid=True, metadata={}):
         rv = client.get('/uploads/%s' % upload_id, headers=test_user_auth)
         upload = self.assert_upload(rv.data)
         n_calcs = upload['calcs']['pagination']['total']
@@ -240,10 +240,13 @@ class TestUploads:
         assert upload['current_process'] == 'publish_upload'
         assert upload['process_running']
 
+        additional_keys = ['with_embargo']
+        if with_pid:
+            additional_keys.append('pid')
         self.assert_upload_does_not_exist(client, upload_id, test_user_auth)
         assert_coe_upload(upload_id, user_metadata=metadata)
-        assert_upload_files(upload_id, files.PublicUploadFiles, n_calcs, additional_keys=['with_embargo', 'pid'], published=True)
-        assert_search_upload(upload_id, n_calcs, additional_keys=['with_embargo', 'pid'], published=True)
+        assert_upload_files(upload_id, files.PublicUploadFiles, n_calcs, additional_keys=additional_keys, published=True)
+        assert_search_upload(upload_id, n_calcs, additional_keys=additional_keys, published=True)
 
     def assert_upload_does_not_exist(self, client, upload_id: str, test_user_auth):
         # poll until publish/delete completed
@@ -329,11 +332,11 @@ class TestUploads:
         yield True
         monkeypatch.setattr('nomad.processing.data.Upload.cleanup', old_cleanup)
 
-    def test_delete_unstaged(self, client, test_user_auth, proc_infra, no_warn):
+    def test_delete_published(self, client, test_user_auth, proc_infra, no_warn, with_publish_to_coe_repo):
         rv = client.put('/uploads/?local_path=%s' % example_file, headers=test_user_auth)
         upload = self.assert_upload(rv.data)
         self.assert_processing(client, test_user_auth, upload['upload_id'])
-        self.assert_unstage(client, test_user_auth, upload['upload_id'], proc_infra)
+        self.assert_published(client, test_user_auth, upload['upload_id'], proc_infra, with_pid=with_publish_to_coe_repo)
         rv = client.delete('/uploads/%s' % upload['upload_id'], headers=test_user_auth)
         assert rv.status_code == 404
 
@@ -345,11 +348,11 @@ class TestUploads:
         assert rv.status_code == 200
         self.assert_upload_does_not_exist(client, upload['upload_id'], test_user_auth)
 
-    def test_post(self, client, test_user_auth, example_upload, proc_infra, no_warn):
+    def test_post(self, client, test_user_auth, example_upload, proc_infra, no_warn, with_publish_to_coe_repo):
         rv = client.put('/uploads/?local_path=%s' % example_upload, headers=test_user_auth)
         upload = self.assert_upload(rv.data)
         self.assert_processing(client, test_user_auth, upload['upload_id'])
-        self.assert_unstage(client, test_user_auth, upload['upload_id'], proc_infra)
+        self.assert_published(client, test_user_auth, upload['upload_id'], proc_infra, with_pid=with_publish_to_coe_repo)
 
     def test_post_metadata(
             self, client, proc_infra, admin_user_auth, test_user_auth, test_user,
@@ -359,7 +362,7 @@ class TestUploads:
         self.assert_processing(client, test_user_auth, upload['upload_id'])
         metadata = dict(**example_user_metadata)
         metadata['_upload_time'] = datetime.now().isoformat()
-        self.assert_unstage(client, admin_user_auth, upload['upload_id'], proc_infra, metadata)
+        self.assert_published(client, admin_user_auth, upload['upload_id'], proc_infra, metadata=metadata)
 
     def test_post_metadata_forbidden(self, client, proc_infra, test_user_auth, no_warn):
         rv = client.put('/uploads/?local_path=%s' % example_file, headers=test_user_auth)
diff --git a/tests/test_search.py b/tests/test_search.py
index 4a754331b0..6e017b76f4 100644
--- a/tests/test_search.py
+++ b/tests/test_search.py
@@ -132,10 +132,11 @@ def assert_search_upload(upload_id, n_calcs: int, additional_keys: List[str] = [
         for hit in search:
             hit = hit.to_dict()
             for key, value in kwargs.items():
-                if key == 'published':
-                    assert int(hit.get('pid')) > 0
                 assert hit.get(key, None) == value
 
+            if 'pid' in hit:
+                assert int(hit.get('pid')) > 0
+
             for key in keys:
                 assert key in hit
 
-- 
GitLab