From 9a070b1468fc8aaf281887a0c2ba77438632c9f2 Mon Sep 17 00:00:00 2001
From: Markus Scheidgen <markus.scheidgen@gmail.com>
Date: Tue, 31 Mar 2020 14:48:27 +0200
Subject: [PATCH] Allow to reprocess staging files.

---
 nomad/config.py               |  2 +-
 nomad/processing/data.py      | 51 ++++++++++++++++++++++-------------
 tests/processing/test_data.py | 31 ++++++++++++++++++++-
 3 files changed, 64 insertions(+), 20 deletions(-)

diff --git a/nomad/config.py b/nomad/config.py
index 002eb5d3a7..7b4318b1a6 100644
--- a/nomad/config.py
+++ b/nomad/config.py
@@ -145,7 +145,7 @@ services = NomadConfig(
     upload_limit=10,
     force_raw_file_decoding=False,
     download_scan_size=500,
-    download_scan_timeout=u'30m'
+    download_scan_timeout=u'30m',
 )
 
 tests = NomadConfig(
diff --git a/nomad/processing/data.py b/nomad/processing/data.py
index 3ee70bcfde..e44c1a7a76 100644
--- a/nomad/processing/data.py
+++ b/nomad/processing/data.py
@@ -695,8 +695,6 @@ class Upload(Proc):
         TODO this implementation does not do any re-matching. This will be more complex
         due to handling of new or missing matches.
         '''
-        assert self.published
-
         logger = self.get_logger()
         logger.info('started to re-process')
 
@@ -705,8 +703,21 @@
 
         # extract the published raw files into a staging upload files instance
         self._continue_with('extracting')
-        public_upload_files = cast(PublicUploadFiles, self.upload_files)
-        staging_upload_files = public_upload_files.to_staging_upload_files(create=True)
+
+        if self.published:
+            try:
+                staging_upload_files = StagingUploadFiles(self.upload_id)
+                # public files exist and there is a staging directory, it is probably old
+                # and we delete it first
+                staging_upload_files.delete()
+                logger.warn('deleted old staging files')
+
+            except KeyError as e:
+                logger.info('reprocessing published files')
+        else:
+            logger.info('reprocessing staging files')
+
+        staging_upload_files = self.upload_files.to_staging_upload_files(create=True)
 
         self._continue_with('parse_all')
         try:
@@ -733,8 +744,9 @@
             # try to remove the staging copy in failure case
             logger.error('failed to trigger re-process of all calcs', exc_info=e)
 
-            if staging_upload_files is not None and staging_upload_files.exists():
-                staging_upload_files.delete()
+            if self.published:
+                if staging_upload_files is not None and staging_upload_files.exists():
+                    staging_upload_files.delete()
 
             raise e
 
@@ -956,23 +968,26 @@
 
     def _cleanup_after_re_processing(self):
         logger = self.get_logger()
-        logger.info('started to repack re-processed upload')
+        if self.published:
+            staging_upload_files = self.upload_files.to_staging_upload_files()
+            logger.info('started to repack re-processed upload')
 
-        staging_upload_files = self.upload_files.to_staging_upload_files()
+            with utils.timer(
+                    logger, 'reprocessed staged upload packed', step='repack staged',
+                    upload_size=self.upload_files.size):
 
-        with utils.timer(
-                logger, 'reprocessed staged upload packed', step='delete staged',
-                upload_size=self.upload_files.size):
+                staging_upload_files.pack(self.user_metadata(), skip_raw=True)
 
-            staging_upload_files.pack(self.user_metadata(), skip_raw=True)
+            with utils.timer(
+                    logger, 'reprocessed staged upload deleted', step='delete staged',
+                    upload_size=self.upload_files.size):
 
-        with utils.timer(
-                logger, 'reprocessed staged upload deleted', step='delete staged',
-                upload_size=self.upload_files.size):
+                staging_upload_files.delete()
+                self.last_update = datetime.utcnow()
+                self.save()
 
-            staging_upload_files.delete()
-            self.last_update = datetime.utcnow()
-            self.save()
+        else:
+            logger.info('no cleanup after re-processing unpublished upload')
 
     @task
     def cleanup(self):
diff --git a/tests/processing/test_data.py b/tests/processing/test_data.py
index e25ed455e6..89ad0bcd00 100644
--- a/tests/processing/test_data.py
+++ b/tests/processing/test_data.py
@@ -207,7 +207,6 @@ def test_process_non_existing(proc_infra, test_user, with_error):
     assert upload.tasks_status == FAILURE
     assert len(upload.errors) > 0
 
-
 @pytest.mark.timeout(config.tests.default_timeout)
 @pytest.mark.parametrize('with_failure', [None, 'before', 'after', 'not-matched'])
 def test_re_processing(published: Upload, internal_example_user_metadata, monkeypatch, with_failure):
@@ -302,6 +301,36 @@
             assert entry_metadata.atoms == []
 
 
+@pytest.mark.parametrize('publish,old_staging', [
+    (False, False), (True, True), (True, False)])
+def test_re_process_staging(non_empty_processed, publish, old_staging):
+    upload = non_empty_processed
+
+    if publish:
+        upload.publish_upload()
+        try:
+            upload.block_until_complete(interval=.01)
+        except Exception:
+            pass
+
+    if old_staging:
+        StagingUploadFiles(upload.upload_id, create=True)
+
+    upload.reset()
+    upload.re_process_upload()
+    try:
+        upload.block_until_complete(interval=.01)
+    except Exception:
+        pass
+
+    assert_processing(upload, published=publish)
+    if publish:
+        with pytest.raises(KeyError):
+            StagingUploadFiles(upload.upload_id)
+    else:
+        StagingUploadFiles(upload.upload_id)
+
+
 @pytest.mark.timeout(config.tests.default_timeout)
 @pytest.mark.parametrize('with_failure', [None, 'before', 'after'])
 def test_re_pack(published: Upload, monkeypatch, with_failure):
-- 
GitLab
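
A minimal sketch of driving the reworked re_process_upload, modelled on the
test_re_process_staging added above. It only uses calls that appear in this
patch; the import locations (nomad.files, nomad.processing.data) and the
re_process helper itself are assumptions for illustration, not part of the
patch.

    from nomad.files import StagingUploadFiles
    from nomad.processing.data import Upload

    def re_process(upload: Upload, published: bool):
        # With this patch, re-processing works for published and staging
        # uploads alike: published uploads are extracted into a fresh staging
        # copy (a stale copy is deleted first), staging uploads are simply
        # re-processed in place.
        upload.reset()
        upload.re_process_upload()
        upload.block_until_complete(interval=.01)

        if published:
            # _cleanup_after_re_processing repacked and deleted the staging
            # copy, so constructing StagingUploadFiles must fail again
            try:
                StagingUploadFiles(upload.upload_id)
                raise AssertionError('staging copy should have been deleted')
            except KeyError:
                pass
        else:
            # unpublished uploads keep their staging files; no repack happens
            StagingUploadFiles(upload.upload_id)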