Skip to content
Snippets Groups Projects
topology.py 27.12 KiB
#
# Copyright The NOMAD Authors.
#
# This file is part of NOMAD. See https://nomad-lab.eu for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.'
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from typing import Dict, List, Optional, Union
from collections import defaultdict
import pathlib
import json
from math import isnan

from ase import Atoms
from ase.data import chemical_symbols
import numpy as np
from matid.clustering import SBC, Cluster
from matid.symmetry.symmetryanalyzer import SymmetryAnalyzer
from matid.classification.classifications import (
    Class0D,
    Atom,
    Class1D,
    Class2D,
    Material2D,
    Surface,
    Class3D,
)

from nomad import utils
from nomad.config import config
from nomad import atomutils
from nomad.datamodel.results import (
    CoreHole,
    SymmetryNew as Symmetry,
    Material,
    System,
    Relation,
    structure_name_map,
)
from nomad.datamodel.datamodel import EntryArchive
from nomad.normalizing.common import (
    cell_from_ase_atoms,
    ase_atoms_from_nomad_atoms,
    nomad_atoms_from_ase_atoms,
    wyckoff_sets_from_matid,
    structures_2d,
    material_id_bulk,
    material_id_2d,
    material_id_1d,
)

conventional_description = 'The conventional cell of the material from which the subsystem is constructed from.'
subsystem_description = 'Automatically detected subsystem.'
chemical_symbols = np.array(chemical_symbols)
with open(pathlib.Path(__file__).parent / 'data/top_50k_material_ids.json', 'r') as fin:
    top_50k_material_ids = json.load(fin)


def get_topology_id(index: int) -> str:
    """Retuns a valid topology identifier with the given index.
    Args:
        index: The index of the topology. Must be unique.

    Returns:
        An identifier string that can be stored in topology.system_id.
    """
    return f'results/material/topology/{index}'


def get_topology_original(atoms=None, archive: EntryArchive = None) -> System:
    """
    Creates a new topology item for the original structure.
    """
    dimensionality = None
    try:
        classification = archive.run[0].m_cache['classification']
    except Exception:
        pass
    else:
        dimensionality_map = {
            Class0D: 0,
            Atom: 0,
            Class1D: 1,
            Class2D: 2,
            Surface: 2,
            Material2D: 2,
            Class3D: 3,
        }
        dimension = dimensionality_map.get(classification)
        if dimension is not None:
            dimensionality = f'{dimension}D'

    original = System(
        method='parser',
        label='original',
        description='A representative system chosen from the original simulation.',
        dimensionality=dimensionality,
        system_relation=Relation(type='root'),
        atoms_ref=atoms,
    )

    return original


def add_system_info(
    system: System,
    topologies: Dict[str, System],
    masses: Union[List[float], Dict[str, float]] = None,
) -> None:
    """Given a system with minimal information, attempts to add all values than
    can be derived.
    """

    def get_atoms(system):
        """Resolves the atoms that the system is constructed from."""
        if system.atoms:
            return system.atoms
        if system.atoms_ref:
            return system.atoms_ref
        if system.indices is not None:
            return get_atoms(topologies[system.parent_system])

    atoms = get_atoms(system)
    if atoms:
        if system.cell is None:
            if system.atoms or system.atoms_ref:
                ase_atoms = ase_atoms_from_nomad_atoms(atoms)
                system.cell = cell_from_ase_atoms(ase_atoms, masses, atoms.labels)
        atomic_numbers = (
            atoms.species if atoms.species is not None else atoms.atomic_numbers
        )
        symbols = chemical_symbols[atomic_numbers]

        if system.indices is not None:
            system.atoms_ref = atoms
            total_atoms = len(atomic_numbers)
            atomic_numbers = atomic_numbers if not masses else None
            sub_mass = atomutils.get_summed_mass(
                atomic_numbers=atomic_numbers,
                masses=masses,
                indices=system.indices[0],
                atom_labels=atoms.labels,
            )
            total_mass = atomutils.get_summed_mass(
                atomic_numbers=atomic_numbers,
                masses=masses,
                indices=[],
                atom_labels=atoms.labels,
            )
            if sub_mass and total_mass:
                system.mass_fraction = sub_mass / total_mass

            symbols = symbols[system.indices[0]]
            system.atomic_fraction = len(symbols) / total_atoms
        n_atoms = len(symbols)
        if system.n_atoms is None:
            system.n_atoms = n_atoms
        try:
            formula = atomutils.Formula(''.join(symbols))
        except Exception:
            pass
        else:
            formula.populate(system, descriptive_format='descriptive')


def add_system(
    system: System, topologies: Dict[str, System], parent: Optional[System] = None
) -> None:
    """Adds the given system to the topology."""
    index = len(topologies)
    system.system_id = get_topology_id(index)
    if parent:
        children = parent.child_systems if parent.child_systems else []
        children.append(system.system_id)
        if parent.child_systems is not children:
            parent.child_systems = children
        system.parent_system = parent.system_id
    topologies[system.system_id] = system


class TopologyNormalizer:
    """Handles the creation of topology information."""

    def __init__(self, entry_archive, repr_system, repr_symmetry, conv_atoms, logger):
        self.entry_archive = entry_archive
        self.repr_system = repr_system
        self.repr_symmetry = repr_symmetry
        self.structural_type = None
        self.conv_atoms = conv_atoms
        self.logger = logger
        self.masses = atomutils.get_masses_from_computational_model(
            entry_archive, repr_system=repr_system
        )

    def topology(self, material) -> Optional[List[System]]:
        """Returns a dictionary that contains all of the topologies mapped by id."""
        # If topology already exists (e.g. written by another normalizer), do
        # not overwrite it.
        topology = self.entry_archive.m_xpath('results.material.topology')
        if topology:
            return None
        # Next use the topology from the calculation
        topology_calc = self.topology_calculation()
        if topology_calc:
            return topology_calc
        # Finally if no other topology exists, try creating one with MatID
        with utils.timer(self.logger, 'calculating topology with matid'):
            topology_matid = self.topology_matid(material)
        if topology_matid:
            return topology_matid

        return None

    def topology_calculation(self) -> Optional[List[System]]:
        """Extracts the system topology as defined in the original calculation.
        This topology typically comes from e.g. classical force fields that
        define a topology for the system.
        """
        try:
            groups = self.entry_archive.run[0].system[0].atoms_group
            if len(groups) == 0:
                return None
        except Exception:
            return None
        try:
            atoms = self.repr_system.atoms
        except Exception:
            atoms = None
        if atoms is None:
            return None
        elif atoms.positions is None or len(atoms.positions) == 0:
            return None
        elif (atoms.species is None or len(atoms.species) == 0) and (
            atoms.atomic_numbers is None or len(atoms.atomic_numbers) == 0
        ):
            return None

        topology: Dict[str, System] = {}
        original = get_topology_original(atoms, self.entry_archive)
        original.atoms_ref = atoms
        add_system(original, topology)
        label_to_indices: Dict[str, list] = defaultdict(list)

        def add_group(groups, parent=None):
            if not groups:
                return
            for group in groups:
                label = group.label
                # Groups with the same label are mapped to the same system.  # TODO: change this logic for active orbitals
                old_labels = label_to_indices[label]
                instance_indices = group.atom_indices
                if not len(old_labels):
                    description_map = {
                        'molecule': 'Molecule extracted from the calculation topology.',
                        'molecule_group': 'Group of molecules extracted from the calculation topology.',
                        'monomer_group': 'Group of monomers extracted from the calculation topology.',
                        'monomer': 'Monomer extracted from the calculation topology.',
                        'active_orbitals': 'Orbitals targeted by the calculation.',
                    }
                    structural_type_map = {
                        'active_orbitals': 'active orbitals',
                        'molecule': 'molecule',
                        'molecule_group': 'group',
                        'monomer': 'monomer',
                        'monomer_group': 'group',
                    }  # determines the sub-title
                    building_block_map = {
                        'molecule': 'molecule',
                        'monomer': 'monomer',
                    }
                    relation_map = {
                        'active_orbitals': 'group',
                        'molecule': 'subsystem',
                        'molecule_group': 'group',
                        'monomer': 'subsystem',
                        'monomer_group': 'group',
                    }  # determines the shading of non-highlighted atoms
                    system = System(
                        method='parser',
                        description=description_map.get(group.type),
                        label=group.label,
                        structural_type=structural_type_map.get(group.type),
                        building_block=building_block_map.get(group.type),
                        system_relation=Relation(type=relation_map.get(group.type)),
                    )
                    add_system(system, topology, parent)
                    add_group(group.atoms_group, system)
                    old_labels.append(instance_indices)
                else:
                    if len(old_labels[0]) == len(instance_indices):
                        old_labels.append(instance_indices)
                    else:
                        self.logger.warn(
                            (
                                'the topology contains entries with the same label but with '
                                'different number of atoms'
                            )
                        )

        add_group(groups, original)
        active_orbital_states = self._extract_orbital()

        # Add the derived system information once all indices etc. are gathered.
        for top in topology.values():
            top.indices = label_to_indices.get(top.label)
            add_system_info(top, topology, masses=self.masses)
            if top.structural_type == 'active orbitals':
                try:
                    top.active_orbitals = active_orbital_states[0]
                    active_orbital_states.pop(0)
                except (
                    IndexError
                ):  # FIXME temporary fix to prevent output from projection parsers
                    self.logger.warn(
                        'Cannot assign all active orbital states to the topology.'
                    )

        return list(topology.values())

    def topology_matid(self, material: Material) -> Optional[List[System]]:
        """
        Returns a list of systems that have been identified with MatID.
        """
        # See if a system is available
        try:
            nomad_atoms = self.repr_system.atoms
            atoms = ase_atoms_from_nomad_atoms(nomad_atoms)
        except Exception:
            return None
        if not atoms or len(atoms) == 0:
            return None

        # Create topology for the original system
        topology: Dict[str, System] = {}
        original = get_topology_original(nomad_atoms, self.entry_archive)
        original.atoms_ref = nomad_atoms
        add_system(original, topology)
        add_system_info(original, topology, masses=self.masses)

        # Since we still need to run the old classification code
        # (matid.classification.classify), we use it's results to populate the
        # topology for bulk and 1D systems. Also the new clustering cannot
        # currently be run for systems without a cell. In other cases we run the
        # new classification code (matid.clustering.clusterer).
        n_atoms = len(atoms)
        cell = atoms.get_cell()
        if material.structural_type == 'bulk':
            self._topology_bulk(original, topology)
        elif material.structural_type == '1D':
            self._topology_1d(original, topology)
        # Continue creating topology if system size is not too large
        elif n_atoms <= config.normalize.clustering_size_limit:
            # Add all meaningful clusters to the topology
            sbc = SBC()
            try:
                clusters = sbc.get_clusters(atoms, pos_tol=0.8)
            except Exception as e:
                self.logger.warning(
                    'issue in matid clustering',
                    exc_info=e,
                    error=str(e),
                )
            else:
                for cluster in clusters:
                    subsystem = self._create_subsystem(cluster)
                    if not subsystem:
                        continue
                    structural_type = subsystem.structural_type
                    # If the found cell has many basis atoms, it is more likely that
                    # some of the symmetries were not correctly found than the cell
                    # actually being very complicated. Thus we ignore these clusters to
                    # minimize false-positive and to limit the time spent on symmetry
                    # calculation.
                    cell = cluster.get_cell()
                    if len(cell) > 8:
                        self.logger.info(
                            f'cell with many atoms ({len(cell)}) was ignored'
                        )
                        continue
                    try:
                        conventional_cell = self._create_conv_cell_system(
                            cluster, structural_type
                        )
                    except Exception as e:
                        self.logger.error(
                            'conventional cell information could not be created',
                            exc_info=e,
                            error=str(e),
                        )
                        continue
                    # We only accept the subsystem if the material id exists in the top
                    # 50k materials with most entries attached to them. This ensures
                    # that the material_id link points to valid materials and that we
                    # don't report anything too weird. The top 50k materials are
                    # pre-stored in a pickle file that has been created by using ES
                    # terms aggregation.
                    if conventional_cell.material_id in top_50k_material_ids:
                        add_system(subsystem, topology, original)
                        add_system_info(subsystem, topology, masses=self.masses)
                        add_system(conventional_cell, topology, subsystem)
                        add_system_info(
                            conventional_cell,
                            topology,
                            masses=self.masses
                            if isinstance(self.masses, Dict)
                            else None,
                        )
                    else:
                        self.logger.info(
                            f'material_id {conventional_cell.material_id} could not be verified'
                        )

        return list(topology.values())

    def _topology_bulk(self, original, topology) -> None:
        """Creates a topology for bulk structures as detected by the old matid
        classification."""
        if self.conv_atoms is None:
            return None

        # Subsystem
        subsystem = System(
            method='matid',
            label='subsystem',
            dimensionality='3D',
            structural_type='bulk',
            description=subsystem_description,
            system_relation=Relation(type='subsystem'),
            indices=[list(range(original.n_atoms))],
        )
        add_system(subsystem, topology, original)
        add_system_info(subsystem, topology, masses=self.masses)

        # Conventional system
        conv_system = System(
            method='matid',
            label='conventional cell',
            system_relation=Relation(type='conventional_cell'),
            dimensionality='3D',
            structural_type='bulk',
            description=conventional_description,
        )
        conv_system.atoms = nomad_atoms_from_ase_atoms(self.conv_atoms)
        symmetry_analyzer = self.repr_symmetry.m_cache.get('symmetry_analyzer')
        conv_system.symmetry = self._create_symmetry(symmetry_analyzer)
        conv_system.cell = cell_from_ase_atoms(
            self.conv_atoms, masses=self.masses, atom_labels=None
        )
        conv_system.material_id = material_id_bulk(
            symmetry_analyzer.get_space_group_number(),
            symmetry_analyzer.get_wyckoff_sets_conventional(),
        )
        add_system(conv_system, topology, subsystem)
        add_system_info(conv_system, topology, masses=self.masses)

    def _topology_1d(self, original, topology):
        """Creates a topology for 1D structures as detected by the old matid
        classification."""
        if self.conv_atoms is None:
            return None

        # Subsystem
        subsystem = System(
            method='matid',
            label='subsystem',
            dimensionality='1D',
            structural_type='1D',
            description=subsystem_description,
            system_relation=Relation(type='subsystem'),
            indices=[list(range(original.n_atoms))],
        )
        add_system(subsystem, topology, original)
        add_system_info(subsystem, topology, masses=self.masses)

        # Conventional system
        conv_system = System(
            method='matid',
            label='conventional cell',
            system_relation=Relation(type='conventional_cell'),
            dimensionality='1D',
            structural_type='1D',
        )
        conv_system.atoms = nomad_atoms_from_ase_atoms(self.conv_atoms)
        conv_system.cell = cell_from_ase_atoms(
            self.conv_atoms, masses=self.masses, atom_labels=None
        )

        # The lattice parameters that are not well defined for 1D structures are unset
        conv_system.cell.b = None
        conv_system.cell.c = None
        conv_system.cell.alpha = None
        conv_system.cell.beta = None
        conv_system.cell.gamma = None
        conv_system.cell.atomic_density = None
        conv_system.cell.mass_density = None
        conv_system.cell.volume = None

        conv_system.material_id = material_id_1d(self.conv_atoms)
        add_system(conv_system, topology, subsystem)
        add_system_info(conv_system, topology, masses=self.masses)

    def _create_subsystem(self, cluster: Cluster) -> Optional[System]:
        """
        Creates a new subsystem as detected by MatID.
        """
        try:
            dimensionality = cluster.get_dimensionality()
            cell = cluster.get_cell()
            n_repeated_directions = sum(cell.get_pbc())
        except Exception as e:
            self.logger.error(
                'matid cluster classification failed', exc_info=e, error=str(e)
            )
            return None
        structural_type = None
        building_block = None
        if dimensionality == 3:
            structural_type = 'bulk'
        elif dimensionality == 2:
            if n_repeated_directions == 2:
                structural_type = '2D'
                building_block = '2D material'
            elif n_repeated_directions == 3:
                structural_type = 'surface'
                building_block = 'surface'
        if not structural_type:
            return None

        subsystem = System(
            method='matid',
            label='subsystem',
            description=subsystem_description,
            system_relation=Relation(type='subsystem'),
            indices=[list(cluster.indices)],
        )

        subsystem.dimensionality = f'{dimensionality}D'
        subsystem.structural_type = structural_type
        subsystem.building_block = building_block

        return subsystem

    def _create_conv_cell_system(self, cluster: Cluster, structural_type: str):
        """
        Creates a new topology item for a conventional cell.
        """
        symmsystem = System(
            method='matid',
            label='conventional cell',
            system_relation=Relation(type='conventional_cell'),
        )
        if structural_type == '2D':
            self._add_conventional_2d(cluster, symmsystem)
        else:
            self._add_conventional_bulk(cluster, symmsystem)
        symmsystem.description = conventional_description

        return symmsystem

    def _add_conventional_bulk(self, cluster: Cluster, subsystem: System) -> None:
        """
        Creates the subsystem with the symmetry information of the conventional cell
        """
        cell = cluster.get_cell()
        # A big tolerance is used here to allow deviations from exact symmetry
        symm = SymmetryAnalyzer(cell, 1.0)
        conv_system = symm.get_conventional_system()
        subsystem.atoms = nomad_atoms_from_ase_atoms(conv_system)
        spg_number = symm.get_space_group_number()
        subsystem.cell = cell_from_ase_atoms(
            conv_system, masses=self.masses, atom_labels=None
        )
        symmetry = self._create_symmetry(symm)
        wyckoff_sets = symm.get_wyckoff_sets_conventional()
        material_id = material_id_bulk(spg_number, wyckoff_sets)
        subsystem.structural_type = 'bulk'
        subsystem.dimensionality = '3D'
        subsystem.material_id = material_id
        subsystem.symmetry = symmetry

    def _add_conventional_2d(self, cluster: Cluster, subsystem: System) -> None:
        """
        Creates the subsystem with the symmetry information of the conventional cell.
        """
        cell = cluster.get_cell()
        conv_atoms, _, wyckoff_sets, spg_number = structures_2d(cell)
        subsystem.cell = cell_from_ase_atoms(
            conv_atoms, masses=self.masses, atom_labels=None
        )
        subsystem.atoms = nomad_atoms_from_ase_atoms(conv_atoms)

        # Here we zero out the irrelevant lattice parameters to correctly handle
        # 2D systems with nonzero thickness (e.g. MoS2).
        subsystem.cell.c = None
        subsystem.cell.alpha = None
        subsystem.cell.beta = None
        subsystem.cell.atomic_density = None
        subsystem.cell.mass_density = None
        subsystem.cell.volume = None

        subsystem.structural_type = '2D'
        subsystem.dimensionality = '2D'
        subsystem.building_block = '2D material'
        subsystem.material_id = material_id_2d(spg_number, wyckoff_sets)

    def _create_symmetry(self, symm: SymmetryAnalyzer) -> Symmetry:
        international_short = symm.get_space_group_international_short()
        conv_system = symm.get_conventional_system()

        sec_symmetry = Symmetry()
        sec_symmetry.symmetry_method = 'MatID'
        sec_symmetry.space_group_number = symm.get_space_group_number()
        sec_symmetry.space_group_symbol = international_short
        sec_symmetry.hall_number = symm.get_hall_number()
        sec_symmetry.hall_symbol = symm.get_hall_symbol()
        sec_symmetry.point_group = symm.get_point_group()
        sec_symmetry.crystal_system = symm.get_crystal_system()
        sec_symmetry.bravais_lattice = symm.get_bravais_lattice()
        sec_symmetry.origin_shift = symm._get_spglib_origin_shift()
        sec_symmetry.transformation_matrix = symm._get_spglib_transformation_matrix()
        sec_symmetry.wyckoff_sets = wyckoff_sets_from_matid(
            symm.get_wyckoff_sets_conventional()
        )

        spg_number = symm.get_space_group_number()
        atom_species = conv_system.get_atomic_numbers()
        if type(conv_system) is Atoms or conv_system.wyckoff_letters is None:
            wyckoffs = symm.get_wyckoff_letters_conventional()
        else:
            wyckoffs = conv_system.wyckoff_letters
        norm_wyckoff = atomutils.get_normalized_wyckoff(atom_species, wyckoffs)
        protoDict = atomutils.search_aflow_prototype(spg_number, norm_wyckoff)

        if protoDict is not None:
            sec_symmetry.prototype_label_aflow = protoDict.get('aflow_prototype_id')
            sec_symmetry.prototype_name = structure_name_map.get(protoDict.get('Notes'))

        return sec_symmetry

    def _extract_orbital(self) -> list[CoreHole]:
        """
        Gather atomic orbitals from `run.method.atom_parameters.core_hole`.
        Also apply normalization in the process.
        """
        # Collect the active orbitals from method
        methods = self.entry_archive.run[-1].method
        if not methods:
            return []
        atom_params = getattr(methods[-1], 'atom_parameters', [])
        active_orbitals_run = [
            param.core_hole for param in atom_params if param.core_hole is not None
        ]
        if (
            len(active_orbitals_run) > 1
        ):  # FIXME: currently only one set of active orbitals is supported, remove for multiple
            self.logger.warn(
                """Multiple sets of active orbitals found.
                Currently, the topology only supports 1, so only the first set is used."""
            )
        # Map the active orbitals to the topology
        active_orbitals_results: list[CoreHole] = []
        for param in getattr(methods[-1], 'atom_parameters', []):
            if (active_orbitals := getattr(param, 'core_hole')) is not None:
                active_orbitals.normalize(
                    EntryArchive(), None
                )  # leave out `archive` and `logger`, to keep normalization local
                active_orbitals_new = CoreHole()
                for quantity_name in active_orbitals.quantities:
                    setattr(
                        active_orbitals_new,
                        quantity_name,
                        getattr(active_orbitals, quantity_name),
                    )
                active_orbitals_new.normalize(None, None)
                active_orbitals_results.append(active_orbitals_new)
                break  # FIXME: currently only one set of active orbitals is supported, remove for multiple
        return active_orbitals_results