......@@ -352,6 +352,14 @@ def upgrade():
'--entry-update', type=str,
help='json with updates to apply to all converted entries')
'--overwrite', type=click.Choice(['always', 'if-newer', 'never'], case_sensitive=False), default='never',
help='''If an upload already exists in the destination db, this option determines whether
it and its child records should be overwritten with the data from the source db.
Possible values are "always", "if-newer", "never". Selecting "always" always overwrites,
"never" never overwrites, and "if-newer" overwrites if the upload either doesn't exist
in the destination, or it exists but its complete_time (i.e. last time it was
processed) is older than in the source db.''')
'--fix-problems', is_flag=True,
help='''If a minor, fixable problem is encountered, fixes it automaticall; otherwise fail.''')
......@@ -360,7 +368,7 @@ def upgrade():
help='Dry run (not writing anything to the destination database).')
def migrate_mongo(
host, port, src_db_name, dst_db_name, query, ids_from_file, failed_ids_to_file,
upload_update, entry_update, fix_problems, dry):
upload_update, entry_update, overwrite, fix_problems, dry):
import json
from pymongo.database import Database
from nomad import utils, infrastructure
......@@ -401,4 +409,5 @@ def migrate_mongo(
uploads = db_src.upload.find(query)
db_src, db_dst, uploads, failed_ids_to_file, upload_update, entry_update, fix_problems, dry, logger)
db_src, db_dst, uploads, failed_ids_to_file, upload_update, entry_update, overwrite,
fix_problems, dry, logger)
......@@ -17,6 +17,7 @@
import time
from datetime import datetime
from typing import List, Dict, Set, Any, Optional
from pydantic import BaseModel
......@@ -71,7 +72,7 @@ def create_collections_if_needed(db_dst: Database):
def migrate_mongo_uploads(
db_src: Database, db_dst: Database, uploads: Cursor, failed_ids_to_file: bool,
upload_update: Dict[str, Any], entry_update: Dict[str, Any],
upload_update: Dict[str, Any], entry_update: Dict[str, Any], overwrite: str,
fix_problems: bool, dry: bool, logger):
''' Converts and/or migrates an upload and all related entries and datasets. '''
number_of_uploads = uploads.count()
......@@ -81,6 +82,7 @@ def migrate_mongo_uploads( = number_of_uploads
count_treated = count_failures = count_processing = 0
failed_and_skipped = []
count_ignored = 0
start_time = time.time()
next_report_time = start_time + 60
for upload_dict in uploads:
......@@ -92,6 +94,20 @@ def migrate_mongo_uploads(
count_processing += 1
upload_dict_dst = db_dst.upload.find_one({'_id': upload_id})
if upload_dict_dst:
if overwrite == 'always':
assert not _is_processing(upload_dict_dst), 'Destination upload is processing'
elif overwrite == 'if-newer':
complete_time = upload_dict.get('complete_time', datetime.min)
complete_time_dst = upload_dict_dst.get('complete_time', datetime.min)
if _is_processing(upload_dict_dst) or complete_time <= complete_time_dst:
count_ignored += 1
elif overwrite == 'never':
count_ignored += 1
entry_dicts, dataset_dicts, doi_dicts = _convert_mongo_upload(
db_src, upload_dict, upload_update, entry_update, fix_problems,
dataset_cache, stats, logger)
......@@ -132,6 +148,12 @@ def migrate_mongo_uploads(
summary += '\nNo errors occurred :-)\n\n'
if count_processing:
summary += f'{count_processing} uploads were skipped since they are currently processing\n\n'
if count_ignored:
summary += f'{count_ignored} uploads were ignored because '
if overwrite == 'if-newer':
summary += 'they are not newer than the record in the destination db.\n\n'
summary += 'they already exist in the destination db.\n\n'
if dry:
summary += 'Dry run - nothing written to the destination db\n\n'
