base.py 23.5 KB
Newer Older
Markus Scheidgen's avatar
Markus Scheidgen committed
1
2
3
4
#
# Copyright The NOMAD Authors.
#
# This file is part of NOMAD. See https://nomad-lab.eu for further info.
5
6
7
8
9
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
Markus Scheidgen's avatar
Markus Scheidgen committed
10
#     http://www.apache.org/licenses/LICENSE-2.0
11
12
#
# Unless required by applicable law or agreed to in writing, software
Markus Scheidgen's avatar
Markus Scheidgen committed
13
# distributed under the License is distributed on an "AS IS" BASIS,
14
15
16
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Markus Scheidgen's avatar
Markus Scheidgen committed
17
#
18

19
from typing import List, Any, Dict
20
import logging
21
import time
22
import os
23
24
from celery import Celery, Task
from celery.worker.request import Request
25
from celery.signals import after_setup_task_logger, after_setup_logger, worker_process_init, \
26
    celeryd_after_setup, worker_process_shutdown
27
from celery.utils import worker_direct
28
from celery.exceptions import SoftTimeLimitExceeded
29
30
from billiard.exceptions import WorkerLostError
from mongoengine import Document, StringField, ListField, DateTimeField, ValidationError
31
from mongoengine.connection import ConnectionFailure
32
from datetime import datetime
Markus Scheidgen's avatar
Markus Scheidgen committed
33
import functools
34

35
from nomad import config, utils, infrastructure
36
37
import nomad.patch  # pylint: disable=unused-import

38

39
if config.logstash.enabled:
    from nomad.utils import structlogging

    def initialize_logstash(logger=None, loglevel=logging.DEBUG, **kwargs):
        '''
        Handler for the celery logger setup signals. Adds the logstash handler to the
        given logger and returns the logger. ``loglevel`` and any further signal
        arguments are accepted but not used.
        '''
        structlogging.add_logstash_handler(logger)
        return logger

    # Hook into the setup of both the task logger and the general celery logger.
    after_setup_task_logger.connect(initialize_logstash)
    after_setup_logger.connect(initialize_logstash)

49
50
51

@worker_process_init.connect
def setup(**kwargs):
    '''
    Signal handler for ``worker_process_init``. Disconnects mongoengine, runs the
    nomad infrastructure setup, and logs the configured ``acks_late`` value.
    '''
    # each subprocess is supposed to disconnect and connect again:
    # https://jira.mongodb.org/browse/PYTHON-2090
    try:
        from mongoengine import disconnect
        disconnect()
    except Exception:
        # Deliberately best effort: a failing disconnect must not prevent the setup below.
        pass

    infrastructure.setup()
    utils.get_logger(__name__).info(
        'celery configured with acks_late=%s' % str(config.celery.acks_late))
62

63

64
65
66
67
68
69
70
71
72
# Set by capture_worker_name once the celeryd_after_setup signal fires; None before that.
worker_hostname = None


@celeryd_after_setup.connect
def capture_worker_name(sender, instance, **kwargs):
    ''' Stores the sender of the ``celeryd_after_setup`` signal in the module-level ``worker_hostname``. '''
    global worker_hostname
    worker_hostname = sender


73
74
75
76
77
78
79
@worker_process_shutdown.connect
def on_worker_process_shutdown(*args, **kwargs):
    ''' Signal handler for ``worker_process_shutdown`` that disconnects mongoengine. '''
    # We need to make sure not to leave open sessions: https://jira.mongodb.org/browse/PYTHON-2090
    from mongoengine import connection as mongo_connection
    mongo_connection.disconnect()


80
# The celery application used for all processing tasks; the broker is rabbitmq.
app = Celery('nomad.processing', broker=config.rabbitmq_url())
app.conf.update(worker_hijack_root_logger=False)
app.conf.update(worker_max_memory_per_child=config.celery.max_memory)
if config.celery.routing == config.CELERY_WORKER_ROUTING:
    # Needed for Proc._run_process, which routes tasks to worker_direct queues.
    app.conf.update(worker_direct=True)

app.conf.task_queue_max_priority = 10
87

88

89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
class ProcessStatus:
    '''
    Class holding constants related to the possible process statuses.

    Attributes:
        READY: The process is ready to start
        PENDING: The process has been called, but still waiting for a celery worker to start running.
        RUNNING: Currently running the main process function.
        WAITING_FOR_RESULT: Waiting for the result from some other process.
        SUCCESS: The last process completed successfully.
        FAILURE: The last process completed with a fatal failure.
        DELETED: Used to signal that the process results in the deletion of the object.

        STATUSES_PROCESSING: List of statuses where the process is still incomplete (no other
            process can be started).
        STATUSES_NOT_PROCESSING: The opposite of the above - statuses from which a new
            process can be started.
        STATUSES_VALID_IN_DB: STATUSES_NOT_PROCESSING followed by STATUSES_PROCESSING
            (DELETED is not part of it).
    '''
    READY = 'READY'
    PENDING = 'PENDING'
    RUNNING = 'RUNNING'
    WAITING_FOR_RESULT = 'WAITING_FOR_RESULT'
    SUCCESS = 'SUCCESS'
    FAILURE = 'FAILURE'
    DELETED = 'DELETED'

    STATUSES_PROCESSING = (PENDING, RUNNING, WAITING_FOR_RESULT)
    STATUSES_NOT_PROCESSING = (READY, SUCCESS, FAILURE)
    STATUSES_VALID_IN_DB = STATUSES_NOT_PROCESSING + STATUSES_PROCESSING
118

119

120
121
122
class InvalidId(Exception):
    ''' Raised by :func:`Proc.get_by_id` when the given id does not pass validation. '''


Markus Scheidgen's avatar
Markus Scheidgen committed
123
class ProcNotRegistered(Exception):
    ''' Raised when a task refers to a class name that is not a known subclass of Proc. '''
124
125


126
127
128
class ProcessAlreadyRunning(Exception):
    ''' Raised when a process function is called on an object that is already processing. '''


129
130
131
class ProcObjectDoesNotExist(Exception):
    ''' Raised when the object that a task should be executed on cannot be found. '''


132
class ProcessFailure(Exception):
    '''
    Special exception class which allows the user to control how :func:`Proc.fail` should
    be called when the exception is caught from the process function.

    Arguments:
        errors: The errors (strings or exceptions) to pass on to :func:`Proc.fail`.
        log_level: The log level to pass on to :func:`Proc.fail`, default is ERROR.
        kwargs: Additional keyword arguments to pass on to :func:`Proc.fail`.
    '''
    def __init__(self, *errors, log_level=logging.ERROR, **kwargs):
        self._errors = errors
        self._log_level = log_level
        self._kwargs = kwargs
141
142


143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
class Proc(Document):
    '''
    Base class for objects that are subject to processing and need persistent processing
    state. The processing state is persisted in mongo db. Possible processing statuses are
    defined by :class:`ProcessStatus`.

    To initiate a process, an object subclassing Proc must first be created using
    :func:`create`. Processes are then initiated by calling a *process function* on this
    object, which is a member function marked with the decorator @process. Calling a process
    function sets the process_state to PENDING and a celery task is created, which will be
    picked up by a worker, which sets the state to RUNNING and actually executes the
    process function.

    From process_status RUNNING, the process can transition to either SUCCESS, FAILURE or
    WAITING_FOR_RESULT, or result in the deletion of the process object itself (for example
    the process for deleting an upload).

    WAITING_FOR_RESULT means the process needs to wait for the result from other processes.
    To send the process to status WAITING_FOR_RESULT, the process function must return
    normally, without errors and exceptions, and return the value `ProcessStatus.WAITING_FOR_RESULT`.
    The process should then be made to transition to either SUCCESS or FAILURE, by invoking
    either :func:`succeed` or :func:`fail`, when appropriate.

    If the process deletes the object itself, the process function should instead return
    `ProcessStatus.DELETED`. If the process function returns normally and without a return
    value, the process status will be set to SUCCESS.

    Attributes:
        errors: a list of errors that happened during processing. Errors fail a processing
            run
        warnings: a list of warnings that happened during processing. Warnings do not
            fail a processing run
        last_status_message: A short, human readable message from the current process, with
            information about what the current process is doing, or information about the
            completion (successful or not) of the last process, if no process is currently
            running.
        complete_time: the time that processing completed (successfully or not)
        current_process: the currently or last run asynchronous process
        process_status: one of the values defined by :class:`ProcessStatus`.
        worker_hostname: the value of the module-level ``worker_hostname`` at the time a
            worker picked up the last process
        celery_task_id: the id of the celery task request that ran the last process
    '''

    meta: Any = {
        'abstract': True,
    }

    complete_time = DateTimeField()

    errors = ListField(StringField())
    warnings = ListField(StringField())
    last_status_message = StringField(default=None)

    current_process = StringField(default=None)
    process_status = StringField(default=None)

    worker_hostname = StringField(default=None)
    celery_task_id = StringField(default=None)

    @property
    def process_running(self) -> bool:
        ''' Returns True if an asynchronous process is currently running (or waiting to run). '''
        return self.process_status in ProcessStatus.STATUSES_PROCESSING

    @classmethod
    def process_running_mongoengine_query(cls):
        ''' Returns a mongoengine query dict (to be used in objects) to find running processes. '''
        return dict(process_status__in=ProcessStatus.STATUSES_PROCESSING)

    def get_logger(self, **kwargs):
        '''
        Returns a logger bound to the class name, current process and process status of
        this object. Additional keyword arguments are passed on to the logger as well;
        :func:`fail` and :func:`warning` rely on this when called with keyword arguments.
        '''
        logger_data = dict(
            proc=self.__class__.__name__,
            process=self.current_process, process_status=self.process_status)
        # Merge instead of passing both, to avoid duplicate keyword errors.
        logger_data.update(kwargs)
        return utils.get_logger('nomad.processing', **logger_data)

    @classmethod
    def create(cls, **kwargs):
        ''' Factory method that must be used instead of regular constructor. '''
        assert 'process_status' not in kwargs, \
            ''' do not set the status manually, its managed '''

        self = cls(**kwargs)
        self.process_status = ProcessStatus.READY
        self.save()

        return self

    def reset(
            self, worker_hostname: str = None, force: bool = False,
            process_status: str = ProcessStatus.READY):
        '''
        Resets the process status. If force is not set, there must be no currently running
        process. The object is only modified in memory, it is not saved.
        '''
        assert not self.process_running or force

        self.current_process = None
        self.process_status = process_status
        self.errors = []
        self.warnings = []
        self.worker_hostname = worker_hostname

    @classmethod
    def reset_pymongo_update(cls, worker_hostname: str = None):
        ''' Returns a pymongo update dict part to reset calculations. '''
        return dict(
            current_process=None, process_status=ProcessStatus.READY,
            errors=[], warnings=[], worker_hostname=worker_hostname)

    @classmethod
    def get_by_id(cls, id: str, id_field: str):
        '''
        Loads the object where the field `id_field` has the value `id`.

        Raises:
            InvalidId: If the query fails validation.
            KeyError: If no such object exists.
        '''
        try:
            obj = cls.objects(**{id_field: id}).first()
        except ValidationError as e:
            raise InvalidId('%s is not a valid id' % id) from e
        except ConnectionFailure:
            # Connection problems are passed on to the caller unchanged.
            raise

        if obj is None:
            raise KeyError('%s with id %s does not exist' % (cls.__name__, id))

        return obj

    @classmethod
    def get(cls, obj_id):
        ''' Loads the object with the given id, see :func:`get_by_id`. '''
        return cls.get_by_id(str(obj_id), 'id')

    @staticmethod
    def log(logger, log_level, msg, **kwargs):
        '''
        Logs the given message with the logger method that matches the given log level.
        Any log level other than ERROR, WARNING, INFO, or DEBUG is logged as critical.
        '''
        # TODO there seems to be a bug in structlog, cannot use logger.log
        if log_level == logging.ERROR:
            logger.error(msg, **kwargs)
        elif log_level == logging.WARNING:
            logger.warning(msg, **kwargs)
        elif log_level == logging.INFO:
            logger.info(msg, **kwargs)
        elif log_level == logging.DEBUG:
            logger.debug(msg, **kwargs)
        else:
            logger.critical(msg, **kwargs)

    def succeed(self):
        ''' Call this to transition a process from WAITING_FOR_RESULT to SUCCESS. '''
        assert self.process_status == ProcessStatus.WAITING_FOR_RESULT, f'Wrong status {self.process_status}.'
        self.process_status = ProcessStatus.SUCCESS
        self.on_success()
        self.complete_time = datetime.utcnow()
        self.save()
        self.get_logger().info('completed process')

    def fail(self, *errors, log_level=logging.ERROR, **kwargs):
        '''
        Allows to fail the process. Takes strings or exceptions as args. The method
        logs the error(s), updates `self.errors`, `self.last_status_message`,
        `self.process_status`, calls :func:`on_fail`, and saves.

        Arguments:
            errors: The errors, either strings or exceptions.
            log_level: The log level used to log the errors, default is ERROR.
            kwargs: Additional data, passed on to :func:`get_logger`.
        '''
        assert self.process_running, 'Cannot fail a completed process.'

        failed_with_exception = False

        self.process_status = ProcessStatus.FAILURE

        # The logger is created after the status change and therefore bound to FAILURE.
        logger = self.get_logger(**kwargs)
        self.errors = []
        for error in errors:
            if isinstance(error, Exception):
                failed_with_exception = True
                self.errors.append('%s: %s' % (error.__class__.__name__, str(error)))
                Proc.log(
                    logger, log_level, 'process failed with exception',
                    exc_info=error, error=str(error))
            else:
                self.errors.append(str(error))

        self.complete_time = datetime.utcnow()

        if not failed_with_exception:
            # Exceptions were logged individually above; log pure string errors once.
            errors_str = "; ".join([str(error) for error in errors])
            Proc.log(logger, log_level, 'process failed', errors=errors_str)

        self.on_fail()

        logger.info('process failed')
        if len(self.errors) > 0:
            self.last_status_message = f'Process {self.current_process} failed: {self.errors[-1]}'

        self.save()

    def warning(self, *warnings, log_level=logging.WARNING, **kwargs):
        '''
        Allows to save warnings. Takes strings or exceptions as args. The warnings are
        logged and added to `self.warnings`; this method itself does not save the object.
        '''
        assert self.process_running

        logger = self.get_logger(**kwargs)

        for warning in warnings:
            warning = str(warning)
            self.warnings.append(warning)
            Proc.log(logger, log_level, 'task with warning', warning=warning)

    def set_last_status_message(self, last_status_message: str):
        ''' Sets the `last_status_message` and saves. '''
        assert self.process_running
        self.last_status_message = last_status_message
        self.save()

    def on_success(self):
        ''' To be called whenever a process transitions to status SUCCESS. '''
        pass

    def on_fail(self):
        ''' To be called whenever a process transitions to status FAILURE. '''
        pass

    def on_waiting_for_result(self):
        ''' To be called whenever a process transitions to status WAITING_FOR_RESULT. '''
        pass

    def block_until_complete(self, interval=0.01):
        '''
        Reloads the process constantly until it sees a completed process (FAILURE or SUCCESS).
        Should be used with care as it can block indefinitely. Just intended for testing
        purposes.
        '''
        self.reload()
        while self.process_running:
            time.sleep(interval)
            self.reload()

    def block_until_complete_or_waiting_for_result(self, interval=0.01):
        '''
        Reloads the process constantly until the process is either complete or in status WAITING_FOR_RESULT.
        Should be used with care as it can block indefinitely. Just intended for testing
        purposes.
        '''
        while self.process_status in (ProcessStatus.PENDING, ProcessStatus.RUNNING):
            time.sleep(interval)
            self.reload()

    @classmethod
    def process_all(
            cls, func, query: Dict[str, Any], exclude: List[str] = None,
            process_args: List[Any] = None, process_kwargs: Dict[str, Any] = None):
        '''
        Allows to run process functions for all objects on the given query. Calling
        process functions though the func:`process` wrapper might be slow, because
        it causes a save on each call. This function will use a query based update to
        do the same for all objects at once.

        Arguments:
            func: The process function to run.
            query: The query that selects the objects to run the process function on.
            exclude: Fields to exclude when loading the objects, default is no fields.
            process_args: Positional arguments for the process function, default is none.
            process_kwargs: Keyword arguments for the process function, default is none.

        Raises:
            ProcessAlreadyRunning: If one of the selected objects is already processing.
        '''
        # Use fresh containers per call instead of shared mutable default arguments.
        exclude = [] if exclude is None else exclude
        process_args = [] if process_args is None else process_args
        process_kwargs = {} if process_kwargs is None else process_kwargs

        running_query = dict(cls.process_running_mongoengine_query())
        running_query.update(query)
        if cls.objects(**running_query).first() is not None:
            raise ProcessAlreadyRunning('Tried to call a processing function on an already processing process.')

        cls._get_collection().update_many(query, {'$set': dict(
            current_process=func.__name__,
            process_status=ProcessStatus.PENDING)})

        for obj in cls.objects(**query).exclude(*exclude):
            obj._run_process(func, process_args, process_kwargs)

    def _run_process(self, func, process_args, process_kwargs):
        '''
        Sends the celery task that runs the given process function on this object and
        returns the result of ``proc_task.apply_async``.
        '''
        # The task only transports the function name; use the undecorated function for it.
        func = getattr(func, '__process_unwrapped', func)

        self_id = str(self.id)
        cls_name = self.__class__.__name__

        queue = None
        if config.celery.routing == config.CELERY_WORKER_ROUTING and self.worker_hostname is not None:
            # Route the task directly to the worker stored on this object.
            queue = worker_direct(self.worker_hostname).name

        priority = config.celery.priorities.get('%s.%s' % (cls_name, func.__name__), 1)

        logger = utils.get_logger(__name__, cls=cls_name, id=self_id, func=func.__name__)
        logger.debug('calling process function', queue=queue, priority=priority)

        return proc_task.apply_async(
            args=[cls_name, self_id, func.__name__, process_args, process_kwargs],
            queue=queue, priority=priority)

    def __str__(self):
        return 'proc celery_task_id=%s worker_hostname=%s' % (self.celery_task_id, self.worker_hostname)
420

421
422

def all_subclasses(cls):
    '''
    Helper method to calculate set of all subclasses of a given class. The result
    contains the direct and all indirect subclasses, but not the given class itself.
    '''
    direct_subclasses = cls.__subclasses__()
    return set(direct_subclasses).union(
        s for c in direct_subclasses for s in all_subclasses(c))

Markus Scheidgen's avatar
Markus Scheidgen committed
427

428
# Only contains the Proc subclasses that exist when this statement runs.
all_proc_cls = {cls.__name__: cls for cls in all_subclasses(Proc)}
''' Name dictionary for all Proc classes. '''
430
431


432
class NomadCeleryRequest(Request):
    '''
    A custom celery request class that allows to catch error in the worker main
    thread, which cannot be caught on the worker threads themselves.
    '''

    def _fail(self, event, **kwargs):
        '''
        Loads the proc object that this request's task was called on and fails it with
        the given event. Keyword arguments are passed on to the proc's ``fail``.
        '''
        # The first payload element holds the task args, starting with class name and id.
        args = self._payload[0]
        # this might be run in the worker main thread, which does not have a mongo
        # connection by default
        if infrastructure.mongo_client is None:
            infrastructure.setup_mongo()

        proc = unwarp_task(self.task, *args)
        proc.fail(event, **kwargs)

    def on_timeout(self, soft, timeout):
        ''' Fails the proc object on hard (non soft) timeouts, then runs the default handling. '''
        if not soft:
            self._fail('task timeout occurred', timeout=timeout)

        super().on_timeout(soft, timeout)

    def on_failure(self, exc_info, send_failed_event=True, return_ok=False):
        ''' Fails the proc object if the worker was lost, then runs the default handling. '''
        if isinstance(exc_info.exception, WorkerLostError):
            infrastructure.setup()
            utils.get_logger(__name__).error(
                'detected WorkerLostError', exc_info=exc_info.exception)
            self._fail(
                'process failed due to worker lost: %s' % str(exc_info.exception),
                exc_info=exc_info)

        super().on_failure(
            exc_info,
            send_failed_event=send_failed_event,
            return_ok=return_ok
        )


class NomadCeleryTask(Task):
    ''' Celery task base class that uses :class:`NomadCeleryRequest` as its request class. '''
    Request = NomadCeleryRequest


def unwarp_task(task, cls_name, self_id, *args, **kwargs):
    '''
    Retrieves the proc object that the given task is executed on from the database.

    Arguments:
        task: The celery task, used to retry if the object cannot be found.
        cls_name: The name of the Proc subclass of the object.
        self_id: The id of the object.
        args, kwargs: Accepted so that the full task arguments can be passed; not used.

    Raises:
        ProcNotRegistered: If there is no subclass of Proc with the given name.
        ProcObjectDoesNotExist: If the object cannot be found when running in testing
            mode, or if it is still missing once retrying re-raises the KeyError.
    '''
    logger = utils.get_logger(__name__, cls=cls_name, id=self_id)

    # get the process class
    global all_proc_cls
    cls = all_proc_cls.get(cls_name, None)
    if cls is None:
        # refind all Proc classes, since more modules might have been imported by now
        all_proc_cls = {cls.__name__: cls for cls in all_subclasses(Proc)}
        cls = all_proc_cls.get(cls_name, None)

    if cls is None:
        logger.critical('document not a subcass of Proc')
        raise ProcNotRegistered('document %s not a subclass of Proc' % cls_name)

    # get the process instance
    try:
        try:
            self = cls.get(self_id)
        except KeyError as e:
            from nomad.app import flask
            if flask.app.config['TESTING']:
                # This only happens in tests, where it is not always avoidable that
                # tasks from old test-cases bleed over.
                raise ProcObjectDoesNotExist()
            logger.warning('called object is missing, retry')
            # A KeyError coming out of the retry is handled by the outer except below.
            raise task.retry(exc=e, countdown=3)
    except KeyError:
        logger.critical('called object is missing, retries exeeded', proc_id=self_id)
        raise ProcObjectDoesNotExist()

    return self


511
512
@app.task(
    bind=True, base=NomadCeleryTask, ignore_result=True, max_retries=3,
    acks_late=config.celery.acks_late, soft_time_limit=config.celery.timeout,
    time_limit=config.celery.timeout * 2)
def proc_task(task, cls_name, self_id, func_attr, process_args, process_kwargs):
    '''
    The celery task that is used to execute async process functions.
    It ignores results, since all results are handled via the self document.
    It retries for 3 times with a countdown of 3 on missing 'selfs', since this
    might happen in sharded, distributed mongo setups where the object might not
    have yet been propagated and therefore appear missing.

    Arguments:
        task: The bound celery task.
        cls_name: The name of the Proc subclass of the object to run the process on.
        self_id: The id of the object to run the process on.
        func_attr: The name of the process function.
        process_args: The positional arguments for the process function.
        process_kwargs: The keyword arguments for the process function. An optional
            ``_meta_label`` entry is removed and written to ``config.meta.label``.
    '''
    if '_meta_label' in process_kwargs:
        config.meta.label = process_kwargs.pop('_meta_label')

    self = unwarp_task(task, cls_name, self_id)

    logger = self.get_logger()
    logger.debug('received process function call')

    self.worker_hostname = worker_hostname
    self.celery_task_id = task.request.id

    # get the process function
    func = getattr(self, func_attr, None)
    if func is None:
        logger.error('called function not a function of proc class')
        self.fail('called function %s is not a function of proc class %s' % (func_attr, cls_name))
        return

    # unwrap the process decorator
    func = getattr(func, '__process_unwrapped', None)
    if func is None:
        logger.error('called function was not decorated with @process')
        self.fail('called function %s was not decorated with @process' % func_attr)
        return

    # call the process function
    try:
        os.chdir(config.fs.working_directory)
        with utils.timer(logger, 'process executed on worker', log_memory=True):
            # Actually call the process function
            self.process_status = ProcessStatus.RUNNING
            self.last_status_message = 'Started process: ' + func_attr
            self.save()
            rv = func(self, *process_args, **process_kwargs)
            if self.errors:
                # Should not happen, but handle just in case
                # Must have called self.fail, but continued execution and returned normally
                # Set complete_time and process_status, just in case...
                self.complete_time = datetime.utcnow()
                self.process_status = ProcessStatus.FAILURE
                self.save()
                self.get_logger().info('completed process with errors')
            elif rv is None:
                # All looks good
                self.process_status = ProcessStatus.SUCCESS
                self.on_success()
                self.complete_time = datetime.utcnow()
                if self.warnings:
                    self.last_status_message = f'Process {func_attr} completed with warnings'
                else:
                    self.last_status_message = f'Process {func_attr} completed successfully'
                self.save()
                self.get_logger().info('completed process')
            elif rv == ProcessStatus.WAITING_FOR_RESULT:
                # No errors, and the process requests to wait for other processes
                self.process_status = ProcessStatus.WAITING_FOR_RESULT
                self.save()
                self.on_waiting_for_result()
            elif rv == ProcessStatus.DELETED:
                # The Proc object itself to be deleted from the database
                pass
            else:
                raise ValueError('Invalid return value from process function')
    except SoftTimeLimitExceeded as e:
        logger.error('exceeded the celery task soft time limit')
        self.fail(e)
    except ProcessFailure as e:
        # Exception with details about how to call self.fail
        self.fail(*e._errors, log_level=e._log_level, **e._kwargs)
    except Exception as e:
        self.fail(e)
    except SystemExit as e:
        # SystemExit is not an Exception and needs its own handler to fail the process.
        self.fail(e)
597
598


599
def process(func):
    '''
    The decorator for process functions that will be called async via celery.
    All calls to the decorated method will result in celery task requests.
    To transfer state, the instance will be saved to the database and loaded on
    the celery task worker. Process methods can call other (process) functions/methods on
    other :class:`Proc` instances. Each :class:`Proc` instance can only run one process
    at a time.

    The undecorated function is kept on the wrapper as attribute ``__process_unwrapped``.
    Calling the wrapper raises ProcessAlreadyRunning if the instance is already processing.
    '''
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if self.process_running:
            raise ProcessAlreadyRunning('Tried to call a processing function on an already processing process.')

        self.current_process = func.__name__
        self.process_status = ProcessStatus.PENDING
        self.save()

        # Send the current meta label along; proc_task applies it on the worker.
        kwargs['_meta_label'] = config.meta.label

        self._run_process(func, args, kwargs)

    setattr(wrapper, '__process_unwrapped', func)

    return wrapper