Skip to content
Snippets Groups Projects
upload.py 21.68 KiB
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an"AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
The upload API of the nomad@FAIRDI APIs. Provides endpoints to upload files and
get the processing status of uploads.
"""

from flask import g, request, Response
from flask_restplus import Resource, fields, abort
from datetime import datetime
from werkzeug.datastructures import FileStorage
import os.path
import os
import io
from functools import wraps

from nomad import config, utils, files, search, datamodel
from nomad.processing import Upload, FAILURE
from nomad.processing import ProcessAlreadyRunning
from nomad.app import common
from nomad.app.common import RFC3339DateTime

from .api import api
from .auth import authenticate, generate_upload_token
from .common import pagination_request_parser, pagination_model, upload_route, metadata_model


ns = api.namespace(
    'uploads',
    description='Uploading data and tracing uploaded data and its processing.')


class CalcMetadata(fields.Raw):
    def format(self, value):
        calc_with_metadata = datamodel.CalcWithMetadata(**value)
        return search.Entry.from_calc_with_metadata(calc_with_metadata).to_dict()


proc_model = api.model('Processing', {
    'tasks': fields.List(fields.String),
    'current_task': fields.String,
    'tasks_running': fields.Boolean,
    'tasks_status': fields.String,
    'errors': fields.List(fields.String),
    'warnings': fields.List(fields.String),
    'create_time': RFC3339DateTime,
    'complete_time': RFC3339DateTime,
    'current_process': fields.String,
    'process_running': fields.Boolean,
})

calc_metadata_model = api.inherit('CalcMetaData', metadata_model, {
    'mainfile': fields.String(description='The calculation main output file is used to identify the calculation in the upload.'),
    '_pid': fields.Integer(description='Assign a specific pid. It must be unique.'),
    'external_id': fields.String(description='External user provided id. Does not have to be unique necessarily.')
})

upload_metadata_model = api.inherit('UploadMetaData', metadata_model, {
    'embargo_length': fields.Integer(description='Length of the requested embargo in months.'),
    'calculations': fields.List(fields.Nested(model=calc_metadata_model, skip_none=True), description='Specific per calculation data that will override the upload data.')
})

upload_model = api.inherit('UploadProcessing', proc_model, {
    'name': fields.String(
        description='The name of the upload. This can be provided during upload '
                    'using the name query parameter.'),
    'upload_id': fields.String(
        description='The unique id for the upload.'),
    # TODO just removed during migration, where this get particularily large
    # 'metadata': fields.Nested(model=upload_metadata_model, description='Additional upload and calculation meta data.', skip_none=True),
    'upload_path': fields.String(description='The uploaded file on the server'),
    'published': fields.Boolean(description='If this upload is already published'),
    'upload_time': RFC3339DateTime(),
})

upload_list_model = api.model('UploadList', {
    'pagination': fields.Nested(model=pagination_model),
    'results': fields.List(fields.Nested(model=upload_model, skip_none=True))
})

calc_model = api.inherit('UploadCalculationProcessing', proc_model, {
    'calc_id': fields.String,
    'mainfile': fields.String,
    'upload_id': fields.String,
    'parser': fields.String,
    'metadata': CalcMetadata(description='The repository metadata for this entry.')
})

upload_with_calcs_model = api.inherit('UploadWithPaginatedCalculations', upload_model, {
    'processed_calcs': fields.Integer,
    'total_calcs': fields.Integer,
    'failed_calcs': fields.Integer,
    'pending_calcs': fields.Integer,
    'calcs': fields.Nested(model=api.model('UploadPaginatedCalculations', {
        'pagination': fields.Nested(model=api.inherit('UploadCalculationPagination', pagination_model, {
            'successes': fields.Integer,
            'failures': fields.Integer,
        })),
        'results': fields.List(fields.Nested(model=calc_model, skip_none=True))
    }), skip_none=True)
})

upload_operation_model = api.model('UploadOperation', {
    'operation': fields.String(description='Currently publish is the only operation.'),
    'metadata': fields.Nested(model=upload_metadata_model, description='Additional upload and calculation meta data. Will replace previously given metadata.')
})


upload_metadata_parser = api.parser()
upload_metadata_parser.add_argument('name', type=str, help='An optional name for the upload.', location='args')
upload_metadata_parser.add_argument('local_path', type=str, help='Use a local file on the server.', location='args')
upload_metadata_parser.add_argument('token', type=str, help='Upload token to authenticate with curl command.', location='args')
upload_metadata_parser.add_argument('file', type=FileStorage, help='The file to upload.', location='files')

upload_list_parser = pagination_request_parser.copy()
upload_list_parser.add_argument('state', type=str, help='List uploads with given state: all, unpublished, published.', location='args')
upload_list_parser.add_argument('name', type=str, help='Filter for uploads with the given name.', location='args')


def disable_marshalling(f):
    @wraps(f)
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except DisableMarshalling as e:
            return e.un_marshalled

    return wrapper


def marshal_with(*args, **kwargs):
    """
    A special version of the RESTPlus marshal_with decorator that allows to disable
    marshalling at runtime by raising DisableMarshalling.
    """
    def decorator(func):
        @api.marshal_with(*args, **kwargs)
        def with_marshalling(*args, **kwargs):
            return func(*args, **kwargs)

        @wraps(with_marshalling)
        def wrapper(*args, **kwargs):
            try:
                return with_marshalling(*args, **kwargs)
            except DisableMarshalling as e:
                return e.un_marshalled

        return wrapper
    return decorator


class DisableMarshalling(Exception):
    def __init__(self, body, status, headers):
        super().__init__()
        self.un_marshalled = Response(body, status=status, headers=headers)


@ns.route('/')
class UploadListResource(Resource):
    @api.doc('get_uploads')
    @api.response(400, 'Bad parameters')
    @api.marshal_with(upload_list_model, skip_none=True, code=200, description='Uploads send')
    @api.expect(upload_list_parser)
    @authenticate(required=True)
    def get(self):
        """ Get the list of all uploads from the authenticated user. """
        try:
            state = request.args.get('state', 'unpublished')
            name = request.args.get('name', None)
            page = int(request.args.get('page', 1))
            per_page = int(request.args.get('per_page', 10))
        except Exception:
            abort(400, message='bad parameter types')

        try:
            assert page >= 1
            assert per_page > 0
        except AssertionError:
            abort(400, message='invalid pagination')

        query_kwargs = {}
        if state == 'published':
            query_kwargs.update(published=True)
        elif state == 'unpublished':
            query_kwargs.update(published=False)
        elif state == 'all':
            pass
        else:
            abort(400, message='bad state value %s' % state)

        if name is not None:
            query_kwargs.update(name=name)

        uploads = Upload.user_uploads(g.user, **query_kwargs)
        total = uploads.count()

        results = [
            upload
            for upload in uploads.order_by('published', '-upload_time')[(page - 1) * per_page: page * per_page]]

        return dict(
            pagination=dict(total=total, page=page, per_page=per_page),
            results=results), 200

    @api.doc('upload')
    @api.expect(upload_metadata_parser)
    @api.response(400, 'To many uploads')
    @marshal_with(upload_model, skip_none=True, code=200, description='Upload received')
    @authenticate(required=True, upload_token=True)
    def put(self):
        """
        Upload a file and automatically create a new upload in the process.
        Can be used to upload files via browser or other http clients like curl.
        This will also start the processing of the upload.

        There are two basic ways to upload a file: multipart-formdata or simply streaming
        the file data. Both are supported. The later one does not allow to transfer a
        filename or other meta-data. If a filename is available, it will become the
        name of the upload.

        Example commands:

            curl -X put ".../nomad/api/uploads/" -F file=@local_file
            curl ".../nomad/api/uploads/" --upload-file local_file

        There is a general limit on how many unpublished uploads a user can have. Will
        return 400 if this limit is exceeded.
        """
        # check existence of local_path if local_path is used
        local_path = request.args.get('local_path')
        if local_path:
            if not os.path.exists(local_path):
                abort(404, message='The given local_path was not found.')

        # check the upload limit
        if not g.user.is_admin:
            if Upload.user_uploads(g.user, published=False).count() >= config.services.upload_limit:
                abort(400, 'Limit of unpublished uploads exceeded for user.')

        upload_name = request.args.get('name')
        upload_id = utils.create_uuid()

        logger = common.logger.bind(upload_id=upload_id, upload_name=upload_name)
        logger.info('upload created', )

        try:
            if local_path:
                # file is already there and does not to be received
                upload_path = local_path
            elif request.mimetype in ['multipart/form-data', 'application/multipart-formdata']:
                logger.info('receive upload as multipart formdata')
                upload_path = files.PathObject(config.fs.tmp, upload_id).os_path
                # multipart formdata, e.g. with curl -X put "url" -F file=@local_file
                # might have performance issues for large files: https://github.com/pallets/flask/issues/2086
                if 'file' not in request.files:
                    abort(400, message='Bad multipart-formdata, there is no file part.')
                file = request.files['file']
                if upload_name is None or upload_name is '':
                    upload_name = file.filename

                file.save(upload_path)
            else:
                # simple streaming data in HTTP body, e.g. with curl "url" -T local_file
                logger.info('started to receive upload streaming data')
                upload_path = files.PathObject(config.fs.tmp, upload_id).os_path

                try:
                    with open(upload_path, 'wb') as f:
                        received_data = 0
                        received_last = 0
                        while True:
                            data = request.stream.read(io.DEFAULT_BUFFER_SIZE)
                            if len(data) == 0:
                                break

                            received_data += len(data)
                            received_last += len(data)
                            if received_last > 1e9:
                                received_last = 0
                                # TODO remove this logging or reduce it to debug
                                logger.info('received streaming data', size=received_data)
                            f.write(data)

                except Exception as e:
                    logger.warning('Error on streaming upload', exc_info=e)
                    abort(400, message='Some IO went wrong, download probably aborted/disrupted.')
        except Exception as e:
            if not local_path and os.path.isfile(upload_path):
                os.remove(upload_path)
            logger.info('Invalid or aborted upload')
            raise e

        logger.info('received uploaded file')

        upload = Upload.create(
            upload_id=upload_id,
            user=g.user,
            name=upload_name,
            upload_time=datetime.utcnow(),
            upload_path=upload_path,
            temporary=local_path != upload_path)

        upload.process_upload()
        logger.info('initiated processing')

        if bool(request.args.get('token', False)):
            raise DisableMarshalling(
                '''
Thanks for uploading your data to nomad.
Go back to %s and press reload to see the progress on your upload and publish your data.

''' % config.gui_url(),
                200, {'Content-Type': 'text/plain; charset=utf-8'})

        return upload, 200


class ProxyUpload:
    def __init__(self, upload, calcs):
        self.upload = upload
        self.calcs = calcs

    def __getattr__(self, name):
        return self.upload.__getattribute__(name)


@upload_route(ns)
class UploadResource(Resource):
    @api.doc('get_upload')
    @api.response(404, 'Upload does not exist')
    @api.response(400, 'Invalid parameters')
    @api.marshal_with(upload_with_calcs_model, skip_none=True, code=200, description='Upload send')
    @api.expect(pagination_request_parser)
    @authenticate(required=True)
    def get(self, upload_id: str):
        """
        Get an update for an existing upload.

        Will not only return the upload, but also its calculations paginated.
        Use the pagination params to determine the page.
        """
        try:
            upload = Upload.get(upload_id)
        except KeyError:
            abort(404, message='Upload with id %s does not exist.' % upload_id)

        if upload.user_id != str(g.user.user_id) and not g.user.is_admin:
            abort(404, message='Upload with id %s does not exist.' % upload_id)

        try:
            page = int(request.args.get('page', 1))
            per_page = int(request.args.get('per_page', 10))
            order_by = request.args.get('order_by', None)
            order = int(str(request.args.get('order', -1)))
        except Exception:
            abort(400, message='invalid pagination or ordering')

        try:
            assert page >= 1
            assert per_page > 0
        except AssertionError:
            abort(400, message='invalid pagination')

        if order_by is not None:
            order_by = str(order_by)
            if order_by not in ['mainfile', 'tasks_status', 'parser']:
                abort(400, message='invalid order_by field %s' % order_by)

            order_by = ('-%s' if order == -1 else '+%s') % order_by

        calcs = upload.all_calcs((page - 1) * per_page, page * per_page, order_by=order_by)
        failed_calcs = upload.failed_calcs
        result = ProxyUpload(upload, {
            'pagination': dict(
                total=upload.total_calcs, page=page, per_page=per_page,
                successes=upload.processed_calcs - failed_calcs, failures=failed_calcs),
            'results': [calc for calc in calcs]
        })

        return result, 200

    @api.doc('delete_upload')
    @api.response(404, 'Upload does not exist')
    @api.response(401, 'Upload does not belong to authenticated user.')
    @api.response(400, 'The upload is still/already processed')
    @api.marshal_with(upload_model, skip_none=True, code=200, description='Upload deleted')
    @authenticate(required=True)
    def delete(self, upload_id: str):
        """
        Delete an existing upload.

        Only uploads that are sill in staging, not already deleted, not still uploaded, and
        not currently processed, can be deleted.
        """
        try:
            upload = Upload.get(upload_id)
        except KeyError:
            abort(404, message='Upload with id %s does not exist.' % upload_id)

        if upload.user_id != str(g.user.user_id) and not g.user.is_admin:
            abort(401, message='Upload with id %s does not belong to you.' % upload_id)

        if upload.published:
            abort(400, message='The upload is already published')

        if upload.tasks_running:
            abort(400, message='The upload is not processed yet')

        try:
            upload.delete_upload()
        except ProcessAlreadyRunning:
            abort(400, message='The upload is still processed')
        except Exception as e:
            common.logger.error('could not delete processing upload', exc_info=e)
            raise e

        return upload, 200

    @api.doc('exec_upload_operation')
    @api.response(404, 'Upload does not exist or not in staging')
    @api.response(400, 'Operation is not supported or the upload is still/already processed')
    @api.response(401, 'If the operation is not allowed for the current user')
    @api.marshal_with(upload_model, skip_none=True, code=200, description='Upload published successfully')
    @api.expect(upload_operation_model)
    @authenticate(required=True)
    def post(self, upload_id):
        """
        Execute an upload operation. Available operations are ``publish`` and ``re-process``

        Publish accepts further meta data that allows to provide coauthors, comments,
        external references, etc. See the model for details. The fields that start with
        ``_underscore`` are only available for users with administrative privileges.

        Publish changes the visibility of the upload. Clients can specify the visibility
        via meta data.

        Re-process will re-process the upload and produce updated repository metadata and
        archive. Only published uploads that are not processing at the moment are allowed.
        Only for uploads where calculations have been processed with an older nomad version.
        """
        try:
            upload = Upload.get(upload_id)
        except KeyError:
            abort(404, message='Upload with id %s does not exist.' % upload_id)

        if upload.user_id != str(g.user.user_id) and not g.user.is_admin:
            abort(404, message='Upload with id %s does not exist.' % upload_id)

        json_data = request.get_json()
        if json_data is None:
            json_data = {}

        operation = json_data.get('operation')

        metadata = json_data.get('metadata', {})
        for key in metadata:
            if key.startswith('_'):
                if not g.user.is_admin:
                    abort(401, message='Only admin users can use _metadata_keys.')
                break

        if operation == 'publish':
            if upload.tasks_running:
                abort(400, message='The upload is not processed yet')
            if upload.tasks_status == FAILURE:
                abort(400, message='Cannot publish an upload that failed processing')
            if upload.processed_calcs == 0:
                abort(400, message='Cannot publish an upload without calculations')
            try:
                upload.compress_and_set_metadata(metadata)
                upload.publish_upload()
            except ProcessAlreadyRunning:
                abort(400, message='The upload is still/already processed')

            return upload, 200
        elif operation == 're-process':
            if upload.tasks_running or upload.process_running or not upload.published:
                abort(400, message='Can only non processing, re-process published uploads')

            if len(metadata) > 0:
                abort(400, message='You can not provide metadata for re-processing')

            if len(upload.outdated_calcs) == 0:
                abort(400, message='You can only re-process uploads with at least one outdated calculation')

            upload.reset()
            upload.re_process_upload()

            return upload, 200

        abort(400, message='Unsupported operation %s.' % operation)


upload_command_model = api.model('UploadCommand', {
    'upload_url': fields.Url,
    'upload_command': fields.String,
    'upload_command_with_name': fields.String,
    'upload_progress_command': fields.String,
    'upload_command_form': fields.String,
    'upload_tar_command': fields.String
})


@ns.route('/command')
class UploadCommandResource(Resource):
    @api.doc('get_upload_command')
    @api.marshal_with(upload_command_model, code=200, description='Upload command send')
    @authenticate(required=True)
    def get(self):
        """ Get url and example command for shell based uploads. """
        token = generate_upload_token(g.user)
        upload_url = '%s/uploads/?token=%s' % (config.api_url(ssl=False), token)
        upload_url_with_name = upload_url + '&name=<name>'

        # upload_command = 'curl -X PUT "%s" -F file=@<local_file>' % upload_url

        # Upload via streaming data tends to work much easier, e.g. no mime type issues, etc.
        # It is also easier for the user to unterstand IMHO.
        upload_command = 'curl %s -T <local_file>' % upload_url

        upload_command_form = 'curl %s -X PUT -F file=@<local_file>' % upload_url

        upload_command_with_name = 'curl "%s" -X PUT -T <local_file>' % upload_url_with_name

        upload_progress_command = upload_command + ' | xargs echo'
        upload_tar_command = 'tar -cf - <local_folder> | curl -# -H %s -T - | xargs echo' % upload_url

        return dict(
            upload_url=upload_url,
            upload_command=upload_command,
            upload_command_with_name=upload_command_with_name,
            upload_progress_command=upload_progress_command,
            upload_command_form=upload_command_form,
            upload_tar_command=upload_tar_command), 200