Skip to content
Snippets Groups Projects
Commit 5c07233b authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Acks_late is now configurable.

parent c10500fb
No related branches found
No related tags found
1 merge request!34Merged latest changes torwards 4.2.
Pipeline #44713 passed
...@@ -30,7 +30,7 @@ FilesConfig = namedtuple( ...@@ -30,7 +30,7 @@ FilesConfig = namedtuple(
'FilesConfig', ['uploads_bucket', 'raw_bucket', 'archive_bucket', 'staging_bucket', 'public_bucket']) 'FilesConfig', ['uploads_bucket', 'raw_bucket', 'archive_bucket', 'staging_bucket', 'public_bucket'])
""" API independent configuration for the object storage. """ """ API independent configuration for the object storage. """
CeleryConfig = namedtuple('Celery', ['broker_url', 'max_memory', 'timeout']) CeleryConfig = namedtuple('Celery', ['broker_url', 'max_memory', 'timeout', 'acks_late'])
""" Used to configure the RabbitMQ for celery. """ """ Used to configure the RabbitMQ for celery. """
FSConfig = namedtuple('FSConfig', ['tmp', 'objects', 'nomad_tmp']) FSConfig = namedtuple('FSConfig', ['tmp', 'objects', 'nomad_tmp'])
...@@ -86,7 +86,8 @@ def get_loglevel_from_env(key, default_level=logging.INFO): ...@@ -86,7 +86,8 @@ def get_loglevel_from_env(key, default_level=logging.INFO):
celery = CeleryConfig( celery = CeleryConfig(
broker_url=rabbit_url, broker_url=rabbit_url,
max_memory=int(os.environ.get('NOMAD_CELERY_MAXMEMORY', 64e6)), # 64 GB max_memory=int(os.environ.get('NOMAD_CELERY_MAXMEMORY', 64e6)), # 64 GB
timeout=int(os.environ.get('NOMAD_CELERY_TIMEOUT', 3 * 3600)) # 3h timeout=int(os.environ.get('NOMAD_CELERY_TIMEOUT', 3 * 3600)), # 3h
acks_late=bool(os.environ.get('NOMAD_CELERY_ACKS_LATE', True))
) )
fs = FSConfig( fs = FSConfig(
......
...@@ -401,7 +401,9 @@ def unwarp_task(task, cls_name, self_id, *args, **kwargs): ...@@ -401,7 +401,9 @@ def unwarp_task(task, cls_name, self_id, *args, **kwargs):
return self return self
@app.task(bind=True, base=NomadCeleryTask, ignore_results=True, max_retries=3, acks_late=True) @app.task(
bind=True, base=NomadCeleryTask, ignore_results=True, max_retries=3,
acks_late=config.celery.acks_late)
def proc_task(task, cls_name, self_id, func_attr): def proc_task(task, cls_name, self_id, func_attr):
""" """
The celery task that is used to execute async process functions. The celery task that is used to execute async process functions.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment