From b8f6f07685f4a857c5bcb83be8d236fb05de0150 Mon Sep 17 00:00:00 2001 From: Markus Scheidgen <markus.scheidgen@gmail.com> Date: Thu, 14 Feb 2019 21:51:52 +0100 Subject: [PATCH] Allow base celery implementation to retry after segfault, detect retry and handle accordingly. Added segfault to chaos parser. --- .gitmodules | 2 +- nomad/parsing/artificial.py | 11 +++++++++-- nomad/processing/base.py | 14 +++++++++++++- test.py | 26 ++++++++++++++++++++++++++ tests/data/proc/chaos_exception.zip | Bin 0 -> 182 bytes tests/data/proc/chaos_segfault.zip | Bin 0 -> 670 bytes 6 files changed, 49 insertions(+), 4 deletions(-) create mode 100644 test.py create mode 100644 tests/data/proc/chaos_exception.zip create mode 100644 tests/data/proc/chaos_segfault.zip diff --git a/.gitmodules b/.gitmodules index 21c0a2b50f..9b83914f45 100644 --- a/.gitmodules +++ b/.gitmodules @@ -42,7 +42,7 @@ path = dependencies/parsers/wien2k url = https://gitlab.mpcdf.mpg.de/nomad-lab/parser-wien2k branch = nomad-fair -[submodule "dependencies/parsers/parser-band"] +[submodule "dependencies/parsers/band"] path = dependencies/parsers/band url = https://gitlab.mpcdf.mpg.de/nomad-lab/parser-band [submodule "dependencies/parsers/gaussian"] diff --git a/nomad/parsing/artificial.py b/nomad/parsing/artificial.py index a721f4d1c7..836debe1af 100644 --- a/nomad/parsing/artificial.py +++ b/nomad/parsing/artificial.py @@ -24,6 +24,8 @@ from ase.data import chemical_symbols import numpy import sys import time +import os +import signal from nomadcore.local_meta_info import loadJsonFile, InfoKindEl import nomad_meta_info @@ -120,6 +122,7 @@ class ChaosParser(ArtificalParser): - deadlock - consume_ram - exception + - segfault - random """ name = 'parsers/chaos' @@ -139,7 +142,7 @@ class ChaosParser(ArtificalParser): chaos = None if chaos == 'random': - chaos = random.choice(['exit', 'deadlock', 'consume_ram', 'exception']) + chaos = random.choice(['exit', 'deadlock', 'consume_ram', 'exception', 'segfault']) if chaos == 'exit': sys.exit(1) @@ -147,9 +150,13 @@ class ChaosParser(ArtificalParser): while True: time.sleep(1) elif chaos == 'consume_ram': - pass + data = [] + while True: + data.append('a' * 10**6) elif chaos == 'exception': raise Exception('Some chaos happened, muhuha...') + elif chaos == 'segfault': + os.kill(os.getpid(), signal.SIGSEGV) raise Exception('Unknown chaos') diff --git a/nomad/processing/base.py b/nomad/processing/base.py index 9d24d9fb0f..3266ad04c7 100644 --- a/nomad/processing/base.py +++ b/nomad/processing/base.py @@ -46,6 +46,7 @@ def setup(**kwargs): app = Celery('nomad.processing', broker=config.celery.broker_url) app.conf.update(worker_hijack_root_logger=False) +app.conf.update(task_reject_on_worker_lost=True) CREATED = 'CREATED' PENDING = 'PENDING' @@ -129,6 +130,8 @@ class Proc(Document, metaclass=ProcMetaclass): current_process = StringField(default=None) process_status = StringField(default=None) + _celery_task_id = StringField(default=None) + @property def tasks_running(self) -> bool: """ Returns True of the process has failed or succeeded. """ @@ -415,7 +418,7 @@ all_proc_cls = {cls.__name__: cls for cls in all_subclasses(Proc)} """ Name dictionary for all Proc classes. """ -@app.task(bind=True, ignore_results=True, max_retries=3) +@app.task(bind=True, ignore_results=True, max_retries=3, acks_late=True) def proc_task(task, cls_name, self_id, func_attr): """ The celery task that is used to execute async process functions. @@ -469,6 +472,15 @@ def proc_task(task, cls_name, self_id, func_attr): self.save() return + # check requeued task after catastrophic failure, e.g. segfault + if self._celery_task_id is not None: + if self._celery_task_id == task.request.id and task.request.retries == 0: + self.fail('task failed catastrophically, probably with sys.exit or segfault') + + if self._celery_task_id != task.request.id: + self._celery_task_id = task.request.id + self.save() + # call the process function deleted = False try: diff --git a/test.py b/test.py new file mode 100644 index 0000000000..f843068598 --- /dev/null +++ b/test.py @@ -0,0 +1,26 @@ + +import multiprocessing +import segfault +import signal +import time + +if __name__ == '__main__': + + def this_dies(): + def sig_handler(signum, frame): + print('segfault') + + signal.signal(signal.SIGSEGV, sig_handler) + + print('Hello World') + time.sleep(1) + segfault.segfault() + + p = multiprocessing.Process(target=this_dies()) + p.start() + try: + p.join() + except Exception as e: + pass + + print('I am joined') diff --git a/tests/data/proc/chaos_exception.zip b/tests/data/proc/chaos_exception.zip new file mode 100644 index 0000000000000000000000000000000000000000..10b7e2cebac4e366115cdaf1c7ec514e16e84784 GIT binary patch literal 182 zcmWIWW@h1H0D;+MPJW8sTP=7P7#Kj9i$R7VIU_N@STCzMKQA<dlYx2bg3y?43qoT` zE4UdLS-vtdFtCU)Ffb^kRwSnulw{`TDRBjOGcw6B<1$eKWDx@+0|Ud7Mi3L(d{&70 XXl4g^v$BCyGBPkS1TZjwF#`htU8*7I literal 0 HcmV?d00001 diff --git a/tests/data/proc/chaos_segfault.zip b/tests/data/proc/chaos_segfault.zip new file mode 100644 index 0000000000000000000000000000000000000000..fbc10614d6366106e498b485e105bbabafdefede GIT binary patch literal 670 zcmWIWW@Zs#;9%fjSiRKGj{ymAF$gduXC&qq>tz+^=Y>Y_FoYaUjR`uM8uOJ=gdteV z&)45u=d5PnBPNCbZ+4D-mJZUq3=9n13=9k)L%G04twS@ElR<zXKHk^S**`c!A7p04 z(bO0a2AK)chTScq$lCPcaXTwHK{Po%At~X5uTR(q{;-aK2BrxT%<2Ntj32`l*gi>K zmF!^ZQ&e;m5Q{i2sNm-oe27I<JRxC%*SYgoy>&EBcwRm2siUW<>*wp~spA>S$I-TG zFVhi+Z5AG}UV(LvL3OQhNuHj8X+fEuS%v|V-ezR+cs%^>`YTFdSJJsx2Kr}%f5|Ps zR{mqY-Q&t@%Rm3|tlw|kIJ+sVW$vf1mgSFlikH9IqN&uo*3s#nQ?HG8X-o0Xv)k=o zyxbMCDX#nfTj}1{YOm){adFq*TdLjluw#yoAUJU2{uWkWW?*1YX9NW<Ba<jIBAk!| zo{ND89PA7X3=T^gK}_VJ;(*6sfH%ktTv3ED<F8{V0|Rn|aWH@)3xpv?Ae@2hL{SuH e^dK9dhZ1?nE@EW^xtE!Nnc*4(1H&dp5Dx&L1HFC# literal 0 HcmV?d00001 -- GitLab