From 94d8412531ced89e8cc5f15c032e117bbfcc74ec Mon Sep 17 00:00:00 2001 From: Markus Scheidgen <markus.scheidgen@gmail.com> Date: Fri, 28 Apr 2023 14:12:22 +0200 Subject: [PATCH] Added config option to fall back to a different archive version. This allows configuring multiple archive versions for an installation, allowing it to share an archive from another installation. All write operations go to one version. Read operations go through a list of versions to see what is available. Changelog: Added --- nomad/config/models.py | 10 ++++++- nomad/files.py | 68 +++++++++++++++++++++++++++++------------- tests/test_files.py | 41 ++++++++++++++++++++----- 3 files changed, 90 insertions(+), 29 deletions(-) diff --git a/nomad/config/models.py b/nomad/config/models.py index 7a786a4bec..32e4c5bc5c 100644 --- a/nomad/config/models.py +++ b/nomad/config/models.py @@ -359,7 +359,15 @@ class FS(NomadSettings): north_home_external: str = None local_tmp = '/tmp' prefix_size = 2 - archive_version_suffix = 'v1' + archive_version_suffix: Union[str, List[str]] = Field('v1', description=''' + This allows to add an additional segment to the names of archive files and + thereby allows different NOMAD installations to work with the same storage + directories and raw files, but with separate archives. + + If this is a list, the first string is used. If the file with the first + string does not exist on read, the system will look for the file with the + next string, etc. + ''') working_directory = os.getcwd() external_working_directory: str = None diff --git a/nomad/files.py b/nomad/files.py index a86ca961c2..0f6d57a774 100644 --- a/nomad/files.py +++ b/nomad/files.py @@ -44,7 +44,7 @@ original mainfile, and vice versa. 
''' from abc import ABCMeta -from typing import IO, Set, Dict, Iterable, Iterator, List, Tuple, Any, NamedTuple +from typing import IO, Set, Dict, Iterable, Iterator, List, Tuple, Any, NamedTuple, Callable from functools import lru_cache from pydantic import BaseModel from datetime import datetime @@ -531,6 +531,37 @@ def create_zipstream( return iter(zip_stream) +def _versioned_archive_file_object( + target_dir: DirectoryObject, file_name: Callable[[str], str], fallback: bool +) -> PathObject: + ''' + Creates a file object for an archive file depending on the directory it is or + will be created in, the recipe to construct the name from a version suffix, and + a bool that denotes if alternative version suffixes should be considered. + ''' + version_suffixes = config.fs.archive_version_suffix + + if not isinstance(version_suffixes, list): + version_suffixes = [version_suffixes] + + if len(version_suffixes) <= 1: + version_suffix = version_suffixes[0] + if not version_suffix or version_suffix == '': + return target_dir.join_file(file_name('')) + else: + return target_dir.join_file(file_name(f'-{version_suffix}')) + + if not fallback: + return target_dir.join_file(file_name(f'-{version_suffixes[0]}')) + + for version_suffix in version_suffixes: + current_file = target_dir.join_file(file_name(f'-{version_suffix}')) + if os.path.exists(current_file.os_path): + return current_file + + return target_dir.join_file(file_name(f'-{version_suffixes[0]}')) + + class UploadFiles(DirectoryObject, metaclass=ABCMeta): ''' Abstract base class for upload files. ''' def __init__(self, upload_id: str, create: bool = False): @@ -794,7 +825,7 @@ class StagingUploadFiles(UploadFiles): def write_archive(self, entry_id: str, data: Any) -> int: ''' Writes the data as archive file and returns the archive file size. 
''' - archive_file_object = self.archive_file_object(entry_id) + archive_file_object = self._archive_file_object(entry_id) try: write_archive(archive_file_object.os_path, 1, data=[(entry_id, data)]) except Exception as e: @@ -804,18 +835,20 @@ class StagingUploadFiles(UploadFiles): raise e - return self.archive_file_object(entry_id).size + return archive_file_object.size def read_archive(self, entry_id: str, use_blocked_toc: bool = True) -> ArchiveReader: try: - return read_archive(self.archive_file_object(entry_id).os_path, use_blocked_toc=use_blocked_toc) + return read_archive(self._archive_file_object(entry_id, True).os_path, use_blocked_toc=use_blocked_toc) except FileNotFoundError: raise KeyError(entry_id) - def archive_file_object(self, entry_id: str) -> PathObject: - version_suffix = '-' + config.fs.archive_version_suffix if config.fs.archive_version_suffix else '' - return self._archive_dir.join_file(f'{entry_id}{version_suffix}.msg') + def _archive_file_object(self, entry_id: str, fallback: bool = False) -> PathObject: + def versioned_file_name(version_suffix): + return f'{entry_id}{version_suffix}.msg' + + return _versioned_archive_file_object(self._archive_dir, versioned_file_name, fallback=fallback) def add_rawfiles( self, path: str, target_dir: str = '', cleanup_source_file_and_dir: bool = False, @@ -1072,7 +1105,7 @@ class StagingUploadFiles(UploadFiles): def create_iterator(): for entry in entries: - archive_file = self.archive_file_object(entry.entry_id) + archive_file = self._archive_file_object(entry.entry_id) if archive_file.exists(): data = read_archive(archive_file.os_path)[entry.entry_id].to_dict() yield (entry.entry_id, data) @@ -1285,25 +1318,18 @@ class PublicUploadFiles(UploadFiles): return self._missing_raw_files @staticmethod - def _create_msg_file_object(target_dir: DirectoryObject, access: str) -> PathObject: - version_suffix = '-' + config.fs.archive_version_suffix if config.fs.archive_version_suffix else '' - return 
target_dir.join_file(f'archive-{access}{version_suffix}.msg.msg') + def _create_msg_file_object(target_dir: DirectoryObject, access: str, fallback: bool = False) -> PathObject: + def versioned_file_name(version_suffix): + return f'archive-{access}{version_suffix}.msg.msg' - def msg_file_object(self) -> PathObject: - ''' - Gets the msg file, either public or restricted, depending on which one is used. - If both public and restricted files exist, or if none of them exist, a KeyError will - be thrown. - ''' - self.access # Invoke to initialize - return self._archive_msg_file_object + return _versioned_archive_file_object(target_dir, versioned_file_name, fallback) def _open_msg_file(self, use_blocked_toc: bool = True) -> ArchiveReader: if self._archive_msg_file is not None: if not self._archive_msg_file.is_closed(): return self._archive_msg_file - msg_file_object = self.msg_file_object() + msg_file_object = PublicUploadFiles._create_msg_file_object(self, self.access, fallback=True) if not msg_file_object.exists(): raise FileNotFoundError() @@ -1500,7 +1526,7 @@ class PublicUploadFiles(UploadFiles): return # Nothing to do self.close() new_access = 'restricted' if with_embargo else 'public' - msg_file_object = self.msg_file_object() + msg_file_object = PublicUploadFiles._create_msg_file_object(self, self.access) msg_file_object_new = PublicUploadFiles._create_msg_file_object(self, new_access) if msg_file_object.exists(): if msg_file_object_new.exists(): diff --git a/tests/test_files.py b/tests/test_files.py index 90d4c53b03..571add65d3 100644 --- a/tests/test_files.py +++ b/tests/test_files.py @@ -496,8 +496,13 @@ class TestPublicUploadFiles(UploadFilesContract): with pytest.raises(KeyError): StagingUploadFiles(upload_files.upload_id) - def test_archive_version_suffix(self, monkeypatch, test_upload_id): - monkeypatch.setattr('nomad.config.fs.archive_version_suffix', 'test_suffix') + @pytest.mark.parametrize('suffixes,suffix', [ + pytest.param(None, '', id='none'), + 
pytest.param('v1', '-v1', id='single'), + pytest.param(['v2', 'v1'], '-v2', id='fallback'), + ]) + def test_archive_version_suffix(self, monkeypatch, test_upload_id, suffixes, suffix): + monkeypatch.setattr('nomad.config.fs.archive_version_suffix', suffixes) _, entries, upload_files = create_staging_upload(test_upload_id, entry_specs='p') upload_files.pack(entries, with_embargo=False) upload_files.delete() @@ -505,14 +510,36 @@ class TestPublicUploadFiles(UploadFilesContract): public_upload_files = PublicUploadFiles(test_upload_id) assert os.path.exists(public_upload_files.join_file('raw-public.plain.zip').os_path) - assert not os.path.exists(public_upload_files.join_file('raw-public-test_suffix.plain.zip').os_path) - assert not os.path.exists(public_upload_files.join_file('raw-restricted-test_suffix.plain.zip').os_path) - assert os.path.exists(public_upload_files.join_file('archive-public-test_suffix.msg.msg').os_path) - assert not os.path.exists(public_upload_files.join_file('archive-public.msg.msg').os_path) - assert not os.path.exists(public_upload_files.join_file('archive-restricted.msg.msg').os_path) + assert os.path.exists(public_upload_files.join_file(f'archive-public{suffix}.msg.msg').os_path) assert_upload_files(test_upload_id, entries, PublicUploadFiles) + def test_archive_version_suffix_fallback(self, monkeypatch, test_upload_id): + monkeypatch.setattr('nomad.config.fs.archive_version_suffix', ['v1']) + _, entries, upload_files = create_staging_upload(test_upload_id, entry_specs='p') + upload_files.pack(entries, with_embargo=False) + + monkeypatch.setattr('nomad.config.fs.archive_version_suffix', ['v2', 'v1']) + v1_file = upload_files._archive_file_object(0, fallback=True) + v2_file = upload_files._archive_file_object(0, fallback=False) + assert os.path.basename(v1_file.os_path) == '0-v1.msg' + assert os.path.basename(v2_file.os_path) == '0-v2.msg' + + upload_files.write_archive('0', {}) + assert v1_file.exists() + assert v2_file.exists() + assert 
os.path.basename(upload_files._archive_file_object(0, fallback=True).os_path) == '0-v2.msg' + + upload_files.delete() + + public_upload_files = PublicUploadFiles(test_upload_id) + v2_file = PublicUploadFiles._create_msg_file_object( + public_upload_files, public_upload_files.access, fallback=False) + v1_file = PublicUploadFiles._create_msg_file_object( + public_upload_files, public_upload_files.access, fallback=True) + assert not v2_file.exists() + assert os.path.basename(v1_file.os_path) == 'archive-public-v1.msg.msg' + def assert_upload_files( upload_id: str, entries: Iterable[datamodel.EntryMetadata], cls, -- GitLab