Commit 813048e8 authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Added query-based archive api.

parent d933bf44
......@@ -23,14 +23,17 @@ from flask import send_file
from flask_restplus import abort, Resource
import json
import importlib
import contextlib
import nomad_meta_info
from nomad.files import UploadFiles, Restricted
from nomad import utils, search
from .auth import authenticate, create_authorization_predicate
from .api import api
from .common import calc_route
from .repo import search_request_parser, add_query
from .common import calc_route, streamed_zipfile
ns = api.namespace(
'archive',
......@@ -105,6 +108,87 @@ class ArchiveCalcResource(Resource):
abort(404, message='Calculation %s does not exist.' % archive_id)
# Request parser for the /archive/query endpoint: accepts every repo search
# parameter plus an additional ``compress`` flag for the resulting zip file.
archives_from_query_parser = search_request_parser.copy()
archives_from_query_parser.add_argument(
    name='compress', type=bool, help='Use compression on .zip files, default is not.',
    location='args')
@ns.route('/query')
class ArchiveQueryResource(Resource):
    # Repository metadata quantities copied into the manifest.json of the zip.
    manifest_quantities = ['upload_id', 'calc_id', 'external_id', 'raw_id', 'pid', 'calc_hash']

    @api.doc('archives_from_query')
    @api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
    @api.expect(archives_from_query_parser, validate=True)
    @api.response(200, 'File(s) send', headers={'Content-Type': 'application/zip'})
    @authenticate(signature_token=True)
    def get(self):
        """
        Get calculation data in archive form from all query results.

        See ``/repo`` endpoint for documentation on the search
        parameters.

        Zip files are streamed; instead of 401 errors, the zip file will just not contain
        any files that the user is not authorized to access.

        The zip file will contain a ``manifest.json`` with the repository meta data.
        """
        try:
            args = archives_from_query_parser.parse_args()
            # parse_args yields None for an omitted argument; normalize to False
            # so streamed_zipfile always receives a proper bool.
            compress = args.get('compress') or False
        except Exception:
            abort(400, message='bad parameter types')

        search_request = search.SearchRequest()
        add_query(search_request, search_request_parser.parse_args())

        # Sort by upload so consecutive entries hit the same upload's zip cache.
        calcs = sorted(
            [entry for entry in search_request.execute_scan()],
            key=lambda x: x['upload_id'])

        def generator():
            """ Yields (zip name, file id, open callable, size callable) per calc. """
            for entry in calcs:
                upload_id = entry['upload_id']
                calc_id = entry['calc_id']
                upload_files = UploadFiles.get(
                    upload_id, create_authorization_predicate(upload_id))
                if upload_files is None:
                    utils.get_logger(__name__).error('upload files do not exist', upload_id=upload_id)
                    continue

                # Keep the upload's zip files open while the consumer reads this
                # entry, if the implementation offers a cache; no-op otherwise.
                if hasattr(upload_files, 'zipfile_cache'):
                    zipfile_cache = upload_files.zipfile_cache()
                else:
                    zipfile_cache = contextlib.suppress()

                with zipfile_cache:
                    yield (
                        calc_id, calc_id,
                        lambda calc_id: upload_files.archive_file(calc_id, 'rb'),
                        lambda calc_id: upload_files.archive_file_size(calc_id))

        try:
            manifest = {
                entry['calc_id']: {
                    key: entry[key]
                    for key in ArchiveQueryResource.manifest_quantities
                    if entry.get(key) is not None
                }
                for entry in calcs
            }
            manifest_contents = json.dumps(manifest)
        except Exception as e:
            # streamed_zipfile expects a *string* manifest (it calls .encode());
            # serialize the error report instead of passing a raw dict, which
            # would crash the streaming response.
            manifest_contents = json.dumps(
                dict(error='Could not create the manifest: %s' % e))
            utils.get_logger(__name__).error(
                'could not create raw query manifest', exc_info=e)

        return streamed_zipfile(
            generator(), zipfile_name='nomad_archive.zip', compress=compress,
            manifest=manifest_contents)
@ns.route('/metainfo/<string:metainfo_package_name>')
@api.doc(params=dict(metainfo_package_name='The name of the metainfo package.'))
class MetainfoResource(Resource):
......
......@@ -15,14 +15,24 @@
"""
Common data, variables, decorators, models used throughout the API.
"""
from typing import Callable, IO, Set, Tuple, Iterable
from flask_restplus import fields
import zipstream
from flask import stream_with_context, Response
import sys
from nomad.app.utils import RFC3339DateTime
from nomad.files import Restricted
from .api import api
if sys.version_info >= (3, 7):
import zipfile
else:
import zipfile37 as zipfile
metadata_model = api.model('MetaData', {
'with_embargo': fields.Boolean(default=False, description='Data with embargo is only visible to the upload until the embargo period ended.'),
'comment': fields.String(description='The comment are shown in the repository for each calculation.'),
......@@ -76,3 +86,75 @@ def upload_route(ns, prefix: str = ''):
})(func)
)
return decorator
def streamed_zipfile(
        files: Iterable[Tuple[str, str, Callable[[str], IO], Callable[[str], int]]],
        zipfile_name: str, compress: bool = False, manifest: str = None):
    """
    Creates a response that streams the given files as a streamed zip file. Ensures that
    each given file is only streamed once, based on its filename in the resulting zipfile.

    Arguments:
        files: An iterable of tuples with the filename to be used in the resulting zipfile,
            an file id within the upload, a callable that gives an binary IO object for the
            file id, and a callable that gives the file size for the file id.
        zipfile_name: A name that will be used in the content disposition attachment
            used as an HTTP respone.
        compress: Uses compression. Default is stored only.
        manifest: Add a ``manifest.json`` with the given content (must already be a
            string, e.g. serialized JSON).

    Returns:
        A flask ``Response`` streaming the zip with a content-disposition attachment.
    """
    # Filenames already emitted into the zip; used to de-duplicate entries.
    streamed_files: Set[str] = set()

    def generator():
        """ Stream a zip file with all files using zipstream. """
        def iterator():
            """
            Replace the directory based iter of zipstream with an iter over all given
            files.
            """
            # first the manifest
            if manifest is not None:
                yield dict(arcname='manifest.json', iterable=(manifest.encode('utf-8'),))

            # now the actual contents
            for zipped_filename, file_id, open_io, file_size in files:
                if zipped_filename in streamed_files:
                    continue
                streamed_files.add(zipped_filename)

                # Write a file to the zipstream.
                try:
                    # open_io/file_size may raise KeyError (missing) or
                    # Restricted (unauthorized); both are handled below.
                    f = open_io(file_id)
                    try:
                        def iter_content():
                            # Stream the file in 64 KiB chunks instead of
                            # loading it into memory.
                            while True:
                                data = f.read(1024 * 64)
                                if not data:
                                    break
                                yield data

                        yield dict(
                            arcname=zipped_filename, iterable=iter_content(),
                            buffer_size=file_size(file_id))
                    finally:
                        # Always close the IO object, even if zipstream bails.
                        f.close()
                except KeyError:
                    # files that are not found, will not be returned
                    pass
                except Restricted:
                    # due to the streaming nature, we cannot raise 401 here
                    # we just leave it out in the download
                    pass

        compression = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
        zip_stream = zipstream.ZipFile(mode='w', compression=compression, allowZip64=True)
        # Feed zipstream from our custom iterator rather than directory paths.
        zip_stream.paths_to_write = iterator()

        for chunk in zip_stream:
            yield chunk

    response = Response(stream_with_context(generator()), mimetype='application/zip')
    response.headers['Content-Disposition'] = 'attachment; filename={}'.format(zipfile_name)
    return response
......@@ -16,13 +16,11 @@
The raw API of the nomad@FAIRDI APIs. Can be used to retrieve raw calculation files.
"""
from typing import IO, Any, Union, Iterable, Tuple, Set, List
from typing import IO, Any, Union, Iterable, Tuple, List
import os.path
import zipstream
from flask import Response, request, send_file, stream_with_context
from flask import request, send_file
from flask_restplus import abort, Resource, fields
import magic
import sys
import contextlib
import fnmatch
import json
......@@ -36,11 +34,7 @@ from nomad.processing import Calc
from .api import api
from .auth import authenticate, create_authorization_predicate
from .repo import search_request_parser, add_query
if sys.version_info >= (3, 7):
import zipfile
else:
import zipfile37 as zipfile
from .common import streamed_zipfile
ns = api.namespace('raw', description='Downloading raw data files.')
......@@ -328,7 +322,7 @@ raw_files_request_parser.add_argument(**raw_file_compress_argument)
class RawFilesResource(Resource):
@api.doc('get_files')
@api.response(404, 'The upload or path does not exist')
@api.response(200, 'File(s) send', headers={'Content-Type': 'application/gz'})
@api.response(200, 'File(s) send', headers={'Content-Type': 'application/zip'})
@api.expect(raw_files_request_model, validate=True)
@authenticate()
def post(self, upload_id):
......@@ -345,7 +339,7 @@ class RawFilesResource(Resource):
@api.doc('get_files_alternate')
@api.response(404, 'The upload or path does not exist')
@api.response(200, 'File(s) send', headers={'Content-Type': 'application/gz'})
@api.response(200, 'File(s) send', headers={'Content-Type': 'application/zip'})
@api.expect(raw_files_request_parser, validate=True)
@authenticate(signature_token=True)
def get(self, upload_id):
......@@ -387,7 +381,7 @@ class RawFileQueryResource(Resource):
@api.doc('raw_files_from_query')
@api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
@api.expect(raw_file_from_query_parser, validate=True)
@api.response(200, 'File(s) send', headers={'Content-Type': 'application/gz'})
@api.response(200, 'File(s) send', headers={'Content-Type': 'application/zip'})
@authenticate(signature_token=True)
def get(self):
""" Download a .zip file with all raw-files for all entries that match the given
......@@ -504,8 +498,7 @@ def respond_to_get_raw_files(upload_id, files, compress=False, strip=False):
def _streamed_zipfile(
files: Iterable[Tuple[str, str, UploadFiles]], zipfile_name: str,
compress: bool = False, manifest: str = None):
files: Iterable[Tuple[str, str, UploadFiles]], **kwargs):
"""
Creates a response that streams the given files as a streamed zip file. Ensures that
each given file is only streamed once, based on its filename in the resulting zipfile.
......@@ -514,59 +507,13 @@ def _streamed_zipfile(
files: An iterable of tuples with the filename to be used in the resulting zipfile,
the filename within the upload, the :class:`UploadFiles` that contains
the file.
zipfile_name: A name that will be used in the content disposition attachment
used as an HTTP respone.
compress: Uses compression. Default is stored only.
manifest: Add a ``manifest.json`` with the given content.
**kwargs: See :func:`streamed_zipfile`
"""
streamed_files: Set[str] = set()
def generator():
""" Stream a zip file with all files using zipstream. """
def iterator():
"""
Replace the directory based iter of zipstream with an iter over all given
files.
"""
# first the manifest
if manifest is not None:
yield dict(arcname='manifest.json', iterable=(manifest.encode('utf-8'),))
# now the actual contents
for zipped_filename, upload_filename, upload_files in files:
if zipped_filename in streamed_files:
continue
streamed_files.add(zipped_filename)
# Write a file to the zipstream.
try:
with upload_files.raw_file(upload_filename, 'rb') as f:
def iter_content():
while True:
data = f.read(1024 * 64)
if not data:
break
yield data
yield dict(
arcname=zipped_filename, iterable=iter_content(),
buffer_size=upload_files.raw_file_size(upload_filename))
except KeyError:
# files that are not found, will not be returned
pass
except Restricted:
# due to the streaming nature, we cannot raise 401 here
# we just leave it out in the download
pass
compression = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
zip_stream = zipstream.ZipFile(mode='w', compression=compression, allowZip64=True)
zip_stream.paths_to_write = iterator()
for chunk in zip_stream:
yield chunk
response = Response(stream_with_context(generator()), mimetype='application/zip')
response.headers['Content-Disposition'] = 'attachment; filename={}'.format(zipfile_name)
return response
def map(name, upload_filename, upload_files):
return (
name, upload_filename,
lambda upload_filename: upload_files.raw_file(upload_filename, 'rb'),
lambda upload_filename: upload_files.raw_file_size(upload_filename))
return streamed_zipfile([map(*item) for item in files], **kwargs)
......@@ -294,6 +294,13 @@ class UploadFiles(DirectoryObject, metaclass=ABCMeta):
"""
raise NotImplementedError()
def archive_file_size(self, calc_id: str, *args, **kwargs) -> int:
    """
    Returns:
        The size of the archive.

    Note: abstract; concrete upload file implementations must override this.
    The return annotation was corrected from ``IO`` to ``int`` — the
    implementations visible in this change return an integer size.
    """
    raise NotImplementedError()
def archive_log_file(self, calc_id: str, *args, **kwargs) -> IO:
"""
Opens a archive log file and returns a file-like objects. Additional args, kwargs are
......@@ -365,6 +372,11 @@ class StagingUploadFiles(UploadFiles):
def archive_file_object(self, calc_id: str) -> PathObject:
    # Path object for the calc's archive file inside the staging archive dir.
    return self._archive_dir.join_file('%s.%s' % (calc_id, self._archive_ext))
def archive_file_size(self, calc_id: str) -> int:
    """
    Returns the size (in bytes) of the calc's archive file in staging.

    Raises:
        Restricted: If the caller is not authorized for this upload.
    """
    if not self._is_authorized():
        raise Restricted
    # Reuse archive_file_object instead of duplicating the path expression.
    return self.archive_file_object(calc_id).size
def archive_log_file_object(self, calc_id: str) -> PathObject:
    # Path object for the calc's processing log inside the staging archive dir.
    return self._archive_dir.join_file('%s.log' % calc_id)
......@@ -836,6 +848,23 @@ class PublicUploadFiles(UploadFiles):
def archive_file(self, calc_id: str, *args, **kwargs) -> IO:
    # Opens the calc's archive from the public/restricted archive zip files;
    # extra args/kwargs are forwarded to the underlying _file open.
    return self._file('archive', self._archive_ext, '%s.%s' % (calc_id, self._archive_ext), *args, **kwargs)
def archive_file_size(self, calc_id: str) -> int:
    """
    Returns the stored size of the calc's archive file from the public or
    restricted archive zip.

    Raises:
        Restricted: If the file is restricted and the caller is not authorized.
        KeyError: If the calc has no archive in either zip.
    """
    file_path = '%s.%s' % (calc_id, self._archive_ext)
    # Try the public archive zip first, then the restricted one.
    for access in ['public', 'restricted']:
        try:
            zf = self.open_zip_file('archive', access, self._archive_ext)
            info = zf.getinfo(file_path)
            # Authorization is checked only after the entry is found; note
            # that Restricted deliberately escapes the except clauses below.
            if (access == 'restricted' or always_restricted(file_path)) and not self._is_authorized():
                raise Restricted
            return info.file_size
        except FileNotFoundError:
            # this access' zip file does not exist; try the next access level
            pass
        except KeyError:
            # the calc is not in this zip; try the next access level
            pass

    raise KeyError()
def archive_log_file(self, calc_id: str, *args, **kwargs) -> IO:
    # Opens the calc's processing log from the archive zip files;
    # extra args/kwargs are forwarded to the underlying _file open.
    return self._file('archive', self._archive_ext, '%s.log' % calc_id, *args, **kwargs)
......
......@@ -63,6 +63,25 @@ def get_upload_with_metadata(upload: dict) -> UploadWithMetadata:
for calc in upload['calcs']['results']])
def assert_zip_file(rv, files: int = -1, basename: bool = None):
    """
    Asserts that the given response carries a valid zip file.

    Arguments:
        rv: The flask test response; must have status 200 and the zip in ``rv.data``.
        files: If >= 0, the exact number of entries the zip must contain.
        basename: If True, every entry (except ``manifest.json``) must be a plain
            basename (i.e. paths were stripped); if False, every such entry must
            contain a directory component; if None, names are not checked.
    """
    assert rv.status_code == 200
    assert len(rv.data) > 0
    with zipfile.ZipFile(io.BytesIO(rv.data)) as zip_file:
        assert zip_file.testzip() is None
        zip_files = zip_file.namelist()
        if files >= 0:
            assert len(zip_files) == files
        if basename is not None:
            if basename:
                assert all(
                    os.path.basename(name) == name
                    for name in zip_files if name != 'manifest.json')
            else:
                # fixed: the original had a duplicated generator clause
                # (``for name in zip_files for name in zip_files``) that
                # shadowed ``name`` and iterated the list quadratically
                assert all(
                    os.path.basename(name) != name
                    for name in zip_files if name != 'manifest.json')
class TestInfo:
def test_info(self, api):
rv = api.get('/info/')
......@@ -615,6 +634,37 @@ class TestArchive(UploadFilesBasedTests):
metainfo = json.loads((rv.data))
assert len(metainfo) > 0
@pytest.mark.parametrize('compress', [False, True])
def test_archive_from_query_upload_id(self, api, non_empty_processed, test_user_auth, compress):
    """ A single upload's archives are downloadable via /archive/query, compressed or not. """
    compress_flag = 'true' if compress else 'false'
    url = '/archive/query?upload_id=%s&compress=%s' % (non_empty_processed.upload_id, compress_flag)

    response = api.get(url, headers=test_user_auth)

    assert response.status_code == 200
    assert_zip_file(response, files=2)
@pytest.mark.parametrize('query_params', [
    {'atoms': 'Si'},
    {'authors': 'Sheldon Cooper'}
])
def test_archive_from_query(self, api, processeds, test_user_auth, query_params):
    """ /archive/query returns one archive per matching calc plus a manifest.json. """
    response = api.get(
        '/archive/query?%s' % urlencode(query_params), headers=test_user_auth)

    assert response.status_code == 200
    assert_zip_file(response, files=len(processeds) + 1)

    with zipfile.ZipFile(io.BytesIO(response.data)) as zip_file:
        with zip_file.open('manifest.json', 'r') as f:
            assert len(json.load(f)) == len(processeds)
def test_archive_from_empty_query(self, api, elastic):
    """ A query matching nothing still yields a valid zip containing only the manifest. """
    response = api.get('/archive/query?upload_id=doesNotExist')

    assert response.status_code == 200
    assert_zip_file(response, files=1)
class TestRepo():
@pytest.fixture(scope='class')
......@@ -1298,24 +1348,6 @@ def test_edit_lift_embargo_unnecessary(api, published_wo_user_metadata, other_te
class TestRaw(UploadFilesBasedTests):
def assert_zip_file(self, rv, files: int = -1, basename: bool = None):
    """
    Asserts that the given response carries a valid zip file.

    Arguments:
        rv: The flask test response; must have status 200 and the zip in ``rv.data``.
        files: If >= 0, the exact number of entries the zip must contain.
        basename: If True, every entry (except ``manifest.json``) must be a plain
            basename; if False, every such entry must contain a directory
            component; if None, names are not checked.
    """
    assert rv.status_code == 200
    assert len(rv.data) > 0
    with zipfile.ZipFile(io.BytesIO(rv.data)) as zip_file:
        assert zip_file.testzip() is None
        zip_files = zip_file.namelist()
        if files >= 0:
            assert len(zip_files) == files
        if basename is not None:
            if basename:
                assert all(
                    os.path.basename(name) == name
                    for name in zip_files if name != 'manifest.json')
            else:
                # fixed: the original duplicated the generator clause
                # (``for name in zip_files for name in zip_files``),
                # shadowing ``name`` and iterating quadratically
                assert all(
                    os.path.basename(name) != name
                    for name in zip_files if name != 'manifest.json')
def test_raw_file_from_calc(self, api, non_empty_processed, test_user_auth):
calc = list(non_empty_processed.calcs)[0]
url = '/raw/calc/%s/%s/%s' % (
......@@ -1384,7 +1416,7 @@ class TestRaw(UploadFilesBasedTests):
rv = api.get(url, headers=auth_headers)
assert rv.status_code == 200
self.assert_zip_file(rv, files=len(example_file_contents))
assert_zip_file(rv, files=len(example_file_contents))
@UploadFilesBasedTests.ignore_authorization
def test_raw_file_wildcard_missing(self, api, upload, auth_headers):
......@@ -1410,7 +1442,7 @@ class TestRaw(UploadFilesBasedTests):
rv = api.get(url, headers=auth_headers)
assert rv.status_code == 200
self.assert_zip_file(rv, files=len(example_file_contents), basename=strip)
assert_zip_file(rv, files=len(example_file_contents), basename=strip)
@pytest.mark.parametrize('compress', [False, True])
def test_raw_files_from_query_upload_id(self, api, non_empty_processed, test_user_auth, compress):
......@@ -1418,7 +1450,7 @@ class TestRaw(UploadFilesBasedTests):
rv = api.get(url, headers=test_user_auth)
assert rv.status_code == 200
self.assert_zip_file(rv, files=len(example_file_contents) + 1)
assert_zip_file(rv, files=len(example_file_contents) + 1)
@pytest.mark.parametrize('query_params', [
{'atoms': 'Si'},
......@@ -1430,7 +1462,7 @@ class TestRaw(UploadFilesBasedTests):
rv = api.get(url, headers=test_user_auth)
assert rv.status_code == 200
self.assert_zip_file(rv, files=len(example_file_contents) * len(processeds) + 1)
assert_zip_file(rv, files=len(example_file_contents) * len(processeds) + 1)
with zipfile.ZipFile(io.BytesIO(rv.data)) as zip_file:
with zip_file.open('manifest.json', 'r') as f:
manifest = json.load(f)
......@@ -1441,7 +1473,7 @@ class TestRaw(UploadFilesBasedTests):
rv = api.get(url)
assert rv.status_code == 200
self.assert_zip_file(rv, files=1)
assert_zip_file(rv, files=1)
@pytest.mark.parametrize('files, pattern, strip', [
(1, '*.json', False),
......@@ -1454,7 +1486,7 @@ class TestRaw(UploadFilesBasedTests):
url = '/raw/query?%s' % urlencode(params, doseq=True)
rv = api.get(url, headers=test_user_auth)
assert rv.status_code == 200
self.assert_zip_file(rv, files=(files + 1), basename=strip)
assert_zip_file(rv, files=(files + 1), basename=strip)
@UploadFilesBasedTests.ignore_authorization
def test_raw_files_signed(self, api, upload, _, test_user_signature_token):
......@@ -1463,7 +1495,7 @@ class TestRaw(UploadFilesBasedTests):
rv = api.get(url)
assert rv.status_code == 200
self.assert_zip_file(rv, files=len(example_file_contents))
assert_zip_file(rv, files=len(example_file_contents))
@pytest.mark.parametrize('compress', [True, False, None])
@UploadFilesBasedTests.check_authorization
......@@ -1475,7 +1507,7 @@ class TestRaw(UploadFilesBasedTests):
rv = api.post(url, data=json.dumps(data), content_type='application/json', headers=auth_headers)
assert rv.status_code == 200
self.assert_zip_file(rv, files=len(example_file_contents))
assert_zip_file(rv, files=len(example_file_contents))
@pytest.mark.parametrize('compress', [True, False])
@UploadFilesBasedTests.ignore_authorization
......@@ -1486,7 +1518,7 @@ class TestRaw(UploadFilesBasedTests):
rv = api.get(url, headers=auth_headers)
assert rv.status_code == 200
self.assert_zip_file(rv, files=1)
assert_zip_file(rv, files=1)
@UploadFilesBasedTests.ignore_authorization
def test_raw_files_missing_upload(self, api, upload, auth_headers):
......
......@@ -220,6 +220,18 @@ class UploadFilesContract(UploadFilesFixtures):
assert not upload_files._is_authorized()
assert calc.with_embargo
def test_rawfile_size(self, test_upload: UploadWithFiles):
    """
    raw_file_size must report a positive size for every calc file, or raise
    Restricted exactly when the caller is unauthorized and the calc is
    under embargo.
    """
    upload, upload_files = test_upload
    for calc in upload.calcs:
        try:
            for file_path in calc.files:
                assert upload_files.raw_file_size(file_path) > 0
            # sizes were readable: an unauthorized caller implies no embargo
            if not upload_files._is_authorized():
                assert not calc.with_embargo
        except Restricted:
            # only embargoed calcs may be restricted, and only for
            # unauthorized callers
            assert not upload_files._is_authorized()
            assert calc.with_embargo
@pytest.mark.parametrize('prefix', [None, 'examples'])
def test_raw_file_manifest(self, test_upload: UploadWithFiles, prefix: str):
_, upload_files = test_upload
......@@ -259,6 +271,18 @@ class UploadFilesContract(UploadFilesFixtures):
assert not upload_files._is_authorized()
assert calcs.get(example_calc_id).with_embargo
def test_archive_size(self, test_upload: UploadWithFiles):
    """
    archive_file_size must report a positive size for the example calc, or
    raise Restricted exactly when the caller is unauthorized and the calc
    is under embargo.
    """
    upload, upload_files = test_upload
    calcs = upload.calcs_dict
    try:
        assert upload_files.archive_file_size(example_calc_id) > 0
        # size was readable: an unauthorized caller implies no embargo
        if not upload_files._is_authorized():
            assert not calcs.get(example_calc_id).with_embargo
    except Restricted:
        # only embargoed calcs may be restricted, and only for
        # unauthorized callers
        assert not upload_files._is_authorized()
        assert calcs.get(example_calc_id).with_embargo
def create_staging_upload(upload_id: str, calc_specs: str) -> StagingUploadWithFiles:
"""
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment