Commit 8238c1b3 authored by Markus Scheidgen

Merge branch 'improved-migrate-cli' into 'v1.0.0'

Improved migrate cli

See merge request !460
parents 928f5af8 3b4beaf3
Pipeline #115639 passed with stages in 29 minutes and 4 seconds
@@ -331,12 +331,15 @@ def upgrade():
'--dst-db-name', type=str, default=config.mongo.db_name,
help='The name of the destination database. By default same as the configured db.')
@click.option(
'--query', type=str,
help='A mongo query for selecting only some of the uploads in the source database for import.')
'--upload-query', type=str,
help='A mongo upload query. All uploads matching the query will be included in the migration.')
@click.option(
'--entry-query', type=str,
help='A mongo entry query. All uploads with an entry matching the query will be included in the migration.')
@click.option(
'--ids-from-file', type=str,
help='''Reads upload IDs from the specified file. Cannot be used together with the --query
option.
help='''Reads upload IDs from the specified file. Cannot be used together with the
--upload-query or --entry-query options.
This can for example be used to retry just the uploads that have previously failed
(as these ids can be exported to file using --failed-ids-to-file). You can specify both
--ids-from-file and --failed-ids-to-file at the same time with the same file name.''')
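For context (not part of the merge request): the values passed to --upload-query and --entry-query are raw JSON strings that json.loads later turns back into mongo queries. A minimal sketch of how such strings might be assembled; the field names are illustrative assumptions, not taken from this diff:

import json

# Hypothetical query values; 'publish_time' and 'parser_name' are assumed field names.
upload_query_arg = json.dumps({'publish_time': {'$ne': None}})   # value for --upload-query
entry_query_arg = json.dumps({'parser_name': 'parsers/vasp'})    # value for --entry-query
print(upload_query_arg)
print(entry_query_arg)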
@@ -346,6 +349,20 @@ def upgrade():
This can for example be used to subsequently retry just the uploads that failed
(as these ids can be loaded from file using --ids-from-file). You can specify both
--ids-from-file and --failed-ids-to-file at the same time with the same file name.''')
@click.option(
'--upload-update', type=str,
help='JSON with updates to apply to all converted uploads.')
@click.option(
'--entry-update', type=str,
help='JSON with updates to apply to all converted entries.')
@click.option(
'--overwrite', type=click.Choice(['always', 'if-newer', 'never'], case_sensitive=False), default='never',
help='''If an upload already exists in the destination db, this option determines whether
it and its child records should be overwritten with the data from the source db.
Possible values are "always", "if-newer", and "never". Selecting "always" always overwrites,
"never" never overwrites, and "if-newer" overwrites only if the upload doesn't exist in the
destination, or it exists but its complete_time (i.e. the time it was last processed) is
older than in the source db.''')
@click.option(
'--fix-problems', is_flag=True,
help='''If a minor, fixable problem is encountered, fix it automatically; otherwise, fail.''')
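A rough sketch of how an --upload-update value is meant to be applied, under the assumption that the JSON decodes to a flat dict of field updates (the code further down does this via upload_dict.update(upload_update) after json.loads); the field values here are purely illustrative:

import json

upload_update_arg = '{"license": "CC BY 4.0"}'            # hypothetical --upload-update value
upload_update = json.loads(upload_update_arg)

upload_dict = {'_id': 'upload_1', 'license': 'unknown'}   # illustrative source record
upload_dict.update(upload_update)                         # mirrors upload_dict.update(upload_update)
assert upload_dict['license'] == 'CC BY 4.0'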
@@ -353,8 +370,8 @@ def upgrade():
'--dry', is_flag=True,
help='Dry run (not writing anything to the destination database).')
def migrate_mongo(
host, port, src_db_name, dst_db_name, query, ids_from_file, failed_ids_to_file,
fix_problems, dry):
host, port, src_db_name, dst_db_name, upload_query, entry_query,
ids_from_file, failed_ids_to_file, upload_update, entry_update, overwrite, fix_problems, dry):
import json
from pymongo.database import Database
from nomad import utils, infrastructure
@@ -372,8 +389,12 @@ def migrate_mongo(
if not dry:
create_collections_if_needed(db_dst)
upload_ids = None
if upload_query and entry_query:
print('Cannot specify both upload-query and entry-query')
return -1
if ids_from_file:
if query is not None:
if upload_query or entry_query:
print('Cannot specify a query when using --ids-from-file.')
return -1
try:
@@ -382,12 +403,24 @@ def migrate_mongo(
except FileNotFoundError:
logger.error(f'Could not open file {ids_from_file}')
return -1
query = {'_id': {'$in': upload_ids}}
elif query:
query = json.loads(query)
elif upload_query:
upload_query = json.loads(upload_query)
elif entry_query:
entry_query = json.loads(entry_query)
if upload_update:
upload_update = json.loads(upload_update)
if entry_update:
entry_update = json.loads(entry_update)
if entry_query:
logger.info('Querying entries...')
upload_ids = list(db_src.calc.distinct('upload_id', entry_query))
if upload_ids:
upload_query = {'_id': {'$in': upload_ids}}
logger.info('Querying uploads...')
uploads = db_src.upload.find(query)
uploads = db_src.upload.find(upload_query)
migrate_mongo_uploads(
db_src, db_dst, uploads, failed_ids_to_file, fix_problems, dry, logger)
db_src, db_dst, uploads, failed_ids_to_file, upload_update, entry_update, overwrite,
fix_problems, dry, logger)
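The --entry-query handling above reduces to a distinct() lookup that turns an entry query into an upload query. A standalone sketch of the same pattern, with assumed connection details (the CLI derives them from --host, --port, and --src-db-name) and an illustrative query:

from pymongo import MongoClient

db_src = MongoClient('localhost', 27017)['nomad_v0_10']    # assumed host, port, and db name

entry_query = {'parser_name': 'parsers/vasp'}              # illustrative entry query
upload_ids = list(db_src.calc.distinct('upload_id', entry_query))
upload_query = {'_id': {'$in': upload_ids}}
uploads = db_src.upload.find(upload_query)
print(f'{len(upload_ids)} uploads matched')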
@@ -17,7 +17,8 @@
#
import time
from typing import List, Dict, Set, Any
from datetime import datetime
from typing import List, Dict, Set, Any, Optional
from pydantic import BaseModel
from pymongo import ReplaceOne
@@ -48,6 +49,13 @@ class _UpgradeStatistics(BaseModel):
uploads = _CollectionStatistics(collection_name='Uploads')
entries = _CollectionStatistics(collection_name='Entries')
datasets = _CollectionStatistics(collection_name='Datasets')
dois = _CollectionStatistics(collection_name='DOIs')
class _DatasetCacheItem(BaseModel):
converted_dataset_dict: Optional[Dict[str, Any]] = None
converted_doi_dict: Optional[Dict[str, Any]] = None
ready_to_commit: bool = False
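The None / empty-dict defaults above act as sentinels that the conversion code checks later in this diff: None means the dataset or DOI record does not exist, while an empty dict means the record exists but could not be converted. A minimal, self-contained sketch of the three states; the class is mirrored here and all values are illustrative:

from typing import Any, Dict, Optional
from pydantic import BaseModel

class DatasetCacheItem(BaseModel):                     # mirrors _DatasetCacheItem above
    converted_dataset_dict: Optional[Dict[str, Any]] = None
    converted_doi_dict: Optional[Dict[str, Any]] = None
    ready_to_commit: bool = False

missing = DatasetCacheItem()                                           # no dataset record found
failed = DatasetCacheItem(converted_dataset_dict={})                   # record found, conversion failed
ok = DatasetCacheItem(converted_dataset_dict={'_id': 'ds_1', 'dataset_name': 'demo'})  # converted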
def create_collections_if_needed(db_dst: Database):
@@ -65,15 +73,17 @@ def create_collections_if_needed(db_dst: Database):
def migrate_mongo_uploads(
db_src: Database, db_dst: Database, uploads: Cursor, failed_ids_to_file: bool,
upload_update: Dict[str, Any], entry_update: Dict[str, Any], overwrite: str,
fix_problems: bool, dry: bool, logger):
''' Converts and/or migrates an upload and all related entries and datasets. '''
number_of_uploads = uploads.count()
logger.info(f'Found {number_of_uploads} uploads to import.')
migrated_dataset_ids: Set[str] = set()
dataset_cache: Dict[str, _DatasetCacheItem] = {}
stats = _UpgradeStatistics()
stats.uploads.total = number_of_uploads
count_treated = count_failures = count_processing = 0
failed_and_skipped = []
count_ignored = 0
start_time = time.time()
next_report_time = start_time + 60
for upload_dict in uploads:
@@ -85,10 +95,25 @@ def migrate_mongo_uploads(
count_processing += 1
failed_and_skipped.append(upload_id)
else:
entry_dicts, dataset_dicts = _convert_mongo_upload(
db_src, upload_dict, fix_problems, migrated_dataset_ids, stats, logger)
upload_dict_dst = db_dst.upload.find_one({'_id': upload_id})
if upload_dict_dst:
if overwrite == 'always':
assert not _is_processing(upload_dict_dst), 'Destination upload is processing'
elif overwrite == 'if-newer':
complete_time = upload_dict.get('complete_time', datetime.min)
complete_time_dst = upload_dict_dst.get('complete_time', datetime.min)
if _is_processing(upload_dict_dst) or complete_time <= complete_time_dst:
count_ignored += 1
continue
elif overwrite == 'never':
count_ignored += 1
continue
entry_dicts, dataset_dicts, doi_dicts = _convert_mongo_upload(
db_src, upload_dict, upload_update, entry_update, fix_problems,
dataset_cache, stats, logger)
if not dry:
_commit_upload(upload_dict, entry_dicts, dataset_dicts, db_dst, stats)
_commit_upload(upload_dict, entry_dicts, dataset_dicts, doi_dicts, db_dst, stats)
del entry_dicts, dataset_dicts # To free up memory immediately
except Exception as e:
logger.error(f'Failed to migrate upload: {str(e)}', upload_id=upload_id)
@@ -114,7 +139,7 @@ def migrate_mongo_uploads(
summary = f'Summary:\n\nElapsed time: {_seconds_to_h_m(time.time() - start_time)}\n\n'
summary += f'{"Doc type":<10}{"found":>20}{"converted":>20}{"migrated":>20}\n'
summary += '-' * 70 + '\n'
for sub_stats in (stats.uploads, stats.entries, stats.datasets):
for sub_stats in (stats.uploads, stats.entries, stats.datasets, stats.dois):
summary += (
f'{sub_stats.collection_name:<10}{sub_stats.total:>20}' # pylint: disable=E1101
f'{sub_stats.converted:>20}{sub_stats.migrated:>20}\n')
@@ -124,6 +149,12 @@ def migrate_mongo_uploads(
summary += '\nNo errors occurred :-)\n\n'
if count_processing:
summary += f'{count_processing} uploads were skipped since they are currently processing\n\n'
if count_ignored:
summary += f'{count_ignored} uploads were ignored because '
if overwrite == 'if-newer':
summary += 'they are not newer than the record in the destination db.\n\n'
else:
summary += 'they already exist in the destination db.\n\n'
if dry:
summary += 'Dry run - nothing written to the destination db\n\n'
logger.info(summary)
@@ -132,13 +163,13 @@ def migrate_mongo_uploads(
def _convert_mongo_upload(
db_src: Database, upload_dict: Dict[str, Any], fix_problems: bool,
migrated_dataset_ids: Set[str], stats: _UpgradeStatistics, logger):
db_src: Database, upload_dict: Dict[str, Any],
upload_update: Dict[str, Any], entry_update: Dict[str, Any], fix_problems: bool,
dataset_cache: Dict[str, _DatasetCacheItem], stats: _UpgradeStatistics, logger):
'''
Converts (upgrades) an upload_dict and all related records. If successful,
returns two lists: one with converted entry dicts, and one with converted dataset dicts.
Datasets whose IDs are in `migrated_dataset_ids` are skipped (as they are assumed to
have been migrated, or at least attempted to be migrated already).
returns three lists: one with converted entry dicts, one with converted dataset dicts,
and one with converted DOI dicts.
'''
upload_id = upload_dict['_id']
published = upload_dict.get('publish_time') is not None
@@ -186,7 +217,7 @@ def _convert_mongo_upload(
external_db = entry_metadata_dict.get('external_db')
if external_db != first_external_db:
if external_db and first_external_db:
# Problem is unfixable (two non-empty values encountered)
# Problem is unfixable (two different non-empty values encountered)
assert False, 'Inconsistent external_db for entries - unfixable'
elif not fix_problems:
assert False, 'Inconsistent external_db for entries - use --fix-problems to fix'
@@ -225,33 +256,66 @@ def _convert_mongo_upload(
for field in ('_id', 'upload_create_time', 'main_author', 'embargo_length', 'license'):
assert upload_dict.get(field) is not None, f'Missing required upload field: {field}'
if upload_update:
upload_dict.update(upload_update)
# migrate entries
new_dataset_ids: Set[str] = set()
newly_encountered_dataset_ids: Set[str] = set()
for entry_dict in entry_dicts:
assert not _is_processing(entry_dict), (
f'the entry {entry_dict["_id"]} has status processing, but the upload is not processing.')
_convert_mongo_entry(entry_dict, common_coauthors, fix_problems, logger)
stats.entries.converted += 1
# Add encountered datasets
# Convert datasets
datasets = entry_dict.get('datasets')
if datasets:
converted_datasets = []
for dataset_id in datasets:
if dataset_id not in migrated_dataset_ids:
new_dataset_ids.add(dataset_id)
# Migrate newly discovered datasets
stats.datasets.total += len(new_dataset_ids)
migrated_dataset_ids.update(new_dataset_ids)
if dataset_id in dataset_cache:
ds_cache = dataset_cache[dataset_id]
else:
# First time we encounter this dataset ref
newly_encountered_dataset_ids.add(dataset_id)
ds_cache = _get_dataset_cache_data(dataset_id, db_src, logger)
dataset_cache[dataset_id] = ds_cache
if ds_cache.converted_dataset_dict is not None:
stats.datasets.total += 1
if ds_cache.converted_doi_dict is not None:
stats.dois.total += 1
if ds_cache.converted_dataset_dict is None:
# Dataset record does not exist
assert fix_problems, (
f'Missing dataset reference encountered: {dataset_id} - '
'use --fix-problems to fix')
logger.warn(
'Fixing (removing) missing dataset reference for entry',
entry_id=entry_dict['_id'], dataset_id=dataset_id)
elif not ds_cache.converted_dataset_dict:
# The value must be the empty dict, which represents a conversion failure
assert False, f'Reference to unconvertible dataset {dataset_id}'
else:
# Dataset record exists and has been converted successfully
converted_datasets.append(dataset_id)
entry_dict['datasets'] = converted_datasets
if entry_update:
entry_dict.update(entry_update)
# All conversion successful! Ready to migrate
dataset_dicts: List[Dict[str, Any]] = []
for dataset_id in new_dataset_ids:
dataset_dict = db_src.dataset.find_one({'_id': dataset_id})
assert dataset_dict is not None, f'Missing dataset reference: {dataset_id}'
_convert_mongo_dataset(dataset_dict)
dataset_dicts.append(dataset_dict)
stats.datasets.converted += 1
# All conversion successful!
doi_dicts: List[Dict[str, Any]] = []
for dataset_id in newly_encountered_dataset_ids:
ds_cache = dataset_cache[dataset_id]
if not ds_cache.ready_to_commit:
if ds_cache.converted_dataset_dict:
dataset_dicts.append(ds_cache.converted_dataset_dict)
stats.datasets.converted += 1
if ds_cache.converted_doi_dict:
doi_dicts.append(ds_cache.converted_doi_dict)
stats.dois.converted += 1
ds_cache.ready_to_commit = True
stats.entries.converted += len(entry_dicts)
stats.uploads.converted += 1
return entry_dicts, dataset_dicts
return entry_dicts, dataset_dicts, doi_dicts
def _convert_mongo_entry(entry_dict: Dict[str, Any], common_coauthors: Set, fix_problems: bool, logger):
@@ -326,8 +390,38 @@ def _convert_mongo_dataset(dataset_dict: Dict[str, Any]):
_rename_key(dataset_dict, 'modified', 'dataset_modified_time')
# Check that all required fields are there
for field in ('dataset_name',):
assert dataset_dict.get(field) is not None, (
f'Dataset {dataset_dict["_id"]} missing required field {field}')
assert dataset_dict.get(field) is not None, f'Dataset missing required field {field}'
def _convert_mongo_doi(doi_dict: Dict[str, Any]):
pass
def _get_dataset_cache_data(dataset_id: str, db_src: Database, logger) -> _DatasetCacheItem:
'''
Fetches and converts a dataset and the related DOI record. None values are used in the
returned :class:`_DatasetCacheItem` if a record is missing; empty dicts are used if the
record exists but can't be converted.
'''
dataset_dict = db_src.dataset.find_one({'_id': dataset_id})
doi_dict = None
if dataset_dict:
try:
_convert_mongo_dataset(dataset_dict)
doi = dataset_dict.get('doi')
if doi:
doi_dict = db_src.d_o_i.find_one({'_id': doi})
# DOI records were not stored originally, so some older datasets with
# doi might not have a corresponding DOI record. The data is also
# not super critical, so if it doesn't exist we will just ignore it
if doi_dict:
_convert_mongo_doi(doi_dict)
except Exception as e:
# Conversion failed
logger.error(str(e), dataset_id=dataset_id)
dataset_dict = {}
doi_dict = {}
return _DatasetCacheItem(converted_dataset_dict=dataset_dict, converted_doi_dict=doi_dict)
def _is_processing(proc_dict: Dict[str, Any]) -> bool:
@@ -338,7 +432,8 @@ def _is_processing(proc_dict: Dict[str, Any]) -> bool:
def _commit_upload(
upload_dict: Dict[str, Any], entry_dicts: List[Dict[str, Any]], dataset_dicts: List[Dict[str, Any]],
upload_dict: Dict[str, Any], entry_dicts: List[Dict[str, Any]],
dataset_dicts: List[Dict[str, Any]], doi_dicts: List[Dict[str, Any]],
db_dst: Database, stats: _UpgradeStatistics):
# Commit datasets
if dataset_dicts:
@@ -347,6 +442,13 @@ def _commit_upload(
dataset_writes.append(ReplaceOne({'_id': dataset_dict['_id']}, dataset_dict, upsert=True))
db_dst.dataset.bulk_write(dataset_writes)
stats.datasets.migrated += len(dataset_dicts)
# Commit DOIs
if doi_dicts:
doi_writes = []
for doi_dict in doi_dicts:
doi_writes.append(ReplaceOne({'_id': doi_dict['_id']}, doi_dict, upsert=True))
db_dst.d_o_i.bulk_write(doi_writes)
stats.dois.migrated += len(doi_writes)
# Commit upload
db_dst.upload.replace_one({'_id': upload_dict['_id']}, upload_dict, upsert=True)
stats.uploads.migrated += 1
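The DOI commit mirrors the dataset commit: bulk upserts via ReplaceOne, so re-running the migration against the same destination simply replaces documents that are already there. A small standalone sketch of that pymongo pattern, with illustrative documents and an assumed connection:

from pymongo import MongoClient, ReplaceOne

db_dst = MongoClient('localhost', 27017)['nomad_v1']     # assumed destination connection

doi_dicts = [{'_id': '10.17172/example-1'}, {'_id': '10.17172/example-2'}]   # illustrative
doi_writes = [ReplaceOne({'_id': d['_id']}, d, upsert=True) for d in doi_dicts]
result = db_dst.d_o_i.bulk_write(doi_writes)
print(result.upserted_count, result.modified_count)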