uploads.py 10.8 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

15
from typing import List
16
17
18
import click
from tabulate import tabulate
from mongoengine import Q
19
from pymongo import UpdateOne
20
import threading
21
import elasticsearch_dsl as es
22

23
from nomad import processing as proc, config, infrastructure, utils, search, files, coe_repo
24
from .admin import admin
25
26


27
@admin.group(help='Upload related commands')
@click.option('--user', help='Select uploads of user with given id', type=str)
@click.option('--staging', help='Select only uploads in staging', is_flag=True)
@click.option('--processing', help='Select only processing uploads', is_flag=True)
@click.option('--outdated', help='Select published uploads with older nomad version', is_flag=True)
@click.option('--code', multiple=True, type=str, help='Select only uploads with calcs of given codes')
@click.pass_context
def uploads(ctx, user: str, staging: bool, processing: bool, outdated: bool, code: List[str]):
    """
    Command group that translates the selection options into a mongo query.

    All given options are combined with a logical AND. The resulting query and
    the matching :class:`proc.Upload` query set are stored on ``ctx.obj`` as
    ``query`` and ``uploads`` so that the sub-commands can refine and use them.

    Arguments:
        ctx: The click context; its ``obj`` receives ``query`` and ``uploads``.
        user: Only select uploads with this ``user_id`` (ignored if ``None``).
        staging: Only select uploads with ``published=False``.
        processing: Only select uploads whose process or tasks are running.
        outdated: Only select uploads that have calcs with a
            ``metadata.nomad_version`` different from ``config.version``.
        code: Only select uploads that have calcs matching one of these code names
            in the elastic index.
    """
    infrastructure.setup_mongo()
    infrastructure.setup_elastic()

    query = Q()

    if user is not None:
        query &= Q(user_id=user)

    if staging:
        query &= Q(published=False)

    if processing:
        still_running = Q(process_status=proc.PROCESS_RUNNING) | Q(tasks_status=proc.RUNNING)
        query &= still_running

    if outdated:
        # NOTE(review): the option help mentions *published* uploads, but no
        # published filter is applied here -- confirm the intended behavior.
        version_mismatch = {'metadata.nomad_version': {'$ne': config.version}}
        outdated_upload_ids = proc.Calc._get_collection().distinct('upload_id', version_mismatch)
        query &= Q(upload_id__in=outdated_upload_ids)

    if code is not None and len(code) > 0:
        # any of the given codes may match
        any_code = es.Q(
            'bool',
            should=[es.Q('match', code_name=code_name) for code_name in code],
            minimum_should_match=1)

        code_search = es.Search(index=config.elastic.index_name).query(any_code)
        # the matching upload ids are collected via a terms aggregation;
        # NOTE(review): presumably at most 10000 distinct uploads are returned -- verify
        code_search.aggs.bucket(
            'uploads', es.A('terms', field='upload_id', size=10000, min_doc_count=1))
        buckets = code_search.execute().aggs['uploads']['buckets']

        query &= Q(upload_id__in=[bucket['key'] for bucket in buckets])

    ctx.obj.query = query
    ctx.obj.uploads = proc.Upload.objects(query)


def query_uploads(ctx, uploads):
    """
    Combine the group level selection with explicitly given upload ids.

    Arguments:
        ctx: The click context; ``ctx.obj.query`` holds the query of the group.
        uploads: A sequence of upload ids. If it is empty, the group query is
            used unchanged.

    Returns:
        A tuple with the resulting query and the :class:`proc.Upload` query set
        for that query.
    """
    base_query = ctx.obj.query
    if len(uploads) == 0:
        return base_query, proc.Upload.objects(base_query)

    narrowed_query = base_query & Q(upload_id__in=uploads)
    return narrowed_query, proc.Upload.objects(narrowed_query)
76
77


78
@uploads.command(help='List selected uploads')
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def ls(ctx, uploads):
    """
    Print the number of selected uploads and a table with at most the first 10.

    Arguments:
        ctx: The click context carrying the group level query.
        uploads: Optional upload ids that further restrict the selection.
    """
    _, selected = query_uploads(ctx, uploads)

    print('%d uploads selected, showing no more than first 10' % selected.count())

    rows = []
    for upload in selected[:10]:
        rows.append([
            upload.upload_id, upload.name, upload.user_id,
            upload.process_status, upload.published])

    print(tabulate(rows, headers=['id', 'name', 'user', 'status', 'published']))


92
@uploads.command(help='Change the owner of the upload and all its calcs.')
@click.argument('USER', nargs=1)
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def chown(ctx, user, uploads):
    """
    Set a new owner on the selected uploads and on all of their calcs.

    For each upload the ``user_id`` is replaced, the ``metadata.uploader`` of
    all its calcs is updated in mongo with a single bulk write, the upload is
    saved, and the calcs are published to the search index again.

    Arguments:
        ctx: The click context carrying the group level query.
        user: The id of the new owner; it has to be convertible to ``int``.
        uploads: Optional upload ids that further restrict the selection.
    """
    infrastructure.setup_repository_db()
    _, uploads = query_uploads(ctx, uploads)

    print('%d uploads selected, changing its owner ...' % uploads.count())

    user_id = user
    # NOTE(review): if from_user_id can return None for unknown ids, the
    # to_popo() call below fails with an AttributeError -- confirm.
    user = coe_repo.User.from_user_id(int(user_id))

    def create_update(calc):
        return UpdateOne(
            {'_id': calc.calc_id},
            {'$set': {'metadata.uploader': user.to_popo()}})

    for upload in uploads:
        upload.user_id = user_id
        upload_with_metadata = upload.to_upload_with_metadata()
        calcs = upload_with_metadata.calcs

        updates = [create_update(calc) for calc in calcs]
        # pymongo raises InvalidOperation on an empty request list, which would
        # abort the whole command for an upload that has no calcs
        if updates:
            proc.Calc._get_collection().bulk_write(updates)
        upload.save()

        # load the calcs again to publish them with the changed uploader
        upload_with_metadata = upload.to_upload_with_metadata()
        calcs = upload_with_metadata.calcs
        search.publish(calcs)


123
124
125
126
@uploads.command(help='(Re-)index all calcs of the given uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def index(ctx, uploads):
    """
    Index the calcs of all selected uploads and print the progress per upload.

    The number of failed entries is accumulated from the return values of
    ``search.index_all`` over all uploads.

    Arguments:
        ctx: The click context carrying the group level query.
        uploads: Optional upload ids that further restrict the selection.
    """
    infrastructure.setup_repository_db()

    _, selected = query_uploads(ctx, uploads)
    total = selected.count()

    print('%d uploads selected, indexing ...' % total)

    failed = 0
    for done, upload in enumerate(selected, start=1):
        failed += search.index_all(upload.to_upload_with_metadata().calcs)

        print('   indexed %d of %d uploads, failed to index %d entries' % (done, total, failed))


143
@uploads.command(help='Delete selected upload')
@click.argument('UPLOADS', nargs=-1)
@click.option('--with-coe-repo', help='Also attempt to delete from repository db', is_flag=True)
@click.option('--skip-es', help='Keep the elastic index version of the data.', is_flag=True)
@click.option('--skip-mongo', help='Keep uploads and calcs in mongo.', is_flag=True)
@click.option('--skip-files', help='Keep all related files.', is_flag=True)
@click.pass_context
def rm(ctx, uploads, with_coe_repo, skip_es, skip_mongo, skip_files):
    """
    Delete the selected uploads from the repository db, elastic, files, and mongo.

    Arguments:
        ctx: The click context carrying the group level query.
        uploads: Optional upload ids that further restrict the selection.
        with_coe_repo: Also delete the uploads from the repository db.
        skip_es: Do not delete the uploads from the elastic index.
        skip_mongo: Do not delete the uploads and their calcs from mongo.
        skip_files: Do not delete the upload files.
    """
    _, selected = query_uploads(ctx, uploads)

    logger = utils.get_logger(__name__)
    print('%d uploads selected, deleting ...' % selected.count())

    if with_coe_repo:
        from nomad import coe_repo
        infrastructure.setup_repository_db()

    for upload in selected:
        upload_id = upload.upload_id

        # repository db entry
        if with_coe_repo:
            coe_repo.Upload.delete(upload_id)

        # elastic index
        if not skip_es:
            search.delete_upload(upload_id=upload_id)

        # files; two passes to cover the two potential versions 'public' and 'staging'
        if not skip_files:
            for _ in range(2):
                upload_files = files.UploadFiles.get(upload_id=upload_id)
                if upload_files is None:
                    continue

                try:
                    upload_files.delete()
                except Exception as e:
                    # give up on the files of this upload, but continue with the rest
                    logger.error('could not delete files', exc_info=e)
                    break

        # mongo: first the calcs, then the upload itself
        if not skip_mongo:
            proc.Calc.objects(upload_id=upload_id).delete()
            upload.delete()
186
187


188
@uploads.command(help='Reprocess selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.pass_context
def re_process(ctx, uploads, parallel: int):
    """
    Re-process the selected uploads with a bounded number of worker threads.

    Each upload is handled in its own thread. A condition variable guards a
    counter of free worker slots, so that no more than ``parallel`` uploads are
    re-processed at the same time. Uploads that are still processing are
    skipped. The command returns after all threads have finished.

    Arguments:
        ctx: The click context carrying the group level query.
        uploads: Optional upload ids that further restrict the selection.
        parallel: The maximum number of uploads that are re-processed
            concurrently. Values below 1 are treated as 1, because no worker
            slot would ever become available otherwise.
    """
    _, uploads = query_uploads(ctx, uploads)
    uploads_count = uploads.count()
    uploads = list(uploads)  # copy the whole mongo query set to avoid cursor timeouts

    cv = threading.Condition()
    threads: List[threading.Thread] = []

    state = dict(
        completed_count=0,
        skipped_count=0,
        available_threads_count=max(1, parallel))

    logger = utils.get_logger(__name__)

    print('%d uploads selected, re-processing ...' % uploads_count)

    def re_process_upload(upload: proc.Upload):
        logger.info('re-processing started', upload_id=upload.upload_id)

        completed = False
        try:
            if upload.process_running:
                logger.warning(
                    'cannot trigger re-process, since the upload is already/still processing',
                    current_process=upload.current_process,
                    current_task=upload.current_task, upload_id=upload.upload_id)

            else:
                upload.reset()
                upload.re_process_upload()
                upload.block_until_complete(interval=.5)

                if upload.tasks_status == proc.FAILURE:
                    logger.info('re-processing with failure', upload_id=upload.upload_id)

                completed = True

                logger.info('re-processing complete', upload_id=upload.upload_id)

        finally:
            # Always give the worker slot back, even if the re-processing raised;
            # otherwise the scheduling loop below would wait forever.
            with cv:
                state['completed_count'] += 1 if completed else 0
                state['skipped_count'] += 1 if not completed else 0
                state['available_threads_count'] += 1

                print(
                    '   re-processed %s and skipped %s of %s uploads' %
                    (state['completed_count'], state['skipped_count'], uploads_count))

                cv.notify()

    for upload in uploads:
        with cv:
            cv.wait_for(lambda: state['available_threads_count'] > 0)
            state['available_threads_count'] -= 1
            # Pass the upload via args: a closure over the loop variable is
            # evaluated in the new thread and may already see the next upload.
            thread = threading.Thread(target=re_process_upload, args=(upload,))
            threads.append(thread)
            thread.start()

    for thread in threads:
        thread.join()
252
253


254
@uploads.command(help='Attempt to abort the processing of uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--calcs', is_flag=True, help='Only stop calculation processing.')
@click.option('--kill', is_flag=True, help='Use the kill signal and force task failure.')
@click.option('--no-celery', is_flag=True, help='Do not attempt to stop the actual celery tasks')
@click.pass_context
def stop(ctx, uploads, calcs: bool, kill: bool, no_celery: bool):
    """
    Stop the running calc and upload processes of the selected uploads.

    The running calcs are handled first, then (unless ``calcs`` is set) the
    running uploads.

    Arguments:
        ctx: The click context carrying the group level query.
        uploads: Optional upload ids that further restrict the selection.
        calcs: Only stop the calc processes, leave the upload processes alone.
        kill: Revoke the celery tasks with ``SIGKILL`` and mark the processes
            as failed and completed.
        no_celery: Do not revoke the celery tasks.
    """
    infrastructure.setup_repository_db()
    query, _ = query_uploads(ctx, uploads)

    logger = utils.get_logger(__name__)

    def stop_all(processes):
        # only a kill adds the signal to the revoke call
        revoke_kwargs = {'signal': 'SIGKILL'} if kill else {}

        for process in processes:
            log_context = {'upload_id': process.upload_id}
            if isinstance(process, proc.Calc):
                log_context['calc_id'] = process.calc_id

            if not no_celery:
                logger.info(
                    'send terminate celery task', celery_task_id=process.celery_task_id,
                    kill=kill, **log_context)

                try:
                    proc.app.control.revoke(
                        process.celery_task_id, terminate=True, **revoke_kwargs)
                except Exception as e:
                    # a failed revoke is only reported; the process might still be failed below
                    logger.warning(
                        'could not revoke celery task', exc_info=e,
                        celery_task_id=process.celery_task_id, **log_context)

            if not kill:
                continue

            logger.info(
                'fail proc', celery_task_id=process.celery_task_id, kill=kill,
                **log_context)

            process.fail('process terminate via nomad cli')
            process.process_status = proc.PROCESS_COMPLETED
            process.on_process_complete(None)
            process.save()

    is_running = Q(process_status=proc.PROCESS_RUNNING) | Q(tasks_status=proc.RUNNING)
    running_query = query & is_running

    stop_all(proc.Calc.objects(running_query))
    if not calcs:
        stop_all(proc.Upload.objects(running_query))