# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
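
'''
CLI commands of the ``admin uploads`` group. The group options select a set of
uploads; the sub-commands (ls, chown, reset, index, rm, msgpack, re_process,
re_pack, stop) inspect or maintain this selection.
'''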

from typing import List
import click
from tabulate import tabulate
from mongoengine import Q
from elasticsearch_dsl import Q as ESQ
from pymongo import UpdateOne
import elasticsearch_dsl as es
import json

from nomad import processing as proc, config, infrastructure, utils, search, files, datamodel, archive

from .admin import admin, __run_processing


@admin.group(help='Upload related commands')
@click.option('--user', help='Select uploads of user with given id', type=str)
@click.option('--staging', help='Select only uploads in staging', is_flag=True)
@click.option('--processing', help='Select only processing uploads', is_flag=True)
@click.option('--outdated', help='Select published uploads with older nomad version', is_flag=True)
@click.option('--code', multiple=True, type=str, help='Select only uploads with calcs of given codes')
@click.option('--query-mongo', is_flag=True, help='Select query mongo instead of elastic search.')
@click.pass_context
def uploads(ctx, user: str, staging: bool, processing: bool, outdated: bool, code: List[str], query_mongo):
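    '''Build the upload selection from the given options and store it on the
    click context for the sub-commands.'''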
    infrastructure.setup_mongo()
    infrastructure.setup_elastic()

    query = Q()
    if user is not None:
        query &= Q(user_id=user)
    if staging:
        query &= Q(published=False)
    if processing:
        query &= Q(process_status=proc.PROCESS_RUNNING) | Q(tasks_status=proc.RUNNING)

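    # uploads that contain calcs processed with a nomad version other than the current one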
    if outdated:
        uploads = proc.Calc._get_collection().distinct(
            'upload_id',
            {'metadata.nomad_version': {'$ne': config.version}})
        query &= Q(upload_id__in=uploads)

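    # use an elasticsearch terms aggregation over upload_id to find uploads with calcs of the given codes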
    if code is not None and len(code) > 0:
        code_queries = [es.Q('match', **{'dft.code_name': code_name}) for code_name in code]
        code_query = es.Q('bool', should=code_queries, minimum_should_match=1)

        code_search = es.Search(index=config.elastic.index_name)
        code_search = code_search.query(code_query)
        code_search.aggs.bucket('uploads', es.A(
            'terms', field='upload_id', size=10000, min_doc_count=1))
        uploads = [
            upload['key']
            for upload in code_search.execute().aggs['uploads']['buckets']]

        query &= Q(upload_id__in=uploads)

    ctx.obj.query = query
    ctx.obj.uploads = proc.Upload.objects(query)
    ctx.obj.query_mongo = query_mongo


def query_uploads(ctx, uploads):
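    '''Combine the selection made via the group options (ctx.obj.query) with the
    given UPLOADS arguments. The arguments are either interpreted as a JSON query
    (against mongo or elasticsearch, depending on --query-mongo) or used directly
    as upload ids.'''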
    try:
        json_query = json.loads(' '.join(uploads))
        if ctx.obj.query_mongo:
            uploads = proc.Calc.objects(**json_query).distinct(field="upload_id")
        else:
            request = search.SearchRequest()
            request.q = ESQ(json_query)
            request.quantity('upload_id', size=10000)
            uploads = list(request.execute()['quantities']['upload_id']['values'])
    except Exception:
        pass

    query = ctx.obj.query
    if len(uploads) > 0:
        query &= Q(upload_id__in=uploads)

    return query, proc.Upload.objects(query)


@uploads.command(help='List selected uploads')
@click.argument('UPLOADS', nargs=-1)
@click.option('-c', '--calculations', is_flag=True, help='Show details about calculations.')
@click.option('--ids', is_flag=True, help='Only show a list of ids.')
@click.option('--json', is_flag=True, help='Output a JSON array of ids.')
@click.pass_context
def ls(ctx, uploads, calculations, ids, json):
    _, uploads = query_uploads(ctx, uploads)

    def row(upload):
        row = [
            upload.upload_id,
            upload.name,
            upload.user_id,
            upload.process_status,
            upload.tasks_status,
            upload.published]

        if calculations:
            row += [
                upload.total_calcs,
                upload.failed_calcs,
                upload.total_calcs - upload.processed_calcs]

        return row

    headers = ['id', 'name', 'user', 'process', 'tasks', 'published']
    if calculations:
        headers += ['calcs', 'failed', 'processing']

    if ids:
        for upload in uploads:
            print(upload.upload_id)
        return

    if json:
        print('[%s]' % ','.join(['"%s"' % upload.upload_id for upload in uploads]))
        return

    print('%d uploads selected, showing no more than the first 10' % uploads.count())
    print(tabulate(
        [row(upload) for upload in uploads[:10]],
        headers=headers))


@uploads.command(help='Change the owner of the upload and all its calcs.')
@click.argument('USERNAME', nargs=1)
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def chown(ctx, username, uploads):
    _, uploads = query_uploads(ctx, uploads)

    print('%d uploads selected, changing their owner ...' % uploads.count())

    user = datamodel.User.get(username=username)

    for upload in uploads:
        upload.user_id = user.user_id
        calcs = upload.entries_metadata()

        def create_update(calc_id):
            return UpdateOne(
                {'_id': calc_id},
                {'$set': {'metadata.uploader': user.user_id}})

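        # update the uploader in the mongo metadata of all the upload's calcs with a single bulk write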
        proc.Calc._get_collection().bulk_write(
            [create_update(calc_id) for calc_id in upload.entry_ids()])
        upload.save()

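        # re-index the upload's calcs so elasticsearch reflects the new owner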
        with upload.entries_metadata() as calcs:
            search.index_all(calcs, do_refresh=False)
        search.refresh()


@uploads.command(help='Reset the processing state of selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--with-calcs', is_flag=True, help='Also reset all calculations.')
@click.pass_context
def reset(ctx, uploads, with_calcs):
    _, uploads = query_uploads(ctx, uploads)
    uploads_count = uploads.count()

    print('%d uploads selected, resetting their processing ...' % uploads_count)

    i = 0
    for upload in uploads:
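        # reset the processing state of the upload's calcs directly in mongo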
        if with_calcs:
            proc.Calc._get_collection().update_many(
                dict(upload_id=upload.upload_id),
                {'$set': proc.Calc.reset_pymongo_update()})

        upload.reset()
        upload.save()
        i += 1
        print('reset %d of %d uploads' % (i, uploads_count))


@uploads.command(help='(Re-)index all calcs of the given uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def index(ctx, uploads):
    _, uploads = query_uploads(ctx, uploads)
    uploads_count = uploads.count()

    print('%d uploads selected, indexing ...' % uploads_count)

    i, failed = 0, 0
    for upload in uploads:
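        # the return value of search.index_all is treated as the number of entries that failed to index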
        with upload.entries_metadata() as calcs:
            failed += search.index_all(calcs)
            i += 1

        print('   indexed %d of %d uploads, failed to index %d entries' % (i, uploads_count, failed))


def delete_upload(upload, skip_es: bool = False, skip_files: bool = False, skip_mongo: bool = False):
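    '''Delete the given upload from elasticsearch, the file system, and mongo,
    unless the respective skip_* flag is set.'''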
    # delete elastic
    if not skip_es:
        search.delete_upload(upload_id=upload.upload_id)

    # delete files
    if not skip_files:
        # do it twice to get the two potential versions 'public' and 'staging'
        for _ in range(0, 2):
            upload_files = files.UploadFiles.get(upload_id=upload.upload_id)

            try:
                if upload_files is not None:
                    upload_files.delete()
            except Exception as e:
                logger = utils.get_logger(__name__)
                logger.error('could not delete files', exc_info=e)
                break

    # delete mongo
    if not skip_mongo:
        proc.Calc.objects(upload_id=upload.upload_id).delete()
        upload.delete()


@uploads.command(help='Delete selected upload')
@click.argument('UPLOADS', nargs=-1)
@click.option('--skip-es', help='Keep the elastic index version of the data.', is_flag=True)
@click.option('--skip-mongo', help='Keep uploads and calcs in mongo.', is_flag=True)
@click.option('--skip-files', help='Keep all related files.', is_flag=True)
@click.pass_context
def rm(ctx, uploads, skip_es, skip_mongo, skip_files):
    _, uploads = query_uploads(ctx, uploads)

    print('%d uploads selected, deleting ...' % uploads.count())

    for upload in uploads:
        delete_upload(upload, skip_es=skip_es, skip_mongo=skip_mongo, skip_files=skip_files)


@uploads.command(help='Create msgpack file for upload')
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def msgpack(ctx, uploads):
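    '''For each selected published upload, read the per-calc JSON archive files
    from the public and restricted zip files and write them into msgpack-based
    archive files.'''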
    _, uploads = query_uploads(ctx, uploads)

    for upload in uploads:
        upload_files = files.UploadFiles.get(upload_id=upload.upload_id)

        if isinstance(upload_files, files.PublicUploadFiles):
            def iterator(zf, names):
                for name in names:
                    # str.strip would remove characters, not the suffix; slice off '.json' instead
                    calc_id = name[:-len('.json')]
                    with zf.open(name) as f:
                        yield (calc_id, json.load(f))

            for access in ['public', 'restricted']:
                with upload_files._open_zip_file('archive', access, 'json') as zf:
                    archive_path = upload_files._file_object('archive', access, 'msg', 'msg').os_path
                    names = [name for name in zf.namelist() if name.endswith('.json')]
                    archive.write_archive(archive_path, len(names), iterator(zf, names))
                print('wrote msgpack archive %s' % archive_path)


@uploads.command(help='Reprocess selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.option('--reprocess-running', is_flag=True, help='Also reprocess already running processes.')
@click.pass_context
def re_process(ctx, uploads, parallel: int, reprocess_running: bool):
    _, uploads = query_uploads(ctx, uploads)
    __run_processing(
        uploads, parallel, lambda upload: upload.re_process_upload(), 're-processing',
        reprocess_running=reprocess_running)


@uploads.command(help='Repack selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.pass_context
def re_pack(ctx, uploads, parallel: int):
    _, uploads = query_uploads(ctx, uploads)
    __run_processing(uploads, parallel, lambda upload: upload.re_pack(), 're-packing')


@uploads.command(help='Attempt to abort the processing of uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--calcs', is_flag=True, help='Only stop calculation processing.')
@click.option('--kill', is_flag=True, help='Use the kill signal and force task failure.')
@click.option('--no-celery', is_flag=True, help='Do not attempt to stop the actual celery tasks')
@click.pass_context
def stop(ctx, uploads, calcs: bool, kill: bool, no_celery: bool):
    query, _ = query_uploads(ctx, uploads)

    logger = utils.get_logger(__name__)

    def stop_all(query):
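        # revoke the celery task of each running process; with --kill also mark the process itself as failed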
        for process in query:
            logger_kwargs = dict(upload_id=process.upload_id)
            if isinstance(process, proc.Calc):
                logger_kwargs.update(calc_id=process.calc_id)

            if not no_celery:
                logger.info(
                    'send terminate celery task', celery_task_id=process.celery_task_id,
                    kill=kill, **logger_kwargs)

            kwargs = {}
            if kill:
                kwargs.update(signal='SIGKILL')
            try:
                if not no_celery:
                    proc.app.control.revoke(process.celery_task_id, terminate=True, **kwargs)
            except Exception as e:
                logger.warning(
                    'could not revoke celery task', exc_info=e,
                    celery_task_id=process.celery_task_id, **logger_kwargs)

            if kill:
                logger.info(
                    'fail proc', celery_task_id=process.celery_task_id, kill=kill,
                    **logger_kwargs)

                process.fail('process terminated via nomad cli')
                process.process_status = proc.PROCESS_COMPLETED
                process.on_process_complete(None)
                process.save()

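    # only running processes can be stopped; stop calcs first, then (unless --calcs) the uploads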
    running_query = query & (Q(process_status=proc.PROCESS_RUNNING) | Q(tasks_status=proc.RUNNING))
    stop_all(proc.Calc.objects(running_query))
    if not calcs:
        stop_all(proc.Upload.objects(running_query))