diff --git a/nomad/api/__init__.py b/nomad/api/__init__.py
index 488b659de445754b39d736a33a02bcc64b44ddc2..e42758024b8462efde23a509fd2ab736a772983f 100644
--- a/nomad/api/__init__.py
+++ b/nomad/api/__init__.py
@@ -29,7 +29,7 @@ There is a separate documentation for the API endpoints from a client perspectiv
 .. automodule:: nomad.api.admin
 """
 from .app import app
-from . import auth, admin, upload, repository, archive, raw, upload_v2
+from . import auth, admin, upload, repository, archive, raw
 
 
 @app.before_first_request
diff --git a/nomad/api/app.py b/nomad/api/app.py
index 855a8389b3b21faf03c782d37a90f973aeb176a9..0a16c3e0de999421e599edcb6a32445ffc9a49f8 100644
--- a/nomad/api/app.py
+++ b/nomad/api/app.py
@@ -33,9 +33,20 @@ app = Flask(
     static_folder=os.path.abspath(os.path.join(os.path.dirname(__file__), '../../docs/.build/html')))
 """ The Flask app that serves all APIs. """
 
+app.config.setdefault('RESTPLUS_MASK_HEADER', False)
+app.config.setdefault('RESTPLUS_MASK_SWAGGER', False)
+
 CORS(app)
 
-api = Api(app)
+authorizations = {
+    'HTTP Basic': {
+        'type': 'basic'
+    }
+}
+
+api = Api(
+    app, version='1.0', title='nomad@FAIRDI API', authorizations=authorizations,
+    description='Official API for nomad@FAIRDI services.')
 """ Provides the flask restful api instance """
 
diff --git a/nomad/api/auth.py b/nomad/api/auth.py
index 4bb1b9017c07ad42511812e6fff70cad864c108c..8cffd551263da96f4308959ad9fbf9071ca0ae33 100644
--- a/nomad/api/auth.py
+++ b/nomad/api/auth.py
@@ -42,7 +42,7 @@ from flask_httpauth import HTTPBasicAuth
 from nomad import config
 from nomad.coe_repo import User
 
-from .app import app, base_path
+from .app import app, api, base_path
 
 app.config['SECRET_KEY'] = config.services.api_secret
 auth = HTTPBasicAuth()
@@ -92,6 +92,8 @@ def login_really_required(func):
     A decorator for API endpoint implementations that forces user authentication on
     endpoints.
     """
+    @api.response(401, 'Not Authorized')
+    @api.doc(security=['HTTP Basic'])
     @login_if_available
     def wrapper(*args, **kwargs):
         if g.user is None:
diff --git a/nomad/api/upload.py b/nomad/api/upload.py
index e7eabb4575d7177e0f1029a5c919bec9c9f22a5d..9144a6bffe8006fdd1185767e76f770eafed3532 100644
--- a/nomad/api/upload.py
+++ b/nomad/api/upload.py
@@ -12,154 +12,103 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from datetime import datetime
+"""
+The upload API of the nomad@FAIRDI APIs. Provides endpoints to create uploads, upload
+files, and retrieve the processing status of uploads.
+"""
 
 from flask import g, request
-from flask_restplus import Resource, abort
+from flask_restplus import Resource, fields, abort
+from datetime import datetime
 
-from nomad.files import UploadFile
-from nomad.processing import NotAllowedDuringProcessing, Upload
+from nomad.processing import Upload as UploadProc
+from nomad.processing import NotAllowedDuringProcessing
 from nomad.utils import get_logger
+from nomad.files import UploadFile
 
 from .app import api, base_path
 from .auth import login_really_required
 
-"""
-The upload API of the nomad@FAIRDI APIs. Provides endpoints to create uploads, upload
-files, and retrieve the processing status of uploads.
-""" - -class UploadsRes(Resource): - """ Uploads """ +ns = api.namespace( + '%s/uploads' % base_path[1:] if base_path is not '' else 'uploads', + description='Uploading data and tracing uploaded data and its processing.') + + +proc_model = api.model('Processing', { + 'tasks': fields.List(fields.String), + 'current_task': fields.String, + 'status': fields.String, + 'completed': fields.Boolean, + 'errors': fields.List(fields.String), + 'warnings': fields.List(fields.String), + 'create_time': fields.DateTime(dt_format='iso8601'), + 'complete_time': fields.DateTime(dt_format='iso8601'), + '_async_status': fields.String(description='Only for debugging nomad') +}) + +upload_model = api.inherit('Upload', proc_model, { + 'name': fields.String( + description='The name of the upload. This can be provided during upload ' + 'using the name query parameter.'), + 'upload_id': fields.String( + description='The unique id for the upload. Its a random uuid and ' + 'and used within nomad as long as no upload_hash is available.'), + 'upload_hash': fields.String( + description='The unique upload hash. It is based on the uploaded content and ' + 'used within nomad to identify uploads.' + ), + 'additional_metadata': fields.Arbitrary, + 'upload_url': fields.String, + 'upload_command': fields.String, + 'local_path': fields.String, + 'upload_time': fields.DateTime(dt_format='iso8601'), +}) + +calc_model = api.inherit('Calculation', proc_model, { + 'archive_id': fields.String, + 'mainfile': fields.String, + 'upload_id': fields.String, + 'parser': fields.String +}) + +upload_with_calcs_model = api.inherit('UploadWithPaginatedCalculations', upload_model, { + 'processed_calcs': fields.Integer, + 'total_calcs': fields.Integer, + 'failed_calcs': fields.Integer, + 'pending_calcs': fields.Integer, + 'calcs': fields.Nested(model=api.model('PaginatedCalculations', { + 'pagination': fields.Nested(model=api.model('Pagination', { + 'total': fields.Integer, + 'successes': fields.Integer, + 'failures': fields.Integer, + 'page': fields.Integer, + 'per_page': fields.Integer, + })), + 'results': fields.List(fields.Nested(model=calc_model)) + })) +}) + +upload_operation_model = api.model('UploadOperation', { + 'operation': fields.String(description='Currently unstage is the only operation.') +}) + + +upload_metadata_parser = api.parser() +upload_metadata_parser.add_argument('name', type=str, help='An optional name for the upload.', location='args') +upload_metadata_parser.add_argument('local_path', type=str, help='Use a local file on the server.', location='args') + + +@ns.route('/') +class UploadList(Resource): + @api.marshal_list_with(upload_model, skip_none=True, code=200, description='Uploads send') @login_really_required def get(self): - """ - Get a list of current users uploads. - - .. :quickref: upload; Get a list of current users uploads. - - **Example request**: - - .. sourcecode:: http - - GET /nomad/api/uploads HTTP/1.1 - Accept: application/json - - **Example response**: - - .. 
-        .. sourcecode:: http
-
-            HTTP/1.1 200 OK
-            Vary: Accept
-            Content-Type: application/json
-
-            [
-                {
-                    "name": "examples_vasp_6.zip",
-                    "upload_id": "5b89469e0d80d40008077dbc",
-                    "presigned_url": "http://minio:9000/uploads/5b89469e0d80d40008077dbc?X-Amz-Algorithm=AWS4-...",
-                    "create_time": "2018-08-31T13:46:06.781000",
-                    "upload_time": "2018-08-31T13:46:07.531000",
-                    "is_stale": false,
-                    "completed": true,
-                    "status": "SUCCESS",
-                    "current_task": "cleanup",
-                    "tasks": ["uploading", "extracting", "parse_all", "cleanup"]
-                    "errors": [],
-                    "warnings": []
-                }
-            ]
-
-        :resheader Content-Type: application/json
-        :status 200: uploads successfully provided
-        :returns: list of :class:`nomad.data.Upload`
-        """
-        return [upload.json_dict for upload in Upload.user_uploads(g.user)], 200
-
-    @login_really_required
-    def post(self):
-        """
-        Create a new upload. Creating an upload on its own wont do much, but provide
-        a *presigned* upload URL. PUT a file to this URL to do the actual upload and
-        initiate the processing.
-
-        .. :quickref: upload; Create a new upload.
-
-        **Example request**:
-
-        .. sourcecode:: http
-
-            POST /nomad/api/uploads HTTP/1.1
-            Accept: application/json
-            Content-Type: application/json
-
-            {
-                "name": "vasp_data.zip"
-            }
-
-        **Example response**:
-
-        .. sourcecode:: http
-
-            HTTP/1.1 200 OK
-            Vary: Accept
-            Content-Type: application/json
-
-            {
-                "name": "vasp_data.zip",
-                "upload_id": "5b89469e0d80d40008077dbc",
-                "presigned_url": "http://minio:9000/uploads/5b89469e0d80d40008077dbc?X-Amz-Algorithm=AWS4-...",
-                "create_time": "2018-08-31T13:46:06.781000",
-                "upload_time": "2018-08-31T13:46:07.531000",
-                "is_stale": false,
-                "completed": true,
-                "status": "SUCCESS",
-                "current_task": "cleanup",
-                "tasks": ["uploading", "extracting", "parse_all", "cleanup"]
-                "errors": [],
-                "warnings": [],
-                "calcs": [
-                    {
-                        "current_task": "archiving",
-                        "tasks": ["parsing", "normalizing", "archiving"]
-                        "status": "SUCCESS",
-                        "errors": [],
-                        "warnings": [],
-                        "parser": "parsers/vasp",
-                        "mainfile": "Si.xml"
-                    }
-                ]
-            }
-
-        :jsonparam string name: An optional name for the upload.
-        :jsonparem string local_path: An optional path the a file that is already on the server.
-            In this case, uploading a file won't be possible, the local file is processed
-            immediatly as if it was uploaded.
-        :reqheader Content-Type: application/json
-        :resheader Content-Type: application/json
-        :status 200: upload successfully created
-        :returns: a new instance of :class:`nomad.data.Upload`
-        """
-        json_data = request.get_json()
-        if json_data is None:
-            json_data = {}
-
-        upload = Upload.create(
-            user=g.user,
-            name=json_data.get('name'),
-            local_path=json_data.get('local_path'))
-
-        if upload.local_path is not None:
-            logger = get_logger(
-                __name__, endpoint='uploads', action='post', upload_id=upload.upload_id)
-            logger.info('file uploaded offline')
-            upload.upload_time = datetime.now()
-            upload.process()
-            logger.info('initiated processing')
-
-        return upload.json_dict, 200
+        """ Get the list of all uploads from the authenticated user. """
+        return [upload for upload in UploadProc.user_uploads(g.user)], 200
 
+    @api.marshal_with(upload_model, skip_none=True, code=200, description='Upload received')
+    @api.expect(upload_metadata_parser)
     @login_really_required
     def put(self):
         """
@@ -172,30 +121,26 @@ class UploadsRes(Resource):
         filename or other meta-data. If a filename is available, it will become the
         name of the upload.
 
-        .. :quickref: upload; Upload a file directly and create an upload.
+        Example commands:
 
-        **Curl examples for both approaches**:
-
-        .. sourcecode:: sh
-
-            curl -X put "/nomad/api/uploads/" -F file=@local_file
-            curl "/nomad/api/uploads/" --upload-file local_file
-
-        :qparam name: an optional name for the upload
-        :status 200: upload successfully received.
-        :returns: the upload (see GET /uploads/<upload_id>)
+            curl -X PUT ".../nomad/api/uploads/" -F file=@local_file
+            curl ".../nomad/api/uploads/" --upload-file local_file
         """
+        local_path = request.args.get('local_path')
+
         # create upload
-        upload = Upload.create(
+        upload = UploadProc.create(
             user=g.user,
-            name=request.args.get('name'))
+            name=request.args.get('name'),
+            local_path=local_path)
 
         logger = get_logger(__name__, endpoint='upload', action='put', upload_id=upload.upload_id)
         logger.info('upload created')
 
-        uploadFile = UploadFile(upload.upload_id)
+        uploadFile = UploadFile(upload.upload_id, local_path=local_path)
 
-        if request.mimetype == 'application/multipart-formdata':
+        if local_path:
+            pass
+        elif request.mimetype == 'application/multipart-formdata':
             # multipart formdata, e.g. with curl -X put "url" -F file=@local_file
             # might have performance issues for large files: https://github.com/pallets/flask/issues/2086
             if 'file' in request.files:
@@ -226,79 +171,40 @@ class UploadsRes(Resource):
             upload.process()
             logger.info('initiated processing')
 
-        return upload.json_dict, 200
+        return upload, 200
+
+
+class ProxyUpload:
+    def __init__(self, upload, calcs):
+        self.upload = upload
+        self.calcs = calcs
 
-class UploadRes(Resource):
-    """ Uploads """
+    def __getattr__(self, name):
+        return self.upload.__getattribute__(name)
+
+
+pagination_parser = api.parser()
+pagination_parser.add_argument('page', type=int, help='The page, starting with 1.', location='args')
+pagination_parser.add_argument('per_page', type=int, help='Desired calcs per page.', location='args')
+pagination_parser.add_argument('order_by', type=str, help='The field to sort the calcs by, use [status,mainfile].', location='args')
+
+
+@ns.route('/<string:upload_id>')
+@api.doc(params={'upload_id': 'The unique id for the requested upload.'})
+class Upload(Resource):
+    @api.response(404, 'Upload does not exist')
+    @api.marshal_with(upload_with_calcs_model, skip_none=True, code=200, description='Upload sent')
+    @api.expect(pagination_parser)
     @login_really_required
-    def get(self, upload_id):
+    def get(self, upload_id: str):
         """
-        Get an update on an existing upload. Will not only return the upload, but
-        also its calculations paginated. Use the pagination params to determine
-        the page.
-
-        .. :quickref: upload; Get an update for an existing upload.
-
-        **Example request**:
-
-        .. sourcecode:: http
-
-            GET /nomad/api/uploads/5b89469e0d80d40008077dbc HTTP/1.1
-            Accept: application/json
-
-        **Example response**:
-
-        .. sourcecode:: http
-
-            HTTP/1.1 200 OK
-            Vary: Accept
-            Content-Type: application/json
-
-            {
-                "name": "vasp_data.zip",
-                "upload_id": "5b89469e0d80d40008077dbc",
-                "presigned_url": "http://minio:9000/uploads/5b89469e0d80d40008077dbc?X-Amz-Algorithm=AWS4-...",
-                "create_time": "2018-08-31T13:46:06.781000",
-                "upload_time": "2018-08-31T13:46:07.531000",
-                "is_stale": false,
-                "completed": true,
-                "status": "SUCCESS",
-                "current_task": "cleanup",
-                "tasks": ["uploading", "extracting", "parse_all", "cleanup"]
-                "errors": [],
-                "warnings": [],
-                "calcs": {
-                    "pagination": {
-                        "total": 1,
-                        "page": 1,
-                        "per_page": 25
-                    },
-                    "results": [
-                        {
-                            "current_task": "archiving",
-                            "tasks": ["parsing", "normalizing", "archiving"]
-                            "status": "SUCCESS",
-                            "errors": [],
-                            "warnings": [],
-                            "parser": "parsers/vasp",
-                            "mainfile": "Si.xml"
-                        }
-                    ]
-                }
-            }
-
-        :param string upload_id: the id for the upload
-        :qparam int page: the page starting with 1
-        :qparam int per_page: desired calcs per page
-        :qparam str order_by: the field to sort the calcs by, use [status,mainfile]
-        :resheader Content-Type: application/json
-        :status 200: upload successfully updated and retrieved
-        :status 404: upload with id does not exist
-        :returns: the :class:`nomad.data.Upload` instance
+        Get an update for an existing upload.
+
+        Will not only return the upload, but also its calculations paginated.
+        Use the pagination params to determine the page.
         """
         try:
-            upload = Upload.get(upload_id)
+            upload = UploadProc.get(upload_id)
         except KeyError:
             abort(404, message='Upload with id %s does not exist.' % upload_id)
 
@@ -326,174 +232,77 @@ class UploadsRes(Resource):
         calcs = upload.all_calcs((page - 1) * per_page, page * per_page, order_by)
         failed_calcs = upload.failed_calcs
 
-        result = upload.json_dict
-        result['calcs'] = {
+        result = ProxyUpload(upload, {
             'pagination': dict(
                 total=upload.total_calcs, page=page, per_page=per_page,
                 successes=upload.processed_calcs - failed_calcs, failures=failed_calcs),
-            'results': [calc.json_dict for calc in calcs]
-        }
+            'results': [calc for calc in calcs]
+        })
 
         return result, 200
 
+    @api.response(404, 'Upload does not exist')
+    @api.response(400, 'Not allowed during processing or when not in staging')
+    @api.marshal_with(upload_model, skip_none=True, code=200, description='Upload deleted')
     @login_really_required
-    def post(self, upload_id):
-        """
-        Move an upload out of the staging area. This changes the visibility of the upload.
-        Clients can specify, if the calcs should be restricted.
-
-        .. :quickref: upload; Move an upload out of the staging area.
-
-        **Example request**:
-
-        .. sourcecode:: http
-
-            POST /nomad/api/uploads HTTP/1.1
-            Accept: application/json
-            Content-Type: application/json
-
-            {
-                "operation": "unstage"
-            }
-
-
-        :param string upload_id: the upload id
-        :resheader Content-Type: application/json
-        :status 200: upload unstaged successfully
-        :status 404: upload could not be found
-        :status 400: if the operation is not supported
-        :returns: the upload record
+    def delete(self, upload_id: str):
         """
-        try:
-            upload = Upload.get(upload_id)
-        except KeyError:
-            abort(404, message='Upload with id %s does not exist.' % upload_id)
+        Delete an existing upload.
 
-        if upload.user_id != str(g.user.user_id):
-            abort(404, message='Upload with id %s does not exist.' % upload_id)
-
-        json_data = request.get_json()
-        if json_data is None:
-            json_data = {}
-
-        operation = json_data.get('operation')
-        if operation == 'unstage':
-            upload.unstage()
-            return upload.json_dict, 200
-
-        abort(400, message='Unsuported operation %s.' % operation)
-
-    @login_really_required
-    def delete(self, upload_id):
-        """
-        Deletes an existing upload. Only ``is_ready`` or ``is_stale`` uploads
+        Only ``is_ready`` uploads
         can be deleted. Deleting an upload in processing is not allowed.
-
-        .. :quickref: upload; Delete an existing upload.
-
-        **Example request**:
-
-        .. sourcecode:: http
-
-            DELETE /nomad/api/uploads/5b89469e0d80d40008077dbc HTTP/1.1
-            Accept: application/json
-
-        :param string upload_id: the id for the upload
-        :resheader Content-Type: application/json
-        :status 200: upload successfully deleted
-        :status 400: upload cannot be deleted
-        :status 404: upload with id does not exist
-        :returns: the :class:`nomad.data.Upload` instance with the latest processing state
         """
         try:
-            upload = Upload.get(upload_id)
+            upload = UploadProc.get(upload_id)
         except KeyError:
             abort(404, message='Upload with id %s does not exist.' % upload_id)
 
         if upload.user_id != str(g.user.user_id):
             abort(404, message='Upload with id %s does not exist.' % upload_id)
 
+        if not upload.in_staging:
+            abort(400, message='Operation not allowed, upload is not in staging.')
+
         try:
             upload.delete()
-            return upload.json_dict, 200
+            return upload, 200
         except NotAllowedDuringProcessing:
             abort(400, message='You must not delete an upload during processing.')
 
-
-class UploadFileRes(Resource):
-    """
-    Upload a file to an existing upload. Can be used to upload files via bowser
-    or other http clients like curl. This will start the processing of the upload.
-
-    There are two basic ways to upload a file: multipart-formdata or simply streaming
-    the file data. Both are supported. The later one does not allow to transfer a
-    filename or other meta-data. If a filename is available, it will become the
-    name of the upload.
-
-    .. :quickref: upload; Upload a file to an existing upload.
-
-    **Curl examples for both approaches**:
-
-    .. sourcecode:: sh
-
-        curl -X put "/nomad/api/uploads/5b89469e0d80d40008077dbc/file" -F file=@local_file
-        curl "/nomad/api/uploads/5b89469e0d80d40008077dbc/file" --upload-file local_file
-
-    :param string upload_id: the upload_id of the upload
-    :resheader Content-Type: application/json
-    :status 200: upload successfully received.
-    :status 404: upload with given id does not exist
-    :status 400: if the fileformat is not supported or the form data is different than expected.
-    :returns: the upload (see GET /uploads/<upload_id>)
-    """
+    @api.response(404, 'Upload does not exist or is not allowed')
+    @api.response(400, 'Operation is not supported')
+    @api.marshal_with(upload_model, skip_none=True, code=200, description='Upload unstaged successfully')
+    @api.expect(upload_operation_model)
     @login_really_required
-    def put(self, upload_id):
-        logger = get_logger(__name__, endpoint='upload', action='put', upload_id=upload_id)
+    def post(self, upload_id):
+        """
+        Execute an upload operation. Available operations: ``unstage``.
+        Unstage changes the visibility of the upload. Clients can specify whether
+        the calcs should be restricted.
+        """
         try:
-            upload = Upload.get(upload_id)
+            upload = UploadProc.get(upload_id)
         except KeyError:
             abort(404, message='Upload with id %s does not exist.' % upload_id)
 
-        if upload.upload_time is not None:
-            abort(400, message='A file was already uploaded to this uploade before.')
+        if upload.user_id != str(g.user.user_id):
+            abort(404, message='Upload with id %s does not exist.' % upload_id)
 
-        uploadFile = UploadFile(upload_id)
+        json_data = request.get_json()
+        if json_data is None:
+            json_data = {}
 
-        if request.mimetype == 'application/multipart-formdata':
-            # multipart formdata, e.g. with curl -X put "url" -F file=@local_file
-            # might have performance issues for large files: https://github.com/pallets/flask/issues/2086
-            if 'file' in request.files:
-                abort(400, message='Bad multipart-formdata, there is no file part.')
-            file = request.files['file']
-            if upload.name is '':
-                upload.name = file.filename
+        operation = json_data.get('operation')
+        if operation == 'unstage':
+            if not upload.in_staging:
+                abort(400, message='Operation not allowed, upload is not in staging.')
 
-            file.save(uploadFile.os_path)
-        else:
-            # simple streaming data in HTTP body, e.g. with curl "url" -T local_file
             try:
-                uploadFile.create_dirs()
-                with uploadFile.open('wb') as f:
-                    while not request.stream.is_exhausted:
-                        f.write(request.stream.read(1024))
+                upload.unstage()
+            except NotAllowedDuringProcessing:
+                abort(400, message='You must not unstage an upload during processing.')
 
-            except Exception as e:
-                logger.error('Error on streaming upload', exc_info=e)
-                abort(400, message='Some IO went wrong, download probably aborted/disrupted.')
-
-        if not uploadFile.is_valid:
-            uploadFile.delete()
-            abort(400, message='Bad file format, excpected %s.' % ", ".join(UploadFile.formats))
+            return upload, 200
 
-        logger.info('received uploaded file')
-        upload.upload_time = datetime.now()
-        upload.process()
-        logger.info('initiated processing')
-
-        return upload.json_dict, 200
-
-
-api.add_resource(UploadsRes, '%s/uploads' % base_path)
-api.add_resource(UploadRes, '%s/uploads/<string:upload_id>' % base_path)
-api.add_resource(UploadFileRes, '%s/uploads/<string:upload_id>/file' % base_path)
+        abort(400, message='Unsupported operation %s.' % operation)
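
Note: taken together, the endpoints in this file implement a simple create/poll/unstage workflow. The snippet below is an illustrative client-side sketch only, not part of the patch; the base URL, credentials, and file name are assumed values:

import time
import requests

base = 'http://localhost:8000/nomad/api'  # assumed deployment URL
auth = ('leonard.hofstadter@nomad-fairdi.tests.de', 'password')  # assumed test credentials

# PUT a file to /uploads/, streaming it in the request body
with open('local_file.zip', 'rb') as f:
    upload = requests.put('%s/uploads/?name=example' % base, data=f, auth=auth).json()

# poll the upload until processing is completed
while not upload.get('completed'):
    time.sleep(1)
    upload = requests.get('%s/uploads/%s' % (base, upload['upload_id']), auth=auth).json()

# move the upload out of staging via the unstage operation
requests.post(
    '%s/uploads/%s' % (base, upload['upload_id']),
    json=dict(operation='unstage'), auth=auth)

The same flow is exercised end-to-end by TestUploads in tests/test_api.py further down.
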
""" - data = { - 'tasks': getattr(self.__class__, 'tasks'), - 'current_task': self.current_task, - 'status': self.status, - 'completed': self.completed, - 'errors': self.errors, - 'warnings': self.warnings, - 'create_time': self.create_time.isoformat() if self.create_time is not None else None, - 'complete_time': self.complete_time.isoformat() if self.complete_time is not None else None, - '_async_status': self._async_status - } - return {key: value for key, value in data.items() if value is not None} - class InvalidChordUsage(Exception): pass diff --git a/nomad/processing/data.py b/nomad/processing/data.py index 31d5255588bf89ccea641f7b10d93b0dc260b217..048b44cdc869bfd5aba14b6ad589e471919abeb0 100644 --- a/nomad/processing/data.py +++ b/nomad/processing/data.py @@ -25,7 +25,6 @@ calculations, and files """ from typing import List, Any, ContextManager, Tuple, Generator -from datetime import datetime from elasticsearch.exceptions import NotFoundError from mongoengine import StringField, BooleanField, DateTimeField, DictField, IntField import logging @@ -154,18 +153,6 @@ class Calc(Proc): return wrap_logger(logger, processors=[save_to_calc_log]) - @property - def json_dict(self): - """ A json serializable dictionary representation. """ - data = { - 'archive_id': self.archive_id, - 'mainfile': self.mainfile, - 'upload_id': self.upload_id, - 'parser': self.parser - } - data.update(super().json_dict) - return {key: value for key, value in data.items() if value is not None} - @process def process(self): logger = self.get_logger() @@ -372,7 +359,7 @@ class Upload(Chord): def delete(self): logger = self.get_logger(task='delete') - if not (self.completed or self.is_stale or self.current_task == 'uploading'): + if not (self.completed or self.current_task == 'uploading'): raise NotAllowedDuringProcessing() with lnr(logger, 'delete upload file'): @@ -433,37 +420,17 @@ class Upload(Chord): return self - @property - def is_stale(self) -> bool: - if self.current_task == 'uploading' and self.upload_time is None: - return (datetime.now() - self.create_time).days > 1 - else: - return False - def unstage(self): self.get_logger().info('unstage') + + if not (self.completed or self.current_task == 'uploading'): + raise NotAllowedDuringProcessing() + self.in_staging = False RepoCalc.unstage(upload_id=self.upload_id) coe_repo.add_upload(self, restricted=False) # TODO allow users to choose restricted self.save() - @property - def json_dict(self) -> dict: - """ A json serializable dictionary representation. """ - data = { - 'name': self.name, - 'local_path': self.local_path, - 'additional_metadata': self.additional_metadata, - 'upload_id': self.upload_id, - 'upload_hash': self.upload_hash, - 'upload_url': self.upload_url, - 'upload_command': self.upload_command, - 'upload_time': self.upload_time.isoformat() if self.upload_time is not None else None, - 'is_stale': self.is_stale, - } - data.update(super().json_dict) - return {key: value for key, value in data.items() if value is not None} - @process def process(self): self.extracting() diff --git a/nomad/uploads.py b/nomad/uploads.py index 3f72ff527ebdc6178a188fd64f23352b62e37c1e..596839d9c0373df9d6504aab5d3368da6d7cd3fa 100644 --- a/nomad/uploads.py +++ b/nomad/uploads.py @@ -37,7 +37,7 @@ almost readonly (beside metadata) storage. 
""" from abc import ABCMeta -from typing import IO, Generator, Dict, Iterator, Iterable, Callable +from typing import IO, Generator, Dict, Iterator, Iterable from filelock import Timeout, FileLock import ujson import os.path diff --git a/nomad/utils.py b/nomad/utils.py index 9ab59408e3e96ad5140d53a7ff841b59e8ca71c4..056ad6d7f4475c1f21c79c5028b45792045e8cc6 100644 --- a/nomad/utils.py +++ b/nomad/utils.py @@ -52,6 +52,7 @@ from nomad import config default_hash_len = 28 """ Length of hashes and hash-based ids (e.g. calc, upload) in nomad. """ + def sanitize_logevent(event: str) -> str: """ Prepares a log event or message for analysis in elastic stack. It removes numbers, diff --git a/tests/conftest.py b/tests/conftest.py index 31ef00f3d6472f7e23c43c71dd8a61bd0c28e041..12d33ef6ee636dd7784b7966fd0131c373845e88 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -145,7 +145,7 @@ def mocksearch(monkeypatch): if upload_id in uploads_by_id: for calc in uploads_by_id[upload_id]: del(by_archive_id[calc.archive_id]) - upload_hash = next(uploads_by_id[upload_id]).upload_hash + upload_hash = uploads_by_id[upload_id][0].upload_hash del(uploads_by_id[upload_id]) del(uploads_by_hash[upload_hash]) diff --git a/tests/misc.http b/tests/misc.http index 7b4546f45b8aa49f81fe7019994197445e701cd8..ecc3a3db345302835c52e94f667d6d2060f9dcfb 100644 --- a/tests/misc.http +++ b/tests/misc.http @@ -8,3 +8,8 @@ content-type: application/json { "name": "aims-example-full" } +### + +GET http://localhost:8000/nomad/api/v2/uploads/ HTTP/1.1 +Authorization: Basic bGVvbmFyZC5ob2ZzdGFkdGVyQG5vbWFkLWZhaXJkaS50ZXN0cy5kZTo= +### diff --git a/tests/test_api.py b/tests/test_api.py index d86004aa8abce94bee24342f02f9b08a74a72604..3f749304e052e02dd48858866a9e204c10e4778b 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -2,14 +2,13 @@ import pytest import time import json import zlib -import re import os.path from mongoengine import connect from mongoengine.connection import disconnect -from datetime import datetime, timedelta import base64 import zipfile import io +import datetime from nomad import config # for convinience we test the api without path prefix @@ -63,327 +62,251 @@ def test_other_user_auth(other_test_user): return create_auth_headers(other_test_user) -def assert_uploads(upload_json_str, count=0, **kwargs): - data = json.loads(upload_json_str) - assert isinstance(data, list) - assert len(data) == count +class TestAuth: + def test_xtoken_auth(self, client, test_user, no_warn): + rv = client.get('/uploads/', headers={ + 'X-Token': test_user.email + }) - if count > 0: - assert_upload(json.dumps(data[0]), **kwargs) - - -def assert_upload(upload_json_str, id=None, **kwargs): - data = json.loads(upload_json_str) - assert 'upload_id' in data - if id is not None: - assert id == data['upload_id'] - assert 'create_time' in data - assert 'upload_url' in data - assert 'upload_command' in data - - for key, value in kwargs.items(): - assert data.get(key, None) == value - - return data - - -def test_xtoken_auth(client, test_user, no_warn): - rv = client.get('/uploads', headers={ - 'X-Token': test_user.email - }) - - assert rv.status_code == 200 - - -def test_xtoken_auth_denied(client, no_warn): - rv = client.get('/uploads', headers={ - 'X-Token': 'invalid' - }) - - assert rv.status_code == 401 - - -def test_basic_auth(client, test_user_auth, no_warn): - rv = client.get('/uploads', headers=test_user_auth) - assert rv.status_code == 200 - - -def test_basic_auth_denied(client, no_warn): - basic_auth_base64 = 
diff --git a/tests/test_api.py b/tests/test_api.py
index d86004aa8abce94bee24342f02f9b08a74a72604..3f749304e052e02dd48858866a9e204c10e4778b 100644
--- a/tests/test_api.py
+++ b/tests/test_api.py
@@ -2,14 +2,13 @@ import pytest
 import time
 import json
 import zlib
-import re
 import os.path
 from mongoengine import connect
 from mongoengine.connection import disconnect
-from datetime import datetime, timedelta
 import base64
 import zipfile
 import io
+import datetime
 
 from nomad import config
 # for convinience we test the api without path prefix
@@ -63,327 +62,251 @@ def test_other_user_auth(other_test_user):
     return create_auth_headers(other_test_user)
 
 
-def assert_uploads(upload_json_str, count=0, **kwargs):
-    data = json.loads(upload_json_str)
-    assert isinstance(data, list)
-    assert len(data) == count
-
-    if count > 0:
-        assert_upload(json.dumps(data[0]), **kwargs)
-
-
-def assert_upload(upload_json_str, id=None, **kwargs):
-    data = json.loads(upload_json_str)
-    assert 'upload_id' in data
-    if id is not None:
-        assert id == data['upload_id']
-    assert 'create_time' in data
-    assert 'upload_url' in data
-    assert 'upload_command' in data
-
-    for key, value in kwargs.items():
-        assert data.get(key, None) == value
-
-    return data
-
-
-def test_xtoken_auth(client, test_user, no_warn):
-    rv = client.get('/uploads', headers={
-        'X-Token': test_user.email
-    })
-
-    assert rv.status_code == 200
-
-
-def test_xtoken_auth_denied(client, no_warn):
-    rv = client.get('/uploads', headers={
-        'X-Token': 'invalid'
-    })
-
-    assert rv.status_code == 401
-
-
-def test_basic_auth(client, test_user_auth, no_warn):
-    rv = client.get('/uploads', headers=test_user_auth)
-    assert rv.status_code == 200
-
-
-def test_basic_auth_denied(client, no_warn):
-    basic_auth_base64 = base64.b64encode('invalid'.encode('utf-8')).decode('utf-8')
-    rv = client.get('/uploads', headers={
-        'Authorization': 'Basic %s' % basic_auth_base64
-    })
-    assert rv.status_code == 401
+class TestAuth:
+    def test_xtoken_auth(self, client, test_user, no_warn):
+        rv = client.get('/uploads/', headers={
+            'X-Token': test_user.email
+        })
+
+        assert rv.status_code == 200
+
+    def test_xtoken_auth_denied(self, client, no_warn):
+        rv = client.get('/uploads/', headers={
+            'X-Token': 'invalid'
+        })
+
+        assert rv.status_code == 401
+
+    def test_basic_auth(self, client, test_user_auth, no_warn):
+        rv = client.get('/uploads/', headers=test_user_auth)
+        assert rv.status_code == 200
+
+    def test_basic_auth_denied(self, client, no_warn):
+        basic_auth_base64 = base64.b64encode('invalid'.encode('utf-8')).decode('utf-8')
+        rv = client.get('/uploads/', headers={
+            'Authorization': 'Basic %s' % basic_auth_base64
+        })
+        assert rv.status_code == 401
-def test_no_uploads(client, test_user_auth, no_warn):
-    rv = client.get('/uploads', headers=test_user_auth)
-
-    assert rv.status_code == 200
-    assert_uploads(rv.data, count=0)
-
-
-def test_not_existing_upload(client, test_user_auth, no_warn):
-    rv = client.get('/uploads/123456789012123456789012', headers=test_user_auth)
-    assert rv.status_code == 404
-
-
-def test_stale_upload(client, test_user_auth):
-    rv = client.post(
-        '/uploads',
-        headers=test_user_auth,
-        data=json.dumps(dict(name='test_name')),
-        content_type='application/json')
-    assert rv.status_code == 200
-    upload_id = assert_upload(rv.data)['upload_id']
-
-    upload = Upload.get(upload_id)
-    upload.create_time = datetime.now() - timedelta(days=2)
-    upload.save()
-
-    rv = client.get('/uploads/%s' % upload_id, headers=test_user_auth)
-    assert rv.status_code == 200
-    assert_upload(rv.data, is_stale=True)
-
-
-def test_create_upload(client, test_user_auth, no_warn):
-    rv = client.post('/uploads', headers=test_user_auth)
-
-    assert rv.status_code == 200
-    upload_id = assert_upload(rv.data)['upload_id']
-
-    rv = client.get('/uploads/%s' % upload_id, headers=test_user_auth)
-    assert rv.status_code == 200
-    assert_upload(rv.data, id=upload_id, is_stale=False)
-
-    rv = client.get('/uploads', headers=test_user_auth)
-    assert rv.status_code == 200
-    assert_uploads(rv.data, count=1, id=upload_id)
-
-
-def test_create_upload_with_name(client, test_user_auth, no_warn):
-    rv = client.post(
-        '/uploads', headers=test_user_auth,
-        data=json.dumps(dict(name='test_name')),
-        content_type='application/json')
-
-    assert rv.status_code == 200
-    upload = assert_upload(rv.data)
-    assert upload['name'] == 'test_name'
-
-
-def test_create_upload_with_local_path(client, test_user_auth, no_warn):
-    rv = client.post(
-        '/uploads', headers=test_user_auth,
-        data=json.dumps(dict(local_path='test_local_path')),
-        content_type='application/json')
-
-    assert rv.status_code == 200
-    upload = assert_upload(rv.data)
-    assert upload['local_path'] == 'test_local_path'
-
-
-def test_delete_empty_upload(client, mocksearch, test_user_auth, no_warn):
-    rv = client.post('/uploads', headers=test_user_auth)
-
-    assert rv.status_code == 200
-    upload_id = assert_upload(rv.data)['upload_id']
-
-    rv = client.delete('/uploads/%s' % upload_id, headers=test_user_auth)
-    assert rv.status_code == 200
-
-    rv = client.get('/uploads/%s' % upload_id, headers=test_user_auth)
-    assert rv.status_code == 404
-
-
-def assert_processing(client, test_user_auth, upload_id, repository_db):
-    upload_endpoint = '/uploads/%s' % upload_id
-
-    while True:
-        time.sleep(0.1)
-
-        rv = client.get(upload_endpoint, headers=test_user_auth)
-        assert rv.status_code == 200
-        upload = assert_upload(rv.data)
-        assert 'upload_time' in upload
-        if upload['completed']:
-            break
-
-    assert len(upload['tasks']) == 4
-    assert upload['status'] == 'SUCCESS'
-    assert upload['current_task'] == 'cleanup'
-    assert UploadFile(upload['upload_id'], upload.get('local_path')).exists()
-    calcs = upload['calcs']['results']
-    for calc in calcs:
-        assert calc['status'] == 'SUCCESS'
-        assert calc['current_task'] == 'archiving'
-        assert len(calc['tasks']) == 3
-        assert client.get('/logs/%s' % calc['archive_id']).status_code == 200
-
-    empty_upload = upload['calcs']['pagination']['total'] == 0
-
-    if upload['calcs']['pagination']['total'] > 1:
-        rv = client.get('%s?page=2&per_page=1&order_by=status' % upload_endpoint)
-        assert rv.status_code == 200
-        upload = assert_upload(rv.data)
-        assert len(upload['calcs']['results']) == 1
-
-    rv = client.post(
-        upload_endpoint,
-        headers=test_user_auth,
-        data=json.dumps(dict(operation='unstage')),
-        content_type='application/json')
-    assert rv.status_code == 200
-
-    rv = client.get('/uploads', headers=test_user_auth)
-    assert rv.status_code == 200
-    assert_uploads(rv.data, count=0)
-    assert_coe_upload(upload['upload_hash'], repository_db, empty=empty_upload)
+class TestUploads:
+
+    @pytest.fixture(scope='function')
+    def proc_infra(self, repository_db, mocksearch, worker, no_warn):
+        return dict(repository_db=repository_db)
+
+    def assert_uploads(self, upload_json_str, count=0, **kwargs):
+        data = json.loads(upload_json_str)
+        assert isinstance(data, list)
+        assert len(data) == count
+
+        if count > 0:
+            self.assert_upload(json.dumps(data[0]), **kwargs)
+
+    def assert_upload(self, upload_json_str, id=None, **kwargs):
+        data = json.loads(upload_json_str)
+        assert 'upload_id' in data
+        if id is not None:
+            assert id == data['upload_id']
+        assert 'create_time' in data
+        assert 'upload_url' in data
+        assert 'upload_command' in data
+
+        for key, value in kwargs.items():
+            assert data.get(key, None) == value
+
+        return data
+
+    def assert_processing(self, client, test_user_auth, upload_id):
+        upload_endpoint = '/uploads/%s' % upload_id
+
+        # poll until completed
+        while True:
+            time.sleep(0.1)
+            rv = client.get(upload_endpoint, headers=test_user_auth)
+            assert rv.status_code == 200
+            upload = self.assert_upload(rv.data)
+            assert 'upload_time' in upload
+            if upload['completed']:
+                break
+
+        assert len(upload['tasks']) == 4
+        assert upload['status'] == 'SUCCESS'
+        assert upload['current_task'] == 'cleanup'
+        assert UploadFile(upload['upload_id'], upload.get('local_path')).exists()
+        calcs = upload['calcs']['results']
+        for calc in calcs:
+            assert calc['status'] == 'SUCCESS'
+            assert calc['current_task'] == 'archiving'
+            assert len(calc['tasks']) == 3
+            assert client.get('/logs/%s' % calc['archive_id']).status_code == 200
+
+        if upload['calcs']['pagination']['total'] > 1:
+            rv = client.get('%s?page=2&per_page=1&order_by=status' % upload_endpoint)
+            assert rv.status_code == 200
+            upload = self.assert_upload(rv.data)
+            assert len(upload['calcs']['results']) == 1
+
+    def assert_unstage(self, client, test_user_auth, upload_id, proc_infra):
+        rv = client.post(
+            '/uploads/%s' % upload_id,
+            headers=test_user_auth,
+            data=json.dumps(dict(operation='unstage')),
+            content_type='application/json')
+        assert rv.status_code == 200
+
+        rv = client.get('/uploads/%s' % upload_id, headers=test_user_auth)
+        assert rv.status_code == 200
+        upload = self.assert_upload(rv.data)
+        empty_upload = upload['calcs']['pagination']['total'] == 0
+
+        rv = client.get('/uploads/', headers=test_user_auth)
+        assert rv.status_code == 200
+        self.assert_uploads(rv.data, count=0)
+        assert_coe_upload(upload['upload_hash'], proc_infra['repository_db'], empty=empty_upload)
-@pytest.mark.parametrize('file', example_files)
-@pytest.mark.parametrize('mode', ['multipart', 'stream'])
-@pytest.mark.timeout(10)
-def test_processing(client, file, mode, worker, mocksearch, test_user_auth, no_warn, repository_db):
-    rv = client.post('/uploads', headers=test_user_auth)
-    assert rv.status_code == 200
-    upload = assert_upload(rv.data)
-    upload_id = upload['upload_id']
-
-    upload_cmd = upload['upload_command']
-    headers = dict(Authorization='Basic %s' % re.search(r'.*Authorization: Basic ([^\s]+).*', upload_cmd).group(1))
-    upload_endpoint = '/uploads/%s' % upload_id
-    upload_file_endpoint = '%s/file' % upload_endpoint
-
-    upload_url = upload['upload_url']
-    assert upload_url.endswith(upload_file_endpoint)
-    if mode == 'multipart':
-        rv = client.put(
-            upload_file_endpoint,
-            data=dict(file=(open(file, 'rb'), 'file')),
-            headers=headers)
-    elif mode == 'stream':
-        with open(file, 'rb') as f:
-            rv = client.put(upload_file_endpoint, data=f.read(), headers=headers)
-    else:
-        assert False
-    assert rv.status_code == 200
-    upload = assert_upload(rv.data)
-
-    assert_processing(client, test_user_auth, upload_id, repository_db)
-
-
-@pytest.mark.parametrize('file', example_files)
-@pytest.mark.timeout(10)
-def test_processing_local_path(client, file, worker, mocksearch, test_user_auth, no_warn, repository_db):
-    rv = client.post(
-        '/uploads', headers=test_user_auth,
-        data=json.dumps(dict(local_path=file)),
-        content_type='application/json')
-
-    assert rv.status_code == 200
-    upload = assert_upload(rv.data)
-    upload_id = upload['upload_id']
-
-    assert_processing(client, test_user_auth, upload_id, repository_db)
-
-
-@pytest.mark.parametrize('file', example_files)
-@pytest.mark.parametrize('mode', ['multipart', 'stream'])
-@pytest.mark.timeout(10)
-def test_processing_upload(client, file, mode, worker, mocksearch, test_user_auth, no_warn, repository_db):
-    if mode == 'multipart':
-        rv = client.put(
-            '/uploads',
-            data=dict(file=(open(file, 'rb'), 'file')),
-            headers=test_user_auth)
-    elif mode == 'stream':
-        with open(file, 'rb') as f:
-            rv = client.put('/uploads', data=f.read(), headers=test_user_auth)
-    else:
-        assert False
-    assert rv.status_code == 200
-    upload = assert_upload(rv.data)
-    upload_id = upload['upload_id']
-
-    assert_processing(client, test_user_auth, upload_id, repository_db)
+    def test_get_empty(self, client, test_user_auth, no_warn):
+        rv = client.get('/uploads/', headers=test_user_auth)
+        assert rv.status_code == 200
+        self.assert_uploads(rv.data, count=0)
+
+    def test_get_not_existing(self, client, test_user_auth, no_warn):
+        rv = client.get('/uploads/123456789012123456789012', headers=test_user_auth)
+        assert rv.status_code == 404
+
+    @pytest.mark.parametrize('file', example_files)
+    @pytest.mark.parametrize('mode', ['multipart', 'stream', 'local_path'])
+    @pytest.mark.parametrize('name', [None, 'test_name'])
+    def test_put(self, client, test_user_auth, proc_infra, file, mode, name):
+        if name:
+            url = '/uploads/?name=%s' % name
+        else:
+            url = '/uploads/'
+
+        if mode == 'multipart':
+            rv = client.put(
+                url, data=dict(file=(open(file, 'rb'), 'file')), headers=test_user_auth)
+        elif mode == 'stream':
+            with open(file, 'rb') as f:
+                rv = client.put(url, data=f.read(), headers=test_user_auth)
+        elif mode == 'local_path':
url += '&' if name else '?' + url += 'local_path=%s' % file + rv = client.put(url, headers=test_user_auth) + else: + assert False -def test_non_existing_repo_cals(client, no_warn): - rv = client.get('/repo/doesnt/exist') - assert rv.status_code == 404 + assert rv.status_code == 200 + if mode == 'local_path': + upload = self.assert_upload(rv.data, local_path=file, name=name) + else: + upload = self.assert_upload(rv.data, name=name) + self.assert_processing(client, test_user_auth, upload['upload_id']) -def test_repo_calcs(client, example_elastic_calc, no_warn): - rv = client.get('/repo') - assert rv.status_code == 200 - data = json.loads(rv.data) - results = data.get('results', None) - assert results is not None - assert isinstance(results, list) - assert len(results) >= 1 + def test_delete_not_existing(self, client, test_user_auth, no_warn): + rv = client.delete('/uploads/123456789012123456789012', headers=test_user_auth) + assert rv.status_code == 404 + def test_delete_during_processing(self, client, test_user_auth, proc_infra): + rv = client.put('/uploads/?local_path=%s' % example_file, headers=test_user_auth) + upload = self.assert_upload(rv.data) + rv = client.delete('/uploads/%s' % upload['upload_id'], headers=test_user_auth) + assert rv.status_code == 400 + self.assert_processing(client, test_user_auth, upload['upload_id']) + + def test_delete_unstaged(self, client, test_user_auth, proc_infra): + rv = client.put('/uploads/?local_path=%s' % example_file, headers=test_user_auth) + upload = self.assert_upload(rv.data) + self.assert_processing(client, test_user_auth, upload['upload_id']) + self.assert_unstage(client, test_user_auth, upload['upload_id'], proc_infra) + rv = client.delete('/uploads/%s' % upload['upload_id'], headers=test_user_auth) + assert rv.status_code == 400 + + def test_delete(self, client, test_user_auth, proc_infra): + rv = client.put('/uploads/?local_path=%s' % example_file, headers=test_user_auth) + upload = self.assert_upload(rv.data) + self.assert_processing(client, test_user_auth, upload['upload_id']) + rv = client.delete('/uploads/%s' % upload['upload_id'], headers=test_user_auth) + assert rv.status_code == 200 -def test_repo_calcs_pagination(client, example_elastic_calc, no_warn): - rv = client.get('/repo?page=1&per_page=1') - assert rv.status_code == 200 - data = json.loads(rv.data) - results = data.get('results', None) - assert results is not None - assert isinstance(results, list) - assert len(results) == 1 + @pytest.mark.parametrize('example_file', example_files) + def test_post(self, client, test_user_auth, example_file, proc_infra): + rv = client.put('/uploads/?local_path=%s' % example_file, headers=test_user_auth) + upload = self.assert_upload(rv.data) + self.assert_processing(client, test_user_auth, upload['upload_id']) + self.assert_unstage(client, test_user_auth, upload['upload_id'], proc_infra) -def test_repo_calcs_user(client, example_elastic_calc, test_user_auth, no_warn): - rv = client.get('/repo?owner=user', headers=test_user_auth) - assert rv.status_code == 200 - data = json.loads(rv.data) - results = data.get('results', None) - assert results is not None - assert len(results) >= 1 +class TestRepo: + def test_calc(self, client, example_elastic_calc, no_warn): + rv = client.get( + '/repo/%s/%s' % (example_elastic_calc.upload_hash, example_elastic_calc.calc_hash)) + assert rv.status_code == 200 + def test_non_existing_calcs(self, client): + rv = client.get('/repo/doesnt/exist') + assert rv.status_code == 404 -def 
-def test_repo_calcs_user_authrequired(client, example_elastic_calc, no_warn):
-    rv = client.get('/repo?owner=user')
-    assert rv.status_code == 401
-
-
-def test_repo_calcs_user_invisible(client, example_elastic_calc, test_other_user_auth, no_warn):
-    rv = client.get('/repo?owner=user', headers=test_other_user_auth)
-    assert rv.status_code == 200
-    data = json.loads(rv.data)
-    results = data.get('results', None)
-    assert results is not None
-    assert len(results) == 0
-
-
-def test_get_archive(client, archive, no_warn):
-    rv = client.get('/archive/%s' % archive.object_id)
-
-    if rv.headers.get('Content-Encoding') == 'gzip':
-        json.loads(zlib.decompress(rv.data, 16 + zlib.MAX_WBITS))
-    else:
-        json.loads(rv.data)
-
-    assert rv.status_code == 200
-
-
-def test_get_calc_proc_log(client, archive_log, no_warn):
-    rv = client.get('/logs/%s' % archive_log.object_id)
-
-    assert len(rv.data) > 0
-    assert rv.status_code == 200
-
-
-def test_get_non_existing_archive(client, no_warn):
-    rv = client.get('/archive/%s' % 'doesnt/exist')
-    assert rv.status_code == 404
+    def test_calcs_user_invisible(self, client, example_elastic_calc, test_other_user_auth, no_warn):
+        rv = client.get('/repo?owner=user', headers=test_other_user_auth)
+        assert rv.status_code == 200
+        data = json.loads(rv.data)
+        results = data.get('results', None)
+        assert results is not None
+        assert len(results) == 0
+
+
+class TestArchive:
+    def test_get(self, client, archive, no_warn):
+        rv = client.get('/archive/%s' % archive.object_id)
+
+        if rv.headers.get('Content-Encoding') == 'gzip':
+            json.loads(zlib.decompress(rv.data, 16 + zlib.MAX_WBITS))
+        else:
+            json.loads(rv.data)
+
+        assert rv.status_code == 200
+
+    def test_get_calc_proc_log(self, client, archive_log, no_warn):
+        rv = client.get('/logs/%s' % archive_log.object_id)
+
+        assert len(rv.data) > 0
+        assert rv.status_code == 200
+
+    def test_get_non_existing_archive(self, client, no_warn):
+        rv = client.get('/archive/%s' % 'doesnt/exist')
+        assert rv.status_code == 404
 
 
 def test_docs(client):
@@ -396,7 +319,7 @@ class TestRaw:
     @pytest.fixture
     def example_upload_hash(self, mockmongo, no_warn):
        upload = Upload(id='test_upload_id', local_path=os.path.abspath(example_file))
-        upload.create_time = datetime.now()
+        upload.create_time = datetime.datetime.now()
         upload.user_id = 'does@not.exist'
         upload.save()