uploads.py 10.6 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

15
from typing import List
16
17
18
import click
from tabulate import tabulate
from mongoengine import Q
19
from pymongo import UpdateOne
20
import elasticsearch_dsl as es
21

22
from nomad import processing as proc, config, infrastructure, utils, search, files, datamodel
23
from .admin import admin, __run_processing
24
25


26
@admin.group(help='Upload related commands')
@click.option('--user', help='Select uploads of user with given id', type=str)
@click.option('--staging', help='Select only uploads in staging', is_flag=True)
@click.option('--processing', help='Select only processing uploads', is_flag=True)
@click.option('--outdated', help='Select published uploads with older nomad version', is_flag=True)
@click.option('--code', multiple=True, type=str, help='Select only uploads with calcs of given codes')
@click.pass_context
def uploads(ctx, user: str, staging: bool, processing: bool, outdated: bool, code: List[str]):
    """
    Command group that builds the upload selection used by its sub-commands.

    Sets up mongo and elastic, combines all given options into a single
    mongoengine query (options are AND-ed), and stores both the query and the
    matching upload queryset on ``ctx.obj``.

    Arguments:
        ctx: The click context; ``ctx.obj.query`` and ``ctx.obj.uploads`` are set.
        user: Only select uploads with this ``user_id``.
        staging: Only select uploads with ``published=False``.
        processing: Only select uploads with a running process or running tasks.
        outdated: Only select uploads that have calcs whose
            ``metadata.nomad_version`` differs from ``config.version``.
        code: Only select uploads that have calcs matching one of these code names.
    """
    infrastructure.setup_mongo()
    infrastructure.setup_elastic()

    selection = Q()
    if user is not None:
        selection &= Q(user_id=user)
    if staging:
        selection &= Q(published=False)
    if processing:
        is_running = Q(process_status=proc.PROCESS_RUNNING) | Q(tasks_status=proc.RUNNING)
        selection &= is_running

    if outdated:
        # collect the ids of all uploads with at least one calc of a different version
        version_filter = {'metadata.nomad_version': {'$ne': config.version}}
        outdated_ids = proc.Calc._get_collection().distinct('upload_id', version_filter)
        selection &= Q(upload_id__in=outdated_ids)

    if code:
        # the upload ids are taken from a terms aggregation over the search index;
        # the aggregation returns at most 10000 buckets
        any_code = es.Q(
            'bool',
            should=[es.Q('match', code_name=name) for name in code],
            minimum_should_match=1)

        code_search = es.Search(index=config.elastic.index_name).query(any_code)
        code_search.aggs.bucket('uploads', es.A(
            'terms', field='upload_id', size=10000, min_doc_count=1))

        buckets = code_search.execute().aggs['uploads']['buckets']
        selection &= Q(upload_id__in=[bucket['key'] for bucket in buckets])

    ctx.obj.query = selection
    ctx.obj.uploads = proc.Upload.objects(selection)


def query_uploads(ctx, uploads):
    """
    Narrows the selection stored on the click context to the given upload ids.

    Arguments:
        ctx: The click context; ``ctx.obj.query`` is read as the base query.
        uploads: A sequence of upload ids. If empty, the base query is used as is.

    Returns:
        A tuple of the resulting query and the upload queryset for that query.
    """
    selection = ctx.obj.query
    if uploads:
        selection &= Q(upload_id__in=uploads)

    matching_uploads = proc.Upload.objects(selection)
    return selection, matching_uploads
75
76


77
@uploads.command(help='List selected uploads')
@click.argument('UPLOADS', nargs=-1)
@click.option('-c', '--calculations', is_flag=True, help='Show details about calculations.')
@click.option('--ids', is_flag=True, help='Only show a list of ids.')
@click.option('--json', is_flag=True, help='Output a JSON array of ids.')
@click.pass_context
def ls(ctx, uploads, calculations, ids, json):
    """
    Prints the selected uploads.

    With ``ids`` one upload id is printed per line, with ``json`` a JSON array
    of ids is printed. Otherwise a table with at most the first 10 uploads is
    shown; ``calculations`` adds calc count columns to that table.
    """
    _, selected = query_uploads(ctx, uploads)

    # the plain id outputs need no table setup and list all selected uploads
    if ids:
        for upload in selected:
            print(upload.upload_id)
        return

    if json:
        quoted_ids = ['"%s"' % upload.upload_id for upload in selected]
        print('[%s]' % ','.join(quoted_ids))
        return

    headers = ['id', 'name', 'user', 'process', 'tasks', 'published']
    if calculations:
        headers += ['calcs', 'failed', 'processing']

    def as_row(upload):
        cells = [
            upload.upload_id,
            upload.name,
            upload.user_id,
            upload.process_status,
            upload.tasks_status,
            upload.published]

        if calculations:
            # the last column is the number of calcs that are not processed yet
            cells.extend([
                upload.total_calcs,
                upload.failed_calcs,
                upload.total_calcs - upload.processed_calcs])

        return cells

    print('%d uploads selected, showing no more than first 10' % selected.count())
    table_rows = [as_row(upload) for upload in selected[:10]]
    print(tabulate(table_rows, headers=headers))
120
121


122
@uploads.command(help='Change the owner of the upload and all its calcs.')
@click.argument('USERNAME', nargs=1)
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def chown(ctx, username, uploads):
    """
    Changes the owner of the selected uploads and of all their calcs.

    For each upload the ``user_id`` is set to the id of the user with the given
    username, ``metadata.uploader`` of all its calcs is updated in mongo, and
    the calcs are re-indexed in the search index.

    Arguments:
        ctx: The click context holding the base upload selection.
        username: The username of the new owner.
        uploads: Upload ids that further narrow the selection.
    """
    _, uploads = query_uploads(ctx, uploads)

    print('%d uploads selected, changing its owner ...' % uploads.count())

    user = datamodel.User.get(username=username)

    def create_update(calc):
        return UpdateOne(
            {'_id': calc.calc_id},
            {'$set': {'metadata.uploader': user.user_id}})

    for upload in uploads:
        upload.user_id = user.user_id
        calcs = upload.to_upload_with_metadata().calcs

        calc_updates = [create_update(calc) for calc in calcs]
        # pymongo raises InvalidOperation on an empty list of requests, which
        # would abort the whole command on the first upload without calcs
        if calc_updates:
            proc.Calc._get_collection().bulk_write(calc_updates)
        upload.save()

        # the metadata is read again after the changes were written;
        # presumably so that the indexed entries reflect the new owner
        calcs = upload.to_upload_with_metadata().calcs
        search.index_all(calcs, do_refresh=False)
        search.refresh()


@uploads.command(help='Reset the processing state.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--with-calcs', is_flag=True, help='Also reset all calculations.')
@click.pass_context
def reset(ctx, uploads, with_calcs):
    """
    Resets the processing state of the selected uploads.

    Arguments:
        ctx: The click context holding the base upload selection.
        uploads: Upload ids that further narrow the selection.
        with_calcs: If set, the calcs of each upload are reset as well. Without
            it only the upload documents are reset, as the option's help states.
    """
    _, uploads = query_uploads(ctx, uploads)
    uploads_count = uploads.count()

    print('%d uploads selected, resetting their processing ...' % uploads_count)

    for i, upload in enumerate(uploads, start=1):
        # the --with-calcs flag used to be ignored and calcs were always reset
        if with_calcs:
            proc.Calc._get_collection().update_many(
                dict(upload_id=upload.upload_id),
                {'$set': proc.Calc.reset_pymongo_update()})

        upload.reset()
        upload.save()
        print('resetted %d of %d uploads' % (i, uploads_count))
172
173


174
175
176
177
178
179
180
181
182
183
184
185
186
187
@uploads.command(help='(Re-)index all calcs of the given uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def index(ctx, uploads):
    """
    Indexes the calcs of all selected uploads in the search index.

    Progress is printed after each upload, including the accumulated value
    returned by ``search.index_all`` as the number of failed entries.
    """
    _, selected = query_uploads(ctx, uploads)
    total = selected.count()

    print('%d uploads selected, indexing ...' % total)

    failed = 0
    for done, upload in enumerate(selected, start=1):
        failed += search.index_all(upload.to_upload_with_metadata().calcs)

        print('   indexed %d of %d uploads, failed to index %d entries' % (done, total, failed))


193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def delete_upload(upload, skip_es: bool = False, skip_files: bool = False, skip_mongo: bool = False):
    """
    Deletes the given upload from the search index, the file system, and mongo.

    Arguments:
        upload: The upload to delete; its ``upload_id`` identifies the data.
        skip_es: If set, the search index entries are kept.
        skip_files: If set, the upload files are kept.
        skip_mongo: If set, the calc and upload documents in mongo are kept.
    """
    if not skip_es:
        search.delete_upload(upload_id=upload.upload_id)

    if not skip_files:
        # two passes, to get the two potential versions 'public' and 'staging'
        for _ in range(2):
            upload_files = files.UploadFiles.get(upload_id=upload.upload_id)
            if upload_files is None:
                continue

            try:
                upload_files.delete()
            except Exception as e:
                # failing to delete files is only logged; the second pass is skipped
                utils.get_logger(__name__).error('could not delete files', exc_info=e)
                break

    if not skip_mongo:
        proc.Calc.objects(upload_id=upload.upload_id).delete()
        upload.delete()


218
@uploads.command(help='Delete selected upload')
@click.argument('UPLOADS', nargs=-1)
@click.option('--skip-es', help='Keep the elastic index version of the data.', is_flag=True)
@click.option('--skip-mongo', help='Keep uploads and calcs in mongo.', is_flag=True)
@click.option('--skip-files', help='Keep all related files.', is_flag=True)
@click.pass_context
def rm(ctx, uploads, skip_es, skip_mongo, skip_files):
    """
    Deletes all selected uploads; the skip flags are passed on to ``delete_upload``.
    """
    _, selected = query_uploads(ctx, uploads)

    print('%d uploads selected, deleting ...' % selected.count())

    skip_flags = dict(skip_es=skip_es, skip_mongo=skip_mongo, skip_files=skip_files)
    for upload in selected:
        delete_upload(upload, **skip_flags)
231
232


233
234
235
236
237
@uploads.command(help='Reprocess selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.pass_context
def re_process(ctx, uploads, parallel: int):
    """
    Runs ``re_process_upload`` on all selected uploads via ``__run_processing``.
    """
    def start_re_processing(upload):
        return upload.re_process_upload()

    _, selected = query_uploads(ctx, uploads)
    __run_processing(selected, parallel, start_re_processing, 're-processing')
240
241
242
243
244
245
246


@uploads.command(help='Repack selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.pass_context
def re_pack(ctx, uploads, parallel: int):
    """
    Runs ``re_pack`` on all selected uploads via ``__run_processing``.
    """
    def start_re_packing(upload):
        return upload.re_pack()

    _, selected = query_uploads(ctx, uploads)
    __run_processing(selected, parallel, start_re_packing, 're-packing')
249
250


251
@uploads.command(help='Attempt to abort the processing of uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--calcs', is_flag=True, help='Only stop calculation processing.')
@click.option('--kill', is_flag=True, help='Use the kill signal and force task failure.')
@click.option('--no-celery', is_flag=True, help='Do not attempt to stop the actual celery tasks')
@click.pass_context
def stop(ctx, uploads, calcs: bool, kill: bool, no_celery: bool):
    """
    Stops the running processing of the selected uploads and their calcs.

    Arguments:
        ctx: The click context holding the base upload selection.
        uploads: Upload ids that further narrow the selection.
        calcs: If set, only calc processes are stopped, upload processes are not.
        kill: If set, celery tasks are revoked with ``SIGKILL`` and the
            processes are marked as failed and completed.
        no_celery: If set, no celery tasks are revoked.
    """
    query, _ = query_uploads(ctx, uploads)

    logger = utils.get_logger(__name__)
    revoke_kwargs = dict(signal='SIGKILL') if kill else {}

    def stop_all(processes):
        for process in processes:
            log_context = dict(upload_id=process.upload_id)
            if isinstance(process, proc.Calc):
                log_context['calc_id'] = process.calc_id

            if not no_celery:
                logger.info(
                    'send terminate celery task', celery_task_id=process.celery_task_id,
                    kill=kill, **log_context)

                # a failed revoke is only logged, the process might still be failed below
                try:
                    proc.app.control.revoke(
                        process.celery_task_id, terminate=True, **revoke_kwargs)
                except Exception as e:
                    logger.warning(
                        'could not revoke celery task', exc_info=e,
                        celery_task_id=process.celery_task_id, **log_context)

            if kill:
                logger.info(
                    'fail proc', celery_task_id=process.celery_task_id, kill=kill,
                    **log_context)

                process.fail('process terminate via nomad cli')
                process.process_status = proc.PROCESS_COMPLETED
                process.on_process_complete(None)
                process.save()

    # NOTE(review): the same selection is applied to Calc and Upload documents;
    # confirm that all fields used in the selection exist on both.
    is_running = Q(process_status=proc.PROCESS_RUNNING) | Q(tasks_status=proc.RUNNING)
    running_query = query & is_running

    stop_all(proc.Calc.objects(running_query))
    if not calcs:
        stop_all(proc.Upload.objects(running_query))