diff --git a/nomad/api/raw.py b/nomad/api/raw.py
index d83100f2096e02cfdcb6ae0e45ac4c1f2a1d63d6..4096ff669dbde82504081b2f70e238406420e0f7 100644
--- a/nomad/api/raw.py
+++ b/nomad/api/raw.py
@@ -16,7 +16,7 @@
 The raw API of the nomad@FAIRDI APIs. Can be used to retrieve raw calculation files.
 """
 
-from typing import IO, Any, Union
+from typing import IO, Any, Union, Iterable, Tuple
 import os.path
 import zipstream
 from flask import Response, request, send_file, stream_with_context
@@ -25,12 +25,14 @@
 import magic
 import sys
 import contextlib
 
+from nomad import search
 from nomad.files import UploadFiles, Restricted
 from nomad.processing import Calc
 
 from .app import api
 from .auth import login_if_available, create_authorization_predicate, \
     signature_token_argument, with_signature_token
+from .repo import search_request_parser, create_search_kwargs
 
 if sys.version_info >= (3, 7):
     import zipfile
@@ -343,6 +345,45 @@ class RawFilesResource(Resource):
         return respond_to_get_raw_files(upload_id, files, compress)
 
 
+@ns.route('/query')
+class RawFileQueryResource(Resource):
+    @api.doc('raw_files_from_query')
+    @api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
+    @api.expect(search_request_parser, validate=True)
+    @api.response(200, 'File(s) sent', headers={'Content-Type': 'application/zip'})
+    @login_if_available
+    def get(self):
+        """
+        Download a .zip file with all raw files for all entries that match the given
+        search parameters. See the ``/repo`` endpoint for documentation on the search
+        parameters.
+        The zip file is streamed; instead of 401 errors, it will simply not contain
+        any files that the user is not authorized to access.
+        """
+        search_kwargs = create_search_kwargs()
+        calcs = sorted([
+            (entry['upload_id'], entry['mainfile'])
+            for entry in search.entry_scan(**search_kwargs)], key=lambda x: x[0])
+
+        def generator():
+            for upload_id, mainfile in calcs:
+                upload_files = UploadFiles.get(
+                    upload_id, create_authorization_predicate(upload_id))
+                if upload_files is None:
+                    continue  # this should not happen, TODO log an error
+
+                if hasattr(upload_files, 'zipfile_cache'):
+                    zipfile_cache = upload_files.zipfile_cache()
+                else:
+                    zipfile_cache = contextlib.suppress()
+
+                with zipfile_cache:
+                    for filename in list(upload_files.raw_file_manifest(path_prefix=os.path.dirname(mainfile))):
+                        yield filename, upload_files
+
+        return _streamed_zipfile(generator(), zipfile_name='nomad_raw_files.zip')
+
+
 def respond_to_get_raw_files(upload_id, files, compress=False):
     upload_files = UploadFiles.get(
         upload_id, create_authorization_predicate(upload_id))
@@ -357,40 +398,48 @@ def respond_to_get_raw_files(upload_id, files, compress=False):
         zipfile_cache = contextlib.suppress()
 
     with zipfile_cache:
-        def generator():
-            """ Stream a zip file with all files using zipstream. """
-            def iterator():
-                """
-                Replace the directory based iter of zipstream with an iter over all given
-                files.
-                """
-                for filename in files:
-                    # Write a file to the zipstream.
- try: - with upload_files.raw_file(filename, 'rb') as f: - def iter_content(): - while True: - data = f.read(100000) - if not data: - break - yield data - - yield dict(arcname=filename, iterable=iter_content()) - except KeyError: - # files that are not found, will not be returned - pass - except Restricted: - # due to the streaming nature, we cannot raise 401 here - # we just leave it out in the download - pass - - compression = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED - zip_stream = zipstream.ZipFile(mode='w', compression=compression, allowZip64=True) - zip_stream.paths_to_write = iterator() - - for chunk in zip_stream: - yield chunk + return _streamed_zipfile( + [(filename, upload_files) for filename in files], + zipfile_name='%s.zip' % upload_id, compress=compress) + + +def _streamed_zipfile( + files: Iterable[Tuple[str, UploadFiles]], zipfile_name: str, compress: bool = False): + + def generator(): + """ Stream a zip file with all files using zipstream. """ + def iterator(): + """ + Replace the directory based iter of zipstream with an iter over all given + files. + """ + for filename, upload_files in files: + # Write a file to the zipstream. + try: + with upload_files.raw_file(filename, 'rb') as f: + def iter_content(): + while True: + data = f.read(100000) + if not data: + break + yield data + + yield dict(arcname=filename, iterable=iter_content()) + except KeyError: + # files that are not found, will not be returned + pass + except Restricted: + # due to the streaming nature, we cannot raise 401 here + # we just leave it out in the download + pass + + compression = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED + zip_stream = zipstream.ZipFile(mode='w', compression=compression, allowZip64=True) + zip_stream.paths_to_write = iterator() + + for chunk in zip_stream: + yield chunk response = Response(stream_with_context(generator()), mimetype='application/zip') - response.headers['Content-Disposition'] = 'attachment; filename={}'.format('%s.zip' % upload_id) + response.headers['Content-Disposition'] = 'attachment; filename={}'.format(zipfile_name) return response diff --git a/nomad/api/repo.py b/nomad/api/repo.py index 00660e6304bb09f658b25a244e7152b079131be5..e0914f748f55f31390b82736ee976aeef174bd26 100644 --- a/nomad/api/repo.py +++ b/nomad/api/repo.py @@ -118,7 +118,11 @@ repo_request_parser.add_argument( 'Possible values are %s.' % ', '.join(search.metrics_names))) -def create_owner_query(): +search_request_parser = api.parser() +add_common_parameters(search_request_parser) + + +def _create_owner_query(): owner = request.args.get('owner', 'all') # TODO this should be removed after migration @@ -149,10 +153,14 @@ def create_owner_query(): else: abort(400, message='Invalid owner value. 
Valid values are all|user|staging, default is all') + # TODO this should be removed after migration + without_currupted_mainfile = ~Q('term', code_name='currupted mainfile') # pylint: disable=invalid-unary-operand-type + q = q & without_currupted_mainfile if q is not None else without_currupted_mainfile + return q -def create_search_parameters(): +def _create_search_parameters(): """ Helper that creates a request.args dict with isolated search parameters """ return { key: request.args.getlist(key) if search.search_quantities[key] else request.args.get(key) @@ -160,6 +168,28 @@ def create_search_parameters(): if key in search.search_quantities} +def _create_time_range(): + from_time_str = request.args.get('from_time', None) + until_time_str = request.args.get('until_time', None) + + try: + if from_time_str is None and until_time_str is None: + return None + else: + from_time = rfc3339DateTime.parse('2000-01-01' if from_time_str is None else from_time_str) + until_time = rfc3339DateTime.parse(until_time_str) if until_time_str is not None else datetime.datetime.utcnow() + return from_time, until_time + except Exception: + abort(400, message='bad datetime format') + + +def create_search_kwargs(): + return dict( + q=_create_owner_query(), + time_range=_create_time_range(), + search_parameters=_create_search_parameters()) + + @ns.route('/') class RepoCalcsResource(Resource): @api.doc('search') @@ -207,20 +237,10 @@ class RepoCalcsResource(Resource): per_page = int(request.args.get('per_page', 10 if not scroll else 1000)) order = int(request.args.get('order', -1)) metrics: List[str] = request.args.getlist('metrics') - from_time_str = request.args.get('from_time', None) - until_time_str = request.args.get('until_time', None) except Exception: abort(400, message='bad parameter types') - try: - if from_time_str is None and until_time_str is None: - time_range = None - else: - from_time = rfc3339DateTime.parse('2000-01-01' if from_time_str is None else from_time_str) - until_time = rfc3339DateTime.parse(until_time_str) if until_time_str is not None else datetime.datetime.utcnow() - time_range = (from_time, until_time) - except Exception: - abort(400, message='bad datetime format') + search_kwargs = create_search_kwargs() order_by = request.args.get('order_by', 'formula') @@ -237,24 +257,16 @@ class RepoCalcsResource(Resource): if metric not in search.metrics_names: abort(400, message='there is not metric %s' % metric) - q = create_owner_query() - - # TODO this should be removed after migration - without_currupted_mainfile = ~Q('term', code_name='currupted mainfile') # pylint: disable=invalid-unary-operand-type - q = q & without_currupted_mainfile if q is not None else without_currupted_mainfile - - search_parameters = create_search_parameters() - try: if scroll: results = search.scroll_search( - q=q, scroll_id=scroll_id, size=per_page, search_parameters=search_parameters) + scroll_id=scroll_id, size=per_page, **search_kwargs) else: results = search.metrics_search( - q=q, per_page=per_page, page=page, order=order, order_by=order_by, - time_range=time_range, metrics_to_use=metrics, search_parameters=search_parameters, - with_date_histogram=date_histogram) + per_page=per_page, page=page, order=order, order_by=order_by, + metrics_to_use=metrics, + with_date_histogram=date_histogram, **search_kwargs) # TODO just a work around to make things prettier quantities = results['quantities'] @@ -328,8 +340,8 @@ class RepoQuantityResource(Resource): except AssertionError: abort(400, message='invalid size') - q = 
create_owner_query()
-        search_parameters = create_search_parameters()
+        q = _create_owner_query()
+        search_parameters = _create_search_parameters()
 
         try:
             results = search.quantity_search(
diff --git a/nomad/search.py b/nomad/search.py
index 27b81169861325cd9f4b34dc465a9a9b69aaceef..8413b0e2cf720126dc62c4d6dbc7e71a42ee8476 100644
--- a/nomad/search.py
+++ b/nomad/search.py
@@ -321,7 +321,9 @@ def _execute_paginated_search(
 
 def scroll_search(
         scroll_id: str = None, size: int = 1000, scroll: str = u'5m',
-        q: Q = None, search_parameters: Dict[str, Any] = {}) -> Dict[str, Any]:
+        q: Q = None,
+        time_range: Tuple[datetime, datetime] = None,
+        search_parameters: Dict[str, Any] = {}) -> Dict[str, Any]:
     """
     Alternative search based on ES scroll API. Can be used similar to
     :func:`aggregate_search`, but pagination is replaced with scrolling, no ordering,
@@ -341,6 +343,9 @@ def scroll_search(
         size: The batch size in number of hits.
         scroll: The time the scroll should be kept alive (i.e. the time between requests
             to this method) in ES time units. Default is 5 minutes.
+        time_range: A tuple (start, end) to filter for entries with an ``upload_time`` in that range.
+        search_parameters: Adds an ``and`` search for each key, value pair, where the key corresponds
+            to a quantity and the value is the value to search for in this quantity.
 
     Returns:
         A dict with keys 'scroll' and 'results'. The key 'scroll' holds a dict with
@@ -350,7 +355,7 @@ def scroll_search(
 
     if scroll_id is None:
         # initiate scroll
-        search = _construct_search(q, search_parameters=search_parameters)
+        search = _construct_search(q, time_range, search_parameters=search_parameters)
         resp = es.search(body=search.to_dict(), scroll=scroll, size=size, index=config.elastic.index_name)  # pylint: disable=E1123
 
         scroll_id = resp.get('_scroll_id')
@@ -419,6 +424,15 @@ def entry_search(
     return results
 
 
+def entry_scan(**kwargs):
+    """
+    Like :func:`entry_search`, but directly generates results without pagination.
+    """
+    search = _construct_search(**kwargs)
+    for hit in search.scan():
+        yield hit.to_dict()
+
+
 def quantity_search(
         quantities: Dict[str, Any], with_entries: bool = True, size: int = 100,
         **kwargs) -> Dict[str, Any]:
diff --git a/tests/test_api.py b/tests/test_api.py
index dfe8d18f237432d4303653754b77fa3b0ae47505..53ccaf807a1f93726f26df48caedb57a84688448 100644
--- a/tests/test_api.py
+++ b/tests/test_api.py
@@ -1030,6 +1030,26 @@ class TestRaw(UploadFilesBasedTests):
             assert zip_file.testzip() is None
             assert len(zip_file.namelist()) == len(example_file_contents)
 
+    def test_raw_files_from_query(self, client, non_empty_processed, test_user_auth):
+        url = '/raw/query?upload_id=%s' % non_empty_processed.upload_id
+        rv = client.get(url, headers=test_user_auth)
+
+        assert rv.status_code == 200
+        assert len(rv.data) > 0
+        with zipfile.ZipFile(io.BytesIO(rv.data)) as zip_file:
+            assert zip_file.testzip() is None
+            assert len(zip_file.namelist()) == len(example_file_contents)
+
+    def test_raw_files_from_empty_query(self, client, elastic):
+        url = '/raw/query?upload_id=doesNotExist'
+        rv = client.get(url)
+
+        assert rv.status_code == 200
+        assert len(rv.data) > 0
+        with zipfile.ZipFile(io.BytesIO(rv.data)) as zip_file:
+            assert zip_file.testzip() is None
+            assert len(zip_file.namelist()) == 0
+
     @UploadFilesBasedTests.ignore_authorization
     def test_raw_files_signed(self, client, upload, _, test_user_signature_token):
         url = '/raw/%s?files=%s&token=%s' % (