Commit 5aac54b9 authored by Lauri Himanen
Browse files

Removed one save, removed Calc creation during reprocessing.

parent 9f8f87d6
Pipeline #72988 passed with stages
in 19 minutes and 28 seconds
......@@ -752,7 +752,6 @@ class Upload(Proc):
# Re-process all calcs
def pipeline_generator():
query = dict(upload_id=self.upload_id)
exclude = ['metadata']
running_query = dict(Calc.process_running_mongoengine_query())
running_query.update(query)
if Calc.objects(**running_query).first() is not None:
......@@ -760,7 +759,7 @@ class Upload(Proc):
Calc._get_collection().update_many(query, {'$set': dict(
current_process="re_process_calc",
process_status=PROCESS_CALLED)})
for calc in Calc.objects(**query).exclude(*exclude):
for calc in Calc.objects(**query):
yield PipelineContext(
calc.mainfile,
calc.parser,
......
......@@ -20,8 +20,6 @@ from collections import defaultdict
import networkx as nx
from celery import chain, group, chord
from celery.exceptions import SoftTimeLimitExceeded
from celery.utils import worker_direct
from nomad.processing.base import NomadCeleryTask, PROCESS_CALLED, PROCESS_COMPLETED
from nomad.processing.celeryapp import app
import nomad.processing.data
......@@ -121,9 +119,6 @@ class Stage():
def add_dependency(self, name):
self.dependencies.append(name)
# def run(self, filepath, parser_name, calc_id, upload_id, worker_hostname, stage_name, i_stage, n_stages):
# wrapper.delay(self._function.__name__, filepath, parser_name, calc_id, upload_id, worker_hostname, stage_name, i_stage, n_stages)
def signature(self):
return wrapper.si(
self._function.__name__,
......@@ -134,7 +129,6 @@ class Stage():
)
class Pipeline():
"""Pipeline consists of a list of stages. The pipeline is complete when all
stages are finished.
......@@ -168,46 +162,50 @@ def wrapper(task, function_name, context, stage_name, i_stage, n_stages):
wrapper takes care of common tasks like exception handling and timeout
handling, and also persists the state of the pipelines into MongoDB.
"""
# Create the associated Calc object during first stage, otherwise get it
# from Mongo
if i_stage == 0:
# Create the associated Calc object during first stage (and if not
# reprocessing), otherwise get it from Mongo
calc = None
if i_stage == 0 and not context.re_process:
calc = nomad.processing.data.Calc.create(
calc_id=context.calc_id,
mainfile=context.filepath,
parser=context.parser_name,
worker_hostname=context.worker_hostname,
upload_id=context.upload_id
upload_id=context.upload_id,
current_process="process",
process_status=PROCESS_CALLED,
)
# During the first step, tell the calculation that its processing has been
# started
if context.re_process:
calc.current_process = "process"
else:
calc.current_process = "re_process"
calc.process_status = PROCESS_CALLED
calc.save()
else:
calc = nomad.processing.data.Calc.get(context.calc_id)
logger = calc.get_logger()
# Get the defined function. If it does not exist, log an error and fail the calculation.
function = globals().get(function_name, None)
if function is None:
if calc is None:
calc = nomad.processing.data.Calc.get(context.calc_id)
calc.fail('Could not find the function associated with the stage.')
# Try to execute the stage.
try:
# Try to execute the stage.
function(context, stage_name, i_stage, n_stages)
except SoftTimeLimitExceeded as e:
if calc is None:
calc = nomad.processing.data.Calc.get(context.calc_id)
calc.fail('Could not find the function associated with the stage.')
logger = calc.get_logger()
logger.error('exceeded the celery task soft time limit')
calc.fail(e)
except Exception as e:
calc.fail(e)
except SystemExit as e:
if calc is None:
calc = nomad.processing.data.Calc.get(context.calc_id)
calc.fail(e)
except Exception as e:
if calc is None:
calc = nomad.processing.data.Calc.get(context.calc_id)
calc.fail(e)
# After the last stage, save the calculation info
if i_stage == n_stages - 1:
if calc is None:
calc = nomad.processing.data.Calc.get(context.calc_id)
calc.process_status = PROCESS_COMPLETED
calc.save()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment