Commit 4bdfd113 authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Continued massive refactoring: replaced the old files module with the new uploads module.

parent ffecdabd
......@@ -44,7 +44,7 @@
"cwd": "${workspaceFolder}",
"program": "${workspaceFolder}/.pyenv/bin/pytest",
"args": [
"-sv", "tests/processing/test_data.py::test_processing[tests/data/proc/examples_template.zip]"
"-sv", "tests/test_api.py::TestRaw::test_raw_files[test_data0]"
]
},
{
......
......@@ -24,12 +24,10 @@ from flask_restplus import abort, Resource
import nomad_meta_info
from nomad import config
from nomad.uploads import UploadFiles
from nomad.utils import get_logger
from nomad.uploads import UploadFiles, Restricted
from .app import api
from .auth import login_if_available
from .auth import login_if_available, create_authorization_predicate
from .common import calc_route
ns = api.namespace(
......@@ -41,6 +39,7 @@ ns = api.namespace(
class ArchiveCalcLogResource(Resource):
@api.doc('get_archive_logs')
@api.response(404, 'The upload or calculation does not exist')
@api.response(401, 'Not authorized to access the data.')
@api.response(200, 'Archive data send', headers={'Content-Type': 'application/plain'})
@login_if_available
def get(self, upload_hash, calc_hash):
......@@ -51,30 +50,29 @@ class ArchiveCalcLogResource(Resource):
"""
archive_id = '%s/%s' % (upload_hash, calc_hash)
try:
upload_files = UploadFiles.get(upload_hash)
with upload_files.archive_log_file(calc_hash, 'rt') as f:
rv = send_file(
f,
mimetype='text/plain',
as_attachment=True,
attachment_filename='%s.log' % archive_id)
upload_files = UploadFiles.get(
upload_hash, is_authorized=create_authorization_predicate(upload_hash, calc_hash))
return rv
except FileNotFoundError:
abort(404, message='Archive/calculation %s does not exist.' % archive_id)
except Exception as e:
logger = get_logger(
__name__, endpoint='logs', action='get',
upload_hash=upload_hash, calc_hash=calc_hash)
logger.error('Exception on accessing calc proc log', exc_info=e)
abort(500, message='Could not accessing the logs.')
if upload_files is None:
abort(404, message='Archive %s does not exist.' % upload_hash)
try:
return send_file(
upload_files.archive_log_file(calc_hash, 'rt'),
mimetype='text/plain',
as_attachment=True,
attachment_filename='%s.log' % archive_id)
except Restricted:
abort(401, message='Not authorized to access %s/%s.' % (upload_hash, calc_hash))
except KeyError:
abort(404, message='Calculation %s does not exist.' % archive_id)
@calc_route(ns)
class ArchiveCalcResource(Resource):
@api.doc('get_archive_calc')
@api.response(404, 'The upload or calculation does not exist')
@api.response(401, 'Not authorized to access the data.')
@api.response(200, 'Archive data send')
@login_if_available
def get(self, upload_hash, calc_hash):
......@@ -85,28 +83,22 @@ class ArchiveCalcResource(Resource):
"""
archive_id = '%s/%s' % (upload_hash, calc_hash)
try:
upload_file = UploadFiles.get(upload_hash)
mode = 'rb' if config.files.compress_archive else 'rt'
with upload_file.archive_file(calc_hash, mode) as f:
rv = send_file(
f,
mimetype='application/json',
as_attachment=True,
attachment_filename='%s.json' % archive_id)
if config.files.compress_archive:
rv.headers['Content-Encoding'] = 'gzip'
upload_file = UploadFiles.get(
upload_hash, is_authorized=create_authorization_predicate(upload_hash, calc_hash))
return rv
if upload_file is None:
abort(404, message='Archive %s does not exist.' % upload_hash)
try:
return send_file(
upload_file.archive_file(calc_hash, 'rt'),
mimetype='application/json',
as_attachment=True,
attachment_filename='%s.json' % archive_id)
except Restricted:
abort(401, message='Not authorized to access %s/%s.' % (upload_hash, calc_hash))
except KeyError:
abort(404, message='Archive %s does not exist.' % archive_id)
except Exception as e:
logger = get_logger(
__name__, endpoint='archive', action='get',
upload_hash=upload_hash, calc_hash=calc_hash)
logger.error('Exception on accessing archive', exc_info=e)
abort(500, message='Could not accessing the archive.')
abort(404, message='Calculation %s does not exist.' % archive_id)
@ns.route('/metainfo/<string:metainfo_path>')
......@@ -132,8 +124,3 @@ class MetainfoResource(Resource):
return rv
except FileNotFoundError:
abort(404, message='The metainfo %s does not exist.' % metainfo_path)
except Exception as e:
logger = get_logger(
__name__, endpoint='metainfo', action='get', metainfo_path=metainfo_path)
logger.error('Exception on accessing metainfo', exc_info=e)
abort(500, message='Could not accessing the metainfo.')
......@@ -39,7 +39,7 @@ from flask import g, request, make_response
from flask_restplus import abort, Resource
from flask_httpauth import HTTPBasicAuth
from nomad import config
from nomad import config, processing, uploads, utils, coe_repo
from nomad.coe_repo import User, LoginException
from .app import app, api
......@@ -147,3 +147,32 @@ class TokenResource(Resource):
401,
message='You are not propertly logged in at the NOMAD coe repository, '
'there is no token for you.')
def create_authorization_predicate(upload_hash, calc_hash=None):
    """
    Returns a predicate that determines if the logged in user has the authorization
    to access the given upload and calculation.

    The returned zero-argument callable reads the current user from flask's
    request-context global ``g`` at call time, so it must be invoked inside a
    request context.

    Lookup order: coe repository upload first, then staging (processing) upload.
    In both cases authorization means the upload belongs to the logged in user.
    If neither db has an entry but upload files exist on disk, that is an
    inconsistency and a ``KeyError`` is raised. If nothing exists at all, the
    predicate falls through and implicitly returns ``None`` (falsy, i.e. not
    authorized).
    """
    def func():
        if g.user is None:
            # guest users don't have authorized access to anything
            return False

        # look in repository
        upload = coe_repo.Upload.from_upload_hash(upload_hash)
        if upload is not None:
            return upload.user_id == g.user.user_id

        # look in staging
        staging_upload = processing.Upload.get(upload_hash)
        if staging_upload is not None:
            # NOTE(review): ids are compared as strings here, presumably
            # because the staging store keeps user_id in a different type
            # than the coe repo — confirm against the processing model.
            return str(g.user.user_id) == str(staging_upload.user_id)

        # There are no db entries for the given resource
        if uploads.UploadFiles.get(upload_hash) is not None:
            # files on disk without a db record: log and signal the
            # inconsistency to the caller
            logger = utils.get_logger(__name__, upload_hash=upload_hash, calc_hash=calc_hash)
            logger.error('Upload files without respective db entry')
            raise KeyError

        # implicit None return: resource does not exist anywhere -> falsy,
        # i.e. treated as not authorized

    return func
......@@ -22,15 +22,13 @@ import os.path
from zipfile import ZIP_DEFLATED, ZIP_STORED
import zipstream
from flask import Response, request, send_file
from flask import Response, request, send_file, stream_with_context
from flask_restplus import abort, Resource, fields
from werkzeug.exceptions import HTTPException
from nomad.utils import get_logger
from nomad.uploads import UploadFiles
from nomad.uploads import UploadFiles, Restricted
from .app import api
from .auth import login_if_available
from .auth import login_if_available, create_authorization_predicate
ns = api.namespace('raw', description='Downloading raw data files.')
......@@ -57,6 +55,7 @@ raw_file_from_path_parser.add_argument(**raw_file_compress_argument)
class RawFileFromPathResource(Resource):
@api.doc('get')
@api.response(404, 'The upload or path does not exist')
@api.response(401, 'Not authorized to access the data.')
@api.response(200, 'File(s) send', headers={'Content-Type': 'application/gz'})
@api.expect(raw_file_from_path_parser, validate=True)
@login_if_available
......@@ -66,12 +65,14 @@ class RawFileFromPathResource(Resource):
If the given path points to a file, the file is provided. If the given path
points to an directory, the directory and all contents is provided as .zip file.
Zip files are streamed; instead of 401 errors, the zip file will just not contain
any files that the user is not authorized to access.
"""
upload_filepath = fix_file_paths(path)
try:
upload_files = UploadFiles.get(upload_hash)
except KeyError:
upload_files = UploadFiles.get(
upload_hash, create_authorization_predicate(upload_hash))
if upload_files is None:
abort(404, message='The upload with hash %s does not exist.' % upload_hash)
if upload_filepath[-1:] == '*':
......@@ -84,27 +85,19 @@ class RawFileFromPathResource(Resource):
return respond_to_get_raw_files(upload_hash, files, compress)
try:
with upload_files.raw_file(upload_filepath) as f:
rv = send_file(
f,
mimetype='application/octet-stream',
as_attachment=True,
attachment_filename=os.path.basename(upload_filepath))
return rv
return send_file(
upload_files.raw_file(upload_filepath),
mimetype='application/octet-stream',
as_attachment=True,
attachment_filename=os.path.basename(upload_filepath))
except Restricted:
abort(401, message='Not authorized to access upload %s.' % upload_hash)
except KeyError:
files = list(file for file in upload_files.raw_file_manifest(upload_filepath))
if len(files) == 0:
abort(404, message='The file %s does not exist.' % upload_filepath)
else:
abort(404, message='The file %s does not exist, but there are files with matching paths' % upload_filepath, files=files)
except HTTPException as e:
raise e
except Exception as e:
logger = get_logger(
__name__, endpoint='raw', action='get',
upload_hash=upload_hash, upload_filepath=upload_filepath)
logger.error('Exception on accessing raw data', exc_info=e)
abort(500, message='Could not accessing the raw data.')
raw_files_request_model = api.model('RawFilesRequest', {
......@@ -132,7 +125,11 @@ class RawFilesResource(Resource):
@api.expect(raw_files_request_model, validate=True)
@login_if_available
def post(self, upload_hash):
""" Download multiple raw calculation files. """
"""
Download multiple raw calculation files in a .zip file.
Zip files are streamed; instead of 401 errors, the zip file will just not contain
any files that the user is not authorized to access.
"""
json_data = request.get_json()
compress = json_data.get('compress', False)
files = [fix_file_paths(file.strip()) for file in json_data['files']]
......@@ -145,7 +142,12 @@ class RawFilesResource(Resource):
@api.expect(raw_files_request_parser, validate=True)
@login_if_available
def get(self, upload_hash):
""" Download multiple raw calculation files. """
"""
Download multiple raw calculation files.
Download multiple raw calculation files in a .zip file.
Zip files are streamed; instead of 401 errors, the zip file will just not contain
any files that the user is not authorized to access.
"""
files_str = request.args.get('files', None)
compress = request.args.get('compress', 'false') == 'true'
......@@ -157,36 +159,34 @@ class RawFilesResource(Resource):
def respond_to_get_raw_files(upload_hash, files, compress=False):
logger = get_logger(__name__, endpoint='raw', action='get files', upload_hash=upload_hash)
try:
upload_file = UploadFiles.get(upload_hash)
except KeyError:
upload_files = UploadFiles.get(
upload_hash, create_authorization_predicate(upload_hash))
if upload_files is None:
abort(404, message='The upload with hash %s does not exist.' % upload_hash)
def generator():
""" Stream a zip file with all files using zipstream. """
def iterator():
""" Replace the directory based iter of zipstream with an iter over all given files. """
try:
for filename in files:
# Write a file to the zipstream.
try:
with upload_file.raw_file(filename) as f:
def iter_content():
while True:
data = f.read(100000)
if not data:
break
yield data
yield dict(arcname=filename, iterable=iter_content())
except KeyError as e:
# files that are not found, will not be returned
pass
except Exception as e:
logger.error('Exception while accessing files.', exc_info=e)
for filename in files:
# Write a file to the zipstream.
try:
with upload_files.raw_file(filename, 'rb') as f:
def iter_content():
while True:
data = f.read(100000)
if not data:
break
yield data
yield dict(arcname=filename, iterable=iter_content())
except KeyError:
# files that are not found, will not be returned
pass
except Restricted:
# due to the streaming nature, we cannot raise 401 here
# we just leave it out in the download
pass
compression = ZIP_DEFLATED if compress else ZIP_STORED
zip_stream = zipstream.ZipFile(mode='w', compression=compression, allowZip64=True)
......@@ -195,6 +195,6 @@ def respond_to_get_raw_files(upload_hash, files, compress=False):
for chunk in zip_stream:
yield chunk
response = Response(generator(), mimetype='application/zip')
response = Response(stream_with_context(generator()), mimetype='application/zip')
response.headers['Content-Disposition'] = 'attachment; filename={}'.format('%s.zip' % upload_hash)
return response
......@@ -191,7 +191,7 @@ class UploadListResource(Resource):
abort(400, message='Some IO went wrong, download probably aborted/disrupted.')
if not upload_files.is_valid:
upload_files.delete()
# TODO upload_files.delete()
upload.delete()
abort(400, message='Bad file format, excpected %s.' % ", ".join(upload_files.formats))
......
......@@ -22,7 +22,7 @@ import logging
from collections import namedtuple
FilesConfig = namedtuple(
'FilesConfig', ['uploads_bucket', 'raw_bucket', 'archive_bucket', 'compress_archive', 'staging_bucket', 'public_bucket'])
'FilesConfig', ['uploads_bucket', 'raw_bucket', 'archive_bucket', 'staging_bucket', 'public_bucket'])
""" API independent configuration for the object storage. """
CeleryConfig = namedtuple('Celery', ['broker_url'])
......@@ -50,7 +50,6 @@ files = FilesConfig(
uploads_bucket='uploads',
raw_bucket=os.environ.get('NOMAD_FILES_RAW_BUCKET', 'raw'),
archive_bucket='archive',
compress_archive=True,
staging_bucket='staging',
public_bucket='public'
)
......
This diff is collapsed.
......@@ -99,7 +99,7 @@ class Calc(Proc, datamodel.Calc):
@property
def upload_files(self) -> ArchiveBasedStagingUploadFiles:
if not self._upload_files:
self._upload_files = ArchiveBasedStagingUploadFiles(self.upload_id, public_only=False)
self._upload_files = ArchiveBasedStagingUploadFiles(self.upload_id, is_authorized=lambda: True, local_path=self.upload.local_path)
return self._upload_files
@property
......@@ -429,7 +429,7 @@ class Upload(Chord, datamodel.Upload):
@property
def upload_files(self) -> ArchiveBasedStagingUploadFiles:
if not self._upload_files:
self._upload_files = ArchiveBasedStagingUploadFiles(self.upload_id, public_only=False)
self._upload_files = ArchiveBasedStagingUploadFiles(self.upload_id, is_authorized=lambda: True, local_path=self.local_path)
return self._upload_files
@task
......
......@@ -37,7 +37,7 @@ almost readonly (beside metadata) storage.
"""
from abc import ABCMeta
from typing import IO, Generator, Dict, Iterator, Iterable
from typing import IO, Generator, Dict, Iterator, Iterable, Callable
from filelock import Timeout, FileLock
import ujson
import os.path
......@@ -45,7 +45,6 @@ import os
import shutil
from zipfile import ZipFile, BadZipFile, is_zipfile
from bagit import make_bag
import contextlib
import hashlib
import io
......@@ -297,10 +296,13 @@ class Restricted(Exception):
class UploadFiles(DirectoryObject, metaclass=ABCMeta):
_archive_ext = 'json'
def __init__(
self, bucket: str, upload_id: str, public_only: bool = True,
create: bool = False,
archive_ext: str = 'json.gz' if config.files.compress_archive else 'json') -> None:
self, bucket: str, upload_id: str,
is_authorized: Callable[[], bool] = lambda: False,
create: bool = False) -> None:
self.logger = utils.get_logger(__name__, upload_id=upload_id)
super().__init__(bucket, upload_id, create=create, prefix=True)
......@@ -309,8 +311,7 @@ class UploadFiles(DirectoryObject, metaclass=ABCMeta):
raise KeyError()
self.upload_id = upload_id
self.public_only = public_only
self._archive_ext = archive_ext
self._is_authorized = is_authorized
@staticmethod
def get(upload_id: str, *args, **kwargs) -> 'UploadFiles':
......@@ -326,8 +327,7 @@ class UploadFiles(DirectoryObject, metaclass=ABCMeta):
""" The calc metadata for this upload. """
raise NotImplementedError
@contextlib.contextmanager
def raw_file(self, file_path: str, *args, **kwargs) -> Generator[IO, None, None]:
def raw_file(self, file_path: str, *args, **kwargs) -> IO:
"""
Opens a raw file and returns a file-like objects. Additional args, kwargs are
delegated to the respective `open` call.
......@@ -349,8 +349,7 @@ class UploadFiles(DirectoryObject, metaclass=ABCMeta):
"""
raise NotImplementedError()
@contextlib.contextmanager
def archive_file(self, calc_hash: str, *args, **kwargs) -> Generator[IO, None, None]:
def archive_file(self, calc_hash: str, *args, **kwargs) -> IO:
"""
Opens a archive file and returns a file-like objects. Additional args, kwargs are
delegated to the respective `open` call.
......@@ -362,8 +361,7 @@ class UploadFiles(DirectoryObject, metaclass=ABCMeta):
"""
raise NotImplementedError()
@contextlib.contextmanager
def archive_log_file(self, calc_hash: str, *args, **kwargs) -> Generator[IO, None, None]:
def archive_log_file(self, calc_hash: str, *args, **kwargs) -> IO:
"""
Opens a archive log file and returns a file-like objects. Additional args, kwargs are
delegated to the respective `open` call.
......@@ -397,40 +395,29 @@ class StagingUploadFiles(UploadFiles):
def metadata(self) -> Metadata:
return self._metadata
@contextlib.contextmanager
def _file(self, path_object: PathObject, *args, **kwargs) -> Generator[IO, None, None]:
def _file(self, path_object: PathObject, *args, **kwargs) -> IO:
try:
with open(path_object.os_path, *args, **kwargs) as f:
yield f
return open(path_object.os_path, *args, **kwargs)
except FileNotFoundError:
raise KeyError()
@contextlib.contextmanager
def raw_file(self, file_path: str, *args, **kwargs) -> Generator[IO, None, None]:
if self.public_only:
def raw_file(self, file_path: str, *args, **kwargs) -> IO:
if not self._is_authorized():
raise Restricted
with self._file(self.raw_file_object(file_path), *args, **kwargs) as f:
yield f
return self._file(self.raw_file_object(file_path), *args, **kwargs)
def raw_file_object(self, file_path: str) -> PathObject:
return self._raw_dir.join_file(file_path)
@contextlib.contextmanager
def archive_file(self, calc_hash: str, *args, **kwargs) -> Generator[IO, None, None]:
if self.public_only:
def archive_file(self, calc_hash: str, *args, **kwargs) -> IO:
if not self._is_authorized():
raise Restricted
return self._file(self.archive_file_object(calc_hash), *args, **kwargs)
with self._file(self.archive_file_object(calc_hash), *args, **kwargs) as f:
yield f
@contextlib.contextmanager
def archive_log_file(self, calc_hash: str, *args, **kwargs) -> Generator[IO, None, None]:
if self.public_only:
def archive_log_file(self, calc_hash: str, *args, **kwargs) -> IO:
if not self._is_authorized():
raise Restricted
with self._file(self.archive_log_file_object(calc_hash), *args, **kwargs) as f:
yield f
return self._file(self.archive_log_file_object(calc_hash), *args, **kwargs)
def archive_file_object(self, calc_hash: str) -> PathObject:
return self._archive_dir.join_file('%s.%s' % (calc_hash, self._archive_ext))
......@@ -621,7 +608,7 @@ class ArchiveBasedStagingUploadFiles(StagingUploadFiles):
@property
def upload_file_os_path(self):
if self._local_path:
if self._local_path is not None:
return self._local_path
else:
return self._upload_file.os_path
......@@ -653,8 +640,7 @@ class PublicUploadFiles(UploadFiles):
def metadata(self) -> Metadata:
return self._metadata
@contextlib.contextmanager
def _file(self, prefix: str, ext: str, path: str, *args, **kwargs) -> Generator[IO, None, None]:
def _file(self, prefix: str, ext: str, path: str, *args, **kwargs) -> IO:
mode = kwargs.get('mode') if len(args) == 0 else args[0]
if 'mode' in kwargs:
del(kwargs['mode'])
......@@ -664,26 +650,22 @@ class PublicUploadFiles(UploadFiles):
try:
zip_file = self.join_file('%s-%s.%s.zip' % (prefix, access, ext))
with ZipFile(zip_file.os_path) as zf:
with zf.open(path, 'r', **kwargs) as f:
if 't' in mode:
yield io.TextIOWrapper(f)
else:
yield f
return
f = zf.open(path, 'r', **kwargs)
if access == 'restricted' and not self._is_authorized():
raise Restricted
if 't' in mode:
return io.TextIOWrapper(f)
else:
return f
except FileNotFoundError:
pass
except KeyError:
pass
if self.public_only:
raise Restricted
raise KeyError()
@contextlib.contextmanager
def raw_file(self, file_path: str, *args, **kwargs) -> Generator[IO, None, None]:
with self._file('raw', 'bagit', 'data/' + file_path, *args, *kwargs) as f:
yield f
def raw_file(self, file_path: str, *args, **kwargs) -> IO:
return self._file('raw', 'bagit', 'data/' + file_path, *args, *kwargs)
def raw_file_manifest(self, path_prefix: str = None) -> Generator[str, None, None]:
for access in ['public', 'restricted']:
......@@ -697,15 +679,11 @@ class PublicUploadFiles(UploadFiles):
except FileNotFoundError:
pass
@contextlib.contextmanager
def archive_file(self, calc_hash: str, *args, **kwargs) -> Generator[IO, None, None]:
with self._file('archive', self._archive_ext, '%s.%s' % (calc_hash, self._archive_ext), *args, **kwargs) as f:
yield f
def archive_file(self, calc_hash: str, *args, **kwargs) -> IO:
return self._file('archive', self._archive_ext, '%s.%s' % (calc_hash, self._archive_ext), *args, **kwargs)
@contextlib.contextmanager
def archive_log_file(self, calc_hash: str, *args, **kwargs) -> Generator[IO, None, None]:
with self._file('archive', self._archive_ext, '%s.log' % calc_hash, *args, **kwargs) as f:
yield f
def archive_log_file(self, calc_hash: str, *args, **kwargs) -> IO:
return self._file('archive', self._archive_ext, '%s.log' % calc_hash, *args, **kwargs)
def repack(self) -> None:
"""
......@@ -714,6 +692,3 @@ class PublicUploadFiles(UploadFiles):
the restrictions on calculations. This is potentially a long running operation.
"""
pass
def delete(self):
    """
    Deleting a public upload is not supported; this guard always fails.

    Raises:
        AssertionError: always, to signal that public uploads are immutable.
    """
    # Raise explicitly instead of `assert False`: assert statements are
    # stripped when Python runs with -O, which would silently turn this
    # guard into a no-op. Raising AssertionError keeps the exception type
    # callers may already expect.
    raise AssertionError('cannot delete public upload')
......@@ -89,7 +89,7 @@ def assert_processing(upload: Upload, mocksearch=None):
assert len(upload.errors) == 0
assert upload.status == 'SUCCESS'
upload_files = UploadFiles.get(upload.upload_id, public_only=False)
upload_files = UploadFiles.get(upload.upload_id, is_authorized=lambda: True)
assert isinstance(upload_files, StagingUploadFiles)