diff --git a/nomad/config.py b/nomad/config.py
index 002eb5d3a75c64fcdff656395d85d2112fe55548..7b4318b1a6fe32e6eadfd6dd4c6b4a0d4a9a6ccd 100644
--- a/nomad/config.py
+++ b/nomad/config.py
@@ -145,7 +145,7 @@ services = NomadConfig(
     upload_limit=10,
     force_raw_file_decoding=False,
     download_scan_size=500,
-    download_scan_timeout=u'30m'
+    download_scan_timeout=u'30m',
 )
 
 tests = NomadConfig(
diff --git a/nomad/processing/data.py b/nomad/processing/data.py
index 3ee70bcfdedea8e63f4d75b5a6ea1df09ca13e96..e44c1a7a76c023147284e3341b837ffaefa6fa69 100644
--- a/nomad/processing/data.py
+++ b/nomad/processing/data.py
@@ -695,8 +695,6 @@ class Upload(Proc):
         TODO this implementation does not do any re-matching. This will be more complex
         due to handling of new or missing matches.
         '''
-        assert self.published
-
         logger = self.get_logger()
         logger.info('started to re-process')
 
@@ -705,8 +703,21 @@ class Upload(Proc):
 
         # extract the published raw files into a staging upload files instance
         self._continue_with('extracting')
-        public_upload_files = cast(PublicUploadFiles, self.upload_files)
-        staging_upload_files = public_upload_files.to_staging_upload_files(create=True)
+
+        if self.published:
+            try:
+                staging_upload_files = StagingUploadFiles(self.upload_id)
+                # public files exist and there is a staging directory, it is probably old
+                # and we delete it first
+                staging_upload_files.delete()
+                logger.warn('deleted old staging files')
+
+            except KeyError as e:
+                logger.info('reprocessing published files')
+        else:
+            logger.info('reprocessing staging files')
+
+        staging_upload_files = self.upload_files.to_staging_upload_files(create=True)
 
         self._continue_with('parse_all')
         try:
@@ -733,8 +744,9 @@ class Upload(Proc):
 
             # try to remove the staging copy in failure case
             logger.error('failed to trigger re-process of all calcs', exc_info=e)
-            if staging_upload_files is not None and staging_upload_files.exists():
-                staging_upload_files.delete()
+            if self.published:
+                if staging_upload_files is not None and staging_upload_files.exists():
+                    staging_upload_files.delete()
 
             raise e
 
@@ -956,23 +968,26 @@ class Upload(Proc):
 
     def _cleanup_after_re_processing(self):
         logger = self.get_logger()
-        logger.info('started to repack re-processed upload')
+        if self.published:
+            staging_upload_files = self.upload_files.to_staging_upload_files()
+            logger.info('started to repack re-processed upload')
 
-        staging_upload_files = self.upload_files.to_staging_upload_files()
+            with utils.timer(
+                    logger, 'reprocessed staged upload packed', step='repack staged',
+                    upload_size=self.upload_files.size):
 
-        with utils.timer(
-                logger, 'reprocessed staged upload packed', step='delete staged',
-                upload_size=self.upload_files.size):
+                staging_upload_files.pack(self.user_metadata(), skip_raw=True)
 
-            staging_upload_files.pack(self.user_metadata(), skip_raw=True)
+            with utils.timer(
+                    logger, 'reprocessed staged upload deleted', step='delete staged',
+                    upload_size=self.upload_files.size):
 
-        with utils.timer(
-                logger, 'reprocessed staged upload deleted', step='delete staged',
-                upload_size=self.upload_files.size):
+                staging_upload_files.delete()
+                self.last_update = datetime.utcnow()
+                self.save()
 
-            staging_upload_files.delete()
-            self.last_update = datetime.utcnow()
-            self.save()
+        else:
+            logger.info('no cleanup after re-processing unpublished upload')
 
     @task
     def cleanup(self):
diff --git a/tests/processing/test_data.py b/tests/processing/test_data.py
index e25ed455e646e2cf5c18f72cdd40909347da1a82..89ad0bcd009c5fa8d73eb805b15dcd2a0ac27678 100644
--- a/tests/processing/test_data.py
+++ b/tests/processing/test_data.py
@@ -207,7 +207,6 @@ def test_process_non_existing(proc_infra, test_user, with_error):
     assert upload.tasks_status == FAILURE
     assert len(upload.errors) > 0
 
-
 @pytest.mark.timeout(config.tests.default_timeout)
 @pytest.mark.parametrize('with_failure', [None, 'before', 'after', 'not-matched'])
 def test_re_processing(published: Upload, internal_example_user_metadata, monkeypatch, with_failure):
@@ -302,6 +301,36 @@ def test_re_processing(published: Upload, internal_example_user_metadata, monkey
     assert entry_metadata.atoms == []
 
 
+@pytest.mark.parametrize('publish,old_staging', [
+    (False, False), (True, True), (True, False)])
+def test_re_process_staging(non_empty_processed, publish, old_staging):
+    upload = non_empty_processed
+
+    if publish:
+        upload.publish_upload()
+        try:
+            upload.block_until_complete(interval=.01)
+        except Exception:
+            pass
+
+    if old_staging:
+        StagingUploadFiles(upload.upload_id, create=True)
+
+    upload.reset()
+    upload.re_process_upload()
+    try:
+        upload.block_until_complete(interval=.01)
+    except Exception:
+        pass
+
+    assert_processing(upload, published=publish)
+    if publish:
+        with pytest.raises(KeyError):
+            StagingUploadFiles(upload.upload_id)
+    else:
+        StagingUploadFiles(upload.upload_id)
+
+
 @pytest.mark.timeout(config.tests.default_timeout)
 @pytest.mark.parametrize('with_failure', [None, 'before', 'after'])
 def test_re_pack(published: Upload, monkeypatch, with_failure):