Commit 34005e1a authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Added flag to search for uploads not in ES to CLI.

parent 0f2fa60c
Pipeline #83227 passed with stages
in 20 minutes and 36 seconds
......@@ -26,7 +26,8 @@ from .admin import admin, __run_processing, __run_parallel'Upload related commands')
@click.option('--user', help='Select uploads of user with given id', type=str)
@click.option('--staging', help='Select only uploads in staging', is_flag=True)
@click.option('--unpublished', help='Select only uploads in staging', is_flag=True)
@click.option('--published', help='Select only uploads that are publised', is_flag=True)
@click.option('--outdated', help='Select published uploads with older nomad version', is_flag=True)
@click.option('--code', multiple=True, type=str, help='Select only uploads with calcs of given codes')
@click.option('--query-mongo', is_flag=True, help='Select query mongo instead of elastic search.')
......@@ -38,22 +39,25 @@ from .admin import admin, __run_processing, __run_parallel
@click.option('--processing-incomplete-calcs', is_flag=True, help='Select uploads where any calc has net yot been processed')
@click.option('--processing-incomplete', is_flag=True, help='Select uploads where the upload or any calc has not yet been processed')
@click.option('--processing-necessary', is_flag=True, help='Select uploads where the upload or any calc has either not been processed or processing has failed in the past')
@click.option('--unindexed', is_flag=True, help='Select uploads that have no calcs in the elastic search index.')
def uploads(
ctx, user: str, staging: bool, processing: bool, outdated: bool,
ctx, user: str, unpublished: bool, published: bool, processing: bool, outdated: bool,
code: typing.List[str], query_mongo: bool,
processing_failure_uploads: bool, processing_failure_calcs: bool, processing_failure: bool,
processing_incomplete_uploads: bool, processing_incomplete_calcs: bool, processing_incomplete: bool,
processing_necessary: bool):
processing_necessary: bool, unindexed: bool):
mongo_client = infrastructure.setup_mongo()
query = mongoengine.Q()
calc_query = None
if user is not None:
query |= mongoengine.Q(user_id=user)
if staging:
if unpublished:
query |= mongoengine.Q(published=False)
if published:
query |= mongoengine.Q(published=True)
if processing:
query |= mongoengine.Q(process_status=proc.PROCESS_RUNNING) | mongoengine.Q(tasks_status=proc.RUNNING)
......@@ -90,6 +94,25 @@ def uploads(
if processing_incomplete_uploads or processing_incomplete or processing_necessary:
query |= mongoengine.Q(process_status__ne=proc.PROCESS_COMPLETED)
if unindexed:
search_request = search.Search(index=config.elastic.index_name)
search_request.aggs.bucket('uploads', es.A('terms', field='upload_id', size=12000))
response = search_request.execute()
uploads_in_es = set(
for bucket in response.aggregations.uploads.buckets)
uploads_in_mongo = mongo_client[config.mongo.db_name]['calc'].distinct('upload_id')
uploads_not_in_es = []
for upload_id in uploads_in_mongo:
if upload_id not in uploads_in_es:
query |= mongoengine.Q(
ctx.obj.query = query
ctx.obj.calc_query = calc_query
ctx.obj.uploads = proc.Upload.objects(query)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment