Commit d509f37e authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Archive and raw query order via elastic.

parent 6bf2f0e6
Pipeline #65718 passed with stages
in 16 minutes and 44 seconds
......@@ -45,9 +45,7 @@ class DownloadButton extends React.Component {
async handleSelect(choice) {
const {api, query, user, raiseError} = this.props
const params = {
strip: true
}
const params = {}
Object.keys(query).forEach(key => { params[key] = query[key] })
if (user) {
......
......@@ -18,6 +18,7 @@ The archive API of the nomad@FAIRDI APIs. This API is about serving processed
"""
from typing import Dict, Any
from io import BytesIO
import os.path
from flask import send_file
from flask_restplus import abort, Resource
......@@ -144,11 +145,10 @@ class ArchiveQueryResource(Resource):
search_request = search.SearchRequest()
add_query(search_request, search_request_parser.parse_args())
calcs = sorted(
[entry for entry in search_request.execute_scan()],
key=lambda x: x['upload_id'])
calcs = search_request.execute_scan(order_by='upload_id')
def generator():
manifest = {}
for entry in calcs:
upload_id = entry['upload_id']
calc_id = entry['calc_id']
......@@ -169,24 +169,27 @@ class ArchiveQueryResource(Resource):
lambda calc_id: upload_files.archive_file(calc_id, 'rb'),
lambda calc_id: upload_files.archive_file_size(calc_id))
try:
manifest = {
entry['calc_id']: {
manifest[calc_id] = {
key: entry[key]
for key in ArchiveQueryResource.manifest_quantities
if entry.get(key) is not None
}
for entry in calcs
}
manifest_contents = json.dumps(manifest)
except Exception as e:
manifest_contents = dict(error='Could not create the manifest: %s' % (e))
utils.get_logger(__name__).error(
'could not create raw query manifest', exc_info=e)
try:
manifest_contents = json.dumps(manifest).encode('utf-8')
except Exception as e:
manifest_contents = json.dumps(
dict(error='Could not create the manifest: %s' % (e))).encode('utf-8')
utils.get_logger(__name__).error(
'could not create raw query manifest', exc_info=e)
yield (
'manifest.json', 'manifest',
lambda *args: BytesIO(manifest_contents),
lambda *args: len(manifest_contents))
return streamed_zipfile(
generator(), zipfile_name='nomad_archive.zip', compress=compress,
manifest=manifest_contents)
generator(), zipfile_name='nomad_archive.zip', compress=compress)
@ns.route('/metainfo/<string:metainfo_package_name>')
......
......@@ -90,7 +90,7 @@ def upload_route(ns, prefix: str = ''):
def streamed_zipfile(
files: Iterable[Tuple[str, str, Callable[[str], IO], Callable[[str], int]]],
zipfile_name: str, compress: bool = False, manifest: str = None):
zipfile_name: str, compress: bool = False):
"""
Creates a response that streams the given files as a streamed zip file. Ensures that
each given file is only streamed once, based on its filename in the resulting zipfile.
......@@ -102,7 +102,6 @@ def streamed_zipfile(
zipfile_name: A name that will be used in the content disposition attachment
used as an HTTP response.
compress: Uses compression. Default is stored only.
manifest: Add a ``manifest.json`` with the given content.
"""
streamed_files: Set[str] = set()
......@@ -114,11 +113,7 @@ def streamed_zipfile(
Replace the directory based iter of zipstream with an iter over all given
files.
"""
# first the manifest
if manifest is not None:
yield dict(arcname='manifest.json', iterable=(manifest.encode('utf-8'),))
# now the actual contents
# the actual contents
for zipped_filename, file_id, open_io, file_size in files:
if zipped_filename in streamed_files:
continue
......
......@@ -16,8 +16,9 @@
The raw API of the nomad@FAIRDI APIs. Can be used to retrieve raw calculation files.
"""
from typing import IO, Any, Union, Iterable, Tuple, List
from typing import IO, Any, Union, List
import os.path
from io import BytesIO
from flask import request, send_file
from flask_restplus import abort, Resource, fields
import magic
......@@ -27,7 +28,7 @@ import json
import gzip
import urllib.parse
from nomad import search, utils
from nomad import search, utils, config
from nomad.files import UploadFiles, Restricted
from nomad.processing import Calc
......@@ -416,17 +417,19 @@ class RawFileQueryResource(Resource):
def path(entry):
    """Return the archive path ``<upload_id>/<mainfile>`` for a search hit."""
    upload_id = entry['upload_id']
    mainfile = entry['mainfile']
    return '%s/%s' % (upload_id, mainfile)
calcs = sorted(
[entry for entry in search_request.execute_scan()],
key=lambda x: x['upload_id'])
calcs = search_request.execute_scan(order_by='upload_id')
paths = [path(entry) for entry in calcs]
if strip:
if search_request.execute()['total'] > config.raw_file_strip_cutoff:
abort(400, 'The requested download has to many files for using "strip".')
calcs = list(calcs)
paths = [path(entry) for entry in calcs]
common_prefix_len = len(utils.common_prefix(paths))
else:
common_prefix_len = 0
def generator():
manifest = {}
for entry in calcs:
upload_id = entry['upload_id']
mainfile = entry['mainfile']
......@@ -451,26 +454,32 @@ class RawFileQueryResource(Resource):
fnmatch.fnmatchcase(os.path.basename(filename_wo_prefix), pattern)
for pattern in patterns):
yield filename_wo_prefix, filename, upload_files
yield (
filename_wo_prefix, filename,
lambda upload_filename: upload_files.raw_file(upload_filename, 'rb'),
lambda upload_filename: upload_files.raw_file_size(upload_filename))
try:
manifest = {
path(entry): {
key: entry[key]
for key in RawFileQueryResource.manifest_quantities
if entry.get(key) is not None
}
for entry in calcs
}
manifest_contents = json.dumps(manifest)
except Exception as e:
manifest_contents = dict(error='Could not create the manifest: %s' % (e))
utils.get_logger(__name__).error(
'could not create raw query manifest', exc_info=e)
return _streamed_zipfile(
generator(), zipfile_name='nomad_raw_files.zip', compress=compress,
manifest=manifest_contents)
manifest[path(entry)] = {
key: entry[key]
for key in RawFileQueryResource.manifest_quantities
if entry.get(key) is not None
}
try:
manifest_contents = json.dumps(manifest).encode('utf-8')
except Exception as e:
manifest_contents = json.dumps(
dict(error='Could not create the manifest: %s' % (e))).encode('utf-8')
utils.get_logger(__name__).error(
'could not create raw query manifest', exc_info=e)
yield (
'manifest.json', 'manifest',
lambda *args: BytesIO(manifest_contents),
lambda *args: len(manifest_contents))
return streamed_zipfile(
generator(), zipfile_name='nomad_raw_files.zip', compress=compress)
def respond_to_get_raw_files(upload_id, files, compress=False, strip=False):
......@@ -492,28 +501,10 @@ def respond_to_get_raw_files(upload_id, files, compress=False, strip=False):
common_prefix_len = 0
with zipfile_cache:
return _streamed_zipfile(
[(filename[common_prefix_len:], filename, upload_files) for filename in files],
return streamed_zipfile(
[(
filename[common_prefix_len:], filename,
lambda upload_filename: upload_files.raw_file(upload_filename, 'rb'),
lambda upload_filename: upload_files.raw_file_size(upload_filename)
) for filename in files],
zipfile_name='%s.zip' % upload_id, compress=compress)
def _streamed_zipfile(
        files: Iterable[Tuple[str, str, UploadFiles]], **kwargs):
    """
    Creates a response that streams the given files as a streamed zip file. Ensures that
    each given file is only streamed once, based on its filename in the resulting zipfile.

    Arguments:
        files: An iterable of tuples with the filename to be used in the resulting zipfile,
            the filename within the upload, the :class:`UploadFiles` that contains
            the file.
        **kwargs: See :func:`streamed_zipfile`
    """
    # Renamed from ``map`` to avoid shadowing the builtin. The helper binds
    # ``upload_files`` per tuple; the lambdas defer the actual file access until
    # the zip is streamed.
    def to_file_entry(name, upload_filename, upload_files):
        return (
            name, upload_filename,
            lambda upload_filename: upload_files.raw_file(upload_filename, 'rb'),
            lambda upload_filename: upload_files.raw_file_size(upload_filename))

    return streamed_zipfile([to_file_entry(*item) for item in files], **kwargs)
......@@ -204,6 +204,7 @@ auxfile_cutoff = 100
parser_matching_size = 9128
console_log_level = logging.WARNING
max_upload_size = 32 * (1024 ** 3)
raw_file_strip_cutoff = 1000
def normalize_loglevel(value, default_level=logging.INFO):
......
......@@ -526,12 +526,25 @@ class SearchRequest:
"""
return self._response(self._search.query(self.q)[0:0].execute())
def execute_scan(self, order_by: str = None, order: int = -1):
    """
    Executes the search as a scan. The result will be a generator over the found
    entries. Everything but the query part of this object will be ignored.

    Arguments:
        order_by: Name of a search quantity to sort the results by. Default is
            ``None``, i.e. unsorted (native elastic scan order).
        order: Sort direction; ``1`` for ascending, any other value (default
            ``-1``) for descending.

    Raises:
        KeyError: If ``order_by`` is not a known search quantity.
    """
    search = self._search.query(self.q)

    if order_by is not None:
        if order_by not in quantities:
            raise KeyError('Unknown order quantity %s' % order_by)

        order_by_quantity = quantities[order_by]

        if order == 1:
            search = search.sort(order_by_quantity.elastic_field)
        else:
            # a '-' prefix tells elasticsearch-dsl to sort this field descending
            search = search.sort('-%s' % order_by_quantity.elastic_field)

    for hit in search.scan():
        yield hit.to_dict()
def execute_paginated(
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment