uploads.py 16 KB
Newer Older
Markus Scheidgen's avatar
Markus Scheidgen committed
1
2
3
4
#
# Copyright The NOMAD Authors.
#
# This file is part of NOMAD. See https://nomad-lab.eu for further info.
5
6
7
8
9
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
Markus Scheidgen's avatar
Markus Scheidgen committed
10
#     http://www.apache.org/licenses/LICENSE-2.0
11
12
#
# Unless required by applicable law or agreed to in writing, software
Markus Scheidgen's avatar
Markus Scheidgen committed
13
# distributed under the License is distributed on an "AS IS" BASIS,
14
15
16
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Markus Scheidgen's avatar
Markus Scheidgen committed
17
18
#

Markus Scheidgen's avatar
Markus Scheidgen committed
19
import typing
20
import click
Markus Scheidgen's avatar
Markus Scheidgen committed
21
22
23
import tabulate
import mongoengine
import pymongo
24
import elasticsearch_dsl as es
25
import json
26

27
28
from nomad import processing as proc, config, infrastructure, utils, search, files, datamodel, archive

Markus Scheidgen's avatar
Markus Scheidgen committed
29
from .admin import admin, __run_processing, __run_parallel
30
31


32
@admin.group(help='Upload related commands')
@click.option('--user', help='Select uploads of user with given id', type=str)
@click.option('--unpublished', help='Select only uploads in staging', is_flag=True)
@click.option('--published', help='Select only uploads that are published', is_flag=True)
@click.option('--outdated', help='Select published uploads with older nomad version', is_flag=True)
@click.option('--code', multiple=True, type=str, help='Select only uploads with calcs of given codes')
@click.option('--query-mongo', is_flag=True, help='Select query mongo instead of elastic search.')
@click.option('--processing', help='Select only processing uploads', is_flag=True)
@click.option('--processing-failure-uploads', is_flag=True, help='Select uploads with failed processing')
@click.option('--processing-failure-calcs', is_flag=True, help='Select uploads with calcs with failed processing')
@click.option('--processing-failure', is_flag=True, help='Select uploads where the upload or any calc has failed processing')
@click.option('--processing-incomplete-uploads', is_flag=True, help='Select uploads that have not yet been processed')
@click.option('--processing-incomplete-calcs', is_flag=True, help='Select uploads where any calc has not yet been processed')
@click.option('--processing-incomplete', is_flag=True, help='Select uploads where the upload or any calc has not yet been processed')
@click.option('--processing-necessary', is_flag=True, help='Select uploads where the upload or any calc has either not been processed or processing has failed in the past')
@click.option('--unindexed', is_flag=True, help='Select uploads that have no calcs in the elastic search index.')
@click.pass_context
def uploads(
        ctx, user: str, unpublished: bool, published: bool, processing: bool, outdated: bool,
        code: typing.List[str], query_mongo: bool,
        processing_failure_uploads: bool, processing_failure_calcs: bool, processing_failure: bool,
        processing_incomplete_uploads: bool, processing_incomplete_calcs: bool, processing_incomplete: bool,
        processing_necessary: bool, unindexed: bool):
    '''
    Translate the given selection flags into an upload-level mongo query
    (and optionally a calc-level query) and store both on the click context
    for the sub-commands to use.
    '''
    mongo_client = infrastructure.setup_mongo()
    infrastructure.setup_elastic()

    # Selections are combined with OR: each flag widens the set of uploads.
    query = mongoengine.Q()
    calc_query = None
    if user is not None:
        query |= mongoengine.Q(user_id=user)
    if unpublished:
        query |= mongoengine.Q(published=False)
    if published:
        query |= mongoengine.Q(published=True)
    if processing:
        query |= mongoengine.Q(process_status=proc.PROCESS_RUNNING) | mongoengine.Q(tasks_status=proc.RUNNING)

    if outdated:
        # uploads that contain calcs processed with a different nomad version
        uploads = proc.Calc._get_collection().distinct(
            'upload_id',
            {'metadata.nomad_version': {'$ne': config.meta.version}})
        query |= mongoengine.Q(upload_id__in=uploads)

    if code is not None and len(code) > 0:
        # resolve code names to upload ids via an elastic terms aggregation
        code_queries = [es.Q('match', **{'dft.code_name': code_name}) for code_name in code]
        code_query = es.Q('bool', should=code_queries, minimum_should_match=1)

        code_search = es.Search(index=config.elastic.index_name)
        code_search = code_search.query(code_query)
        code_search.aggs.bucket('uploads', es.A(
            'terms', field='upload_id', size=10000, min_doc_count=1))
        uploads = [
            upload['key']
            for upload in code_search.execute().aggs['uploads']['buckets']]

        query |= mongoengine.Q(upload_id__in=uploads)

    # calc-level criteria are collected separately; query_uploads later
    # reduces them to upload ids
    if processing_failure_calcs or processing_failure or processing_necessary:
        if calc_query is None:
            calc_query = mongoengine.Q()
        calc_query |= mongoengine.Q(tasks_status=proc.FAILURE)
    if processing_failure_uploads or processing_failure or processing_necessary:
        query |= mongoengine.Q(tasks_status=proc.FAILURE)
    if processing_incomplete_calcs or processing_incomplete or processing_necessary:
        if calc_query is None:
            calc_query = mongoengine.Q()
        calc_query |= mongoengine.Q(process_status__ne=proc.PROCESS_COMPLETED)
    if processing_incomplete_uploads or processing_incomplete or processing_necessary:
        query |= mongoengine.Q(process_status__ne=proc.PROCESS_COMPLETED)

    if unindexed:
        # compare upload ids present in mongo against those present in elastic
        search_request = search.Search(index=config.elastic.index_name)
        search_request.aggs.bucket('uploads', es.A('terms', field='upload_id', size=12000))
        response = search_request.execute()

        uploads_in_es = set(
            bucket.key
            for bucket in response.aggregations.uploads.buckets)

        uploads_in_mongo = mongo_client[config.mongo.db_name]['calc'].distinct('upload_id')

        uploads_not_in_es = []
        for upload_id in uploads_in_mongo:
            if upload_id not in uploads_in_es:
                uploads_not_in_es.append(upload_id)

        query |= mongoengine.Q(
            upload_id__in=uploads_not_in_es)

    ctx.obj.query = query
    ctx.obj.calc_query = calc_query
    ctx.obj.uploads = proc.Upload.objects(query)
    ctx.obj.query_mongo = query_mongo
125
126
127


def query_uploads(ctx, uploads):
    '''
    Resolve the UPLOADS command line arguments into a final mongo query and
    the matching Upload objects.

    The positional arguments are first joined and interpreted as one JSON
    query; depending on ``ctx.obj.query_mongo`` the JSON is applied as a
    mongo query on calcs or as an elastic search query, and reduced to a
    list of upload ids. If the arguments are not valid JSON (or the query
    fails), they are used verbatim as a list of upload ids instead.
    '''
    try:
        json_query = json.loads(' '.join(uploads))
        if ctx.obj.query_mongo:
            # use the JSON as keyword filters for a mongo query on calcs
            uploads = proc.Calc.objects(**json_query).distinct(field="upload_id")
        else:
            # use the JSON as a raw elastic query and aggregate upload ids
            request = search.SearchRequest()
            request.q = es.Q(json_query)
            request.quantity('upload_id', size=10000)
            uploads = list(request.execute()['quantities']['upload_id']['values'])
    except Exception:
        # best effort: non-JSON arguments are treated as plain upload ids below
        pass

    query = ctx.obj.query
    if ctx.obj.calc_query is not None:
        # translate the calc-level selection from the group command into upload ids
        query |= mongoengine.Q(
            upload_id__in=proc.Calc.objects(ctx.obj.calc_query).distinct(field="upload_id"))
    if len(uploads) > 0:
        query |= mongoengine.Q(upload_id__in=uploads)

    return query, proc.Upload.objects(query)
148
149


150
@uploads.command(help='List selected uploads')
@click.argument('UPLOADS', nargs=-1)
@click.option('-c', '--calculations', is_flag=True, help='Show details about calculations.')
@click.option('--ids', is_flag=True, help='Only show a list of ids.')
@click.option('--json', is_flag=True, help='Output a JSON array of ids.')
@click.pass_context
def ls(ctx, uploads, calculations, ids, json):
    '''Print the selected uploads, either as a table, as plain ids, or as a JSON array.'''
    _, uploads = query_uploads(ctx, uploads)

    # plain id list output
    if ids:
        for upload in uploads:
            print(upload.upload_id)
        return

    # JSON array output
    if json:
        print('[%s]' % ','.join(['"%s"' % upload.upload_id for upload in uploads]))
        return

    headers = ['id', 'name', 'user', 'process', 'tasks', 'published']
    if calculations:
        headers += ['calcs', 'failed', 'processing']

    def to_row(upload):
        # one table row per upload; calc statistics only when requested
        cells = [
            upload.upload_id,
            upload.name,
            upload.user_id,
            upload.process_status,
            upload.tasks_status,
            upload.published]
        if calculations:
            cells += [
                upload.total_calcs,
                upload.failed_calcs,
                upload.total_calcs - upload.processed_calcs]
        return cells

    print('%d uploads selected, showing no more than first 10' % uploads.count())
    print(tabulate.tabulate(
        [to_row(upload) for upload in uploads[:10]],
        headers=headers))
193
194


195
@uploads.command(help='Change the owner of the upload and all its calcs.')
@click.argument('USERNAME', nargs=1)
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def chown(ctx, username, uploads):
    '''
    Transfer ownership of the selected uploads (and all their calcs) to the
    user with the given username, then re-index the affected entries.
    '''
    _, uploads = query_uploads(ctx, uploads)

    print('%d uploads selected, changing its owner ...' % uploads.count())

    user = datamodel.User.get(username=username)

    for upload in uploads:
        upload.user_id = user.user_id
        # NOTE(review): a dead `calcs = upload.entries_metadata()` assignment
        # was removed here; its result was never used and the context manager
        # was never entered.

        def create_update(calc_id):
            # mongo update that rewrites the uploader of a single calc
            return pymongo.UpdateOne(
                {'_id': calc_id},
                {'$set': {'metadata.uploader': user.user_id}})

        proc.Calc._get_collection().bulk_write(
            [create_update(calc_id) for calc_id in upload.entry_ids()])
        upload.save()

        # re-index all entries of this upload; refresh once per upload
        with upload.entries_metadata() as calcs:
            search.index_all(calcs, do_refresh=False)
        search.refresh()


224
@uploads.command(help='Reset the processing state.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--with-calcs', is_flag=True, help='Also reset all calculations.')
@click.pass_context
def reset(ctx, uploads, with_calcs):
    '''
    Reset the processing state of the selected uploads. With ``--with-calcs``
    the processing state of all their calculations is reset as well.
    '''
    _, uploads = query_uploads(ctx, uploads)
    uploads_count = uploads.count()

    print('%d uploads selected, resetting their processing ...' % uploads_count)

    for i, upload in enumerate(uploads, start=1):
        # Previously the calc reset ran unconditionally and the --with-calcs
        # flag was silently ignored; it now actually guards the calc reset.
        if with_calcs:
            proc.Calc._get_collection().update_many(
                dict(upload_id=upload.upload_id),
                {'$set': proc.Calc.reset_pymongo_update()})

        upload.process_status = None
        upload.reset()
        upload.save()
        print('resetted %d of %d uploads' % (i, uploads_count))
245
246


247
248
@uploads.command(help='(Re-)index all calcs of the given uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.pass_context
def index(ctx, uploads, parallel):
    '''Re-index all calcs of the selected uploads in elastic search, optionally in parallel.'''
    _, uploads = query_uploads(ctx, uploads)

    def index_upload(upload, logger):
        # index all entries of a single upload; returns True to signal success
        # to __run_parallel
        with upload.entries_metadata() as calcs:
            # This is just a temporary fix to update the group hash without re-processing
            try:
                for calc in calcs:
                    if calc.dft is not None:
                        calc.dft.update_group_hash()
            except Exception:
                # best effort: a failing group-hash update must not abort indexing
                pass
            failed = search.index_all(calcs)
            if failed > 0:
                print('    WARNING failed to index %d entries' % failed)

        return True

    __run_parallel(uploads, parallel, index_upload, 'index')
270
271


272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
def delete_upload(upload, skip_es: bool = False, skip_files: bool = False, skip_mongo: bool = False):
    '''
    Remove the given upload from the elastic index, the file system, and
    mongo. Each removal step can be skipped individually via the flags.
    '''
    upload_id = upload.upload_id

    # remove all index entries of this upload
    if not skip_es:
        search.delete_upload(upload_id=upload_id)

    if not skip_files:
        # do it twice to get the two potential versions 'public' and 'staging'
        for _ in range(2):
            upload_files = files.UploadFiles.get(upload_id=upload_id)
            if upload_files is None:
                continue
            try:
                upload_files.delete()
            except Exception as e:
                utils.get_logger(__name__).error('could not delete files', exc_info=e)
                break

    # remove upload and calc documents from mongo
    if not skip_mongo:
        proc.Calc.objects(upload_id=upload_id).delete()
        upload.delete()


297
@uploads.command(help='Delete selected upload')
@click.argument('UPLOADS', nargs=-1)
@click.option('--skip-es', help='Keep the elastic index version of the data.', is_flag=True)
@click.option('--skip-mongo', help='Keep uploads and calcs in mongo.', is_flag=True)
@click.option('--skip-files', help='Keep all related files.', is_flag=True)
@click.pass_context
def rm(ctx, uploads, skip_es, skip_mongo, skip_files):
    '''Delete every selected upload, honoring the individual skip flags.'''
    _, selected = query_uploads(ctx, uploads)

    print('%d uploads selected, deleting ...' % selected.count())

    for upload in selected:
        delete_upload(upload, skip_es=skip_es, skip_mongo=skip_mongo, skip_files=skip_files)
310
311


312
313
314
@uploads.command(help='Create msgpack file for upload')
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def msgpack(ctx, uploads):
    '''
    Convert the zipped json archive files of the selected (published) uploads
    into msgpack archive files, for both the public and restricted versions.
    '''
    _, uploads = query_uploads(ctx, uploads)

    for upload in uploads:
        upload_files = files.UploadFiles.get(upload_id=upload.upload_id)

        # only published uploads carry the zipped json archives to convert
        if isinstance(upload_files, files.PublicUploadFiles):
            def iterator(zf, names):
                # yield (calc_id, archive data) for each json member of the zip
                for name in names:
                    # Cut the exact '.json' suffix. The previous
                    # name.strip('.json') stripped any of the characters
                    # '.', 'j', 's', 'o', 'n' from BOTH ends and corrupted
                    # calc ids that start or end with those characters.
                    calc_id = name[:-len('.json')]
                    with zf.open(name) as f:
                        yield (calc_id, json.load(f))

            for access in ['public', 'restricted']:
                with upload_files._open_zip_file('archive', access, 'json') as zf:
                    archive_path = upload_files._file_object('archive', access, 'msg', 'msg').os_path
                    # match the suffix that is removed above
                    names = [name for name in zf.namelist() if name.endswith('.json')]
                    archive.write_archive(archive_path, len(names), iterator(zf, names))
                print('wrote msgpack archive %s' % archive_path)
334
335


336
337
338
@uploads.command(help='Reprocess selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.option('--reprocess-running', is_flag=True, help='Also reprocess already running processes.')
@click.pass_context
def re_process(ctx, uploads, parallel: int, reprocess_running: bool):
    '''Trigger reprocessing of all selected uploads, optionally in parallel.'''
    _, selected = query_uploads(ctx, uploads)
    __run_processing(
        selected, parallel, lambda upload: upload.re_process_upload(), 're-processing',
        reprocess_running=reprocess_running)
346
347
348
349
350
351
352


@uploads.command(help='Repack selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.pass_context
def re_pack(ctx, uploads, parallel: int):
    '''Repack the raw files of all selected uploads, optionally in parallel.'''
    _, selected = query_uploads(ctx, uploads)
    __run_processing(selected, parallel, lambda upload: upload.re_pack(), 're-packing')
355
356


357
@uploads.command(help='Attempt to abort the processing of uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--calcs', is_flag=True, help='Only stop calculation processing.')
@click.option('--kill', is_flag=True, help='Use the kill signal and force task failure.')
@click.option('--no-celery', is_flag=True, help='Do not attempt to stop the actual celery tasks')
@click.pass_context
def stop(ctx, uploads, calcs: bool, kill: bool, no_celery: bool):
    '''
    Stop the running processing of the selected uploads (and their calcs) by
    revoking the corresponding celery tasks. With --kill the tasks are
    terminated with SIGKILL and the mongo process records are forced into a
    failed-but-complete state.
    '''
    query, _ = query_uploads(ctx, uploads)

    logger = utils.get_logger(__name__)

    def stop_all(query):
        # revoke (and optionally force-fail) every running process in the query
        for process in query:
            logger_kwargs = dict(upload_id=process.upload_id)
            if isinstance(process, proc.Calc):
                logger_kwargs.update(calc_id=process.calc_id)

            if not no_celery:
                logger.info(
                    'send terminate celery task', celery_task_id=process.celery_task_id,
                    kill=kill, **logger_kwargs)

            kwargs = {}
            if kill:
                # escalate from the default terminate signal to SIGKILL
                kwargs.update(signal='SIGKILL')
            try:
                if not no_celery:
                    proc.app.control.revoke(process.celery_task_id, terminate=True, **kwargs)
            except Exception as e:
                # revoking is best effort; a failure here must not stop the loop
                logger.warning(
                    'could not revoke celery task', exc_info=e,
                    celery_task_id=process.celery_task_id, **logger_kwargs)

            if kill:
                logger.info(
                    'fail proc', celery_task_id=process.celery_task_id, kill=kill,
                    **logger_kwargs)

                # force the mongo record out of the running state: mark it
                # failed, complete, and run the completion hook
                process.fail('process terminate via nomad cli')
                process.process_status = proc.PROCESS_COMPLETED
                process.on_process_complete(None)
                process.save()

    # restrict to processes that are actually running
    running_query = query & (mongoengine.Q(process_status=proc.PROCESS_RUNNING) | mongoengine.Q(tasks_status=proc.RUNNING))
    stop_all(proc.Calc.objects(running_query))
    if not calcs:
        stop_all(proc.Upload.objects(running_query))