Commit 1133d7fb authored by Lauri Himanen

Made upload cleanup into a chained celery task.

parent 1e685a3c
@@ -26,7 +26,6 @@ class Parser(metaclass=ABCMeta):
'''
def __init__(self):
self.name = None
self.domain = 'dft'
self._metainfo_env: Environment = None
@@ -905,6 +905,22 @@ class Upload(Proc):
'exception while matching pot. mainfile',
mainfile=filepath, exc_info=e)
def processing_started(self):
"""Informs MongoDB that this Upload has started processing.
"""
# Tell Upload that a process has been started.
self.current_process = "process"
self.process_status = PROCESS_CALLED
self.save()
def processing_finished(self):
"""Informs MongoDB that this Upload has finished processing.
"""
# Tell Upload that the process has finished.
self.process_status = PROCESS_COMPLETED
self.save()
self.cleanup()
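Taken together, these two helpers bracket a processing run. A minimal usage sketch (the id is a placeholder, not from this diff):

import nomad.processing.data
upload = nomad.processing.data.Upload.get("some-upload-id")  # hypothetical id
upload.processing_started()   # current_process = "process", status PROCESS_CALLED
# ... pipelines run the actual parsing and normalization ...
upload.processing_finished()  # status PROCESS_COMPLETED, then cleanup()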
@task
def parse_all(self):
'''
@@ -918,49 +934,15 @@ class Upload(Proc):
upload_size=self.upload_files.size):
# Tell Upload that a process has been started.
self.current_process = "process"
self.process_status = PROCESS_CALLED
self.save()
self.processing_started()
# Start running all pipelines
n_pipelines = run_pipelines(self.match_mainfiles())
n_pipelines = run_pipelines(self.match_mainfiles(), self.upload_id)
# If the upload has not spawned any pipelines, tell it that it is
# finished and perform cleanup
if n_pipelines == 0:
self.process_status = PROCESS_COMPLETED
self.save()
self.cleanup()
def check_join(self):
'''
Performs an evaluation of the join condition and triggers the :func:`cleanup`
task if necessary. The join condition allows to run the ``cleanup`` after
all calculations have been processed. The upload processing stops after all
calculation processings have been triggered (:func:`parse_all` or
:func:`re_process_upload`). The cleanup task is then run within the last
calculation process (the one that triggered the join by calling this method).
'''
total_calcs = self.total_calcs
processed_calcs = self.processed_calcs
logger = self.get_logger()
logger.warning("Checking join: {}/{}".format(processed_calcs, total_calcs))
logger.warning('check join', processed_calcs=processed_calcs, total_calcs=total_calcs)
# check if process is not running anymore, i.e. not still spawning new processes to join
# check the join condition, i.e. all calcs have been processed
if not self.process_running and processed_calcs >= total_calcs:
# this can easily be called multiple times, e.g. upload finished after all calcs finished
modified_upload = self._get_collection().find_one_and_update(
{'_id': self.upload_id, 'joined': {'$ne': True}},
{'$set': {'joined': True}})
if modified_upload is not None:
logger.debug('join')
self.cleanup()
else:
# the join was already done due to a prior call
pass
self.processing_finished()
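For context, the join removed above relied on MongoDB's atomic find_one_and_update as an idempotency guard. A minimal pymongo sketch of that pattern (client, database, and id are placeholders):

from pymongo import MongoClient

coll = MongoClient().db.upload  # hypothetical collection
# Only the first caller flips 'joined' and gets the old document back;
# concurrent or repeated calls see None and skip the cleanup.
doc = coll.find_one_and_update(
    {'_id': 'some-upload-id', 'joined': {'$ne': True}},
    {'$set': {'joined': True}})
if doc is not None:
    pass  # run cleanup exactly once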
def reset(self):
self.joined = False
@@ -149,11 +149,6 @@ class Pipeline():
Args:
stage: The stage to be added to this pipeline.
"""
if len(self.stages) == 0:
if len(stage.dependencies) != 0:
raise ValueError(
"The first stage in a pipeline must not have any dependencies."
)
stage._pipeline = self
stage.index = len(self.stages)
self.stages.append(stage)
@@ -192,35 +187,36 @@ def wrapper(task, function_name, context, stage_name, i_stage, n_stages):
if deleted is None or not deleted:
calc.save()
# For last stage, inform upload that we are finished.
if i_stage == n_stages - 1:
# The save might be necessary to correctly read the join condition from the db
calc.save()
# Inform upload that we are finished
upload = calc.upload
upload.process_status = PROCESS_COMPLETED
upload.save()
upload.reload()
upload.check_join()
@app.task(
bind=True, base=NomadCeleryTask, ignore_results=False, max_retries=3,
acks_late=config.celery.acks_late, soft_time_limit=config.celery.timeout,
time_limit=config.celery.timeout)
def empty_task(task, *args, **kwargs):
"""Empty dummy task.
"""Empty dummy task used as a callback for Celery chords.
"""
pass
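The workaround this callback enables is easiest to see in isolation. A minimal sketch of chaining groups via chords (task_a, task_b, and task_c are illustrative tasks, not part of this diff):

from celery import chain, chord, group

# A bare group has no single terminal task, so two groups cannot be chained
# directly; wrapping each group in a chord with a no-op body makes the
# stages chainable.
stage_1 = chord(group(task_a.si(), task_b.si()), body=empty_task.si())
stage_2 = chord(group(task_c.si()), body=empty_task.si())
chain(stage_1, stage_2).delay()  # stage_2 starts only after stage_1 finishes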
@app.task(
bind=True, base=NomadCeleryTask, ignore_results=False, max_retries=3,
acks_late=config.celery.acks_late, soft_time_limit=config.celery.timeout,
time_limit=config.celery.timeout)
def upload_cleanup(task, upload_id):
"""Task for cleanin up after processing.
"""
upload = nomad.processing.data.Upload.get(upload_id)
upload.processing_finished()
upload.get_logger().info("Cleanup for upload {} finished.".format(upload_id))
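Note the immutable signature used when this task is chained below. A short sketch of the distinction (the id is a placeholder):

# upload_cleanup.s(upload_id) would prepend the previous task's result to the
# arguments; .si(upload_id) ignores that result, which is what a cleanup step
# chained after the stage chords wants.
signature = upload_cleanup.si("some-upload-id")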
def comp_process(context, stage_name, i_stage, n_stages):
"""Function for processing computational entries: runs parsing and normalization.
"""
# Process calculation
calc = nomad.processing.data.Calc.get(context.calc_id)
calc.process_calc()
calc.get_logger().info("Processing of calculation {} at path {} finished.".format(context.filepath, context.calc_id))
def get_pipeline(context):
@@ -248,7 +244,7 @@ def get_pipeline(context):
return pipeline
def run_pipelines(context_generator) -> int:
def run_pipelines(context_generator, upload_id) -> int:
"""Used to start running pipelines based on the PipelineContext objects
generated by the given generator.
@@ -274,7 +270,8 @@ def run_pipelines(context_generator, upload_id) -> int:
stage_dependencies.append((stage.name, dependency))
stages[stage.name].append(stage)
# Start running first stage: it does not have any dependencies
# Start eagerly running the first stage if it does not have any
# dependencies.
if i_stage == 0:
# Create the associated Calc object
@@ -293,6 +290,8 @@ def run_pipelines(context_generator, upload_id) -> int:
dependency_graph.add_edges_from(stage_dependencies)
dependency_trees = nx.weakly_connected_components(dependency_graph)
upload = nomad.processing.data.Upload.get(upload_id)
# Form chains for each independent tree.
chains = []
for tree_nodes in dependency_trees:
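Because edges were added as (stage, dependency), dependencies come last in topological order, and the loop here reverses it so they execute first. A minimal sketch with illustrative stage names:

import networkx as nx

g = nx.DiGraph()
g.add_edge("normalize", "parse")  # edge points from a stage to its dependency
for tree_nodes in nx.weakly_connected_components(g):
    tree = g.subgraph(tree_nodes)
    order = list(reversed(list(nx.topological_sort(tree))))
    # order == ["parse", "normalize"]: dependencies execute first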
@@ -300,6 +299,7 @@ def run_pipelines(context_generator, upload_id) -> int:
sorted_nodes = nx.topological_sort(tree)
groups = []
for node in reversed(list(sorted_nodes)):
upload.get_logger().warning(node)
# Group all tasks for a stage
tasks = stages[node]
@@ -308,17 +308,21 @@ def run_pipelines(context_generator, upload_id) -> int:
task_signatures.append(task.signature())
task_group = group(*task_signatures)
# Celery does not allow chaining groups. To simulate this
# behaviour, we instead wrap the groups inside chords which can be
# chained. The callback is a dummy function that does nothing.
# Celery does not allow directly chaining groups. To simulate
# this behaviour, we instead wrap the groups inside chords
# which can be chained. The callback is a dummy function that
# does nothing.
groups.append(chord(task_group, body=empty_task.si()))
# Form a chain of stages for this tree
stage_chain = chain(*groups)
chains.append(stage_chain)
# This is the final group that will start executing all independent stage trees in parallel.
final_group = group(*chains)
final_group.delay()
# This is the group that will start executing all independent stage trees in parallel.
tree_group = group(*chains)
# After all trees are finished, the upload should perform cleanup
final_chain = chain(tree_group, upload_cleanup.si(upload_id))
final_chain.delay()
return n_pipelines
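The final chain works because Celery upgrades a group that is chained with a following task into a chord. A minimal sketch of the resulting shape (tree_chain_a, tree_chain_b, and the id are illustrative):

from celery import chain, group

# chain(group(...), callback) is executed as chord(group(...), callback), so
# upload_cleanup runs exactly once, after every tree chain has finished.
workflow = chain(group(tree_chain_a, tree_chain_b),
                 upload_cleanup.si("some-upload-id"))
workflow.delay()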