Commit a033bd0b authored by Markus Scheidgen's avatar Markus Scheidgen

Added support for direct access to individual files in an upload.

parent fe99218e
......@@ -19,8 +19,11 @@ are basically paths. All major file system operations for dealing with
uploaded files, archive files, raw files, etc. should be part of this module to
allow later introduction of real object storage systems.
Uploads
-------
.. note:: This module still uses ``os.path``. As long as the whole of nomad runs on a
POSIX (or Windows) OS, everything should be fine; this includes the respective paths
stored in the dbs and indices. In the future, this should be replaced with abstract
path representations à la ``pathlib``.
.. autoclass:: File
:members:
.. autoclass:: UploadFile
......@@ -51,6 +54,7 @@ class File:
Attributes:
logger: A structured logger with bucket and object information.
path: The abstract path of the file.
"""
def __init__(self, os_path: str = None) -> None:
self.os_path = os_path
......@@ -61,11 +65,13 @@ class File:
""" Adds context information to the given logger and returns it. """
return logger.bind(path=self.os_path)
def open(self, *args, **kwargs) -> IO:
@contextmanager
def open(self, *args, **kwargs) -> Generator[IO, None, None]:
""" Opens the object with he given mode, etc. """
self.logger.debug('open file')
try:
return open(self.os_path, *args, **kwargs)
with open(self.os_path, *args, **kwargs) as f:
yield f
except FileNotFoundError:
raise KeyError()
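A usage sketch for the reworked ``open``: since it is now a generator-based context manager, callers use it in a ``with`` block and the file is closed on exit, while missing files still surface as ``KeyError``. The path below is made up, and the import assumes the module is importable as ``nomad.files``:

from nomad.files import File

f = File('/tmp/example.txt')  # hypothetical path
try:
    with f.open('rt') as stream:
        content = stream.read()  # closed automatically when the block exits
except KeyError:
    content = None  # File.open maps FileNotFoundError to KeyError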
......@@ -86,6 +92,49 @@ class File:
""" Returns the os determined file size. """
return os.stat(self.os_path).st_size
@property
def path(self) -> str:
return self.os_path
class ZippedFile(File):
""" A file contained in a .zip archive. """
def __init__(self, zip_os_path: str, filename: str) -> None:
self.filename = filename
super().__init__(zip_os_path)
def bind_logger(self, logger):
return super().bind_logger(logger).bind(filename=self.filename)
@contextmanager
def open(self, *args, **kwargs) -> Generator[IO, None, None]:
self.logger.debug('open file')
try:
with ZipFile(self.os_path) as zip_file:
with zip_file.open(self.filename, *args, **kwargs) as f:
yield f
except FileNotFoundError:
raise KeyError()
except Exception as e:
msg = 'Could not read upload.'
self.logger.error(msg, exc_info=e)
raise FileError(msg, e)
def delete(self) -> None:
assert False, "A file in a zip archive cannot be deleted."
@property
def size(self) -> int:
with ZipFile(self.os_path) as zip_file:
return zip_file.getinfo(self.filename).file_size
@property
def path(self) -> str:
# the zip archive's path joined with the member's path inside it
return os.path.join(self.os_path, self.filename)
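``ZippedFile`` reads members directly from the archive via the stdlib ``zipfile`` API. A minimal, self-contained sketch of that underlying technique (paths and member names are made up; ``ZipFile.open`` yields a binary file-like object):

from zipfile import ZipFile

# create a small archive, then read one member without extracting it
with ZipFile('/tmp/upload.zip', 'w') as zf:
    zf.writestr('examples/data.json', '{"value": 1}')

with ZipFile('/tmp/upload.zip') as zf:
    with zf.open('examples/data.json') as f:  # binary stream
        assert f.read() == b'{"value": 1}'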
class Objects:
@classmethod
......@@ -160,21 +209,28 @@ class FileError(Exception):
class UploadFile(ObjectFile):
"""
Instances represent an uploaded file in the *object storage*. Class is a conext
manager and supports the `with` statements.
In conext the upload will be extracted and contained files can be opened.
Some functions are only available for extracted uploads.
Instances of ``UploadFile`` represent an uploaded file in the *object storage*.
Currently only ``.zip`` files are supported.
Uploads are stored in their own *bucket*.
Uploads can be extracted to tmp storage (open/close), the list of files in
the upload is provided, and files can be opened for reading. Extracting uploads
is optional; all functions in this module are also available without extracting.
This class is a context manager that extracts the file when an instance is
used in a ``with`` statement.
UploadFiles are stored in their own *bucket*. However, the storage location can be
overridden by providing a ``local_path``. This is useful when the file is already
stored in nomad's distributed file system, e.g. for bulk processing of already
uploaded files.
Arguments:
upload_id: The id of the upload this file belongs to.
local_path: Optional override for the path used to store/access the upload
on the server. This can be usefull to create uploads for files that
were not uploaded but put to the server in another way, e.g. offline
processing, syncing with other data, etc.
local_path: Optional override for the path used to store/access the uploaded file.
Attributes:
is_extracted: True if the upload is extracted.
upload_extract_dir: The path of the tmp directory with the extracted contents.
filelist: A list of filenames relative to the .zipped upload root.
"""
......@@ -190,7 +246,7 @@ class UploadFile(ObjectFile):
local_path=local_path)
self.upload_extract_dir: str = os.path.join(config.fs.tmp, 'uploads_extracted', upload_id)
self.filelist: List[str] = None
self._filelist: List[str] = None
def bind_logger(self, logger):
return super().bind_logger(logger).bind(upload_id=self.object_id)
......@@ -209,6 +265,37 @@ class UploadFile(ObjectFile):
raise FileError(msg, e)
return wrapper
@property
def filelist(self) -> List[str]:
@UploadFile.Decorators.handle_errors
def get_filelist(self):
with self._zip() as zip_file:
return [
zip_info.filename for zip_info in zip_file.filelist
if not zip_info.filename.endswith('/')]
if self._filelist is None:
self._filelist = get_filelist(self)
return self._filelist
@property
def is_extracted(self) -> bool:
return os.path.exists(self.upload_extract_dir)
@contextmanager
def _zip(self):
assert self.exists(), "Can only access uploaded file if it exists."
zip_file = None
try:
zip_file = ZipFile(self.os_path)
yield zip_file
except BadZipFile as e:
raise FileError('Upload is not a zip file', e)
finally:
if zip_file is not None:
zip_file.close()
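What the cached ``filelist`` property computes, as a standalone sketch over a hypothetical archive: member names from the zip's table of contents, with directory entries (names ending in ``/``) skipped. ``infolist()`` is the documented equivalent of the ``filelist`` attribute used above:

from zipfile import ZipFile

with ZipFile('/tmp/upload.zip') as zf:  # hypothetical archive
    names = [
        info.filename for info in zf.infolist()
        if not info.filename.endswith('/')]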
@Decorators.handle_errors
def hash(self) -> str:
""" Calculates the first 28 bytes of a websafe base64 encoded SHA512 of the upload. """
......@@ -226,18 +313,8 @@ class UploadFile(ObjectFile):
"""
os.makedirs(os.path.join(config.fs.tmp, 'uploads_extracted'), exist_ok=True)
zipFile = None
try:
zipFile = ZipFile(self.os_path)
zipFile.extractall(self.upload_extract_dir)
self.filelist = [
zipInfo.filename for zipInfo in zipFile.filelist
if not zipInfo.filename.endswith('/')]
except BadZipFile as e:
raise FileError('Upload is not a zip file', e)
finally:
if zipFile is not None:
zipFile.close()
with self._zip() as zip_file:
zip_file.extractall(self.upload_extract_dir)
self.logger.debug('extracted uploaded file')
......@@ -270,12 +347,32 @@ class UploadFile(ObjectFile):
Works on both extracted and not yet extracted uploads. The given filename must be
one of the names in ``self.filelist``.
"""
return File(os.path.join(self.upload_extract_dir, filename))
if self.is_extracted:
return File(os.path.join(self.upload_extract_dir, filename))
else:
return ZippedFile(self.os_path, filename)
@property
def is_valid(self):
return is_zipfile(self.os_path)
def get_siblings(self, filename: str) -> Generator[str, None, None]:
"""
Returns the names of all other files that are part of the same directory as the
given file (incl. files in sub directories). In nomad terms, these are the aux
files of this file. Returned sibling names are relative to this file's directory.
"""
dirname = os.path.dirname(filename)
dirname_len = len(dirname) + 1
for other in self.filelist:
if other.startswith(dirname) and other != filename:
yield other[dirname_len:]
def get_sibling_file(self, filename: str, sibling: str) -> File:
sibling_name = os.path.join(os.path.dirname(filename), sibling)
return self.get_file(sibling_name)
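A self-contained sketch of the sibling computation with a made-up file list; siblings are all other files under the same directory, with names made relative to that directory:

import os

filelist = [
    'examples_template/template.json',
    'examples_template/1.aux',
    'examples_template/sub/2.aux']

def get_siblings(filename):
    dirname = os.path.dirname(filename)
    for other in filelist:
        # same directory prefix, but not the file itself
        if other.startswith(dirname) and other != filename:
            yield other[len(dirname) + 1:]

assert list(get_siblings('examples_template/template.json')) == [
    '1.aux', 'sub/2.aux']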
class ArchiveFile(ObjectFile):
"""
......@@ -298,44 +395,40 @@ class ArchiveFile(ObjectFile):
@contextmanager
def write_archive_json(self) -> Generator[TextIO, None, None]:
""" Context manager that yields a file-like to write the archive json. """
if config.files.compress_archive:
binary_out = self.open('wb')
gzip_wrapper = cast(TextIO, gzip.open(binary_out, 'wt'))
out = gzip_wrapper
else:
binary_out = self.open('wb')
text_wrapper = io.TextIOWrapper(binary_out, encoding='utf-8')
out = text_wrapper
with self.open('wb') as binary_out:
if config.files.compress_archive:
gzip_wrapper = cast(TextIO, gzip.open(binary_out, 'wt'))
out = gzip_wrapper
else:
text_wrapper = io.TextIOWrapper(binary_out, encoding='utf-8')
out = text_wrapper
try:
yield out
finally:
out.flush()
out.close()
binary_out.close()
try:
yield out
finally:
out.flush()
out.close()
self.logger.debug('archive file written')
@contextmanager
def read_archive_json(self) -> Generator[TextIO, None, None]:
""" Context manager that yields a file-like to read the archive json. """
try:
if config.files.compress_archive:
binary_in = self.open(mode='rb')
gzip_wrapper = cast(TextIO, gzip.open(binary_in, 'rt'))
in_file = gzip_wrapper
else:
binary_in = self.open(mode='rb')
text_wrapper = io.TextIOWrapper(binary_in, encoding='utf-8')
in_file = text_wrapper
except FileNotFoundError:
raise KeyError()
try:
yield in_file
finally:
in_file.close()
binary_in.close()
with self.open(mode='rb') as binary_in:
try:
if config.files.compress_archive:
gzip_wrapper = cast(TextIO, gzip.open(binary_in, 'rt'))
in_file = gzip_wrapper
else:
text_wrapper = io.TextIOWrapper(binary_in, encoding='utf-8')
in_file = text_wrapper
except FileNotFoundError:
raise KeyError()
try:
yield in_file
finally:
in_file.close()
self.logger.debug('archive file read')
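The wrapping used by ``write_archive_json``/``read_archive_json``, reduced to an in-memory sketch: a text layer over a binary stream, gzip-compressed when configured. The payload dict is made up:

import gzip
import io
import json

binary_out = io.BytesIO()
# compress_archive == True case: gzip.open adds a text-mode layer
with gzip.open(binary_out, 'wt') as out:
    json.dump({'example': True}, out)
compressed = binary_out.getvalue()

# read side: the same layering in reverse
with gzip.open(io.BytesIO(compressed), 'rt') as in_file:
    archive = json.load(in_file)
assert archive == {'example': True}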
......
......@@ -24,7 +24,7 @@ calculations, and files
:members:
"""
from typing import List, Any
from typing import List, Any, ContextManager
from datetime import datetime
from elasticsearch.exceptions import NotFoundError
from mongoengine import StringField, BooleanField, DateTimeField, DictField, IntField
......@@ -81,6 +81,7 @@ class Calc(Proc):
self._upload = None
self._calc_proc_logwriter = None
self._calc_proc_logfile = None
self._calc_proc_logwriter_ctx: ContextManager = None
@classmethod
def get(cls, id):
......@@ -130,9 +131,10 @@ class Calc(Proc):
if self._calc_proc_logwriter is None:
self._calc_proc_logfile = ArchiveLogFile(self.archive_id)
self._calc_proc_logwriter = self._calc_proc_logfile.open('wt')
self._calc_proc_logwriter_ctx = self._calc_proc_logfile.open('wt')
self._calc_proc_logwriter = self._calc_proc_logwriter_ctx.__enter__() # pylint: disable=E1101
def save_to_cacl_log(logger, method_name, event_dict):
def save_to_calc_log(logger, method_name, event_dict):
program = event_dict.get('normalizer', 'parser')
event = event_dict.get('event', '')
entry = '[%s] %s: %s' % (method_name, program, event)
......@@ -144,7 +146,7 @@ class Calc(Proc):
self._calc_proc_logwriter.write('\n')
return event_dict
return wrap_logger(logger, processors=[save_to_cacl_log])
return wrap_logger(logger, processors=[save_to_calc_log])
@property
def json_dict(self):
......@@ -251,7 +253,7 @@ class Calc(Proc):
with utils.timer(
logger, 'archived log', step='archive_log',
input_size=self.mainfile_file.size) as log_data:
self._calc_proc_logwriter.close()
self._calc_proc_logwriter_ctx.__exit__(None, None, None) # pylint: disable=E1101
self._calc_proc_logwriter = None
log_data.update(log_size=self._calc_proc_logfile.size)
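The pattern behind ``_calc_proc_logwriter_ctx``: when a context manager's lifetime must span several method calls, it is entered manually and both the context and the yielded value are kept, so it can be exited later. A sketch with a hypothetical resource:

from contextlib import contextmanager

@contextmanager
def managed_log(path):
    f = open(path, 'wt')
    try:
        yield f
    finally:
        f.close()

class Holder:
    def start(self):
        # keep the context object to be able to exit it later
        self._ctx = managed_log('/tmp/example.log')  # hypothetical path
        self._writer = self._ctx.__enter__()

    def stop(self):
        self._ctx.__exit__(None, None, None)
        self._writer = None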
......@@ -456,16 +458,17 @@ class Upload(Chord):
for parser in parsers:
try:
potential_mainfile = self._upload.get_file(filename)
if parser.is_mainfile(filename, lambda fn: potential_mainfile.open()):
mainfile_path = potential_mainfile.os_path
calc = Calc.create(
archive_id='%s/%s' % (self.upload_hash, utils.hash(filename)),
mainfile=filename, parser=parser.name,
mainfile_tmp_path=mainfile_path,
upload_id=self.upload_id)
calc.process()
total_calcs += 1
with potential_mainfile.open() as mainfile_f:
if parser.is_mainfile(filename, lambda fn: mainfile_f):
mainfile_path = potential_mainfile.os_path
calc = Calc.create(
archive_id='%s/%s' % (self.upload_hash, utils.hash(filename)),
mainfile=filename, parser=parser.name,
mainfile_tmp_path=mainfile_path,
upload_id=self.upload_id)
calc.process()
total_calcs += 1
except Exception as e:
self.error(
'exception while matching pot. mainfile',
......
......@@ -79,7 +79,8 @@ def assert_processing(upload: Upload):
assert calc.status == 'SUCCESS', calc.archive_id
assert ArchiveFile(calc.archive_id).exists()
assert ArchiveLogFile(calc.archive_id).exists()
assert 'a test' in ArchiveLogFile(calc.archive_id).open('rt').read()
with ArchiveLogFile(calc.archive_id).open('rt') as f:
assert 'a test' in f.read()
assert len(calc.errors) == 0
......
......@@ -44,9 +44,8 @@ def clear_files():
class TestObjects:
@pytest.fixture()
def existing_example_file(self, clear_files):
out = ObjectFile(example_bucket, 'example_file', ext='json').open(mode='wt')
json.dump(example_data, out)
out.close()
with ObjectFile(example_bucket, 'example_file', ext='json').open(mode='wt') as out:
json.dump(example_data, out)
yield 'example_file', 'json'
......@@ -65,9 +64,8 @@ class TestObjects:
name, ext = existing_example_file
assert ObjectFile(example_bucket, name, ext).exists()
file = ObjectFile(example_bucket, name, ext=ext).open()
json.load(file)
file.close()
with ObjectFile(example_bucket, name, ext=ext).open() as f:
json.load(f)
def test_delete(self, existing_example_file):
name, ext = existing_example_file
......@@ -133,17 +131,29 @@ class TestUploadFile:
shutil.copyfile(example_file, upload.os_path)
yield upload
def test_upload(self, upload: UploadFile):
def assert_upload(self, upload: UploadFile):
assert upload.exists()
assert len(upload.filelist) == 5
has_json = False
for filename in upload.filelist:
the_file = upload.get_file(filename)
assert the_file.exists()
assert the_file.size >= 0
if the_file.path.endswith('.json'):
has_json = True
assert the_file.size > 0
with the_file.open() as f:
f.read()
break
assert has_json
def test_upload_extracted(self, upload: UploadFile):
with upload:
assert len(upload.filelist) == 5
# now just try to open the first file (not directory), without error
for filename in upload.filelist:
the_file = upload.get_file(filename)
if the_file.os_path.endswith('.xml'):
the_file.open().close()
break
self.assert_upload(upload)
def test_upload_not_extracted(self, upload: UploadFile):
self.assert_upload(upload)
def test_delete_upload(self, upload: UploadFile):
upload.delete()
......@@ -158,6 +168,12 @@ class TestUploadFile:
with upload_same_file:
assert hash == upload_same_file.hash()
def test_siblings(self, upload: UploadFile, no_warn):
with upload:
siblings = list(upload.get_siblings('examples_template/template.json'))
assert len(siblings) == 4
assert all(sibling.endswith('.aux') for sibling in siblings)
class TestLocalUploadFile(TestUploadFile):
@pytest.fixture()
......@@ -178,9 +194,8 @@ class TestLocalUploadFile(TestUploadFile):
@pytest.fixture(scope='function')
def archive_log(clear_files, archive_config):
archive_log = ArchiveLogFile('__test_upload_hash/__test_calc_hash')
f = archive_log.open('wt')
f.write('This is a test')
f.close()
with archive_log.open('wt') as f:
f.write('This is a test')
yield archive_log
......@@ -189,4 +204,5 @@ class TestArchiveLogFile:
def test_archive_log_file(self, archive_log):
assert archive_log.exists()
assert 'test' in archive_log.open('rt').read()
with archive_log.open('rt') as f:
assert 'test' in f.read()