diff --git a/nomad/client/migration.py b/nomad/client/migration.py index 20f8ff237595adb7c5f12885dad788ead894fb38..babadbf606c0423b4ae99ce0babebe10118fc88f 100644 --- a/nomad/client/migration.py +++ b/nomad/client/migration.py @@ -23,7 +23,7 @@ import multiprocessing import queue import json -from nomad import config, infrastructure +from nomad import config, infrastructure, utils from nomad.migration import NomadCOEMigration, SourceCalc, Package from .main import cli @@ -99,7 +99,39 @@ def reset(delete_packages: bool): def determine_upload_paths(paths, pattern=None): - if pattern is not None: + if len(paths) == 1 and paths[0].endswith('.json'): + with open(paths[0], 'rt') as f: + data = json.load(f) + + if isinstance(data, list): + items = data + else: + if pattern is not None: + key = pattern + else: + key = 'uploads_with_no_package' + + items = [] + for item in data[key]: + if isinstance(item, str): + items.append(item) + else: + items.append(item['id']) + + paths = [] + for upload_id in items: + exists = False + for prefix in ['/nomad/repository/data/extracted', '/nomad/repository/data/uploads']: + path = os.path.join(prefix, upload_id) + if os.path.exists(path): + exists = True + paths.append(path) + + if not exists: + utils.get_logger(__name__).error( + 'source upload does not exist', source_upload_id=upload_id) + + elif pattern is not None: assert len(paths) == 1, "Can only apply pattern on a single directory." 
path = paths[0] if pattern == "ALL": @@ -195,9 +227,10 @@ def upload( @migration.command(help='Get an report about not migrated calcs.') -def missing(): +@click.option('--use-cache', is_flag=True, help='Skip processing steps and take results from prior runs') +def missing(use_cache): infrastructure.setup_logging() infrastructure.setup_mongo() - report = SourceCalc.missing() + report = SourceCalc.missing(use_cache=use_cache) print(json.dumps(report, indent=2)) diff --git a/nomad/migration.py b/nomad/migration.py index f95c5b68634c5c6aec6ced53ef504336877fec89..9fa759603d4453cfd05ad0fc44fd6a72b20d2415 100644 --- a/nomad/migration.py +++ b/nomad/migration.py @@ -149,6 +149,9 @@ class Package(Document): report = DictField() """ The report of the last successful migration of this package """ + migration_failure = StringField() + """ String that describe the cause for last failed migration attempt """ + meta = dict(indexes=['upload_id', 'migration_version']) @classmethod @@ -394,12 +397,12 @@ class SourceCalc(Document): _dataset_cache: dict = {} @staticmethod - def missing(): + def missing(use_cache=False): """ Produces data about non migrated calcs """ tmp_data_path = '/tmp/nomad_migration_missing.json' - if os.path.exists(tmp_data_path): + if os.path.exists(tmp_data_path) and use_cache: with open(tmp_data_path, 'rt') as f: data = utils.POPO(**json.load(f)) else: @@ -407,14 +410,14 @@ class SourceCalc(Document): try: # get source_uploads that have non migrated calcs - if data.step < 1: + if data.step < 1 or not use_cache: import re data.source_uploads = SourceCalc._get_collection() \ .find({'migration_version': {'$lt': 0}, 'mainfile': {'$not': re.compile(r'^aflowlib_data.*')}}) \ .distinct('upload') data.step = 1 - if data.step < 2: + if data.step < 2 or not use_cache: source_uploads = [] data.packages = utils.POPO() data.uploads_with_no_package = [] @@ -426,9 +429,10 @@ class SourceCalc(Document): calcs = SourceCalc.objects(upload=source_upload).count() packages = 
Package.objects(upload_id=source_upload).count() source_uploads.append(dict( - id=source_upload, packages=packages, calcs=calcs, + id=source_upload, package_count=packages, + packages=package.packages, calcs=calcs, path=package.upload_path)) - source_uploads = sorted(source_uploads, key=lambda k: k['calcs']) + source_uploads = sorted(source_uploads, key=lambda k: k['calcs'], reverse=True) data.source_uploads = source_uploads data.step = 2 finally: @@ -775,8 +779,9 @@ class NomadCOEMigration: except Exception as e: package_report = Report() package_report.failed_packages = 1 - logger.error( - 'unexpected exception while migrating packages', exc_info=e) + event = 'unexpected exception while migrating packages' + package.migration_failure = event + ': ' + str(e) + logger.error(event, exc_info=e) finally: package.report = package_report package.migration_version = self.migration_version @@ -852,7 +857,9 @@ class NomadCOEMigration: assert upload is None, 'duplicate upload name' upload = a_upload except Exception as e: - self.logger.error('could verify if upload already exists', exc_info=e) + event = 'could not verify if upload already exists' + self.logger.error(event, exc_info=e) + package.migration_failure = event + ': ' + str(e) report.failed_packages += 1 return report @@ -863,7 +870,9 @@ class NomadCOEMigration: upload = self.call_api( 'uploads.upload', name=package_id, local_path=package.package_path) except Exception as e: - self.logger.error('could not upload package', exc_info=e) + event = 'could not upload package' + self.logger.error(event, exc_info=e) + package.migration_failure = event + ': ' + str(e) report.failed_packages += 1 return report else: @@ -934,7 +943,9 @@ class NomadCOEMigration: sleep() if upload.tasks_status == FAILURE: - logger.error('failed to process upload', process_errors=upload.errors) + event = 'failed to process upload' + logger.error(event, process_errors=upload.errors) + package.migration_failure = event + ': ' + str(upload.errors) 
report.failed_packages += 1 delete_upload(FAILED_PROCESSING) return report @@ -946,7 +957,7 @@ class NomadCOEMigration: # check for processing errors with utils.timer(logger, 'checked upload processing'): - per_page = 500 + per_page = 10000 for page in range(1, math.ceil(upload_total_calcs / per_page) + 1): upload = self.call_api( 'uploads.get_upload', upload_id=upload.upload_id, per_page=per_page, @@ -1041,12 +1052,14 @@ class NomadCOEMigration: break if upload.tasks_status == FAILURE: - logger.error('could not publish upload', process_errors=upload.errors) + event = 'could not publish upload' + logger.error(event, process_errors=upload.errors) report.failed_calcs = report.total_calcs report.migrated_calcs = 0 report.calcs_with_diffs = 0 report.new_calcs = 0 report.failed_packages += 1 + package.migration_failure = event + ': ' + str(upload.errors) delete_upload(FAILED_PUBLISH) SourceCalc.objects(upload=source_upload_id, mainfile__in=calc_mainfiles) \ diff --git a/nomad/processing/data.py b/nomad/processing/data.py index b7307b403b6f332733e4a286b113451ca2a55c3f..da52165f5cc6e0b96ea2ede639553ce46ca7ec81 100644 --- a/nomad/processing/data.py +++ b/nomad/processing/data.py @@ -386,11 +386,18 @@ class Upload(Proc): @property def metadata(self) -> dict: - return self.upload_files.user_metadata + # TODO user_metadata needs to be stored in the public bucket, since staging data might not be shared + try: + upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True) + except KeyError: + return None + return upload_files.user_metadata @metadata.setter def metadata(self, data: dict) -> None: - self.upload_files.user_metadata = data + # TODO user_metadata needs to be stored in the public bucket, since staging data might not be shared + upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True, create=True) + upload_files.user_metadata = data @classmethod def get(cls, id: str, include_published: bool = False) -> 'Upload':