uploads.py 10.4 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

15
from typing import List
16
17
18
import click
from tabulate import tabulate
from mongoengine import Q
19
from pymongo import UpdateOne
20
import threading
21
import elasticsearch_dsl as es
22

23
from nomad import processing as proc, config, infrastructure, utils, search, files, coe_repo
24
from .admin import admin
25
26


27
@admin.group(help='Upload related commands')
@click.option('--user', help='Select uploads of user with given id', type=str)
@click.option('--staging', help='Select only uploads in staging', is_flag=True)
@click.option('--processing', help='Select only processing uploads', is_flag=True)
@click.option('--outdated', help='Select published uploads with older nomad version', is_flag=True)
@click.option('--code', multiple=True, type=str, help='Select only uploads with calcs of given codes')
@click.pass_context
def uploads(ctx, user: str, staging: bool, processing: bool, outdated: bool, code: List[str]):
    """Build a mongo query from the given filter options and store both the query
    and the matching uploads on the click context for the sub commands."""
    infrastructure.setup_mongo()
    infrastructure.setup_elastic()

    query = Q()
    if user is not None:
        query &= Q(user_id=user)
    if staging:
        query &= Q(published=False)
    if processing:
        query &= Q(process_status=proc.PROCESS_RUNNING) | Q(tasks_status=proc.RUNNING)

    if outdated:
        # uploads that contain at least one calc processed with a nomad
        # version different from the currently running one
        outdated_upload_ids = proc.Calc._get_collection().distinct(
            'upload_id',
            {'metadata.nomad_version': {'$ne': config.version}})
        query &= Q(upload_id__in=outdated_upload_ids)

    if code is not None and len(code) > 0:
        # resolve the upload ids of all calcs with one of the given codes via
        # an elastic terms aggregation over upload_id
        matches = es.Q(
            'bool',
            should=[es.Q('match', code_name=code_name) for code_name in code],
            minimum_should_match=1)

        request = es.Search(index=config.elastic.index_name).query(matches)
        request.aggs.bucket('uploads', es.A(
            'terms', field='upload_id', size=10000, min_doc_count=1))
        buckets = request.execute().aggs['uploads']['buckets']

        query &= Q(upload_id__in=[bucket['key'] for bucket in buckets])

    ctx.obj.query = query
    ctx.obj.uploads = proc.Upload.objects(query)


def query_uploads(ctx, uploads):
    """Narrow the query prepared by the group command to the explicitly given
    upload ids (if any) and return the query plus the matching uploads."""
    query = ctx.obj.query
    if len(uploads) > 0:
        query &= Q(upload_id__in=uploads)

    return query, proc.Upload.objects(query)
76
77


78
@uploads.command(help='List selected uploads')
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def ls(ctx, uploads):
    """Print a table with id, name, user, status and published flag for (at most)
    the first 10 selected uploads."""
    _, uploads = query_uploads(ctx, uploads)

    print('%d uploads selected, showing no more than first 10' % uploads.count())
    rows = [
        [upload.upload_id, upload.name, upload.user_id, upload.process_status, upload.published]
        for upload in uploads[:10]]
    print(tabulate(rows, headers=['id', 'name', 'user', 'status', 'published']))


92
@uploads.command(help='Change the owner of the upload and all its calcs.')
@click.argument('USER', nargs=1)
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def chown(ctx, user, uploads):
    """Set the given coe repository user as owner on the selected uploads, their
    calc mongo documents, and re-publish the updated metadata to elastic."""
    infrastructure.setup_repository_db()
    _, uploads = query_uploads(ctx, uploads)

    print('%d uploads selected, changing its owner ...' % uploads.count())

    user_id = user
    user = coe_repo.User.from_user_id(int(user_id))

    for upload in uploads:
        upload.user_id = user_id
        calcs = upload.to_upload_with_metadata().calcs

        # set the new uploader on every calc document in a single bulk write
        updates = [
            UpdateOne(
                {'_id': calc.calc_id},
                {'$set': {'metadata.uploader': user.to_popo()}})
            for calc in calcs]
        proc.Calc._get_collection().bulk_write(updates)
        upload.save()

        # re-read the metadata after the save so elastic gets the new owner
        search.publish(upload.to_upload_with_metadata().calcs)


123
124
125
126
@uploads.command(help='(Re-)index all calcs of the given uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def index(ctx, uploads):
    """Re-index the calcs of all selected uploads in elastic, printing progress
    and the running count of entries that failed to index."""
    infrastructure.setup_repository_db()
    _, uploads = query_uploads(ctx, uploads)
    uploads_count = uploads.count()

    print('%d uploads selected, indexing ...' % uploads_count)

    failed = 0
    for i, upload in enumerate(uploads, start=1):
        failed += search.index_all(upload.to_upload_with_metadata().calcs)
        print('   indexed %d of %d uploads, failed to index %d entries' % (i, uploads_count, failed))


143
@uploads.command(help='Delete selected upload')
@click.argument('UPLOADS', nargs=-1)
@click.option('--with-coe-repo', help='Also attempt to delete from repository db', is_flag=True)
@click.option('--skip-es', help='Keep the elastic index version of the data.', is_flag=True)
@click.option('--skip-mongo', help='Keep uploads and calcs in mongo.', is_flag=True)
@click.option('--skip-files', help='Keep all related files.', is_flag=True)
@click.pass_context
def rm(ctx, uploads, with_coe_repo, skip_es, skip_mongo, skip_files):
    """
    Delete the selected uploads from all data stores (coe repository db, elastic,
    upload files, mongo), unless the respective ``--skip-*`` flag is given.
    File deletion is best effort: errors are logged but do not abort the command.
    """
    _, uploads = query_uploads(ctx, uploads)

    logger = utils.get_logger(__name__)
    print('%d uploads selected, deleting ...' % uploads.count())

    if with_coe_repo:
        # coe_repo is already imported at module level; only the db connection
        # needs to be established here (the redundant local import was removed)
        infrastructure.setup_repository_db()

    for upload in uploads:
        # delete repository db entry
        if with_coe_repo:
            coe_repo.Upload.delete(upload.upload_id)

        # delete elastic
        if not skip_es:
            search.delete_upload(upload_id=upload.upload_id)

        # delete files
        if not skip_files:
            # do it twice to get the two potential versions 'public' and 'staging'
            for _ in range(0, 2):
                upload_files = files.UploadFiles.get(upload_id=upload.upload_id)

                try:
                    if upload_files is not None:
                        upload_files.delete()
                except Exception as e:
                    # best effort: keep deleting the other stores even if the
                    # files could not be removed
                    logger.error('could not delete files', exc_info=e)
                    break

        # delete mongo
        if not skip_mongo:
            proc.Calc.objects(upload_id=upload.upload_id).delete()
            upload.delete()
186
187


188
@uploads.command(help='Reprocess selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.pass_context
def re_process(ctx, uploads, parallel: int):
    """
    Trigger a re-process of all selected uploads, running up to ``parallel``
    re-processes concurrently. Uploads that are already/still processing are
    skipped. Blocks until all re-processes are complete.
    """
    _, uploads = query_uploads(ctx, uploads)
    uploads_count = uploads.count()
    uploads = list(uploads)  # copy the whole mongo query set to avoid cursor timeouts

    cv = threading.Condition()
    threads: List[threading.Thread] = []

    # progress/bookkeeping state, shared by all worker threads, protected by cv
    state = dict(
        completed_count=0,
        skipped_count=0,
        available_threads_count=parallel)

    logger = utils.get_logger(__name__)

    print('%d uploads selected, re-processing ...' % uploads_count)

    def re_process_upload(upload: proc.Upload):
        # worker body: re-process one upload and update the shared counters
        logger.info('re-processing started', upload_id=upload.upload_id)

        completed = False
        if upload.process_running:
            logger.warn(
                'cannot trigger re-process, since the upload is already/still processing',
                current_process=upload.current_process,
                current_task=upload.current_task, upload_id=upload.upload_id)

        else:
            upload.reset()
            upload.re_process_upload()
            upload.block_until_complete(interval=.5)

            if upload.tasks_status == proc.FAILURE:
                logger.info('re-processing with failure', upload_id=upload.upload_id)

            completed = True

            logger.info('re-processing complete', upload_id=upload.upload_id)

        with cv:
            state['completed_count'] += 1 if completed else 0
            state['skipped_count'] += 1 if not completed else 0
            state['available_threads_count'] += 1

            print(
                '   re-processed %s and skipped %s of %s uploads' %
                (state['completed_count'], state['skipped_count'], uploads_count))

            cv.notify()

    for upload in uploads:
        with cv:
            cv.wait_for(lambda: state['available_threads_count'] > 0)
            state['available_threads_count'] -= 1
            # BUGFIX: bind the current upload via args= instead of a lambda.
            # A ``lambda: re_process_upload(upload)`` late-binds the loop
            # variable, so a thread scheduled after the loop advanced would
            # process a later upload (skipping some, re-processing others twice).
            thread = threading.Thread(target=re_process_upload, args=(upload,))
            threads.append(thread)
            thread.start()

    for thread in threads:
        thread.join()
252
253


254
@uploads.command(help='Attempt to abort the processing of uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--calcs', is_flag=True, help='Only stop calculation processing.')
@click.option('--kill', is_flag=True, help='Use the kill signal and force task failure.')
@click.pass_context
def stop(ctx, uploads, calcs: bool, kill: bool):
    """
    Revoke the celery tasks of the selected processing calcs (and, unless
    ``--calcs`` is given, uploads). With ``--kill`` the tasks are terminated
    with SIGKILL and the procs are forced into a failed, completed state.
    """
    query, _ = query_uploads(ctx, uploads)

    logger = utils.get_logger(__name__)

    def stop_all(procs):
        # BUGFIX: the loop variable was previously named ``proc``, shadowing the
        # ``nomad.processing`` module; ``proc.Calc`` and ``proc.PROCESS_COMPLETED``
        # below then resolved against the document instance and raised
        # AttributeError. It is now named ``process``.
        for process in procs:
            logger_kwargs = dict(upload_id=process.upload_id)
            if isinstance(process, proc.Calc):
                logger_kwargs.update(calc_id=process.calc_id)

            logger.info(
                'send terminate celery task', celery_task_id=process.celery_task_id,
                kill=kill, **logger_kwargs)

            kwargs = {}
            if kill:
                kwargs.update(signal='SIGKILL')
            try:
                process.app.control.revoke(process.celery_task_id, terminate=True, **kwargs)
            except Exception as e:
                # revoking is best effort; a missing/finished task is not fatal
                logger.warning(
                    'could not revoke celery task', exc_info=e,
                    celery_task_id=process.celery_task_id, **logger_kwargs)
            if kill:
                logger.info(
                    'fail proc', celery_task_id=process.celery_task_id, kill=kill,
                    **logger_kwargs)

                process.fail('process terminate via nomad cli')
                process.process_status = proc.PROCESS_COMPLETED
                process.on_process_complete(None)
                process.save()

    stop_all(proc.Calc.objects(query))
    if not calcs:
        stop_all(proc.Upload.objects(query))