Commit ace5709d authored by Markus Scheidgen
Browse files

Necessary changes for bagit integration.

parent 35886ec4
......@@ -25,7 +25,7 @@ from zipfile import ZIP_DEFLATED
from contextlib import contextmanager
from nomad import config, infrastructure
from nomad.files import UploadFile, ArchiveFile, ArchiveLogFile
from nomad.files import UploadFile, ArchiveFile, ArchiveLogFile, RepositoryFile
from nomad.utils import get_logger
from nomad.processing import Upload, NotAllowedDuringProcessing
from nomad.repo import RepoCalc
......@@ -776,12 +776,12 @@ def get_raw(upload_hash, calc_hash):
except Exception as e:
abort(500, message=str(e))
repository_file = RepositoryFile(upload_hash)
@contextmanager
def raw_file(filename):
try:
upload = Upload.get(repo.upload_id)
upload_file = UploadFile(upload.upload_id, local_path=upload.local_path)
the_file = upload_file.get_file(filename)
the_file = repository_file.get_file(filename)
with the_file.open() as f:
yield f
except KeyError:
......
......@@ -141,7 +141,7 @@ class ZippedFile(File):
class Objects:
@classmethod
def _os_path(cls, bucket: str, name: str, ext: str) -> str:
def _os_path(cls, bucket: str, name: str, ext: str = None) -> str:
if ext is not None and ext != '':
file_name = ".".join([name, ext])
elif name is None or name == '':
......@@ -214,11 +214,12 @@ class UploadFile(ObjectFile):
"""
Instances of ``UploadFile`` represent an uploaded file in the *'object storage'*.
Currently only ``.zip`` files are supported.
Currently only user ``.zip`` files are supported.
Uploads can be extracted to tmp storage (open/close), the list of files in
the upload is provided, and files can be opened for read. Extracting uploads
is optional, all functions in this module are also available without extracting.
Extracts are automatically bagged with *bagit*.
This class is a context manager, that extracts the file when using a ``with``
statement with instances of this class.
......@@ -228,6 +229,9 @@ class UploadFile(ObjectFile):
in nomad's distributed file system, e.g. for bulk processing of already uploaded
files.
Uploads can be persisted as :class:`ZippedDataContainer` for permanent repository
raw data storage.
Arguments:
upload_id: The upload of this uploaded file.
local_path: Optional override for the path used to store/access the uploaded file.
......@@ -248,8 +252,10 @@ class UploadFile(ObjectFile):
ext='zip',
local_path=local_path)
self.upload_extract_dir: str = os.path.join(config.fs.tmp, 'uploads_extracted', upload_id)
self._filelist: List[str] = None
self._extract_dir: str = os.path.join(config.fs.tmp, 'uploads_extracted', upload_id)
self._bagged_container: DataContainer = None
if os.path.isdir(self._extract_dir):
self._bagged_container = BaggedDataContainer(self._extract_dir)
def bind_logger(self, logger):
return super().bind_logger(logger).bind(upload_id=self.object_id)
......@@ -268,24 +274,6 @@ class UploadFile(ObjectFile):
raise FileError(msg, e)
return wrapper
@property
def filelist(self) -> List[str]:
@UploadFile.Decorators.handle_errors
def get_filelist(self):
with self._zip() as zip_file:
return [
zip_info.filename for zip_info in zip_file.filelist
if not zip_info.filename.endswith('/')]
if not self._filelist:
self._filelist = get_filelist(self)
return self._filelist
@property
def is_extracted(self) -> bool:
return os.path.exists(self.upload_extract_dir)
@contextmanager
def _zip(self):
assert self.exists(), "Can only access uploaded file if it exists."
......@@ -299,16 +287,29 @@ class UploadFile(ObjectFile):
if zip_file is not None:
zip_file.close()
@property
def filelist(self) -> List[str]:
if self.is_extracted:
return self._bagged_container.manifest
else:
with self._zip() as zip_file:
return [
zip_info.filename for zip_info in zip_file.filelist
if not zip_info.filename.endswith('/')]
@property
def is_extracted(self) -> bool:
return self._bagged_container is not None
@Decorators.handle_errors
def hash(self) -> str:
""" Calculates the first 28 bytes of a websafe base64 encoded SHA512 of the upload. """
with self.open('rb') as f:
return utils.hash(f)
def upload_hash(self) -> str:
assert self.is_extracted
return self._bagged_container.hash
@Decorators.handle_errors
def extract(self) -> None:
"""
'Opens' the upload. This means the upload files get extracted to tmp.
'Opens' the upload. This means the upload files get extracted and bagged to tmp.
Raises:
UploadFileError: If some IO went wrong.
......@@ -317,10 +318,24 @@ class UploadFile(ObjectFile):
os.makedirs(os.path.join(config.fs.tmp, 'uploads_extracted'), exist_ok=True)
with self._zip() as zip_file:
zip_file.extractall(self.upload_extract_dir)
zip_file.extractall(self._extract_dir)
self.logger.debug('extracted uploaded file')
self._bagged_container = BaggedDataContainer.create(self._extract_dir)
self.logger.debug('bagged uploaded file')
def persist(self, object_id: str = None):
"""
Persists the extracted and bagged upload to the repository raw data bucket.
"""
assert self.is_extracted
if object_id is None:
object_id = self.upload_hash()
return ZippedDataContainer.create(
self._extract_dir, Objects._os_path(config.files.repository_bucket, object_id))
@Decorators.handle_errors
def remove_extract(self) -> None:
"""
......@@ -331,7 +346,7 @@ class UploadFile(ObjectFile):
KeyError: If the upload does not exist.
"""
try:
shutil.rmtree(self.upload_extract_dir)
shutil.rmtree(self._extract_dir)
except FileNotFoundError:
raise KeyError()
......@@ -350,10 +365,8 @@ class UploadFile(ObjectFile):
Only works on extracted uploads. The given filename must be one of the
name in ``self.filelist``.
"""
if self.is_extracted:
return File(os.path.join(self.upload_extract_dir, filename))
else:
return ZippedFile(self.os_path, filename)
assert self.is_extracted
return self._bagged_container.get_file(filename)
@property
def is_valid(self):
......@@ -377,6 +390,23 @@ class UploadFile(ObjectFile):
return self.get_file(sibling_name)
class RepositoryFile(ObjectFile):
"""
Represents a repository file. A repository file is a persistet bagged upload, incl.
the upload metadata. It is used to serve raw data.
"""
def __init__(self, upload_hash: str) -> None:
super().__init__(
bucket=config.files.repository_bucket,
object_id=upload_hash,
ext='zip')
self._zipped_container = ZippedDataContainer(self.os_path)
def get_file(self, path: str) -> ZippedFile:
return self._zipped_container.get_file(path)
class ArchiveFile(ObjectFile):
"""
Represents the archive file for an individual calculation. Allows to write the
......@@ -564,9 +594,12 @@ class ZippedDataContainer(File, DataContainer):
self._metadata = None
@staticmethod
def create(path: str) -> 'ZippedDataContainer':
def create(path: str, target: str = None) -> 'ZippedDataContainer':
if not target:
target = path
assert os.path.isdir(path)
archive_file = shutil.make_archive(path, 'zip', path)
archive_file = shutil.make_archive(target, 'zip', path)
return ZippedDataContainer(archive_file)
@contextmanager
......
......@@ -490,7 +490,7 @@ class Upload(Chord):
return
try:
self.upload_hash = self.upload_file.hash()
self.upload_hash = self.upload_file.upload_hash()
except Exception as e:
self.fail('could not create upload hash', e)
return
......@@ -553,6 +553,11 @@ class Upload(Chord):
def cleanup(self):
try:
upload = UploadFile(self.upload_id, local_path=self.local_path)
with utils.timer(
self.get_logger(), 'upload persisted', step='cleaning',
upload_size=upload.size):
upload.persist()
with utils.timer(
self.get_logger(), 'processing cleaned up', step='cleaning',
upload_size=upload.size):
......
......@@ -201,6 +201,7 @@ class RepoCalc(ElasticDocument):
@staticmethod
def upload_exists(upload_hash):
""" Returns true if there are already calcs from the given upload. """
# TODO this is deprecated and should be verified via repository files
search = Search(using=infrastructure.elastic_client, index=config.elastic.calc_index) \
.query('match', upload_hash=upload_hash) \
.execute()
......
......@@ -10,6 +10,7 @@ from nomad import config, user, infrastructure
def nomad_logging():
config.logstash = config.logstash._replace(enabled=False)
config.console_log_level = logging.CRITICAL
infrastructure.setup_logging()
@pytest.fixture(scope='session')
......
......@@ -26,7 +26,7 @@ import os.path
import json
from nomad import user, utils
from nomad.files import UploadFile, ArchiveFile, ArchiveLogFile
from nomad.files import UploadFile, ArchiveFile, ArchiveLogFile, RepositoryFile
from nomad.processing import Upload, Calc
from nomad.processing.base import task as task_decorator
from nomad.repo import RepoCalc
......@@ -106,6 +106,8 @@ def assert_processing(upload: Upload, mocksearch=None):
assert repo is not None
assert len(repo.get('aux_files')) == 4
assert RepositoryFile(upload.upload_hash).exists()
@pytest.mark.timeout(30)
def test_processing(uploaded_id, worker, mocksearch, no_warn):
......
......@@ -334,6 +334,9 @@ def example_repo_with_files(mockmongo, example_elastic_calc):
upload.user_id = 'does@not.exist'
upload.save()
with UploadFile(upload.upload_id, local_path=upload.local_path) as upload_file:
upload_file.persist(example_elastic_calc.upload_hash)
return example_elastic_calc
......
......@@ -211,8 +211,12 @@ class TestUploadFile:
with upload:
self.assert_upload(upload)
def test_upload_not_extracted(self, upload: UploadFile):
self.assert_upload(upload)
def test_persist(self, upload: UploadFile):
with upload:
zipped_container = upload.persist()
assert zipped_container.exists()
assert zipped_container.os_path.endswith('%s.zip' % upload.upload_hash())
def test_delete_upload(self, upload: UploadFile):
upload.delete()
......@@ -220,12 +224,12 @@ class TestUploadFile:
def test_hash(self, upload: UploadFile, upload_same_file: UploadFile, no_warn):
with upload:
hash = upload.hash()
hash = upload.upload_hash()
assert hash is not None
assert isinstance(hash, str)
with upload_same_file:
assert hash == upload_same_file.hash()
assert hash == upload_same_file.upload_hash()
def test_siblings(self, upload: UploadFile, no_warn):
with upload:
......
......@@ -29,3 +29,7 @@ def test_sanitize_logevent():
assert utils.sanitize_logevent('numbers 2 and 45.2') == 'numbers X and X'
assert utils.sanitize_logevent('list [2, 3.3, 10] and (273.9, .92)') == 'list L and L'
assert utils.sanitize_logevent('mat [2, [3.3, 2], 10]') == 'mat M'
def test_logging():
utils.get_logger(__name__).debug('test')
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment