Commit beb68ec0 authored by Lauri Himanen

Removing pipeline usage for now.

parent 71d39906
Pipeline #73621 passed with stages in 19 minutes and 2 seconds
......@@ -68,8 +68,6 @@ tests:
image: $TEST_IMAGE
services:
- rabbitmq
- name: redis:5.0.8-alpine
alias: redis
- name: docker.elastic.co/elasticsearch/elasticsearch:6.3.2
alias: elastic
# fix issue with running elastic in gitlab ci runner:
......@@ -80,7 +78,6 @@ tests:
RABBITMQ_DEFAULT_USER: rabbitmq
RABBITMQ_DEFAULT_PASS: rabbitmq
RABBITMQ_DEFAULT_VHOST: /
NOMAD_REDIS_HOST: redis
NOMAD_RABBITMQ_HOST: rabbitmq
NOMAD_ELASTIC_HOST: elastic
NOMAD_MONGO_HOST: mongo
......
......@@ -16,7 +16,7 @@ from typing import List, Any, Dict
import logging
import time
import os
from celery import Task
from celery import Celery, Task
from celery.worker.request import Request
from celery.signals import after_setup_task_logger, after_setup_logger, worker_process_init, \
celeryd_after_setup, worker_process_shutdown
......@@ -30,7 +30,6 @@ from datetime import datetime
import functools
from nomad import config, utils, infrastructure
from nomad.processing.celeryapp import app
import nomad.patch # pylint: disable=unused-import
......@@ -75,6 +74,14 @@ def on_worker_process_shutdown(*args, **kwargs):
disconnect()
app = Celery('nomad.processing', broker=config.rabbitmq_url())
app.conf.update(worker_hijack_root_logger=False)
app.conf.update(worker_max_memory_per_child=config.celery.max_memory)
if config.celery.routing == config.CELERY_WORKER_ROUTING:
app.conf.update(worker_direct=True)
app.conf.task_queue_max_priority = 10
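# A minimal usage sketch of what the routing settings above enable: a caller
# can dispatch proc_task (defined further below) to a specific worker's direct
# queue with a priority. The hostname and calc id used here are hypothetical
# placeholders.
from celery.utils import worker_direct

worker_hostname = 'some-worker'  # placeholder
queue = None
if config.celery.routing == config.CELERY_WORKER_ROUTING and worker_hostname is not None:
    queue = worker_direct(worker_hostname).name
priority = config.celery.priorities.get('Calc.process', 1)
proc_task.apply_async(
    args=['Calc', 'some-calc-id', 'process_calc'],
    queue=queue, priority=priority)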
CREATED = 'CREATED'
PENDING = 'PENDING'
RUNNING = 'RUNNING'
......@@ -528,7 +535,10 @@ def unwarp_task(task, cls_name, self_id, *args, **kwargs):
return self
@app.task(bind=True, base=NomadCeleryTask)
@app.task(
bind=True, base=NomadCeleryTask, ignore_results=True, max_retries=3,
acks_late=config.celery.acks_late, soft_time_limit=config.celery.timeout,
time_limit=config.celery.timeout * 2)
def proc_task(task, cls_name, self_id, func_attr):
'''
The celery task that is used to execute async process functions.
......
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module contains the Celery configuration.
"""
from celery import Celery
from nomad import config
# Celery is configured to use redis as a results backend. Although the results
# are not forwarded within the processing pipeline, celery requires the results
# backend to be configured in order to use chained tasks.
app = Celery(
'nomad.processing',
backend=config.redis_url(),
broker=config.rabbitmq_url(),
)
app.conf.update(accept_content=["myjson"])
app.conf.update(task_serializer="myjson")
app.conf.update(result_serializer="myjson")
app.conf.update(worker_hijack_root_logger=False)
app.conf.update(worker_max_memory_per_child=config.celery.max_memory)
app.conf.update(task_soft_time_limit=config.celery.timeout)
app.conf.update(task_time_limit=config.celery.timeout * 2)
app.conf.update(task_acks_late=config.celery.acks_late)
app.conf.update(task_ignore_result=True)
app.Task.max_retries = 3
if config.celery.routing == config.CELERY_WORKER_ROUTING:
app.conf.update(worker_direct=True)
app.conf.task_queue_max_priority = 10
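# A minimal sketch of how chained tasks rely on this configuration: the redis
# result backend lets Celery track the chain's state even though no results
# are forwarded between stages (.si() builds immutable signatures). The
# context values are hypothetical placeholders; PipelineContext, wrapper,
# comp_process, and upload_cleanup live in nomad.processing.pipelines.
from celery import chain
from nomad.processing.pipelines import PipelineContext, wrapper, upload_cleanup

context = PipelineContext(
    'some/path/mainfile', 'parsers/vasp', 'some-calc-id', 'some-upload-id', 'some-worker')
workflow = chain(
    wrapper.si('comp_process', context, 'comp_process', 0, 1),
    upload_cleanup.si(context.upload_id))
workflow.apply_async()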
......@@ -23,7 +23,8 @@ calculations, and files
.. autoclass:: Upload
'''
from typing import cast, List, Any, Iterator, Dict, cast, Iterable
from typing import cast, List, Any, Tuple, Iterator, Dict, cast, Iterable
from mongoengine import StringField, DateTimeField, DictField, BooleanField, IntField
import logging
from structlog import wrap_logger
......@@ -31,16 +32,14 @@ from contextlib import contextmanager
import os.path
from datetime import datetime
from pymongo import UpdateOne
from celery.utils import worker_direct
import hashlib
from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper
from nomad import utils, config, infrastructure, search, datamodel
from nomad.files import PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles, PublicUploadFiles, StagingUploadFiles
from nomad.processing.base import Proc, process, task, ProcessAlreadyRunning, PENDING, SUCCESS, FAILURE, PROCESS_CALLED, PROCESS_COMPLETED
from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing import parser_dict, match_parser, Backend
from nomad.normalizing import normalizers
from nomad.processing.pipelines import run_pipelines, PipelineContext
def _pack_log_event(logger, method_name, event_dict):
......@@ -231,6 +230,7 @@ class Calc(Proc):
return wrap_logger(logger, processors=_log_processors + [save_to_calc_log])
@process
def re_process_calc(self):
'''
Processes a calculation again. This means there is already metadata and
......@@ -293,6 +293,7 @@ class Calc(Proc):
except Exception as e:
logger.error('could not unload processing results', exc_info=e)
@process
def process_calc(self):
'''
Processes a new calculation that has no prior records in the mongo, elastic,
......@@ -350,6 +351,14 @@ class Calc(Proc):
self.get_logger().error(
'could not write archive after processing failure', exc_info=e)
def on_process_complete(self, process_name):
# the save might be necessary to correctly read the join condition from the db
self.save()
# in case of error, the process_name might be unknown
if process_name == 'process_calc' or process_name == 're_process_calc' or process_name is None:
self.upload.reload()
self.upload.check_join()
@task
def parsing(self):
''' The *task* that encapsulates all parsing related actions. '''
......@@ -510,6 +519,7 @@ class Upload(Proc):
published: Boolean that indicates the publish status
publish_time: Date when the upload was initially published
last_update: Date of the last publishing/re-processing
joined: Boolean that indicates whether the running processing has joined (:func:`check_join`)
'''
id_field = 'upload_id'
......@@ -525,11 +535,12 @@ class Upload(Proc):
publish_time = DateTimeField()
last_update = DateTimeField()
joined = BooleanField(default=False)
meta: Any = {
'indexes': [
'user_id', 'tasks_status', 'process_status', 'published', 'upload_time'
],
'strict': False # ignore extra fields to support older entries with join related fields
]
}
def __init__(self, **kwargs):
......@@ -728,7 +739,7 @@ class Upload(Proc):
self._continue_with('parse_all')
try:
# Check if a calc is already/still processing
# check if a calc is already/still processing
processing = Calc.objects(
upload_id=self.upload_id,
**Calc.process_running_mongoengine_query()).count()
......@@ -738,37 +749,13 @@ class Upload(Proc):
'processes are still/already running on calc, they will be reset',
count=processing)
# Reset all calcs
# reset all calcs
Calc._get_collection().update_many(
dict(upload_id=self.upload_id),
{'$set': Calc.reset_pymongo_update(worker_hostname=self.worker_hostname)})
# Resolve queue and priority
queue = None
if config.celery.routing == config.CELERY_WORKER_ROUTING and self.worker_hostname is not None:
queue = worker_direct(self.worker_hostname).name
priority = config.celery.priorities.get('%s.%s' % ("Calc", "re_process"), 1)
# Re-process all calcs
def pipeline_generator():
query = dict(upload_id=self.upload_id)
running_query = dict(Calc.process_running_mongoengine_query())
running_query.update(query)
if Calc.objects(**running_query).first() is not None:
raise ProcessAlreadyRunning('Tried to call a processing function on an already processing process.')
Calc._get_collection().update_many(query, {'$set': dict(
current_process="re_process_calc",
process_status=PROCESS_CALLED)})
for calc in Calc.objects(**query):
yield PipelineContext(
calc.mainfile,
calc.parser,
calc.calc_id,
calc.upload_id,
calc.worker_hostname,
re_process=True
)
run_pipelines(pipeline_generator(), self.upload_id, queue, priority)
# process all calcs
Calc.process_all(Calc.re_process_calc, dict(upload_id=self.upload_id), exclude=['metadata'])
logger.info('completed triggering re-processing of all calcs')
except Exception as e:
......@@ -796,6 +783,7 @@ class Upload(Proc):
self._continue_with('cleanup')
self.upload_files.re_pack(self.user_metadata())
self.joined = True
self._complete()
@process
......@@ -883,54 +871,34 @@ class Upload(Proc):
self.staging_upload_files.raw_file_object(path).os_path,
self.staging_upload_files.raw_file_object(stripped_path).os_path))
def match_mainfiles(self) -> Iterator[PipelineContext]:
"""Generator function that iterates over files in an upload and returns
basic information for each found mainfile.
def match_mainfiles(self) -> Iterator[Tuple[str, object]]:
'''
Generator function that matches all files in the upload to all parsers to
determine the upload's mainfiles.
Returns:
PipelineContext
"""
Tuples of mainfile (filename) and matching parser
'''
directories_with_match: Dict[str, str] = dict()
upload_files = self.staging_upload_files
for filepath in upload_files.raw_file_manifest():
self._preprocess_files(filepath)
for filename in upload_files.raw_file_manifest():
self._preprocess_files(filename)
try:
parser = match_parser(upload_files.raw_file_object(filepath).os_path)
parser = match_parser(upload_files.raw_file_object(filename).os_path)
if parser is not None:
directory = os.path.dirname(filepath)
directory = os.path.dirname(filename)
if directory in directories_with_match:
# TODO this might give us the chance to store directory-based relationships
# between calcs in the future?
pass
else:
directories_with_match[directory] = filepath
yield PipelineContext(
filepath,
parser.name,
upload_files.calc_id(filepath),
self.upload_id,
self.worker_hostname
)
directories_with_match[directory] = filename
yield filename, parser
except Exception as e:
self.get_logger().error(
'exception while matching pot. mainfile',
mainfile=filepath, exc_info=e)
def processing_started(self):
"""Informs MongoDB that this Upload has started processing.
"""
# Tell Upload that a process has been started.
self.current_process = "process"
self.process_status = PROCESS_CALLED
self.save()
def processing_finished(self):
"""Informs MongoDB that this Upload has finished processing.
"""
# Tell Upload that the process has finished.
self.process_status = PROCESS_COMPLETED
self.save()
self.cleanup()
mainfile=filename, exc_info=e)
@task
def parse_all(self):
......@@ -943,30 +911,54 @@ class Upload(Proc):
with utils.timer(
logger, 'upload extracted', step='matching',
upload_size=self.upload_files.size):
for filename, parser in self.match_mainfiles():
calc = Calc.create(
calc_id=self.upload_files.calc_id(filename),
mainfile=filename, parser=parser.name,
worker_hostname=self.worker_hostname,
upload_id=self.upload_id)
# Tell Upload that a process has been started.
self.processing_started()
# Resolve queue and priority
queue = None
if config.celery.routing == config.CELERY_WORKER_ROUTING and self.worker_hostname is not None:
queue = worker_direct(self.worker_hostname).name
priority = config.celery.priorities.get('%s.%s' % ("Calc", "process"), 1)
calc.process_calc()
# Start running all pipelines
n_pipelines = run_pipelines(self.match_mainfiles(), self.upload_id, queue, priority)
def on_process_complete(self, process_name):
if process_name == 'process_upload' or process_name == 're_process_upload':
self.check_join()
# If the upload has not spawned any pipelines, tell it that it is
# finished and perform cleanup
if n_pipelines == 0:
self.processing_finished()
def check_join(self):
'''
Performs an evaluation of the join condition and triggers the :func:`cleanup`
task if necessary. The join condition ensures that ``cleanup`` runs only after
all calculations have been processed. The upload processing stops after the
processing of all calculations has been triggered (:func:`parse_all` or
:func:`re_process_upload`). The cleanup task is then run within the last
calculation process (the one that triggered the join by calling this method).
'''
total_calcs = self.total_calcs
processed_calcs = self.processed_calcs
self.get_logger().debug('check join', processed_calcs=processed_calcs, total_calcs=total_calcs)
# check if the process is not running anymore, i.e. not still spawning new processes to join
# check the join condition, i.e. all calcs have been processed
if not self.process_running and processed_calcs >= total_calcs:
# this can easily be called multiple times, e.g. upload finished after all calcs finished
modified_upload = self._get_collection().find_one_and_update(
{'_id': self.upload_id, 'joined': {'$ne': True}},
{'$set': {'joined': True}})
if modified_upload is not None:
self.get_logger().debug('join')
self.cleanup()
else:
# the join was already done due to a prior call
pass
def reset(self):
self.joined = False
super().reset()
@classmethod
def reset_pymongo_update(cls, worker_hostname: str = None):
update = super().reset_pymongo_update()
update.update(joined=False)
return update
def _cleanup_after_processing(self):
......
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module contains the objects and setups for running pipelines.
"""
from typing import List
from collections import defaultdict
import networkx as nx
from celery import chain, group, chord
from celery.exceptions import SoftTimeLimitExceeded
from nomad.processing.base import NomadCeleryTask, PROCESS_CALLED, PROCESS_COMPLETED
from nomad.processing.celeryapp import app
import nomad.processing.data
import json
from kombu.serialization import register
def my_dumps(obj):
"""Custom JSON encoder function for Celery tasks.
"""
class MyEncoder(json.JSONEncoder):
def default(self, obj): # pylint: disable=E0202
if isinstance(obj, PipelineContext):
return obj.encode()
else:
return json.JSONEncoder.default(self, obj)
return json.dumps(obj, cls=MyEncoder)
def my_loads(obj):
"""Custom JSON decoder function for Celery tasks.
"""
def my_decoder(obj):
if '__type__' in obj:
if obj['__type__'] == '__pipelinecontext__':
return PipelineContext.decode(obj)
return obj
return json.loads(obj, object_hook=my_decoder)
register("myjson", my_dumps, my_loads, content_type='application/x-myjson', content_encoding='utf-8')
class PipelineContext():
"""Convenience class for storing pipeline execution related information.
Provides custom encode/decode functions for JSON serialization with Celery.
"""
def __init__(self, filepath, parser_name, calc_id, upload_id, worker_hostname, re_process=False):
self.filepath = filepath
self.parser_name = parser_name
self.calc_id = calc_id
self.upload_id = upload_id
self.worker_hostname = worker_hostname
self.re_process = re_process
def encode(self):
return {
"__type__": "__pipelinecontext__",
"filepath": self.filepath,
"parser_name": self.parser_name,
"calc_id": self.calc_id,
"upload_id": self.upload_id,
"worker_hostname": self.worker_hostname,
"re_process": self.re_process,
}
@staticmethod
def decode(data):
return PipelineContext(
data["filepath"],
data["parser_name"],
data["calc_id"],
data["upload_id"],
data["worker_hostname"],
data["re_process"],
)
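# A minimal round-trip sketch: a PipelineContext embedded in task arguments
# survives the custom "myjson" serializer registered above. The values are
# hypothetical placeholders.
example_context = PipelineContext(
    'some/path/mainfile', 'parsers/vasp', 'some-calc-id', 'some-upload-id', 'some-worker')
payload = my_dumps({'args': [example_context]})
restored = my_loads(payload)
assert isinstance(restored['args'][0], PipelineContext)
assert restored['args'][0].calc_id == example_context.calc_id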
class Stage():
"""Stage comprises of a single python function. After this function is
completed, the stage is completed.
"""
def __init__(self, name: str, function):
"""
Args:
name: Name of the stage. The name is used in resolving stage
dependencies.
function: A regular python function that will be executed during
this stage. The function should not return any values as all
communication happens through object persistence in MongoDB. The
function should accept the following arguments:
- context: PipelineContext object
- stage_name: Name of the stage executing the function
- i_stage: The index of this stage in the pipeline
- n_stages: Number of stages in this pipeline
"""
self.name = name
self._function = function
self._pipeline: Pipeline = None
self.index: int = None
self.dependencies: List[Stage] = []
def add_dependency(self, name):
self.dependencies.append(name)
def signature(self):
return wrapper.si(
self._function.__name__,
self._pipeline.context,
self.name,
self.index,
len(self._pipeline.stages)
)
class Pipeline():
"""Pipeline consists of a list of stages. The pipeline is complete when all
stages are finished.
"""
def __init__(self, context: PipelineContext):
"""
Args:
context: The working context for this pipeline.
"""
self.context = context
self.stages: List[Stage] = []
def add_stage(self, stage: Stage):
"""Adds a stage to this pipeline. The stages are executec in the order
they are added with this function.
Args:
stage: The stage to be added to this pipeline.
"""
stage._pipeline = self
stage.index = len(self.stages)
self.stages.append(stage)
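# A minimal usage sketch: build a single-stage pipeline for a context. The
# context values are hypothetical placeholders; comp_process is the stage
# function defined further below in this module, and Stage.signature() builds
# the Celery signature used to execute each stage.
pipeline_context = PipelineContext(
    'some/path/mainfile', 'parsers/vasp', 'some-calc-id', 'some-upload-id', 'some-worker')
pipeline = Pipeline(pipeline_context)
pipeline.add_stage(Stage('comp_process', comp_process))
stage_signatures = [stage.signature() for stage in pipeline.stages]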
@app.task(bind=True, base=NomadCeleryTask)
def wrapper(task, function_name, context, stage_name, i_stage, n_stages):
"""This function wraps the function calls made within the stages. This
wrapper takes care of common tasks like exception handling and timeout
handling, and also persists the state of the pipelines into MongoDB.
"""
# Create the associated Calc object during the first stage (unless
# reprocessing); otherwise fetch it from Mongo.
calc = None
if i_stage == 0 and not context.re_process:
calc = nomad.processing.data.Calc.create(
calc_id=context.calc_id,
mainfile=context.filepath,
parser=context.parser_name,
worker_hostname=context.worker_hostname,
upload_id=context.upload_id,
current_process="process",
process_status=PROCESS_CALLED,
)
# Get the stage function. If it does not exist, log an error and fail the calculation.
function = globals().get(function_name, None)
if function is None:
if calc is None:
calc = nomad.processing.data.Calc.get(context.calc_id)
calc.fail('Could not find the function associated with the stage.')
return
try:
# Try to execute the stage.
function(context, stage_name, i_stage, n_stages)
except SoftTimeLimitExceeded as e:
if calc is None:
calc = nomad.processing.data.Calc.get(context.calc_id)
logger = calc.get_logger()
logger.error('exceeded the celery task soft time limit')
calc.fail(e)
except Exception as e:
if calc is None:
calc = nomad.processing.data.Calc.get(context.calc_id)
calc.fail(e)
# After the last stage, save the calculation info
if i_stage == n_stages - 1:
if calc is None:
calc = nomad.processing.data.Calc.get(context.calc_id)
calc.process_status = PROCESS_COMPLETED
calc.save()
@app.task(bind=True, base=NomadCeleryTask)
def empty_task(task, *args, **kwargs):
"""Empty dummy task used as a callback for Celery chords.
"""
pass
@app.task(bind=True, base=NomadCeleryTask)
def upload_cleanup(task, upload_id):
"""Task for cleaning up after processing an upload.
"""
upload = nomad.processing.data.Upload.get(upload_id)
upload.processing_finished()
upload.get_logger().info("Cleanup for upload {} finished.".format(upload_id))
def comp_process(context, stage_name, i_stage, n_stages):
"""Function for processing computational entries: runs parsing and
normalization.
"""
# Process calculation
calc = nomad.processing.data.Calc.get(context.calc_id)
if context.re_process is True:
calc.re_process_calc()
calc.get_logger().info("Re-processing of calculation {} at path {} finished.".format(context.filepath, context.calc_id))
else:
calc.process_calc()
calc.get_logger().info("Processing of calculation {} at path {} finished.".format(context.filepath, context.calc_id))
def get_pipeline(context):
"""Used to fetch a pipeline based on a pipeline context. Typically chosen
simply based on a matched parser name that is stored in the context