Commit 13db6d28 authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Optimized index by lazy loading most relationships in repo db.

parent abad0d7e
Pipeline #42509 passed with stages
in 18 minutes and 59 seconds
......@@ -13,6 +13,8 @@
# limitations under the License.
import click
import time
import datetime
from nomad import config, infrastructure
from nomad.migration import NomadCOEMigration
......@@ -41,8 +43,16 @@ def migration(host, port, user, password, dbname):
@migration.command(help='Create/update the coe repository db migration index')
@click.option('--drop', help='Drop the existing index, otherwise it will only add new data.', is_flag=True)
@click.option('--with-metadata', help='Extract metadata for each calc and add it to the index.', is_flag=True)
@click.option('--per-query', default=100, help='We index many objects with one query. Default is 100.')
def index(drop, with_metadata, per_query):
    """CLI command that builds the migration index, printing progress and an ETA."""
    start = time.time()
    indexed = 0
    for _, total in _migration.index(drop=drop, with_metadata=with_metadata, per_query=int(per_query)):
        indexed += 1
        # average seconds per indexed item so far, extrapolated over the
        # REMAINING items (the original multiplied by `total`, which shows
        # projected total runtime rather than time left)
        eta = (total - indexed) * ((time.time() - start) / indexed)
        print('indexed: %8d, total: %8d, ETA: %s\r' % (indexed, total, datetime.timedelta(seconds=eta)), end='')
    print('done')
@migration.command(help='Copy users from source into empty target db')
......
......@@ -43,5 +43,5 @@ This module also provides functionality to add parsed calculation data to the db
"""
from .user import User, ensure_test_user, admin_user, LoginException
from .calc import Calc
from .calc import Calc, DataSet
from .upload import UploadMetaData, Upload
......@@ -45,7 +45,7 @@ class Calc(Base, datamodel.Calc): # type: ignore
secondary=calc_dataset_containment,
primaryjoin=calc_dataset_containment.c.children_calc_id == coe_calc_id,
secondaryjoin=calc_dataset_containment.c.parent_calc_id == coe_calc_id,
backref='children')
backref='children', lazy='joined', join_depth=2)
@classmethod
def load_from(cls, obj):
......@@ -136,7 +136,7 @@ class DataSet:
@property
def dois(self) -> List[str]:
    """
    The DOI strings of this dataset: the values of its 'INTERNAL' citations.

    The body returns ``citation.value``, so the annotation is ``List[str]``;
    the earlier ``List[Citation]`` annotation no longer matched the code.
    (Citation.value is assumed to hold the DOI string — TODO confirm against
    the Citation model.)
    """
    return list(
        citation.value
        for citation in self._dataset_calc.citations
        if citation.kind == 'INTERNAL')
@property
def name(self):
......
......@@ -17,33 +17,13 @@ This module contains functions to read data from NOMAD coe, external sources,
other/older nomad@FAIRDI instances to mass upload it to a new nomad@FAIRDI instance.
"""
from typing import Generator, Tuple, List
import os.path
import json
from mongoengine import Document, IntField, StringField, DictField
import itertools
from nomad import utils
from nomad.coe_repo import User, Calc
from nomad.coe_repo.base import CalcMetaData
def read_metadata(calc: Calc) -> dict:
metadata = dict(
_pid=calc.pid,
_uploader=calc.uploader.user_id,
_upload_time=calc.calc_metadata.added,
_datasets=list(
dict(id=ds.id, dois=ds.dois, name=ds.name)
for ds in calc.all_datasets),
with_embargo=calc.with_embargo,
comment=calc.comment,
references=calc.references,
coauthors=list(user.user_id for user in calc.coauthors),
shared_with=list(user.user_id for user in calc.shared_with))
return {
key: value for key, value in metadata.items()
if value is not None and value != []
}
from nomad.coe_repo import User, Calc, DataSet
class SourceCalc(Document):
......@@ -58,27 +38,91 @@ class SourceCalc(Document):
extracted_prefix = '$EXTRACTED/'
sites = ['/data/nomad/extracted/', '/nomad/repository/extracted/']
prefixes = itertools.chain([extracted_prefix], sites)
prefixes = [extracted_prefix] + sites
_dataset_cache: dict = {}
@staticmethod
def _read_metadata(calc: Calc) -> dict:
    """
    Extract the user metadata of the given source calc into a plain dict.

    Dataset membership is resolved through ``calc.parents``; each parent's
    ``all_datasets`` result is memoized in ``SourceCalc._dataset_cache`` so
    repeated indexing of calcs sharing a parent avoids re-running the
    (expensive) relationship traversal. Keys whose value is ``None`` or an
    empty list are dropped from the returned dict.
    """
    datasets: List[DataSet] = []
    for parent in calc.parents:
        # NOTE(review): despite its name, `parents` holds the parent's
        # datasets (the cached `all_datasets` result), not parent calcs.
        parents = SourceCalc._dataset_cache.get(parent, None)
        if parents is None:
            parents = parent.all_datasets
            SourceCalc._dataset_cache[parent] = parents
        # the parent itself counts as a dataset, plus all its transitive ones
        datasets.append(DataSet(parent))
        datasets.extend(parents)

    metadata = dict(
        _pid=calc.pid,
        _uploader=calc.uploader.user_id,
        _upload_time=calc.calc_metadata.added,
        _datasets=list(
            dict(id=ds.id, dois=ds.dois, name=ds.name)
            for ds in datasets),
        with_embargo=calc.with_embargo,
        comment=calc.comment,
        references=calc.references,
        coauthors=list(user.user_id for user in calc.coauthors),
        shared_with=list(user.user_id for user in calc.shared_with)
    )
    # drop unset/empty values so the stored metadata stays compact
    return {
        key: value for key, value in metadata.items()
        if value is not None and value != []
    }
@staticmethod
def index(source, drop: bool = False, with_metadata: bool = True, per_query: int = 100) \
        -> Generator[Tuple['SourceCalc', int], None, None]:
    """
    Creates a collection of :class:`SourceCalc` documents that represent source
    repo db entries. Each document relates a calc's (pid, mainfile, upload),
    where upload is the 'id'/prefix of an upload directory or upload file in
    the source repo's filesystem.

    Arguments:
        source: The source db sql alchemy session
        drop: True to create a new collection, update the existing otherwise,
            default is False.
        with_metadata: True to also grab all user metadata and store it,
            default is True.
        per_query: Number of source rows fetched per query, default is 100.

    Returns:
        yields tuples (:class:`SourceCalc`, #calcs_total)
    """
    if drop:
        SourceCalc.drop_collection()

    # Resume after the HIGHEST already-indexed pid. Ordering must be
    # descending ('-pid'): with ascending 'pid', .first() returns the lowest
    # pid and every update run would re-index the whole source db.
    last_source_calc = SourceCalc.objects().order_by('-pid').first()
    start_pid = last_source_calc.pid if last_source_calc is not None else 0
    source_query = source.query(Calc)
    total = source_query.count()

    while True:
        calcs = source_query \
            .filter(Calc.coe_calc_id > start_pid) \
            .order_by(Calc.coe_calc_id).limit(per_query)

        source_calcs = []
        fetched = 0
        for calc in calcs:
            fetched += 1
            # Advance past every fetched row, including skipped dataset
            # entries; otherwise a batch consisting only of datasets would
            # never move start_pid forward and indexing would stop early.
            # (calc.pid is assumed to mirror coe_calc_id — matches how the
            # original used source_calc.pid here.)
            start_pid = calc.pid

            if calc.calc_metadata.filenames is None:
                continue  # dataset case: no mainfile to index

            filenames = json.loads(calc.calc_metadata.filenames.decode('utf-8'))
            filename = filenames[0]
            # strip the known site/extraction prefixes to get a repo-relative path
            for prefix in SourceCalc.prefixes:
                filename = filename.replace(prefix, '')
            segments = [file.strip('\\') for file in filename.split('/')]

            source_calc = SourceCalc(pid=calc.pid)
            source_calc.upload = segments[0]
            source_calc.mainfile = os.path.join(*segments[1:])
            if with_metadata:
                source_calc.metadata = SourceCalc._read_metadata(calc)
            source_calcs.append(source_calc)

            yield source_calc, total

        if fetched == 0:
            break  # no rows beyond start_pid: indexing is complete
        if len(source_calcs) > 0:
            # bulk-insert the batch; a fetched-but-all-skipped batch inserts nothing
            SourceCalc.objects.insert(source_calcs)
class NomadCOEMigration:
......@@ -114,4 +158,4 @@ class NomadCOEMigration:
pass
def index(self, *args, **kwargs):
    """
    Create or update the :class:`SourceCalc` index for this migration's source db.

    Forwards to :func:`SourceCalc.index` and RETURNS its generator of
    (source_calc, total) tuples so callers can iterate for progress; without
    the return the lazy generator would be discarded and nothing indexed.
    """
    return SourceCalc.index(self.source, *args, **kwargs)
......@@ -75,10 +75,29 @@ class TestNomadCOEMigration:
assert target_repo.query(coe_repo.User).filter_by(user_id=1).first().email == 'one'
assert target_repo.query(coe_repo.User).filter_by(user_id=2).first().email == 'two'
def perform_index(self, migration, has_indexed, with_metadata, **kwargs):
    """
    Drive SourceCalc.index and check both the yielded calcs and the stored result.

    Arguments:
        migration: the migration fixture providing the source db session
        has_indexed: whether this run is expected to yield (and store) any calc
        with_metadata: forwarded to SourceCalc.index; gates the metadata asserts
        **kwargs: forwarded to SourceCalc.index (e.g. drop=...)
    """
    has_source_calc = False
    for source_calc, total in SourceCalc.index(migration.source, with_metadata=with_metadata, **kwargs):
        assert source_calc.pid is not None
        assert source_calc.mainfile == 'test/out.xml'
        assert source_calc.upload == 'upload1'
        has_source_calc = True
        assert total == 1

    assert has_source_calc == has_indexed

    # the calc must be in mongo regardless of whether THIS run indexed it
    test_calc = SourceCalc.objects(mainfile='test/out.xml', upload='upload1').first()
    assert test_calc is not None

    if with_metadata:
        assert test_calc.metadata['_uploader'] == 1
        assert test_calc.metadata['comment'] == 'label1'
@pytest.mark.parametrize('with_metadata', [False, True])
def test_create_index(self, migration, mockmongo, with_metadata: bool):
    # a fresh index (drop=True) must yield and store the single test calc
    self.perform_index(migration, has_indexed=True, drop=True, with_metadata=with_metadata)
@pytest.mark.parametrize('with_metadata', [True, False])
def test_update_index(self, migration, mockmongo, with_metadata: bool):
    # first run creates the index; the second, non-drop run resumes after the
    # highest indexed pid and must therefore index nothing new
    self.perform_index(migration, has_indexed=True, drop=True, with_metadata=with_metadata)
    self.perform_index(migration, has_indexed=False, drop=False, with_metadata=with_metadata)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment