-
Ihrig, Arvid Conrad (ari) authoredIhrig, Arvid Conrad (ari) authored
caching_backend.py 26.01 KiB
from builtins import str
from builtins import zip
from builtins import range
from builtins import object
from functools import reduce
import json
import numpy as np
import sys
import logging
from nomadcore import parser_backend
from nomadcore.unit_conversion import unit_conversion
class CachingLevel(object):
Forward = 1
Cache = 2
ForwardAndCache = 3
Ignore = 4
PreOpenedCache = 5
PreOpenedIgnore = 6
@classmethod
def toText(cls, cl):
if cl == cls.Forward:
return "Forward"
elif cl == cls.ForwardAndCache:
return "ForwardAndCache"
elif cl == cls.Cache:
return "Cache"
elif cl == cls.Ignore:
return "Ignore"
elif cl == cls.PreOpenedIgnore:
return "PreOpenedIgnore"
elif cl == cls.PreOpenedCache:
return "PreOpenedIgnore"
else:
return "CachingLevel(%s)" % cl
@classmethod
def restrict(cls, a, b):
"""restricts a keeping in account that it is contained in b (i.e. if b is not forwarded then also a cannot be forwarded)"""
if b == cls.Cache or b == cls.Ignore:
if a == cls.Forward:
return cls.Ignore
elif a == cls.ForwardAndCache:
return cls.Cache
return a
class CachingSection(object):
"""Represents an open section, and can cache values"""
def __init__(self, gIndex, references, storeInSuper = False, activeBackend=None):
self.gIndex = gIndex
self.references = references
self.simpleValues = {}
self.arrayValues = {}
self.subSectionValues = {}
self.storeInSuper = storeInSuper
self.activeBackend = activeBackend
def __getitem__(self, metaName):
"""Returns the cached values corresponding to metaName, or None if
there aren't any. You can search values and subsections.
"""
res = self.simpleValues.get(metaName, None)
if res:
return res
res = self.arrayValues.get(metaName, None)
if res:
return res
res = self.subSectionValues.get(metaName, None)
return res
def get_latest_value(self, metaname_list):
"""Goes through the given list of metanames and tries to find the data
by succesfully opening the latest value from within this section and
it's subsections. If the given path leads to an actual value, it is
returned.
Args:
metaname_list: list of metainfo names
Returns:
The value that if found when successively opening the data in this
section and it's subsection. If some of the metanames in the given
list are not found, returns None.
"""
# Make path into a list
if not isinstance(metaname_list, (tuple, list)):
metaname_list = [metaname_list]
# Go deeper into the section structure by iterating through the given
# list of metanames.
current = self
for metaname in metaname_list:
new_section = current[metaname]
if new_section is not None:
current = new_section[-1]
else:
return None
return current
def add_latest_value(self, metaname_list, metaname, require=False):
latest_value = self.get_latest_value(metaname_list)
if latest_value is not None:
self.activeBackend.addValue(metaname, latest_value)
else:
if require:
raise KeyError("Could not add the latest value from path '{}' because not value was found.")
def add_latest_array_values(self, metaname_list, metaname, require=False):
latest_value = self.get_latest_value(metaname_list)
if latest_value is not None:
self.activeBackend.addArrayValues(metaname, latest_value)
else:
if require:
raise KeyError("Could not add the latest value from path '{}' because not value was found.")
def addValue(self, metaInfo, value):
vals = self.simpleValues.get(metaInfo.name, None)
if vals is None:
self.simpleValues[metaInfo.name] = [value]
else:
vals.append(value)
def setArrayValues(self, metaInfo, values, offset = None):
vals = self.arrayValues.get(metaInfo.name, None)
if vals is None:
raise Exception("setArrayValues(%s,...) called before adding a value" % metaInfo.name)
else:
if offset:
idxs = [slice(offset[i], offset[i] + values.shape[i]) for i in range(len(offset))]
else:
idxs = [slice(0, x) for x in values.shape]
vals[len(vals) - 1][idxs] = values
def addArrayValues(self, metaInfo, values):
vals = self.arrayValues.get(metaInfo.name, None)
if vals is None:
self.arrayValues[metaInfo.name] = [values]
else:
vals.append(values)
def addSubsection(self, metaInfo, section):
vals = self.subSectionValues.get(metaInfo.name, None)
if vals is None:
self.subSectionValues[metaInfo.name] = [section]
else:
vals.append(section)
def __str__(self):
return """CachingSection{{
gIndex:{0},
references:{1},
simpleValues:{2},
arrayValues.keys:{3},
subSectionValues.keys:{4},
storeInSuper:{5},
}}""".format(
self.gIndex,
self.references,
self.simpleValues,
list(self.arrayValues.keys()),
list(self.subSectionValues.keys()),
self.storeInSuper)
class CachingSectionManager(object):
def __init__(self, metaInfo, parentSectionNames, lastSectionGIndex = -1, openSections = {}, onClose = [], onOpen = [], storeInSuper = False, forwardOpenClose = True, preOpened = False):
self.metaInfo = metaInfo
self.parentSectionNames = parentSectionNames
self.lastSectionGIndex = lastSectionGIndex
self.openSections = dict(openSections)
self.onClose = list(onClose)
self.onOpen = list(onOpen)
self.storeInSuper = storeInSuper
self.forwardOpenClose = forwardOpenClose
self.preOpened = preOpened
def setSectionInfo(self, gIndex, references):
self.openSections[gIndex].references = [references[x] for x in self.parentSectionNames]
def get_latest_section(self):
"""Returns the latest opened section if it is still open.
"""
section = self.openSections.get(self.lastSectionGIndex)
return section
def openSection(self, backend):
newGIndex = self.lastSectionGIndex + 1
self.openSectionWithGIndex(backend, newGIndex)
return newGIndex
def openSectionWithGIndex(self, backend, gIndex):
self.lastSectionGIndex = gIndex
references = []
for parentName in self.parentSectionNames:
pSect = backend.sectionManagers.get(parentName, None)
if pSect:
references.append(pSect.lastSectionGIndex)
else:
references.append(-1)
opened = CachingSection(gIndex, references, storeInSuper = self.storeInSuper, activeBackend=backend)
self.openSections[gIndex] = opened
for onOpenF in self.onOpen:
onOpenF(backend, gIndex, opened)
def closeSection(self, backend, gIndex):
toClose = self.openSections[gIndex]
for onCloseF in self.onClose:
onCloseF(backend, gIndex, toClose)
if toClose.storeInSuper:
for superGIndex, superName in zip(toClose.references,self.parentSectionNames):
superSect = backend.sectionManagers[superName].openSections.get(superGIndex, None)
if superSect:
superSect.addSubsection(self.metaInfo, toClose)
else:
backend.storeToClosedSuper(superName, superGIndex, self.metaInfo, toClose)
if not toClose.references: # checks for empty list
backend.parsingSession.addSubsection(self.metaInfo, toClose)
if self.forwardOpenClose and backend.superBackend:
backend.superBackend.closeSection(self.metaInfo.name, gIndex)
del self.openSections[gIndex]
def openSectionInfo(self, gIndex):
section = self.openSections.get(gIndex, None)
if section:
return "section {name} ({references})".format(
name = section.name,
references = list(zip(self.parentSectionNames, section.references)))
else:
return "section {gIndex} in {name} is not open!!".format(
name = self.metaInfo.name,
gIndex = gIndex)
def addValue(self, valueMetaInfo, value, gIndex):
if (gIndex == -1):
gI = self.lastSectionGIndex
else:
gI = gIndex
try:
self.openSections[gI].addValue(valueMetaInfo, value)
except:
raise Exception("Cannot add value for metadata %s to section %d (%d) of %s, as it is not open" % (valueMetaInfo.name, gI, gIndex, self.metaInfo.name))
def setArrayValues(self, valueMetaInfo, value, offset = None, gIndex = -1):
if gIndex == -1:
gI = self.lastSectionGIndex
else:
gI = gIndex
try:
self.openSections[gI].setArrayValues(valueMetaInfo, value, offset)
except:
raise Exception("Cannot set array values for metadata %s to section %d (%d) of %s, as it is not open" % (valueMetaInfo.name, gI, gIndex, self.metaInfo.name))
def addArrayValues(self, valueMetaInfo, value, offset = None, gIndex = -1):
if gIndex == -1:
gI = self.lastSectionGIndex
else:
gI = gIndex
try:
self.openSections[gI].addArrayValues(valueMetaInfo, value)
except:
raise Exception("Cannot add array values for metadata %s to section %d (%d) of %s, as it is not open" % (valueMetaInfo.name, gI, gIndex, self.metaInfo.name))
class CachingDataManager(object):
def __init__(self, metaInfo, superSectionManager, cachingLevel):
self.metaInfo = metaInfo
self.superSectionManager = superSectionManager
self.cachingLevel = cachingLevel
class ActiveBackend(object):
def __init__(self, metaInfoEnv, sectionManagers, dataManagers, superBackend, propagateStartFinishParsing = True, default_units=None, metainfo_units=None):
self.__metaInfoEnv = metaInfoEnv
self.sectionManagers = sectionManagers
self.dataManagers = dataManagers
self.superBackend = superBackend
self.propagateStartFinishParsing = propagateStartFinishParsing
self.default_units = default_units # A mapping between dimension and an unit definition.
self.metainfo_units = metainfo_units # A mapping between metaname and an unit definition.
@classmethod
def activeBackend(cls, metaInfoEnv, cachingLevelForMetaName = {}, defaultDataCachingLevel = CachingLevel.ForwardAndCache, defaultSectionCachingLevel = CachingLevel.Forward, superBackend = None,
onClose = {}, onOpen = {}, propagateStartFinishParsing = True, default_units=None, metainfo_units=None):
for sectionName in onClose.keys():
if not sectionName in metaInfoEnv:
raise Exception("Found trigger for non existing section %s" % sectionName)
elif metaInfoEnv.infoKinds[sectionName].kindStr != "type_section":
raise Exception("Found trigger for %s which is not a section but %s" %
(sectionName, json.dumps(metaInfoEnv.infoKinds[sectionName].toDict(), indent=2)))
for sectionName in onOpen.keys():
if not sectionName in metaInfoEnv:
raise Exception("Found trigger for non existing section %s" % sectionName)
elif metaInfoEnv.infoKinds[sectionName].kindStr != "type_section":
raise Exception("Found trigger for %s which is not a section but %s" %
(sectionName, json.dumps(metaInfoEnv.infoKinds[sectionName].toDict(), indent=2)))
sectionManagers = {}
for ikNames, ik in metaInfoEnv.infoKinds.items():
if ik.kindStr == "type_section":
parentS, parentO = list(metaInfoEnv.firstAncestorsByType(ik.name).get("type_section", [[],[]]))
parentS.sort()
cachingLevel = reduce(CachingLevel.restrict, [cachingLevelForMetaName.get(x, defaultSectionCachingLevel) for x in ([ik.name] + parentS + parentO)])
sectionManagers[ik.name] = CachingSectionManager(
metaInfo = ik,
parentSectionNames = parentS,
storeInSuper = (cachingLevel == CachingLevel.ForwardAndCache or cachingLevel == CachingLevel.Cache or cachingLevel == CachingLevel.PreOpenedCache),
forwardOpenClose = (cachingLevel == CachingLevel.Forward or cachingLevel == CachingLevel.ForwardAndCache),
preOpened = (cachingLevel == CachingLevel.PreOpenedCache or cachingLevel == CachingLevel.PreOpenedIgnore),
onClose = onClose.get(ik.name, []),
onOpen = onOpen.get(ik.name, []))
dataManagers = {}
for ikNames, ik in metaInfoEnv.infoKinds.items():
if ik.kindStr == "type_document_content" or ik.kindStr == "type_dimension":
superSectionNames = metaInfoEnv.firstAncestorsByType(ik.name).get("type_section", [[]])[0]
if not superSectionNames:
raise Exception("MetaInfo of conrete value %s is not in any superSection" % ik.name)
elif len(superSectionNames) > 1:
raise Exception("MetaInfo of concrete value %s has multiple superSections (%s)" %
(ik.name, superSectionNames))
sectionManager = sectionManagers[superSectionNames[0]]
dataManagers[ik.name] = CachingDataManager(ik, sectionManager,
CachingLevel.restrict(cachingLevelForMetaName.get(ik.name, defaultDataCachingLevel), CachingLevel.Forward if sectionManager.forwardOpenClose or sectionManager.preOpened else CachingLevel.Ignore))
return ActiveBackend(metaInfoEnv, sectionManagers, dataManagers, superBackend, propagateStartFinishParsing, default_units, metainfo_units)
def appendOnClose(self, sectionName, onClose):
self.sectionManagers.onClose.append(onClose)
def get_latest_section(self, metaname):
"""Returns the latest opened section with the given metaname if it has
not been closed.
"""
manager = self.sectionManagers.get(metaname)
if manager:
return manager.get_latest_section()
def appendOnOpen(self, sectionName, onOpen):
self.sectionManagers.onOpen.append(onOpen)
def storeToClosedSuper(self, superName, superGIndex, metaInfo, toClose):
logging.getLogger("nomadcore.caching_backend").warn(
"Dropping section {name} gIndex {gIndex}, as it cannot be added to closed super section {superName} gIndex: {superGIndex}".format(
name = metaInfo.name,
gIndex = toClose.gIndex,
superName = superName,
superGIndex = superGIndex))
def startedParsingSession(self, mainFileUri, parserInfo, parsingStatus = None, parsingErrors = None):
"""should be called when the parsing starts, parserInfo should be a valid json dictionary"""
if self.propagateStartFinishParsing and self.superBackend:
self.superBackend.startedParsingSession(mainFileUri, parserInfo, parsingStatus, parsingErrors)
def finishedParsingSession(self, parsingStatus, parsingErrors, mainFileUri = None, parserInfo = None,
parsingStats = None):
"""should be called when the parsing finishes"""
if self.propagateStartFinishParsing and self.superBackend:
self.superBackend.finishedParsingSession(parsingStatus, parsingErrors, mainFileUri, parserInfo, parsingStats)
def addMatchTelemetry(self, match_telemetry, gIndex = -1):
""" should be called for outputting match telemetry data:
input data, together with capture info """
if self.superBackend:
self.superBackend.addMatchTelemetry(match_telemetry, gIndex)
def metaInfoEnv(self):
"""the metaInfoEnv this parser was optimized for"""
return self.__metaInfoEnv
def openSections(self):
"""returns the sections that are still open
sections are identified by metaName and their gIndex"""
res = set()
for manager in self.sectionManagers:
res.extend(map(lambda x: (manager.metaInfo.name, x), manager.openSections.keys()))
return res
def sectionInfo(self, metaName, gIndex):
"""returns information on a section (for debugging purposes)"""
manager = self.sectionManagers[metaName]
section = manager.openSections.get(gIndex, None)
if section:
return "open section {} gIndex: {}, references: {}".format(metaName, gIndex, section.references)
else:
return "closed section {} gIndex: {}".format(metaName, gIndex)
def openSection(self, metaName):
"""opens a new section and returns its new unique gIndex"""
manager = self.sectionManagers[metaName]
if self.superBackend and manager.forwardOpenClose:
newGIndex = self.superBackend.openSection(metaName)
manager.openSectionWithGIndex(self, newGIndex)
else:
newGIndex = manager.openSection(self)
return newGIndex
def openNonOverlappingSection(self, metaName):
"""opens a new section that is expected not to overlap with itself and returns its new unique gIndex"""
manager = self.sectionManagers[metaName]
if len(manager.openSections) != 0:
raise Exception("Section %s was not expected to overlap" % metaName)
return self.openSection(metaName)
def openSectionWithGIndex(self, metaName, gIndex):
"""opens a new section where gIndex is generated externally
gIndex should be unique (no reopening of a closed section)"""
manager = self.sectionManagers[metaName]
if self.superBackend and manager.forwardOpenClose:
self.superBackend.openSectionWithGIndex(metaName, newGIndex)
manager.openSectionWithGIndex(newGIndex)
def setSectionInfo(self, metaName, gIndex, references):
"""sets info values of an open section
references should be a dictionary with the gIndexes of the root sections this section refers to"""
manager = self.sectionManagers[metaName]
manager.setSectionInfo(gIndex, references)
if self.superBackend and manager.forwardOpenClose:
self.superBackend.setSectionInfo(metaName, gIndex, references)
def closeSection(self, metaName, gIndex):
"""closes a section
after this no other value can be added to the section
metaName is the name of the meta info, gIndex the index of the section"""
manager = self.sectionManagers[metaName]
manager.closeSection(self, gIndex)
def closeNonOverlappingSection(self, metaName):
"""closes a non overlapping section
after this no other value can be added to the section
metaName is the name of the meta info, gIndex the index of the section"""
manager = self.sectionManagers[metaName]
openGIndexes = list(manager.openSections.keys())
if len(openGIndexes) != 1:
if not openGIndexes:
raise Exception("Call to closeNonOverlapping(%s) with no open section" % metaName)
else:
raise Exception("Section %s was not supposed to overlap, found %s open when closing" % (metaName, openGIndexes))
manager.closeSection(self, openGIndexes[0])
def addValue(self, metaName, value, gIndex = -1):
"""adds a json value corresponding to metaName
the value is added to the section the meta info metaName is in
a gIndex of -1 means the latest section
"""
dataManager = self.dataManagers[metaName]
cachingLevel = dataManager.cachingLevel
if cachingLevel == CachingLevel.Forward or cachingLevel == CachingLevel.ForwardAndCache:
if self.superBackend:
self.superBackend.addValue(metaName, value, gIndex)
if cachingLevel == CachingLevel.ForwardAndCache or cachingLevel == CachingLevel.Cache or cachingLevel == CachingLevel.PreOpenedCache:
dataManager.superSectionManager.addValue(dataManager.metaInfo, value, gIndex)
def addRealValue(self, metaName, value, gIndex = -1, unit=None):
"""Adds a floating point value corresponding to metaName The value is
added to the section the meta info metaName is in A gIndex of -1 means
the latest section.
"""
if unit is not None:
value = self.convert_unit(metaName, value, unit)
dataManager = self.dataManagers[metaName]
cachingLevel = dataManager.cachingLevel
if cachingLevel == CachingLevel.Forward or cachingLevel == CachingLevel.ForwardAndCache:
if self.superBackend:
self.superBackend.addRealValue(metaName, value, gIndex)
if cachingLevel == CachingLevel.ForwardAndCache or cachingLevel == CachingLevel.Cache or cachingLevel == CachingLevel.PreOpenedCache:
dataManager.superSectionManager.addValue(dataManager.metaInfo, value, gIndex)
def addArray(self, metaName, shape, gIndex = -1):
"""adds a new array value of the given size corresponding to metaName
the value is added to the section the meta info metaName is in
a gIndex of -1 means the latest section
the array is unitialized"""
dataManager = self.dataManagers[metaName]
cachingLevel = dataManager.cachingLevel
if cachingLevel == CachingLevel.Forward or cachingLevel == CachingLevel.ForwardAndCache:
if self.superBackend:
self.superBackend.addArray(metaName, shape, gIndex)
if cachingLevel == CachingLevel.ForwardAndCache or cachingLevel == CachingLevel.Cache or cachingLevel == CachingLevel.PreOpenedCache:
dataManager.superSectionManager.addArray(dataManager.metaInfo, shape, gIndex)
def setArrayValues(self, metaName, values, offset = None, gIndex = -1, unit=None):
"""Adds values to the last array added, array must be a numpy array
"""
if unit is not None:
values = self.convert_unit(metaName, values, unit)
dataManager = self.dataManagers[metaName]
cachingLevel = dataManager.cachingLevel
if cachingLevel == CachingLevel.Forward or cachingLevel == CachingLevel.ForwardAndCache or cachingLevel == CachingLevel.PreOpenedCache:
if self.superBackend:
self.superBackend.setArrayValues(metaName, values, offset, gIndex)
if cachingLevel == CachingLevel.ForwardAndCache or cachingLevel == CachingLevel.Cache or cachingLevel == CachingLevel.PreOpenedCache:
dataManager.superSectionManager.setArrayValues(dataManager.metaInfo, values, offset, gIndex)
def addArrayValues(self, metaName, values, gIndex = -1, unit=None):
"""Adds an array value with the given array values. Values must be a
numpy array.
"""
if unit is not None:
values = self.convert_unit(metaName, values, unit)
dataManager = self.dataManagers[metaName]
cachingLevel = dataManager.cachingLevel
if cachingLevel == CachingLevel.Forward or cachingLevel == CachingLevel.ForwardAndCache:
if self.superBackend:
self.superBackend.addArrayValues(metaName, values, gIndex=gIndex)
if cachingLevel == CachingLevel.ForwardAndCache or cachingLevel == CachingLevel.Cache or cachingLevel == CachingLevel.PreOpenedCache:
dataManager.superSectionManager.addArrayValues(dataManager.metaInfo, values, gIndex=gIndex)
def convertScalarStringValue(self, metaName, strValue):
"""converts a scalar string value of the given meta info to a python value"""
metaInfo = self.metaInfoEnv().infoKindEl(metaName)
dtypeStr = metaInfo.dtypeStr
return parser_backend.valueForStrValue(strValue, dtypeStr)
def arrayForMetaInfo(self, metaName, shape):
"""Returns an array with the correct type for the given meta info, and the given shape"""
metaInfo = self.metaInfoEnv().infoKindEl(metaName)
dtypeStr = metaInfo.dtypeStr
return np.zeros(shape, dtype = parser_backend.numpyDtypeForDtypeStr(dtypeStr))
def convert_unit(self, metaName, value, unit):
"""Try to perform a unit conversion.
"""
# If a unit has been specified in the metainfo, check that the
# conversion can be done
metaInfo = self.metaInfoEnv().infoKindEl(metaName)
metainfo_unit = metaInfo.units
if metainfo_unit:
metainfo_dim = unit_conversion.ureg(metainfo_unit).dimensionality
source_dim = unit_conversion.ureg(unit).dimensionality
if metainfo_dim != source_dim:
raise Exception("Unit conversion error with metainfo '{}'. Could not convert the given unit '{}' to target unit '{}' specified in the metainfo.".format(metaName, unit, metainfo_unit))
# If there is a metaname specific unit request, convert to it
target_unit = None
if self.metainfo_units is not None:
target_unit = self.metainfo_units.get(metaName)
# If there is a dimension specific unit request, convert to it
if target_unit is None:
if self.default_units is not None:
source_dim = unit_conversion.ureg(unit).dimensionality
map_unit = self.default_units.get(str(source_dim))
if map_unit:
target_unit = map_unit
converted = unit_conversion.convert_unit(value, unit, target_unit)
if converted is None:
raise Exception("ActiveBackend cannot convert to unit '{}'".format(unit))
return converted
def pwarn(self, msg):
"""Writes a parser warning message"""
self.addValue("parsing_message_warning_run", msg)