Commit 55a7f57d authored by Markus Scheidgen

Prepared next migration.

parent 7a94a0a3
Pipeline #65345 canceled with stages in 1 minute and 19 seconds
@@ -30,6 +30,7 @@ import sys
import tarfile
import math
from mongoengine import Document, IntField, StringField, DictField, BooleanField
from pymongo import UpdateOne
import datetime
from bravado.exception import HTTPNotFound, HTTPBadRequest, HTTPGatewayTimeout
import os
@@ -42,7 +43,7 @@ import random
import io
import json
from nomad import utils, infrastructure, files, config, search
from nomad import utils, infrastructure, files, config, search, processing as proc
from nomad.coe_repo import User, Calc, NoCalculation
from nomad.datamodel import CalcWithMetadata
from nomad.processing import FAILURE
@@ -1093,7 +1094,7 @@ class NomadCOEMigration:
continue
user_payload = dict(
repo_user_id=source_user.user_id,
repo_user_id=str(source_user.user_id),
email=source_user.email,
first_name=source_user.first_name,
last_name=source_user.last_name,
@@ -1786,55 +1787,80 @@ def update_user_metadata(bulk_size: int = 1000, update_index: bool = False, **kw
Uses kwargs as filters for the source index query.
"""
logger = utils.get_logger(__name__)
start_time = time.time()
# iterate the source index in bulk
size = SourceCalc.objects(**kwargs).count()
for start in range(0, size, bulk_size):
source_bulk = SourceCalc.objects(**kwargs)[start, start + bulk_size]
# retrieve fairdi data for bulk (by pid)
pids = [str(calc.pid) for calc in source_bulk]
target_bulk = Calc.objects(metadata__pid__in=pids)
target_bulk_dict = {
target.metadata['pid']: target
for target in target_bulk}
# comparing entries and preparing mongo update
updates = []
updated_calcs = []
for source in source_bulk:
target = target_bulk_dict[str(source.pid)]
target_metadata = CalcWithMetadata(**target.metadata)
source_metadata_normalized = dict(
comment=source.metadata.get('comment'),
references={ref['value'] for ref in source.metadata['references']},
coauthors={str(user['id']) for user in source.metadata['coauthors']},
shared_with={str(user['id']) for user in source.metadata['shared_with']},
datasets={
dict(
id=str(ds['id']),
name=ds['name'],
doi=ds['doi']['value'] if ds['doi'] is not None else None)
for ds in source.metadata['datasets']})
target_metadata_normalized = dict(
comment=target_metadata.comment,
references=set(ref['value'] for ref in target_metadata.references),
coauthors={str(user['id']) for user in target_metadata.coauthors},
shared_with={str(user['id']) for user in target_metadata.shared_with},
datasets={
dict(
id=str(ds['id']),
name=ds['name'],
doi=ds['doi']['value'])
for ds in target_metadata.datasets})
if source_metadata_normalized != target_metadata_normalized:
# do a full update of all metadata!
update = {
"updateOne": {
"filter": dict(_id=source.pid),
"update": {
count = 0
important_changes = {
'missing_calcs': {},
'replaced': {},
'lifted_embargo': []
}
try:
for start in range(0, size, bulk_size):
source_bulk = SourceCalc.objects(**kwargs)[start: start + bulk_size]
count += bulk_size
# retrieve fairdi data for bulk (by pid)
pids = [int(calc.pid) for calc in source_bulk]
target_bulk = proc.Calc.objects(metadata__pid__in=pids)
target_bulk_dict = {
str(target.metadata['pid']): target
for target in target_bulk}
# comparing entries and preparing mongo update
updates = []
updated_calcs = []
for source in source_bulk:
target = target_bulk_dict.get(str(source.pid))
if target is None:
# missing calc (maybe we find it another way)
potential_replacements = proc.Calc.objects(mainfile=source.mainfile, metadata__pid=None)
if potential_replacements.count() == 1:
target = potential_replacements.first()
important_changes['replaced'].setdefault(source.upload, []).append(source.pid)
else:
important_changes['missing_calcs'].setdefault(source.upload, []).append(source.pid)
continue
target_metadata = CalcWithMetadata(**target.metadata)
source_metadata_normalized = dict(
comment=source.metadata.get('comment'),
references={ref['value'] for ref in source.metadata['references']},
coauthors={str(user['id']) for user in source.metadata['coauthors']},
shared_with={str(user['id']) for user in source.metadata['shared_with']},
datasets=[
dict(
id=str(ds['id']),
name=ds['name'],
doi=ds['doi']['value'] if ds['doi'] is not None else None)
for ds in source.metadata['datasets']],
with_embargo=source.metadata['with_embargo'])
source_metadata_normalized['datasets'].sort(key=lambda o: o['id'])
target_metadata_normalized = dict(
comment=target_metadata.comment,
references=set(ref['value'] for ref in target_metadata.references),
coauthors={str(user['id']) for user in target_metadata.coauthors},
shared_with={str(user['id']) for user in target_metadata.shared_with},
datasets=[
dict(
id=str(ds['id']),
name=ds['name'],
doi=ds['doi']['value'] if ds['doi'] is not None else None)
for ds in target_metadata.datasets],
with_embargo=target_metadata.with_embargo)
target_metadata_normalized['datasets'].sort(key=lambda o: o['id'])
if source_metadata_normalized != target_metadata_normalized:
# do a full update of all metadata!
update = UpdateOne(
dict(_id=target.calc_id),
{
"$set": {
"metadata.comment": source.metadata.get('comment'),
"metadata.references": [dict(value=ref['value']) for ref in source.metadata['references']],
@@ -1844,20 +1870,36 @@
dict(
id=int(ds['id']),
name=ds['name'],
doi=ds['doi']['value'] if ds.get('doi') is not None else None)
for ds in source.metadata['datasets']]
doi=dict(value=ds['doi']['value']) if ds.get('doi') is not None else None)
for ds in source.metadata['datasets']],
"metadata.with_embargo": source.metadata['with_embargo']
}
}
}
}
updates.append(update)
updated_calcs.append(target_metadata)
# execute mongo update
if len(updates) > 0:
Calc._get_collection().bulk_write(updates)
if update_index:
search.index_all(updated_calcs, refresh=False)
# log
print('Synced calcs %d through %d of %d with %d diffs' % (start, start + bulk_size, size, len(updates)))
)
updates.append(update)
updated_calcs.append(target_metadata)
if target_metadata_normalized['with_embargo'] != source_metadata_normalized['with_embargo']:
important_changes['lifted_embargo'].append(source.pid)
# execute mongo update
if len(updates) > 0:
result = proc.Calc._get_collection().bulk_write(updates)
if result.bulk_api_result['nModified'] != len(updates):
logger.error('incomplete update in syncing user metadata')
if update_index:
search.index_all(updated_calcs, refresh=False)
# log
eta = ((time.time() - start_time) / float(count)) * (size - count)
print('Synced calcs %d through %d of %d with %d diffs, %s' % (
start, start + bulk_size, size, len(updates), datetime.timedelta(seconds=eta)), flush=True)
finally:
with open('sync_important_changes.json', 'wt') as f:
json.dump(important_changes, f, indent=2)
print('done')
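
The rewritten loop above combines several patterns worth seeing in isolation: paging through the source collection, comparing order-insensitive normalizations of both sides, queueing pymongo UpdateOne operations, and verifying the bulk_write result while printing an ETA. Below is a minimal, self-contained sketch of that pattern against hypothetical source and target collections; the collection names, the normalize() fields, and the sync() helper are illustrative assumptions, not the migration module's actual API.

    import datetime
    import time

    from pymongo import MongoClient, UpdateOne

    client = MongoClient()  # assumes a local mongod
    source_coll = client['source_db']['calcs']  # hypothetical source collection
    target_coll = client['target_db']['calcs']  # hypothetical target collection


    def normalize(doc):
        # Reduce a document to an order-insensitive, comparable form, mirroring
        # the *_metadata_normalized dicts above: sets for unordered scalar
        # fields, sorted lists where the members are (unhashable) dicts.
        return dict(
            comment=doc.get('comment'),
            references={ref['value'] for ref in doc.get('references', [])},
            datasets=sorted(
                [dict(id=str(ds['id']), name=ds['name']) for ds in doc.get('datasets', [])],
                key=lambda o: o['id']))


    def sync(bulk_size=1000):
        size = source_coll.count_documents({})
        start_time = time.time()
        count = 0
        for start in range(0, size, bulk_size):
            source_bulk = list(source_coll.find().skip(start).limit(bulk_size))
            count += len(source_bulk)
            pids = [doc['pid'] for doc in source_bulk]
            targets = {
                doc['pid']: doc
                for doc in target_coll.find({'pid': {'$in': pids}})}
            updates = []
            for source in source_bulk:
                target = targets.get(source['pid'])
                if target is None:
                    continue  # a real sync records the missing calc, as the code above does
                if normalize(source) != normalize(target):
                    # only queue a write when the normalized views differ
                    updates.append(UpdateOne(
                        {'_id': target['_id']},
                        {'$set': {'comment': source.get('comment')}}))
            if len(updates) > 0:
                result = target_coll.bulk_write(updates)
                if result.modified_count != len(updates):
                    print('incomplete update in syncing metadata')
            # extrapolate remaining time from throughput so far
            eta = ((time.time() - start_time) / float(count)) * (size - count)
            print('Synced calcs %d through %d of %d with %d diffs, %s' % (
                start, start + bulk_size, size, len(updates),
                datetime.timedelta(seconds=eta)), flush=True)

The diff reads result.bulk_api_result['nModified']; pymongo exposes the same count as BulkWriteResult.modified_count, which the sketch uses.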
@@ -194,7 +194,7 @@ parsers = [
name='parsers/crystal', code_name='Crystal',
parser_class_name='crystalparser.CrystalParser',
mainfile_contents_re=(
r'(CRYSTAL\s*\n0 0 0)|('
r'(CRYSTAL\s*\n\d+ \d+ \d+)|(CRYSTAL will run on \d+ processors)|('
r'\s*\*\s{10,}CRYSTAL(?P<majorVersion>[\d]+)\s{10,}\*'
r'\s*\*\s{10,}public \: (?P<minorVersion>[\d\.]+) \- .*\*)'
)
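
The widened pattern keeps the version-banner branch but generalizes the geometry header from the literal 0 0 0 to any \d+ \d+ \d+ triple, and additionally recognizes the MPI startup banner. A quick check with Python's re module; the sample strings are invented stand-ins for real CRYSTAL output, and the pattern is compiled here only for testing (the parser config passes it as a plain string).

    import re

    # the new pattern from the hunk above
    crystal_re = re.compile(
        r'(CRYSTAL\s*\n\d+ \d+ \d+)|(CRYSTAL will run on \d+ processors)|('
        r'\s*\*\s{10,}CRYSTAL(?P<majorVersion>[\d]+)\s{10,}\*'
        r'\s*\*\s{10,}public \: (?P<minorVersion>[\d\.]+) \- .*\*)')

    samples = [
        'CRYSTAL\n0 0 0',                     # old literal case, still matched
        'CRYSTAL\n1 0 0',                     # newly matched geometry header
        'CRYSTAL will run on 32 processors',  # newly matched MPI banner
    ]
    for sample in samples:
        assert crystal_re.search(sample) is not None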
@@ -74,7 +74,8 @@ class Calc(Proc):
('upload_id', 'parser'),
('upload_id', 'tasks_status'),
('upload_id', 'process_status'),
('upload_id', 'metadata.nomad_version')
('upload_id', 'metadata.nomad_version'),
'metadata.pid'
]
}
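
Alongside the compound indexes, the hunk adds a plain single-field index on metadata.pid; that is what keeps the migration's Calc.objects(metadata__pid__in=pids) bulk lookups from scanning the whole collection. A minimal mongoengine sketch of the same declaration, using a trimmed stand-in Document rather than nomad's actual Calc(Proc):

    from mongoengine import DictField, Document, StringField

    class Calc(Document):
        # trimmed stand-in; only the fields the indexes refer to
        upload_id = StringField()
        parser = StringField()
        metadata = DictField()

        meta = {
            'indexes': [
                ('upload_id', 'parser'),  # compound index, as before
                'metadata.pid'            # new: backs metadata__pid__in lookups
            ]
        }

With an active mongoengine connection, the indexes are created on first use of the collection; tuples declare compound indexes and bare strings declare single-field ones.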
@@ -12,18 +12,19 @@ gui:
debug: true
worker:
replicas: 3
replicas: 2
routing: "worker"
dbname: fairdi_nomad_migration
postgres:
publish_enabled: false
dbname: 'fairdi_nomad_migration'
uploadurl: 'https://labdev-nomad.rzg.mpg.de/fairdi/nomad/migration/upload'
volumes:
prefixSize: 2
prefixSize: 1
public: /nomad/fairdi/migration/fs/public
staging: /scratch/fairdi/migration/fs/staging
tmp: /nomad/fairdi/migration/fs/tmp
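
The volumes change drops prefixSize from 2 to 1. In nomad-like file layouts a setting of this kind typically controls how many leading characters of an object's id become a bucketing directory, so no single directory accumulates every upload; the helper below is an illustrative assumption about that scheme, not nomad's actual implementation.

    import os

    def object_path(base, object_id, prefix_size=1):
        # prefix_size=1 buckets 'a1b2c3' under <base>/a/; prefix_size=2 under <base>/a1/
        return os.path.join(base, object_id[:prefix_size], object_id)

    assert object_path('/nomad/fairdi/migration/fs/public', 'a1b2c3') == \
        '/nomad/fairdi/migration/fs/public/a/a1b2c3'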