uploads.py 21 KB
Newer Older
Markus Scheidgen's avatar
Markus Scheidgen committed
1
2
3
4
#
# Copyright The NOMAD Authors.
#
# This file is part of NOMAD. See https://nomad-lab.eu for further info.
5
6
7
8
9
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
Markus Scheidgen's avatar
Markus Scheidgen committed
10
#     http://www.apache.org/licenses/LICENSE-2.0
11
12
#
# Unless required by applicable law or agreed to in writing, software
Markus Scheidgen's avatar
Markus Scheidgen committed
13
# distributed under the License is distributed on an "AS IS" BASIS,
14
15
16
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Markus Scheidgen's avatar
Markus Scheidgen committed
17
18
#

19
import click
20
21
22
23
24
25
26
import typing

from nomad import config

from .admin import admin


27
def _run_parallel(uploads, parallel: int, callable, label: str, print_progress: int = 0):
    '''
    Runs the given callable once per upload, using up to ``parallel`` worker threads.

    Arguments:
        uploads: A list/tuple of uploads or a mongoengine query set.
        parallel: Maximum number of uploads processed concurrently.
        callable: Called as ``callable(upload, logger)``; should return True when the
            upload was handled. (The name shadows the builtin but is kept because
            callers pass it as a keyword argument.)
        label: Short human readable label used in log and progress messages.
        print_progress: If > 0, print a dot every given amount of seconds to keep
            terminals with i/o-based timeouts open.
    '''
    import threading
    import time

    from nomad import utils, processing as proc

    if isinstance(uploads, (tuple, list)):
        uploads_count = len(uploads)

    else:
        uploads_count = uploads.count()
        uploads = list(uploads)  # copy the whole mongo query set to avoid cursor timeouts

    cv = threading.Condition()
    threads: typing.List[threading.Thread] = []

    state = dict(
        completed_count=0,
        skipped_count=0,
        available_threads_count=parallel)

    logger = utils.get_logger(__name__)

    print('%d uploads selected, %s ...' % (uploads_count, label))

    def process_upload(upload: proc.Upload):
        logger.info('%s started' % label, upload_id=upload.upload_id)

        completed = False
        try:
            if callable(upload, logger):
                completed = True
        except Exception as e:
            # an exception still counts as completed so the overall run continues;
            # the failure itself is only logged
            completed = True
            logger.error('%s failed' % label, upload_id=upload.upload_id, exc_info=e)

        with cv:
            state['completed_count'] += 1 if completed else 0
            state['skipped_count'] += 1 if not completed else 0
            state['available_threads_count'] += 1

            print(
                '   %s %s and skipped %s of %s uploads' %
                (label, state['completed_count'], state['skipped_count'], uploads_count))

            cv.notify()

    for upload in uploads:
        logger.info(
            'cli schedules parallel %s processing for upload' % label,
            current_process=upload.current_process,
            last_status_message=upload.last_status_message, upload_id=upload.upload_id)

        with cv:
            cv.wait_for(lambda: state['available_threads_count'] > 0)
            state['available_threads_count'] -= 1
            # bind the current upload via args; a plain ``lambda: process_upload(upload)``
            # late-binds the loop variable, so a slow-starting thread could pick up a
            # later iteration's upload and process the wrong one (or one twice)
            thread = threading.Thread(target=process_upload, args=(upload,))
            threads.append(thread)
            thread.start()

    def print_progress_lines():
        # daemon heartbeat; prints forever, dies with the main thread
        while True:
            time.sleep(print_progress)
            print('.', flush=True)

    if print_progress > 0:
        progress_thread = threading.Thread(target=print_progress_lines)
        progress_thread.daemon = True
        progress_thread.start()

    for thread in threads:
        thread.join()


def _run_processing(
        uploads, parallel: int, process, label: str, process_running: bool = False,
        wait_until_complete: bool = True, reset_first: bool = False, **kwargs):
    '''
    Triggers the given process function on the selected uploads in parallel and
    blocks until each processing finished.

    Arguments:
        uploads: A list/tuple of uploads or a mongoengine query set.
        parallel: Maximum number of uploads processed concurrently.
        process: Called as ``process(upload)``; must trigger a processing.
        label: Short human readable label used in log and progress messages.
        process_running: If True, also trigger on uploads that are still processing.
        wait_until_complete: If True, block until fully complete; otherwise only
            until complete or waiting for a result.
        reset_first: If True, reset the upload's processing state before triggering.
        **kwargs: Passed through to _run_parallel (e.g. print_progress).
    '''
    from nomad import processing as proc

    def run_process(upload, logger):
        logger.info(
            'cli calls %s processing' % label,
            current_process=upload.current_process,
            last_status_message=upload.last_status_message, upload_id=upload.upload_id)
        if upload.process_running and not process_running:
            # use warning(); warn() is a deprecated alias
            logger.warning(
                'cannot trigger %s, since the upload is already/still processing' % label,
                current_process=upload.current_process,
                last_status_message=upload.last_status_message, upload_id=upload.upload_id)
            return False

        if reset_first:
            upload.reset(force=True)
        elif upload.process_running:
            # a stuck running process is force-failed so the new process can start
            upload.reset(force=True, process_status=proc.ProcessStatus.FAILURE)

        process(upload)
        if wait_until_complete:
            upload.block_until_complete(interval=.5)
        else:
            upload.block_until_complete_or_waiting_for_result(interval=.5)

        if upload.process_status == proc.ProcessStatus.FAILURE:
            logger.info('%s with failure' % label, upload_id=upload.upload_id)

        logger.info('%s complete' % label, upload_id=upload.upload_id)
        return True

    _run_parallel(uploads, parallel=parallel, callable=run_process, label=label, **kwargs)
136
137


138
@admin.group(help='Upload related commands')
@click.option('--uploads-mongo-query', type=str, help='A query')
@click.option('--entries-mongo-query', type=str, help='A query')
@click.option('--entries-es-query', type=str, help='A query')
@click.option('--unpublished', help='Select only uploads in staging', is_flag=True)
@click.option('--published', help='Select only uploads that are published', is_flag=True)
@click.option('--outdated', help='Select published uploads with older nomad version', is_flag=True)
@click.option('--processing', help='Select only processing uploads', is_flag=True)
@click.option('--processing-failure-uploads', is_flag=True, help='Select uploads with failed processing')
@click.option('--processing-failure-calcs', is_flag=True, help='Select uploads with calcs with failed processing')
@click.option('--processing-failure', is_flag=True, help='Select uploads where the upload or any calc has failed processing')
@click.option('--processing-incomplete-uploads', is_flag=True, help='Select uploads that have not yet been processed')
@click.option('--processing-incomplete-calcs', is_flag=True, help='Select uploads where any calc has not yet been processed')
@click.option('--processing-incomplete', is_flag=True, help='Select uploads where the upload or any calc has not yet been processed')
@click.option('--processing-necessary', is_flag=True, help='Select uploads where the upload or any calc has either not been processed or processing has failed in the past')
@click.option('--unindexed', is_flag=True, help='Select uploads that have no calcs in the elastic search index.')
@click.pass_context
def uploads(ctx, **kwargs):
    '''Group for upload related commands; stashes the shared filter options on the context.'''
    # the filter options are consumed later by _query_uploads in each sub command
    ctx.obj.uploads_kwargs = kwargs


def _query_uploads(
        uploads,
        unpublished: bool, published: bool, processing: bool, outdated: bool,
        uploads_mongo_query: str, entries_mongo_query: str, entries_es_query: str,
        processing_failure_uploads: bool, processing_failure_calcs: bool,
        processing_failure: bool, processing_incomplete_uploads: bool,
        processing_incomplete_calcs: bool, processing_incomplete: bool,
        processing_necessary: bool, unindexed: bool):
    '''
    Produces a mongoengine query and a list of uploads (mongoengine proc.Upload
    objects) based on a given list of upload ids and further filter parameters.
    '''

    from typing import Optional, Set, cast
    import json
    from mongoengine import Q

    from nomad import infrastructure, processing as proc, search
    from nomad.app.v1 import models

    infrastructure.setup_mongo()
    infrastructure.setup_elastic()

    # an empty (or missing) selection means all uploads; the previous
    # ``uploads is not None and len(uploads) == 0`` check crashed with
    # ``set(None)`` when None was passed
    if not uploads:
        uploads = None  # None meaning all uploads
    else:
        uploads = set(uploads)

    entries_mongo_query_q = Q()
    if entries_mongo_query:
        entries_mongo_query_q = Q(**json.loads(entries_mongo_query))

    entries_query_uploads: Optional[Set[str]] = None

    if entries_es_query is not None:
        entries_es_query_dict = json.loads(entries_es_query)
        # page_size=0: only the upload_id aggregation is needed, not the hits
        results = search.search(
            owner='admin',
            query=entries_es_query_dict,
            pagination=models.MetadataPagination(page_size=0),
            user_id=config.services.admin_user_id,
            aggregations={
                'uploads': models.Aggregation(
                    terms=models.TermsAggregation(
                        quantity='upload_id',
                        pagination=models.AggregationPagination(
                            page_size=10000
                        )
                    )
                )
            })

        entries_query_uploads = set([
            cast(str, bucket.value)
            for bucket in results.aggregations['uploads'].terms.data])  # pylint: disable=no-member

    if outdated:
        entries_mongo_query_q &= Q(nomad_version={'$ne': config.meta.version})

    if processing_failure_calcs or processing_failure or processing_necessary:
        entries_mongo_query_q &= Q(process_status=proc.ProcessStatus.FAILURE)

    if processing_incomplete_calcs or processing_incomplete or processing_necessary:
        entries_mongo_query_q &= Q(process_status__in=proc.ProcessStatus.STATUSES_PROCESSING)

    # NOTE(review): the ``unindexed`` flag is accepted but currently not applied
    # to any query — confirm whether it should filter here.

    mongo_entry_based_uploads = set(proc.Calc.objects(entries_mongo_query_q).distinct(field="upload_id"))
    if entries_query_uploads is not None:
        entries_query_uploads = entries_query_uploads.intersection(mongo_entry_based_uploads)
    else:
        entries_query_uploads = mongo_entry_based_uploads

    # NOTE(review): when the entry-based selection is empty, no upload_id filter
    # is applied at all, i.e. ALL uploads are selected — confirm this is intended.
    if entries_query_uploads:
        uploads_mongo_query_q = Q(upload_id__in=list(entries_query_uploads))
    else:
        uploads_mongo_query_q = Q()

    if uploads_mongo_query:
        uploads_mongo_query_q &= Q(**json.loads(uploads_mongo_query))

    if published:
        uploads_mongo_query_q &= Q(publish_time__exists=True)

    if unpublished:
        uploads_mongo_query_q &= Q(publish_time__exists=False)

    if processing:
        uploads_mongo_query_q &= Q(process_status__in=proc.ProcessStatus.STATUSES_PROCESSING)

    if processing_failure_uploads or processing_failure or processing_necessary:
        uploads_mongo_query_q &= Q(process_status=proc.ProcessStatus.FAILURE)

    if processing_incomplete_uploads or processing_incomplete or processing_necessary:
        uploads_mongo_query_q &= Q(process_status__in=proc.ProcessStatus.STATUSES_PROCESSING)

    final_query = uploads_mongo_query_q
    if uploads is not None:
        final_query &= Q(upload_id__in=list(uploads))

    return final_query, proc.Upload.objects(final_query)
259
260


261
@uploads.command(help='List selected uploads')
@click.argument('UPLOADS', nargs=-1)
@click.option('-c', '--calculations', is_flag=True, help='Show details about calculations.')
@click.option('--ids', is_flag=True, help='Only show a list of ids.')
@click.option('--json', is_flag=True, help='Output a JSON array of ids.')
@click.pass_context
def ls(ctx, uploads, calculations, ids, json):
    '''List the selected uploads as a table, a plain id list, or a JSON id array.'''
    import tabulate

    _, uploads = _query_uploads(uploads, **ctx.obj.uploads_kwargs)

    def row(upload):
        # basic columns first; calculation columns only on request
        values = [
            upload.upload_id,
            upload.upload_name,
            upload.main_author,
            upload.process_status,
            upload.published]

        if calculations:
            values.extend([
                upload.total_calcs,
                upload.failed_calcs,
                upload.total_calcs - upload.processed_calcs])

        return values

    headers = ['id', 'upload_name', 'user', 'process', 'published']
    if calculations:
        headers += ['calcs', 'failed', 'processing']

    if ids:
        for upload in uploads:
            print(upload.upload_id)
        return

    if json:
        print('[%s]' % ','.join(['"%s"' % upload.upload_id for upload in uploads]))
        return

    print('%d uploads selected, showing no more than first 10' % uploads.count())
    print(tabulate.tabulate(
        [row(upload) for upload in uploads[:10]],
        headers=headers))
305
306


307
@uploads.command(help='Change the owner of the upload and all its calcs.')
@click.argument('USERNAME', nargs=1)
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def chown(ctx, username, uploads):
    '''Reassign the main author of every selected upload to the given user.'''
    from nomad import datamodel

    _, uploads = _query_uploads(uploads, **ctx.obj.uploads_kwargs)

    print('%d uploads selected, changing owner ...' % uploads.count())

    new_owner = datamodel.User.get(username=username)
    for upload in uploads:
        # the metadata edit API performs the actual ownership change
        upload.edit_upload_metadata(
            edit_request_json=dict(metadata={'main_author': new_owner.user_id}),
            user_id=config.services.admin_user_id)
323
324


325
@uploads.command(help='Reset the processing state.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--with-calcs', is_flag=True, help='Also reset all calculations.')
@click.option('--success', is_flag=True, help='Set the process status to success instead of pending')
@click.option('--failure', is_flag=True, help='Set the process status to failure instead of pending.')
@click.pass_context
def reset(ctx, uploads, with_calcs, success, failure):
    '''
    Resets the processing state of the selected uploads (and, with --with-calcs,
    their calcs) to pending, or to success/failure if the respective flag is set.
    '''
    from nomad import processing as proc

    _, uploads = _query_uploads(uploads, **ctx.obj.uploads_kwargs)
    uploads_count = uploads.count()

    print('%d uploads selected, resetting their processing ...' % uploads_count)

    for i, upload in enumerate(uploads, start=1):
        if with_calcs:
            calc_update = proc.Calc.reset_pymongo_update()
            # if both flags are given, --failure wins
            if success:
                calc_update['process_status'] = proc.ProcessStatus.SUCCESS
            if failure:
                calc_update['process_status'] = proc.ProcessStatus.FAILURE

            proc.Calc._get_collection().update_many(
                dict(upload_id=upload.upload_id), {'$set': calc_update})

        upload.reset(force=True)
        if success:
            upload.process_status = proc.ProcessStatus.SUCCESS
        if failure:
            upload.process_status = proc.ProcessStatus.FAILURE
        upload.save()
        print('reset %d of %d uploads' % (i, uploads_count))
359
360


361
362
@uploads.command(help='(Re-)index all calcs of the given uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.option('--transformer', help='Qualified name to a Python function that should be applied to each EntryMetadata.')
@click.option('--skip-materials', is_flag=True, help='Only update the entries index.')
@click.option('--print-progress', default=0, type=int, help='Prints a dot every given seconds. Can be used to keep terminal open that have an i/o-based timeout.')
@click.pass_context
def index(ctx, uploads, parallel, transformer, skip_materials, print_progress):
    '''(Re-)index the entries of the selected uploads in elasticsearch.'''
    from nomad import search

    # resolve the optional transformer function from its dotted qualified name
    transformer_func = None
    if transformer is not None:
        import importlib
        module_name, func_name = transformer.rsplit('.', 1)
        module = importlib.import_module(module_name)
        transformer_func = getattr(module, func_name)

    _, uploads = _query_uploads(uploads, **ctx.obj.uploads_kwargs)

    def transform(entries):
        # NOTE(review): the re-bound ``entry`` is never written back into
        # ``entries``; this only has an effect if the transformer mutates the
        # entry in place — confirm transformers are in-place before relying on it
        for entry in entries:
            try:
                entry = transformer_func(entry)
            except Exception as e:
                # a single failing entry aborts transformation for the whole upload
                import traceback
                traceback.print_exc()
                print(f'   ERROR failed to transform calc (stop transforming for upload): {str(e)}')
                break

    def index_upload(upload, logger):
        # index all entries of one upload; invoked via _run_parallel below
        with upload.entries_metadata() as entries:
            if transformer is not None:
                transform(entries)
            archives = [entry.m_parent for entry in entries]
            search.index(archives, update_materials=not skip_materials, refresh=True)

        return True

    _run_parallel(uploads, parallel, index_upload, 'index', print_progress=print_progress)
400
401


402
def delete_upload(upload, skip_es: bool = False, skip_files: bool = False, skip_mongo: bool = False):
    '''
    Deletes the given upload from elasticsearch, the file system, and mongodb,
    unless the respective skip flag is set.
    '''
    from nomad import search, files, utils, processing as proc

    if not skip_es:
        # remove the upload's entries from the elastic index
        search.delete_upload(upload_id=upload.upload_id, update_materials=True, refresh=True)

    if not skip_files:
        # do it twice to get the two potential versions 'public' and 'staging'
        for _ in range(2):
            upload_files = files.UploadFiles.get(upload_id=upload.upload_id)

            try:
                if upload_files is not None:
                    upload_files.delete()
            except Exception as e:
                utils.get_logger(__name__).error('could not delete files', exc_info=e)
                break

    if not skip_mongo:
        # remove the calc documents first, then the upload document itself
        proc.Calc.objects(upload_id=upload.upload_id).delete()
        upload.delete()


429
@uploads.command(help='Delete selected upload')
@click.argument('UPLOADS', nargs=-1)
@click.option('--skip-es', help='Keep the elastic index version of the data.', is_flag=True)
@click.option('--skip-mongo', help='Keep uploads and calcs in mongo.', is_flag=True)
@click.option('--skip-files', help='Keep all related files.', is_flag=True)
@click.pass_context
def rm(ctx, uploads, skip_es, skip_mongo, skip_files):
    '''Delete every selected upload via delete_upload, honoring the skip flags.'''
    _, uploads = _query_uploads(uploads, **ctx.obj.uploads_kwargs)

    print('%d uploads selected, deleting ...' % uploads.count())

    skip_flags = dict(skip_es=skip_es, skip_mongo=skip_mongo, skip_files=skip_files)
    for upload in uploads:
        delete_upload(upload, **skip_flags)
442
443


444
445
446
@uploads.command(help='Reprocess selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.option('--process-running', is_flag=True, help='Also reprocess already running processes.')
@click.option('--setting', type=str, multiple=True, help='key=value to overwrite a default reprocess config setting.')
@click.option('--print-progress', default=0, type=int, help='Prints a dot every given seconds. Can be used to keep terminal open that have an i/o-based timeout.')
@click.pass_context
def process(ctx, uploads, parallel: int, process_running: bool, setting: typing.List[str], print_progress: int):
    '''Trigger a full reprocessing of the selected uploads.'''
    _, uploads = _query_uploads(uploads, **ctx.obj.uploads_kwargs)

    settings: typing.Dict[str, bool] = {}
    for settings_str in setting:
        # maxsplit=1 so values may themselves contain '='
        key, value = settings_str.split('=', 1)
        # bool(value) would be True for ANY non-empty string (including 'false');
        # interpret the value string explicitly instead
        settings[key] = value.strip().lower() in ('true', '1', 'yes')
    _run_processing(
        uploads, parallel, lambda upload: upload.process_upload(reprocess_settings=settings),
        'processing', process_running=process_running, reset_first=True, print_progress=print_progress)
460
461
462
463
464


@uploads.command(help='Repack selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def re_pack(ctx, uploads):
    '''Re-pack the files of the selected uploads; only published uploads are eligible.'''
    _, uploads = _query_uploads(uploads, **ctx.obj.uploads_kwargs)

    for upload in uploads:
        if upload.published:
            upload.upload_files.re_pack(upload.with_embargo)
            print(f'successfully re-packed {upload.upload_id}')
        else:
            print(f'Cannot repack unpublished upload {upload.upload_id}')
475
476


477
@uploads.command(help='Attempt to abort the processing of uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--calcs', is_flag=True, help='Only stop calculation processing.')
@click.option('--kill', is_flag=True, help='Use the kill signal and force task failure.')
@click.option('--no-celery', is_flag=True, help='Do not attempt to stop the actual celery tasks')
@click.pass_context
def stop(ctx, uploads, calcs: bool, kill: bool, no_celery: bool):
    '''
    Attempts to abort the processing of the selected uploads (and their calcs)
    by revoking the corresponding celery tasks and, with --kill, forcing the
    process documents to a failed state.
    '''
    import mongoengine

    from nomad import utils, processing as proc
    query, _ = _query_uploads(uploads, **ctx.obj.uploads_kwargs)

    logger = utils.get_logger(__name__)

    def stop_all(query):
        # stop every matching process document; works for both Calc and Upload
        for process in query:
            logger_kwargs = dict(upload_id=process.upload_id)
            if isinstance(process, proc.Calc):
                logger_kwargs.update(calc_id=process.calc_id)

            if not no_celery:
                logger.info(
                    'send terminate celery task', celery_task_id=process.celery_task_id,
                    kill=kill, **logger_kwargs)

            # with --kill, terminate the worker process with SIGKILL instead of
            # the default termination signal
            kwargs = {}
            if kill:
                kwargs.update(signal='SIGKILL')
            try:
                if not no_celery:
                    proc.app.control.revoke(process.celery_task_id, terminate=True, **kwargs)
            except Exception as e:
                # best effort: a failed revoke is logged but does not stop the loop
                logger.warning(
                    'could not revoke celery task', exc_info=e,
                    celery_task_id=process.celery_task_id, **logger_kwargs)

            if kill:
                logger.info(
                    'fail proc', celery_task_id=process.celery_task_id, kill=kill,
                    **logger_kwargs)

                # mark the process document itself as failed (even with --no-celery)
                process.fail('process terminate via nomad cli')

    # only consider documents that are actually in a processing state
    running_query = query & mongoengine.Q(process_status__in=proc.ProcessStatus.STATUSES_PROCESSING)
    stop_all(proc.Calc.objects(running_query))
    if not calcs:
        stop_all(proc.Upload.objects(running_query))
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551


@uploads.group(help='Check certain integrity criteria')
@click.pass_context
def integrity(ctx):
    '''Click sub group for integrity check commands; performs no work itself.'''
    pass


@integrity.command(help='Uploads that have a different number of entries in mongo than in ES.')
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def entry_index(ctx, uploads):
    '''
    Prints the ids of uploads whose mongo entry count (total_calcs) disagrees with
    the number of entries in the elasticsearch index. The check is ``!=``, i.e. it
    flags both more AND fewer entries; the previous help text claimed only "more".
    '''
    from nomad.search import search
    from nomad.app.v1.models import Pagination

    _, uploads = _query_uploads(uploads, **ctx.obj.uploads_kwargs)

    for upload in uploads:
        # page_size=0: only the pagination total is needed, not the hits
        search_results = search(
            owner='admin',
            query=dict(upload_id=upload.upload_id),
            pagination=Pagination(page_size=0),
            user_id=config.services.admin_user_id)

        if search_results.pagination.total != upload.total_calcs:
            print(upload.upload_id)