Commit 8cc5d822 authored by Markus Scheidgen

Added a binary search to read only some blocks of an archive file's TOC instead of the whole TOC.

parent ebdf5115
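The idea, in brief (a hedged sketch with illustrative names, not the commit's exact code): since the entry uuids are sorted and every TOC entry packs to a fixed byte size, an entry can be located with a few block-sized reads instead of decoding the whole TOC:

def find_entry(read_block, n_blocks, key):
    # read_block(i) returns the sorted (uuid, positions) pairs of block i
    lo, hi = 0, n_blocks - 1
    while lo <= hi:
        mid = (lo + hi) // 2
        entries = dict(read_block(mid))
        uuids = list(entries)
        if key < uuids[0]:
            hi = mid - 1
        elif key > uuids[-1]:
            lo = mid + 1
        else:
            return entries.get(key)  # None if the key is absent
    return None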
......@@ -5,6 +5,7 @@ import msgpack
from msgpack.fallback import Packer, StringIO
import struct
import json
import math
from nomad import utils
......@@ -20,6 +21,10 @@ def unpackb(o, **kwargs):
return msgpack.unpackb(o, raw=False)
class ArchiveError(Exception):
pass
class TOCPacker(Packer):
"""
A special msgpack packer that records a TOC while packing.
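    # the gist (a sketch; the actual implementation is elided from this
    # diff): wrap each sub-object's pack() call and note the output buffer
    # offset before and after, so every section maps to the byte range of
    # its packed data without a second pass over the archive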
......@@ -259,7 +264,7 @@ class ArchiveObject(ArchiveItem, Mapping):
class ArchiveReader(ArchiveObject):
def __init__(self, file_or_path: Union[str, BytesIO]):
def __init__(self, file_or_path: Union[str, BytesIO], use_blocked_toc=True):
self.file_or_path = file_or_path
f: BytesIO = None
......@@ -272,22 +277,95 @@ class ArchiveReader(ArchiveObject):
super().__init__(None, f)
        # TODO do not load the whole top-level TOC. It has a fixed layout based on
        # the msgpack spec and can be loaded block-by-block. The calc_ids are uniformly
        # distributed and sorted!
# this number is determined by the msgpack encoding of the file beginning:
# { 'toc_pos': <...>
# ^11
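        # breakdown (assuming toc_pos is stored as a 10-byte bin8 value):
        # 1 byte fixmap header + 1 byte fixstr header + 7 bytes 'toc_pos'
        # + 2 bytes bin8 header (0xc4, 0x0a) = 11 bytes before the payload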
self._f.seek(11)
toc_position = ArchiveReader._decode_position(self._f.read(10))
self.toc_entry = self._read(toc_position)
self.use_blocked_toc = use_blocked_toc
if use_blocked_toc:
self._toc: Dict[str, Any] = {}
toc_start = toc_position[0]
self._f.seek(toc_start)
b = self._f.read(1)[0]
if b & 0b11110000 == 0b10000000:
self._n_toc = b & 0b00001111
self._toc_offset = toc_start + 1
elif b == 0xde:
self._n_toc, = struct.unpack_from(">H", self._f.read(2))
self._toc_offset = toc_start + 3
elif b == 0xdf:
self._n_toc, = struct.unpack_from(">I", self._f.read(4))
self._toc_offset = toc_start + 5
else:
raise ArchiveError('Archive top-level TOC is not a msgpack map (dictionary).')
else:
self.toc_entry = self._read(toc_position)
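        # note on the branches above: the msgpack spec encodes a map header
        # as a single fixmap byte 0x80|count (up to 15 entries), as 0xde plus
        # a 2-byte big-endian count (map16), or as 0xdf plus a 4-byte count
        # (map32); e.g. packb({}) == b'\x80', a 16-entry dict starts with 0xde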
def __enter__(self):
return self
fs_block_size = 4096
toc_uuid_size = 23
toc_entry_size = 48
    toc_block_size_entries = math.ceil(fs_block_size / toc_entry_size)
    toc_block_size_bytes = toc_block_size_entries * toc_entry_size
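    # sizing, worked out: a TOC entry is a 23-byte packed uuid plus 25 bytes
    # for its two packed positions, 48 bytes in total; a TOC block thus holds
    # ceil(4096 / 48) = 86 entries and spans 86 * 48 = 4128 bytes, so reading
    # one block touches at most two 4096-byte filesystem blocks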
def _load_toc_block(self, i_entry: int):
i_block = math.floor(i_entry / ArchiveReader.toc_block_size_entries)
self._f.seek(i_block * ArchiveReader.toc_block_size_bytes + self._toc_offset)
block_data = self._f.read(ArchiveReader.toc_block_size_bytes)
first, last = None, None
toc_block_size_entries = min(
ArchiveReader.toc_block_size_entries,
self._n_toc - i_block * ArchiveReader.toc_block_size_entries)
for i in range(0, toc_block_size_entries):
offset = i * ArchiveReader.toc_entry_size
entry_uuid = unpackb(block_data[offset:offset + ArchiveReader.toc_uuid_size])
positions_encoded = unpackb(block_data[offset + ArchiveReader.toc_uuid_size:offset + ArchiveReader.toc_entry_size])
positions = (
ArchiveReader._decode_position(positions_encoded[0]),
ArchiveReader._decode_position(positions_encoded[1]))
self._toc[entry_uuid] = positions
if i == 0:
first = entry_uuid
if i + 1 == toc_block_size_entries:
last = entry_uuid
return first, last
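    # for example (a sketch): with 86 entries per block, _load_toc_block(100)
    # reads block 1 (entries 86..171) in a single call, caches all 86 entries
    # in self._toc, and returns the uuids of entries 86 and 171 as (first, last)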
def __getitem__(self, key):
toc_position = ArchiveReader._decode_position(self.toc_entry[key][0])
data_position = ArchiveReader._decode_position(self.toc_entry[key][1])
if self.use_blocked_toc:
positions = self._toc.get(key)
# TODO use hash algo instead of binary search
if positions is None:
                r_start = 0
                r_end = self._n_toc - 1
                while positions is None:
                    if r_start > r_end:
                        raise KeyError(key)
                    # probe the block that contains the middle entry of the
                    # remaining index range; first and last are the uuids
                    # that delimit that block
                    i_entry = r_start + (r_end - r_start) // 2
                    first, last = self._load_toc_block(i_entry)
                    if key < first:
                        r_end = i_entry - 1
                    elif key > last:
                        r_start = i_entry + 1
                    else:
                        positions = self._toc.get(key)
                        if positions is None:
                            raise KeyError(key)
toc_position, data_position = positions
else:
positions_encoded = self.toc_entry[key]
toc_position = ArchiveReader._decode_position(positions_encoded[0])
data_position = ArchiveReader._decode_position(positions_encoded[1])
toc = self._read(toc_position)
return ArchiveObject(toc, self._f, data_position[0])
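    # typical use (a sketch; 'archive.msg' and entry_id are illustrative):
    #     with ArchiveReader('archive.msg') as reader:
    #         system = reader[entry_id]['section_run']['section_system']
    # only the probed TOC blocks and the requested entry are read from disk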
......@@ -372,7 +450,7 @@ def write_archive(
writer.add(uuid, entry)
def read_archive(file_or_path: str) -> ArchiveReader:
def read_archive(file_or_path: str, **kwargs) -> ArchiveReader:
"""
    Allows reading a msgpack-based archive.
......@@ -387,27 +465,19 @@ def read_archive(file_or_path: str) -> ArchiveReader:
a 'with' statement to free the underlying file resource after use.
"""
return ArchiveReader(file_or_path)
class PositionEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, bytes):
return 'position'
return json.JSONEncoder.default(self, obj)
return ArchiveReader(file_or_path, **kwargs)
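# round-trip example (a sketch; with use_blocked_toc the ids must pack to the
# fixed 23-byte TOC uuid size, i.e. 22 characters, as in the tests below):
#     f = BytesIO()
#     write_archive(f, 2, [('{:22d}'.format(i), {'x': i}) for i in range(2)])
#     with read_archive(BytesIO(f.getbuffer()), use_blocked_toc=True) as data:
#         assert data['{:22d}'.format(0)]['x'] == 0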
if __name__ == '__main__':
def benchmark():
from time import time
import sys
with open('local/test_be.json') as f:
example_data = json.load(f)
size = 1000
size = 5000 if len(sys.argv) == 1 else int(sys.argv[1])
access_every = 2
example_archive = [(utils.create_uuid(), example_data) for _ in range(0, size)]
example_uuid = example_archive[int(size / 2)][0]
......@@ -421,20 +491,22 @@ if __name__ == '__main__':
# read single entry from archive
buffer = BytesIO(buffer.getbuffer())
start = time()
for _ in range(0, 23):
read_archive(buffer)[example_uuid]['section_run']['section_system']
print('archive.py: access single entry system (23): ', (time() - start) / 23)
for use_blocked_toc in [False, True]:
start = time()
for _ in range(0, 23):
read_archive(buffer, use_blocked_toc=use_blocked_toc)[example_uuid]['section_run']['section_system']
print('archive.py: access single entry system (23), blocked %d: ' % use_blocked_toc, (time() - start) / 23)
        # read every n-th entry from archive
buffer = BytesIO(buffer.getbuffer())
start = time()
for _ in range(0, 23):
with read_archive(buffer) as data:
for i, entry in enumerate(example_archive):
if i % access_every == 0:
data[entry[0]]['section_run']['section_system']
        print('archive.py: access every %d-th entry system (23): ' % access_every, (time() - start) / 23)
for use_blocked_toc in [False, True]:
start = time()
for _ in range(0, 23):
with read_archive(buffer, use_blocked_toc=use_blocked_toc) as data:
for i, entry in enumerate(example_archive):
if i % access_every == 0:
data[entry[0]]['section_run']['section_system']
            print('archive.py: access every %d-th entry system (23), blocked %d: ' % (access_every, use_blocked_toc), (time() - start) / 23)
# just msgpack
start = time()
......
......@@ -122,15 +122,39 @@ def test_write_archive_multi(example_uuid, example_entry):
assert example_uuid in toc
def test_read_archive_single(example_uuid, example_entry):
@pytest.mark.parametrize('use_blocked_toc', [False, True])
def test_read_archive_single(example_uuid, example_entry, use_blocked_toc):
f = BytesIO()
write_archive(f, 1, [(example_uuid, example_entry)])
packed_archive = f.getbuffer()
f = BytesIO(packed_archive)
data = read_archive(f)
data = read_archive(f, use_blocked_toc=use_blocked_toc)
assert example_uuid in data
assert data[example_uuid]['run']['system'][1] == example_entry['run']['system'][1]
assert data[example_uuid]['run'].to_dict() == example_entry['run']
assert data[example_uuid].to_dict() == example_entry
@pytest.mark.parametrize('use_blocked_toc', [False, True])
def test_read_archive_multi(example_uuid, example_entry, use_blocked_toc):
archive_size = ArchiveReader.toc_block_size_entries * 2 + 23
f = BytesIO()
write_archive(
f, archive_size,
[('{:22d}'.format(i), example_entry) for i in range(0, archive_size)])
packed_archive = f.getbuffer()
f = BytesIO(packed_archive)
with ArchiveReader(f, use_blocked_toc=use_blocked_toc) as reader:
if use_blocked_toc:
reader._load_toc_block(0)
assert reader._toc.get('{:22d}'.format(0)) is not None
assert len(reader._toc) == ArchiveReader.toc_block_size_entries
reader._load_toc_block(archive_size - 1)
assert reader._toc.get('{:22d}'.format(archive_size - 1)) is not None
assert len(reader._toc) > ArchiveReader.toc_block_size_entries
for i in range(0, archive_size):
            assert reader.get('{:22d}'.format(i)) is not None