Commit 9f8f87d6 authored by Lauri Himanen

Completely removed the 'joined' field from Upload as it is no longer needed, removed Redis persistence from the development docker-compose setup, and made sure that results are not kept for Celery tasks.
parent 0224033c
Pipeline #72976 failed with stages in 19 minutes and 55 seconds
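
Of the three changes, the Celery result handling is the least obvious from the diff alone. As a rough sketch only (app name, broker URL, timeout value, and task body are invented for illustration, not taken from this commit), a task declared so that no results are kept, with a hard time limit twice the soft one, looks roughly like this:

from celery import Celery
from celery.exceptions import SoftTimeLimitExceeded

# Assumed broker URL; the project configures this through its own config module.
app = Celery('sketch', broker='pyamqp://guest@localhost//')

# Assumed timeout; the project reads it from config.celery.timeout.
TIMEOUT = 600

# ignore_result=True tells Celery not to store the return value in the result
# backend, which is what "results are not kept for Celery tasks" amounts to.
@app.task(bind=True, ignore_result=True, max_retries=3, acks_late=True,
          soft_time_limit=TIMEOUT, time_limit=TIMEOUT * 2)
def run_stage(self, mainfile):
    try:
        pass  # placeholder for the actual stage work
    except SoftTimeLimitExceeded:
        # The soft limit raises inside the task; the doubled hard limit
        # leaves room to log the failure before the worker kills the task.
        pass
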
@@ -31,6 +31,7 @@ from contextlib import contextmanager
import os.path
from datetime import datetime
from pymongo import UpdateOne
from celery.utils import worker_direct
import hashlib
from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper
@@ -350,15 +351,6 @@ class Calc(Proc):
self.get_logger().error(
'could not write archive after processing failure', exc_info=e)
# def on_process_complete(self, process_name):
# # the save might be necessary to correctly read the join condition from the db
# self.save()
# # in case of error, the process_name might be unknown
# if process_name == 'process_calc' or process_name == 're_process_calc' or process_name is None:
# self.get_logger().warning("JOINING NOW")
# self.upload.reload()
# self.upload.check_join()
@task
def parsing(self):
''' The *task* that encapsulates all parsing related actions. '''
@@ -519,7 +511,6 @@ class Upload(Proc):
published: Boolean that indicates the publish status
publish_time: Date when the upload was initially published
last_update: Date of the last publishing/re-processing
joined: Boolean indicates if the running processing has joined (:func:`check_join`)
'''
id_field = 'upload_id'
@@ -535,8 +526,6 @@
publish_time = DateTimeField()
last_update = DateTimeField()
joined = BooleanField(default=False)
meta: Any = {
'indexes': [
'user_id', 'tasks_status', 'process_status', 'published', 'upload_time'
@@ -754,8 +743,14 @@ class Upload(Proc):
dict(upload_id=self.upload_id),
{'$set': Calc.reset_pymongo_update(worker_hostname=self.worker_hostname)})
# Resolve queue and priority
queue = None
if config.celery.routing == config.CELERY_WORKER_ROUTING and self.worker_hostname is not None:
queue = worker_direct(self.worker_hostname).name
priority = config.celery.priorities.get('%s.%s' % ("Calc", "re_process"), 1)
# Re-process all calcs
def gen():
def pipeline_generator():
query = dict(upload_id=self.upload_id)
exclude = ['metadata']
running_query = dict(Calc.process_running_mongoengine_query())
@@ -774,8 +769,7 @@
calc.worker_hostname,
re_process=True
)
run_pipelines(gen(), self.upload_id)
# Calc.process_all(Calc.re_process_calc, dict(upload_id=self.upload_id), exclude=['metadata'])
run_pipelines(pipeline_generator(), self.upload_id, queue, priority)
logger.info('completed to trigger re-process of all calcs')
except Exception as e:
@@ -803,7 +797,6 @@
self._continue_with('cleanup')
self.upload_files.re_pack(self.user_metadata())
self.joined = True
self._complete()
@process
@@ -955,8 +948,14 @@
# Tell Upload that a process has been started.
self.processing_started()
# Resolve queue and priority
queue = None
if config.celery.routing == config.CELERY_WORKER_ROUTING and self.worker_hostname is not None:
queue = worker_direct(self.worker_hostname).name
priority = config.celery.priorities.get('%s.%s' % ("Calc", "process"), 1)
# Start running all pipelines
n_pipelines = run_pipelines(self.match_mainfiles(), self.upload_id)
n_pipelines = run_pipelines(self.match_mainfiles(), self.upload_id, queue, priority)
# If the upload has not spawned any pipelines, tell it that it is
# finished and perform cleanup
@@ -964,13 +963,11 @@
self.processing_finished()
def reset(self):
self.joined = False
super().reset()
@classmethod
def reset_pymongo_update(cls, worker_hostname: str = None):
update = super().reset_pymongo_update()
update.update(joined=False)
return update
def _cleanup_after_processing(self):
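
Both processing entry points above now resolve a per-worker queue via worker_direct() and a numeric priority before handing a context generator to run_pipelines(). A self-contained sketch of that routing pattern (the task, its arguments, and the default priority are placeholders, not the project's real configuration):

from celery import Celery
from celery.utils import worker_direct

app = Celery('sketch', broker='pyamqp://guest@localhost//')

@app.task(ignore_result=True)
def process_calc(calc_id):
    pass  # placeholder body

def dispatch(calc_id, worker_hostname=None, priority=1):
    # worker_direct() returns a kombu Queue bound to the named worker's
    # direct exchange; its .name is what gets passed along as the target
    # queue when worker routing is enabled.
    sig = process_calc.si(calc_id)
    if worker_hostname is not None:
        sig = sig.set(queue=worker_direct(worker_hostname).name)
    # Signature.set() attaches routing options such as priority before send.
    sig.set(priority=priority)
    sig.delay()
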
@@ -20,6 +20,7 @@ from collections import defaultdict
import networkx as nx
from celery import chain, group, chord
from celery.exceptions import SoftTimeLimitExceeded
from celery.utils import worker_direct
from nomad.processing.base import NomadCeleryTask, PROCESS_CALLED, PROCESS_COMPLETED
from nomad.processing.celeryapp import app
@@ -133,6 +134,7 @@
)
class Pipeline():
"""Pipeline consists of a list of stages. The pipeline is complete when all
stages are finished.
@@ -158,9 +160,9 @@
@app.task(
bind=True, base=NomadCeleryTask, ignore_results=False, max_retries=3,
bind=True, base=NomadCeleryTask, ignore_results=True, max_retries=3,
acks_late=config.celery.acks_late, soft_time_limit=config.celery.timeout,
time_limit=config.celery.timeout)
time_limit=config.celery.timeout * 2)
def wrapper(task, function_name, context, stage_name, i_stage, n_stages):
"""This function wraps the function calls made within the stages. This
wrapper takes care of common tasks like exception handling and timeout
@@ -211,9 +213,9 @@ def wrapper(task, function_name, context, stage_name, i_stage, n_stages):
@app.task(
bind=True, base=NomadCeleryTask, ignore_results=False, max_retries=3,
bind=True, base=NomadCeleryTask, ignore_results=True, max_retries=3,
acks_late=config.celery.acks_late, soft_time_limit=config.celery.timeout,
time_limit=config.celery.timeout)
time_limit=config.celery.timeout * 2)
def empty_task(task, *args, **kwargs):
"""Empty dummy task used as a callback for Celery chords.
"""
@@ -221,11 +223,11 @@ def empty_task(task, *args, **kwargs):
@app.task(
bind=True, base=NomadCeleryTask, ignore_results=False, max_retries=3,
bind=True, base=NomadCeleryTask, ignore_results=True, max_retries=3,
acks_late=config.celery.acks_late, soft_time_limit=config.celery.timeout,
time_limit=config.celery.timeout)
time_limit=config.celery.timeout * 2)
def upload_cleanup(task, upload_id):
"""Task for cleanin up after processing.
"""Task for cleaning up after processing an upload.
"""
upload = nomad.processing.data.Upload.get(upload_id)
upload.processing_finished()
@@ -271,7 +273,7 @@ def get_pipeline(context):
return pipeline
def run_pipelines(context_generator, upload_id) -> int:
def run_pipelines(context_generator, upload_id, queue, priority) -> int:
"""Used to start running pipelines based on the PipelineContext objects
generated by the given generator. Currently the implementation searches all
the mainfiles in an upload before starting to execute any stages. This
@@ -317,7 +319,7 @@ def run_pipelines(context_generator, upload_id) -> int:
tasks = stages[node]
task_signatures = []
for task in tasks:
task_signatures.append(task.signature())
task_signatures.append(task.signature().set(queue=queue, priority=priority))
task_group = group(*task_signatures)
# Celery does not allow directly chaining groups. To simulate
@@ -335,6 +337,12 @@ def run_pipelines(context_generator, upload_id) -> int:
# After all trees are finished, the upload should perform cleanup
final_chain = chain(tree_group, upload_cleanup.si(upload_id))
final_chain.delay()
res = final_chain.delay()
# According to Celery docs we need to release the result resources:
# https://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#keeping-result
# None of the results are ever used anyways, the result backend is configured
# only to keep the state of the task chains.
res.forget()
return n_pipelines
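
The new queue and priority arguments are applied to every stage signature, and the AsyncResult of the final chain is forgotten right after dispatch, as the Celery documentation linked in the comment recommends. A stripped-down version of that dispatch-and-forget pattern (task names, bodies, and broker/backend URLs are invented for illustration):

from celery import Celery, chain, group

# Assumed broker/backend URLs; a result backend is still needed for the
# chord bookkeeping even though no return values are ever read.
app = Celery('sketch', broker='pyamqp://guest@localhost//',
             backend='redis://localhost:6379/0')

@app.task
def run_stage(mainfile):
    pass  # placeholder for one pipeline stage

@app.task
def cleanup(upload_id):
    pass  # placeholder for the upload_cleanup task

def launch(upload_id, mainfiles, queue=None, priority=1):
    stage_group = group(
        run_stage.si(m).set(queue=queue, priority=priority) for m in mainfiles)
    # Chaining a group with a follow-up task makes Celery build a chord.
    final = chain(stage_group, cleanup.si(upload_id))
    res = final.delay()
    # Release the result resources immediately; only chord completion matters.
    res.forget()
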
@@ -51,8 +51,7 @@ services:
restart: unless-stopped
image: redis:5.0.8-alpine
container_name: nomad_redis
volumes:
- nomad_redis:/data
command: [sh, -c, "rm -f /data/dump.rdb && redis-server --save ''"] # Disables db persistence to disk
# the search engine
elastic:
@@ -79,4 +78,3 @@ volumes:
nomad_mongo:
nomad_elastic:
nomad_rabbitmq:
nomad_redis:
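
With the nomad_redis volume gone, the development Redis keeps nothing on disk: redis-server --save '' turns off RDB snapshotting and any old dump.rdb is deleted on start. To verify the running container from Python, a small probe with the redis-py client (the client, host, and port are assumptions, not part of this commit) could look like this:

import redis  # assumes the redis-py client is installed

r = redis.Redis(host='localhost', port=6379)
# CONFIG GET save returns the snapshot schedule; an empty value means
# RDB persistence is disabled, matching the compose command above.
print(r.config_get('save'))
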
@@ -107,4 +107,4 @@ volumes:
nomad_oasis_mongo:
nomad_oasis_elastic:
nomad_oasis_rabbitmq:
nomad_oasis_files:
\ No newline at end of file
nomad_oasis_files:
@@ -33,7 +33,7 @@ from kombu.serialization import register
from nomad import config, infrastructure, parsing, processing, app, utils
from nomad.datamodel import User
from nomad.processing.pipelines import PipelineContext, my_dumps, my_loads
from nomad.processing.pipelines import my_dumps, my_loads
from tests import test_parsing
from tests.normalizing.conftest import run_normalize
@@ -69,7 +69,6 @@ def assert_processing(upload: Upload, published: bool = False):
assert upload.upload_id is not None
assert len(upload.errors) == 0
assert upload.tasks_status == SUCCESS
assert upload.joined
upload_files = UploadFiles.get(upload.upload_id, is_authorized=lambda: True)
if published: