diff --git a/nomad/app/v1/routers/entries.py b/nomad/app/v1/routers/entries.py
index 6b2d9dcceaf650668662279c8311fe1d072c7693..dec77da1600330682caca6772b95958a0c5453f9 100644
--- a/nomad/app/v1/routers/entries.py
+++ b/nomad/app/v1/routers/entries.py
@@ -1038,7 +1038,7 @@ def _read_entry_from_archive(entry: dict, uploads, required_reader: RequiredRead
     try:
         upload_files = uploads.get_upload_files(upload_id)
-        with upload_files.read_archive(entry_id, True) as archive:
+        with upload_files.read_archive(entry_id) as archive:
             entry['archive'] = required_reader.read(archive, entry_id, upload_id)

         return entry
     except ArchiveQueryError as e:
diff --git a/nomad/archive/converter.py b/nomad/archive/converter.py
index 96b6e494fd78e1b69613958571f556b81c4273ef..75b5613495ede857467ced2dd7abae8fcdab43b3 100644
--- a/nomad/archive/converter.py
+++ b/nomad/archive/converter.py
@@ -130,7 +130,7 @@ def convert_archive(
     try:
         tmp_path = ''

-        with read_archive(original_path, use_blocked_toc=False) as reader:
+        with read_archive(original_path) as reader:
             flush(f'{prefix} [INFO] Converting: {original_path}')
             tmp_path = (
                 f'{original_path}.{hashlib.md5(original_path.encode()).hexdigest()}'
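
Reviewer note: the call-site change is the same throughout this patch, the use_blocked_toc argument is simply dropped. A minimal before/after sketch (not part of the patch; 'example.msg' is a placeholder path, and read_archive is assumed to be re-exported from nomad.archive as at the existing call sites):

    from nomad.archive import read_archive

    # before: reader = read_archive('example.msg', use_blocked_toc=False)
    with read_archive('example.msg') as reader:  # after: no flag, the TOC is always read eagerly
        print(len(reader))  # number of entries in the archive
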
diff --git a/nomad/archive/storage.py b/nomad/archive/storage.py
index 086a1baaf23ec89d677caf54fef86020e749d452..6733f2b37d4fdb49c0cf811217cc0998d1c31b90 100644
--- a/nomad/archive/storage.py
+++ b/nomad/archive/storage.py
@@ -17,7 +17,6 @@
 #
 from __future__ import annotations

-import struct
 from collections.abc import Generator, Mapping, Sequence
 from io import BufferedReader, BytesIO
 from typing import Any, cast
@@ -139,7 +138,7 @@ class ArchiveDict(ArchiveItem, Mapping):


 class ArchiveReader(ArchiveDict):
-    def __init__(self, file_or_path: str | BytesIO, use_blocked_toc=True):
+    def __init__(self, file_or_path: str | BytesIO):
         self._file_or_path = file_or_path

         if isinstance(self._file_or_path, str):
@@ -160,36 +159,7 @@ class ArchiveReader(ArchiveDict):
         # this number is determined by the msgpack encoding of the file beginning:
         # { 'toc_pos': <...>
         #   ^11
-        self._toc_position = _decode(self._direct_read(10, 11))
-
-        self._use_blocked_toc = use_blocked_toc
-
-        if not self._use_blocked_toc:
-            self._toc_entry = self._read(self._toc_position)
-            return
-
-        toc_start = self._toc_position[0]
-        b = self._direct_read(1, toc_start)[0]
-        if b & 0b11110000 == 0b10000000:
-            self._toc_number = b & 0b00001111
-            self._toc_offset = toc_start + 1
-        elif b == 0xDE:
-            (self._toc_number,) = struct.unpack_from(
-                '>H', self._direct_read(2, toc_start + 1)
-            )
-            self._toc_offset = toc_start + 3
-        elif b == 0xDF:
-            (self._toc_number,) = struct.unpack_from(
-                '>I', self._direct_read(4, toc_start + 1)
-            )
-            self._toc_offset = toc_start + 5
-        else:
-            raise ArchiveError(
-                'Archive top-level TOC is not a msgpack map (dictionary).'
-            )
-
-        self._toc: dict[str, Any] = {}
-        self._toc_block_info = [None] * (self._toc_number // _entries_per_block + 1)
+        self._toc_entry = self._read(_decode(self._direct_read(10, 11)))

     def __enter__(self):
         return self
@@ -197,74 +167,9 @@ class ArchiveReader(ArchiveDict):
     def __exit__(self, exc_type, exc_val, exc_tb):
         self.close()

-    def _load_toc_block(self, i_entry: int):
-        i_block = i_entry // _entries_per_block
-
-        if self._toc_block_info[i_block]:
-            return self._toc_block_info[i_block]
-
-        i_offset = i_block * _bytes_per_block + self._toc_offset
-        block_data = self._direct_read(_bytes_per_block, i_offset)
-
-        first, last = None, None
-
-        entries_current_block = min(
-            _entries_per_block, self._toc_number - i_block * _entries_per_block
-        )
-
-        offset: int = 0
-        for i in range(entries_current_block):
-            entry_uuid, positions = _unpack_entry(
-                block_data[offset : offset + _toc_item_size]
-            )
-            self._toc[entry_uuid] = positions
-            offset += _toc_item_size
-
-            if i == 0:
-                first = entry_uuid
-
-            if i + 1 == entries_current_block:
-                last = entry_uuid
-
-        self._toc_block_info[i_block] = (first, last)  # type: ignore
-
-        return self._toc_block_info[i_block]
-
     def _locate_position(self, key: str) -> tuple:
-        if not self._use_blocked_toc or self._toc_entry is not None:
-            positions = self._toc_entry[key]
-            return _decode(positions[0]), _decode(positions[1])
-
-        if self._toc_number == 0:
-            raise KeyError(key)
-
-        positions = self._toc.get(key)
-        # TODO use hash algorithm instead of binary search
-        if positions is None:
-            r_start = 0
-            r_end = self._toc_number
-            i_entry = None
-            while r_start <= r_end:
-                m_entry = r_start + (r_end - r_start) // 2
-                if i_entry == m_entry:
-                    break
-
-                i_entry = m_entry
-
-                first, last = self._load_toc_block(i_entry)
-
-                if key < first:
-                    r_end = i_entry - 1
-                elif key > last:
-                    r_start = i_entry + 1
-                else:
-                    break
-
-            positions = self._toc.get(key)
-            if positions is None:
-                raise KeyError(key)
-
-        return positions
+        positions = self._toc_entry[key]
+        return _decode(positions[0]), _decode(positions[1])

     def __getitem__(self, key):
         toc_position, data_position = self._locate_position(utils.adjust_uuid_size(key))
@@ -291,17 +196,9 @@ class ArchiveReader(ArchiveDict):
         return self._read(toc_position), _iter(data_position)

     def __iter__(self):
-        if self._toc_entry is None:
-            # is not necessarily read when using blocked toc
-            self._toc_entry = self._read(self._toc_position)
-
         return self._toc_entry.__iter__()

     def __len__(self):
-        if self._toc_entry is None:
-            # is not necessarily read when using blocked toc
-            self._toc_entry = self._read(self._toc_position)
-
         return self._toc_entry.__len__()

     def close(self):
@@ -329,8 +226,6 @@ def read_archive(file_or_path: str | BytesIO, **kwargs) -> ArchiveReader:
     from .storage_v2 import ArchiveReader as ArchiveReaderNew
     from .storage_v2 import ArchiveWriter as ArchiveWriterNew

-    kwargs['use_blocked_toc'] = False
-
     # todo: replace implementation to enable automatic conversion
     # if isinstance(file_or_path, str):
     #     from nomad.archive.converter import convert_archive
@@ -350,7 +245,7 @@ def read_archive(file_or_path: str | BytesIO, **kwargs) -> ArchiveReader:
         if magic == ArchiveWriterNew.magic:
             return ArchiveReaderNew(file_or_path, **kwargs)  # type: ignore

-    return ArchiveReader(file_or_path, **kwargs)
+    return ArchiveReader(file_or_path)


 if __name__ == '__main__':
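
Reviewer note: for anyone who did not follow the old code path, the removed _load_toc_block/_locate_position pair implemented roughly the following lookup: decode fixed-size TOC blocks on demand and binary-search them by key range. A simplified sketch with illustrative names (locate, toc_blocks), not part of the patch:

    def locate(key: str, toc_blocks: list[dict[str, tuple]]) -> tuple:
        # toc_blocks stands in for the fixed-size blocks the old code decoded lazily;
        # keys are sorted across blocks, each block maps entry ids to (toc, data) positions
        lo, hi = 0, len(toc_blocks) - 1
        while lo <= hi:
            mid = (lo + hi) // 2
            block = toc_blocks[mid]
            first, last = min(block), max(block)
            if key < first:
                hi = mid - 1
            elif key > last:
                lo = mid + 1
            else:
                return block[key]  # raises KeyError like the old code if the key is absent
        raise KeyError(key)

After this patch both readers skip that machinery and read the whole TOC map once in __init__.
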
diff --git a/nomad/archive/storage_v2.py b/nomad/archive/storage_v2.py
index ffb8fdaf6fdfdc0eca76797f591756a71ea0934a..6eacd7957f7268cc3989507d1d565767e4cbb334 100644
--- a/nomad/archive/storage_v2.py
+++ b/nomad/archive/storage_v2.py
@@ -17,7 +17,6 @@
 #
 from __future__ import annotations

-import struct
 from collections.abc import Generator, Iterable
 from io import BytesIO

@@ -624,7 +623,6 @@ class ArchiveReader(ArchiveItem):
     def __init__(
         self,
         file_or_path: str | BytesIO,
-        use_blocked_toc: bool = True,
         counter: ArchiveReadCounter = None,
     ):
         self._file_or_path: str | BytesIO = file_or_path
@@ -647,40 +645,8 @@ class ArchiveReader(ArchiveItem):
         # { 'toc_pos': <...>
         #   ^11
         # 11 ==> 1 (0b0000XXXX for map) + 1 (0b101XXXXX for str key) + 7 ('toc_pos') + 2 (0xc4 0bXXXXXXXX for bin 8)
-        self._toc_position = tuple(
-            Utility.decode(self._direct_read(10, 11 + ArchiveWriter.magic_len))
-        )
-        self._toc_entry: dict | None = None
-
-        self._use_blocked_toc: bool = use_blocked_toc
-
-        # if not using blocked TOC, load everything now
-        if not self._use_blocked_toc:
-            self._ensure_toc()
-            return
-
-        # determine the map storing the TOC
-        # https://github.com/msgpack/msgpack/blob/master/spec.md#map-format-family
-        toc_start: int = self._toc_position[0]
-        if (b := self._direct_read(1, toc_start)[0]) & 0b11110000 == 0b10000000:
-            self._toc_number: int = b & 0b00001111
-            self._toc_offset: int = toc_start + 1
-        elif b == 0xDE:
-            (self._toc_number,) = struct.unpack_from(
-                '>H', self._direct_read(2, toc_start + 1)
-            )
-            self._toc_offset = toc_start + 3
-        elif b == 0xDF:
-            (self._toc_number,) = struct.unpack_from(
-                '>I', self._direct_read(4, toc_start + 1)
-            )
-            self._toc_offset = toc_start + 5
-        else:
-            raise ArchiveError('Top level TOC is not a msgpack map (dictionary).')
-
-        self._toc: dict = {}
-        self._toc_block_info: list = [None] * (
-            self._toc_number // Utility.entries_per_block + 1
+        self._toc_entry: dict = self._read(
+            *Utility.decode(self._direct_read(10, 11 + ArchiveWriter.magic_len))
         )

     def __enter__(self):
@@ -691,69 +657,9 @@ class ArchiveReader(ArchiveItem):
         if exc_val:
             raise exc_val

-    def _load_toc_block(self, i_entry: int) -> tuple:
-        i_block: int = i_entry // Utility.entries_per_block
-
-        if self._toc_block_info[i_block]:
-            return self._toc_block_info[i_block]
-
-        i_offset: int = i_block * Utility.bytes_per_block + self._toc_offset
-        block_data = self._direct_read(Utility.bytes_per_block, i_offset)
-
-        first, last = None, None
-
-        offset: int = 0
-        for i in range(
-            entries_current_block := min(
-                Utility.entries_per_block,
-                self._toc_number - i_block * Utility.entries_per_block,
-            )
-        ):
-            entry_uuid, positions = Utility.unpack_entry(
-                block_data[offset : offset + Utility.toc_item_size]
-            )
-            self._toc[entry_uuid] = positions
-            offset += Utility.toc_item_size
-
-            if i == 0:
-                first = entry_uuid
-
-            if i + 1 == entries_current_block:
-                last = entry_uuid
-
-        self._toc_block_info[i_block] = first, last
-
-        return self._toc_block_info[i_block]
-
     def _locate_position(self, key: str) -> tuple:
-        if not self._use_blocked_toc or self._toc_entry is not None:
-            positions = self._toc_entry[key]
-            return Utility.decode(positions[0]), Utility.decode(positions[1])
-
-        if self._toc_number == 0:
-            raise KeyError(key)
-
-        if (positions := self._toc.get(key)) is None:
-            r_start, r_end, i_entry = 0, self._toc_number, -9999
-            while r_start <= r_end:
-                if i_entry == (m_entry := r_start + (r_end - r_start) // 2):
-                    break
-
-                i_entry = m_entry
-
-                first, last = self._load_toc_block(i_entry)
-
-                if key < first:
-                    r_end = i_entry - 1
-                elif key > last:
-                    r_start = i_entry + 1
-                else:
-                    break
-
-            if (positions := self._toc.get(key)) is None:
-                raise KeyError(key)
-
-        return positions
+        positions = self._toc_entry[key]
+        return Utility.decode(positions[0]), Utility.decode(positions[1])

     def __getitem__(self, key: str) -> ArchiveDict:
         key = utils.adjust_uuid_size(key)
@@ -796,11 +702,9 @@ class ArchiveReader(ArchiveItem):
         return False

     def __iter__(self):
-        self._ensure_toc()
         return self._toc_entry.__iter__()

     def __len__(self):
-        self._ensure_toc()
         return self._toc_entry.__len__()

     def get(self, key: str, default=None):
@@ -810,23 +714,16 @@ class ArchiveReader(ArchiveItem):
             return default

     def items(self):
-        self._ensure_toc()
         for k in self._toc_entry:
             yield k, self[k]

     def keys(self):
-        self._ensure_toc()
         return self._toc_entry.keys()

     def values(self):
-        self._ensure_toc()
         for k in self._toc_entry:
             yield self[k]

-    def _ensure_toc(self):
-        if self._toc_entry is None:
-            self._toc_entry = self._read(*self._toc_position)
-
     def close(self, close_unowned: bool = False):
         if close_unowned or isinstance(self._file_or_path, str):
             self._f.close()
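
Reviewer note: with the TOC read eagerly in __init__, the v2 reader behaves as a plain read-only mapping. A usage sketch (not part of the patch; 'archive_v2.msg' is a placeholder for an existing v2 archive with at least one entry):

    from nomad.archive.storage_v2 import ArchiveReader

    with ArchiveReader('archive_v2.msg') as reader:
        entry_ids = list(reader.keys())     # no _ensure_toc() indirection any more
        first_entry = reader[entry_ids[0]]  # per-entry access is unchanged
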
diff --git a/nomad/cli/admin/springer.py b/nomad/cli/admin/springer.py
index dec3b88fdc0c4f71670124933d6757971dceebcc..a3a02732b1594e5db3180b8b9396490b4f2c2555 100644
--- a/nomad/cli/admin/springer.py
+++ b/nomad/cli/admin/springer.py
@@ -181,8 +181,7 @@ def update_springer(max_n_query: int = 10, retry_time: int = 120):
     # querying database with unvailable dataset leads to error,
     # get toc keys first by making an empty query
     with read_archive(config.normalize.springer_db_path) as springer_archive:
-        _ = springer_archive._load_toc_block(0)
-        archive_keys = springer_archive._toc.keys()
+        archive_keys = springer_archive.keys()

     sp_data = archive.query_archive(
         config.normalize.springer_db_path, {spg: '*' for spg in archive_keys}
diff --git a/nomad/cli/admin/uploads.py b/nomad/cli/admin/uploads.py
index c55c64948f43df3568160bffa7438f1f9bc04783..95bf2862fc094ddae8486bff59b2fe19af21fc01 100644
--- a/nomad/cli/admin/uploads.py
+++ b/nomad/cli/admin/uploads.py
@@ -412,7 +412,7 @@ def export(ctx, uploads, required, output: str):
                 entry_count += 1
                 total_count += 1
                 try:
-                    archive = upload_files.read_archive(entry_id, use_blocked_toc=False)
+                    archive = upload_files.read_archive(entry_id)
                     archive_data = required_reader.read(archive, entry_id, upload_id)
                     write(entry_id, archive_data)
                 except ArchiveQueryError as e:
@@ -1052,7 +1052,7 @@ def integrity(
         for entry in entries:
             entry_id = entry['entry_id']
             es_nomad_version = entry['nomad_version']
-            with upload.upload_files.read_archive(entry_id, False) as archive:
+            with upload.upload_files.read_archive(entry_id) as archive:
                 archive_nomad_version = archive[entry_id]['metadata']['nomad_version']
                 if es_nomad_version != archive_nomad_version:
                     return True
diff --git a/nomad/files.py b/nomad/files.py
index 14820896c12fa8ee36cc6bb25737c1761d23c4dc..4dcce117e49ccdbbc3e6cc20de012e98b45ef8ac 100644
--- a/nomad/files.py
+++ b/nomad/files.py
@@ -679,9 +679,7 @@ class UploadFiles(DirectoryObject, metaclass=ABCMeta):
     def scandir(self, path: str = '', depth: int = -1):
         raise NotImplementedError()

-    def read_archive(
-        self, entry_id: str, use_blocked_toc: bool = True
-    ) -> ArchiveReader:
+    def read_archive(self, entry_id: str) -> ArchiveReader:
         """
         Returns an :class:`nomad.archive.ArchiveReader` that contains the given
         entry_id.
@@ -880,15 +878,9 @@ class StagingUploadFiles(UploadFiles):

         return archive_file_object.size

-    def read_archive(
-        self, entry_id: str, use_blocked_toc: bool = True
-    ) -> ArchiveReader:
+    def read_archive(self, entry_id: str) -> ArchiveReader:
         try:
-            return read_archive(
-                self._archive_file_object(entry_id, True).os_path,
-                use_blocked_toc=use_blocked_toc,
-            )
-
+            return read_archive(self._archive_file_object(entry_id, True).os_path)
         except FileNotFoundError:
             raise KeyError(entry_id)

@@ -1492,7 +1484,7 @@ class PublicUploadFiles(UploadFiles):

         return _versioned_archive_file_object(target_dir, versioned_file_name, fallback)

-    def _open_msg_file(self, use_blocked_toc: bool = True) -> ArchiveReader:
+    def _open_msg_file(self) -> ArchiveReader:
         if self._archive_msg_file is not None:
             if not self._archive_msg_file.is_closed():
                 return self._archive_msg_file
@@ -1504,7 +1496,7 @@ class PublicUploadFiles(UploadFiles):
         if not msg_file_object.exists():
             raise FileNotFoundError()

-        archive = read_archive(msg_file_object.os_path, use_blocked_toc=use_blocked_toc)
+        archive = read_archive(msg_file_object.os_path)

         assert archive is not None
         self._archive_msg_file = archive
@@ -1706,9 +1698,9 @@ class PublicUploadFiles(UploadFiles):

         raise KeyError(file_path)

-    def read_archive(self, entry_id: str, use_blocked_toc: bool = True) -> Any:
+    def read_archive(self, entry_id: str) -> Any:
         try:
-            archive = self._open_msg_file(use_blocked_toc)
+            archive = self._open_msg_file()
             if entry_id in archive:
                 return archive
         except FileNotFoundError:
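
Reviewer note: the public UploadFiles API keeps the context-manager style used at the call sites above, just without the flag. A sketch mirroring the integrity check in nomad/cli/admin/uploads.py (upload_files stands for any StagingUploadFiles or PublicUploadFiles instance):

    def entry_nomad_version(upload_files, entry_id: str) -> str:
        # read_archive() now takes only the entry id
        with upload_files.read_archive(entry_id) as archive:
            return archive[entry_id]['metadata']['nomad_version']
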
diff --git a/tests/archive/test_archive.py b/tests/archive/test_archive.py
index b69d8c3af18c49f1482bbfd5001779528d865b92..67bea7b2e0f1a33b7a752c3d5cf9f90b521f05b3 100644
--- a/tests/archive/test_archive.py
+++ b/tests/archive/test_archive.py
@@ -165,14 +165,13 @@ def test_write_archive_multi(example_uuid, example_entry):
     assert example_uuid in toc


-@pytest.mark.parametrize('use_blocked_toc', [False, True])
-def test_read_archive_single(example_uuid, example_entry, use_blocked_toc):
+def test_read_archive_single(example_uuid, example_entry):
     f = BytesIO()
     write_archive(f, 1, [(example_uuid, example_entry)])
     packed_archive = f.getbuffer()

     f = BytesIO(packed_archive)
-    data = read_archive(f, use_blocked_toc=use_blocked_toc)
+    data = read_archive(f)

     assert example_uuid in data
     assert data[example_uuid]['run']['system'][1] == example_entry['run']['system'][1]
@@ -189,8 +188,7 @@ def test_read_archive_single(example_uuid, example_entry):
         data[example_uuid]['run']['system'][2]


-@pytest.mark.parametrize('use_blocked_toc', [False, True])
-def test_read_archive_multi(monkeypatch, example_uuid, example_entry, use_blocked_toc):
+def test_read_archive_multi(monkeypatch, example_uuid, example_entry):
     monkeypatch.setattr('nomad.config.archive.small_obj_optimization_threshold', 256)

     archive_size = _entries_per_block * 2 + 23
@@ -203,17 +201,9 @@ def test_read_archive_multi(monkeypatch, example_uuid, example_entry, use_blocke
     packed_archive = f.getbuffer()

     f = BytesIO(packed_archive)
-    with read_archive(f, use_blocked_toc=use_blocked_toc) as reader:
-        # if use_blocked_toc:
-        #     reader._load_toc_block(0)
-        #     assert reader._toc.get(create_example_uuid(0)) is not None
-        #     assert len(reader._toc) == _entries_per_block
-        #     reader._load_toc_block(archive_size - 1)
-        #     assert reader._toc.get(create_example_uuid(archive_size - 1)) is not None
-        #     assert len(reader._toc) > _entries_per_block
-
+    with read_archive(f) as reader:
         for i in range(0, archive_size):
-            reader.get(create_example_uuid(i)) is not None
+            assert reader.get(create_example_uuid(i)) is not None

         entry = reader[create_example_uuid(0)]