Commit 54f086da authored by Alvin Noe Ladines

Implementation of msgpack in archive

parent e42036af
Pipeline #67831 passed with stages in 14 minutes and 12 seconds
@@ -30,6 +30,7 @@ import nomad_meta_info
from nomad.files import UploadFiles, Restricted
from nomad import utils, search, config
from nomad.archive_library.filedb import ArchiveFileDB
from .auth import authenticate, create_authorization_predicate
from .api import api
@@ -214,6 +215,10 @@ _archive_query_parser = api.parser()
add_pagination_parameters(_archive_query_parser)
add_scroll_parameters(_archive_query_parser)
add_search_parameters(_archive_query_parser)
_archive_query_parser.add_argument(
'db', type=str, help='Database to use, zip or msg', default='zip', location='args')
_archive_query_parser.add_argument(
'qschema', type=str, help='Serialized archive dict with null values as placeholders for data.')
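# For illustration, a 'qschema' mirrors the archive structure, with null values as
# placeholders for the data to be filled in (hypothetical section/property names):
#
#     {"section_run": {"section_single_configuration_calculation": {"energy_total": null}}}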
_archive_query_model_fields = {
'results': fields.List(fields.Raw, description=(
@@ -227,103 +232,163 @@ _archive_query_model_fields = {
_archive_query_model = api.inherit('ArchiveCalculations', search_model, _archive_query_model_fields)
def execute_search():
try:
args = {
key: value for key, value in _archive_query_parser.parse_args().items()
if value is not None}
scroll = args.get('scroll', False)
scroll_id = args.get('scroll_id', None)
page = args.get('page', 1)
per_page = args.get('per_page', 10 if not scroll else 1000)
order = args.get('order', -1)
order_by = 'upload_id'
db = args.get('db')
qschema = args.get('qschema', None)
except Exception:
abort(400, message='bad parameter types')
try:
assert page >= 1
assert per_page > 0
except AssertionError:
abort(400, message='invalid pagination')
if order not in [-1, 1]:
abort(400, message='invalid pagination')
search_request = search.SearchRequest()
apply_search_parameters(search_request, args)
try:
if scroll:
results = search_request.execute_scrolled(scroll_id=scroll_id, size=per_page)
else:
results = search_request.execute_paginated(
per_page=per_page, page=page, order=order, order_by=order_by)
except search.ScrollIdNotFound:
abort(400, 'The given scroll_id does not exist.')
except KeyError as e:
import traceback
traceback.print_exc()
abort(400, str(e))
if qschema is None:
qschema = request.get_json()
qschema = qschema if qschema is None else qschema['results']
else:
qschema = json.loads(qschema)
data = []
calcs = results['results']
try:
upload_files = None
for entry in calcs:
upload_id = entry['upload_id']
calc_id = entry['calc_id']
if upload_files is None or upload_files.upload_id != upload_id:
if upload_files is not None:
upload_files.close_zipfile_cache()
upload_files = UploadFiles.get(
upload_id, create_authorization_predicate(upload_id))
if upload_files is None:
raise KeyError
upload_files.open_zipfile_cache()
if db == 'msg':
fos = upload_files.archive_file_msg(calc_id)
msgdbs = [ArchiveFileDB(fo) for fo in fos if fo is not None]
if db == 'zip':
fo = upload_files.archive_file(calc_id, 'rb')
data.append({calc_id: json.loads(fo.read())})
elif db == 'msg':
for msgdb in msgdbs:
data.append(msgdb.query({calc_id: qschema}))
if upload_files is not None:
upload_files.close_zipfile_cache()
except Restricted:
abort(401, message='Not authorized to access %s/%s.' % (upload_id, calc_id))
except KeyError:
abort(404, message='Calculation %s/%s does not exist.' % (upload_id, calc_id))
results['results'] = data
# build python code and curl snippet
results['python'] = query_api_python('archive', 'query', query_string=request.args)
results['curl'] = query_api_curl('archive', 'query', query_string=request.args)
return results
@ns.route('/query')
class ArchiveQueryResource(Resource):
@api.doc('archive_query')
@api.doc('get_archive_query')
@api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
@api.response(401, 'Not authorized to access the data.')
@api.response(404, 'The upload or calculation does not exist')
@api.response(200, 'Archive data sent')
@api.response(200, 'Archive data sent', headers={'Content-Type': 'application/octet-stream'})
@api.expect(_archive_query_parser, validate=True)
@api.marshal_with(_archive_query_model, skip_none=True, code=200, description='Search results sent')
@api.marshal_with(_archive_query_model, skip_none=True, code=200, description='File sent')
@authenticate()
def get(self):
"""
Get archive data in json format from all query results.
Get the msgpack database resulting from the query.
See ``/repo`` endpoint for documentation on the search
parameters.
The actual data are in archive_data and a supplementary python code (curl) to
The actual data are in results, and supplementary Python (curl) code to
execute the search is in the python (curl) field.
"""
try:
args = {
key: value for key, value in _archive_query_parser.parse_args().items()
if value is not None}
scroll = args.get('scroll', False)
scroll_id = args.get('scroll_id', None)
page = args.get('page', 1)
per_page = args.get('per_page', 10 if not scroll else 1000)
order = args.get('order', -1)
order_by = 'upload_id'
except Exception:
abort(400, message='bad parameter types')
try:
assert page >= 1
assert per_page > 0
except AssertionError:
abort(400, message='invalid pagination')
if order not in [-1, 1]:
abort(400, message='invalid pagination')
search_request = search.SearchRequest()
apply_search_parameters(search_request, args)
try:
if scroll:
results = search_request.execute_scrolled(scroll_id=scroll_id, size=per_page)
else:
results = search_request.execute_paginated(
per_page=per_page, page=page, order=order, order_by=order_by)
except search.ScrollIdNotFound:
abort(400, 'The given scroll_id does not exist.')
except KeyError as e:
import traceback
traceback.print_exc()
abort(400, str(e))
# build python code and curl snippet
results['python'] = query_api_python('archive', 'query', query_string=request.args)
results['curl'] = query_api_curl('archive', 'query', query_string=request.args)
data = []
calcs = results['results']
try:
upload_files = None
for entry in calcs:
upload_id = entry['upload_id']
calc_id = entry['calc_id']
if upload_files is None or upload_files.upload_id != upload_id:
if upload_files is not None:
upload_files.close_zipfile_cache()
upload_files = UploadFiles.get(
upload_id, create_authorization_predicate(upload_id))
if upload_files is None:
raise KeyError
upload_files.open_zipfile_cache()
fo = upload_files.archive_file(calc_id, 'rb')
data.append(json.loads(fo.read()))
if upload_files is not None:
upload_files.close_zipfile_cache()
except Restricted:
abort(401, message='Not authorized to access %s/%s.' % (upload_id, calc_id))
results = execute_search()
# create msgpack database from results and return database file
data = results['results']
filename = 'nomad_archive.msg'
msgdb = ArchiveFileDB(BytesIO(), max_lfragment=2)
for calc in data:
msgdb.add_data(calc)
msgdb.create_db()
data_stream = msgdb.fileobj
data_stream.seek(0)
return send_file(
data_stream,
mimetype='application/octet-stream',
as_attachment=True,
cache_timeout=0,
attachment_filename=filename)
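# A minimal client-side sketch (placeholder URL, authentication omitted) of how the
# returned msgpack file could be read back with ArchiveFileDB:
#
#     import requests
#     from io import BytesIO
#     from nomad.archive_library.filedb import ArchiveFileDB
#
#     api_url = 'http://localhost/api'  # placeholder
#     response = requests.get(api_url + '/archive/query', params={'db': 'msg'})
#     msgdb = ArchiveFileDB(BytesIO(response.content), mode='r')
#     print(msgdb.query({'some_calc_id': None}))  # hypothetical calc id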
@api.doc('post_archive_query')
@api.response(400, 'Invalid requests, e.g. wrong owner type or bad search parameters')
@api.response(401, 'Not authorized to access the data.')
@api.response(404, 'The upload or calculation does not exist')
@api.response(200, 'Archive data sent')
@api.expect(_archive_query_parser, validate=True)
@api.marshal_with(_archive_query_model, skip_none=True, code=200, description='Search results sent')
@authenticate(signature_token=True)
def post(self):
"""
Post an query schema and return it filled with archive data in json format from
all query results.
except KeyError:
abort(404, message='Calculation %s/%s does not exist.' % (upload_id, calc_id))
See ``/repo`` endpoint for documentation on the search
parameters.
results['results'] = data
The actual data are in results and a supplementary python code (curl) to
execute search is in python (curl).
"""
results = execute_search()
return results, 200
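# A minimal client-side sketch of posting a query schema; the endpoint reads the
# schema from the request JSON under 'results' (placeholder URL, hypothetical keys):
#
#     import requests
#
#     schema = {'results': {'section_run': {'energy_total': None}}}
#     response = requests.post(api_url + '/archive/query', json=schema)
#     filled = response.json()['results']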
@@ -267,7 +267,7 @@ def query_api_python(*args, **kwargs):
"""
url = query_api_url(*args, **kwargs)
return '''import requests
response = requests.get("{}")
response = requests.post("{}")
data = response.json()'''.format(url)
@@ -276,4 +276,4 @@ def query_api_curl(*args, **kwargs):
Creates a string of curl command to execute a search query to the repository.
"""
url = query_api_url(*args, **kwargs)
return 'curl -X GET %s -H "accept: application/json" --output "nomad.json"' % url
return 'curl -X POST %s -H "accept: application/json" --output "nomad.json"' % url
# Copyright 2019 Alvin Noe Ladines, Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Module for storage of archive data using the msgpack module.
In module ``ArchiveFileDB, the data are fragmented and saved as a list
in a msgpack file.
A component can be retrieved by giving an offset to the msgpack file to
be unpacked. The database can then be queried by giving a schema similar
to the archive data.
To build the database,
.. code-block:: python
db = ArchiveFileDB("db.msg", mode='w', max_lfragment=3)
db.add_data(["archive1.json", "archive2.json"])
db.close()
To query the database,
.. code-block:: python
db = ArchiveFileDB("db.msg", mode='r')
db.query({'idX':{'sectionX':{'propertyX':None}}})
db.close()
"""
import msgpack
import json
import os
from io import StringIO, BufferedWriter, BufferedReader, BytesIO
class JSONdata:
"""
Provides a GraphQL-style query over given json data using a query schema.
Arguments:
data: The json data to be queried
"""
def __init__(self, data):
self.data = data
def _get_data(self, key, val):
if isinstance(key, str):
key = key.split('/')
index = None
bn0 = key[0].find('[')
if bn0 > 0:
bn1 = key[0].find(']')
index = key[0][bn0 + 1:bn1]
key[0] = key[0].replace(key[0][bn0:bn1 + 1], '')
if ':' in index:
v = val[key[0]]
lo, hi = index.split(':')
lo = int(lo) if lo else 0
hi = int(hi) if hi else 0
lo = len(v) + lo if lo < 0 else lo
hi = len(v) + hi if hi <= 0 else hi
if hi > len(v):
hi = len(v)
if lo > hi or lo < 0 or hi < 0:
return
index = list(range(lo, hi))
else:
index = int(index)
if not key[0] in val:
return
v = val[key[0]]
if isinstance(index, int):
try:
v = v[index]
except (IndexError, KeyError, TypeError):
return
elif isinstance(index, list):
try:
v = [v[i] for i in index]
except (IndexError, KeyError, TypeError):
return
if len(key) > 1:
if isinstance(v, list):
return [self._get_data(key[1:], vi) for vi in v]
else:
return self._get_data(key[1:], v)
else:
return v
def get_data(self, entry, root=None):
"""
Recursively searches the json data to fill in the null values
in the given schema.
Arguments:
entry: a dict with a structure similar to the json data but
with null values to be filled with the desired values.
"""
data = {}
if entry is None:
v = self._get_data(root, self.data)
return v
else:
for key, val in entry.items():
if root is not None:
k = os.path.join(root, key)
else:
k = key
data[key] = self.get_data(val, k)
return data
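# A small usage sketch of JSONdata on made-up data, including the bracketed index
# and slice syntax handled by _get_data:
#
#     jd = JSONdata({'secA': {'prop': [1, 2, 3]}})
#     jd.get_data({'secA': {'prop': None}})        # -> {'secA': {'prop': [1, 2, 3]}}
#     jd.get_data({'secA': {'prop[0:2]': None}})   # -> {'secA': {'prop[0:2]': [1, 2]}}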
class ArchiveFileDB:
"""
An interface to the msgpack module providing a searchable
container for archive data.
Arguments:
fileio: can be a string or file object to read/write the msgpack file
mode: r/w to indicate read or write the msgpack file
max_lfragment: the maximum level to which the archive data will
be fragmented, for more efficient unpacking of msgpack components
"""
def __init__(self, fileio, mode='r', max_lfragment=None):
self._filename = None
self._fileobj = None
if isinstance(fileio, str):
self._filename = fileio
self._mode = mode
elif isinstance(fileio, BufferedReader):
self._fileobj = fileio
self._mode = 'rb'
elif isinstance(fileio, BufferedWriter):
self._fileobj = fileio
self._mode = 'wb'
elif isinstance(fileio, BytesIO):
self._fileobj = fileio
self._mode = mode
else:
raise TypeError
self._max_lfragment = max_lfragment
if 'w' in self._mode and self._max_lfragment is None:
self._max_lfragment = 2
if '+' in self._mode:
self._max_lfragment = None
self._sep = 'MSG_ENTRY'
self._ids = None
self._data = {}
@property
def max_lfragment(self):
if self._max_lfragment is None:
orig_mode = self.mode
self.mode = 'rb'
self._max_lfragment = self.get_docs('MAX_LFRAGMENT')
self.mode = orig_mode
return self._max_lfragment
def _fragment_json(self, data, key='', cur_lfragment=0):
if cur_lfragment >= self.max_lfragment or not isinstance(data, dict):
return [dict(path=key, data={os.path.basename(key): data})]
res = []
cur_lfragment += 1
main = dict(path=key, data=[])
for k, v in data.items():
p = os.path.join(key, k)
res += self._fragment_json(v, p, cur_lfragment)
main['data'].append(p)
res += [main]
return res
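# For illustration, with max_lfragment=2 a nested dict is split into leaf fragments
# plus index entries that list the paths of their children, roughly:
#
#     self._fragment_json({'calc1': {'secA': {'prop': 1}}})
#     # -> [{'path': 'calc1/secA', 'data': {'secA': {'prop': 1}}},
#     #     {'path': 'calc1', 'data': ['calc1/secA']},
#     #     {'path': '', 'data': ['calc1']}]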
def write(self, abspath, relpath):
"""
Mimic the zipfile function to write files to the database.
Arguments:
abspath: The absolute path to the file to be read
relpath: For compatibility with zipfile
"""
self.add_data(abspath)
def close(self, save=True):
"""
Mimic the zipfile function to close the msgpack file.
Will trigger the creation of the database when in write mode.
Arguments:
save: If True, commits the current data in memory to the database
"""
if save and 'w' in self._mode:
self.create_db()
if self._fileobj:
self._fileobj.close()
self._fileobj = None
def save(self):
"""
Commits the current data in memory to the database.
"""
self.create_db()
def add_data(self, data):
"""
Add data to the msgpack database.
Arguments:
data: Can be a filename or dictionary or list of both
"""
if isinstance(data, str):
key = os.path.basename(data)
if data.endswith('json'):
key = key.split('.')[0]
self._data[key] = json.load(open(data))
else:
key = key.replace('.', '_')
self._data[key] = open(data).read()
elif isinstance(data, dict):
key = list(data.keys())
assert len(key) == 1
self._data[key[-1]] = data[key[-1]]
elif isinstance(data, list):
for i in range(len(data)):
self.add_data(data[i])
else:
raise NotImplementedError
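# For example (in-memory database; a dict must have exactly one top-level key,
# typically the calc id, while filenames and lists of either are also accepted):
#
#     db = ArchiveFileDB(BytesIO(), mode='wb', max_lfragment=2)
#     db.add_data({'calc_1': {'secA': {'prop': 1}}})
#     db.add_data(['archive1.json', 'archive2.json'])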
# TODO
# def edit_data(self, data):
# if self._data:
# print("Memory not empty. Commit existing data first")
# return
# self.add_data(data)
# entries = self._fragment_json(self._data)
# data_to_write = self._load_data()
# raise NotImplementedError
def _load_data(self):
orig_mode = self.mode
self.mode = 'rb'
self.fileobj.seek(0)
data_loaded = msgpack.load(self.fileobj)
self.mode = orig_mode
return data_loaded
def create_db(self):
"""
Creates the database and writes it to the msgpack file.
The database consists of lists of the fragmented data
and a footer list holding metadata such as the ids (byte offsets) of the data.
"""
data = self._data
entries = self._fragment_json(data)
# save the data in a separate list every time the function is called
# last list reserved for IDs
if '+' in self._mode:
data_to_write = self._load_data()
data_to_write.insert(-1, [])
else:
data_to_write = [[], []]
# get last entry in database
last_index = 0
for i in range(len(data_to_write) - 1):
last_index += len(data_to_write[i]) / 2
last_index = int(last_index)
for i in range(len(entries)):
sep = '%s_%d' % (self._sep, i + last_index)
data_to_write[-2].append(sep)
data_to_write[-2].append(entries[i]['data'])
data_str = msgpack.dumps(data_to_write)
if not data_to_write[-1]:
head = {}
else:
sep = '%s_IDS' % (self._sep)
hi = data_to_write[-1].index(sep.encode()) + 1
head = data_to_write[-1][hi]
data_to_write[-1] = []
sep = '%s_MAX_LFRAGMENT' % (self._sep)
data_to_write[-1].append(sep)
data_to_write[-1].append(self.max_lfragment)
# add pointers to entries
for i in range(len(entries)):
sep = '%s_%d' % (self._sep, i + last_index)
index = data_str.index(sep.encode()) + len(sep)
head[entries[i]['path']] = index
sep = '%s_IDS' % (self._sep)
data_to_write[-1].append(sep)
data_to_write[-1].append(head)
data_str = msgpack.dumps(data_to_write)
self.fileobj.write(data_str)
self._data = {}
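# The serialized layout is, roughly, a list of lists: each data list alternates
# 'MSG_ENTRY_<n>' separators with fragment payloads, and the last list is a footer
# holding MAX_LFRAGMENT and the IDS map from fragment paths to byte offsets in the
# packed string, schematically:
#
#     [['MSG_ENTRY_0', {'secA': {'prop': 1}}, 'MSG_ENTRY_1', ['calc1/secA'], 'MSG_ENTRY_2', ['calc1']],
#      ['MSG_ENTRY_MAX_LFRAGMENT', 2, 'MSG_ENTRY_IDS', {'calc1/secA': ..., 'calc1': ..., '': ...}]]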
def _reduce_to_section(self, entry, cur_lfragment=0):
if entry is None:
return
cur_lfragment += 1
if cur_lfragment > self.max_lfragment:
return
new_dict = {}
for key, val in entry.items():
if '[' in key and ']' in key:
key = key.split('[')[0]
v = self._reduce_to_section(val, cur_lfragment)
new_dict[key] = v
return new_dict
@staticmethod
def to_list_path_str(entries, root='', paths=[]):
if entries is None:
return
if len(paths) > 0:
paths.remove(root)
for key, val in entries.items():
p = os.path.join(root, key)
paths.append(p)