Commit 71abf60e authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Changed celery task time limit config.

parent 34540592
Pipeline #45311 passed with stages
in 26 minutes and 50 seconds
......@@ -29,7 +29,8 @@ warnings.filterwarnings("ignore", message="numpy.ufunc size changed")
CELERY_WORKER_ROUTING = 'worker'
CELERY_QUEUE_ROUTING = 'queue'
CeleryConfig = namedtuple('Celery', ['broker_url', 'max_memory', 'timeout', 'acks_late', 'routing'])
CeleryConfig = namedtuple('Celery', [
'broker_url', 'max_memory', 'timeout', 'acks_late', 'routing'])
"""
Used to configure the RabbitMQ for celery.
......@@ -89,7 +90,7 @@ def get_loglevel_from_env(key, default_level=logging.INFO):
celery = CeleryConfig(
broker_url=rabbit_url,
max_memory=int(os.environ.get('NOMAD_CELERY_MAXMEMORY', 64e6)), # 64 GB
timeout=int(os.environ.get('NOMAD_CELERY_TIMEOUT', 3 * 3600)), # 3h
timeout=int(os.environ.get('NOMAD_CELERY_TIMEOUT', 1800)), # 1/2h
acks_late=bool(os.environ.get('NOMAD_CELERY_ACKS_LATE', True)),
routing=os.environ.get('NOMAD_CELERY_ROUTING', CELERY_QUEUE_ROUTING)
)
......
......@@ -19,6 +19,7 @@ from celery import Celery, Task
from celery.worker.request import Request
from celery.signals import after_setup_task_logger, after_setup_logger, worker_process_init
from celery.utils import worker_direct
from celery.exceptions import SoftTimeLimitExceeded
from billiard.exceptions import WorkerLostError
from mongoengine import Document, StringField, ListField, DateTimeField, ValidationError
from mongoengine.connection import MongoEngineConnectionError
......@@ -50,7 +51,6 @@ def setup(**kwargs):
app = Celery('nomad.processing', broker=config.celery.broker_url)
app.conf.update(worker_hijack_root_logger=False)
app.conf.update(worker_max_memory_per_child=config.celery.max_memory)
app.conf.update(task_time_limit=config.celery.timeout)
if config.celery.routing == config.CELERY_WORKER_ROUTING:
app.conf.update(worker_direct=True)
......@@ -416,7 +416,8 @@ def unwarp_task(task, cls_name, self_id, *args, **kwargs):
@app.task(
bind=True, base=NomadCeleryTask, ignore_results=True, max_retries=3,
acks_late=config.celery.acks_late)
acks_late=config.celery.acks_late, soft_time_limit=config.celery.timeout,
time_limit=config.celery.timeout + 120)
def proc_task(task, cls_name, self_id, func_attr):
"""
The celery task that is used to execute async process functions.
......@@ -457,6 +458,9 @@ def proc_task(task, cls_name, self_id, func_attr):
try:
self.process_status = PROCESS_RUNNING
deleted = func(self)
except SoftTimeLimitExceeded as e:
logger.error('exceeded the celery task soft time limit')
self.fail(e)
except Exception as e:
self.fail(e)
finally:
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment