Commit b8f6f076 authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Allow base celery implementation to retry after segfault, detect retry and...

Allow base celery implementation to retry after segfault, detect retry and handle accordingly. Added segfault to chaos parser.
parent 0f5e4eb1
Pipeline #43734 passed with stages
in 21 minutes and 36 seconds
......@@ -42,7 +42,7 @@
path = dependencies/parsers/wien2k
url = https://gitlab.mpcdf.mpg.de/nomad-lab/parser-wien2k
branch = nomad-fair
[submodule "dependencies/parsers/parser-band"]
[submodule "dependencies/parsers/band"]
path = dependencies/parsers/band
url = https://gitlab.mpcdf.mpg.de/nomad-lab/parser-band
[submodule "dependencies/parsers/gaussian"]
......
......@@ -24,6 +24,8 @@ from ase.data import chemical_symbols
import numpy
import sys
import time
import os
import signal
from nomadcore.local_meta_info import loadJsonFile, InfoKindEl
import nomad_meta_info
......@@ -120,6 +122,7 @@ class ChaosParser(ArtificalParser):
- deadlock
- consume_ram
- exception
- segfault
- random
"""
name = 'parsers/chaos'
......@@ -139,7 +142,7 @@ class ChaosParser(ArtificalParser):
chaos = None
if chaos == 'random':
chaos = random.choice(['exit', 'deadlock', 'consume_ram', 'exception'])
chaos = random.choice(['exit', 'deadlock', 'consume_ram', 'exception', 'segfault'])
if chaos == 'exit':
sys.exit(1)
......@@ -147,9 +150,13 @@ class ChaosParser(ArtificalParser):
while True:
time.sleep(1)
elif chaos == 'consume_ram':
pass
data = []
while True:
data.append('a' * 10**6)
elif chaos == 'exception':
raise Exception('Some chaos happened, muhuha...')
elif chaos == 'segfault':
os.kill(os.getpid(), signal.SIGSEGV)
raise Exception('Unknown chaos')
......
......@@ -46,6 +46,7 @@ def setup(**kwargs):
app = Celery('nomad.processing', broker=config.celery.broker_url)
app.conf.update(worker_hijack_root_logger=False)
app.conf.update(task_reject_on_worker_lost=True)
CREATED = 'CREATED'
PENDING = 'PENDING'
......@@ -129,6 +130,8 @@ class Proc(Document, metaclass=ProcMetaclass):
current_process = StringField(default=None)
process_status = StringField(default=None)
_celery_task_id = StringField(default=None)
@property
def tasks_running(self) -> bool:
""" Returns True of the process has failed or succeeded. """
......@@ -415,7 +418,7 @@ all_proc_cls = {cls.__name__: cls for cls in all_subclasses(Proc)}
""" Name dictionary for all Proc classes. """
@app.task(bind=True, ignore_results=True, max_retries=3)
@app.task(bind=True, ignore_results=True, max_retries=3, acks_late=True)
def proc_task(task, cls_name, self_id, func_attr):
"""
The celery task that is used to execute async process functions.
......@@ -469,6 +472,15 @@ def proc_task(task, cls_name, self_id, func_attr):
self.save()
return
# check requeued task after catastrophic failure, e.g. segfault
if self._celery_task_id is not None:
if self._celery_task_id == task.request.id and task.request.retries == 0:
self.fail('task failed catastrophically, probably with sys.exit or segfault')
if self._celery_task_id != task.request.id:
self._celery_task_id = task.request.id
self.save()
# call the process function
deleted = False
try:
......
import multiprocessing
import segfault
import signal
import time
if __name__ == '__main__':
def this_dies():
def sig_handler(signum, frame):
print('segfault')
signal.signal(signal.SIGSEGV, sig_handler)
print('Hello World')
time.sleep(1)
segfault.segfault()
p = multiprocessing.Process(target=this_dies())
p.start()
try:
p.join()
except Exception as e:
pass
print('I am joined')
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment