Commit cbec2eae authored by Markus Scheidgen

Fixed user metadata staging-locality issues on publish upload.

parent d6254cc6
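The substance of the fix is in the Upload.metadata property at the end of the diff: user metadata is now read from and written to the public bucket through PublicUploadFiles, because staging data might not be shared between workers on publish. A minimal sketch of the resulting access pattern, assuming PublicUploadFiles is importable from nomad.files and behaves as the diff shows:

from nomad.files import PublicUploadFiles  # import path assumed

def read_user_metadata(upload_id: str) -> dict:
    # the constructor raises KeyError when no public files exist for the upload yet
    try:
        upload_files = PublicUploadFiles(upload_id, is_authorized=lambda: True)
    except KeyError:
        return None
    return upload_files.user_metadata

def write_user_metadata(upload_id: str, data: dict) -> None:
    # create=True ensures the public bucket entry exists before writing
    upload_files = PublicUploadFiles(upload_id, is_authorized=lambda: True, create=True)
    upload_files.user_metadata = data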
......@@ -23,7 +23,7 @@ import multiprocessing
import queue
import json
from nomad import config, infrastructure
from nomad import config, infrastructure, utils
from nomad.migration import NomadCOEMigration, SourceCalc, Package
from .main import cli
......@@ -99,7 +99,39 @@ def reset(delete_packages: bool):
def determine_upload_paths(paths, pattern=None):
if len(paths) == 1 and paths[0].endswith('.json'):
with open(paths[0], 'rt') as f:
data = json.load(f)
if isinstance(data, list):
items = data
else:
if pattern is not None:
key = pattern
else:
key = 'uploads_with_no_package'
items = []
for item in data[key]:
if isinstance(item, str):
items.append(item)
else:
items.append(item['id'])
paths = []
for upload_id in items:
exists = False
for prefix in ['/nomad/repository/data/extracted', '/nomad/repository/data/uploads']:
path = os.path.join(prefix, upload_id)
if os.path.exists(path):
exists = True
paths.append(path)
if not exists:
utils.get_logger(__name__).error(
'source upload does not exist', source_upload_id=upload_id)
elif pattern is not None:
assert len(paths) == 1, "Can only apply a pattern to a single directory."
path = paths[0]
if pattern == "ALL":
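For reference, determine_upload_paths now also accepts a single JSON file in place of explicit paths. The file may contain either a plain list of upload ids, or an object keyed by the --pattern value (defaulting to uploads_with_no_package) whose items are ids or objects with an id field; the ids below are invented for illustration:

["upload_id_1", "upload_id_2"]

{"uploads_with_no_package": ["upload_id_1", {"id": "upload_id_2"}]}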
......@@ -195,9 +227,10 @@ def upload(
@migration.command(help='Get a report about non-migrated calcs.')
def missing():
@click.option('--use-cache', is_flag=True, help='Skip processing steps and take results from prior runs')
def missing(use_cache):
infrastructure.setup_logging()
infrastructure.setup_mongo()
report = SourceCalc.missing()
report = SourceCalc.missing(use_cache=use_cache)
print(json.dumps(report, indent=2))
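The new flag can be exercised through click's test runner; a hypothetical invocation (the import path is assumed, the diff only shows the command registered on the migration group):

from click.testing import CliRunner
from nomad.cli.migration import missing  # import path assumed

runner = CliRunner()
# reuse results cached by a prior run instead of recomputing every step
result = runner.invoke(missing, ['--use-cache'])
print(result.output)  # the JSON report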
......@@ -149,6 +149,9 @@ class Package(Document):
report = DictField()
""" The report of the last successful migration of this package """
migration_failure = StringField()
""" String that describe the cause for last failed migration attempt """
meta = dict(indexes=['upload_id', 'migration_version'])
@classmethod
......@@ -394,12 +397,12 @@ class SourceCalc(Document):
_dataset_cache: dict = {}
@staticmethod
def missing():
def missing(use_cache=False):
"""
Produces data about non-migrated calcs
"""
tmp_data_path = '/tmp/nomad_migration_missing.json'
if os.path.exists(tmp_data_path):
if os.path.exists(tmp_data_path) and use_cache:
with open(tmp_data_path, 'rt') as f:
data = utils.POPO(**json.load(f))
else:
......@@ -407,14 +410,14 @@ class SourceCalc(Document):
try:
# get source_uploads that have non migrated calcs
if data.step < 1:
if data.step < 1 or not use_cache:
import re
data.source_uploads = SourceCalc._get_collection() \
.find({'migration_version': {'$lt': 0}, 'mainfile': {'$not': re.compile(r'^aflowlib_data.*')}}) \
.distinct('upload')
data.step = 1
if data.step < 2:
if data.step < 2 or not use_cache:
source_uploads = []
data.packages = utils.POPO()
data.uploads_with_no_package = []
......@@ -426,9 +429,10 @@ class SourceCalc(Document):
calcs = SourceCalc.objects(upload=source_upload).count()
packages = Package.objects(upload_id=source_upload).count()
source_uploads.append(dict(
id=source_upload, packages=packages, calcs=calcs,
id=source_upload, package_count=packages,
packages=package.packages, calcs=calcs,
path=package.upload_path))
source_uploads = sorted(source_uploads, key=lambda k: k['calcs'])
source_uploads = sorted(source_uploads, key=lambda k: k['calcs'], reverse=True)
data.source_uploads = source_uploads
data.step = 2
finally:
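With this change, use_cache gates both the initial load of the temp file and every individual step, so a run without the flag always recomputes while still persisting progress. A condensed, self-contained sketch of the resume pattern, with hypothetical step functions standing in for the mongo queries:

import json
import os

TMP_DATA_PATH = '/tmp/nomad_migration_missing.json'  # same cache file as above

def step_1():
    # hypothetical stand-in for the distinct('upload') mongo query
    return ['upload_1', 'upload_2']

def step_2(source_uploads):
    # hypothetical stand-in for the per-upload package/calc aggregation
    return [dict(id=upload, calcs=0) for upload in source_uploads]

def missing(use_cache=False):
    # only trust the cached file when the caller explicitly asks for it
    if os.path.exists(TMP_DATA_PATH) and use_cache:
        with open(TMP_DATA_PATH, 'rt') as f:
            data = json.load(f)
    else:
        data = dict(step=0)
    try:
        if data['step'] < 1 or not use_cache:
            data['source_uploads'] = step_1()
            data['step'] = 1
        if data['step'] < 2 or not use_cache:
            data['report'] = step_2(data['source_uploads'])
            data['step'] = 2
    finally:
        # persist progress so an interrupted run can resume with --use-cache
        with open(TMP_DATA_PATH, 'wt') as f:
            json.dump(data, f)
    return data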
......@@ -775,8 +779,9 @@ class NomadCOEMigration:
except Exception as e:
package_report = Report()
package_report.failed_packages = 1
logger.error(
'unexpected exception while migrating packages', exc_info=e)
event = 'unexpected exception while migrating packages'
package.migration_failure = event + ': ' + str(e)
logger.error(event, exc_info=e)
finally:
package.report = package_report
package.migration_version = self.migration_version
......@@ -852,7 +857,9 @@ class NomadCOEMigration:
assert upload is None, 'duplicate upload name'
upload = a_upload
except Exception as e:
self.logger.error('could verify if upload already exists', exc_info=e)
event = 'could not verify if upload already exists'
self.logger.error(event, exc_info=e)
package.migration_failure = event + ': ' + str(e)
report.failed_packages += 1
return report
......@@ -863,7 +870,9 @@ class NomadCOEMigration:
upload = self.call_api(
'uploads.upload', name=package_id, local_path=package.package_path)
except Exception as e:
self.logger.error('could not upload package', exc_info=e)
event = 'could not upload package'
self.logger.error(event, exc_info=e)
package.migration_failure = event + ': ' + str(e)
report.failed_packages += 1
return report
else:
......@@ -934,7 +943,9 @@ class NomadCOEMigration:
sleep()
if upload.tasks_status == FAILURE:
logger.error('failed to process upload', process_errors=upload.errors)
event = 'failed to process upload'
logger.error(event, process_errors=upload.errors)
package.migration_failure = event + ': ' + str(upload.errors)
report.failed_packages += 1
delete_upload(FAILED_PROCESSING)
return report
......@@ -946,7 +957,7 @@ class NomadCOEMigration:
# check for processing errors
with utils.timer(logger, 'checked upload processing'):
per_page = 500
per_page = 10000
for page in range(1, math.ceil(upload_total_calcs / per_page) + 1):
upload = self.call_api(
'uploads.get_upload', upload_id=upload.upload_id, per_page=per_page,
......@@ -1041,12 +1052,14 @@ class NomadCOEMigration:
break
if upload.tasks_status == FAILURE:
logger.error('could not publish upload', process_errors=upload.errors)
event = 'could not publish upload'
logger.error(event, process_errors=upload.errors)
report.failed_calcs = report.total_calcs
report.migrated_calcs = 0
report.calcs_with_diffs = 0
report.new_calcs = 0
report.failed_packages += 1
package.migration_failure = event + ': ' + str(upload.errors)
delete_upload(FAILED_PUBLISH)
SourceCalc.objects(upload=source_upload_id, mainfile__in=calc_mainfiles) \
......
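Every failure branch above repeats the same record-and-log pattern; a hypothetical helper (not part of this commit) that would consolidate it:

def record_migration_failure(package, logger, event, cause=None, **log_kwargs):
    # store a human-readable cause on the package document and log the event
    package.migration_failure = event if cause is None else event + ': ' + str(cause)
    if isinstance(cause, Exception):
        logger.error(event, exc_info=cause, **log_kwargs)
    else:
        logger.error(event, **log_kwargs)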
......@@ -386,11 +386,18 @@ class Upload(Proc):
@property
def metadata(self) -> dict:
return self.upload_files.user_metadata
# TODO user_metadata needs to be stored in the public bucket, since staging data might not be shared
try:
upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True)
except KeyError:
return None
return upload_files.user_metadata
@metadata.setter
def metadata(self, data: dict) -> None:
self.upload_files.user_metadata = data
# TODO user_metadata needs to be stored in the public bucket, since staging data might not be shared
upload_files = PublicUploadFiles(self.upload_id, is_authorized=lambda: True, create=True)
upload_files.user_metadata = data
@classmethod
def get(cls, id: str, include_published: bool = False) -> 'Upload':
......