Commit 18912198 authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Added a proc command to control proc task from cli. [skip ci]

parent 71abf60e
Pipeline #45314 skipped
......@@ -18,12 +18,53 @@ import sys
import click
import asyncio
from concurrent.futures import ProcessPoolExecutor
from celery.task.control import revoke
from nomad import config
from nomad import config, infrastructure, processing, utils
from .main import cli
@cli.group(help='Processing related functions')
def proc():
pass
@proc.command(help='Stop all running processing')
@click.option('--calcs', is_flag=True, help='Only stop calculation processing')
@click.option('--kill', is_flag=True, help='Use the kill signal and force task failure')
def stop_all(calcs: bool, kill: bool):
infrastructure.setup_logging()
infrastructure.setup_mongo()
logger = utils.get_logger(__name__)
def stop_all(query):
for proc in query:
logger_kwargs = dict(upload_id=proc.upload_id)
if isinstance(proc, processing.Calc):
logger_kwargs.update(calc_id=proc.calc_id)
logger.info(
'send terminate celery task', celery_task_id=proc.celery_task_id,
kill=kill, **logger_kwargs)
kwargs = {}
if kill:
kwargs.update(signal='SIGKILL')
revoke(proc.celery_task_id, terminate=True, **kwargs)
if kill:
logger.info(
'fail proc', celery_task_id=proc.celery_task_id, kill=kill,
**logger_kwargs)
proc.fail('process terminate via nomad cli')
stop_all(processing.Calc.objects(process_status=processing.PROCESS_RUNNING))
if not calcs:
stop_all(processing.Upload.objects(process_status=processing.PROCESS_RUNNING))
@cli.command(help='Attempts to reset the nomad.')
def reset():
from .main import create_client
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment