diff --git a/nomad/app/v1/routers/graph.py b/nomad/app/v1/routers/graph.py
index a6aeaea909c3c54fe90822b399ba61405c2041db..3294569809f835eceae5c8ac8fe16102a707000b 100644
--- a/nomad/app/v1/routers/graph.py
+++ b/nomad/app/v1/routers/graph.py
@@ -60,7 +60,7 @@ async def raw_query(
 ):
     relocate_children(query)
     with MongoReader(query, user=user) as reader:
-        return normalise_response(reader.read())
+        return normalise_response(await reader.read())


 @router.post(
@@ -82,7 +82,7 @@ async def basic_query(
             )
             relocate_children(query_dict)
             with MongoReader(query_dict, user=user) as reader:
-                response: dict = reader.read()
+                response: dict = await reader.read()
     except ConfigError as e:
         raise HTTPException(400, detail=str(e))
     except Exception as e:
@@ -118,6 +118,6 @@ async def archive_query(
         del graph_dict[Token.SEARCH]['m_request']['query']

     with UserReader(graph_dict, user=user) as reader:
-        response: dict = reader.read(user.user_id)
+        response: dict = await reader.read(user.user_id)

     return normalise_response(response)
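
The three endpoints above were already `async def` coroutines, so the router-side change is limited to awaiting the now-asynchronous `read()`. A minimal sketch of the resulting endpoint pattern — the route path and the `get_current_user` dependency are illustrative, not the actual NOMAD signatures:

    from fastapi import APIRouter, Depends

    router = APIRouter()

    @router.post('/graph/query')  # illustrative path, not the real route
    async def graph_query(query: dict, user=Depends(get_current_user)):  # assumed dependency
        relocate_children(query)
        with MongoReader(query, user=user) as reader:
            # read() is now a coroutine: while its blocking Mongo/Elasticsearch
            # calls run in worker threads, the event loop can serve other requests
            return normalise_response(await reader.read())
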
diff --git a/nomad/graph/graph_reader.py b/nomad/graph/graph_reader.py
index 14cdc8ae81a998b6ea5477813fdfe05960e95ff4..d674c96a6a6f1ae4ffe5b461a75e9bbd388e5c23 100644
--- a/nomad/graph/graph_reader.py
+++ b/nomad/graph/graph_reader.py
@@ -17,6 +17,7 @@
 #
 from __future__ import annotations

+import asyncio
 import copy
 import dataclasses
 import functools
@@ -136,6 +137,30 @@ class ConfigError(Exception):
     pass


+async def goto_child(container, key: str | int | list):
+    if not isinstance(key, list):
+        if isinstance(container, (list, dict)):
+            return container[key]  # type: ignore
+
+        return await asyncio.to_thread(container.__getitem__, key)
+
+    target = container
+    for v in key:
+        target = await goto_child(target, v)
+    return target
+
+
+async def async_get(container, key, default=None):
+    if isinstance(container, dict):
+        return container.get(key, default)
+
+    return await asyncio.to_thread(container.get, key, default)
+
+
+async def async_to_json(data):
+    return await asyncio.to_thread(to_json, data)
+
+
 @dataclasses.dataclass(frozen=True)
 class GraphNode:
     upload_id: str  # the upload id of the current node
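
These three helpers carry the whole conversion strategy: plain `dict`/`list` containers are indexed directly (cheap, no thread hop), while anything else — in practice the lazy `ArchiveDict`/`ArchiveList` containers whose `__getitem__` may read from disk — is pushed onto a worker thread via `asyncio.to_thread`. The list form of `goto_child` resolves a whole path one segment at a time. A self-contained illustration of the same offloading idea, assuming the `goto_child` defined above is importable; `SlowMapping` is invented for the example:

    import asyncio

    class SlowMapping:
        # stands in for a lazily loaded archive container
        def __getitem__(self, key):
            return f'value-of-{key}'  # pretend this reads from disk

    async def main():
        fast = {'a': {'b': 1}}
        print(await goto_child(fast, ['a', 'b']))    # plain dicts: direct indexing -> 1
        print(await goto_child(SlowMapping(), 'a'))  # lazy container: __getitem__ runs in a thread

    asyncio.run(main())
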
@@ -175,13 +200,13 @@ class GraphNode:
     def _generate_prefix(self) -> str:
         return f'../uploads/{self.upload_id}/archive/{self.entry_id}'

-    def goto(self, reference: str, resolve_inplace: bool) -> GraphNode:
+    async def goto(self, reference: str, resolve_inplace: bool) -> GraphNode:
         if reference.startswith(('/', '#')):
-            return self._goto_local(reference, resolve_inplace)
+            return await self._goto_local(reference, resolve_inplace)

-        return self._goto_remote(reference, resolve_inplace)
+        return await self._goto_remote(reference, resolve_inplace)

-    def _goto_local(self, reference: str, resolve_inplace: bool) -> GraphNode:
+    async def _goto_local(self, reference: str, resolve_inplace: bool) -> GraphNode:
         """
         Go to a local reference.
         Since it is a local reference, only need to walk to the proper position.
@@ -197,11 +222,11 @@ class GraphNode:
             raise ArchiveError(f'Circular reference detected: {reference_url}.')

         try:
-            target = self.__goto_path(self.archive_root, path_stack)
+            target = await self.__goto_path(self.archive_root, path_stack)
         except (KeyError, IndexError):
             raise ArchiveError(f'Archive {self.entry_id} does not contain {reference}.')

-        return self._switch_root(
+        return await self._switch_root(
             self.replace(
                 archive=target, visited_path=self.visited_path.union({reference_url})
             ),
@@ -209,7 +234,7 @@ class GraphNode:
             reference_url,
         )

-    def _goto_remote(self, reference: str, resolve_inplace: bool) -> GraphNode:
+    async def _goto_remote(self, reference: str, resolve_inplace: bool) -> GraphNode:
         """
         Go to a remote archive, which can be either in the same server or another installation.
         """
@@ -255,11 +280,11 @@ class GraphNode:

         try:
             # now go to the target path
-            other_target = self.__goto_path(other_archive_root, path_stack)
+            other_target = await self.__goto_path(other_archive_root, path_stack)
         except (KeyError, IndexError):
             raise ArchiveError(f'Archive {other_entry_id} does not contain {path}.')

-        return self._switch_root(
+        return await self._switch_root(
             self.replace(
                 upload_id=other_upload_id,
                 entry_id=other_entry_id,
@@ -271,13 +296,15 @@ class GraphNode:
             reference_url,
         )

-    def _switch_root(self, node: GraphNode, resolve_inplace: bool, reference_url: str):
+    async def _switch_root(
+        self, node: GraphNode, resolve_inplace: bool, reference_url: str
+    ):
         if resolve_inplace:
             # place the target into the current result container
             return node

         # place the reference into the current result container
-        _populate_result(
+        await _populate_result(
             self.result_root,
             self.current_path,
             _convert_ref_to_path_string(reference_url),
@@ -289,22 +316,26 @@ class GraphNode:
         )

     @staticmethod
-    def __goto_path(target_root: ArchiveDict | dict, path_stack: list) -> Any:
+    async def __goto_path(target_root: ArchiveDict | dict, path_stack: list) -> Any:
         """
         Go to the specified path in the data.
         """
         for key in path_stack:
-            target_root = target_root[int(key) if key.isdigit() else key]
+            target_root = await goto_child(
+                target_root, int(key) if key.isdigit() else key
+            )
         return target_root


-def _if_exists(target_root: dict, path_stack: list) -> bool:
+async def _if_exists(target_root: dict, path_stack: list) -> bool:
     """
     Check if the specified path exists in the data.
     """
     try:
         for key in path_stack:
-            target_root = target_root[int(key) if key.isdigit() else key]
+            target_root = await goto_child(
+                target_root, int(key) if key.isdigit() else key
+            )
     except (KeyError, IndexError):
         return False

     return target_root is not None
@@ -365,7 +396,7 @@ def _to_response_config(config: RequestConfig, exclude: list = None, **kwargs):
     return response_config


-def _populate_result(container_root: dict, path: list, value, *, path_like=False):
+async def _populate_result(container_root: dict, path: list, value, *, path_like=False):
     """
     For the given path and the root of the target container, populate the value.
@@ -440,7 +471,7 @@
     # the target container does not necessarily have to be a dict or a list
     # if the result is stripped due to large size, it will be replaced by a string
-    new_value = to_json(value)
+    new_value = await async_to_json(value)

     if isinstance(target_container, list):
         assert isinstance(key_or_index, int)
         if target_container[key_or_index] is None:
@@ -815,13 +846,16 @@ class GeneralReader:
             path, set()
         ).add(config_hash)

-    def retrieve_user(self, user_id: str) -> str | dict:
+    async def retrieve_user(self, user_id: str) -> str | dict:
         # `me` is a convenient way to refer to the current user
         if user_id == 'me':
             user_id = self.user.user_id

+        def _retrieve():
+            return User.get(user_id=user_id)
+
         try:
-            user: User = User.get(user_id=user_id)
+            user: User = await asyncio.to_thread(_retrieve)
         except Exception as e:
             self._log(str(e), to_response=False)
             return user_id
@@ -835,7 +869,7 @@ class GeneralReader:

         return user.m_to_dict(with_out_meta=True, include_derived=True)

-    def _overwrite_upload(self, item: Upload):
+    async def _overwrite_upload(self, item: Upload):
         plain_dict = orjson.loads(upload_to_pydantic(item).json())
         if n_entries := plain_dict.pop('entries', None):
             plain_dict['n_entries'] = n_entries
@@ -845,17 +879,17 @@ class GeneralReader:
         )

         if main_author := plain_dict.pop('main_author', None):
-            plain_dict['main_author'] = self.retrieve_user(main_author)
+            plain_dict['main_author'] = await self.retrieve_user(main_author)
         for name in ('coauthors', 'reviewers', 'viewers', 'writers'):
             if (items := plain_dict.pop(name, None)) is not None:
-                plain_dict[name] = [self.retrieve_user(item) for item in items]
+                plain_dict[name] = [await self.retrieve_user(item) for item in items]

         return plain_dict

-    def retrieve_upload(self, upload_id: str) -> str | dict:
+    async def retrieve_upload(self, upload_id: str) -> str | dict:
         try:
-            upload: Upload = get_upload_with_read_access(
-                upload_id, self.user, include_others=True
+            upload: Upload = await asyncio.to_thread(
+                get_upload_with_read_access, upload_id, self.user, include_others=True
             )
         except HTTPException as e:
             if e.status_code == 404:
@@ -869,7 +903,7 @@ class GeneralReader:
             )
             return upload_id

-        return self._overwrite_upload(upload)
+        return await self._overwrite_upload(upload)

     @staticmethod
     def _overwrite_entry(item: Entry):
@@ -882,25 +916,29 @@ class GeneralReader:

         return plain_dict

-    def retrieve_entry(self, entry_id: str) -> str | dict:
-        if (
-            perform_search(
+    async def retrieve_entry(self, entry_id: str) -> str | dict:
+        def _search():
+            return perform_search(
                 owner='all', query={'entry_id': entry_id}, user_id=self.user.user_id
-            ).pagination.total
-            == 0
-        ):
+            )
+
+        if (await asyncio.to_thread(_search)).pagination.total == 0:
             self._log(
                 f'The value {entry_id} is not a valid entry id or not visible to current user.',
                 error_type=QueryError.NOACCESS,
             )
             return entry_id

-        return self._overwrite_entry(Entry.objects(entry_id=entry_id).first())
+        def _retrieve():
+            return Entry.objects(entry_id=entry_id).first()

-    def retrieve_dataset(self, dataset_id: str) -> str | dict:
-        if (
-            dataset := Dataset.m_def.a_mongo.objects(dataset_id=dataset_id).first()
-        ) is None:
+        return self._overwrite_entry(await asyncio.to_thread(_retrieve))
+
+    async def retrieve_dataset(self, dataset_id: str) -> str | dict:
+        def _retrieve():
+            return Dataset.m_def.a_mongo.objects(dataset_id=dataset_id).first()
+
+        if (dataset := await asyncio.to_thread(_retrieve)) is None:
             self._log(
                 f'The value {dataset_id} is not a valid dataset id.',
                 error_type=QueryError.NOTFOUND,
             )
@@ -940,7 +978,7 @@ class GeneralReader:
                 f'Archive {entry_id} does not exist in upload {entry_id}.'
             )

-    def _apply_resolver(self, node: GraphNode, config: RequestConfig):
+    async def _apply_resolver(self, node: GraphNode, config: RequestConfig):
         if_skip: bool = config.property_name not in GeneralReader.__UPLOAD_ID__
         if_skip &= config.property_name not in GeneralReader.__USER_ID__
         if_skip &= config.property_name not in GeneralReader.__DATASET_ID__
@@ -955,30 +993,31 @@ class GeneralReader:
             return node.archive

         if config.resolve_type is ResolveType.user:
-            return self.retrieve_user(node.archive)
+            return await self.retrieve_user(node.archive)
         if config.resolve_type is ResolveType.upload:
-            return self.retrieve_upload(node.archive)
+            return await self.retrieve_upload(node.archive)
         if config.resolve_type is ResolveType.entry:
-            return self.retrieve_entry(node.archive)
+            return await self.retrieve_entry(node.archive)
         if config.resolve_type is ResolveType.dataset:
-            return self.retrieve_dataset(node.archive)
+            return await self.retrieve_dataset(node.archive)

         return node.archive

-    def _resolve_list(self, node: GraphNode, config: RequestConfig):
+    async def _resolve_list(self, node: GraphNode, config: RequestConfig):
         # the original archive may be an empty list
         # populate an empty list to keep the structure
-        _populate_result(node.result_root, node.current_path, [])
+        await _populate_result(node.result_root, node.current_path, [])

         new_config: RequestConfig = config.new({'index': None}, retain_pattern=True)
         for i in _normalise_index(config.index, len(node.archive)):
-            self._resolve(
+            await self._resolve(
                 node.replace(
-                    archive=node.archive[i], current_path=node.current_path + [str(i)]
+                    archive=await goto_child(node.archive, i),
+                    current_path=node.current_path + [str(i)],
                 ),
                 new_config,
             )

-    def _walk(
+    async def _walk(
         self,
         node: GraphNode,
         required: dict | RequestConfig,
@@ -986,7 +1025,7 @@
     ):
         raise NotImplementedError()

-    def _resolve(
+    async def _resolve(
         self,
         node: GraphNode,
         config: RequestConfig,
@@ -1000,6 +1039,12 @@ class GeneralReader:
     def validate_config(cls, key: str, config: RequestConfig):
         raise NotImplementedError()

+    async def read(self, *args, **kwargs):
+        raise NotImplementedError()
+
+    def sync_read(self, *args, **kwargs):
+        return asyncio.run(self.read(*args, **kwargs))
+

 class MongoReader(GeneralReader):
     def __init__(self, *args, **kwargs):
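
`sync_read` is the compatibility bridge for callers that do not run inside an event loop — scripts, Celery tasks, and the test suites further down — and simply drives the coroutine to completion with `asyncio.run`. Note that `asyncio.run` raises `RuntimeError` when called from a thread that already has a running loop, so async callers must keep awaiting `read()` directly. A sketch of both call sites, with `query` and `user` assumed to be prepared as elsewhere in this patch:

    # synchronous caller: no event loop running, sync_read is fine
    with MongoReader(query, user=user) as reader:
        result = reader.sync_read()

    # asynchronous caller (e.g. a FastAPI endpoint): await read() directly;
    # calling sync_read() here would raise RuntimeError from asyncio.run()
    async def handler():
        with MongoReader(query, user=user) as reader:
            return await reader.read()
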
@@ -1008,7 +1053,7 @@ class MongoReader(GeneralReader):
         self.uploads = None
         self.datasets = None

-    def _query_es(self, config: RequestConfig):
+    async def _query_es(self, config: RequestConfig):
         search_params: dict = {
             'owner': 'user',
             'user_id': self.user.user_id,
@@ -1036,7 +1081,7 @@ class MongoReader(GeneralReader):
         if config.pagination and (not config.query or not config.query.pagination):  # type: ignore
             search_params['pagination'] = config.pagination

-        search_response = perform_search(**search_params)
+        search_response = await asyncio.to_thread(perform_search, **search_params)

         # overwrite the pagination to the new one from the search response
         config.pagination = search_response.pagination
@@ -1049,7 +1094,7 @@ class MongoReader(GeneralReader):
             v['entry_id']: _overwrite(v) for v in search_response.data
         }

-    def _query_entries(self, config: RequestConfig):
+    async def _query_entries(self, config: RequestConfig):
         if not config.query:
             return None, self.entries

@@ -1079,7 +1124,7 @@ class MongoReader(GeneralReader):

         return config.query.dict(exclude_unset=True), self.entries.filter(mongo_query)

-    def _query_uploads(self, config: RequestConfig):
+    async def _query_uploads(self, config: RequestConfig):
         if not config.query:
             return None, self.uploads

@@ -1107,7 +1152,7 @@ class MongoReader(GeneralReader):

         return config.query.dict(exclude_unset=True), self.uploads.filter(mongo_query)

-    def _query_datasets(self, config: RequestConfig):
+    async def _query_datasets(self, config: RequestConfig):
         if not config.query:
             return None, self.datasets

@@ -1131,7 +1176,7 @@ class MongoReader(GeneralReader):

         return config.query.dict(exclude_unset=True), self.datasets.filter(mongo_query)

-    def _normalise(
+    async def _normalise(
         self, mongo_result, config: RequestConfig, transformer: Callable
     ) -> tuple[dict, PaginationResponse | None]:
         """
@@ -1183,7 +1228,7 @@ class MongoReader(GeneralReader):
         if transformer == upload_to_pydantic:
             mongo_dict = {
                 v['upload_id']: v
-                for v in [self._overwrite_upload(item) for item in mongo_result]
+                for v in [await self._overwrite_upload(item) for item in mongo_result]
             }
         elif transformer == dataset_to_pydantic:
             mongo_dict = {
@@ -1200,7 +1245,7 @@ class MongoReader(GeneralReader):

         return mongo_dict, pagination_response

-    def read(self):
+    async def read(self):
         """
         All read() methods, including the ones in the subclasses, should return a dict as the response.
         It performs the following tasks:
@@ -1227,7 +1272,7 @@ class MongoReader(GeneralReader):
         self.entries = Entry.objects(upload_id__in=[v.upload_id for v in self.uploads])
         self.datasets = Dataset.m_def.a_mongo.objects(user_id=self.user.user_id)

-        self._walk(
+        await self._walk(
             GraphNode(
                 upload_id='__NOT_NEEDED__',
                 entry_id='__NOT_NEEDED__',
@@ -1249,14 +1294,14 @@ class MongoReader(GeneralReader):

         return response

-    def _walk(
+    async def _walk(
         self,
         node: GraphNode,
         required: dict | RequestConfig,
         parent_config: RequestConfig,
     ):
         if isinstance(required, RequestConfig):
-            return self._resolve(node, required)
+            return await self._resolve(node, required)

         has_config: bool = GeneralReader.__CONFIG__ in required
         has_wildcard: bool = GeneralReader.__WILDCARD__ in required
@@ -1269,7 +1314,7 @@ class MongoReader(GeneralReader):
             wildcard_config = required[GeneralReader.__WILDCARD__]
             if isinstance(wildcard_config, RequestConfig):
                 # use the wildcard config to filter the current scope
-                self._resolve(
+                await self._resolve(
                     node, wildcard_config, omit_keys=required.keys(), wildcard=True
                 )
             elif isinstance(node.archive, dict):
@@ -1283,7 +1328,7 @@ class MongoReader(GeneralReader):
                 required = required | extra_required
         elif has_config:
             # use the inherited/assigned config to filter the current scope
-            self._resolve(node, current_config, omit_keys=required.keys())
+            await self._resolve(node, current_config, omit_keys=required.keys())

         offload_pack: dict = {
             'user': self.user,
@@ -1296,49 +1341,49 @@ class MongoReader(GeneralReader):
             if key in (GeneralReader.__CONFIG__, GeneralReader.__WILDCARD__):
                 continue

-            def offload_read(reader_cls, *args, read_list=False):
+            async def offload_read(reader_cls, *args, read_list=False):
                 try:
                     with reader_cls(value, **offload_pack) as reader:
-                        _populate_result(
+                        await _populate_result(
                             node.result_root,
                             node.current_path + [key],
-                            [reader.read(item) for item in args[0]]
+                            [await reader.read(item) for item in args[0]]
                             if read_list
-                            else reader.read(*args),
+                            else await reader.read(*args),
                         )
                 except Exception as exc:
                     self._log(str(exc))

             if key == Token.RAW and self.__class__ is UploadReader:
                 # hitting the bottom of the current scope
-                offload_read(FileSystemReader, node.upload_id)
+                await offload_read(FileSystemReader, node.upload_id)
                 continue

             if key == Token.METADATA and self.__class__ is EntryReader:
                 # hitting the bottom of the current scope
-                offload_read(ElasticSearchReader, node.entry_id)
+                await offload_read(ElasticSearchReader, node.entry_id)
                 continue

             if key == Token.UPLOAD and self.__class__ is EntryReader:
                 # hitting the bottom of the current scope
-                offload_read(UploadReader, node.upload_id)
+                await offload_read(UploadReader, node.upload_id)
                 continue

             if key == Token.MAINFILE and self.__class__ is EntryReader:
                 # hitting the bottom of the current scope
-                offload_read(
+                await offload_read(
                     FileSystemReader, node.upload_id, node.archive['mainfile_path']
                 )
                 continue

             if key == Token.ARCHIVE and self.__class__ is EntryReader:
                 # hitting the bottom of the current scope
-                offload_read(ArchiveReader, node.upload_id, node.entry_id)
+                await offload_read(ArchiveReader, node.upload_id, node.entry_id)
                 continue

             if key == Token.ENTRIES and self.__class__ is ElasticSearchReader:
                 # hitting the bottom of the current scope
-                offload_read(EntryReader, node.archive['entry_id'])
+                await offload_read(EntryReader, node.archive['entry_id'])
                 continue

             if isinstance(node.archive, dict) and isinstance(value, dict):
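
`offload_read` is the point where one reader delegates a subtree to another (upload → file system, entry → archive, and so on); as an async closure it must now be awaited at every delegation site, and the `read_list` form awaits one `read()` per item, preserving input order. Condensed from the hunk above to its core, with error logging and result population omitted — `value` and `offload_pack` are the enclosing loop's variables:

    async def offload_read(reader_cls, *args, read_list=False):
        with reader_cls(value, **offload_pack) as reader:
            if read_list:
                # one sequential await per item; results keep the input order
                return [await reader.read(item) for item in args[0]]
            return await reader.read(*args)
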
@@ -1347,19 +1392,19 @@ class MongoReader(GeneralReader):
                 if key in GeneralReader.__ENTRY_ID__:
                     # offload to the upload reader if it is a nested query
                     if isinstance(entry_id := node.archive.get(key, None), str):
-                        offload_read(EntryReader, entry_id)
+                        await offload_read(EntryReader, entry_id)
                         continue

                 if key in GeneralReader.__UPLOAD_ID__:
                     # offload to the entry reader if it is a nested query
                     if isinstance(upload_id := node.archive.get(key, None), str):
-                        offload_read(UploadReader, upload_id)
+                        await offload_read(UploadReader, upload_id)
                         continue

                 if key in GeneralReader.__USER_ID__:
                     # offload to the user reader if it is a nested query
                     if user_id := node.archive.get(key, None):
-                        offload_read(
+                        await offload_read(
                             UserReader, user_id, read_list=isinstance(user_id, list)
                         )
                         continue
@@ -1371,11 +1416,11 @@ class MongoReader(GeneralReader):
             else:
                 child_config = current_config.new({'query': None, 'pagination': None})

-            def __offload_walk(query_set, transformer):
+            async def __offload_walk(query_set, transformer):
                 response_path: list = node.current_path + [key, Token.RESPONSE]

                 if isinstance(value, dict) and GeneralReader.__CONFIG__ in value:
-                    _populate_result(
+                    await _populate_result(
                         node.result_root,
                         response_path,
                         _to_response_config(
@@ -1385,24 +1430,26 @@ class MongoReader(GeneralReader):

                 query, filtered = query_set
                 if query is not None:
-                    _populate_result(node.result_root, response_path + ['query'], query)
+                    await _populate_result(
+                        node.result_root, response_path + ['query'], query
+                    )
                 if filtered is None:
                     return

-                result, pagination = self._normalise(
+                result, pagination = await self._normalise(
                     filtered, child_config, transformer
                 )
                 if pagination is not None:
                     pagination_dict = pagination.dict()
                     if pagination_dict.get('order_by', None) == 'mainfile':
                         pagination_dict['order_by'] = 'mainfile_path'
-                    _populate_result(
+                    await _populate_result(
                         node.result_root,
                         response_path + ['pagination'],
                         pagination_dict,
                     )

-                self._walk(
+                await self._walk(
                     node.replace(
                         archive={k: k for k in result}
                         if isinstance(value, RequestConfig)
@@ -1413,11 +1460,11 @@ class MongoReader(GeneralReader):
                     current_config,
                 )

-            if self._offload_walk(__offload_walk, child_config, key, value):
+            if await self._offload_walk(__offload_walk, child_config, key, value):
                 continue

             if len(node.current_path) > 0 and node.current_path[-1] in __M_SEARCHABLE__:
-                offload_read(__M_SEARCHABLE__[node.current_path[-1]], key)
+                await offload_read(__M_SEARCHABLE__[node.current_path[-1]], key)
                 continue

             # key may contain index, cached
@@ -1430,15 +1477,15 @@ class MongoReader(GeneralReader):
             child_path: list = node.current_path + [name]

             if isinstance(value, RequestConfig):
-                self._resolve(
+                await self._resolve(
                     node.replace(archive=child_archive, current_path=child_path), value
                 )
             elif isinstance(value, dict):
                 # should never reach here in most cases
                 # most mongo data is a 1-level tree
                 # second level implies it's delegated to another reader
-                def __walk(__archive, __path):
-                    self._walk(
+                async def __walk(__archive, __path):
+                    await self._walk(
                         node.replace(archive=__archive, current_path=__path),
                         value,
                         current_config,
@@ -1446,9 +1493,9 @@ class MongoReader(GeneralReader):

                 if isinstance(child_archive, list):
                     for i in _normalise_index(index, len(child_archive)):
-                        __walk(child_archive[i], child_path + [str(i)])
+                        await __walk(child_archive[i], child_path + [str(i)])
                 else:
-                    __walk(child_archive, child_path)
+                    await __walk(child_archive, child_path)
             elif isinstance(value, list):
                 # optionally support alternative syntax
                 pass
@@ -1456,23 +1503,23 @@ class MongoReader(GeneralReader):
             # should never reach here
             raise ConfigError(f'Invalid required config: {value}.')

-    def _offload_walk(
+    async def _offload_walk(
         self, offload_func: Callable, config: RequestConfig, key: str, value
     ) -> bool:
         if key == Token.SEARCH:
-            offload_func(self._query_es(config), None)
+            await offload_func(await self._query_es(config), None)
             return True
         if key == Token.ENTRY or key == Token.ENTRIES:
-            offload_func(self._query_entries(config), entry_to_pydantic)
+            await offload_func(await self._query_entries(config), entry_to_pydantic)
             return True
         if key == Token.UPLOAD or key == Token.UPLOADS:
-            offload_func(self._query_uploads(config), upload_to_pydantic)
+            await offload_func(await self._query_uploads(config), upload_to_pydantic)
             return True
         if key == Token.DATASET or key == Token.DATASETS:
-            offload_func(self._query_datasets(config), dataset_to_pydantic)
+            await offload_func(await self._query_datasets(config), dataset_to_pydantic)
             return True
         if key == Token.USER or key == Token.USERS:
-            offload_func(
+            await offload_func(
                 (
                     None,
                     {
@@ -1488,7 +1535,7 @@ class MongoReader(GeneralReader):

         return False

-    def _resolve(
+    async def _resolve(
         self,
         node: GraphNode,
         config: RequestConfig,
@@ -1497,13 +1544,15 @@ class MongoReader(GeneralReader):
         wildcard: bool = False,
     ):
         if isinstance(node.archive, list):
-            return self._resolve_list(node, config)
+            return await self._resolve_list(node, config)

         if not isinstance(node.archive, dict):
             # primitive type data is always included
             # this is not affected by size limit nor by depth limit
-            return _populate_result(
-                node.result_root, node.current_path, self._apply_resolver(node, config)
+            return await _populate_result(
+                node.result_root,
+                node.current_path,
+                await self._apply_resolver(node, config),
             )

         if (
@@ -1520,10 +1569,10 @@ class MongoReader(GeneralReader):
                     config=config,
                     global_root=self.global_root,
                 ) as reader:
-                    _populate_result(
+                    await _populate_result(
                         node.result_root,
                         node.current_path,
-                        reader.read(node.current_path[-1]),
+                        await reader.read(node.current_path[-1]),
                     )
             except Exception as e:
                 self._log(str(e))
@@ -1548,7 +1597,7 @@ class MongoReader(GeneralReader):
                 new_config['exclude'] = None
             # need to retain the include/exclude pattern for wildcard
-            self._resolve(
+            await self._resolve(
                 node.replace(
                     archive=node.archive[key],
                     current_path=node.current_path + [key],
@@ -1591,7 +1640,7 @@ class MongoReader(GeneralReader):

 class UploadReader(MongoReader):
     # noinspection PyMethodOverriding
-    def read(self, upload_id: str) -> dict:  # type: ignore
+    async def read(self, upload_id: str) -> dict:  # type: ignore
         response: dict = {}

         if self.global_root is None:
@@ -1601,10 +1650,10 @@ class UploadReader(MongoReader):
             has_global_root = True

         # if it is a string, no access
-        if isinstance(target_upload := self.retrieve_upload(upload_id), dict):
+        if isinstance(target_upload := await self.retrieve_upload(upload_id), dict):
             self.entries = Entry.objects(upload_id=upload_id)

-            self._walk(
+            await self._walk(
                 GraphNode(
                     upload_id=upload_id,
                     entry_id='__NOT_NEEDED__',
@@ -1649,7 +1698,7 @@ class UploadReader(MongoReader):

 class DatasetReader(MongoReader):
     # noinspection PyMethodOverriding
-    def read(self, dataset_id: str) -> dict:  # type: ignore
+    async def read(self, dataset_id: str) -> dict:  # type: ignore
         response: dict = {}

         if self.global_root is None:
@@ -1659,13 +1708,13 @@ class DatasetReader(MongoReader):
             has_global_root = True

         # if it is a string, no access
-        if isinstance(target_dataset := self.retrieve_dataset(dataset_id), dict):
+        if isinstance(target_dataset := await self.retrieve_dataset(dataset_id), dict):
             self.entries = Entry.objects(datasets=dataset_id)
             self.uploads = Upload.objects(
                 upload_id__in=list({v['upload_id'] for v in self.entries})
             )

-            self._walk(
+            await self._walk(
                 GraphNode(
                     upload_id='__NOT_NEEDED__',
                     entry_id='__NOT_NEEDED__',
@@ -1705,7 +1754,7 @@ class DatasetReader(MongoReader):

 class EntryReader(MongoReader):
     # noinspection PyMethodOverriding
-    def read(self, entry_id: str) -> dict:  # type: ignore
+    async def read(self, entry_id: str) -> dict:  # type: ignore
         response: dict = {}

         if self.global_root is None:
@@ -1715,10 +1764,10 @@ class EntryReader(MongoReader):
             has_global_root = True

         # if it is a string, no access
-        if isinstance(target_entry := self.retrieve_entry(entry_id), dict):
+        if isinstance(target_entry := await self.retrieve_entry(entry_id), dict):
             self.datasets = Dataset.m_def.a_mongo.objects(entries=entry_id)

-            self._walk(
+            await self._walk(
                 GraphNode(
                     upload_id=target_entry['upload_id'],
                     entry_id=entry_id,
@@ -1757,7 +1806,7 @@ class EntryReader(MongoReader):

 class ElasticSearchReader(EntryReader):
-    def retrieve_entry(self, entry_id: str) -> str | dict:
+    async def retrieve_entry(self, entry_id: str) -> str | dict:
         search_response = perform_search(
             owner='all', query={'entry_id': entry_id}, user_id=self.user.user_id
         )
@@ -1790,7 +1839,7 @@ class ElasticSearchReader(EntryReader):

 class UserReader(MongoReader):
     # noinspection PyMethodOverriding
-    def read(self, user_id: str):  # type: ignore
+    async def read(self, user_id: str):  # type: ignore
         response: dict = {}

         if self.global_root is None:
@@ -1821,14 +1870,14 @@ class UserReader(MongoReader):
             )
         )

-        self._walk(
+        await self._walk(
             GraphNode(
                 upload_id='__NOT_NEEDED__',
                 entry_id='__NOT_NEEDED__',
                 current_path=[],
                 result_root=response,
                 ref_result_root=self.global_root,
-                archive=self.retrieve_user(user_id),
+                archive=await self.retrieve_user(user_id),
                 archive_root=None,
                 definition=None,
                 visited_path=set(),
@@ -1861,7 +1910,7 @@ class FileSystemReader(GeneralReader):
         super().__init__(*args, **kwargs)
         self._root_path: list = []

-    def read(self, upload_id: str, path: str = None) -> dict:
+    async def read(self, upload_id: str, path: str = None) -> dict:
         self._root_path = [v for v in path.split('/') if v] if path else []

         response: dict = {}
@@ -1882,7 +1931,7 @@ class FileSystemReader(GeneralReader):
                 error_type=QueryError.NOACCESS,
             )
         else:
-            self._walk(
+            await self._walk(
                 GraphNode(
                     upload_id=upload_id,
                     entry_id='__NOT_NEEDED__',
@@ -1924,20 +1973,20 @@ class FileSystemReader(GeneralReader):
                 abs_path.append(pp)
         return abs_path

-    def _walk(
+    async def _walk(
         self,
         node: GraphNode,
         required: dict | RequestConfig,
         parent_config: RequestConfig,
     ):
         if isinstance(required, RequestConfig):
-            return self._resolve(node, required)
+            return await self._resolve(node, required)

         if GeneralReader.__CONFIG__ in required:
             # resolve the current tree if config is present
             # excluding explicitly assigned keys
             current_config: RequestConfig = required[GeneralReader.__CONFIG__]
-            self._resolve(node, current_config, omit_keys=required.keys())
+            await self._resolve(node, current_config, omit_keys=required.keys())
         else:
             current_config = parent_config

@@ -1946,16 +1995,18 @@ class FileSystemReader(GeneralReader):
         is_current_path_file: bool = node.archive.raw_path_is_file(full_path_str)

         if not is_current_path_file:
-            _populate_result(node.result_root, full_path + ['m_is'], 'Directory')
+            await _populate_result(node.result_root, full_path + ['m_is'], 'Directory')

         if Token.ENTRY in required:
             # implicit resolve
             if is_current_path_file and (
-                results := self._offload(
+                results := await self._offload(
                     node.upload_id, full_path_str, required[Token.ENTRY], current_config
                 )
             ):
-                _populate_result(node.result_root, full_path + [Token.ENTRY], results)
+                await _populate_result(
+                    node.result_root, full_path + [Token.ENTRY], results
+                )

         for key, value in required.items():
             if key == GeneralReader.__CONFIG__:
@@ -1968,7 +2019,7 @@ class FileSystemReader(GeneralReader):
             ):
                 continue

-            self._walk(
+            await self._walk(
                 node.replace(
                     current_path=child_path,
                     current_depth=node.current_depth + 1,
@@ -1977,7 +2028,7 @@ class FileSystemReader(GeneralReader):
                 current_config,
             )

-    def _resolve(
+    async def _resolve(
         self,
         node: GraphNode,
         config: RequestConfig,
@@ -1991,7 +2042,7 @@ class FileSystemReader(GeneralReader):
         os_path: str = '/'.join(abs_path)

         if not node.archive.raw_path_is_file(os_path):
-            _populate_result(node.result_root, full_path + ['m_is'], 'Directory')
+            await _populate_result(node.result_root, full_path + ['m_is'], 'Directory')

         ref_path = ['/'.join(self._root_path)]
         if ref_path[0]:
@@ -2021,7 +2072,7 @@ class FileSystemReader(GeneralReader):

             pagination['total'] = len(folders) + len(files)

-            _populate_result(
+            await _populate_result(
                 node.result_root,
                 full_path + [Token.RESPONSE],
                 _to_response_config(config, pagination=pagination),
@@ -2052,7 +2103,9 @@ class FileSystemReader(GeneralReader):
                 not file.path.endswith(os.path.sep + k) for k in omit_keys
             ):
                 if config.directive is DirectiveType.resolved and (
-                    resolved := self._offload(node.upload_id, file.path, config, config)
+                    resolved := await self._offload(
+                        node.upload_id, file.path, config, config
+                    )
                 ):
                     results[Token.ENTRY] = resolved

@@ -2064,7 +2117,9 @@ class FileSystemReader(GeneralReader):
             if not (result_path := ref_path + file_path[len(abs_path) :]):
                 result_path = [file_path[-1]]

-            _populate_result(node.result_root, result_path, results, path_like=True)
+            await _populate_result(
+                node.result_root, result_path, results, path_like=True
+            )

     @classmethod
     def validate_config(cls, key: str, config: RequestConfig):
@@ -2076,7 +2131,7 @@ class FileSystemReader(GeneralReader):

         return config

-    def _offload(
+    async def _offload(
         self, upload_id: str, main_file: str, required, parent_config: RequestConfig
     ) -> dict:
         if entry := Entry.objects(upload_id=upload_id, mainfile=main_file).first():
@@ -2087,7 +2142,7 @@ class FileSystemReader(GeneralReader):
                 config=parent_config,
                 global_root=self.global_root,
             ) as reader:
-                return reader.read(entry.entry_id)
+                return await reader.read(entry.entry_id)

         return {}

@@ -2115,11 +2170,11 @@ class ArchiveReader(GeneralReader):
     >>> user = {}
     >>> archive = {}
     >>> reader = ArchiveReader(query, user)
-    >>> result = reader.read(archive)
+    >>> result = reader.sync_read(archive)
     >>> reader.close() # important
     2. Use context manager.
     >>> with ArchiveReader(query, user) as reader:
-    >>>     result = reader.read(archive)
+    >>>     result = reader.sync_read(archive)
     3. Use static method.
     >>> result = ArchiveReader.read_required(query, user, archive)
     """
@@ -2153,7 +2208,7 @@ class ArchiveReader(GeneralReader):

         return False

-    def read(self, *args) -> dict:
+    async def read(self, *args) -> dict:
         """
         Read the given archive with the required fields.
         Takes two forms of arguments:
@@ -2162,7 +2217,7 @@ class ArchiveReader(GeneralReader):
         """
         archive = args[0] if len(args) == 1 else self.load_archive(*args)

-        metadata = archive['metadata']
+        metadata = await goto_child(archive, 'metadata')

         response: dict = {}

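
Archives loaded from storage are lazy containers, so even an innocent-looking `archive['metadata']` can trigger a disk read; routing the access through `goto_child` keeps that read off the event loop. The updated docstring above also shows the intended entry point for synchronous callers (`sync_read`). The access pattern, assuming `archive` comes from `self.load_archive` as above:

    # never index a lazy archive directly inside an async read()
    metadata = await goto_child(archive, 'metadata')                # one level
    upload_id = await goto_child(metadata, 'upload_id')             # another level
    entry_id = await goto_child(archive, ['metadata', 'entry_id'])  # or a whole path at once
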
""" if isinstance(required, RequestConfig): - return self._resolve(node, required) + return await self._resolve(node, required) if required.pop(GeneralReader.__WILDCARD__, None): self._log( @@ -2224,12 +2279,12 @@ class ArchiveReader(GeneralReader): if GeneralReader.__CONFIG__ in required: # keys explicitly given will be resolved later on during tree traversal # thus omit here to avoid duplicate resolve - self._resolve(node, current_config, omit_keys=required.keys()) + await self._resolve(node, current_config, omit_keys=required.keys()) # update node definition if required - node = self._check_definition(node, current_config) + node = await self._check_definition(node, current_config) # in case of a reference, resolve it implicitly - node = self._check_reference(node, current_config, implicit_resolve=True) + node = await self._check_reference(node, current_config, implicit_resolve=True) # walk through the required fields for key, value in required.items(): @@ -2249,10 +2304,10 @@ class ArchiveReader(GeneralReader): config=current_config, global_root=self.global_root, ) as reader: - _populate_result( + await _populate_result( node.result_root, node.current_path + [Token.DEF], - reader.read(node.definition), + await reader.read(node.definition), ) continue @@ -2260,7 +2315,7 @@ class ArchiveReader(GeneralReader): name, index = _parse_key(key) try: - child_archive = node.archive.get(name, None) + child_archive = await async_get(node.archive, name, None) except AttributeError as e: # implicit resolve failed, or wrong path given self._log(str(e), error_type=QueryError.NOTFOUND) @@ -2301,13 +2356,13 @@ class ArchiveReader(GeneralReader): if isinstance(value, RequestConfig): # this is a leaf, resolve it according to the config - self._resolve( + await self._resolve( child(current_path=child_path, archive=child_archive), value ) elif isinstance(value, dict): # this is a nested query, keep walking down the tree - def __walk(__path, __archive): - self._walk( + async def __walk(__path, __archive): + await self._walk( child(current_path=__path, archive=__archive), value, current_config, @@ -2316,10 +2371,10 @@ class ArchiveReader(GeneralReader): if is_list: # field[start:end]: dict for i in _normalise_index(index, len(child_archive)): - __walk(child_path + [str(i)], child_archive[i]) + await __walk(child_path + [str(i)], child_archive[i]) else: # field: dict - __walk(child_path, child_archive) + await __walk(child_path, child_archive) elif isinstance(value, list): # optionally support alternative syntax pass @@ -2327,7 +2382,7 @@ class ArchiveReader(GeneralReader): # should never reach here raise ConfigError(f'Invalid required config: {value}.') - def _resolve( + async def _resolve( self, node: GraphNode, config: RequestConfig, @@ -2345,12 +2400,12 @@ class ArchiveReader(GeneralReader): from nomad.archive.storage_v2 import ArchiveList as ArchiveListNew if isinstance(node.archive, (list, ArchiveList, ArchiveListNew)): - return self._resolve_list(node, config) + return await self._resolve_list(node, config) # no matter if to resolve, it is always necessary to replace the definition with potential custom definition - node = self._check_definition(node, config) + node = await self._check_definition(node, config) # if it needs to resolve, it is necessary to check references - node = self._check_reference( + node = await self._check_reference( node, config, implicit_resolve=omit_keys is not None ) @@ -2359,8 +2414,10 @@ class ArchiveReader(GeneralReader): if not isinstance(node.archive, (dict, ArchiveDict, 
             # primitive type data is always included
             # this is not affected by size limit nor by depth limit
-            _populate_result(
-                node.result_root, node.current_path, self._apply_resolver(node, config)
+            await _populate_result(
+                node.result_root,
+                node.current_path,
+                await self._apply_resolver(node, config),
             )
             return

@@ -2371,9 +2428,9 @@ class ArchiveReader(GeneralReader):
             result_to_write = (
                 f'__INTERNAL__:{node.generate_reference()}'
                 if self.__if_strip(node, config)
-                else self._apply_resolver(node, config)
+                else await self._apply_resolver(node, config)
             )
-            _populate_result(node.result_root, node.current_path, result_to_write)
+            await _populate_result(node.result_root, node.current_path, result_to_write)
             return

         for key in node.archive.keys():
@@ -2393,21 +2450,21 @@ class ArchiveReader(GeneralReader):
                 child_definition = child_definition.sub_section

             child_node = node.replace(
-                archive=node.archive[key],
+                archive=await goto_child(node.archive, key),
                 current_path=node.current_path + [key],
                 definition=child_definition,
                 current_depth=node.current_depth + 1,
             )

             if self.__if_strip(child_node, config, depth_check=is_subsection):
-                _populate_result(
+                await _populate_result(
                     node.result_root,
                     child_node.current_path,
                     f'__INTERNAL__:{child_node.generate_reference()}',
                 )
                 continue

-            self._resolve(
+            await self._resolve(
                 child_node,
                 config.new(
                     {
@@ -2419,7 +2476,9 @@ class ArchiveReader(GeneralReader):
                 ),
             )

-    def _check_definition(self, node: GraphNode, config: RequestConfig) -> GraphNode:
+    async def _check_definition(
+        self, node: GraphNode, config: RequestConfig
+    ) -> GraphNode:
         """
         Check the existence of custom definition.
         If positive, overwrite the corresponding information of the current node.
@@ -2429,19 +2488,19 @@ class ArchiveReader(GeneralReader):
         if not isinstance(node.archive, (dict, ArchiveDict, ArchiveDictNew)):
             return node

-        def __if_contains(m_def):
-            return _if_exists(
+        async def __if_contains(m_def):
+            return await _if_exists(
                 self.global_root, _convert_ref_to_path(m_def.strict_reference())
             )

-        custom_def: str | None = node.archive.get('m_def', None)
-        custom_def_id: str | None = node.archive.get('m_def_id', None)
+        custom_def: str | None = await async_get(node.archive, 'm_def', None)
+        custom_def_id: str | None = await async_get(node.archive, 'm_def_id', None)
         if custom_def is None and custom_def_id is None:
             if config.include_definition is DefinitionType.both:
                 definition = node.definition
                 if isinstance(definition, SubSection):
                     definition = definition.sub_section.m_resolved()
-                if not __if_contains(definition):
+                if not await __if_contains(definition):
                     with DefinitionReader(
                         RequestConfig(directive=DirectiveType.plain),
                         user=self.user,
@@ -2449,23 +2508,24 @@ class ArchiveReader(GeneralReader):
                         config=config,
                         global_root=self.global_root,
                     ) as reader:
-                        _populate_result(
+                        await _populate_result(
                             node.result_root,
                             node.current_path + [Token.DEF],
-                            reader.read(definition),
+                            await reader.read(definition),
                         )
             return node

         try:
-            new_def = self._retrieve_definition(custom_def, custom_def_id, node)
+            new_def = await self._retrieve_definition(custom_def, custom_def_id, node)
         except Exception as e:
             self._log(
                 f'Failed to retrieve definition: {e}', error_type=QueryError.NOTFOUND
             )
             return node

-        if config.include_definition is not DefinitionType.none and not __if_contains(
-            new_def
+        if (
+            config.include_definition is not DefinitionType.none
+            and not await __if_contains(new_def)
         ):
             with DefinitionReader(
                 RequestConfig(directive=DirectiveType.plain),
@@ -2474,15 +2534,15 @@ class ArchiveReader(GeneralReader):
                 config=config,
                 global_root=self.global_root,
             ) as reader:
-                _populate_result(
+                await _populate_result(
                     node.result_root,
                     node.current_path + [Token.DEF],
-                    reader.read(new_def),
+                    await reader.read(new_def),
                 )

         return node.replace(definition=new_def)

-    def _check_reference(
+    async def _check_reference(
         self, node: GraphNode, config: RequestConfig, *, implicit_resolve: bool = False
     ) -> GraphNode:
         """
@@ -2521,13 +2581,13 @@ class ArchiveReader(GeneralReader):
             return node

         try:
-            resolved_node = node.goto(node.archive, config.resolve_inplace)
+            resolved_node = await node.goto(node.archive, config.resolve_inplace)
         except ArchiveError as e:
             # cannot resolve somehow
             # treat it as a normal string
             # populate to the result
             self._log(str(e), error_type=QueryError.ARCHIVEERROR)
-            _populate_result(node.result_root, node.current_path, node.archive)
+            await _populate_result(node.result_root, node.current_path, node.archive)
             return node

         ref = original_def.type
@@ -2537,12 +2597,12 @@ class ArchiveReader(GeneralReader):
             else ref.target_section_def
         )
         # need to check custom definition again since the referenced archive may have a custom definition
-        return self._check_definition(
+        return await self._check_definition(
             resolved_node.replace(definition=target.m_resolved()), config
         )

     # noinspection PyUnusedLocal
-    def _retrieve_definition(
+    async def _retrieve_definition(
         self, m_def: str | None, m_def_id: str | None, node: GraphNode
     ):
         # todo: more flexible definition retrieval, accounting for definition id, mismatches, etc.
@@ -2576,10 +2636,12 @@ class ArchiveReader(GeneralReader):
         if m_def.startswith(('#/', '/')):
             # appears to be a local definition
             return __resolve_definition_in_archive(
-                to_json(node.archive_root['definitions']),
+                await async_to_json(
+                    await goto_child(node.archive_root, 'definitions')
+                ),
                 [v for v in m_def.split('/') if v not in ('', '#', 'definitions')],
-                node.archive_root['metadata']['upload_id'],
-                node.archive_root['metadata']['entry_id'],
+                await goto_child(node.archive_root, ['metadata', 'upload_id']),
+                await goto_child(node.archive_root, ['metadata', 'entry_id']),
             )
         # todo: !!!need to unify different formats!!!
         # check if m_def matches the pattern 'entry_id:example_id.example_section.example_quantity'
@@ -2589,7 +2651,7 @@ class ArchiveReader(GeneralReader):
             upload_id = Entry.objects(entry_id=entry_id).first().upload_id
             archive = self.load_archive(upload_id, entry_id)
             return __resolve_definition_in_archive(
-                to_json(archive['definitions']),
+                await async_to_json(await goto_child(archive, 'definitions')),
                 list(match.groups()[1:]),
                 upload_id,
                 entry_id,
@@ -2620,11 +2682,11 @@ class ArchiveReader(GeneralReader):
     A helper wrapper.
""" with ArchiveReader(required_query, user=user) as reader: - return reader.read(archive) + return reader.sync_read(archive) class DefinitionReader(GeneralReader): - def read(self, archive: Definition) -> dict: + async def read(self, archive: Definition) -> dict: response: dict = {Token.DEF: {}} if self.global_root is None: @@ -2633,7 +2695,7 @@ class DefinitionReader(GeneralReader): else: has_global_root = True - self._walk( + await self._walk( GraphNode( upload_id='__NONE__', entry_id='__NONE__', @@ -2658,28 +2720,29 @@ class DefinitionReader(GeneralReader): return response - def _walk( + async def _walk( self, node: GraphNode, required: dict | RequestConfig, parent_config: RequestConfig, ): if isinstance(required, RequestConfig): - return self._resolve( - self._switch_root(node, inplace=required.resolve_inplace), required + return await self._resolve( + await self._switch_root(node, inplace=required.resolve_inplace), + required, ) current_config: RequestConfig = required.get( GeneralReader.__CONFIG__, parent_config ) - node = self._switch_root(node, inplace=current_config.resolve_inplace) + node = await self._switch_root(node, inplace=current_config.resolve_inplace) # if it is a subtree, itself needs to be resolved if GeneralReader.__CONFIG__ in required: # keys explicitly given will be resolved later on during tree traversal # thus omit here to avoid duplicate resolve - self._resolve(node, current_config, omit_keys=required.keys()) + await self._resolve(node, current_config, omit_keys=required.keys()) def __convert(m_def): return _convert_ref_to_path_string(m_def.strict_reference()) @@ -2711,7 +2774,7 @@ class DefinitionReader(GeneralReader): for i in _normalise_index(index, len(child_def)): if child_def[i] is not node.archive: continue - _populate_result( + await _populate_result( node.result_root, child_path + [str(i)], __convert(child_def[i]) ) break # early return assuming children do not repeat @@ -2720,15 +2783,15 @@ class DefinitionReader(GeneralReader): # just write reference strings to the corresponding paths # whether they shall be resolved or not is determined by the config and will be handled later if isinstance(child_def, dict): - _populate_result(node.result_root, child_path, {}) + await _populate_result(node.result_root, child_path, {}) for k, v in child_def.items(): - _populate_result( + await _populate_result( node.result_root, child_path + [k], __convert(v) ) elif isinstance(child_def, (set, list)): - _populate_result(node.result_root, child_path, []) + await _populate_result(node.result_root, child_path, []) for i, v in enumerate(child_def): - _populate_result( + await _populate_result( node.result_root, child_path + [str(i)], __convert(v) ) else: @@ -2736,26 +2799,28 @@ class DefinitionReader(GeneralReader): raise # noqa: PLE0704 elif child_def is node.archive: assert isinstance(child_def, Definition) - _populate_result(node.result_root, child_path, __convert(child_def)) + await _populate_result( + node.result_root, child_path, __convert(child_def) + ) - def __handle_derived(__func): + async def __handle_derived(__func): if isinstance(child_def, dict): for _k, _v in child_def.items(): - __func(child_path + [_k], _v) + await __func(child_path + [_k], _v) elif isinstance(child_def, (set, list)): for _i, _v in enumerate(child_def): - __func(child_path + [str(_i)], _v) + await __func(child_path + [str(_i)], _v) else: # should never reach here raise # noqa: PLE0704 if isinstance(value, RequestConfig): # this is a leaf, resolve it according to the config - def 
                     if __archive is node.archive:
                         return
-                    self._resolve(
-                        self._switch_root(
+                    await self._resolve(
+                        await self._switch_root(
                             node.replace(current_path=__path, archive=__archive),
                             inplace=value.resolve_inplace,
                         ),
@@ -2764,18 +2829,18 @@ class DefinitionReader(GeneralReader):

                 if is_list:
                     for i in _normalise_index(index, len(child_def)):
-                        __resolve(child_path + [str(i)], child_def[i])
+                        await __resolve(child_path + [str(i)], child_def[i])
                 elif is_plain_container:
                     if value.directive is DirectiveType.resolved:
-                        __handle_derived(__resolve)
+                        await __handle_derived(__resolve)
                     else:
-                        __resolve(child_path, child_def)
+                        await __resolve(child_path, child_def)
                 elif isinstance(value, dict):
                     # this is a nested query, keep walking down the tree
-                    def __walk(__path, __archive):
+                    async def __walk(__path, __archive):
                         if __archive is node.archive:
                             return
-                        self._walk(
+                        await self._walk(
                             node.replace(current_path=__path, archive=__archive),
                             value,
                             current_config,
@@ -2784,11 +2849,11 @@ class DefinitionReader(GeneralReader):
                     if is_list:
                         # field[start:end]: dict
                         for i in _normalise_index(index, len(child_def)):
-                            __walk(child_path + [str(i)], child_def[i])
+                            await __walk(child_path + [str(i)], child_def[i])
                     elif is_plain_container:
-                        __handle_derived(__walk)
+                        await __handle_derived(__walk)
                     else:
-                        __walk(child_path, child_def)
+                        await __walk(child_path, child_def)
                 elif isinstance(value, list):
                     # optionally support alternative syntax
                     pass
@@ -2796,7 +2861,7 @@ class DefinitionReader(GeneralReader):
                 # should never reach here
                 raise ConfigError(f'Invalid required config: {value}.')

-    def _resolve(
+    async def _resolve(
         self,
         node: GraphNode,
         config: RequestConfig,
@@ -2805,7 +2870,7 @@ class DefinitionReader(GeneralReader):
         wildcard: bool = False,
     ):
         if isinstance(node.archive, list):
-            return self._resolve_list(node, config)
+            return await self._resolve_list(node, config)

         def __unwrap_ref(ref_type: Reference):
             """
@@ -2846,7 +2911,7 @@ class DefinitionReader(GeneralReader):
         # rewrite quantity type data with global reference
         if not self._check_cache(node.current_path, config.hash):
             self._cache_hash(node.current_path, config.hash)
-            _populate_result(
+            await _populate_result(
                 node.result_root,
                 node.current_path,
                 node.archive.m_to_dict(with_out_meta=True, transform=__override_path),
@@ -2858,10 +2923,10 @@ class DefinitionReader(GeneralReader):
             ref_str: str = target.strict_reference()
             path_stack: list = _convert_ref_to_path(ref_str)
             # check if it has been populated
-            if ref_str not in node.visited_path and not _if_exists(
+            if ref_str not in node.visited_path and not await _if_exists(
                 node.ref_result_root, path_stack
             ):
-                self._resolve(
+                await self._resolve(
                     node.replace(
                         archive=target,
                         current_path=path_stack,
@@ -2905,8 +2970,8 @@ class DefinitionReader(GeneralReader):
                 path_stack, config.hash
             ):
                 continue
-            self._resolve(
-                self._switch_root(
+            await self._resolve(
+                await self._switch_root(
                     node.replace(
                         archive=section,
                         current_path=node.current_path + [name, str(index)],
@@ -2919,7 +2984,7 @@ class DefinitionReader(GeneralReader):
             )

     @staticmethod
-    def _switch_root(node: GraphNode, *, inplace: bool) -> GraphNode:
+    async def _switch_root(node: GraphNode, *, inplace: bool) -> GraphNode:
         """
         Depending on whether to resolve in place, adapt the current root of the result tree.
         If NOT in place, write a global reference string to the current place, then switch to the referenced root.
@@ -2929,7 +2994,7 @@ class DefinitionReader(GeneralReader):
         ref_str: str = node.archive.strict_reference()

         if not isinstance(node.archive, Quantity):
-            _populate_result(
+            await _populate_result(
                 node.result_root,
                 node.current_path,
                 _convert_ref_to_path_string(ref_str),
diff --git a/tests/app/v1/routers/test_graph.py b/tests/app/v1/routers/test_graph.py
index e90f618bb076b91238bcb341149a9f6f5f2f2432..4e1beee2d52e29c09da29061caedfdd2d1ef48fa 100644
--- a/tests/app/v1/routers/test_graph.py
+++ b/tests/app/v1/routers/test_graph.py
@@ -607,7 +607,7 @@ def test_entry_reader_with_reference(
     example_archive, required, error, user1, example_upload
 ):
     with EntryReader({Token.ARCHIVE: required}, user=user1) as reader:
-        results = reader.read('test_id')
+        results = reader.sync_read('test_id')

     if error:
         assert 'm_errors' in results[Token.ARCHIVE]
diff --git a/tests/graph/test_definition_reader.py b/tests/graph/test_definition_reader.py
index 9e1e904e98e22eba03fcc8995216969ae87b2410..a6f28eca420fa2936461e6e2e6870fd5a433d3fc 100644
--- a/tests/graph/test_definition_reader.py
+++ b/tests/graph/test_definition_reader.py
@@ -507,5 +507,5 @@ def assert_dict(d1, d2):
 )
 def test_definition_reader(query, result):
     with DefinitionReader(query) as reader:
-        response = remove_cache(reader.read(m_def))
+        response = remove_cache(reader.sync_read(m_def))
         assert_dict(response, result)
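
The test suites call the readers from ordinary synchronous pytest functions, so they switch to `sync_read`, which spins up a fresh event loop per call. If the suite ever adopted the `pytest-asyncio` plugin, the same tests could await `read()` directly — shown here only as a hedged sketch, not as part of this patch:

    import pytest

    @pytest.mark.asyncio  # requires the pytest-asyncio plugin
    async def test_entry_reader_async(user1):
        with EntryReader({Token.ARCHIVE: {}}, user=user1) as reader:
            results = await reader.read('test_id')  # no sync_read wrapper needed
        assert results is not None
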
diff --git a/tests/graph/test_graph_reader.py b/tests/graph/test_graph_reader.py
index c15fd814901b88e4944932fe7be25dc23ba9650a..2f254db37a27e09d7b2c25ceb356229dcc5514ee 100644
--- a/tests/graph/test_graph_reader.py
+++ b/tests/graph/test_graph_reader.py
@@ -110,12 +110,12 @@ def test_remote_reference(json_dict, example_data_with_reference, user1):
     def __user_print(msg, required, *, result: dict = None):
         with UserReader(required, user=user1) as reader:
             if result:
-                assert_dict(reader.read(user1.user_id), result)
+                assert_dict(reader.sync_read(user1.user_id), result)
             else:
                 rprint(f'\n\nExample: {next(counter)} -> {msg}:')
                 rprint(required)
                 rprint('output:')
-                rprint(reader.read(user1.user_id))
+                rprint(reader.sync_read(user1.user_id))

     __user_print(
         'plain user',
@@ -832,12 +832,12 @@ def test_remote_reference(json_dict, example_data_with_reference, user1):
     def __upload_print(msg, required, *, result: dict = None):
         with UploadReader(required, user=user1) as reader:
             if result:
-                assert_dict(reader.read('id_published_with_ref'), result)
+                assert_dict(reader.sync_read('id_published_with_ref'), result)
             else:
                 rprint(f'\n\nExample: {next(counter)} -> {msg}:')
                 rprint(required)
                 rprint('output:')
-                rprint(reader.read('id_published_with_ref'))
+                rprint(reader.sync_read('id_published_with_ref'))

     __upload_print(
         'plain upload reader',
@@ -1247,16 +1247,16 @@ def test_remote_reference(json_dict, example_data_with_reference, user1):
     def __entry_print(msg, required, *, to_file: bool = False, result: dict = None):
         with EntryReader(required, user=user1) as reader:
             if result:
-                assert_dict(reader.read('id_03'), result)
+                assert_dict(reader.sync_read('id_03'), result)
             else:
                 rprint(f'\n\nExample: {next(counter)} -> {msg}:')
                 rprint(required)
                 if not to_file:
                     rprint('output:')
-                    rprint(reader.read('id_03'))
+                    rprint(reader.sync_read('id_03'))
                 else:
                     with open('entry_reader_test.json', 'w') as f:
-                        f.write(json.dumps(reader.read('id_03')))
+                        f.write(json.dumps(reader.sync_read('id_03')))

     __entry_print(
         'plain entry reader',
@@ -1655,12 +1655,12 @@ def test_remote_reference(json_dict, example_data_with_reference, user1):
     def __fs_print(msg, required, *, result: dict = None):
         with FileSystemReader(required, user=user1) as reader:
             if result:
-                assert_dict(reader.read('id_published_with_ref'), result)
+                assert_dict(reader.sync_read('id_published_with_ref'), result)
             else:
                 rprint(f'\n\nExample: {next(counter)} -> {msg}:')
                 rprint(required)
                 rprint('output:')
-                rprint(reader.read('id_published_with_ref'))
+                rprint(reader.sync_read('id_published_with_ref'))

     __fs_print(
         'plain file system reader',
@@ -2129,16 +2129,16 @@ def test_general_reader(json_dict, example_data_with_reference, user1):
     def __ge_print(msg, required, *, to_file: bool = False, result: dict = None):
         with MongoReader(required, user=user1) as reader:
             if result:
-                assert_dict(reader.read(), result)
+                assert_dict(reader.sync_read(), result)
             else:
                 rprint(f'\n\nExample: {next(counter)} -> {msg}:')
                 rprint(required)
                 if not to_file:
                     rprint('output:')
-                    rprint(reader.read())
+                    rprint(reader.sync_read())
                 else:
                     with open('archive_reader_test.json', 'w') as f:
-                        f.write(json.dumps(reader.read()))
+                        f.write(json.dumps(reader.sync_read()))

     __ge_print(
         'general start from entry',
@@ -2524,16 +2524,16 @@ def test_general_reader_search(json_dict, example_data_with_reference, user1):
     def __ge_print(msg, required, *, to_file: bool = False, result: dict = None):
         with MongoReader(required, user=user1) as reader:
             if result:
-                assert_dict(reader.read(), result)
+                assert_dict(reader.sync_read(), result)
             else:
                 rprint(f'\n\nExample: {next(counter)} -> {msg}:')
                 rprint(required)
                 if not to_file:
                     rprint('output:')
-                    rprint(reader.read())
+                    rprint(reader.sync_read())
                 else:
                     with open('archive_reader_test.json', 'w') as f:
-                        f.write(json.dumps(reader.read()))
+                        f.write(json.dumps(reader.sync_read()))

     __ge_print(
         'general start from elastic search',
@@ -2673,7 +2673,7 @@ def test_custom_schema_archive_and_definition(user1, custom_data):
     def __entry_print(msg, required, *, to_file: bool = False, result: dict = None):
         with EntryReader(required, user=user1) as reader:
-            response = reader.read('id_example')
+            response = reader.sync_read('id_example')
             if result:
                 assert_dict(response, result)
             else:
@@ -2774,12 +2774,12 @@ def test_custom_schema_archive_and_definition(user1, custom_data):
     def __fs_print(msg, required, *, result: dict = None):
         with FileSystemReader(required, user=user1) as reader:
             if result:
-                assert_dict(reader.read('id_custom'), result)
+                assert_dict(reader.sync_read('id_custom'), result)
             else:
                 rprint(f'\n\nExample: {next(counter)} -> {msg}:')
                 rprint(required)
                 rprint('output:')
-                rprint(reader.read('id_custom'))
+                rprint(reader.sync_read('id_custom'))

     __fs_print(
         'one level deep second page',