Commit 4d21ba27 authored by Lauri Himanen

Trying to add redis as a service to GitLab CI.

parent 78f82935
Pipeline #72911 canceled with stages in 38 minutes and 1 second
@@ -68,6 +68,7 @@ tests:
image: $TEST_IMAGE
services:
- rabbitmq
- redis:5.0.8-alpine
- name: docker.elastic.co/elasticsearch/elasticsearch:6.3.2
alias: elastic
# fix issue with running elastic in gitlab ci runner:
......
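With a service entry like the one above, GitLab CI starts the container alongside the job and makes it reachable from the test image under a hostname derived from the image name, here simply redis. A minimal connectivity sketch, assuming the redis-py client is available in $TEST_IMAGE (not something this diff shows):

# Minimal connectivity check against the CI redis service; 'redis' is the
# default hostname GitLab CI assigns to the redis:5.0.8-alpine service, and
# redis-py is assumed to be installed in the test image.
import redis

client = redis.Redis(host='redis', port=6379, db=0)
assert client.ping()  # raises redis.exceptions.ConnectionError if the service is not up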
@@ -157,16 +157,34 @@ class Pipeline():
self.stages.append(stage)
# This function wraps the function calls made within the stages. Although the
# results are not used in any way, ignore_results is set to False as documented
# in https://docs.celeryproject.org/en/stable/userguide/canvas.html#important-notes
@app.task(
bind=True, base=NomadCeleryTask, ignore_results=False, max_retries=3,
acks_late=config.celery.acks_late, soft_time_limit=config.celery.timeout,
time_limit=config.celery.timeout)
def wrapper(task, function_name, context, stage_name, i_stage, n_stages):
# Get the associated calculation
"""This function wraps the function calls made within the stages. This
wrapper takes care of common tasks like exception handling and timeout
handling, and also persists the state of the pipelines into MongoDB.
"""
# Create the associated Calc object during the first stage, otherwise get it
# from Mongo
if i_stage == 0:
calc = nomad.processing.data.Calc.create(
calc_id=context.calc_id,
mainfile=context.filepath,
parser=context.parser_name,
worker_hostname=context.worker_hostname,
upload_id=context.upload_id
)
# During the first step tell the calculation that its processing has been
# started
if context.re_process:
calc.current_process = "re_process"
else:
calc.current_process = "process"
calc.process_status = PROCESS_CALLED
calc.save()
else:
calc = nomad.processing.data.Calc.get(context.calc_id)
logger = calc.get_logger()
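The ignore_results note in the comment above matters because the stage tasks are meant to be composed with Celery canvas primitives: a chord only invokes its body once the results of all header tasks are stored in the result backend, which is presumably the role of the redis service added to the CI setup above. A small, self-contained sketch of that mechanism, not the actual composition used by this module:

# Self-contained sketch of the Celery canvas point referenced above; the app,
# task names and broker/backend URLs are illustrative only.
from celery import Celery, chord

app = Celery('sketch', broker='redis://redis:6379/0', backend='redis://redis:6379/0')

@app.task(ignore_result=False)  # results must be kept, otherwise the chord never resolves
def stage(name):
    return name

@app.task
def finalize(results):
    return len(results)

# The header tasks run first; finalize() receives the list of their results.
chord(stage.s(n) for n in ('parsing', 'normalizing'))(finalize.s())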
@@ -176,9 +194,8 @@ def wrapper(task, function_name, context, stage_name, i_stage, n_stages):
calc.fail('Could not find the function associated with the stage.')
# Try to execute the stage.
deleted = False
try:
deleted = function(context, stage_name, i_stage, n_stages)
function(context, stage_name, i_stage, n_stages)
except SoftTimeLimitExceeded as e:
logger.error('exceeded the celery task soft time limit')
calc.fail(e)
@@ -186,12 +203,8 @@ def wrapper(task, function_name, context, stage_name, i_stage, n_stages):
calc.fail(e)
except SystemExit as e:
calc.fail(e)
finally:
if deleted is None or not deleted:
calc.save()
# After the last stage tell the calculation that its processing has been
# finished
# After the last stage, save the calculation info
if i_stage == n_stages - 1:
calc.process_status = PROCESS_COMPLETED
calc.save()
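For context, wrapper() looks up a module-level function by function_name and calls it as function(context, stage_name, i_stage, n_stages). A hypothetical stage function matching that call (its name and body are illustrative, not taken from this diff) could look like:

# Hypothetical stage function; wrapper() resolves such a function by name and
# calls it with exactly this signature. Calc is the same class used above.
def parsing_stage(context, stage_name, i_stage, n_stages):
    calc = nomad.processing.data.Calc.get(context.calc_id)
    logger = calc.get_logger()
    logger.info('stage %s (%d of %d): parsing %s with %s' % (
        stage_name, i_stage + 1, n_stages, context.filepath, context.parser_name))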
@@ -260,7 +273,9 @@ def get_pipeline(context):
def run_pipelines(context_generator, upload_id) -> int:
"""Used to start running pipelines based on the PipelineContext objects
generated by the given generator.
generated by the given generator. Currently the implementation searches all
the mainfiles in an upload before starting to execute any stages. This
makes synchronization with Celery much easier.
Returns:
The number of pipelines that were started.
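A hedged sketch of the calling convention this docstring describes: the upload-level processing is assumed to hand over a generator that yields one PipelineContext per matched mainfile, and the generator is fully consumed before any stage is executed (the helper name below is an assumption, not from this diff):

# Hedged usage sketch; generate_pipeline_contexts() is a hypothetical helper
# standing in for whatever builds one PipelineContext per matched mainfile.
contexts = generate_pipeline_contexts(upload)
n_calcs = run_pipelines(contexts, upload.upload_id)
logger.info('started %d pipelines for upload %s' % (n_calcs, upload.upload_id))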
@@ -274,7 +289,7 @@ def run_pipelines(context_generator, upload_id) -> int:
for context in context_generator:
pipeline = get_pipeline(context)
n_pipelines += 1
for i_stage, stage in enumerate(pipeline.stages):
for stage in pipeline.stages:
# Store stage names to be used as nodes
stage_names.add(stage.name)
@@ -284,27 +299,6 @@ def run_pipelines(context_generator, upload_id) -> int:
stage_dependencies.append((stage.name, dependency))
stages[stage.name].append(stage)
# Start eagerly running the first stage if it does not have any
# dependencies
if i_stage == 0:
# Create the associated Calc object
calc = nomad.processing.data.Calc.create(
calc_id=pipeline.context.calc_id,
mainfile=pipeline.context.filepath,
parser=pipeline.context.parser_name,
worker_hostname=pipeline.context.worker_hostname,
upload_id=pipeline.context.upload_id
)
# Tell the calculation that its processing has been started
if context.re_process:
calc.current_process = "re_process"
else:
calc.current_process = "process"
calc.process_status = PROCESS_CALLED
calc.save()
if n_pipelines != 0:
# Resolve all independent dependency trees
dependency_graph = nx.DiGraph()
......
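The truncated hunk above hints at how the collected stages are scheduled: stage names become nodes, the (stage, dependency) pairs become edges, and each independent dependency tree can then be submitted as its own Celery workflow. A minimal sketch with networkx, continuing from the stage_names and stage_dependencies built in the loop above (the actual continuation of this hunk is not shown in the diff):

# Minimal sketch of resolving independent dependency trees with networkx;
# illustrative only, not the real continuation of the code above.
import networkx as nx

dependency_graph = nx.DiGraph()
dependency_graph.add_nodes_from(stage_names)
dependency_graph.add_edges_from(stage_dependencies)

for component in nx.weakly_connected_components(dependency_graph):
    subgraph = dependency_graph.subgraph(component)
    # Edges point from a stage to its dependency (as appended above), so the
    # reversed topological order runs dependencies before their dependents.
    execution_order = list(reversed(list(nx.topological_sort(subgraph))))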