Commit 5f4318fa authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Completed baseline migration test. Normalized atoms, version, functional names for repository.

parent d3b00ee2
......@@ -146,7 +146,7 @@ class Calc(Base, datamodel.Calc): # type: ignore
elif topic.cid == base.topic_system_type:
result.system_type = topic.topic
elif topic.cid == base.topic_atoms:
result.setdefault('atom_species', []).append(topic.topic)
result.setdefault('atom_labels', []).append(topic.topic)
elif topic.cid == base.topic_crystal_system:
result.crystal_system = topic.topic
else:
......@@ -155,7 +155,7 @@ class Calc(Base, datamodel.Calc): # type: ignore
result.program_version = self.calc_metadata.version.content
result.chemical_composition = self.calc_metadata.chemical_formula
result.space_group_number = self.spacegroup.n
result.setdefault('atom_species', []).sort()
result.setdefault('atom_labels', []).sort()
datasets: List[DataSet] = []
for parent in self.parents:
......
......@@ -214,10 +214,10 @@ class Upload(Base, datamodel.Upload): # type: ignore
# topic based properties
coe_calc.set_value(base.topic_code, calc.program_name)
for atom in set(calc.atom_species):
coe_calc.set_value(base.topic_atoms, str(atom)) # TODO atom label not number
for atom in set(calc.atom_labels):
coe_calc.set_value(base.topic_atoms, str(atom))
coe_calc.set_value(base.topic_system_type, calc.system_type)
coe_calc.set_value(base.topic_xc_treatment, calc.XC_functional_name) # TODO function->treatment
coe_calc.set_value(base.topic_xc_treatment, calc.XC_functional_name)
coe_calc.set_value(base.topic_crystal_system, calc.crystal_system)
coe_calc.set_value(base.topic_basis_set_type, calc.basis_set_type)
......
......@@ -24,6 +24,8 @@ api, processing, migration, mirroring, or other 'infrastructure' operations.
from typing import Type, TypeVar, Union, Iterable, cast, Callable, Dict
import datetime
from nomad import utils
T = TypeVar('T')
......@@ -103,7 +105,7 @@ class UploadWithMetadata(dict, Entity):
self.upload_id = upload_id
class CalcWithMetadata(dict, Entity):
class CalcWithMetadata(utils.POPO, Entity):
"""
A dict/POPO class that can be used for mapping calc representations with calc metadata.
We have many representations of calcs and their calc metadata. To avoid implement
......@@ -117,7 +119,7 @@ class CalcWithMetadata(dict, Entity):
@classmethod
def register_mapping(
cls, from_type: Type[T], mapping: Callable[[T], 'CalcWithMetadata']):
cls, from_type: Type[Entity], mapping: Callable[[Entity], 'CalcWithMetadata']):
"""
Register a mapping from instances of another calc representation to instances of
:class:`CalcWithMetadata`.
......@@ -135,18 +137,3 @@ class CalcWithMetadata(dict, Entity):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.upload = UploadWithMetadata(kwargs['upload_id'])
def __getattr__(self, name):
if name in self:
return self[name]
else:
raise AttributeError("No such attribute: " + name)
def __setattr__(self, name, value):
self[name] = value
def __delattr__(self, name):
if name in self:
del self[name]
else:
raise AttributeError("No such attribute: " + name)
......@@ -719,7 +719,7 @@ def repo_data_to_calc_with_metadata(upload_id, calc_id, repo_data):
target.crystal_system = calc_data['repository_crystal_system']
target.XC_functional_name = calc_data['repository_xc_treatment']
target.system_type = calc_data['repository_system_type']
target.atom_species = calc_data['repository_atomic_elements']
target.atom_labels = calc_data['repository_atomic_elements']
target.space_group_number = calc_data['repository_spacegroup_nr']
target.chemical_composition = calc_data['repository_chemical_formula']
target.program_version = calc_data['repository_code_version']
......
......@@ -33,7 +33,7 @@ import time
from bravado.exception import HTTPNotFound
from datetime import datetime
from nomad import utils, config
from nomad import utils, config, infrastructure
from nomad.files import repo_data_to_calc_with_metadata
from nomad.coe_repo import User, Calc
from nomad.datamodel import CalcWithMetadata
......@@ -153,8 +153,16 @@ class NomadCOEMigration:
self.sites, self.pid_prefix = sites, pid_prefix
self.logger = utils.get_logger(__name__)
from nomad.infrastructure import repository_db
self.source = repository_db
self._client = None
self.source = infrastructure.repository_db
@property
def client(self):
if self._client is None:
from nomad.client import create_client
self._client = create_client()
return self._client
def copy_users(self, target_db):
""" Copy all users, keeping their ids, within a single transaction. """
......@@ -171,6 +179,52 @@ class NomadCOEMigration:
target_db.add(admin)
target_db.commit()
    def _validate(self, upload_id: str, calc_id: str, source_calc: dict, logger) -> bool:
        """
        Validates the given processed calculation, assuming that the data in the given
        source_calc is correct.

        Returns:
            False, if the calculation differs from the source calc.
        """
        # fetch the freshly processed calc from the target repository API
        repo_calc = self.client.repo.get_repo_calc(
            upload_id=upload_id, calc_id=calc_id).response().result

        is_valid = True
        target_calc = repo_data_to_calc_with_metadata(upload_id, calc_id, repo_calc)
        for key, target_value in target_calc.items():
            # identity/file keys are expected to differ between source and target
            if key in ['calc_id', 'upload_id', 'files']:
                continue

            source_value = source_calc.get(key, None)

            # closure over the current key/values; called immediately, so the
            # loop-variable late-binding pitfall does not apply here
            def report_mismatch():
                logger.info(
                    'source target missmatch', quantity=key,
                    source_value=source_value, target_value=target_value)

            # one side missing while the other is present is a mismatch
            if (source_value is None or target_value is None) and source_value != target_value:
                report_mismatch()
                is_valid = False
                continue

            # lists are compared as sets: every target element must appear in source
            if isinstance(target_value, list):
                if len(set(source_value).intersection(target_value)) != len(target_value):
                    report_mismatch()
                    is_valid = False
                # NOTE(review): reconstructed indentation — this continue is taken to be
                # inside the list branch, so list values skip the string comparison below
                continue

            # scalar comparison is case-insensitive on strings
            if isinstance(source_value, str):
                source_value = source_value.lower()
                target_value = str(target_value).lower()

            if source_value != target_value:
                report_mismatch()
                is_valid = False

        return is_valid
def migrate(self, *args):
"""
Migrate the given uploads.
......@@ -235,16 +289,15 @@ class NomadCOEMigration:
upload_name = '%s.zip' % source_upload_id
# upload and process the upload file
from nomad.client import create_client
client = create_client()
upload = client.uploads.upload(file=upload_archive_f, name=upload_name).response().result
upload = self.client.uploads.upload(file=upload_archive_f, name=upload_name).response().result
upload_archive_f.close()
upload_logger = self.logger.bind(
source_upload_id=source_upload_id, upload_id=upload.upload_id)
while upload.tasks_running:
upload = client.uploads.get_upload(upload_id=upload.upload_id).response().result
upload = self.client.uploads.get_upload(upload_id=upload.upload_id).response().result
time.sleep(0.1)
if upload.tasks_status == FAILURE:
......@@ -265,16 +318,17 @@ class NomadCOEMigration:
metadata_dict[source_calc.mainfile] = source_metadata
# verify upload
total_calcs = upload.calcs.pagination.total
total_source_calcs = len(metadata_dict)
unprocessed_calcs = 0
migrated_calcs = 0
calcs_with_diffs = 0
new_calcs = 0
missing_calcs = 0
for page in range(1, math.ceil(total_calcs / 100) + 1):
upload = client.uploads.get_upload(
report = utils.POPO()
report.total_calcs = upload.calcs.pagination.total
report.total_source_calcs = len(metadata_dict)
report.unprocessed_calcs = 0
report.migrated_calcs = 0
report.calcs_with_diffs = 0
report.new_calcs = 0
report.missing_calcs = 0
for page in range(1, math.ceil(report.total_calcs / 100) + 1):
upload = self.client.uploads.get_upload(
upload_id=upload.upload_id, per_page=100, page=page,
order_by='mainfile').response().result
......@@ -284,14 +338,20 @@ class NomadCOEMigration:
mainfile=calc_proc.mainfile)
source_calc = metadata_dict.get(calc_proc.mainfile, None)
repo_calc = None
if calc_proc.tasks_status == SUCCESS:
repo_calc = client.repo.get_repo_calc(
upload_id=upload.upload_id,
calc_id=calc_proc.calc_id).response().result
if source_calc is None:
calc_logger.info('processed a calc that has no source')
report.new_calcs += 1
continue
else:
source_calc.__migrated = True
report.migrated_calcs += 1
if not self._validate(
upload.upload_id, calc_proc.calc_id, source_calc, calc_logger):
report.calcs_with_diffs += 1
else:
unprocessed_calcs += 1
report.unprocessed_calcs += 1
calc_logger.info(
'could not process a calc%s.' %
', that is in source' if source_calc is not None else '')
......@@ -301,39 +361,14 @@ class NomadCOEMigration:
continue
if source_calc is None:
calc_logger.info('processed a calc that has no source')
new_calcs += 1
continue
migrated_calcs += 1
source_calc.__migrated = True
has_diff = False
target_calc = repo_data_to_calc_with_metadata(
upload.upload_id, repo_calc['calc_id'], repo_calc)
for key, target_value in target_calc.items():
if key in ['calc_id', 'upload_id']:
continue
source_value = source_calc.get(key, None)
if source_value != target_value:
has_diff = True
calc_logger.info(
'source target missmatch', quantity=key,
source_value=source_value, target_value=target_value)
if has_diff:
calcs_with_diffs += 1
for source_calc in upload_metadata_calcs:
if source_calc.__migrated is None:
missing_calcs += 1
report.missing_calcs += 1
upload_logger.info('no match for source calc', mainfile=source_calc.mainfile)
elif source_calc.__migrated is False:
upload_logger.info('source calc not processed', mainfile=source_calc.mainfile)
# commit upload
admin_keys = ['upload_time, uploader, pid']
def transform(calcWithMetadata):
......@@ -353,30 +388,22 @@ class NomadCOEMigration:
transform(calc) for calc in upload_metadata['calculations']
if calc.__migrated]
# commit with metadata
if total_calcs > unprocessed_calcs:
upload = client.uploads.exec_upload_command(
if report.total_calcs > report.unprocessed_calcs:
upload = self.client.uploads.exec_upload_command(
upload_id=upload.upload_id,
payload=dict(command='commit', metadata=upload_metadata)
).response().result
while upload.process_running:
try:
upload = client.uploads.get_upload(upload_id=upload.upload_id).response().result
upload = self.client.uploads.get_upload(
upload_id=upload.upload_id).response().result
time.sleep(0.1)
except HTTPNotFound:
# the proc upload will be deleted by the commit command
break
# report
report = dict(
total_calcs=total_calcs,
total_source_calcs=total_source_calcs,
unprocessed_calcs=unprocessed_calcs,
migrated_calcs=migrated_calcs,
calcs_with_diffs=calcs_with_diffs,
new_calcs=new_calcs,
missing_calcs=new_calcs)
upload_logger.info('migrated upload', **report)
report.update(status=SUCCESS)
yield report
......
......@@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from nomad.parsing import BadContextURI
from .normalizer import Normalizer
......@@ -22,6 +24,29 @@ class RepositoryNormalizer(Normalizer):
The normalizer that turns normalized parse results into a set of metadata
quantities for the repository.
"""
xc_treatments = {
'gga': 'GGA',
'hf_': 'HF',
'oep': 'OEP',
'hyb': 'hybrid',
'mgga': 'meta-GGA',
'vdw': 'vdW',
'lda': 'LDA'
}
""" https://gitlab.mpcdf.mpg.de/nomad-lab/nomad-meta-info/wikis/metainfo/XC-functional """
version_re = re.compile(r'(\d+(\.\d+(\.\d+)?)?)')
def map_functional_name_to_xc_treatment(self, name):
return RepositoryNormalizer.xc_treatments.get(name[:3].lower(), name)
def simplify_version(self, version):
match = RepositoryNormalizer.version_re.search(version)
if match is None:
return version
else:
return match.group(0)
def normalize(self, logger=None) -> None:
super().normalize(logger)
b = self._backend
......@@ -38,19 +63,20 @@ class RepositoryNormalizer(Normalizer):
b.addValue('repository_checksum', b.get_value('calc_hash', 0))
b.addValue('repository_chemical_formula', b.get_value('chemical_composition_bulk_reduced', 0))
b.addValue('repository_parser_id', b.get_value('parser_name', 0))
atoms = b.get_value('atom_labels', 0)
# TODO make list unique?
b.addValue('repository_atomic_elements', atoms)
b.addValue('repository_atomic_elements_count', len(atoms))
atom_labels = b.get_value('atom_labels', 0)
b.addValue('repository_atomic_elements', list(set(atom_labels)))
b.addValue('repository_atomic_elements_count', len(atom_labels))
b.addValue('repository_basis_set_type', b.get_value('program_basis_set_type', 0))
b.addValue('repository_crystal_system', b.get_value('crystal_system', 0))
b.addValue('repository_program_name', b.get_value('program_name', 0))
# TODO shorten and normalize the code version
b.addValue('repository_code_version', b.get_value('program_version', 0))
b.addValue(
'repository_code_version',
self.simplify_version(b.get_value('program_version', 0)))
b.addValue('repository_spacegroup_nr', b.get_value('space_group_number', 0))
b.addValue('repository_system_type', b.get_value('system_type', 0))
# TODO shorten and normalize to functional type
b.addValue('repository_xc_treatment', b.get_value('XC_functional_name', 0))
b.addValue(
'repository_xc_treatment',
self.map_functional_name_to_xc_treatment(b.get_value('XC_functional_name', 0)))
b.closeNonOverlappingSection('section_repository_parserdata')
if repository_info_context is None:
......
......@@ -257,3 +257,26 @@ class archive:
def to_tuple(self, *args):
return tuple(self[arg] for arg in args)
class POPO(dict):
    """
    A dict subclass that uses attributes as key/value pairs ("plain old python
    object"). Attribute reads/writes/deletes are forwarded to the underlying
    dict, so ``popo.x = 1`` and ``popo['x']`` are interchangeable.

    Note: the redundant ``__init__`` that only delegated to ``dict.__init__``
    was removed; construction from keyword arguments (and, like ``dict``, from
    a positional mapping) still works unchanged.
    """

    def __getattr__(self, name):
        # only called when normal attribute lookup fails, so real attributes
        # and methods are unaffected
        if name in self:
            return self[name]
        else:
            raise AttributeError("No such attribute: " + name)

    def __setattr__(self, name, value):
        # all attribute assignments become dict entries
        self[name] = value

    def __delattr__(self, name):
        if name in self:
            del self[name]
        else:
            raise AttributeError("No such attribute: " + name)
-- Example source repository database content for migration tests.
-- Session settings as emitted by pg_dump.
SET statement_timeout = 0;
SET lock_timeout = 0;
SET client_encoding = 'UTF8';
SET standard_conforming_strings = on;
SELECT pg_catalog.set_config('search_path', '', false);
SET check_function_bodies = false;
SET client_min_messages = warning;
-- start from a clean users table (cascades to dependent rows)
TRUNCATE TABLE public.users CASCADE;
-- two users; column meanings follow the repository schema (not shown here)
INSERT INTO public.users VALUES (1, 'one', 'one', 'one', 'one', NULL, NULL, NULL);
INSERT INTO public.users VALUES (2, 'two', 'two', 'two', 'two', NULL, NULL, NULL);
-- two calculations, ids 1 and 2 (see ownerships below for their owners)
INSERT INTO public.calculations VALUES (NULL, NULL, NULL, NULL, 0, false, 1, NULL);
INSERT INTO public.calculations VALUES (NULL, NULL, NULL, NULL, 0, false, 2, NULL);
-- one code family/version pair: VASP 4.6.35
INSERT INTO public.codefamilies VALUES (1, 'VASP');
INSERT INTO public.codeversions VALUES (1, 1, '4.6.35');
-- topics
INSERT INTO public.topics VALUES (1, 90, 'tetragonal');
INSERT INTO public.topics VALUES (2, 220, 'VASP');
INSERT INTO public.topics VALUES (3, 50, 'bulk');
INSERT INTO public.topics VALUES (4, 75, 'GGA');
INSERT INTO public.topics VALUES (5, 80, 'plane waves');
INSERT INTO public.topics VALUES (6, 10, 'Br');
INSERT INTO public.topics VALUES (7, 10, 'K');
INSERT INTO public.topics VALUES (8, 10, 'Si');
-- mapping topics to calcs via tags; both calcs get all eight topics
INSERT INTO public.tags VALUES(1, 1);
INSERT INTO public.tags VALUES(2, 1);
INSERT INTO public.tags VALUES(1, 2);
INSERT INTO public.tags VALUES(2, 2);
INSERT INTO public.tags VALUES(1, 3);
INSERT INTO public.tags VALUES(2, 3);
INSERT INTO public.tags VALUES(1, 4);
INSERT INTO public.tags VALUES(2, 4);
INSERT INTO public.tags VALUES(1, 5);
INSERT INTO public.tags VALUES(2, 5);
INSERT INTO public.tags VALUES(1, 6);
INSERT INTO public.tags VALUES(2, 6);
INSERT INTO public.tags VALUES(1, 7);
INSERT INTO public.tags VALUES(2, 7);
INSERT INTO public.tags VALUES(1, 8);
INSERT INTO public.tags VALUES(2, 8);
-- per-calc metadata: formula BrKSi2 and mainfile location for each calc
INSERT INTO public.metadata VALUES (1, NULL, NULL, NULL, NULL, 'BrKSi2', '2019-01-01 12:00:00', NULL, decode('["$EXTRACTED/upload/1/template.json"]', 'escape'), 1, NULL);
INSERT INTO public.metadata VALUES (1, NULL, NULL, NULL, NULL, 'BrKSi2', '2015-01-01 13:00:00', NULL, decode('["$EXTRACTED/upload/2/template.json"]', 'escape'), 2, NULL);
-- spacegroup number 123 for both calcs
INSERT INTO public.spacegroups VALUES (1, 123);
INSERT INTO public.spacegroups VALUES (2, 123);
INSERT INTO public.user_metadata VALUES (1, 0, 'label1');
INSERT INTO public.user_metadata VALUES (2, 1, 'label2');
-- calc 1 owned by user 1, calc 2 by user 2
INSERT INTO public.ownerships VALUES (1, 1);
INSERT INTO public.ownerships VALUES (2, 2);
\ No newline at end of file
......@@ -33,6 +33,12 @@ test_target_db_name = 'test_nomad_fair_migration_target'
@pytest.fixture(scope='module')
def source_repo(monkeysession, repository_db):
"""
Fixture for an example migration source db with:
- two users
- two calculations (1 per user)
- one calculation with all metadata (dataset, ref, comment, coauthor, sharewith)
"""
try:
with repository_db_connection(dbname='postgres', with_trans=False) as con:
with con.cursor() as cursor:
......@@ -47,24 +53,14 @@ def source_repo(monkeysession, repository_db):
'CREATE SCHEMA IF NOT EXISTS public;'
'GRANT ALL ON SCHEMA public TO postgres;'
'GRANT ALL ON SCHEMA public TO public;')
sql_file = os.path.join(os.path.dirname(infrastructure.__file__), 'empty_repository_db.sql')
cur.execute(open(sql_file, 'r').read())
cur.execute(
'TRUNCATE TABLE public.users CASCADE;'
"INSERT INTO public.users VALUES (1, 'one', 'one', 'one', 'one', NULL, NULL, NULL);"
"INSERT INTO public.users VALUES (2, 'two', 'two', 'two', 'two', NULL, NULL, NULL);"
"INSERT INTO public.calculations VALUES (NULL, NULL, NULL, NULL, 0, false, 1, NULL); "
"INSERT INTO public.calculations VALUES (NULL, NULL, NULL, NULL, 0, false, 2, NULL); "
"INSERT INTO public.codefamilies VALUES (1, 'test_code'); "
"INSERT INTO public.codeversions VALUES (1, 1, 'test_version'); "
"INSERT INTO public.metadata VALUES (1, NULL, NULL, NULL, NULL, 'formula1', '2019-01-01 12:00:00', NULL, decode('[\"$EXTRACTED/upload/1/template.json\"]', 'escape'), 1, NULL); "
"INSERT INTO public.metadata VALUES (1, NULL, NULL, NULL, NULL, 'formula2', '2015-01-01 13:00:00', NULL, decode('[\"$EXTRACTED/upload/2/template.json\"]', 'escape'), 2, NULL); "
"INSERT INTO public.spacegroups VALUES (1, 255); "
"INSERT INTO public.spacegroups VALUES (2, 255); "
"INSERT INTO public.user_metadata VALUES (1, 0, 'label1'); "
"INSERT INTO public.user_metadata VALUES (2, 1, 'label2'); "
"INSERT INTO public.ownerships VALUES (1, 1); "
"INSERT INTO public.ownerships VALUES (2, 2); ")
schema_sql_file, example_data_sql_file = (
os.path.join(os.path.dirname(infrastructure.__file__), 'empty_repository_db.sql'),
os.path.join('tests', 'data', 'migration', 'example_source_db.sql'))
for sql_file in [schema_sql_file, example_data_sql_file]:
with open(sql_file, 'r') as f:
cur.execute(f.read())
with create_repository_db(monkeysession, exists=True, readonly=True, dbname=test_source_db_name) as db:
yield db
......@@ -168,6 +164,6 @@ def test_migrate(migrate_infra, upload, assertions):
assert report['total_calcs'] == 2
assert report['total_source_calcs'] == 2
assert report['migrated_calcs'] == 2
# assert report['calcs_with_diffs'] == 0 # TODO
assert report['calcs_with_diffs'] == 0
assert report['new_calcs'] == 0
assert report['missing_calcs'] == 0
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment