From eefa4945ed23d6688b6fce6107ccdf16c4b74c71 Mon Sep 17 00:00:00 2001 From: Theodore Chang <theodore.chang@physik.hu-berlin.de> Date: Tue, 17 May 2022 16:05:51 +0000 Subject: [PATCH] Resolve "Inter-entry reference resolution in client, cli, and archive reader" --- nomad/app/v1/models.py | 2 +- nomad/app/v1/routers/entries.py | 20 +-- nomad/archive/required.py | 250 ++++++++++++++++++++++---- nomad/search.py | 2 +- tests/test_archive.py | 308 ++++++++++++++++++++++++-------- 5 files changed, 459 insertions(+), 123 deletions(-) diff --git a/nomad/app/v1/models.py b/nomad/app/v1/models.py index 4671128836..87100e2a9e 100644 --- a/nomad/app/v1/models.py +++ b/nomad/app/v1/models.py @@ -45,7 +45,7 @@ from nomad.metainfo.elasticsearch_extension import DocumentType, material_entry_ from .utils import parameter_dependency_from_model, update_url_query_arguments -User = datamodel.User.m_def.a_pydantic.model +User: Any = datamodel.User.m_def.a_pydantic.model # It is important that datetime.datetime comes last. Otherwise, number valued strings # are interpreted as epoch dates by pydantic Value = Union[StrictInt, StrictFloat, StrictBool, str, datetime.datetime] diff --git a/nomad/app/v1/routers/entries.py b/nomad/app/v1/routers/entries.py index 270695b07c..3e21f5874b 100644 --- a/nomad/app/v1/routers/entries.py +++ b/nomad/app/v1/routers/entries.py @@ -681,14 +681,14 @@ def _read_archive(entry_metadata, uploads, required_reader: RequiredReader): return { 'entry_id': entry_id, 'parser_name': entry_metadata['parser_name'], - 'archive': required_reader.read(archive, entry_id)} + 'archive': required_reader.read(archive, entry_id, upload_id)} except ArchiveQueryError as e: raise HTTPException(status.HTTP_400_BAD_REQUEST, detail=str(e)) -def _validate_required(required: ArchiveRequired) -> RequiredReader: +def _validate_required(required: ArchiveRequired, user) -> RequiredReader: try: - return RequiredReader(required) + return RequiredReader(required, user=user) except RequiredValidationError as e: raise HTTPException( status.HTTP_422_UNPROCESSABLE_ENTITY, @@ -707,7 +707,7 @@ def _read_entry_from_archive(entry, uploads, required_reader: RequiredReader): 'entry_id': entry_id, 'upload_id': upload_id, 'parser_name': entry['parser_name'], - 'archive': required_reader.read(archive, entry_id)} + 'archive': required_reader.read(archive, entry_id, upload_id)} except ArchiveQueryError as e: raise HTTPException(status.HTTP_400_BAD_REQUEST, detail=str(e)) except KeyError as e: @@ -716,14 +716,14 @@ def _read_entry_from_archive(entry, uploads, required_reader: RequiredReader): return None -def _read_entries_from_archive(entries, required): +def _read_entries_from_archive(entries, required, user): ''' Takes pickleable arguments so that it can be offloaded to worker processes. It is important to ensure the return values are also pickleable. 
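# Illustrative sketch (not part of the patch): the chunking idiom used further down in
# _answer_entries_archive_request to split search results across worker processes.
# zip_longest over n copies of the same iterator yields fixed-size groups and pads the
# last group with None, which is why _read_entries_from_archive skips None entries.
from itertools import zip_longest

def chunk(data, number_of_processes):
    # mirrors: entries_per_process = len(search_response.data) // number + 1
    entries_per_process = len(data) // number_of_processes + 1
    return zip_longest(*[iter(data)] * entries_per_process)

# example: 5 entries over 2 processes -> groups of 3, last group padded with None
assert [list(group) for group in chunk(['e1', 'e2', 'e3', 'e4', 'e5'], 2)] == [
    ['e1', 'e2', 'e3'], ['e4', 'e5', None]]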
''' with _Uploads() as uploads: - required_reader = _validate_required(required) + required_reader = _validate_required(required, user) responses = [_read_entry_from_archive( entry, uploads, required_reader) for entry in entries if entry is not None] @@ -756,7 +756,7 @@ def _answer_entries_archive_request( config.archive.max_process_number) if number <= 1: - request_data: list = _read_entries_from_archive(search_response.data, required) + request_data: list = _read_entries_from_archive(search_response.data, required, user) else: entries_per_process = len(search_response.data) // number + 1 @@ -765,7 +765,7 @@ def _answer_entries_archive_request( try: responses = pool.map( - functools.partial(_read_entries_from_archive, required=required), + functools.partial(_read_entries_from_archive, required=required, user=user), zip_longest(*[iter(search_response.data)] * entries_per_process)) finally: # gracefully shutdown the pool @@ -858,7 +858,7 @@ def _answer_entries_archive_download_request( manifest = [] search_includes = ['entry_id', 'upload_id', 'parser_name'] - required_reader = RequiredReader('*') + required_reader = RequiredReader('*', user=user) # a generator of StreamedFile objects to create the zipstream from def streamed_files(): @@ -1077,7 +1077,7 @@ async def get_entry_raw_file( def answer_entry_archive_request( query: Dict[str, Any], required: ArchiveRequired, user: User, entry_metadata=None): - required_reader = _validate_required(required) + required_reader = _validate_required(required, user) if not entry_metadata: response = perform_search( diff --git a/nomad/archive/required.py b/nomad/archive/required.py index 6d6dbbfac1..443f045eb9 100644 --- a/nomad/archive/required.py +++ b/nomad/archive/required.py @@ -16,12 +16,17 @@ # limitations under the License. # +import dataclasses import functools +import re from typing import cast, Union, Dict, Tuple, Any +from cachetools.func import lru_cache +from fastapi import HTTPException + from nomad import utils from nomad.metainfo import Definition, Section, Quantity, SubSection, Reference, QuantityReference -from .storage import ArchiveReader, ArchiveList, ArchiveError +from .storage import ArchiveReader, ArchiveList, ArchiveError, ArchiveDict from .query import ArchiveQueryError, _to_son, _query_archive_key_pattern, _extract_key_and_index, \ _extract_child @@ -56,6 +61,82 @@ def _setdefault(target: Union[dict, list], key, value_type: type): return target[key] +# ../entries/<entry_id>/archive#<path> +# /entries/<entry_id>/archive#<path> +_regex_form_a = re.compile(r'^(?:\.\.)?/entries/([^?]+)/(archive|raw)#([^?]+?)$') + +# ../upload/<upload_id>/archive/<entry_id>#<path> +# /uploads/<upload_id>/archive/<entry_id>#<path> +# <installation>/uploads/<upload_id>/archive/<entry_id>#<path> +_regex_form_b = re.compile(r'^([^?]+?)?/uploads?/(\w*)/?(archive|raw)/([^?]+?)#([^?]+?)$') + + +def _parse_path(url: str, upload_id: str = None): + ''' + Parse a reference path. + + The upload_id of current upload is taken as the input to account for that the relative reference has no + information about the upload_id, and it may also contain no entry_id. Has to know the upload_id when only + path to mainfile is given. + + On exit: + Returns None if the path is invalid. Otherwise, returns a tuple of: (installation, entry_id, kind, path) + + If installation is None, indicating it is a local path. 
+ ''' + + url_match = _regex_form_a.match(url) + if url_match: + entry_id = url_match.group(1) + kind = url_match.group(2) # archive or raw + path = url_match.group(3) + + return None, upload_id, entry_id, kind, path + + # try another form + url_match = _regex_form_b.match(url) + if not url_match: + # not valid + return None + + installation = url_match.group(1) + if installation == '': + installation = None + elif installation == '..': + installation = None + + # if empty, it is a local reference to the same upload, use the current upload_id + other_upload_id = upload_id if url_match.group(2) == '' else url_match.group(2) + + kind = url_match.group(3) # archive or raw + entry_id = url_match.group(4) + path = url_match.group(5) + + if entry_id.startswith('mainfile/'): + entry_id = utils.hash(other_upload_id, entry_id.replace('mainfile/', '')) + elif '/' in entry_id: # should not contain '/' in entry_id + return None + + return installation, other_upload_id, entry_id, kind, path + + +@dataclasses.dataclass +class RequiredReferencedArchive: + ''' + Hold necessary information for each query so that there is no need to + pass them around as separate arguments. + + The same RequiredReader object will be reused for different archives, + so it is chosen to pass archive explicitly as a method argument instead of + class member. Otherwise, a new RequiredReader object needs to be created. + ''' + upload_id: str = None + path_prefix: str = None + result_root: dict = None + archive_root: ArchiveDict = None + visited_paths: set = dataclasses.field(default_factory=lambda: set()) + + class RequiredReader: ''' Clients can read only the required parts of an archive. They specify the required @@ -95,24 +176,22 @@ class RequiredReader: def __init__( self, required: Union[dict, str], root_section_def: Section = None, - resolve_inplace: bool = False): + resolve_inplace: bool = False, user=None): if root_section_def is None: from nomad import datamodel self.root_section_def = datamodel.EntryArchive.m_def else: self.root_section_def = root_section_def - # noinspection PyTypeChecker - self._result_root: dict = None - # noinspection PyTypeChecker - self._archive_root: dict = None # it is actually an ArchiveReader, but we use it as dict - self.resolve_inplace = resolve_inplace self.required = self.validate(required, is_root=True) + # store user information that will be used to retrieve references using the same authentication + self.user = user + def validate( self, required: Union[str, dict], definition: Definition = None, - loc: list = None, is_root=False) -> dict: + loc: list = None, is_root: bool = False) -> dict: ''' Validates the required specification of this instance. It will replace all string directives with dicts. Those will have keys `_def` and `_directive`. It @@ -187,25 +266,24 @@ class RequiredReader: return result - def read(self, archive_reader: ArchiveReader, entry_id: str) -> dict: + def read(self, archive_reader: ArchiveReader, entry_id: str, upload_id: str) -> dict: ''' Reads the archive of the given entry id from the given archive reader and applies the instance's requirement specification. 
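# Illustrative sketch (not part of the patch): what _parse_path returns for the
# documented reference forms. The return value is a 5-tuple
# (installation, upload_id, entry_id, kind, path); installation is None for references
# local to this installation. The values are taken from the test cases added below.
from nomad.archive.required import _parse_path

# form a: reference to another entry of the same upload
assert _parse_path('/entries/sample_entry/archive#/seg01/1') == (
    None, None, 'sample_entry', 'archive', '/seg01/1')

# form b: reference into another upload of the same installation
assert _parse_path('/uploads/sample_upload/archive/sample_entry#/seg1/22') == (
    None, 'sample_upload', 'sample_entry', 'archive', '/seg1/22')

# form b with an installation prefix: a remote reference that is not resolved
assert _parse_path(
    'https://myoasis.de/uploads/sample_upload/archive/sample_entry#/run/0/calculation/1') == (
    'https://myoasis.de', 'sample_upload', 'sample_entry', 'archive', '/run/0/calculation/1')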
''' - assert self._result_root is None and self._archive_root is None, \ - 'instance cannot be used concurrently for multiple reads at the same time' - self._archive_root = archive_reader[utils.adjust_uuid_size(entry_id)] - result: dict = {} - self._result_root = result - result.update(**cast(dict, self._apply_required(self.required, self._archive_root))) + archive_root = archive_reader[utils.adjust_uuid_size(entry_id)] + result_root: dict = {} - self._archive_root = None - self._result_root = None + dataset = RequiredReferencedArchive(upload_id, '', result_root, archive_root) - return result + result = self._apply_required(self.required, archive_root, dataset) + result_root.update(**cast(dict, result)) - def _resolve_refs(self, archive: dict, definition: Definition) -> dict: + return result_root + + def _resolve_refs( + self, definition: Definition, archive: dict, dataset: RequiredReferencedArchive) -> dict: ''' Resolves all references in archive. ''' if isinstance(definition, Quantity): # it's a quantity ref, the archive is already resolved @@ -219,18 +297,17 @@ class RequiredReader: prop_def = section_def.all_properties[prop] if isinstance(prop_def, SubSection): def handle_item(v): - return self._resolve_refs(v, prop_def.sub_section.m_resolved()) + return self._resolve_refs(prop_def.sub_section.m_resolved(), v, dataset) elif isinstance(prop_def.type, Reference): if isinstance(prop_def.type, QuantityReference): target_def = prop_def.type.target_quantity_def.m_resolved() else: target_def = prop_def.type.target_section_def.m_resolved() - required = dict( - _directive='include-resolved', _def=target_def, _ref=prop_def.type) + required = dict(_directive='include-resolved', _def=target_def, _ref=prop_def.type) def handle_item(v): - return self._resolve_ref(required, v) + return self._resolve_ref(required, v, dataset) else: result[prop] = value.to_list() if isinstance(value, ArchiveList) else value continue @@ -240,18 +317,73 @@ class RequiredReader: return result - def _resolve_ref(self, required: dict, path: str) -> Union[dict, str]: + def _resolve_ref(self, required: dict, path: str, dataset: RequiredReferencedArchive) -> Union[dict, str]: # The archive item is a reference, the required is still a dict, the references # This is a simplified version of the metainfo implementation (m_resolve). # It implements the same semantics, but does not apply checks. # TODO the metainfo should also provide this implementation - # resolve to archive_item - if not path.startswith('/'): - # TODO support custom reference resolution, e.g. 
user_id based + # check if local reference first so that it does not break backward compatibility + if not path.startswith('/entries') and not path.startswith('/uploads'): + if path.startswith('/'): + # legacy version in the same entry + return self._resolve_ref_local(required, path, dataset) + + if path.startswith('#/'): + # new version in the same entry + return self._resolve_ref_local(required, path[1:], dataset) + + # it appears to be a local path may or may not be the same archive + url_parts = _parse_path(path, dataset.upload_id) + + # cannot identify the path, return the path + if url_parts is None: return path - resolved = self._archive_root + installation, upload_id, entry_id, kind, fragment = url_parts + + if installation is None: + # it is a local archive + # check circular reference + new_path = entry_id + fragment + # circular reference, does not handle + if new_path in dataset.visited_paths: + return path + + other_archive = self._retrieve_archive(kind, entry_id, upload_id) + # fail to parse the archive, the archive may not exist, return the plain path + if other_archive is None: + return path + + other_dataset = RequiredReferencedArchive( + upload_id=upload_id, path_prefix=f'{dataset.path_prefix}/{entry_id}', + visited_paths=dataset.visited_paths.copy()) + + # add the path to the visited paths + other_dataset.visited_paths.add(new_path) + + # if not resolved inplace + # need to create a new path in the result to make sure data does not overlap + other_dataset.result_root = _setdefault( + dataset.result_root, entry_id, dict) if not self.resolve_inplace else dataset.result_root + + other_dataset.archive_root = other_archive + + # need to resolve it again to get relative position correctly + return self._resolve_ref_local(required, fragment, other_dataset) + + # it appears to be a remote reference, won't try to resolve it + if self.resolve_inplace: + raise ArchiveQueryError(f'resolvable reference to remote archive not implemented: {path}') + + # simply return the intact path if not required to resolve in-place + return path + + def _resolve_ref_local(self, required: dict, path: str, dataset: RequiredReferencedArchive): + ''' + On enter, path must be relative to the archive root. 
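# Illustrative sketch (not part of the patch): the cycle guard used by _resolve_ref.
# Every resolution branch carries its own copy of the visited set
# (dataset.visited_paths.copy()), so sibling references to the same entry still resolve,
# while a chain that revisits an already seen <entry_id><fragment>
# (e.g. id_04 -> id_05 -> id_04, as in the tests below) is returned as a plain path.
def follow(path, targets, visited=frozenset()):
    # 'targets' maps a reference path to the path it points at; this is a toy stand-in
    # for loading the referenced archive and reading the reference quantity from it
    if path in visited:
        return path                              # circular reference: keep the raw path
    next_path = targets.get(path)
    if next_path is None:
        return path                              # nothing to follow
    return follow(next_path, targets, visited | {path})

targets = {'id_04#/ref': 'id_05#/ref', 'id_05#/ref': 'id_04#/ref'}
assert follow('id_04#/ref', targets) == 'id_04#/ref'   # the cycle is left unresolved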
+ ''' + resolved = dataset.archive_root path_stack = path.strip('/').split('/') try: @@ -264,14 +396,14 @@ class RequiredReader: if isinstance(required['_def'], Quantity): resolved_result = _to_son(resolved) else: - resolved_result = self._apply_required(required, resolved) + resolved_result = self._apply_required(required, resolved, dataset) # type: ignore # return or add to root depending on self.resolve_inplace if self.resolve_inplace: return resolved_result path_stack.reverse() - target_container: Union[dict, list] = self._result_root + target_container: Union[dict, list] = dataset.result_root # noinspection PyTypeChecker prop_or_index: Union[str, int] = None while len(path_stack) > 0: @@ -287,19 +419,22 @@ class RequiredReader: target_container.extend([None] * size_diff) # type: ignore target_container[prop_or_index] = resolved_result # type: ignore - return path - def _apply_required(self, required: dict, archive_item: Union[dict, str]) -> Union[Dict, str]: + return dataset.path_prefix + path + + def _apply_required( + self, required: dict, archive_item: Union[dict, str], + dataset: RequiredReferencedArchive) -> Union[Dict, str]: if archive_item is None: - return None + return None # type: ignore directive = required.get('_directive') if directive is not None: if directive == 'include-resolved': if isinstance(archive_item, str): - return self._resolve_ref(required, archive_item) + return self._resolve_ref(required, archive_item, dataset) - return self._resolve_refs(archive_item, required['_def']) + return self._resolve_refs(required['_def'], archive_item, dataset) if directive in ['*', 'include']: return _to_son(archive_item) @@ -307,7 +442,7 @@ class RequiredReader: raise ArchiveQueryError(f'unknown directive {required}') if isinstance(archive_item, str): - return self._resolve_ref(required, archive_item) + return self._resolve_ref(required, archive_item, dataset) result: dict = {} for key, val in required.items(): @@ -320,10 +455,49 @@ class RequiredReader: archive_child = _extract_child(archive_item, prop, index) if isinstance(archive_child, (ArchiveList, list)): - result[prop] = [self._apply_required(val, item) for item in archive_child] + result[prop] = [self._apply_required(val, item, dataset) for item in archive_child] else: - result[prop] = self._apply_required(val, archive_child) + result[prop] = self._apply_required(val, archive_child, dataset) except (KeyError, IndexError): continue return result + + @lru_cache(maxsize=32) + def _retrieve_archive(self, kind: str, id_or_path: str, upload_id: str): + ''' + Retrieves the archive from the server using the stored user credentials. + + The entry_id based API is used. + + The upload_id is only used when the path to mainfile is given to fetch the corresponding entry_id. + + Returns: + dict: The archive as a dict. + None: The archive could not be found. 
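# Illustrative sketch (not part of the patch): the shape of the result when a
# cross-entry reference is resolved with resolve_inplace=False. The referenced data is
# written under a separate top-level key named after the referenced entry_id (created
# via _setdefault(dataset.result_root, entry_id, dict)), and the reference string is
# rewritten to dataset.path_prefix + path, which is what the test below asserts as
# f'/{entry_id}/workflow/0/calculation_result_ref'.
result_root = {
    'workflow': [
        # the original reference now points into the copied sub-tree
        {'calculation_result_ref': '/id_01/workflow/0/calculation_result_ref'},
    ],
    'id_01': {
        # the required parts of the referenced entry's archive are copied here;
        # the exact nesting mirrors the fragment path of the reference
        '...': '...',
    },
}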
+ ''' + + if kind == 'raw': + # it is a path to raw file + # get the corresponding entry id + from nomad.processing import Entry + entry: Entry = Entry.objects(upload_id=upload_id, mainfile=id_or_path).first() + if not entry: + # cannot find the entry, None will be identified in the caller + return None + + entry_id = entry.entry_id + else: + # it is an entry id + entry_id = id_or_path + + # do not rely on upload_id, as have to retrieve the archive via API due to user credentials + from nomad.app.v1.routers.entries import answer_entry_archive_request + # it should retrieve the minimum information from server with minimal bandwidth + try: + response = answer_entry_archive_request(dict(entry_id=entry_id), required='*', user=self.user) + except HTTPException: + # in case of 404, return None to indicate that the archive cannot be found + return None + + return response['data']['archive'] diff --git a/nomad/search.py b/nomad/search.py index 474ec1fe84..1263639030 100644 --- a/nomad/search.py +++ b/nomad/search.py @@ -893,7 +893,7 @@ def _es_to_api_aggregation( if isinstance(agg, BucketAggregation): es_agg = es_aggs['agg:' + name] - values = set() + values: set = set() def get_bucket(es_bucket) -> Bucket: if has_no_pagination: diff --git a/tests/test_archive.py b/tests/test_archive.py index 4088457ec9..f138ed2a5f 100644 --- a/tests/test_archive.py +++ b/tests/test_archive.py @@ -24,6 +24,7 @@ import os.path import json from nomad import utils, config +from nomad.archive.required import _parse_path from nomad.metainfo import MSection, Quantity, Reference, SubSection, QuantityReference from nomad.datamodel import EntryArchive from nomad.archive.storage import TOCPacker, _decode, _entries_per_block @@ -32,6 +33,7 @@ from nomad.archive import ( write_partial_archive_to_mongo, read_partial_archive_from_mongo, read_partial_archives_from_mongo, create_partial_archive, compute_required_with_referenced, RequiredReader, RequiredValidationError) +from nomad.utils.exampledata import ExampleData def create_example_uuid(index: int = 0): @@ -283,92 +285,145 @@ def test_read_springer(): springer['doesnotexist'] +@pytest.mark.parametrize( + 'path, result', [ + pytest.param( + '/entries/sample_entry/archive#/seg01/1', (None, None, 'sample_entry', 'archive', '/seg01/1'), + id='local-same-upload-01'), + pytest.param( + '../entries/sample_entry/archive#/seg01/1', (None, None, 'sample_entry', 'archive', '/seg01/1'), + id='local-same-upload-02'), + pytest.param( + '/uploads/sample_upload/archive/sample_entry#/seg1/22', + (None, 'sample_upload', 'sample_entry', 'archive', '/seg1/22'), + id='local-another-upload-01'), + pytest.param( + '/upload/archive/sample_entry#/seg1/22', (None, None, 'sample_entry', 'archive', '/seg1/22'), + id='local-another-upload-02'), + pytest.param( + '/upload/archive/mainfile/mainfile_id#/seg1/22', + (None, None, utils.hash(None, 'mainfile_id'), 'archive', '/seg1/22'), + id='local-another-upload-03'), + pytest.param( + '../uploads/sample_upload/archive/sample_entry#/seg1/22', + (None, 'sample_upload', 'sample_entry', 'archive', '/seg1/22'), + id='local-another-upload-04'), + pytest.param( + 'https://myoasis.de/uploads/sample_upload/archive/sample_entry#/run/0/calculation/1', + ('https://myoasis.de', 'sample_upload', 'sample_entry', 'archive', '/run/0/calculation/1'), + id='remote-upload-01'), + pytest.param( + './uploads/sample_upload/archive/sample_entry#/run/0/calculation/1', + ('.', 'sample_upload', 'sample_entry', 'archive', '/run/0/calculation/1'), id='remote-upload-02'), + pytest.param( 
+ './uploads/sample_upload/archives/sample_entry#/run/0/calculation/1', + None, id='remote-upload-03'), + pytest.param( + 'localhost/uploads/sample_upload/archive/sample_entry#/run/0/calculation/1', + ('localhost', 'sample_upload', 'sample_entry', 'archive', '/run/0/calculation/1'), + id='remote-upload-04'), + pytest.param( + 'http://127.0.0.1/uploads/sample_upload/archive/sample_entry#/run/0/calculation/1', + ('http://127.0.0.1', 'sample_upload', 'sample_entry', 'archive', '/run/0/calculation/1'), + id='remote-upload-05'), + ]) +def test_parsing_reference(path, result): + path_parts = _parse_path(path) + assert path_parts == result + + @pytest.fixture(scope='session') -def archive(): - archive = EntryArchive.m_from_dict(json.loads(''' - { - "metadata": { - "entry_id": "test_id" - }, - "results": { - "properties": { - "electronic": { - "dos_electronic": { - "energies": "/run/0/calculation/1/dos_electronic/0/energies" - } - } +def json_dict(): + return json.loads( + ''' +{ + "metadata": { + "entry_id": "test_id" + }, + "results": { + "properties": { + "electronic": { + "dos_electronic": { + "energies": "/run/0/calculation/1/dos_electronic/0/energies" } - }, - "run": [ + } + } + }, + "run": [ + { + "system": [ { - "system": [ + "atoms": { + "labels": [ + "He" + ] + }, + "symmetry": [ { - "atoms": { - "labels": [ - "He" - ] - }, - "symmetry": [ - { - "space_group_number": 221 - } - ] - }, - { - "atoms": { - "labels": [ - "H" - ] - }, - "symmetry": [ - { - "space_group_number": 221 - } - ] + "space_group_number": 221 } - ], - "calculation": [ - { - "system_ref": "/run/0/system/1", - "energy": { - "total": { - "value": 0.1 - } - } - }, - { - "system_ref": "/run/0/system/1", - "energy": { - "total": { - "value": 0.2 - } - }, - "dos_electronic": [ - { - "energies": [0.0, 0.1] - } - ], - "eigenvalues": [ - ] - }, + ] + }, + { + "atoms": { + "labels": [ + "H" + ] + }, + "symmetry": [ { - "system_ref": "/run/0/system/1", - "energy": { - "total": { - "value": 0.1 - } - } + "space_group_number": 221 } ] } ], - "workflow": [ + "calculation": [ + { + "system_ref": "/run/0/system/1", + "energy": { + "total": { + "value": 0.1 + } + } + }, + { + "system_ref": "/run/0/system/1", + "energy": { + "total": { + "value": 0.2 + } + }, + "dos_electronic": [ + { + "energies": [0.0, 0.1] + } + ], + "eigenvalues": [ + ] + }, { - "calculation_result_ref": "/run/0/calculation/1" + "system_ref": "/run/0/system/1", + "energy": { + "total": { + "value": 0.1 + } + } } ] } - ''')) + ], + "workflow": [ + { + "calculation_result_ref": "/run/0/calculation/1" + } + ] +} +''') + + +@pytest.fixture(scope='session') +def archive(json_dict): + archive = EntryArchive.m_from_dict(json_dict) assert archive.run is not None assert len(archive.run) == 1 return archive @@ -440,11 +495,118 @@ def test_required_reader(archive, required, error, resolve_inplace): return assert error is None - results = required_reader.read(archive_reader, 'entry_id') + results = required_reader.read(archive_reader, 'entry_id', None) assert_required_results(results, required_reader.required, archive) +@pytest.fixture(scope='module') +def example_data_with_reference(elastic_module, raw_files_module, mongo_module, test_user, json_dict): + ''' + Provides a couple of entries with references. + + Only used in test_required_reader_with_remote_reference. 
+ ''' + data = ExampleData(main_author=test_user) + + data.create_upload(upload_id='id_published_with_ref', upload_name='name_published', published=True) + + del json_dict['results'] + + ref_list = [ + '/run/0/calculation/1', # plain direct reference + '#/run/0/calculation/1', # new-style reference + '../entries/id_01/archive#/workflow/0/calculation_result_ref', # reference to another archive + '../entries/id_05/archive#/workflow/0/calculation_result_ref', # circular reference + '../entries/id_04/archive#/workflow/0/calculation_result_ref', # circular reference + 'https://another.domain/entries/id_03/archive#/workflow/0/calculation_result_ref' # remote reference + ] + + for index, ref in enumerate(ref_list): + json_dict['workflow'][0]['calculation_result_ref'] = ref + data.create_entry( + upload_id='id_published_with_ref', + entry_id=f'id_{index + 1:02d}', + entry_archive=EntryArchive.m_from_dict(json_dict)) + + data.save(with_files=True, with_es=True, with_mongo=True) + yield data + data.delete() + + +@pytest.fixture(scope='function') +def remote_reference_required(): + ''' + Only used in test_required_reader_with_remote_reference. + ''' + return {'workflow': 'include-resolved'} + + +@pytest.mark.parametrize( + 'resolve_inplace', [ + pytest.param(True, id='inplace'), + pytest.param(False, id='root'), + ]) +@pytest.mark.parametrize( + 'entry_id, inplace_result', [ + pytest.param( + 'id_01', { + 'system_ref': {'atoms': {'labels': ['H']}, 'symmetry': [{'space_group_number': 221}]}, + 'energy': {'total': {'value': 0.2}}, 'dos_electronic': [{'energies': [0.0, 0.1]}]}, + id='plain-direct-reference'), + pytest.param( + 'id_02', { + 'system_ref': {'atoms': {'labels': ['H']}, 'symmetry': [{'space_group_number': 221}]}, + 'energy': {'total': {'value': 0.2}}, 'dos_electronic': [{'energies': [0.0, 0.1]}]}, + id='new-style-reference'), + pytest.param( + 'id_03', { + 'system_ref': {'atoms': {'labels': ['H']}, 'symmetry': [{'space_group_number': 221}]}, + 'energy': {'total': {'value': 0.2}}, 'dos_electronic': [{'energies': [0.0, 0.1]}]}, + id='reference-to-another-archive'), + pytest.param( + # circular reference detected thus untouched + 'id_04', '../entries/id_04/archive#/workflow/0/calculation_result_ref', + id='circular-reference-1'), + pytest.param( + # circular reference detected thus untouched + 'id_05', '../entries/id_05/archive#/workflow/0/calculation_result_ref', + id='circular-reference-2'), + pytest.param( + # remote reference detected thus untouched + 'id_06', 'https://another.domain/entries/id_03/archive#/workflow/0/calculation_result_ref', + id='remote-reference'), + pytest.param( + 'id_07', '../entries/id_07/archive#/workflow/0/calculation_result_ref', + id='does-not-exist'), + ]) +def test_required_reader_with_remote_reference( + json_dict, remote_reference_required, resolve_inplace, + example_data_with_reference, test_user, entry_id, inplace_result): + archive = {'workflow': json_dict['workflow']} + + archive['workflow'][0][ + 'calculation_result_ref'] = f'../entries/{entry_id}/archive#/workflow/0/calculation_result_ref' + + f = BytesIO() + write_archive(f, 1, [('entry_id', archive)], entry_toc_depth=2) + packed_archive = f.getbuffer() + + archive_reader = ArchiveReader(BytesIO(packed_archive)) + required_reader = RequiredReader( + remote_reference_required, resolve_inplace=resolve_inplace, user=test_user) + results = required_reader.read(archive_reader, 'entry_id', None) + ref_result = results['workflow'][0]['calculation_result_ref'] + + if resolve_inplace or entry_id == 'id_07': + 
+        assert ref_result == inplace_result
+    else:
+        # if not resolved in-place, the referenced data is copied into the current
+        # result and the reference is rewritten to the pattern asserted below;
+        # whether the copied target is itself another reference is not checked here.
+        assert ref_result == f'/{entry_id}/workflow/0/calculation_result_ref'
+
+
 def assert_required_results(
         results: dict, required: dict, archive: MSection,
         current_results: Union[dict, str] = None,
--
GitLab
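# Illustrative usage sketch (not part of the patch), mirroring the test above: the
# updated RequiredReader API after this change. The reader now takes the requesting
# user so that referenced archives can be fetched through the API with the same
# credentials, and read() additionally takes the upload_id of the current upload so
# that relative references (which carry no upload_id of their own) can be resolved.
from io import BytesIO

from nomad.archive import RequiredReader, write_archive
from nomad.archive.storage import ArchiveReader

f = BytesIO()
write_archive(f, 1, [('entry_id', {'workflow': []})], entry_toc_depth=2)

archive_reader = ArchiveReader(BytesIO(f.getbuffer()))
required_reader = RequiredReader(
    {'workflow': 'include-resolved'},   # the required spec used in the new tests
    resolve_inplace=False,
    user=None)                          # pass the authenticated user for cross-entry lookups
results = required_reader.read(archive_reader, 'entry_id', None)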