Commit 5c870f53 authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Added filesizes to APM.

parent 1934d86a
Pipeline #37482 failed with stages
in 6 minutes and 22 seconds
......@@ -29,7 +29,7 @@ Uploads
:members:
"""
from typing import List, Any, Generator, IO, TextIO, cast
from typing import List, Generator, IO, TextIO, cast
import os
import os.path
from zipfile import ZipFile, BadZipFile, is_zipfile
......@@ -42,6 +42,51 @@ import shutil
from nomad import config, utils
class File:
    """
    Base class for handling a file. Allows to open (read, write) and delete files.

    Arguments:
        os_path: The path to the file in the os filesystem.

    Attributes:
        logger: A structured logger with path information.
    """
    def __init__(self, os_path: str = None) -> None:
        self.os_path = os_path

        self.logger = self.bind_logger(utils.get_logger(__name__))

    def bind_logger(self, logger):
        """ Adds context information to the given logger and returns it. """
        return logger.bind(path=self.os_path)

    def open(self, *args, **kwargs) -> IO:
        """ Opens the file with the given mode, etc. Raises KeyError if it does not exist. """
        self.logger.debug('open file')
        try:
            return open(self.os_path, *args, **kwargs)
        except FileNotFoundError as e:
            # Include the missing path and keep the original cause for debugging.
            raise KeyError(self.os_path) from e

    def delete(self) -> None:
        """ Deletes the file. Raises KeyError if it does not exist. """
        try:
            os.remove(self.os_path)
            self.logger.debug('file deleted')
        except FileNotFoundError as e:
            raise KeyError(self.os_path) from e

    def exists(self) -> bool:
        """ Returns true if the file exists. """
        return os.path.exists(self.os_path)

    @property
    def size(self) -> int:
        """ Returns the os determined file size. """
        return os.stat(self.os_path).st_size
class Objects:
@classmethod
def _os_path(cls, bucket: str, name: str, ext: str) -> str:
......@@ -70,9 +115,11 @@ class Objects:
pass
class File:
class ObjectFile(File):
"""
Base class for file objects. Allows to open (read, write) and delete objects.
File objects filesystem location is govern by its bucket, object_id, and ext.
This object store location can be overriden with a local_path.
Arguments:
bucket (str): The 'bucket' for this object.
......@@ -82,42 +129,28 @@ class File:
Attributes:
logger: A structured logger with bucket and object information.
has_local_path: True, if this object is stored somewhere else in the fs.
"""
def __init__(self, bucket: str, object_id: str, ext: str = None) -> None:
def __init__(self, bucket: str, object_id: str, ext: str = None, local_path: str = None) -> None:
    """
    Initializes the object file.

    Arguments:
        bucket: The 'bucket' for this object.
        object_id: The id of the object within the bucket.
        ext: An optional file extension.
        local_path: Optional override; if given the file lives at this
            filesystem path instead of the object-store location.
    """
    self.bucket = bucket
    self.object_id = object_id
    self.ext = ext

    self.has_local_path = local_path is not None
    # Resolve the object-store path, but prefer an explicit local path.
    path = Objects._os_path(self.bucket, self.object_id, self.ext)
    path = local_path if self.has_local_path else path

    # The base class binds the logger; binding it any earlier would run
    # before os_path is set.
    super().__init__(path)
def bind_logger(self, logger):
    """ Adds bucket and object information to the given logger and returns it. """
    # open/delete/exists are inherited from the base class; only the extra
    # context is added here.
    return super().bind_logger(logger).bind(bucket=self.bucket, object=self.object_id)
def delete(self) -> None:
    """ Deletes the file, if it has not a localpath. Localpath files are never deleted. """
    # Do not delete local files, no matter what
    if not self.has_local_path:
        super().delete()
class FileError(Exception):
......@@ -125,7 +158,7 @@ class FileError(Exception):
super().__init__(msg, cause)
class UploadFile(File):
class UploadFile(ObjectFile):
"""
Instances represent an uploaded file in the *object storage*. Class is a conext
manager and supports the `with` statements.
......@@ -153,11 +186,11 @@ class UploadFile(File):
super().__init__(
bucket=config.files.uploads_bucket,
object_id=upload_id,
ext='zip')
ext='zip',
local_path=local_path)
self.upload_extract_dir: str = os.path.join(config.fs.tmp, 'uploads_extracted', upload_id)
self.filelist: List[str] = None
self._local_path = local_path
def bind_logger(self, logger):
    """ Adds the upload id to the logger context. """
    contextual_logger = super().bind_logger(logger)
    return contextual_logger.bind(upload_id=self.object_id)
......@@ -176,10 +209,6 @@ class UploadFile(File):
raise FileError(msg, e)
return wrapper
@property
def os_path(self):
return self._local_path if self._local_path is not None else super().os_path
@Decorators.handle_errors
def hash(self) -> str:
""" Calculates the first 28 bytes of a websafe base64 encoded SHA512 of the upload. """
......@@ -235,31 +264,20 @@ class UploadFile(File):
def __exit__(self, exc_type, exc, exc_tb):
    # Leaving the `with` block: always remove the extracted files, even on error.
    self.remove_extract()
@Decorators.handle_errors
def open_file(self, filename: str, *args, **kwargs) -> IO[Any]:
""" Opens a file within an open upload and returns a file like. """
return open(self.get_path(filename), *args, **kwargs)
def get_path(self, filename: str) -> str:
    """ Returns the tmp directory relative version of a filename. """
    # Path of the given file inside the upload's extraction directory.
    return os.path.join(self.upload_extract_dir, filename)
def delete(self) -> None:
""" Deletes the file with the given object id. """
# Do not delete local files, no matter what
if self._local_path is None:
try:
os.remove(self.os_path)
self.logger.debug('file deleted')
except FileNotFoundError:
raise KeyError()
def get_file(self, filename: str) -> File:
    """
    Returns a :class:`File` instance as a handle to the file with the given name.
    Only works on extracted uploads. The given filename must be one of the
    names in ``self.filelist``.
    """
    return File(os.path.join(self.upload_extract_dir, filename))
@property
def is_valid(self):
    """ True if the upload file is a valid zip archive. """
    return is_zipfile(self.os_path)
class ArchiveFile(File):
class ArchiveFile(ObjectFile):
"""
Represents the archive file for an individual calculation. Allows to write the
archive, read the archive, delete the archive.
......@@ -331,7 +349,7 @@ class ArchiveFile(File):
.debug('archive files deleted')
class ArchiveLogFile(File):
class ArchiveLogFile(ObjectFile):
"""
Represents a log file that was created for processing a single calculation to create
an archive.
......
......@@ -34,7 +34,7 @@ import time
from structlog import wrap_logger
from nomad import config, utils
from nomad.files import UploadFile, ArchiveFile, ArchiveLogFile
from nomad.files import UploadFile, ArchiveFile, ArchiveLogFile, File
from nomad.repo import RepoCalc
from nomad.user import User
from nomad.processing.base import Proc, Chord, process, task, PENDING, SUCCESS, FAILURE, RUNNING
......@@ -80,11 +80,16 @@ class Calc(Proc):
self._parser_backend = None
self._upload = None
self._calc_proc_logwriter = None
self._calc_proc_logfile = None
@classmethod
def get(cls, id):
    """ Returns the Calc with the given archive id. """
    # NOTE(review): `id` shadows the builtin; kept to preserve the public interface.
    return cls.get_by_id(id, 'archive_id')
@property
def mainfile_file(self) -> File:
    # Handle on the calculation's mainfile (via its tmp path), e.g. to read its size.
    return File(self.mainfile_tmp_path)
def delete(self):
"""
Delete this calculation and all associated data. This includes all files,
......@@ -123,7 +128,8 @@ class Calc(Proc):
logger = self.get_logger(**kwargs)
if self._calc_proc_logwriter is None:
self._calc_proc_logwriter = ArchiveLogFile(self.archive_id).open('wt')
self._calc_proc_logfile = ArchiveLogFile(self.archive_id)
self._calc_proc_logwriter = self._calc_proc_logfile.open('wt')
def save_to_cacl_log(logger, method_name, event_dict):
program = event_dict.get('normalizer', 'parser')
......@@ -178,8 +184,12 @@ class Calc(Proc):
def parsing(self):
logger = self.get_calc_logger(parser=self.parser)
parser = parser_dict[self.parser]
with utils.timer(logger, 'parser executed', step=self.parser):
with utils.timer(
logger, 'parser executed', step=self.parser,
input_size=self.mainfile_file.size):
self._parser_backend = parser.run(self.mainfile_tmp_path, logger=logger)
if self._parser_backend.status[0] != 'ParseSuccess':
logger.error(self._parser_backend.status[1])
error = self._parser_backend.status[1]
......@@ -190,8 +200,12 @@ class Calc(Proc):
for normalizer in normalizers:
normalizer_name = normalizer.__name__
logger = self.get_calc_logger(normalizer=normalizer_name)
with utils.timer(logger, 'normalizer executed', step=normalizer_name):
with utils.timer(
logger, 'normalizer executed', step=normalizer_name,
input_size=self.mainfile_file.size):
normalizer(self._parser_backend).normalize(logger=logger)
if self._parser_backend.status[0] != 'ParseSuccess':
logger.error(self._parser_backend.status[1])
error = self._parser_backend.status[1]
......@@ -221,17 +235,27 @@ class Calc(Proc):
calc_hash=calc_hash,
upload_id=self.upload_id)
with utils.timer(logger, 'archived', step='archive'):
with utils.timer(
logger, 'archived', step='archive',
input_size=self.mainfile_file.size) as log_data:
# persist the archive
with ArchiveFile(self.archive_id).write_archive_json() as out:
archive_file = ArchiveFile(self.archive_id)
with archive_file.write_archive_json() as out:
self._parser_backend.write_json(out, pretty=True)
with utils.timer(logger, 'archived log', step='archive_log'):
# close loghandler
if self._calc_proc_logwriter is not None:
log_data.update(archive_size=archive_file.size)
# close loghandler
if self._calc_proc_logwriter is not None:
with utils.timer(
logger, 'archived log', step='archive_log',
input_size=self.mainfile_file.size) as log_data:
self._calc_proc_logwriter.close()
self._calc_proc_logwriter = None
log_data.update(log_size=self._calc_proc_logfile.size)
class Upload(Chord):
"""
......@@ -399,8 +423,10 @@ class Upload(Chord):
def extracting(self):
logger = self.get_logger()
try:
with utils.timer(logger, 'upload extracted', step='extracting'):
self._upload = UploadFile(self.upload_id, local_path=self.local_path)
self._upload = UploadFile(self.upload_id, local_path=self.local_path)
with utils.timer(
logger, 'upload extracted', step='extracting',
upload_size=self._upload.size):
self._upload.extract()
except KeyError as e:
self.fail('process request for non existing upload', level=logging.INFO)
......@@ -421,17 +447,21 @@ class Upload(Chord):
logger = self.get_logger()
# TODO: deal with multiple possible parser specs
with utils.timer(logger, 'upload extracted', step='matching'):
with utils.timer(
logger, 'upload extracted', step='matching',
upload_size=self._upload.size,
upload_filecount=len(self._upload.filelist)):
total_calcs = 0
for filename in self._upload.filelist:
for parser in parsers:
try:
if parser.is_mainfile(filename, lambda fn: self._upload.open_file(fn)):
tmp_mainfile = self._upload.get_path(filename)
potential_mainfile = self._upload.get_file(filename)
if parser.is_mainfile(filename, lambda fn: potential_mainfile.open()):
mainfile_path = potential_mainfile.os_path
calc = Calc.create(
archive_id='%s/%s' % (self.upload_hash, utils.hash(filename)),
mainfile=filename, parser=parser.name,
mainfile_tmp_path=tmp_mainfile,
mainfile_tmp_path=mainfile_path,
upload_id=self.upload_id)
calc.process()
......@@ -450,13 +480,15 @@ class Upload(Chord):
@task
def cleanup(self):
    """ Removes the extracted upload files after processing has finished. """
    try:
        upload = UploadFile(self.upload_id, local_path=self.local_path)
        with utils.timer(
                self.get_logger(), 'processing cleaned up', step='cleaning',
                upload_size=upload.size):
            upload.remove_extract()
    except KeyError as e:
        self.fail('Upload does not exist', exc_info=e)
        return

    self.get_logger().debug('closed upload')
@property
......
......@@ -197,10 +197,23 @@ def lnr(logger, event, **kwargs):
@contextmanager
def timer(logger, event, method='info', **kwargs):
"""
A context manager that takes execution time and produces a log entry with said time.
Arguments:
logger: The logger that should be used to produce the log entry.
event: The log message/event.
method: The log methad that should be used. Must be a valid logger method name.
Default is 'info'.
**kwargs: Aditional logger data that is passed to the log entry.
Returns:
The method yields a dictionary that can be used to add further log data.
"""
start = time.time()
try:
yield
yield kwargs
finally:
stop = time.time()
......
......@@ -16,7 +16,7 @@ import pytest
import json
import shutil
from nomad.files import Objects, File, ArchiveFile, UploadFile, ArchiveLogFile
from nomad.files import Objects, ObjectFile, ArchiveFile, UploadFile, ArchiveLogFile
from nomad import config
# example_file uses an artificial parser for faster test execution, can also be
......@@ -44,29 +44,40 @@ def clear_files():
class TestObjects:

    @pytest.fixture()
    def existing_example_file(self, clear_files):
        """ Creates an example json object file and yields its (name, ext) pair. """
        out = ObjectFile(example_bucket, 'example_file', ext='json').open(mode='wt')
        json.dump(example_data, out)
        out.close()

        yield 'example_file', 'json'

    def test_size(self, existing_example_file):
        name, ext = existing_example_file
        assert ObjectFile(example_bucket, name, ext).size > 0

    def test_exists(self, existing_example_file):
        name, ext = existing_example_file
        assert ObjectFile(example_bucket, name, ext).exists()

    def test_not_exists(self):
        assert not ObjectFile(example_bucket, 'does_not_exist').exists()

    def test_open(self, existing_example_file):
        name, ext = existing_example_file

        assert ObjectFile(example_bucket, name, ext).exists()
        file = ObjectFile(example_bucket, name, ext=ext).open()
        json.load(file)
        file.close()

    def test_delete(self, existing_example_file):
        name, ext = existing_example_file
        ObjectFile(example_bucket, name, ext).delete()
        assert not ObjectFile(example_bucket, name, ext).exists()

    def test_delete_all(self, existing_example_file):
        name, ext = existing_example_file
        Objects.delete_all(example_bucket)
        assert not ObjectFile(example_bucket, name, ext).exists()
@pytest.fixture(scope='function', params=[False, True])
......@@ -129,8 +140,10 @@ class TestUploadFile:
assert len(upload.filelist) == 5
# now just try to open the first file (not directory), without error
for filename in upload.filelist:
if filename.endswith('.xml'):
upload.open_file(filename).close()
the_file = upload.get_file(filename)
if the_file.os_path.endswith('.xml'):
the_file.open()
the_file.close()
break
def test_delete_upload(self, upload: UploadFile):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment