Commit d6ace338 authored by Markus Scheidgen

Adapted nomad to route all upload processing to the same worker and to use local memory as the staging fs.
parent 4170c11e
Pipeline #45173 passed with stages in 26 minutes and 13 seconds
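The core idea of the change: the first processing task of an upload records which worker it ran on, and all follow-up tasks for that upload are sent to that worker's dedicated queue, so staging files can live on a worker-local (in-memory) filesystem instead of a distributed one. A minimal, hypothetical Celery sketch of that pattern (broker URL and task names are placeholders, not nomad code):

```python
from celery import Celery
from celery.utils import worker_direct

app = Celery('sketch', broker='pyamqp://guest@localhost//')
# every worker additionally consumes its own dedicated (direct) queue
app.conf.worker_direct = True

@app.task(bind=True)
def extract_upload(self, upload_id):
    # writes to the worker-local staging fs and reports where it ran
    return self.request.hostname

@app.task
def process_calc(upload_id, calc_id):
    # relies on the staging files written by extract_upload on this worker
    pass

def schedule_calc(worker_hostname, upload_id, calc_id):
    # pin the calc task to the worker that extracted the upload
    process_calc.apply_async(
        args=[upload_id, calc_id], queue=worker_direct(worker_hostname))
```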
......@@ -285,6 +285,9 @@ def create_authorization_predicate(upload_id, calc_id=None):
if g.user is None:
# guest users don't have authorized access to anything
return False
elif g.user.user_id == 0:
# the admin user does have authorization to access everything
return True
# look in repository
upload = coe_repo.Upload.from_upload_id(upload_id)
......
......@@ -22,12 +22,12 @@ from flask_restplus import Resource, fields, abort
from datetime import datetime
from werkzeug.datastructures import FileStorage
import os.path
import os
import io
from nomad import config
from nomad import config, utils, files
from nomad.processing import Upload, FAILURE
from nomad.processing import ProcessAlreadyRunning
from nomad.files import ArchiveBasedStagingUploadFiles
from .app import api, with_logger, RFC3339DateTime
from .auth import login_really_required
......@@ -86,7 +86,7 @@ upload_model = api.inherit('UploadProcessing', proc_model, {
description='The unique id for the upload.'),
# TODO just removed during migration, where this gets particularly large
# 'metadata': fields.Nested(model=upload_metadata_model, description='Additional upload and calculation meta data.'),
'local_path': fields.String,
'upload_path': fields.String(description='The uploaded file on the server'),
'upload_time': RFC3339DateTime(),
})
......@@ -159,20 +159,15 @@ class UploadListResource(Resource):
abort(404, message='The given local_path was not found.')
upload_name = request.args.get('name')
# create upload
upload = Upload.create(
user=g.user,
name=upload_name,
local_path=local_path)
upload_id = utils.create_uuid()
logger = logger.bind(upload_id=upload.upload_id, upload_name=upload_name)
logger = logger.bind(upload_id=upload_id, upload_name=upload_name)
logger.info('upload created', )
try:
if local_path:
# file is already there and does not need to be received
upload_files = ArchiveBasedStagingUploadFiles(
upload.upload_id, create=True, local_path=local_path)
upload_path = local_path
elif request.mimetype == 'application/multipart-formdata':
logger.info('receive upload as multipart formdata')
# multipart formdata, e.g. with curl -X put "url" -F file=@local_file
......@@ -180,19 +175,18 @@ class UploadListResource(Resource):
if 'file' not in request.files:
abort(400, message='Bad multipart-formdata, there is no file part.')
file = request.files['file']
if upload.name is None or upload.name is '':
upload.name = file.filename
if upload_name is None or upload_name is '':
upload_name = file.filename
upload_files = ArchiveBasedStagingUploadFiles(upload.upload_id, create=True)
file.save(upload_files.upload_file_os_path)
upload_path = files.PathObject(config.fs.tmp, upload_id).os_path
file.save(upload_path)
else:
# simple streaming data in HTTP body, e.g. with curl "url" -T local_file
logger.info('started to receive upload streaming data')
upload_files = ArchiveBasedStagingUploadFiles(upload.upload_id, create=True)
upload_path = files.PathObject(config.fs.tmp, upload_id).os_path
try:
with open(upload_files.upload_file_os_path, 'wb') as f:
with open(upload_path, 'wb') as f:
received_data = 0
received_last = 0
while not request.stream.is_exhausted:
......@@ -209,13 +203,21 @@ class UploadListResource(Resource):
logger.warning('Error on streaming upload', exc_info=e)
abort(400, message='Some IO went wrong, download probably aborted/disrupted.')
except Exception as e:
upload_files.delete()
upload.delete()
if not local_path and os.path.isfile(upload_path):
os.remove(upload_path)
logger.info('Invalid or aborted upload')
raise e
logger.info('received uploaded file')
upload.upload_time = datetime.now()
upload = Upload.create(
upload_id=upload_id,
user=g.user,
name=upload_name,
upload_time=datetime.now(),
upload_path=upload_path,
temporary=local_path != upload_path)
upload.process_upload()
logger.info('initiated processing')
......
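A condensed, hedged sketch of the reworked receive logic above: the file first lands under a plain tmp path keyed by a freshly generated upload id, and only then is the Upload document created with `upload_path` and a `temporary` flag. The helper below is illustrative (it assumes a werkzeug-style request stream, and uses `uuid` where nomad uses its own id helper), not the actual endpoint:

```python
import os
import uuid

def receive_upload(request, tmp_dir, local_path=None):
    """Store the request body (or reuse local_path) and return what
    Upload.create needs: upload_id, upload_path and the temporary flag."""
    upload_id = str(uuid.uuid4())
    if local_path:
        # file is already on the server; nothing to receive, nothing to delete later
        return upload_id, local_path, False
    upload_path = os.path.join(tmp_dir, upload_id)
    with open(upload_path, 'wb') as f:
        while not request.stream.is_exhausted:
            f.write(request.stream.read(1024 * 1024))
    return upload_id, upload_path, True
```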
......@@ -74,7 +74,9 @@ class CalcProcReproduction:
else:
self.logger.info('Calc already downloaded.')
self.upload_files = ArchiveBasedStagingUploadFiles(upload_id='tmp_%s' % archive_id, local_path=local_path, create=True, is_authorized=lambda: True)
self.upload_files = ArchiveBasedStagingUploadFiles(
upload_id='tmp_%s' % archive_id, upload_path=local_path, create=True,
is_authorized=lambda: True)
def __enter__(self):
# open/extract upload file
......
......@@ -26,14 +26,25 @@ warnings.filterwarnings("ignore", message="numpy.dtype size changed")
warnings.filterwarnings("ignore", message="numpy.ufunc size changed")
FilesConfig = namedtuple(
'FilesConfig', ['uploads_bucket', 'raw_bucket', 'archive_bucket', 'staging_bucket', 'public_bucket'])
""" API independent configuration for the object storage. """
CELERY_WORKER_ROUTING = 'worker'
CELERY_QUEUE_ROUTING = 'queue'
CeleryConfig = namedtuple('Celery', ['broker_url', 'max_memory', 'timeout', 'acks_late'])
""" Used to configure the RabbitMQ for celery. """
CeleryConfig = namedtuple('Celery', ['broker_url', 'max_memory', 'timeout', 'acks_late', 'routing'])
"""
Used to configure the RabbitMQ for celery.
Arguments:
broker_url: The rabbitmq broker URL
max_memory: Max worker memory
timeout: Task timeout
acks_late: Use celery acks_late if set
routing: Set to ``queue`` to route via the upload and calc queues. Processing tasks for one
upload might then be routed to different workers, and all staging files must be accessible
to all workers via a distributed fs. Set to ``worker`` to route all tasks related
to the same upload to the same worker.
"""
FSConfig = namedtuple('FSConfig', ['tmp', 'objects', 'nomad_tmp'])
FSConfig = namedtuple('FSConfig', ['tmp', 'staging', 'public'])
""" Used to configure file stystem access. """
RepositoryDBConfig = namedtuple('RepositoryDBConfig', ['host', 'port', 'dbname', 'user', 'password'])
......@@ -57,14 +68,6 @@ MailConfig = namedtuple('MailConfig', ['host', 'port', 'user', 'password', 'from
NormalizeConfig = namedtuple('NormalizeConfig', ['all_systems'])
""" Used to configure the normalizers """
files = FilesConfig(
uploads_bucket='uploads',
raw_bucket=os.environ.get('NOMAD_FILES_RAW_BUCKET', 'raw'),
archive_bucket='archive',
staging_bucket='staging',
public_bucket='public'
)
rabbit_host = os.environ.get('NOMAD_RABBITMQ_HOST', 'localhost')
rabbit_port = os.environ.get('NOMAD_RABBITMQ_PORT', None)
rabbit_user = 'rabbitmq'
......@@ -87,13 +90,13 @@ celery = CeleryConfig(
broker_url=rabbit_url,
max_memory=int(os.environ.get('NOMAD_CELERY_MAXMEMORY', 64e6)), # 64 GB
timeout=int(os.environ.get('NOMAD_CELERY_TIMEOUT', 3 * 3600)), # 3h
acks_late=bool(os.environ.get('NOMAD_CELERY_ACKS_LATE', True))
acks_late=bool(os.environ.get('NOMAD_CELERY_ACKS_LATE', True)),
routing=os.environ.get('NOMAD_CELERY_ROUTING', CELERY_QUEUE_ROUTING)
)
fs = FSConfig(
tmp=os.environ.get('NOMAD_FILES_TMP_DIR', '.volumes/fs/tmp'),
objects=os.environ.get('NOMAD_FILES_OBJECTS_DIR', '.volumes/fs/objects'),
nomad_tmp=os.environ.get('NOMAD_FILES_NOMAD_TMP_DIR', '/nomad/tmp')
staging=os.environ.get('NOMAD_FILES_STAGING_DIR', '.volumes/fs/staging'),
public=os.environ.get('NOMAD_FILES_PUBLIC_DIR', '.volumes/fs/public')
)
elastic = ElasticConfig(
host=os.environ.get('NOMAD_ELASTIC_HOST', 'localhost'),
......
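How a deployment might opt into the new behaviour through the environment variables read above (values are examples only, not defaults; the snippet assumes an installed nomad package and that config.py reads the variables at import time, as shown in the diff):

```python
import os

# example values: worker routing plus a node-local, in-memory staging directory
os.environ['NOMAD_CELERY_ROUTING'] = 'worker'
os.environ['NOMAD_FILES_STAGING_DIR'] = '/dev/shm/nomad/staging'   # worker-local
os.environ['NOMAD_FILES_PUBLIC_DIR'] = '/nomad/fairdi/fs/public'   # shared between nodes
os.environ['NOMAD_FILES_TMP_DIR'] = '/nomad/fairdi/fs/tmp'

from nomad import config  # must happen after the environment is set
assert config.celery.routing == config.CELERY_WORKER_ROUTING
assert config.fs.staging == '/dev/shm/nomad/staging'
```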
......@@ -27,13 +27,11 @@ almost readonly (beside metadata) storage.
/raw/**
/archive/<calc>.hdf5
/.frozen
/.public
/.restricted
fs/public/<upload>/metadata.json.gz
/raw-public.bagit.zip
/raw-restricted.bagit.zip
/archive-public.hdf5.zip
/archive-restricted.hdf5.zip
/raw-public.plain.zip
/raw-restricted.plain.zip
/archive-public.json.zip
/archive-restricted.json.zip
There is an implicit relationship between files, based on them being in the same
directory. Each directory with at least one *mainfile* is a *calculation directory*
......@@ -59,7 +57,7 @@ import json
import os.path
import os
import shutil
from zipfile import ZipFile, BadZipFile, is_zipfile
from zipfile import ZipFile, BadZipFile
import tarfile
import hashlib
import io
......@@ -84,7 +82,7 @@ class PathObject:
if os_path:
self.os_path = os_path
else:
self.os_path = os.path.join(config.fs.objects, bucket, object_id)
self.os_path = os.path.join(bucket, object_id)
if prefix:
segments = list(os.path.split(self.os_path))
......@@ -292,9 +290,9 @@ class UploadFiles(DirectoryObject, metaclass=ABCMeta):
@staticmethod
def get(upload_id: str, *args, **kwargs) -> 'UploadFiles':
if DirectoryObject(config.files.staging_bucket, upload_id, prefix=True).exists():
if DirectoryObject(config.fs.staging, upload_id, prefix=True).exists():
return StagingUploadFiles(upload_id, *args, **kwargs)
elif DirectoryObject(config.files.public_bucket, upload_id, prefix=True).exists():
elif DirectoryObject(config.fs.public, upload_id, prefix=True).exists():
return PublicUploadFiles(upload_id, *args, **kwargs)
else:
return None
......@@ -353,7 +351,7 @@ class UploadFiles(DirectoryObject, metaclass=ABCMeta):
class StagingUploadFiles(UploadFiles):
def __init__(self, *args, **kwargs) -> None:
super().__init__(config.files.staging_bucket, *args, **kwargs)
super().__init__(config.fs.staging, *args, **kwargs)
self._raw_dir = self.join_dir('raw')
self._archive_dir = self.join_dir('archive')
......@@ -473,10 +471,14 @@ class StagingUploadFiles(UploadFiles):
with open(self._frozen_file.os_path, 'wt') as f:
f.write('frozen')
packed_dir = self.join_dir('.packed', create=True)
# create a target dir in the public bucket
target_dir = DirectoryObject(
config.fs.public, self.upload_id, create=True, prefix=True,
create_prefix=True)
assert target_dir.exists()
def create_zipfile(kind: str, prefix: str, ext: str) -> ZipFile:
file = packed_dir.join_file('%s-%s.%s.zip' % (kind, prefix, ext))
file = target_dir.join_file('%s-%s.%s.zip' % (kind, prefix, ext))
return ZipFile(file.os_path, mode='w')
# In prior versions we used bagit on raw files. There was not much purpose for
......@@ -513,7 +515,7 @@ class StagingUploadFiles(UploadFiles):
raw_restricted_zip.close()
raw_public_zip.close()
self.logger.debug('zipped raw files')
self.logger.debug('packed raw files')
# zip archives
archive_public_zip = create_zipfile('archive', 'public', self._archive_ext)
......@@ -532,20 +534,14 @@ class StagingUploadFiles(UploadFiles):
archive_restricted_zip.close()
archive_public_zip.close()
self.logger.debug('zipped archives')
self.logger.debug('packed archives')
# pack metadata
packed_metadata = PublicMetadata(packed_dir.os_path)
packed_metadata = PublicMetadata(target_dir.os_path)
packed_metadata._create(self._metadata)
self.logger.debug('packed metadata')
# move to public bucket
target_dir = DirectoryObject(
config.files.public_bucket, self.upload_id, create=False, prefix=True,
create_prefix=True)
assert not target_dir.exists()
os.rename(packed_dir.os_path, target_dir.os_path)
self.logger.debug('moved to public bucket')
self.logger.debug('packed upload')
def raw_file_manifest(self, path_prefix: str = None) -> Generator[str, None, None]:
upload_prefix_len = len(self._raw_dir.os_path) + 1
......@@ -623,39 +619,28 @@ class ArchiveBasedStagingUploadFiles(StagingUploadFiles):
:class:`StagingUploadFiles` based on a single uploaded archive file (.zip)
Arguments:
local_path: Optional override for the path used to store/access the uploaded file.
upload_path: The path to the uploaded file.
"""
formats = ['zip']
""" A human readable list of supported file formats. """
def __init__(
self, upload_id: str, local_path: str = None, file_name: str = '.upload',
*args, **kwargs) -> None:
self, upload_id: str, upload_path: str, *args, **kwargs) -> None:
super().__init__(upload_id, *args, **kwargs)
self._local_path = local_path
if self._local_path is None:
self._upload_file = self.join_file(file_name)
@property
def upload_file_os_path(self):
if self._local_path is not None:
return self._local_path
else:
return self._upload_file.os_path
self.upload_path = upload_path
@property
def is_valid(self) -> bool:
if not os.path.exists(self.upload_file_os_path):
if self.upload_path is None:
return False
if not os.path.exists(self.upload_path):
return False
elif not os.path.isfile(self.upload_file_os_path):
elif not os.path.isfile(self.upload_path):
return False
else:
return is_zipfile(self.upload_file_os_path)
return True
def extract(self) -> None:
assert next(self.raw_file_manifest(), None) is None, 'can only extract once'
super().add_rawfiles(self.upload_file_os_path, force_archive=True)
super().add_rawfiles(self.upload_path, force_archive=True)
def add_rawfiles(self, path: str, move: bool = False, prefix: str = None, force_archive: bool = False) -> None:
assert False, 'do not add_rawfiles to a %s' % self.__class__.__name__
......@@ -663,7 +648,7 @@ class ArchiveBasedStagingUploadFiles(StagingUploadFiles):
class PublicUploadFiles(UploadFiles):
def __init__(self, *args, **kwargs) -> None:
super().__init__(config.files.public_bucket, *args, **kwargs)
super().__init__(config.fs.public, *args, **kwargs)
self._metadata = PublicMetadata(self.os_path)
......
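Packing no longer goes through a `.packed` staging directory that is later renamed into the public bucket; the zips are written straight into the target directory under `fs/public/<upload>`. A simplified sketch of that flow (single public raw zip only; the restricted split, archive zips, and metadata are omitted):

```python
import os
from zipfile import ZipFile

def pack_raw_files(staging_raw_dir, public_dir, upload_id):
    # create the target directory in the public tree and zip directly into it
    target_dir = os.path.join(public_dir, upload_id)
    os.makedirs(target_dir, exist_ok=True)
    with ZipFile(os.path.join(target_dir, 'raw-public.plain.zip'), mode='w') as zf:
        for root, _, names in os.walk(staging_raw_dir):
            for name in names:
                path = os.path.join(root, name)
                zf.write(path, arcname=os.path.relpath(path, staging_raw_dir))
    return target_dir
```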
......@@ -246,7 +246,8 @@ def reset(repo_content_only: bool = False):
logger.error('exception resetting repository db', exc_info=e)
try:
shutil.rmtree(config.fs.objects, ignore_errors=True)
shutil.rmtree(config.fs.staging, ignore_errors=True)
shutil.rmtree(config.fs.public, ignore_errors=True)
shutil.rmtree(config.fs.tmp, ignore_errors=True)
logger.info('files reset')
except Exception as e:
......@@ -290,7 +291,8 @@ def remove():
logger.info('reset files')
try:
shutil.rmtree(config.fs.objects, ignore_errors=True)
shutil.rmtree(config.fs.staging, ignore_errors=True)
shutil.rmtree(config.fs.public, ignore_errors=True)
shutil.rmtree(config.fs.tmp, ignore_errors=True)
except Exception as e:
logger.error('exception deleting files', exc_info=e)
......
......@@ -129,7 +129,7 @@ class Package(Document):
Creates a zip file for the package in tmp and returns its path and whether it
was created (``True``) or existed before (``False``).
"""
upload_filepath = os.path.join(config.fs.nomad_tmp, '%s.zip' % self.package_id)
upload_filepath = os.path.join(config.fs.tmp, '%s.zip' % self.package_id)
if not os.path.exists(os.path.dirname(upload_filepath)):
os.mkdir(os.path.dirname(upload_filepath))
if not os.path.isfile(upload_filepath):
......
......@@ -18,6 +18,7 @@ import time
from celery import Celery, Task
from celery.worker.request import Request
from celery.signals import after_setup_task_logger, after_setup_logger, worker_process_init
from celery.utils import worker_direct
from billiard.exceptions import WorkerLostError
from mongoengine import Document, StringField, ListField, DateTimeField, ValidationError
from mongoengine.connection import MongoEngineConnectionError
......@@ -50,6 +51,8 @@ app = Celery('nomad.processing', broker=config.celery.broker_url)
app.conf.update(worker_hijack_root_logger=False)
app.conf.update(worker_max_memory_per_child=config.celery.max_memory)
app.conf.update(task_time_limit=config.celery.timeout)
if config.celery.routing == config.CELERY_WORKER_ROUTING:
app.conf.update(worker_direct=True)
CREATED = 'CREATED'
PENDING = 'PENDING'
......@@ -136,7 +139,8 @@ class Proc(Document, metaclass=ProcMetaclass):
current_process = StringField(default=None)
process_status = StringField(default=None)
# _celery_task_id = StringField(default=None)
worker_hostname = StringField(default=None)
celery_task_id = StringField(default=None)
@property
def tasks_running(self) -> bool:
......@@ -426,6 +430,9 @@ def proc_task(task, cls_name, self_id, func_attr):
logger = self.get_logger()
logger.debug('received process function call')
self.worker_hostname = task.request.hostname
self.celery_task_id = task.request.id
# get the process function
func = getattr(self, func_attr, None)
if func is None:
......@@ -466,7 +473,7 @@ def process(func):
To transfer state, the instance will be saved to the database and loading on
the celery task worker. Process methods can call other (process) functions/methods on
other :class:`Proc` instances. Each :class:`Proc` instance can only run one
asny process at a time.
any process at a time.
"""
def wrapper(self, *args, **kwargs):
assert len(args) == 0 and len(kwargs) == 0, 'process functions must not have arguments'
......@@ -481,10 +488,12 @@ def process(func):
cls_name = self.__class__.__name__
queue = getattr(self.__class__, 'queue', None)
if config.celery.routing == config.CELERY_WORKER_ROUTING and self.worker_hostname is not None:
queue = 'celery@%s' % worker_direct(self.worker_hostname).name
logger = utils.get_logger(__name__, cls=cls_name, id=self_id, func=func.__name__)
logger.debug('calling process function', queue=queue)
logger = utils.get_logger(
__name__, cls=cls_name, id=self_id, func=func.__name__, queue=queue)
logger.debug('calling process function')
return proc_task.apply_async(args=[cls_name, self_id, func.__name__], queue=queue)
task = getattr(func, '__task_name', None)
......
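The queue selection added to the `@process` wrapper can be summarised as: once a `Proc` document knows its `worker_hostname`, later process calls are pinned to that worker's direct queue; otherwise the class-level queue (or the default) is used. A hedged helper showing just that decision (the actual wrapper derives a queue name string from it):

```python
from celery.utils import worker_direct

def select_queue(worker_hostname, class_queue=None, routing='worker'):
    """Illustrative only: mirrors the routing decision in the wrapper above."""
    if routing == 'worker' and worker_hostname is not None:
        # dedicated queue of the worker that already holds the staging files
        return worker_direct(worker_hostname)
    return class_queue
```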
......@@ -25,7 +25,7 @@ calculations, and files
"""
from typing import List, Any, ContextManager, Tuple, Generator, Dict
from mongoengine import StringField, DateTimeField, DictField
from mongoengine import StringField, DateTimeField, DictField, BooleanField
import logging
from structlog import wrap_logger
from contextlib import contextmanager
......@@ -93,7 +93,8 @@ class Calc(Proc):
@property
def upload_files(self) -> ArchiveBasedStagingUploadFiles:
if not self._upload_files:
self._upload_files = ArchiveBasedStagingUploadFiles(self.upload_id, is_authorized=lambda: True, local_path=self.upload.local_path)
self._upload_files = ArchiveBasedStagingUploadFiles(
self.upload_id, is_authorized=lambda: True, upload_path=self.upload.upload_path)
return self._upload_files
def get_logger(self, **kwargs):
......@@ -289,7 +290,8 @@ class Upload(Proc):
Attributes:
name: optional user provided upload name
local_path: optional local path, e.g. for files that are already somewhere on the server
upload_path: the path where the uploaded file was stored
temporary: True if the uploaded file should be removed after extraction
metadata: optional user provided additional meta data
upload_id: the upload id generated by the database
upload_time: the timestamp when the system realised the upload
......@@ -298,9 +300,10 @@ class Upload(Proc):
id_field = 'upload_id'
upload_id = StringField(primary_key=True)
upload_path = StringField(default=None)
temporary = BooleanField(default=False)
name = StringField(default=None)
local_path = StringField(default=None)
metadata = DictField(default=None)
upload_time = DateTimeField()
user_id = StringField(required=True)
......@@ -444,7 +447,8 @@ class Upload(Proc):
@property
def upload_files(self) -> ArchiveBasedStagingUploadFiles:
if not self._upload_files:
self._upload_files = ArchiveBasedStagingUploadFiles(self.upload_id, is_authorized=lambda: True, local_path=self.local_path)
self._upload_files = ArchiveBasedStagingUploadFiles(
self.upload_id, is_authorized=lambda: True, upload_path=self.upload_path)
return self._upload_files
@task
......@@ -454,13 +458,21 @@ class Upload(Proc):
the uploaded files, computing all keys, create an *upload* entry in the NOMAD-coe
repository db, etc.
"""
# extract the uploaded file, this will also create a bagit bag.
# extract the uploaded file
self._upload_files = ArchiveBasedStagingUploadFiles(
upload_id=self.upload_id, is_authorized=lambda: True, create=True,
upload_path=self.upload_path)
logger = self.get_logger()
try:
with utils.timer(
logger, 'upload extracted', step='extracting',
upload_size=self.upload_files.size):
self.upload_files.extract()
if self.temporary:
os.remove(self.upload_path)
self.upload_path = None
except KeyError:
self.fail('processing requested for non existing upload', log_level=logging.ERROR)
return
......@@ -510,6 +522,7 @@ class Upload(Proc):
calc = Calc.create(
calc_id=self.upload_files.calc_id(filename),
mainfile=filename, parser=parser.name,
worker_hostname=self.worker_hostname,
upload_id=self.upload_id)
calc.process_calc()
......
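The extraction step now builds the staging files directly from `upload_path` and, if the upload was only a temporary server-side copy, deletes it once it is unpacked. A condensed, hypothetical sketch of that step (zip uploads only, no error handling):

```python
import os
from zipfile import ZipFile

def extract_uploaded_file(upload_path, staging_raw_dir, temporary):
    # unpack the uploaded archive into the worker-local staging area
    os.makedirs(staging_raw_dir, exist_ok=True)
    with ZipFile(upload_path) as zf:
        zf.extractall(staging_raw_dir)
    if temporary:
        # the tmp copy created by the API is no longer needed
        os.remove(upload_path)
        return None  # the Upload document clears its upload_path
    return upload_path
```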
......@@ -23,11 +23,13 @@ spec:
- name: {{ include "nomad.name" . }}-api
image: "{{ .Values.images.nomad.name }}:{{ .Values.images.nomad.tag }}"
volumeMounts:
- mountPath: /app/.volumes/fs
- mountPath: /app/.volumes/fs/public
name: files-volume
- mountPath: /nomad
name: nomad-volume
env:
- name: NOMAD_FILES_TMP_DIR
value: "{{ .Values.volumes.tmp }}"
- name: NOMAD_SERVICE
value: "api"
- name: NOMAD_RELEASE
......@@ -72,6 +74,8 @@ spec:
value: "{{ .Values.postgres.port }}"
- name: NOMAD_COE_REPO_DB_NAME
value: "{{ .Values.dbname }}"
- name: NOMAD_CELERY_ROUTING
value: "worker"
command: ["python", "-m", "gunicorn.app.wsgiapp", "--timeout", "3600", "--log-config", "ops/gunicorn.log.conf", "-w", "{{ .Values.api.worker }}", "-b 0.0.0.0:8000", "nomad.api:app"]
livenessProbe:
httpGet:
......
......@@ -28,11 +28,15 @@ spec:
requests:
memory: "{{ .Values.worker.memrequest }}Gi"
volumeMounts:
- mountPath: /app/.volumes/fs
- mountPath: /app/.volumes/fs/public
name: files-volume
- mountPath: /app/.volumes/fs/staging
name: staging-volume
- mountPath: /nomad
name: nomad-volume
env:
- name: NOMAD_FILES_TMP_DIR
value: "{{ .Values.volumes.tmp }}"
- name: NOMAD_SERVICE
value: "worker"
- name: NOMAD_RELEASE
......@@ -79,6 +83,8 @@ spec:
value: "{{ .Values.mail.from }}"
- name: NOMAD_CONSOLE_LOGLEVEL
value: "ERROR"
- name: NOMAD_CELERY_ROUTING
value: "worker"
command: ["python", "-m", "celery", "worker", "-A", "nomad.processing", "-Q", "celery,calcs,uploads"]
livenessProbe:
exec:
......@@ -110,3 +116,6 @@ spec:
hostPath:
path: {{ .Values.volumes.nomad }}
type: Directory
- name: staging-volume
emptyDir:
medium: 'Memory'
......@@ -43,8 +43,8 @@ api:
worker:
replicas: 2
# request and limit in GB
memrequest: 32
memlimit: 320
memrequest: 64
memlimit: 420
console_loglevel: INFO
logstash_loglevel: INFO
......@@ -115,5 +115,6 @@ mail:
## Everything concerning the data that is used by the service
volumes:
files: /nomad/fairdi/fs
files: /nomad/fairdi/fs/public
tmp: /nomad/fairdi/fs/tmp
nomad: /nomad
......@@ -29,7 +29,7 @@ import datetime
import base64
from bravado.client import SwaggerClient
from nomad import config, infrastructure, files, parsing, processing, coe_repo, api
from nomad import config, infrastructure, parsing, processing, coe_repo, api
from tests import test_parsing, test_normalizing
from tests.processing import test_data as test_processing
......@@ -58,13 +58,14 @@ def nomad_logging():
@pytest.fixture(scope='session', autouse=True)
def raw_files_infra(monkeysession):