upload.py 7.61 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import click
from tabulate import tabulate
from mongoengine import Q
18
from pymongo import UpdateOne
19

20
from nomad import processing as proc, config, infrastructure, utils, search, files, coe_repo
21
22
23
24
25
26
27
from .__main__ import cli


@cli.group(help='Upload related commands')
@click.option('--user', help='Select uploads of user with given id', type=str)
@click.option('--staging', help='Select only uploads in staging', is_flag=True)
@click.option('--processing', help='Select only processing uploads', is_flag=True)
@click.option('--outdated', help='Select published uploads with older nomad version', is_flag=True)
@click.pass_context
def upload(ctx, user: str, staging: bool, processing: bool, outdated: bool):
    """Build the upload selection shared by all ``upload`` sub-commands.

    Sets up mongo and elastic, combines the given options into a single
    mongoengine query, and stores both the query (``ctx.obj.query``) and the
    matching upload queryset (``ctx.obj.uploads``) on the click context.

    Args:
        ctx: The click context; ``ctx.obj`` receives ``query`` and ``uploads``.
        user: If not ``None``, restrict to uploads with this ``user_id``.
        staging: Restrict to uploads with ``published=False``.
        processing: Restrict to uploads whose process or tasks are running.
        outdated: Restrict to uploads that have at least one calc whose
            ``metadata.nomad_version`` differs from ``config.version``.
    """
    infrastructure.setup_mongo()
    infrastructure.setup_elastic()

    query = Q()
    if user is not None:
        query &= Q(user_id=user)
    if staging:
        query &= Q(published=False)
    if processing:
        # The right-hand side is evaluated first: query AND (running OR tasks running).
        query &= Q(process_status=proc.PROCESS_RUNNING) | Q(tasks_status=proc.RUNNING)

    if outdated:
        # A plain dict is not a valid positional query object for mongoengine;
        # use a raw mongo filter to address the nested metadata key instead.
        outdated_upload_ids = proc.Calc.objects(
            __raw__={'metadata.nomad_version': {'$ne': config.version}}
        ).distinct(field='upload_id')
        # Uploads are identified by ``upload_id`` (same field query_uploads filters on).
        query &= Q(upload_id__in=outdated_upload_ids)

    ctx.obj.query = query
    ctx.obj.uploads = proc.Upload.objects(query)


def query_uploads(ctx, uploads):
    """Narrow the group-level selection to explicitly named uploads.

    Args:
        ctx: The click context carrying the base query in ``ctx.obj.query``.
        uploads: Sequence of upload ids given on the command line; when it is
            empty the base query is used unchanged.

    Returns:
        A ``(query, queryset)`` pair: the effective query and the
        ``proc.Upload`` objects it selects.
    """
    selection = ctx.obj.query
    if len(uploads) != 0:
        selection = selection & Q(upload_id__in=uploads)

    matching_uploads = proc.Upload.objects(selection)
    return selection, matching_uploads
57
58
59


@upload.command(help='List selected uploads')
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def ls(ctx, uploads):
    """Print the number of selected uploads and a table of at most the first 10.

    Args:
        ctx: The click context carrying the group-level selection.
        uploads: Optional upload ids that further restrict the selection.
    """
    _, selected = query_uploads(ctx, uploads)

    print('%d uploads selected, showing no more than first 10' % selected.count())

    column_names = ['id', 'name', 'user', 'status', 'published']
    rows = []
    for entry in selected[:10]:
        rows.append([
            entry.upload_id, entry.name, entry.user_id,
            entry.process_status, entry.published])

    print(tabulate(rows, headers=column_names))


73
74
75
76
77
78
79
80
81
82
@upload.command(help='Change the owner of the upload and all its calcs.')
@click.argument('USER', nargs=1)
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def chown(ctx, user, uploads):
    """Assign the selected uploads and their calcs to another user.

    For every selected upload this sets ``user_id`` on the upload, rewrites
    ``metadata.uploader`` on all of its calcs in mongo, saves the upload, and
    re-publishes the calcs to the search index.

    Args:
        ctx: The click context carrying the group-level selection.
        user: Id of the new owner as given on the command line; it is
            converted with ``int`` to look up the repository user.
        uploads: Optional upload ids that further restrict the selection.
    """
    infrastructure.setup_repository_db()
    _, uploads = query_uploads(ctx, uploads)

    print('%d uploads selected, changing its owner ...' % uploads.count())

    user_id = user
    user = coe_repo.User.from_user_id(int(user_id))

    for upload in uploads:
        upload.user_id = user_id
        calcs = upload.to_upload_with_metadata().calcs

        # The uploader representation is the same for every calc of the upload.
        uploader = user.to_popo()
        updates = [
            UpdateOne({'_id': calc.calc_id}, {'$set': {'metadata.uploader': uploader}})
            for calc in calcs]

        # pymongo refuses to execute an empty bulk write, so skip uploads without calcs.
        if updates:
            proc.Calc._get_collection().bulk_write(updates)
        upload.save()

        # Reload the calcs so that the search index receives the updated metadata.
        search.publish(upload.to_upload_with_metadata().calcs)


104
@upload.command(help='Delete selected upload')
@click.argument('UPLOADS', nargs=-1)
@click.option('--with-coe-repo', help='Also attempt to delete from repository db', is_flag=True)
@click.option('--skip-es', help='Keep the elastic index version of the data.', is_flag=True)
@click.option('--skip-mongo', help='Keep uploads and calcs in mongo.', is_flag=True)
@click.option('--skip-files', help='Keep all related files.', is_flag=True)
@click.pass_context
def rm(ctx, uploads, with_coe_repo, skip_es, skip_mongo, skip_files):
    """Delete the selected uploads from the configured backends.

    Args:
        ctx: The click context carrying the group-level selection.
        uploads: Optional upload ids that further restrict the selection.
        with_coe_repo: Also delete the upload from the repository db.
        skip_es: Do not delete the upload from the elastic index.
        skip_mongo: Do not delete the upload and its calcs from mongo.
        skip_files: Do not delete the upload's files.
    """
    _, uploads = query_uploads(ctx, uploads)

    logger = utils.get_logger(__name__)
    print('%d uploads selected, deleting ...' % uploads.count())

    if with_coe_repo:
        # coe_repo is already imported at module level; only the db needs setup.
        infrastructure.setup_repository_db()

    for upload in uploads:
        upload_id = upload.upload_id

        # delete repository db entry
        if with_coe_repo:
            coe_repo.Upload.delete(upload_id)

        # delete elastic
        if not skip_es:
            search.delete_upload(upload_id=upload_id)

        # delete files
        if not skip_files:
            # do it twice to get the two potential versions 'public' and 'staging'
            for _ in range(2):
                upload_files = files.UploadFiles.get(upload_id=upload_id)
                if upload_files is None:
                    # nothing (left) to delete, a second lookup would find nothing either
                    break

                try:
                    upload_files.delete()
                except Exception as e:
                    # best effort: report which upload failed and move on
                    logger.error('could not delete files', exc_info=e, upload_id=upload_id)
                    break

        # delete mongo
        if not skip_mongo:
            proc.Calc.objects(upload_id=upload_id).delete()
            upload.delete()
147
148


149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
@upload.command(help='Reprocess selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def re_process(ctx, uploads):
    """Re-process the selected uploads one after another.

    Each upload is re-processed and waited for before the next one starts;
    progress is printed after every upload.

    Args:
        ctx: The click context carrying the group-level selection.
        uploads: Optional upload ids that further restrict the selection.
    """
    _, uploads = query_uploads(ctx, uploads)

    logger = utils.get_logger(__name__)
    print('%d uploads selected, re-processing ...' % uploads.count())

    def re_process_upload(upload_id: str):
        logger.info('re-processing started', upload_id=upload_id)
        upload = proc.Upload.objects(upload_id=upload_id).first()
        if upload is None:
            logger.error('upload for re-processing does not exist', upload_id=upload_id)
            # without an upload there is nothing to process
            return

        upload.re_process_upload()
        upload.block_until_complete(interval=.1)

        logger.info('re-processing complete', upload_id=upload_id)

    # The selection yields Upload documents, not ids; collect the ids up front
    # so the helper receives what it expects and the total is fixed.
    upload_ids = [upload.upload_id for upload in uploads]
    total = len(upload_ids)
    for count, upload_id in enumerate(upload_ids, start=1):
        re_process_upload(upload_id)
        print('   re-processed %s of %s uploads' % (count, total))


176
@upload.command(help='Attempt to abort the processing of uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--calcs', is_flag=True, help='Only stop calculation processing.')
@click.option('--kill', is_flag=True, help='Use the kill signal and force task failure.')
@click.pass_context
def stop(ctx, uploads, calcs: bool, kill: bool):
    """Revoke the celery tasks of the selected calcs and, optionally, uploads.

    Args:
        ctx: The click context carrying the group-level selection.
        uploads: Optional upload ids that further restrict the selection.
        calcs: Only stop calc processing; leave upload processing untouched.
        kill: Revoke with ``SIGKILL`` and additionally mark each process as
            failed and completed.
    """
    query, _ = query_uploads(ctx, uploads)

    logger = utils.get_logger(__name__)

    def stop_all(query):
        # The loop variable must not be called ``proc``: that would shadow the
        # ``proc`` module needed for ``proc.Calc``, ``proc.app`` and
        # ``proc.PROCESS_COMPLETED`` below.
        for process in query:
            logger_kwargs = dict(upload_id=process.upload_id)
            if isinstance(process, proc.Calc):
                logger_kwargs.update(calc_id=process.calc_id)

            logger.info(
                'send terminate celery task', celery_task_id=process.celery_task_id,
                kill=kill, **logger_kwargs)

            kwargs = {}
            if kill:
                kwargs.update(signal='SIGKILL')
            try:
                proc.app.control.revoke(process.celery_task_id, terminate=True, **kwargs)
            except Exception as e:
                # best effort: a failed revoke must not stop the remaining ones
                logger.warning(
                    'could not revoke celery task', exc_info=e,
                    celery_task_id=process.celery_task_id, **logger_kwargs)
            if kill:
                logger.info(
                    'fail proc', celery_task_id=process.celery_task_id, kill=kill,
                    **logger_kwargs)

                process.fail('process terminate via nomad cli')
                process.process_status = proc.PROCESS_COMPLETED
                process.on_process_complete(None)
                process.save()

    stop_all(proc.Calc.objects(query))
    if not calcs:
        stop_all(proc.Upload.objects(query))