From a033bd0bf04143d6aaa60f1633fd27126b90926d Mon Sep 17 00:00:00 2001 From: Markus Scheidgen <markus.scheidgen@gmail.com> Date: Fri, 5 Oct 2018 13:24:49 +0200 Subject: [PATCH] Added support for direct access to individual files in an upload. --- nomad/files.py | 209 ++++++++++++++++++++++++---------- nomad/processing/data.py | 33 +++--- tests/processing/test_data.py | 3 +- tests/test_files.py | 52 ++++++--- 4 files changed, 205 insertions(+), 92 deletions(-) diff --git a/nomad/files.py b/nomad/files.py index 44f76cf7a4..79952ba227 100644 --- a/nomad/files.py +++ b/nomad/files.py @@ -19,8 +19,11 @@ are basically paths. All major file system operations for dealing with uploaded files, archive, files, raw files, etc. should be part of this module to allow later introduction of real object storage systems. -Uploads -------- +.. note:: This module still uses ``os.path``. As long as the whole nomad runs on a +POSIX (or Windows) os everything should be fine. This means respective paths in the +dbs, and indices. In the future, this should be replaced with abstract path representations +ala ``PathLib``. + .. autoclass:: File :members: .. autoclass:: UploadFile @@ -51,6 +54,7 @@ class File: Attributes: logger: A structured logger with bucket and object information. + path: The abstract path of the file. """ def __init__(self, os_path: str = None) -> None: self.os_path = os_path @@ -61,11 +65,13 @@ class File: """ Adds context information to the given logger and returns it. """ return logger.bind(path=self.os_path) - def open(self, *args, **kwargs) -> IO: + @contextmanager + def open(self, *args, **kwargs) -> Generator[IO, None, None]: """ Opens the object with he given mode, etc. """ self.logger.debug('open file') try: - return open(self.os_path, *args, **kwargs) + with open(self.os_path, *args, **kwargs) as f: + yield f except FileNotFoundError: raise KeyError() @@ -86,6 +92,49 @@ class File: """ Returns the os determined file size. """ return os.stat(self.os_path).st_size + @property + def path(self) -> str: + return self.os_path + + +class ZippedFile(File): + """ A file contained in a .zip archive. """ + def __init__(self, zip_os_path: str, filename: str) -> None: + self.filename = filename + super().__init__(zip_os_path) + + def bind_logger(self, logger): + return super().bind_logger(logger).bind(filename=self.filename) + + @contextmanager + def open(self, *args, **kwargs) -> Generator[IO, None, None]: + self.logger.debug('open file') + try: + with ZipFile(self.os_path) as zip_file: + with zip_file.open(self.filename, *args, **kwargs) as f: + yield f + except FileNotFoundError: + raise KeyError() + except Exception as e: + msg = 'Could not read upload.' + self.logger.error(msg, exc_info=e) + raise FileError(msg, e) + + def delete(self) -> None: + assert False, "A file in a zip archive cannot be deleted." + + @property + def size(self) -> int: + with ZipFile(self.os_path) as zip_file: + return zip_file.getinfo(self.filename).file_size + + @property + def path(self) -> str: + return os.path.join( + os.path.dirname(self.os_path), + os.path.basename(self.os_path), + self.filename) + class Objects: @classmethod @@ -160,21 +209,28 @@ class FileError(Exception): class UploadFile(ObjectFile): """ - Instances represent an uploaded file in the *object storage*. Class is a conext - manager and supports the `with` statements. - In conext the upload will be extracted and contained files can be opened. - Some functions are only available for extracted uploads. 
+ Instances of ``UploadFile`` represent an uploaded file in the *'object storage'*. + + Currently only ``.zip`` files are supported. - Uploads are stored in their own *bucket*. + Uploads can be extracted to tmp storage (open/close), the list of files in + the upload is provided, and files can be opened for read. Extracting uploads + is optional, all functions in this module are also available without extracting. + + This class is a context manager, that extracts the file when using a ``with`` + statement with instances of this class. + + UploadFiles are stored in their own *bucket*. But, storage can be overridden + by providing a ``local_path``. This is useful when the file is already stored + in nomad's distributed file system, e.g. for bulk processing of already uploaded + files. Arguments: upload_id: The upload of this uploaded file. - local_path: Optional override for the path used to store/access the upload - on the server. This can be usefull to create uploads for files that - were not uploaded but put to the server in another way, e.g. offline - processing, syncing with other data, etc. + local_path: Optional override for the path used to store/access the uploaded file. Attributes: + is_extracted: True if the upload is extracted. upload_extract_dir: The path of the tmp directory with the extracted contents. filelist: A list of filenames relative to the .zipped upload root. """ @@ -190,7 +246,7 @@ class UploadFile(ObjectFile): local_path=local_path) self.upload_extract_dir: str = os.path.join(config.fs.tmp, 'uploads_extracted', upload_id) - self.filelist: List[str] = None + self._filelist: List[str] = None def bind_logger(self, logger): return super().bind_logger(logger).bind(upload_id=self.object_id) @@ -209,6 +265,37 @@ class UploadFile(ObjectFile): raise FileError(msg, e) return wrapper + @property + def filelist(self) -> List[str]: + @UploadFile.Decorators.handle_errors + def get_filelist(self): + with self._zip() as zip_file: + return [ + zip_info.filename for zip_info in zip_file.filelist + if not zip_info.filename.endswith('/')] + + if not self._filelist: + self._filelist = get_filelist(self) + + return self._filelist + + @property + def is_extracted(self) -> bool: + return os.path.exists(self.upload_extract_dir) + + @contextmanager + def _zip(self): + assert self.exists(), "Can only access uploaded file if it exists." + zip_file = None + try: + zip_file = ZipFile(self.os_path) + yield zip_file + except BadZipFile as e: + raise FileError('Upload is not a zip file', e) + finally: + if zip_file is not None: + zip_file.close() + @Decorators.handle_errors def hash(self) -> str: """ Calculates the first 28 bytes of a websafe base64 encoded SHA512 of the upload. """ @@ -226,18 +313,8 @@ class UploadFile(ObjectFile): """ os.makedirs(os.path.join(config.fs.tmp, 'uploads_extracted'), exist_ok=True) - zipFile = None - try: - zipFile = ZipFile(self.os_path) - zipFile.extractall(self.upload_extract_dir) - self.filelist = [ - zipInfo.filename for zipInfo in zipFile.filelist - if not zipInfo.filename.endswith('/')] - except BadZipFile as e: - raise FileError('Upload is not a zip file', e) - finally: - if zipFile is not None: - zipFile.close() + with self._zip() as zip_file: + zip_file.extractall(self.upload_extract_dir) self.logger.debug('extracted uploaded file') @@ -270,12 +347,32 @@ class UploadFile(ObjectFile): Only works on extracted uploads. The given filename must be one of the name in ``self.filelist``. 
""" - return File(os.path.join(self.upload_extract_dir, filename)) + if self.is_extracted: + return File(os.path.join(self.upload_extract_dir, filename)) + else: + return ZippedFile(self.os_path, filename) @property def is_valid(self): return is_zipfile(self.os_path) + def get_siblings(self, filename: str) -> Generator[str, None, None]: + """ + Returns the names of all files that share the same prefix (object id), + respectively are part of the same directory (incl. files in sub directories). + In nomad terms, the aux files the this file. Returned siblings are relative + to this files directory. + """ + dirname = os.path.dirname(filename) + dirname_len = len(dirname) + 1 + for other in self.filelist: + if other.startswith(dirname) and other != filename: + yield other[dirname_len:] + + def get_sibling_file(self, filename: str, sibling: str) -> File: + sibling_name = os.path.join(os.path.dirname(filename), sibling) + return self.get_file(sibling_name) + class ArchiveFile(ObjectFile): """ @@ -298,44 +395,40 @@ class ArchiveFile(ObjectFile): @contextmanager def write_archive_json(self) -> Generator[TextIO, None, None]: """ Context manager that yields a file-like to write the archive json. """ - if config.files.compress_archive: - binary_out = self.open('wb') - gzip_wrapper = cast(TextIO, gzip.open(binary_out, 'wt')) - out = gzip_wrapper - else: - binary_out = self.open('wb') - text_wrapper = io.TextIOWrapper(binary_out, encoding='utf-8') - out = text_wrapper + with self.open('wb') as binary_out: + if config.files.compress_archive: + gzip_wrapper = cast(TextIO, gzip.open(binary_out, 'wt')) + out = gzip_wrapper + else: + text_wrapper = io.TextIOWrapper(binary_out, encoding='utf-8') + out = text_wrapper - try: - yield out - finally: - out.flush() - out.close() - binary_out.close() + try: + yield out + finally: + out.flush() + out.close() self.logger.debug('archive file written') @contextmanager def read_archive_json(self) -> Generator[TextIO, None, None]: """ Context manager that yields a file-like to read the archive json. 
""" - try: - if config.files.compress_archive: - binary_in = self.open(mode='rb') - gzip_wrapper = cast(TextIO, gzip.open(binary_in, 'rt')) - in_file = gzip_wrapper - else: - binary_in = self.open(mode='rb') - text_wrapper = io.TextIOWrapper(binary_in, encoding='utf-8') - in_file = text_wrapper - except FileNotFoundError: - raise KeyError() - - try: - yield in_file - finally: - in_file.close() - binary_in.close() + with self.open(mode='rb') as binary_in: + try: + if config.files.compress_archive: + gzip_wrapper = cast(TextIO, gzip.open(binary_in, 'rt')) + in_file = gzip_wrapper + else: + text_wrapper = io.TextIOWrapper(binary_in, encoding='utf-8') + in_file = text_wrapper + except FileNotFoundError: + raise KeyError() + + try: + yield in_file + finally: + in_file.close() self.logger.debug('archive file read') diff --git a/nomad/processing/data.py b/nomad/processing/data.py index 8c83e0f350..9befc1affe 100644 --- a/nomad/processing/data.py +++ b/nomad/processing/data.py @@ -24,7 +24,7 @@ calculations, and files :members: """ -from typing import List, Any +from typing import List, Any, ContextManager from datetime import datetime from elasticsearch.exceptions import NotFoundError from mongoengine import StringField, BooleanField, DateTimeField, DictField, IntField @@ -81,6 +81,7 @@ class Calc(Proc): self._upload = None self._calc_proc_logwriter = None self._calc_proc_logfile = None + self._calc_proc_logwriter_ctx: ContextManager = None @classmethod def get(cls, id): @@ -130,9 +131,10 @@ class Calc(Proc): if self._calc_proc_logwriter is None: self._calc_proc_logfile = ArchiveLogFile(self.archive_id) - self._calc_proc_logwriter = self._calc_proc_logfile.open('wt') + self._calc_proc_logwriter_ctx = self._calc_proc_logfile.open('wt') + self._calc_proc_logwriter = self._calc_proc_logwriter_ctx.__enter__() # pylint: disable=E1101 - def save_to_cacl_log(logger, method_name, event_dict): + def save_to_calc_log(logger, method_name, event_dict): program = event_dict.get('normalizer', 'parser') event = event_dict.get('event', '') entry = '[%s] %s: %s' % (method_name, program, event) @@ -144,7 +146,7 @@ class Calc(Proc): self._calc_proc_logwriter.write('\n') return event_dict - return wrap_logger(logger, processors=[save_to_cacl_log]) + return wrap_logger(logger, processors=[save_to_calc_log]) @property def json_dict(self): @@ -251,7 +253,7 @@ class Calc(Proc): with utils.timer( logger, 'archived log', step='archive_log', input_size=self.mainfile_file.size) as log_data: - self._calc_proc_logwriter.close() + self._calc_proc_logwriter_ctx.__exit__(None, None, None) # pylint: disable=E1101 self._calc_proc_logwriter = None log_data.update(log_size=self._calc_proc_logfile.size) @@ -456,16 +458,17 @@ class Upload(Chord): for parser in parsers: try: potential_mainfile = self._upload.get_file(filename) - if parser.is_mainfile(filename, lambda fn: potential_mainfile.open()): - mainfile_path = potential_mainfile.os_path - calc = Calc.create( - archive_id='%s/%s' % (self.upload_hash, utils.hash(filename)), - mainfile=filename, parser=parser.name, - mainfile_tmp_path=mainfile_path, - upload_id=self.upload_id) - - calc.process() - total_calcs += 1 + with potential_mainfile.open() as mainfile_f: + if parser.is_mainfile(filename, lambda fn: mainfile_f): + mainfile_path = potential_mainfile.os_path + calc = Calc.create( + archive_id='%s/%s' % (self.upload_hash, utils.hash(filename)), + mainfile=filename, parser=parser.name, + mainfile_tmp_path=mainfile_path, + upload_id=self.upload_id) + + calc.process() + total_calcs 
+= 1 except Exception as e: self.error( 'exception while matching pot. mainfile', diff --git a/tests/processing/test_data.py b/tests/processing/test_data.py index 8e02576017..05e8988758 100644 --- a/tests/processing/test_data.py +++ b/tests/processing/test_data.py @@ -79,7 +79,8 @@ def assert_processing(upload: Upload): assert calc.status == 'SUCCESS', calc.archive_id assert ArchiveFile(calc.archive_id).exists() assert ArchiveLogFile(calc.archive_id).exists() - assert 'a test' in ArchiveLogFile(calc.archive_id).open('rt').read() + with ArchiveLogFile(calc.archive_id).open('rt') as f: + assert 'a test' in f.read() assert len(calc.errors) == 0 diff --git a/tests/test_files.py b/tests/test_files.py index 26885f9485..798a309b9e 100644 --- a/tests/test_files.py +++ b/tests/test_files.py @@ -44,9 +44,8 @@ def clear_files(): class TestObjects: @pytest.fixture() def existing_example_file(self, clear_files): - out = ObjectFile(example_bucket, 'example_file', ext='json').open(mode='wt') - json.dump(example_data, out) - out.close() + with ObjectFile(example_bucket, 'example_file', ext='json').open(mode='wt') as out: + json.dump(example_data, out) yield 'example_file', 'json' @@ -65,9 +64,8 @@ class TestObjects: name, ext = existing_example_file assert ObjectFile(example_bucket, name, ext).exists() - file = ObjectFile(example_bucket, name, ext=ext).open() - json.load(file) - file.close() + with ObjectFile(example_bucket, name, ext=ext).open() as f: + json.load(f) def test_delete(self, existing_example_file): name, ext = existing_example_file @@ -133,17 +131,29 @@ class TestUploadFile: shutil.copyfile(example_file, upload.os_path) yield upload - def test_upload(self, upload: UploadFile): + def assert_upload(self, upload: UploadFile): assert upload.exists() + assert len(upload.filelist) == 5 + has_json = False + for filename in upload.filelist: + the_file = upload.get_file(filename) + assert the_file.exists() + assert the_file.size >= 0 + if the_file.path.endswith('.json'): + has_json = True + assert the_file.size > 0 + with the_file.open() as f: + f.read() + break + assert has_json + + def test_upload_extracted(self, upload: UploadFile): with upload: - assert len(upload.filelist) == 5 - # now just try to open the first file (not directory), without error - for filename in upload.filelist: - the_file = upload.get_file(filename) - if the_file.os_path.endswith('.xml'): - the_file.open().close() - break + self.assert_upload(upload) + + def test_upload_not_extracted(self, upload: UploadFile): + self.assert_upload(upload) def test_delete_upload(self, upload: UploadFile): upload.delete() @@ -158,6 +168,12 @@ class TestUploadFile: with upload_same_file: assert hash == upload_same_file.hash() + def test_siblings(self, upload: UploadFile, no_warn): + with upload: + siblings = list(upload.get_siblings('examples_template/template.json')) + assert len(siblings) == 4 + assert all(sibling.endswith('.aux') for sibling in siblings) + class TestLocalUploadFile(TestUploadFile): @pytest.fixture() @@ -178,9 +194,8 @@ class TestLocalUploadFile(TestUploadFile): @pytest.fixture(scope='function') def archive_log(clear_files, archive_config): archive_log = ArchiveLogFile('__test_upload_hash/__test_calc_hash') - f = archive_log.open('wt') - f.write('This is a test') - f.close() + with archive_log.open('wt') as f: + f.write('This is a test') yield archive_log @@ -189,4 +204,5 @@ class TestArchiveLogFile: def test_archive_log_file(self, archive_log): assert archive_log.exists() - assert 'test' in archive_log.open('rt').read() + 
with archive_log.open('rt') as f: + assert 'test' in f.read() -- GitLab
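
Taken together, the patch turns ``File.open`` into a context manager and adds ``ZippedFile``, so individual files can be read straight from an uploaded .zip without extracting it first, while extraction via the ``with`` statement keeps working as before. The following is a minimal usage sketch, not part of the patch itself: the upload id is made up, while the class and method names and the ``examples_template/template.json`` file mirror the patched ``nomad.files`` module and its test fixtures::

    # Illustrative sketch only (not part of the patch). The upload id is
    # hypothetical; the mainfile name mirrors the patch's test fixtures.
    from nomad.files import UploadFile

    upload = UploadFile('some_upload_id')

    # Direct access without extraction: get_file() returns a ZippedFile
    # that reads the member straight from the uploaded .zip.
    for filename in upload.filelist:
        with upload.get_file(filename).open() as f:
            f.read()

    # With extraction: the context manager extracts to tmp storage, and
    # get_file()/get_sibling_file() return plain File objects from the
    # extracted directory.
    with upload:
        mainfile = 'examples_template/template.json'
        for sibling in upload.get_siblings(mainfile):
            with upload.get_sibling_file(mainfile, sibling).open() as f:
                f.read()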