# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List, Callable
import threading

import click
from tabulate import tabulate
from mongoengine import Q
from pymongo import UpdateOne
import elasticsearch_dsl as es

from nomad import processing as proc, config, infrastructure, utils, search, files, datamodel

from .admin import admin


@admin.group(help='Upload related commands')
@click.option('--user', help='Select uploads of user with given id', type=str)
@click.option('--staging', help='Select only uploads in staging', is_flag=True)
@click.option('--processing', help='Select only processing uploads', is_flag=True)
@click.option('--outdated', help='Select published uploads with older nomad version', is_flag=True)
@click.option('--code', multiple=True, type=str, help='Select only uploads with calcs of given codes')
@click.pass_context
def uploads(ctx, user: str, staging: bool, processing: bool, outdated: bool, code: List[str]):
    ''' Select uploads with the given filters and store the resulting mongo query and
    upload query set on the click context for the sub commands to use. '''
    infrastructure.setup_mongo()
    infrastructure.setup_elastic()

    query = Q()
    if user is not None:
        query &= Q(user_id=user)
    if staging:
        query &= Q(published=False)
    if processing:
        query &= Q(process_status=proc.PROCESS_RUNNING) | Q(tasks_status=proc.RUNNING)

    if outdated:
        # uploads that have at least one calc processed with a non-current nomad version
        outdated_upload_ids = proc.Calc._get_collection().distinct(
            'upload_id',
            {'metadata.nomad_version': {'$ne': config.version}})
        query &= Q(upload_id__in=outdated_upload_ids)

    # click's multiple=True always passes a (possibly empty) tuple, never None,
    # so a simple truthiness check is sufficient here
    if code:
        # resolve matching upload ids via an elastic terms aggregation over
        # calcs of the given codes
        code_queries = [es.Q('match', code_name=code_name) for code_name in code]
        code_query = es.Q('bool', should=code_queries, minimum_should_match=1)

        code_search = es.Search(index=config.elastic.index_name)
        code_search = code_search.query(code_query)
        code_search.aggs.bucket('uploads', es.A(
            'terms', field='upload_id', size=10000, min_doc_count=1))
        code_upload_ids = [
            upload['key']
            for upload in code_search.execute().aggs['uploads']['buckets']]

        query &= Q(upload_id__in=code_upload_ids)

    ctx.obj.query = query
    ctx.obj.uploads = proc.Upload.objects(query)


def query_uploads(ctx, uploads):
    ''' Combine the query selected on the context with an optional list of
    upload ids and return the query plus the matching upload query set. '''
    query = ctx.obj.query
    if uploads:
        query &= Q(upload_id__in=uploads)

    return query, proc.Upload.objects(query)


@uploads.command(help='List selected uploads')
@click.argument('UPLOADS', nargs=-1)
@click.option('-c', '--calculations', is_flag=True, help='Show details about calculations.')
@click.option('--ids', is_flag=True, help='Only show a list of ids.')
@click.option('--json', is_flag=True, help='Output a JSON array of ids.')
@click.pass_context
def ls(ctx, uploads, calculations, ids, json):
    ''' Print the selected uploads as plain id list, JSON array, or table. '''
    _, uploads = query_uploads(ctx, uploads)

    # plain id list output
    if ids:
        for upload in uploads:
            print(upload.upload_id)
        return

    # JSON array of ids output
    if json:
        print('[%s]' % ','.join(['"%s"' % upload.upload_id for upload in uploads]))
        return

    headers = ['id', 'name', 'user', 'process', 'tasks', 'published']
    if calculations:
        headers += ['calcs', 'failed', 'processing']

    def row(upload):
        # one table row per upload; calc columns only when requested
        values = [
            upload.upload_id,
            upload.name,
            upload.user_id,
            upload.process_status,
            upload.tasks_status,
            upload.published]

        if calculations:
            values += [
                upload.total_calcs,
                upload.failed_calcs,
                upload.total_calcs - upload.processed_calcs]

        return values

    print('%d uploads selected, showing no more than first 10' % uploads.count())
    print(tabulate(
        [row(upload) for upload in uploads[:10]],
        headers=headers))


@uploads.command(help='Change the owner of the upload and all its calcs.')
@click.argument('USERNAME', nargs=1)
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def chown(ctx, username, uploads):
    ''' Reassign the selected uploads and all their calcs to the given user. '''
    _, uploads = query_uploads(ctx, uploads)

    print('%d uploads selected, changing its owner ...' % uploads.count())

    user = datamodel.User.get(username=username)

    for upload in uploads:
        upload.user_id = user.user_id

        # rewrite the uploader of all of this upload's calcs in mongo with one
        # bulk write
        calcs = upload.to_upload_with_metadata().calcs
        updates = [
            UpdateOne(
                {'_id': calc.calc_id},
                {'$set': {'metadata.uploader': user.user_id}})
            for calc in calcs]
        proc.Calc._get_collection().bulk_write(updates)
        upload.save()

        # re-read the calcs after the mongo update and re-index the changed
        # metadata in elastic; refresh so the change becomes visible
        calcs = upload.to_upload_with_metadata().calcs
        search.index_all(calcs, do_refresh=False)
        search.refresh()


# NOTE: the help text was copy-pasted from `chown` and wrongly read
# 'Change the owner of the upload and all its calcs.'
@uploads.command(help='Reset the processing state of the upload and, optionally, all its calcs.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--with-calcs', is_flag=True, help='Also reset all calculations.')
@click.pass_context
def reset(ctx, uploads, with_calcs):
    ''' Reset the processing state of the selected uploads (and their calcs). '''
    _, uploads = query_uploads(ctx, uploads)
    uploads_count = uploads.count()

    print('%d uploads selected, resetting their processing ...' % uploads_count)

    i = 0
    for upload in uploads:
        if with_calcs:
            for calc in proc.Calc.objects(upload_id=upload.upload_id):
                calc.reset()
                calc.save()

        upload.reset()
        upload.save()
        i += 1
        # fixed typo in the progress message ('resetted' -> 'reset')
        print('reset %d of %d uploads' % (i, uploads_count))


@uploads.command(help='(Re-)index all calcs of the given uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def index(ctx, uploads):
    ''' Re-create the elastic index entries for all calcs of the selected uploads. '''
    _, uploads = query_uploads(ctx, uploads)
    uploads_count = uploads.count()

    print('%d uploads selected, indexing ...' % uploads_count)

    failed = 0
    for i, upload in enumerate(uploads, start=1):
        # index_all returns the number of entries that could not be indexed
        calcs = upload.to_upload_with_metadata().calcs
        failed += search.index_all(calcs)

        print('   indexed %d of %d uploads, failed to index %d entries' % (i, uploads_count, failed))


def delete_upload(upload, skip_es: bool = False, skip_files: bool = False, skip_mongo: bool = False):
    ''' Delete the given upload from elastic, the file system, and mongo,
    unless the corresponding skip flag is set. '''
    # remove the upload's entries from the elastic index
    if not skip_es:
        search.delete_upload(upload_id=upload.upload_id)

    # remove the upload's files
    if not skip_files:
        # do it twice to get the two potential versions 'public' and 'staging'
        for _ in range(2):
            upload_files = files.UploadFiles.get(upload_id=upload.upload_id)
            if upload_files is None:
                continue

            try:
                upload_files.delete()
            except Exception as e:
                # best effort: log the failure and stop trying further versions
                logger = utils.get_logger(__name__)
                logger.error('could not delete files', exc_info=e)
                break

    # remove upload and calcs from mongo
    if not skip_mongo:
        proc.Calc.objects(upload_id=upload.upload_id).delete()
        upload.delete()


@uploads.command(help='Delete selected upload')
@click.argument('UPLOADS', nargs=-1)
@click.option('--skip-es', help='Keep the elastic index version of the data.', is_flag=True)
@click.option('--skip-mongo', help='Keep uploads and calcs in mongo.', is_flag=True)
@click.option('--skip-files', help='Keep all related files.', is_flag=True)
@click.pass_context
def rm(ctx, uploads, skip_es, skip_mongo, skip_files):
    ''' Delete every selected upload, forwarding the skip flags to delete_upload. '''
    _, uploads = query_uploads(ctx, uploads)

    print('%d uploads selected, deleting ...' % uploads.count())

    for selected_upload in uploads:
        delete_upload(
            selected_upload,
            skip_es=skip_es, skip_mongo=skip_mongo, skip_files=skip_files)


def __run_processing(
        ctx, uploads, parallel: int, process: Callable[[proc.Upload], None], label: str):
    '''
    Run the given process function on all selected uploads, using up to
    ``parallel`` worker threads. Uploads that are already/still processing are
    skipped. Blocks until all started processings are complete.

    Arguments:
        ctx: click context carrying the upload selection (see ``uploads`` group)
        uploads: additional upload ids to narrow the selection
        parallel: maximum number of uploads processed concurrently
        process: callable that triggers the processing on one upload
        label: human readable name of the processing, used for logs and output
    '''
    _, uploads = query_uploads(ctx, uploads)
    uploads_count = uploads.count()
    uploads = list(uploads)  # copy the whole mongo query set to avoid cursor timeouts

    cv = threading.Condition()
    threads: List[threading.Thread] = []

    # shared counters, guarded by cv
    state = dict(
        completed_count=0,
        skipped_count=0,
        available_threads_count=parallel)

    logger = utils.get_logger(__name__)

    print('%d uploads selected, %s ...' % (uploads_count, label))

    def process_upload(upload: proc.Upload):
        logger.info('%s started' % label, upload_id=upload.upload_id)

        completed = False
        if upload.process_running:
            # do not interfere with an ongoing processing; count as skipped
            logger.warn(
                'cannot trigger %s, since the upload is already/still processing' % label,
                current_process=upload.current_process,
                current_task=upload.current_task, upload_id=upload.upload_id)

        else:
            upload.reset()
            process(upload)
            upload.block_until_complete(interval=.5)

            if upload.tasks_status == proc.FAILURE:
                logger.info('%s with failure' % label, upload_id=upload.upload_id)

            completed = True

            logger.info('%s complete' % label, upload_id=upload.upload_id)

        with cv:
            state['completed_count'] += 1 if completed else 0
            state['skipped_count'] += 1 if not completed else 0
            state['available_threads_count'] += 1

            print(
                '   %s %s and skipped %s of %s uploads' %
                (label, state['completed_count'], state['skipped_count'], uploads_count))

            cv.notify()

    for upload in uploads:
        with cv:
            cv.wait_for(lambda: state['available_threads_count'] > 0)
            state['available_threads_count'] -= 1
            # bind the current upload via args; the previous
            # `target=lambda: process_upload(upload)` captured the loop variable
            # late, so a thread could see a later iteration's upload and process
            # the wrong one (some twice, others never)
            thread = threading.Thread(target=process_upload, args=(upload,))
            threads.append(thread)
            thread.start()

    for thread in threads:
        thread.join()


@uploads.command(help='Reprocess selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.pass_context
def re_process(ctx, uploads, parallel: int):
    ''' Trigger a re-processing of the selected uploads, up to `parallel` at a time. '''
    __run_processing(
        ctx, uploads, parallel,
        lambda selected: selected.re_process_upload(), 're-processing')


@uploads.command(help='Repack selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.pass_context
def re_pack(ctx, uploads, parallel: int):
    ''' Trigger a re-packing of the selected uploads, up to `parallel` at a time. '''
    __run_processing(
        ctx, uploads, parallel,
        lambda selected: selected.re_pack(), 're-packing')


@uploads.command(help='Attempt to abort the processing of uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--calcs', is_flag=True, help='Only stop calculation processing.')
@click.option('--kill', is_flag=True, help='Use the kill signal and force task failure.')
@click.option('--no-celery', is_flag=True, help='Do not attempt to stop the actual celery tasks')
@click.pass_context
def stop(ctx, uploads, calcs: bool, kill: bool, no_celery: bool):
    ''' Stop the running processing of the selected uploads and their calcs by
    revoking the corresponding celery tasks and, with --kill, forcing the mongo
    processing state to a completed failure. '''
    query, _ = query_uploads(ctx, uploads)

    logger = utils.get_logger(__name__)

    def stop_all(query):
        # `query` is a mongoengine query set of either Calc or Upload documents
        for process in query:
            logger_kwargs = dict(upload_id=process.upload_id)
            if isinstance(process, proc.Calc):
                logger_kwargs.update(calc_id=process.calc_id)

            if not no_celery:
                logger.info(
                    'send terminate celery task', celery_task_id=process.celery_task_id,
                    kill=kill, **logger_kwargs)

            kwargs = {}
            if kill:
                # SIGKILL cannot be caught by the worker; without --kill celery
                # uses its default terminate signal
                kwargs.update(signal='SIGKILL')
            try:
                if not no_celery:
                    proc.app.control.revoke(process.celery_task_id, terminate=True, **kwargs)
            except Exception as e:
                # revoking is best effort; log and continue with the next process
                logger.warning(
                    'could not revoke celery task', exc_info=e,
                    celery_task_id=process.celery_task_id, **logger_kwargs)

            if kill:
                logger.info(
                    'fail proc', celery_task_id=process.celery_task_id, kill=kill,
                    **logger_kwargs)

                # a killed task cannot update its own state, so mark the mongo
                # document as failed/completed here and run the completion hook
                process.fail('process terminate via nomad cli')
                process.process_status = proc.PROCESS_COMPLETED
                process.on_process_complete(None)
                process.save()

    # restrict to processes that are actually running right now
    running_query = query & (Q(process_status=proc.PROCESS_RUNNING) | Q(tasks_status=proc.RUNNING))
    stop_all(proc.Calc.objects(running_query))
    if not calcs:
        stop_all(proc.Upload.objects(running_query))