Commit b8f536e5 authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Run indexing in parallel.

parent 563ec032
Pipeline #75191 passed with stages
in 22 minutes and 34 seconds
......@@ -34,8 +34,8 @@ from nomad import atomutils
from nomad.cli.cli import cli
def __run_processing(
uploads, parallel: int, process, label: str, reprocess_running: bool = False):
def __run_parallel(
uploads, parallel: int, callable, label: str):
if isinstance(uploads, (tuple, list)):
uploads_count = len(uploads)
......@@ -59,24 +59,9 @@ def __run_processing(
logger.info('%s started' % label, upload_id=upload.upload_id)
completed = False
if upload.process_running and not reprocess_running:
logger.warn(
'cannot trigger %s, since the upload is already/still processing' % label,
current_process=upload.current_process,
current_task=upload.current_task, upload_id=upload.upload_id)
else:
upload.reset()
process(upload)
upload.block_until_complete(interval=.5)
if upload.tasks_status == proc.FAILURE:
logger.info('%s with failure' % label, upload_id=upload.upload_id)
if callable(upload, logger):
completed = True
logger.info('%s complete' % label, upload_id=upload.upload_id)
with cv:
state['completed_count'] += 1 if completed else 0
state['skipped_count'] += 1 if not completed else 0
......@@ -100,6 +85,30 @@ def __run_processing(
thread.join()
def __run_processing(
        uploads, parallel: int, process, label: str, reprocess_running: bool = False):
    '''
    Runs the given upload ``process`` function on all given uploads, using
    ``__run_parallel`` to distribute the work over ``parallel`` worker threads.

    Arguments:
        uploads: The uploads (or upload ids) to process.
        parallel: Number of parallel workers, forwarded to ``__run_parallel``.
        process: A callable that takes a single upload and triggers its processing.
        label: A human readable label used in log messages.
        reprocess_running: If True, uploads that are already/still processing are
            reset and processed again instead of being skipped.
    '''
    def run_process(upload, logger):
        # Guard clause: skip uploads that are already/still processing, unless
        # the caller explicitly asked to reprocess running uploads.
        if upload.process_running and not reprocess_running:
            # .warning instead of the deprecated .warn alias
            logger.warning(
                'cannot trigger %s, since the upload is already/still processing' % label,
                current_process=upload.current_process,
                current_task=upload.current_task, upload_id=upload.upload_id)
            return False

        upload.reset()
        process(upload)
        # Processing runs asynchronously; poll until all tasks have finished.
        upload.block_until_complete(interval=.5)

        if upload.tasks_status == proc.FAILURE:
            logger.info('%s with failure' % label, upload_id=upload.upload_id)

        logger.info('%s complete' % label, upload_id=upload.upload_id)
        # True counts this upload as completed (not skipped) in __run_parallel.
        return True

    __run_parallel(uploads, parallel=parallel, callable=run_process, label=label)
@cli.group(help='''The nomad admin commands to do nasty stuff directly on the databases.
Remember: With great power comes great responsibility!''')
@click.pass_context
......
......@@ -21,7 +21,7 @@ import json
from nomad import processing as proc, config, infrastructure, utils, search, files, datamodel, archive
from .admin import admin, __run_processing
from .admin import admin, __run_processing, __run_parallel
@admin.group(help='Upload related commands')
......@@ -218,15 +218,12 @@ def reset(ctx, uploads, with_calcs):
@uploads.command(help='(Re-)index all calcs of the given uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.pass_context
def index(ctx, uploads):
def index(ctx, uploads, parallel):
_, uploads = query_uploads(ctx, uploads)
uploads_count = uploads.count()
print('%d uploads selected, indexing ...' % uploads_count)
i, failed = 0, 0
for upload in uploads:
def index_upload(upload, logger):
with upload.entries_metadata() as calcs:
# This is just a temporary fix to update the group hash without re-processing
try:
......@@ -235,10 +232,13 @@ def index(ctx, uploads):
calc.dft.update_group_hash()
except Exception:
pass
failed += search.index_all(calcs)
i += 1
failed = search.index_all(calcs)
if failed > 0:
print(' WARNING failed to index %d entries' % failed)
return True
print(' indexed %d of %d uploads, failed to index %d entries' % (i, uploads_count, failed))
__run_parallel(uploads, parallel, index_upload, 'index')
def delete_upload(upload, skip_es: bool = False, skip_files: bool = False, skip_mongo: bool = False):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment