Commit 3031a5d9 authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Refactored nomad client and admin cli.

parent f65899fe
Pipeline #51377 passed with stages
in 19 minutes and 33 seconds
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Swagger/bravado based python client library for the API and various useful shell commands.
"""
from . import upload, run
from .__main__ import cli
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import click
import logging
import os
import sys
import shutil
from elasticsearch_dsl import A
from nomad import config as nomad_config, infrastructure, processing
from nomad.search import Search
@click.group(help='''The nomad admin command to do nasty stuff directly on the databases.
Remember: With great power comes great responsibility!''')
@click.option('-v', '--verbose', help='sets log level to info', is_flag=True)
@click.option('--debug', help='sets log level to debug', is_flag=True)
@click.option('--config', help='the config file to use')
def cli(verbose: bool, debug: bool, config: str):
    """
    Root group of the admin command line interface.

    Optionally loads the given config file, derives the console log level from
    the ``--debug``/``--verbose`` flags (``--debug`` wins, WARNING is the
    fallback), sets the service name and configures logging.
    """
    if config is not None:
        nomad_config.load_config(config_file=config)

    # --debug takes precedence over --verbose.
    if debug:
        console_log_level = logging.DEBUG
    else:
        console_log_level = logging.INFO if verbose else logging.WARNING
    nomad_config.console_log_level = console_log_level

    # The service name can be overridden through the NOMAD_SERVICE variable.
    nomad_config.service = os.environ.get('NOMAD_SERVICE', 'admin')
    infrastructure.setup_logging()
@cli.command(help='Runs tests and linting. Useful before commit code.')
@click.option('--skip-tests', help='Do not test, just do code checks.', is_flag=True)
def qa(skip_tests: bool):
    """
    Run the test suite and the static code checks from the project root.

    All steps are always executed, even if an earlier one fails. The process
    exits with status 1 if any step failed and with 0 otherwise.

    Arguments:
        skip_tests: If set, pytest is skipped and only the code checks run.
    """
    # The commands use paths relative to the project root (two levels up).
    os.chdir(os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))

    steps = []
    if not skip_tests:
        steps.append(('Run tests ...', 'python -m pytest -svx tests'))
    steps.extend([
        ('Run code style checks ...',
         'python -m pycodestyle --ignore=E501,E701 nomad tests'),
        ('Run linter ...',
         'python -m pylint --load-plugins=pylint_mongoengine nomad tests'),
        ('Run static type checks ...',
         'python -m mypy --ignore-missing-imports --follow-imports=silent --no-strict-optional nomad tests'),
    ])

    failed = False
    for message, command in steps:
        click.echo(message)
        # os.system returns a wait status on POSIX (exit code << 8). Summing
        # those and handing the sum to sys.exit can be truncated to 0 by the
        # OS (e.g. 256 -> 0), so only record whether a step failed.
        if os.system(command) != 0:
            failed = True

    sys.exit(1 if failed else 0)
@cli.command(help='Checks consistency of files and es vs mongo and deletes orphan entries.')
@click.option('--dry', is_flag=True, help='Do not delete anything, just check.')
@click.option('--skip-fs', is_flag=True, help='Skip cleaning the filesystem.')
@click.option('--skip-es', is_flag=True, help='Skip cleaning the es index.')
def clean(dry, skip_fs, skip_es):
    """
    Find (and optionally delete) upload data that has no upload in mongo.

    Arguments:
        dry: Only report orphans, do not delete anything.
        skip_fs: Do not check the upload directories on the filesystem.
        skip_es: Do not check the elastic search index.
    """
    infrastructure.setup_logging()
    infrastructure.setup_mongo()
    infrastructure.setup_elastic()

    if not skip_fs:
        # Collect (upload_id, path) for every <bucket>/<prefix>/<upload> directory.
        # Each bucket has to be listed itself; always listing the public bucket
        # would skip staging entirely and visit public twice.
        upload_dirs = []
        for bucket in [nomad_config.fs.public, nomad_config.fs.staging]:
            for prefix in os.listdir(bucket):
                for upload in os.listdir(os.path.join(bucket, prefix)):
                    upload_dirs.append((upload, os.path.join(bucket, prefix, upload)))

        to_delete = [
            path for upload, path in upload_dirs
            if processing.Upload.objects(upload_id=upload).first() is None]

        if not dry and len(to_delete) > 0:
            # Blocks until the user confirms on stdin.
            input('Will delete %d upload directories. Press any key to continue ...' % len(to_delete))

            for path in to_delete:
                shutil.rmtree(path)
        else:
            print('Found %d upload directories with no upload in mongo.' % len(to_delete))

    if not skip_es:
        # Aggregate the index by upload_id to get the calc count per upload.
        search = Search(index=nomad_config.elastic.index_name)
        search.aggs.bucket('uploads', A('terms', field='upload_id', size=12000))
        response = search.execute()

        to_delete = [
            (bucket.key, bucket.doc_count)
            for bucket in response.aggregations.uploads.buckets
            if processing.Upload.objects(upload_id=bucket.key).first() is None]

        calcs = sum(upload_calcs for _, upload_calcs in to_delete)

        if not dry and len(to_delete) > 0:
            input(
                'Will delete %d calcs in %d uploads from ES. Press any key to continue ...' %
                (calcs, len(to_delete)))
            for upload, _ in to_delete:
                Search(index=nomad_config.elastic.index_name).query('term', upload_id=upload).delete()
        else:
            print('Found %d calcs in %d uploads from ES with no upload in mongo.' % (calcs, len(to_delete)))
if __name__ == '__main__':
    # When run via ``python -m``, this file is executed a second time under the
    # name ``__main__`` and gets its own ``cli`` object. The sub-command modules
    # register on the ``cli`` of ``nomad.admin.__main__`` (re-exported by the
    # package), so that group has to be invoked to make their commands available.
    import nomad.admin
    nomad.admin.cli()  # pylint: disable=E1120
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import click
import asyncio
from concurrent.futures import ProcessPoolExecutor
from nomad import config
from nomad.admin.__main__ import cli
@cli.group(help='Run a nomad service locally (outside docker).')
def run():
    """ Group for the commands that start nomad services locally; does nothing itself. """
@run.command(help='Run the nomad development worker.')
def worker():
    """ Command wrapper that delegates to :func:`run_worker`. """
    run_worker()
@run.command(help='Run the nomad development api.')
@click.option('--debug', help='Does run flask in debug.', is_flag=True)
def api(debug: bool):
    """ Command wrapper that delegates to :func:`run_api`, forwarding the debug flag. """
    options = dict(debug=debug)
    run_api(**options)
def run_api(**kwargs):
    """
    Set the service name to 'api', set up the infrastructure and start the dev server.

    Arguments:
        **kwargs: Passed on to ``run_dev_server`` in addition to ``port=8000``.
    """
    config.service = 'api'
    # Imported only after the service name is set.
    # NOTE(review): presumably these modules read config.service on import - confirm.
    from nomad import infrastructure
    from nomad.api.__main__ import run_dev_server
    infrastructure.setup()
    run_dev_server(port=8000, **kwargs)
def run_worker():
    """ Set the service name to 'worker' and run the celery worker main of ``processing.app``. """
    config.service = 'worker'
    # Imported only after the service name is set.
    # NOTE(review): presumably processing reads config.service on import - confirm.
    from nomad import processing
    processing.app.worker_main(['worker', '--loglevel=INFO', '-Q', 'celery,uploads,calcs'])
@run.command(help='Run both api and worker.')
def apiworker():
    """
    Submit :func:`run_api` and :func:`run_worker` to a pool of two processes.

    The callables are handed to the pool through the event loop's
    ``run_in_executor``; the returned futures are not awaited here.
    """
    loop = asyncio.get_event_loop()
    pool = ProcessPoolExecutor(2)
    for service in (run_api, run_worker):
        loop.run_in_executor(pool, service)
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import click
from tabulate import tabulate
from mongoengine import Q
from nomad import processing as proc, infrastructure, utils
from .__main__ import cli
# Result of proc.Upload.objects(query); assigned by the `upload` group callback
# and read by its sub-commands.
uploads = None
# The mongoengine Q filter built from the `upload` group options; assigned by
# the `upload` group callback.
query = None
@cli.group(help='Upload related commands')
@click.option('--upload', help='Select upload of with given id', type=str)
@click.option('--user', help='Select uploads of user with given id', type=str)
@click.option('--staging', help='Select only uploads in staging', is_flag=True)
@click.option('--processing', help='Select only processing uploads', is_flag=True)
def upload(upload: str, user: str, staging: bool, processing: bool):
    """
    Build the upload selection shared by all sub-commands of this group.

    The given options are combined with AND into one mongoengine ``Q`` filter.
    The filter and the resulting upload queryset are stored in the module level
    ``query`` and ``uploads`` variables.
    """
    global query, uploads

    infrastructure.setup_mongo()
    infrastructure.setup_elastic()

    selection = Q()
    if upload is not None:
        selection = selection & Q(upload_id=upload)
    if user is not None:
        selection = selection & Q(user_id=user)
    if staging:
        selection = selection & Q(published=False)
    if processing:
        is_running = Q(process_status=proc.PROCESS_RUNNING) | Q(tasks_status=proc.RUNNING)
        selection = selection & is_running

    query = selection
    uploads = proc.Upload.objects(query)
@upload.command(help='List selected uploads')
def ls():
    """ Print the number of selected uploads and a table with at most the first ten. """
    print('%d uploads selected, showing no more than first 10' % uploads.count())

    headers = ['id', 'name', 'user', 'status', 'published']
    rows = []
    for selected in uploads[:10]:
        rows.append([
            selected.upload_id, selected.name, selected.user_id,
            selected.process_status, selected.published])

    print(tabulate(rows, headers=headers))
@upload.command(help='Delete selected upload')
@click.option('--with-coe-repo', help='Also attempt to delete from repository db', is_flag=True)
def rm(with_coe_repo):
    """
    Call ``delete_upload_local`` on every selected upload.

    Arguments:
        with_coe_repo: Forwarded to ``delete_upload_local``.
    """
    selected_count = uploads.count()
    print('%d uploads selected, deleting ...' % selected_count)
    for selected in uploads:
        selected.delete_upload_local(with_coe_repo=with_coe_repo)
@upload.command(help='Attempt to abort the processing of uploads.')
@click.option('--calcs', is_flag=True, help='Only stop calculation processing.')
@click.option('--kill', is_flag=True, help='Use the kill signal and force task failure.')
def stop(calcs: bool, kill: bool):
    """
    Revoke the celery tasks of the selected calcs and, unless disabled, uploads.

    Arguments:
        calcs: Only stop calc processing and leave the uploads themselves alone.
        kill: Revoke with SIGKILL and additionally mark each process as failed
            and completed.
    """
    logger = utils.get_logger(__name__)

    def stop_all(query):
        # The loop variable must not be called `proc`: that would shadow the
        # imported processing module, which is needed below for `proc.Calc`,
        # `proc.app` and `proc.PROCESS_COMPLETED`.
        for process in query:
            logger_kwargs = dict(upload_id=process.upload_id)
            if isinstance(process, proc.Calc):
                logger_kwargs.update(calc_id=process.calc_id)

            logger.info(
                'send terminate celery task', celery_task_id=process.celery_task_id,
                kill=kill, **logger_kwargs)

            kwargs = {}
            if kill:
                kwargs.update(signal='SIGKILL')
            try:
                proc.app.control.revoke(process.celery_task_id, terminate=True, **kwargs)
            except Exception as e:
                # Best effort: a failed revoke is logged and the loop carries on.
                logger.warning(
                    'could not revoke celery task', exc_info=e,
                    celery_task_id=process.celery_task_id, **logger_kwargs)

            if kill:
                logger.info(
                    'fail proc', celery_task_id=process.celery_task_id, kill=kill,
                    **logger_kwargs)

                process.fail('process terminate via nomad cli')
                process.process_status = proc.PROCESS_COMPLETED
                process.on_process_complete(None)
                process.save()

    # `query` here is the module level filter built by the `upload` group.
    stop_all(proc.Calc.objects(query))
    if not calcs:
        stop_all(proc.Upload.objects(query))
......@@ -16,6 +16,6 @@
Swagger/bravado based python client library for the API and various usefull shell commands.
"""
from . import local, migration, misc, upload, integrationtests
from .main import cli, create_client
from . import local, migration, upload, integrationtests
from .__main__ import cli, create_client
from .upload import stream_upload_with_client
......@@ -12,7 +12,100 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import requests
import click
import logging
from bravado.requests_client import RequestsClient
from bravado.client import SwaggerClient
from urllib.parse import urlparse
import nomad.client
from nomad import config as nomad_config
from nomad import utils, infrastructure
def create_client():
    """
    Create a client via the module level ``_create_client``.

    ``_create_client`` is looked up at call time, so a replacement installed by
    the ``cli`` group is picked up.
    """
    client = _create_client()
    return client
def _create_client(*args, **kwargs):
    """ Default client factory: passes all arguments through to ``__create_client``. """
    client = __create_client(*args, **kwargs)
    return client
def __create_client(user: str = nomad_config.client.user, password: str = nomad_config.client.password):
    """
    A factory method to create the client.

    Arguments:
        user: User name for basic auth; if None, no credentials are set.
        password: Password used together with ``user``.

    Returns: A bravado ``SwaggerClient`` created from the API's swagger.json.
    """
    api_url = nomad_config.client.url
    # Basic auth is registered for the host name only, without the port.
    host = urlparse(api_url).netloc.split(':')[0]

    http_client = RequestsClient()
    if user is not None:
        http_client.set_basic_auth(host, user, password)

    swagger_url = '%s/swagger.json' % api_url
    client = SwaggerClient.from_url(swagger_url, http_client=http_client)

    utils.get_logger(__name__).info('created bravado client', user=user)
    return client
def handle_common_errors(func):
    """
    Decorator that turns connection errors into a message and a clean exit.

    The wrapper keeps the name and docstring of ``func`` and returns its result.
    If ``func`` raises ``requests.exceptions.ConnectionError``, a hint is printed
    and the process exits with status 1.
    """
    import functools

    # functools.wraps keeps __name__/__doc__, which click uses to derive the
    # command name and help text of decorated commands.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except requests.exceptions.ConnectionError:
            click.echo(
                '\nCould not connect to nomad at %s. '
                'Check connection and url.' % nomad_config.client.url)
            # A failed connection is an error; do not report success.
            sys.exit(1)

    return wrapper
@click.group()
@click.option('-n', '--url', default=nomad_config.client.url, help='The URL where nomad is running, default is "%s".' % nomad_config.client.url)
@click.option('-u', '--user', default=None, help='the user name to login, default is "%s" login.' % nomad_config.client.user)
@click.option('-w', '--password', default=nomad_config.client.password, help='the password used to login.')
@click.option('-v', '--verbose', help='sets log level to info', is_flag=True)
@click.option('--debug', help='sets log level to debug', is_flag=True)
@click.option('--config', help='the config file to use')
def cli(url: str, verbose: bool, debug: bool, user: str, password: str, config: str):
    # Root group of the client cli. Deliberately documented with comments only:
    # the group has no explicit help text, so click would show a docstring as help.
    if config is not None:
        nomad_config.load_config(config_file=config)

    # --debug takes precedence over --verbose; WARNING is the fallback.
    if debug:
        console_log_level = logging.DEBUG
    else:
        console_log_level = logging.INFO if verbose else logging.WARNING
    nomad_config.console_log_level = console_log_level

    nomad_config.service = os.environ.get('NOMAD_SERVICE', 'client')
    infrastructure.setup_logging()

    logger = utils.get_logger(__name__)
    logger.info('Used nomad is %s' % url)
    logger.info('Used user is %s' % user)

    nomad_config.client.url = url

    # Replace the module level factory so that create_client() uses the
    # credentials given on the command line.
    global _create_client

    def _create_client(*args, **kwargs):  # pylint: disable=W0612
        if user is None:
            logger.info('create anonymous client')
            return __create_client()

        logger.info('create client', user=user)
        return __create_client(user=user, password=password)
@cli.command(help='Attempts to reset the nomad.')
def reset():
    """ Call the API's admin reset operation and wait for its response. """
    # Imported at call time from the package's __main__ module.
    from .__main__ import create_client

    client = create_client()
    client.admin.exec_reset_command().response()
if __name__ == '__main__':
    # Invokes the group exported by the nomad.client package, not this module's own `cli`.
    # NOTE(review): presumably because the sub-command modules register on that object - confirm.
    nomad.client.cli() # pylint: disable=E1120
......@@ -19,7 +19,7 @@ as a final integration test.
import time
from .main import cli
from .__main__ import cli
example_file = 'tests/data/proc/examples_vasp.zip'
......@@ -27,7 +27,7 @@ example_file = 'tests/data/proc/examples_vasp.zip'
@cli.command(help='Runs a few example operations as a test.')
def integrationtests():
from .main import create_client
from .__main__ import create_client
client = create_client()
print('upload with multiple code data')
......
......@@ -28,7 +28,7 @@ from nomad.parsing import parser_dict, LocalBackend, match_parser
from nomad.normalizing import normalizers
from nomad.datamodel import CalcWithMetadata
from .main import cli
from .__main__ import cli
class CalcProcReproduction:
......@@ -58,7 +58,7 @@ class CalcProcReproduction:
self.mainfile = mainfile
self.parser = None
from .main import create_client
from .__main__ import create_client
client = create_client()
if self.mainfile is None:
try:
......
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import requests
import click
import logging
from bravado.requests_client import RequestsClient
from bravado.client import SwaggerClient
from urllib.parse import urlparse
from nomad import config as nomad_config
from nomad import utils, infrastructure
def create_client():
    """
    Create a client via the module level ``_create_client``.

    ``_create_client`` is looked up at call time, so a replacement installed by
    the ``cli`` group is picked up.
    """
    client = _create_client()
    return client
def _create_client(*args, **kwargs):
    """ Default client factory: passes all arguments through to ``__create_client``. """
    client = __create_client(*args, **kwargs)
    return client
def __create_client(user: str = nomad_config.client.user, password: str = nomad_config.client.password):
    """
    A factory method to create the client.

    Arguments:
        user: User name for basic auth; if None, no credentials are set.
        password: Password used together with ``user``.

    Returns: A bravado ``SwaggerClient`` created from the API's swagger.json.
    """
    api_url = nomad_config.client.url
    # Basic auth is registered for the host name only, without the port.
    host = urlparse(api_url).netloc.split(':')[0]

    http_client = RequestsClient()
    if user is not None:
        http_client.set_basic_auth(host, user, password)

    swagger_url = '%s/swagger.json' % api_url
    client = SwaggerClient.from_url(swagger_url, http_client=http_client)

    utils.get_logger(__name__).info('created bravado client', user=user)
    return client
def handle_common_errors(func):
    """
    Decorator that turns connection errors into a message and a clean exit.

    The wrapper keeps the name and docstring of ``func`` and returns its result.
    If ``func`` raises ``requests.exceptions.ConnectionError``, a hint is printed
    and the process exits with status 1.
    """
    import functools

    # functools.wraps keeps __name__/__doc__, which click uses to derive the
    # command name and help text of decorated commands.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except requests.exceptions.ConnectionError:
            click.echo(
                '\nCould not connect to nomad at %s. '
                'Check connection and url.' % nomad_config.client.url)
            # A failed connection is an error; do not report success.
            sys.exit(1)

    return wrapper
@click.group()
@click.option('-n', '--url', default=nomad_config.client.url, help='The URL where nomad is running, default is "%s".' % nomad_config.client.url)
@click.option('-u', '--user', default=None, help='the user name to login, default is "%s" login.' % nomad_config.client.user)
@click.option('-w', '--password', default=nomad_config.client.password, help='the password used to login.')
@click.option('-v', '--verbose', help='sets log level to info', is_flag=True)
@click.option('--debug', help='sets log level to debug', is_flag=True)
@click.option('--config', help='the config file to use')
def cli(url: str, verbose: bool, debug: bool, user: str, password: str, config: str):
    # Root group of the client cli. Deliberately documented with comments only:
    # the group has no explicit help text, so click would show a docstring as help.
    if config is not None:
        nomad_config.load_config(config_file=config)

    # --debug takes precedence over --verbose; WARNING is the fallback.
    if debug:
        console_log_level = logging.DEBUG
    else:
        console_log_level = logging.INFO if verbose else logging.WARNING
    nomad_config.console_log_level = console_log_level

    nomad_config.service = os.environ.get('NOMAD_SERVICE', 'client')
    infrastructure.setup_logging()

    logger = utils.get_logger(__name__)
    logger.info('Used nomad is %s' % url)
    logger.info('Used user is %s' % user)

    nomad_config.client.url = url

    # Replace the module level factory so that create_client() uses the
    # credentials given on the command line.
    global _create_client

    def _create_client(*args, **kwargs):  # pylint: disable=W0612
        if user is None:
            logger.info('create anonymous client')
            return __create_client()

        logger.info('create client', user=user)
        return __create_client(user=user, password=password)
......@@ -26,7 +26,7 @@ import json
from nomad import config, infrastructure
from nomad.migration import NomadCOEMigration, SourceCalc, Package, missing_calcs_data
from .main import cli
from .__main__ import cli
def _Migration(**kwargs) -> NomadCOEMigration:
......
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os.path
import os
import shutil
import sys
import click
import asyncio
from concurrent.futures import ProcessPoolExecutor
from mongoengine import Q
from elasticsearch_dsl import Search, A
from nomad import config, infrastructure, processing, utils, files, search
from .main import cli
@cli.group(help='Processing related functions')
def proc():
    """ Group for the processing related commands; does nothing itself. """
@proc.command(help='List processing tasks')
def ls():
infrastructure.setup_logging()
infrastructure.setup_mongo()
def ls(query):
for proc in query:
print(proc)