uploads.py 14.8 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Markus Scheidgen's avatar
Markus Scheidgen committed
14
import typing
15
import click
Markus Scheidgen's avatar
Markus Scheidgen committed
16
17
18
import tabulate
import mongoengine
import pymongo
19
import elasticsearch_dsl as es
20
import json
21

22
23
from nomad import processing as proc, config, infrastructure, utils, search, files, datamodel, archive

24
from .admin import admin, __run_processing
25
26


27
@admin.group(help='Upload related commands')
@click.option('--user', help='Select uploads of user with given id', type=str)
@click.option('--staging', help='Select only uploads in staging', is_flag=True)
@click.option('--outdated', help='Select published uploads with older nomad version', is_flag=True)
@click.option('--code', multiple=True, type=str, help='Select only uploads with calcs of given codes')
@click.option('--query-mongo', is_flag=True, help='Select query mongo instead of elastic search.')
@click.option('--processing', help='Select only processing uploads', is_flag=True)
@click.option('--processing-failure-uploads', is_flag=True, help='Select uploads with failed processing')
@click.option('--processing-failure-calcs', is_flag=True, help='Select uploads with calcs with failed processing')
@click.option('--processing-failure', is_flag=True, help='Select uploads where the upload or any calc has failed processing')
@click.option('--processing-incomplete-uploads', is_flag=True, help='Select uploads that have not yet been processed')
@click.option('--processing-incomplete-calcs', is_flag=True, help='Select uploads where any calc has not yet been processed')
@click.option('--processing-incomplete', is_flag=True, help='Select uploads where the upload or any calc has not yet been processed')
@click.option('--processing-necessary', is_flag=True, help='Select uploads where the upload or any calc has either not been processed or processing has failed in the past')
@click.pass_context
def uploads(
        ctx, user: str, staging: bool, processing: bool, outdated: bool,
        code: typing.List[str], query_mongo: bool,
        processing_failure_uploads: bool, processing_failure_calcs: bool, processing_failure: bool,
        processing_incomplete_uploads: bool, processing_incomplete_calcs: bool, processing_incomplete: bool,
        processing_necessary: bool):
    """
    Command group that turns the selection options into mongo queries.

    All given criteria are combined with a logical OR. The results are stored
    on ``ctx.obj`` for the sub-commands:

    - ``query``: the upload level query
    - ``calc_query``: a query on calcs, or ``None`` if no calc level option
      was given
    - ``uploads``: the ``proc.Upload`` objects selected by ``query``
    - ``query_mongo``: the value of the ``--query-mongo`` flag
    """
    infrastructure.setup_mongo()
    infrastructure.setup_elastic()

    query = mongoengine.Q()
    calc_query = None
    if user is not None:
        query |= mongoengine.Q(user_id=user)
    if staging:
        query |= mongoengine.Q(published=False)
    if processing:
        query |= mongoengine.Q(process_status=proc.PROCESS_RUNNING) | mongoengine.Q(tasks_status=proc.RUNNING)

    if outdated:
        # uploads that have at least one calc with a different nomad version
        outdated_upload_ids = proc.Calc._get_collection().distinct(
            'upload_id',
            {'metadata.nomad_version': {'$ne': config.version}})
        query |= mongoengine.Q(upload_id__in=outdated_upload_ids)

    if code:
        # resolve the code names to upload ids via an elastic terms aggregation;
        # the aggregation is requested with size=10000 buckets
        code_queries = [es.Q('match', **{'dft.code_name': code_name}) for code_name in code]
        code_query = es.Q('bool', should=code_queries, minimum_should_match=1)

        code_search = es.Search(index=config.elastic.index_name)
        code_search = code_search.query(code_query)
        code_search.aggs.bucket('uploads', es.A(
            'terms', field='upload_id', size=10000, min_doc_count=1))
        code_upload_ids = [
            bucket['key']
            for bucket in code_search.execute().aggs['uploads']['buckets']]

        query |= mongoengine.Q(upload_id__in=code_upload_ids)

    # The processing options come in pairs: one part selects on the calcs
    # (collected in calc_query), the other on the uploads themselves.
    if processing_failure_calcs or processing_failure or processing_necessary:
        if calc_query is None:
            calc_query = mongoengine.Q()
        calc_query |= mongoengine.Q(tasks_status=proc.FAILURE)
    if processing_failure_uploads or processing_failure or processing_necessary:
        query |= mongoengine.Q(tasks_status=proc.FAILURE)
    if processing_incomplete_calcs or processing_incomplete or processing_necessary:
        if calc_query is None:
            calc_query = mongoengine.Q()
        calc_query |= mongoengine.Q(process_status__ne=proc.PROCESS_COMPLETED)
    if processing_incomplete_uploads or processing_incomplete or processing_necessary:
        query |= mongoengine.Q(process_status__ne=proc.PROCESS_COMPLETED)

    ctx.obj.query = query
    ctx.obj.calc_query = calc_query
    ctx.obj.uploads = proc.Upload.objects(query)
    ctx.obj.query_mongo = query_mongo
97
98
99


def query_uploads(ctx, uploads):
    """
    Resolve the ``UPLOADS`` arguments of a sub-command into an upload selection.

    The arguments are joined with spaces and parsed as JSON. If that works, the
    result is used as a query, either on the calcs in mongo (``--query-mongo``)
    or as an elastic search query, and replaced by the matching upload ids.
    Otherwise the arguments are used as upload ids as they are.

    The resulting ids, and the uploads matching ``ctx.obj.calc_query`` if set,
    are OR-combined with ``ctx.obj.query``.

    Arguments:
        ctx: The click context; ``ctx.obj`` has to provide ``query``,
            ``calc_query``, and ``query_mongo``.
        uploads: The ``UPLOADS`` arguments given on the command line.

    Returns:
        A tuple with the combined query and the ``proc.Upload`` objects
        selected by it.
    """
    try:
        json_query = json.loads(' '.join(uploads))
    except ValueError:
        # Not JSON (json.JSONDecodeError is a ValueError): the arguments are
        # plain upload ids and are used as such below.
        pass
    else:
        try:
            if ctx.obj.query_mongo:
                uploads = proc.Calc.objects(**json_query).distinct(field="upload_id")
            else:
                request = search.SearchRequest()
                request.q = es.Q(json_query)
                request.quantity('upload_id', size=10000)
                uploads = list(request.execute()['quantities']['upload_id']['values'])
        except Exception as e:
            # Still best effort: fall back to using the arguments as upload
            # ids, but do not hide that the query could not be executed.
            utils.get_logger(__name__).warning(
                'could not execute the query given as UPLOADS, using the arguments as upload ids',
                exc_info=e)

    query = ctx.obj.query
    if ctx.obj.calc_query is not None:
        query |= mongoengine.Q(
            upload_id__in=proc.Calc.objects(ctx.obj.calc_query).distinct(field="upload_id"))
    if len(uploads) > 0:
        query |= mongoengine.Q(upload_id__in=uploads)

    return query, proc.Upload.objects(query)
120
121


122
@uploads.command(help='List selected uploads')
@click.argument('UPLOADS', nargs=-1)
@click.option('-c', '--calculations', is_flag=True, help='Show details about calculations.')
@click.option('--ids', is_flag=True, help='Only show a list of ids.')
@click.option('--json', is_flag=True, help='Output a JSON array of ids.')
@click.pass_context
def ls(ctx, uploads, calculations, ids, json):
    """
    Print the selected uploads.

    With ``--ids`` one upload id is printed per line, with ``--json`` the ids
    are printed as a JSON array. Otherwise the number of selected uploads and
    a table with the first 10 of them are printed; ``--calculations`` adds
    calc counts to the table.

    Note that the ``json`` parameter is the flag and hides the ``json`` module
    within this function.
    """
    _, selected = query_uploads(ctx, uploads)

    if ids:
        for upload in selected:
            print(upload.upload_id)
        return

    if json:
        quoted_ids = ['"%s"' % upload.upload_id for upload in selected]
        print('[%s]' % ','.join(quoted_ids))
        return

    columns = ['id', 'name', 'user', 'process', 'tasks', 'published']
    if calculations:
        columns.extend(['calcs', 'failed', 'processing'])

    def as_cells(upload):
        # the cells have to follow the order of the columns above
        cells = [
            upload.upload_id,
            upload.name,
            upload.user_id,
            upload.process_status,
            upload.tasks_status,
            upload.published]

        if calculations:
            cells.extend([
                upload.total_calcs,
                upload.failed_calcs,
                upload.total_calcs - upload.processed_calcs])

        return cells

    print('%d uploads selected, showing no more than first 10' % selected.count())
    table = [as_cells(upload) for upload in selected[:10]]
    print(tabulate.tabulate(table, headers=columns))
165
166


167
@uploads.command(help='Change the owner of the upload and all its calcs.')
@click.argument('USERNAME', nargs=1)
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def chown(ctx, username, uploads):
    """
    Make the user with the given username the owner of the selected uploads.

    For each upload the ``user_id`` of the upload and the
    ``metadata.uploader`` of all its calcs are set in mongo, then the entries
    of the upload are indexed again and the search index is refreshed.
    """
    _, uploads = query_uploads(ctx, uploads)

    print('%d uploads selected, changing its owner ...' % uploads.count())

    user = datamodel.User.get(username=username)

    for upload in uploads:
        upload.user_id = user.user_id

        calc_updates = [
            pymongo.UpdateOne(
                {'_id': calc_id},
                {'$set': {'metadata.uploader': user.user_id}})
            for calc_id in upload.entry_ids()]

        # pymongo does not accept an empty list of operations, uploads without
        # entries must not fail the whole command
        if calc_updates:
            proc.Calc._get_collection().bulk_write(calc_updates)
        upload.save()

        # refresh only once per upload, after all its entries are indexed
        with upload.entries_metadata() as calcs:
            search.index_all(calcs, do_refresh=False)
        search.refresh()


196
@uploads.command(help='Reset the processing state.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--with-calcs', is_flag=True, help='Also reset all calculations.')
@click.pass_context
def reset(ctx, uploads, with_calcs):
    """
    Reset the processing state of the selected uploads.

    The ``process_status`` of each upload is cleared and the upload is reset
    and saved. The calcs of an upload are only reset if ``--with-calcs`` is
    given.
    """
    _, uploads = query_uploads(ctx, uploads)
    uploads_count = uploads.count()

    print('%d uploads selected, resetting their processing ...' % uploads_count)

    for i, upload in enumerate(uploads, start=1):
        # NOTE(review): the flag used to be ignored and calcs were always
        # reset; it is now honoured as its help text describes.
        if with_calcs:
            proc.Calc._get_collection().update_many(
                dict(upload_id=upload.upload_id),
                {'$set': proc.Calc.reset_pymongo_update()})

        upload.process_status = None
        upload.reset()
        upload.save()
        print('resetted %d of %d uploads' % (i, uploads_count))
217
218


219
220
221
222
223
224
225
226
227
228
229
@uploads.command(help='(Re-)index all calcs of the given uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def index(ctx, uploads):
    """
    Index the entries of all selected uploads.

    Prints the progress after each upload, including the accumulated number
    of entries that ``search.index_all`` reported as failed.
    """
    _, uploads = query_uploads(ctx, uploads)
    uploads_count = uploads.count()

    print('%d uploads selected, indexing ...' % uploads_count)

    logger = utils.get_logger(__name__)
    failed = 0
    for i, upload in enumerate(uploads, start=1):
        with upload.entries_metadata() as calcs:
            # This is just a temporary fix to update the group hash without re-processing
            try:
                for calc in calcs:
                    if calc.dft is not None:
                        calc.dft.update_group_hash()
            except Exception as e:
                # the update is best effort and must not prevent the indexing,
                # but a failure should at least be visible
                logger.warning(
                    'could not update group hash', exc_info=e,
                    upload_id=upload.upload_id)

            failed += search.index_all(calcs)

        print('   indexed %d of %d uploads, failed to index %d entries' % (i, uploads_count, failed))


244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
def delete_upload(upload, skip_es: bool = False, skip_files: bool = False, skip_mongo: bool = False):
    """
    Delete the given upload from the search index, the file system, and mongo.

    Arguments:
        upload: The upload to delete.
        skip_es: Do not delete the upload from the search index.
        skip_files: Do not delete the files of the upload.
        skip_mongo: Do not delete the upload and its calcs from mongo.
    """
    if not skip_es:
        search.delete_upload(upload_id=upload.upload_id)

    if not skip_files:
        # There are two potential versions of the files, 'public' and
        # 'staging'; looking the files up a second time gets the other one.
        for _attempt in range(2):
            upload_files = files.UploadFiles.get(upload_id=upload.upload_id)
            if upload_files is None:
                continue

            try:
                upload_files.delete()
            except Exception as e:
                utils.get_logger(__name__).error('could not delete files', exc_info=e)
                break

    if not skip_mongo:
        # the calcs first, then the upload itself
        upload_calcs = proc.Calc.objects(upload_id=upload.upload_id)
        upload_calcs.delete()
        upload.delete()


269
@uploads.command(help='Delete selected upload')
@click.argument('UPLOADS', nargs=-1)
@click.option('--skip-es', help='Keep the elastic index version of the data.', is_flag=True)
@click.option('--skip-mongo', help='Keep uploads and calcs in mongo.', is_flag=True)
@click.option('--skip-files', help='Keep all related files.', is_flag=True)
@click.pass_context
def rm(ctx, uploads, skip_es, skip_mongo, skip_files):
    """
    Delete all selected uploads via ``delete_upload``; the skip options are
    passed on unchanged.
    """
    _, selected = query_uploads(ctx, uploads)

    print('%d uploads selected, deleting ...' % selected.count())

    skip_flags = dict(skip_es=skip_es, skip_mongo=skip_mongo, skip_files=skip_files)
    for upload in selected:
        delete_upload(upload, **skip_flags)
282
283


284
285
286
@uploads.command(help='Create msgpack file for upload')
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def msgpack(ctx, uploads):
    """
    Write msgpack archives for the selected uploads.

    Only uploads with ``files.PublicUploadFiles`` are considered. For both the
    'public' and the 'restricted' json archive zip file of such an upload, all
    contained json files are loaded and written to a msgpack archive with
    ``archive.write_archive``.
    """
    _, uploads = query_uploads(ctx, uploads)

    json_suffix = '.json'

    def calc_id_of(name):
        # Only cut off the extension. str.strip('.json') treats its argument
        # as a set of characters and would also remove any leading or trailing
        # '.', 'j', 's', 'o', 'n' that belongs to the calc id itself.
        if name.endswith(json_suffix):
            return name[:-len(json_suffix)]
        return name

    def iterator(zf, names):
        # yields the (calc_id, archive data) pairs for the given zip members
        for name in names:
            with zf.open(name) as f:
                yield (calc_id_of(name), json.load(f))

    for upload in uploads:
        upload_files = files.UploadFiles.get(upload_id=upload.upload_id)
        if not isinstance(upload_files, files.PublicUploadFiles):
            continue

        for access in ['public', 'restricted']:
            with upload_files._open_zip_file('archive', access, 'json') as zf:
                archive_path = upload_files._file_object('archive', access, 'msg', 'msg').os_path
                names = [name for name in zf.namelist() if name.endswith('json')]
                archive.write_archive(archive_path, len(names), iterator(zf, names))
            print('wrote msgpack archive %s' % archive_path)
306
307


308
309
310
@uploads.command(help='Reprocess selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.option('--reprocess-running', is_flag=True, help='Also reprocess already running processes.')
@click.pass_context
def re_process(ctx, uploads, parallel: int, reprocess_running: bool):
    """
    Hand the selected uploads to ``__run_processing`` with a callable that
    calls ``re_process_upload`` on an upload.
    """
    _, selected = query_uploads(ctx, uploads)

    def start_re_processing(upload):
        return upload.re_process_upload()

    __run_processing(
        selected, parallel, start_re_processing, 're-processing',
        reprocess_running=reprocess_running)
318
319
320
321
322
323
324


@uploads.command(help='Repack selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.pass_context
def re_pack(ctx, uploads, parallel: int):
    """
    Hand the selected uploads to ``__run_processing`` with a callable that
    calls ``re_pack`` on an upload.
    """
    _, selected = query_uploads(ctx, uploads)

    def start_re_packing(upload):
        return upload.re_pack()

    __run_processing(selected, parallel, start_re_packing, 're-packing')
327
328


329
@uploads.command(help='Attempt to abort the processing of uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--calcs', is_flag=True, help='Only stop calculation processing.')
@click.option('--kill', is_flag=True, help='Use the kill signal and force task failure.')
@click.option('--no-celery', is_flag=True, help='Do not attempt to stop the actual celery tasks')
@click.pass_context
def stop(ctx, uploads, calcs: bool, kill: bool, no_celery: bool):
    """
    Stop the running processes within the selection.

    The running calcs are handled first, then the running uploads unless
    ``--calcs`` is given. Unless ``--no-celery`` is given, the celery task of
    each process is revoked. With ``--kill`` the revoke uses 'SIGKILL' and the
    process is additionally marked as failed and completed.
    """
    query, _ = query_uploads(ctx, uploads)

    logger = utils.get_logger(__name__)

    # additional arguments for the celery revoke
    revoke_kwargs = dict(signal='SIGKILL') if kill else {}

    def stop_all(processes):
        for process in processes:
            log_context = dict(upload_id=process.upload_id)
            if isinstance(process, proc.Calc):
                log_context['calc_id'] = process.calc_id

            if not no_celery:
                logger.info(
                    'send terminate celery task', celery_task_id=process.celery_task_id,
                    kill=kill, **log_context)
                try:
                    proc.app.control.revoke(
                        process.celery_task_id, terminate=True, **revoke_kwargs)
                except Exception as e:
                    # a failed revoke must not prevent handling the other processes
                    logger.warning(
                        'could not revoke celery task', exc_info=e,
                        celery_task_id=process.celery_task_id, **log_context)

            if not kill:
                continue

            logger.info(
                'fail proc', celery_task_id=process.celery_task_id, kill=kill,
                **log_context)

            process.fail('process terminate via nomad cli')
            process.process_status = proc.PROCESS_COMPLETED
            process.on_process_complete(None)
            process.save()

    is_running = (
        mongoengine.Q(process_status=proc.PROCESS_RUNNING) |
        mongoengine.Q(tasks_status=proc.RUNNING))
    running_query = query & is_running

    stop_all(proc.Calc.objects(running_query))
    if not calcs:
        stop_all(proc.Upload.objects(running_query))