Commit 754b00dd authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Merge branch 'v0.7.3' of gitlab.mpcdf.mpg.de:nomad-lab/nomad-FAIR into v0.7.3

parents 01c7f304 841f88fc
Pipeline #68271 failed with stages
in 14 minutes and 47 seconds
......@@ -87,53 +87,56 @@ class SystemBasedNormalizer(Normalizer, metaclass=ABCMeta):
""" Normalize the given section and returns True, iff successful"""
pass
def __representative_systems(self):
# look for sccs in last frames
sccs = []
try:
frame_seqs = self._backend.get_sections(s_frame_sequence)
except Exception:
frame_seqs = []
def __representative_system(self):
"""Used to select a representative system for this entry.
for frame_seq in frame_seqs:
try:
frames = self._backend.get_value(r_frame_sequence_local_frames, frame_seq)
except Exception:
frames = []
if len(frames) > 0:
sccs.append(frames[-1])
# no sccs from frames -> consider all sccs
if len(sccs) == 0:
try:
sccs = self._backend.get_sections(s_scc)
except Exception:
sccs = []
Attempt to find a single section_system that is representative for the
entry. The selection depends on the type of calculation.
"""
system_idx = None
# Try to find a frame sequence, only first found is considered
try:
systems = [self._backend.get_value(r_scc_to_system, scc) for scc in sccs]
frame_seqs = self._backend[s_frame_sequence]
frame_seq = frame_seqs[0]
sampling_method_idx = frame_seq["frame_sequence_to_sampling_ref"]
sec_sampling_method = self._backend["section_sampling_method"][sampling_method_idx]
sampling_method = sec_sampling_method["sampling_method"]
frames = frame_seq["frame_sequence_local_frames_ref"]
if sampling_method == "molecular_dynamics":
scc_idx = frames[0]
else:
scc_idx = frames[-1]
scc = self._backend[s_scc][scc_idx]
system_idx = scc["single_configuration_calculation_to_system_ref"]
except Exception:
systems = []
frame_seqs = []
# only take the first, and last two systems
if len(systems) == 0:
# If no frame sequences detected, try to find scc
if len(frame_seqs) == 0:
try:
systems = self._backend.get_sections(s_system)
sccs = self._backend[s_scc]
scc = sccs[-1]
system_idx = scc["single_configuration_calculation_to_system_ref"]
except Exception:
systems = []
if len(systems) > 2:
systems = [systems[0], systems[-2], systems[-1]]
sccs = []
if len(systems) == 0:
self.logger.error('no "representative" section system found')
# If no sccs exist, try to find systems
if len(sccs) == 0:
try:
systems = self._backend.get_sections(s_system)
system_idx = systems[-1]
except Exception:
sccs = []
self.logger.info(
'chose "representative" systems for normalization',
number_of_systems=len(systems))
if system_idx is None:
self.logger.error('no "representative" section system found')
else:
self.logger.info(
'chose "representative" system for normalization',
)
return systems
return system_idx
def __normalize_system(self, g_index, representative, logger=None) -> bool:
try:
......@@ -154,18 +157,15 @@ class SystemBasedNormalizer(Normalizer, metaclass=ABCMeta):
def normalize(self, logger=None) -> None:
super().normalize(logger)
representative_systems = set(self.__representative_systems())
all_systems = self._backend.get_sections(s_system)
has_representative = False
for g_index in representative_systems:
has_representative = has_representative or self.__normalize_system(g_index, True, logger)
# Process representative system first
representative_system_idx = self.__representative_system()
if representative_system_idx is not None:
self.__normalize_system(representative_system_idx, True, logger)
# all the rest or until first representative depending on configuration
if not self.only_representatives or not has_representative:
# All the rest if requested
if not self.only_representatives:
for g_index in all_systems:
if g_index not in representative_systems:
if self.__normalize_system(g_index, not has_representative, logger):
has_representative = True
if self.only_representatives:
break
if g_index != representative_system_idx:
self.__normalize_system(g_index, False, logger)
......@@ -413,6 +413,9 @@ class LocalBackend(LegacyParserBackend, metaclass=DelegatingMeta):
self._open_context: Tuple[str, int] = None
self._context_section = None
def __getitem__(self, metaname):
return self.data[metaname]
def __getattr__(self, name):
""" Support for unimplemented and unexpected methods. """
if name not in self._known_attributes and self._unknown_attributes.get(name) is None:
......
......@@ -122,6 +122,34 @@ def run_normalize_for_structure(atoms: Atoms) -> LocalBackend:
return run_normalize(template)
@pytest.fixture(scope='session')
def single_point(two_d) -> LocalBackend:
return two_d
@pytest.fixture(scope='session')
def geometry_optimization() -> LocalBackend:
parser_name = "parsers/template"
filepath = "tests/data/normalizers/fcc_crystal_structure.json"
backend = parse_file((parser_name, filepath))
backend = run_normalize(backend)
return backend
@pytest.fixture(scope='session')
def molecular_dynamics(bulk) -> LocalBackend:
return bulk
@pytest.fixture(scope='session')
def phonon() -> LocalBackend:
parser_name = "parsers/phonopy"
filepath = "tests/data/parsers/phonopy/phonopy-FHI-aims-displacement-01/control.in"
backend = parse_file((parser_name, filepath))
backend = run_normalize(backend)
return backend
@pytest.fixture(scope='session')
def bulk() -> LocalBackend:
parser_name = "parsers/cp2k"
......@@ -255,17 +283,54 @@ def test_symmetry_classification_fcc():
def test_system_classification(atom, molecule, one_d, two_d, surface, bulk):
# Atom
assert atom.get_value('system_type') == "atom"
assert atom['system_type'] == "atom"
# Molecule / cluster
assert molecule.get_value('system_type') == "molecule / cluster"
assert molecule['system_type'] == "molecule / cluster"
# 1D
assert one_d.get_value('system_type') == "1D"
assert one_d['system_type'] == "1D"
# 2D
assert two_d.get_value('system_type') == "2D"
assert two_d['system_type'] == "2D"
# Surface
assert surface.get_value('system_type') == "surface"
assert surface['system_type'] == "surface"
# Bulk
assert bulk.get_value('system_type') == "bulk"
assert bulk['system_type'] == "bulk"
def test_representative_systems(single_point, molecular_dynamics, geometry_optimization, phonon):
"""Checks that the representative systems are correctly identified and
processed by SystemNormalizer.
"""
def check_representative_frames(backend):
# For systems with multiple frames the first and two last should be processed.
try:
frames = backend["frame_sequence_local_frames_ref"]
except KeyError:
sccs = backend["section_single_configuration_calculation"]
scc = sccs[-1]
repr_system_idx = scc["single_configuration_calculation_to_system_ref"]
else:
sampling_method = backend["sampling_method"]
if sampling_method == "molecular_dynamics":
idx = 0
else:
idx = -1
scc_idx = frames[idx]
scc = backend["section_single_configuration_calculation"][scc_idx]
repr_system_idx = scc["single_configuration_calculation_to_system_ref"]
# Check that only the representative system has been labels with
# "is_representative"
for i, system in enumerate(backend["section_system"]):
if i == repr_system_idx:
assert system["is_representative"] is True
else:
with pytest.raises(KeyError):
system["is_representative"]
check_representative_frames(single_point)
check_representative_frames(molecular_dynamics)
check_representative_frames(geometry_optimization)
check_representative_frames(phonon)
def test_reduced_chemical_formula():
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment