upload.py 22.1 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

15
'''
The upload API of the nomad@FAIRDI APIs. Provides endpoints to upload files and
get the processing status of uploads.
'''
19

20
from typing import Dict, Any
21
from flask import g, request, Response
22
23
from flask_restplus import Resource, fields, abort
from datetime import datetime
24
25
from werkzeug.datastructures import FileStorage
import os.path
26
import os
27
import io
28
from functools import wraps
29

30
from nomad import config, utils, files, search
31
from nomad.processing import Upload, FAILURE
32
from nomad.processing import ProcessAlreadyRunning
33
34
from nomad.app import common
from nomad.app.common import RFC3339DateTime
35

Markus Scheidgen's avatar
Markus Scheidgen committed
36
from .api import api
37
from .auth import authenticate, generate_upload_token
38
from .common import pagination_request_parser, pagination_model, upload_route, metadata_model
39

40

41
# Namespace that all upload related resources of this module are attached to.
ns = api.namespace(
    'uploads',
    description='Uploading data and tracing uploaded data and its processing.')

# Processing state fields; both the upload and the calculation model inherit from it.
proc_model = api.model('Processing', {
    'tasks': fields.List(fields.String),
    'current_task': fields.String,
    'tasks_running': fields.Boolean,
    'tasks_status': fields.String,
    'errors': fields.List(fields.String),
    'warnings': fields.List(fields.String),
    'create_time': RFC3339DateTime,
    'complete_time': RFC3339DateTime,
    'current_process': fields.String,
    'process_running': fields.Boolean,
})

# User provided metadata for a single calculation.
calc_metadata_model = api.inherit('CalcMetaData', metadata_model, {
    'mainfile': fields.String(
        description='The calculation main output file is used to identify the calculation in the upload.'),
    '_pid': fields.String(
        description='Assign a specific pid. It must be unique.'),
    'external_id': fields.String(
        description='External user provided id. Does not have to be unique necessarily.')
})

# User provided metadata for a whole upload, with optional per calculation entries.
upload_metadata_model = api.inherit('UploadMetaData', metadata_model, {
    'embargo_length': fields.Integer(
        description='Length of the requested embargo in months.'),
    'calculations': fields.List(
        fields.Nested(model=calc_metadata_model, skip_none=True),
        description='Specific per calculation data that will override the upload data.')
})

# An upload together with its processing state.
upload_model = api.inherit('UploadProcessing', proc_model, {
    'name': fields.String(
        description='The name of the upload. This can be provided during upload '
                    'using the name query parameter.'),
    'upload_id': fields.String(
        description='The unique id for the upload.'),
    # TODO just removed during migration, where this get particularily large
    # 'metadata': fields.Nested(model=upload_metadata_model, description='Additional upload and calculation meta data.', skip_none=True),
    'upload_path': fields.String(
        description='The uploaded file on the server'),
    'published': fields.Boolean(
        description='If this upload is already published'),
    'upload_time': RFC3339DateTime(),
})

# One page of uploads.
upload_list_model = api.model('UploadList', {
    'pagination': fields.Nested(model=pagination_model, skip_none=True),
    'results': fields.List(fields.Nested(model=upload_model, skip_none=True))
})

# A calculation of an upload together with its processing state.
calc_model = api.inherit('UploadCalculationProcessing', proc_model, {
    'calc_id': fields.String,
    'mainfile': fields.String,
    'upload_id': fields.String,
    'parser': fields.String,
    'metadata': fields.Raw(
        attribute='_entry_metadata',
        description='The repository metadata for this entry.')
})

# Pagination of an upload's calculations, extended by success/failure counts.
_calc_pagination_model = api.inherit('UploadCalculationPagination', pagination_model, {
    'successes': fields.Integer,
    'failures': fields.Integer,
})

# One page of calculations of an upload.
_paginated_calcs_model = api.model('UploadPaginatedCalculations', {
    'pagination': fields.Nested(model=_calc_pagination_model, skip_none=True),
    'results': fields.List(fields.Nested(model=calc_model, skip_none=True))
})

# An upload with calculation counters and one page of its calculations.
upload_with_calcs_model = api.inherit('UploadWithPaginatedCalculations', upload_model, {
    'processed_calcs': fields.Integer,
    'total_calcs': fields.Integer,
    'failed_calcs': fields.Integer,
    'pending_calcs': fields.Integer,
    'calcs': fields.Nested(model=_paginated_calcs_model, skip_none=True)
})

# Request body for operations that are executed on an existing upload.
upload_operation_model = api.model('UploadOperation', {
    'operation': fields.String(
        description='Currently publish is the only operation.'),
    'metadata': fields.Nested(
        model=upload_metadata_model,
        description='Additional upload and calculation meta data. Will replace previously given metadata.')
})


# Query/file arguments accepted when a new upload is created.
upload_metadata_parser = api.parser()
upload_metadata_parser.add_argument(
    'name', type=str, help='An optional name for the upload.', location='args')
upload_metadata_parser.add_argument(
    'local_path', type=str, help='Use a local file on the server.', location='args')
upload_metadata_parser.add_argument(
    'token', type=str, help='Upload token to authenticate with curl command.', location='args')
upload_metadata_parser.add_argument(
    'file', type=FileStorage, help='The file to upload.', location='files')

# Query arguments accepted when uploads are listed: pagination plus filters.
upload_list_parser = pagination_request_parser.copy()
upload_list_parser.add_argument(
    'state', type=str, help='List uploads with given state: all, unpublished, published.', location='args')
upload_list_parser.add_argument(
    'name', type=str, help='Filter for uploads with the given name.', location='args')

127

128
129
130
131
132
133
134
135
136
137
138
139
def disable_marshalling(f):
    '''
    Decorator that lets ``f`` hand back a ready-made response.

    The result of ``f`` is returned unchanged. If ``f`` raises DisableMarshalling,
    the ``un_marshalled`` attribute of that exception is returned instead.
    '''
    def guarded(*args, **kwargs):
        try:
            result = f(*args, **kwargs)
        except DisableMarshalling as bypass:
            return bypass.un_marshalled
        return result

    # Copy name, docstring, etc. of the decorated function onto the guard.
    return wraps(f)(guarded)


def marshal_with(*args, **kwargs):
    '''
    A special version of the RESTPlus marshal_with decorator that allows to disable
    marshalling at runtime by raising DisableMarshalling.

    All arguments are forwarded to ``api.marshal_with``. When the decorated function
    raises DisableMarshalling, the exception's ``un_marshalled`` attribute is
    returned instead of a marshalled result.
    '''
    def decorator(func):
        @api.marshal_with(*args, **kwargs)
        def with_marshalling(*call_args, **call_kwargs):
            return func(*call_args, **call_kwargs)

        def wrapper(*call_args, **call_kwargs):
            try:
                marshalled = with_marshalling(*call_args, **call_kwargs)
            except DisableMarshalling as bypass:
                return bypass.un_marshalled
            return marshalled

        # Carry over the metadata of the marshalling function to the outer wrapper.
        return wraps(with_marshalling)(wrapper)

    return decorator


class DisableMarshalling(Exception):
    '''
    Exception that carries a complete response to be returned without marshalling.

    Arguments:
        body: The response body.
        status: The HTTP status of the response.
        headers: The headers of the response.

    Attributes:
        un_marshalled: The flask Response built from the given arguments.
    '''
    def __init__(self, body, status, headers):
        super().__init__()
        response = Response(body, status=status, headers=headers)
        self.un_marshalled = response


166
@ns.route('/')
class UploadListResource(Resource):
    ''' Resource to list the uploads of a user and to create new uploads. '''

    @api.doc('get_uploads')
    @api.response(400, 'Bad parameters')
    @api.marshal_with(upload_list_model, skip_none=True, code=200, description='Uploads send')
    @api.expect(upload_list_parser)
    @authenticate(required=True)
    def get(self):
        ''' Get the list of all uploads from the authenticated user. '''
        state = request.args.get('state', 'unpublished')
        name = request.args.get('name', None)

        try:
            page = int(request.args.get('page', 1))
            per_page = int(request.args.get('per_page', 10))
        except (TypeError, ValueError):
            abort(400, message='bad parameter types')

        # Explicit checks instead of assert statements: asserts are removed when
        # python runs with -O and must not be used to validate request input.
        if page < 1 or per_page <= 0:
            abort(400, message='invalid pagination')

        query_kwargs = {}
        if state == 'published':
            query_kwargs.update(published=True)
        elif state == 'unpublished':
            query_kwargs.update(published=False)
        elif state == 'all':
            pass
        else:
            abort(400, message='bad state value %s' % state)

        if name is not None:
            query_kwargs.update(name=name)

        uploads = Upload.user_uploads(g.user, **query_kwargs)
        total = uploads.count()

        # Only the requested page is read from the (ordered) query.
        first, last = (page - 1) * per_page, page * per_page
        results = list(uploads.order_by('published', '-upload_time')[first:last])

        return dict(
            pagination=dict(total=total, page=page, per_page=per_page),
            results=results), 200

    @api.doc('upload')
    @api.expect(upload_metadata_parser)
    @api.response(400, 'To many uploads')
    @marshal_with(upload_model, skip_none=True, code=200, description='Upload received')
    @authenticate(required=True, upload_token=True)
    def put(self):
        '''
        Upload a file and automatically create a new upload in the process.
        Can be used to upload files via browser or other http clients like curl.
        This will also start the processing of the upload.

        There are two basic ways to upload a file: multipart-formdata or simply streaming
        the file data. Both are supported. The latter one does not allow to transfer a
        filename or other meta-data. If a filename is available, it will become the
        name of the upload.

        Example commands:

            curl -X put ".../nomad/api/uploads/" -F file=@local_file
            curl ".../nomad/api/uploads/" --upload-file local_file

        There is a general limit on how many unpublished uploads a user can have. Will
        return 400 if this limit is exceeded.
        '''
        # check existence of local_path if local_path is used
        # NOTE(review): local_path is taken from the query string as is and then used
        # as a path on the server; confirm that this is limited to trusted users.
        local_path = request.args.get('local_path')
        if local_path:
            if not os.path.exists(local_path):
                abort(404, message='The given local_path was not found.')

        # check the upload limit
        if not g.user.is_admin:
            if Upload.user_uploads(g.user, published=False).count() >= config.services.upload_limit:
                abort(400, 'Limit of unpublished uploads exceeded for user.')

        upload_name = request.args.get('name')
        upload_id = utils.create_uuid()

        logger = common.logger.bind(upload_id=upload_id, upload_name=upload_name)
        logger.info('upload created', )

        # Stays None until a target path is known. This way the cleanup in the
        # except clause below can never fail on an unbound name and thereby hide
        # the exception that actually caused the problem.
        upload_path = None
        try:
            if local_path:
                # file is already there and does not to be received
                upload_path = local_path
            elif request.mimetype in ['multipart/form-data', 'application/multipart-formdata']:
                logger.info('receive upload as multipart formdata')
                upload_path = files.PathObject(config.fs.tmp, upload_id).os_path
                # multipart formdata, e.g. with curl -X put "url" -F file=@local_file
                # might have performance issues for large files: https://github.com/pallets/flask/issues/2086
                if 'file' not in request.files:
                    abort(400, message='Bad multipart-formdata, there is no file part.')
                file = request.files['file']
                # fall back to the transferred filename if no (or an empty) name was given
                if not upload_name:
                    upload_name = file.filename

                file.save(upload_path)
            else:
                # simple streaming data in HTTP body, e.g. with curl "url" -T local_file
                logger.info('started to receive upload streaming data')
                upload_path = files.PathObject(config.fs.tmp, upload_id).os_path

                try:
                    with open(upload_path, 'wb') as f:
                        received_data = 0
                        received_last = 0
                        while True:
                            data = request.stream.read(io.DEFAULT_BUFFER_SIZE)
                            if len(data) == 0:
                                break

                            received_data += len(data)
                            received_last += len(data)
                            # log the progress once per received 1e9 bytes
                            if received_last > 1e9:
                                received_last = 0
                                # TODO remove this logging or reduce it to debug
                                logger.info('received streaming data', size=received_data)
                            f.write(data)

                except Exception as e:
                    logger.warning('Error on streaming upload', exc_info=e)
                    abort(400, message='Some IO went wrong, download probably aborted/disrupted.')
        except Exception:
            # This also handles the exceptions raised by abort: remove what was
            # received so far (never a user given local_path) and re-raise as is.
            if not local_path and upload_path is not None and os.path.isfile(upload_path):
                os.remove(upload_path)
            logger.info('Invalid or aborted upload')
            raise

        logger.info('received uploaded file')

        upload = Upload.create(
            upload_id=upload_id,
            user=g.user,
            name=upload_name,
            upload_time=datetime.utcnow(),
            upload_path=upload_path,
            temporary=local_path != upload_path)

        upload.process_upload()
        logger.info('initiated processing')

        # Requests with a token argument that do not explicitly accept json get a
        # plain text message instead of the marshalled upload.
        if bool(request.args.get('token', False)) and request.headers.get('Accept', '') != 'application/json':
            raise DisableMarshalling(
                '''
Thanks for uploading your data to nomad.
Go back to %s and press reload to see the progress on your upload and publish your data.

''' % config.gui_url(),
                200, {'Content-Type': 'text/plain; charset=utf-8'})

        return upload, 200
324

Markus Scheidgen's avatar
Markus Scheidgen committed
325

326
327
328
329
class ProxyUpload:
    '''
    Wraps an upload and adds a ``calcs`` attribute to it.

    All attributes other than ``upload`` and ``calcs`` are looked up on the
    wrapped upload.

    Arguments:
        upload: The object that attribute access is delegated to.
        calcs: The value made available as ``calcs``.
    '''
    def __init__(self, upload, calcs):
        self.upload = upload
        self.calcs = calcs

    def __getattr__(self, name):
        # __getattr__ is only called after the regular lookup failed. If that
        # happens for one of the proxy's own attributes, the instance was not
        # initialized; delegating would recurse on self.upload without end.
        if name in ('upload', 'calcs'):
            raise AttributeError(name)

        # getattr, in contrast to an explicit __getattribute__ call, also honors
        # a __getattr__ fallback of the wrapped object.
        return getattr(self.upload, name)


335
@upload_route(ns)
class UploadResource(Resource):
    ''' Resource to inspect, delete, and run operations on a single upload. '''

    @api.doc('get_upload')
    @api.response(404, 'Upload does not exist')
    @api.response(400, 'Invalid parameters')
    @api.marshal_with(upload_with_calcs_model, skip_none=True, code=200, description='Upload send')
    @api.expect(pagination_request_parser)
    @authenticate(required=True)
    def get(self, upload_id: str):
        '''
        Get an update for an existing upload.

        Will not only return the upload, but also its calculations paginated.
        Use the pagination params to determine the page.
        '''
        try:
            upload = Upload.get(upload_id)
        except KeyError:
            abort(404, message='Upload with id %s does not exist.' % upload_id)

        # Uploads of other users are reported as not existing.
        if upload.user_id != str(g.user.user_id) and not g.user.is_admin:
            abort(404, message='Upload with id %s does not exist.' % upload_id)

        try:
            page = int(request.args.get('page', 1))
            per_page = int(request.args.get('per_page', 10))
            order_by = request.args.get('order_by', None)
            order = int(str(request.args.get('order', -1)))
        except (TypeError, ValueError):
            abort(400, message='invalid pagination or ordering')

        # Explicit checks instead of assert statements: asserts are removed when
        # python runs with -O and must not be used to validate request input.
        if page < 1 or per_page <= 0:
            abort(400, message='invalid pagination')

        if order_by is not None:
            order_by = str(order_by)
            if order_by not in ['mainfile', 'tasks_status', 'parser']:
                abort(400, message='invalid order_by field %s' % order_by)

            # an order of -1 sorts descending, every other value ascending
            order_by = ('-%s' if order == -1 else '+%s') % order_by

        # load upload's calcs
        calcs = list(upload.all_calcs(
            (page - 1) * per_page, page * per_page, order_by=order_by))

        # attach the search hit of each calc (None if there is no hit) as its metadata
        calc_ids = [calc.calc_id for calc in calcs]
        search_results = {
            hit['calc_id']: hit
            for hit in search.SearchRequest().search_parameter('calc_id', calc_ids).execute_scan()}

        for calc in calcs:
            calc._entry_metadata = search_results.get(calc.calc_id)

        failed_calcs = upload.failed_calcs
        result = ProxyUpload(upload, {
            'pagination': dict(
                total=upload.total_calcs, page=page, per_page=per_page,
                successes=upload.processed_calcs - failed_calcs, failures=failed_calcs),
            'results': calcs
        })

        return result, 200

    @api.doc('delete_upload')
    @api.response(404, 'Upload does not exist')
    @api.response(401, 'Upload does not belong to authenticated user.')
    @api.response(400, 'The upload is still/already processed')
    @api.marshal_with(upload_model, skip_none=True, code=200, description='Upload deleted')
    @authenticate(required=True)
    def delete(self, upload_id: str):
        '''
        Delete an existing upload.

        Only uploads that are still in staging, not already deleted, not still uploaded, and
        not currently processed, can be deleted.
        '''
        try:
            upload = Upload.get(upload_id)
        except KeyError:
            abort(404, message='Upload with id %s does not exist.' % upload_id)

        if upload.user_id != str(g.user.user_id) and not g.user.is_admin:
            abort(401, message='Upload with id %s does not belong to you.' % upload_id)

        if upload.published:
            abort(400, message='The upload is already published')

        if upload.tasks_running:
            abort(400, message='The upload is not processed yet')

        try:
            upload.delete_upload()
        except ProcessAlreadyRunning:
            abort(400, message='The upload is still processed')
        except Exception as e:
            common.logger.error('could not delete processing upload', exc_info=e)
            # bare raise keeps the original traceback
            raise

        return upload, 200

    @api.doc('exec_upload_operation')
    @api.response(404, 'Upload does not exist or not in staging')
    @api.response(400, 'Operation is not supported or the upload is still/already processed')
    @api.response(401, 'If the operation is not allowed for the current user')
    @api.marshal_with(upload_model, skip_none=True, code=200, description='Upload published successfully')
    @api.expect(upload_operation_model)
    @authenticate(required=True)
    def post(self, upload_id):
        '''
        Execute an upload operation. Available operations are ``publish`` and ``re-process``

        Publish accepts further meta data that allows to provide coauthors, comments,
        external references, etc. See the model for details. The fields that start with
        ``_underscore`` are only available for users with administrative privileges.

        Publish changes the visibility of the upload. Clients can specify the visibility
        via meta data.

        Re-process will re-process the upload and produce updated repository metadata and
        archive. Only published uploads that are not processing at the moment are allowed.
        Only for uploads where calculations have been processed with an older nomad version.
        '''
        try:
            upload = Upload.get(upload_id)
        except KeyError:
            abort(404, message='Upload with id %s does not exist.' % upload_id)

        # Uploads of other users are reported as not existing.
        if upload.user_id != str(g.user.user_id) and not g.user.is_admin:
            abort(404, message='Upload with id %s does not exist.' % upload_id)

        json_data = request.get_json()
        if json_data is None:
            json_data = {}

        operation = json_data.get('operation')

        # An explicit "metadata": null is treated like missing metadata; anything
        # else that is not an object is a client error and not a server error.
        user_metadata: Dict[str, Any] = json_data.get('metadata') or {}
        if not isinstance(user_metadata, dict):
            abort(400, message='The metadata must be an object.')

        # Keys with a leading underscore are reserved for admins and are stored
        # without the underscore.
        metadata: Dict[str, Any] = {}
        for user_key, value in user_metadata.items():
            if user_key.startswith('_'):
                if not g.user.is_admin:
                    abort(401, message='Only admin users can use _metadata_keys.')

                key = user_key[1:]
            else:
                key = user_key

            metadata[key] = value

        if operation == 'publish':
            if upload.tasks_running:
                abort(400, message='The upload is not processed yet')
            if upload.tasks_status == FAILURE:
                abort(400, message='Cannot publish an upload that failed processing')
            if upload.processed_calcs == 0:
                abort(400, message='Cannot publish an upload without calculations')
            try:
                upload.compress_and_set_metadata(metadata)
                upload.publish_upload()
            except ProcessAlreadyRunning:
                abort(400, message='The upload is still/already processed')

            return upload, 200
        elif operation == 're-process':
            if upload.tasks_running or upload.process_running or not upload.published:
                abort(400, message='Can only non processing, re-process published uploads')

            if len(metadata) > 0:
                abort(400, message='You can not provide metadata for re-processing')

            if len(upload.outdated_calcs) == 0:
                abort(400, message='You can only re-process uploads with at least one outdated calculation')

            upload.reset()
            upload.re_process_upload()

            return upload, 200

        abort(400, message='Unsupported operation %s.' % operation)
517
518
519
520


# Response model for the url and the example shell commands for uploading.
upload_command_model = api.model('UploadCommand', {
    # A plain string: fields.Url would not output the given value, but resolve
    # the url of an endpoint (the current one if none is given) via url_for.
    'upload_url': fields.String,
    'upload_command': fields.String,
    'upload_command_with_name': fields.String,
    'upload_progress_command': fields.String,
    'upload_command_form': fields.String,
    'upload_tar_command': fields.String
})


@ns.route('/command')
class UploadCommandResource(Resource):
    ''' Resource that provides the upload url and example commands for shell uploads. '''

    @api.doc('get_upload_command')
    @api.marshal_with(upload_command_model, code=200, description='Upload command send')
    @authenticate(required=True)
    def get(self):
        ''' Get url and example command for shell based uploads. '''
        # The token in the url authenticates the upload for the current user.
        token = generate_upload_token(g.user)
        upload_url = '%s/uploads/?token=%s' % (config.api_url(ssl=False), token)
        upload_url_with_name = upload_url + '&name=<name>'

        # upload_command = 'curl -X PUT "%s" -F file=@<local_file>' % upload_url

        # Upload via streaming data tends to work much easier, e.g. no mime type issues, etc.
        # It is also easier for the user to understand IMHO.
        upload_command = 'curl %s -T <local_file>' % upload_url

        upload_command_form = 'curl %s -X PUT -F file=@<local_file>' % upload_url

        upload_command_with_name = 'curl "%s" -X PUT -T <local_file>' % upload_url_with_name

        upload_progress_command = upload_command + ' | xargs echo'
        # The url is given as the (quoted) url argument of curl. With -H in front
        # of it, curl would take it as a header and complain about a missing url.
        upload_tar_command = 'tar -cf - <local_folder> | curl -# "%s" -T - | xargs echo' % upload_url

        return dict(
            upload_url=upload_url,
            upload_command=upload_command,
            upload_command_with_name=upload_command_with_name,
            upload_progress_command=upload_progress_command,
            upload_command_form=upload_command_form,
            upload_tar_command=upload_tar_command), 200