diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index e4669f81305e562009056b3530c445545da1d5ae..ebe7a1bc951736183c0708ba5a8f57d8b0d38348 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -68,6 +68,7 @@ tests:
   image: $TEST_IMAGE
   services:
     - rabbitmq
+    - redis:5.0.8-alpine
     - name: docker.elastic.co/elasticsearch/elasticsearch:6.3.2
       alias: elastic
       # fix issue with running elastic in gitlab ci runner:
diff --git a/nomad/processing/pipelines.py b/nomad/processing/pipelines.py
index 13006d75a1b31174143aafc8a08a5c74077ff869..3cdb3bfc3034a057125b7304d660b2fd5db866de 100644
--- a/nomad/processing/pipelines.py
+++ b/nomad/processing/pipelines.py
@@ -157,17 +157,35 @@ class Pipeline():
         self.stages.append(stage)
 
 
-# This function wraps the function calls made within the stages. Although the
-# results are not used in any way, ignore_results is set to False as documented
-# in https://docs.celeryproject.org/en/stable/userguide/canvas.html#important-notes
 @app.task(
     bind=True, base=NomadCeleryTask, ignore_results=False, max_retries=3,
     acks_late=config.celery.acks_late, soft_time_limit=config.celery.timeout,
     time_limit=config.celery.timeout)
 def wrapper(task, function_name, context, stage_name, i_stage, n_stages):
-
-    # Get the associated calculation
-    calc = nomad.processing.data.Calc.get(context.calc_id)
+    """Wraps the function calls made within the stages. This wrapper takes
+    care of common tasks like exception handling and timeout handling, and
+    also persists the state of the pipeline into MongoDB.
+    """
+    # Create the associated Calc object during the first stage, otherwise get
+    # it from Mongo
+    if i_stage == 0:
+        calc = nomad.processing.data.Calc.create(
+            calc_id=context.calc_id,
+            mainfile=context.filepath,
+            parser=context.parser_name,
+            worker_hostname=context.worker_hostname,
+            upload_id=context.upload_id
+        )
+        # During the first stage, tell the calculation that its processing
+        # has been started
+        if context.re_process:
+            calc.current_process = "re_process"
+        else:
+            calc.current_process = "process"
+        calc.process_status = PROCESS_CALLED
+        calc.save()
+    else:
+        calc = nomad.processing.data.Calc.get(context.calc_id)
     logger = calc.get_logger()
 
     # Get the defined function. If does not exist, log error and fail calculation.
@@ -176,9 +194,8 @@ def wrapper(task, function_name, context, stage_name, i_stage, n_stages):
         calc.fail('Could not find the function associated with the stage.')
 
     # Try to execute the stage.
-    deleted = False
     try:
-        deleted = function(context, stage_name, i_stage, n_stages)
+        function(context, stage_name, i_stage, n_stages)
     except SoftTimeLimitExceeded as e:
         logger.error('exceeded the celery task soft time limit')
         calc.fail(e)
@@ -186,12 +203,8 @@ def wrapper(task, function_name, context, stage_name, i_stage, n_stages):
         calc.fail(e)
     except SystemExit as e:
         calc.fail(e)
-    finally:
-        if deleted is None or not deleted:
-            calc.save()
 
-    # After the last stage tell the calculation that it's processing has been
-    # finished
+    # After the last stage, save the calculation info
     if i_stage == n_stages - 1:
         calc.process_status = PROCESS_COMPLETED
         calc.save()
@@ -260,7 +273,9 @@ def get_pipeline(context):
 
 def run_pipelines(context_generator, upload_id) -> int:
     """Used to start running pipelines based on the PipelineContext objects
-    generated by the given generator.
+    generated by the given generator. Currently the implementation gathers all
+    the mainfiles in an upload before starting to execute any stages. This
+    makes synchronization with Celery much easier.
 
     Returns:
         The number of pipelines that were started.
@@ -274,7 +289,7 @@ def run_pipelines(context_generator, upload_id) -> int:
     for context in context_generator:
         pipeline = get_pipeline(context)
         n_pipelines += 1
-        for i_stage, stage in enumerate(pipeline.stages):
+        for stage in pipeline.stages:
 
             # Store stage names to be used as nodes
             stage_names.add(stage.name)
@@ -284,27 +299,6 @@ def run_pipelines(context_generator, upload_id) -> int:
                 stage_dependencies.append((stage.name, dependency))
             stages[stage.name].append(stage)
 
-            # Start eagerly running first stage if it does not have any
-            # dependencies
-            if i_stage == 0:
-
-                # Create the associated Calc object
-                calc = nomad.processing.data.Calc.create(
-                    calc_id=pipeline.context.calc_id,
-                    mainfile=pipeline.context.filepath,
-                    parser=pipeline.context.parser_name,
-                    worker_hostname=pipeline.context.worker_hostname,
-                    upload_id=pipeline.context.upload_id
-                )
-
-                # Tell the calculation that it's processing has been started
-                if context.re_process:
-                    calc.current_process = "process"
-                else:
-                    calc.current_process = "re_process"
-                calc.process_status = PROCESS_CALLED
-                calc.save()
-
     if n_pipelines != 0:
         # Resolve all independent dependency trees
         dependency_graph = nx.DiGraph()
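
The last hunk leaves the scheduling to the dependency graph built with networkx. As a rough illustration of the idea (a sketch only, with hypothetical stage names; the real `Stage` objects and scheduling code are not shown in this diff): the "independent dependency trees" mentioned in the comment are the weakly connected components of the stage graph, and reversing the "depends on" edges lets a topological sort produce a valid execution order:

```python
import networkx as nx

# Hypothetical stage names; edges point from a stage to the stage it depends
# on, mirroring stage_dependencies.append((stage.name, dependency)) above.
stage_names = {"comparison", "parsing", "normalizing", "phonons"}
stage_dependencies = [
    ("parsing", "comparison"),
    ("normalizing", "parsing"),
]

dependency_graph = nx.DiGraph()
dependency_graph.add_nodes_from(stage_names)
dependency_graph.add_edges_from(stage_dependencies)

# Each weakly connected component is an independent dependency tree that can
# be scheduled on its own.
for tree in nx.weakly_connected_components(dependency_graph):
    # Reversing turns "depends on" edges into "runs before" edges, so a
    # topological sort yields a valid execution order within the tree.
    order = list(nx.topological_sort(dependency_graph.reverse().subgraph(tree)))
    print(order)
# Prints e.g. ['comparison', 'parsing', 'normalizing'] and ['phonons']
```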
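
Note that `wrapper` keeps `ignore_results=False` (the removed comment pointed to the Celery canvas notes for why): tracking results is what allows the wrapper tasks to be composed into canvas workflows, and the redis service added to CI plausibly acts as the result backend this requires. A minimal sketch of how one pipeline's stages could be chained with the `wrapper` task above, assuming a hypothetical `function_name` attribute on `Stage` and a Celery-serializable context:

```python
from celery import chain

def chain_for(pipeline):
    """Compose one pipeline's stages into a chain of wrapper tasks, so each
    stage starts only after the previous one has finished."""
    n_stages = len(pipeline.stages)
    signatures = [
        # .si() creates an immutable signature: the (unused) return value of
        # one wrapper is not passed on as an argument to the next stage.
        wrapper.si(
            stage.function_name,  # assumed attribute naming the stage function
            pipeline.context,     # must be serializable by Celery
            stage.name,
            i_stage,
            n_stages,
        )
        for i_stage, stage in enumerate(pipeline.stages)
    ]
    return chain(*signatures)

# chain_for(pipeline).delay() would enqueue the whole pipeline.
```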