upload.py 21.9 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

15
'''
The upload API of the nomad@FAIRDI APIs. Provides endpoints to upload files and
get the processing status of uploads.
'''
19

20
from typing import Dict, Any
21
from flask import g, request, Response
22
23
from flask_restplus import Resource, fields, abort
from datetime import datetime
24
25
from werkzeug.datastructures import FileStorage
import os.path
26
import os
27
import io
28
from functools import wraps
29

30
from nomad import config, utils, files, datamodel
31
from nomad.processing import Upload, FAILURE
32
from nomad.processing import ProcessAlreadyRunning
33
34
from nomad.app import common
from nomad.app.common import RFC3339DateTime
35

Markus Scheidgen's avatar
Markus Scheidgen committed
36
from .api import api
37
from .auth import authenticate, generate_upload_token
38
from .common import pagination_request_parser, pagination_model, upload_route, metadata_model
39

40

41
# API namespace under which all resources of this module are registered.
ns = api.namespace(
    'uploads', description='Uploading data and tracing uploaded data and its processing.')


46
47
class CalcMetadata(fields.Raw):
    '''
    Marshalling field that renders entry metadata in the form of its index entry.
    '''

    def format(self, value):
        '''
        Creates an ``EntryMetadata`` from the given dict and returns the dict form
        of the index entry that is created through its ``a_elastic`` annotation.
        '''
        index_entry = datamodel.EntryMetadata.m_from_dict(value).a_elastic.create_index_entry()
        return index_entry.to_dict()
50
51


52
53
54
# Processing state fields. The upload and calculation processing models inherit
# from this model.
proc_model = api.model('Processing', dict(
    tasks=fields.List(fields.String),
    current_task=fields.String,
    tasks_running=fields.Boolean,
    tasks_status=fields.String,
    errors=fields.List(fields.String),
    warnings=fields.List(fields.String),
    create_time=RFC3339DateTime,
    complete_time=RFC3339DateTime,
    current_process=fields.String,
    process_running=fields.Boolean,
))

# Metadata for a single calculation: the common metadata model extended by
# calculation specific fields.
calc_metadata_model = api.inherit('CalcMetaData', metadata_model, {
    'mainfile': fields.String(
        description='The calculation main output file is used to identify the calculation in the upload.'),
    '_pid': fields.Integer(
        description='Assign a specific pid. It must be unique.'),
    'external_id': fields.String(
        description='External user provided id. Does not have to be unique necessarily.'),
})

# Metadata for a whole upload: the common metadata model extended by the embargo
# length and a list of per calculation metadata.
upload_metadata_model = api.inherit('UploadMetaData', metadata_model, {
    'embargo_length': fields.Integer(
        description='Length of the requested embargo in months.'),
    'calculations': fields.List(
        fields.Nested(model=calc_metadata_model, skip_none=True),
        description='Specific per calculation data that will override the upload data.'),
})

76
# An upload as returned by the API: the processing fields plus upload specific ones.
upload_model = api.inherit('UploadProcessing', proc_model, {
    'name': fields.String(description=(
        'The name of the upload. This can be provided during upload '
        'using the name query parameter.')),
    'upload_id': fields.String(description='The unique id for the upload.'),
    # TODO just removed during migration, where this gets particularly large
    # 'metadata': fields.Nested(model=upload_metadata_model, description='Additional upload and calculation meta data.', skip_none=True),
    'upload_path': fields.String(description='The uploaded file on the server'),
    'published': fields.Boolean(description='If this upload is already published'),
    'upload_time': RFC3339DateTime(),
})

89
90
# A page of uploads together with the pagination information.
upload_list_model = api.model('UploadList', dict(
    pagination=fields.Nested(model=pagination_model),
    results=fields.List(fields.Nested(model=upload_model, skip_none=True)),
))

94
# A calculation of an upload: the processing fields plus calculation specific ones.
calc_model = api.inherit('UploadCalculationProcessing', proc_model, dict(
    calc_id=fields.String,
    mainfile=fields.String,
    upload_id=fields.String,
    parser=fields.String,
    metadata=CalcMetadata(description='The repository metadata for this entry.'),
))

# An upload with calculation counters and one page of its calculations. The
# calculation pagination extends the common pagination by success/failure counts.
upload_with_calcs_model = api.inherit('UploadWithPaginatedCalculations', upload_model, {
    'processed_calcs': fields.Integer,
    'total_calcs': fields.Integer,
    'failed_calcs': fields.Integer,
    'pending_calcs': fields.Integer,
    'calcs': fields.Nested(
        model=api.model('UploadPaginatedCalculations', {
            'pagination': fields.Nested(
                model=api.inherit('UploadCalculationPagination', pagination_model, {
                    'successes': fields.Integer,
                    'failures': fields.Integer,
                })),
            'results': fields.List(fields.Nested(model=calc_model, skip_none=True)),
        }),
        skip_none=True),
})

116
117
# Request body for operations on an upload. The upload resource's post endpoint
# handles the operations 'publish' and 're-process'; the description lists both.
upload_operation_model = api.model('UploadOperation', {
    'operation': fields.String(
        description='The operation to execute, either publish or re-process.'),
    'metadata': fields.Nested(
        model=upload_metadata_model,
        description='Additional upload and calculation meta data. Will replace previously given metadata.'),
})


# Request arguments of the upload endpoint: three query parameters and the
# optional multipart file part.
upload_metadata_parser = api.parser()
upload_metadata_parser.add_argument(
    'name', type=str, location='args',
    help='An optional name for the upload.')
upload_metadata_parser.add_argument(
    'local_path', type=str, location='args',
    help='Use a local file on the server.')
upload_metadata_parser.add_argument(
    'token', type=str, location='args',
    help='Upload token to authenticate with curl command.')
upload_metadata_parser.add_argument(
    'file', type=FileStorage, location='files',
    help='The file to upload.')
127

128
# Request arguments for listing uploads: a copy of the pagination arguments
# extended by two filters.
upload_list_parser = pagination_request_parser.copy()
upload_list_parser.add_argument(
    'state', type=str, location='args',
    help='List uploads with given state: all, unpublished, published.')
upload_list_parser.add_argument(
    'name', type=str, location='args',
    help='Filter for uploads with the given name.')

132

133
134
135
136
137
138
139
140
141
142
143
144
def disable_marshalling(f):
    '''
    Decorator that calls ``f`` and returns its result. If ``f`` raises
    DisableMarshalling, the response carried by the exception is returned instead.
    '''
    def call_unmarshalled(*positional, **named):
        try:
            outcome = f(*positional, **named)
        except DisableMarshalling as signal:
            outcome = signal.un_marshalled
        return outcome

    return wraps(f)(call_unmarshalled)


def marshal_with(*args, **kwargs):
    '''
    A special version of the RESTPlus marshal_with decorator that allows to disable
    marshalling at runtime by raising DisableMarshalling.

    All arguments are passed on to ``api.marshal_with``. If the decorated function
    raises DisableMarshalling, the response carried by the exception is returned
    as it is.
    '''
    def decorator(func):
        # NOTE(review): with_marshalling does not wrap func, so the name and docstring
        # of func are not carried over to the returned function. Confirm this is intended.
        def with_marshalling(*call_args, **call_kwargs):
            return func(*call_args, **call_kwargs)

        marshalled = api.marshal_with(*args, **kwargs)(with_marshalling)

        def guarded(*call_args, **call_kwargs):
            try:
                return marshalled(*call_args, **call_kwargs)
            except DisableMarshalling as signal:
                # raised outside of the marshalling function, i.e. not marshalled
                return signal.un_marshalled

        return wraps(marshalled)(guarded)

    return decorator


class DisableMarshalling(Exception):
    '''
    Carries a ready made response. The response is built from the given body,
    status, and headers and is available as ``un_marshalled``.
    '''

    def __init__(self, body, status, headers):
        Exception.__init__(self)
        response = Response(body, status=status, headers=headers)
        self.un_marshalled = response


171
@ns.route('/')
class UploadListResource(Resource):
    # Resource for the collection of uploads: listing the uploads of the
    # authenticated user (GET) and creating a new upload from received data (PUT).

    @api.doc('get_uploads')
    @api.response(400, 'Bad parameters')
    @api.marshal_with(upload_list_model, skip_none=True, code=200, description='Uploads send')
    @api.expect(upload_list_parser)
    @authenticate(required=True)
    def get(self):
        ''' Get the list of all uploads from the authenticated user. '''
        state = request.args.get('state', 'unpublished')
        name = request.args.get('name', None)
        try:
            page = int(request.args.get('page', 1))
            per_page = int(request.args.get('per_page', 10))
        except (TypeError, ValueError):
            abort(400, message='bad parameter types')

        # Validate explicitly: assert statements are removed when Python runs with -O,
        # which would silently disable the pagination checks.
        if page < 1 or per_page <= 0:
            abort(400, message='invalid pagination')

        query_kwargs = {}
        if state == 'published':
            query_kwargs.update(published=True)
        elif state == 'unpublished':
            query_kwargs.update(published=False)
        elif state != 'all':
            abort(400, message='bad state value %s' % state)

        if name is not None:
            query_kwargs.update(name=name)

        uploads = Upload.user_uploads(g.user, **query_kwargs)
        total = uploads.count()

        first = (page - 1) * per_page
        results = list(
            uploads.order_by('published', '-upload_time')[first:first + per_page])

        return dict(
            pagination=dict(total=total, page=page, per_page=per_page),
            results=results), 200

    @api.doc('upload')
    @api.expect(upload_metadata_parser)
    @api.response(400, 'To many uploads')
    @marshal_with(upload_model, skip_none=True, code=200, description='Upload received')
    @authenticate(required=True, upload_token=True)
    def put(self):
        '''
        Upload a file and automatically create a new upload in the process.
        Can be used to upload files via browser or other http clients like curl.
        This will also start the processing of the upload.

        There are two basic ways to upload a file: multipart-formdata or simply streaming
        the file data. Both are supported. The latter one does not allow to transfer a
        filename or other meta-data. If a filename is available, it will become the
        name of the upload.

        Example commands:

            curl -X put ".../nomad/api/uploads/" -F file=@local_file
            curl ".../nomad/api/uploads/" --upload-file local_file

        There is a general limit on how many unpublished uploads a user can have. Will
        return 400 if this limit is exceeded.
        '''
        # check existence of local_path if local_path is used
        local_path = request.args.get('local_path')
        if local_path and not os.path.exists(local_path):
            abort(404, message='The given local_path was not found.')

        # check the upload limit
        if not g.user.is_admin:
            if Upload.user_uploads(g.user, published=False).count() >= config.services.upload_limit:
                abort(400, 'Limit of unpublished uploads exceeded for user.')

        upload_name = request.args.get('name')
        upload_id = utils.create_uuid()

        logger = common.logger.bind(upload_id=upload_id, upload_name=upload_name)
        logger.info('upload created', )

        # Bound before the try block, so that the cleanup in the except clause can
        # never fail on an undefined name when an error occurs early.
        upload_path = None
        try:
            if local_path:
                # file is already there and does not need to be received
                upload_path = local_path
            elif request.mimetype in ['multipart/form-data', 'application/multipart-formdata']:
                logger.info('receive upload as multipart formdata')
                upload_path = files.PathObject(config.fs.tmp, upload_id).os_path
                # multipart formdata, e.g. with curl -X put "url" -F file=@local_file
                # might have performance issues for large files: https://github.com/pallets/flask/issues/2086
                if 'file' not in request.files:
                    abort(400, message='Bad multipart-formdata, there is no file part.')
                file = request.files['file']
                # No (or an empty) name was given: fall back to the transferred filename.
                # Truthiness replaces the former identity comparison with a '' literal.
                if not upload_name:
                    upload_name = file.filename

                file.save(upload_path)
            else:
                # simple streaming data in HTTP body, e.g. with curl "url" -T local_file
                logger.info('started to receive upload streaming data')
                upload_path = files.PathObject(config.fs.tmp, upload_id).os_path

                try:
                    with open(upload_path, 'wb') as f:
                        received_data = 0
                        received_last = 0
                        while True:
                            data = request.stream.read(io.DEFAULT_BUFFER_SIZE)
                            if len(data) == 0:
                                break

                            received_data += len(data)
                            received_last += len(data)
                            # log the progress about once per 1e9 received bytes
                            if received_last > 1e9:
                                received_last = 0
                                # TODO remove this logging or reduce it to debug
                                logger.info('received streaming data', size=received_data)
                            f.write(data)

                except Exception as e:
                    logger.warning('Error on streaming upload', exc_info=e)
                    abort(400, message='Some IO went wrong, download probably aborted/disrupted.')
        except Exception:
            # Remove a partially received file, but never a user provided local file.
            if not local_path and upload_path is not None and os.path.isfile(upload_path):
                os.remove(upload_path)
            logger.info('Invalid or aborted upload')
            raise

        logger.info('received uploaded file')

        upload = Upload.create(
            upload_id=upload_id,
            user=g.user,
            name=upload_name,
            upload_time=datetime.utcnow(),
            upload_path=upload_path,
            temporary=local_path != upload_path)

        upload.process_upload()
        logger.info('initiated processing')

        # Token based requests that do not explicitly accept JSON get a plain text answer.
        if bool(request.args.get('token', False)) and request.headers.get('Accept', '') != 'application/json':
            raise DisableMarshalling(
                '''
Thanks for uploading your data to nomad.
Go back to %s and press reload to see the progress on your upload and publish your data.

''' % config.gui_url(),
                200, {'Content-Type': 'text/plain; charset=utf-8'})

        return upload, 200
329

Markus Scheidgen's avatar
Markus Scheidgen committed
330

331
332
333
334
class ProxyUpload:
    '''
    Wraps an upload and adds a ``calcs`` attribute to it. All attributes that are not
    defined on the proxy itself are read from the wrapped upload.
    '''

    def __init__(self, upload, calcs):
        self.upload = upload
        self.calcs = calcs

    def __getattr__(self, name):
        # __getattr__ is only called when the regular lookup failed. If 'upload' itself
        # is missing (e.g. on an instance created without __init__), delegating would
        # recurse endlessly; raise the regular AttributeError instead.
        if name == 'upload':
            raise AttributeError(name)

        # getattr honours the full attribute protocol of the wrapped object, including
        # a __getattr__ fallback, which a direct __getattribute__ call bypasses.
        return getattr(self.upload, name)


340
@upload_route(ns)
class UploadResource(Resource):
    # Resource for a single upload: reading it with a page of its calculations (GET),
    # deleting it (DELETE), and executing operations on it (POST).

    @api.doc('get_upload')
    @api.response(404, 'Upload does not exist')
    @api.response(400, 'Invalid parameters')
    @api.marshal_with(upload_with_calcs_model, skip_none=True, code=200, description='Upload send')
    @api.expect(pagination_request_parser)
    @authenticate(required=True)
    def get(self, upload_id: str):
        '''
        Get an update for an existing upload.

        Will not only return the upload, but also its calculations paginated.
        Use the pagination params to determine the page.
        '''
        try:
            upload = Upload.get(upload_id)
        except KeyError:
            abort(404, message='Upload with id %s does not exist.' % upload_id)

        if upload.user_id != str(g.user.user_id) and not g.user.is_admin:
            abort(404, message='Upload with id %s does not exist.' % upload_id)

        order_by = request.args.get('order_by', None)
        try:
            page = int(request.args.get('page', 1))
            per_page = int(request.args.get('per_page', 10))
            order = int(str(request.args.get('order', -1)))
        except (TypeError, ValueError):
            abort(400, message='invalid pagination or ordering')

        # Validate explicitly: assert statements are removed when Python runs with -O,
        # which would silently disable the pagination checks.
        if page < 1 or per_page <= 0:
            abort(400, message='invalid pagination')

        if order_by is not None:
            order_by = str(order_by)
            if order_by not in ['mainfile', 'tasks_status', 'parser']:
                abort(400, message='invalid order_by field %s' % order_by)

            # prefix the field with the sort direction
            order_by = ('-%s' if order == -1 else '+%s') % order_by

        calcs = upload.all_calcs((page - 1) * per_page, page * per_page, order_by=order_by)
        failed_calcs = upload.failed_calcs
        result = ProxyUpload(upload, {
            'pagination': dict(
                total=upload.total_calcs, page=page, per_page=per_page,
                successes=upload.processed_calcs - failed_calcs, failures=failed_calcs),
            'results': list(calcs)
        })

        return result, 200

    @api.doc('delete_upload')
    @api.response(404, 'Upload does not exist')
    @api.response(401, 'Upload does not belong to authenticated user.')
    @api.response(400, 'The upload is still/already processed')
    @api.marshal_with(upload_model, skip_none=True, code=200, description='Upload deleted')
    @authenticate(required=True)
    def delete(self, upload_id: str):
        '''
        Delete an existing upload.

        Only uploads that are still in staging, not already deleted, not still uploaded, and
        not currently processed, can be deleted.
        '''
        try:
            upload = Upload.get(upload_id)
        except KeyError:
            abort(404, message='Upload with id %s does not exist.' % upload_id)

        if upload.user_id != str(g.user.user_id) and not g.user.is_admin:
            abort(401, message='Upload with id %s does not belong to you.' % upload_id)

        if upload.published:
            abort(400, message='The upload is already published')

        if upload.tasks_running:
            abort(400, message='The upload is not processed yet')

        try:
            upload.delete_upload()
        except ProcessAlreadyRunning:
            abort(400, message='The upload is still processed')
        except Exception as e:
            common.logger.error('could not delete processing upload', exc_info=e)
            raise

        return upload, 200

    @api.doc('exec_upload_operation')
    @api.response(404, 'Upload does not exist or not in staging')
    @api.response(400, 'Operation is not supported or the upload is still/already processed')
    @api.response(401, 'If the operation is not allowed for the current user')
    @api.marshal_with(upload_model, skip_none=True, code=200, description='Upload published successfully')
    @api.expect(upload_operation_model)
    @authenticate(required=True)
    def post(self, upload_id):
        '''
        Execute an upload operation. Available operations are ``publish`` and ``re-process``

        Publish accepts further meta data that allows to provide coauthors, comments,
        external references, etc. See the model for details. The fields that start with
        ``_underscore`` are only available for users with administrative privileges.

        Publish changes the visibility of the upload. Clients can specify the visibility
        via meta data.

        Re-process will re-process the upload and produce updated repository metadata and
        archive. Only published uploads that are not processing at the moment are allowed.
        Only for uploads where calculations have been processed with an older nomad version.
        '''
        try:
            upload = Upload.get(upload_id)
        except KeyError:
            abort(404, message='Upload with id %s does not exist.' % upload_id)

        if upload.user_id != str(g.user.user_id) and not g.user.is_admin:
            abort(404, message='Upload with id %s does not exist.' % upload_id)

        json_data = request.get_json()
        if json_data is None:
            json_data = {}

        operation = json_data.get('operation')

        # 'or {}' also covers an explicit "metadata": null in the request body, which
        # would otherwise fail with a TypeError in the loop below.
        user_metadata: Dict[str, Any] = json_data.get('metadata') or {}
        metadata: Dict[str, Any] = {}
        for user_key in user_metadata:
            if user_key.startswith('_'):
                if not g.user.is_admin:
                    abort(401, message='Only admin users can use _metadata_keys.')

                # the leading underscore is removed from admin-only keys
                key = user_key[1:]
            else:
                key = user_key

            metadata[key] = user_metadata[user_key]

        if operation == 'publish':
            if upload.tasks_running:
                abort(400, message='The upload is not processed yet')
            if upload.tasks_status == FAILURE:
                abort(400, message='Cannot publish an upload that failed processing')
            if upload.processed_calcs == 0:
                abort(400, message='Cannot publish an upload without calculations')
            try:
                upload.compress_and_set_metadata(metadata)
                upload.publish_upload()
            except ProcessAlreadyRunning:
                abort(400, message='The upload is still/already processed')

            return upload, 200
        elif operation == 're-process':
            if upload.tasks_running or upload.process_running or not upload.published:
                abort(400, message='Can only non processing, re-process published uploads')

            if len(metadata) > 0:
                abort(400, message='You can not provide metadata for re-processing')

            if len(upload.outdated_calcs) == 0:
                abort(400, message='You can only re-process uploads with at least one outdated calculation')

            upload.reset()
            upload.re_process_upload()

            return upload, 200

        abort(400, message='Unsupported operation %s.' % operation)
511
512
513
514


# The upload url and example shell commands as returned by the command endpoint.
# All values are computed strings. 'upload_url' is a plain String field, because
# RESTPlus' fields.Url does not output the given value but generates the URL of an
# endpoint (by default the endpoint of the current request).
upload_command_model = api.model('UploadCommand', {
    'upload_url': fields.String,
    'upload_command': fields.String,
    'upload_command_with_name': fields.String,
    'upload_progress_command': fields.String,
    'upload_command_form': fields.String,
    'upload_tar_command': fields.String
})


@ns.route('/command')
class UploadCommandResource(Resource):
    # Resource that provides the upload url and example shell commands for the
    # authenticated user.

    @api.doc('get_upload_command')
    @api.marshal_with(upload_command_model, code=200, description='Upload command send')
    @authenticate(required=True)
    def get(self):
        ''' Get url and example command for shell based uploads. '''
        token = generate_upload_token(g.user)
        upload_url = '%s/uploads/?token=%s' % (config.api_url(ssl=False), token)
        upload_url_with_name = upload_url + '&name=<name>'

        # Upload via streaming data tends to work much easier, e.g. no mime type issues, etc.
        # It is also easier for the user to understand IMHO.
        upload_command = 'curl %s -T <local_file>' % upload_url

        upload_command_form = 'curl %s -X PUT -F file=@<local_file>' % upload_url

        upload_command_with_name = 'curl "%s" -X PUT -T <local_file>' % upload_url_with_name

        upload_progress_command = upload_command + ' | xargs echo'

        # The url is given as curl's url argument. Passing it via -H would declare it
        # as a request header and leave curl without any url to upload to.
        upload_tar_command = 'tar -cf - <local_folder> | curl -# "%s" -T - | xargs echo' % upload_url

        return dict(
            upload_url=upload_url,
            upload_command=upload_command,
            upload_command_with_name=upload_command_with_name,
            upload_progress_command=upload_progress_command,
            upload_command_form=upload_command_form,
            upload_tar_command=upload_tar_command), 200