Skip to content
Snippets Groups Projects
Commit 9eb6f7ae authored by Theodore Chang's avatar Theodore Chang
Browse files

Merge branch 'single-worker-dev-env' into 'develop'

Add workers config

See merge request !1926
parents 5f25325a 9adb97f9
No related branches found
No related tags found
1 merge request!1926Add workers config
Pipeline #209647 passed
......@@ -16,7 +16,6 @@
# limitations under the License.
#
import functools
import click
from nomad import utils
......@@ -61,6 +60,7 @@ def app(with_gui: bool, **kwargs):
def run_app(
*,
with_gui: bool = False,
gunicorn: bool = False,
host: str = None,
......@@ -165,11 +165,16 @@ def run_app(
server.run()
def run_worker():
def run_worker(*, workers=None):
config.meta.service = 'worker'
from nomad import processing
processing.app.worker_main(['worker', '--loglevel=INFO', '-B', '-Q', 'celery'])
params = ['worker', '--loglevel=INFO', '-B', '-Q', 'celery']
if workers is not None:
params.append(f'--concurrency={workers}')
processing.app.worker_main(params)
def run_hub():
......@@ -194,36 +199,56 @@ def run_hub():
)
def task_app(*args, **kwargs):
def task_app(**kwargs):
logger = utils.get_logger('app')
try:
run_app(*args, **kwargs)
run_app(**kwargs)
except Exception as error:
logger.exception(error)
def task_worker():
def task_worker(**kwargs):
logger = utils.get_logger('worker')
try:
run_worker()
run_worker(**kwargs)
except Exception as error:
logger.exception(error)
def appworker_(app_host=None, app_port=None):
def run_appworker(
*,
app_host: str = None,
app_port: int = None,
fastapi_workers: int = None,
celery_workers: int = None,
dev: bool = False,
):
from concurrent import futures as concurrent_futures
import asyncio
app_kwargs = {'host': app_host, 'port': app_port}
if dev:
fastapi_workers = 1
celery_workers = 1
executor = concurrent_futures.ProcessPoolExecutor(2)
loop = asyncio.get_event_loop()
loop.run_in_executor(executor, functools.partial(task_app, **app_kwargs))
loop.run_in_executor(executor, task_worker)
with concurrent_futures.ProcessPoolExecutor(2) as executor:
executor.submit(task_worker, workers=celery_workers) # type: ignore
executor.submit(task_app, workers=fastapi_workers, host=app_host, port=app_port) # type: ignore
@run.command(help='Run both app and worker.')
@click.option('--app-host', type=str, help='Passed as app host parameter.')
@click.option('--app-port', type=int, help='Passed as app port parameter.')
def appworker(app_host: str = None, app_port: int = None):
appworker_(app_host, app_port)
@click.option(
'--app-host', type=str, default=None, help='Passed as app host parameter.'
)
@click.option(
'--app-port', type=int, default=None, help='Passed as app port parameter.'
)
@click.option(
'--fastapi-workers', type=int, default=None, help='Number of FastAPI workers.'
)
@click.option(
'--celery-workers', type=int, default=None, help='Number of Celery workers.'
)
@click.option(
'--dev', is_flag=True, default=False, help='Use one worker (for dev. env.).'
)
def appworker(**kwargs):
run_appworker(**kwargs)
......@@ -104,6 +104,7 @@ app.conf.update(worker_max_memory_per_child=config.celery.max_memory)
if config.celery.routing == CELERY_WORKER_ROUTING:
app.conf.update(worker_direct=True)
app.conf.broker_connection_retry_on_startup = True
app.conf.task_queue_max_priority = 10
app.conf.worker_redirect_stdouts = config.process.redirect_stdouts
app.conf.worker_redirect_stdouts_level = 'INFO'
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment