diff --git a/.gitmodules b/.gitmodules index 21c0a2b50fbc1d5781e05ef15cc6b334ed8513fa..9b83914f45b7e72deeac1df12158375dc2c6a093 100644 --- a/.gitmodules +++ b/.gitmodules @@ -42,7 +42,7 @@ path = dependencies/parsers/wien2k url = https://gitlab.mpcdf.mpg.de/nomad-lab/parser-wien2k branch = nomad-fair -[submodule "dependencies/parsers/parser-band"] +[submodule "dependencies/parsers/band"] path = dependencies/parsers/band url = https://gitlab.mpcdf.mpg.de/nomad-lab/parser-band [submodule "dependencies/parsers/gaussian"] diff --git a/nomad/parsing/artificial.py b/nomad/parsing/artificial.py index a721f4d1c7bb44129d15daf14bb7c927e9ff990c..836debe1af57f1cffcdae0c13b870e8dc6b3314a 100644 --- a/nomad/parsing/artificial.py +++ b/nomad/parsing/artificial.py @@ -24,6 +24,8 @@ from ase.data import chemical_symbols import numpy import sys import time +import os +import signal from nomadcore.local_meta_info import loadJsonFile, InfoKindEl import nomad_meta_info @@ -120,6 +122,7 @@ class ChaosParser(ArtificalParser): - deadlock - consume_ram - exception + - segfault - random """ name = 'parsers/chaos' @@ -139,7 +142,7 @@ class ChaosParser(ArtificalParser): chaos = None if chaos == 'random': - chaos = random.choice(['exit', 'deadlock', 'consume_ram', 'exception']) + chaos = random.choice(['exit', 'deadlock', 'consume_ram', 'exception', 'segfault']) if chaos == 'exit': sys.exit(1) @@ -147,9 +150,13 @@ class ChaosParser(ArtificalParser): while True: time.sleep(1) elif chaos == 'consume_ram': - pass + data = [] + while True: + data.append('a' * 10**6) elif chaos == 'exception': raise Exception('Some chaos happened, muhuha...') + elif chaos == 'segfault': + os.kill(os.getpid(), signal.SIGSEGV) raise Exception('Unknown chaos') diff --git a/nomad/processing/base.py b/nomad/processing/base.py index 9d24d9fb0f216e22afd78d61b7c75ac5d56bc992..3266ad04c7121966c52930672625d601c28271e9 100644 --- a/nomad/processing/base.py +++ b/nomad/processing/base.py @@ -46,6 +46,7 @@ def setup(**kwargs): app = Celery('nomad.processing', broker=config.celery.broker_url) app.conf.update(worker_hijack_root_logger=False) +app.conf.update(task_reject_on_worker_lost=True) CREATED = 'CREATED' PENDING = 'PENDING' @@ -129,6 +130,8 @@ class Proc(Document, metaclass=ProcMetaclass): current_process = StringField(default=None) process_status = StringField(default=None) + _celery_task_id = StringField(default=None) + @property def tasks_running(self) -> bool: """ Returns True of the process has failed or succeeded. """ @@ -415,7 +418,7 @@ all_proc_cls = {cls.__name__: cls for cls in all_subclasses(Proc)} """ Name dictionary for all Proc classes. """ -@app.task(bind=True, ignore_results=True, max_retries=3) +@app.task(bind=True, ignore_results=True, max_retries=3, acks_late=True) def proc_task(task, cls_name, self_id, func_attr): """ The celery task that is used to execute async process functions. @@ -469,6 +472,15 @@ def proc_task(task, cls_name, self_id, func_attr): self.save() return + # check requeued task after catastrophic failure, e.g. segfault + if self._celery_task_id is not None: + if self._celery_task_id == task.request.id and task.request.retries == 0: + self.fail('task failed catastrophically, probably with sys.exit or segfault') + + if self._celery_task_id != task.request.id: + self._celery_task_id = task.request.id + self.save() + # call the process function deleted = False try: diff --git a/test.py b/test.py new file mode 100644 index 0000000000000000000000000000000000000000..f843068598b2efaf03ca0d3fc6eb45f364d8cdf2 --- /dev/null +++ b/test.py @@ -0,0 +1,26 @@ + +import multiprocessing +import segfault +import signal +import time + +if __name__ == '__main__': + + def this_dies(): + def sig_handler(signum, frame): + print('segfault') + + signal.signal(signal.SIGSEGV, sig_handler) + + print('Hello World') + time.sleep(1) + segfault.segfault() + + p = multiprocessing.Process(target=this_dies()) + p.start() + try: + p.join() + except Exception as e: + pass + + print('I am joined') diff --git a/tests/data/proc/chaos_exception.zip b/tests/data/proc/chaos_exception.zip new file mode 100644 index 0000000000000000000000000000000000000000..10b7e2cebac4e366115cdaf1c7ec514e16e84784 Binary files /dev/null and b/tests/data/proc/chaos_exception.zip differ diff --git a/tests/data/proc/chaos_segfault.zip b/tests/data/proc/chaos_segfault.zip new file mode 100644 index 0000000000000000000000000000000000000000..fbc10614d6366106e498b485e105bbabafdefede Binary files /dev/null and b/tests/data/proc/chaos_segfault.zip differ