Commit 48cf90b6 authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Added untested and unused implementation for transparent opening of compressed raw files.

parent bf23dc76
......@@ -50,6 +50,7 @@ import hashlib
import base64
import io
import gzip
import bz2
from nomad import config, utils
......@@ -272,12 +273,14 @@ class UploadFiles(DirectoryObject, metaclass=ABCMeta):
""" The calc metadata for this upload. """
raise NotImplementedError
def raw_file(self, file_path: str, *args, **kwargs) -> IO:
def raw_file(self, file_path: str, *args, compressed: bool = True, **kwargs) -> IO:
"""
Opens a raw file and returns a file-like objects. Additional args, kwargs are
Opens a raw file and returns a file-like object. Additional args, kwargs are
delegated to the respective `open` call.
Arguments:
file_path: The path to the file relative to the upload.
compressed: If True will open the raw file as it is, even if it is compressed.
With False, it will transparently open to read the decompressed file contents.
Raises:
KeyError: If the file does not exist.
Restricted: If the file is restricted and upload access evaluated to False.
......@@ -342,13 +345,26 @@ class StagingUploadFiles(UploadFiles):
raise Restricted
return self._metadata
def _file(self, path_object: PathObject, *args, **kwargs) -> IO:
# Leading "magic" bytes of a file, mapped to the opener that transparently
# decompresses that format.
_compressions = {
    b'\x1f\x8b\x08': gzip.open,  # gzip member header (deflate method)
    b'\x42\x5a\x68': bz2.open  # bzip2 header, ASCII 'BZh'
}

def _file(self, path_object: PathObject, *args, compressed: bool = True, **kwargs) -> IO:
    """
    Opens the file behind the given path object and returns a file-like object.
    Additional args, kwargs are delegated to the respective `open` call.

    Arguments:
        path_object: The object whose `os_path` is opened.
        compressed: If True will open the file as it is, even if it is compressed.
            With False, the first bytes of the file are inspected and gzip/bz2
            files are opened through the matching decompressing opener. Files
            with any other content are opened as they are.

    Raises:
        KeyError: If the file does not exist.
    """
    try:
        opener = open
        if not compressed:
            with open(path_object.os_path, 'rb') as f:
                magic = f.read(3)
            # Unknown magic bytes simply mean "not compressed". A plain index
            # lookup here would raise KeyError, which callers interpret as
            # "file does not exist".
            opener = StagingUploadFiles._compressions.get(magic, open)

        # NOTE: gzip.open and bz2.open default to binary mode when no mode is
        # given, whereas the builtin open defaults to text mode.
        return opener(path_object.os_path, *args, **kwargs)
    except FileNotFoundError:
        raise KeyError()
def raw_file(self, file_path: str, *args, **kwargs) -> IO:
def raw_file(self, file_path: str, *args, compressed: bool = True, **kwargs) -> IO:
    """
    Opens a raw file of this staging upload and returns a file-like object.
    Additional args, kwargs are delegated to the respective `open` call.

    Arguments:
        file_path: The path to the file relative to the upload.
        compressed: If True (default) will open the raw file as it is, even if
            it is compressed. With False, it will transparently open to read
            the decompressed file contents.

    Raises:
        KeyError: If the file does not exist.
        Restricted: If the upload access evaluated to False.
    """
    if not self._is_authorized():
        raise Restricted

    # Forward the flag explicitly; previously it was accepted but silently
    # dropped, so `_file` always used its own default.
    return self._file(
        self.raw_file_object(file_path), *args, compressed=compressed, **kwargs)
......@@ -650,7 +666,8 @@ class PublicUploadFiles(UploadFiles):
raise KeyError()
def raw_file(self, file_path: str, *args, **kwargs) -> IO:
def raw_file(self, file_path: str, *args, compressed: bool = True, **kwargs) -> IO:
    """
    Opens a raw file from the 'raw' bagit container and returns a file-like
    object. Additional args, kwargs are delegated to `_file`.

    Arguments:
        file_path: The path to the file relative to the upload; it is looked
            up under the container's 'data/' prefix.
        compressed: Must be True. Transparent decompression is not supported
            here.
    """
    assert compressed, 'not supported'
    # Keyword arguments must be forwarded with `**`; a single `*` would pass
    # the dict's keys as extra positional arguments.
    return self._file('raw', 'bagit', 'data/' + file_path, *args, **kwargs)
def raw_file_manifest(self, path_prefix: str = None) -> Generator[str, None, None]:
......
......@@ -111,6 +111,24 @@ def celery_inspect(purged_app):
yield purged_app.control.inspect()
# The following workaround seems unnecessary. I'll leave it here for an incubation period.
# @pytest.fixture()
# def patched_celery(monkeypatch):
# # There is a bug in celery, which prevents using the celery_worker for multiple
# # tests: https://github.com/celery/celery/issues/4088
# # The bug has a fix from Aug 2018, but it is not yet released (TODO).
# # We monkeypatch a similar solution here.
# def add_reader(self, fds, callback, *args):
# from kombu.utils.eventio import ERR, READ, poll
# if self.poller is None:
# self.poller = poll()
# return self.add(fds, callback, READ | ERR, args)
# monkeypatch.setattr('kombu.asynchronous.hub.Hub.add_reader', add_reader)
# yield
# It might be necessary to make this a function scoped fixture, if old tasks keep
# 'bleeding' into successive tests.
@pytest.fixture(scope='session')
def worker(celery_session_worker, celery_inspect):
""" Provides a clean worker (no old tasks) per function. Waits for all tasks to be completed. """
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment