Commit 3844e282 authored by Markus Scheidgen
Browse files

Make sure that failed calcs are also re-processed. Fixes #183

parent 1e63505b
Pipeline #52393 canceled with stages
in 6 minutes and 43 seconds
......@@ -153,9 +153,15 @@ class Calc(Proc):
logger = self.get_logger()
try:
self.metadata['nomad_version'] = config.version
self.metadata['nomad_commit'] = config.commit
self.metadata['last_processing'] = datetime.now()
calc_with_metadata = datamodel.CalcWithMetadata(**self.metadata)
calc_with_metadata.upload_id = self.upload_id
calc_with_metadata.calc_id = self.calc_id
calc_with_metadata.calc_hash = self.upload_files.calc_hash(self.mainfile)
calc_with_metadata.mainfile = self.mainfile
calc_with_metadata.nomad_version = config.version
calc_with_metadata.nomad_commit = config.commit
calc_with_metadata.last_processing = datetime.now()
self.metadata = calc_with_metadata.to_dict()
self.parsing()
self.normalizing()
......@@ -626,7 +632,7 @@ class Upload(Proc):
public_upload_files.to_staging_upload_files(create=True)
self._continue_with('parse_all')
for calc in self.calcs:
for calc in Calc.objects(upload_id=self.upload_id):
calc.reset()
calc.re_process_calc()
......
......@@ -95,6 +95,7 @@
"sampling_method": "geometry_optimization"
}
],
UNPARSABLE
"section_frame_sequence": [
{
"_name": "section_frame_sequence",
......@@ -107,4 +108,4 @@
]
}
]
}
\ No newline at end of file
}
......@@ -240,58 +240,56 @@ def test_process_non_existing(proc_infra, test_user, with_error):
assert len(upload.errors) > 0
def test_re_processing(non_empty_processed: Upload, no_warn, example_user_metadata, with_publish_to_coe_repo, monkeypatch):
processed = non_empty_processed
# publish
processed.compress_and_set_metadata(example_user_metadata)
additional_keys = ['with_embargo']
if with_publish_to_coe_repo:
additional_keys.append('pid')
processed.publish_upload()
try:
processed.block_until_complete(interval=.01)
except Exception:
pass
processed.reload()
assert processed.published
assert processed.upload_files.to_staging_upload_files() is None
old_upload_time = processed.last_update
first_calc = processed.calcs.first()
@pytest.mark.timeout(config.tests.default_timeout)
@pytest.mark.parametrize('with_failure', [None, 'before', 'after'])
def test_re_processing(published: Upload, example_user_metadata, monkeypatch, with_failure):
if with_failure == 'before':
calc = published.all_calcs(0, 1).first()
calc.tasks_status = FAILURE
calc.errors = ['example error']
calc.save()
assert published.failed_calcs > 0
assert published.published
assert published.upload_files.to_staging_upload_files() is None
with_failure = with_failure == 'after'
old_upload_time = published.last_update
first_calc = published.all_calcs(0, 1).first()
old_calc_time = first_calc.metadata['last_processing']
old_archive_files = list(
archive_file
for archive_file in os.listdir(processed.upload_files.os_path)
for archive_file in os.listdir(published.upload_files.os_path)
if 'archive' in archive_file)
for archive_file in old_archive_files:
with open(processed.upload_files.join_file(archive_file).os_path, 'wt') as f:
with open(published.upload_files.join_file(archive_file).os_path, 'wt') as f:
f.write('')
if with_failure:
raw_files = 'tests/data/proc/examples_template_unparsable.zip'
else:
raw_files = 'tests/data/proc/examples_template_different_atoms.zip'
shutil.copyfile(
'tests/data/proc/examples_template_different_atoms.zip',
processed.upload_files.join_file('raw-restricted.plain.zip').os_path)
raw_files, published.upload_files.join_file('raw-restricted.plain.zip').os_path)
upload = processed.to_upload_with_metadata(example_user_metadata)
upload = published.to_upload_with_metadata(example_user_metadata)
# reprocess
monkeypatch.setattr('nomad.config.version', 're_process_test_version')
monkeypatch.setattr('nomad.config.commit', 're_process_test_commit')
processed.re_process_upload()
published.re_process_upload()
try:
processed.block_until_complete(interval=.01)
published.block_until_complete(interval=.01)
except Exception:
pass
processed.reload()
published.reload()
first_calc.reload()
# assert new process time
assert processed.last_update > old_upload_time
assert published.last_update > old_upload_time
assert first_calc.metadata['last_processing'] > old_calc_time
# assert new process version
......@@ -299,23 +297,21 @@ def test_re_processing(non_empty_processed: Upload, no_warn, example_user_metada
assert first_calc.metadata['nomad_commit'] == 're_process_test_commit'
# assert changed archive files
for archive_file in old_archive_files:
assert os.path.getsize(processed.upload_files.join_file(archive_file).os_path) > 0
# assert maintained user metadata (mongo+es+coe)
if with_publish_to_coe_repo:
assert_coe_upload(upload.upload_id, user_metadata=example_user_metadata)
if not with_failure:
for archive_file in old_archive_files:
assert os.path.getsize(published.upload_files.join_file(archive_file).os_path) > 0
# assert maintained user metadata (mongo+es)
assert_upload_files(upload, PublicUploadFiles, published=True)
assert_search_upload(upload, additional_keys, published=True)
if with_publish_to_coe_repo and config.repository_db.mode == 'coe':
assert(os.path.exists(os.path.join(config.fs.coe_extracted, upload.upload_id)))
assert_processing(Upload.get(upload.upload_id, include_published=True), published=True)
assert_search_upload(upload, published=True)
if not with_failure:
assert_processing(Upload.get(upload.upload_id, include_published=True), published=True)
# assert changed calc metadata (mongo)
assert first_calc.metadata['atoms'][0] == 'H'
if not with_failure:
assert first_calc.metadata['atoms'][0] == 'H'
else:
assert first_calc.metadata['atoms'][0] == 'Br'
def mock_failure(cls, task, monkeypatch):
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment