Commit c74dc07b authored by Markus Scheidgen

Improved archive reliability in case of processing failures.

parent 4fa62c8e
Pipeline #71509 failed with stages in 25 minutes and 27 seconds
@@ -338,9 +338,15 @@ class StagingUploadFiles(UploadFiles):
     def write_archive(self, calc_id: str, data: Any) -> int:
         ''' Writes the data as archive file and returns the archive file size. '''
-        write_archive(self.archive_file_object(calc_id).os_path, 1, data=[(calc_id, data)])
-        with read_archive(self.archive_file_object(calc_id).os_path) as archive:
-            assert calc_id in archive
+        archive_file_object = self.archive_file_object(calc_id)
+        try:
+            write_archive(archive_file_object.os_path, 1, data=[(calc_id, data)])
+        except Exception as e:
+            # in case of failure, remove the possibly corrupted archive file
+            if archive_file_object.exists():
+                archive_file_object.delete()
+
+            raise e

         return self.archive_file_object(calc_id).size
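This change follows a common write-then-clean-up pattern: if serialization fails partway through, the half-written file is deleted before the error propagates, so no corrupted archive can be picked up by later reads (the old read-back assertion is dropped in favor of this cleanup). A minimal standalone sketch of the same idea, using hypothetical names and msgpack as a stand-in serializer:

    import os
    import msgpack  # stand-in serializer; the real code uses nomad's archive format

    def write_then_cleanup(path: str, data: dict) -> int:
        ''' Write data to path; on any failure, remove the partial file and re-raise. '''
        try:
            with open(path, 'wb') as f:
                msgpack.pack(data, f)
        except Exception:
            if os.path.exists(path):
                os.remove(path)  # never leave a corrupted file behind
            raise
        return os.stat(path).st_size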
@@ -523,7 +523,7 @@ class MResource():
         assert section.m_resource == self, 'Can only remove section from the resource that contains it.'
         section.m_resource = None
         self.__data.get(section.m_def).remove(section)
-        if section.m_parent is not None:
+        if section.m_parent is None:
             self.contents.remove(section)

     def all(self, section_cls: Type[MSectionBound]) -> List[MSectionBound]:
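The flipped condition suggests that `contents` only ever tracks root sections, i.e. sections without a parent; child sections are reachable through their parent and are never in that list. A hypothetical sketch of the invariant behind the fix (stand-in classes, not the metainfo API):

    class Resource:
        def __init__(self):
            self.contents = []  # presumably holds only root sections (no parent)

        def remove(self, section):
            section.resource = None
            if section.parent is None:
                # only root sections were added to contents; under the old
                # `is not None` check, removing a child section would call
                # list.remove on an absent element and raise ValueError
                self.contents.remove(section)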
@@ -164,8 +164,18 @@ class Calc(Proc):
         e.g. if this method is called for many entries of the same upload.
         '''
         archive = upload_files.read_archive(self.calc_id)
-        entry_metadata = datamodel.EntryMetadata.m_from_dict(
-            archive[self.calc_id][datamodel.EntryArchive.section_metadata.name].to_dict())
+        try:
+            entry_metadata = datamodel.EntryMetadata.m_from_dict(
+                archive[self.calc_id][datamodel.EntryArchive.section_metadata.name].to_dict())
+        except KeyError:
+            # Due to hard processing failures, an entry might not have an archive.
+            if self._entry_metadata is not None:
+                entry_metadata = self._entry_metadata
+            else:
+                entry_metadata = self.create_metadata()

         entry_metadata.m_update_from_dict(self.metadata)
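The new `KeyError` branch gives metadata loading a graceful degradation path: the persisted archive is preferred, then the in-memory metadata from the current processing run, then a freshly created object. A sketch of that fallback chain with hypothetical names:

    def load_entry_metadata(archive, calc_id, cached_metadata, create_metadata):
        ''' Prefer persisted data, fall back to in-memory state, then to a fresh object. '''
        try:
            return archive[calc_id]['section_metadata']
        except KeyError:
            # a hard processing failure may have left this entry without an archive
            return cached_metadata if cached_metadata is not None else create_metadata()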
@@ -203,7 +213,8 @@ class Calc(Proc):
         def save_to_calc_log(logger, method_name, event_dict):
             try:
-                dump_dict = dict(event_dict)
+                # sanitize the event_dict, because all kinds of values might have been added
+                dump_dict = {key: str(value) for key, value in event_dict.items()}
                 dump_dict.update(level=method_name.upper())

                 self._calc_proc_logs.append(dump_dict)
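`save_to_calc_log` has the `(logger, method_name, event_dict)` signature of a structlog processor; stringifying every value keeps the collected records serializable no matter what objects were bound to the logger. A self-contained sketch of the same sanitization, assuming structlog is the logging framework in use:

    import structlog

    def sanitize(logger, method_name, event_dict):
        # str() every value so the record stays JSON-serializable,
        # even when exceptions or arbitrary objects were bound to the logger
        return {key: str(value) for key, value in event_dict.items()}

    structlog.configure(processors=[sanitize, structlog.processors.JSONRenderer()])
    structlog.get_logger().info('parsed', exc=ValueError('boom'))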
@@ -235,6 +246,7 @@ class Calc(Proc):
         except Exception as e:
             logger.error('could not copy archive for non matching, non reprocessed entry', exc_info=e)
             raise e
+
         # mock the steps of actual processing
         self._continue_with('parsing')
@@ -322,12 +334,7 @@ class Calc(Proc):
                 'could not index after processing failure', exc_info=e)

         try:
-            archive = datamodel.EntryArchive()
-            archive.m_add_sub_section(
-                datamodel.EntryArchive.section_metadata, self._entry_metadata)
-            archive.processing_logs = self._calc_proc_logs
-
-            self.upload_files.write_archive(self.calc_id, archive.m_to_dict())
+            self.write_archive(None)
         except Exception as e:
             self.get_logger().error(
                 'could not write archive after processing failure', exc_info=e)
@@ -361,9 +368,6 @@ class Calc(Proc):
             self.fail('parser raised system exit', error='system exit', **context)
             return

-        self._parser_backend.entry_archive.m_add_sub_section(
-            datamodel.EntryArchive.section_metadata, self._entry_metadata)
-
         if self._parser_backend.status[0] != 'ParseSuccess':
             error = self._parser_backend.status[1]
             self.fail('parser failed', error=error, **context)
@@ -443,13 +447,34 @@ class Calc(Proc):
                 logger, 'archived', step='archive',
                 input_size=self.mainfile_file.size) as log_data:

-            self._parser_backend.entry_archive.processing_logs = self._calc_proc_logs
-            self._calc_proc_logs = None
-
-            archive_data = self._parser_backend.entry_archive.m_to_dict()
-            archive_size = self.upload_files.write_archive(self.calc_id, archive_data)
+            archive_size = self.write_archive(self._parser_backend)
             log_data.update(archive_size=archive_size)

+    def write_archive(self, backend: Backend):
+        if self._calc_proc_logs is None:
+            self._calc_proc_logs = []
+
+        if backend is not None:
+            entry_archive = backend.entry_archive.m_copy()
+        else:
+            entry_archive = datamodel.EntryArchive()
+
+        entry_archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)
+        entry_archive.processing_logs = self._calc_proc_logs
+
+        try:
+            return self.upload_files.write_archive(self.calc_id, entry_archive.m_to_dict())
+        except Exception as e:
+            if backend is None:
+                raise e
+
+            # most likely failed due to domain data; try to write metadata and processing logs only
+            entry_archive = datamodel.EntryArchive()
+            entry_archive.m_add_sub_section(datamodel.EntryArchive.section_metadata, self._entry_metadata)
+            entry_archive.processing_logs = self._calc_proc_logs
+            self.upload_files.write_archive(self.calc_id, entry_archive.m_to_dict())
+
+            raise e

     def __str__(self):
         return 'calc %s calc_id=%s upload_id%s' % (super().__str__(), self.calc_id, self.upload_id)
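The new `Calc.write_archive` centralizes both the normal path (`backend` set) and the failure path (`backend is None`), and adds a second chance: when writing the full archive fails, most likely on parser/domain data, it still persists a minimal archive with just metadata and processing logs before re-raising. The core pattern, sketched with hypothetical names:

    def write_with_fallback(store, calc_id: str, full_archive: dict, minimal_archive: dict) -> int:
        ''' Try the full archive; on failure persist a minimal one, then surface the error. '''
        try:
            return store.write_archive(calc_id, full_archive)
        except Exception:
            # keep at least metadata and processing logs for later diagnosis
            store.write_archive(calc_id, minimal_archive)
            raise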
@@ -510,6 +510,14 @@ class TestM1:
         assert len(resource.all(System)) == 2

+    def test_resource_move(self):
+        resource = MResource()
+        run = resource.create(Run)
+        system = run.m_create(System)
+
+        run = Run()
+        run.m_add_sub_section(Run.systems, system)
+
     def test_mapping(self):
         run = Run()
         run.m_create(Parsing).parser_name = 'test'