uploads.py 8.31 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

15
from typing import List
16
17
18
import click
from tabulate import tabulate
from mongoengine import Q
19
from pymongo import UpdateOne
20
import threading
21

22
from nomad import processing as proc, config, infrastructure, utils, search, files, coe_repo
23
from .admin import admin
24
25


26
@admin.group(help='Upload related commands')
@click.option('--user', help='Select uploads of user with given id', type=str)
@click.option('--staging', help='Select only uploads in staging', is_flag=True)
@click.option('--processing', help='Select only processing uploads', is_flag=True)
@click.option('--outdated', help='Select published uploads with older nomad version', is_flag=True)
@click.pass_context
def uploads(ctx, user: str, staging: bool, processing: bool, outdated: bool):
    """
    Command group that builds the upload selection shared by all sub-commands.

    Sets up mongo and elastic, combines the given options into one mongoengine
    query, and stores both the query (``ctx.obj.query``) and the matching
    upload query set (``ctx.obj.uploads``) on the click context object.

    Arguments:
        ctx: The click context; its ``obj`` receives ``query`` and ``uploads``.
        user: If not None, only uploads with this ``user_id`` are selected.
        staging: If set, only uploads with ``published=False`` are selected.
        processing: If set, only uploads with a running process or running tasks
            are selected.
        outdated: If set, only uploads that have at least one calc whose
            ``metadata.nomad_version`` differs from ``config.version`` are selected.
    """
    infrastructure.setup_mongo()
    infrastructure.setup_elastic()

    selection = Q()

    if user is not None:
        selection &= Q(user_id=user)

    if staging:
        selection &= Q(published=False)

    if processing:
        is_running = Q(process_status=proc.PROCESS_RUNNING) | Q(tasks_status=proc.RUNNING)
        selection &= is_running

    if outdated:
        # collect the ids of all uploads with calcs from a different nomad version
        version_mismatch = {'metadata.nomad_version': {'$ne': config.version}}
        outdated_upload_ids = proc.Calc._get_collection().distinct(
            'upload_id', version_mismatch)
        selection &= Q(upload_id__in=outdated_upload_ids)

    ctx.obj.query = selection
    ctx.obj.uploads = proc.Upload.objects(selection)


def query_uploads(ctx, uploads):
    """
    Narrows the selection stored on the click context to the given upload ids.

    Arguments:
        ctx: The click context; ``ctx.obj.query`` holds the base query.
        uploads: A sequence of upload ids. If it is empty, the base query is
            used unchanged.

    Returns: A tuple with the resulting query and the ``proc.Upload`` query set
        for that query.
    """
    selection = ctx.obj.query

    if len(uploads) == 0:
        return selection, proc.Upload.objects(selection)

    selection &= Q(upload_id__in=uploads)
    return selection, proc.Upload.objects(selection)
60
61


62
@uploads.command(help='List selected uploads')
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def ls(ctx, uploads):
    """
    Prints the number of selected uploads and a table with at most the first 10.

    Arguments:
        ctx: The click context carrying the base selection.
        uploads: Optional upload ids that narrow the selection further.
    """
    _, selected = query_uploads(ctx, uploads)

    print('%d uploads selected, showing no more than first 10' % selected.count())

    column_names = ['id', 'name', 'user', 'status', 'published']
    rows = []
    for upload in selected[:10]:
        rows.append([
            upload.upload_id, upload.name, upload.user_id, upload.process_status,
            upload.published])

    print(tabulate(rows, headers=column_names))


76
@uploads.command(help='Change the owner of the upload and all its calcs.')
@click.argument('USER', nargs=1)
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def chown(ctx, user, uploads):
    """
    Changes the owner of the selected uploads and of all their calcs.

    For each selected upload, the upload's ``user_id`` is set, the
    ``metadata.uploader`` of all its calcs is updated in mongo with one bulk
    write, the upload is saved, and the calcs are published to the search
    index again.

    Arguments:
        ctx: The click context carrying the base selection.
        user: The id of the new owner; it is converted with ``int`` to look up
            the repository user.
        uploads: Optional upload ids that narrow the selection further.
    """
    infrastructure.setup_repository_db()
    _, uploads = query_uploads(ctx, uploads)

    print('%d uploads selected, changing its owner ...' % uploads.count())

    user_id = user
    user = coe_repo.User.from_user_id(int(user_id))

    for upload in uploads:
        upload.user_id = user_id
        calcs = upload.to_upload_with_metadata().calcs

        # the new uploader is the same for all calcs of the upload
        uploader = user.to_popo()
        updates = [
            UpdateOne({'_id': calc.calc_id}, {'$set': {'metadata.uploader': uploader}})
            for calc in calcs]

        # pymongo's bulk_write raises InvalidOperation on an empty list of
        # operations, which would abort the command on uploads without calcs
        if updates:
            proc.Calc._get_collection().bulk_write(updates)
        upload.save()

        # read the calcs again after the update before publishing them to the
        # search index
        calcs = upload.to_upload_with_metadata().calcs
        search.publish(calcs)


107
@uploads.command(help='Delete selected upload')
@click.argument('UPLOADS', nargs=-1)
@click.option('--with-coe-repo', help='Also attempt to delete from repository db', is_flag=True)
@click.option('--skip-es', help='Keep the elastic index version of the data.', is_flag=True)
@click.option('--skip-mongo', help='Keep uploads and calcs in mongo.', is_flag=True)
@click.option('--skip-files', help='Keep all related files.', is_flag=True)
@click.pass_context
def rm(ctx, uploads, with_coe_repo, skip_es, skip_mongo, skip_files):
    """
    Deletes the selected uploads from the individual storage systems.

    For each upload, and in this order: the repository db entry (only with
    ``with_coe_repo``), the elastic index entries, the upload files, and the
    mongo documents of the upload and its calcs. Each of the latter three can be
    kept with the respective ``skip_*`` flag.

    Arguments:
        ctx: The click context carrying the base selection.
        uploads: Optional upload ids that narrow the selection further.
        with_coe_repo: Also delete the upload from the repository db.
        skip_es: Keep the elastic index entries.
        skip_mongo: Keep the upload and calc documents in mongo.
        skip_files: Keep the upload files.
    """
    _, uploads = query_uploads(ctx, uploads)

    logger = utils.get_logger(__name__)
    print('%d uploads selected, deleting ...' % uploads.count())

    if with_coe_repo:
        from nomad import coe_repo
        infrastructure.setup_repository_db()

    for upload in uploads:
        upload_id = upload.upload_id

        # repository db entry
        if with_coe_repo:
            coe_repo.Upload.delete(upload_id)

        # elastic index
        if not skip_es:
            search.delete_upload(upload_id=upload_id)

        # files; two passes to get the two potential versions 'public' and 'staging'
        if not skip_files:
            remaining_passes = 2
            while remaining_passes > 0:
                remaining_passes -= 1
                upload_files = files.UploadFiles.get(upload_id=upload_id)
                if upload_files is None:
                    continue

                try:
                    upload_files.delete()
                except Exception as error:
                    logger.error('could not delete files', exc_info=error)
                    break

        # mongo documents, calcs first
        if not skip_mongo:
            proc.Calc.objects(upload_id=upload_id).delete()
            upload.delete()
150
151


152
@uploads.command(help='Reprocess selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.pass_context
def re_process(ctx, uploads, parallel: int):
    """
    Re-processes the selected uploads, at most ``parallel`` of them at a time.

    Each upload is handled in its own thread, which triggers the re-processing
    and blocks until it is complete. A condition variable limits the number of
    threads that run at the same time. A progress line is printed for each
    upload that was re-processed without an exception; failures are logged.

    Arguments:
        ctx: The click context carrying the base selection.
        uploads: Optional upload ids that narrow the selection further.
        parallel: The number of uploads to re-process at the same time. Values
            below 1 are treated as 1, because the dispatch loop would otherwise
            wait forever for a free slot.
    """
    _, uploads = query_uploads(ctx, uploads)
    uploads_count = uploads.count()
    uploads = list(uploads)  # copy the whole mongo query set to avoid cursor timeouts

    cv = threading.Condition()
    threads: List[threading.Thread] = []

    state = dict(
        completed_count=0,
        available_threads_count=max(parallel, 1))

    logger = utils.get_logger(__name__)

    print('%d uploads selected, re-processing ...' % uploads_count)

    def re_process_upload(upload):
        logger.info('re-processing started', upload_id=upload.upload_id)

        succeeded = False
        try:
            upload.re_process_upload()
            upload.block_until_complete(interval=.1)
            succeeded = True
            logger.info('re-processing complete', upload_id=upload.upload_id)
        except Exception as e:
            logger.error('re-processing failed', upload_id=upload.upload_id, exc_info=e)
        finally:
            # always give the slot back, otherwise a failed upload would leave
            # the dispatch loop below waiting forever
            with cv:
                if succeeded:
                    state['completed_count'] += 1
                    print('   re-processed %s of %s uploads' % (state['completed_count'], uploads_count))
                state['available_threads_count'] += 1
                cv.notify()

    for upload in uploads:
        with cv:
            cv.wait_for(lambda: state['available_threads_count'] > 0)
            state['available_threads_count'] -= 1
            # pass the upload as an argument; a lambda would look up the loop
            # variable only when the thread runs and could see the next upload
            thread = threading.Thread(target=re_process_upload, args=(upload,))
            threads.append(thread)
            thread.start()

    for thread in threads:
        thread.join()
196
197


198
@uploads.command(help='Attempt to abort the processing of uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--calcs', is_flag=True, help='Only stop calculation processing.')
@click.option('--kill', is_flag=True, help='Use the kill signal and force task failure.')
@click.pass_context
def stop(ctx, uploads, calcs: bool, kill: bool):
    """
    Attempts to abort the processing of the selected uploads and their calcs.

    Revokes the celery task of every calc that matches the selection and,
    unless ``calcs`` is set, of every matching upload. With ``kill`` the task is
    revoked with ``SIGKILL`` and the process is additionally marked as failed
    and completed in mongo.

    Arguments:
        ctx: The click context carrying the base selection.
        uploads: Optional upload ids that narrow the selection further.
        calcs: Only stop the processing of calcs, not of the uploads.
        kill: Use the kill signal and force the failure of the process.
    """
    query, _ = query_uploads(ctx, uploads)

    logger = utils.get_logger(__name__)

    def stop_all(query):
        # The loop variable must not be called ``proc``: that would shadow the
        # imported ``nomad.processing`` module, which is needed below for
        # ``proc.Calc``, ``proc.app``, and ``proc.PROCESS_COMPLETED``.
        for process in query:
            logger_kwargs = dict(upload_id=process.upload_id)
            if isinstance(process, proc.Calc):
                logger_kwargs.update(calc_id=process.calc_id)

            logger.info(
                'send terminate celery task', celery_task_id=process.celery_task_id,
                kill=kill, **logger_kwargs)

            kwargs = {}
            if kill:
                kwargs.update(signal='SIGKILL')
            try:
                proc.app.control.revoke(process.celery_task_id, terminate=True, **kwargs)
            except Exception as e:
                # best effort: continue with the remaining processes
                logger.warning(
                    'could not revoke celery task', exc_info=e,
                    celery_task_id=process.celery_task_id, **logger_kwargs)
            if kill:
                logger.info(
                    'fail proc', celery_task_id=process.celery_task_id, kill=kill,
                    **logger_kwargs)

                process.fail('process terminate via nomad cli')
                process.process_status = proc.PROCESS_COMPLETED
                process.on_process_complete(None)
                process.save()

    stop_all(proc.Calc.objects(query))
    if not calcs:
        stop_all(proc.Upload.objects(query))