diff --git a/nomad/api/auth.py b/nomad/api/auth.py index 197935e355f02c582fd80e9999c12f5117bbff24..165be56f84e3910515dd336ed14c7bb650ac433d 100644 --- a/nomad/api/auth.py +++ b/nomad/api/auth.py @@ -285,6 +285,9 @@ def create_authorization_predicate(upload_id, calc_id=None): if g.user is None: # guest users don't have authorized access to anything return False + elif g.user.user_id == 0: + # the admin user does have authorization to access everything + return True # look in repository upload = coe_repo.Upload.from_upload_id(upload_id) diff --git a/nomad/api/upload.py b/nomad/api/upload.py index c7567f97016873164a6886aae7271848de1b5ba6..ea0c93f1a665851d1e979824041aeb80de98db1d 100644 --- a/nomad/api/upload.py +++ b/nomad/api/upload.py @@ -22,12 +22,12 @@ from flask_restplus import Resource, fields, abort from datetime import datetime from werkzeug.datastructures import FileStorage import os.path +import os import io -from nomad import config +from nomad import config, utils, files from nomad.processing import Upload, FAILURE from nomad.processing import ProcessAlreadyRunning -from nomad.files import ArchiveBasedStagingUploadFiles from .app import api, with_logger, RFC3339DateTime from .auth import login_really_required @@ -86,7 +86,7 @@ upload_model = api.inherit('UploadProcessing', proc_model, { description='The unique id for the upload.'), # TODO just removed during migration, where this get particularily large # 'metadata': fields.Nested(model=upload_metadata_model, description='Additional upload and calculation meta data.'), - 'local_path': fields.String, + 'upload_path': fields.String(description='The uploaded file on the server'), 'upload_time': RFC3339DateTime(), }) @@ -159,20 +159,15 @@ class UploadListResource(Resource): abort(404, message='The given local_path was not found.') upload_name = request.args.get('name') - # create upload - upload = Upload.create( - user=g.user, - name=upload_name, - local_path=local_path) + upload_id = utils.create_uuid() - logger = logger.bind(upload_id=upload.upload_id, upload_name=upload_name) + logger = logger.bind(upload_id=upload_id, upload_name=upload_name) logger.info('upload created', ) try: if local_path: # file is already there and does not to be received - upload_files = ArchiveBasedStagingUploadFiles( - upload.upload_id, create=True, local_path=local_path) + upload_path = local_path elif request.mimetype == 'application/multipart-formdata': logger.info('receive upload as multipart formdata') # multipart formdata, e.g. with curl -X put "url" -F file=@local_file @@ -180,19 +175,18 @@ class UploadListResource(Resource): if 'file' in request.files: abort(400, message='Bad multipart-formdata, there is no file part.') file = request.files['file'] - if upload.name is None or upload.name is '': - upload.name = file.filename + if upload_name is None or upload_name is '': + upload_name = file.filename - upload_files = ArchiveBasedStagingUploadFiles(upload.upload_id, create=True) - - file.save(upload_files.upload_file_os_path) + upload_path = files.PathObject(config.fs.tmp, upload_id).os_path + file.save(upload_path) else: # simple streaming data in HTTP body, e.g. 
with curl "url" -T local_file logger.info('started to receive upload streaming data') - upload_files = ArchiveBasedStagingUploadFiles(upload.upload_id, create=True) + upload_path = files.PathObject(config.fs.tmp, upload_id).os_path try: - with open(upload_files.upload_file_os_path, 'wb') as f: + with open(upload_path, 'wb') as f: received_data = 0 received_last = 0 while not request.stream.is_exhausted: @@ -209,13 +203,21 @@ class UploadListResource(Resource): logger.warning('Error on streaming upload', exc_info=e) abort(400, message='Some IO went wrong, download probably aborted/disrupted.') except Exception as e: - upload_files.delete() - upload.delete() + if not local_path and os.path.isfile(upload_path): + os.remove(upload_path) logger.info('Invalid or aborted upload') raise e logger.info('received uploaded file') - upload.upload_time = datetime.now() + + upload = Upload.create( + upload_id=upload_id, + user=g.user, + name=upload_name, + upload_time=datetime.now(), + upload_path=upload_path, + temporary=local_path != upload_path) + upload.process_upload() logger.info('initiated processing') diff --git a/nomad/client/local.py b/nomad/client/local.py index 77d9351c282b1f66a0fed2cbe8aa606902e67b5b..1b77da741b28b78c031eab2e38cea33d5b205feb 100644 --- a/nomad/client/local.py +++ b/nomad/client/local.py @@ -74,7 +74,9 @@ class CalcProcReproduction: else: self.logger.info('Calc already downloaded.') - self.upload_files = ArchiveBasedStagingUploadFiles(upload_id='tmp_%s' % archive_id, local_path=local_path, create=True, is_authorized=lambda: True) + self.upload_files = ArchiveBasedStagingUploadFiles( + upload_id='tmp_%s' % archive_id, upload_path=local_path, create=True, + is_authorized=lambda: True) def __enter__(self): # open/extract upload file diff --git a/nomad/config.py b/nomad/config.py index 9617e7469779753cfcf3df8de49b75a481b40fc3..322e8a1d81de3a663b5fd4c940159b62065fdf27 100644 --- a/nomad/config.py +++ b/nomad/config.py @@ -26,14 +26,25 @@ warnings.filterwarnings("ignore", message="numpy.dtype size changed") warnings.filterwarnings("ignore", message="numpy.ufunc size changed") -FilesConfig = namedtuple( - 'FilesConfig', ['uploads_bucket', 'raw_bucket', 'archive_bucket', 'staging_bucket', 'public_bucket']) -""" API independent configuration for the object storage. """ +CELERY_WORKER_ROUTING = 'worker' +CELERY_QUEUE_ROUTING = 'queue' -CeleryConfig = namedtuple('Celery', ['broker_url', 'max_memory', 'timeout', 'acks_late']) -""" Used to configure the RabbitMQ for celery. """ +CeleryConfig = namedtuple('Celery', ['broker_url', 'max_memory', 'timeout', 'acks_late', 'routing']) +""" +Used to configure the RabbitMQ for celery. + +Arguments: + broker_url: The rabbitmq broker URL + max_memory: Max worker memory + timeout: Task timeout + acks_late: Use celery acks_late if set + routing: Set to ``queue`` for routing via upload, calc queues. Processing tasks for one + upload might be routed to different worker and all staging files must be accessible + for all workers via distributed fs. Set to ``worker`` to route all tasks related + to the same upload to the same worker. +""" -FSConfig = namedtuple('FSConfig', ['tmp', 'objects', 'nomad_tmp']) +FSConfig = namedtuple('FSConfig', ['tmp', 'staging', 'public']) """ Used to configure file stystem access. 
""" RepositoryDBConfig = namedtuple('RepositoryDBConfig', ['host', 'port', 'dbname', 'user', 'password']) @@ -57,14 +68,6 @@ MailConfig = namedtuple('MailConfig', ['host', 'port', 'user', 'password', 'from NormalizeConfig = namedtuple('NormalizeConfig', ['all_systems']) """ Used to configure the normalizers """ -files = FilesConfig( - uploads_bucket='uploads', - raw_bucket=os.environ.get('NOMAD_FILES_RAW_BUCKET', 'raw'), - archive_bucket='archive', - staging_bucket='staging', - public_bucket='public' -) - rabbit_host = os.environ.get('NOMAD_RABBITMQ_HOST', 'localhost') rabbit_port = os.environ.get('NOMAD_RABBITMQ_PORT', None) rabbit_user = 'rabbitmq' @@ -87,13 +90,13 @@ celery = CeleryConfig( broker_url=rabbit_url, max_memory=int(os.environ.get('NOMAD_CELERY_MAXMEMORY', 64e6)), # 64 GB timeout=int(os.environ.get('NOMAD_CELERY_TIMEOUT', 3 * 3600)), # 3h - acks_late=bool(os.environ.get('NOMAD_CELERY_ACKS_LATE', True)) + acks_late=bool(os.environ.get('NOMAD_CELERY_ACKS_LATE', True)), + routing=os.environ.get('NOMAD_CELEREY_ROUTING', CELERY_QUEUE_ROUTING) ) - fs = FSConfig( tmp=os.environ.get('NOMAD_FILES_TMP_DIR', '.volumes/fs/tmp'), - objects=os.environ.get('NOMAD_FILES_OBJECTS_DIR', '.volumes/fs/objects'), - nomad_tmp=os.environ.get('NOMAD_FILES_NOMAD_TMP_DIR', '/nomad/tmp') + staging=os.environ.get('NOMAD_FILES_STAGING_DIR', '.volumes/fs/staging'), + public=os.environ.get('NOMAD_FILES_PUBLIC_DIR', '.volumes/fs/public') ) elastic = ElasticConfig( host=os.environ.get('NOMAD_ELASTIC_HOST', 'localhost'), diff --git a/nomad/files.py b/nomad/files.py index 5480ffafb193e203f05c45ecceb36200eac162d6..c75b87b67cce26992d9f3cc21ca629cc31700a6f 100644 --- a/nomad/files.py +++ b/nomad/files.py @@ -27,13 +27,11 @@ almost readonly (beside metadata) storage. /raw/** /archive/<calc>.hdf5 /.frozen - /.public - /.restricted fs/public/<upload>/metadata.json.gz - /raw-public.bagit.zip - /raw-restricted.bagit.zip - /archive-public.hdf5.zip - /archive-restricted.hdf5.zip + /raw-public.plain.zip + /raw-restricted.plain.zip + /archive-public.json.zip + /archive-restricted.json.zip There is an implicit relationship between files, based on them being in the same directory. 
Each directory with at least one *mainfile* is a *calculation directory* @@ -59,7 +57,7 @@ import json import os.path import os import shutil -from zipfile import ZipFile, BadZipFile, is_zipfile +from zipfile import ZipFile, BadZipFile import tarfile import hashlib import io @@ -84,7 +82,7 @@ class PathObject: if os_path: self.os_path = os_path else: - self.os_path = os.path.join(config.fs.objects, bucket, object_id) + self.os_path = os.path.join(bucket, object_id) if prefix: segments = list(os.path.split(self.os_path)) @@ -292,9 +290,9 @@ class UploadFiles(DirectoryObject, metaclass=ABCMeta): @staticmethod def get(upload_id: str, *args, **kwargs) -> 'UploadFiles': - if DirectoryObject(config.files.staging_bucket, upload_id, prefix=True).exists(): + if DirectoryObject(config.fs.staging, upload_id, prefix=True).exists(): return StagingUploadFiles(upload_id, *args, **kwargs) - elif DirectoryObject(config.files.public_bucket, upload_id, prefix=True).exists(): + elif DirectoryObject(config.fs.public, upload_id, prefix=True).exists(): return PublicUploadFiles(upload_id, *args, **kwargs) else: return None @@ -353,7 +351,7 @@ class UploadFiles(DirectoryObject, metaclass=ABCMeta): class StagingUploadFiles(UploadFiles): def __init__(self, *args, **kwargs) -> None: - super().__init__(config.files.staging_bucket, *args, **kwargs) + super().__init__(config.fs.staging, *args, **kwargs) self._raw_dir = self.join_dir('raw') self._archive_dir = self.join_dir('archive') @@ -473,10 +471,14 @@ class StagingUploadFiles(UploadFiles): with open(self._frozen_file.os_path, 'wt') as f: f.write('frozen') - packed_dir = self.join_dir('.packed', create=True) + # create a target dir in the public bucket + target_dir = DirectoryObject( + config.fs.public, self.upload_id, create=True, prefix=True, + create_prefix=True) + assert target_dir.exists() def create_zipfile(kind: str, prefix: str, ext: str) -> ZipFile: - file = packed_dir.join_file('%s-%s.%s.zip' % (kind, prefix, ext)) + file = target_dir.join_file('%s-%s.%s.zip' % (kind, prefix, ext)) return ZipFile(file.os_path, mode='w') # In prior versions we used bagit on raw files. There was not much purpose for @@ -513,7 +515,7 @@ class StagingUploadFiles(UploadFiles): raw_restricted_zip.close() raw_public_zip.close() - self.logger.debug('zipped raw files') + self.logger.debug('packed raw files') # zip archives archive_public_zip = create_zipfile('archive', 'public', self._archive_ext) @@ -532,20 +534,14 @@ class StagingUploadFiles(UploadFiles): archive_restricted_zip.close() archive_public_zip.close() - self.logger.debug('zipped archives') + self.logger.debug('packed archives') # pack metadata - packed_metadata = PublicMetadata(packed_dir.os_path) + packed_metadata = PublicMetadata(target_dir.os_path) packed_metadata._create(self._metadata) self.logger.debug('packed metadata') - # move to public bucket - target_dir = DirectoryObject( - config.files.public_bucket, self.upload_id, create=False, prefix=True, - create_prefix=True) - assert not target_dir.exists() - os.rename(packed_dir.os_path, target_dir.os_path) - self.logger.debug('moved to public bucket') + self.logger.debug('packed upload') def raw_file_manifest(self, path_prefix: str = None) -> Generator[str, None, None]: upload_prefix_len = len(self._raw_dir.os_path) + 1 @@ -623,39 +619,28 @@ class ArchiveBasedStagingUploadFiles(StagingUploadFiles): :class:`StagingUploadFiles` based on a single uploaded archive file (.zip) Arguments: - local_path: Optional override for the path used to store/access the uploaded file. 
+ upload_path: The path to the uploaded file. """ - formats = ['zip'] - """ A human readable list of supported file formats. """ - def __init__( - self, upload_id: str, local_path: str = None, file_name: str = '.upload', - *args, **kwargs) -> None: + self, upload_id: str, upload_path: str, *args, **kwargs) -> None: super().__init__(upload_id, *args, **kwargs) - self._local_path = local_path - if self._local_path is None: - self._upload_file = self.join_file(file_name) - - @property - def upload_file_os_path(self): - if self._local_path is not None: - return self._local_path - else: - return self._upload_file.os_path + self.upload_path = upload_path @property def is_valid(self) -> bool: - if not os.path.exists(self.upload_file_os_path): + if self.upload_path is None: + return False + if not os.path.exists(self.upload_path): return False - elif not os.path.isfile(self.upload_file_os_path): + elif not os.path.isfile(self.upload_path): return False else: - return is_zipfile(self.upload_file_os_path) + return True def extract(self) -> None: assert next(self.raw_file_manifest(), None) is None, 'can only extract once' - super().add_rawfiles(self.upload_file_os_path, force_archive=True) + super().add_rawfiles(self.upload_path, force_archive=True) def add_rawfiles(self, path: str, move: bool = False, prefix: str = None, force_archive: bool = False) -> None: assert False, 'do not add_rawfiles to a %s' % self.__class__.__name__ @@ -663,7 +648,7 @@ class ArchiveBasedStagingUploadFiles(StagingUploadFiles): class PublicUploadFiles(UploadFiles): def __init__(self, *args, **kwargs) -> None: - super().__init__(config.files.public_bucket, *args, **kwargs) + super().__init__(config.fs.public, *args, **kwargs) self._metadata = PublicMetadata(self.os_path) diff --git a/nomad/infrastructure.py b/nomad/infrastructure.py index 8d072be9db9289f4f0ef2222f93bf179992a918a..ff261f04cc1cec46e96f26cdde89aa73f8ddca07 100644 --- a/nomad/infrastructure.py +++ b/nomad/infrastructure.py @@ -246,7 +246,8 @@ def reset(repo_content_only: bool = False): logger.error('exception resetting repository db', exc_info=e) try: - shutil.rmtree(config.fs.objects, ignore_errors=True) + shutil.rmtree(config.fs.staging, ignore_errors=True) + shutil.rmtree(config.fs.public, ignore_errors=True) shutil.rmtree(config.fs.tmp, ignore_errors=True) logger.info('files resetted') except Exception as e: @@ -290,7 +291,8 @@ def remove(): logger.info('reset files') try: - shutil.rmtree(config.fs.objects, ignore_errors=True) + shutil.rmtree(config.fs.staging, ignore_errors=True) + shutil.rmtree(config.fs.public, ignore_errors=True) shutil.rmtree(config.fs.tmp, ignore_errors=True) except Exception as e: logger.error('exception deleting files', exc_info=e) diff --git a/nomad/migration.py b/nomad/migration.py index fd97f7916d4b8346ff96bd5e8d664f89612f4f2c..4b18b21d005ee628901209e0ad514625b31c2c17 100644 --- a/nomad/migration.py +++ b/nomad/migration.py @@ -129,7 +129,7 @@ class Package(Document): Creates a zip file for the package in tmp and returns its path and whether it was created (``True``) or existed before (``False``). 
""" - upload_filepath = os.path.join(config.fs.nomad_tmp, '%s.zip' % self.package_id) + upload_filepath = os.path.join(config.fs.tmp, '%s.zip' % self.package_id) if not os.path.exists(os.path.dirname(upload_filepath)): os.mkdir(os.path.dirname(upload_filepath)) if not os.path.isfile(upload_filepath): diff --git a/nomad/processing/base.py b/nomad/processing/base.py index f0df2604ee90043fbe18646c18f214a575e744bb..8e10b2f96b2b327542770cc47e07e68d2e22b8b4 100644 --- a/nomad/processing/base.py +++ b/nomad/processing/base.py @@ -18,6 +18,7 @@ import time from celery import Celery, Task from celery.worker.request import Request from celery.signals import after_setup_task_logger, after_setup_logger, worker_process_init +from celery.utils import worker_direct from billiard.exceptions import WorkerLostError from mongoengine import Document, StringField, ListField, DateTimeField, ValidationError from mongoengine.connection import MongoEngineConnectionError @@ -50,6 +51,8 @@ app = Celery('nomad.processing', broker=config.celery.broker_url) app.conf.update(worker_hijack_root_logger=False) app.conf.update(worker_max_memory_per_child=config.celery.max_memory) app.conf.update(task_time_limit=config.celery.timeout) +if config.celery.routing == config.CELERY_WORKER_ROUTING: + app.conf.update(worker_direct=True) CREATED = 'CREATED' PENDING = 'PENDING' @@ -136,7 +139,8 @@ class Proc(Document, metaclass=ProcMetaclass): current_process = StringField(default=None) process_status = StringField(default=None) - # _celery_task_id = StringField(default=None) + worker_hostname = StringField(default=None) + celery_task_id = StringField(default=None) @property def tasks_running(self) -> bool: @@ -426,6 +430,9 @@ def proc_task(task, cls_name, self_id, func_attr): logger = self.get_logger() logger.debug('received process function call') + self.worker_hostname = task.request.hostname + self.celery_task_id = task.request.id + # get the process function func = getattr(self, func_attr, None) if func is None: @@ -466,7 +473,7 @@ def process(func): To transfer state, the instance will be saved to the database and loading on the celery task worker. Process methods can call other (process) functions/methods on other :class:`Proc` instances. Each :class:`Proc` instance can only run one - asny process at a time. + any process at a time. 
""" def wrapper(self, *args, **kwargs): assert len(args) == 0 and len(kwargs) == 0, 'process functions must not have arguments' @@ -481,10 +488,12 @@ def process(func): cls_name = self.__class__.__name__ queue = getattr(self.__class__, 'queue', None) + if config.celery.routing == config.CELERY_WORKER_ROUTING and self.worker_hostname is not None: + queue = 'celery@%s' % worker_direct(self.worker_hostname).name + + logger = utils.get_logger(__name__, cls=cls_name, id=self_id, func=func.__name__) + logger.debug('calling process function', queue=queue) - logger = utils.get_logger( - __name__, cls=cls_name, id=self_id, func=func.__name__, queue=queue) - logger.debug('calling process function') return proc_task.apply_async(args=[cls_name, self_id, func.__name__], queue=queue) task = getattr(func, '__task_name', None) diff --git a/nomad/processing/data.py b/nomad/processing/data.py index ca7b6406c04b374424e335e5e2b23ef07d155ab8..181c5e9f7b527a64f94c7daf7d49de6a5a71b7d4 100644 --- a/nomad/processing/data.py +++ b/nomad/processing/data.py @@ -25,7 +25,7 @@ calculations, and files """ from typing import List, Any, ContextManager, Tuple, Generator, Dict -from mongoengine import StringField, DateTimeField, DictField +from mongoengine import StringField, DateTimeField, DictField, BooleanField import logging from structlog import wrap_logger from contextlib import contextmanager @@ -93,7 +93,8 @@ class Calc(Proc): @property def upload_files(self) -> ArchiveBasedStagingUploadFiles: if not self._upload_files: - self._upload_files = ArchiveBasedStagingUploadFiles(self.upload_id, is_authorized=lambda: True, local_path=self.upload.local_path) + self._upload_files = ArchiveBasedStagingUploadFiles( + self.upload_id, is_authorized=lambda: True, upload_path=self.upload.upload_path) return self._upload_files def get_logger(self, **kwargs): @@ -289,7 +290,8 @@ class Upload(Proc): Attributes: name: optional user provided upload name - local_path: optional local path, e.g. for files that are already somewhere on the server + upload_path: the path were the uploaded files was stored + temporary: True if the uploaded file should be removed after extraction metadata: optional user provided additional meta data upload_id: the upload id generated by the database upload_time: the timestamp when the system realised the upload @@ -298,9 +300,10 @@ class Upload(Proc): id_field = 'upload_id' upload_id = StringField(primary_key=True) + upload_path = StringField(default=None) + temporary = BooleanField(default=False) name = StringField(default=None) - local_path = StringField(default=None) metadata = DictField(default=None) upload_time = DateTimeField() user_id = StringField(required=True) @@ -444,7 +447,8 @@ class Upload(Proc): @property def upload_files(self) -> ArchiveBasedStagingUploadFiles: if not self._upload_files: - self._upload_files = ArchiveBasedStagingUploadFiles(self.upload_id, is_authorized=lambda: True, local_path=self.local_path) + self._upload_files = ArchiveBasedStagingUploadFiles( + self.upload_id, is_authorized=lambda: True, upload_path=self.upload_path) return self._upload_files @task @@ -454,13 +458,21 @@ class Upload(Proc): the uploaded files, computing all keys, create an *upload* entry in the NOMAD-coe repository db, etc. """ - # extract the uploaded file, this will also create a bagit bag. 
+ # extract the uploaded file + self._upload_files = ArchiveBasedStagingUploadFiles( + upload_id=self.upload_id, is_authorized=lambda: True, create=True, + upload_path=self.upload_path) + logger = self.get_logger() try: with utils.timer( logger, 'upload extracted', step='extracting', upload_size=self.upload_files.size): self.upload_files.extract() + + if self.temporary: + os.remove(self.upload_path) + self.upload_path = None except KeyError: self.fail('processing requested for non existing upload', log_level=logging.ERROR) return @@ -510,6 +522,7 @@ class Upload(Proc): calc = Calc.create( calc_id=self.upload_files.calc_id(filename), mainfile=filename, parser=parser.name, + worker_hostname=self.worker_hostname, upload_id=self.upload_id) calc.process_calc() diff --git a/ops/helm/nomad/templates/api-deployment.yaml b/ops/helm/nomad/templates/api-deployment.yaml index e456db4960504dc5adeade488c11d136ff4aa167..dba04d347e76499eb17af72a81ef9865d9229c4b 100644 --- a/ops/helm/nomad/templates/api-deployment.yaml +++ b/ops/helm/nomad/templates/api-deployment.yaml @@ -23,11 +23,13 @@ spec: - name: {{ include "nomad.name" . }}-api image: "{{ .Values.images.nomad.name }}:{{ .Values.images.nomad.tag }}" volumeMounts: - - mountPath: /app/.volumes/fs + - mountPath: /app/.volumes/fs/public name: files-volume - mountPath: /nomad name: nomad-volume env: + - name: NOMAD_FILES_TMP_DIR + value: "{{ .Values.volumes.tmp }}" - name: NOMAD_SERVICE value: "api" - name: NOMAD_RELEASE @@ -72,6 +74,8 @@ spec: value: "{{ .Values.postgres.port }}" - name: NOMAD_COE_REPO_DB_NAME value: "{{ .Values.dbname }}" + - name: NOMAD_CELERY_ROUTING + value: "worker" command: ["python", "-m", "gunicorn.app.wsgiapp", "--timeout", "3600", "--log-config", "ops/gunicorn.log.conf", "-w", "{{ .Values.api.worker }}", "-b 0.0.0.0:8000", "nomad.api:app"] livenessProbe: httpGet: diff --git a/ops/helm/nomad/templates/worker-deployment.yaml b/ops/helm/nomad/templates/worker-deployment.yaml index 38c89fc69a1edc402adc41fb2853da910ea2e261..9e133c39c9f298d292aa958aa07c2f9fc84f9917 100644 --- a/ops/helm/nomad/templates/worker-deployment.yaml +++ b/ops/helm/nomad/templates/worker-deployment.yaml @@ -28,11 +28,15 @@ spec: requests: memory: "{{ .Values.worker.memrequest }}Gi" volumeMounts: - - mountPath: /app/.volumes/fs + - mountPath: /app/.volumes/fs/public name: files-volume + - mountPath: /app/.volumes/fs/staging + name: staging-volume - mountPath: /nomad name: nomad-volume env: + - name: NOMAD_FILES_TMP_DIR + value: "{{ .Values.volumes.tmp }}" - name: NOMAD_SERVICE value: "worker" - name: NOMAD_RELEASE @@ -79,6 +83,8 @@ spec: value: "{{ .Values.mail.from }}" - name: NOMAD_CONSOLE_LOGLEVEL value: "ERROR" + - name: NOMAD_CELERY_ROUTING + value: "worker" command: ["python", "-m", "celery", "worker", "-A", "nomad.processing", "-Q", "celery,calcs,uploads"] livenessProbe: exec: @@ -110,3 +116,6 @@ spec: hostPath: path: {{ .Values.volumes.nomad }} type: Directory + - name: staging-volume + emptyDir: + medium: 'Memory' diff --git a/ops/helm/nomad/values.yaml b/ops/helm/nomad/values.yaml index 0ac9644e8eb72062518f2ee60aa0e5c5001af6f0..c92d322e3e98f50dfd3ed4aef85bdb2c30e736ea 100644 --- a/ops/helm/nomad/values.yaml +++ b/ops/helm/nomad/values.yaml @@ -43,8 +43,8 @@ api: worker: replicas: 2 # request and limit in GB - memrequest: 32 - memlimit: 320 + memrequest: 64 + memlimit: 420 console_loglevel: INFO logstash_loglevel: INFO @@ -115,5 +115,6 @@ mail: ## Everything concerning the data that is used by the service volumes: - files: /nomad/fairdi/fs + files: 
/nomad/fairdi/fs/public + tmp: /nomad/fairdi/fs/tmp nomad: /nomad diff --git a/tests/conftest.py b/tests/conftest.py index d00fd9c0abe1b608e3ed3e758f95fbc92a404f15..3cbac35fb98641c32f81333120ae33144ff2294f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -29,7 +29,7 @@ import datetime import base64 from bravado.client import SwaggerClient -from nomad import config, infrastructure, files, parsing, processing, coe_repo, api +from nomad import config, infrastructure, parsing, processing, coe_repo, api from tests import test_parsing, test_normalizing from tests.processing import test_data as test_processing @@ -58,13 +58,14 @@ def nomad_logging(): @pytest.fixture(scope='session', autouse=True) def raw_files_infra(monkeysession): monkeysession.setattr('nomad.config.fs', config.FSConfig( - tmp='.volumes/test_fs/tmp', objects='.volumes/test_fs/objects', nomad_tmp='.volumes/test_fs/nomad_tmp')) + tmp='.volumes/test_fs/tmp', staging='.volumes/test_fs/staging', + public='.volumes/test_fs/public')) @pytest.fixture(scope='function') def raw_files(raw_files_infra): """ Provides cleaned out files directory structure per function. Clears files after test. """ - directories = [config.fs.objects, config.fs.tmp] + directories = [config.fs.staging, config.fs.public, config.fs.tmp] for directory in directories: if not os.path.exists(directory): os.makedirs(directory) @@ -510,21 +511,19 @@ def normalized(parsed: parsing.LocalBackend) -> parsing.LocalBackend: @pytest.fixture(scope='function') -def uploaded(example_upload: str, raw_files) -> str: +def uploaded(example_upload: str, raw_files) -> Tuple[str, str]: """ Provides a uploaded with uploaded example file and gives the upload_id. Clears files after test. """ example_upload_id = os.path.basename(example_upload).replace('.zip', '') - upload_files = files.ArchiveBasedStagingUploadFiles(example_upload_id, create=True) - shutil.copyfile(example_upload, upload_files.upload_file_os_path) - return example_upload_id + return example_upload_id, example_upload @pytest.mark.timeout(10) @pytest.fixture(scope='function') -def processed(uploaded: str, test_user: coe_repo.User, proc_infra) -> processing.Upload: +def processed(uploaded: Tuple[str, str], test_user: coe_repo.User, proc_infra) -> processing.Upload: """ Provides a processed upload. Upload was uploaded with test_user. """ diff --git a/tests/processing/test_data.py b/tests/processing/test_data.py index 109d409283dfffae5d39aeddf84cba5aa1c178de..badb9277a6be11270f40bb0062d4fb3890f52e7a 100644 --- a/tests/processing/test_data.py +++ b/tests/processing/test_data.py @@ -12,16 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from typing import Generator +from typing import Generator, Tuple import pytest from datetime import datetime -import shutil import os.path import json import re from nomad import utils, infrastructure -from nomad.files import ArchiveBasedStagingUploadFiles, UploadFiles, StagingUploadFiles +from nomad.files import UploadFiles, StagingUploadFiles from nomad.processing import Upload, Calc from nomad.processing.base import task as task_decorator, FAILURE, SUCCESS @@ -39,17 +38,17 @@ def mongo_forall(mongo): @pytest.fixture -def uploaded_id_with_warning(raw_files) -> Generator[str, None, None]: +def uploaded_id_with_warning(raw_files) -> Generator[Tuple[str, str], None, None]: example_file = 'tests/data/proc/examples_with_warning_template.zip' example_upload_id = os.path.basename(example_file).replace('.zip', '') - upload_files = ArchiveBasedStagingUploadFiles(example_upload_id, create=True) - shutil.copyfile(example_file, upload_files.upload_file_os_path) - yield example_upload_id + yield example_upload_id, example_file -def run_processing(uploaded_id: str, test_user) -> Upload: - upload = Upload.create(upload_id=uploaded_id, user=test_user) +def run_processing(uploaded: Tuple[str, str], test_user) -> Upload: + uploaded_id, uploaded_path = uploaded + upload = Upload.create( + upload_id=uploaded_id, user=test_user, upload_path=uploaded_path) upload.upload_time = datetime.now() assert upload.tasks_status == 'RUNNING' @@ -102,16 +101,14 @@ def test_processing(processed, no_warn, mails): def test_processing_with_warning(proc_infra, test_user, with_warn): example_file = 'tests/data/proc/examples_with_warning_template.zip' example_upload_id = os.path.basename(example_file).replace('.zip', '') - upload_files = ArchiveBasedStagingUploadFiles(example_upload_id, create=True) - shutil.copyfile(example_file, upload_files.upload_file_os_path) - upload = run_processing(example_upload_id, test_user) + upload = run_processing((example_upload_id, example_file), test_user) assert_processing(upload) @pytest.mark.timeout(10) def test_process_non_existing(proc_infra, test_user, with_error): - upload = run_processing('__does_not_exist', test_user) + upload = run_processing(('__does_not_exist', '__does_not_exist'), test_user) assert not upload.tasks_running assert upload.current_task == 'extracting' @@ -164,10 +161,8 @@ def test_task_failure(monkeypatch, uploaded, task, proc_infra, test_user, with_e def test_malicious_parser_task_failure(proc_infra, failure, test_user): example_file = 'tests/data/proc/chaos_%s.zip' % failure example_upload_id = os.path.basename(example_file).replace('.zip', '') - upload_files = ArchiveBasedStagingUploadFiles(example_upload_id, create=True) - shutil.copyfile(example_file, upload_files.upload_file_os_path) - upload = run_processing(example_upload_id, test_user) + upload = run_processing((example_upload_id, example_file), test_user) assert not upload.tasks_running assert upload.current_task == 'cleanup' diff --git a/tests/test_api.py b/tests/test_api.py index 1d8a3aa213c6e7ed269ef55f71d5f619ee803059..3d1286b9a2fcf27ca17d52cb5fc332c46860b803 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -328,7 +328,7 @@ class TestUploads: assert rv.status_code == 200 if mode == 'local_path': - upload = self.assert_upload(rv.data, local_path=file, name=name) + upload = self.assert_upload(rv.data, upload_path=file, name=name) else: upload = self.assert_upload(rv.data, name=name) assert upload['tasks_running'] @@ -351,14 +351,6 @@ class TestUploads: yield True 
monkeypatch.setattr('nomad.processing.data.Upload.cleanup', old_cleanup) - def test_delete_during_processing(self, client, test_user_auth, proc_infra, slow_processing, no_warn): - rv = client.put('/uploads/?local_path=%s' % example_file, headers=test_user_auth) - upload = self.assert_upload(rv.data) - assert upload['tasks_running'] - rv = client.delete('/uploads/%s' % upload['upload_id'], headers=test_user_auth) - assert rv.status_code == 400 - self.assert_processing(client, test_user_auth, upload['upload_id']) - def test_delete_unstaged(self, client, test_user_auth, proc_infra, no_warn): rv = client.put('/uploads/?local_path=%s' % example_file, headers=test_user_auth) upload = self.assert_upload(rv.data) diff --git a/tests/test_files.py b/tests/test_files.py index 65e230b08a61adce1734ecbc48f37225212be03a..03551466838ddca7db48b466dc39580bb75fa39c 100644 --- a/tests/test_files.py +++ b/tests/test_files.py @@ -50,11 +50,11 @@ class TestObjects: @pytest.fixture(scope='function') def test_bucket(self): - yield 'test_bucket' + yield config.fs.staging - bucket = os.path.join(config.fs.objects, 'test_bucket') + bucket = os.path.join(config.fs.staging) if os.path.exists(bucket): - shutil.rmtree(os.path.join(config.fs.objects, 'test_bucket')) + shutil.rmtree(os.path.join(config.fs.staging)) def test_file_dir_existing(self, test_bucket): file = PathObject(test_bucket, 'sub/test_id') @@ -243,12 +243,12 @@ class UploadFilesFixtures: @pytest.fixture(scope='function') def test_upload_id(self) -> Generator[str, None, None]: - for bucket in [config.files.staging_bucket, config.files.public_bucket]: + for bucket in [config.fs.staging, config.fs.public]: directory = DirectoryObject(bucket, 'test_upload', prefix=True) if directory.exists(): directory.delete() yield 'test_upload' - for bucket in [config.files.staging_bucket, config.files.public_bucket]: + for bucket in [config.fs.staging, config.fs.public]: directory = DirectoryObject(bucket, 'test_upload', prefix=True) if directory.exists(): directory.delete() @@ -330,7 +330,8 @@ def create_staging_upload(upload_id: str, calc_specs: str) -> StagingUploadFiles directory = os.path.join(str(prefix), 'examples_template') calc, upload_file = generate_example_calc( - prefix, with_mainfile_prefix=is_multi, subdirectory=directory, with_embargo=calc_spec == 'r') + prefix, with_mainfile_prefix=is_multi, subdirectory=directory, + with_embargo=calc_spec == 'r') calc_id = calc['calc_id'] upload.add_rawfiles(upload_file) @@ -395,21 +396,17 @@ class TestStagingUploadFiles(UploadFilesContract): class TestArchiveBasedStagingUploadFiles(UploadFilesFixtures): def test_create(self, test_upload_id): - test_upload = ArchiveBasedStagingUploadFiles(test_upload_id, create=True) - shutil.copy(example_file, test_upload.upload_file_os_path) + test_upload = ArchiveBasedStagingUploadFiles( + test_upload_id, create=True, upload_path=example_file) test_upload.extract() assert sorted(list(test_upload.raw_file_manifest())) == sorted(example_file_contents) - assert os.path.exists(test_upload.upload_file_os_path) - - def test_local_path(self, test_upload_id): - test_upload = ArchiveBasedStagingUploadFiles(test_upload_id, create=True, local_path=example_file) - test_upload.extract() - assert sorted(list(test_upload.raw_file_manifest())) == sorted(example_file_contents) - assert os.path.exists(test_upload.upload_file_os_path) + assert os.path.exists(test_upload.upload_path) def test_invalid(self, test_upload_id): - assert ArchiveBasedStagingUploadFiles(test_upload_id, create=True, 
local_path=example_file).is_valid
-        assert not ArchiveBasedStagingUploadFiles(test_upload_id, create=True).is_valid
+        assert ArchiveBasedStagingUploadFiles(
+            test_upload_id, create=True, upload_path=example_file).is_valid
+        assert not ArchiveBasedStagingUploadFiles(
+            test_upload_id, create=True, upload_path='does not exist').is_valid
 
 
 def create_public_upload(upload_id: str, calc_specs: str, **kwargs):
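
Taken together, the changes above drop the implicit `.upload` file of `ArchiveBasedStagingUploadFiles` in favour of an explicit `upload_path` that is passed from the API through the `Upload` document into the file abstractions. Below is a minimal sketch of the resulting API-side flow, using only names introduced in this patch (`utils.create_uuid`, `files.PathObject`, `config.fs.tmp`, `Upload.create`, `process_upload`); the `receive_upload` helper and its arguments are illustrative, not part of the patch, and error handling is omitted.

```python
from datetime import datetime

from nomad import config, files, utils
from nomad.processing import Upload


def receive_upload(stream, user, name=None, local_path=None):
    upload_id = utils.create_uuid()

    if local_path:
        # the file is already on the server, use it in place
        upload_path = local_path
    else:
        # stream the request body into the tmp directory
        upload_path = files.PathObject(config.fs.tmp, upload_id).os_path
        with open(upload_path, 'wb') as f:
            for chunk in iter(lambda: stream.read(1024 * 1024), b''):
                f.write(chunk)

    upload = Upload.create(
        upload_id=upload_id, user=user, name=name,
        upload_time=datetime.now(),
        upload_path=upload_path,
        # streamed files are temporary and removed after extraction
        temporary=local_path != upload_path)
    upload.process_upload()
    return upload
```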
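The tests now construct `ArchiveBasedStagingUploadFiles` directly from an existing archive instead of copying it to `upload_file_os_path` first. A short usage sketch based on the updated signature; the upload id and path are placeholders.

```python
from nomad.files import ArchiveBasedStagingUploadFiles

upload_files = ArchiveBasedStagingUploadFiles(
    'some_upload_id', create=True, upload_path='/path/to/upload.zip')

if upload_files.is_valid:   # True only if upload_path points to an existing file
    upload_files.extract()  # unpack into the staging raw/ directory; allowed only once
```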
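The new `routing` option works in two parts: with `NOMAD_CELERY_ROUTING=worker` the celery app enables `worker_direct` queues, and each `Proc` records the `worker_hostname` that first handled it so that later process calls target that worker's direct queue and find the staging files locally. The sketch below summarizes the queue selection from `nomad/processing/base.py`; `pick_queue` is a hypothetical helper name, not part of the patch.

```python
from celery.utils import worker_direct

from nomad import config


def pick_queue(proc, default_queue=None):
    """Choose the celery queue for a process call (mirrors the process() wrapper)."""
    if config.celery.routing == config.CELERY_WORKER_ROUTING \
            and proc.worker_hostname is not None:
        # route to the direct queue of the worker that first processed this Proc
        return 'celery@%s' % worker_direct(proc.worker_hostname).name
    # otherwise fall back to the class level queue (e.g. 'calcs', 'uploads')
    return default_queue
```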