diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 0d79bfec493a29bc064cb6c58f736ace8746d0ae..e4669f81305e562009056b3530c445545da1d5ae 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -68,8 +68,6 @@ tests: image: $TEST_IMAGE services: - rabbitmq - - name: redis:5.0.8-alpine - alias: redis - name: docker.elastic.co/elasticsearch/elasticsearch:6.3.2 alias: elastic # fix issue with running elastic in gitlab ci runner: @@ -80,7 +78,6 @@ tests: RABBITMQ_DEFAULT_USER: rabbitmq RABBITMQ_DEFAULT_PASS: rabbitmq RABBITMQ_DEFAULT_VHOST: / - NOMAD_REDIS_HOST: redis NOMAD_RABBITMQ_HOST: rabbitmq NOMAD_ELASTIC_HOST: elastic NOMAD_MONGO_HOST: mongo diff --git a/nomad/processing/base.py b/nomad/processing/base.py index 66bcc7037abbf4a475406e8746c0179a9fbf7961..51cb19072d7b96fac916f745876de09ace783a74 100644 --- a/nomad/processing/base.py +++ b/nomad/processing/base.py @@ -16,7 +16,7 @@ from typing import List, Any, Dict import logging import time import os -from celery import Task +from celery import Celery, Task from celery.worker.request import Request from celery.signals import after_setup_task_logger, after_setup_logger, worker_process_init, \ celeryd_after_setup, worker_process_shutdown @@ -30,7 +30,6 @@ from datetime import datetime import functools from nomad import config, utils, infrastructure -from nomad.processing.celeryapp import app import nomad.patch # pylint: disable=unused-import @@ -75,6 +74,14 @@ def on_worker_process_shutdown(*args, **kwargs): disconnect() +app = Celery('nomad.processing', broker=config.rabbitmq_url()) +app.conf.update(worker_hijack_root_logger=False) +app.conf.update(worker_max_memory_per_child=config.celery.max_memory) +if config.celery.routing == config.CELERY_WORKER_ROUTING: + app.conf.update(worker_direct=True) + +app.conf.task_queue_max_priority = 10 + CREATED = 'CREATED' PENDING = 'PENDING' RUNNING = 'RUNNING' @@ -528,7 +535,10 @@ def unwarp_task(task, cls_name, self_id, *args, **kwargs): return self -@app.task(bind=True, base=NomadCeleryTask) +@app.task( + bind=True, base=NomadCeleryTask, ignore_results=True, max_retries=3, + acks_late=config.celery.acks_late, soft_time_limit=config.celery.timeout, + time_limit=config.celery.timeout * 2) def proc_task(task, cls_name, self_id, func_attr): ''' The celery task that is used to execute async process functions. diff --git a/nomad/processing/celeryapp.py b/nomad/processing/celeryapp.py deleted file mode 100644 index 70f4dc77422d18ce561d3a6cfc49e67606080b05..0000000000000000000000000000000000000000 --- a/nomad/processing/celeryapp.py +++ /dev/null @@ -1,41 +0,0 @@ -# Copyright 2018 Markus Scheidgen -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an"AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -This module contains the Celery configuration. -""" -from celery import Celery -from nomad import config - -# Celery is configured to use redis as a results backend. Although the results -# are not forwarded within the processing pipeline, celery requires the results -# backend to be configured in order to use chained tasks. -app = Celery( - 'nomad.processing', - backend=config.redis_url(), - broker=config.rabbitmq_url(), -) -app.conf.update(accept_content=["myjson"]) -app.conf.update(task_serializer="myjson") -app.conf.update(result_serializer="myjson") -app.conf.update(worker_hijack_root_logger=False) -app.conf.update(worker_max_memory_per_child=config.celery.max_memory) -app.conf.update(soft_task_time_limit=2 * config.celery.timeout) -app.conf.update(task_time_limit=config.celery.timeout) -app.conf.update(task_acks_late=config.celery.acks_late) -app.conf.update(task_ignore_result=True) -app.Task.max_retries = 3 -if config.celery.routing == config.CELERY_WORKER_ROUTING: - app.conf.update(worker_direct=True) -app.conf.task_queue_max_priority = 10 diff --git a/nomad/processing/data.py b/nomad/processing/data.py index a213afe68cc78e428932c9e3a256a4904afc098b..70027dc4e45e6ca84649dc2f2d27a0457e1eaeea 100644 --- a/nomad/processing/data.py +++ b/nomad/processing/data.py @@ -23,7 +23,8 @@ calculations, and files .. autoclass:: Upload ''' -from typing import cast, List, Any, Iterator, Dict, cast, Iterable + +from typing import cast, List, Any, Tuple, Iterator, Dict, cast, Iterable from mongoengine import StringField, DateTimeField, DictField, BooleanField, IntField import logging from structlog import wrap_logger @@ -31,16 +32,14 @@ from contextlib import contextmanager import os.path from datetime import datetime from pymongo import UpdateOne -from celery.utils import worker_direct import hashlib from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper from nomad import utils, config, infrastructure, search, datamodel from nomad.files import PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles, PublicUploadFiles, StagingUploadFiles -from nomad.processing.base import Proc, process, task, ProcessAlreadyRunning, PENDING, SUCCESS, FAILURE, PROCESS_CALLED, PROCESS_COMPLETED +from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE from nomad.parsing import parser_dict, match_parser, Backend from nomad.normalizing import normalizers -from nomad.processing.pipelines import run_pipelines, PipelineContext def _pack_log_event(logger, method_name, event_dict): @@ -231,6 +230,7 @@ class Calc(Proc): return wrap_logger(logger, processors=_log_processors + [save_to_calc_log]) + @process def re_process_calc(self): ''' Processes a calculation again. This means there is already metadata and @@ -293,6 +293,7 @@ class Calc(Proc): except Exception as e: logger.error('could unload processing results', exc_info=e) + @process def process_calc(self): ''' Processes a new calculation that has no prior records in the mongo, elastic, @@ -350,6 +351,14 @@ class Calc(Proc): self.get_logger().error( 'could not write archive after processing failure', exc_info=e) + def on_process_complete(self, process_name): + # the save might be necessary to correctly read the join condition from the db + self.save() + # in case of error, the process_name might be unknown + if process_name == 'process_calc' or process_name == 're_process_calc' or process_name is None: + self.upload.reload() + self.upload.check_join() + @task def parsing(self): ''' The *task* that encapsulates all parsing related actions. ''' @@ -510,6 +519,7 @@ class Upload(Proc): published: Boolean that indicates the publish status publish_time: Date when the upload was initially published last_update: Date of the last publishing/re-processing + joined: Boolean indicates if the running processing has joined (:func:`check_join`) ''' id_field = 'upload_id' @@ -525,11 +535,12 @@ class Upload(Proc): publish_time = DateTimeField() last_update = DateTimeField() + joined = BooleanField(default=False) + meta: Any = { 'indexes': [ 'user_id', 'tasks_status', 'process_status', 'published', 'upload_time' - ], - 'strict': False # ignore extra fields to support older entries with join related fields + ] } def __init__(self, **kwargs): @@ -728,7 +739,7 @@ class Upload(Proc): self._continue_with('parse_all') try: - # Check if a calc is already/still processing + # check if a calc is already/still processing processing = Calc.objects( upload_id=self.upload_id, **Calc.process_running_mongoengine_query()).count() @@ -738,37 +749,13 @@ class Upload(Proc): 'processes are still/already running on calc, they will be resetted', count=processing) - # Reset all calcs + # reset all calcs Calc._get_collection().update_many( dict(upload_id=self.upload_id), {'$set': Calc.reset_pymongo_update(worker_hostname=self.worker_hostname)}) - # Resolve queue and priority - queue = None - if config.celery.routing == config.CELERY_WORKER_ROUTING and self.worker_hostname is not None: - queue = worker_direct(self.worker_hostname).name - priority = config.celery.priorities.get('%s.%s' % ("Calc", "re_process"), 1) - - # Re-process all calcs - def pipeline_generator(): - query = dict(upload_id=self.upload_id) - running_query = dict(Calc.process_running_mongoengine_query()) - running_query.update(query) - if Calc.objects(**running_query).first() is not None: - raise ProcessAlreadyRunning('Tried to call a processing function on an already processing process.') - Calc._get_collection().update_many(query, {'$set': dict( - current_process="re_process_calc", - process_status=PROCESS_CALLED)}) - for calc in Calc.objects(**query): - yield PipelineContext( - calc.mainfile, - calc.parser, - calc.calc_id, - calc.upload_id, - calc.worker_hostname, - re_process=True - ) - run_pipelines(pipeline_generator(), self.upload_id, queue, priority) + # process call calcs + Calc.process_all(Calc.re_process_calc, dict(upload_id=self.upload_id), exclude=['metadata']) logger.info('completed to trigger re-process of all calcs') except Exception as e: @@ -796,6 +783,7 @@ class Upload(Proc): self._continue_with('cleanup') self.upload_files.re_pack(self.user_metadata()) + self.joined = True self._complete() @process @@ -883,54 +871,34 @@ class Upload(Proc): self.staging_upload_files.raw_file_object(path).os_path, self.staging_upload_files.raw_file_object(stripped_path).os_path)) - def match_mainfiles(self) -> Iterator[PipelineContext]: - """Generator function that iterates over files in an upload and returns - basic information for each found mainfile. + def match_mainfiles(self) -> Iterator[Tuple[str, object]]: + ''' + Generator function that matches all files in the upload to all parsers to + determine the upload's mainfiles. Returns: - PipelineContext - """ + Tuples of mainfile, filename, and parsers + ''' directories_with_match: Dict[str, str] = dict() upload_files = self.staging_upload_files - for filepath in upload_files.raw_file_manifest(): - self._preprocess_files(filepath) + for filename in upload_files.raw_file_manifest(): + self._preprocess_files(filename) try: - parser = match_parser(upload_files.raw_file_object(filepath).os_path) + parser = match_parser(upload_files.raw_file_object(filename).os_path) if parser is not None: - directory = os.path.dirname(filepath) + directory = os.path.dirname(filename) if directory in directories_with_match: # TODO this might give us the chance to store directory based relationship # between calcs for the future? pass else: - directories_with_match[directory] = filepath - yield PipelineContext( - filepath, - parser.name, - upload_files.calc_id(filepath), - self.upload_id, - self.worker_hostname - ) + directories_with_match[directory] = filename + + yield filename, parser except Exception as e: self.get_logger().error( 'exception while matching pot. mainfile', - mainfile=filepath, exc_info=e) - - def processing_started(self): - """Informs MongoDB that this Upload has started processing. - """ - # Tell Upload that a process has been started. - self.current_process = "process" - self.process_status = PROCESS_CALLED - self.save() - - def processing_finished(self): - """Informs MongoDB that this Upload has finished processing. - """ - # Tell Upload that a process has been started. - self.process_status = PROCESS_COMPLETED - self.save() - self.cleanup() + mainfile=filename, exc_info=e) @task def parse_all(self): @@ -943,30 +911,54 @@ class Upload(Proc): with utils.timer( logger, 'upload extracted', step='matching', upload_size=self.upload_files.size): + for filename, parser in self.match_mainfiles(): + calc = Calc.create( + calc_id=self.upload_files.calc_id(filename), + mainfile=filename, parser=parser.name, + worker_hostname=self.worker_hostname, + upload_id=self.upload_id) - # Tell Upload that a process has been started. - self.processing_started() - - # Resolve queue and priority - queue = None - if config.celery.routing == config.CELERY_WORKER_ROUTING and self.worker_hostname is not None: - queue = worker_direct(self.worker_hostname).name - priority = config.celery.priorities.get('%s.%s' % ("Calc", "process"), 1) + calc.process_calc() - # Start running all pipelines - n_pipelines = run_pipelines(self.match_mainfiles(), self.upload_id, queue, priority) + def on_process_complete(self, process_name): + if process_name == 'process_upload' or process_name == 're_process_upload': + self.check_join() - # If the upload has not spawned any pipelines, tell it that it is - # finished and perform cleanup - if n_pipelines == 0: - self.processing_finished() + def check_join(self): + ''' + Performs an evaluation of the join condition and triggers the :func:`cleanup` + task if necessary. The join condition allows to run the ``cleanup`` after + all calculations have been processed. The upload processing stops after all + calculation processings have been triggered (:func:`parse_all` or + :func:`re_process_upload`). The cleanup task is then run within the last + calculation process (the one that triggered the join by calling this method). + ''' + total_calcs = self.total_calcs + processed_calcs = self.processed_calcs + + self.get_logger().debug('check join', processed_calcs=processed_calcs, total_calcs=total_calcs) + # check if process is not running anymore, i.e. not still spawining new processes to join + # check the join condition, i.e. all calcs have been processed + if not self.process_running and processed_calcs >= total_calcs: + # this can easily be called multiple times, e.g. upload finished after all calcs finished + modified_upload = self._get_collection().find_one_and_update( + {'_id': self.upload_id, 'joined': {'$ne': True}}, + {'$set': {'joined': True}}) + if modified_upload is not None: + self.get_logger().debug('join') + self.cleanup() + else: + # the join was already done due to a prior call + pass def reset(self): + self.joined = False super().reset() @classmethod def reset_pymongo_update(cls, worker_hostname: str = None): update = super().reset_pymongo_update() + update.update(joined=False) return update def _cleanup_after_processing(self): diff --git a/nomad/processing/pipelines.py b/nomad/processing/pipelines.py deleted file mode 100644 index 4a7435c42e4d4ab99f4d70ec2920123c98b16dea..0000000000000000000000000000000000000000 --- a/nomad/processing/pipelines.py +++ /dev/null @@ -1,330 +0,0 @@ -# Copyright 2018 Markus Scheidgen -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an"AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -This module contains the objects and setups for running pipelines. -""" -from typing import List -from collections import defaultdict -import networkx as nx -from celery import chain, group, chord -from celery.exceptions import SoftTimeLimitExceeded -from nomad.processing.base import NomadCeleryTask, PROCESS_CALLED, PROCESS_COMPLETED -from nomad.processing.celeryapp import app -import nomad.processing.data - -import json -from kombu.serialization import register - - -def my_dumps(obj): - """Custom JSON encoder function for Celery tasks. - """ - class MyEncoder(json.JSONEncoder): - def default(self, obj): # pylint: disable=E0202 - if isinstance(obj, PipelineContext): - return obj.encode() - else: - return json.JSONEncoder.default(self, obj) - - return json.dumps(obj, cls=MyEncoder) - - -def my_loads(obj): - """Custom JSON decoder function for Celery tasks. - """ - def my_decoder(obj): - if '__type__' in obj: - if obj['__type__'] == '__pipelinecontext__': - return PipelineContext.decode(obj) - return obj - return json.loads(obj, object_hook=my_decoder) - - -register("myjson", my_dumps, my_loads, content_type='application/x-myjson', content_encoding='utf-8') - - -class PipelineContext(): - """Convenience class for storing pipeline execution related information. - Provides custom encode/decode functions for JSON serialization with Celery. - """ - def __init__(self, filepath, parser_name, calc_id, upload_id, worker_hostname, re_process=False): - self.filepath = filepath - self.parser_name = parser_name - self.calc_id = calc_id - self.upload_id = upload_id - self.worker_hostname = worker_hostname - self.re_process = re_process - - def encode(self): - return { - "__type__": "__pipelinecontext__", - "filepath": self.filepath, - "parser_name": self.parser_name, - "calc_id": self.calc_id, - "upload_id": self.upload_id, - "worker_hostname": self.worker_hostname, - "re_process": self.re_process, - } - - @staticmethod - def decode(data): - return PipelineContext( - data["filepath"], - data["parser_name"], - data["calc_id"], - data["upload_id"], - data["worker_hostname"], - data["re_process"], - ) - - -class Stage(): - """Stage comprises of a single python function. After this function is - completed, the stage is completed. - """ - def __init__(self, name: str, function): - """ - Args: - name: Name of the stage. The name is used in resolving stage - dependencies. - function: A regular python function that will be executed during - this stage. The function should not return any values as all - communication happens through object persistence in MongoDB. The - function should accept the following arguments: - - - context: PipelineContext object - - stage_name: Name of the stage executing the function - - i_stage: The index of this stage in the pipeline - - n_stages: Number of stages in this pipeline - """ - self.name = name - self._function = function - self._pipeline: Pipeline = None - self.index: int = None - self.dependencies: List[Stage] = [] - - def add_dependency(self, name): - self.dependencies.append(name) - - def signature(self): - return wrapper.si( - self._function.__name__, - self._pipeline.context, - self.name, - self.index, - len(self._pipeline.stages) - ) - - -class Pipeline(): - """Pipeline consists of a list of stages. The pipeline is complete when all - stages are finished. - """ - def __init__(self, context: PipelineContext): - """ - Args: - context: The working context for this pipeline. - """ - self.context = context - self.stages: List[Stage] = [] - - def add_stage(self, stage: Stage): - """Adds a stage to this pipeline. The stages are executec in the order - they are added with this function. - - Args: - stage: The stage to be added to this pipeline. - """ - stage._pipeline = self - stage.index = len(self.stages) - self.stages.append(stage) - - -@app.task(bind=True, base=NomadCeleryTask) -def wrapper(task, function_name, context, stage_name, i_stage, n_stages): - """This function wraps the function calls made within the stages. This - wrapper takes care of common tasks like exception handling and timeout - handling, and also persists the state of the pipelines in to MongoDB. - """ - # Create the associated Calc object during first stage (and if not - # reprocessing), otherwise get it from Mongo - calc = None - if i_stage == 0 and not context.re_process: - calc = nomad.processing.data.Calc.create( - calc_id=context.calc_id, - mainfile=context.filepath, - parser=context.parser_name, - worker_hostname=context.worker_hostname, - upload_id=context.upload_id, - current_process="process", - process_status=PROCESS_CALLED, - ) - - # Get the defined function. If does not exist, log error and fail calculation. - function = globals().get(function_name, None) - if function is None: - if calc is None: - calc = nomad.processing.data.Calc.get(context.calc_id) - calc.fail('Could not find the function associated with the stage.') - try: - # Try to execute the stage. - function(context, stage_name, i_stage, n_stages) - except SoftTimeLimitExceeded as e: - if calc is None: - calc = nomad.processing.data.Calc.get(context.calc_id) - logger = calc.get_logger() - logger.error('exceeded the celery task soft time limit') - calc.fail(e) - except Exception as e: - if calc is None: - calc = nomad.processing.data.Calc.get(context.calc_id) - calc.fail(e) - - # After the last stage, save the calculation info - if i_stage == n_stages - 1: - if calc is None: - calc = nomad.processing.data.Calc.get(context.calc_id) - calc.process_status = PROCESS_COMPLETED - calc.save() - - -@app.task(bind=True, base=NomadCeleryTask) -def empty_task(task, *args, **kwargs): - """Empty dummy task used as a callback for Celery chords. - """ - pass - - -@app.task(bind=True, base=NomadCeleryTask) -def upload_cleanup(task, upload_id): - """Task for cleaning up after processing an upload. - """ - upload = nomad.processing.data.Upload.get(upload_id) - upload.processing_finished() - upload.get_logger().info("Cleanup for upload {} finished.".format(upload_id)) - - -def comp_process(context, stage_name, i_stage, n_stages): - """Function for processing computational entries: runs parsing and - normalization. - """ - # Process calculation - calc = nomad.processing.data.Calc.get(context.calc_id) - if context.re_process is True: - calc.re_process_calc() - calc.get_logger().info("Re-processing of calculation {} at path {} finished.".format(context.filepath, context.calc_id)) - else: - calc.process_calc() - calc.get_logger().info("Processing of calculation {} at path {} finished.".format(context.filepath, context.calc_id)) - - -def get_pipeline(context): - """Used to fetch a pipeline based on a pipeline context. Typically chosen - simply based on a matched parser name that is stored in the context - - Args: - context: The context based on which the pipeline is chosen. - - Returns: - Pipeline: The pipeline to execute for the given context. - """ - pipeline = Pipeline(context) - - # Phonopy pipeline - if context.parser_name == "parsers/phonopy": - stage1 = Stage("comp_process_phonopy", comp_process) - stage1.add_dependency("comp_process") - pipeline.add_stage(stage1) - # DFT pipeline - else: - stage1 = Stage("comp_process", comp_process) - pipeline.add_stage(stage1) - - return pipeline - - -def run_pipelines(context_generator, upload_id, queue, priority) -> int: - """Used to start running pipelines based on the PipelineContext objects - generated by the given generator. Currently the implementation searches all - the mainfiles in an upload before starting to execute any stages. This - makes synchronization with Celery much easier. - - Returns: - The number of pipelines that were started. - """ - # Resolve all pipelines into disconnected dependency trees and run - # each tree in parallel. - stage_dependencies = [] - stages: defaultdict = defaultdict(list) - stage_names = set() - n_pipelines = 0 - for context in context_generator: - pipeline = get_pipeline(context) - n_pipelines += 1 - for stage in pipeline.stages: - - # Store stage names to be used as nodes - stage_names.add(stage.name) - - # Store dependencies to be used as edges - for dependency in stage.dependencies: - stage_dependencies.append((stage.name, dependency)) - stages[stage.name].append(stage) - - if n_pipelines != 0: - # Resolve all independent dependency trees - dependency_graph = nx.DiGraph() - dependency_graph.add_nodes_from(stage_names) - dependency_graph.add_edges_from(stage_dependencies) - dependency_trees = nx.weakly_connected_components(dependency_graph) - - # Form chains for each independent tree. - chains = [] - for tree_nodes in dependency_trees: - tree = dependency_graph.subgraph(tree_nodes).copy() - sorted_nodes = nx.topological_sort(tree) - groups = [] - for node in reversed(list(sorted_nodes)): - # Group all tasks for a stage - tasks = stages[node] - task_signatures = [] - for task in tasks: - task_signatures.append(task.signature().set(queue=queue, priority=priority)) - task_group = group(*task_signatures) - - # Celery does not allow directly chaining groups. To simulate - # this behaviour, we instead wrap the groups inside chords - # which can be chained. The callback is a dummy function that - # does nothing. - groups.append(chord(task_group, body=empty_task.si())) - - # Form a chain of stages for this tree - stage_chain = chain(*groups) - chains.append(stage_chain) - - # This is is the group that will start executing all independent stage trees parallelly. - tree_group = group(*chains) - - # After all trees are finished, the upload should perform cleanup - final_chain = chain(tree_group, upload_cleanup.si(upload_id)) - res = final_chain.delay() - - # According to Celery docs we need to release the result resources: - # https://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#keeping-result - # None of the results are ever used anyways, the result backend is configured - # only to keep the state of the task chains. - res.forget() - - return n_pipelines diff --git a/tests/conftest.py b/tests/conftest.py index 64e5e86fe044c168d6dec38558c2a613ac9aa5a6..1e4796f18ccece6e55540da875158777fb0322d5 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -29,12 +29,10 @@ import elasticsearch.exceptions from typing import List import json import logging -from kombu.serialization import register from nomad import config, infrastructure, parsing, processing, app, utils from nomad.utils import structlogging from nomad.datamodel import User -from nomad.processing.pipelines import my_dumps, my_loads from tests import test_parsing from tests.normalizing.conftest import run_normalize @@ -116,17 +114,9 @@ def celery_includes(): @pytest.fixture(scope='session') def celery_config(): - - # Custom JSON encode/decode - register("myjson", my_dumps, my_loads, content_type='application/x-myjson', content_encoding='utf-8') - return { 'broker_url': config.rabbitmq_url(), - 'result_backend': config.redis_url(), - 'task_queue_max_priority': 10, - 'accept_content': ["myjson"], - 'task_serializer': "myjson", - 'result_serializer': "myjson", + 'task_queue_max_priority': 10 }