# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
The upload API of the nomad@FAIRDI APIs. Provides endpoints to upload files and
get the processing status of uploads.
"""

from flask import g, request, Response
from flask_restplus import Resource, fields, abort
from datetime import datetime
from werkzeug.datastructures import FileStorage
import os.path
import os
import io
from functools import wraps

from nomad import config, utils, files
from nomad.processing import Upload, FAILURE
from nomad.processing import ProcessAlreadyRunning

from nomad.app.utils import with_logger, RFC3339DateTime
from .api import api
from .auth import login_really_required
from .common import pagination_request_parser, pagination_model, upload_route


ns = api.namespace(
    'uploads',
    description='Uploading data and tracking uploaded data and its processing.')


proc_model = api.model('Processing', {
    'tasks': fields.List(fields.String),
    'current_task': fields.String,
    'tasks_running': fields.Boolean,
    'tasks_status': fields.String,
    'errors': fields.List(fields.String),
    'warnings': fields.List(fields.String),
    'create_time': RFC3339DateTime,
    'complete_time': RFC3339DateTime,
    'current_process': fields.String,
    'process_running': fields.Boolean,
})

dataset_model = api.model('DataSet', {
    'id': fields.Integer(required=True, description='The repository db dataset id'),
    '_doi': fields.String(description='The DOI of the dataset'),
    '_name': fields.String(description='The unique dataset name')
})

metadata_model = api.model('MetaData', {
    'with_embargo': fields.Boolean(default=False, description='Data with embargo is only visible to the uploader until the embargo period has ended.'),
    'comment': fields.String(description='The comment is shown in the repository for each calculation.'),
    'references': fields.List(fields.String, description='References allow to link calculations to external sources, e.g. URLs.'),
    'coauthors': fields.List(fields.Integer, description='A list of co-authors given by user_id.'),
    'shared_with': fields.List(fields.Integer, description='A list of users to share calculations with given by user_id.'),
    '_upload_time': RFC3339DateTime(description='Override the upload time.'),
    '_uploader': fields.Integer(description='Override the uploader with the given user id.'),
    'datasets': fields.List(fields.Nested(model=dataset_model, skip_none=True), description='A list of datasets.')
})

calc_metadata_model = api.inherit('CalcMetaData', metadata_model, {
    'mainfile': fields.String(description='The calculation main output file is used to identify the calculation in the upload.'),
    '_pid': fields.Integer(description='Assign a specific pid. It must be unique.')
})

upload_metadata_model = api.inherit('UploadMetaData', metadata_model, {
    'calculations': fields.List(fields.Nested(model=calc_metadata_model, skip_none=True), description='Specific per calculation data that will override the upload data.')
})

upload_model = api.inherit('UploadProcessing', proc_model, {
    'name': fields.String(
        description='The name of the upload. This can be provided during upload '
                    'using the name query parameter.'),
    'upload_id': fields.String(
        description='The unique id for the upload.'),
    # TODO just removed during migration, where this gets particularly large
    # 'metadata': fields.Nested(model=upload_metadata_model, description='Additional upload and calculation meta data.', skip_none=True),
    'upload_path': fields.String(description='The uploaded file on the server'),
    'published': fields.Boolean(description='If this upload is already published'),
    'upload_time': RFC3339DateTime(),
})

upload_list_model = api.model('UploadList', {
    'pagination': fields.Nested(model=pagination_model),
    'results': fields.List(fields.Nested(model=upload_model, skip_none=True))
})

calc_model = api.inherit('UploadCalculationProcessing', proc_model, {
    'calc_id': fields.String,
    'mainfile': fields.String,
    'upload_id': fields.String,
    'parser': fields.String
})

upload_with_calcs_model = api.inherit('UploadWithPaginatedCalculations', upload_model, {
    'processed_calcs': fields.Integer,
    'total_calcs': fields.Integer,
    'failed_calcs': fields.Integer,
    'pending_calcs': fields.Integer,
    'calcs': fields.Nested(model=api.model('UploadPaginatedCalculations', {
        'pagination': fields.Nested(model=api.inherit('UploadCalculationPagination', pagination_model, {
            'successes': fields.Integer,
            'failures': fields.Integer,
        })),
        'results': fields.List(fields.Nested(model=calc_model, skip_none=True))
    }), skip_none=True)
})

upload_operation_model = api.model('UploadOperation', {
    'operation': fields.String(description='The operation to execute. Currently publish and re-process are supported.'),
    'metadata': fields.Nested(model=upload_metadata_model, description='Additional upload and calculation meta data. Will replace previously given metadata.')
})
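# A hedged example of an operation request body for the POST endpoint further below
# (illustrative values only; field names are taken from the models above):
#
#     {
#         "operation": "publish",
#         "metadata": {
#             "comment": "Example DFT calculations",
#             "references": ["https://doi.org/10.0000/example"],
#             "with_embargo": false
#         }
#     }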


upload_metadata_parser = api.parser()
upload_metadata_parser.add_argument('name', type=str, help='An optional name for the upload.', location='args')
upload_metadata_parser.add_argument('local_path', type=str, help='Use a local file on the server.', location='args')
upload_metadata_parser.add_argument('curl', type=bool, help='Return a human readable plain text message as the response body.', location='args')
upload_metadata_parser.add_argument('file', type=FileStorage, help='The file to upload.', location='files')

upload_list_parser = pagination_request_parser.copy()
upload_list_parser.add_argument('state', type=str, help='List uploads with given state: all, unpublished, published.', location='args')
upload_list_parser.add_argument('name', type=str, help='Filter for uploads with the given name.', location='args')
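# A hedged example request against the list endpoint defined below (the page and
# per_page parameters are assumed to come from pagination_request_parser; the
# X-Token header matches the auth used by the upload commands further down):
#
#     curl -H "X-Token: <token>" "<api_url>/uploads/?state=published&name=<name>&page=1&per_page=10"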


def disable_marshalling(f):
    """ Pass through responses raised via DisableMarshalling instead of marshalling them. """
    @wraps(f)
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except DisableMarshalling as e:
            return e.un_marshalled

    return wrapper


def marshal_with(*args, **kwargs):
    """
    A special version of the RESTPlus marshal_with decorator that allows disabling
    marshalling at runtime by raising DisableMarshalling.
    """
    def decorator(func):
        @api.marshal_with(*args, **kwargs)
        def with_marshalling(*args, **kwargs):
            return func(*args, **kwargs)

        @wraps(with_marshalling)
        def wrapper(*args, **kwargs):
            try:
                return with_marshalling(*args, **kwargs)
            except DisableMarshalling as e:
                return e.un_marshalled

        return wrapper
    return decorator


class DisableMarshalling(Exception):
    """
    Raise within a marshalled endpoint to bypass marshalling and return the given
    body, status, and headers as a plain flask Response.
    """
    def __init__(self, body, status, headers):
        super().__init__()
        self.un_marshalled = Response(body, status=status, headers=headers)

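# A brief usage sketch (illustration only): an endpoint wrapped with the custom
# marshal_with above can skip JSON marshalling for a single response by raising
# DisableMarshalling, e.g.
#
#     raise DisableMarshalling('plain text body', 200, {'Content-Type': 'text/plain; charset=utf-8'})
#
# This is how the PUT handler below returns a plain text message to curl clients.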

@ns.route('/')
class UploadListResource(Resource):
    @api.doc('get_uploads')
    @api.response(400, 'Bad parameters')
    @api.marshal_with(upload_list_model, skip_none=True, code=200, description='Uploads sent')
    @api.expect(upload_list_parser)
    @login_really_required
    def get(self):
        """ Get the list of all uploads from the authenticated user. """
        try:
            state = request.args.get('state', 'unpublished')
            name = request.args.get('name', None)
            page = int(request.args.get('page', 1))
            per_page = int(request.args.get('per_page', 10))
        except Exception:
            abort(400, message='bad parameter types')

        try:
            assert page >= 1
            assert per_page > 0
        except AssertionError:
            abort(400, message='invalid pagination')

        query_kwargs = {}
        if state == 'published':
            query_kwargs.update(published=True)
        elif state == 'unpublished':
            query_kwargs.update(published=False)
        elif state == 'all':
            pass
        else:
            abort(400, message='bad state value %s' % state)

        if name is not None:
            query_kwargs.update(name=name)

        uploads = Upload.user_uploads(g.user, **query_kwargs)
        total = uploads.count()

        results = [
            upload
            for upload in uploads.order_by('-upload_time')[(page - 1) * per_page: page * per_page]]

        return dict(
            pagination=dict(total=total, page=page, per_page=per_page),
            results=results), 200

    @api.doc('upload')
    @api.expect(upload_metadata_parser)
    @api.response(400, 'Too many uploads')
    @marshal_with(upload_model, skip_none=True, code=200, description='Upload received')
    @login_really_required
    @with_logger
    def put(self, logger):
        """
        Upload a file and automatically create a new upload in the process.
        Can be used to upload files via browser or other http clients like curl.
        This will also start the processing of the upload.

        There are two basic ways to upload a file: multipart-formdata or simply streaming
        the file data. Both are supported. The latter does not allow to transfer a
        filename or other meta-data. If a filename is available, it will become the
        name of the upload.

        Example commands:

            curl -X put ".../nomad/api/uploads/" -F file=@local_file
            curl ".../nomad/api/uploads/" --upload-file local_file

        There is a general limit on how many unpublished uploads a user can have. The
        request will return 400 if this limit is exceeded.
        """
        # check existence of local_path if local_path is used
        local_path = request.args.get('local_path')
        if local_path:
            if not os.path.exists(local_path):
                abort(404, message='The given local_path was not found.')

        # check the upload limit
        if not g.user.is_admin:
            if Upload.user_uploads(g.user, published=False).count() >= config.services.upload_limit:
                abort(400, 'Limit of unpublished uploads exceeded for user.')

        upload_name = request.args.get('name')
        upload_id = utils.create_uuid()

        logger = logger.bind(upload_id=upload_id, upload_name=upload_name)
        logger.info('upload created')

        try:
            if local_path:
                # file is already there and does not need to be received
                upload_path = local_path
            elif request.mimetype in ['multipart/form-data', 'application/multipart-formdata']:
                logger.info('receive upload as multipart formdata')
                upload_path = files.PathObject(config.fs.tmp, upload_id).os_path
                # multipart formdata, e.g. with curl -X put "url" -F file=@local_file
                # might have performance issues for large files: https://github.com/pallets/flask/issues/2086
                if 'file' not in request.files:
                    abort(400, message='Bad multipart-formdata, there is no file part.')
                file = request.files['file']
                if upload_name is None or upload_name == '':
                    upload_name = file.filename

                file.save(upload_path)
            else:
                # simple streaming data in HTTP body, e.g. with curl "url" -T local_file
                logger.info('started to receive upload streaming data', mimetype=request.mimetype)
                upload_path = files.PathObject(config.fs.tmp, upload_id).os_path

                try:
                    with open(upload_path, 'wb') as f:
                        received_data = 0
                        received_last = 0
                        while True:
                            data = request.stream.read(io.DEFAULT_BUFFER_SIZE)
                            if len(data) == 0:
                                break

                            received_data += len(data)
                            received_last += len(data)
                            if received_last > 1e9:
                                received_last = 0
                                # TODO remove this logging or reduce it to debug
                                logger.info('received streaming data', size=received_data)
                            f.write(data)

                        # log the total size of the received stream
                        logger.info('received streaming data', size=received_data)

                except Exception as e:
                    logger.warning('Error on streaming upload', exc_info=e)
                    abort(400, message='Some IO went wrong, upload probably aborted/disrupted.')
        except Exception as e:
            if not local_path and os.path.isfile(upload_path):
                os.remove(upload_path)
            logger.info('Invalid or aborted upload')
            raise e

        logger.info('received uploaded file')

        upload = Upload.create(
            upload_id=upload_id,
            user=g.user,
            name=upload_name,
            upload_time=datetime.utcnow(),
            upload_path=upload_path,
            temporary=local_path != upload_path)

        upload.process_upload()
        logger.info('initiated processing')

        if bool(request.args.get('curl', False)):
            raise DisableMarshalling(
                '''
Thanks for uploading your data to nomad.
Go back to %s and press reload to see the progress on your upload and publish your data.

''' % upload.gui_url,
                200, {'Content-Type': 'text/plain; charset=utf-8'})

        return upload, 200


class ProxyUpload:
    """
    Wraps an upload and replaces its calcs with paginated calculation data for
    marshalling, while proxying all other attribute access to the wrapped upload.
    """
    def __init__(self, upload, calcs):
        self.upload = upload
        self.calcs = calcs

    def __getattr__(self, name):
        return self.upload.__getattribute__(name)


@upload_route(ns)
class UploadResource(Resource):
    @api.doc('get_upload')
    @api.response(404, 'Upload does not exist')
    @api.response(400, 'Invalid parameters')
    @api.marshal_with(upload_with_calcs_model, skip_none=True, code=200, description='Upload sent')
    @api.expect(pagination_request_parser)
    @login_really_required
    def get(self, upload_id: str):
        """
        Get an update for an existing upload.

        This will not only return the upload, but also its calculations, paginated.
        Use the pagination params to determine the page.
        """
        try:
            upload = Upload.get(upload_id)
        except KeyError:
            abort(404, message='Upload with id %s does not exist.' % upload_id)

        if upload.user_id != str(g.user.user_id) and not g.user.is_admin:
            abort(404, message='Upload with id %s does not exist.' % upload_id)

        try:
            page = int(request.args.get('page', 1))
            per_page = int(request.args.get('per_page', 10))
            order_by = request.args.get('order_by', None)
            order = int(str(request.args.get('order', -1)))
        except Exception:
            abort(400, message='invalid pagination or ordering')

        try:
            assert page >= 1
            assert per_page > 0
        except AssertionError:
            abort(400, message='invalid pagination')

        if order_by is not None:
            order_by = str(order_by)
            if order_by not in ['mainfile', 'tasks_status', 'parser']:
                abort(400, message='invalid order_by field %s' % order_by)

            order_by = ('-%s' if order == -1 else '+%s') % order_by

        calcs = upload.all_calcs((page - 1) * per_page, page * per_page, order_by=order_by)
        failed_calcs = upload.failed_calcs
        result = ProxyUpload(upload, {
            'pagination': dict(
                total=upload.total_calcs, page=page, per_page=per_page,
                successes=upload.processed_calcs - failed_calcs, failures=failed_calcs),
            'results': [calc for calc in calcs]
        })

        return result, 200

    @api.doc('delete_upload')
    @api.response(404, 'Upload does not exist')
    @api.response(401, 'Upload does not belong to authenticated user.')
    @api.response(400, 'The upload is still/already processed')
    @api.marshal_with(upload_model, skip_none=True, code=200, description='Upload deleted')
    @login_really_required
    @with_logger
    def delete(self, upload_id: str, logger):
        """
        Delete an existing upload.

        Only uploads that are still in staging, not already deleted, not still uploaded, and
        not currently processed, can be deleted.
        """
        try:
            upload = Upload.get(upload_id)
        except KeyError:
            abort(404, message='Upload with id %s does not exist.' % upload_id)

        if upload.user_id != str(g.user.user_id) and not g.user.is_admin:
            abort(401, message='Upload with id %s does not belong to you.' % upload_id)

        if upload.published:
            abort(400, message='The upload is already published')

        if upload.tasks_running:
            abort(400, message='The upload is not processed yet')

        try:
            upload.delete_upload()
        except ProcessAlreadyRunning:
            abort(400, message='The upload is still processed')
        except Exception as e:
            logger.error('could not delete processing upload', exc_info=e)
            raise e

        return upload, 200

    @api.doc('exec_upload_operation')
    @api.response(404, 'Upload does not exist or is not in staging')
    @api.response(400, 'Operation is not supported or the upload is still/already processed')
    @api.response(401, 'If the operation is not allowed for the current user')
    @api.marshal_with(upload_model, skip_none=True, code=200, description='Upload published successfully')
    @api.expect(upload_operation_model)
    @login_really_required
    def post(self, upload_id):
        """
        Execute an upload operation. Available operations are ``publish`` and ``re-process``.

        Publish accepts further meta data that allows providing coauthors, comments,
        external references, etc. See the model for details. The fields that start with
        ``_underscore`` are only available for users with administrative privileges.

        Publish changes the visibility of the upload. Clients can specify the visibility
        via meta data.

        Re-process will re-process the upload and produce updated repository metadata and
        archive data. Only published uploads that are not currently processing are allowed,
        and only if their calculations have been processed with an older nomad version.
        """
        try:
            upload = Upload.get(upload_id)
        except KeyError:
            abort(404, message='Upload with id %s does not exist.' % upload_id)

        if upload.user_id != str(g.user.user_id) and not g.user.is_admin:
            abort(404, message='Upload with id %s does not exist.' % upload_id)

        json_data = request.get_json()
        if json_data is None:
            json_data = {}

        operation = json_data.get('operation')

        metadata = json_data.get('metadata', {})
        for key in metadata:
            if key.startswith('_'):
                if not g.user.is_admin:
                    abort(401, message='Only admin users can use metadata keys that start with an underscore.')
                break

        if operation == 'publish':
            if upload.tasks_running:
                abort(400, message='The upload is not processed yet')
            if upload.tasks_status == FAILURE:
                abort(400, message='Cannot publish an upload that failed processing')
            if upload.processed_calcs == 0:
                abort(400, message='Cannot publish an upload without calculations')
            try:
                upload.compress_and_set_metadata(metadata)
                upload.publish_upload()
            except ProcessAlreadyRunning:
                abort(400, message='The upload is still/already processed')

            return upload, 200
        elif operation == 're-process':
            if upload.tasks_running or upload.process_running or not upload.published:
                abort(400, message='Can only re-process published uploads that are not currently processing')

            if len(metadata) > 0:
                abort(400, message='You can not provide metadata for re-processing')

            if len(upload.outdated_calcs) == 0:
                abort(400, message='You can only re-process uploads with at least one outdated calculation')

            upload.reset()
            upload.re_process_upload()

            return upload, 200

        abort(400, message='Unsupported operation %s.' % operation)


upload_command_model = api.model('UploadCommand', {
    'upload_url': fields.Url,
    'upload_command': fields.String,
    'upload_command_with_name': fields.String,
    'upload_progress_command': fields.String,
    'upload_command_form': fields.String,
    'upload_tar_command': fields.String
})


@ns.route('/command')
class UploadCommandResource(Resource):
    @api.doc('get_upload_command')
    @api.marshal_with(upload_command_model, code=200, description='Upload command sent')
    @login_really_required
    def get(self):
        """ Get url and example command for shell based uploads. """
        upload_url = '%s/uploads/?curl=True' % config.api_url(ssl=False)
        upload_url_with_name = upload_url + '&name=<name>'

        # upload_command = 'curl -X PUT -H "X-Token: %s" "%s" -F file=@<local_file>' % (
        #     g.user.get_auth_token().decode('utf-8'), upload_url)

        # Upload via streaming data tends to work much easier, e.g. no mime type issues, etc.
        # It is also easier for the user to understand IMHO.
        upload_command = 'curl -H X-Token:%s %s -T <local_file>' % (
            g.user.get_auth_token().decode('utf-8'), upload_url)

        upload_command_form = 'curl -H X-Token:%s %s -X PUT -F file=@<local_file>' % (
            g.user.get_auth_token().decode('utf-8'), upload_url)

        upload_command_with_name = 'curl -H X-Token:%s "%s" -X PUT -T <local_file>' % (
            g.user.get_auth_token().decode('utf-8'), upload_url_with_name)

        upload_progress_command = upload_command + ' | xargs echo'
        upload_tar_command = 'tar -cf - <local_folder> | curl -# -H X-Token:%s %s -T - | xargs echo' % (
            g.user.get_auth_token().decode('utf-8'), upload_url)

        return dict(
            upload_url=upload_url,
            upload_command=upload_command,
            upload_command_with_name=upload_command_with_name,
            upload_progress_command=upload_progress_command,
            upload_command_form=upload_command_form,
            upload_tar_command=upload_tar_command), 200