uploads.py 15.4 KB
Newer Older
Markus Scheidgen's avatar
Markus Scheidgen committed
1
2
3
4
#
# Copyright The NOMAD Authors.
#
# This file is part of NOMAD. See https://nomad-lab.eu for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

Markus Scheidgen's avatar
Markus Scheidgen committed
19
import typing
20
import click
Markus Scheidgen's avatar
Markus Scheidgen committed
21
22
23
import tabulate
import mongoengine
import pymongo
24
import elasticsearch_dsl as es
25
import json
26

27
from nomad import processing as proc, config, infrastructure, utils, search, files, datamodel
28

Markus Scheidgen's avatar
Markus Scheidgen committed
29
from .admin import admin, __run_processing, __run_parallel
30
31


32
@admin.group(help='Upload related commands')
@click.option('--user', help='Select uploads of user with given id', type=str)
@click.option('--unpublished', help='Select only uploads in staging', is_flag=True)
@click.option('--published', help='Select only uploads that are published', is_flag=True)
@click.option('--outdated', help='Select uploads with calcs processed by a different nomad version', is_flag=True)
@click.option('--code', multiple=True, type=str, help='Select only uploads with calcs of given codes')
@click.option('--query-mongo', is_flag=True, help='Query mongo instead of elastic search.')
@click.option('--processing', help='Select only processing uploads', is_flag=True)
@click.option('--processing-failure-uploads', is_flag=True, help='Select uploads with failed processing')
@click.option('--processing-failure-calcs', is_flag=True, help='Select uploads with calcs with failed processing')
@click.option('--processing-failure', is_flag=True, help='Select uploads where the upload or any calc has failed processing')
@click.option('--processing-incomplete-uploads', is_flag=True, help='Select uploads that have not yet been processed')
@click.option('--processing-incomplete-calcs', is_flag=True, help='Select uploads where any calc has not yet been processed')
@click.option('--processing-incomplete', is_flag=True, help='Select uploads where the upload or any calc has not yet been processed')
@click.option('--processing-necessary', is_flag=True, help='Select uploads where the upload or any calc has either not been processed or processing has failed in the past')
@click.option('--unindexed', is_flag=True, help='Select uploads that have no calcs in the elastic search index.')
@click.pass_context
def uploads(
        ctx, user: str, unpublished: bool, published: bool, processing: bool, outdated: bool,
        code: typing.List[str], query_mongo: bool,
        processing_failure_uploads: bool, processing_failure_calcs: bool, processing_failure: bool,
        processing_incomplete_uploads: bool, processing_incomplete_calcs: bool, processing_incomplete: bool,
        processing_necessary: bool, unindexed: bool):
    '''
    Build the upload selection shared by all ``uploads`` sub-commands.

    Every given filter is OR-ed (``|``) into one mongoengine query that starts
    out as an empty ``mongoengine.Q()``. Calc-level filters
    (``--processing-*-calcs`` and friends) are collected separately in
    ``calc_query``; ``query_uploads`` later resolves them to upload ids.

    Stores ``query``, ``calc_query``, the matching ``proc.Upload`` queryset and
    the ``query_mongo`` flag on ``ctx.obj`` for the sub-commands.
    '''
    mongo_client = infrastructure.setup_mongo()
    infrastructure.setup_elastic()

    query = mongoengine.Q()
    calc_query = None

    # The OR-ing order is kept stable so the resulting query structure is unchanged.
    if user is not None:
        query |= mongoengine.Q(user_id=user)
    if unpublished:
        query |= mongoengine.Q(published=False)
    if published:
        query |= mongoengine.Q(published=True)
    if processing:
        query |= mongoengine.Q(process_status=proc.PROCESS_RUNNING) | mongoengine.Q(tasks_status=proc.RUNNING)
    if outdated:
        query |= mongoengine.Q(upload_id__in=_outdated_upload_ids())
    if code:
        query |= mongoengine.Q(upload_id__in=_upload_ids_with_codes(code))

    # --processing-failure / --processing-incomplete apply to both uploads and calcs;
    # --processing-necessary implies both of them.
    select_failed = processing_failure or processing_necessary
    select_incomplete = processing_incomplete or processing_necessary

    if processing_failure_calcs or select_failed:
        if calc_query is None:
            calc_query = mongoengine.Q()
        calc_query |= mongoengine.Q(tasks_status=proc.FAILURE)
    if processing_failure_uploads or select_failed:
        query |= mongoengine.Q(tasks_status=proc.FAILURE)
    if processing_incomplete_calcs or select_incomplete:
        if calc_query is None:
            calc_query = mongoengine.Q()
        calc_query |= mongoengine.Q(process_status__ne=proc.PROCESS_COMPLETED)
    if processing_incomplete_uploads or select_incomplete:
        query |= mongoengine.Q(process_status__ne=proc.PROCESS_COMPLETED)

    if unindexed:
        query |= mongoengine.Q(upload_id__in=_unindexed_upload_ids(mongo_client))

    ctx.obj.query = query
    ctx.obj.calc_query = calc_query
    ctx.obj.uploads = proc.Upload.objects(query)
    ctx.obj.query_mongo = query_mongo


def _outdated_upload_ids():
    ''' Return the ids of uploads that have calcs whose ``metadata.nomad_version`` differs from the running version. '''
    return proc.Calc._get_collection().distinct(
        'upload_id',
        {'metadata.nomad_version': {'$ne': config.meta.version}})


def _upload_ids_with_codes(codes):
    '''
    Return the ids of uploads with at least one elastic search entry whose
    ``dft.code_name`` matches one of ``codes``.

    NOTE(review): the terms aggregation is capped at 10000 buckets; more
    matching uploads are silently dropped.
    '''
    code_query = es.Q(
        'bool',
        should=[es.Q('match', **{'dft.code_name': code_name}) for code_name in codes],
        minimum_should_match=1)

    code_search = es.Search(index=config.elastic.index_name).query(code_query)
    code_search.aggs.bucket('uploads', es.A(
        'terms', field='upload_id', size=10000, min_doc_count=1))

    return [
        bucket['key']
        for bucket in code_search.execute().aggs['uploads']['buckets']]


def _unindexed_upload_ids(mongo_client):
    '''
    Return the ids of uploads that have calcs in mongo but no entry in the
    elastic search index, in mongo's ``distinct`` order.

    NOTE(review): the elastic side is collected with a terms aggregation capped
    at 12000 buckets; with more indexed uploads, indexed ones beyond the cap
    would be reported as unindexed.
    '''
    search_request = search.Search(index=config.elastic.index_name)
    search_request.aggs.bucket('uploads', es.A('terms', field='upload_id', size=12000))
    response = search_request.execute()

    uploads_in_es = {bucket.key for bucket in response.aggregations.uploads.buckets}
    uploads_in_mongo = mongo_client[config.mongo.db_name]['calc'].distinct('upload_id')

    return [upload_id for upload_id in uploads_in_mongo if upload_id not in uploads_in_es]
125
126
127


def query_uploads(ctx, uploads):
    '''
    Combine the group-level selection on ``ctx.obj`` with a command's UPLOADS arguments.

    UPLOADS is either a list of upload ids or a single JSON object (possibly
    split over several shell words) that is used as a query. It is run against
    mongo calcs when ``--query-mongo`` was given, and against elastic search
    otherwise. It is then replaced by the matching upload ids. Anything that is
    not a JSON object is taken as literal upload ids.

    Returns:
        A tuple ``(query, queryset)``: the combined mongoengine query and the
        ``proc.Upload`` objects it selects.

    Raises:
        click.ClickException: if a JSON query was given but could not be executed.
    '''
    upload_ids = list(uploads)

    json_query = None
    if upload_ids:
        try:
            json_query = json.loads(' '.join(upload_ids))
        except ValueError:
            # Not JSON: the arguments are plain upload ids.
            json_query = None

    # Only a JSON object is a query. Bare JSON scalars (e.g. an all-digit id) stay ids.
    is_json_query = isinstance(json_query, dict)
    if is_json_query:
        try:
            if ctx.obj.query_mongo:
                upload_ids = proc.Calc.objects(**json_query).distinct(field="upload_id")
            else:
                request = search.SearchRequest()
                request.q = es.Q(json_query)
                # NOTE(review): at most 10000 upload ids are returned here.
                request.quantity('upload_id', size=10000)
                upload_ids = list(request.execute()['quantities']['upload_id']['values'])
        except Exception as e:
            # Previously swallowed, which silently treated the JSON text as upload ids.
            raise click.ClickException('could not execute the given JSON query: %s' % e) from e

    query = ctx.obj.query
    if ctx.obj.calc_query is not None:
        query |= mongoengine.Q(
            upload_id__in=proc.Calc.objects(ctx.obj.calc_query).distinct(field="upload_id"))

    # A JSON query that matched nothing must select nothing. It must not fall
    # through to the group query, which selects every upload when no filter was given.
    if upload_ids or is_json_query:
        query |= mongoengine.Q(upload_id__in=upload_ids)

    return query, proc.Upload.objects(query)
148
149


150
@uploads.command(help='List selected uploads')
@click.argument('UPLOADS', nargs=-1)
@click.option('-c', '--calculations', is_flag=True, help='Show details about calculations.')
@click.option('--ids', is_flag=True, help='Only show a list of ids.')
@click.option('--json', is_flag=True, help='Output a JSON array of ids.')
@click.pass_context
def ls(ctx, uploads, calculations, ids, json):
    '''
    Print the selected uploads.

    ``--ids`` prints one id per line. ``--json`` prints all ids as a JSON-style
    array. Otherwise the selection count and a table of at most the first 10
    uploads are printed; calc counts are added with ``-c``. Note that the
    ``json`` parameter (the ``--json`` flag) shadows the ``json`` module here.
    '''
    _, selected = query_uploads(ctx, uploads)

    if ids:
        for upload in selected:
            print(upload.upload_id)
        return

    if json:
        quoted_ids = ['"%s"' % upload.upload_id for upload in selected]
        print('[%s]' % ','.join(quoted_ids))
        return

    headers = ['id', 'name', 'user', 'process', 'tasks', 'published']
    if calculations:
        headers += ['calcs', 'failed', 'processing']

    def to_row(upload):
        # One table row. With -c, the last column counts calcs not yet processed.
        cells = [
            upload.upload_id, upload.name, upload.user_id,
            upload.process_status, upload.tasks_status, upload.published]
        if calculations:
            cells.extend([
                upload.total_calcs,
                upload.failed_calcs,
                upload.total_calcs - upload.processed_calcs])
        return cells

    print('%d uploads selected, showing no more than first 10' % selected.count())
    table = [to_row(upload) for upload in selected[:10]]
    print(tabulate.tabulate(table, headers=headers))
193
194


195
@uploads.command(help='Change the owner of the upload and all its calcs.')
@click.argument('USERNAME', nargs=1)
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def chown(ctx, username, uploads):
    '''
    Make the user ``username`` the owner of every selected upload and of all its calcs.

    For each upload this:
    - sets ``Upload.user_id``;
    - sets ``metadata.uploader`` on all of the upload's calc documents in one bulk write;
    - saves the upload;
    - re-indexes its entries in elastic search and refreshes the index.
    '''
    _, selected = query_uploads(ctx, uploads)

    print('%d uploads selected, changing its owner ...' % selected.count())

    user = datamodel.User.get(username=username)
    set_uploader = {'$set': {'metadata.uploader': user.user_id}}

    for upload in selected:
        upload.user_id = user.user_id

        updates = [
            pymongo.UpdateOne({'_id': calc_id}, set_uploader)
            for calc_id in upload.entry_ids()]
        # pymongo raises InvalidOperation for an empty bulk_write, e.g. for an upload without entries.
        if updates:
            proc.Calc._get_collection().bulk_write(updates)

        upload.save()

        # entries_metadata() is used as a context manager; only call it inside a `with`.
        with upload.entries_metadata() as calcs:
            search.index_all(calcs, do_refresh=False)
        search.refresh()


224
@uploads.command(help='Reset the processing state.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--with-calcs', is_flag=True, help='Also reset all calculations.')
@click.pass_context
def reset(ctx, uploads, with_calcs):
    '''
    Reset the processing state of the selected uploads and their calcs.

    For every upload, all its calc documents get ``proc.Calc.reset_pymongo_update()``
    applied. The upload's ``process_status`` is then cleared, and the upload is
    reset and saved. Progress is printed after each upload.

    NOTE(review): calcs are always reset; ``--with-calcs`` is accepted but not read here.
    '''
    _, selected = query_uploads(ctx, uploads)
    total = selected.count()

    print('%d uploads selected, resetting their processing ...' % total)

    for done, upload in enumerate(selected, start=1):
        calc_reset = {'$set': proc.Calc.reset_pymongo_update()}
        proc.Calc._get_collection().update_many(dict(upload_id=upload.upload_id), calc_reset)

        upload.process_status = None
        upload.reset()
        upload.save()
        print('resetted %d of %d uploads' % (done, total))
245
246


247
248
def _transform_entries(entries, transformer_func):
    '''
    Apply ``transformer_func`` to each entry and return the resulting entries as a list.

    A transformer may either return a (new) entry or modify the given entry in
    place and return ``None``. In the latter case the modified entry is kept.
    After the first failure the traceback is printed and the remaining entries
    are kept untransformed, so they are still indexed.
    '''
    import traceback

    transformed = []
    stop_transforming = False
    for entry in entries:
        if not stop_transforming:
            try:
                result = transformer_func(entry)
            except Exception as e:
                traceback.print_exc()
                print(f'   ERROR failed to transform calc (stop transforming for upload): {str(e)}')
                stop_transforming = True
            else:
                if result is not None:
                    entry = result
        transformed.append(entry)
    return transformed


@uploads.command(help='(Re-)index all calcs of the given uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.option('--transformer', help='Qualified name to a Python function that should be applied to each EntryMetadata.')
@click.pass_context
def index(ctx, uploads, parallel, transformer):
    '''
    (Re-)index all entries of the selected uploads in elastic search.

    ``--transformer`` is a dotted ``module.function`` name. It is imported once,
    and the function is applied to every entry before indexing. Its results
    (returned or modified in place) are what gets indexed. Indexing is
    delegated to ``__run_parallel`` with ``parallel``.
    '''
    transformer_func = None
    if transformer is not None:
        import importlib
        module_name, func_name = transformer.rsplit('.', 1)
        module = importlib.import_module(module_name)
        transformer_func = getattr(module, func_name)

    _, uploads = query_uploads(ctx, uploads)

    def index_upload(upload, logger):
        with upload.entries_metadata() as calcs:
            if transformer_func is not None:
                # Index the transformer's results. Previously they were assigned to
                # a loop variable and discarded (and a one-shot iterator was exhausted).
                calcs = _transform_entries(calcs, transformer_func)

            failed = search.index_all(calcs)
            if failed > 0:
                print('    WARNING failed to index %d entries' % failed)

        return True

    __run_parallel(uploads, parallel, index_upload, 'index')
283
284


285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
def delete_upload(upload, skip_es: bool = False, skip_files: bool = False, skip_mongo: bool = False):
    '''
    Delete an upload from elastic search, the file storage and mongo.

    Args:
        upload: the ``proc.Upload`` document to delete.
        skip_es: keep the upload's entries in the elastic search index.
        skip_files: keep the upload's files.
        skip_mongo: keep the upload and its calc documents in mongo.

    File deletion looks the files up and deletes them up to two times, to catch
    both a 'public' and a 'staging' version. It stops at the first error, which
    is logged rather than raised.
    '''
    if not skip_es:
        search.delete_upload(upload_id=upload.upload_id)

    if not skip_files:
        attempts_left = 2
        while attempts_left > 0:
            attempts_left -= 1
            upload_files = files.UploadFiles.get(upload_id=upload.upload_id)
            try:
                if upload_files is not None:
                    upload_files.delete()
            except Exception as e:
                utils.get_logger(__name__).error('could not delete files', exc_info=e)
                break

    if not skip_mongo:
        proc.Calc.objects(upload_id=upload.upload_id).delete()
        upload.delete()


310
@uploads.command(help='Delete selected upload')
@click.argument('UPLOADS', nargs=-1)
@click.option('--skip-es', help='Keep the elastic index version of the data.', is_flag=True)
@click.option('--skip-mongo', help='Keep uploads and calcs in mongo.', is_flag=True)
@click.option('--skip-files', help='Keep all related files.', is_flag=True)
@click.pass_context
def rm(ctx, uploads, skip_es, skip_mongo, skip_files):
    ''' Delete every selected upload with ``delete_upload``, passing the ``--skip-*`` flags through. '''
    _, selected = query_uploads(ctx, uploads)
    print('%d uploads selected, deleting ...' % selected.count())

    skip_flags = dict(skip_es=skip_es, skip_mongo=skip_mongo, skip_files=skip_files)
    for upload in selected:
        delete_upload(upload, **skip_flags)
323
324


325
326
327
@uploads.command(help='Reprocess selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.option('--reprocess-running', is_flag=True, help='Also reprocess already running processes.')
@click.pass_context
def re_process(ctx, uploads, parallel: int, reprocess_running: bool):
    ''' Re-process the selected uploads by calling ``re_process_upload`` on each via ``__run_processing``. '''
    def start_re_process(upload):
        return upload.re_process_upload()

    _, selected = query_uploads(ctx, uploads)
    __run_processing(
        selected, parallel, start_re_process, 're-processing',
        reprocess_running=reprocess_running)
335
336
337
338
339
340
341


@uploads.command(help='Repack selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.pass_context
def re_pack(ctx, uploads, parallel: int):
    ''' Re-pack the selected uploads by calling ``re_pack`` on each via ``__run_processing``. '''
    def start_re_pack(upload):
        return upload.re_pack()

    _, selected = query_uploads(ctx, uploads)
    __run_processing(selected, parallel, start_re_pack, 're-packing')
344
345


346
@uploads.command(help='Attempt to abort the processing of uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--calcs', is_flag=True, help='Only stop calculation processing.')
@click.option('--kill', is_flag=True, help='Use the kill signal and force task failure.')
@click.option('--no-celery', is_flag=True, help='Do not attempt to stop the actual celery tasks')
@click.pass_context
def stop(ctx, uploads, calcs: bool, kill: bool, no_celery: bool):
    '''
    Abort running processing of the selected uploads' calcs and, unless ``--calcs``, of the uploads.

    A process counts as running if its ``process_status`` is ``PROCESS_RUNNING``
    or its ``tasks_status`` is ``RUNNING``. For each running process:
    - unless ``--no-celery``, its celery task is revoked with ``terminate=True``
      (and ``signal='SIGKILL'`` with ``--kill``); revocation errors are only logged;
    - with ``--kill``, the process is also failed, marked ``PROCESS_COMPLETED``
      and saved.
    '''
    query, _ = query_uploads(ctx, uploads)

    logger = utils.get_logger(__name__)

    def stop_all(processes):
        for process in processes:
            logger_kwargs = dict(upload_id=process.upload_id)
            if isinstance(process, proc.Calc):
                logger_kwargs.update(calc_id=process.calc_id)

            if not no_celery:
                logger.info(
                    'send terminate celery task', celery_task_id=process.celery_task_id,
                    kill=kill, **logger_kwargs)
                revoke_kwargs = dict(signal='SIGKILL') if kill else {}
                try:
                    proc.app.control.revoke(process.celery_task_id, terminate=True, **revoke_kwargs)
                except Exception as e:
                    logger.warning(
                        'could not revoke celery task', exc_info=e,
                        celery_task_id=process.celery_task_id, **logger_kwargs)

            if kill:
                logger.info(
                    'fail proc', celery_task_id=process.celery_task_id, kill=kill,
                    **logger_kwargs)
                process.fail('process terminate via nomad cli')
                process.process_status = proc.PROCESS_COMPLETED
                process.on_process_complete(None)
                process.save()

    running = mongoengine.Q(process_status=proc.PROCESS_RUNNING) | mongoengine.Q(tasks_status=proc.RUNNING)

    # The selection query is built from Upload fields (user_id, published and the
    # upload's own status fields). Resolve it to upload ids instead of applying it
    # to Calc documents directly, where those fields do not mean the same thing.
    selected_upload_ids = proc.Upload.objects(query).distinct(field='upload_id')
    stop_all(proc.Calc.objects(mongoengine.Q(upload_id__in=selected_upload_ids) & running))
    if not calcs:
        stop_all(proc.Upload.objects(query & running))