upload.py 21.7 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

15
"""
16
17
The upload API of the nomad@FAIRDI APIs. Provides endpoints to upload files and
get the processing status of uploads.
18
"""
19

20
from flask import g, request, Response
21
22
from flask_restplus import Resource, fields, abort
from datetime import datetime
23
24
from werkzeug.datastructures import FileStorage
import os.path
25
import os
26
import io
27
from functools import wraps
28

29
from nomad import config, utils, files, search, datamodel
30
from nomad.processing import Upload, FAILURE
31
from nomad.processing import ProcessAlreadyRunning
32
33
from nomad.app import common
from nomad.app.common import RFC3339DateTime
34

Markus Scheidgen's avatar
Markus Scheidgen committed
35
from .api import api
36
from .auth import authenticate, generate_upload_token
37
from .common import pagination_request_parser, pagination_model, upload_route, metadata_model
38

39

40
# The RESTPlus namespace under which all upload related endpoints of this module
# are registered.
ns = api.namespace(
    'uploads',
    description='Uploading data and tracing uploaded data and its processing.')


45
46
47
48
49
50
class CalcMetadata(fields.Raw):
    """ A raw field that renders calculation metadata through its search entry. """

    def format(self, value):
        """
        Uses the mapping ``value`` as keyword arguments for a ``CalcWithMetadata``,
        turns that into a search ``Entry``, and returns the entry as a dict.
        """
        entry = search.Entry.from_calc_with_metadata(
            datamodel.CalcWithMetadata(**value))
        return entry.to_dict()


51
52
53
# Common fields shared by the upload and the calculation processing models below.
proc_model = api.model('Processing', {
    'tasks': fields.List(fields.String),
    'current_task': fields.String,
    'tasks_running': fields.Boolean,
    'tasks_status': fields.String,
    'errors': fields.List(fields.String),
    'warnings': fields.List(fields.String),
    'create_time': RFC3339DateTime,
    'complete_time': RFC3339DateTime,
    'current_process': fields.String,
    'process_running': fields.Boolean,
})

# Per calculation metadata; extends the general metadata model.
calc_metadata_model = api.inherit('CalcMetaData', metadata_model, {
    'mainfile': fields.String(description='The calculation main output file is used to identify the calculation in the upload.'),
    '_pid': fields.Integer(description='Assign a specific pid. It must be unique.'),
    'external_id': fields.String(description='External user provided id. Does not have to be unique necessarily.')
})

# Upload wide metadata; can carry a list of per calculation overrides.
upload_metadata_model = api.inherit('UploadMetaData', metadata_model, {
    'embargo_length': fields.Integer(description='Length of the requested embargo in months.'),
    'calculations': fields.List(fields.Nested(model=calc_metadata_model, skip_none=True), description='Specific per calculation data that will override the upload data.')
})

# The upload as it is returned by most endpoints of this module.
upload_model = api.inherit('UploadProcessing', proc_model, {
    'name': fields.String(
        description='The name of the upload. This can be provided during upload '
                    'using the name query parameter.'),
    'upload_id': fields.String(
        description='The unique id for the upload.'),
    # TODO just removed during migration, where this get particularily large
    # 'metadata': fields.Nested(model=upload_metadata_model, description='Additional upload and calculation meta data.', skip_none=True),
    'upload_path': fields.String(description='The uploaded file on the server'),
    'published': fields.Boolean(description='If this upload is already published'),
    'upload_time': RFC3339DateTime(),
})

# A page of uploads together with the pagination information.
upload_list_model = api.model('UploadList', {
    'pagination': fields.Nested(model=pagination_model),
    'results': fields.List(fields.Nested(model=upload_model, skip_none=True))
})

# The processing state of a single calculation of an upload.
calc_model = api.inherit('UploadCalculationProcessing', proc_model, {
    'calc_id': fields.String,
    'mainfile': fields.String,
    'upload_id': fields.String,
    'parser': fields.String,
    'metadata': CalcMetadata(description='The repository metadata for this entry.')
})

# An upload together with calculation counts and one page of its calculations.
upload_with_calcs_model = api.inherit('UploadWithPaginatedCalculations', upload_model, {
    'processed_calcs': fields.Integer,
    'total_calcs': fields.Integer,
    'failed_calcs': fields.Integer,
    'pending_calcs': fields.Integer,
    'calcs': fields.Nested(model=api.model('UploadPaginatedCalculations', {
        'pagination': fields.Nested(model=api.inherit('UploadCalculationPagination', pagination_model, {
            'successes': fields.Integer,
            'failures': fields.Integer,
        })),
        'results': fields.List(fields.Nested(model=calc_model, skip_none=True))
    }), skip_none=True)
})

# The request body of the upload operation endpoint (POST on a single upload).
upload_operation_model = api.model('UploadOperation', {
    # The POST handler of the upload resource accepts both publish and re-process.
    'operation': fields.String(description='The operation to execute: publish or re-process.'),
    'metadata': fields.Nested(model=upload_metadata_model, description='Additional upload and calculation meta data. Will replace previously given metadata.')
})


# Query and file arguments that are documented for the upload (PUT) endpoint.
upload_metadata_parser = api.parser()
upload_metadata_parser.add_argument('name', type=str, help='An optional name for the upload.', location='args')
upload_metadata_parser.add_argument('local_path', type=str, help='Use a local file on the server.', location='args')
upload_metadata_parser.add_argument('token', type=str, help='Upload token to authenticate with curl command.', location='args')
upload_metadata_parser.add_argument('file', type=FileStorage, help='The file to upload.', location='files')

# Pagination arguments plus the filters that are documented for listing uploads.
upload_list_parser = pagination_request_parser.copy()
upload_list_parser.add_argument('state', type=str, help='List uploads with given state: all, unpublished, published.', location='args')
upload_list_parser.add_argument('name', type=str, help='Filter for uploads with the given name.', location='args')

131

132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def disable_marshalling(f):
    """
    Decorator that calls ``f`` and hands back its result unchanged. If ``f`` raises
    :class:`DisableMarshalling`, the response carried by that exception is returned
    instead.
    """
    def guarded(*call_args, **call_kwargs):
        try:
            outcome = f(*call_args, **call_kwargs)
        except DisableMarshalling as signal:
            return signal.un_marshalled
        return outcome

    return wraps(f)(guarded)


def marshal_with(*args, **kwargs):
    """
    A special version of the RESTPlus marshal_with decorator that allows to disable
    marshalling at runtime by raising DisableMarshalling.

    All arguments are passed on to ``api.marshal_with``. When the decorated function
    raises DisableMarshalling, the response attached to the exception is returned
    as it is.
    """
    def decorator(func):
        def with_marshalling(*call_args, **call_kwargs):
            return func(*call_args, **call_kwargs)

        # apply the regular RESTPlus marshalling to the thin pass-through function
        with_marshalling = api.marshal_with(*args, **kwargs)(with_marshalling)

        def wrapper(*call_args, **call_kwargs):
            try:
                marshalled = with_marshalling(*call_args, **call_kwargs)
            except DisableMarshalling as signal:
                return signal.un_marshalled
            return marshalled

        return wraps(with_marshalling)(wrapper)
    return decorator


class DisableMarshalling(Exception):
    """
    Raised by a handler to return a plain response instead of a marshalled one. The
    response is built from the given body, status, and headers and is kept in
    ``un_marshalled``.
    """
    def __init__(self, body, status, headers):
        super().__init__()
        plain_response = Response(body, status=status, headers=headers)
        self.un_marshalled = plain_response


170
@ns.route('/')
class UploadListResource(Resource):
    """ Endpoints to list the uploads of a user and to create new uploads. """

    @api.doc('get_uploads')
    @api.response(400, 'Bad parameters')
    @api.marshal_with(upload_list_model, skip_none=True, code=200, description='Uploads send')
    @api.expect(upload_list_parser)
    @authenticate(required=True)
    def get(self):
        """ Get the list of all uploads from the authenticated user. """
        try:
            state = request.args.get('state', 'unpublished')
            name = request.args.get('name', None)
            page = int(request.args.get('page', 1))
            per_page = int(request.args.get('per_page', 10))
        except (TypeError, ValueError):
            abort(400, message='bad parameter types')

        # Explicit checks instead of assert statements; asserts are removed when
        # Python runs with -O and the validation would silently disappear.
        if page < 1 or per_page <= 0:
            abort(400, message='invalid pagination')

        query_kwargs = {}
        if state == 'published':
            query_kwargs.update(published=True)
        elif state == 'unpublished':
            query_kwargs.update(published=False)
        elif state != 'all':
            abort(400, message='bad state value %s' % state)

        if name is not None:
            query_kwargs.update(name=name)

        uploads = Upload.user_uploads(g.user, **query_kwargs)
        total = uploads.count()

        page_start, page_end = (page - 1) * per_page, page * per_page
        results = list(uploads.order_by('published', '-upload_time')[page_start:page_end])

        return dict(
            pagination=dict(total=total, page=page, per_page=per_page),
            results=results), 200

    @api.doc('upload')
    @api.expect(upload_metadata_parser)
    @api.response(400, 'To many uploads')
    @marshal_with(upload_model, skip_none=True, code=200, description='Upload received')
    @authenticate(required=True, upload_token=True)
    def put(self):
        """
        Upload a file and automatically create a new upload in the process.
        Can be used to upload files via browser or other http clients like curl.
        This will also start the processing of the upload.

        There are two basic ways to upload a file: multipart-formdata or simply streaming
        the file data. Both are supported. The latter one does not allow to transfer a
        filename or other meta-data. If a filename is available, it will become the
        name of the upload.

        Example commands:

            curl -X put ".../nomad/api/uploads/" -F file=@local_file
            curl ".../nomad/api/uploads/" --upload-file local_file

        There is a general limit on how many unpublished uploads a user can have. Will
        return 400 if this limit is exceeded.
        """
        # check existence of local_path if local_path is used
        # NOTE(review): any authenticated user can point local_path to an arbitrary
        # path on the server; consider restricting this to admins - TODO confirm
        local_path = request.args.get('local_path')
        if local_path and not os.path.exists(local_path):
            abort(404, message='The given local_path was not found.')

        # check the upload limit
        if not g.user.is_admin:
            if Upload.user_uploads(g.user, published=False).count() >= config.services.upload_limit:
                abort(400, message='Limit of unpublished uploads exceeded for user.')

        upload_name = request.args.get('name')
        upload_id = utils.create_uuid()

        logger = common.logger.bind(upload_id=upload_id, upload_name=upload_name)
        logger.info('upload created')

        # Bound before the try block, so that the cleanup below never fails with an
        # unbound name and thereby hides the actual error.
        upload_path = None
        try:
            if local_path:
                # file is already there and does not need to be received
                upload_path = local_path
            elif request.mimetype in ['multipart/form-data', 'application/multipart-formdata']:
                logger.info('receive upload as multipart formdata')
                upload_path = files.PathObject(config.fs.tmp, upload_id).os_path
                # multipart formdata, e.g. with curl -X put "url" -F file=@local_file
                # might have performance issues for large files: https://github.com/pallets/flask/issues/2086
                if 'file' not in request.files:
                    abort(400, message='Bad multipart-formdata, there is no file part.')
                uploaded_file = request.files['file']
                # covers both a missing and an empty name; the original compared
                # against '' via identity (is), which is not a value comparison
                if not upload_name:
                    upload_name = uploaded_file.filename

                uploaded_file.save(upload_path)
            else:
                # simple streaming data in HTTP body, e.g. with curl "url" -T local_file
                logger.info('started to receive upload streaming data')
                upload_path = files.PathObject(config.fs.tmp, upload_id).os_path

                try:
                    with open(upload_path, 'wb') as f:
                        received_data = 0
                        received_last = 0
                        while True:
                            data = request.stream.read(io.DEFAULT_BUFFER_SIZE)
                            if not data:
                                break

                            received_data += len(data)
                            received_last += len(data)
                            if received_last > 1e9:
                                # log once for every ~1e9 received bytes
                                received_last = 0
                                # TODO remove this logging or reduce it to debug
                                logger.info('received streaming data', size=received_data)
                            f.write(data)

                except Exception as e:
                    logger.warning('Error on streaming upload', exc_info=e)
                    abort(400, message='Some IO went wrong, download probably aborted/disrupted.')
        except Exception:
            # remove partially received files, but never touch a user given local_path
            if not local_path and upload_path is not None and os.path.isfile(upload_path):
                os.remove(upload_path)
            logger.info('Invalid or aborted upload')
            raise

        logger.info('received uploaded file')

        upload = Upload.create(
            upload_id=upload_id,
            user=g.user,
            name=upload_name,
            upload_time=datetime.utcnow(),
            upload_path=upload_path,
            temporary=local_path != upload_path)

        upload.process_upload()
        logger.info('initiated processing')

        # Requests that carry an upload token and do not explicitly ask for JSON get
        # a plain text message instead of the marshalled upload.
        if bool(request.args.get('token', False)) and request.headers.get('Accept', '') != 'application/json':
            raise DisableMarshalling(
                '''
Thanks for uploading your data to nomad.
Go back to %s and press reload to see the progress on your upload and publish your data.

''' % config.gui_url(),
                200, {'Content-Type': 'text/plain; charset=utf-8'})

        return upload, 200
328

Markus Scheidgen's avatar
Markus Scheidgen committed
329

330
331
332
333
class ProxyUpload:
    """
    Wraps an upload and adds a ``calcs`` attribute to it. All other attributes are
    read from the wrapped upload.

    Arguments:
        upload: The wrapped object that all unknown attributes are delegated to.
        calcs: The value that is made available as ``calcs``.
    """
    def __init__(self, upload, calcs):
        self.upload = upload
        self.calcs = calcs

    def __getattr__(self, name):
        # __getattr__ is only called when regular lookup failed. If one of our own
        # attributes is missing (e.g. on an instance that was created without
        # __init__), delegating would access self.upload again and recurse forever.
        if name in ('upload', 'calcs'):
            raise AttributeError(name)

        # getattr (in contrast to calling __getattribute__ directly) also honors a
        # __getattr__ fallback that the wrapped object might define
        return getattr(self.upload, name)


339
@upload_route(ns)
class UploadResource(Resource):
    """ Endpoints to inspect, delete, and run operations on a single upload. """

    @api.doc('get_upload')
    @api.response(404, 'Upload does not exist')
    @api.response(400, 'Invalid parameters')
    @api.marshal_with(upload_with_calcs_model, skip_none=True, code=200, description='Upload send')
    @api.expect(pagination_request_parser)
    @authenticate(required=True)
    def get(self, upload_id: str):
        """
        Get an update for an existing upload.

        Will not only return the upload, but also its calculations paginated.
        Use the pagination params to determine the page.
        """
        try:
            upload = Upload.get(upload_id)
        except KeyError:
            abort(404, message='Upload with id %s does not exist.' % upload_id)

        # uploads of other users are reported as not existing
        if upload.user_id != str(g.user.user_id) and not g.user.is_admin:
            abort(404, message='Upload with id %s does not exist.' % upload_id)

        try:
            page = int(request.args.get('page', 1))
            per_page = int(request.args.get('per_page', 10))
            order_by = request.args.get('order_by', None)
            order = int(str(request.args.get('order', -1)))
        except (TypeError, ValueError):
            abort(400, message='invalid pagination or ordering')

        # Explicit checks instead of assert statements; asserts are removed when
        # Python runs with -O and the validation would silently disappear.
        if page < 1 or per_page <= 0:
            abort(400, message='invalid pagination')

        if order_by is not None:
            order_by = str(order_by)
            if order_by not in ['mainfile', 'tasks_status', 'parser']:
                abort(400, message='invalid order_by field %s' % order_by)

            # prefix the field with the sort direction, -1 means descending
            order_by = ('-%s' if order == -1 else '+%s') % order_by

        calcs = upload.all_calcs((page - 1) * per_page, page * per_page, order_by=order_by)
        failed_calcs = upload.failed_calcs
        result = ProxyUpload(upload, {
            'pagination': dict(
                total=upload.total_calcs, page=page, per_page=per_page,
                successes=upload.processed_calcs - failed_calcs, failures=failed_calcs),
            'results': list(calcs)
        })

        return result, 200

    @api.doc('delete_upload')
    @api.response(404, 'Upload does not exist')
    @api.response(401, 'Upload does not belong to authenticated user.')
    @api.response(400, 'The upload is still/already processed')
    @api.marshal_with(upload_model, skip_none=True, code=200, description='Upload deleted')
    @authenticate(required=True)
    def delete(self, upload_id: str):
        """
        Delete an existing upload.

        Only uploads that are still in staging, not already deleted, not still uploaded, and
        not currently processed, can be deleted.
        """
        try:
            upload = Upload.get(upload_id)
        except KeyError:
            abort(404, message='Upload with id %s does not exist.' % upload_id)

        if upload.user_id != str(g.user.user_id) and not g.user.is_admin:
            abort(401, message='Upload with id %s does not belong to you.' % upload_id)

        if upload.published:
            abort(400, message='The upload is already published')

        if upload.tasks_running:
            abort(400, message='The upload is not processed yet')

        try:
            upload.delete_upload()
        except ProcessAlreadyRunning:
            abort(400, message='The upload is still processed')
        except Exception as e:
            common.logger.error('could not delete processing upload', exc_info=e)
            raise

        return upload, 200

    @api.doc('exec_upload_operation')
    @api.response(404, 'Upload does not exist or not in staging')
    @api.response(400, 'Operation is not supported or the upload is still/already processed')
    @api.response(401, 'If the operation is not allowed for the current user')
    @api.marshal_with(upload_model, skip_none=True, code=200, description='Upload published successfully')
    @api.expect(upload_operation_model)
    @authenticate(required=True)
    def post(self, upload_id):
        """
        Execute an upload operation. Available operations are ``publish`` and ``re-process``

        Publish accepts further meta data that allows to provide coauthors, comments,
        external references, etc. See the model for details. The fields that start with
        ``_underscore`` are only available for users with administrative privileges.

        Publish changes the visibility of the upload. Clients can specify the visibility
        via meta data.

        Re-process will re-process the upload and produce updated repository metadata and
        archive. Only published uploads that are not processing at the moment are allowed.
        Only for uploads where calculations have been processed with an older nomad version.
        """
        try:
            upload = Upload.get(upload_id)
        except KeyError:
            abort(404, message='Upload with id %s does not exist.' % upload_id)

        # uploads of other users are reported as not existing
        if upload.user_id != str(g.user.user_id) and not g.user.is_admin:
            abort(404, message='Upload with id %s does not exist.' % upload_id)

        json_data = request.get_json()
        if json_data is None:
            json_data = {}
        if not isinstance(json_data, dict):
            # e.g. a JSON list or string as body; it has no operation to read
            abort(400, message='The request body must be a JSON object.')

        operation = json_data.get('operation')

        # an explicit null is treated like absent metadata
        metadata = json_data.get('metadata')
        if metadata is None:
            metadata = {}
        if not isinstance(metadata, dict):
            abort(400, message='The metadata must be a JSON object.')

        # keys with a leading underscore are reserved for admin users
        if any(key.startswith('_') for key in metadata) and not g.user.is_admin:
            abort(401, message='Only admin users can use _metadata_keys.')

        if operation == 'publish':
            if upload.tasks_running:
                abort(400, message='The upload is not processed yet')
            if upload.tasks_status == FAILURE:
                abort(400, message='Cannot publish an upload that failed processing')
            if upload.processed_calcs == 0:
                abort(400, message='Cannot publish an upload without calculations')
            try:
                upload.compress_and_set_metadata(metadata)
                upload.publish_upload()
            except ProcessAlreadyRunning:
                abort(400, message='The upload is still/already processed')

            return upload, 200
        elif operation == 're-process':
            if upload.tasks_running or upload.process_running or not upload.published:
                abort(400, message='Can only non processing, re-process published uploads')

            if len(metadata) > 0:
                abort(400, message='You can not provide metadata for re-processing')

            if len(upload.outdated_calcs) == 0:
                abort(400, message='You can only re-process uploads with at least one outdated calculation')

            upload.reset()
            upload.re_process_upload()

            return upload, 200

        abort(400, message='Unsupported operation %s.' % operation)
504
505
506
507


# The url and the example shell commands that are returned by the command endpoint.
upload_command_model = api.model('UploadCommand', {
    # NOTE(review): fields.Url is usually used to generate the url of an endpoint and
    # not to output a given string - verify that the returned upload_url is the
    # intended one
    'upload_url': fields.Url,
    'upload_command': fields.String,
    'upload_command_with_name': fields.String,
    'upload_progress_command': fields.String,
    'upload_command_form': fields.String,
    'upload_tar_command': fields.String
})


@ns.route('/command')
class UploadCommandResource(Resource):
    """ Endpoint that provides ready to use shell commands for uploading files. """

    @api.doc('get_upload_command')
    @api.marshal_with(upload_command_model, code=200, description='Upload command send')
    @authenticate(required=True)
    def get(self):
        """ Get url and example command for shell based uploads. """
        # the generated token is part of the url and authenticates the shell upload
        token = generate_upload_token(g.user)
        upload_url = '%s/uploads/?token=%s' % (config.api_url(ssl=False), token)
        upload_url_with_name = upload_url + '&name=<name>'

        # Upload via streaming data tends to work much easier, e.g. no mime type issues, etc.
        # It is also easier for the user to understand IMHO.
        upload_command = 'curl %s -T <local_file>' % upload_url

        upload_command_form = 'curl %s -X PUT -F file=@<local_file>' % upload_url

        upload_command_with_name = 'curl "%s" -X PUT -T <local_file>' % upload_url_with_name

        upload_progress_command = upload_command + ' | xargs echo'

        # The url is the target of the request and must not be passed via -H, which
        # would make curl treat it as a header and leave the command without a url.
        upload_tar_command = 'tar -cf - <local_folder> | curl -# "%s" -T - | xargs echo' % upload_url

        return dict(
            upload_url=upload_url,
            upload_command=upload_command,
            upload_command_with_name=upload_command_with_name,
            upload_progress_command=upload_progress_command,
            upload_command_form=upload_command_form,
            upload_tar_command=upload_tar_command), 200