Commit 7a66cb5b authored by Markus Scheidgen

Refactored processing and made it typesafe.

parent 9465b207
 .pyenv/
 __pycache__
+.mypy_cache
 *.pyc
 *.egg-info/
 data
@@ -13,5 +13,11 @@
         "--enable=F,E,unreachable,duplicate-key,unnecessary-semicolon,global-variable-not-assigned,unused-variable,binary-op-exception,bad-format-string,anomalous-backslash-in-string,bad-open-mode,unused-import"
     ],
     "python.linting.pep8Enabled": true,
-    "python.linting.pep8Args": ["--ignore=E501"]
+    "python.linting.pep8Args": ["--ignore=E501,E701"],
+    "python.linting.mypyEnabled": true,
+    "python.linting.mypyArgs": [
+        "--ignore-missing-imports",
+        "--follow-imports=silent",
+        "--no-strict-optional"
+    ]
 }
\ No newline at end of file
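The mypy flags added here can also be exercised outside the editor. A minimal sketch using mypy's programmatic API (the target path is an assumption, taken from the processing module changed below):

```python
# Minimal sketch: run the same checks programmatically via mypy's public API.
# mypy.api.run takes the usual CLI arguments and returns
# (report, errors, exit_status).
from mypy import api

report, errors, exit_status = api.run([
    '--ignore-missing-imports',  # don't error on untyped third-party imports
    '--follow-imports=silent',   # check followed modules without reporting them
    '--no-strict-optional',      # allow None to be assigned to any type
    'nomad/processing.py',       # assumed target module from this commit
])
print(report)
```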
@@ -13,8 +13,8 @@
 # limitations under the License.
 """
-This modules allows to (1) run a celery worker that can perform all processing
-task, (2) allows to start, manage, read status of processing runs
+This module allows to (1) run a celery worker that can perform all upload processing
+tasks, (2) start, manage, and get the status of upload processings
 (i.e. celery canvas, i.e. workflow of tasks), (3) contains the implementation of said
 celery tasks.
@@ -24,17 +24,19 @@ rabbitmq, redis combination. Rabbitmq allows for a very scalable distribution of
 clouds and clusters, while redis provides a more centralized, reliable temporary storage
 for task stati and results.
 
-.. autoclass:: nomad.processing.ProcessRun
+The class :class:`UploadProcessing` allows to start, manage, and read the status of
+an upload processing in a serializable form.
+
+.. autoclass:: nomad.processing.UploadProcessing
 """
 
-from celery import Celery, chord, group, chain, subtask
-from celery.result import result_from_tuple
+from typing import List, Any, Tuple
+from celery import Celery, Task, chord, group
+from celery.result import ResultBase, result_from_tuple
 from celery.signals import after_setup_task_logger, after_setup_logger
 from celery.utils.log import get_task_logger
+from celery.canvas import Signature
 import logging
 import logstash
-import time
-import sys
 import json
 import nomad.config as config
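The docstring above describes the processing as a celery canvas, i.e. a workflow of tasks. A minimal, self-contained sketch of that shape (a group fanned out per mainfile, collected by a chord into a final callback; broker/backend URLs and the task bodies are illustrative, not the actual nomad tasks):

```python
from celery import Celery, chord, group

app = Celery('canvas_sketch', broker='pyamqp://', backend='redis://')

@app.task
def parse(mainfile):
    # stand-in for running a real parser on a mainfile
    return 'parsed %s' % mainfile

@app.task
def close_upload(parse_results, upload_id):
    # a chord passes the collected group results as the first argument,
    # before any arguments bound in the signature
    return {'upload_id': upload_id, 'parse_results': parse_results}

def process(upload_id, mainfiles):
    # the group runs one parse per mainfile in parallel; the chord runs
    # close_upload once all of them have finished
    return chord(group(parse.s(f) for f in mainfiles))(close_upload.s(upload_id))
```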
@@ -70,128 +72,18 @@ app.conf.update(
 )
-@app.task(bind=True)
-def open_upload(task, state):
-    if not state.continue_with(task):
-        return state
-
-    try:
-        upload = files.upload(state.upload_id)
-        upload.open()
-    except files.UploadError as e:
-        logger.debug('Could not open upload %s: %s' % (state.upload_id, e))
-        return state.fail(e)
-
-    try:
-        state.parse_specs = list()
-        for filename in upload.filelist:
-            for parser in parsers:
-                if parser.is_mainfile(upload, filename):
-                    parse_spec = (parser.name, upload.get_path(filename))
-                    state.parse_specs.append(parse_spec)
-    except files.UploadError as e:
-        logger.warning('Could find parse specs in open upload %s: %s' % (state.upload_id, e))
-        return state.fail(e)
-
-    return state
-
-
-@app.task(bind=True)
-def close_upload(task, parse_results, state):
-    if not state.continue_with(task):
-        return state
-
-    try:
-        upload = files.upload(state.upload_id)
-    except KeyError as e:
-        logger.warning('No upload %s' % state.upload_id)
-        return state.fail(e)
-
-    upload.close()
-
-    return state
-
-
-@app.task(bind=True)
-def distributed_parse(task, state, close_upload):
-    if not state.continue_with(task):
-        chord([])(close_upload.clone(args=(state,)))
-
-    parses = group(parse.s(parse_spec) for parse_spec in state.parse_specs)
-    chord(parses)(close_upload.clone(args=(state,)))
-
-
-@app.task()
-def parse(parse_spec):
-    parser, mainfile = parse_spec
-    logger.debug('Start %s for %s.' % parse_spec)
-
-    try:
-        parser_dict[parser].run(mainfile)
-    except ValueError as e:
-        logger.warning('%s stopped on %s/%s: %s' % (parse_spec + (e,)))
-        return e
-    except Exception as e:
-        logger.warning('%s stopped on %s/%s: %s' % (parse_spec + (e,)), exc_info=e)
-        return e
-
-    return True  # TODO some other value?
-
-
-class ProcessState():
-    """
-    JSON serializable state of a pending, running, or completed :class:`ProcessRun`.
-    Instances are used to pass data from task to task within a process workflow.
-    Instances are also used to represent state to clients via :func:`ProcessRun.status`.
-
-    Attributes:
-        upload_id: The *upload_id* of the :class:`ProcessRun`.
-        parse_specs: A list of already identified parse_specs, or None.
-        parse_results: A list of completed (failed or successful) parse results.
-        current_task: The name of the current task of the process run.
-    """
-    def __init__(self, upload_id):
-        self.upload_id = upload_id
-        self.parse_specs = None
-        self.parse_results = None
-        self.status = 'PENDING'
-        self.task_name = None
-        self.task_id = None
-        self.cause = None
-
-    def fail(self, e):
-        self.cause = e
-        self.status = 'FAILURE'
-        return self
-
-    def continue_with(self, task):
-        assert self.status != 'SUCCESS', 'Cannot continue on completed workflow.'
-
-        if self.status == 'FAILURE':
-            return False
-        else:
-            self.status = 'STARTED'
-            self.task_name = task.name
-            self.task_id = task.request.id
-            return True
-
-    def to_json(self):
-        return json.dumps(self, indent=4)
-class ProcessRun():
+class UploadProcessing():
     """
     Represents the processing of an uploaded file. It allows to start and manage a
-    processing run, retrieve status information and results.
+    processing run, and acts itself as a client-accessible and serializable state of the
+    processing.
 
     It is serializable (JSON, pickle). Internally it stores a
-    :class:`~celery.results.AsyncResults` instance in serialized *tuple* form.
-    We use the serialized form to allow serialization (i.e. storage). Keep in mind
-    that the sheer `task_id` is not enough, because it does not contain
-    the parent tasks. See `third comment <https://github.com/celery/celery/issues/1328>`_
-    for details.
+    :class:`~celery.results.AsyncResults` instance in serialized *tuple* form to
+    stay connected to the results backend.
+
+    Instances of this class represent the state of a processing and are handed
+    from task to task through the processing workflow.
 
     Warning:
         You have to call :func:`forget` eventually to free all resources and the celery
@@ -203,83 +95,207 @@ class ProcessRun():
     Arguments:
         upload_id: The id of the uploaded file in the object storage,
                    see also :mod:`nomad.files`.
 
+    Attributes:
+        parse_specs: List of (parser_name, mainfile) tuples.
+        parse_results: Result of the parsers, currently bool indicating success.
+        status: Aggregated celery status for the whole process.
+        task_name: Name of the currently running task.
+        task_id: Id of the currently running task.
+        cause: *None* or *Exception* that caused failure.
+        result_tuple: Serialized form of the celery async_results tree for the processing.
     """
-    def __init__(self, upload_id):
-        self._start_state = ProcessState(upload_id)
-        self.result_tuple = None
+    def __init__(self, upload_id: str) -> None:
+        self.upload_id = upload_id
+        self.parse_specs: List[Tuple[str, str]] = None
+        self.parse_results: List[bool] = None
+        self.status: str = 'PENDING'
+        self.task_name: str = None
+        self.task_id: str = None
+        self.cause: Exception = None
+        self.result_tuple: Any = None
     def start(self):
         """ Initiates the processing tasks via celery canvas. """
-        assert not self.is_started, 'Cannot start a started or used run.'
+        assert not self._is_started, 'Cannot start a started or used processing.'
 
-        finalize = close_upload.s()
+        # Keep the result of the last task in the workflow.
+        # The last task is started by another task, therefore it
+        # is not the end of the main task chain.
+        finalize = close_upload.s()
         finalize_result = finalize.freeze()
 
-        main_chain = open_upload.s(self._start_state) | distributed_parse.s(finalize)
+        # start the main chain
+        main_chain = open_upload.s(self) | distributed_parse.s(finalize)
         main_chain_result = main_chain.delay()
 
         # Create a singular result tree. This might not be the right way to do it.
         finalize_result.parent = main_chain_result
 
-        # Keep the result as tuple to keep self object pickable
+        # Keep the result as a tuple that also includes all parents, i.e. the whole
+        # serializable tree
        self.result_tuple = finalize_result.as_tuple()
     @property
-    def async_result(self):
-        """ The celery async_result in its regular usable, but not serializable form. """
+    def _async_result(self) -> ResultBase:
+        """
+        The celery async_result in its regular usable, but not serializable form.
+
+        We use the tuple form to allow serialization (i.e. storage). Keep in mind
+        that the sheer `task_id` is not enough, because it does not contain
+        the parent tasks, i.e. the result tree.
+        See the `third comment <https://github.com/celery/celery/issues/1328>`_
+        for details.
+        """
         return result_from_tuple(self.result_tuple)
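The serialize/restore round trip this property relies on can be seen in isolation. A minimal sketch (broker/backend URLs and the `noop` task are illustrative):

```python
from celery import Celery
from celery.result import result_from_tuple

app = Celery('result_sketch', broker='pyamqp://', backend='redis://')

@app.task
def noop(x):
    return x

async_result = noop.delay(42)        # AsyncResult: usable, but not serializable
stored = async_result.as_tuple()     # plain nested tuples, incl. parent results
# ... persist `stored`, e.g. inside a pickled UploadProcessing ...
restored = result_from_tuple(stored, app=app)  # a fully usable AsyncResult again
print(restored.ready())              # can query the results backend as before
```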
     @property
-    def is_started(self):
+    def _is_started(self) -> bool:
         """ True, if the task is started. """
         return self.result_tuple is not None
-    def status(self):
-        """
-        Extract the current state from the various tasks involved in upload processing.
-
-        Returns: JSON-style python object with various task information.
-        """
+    def _update(self, other: 'UploadProcessing') -> 'UploadProcessing':
+        """ Updates all attributes from another instance. Returns itself. """
+        self.parse_specs = other.parse_specs
+        self.parse_results = other.parse_results
+        self.status = other.status
+        self.task_name = other.task_name
+        self.task_id = other.task_id
+        self.cause = other.cause
+        return self
-        assert self.is_started, 'Run is not yet started.'
-        async_result = self.async_result
+    def updated(self) -> 'UploadProcessing':
+        """ Consults the result backend and updates itself with the available results. """
+        assert self._is_started, 'Run is not yet started.'
+
+        async_result = self._async_result
         while async_result is not None:
             if async_result.ready():
                 async_result.result.status = async_result.status
-                return async_result.result
+                return self._update(async_result.result)
             else:
                 async_result = async_result.parent
-        return self._start_state
+        return self
-    def forget(self):
+    def forget(self) -> None:
         """ Forget the results of a completed run; free all resources in the results backend. """
         assert self.ready(), 'Run is not completed.'
 
-        async_result = self.async_result
+        async_result = self._async_result
         while async_result is not None:
             async_result.forget()
             async_result = async_result.parent
 
-    def ready(self):
+    def ready(self) -> bool:
         """ Returns: True if the task has been executed. """
-        assert self.is_started, 'Run is not yet started.'
-        return self.async_result.ready()
+        assert self._is_started, 'Run is not yet started.'
+        return self._async_result.ready()
-    def get(self, *args, **kwargs):
+    def get(self, *args, **kwargs) -> 'UploadProcessing':
         """
         Blocks until the processing has finished. Forwards args, kwargs to
         *celery.result.get* for timeouts, etc.
 
-        Returns: The task result as :func:`status`.
+        Returns: An updated instance of itself with all the results.
         """
-        assert self.is_started, 'Run is not yet started.'
-        self.async_result.get(*args, **kwargs)
-        return self.status()
+        assert self._is_started, 'Run is not yet started.'
+
+        self._async_result.get(*args, **kwargs)
+        return self.updated()
+    def fail(self, e: Exception) -> 'UploadProcessing':
+        """ Allows tasks to mark this processing as failed. All following tasks will do nothing. """
+        self.cause = e
+        self.status = 'FAILURE'
+        return self
+
+    def continue_with(self, task: Task) -> bool:
+        """ Updates itself with information about the new current task. """
+        assert self.status != 'SUCCESS', 'Cannot continue on completed workflow.'
+
+        if self.status == 'FAILURE':
+            return False
+        else:
+            self.status = 'STARTED'
+            self.task_name = task.name
+            self.task_id = task.request.id
+            return True
+
+    def to_json(self) -> str:
+        """ Creates a representative JSON record as str. """
+        # dump the instance dict; default=str keeps non-serializable members
+        # (e.g. the cause exception, the result tuple) printable
+        return json.dumps(self.__dict__, indent=4, default=str)
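A typical client-side use of this class, mirroring the test at the bottom of this commit (the timeout value is illustrative):

```python
# Sketch of the client-side processing lifecycle.
processing = UploadProcessing('some_upload_id')
processing.start()                        # dispatches the celery canvas

processing = processing.get(timeout=30)   # block until done; or poll with .updated()
assert processing.status == 'SUCCESS'

processing.forget()                       # free results backend resources
```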
+@app.task(bind=True)
+def open_upload(task: Task, processing: UploadProcessing) -> UploadProcessing:
+    if not processing.continue_with(task):
+        return processing
+
+    try:
+        upload = files.upload(processing.upload_id)
+        upload.open()
+    except files.UploadError as e:
+        logger.debug('Could not open upload %s: %s' % (processing.upload_id, e))
+        return processing.fail(e)
+
+    try:
+        processing.parse_specs = list()
+        for filename in upload.filelist:
+            for parser in parsers:
+                if parser.is_mainfile(upload, filename):
+                    parse_spec = (parser.name, upload.get_path(filename))
+                    processing.parse_specs.append(parse_spec)
+    except files.UploadError as e:
+        logger.warning('Could not find parse specs in open upload %s: %s' % (processing.upload_id, e))
+        return processing.fail(e)
+
+    return processing
+
+
+@app.task(bind=True)
+def close_upload(
+        task, parse_results: List[Any], processing: UploadProcessing) -> UploadProcessing:
+    if not processing.continue_with(task):
+        return processing
+
+    try:
+        upload = files.upload(processing.upload_id)
+    except KeyError as e:
+        logger.warning('No upload %s' % processing.upload_id)
+        return processing.fail(e)
+
+    upload.close()
+
+    return processing
+@app.task(bind=True)
+def distributed_parse(
+        task: Task, processing: UploadProcessing, close_upload: Signature) -> None:
+    if not processing.continue_with(task):
+        # dispatch the callback with an empty header and stop here; without
+        # this return, the group below would iterate unset parse_specs
+        chord([])(close_upload.clone(args=(processing,)))
+        return
+
+    parses = group(parse.s(parse_spec) for parse_spec in processing.parse_specs)
+    chord(parses)(close_upload.clone(args=(processing,)))
+@app.task()
+def parse(parse_spec: Tuple[str, str]) -> Any:
+    parser, mainfile = parse_spec
+    logger.debug('Start %s for %s.' % parse_spec)
+
+    try:
+        parser_dict[parser].run(mainfile)
+    except ValueError as e:
+        logger.warning('%s stopped on %s: %s' % (parser, mainfile, e))
+        return e
+    except Exception as e:
+        logger.warning('%s stopped on %s: %s' % (parser, mainfile, e), exc_info=e)
+        return e
+
+    return True  # TODO some other value?
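Note that `parse` returns exceptions rather than raising them. A minimal sketch of why that matters inside a chord header (the `do_parse` work function is hypothetical):

```python
# Sketch: if a task in a chord header raises, the chord body fails with a
# ChordError instead of running; returning the exception as a value keeps the
# workflow alive, so close_upload still runs and can inspect per-file outcomes.
@app.task
def safe_parse(parse_spec):
    parser, mainfile = parse_spec
    try:
        return do_parse(parser, mainfile)   # hypothetical work function
    except Exception as e:
        return e                            # failure recorded as a result value
```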
 watchdog
 sphinx
 recommonmark
-gitpython
\ No newline at end of file
+gitpython
+mypy
+pylint
+pycodestyle
\ No newline at end of file
@@ -6,7 +6,7 @@ import os
 import nomad.files as files
 import nomad.config as config
 
-from nomad.processing import ProcessRun
+from nomad.processing import UploadProcessing
 
 test_upload_id = '__test_upload_id'
@@ -27,14 +27,14 @@ class ProcessingTests(TestCase):
         pass
 
     def test_processing(self):
-        run = ProcessRun(test_upload_id)
+        run = UploadProcessing(test_upload_id)
         run.start()
 
-        state = run.get(timeout=30)
+        run.get(timeout=30)
         self.assertTrue(run.ready())
         run.forget()
 
-        self.assertEqual('nomad.processing.close_upload', state.task_name)
-        self.assertEqual('SUCCESS', state.status)
+        self.assertEqual('nomad.processing.close_upload', run.task_name)
+        self.assertEqual('SUCCESS', run.status)
 
 if __name__ == '__main__':