From b322fe930c07ce04247b21b0bf52fcabcf961260 Mon Sep 17 00:00:00 2001
From: Markus Scheidgen <markus.scheidgen@gmail.com>
Date: Thu, 28 Feb 2019 13:40:31 +0100
Subject: [PATCH] Separated task queques for upload and calc processing.

---
 nomad/client/misc.py                            | 2 +-
 nomad/processing/base.py                        | 7 +++++--
 nomad/processing/data.py                        | 4 ++++
 ops/docker-compose/nomad/docker-compose.yml     | 2 +-
 ops/helm/nomad/templates/worker-deployment.yaml | 2 +-
 tests/conftest.py                               | 7 +++++++
 tests/data/test_coe_uploads.txt                 | 4 +++-
 7 files changed, 22 insertions(+), 6 deletions(-)

diff --git a/nomad/client/misc.py b/nomad/client/misc.py
index 2f6d024268..a818a64681 100644
--- a/nomad/client/misc.py
+++ b/nomad/client/misc.py
@@ -57,7 +57,7 @@ def run_api(**kwargs):
 def run_worker():
     config.service = 'worker'
     from nomad import processing
-    processing.app.worker_main(['worker', '--loglevel=INFO'])
+    processing.app.worker_main(['worker', '--loglevel=INFO', '-Q', 'celery,uploads,calcs'])
 
 
 @run.command(help='Run both api and worker.')
diff --git a/nomad/processing/base.py b/nomad/processing/base.py
index 0c2d065c26..11b966a788 100644
--- a/nomad/processing/base.py
+++ b/nomad/processing/base.py
@@ -467,9 +467,12 @@ def process(func):
         self_id = self.id.__str__()
         cls_name = self.__class__.__name__
 
-        logger = utils.get_logger(__name__, cls=cls_name, id=self_id, func=func.__name__)
+        queue = getattr(self.__class__, 'queue', None)
+
+        logger = utils.get_logger(
+            __name__, cls=cls_name, id=self_id, func=func.__name__, queue=queue)
         logger.debug('calling process function')
-        return proc_task.s(cls_name, self_id, func.__name__).delay()
+        return proc_task.apply_async(args=[cls_name, self_id, func.__name__], queue=queue)
 
     task = getattr(func, '__task_name', None)
     if task is not None:
diff --git a/nomad/processing/data.py b/nomad/processing/data.py
index 858dbc3ac8..c857cd4f1f 100644
--- a/nomad/processing/data.py
+++ b/nomad/processing/data.py
@@ -60,6 +60,8 @@ class Calc(Proc):
     mainfile = StringField()
     parser = StringField()
 
+    queue = 'calcs'
+
     meta: Any = {
         'indexes': [
             'upload_id', 'mainfile', 'parser', 'tasks_status'
@@ -311,6 +313,8 @@ class Upload(Proc):
     upload_time = DateTimeField()
     user_id = StringField(required=True)
 
+    queue = 'uploads'
+
     meta: Any = {
         'indexes': [
             'user_id', 'tasks_status'
diff --git a/ops/docker-compose/nomad/docker-compose.yml b/ops/docker-compose/nomad/docker-compose.yml
index d612faac96..b050044fcc 100644
--- a/ops/docker-compose/nomad/docker-compose.yml
+++ b/ops/docker-compose/nomad/docker-compose.yml
@@ -83,7 +83,7 @@ services:
             - mongo
         volumes:
             - ${VOLUME_BINDS}/fs:/app/.volumes/fs
-        command: python -m celery worker -l info -A nomad.processing
+        command: python -m celery worker -l info -A nomad.processing -Q celery,cacls,uploads
 
     # nomad api
     api:
diff --git a/ops/helm/nomad/templates/worker-deployment.yaml b/ops/helm/nomad/templates/worker-deployment.yaml
index 6f04af4f4d..4819bab4f3 100644
--- a/ops/helm/nomad/templates/worker-deployment.yaml
+++ b/ops/helm/nomad/templates/worker-deployment.yaml
@@ -77,7 +77,7 @@ spec:
           value: "{{ .Values.mail.password }}"
         - name: NOMAD_MAIL_FROM
           value: "{{ .Values.mail.from }}"
-        command: ["python", "-m", "celery", "worker", "-l", "info", "-A", "nomad.processing"]
+        command: ["python", "-m", "celery", "worker", "-l", "info", "-A", "nomad.processing", "-Q", "celery,cacls,uploads"]
         livenessProbe:
           exec:
             command:
diff --git a/tests/conftest.py b/tests/conftest.py
index dee37760da..d00fd9c0ab 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -98,6 +98,13 @@ def celery_config():
     }
 
 
+@pytest.fixture(scope='session')
+def celery_worker_parameters():
+    return {
+        'queues': ('celery', 'uploads', 'calcs')
+    }
+
+
 @pytest.fixture(scope='session')
 def purged_app(celery_session_app):
     """
diff --git a/tests/data/test_coe_uploads.txt b/tests/data/test_coe_uploads.txt
index 6cdd260abe..d8c6a82319 100755
--- a/tests/data/test_coe_uploads.txt
+++ b/tests/data/test_coe_uploads.txt
@@ -12,4 +12,6 @@
 # aims, extracted
 /nomad/repository/data/extracted/0wFKM2jQ7qH4f3moVtW3_LI7bpL-smp9K2oXlQFI
 # very large repo of very small vasp files
-/nomad/repository/data/extracted/01ff7cb7276543e5ad72cd74d249a8ca
\ No newline at end of file
+/nomad/repository/data/extracted/01ff7cb7276543e5ad72cd74d249a8ca
+# small one calc aims
+/nomad/repository/data/extracted/0p-_qU8NvoQ9L5A6INOe3-hi0GdkUOMm9o66pAFI
\ No newline at end of file
-- 
GitLab