Commit 9b192d2c authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Call calc processing in reprocess upload via mongo update_many.

parent f7f98c71
Pipeline #68848 passed with stages
in 13 minutes and 47 seconds
......@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List, Any
from typing import List, Any, Dict
import logging
import time
import os
......@@ -341,6 +341,47 @@ class Proc(Document, metaclass=ProcMetaclass):
time.sleep(interval)
self.reload()
@classmethod
def process_all(cls, func, query: Dict[str, Any], exclude: List[str] = None):
    """
    Allows to run process functions for all objects on the given query. Calling
    process functions through the func:`process` wrapper might be slow, because
    it causes a save on each call. This function will use a query based update to
    do the same for all objects at once.

    Arguments:
        func: the (possibly ``@process``-wrapped) function to run on each object.
        query: mongoengine query kwargs selecting the objects to process.
        exclude: optional list of field names to exclude when loading the
            objects from mongo (e.g. large fields not needed for dispatch).
            Defaults to no exclusions.

    Raises:
        ProcessAlreadyRunning: if any object matching *query* is already
            running a process.
    """
    # NOTE: the default was a mutable ``[]``; use None-sentinel to avoid the
    # shared-mutable-default pitfall (behavior for callers is unchanged).
    if exclude is None:
        exclude = []

    # refuse to start if any matching object is already processing
    running_query = dict(cls.process_running_mongoengine_query())
    running_query.update(query)
    if cls.objects(**running_query).first() is not None:
        raise ProcessAlreadyRunning('Tried to call a processing function on an already processing process.')

    # mark all matching documents as "process called" in one bulk update,
    # instead of an individual save per document
    cls._get_collection().update_many(query, {'$set': dict(
        current_process=func.__name__,
        process_status=PROCESS_CALLED)})

    # dispatch the actual (celery) task for each object
    for obj in cls.objects(**query).exclude(*exclude):
        obj._run_process(func)
def _run_process(self, func):
    """
    Dispatch *func* as a celery task for this document and return the
    result of ``proc_task.apply_async``. Unwraps ``@process``-decorated
    functions first so the task refers to the raw implementation.
    """
    # unwrap the @process decoration, if present
    if hasattr(func, '__process_unwrapped'):
        func = getattr(func, '__process_unwrapped')

    func_name = func.__name__
    cls_name = self.__class__.__name__
    obj_id = str(self.id)

    # with worker routing enabled, pin the task to this object's worker host
    queue = None
    if config.celery.routing == config.CELERY_WORKER_ROUTING and self.worker_hostname is not None:
        queue = worker_direct(self.worker_hostname).name

    priority = config.celery.priorities.get('%s.%s' % (cls_name, func_name), 1)

    logger = utils.get_logger(__name__, cls=cls_name, id=obj_id, func=func_name)
    logger.debug('calling process function', queue=queue, priority=priority)

    return proc_task.apply_async(
        args=[cls_name, obj_id, func_name],
        queue=queue, priority=priority)
def __str__(self):
    """Short human-readable form: the celery task id and worker hostname."""
    fields = (self.celery_task_id, self.worker_hostname)
    return 'proc celery_task_id=%s worker_hostname=%s' % fields
......@@ -549,21 +590,7 @@ def process(func):
self.process_status = PROCESS_CALLED
self.save()
self_id = self.id.__str__()
cls_name = self.__class__.__name__
queue = None
if config.celery.routing == config.CELERY_WORKER_ROUTING and self.worker_hostname is not None:
queue = worker_direct(self.worker_hostname).name
priority = config.celery.priorities.get('%s.%s' % (cls_name, func.__name__), 1)
logger = utils.get_logger(__name__, cls=cls_name, id=self_id, func=func.__name__)
logger.debug('calling process function', queue=queue, priority=priority)
return proc_task.apply_async(
args=[cls_name, self_id, func.__name__],
queue=queue, priority=priority)
self._run_process(func)
task = getattr(func, '__task_name', None)
if task is not None:
......
......@@ -682,12 +682,14 @@ class Upload(Proc):
calc_id=calc.calc_id)
elif calc.parser != parser.name:
calc.parser = parser.name
calc.save()
logger.info(
'different parser matches during re-process, use new parser',
calc_id=calc.calc_id, parser=parser.name)
calc.re_process_calc()
logger.info('completed to trigger re-process of all calcs')
Calc.process_all(Calc.re_process_calc, dict(upload_id=self.upload_id), exclude=['metadata'])
logger.info('completed to trigger re-process of all calcs')
except Exception as e:
# try to remove the staging copy in failure case
logger.error('failed to trigger re-process of all calcs', exc_info=e)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment