Commit 21616509 authored by Markus Scheidgen

Refactored archive file related functionality.

parent c1057384
@@ -20,8 +20,8 @@ The archive API of the nomad@FAIRDI APIs. This API is about serving processed
from typing import Dict, Any
from io import BytesIO
import os.path
from flask import send_file, request
from flask_restplus import abort, Resource
from flask import send_file, request, g
from flask_restplus import abort, Resource, fields
import json
import importlib
import urllib.parse
@@ -31,12 +31,12 @@ import nomad_meta_info
from nomad.files import UploadFiles, Restricted
from nomad import search, config
from nomad.app import common
from nomad.archive import ArchiveFileDB
from nomad.archive import query_archive
from .auth import authenticate, create_authorization_predicate
from .api import api
from .common import calc_route, streamed_zipfile, search_model, add_pagination_parameters,\
add_scroll_parameters, add_search_parameters, apply_search_parameters
from .common import calc_route, streamed_zipfile, search_model,\
add_search_parameters, apply_search_parameters, query_model
ns = api.namespace(
'archive',
@@ -212,30 +212,10 @@ class ArchiveDownloadResource(Resource):
generator(), zipfile_name='nomad_archive.zip', compress=compress)
_archive_query_parser = api.parser()
add_pagination_parameters(_archive_query_parser)
add_scroll_parameters(_archive_query_parser)
add_search_parameters(_archive_query_parser)
_archive_query_parser.add_argument(
'db', type=str, help='Database to use, zip or msg', default='zip', location='args')
_archive_query_parser.add_argument(
'qschema', type=str, help='Serialized archive dict with null values as placeholder for data.')
_archive_query_model = api.clone('ArchiveCalculations', search_model)
# scroll model should be capitalized to prevent ambiguity with scroll flag
_archive_query_model['Scroll'] = _archive_query_model.pop('scroll')
_archive_query_model['Pagination'] = _archive_query_model.pop('pagination')
def get_dbs(upload_id):
upload_files = UploadFiles.get(upload_id, create_authorization_predicate(upload_id))
if upload_files is None:
return []
files = upload_files.archive_file_msg(',')
msgdbs = [ArchiveFileDB(f) for f in files if f is not None]
return msgdbs
_archive_query_model = api.inherit('ArchiveCalculations', search_model, {
'query': fields.Nested(query_model, description='The query used to find the requested entries.'),
'query_schema': fields.Raw(description='The query schema that defines what archive data to retrieve.')
})
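For orientation, a request body that matches this model could look like the sketch below; the 'atoms' quantity and the pagination values are illustrative only and not prescribed by the model itself:

example_body = {
    # search quantities as defined by query_model (illustrative values)
    'query': {'atoms': ['Si']},
    # archive quantities to retrieve; '*' acts as a wildcard placeholder
    'query_schema': {
        'section_run': {
            'section_single_configuration_calculation': {
                'energy_total': '*'}}},
    'pagination': {'page': 1, 'per_page': 10}
}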
@ns.route('/query')
@@ -261,48 +241,41 @@ class ArchiveQueryResource(Resource):
try:
data_in = request.get_json()
scroll = data_in.get('scroll', None)
scroll_id = data_in.get('scroll_id', None)
Scroll = data_in.get('Scroll', None)
if Scroll:
scroll = Scroll.get('scroll', scroll)
scroll_id = Scroll.get('scroll_id', scroll_id)
pagination = data_in.get('Pagination', None)
page = data_in.get('page', 1)
per_page = data_in.get('per_page', 10 if not scroll else 1000)
order = data_in.get('order', -1)
order_by = data_in.get('order_by', 'upload_id')
if pagination:
page = pagination.get('page', page)
per_page = pagination.get('per_page', per_page)
order = pagination.get('order', order)
order_by = pagination.get('order_by', order_by)
qschema = data_in.get('results', None)
if qschema is not None:
qschema = qschema[-1]
if scroll:
scroll_id = scroll.get('scroll_id')
scroll = True
pagination = data_in.get('pagination', {})
page = pagination.get('page', 1)
per_page = pagination.get('per_page', 10 if not scroll else 1000)
query = data_in.get('query', {})
query_schema = data_in.get('query_schema', '*')
except Exception:
abort(400, message='bad parameter types')
try:
assert page >= 1
assert per_page > 0
except AssertionError:
abort(400, message='invalid pagination')
if order not in [-1, 1]:
if not (page >= 1 and per_page > 0):
abort(400, message='invalid pagination')
search_request = search.SearchRequest()
apply_search_parameters(search_request, data_in)
search_request.include('calc_id', 'upload_id', 'mainfile')
if g.user is not None:
search_request.owner('all', user_id=g.user.user_id)
else:
search_request.owner('all')
apply_search_parameters(search_request, query)
search_request.include('calc_id', 'upload_id', 'with_embargo')
try:
if scroll:
results = search_request.execute_scrolled(scroll_id=scroll_id, size=per_page)
results = search_request.execute_scrolled(
scroll_id=scroll_id, size=per_page, order_by='upload_id')
results['scroll']['scroll'] = True
else:
results = search_request.execute_paginated(
per_page=per_page, page=page, order=order, order_by=order_by)
per_page=per_page, page=page, order_by='upload_id')
except search.ScrollIdNotFound:
abort(400, 'The given scroll_id does not exist.')
@@ -313,45 +286,33 @@ class ArchiveQueryResource(Resource):
data = []
calcs = results['results']
try:
msgdbs = None
cur_upload_id = None
for entry in calcs:
upload_id = entry['upload_id']
calc_id = entry['calc_id']
if msgdbs is None or cur_upload_id != upload_id:
msgdbs = get_dbs(upload_id)
cur_upload_id = upload_id
if len(msgdbs) == 0:
raise KeyError
calc_data = None
for msgdb in msgdbs:
calc_data = msgdb.query({calc_id: qschema})
if calc_data:
data.append(calc_data)
break
if calc_data is None:
raise Restricted
archive_files = None
cur_upload_id = None
for entry in calcs:
upload_id = entry['upload_id']
calc_id = entry['calc_id']
if archive_files is None or cur_upload_id != upload_id:
upload_files = UploadFiles.get(upload_id, create_authorization_predicate(upload_id))
if upload_files is None:
return []
archive_files = upload_files.archive_file_msgs()
cur_upload_id = upload_id
if entry['with_embargo']:
archive_file = archive_files[1]
else:
archive_file = archive_files[0]
except Restricted:
abort(401, message='Not authorized to access %s/%s.' % (upload_id, calc_id))
if archive_file is None:
continue
except KeyError:
abort(404, message='Calculation %s/%s does not exist.' % (upload_id, calc_id))
data.append(query_archive(archive_file, {calc_id: query_schema}))
# assign archive data to results
results['results'] = data
# for compatibility with archive model
# TODO should be changed in search
if scroll:
results['Scroll'] = results.pop('scroll', None)
if pagination:
results['Pagination'] = results.pop('pagination', None)
return results, 200
......
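A minimal client-side sketch for the query endpoint above; the base URL is a hypothetical deployment and the request is unauthenticated:

import requests

base_url = 'http://localhost/api'  # hypothetical deployment URL
body = {
    'query': {'atoms': ['Si']},           # illustrative search quantity
    'query_schema': {'section_run': '*'}  # retrieve the whole section_run
}

response = requests.post(base_url + '/archive/query', json=body)
response.raise_for_status()
# each item in 'results' holds the archive data selected by query_schema
for entry in response.json()['results']:
    print(entry)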
@@ -74,10 +74,12 @@ search_model_fields = {
'from_time': fields.Raw(description='The minimum entry time.', allow_null=True, skip_none=True),
'until_time': fields.Raw(description='The maximum entry time.', allow_null=True, skip_none=True),
}
for quantity in search.quantities.values():
search_model_fields[quantity.name] = fields.Raw(description=quantity.description, allow_null=True, skip_none=True)
search_model = api.model('Search', search_model_fields)
query_model = api.model('Query', {
quantity.name: fields.Raw(description=quantity.description)
for quantity in search.quantities.values()})
def add_pagination_parameters(request_parser):
""" Add pagination parameters to Flask querystring parser. """
@@ -124,6 +126,7 @@ def add_search_parameters(request_parser):
action=quantity.argparse_action if quantity.multi else None)
def apply_search_parameters(search_request: search.SearchRequest, args: Dict[str, Any]):
"""
Helper that adds query-relevant request args to the given SearchRequest.
......
@@ -109,6 +109,8 @@ for group_name, (group_quantity, _) in search.groups.items():
'after': fields.String(description='The after value that can be used to retrieve the next %s.' % group_name),
'values': fields.Raw(description='A dict with %s as key. The values are dicts with "total" and "examples" keys.' % group_quantity)
}), skip_none=True)
for quantity in search.quantities.values():
_repo_calcs_model_fields[quantity.name] = fields.Raw(description=quantity.description, allow_null=True, skip_none=True)
_repo_calcs_model = api.inherit('RepoCalculations', search_model, _repo_calcs_model_fields)
......
@@ -7,6 +7,7 @@ import struct
import json
import math
import os.path
import re
from nomad import utils
@@ -22,6 +23,12 @@ def unpackb(o, **kwargs):
return msgpack.unpackb(o, raw=False)
def adjust_uuid_size(uuid):
uuid = uuid.rjust(utils.default_hash_len, ' ')
assert len(uuid) == utils.default_hash_len, 'uuids must have the right fixed size'
return uuid
class ArchiveError(Exception):
pass
@@ -144,6 +151,9 @@ class ArchiveWriter:
return start, self._pos
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_val is not None:
raise exc_val
# go back and write the real TOC to the header
self._f.seek(0)
self._pos = 0
@@ -174,6 +184,8 @@ class ArchiveWriter:
end.to_bytes(5, byteorder='little', signed=False)
def add(self, uuid: str, data: Any) -> None:
uuid = adjust_uuid_size(uuid)
self._toc_packer.reset()
packed = self._toc_packer.pack(data)
toc = self._toc_packer.toc
@@ -582,39 +594,54 @@ class ArchiveFileDB:
lo = int(str_index)
return lo
def _load_data(self, qdict: Dict[str, Any], db: ArchiveObject, main_section: bool = False):
if not isinstance(qdict, dict):
if isinstance(db, ArchiveObject):
return db.to_dict()
elif isinstance(db, ArchiveList):
return list(db)
elif isinstance(db, ArchiveItem):
return dict(db)
def query_archive(f, query_dict: dict):
def _load_data(query_dict: Dict[str, Any], archive_item: ArchiveObject, main_section: bool = False):
if not isinstance(query_dict, dict):
if isinstance(archive_item, ArchiveObject):
return archive_item.to_dict()
elif isinstance(archive_item, ArchiveList):
return list(archive_item)
else:
return db
return archive_item
res = {}
for key, val in qdict.items():
index = ArchiveFileDB._get_index(key)
dbkey = key.split('[')[0]
if main_section:
dbkey = self._adjust_key(dbkey)
try:
if index is None:
res[key] = self._load_data(val, db[dbkey])
elif isinstance(index, int):
res[key] = self._load_data(val, db[dbkey])[index]
for key, val in query_dict.items():
key = key.strip()
# process array indices
match = re.match(r'([_a-zA-Z0-9]+)\[([0-9:]+)\]', key)
if match:
archive_key = match.group(1)
index_str = match.group(2)
match = re.match(r'([0-9]*):([0-9]*)', index_str)
if match:
index = (
0 if match.group(1) == '' else int(match.group(1)),
None if match.group(2) == '' else int(match.group(2)))
else:
res[key] = self._load_data(val, db[dbkey])[index[0]: index[1]]
index = int(index_str) # type: ignore
else:
archive_key = key
index = None
except Exception:
continue
# support for shorter uuids
archive_key = key.split('[')[0]
if main_section:
archive_key = adjust_uuid_size(archive_key)
if index is None:
res[key] = _load_data(val, archive_item[archive_key])
elif isinstance(index, int):
res[key] = _load_data(val, archive_item[archive_key])[index]
else:
res[key] = _load_data(val, archive_item[archive_key])[index[0]: index[1]]
return res
def query(self, qdict):
with ArchiveReader(self._fileobj) as db:
return self._load_data(qdict, db, True)
with ArchiveReader(f) as archive:
return _load_data(query_dict, archive, True)
if __name__ == '__main__':
......
@@ -44,7 +44,7 @@ from typing import Dict, List, Any, Union
from nomad.metainfo import MSection, Quantity, SubSection
from nomad.metainfo.metainfo import MObjectMeta
from nomad.archive import ArchiveFileDB
# from nomad.archive import ArchiveFileDB
from nomad import config as nomad_config
from nomad.cli.client.client import KeycloakAuthenticator
@@ -427,3 +427,7 @@ class ArchiveQuery:
self._get_archive_data()
if self.archive_data:
self.metainfo = ArchiveMetainfo(archive_data=self.archive_data, archive_schema='*')
# TODO
# def query()...
\ No newline at end of file
@@ -50,7 +50,7 @@ being other mainfiles. Therefore, the aux files of a restricted calc might becom
from abc import ABCMeta
import sys
from typing import IO, Generator, Dict, Iterable, Callable, List, Tuple
from typing import IO, Generator, Dict, Iterable, Callable, List, Tuple, Any
import os.path
import os
import shutil
@@ -58,10 +58,11 @@ import tarfile
import hashlib
import io
import pickle
import json
from nomad import config, utils
from nomad.datamodel import UploadWithMetadata
from nomad.archive import ArchiveFileDB
from nomad.archive import write_archive
# TODO this should become obsolete, once we are going beyond python 3.6. For now
# python 3.6's zipfile does not allow to seek/tell within a file-like opened from a
@@ -490,22 +491,47 @@ class StagingUploadFiles(UploadFiles):
file = target_dir.join_file('%s-%s.%s.zip' % (kind, prefix, ext))
return zipfile.ZipFile(file.os_path, mode='w')
def create_msgfile(kind: str, prefix: str, ext: str) -> ArchiveFileDB:
def write_msgfile(kind: str, prefix: str, ext: str, size: int, data: Iterable[Tuple[str, Any]]):
file = target_dir.join_file('%s-%s.%s.msg' % (kind, prefix, ext))
return ArchiveFileDB(file.os_path, mode='w')
write_archive(file.os_path, size, data)
# zip archives
if not skip_archive:
with utils.timer(self.logger, 'packed zip json archive'):
self._pack_archive_files(upload, create_zipfile)
with utils.timer(self.logger, 'packed msgpack archive'):
self._pack_archive_files(upload, create_msgfile)
self._pack_archive_files_msgpack(upload, write_msgfile)
# zip raw files
if not skip_raw:
with utils.timer(self.logger, 'packed raw files'):
self._pack_raw_files(upload, create_zipfile)
def _pack_archive_files_msgpack(self, upload: UploadWithMetadata, write_msgfile):
restricted, public = 0, 0
for calc in upload.calcs:
if calc.with_embargo:
restricted += 1
else:
public += 1
def create_iterator(with_embargo: bool):
for calc in upload.calcs:
if with_embargo == calc.with_embargo:
archive_file = self.archive_file_object(calc.calc_id)
if archive_file.exists():
with open(archive_file.os_path, 'rt') as f:
yield (calc.calc_id, json.load(f))
else:
yield (calc.calc_id, {})
try:
write_msgfile('archive', 'public', self._archive_ext, public, create_iterator(False))
write_msgfile('archive', 'restricted', self._archive_ext, restricted, create_iterator(True))
except Exception as e:
self.logger.error('exception during packing archives', exc_info=e)
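The size handed to write_msgfile is expected to match the number of (calc_id, data) pairs the iterator yields, which is why the calcs are counted up front; a minimal sketch of the same contract outside the packing code, with hypothetical file name and data:

from nomad.archive import write_archive

entries = {'calc-1': {'energy_total': -1.0}, 'calc-2': {'energy_total': -2.0}}

def iter_entries():
    # yields (calc_id, archive_data) pairs, like create_iterator above
    yield from entries.items()

# the declared size must match the number of yielded pairs
write_archive('archive-public.msg', len(entries), iter_entries())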
def _pack_archive_files(self, upload: UploadWithMetadata, create_zipfile):
archive_public_zip = create_zipfile('archive', 'public', self._archive_ext)
archive_restricted_zip = create_zipfile('archive', 'restricted', self._archive_ext)
@@ -804,32 +830,29 @@ class PublicUploadFiles(UploadFiles):
raise KeyError(path)
def _file_msg(self, prefix: str, ext: str, path: str, *args, **kwargs) -> List[IO]:
f = []
for access in ['public', 'restricted']:
try:
f.append(self.open_msg_file(prefix, access, ext))
if (access == 'restricted' or always_restricted(path)) and not self._is_authorized():
f[-1] = None
except FileNotFoundError:
f.append(None)
except IsADirectoryError:
f.append(None)
except KeyError:
raise
def _file_msg(self, prefix: str, access: str, ext: str, *args, **kwargs) -> IO:
try:
if access == 'restricted' and not self._is_authorized():
return None
return f
return self.open_msg_file(prefix, access, ext)
except FileNotFoundError:
return None
except IsADirectoryError:
return None
except KeyError:
raise
def to_staging_upload_files(self, create: bool = False, **kwargs) -> 'StagingUploadFiles':
exists = False
try:
staging_upload_files = PublicUploadFilesBasedStagingUploadFiles(self)
staging_upload_files = PublicUploadFilesBasedStagingUploadFiles(self, is_authorized=lambda: True)
exists = True
except KeyError:
if not create:
return None
staging_upload_files = PublicUploadFilesBasedStagingUploadFiles(self, create=True)
staging_upload_files = PublicUploadFilesBasedStagingUploadFiles(self, create=True, is_authorized=lambda: True)
staging_upload_files.extract(**kwargs)
if exists and create:
@@ -900,8 +923,10 @@ class PublicUploadFiles(UploadFiles):
def archive_file(self, calc_id: str, *args, **kwargs) -> IO:
return self._file('archive', self._archive_ext, '%s.%s' % (calc_id, self._archive_ext), *args, **kwargs)
def archive_file_msg(self, calc_id: str, *args, **kwargs) -> List[IO]:
return self._file_msg('archive', self._archive_ext, '%s.%s' % (calc_id, self._archive_ext), *args, **kwargs)
def archive_file_msgs(self, *args, **kwargs) -> Tuple[IO, IO]:
return (
self._file_msg('archive', 'public', self._archive_ext, *args, **kwargs),
self._file_msg('archive', 'restricted', self._archive_ext, *args, **kwargs))
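A sketch of how the two handles returned here are meant to be consumed together with query_archive, mirroring the query resource above; the ids and the embargo flag are placeholders:

from nomad.files import UploadFiles
from nomad.archive import query_archive

upload_files = UploadFiles.get('some_upload_id')  # placeholder upload id
public_file, restricted_file = upload_files.archive_file_msgs()

with_embargo = False  # in practice taken from the entry's search metadata
archive_file = restricted_file if with_embargo else public_file
if archive_file is not None:
    # None means the file does not exist or the caller is not authorized
    data = query_archive(archive_file, {'some_calc_id': '*'})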
def archive_file_size(self, calc_id: str) -> int:
file_path = '%s.%s' % (calc_id, self._archive_ext)
@@ -956,15 +981,15 @@ class PublicUploadFiles(UploadFiles):
file = self.join_file('%s-%s.%s.repacked.zip' % (kind, prefix, ext))
return zipfile.ZipFile(file.os_path, mode='w')
def create_msgfile(kind: str, prefix: str, ext: str) -> ArchiveFileDB:
def write_msgfile(kind: str, prefix: str, ext: str, size: int, data: Iterable[Tuple[str, Any]]):
file = self.join_file('%s-%s.%s.repacked.msg' % (kind, prefix, ext))
return ArchiveFileDB(file.os_path, mode='w')
write_archive(file.os_path, size, data)
# perform the repacking
try:
if not skip_archive:
staging_upload._pack_archive_files(upload, create_zipfile)
staging_upload._pack_archive_files(upload, create_msgfile)
staging_upload._pack_archive_files_msgpack(upload, write_msgfile)
if not skip_raw:
staging_upload._pack_raw_files(upload, create_zipfile)
finally:
......
@@ -609,7 +609,9 @@ class SearchRequest:
result.update(pagination=dict(total=result['total'], page=page, per_page=per_page))
return result
def execute_scrolled(self, scroll_id: str = None, size: int = 1000, scroll: str = u'5m'):
def execute_scrolled(
self, scroll_id: str = None, size: int = 1000, scroll: str = u'5m',
order_by: str = None, order: int = -1):
"""
Executes a scrolling search based on the ES scroll API. Pagination is replaced with
scrolling; no ordering, no statistics, and no quantities will be provided.
@@ -628,6 +630,8 @@ class SearchRequest:
size: The batch size in number of hits.
scroll: The time the scroll should be kept alive (i.e. the time between requests
to this method) in ES time units. Default is 5 minutes.
TODO support order and order_by
"""
es = infrastructure.elastic_client
......
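For context, execute_scrolled is consumed in a loop that feeds the scroll_id of one call into the next until a batch comes back empty; a sketch, assuming the scroll_id is reported under results['scroll'] as used by the archive API above:

from nomad import search

request = search.SearchRequest()
scroll_id = None
while True:
    results = request.execute_scrolled(
        scroll_id=scroll_id, size=1000, order_by='upload_id')
    if len(results['results']) == 0:
        break
    scroll_id = results['scroll']['scroll_id']
    # ... process results['results'] ...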
@@ -669,7 +669,10 @@ class TestArchive(UploadFilesBasedTests):
assert_zip_file(rv, files=1)
def test_post_archive_query(self, api, published_wo_user_metadata):
schema = {"section_run": {"section_single_configuration_calculation": {"energy_total": '*'}}}
schema = {
'section_run': {
'section_single_configuration_calculation': {
'energy_total': '*'}}}
data = {'results': [schema], 'per_page': 5}
uri = '/archive/query'
rv = api.post(uri, content_type='application/json', data=json.dumps(data))
......
@@ -4,7 +4,7 @@ import msgpack
from io import BytesIO
from nomad import utils
from nomad.archive import TOCPacker, write_archive, read_archive, ArchiveReader, ArchiveFileDB
from nomad.archive import TOCPacker, write_archive, read_archive, ArchiveReader, query_archive
def create_example_uuid(index: int = 0):
@@ -78,6 +78,11 @@ def test_toc_packer(example_entry):
assert msgpack.unpackb(data, raw=False) == example_entry
def test_write_archive_empty():
f = BytesIO()
write_archive(f, 0, [])
def test_write_archive_single(example_uuid, example_entry):
f = BytesIO()
print('#', len(example_uuid))
@@ -165,22 +170,26 @@ def test_read_archive_multi(example_uuid, example_entry, use_blocked_toc):
assert reader.get(create_example_uuid(i)) is not None
def test_archivefiledb():
payload = [
{'calc1': {
'secA': {'subsecA1': [{'propA1a': 1.0}]}, 'secB': {'propB1a': ['a', 'b']}}},
{'calc2': {
'secA': {'subsecA1': [{'propA1a': 2.0}]}, 'secB': {'propB1a': ['c', 'd']}}}]
def test_query():
payload = {
'calc1': {
'secA': {
'subsecA1': [{'propA1a': 1.0}]
},
'secB': {'propB1a': ['a', 'b']}
},
'calc2': {
'secA': {'subsecA1': [{'propA1a': 2.0}]},
'secB': {'propB1a': ['c', 'd']}
}
}
f = BytesIO()
msgdb = ArchiveFileDB(f, mode='w', entry_toc_depth=1)
msgdb.add_data(payload)
msgdb.close(save=False)
f = BytesIO(f.getbuffer())
msgdb = ArchiveFileDB(f, mode='r')