Commit 8cd885dd authored by Markus Scheidgen

Replaced minio in files.py.

parent 34436ba8
nomad/config.py

@@ -28,10 +28,7 @@ FilesConfig = namedtuple(
 CeleryConfig = namedtuple('Celery', ['broker_url'])
 """ Used to configure the RabbitMQ and Redis backends for celery. """
 
-MinioConfig = namedtuple('Minio', ['host', 'port', 'accesskey', 'secret'])
-""" Used to configure the minio object storage API. """
-
-FSConfig = namedtuple('FSConfig', ['tmp'])
+FSConfig = namedtuple('FSConfig', ['tmp', 'objects'])
 """ Used to configure file stystem access. """
 
 ElasticConfig = namedtuple('ElasticConfig', ['host', 'calc_index'])
@@ -43,7 +40,7 @@ MongoConfig = namedtuple('MongoConfig', ['host', 'port', 'users_db'])
 LogstashConfig = namedtuple('LogstashConfig', ['enabled', 'host', 'tcp_port', 'level'])
 """ Used to configure and enable/disable the ELK based centralized logging. """
 
-NomadServicesConfig = namedtuple('NomadServicesConfig', ['api_base_path', 'objects_host', 'objects_port', 'objects_base_path', 'api_secret'])
+NomadServicesConfig = namedtuple('NomadServicesConfig', ['api_base_path', 'api_secret'])
 """ Used to configure nomad services: worker, handler, api """
 
 files = FilesConfig(
@@ -66,14 +63,9 @@ celery = CeleryConfig(
     broker_url=rabbit_url
 )
 
-minio = MinioConfig(
-    host=os.environ.get('NOMAD_MINIO_HOST', 'localhost'),
-    port=int(os.environ.get('NOMAD_MINIO_PORT', '9007')),
-    accesskey='AKIAIOSFODNN7EXAMPLE',
-    secret='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'
-)
-
 fs = FSConfig(
-    tmp='.volumes/fs'
+    tmp='.volumes/fs/tmp',
+    objects='.volumes/fs/objects'
 )
 
 elastic = ElasticConfig(
     host=os.environ.get('NOMAD_ELASTIC_HOST', 'localhost'),
@@ -92,8 +84,5 @@ logstash = LogstashConfig(
 )
 services = NomadServicesConfig(
     api_base_path=os.environ.get('NOMAD_API_BASE_PATH', '/nomad/api'),
-    objects_host=os.environ.get('NOMAD_OBJECTS_HOST', 'localhost'),
-    objects_port=int(os.environ.get('NOMAD_OBJECTS_PORT', minio.port)),
-    objects_base_path=os.environ.get('NOMAD_OBJECTS_BASE_PATH', ''),
     api_secret='the quick fox jumps over something'
 )
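With this change the minio connection settings (host, port, access key, secret) are gone entirely; an "object" is now just a plain file in a directory tree rooted at config.fs.objects. A minimal sketch of the resulting layout, assuming the defaults from the diff above (the bucket and object names here are made up for illustration):

    import os
    import nomad.config as config

    # A hypothetical object 'some_id.zip' in a hypothetical 'uploads' bucket
    # now lives at a path like:
    path = os.path.join(config.fs.objects, 'uploads', 'some_id.zip')
    # -> '.volumes/fs/objects/uploads/some_id.zip' with the defaults above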
nomad/files.py

@@ -53,111 +53,190 @@ from contextlib import contextmanager
 import gzip
 import io
 import time
+import shutil
 
 import nomad.config as config
 from nomad.utils import get_logger
 
 logger = get_logger(__name__)
 
-_client = None
-
-if _client is None and 'sphinx' not in sys.modules:
-    _client = Minio('%s:%s' % (config.minio.host, config.minio.port),
-                    access_key=config.minio.accesskey,
-                    secret_key=config.minio.secret,
-                    secure=False)
-
-    # ensure all neccessary buckets exist
-    def ensure_bucket(name):
-        try:
-            _client.make_bucket(bucket_name=name)
-            logger.info('Created uploads bucket', bucket=name)
-        except minio.error.BucketAlreadyOwnedByYou:
-            pass
-
-    ensure_bucket(config.files.uploads_bucket)
-    ensure_bucket(config.files.archive_bucket)
-
-
-def get_presigned_upload_url(upload_id: str) -> str:
-    """
-    Generates a presigned upload URL. Presigned URL allows users (and their client programs)
-    to safely *PUT* a single file without further authorization or API to the *uploads* bucket
-    using the given ``upload_id``. Example usages for presigned URLs include
-    browser based uploads or simple *curl* commands (see also :func:`create_curl_upload_cmd`).
-
-    Arguments:
-        upload_id: The upload id for the uploaded file.
-
-    Returns:
-        The presigned URL string.
-    """
-    return _client.presigned_put_object(config.files.uploads_bucket, upload_id)
-
-
-def create_curl_upload_cmd(presigned_url: str, file_dummy: str='<ZIPFILE>') -> str:
-    """Creates a readymade curl command for uploading.
-
-    Arguments:
-        presigned_url: The presigned URL to base the command on.
-
-    Kwargs:
-        file_dummy: A placeholder for the file that the user/client has to replace.
-
-    Returns:
-        The curl shell command with correct method, url, headers, etc.
-    """
-    return 'curl "%s" --upload-file %s' % (presigned_url, file_dummy)
-
-
-def upload_put_handler(func: Callable[[str], None]) -> Callable[[], None]:
-    def upload_notifications(events: List[Any]) -> Generator[str, None, None]:
-        for event in events:
-            for event_record in event['Records']:
-                try:
-                    event_name = event_record['eventName']
-                    if event_name == 's3:ObjectCreated:Put':
-                        upload_id = event_record['s3']['object']['key']
-                        logger.debug('Received bucket upload event', upload_id=upload_id)
-                        yield upload_id
-                        break  # only one per record, pls
-                    else:
-                        logger.debug('Unhanled bucket event', bucket_event_name=event_name)
-                except KeyError:
-                    logger.warning(
-                        'Unhandled bucket event due to unexprected event format',
-                        bucket_event_record=event_record)
-
-    def wrapper(*args, **kwargs) -> None:
-        logger.info('Start listening to uploads notifications.')
-
-        _client.remove_all_bucket_notification(config.files.uploads_bucket)
-        events = _client.listen_bucket_notification(
-            config.files.uploads_bucket,
-            events=['s3:ObjectCreated:*'])
-
-        upload_ids = upload_notifications(events)
-        for upload_id in upload_ids:
-            try:
-                func(upload_id)
-            except StopIteration:
-                # Using StopIteration to allow clients to stop handling of events.
-                logger.debug('Handling of upload notifications was stopped via StopIteration.')
-                return
-            except Exception:
-                pass
-
-    return wrapper
-
-
-class UploadError(Exception):
+# _client = None
+# if _client is None and 'sphinx' not in sys.modules:
+#     _client = Minio('%s:%s' % (config.minio.host, config.minio.port),
+#                     access_key=config.minio.accesskey,
+#                     secret_key=config.minio.secret,
+#                     secure=False)
+
+#     # ensure all neccessary buckets exist
+#     def ensure_bucket(name):
+#         try:
+#             _client.make_bucket(bucket_name=name)
+#             logger.info('Created uploads bucket', bucket=name)
+#         except minio.error.BucketAlreadyOwnedByYou:
+#             pass
+
+#     ensure_bucket(config.files.uploads_bucket)
+#     ensure_bucket(config.files.archive_bucket)
+
+
+# def get_presigned_upload_url(upload_id: str) -> str:
+#     """
+#     Generates a presigned upload URL. Presigned URL allows users (and their client programs)
+#     to safely *PUT* a single file without further authorization or API to the *uploads* bucket
+#     using the given ``upload_id``. Example usages for presigned URLs include
+#     browser based uploads or simple *curl* commands (see also :func:`create_curl_upload_cmd`).
+
+#     Arguments:
+#         upload_id: The upload id for the uploaded file.
+
+#     Returns:
+#         The presigned URL string.
+#     """
+#     return _client.presigned_put_object(config.files.uploads_bucket, upload_id)
+
+
+# def create_curl_upload_cmd(presigned_url: str, file_dummy: str='<ZIPFILE>') -> str:
+#     """Creates a readymade curl command for uploading.
+
+#     Arguments:
+#         presigned_url: The presigned URL to base the command on.
+
+#     Kwargs:
+#         file_dummy: A placeholder for the file that the user/client has to replace.
+
+#     Returns:
+#         The curl shell command with correct method, url, headers, etc.
+#     """
+#     return 'curl "%s" --upload-file %s' % (presigned_url, file_dummy)
+
+
+# def upload_put_handler(func: Callable[[str], None]) -> Callable[[], None]:
+#     def upload_notifications(events: List[Any]) -> Generator[str, None, None]:
+#         for event in events:
+#             for event_record in event['Records']:
+#                 try:
+#                     event_name = event_record['eventName']
+#                     if event_name == 's3:ObjectCreated:Put':
+#                         upload_id = event_record['s3']['object']['key']
+#                         logger.debug('Received bucket upload event', upload_id=upload_id)
+#                         yield upload_id
+#                         break  # only one per record, pls
+#                     else:
+#                         logger.debug('Unhanled bucket event', bucket_event_name=event_name)
+#                 except KeyError:
+#                     logger.warning(
+#                         'Unhandled bucket event due to unexprected event format',
+#                         bucket_event_record=event_record)
+
+#     def wrapper(*args, **kwargs) -> None:
+#         logger.info('Start listening to uploads notifications.')
+
+#         _client.remove_all_bucket_notification(config.files.uploads_bucket)
+#         events = _client.listen_bucket_notification(
+#             config.files.uploads_bucket,
+#             events=['s3:ObjectCreated:*'])
+
+#         upload_ids = upload_notifications(events)
+#         for upload_id in upload_ids:
+#             try:
+#                 func(upload_id)
+#             except StopIteration:
+#                 # Using StopIteration to allow clients to stop handling of events.
+#                 logger.debug('Handling of upload notifications was stopped via StopIteration.')
+#                 return
+#             except Exception:
+#                 pass
+
+#     return wrapper
+
+
+class Objects:
+    """
+    Object store like abstraction based on a regular file system.
+    """
+    @classmethod
+    def _os_path(cls, bucket, name, ext):
+        if ext is not None and ext != '':
+            file_name = ".".join([name, ext])
+        elif name is None or name == '':
+            file_name = ''
+        else:
+            file_name = name
+
+        path = file_name.split('/')
+        path = os.path.join(*([config.fs.objects, bucket] + path))
+        directory = os.path.dirname(path)
+        if not os.path.isdir(directory):
+            os.makedirs(directory)
+
+        return path
+
+    @classmethod
+    def open(cls, bucket, name, ext=None, *args, **kwargs):
+        """ Open an object like you would a file, e.g. with 'rb', etc. """
+        try:
+            return open(cls._os_path(bucket, name, ext), *args, **kwargs)
+        except FileNotFoundError:
+            raise KeyError()
+
+    @classmethod
+    def delete(cls, bucket, name, ext=None):
+        """ Delete a single object. """
+        try:
+            os.remove(cls._os_path(bucket, name, ext))
+        except FileNotFoundError:
+            raise KeyError()
+
+    @classmethod
+    def delete_all(cls, bucket, prefix=''):
+        """ Delete all files with given prefix, prefix must denote a directory. """
+        try:
+            shutil.rmtree(cls._os_path(bucket, prefix, ext=None))
+        except FileNotFoundError:
+            pass
+
+    @classmethod
+    def exists(cls, bucket, name, ext=None):
+        """ Returns True if object exists. """
+        return os.path.exists(cls._os_path(bucket, name, ext))
+
+
+class File:
+    """ Base class for file objects. """
+    def __init__(self, bucket, object_id, ext=None):
+        self.bucket = bucket
+        self.object_id = object_id
+        self.ext = ext
+
+    def open(self, *args, **kwargs):
+        """ Opens the object with he given mode, etc. """
+        return Objects.open(self.bucket, self.object_id, self.ext, *args, **kwargs)
+
+    def delete(self):
+        """ Deletes the file with the given object id. """
+        try:
+            Objects.delete(self.bucket, self.object_id, self.ext)
+        except FileNotFoundError:
+            raise KeyError()
+
+    def exists(self):
+        """ Returns true if object exists. """
+        return Objects.exists(self.bucket, self.object_id, self.ext)
+
+    @property
+    def os_path(self):
+        """ The path of the object in the os filesystem. """
+        return Objects._os_path(self.bucket, self.object_id, self.ext)
+
+
+class FileError(Exception):
     def __init__(self, msg, cause):
         super().__init__(msg, cause)
 
 
-class Upload():
+class UploadFile(File):
     """
-    Instances represent an uploaded file in the object storage. Class supports open/close,
+    Instances represent an uploaded file in the 'object storage'. Class supports open/close,
     i.e. extract .zip files, and opening contained files. Some functions are only available
     for open (i.e. tmp. downloaded and extracted uploads) uploads.
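Taken together, the new Objects class and File base class above replace the minio client calls with plain filesystem operations (KeyError now stands in for minio.error.NoSuchKey). A hedged usage sketch; the bucket and object id are made up for illustration, everything else follows the code in this hunk:

    from nomad.files import Objects, File

    # Write an object directly, then read it back through the File wrapper.
    # Objects._os_path creates the bucket directory on demand.
    with Objects.open('test_bucket', 'some_id', ext='txt', mode='wt') as out:
        out.write('hello')

    f = File('test_bucket', 'some_id', ext='txt')
    assert f.exists()
    with f.open('rt') as in_file:
        print(in_file.read())
    f.delete()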
@@ -167,22 +246,18 @@ class Upload():
         upload_id: The upload of this uploaded file.
 
     Attributes:
-        upload_file: The path of the tmp version of this file for an open upload.
         upload_extract_dir: The path of the tmp directory with the extracted contents.
         filelist: A list of filenames relative to the .zipped upload root.
-        metadata: The upload object storage metadata.
     """
     def __init__(self, upload_id: str) -> None:
-        self.upload_id = upload_id
-        self.upload_file: str = os.path.join(config.fs.tmp, 'uploads', upload_id)
+        super().__init__(
+            bucket=config.files.uploads_bucket,
+            object_id=upload_id,
+            ext='zip')
         self.upload_extract_dir: str = os.path.join(config.fs.tmp, 'uploads_extracted', upload_id)
         self.filelist: List[str] = None
 
-        try:
-            self.metadata = _client.stat_object(config.files.uploads_bucket, upload_id).metadata
-        except minio.error.NoSuchKey:
-            raise KeyError(self.upload_id)
-
     # There is not good way to capsule decorators in a class:
     # https://medium.com/@vadimpushtaev/decorator-inside-python-class-1e74d23107f6
     class Decorators:
@@ -194,70 +269,63 @@ class Upload():
             except Exception as e:
                 msg = 'Could not %s upload %s.' % (decorated.__name__, self.upload_id)
                 logger.error(msg, exc_info=e)
-                raise UploadError(msg, e)
+                raise FileError(msg, e)
             return wrapper
 
     @Decorators.handle_errors
     def hash(self) -> str:
         """ Calculates the first 28 bytes of a websafe base64 encoded SHA512 of the upload. """
         hash = hashlib.sha512()
-        with open(self.upload_file, 'rb') as f:
+        with self.open('rb') as f:
             for data in iter(lambda: f.read(65536), b''):
                 hash.update(data)
         return base64.b64encode(hash.digest(), altchars=b'-_')[0:28].decode('utf-8')
 
     @Decorators.handle_errors
-    def open(self) -> None:
+    def extract(self) -> None:
         """
-        Opens the upload. This means the uploaed files gets tmp. downloaded and extracted.
+        'Opens' the upload. This means the uploaed files get extracted to tmp.
 
         Raises:
-            UploadError: If some IO went wrong.
+            UploadFileError: If some IO went wrong.
             KeyError: If the upload does not exist.
         """
-        os.makedirs(os.path.join(config.fs.tmp, 'uploads'), exist_ok=True)
         os.makedirs(os.path.join(config.fs.tmp, 'uploads_extracted'), exist_ok=True)
 
-        try:
-            _client.fget_object(config.files.uploads_bucket, self.upload_id, self.upload_file)
-        except minio.error.NoSuchKey:
-            raise KeyError(self.upload_id)
-
         zipFile = None
         try:
-            zipFile = ZipFile(self.upload_file)
+            zipFile = ZipFile(self.os_path)
             zipFile.extractall(self.upload_extract_dir)
             self.filelist = [
                 zipInfo.filename for zipInfo in zipFile.filelist
                 if not zipInfo.filename.endswith('/')]
         except BadZipFile as e:
-            raise UploadError('Upload is not a zip file', e)
+            raise FileError('Upload is not a zip file', e)
         finally:
             if zipFile is not None:
                 zipFile.close()
 
     @Decorators.handle_errors
-    def close(self) -> None:
+    def remove_extract(self) -> None:
         """
         Closes the upload. This means the tmp. files are deleted.
 
         Raises:
-            UploadError: If some IO went wrong.
+            UploadFileError: If some IO went wrong.
             KeyError: If the upload does not exist.
         """
         try:
-            os.remove(self.upload_file)
             shutil.rmtree(self.upload_extract_dir)
         except FileNotFoundError:
-            raise KeyError(self.upload_id)
+            raise KeyError()
 
     def __enter__(self):
-        self.open()
+        self.extract()
         return self
 
     def __exit__(self, exc_type, exc, exc_tb):
-        self.close()
+        self.remove_extract()
 
     @Decorators.handle_errors
     def open_file(self, filename: str, *args, **kwargs) -> IO[Any]:
@@ -268,96 +336,60 @@ class Upload():
         """ Returns the tmp directory relative version of a filename. """
         return os.path.join(self.upload_extract_dir, filename)
 
-    def delete(self):
-        """ Delete the file from the store. Must not be open. """
-        try:
-            _client.remove_object(config.files.uploads_bucket, self.upload_id)
-        except minio.error.NoSuchKey:
-            raise KeyError(self.upload_id)
-
-
-@contextmanager
-def write_archive_json(archive_id) -> Generator[TextIO, None, None]:
-    """ Context manager that yiels a file-like to write the archive json. """
-    binary_out = io.BytesIO()
-    if config.files.compress_archive:
-        gzip_wrapper = cast(TextIO, gzip.open(binary_out, 'wt'))
-        out = gzip_wrapper
-        metadata = {'Content-Encoding': 'gzip'}
-    else:
-        text_wrapper = io.TextIOWrapper(binary_out, encoding='utf-8')
-        out = text_wrapper
-        metadata = {}
-
-    try:
-        yield out
-    finally:
-        out.flush()
-        # in practice minio fails writing seemingly arbitrarely for various reasons
-        # a simple retry with a small delay seems to be a pragmatic solution
-        for _ in range(0, 2):
-            try:
-                binary_out.seek(0)
-                length = len(binary_out.getvalue())
-                _client.put_object(
-                    config.files.archive_bucket, archive_id, binary_out, length=length,
-                    content_type='application/json',
-                    metadata=metadata)
-                break
-            except Exception:
-                time.sleep(1)
-
-        out.close()
-        binary_out.close()
-
-
-def archive_url(archive_id) -> str:
-    """ Returns the file server url for the archive. """
-    try:
-        _client.stat_object(config.files.archive_bucket, archive_id)
-    except minio.error.NoSuchKey:
-        raise KeyError()
-
-    return 'http://%s:%d/%s/%s' % \
+
+class ArchiveFile(File):
+    """
+    Represents the archive file for an individual calculation. Allows to write the
+    archive, read the archive, delete the archive.
+    """
+    def __init__(self, archive_id: str) -> None:
+        super().__init__(
+            bucket=config.files.archive_bucket,
+            object_id=archive_id,
+            ext='json.gz' if config.files.compress_archive else 'json')
+
+    @contextmanager
+    def write_archive_json(self) -> Generator[TextIO, None, None]:
+        """ Context manager that yiels a file-like to write the archive json. """
+        if config.files.compress_archive:
+            binary_out = self.open('wb')
+            gzip_wrapper = cast(TextIO, gzip.open(binary_out, 'wt'))
+            out = gzip_wrapper
+        else:
+            binary_out = self.open('wb')
+            text_wrapper = io.TextIOWrapper(binary_out, encoding='utf-8')
+            out = text_wrapper
+
+        try:
+            yield out
+        finally:
+            out.flush()
+            out.close()
+            binary_out.close()
+
+    @contextmanager
+    def read_archive_json(self) -> Generator[TextIO, None, None]:
+        """ Context manager that yiels a file-like to read the archive json. """
+        try:
+            if config.files.compress_archive:
+                binary_in = self.open(mode='rb')
+                gzip_wrapper = cast(TextIO, gzip.open(binary_in, 'rt'))
+                in_file = gzip_wrapper
+            else:
+                binary_in = self.open(mode='rb')
+                text_wrapper = io.TextIOWrapper(binary_in, encoding='utf-8')
+                in_file = text_wrapper
+        except FileNotFoundError:
+            raise KeyError()
+
+        try:
+            yield in_file
+        finally:
+            in_file.close()
+            binary_in.close()
+
+    @staticmethod
+    def delete_archives(upload_hash: str):
+        """ Delete all archives of one upload with the given hash. """
+        bucket = config.files.archive_bucket
+        Objects.delete_all(bucket, upload_hash)
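To round the commit off, a hedged usage sketch of the new classes. The ids and the json payload are invented for illustration; extract()/remove_extract() replace the old minio-backed open()/close(), and ArchiveFile replaces the former module-level write_archive_json/archive_url functions:

    import json

    from nomad.files import UploadFile, ArchiveFile

    # Assumes the upload .zip was previously stored below config.fs.objects
    # in the uploads bucket (e.g. by the upload API).
    upload = UploadFile('some_upload_id')  # hypothetical upload id
    with upload:  # __enter__ -> extract(), __exit__ -> remove_extract()
        print(upload.hash())  # websafe base64 SHA512 prefix
        for filename in upload.filelist:
            with upload.open_file(filename) as f:
                f.read()

    # Archives are written/read through context managers; gzip is applied
    # transparently when config.files.compress_archive is set.
    archive = ArchiveFile('some_upload_hash/some_calc_hash')  # hypothetical archive id
    with archive.write_archive_json() as out:
        json.dump({'some': 'data'}, out)
    with archive.read_archive_json() as f:
        data = json.load(f)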