Commit 78f82935 authored by Lauri Himanen


Refactored re-processing to use pipelines, fixed issue with calculation processes not being updated.
parent 1900e5a0
Pipeline #72909 canceled with stages in 61 minutes and 35 seconds
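As a rough sketch of the change (condensed from the diff below, using only names that appear in it): re-processing an upload no longer goes through Calc.process_all, but instead yields one PipelineContext per calc of the upload and hands the generator to run_pipelines with re_process=True.

from nomad.processing.data import Calc
from nomad.processing.pipelines import run_pipelines, PipelineContext

def re_process_upload_calcs(upload_id):
    # One pipeline context per calc of the upload; 'metadata' is excluded, as in the diff
    def gen():
        for calc in Calc.objects(upload_id=upload_id).exclude('metadata'):
            yield PipelineContext(
                calc.mainfile, calc.parser, calc.calc_id,
                calc.upload_id, calc.worker_hostname,
                re_process=True)
    run_pipelines(gen(), upload_id)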
@@ -36,7 +36,7 @@ from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper
from nomad import utils, config, infrastructure, search, datamodel
from nomad.files import PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles, PublicUploadFiles, StagingUploadFiles
from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE, PROCESS_CALLED, PROCESS_COMPLETED
from nomad.processing.base import Proc, process, task, ProcessAlreadyRunning, PENDING, SUCCESS, FAILURE, PROCESS_CALLED, PROCESS_COMPLETED
from nomad.parsing import parser_dict, match_parser, Backend
from nomad.normalizing import normalizers
from nomad.processing.pipelines import run_pipelines, PipelineContext
@@ -231,7 +231,6 @@ class Calc(Proc):
return wrap_logger(logger, processors=_log_processors + [save_to_calc_log])
@process
def re_process_calc(self):
'''
Processes a calculation again. This means there is already metadata and
@@ -740,7 +739,7 @@ class Upload(Proc):
self._continue_with('parse_all')
try:
# check if a calc is already/still processing
# Check if a calc is already/still processing
processing = Calc.objects(
upload_id=self.upload_id,
**Calc.process_running_mongoengine_query()).count()
@@ -750,13 +749,33 @@
'processes are still/already running on calc, they will be reset',
count=processing)
# reset all calcs
# Reset all calcs
Calc._get_collection().update_many(
dict(upload_id=self.upload_id),
{'$set': Calc.reset_pymongo_update(worker_hostname=self.worker_hostname)})
# process call calcs
Calc.process_all(Calc.re_process_calc, dict(upload_id=self.upload_id), exclude=['metadata'])
# Re-process all calcs
def gen():
query = dict(upload_id=self.upload_id)
exclude = ['metadata']
running_query = dict(Calc.process_running_mongoengine_query())
running_query.update(query)
if Calc.objects(**running_query).first() is not None:
raise ProcessAlreadyRunning('Tried to call a processing function on an already processing process.')
Calc._get_collection().update_many(query, {'$set': dict(
current_process="re_process_calc",
process_status=PROCESS_CALLED)})
for calc in Calc.objects(**query).exclude(*exclude):
yield PipelineContext(
calc.mainfile,
calc.parser,
calc.calc_id,
calc.upload_id,
calc.worker_hostname,
re_process=True
)
run_pipelines(gen(), self.upload_id)
# Calc.process_all(Calc.re_process_calc, dict(upload_id=self.upload_id), exclude=['metadata'])
logger.info('completed triggering re-processing of all calcs')
except Exception as e:
@@ -21,7 +21,7 @@ import networkx as nx
from celery import chain, group, chord
from celery.exceptions import SoftTimeLimitExceeded
from nomad.processing.base import NomadCeleryTask
from nomad.processing.base import NomadCeleryTask, PROCESS_CALLED, PROCESS_COMPLETED
from nomad.processing.celeryapp import app
import nomad.processing.data
from nomad import config
@@ -61,12 +61,13 @@ class PipelineContext():
"""Convenience class for storing pipeline execution related information.
Provides custom encode/decode functions for JSON serialization with Celery.
"""
def __init__(self, filepath, parser_name, calc_id, upload_id, worker_hostname):
def __init__(self, filepath, parser_name, calc_id, upload_id, worker_hostname, re_process=False):
self.filepath = filepath
self.parser_name = parser_name
self.calc_id = calc_id
self.upload_id = upload_id
self.worker_hostname = worker_hostname
self.re_process = re_process
def encode(self):
return {
@@ -76,6 +77,7 @@ class PipelineContext():
"calc_id": self.calc_id,
"upload_id": self.upload_id,
"worker_hostname": self.worker_hostname,
"re_process": self.re_process,
}
@staticmethod
@@ -86,6 +88,7 @@ class PipelineContext():
data["calc_id"],
data["upload_id"],
data["worker_hostname"],
data["re_process"],
)
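Since Celery serializes task arguments as JSON, the encode/decode pair above lets a PipelineContext round-trip through the broker. A minimal sketch with hypothetical values; it assumes the staticmethod shown above is named decode:

from nomad.processing.pipelines import PipelineContext

context = PipelineContext(
    'some/upload/path/mainfile.out',  # filepath (hypothetical)
    'parsers/vasp',                   # parser_name (hypothetical)
    'calc-id', 'upload-id', 'worker-1',
    re_process=True)

payload = context.encode()                  # plain dict, JSON-serializable for Celery
restored = PipelineContext.decode(payload)  # assumed name of the staticmethod above
assert restored.re_process is True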
@@ -187,6 +190,12 @@ def wrapper(task, function_name, context, stage_name, i_stage, n_stages):
if deleted is None or not deleted:
calc.save()
# After the last stage, tell the calculation that its processing has finished
if i_stage == n_stages - 1:
calc.process_status = PROCESS_COMPLETED
calc.save()
@app.task(
bind=True, base=NomadCeleryTask, ignore_results=False, max_retries=3,
@@ -211,12 +220,17 @@ def upload_cleanup(task, upload_id):
def comp_process(context, stage_name, i_stage, n_stages):
"""Function for processing computational entries: runs parsing and normalization.
"""Function for processing computational entries: runs parsing and
normalization.
"""
# Process calculation
calc = nomad.processing.data.Calc.get(context.calc_id)
calc.process_calc()
calc.get_logger().info("Processing of calculation {} at path {} finished.".format(context.filepath, context.calc_id))
if context.re_process:
    calc.re_process_calc()
    calc.get_logger().warn("Re-processing of calculation {} at path {} finished.".format(context.calc_id, context.filepath))
else:
    calc.process_calc()
    calc.get_logger().warn("Processing of calculation {} at path {} finished.".format(context.calc_id, context.filepath))
def get_pipeline(context):
@@ -275,7 +289,7 @@ def run_pipelines(context_generator, upload_id) -> int:
if i_stage == 0:
# Create the associated Calc object
nomad.processing.data.Calc.create(
calc = nomad.processing.data.Calc.create(
calc_id=pipeline.context.calc_id,
mainfile=pipeline.context.filepath,
parser=pipeline.context.parser_name,
@@ -283,6 +297,14 @@ def run_pipelines(context_generator, upload_id) -> int:
upload_id=pipeline.context.upload_id
)
# Tell the calculation that its processing has started
if context.re_process:
    calc.current_process = "re_process_calc"
else:
    calc.current_process = "process_calc"
calc.process_status = PROCESS_CALLED
calc.save()
if n_pipelines != 0:
# Resolve all independent dependency trees
dependency_graph = nx.DiGraph()
@@ -290,8 +312,6 @@ def run_pipelines(context_generator, upload_id) -> int:
dependency_graph.add_edges_from(stage_dependencies)
dependency_trees = nx.weakly_connected_components(dependency_graph)
upload = nomad.processing.data.Upload.get(upload_id)
# Form chains for each independent tree.
chains = []
for tree_nodes in dependency_trees:
@@ -299,8 +319,6 @@ def run_pipelines(context_generator, upload_id) -> int:
sorted_nodes = nx.topological_sort(tree)
groups = []
for node in reversed(list(sorted_nodes)):
upload.get_logger().warning(node)
# Group all tasks for a stage
tasks = stages[node]
task_signatures = []
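For orientation, a standalone sketch of the graph handling in run_pipelines shown above: stage dependencies form a directed graph, every weakly connected component is an independent tree, and a topological sort of each tree gives the order in which its stages are chained. The stage names here are hypothetical, and this uses plain networkx without Celery:

import networkx as nx

# Hypothetical (prerequisite, dependent) stage pairs
stage_dependencies = [('comp_process', 'upload_cleanup')]

dependency_graph = nx.DiGraph()
dependency_graph.add_edges_from(stage_dependencies)

# Each weakly connected component is an independent dependency tree
for tree_nodes in nx.weakly_connected_components(dependency_graph):
    tree = dependency_graph.subgraph(tree_nodes)
    # Topological order of the stages in this tree; the diff walks it in
    # reverse when assembling the Celery chain of task groups
    for node in reversed(list(nx.topological_sort(tree))):
        print(node)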