Commit e43ab060 authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Performance improvements to re_process for large uploads.

parent 4c5685de
Pipeline #68840 passed with stages
in 17 minutes and 52 seconds
......@@ -166,6 +166,11 @@ class Proc(Document, metaclass=ProcMetaclass):
""" Returns True of an asynchrounous process is currently running. """
return self.process_status is not None and self.process_status != PROCESS_COMPLETED
@staticmethod
def process_running_mongoengine_query():
    """ Return a mongoengine filter dict (for use with ``.objects``) that matches
    documents whose process is currently in progress (called or running). """
    return {'process_status__in': [PROCESS_CALLED, PROCESS_RUNNING]}
def get_logger(self):
return utils.get_logger(
'nomad.processing', task=self.current_task, proc=self.__class__.__name__,
......@@ -198,6 +203,13 @@ class Proc(Document, metaclass=ProcMetaclass):
self.warnings = []
self.worker_hostname = worker_hostname
@staticmethod
def reset_pymongo_update(worker_hostname: str = None):
    """ Return a pymongo ``$set`` payload that resets a document's processing
    state: clears the current task, errors and warnings, marks the tasks
    status pending, and records the given worker hostname. """
    return {
        'current_task': None,
        'tasks_status': PENDING,
        'errors': [],
        'warnings': [],
        'worker_hostname': worker_hostname,
    }
@classmethod
def get_by_id(cls, id: str, id_field: str):
try:
......
......@@ -658,20 +658,22 @@ class Upload(Proc):
self._continue_with('parse_all')
try:
# check if a calc is already/still processing
for calc in Calc.objects(
upload_id=self.upload_id,
**Calc.process_running_mongoengine_query()).exclude('metadata'):
logger.warn('a process is already running on calc', calc_id=calc.calc_id)
# reset all calcs
Calc._get_collection().update(
dict(upload_id=self.upload_id),
{'$set': Calc.reset_pymongo_update(worker_hostname=self.worker_hostname)})
# match and call calc processings
# we use a copy of the mongo queryset; reasons are cursor timeouts and
# changing results on modifying the calc entries
calcs = list(Calc.objects(upload_id=self.upload_id))
calcs = list(Calc.objects(upload_id=self.upload_id).exclude('metadata'))
for calc in calcs:
if calc.process_running:
if calc.current_process == 're_process_calc':
logger.warn('re_process_calc is already running', calc_id=calc.calc_id)
else:
logger.warn('a process is already running on calc', calc_id=calc.calc_id)
continue
calc.reset(worker_hostname=self.worker_hostname)
parser = match_parser(calc.mainfile, staging_upload_files, strict=False)
if parser is None:
logger.error(
......@@ -683,8 +685,12 @@ class Upload(Proc):
'different parser matches during re-process, use new parser',
calc_id=calc.calc_id, parser=parser.name)
calc.re_process_calc()
logger.info('completed to trigger re-process of all calcs')
except Exception as e:
# try to remove the staging copy in failure case
logger.error('failed to trigger re-process of all calcs', exc_info=e)
if staging_upload_files is not None and staging_upload_files.exists():
staging_upload_files.delete()
......
......@@ -225,7 +225,7 @@ def test_re_processing(published: Upload, example_user_metadata, monkeypatch, wi
if not with_failure:
with published.upload_files.archive_log_file(first_calc.calc_id) as f:
old_log_line = f.readline()
old_log_lines = f.readlines()
old_archive_files = list(
archive_file
for archive_file in os.listdir(published.upload_files.os_path)
......@@ -272,8 +272,8 @@ def test_re_processing(published: Upload, example_user_metadata, monkeypatch, wi
# assert changed archive log files
if not with_failure:
with published.upload_files.archive_log_file(first_calc.calc_id) as f:
new_log_line = f.readline()
assert old_log_line != new_log_line
new_log_lines = f.readlines()
assert old_log_lines != new_log_lines
# assert maintained user metadata (mongo+es)
assert_upload_files(upload, PublicUploadFiles, published=True)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment