diff --git a/nomad/files.py b/nomad/files.py index 72f743563a746d5bedd294190b6f875ddf9f3c8e..44f76cf7a4d461790925fa14fc8ef91e60f0544b 100644 --- a/nomad/files.py +++ b/nomad/files.py @@ -29,7 +29,7 @@ Uploads :members: """ -from typing import List, Any, Generator, IO, TextIO, cast +from typing import List, Generator, IO, TextIO, cast import os import os.path from zipfile import ZipFile, BadZipFile, is_zipfile @@ -42,6 +42,51 @@ import shutil from nomad import config, utils +class File: + """ + Base class for handling a file. Allows to open (read, write) and delete files. + + Arguments: + os_path: The path to the file in the os filesystem. + + Attributes: + logger: A structured logger with bucket and object information. + """ + def __init__(self, os_path: str = None) -> None: + self.os_path = os_path + + self.logger = self.bind_logger(utils.get_logger(__name__)) + + def bind_logger(self, logger): + """ Adds context information to the given logger and returns it. """ + return logger.bind(path=self.os_path) + + def open(self, *args, **kwargs) -> IO: + """ Opens the object with the given mode, etc. """ + self.logger.debug('open file') + try: + return open(self.os_path, *args, **kwargs) + except FileNotFoundError: + raise KeyError() + + def delete(self) -> None: + """ Deletes the file. """ + try: + os.remove(self.os_path) + self.logger.debug('file deleted') + except FileNotFoundError: + raise KeyError() + + def exists(self) -> bool: + """ Returns true if object exists. """ + return os.path.exists(self.os_path) + + @property + def size(self) -> int: + """ Returns the os determined file size. """ + return os.stat(self.os_path).st_size + + class Objects: @classmethod def _os_path(cls, bucket: str, name: str, ext: str) -> str: @@ -70,9 +115,11 @@ class Objects: pass -class File: +class ObjectFile(File): """ Base class for file objects. Allows to open (read, write) and delete objects. + File objects filesystem location is governed by its bucket, object_id, and ext. 
+ This object store location can be overridden with a local_path. Arguments: bucket (str): The 'bucket' for this object. @@ -82,42 +129,28 @@ class File: Attributes: logger: A structured logger with bucket and object information. + has_local_path: True, if this object is stored somewhere else in the fs. """ - def __init__(self, bucket: str, object_id: str, ext: str = None) -> None: + def __init__(self, bucket: str, object_id: str, ext: str = None, local_path: str = None) -> None: self.bucket = bucket self.object_id = object_id self.ext = ext - self.logger = self.bind_logger(utils.get_logger(__name__)) + self.has_local_path = local_path is not None + path = Objects._os_path(self.bucket, self.object_id, self.ext) + path = local_path if self.has_local_path else path + + super().__init__(path) def bind_logger(self, logger): """ Adds context information to the given logger and returns it. """ - return logger.bind(bucket=self.bucket, object=self.object_id) - - def open(self, *args, **kwargs) -> IO: - """ Opens the object with he given mode, etc. """ - self.logger.debug('open file') - try: - return open(self.os_path, *args, **kwargs) - except FileNotFoundError: - raise KeyError() + return super().bind_logger(logger).bind(bucket=self.bucket, object=self.object_id) def delete(self) -> None: - """ Deletes the file with the given object id. """ - try: - os.remove(self.os_path) - self.logger.debug('file deleted') - except FileNotFoundError: - raise KeyError() - - def exists(self) -> bool: - """ Returns true if object exists. """ - return os.path.exists(self.os_path) - - @property - def os_path(self) -> str: - """ The path of the object in the os filesystem. """ - return Objects._os_path(self.bucket, self.object_id, self.ext) + """ Deletes the file, if it does not have a local path. Local path files are never deleted. 
""" + # Do not delete local files, no matter what + if not self.has_local_path: + super().delete() class FileError(Exception): @@ -125,7 +158,7 @@ class FileError(Exception): super().__init__(msg, cause) -class UploadFile(File): +class UploadFile(ObjectFile): """ Instances represent an uploaded file in the *object storage*. Class is a conext manager and supports the `with` statements. @@ -153,11 +186,11 @@ class UploadFile(File): super().__init__( bucket=config.files.uploads_bucket, object_id=upload_id, - ext='zip') + ext='zip', + local_path=local_path) self.upload_extract_dir: str = os.path.join(config.fs.tmp, 'uploads_extracted', upload_id) self.filelist: List[str] = None - self._local_path = local_path def bind_logger(self, logger): return super().bind_logger(logger).bind(upload_id=self.object_id) @@ -176,10 +209,6 @@ class UploadFile(File): raise FileError(msg, e) return wrapper - @property - def os_path(self): - return self._local_path if self._local_path is not None else super().os_path - @Decorators.handle_errors def hash(self) -> str: """ Calculates the first 28 bytes of a websafe base64 encoded SHA512 of the upload. """ @@ -235,31 +264,20 @@ class UploadFile(File): def __exit__(self, exc_type, exc, exc_tb): self.remove_extract() - @Decorators.handle_errors - def open_file(self, filename: str, *args, **kwargs) -> IO[Any]: - """ Opens a file within an open upload and returns a file like. """ - return open(self.get_path(filename), *args, **kwargs) - - def get_path(self, filename: str) -> str: - """ Returns the tmp directory relative version of a filename. """ - return os.path.join(self.upload_extract_dir, filename) - - def delete(self) -> None: - """ Deletes the file with the given object id. 
""" - # Do not delete local files, no matter what - if self._local_path is None: - try: - os.remove(self.os_path) - self.logger.debug('file deleted') - except FileNotFoundError: - raise KeyError() + def get_file(self, filename: str) -> File: + """ + Returns a :class:`File` instance as a handle to the file with the given name. + Only works on extracted uploads. The given filename must be one of the + name in ``self.filelist``. + """ + return File(os.path.join(self.upload_extract_dir, filename)) @property def is_valid(self): return is_zipfile(self.os_path) -class ArchiveFile(File): +class ArchiveFile(ObjectFile): """ Represents the archive file for an individual calculation. Allows to write the archive, read the archive, delete the archive. @@ -331,7 +349,7 @@ class ArchiveFile(File): .debug('archive files deleted') -class ArchiveLogFile(File): +class ArchiveLogFile(ObjectFile): """ Represents a log file that was created for processing a single calculation to create an archive. diff --git a/nomad/processing/data.py b/nomad/processing/data.py index 73d844b13739e4d607096981cc58bdfe28722d68..bc8a9b81ce9847d235a5d172831d6095b66aaad6 100644 --- a/nomad/processing/data.py +++ b/nomad/processing/data.py @@ -34,7 +34,7 @@ import time from structlog import wrap_logger from nomad import config, utils -from nomad.files import UploadFile, ArchiveFile, ArchiveLogFile +from nomad.files import UploadFile, ArchiveFile, ArchiveLogFile, File from nomad.repo import RepoCalc from nomad.user import User from nomad.processing.base import Proc, Chord, process, task, PENDING, SUCCESS, FAILURE, RUNNING @@ -80,11 +80,16 @@ class Calc(Proc): self._parser_backend = None self._upload = None self._calc_proc_logwriter = None + self._calc_proc_logfile = None @classmethod def get(cls, id): return cls.get_by_id(id, 'archive_id') + @property + def mainfile_file(self) -> File: + return File(self.mainfile_tmp_path) + def delete(self): """ Delete this calculation and all associated data. 
This includes all files, @@ -123,7 +128,8 @@ class Calc(Proc): logger = self.get_logger(**kwargs) if self._calc_proc_logwriter is None: - self._calc_proc_logwriter = ArchiveLogFile(self.archive_id).open('wt') + self._calc_proc_logfile = ArchiveLogFile(self.archive_id) + self._calc_proc_logwriter = self._calc_proc_logfile.open('wt') def save_to_cacl_log(logger, method_name, event_dict): program = event_dict.get('normalizer', 'parser') @@ -178,8 +184,12 @@ class Calc(Proc): def parsing(self): logger = self.get_calc_logger(parser=self.parser) parser = parser_dict[self.parser] - with utils.timer(logger, 'parser executed', step=self.parser): + + with utils.timer( + logger, 'parser executed', step=self.parser, + input_size=self.mainfile_file.size): self._parser_backend = parser.run(self.mainfile_tmp_path, logger=logger) + if self._parser_backend.status[0] != 'ParseSuccess': logger.error(self._parser_backend.status[1]) error = self._parser_backend.status[1] @@ -190,8 +200,12 @@ class Calc(Proc): for normalizer in normalizers: normalizer_name = normalizer.__name__ logger = self.get_calc_logger(normalizer=normalizer_name) - with utils.timer(logger, 'normalizer executed', step=normalizer_name): + + with utils.timer( + logger, 'normalizer executed', step=normalizer_name, + input_size=self.mainfile_file.size): normalizer(self._parser_backend).normalize(logger=logger) + if self._parser_backend.status[0] != 'ParseSuccess': logger.error(self._parser_backend.status[1]) error = self._parser_backend.status[1] @@ -221,17 +235,27 @@ class Calc(Proc): calc_hash=calc_hash, upload_id=self.upload_id) - with utils.timer(logger, 'archived', step='archive'): + with utils.timer( + logger, 'archived', step='archive', + input_size=self.mainfile_file.size) as log_data: + # persist the archive - with ArchiveFile(self.archive_id).write_archive_json() as out: + archive_file = ArchiveFile(self.archive_id) + with archive_file.write_archive_json() as out: self._parser_backend.write_json(out, 
pretty=True) - with utils.timer(logger, 'archived log', step='archive_log'): - # close loghandler - if self._calc_proc_logwriter is not None: + log_data.update(archive_size=archive_file.size) + + # close loghandler + if self._calc_proc_logwriter is not None: + with utils.timer( + logger, 'archived log', step='archive_log', + input_size=self.mainfile_file.size) as log_data: self._calc_proc_logwriter.close() self._calc_proc_logwriter = None + log_data.update(log_size=self._calc_proc_logfile.size) + class Upload(Chord): """ @@ -399,8 +423,10 @@ class Upload(Chord): def extracting(self): logger = self.get_logger() try: - with utils.timer(logger, 'upload extracted', step='extracting'): - self._upload = UploadFile(self.upload_id, local_path=self.local_path) + self._upload = UploadFile(self.upload_id, local_path=self.local_path) + with utils.timer( + logger, 'upload extracted', step='extracting', + upload_size=self._upload.size): self._upload.extract() except KeyError as e: self.fail('process request for non existing upload', level=logging.INFO) @@ -421,17 +447,21 @@ class Upload(Chord): logger = self.get_logger() # TODO: deal with multiple possible parser specs - with utils.timer(logger, 'upload extracted', step='matching'): + with utils.timer( + logger, 'upload extracted', step='matching', + upload_size=self._upload.size, + upload_filecount=len(self._upload.filelist)): total_calcs = 0 for filename in self._upload.filelist: for parser in parsers: try: - if parser.is_mainfile(filename, lambda fn: self._upload.open_file(fn)): - tmp_mainfile = self._upload.get_path(filename) + potential_mainfile = self._upload.get_file(filename) + if parser.is_mainfile(filename, lambda fn: potential_mainfile.open()): + mainfile_path = potential_mainfile.os_path calc = Calc.create( archive_id='%s/%s' % (self.upload_hash, utils.hash(filename)), mainfile=filename, parser=parser.name, - mainfile_tmp_path=tmp_mainfile, + mainfile_tmp_path=mainfile_path, upload_id=self.upload_id) calc.process() 
@@ -450,13 +480,15 @@ class Upload(Chord): @task def cleanup(self): try: - with utils.timer(self.get_logger(), 'processing cleaned up', step='cleaning'): - upload = UploadFile(self.upload_id, local_path=self.local_path) + upload = UploadFile(self.upload_id, local_path=self.local_path) + with utils.timer( + self.get_logger(), 'processing cleaned up', step='cleaning', + upload_size=upload.size): + upload.remove_extract() except KeyError as e: self.fail('Upload does not exist', exc_info=e) return - upload.remove_extract() self.get_logger().debug('closed upload') @property diff --git a/nomad/utils.py b/nomad/utils.py index 11c7a2d879d67bec63f9a6af91f2227d5933d95a..1aa91c1468c0683ac3411dfe2d32c80baab4ff60 100644 --- a/nomad/utils.py +++ b/nomad/utils.py @@ -197,10 +197,23 @@ def lnr(logger, event, **kwargs): @contextmanager def timer(logger, event, method='info', **kwargs): + """ + A context manager that takes execution time and produces a log entry with said time. + + Arguments: + logger: The logger that should be used to produce the log entry. + event: The log message/event. + method: The log method that should be used. Must be a valid logger method name. + Default is 'info'. + **kwargs: Additional logger data that is passed to the log entry. + + Returns: + The method yields a dictionary that can be used to add further log data. 
+ """ start = time.time() try: - yield + yield kwargs finally: stop = time.time() diff --git a/tests/test_files.py b/tests/test_files.py index a1626f73e9c1696dad6f24ddd3fa33ad224f6206..6c1c414e7e1b4a3a3f77e13a0dcf7c6111a9f2a9 100644 --- a/tests/test_files.py +++ b/tests/test_files.py @@ -16,7 +16,7 @@ import pytest import json import shutil -from nomad.files import Objects, File, ArchiveFile, UploadFile, ArchiveLogFile +from nomad.files import Objects, ObjectFile, ArchiveFile, UploadFile, ArchiveLogFile from nomad import config # example_file uses an artificial parser for faster test execution, can also be @@ -44,29 +44,40 @@ def clear_files(): class TestObjects: @pytest.fixture() def existing_example_file(self, clear_files): - out = File(example_bucket, 'example_file', ext='json').open(mode='wt') + out = ObjectFile(example_bucket, 'example_file', ext='json').open(mode='wt') json.dump(example_data, out) out.close() yield 'example_file', 'json' + def test_size(self, existing_example_file): + name, ext = existing_example_file + assert ObjectFile(example_bucket, name, ext).size > 0 + + def test_exists(self, existing_example_file): + name, ext = existing_example_file + assert ObjectFile(example_bucket, name, ext).exists() + + def test_not_exists(self): + assert not ObjectFile(example_bucket, 'does_not_exist').exists() + def test_open(self, existing_example_file): name, ext = existing_example_file - assert File(example_bucket, name, ext).exists() - file = File(example_bucket, name, ext=ext).open() + assert ObjectFile(example_bucket, name, ext).exists() + file = ObjectFile(example_bucket, name, ext=ext).open() json.load(file) file.close() def test_delete(self, existing_example_file): name, ext = existing_example_file - File(example_bucket, name, ext).delete() - assert not File(example_bucket, name, ext).exists() + ObjectFile(example_bucket, name, ext).delete() + assert not ObjectFile(example_bucket, name, ext).exists() def test_delete_all(self, existing_example_file): 
name, ext = existing_example_file Objects.delete_all(example_bucket) - assert not File(example_bucket, name, ext).exists() + assert not ObjectFile(example_bucket, name, ext).exists() @pytest.fixture(scope='function', params=[False, True]) @@ -129,8 +140,10 @@ class TestUploadFile: assert len(upload.filelist) == 5 # now just try to open the first file (not directory), without error for filename in upload.filelist: - if filename.endswith('.xml'): - upload.open_file(filename).close() + the_file = upload.get_file(filename) + if the_file.os_path.endswith('.xml'): + the_file.open() + the_file.close() break def test_delete_upload(self, upload: UploadFile):