diff --git a/nomad/archive/query_reader.py b/nomad/archive/query_reader.py
index b02d85880a784a30f050871c5ed0d16bb7fb0d6c..e8a063817883a985b6b3a66d560349d45b963ab4 100644
--- a/nomad/archive/query_reader.py
+++ b/nomad/archive/query_reader.py
@@ -713,7 +713,8 @@ class GeneralReader:
             self.upload_pool[upload_id] = upload.upload_files
 
         try:
-            return self.upload_pool[upload_id].read_archive(entry_id)[entry_id]
+            with self.upload_pool[upload_id].read_archive(entry_id) as archive:
+                return archive[entry_id]
         except KeyError:
             raise ArchiveError(f'Archive {entry_id} does not exist in upload {entry_id}.')
 
diff --git a/nomad/archive/storage.py b/nomad/archive/storage.py
index 1bfe497b3665afb15a84a715dd3e0e5ab96e4e63..3424337891ba20394b3c3a85df480d37ce465ece 100644
--- a/nomad/archive/storage.py
+++ b/nomad/archive/storage.py
@@ -15,7 +15,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
+import os
 from typing import Iterable, Any, Tuple, Dict, BinaryIO, Union, List, cast
 from io import BytesIO, BufferedReader
 from collections.abc import Mapping, Sequence
@@ -26,7 +26,7 @@ from msgpack.fallback import Packer, StringIO
 import struct
 import json
 
-from nomad import utils
+from nomad import utils, config
 from nomad.config import archive
 
 __packer = msgpack.Packer(autoreset=True, use_bin_type=True)
@@ -464,7 +464,7 @@ class ArchiveReader(ArchiveDict):
         self._f.close()
 
     def is_closed(self):
-        return self._f.closed
+        return self._f.closed if isinstance(self._file_or_path, str) else True
 
 
 def write_archive(
@@ -552,6 +552,11 @@ def read_archive(file_or_path: Union[str, BytesIO], **kwargs) -> ArchiveReader:
     '''
    from .storage_v2 import ArchiveWriter as ArchiveWriterNew, ArchiveReader as ArchiveReaderNew
 
+    # if the file is smaller than the threshold, just read it into memory to avoid multiple small reads
+    if isinstance(file_or_path, str) and os.path.getsize(file_or_path) < 4 * config.archive.read_buffer_size:
+        with open(file_or_path, 'rb') as f:
+            file_or_path = BytesIO(f.read())
+
     if isinstance(file_or_path, str):
         with open(file_or_path, 'rb') as f:
             magic = f.read(ArchiveWriterNew.magic_len)
diff --git a/nomad/archive/storage_v2.py b/nomad/archive/storage_v2.py
index fa2fa501a0dec26e12d295d77052312ba2bbf84d..788faa4bb4ed0182b687ad48392e73079f3eaaf0 100644
--- a/nomad/archive/storage_v2.py
+++ b/nomad/archive/storage_v2.py
@@ -673,7 +673,9 @@ class ArchiveReader(ArchiveItem):
         self._f.close()
 
     def is_closed(self):
-        return self._f.closed
+        # If the input is a BytesIO, it is assumed that the file is always closed
+        # If the input is a path, need to check if the file is closed
+        return self._f.closed if isinstance(self._file_or_path, str) else True
 
     def to_json(self):
         if self._full_cache is None:
diff --git a/nomad/processing/data.py b/nomad/processing/data.py
index 331b4c32b63cea65952070413e6f79a39637054c..b9882d6f4cc4169d6ce9820c0f1d484c0dfabb62 100644
--- a/nomad/processing/data.py
+++ b/nomad/processing/data.py
@@ -930,15 +930,15 @@ class Entry(Proc):
             # parts that are referenced by section_metadata/EntryMetadata
             # TODO somehow it should determine which root sections too load from the metainfo
             # or configuration
-            archive = upload.upload_files.read_archive(self.entry_id)
-            entry_archive = archive[self.entry_id]
-            entry_archive_dict = {section_metadata: to_json(entry_archive[section_metadata])}
-            if section_workflow in entry_archive:
-                entry_archive_dict[section_workflow] = to_json(entry_archive[section_workflow])
-            if section_results in entry_archive:
-                entry_archive_dict[section_results] = to_json(entry_archive[section_results])
-            entry_metadata = datamodel.EntryArchive.m_from_dict(entry_archive_dict)[section_metadata]
-            self._apply_metadata_from_mongo(upload, entry_metadata)
+            with upload.upload_files.read_archive(self.entry_id) as archive:
+                entry_archive = archive[self.entry_id]
+                entry_archive_dict = {section_metadata: to_json(entry_archive[section_metadata])}
+                if section_workflow in entry_archive:
+                    entry_archive_dict[section_workflow] = to_json(entry_archive[section_workflow])
+                if section_results in entry_archive:
+                    entry_archive_dict[section_results] = to_json(entry_archive[section_results])
+                entry_metadata = datamodel.EntryArchive.m_from_dict(entry_archive_dict)[section_metadata]
+                self._apply_metadata_from_mongo(upload, entry_metadata)
         except KeyError:
             # Due to hard processing failures, it might be possible that an entry might not
             # have an archive. Return the metadata that is available.
diff --git a/tests/parsing/test_elabftw_parser.py b/tests/parsing/test_elabftw_parser.py
index 276e10ae8d8c1a25d753439437431493756cd775..b67b068cdce37529a04c92a2c412a1a9ee09c88f 100644
--- a/tests/parsing/test_elabftw_parser.py
+++ b/tests/parsing/test_elabftw_parser.py
@@ -36,8 +36,8 @@ def _assert_parsed_data(upload_id, entries, cls, parser, assert_fnc, mainfile_st
             f.read()
 
         try:
-            archive = upload_files.read_archive(entry.entry_id)
-            assert entry.entry_id in archive
+            with upload_files.read_archive(entry.entry_id) as archive:
+                assert entry.entry_id in archive
             if entry.entry_name and mainfile_str in entry.entry_name:
                 test_archive = EntryArchive(metadata=EntryMetadata(entry_id=entry.entry_id, upload_id=upload_id))
                 test_archive.m_context = Context()
diff --git a/tests/processing/test_data.py b/tests/processing/test_data.py
index cd2ec398ee26998019f413b7a3d0c686f37c7422..7d1b06ad9a230d1100efec8e03b986306f92f6c6 100644
--- a/tests/processing/test_data.py
+++ b/tests/processing/test_data.py
@@ -846,9 +846,9 @@ def test_creating_new_entries_during_processing(proc_infra, test_user):
         assert entry.process_status == ProcessStatus.SUCCESS
         if entry.mainfile.startswith('my_batch_'):
             idx = int(entry.mainfile.split('.')[0].split('_')[-1])
-            archive = upload_files.read_archive(entry.entry_id)[entry.entry_id]
-            assert archive['data']['batch_id'] == 'my_batch'
-            assert archive['data']['sample_number'] == idx
+            with upload_files.read_archive(entry.entry_id) as archive:
+                assert archive[entry.entry_id]['data']['batch_id'] == 'my_batch'
+                assert archive[entry.entry_id]['data']['sample_number'] == idx
 
 
 @pytest.mark.timeout(config.tests.default_timeout)
diff --git a/tests/test_files.py b/tests/test_files.py
index cdafd3ece2f086a5d4565cda5b620bf54bef3bfd..8dc5a3fddc6c5d67394c8dc86f6098ac88a9477d 100644
--- a/tests/test_files.py
+++ b/tests/test_files.py
@@ -565,9 +565,8 @@ def assert_upload_files(
             f.read()
 
         try:
-            archive = upload_files.read_archive(entry.entry_id)
-            assert entry.entry_id in archive
-
+            with upload_files.read_archive(entry.entry_id) as archive:
+                assert entry.entry_id in archive
         except KeyError:
             assert no_archive
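
Note for reviewers: the change standardizes on the context-manager form of read_archive, so the reader's file handle is closed deterministically when the block exits instead of whenever the reader object is garbage collected. A minimal sketch of the pattern, assuming the call-site names from this diff (the helper load_entry_dict is hypothetical; to_json is used the same way in the nomad/processing/data.py hunk above):

    def load_entry_dict(upload_files, entry_id):
        # Open the archive only for the duration of the read. On exit from
        # the with block the reader closes its handle and is_closed() reports
        # True (for BytesIO inputs is_closed() now always reports True, per
        # the storage.py and storage_v2.py hunks above).
        with upload_files.read_archive(entry_id) as archive:
            # Convert to a plain dict before the handle closes, so the result
            # stays usable after the with block.
            return to_json(archive[entry_id])

The small-file fast path in storage.py serves the same goal from the other side: files below 4 * config.archive.read_buffer_size are loaded into a BytesIO up front, so one sequential read replaces many small seek-and-read calls against the file system.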