uploads.py 15.9 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Markus Scheidgen's avatar
Markus Scheidgen committed
14
import typing
15
import click
Markus Scheidgen's avatar
Markus Scheidgen committed
16
17
18
import tabulate
import mongoengine
import pymongo
19
import elasticsearch_dsl as es
20
import json
21

22
23
from nomad import processing as proc, config, infrastructure, utils, search, files, datamodel, archive

Markus Scheidgen's avatar
Markus Scheidgen committed
24
from .admin import admin, __run_processing, __run_parallel
25
26


27
@admin.group(help='Upload related commands')
@click.option('--user', help='Select uploads of user with given id', type=str)
@click.option('--unpublished', help='Select only uploads in staging', is_flag=True)
@click.option('--published', help='Select only uploads that are published', is_flag=True)
@click.option('--outdated', help='Select published uploads with older nomad version', is_flag=True)
@click.option('--code', multiple=True, type=str, help='Select only uploads with calcs of given codes')
@click.option('--query-mongo', is_flag=True, help='Select query mongo instead of elastic search.')
@click.option('--processing', help='Select only processing uploads', is_flag=True)
@click.option('--processing-failure-uploads', is_flag=True, help='Select uploads with failed processing')
@click.option('--processing-failure-calcs', is_flag=True, help='Select uploads with calcs with failed processing')
@click.option('--processing-failure', is_flag=True, help='Select uploads where the upload or any calc has failed processing')
@click.option('--processing-incomplete-uploads', is_flag=True, help='Select uploads that have not yet been processed')
@click.option('--processing-incomplete-calcs', is_flag=True, help='Select uploads where any calc has not yet been processed')
@click.option('--processing-incomplete', is_flag=True, help='Select uploads where the upload or any calc has not yet been processed')
@click.option('--processing-necessary', is_flag=True, help='Select uploads where the upload or any calc has either not been processed or processing has failed in the past')
@click.option('--unindexed', is_flag=True, help='Select uploads that have no calcs in the elastic search index.')
@click.pass_context
def uploads(
        ctx, user: str, unpublished: bool, published: bool, processing: bool, outdated: bool,
        code: typing.List[str], query_mongo: bool,
        processing_failure_uploads: bool, processing_failure_calcs: bool, processing_failure: bool,
        processing_incomplete_uploads: bool, processing_incomplete_calcs: bool, processing_incomplete: bool,
        processing_necessary: bool, unindexed: bool):
    '''
    Click group callback: builds a mongo query (and optionally a calc-level
    query) from the given selection flags and stores them on ``ctx.obj`` for
    the sub-commands (``ls``, ``rm``, ``reset``, ...) to consume via
    ``query_uploads``.
    '''
    mongo_client = infrastructure.setup_mongo()
    infrastructure.setup_elastic()

    # Selection criteria are OR-ed together into a single upload-level query.
    query = mongoengine.Q()
    # calc_query stays None unless a calc-level criterion is requested.
    calc_query = None
    if user is not None:
        query |= mongoengine.Q(user_id=user)
    if unpublished:
        query |= mongoengine.Q(published=False)
    if published:
        query |= mongoengine.Q(published=True)
    if processing:
        query |= mongoengine.Q(process_status=proc.PROCESS_RUNNING) | mongoengine.Q(tasks_status=proc.RUNNING)

    if outdated:
        # Uploads that contain at least one calc processed with a different
        # nomad version than the currently configured one.
        uploads = proc.Calc._get_collection().distinct(
            'upload_id',
            {'metadata.nomad_version': {'$ne': config.meta.version}})
        query |= mongoengine.Q(upload_id__in=uploads)

    if code is not None and len(code) > 0:
        # Resolve codes to upload ids via an elastic terms aggregation, then
        # feed those ids back into the mongo query.
        code_queries = [es.Q('match', **{'dft.code_name': code_name}) for code_name in code]
        code_query = es.Q('bool', should=code_queries, minimum_should_match=1)

        code_search = es.Search(index=config.elastic.index_name)
        code_search = code_search.query(code_query)
        code_search.aggs.bucket('uploads', es.A(
            'terms', field='upload_id', size=10000, min_doc_count=1))
        uploads = [
            upload['key']
            for upload in code_search.execute().aggs['uploads']['buckets']]

        query |= mongoengine.Q(upload_id__in=uploads)

    if processing_failure_calcs or processing_failure or processing_necessary:
        if calc_query is None:
            calc_query = mongoengine.Q()
        calc_query |= mongoengine.Q(tasks_status=proc.FAILURE)
    if processing_failure_uploads or processing_failure or processing_necessary:
        query |= mongoengine.Q(tasks_status=proc.FAILURE)
    if processing_incomplete_calcs or processing_incomplete or processing_necessary:
        if calc_query is None:
            calc_query = mongoengine.Q()
        calc_query |= mongoengine.Q(process_status__ne=proc.PROCESS_COMPLETED)
    if processing_incomplete_uploads or processing_incomplete or processing_necessary:
        query |= mongoengine.Q(process_status__ne=proc.PROCESS_COMPLETED)

    if unindexed:
        # Uploads that have calcs in mongo but no corresponding documents in
        # the elastic index.
        search_request = search.Search(index=config.elastic.index_name)
        search_request.aggs.bucket('uploads', es.A('terms', field='upload_id', size=12000))
        response = search_request.execute()

        uploads_in_es = set(
            bucket.key
            for bucket in response.aggregations.uploads.buckets)

        uploads_in_mongo = mongo_client[config.mongo.db_name]['calc'].distinct('upload_id')

        uploads_not_in_es = []
        for upload_id in uploads_in_mongo:
            if upload_id not in uploads_in_es:
                uploads_not_in_es.append(upload_id)

        query |= mongoengine.Q(
            upload_id__in=uploads_not_in_es)

    ctx.obj.query = query
    ctx.obj.calc_query = calc_query
    ctx.obj.uploads = proc.Upload.objects(query)
    ctx.obj.query_mongo = query_mongo
120
121
122


def query_uploads(ctx, uploads):
    '''
    Resolve the final upload selection for a sub-command.

    ``uploads`` is the raw tuple of CLI arguments. If, joined with spaces,
    it parses as JSON, it is treated as a query (mongo keyword query or an
    elastic query, depending on ``--query-mongo``) and replaced by the
    resulting list of upload ids. Otherwise the arguments are kept and used
    as literal upload ids.

    Returns a tuple ``(query, uploads)`` of the combined mongoengine query
    and the matching :class:`proc.Upload` queryset.
    '''
    try:
        json_query = json.loads(' '.join(uploads))
        if ctx.obj.query_mongo:
            uploads = proc.Calc.objects(**json_query).distinct(field="upload_id")
        else:
            request = search.SearchRequest()
            request.q = es.Q(json_query)
            # NOTE(review): the bucket size caps the result at 10000 upload
            # ids; larger selections would be silently truncated.
            request.quantity('upload_id', size=10000)
            uploads = list(request.execute()['quantities']['upload_id']['values'])
    except Exception:
        # Deliberate best-effort: if the arguments are not valid JSON (or the
        # query fails), fall through and treat them as plain upload ids.
        pass

    query = ctx.obj.query
    if ctx.obj.calc_query is not None:
        # Lift the calc-level selection to upload level via the distinct
        # upload ids of the matching calcs.
        query |= mongoengine.Q(
            upload_id__in=proc.Calc.objects(ctx.obj.calc_query).distinct(field="upload_id"))
    if len(uploads) > 0:
        query |= mongoengine.Q(upload_id__in=uploads)

    return query, proc.Upload.objects(query)
143
144


145
@uploads.command(help='List selected uploads')
@click.argument('UPLOADS', nargs=-1)
@click.option('-c', '--calculations', is_flag=True, help='Show details about calculations.')
@click.option('--ids', is_flag=True, help='Only show a list of ids.')
@click.option('--json', is_flag=True, help='Output a JSON array of ids.')
@click.pass_context
def ls(ctx, uploads, calculations, ids, json):
    '''Print the selected uploads: a table (default), bare ids, or a JSON array of ids.'''
    _, uploads = query_uploads(ctx, uploads)

    # Plain id list: one id per line, nothing else.
    if ids:
        for upload in uploads:
            print(upload.upload_id)
        return

    # JSON array of quoted ids.
    if json:
        print('[%s]' % ','.join(['"%s"' % upload.upload_id for upload in uploads]))
        return

    headers = ['id', 'name', 'user', 'process', 'tasks', 'published']
    if calculations:
        headers += ['calcs', 'failed', 'processing']

    def to_row(upload):
        cells = [
            upload.upload_id,
            upload.name,
            upload.user_id,
            upload.process_status,
            upload.tasks_status,
            upload.published]
        if calculations:
            cells += [
                upload.total_calcs,
                upload.failed_calcs,
                upload.total_calcs - upload.processed_calcs]
        return cells

    print('%d uploads selected, showing no more than first 10' % uploads.count())
    print(tabulate.tabulate(
        [to_row(upload) for upload in uploads[:10]],
        headers=headers))
188
189


190
@uploads.command(help='Change the owner of the upload and all its calcs.')
@click.argument('USERNAME', nargs=1)
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def chown(ctx, username, uploads):
    '''
    Re-assign the selected uploads (and the ``metadata.uploader`` of all their
    calcs) to the user identified by USERNAME, then re-index the entries.
    '''
    _, uploads = query_uploads(ctx, uploads)

    print('%d uploads selected, changing its owner ...' % uploads.count())

    user = datamodel.User.get(username=username)

    for upload in uploads:
        upload.user_id = user.user_id

        def create_update(calc_id):
            # One bulk-write op per calc: set the uploader in the raw mongo
            # document without loading the mongoengine object.
            return pymongo.UpdateOne(
                {'_id': calc_id},
                {'$set': {'metadata.uploader': user.user_id}})

        proc.Calc._get_collection().bulk_write(
            [create_update(calc_id) for calc_id in upload.entry_ids()])
        upload.save()

        # Re-index with the new owner; refresh once after each upload.
        with upload.entries_metadata() as calcs:
            search.index_all(calcs, do_refresh=False)
        search.refresh()


219
@uploads.command(help='Reset the processing state.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--with-calcs', is_flag=True, help='Also reset all calculations.')
@click.pass_context
def reset(ctx, uploads, with_calcs):
    '''
    Reset the processing state of the selected uploads. With ``--with-calcs``
    the processing state of all their calculations is reset as well.
    '''
    _, uploads = query_uploads(ctx, uploads)
    uploads_count = uploads.count()

    print('%d uploads selected, resetting their processing ...' % uploads_count)

    i = 0
    for upload in uploads:
        # Honor the --with-calcs flag: only touch the calcs when requested.
        if with_calcs:
            # Bulk-reset the calcs directly in mongo for efficiency.
            proc.Calc._get_collection().update_many(
                dict(upload_id=upload.upload_id),
                {'$set': proc.Calc.reset_pymongo_update()})

        # Clear the stale process status before resetting the upload itself.
        upload.process_status = None
        upload.reset()
        upload.save()
        i += 1
        print('resetted %d of %d uploads' % (i, uploads_count))
240
241


242
243
@uploads.command(help='(Re-)index all calcs of the given uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.pass_context
def index(ctx, uploads, parallel):
    '''Rebuild the elastic search index entries for all calcs of the selected uploads.'''
    _, selected = query_uploads(ctx, uploads)

    def _index_one(upload, logger):
        with upload.entries_metadata() as entries:
            # This is just a temporary fix to update the group hash without re-processing
            try:
                for entry in entries:
                    if entry.dft is not None:
                        entry.dft.update_group_hash()
            except Exception:
                pass

            failure_count = search.index_all(entries)
            if failure_count > 0:
                print('    WARNING failed to index %d entries' % failure_count)

        return True

    __run_parallel(selected, parallel, _index_one, 'index')
265
266


267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
def delete_upload(upload, skip_es: bool = False, skip_files: bool = False, skip_mongo: bool = False):
    '''
    Remove an upload from the three storage backends: the elastic index, the
    on-disk upload files, and mongo (calcs and the upload document itself).
    Each stage can be skipped individually via the ``skip_*`` flags.
    File deletion is best-effort: failures are logged and do not prevent the
    mongo deletion below.
    '''
    # delete elastic
    if not skip_es:
        search.delete_upload(upload_id=upload.upload_id)

    # delete files
    if not skip_files:
        # do it twice to get the two potential versions 'public' and 'staging'
        for _ in range(0, 2):
            # UploadFiles.get resolves whichever variant currently exists;
            # after the first delete, the second pass picks up the other one.
            upload_files = files.UploadFiles.get(upload_id=upload.upload_id)

            try:
                if upload_files is not None:
                    upload_files.delete()
            except Exception as e:
                logger = utils.get_logger(__name__)
                logger.error('could not delete files', exc_info=e)
                break

    # delete mongo
    if not skip_mongo:
        # Calcs first, then the upload document itself.
        proc.Calc.objects(upload_id=upload.upload_id).delete()
        upload.delete()


292
@uploads.command(help='Delete selected upload')
@click.argument('UPLOADS', nargs=-1)
@click.option('--skip-es', help='Keep the elastic index version of the data.', is_flag=True)
@click.option('--skip-mongo', help='Keep uploads and calcs in mongo.', is_flag=True)
@click.option('--skip-files', help='Keep all related files.', is_flag=True)
@click.pass_context
def rm(ctx, uploads, skip_es, skip_mongo, skip_files):
    '''Delete the selected uploads from elastic, files, and mongo (each skippable).'''
    _, selected = query_uploads(ctx, uploads)

    print('%d uploads selected, deleting ...' % selected.count())

    for target in selected:
        delete_upload(target, skip_es=skip_es, skip_mongo=skip_mongo, skip_files=skip_files)
305
306


307
308
309
def _archive_name_to_calc_id(name: str) -> str:
    '''
    Return the calc id for an archive zip member name by removing the literal
    '.json' extension. The previous ``name.strip('.json')`` stripped *any* of
    the characters '.', 'j', 's', 'o', 'n' from both ends and corrupted calc
    ids that start or end with those characters (e.g. 'no.json' became '').
    '''
    return name[:-len('.json')] if name.endswith('.json') else name


@uploads.command(help='Create msgpack file for upload')
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def msgpack(ctx, uploads):
    '''
    Convert the per-calc JSON archive files of the selected published uploads
    into one msgpack archive file per access level ('public'/'restricted').
    '''
    _, uploads = query_uploads(ctx, uploads)

    for upload in uploads:
        upload_files = files.UploadFiles.get(upload_id=upload.upload_id)

        # Only published uploads have the zipped JSON archives to convert.
        if isinstance(upload_files, files.PublicUploadFiles):
            def iterator(zf, names):
                # Lazily yield (calc_id, archive-dict) pairs for write_archive.
                for name in names:
                    calc_id = _archive_name_to_calc_id(name)
                    with zf.open(name) as f:
                        yield (calc_id, json.load(f))

            for access in ['public', 'restricted']:
                with upload_files._open_zip_file('archive', access, 'json') as zf:
                    archive_path = upload_files._file_object('archive', access, 'msg', 'msg').os_path
                    names = [name for name in zf.namelist() if name.endswith('json')]
                    archive.write_archive(archive_path, len(names), iterator(zf, names))
                print('wrote msgpack archive %s' % archive_path)
329
330


331
332
333
@uploads.command(help='Reprocess selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.option('--reprocess-running', is_flag=True, help='Also reprocess already running processes.')
@click.pass_context
def re_process(ctx, uploads, parallel: int, reprocess_running: bool):
    '''Trigger re-processing of the selected uploads, optionally in parallel.'''
    _, selected = query_uploads(ctx, uploads)

    __run_processing(
        selected, parallel, lambda upload: upload.re_process_upload(), 're-processing',
        reprocess_running=reprocess_running)
341
342
343
344
345
346
347


@uploads.command(help='Repack selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.pass_context
def re_pack(ctx, uploads, parallel: int):
    '''Trigger re-packing of the selected uploads, optionally in parallel.'''
    _, selected = query_uploads(ctx, uploads)

    __run_processing(selected, parallel, lambda upload: upload.re_pack(), 're-packing')
350
351


352
@uploads.command(help='Attempt to abort the processing of uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--calcs', is_flag=True, help='Only stop calculation processing.')
@click.option('--kill', is_flag=True, help='Use the kill signal and force task failure.')
@click.option('--no-celery', is_flag=True, help='Do not attempt to stop the actual celery tasks')
@click.pass_context
def stop(ctx, uploads, calcs: bool, kill: bool, no_celery: bool):
    '''
    Stop running processing of the selected uploads (and their calcs) by
    revoking the associated celery tasks. With ``--kill`` the tasks are
    terminated with SIGKILL and the processing documents are additionally
    marked failed/complete so they do not appear stuck.
    '''
    query, _ = query_uploads(ctx, uploads)
    logger = utils.get_logger(__name__)

    def stop_all(query):
        # Works on both Calc and Upload querysets; a Calc additionally gets
        # its calc_id into the log context.
        for process in query:
            logger_kwargs = dict(upload_id=process.upload_id)
            if isinstance(process, proc.Calc):
                logger_kwargs.update(calc_id=process.calc_id)

            if not no_celery:
                logger.info(
                    'send terminate celery task', celery_task_id=process.celery_task_id,
                    kill=kill, **logger_kwargs)

            kwargs = {}
            if kill:
                # SIGKILL force-terminates the worker child; default is SIGTERM.
                kwargs.update(signal='SIGKILL')
            try:
                if not no_celery:
                    proc.app.control.revoke(process.celery_task_id, terminate=True, **kwargs)
            except Exception as e:
                # Best-effort: revoke failures are logged, the state cleanup
                # below still runs.
                logger.warning(
                    'could not revoke celery task', exc_info=e,
                    celery_task_id=process.celery_task_id, **logger_kwargs)

            if kill:
                logger.info(
                    'fail proc', celery_task_id=process.celery_task_id, kill=kill,
                    **logger_kwargs)

                # Mark the mongo document failed and completed so the UI/queries
                # do not show it as running forever.
                process.fail('process terminate via nomad cli')
                process.process_status = proc.PROCESS_COMPLETED
                process.on_process_complete(None)
                process.save()

    # Restrict to processes that are actually running right now.
    running_query = query & (mongoengine.Q(process_status=proc.PROCESS_RUNNING) | mongoengine.Q(tasks_status=proc.RUNNING))
    # Stop calcs first; uploads only unless --calcs limits the scope.
    stop_all(proc.Calc.objects(running_query))
    if not calcs:
        stop_all(proc.Upload.objects(running_query))