From 9adb97f9ca2523a6d57fd9bf0dc3acd7bd623c89 Mon Sep 17 00:00:00 2001
From: Theodore Chang <tlcfem@gmail.com>
Date: Sat, 22 Jun 2024 00:28:22 +0200
Subject: [PATCH] Add workers config

---
 nomad/cli/admin/run.py   | 61 ++++++++++++++++++++++++++++------------
 nomad/processing/base.py |  1 +
 2 files changed, 44 insertions(+), 18 deletions(-)

diff --git a/nomad/cli/admin/run.py b/nomad/cli/admin/run.py
index 04487ba7d6..fb807a75b1 100644
--- a/nomad/cli/admin/run.py
+++ b/nomad/cli/admin/run.py
@@ -16,7 +16,6 @@
 # limitations under the License.
 #
 
-import functools
 import click
 
 from nomad import utils
@@ -61,6 +60,7 @@ def app(with_gui: bool, **kwargs):
 
 
 def run_app(
+    *,
     with_gui: bool = False,
     gunicorn: bool = False,
     host: str = None,
@@ -165,11 +165,16 @@ def run_app(
         server.run()
 
 
-def run_worker():
+def run_worker(*, workers=None):
     config.meta.service = 'worker'
     from nomad import processing
 
-    processing.app.worker_main(['worker', '--loglevel=INFO', '-B', '-Q', 'celery'])
+    params = ['worker', '--loglevel=INFO', '-B', '-Q', 'celery']
+
+    if workers is not None:
+        params.append(f'--concurrency={workers}')
+
+    processing.app.worker_main(params)
 
 
 def run_hub():
@@ -194,36 +199,56 @@ def run_hub():
     )
 
 
-def task_app(*args, **kwargs):
+def task_app(**kwargs):
     logger = utils.get_logger('app')
     try:
-        run_app(*args, **kwargs)
+        run_app(**kwargs)
     except Exception as error:
         logger.exception(error)
 
 
-def task_worker():
+def task_worker(**kwargs):
     logger = utils.get_logger('worker')
     try:
-        run_worker()
+        run_worker(**kwargs)
     except Exception as error:
         logger.exception(error)
 
 
-def appworker_(app_host=None, app_port=None):
+def run_appworker(
+    *,
+    app_host: str = None,
+    app_port: int = None,
+    fastapi_workers: int = None,
+    celery_workers: int = None,
+    dev: bool = False,
+):
     from concurrent import futures as concurrent_futures
-    import asyncio
 
-    app_kwargs = {'host': app_host, 'port': app_port}
+    if dev:
+        fastapi_workers = 1
+        celery_workers = 1
 
-    executor = concurrent_futures.ProcessPoolExecutor(2)
-    loop = asyncio.get_event_loop()
-    loop.run_in_executor(executor, functools.partial(task_app, **app_kwargs))
-    loop.run_in_executor(executor, task_worker)
+    with concurrent_futures.ProcessPoolExecutor(2) as executor:
+        executor.submit(task_worker, workers=celery_workers)  # type: ignore
+        executor.submit(task_app, workers=fastapi_workers, host=app_host, port=app_port)  # type: ignore
 
 
 @run.command(help='Run both app and worker.')
-@click.option('--app-host', type=str, help='Passed as app host parameter.')
-@click.option('--app-port', type=int, help='Passed as app port parameter.')
-def appworker(app_host: str = None, app_port: int = None):
-    appworker_(app_host, app_port)
+@click.option(
+    '--app-host', type=str, default=None, help='Passed as app host parameter.'
+)
+@click.option(
+    '--app-port', type=int, default=None, help='Passed as app port parameter.'
+)
+@click.option(
+    '--fastapi-workers', type=int, default=None, help='Number of FastAPI workers.'
+)
+@click.option(
+    '--celery-workers', type=int, default=None, help='Number of Celery workers.'
+)
+@click.option(
+    '--dev', is_flag=True, default=False, help='Use one worker (for dev. env.).'
+)
+def appworker(**kwargs):
+    run_appworker(**kwargs)
diff --git a/nomad/processing/base.py b/nomad/processing/base.py
index a9bb9afc88..b9c0dddbd2 100644
--- a/nomad/processing/base.py
+++ b/nomad/processing/base.py
@@ -104,6 +104,7 @@ app.conf.update(worker_max_memory_per_child=config.celery.max_memory)
 if config.celery.routing == CELERY_WORKER_ROUTING:
     app.conf.update(worker_direct=True)
 
+app.conf.broker_connection_retry_on_startup = True
 app.conf.task_queue_max_priority = 10
 app.conf.worker_redirect_stdouts = config.process.redirect_stdouts
 app.conf.worker_redirect_stdouts_level = 'INFO'
-- 
GitLab