Commit 8cc3b00b authored by Markus Scheidgen's avatar Markus Scheidgen

Refactored data/proc.

parent ce71d6ce
@@ -68,13 +68,13 @@
]
},
{
"name": "Python: tests/test_utils.py",
"name": "Python: tests/processing/test_base.py",
"type": "python",
"request": "launch",
"cwd": "${workspaceFolder}",
"program": "${workspaceFolder}/.pyenv/bin/pytest",
"args": [
"-sv", "tests/test_utils.py"
"-sv", "tests/processing/test_base.py::test_fail"
]
},
{
...
@@ -54,6 +54,7 @@ services:
- RABBITMQ_DEFAULT_USER=rabbitmq
- RABBITMQ_DEFAULT_PASS=rabbitmq
- RABBITMQ_DEFAULT_VHOST=/
- RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=-rabbit log_levels [{default,debug}]
ports:
- ${RABBITMQ_HOST_PORT}:5672
volumes:
...
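The added RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS entry raises the broker's log level to debug, which helps when tracing the processing tasks changed below. A quick sketch to check that the broker is reachable with the credentials above (host and port are assumptions standing in for ${RABBITMQ_HOST_PORT}; pika is not a dependency shown in this commit):

import pika  # assumption: pika installed just for this ad-hoc check

params = pika.ConnectionParameters(
    host='localhost', port=5672, virtual_host='/',
    credentials=pika.PlainCredentials('rabbitmq', 'rabbitmq'))
connection = pika.BlockingConnection(params)
print('broker reachable:', connection.is_open)
connection.close()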
@@ -4,8 +4,10 @@ from flask_cors import CORS
from elasticsearch.exceptions import NotFoundError
from nomad import config, files
from nomad.utils import get_logger
from nomad.data import Calc, Upload, User, InvalidId, NotAllowedDuringProcessing, me
from nomad.utils import get_logger, create_uuid
from nomad.processing import Upload, Calc, InvalidId, NotAllowedDuringProcessing
from nomad.search import CalcElasticDocument
from nomad.user import me
base_path = config.services.api_base_path
@@ -135,7 +137,8 @@ class UploadsRes(Resource):
if json_data is None:
json_data = {}
return Upload.create(user=me, name=json_data.get('name', None)).json_dict, 200
upload = Upload.create(upload_id=create_uuid(), user=me, name=json_data.get('name'))
return upload.json_dict, 200
class UploadRes(Resource):
@@ -191,17 +194,18 @@ class UploadRes(Resource):
:param string upload_id: the id for the upload
:resheader Content-Type: application/json
:status 200: upload successfully updated and retrieved
:status 400: bad upload id
:status 404: upload with id does not exist
:returns: the :class:`nomad.processing.Upload` instance
"""
# TODO calc paging
try:
return Upload.get(upload_id=upload_id).json_dict, 200
except InvalidId:
abort(400, message='%s is not a valid upload id.' % upload_id)
result = Upload.get(upload_id).json_dict
except KeyError:
abort(404, message='Upload with id %s does not exist.' % upload_id)
result['calcs'] = [calc.json_dict for calc in Calc.objects(upload_id=upload_id)]
return result, 200
def delete(self, upload_id):
"""
Deletes an existing upload. Only ``is_ready`` or ``is_stale`` uploads
@@ -224,9 +228,9 @@ class UploadRes(Resource):
:returns: the :class:`nomad.processing.Upload` instance with the latest processing state
"""
try:
return Upload.get(upload_id=upload_id).delete().json_dict, 200
except InvalidId:
abort(400, message='%s is not a valid upload id.' % upload_id)
upload = Upload.get(upload_id)
upload.delete()
return upload.json_dict, 200
except KeyError:
abort(404, message='Upload with id %s does not exist.' % upload_id)
except NotAllowedDuringProcessing:
@@ -236,7 +240,7 @@ class UploadRes(Resource):
class RepoCalcRes(Resource):
def get(self, upload_hash, calc_hash):
try:
return Calc.get(id='%s/%s' % (upload_hash, calc_hash)).json_dict, 200
return CalcElasticDocument.get(id='%s/%s' % (upload_hash, calc_hash)).json_dict, 200
except NotFoundError:
abort(404, message='There is no calculation for %s/%s' % (upload_hash, calc_hash))
except Exception as e:
@@ -255,7 +259,7 @@ class RepoCalcsRes(Resource):
assert per_page > 0
try:
search = Calc.search().query('match_all')
search = CalcElasticDocument.search().query('match_all')
search = search[(page - 1) * per_page: page * per_page]
return {
'pagination': {
...
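Taken together, the API changes swap the old 400/InvalidId contract for 404/KeyError and embed the upload's calcs in the GET response. A client-side sketch for illustration (host, port, and base path are assumptions; requests is not a dependency of this commit):

import requests  # hypothetical client; BASE_URL stands in for config.services.api_base_path

BASE_URL = 'http://localhost:8000/nomad/api'  # assumed deployment address

def get_upload(upload_id: str) -> dict:
    # unknown ids now yield 404 (mapped from KeyError) where the old code answered 400
    resp = requests.get('%s/uploads/%s' % (BASE_URL, upload_id))
    resp.raise_for_status()
    upload = resp.json()
    # the GET response now embeds the upload's calcs
    for calc in upload['calcs']:
        print(calc.get('status'), calc.get('current_task'))
    return upload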
@@ -51,7 +51,6 @@ Initiate processing
"""
from nomad.processing.app import app
from nomad.processing import tasks
from nomad.processing.state import ProcPipeline, UploadProc, CalcProc
from nomad.processing.handler import handle_uploads, handle_uploads_thread, start_processing
from nomad.processing.base import app, InvalidId, ProcNotRegistered
from nomad.processing.data import Upload, Calc, NotAllowedDuringProcessing
from nomad.processing.handler import handle_uploads, handle_uploads_thread
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List
from contextlib import contextmanager
import inspect
import logging
import celery
from celery import Task
from celery.signals import after_setup_task_logger, after_setup_logger, worker_process_init
from mongoengine import Document, StringField, ListField, DateTimeField, IntField, \
ReferenceField, connect
from pymongo import ReturnDocument
from datetime import datetime
from nomad import config, utils
import nomad.patch # pylint: disable=unused-import
class Celery(celery.Celery):
def mongo_connect(self):
return connect(db=config.mongo.users_db, host=config.mongo.host)
def __init__(self):
if config.logstash.enabled:
def initialize_logstash(logger=None, loglevel=logging.DEBUG, **kwargs):
utils.add_logstash_handler(logger)
return logger
after_setup_task_logger.connect(initialize_logstash)
after_setup_logger.connect(initialize_logstash)
worker_process_init.connect(lambda **kwargs: self.mongo_connect())
super().__init__(
'nomad.processing', backend=config.celery.backend_url, broker=config.celery.broker_url)
self.add_defaults(dict(
accept_content=['json', 'pickle'],
task_serializer=config.celery.serializer,
result_serializer=config.celery.serializer,
))
app = Celery()
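# note: the Celery subclass above attaches logstash handlers to celery's loggers when
# config.logstash.enabled, and re-opens the mongo connection in every forked worker
# process via the worker_process_init signal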
PENDING = 'PENDING'
RUNNING = 'RUNNING'
FAILURE = 'FAILURE'
SUCCESS = 'SUCCESS'
class InvalidId(Exception): pass
class AsyncDocumentNotRegistered(Exception): pass
class Proc(Document):
"""
Base class for objects involved in processing and need persistent processing
state.
Attributes:
current_task: the currently running or last completed task
status: the overall status of the processing
errors: a list of errors that happened during processing. Error fail a processing
run
warnings: a list of warnings that happened during processing. Warnings do not
fail a processing run
create_time: the time of creation (not the start of processing)
proc_time: the time that processing completed (successfully or not)
"""
meta = {
'abstract': True,
}
tasks: List[str] = None
""" the ordered list of tasks that comprise a processing run """
current_task = StringField(default=None)
status = StringField(default='CREATED')
    errors = ListField(StringField(), default=[])
    warnings = ListField(StringField(), default=[])
create_time = DateTimeField(required=True)
complete_time = DateTimeField()
@property
def completed(self) -> bool:
return self.status in [SUCCESS, FAILURE]
def __init__(self, **kwargs):
        assert self.__class__.tasks is not None and len(self.tasks) > 0, \
            """ the class attribute tasks must be overwritten with an actual list """
        assert 'status' not in kwargs, \
            """ do not set the status manually, it's managed """
kwargs.setdefault('create_time', datetime.now())
super().__init__(**kwargs)
self.status = PENDING if self.current_task is None else RUNNING
@classmethod
def get(cls, obj_id):
try:
obj = cls.objects(id=obj_id).first()
except Exception as e:
raise InvalidId('%s is not a valid id' % obj_id)
if obj is None:
raise KeyError('%s with id %s does not exist' % (cls.__name__, obj_id))
return obj
def get_id(self):
return self.id.__str__()
    def fail(self, *errors):
        assert not self.completed

        self.status = FAILURE
        self.errors = [str(error) for error in errors]
        self.complete_time = datetime.now()
        self.save()
def warning(self, *warnings):
assert not self.completed
for warning in warnings:
self.warnings.append(str(warning))
    def continue_with(self, task):
        assert task in self.tasks

        if self.status == FAILURE:
            return False

        if self.status == PENDING:
            assert self.current_task is None
            assert task == self.tasks[0]
            self.status = RUNNING
        else:
            # tasks must run in the order given by the tasks class attribute
            assert self.tasks.index(task) == self.tasks.index(self.current_task) + 1

        self.current_task = task
        self.save()
        return True
def complete(self):
if self.status != FAILURE:
assert self.status == RUNNING
self.status = SUCCESS
self.save()
@property
def json_dict(self) -> dict:
""" A json serializable dictionary representation. """
data = {
'tasks': self.tasks,
'current_task': self.current_task,
'status': self.status,
'errors': self.errors,
'warnings': self.warnings,
'create_time': self.create_time.isoformat() if self.create_time is not None else None,
'complete_time': self.complete_time.isoformat() if self.complete_time is not None else None,
}
return {key: value for key, value in data.items() if value is not None}
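    # e.g. (illustrative) a running upload might serialize as:
    # {'tasks': ['extract', 'parse', 'cleanup'], 'current_task': 'parse',
    #  'status': 'RUNNING', 'errors': [], 'warnings': [],
    #  'create_time': '2018-09-18T12:00:00'}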
def task(func):
    def wrapper(self, *args, **kwargs):
        # once a run has failed, remaining tasks are skipped
        if self.status == FAILURE:
            return

        self.continue_with(func.__name__)
        try:
            func(self, *args, **kwargs)
        except Exception as e:
            self.fail(e)
    return wrapper
def process(func):
    @app.task(bind=True, name=func.__name__, ignore_result=True)
    def task_func(task, cls_name, self_id):
        all_cls = Proc.__subclasses__()
        cls = next((cls for cls in all_cls if cls.__name__ == cls_name), None)
        if cls is None:
            raise AsyncDocumentNotRegistered(
                'Document type %s not registered for async methods' % cls_name)

        self = cls.get(self_id)
        try:
            func(self)
        except Exception as e:
            self.fail(e)
            raise e

    def wrapper(self, *args, **kwargs):
        assert len(args) == 0 and len(kwargs) == 0
        self.save()

        self_id = self.get_id()
        cls_name = self.__class__.__name__
        return task_func.s(cls_name, self_id).delay()
    return wrapper
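# pattern note: calling a @process method does not execute its body synchronously;
# the wrapper saves the document and enqueues task_func with (class name, id), and a
# celery worker re-loads the document by id and runs the original function there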
class Upload(Proc):
    tasks = ['extract', 'parse', 'cleanup']  # ordered task list required by Proc

    data = StringField(default='Hello, World')
    processed_calcs = IntField(default=0)
    total_calcs = IntField(default=-1)
@process
def after_upload(self):
self.extract()
self.parse()
@process
def after_parse(self):
self.cleanup()
@task
def extract(self):
print('now extracting')
@task
def parse(self):
Calc(upload=self).parse()
Calc(upload=self).parse()
self.total_calcs = 2
self.save()
self.check_calcs_complete(more_calcs=0)
@task
def cleanup(self):
print('cleanup')
def check_calcs_complete(self, more_calcs=1):
# use a primitive but atomic pymongo call to increase the number of calcs
updated_raw = Upload._get_collection().find_one_and_update(
{'_id': self.id},
{'$inc': {'processed_calcs': more_calcs}},
return_document=ReturnDocument.AFTER)
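        # several Calc tasks may finish concurrently; the atomic $inc avoids the
        # read-modify-write race that a plain `self.processed_calcs += 1` would have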
updated_total_calcs = updated_raw['total_calcs']
updated_processed_calcs = updated_raw['processed_calcs']
print('%d:%d' % (updated_processed_calcs, updated_total_calcs))
if updated_processed_calcs == updated_total_calcs and updated_total_calcs != -1:
self.after_parse()
class Calc(Proc):
    tasks = ['parse']  # satisfies the Proc assert; parse itself is scheduled via @process

    upload = ReferenceField(Upload, required=True)

    @process
    def parse(self):
        print('parse')
        self.upload.check_calcs_complete()
if __name__ == '__main__':
connect(db=config.mongo.users_db, host=config.mongo.host)
tds = [Upload(), Upload(), Upload()]
for td in tds:
td.after_upload()
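For orientation, a minimal sketch of the contract this prototype establishes (Demo and its task names are illustrative, not part of the commit; a mongo connection as in __main__ above is required, since every state change is saved):

class Demo(Proc):
    tasks = ['step_one', 'step_two']  # ordered task list required by Proc

    @task
    def step_one(self):
        pass  # PENDING -> RUNNING, current_task = 'step_one'

    @task
    def step_two(self):
        pass  # current_task advances to 'step_two'

demo = Demo()
demo.step_one()
demo.step_two()
demo.complete()  # RUNNING -> SUCCESS, completed == True
print(demo.json_dict)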
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List, cast
from typing import List, cast, Any
import types
from contextlib import contextmanager
import collections
@@ -28,6 +28,7 @@ from mongoengine.connection import MongoEngineConnectionError
from mongoengine.base.metaclasses import TopLevelDocumentMetaclass
from pymongo import ReturnDocument
from datetime import datetime
import sys
from nomad import config, utils
import nomad.patch # pylint: disable=unused-import
@@ -57,6 +58,10 @@ app.add_defaults(dict(
result_serializer=config.celery.serializer,
))
# ensure elastic and mongo connections; skipped while sphinx imports this module to build the docs
if 'sphinx' not in sys.modules:
    connect(db=config.mongo.users_db, host=config.mongo.host)
PENDING = 'PENDING'
RUNNING = 'RUNNING'
@@ -67,7 +72,7 @@ SUCCESS = 'SUCCESS'
class InvalidId(Exception): pass
class AsyncDocumentNotRegistered(Exception): pass
class ProcNotRegistered(Exception): pass
class ProcMetaclass(TopLevelDocumentMetaclass):
@@ -113,7 +118,7 @@ class Proc(Document, metaclass=ProcMetaclass):
        complete_time: the time that processing completed (successfully or not)
"""
meta = {
meta: Any = {
'abstract': True,
}
@@ -134,6 +139,11 @@ class Proc(Document, metaclass=ProcMetaclass):
""" Returns True of the process has failed or succeeded. """
return self.status in [SUCCESS, FAILURE]
    def get_logger(self, **kwargs):
        return utils.get_logger(
            __name__, current_task=self.current_task, process=self.__class__.__name__,
            status=self.status, **kwargs)
@classmethod
def create(cls, **kwargs):
""" Factory method that must be used instead of regular constructor. """
@@ -150,37 +160,72 @@ class Proc(Document, metaclass=ProcMetaclass):
return self
@classmethod
def get(cls, obj_id):
""" Loads the object from the database. """
def get_by_id(cls, id: str, id_field: str):
try:
obj = cls.objects(id=str(obj_id)).first()
obj = cls.objects(**{id_field: id}).first()
except ValidationError as e:
raise InvalidId('%s is not a valid id' % obj_id)
raise InvalidId('%s is not a valid id' % id)
except MongoEngineConnectionError as e:
raise e
if obj is None:
raise KeyError('%s with id %s does not exist' % (cls.__name__, obj_id))
raise KeyError('%s with id %s does not exist' % (cls.__name__, id))
return obj
def fail(self, *errors):
@classmethod
def get(cls, obj_id):
return cls.get_by_id(str(obj_id), 'id')
@staticmethod
def log(logger, log_level, msg, **kwargs):
# TODO there seems to be a bug in structlog, cannot use logger.log
if log_level == logging.ERROR:
logger.error(msg, **kwargs)
elif log_level == logging.WARNING:
logger.warning(msg, **kwargs)
elif log_level == logging.INFO:
logger.info(msg, **kwargs)
elif log_level == logging.DEBUG:
logger.debug(msg, **kwargs)
else:
logger.critical(msg, **kwargs)
def fail(self, *errors, log_level=logging.ERROR, **kwargs):
""" Allows to fail the process. Takes strings or exceptions as args. """
assert not self.completed
assert not self.completed, 'Cannot fail a completed process.'
failed_with_exception = False
self.status = FAILURE
logger = self.get_logger(**kwargs)
for error in errors:
if isinstance(error, Exception):
failed_with_exception = True
Proc.log(logger, log_level, 'task failed with exception', exc_info=error, **kwargs)
self.errors = [str(error) for error in errors]
self.complete_time = datetime.now()
if not failed_with_exception:
errors_str = "; ".join([str(error) for error in errors])
Proc.log(logger, log_level, 'task failed', errors=errors_str, **kwargs)
logger.debug('process failed')
self.save()
def warning(self, *warnings):
    def warning(self, *warnings, log_level=logging.WARNING, **kwargs):
""" Allows to save warnings. Takes strings or exceptions as args. """
assert not self.completed
logger = self.get_logger(**kwargs)
for warning in warnings:
self.warnings.append(str(warning))
warning = str(warning)
self.warnings.append(warning)
            Proc.log(logger, log_level, 'task with warning', warning=warning)
def _continue_with(self, task):
tasks = self.__class__.tasks
@@ -198,16 +243,21 @@ class Proc(Document, metaclass=ProcMetaclass):
assert self.current_task is None
assert task == tasks[0]
self.status = RUNNING
self.current_task = task
self.get_logger().debug('started process')
else:
self.current_task = task
self.get_logger().debug('successfully completed task')
self.current_task = task
self.save()
return True
def _complete(self):
if self.status != FAILURE:
assert self.status == RUNNING
assert self.status == RUNNING, 'Can only complete a running process.'
self.status = SUCCESS
self.save()
self.get_logger().debug('completed process')
def incr_counter(self, field, value=1, other_fields=None):
"""
@@ -254,6 +304,7 @@ class Proc(Document, metaclass=ProcMetaclass):
'tasks': getattr(self.__class__, 'tasks'),
'current_task': self.current_task,
'status': self.status,
'completed': self.completed,
'errors': self.errors,
'warnings': self.warnings,
'create_time': self.create_time.isoformat() if self.create_time is not None else None,
@@ -281,7 +332,7 @@ def task(func):
except Exception as e:
self.fail(e)
if self.__class__.tasks[-1] == self.current_task:
if self.__class__.tasks[-1] == self.current_task and not self.completed:
self._complete()
setattr(wrapper, '__task_name', func.__name__)
@@ -305,7 +356,7 @@ def proc_task(task, cls_name, self_id, func_attr):
cls = next((cls for cls in all_cls if cls.__name__ == cls_name), None)
if cls is None:
        logger.error('document not a subclass of Proc')
raise AsyncDocumentNotRegistered('document type %s not a subclass of Proc' % cls_name)
raise ProcNotRegistered('document %s not a subclass of Proc' % cls_name)
try:
self = cls.get(self_id)
...
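For reference, the lookup contract the API module above relies on (a sketch using the classes from this diff):

try:
    upload = Upload.get(upload_id)  # delegates to Proc.get_by_id(str(upload_id), 'id')
except InvalidId:
    pass  # the id failed mongoengine validation
except KeyError:
    pass  # well-formed id, but no such document; the API maps this to a 404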
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module comprises a set of persistent document classes that hold all user related
data. This includes information about users, their uploads and datasets, the associated
calculations, and files.
.. autoclass:: Calc
:members:
.. autoclass:: Upload
:members:
.. autoclass:: DataSet
.. autoclass:: User