Commit 9a78848f authored by David Sikter's avatar David Sikter
Browse files

Adding code for migrating DOI and fixing missing dataset refs

parent f782e11e
Pipeline #115452 passed with stages
in 33 minutes and 12 seconds
......@@ -17,7 +17,7 @@
#
import time
from typing import List, Dict, Set, Any
from typing import List, Dict, Set, Any, Optional
from pydantic import BaseModel
from pymongo import ReplaceOne
......@@ -47,6 +47,13 @@ class _UpgradeStatistics(BaseModel):
uploads = _CollectionStatistics(collection_name='Uploads')
entries = _CollectionStatistics(collection_name='Entries')
datasets = _CollectionStatistics(collection_name='Datasets')
dois = _CollectionStatistics(collection_name='DOIs')
class _DatasetCacheItem(BaseModel):
converted_dataset_dict: Optional[Dict[str, Any]] = None
converted_doi_dict: Optional[Dict[str, Any]] = None
ready_to_commit: bool = False
def create_collections_if_needed(db_dst: Database):
......@@ -68,7 +75,7 @@ def migrate_mongo_uploads(
''' Converts and/or migrates an upload and all related entries and datasets. '''
number_of_uploads = uploads.count()
logger.info(f'Found {number_of_uploads} uploads to import.')
migrated_dataset_ids: Set[str] = set()
dataset_cache: Dict[str, _DatasetCacheItem] = {}
stats = _UpgradeStatistics()
stats.uploads.total = number_of_uploads
count_treated = count_failures = count_processing = 0
......@@ -84,10 +91,10 @@ def migrate_mongo_uploads(
count_processing += 1
failed_and_skipped.append(upload_id)
else:
entry_dicts, dataset_dicts = _convert_mongo_upload(
db_src, upload_dict, fix_problems, migrated_dataset_ids, stats, logger)
entry_dicts, dataset_dicts, doi_dicts = _convert_mongo_upload(
db_src, upload_dict, fix_problems, dataset_cache, stats, logger)
if not dry:
_commit_upload(upload_dict, entry_dicts, dataset_dicts, db_dst, stats)
_commit_upload(upload_dict, entry_dicts, dataset_dicts, doi_dicts, db_dst, stats)
del entry_dicts, dataset_dicts # To free up memory immediately
except Exception as e:
logger.error(f'Failed to migrate upload: {str(e)}', upload_id=upload_id)
......@@ -113,7 +120,7 @@ def migrate_mongo_uploads(
summary = f'Summary:\n\nElapsed time: {_seconds_to_h_m(time.time() - start_time)}\n\n'
summary += f'{"Doc type":<10}{"found":>20}{"converted":>20}{"migrated":>20}\n'
summary += '-' * 70 + '\n'
for sub_stats in (stats.uploads, stats.entries, stats.datasets):
for sub_stats in (stats.uploads, stats.entries, stats.datasets, stats.dois):
summary += (
f'{sub_stats.collection_name:<10}{sub_stats.total:>20}' # pylint: disable=E1101
f'{sub_stats.converted:>20}{sub_stats.migrated:>20}\n')
......@@ -132,12 +139,11 @@ def migrate_mongo_uploads(
def _convert_mongo_upload(
db_src: Database, upload_dict: Dict[str, Any], fix_problems: bool,
migrated_dataset_ids: Set[str], stats: _UpgradeStatistics, logger):
dataset_cache: Dict[str, _DatasetCacheItem], stats: _UpgradeStatistics, logger):
'''
Converts (upgrades) an upload_dict and all related records. If successful,
returns two lists: one with converted entry dicts, and one with converted dataset dicts.
Datasets whose IDs are in `migrated_dataset_ids` are skipped (as they are assumed to
have been migrated, or at least attempted to migrated already).
returns three lists: one with converted entry dicts, and one with converted dataset dicts,
and one with converted DOI dicts.
'''
upload_id = upload_dict['_id']
published = upload_dict.get('publish_time') is not None
......@@ -185,7 +191,7 @@ def _convert_mongo_upload(
external_db = entry_metadata_dict.get('external_db')
if external_db != first_external_db:
if external_db and first_external_db:
# Problem is unfixable (two non-empty values encountered)
# Problem is unfixable (two different non-empty values encountered)
assert False, 'Inconsistent external_db for entries - unfixable'
elif not fix_problems:
assert False, 'Inconsistent external_db for entries - use --fix-problems to fix'
......@@ -225,32 +231,59 @@ def _convert_mongo_upload(
assert upload_dict.get(field) is not None, f'Missing required upload field: {field}'
# migrate entries
new_dataset_ids: Set[str] = set()
newly_encountered_dataset_ids: Set[str] = set()
for entry_dict in entry_dicts:
assert not _is_processing(entry_dict), (
f'the entry {entry_dict["_id"]} has status processing, but the upload is not processing.')
_convert_mongo_entry(entry_dict, common_coauthors, fix_problems, logger)
stats.entries.converted += 1
# Add encountered datasets
# Convert datasets
datasets = entry_dict.get('datasets')
if datasets:
converted_datasets = []
for dataset_id in datasets:
if dataset_id not in migrated_dataset_ids:
new_dataset_ids.add(dataset_id)
# Migrate newly discovered datasets
stats.datasets.total += len(new_dataset_ids)
migrated_dataset_ids.update(new_dataset_ids)
if dataset_id in dataset_cache:
ds_cache = dataset_cache[dataset_id]
else:
# First time we encounter this dataset ref
newly_encountered_dataset_ids.add(dataset_id)
ds_cache = _get_dataset_cache_data(dataset_id, db_src, logger)
dataset_cache[dataset_id] = ds_cache
if ds_cache.converted_dataset_dict is not None:
stats.datasets.total += 1
if ds_cache.converted_doi_dict is not None:
stats.dois.total += 1
if ds_cache.converted_dataset_dict is None:
# Dataset record does not exist
assert fix_problems, (
f'Missing dataset reference encountered: {dataset_id} - '
'use --fix-problems to fix')
logger.warn(
'Fixing (removing) missing dataset reference for entry',
entry_id=entry_dict['_id'], dataset_id=dataset_id)
elif not ds_cache.converted_dataset_dict:
# The value must be the empty dict, which represents a conversion failure
assert False, f'Reference to unconvertable dataset {dataset_id}'
else:
# Dataset record exists and has been converted suuccessfully
converted_datasets.append(dataset_id)
entry_dict['datasets'] = converted_datasets
# All conversion successful! Ready to migrate
dataset_dicts: List[Dict[str, Any]] = []
for dataset_id in new_dataset_ids:
dataset_dict = db_src.dataset.find_one({'_id': dataset_id})
assert dataset_dict is not None, f'Missing dataset reference: {dataset_id}'
_convert_mongo_dataset(dataset_dict)
dataset_dicts.append(dataset_dict)
stats.datasets.converted += 1
# All conversion successful!
doi_dicts: List[Dict[str, Any]] = []
for dataset_id in newly_encountered_dataset_ids:
ds_cache = dataset_cache[dataset_id]
if not ds_cache.ready_to_commit:
if ds_cache.converted_dataset_dict:
dataset_dicts.append(ds_cache.converted_dataset_dict)
stats.datasets.converted += 1
if ds_cache.converted_doi_dict:
doi_dicts.append(ds_cache.converted_doi_dict)
stats.dois.converted += 1
ds_cache.ready_to_commit = True
stats.entries.converted += len(entry_dicts)
stats.uploads.converted += 1
return entry_dicts, dataset_dicts
return entry_dicts, dataset_dicts, doi_dicts
def _convert_mongo_entry(entry_dict: Dict[str, Any], common_coauthors: Set, fix_problems: bool, logger):
......@@ -321,8 +354,38 @@ def _convert_mongo_dataset(dataset_dict: Dict[str, Any]):
_rename_key(dataset_dict, 'modified', 'dataset_modified_time')
# Check that all required fields are there
for field in ('dataset_name',):
assert dataset_dict.get(field) is not None, (
f'Dataset {dataset_dict["_id"]} missing required field {field}')
assert dataset_dict.get(field) is not None, f'Dataset missing required field {field}'
def _convert_mongo_doi(doi_dict: Dict[str, Any]):
pass
def _get_dataset_cache_data(dataset_id: str, db_src: Database, logger) -> _DatasetCacheItem:
'''
Fetches and converts a dataset and related doi record. None values will be used in the
returned :class:`_DatasetCacheItem` if a record is missing; empty dicts are used if the
record exists but can't be converted
'''
dataset_dict = db_src.dataset.find_one({'_id': dataset_id})
doi_dict = None
if dataset_dict:
try:
_convert_mongo_dataset(dataset_dict)
doi = dataset_dict.get('doi')
if doi:
doi_dict = db_src.d_o_i.find_one({'_id': doi})
# DOI records were not stored originally, so some older datasets with
# doi might not have a corresponding DOI record. The data is also
# not super critical, so if it doesn't exist we will just ignore it
if doi_dict:
_convert_mongo_doi(doi_dict)
except Exception as e:
# Conversion failed
logger.error(str(e), dataset_id=dataset_id)
dataset_dict = {}
doi_dict = {}
return _DatasetCacheItem(converted_dataset_dict=dataset_dict, converted_doi_dict=doi_dict)
def _is_processing(proc_dict: Dict[str, Any]) -> bool:
......@@ -333,7 +396,8 @@ def _is_processing(proc_dict: Dict[str, Any]) -> bool:
def _commit_upload(
upload_dict: Dict[str, Any], entry_dicts: List[Dict[str, Any]], dataset_dicts: List[Dict[str, Any]],
upload_dict: Dict[str, Any], entry_dicts: List[Dict[str, Any]],
dataset_dicts: List[Dict[str, Any]], doi_dicts: List[Dict[str, Any]],
db_dst: Database, stats: _UpgradeStatistics):
# Commit datasets
if dataset_dicts:
......@@ -342,6 +406,13 @@ def _commit_upload(
dataset_writes.append(ReplaceOne({'_id': dataset_dict['_id']}, dataset_dict, upsert=True))
db_dst.dataset.bulk_write(dataset_writes)
stats.datasets.migrated += len(dataset_dicts)
# Commit DOIs
if doi_dicts:
doi_writes = []
for doi_dict in doi_dicts:
doi_writes.append(ReplaceOne({'_id': doi_dict['_id']}, doi_dict, upsert=True))
db_dst.d_o_i.bulk_write(doi_writes)
stats.dois.migrated += len(doi_writes)
# Commit upload
db_dst.upload.replace_one({'_id': upload_dict['_id']}, upload_dict, upsert=True)
stats.uploads.migrated += 1
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment