Commit fb2a7938 authored by Mohamed, Fawzi Roberto (fawzi)

skeleton of the band structure normalizer

parent 07a29445
from __future__ import division  # the fractional k-point coordinates below rely on true division
import setup_paths
import json
import os.path
import glob
import sys
import getopt
import logging
import numpy as np
from numpy import *
import ase.geometry
from nomadcore.local_meta_info import loadJsonFile, InfoKindEl
from nomadcore.parser_backend import JsonParseEventsWriterBackend
from nomadcore.parse_streamed_dicts import *
from nomadcore.unit_conversion.unit_conversion import convert_unit_function
def crystal_structure_from_cell(cell, eps=1e-4):
    """Return the crystal structure as a string, determined from the cell parameters."""
    cellpar = ase.geometry.cell_to_cellpar(cell=cell)
    abc = cellpar[:3]
    angles = cellpar[3:] / 180 * pi
    a, b, c = abc
    alpha, beta, gamma = angles
    if abc.ptp() < eps and abs(angles - pi / 2).max() < eps:
        return 'cubic'
    elif abc.ptp() < eps and abs(angles - pi / 3).max() < eps:
        return 'fcc'
    elif abc.ptp() < eps and abs(angles - np.arccos(-1 / 3)).max() < eps:
        return 'bcc'
    elif abs(a - b) < eps and abs(angles - pi / 2).max() < eps:
        return 'tetragonal'
    elif abs(angles - pi / 2).max() < eps:
        return 'orthorhombic'
    elif (abs(a - b) < eps and
          abs(gamma - pi / 3 * 2) < eps and
          abs(angles[:2] - pi / 2).max() < eps):
        return 'hexagonal'
    elif (c >= a and c >= b and alpha < pi / 2 and
          abs(angles[1:] - pi / 2).max() < eps):
        return 'monoclinic'
    else:
        raise ValueError('Cannot find crystal structure')
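
# Rough usage sketch (cell vectors in Angstrom, values for illustration only):
#   crystal_structure_from_cell(np.eye(3) * 4.0)            # -> 'cubic'
#   crystal_structure_from_cell(np.array([[0., 2., 2.],
#                                         [2., 0., 2.],
#                                         [2., 2., 0.]]))   # -> 'fcc'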
special_points = {
    'cubic': {'Γ': [0, 0, 0],
              'M': [1 / 2, 1 / 2, 0],
              'R': [1 / 2, 1 / 2, 1 / 2],
              'X': [0, 1 / 2, 0]},
    'fcc': {'Γ': [0, 0, 0],
            'K': [3 / 8, 3 / 8, 3 / 4],
            'L': [1 / 2, 1 / 2, 1 / 2],
            'U': [5 / 8, 1 / 4, 5 / 8],
            'W': [1 / 2, 1 / 4, 3 / 4],
            'X': [1 / 2, 0, 1 / 2]},
    'bcc': {'Γ': [0, 0, 0],
            'H': [1 / 2, -1 / 2, 1 / 2],
            'P': [1 / 4, 1 / 4, 1 / 4],
            'N': [0, 0, 1 / 2]},
    'tetragonal': {'Γ': [0, 0, 0],
                   'A': [1 / 2, 1 / 2, 1 / 2],
                   'M': [1 / 2, 1 / 2, 0],
                   'R': [0, 1 / 2, 1 / 2],
                   'X': [0, 1 / 2, 0],
                   'Z': [0, 0, 1 / 2]},
    'orthorhombic': {'Γ': [0, 0, 0],
                     'R': [1 / 2, 1 / 2, 1 / 2],
                     'S': [1 / 2, 1 / 2, 0],
                     'T': [0, 1 / 2, 1 / 2],
                     'U': [1 / 2, 0, 1 / 2],
                     'X': [1 / 2, 0, 0],
                     'Y': [0, 1 / 2, 0],
                     'Z': [0, 0, 1 / 2]},
    'hexagonal': {'Γ': [0, 0, 0],
                  'A': [0, 0, 1 / 2],
                  'H': [1 / 3, 1 / 3, 1 / 2],
                  'K': [1 / 3, 1 / 3, 0],
                  'L': [1 / 2, 0, 1 / 2],
                  'M': [1 / 2, 0, 0]}}

special_paths = {
    'cubic': 'ΓXMΓRX,MR',
    'fcc': 'ΓXWKΓLUWLK,UX',
    'bcc': 'ΓHNΓPH,PN',
    'tetragonal': 'ΓXMΓZRAZXR,MA',
    'orthorhombic': 'ΓXSYΓZURTZ,YT,UX,SR',
    'hexagonal': 'ΓMKΓALHA,LM,KH',
    'monoclinic': 'ΓYHCEM1AXH1,MDZ,YD'}
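
# Illustrative helper (not used by the skeleton yet; assumes unicode/Python 3
# strings): each entry of special_paths is a comma-separated list of k-path
# segments, and within a segment every label is a single character optionally
# followed by a digit (e.g. 'M1', 'H1'), matching the keys of special_points.
def split_special_path(path):
    """Split a path string such as 'ΓXMΓRX,MR' into lists of point labels."""
    import re
    return [re.findall(r'.[0-9]*', segment) for segment in path.split(',')]

# Example: split_special_path(special_paths['cubic']) -> [['Γ', 'X', 'M', 'Γ', 'R', 'X'], ['M', 'R']]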
def get_special_points(cell, eps=1e-4):
    """Return the dict of special k-points for the lattice of the given cell.

    The definitions are from the paper by Wahyu Setyawan and Stefano
    Curtarolo: http://dx.doi.org/10.1016/j.commatsci.2010.05.010

    The lattice type (one of cubic, fcc, bcc, orthorhombic, tetragonal,
    hexagonal or monoclinic) is determined with crystal_structure_from_cell.

    cell: 3x3 ndarray
        Unit cell.
    eps: float
        Tolerance for the cell check.
    """
    lattice = crystal_structure_from_cell(cell)
    cellpar = ase.geometry.cell_to_cellpar(cell=cell)
    abc = cellpar[:3]
    angles = cellpar[3:] / 180 * pi
    a, b, c = abc
    alpha, beta, gamma = angles
    # Check that the unit cell is as in the Setyawan-Curtarolo paper:
    if lattice == 'cubic':
        assert abc.ptp() < eps and abs(angles - pi / 2).max() < eps
    elif lattice == 'fcc':
        assert abc.ptp() < eps and abs(angles - pi / 3).max() < eps
    elif lattice == 'bcc':
        angle = np.arccos(-1 / 3)
        assert abc.ptp() < eps and abs(angles - angle).max() < eps
    elif lattice == 'tetragonal':
        assert abs(a - b) < eps and abs(angles - pi / 2).max() < eps
    elif lattice == 'orthorhombic':
        assert abs(angles - pi / 2).max() < eps
    elif lattice == 'hexagonal':
        assert abs(a - b) < eps
        assert abs(gamma - pi / 3 * 2) < eps
        assert abs(angles[:2] - pi / 2).max() < eps
    elif lattice == 'monoclinic':
        assert c >= a and c >= b
        assert alpha < pi / 2 and abs(angles[1:] - pi / 2).max() < eps
    if lattice != 'monoclinic':
        return special_points[lattice]
    # The monoclinic special points depend on the cell parameters:
    eta = (1 - b * cos(alpha) / c) / (2 * sin(alpha)**2)
    nu = 1 / 2 - eta * c * cos(alpha) / b
    return {'Γ': [0, 0, 0],
            'A': [1 / 2, 1 / 2, 0],
            'C': [0, 1 / 2, 1 / 2],
            'D': [1 / 2, 0, 1 / 2],
            'D1': [1 / 2, 0, -1 / 2],
            'E': [1 / 2, 1 / 2, 1 / 2],
            'H': [0, eta, 1 - nu],
            'H1': [0, 1 - eta, nu],
            'H2': [0, eta, -nu],
            'M': [1 / 2, eta, 1 - nu],
            'M1': [1 / 2, 1 - eta, nu],
            'M2': [1 / 2, eta, -nu],
            'X': [0, 1 / 2, 0],
            'Y': [0, 0, 1 / 2],
            'Y1': [0, 0, -1 / 2],
            'Z': [1 / 2, 0, 0]}
def findLabel(labels, value):
    """Return the label of the special point matching value (within 1e-5), or '?'."""
    for k, v in labels.items():
        if np.all(np.abs(np.asarray(v) - value) < 1.e-5):
            return k
    return "?"
def main():
    metapath = '../../../../nomad-meta-info/meta_info/nomad_meta_info/' +\
        'common.nomadmetainfo.json'
    metaInfoPath = os.path.normpath(
        os.path.join(os.path.dirname(os.path.abspath(__file__)), metapath))
    metaInfoEnv, warns = loadJsonFile(filePath=metaInfoPath,
                                      dependencyLoader=None,
                                      extraArgsHandling=InfoKindEl.ADD_EXTRA_ARGS,
                                      uri=None)
    backend = JsonParseEventsWriterBackend(metaInfoEnv)
    calcContext = sys.argv[1]
    backend.startedParsingSession(
        calcContext,
        parserInfo={'name': 'BandNormalizer', 'version': '1.0'})
    # Placeholders (skeleton): these values still have to be filled from the
    # JSON stream that the Scala wrapper writes to stdin (cell, band energies,
    # occupations, k-points, Fermi/valence-band-top references, DOS energies).
    cell = None        # simulation cell of the calculation, in m
    nsegments = 0      # number of band segments
    energies = None    # band energies, indexed [spin, segment, kpoint, band]
    occ = None         # band occupations, same layout as energies
    kpt = None         # k-points of each segment
    divisions = 0      # number of k-points per segment
    vbTopE = []        # energy_reference_highest_occupied (per spin channel)
    eFermi = None      # energy_reference_fermi
    dosE = None        # DOS energies, if a matching section_dos was found
    backend.openNonOverlappingSection("section_k_band_normalized")
    specialPoints = {}
    try:
        specialPoints = get_special_points(convert_unit_function("m", "angstrom")(cell))
    except Exception:
        logging.exception("failed to get special points")
    for isegment in range(nsegments):
        backend.openNonOverlappingSection("section_k_band_segment_normalized")
        backend.addArrayValues("band_energies_normalized", energies[:, isegment, :, :] - max(vbTopE))
        backend.addArrayValues("band_occupations_normalized", occ[:, isegment, :, :])
        backend.addArrayValues("band_k_points_normalized", kpt[isegment])
        backend.addArrayValues("band_segm_start_end_normalized",
                               np.asarray([kpt[isegment, 0], kpt[isegment, divisions - 1]]))
        backend.addValue("band_segm_labels_normalized",
                         [findLabel(specialPoints, kpt[isegment, 0]),
                          findLabel(specialPoints, kpt[isegment, divisions - 1])])
        backend.closeNonOverlappingSection("section_k_band_segment_normalized")
    backend.closeNonOverlappingSection("section_k_band_normalized")
    if vbTopE:
        eRef = max(vbTopE)
    else:
        eRef = eFermi
    backend.addArrayValues("dos_energies_normalized", dosE - eRef)
    backend.finishedParsingSession("ParseSuccess", None)
    sys.stdout.flush()
    return
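
# Sketch (not yet wired into main above): the Scala wrapper
# (BandStructureNormalizer.stdInHandler) streams a JSON array to stdin with one
# object per section_k_band, carrying the "context" URI, the "section_k_band"
# data, the energy references and, when found, a matching "dos" object. A
# minimal way to consume it, assuming the whole array fits in memory; the final
# version would more likely read the objects incrementally (see
# nomadcore.parse_streamed_dicts imported above).
def read_band_contexts(instream=sys.stdin):
    """Return the list of band-structure contexts streamed by the Scala wrapper."""
    return json.load(instream)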
if __name__ == "__main__":
    main()
import sys, os, os.path
baseDir = os.path.dirname(os.path.abspath(__file__))
commonDir = os.path.normpath(os.path.join(baseDir, "../../../../python-common/common/python"))
if commonDir not in sys.path:
    sys.path.insert(0, commonDir)
package eu.nomad_lab.normalizers
import eu.{ nomad_lab => lab }
import eu.nomad_lab.DefaultPythonInterpreter
import org.{ json4s => jn }
import scala.collection.breakOut
import eu.nomad_lab.normalize.ExternalNormalizerGenerator
import eu.nomad_lab.meta
import eu.nomad_lab.query
import eu.nomad_lab.resolve._
import eu.nomad_lab.h5.EmitJsonVisitor
import eu.nomad_lab.h5.H5EagerScanner
import eu.nomad_lab.parsers.ExternalParserWrapper
import eu.nomad_lab.JsonUtils
import scala.collection.mutable.StringBuilder
object BandStructureNormalizer extends ExternalNormalizerGenerator(
  name = "BandStructureNormalizer",
  info = jn.JObject(
    ("name" -> jn.JString("BandStructureNormalizer")) ::
      ("parserId" -> jn.JString("BandStructureNormalizer" + lab.BandStructureVersionInfo.version)) ::
      ("versionInfo" -> jn.JObject(
        ("nomadCoreVersion" -> jn.JObject(lab.NomadCoreVersionInfo.toMap.map {
          case (k, v) => k -> jn.JString(v.toString)
        }(breakOut): List[(String, jn.JString)])) ::
          (lab.BandStructureVersionInfo.toMap.map {
            case (key, value) =>
              (key -> jn.JString(value.toString))
          }(breakOut): List[(String, jn.JString)])
      )) :: Nil
  ),
  context = "calculation_context",
  filter = query.CompiledQuery(query.QueryExpression("section_k_band and not section_k_band_normalized"), meta.KnownMetaInfoEnvs.all),
  cmd = Seq(DefaultPythonInterpreter.pythonExe(), "${envDir}/normalizers/band-structure/normalizer/normalizer-band-structure/normalizer_band_structure.py",
    "${context}", "${archivePath}"),
  resList = Seq(
    "normalizer-band-structure/normalizer_band_structure.py",
    "normalizer-band-structure/setup_paths.py",
    "nomad_meta_info/public.nomadmetainfo.json",
    "nomad_meta_info/common.nomadmetainfo.json",
    "nomad_meta_info/meta_types.nomadmetainfo.json",
    "nomad_meta_info/stats.nomadmetainfo.json"
  ) ++ DefaultPythonInterpreter.commonFiles(),
  dirMap = Map(
    "normalizer-band-structure" -> "normalizers/band-structure/normalizer/normalizer-band-structure",
    "nomad_meta_info" -> "nomad-meta-info/meta_info/nomad_meta_info",
    "python" -> "python-common/common/python/nomadcore"
  ) ++ DefaultPythonInterpreter.commonDirMapping(),
  metaInfoEnv = lab.meta.KnownMetaInfoEnvs.stats
) {
  val trace: Boolean = false
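
  /**
   * Streams the input for the Python normalizer to its stdin.
   *
   * Roughly, the output is a JSON array with one object per section_k_band of the
   * calculation, of the form
   *   { "context": "<uri of the band structure section>",
   *     "section_k_band": { ... },
   *     "energy_reference_fermi": [...],             (when present)
   *     "energy_reference_highest_occupied": [...],  (when present)
   *     "dos": { "section_dos": { ... }, ... } }     (when a matching DOS is found)
   */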
  override def stdInHandler(context: ResolvedRef)(wrapper: ExternalParserWrapper)(pIn: java.io.OutputStream): Unit = {
    // The writer always targets the Python process; when trace is set the
    // emitted JSON is additionally collected and logged.
    val out: java.io.Writer = new java.io.BufferedWriter(new java.io.OutputStreamWriter(pIn))
    val stringBuilder = new StringBuilder
    def writeOut(s: String): Unit = {
      out.write(s)
      if (trace) stringBuilder ++= s
    }
    def flush(): Unit = {
      out.flush()
      if (trace) {
        logger.info(stringBuilder.result())
        stringBuilder.clear()
      }
    }
writeOut("[")
var isFirst = true
try {
context match {
case Calculation(archiveSet, c) =>
for (b <- c.sectionTable(Seq("section_run", "section_single_configuration_calculation", "section_k_band"))) {
if (!isFirst)
writeOut(",")
else
isFirst = false
writeOut(s"""{
| "context": ${JsonUtils.escapeString(b.toRef.toUriStr(archiveSet.objectKind))},
| "section_k_band": """.stripMargin)
val visitor = new EmitJsonVisitor(
writeOut = writeOut
)
val scanner = new H5EagerScanner
scanner.scanResolvedRef(Section(archiveSet, b), visitor)
def writeOutRefE(refEName: String, refE: Option[Seq[Double]]): Unit = {
refE match {
case Some(e) =>
writeOut(s""",
| "$refEName": ${e.mkString("[",",","]")}""".stripMargin)
case None => ()
}
}
b.parentSection.map{ singleConf: SectionH5 =>
val eFermi = singleConf.maybeValue("energy_reference_fermi")(_.seqDoubleValue)
writeOutRefE("energy_reference_fermi", eFermi)
val eVbTop = singleConf.maybeValue("energy_reference_highest_occupied").map(_.seqDoubleValue)
writeOutRefE("energy_reference_highest_occupied", eVbTop)
var contextsToGiveBack = Seq()
try {
// get DOS
                val dosInfo: Option[(Option[Seq[Double]], Option[Seq[Double]], SectionH5)] = {
                  // try locally first
                  val dosColl = singleConf.subSectionCollection("section_dos")
                  if (dosColl.length > 0) {
                    if (dosColl.length > 1)
                      logger.warn(s"multiple dos in $singleConf, using first")
                    Some((eFermi, eVbTop, dosColl(0)))
                  } else {
                    // look in the archive for a calculation on the same system that has a DOS
                    val sys: Option[SectionH5] = for {
                      sysIdx <- singleConf.maybeValue("single_configuration_calculation_to_system_ref").map(_.longValue)
                      run <- singleConf.parentSection
                    } yield run.sectionTable("section_system")(sysIdx)
                    sys.flatMap(_.maybeValue("configuration_raw_gid").map(_.stringValue)) match {
                      case Some(confId) =>
                        val possibleDosScanOp = new query.CollectUrisScanOp(
                          context = "section_single_configuration_calculation",
                          filter = query.CompiledQuery(query.QueryExpression("section_dos and configuration_raw_gid = \"" + confId + "\"")),
                          metaInfoEnv0 = None)
                        val evaluator = new query.DirectArchiveScanner(
                          archiveSet = archiveSet,
                          archive = c.archive,
                          scanOp = possibleDosScanOp
                        )
                        evaluator.run()
                        val uris = possibleDosScanOp.uris
                        evaluator.cleanup()
                        if (uris.length > 0) {
                          if (uris.length > 1) {
                            logger.warn(s"found multiple DoS potentially matching $b in archive: $uris, should check the method more carefully, skipping for now...")
                            None
                          } else {
                            val dosCtx = resolve.resolveInArchiveSet(archiveSet, NomadUri(uris(0)).toRef)
                            contextsToGiveBack :+= dosCtx
                            dosCtx match {
                              case Section(_, dosSingleConf) =>
                                val dosEVbTop = dosSingleConf.maybeValue("energy_reference_highest_occupied").map(_.seqDoubleValue)
                                val dosEFermi = dosSingleConf.maybeValue("energy_reference_fermi").map(_.seqDoubleValue)
                                val dosCollection = dosSingleConf.subSectionCollection("section_dos")
                                if (dosCollection.length > 0) {
                                  if (dosCollection.length > 1)
                                    logger.warn(s"multiple dos in $dosSingleConf, using first")
                                  Some((dosEFermi, dosEVbTop, dosCollection(0)))
                                } else {
                                  logger.warn(s"query error, expected a section_dos in $dosSingleConf")
                                  None
                                }
                              case _ =>
                                logger.warn(s"query error, expected valid section as result of query, not ${dosCtx.uriString}")
                                None
                            }
                          }
                        } else {
                          None
                        }
                      case None =>
                        logger.warn(s"Could not find configuration_raw_gid for the configuration of band structure $b, search for a DoS failed.")
                        None
                    }
                  }
                }
                dosInfo match {
                  case Some((dFermi, dVBTop, dos)) =>
                    writeOut(""",
                      | "dos": {
                      | "section_dos":""".stripMargin)
                    scanner.scanResolvedRef(Section(archiveSet, dos), visitor)
                    writeOutRefE("energy_reference_fermi", dFermi)
                    writeOutRefE("energy_reference_highest_occupied", dVBTop)
                    writeOut("\n }")
                  case None => ()
                }
              } finally {
                for (ctx <- contextsToGiveBack)
                  ctx.giveBack()
              }
              writeOut("\n}")
              flush()
            }
          }
          writeOut("]\n")
          flush()
        case r =>
          throw new Exception(s"BandStructureNormalizer expected a calculation as context, but got $r")
      }
    } finally {
      out.close()
      pIn.close()
      wrapper.sendStatus = ExternalParserWrapper.SendStatus.Finished
    }
  }
}
package eu.nomad_lab.normalizers
import eu.nomad_lab.{ parsers, DefaultPythonInterpreter }
import org.scalacheck.Properties
import org.specs2.mutable.Specification
import org.{ json4s => jn }
object BandStructureNormalizerSpec extends Specification {
  // skeleton: no tests yet
}