Commit e5a2fb50 authored by Alvin Noe Ladines

Integration of new archive msgpack r/w interface

parent 8cc5d822
# New archive implementation
# Archive API tutorial
This contains tutorials for using the new archive query functionality.
It uses the new metainfo definition for the archive data. In addition, the archive data
......@@ -46,15 +46,12 @@ parameters are downloaded. The results are then expressed in the new metainfo sc
which offers an auto-completion feature, among others.
## Msgpack container
The archive data are now stored in a binary format called msgpack. The archive data are
fragmented, and a query will access only the relevant fragment without loading the whole
archive collection. This is beneficial when one queries only small chunks, but approaches the
efficiency of zip files when one accesses the whole archive. To create a msgpack database
The archive data are now stored in a binary format called msgpack. To create a msgpack database
from the archive data and query it, one uses ArchiveFileDB.
```python
from nomad.archive_library.filedb import ArchiveFileDB
db = ArchiveFileDB('archive.msg', mode='w', max_lfragment=2)
db = ArchiveFileDB('archive.msg', mode='w', entry_toc_depth=2)
db.add_data({'calc1':{'secA': {'subsecA': {'propA': 1.0}}, 'secB': {'propB': 'X'}}})
db.add_data({'calc2':{'secA': {'subsecA': {'propA': 2.0}}, 'secB': {'propB': 'Y'}}})
db.close()
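# Read-back sketch (added for illustration, not part of the original snippet): the stored
# structure can be queried with a schema-shaped dict; a non-dict leaf such as '*' marks the
# values to return (see ArchiveFileDB.query).
db = ArchiveFileDB('archive.msg', mode='r')
results = db.query({'calc1': {'secA': {'subsecA': {'propA': '*'}}}})
db.close()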
......
......@@ -14,5 +14,6 @@ and infrastructure with a simplified architecture and consolidated code base.
api
metainfo
parser_tutorial
archive_tutorial
reference
ops
......@@ -30,9 +30,8 @@ import nomad_meta_info
from nomad.files import UploadFiles, Restricted
from nomad import search, config
from nomad.archive_library.utils import get_dbs
from nomad import search, config
from nomad.app import common
from nomad.archive import ArchiveFileDB
from .auth import authenticate, create_authorization_predicate
from .api import api
......@@ -228,6 +227,17 @@ _archive_query_model['Scroll'] = _archive_query_model.pop('scroll')
_archive_query_model['Pagination'] = _archive_query_model.pop('pagination')
def get_dbs(upload_id):
    upload_files = UploadFiles.get(upload_id, create_authorization_predicate(upload_id))
    if upload_files is None:
        return []

    files = upload_files.archive_file_msg(',')
    msgdbs = [ArchiveFileDB(f) for f in files if f is not None]

    return msgdbs


@ns.route('/query')
class ArchiveQueryResource(Resource):
    @api.doc('post_archive_query')
......
from typing import Iterable, Any, Tuple, Dict, BinaryIO, Union, List, cast
from io import BytesIO
from io import BytesIO, BufferedReader
from collections.abc import Mapping, Sequence
import msgpack
from msgpack.fallback import Packer, StringIO
import struct
import json
import math
import os.path
from nomad import utils
......@@ -98,6 +99,10 @@ class TOCPacker(Packer):
        return pack_result
_toc_uuid_size = utils.default_hash_len + 1
_toc_item_size = _toc_uuid_size + 25 # packed(uuid + [10-byte-pos, 10-byte-pos])
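# Illustration (added note, assuming utils.default_hash_len == 28 and 10-byte encoded positions):
# a packed uuid is a msgpack fixstr, 1 header byte + 28 chars = default_hash_len + 1 bytes, and a
# packed position pair costs 1 fixarray byte + 2 * (2-byte bin8 header + 10 payload bytes) = 25.
#
#     >>> import msgpack
#     >>> len(msgpack.packb('x' * 28, use_bin_type=True))
#     29
#     >>> len(msgpack.packb([b'\x00' * 10, b'\x00' * 10], use_bin_type=True))
#     25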
class ArchiveWriter:
    def __init__(self, file_or_path: Union[str, BytesIO], n_entries: int, entry_toc_depth: int):
        self.file_or_path = file_or_path
......@@ -119,17 +124,13 @@ class ArchiveWriter:
            raise ValueError('not a file or path')

        # write empty placeholder header
        toc_item_length = len(packb(utils.create_uuid()))
        toc_item_length += len(packb([
            ArchiveWriter._encode_position(0, 0), ArchiveWriter._encode_position(0, 0)]))
        self._write_map_header(3)
        self.write(packb('toc_pos'))
        self.write(packb(ArchiveWriter._encode_position(0, 0)))
        self.write(packb('toc'))
        toc_start, _ = self._write_map_header(self.n_entries)
        _, toc_end = self.write(b'0' * toc_item_length * self.n_entries)
        _, toc_end = self.write(b'0' * _toc_item_size * self.n_entries)
        self._toc_position = toc_start, toc_end

        self.write(packb('data'))
......@@ -162,7 +163,7 @@ class ArchiveWriter:
        self.write(packb('toc'))
        toc_position = self.write(packb(toc, use_bin_type=True))
        assert toc_position == self._toc_position
        assert toc_position == self._toc_position, '%s - %s' % (toc_position, self._toc_position)

        if isinstance(self.file_or_path, str):
            self._f.close()
......@@ -272,6 +273,8 @@ class ArchiveReader(ArchiveObject):
            f = cast(BytesIO, open(self.file_or_path, 'rb'))
        elif isinstance(self.file_or_path, BytesIO):
            f = self.file_or_path
        elif isinstance(self.file_or_path, BufferedReader):
            f = self.file_or_path
        else:
            raise ValueError('not a file or path')
......@@ -309,10 +312,8 @@ class ArchiveReader(ArchiveObject):
        return self

    fs_block_size = 4096
    toc_uuid_size = 23
    toc_entry_size = 48
    toc_block_size_entries = math.ceil(4096 / 48)
    toc_block_size_bytes = math.ceil(4096 / 48) * 48
    toc_block_size_entries = math.ceil(4096 / _toc_item_size)
    toc_block_size_bytes = math.ceil(4096 / 54) * _toc_item_size

    def _load_toc_block(self, i_entry: int):
        i_block = math.floor(i_entry / ArchiveReader.toc_block_size_entries)
......@@ -325,9 +326,9 @@ class ArchiveReader(ArchiveObject):
            self._n_toc - i_block * ArchiveReader.toc_block_size_entries)

        for i in range(0, toc_block_size_entries):
            offset = i * ArchiveReader.toc_entry_size
            entry_uuid = unpackb(block_data[offset:offset + ArchiveReader.toc_uuid_size])
            positions_encoded = unpackb(block_data[offset + ArchiveReader.toc_uuid_size:offset + ArchiveReader.toc_entry_size])
            offset = i * _toc_item_size
            entry_uuid = unpackb(block_data[offset:offset + _toc_uuid_size])
            positions_encoded = unpackb(block_data[offset + _toc_uuid_size:offset + _toc_item_size])
            positions = (
                ArchiveReader._decode_position(positions_encoded[0]),
                ArchiveReader._decode_position(positions_encoded[1]))
......@@ -468,13 +469,161 @@ def read_archive(file_or_path: str, **kwargs) -> ArchiveReader:
    return ArchiveReader(file_or_path, **kwargs)


class ArchiveFileDB:
    def __init__(self, fileio: Union[str, BytesIO], mode: str = 'r', entry_toc_depth: int = 2, **kwargs):
        self._fileobj = fileio
        self._mode = mode
        self._entry_toc_depth = entry_toc_depth
        self._data: Dict[str, Any] = {}
        self._key_length = utils.default_hash_len
        self._db = None
        self._ids: List[str] = []
        self._infokey = self._adjust_key('INFO', 'X')
    def write(self, abspath: str, relpath: str):
        """
        Mimics the zipfile function to write files to the database.

        Arguments:
            abspath: The absolute path of the file to be read
            relpath: For compatibility with zipfile
        """
        self.add_data(abspath)

    def close(self, save: bool = True):
        """
        Mimics the zipfile function to close the msgpack file.
        Will trigger the creation of the database when in write mode.

        Arguments:
            save: If True, will add the current data in memory to the database
        """
        if 'w' in self._mode:
            self.create_db()

        if isinstance(self._fileobj, BytesIO) and save:
            self._fileobj.close()
            self._fileobj = None
    def create_db(self):
        with ArchiveWriter(self._fileobj, len(self._data) + 1, self._entry_toc_depth) as db:
            for key, val in self._data.items():
                key = self._adjust_key(key)
                self._ids.append(key)
                db.add(key, val)
            db.add(self._infokey, dict(ids=self._ids, entry_toc_depth=self._entry_toc_depth))

    def _adjust_key(self, key: str, fill_with: str = ' '):
        key = key.rjust(self._key_length, fill_with)
        assert len(key) == self._key_length
        return key
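    # Example (illustrative, assuming utils.default_hash_len == 28): keys are right-padded to a
    # fixed width so that every TOC entry packs to the same size:
    #
    #     >>> db = ArchiveFileDB(BytesIO(), mode='w')
    #     >>> db._adjust_key('calc1') == 'calc1'.rjust(28)
    #     True
    #     >>> db._adjust_key('INFO', 'X') == 'INFO'.rjust(28, 'X')
    #     True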
    def add_data(self, data: Union[str, Dict[str, Any], List[Union[str, Dict]]]):
        """
        Add data to the msgpack database.

        Arguments:
            data: Can be a filename, a dictionary, or a list of both
        """
        if isinstance(data, str):
            key = os.path.basename(data)
            if data.endswith('json'):
                uid = key.split('.')[0]
                val = json.load(open(data))
                if isinstance(val, dict):
                    self._data[uid] = val
            else:
                try:
                    uid = key.split('.')[0]
                    dtype = key.split('.')[-1]
                    val = open(data).read()
                    if dtype not in self._data:
                        self._data[dtype] = {}
                    if val:
                        self._data[dtype].update({uid: val})
                except Exception:
                    pass
        elif isinstance(data, dict):
            for key, val in data.items():
                if val:
                    self._data[key] = val
        elif isinstance(data, list):
            for i in range(len(data)):
                self.add_data(data[i])
        else:
            raise ValueError
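    # Usage sketch (illustrative, hypothetical file names): a '*.json' file is stored under its
    # basename without extension, any other file is grouped under its extension, and dicts or
    # lists are taken as-is:
    #
    #     >>> db.add_data('upload/calc_0001.json')      # stored as self._data['calc_0001']
    #     >>> db.add_data('upload/calc_0001.out')       # stored as self._data['out']['calc_0001']
    #     >>> db.add_data({'calc_0002': {'secA': {}}})  # stored as self._data['calc_0002']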
    @property
    def ids(self):
        if not self._ids:
            with ArchiveReader(self._fileobj) as db:
                self._ids = db[self._infokey]['ids']
        return self._ids

    @staticmethod
    def _get_index(key: str) -> Union[Tuple[int, int], int]:
        key = key.strip()
        bracket = key.find('[')
        if bracket <= 0:
            return None

        assert key[-1] == ']'
        str_index = key[bracket + 1: -1]
        if ':' in str_index:
            lo_str, hi_str = str_index.split(':')
            lo = int(lo_str) if lo_str else 0
            hi = int(hi_str) if hi_str else 10000000
            return lo, hi
        else:
            # to maintain the db structure, return lo, lo + 1;
            # to conform with python indexing, return lo
            lo = int(str_index)
            return lo
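    # Examples (illustrative) of how query keys map to indices:
    #
    #     >>> ArchiveFileDB._get_index('section_run')       # no bracket -> None
    #     >>> ArchiveFileDB._get_index('section_run[2]')    # single index
    #     2
    #     >>> ArchiveFileDB._get_index('section_run[1:3]')  # slice -> (lo, hi)
    #     (1, 3)
    #     >>> ArchiveFileDB._get_index('section_run[:]')    # open slice -> (0, 10000000)
    #     (0, 10000000)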
    def _load_data(self, qdict: Dict[str, Any], db: ArchiveObject, main_section: bool = False):
        if not isinstance(qdict, dict):
            if isinstance(db, ArchiveObject):
                return db.to_dict()
            elif isinstance(db, ArchiveList):
                return list(db)
            elif isinstance(db, ArchiveItem):
                return dict(db)
            else:
                return db

        res = {}
        for key, val in qdict.items():
            index = ArchiveFileDB._get_index(key)
            dbkey = key.split('[')[0]
            if main_section:
                dbkey = self._adjust_key(dbkey)
            try:
                if index is None:
                    res[key] = self._load_data(val, db[dbkey])
                elif isinstance(index, int):
                    res[key] = self._load_data(val, db[dbkey])[index]
                else:
                    res[key] = self._load_data(val, db[dbkey])[index[0]: index[1]]
            except Exception:
                continue

        return res

    def query(self, qdict):
        with ArchiveReader(self._fileobj) as db:
            return self._load_data(qdict, db, True)
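    # Query sketch (illustrative, e.g. for an archive containing the entries from the tutorial
    # example): the query dict mirrors the stored structure and any non-dict leaf requests that
    # value.
    #
    #     >>> db = ArchiveFileDB('archive.msg', mode='r')
    #     >>> db.query({'calc1': {'secB': '*'}})
    #     {'calc1': {'secB': {'propB': 'X'}}}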
if __name__ == '__main__':
    def benchmark():
        from time import time
        import sys

        with open('local/test_be.json') as f:
        with open('archive_test.json') as f:
            example_data = json.load(f)

        size = 5000 if len(sys.argv) == 1 else int(sys.argv[1])
......@@ -513,30 +662,4 @@ if __name__ == '__main__':
        packb(example_archive)
        print('msgpack: create archive (1): ', time() - start)

        # v0.8.0 impl
        from nomad.archive_library import filedb

        start = time()
        buffer = BytesIO()
        db = filedb.ArchiveFileDB(buffer, mode='w', max_lfragment=3)
        db.add_data({
            uuid: data for uuid, data in example_archive})
        db.close(save=False)
        print('filedb.py: create archive (1): ', time() - start)

        buffer = BytesIO(buffer.getbuffer())
        start = time()
        for _ in range(0, 23):
            db = filedb.ArchiveFileDB(buffer, mode='r', max_lfragment=3)
            db.get_docs(db.ids[example_uuid + '/section_run/section_system'][0])
        print('filedb.py: access single entry system (23): ', (time() - start) / 23)

        buffer = BytesIO(buffer.getbuffer())
        start = time()
        db = filedb.ArchiveFileDB(buffer, mode='r', max_lfragment=3)
        for _ in range(0, 23):
            for i, entry in enumerate(example_archive):
                if i % access_every == 0:
                    db.get_docs(db.ids[entry[0] + '/section_run/section_system'][0])
        print('filedb.py: access every %d-ed entry single entry system (23): ' % access_every, (time() - start) / 23)

    benchmark()
# Copyright 2019 Alvin Noe Ladines, Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Module for storage of archive data using the msgpack module.
In module ``ArchiveFileDB, the data are fragmented and saved as a list
in a msgpack file.
A component can be retrieved by giving an offset to the msgpack file to
be unpacked. The database can then be queried by giving a schema similar
to the archive data.
To build the database,
.. code-block:: python
db = ArchiveFileDB("db.msg", mode='w', max_lfragment=3)
db.add_data(["archive1.json", "archive2.json"])
db.close()
To query the database,
.. code-block:: python
db = ArchiveFileDB("db.msg", mode='r')
db.query({'idX':{'sectionX':{'propertyX':'*'}}})
db.close()
"""
import msgpack
import json
import os
import re
from io import StringIO, BufferedWriter, BufferedReader, BytesIO
from typing import Union, Dict, List, Any, IO
_PLACEHOLDER = '*'
class JSONdata:
    """
    Provides a GraphQL-style query on given JSON data using a query schema.

    Arguments:
        data: The JSON data to be queried
    """
    def __init__(self, data: Dict[str, Any]):
        self.data = self._merge_list(data)
    def _merge_list(self, data: Dict[str, Any]) -> Dict[str, Any]:
        if not isinstance(data, dict):
            return data

        merged: Dict[str, Any] = {}
        main_keys = []
        for key, val in data.items():
            bracket = key.find('[')
            val = self._merge_list(val)
            if bracket > 0:
                index = int(key[bracket + 1:].strip().rstrip(']'))
                main_key = key[:bracket]
                if main_key not in merged:
                    main_keys.append(main_key)
                    merged[main_key] = {}
                merged[main_key][index] = val
            else:
                merged[key] = val

        for key in main_keys:
            merged[key] = [merged[key][n] for n in sorted(merged[key].keys())]

        return merged
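    # Example (illustrative): indexed keys produced by the fragmentation are merged back
    # into ordinary lists:
    #
    #     >>> jd = JSONdata({})
    #     >>> jd._merge_list({'secA[0]': {'x': 1}, 'secA[1]': {'x': 2}, 'secB': 3})
    #     {'secA': [{'x': 1}, {'x': 2}], 'secB': 3}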
    def _get_index(self, str_index: str, nval_ref: int):
        str_index = str_index.strip().lstrip('[').rstrip(']')
        if ':' in str_index:
            lo_str, hi_str = str_index.split(':')
            lo = int(lo_str) if lo_str else 0
            hi = int(hi_str) if hi_str else 0
            lo = nval_ref + lo if lo < 0 else lo
            hi = nval_ref + hi if hi <= 0 else hi
            if hi > nval_ref:
                hi = nval_ref
            if lo > hi or lo < 0 or hi < 0:
                return
            index = list(range(lo, hi))
        else:
            index = [int(str_index)]
        return index
    def get_data(self, entry: Dict[str, Any], ref=None) -> Dict[str, Any]:
        out_data: Dict[str, Any] = {}
        if ref is None:
            ref = self.data
        if not isinstance(entry, dict):
            return ref

        for key, val in entry.items():
            index = None
            str_index = None
            bracket = key.find('[')
            if bracket > 0:
                str_index = key[bracket:]
                key = key[:bracket]
            if key not in ref:
                continue
            if str_index is not None:
                index = self._get_index(str_index, len(ref[key]))
            if index is None:
                out_data[key] = self.get_data(val, ref[key])
            else:
                try:
                    data = [self.get_data(val, ref[key][n]) for n in index]
                    out_data[key] = data
                except Exception:
                    continue

        out_data = self._merge_list(out_data)
        return out_data
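    # Usage sketch (illustrative data): the query schema follows the shape of the data and
    # returns only the requested leaves, with '*' (or any non-dict value) acting as a wildcard:
    #
    #     >>> jd = JSONdata({'calc1': {'secA': [{'propA': 1.0}, {'propA': 2.0}]}})
    #     >>> jd.get_data({'calc1': {'secA[0]': {'propA': '*'}}})
    #     {'calc1': {'secA': [{'propA': 1.0}]}}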
class ArchiveFileDB:
    """
    An interface to the msgpack module to provide a searchable
    container of archive data.

    Arguments:
        fileio: can be a string or file object to read/write the msgpack file
        mode: 'r'/'w' to indicate reading or writing the msgpack file
        max_lfragment: the maximum level down to which the archive data will
            be fragmented for more efficient unpacking of msgpack components
    """
    def __init__(self, fileio: Union[str, IO, BytesIO], mode: str = 'r', max_lfragment: int = None):
        self._fileobj = fileio
        if isinstance(fileio, str) or isinstance(fileio, BytesIO):
            self._mode = mode
        elif isinstance(fileio, BufferedReader):
            self._mode = 'rb'
        elif isinstance(fileio, BufferedWriter):
            self._mode = 'wb'
        else:
            raise TypeError

        self._max_lfragment = max_lfragment
        if 'w' in self._mode and self._max_lfragment is None:
            self._max_lfragment = 2

        self._ids = None
        self._data: Dict[str, Any] = {}
    @property
    def max_lfragment(self) -> int:
        if self._max_lfragment is None:
            orig_mode = self.mode
            self.mode = 'rb'
            self._max_lfragment = self.get_docs('max_lfragment')
            self.mode = orig_mode
        return self._max_lfragment
    def _fragment_json(self, data: Dict[str, Any], key='', cur_lfragment=0) -> List[Dict[str, Dict]]:
        if cur_lfragment >= self.max_lfragment:
            pass

        elif isinstance(data, list):
            res: List[Dict[str, Any]] = []
            main = dict(path=key, data=[])
            for i in range(len(data)):
                if not isinstance(data[i], dict):
                    break
                p = '%s[%d]' % (key, i)
                res += self._fragment_json(data[i], p, cur_lfragment)
                main['data'].append(p)
            res += [main]
            return res

        elif isinstance(data, dict):
            res = []
            cur_lfragment += 1
            main = dict(path=key, data=[])
            for k, v in data.items():
                p = os.path.join(key, k)
                res += self._fragment_json(v, p, cur_lfragment)
                main['data'].append(p)
            res += [main]
            return res

        return [dict(path=key, data={os.path.basename(key): data})]
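    # Example (illustrative, assuming max_lfragment=2): the archive is split into addressable
    # fragments, each record holding either a data fragment or the paths of its children:
    #
    #     >>> db._fragment_json({'calc1': {'secA': {'propA': 1.0}}})
    #     [{'path': 'calc1/secA', 'data': {'secA': {'propA': 1.0}}},
    #      {'path': 'calc1', 'data': ['calc1/secA']},
    #      {'path': '', 'data': ['calc1']}]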
    def write(self, abspath: str, relpath: str):
        """
        Mimic the zipfile function to write files to database.

        Arguments:
            abspath: The absolute path to the file to be read
            relpath: For compatibility with zipfile
        """
        self.add_data(abspath)

    def close(self, save=True):
        """
        Mimic the zipfile function to close the msgpack file.
        Will trigger the creation of the database when in write mode.

        Arguments:
            save: If True will add the current data in memory to database
        """
        if 'w' in self._mode:
            self.create_db()

        if self._fileobj and save:
            self._fileobj.close()
            self._fileobj = None
    def save(self):
        """
        Commits current data in memory to database
        """
        self.create_db()
    def add_data(self, data: Union[str, Dict[str, Any], List[Union[str, Dict]]]):
        """
        Add data to the msgpack database.

        Arguments:
            data: Can be a filename or dictionary or list of both
        """
        if isinstance(data, str):
            key = os.path.basename(data)
            if data.endswith('json'):
                key = key.split('.')[0]
                val = json.load(open(data))
                if val:
                    self._data[key] = val
            else:
                key = key.replace('.', '_')
                val = open(data).read()
                if val:
                    self._data[key] = val
        elif isinstance(data, dict):
            for key, val in data.items():
                if val:
                    self._data[key] = val
        elif isinstance(data, list):
            for i in range(len(data)):
                self.add_data(data[i])
        else:
            raise NotImplementedError
    def _load_data(self):
        orig_mode = self.mode
        self.mode = 'rb'
        self.fileobj.seek(0)
        data_loaded = msgpack.load(self.fileobj)
        self.mode = orig_mode
        return data_loaded
    def create_db(self):
        """