From 8cc3b00b7171ada4ce2ef1a9b57a029bf24c2731 Mon Sep 17 00:00:00 2001 From: Markus Scheidgen <markus.scheidgen@gmail.com> Date: Wed, 5 Sep 2018 16:46:34 +0200 Subject: [PATCH] Refactored data/proc. --- .vscode/launch.json | 4 +- infrastructure/nomadxt/docker-compose.yml | 1 + nomad/api.py | 28 +- nomad/data.py | 400 ------------------ nomad/processing/__init__.py | 7 +- nomad/processing/app.py | 287 ------------- nomad/{proc.py => processing/base.py} | 85 +++- nomad/processing/data.py | 369 ++++++++++++++++ nomad/processing/handler.py | 43 +- nomad/processing/state.py | 305 ------------- nomad/processing/tasks.py | 206 --------- nomad/search.py | 159 +++++++ nomad/user.py | 33 ++ nomad/utils.py | 5 + tests/conftest.py | 49 +++ .../{test_proc.py => processing/test_base.py} | 45 +- tests/processing/test_data.py | 148 +++++++ tests/test_api.py | 51 ++- tests/test_processing.py | 172 -------- tests/{test_data.py => test_search.py} | 54 +-- 20 files changed, 910 insertions(+), 1541 deletions(-) delete mode 100644 nomad/data.py delete mode 100644 nomad/processing/app.py rename nomad/{proc.py => processing/base.py} (81%) create mode 100644 nomad/processing/data.py delete mode 100644 nomad/processing/state.py delete mode 100644 nomad/processing/tasks.py create mode 100644 nomad/search.py create mode 100644 nomad/user.py create mode 100644 tests/conftest.py rename tests/{test_proc.py => processing/test_base.py} (65%) create mode 100644 tests/processing/test_data.py delete mode 100644 tests/test_processing.py rename tests/{test_data.py => test_search.py} (64%) diff --git a/.vscode/launch.json b/.vscode/launch.json index 51439e8689..29b01f918c 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -68,13 +68,13 @@ ] }, { - "name": "Python: tests/test_utils.py", + "name": "Python: tests/processing/test_base.py", "type": "python", "request": "launch", "cwd": "${workspaceFolder}", "program": "${workspaceFolder}/.pyenv/bin/pytest", "args": [ - "-sv", "tests/test_utils.py" + "-sv", "tests/processing/test_base.py::test_fail" ] }, { diff --git a/infrastructure/nomadxt/docker-compose.yml b/infrastructure/nomadxt/docker-compose.yml index 2eceb4d49d..dfa748c326 100644 --- a/infrastructure/nomadxt/docker-compose.yml +++ b/infrastructure/nomadxt/docker-compose.yml @@ -54,6 +54,7 @@ services: - RABBITMQ_DEFAULT_USER=rabbitmq - RABBITMQ_DEFAULT_PASS=rabbitmq - RABBITMQ_DEFAULT_VHOST=/ + - RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=-rabbit log_levels [{default,debug}] ports: - ${RABBITMQ_HOST_PORT}:5672 volumes: diff --git a/nomad/api.py b/nomad/api.py index 639a726927..17706851fa 100644 --- a/nomad/api.py +++ b/nomad/api.py @@ -4,8 +4,10 @@ from flask_cors import CORS from elasticsearch.exceptions import NotFoundError from nomad import config, files -from nomad.utils import get_logger -from nomad.data import Calc, Upload, User, InvalidId, NotAllowedDuringProcessing, me +from nomad.utils import get_logger, create_uuid +from nomad.processing import Upload, Calc, InvalidId, NotAllowedDuringProcessing +from nomad.search import CalcElasticDocument +from nomad.user import me base_path = config.services.api_base_path @@ -135,7 +137,8 @@ class UploadsRes(Resource): if json_data is None: json_data = {} - return Upload.create(user=me, name=json_data.get('name', None)).json_dict, 200 + upload = Upload.create(upload_id=create_uuid(), user=me, name=json_data.get('name')) + return upload.json_dict, 200 class UploadRes(Resource): @@ -191,17 +194,18 @@ class UploadRes(Resource): :param string upload_id: the id for the upload 
:resheader Content-Type: application/json :status 200: upload successfully updated and retrieved - :status 400: bad upload id :status 404: upload with id does not exist :returns: the :class:`nomad.data.Upload` instance """ + # TODO calc paging try: - return Upload.get(upload_id=upload_id).json_dict, 200 - except InvalidId: - abort(400, message='%s is not a valid upload id.' % upload_id) + result = Upload.get(upload_id).json_dict except KeyError: abort(404, message='Upload with id %s does not exist.' % upload_id) + result['calcs'] = [calc.json_dict for calc in Calc.objects(upload_id=upload_id)] + return result, 200 + def delete(self, upload_id): """ Deletes an existing upload. Only ``is_ready`` or ``is_stale`` uploads @@ -224,9 +228,9 @@ class UploadRes(Resource): :returns: the :class:`nomad.data.Upload` instance with the latest processing state """ try: - return Upload.get(upload_id=upload_id).delete().json_dict, 200 - except InvalidId: - abort(400, message='%s is not a valid upload id.' % upload_id) + upload = Upload.get(upload_id) + upload.delete() + return upload.json_dict, 200 except KeyError: abort(404, message='Upload with id %s does not exist.' % upload_id) except NotAllowedDuringProcessing: @@ -236,7 +240,7 @@ class UploadRes(Resource): class RepoCalcRes(Resource): def get(self, upload_hash, calc_hash): try: - return Calc.get(id='%s/%s' % (upload_hash, calc_hash)).json_dict, 200 + return CalcElasticDocument.get(id='%s/%s' % (upload_hash, calc_hash)).json_dict, 200 except NotFoundError: abort(404, message='There is no calculation for %s/%s' % (upload_hash, calc_hash)) except Exception as e: @@ -255,7 +259,7 @@ class RepoCalcsRes(Resource): assert per_page > 0 try: - search = Calc.search().query('match_all') + search = CalcElasticDocument.search().query('match_all') search = search[(page - 1) * per_page: page * per_page] return { 'pagination': { diff --git a/nomad/data.py b/nomad/data.py deleted file mode 100644 index b73e647460..0000000000 --- a/nomad/data.py +++ /dev/null @@ -1,400 +0,0 @@ -# Copyright 2018 Markus Scheidgen -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an"AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -This module comprises a set of persistent document classes that hold all user related -data. These are information about users, their uploads and datasets, the associated -calculations, and files - - -.. autoclass:: Calc - :members: -.. autoclass:: Upload - :members: -.. autoclass:: DataSet -.. 
autoclass:: User - -""" - -from typing import List -import sys -from datetime import datetime -import elasticsearch.exceptions -from elasticsearch_dsl import Document as ElasticDocument, Date, Keyword, Search, connections -from mongoengine import \ - Document, EmailField, StringField, BooleanField, DateTimeField, \ - ListField, DictField, ReferenceField, connect -import mongoengine.errors - -from nomad import config, files -from nomad.processing import UploadProc, CalcProc -from nomad.parsing import LocalBackend -from nomad.utils import get_logger, lnr - -logger = get_logger(__name__) - -# ensure elastic and mongo connections -if 'sphinx' not in sys.modules: - client = connections.create_connection(hosts=[config.elastic.host]) - connect(db=config.mongo.users_db, host=config.mongo.host) - -key_mappings = { - 'basis_set_type': 'program_basis_set_type', - 'chemical_composition': 'chemical_composition_bulk_reduced' -} - - -class AlreadyExists(Exception): pass - - -class InvalidId(Exception): pass - - -class NotAllowedDuringProcessing(Exception): pass - - -class Calc(ElasticDocument): - """ - Instances of this class represent calculations. This class manages the elastic - search index entry, files, and archive for the respective calculation. - Instances should be created directly, but via the static :func:`create_from_backend` - function. - - The attribute list, does not include the various repository properties generated - while parsing, including ``program_name``, ``program_version``, etc. - - Attributes: - calc_hash: The hash that identified the calculation within an upload - upload_hash: The hash of the upload - upload_id: The id of the upload used to create this calculation - mainfile: The mainfile (including path in upload) that was used to create this calc - """ - calc_hash = Keyword() - - upload_time = Date() - upload_id = Keyword() - upload_hash = Keyword() - mainfile = Keyword() - - program_name = Keyword() - program_version = Keyword() - - chemical_composition = Keyword() - basis_set_type = Keyword() - atom_species = Keyword() - system_type = Keyword() - crystal_system = Keyword() - space_group_number = Keyword() - configuration_raw_gid = Keyword() - XC_functional_name = Keyword() - - class Index: - name = config.elastic.calc_index - - @property - def archive_id(self) -> str: - """ The unique id for this calculation. """ - return '%s/%s' % (self.upload_hash, self.calc_hash) - - def delete(self): - """ - Delete this calculation and all associated data. This includes all files, - the archive, and this search index entry. - """ - # delete the archive - files.delete_archive(self.archive_id) - - # delete the search index entry - super().delete() - - @staticmethod - def es_search(body): - """ Perform an elasticsearch and not elasticsearch_dsl search on the Calc index. """ - return client.search(index=config.elastic.calc_index, body=body) - - @staticmethod - def delete_all(**kwargs): - for calc in Calc.search().query('match', **kwargs).execute(): - calc.delete() - - @staticmethod - def create_from_backend( - backend: LocalBackend, upload_id: str, upload_hash: str, calc_hash: str, **kwargs) \ - -> 'Calc': - """ - Create a new calculation instance. The data from the given backend - will be used. Additional meta-data can be given as *kwargs*. ``upload_id``, - ``upload_hash``, and ``calc_hash`` are mandatory. - This will create a elastic search entry and store the backend data to the - archive. - - Arguments: - backend: The parsing/normalizing backend that contains the calculation data. 
- upload_hash: The upload hash of the originating upload. - upload_id: The upload id of the originating upload. - calc_hash: The upload unique hash for this calculation. - kwargs: Additional arguments not stored in the backend. - - Raises: - AlreadyExists: If the calculation already exists in elastic search. We use - the elastic document lock here. The elastic document is ided via the - ``archive_id``. - """ - assert upload_hash is not None and calc_hash is not None and upload_id is not None - kwargs.update(dict(upload_hash=upload_hash, calc_hash=calc_hash, upload_id=upload_id)) - - calc = Calc(meta=dict(id='%s/%s' % (upload_hash, calc_hash))) - - for property in Calc._doc_type.mapping: - property = key_mappings.get(property, property) - - if property in kwargs: - value = kwargs[property] - else: - try: - value = backend.get_value(property, 0) - except KeyError: - logger.warning( - 'Missing property value', property=property, upload_id=upload_id, - upload_hash=upload_hash, calc_hash=calc_hash) - continue - - setattr(calc, property, value) - - # persist to elastic search - try: - calc.save(op_type='create') - except Exception: - raise AlreadyExists('Calculation %s does already exist.' % (calc.archive_id)) - - # persist the archive - with files.write_archive_json(calc.archive_id) as out: - backend.write_json(out, pretty=True) - - return calc - - @property - def json_dict(self): - """ A json serializable dictionary representation. """ - data = self.to_dict() - - upload_time = data.get('upload_time', None) - if upload_time is not None and isinstance(upload_time, datetime): - data['upload_time'] = data['upload_time'].isoformat() - - data['archive_id'] = self.archive_id - - return {key: value for key, value in data.items() if value is not None} - - @staticmethod - def upload_exists(upload_hash): - """ Returns true if there are already calcs from the given upload. """ - search = Search(using=client, index=config.elastic.calc_index) \ - .query('match', upload_hash=upload_hash) \ - .execute() - - return len(search) > 0 - - -if 'sphinx' not in sys.modules: - try: - Calc.init() - except elasticsearch.exceptions.RequestError as e: - if e.status_code == 400 and 'resource_already_exists_exception' in e.error: - pass # happens if two services try this at the same time - else: - raise e - - -class User(Document): - """ Represents users in the database. """ - email = EmailField(primary=True) - name = StringField() - - -class Upload(Document): - """ - Represents uploads in the databases. Provides persistence access to the files storage, - and processing state. - - Attributes: - file_name: Optional user provided upload name - upload_id: The upload id. Generated by the database. - in_staging: True if the upload is still in staging and can be edited by the uploader. - is_private: True if the upload and its derivitaves are only visible to the uploader. - proc: The :class:`nomad.processing.UploadProc` that holds the processing state. - created_time: The timestamp this upload was created. - upload_time: The timestamp when the system realised the upload. - proc_time: The timestamp when the processing realised finished by the system. 
- """ - - name = StringField(default=None) - - in_staging = BooleanField(default=True) - is_private = BooleanField(default=False) - - presigned_url = StringField() - upload_time = DateTimeField() - create_time = DateTimeField() - - proc_time = DateTimeField() - proc = DictField() - - user = ReferenceField(User, required=True) - - meta = { - 'indexes': [ - 'proc.upload_hash', - 'user' - ] - } - - @staticmethod - def user_uploads(user: User) -> List['Upload']: - """ Returns all uploads for the given user. Currently returns all uploads. """ - return [upload.update_proc() for upload in Upload.objects()] - - @staticmethod - def get(upload_id: str) -> 'Upload': - try: - upload = Upload.objects(id=upload_id).first() - except mongoengine.errors.ValidationError: - raise InvalidId('Invalid upload id') - - if upload is None: - raise KeyError('Upload does not exist') - - upload.update_proc() - return upload - - def logger(self, **kwargs): - return get_logger(__name__, upload_id=self.upload_id, cls='Upload', **kwargs) - - def delete(self) -> 'Upload': - logger = self.logger(action='delete') - - self.update_proc() - if not (self.is_ready or self.is_stale or self._proc.current_task_name == 'uploading'): - raise NotAllowedDuringProcessing() - - with lnr(logger, 'Delete upload file'): - try: - files.Upload(self.upload_id).delete() - except KeyError: - if self._proc.current_task_name == 'uploading': - logger.debug('Upload exist, but file does not exist. It was probably aborted and deleted.') - else: - logger.debug('Upload exist, but uploaded file does not exist.') - - if self._proc.upload_hash is not None: - with lnr(logger, 'Deleting calcs'): - Calc.delete_all(upload_id=self.upload_id) - - with lnr(logger, 'Deleting upload'): - super().delete() - - return self - - @staticmethod - def create(user: User, name: str=None) -> 'Upload': - """ - Creates a new upload for the given user, a user given name is optional. - It will populate the record with a signed url and pending :class:`UploadProc`. - The upload will be already saved to the database. - """ - upload = Upload(user=user, name=name) - upload.save() - - upload.presigned_url = files.get_presigned_upload_url(upload.upload_id) - upload.create_time = datetime.now() - upload.proc = UploadProc(upload.upload_id) - upload.save() - - upload.update_proc() - - return upload - - @property - def upload_id(self) -> str: - return self.id.__str__() - - @property - def is_stale(self) -> bool: - proc = self._proc - if proc.current_task_name == proc.task_names[0] and self.upload_time is None: - return (datetime.now() - self.create_time).days > 1 - else: - return False - - @property - def is_ready(self) -> bool: - return self._proc.status in ['SUCCESS', 'FAILURE'] - - @property - def _proc(self): - """ Cast the internal mongo dict to an actual :class:`UploadProc` instance. """ - # keep the instance cached - if '__proc' not in self.__dict__: - self.__dict__['__proc'] = UploadProc(**self.proc) - - return self.__dict__['__proc'] - - def update_proc(self) -> 'Upload': - """ Updates this instance with information from the celery results backend. """ - if self._proc.update_from_backend(): - self.proc = self._proc - self.save() - - return self - - @property - def json_dict(self) -> dict: - """ A json serializable dictionary representation. 
""" - data = { - 'name': self.name, - 'upload_id': self.upload_id, - 'presigned_url': files.external_objects_url(self.presigned_url), - 'create_time': self.create_time.isoformat() if self.create_time is not None else None, - 'upload_time': self.upload_time.isoformat() if self.upload_time is not None else None, - 'proc_time': self.proc_time.isoformat() if self.proc_time is not None else None, - 'is_stale': self.is_stale, - 'is_ready': self.is_ready, - 'proc': self._proc - } - return {key: value for key, value in data.items() if value is not None} - - -class DataSet(Document): - name = StringField() - description = StringField() - doi = StringField() - - user = ReferenceField(User) - calcs = ListField(StringField) - - meta = { - 'indexes': [ - 'user', - 'doi', - 'calcs' - ] - } - -# provid a fake user for testing -me = None -if 'sphinx' not in sys.modules: - me = User.objects(email='me@gmail.com').first() - if me is None: - me = User(email='me@gmail.com', name='Me Meyer') - me.save() diff --git a/nomad/processing/__init__.py b/nomad/processing/__init__.py index 27448b50be..87e89555ee 100644 --- a/nomad/processing/__init__.py +++ b/nomad/processing/__init__.py @@ -51,7 +51,6 @@ Initiate processing """ -from nomad.processing.app import app -from nomad.processing import tasks -from nomad.processing.state import ProcPipeline, UploadProc, CalcProc -from nomad.processing.handler import handle_uploads, handle_uploads_thread, start_processing +from nomad.processing.base import app, InvalidId, ProcNotRegistered +from nomad.processing.data import Upload, Calc, NotAllowedDuringProcessing +from nomad.processing.handler import handle_uploads, handle_uploads_thread diff --git a/nomad/processing/app.py b/nomad/processing/app.py deleted file mode 100644 index 5aecae6307..0000000000 --- a/nomad/processing/app.py +++ /dev/null @@ -1,287 +0,0 @@ -# Copyright 2018 Markus Scheidgen -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an"AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -from typing import List -from contextlib import contextmanager -import inspect -import logging -import celery -from celery import Task -from celery.signals import after_setup_task_logger, after_setup_logger, worker_process_init -from mongoengine import Document, StringField, ListField, DateTimeField, IntField, \ - ReferenceField, connect -from pymongo import ReturnDocument -from datetime import datetime - -from nomad import config, utils -import nomad.patch # pylint: disable=unused-import - - -class Celery(celery.Celery): - - def mongo_connect(self): - return connect(db=config.mongo.users_db, host=config.mongo.host) - - def __init__(self): - if config.logstash.enabled: - def initialize_logstash(logger=None, loglevel=logging.DEBUG, **kwargs): - utils.add_logstash_handler(logger) - return logger - - after_setup_task_logger.connect(initialize_logstash) - after_setup_logger.connect(initialize_logstash) - - worker_process_init.connect(lambda **kwargs: self.mongo_connect()) - - super().__init__( - 'nomad.processing', backend=config.celery.backend_url, broker=config.celery.broker_url) - - self.add_defaults(dict( - accept_content=['json', 'pickle'], - task_serializer=config.celery.serializer, - result_serializer=config.celery.serializer, - )) - - -app = Celery() - - -PENDING = 'PENDING' -RUNNING = 'RUNNING' -FAILURE = 'FAILURE' -SUCCESS = 'SUCCESS' - - -class InvalidId(Exception): pass - - -class AsyncDocumentNotRegistered(Exception): pass - - -class Proc(Document): - """ - Base class for objects involved in processing and need persistent processing - state. - - Attributes: - current_task: the currently running or last completed task - status: the overall status of the processing - errors: a list of errors that happened during processing. Error fail a processing - run - warnings: a list of warnings that happened during processing. 
Warnings do not - fail a processing run - create_time: the time of creation (not the start of processing) - proc_time: the time that processing completed (successfully or not) - """ - - meta = { - 'abstract': True, - } - - tasks: List[str] = None - """ the ordered list of tasks that comprise a processing run """ - - current_task = StringField(default=None) - status = StringField(default='CREATED') - - errors = ListField(StringField, default=[]) - warnings = ListField(StringField, default=[]) - - create_time = DateTimeField(required=True) - complete_time = DateTimeField() - - @property - def completed(self) -> bool: - return self.status in [SUCCESS, FAILURE] - - def __init__(self, **kwargs): - assert self.__class__.tasks is not None and len(self.tasks) > 0, \ - """ the class attribute tasks must be overwritten with an acutal list """ - assert 'status' not in kwargs, \ - """ do not set the status manually, its managed """ - - kwargs.setdefault('create_time', datetime.now()) - super().__init__(**kwargs) - self.status = PENDING if self.current_task is None else RUNNING - - @classmethod - def get(cls, obj_id): - try: - obj = cls.objects(id=obj_id).first() - except Exception as e: - raise InvalidId('%s is not a valid id' % obj_id) - - if obj is None: - raise KeyError('%s with id %s does not exist' % (cls.__name__, obj_id)) - - return obj - - def get_id(self): - return self.id.__str__() - - def fail(self, error): - assert not self.completed - - self.status = FAILURE - self.errors = [str(error) for error in errors] - self.complete_time = datetime.now() - - self.save() - - def warning(self, *warnings): - assert not self.completed - - for warning in warnings: - self.warnings.append(str(warning)) - - def continue_with(self, task): - assert task in self.tasks - assert self.tasks.index(task) == self.tasks.index(self.current_task) + 1 - - if self.status == FAILURE: - return False - - if self.status == PENDING: - assert self.current_task is None - assert task == self.tasks[0] - self.status = RUNNING - - self.current_task = task - self.save() - return True - - def complete(self): - if self.status != FAILURE: - assert self.status == RUNNING - self.status = SUCCESS - self.save() - - @property - def json_dict(self) -> dict: - """ A json serializable dictionary representation. 
""" - data = { - 'tasks': self.tasks, - 'current_task': self.current_task, - 'status': self.status, - 'errors': self.errors, - 'warnings': self.warnings, - 'create_time': self.create_time.isoformat() if self.create_time is not None else None, - 'complete_time': self.complete_time.isoformat() if self.complete_time is not None else None, - } - return {key: value for key, value in data.items() if value is not None} - - -def task(func): - def wrapper(self, *args, **kwargs): - if self.status == 'FAILURE': - return - - self.continue_with(func.__name__) - try: - func(self, *args, **kwargs) - except Exception as e: - self.fail(e) - - return wrapper - - -def process(func): - @app.task(bind=True, name=func.__name__, ignore_results=True) - def task_func(task, cls_name, self_id): - all_cls = AsyncDocument.__subclasses__() - cls = next((cls for cls in all_cls if cls.__name__ == cls_name), None) - if cls is None: - raise AsyncDocumentNotRegistered('Document type %s not registered for async methods' % cls_name) - - self = cls.get(self_id) - - try: - func(self) - except Exception as e: - self.fail(e) - raise e - - def wrapper(self, *args, **kwargs): - assert len(args) == 0 and len(kwargs) == 0 - self.save() - - self_id = self.get_id() - cls_name = self.__class__.__name__ - - return task_func.s(cls_name, self_id).delay() - - return wrapper - - -class Upload(Proc): - - data = StringField(default='Hello, World') - processed_calcs = IntField(default=0) - total_calcs = IntField(default=-1) - - @process - def after_upload(self): - self.extract() - self.parse() - - @process - def after_parse(self): - self.cleanup() - - @task - def extract(self): - print('now extracting') - - @task - def parse(self): - Calc(upload=self).parse() - Calc(upload=self).parse() - self.total_calcs = 2 - self.save() - self.check_calcs_complete(more_calcs=0) - - @task - def cleanup(self): - print('cleanup') - - def check_calcs_complete(self, more_calcs=1): - # use a primitive but atomic pymongo call to increase the number of calcs - updated_raw = Upload._get_collection().find_one_and_update( - {'_id': self.id}, - {'$inc': {'processed_calcs': more_calcs}}, - return_document=ReturnDocument.AFTER) - - updated_total_calcs = updated_raw['total_calcs'] - updated_processed_calcs = updated_raw['processed_calcs'] - - print('%d:%d' % (updated_processed_calcs, updated_total_calcs)) - if updated_processed_calcs == updated_total_calcs and updated_total_calcs != -1: - self.after_parse() - - -class Calc(Proc): - - upload = ReferenceField(Upload, required=True) - - @process - def parse(self): - print('parsee') - self.upload.check_calcs_complete() - - -if __name__ == '__main__': - connect(db=config.mongo.users_db, host=config.mongo.host) - tds = [Upload(), Upload(), Upload()] - for td in tds: - td.after_upload() diff --git a/nomad/proc.py b/nomad/processing/base.py similarity index 81% rename from nomad/proc.py rename to nomad/processing/base.py index a4c451c12c..ae09173d96 100644 --- a/nomad/proc.py +++ b/nomad/processing/base.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from typing import List, cast +from typing import List, cast, Any import types from contextlib import contextmanager import collections @@ -28,6 +28,7 @@ from mongoengine.connection import MongoEngineConnectionError from mongoengine.base.metaclasses import TopLevelDocumentMetaclass from pymongo import ReturnDocument from datetime import datetime +import sys from nomad import config, utils import nomad.patch # pylint: disable=unused-import @@ -57,6 +58,10 @@ app.add_defaults(dict( result_serializer=config.celery.serializer, )) +# ensure elastic and mongo connections +if 'sphinx' not in sys.modules: + connect(db=config.mongo.users_db, host=config.mongo.host) + PENDING = 'PENDING' RUNNING = 'RUNNING' @@ -67,7 +72,7 @@ SUCCESS = 'SUCCESS' class InvalidId(Exception): pass -class AsyncDocumentNotRegistered(Exception): pass +class ProcNotRegistered(Exception): pass class ProcMetaclass(TopLevelDocumentMetaclass): @@ -113,7 +118,7 @@ class Proc(Document, metaclass=ProcMetaclass): proc_time: the time that processing completed (successfully or not) """ - meta = { + meta: Any = { 'abstract': True, } @@ -134,6 +139,11 @@ class Proc(Document, metaclass=ProcMetaclass): """ Returns True of the process has failed or succeeded. """ return self.status in [SUCCESS, FAILURE] + def get_logger(self): + return utils.get_logger( + __name__, current_task=self.current_task, process=self.__class__.__name__, + status=self.status) + @classmethod def create(cls, **kwargs): """ Factory method that must be used instead of regular constructor. """ @@ -150,37 +160,72 @@ class Proc(Document, metaclass=ProcMetaclass): return self @classmethod - def get(cls, obj_id): - """ Loads the object from the database. """ + def get_by_id(cls, id: str, id_field: str): try: - obj = cls.objects(id=str(obj_id)).first() + obj = cls.objects(**{id_field: id}).first() except ValidationError as e: - raise InvalidId('%s is not a valid id' % obj_id) + raise InvalidId('%s is not a valid id' % id) except MongoEngineConnectionError as e: raise e if obj is None: - raise KeyError('%s with id %s does not exist' % (cls.__name__, obj_id)) + raise KeyError('%s with id %s does not exist' % (cls.__name__, id)) return obj - def fail(self, *errors): + @classmethod + def get(cls, obj_id): + return cls.get_by_id(str(obj_id), 'id') + + @staticmethod + def log(logger, log_level, msg, **kwargs): + # TODO there seems to be a bug in structlog, cannot use logger.log + if log_level == logging.ERROR: + logger.error(msg, **kwargs) + elif log_level == logging.WARNING: + logger.warning(msg, **kwargs) + elif log_level == logging.INFO: + logger.info(msg, **kwargs) + elif log_level == logging.DEBUG: + logger.debug(msg, **kwargs) + else: + logger.critical(msg, **kwargs) + + def fail(self, *errors, log_level=logging.ERROR, **kwargs): """ Allows to fail the process. Takes strings or exceptions as args. """ - assert not self.completed + assert not self.completed, 'Cannot fail a completed process.' 
+ + failed_with_exception = False self.status = FAILURE + + logger = self.get_logger(**kwargs) + for error in errors: + if isinstance(error, Exception): + failed_with_exception = True + Proc.log(logger, log_level, 'task failed with exception', exc_info=error, **kwargs) + self.errors = [str(error) for error in errors] self.complete_time = datetime.now() - print(self.errors) + if not failed_with_exception: + errors_str = "; ".join([str(error) for error in errors]) + Proc.log(logger, log_level, 'task failed', errors=errors_str, **kwargs) + + logger.debug('process failed') + self.save() - def warning(self, *warnings): + def warning(self, *warnings, log_level=logging.warning, **kwargs): """ Allows to save warnings. Takes strings or exceptions as args. """ assert not self.completed + logger = self.get_logger(**kwargs) + for warning in warnings: - self.warnings.append(str(warning)) + warning = str(warning) + self.warnings.append(warning) + logger.log('task with warning', warning=warning, level=log_level) def _continue_with(self, task): tasks = self.__class__.tasks @@ -198,16 +243,21 @@ class Proc(Document, metaclass=ProcMetaclass): assert self.current_task is None assert task == tasks[0] self.status = RUNNING + self.current_task = task + self.get_logger().debug('started process') + else: + self.current_task = task + self.get_logger().debug('successfully completed task') - self.current_task = task self.save() return True def _complete(self): if self.status != FAILURE: - assert self.status == RUNNING + assert self.status == RUNNING, 'Can only complete a running process.' self.status = SUCCESS self.save() + self.get_logger().debug('completed process') def incr_counter(self, field, value=1, other_fields=None): """ @@ -254,6 +304,7 @@ class Proc(Document, metaclass=ProcMetaclass): 'tasks': getattr(self.__class__, 'tasks'), 'current_task': self.current_task, 'status': self.status, + 'completed': self.completed, 'errors': self.errors, 'warnings': self.warnings, 'create_time': self.create_time.isoformat() if self.create_time is not None else None, @@ -281,7 +332,7 @@ def task(func): except Exception as e: self.fail(e) - if self.__class__.tasks[-1] == self.current_task: + if self.__class__.tasks[-1] == self.current_task and not self.completed: self._complete() setattr(wrapper, '__task_name', func.__name__) @@ -305,7 +356,7 @@ def proc_task(task, cls_name, self_id, func_attr): cls = next((cls for cls in all_cls if cls.__name__ == cls_name), None) if cls is None: logger.error('document not a subcass of Proc') - raise AsyncDocumentNotRegistered('document type %s not a subclass of Proc' % cls_name) + raise ProcNotRegistered('document %s not a subclass of Proc' % cls_name) try: self = cls.get(self_id) diff --git a/nomad/processing/data.py b/nomad/processing/data.py new file mode 100644 index 0000000000..832da1b85b --- /dev/null +++ b/nomad/processing/data.py @@ -0,0 +1,369 @@ +# Copyright 2018 Markus Scheidgen +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an"AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +This module comprises a set of persistent document classes that hold all user related +data. These are information about users, their uploads and datasets, the associated +calculations, and files + + +.. autoclass:: Calc + :members: +.. autoclass:: Upload + :members: +.. autoclass:: DataSet +.. autoclass:: User + +""" + +from typing import List, Any +import sys +from datetime import datetime +from mongoengine import \ + Document, EmailField, StringField, BooleanField, DateTimeField, \ + ListField, DictField, ReferenceField, IntField, connect +import mongoengine.errors +import logging + +from nomad import config, files, utils +from nomad.search import CalcElasticDocument +from nomad.user import User, me +from nomad.processing.base import Proc, process, task, PENDING +from nomad.parsing import LocalBackend, parsers, parser_dict +from nomad.normalizing import normalizers +from nomad.utils import get_logger, lnr + + +class NotAllowedDuringProcessing(Exception): pass + + +class Calc(Proc): + """ + Instances of this class represent calculations. This class manages the elastic + search index entry, files, and archive for the respective calculation. + + It also contains the calculations processing and its state. + + The attribute list, does not include the various repository properties generated + while parsing, including ``program_name``, ``program_version``, etc. + + Attributes: + archive_id: the hash based archive id of the calc + parser: the name of the parser used to process this calc + upload_id: the id of the upload used to create this calculation + mainfile: the mainfile (including path in upload) that was used to create this calc + mainfile_tmp_path: path to the mainfile extracted for processing + """ + archive_id = StringField(primary_key=True) + upload_id = StringField() + mainfile = StringField() + parser = StringField() + mainfile_tmp_path = StringField() + + meta: Any = { + 'indices': [ + 'upload_id' + ] + } + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._parser_backend = None + self._upload = None + + @classmethod + def get(cls, id): + return cls.get_by_id(id, 'archive_id') + + def delete(self): + """ + Delete this calculation and all associated data. This includes all files, + the archive, and this search index entry. + """ + # delete the archive + if self.archive_id is not None: + files.delete_archive(self.archive_id) + + # delete the search index entry + elastic_entry = CalcElasticDocument.get(self.archive_id) + if elastic_entry is not None: + elastic_entry.delete() + + # delete this mongo document + super().delete() + + def get_logger(self, **kwargs): + upload_hash, calc_hash = self.archive_id.split('/') + logger = super().get_logger() + logger = logger.bind( + upload_id=self.upload_id, mainfile=self.mainfile, + upload_hash=upload_hash, calc_hash=calc_hash, **kwargs) + return logger + + @property + def json_dict(self): + """ A json serializable dictionary representation. 
""" + data = { + 'archive_id': self.archive_id, + 'mainfile': self.mainfile, + 'upload_id': self.upload_id, + 'parser': self.parser + } + data.update(super().json_dict) + return {key: value for key, value in data.items() if value is not None} + + @process + def process(self): + self._upload = Upload.get(self.upload_id) + if self._upload is None: + get_logger().error('calculation upload does not exist') + + try: + self.parsing() + self.normalizing() + self.archiving() + finally: + self._upload.calc_proc_completed() + + @task + def parsing(self): + self._parser_backend = parser_dict[self.parser].run(self.mainfile_tmp_path) + if self._parser_backend.status[0] != 'ParseSuccess': + error = self._parser_backend.status[1] + self.fail(error, level=logging.DEBUG) + + @task + def normalizing(self): + for normalizer in normalizers: + normalizer_name = normalizer.__name__ + normalizer(self._parser_backend).normalize() + if self._parser_backend.status[0] != 'ParseSuccess': + error = self._parser_backend.status[1] + self.fail(error, normalizer=normalizer_name, level=logging.WARNING) + return + self.get_logger().debug( + 'completed normalizer successfully', normalizer=normalizer_name) + + @task + def archiving(self): + upload_hash, calc_hash = self.archive_id.split('/') + # persist to elastic search + CalcElasticDocument.create_from_backend( + self._parser_backend, + upload_hash=upload_hash, + calc_hash=calc_hash, + upload_id=self.upload_id, + mainfile=self.mainfile, + upload_time=self._upload.upload_time) + + # persist the archive + with files.write_archive_json(self.archive_id) as out: + self._parser_backend.write_json(out, pretty=True) + + +class Upload(Proc): + """ + Represents uploads in the databases. Provides persistence access to the files storage, + and processing state. + + Attributes: + name: optional user provided upload name + additional_metadata: optional user provided additional meta data + upload_id: the upload id generated by the database + in_staging: true if the upload is still in staging and can be edited by the uploader + is_private: true if the upload and its derivitaves are only visible to the uploader + presigned_url: the presigned url for file upload + upload_time: the timestamp when the system realised the upload + upload_hash: the hash of the uploaded file + """ + id_field = 'upload_id' + + upload_id = StringField(primary_key=True) + + name = StringField(default=None) + additional_metadata = DictField(default=None) + + in_staging = BooleanField(default=True) + is_private = BooleanField(default=False) + + presigned_url = StringField() + upload_time = DateTimeField() + upload_hash = StringField(default=None) + + processed_calcs = IntField(default=0) + total_calcs = IntField(default=-1) + + user = ReferenceField(User, required=True) + + meta: Any = { + 'indexes': [ + 'upload_hash', + 'user' + ] + } + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self._upload = None + + @classmethod + def get(cls, id): + return cls.get_by_id(id, 'upload_id') + + @classmethod + def user_uploads(cls, user: User) -> List['Upload']: + """ Returns all uploads for the given user. Currently returns all uploads. 
""" + return cls.objects() + + def get_logger(self, **kwargs): + logger = super().get_logger() + logger = logger.bind(upload_id=self.upload_id, **kwargs) + return logger + + def delete(self): + logger = self.get_logger(task='delete') + + if not (self.completed or self.is_stale or self.current_task == 'uploading'): + raise NotAllowedDuringProcessing() + + with lnr(logger, 'delete upload file'): + try: + files.Upload(self.upload_id).delete() + except KeyError: + if self.current_task == 'uploading': + logger.debug( + 'Upload exist, but file does not exist. ' + 'It was probably aborted and deleted.') + else: + logger.debug('Upload exist, but uploaded file does not exist.') + + with lnr(logger, 'deleting calcs'): + for calc in Calc.objects(upload_id=self.upload_id): + calc.delete() + + with lnr(logger, 'deleting upload'): + super().delete() + + @classmethod + def create(cls, **kwargs) -> 'Upload': + """ + Creates a new upload for the given user, a user given name is optional. + It will populate the record with a signed url and pending :class:`UploadProc`. + The upload will be already saved to the database. + """ + self = super().create(**kwargs) + self.presigned_url = files.get_presigned_upload_url(self.upload_id) + self._continue_with('uploading') + return self + + @property + def is_stale(self) -> bool: + if self.current_task == 'uploading' and self.upload_time is None: + return (datetime.now() - self.create_time).days > 1 + else: + return False + + @property + def json_dict(self) -> dict: + """ A json serializable dictionary representation. """ + data = { + 'name': self.name, + 'additional_metadata': self.additional_metadata, + 'upload_id': self.upload_id, + 'presigned_url': files.external_objects_url(self.presigned_url), + 'upload_time': self.upload_time.isoformat() if self.upload_time is not None else None, + 'is_stale': self.is_stale, + } + data.update(super().json_dict) + return {key: value for key, value in data.items() if value is not None} + + @process + def process(self): + self.extracting() + self.parse_all() + + @task + def uploading(self): + pass + + @task + def extracting(self): + logger = self.get_logger() + try: + self._upload = files.Upload(self.upload_id) + self._upload.open() + logger.debug('opened upload') + except KeyError as e: + self.fail('process request for non existing upload', level=logging.INFO) + return + + try: + self.upload_hash = self._upload.hash() + except files.UploadError as e: + self.fail('could not create upload hash', e) + return + + if CalcElasticDocument.upload_exists(self.upload_hash): + self.fail('The same file was already uploaded and processed.', level=logging.INFO) + return + + @task + def parse_all(self): + # TODO: deal with multiple possible parser specs + self.total_calcs = 0 + for filename in self._upload.filelist: + for parser in parsers: + try: + if parser.is_mainfile(filename, lambda fn: self._upload.open_file(fn)): + tmp_mainfile = self._upload.get_path(filename) + calc = Calc.create( + archive_id='%s/%s' % (self.upload_hash, utils.hash(filename)), + mainfile=filename, parser=parser.name, + mainfile_tmp_path=tmp_mainfile, + upload_id=self.upload_id) + + calc.process() + self.total_calcs += 1 + except Exception as e: + self.warnings( + 'exception while matching pot. 
mainfile', + mainfile=filename, exc_info=e) + + if self.total_calcs == 0: + self.cleanup() + + # have to save the total_calcs information + self.save() + + @task + def cleanup(self): + try: + upload = files.Upload(self.upload_id) + except KeyError as e: + upload_proc.fail('Upload does not exist', exc_info=e) + return + + upload.close() + self.get_logger().debug('closed upload') + + def calc_proc_completed(self): + processed_calcs, (total_calcs,) = self.incr_counter( + 'processed_calcs', other_fields=['total_calcs']) + + if processed_calcs == total_calcs: + self.cleanup() + + @property + def calcs(self): + return Calc.objects(upload_id=self.upload_hash) diff --git a/nomad/processing/handler.py b/nomad/processing/handler.py index 2f528c7409..948de664b4 100644 --- a/nomad/processing/handler.py +++ b/nomad/processing/handler.py @@ -17,40 +17,10 @@ from threading import Thread from nomad import files, utils -from nomad.processing.tasks import extracting_task, cleanup_task, parse_all_task -from nomad.processing.state import UploadProc +from nomad.processing.data import Upload from nomad.utils import get_logger, lnr -def start_processing(upload_id, proc: UploadProc=None) -> UploadProc: - """ Starts the processing tasks via celery canvas. """ - - if proc is not None: - proc = UploadProc(**proc) - else: - proc = UploadProc(upload_id) - - # Keep the results of the last task is the workflow. - # The last task is started by another task, therefore it - # is not the end of the main task chain. - finalize = cleanup_task.s() - finalize_result = finalize.freeze() - - # start the main chain - main_chain = extracting_task.s(proc) | parse_all_task.s(finalize) - main_chain_result = main_chain.delay() - - # Create a singular result tree. This might not be the right way to do it. - finalize_result.parent = main_chain_result - - # Keep the result as tuple that also includes all parents, i.e. the whole - # serializable tree - proc.celery_task_ids = finalize_result.as_tuple() - proc.status = 'STARTED' - - return proc - - def handle_uploads(quit=False): """ Starts a daemon that will listen to files for new uploads. For each new @@ -64,13 +34,12 @@ def handle_uploads(quit=False): @files.upload_put_handler def handle_upload_put(received_upload_id: str): - from nomad.data import Upload logger = get_logger(__name__, upload_id=received_upload_id) logger.debug('Initiate upload processing') try: with lnr(logger, 'Could not load'): try: - upload = Upload.get(upload_id=received_upload_id) + upload = Upload.get(received_upload_id) except KeyError as e: logger.error('Upload does not exist') raise e @@ -81,15 +50,11 @@ def handle_uploads(quit=False): with lnr(logger, 'Save upload time'): upload.upload_time = datetime.now() - upload.save() with lnr(logger, 'Start processing'): - proc = start_processing(received_upload_id, proc=upload.proc) - assert proc.is_started - upload.proc = proc - upload.save() + upload.process() - except Exception: + except Exception as e: logger.error('Exception while handling upload put notification.', exc_info=e) if quit: diff --git a/nomad/processing/state.py b/nomad/processing/state.py deleted file mode 100644 index ef60611fa7..0000000000 --- a/nomad/processing/state.py +++ /dev/null @@ -1,305 +0,0 @@ -# Copyright 2018 Markus Scheidgen -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an"AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from typing import List, Any, Union, cast -from celery.result import AsyncResult, result_from_tuple -import itertools -import time - -from nomad import utils -from nomad.normalizing import normalizers -from nomad.processing.app import app - - -class ProcPipeline(utils.DataObject): - """ - Arguments: - task_names: A list of task names in pipeline order. - - Attributes: - current_task_name: Name of the currently running task. - status: Aggregated status for the whole process. Celery status names as convention. - errors: A list of potential error that caused failure. - """ - def __init__(self, task_names: List[str], *args, **kwargs) -> None: - super().__init__(*args) - self.task_names: List[str] = task_names - self.current_task_name: str = None - self.status: str = 'PENDING' - self.errors: List[str] = [] - self.warnings: List[str] = [] - - self.update(kwargs) - - def continue_with(self, task_name: str) -> bool: - """ Upadtes itself with information about the new current task. """ - - assert self.status != 'SUCCESS', 'Cannot continue on completed workflow.' - - if self.status == 'FAILURE': - return False - else: - self.status = 'PROGRESS' - self.current_task_name = task_name - return True - - def success(self) -> None: - self.status = 'SUCCESS' - - @property - def is_started(self) -> bool: - """ True, if the task is started. """ - return self.status is not 'PENDING' - - def fail(self, e: Union[List[str], List[Exception], Exception, str]): - """ Allows tasks to mark this processing as failed. All following task will do nothing. """ - raw_errors: Union[List[str], List[Exception]] = None - if isinstance(e, list): - raw_errors = e - else: - raw_errors = cast(Union[List[str], List[Exception]], [e]) - - for error in raw_errors: - if isinstance(error, str): - self.errors.append(error) - elif isinstance(error, Exception): - self.errors.append(error.__str__()) - else: - assert False, 'Unknown error' - - self.status = 'FAILURE' - - -class CalcProc(ProcPipeline): - """ - Used to represent the state of an calc processing. It is used to provide - information to the user (via api, users) and act as a state object within the - more complex calc processing task. Keep in mind that this task might become several - celery tasks in the future. - - Arguments: - upload_hash: The hash that identifies the upload in the archive. - mainfile: The path to the mainfile in the upload. - parser_name: The name of the parser to use/used. - tmp_mainfile: The full path to the mainfile in the local fs. - - Attributes: - calc_hash: The mainfile hash that identifies the calc in the archive. - celery_task_id: The celery task id for the calc parse celery task. 
- """ - def __init__(self, mainfile, parser_name, tmp_mainfile, *args, **kwargs): - task_names = [ - [parser_name], - [n.__name__ for n in normalizers], - ['archiving'] - ] - - super().__init__(task_names=list(itertools.chain(*task_names)), *args) - - self.mainfile = mainfile - self.parser_name = parser_name - self.tmp_mainfile = tmp_mainfile - - self.calc_hash = utils.hash(mainfile) - - self.celery_task_id: str = None - - self.update(kwargs) - - def update_from_backend(self) -> bool: - """ Consults results backend and updates. Returns if object might have changed. """ - if self.status in ['FAILURE', 'SUCCESS']: - return False - if self.celery_task_id is None: - return False - - celery_task_result = AsyncResult(self.celery_task_id, app=app) - if celery_task_result.ready(): - self.update(celery_task_result.result) - return True - else: - info = celery_task_result.info - if info is not None: - self.update(info) - return True - - return False - - -class UploadProc(ProcPipeline): - """ - Used to represent the state of an upload processing. It is used to provide - information to the user (via api, users) and act as a state object that is passed - from celery task to celery task. - - It is serializable (JSON, pickle). Iternaly stores - :class:`~celery.results.AsyncResults` instance in serialized *tuple* form to - keep connected to the results backend. - - Warning: - You have to call :func:`forget` eventually to free all resources and the celery - results backend. - - Anyhow, results will be deleted after 1 day, depending on `configuration - <http://docs.celeryproject.org/en/latest/userguide/configuration.html#result-expires>`_. - - Arguments: - upload_id: The id of the uploaded file in the object storage, - see also :mod:`nomad.files`. - task_name: The names of all task in pipeline order. - - Attributes: - upload_hash: The hash of the uploaded file. E.g., used for archive/repo ids. - calc_procs: The state data for all child calc processings. - celery_task_ids: Serialized form of the celery async_results tree for the processing. - """ - def __init__(self, upload_id: str, *args, **kwargs) -> None: - assert upload_id is not None - # TODO there should be a way to read the names from the tasks - # but currently this is not possible due to import cycles - task_names = ['uploading', 'extracting', 'parse_all', 'cleanup'] - super().__init__(task_names, *args) - - self.upload_id = upload_id - self.upload_hash: str = None - - self.calc_procs: List[CalcProc] = [] - - self.celery_task_ids: Any = None - - self.update(kwargs) - - if not self.is_started: - self.continue_with(task_names[0]) - - def update(self, dct): - # Since the data might be updated from deserialized dicts, list and CalcProc - # instances might by dicts, or mongoengines BaseList, BaseDicts. This overwrite - # replaces it. - # TODO there might be a better solution, or the problem solves itself, when - # moving away from mongo. - if 'calc_procs' in dct: - calc_procs = dct['calc_procs'] - for idx, calc_proc_dct in enumerate(calc_procs): - if not isinstance(calc_proc_dct, CalcProc): - calc_procs[idx] = CalcProc(**calc_proc_dct) - if type(calc_procs) != list: - dct['calc_procs'] = list(calc_procs) - - super().update(dct) - - @property - def _celery_task_result(self) -> AsyncResult: - """ - The celery async_result in its regular usable, but not serializable form. - - We use the tuple form to allow serialization (i.e. storage). Keep in mind - that the sheer `task_id` is not enough, because it does not contain - the parent tasks, i.e. result tree. 
- See `third comment <https://github.com/celery/celery/issues/1328>`_ - for details. - """ - assert self.celery_task_ids is not None - - return result_from_tuple(self.celery_task_ids, app=app) - - def update_from_backend(self) -> bool: - """ - Consults the result backend and updates itself with the available results. - Will only update not completed processings. - - Returns: - If object might have changed. - """ - assert self.is_started, 'Run is not yet started.' - - if self.status in ['SUCCESS', 'FAILURE']: - return False - - if self.celery_task_ids is None: - return False - - celery_task_result = self._celery_task_result - task_index = len(self.task_names) - might_have_changed = False - while celery_task_result is not None: - task_index -= 1 - if celery_task_result.ready(): - result = celery_task_result.result - if isinstance(result, Exception): - self.fail(result) - self.current_task_name = self.task_names[task_index] - logger = utils.get_logger( - __name__, - upload_id=self.upload_id, - current_task_name=self.current_task_name) - logger.error('Celery task raised exception', exc_info=result) - else: - self.update(result) - might_have_changed = True - break - else: - if celery_task_result.status == 'PROGRESS': - # get potential info - result = celery_task_result.info - if result is not None: - self.update(result) - break - - celery_task_result = celery_task_result.parent - - if self.calc_procs is not None: - for calc_proc in self.calc_procs: - if calc_proc.update_from_backend(): - might_have_changed = True - - return might_have_changed - - def forget(self) -> None: - """ Forget the results of a completed run; free all resources in the results backend. """ - # TODO, this is not forgetting the parse task in the parse_all header, right? - assert self.ready(), 'Run is not completed.' - - celery_task_result = self._celery_task_result - while celery_task_result is not None: - celery_task_result.forget() - celery_task_result = celery_task_result.parent - - def ready(self) -> bool: - """ Returns: True if the task has been executed. """ - self.update_from_backend() - return self.status in ['FAILURE', 'SUCCESS'] - - def get(self, interval=1, timeout=None) -> 'UploadProc': - """ - Blocks until the processing has finished. It uses the given interval - to contineously consult the results backend. - - Arguments: - interval: a period to sleep between updates - timeout: a rough timeout to terminated, even unfinished - - Returns: An upadted instance of itself with all the results. - """ - assert self.is_started, 'Run is not yet started.' - - slept = 0 - while not self.ready() and (timeout is None or slept < timeout): - time.sleep(interval) - slept += interval - self.update_from_backend() - - self.update_from_backend() - - return self diff --git a/nomad/processing/tasks.py b/nomad/processing/tasks.py deleted file mode 100644 index 1adced9e44..0000000000 --- a/nomad/processing/tasks.py +++ /dev/null @@ -1,206 +0,0 @@ -# Copyright 2018 Markus Scheidgen -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an"AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -from typing import List -from celery import Task, chord, group -from celery.canvas import Signature -from datetime import datetime - -from nomad import files, utils -from nomad.parsing import parsers, parser_dict -from nomad.normalizing import normalizers -import nomad.patch # pylint: disable=unused-import - -from nomad.processing.app import app -from nomad.processing.state import UploadProc, CalcProc - - -def _report_progress(task, dct): - if not task.request.called_directly: - task.update_state(state='PROGRESS', meta=dct) - - -@app.task(bind=True, name='extracting') -def extracting_task(task: Task, proc: UploadProc) -> UploadProc: - logger = utils.get_logger(__name__, task=task.name, upload_id=proc.upload_id) - if not proc.continue_with(task.name): - return proc - - _report_progress(task, proc) - - try: - upload = files.Upload(proc.upload_id) - upload.open() - logger.debug('Opened upload') - except KeyError as e: - logger.info('Process request for non existing upload') - proc.fail(e) - return proc - except files.UploadError as e: - logger.info('Could not open upload', error=str(e)) - proc.fail(e) - return proc - except Exception as e: - logger.error('Unknown exception', exc_info=e) - proc.fail(e) - return proc - - logger.debug('Upload opened') - - try: - proc.upload_hash = upload.hash() - except files.UploadError as e: - logger.error('Could not create upload hash', error=str(e)) - proc.fail(e) - return proc - - from nomad.data import Calc - if Calc.upload_exists(proc.upload_hash): - logger.info('Upload hash doublet') - proc.fail('The same file was already uploaded and processed.') - return proc - - try: - # TODO: deal with multiple possible parser specs - for filename in upload.filelist: - for parser in parsers: - try: - if parser.is_mainfile(filename, lambda fn: upload.open_file(fn)): - tmp_mainfile = upload.get_path(filename) - calc_proc = CalcProc(filename, parser.name, tmp_mainfile) - proc.calc_procs.append(calc_proc) - except Exception as e: - logger.warn('Exception while matching pot. mainfile.', mainfile=filename) - proc.warnings.append('Exception while matching pot. mainfile %s.' 
% filename) - - except files.UploadError as e: - logger.warn('Could find parse specs in open upload', error=str(e)) - proc.fail(e) - return proc - - return proc - - -@app.task(bind=True, name='cleanup') -def cleanup_task(task, calc_procs: List[CalcProc], upload_proc: UploadProc) -> UploadProc: - logger = utils.get_logger(__name__, task=task.name, upload_id=upload_proc.upload_id) - if upload_proc.continue_with(task.name): - - _report_progress(task, upload_proc) - - try: - upload = files.Upload(upload_proc.upload_id) - except KeyError as e: - logger.warn('Upload does not exist') - upload_proc.fail(e) - return upload_proc - - try: - upload.close() - except Exception as e: - logger.error('Could not close upload', exc_info=e) - upload_proc.fail(e) - return upload_proc - - logger.debug('Closed upload') - upload_proc.success() - - return upload_proc - - -@app.task(bind=True, name='parse_all') -def parse_all_task(task: Task, upload_proc: UploadProc, cleanup: Signature) -> UploadProc: - cleanup = cleanup.clone(args=(upload_proc,)) - if not upload_proc.continue_with(task.name): - chord(group())(cleanup) - return upload_proc - - # prepare the group of parallel calc processings - parses = group(parse_task.s(calc_proc, upload_proc) for calc_proc in upload_proc.calc_procs) - - # save the calc processing task ids to the overall processing - for idx, child in enumerate(parses.freeze().children): - upload_proc.calc_procs[idx].celery_task_id = child.task_id - - # initiate the chord that runs calc processings first, and close_upload afterwards - chord(parses)(cleanup) - - return upload_proc - - -@app.task(bind=True, name='parse') -def parse_task(self, proc: CalcProc, upload_proc: UploadProc) -> CalcProc: - assert upload_proc.upload_hash is not None - - upload_hash, parser, mainfile = upload_proc.upload_hash, proc.parser_name, proc.mainfile - logger = utils.get_logger( - __name__, task=self.name, - upload_id=upload_proc.upload_id, upload_hash=upload_hash, mainfile=mainfile) - - # parsing - proc.continue_with(parser) - try: - parser_backend = parser_dict[parser].run(proc.tmp_mainfile) - if parser_backend.status[0] != 'ParseSuccess': - error = parser_backend.status[1] - logger.debug('Failed parsing', parser=parser, error=error) - proc.fail(error) - return proc - logger.debug('Completed successfully', parser=parser) - except Exception as e: - logger.warn('Exception wile parsing', parser=parser, exc_info=e) - proc.fail(e) - return proc - _report_progress(self, proc) - - # normalization - for normalizer in normalizers: - normalizer_name = normalizer.__name__ - proc.continue_with(normalizer_name) - try: - normalizer(parser_backend).normalize() - if parser_backend.status[0] != 'ParseSuccess': - error = parser_backend.status[1] - logger.info('Failed run of %s: %s' % (normalizer, error)) - proc.fail(error) - return proc - logger.debug('Completed %s successfully' % normalizer) - except Exception as e: - logger.warn('Exception wile normalizing with %s' % normalizer, exc_info=e) - proc.fail(e) - return proc - _report_progress(self, proc) - - # update search - proc.continue_with('archiving') - try: - from nomad.data import Calc - Calc.create_from_backend( - parser_backend, - upload_hash=upload_hash, - calc_hash=proc.calc_hash, - upload_id=upload_proc.upload_id, - mainfile=mainfile, - upload_time=datetime.now()) - logger.debug('Archived successfully') - except Exception as e: - logger.error('Failed to archive', exc_info=e) - proc.fail(e) - return proc - _report_progress(self, proc) - - logger.debug('Completed processing') 
- - proc.success() - return proc diff --git a/nomad/search.py b/nomad/search.py new file mode 100644 index 0000000000..1c870f3365 --- /dev/null +++ b/nomad/search.py @@ -0,0 +1,159 @@ +# Copyright 2018 Markus Scheidgen +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an"AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +import elasticsearch.exceptions +from elasticsearch_dsl import Document as ElasticDocument, Search, Date, Keyword, connections +from datetime import datetime + +from nomad import config +from nomad.parsing import LocalBackend +from nomad.utils import get_logger + +logger = get_logger(__name__) + +# ensure elastic and mongo connections +if 'sphinx' not in sys.modules: + client = connections.create_connection(hosts=[config.elastic.host]) + +key_mappings = { + 'basis_set_type': 'program_basis_set_type', + 'chemical_composition': 'chemical_composition_bulk_reduced' +} + + +class AlreadyExists(Exception): pass + + +class CalcElasticDocument(ElasticDocument): + """ + Elastic search document that represents a calculation. It is supposed to be a + component of :class:`Calc`. Should only be created by its parent :class:`Calc` + instance and only via the :func:`create_from_backend` factory method. + """ + class Index: + name = config.elastic.calc_index + + calc_hash = Keyword() + mainfile = Keyword() + upload_hash = Keyword() + upload_id = Keyword() + + upload_time = Date() + + program_name = Keyword() + program_version = Keyword() + + chemical_composition = Keyword() + basis_set_type = Keyword() + atom_species = Keyword() + system_type = Keyword() + crystal_system = Keyword() + space_group_number = Keyword() + configuration_raw_gid = Keyword() + XC_functional_name = Keyword() + + @property + def archive_id(self) -> str: + """ The unique id for this calculation. """ + return '%s/%s' % (self.upload_hash, self.calc_hash) + + @classmethod + def create_from_backend( + cls, backend: LocalBackend, upload_id: str, upload_hash: str, calc_hash: str, + **kwargs) -> 'CalcElasticDocument': + """ + Create a new calculation instance in elastic search. The data from the given backend + will be used. Additional meta-data can be given as *kwargs*. ``upload_id``, + ``upload_hash``, and ``calc_hash`` are mandatory. + This will create a elastic search entry and store the backend data to the + archive. + + Arguments: + backend: The parsing/normalizing backend that contains the calculation data. + upload_hash: The upload hash of the originating upload. + upload_id: The upload id of the originating upload. + calc_hash: The upload unique hash for this calculation. + kwargs: Additional arguments not stored in the backend. + + Raises: + AlreadyExists: If the calculation already exists in elastic search. We use + the elastic document lock here. The elastic document is IDed via the + ``archive_id``. 
+ """ + assert upload_hash is not None and calc_hash is not None and upload_id is not None + kwargs.update(dict(upload_hash=upload_hash, calc_hash=calc_hash, upload_id=upload_id)) + + # prepare the entry with all necessary properties from the backend + calc = cls(meta=dict(id='%s/%s' % (upload_hash, calc_hash))) + for property in cls._doc_type.mapping: + property = key_mappings.get(property, property) + + if property in kwargs: + value = kwargs[property] + else: + try: + value = backend.get_value(property, 0) + except KeyError: + logger.warning( + 'Missing property value', property=property, upload_id=upload_id, + upload_hash=upload_hash, calc_hash=calc_hash) + continue + + setattr(calc, property, value) + + # persist to elastic search + try: + calc.save(op_type='create') + except Exception: + raise AlreadyExists('Calculation %s does already exist.' % (calc.archive_id)) + + return calc + + @staticmethod + def es_search(body): + """ Perform an elasticsearch and not elasticsearch_dsl search on the Calc index. """ + return client.search(index=config.elastic.calc_index, body=body) + + @staticmethod + def upload_exists(upload_hash): + """ Returns true if there are already calcs from the given upload. """ + search = Search(using=client, index=config.elastic.calc_index) \ + .query('match', upload_hash=upload_hash) \ + .execute() + + return len(search) > 0 + + @property + def json_dict(self): + """ A json serializable dictionary representation. """ + data = self.to_dict() + + upload_time = data.get('upload_time', None) + if upload_time is not None and isinstance(upload_time, datetime): + data['upload_time'] = data['upload_time'].isoformat() + + data['archive_id'] = self.archive_id + + return {key: value for key, value in data.items() if value is not None} + + +if 'sphinx' not in sys.modules: + try: + CalcElasticDocument.init() + except elasticsearch.exceptions.RequestError as e: + if e.status_code == 400 and 'resource_already_exists_exception' in e.error: + pass # happens if two services try this at the same time + else: + raise e diff --git a/nomad/user.py b/nomad/user.py new file mode 100644 index 0000000000..755e1611fe --- /dev/null +++ b/nomad/user.py @@ -0,0 +1,33 @@ +import sys +from mongoengine import Document, EmailField, StringField, ReferenceField, ListField + + +class User(Document): + """ Represents users in the database. """ + email = EmailField(primary=True) + name = StringField() + + +class DataSet(Document): + name = StringField() + description = StringField() + doi = StringField() + + user = ReferenceField(User) + calcs = ListField(StringField) + + meta = { + 'indexes': [ + 'user', + 'doi', + 'calcs' + ] + } + +# provid a fake user for testing +me = None +if 'sphinx' not in sys.modules: + me = User.objects(email='me@gmail.com').first() + if me is None: + me = User(email='me@gmail.com', name='Me Meyer') + me.save() diff --git a/nomad/utils.py b/nomad/utils.py index 5ffcb6c89b..620ff0ae5a 100644 --- a/nomad/utils.py +++ b/nomad/utils.py @@ -10,6 +10,7 @@ from contextlib import contextmanager import json import os import sys +import uuid from nomad import config @@ -98,6 +99,10 @@ if not _logging_is_configured: _logging_is_configured = True +def create_uuid() -> str: + return base64.b64encode(uuid.uuid4().bytes, altchars=b'-_').decode('utf-8')[0:-2] + + def hash(obj: Union[IO, str]) -> str: """ First 28 character of an URL safe base 64 encoded sha512 digest. 
""" hash = hashlib.sha512() diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000000..9803c09c00 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,49 @@ +import pytest +from mongoengine import connect +from mongoengine.connection import disconnect + +from nomad import config + + +@pytest.fixture(scope='session') +def celery_includes(): + return ['nomad.processing.base'] + + +@pytest.fixture(scope='session') +def celery_config(): + return { + 'broker_url': config.celery.broker_url, + 'result_backend': config.celery.backend_url, + 'accept_content': ['json', 'pickle'], + 'task_serializer': config.celery.serializer, + 'result_serializer': config.celery.serializer + } + + +@pytest.fixture(scope='function', autouse=True) +def mongomock(monkeypatch): + def mock_connect(**kwargs): + return connect('test_db', host='mongomock://localhost') + + disconnect() + connection = mock_connect() + monkeypatch.setattr('nomad.processing.base.mongo_connect', mock_connect) + yield + connection.drop_database('test_db') + + +@pytest.fixture(scope='function') +def mocksearch(monkeypatch): + uploads = [] + + def create_from_backend(_, **kwargs): + upload_hash = kwargs.get('upload_hash', None) + uploads.append(upload_hash) + return {} + + def upload_exists(upload_hash): + return upload_hash in uploads + + monkeypatch.setattr('nomad.search.CalcElasticDocument.create_from_backend', create_from_backend) + monkeypatch.setattr('nomad.search.CalcElasticDocument.upload_exists', upload_exists) diff --git a/tests/test_proc.py b/tests/processing/test_base.py similarity index 65% rename from tests/test_proc.py rename to tests/processing/test_base.py index 7da2c042e0..74c1903d10 100644 --- a/tests/test_proc.py +++ b/tests/processing/test_base.py @@ -4,35 +4,7 @@ from mongoengine.connection import disconnect import time from nomad import config -from nomad.proc import Proc, process, task, SUCCESS, FAILURE, RUNNING, PENDING - - -@pytest.fixture(scope='session') -def celery_includes(): - return ['nomad.proc'] - - -@pytest.fixture(scope='session') -def celery_config(): - return { - 'broker_url': config.celery.broker_url, - 'result_backend': config.celery.backend_url, - 'accept_content': ['json', 'pickle'], - 'task_serializer': config.celery.serializer, - 'result_serializer': config.celery.serializer - } - - -@pytest.fixture(scope='function') -def mongomock(monkeypatch): - def mock_connect(**kwargs): - return connect('test_db', host='mongomock://localhost') - - disconnect() - connection = mock_connect() - monkeypatch.setattr('nomad.proc.mongo_connect', mock_connect) - yield - connection.drop_database('test_db') +from nomad.processing.base import Proc, process, task, SUCCESS, FAILURE, RUNNING, PENDING def assert_proc(proc, current_task, status=SUCCESS, errors=0, warnings=0): @@ -80,7 +52,7 @@ class FailTasks(Proc): self.fail('fail fail fail') -def test_fail(mongomock): +def test_fail(): p = FailTasks.create() p.will_fail() @@ -102,7 +74,7 @@ class SimpleProc(Proc): pass -def test_simple_process(celery_session_worker, mongomock): +def test_simple_process(celery_session_worker): p = SimpleProc.create() p.process() p.block_until_complete() @@ -116,7 +88,7 @@ class TaskInProc(Proc): pass -def test_task_as_proc(celery_session_worker, mongomock): +def test_task_as_proc(celery_session_worker): p = TaskInProc.create() p.process() p.block_until_complete() @@ -129,17 +101,14 @@ class ParentProc(Proc): @process @task def spawn_children(self): - print('## spawn') ChildProc.create(parent=self).process() @process @task 
def after_children(self): - print('## after') pass def on_child_complete(self): - print('## on complete') if self.incr_counter('children') == 1: self.after_children() @@ -153,8 +122,12 @@ class ChildProc(Proc): self.parent.on_child_complete() -def test_counter(celery_session_worker, mongomock): +def test_counter(celery_session_worker): p = ParentProc.create() p.spawn_children() p.block_until_complete() assert_proc(p, 'after_children') + + # wait for session worker to complete all open tasks + # otherwise uncompleted task request will bleed into the next tests + time.sleep(1) diff --git a/tests/processing/test_data.py b/tests/processing/test_data.py new file mode 100644 index 0000000000..e45159bf9e --- /dev/null +++ b/tests/processing/test_data.py @@ -0,0 +1,148 @@ +# Copyright 2018 Markus Scheidgen +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an"AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +To run this test, a celery worker must be running. The test worker provided by +the celery pytest plugin is currently not working. It results on a timeout when +reading from the redis result backend, even though all task apperently ended successfully. +""" + +from typing import Generator +import pytest +import logging +from datetime import datetime + +from nomad import config, files +from nomad.processing import Upload, Calc +from nomad.processing.base import task as task_decorator +from nomad.user import me +from nomad.search import CalcElasticDocument + +from tests.test_files import example_file, empty_file + +# import fixtures +from tests.test_files import clear_files # pylint: disable=unused-import + +example_files = [empty_file, example_file] + + +@pytest.fixture(scope='function', autouse=True) +def mocksearch_forall(mocksearch): + pass + + +@pytest.fixture(scope='function', params=example_files) +def uploaded_id(request, clear_files) -> Generator[str, None, None]: + example_file = request.param + example_upload_id = example_file.replace('.zip', '') + files._client.fput_object(config.files.uploads_bucket, example_upload_id, example_file) + + yield example_upload_id + + +def run_processing(uploaded_id: str) -> Upload: + upload = Upload.create(upload_id=uploaded_id, user=me) + upload.upload_time = datetime.now() + + assert upload.status == 'RUNNING' + assert upload.current_task == 'uploading' + + upload.process() # pylint: disable=E1101 + upload.block_until_complete(interval=.1) + + return upload + + +def assert_processing(upload: Upload): + assert upload.completed + assert upload.current_task == 'cleanup' + assert upload.upload_hash is not None + assert len(upload.errors) == 0 + assert upload.status == 'SUCCESS' + + for calc in Calc.objects(upload_id=upload.upload_id): + assert calc.parser is not None + assert calc.mainfile is not None + assert calc.status == 'SUCCESS', calc.archive_id + assert len(calc.errors) == 0 + + +@pytest.mark.timeout(30) +def test_processing(uploaded_id, celery_session_worker): + upload = run_processing(uploaded_id) + assert_processing(upload) + + +@pytest.mark.parametrize('uploaded_id', [example_files[1]], 
indirect=True) +def test_processing_doublets(uploaded_id, celery_session_worker, caplog): + caplog.set_level(logging.CRITICAL) + + upload = run_processing(uploaded_id) + assert upload.status == 'SUCCESS' + assert CalcElasticDocument.upload_exists(upload.upload_hash) # pylint: disable=E1101 + + upload = run_processing(uploaded_id) + assert upload.status == 'FAILURE' + assert len(upload.errors) > 0 + assert 'already' in upload.errors[0] + + +@pytest.mark.timeout(30) +def test_process_non_existing(celery_session_worker, caplog): + caplog.set_level(logging.CRITICAL) + upload = run_processing('__does_not_exist') + + assert upload.completed + assert upload.current_task == 'extracting' + assert upload.status == 'FAILURE' + assert len(upload.errors) > 0 + + +@pytest.mark.parametrize('task', ['extracting', 'parse_all', 'cleanup', 'parsing']) +@pytest.mark.timeout(30) +def test_task_failure(monkeypatch, uploaded_id, celery_session_worker, task, caplog): + caplog.set_level(logging.CRITICAL) + + # mock the task method to through exceptions + if hasattr(Upload, task): + cls = Upload + elif hasattr(Calc, task): + cls = Calc + else: + assert False + + def mock(self): + raise Exception('fail for test') + mock.__name__ = task + mock = task_decorator(mock) + + monkeypatch.setattr('nomad.processing.data.%s.%s' % (cls.__name__, task), mock) + + # run the test + upload = run_processing(uploaded_id) + + assert upload.completed + + if task != 'parsing': + assert upload.status == 'FAILURE' + assert upload.current_task == task + assert len(upload.errors) > 0 + elif len(upload.calcs) > 0: # pylint: disable=E1101 + assert upload.status == 'SUCCESS' + assert upload.current_task == 'cleanup' + assert len(upload.errors) > 0 + for calc in upload.calcs: # pylint: disable=E1101 + assert calc.status == 'FAILURE' + assert calc.current_task == 'parsing' + assert len(calc.errors) > 0 diff --git a/tests/test_api.py b/tests/test_api.py index b75298d4de..340f542e19 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -14,18 +14,17 @@ services_config = config.services._asdict() services_config.update(api_base_path='') config.services = config.NomadServicesConfig(**services_config) -from nomad import api, files, processing # noqa -from nomad.data import Upload # noqa +from nomad import api, files # noqa +from nomad.processing import Upload, handle_uploads_thread # noqa -from tests.test_processing import example_files # noqa +from tests.processing.test_data import example_files # noqa from tests.test_files import assert_exists # noqa # import fixtures from tests.test_files import clear_files, archive_id # noqa pylint: disable=unused-import from tests.test_normalizing import normalized_vasp_example # noqa pylint: disable=unused-import from tests.test_parsing import parsed_vasp_example # noqa pylint: disable=unused-import -from tests.test_data import example_calc # noqa pylint: disable=unused-import -from tests.test_processing import celery_config, celery_includes, mocksearch # noqa pylint: disable=unused-import +from tests.test_search import example_elastic_calc # noqa pylint: disable=unused-import @pytest.fixture(scope='function') @@ -70,11 +69,6 @@ def test_no_uploads(client): assert_uploads(rv.data, count=0) -def test_bad_upload_id(client): - rv = client.get('/uploads/bad_id') - assert rv.status_code == 400 - - def test_not_existing_upload(client): rv = client.get('/uploads/123456789012123456789012') assert rv.status_code == 404 @@ -88,7 +82,7 @@ def test_stale_upload(client): assert rv.status_code == 200 upload_id = 
assert_upload(rv.data)['upload_id'] - upload = Upload.get(upload_id=upload_id) + upload = Upload.get(upload_id) upload.create_time = datetime.now() - timedelta(days=2) upload.save() @@ -163,9 +157,9 @@ def test_upload_to_upload(client, file): @pytest.mark.parametrize("file", example_files) -@pytest.mark.timeout(30) +@pytest.mark.timeout(10) def test_processing(client, file, celery_session_worker, mocksearch): - handle_uploads_thread = processing.handle_uploads_thread(quit=True) + handler = handle_uploads_thread(quit=True) rv = client.post('/uploads') assert rv.status_code == 200 @@ -176,7 +170,7 @@ def test_processing(client, file, celery_session_worker, mocksearch): cmd = files.create_curl_upload_cmd(upload_url).replace('<ZIPFILE>', file) subprocess.call(shlex.split(cmd)) - handle_uploads_thread.join() + handler.join() while True: time.sleep(1) @@ -185,20 +179,25 @@ def test_processing(client, file, celery_session_worker, mocksearch): assert rv.status_code == 200 upload = assert_upload(rv.data) assert 'upload_time' in upload - if upload['proc']['status'] in ['SUCCESS', 'FAILURE']: + if upload['completed']: break - proc = upload['proc'] - assert proc['status'] == 'SUCCESS' - assert 'calc_procs' in proc - assert proc['calc_procs'] is not None - assert proc['current_task_name'] == 'cleanup' - assert len(proc['task_names']) == 4 - assert_exists(config.files.uploads_bucket, upload['upload_id']) + assert len(upload['tasks']) == 4 + assert upload['status'] == 'SUCCESS' + assert upload['current_task'] == 'cleanup' + calcs = upload['calcs'] + for calc in calcs: + assert calc['status'] == 'SUCCESS' + assert calc['current_task'] == 'archiving' + assert len(calc['tasks']) == 3 + assert_exists(config.files.uploads_bucket, upload['upload_id']) + + time.sleep(1) -def test_repo_calc(client, example_calc): - rv = client.get('/repo/%s/%s' % (example_calc.upload_hash, example_calc.calc_hash)) +def test_repo_calc(client, example_elastic_calc): + rv = client.get( + '/repo/%s/%s' % (example_elastic_calc.upload_hash, example_elastic_calc.calc_hash)) assert rv.status_code == 200 @@ -207,7 +206,7 @@ def test_non_existing_repo_cals(client): assert rv.status_code == 404 -def test_repo_calcs(client, example_calc): +def test_repo_calcs(client, example_elastic_calc): rv = client.get('/repo') assert rv.status_code == 200 data = json.loads(rv.data) @@ -217,7 +216,7 @@ def test_repo_calcs(client, example_calc): assert len(results) >= 1 -def test_repo_calcs_pagination(client, example_calc): +def test_repo_calcs_pagination(client, example_elastic_calc): rv = client.get('/repo?page=1&per_page=1') assert rv.status_code == 200 data = json.loads(rv.data) diff --git a/tests/test_processing.py b/tests/test_processing.py deleted file mode 100644 index e1de1628d9..0000000000 --- a/tests/test_processing.py +++ /dev/null @@ -1,172 +0,0 @@ -# Copyright 2018 Markus Scheidgen -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an"AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -To run this test, a celery worker must be running. 
The test worker provided by -the celery pytest plugin is currently not working. It results on a timeout when -reading from the redis result backend, even though all task apperently ended successfully. -""" - -from typing import Generator -import pytest -import time -import logging - -from nomad import config, files -from nomad.data import Calc -from nomad.processing import start_processing, ProcPipeline - -from tests.test_files import example_file, empty_file - -# import fixtures -from tests.test_files import clear_files # pylint: disable=unused-import - -example_files = [empty_file, example_file] - - -@pytest.fixture(scope='function') -def mocksearch(monkeypatch): - uploads = [] - - def create_from_backend(_, **kwargs): - upload_hash = kwargs.get('upload_hash', None) - uploads.append(upload_hash) - return {} - - def upload_exists(upload_hash): - return upload_hash in uploads - - monkeypatch.setattr('nomad.data.Calc.create_from_backend', create_from_backend) - monkeypatch.setattr('nomad.data.Calc.upload_exists', upload_exists) - - -@pytest.fixture(scope='function', autouse=True) -def mocksearch_forall(mocksearch): - pass - - -@pytest.fixture(scope='session') -def celery_includes(): - return ['nomad.processing.tasks'] - - -@pytest.fixture(scope='session') -def celery_config(): - return { - 'broker_url': config.celery.broker_url, - 'result_backend': config.celery.backend_url, - 'accept_content': ['json', 'pickle'], - 'task_serializer': config.celery.serializer, - 'result_serializer': config.celery.serializer - } - - -@pytest.fixture(scope='function', params=example_files) -def uploaded_id(request, clear_files) -> Generator[str, None, None]: - example_file = request.param - example_upload_id = example_file.replace('.zip', '') - files._client.fput_object(config.files.uploads_bucket, example_upload_id, example_file) - - yield example_upload_id - - -@pytest.mark.timeout(30) -def test_processing(uploaded_id, celery_session_worker): - upload_proc = start_processing(uploaded_id) - - upload_proc.update_from_backend() - - assert upload_proc.status in ['PENDING', 'STARTED', 'PROGRESS'] - - while not upload_proc.ready(): - time.sleep(1) - upload_proc.update_from_backend() - - assert upload_proc.ready() - assert upload_proc.current_task_name == 'cleanup' - assert upload_proc.upload_hash is not None - assert len(upload_proc.errors) == 0 - assert upload_proc.status == 'SUCCESS' - for calc_proc in upload_proc.calc_procs: - assert calc_proc.parser_name is not None - assert calc_proc.mainfile is not None - assert calc_proc.calc_hash is not None - assert calc_proc.status == 'SUCCESS' - assert len(calc_proc.errors) == 0 - - upload_proc.forget() - - -@pytest.mark.parametrize('uploaded_id', [example_files[1]], indirect=True) -def test_processing_doublets(uploaded_id, celery_session_worker, caplog): - caplog.set_level(logging.CRITICAL) - upload_proc = start_processing(uploaded_id) - upload_proc.get() - assert upload_proc.status == 'SUCCESS' - - assert Calc.upload_exists(upload_proc.upload_hash) - - upload_proc = start_processing(uploaded_id) - upload_proc.get() - assert upload_proc.status == 'FAILURE' - assert len(upload_proc.errors) > 0 - assert 'already' in upload_proc.errors[0] - - -@pytest.mark.timeout(30) -def test_process_non_existing(celery_session_worker, caplog): - caplog.set_level(logging.CRITICAL) - upload_proc = start_processing('__does_not_exist') - - upload_proc.get() - - assert upload_proc.ready() - upload_proc.forget() - - assert upload_proc.current_task_name == 'extracting' - assert 
upload_proc.status == 'FAILURE' - assert len(upload_proc.errors) > 0 - - -@pytest.mark.parametrize('task', ['extracting', 'parse_all', 'cleanup', 'parsers/vasp']) -def test_task_failure(monkeypatch, uploaded_id, celery_session_worker, task, caplog): - caplog.set_level(logging.CRITICAL) - - original_continue_with = ProcPipeline.continue_with - - def continue_with(self: ProcPipeline, current_task): - if task == current_task: - raise Exception('fail for test') - - return original_continue_with(self, current_task) - - monkeypatch.setattr('nomad.processing.state.ProcPipeline.continue_with', continue_with) - - upload_proc = start_processing(uploaded_id) - upload_proc.get() - - assert upload_proc.ready() - - if task != 'parsers/vasp': - assert upload_proc.status == 'FAILURE' - assert upload_proc.current_task_name == task - assert len(upload_proc.errors) > 0 - elif len(upload_proc.calc_procs) > 0: # ignore the empty example upload - assert upload_proc.status == 'FAILURE' - assert upload_proc.current_task_name == 'cleanup' - assert len(upload_proc.errors) > 0 - for calc_proc in upload_proc.calc_procs: - assert calc_proc.status == 'FAILURE' - assert calc_proc.current_task_name == 'parser/vasp' - assert len(calc_proc.errors) > 0 diff --git a/tests/test_data.py b/tests/test_search.py similarity index 64% rename from tests/test_data.py rename to tests/test_search.py index 26fbdf2729..be0fb2a22a 100644 --- a/tests/test_data.py +++ b/tests/test_search.py @@ -21,7 +21,7 @@ from elasticsearch import NotFoundError from nomad import config from nomad.parsing import LocalBackend -from nomad.data import Calc, AlreadyExists, key_mappings +from nomad.search import AlreadyExists, CalcElasticDocument, key_mappings from tests.test_normalizing import normalized_vasp_example # pylint: disable=unused-import from tests.test_parsing import parsed_vasp_example # pylint: disable=unused-import @@ -29,16 +29,17 @@ from tests.test_files import assert_not_exists @pytest.fixture(scope='function') -def example_calc(normalized_vasp_example: LocalBackend, caplog) -> Generator[Calc, None, None]: +def example_elastic_calc(normalized_vasp_example: LocalBackend, caplog) \ + -> Generator[CalcElasticDocument, None, None]: try: caplog.set_level(logging.ERROR) - Calc.get(id='test_upload_hash/test_calc_hash').delete() + CalcElasticDocument.get(id='test_upload_hash/test_calc_hash').delete() except Exception: pass finally: caplog.set_level(logging.WARNING) - entry = Calc.create_from_backend( + entry = CalcElasticDocument.create_from_backend( normalized_vasp_example, upload_hash='test_upload_hash', calc_hash='test_calc_hash', @@ -58,28 +59,27 @@ def example_calc(normalized_vasp_example: LocalBackend, caplog) -> Generator[Cal caplog.set_level(logging.WARNING) -def assert_calc(calc: Calc): +def assert_elastic_calc(calc: CalcElasticDocument): assert calc is not None - for property in Calc._doc_type.mapping: + for property in CalcElasticDocument._doc_type.mapping: property = key_mappings.get(property, property) assert getattr(calc, property) is not None -def test_create(example_calc: Calc): - assert_calc(example_calc) - assert Calc.upload_exists(example_calc.upload_hash) +def test_create_elasitc_calc(example_elastic_calc: CalcElasticDocument): + assert_elastic_calc(example_elastic_calc) + assert CalcElasticDocument.upload_exists(example_elastic_calc.upload_hash) - get_result: Calc = Calc.get(id='%s/%s' % (example_calc.upload_hash, example_calc.calc_hash)) - assert_calc(get_result) + get_result: CalcElasticDocument = CalcElasticDocument.get( + 
id='%s/%s' % (example_elastic_calc.upload_hash, example_elastic_calc.calc_hash)) + assert_elastic_calc(get_result) - json_dict = get_result.json_dict - assert 'archive_id' in json_dict - -def test_create_existing(example_calc: Calc, normalized_vasp_example, caplog): +def test_create_existing_elastic_calc( + example_elastic_calc: CalcElasticDocument, normalized_vasp_example, caplog): try: caplog.set_level(logging.ERROR) - Calc.create_from_backend( + CalcElasticDocument.create_from_backend( normalized_vasp_example, upload_hash='test_upload_hash', calc_hash='test_calc_hash', @@ -94,13 +94,13 @@ def test_create_existing(example_calc: Calc, normalized_vasp_example, caplog): assert False -def test_delete(example_calc: Calc, caplog): - example_calc.delete() +def test_delete_elastic_calc(example_elastic_calc: CalcElasticDocument, caplog): + example_elastic_calc.delete() assert_not_exists(config.files.archive_bucket, 'test_upload_hash/test_calc_hash') try: caplog.set_level(logging.ERROR) - Calc.get(id='test_upload_hash/test_calc_hash') + CalcElasticDocument.get(id='test_upload_hash/test_calc_hash') assert False except NotFoundError: pass @@ -108,19 +108,3 @@ def test_delete(example_calc: Calc, caplog): assert False finally: caplog.set_level(logging.WARNING) - - -def test_delete_all(example_calc: Calc, caplog): - Calc.delete_all(upload_id=example_calc.upload_id) - - assert_not_exists(config.files.archive_bucket, 'test_upload_hash/test_calc_hash') - try: - caplog.set_level(logging.ERROR) - Calc.get(id='test_upload_hash/test_calc_hash') - assert False - except NotFoundError: - pass - else: - assert False - finally: - caplog.set_level(logging.WARNING) \ No newline at end of file -- GitLab
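
Taken together, the refactored modules are driven roughly the way the new tests drive them. A minimal usage sketch, assuming a running celery worker plus the mongo/elastic/minio services from the compose setup, and an upload file that already sits in the uploads bucket under the chosen id; names and values here are illustrative only:

from nomad.processing import Upload, Calc
from nomad.search import CalcElasticDocument
from nomad.user import me
from nomad.utils import create_uuid

upload_id = create_uuid()
# assumption: the raw upload .zip was already put into
# config.files.uploads_bucket under upload_id

upload = Upload.create(upload_id=upload_id, user=me, name='example upload')
upload.process()                         # runs the extracting -> parse_all -> cleanup chain
upload.block_until_complete(interval=1)  # poll mongodb until all tasks have finished

assert upload.status == 'SUCCESS', upload.errors

# per-calculation processing state is kept in mongodb ...
for calc in Calc.objects(upload_id=upload.upload_id):
    print(calc.mainfile, calc.parser, calc.status)

# ... while the searchable repository entry is kept in elasticsearch
calc = Calc.objects(upload_id=upload.upload_id).first()
if calc is not None:
    print(CalcElasticDocument.get(id=calc.archive_id).json_dict)

In this split, Upload and Calc documents hold the processing state in mongodb, CalcElasticDocument holds the searchable repository entry in elasticsearch, and the archive_id (upload_hash/calc_hash) ties the two together.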