Commit b322fe93 authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Separated task queques for upload and calc processing.

parent 4201086d
Pipeline #44357 passed with stages
in 26 minutes and 32 seconds
......@@ -57,7 +57,7 @@ def run_api(**kwargs):
def run_worker():
config.service = 'worker'
from nomad import processing
processing.app.worker_main(['worker', '--loglevel=INFO'])
processing.app.worker_main(['worker', '--loglevel=INFO', '-Q', 'celery,uploads,calcs'])
@run.command(help='Run both api and worker.')
......
......@@ -467,9 +467,12 @@ def process(func):
self_id = self.id.__str__()
cls_name = self.__class__.__name__
logger = utils.get_logger(__name__, cls=cls_name, id=self_id, func=func.__name__)
queue = getattr(self.__class__, 'queue', None)
logger = utils.get_logger(
__name__, cls=cls_name, id=self_id, func=func.__name__, queue=queue)
logger.debug('calling process function')
return proc_task.s(cls_name, self_id, func.__name__).delay()
return proc_task.apply_async(args=[cls_name, self_id, func.__name__], queue=queue)
task = getattr(func, '__task_name', None)
if task is not None:
......
......@@ -60,6 +60,8 @@ class Calc(Proc):
mainfile = StringField()
parser = StringField()
queue = 'calcs'
meta: Any = {
'indexes': [
'upload_id', 'mainfile', 'parser', 'tasks_status'
......@@ -311,6 +313,8 @@ class Upload(Proc):
upload_time = DateTimeField()
user_id = StringField(required=True)
queue = 'uploads'
meta: Any = {
'indexes': [
'user_id', 'tasks_status'
......
......@@ -83,7 +83,7 @@ services:
- mongo
volumes:
- ${VOLUME_BINDS}/fs:/app/.volumes/fs
command: python -m celery worker -l info -A nomad.processing
command: python -m celery worker -l info -A nomad.processing -Q celery,cacls,uploads
# nomad api
api:
......
......@@ -77,7 +77,7 @@ spec:
value: "{{ .Values.mail.password }}"
- name: NOMAD_MAIL_FROM
value: "{{ .Values.mail.from }}"
command: ["python", "-m", "celery", "worker", "-l", "info", "-A", "nomad.processing"]
command: ["python", "-m", "celery", "worker", "-l", "info", "-A", "nomad.processing", "-Q", "celery,cacls,uploads"]
livenessProbe:
exec:
command:
......
......@@ -98,6 +98,13 @@ def celery_config():
}
@pytest.fixture(scope='session')
def celery_worker_parameters():
return {
'queues': ('celery', 'uploads', 'calcs')
}
@pytest.fixture(scope='session')
def purged_app(celery_session_app):
"""
......
......@@ -13,3 +13,5 @@
/nomad/repository/data/extracted/0wFKM2jQ7qH4f3moVtW3_LI7bpL-smp9K2oXlQFI
# very large repo of very small vasp files
/nomad/repository/data/extracted/01ff7cb7276543e5ad72cd74d249a8ca
# small one calc aims
/nomad/repository/data/extracted/0p-_qU8NvoQ9L5A6INOe3-hi0GdkUOMm9o66pAFI
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment