Commit 1070d8d6 authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Fixed multiple cleanup calls.

parent 45cbbaf8
......@@ -275,7 +275,7 @@ class Proc(Document, metaclass=ProcMetaclass):
# task is repeated, probably the celery task of the process was reschedule
# due to prior worker failure
self.current_task = task
self.get_logger().warning('task is re-run')
self.get_logger().error('task is re-run')
self.save()
return True
else:
......
......@@ -424,6 +424,8 @@ class Upload(Proc):
publish_time = DateTimeField()
last_update = DateTimeField()
joined = BooleanField(default=False)
meta: Any = {
'indexes': [
'user_id', 'tasks_status', 'process_status', 'published', 'upload_time'
......@@ -802,12 +804,23 @@ class Upload(Proc):
processed_calcs = self.processed_calcs
self.get_logger().debug('check join', processed_calcs=processed_calcs, total_calcs=total_calcs)
# check if process is not running anymore, i.e. not still spawining new processes to join
# check the join condition, i.e. all calcs have been processed
if not self.process_running and processed_calcs >= total_calcs:
self.get_logger().debug('join')
self.join()
# this can easily be called multiple times, e.g. upload finished after all calcs finished
modified_upload = self._get_collection().update(
dict(_id=self.upload_id, joined=False),
{'$set': {'joined': True}}, upsert=False)
if modified_upload is not None:
self.get_logger().debug('join')
self.cleanup()
else:
# the join was already done due to a prior call
pass
def join(self):
self.cleanup()
def reset(self):
self.joined = False
super().reset()
@property
def gui_url(self):
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment