Commit e0d6429a authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Replaced multi queue priority routing with plain priorities.

parent 252b4916
......@@ -65,10 +65,11 @@ celery = NomadConfig(
timeout=1800, # 1/2 h
acks_late=True,
routing=CELERY_QUEUE_ROUTING,
task_queues=[
Queue('calcs', routing_key='calcs', queue_arguments={'x-max-priority': 10}),
Queue('uploads', routing_key='uploads', queue_arguments={'x-max-priority': 100})
]
priorities={
'Upload.process_upload': 5,
'Upload.delete_upload': 9,
'Upload.publish_upload': 10
}
)
fs = NomadConfig(
......
......@@ -54,7 +54,7 @@ app.conf.update(worker_max_memory_per_child=config.celery.max_memory)
if config.celery.routing == config.CELERY_WORKER_ROUTING:
app.conf.update(worker_direct=True)
app.conf.task_queues = config.celery.task_queues
app.conf.task_queue_max_priority = 10
CREATED = 'CREATED'
PENDING = 'PENDING'
......@@ -496,14 +496,18 @@ def process(func):
self_id = self.id.__str__()
cls_name = self.__class__.__name__
queue = getattr(self.__class__, 'queue', None)
queue = None
if config.celery.routing == config.CELERY_WORKER_ROUTING and self.worker_hostname is not None:
queue = 'celery@%s' % worker_direct(self.worker_hostname).name
priority = config.celery.priorities.get('%s.%s' % (cls_name, func.__name__), 1)
logger = utils.get_logger(__name__, cls=cls_name, id=self_id, func=func.__name__)
logger.debug('calling process function', queue=queue)
logger.debug('calling process function', queue=queue, priority=priority)
return proc_task.apply_async(args=[cls_name, self_id, func.__name__], queue=queue)
return proc_task.apply_async(
args=[cls_name, self_id, func.__name__],
queue=queue, priority=priority)
task = getattr(func, '__task_name', None)
if task is not None:
......
......@@ -66,8 +66,6 @@ class Calc(Proc):
metadata = DictField()
queue = 'calcs'
meta: Any = {
'indexes': [
'upload_id', 'mainfile', 'parser', 'tasks_status', 'process_status'
......@@ -366,8 +364,6 @@ class Upload(Proc):
published = BooleanField(default=False)
publish_time = DateTimeField()
queue = 'uploads'
meta: Any = {
'indexes': [
'user_id', 'tasks_status', 'process_status', 'published'
......
......@@ -20,12 +20,12 @@ spec:
app.kubernetes.io/instance: {{ .Release.Name }}
spec:
affinity:
podAntiAffinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- topologyKey: kubernetes.io/hostname
labelSelector:
matchLabels:
app.kubernetes.io/name: {{ include "nomad.name" . }}-worker
labelSelector:
matchLabels:
app.kubernetes.io/name: {{ include "nomad.name" . }}-worker
app.kubernetes.io/instance: {{ .Release.Name }}
containers:
- name: {{ include "nomad.name" . }}-worker
......@@ -56,7 +56,7 @@ spec:
valueFrom:
fieldRef:
fieldPath: spec.nodeName
command: ["python", "-m", "celery", "worker", "-A", "nomad.processing", "-n", "$(NOMAD_CELERY_NODE_NAME)", "-Q", "calcs,uploads"]
command: ["python", "-m", "celery", "worker", "-A", "nomad.processing", "-n", "$(NOMAD_CELERY_NODE_NAME)"]
livenessProbe:
exec:
command:
......
......@@ -98,14 +98,7 @@ def celery_includes():
def celery_config():
return {
'broker_url': config.rabbitmq_url(),
'task_queues': config.celery.task_queues
}
@pytest.fixture(scope='session')
def celery_worker_parameters():
return {
'queues': ('celery', 'uploads', 'calcs')
'task_queue_max_priority': 10
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment