Commit 9a070b14 authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Allow to reprocess staging files.

parent c76f40c3
......@@ -145,7 +145,7 @@ services = NomadConfig(
upload_limit=10,
force_raw_file_decoding=False,
download_scan_size=500,
download_scan_timeout=u'30m'
download_scan_timeout=u'30m',
)
tests = NomadConfig(
......
......@@ -695,8 +695,6 @@ class Upload(Proc):
TODO this implementation does not do any re-matching. This will be more complex
due to handling of new or missing matches.
'''
assert self.published
logger = self.get_logger()
logger.info('started to re-process')
......@@ -705,8 +703,21 @@ class Upload(Proc):
# extract the published raw files into a staging upload files instance
self._continue_with('extracting')
public_upload_files = cast(PublicUploadFiles, self.upload_files)
staging_upload_files = public_upload_files.to_staging_upload_files(create=True)
if self.published:
try:
staging_upload_files = StagingUploadFiles(self.upload_id)
# public files exist and there is a staging directory, it is probably old
# and we delete it first
staging_upload_files.delete()
logger.warn('deleted old staging files')
except KeyError as e:
logger.info('reprocessing published files')
else:
logger.info('reprocessing staging files')
staging_upload_files = self.upload_files.to_staging_upload_files(create=True)
self._continue_with('parse_all')
try:
......@@ -733,8 +744,9 @@ class Upload(Proc):
# try to remove the staging copy in failure case
logger.error('failed to trigger re-process of all calcs', exc_info=e)
if staging_upload_files is not None and staging_upload_files.exists():
staging_upload_files.delete()
if self.published:
if staging_upload_files is not None and staging_upload_files.exists():
staging_upload_files.delete()
raise e
......@@ -956,23 +968,26 @@ class Upload(Proc):
def _cleanup_after_re_processing(self):
    '''
    Cleanup after a completed re-processing run.

    For published uploads the re-processed staging copy is repacked into the
    public files and then deleted; unpublished (staging) uploads keep their
    staging files, so there is nothing to clean up.

    NOTE(review): this body was reconstructed from a scraped diff that
    interleaved old and new lines — confirm against the repository.
    '''
    logger = self.get_logger()

    if self.published:
        logger.info('started to repack re-processed upload')
        staging_upload_files = self.upload_files.to_staging_upload_files()

        with utils.timer(
                logger, 'reprocessed staged upload packed', step='repack staged',
                upload_size=self.upload_files.size):

            # raw files are unchanged by re-processing, only archives are repacked
            staging_upload_files.pack(self.user_metadata(), skip_raw=True)

        with utils.timer(
                logger, 'reprocessed staged upload deleted', step='delete staged',
                upload_size=self.upload_files.size):

            staging_upload_files.delete()
            self.last_update = datetime.utcnow()
            self.save()
    else:
        logger.info('no cleanup after re-processing unpublished upload')
@task
def cleanup(self):
......
......@@ -207,7 +207,6 @@ def test_process_non_existing(proc_infra, test_user, with_error):
assert upload.tasks_status == FAILURE
assert len(upload.errors) > 0
@pytest.mark.timeout(config.tests.default_timeout)
@pytest.mark.parametrize('with_failure', [None, 'before', 'after', 'not-matched'])
def test_re_processing(published: Upload, internal_example_user_metadata, monkeypatch, with_failure):
......@@ -302,6 +301,36 @@ def test_re_processing(published: Upload, internal_example_user_metadata, monkey
assert entry_metadata.atoms == []
@pytest.mark.parametrize('publish,old_staging', [
    (False, False), (True, True), (True, False)])
def test_re_process_staging(non_empty_processed, publish, old_staging):
    '''
    Re-processing must work for both published and staging uploads. For a
    published upload, a leftover staging directory from an earlier run must be
    removed; for a staging upload, the staging files must survive.
    '''
    upload = non_empty_processed

    def wait_for_completion():
        # Best-effort wait: a failed processing run still counts as "done" here.
        try:
            upload.block_until_complete(interval=.01)
        except Exception:
            pass

    if publish:
        upload.publish_upload()
        wait_for_completion()

        if old_staging:
            # Simulate a stale staging directory left behind by a prior run.
            StagingUploadFiles(upload.upload_id, create=True)

    upload.reset()
    upload.re_process_upload()
    wait_for_completion()

    assert_processing(upload, published=publish)

    if publish:
        # Published uploads must not retain a staging copy after re-processing.
        with pytest.raises(KeyError):
            StagingUploadFiles(upload.upload_id)
    else:
        # Staging uploads keep their files; constructing must not raise.
        StagingUploadFiles(upload.upload_id)
@pytest.mark.timeout(config.tests.default_timeout)
@pytest.mark.parametrize('with_failure', [None, 'before', 'after'])
def test_re_pack(published: Upload, monkeypatch, with_failure):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment