uploads.py 9.67 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

15
from typing import List
16
17
18
import click
from tabulate import tabulate
from mongoengine import Q
19
from pymongo import UpdateOne
20
import threading
21

22
from nomad import processing as proc, config, infrastructure, utils, search, files, coe_repo
23
from .admin import admin
24
25


26
@admin.group(help='Upload related commands')
@click.option('--user', help='Select uploads of user with given id', type=str)
@click.option('--staging', help='Select only uploads in staging', is_flag=True)
@click.option('--processing', help='Select only processing uploads', is_flag=True)
@click.option('--outdated', help='Select published uploads with older nomad version', is_flag=True)
@click.pass_context
def uploads(ctx, user: str, staging: bool, processing: bool, outdated: bool):
    """Build the upload selection query from the CLI flags and store it on the context."""
    infrastructure.setup_mongo()
    infrastructure.setup_elastic()

    # collect one Q condition per selected flag; they are AND-combined below
    conditions = []
    if user is not None:
        conditions.append(Q(user_id=user))
    if staging:
        conditions.append(Q(published=False))
    if processing:
        # "processing" means either the overall process or a task is still running
        conditions.append(
            Q(process_status=proc.PROCESS_RUNNING) | Q(tasks_status=proc.RUNNING))

    if outdated:
        # uploads that contain at least one calc processed by a different nomad version
        outdated_upload_ids = proc.Calc._get_collection().distinct(
            'upload_id',
            {'metadata.nomad_version': {'$ne': config.version}})
        conditions.append(Q(upload_id__in=outdated_upload_ids))

    query = Q()
    for condition in conditions:
        query &= condition

    # subcommands read the query/queryset from the click context object
    ctx.obj.query = query
    ctx.obj.uploads = proc.Upload.objects(query)


def query_uploads(ctx, uploads):
    """Narrow the context's upload query to the given upload ids, if any.

    Returns a tuple of the (possibly narrowed) query and the matching
    ``proc.Upload`` queryset.
    """
    query = ctx.obj.query
    if uploads:
        query &= Q(upload_id__in=uploads)

    return query, proc.Upload.objects(query)
60
61


62
@uploads.command(help='List selected uploads')
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def ls(ctx, uploads):
    """Print a table of (at most) the first 10 selected uploads."""
    _, uploads = query_uploads(ctx, uploads)

    print('%d uploads selected, showing no more than first 10' % uploads.count())
    rows = [
        [upload.upload_id, upload.name, upload.user_id, upload.process_status, upload.published]
        for upload in uploads[:10]]
    print(tabulate(rows, headers=['id', 'name', 'user', 'status', 'published']))


76
@uploads.command(help='Change the owner of the upload and all its calcs.')
@click.argument('USER', nargs=1)
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def chown(ctx, user, uploads):
    """Reassign the selected uploads (and their calcs) to the given coe-repo user."""
    infrastructure.setup_repository_db()
    _, uploads = query_uploads(ctx, uploads)

    print('%d uploads selected, changing its owner ...' % uploads.count())

    user_id = user
    user = coe_repo.User.from_user_id(int(user_id))

    for upload in uploads:
        upload.user_id = user_id
        calcs = upload.to_upload_with_metadata().calcs

        # rewrite the uploader on every calc's mongo metadata in a single batch
        updates = [
            UpdateOne(
                {'_id': calc.calc_id},
                {'$set': {'metadata.uploader': user.to_popo()}})
            for calc in calcs]
        proc.Calc._get_collection().bulk_write(updates)
        upload.save()

        # re-read the (now updated) metadata and push it to the search index
        upload_with_metadata = upload.to_upload_with_metadata()
        search.publish(upload_with_metadata.calcs)


107
108
109
110
@uploads.command(help='(Re-)index all calcs of the given uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def index(ctx, uploads):
    """Re-index the calcs of all selected uploads in elastic, reporting progress."""
    infrastructure.setup_repository_db()
    _, uploads = query_uploads(ctx, uploads)
    uploads_count = uploads.count()

    print('%d uploads selected, indexing ...' % uploads_count)

    failed = 0
    for done, upload in enumerate(uploads, start=1):
        # index_all returns the number of entries that could not be indexed
        failed += search.index_all(upload.to_upload_with_metadata().calcs)

        print('   indexed %d of %d uploads, failed to index %d entries' % (done, uploads_count, failed))


127
@uploads.command(help='Delete selected upload')
@click.argument('UPLOADS', nargs=-1)
@click.option('--with-coe-repo', help='Also attempt to delete from repository db', is_flag=True)
@click.option('--skip-es', help='Keep the elastic index version of the data.', is_flag=True)
@click.option('--skip-mongo', help='Keep uploads and calcs in mongo.', is_flag=True)
@click.option('--skip-files', help='Keep all related files.', is_flag=True)
@click.pass_context
def rm(ctx, uploads, with_coe_repo, skip_es, skip_mongo, skip_files):
    """Delete the selected uploads from the chosen storage backends.

    Deletion order per upload: repository db, elastic, files, mongo; each
    stage can be skipped via the corresponding flag.
    """
    _, uploads = query_uploads(ctx, uploads)

    logger = utils.get_logger(__name__)
    print('%d uploads selected, deleting ...' % uploads.count())

    if with_coe_repo:
        from nomad import coe_repo
        infrastructure.setup_repository_db()

    for upload in uploads:
        upload_id = upload.upload_id

        # repository db entry
        if with_coe_repo:
            coe_repo.Upload.delete(upload_id)

        # elastic index
        if not skip_es:
            search.delete_upload(upload_id=upload_id)

        # files on disk
        if not skip_files:
            # run twice to catch both potential versions: 'public' and 'staging'
            for _ in range(0, 2):
                upload_files = files.UploadFiles.get(upload_id=upload_id)

                try:
                    if upload_files is not None:
                        upload_files.delete()
                except Exception as e:
                    # best effort: log and move on to the next stage
                    logger.error('could not delete files', exc_info=e)
                    break

        # mongo documents (calcs first, then the upload itself)
        if not skip_mongo:
            proc.Calc.objects(upload_id=upload_id).delete()
            upload.delete()
170
171


172
@uploads.command(help='Reprocess selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.pass_context
def re_process(ctx, uploads, parallel: int):
    """Trigger re-processing of the selected uploads, at most ``parallel`` at a time.

    Each upload is re-processed in its own thread; a condition variable
    throttles how many threads run concurrently and serializes the shared
    progress counters.
    """
    _, uploads = query_uploads(ctx, uploads)
    uploads_count = uploads.count()
    uploads = list(uploads)  # copy the whole mongo query set to avoid cursor timeouts

    cv = threading.Condition()
    threads: List[threading.Thread] = []

    # shared worker state, only ever touched while holding ``cv``
    state = dict(
        completed_count=0,
        skipped_count=0,
        available_threads_count=parallel)

    logger = utils.get_logger(__name__)

    print('%d uploads selected, re-processing ...' % uploads_count)

    def re_process_upload(upload: proc.Upload):
        logger.info('re-processing started', upload_id=upload.upload_id)

        completed = False
        if upload.process_running:
            # skip uploads that are still busy instead of interfering with them
            logger.warn(
                'cannot trigger re-process, since the upload is already/still processing',
                current_process=upload.current_process,
                current_task=upload.current_task, upload_id=upload.upload_id)

        else:
            upload.reset()
            upload.re_process_upload()
            upload.block_until_complete(interval=.5)

            if upload.tasks_status == proc.FAILURE:
                logger.info('re-processing with failure', upload_id=upload.upload_id)

            completed = True

            logger.info('re-processing complete', upload_id=upload.upload_id)

        with cv:
            state['completed_count'] += 1 if completed else 0
            state['skipped_count'] += 1 if not completed else 0
            state['available_threads_count'] += 1

            print(
                '   re-processed %s and skipped %s of %s uploads' %
                (state['completed_count'], state['skipped_count'], uploads_count))

            cv.notify()

    for upload in uploads:
        with cv:
            cv.wait_for(lambda: state['available_threads_count'] > 0)
            state['available_threads_count'] -= 1
            # BUG FIX: pass the upload via ``args`` instead of a no-arg lambda.
            # A ``lambda: re_process_upload(upload)`` captures the loop variable
            # by reference (late binding), so an already-started thread could see
            # the *next* iteration's upload and process the wrong one.
            thread = threading.Thread(target=re_process_upload, args=(upload,))
            threads.append(thread)
            thread.start()

    for thread in threads:
        thread.join()
236
237


238
@uploads.command(help='Attempt to abort the processing of uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--calcs', is_flag=True, help='Only stop calculation processing.')
@click.option('--kill', is_flag=True, help='Use the kill signal and force task failure.')
@click.pass_context
def stop(ctx, uploads, calcs: bool, kill: bool):
    """Revoke the celery tasks of the selected calcs (and uploads, unless --calcs).

    With ``--kill`` the tasks receive SIGKILL and the mongo documents are
    additionally marked as failed/completed so they do not stay stuck.
    """
    query, _ = query_uploads(ctx, uploads)

    logger = utils.get_logger(__name__)

    def stop_all(query):
        # BUG FIX: the loop variable was previously named ``proc``, shadowing the
        # ``nomad.processing`` module alias. ``isinstance(proc, proc.Calc)`` and
        # ``proc.PROCESS_COMPLETED`` then looked up attributes on the document
        # instance instead of the module and raised AttributeError.
        for process in query:
            logger_kwargs = dict(upload_id=process.upload_id)
            if isinstance(process, proc.Calc):
                logger_kwargs.update(calc_id=process.calc_id)

            logger.info(
                'send terminate celery task', celery_task_id=process.celery_task_id,
                kill=kill, **logger_kwargs)

            kwargs = {}
            if kill:
                kwargs.update(signal='SIGKILL')
            try:
                process.app.control.revoke(process.celery_task_id, terminate=True, **kwargs)
            except Exception as e:
                # revoking is best effort; a missing/finished task is not fatal
                logger.warning(
                    'could not revoke celery task', exc_info=e,
                    celery_task_id=process.celery_task_id, **logger_kwargs)
            if kill:
                logger.info(
                    'fail proc', celery_task_id=process.celery_task_id, kill=kill,
                    **logger_kwargs)

                # force the document out of its running state
                process.fail('process terminate via nomad cli')
                process.process_status = proc.PROCESS_COMPLETED
                process.on_process_complete(None)
                process.save()

    stop_all(proc.Calc.objects(query))
    if not calcs:
        stop_all(proc.Upload.objects(query))