Commit 75cf1602 authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Small fixes and delete uploads after failed migration.

parent 236fa67c
......@@ -109,6 +109,10 @@ class CalcProcReproduction:
self.logger.info('identified parser')
parser_backend = parser.run(self.upload_files.raw_file_object(self.mainfile).os_path, logger=self.logger)
if not parser_backend.status[0] == 'ParseSuccess':
self.logger.error('parsing was not successful', status=parser_backend.status)
parser_backend.openNonOverlappingSection('section_calculation_info')
parser_backend.addValue('upload_id', self.upload_id)
parser_backend.addValue('calc_id', self.calc_id)
......
......@@ -102,9 +102,10 @@ def pid_prefix(prefix: int):
@click.option('--delete-local', help='Delete created local upload files after upload.', is_flag=True)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.option('--migration-version', default=0, type=int, help='The version number, only packages with lower or no number will be migrated.')
@click.option('--delete-failed', default='', type=str, help='String from N, U, P to determine if empty (N), failed (U), or failed to publish (P) uploads should be deleted or kept for debugging.')
def upload(
paths: list, pattern: str, create_packages: bool, local: bool, delete_local: bool,
parallel: int, migration_version: int):
parallel: int, migration_version: int, delete_failed: str):
infrastructure.setup_logging()
infrastructure.setup_mongo()
......@@ -120,4 +121,5 @@ def upload(
migration = NomadCOEMigration(migration_version=migration_version, threads=parallel)
migration.migrate(
*paths, local=local, delete_local=delete_local, create_packages=create_packages)
*paths, local=local, delete_local=delete_local, create_packages=create_packages,
delete_failed=delete_failed)
......@@ -344,6 +344,11 @@ class SourceCalc(Document):
SourceCalc.objects.insert(source_calcs)
NO_PROCESSED_CALCS = 0
FAILED_PROCESSING = 1
FAILED_PUBLISH = 2
class NomadCOEMigration:
"""
Drives a migration from the NOMAD coe repository db to nomad@FAIRDI.
......@@ -551,7 +556,7 @@ class NomadCOEMigration:
def migrate(
self, *args, create_packages: bool = True, local: bool = False,
delete_local: bool = False) -> utils.POPO:
delete_local: bool = False, delete_failed: str = '') -> utils.POPO:
"""
Migrate the given uploads.
......@@ -574,6 +579,9 @@ class NomadCOEMigration:
local: Instead of streaming an upload, create a local file and use
local_path on the upload.
delete_local: Delete created local file upload files
delete_failed: String from ``N``, ``U``, ``P`` to determine that uploads with
no processed calcs (N), failed upload processing (U), or failed publish
operation (P) should be deleted after the migration attempt.
Returns: Dictionary with statistics on the migration.
"""
......@@ -594,9 +602,14 @@ class NomadCOEMigration:
overall_report.new_calcs))
def migrate_package(package: Package, of_packages: int):
logger = self.logger.bind(package_id=package.package_id, source_upload_id=package.upload_id)
logger = self.logger.bind(
package_id=package.package_id, source_upload_id=package.upload_id)
try:
package_report = self.migrate_package(package, local=local, delete_local=delete_local)
package_report = self.migrate_package(
package, local=local, delete_local=delete_local,
delete_failed=delete_failed)
except Exception as e:
package_report = Report()
logger.error(
......@@ -694,7 +707,7 @@ class NomadCOEMigration:
def migrate_package(
self, package: Package, local: bool = False,
delete_local: bool = False) -> 'Report':
delete_local: bool = False, delete_failed: str = '') -> 'Report':
""" Migrates the given package. For other params see :func:`migrate`. """
source_upload_id = package.upload_id
......@@ -730,6 +743,27 @@ class NomadCOEMigration:
logger = logger.bind(
source_upload_id=source_upload_id, upload_id=upload.upload_id)
def delete_upload(reason: int):
delete = \
(reason == NO_PROCESSED_CALCS and 'N' in delete_failed) or \
(reason == FAILED_PROCESSING and 'U' in delete_failed) or \
(reason == FAILED_PUBLISH and 'P' in delete_failed)
upload_to_delete = upload
if delete:
sleep = utils.SleepTimeBackoff()
while upload.process_running:
try:
upload_to_delete = self.nomad(
'uploads.delete_upload', upload_id=upload_to_delete.upload_id)
sleep()
except HTTPNotFound:
# the proc upload will be deleted by the delete operation
break
logger.info('deleted upload after migration failure')
# grab source calcs, while waiting for upload
source_calcs = dict()
surrogate_source_calc_with_metadata = None
......@@ -767,6 +801,7 @@ class NomadCOEMigration:
logger.error('failed to process upload', process_errors=upload.errors)
report.failed_packages += 1
report.missing_calcs += report.total_source_calcs
delete_upload(FAILED_PROCESSING)
return report
else:
report.total_calcs += upload.calcs.pagination.total
......@@ -874,11 +909,14 @@ class NomadCOEMigration:
report.calcs_with_diffs = 0
report.new_calcs = 0
report.failed_packages += 1
delete_upload(FAILED_PUBLISH)
else:
SourceCalc.objects(upload=source_upload_id, mainfile__in=calc_mainfiles) \
.update(migration_version=self.migration_version)
package.migration_version = self.migration_version
else:
delete_upload(NO_PROCESSED_CALCS)
logger.info('no successful calcs, skip publish')
report.missing_calcs = report.total_source_calcs - report.migrated_calcs
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment