Commit fa94acbd authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Added some tests for the CLI. Added parallelization to the re-process CLI command.

parent 4a62550b
......@@ -64,8 +64,10 @@ your browser.
- Refactored search API that allows searching for entries (paginated + scroll),
metrics based on quantity aggregations (+ paginated entries), and quantity aggregations
with all values via the `after` key (+ paginated entries).
- Reprocessing of published results (e.g. after parser/normalizer improvements).
- Mirror functionality.
- reprocessing of published results (e.g. after parser/normalizer improvements)
- mirror functionality
- refactored command line interface (CLI)
- many minor bugfixes
### v0.4.7
- more migration scripts
......
......@@ -13,4 +13,4 @@
# limitations under the License.
from . import admin, uploads, run
from . import admin, uploads, run, clean
......@@ -29,7 +29,8 @@ from .admin import admin
@click.option('--skip-calcs', is_flag=True, help='Skip cleaning calcs with missing uploads.')
@click.option('--skip-fs', is_flag=True, help='Skip cleaning the filesystem.')
@click.option('--skip-es', is_flag=True, help='Skip cleaning the es index.')
def clean(dry, skip_calcs, skip_fs, skip_es):
@click.option('--force', is_flag=True, help='Do not ask for confirmation.')
def clean(dry, skip_calcs, skip_fs, skip_es, force):
infrastructure.setup_logging()
mongo_client = infrastructure.setup_mongo()
infrastructure.setup_elastic()
......@@ -46,7 +47,8 @@ def clean(dry, skip_calcs, skip_fs, skip_es):
missing_uploads.append(upload_for_calc)
if not dry and len(missing_uploads) > 0:
input('Will delete calcs (mongo + es) for %d missing uploads. Press any key to continue ...' % len(missing_uploads))
if not force:
input('Will delete calcs (mongo + es) for %d missing uploads. Press any key to continue ...' % len(missing_uploads))
for upload in missing_uploads:
mongo_client[nomad_config.mongo.db_name]['calc'].remove(dict(upload_id=upload))
......@@ -69,7 +71,8 @@ def clean(dry, skip_calcs, skip_fs, skip_es):
if processing.Upload.objects(upload_id=upload).first() is None)
if not dry and len(to_delete) > 0:
input('Will delete %d upload directories. Press any key to continue ...' % len(to_delete))
if not force:
input('Will delete %d upload directories. Press any key to continue ...' % len(to_delete))
for path in to_delete:
shutil.rmtree(path)
......@@ -94,9 +97,10 @@ def clean(dry, skip_calcs, skip_fs, skip_es):
calcs += upload_calcs
if not dry and len(to_delete) > 0:
input(
'Will delete %d calcs in %d uploads from ES. Press any key to continue ...' %
(calcs, len(to_delete)))
if not force:
input(
'Will delete %d calcs in %d uploads from ES. Press any key to continue ...' %
(calcs, len(to_delete)))
for upload, _ in to_delete:
Search(index=nomad_config.elastic.index_name).query('term', upload_id=upload).delete()
else:
......
......@@ -12,10 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List
import click
from tabulate import tabulate
from mongoengine import Q
from pymongo import UpdateOne
import threading
from nomad import processing as proc, config, infrastructure, utils, search, files, coe_repo
from .admin import admin
......@@ -149,12 +151,22 @@ def rm(ctx, uploads, with_coe_repo, skip_es, skip_mongo, skip_files):
@uploads.command(help='Reprocess selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.pass_context
def re_process(ctx, uploads):
def re_process(ctx, uploads, parallel: int):
_, uploads = query_uploads(ctx, uploads)
uploads_count = uploads.count()
cv = threading.Condition()
threads: List[threading.Thread] = []
state = dict(
completed_count=0,
available_threads_count=parallel)
logger = utils.get_logger(__name__)
print('%d uploads selected, re-processing ...' % uploads.count())
print('%d uploads selected, re-processing ...' % uploads_count)
def re_process_upload(upload):
logger.info('re-processing started', upload_id=upload.upload_id)
......@@ -163,12 +175,22 @@ def re_process(ctx, uploads):
upload.block_until_complete(interval=.1)
logger.info('re-processing complete', upload_id=upload.upload_id)
state['completed_count'] += 1
print(' re-processed %s of %s uploads' % (state['completed_count'], uploads_count))
state['available_threads_count'] += 1
cv.notify()
count = 0
for upload in uploads:
re_process_upload(upload)
count += 1
print(' re-processed %s of %s uploads' % (count, len(uploads)))
with cv:
cv.wait_for(lambda: state['available_threads_count'] > 0)
state['available_threads_count'] -= 1
thread = threading.Thread(target=lambda: re_process_upload(upload))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
@uploads.command(help='Attempt to abort the processing of uploads.')
......
......@@ -281,7 +281,7 @@ def upload(
create_packages=create_packages, only_republish=only_republish, wait=wait, republish=republish)
@migration.command(help='Get an report about not migrated calcs.')
@migration.command(help='Get an report about not migrated calcs. This connects directly to nomad dbs, like admin commands.')
def missing():
infrastructure.setup_logging()
infrastructure.setup_mongo()
......
......@@ -569,3 +569,14 @@ def with_publish_to_coe_repo(monkeypatch, request):
monkeypatch.setattr('nomad.config.repository_db.publish_enabled', True)
monkeypatch.setattr('nomad.config.repository_db.mode', mode)
return request.param is not None
@pytest.fixture
def reset_config():
    """
    Fixture that restores ``config.service`` and ``config.console_log_level``
    to their pre-test values and re-runs the logging setup after the test.
    """
    # Remember the values before the test has a chance to change them.
    saved_service, saved_log_level = config.service, config.console_log_level

    yield None

    # Put the remembered values back and re-initialize logging.
    config.service, config.console_log_level = saved_service, saved_log_level
    infrastructure.setup_logging()
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import click.testing
from nomad import utils, search
from nomad.cli import cli
from nomad.processing import Upload, Calc
# TODO there is much more to test
class TestAdmin:
    """ Tests for commands that live directly under the ``admin`` CLI group. """

    def test_clean(self, published, no_warn, reset_config):
        """
        Deletes the upload document of a published upload and checks that
        ``admin clean --force --skip-es`` removes the upload files and calcs
        while the search index entries stay in place.
        """
        upload_id = published.upload_id

        def indexed_entries():
            # Number of search index entries that belong to the upload.
            hits = search.entry_search(search_parameters=dict(upload_id=upload_id))
            return hits['pagination']['total']

        def first_calc():
            # First calc of the upload in mongo, or None if there is none.
            return Calc.objects(upload_id=upload_id).first()

        # Remove only the upload document; files, calcs, and index entries remain.
        Upload.objects(upload_id=upload_id).delete()
        assert published.upload_files.exists()
        assert first_calc() is not None
        assert indexed_entries() > 0

        runner = click.testing.CliRunner()
        result = runner.invoke(
            cli, ['admin', 'clean', '--force', '--skip-es'],
            catch_exceptions=False, obj=utils.POPO())
        assert result.exit_code == 0

        # Files and calcs are gone; the index is untouched because of --skip-es.
        assert not published.upload_files.exists()
        assert first_calc() is None
        assert indexed_entries() > 0
class TestAdminUploads:
    """ Tests for the ``admin uploads`` CLI sub-commands, run on a published upload. """

    @staticmethod
    def _invoke_uploads(*arguments):
        """ Runs ``admin uploads`` with the given arguments and returns the click result. """
        runner = click.testing.CliRunner()
        return runner.invoke(
            cli, ['admin', 'uploads', *arguments],
            catch_exceptions=False, obj=utils.POPO())

    def test_ls(self, published, no_warn, reset_config):
        """ ``ls`` with an upload id exits cleanly and reports one selected upload. """
        outcome = self._invoke_uploads('ls', published.upload_id)

        assert outcome.exit_code == 0
        assert '1 uploads selected' in outcome.stdout

    def test_rm(self, published, no_warn, reset_config):
        """ ``rm`` exits cleanly and leaves neither the upload nor its calcs in mongo. """
        target_id = published.upload_id
        outcome = self._invoke_uploads('rm', target_id)

        assert outcome.exit_code == 0
        assert 'deleting' in outcome.stdout
        assert Upload.objects(upload_id=target_id).first() is None
        assert Calc.objects(upload_id=target_id).first() is None

    def test_re_process(self, published, no_warn, monkeypatch, reset_config):
        """
        ``re-process --parallel 2`` exits cleanly and the calc metadata then
        carries the (monkeypatched) nomad version.
        """
        # Fake a new version so that a re-processed calc can be told apart.
        monkeypatch.setattr('nomad.config.version', 'test_version')

        target_id = published.upload_id
        calc = Calc.objects(upload_id=target_id).first()
        assert calc.metadata['nomad_version'] != 'test_version'

        outcome = self._invoke_uploads('re-process', '--parallel', '2', target_id)

        assert outcome.exit_code == 0
        assert 're-processing' in outcome.stdout

        # Refresh the calc from the database to see what re-processing wrote.
        calc.reload()
        assert calc.metadata['nomad_version'] == 'test_version'
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment