Commit 337e62bd authored by Markus Scheidgen

Allow publishing failed calcs.

parent 58a8a1fc
Pipeline #45803 failed with stages in 76 minutes and 58 seconds
@@ -222,7 +222,10 @@ class Calc(Base):
                 permission=(1 if calc.with_embargo else 0))
             repo_db.add(user_metadata)
-        spacegroup = Spacegroup(calc=self, n=calc.spacegroup)
+        if isinstance(calc.spacegroup, int) or calc.spacegroup.isdigit():
+            spacegroup = Spacegroup(calc=self, n=calc.spacegroup)
+        else:
+            spacegroup = Spacegroup(calc=self, n='0')
         repo_db.add(spacegroup)

         # topic based properties
......
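For context: the new guard accepts real integers and digit-only strings, and falls back to spacegroup '0' for anything else, such as the 'not processed' placeholder introduced below. A minimal sketch of that behavior, with an illustrative helper name:

def spacegroup_number(spacegroup) -> str:
    # accept real integers and numeric strings, fall back to '0' otherwise
    if isinstance(spacegroup, int) or spacegroup.isdigit():
        return str(spacegroup)
    return '0'

assert spacegroup_number(225) == '225'
assert spacegroup_number('225') == '225'
assert spacegroup_number('not processed') == '0'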
@@ -107,7 +107,8 @@ services = NomadConfig(
     api_base_path='/nomad/api',
     api_secret='defaultApiSecret',
     admin_password='password',
-    disable_reset=True
+    disable_reset=True,
+    not_processed_value='not processed'
 )
......
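The new not_processed_value key rides on NomadConfig's keyword-argument style. A hedged sketch assuming NomadConfig behaves like a dict with attribute access (the actual class may differ):

class NomadConfig(dict):
    # hedged stand-in: assume attribute access maps to dict keys
    def __getattr__(self, name):
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name)

services = NomadConfig(disable_reset=True, not_processed_value='not processed')
assert services.not_processed_value == 'not processed'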
@@ -400,7 +400,9 @@ class StagingUploadFiles(UploadFiles):
             archive_zip = archive_restricted_zip if calc.with_embargo else archive_public_zip

             archive_filename = '%s.%s' % (calc.calc_id, self._archive_ext)
-            archive_zip.write(self._archive_dir.join_file(archive_filename).os_path, archive_filename)
+            archive_file = self._archive_dir.join_file(archive_filename)
+            if archive_file.exists():
+                archive_zip.write(archive_file.os_path, archive_filename)

             archive_log_filename = '%s.%s' % (calc.calc_id, 'log')
             log_file = self._archive_dir.join_file(archive_log_filename)
......
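The point of the existence check: a failed calc never produced an archive file, so packing must skip it rather than raise. A hedged sketch of the same guard using plain pathlib/zipfile instead of nomad's file abstractions (add_archive is an illustrative name):

import zipfile
from pathlib import Path

def add_archive(archive_zip: zipfile.ZipFile, archive_dir: Path, calc_id: str, ext: str = 'json') -> None:
    archive_file = archive_dir / ('%s.%s' % (calc_id, ext))
    # failed calcs never produced an archive file; skip instead of raising
    if archive_file.exists():
        archive_zip.write(archive_file, archive_file.name)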
@@ -39,7 +39,7 @@ import shutil

 from nomad import utils, infrastructure, files, config
 from nomad.coe_repo import User, Calc, LoginException
 from nomad.datamodel import CalcWithMetadata
-from nomad.processing import FAILURE, SUCCESS
+from nomad.processing import FAILURE

 default_pid_prefix = 7000000
@@ -814,9 +814,9 @@ class NomadCOEMigration:
                     calc_id=calc_proc.calc_id,
                     mainfile=calc_proc.mainfile)

-                if calc_proc.tasks_status == SUCCESS:
-                    calc_mainfiles.append(calc_proc.mainfile)
-                else:
+                calc_mainfiles.append(calc_proc.mainfile)
+
+                if calc_proc.tasks_status == FAILURE:
                     report.failed_calcs += 1
                     calc_logger.info(
                         'could not process a calc', process_errors=calc_proc.errors)
@@ -844,12 +844,15 @@ class NomadCOEMigration:
                     report.migrated_calcs += 1

                 calc_logger = logger.bind(calc_id=calc['calc_id'], mainfile=calc['mainfile'])
-                try:
-                    if not self.validate(calc, source_calc_with_metadata, calc_logger):
-                        report.calcs_with_diffs += 1
-                except Exception as e:
-                    calc_logger.warning('unexpected exception during validation', exc_info=e)
-                    report.calcs_with_diffs += 1
+                if calc.get('processed', False):
+                    try:
+                        if not self.validate(
+                                calc, source_calc_with_metadata, calc_logger):
+                            report.calcs_with_diffs += 1
+                    except Exception as e:
+                        calc_logger.warning(
+                            'unexpected exception during validation', exc_info=e)
+                        report.calcs_with_diffs += 1
             else:
                 calc_logger.info('processed a calc that has no source')
                 report.new_calcs += 1
@@ -988,7 +991,7 @@ class Report(utils.POPO):
         self.skipped_packages = 0
         self.total_calcs = 0  # the calcs that have been found by the target
         self.total_source_calcs = 0  # the calcs in the source index
-        self.failed_calcs = 0  # the calcs found b the target that could not be processed/published
+        self.failed_calcs = 0  # calcs that have been migrated with failed processing
         self.migrated_calcs = 0  # the calcs from the source, successfully added to the target
         self.calcs_with_diffs = 0  # the calcs from the source, successfully added to the target with different metadata
         self.new_calcs = 0  # the calcs successfully added to the target that were not found in the source
......
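The reworked migration loop appends every mainfile and merely counts failures, which is what the new failed_calcs comment describes. A self-contained sketch of that control flow, with SimpleNamespace stand-ins for the real objects:

from types import SimpleNamespace

FAILURE, SUCCESS = 'FAILURE', 'SUCCESS'  # stand-ins for nomad.processing's constants

def collect(calc_procs, report):
    calc_mainfiles = []
    for calc_proc in calc_procs:
        calc_mainfiles.append(calc_proc.mainfile)  # failed calcs are included now
        if calc_proc.tasks_status == FAILURE:
            report.failed_calcs += 1
    return calc_mainfiles

report = SimpleNamespace(failed_calcs=0)
calcs = [
    SimpleNamespace(mainfile='a/vasp.xml', tasks_status=SUCCESS),
    SimpleNamespace(mainfile='b/vasp.xml', tasks_status=FAILURE)]
assert collect(calcs, report) == ['a/vasp.xml', 'b/vasp.xml']
assert report.failed_calcs == 1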
@@ -70,7 +70,7 @@ class Calc(Proc):
     meta: Any = {
         'indexes': [
-            'upload_id', 'mainfile', 'parser', 'tasks_status'
+            'upload_id', 'mainfile', 'parser', 'tasks_status', 'process_status'
         ]
     }
@@ -147,6 +147,20 @@ class Calc(Proc):
             logger.error('calculation upload does not exist')

         try:
+            # save preliminary minimum calc metadata in case processing fails
+            # successful processing will replace it with the actual metadata
+            calc_with_metadata = CalcWithMetadata(
+                upload_id=self.upload_id,
+                calc_id=self.calc_id,
+                calc_hash=self.upload_files.calc_hash(self.mainfile),
+                mainfile=self.mainfile)
+            calc_with_metadata.published = False
+            calc_with_metadata.uploader = self.upload.uploader.to_popo()
+            calc_with_metadata.nomad_version = config.version
+            calc_with_metadata.last_processing = datetime.now()
+            calc_with_metadata.files = self.upload_files.calc_files(self.mainfile)
+            self.metadata = calc_with_metadata.to_dict()
+
             self.parsing()
             self.normalizing()
             self.archiving()
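The pattern here is "persist minimal metadata before running any task", so that a crash mid-processing still leaves a usable record. A hedged sketch of the pattern with a plain dict as the store (names are illustrative):

from datetime import datetime

def run_tasks(calc_id: str) -> None:
    pass  # stand-in for parsing/normalizing/archiving

def process(store: dict, calc_id: str, mainfile: str) -> None:
    # write a minimal record first so a crash still leaves usable metadata
    store[calc_id] = {
        'calc_id': calc_id,
        'mainfile': mainfile,
        'last_processing': datetime.now().isoformat(),
        'processed': False,
    }
    run_tasks(calc_id)
    store[calc_id]['processed'] = True  # only after all tasks succeed

store: dict = {}
process(store, 'calc-1', 'vasp.xml')
assert store['calc-1']['processed']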
@@ -159,6 +173,28 @@ class Calc(Proc):
             except Exception as e:
                 logger.error('could not close calculation proc log', exc_info=e)

+    def fail(self, *errors, log_level=logging.ERROR, **kwargs):
+        # in case of failure, index a minimum set of metadata and mark
+        # processing failure
+        try:
+            calc_with_metadata = CalcWithMetadata(**self.metadata)
+            calc_with_metadata.formula = config.services.not_processed_value
+            calc_with_metadata.basis_set = config.services.not_processed_value
+            calc_with_metadata.xc_functional = config.services.not_processed_value
+            calc_with_metadata.system = config.services.not_processed_value
+            calc_with_metadata.crystal_system = config.services.not_processed_value
+            calc_with_metadata.spacegroup = config.services.not_processed_value
+            calc_with_metadata.spacegroup_symbol = config.services.not_processed_value
+            calc_with_metadata.code_name = config.services.not_processed_value
+            calc_with_metadata.code_version = config.services.not_processed_value
+            calc_with_metadata.processed = False
+            self.metadata = calc_with_metadata.to_dict()
+
+            search.Entry.from_calc_with_metadata(calc_with_metadata).save()
+        except Exception as e:
+            self.get_logger().error('could not index after processing failure', exc_info=e)
+
+        super().fail(*errors, log_level=log_level, **kwargs)
+
     def on_process_complete(self, process_name):
         # the save might be necessary to correctly read the join condition from the db
         self.save()
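A possible tightening of the fail() override: the nine placeholder assignments could be driven by a loop. A hedged sketch (attribute names taken from the diff, helper name illustrative):

def mark_not_processed(calc_with_metadata, placeholder: str = 'not processed') -> None:
    # the nine quantities the commit fills with config.services.not_processed_value
    for quantity in ('formula', 'basis_set', 'xc_functional', 'system',
                     'crystal_system', 'spacegroup', 'spacegroup_symbol',
                     'code_name', 'code_version'):
        setattr(calc_with_metadata, quantity, placeholder)
    calc_with_metadata.processed = False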
@@ -183,10 +219,12 @@ class Calc(Proc):
                 exc_info=e, error=str(e), **context)
             return

+        # add the non code specific calc metadata to the backend
+        # all other quantities have been determined by parsers/normalizers
         self._parser_backend.openNonOverlappingSection('section_calculation_info')
         self._parser_backend.addValue('upload_id', self.upload_id)
         self._parser_backend.addValue('calc_id', self.calc_id)
-        self._parser_backend.addValue('calc_hash', self.upload_files.calc_hash(self.mainfile))
+        self._parser_backend.addValue('calc_hash', self.metadata['calc_hash'])
         self._parser_backend.addValue('main_file', self.mainfile)
         self._parser_backend.addValue('parser_name', self.parser)
@@ -202,8 +240,7 @@ class Calc(Proc):
         self._parser_backend.openNonOverlappingSection('section_repository_info')
         self._parser_backend.addValue('repository_archive_gid', '%s/%s' % (self.upload_id, self.calc_id))
-        self._parser_backend.addValue(
-            'repository_filepaths', self.upload_files.calc_files(self.mainfile))
+        self._parser_backend.addValue('repository_filepaths', self.metadata['files'])
         self._parser_backend.closeNonOverlappingSection('section_repository_info')

         self.add_processor_info(self.parser)
@@ -267,16 +304,12 @@ class Calc(Proc):
         logger = self.get_logger()

         calc_with_metadata = self._parser_backend.to_calc_with_metadata()
-        calc_with_metadata.published = False
-        calc_with_metadata.uploader = self.upload.uploader.to_popo()
+        calc_with_metadata.update(**self.metadata)
         calc_with_metadata.processed = True
-        calc_with_metadata.last_processing = datetime.now()
-        calc_with_metadata.nomad_version = config.version
-
-        # persist the repository metadata
-        with utils.timer(logger, 'saved repo metadata', step='metadata'):
+        # persist the calc metadata
+        with utils.timer(logger, 'saved calc metadata', step='metadata'):
             self.metadata = calc_with_metadata.to_dict()
             self.save()

         # index in search
         with utils.timer(logger, 'indexed', step='index'):
@@ -336,7 +369,7 @@ class Upload(Proc):
     meta: Any = {
         'indexes': [
-            'user_id', 'tasks_status'
+            'user_id', 'tasks_status', 'process_status', 'published'
         ]
     }
@@ -528,6 +561,7 @@ class Upload(Proc):
             if self.temporary:
                 os.remove(self.upload_path)
                 self.upload_path = None
+
         except KeyError:
             self.fail('processing requested for non existing upload', log_level=logging.ERROR)
             return
@@ -666,19 +700,20 @@ class Upload(Proc):
             uploader=utils.POPO(id=int(self.user_id)),
             upload_time=self.upload_time if user_upload_time is None else user_upload_time)

-        def apply_metadata(calc):
+        def get_metadata(calc: Calc):
             """
             Assemble metadata from calc's processed calc metadata and the uploads
             user metadata.
             """
+            calc_data = calc.metadata
+            calc_with_metadata = CalcWithMetadata(**calc_data)
+
             calc_metadata = dict(upload_metadata)
             calc_metadata.update(calc_metadatas.get(calc.mainfile, {}))
             calc_with_metadata.apply_user_metadata(calc_metadata)
             return calc_with_metadata

-        # TODO publish failed calcs
-        # result.calcs = [apply_metadata(calc) for calc in Calc.objects(upload_id=self.upload_id)]
-        result.calcs = [apply_metadata(calc) for calc in self.calcs]
+        result.calcs = [get_metadata(calc) for calc in Calc.objects(upload_id=self.upload_id)]

         return result
......
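The decisive line for this commit's goal is the new comprehension source: a query over all calcs of the upload instead of self.calcs, which, judging by the removed TODO, excluded failed calcs. A toy sketch of the effect:

# toy data: one processed and one failed calc of the same upload
all_calcs = [
    {'calc_id': '1', 'processed': True},
    {'calc_id': '2', 'processed': False}]
result_calcs = [calc for calc in all_calcs]  # no success filter any more
assert len(result_calcs) == 2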
@@ -82,6 +82,7 @@ class Entry(Document):
     with_embargo = Boolean()
     published = Boolean()
+    processed = Boolean()

     authors = Object(User, multi=True)
     owners = Object(User, multi=True)
@@ -124,6 +125,7 @@ class Entry(Document):
         self.calc_id = source.calc_id
         self.calc_hash = source.calc_hash
         self.pid = None if source.pid is None else str(source.pid)
+        self.processed = source.processed
         self.mainfile = source.mainfile
         if source.files is None:
......
@@ -46,4 +46,7 @@ sysctl --system
 systemctl daemon-reload
 systemctl restart kubelet

-echo "Still have to use kubeadm init/join"
\ No newline at end of file
+echo "Still have to use kubeadm init/join"
+echo "Run on master node to create join command:"
+echo "kubeadm token create --print-join-command"
+echo "Run join command here"
\ No newline at end of file
@@ -451,7 +451,10 @@ class SMTPServerFixture:

 @pytest.fixture(scope='session')
-def smtpd(request):
+def smtpd(request, monkeysession):
+    # on some local machines resolving the local machine takes quite a while and
+    # is irrelevant for testing
+    monkeysession.setattr('socket.getfqdn', lambda *args, **kwargs: 'local.server')
     fixture = SMTPServerFixture()
     request.addfinalizer(fixture.close)
     return fixture
......
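The new smtpd signature relies on a session-scoped monkeypatch fixture named monkeysession, which is not part of this diff. It is commonly defined along these lines (a sketch, not the project's actual fixture):

import pytest
from _pytest.monkeypatch import MonkeyPatch

@pytest.fixture(scope='session')
def monkeysession():
    # session-scoped counterpart of pytest's function-scoped monkeypatch
    mpatch = MonkeyPatch()
    yield mpatch
    mpatch.undo()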
@@ -30,8 +30,7 @@ from tests.test_coe_repo import assert_coe_upload

 def test_send_mail(mails, monkeypatch):
     monkeypatch.setattr('nomad.config.mail.enabled', True)
-    infrastructure.send_mail('test name', 'test@email.de', 'test message', 'subjct')
+    infrastructure.send_mail('test name', 'test@email.de', 'test message', 'subject')

     for message in mails.messages:
         assert re.search(r'test message', message.data.decode('utf-8')) is not None
@@ -124,6 +123,33 @@ def test_publish(non_empty_processed: Upload, no_warn, example_user_metadata, mo
     assert_search_upload(upload, additional_keys, published=True)


+def test_publish_failed(
+        non_empty_uploaded: Tuple[str, str], example_user_metadata, test_user,
+        monkeypatch, proc_infra, with_publish_to_coe_repo):
+    mock_failure(Calc, 'parsing', monkeypatch)
+
+    processed = run_processing(non_empty_uploaded, test_user)
+    processed.metadata = example_user_metadata
+
+    additional_keys = ['with_embargo']
+    if with_publish_to_coe_repo:
+        additional_keys.append('pid')
+
+    processed.publish_upload()
+    try:
+        processed.block_until_complete(interval=.01)
+    except Exception:
+        pass
+
+    upload = processed.to_upload_with_metadata()
+
+    if with_publish_to_coe_repo:
+        assert_coe_upload(upload.upload_id, user_metadata=example_user_metadata)
+
+    assert_upload_files(upload, PublicUploadFiles, additional_keys, published=True)
+    assert_search_upload(upload, additional_keys, published=True, processed=False)
+
+
 @pytest.mark.timeout(10)
 def test_processing_with_warning(proc_infra, test_user, with_warn):
     example_file = 'tests/data/proc/examples_with_warning_template.zip'
@@ -143,6 +169,16 @@ def test_process_non_existing(proc_infra, test_user, with_error):
     assert len(upload.errors) > 0


+def mock_failure(cls, task, monkeypatch):
+    def mock(self):
+        raise Exception('fail for test')
+
+    mock.__name__ = task
+    mock = task_decorator(mock)
+
+    monkeypatch.setattr('nomad.processing.data.%s.%s' % (cls.__name__, task), mock)
+
+
 @pytest.mark.parametrize('task', ['extracting', 'parse_all', 'cleanup', 'parsing'])
 @pytest.mark.timeout(10)
 def test_task_failure(monkeypatch, uploaded, task, proc_infra, test_user, with_error):
@@ -154,12 +190,7 @@ def test_task_failure(monkeypatch, uploaded, task, proc_infra, test_user, with_e
     else:
         assert False

-    def mock(self):
-        raise Exception('fail for test')
-
-    mock.__name__ = task
-    mock = task_decorator(mock)
-    monkeypatch.setattr('nomad.processing.data.%s.%s' % (cls.__name__, task), mock)
+    mock_failure(cls, task, monkeypatch)

     # run the test
     upload = run_processing(uploaded, test_user)
......
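mock_failure sets mock.__name__ before decorating, presumably because task_decorator keys the task on the wrapped function's name. A hedged stand-in illustrating that assumption (the real decorator lives in nomad.processing and may differ):

import functools

def task_decorator(func):
    # hedged stand-in: assume the real decorator registers the task
    # under func.__name__, hence mock_failure's name assignment
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    wrapper.task_name = func.__name__
    return wrapper

def mock(self):
    raise Exception('fail for test')

mock.__name__ = 'parsing'
mock = task_decorator(mock)
assert mock.task_name == 'parsing'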
@@ -213,9 +213,9 @@ mirgation_test_specs = [
     ('new_calc', 'new_calc', dict(migrated=2, source=2, new=1)),
     ('missing_calc', 'missing_calc', dict(migrated=1, source=2, missing=1)),
     ('missmatch', 'missmatch', dict(migrated=2, source=2, diffs=1)),
-    ('failed_calc', 'failed_calc', dict(migrated=1, source=2, diffs=0, missing=1, failed=1)),
+    ('failed_calc', 'failed_calc', dict(migrated=2, source=2, diffs=0, missing=0, failed=1)),
     ('failed_upload', 'baseline', dict(migrated=0, source=2, missing=2, errors=1)),
-    ('failed_publish', 'baseline', dict(migrated=0, source=2, missing=2, failed=2, errors=1))
+    ('failed_publish', 'baseline', dict(migrated=0, source=2, missing=2, errors=1, not_migrated=2))
 ]
@@ -244,7 +244,7 @@ def perform_migration_test(migrate_infra, name, test_directory, assertions, monk
     migrate_infra.migration.set_pid_prefix(pid_prefix)
     report = migrate_infra.migration.migrate(upload_path)

-    assert report.total_calcs == assertions.get('migrated', 0) + assertions.get('new', 0) + assertions.get('failed', 0)
+    assert report.total_calcs == assertions.get('migrated', 0) + assertions.get('new', 0) + assertions.get('not_migrated', 0)

     # assert if new, diffing, migrated calcs where detected correctly
     assert report.total_source_calcs == assertions.get('source', 0)
......
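To check the new bookkeeping by hand against the updated failed_calc spec: with both calcs migrated (one of them failed) and nothing new or unmigrated, the assertion holds:

# the updated 'failed_calc' row: both calcs reach the target, one failed
assertions = dict(migrated=2, source=2, diffs=0, missing=0, failed=1)
total_calcs = (assertions.get('migrated', 0) + assertions.get('new', 0)
               + assertions.get('not_migrated', 0))
assert total_calcs == 2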