Commit 02ce53ec authored by Markus Scheidgen

Migration with versions and package parallelization.

parent 76986e23
@@ -97,7 +97,8 @@ def pid_prefix(prefix: int):
@click.option('--create-packages', help='Allow migration to create package entries on the fly.', is_flag=True)
@click.option('--local', help='Create local upload files.', is_flag=True)
@click.option('--parallel', default=1, type=int, help='Use the given number of parallel processes. Default is 1.')
def upload(paths: list, create_packages, local: bool, parallel: int):
@click.option('--migration-version', default=0, type=int, help='The migration version; only packages with a lower version or no version at all will be migrated.')
def upload(paths: list, create_packages, local: bool, parallel: int, migration_version: int):
def producer():
for path in paths:
@@ -108,7 +109,7 @@ def upload(paths: list, create_packages, local: bool, parallel: int):
infrastructure.setup_mongo()
logger = utils.get_logger(__name__)
migration = NomadCOEMigration()
migration = NomadCOEMigration(migration_version=migration_version)
while True:
path = task.get_one()
......
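The hunks above only show fragments of the new parallelization: a ``producer`` that walks the given paths and a worker loop that pulls from ``task``. A minimal sketch of this producer/consumer pattern, assuming a plain ``multiprocessing`` queue with sentinel-based shutdown (the function names, queue wiring, and import path are illustrative, not the actual implementation):

    import multiprocessing

    from nomad.migration import NomadCOEMigration  # import path assumed

    def worker(tasks, migration_version: int):
        # each process needs its own migration instance and db connections
        migration = NomadCOEMigration(migration_version=migration_version)
        while True:
            path = tasks.get()
            if path is None:  # sentinel: no more work
                return
            migration.migrate(path)

    def migrate_paths(paths, parallel: int = 1, migration_version: int = 0):
        # a bounded queue keeps the producer from running far ahead of the workers
        tasks = multiprocessing.Queue(maxsize=2 * parallel)
        processes = [
            multiprocessing.Process(target=worker, args=(tasks, migration_version))
            for _ in range(parallel)]
        for process in processes:
            process.start()
        for path in paths:  # the producer side
            tasks.put(path)
        for _ in processes:
            tasks.put(None)  # one sentinel per worker
        for process in processes:
            process.join()

Because packages carry a ``migration_version``, an interrupted run can simply be restarted with the same version: already migrated packages are skipped.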
@@ -42,7 +42,7 @@ This module also provides functionality to add parsed calculation data to the db
:undoc-members:
"""
from typing import Type
from typing import Type, Callable
import datetime
from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey
from sqlalchemy.orm import relationship
@@ -109,7 +109,7 @@ class Upload(Base): # type: ignore
return self.created
@staticmethod
def add(upload: UploadWithMetadata) -> int:
def publish(upload: UploadWithMetadata) -> Callable[[bool], int]:
"""
Add the upload to the NOMAD-coe repository db. It creates an upload entry and the
respective calculation and property entries. Everything in one
@@ -117,6 +117,10 @@ class Upload(Base): # type: ignore
Arguments:
upload: The upload to add, including calculations with respective IDs, UMD, CMD.
Returns: A callback to commit or roll back the publish transaction.
The callback returns the ``coe_upload_id``, or -1 if the transaction was
rolled back or no upload was created because the upload contains no calcs.
"""
assert upload.uploader is not None
@@ -125,7 +129,7 @@ class Upload(Base): # type: ignore
logger = utils.get_logger(__name__, upload_id=upload.upload_id)
result = None
has_calcs = False
try:
# create upload
coe_upload = Upload(
@@ -136,7 +140,6 @@ class Upload(Base): # type: ignore
repo_db.add(coe_upload)
# add calculations and metadata
has_calcs = False
# reuse the cache for the whole transaction, since star-schema entries
# for users, ds, topics, etc. repeat across calculations
context = PublishContext(upload_id=upload.upload_id)
@@ -152,16 +155,24 @@ class Upload(Base): # type: ignore
logger.debug('added calculation, not yet committed', calc_id=coe_calc.calc_id)
# commit
if has_calcs:
    repo_db.commit()
    result = coe_upload.coe_upload_id
else:
    # empty upload case
    repo_db.rollback()
logger.info('added upload')
def complete(commit: bool) -> int:
    if commit:
        if has_calcs:
            repo_db.commit()
            return coe_upload.coe_upload_id
        else:
            # empty upload case
            repo_db.rollback()
            return -1
    else:
        repo_db.rollback()
        logger.info('rolled upload back')
        return -1

logger.info('added upload')
return complete
except Exception as e:
logger.error('Unexpected exception.', exc_info=e)
repo_db.rollback()
raise e
return result
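The point of returning a callback instead of committing inside ``publish`` is that the caller can defer the final commit until all dependent publish steps have succeeded. A sketch of the intended call pattern, with placeholder names for those steps (the real call site is in the processing code further below):

    complete = coe_repo.Upload.publish(upload_with_metadata)
    try:
        pack_files()    # placeholder for packing the staged upload files
        update_index()  # placeholder for the search index update
    except Exception:
        complete(False)  # roll the repository db transaction back
        raise
    coe_upload_id = complete(True)  # commit; returns -1 for an empty upload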
@@ -105,7 +105,9 @@ class Package(Document):
size = IntField()
""" The sum of all file sizes """
meta = dict(indexes=['upload_id'])
migration_version = IntField()
meta = dict(indexes=['upload_id', 'migration_version'])
def open_package_upload_file(self) -> IO:
""" Creates a streaming zip file from the files of this package. """
@@ -248,17 +250,13 @@ class SourceCalc(Document):
upload = StringField()
metadata = DictField()
migration_id = StringField()
"""
used to id individual runs, if this has the id of the current run, its migrated by
this run.
"""
migration_version = IntField()
extracted_prefix = '$EXTRACTED/'
sites = ['/data/nomad/extracted/', '/nomad/repository/extracted/']
prefixes = [extracted_prefix] + sites
meta = dict(indexes=['upload', 'mainfile', 'migration_id'])
meta = dict(indexes=['upload', 'mainfile', 'migration_version'])
_dataset_cache: dict = {}
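Replacing the per-run ``migration_id`` string with an integer ``migration_version`` turns the question "was this already migrated by this or a newer run?" into a simple comparison, and both collections now index the field for it. A hedged mongoengine sketch of the resulting queries (the helper names are illustrative, and the treatment of documents that never had the field set is an assumption):

    from mongoengine import Q

    def pending_packages(version: int):
        # packages never migrated (field unset) or migrated by an older run
        return Package.objects(
            Q(migration_version__exists=False) | Q(migration_version__lt=version))

    def missing_calcs(source_upload_id: str, version: int) -> int:
        # source calcs of an upload that the current run did not migrate
        return SourceCalc.objects(
            upload=source_upload_id, migration_version__ne=version).count()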
@@ -336,17 +334,11 @@
class NomadCOEMigration:
"""
Drives a migration from the NOMAD coe repository db to nomad@FAIRDI. It is assumed
that this class is never used on the worker or api service. It assumes the
default coe repo connection as a connection to the source repository db.
Attributes:
source: SQLAlchemy session for the source NOMAD coe repository db.
Drives a migration from the NOMAD coe repository db to nomad@FAIRDI.
Arguments:
sites: Directories that might contain uploads to migrate. Use to override defaults.
pid_prefix: All PIDs for previously unknown calculations will get a PID higher
than that. Use to override default.
migration_version: The migration version. Only packages/calculations with
no migration version or a lower migration version are migrated.
"""
default_sites = [
@@ -360,14 +352,11 @@
archive_filename = 'archive.tar.gz'
""" The standard name for tarred uploads in the CoE repository. """
def __init__(
self,
sites: List[str] = default_sites,
pid_prefix: int = default_pid_prefix) -> None:
self.sites, self.pid_prefix = sites, pid_prefix
self.logger = utils.get_logger(__name__)
def __init__(self, migration_version: int = 0) -> None:
self.logger = utils.get_logger(__name__, migration_version=migration_version)
self.migration_version = migration_version
self._client = None
self.source = infrastructure.repository_db
@property
@@ -519,8 +508,7 @@
self.logger.info('set pid prefix', pid_prefix=prefix)
self.client.admin.exec_pidprefix_command(payload=dict(prefix=prefix)).response()
def migrate(
self, upload_path, create_packages: bool = False, local: bool = False) -> utils.POPO:
def migrate(self, upload_path, create_packages: bool = False, local: bool = False) -> utils.POPO:
"""
Migrate the given upload.
@@ -544,18 +532,14 @@
Returns: A dictionary with status and statistics for the given upload.
"""
from nomad.client import stream_upload_with_client
migration_id = utils.create_uuid()
logger = self.logger.bind(upload_path=upload_path, migration_id=migration_id)
source_calcs = None # grab source calcs while waiting for the first processing
# get the packages
packages, source_upload_id = self._packages(upload_path, create=create_packages)
logger = self.logger.bind(source_upload_id=source_upload_id)
# initialize upload report
upload_report = utils.POPO()
upload_report.total_packages = 0
upload_report.failed_packages = 0
upload_report.total_source_calcs = 0
upload_report.total_calcs = 0
upload_report.failed_calcs = 0
@@ -564,157 +548,177 @@
upload_report.new_calcs = 0
upload_report.missing_calcs = 0
# iterate all packages of upload
for package in packages:
package_id = package.package_id
logger = logger.bind(package_id=package_id, source_upload_id=source_upload_id)
logger.debug('start to process package')
# initialize package report
report = utils.POPO()
report.total_calcs = 0
report.failed_calcs = 0
report.migrated_calcs = 0
report.calcs_with_diffs = 0
report.new_calcs = 0
report.missing_calcs = 0
# upload and process the upload file
with utils.timer(logger, 'upload completed'):
try:
if local:
upload_filepath = package.create_package_upload_file()
self.logger.debug('created package upload file')
upload = self.client.uploads.upload(
name=package_id, local_path=upload_filepath).response().result
if package.migration_version is not None and package.migration_version >= self.migration_version:
logger.info('package already migrated', package_id=package.package_id)
continue
package_report = self.migrate_package(package)
if package_report is not None:
for key, value in package_report.items():
upload_report[key] += value
else:
upload_report.failed_packages += 1
upload_report.total_packages += 1
upload_report.total_source_calcs = SourceCalc.objects(upload=source_upload_id).count()
upload_report.missing_calcs = SourceCalc.objects(
upload=source_upload_id, migration_version__ne=self.migration_version).count()
self.logger.info('migrated upload', upload_path=upload_path, **upload_report)
return upload_report
def migrate_package(self, package: Package, local: bool = False):
source_upload_id = package.upload_id
package_id = package.package_id
logger = self.logger.bind(package_id=package_id, source_upload_id=source_upload_id)
logger.debug('start to process package')
# initialize package report
report = utils.POPO()
report.total_calcs = 0
report.failed_calcs = 0
report.migrated_calcs = 0
report.calcs_with_diffs = 0
report.new_calcs = 0
report.missing_calcs = 0
# upload and process the upload file
from nomad.client import stream_upload_with_client
with utils.timer(logger, 'upload completed'):
try:
if local:
upload_filepath = package.create_package_upload_file()
self.logger.debug('created package upload file')
upload = self.client.uploads.upload(
name=package_id, local_path=upload_filepath).response().result
else:
upload_f = package.open_package_upload_file()
self.logger.debug('opened package upload file')
upload = stream_upload_with_client(self.client, upload_f, name=package_id)
except Exception as e:
self.logger.error('could not upload package', exc_info=e)
return None
logger = logger.bind(
source_upload_id=source_upload_id, upload_id=upload.upload_id)
# wait for complete upload
with utils.timer(logger, 'upload processing completed'):
sleep = utils.SleepTimeBackoff()
while upload.tasks_running:
upload = self.client.uploads.get_upload(upload_id=upload.upload_id).response().result
sleep()
if upload.tasks_status == FAILURE:
logger.error('failed to process upload', process_errors=upload.errors)
return None
else:
report.total_calcs += upload.calcs.pagination.total
calc_mainfiles = []
upload_total_calcs = upload.calcs.pagination.total
# check for processing errors
with utils.timer(logger, 'checked upload processing'):
per_page = 200
for page in range(1, math.ceil(upload_total_calcs / per_page) + 1):
upload = self.client.uploads.get_upload(
upload_id=upload.upload_id, per_page=per_page, page=page,
order_by='mainfile').response().result
for calc_proc in upload.calcs.results:
calc_logger = logger.bind(
calc_id=calc_proc.calc_id,
mainfile=calc_proc.mainfile)
if calc_proc.tasks_status == SUCCESS:
calc_mainfiles.append(calc_proc.mainfile)
else:
upload_f = package.open_package_upload_file()
self.logger.debug('opened package upload file')
upload = stream_upload_with_client(self.client, upload_f, name=package_id)
except Exception as e:
self.logger.error('could not upload package', exc_info=e)
continue
report.failed_calcs += 1
calc_logger.error(
'could not process a calc', process_errors=calc_proc.errors)
continue
logger = logger.bind(
source_upload_id=source_upload_id, upload_id=upload.upload_id)
# grab source calcs
source_calcs = dict()
with utils.timer(logger, 'loaded source metadata'):
for source_calc in SourceCalc.objects(
upload=source_upload_id, mainfile__in=calc_mainfiles):
source_calc_with_metadata = CalcWithMetadata(**source_calc.metadata)
source_calc_with_metadata.pid = source_calc.pid
source_calc_with_metadata.mainfile = source_calc.mainfile
source_calcs[source_calc.mainfile] = (source_calc, source_calc_with_metadata)
# verify upload against source
calcs_in_search = 0
with utils.timer(logger, 'verified upload against source calcs'):
for page in range(1, math.ceil(upload_total_calcs / per_page) + 1):
search = self.client.repo.search(
page=page, per_page=per_page, upload_id=upload.upload_id,
order_by='mainfile').response().result
for calc in search.results:
calcs_in_search += 1
source_calc, source_calc_with_metadata = source_calcs.get(
calc['mainfile'], (None, None))
if source_calc is not None:
report.migrated_calcs += 1
calc_logger = logger.bind(calc_id=calc['calc_id'], mainfile=calc['mainfile'])
if not self._validate(calc, source_calc_with_metadata, calc_logger):
report.calcs_with_diffs += 1
else:
calc_logger.info('processed a calc that has no source')
report.new_calcs += 1
# wait for complete upload
with utils.timer(logger, 'upload processing completed'):
sleep = utils.SleepTimeBackoff()
while upload.tasks_running:
upload = self.client.uploads.get_upload(upload_id=upload.upload_id).response().result
sleep()
if len(calc_mainfiles) != calcs_in_search:
logger.error('mismatch between processed calcs and calcs found with search')
if upload.tasks_status == FAILURE:
logger.error('failed to process upload', process_errors=upload.errors)
continue
else:
report.total_calcs += upload.calcs.pagination.total
calc_mainfiles = []
upload_total_calcs = upload.calcs.pagination.total
# check for processing errors
with utils.timer(logger, 'checked upload processing'):
per_page = 200
for page in range(1, math.ceil(upload_total_calcs / per_page) + 1):
upload = self.client.uploads.get_upload(
upload_id=upload.upload_id, per_page=per_page, page=page,
order_by='mainfile').response().result
for calc_proc in upload.calcs.results:
calc_logger = logger.bind(
calc_id=calc_proc.calc_id,
mainfile=calc_proc.mainfile)
if calc_proc.tasks_status == SUCCESS:
calc_mainfiles.append(calc_proc.mainfile)
else:
report.failed_calcs += 1
calc_logger.error(
'could not process a calc', process_errors=calc_proc.errors)
continue
# grab source calcs
source_calcs = dict()
with utils.timer(logger, 'loaded source metadata'):
for source_calc in SourceCalc.objects(
upload=source_upload_id, mainfile__in=calc_mainfiles):
source_calc_with_metadata = CalcWithMetadata(**source_calc.metadata)
source_calc_with_metadata.pid = source_calc.pid
source_calc_with_metadata.mainfile = source_calc.mainfile
source_calcs[source_calc.mainfile] = (source_calc, source_calc_with_metadata)
# verify upload against source
with utils.timer(logger, 'verified upload against source calcs'):
for page in range(1, math.ceil(upload_total_calcs / per_page) + 1):
search = self.client.repo.search(
page=page, per_page=per_page, upload_id=upload.upload_id,
order_by='mainfile').response().result
for calc in search.results:
source_calc, source_calc_with_metadata = source_calcs.get(
calc['mainfile'], (None, None))
if source_calc is not None:
report.migrated_calcs += 1
calc_logger = logger.bind(calc_id=calc['calc_id'], mainfile=calc['mainfile'])
if not self._validate(calc, source_calc_with_metadata, calc_logger):
report.calcs_with_diffs += 1
else:
calc_logger.info('processed a calc that has no source')
report.new_calcs += 1
SourceCalc.objects(upload=source_upload_id, mainfile__in=calc_mainfiles) \
.update(migration_id=migration_id)
# publish upload
if len(calc_mainfiles) > 0:
with utils.timer(logger, 'upload published'):
upload_metadata = dict(with_embargo=(package.restricted > 0))
upload_metadata['calculations'] = [
self._to_api_metadata(source_calc_with_metadata)
for _, source_calc_with_metadata in source_calcs.values()]
upload = self.client.uploads.exec_upload_operation(
upload_id=upload.upload_id,
payload=dict(operation='publish', metadata=upload_metadata)
).response().result
sleep = utils.SleepTimeBackoff()
while upload.process_running:
try:
upload = self.client.uploads.get_upload(
upload_id=upload.upload_id).response().result
sleep()
except HTTPNotFound:
# the proc upload will be deleted by the publish operation
break
if upload.tasks_status == FAILURE:
logger.error('could not publish upload', process_errors=upload.errors)
report.failed_calcs = report.total_calcs
report.migrated_calcs = 0
report.calcs_with_diffs = 0
report.new_calcs = 0
SourceCalc.objects(upload=source_upload_id, mainfile__in=calc_mainfiles) \
.update(migration_id=None)
else:
logger.info('no successful calcs, skip publish')
# publish upload
if len(calc_mainfiles) > 0:
with utils.timer(logger, 'upload published'):
upload_metadata = dict(with_embargo=(package.restricted > 0))
upload_metadata['calculations'] = [
self._to_api_metadata(source_calc_with_metadata)
for _, source_calc_with_metadata in source_calcs.values()]
report.missing_calcs = report.total_calcs - report.migrated_calcs
logger.info('migrated package', **report)
upload = self.client.uploads.exec_upload_operation(
upload_id=upload.upload_id,
payload=dict(operation='publish', metadata=upload_metadata)
).response().result
for key, value in report.items():
upload_report[key] += value
sleep = utils.SleepTimeBackoff()
while upload.process_running:
try:
upload = self.client.uploads.get_upload(
upload_id=upload.upload_id).response().result
sleep()
except HTTPNotFound:
# the proc upload will be deleted by the publish operation
break
if upload.tasks_status == FAILURE:
logger.error('could not publish upload', process_errors=upload.errors)
report.failed_calcs = report.total_calcs
report.migrated_calcs = 0
report.calcs_with_diffs = 0
report.new_calcs = 0
else:
SourceCalc.objects(upload=source_upload_id, mainfile__in=calc_mainfiles) \
.update(migration_version=self.migration_version)
package.migration_version = self.migration_version
package.save()
else:
logger.info('no successful calcs, skip publish')
upload_report.total_source_calcs = SourceCalc.objects(upload=source_upload_id).count()
upload_report.missing_calcs = SourceCalc.objects(upload=source_upload_id, migration_id__ne=migration_id).count()
report.missing_calcs = report.total_calcs - report.migrated_calcs
logger.info('migrated package', **report)
self.logger.info(
'migrated upload', upload_path=upload_path, migration_id=migration_id, **upload_report)
return upload_report
return report
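Both the processing check and the verification step in ``migrate_package`` page through results with the same ``per_page``/``math.ceil`` arithmetic. A generic sketch of that pattern, with a hypothetical ``fetch`` callable standing in for the two client calls:

    import math
    from typing import Any, Callable, Iterator

    def iter_pages(
            fetch: Callable[[int, int], Any], total: int,
            per_page: int = 200) -> Iterator[Any]:
        """ Yields all results page by page; ``fetch(page, per_page)`` is
        assumed to return an object with a ``results`` list. """
        for page in range(1, math.ceil(total / per_page) + 1):
            yield from fetch(page, per_page).results

The two loops above would then only differ in the callable they pass as ``fetch``.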
def _to_api_metadata(self, calc_with_metadata: CalcWithMetadata) -> dict:
""" Transforms to a dict that fullfils the API's uploade metadata model. """
......
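The polling loops above rely on ``utils.SleepTimeBackoff`` so that waiting for upload processing starts with short sleeps and backs off over time. The helper itself is not part of this diff; a minimal sketch of what such a callable could look like, assuming exponential backoff with a cap:

    import time

    class SleepTimeBackoff:
        """ A callable that sleeps a little longer on every call, up to a cap. """

        def __init__(self, start: float = 0.1, cap: float = 10.0, factor: float = 2.0) -> None:
            self.sleep_time, self.cap, self.factor = start, cap, factor

        def __call__(self) -> None:
            time.sleep(self.sleep_time)
            self.sleep_time = min(self.sleep_time * self.factor, self.cap)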
@@ -400,36 +400,41 @@ class Upload(Proc):
with utils.timer(
logger, 'upload added to repository', step='publish',
upload_size=self.upload_files.size):
coe_repo.Upload.add(upload_with_metadata)
upload_transaction_complete = coe_repo.Upload.publish(upload_with_metadata)
with utils.timer(
logger, 'staged upload files packed', step='publish',
upload_size=self.upload_files.size):
coe_upload = coe_repo.Upload.from_upload_id(upload_with_metadata.upload_id)
if coe_upload is not None:
for coe_calc in coe_upload.calcs:
calc_metadata = coe_calc.to_calc_with_metadata()
calc_metadata.published = True
self.upload_files.metadata.update(
calc_id=calc_metadata.calc_id, updates=calc_metadata.to_dict())
logger.info('metadata updated after publish to coe repo', step='publish')
self.upload_files.pack()
with utils.timer(
logger, 'index updated', step='publish',
upload_size=self.upload_files.size):
coe_upload = coe_repo.Upload.from_upload_id(upload_with_metadata.upload_id)
if coe_upload is not None:
search.publish(
[coe_calc.to_calc_with_metadata() for coe_calc in coe_upload.calcs])
with utils.timer(
logger, 'staged upload deleted', step='publish',
upload_size=self.upload_files.size):
self.upload_files.delete()
self.delete()
try:
with utils.timer(
logger, 'staged upload files packed', step='publish',
upload_size=self.upload_files.size):
coe_upload = coe_repo.Upload.from_upload_id(upload_with_metadata.upload_id)
if coe_upload is not None:
for coe_calc in coe_upload.calcs:
calc_metadata = coe_calc.to_calc_with_metadata()
calc_metadata.published = True
self.upload_files.metadata.update(
calc_id=calc_metadata.calc_id, updates=calc_metadata.to_dict())
logger.info('metadata updated after publish to coe repo', step='publish')
self.upload_files.pack()
with utils.timer(
logger, 'index updated', step='publish',
upload_size=self.upload_files.size):
coe_upload = coe_repo.Upload.from_upload_id(upload_with_metadata.upload_id)
if coe_upload is not None:
search.publish(
[coe_calc.to_calc_with_metadata() for coe_calc in coe_upload.calcs])
with utils.timer(
logger, 'staged upload deleted', step='publish',
upload_size=self.upload_files.size):
self.upload_files.delete()
self.delete()
except Exception as e:
upload_transaction_complete(False)
raise e
upload_transaction_complete(True)
return True # do not save the process status on the deleted upload
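Since the callback has exactly one commit and one rollback path, the try/except wiring above could also be wrapped once in a context manager. A hedged sketch of that alternative (not part of this commit; the import path is assumed):

    from contextlib import contextmanager

    from nomad import coe_repo  # import path assumed

    @contextmanager
    def publish_transaction(upload_with_metadata):
        complete = coe_repo.Upload.publish(upload_with_metadata)
        try:
            yield
        except Exception:
            complete(False)  # roll back on any failure in the with-body
            raise
        complete(True)  # commit once all dependent steps succeeded

The publish task would then run the pack, index, and delete steps inside a single ``with publish_transaction(upload_with_metadata):`` block.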
@process
@@ -532,6 +537,8 @@ class Upload(Proc):
@task
def cleanup(self):
search.refresh()
# send an email about the finished processing
user = self.uploader
name = '%s %s' % (user.first_name, user.last_name)
......
@@ -181,6 +181,11 @@ def publish(calcs: Iterable[datamodel.CalcWithMetadata]) -> None:
yield entry.to_dict(include_meta=True)
elasticsearch.helpers.bulk(infrastructure.elastic_client, elastic_updates())
refresh()
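``search.publish`` streams entries through a generator into ``elasticsearch.helpers.bulk`` and refreshes afterwards, so the published calcs become immediately visible to search. A standalone sketch of the same pattern (client, index name, and document shape are illustrative):

    import elasticsearch
    import elasticsearch.helpers

    client = elasticsearch.Elasticsearch()

    def publish(entries):
        def actions():
            for entry in entries:
                yield dict(_index='calcs', _id=entry['calc_id'], _source=entry)

        elasticsearch.helpers.bulk(client, actions())
        # make the new documents visible to subsequent searches
        client.indices.refresh(index='calcs')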