uploads.py 12.4 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Markus Scheidgen's avatar
Markus Scheidgen committed
14
import typing
15
import click
Markus Scheidgen's avatar
Markus Scheidgen committed
16
17
18
import tabulate
import mongoengine
import pymongo
19
import elasticsearch_dsl as es
20
import json
21

22
23
from nomad import processing as proc, config, infrastructure, utils, search, files, datamodel, archive

24
from .admin import admin, __run_processing
25
26


27
@admin.group(help='Upload related commands')
@click.option('--user', help='Select uploads of user with given id', type=str)
@click.option('--staging', help='Select only uploads in staging', is_flag=True)
@click.option('--processing', help='Select only processing uploads', is_flag=True)
@click.option('--outdated', help='Select published uploads with older nomad version', is_flag=True)
@click.option('--code', multiple=True, type=str, help='Select only uploads with calcs of given codes')
@click.option('--query-mongo', is_flag=True, help='Select query mongo instead of elastic search.')
@click.pass_context
def uploads(
        ctx, user: str, staging: bool, processing: bool, outdated: bool,
        code: typing.List[str], query_mongo: bool):
    """
    Select the uploads that the upload sub-commands operate on.

    Sets up the mongo and elastic search connections and combines all given
    filter options into one mongoengine query (filters are AND-ed). The query,
    the resulting ``proc.Upload`` queryset, and the ``--query-mongo`` flag are
    stored on ``ctx.obj`` as ``query``, ``uploads`` and ``query_mongo``.
    """
    infrastructure.setup_mongo()
    infrastructure.setup_elastic()

    selection = mongoengine.Q()

    if user is not None:
        selection &= mongoengine.Q(user_id=user)

    if staging:
        selection &= mongoengine.Q(published=False)

    if processing:
        is_running = mongoengine.Q(process_status=proc.PROCESS_RUNNING) | mongoengine.Q(tasks_status=proc.RUNNING)
        selection &= is_running

    if outdated:
        # uploads that contain at least one calc processed with another nomad version
        outdated_upload_ids = proc.Calc._get_collection().distinct(
            'upload_id', {'metadata.nomad_version': {'$ne': config.version}})
        selection &= mongoengine.Q(upload_id__in=outdated_upload_ids)

    if code:
        # any of the given code names must match
        any_code = es.Q(
            'bool',
            should=[es.Q('match', **{'dft.code_name': code_name}) for code_name in code],
            minimum_should_match=1)
        code_search = es.Search(index=config.elastic.index_name).query(any_code)
        # NOTE(review): the terms aggregation returns at most 10000 upload ids;
        # larger selections are truncated silently.
        code_search.aggs.bucket('uploads', es.A(
            'terms', field='upload_id', size=10000, min_doc_count=1))
        buckets = code_search.execute().aggs['uploads']['buckets']
        selection &= mongoengine.Q(upload_id__in=[bucket['key'] for bucket in buckets])

    ctx.obj.query = selection
    ctx.obj.uploads = proc.Upload.objects(selection)
    ctx.obj.query_mongo = query_mongo
72
73
74


def query_uploads(ctx, uploads):
    """
    Resolve the ``UPLOADS`` command arguments into a mongo query and queryset.

    The arguments are interpreted in one of two ways:

    - If, joined with spaces, they form a JSON object, that object is used as a
      calc query (against mongo when the group was given ``--query-mongo``,
      against elastic search otherwise). The selection is restricted to the
      uploads of the matching calcs.
    - Otherwise they are taken as a list of upload ids.

    The result is AND-ed with the group level filters in ``ctx.obj.query``.
    Without any arguments only the group level filters apply. With arguments,
    the selection is always restricted, even to nothing: a JSON query that
    matches no calcs selects no uploads (it used to select *all* uploads).

    Arguments:
        ctx: The click context; ``ctx.obj.query`` and ``ctx.obj.query_mongo`` are read.
        uploads: The raw ``UPLOADS`` arguments (upload ids or JSON query fragments).

    Returns:
        A tuple of the combined mongoengine query and the ``proc.Upload`` queryset for it.
    """
    has_arguments = len(uploads) > 0
    upload_ids = list(uploads)

    json_query = None
    if has_arguments:
        try:
            json_query = json.loads(' '.join(upload_ids))
        except ValueError:
            # not JSON: the arguments are plain upload ids
            json_query = None

    # Only JSON objects are queries; other JSON values (e.g. numeric ids) are ids.
    # Errors while executing a query are not swallowed anymore. Previously they
    # silently turned the query text into (non-matching) upload ids.
    if isinstance(json_query, dict):
        if ctx.obj.query_mongo:
            upload_ids = proc.Calc.objects(**json_query).distinct(field="upload_id")
        else:
            request = search.SearchRequest()
            request.q = es.Q(json_query)
            request.quantity('upload_id', size=10000)
            upload_ids = list(request.execute()['quantities']['upload_id']['values'])

    query = ctx.obj.query
    if has_arguments:
        query &= mongoengine.Q(upload_id__in=upload_ids)

    return query, proc.Upload.objects(query)
92
93


94
@uploads.command(help='List selected uploads')
@click.argument('UPLOADS', nargs=-1)
@click.option('-c', '--calculations', is_flag=True, help='Show details about calculations.')
@click.option('--ids', is_flag=True, help='Only show a list of ids.')
@click.option('--json', is_flag=True, help='Output a JSON array of ids.')
@click.pass_context
def ls(ctx, uploads, calculations, ids, json):
    """
    Print the selected uploads.

    With ``--ids`` one upload id is printed per line. With ``--json`` a JSON
    array of the ids is printed. Otherwise the number of selected uploads is
    printed, followed by a table of at most the first 10 of them. ``-c`` adds
    calculation counts to the table.

    Note: the ``json`` parameter is the ``--json`` flag and shadows the
    ``json`` module inside this function.
    """
    _, selected = query_uploads(ctx, uploads)

    if ids:
        for upload in selected:
            print(upload.upload_id)
        return

    if json:
        quoted_ids = ['"%s"' % upload.upload_id for upload in selected]
        print('[%s]' % ','.join(quoted_ids))
        return

    columns = ['id', 'name', 'user', 'process', 'tasks', 'published']
    if calculations:
        columns += ['calcs', 'failed', 'processing']

    def to_row(upload):
        cells = [
            upload.upload_id,
            upload.name,
            upload.user_id,
            upload.process_status,
            upload.tasks_status,
            upload.published]
        if calculations:
            # the 'processing' column counts calcs that are not processed yet
            cells.extend([
                upload.total_calcs,
                upload.failed_calcs,
                upload.total_calcs - upload.processed_calcs])
        return cells

    print('%d uploads selected, showing no more than first 10' % selected.count())
    table = [to_row(upload) for upload in selected[:10]]
    print(tabulate.tabulate(table, headers=columns))
137
138


139
@uploads.command(help='Change the owner of the upload and all its calcs.')
@click.argument('USERNAME', nargs=1)
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def chown(ctx, username, uploads):
    """
    Make the user with the given username the owner of the selected uploads.

    For each upload, this sets ``user_id`` on the upload and
    ``metadata.uploader`` on all of its calcs in mongo. It then saves the upload
    and re-indexes its entries in elastic search.
    """
    _, uploads = query_uploads(ctx, uploads)

    print('%d uploads selected, changing its owner ...' % uploads.count())

    user = datamodel.User.get(username=username)

    def create_update(calc_id):
        return pymongo.UpdateOne(
            {'_id': calc_id},
            {'$set': {'metadata.uploader': user.user_id}})

    for upload in uploads:
        upload.user_id = user.user_id

        calc_updates = [create_update(calc_id) for calc_id in upload.entry_ids()]
        # pymongo's bulk_write raises InvalidOperation for an empty request list,
        # which would abort the whole command on an upload without entries
        if len(calc_updates) > 0:
            proc.Calc._get_collection().bulk_write(calc_updates)

        upload.save()

        # entries_metadata() is used as a context manager so that whatever it
        # opens is released again
        with upload.entries_metadata() as calcs:
            search.index_all(calcs, do_refresh=False)
        search.refresh()


@uploads.command(help='Reset the processing state of the selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--with-calcs', is_flag=True, help='Also reset all calculations.')
@click.pass_context
def reset(ctx, uploads, with_calcs):
    """
    Reset the processing state of the selected uploads.

    Each upload is reset via ``upload.reset()`` and saved. Only with
    ``--with-calcs`` are the upload's calcs reset as well, by applying
    ``proc.Calc.reset_pymongo_update()`` to them in mongo. Previously the calcs
    were reset regardless of the flag.
    """
    _, uploads = query_uploads(ctx, uploads)
    uploads_count = uploads.count()

    print('%d uploads selected, resetting their processing ...' % uploads_count)

    for i, upload in enumerate(uploads, start=1):
        if with_calcs:
            proc.Calc._get_collection().update_many(
                dict(upload_id=upload.upload_id),
                {'$set': proc.Calc.reset_pymongo_update()})

        upload.reset()
        upload.save()
        print('resetted %d of %d uploads' % (i, uploads_count))
188
189


190
191
192
193
194
195
196
197
198
199
200
@uploads.command(help='(Re-)index all calcs of the given uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def index(ctx, uploads):
    """
    Re-index the entries of all selected uploads in elastic search.

    After each upload, prints the progress and the running total of entries that
    failed to index (as counted by ``search.index_all``).
    """
    _, selected = query_uploads(ctx, uploads)
    total = selected.count()

    print('%d uploads selected, indexing ...' % total)

    failed_entries = 0
    for done, upload in enumerate(selected, start=1):
        with upload.entries_metadata() as calcs:
            failed_entries += search.index_all(calcs)

        print('   indexed %d of %d uploads, failed to index %d entries' % (done, total, failed_entries))


208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
def delete_upload(upload, skip_es: bool = False, skip_files: bool = False, skip_mongo: bool = False):
    """
    Remove an upload's data from elastic search, the file storage, and mongo.

    Each of the three stores can be skipped individually.

    Arguments:
        upload: The upload document to delete; its ``upload_id`` selects the data.
        skip_es: Do not remove the upload's entries from the elastic search index.
        skip_files: Do not delete the upload's files.
        skip_mongo: Do not delete the upload and its calcs from mongo.
    """
    if not skip_es:
        search.delete_upload(upload_id=upload.upload_id)

    if not skip_files:
        # Look up and delete the files twice to get both potential versions
        # ('public' and 'staging'). Stop at the first failing delete.
        for _round in range(2):
            upload_files = files.UploadFiles.get(upload_id=upload.upload_id)
            if upload_files is None:
                continue
            try:
                upload_files.delete()
            except Exception as e:
                utils.get_logger(__name__).error('could not delete files', exc_info=e)
                break

    if not skip_mongo:
        proc.Calc.objects(upload_id=upload.upload_id).delete()
        upload.delete()


233
@uploads.command(help='Delete selected upload')
@click.argument('UPLOADS', nargs=-1)
@click.option('--skip-es', help='Keep the elastic index version of the data.', is_flag=True)
@click.option('--skip-mongo', help='Keep uploads and calcs in mongo.', is_flag=True)
@click.option('--skip-files', help='Keep all related files.', is_flag=True)
@click.pass_context
def rm(ctx, uploads, skip_es, skip_mongo, skip_files):
    """
    Delete the selected uploads via ``delete_upload``.

    The ``--skip-*`` flags are passed through to ``delete_upload``.
    """
    _, selected = query_uploads(ctx, uploads)

    print('%d uploads selected, deleting ...' % selected.count())

    skip = dict(skip_es=skip_es, skip_mongo=skip_mongo, skip_files=skip_files)
    for upload in selected:
        delete_upload(upload, **skip)
246
247


248
249
250
@uploads.command(help='Create msgpack file for upload')
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def msgpack(ctx, uploads):
    """
    Write msgpack archives from the JSON archive zips of the selected uploads.

    Only uploads whose files are ``files.PublicUploadFiles`` are handled. For
    each access level ('public', 'restricted'), all JSON entries of the
    'archive' zip are read. They are written with ``archive.write_archive`` to
    the upload's 'archive' msg file object, keyed by calc id (the entry name
    without its '.json' suffix).
    """
    _, uploads = query_uploads(ctx, uploads)

    json_suffix = '.json'

    def iterator(zf, names):
        for name in names:
            # Remove the suffix. str.strip('.json') would strip any of the
            # characters '.', 'j', 's', 'o', 'n' from both ends and corrupt
            # calc ids that start or end with one of them.
            calc_id = name[:-len(json_suffix)] if name.endswith(json_suffix) else name
            with zf.open(name) as f:
                yield (calc_id, json.load(f))

    for upload in uploads:
        upload_files = files.UploadFiles.get(upload_id=upload.upload_id)

        if not isinstance(upload_files, files.PublicUploadFiles):
            continue

        for access in ['public', 'restricted']:
            with upload_files._open_zip_file('archive', access, 'json') as zf:
                archive_path = upload_files._file_object('archive', access, 'msg', 'msg').os_path
                names = [name for name in zf.namelist() if name.endswith('json')]
                # the generator reads from zf, so it must be consumed while zf is open
                archive.write_archive(archive_path, len(names), iterator(zf, names))
            print('wrote msgpack archive %s' % archive_path)
270
271


272
273
274
@uploads.command(help='Reprocess selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.option('--reprocess-running', is_flag=True, help='Also reprocess already running processes.')
@click.pass_context
def re_process(ctx, uploads, parallel: int, reprocess_running: bool):
    """
    Re-process the selected uploads using ``__run_processing``.

    Each upload is processed by calling its ``re_process_upload()``.
    """
    _, selected = query_uploads(ctx, uploads)

    def process_upload(upload):
        return upload.re_process_upload()

    __run_processing(
        selected, parallel, process_upload, 're-processing',
        reprocess_running=reprocess_running)
282
283
284
285
286
287
288


@uploads.command(help='Repack selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.pass_context
def re_pack(ctx, uploads, parallel: int):
    """
    Re-pack the selected uploads using ``__run_processing``.

    Each upload is handled by calling its ``re_pack()``.
    """
    _, selected = query_uploads(ctx, uploads)

    def pack_upload(upload):
        return upload.re_pack()

    __run_processing(selected, parallel, pack_upload, 're-packing')
291
292


293
@uploads.command(help='Attempt to abort the processing of uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--calcs', is_flag=True, help='Only stop calculation processing.')
@click.option('--kill', is_flag=True, help='Use the kill signal and force task failure.')
@click.option('--no-celery', is_flag=True, help='Do not attempt to stop the actual celery tasks')
@click.pass_context
def stop(ctx, uploads, calcs: bool, kill: bool, no_celery: bool):
    """
    Stop the running processing of the selected uploads and their calcs.

    Running calcs are handled first, then (unless ``--calcs``) running uploads.
    For each running process:

    - Unless ``--no-celery`` is given, its celery task is revoked with
      ``terminate=True`` (with ``signal='SIGKILL'`` under ``--kill``). A failing
      revoke is only logged.
    - With ``--kill``, the process is marked as failed, its process status is
      set to completed, ``on_process_complete(None)`` is called, and it is saved.
    """
    query, _ = query_uploads(ctx, uploads)

    logger = utils.get_logger(__name__)
    revoke_kwargs = dict(signal='SIGKILL') if kill else {}

    def stop_all(processes):
        for process in processes:
            log_context = dict(upload_id=process.upload_id)
            if isinstance(process, proc.Calc):
                log_context.update(calc_id=process.calc_id)

            if not no_celery:
                logger.info(
                    'send terminate celery task', celery_task_id=process.celery_task_id,
                    kill=kill, **log_context)
                try:
                    proc.app.control.revoke(process.celery_task_id, terminate=True, **revoke_kwargs)
                except Exception as e:
                    # best effort: keep going with the remaining processes
                    logger.warning(
                        'could not revoke celery task', exc_info=e,
                        celery_task_id=process.celery_task_id, **log_context)

            if kill:
                logger.info(
                    'fail proc', celery_task_id=process.celery_task_id, kill=kill,
                    **log_context)
                process.fail('process terminate via nomad cli')
                process.process_status = proc.PROCESS_COMPLETED
                process.on_process_complete(None)
                process.save()

    is_running = mongoengine.Q(process_status=proc.PROCESS_RUNNING) | mongoengine.Q(tasks_status=proc.RUNNING)
    running_query = query & is_running
    stop_all(proc.Calc.objects(running_query))
    if not calcs:
        stop_all(proc.Upload.objects(running_query))