Commit 9545b727 authored by Daria M. Tomecka

add temporary script for classification - adds two more metadata fields from aflowlib to the calculation.

parent e490979e
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Reads calculation data and classifies their structures by prototypes
on the basis of the space_group and normalized_wyckoff, and adds labels to the calculations
- prototype_label (with labels in the same format as
in the read_prototypes function).
"""
from __future__ import absolute_import
__author__ = "Daria M. Tomecka and Fawzi Mohamed"
__copyright__ = "Copyright 2017, The NOMAD Project"
__maintainer__ = "Daria M. Tomecka"
__email__ = "tomeckadm@gmail.com;"
__date__ = "12/07/17"
import sys
import ase.io
from ase.data import chemical_symbols
import json
import numpy as np
import time
import datetime
import os, os.path
import logging
import functools
import math
import setup_paths
### new updated location for the prototypes
from nomadcore.structure_types import structure_types_by_spacegroup as str_types_by_spg
from nomadcore.parse_streamed_dicts import ParseStreamedDicts
from nomadcore.local_meta_info import loadJsonFile, InfoKindEl
from nomadcore.parser_backend import JsonParseEventsWriterBackend
#prototypes_file = os.path.normpath("/nomad-lab-base/analysis-tools/structural-similarity/python-modules/nomad_sim/structure_types.py")
#print(sys.path)
#sys.path.append('/home/beaker/py3k/lib/python3.5/')
import spglib
#LOGGER = logging.getLogger(__name__)
#logging.basicConfig(level=logging.ERROR)
atomSpecies = None
cell = None
def get_normalized_wyckoff(atomic_number, wyckoff):
"""Returns a normalized Wyckoff sequence for the given atomic numbers and
wyckoff symbols.
"""
# print("at", atomic_number, wyckoff)
atomCount = {}
for nr in atomic_number:
atomCount[nr] = atomCount.get(nr, 0) + 1
wycDict = {}
#logging.error("atomic_number: %s, wyckoff: %s", atomic_number, wyckoff)
for i, wk in enumerate(wyckoff):
oldVal = wycDict.get(wk, {})
#print("i:",i, "wyckoff", wyckoff, "wk", wk)
nr = atomic_number[i]
oldVal[nr] = oldVal.get(nr, 0) + 1
wycDict[wk] = oldVal
sortedWyc = list(wycDict.keys())
sortedWyc.sort()
    def cmpp(a, b):
        # inverted three-way comparison: larger values sort first (descending order)
        return ((a < b) - (a > b))
def compareAtNr(at1, at2):
c = cmpp(atomCount[at1], atomCount[at2])
if (c != 0):
return c
for wk in sortedWyc:
p = wycDict[wk]
c = cmpp(p.get(at1, 0), p.get(at2, 0))
if c != 0:
return c
return 0
sortedAt = list(atomCount.keys())
sortedAt.sort(key=functools.cmp_to_key(compareAtNr))
standardAtomNames = {}
for i, at in enumerate(sortedAt):
standardAtomNames[at] = ("X_%d" % i)
standardWyc = {}
for wk, ats in wycDict.items():
stdAts = {}
for at, count in ats.items():
stdAts[standardAtomNames[at]] = count
standardWyc[wk] = stdAts
if standardWyc:
counts = [c for x in standardWyc.values() for c in x.values()]
# logging.error("counts: %s", counts)
        gcd = counts[0]
        for c in counts[1:]:
            gcd = math.gcd(gcd, c)  # fractions.gcd is deprecated/removed in newer Python
if gcd != 1:
for wk,d in standardWyc.items():
for at,c in d.items():
d[at] = c // gcd
return standardWyc
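# Illustrative example (not part of the original script): for a hypothetical rock-salt
# system with atomic_number = [11, 17] and wyckoff = ['a', 'b'], get_normalized_wyckoff
# should return {'a': {'X_0': 1}, 'b': {'X_1': 1}}. Species are renamed to X_0, X_1, ...
# and the occupation counts are divided by their greatest common divisor, so chemically
# different but structurally equivalent systems map to the same normalized dictionary.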
def get_structure_type(space_group, norm_wyckoff):
"""Returns the information on the prototype.
"""
structure_type_info = {}
for type_description in str_types_by_spg.get((space_group), []):
current_norm_wyckoffs = type_description.get("normalized_wyckoff_spg")
if current_norm_wyckoffs and current_norm_wyckoffs == norm_wyckoff:
structure_type_info = type_description
break
if structure_type_info:
return structure_type_info
else:
return None
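# Sketch of the lookup (assuming the structure_types_by_spacegroup layout used above):
# each prototype entry is a dict whose 'normalized_wyckoff_spg' value is compared against
# the normalized Wyckoff dict of the current system, e.g.
#   get_structure_type(225, {'a': {'X_0': 1}, 'b': {'X_1': 1}})
# would return the matching prototype dict (with keys such as "Prototype",
# "Pearsons Symbol", "aflow_prototype_id") or None if space group 225 has no such entry.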
def _structure_type_info(self):
"""Known structure types"""
return get_structure_type(
self.space_group,
self.normalized_wyckoff)
def toAtomNr(string):
"returns the atom number of the given symbol"
baseStr = string[:3].title()
if baseStr.startswith("Uu") and baseStr in chemical_symbols[1:]:
return chemical_symbols.index(baseStr)
if baseStr[:2] in chemical_symbols[1:]:
return chemical_symbols.index(baseStr[:2])
elif baseStr[:1] in chemical_symbols[1:]:
return chemical_symbols.index(baseStr[:1])
else:
return 0
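# Examples (based on the lookup above): toAtomNr("Si") should give 14 and
# toAtomNr("Na1") should give 11 (label characters beyond the element symbol are
# ignored); unrecognized symbols fall back to 0.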
def dictToNarray(dictValue):
"""function that gets the dictionary with flat data ans shape and give the array"""
v=dictValue['flatData']
return np.reshape(np.asarray(v), dictValue['shape'])
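# Example: dictToNarray({'flatData': [1., 0., 0., 0., 1., 0., 0., 0., 1.], 'shape': [3, 3]})
# yields a 3x3 numpy array; this is the layout used for the 'simulation_cell' and
# 'atom_positions' entries of the streamed section_system dictionaries.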
def protoNormalizeWycoff(protoDict):
"""recalculates the normalized wyckoff values for the given prototype dictionary"""
cell = np.asarray(protoDict['lattice_vectors'])
atomSpecies = [toAtomNr(at) for at in protoDict['atom_labels']]
atomPos = np.asarray(protoDict['atom_positions'])
symm = systemToSpg(cell, atomSpecies, atomPos)
wyckoffs = symm.get("wyckoffs")
norm_wyckoff = get_normalized_wyckoff(atomSpecies,wyckoffs)
return norm_wyckoff
def updatePrototypesWyckoff(protos):
    """Adds a recalculated 'normalized_wyckoff_spg' entry to every prototype dictionary."""
    for sp, pts in protos.items():
        for protoDict in pts:
            try:
                wy = protoNormalizeWycoff(protoDict)
                protoDict['normalized_wyckoff_spg'] = wy
            except Exception:
                logging.exception("Failed to compute normalized Wyckoffs for %s", json.dumps(protoDict))
def systemToSpg(cell, atomSpecies, atomPos):
    """Uses spglib to calculate the symmetry of the given system."""
    acell = cell * 1.0e10  # lattice vectors are stored in m, spglib expects Angstrom
    cellInv = np.linalg.inv(cell)
    symm = spglib.get_symmetry_dataset(
        (acell, np.dot(atomPos, cellInv), atomSpecies),  # cell, scaled positions, species
        0.002, -1)  # use m instead of Angstrom?
    return symm
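# Rough usage sketch (hypothetical values): for a single atom at the origin of a cubic
# cell given in metres, e.g. systemToSpg(np.eye(3) * 3.0e-10, [29], np.zeros((1, 3))),
# the returned dataset should report symm['number'] == 221 and symm['wyckoffs'] == ['a'];
# only the 'wyckoffs' and 'number' entries are used below.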
def classify_by_norm_wyckoff(sectionSystem):
try:
###
#atomic_number = atom_species
#as in the normalized version
cell = None
conf = sectionSystem
lab = conf.get("atom_labels", None)
if lab is None : return None
##periodicDirs = conf.get("configuration_periodic_dimensions", periodicDirs)
atomSpecies = [toAtomNr(l) for l in lab['flatData']]
#print (atomSpecies)
newCell = conf.get("simulation_cell")
if newCell is None : return None
if newCell:
cell = dictToNarray(newCell)
symm = None
#print("***full:",cell)
#acell = cell.reshape(3,3)
atomPos = dictToNarray(conf.get("atom_positions"))
if atomPos is None : return None
symm = systemToSpg(cell, atomSpecies, atomPos)
wyckoffs = symm.get("wyckoffs")
spg_nr = symm.get("number")
### adds recalculated wyckoff positions
updatePrototypesWyckoff(str_types_by_spg)
###
norm_wyckoff = get_normalized_wyckoff(atomSpecies,wyckoffs)
protoDict = get_structure_type(spg_nr, norm_wyckoff)
        if protoDict is None:
            proto = "%d-_" % spg_nr
            aflow_prototype_id = "-"
            aflow_prototype_url = "-"
        else:
            proto = '%d-%s-%s' % (spg_nr,
                                  protoDict.get("Prototype", "-"),
                                  protoDict.get("Pearsons Symbol", "-"))
            aflow_prototype_id = protoDict.get("aflow_prototype_id", "-")
            aflow_prototype_url = protoDict.get("aflow_prototype_url", "-")
        return proto, aflow_prototype_id, aflow_prototype_url
    except Exception:
        logging.exception("failure while classifying the given section_system")
return None
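# On success the function returns a tuple such as
# ('225-Cu-cF4', '<aflow prototype id>', '<aflow prototype url>') (values hypothetical);
# for unmatched structures it falls back to ('<spg>-_', '-', '-'), and on any error it
# returns None so that main() simply skips the system.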
#classify_by_norm_wyckoff(json_list)
def main():
metapath = '../../../../nomad-meta-info/meta_info/nomad_meta_info/' +\
'common.nomadmetainfo.json'
metaInfoPath = os.path.normpath(
os.path.join(os.path.dirname(os.path.abspath(__file__)), metapath))
metaInfoEnv, warns = loadJsonFile(filePath=metaInfoPath,
dependencyLoader=None,
extraArgsHandling=InfoKindEl.ADD_EXTRA_ARGS,
uri=None)
backend = JsonParseEventsWriterBackend(metaInfoEnv)
#Start
calcContext=sys.argv[1]
backend.startedParsingSession(
calcContext,
parserInfo = {'name':'PrototypesNormalizer', 'version': '1.0'})
dictReader = ParseStreamedDicts(sys.stdin)
while True:
sectSys = dictReader.readNextDict()
if sectSys is None:
break
if "aflow_prototype_id" in sectSys:
backend.addValue("prototype_aflow_id",sectSys["aflow_prototype_id"])
if "aflow_prototype_url" in sectSys:
backend.addValue("prototype_aflow_url",sectSys["aflow_prototype_url"])
        try:
            # classify_by_norm_wyckoff returns (label, aflow_prototype_id, aflow_prototype_url)
            classification = classify_by_norm_wyckoff(sectSys)
            if classification:
                label, aflow_prototype_id, aflow_prototype_url = classification
                backend.openContext(sectSys['uri'])
                backend.addValue("prototype_label", label)
                # backend.addValue("prototype_aflow_id", aflow_prototype_id)
                # backend.addValue("prototype_aflow_url", aflow_prototype_url)
                backend.closeContext(sectSys['uri'])
                sys.stdout.flush()
        except Exception:
            logging.exception("exception trying to calculate prototype for %s", sectSys)
backend.finishedParsingSession("ParseSuccess", None)
sys.stdout.flush()
if __name__ == '__main__':
main()
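# Example invocation (hypothetical file name; the calculation context URI is passed as
# the first argument and section_system dictionaries are streamed on stdin):
#   python classify_structures.py <calculation-context-uri> < section_systems.json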