Commit 7b6d482b authored by Markus Scheidgen

Larger and configurable elastic scroll window for raw/archive download.

parent 420e3dcf
Pipeline #67512 passed with stages in 28 minutes and 42 seconds
@@ -24,11 +24,13 @@ from flask import send_file, request
 from flask_restplus import abort, Resource, fields
 import json
 import importlib
+import urllib.parse
+from elasticsearch.helpers import ScanError

 import nomad_meta_info

 from nomad.files import UploadFiles, Restricted
-from nomad import utils, search
+from nomad import utils, search, config

 from .auth import authenticate, create_authorization_predicate
 from .api import api
@@ -146,37 +148,45 @@ class ArchiveDownloadResource(Resource):
         search_request = search.SearchRequest()
         apply_search_parameters(search_request, args)
-        calcs = search_request.execute_scan(order_by='upload_id')
+        calcs = search_request.execute_scan(
+            order_by='upload_id',
+            size=config.services.download_scan_size,
+            scroll=config.services.download_scan_timeout)

         def generator():
             manifest = {}
             upload_files = None
-            for entry in calcs:
-                upload_id = entry['upload_id']
-                calc_id = entry['calc_id']
-                if upload_files is None or upload_files.upload_id != upload_id:
-                    if upload_files is not None:
-                        upload_files.close_zipfile_cache()
-
-                    upload_files = UploadFiles.get(
-                        upload_id, create_authorization_predicate(upload_id))
-
-                    if upload_files is None:
-                        utils.get_logger(__name__).error('upload files do not exist', upload_id=upload_id)
-                        continue
-
-                    upload_files.open_zipfile_cache()
-
-                yield (
-                    '%s.%s' % (calc_id, upload_files._archive_ext), calc_id,
-                    lambda calc_id: upload_files.archive_file(calc_id, 'rb'),
-                    lambda calc_id: upload_files.archive_file_size(calc_id))
-
-                manifest[calc_id] = {
-                    key: entry[key]
-                    for key in ArchiveDownloadResource.manifest_quantities
-                    if entry.get(key) is not None
-                }
+            try:
+                for entry in calcs:
+                    upload_id = entry['upload_id']
+                    calc_id = entry['calc_id']
+                    if upload_files is None or upload_files.upload_id != upload_id:
+                        if upload_files is not None:
+                            upload_files.close_zipfile_cache()
+
+                        upload_files = UploadFiles.get(
+                            upload_id, create_authorization_predicate(upload_id))
+
+                        if upload_files is None:
+                            utils.get_logger(__name__).error('upload files do not exist', upload_id=upload_id)
+                            continue
+
+                        upload_files.open_zipfile_cache()
+
+                    yield (
+                        '%s.%s' % (calc_id, upload_files._archive_ext), calc_id,
+                        lambda calc_id: upload_files.archive_file(calc_id, 'rb'),
+                        lambda calc_id: upload_files.archive_file_size(calc_id))
+
+                    manifest[calc_id] = {
+                        key: entry[key]
+                        for key in ArchiveDownloadResource.manifest_quantities
+                        if entry.get(key) is not None
+                    }
+            except ScanError:
+                utils.get_logger(__name__).warning(
+                    'scan error while streaming archive data from query',
+                    query=urllib.parse.urlencode(request.args, doseq=True))

             if upload_files is not None:
                 upload_files.close_zipfile_cache()
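The endpoint above streams a zip while iterating an Elasticsearch scan, so an expired scroll context can now only truncate the download instead of aborting it with a 500. A minimal sketch of that pattern, assuming a plain elasticsearch_dsl Search with a registered default connection; the helper name and index argument are illustrative, while Search.params(...).scan() and ScanError are real elasticsearch_dsl/elasticsearch-py API:

from elasticsearch_dsl import Search
from elasticsearch.helpers import ScanError


def stream_hits(index: str, size: int = 500, scroll: str = '30m'):
    # assumes a default connection was registered, e.g. via
    # connections.create_connection(hosts=['localhost'])
    search = Search(index=index).params(size=size, scroll=scroll)
    try:
        for hit in search.scan():
            yield hit.to_dict()
    except ScanError:
        # the scroll context expired or a shard failed mid-scan; everything
        # yielded so far is intact, so the consumer sees a truncated result
        # rather than a broken stream
        pass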
@@ -224,7 +234,7 @@ class ArchiveQueryResource(Resource):
     @api.response(200, 'Archive data sent')
     @api.expect(_archive_query_parser, validate=True)
     @api.marshal_with(_archive_query_model, skip_none=True, code=200, description='Search results sent')
-    @authenticate(signature_token=True)
+    @authenticate()
     def get(self):
         """
         Get archive data in json format from all query results.
@@ -27,6 +27,7 @@ import json
 import gzip
 import lzma
 import urllib.parse
+from elasticsearch.helpers import ScanError

 from nomad import search, utils, config
 from nomad.files import UploadFiles, Restricted
@@ -422,7 +423,10 @@ class RawFileQueryResource(Resource):
         def path(entry):
             return '%s/%s' % (entry['upload_id'], entry['mainfile'])

-        calcs = search_request.execute_scan(order_by='upload_id')
+        calcs = search_request.execute_scan(
+            order_by='upload_id',
+            size=config.services.download_scan_size,
+            scroll=config.services.download_scan_timeout)

         if strip:
             if search_request.execute()['total'] > config.raw_file_strip_cutoff:
@@ -436,40 +440,45 @@ class RawFileQueryResource(Resource):
         def generator():
             manifest = {}
             upload_files = None
-            for entry in calcs:
-                upload_id = entry['upload_id']
-                mainfile = entry['mainfile']
-                if upload_files is None or upload_files.upload_id != upload_id:
-                    if upload_files is not None:
-                        upload_files.close_zipfile_cache()
-
-                    upload_files = UploadFiles.get(
-                        upload_id, create_authorization_predicate(upload_id))
-
-                    if upload_files is None:
-                        utils.get_logger(__name__).error('upload files do not exist', upload_id=upload_id)
-                        continue
-
-                    upload_files.open_zipfile_cache()
-
-                filenames = upload_files.raw_file_manifest(path_prefix=os.path.dirname(mainfile))
-                for filename in filenames:
-                    filename_w_upload = os.path.join(upload_files.upload_id, filename)
-                    filename_wo_prefix = filename_w_upload[common_prefix_len:]
-                    if len(patterns) == 0 or any(
-                            fnmatch.fnmatchcase(os.path.basename(filename_wo_prefix), pattern)
-                            for pattern in patterns):
-
-                        yield (
-                            filename_wo_prefix, filename,
-                            lambda upload_filename: upload_files.raw_file(upload_filename, 'rb'),
-                            lambda upload_filename: upload_files.raw_file_size(upload_filename))
-
-                manifest[path(entry)] = {
-                    key: entry[key]
-                    for key in RawFileQueryResource.manifest_quantities
-                    if entry.get(key) is not None
-                }
+            try:
+                for entry in calcs:
+                    upload_id = entry['upload_id']
+                    mainfile = entry['mainfile']
+                    if upload_files is None or upload_files.upload_id != upload_id:
+                        if upload_files is not None:
+                            upload_files.close_zipfile_cache()
+
+                        upload_files = UploadFiles.get(
+                            upload_id, create_authorization_predicate(upload_id))
+
+                        if upload_files is None:
+                            utils.get_logger(__name__).error('upload files do not exist', upload_id=upload_id)
+                            continue
+
+                        upload_files.open_zipfile_cache()
+
+                    filenames = upload_files.raw_file_manifest(path_prefix=os.path.dirname(mainfile))
+                    for filename in filenames:
+                        filename_w_upload = os.path.join(upload_files.upload_id, filename)
+                        filename_wo_prefix = filename_w_upload[common_prefix_len:]
+                        if len(patterns) == 0 or any(
+                                fnmatch.fnmatchcase(os.path.basename(filename_wo_prefix), pattern)
+                                for pattern in patterns):
+
+                            yield (
+                                filename_wo_prefix, filename,
+                                lambda upload_filename: upload_files.raw_file(upload_filename, 'rb'),
+                                lambda upload_filename: upload_files.raw_file_size(upload_filename))
+
+                    manifest[path(entry)] = {
+                        key: entry[key]
+                        for key in RawFileQueryResource.manifest_quantities
+                        if entry.get(key) is not None
+                    }
+            except ScanError:
+                utils.get_logger(__name__).warning(
+                    'scan error while streaming raw data from query',
+                    query=urllib.parse.urlencode(request.args, doseq=True))

             if upload_files is not None:
                 upload_files.close_zipfile_cache()
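The raw-file generator filters each entry's files against the user-supplied glob patterns before yielding them. A small self-contained illustration of that filter, with the same semantics as the fnmatch.fnmatchcase check above: case-sensitive shell-style matching against the basename only, with an empty pattern list matching everything (the matches helper is illustrative, not part of the codebase):

import fnmatch
import os


def matches(path: str, patterns: list) -> bool:
    # no patterns means "match everything", as in the generator above
    return len(patterns) == 0 or any(
        fnmatch.fnmatchcase(os.path.basename(path), pattern)
        for pattern in patterns)


assert matches('upload/calc/vasprun.xml', ['INCAR', '*.xml'])
assert matches('upload/calc/OUTCAR', [])
assert not matches('upload/calc/OUTCAR', ['INCAR', '*.xml'])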
@@ -145,7 +145,9 @@ services = NomadConfig(
     unavailable_value='unavailable',
     https=False,
     upload_limit=10,
-    force_raw_file_decoding=False
+    force_raw_file_decoding=False,
+    download_scan_size=500,
+    download_scan_timeout=u'30m'
 )

 tests = NomadConfig(
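Both endpoints read these new defaults through attribute access on config.services, exactly as the scan calls above do. How a deployment overrides them depends on the installation; the direct assignment below is only an illustration and assumes NomadConfig permits attribute assignment:

from nomad import config

# read the new defaults, exactly as the endpoints above do
assert config.services.download_scan_size == 500
assert config.services.download_scan_timeout == '30m'

# a deployment streaming very large result sets might trade memory for
# fewer scroll round-trips (shown for illustration; real deployments
# typically override config at startup)
config.services.download_scan_size = 1000
config.services.download_scan_timeout = '60m'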
@@ -532,7 +532,7 @@ class SearchRequest:
         """
         return self._response(self._search.query(self.q)[0:0].execute())

-    def execute_scan(self, order_by: str = None, order: int = -1):
+    def execute_scan(self, order_by: str = None, order: int = -1, **kwargs):
         """
         This executes the search as a scan. The result will be a generator over the
         found entries. Everything but the query part of this object will be ignored.
@@ -550,6 +550,7 @@ class SearchRequest:
         else:
             search = search.sort('-%s' % order_by_quantity.elastic_field)

+        search = search.params(**kwargs)

         for hit in search.scan():
             yield hit.to_dict()
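Search.params() returns a new Search with the given keyword arguments (hence the assignment above), and scan() forwards them to elasticsearch.helpers.scan, which is where size and scroll take effect. A rough low-level equivalent, assuming a local Elasticsearch instance and a hypothetical calcs index:

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

es = Elasticsearch()          # assumes a local instance, for illustration only
hits = scan(
    es,
    index='calcs',            # hypothetical index name
    query={'query': {'match_all': {}}},
    size=500,                 # batch size per scroll request
    scroll='30m')             # how long ES keeps the scroll context alive

for hit in hits:
    print(hit['_id'])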