Commit 980a95ca authored by Markus Scheidgen

Added raw file API endpoint for queries.

parent 2a23c4ff
Pipeline #53918 passed with stages in 16 minutes and 54 seconds
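For context, a minimal client-side sketch of how the new ``/raw/query`` endpoint could be consumed. The base URL and the example query parameter are assumptions for illustration only, not part of this commit:

import requests  # third-party HTTP client, used here purely for illustration

base_url = 'http://localhost/api'  # assumed deployment URL
response = requests.get(
    base_url + '/raw/query',
    params={'upload_id': 'some_upload_id'},  # any /repo search parameter works here
    stream=True)
response.raise_for_status()

# The zip is streamed; files the caller is not authorized to access are simply omitted.
with open('nomad_raw_files.zip', 'wb') as f:
    for chunk in response.iter_content(chunk_size=1024 * 1024):
        f.write(chunk)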
......@@ -16,7 +16,7 @@
The raw API of the nomad@FAIRDI APIs. Can be used to retrieve raw calculation files.
"""
from typing import IO, Any, Union
from typing import IO, Any, Union, Iterable, Tuple
import os.path
import zipstream
from flask import Response, request, send_file, stream_with_context
......@@ -25,12 +25,14 @@ import magic
import sys
import contextlib
from nomad import search
from nomad.files import UploadFiles, Restricted
from nomad.processing import Calc
from .app import api
from .auth import login_if_available, create_authorization_predicate, \
signature_token_argument, with_signature_token
from .repo import search_request_parser, create_search_kwargs
if sys.version_info >= (3, 7):
import zipfile
......@@ -343,6 +345,45 @@ class RawFilesResource(Resource):
return respond_to_get_raw_files(upload_id, files, compress)
@ns.route('/query')
class RawFileQueryResource(Resource):
@api.doc('raw_files_from_query')
@api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
@api.expect(search_request_parser, validate=True)
@api.response(200, 'File(s) sent', headers={'Content-Type': 'application/zip'})
@login_if_available
def get(self):
"""
Download a .zip file with all raw files of all entries that match the given
search parameters. See the ``/repo`` endpoint for documentation on the search
parameters.
Zip files are streamed; instead of 401 errors, the zip file will just not contain
any files that the user is not authorized to access.
"""
search_kwargs = create_search_kwargs()
calcs = sorted([
(entry['upload_id'], entry['mainfile'])
for entry in search.entry_scan(**search_kwargs)], key=lambda x: x[0])
def generator():
for upload_id, mainfile in calcs:
upload_files = UploadFiles.get(
upload_id, create_authorization_predicate(upload_id))
if upload_files is None:
continue  # this should not happen, TODO log error
if hasattr(upload_files, 'zipfile_cache'):
zipfile_cache = upload_files.zipfile_cache()
else:
zipfile_cache = contextlib.suppress()
with zipfile_cache:
for filename in list(upload_files.raw_file_manifest(path_prefix=os.path.dirname(mainfile))):
yield filename, upload_files
return _streamed_zipfile(generator(), zipfile_name='nomad_raw_files.zip')
def respond_to_get_raw_files(upload_id, files, compress=False):
upload_files = UploadFiles.get(
upload_id, create_authorization_predicate(upload_id))
......@@ -357,40 +398,48 @@ def respond_to_get_raw_files(upload_id, files, compress=False):
zipfile_cache = contextlib.suppress()
with zipfile_cache:
def generator():
""" Stream a zip file with all files using zipstream. """
def iterator():
"""
Replace the directory based iter of zipstream with an iter over all given
files.
"""
for filename in files:
# Write a file to the zipstream.
try:
with upload_files.raw_file(filename, 'rb') as f:
def iter_content():
while True:
data = f.read(100000)
if not data:
break
yield data
yield dict(arcname=filename, iterable=iter_content())
except KeyError:
# files that are not found, will not be returned
pass
except Restricted:
# due to the streaming nature, we cannot raise 401 here
# we just leave it out in the download
pass
compression = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
zip_stream = zipstream.ZipFile(mode='w', compression=compression, allowZip64=True)
zip_stream.paths_to_write = iterator()
for chunk in zip_stream:
yield chunk
return _streamed_zipfile(
[(filename, upload_files) for filename in files],
zipfile_name='%s.zip' % upload_id, compress=compress)
def _streamed_zipfile(
files: Iterable[Tuple[str, UploadFiles]], zipfile_name: str, compress: bool = False):
def generator():
""" Stream a zip file with all files using zipstream. """
def iterator():
"""
Replace the directory based iter of zipstream with an iter over all given
files.
"""
for filename, upload_files in files:
# Write a file to the zipstream.
try:
with upload_files.raw_file(filename, 'rb') as f:
def iter_content():
while True:
data = f.read(100000)
if not data:
break
yield data
yield dict(arcname=filename, iterable=iter_content())
except KeyError:
# files that are not found, will not be returned
pass
except Restricted:
# due to the streaming nature, we cannot raise 401 here
# we just leave it out in the download
pass
compression = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
zip_stream = zipstream.ZipFile(mode='w', compression=compression, allowZip64=True)
zip_stream.paths_to_write = iterator()
for chunk in zip_stream:
yield chunk
response = Response(stream_with_context(generator()), mimetype='application/zip')
response.headers['Content-Disposition'] = 'attachment; filename={}'.format('%s.zip' % upload_id)
response.headers['Content-Disposition'] = 'attachment; filename={}'.format(zipfile_name)
return response
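As a side note, the streaming approach factored out into ``_streamed_zipfile`` can be sketched in isolation as follows. This is a simplified, self-contained sketch that assumes the python-zipstream package and uses its public ``write_iter`` instead of overriding ``paths_to_write`` as the code above does; the route and file contents are made up:

import zipfile

import zipstream
from flask import Flask, Response, stream_with_context

app = Flask(__name__)


@app.route('/example.zip')
def example_zip():
    files = {'a.txt': b'first file', 'b.txt': b'second file'}  # made-up content

    def generator():
        zip_stream = zipstream.ZipFile(mode='w', compression=zipfile.ZIP_STORED, allowZip64=True)
        for name, content in files.items():
            # write_iter accepts any iterable of byte chunks, so files never have to fit in memory
            zip_stream.write_iter(name, iter([content]))
        for chunk in zip_stream:
            yield chunk

    response = Response(stream_with_context(generator()), mimetype='application/zip')
    response.headers['Content-Disposition'] = 'attachment; filename=example.zip'
    return response

Streaming this way means no Content-Length has to be known up front and each file is only read while its entry is written, which is also why authorization failures can only be expressed by leaving files out of the archive rather than by a 401.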
......@@ -118,7 +118,11 @@ repo_request_parser.add_argument(
'Possible values are %s.' % ', '.join(search.metrics_names)))
def create_owner_query():
search_request_parser = api.parser()
add_common_parameters(search_request_parser)
def _create_owner_query():
owner = request.args.get('owner', 'all')
# TODO this should be removed after migration
......@@ -149,10 +153,14 @@ def create_owner_query():
else:
abort(400, message='Invalid owner value. Valid values are all|user|staging, default is all')
# TODO this should be removed after migration
without_currupted_mainfile = ~Q('term', code_name='currupted mainfile') # pylint: disable=invalid-unary-operand-type
q = q & without_currupted_mainfile if q is not None else without_currupted_mainfile
return q
def create_search_parameters():
def _create_search_parameters():
""" Helper that creates a request.args dict with isolated search parameters """
return {
key: request.args.getlist(key) if search.search_quantities[key] else request.args.get(key)
......@@ -160,6 +168,28 @@ def create_search_parameters():
if key in search.search_quantities}
def _create_time_range():
from_time_str = request.args.get('from_time', None)
until_time_str = request.args.get('until_time', None)
try:
if from_time_str is None and until_time_str is None:
return None
else:
from_time = rfc3339DateTime.parse('2000-01-01' if from_time_str is None else from_time_str)
until_time = rfc3339DateTime.parse(until_time_str) if until_time_str is not None else datetime.datetime.utcnow()
return from_time, until_time
except Exception:
abort(400, message='bad datetime format')
def create_search_kwargs():
return dict(
q=_create_owner_query(),
time_range=_create_time_range(),
search_parameters=_create_search_parameters())
@ns.route('/')
class RepoCalcsResource(Resource):
@api.doc('search')
......@@ -207,20 +237,10 @@ class RepoCalcsResource(Resource):
per_page = int(request.args.get('per_page', 10 if not scroll else 1000))
order = int(request.args.get('order', -1))
metrics: List[str] = request.args.getlist('metrics')
from_time_str = request.args.get('from_time', None)
until_time_str = request.args.get('until_time', None)
except Exception:
abort(400, message='bad parameter types')
try:
if from_time_str is None and until_time_str is None:
time_range = None
else:
from_time = rfc3339DateTime.parse('2000-01-01' if from_time_str is None else from_time_str)
until_time = rfc3339DateTime.parse(until_time_str) if until_time_str is not None else datetime.datetime.utcnow()
time_range = (from_time, until_time)
except Exception:
abort(400, message='bad datetime format')
search_kwargs = create_search_kwargs()
order_by = request.args.get('order_by', 'formula')
......@@ -237,24 +257,16 @@ class RepoCalcsResource(Resource):
if metric not in search.metrics_names:
abort(400, message='there is no metric %s' % metric)
q = create_owner_query()
# TODO this should be removed after migration
without_currupted_mainfile = ~Q('term', code_name='currupted mainfile') # pylint: disable=invalid-unary-operand-type
q = q & without_currupted_mainfile if q is not None else without_currupted_mainfile
search_parameters = create_search_parameters()
try:
if scroll:
results = search.scroll_search(
q=q, scroll_id=scroll_id, size=per_page, search_parameters=search_parameters)
scroll_id=scroll_id, size=per_page, **search_kwargs)
else:
results = search.metrics_search(
q=q, per_page=per_page, page=page, order=order, order_by=order_by,
time_range=time_range, metrics_to_use=metrics, search_parameters=search_parameters,
with_date_histogram=date_histogram)
per_page=per_page, page=page, order=order, order_by=order_by,
metrics_to_use=metrics,
with_date_histogram=date_histogram, **search_kwargs)
# TODO just a work around to make things prettier
quantities = results['quantities']
......@@ -328,8 +340,8 @@ class RepoQuantityResource(Resource):
except AssertionError:
abort(400, message='invalid size')
q = create_owner_query()
search_parameters = create_search_parameters()
q = _create_owner_query()
search_parameters = _create_search_parameters()
try:
results = search.quantity_search(
......
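To summarize the refactoring in repo.py: the owner query, time range, and free-form search parameters are now assembled once by ``create_search_kwargs()`` and splatted into the search calls. A rough, hypothetical illustration of the resulting dict; the quantity name and dates are invented, and ``q`` is simplified to ``None`` here although in practice it always carries at least the owner and corrupted-mainfile restrictions:

import datetime

# Roughly what create_search_kwargs() assembles for a request such as
#   GET /repo/?owner=all&from_time=2019-01-01T00:00:00&atoms=Si
search_kwargs = dict(
    q=None,  # an elasticsearch_dsl Q built by _create_owner_query(); simplified away here
    time_range=(datetime.datetime(2019, 1, 1), datetime.datetime.utcnow()),
    search_parameters={'atoms': ['Si']})

# Endpoints then forward it unchanged, e.g.:
#   search.scroll_search(scroll_id=scroll_id, size=per_page, **search_kwargs)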
......@@ -321,7 +321,9 @@ def _execute_paginated_search(
def scroll_search(
scroll_id: str = None, size: int = 1000, scroll: str = u'5m',
q: Q = None, search_parameters: Dict[str, Any] = {}) -> Dict[str, Any]:
q: Q = None,
time_range: Tuple[datetime, datetime] = None,
search_parameters: Dict[str, Any] = {}) -> Dict[str, Any]:
"""
Alternative search based on ES scroll API. Can be used similar to
:func:`aggregate_search`, but pagination is replaced with scrolling, no ordering,
......@@ -341,6 +343,9 @@ def scroll_search(
size: The batch size in number of hits.
scroll: The time the scroll should be kept alive (i.e. the time between requests
to this method) in ES time units. Default is 5 minutes.
time_range: A tuple (start, end) to filter for entries whose ``upload_time`` lies within this range.
search_parameters: Adds an ``and`` search for each key/value pair, where the key corresponds
to a quantity and the value is the value to search for in this quantity.
Returns:
A dict with keys 'scroll' and 'results'. The key 'scroll' holds a dict with
......@@ -350,7 +355,7 @@ def scroll_search(
if scroll_id is None:
# initiate scroll
search = _construct_search(q, search_parameters=search_parameters)
search = _construct_search(q, time_range, search_parameters=search_parameters)
resp = es.search(body=search.to_dict(), scroll=scroll, size=size, index=config.elastic.index_name) # pylint: disable=E1123
scroll_id = resp.get('_scroll_id')
......@@ -419,6 +424,15 @@ def entry_search(
return results
def entry_scan(**kwargs):
"""
Like :func:`entry_search`, but directly generates results without pagination.
"""
search = _construct_search(**kwargs)
for hit in search.scan():
yield hit.to_dict()
def quantity_search(
quantities: Dict[str, Any], with_entries: bool = True, size: int = 100,
**kwargs) -> Dict[str, Any]:
......
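A hedged usage sketch for the new ``entry_scan`` helper; the quantity name is hypothetical and, as everywhere else in ``nomad.search``, a configured Elasticsearch index is assumed:

from nomad import search

# Iterate over all matching entries without pagination; each hit is yielded as a plain dict.
for entry in search.entry_scan(search_parameters={'atoms': ['Si']}):
    print(entry['upload_id'], entry['mainfile'])

Unlike ``scroll_search``, the caller does not manage a scroll id; elasticsearch_dsl's ``scan()`` handles the scrolling internally.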
......@@ -1030,6 +1030,26 @@ class TestRaw(UploadFilesBasedTests):
assert zip_file.testzip() is None
assert len(zip_file.namelist()) == len(example_file_contents)
def test_raw_files_from_query(self, client, non_empty_processed, test_user_auth):
url = '/raw/query?upload_id=%s' % non_empty_processed.upload_id
rv = client.get(url, headers=test_user_auth)
assert rv.status_code == 200
assert len(rv.data) > 0
with zipfile.ZipFile(io.BytesIO(rv.data)) as zip_file:
assert zip_file.testzip() is None
assert len(zip_file.namelist()) == len(example_file_contents)
def test_raw_files_from_empty_query(self, client, elastic):
url = '/raw/query?upload_id=doesNotExist'
rv = client.get(url)
assert rv.status_code == 200
assert len(rv.data) > 0
with zipfile.ZipFile(io.BytesIO(rv.data)) as zip_file:
assert zip_file.testzip() is None
assert len(zip_file.namelist()) == 0
@UploadFilesBasedTests.ignore_authorization
def test_raw_files_signed(self, client, upload, _, test_user_signature_token):
url = '/raw/%s?files=%s&token=%s' % (
......