diff --git a/nomad/client/misc.py b/nomad/client/misc.py index 2f6d024268456c08f1ed2a65b4118fa8f3046916..a818a64681b84683aad7ef704e00bc2166e89182 100644 --- a/nomad/client/misc.py +++ b/nomad/client/misc.py @@ -57,7 +57,7 @@ def run_api(**kwargs): def run_worker(): config.service = 'worker' from nomad import processing - processing.app.worker_main(['worker', '--loglevel=INFO']) + processing.app.worker_main(['worker', '--loglevel=INFO', '-Q', 'celery,uploads,calcs']) @run.command(help='Run both api and worker.') diff --git a/nomad/processing/base.py b/nomad/processing/base.py index 0c2d065c263b34c8144aa46f9c91afb166fd19e8..11b966a7881e3b02557f5a361f336ac8c4ca406f 100644 --- a/nomad/processing/base.py +++ b/nomad/processing/base.py @@ -467,9 +467,12 @@ def process(func): self_id = self.id.__str__() cls_name = self.__class__.__name__ - logger = utils.get_logger(__name__, cls=cls_name, id=self_id, func=func.__name__) + queue = getattr(self.__class__, 'queue', None) + + logger = utils.get_logger( + __name__, cls=cls_name, id=self_id, func=func.__name__, queue=queue) logger.debug('calling process function') - return proc_task.s(cls_name, self_id, func.__name__).delay() + return proc_task.apply_async(args=[cls_name, self_id, func.__name__], queue=queue) task = getattr(func, '__task_name', None) if task is not None: diff --git a/nomad/processing/data.py b/nomad/processing/data.py index 858dbc3ac8ffe6717f0db5aa2dea03b13133dc23..c857cd4f1f2d1be150b919a0ac7a19f164fb49b8 100644 --- a/nomad/processing/data.py +++ b/nomad/processing/data.py @@ -60,6 +60,8 @@ class Calc(Proc): mainfile = StringField() parser = StringField() + queue = 'calcs' + meta: Any = { 'indexes': [ 'upload_id', 'mainfile', 'parser', 'tasks_status' @@ -311,6 +313,8 @@ class Upload(Proc): upload_time = DateTimeField() user_id = StringField(required=True) + queue = 'uploads' + meta: Any = { 'indexes': [ 'user_id', 'tasks_status' diff --git a/ops/docker-compose/nomad/docker-compose.yml b/ops/docker-compose/nomad/docker-compose.yml index d612faac961ba4b85280be17139ab53f2eabaf8a..b050044fcc85de8447c0cb62359571a858a9ce78 100644 --- a/ops/docker-compose/nomad/docker-compose.yml +++ b/ops/docker-compose/nomad/docker-compose.yml @@ -83,7 +83,7 @@ services: - mongo volumes: - ${VOLUME_BINDS}/fs:/app/.volumes/fs - command: python -m celery worker -l info -A nomad.processing + command: python -m celery worker -l info -A nomad.processing -Q celery,cacls,uploads # nomad api api: diff --git a/ops/helm/nomad/templates/worker-deployment.yaml b/ops/helm/nomad/templates/worker-deployment.yaml index 6f04af4f4db7c20c942a0cf7ff53e13a313c0a3c..4819bab4f304303693413ee911a02cb6b6a294d4 100644 --- a/ops/helm/nomad/templates/worker-deployment.yaml +++ b/ops/helm/nomad/templates/worker-deployment.yaml @@ -77,7 +77,7 @@ spec: value: "{{ .Values.mail.password }}" - name: NOMAD_MAIL_FROM value: "{{ .Values.mail.from }}" - command: ["python", "-m", "celery", "worker", "-l", "info", "-A", "nomad.processing"] + command: ["python", "-m", "celery", "worker", "-l", "info", "-A", "nomad.processing", "-Q", "celery,cacls,uploads"] livenessProbe: exec: command: diff --git a/tests/conftest.py b/tests/conftest.py index dee37760da942d9b32e5b7b3e132d9fcdee5796a..d00fd9c0abe1b608e3ed3e758f95fbc92a404f15 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -98,6 +98,13 @@ def celery_config(): } +@pytest.fixture(scope='session') +def celery_worker_parameters(): + return { + 'queues': ('celery', 'uploads', 'calcs') + } + + @pytest.fixture(scope='session') def purged_app(celery_session_app): """ diff --git a/tests/data/test_coe_uploads.txt b/tests/data/test_coe_uploads.txt index 6cdd260abeea0b805f8d6486a7151f7ec4fe37ee..d8c6a82319575066e285dc42caa1103735345b3f 100755 --- a/tests/data/test_coe_uploads.txt +++ b/tests/data/test_coe_uploads.txt @@ -12,4 +12,6 @@ # aims, extracted /nomad/repository/data/extracted/0wFKM2jQ7qH4f3moVtW3_LI7bpL-smp9K2oXlQFI # very large repo of very small vasp files -/nomad/repository/data/extracted/01ff7cb7276543e5ad72cd74d249a8ca \ No newline at end of file +/nomad/repository/data/extracted/01ff7cb7276543e5ad72cd74d249a8ca +# small one calc aims +/nomad/repository/data/extracted/0p-_qU8NvoQ9L5A6INOe3-hi0GdkUOMm9o66pAFI \ No newline at end of file