# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
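'''
Admin CLI commands for inspecting and manipulating uploads. These are
sub-commands of the ``admin`` command group; assuming the usual ``nomad``
CLI entry point, they are invoked like this:

    nomad admin uploads --staging ls
    nomad admin uploads rm <upload_id>
'''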

from typing import List, Callable
import threading
import json

import click
from tabulate import tabulate
from mongoengine import Q
from pymongo import UpdateOne
import elasticsearch_dsl as es

from nomad import processing as proc, config, infrastructure, utils, search, files, datamodel
from nomad.archive import write_archive

from .admin import admin


@admin.group(help='Upload related commands')
@click.option('--user', help='Select uploads of user with given id', type=str)
@click.option('--staging', help='Select only uploads in staging', is_flag=True)
@click.option('--processing', help='Select only processing uploads', is_flag=True)
@click.option('--outdated', help='Select published uploads with older nomad version', is_flag=True)
@click.option('--code', multiple=True, type=str, help='Select only uploads with calcs of given codes')
@click.pass_context
def uploads(ctx, user: str, staging: bool, processing: bool, outdated: bool, code: List[str]):
    infrastructure.setup_mongo()
    infrastructure.setup_elastic()

    query = Q()
    if user is not None:
        query &= Q(user_id=user)
    if staging:
        query &= Q(published=False)
    if processing:
        query &= Q(process_status=proc.PROCESS_RUNNING) | Q(tasks_status=proc.RUNNING)

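    # select uploads that have at least one calc processed with a different nomad version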
    if outdated:
        uploads = proc.Calc._get_collection().distinct(
            'upload_id',
            {'metadata.nomad_version': {'$ne': config.version}})
        query &= Q(upload_id__in=uploads)

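    # code names are not stored in mongo; aggregate the upload ids of all calcs
    # matching the given codes from the elastic index instead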
    if code is not None and len(code) > 0:
        code_queries = [es.Q('match', code_name=code_name) for code_name in code]
        code_query = es.Q('bool', should=code_queries, minimum_should_match=1)

        code_search = es.Search(index=config.elastic.index_name)
        code_search = code_search.query(code_query)
        code_search.aggs.bucket('uploads', es.A(
            'terms', field='upload_id', size=10000, min_doc_count=1))
        uploads = [
            upload['key']
            for upload in code_search.execute().aggs['uploads']['buckets']]

        query &= Q(upload_id__in=uploads)

    ctx.obj.query = query
    ctx.obj.uploads = proc.Upload.objects(query)


def query_uploads(ctx, uploads):
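    ''' Combine the query of the uploads group with the optionally given upload ids. '''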
    query = ctx.obj.query
    if len(uploads) > 0:
        query &= Q(upload_id__in=uploads)

    return query, proc.Upload.objects(query)


@uploads.command(help='List selected uploads')
@click.argument('UPLOADS', nargs=-1)
@click.option('-c', '--calculations', is_flag=True, help='Show details about calculations.')
@click.option('--ids', is_flag=True, help='Only show a list of ids.')
@click.option('--json', is_flag=True, help='Output a JSON array of ids.')
@click.pass_context
def ls(ctx, uploads, calculations, ids, json):
    _, uploads = query_uploads(ctx, uploads)

    def row(upload):
        row = [
            upload.upload_id,
            upload.name,
            upload.user_id,
            upload.process_status,
            upload.tasks_status,
            upload.published]

        if calculations:
            row += [
                upload.total_calcs,
                upload.failed_calcs,
                upload.total_calcs - upload.processed_calcs]

        return row

    headers = ['id', 'name', 'user', 'process', 'tasks', 'published']
    if calculations:
        headers += ['calcs', 'failed', 'processing']

    if ids:
        for upload in uploads:
            print(upload.upload_id)
        return

    if json:
        # note: the 'json' parameter shadows the json module, hence the manual serialization
        print('[%s]' % ','.join(['"%s"' % upload.upload_id for upload in uploads]))
        return

    print('%d uploads selected, showing at most the first 10' % uploads.count())
    print(tabulate(
        [row(upload) for upload in uploads[:10]],
        headers=headers))


@uploads.command(help='Change the owner of the upload and all its calcs.')
@click.argument('USERNAME', nargs=1)
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def chown(ctx, username, uploads):
    _, uploads = query_uploads(ctx, uploads)

    print('%d uploads selected, changing their owner ...' % uploads.count())

    user = datamodel.User.get(username=username)

    for upload in uploads:
        upload.user_id = user.user_id
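        # update the uploader in the mongo metadata of all the upload's calcs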
        upload_with_metadata = upload.to_upload_with_metadata()
        calcs = upload_with_metadata.calcs

        def create_update(calc):
            return UpdateOne(
                {'_id': calc.calc_id},
                {'$set': {'metadata.uploader': user.user_id}})

        proc.Calc._get_collection().bulk_write([create_update(calc) for calc in calcs])
        upload.save()

        upload_with_metadata = upload.to_upload_with_metadata()
        calcs = upload_with_metadata.calcs
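        # re-index the updated calc metadata in elastic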
        search.index_all(calcs, do_refresh=False)
        search.refresh()


@uploads.command(help='Reset the processing state of selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--with-calcs', is_flag=True, help='Also reset all calculations.')
@click.pass_context
def reset(ctx, uploads, with_calcs):
    _, uploads = query_uploads(ctx, uploads)
    uploads_count = uploads.count()

    print('%d uploads selected, resetting their processing ...' % uploads_count)

    i = 0
    for upload in uploads:
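        # reset the processing state of the upload's calcs directly in mongo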
        proc.Calc._get_collection().update_many(
            dict(upload_id=upload.upload_id),
            {'$set': proc.Calc.reset_pymongo_update()})

        upload.reset()
        upload.save()
        i += 1
        print('reset %d of %d uploads' % (i, uploads_count))


@uploads.command(help='(Re-)index all calcs of the given uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def index(ctx, uploads):
    _, uploads = query_uploads(ctx, uploads)
    uploads_count = uploads.count()

    print('%d uploads selected, indexing ...' % uploads_count)

    i, failed = 0, 0
    for upload in uploads:
        upload_with_metadata = upload.to_upload_with_metadata()
        calcs = upload_with_metadata.calcs
        failed += search.index_all(calcs)
        i += 1

        print('   indexed %d of %d uploads, failed to index %d entries' % (i, uploads_count, failed))


def delete_upload(upload, skip_es: bool = False, skip_files: bool = False, skip_mongo: bool = False):
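    '''
    Delete the given upload from elastic, the file system, and mongo. Each of
    these steps can be skipped with the respective flag.
    '''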
    # delete elastic
    if not skip_es:
        search.delete_upload(upload_id=upload.upload_id)

    # delete files
    if not skip_files:
        # do it twice to get the two potential versions 'public' and 'staging'
        for _ in range(0, 2):
            upload_files = files.UploadFiles.get(upload_id=upload.upload_id)

            try:
                if upload_files is not None:
                    upload_files.delete()
            except Exception as e:
                logger = utils.get_logger(__name__)
                logger.error('could not delete files', exc_info=e)
                break

    # delete mongo
    if not skip_mongo:
        proc.Calc.objects(upload_id=upload.upload_id).delete()
        upload.delete()


@uploads.command(help='Delete selected uploads')
@click.argument('UPLOADS', nargs=-1)
@click.option('--skip-es', help='Keep the elastic index version of the data.', is_flag=True)
@click.option('--skip-mongo', help='Keep uploads and calcs in mongo.', is_flag=True)
@click.option('--skip-files', help='Keep all related files.', is_flag=True)
@click.pass_context
def rm(ctx, uploads, skip_es, skip_mongo, skip_files):
    _, uploads = query_uploads(ctx, uploads)

    print('%d uploads selected, deleting ...' % uploads.count())

    for upload in uploads:
        delete_upload(upload, skip_es=skip_es, skip_mongo=skip_mongo, skip_files=skip_files)


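# Creates msgpack-based archive files from the json-based archive zips of
# published uploads.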
@uploads.command(help='Create msgpack archive files for selected uploads')
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def msgpacked(ctx, uploads):
    _, uploads = query_uploads(ctx, uploads)

    for upload in uploads:
        upload_files = files.UploadFiles.get(upload_id=upload.upload_id)

        if isinstance(upload_files, files.PublicUploadFiles):
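            # yields (calc_id, archive data) tuples read from the json files in the zip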
            def iterator(zf, names):
                for name in names:
                    # do not use str.strip('.json') here: it strips characters,
                    # not a suffix, and can mangle the calc id
                    calc_id = name[:-len('.json')]
                    with zf.open(name) as f:
                        yield (calc_id, json.load(f))

            for access in ['public', 'restricted']:
                zf = upload_files.open_zip_file('archive', access, upload_files._archive_ext)
                archive_name = zf.filename.replace('.zip', '.msg')
                names = [name for name in zf.namelist() if name.endswith('.json')]
                print('writing msgpack archive %s' % archive_name)
                write_archive(archive_name, len(names), iterator(zf, names))


def __run_processing(
        ctx, uploads, parallel: int, process: Callable[[proc.Upload], None], label: str):
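    '''
    Run the given process function on all selected uploads, running at most
    `parallel` of them concurrently.
    '''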
    _, uploads = query_uploads(ctx, uploads)
    uploads_count = uploads.count()
    uploads = list(uploads)  # copy the whole mongo query set to avoid cursor timeouts

    cv = threading.Condition()
    threads: List[threading.Thread] = []

    state = dict(
        completed_count=0,
        skipped_count=0,
        available_threads_count=parallel)

    logger = utils.get_logger(__name__)

    print('%d uploads selected, %s ...' % (uploads_count, label))

    def process_upload(upload: proc.Upload):
        logger.info('%s started' % label, upload_id=upload.upload_id)

        completed = False
        if upload.process_running:
            logger.warning(
                'cannot trigger %s, since the upload is already/still processing' % label,
                current_process=upload.current_process,
                current_task=upload.current_task, upload_id=upload.upload_id)

        else:
            upload.reset()
            process(upload)
            upload.block_until_complete(interval=.5)

            if upload.tasks_status == proc.FAILURE:
                logger.info('%s with failure' % label, upload_id=upload.upload_id)

            completed = True

            logger.info('%s complete' % label, upload_id=upload.upload_id)

        with cv:
            state['completed_count'] += 1 if completed else 0
            state['skipped_count'] += 1 if not completed else 0
            state['available_threads_count'] += 1

            print(
                '   %s %s and skipped %s of %s uploads' %
                (label, state['completed_count'], state['skipped_count'], uploads_count))

            cv.notify()

    for upload in uploads:
        with cv:
            cv.wait_for(lambda: state['available_threads_count'] > 0)
            state['available_threads_count'] -= 1
            # pass the upload as a bound argument; a plain lambda would late-bind
            # the loop variable and could run on the wrong upload
            thread = threading.Thread(target=process_upload, args=(upload,))
            threads.append(thread)
            thread.start()

    for thread in threads:
        thread.join()


@uploads.command(help='Reprocess selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.pass_context
def re_process(ctx, uploads, parallel: int):
    __run_processing(ctx, uploads, parallel, lambda upload: upload.re_process_upload(), 're-processing')


@uploads.command(help='Repack selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.pass_context
def re_pack(ctx, uploads, parallel: int):
    __run_processing(ctx, uploads, parallel, lambda upload: upload.re_pack(), 're-packing')


@uploads.command(help='Attempt to abort the processing of uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--calcs', is_flag=True, help='Only stop calculation processing.')
@click.option('--kill', is_flag=True, help='Use the kill signal and force task failure.')
@click.option('--no-celery', is_flag=True, help='Do not attempt to stop the actual celery tasks.')
@click.pass_context
def stop(ctx, uploads, calcs: bool, kill: bool, no_celery: bool):
    query, _ = query_uploads(ctx, uploads)

    logger = utils.get_logger(__name__)

    def stop_all(query):
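        # revoke the celery task of each matching process; with --kill, also
        # mark the process itself as failed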
        for process in query:
            logger_kwargs = dict(upload_id=process.upload_id)
            if isinstance(process, proc.Calc):
                logger_kwargs.update(calc_id=process.calc_id)

            if not no_celery:
                logger.info(
                    'send terminate celery task', celery_task_id=process.celery_task_id,
                    kill=kill, **logger_kwargs)

            kwargs = {}
            if kill:
                kwargs.update(signal='SIGKILL')
            try:
                if not no_celery:
                    proc.app.control.revoke(process.celery_task_id, terminate=True, **kwargs)
            except Exception as e:
                logger.warning(
                    'could not revoke celery task', exc_info=e,
                    celery_task_id=process.celery_task_id, **logger_kwargs)

            if kill:
                logger.info(
                    'fail proc', celery_task_id=process.celery_task_id, kill=kill,
                    **logger_kwargs)

                process.fail('process terminated via nomad cli')
                process.process_status = proc.PROCESS_COMPLETED
                process.on_process_complete(None)
                process.save()

    running_query = query & (Q(process_status=proc.PROCESS_RUNNING) | Q(tasks_status=proc.RUNNING))
    stop_all(proc.Calc.objects(running_query))
    if not calcs:
        stop_all(proc.Upload.objects(running_query))