Commit 2c57208e authored by Markus Scheidgen

Continued massive refactoring, replacing the files module with the new uploads module.

parent 4bdfd113
......@@ -27,7 +27,6 @@ ns = api.namespace('admin', description='Administrative operations')
@ns.route('/<string:operation>')
@api.doc(params={'operation': 'The operation to perform.'})
class AdminOperationsResource(Resource):
# TODO in production this requires authorization
@api.doc('exec_admin_command')
@api.response(200, 'Operation performed')
@api.response(404, 'Operation does not exist')
......
......@@ -22,8 +22,9 @@ from flask_cors import CORS
from werkzeug.exceptions import HTTPException
from werkzeug.wsgi import DispatcherMiddleware
import os.path
import inspect
from nomad import config
from nomad import config, utils
base_path = config.services.api_base_path
""" Provides the root path of the nomad APIs. """
......@@ -59,11 +60,12 @@ api = Api(
""" Provides the flask restplust api instance """
@app.errorhandler(HTTPException)
def handle(error):
@app.errorhandler(Exception)
@api.errorhandler(Exception)
def handle(error: Exception):
status_code = getattr(error, 'code', 500)
name = getattr(error, 'name', 'Internal Server Error')
description = getattr(error, 'description', None)
description = getattr(error, 'description', 'No description available')
data = dict(
code=status_code,
name=name,
......@@ -71,4 +73,40 @@ def handle(error):
data.update(getattr(error, 'data', []))
response = jsonify(data)
response.status_code = status_code
if status_code == 500:
utils.get_logger(__name__).error('internal server error', exc_info=error)
return response
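For illustration, an unexpected exception handled above would serialize to a JSON body like the following (field values taken from the handler's defaults):

# Illustrative 500 response body produced by the handler above:
# {
#     "code": 500,
#     "name": "Internal Server Error",
#     "description": "No description available"
# }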
def with_logger(func):
"""
Decorator for endpoint implementations that provides a pre-configured logger and
automatically logs errors on all 500 responses.
"""
signature = inspect.signature(func)
has_logger = 'logger' in signature.parameters
wrapper_signature = signature.replace(parameters=tuple(
param for param in signature.parameters.values()
if param.name != 'logger'
))
def wrapper(*args, **kwargs):
if has_logger:
args = inspect.getcallargs(wrapper, *args, **kwargs)
logger_args = {
k: v for k, v in args.items()
if k in ['upload_id', 'upload_hash', 'calc_hash']}
logger = utils.get_logger(__name__, **logger_args)
args.update(logger=logger)
try:
return func(**args)
except HTTPException as e:
if getattr(e, 'code', None) == 500:
logger.error('Internal server error', exc_info=e)
raise e
except Exception as e:
logger.error('Internal server error', exc_info=e)
raise e
wrapper.__signature__ = wrapper_signature
return wrapper
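A minimal usage sketch of the decorator above; ``ExampleResource`` and its method are hypothetical, and the injected ``logger`` parameter is hidden from flask-restplus via the replaced signature:

class ExampleResource(Resource):  # hypothetical resource, not part of this diff
    @with_logger
    def get(self, upload_id: str, logger):
        # logger is injected by the decorator and pre-bound to upload_id;
        # any exception escaping this method is logged as an internal server error
        logger.info('handling example request')
        return dict(upload_id=upload_id), 200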
......@@ -16,8 +16,6 @@
The raw API of the nomad@FAIRDI APIs. Can be used to retrieve raw calculation files.
"""
# TODO implement restrictions based on user, permissions, and upload/calc metadata
import os.path
from zipfile import ZIP_DEFLATED, ZIP_STORED
......@@ -33,12 +31,6 @@ from .auth import login_if_available, create_authorization_predicate
ns = api.namespace('raw', description='Downloading raw data files.')
def fix_file_paths(path):
""" Removed the leading data from file paths that where given in mainfile uris. """
# TODO, mainfile URI's should change or this implementation should change
return path[5:]
raw_file_compress_argument = dict(
name='compress', type=bool, help='Use compression on .zip files, default is not.',
location='args')
......@@ -68,7 +60,7 @@ class RawFileFromPathResource(Resource):
Zip files are streamed; instead of 401 errors, the zip file will just not contain
any files that the user is not authorized to access.
"""
upload_filepath = fix_file_paths(path)
upload_filepath = path
upload_files = UploadFiles.get(
upload_hash, create_authorization_predicate(upload_hash))
......@@ -132,7 +124,7 @@ class RawFilesResource(Resource):
"""
json_data = request.get_json()
compress = json_data.get('compress', False)
files = [fix_file_paths(file.strip()) for file in json_data['files']]
files = [file.strip() for file in json_data['files']]
return respond_to_get_raw_files(upload_hash, files, compress)
......@@ -153,7 +145,7 @@ class RawFilesResource(Resource):
if files_str is None:
abort(400, message="No files argument given.")
files = [fix_file_paths(file.strip()) for file in files_str.split(',')]
files = [file.strip() for file in files_str.split(',')]
return respond_to_get_raw_files(upload_hash, files, compress)
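For illustration, client calls against the two variants above might look like this (base URL, upload hash, and file paths are placeholders):

import requests

base_url = 'http://localhost/api'  # assumed deployment path
upload_hash = 'some_upload_hash'   # placeholder
# GET variant: comma-separated file list in the query string
requests.get('%s/raw/%s' % (base_url, upload_hash),
             params=dict(files='some/path/file1.out,some/path/file2.out', compress=1))
# POST variant: JSON body with an explicit file list
requests.post('%s/raw/%s' % (base_url, upload_hash),
              json=dict(files=['some/path/file1.out'], compress=True))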
......
......@@ -23,13 +23,12 @@ from datetime import datetime
from werkzeug.datastructures import FileStorage
import os.path
from nomad import config
from nomad import config, utils
from nomad.processing import Upload
from nomad.processing import NotAllowedDuringProcessing
from nomad.utils import get_logger
from nomad.uploads import ArchiveBasedStagingUploadFiles
from nomad.uploads import ArchiveBasedStagingUploadFiles, StagingUploadFiles, UploadFiles
from .app import api
from .app import api, with_logger
from .auth import login_really_required
from .common import pagination_request_parser, pagination_model
......@@ -133,7 +132,8 @@ class UploadListResource(Resource):
@api.marshal_with(upload_model, skip_none=True, code=200, description='Upload received')
@api.expect(upload_metadata_parser)
@login_really_required
def put(self):
@with_logger
def put(self, logger):
"""
Upload a file and automatically create a new upload in the process.
Can be used to upload files via browser or other http clients like curl.
......@@ -160,8 +160,7 @@ class UploadListResource(Resource):
name=request.args.get('name'),
local_path=local_path)
logger = get_logger(__name__, endpoint='upload', action='put', upload_id=upload.upload_id)
logger.info('upload created')
logger.info('upload created', upload_id=upload.upload_id)
upload_files = ArchiveBasedStagingUploadFiles(
upload.upload_id, create=True, local_path=local_path)
......@@ -187,12 +186,13 @@ class UploadListResource(Resource):
f.write(request.stream.read(1024))
except Exception as e:
logger.error('Error on streaming upload', exc_info=e)
logger.warning('Error on streaming upload', exc_info=e)
abort(400, message='Some IO went wrong, upload probably aborted/disrupted.')
if not upload_files.is_valid:
# TODO upload_files.delete()
upload_files.delete()
upload.delete()
logger.info('Invalid upload')
abort(400, message='Bad file format, expected %s.' % ", ".join(upload_files.formats))
logger.info('received uploaded file')
......@@ -267,15 +267,17 @@ class UploadResource(Resource):
@api.doc('delete_upload')
@api.response(404, 'Upload does not exist')
@api.response(400, 'Not allowed during processing or when not in staging')
@api.response(401, 'Upload does not belong to authenticated user.')
@api.response(400, 'Not allowed during processing')
@api.marshal_with(upload_model, skip_none=True, code=200, description='Upload deleted')
@login_really_required
def delete(self, upload_id: str):
@with_logger
def delete(self, upload_id: str, logger):
"""
Delete an existing upload.
Only ``is_ready`` uploads
can be deleted. Deleting an upload in processing is not allowed.
Only uploads that are still in staging, not already deleted, not still uploading, and
not currently being processed can be deleted.
"""
try:
upload = Upload.get(upload_id)
......@@ -283,19 +285,25 @@ class UploadResource(Resource):
abort(404, message='Upload with id %s does not exist.' % upload_id)
if upload.user_id != str(g.user.user_id) and not g.user.is_admin:
abort(404, message='Upload with id %s does not exist.' % upload_id)
abort(401, message='Upload with id %s does not belong to you.' % upload_id)
if not upload.in_staging:
abort(400, message='Operation not allowed, upload is not in staging.')
with utils.lnr(logger, 'delete processing upload'):
try:
upload.delete()
except NotAllowedDuringProcessing:
abort(400, message='You must not delete an upload during processing.')
try:
upload.delete()
return upload, 200
except NotAllowedDuringProcessing:
abort(400, message='You must not delete an upload during processing.')
with utils.lnr(logger, 'delete upload files'):
upload_files = UploadFiles.get(upload_id)
assert upload_files is not None, 'Uploads existing in staging must have files.'
if upload_files is not None:
assert isinstance(upload_files, StagingUploadFiles), 'Uploads in staging must have staging files.'
upload_files.delete()
return upload, 200
@api.doc('exec_upload_command')
@api.response(404, 'Upload does not exist or is not allowed')
@api.response(404, 'Upload does not exist or not in staging')
@api.response(400, 'Operation is not supported')
@api.response(401, 'If the operation is not allowed for the current user')
@api.marshal_with(upload_model, skip_none=True, code=200, description='Upload unstaged successfully')
......@@ -334,9 +342,6 @@ class UploadResource(Resource):
break
if operation == 'unstage':
if not upload.in_staging:
abort(400, message='Operation not allowed, upload is not in staging.')
try:
upload.unstage(meta_data)
except NotAllowedDuringProcessing:
......
......@@ -28,7 +28,7 @@ from bravado.requests_client import RequestsClient
from bravado.client import SwaggerClient
from nomad import config, utils
from nomad.files import UploadFile
from nomad.uploads import ArchiveBasedStagingUploadFiles
from nomad.parsing import parsers, parser_dict, LocalBackend
from nomad.normalizing import normalizers
......@@ -163,17 +163,17 @@ class CalcProcReproduction:
else:
self.logger.info('Calc already downloaded.')
self.upload_file = UploadFile(upload_id='tmp_%s' % archive_id, local_path=local_path)
self.upload_files = ArchiveBasedStagingUploadFiles(upload_id='tmp_%s' % archive_id, local_path=local_path)
def __enter__(self):
# open/extract upload file
self.logger.info('Extracting calc data.')
self.upload_file.__enter__()
self.upload_files.extract()
# find mainfile matching calc_hash
self.mainfile = next(
filename for filename in self.upload_file.filelist
if utils.hash(filename) == self.calc_hash)
filename for filename in self.upload_files.raw_file_manifest()
if self.upload_files.calc_hash(filename) == self.calc_hash)
assert self.mainfile is not None, 'The mainfile could not be found.'
self.logger = self.logger.bind(mainfile=self.mainfile)
......@@ -182,19 +182,18 @@ class CalcProcReproduction:
return self
def __exit__(self, *args):
self.upload_file.__exit__(*args)
self.upload_files.delete()
def parse(self, parser_name: str = None) -> LocalBackend:
"""
Run the given parser on the downloaded calculation. If no parser is given,
do parser matching and use the respective parser.
"""
mainfile = self.upload_file.get_file(self.mainfile)
if parser_name is not None:
parser = parser_dict.get(parser_name)
else:
for potential_parser in parsers:
with mainfile.open() as mainfile_f:
with self.upload_files.raw_file(self.mainfile) as mainfile_f:
if potential_parser.is_mainfile(self.mainfile, lambda fn: mainfile_f):
parser = potential_parser
break
......@@ -203,7 +202,7 @@ class CalcProcReproduction:
self.logger = self.logger.bind(parser=parser.name) # type: ignore
self.logger.info('identified parser')
parser_backend = parser.run(mainfile.os_path, logger=self.logger)
parser_backend = parser.run(self.upload_files.raw_file_object(self.mainfile).os_path, logger=self.logger)
self.logger.info('ran parser')
return parser_backend
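A sketch of how CalcProcReproduction is typically driven, assuming the constructor takes the archive id as its first argument (ids are placeholders):

with CalcProcReproduction('some_upload_hash/some_calc_hash') as reproduction:
    # __enter__ extracts the staged upload and locates the mainfile by calc hash;
    # parse() matches a parser via is_mainfile and runs it on the mainfile
    backend = reproduction.parse()
# __exit__ deletes the temporary staging upload files again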
......
......@@ -25,7 +25,6 @@ calculations, and files
"""
from typing import List, Any, ContextManager, Tuple, Generator
from elasticsearch.exceptions import NotFoundError
from mongoengine import StringField, BooleanField, DateTimeField, DictField, IntField
import logging
from structlog import wrap_logger
......@@ -37,7 +36,6 @@ from nomad.repo import RepoCalc, RepoUpload
from nomad.processing.base import Proc, Chord, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing import parsers, parser_dict
from nomad.normalizing import normalizers
from nomad.utils import lnr
class NotAllowedDuringProcessing(Exception): pass
......@@ -106,26 +104,6 @@ class Calc(Proc, datamodel.Calc):
def upload_hash(self):
return utils.archive.upload_hash(self.archive_id)
def delete(self):
"""
Delete this calculation and all associated data. This includes all files,
the archive, and this search index entry.
TODO is this needed? Or do we always delete whole uploads in bulk?
"""
# delete all files
self.upload_files.delete()
# delete the search index entry
try:
elastic_entry = RepoCalc.get(self.archive_id)
if elastic_entry is not None:
elastic_entry.delete()
except NotFoundError:
pass
# delete this mongo document
super().delete()
def get_logger(self, **kwargs):
logger = super().get_logger()
logger = logger.bind(
......@@ -312,7 +290,6 @@ class Upload(Chord, datamodel.Upload):
local_path: optional local path, e.g. for files that are already somewhere on the server
additional_metadata: optional user provided additional meta data
upload_id: the upload id generated by the database
in_staging: true if the upload is still in staging and can be edited by the uploader
is_private: true if the upload and its derivatives are only visible to the uploader
upload_time: the timestamp when the system realised the upload
upload_hash: the hash of the uploaded file
......@@ -326,7 +303,6 @@ class Upload(Chord, datamodel.Upload):
local_path = StringField(default=None)
additional_metadata = DictField(default=None)
in_staging = BooleanField(default=True)
is_private = BooleanField(default=False)
upload_time = DateTimeField()
......@@ -355,7 +331,7 @@ class Upload(Chord, datamodel.Upload):
@classmethod
def user_uploads(cls, user: coe_repo.User) -> List['Upload']:
""" Returns all uploads for the given user. Currently returns all uploads. """
return cls.objects(user_id=str(user.user_id), in_staging=True)
return cls.objects(user_id=str(user.user_id))
@property
def uploader(self):
......@@ -367,23 +343,10 @@ class Upload(Chord, datamodel.Upload):
return logger
def delete(self):
logger = self.get_logger(task='delete')
if not (self.completed or self.current_task == 'uploading'):
raise NotAllowedDuringProcessing()
with lnr(logger, 'delete all files of upload'):
self.upload_files.delete()
with lnr(logger, 'deleting calcs db entries'):
# delete repo entries
self.to(RepoUpload).delete()
# delete calc processings
Calc.objects(upload_id=self.upload_id).delete()
with lnr(logger, 'deleting upload db entry'):
super().delete()
Calc.objects(upload_id=self.upload_id).delete()
super().delete()
@classmethod
def create(cls, **kwargs) -> 'Upload':
......@@ -412,11 +375,15 @@ class Upload(Chord, datamodel.Upload):
if not (self.completed or self.current_task == 'uploading'):
raise NotAllowedDuringProcessing()
self.in_staging = False
self.delete()
self.to(RepoUpload).unstage()
coe_repo.Upload.add(self, meta_data)
self.save()
self.upload_files.pack()
self.upload_files.delete()
@process
def process(self):
self.extracting()
......@@ -516,8 +483,6 @@ class Upload(Chord, datamodel.Upload):
self.get_logger(), 'pack staging upload', step='cleaning',
upload_size=self.upload_files.size):
pass
# self.upload_files.pack()
# self.upload_files.delete()
@property
def processed_calcs(self):
......
......@@ -46,6 +46,7 @@ import json
import uuid
import time
import re
from werkzeug.exceptions import HTTPException
from nomad import config
......@@ -218,6 +219,9 @@ def lnr(logger, event, **kwargs):
"""
try:
yield
except HTTPException as e:
# ignore HTTPException as they are part of the normal flask error handling
raise e
except Exception as e:
logger.error(event, exc_info=e, **kwargs)
raise e
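A short sketch of how lnr ('log and raise') is used, mirroring the calls in the upload API above (the upload id is a placeholder):

logger = utils.get_logger(__name__, upload_id='some_upload_id')
with lnr(logger, 'delete upload files'):
    upload_files.delete()
# non-HTTP exceptions are logged under the given event and re-raised;
# HTTPExceptions pass through untouched for flask's normal error handling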
......
......@@ -31,10 +31,10 @@ from nomad.processing import Upload, Calc
from nomad.processing.base import task as task_decorator
from nomad.repo import RepoUpload
from tests.test_files import example_file, empty_file
from tests.test_uploads import example_file, empty_file
# import fixtures
from tests.test_files import clear_files # pylint: disable=unused-import
from tests.test_uploads import clear_files # pylint: disable=unused-import
example_files = [empty_file, example_file]
......@@ -130,20 +130,6 @@ def test_processing_with_warning(uploaded_id_with_warning, worker, test_user, mo
assert_processing(upload, mocksearch)
# TODO
# @pytest.mark.parametrize('uploaded_id', [example_files[1]], indirect=True)
# def test_processing_doublets(uploaded_id, worker, test_user, with_error):
# upload = run_processing(uploaded_id, test_user)
# assert upload.status == 'SUCCESS'
# assert upload.to(RepoUpload).exists()
# upload = run_processing(uploaded_id, test_user)
# assert upload.status == 'FAILURE'
# assert len(upload.errors) > 0
# assert 'already' in upload.errors[0]
@pytest.mark.timeout(30)
def test_process_non_existing(worker, test_user, with_error):
upload = run_processing('__does_not_exist', test_user)
......
......@@ -29,12 +29,12 @@ services_config.update(api_base_path='')
config.services = config.NomadServicesConfig(**services_config)
from nomad import api, coe_repo # noqa
from nomad.uploads import UploadFiles # noqa
from nomad.processing import Upload # noqa
from nomad.uploads import UploadFiles, PublicUploadFiles # noqa
from nomad.processing import Upload, Calc # noqa
from nomad.coe_repo import User # noqa
from tests.processing.test_data import example_files # noqa
from tests.test_files import example_file, example_file_mainfile, example_file_contents # noqa
from tests.test_uploads import example_file, example_file_mainfile, example_file_contents # noqa
from tests.test_uploads import create_staging_upload, create_public_upload # noqa
# import fixtures
......@@ -210,18 +210,27 @@ class TestUploads:
assert len(upload['calcs']['results']) == 1
def assert_unstage(self, client, test_user_auth, upload_id, proc_infra, meta_data={}):
rv = client.get('/uploads/%s' % upload_id, headers=test_user_auth)
upload = self.assert_upload(rv.data)
empty_upload = upload['calcs']['pagination']['total'] == 0
rv = client.post(
'/uploads/%s' % upload_id,
headers=test_user_auth,
data=json.dumps(dict(operation='unstage', meta_data=meta_data)),
content_type='application/json')
assert rv.status_code == 200
rv = client.get('/uploads/%s' % upload_id, headers=test_user_auth)
assert rv.status_code == 200
upload = self.assert_upload(rv.data)
empty_upload = upload['calcs']['pagination']['total'] == 0
assert_coe_upload(upload['upload_hash'], empty=empty_upload, meta_data=meta_data)
self.assert_upload_does_not_exist(client, upload_id, test_user_auth)
assert_coe_upload(upload_id, empty=empty_upload, meta_data=meta_data)
def assert_upload_does_not_exist(self, client, upload_id: str, test_user_auth):
rv = client.get('/uploads/%s' % upload_id, headers=test_user_auth)
assert rv.status_code == 404
assert Upload.objects(upload_id=upload_id).first() is None
assert Calc.objects(upload_id=upload_id).count() == 0
upload_files = UploadFiles.get(upload_id)
assert upload_files is None or isinstance(upload_files, PublicUploadFiles)
def test_get_command(self, client, test_user_auth, no_warn):
rv = client.get('/uploads/command', headers=test_user_auth)
......@@ -288,7 +297,7 @@ class TestUploads:
self.assert_processing(client, test_user_auth, upload['upload_id'])
self.assert_unstage(client, test_user_auth, upload['upload_id'], proc_infra)
rv = client.delete('/uploads/%s' % upload['upload_id'], headers=test_user_auth)
assert rv.status_code == 400
assert rv.status_code == 404
def test_delete(self, client, test_user_auth, proc_infra):
rv = client.put('/uploads/?local_path=%s' % example_file, headers=test_user_auth)
......@@ -296,6 +305,7 @@ class TestUploads:
self.assert_processing(client, test_user_auth, upload['upload_id'])
rv = client.delete('/uploads/%s' % upload['upload_id'], headers=test_user_auth)
assert rv.status_code == 200
self.assert_upload_does_not_exist(client, upload['upload_id'], test_user_auth)
@pytest.mark.parametrize('example_file', example_files)
def test_post(self, client, test_user_auth, example_file, proc_infra, clean_repository_db):
......@@ -509,14 +519,14 @@ class TestRaw(UploadFilesBasedTests):
@UploadFilesBasedTests.check_authorizaton
def test_raw_file(self, client, upload, auth_headers):
url = '/raw/%s/data/%s' % (upload, example_file_mainfile)
url = '/raw/%s/%s' % (upload, example_file_mainfile)
rv = client.get(url, headers=auth_headers)
assert rv.status_code == 200
assert len(rv.data) > 0
@UploadFilesBasedTests.ignore_authorization
def test_raw_file_missing_file(self, client, upload, auth_headers):
url = '/raw/%s/data/does/not/exist' % upload
url = '/raw/%s/does/not/exist' % upload
rv = client.get(url, headers=auth_headers)
assert rv.status_code == 404
data = json.loads(rv.data)
......@@ -524,7 +534,7 @@ class TestRaw(UploadFilesBasedTests):
@UploadFilesBasedTests.ignore_authorization
def test_raw_file_listing(self, client, upload, auth_headers):
url = '/raw/%s/data/examples' % upload
url = '/raw/%s/examples' % upload
rv = client.get(url, headers=auth_headers)
assert rv.status_code == 404
data = json.loads(rv.data)
......@@ -533,7 +543,7 @@ class TestRaw(UploadFilesBasedTests):
@pytest.mark.parametrize('compress', [True, False])
@UploadFilesBasedTests.ignore_authorization
def test_raw_file_wildcard(self, client, upload, auth_headers, compress):
url = '/raw/%s/data/examples*' % upload
url = '/raw/%s/examples*' % upload
if compress:
url = '%s?compress=1' % url
rv = client.get(url, headers=auth_headers)
......@@ -560,7 +570,7 @@ class TestRaw(UploadFilesBasedTests):
@UploadFilesBasedTests.check_authorizaton
def test_raw_files(self, client, upload, auth_headers, compress):
url = '/raw/%s?files=%s' % (
upload, ','.join(['data/%s' % file for file in example_file_contents]))
upload, ','.join(example_file_contents))
if compress:
url = '%s&compress=1' % url
rv = client.get(url, headers=auth_headers)
......@@ -575,7 +585,7 @@ class TestRaw(UploadFilesBasedTests):
@UploadFilesBasedTests.check_authorizaton
def test_raw_files_post(self, client, upload, auth_headers, compress):
url = '/raw/%s' % upload
data = dict(files=['data/%s' % file for file in example_file_contents])
data = dict(files=example_file_contents)
if compress is not None:
data.update(compress=compress)
rv = client.post(url, data=json.dumps(data), content_type='application/json', headers=auth_headers)
......@@ -589,7 +599,7 @@ class TestRaw(UploadFilesBasedTests):
@pytest.mark.parametrize('compress', [True, False])
@UploadFilesBasedTests.ignore_authorization
def test_raw_files_missing_file(self, client, upload, auth_headers, compress):
url = '/raw/%s?files=data/%s,missing/file.txt' % (upload, example_file_mainfile)
url = '/raw/%s?files=%s,missing/file.txt' % (upload, example_file_mainfile)
if compress:
url = '%s&compress=1' % url
rv = client.get(url, headers=auth_headers)
......
......@@ -20,7 +20,7 @@ from nomad.coe_repo import User, Calc, Upload
from tests.processing.test_data import processed_upload # pylint: disable=unused-import
from tests.processing.test_data import uploaded_id # pylint: disable=unused-import
from tests.processing.test_data import mocks_forall # pylint: disable=unused-import
from tests.test_files import clear_files # pylint: disable=unused-import
from tests.test_uploads import clear_files # pylint: disable=unused-import
def assert_user(user, reference):
......
# Copyright 2018 Markus Scheidgen
#