Commit d1e880e4 authored by Markus Scheidgen

Refactored archive module; implemented RequiredReader with reference resolution.

parent 688c242b

nomad/archive/__init__.py
#
# Copyright The NOMAD Authors.
#
# This file is part of NOMAD. See https://nomad-lab.eu for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
'''
The archive storage is made of two tiers. First, the whole archive is stored in
files; second, parts of the archive are stored in mongodb documents.

The file storage uses msg-pack files. Each file contains the archives of many
entries (all entries of an upload). These msg-pack files contain the JSON serialized
version of the metainfo archive (see :py:mod:`nomad.metainfo`). In addition, the
msg-pack files contain TOC information for quicker access to individual sections. See
:func:`write_archive` and :func:`read_archive`. There is also query functionality to
partially read specified sections from an archive: :func:`query_archive`.

The mongo storage uses mongodb's native bson to store JSON serialized metainfo
archive data. Each document in mongodb holds the partial archive of a single entry.
Which parts of an archive are stored in mongo is determined by the metainfo and
section annotations/categories.
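
A minimal usage sketch for the file tier (entry ids and data are hypothetical; see
:func:`write_archive` and :func:`read_archive` for the full signatures):

.. code-block:: python

    from nomad.archive import write_archive, read_archive

    # write the archives of two entries into one msg-pack file
    write_archive('example.msg', 2, [
        ('entry-1', {'section_metadata': {'calc_id': 'entry-1'}}),
        ('entry-2', {'section_metadata': {'calc_id': 'entry-2'}})])

    # read an individual section back via the msg-pack TOC
    with read_archive('example.msg') as archive:
        print(archive['entry-1']['section_metadata'].to_dict())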
'''
from .storage import (
write_archive, read_archive, ArchiveError, ArchiveReader, ArchiveWriter,
ArchiveObject, ArchiveList, ArchiveItem)
from .query import query_archive, filter_archive, ArchiveQueryError
from .partial import (
read_partial_archive_from_mongo, read_partial_archives_from_mongo,
write_partial_archive_to_mongo, delete_partial_archives_from_mongo,
create_partial_archive, compute_required_with_referenced)
from .required import RequiredReader, RequiredValidationError

nomad/archive/partial.py
#
# Copyright The NOMAD Authors.
#
# This file is part of NOMAD. See https://nomad-lab.eu for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Any, Tuple, Dict, Union, List
from nomad import infrastructure, config
from nomad.metainfo import MSection, Definition, Quantity, Reference, SubSection, Section
from nomad.datamodel import EntryArchive
from nomad.datamodel.metainfo.common import FastAccess
def create_partial_archive(archive: EntryArchive) -> Dict:
'''
    Creates a partial archive as a JSON serializable dict that can be stored directly.
    The given archive is filtered based on the metainfo category ``FastAccess``.
    The selected sections and the data that they reference (recursively) comprise the
    resulting partial archive.

    TODO: at the moment this is hard coded and NOT informed by the metainfo. We simply
    add the sections EntryMetadata and Workflow.

    Arguments:
        archive: The archive as an :class:`EntryArchive` instance.
    Returns: the partial archive in JSON serializable dictionary form.
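
    A usage sketch, assuming ``archive`` is a fully processed :class:`EntryArchive`:

    .. code-block:: python

        import json

        partial_dict = create_partial_archive(archive)
        # the result is plain JSON serializable data
        json.dumps(partial_dict)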
'''
    # A list with all referenced sections that might not yet have been ensured to be
    # in the resulting partial archive
referenceds: List[MSection] = []
    # contents keeps track of all sections in the partial archive, storing their
    # JSON serializable form and placeholder status in a dict
contents: Dict[MSection, Tuple[dict, bool]] = dict()
def partial(definition: Definition, section: MSection) -> bool:
'''
        ``m_to_dict`` partial function that selects what goes into the partial archive
        and what does not. It also collects references as a side effect.
'''
if section.m_def == EntryArchive.m_def:
if definition.m_def == Quantity:
return True
return FastAccess.m_def in definition.categories
if isinstance(definition, Quantity) and isinstance(definition.type, Reference) \
and FastAccess.m_def in definition.categories:
            # Reference lists in partial archives are not supported
if definition.is_scalar:
referenced = getattr(section, definition.name)
if referenced is not None and referenced not in contents:
referenceds.append(referenced)
if isinstance(definition, SubSection):
return FastAccess.m_def in definition.categories
return True
# add the main content
partial_contents = archive.m_to_dict(partial=partial)
# add the referenced data
def add(section, placeholder=False) -> dict:
'''
        Adds the given section to partial_contents at the right place. If not a
        placeholder, the section's serialization is added (replacing an existing
        placeholder if necessary). Otherwise, an empty dict is added as a placeholder
        for referenced children.
'''
result: Dict[str, Any] = None
content, content_is_placeholder = contents.get(section, (None, True))
if content is not None:
if content_is_placeholder and not placeholder:
# the placeholder gets replaced later
pass
else:
return content
if section.m_parent is None:
contents[section] = partial_contents, False
return partial_contents
parent_dict = add(section.m_parent, placeholder=True)
if placeholder:
result = {}
else:
result = section.m_to_dict(partial=partial)
sub_section = section.m_parent_sub_section
if sub_section.repeats:
sections = parent_dict.setdefault(sub_section.name, [])
while len(sections) < section.m_parent_index + 1:
sections.append(None)
sections[section.m_parent_index] = result
else:
parent_dict[sub_section.name] = result
contents[section] = result, placeholder
return result
    # We keep adding referenced sections as long as serializing already referenced
    # sections adds new ones. This implements the recursive nature of references
    # within referenced sections.
while len(referenceds) > 0:
referenced = referenceds.pop()
add(referenced)
return partial_contents
def write_partial_archive_to_mongo(archive: EntryArchive):
''' Partially writes the given archive to mongodb. '''
mongo_db = infrastructure.mongo_client[config.mongo.db_name]
mongo_collection = mongo_db['archive']
mongo_id = archive.section_metadata.calc_id
partial_archive_dict = create_partial_archive(archive)
partial_archive_dict['_id'] = mongo_id
mongo_collection.replace_one(dict(_id=mongo_id), partial_archive_dict, upsert=True)
def read_partial_archive_from_mongo(entry_id: str, as_dict=False) -> Union[EntryArchive, Dict]:
'''
Reads the partial archive for the given id from mongodb.
Arguments:
entry_id: The entry id for the entry.
        as_dict: Return the JSON serializable dictionary form of the archive, not the
            :class:`EntryArchive` form.
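
    A usage sketch (the entry id is hypothetical):

    .. code-block:: python

        archive = read_partial_archive_from_mongo('entry-1')
        print(archive.section_metadata.calc_id)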
'''
mongo_db = infrastructure.mongo_client[config.mongo.db_name]
mongo_collection = mongo_db['archive']
archive_dict = mongo_collection.find_one(dict(_id=entry_id))
if as_dict:
return archive_dict
return EntryArchive.m_from_dict(archive_dict)
def delete_partial_archives_from_mongo(entry_ids: List[str]):
mongo_db = infrastructure.mongo_client[config.mongo.db_name]
mongo_collection = mongo_db['archive']
mongo_collection.delete_many(dict(_id={'$in': entry_ids}))
def read_partial_archives_from_mongo(entry_ids: List[str], as_dict=False) -> Dict[str, Union[EntryArchive, Dict]]:
'''
Reads the partial archives for a set of entries.
Arguments:
entry_ids: A list of entry ids.
        as_dict: Return the JSON serializable dictionary form of the archives, not the
            :class:`EntryArchive` form.
Returns:
A dictionary with entry_ids as keys.
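
    A usage sketch (entry ids are hypothetical):

    .. code-block:: python

        archives = read_partial_archives_from_mongo(['entry-1', 'entry-2'])
        print(archives['entry-1'].section_metadata.calc_id)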
'''
mongo_db = infrastructure.mongo_client[config.mongo.db_name]
mongo_collection = mongo_db['archive']
archive_dicts = mongo_collection.find(dict(_id={'$in': entry_ids}))
if as_dict:
return {archive_dict.pop('_id'): archive_dict for archive_dict in archive_dicts}
return {
archive_dict.pop('_id'): EntryArchive.m_from_dict(archive_dict)
for archive_dict in archive_dicts}
__all_parent_sections: Dict[Section, List[Tuple[str, Section]]] = {}
def _all_parent_sections():
if len(__all_parent_sections) == 0:
def add(section):
for sub_section in section.all_sub_sections.values():
sub_section_section = sub_section.sub_section.m_resolved()
__all_parent_sections.setdefault(sub_section_section, []).append((sub_section.qualified_name(), section, ))
add(sub_section_section)
add(EntryArchive.m_def)
return __all_parent_sections
class _Incomplete(Exception): pass
def compute_required_with_referenced(required):
'''
    Updates the given required dictionary to ensure that references to non-required
    sections within a partial fast-access archive are included. Only references that
    are directly contained in required are added. References from wildcarded
    sub-sections are ignored.

    Returns: A new required dict or None. None is returned if it is unclear whether
        the required spec only accesses information available in fast-access partial
        archives.
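
    For example, anything that reaches into ``section_run`` cannot be served from
    partial archives (grounded in the checks below):

    .. code-block:: python

        assert compute_required_with_referenced({'section_run': '*'}) is None
        assert compute_required_with_referenced('include-all') is None  # not a dict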
'''
# TODO this function should be based on the metainfo
if not isinstance(required, dict):
return None
if any(key.startswith('section_run') for key in required):
return None
required = dict(**required)
def add_parent_section(section, child_required):
parent_sections = _all_parent_sections().get(section, [])
if len(parent_sections) == 0:
return [required]
result = []
for name, parent_section in parent_sections:
child_key = name.split('.')[-1]
for parent_required in add_parent_section(parent_section, None):
result.append(parent_required.setdefault(child_key, child_required if child_required else {}))
return result
def traverse(
current: Union[dict, str],
parent: Section = EntryArchive.m_def):
if isinstance(current, str):
return
current_updates = {}
for key, value in current.items():
prop = key.split('[')[0]
prop_definition = parent.all_properties[prop]
if isinstance(prop_definition, SubSection):
if FastAccess.m_def not in prop_definition.categories:
raise _Incomplete()
traverse(value, prop_definition.sub_section)
if isinstance(prop_definition, Quantity) and isinstance(prop_definition.type, Reference):
current_updates[prop] = '*'
if FastAccess.m_def not in prop_definition.categories:
continue
target_section_def = prop_definition.type.target_section_def.m_resolved()
# TODO is this a bug, should the result of the call not be used to traverse?
add_parent_section(target_section_def, value)
traverse(value, target_section_def)
current.update(**current_updates)
try:
traverse(dict(**required))
except _Incomplete:
# We realized that something is required that is not in the partial archive
return None
return required

nomad/archive/query.py
#
# Copyright The NOMAD Authors.
#
# This file is part of NOMAD. See https://nomad-lab.eu for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Any, Tuple, Dict, Callable, Union, List
from io import BytesIO
import re
from nomad import utils
from .storage import ArchiveReader, ArchiveList, ArchiveObject
class ArchiveQueryError(Exception):
'''
    An error that indicates that an archive query is either not valid or does not
    fit the queried archive.
'''
pass
__query_archive_key_pattern = re.compile(r'^([\s\w\-]+)(\[([-?0-9]*)(:([-?0-9]*))?\])?$')
def query_archive(
f_or_archive_reader: Union[str, ArchiveReader, BytesIO], query_dict: dict,
**kwargs) -> Dict:
'''
    Takes an open msg-pack based archive (either as str, reader, or BytesIO) and
    returns the archive as a JSON serializable dictionary, filtered based on the
    given required (query_dict) specification.

    An example query_dict that shows some of the supported features:

    .. code-block:: json

        {
            "results": "recursive-resolve-inplace",
            "workflow": {
                "final": "resolve"
            },
            "simulation": {
                "scc[-1]": {
                    "method": "recursive-resolve",
                    "system": {
                        "symmetry": "exclude",
                        "*": "include"
                    },
                    "dos": "recursive-include"
                }
            }
        }

    The different directives are:

    * include ('*' is an alias), includes the whole subtree, resolves on a ref
    * recursive-resolve, includes the whole subtree, resolves all refs recursively
    * recursive-resolve-inplace, includes the whole subtree, resolves all refs
      recursively and replaces the ref with the resolved data
    * exclude (in combination with wildcard keys), replaces the value with null
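
    A usage sketch (the file name and entry id are hypothetical; top-level keys of
    the query_dict are entry ids):

    .. code-block:: python

        required = {'entry-1': {'workflow': '*'}}
        result = query_archive('example.msg', required)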
'''
def _to_son(data):
if isinstance(data, (ArchiveList, List)):
data = [_to_son(item) for item in data]
elif isinstance(data, ArchiveObject):
data = data.to_dict()
return data
def _load_data(query_dict: Dict[str, Any], archive_item: ArchiveObject) -> Dict:
query_dict_with_fixed_ids = {
utils.adjust_uuid_size(key): value for key, value in query_dict.items()}
return filter_archive(query_dict_with_fixed_ids, archive_item, transform=_to_son)
if isinstance(f_or_archive_reader, ArchiveReader):
return _load_data(query_dict, f_or_archive_reader)
elif isinstance(f_or_archive_reader, (BytesIO, str)):
with ArchiveReader(f_or_archive_reader, **kwargs) as archive:
return _load_data(query_dict, archive)
else:
        raise TypeError('%s is neither a file-like object nor an ArchiveReader' % f_or_archive_reader)
def filter_archive(
required: Union[str, Dict[str, Any]], archive_item: Union[Dict, ArchiveObject, str],
transform: Callable, result_root: Dict = None, resolve_inplace: bool = False) -> Dict:
def _fix_index(index, length):
if index is None:
return index
if index < 0:
return max(-(length), index)
else:
return min(length, index)
if archive_item is None:
return None
if isinstance(required, str):
if required == 'exclude':
return None
if required == 'resolve':
# TODO this requires to reflect on the definition to determine what are references!
pass
elif required in ['*', 'include']:
pass
else:
raise ArchiveQueryError(f'unknown directive {required}')
return transform(archive_item)
elif not isinstance(required, dict):
        raise ArchiveQueryError('a value in required is neither a dict nor a string directive')
    if isinstance(archive_item, str):
        # The archive item is a reference; required is still a dict, so the
        # reference needs to be resolved
        # TODO
        raise ArchiveQueryError('resolving references in non-partial archives is not yet implemented')
result: Dict[str, Any] = {}
for key, val in required.items():
key = key.strip()
# process array indices
match = __query_archive_key_pattern.match(key)
index: Union[Tuple[int, int], int] = None
if match:
key = match.group(1)
# check if we have indices
if match.group(2) is not None:
first_index, last_index = None, None
group = match.group(3)
first_index = None if group == '' else int(group)
if match.group(4) is not None:
group = match.group(5)
last_index = None if group == '' else int(group)
index = (0 if first_index is None else first_index, last_index)
else:
index = first_index # one item
else:
index = None
elif key == '*':
# TODO
raise ArchiveQueryError('key wildcards not yet implemented')
else:
raise ArchiveQueryError('invalid key format: %s' % key)
try:
archive_child = archive_item[key]
is_list = isinstance(archive_child, (ArchiveList, list))
if index is None and is_list:
index = (0, None)
elif index is not None and not is_list:
                raise ArchiveQueryError('cannot use list key on non-list %s' % key)
if index is None:
pass
else:
length = len(archive_child)
if isinstance(index, tuple):
index = (_fix_index(index[0], length), _fix_index(index[1], length))
if index[0] == index[1]:
archive_child = [archive_child[index[0]]]
else:
archive_child = archive_child[index[0]: index[1]]
else:
archive_child = [archive_child[_fix_index(index, length)]]
if isinstance(archive_child, (ArchiveList, list)):
result[key] = [
filter_archive(val, item, transform=transform)
for item in archive_child]
else:
result[key] = filter_archive(val, archive_child, transform=transform)
except (KeyError, IndexError):
continue
return result

nomad/archive/required.py
#
# Copyright The NOMAD Authors.
#
# This file is part of NOMAD. See https://nomad-lab.eu for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import cast, Union, Dict, Tuple, List, Any, Callable
import re
from nomad import utils
from nomad.metainfo import Definition, Section, Quantity, SubSection, Reference, QuantityReference
from .storage import ArchiveReader, ArchiveList, ArchiveObject, ArchiveError
from .query import ArchiveQueryError # TODO
class RequiredValidationError(Exception):
def __init__(self, msg, loc):
super().__init__(msg)
self.msg = msg
self.loc = loc
class RequiredReader:
'''
    Clients can read only the required parts of an archive. They specify the required
    parts with a required specification like this:

    .. code-block:: python

        {
            "results": "include-resolved",  # contains all the results and all
                                            # their references resolved
            "workflow": {
                "calculation_result_ref": {  # implicitly resolves the reference
                    "calculation_to_system_ref": "*",  # short-hand for include
                    "calculation_to_method_ref": "include",  # resolves and includes
                                                             # the target, but no
                                                             # references in the
                                                             # target are resolved
                    "*": "*",  # includes everything else ...
                    "section_eigenvalues": "exclude"  # ... except explicitly
                                                      # excluded parts
                }
            }
        }

    The structure has to adhere to the metainfo definitions of an archive's
    sub-sections and quantities. At each point in the specification, children can be
    replaced with certain directives.

    The different directives are:

    * include ('*' is an alias), includes the whole subtree, resolves on a ref
    * include-resolved, includes the whole subtree, resolves all refs recursively
    * exclude (in combination with wildcard keys), omits it from the result

    This class allows clients to keep a requirement specification and use it to read
    from given upload files and entry ids.

    Attributes:
        - required: The requirement specification as a python dictionary or directive
          string.
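
    A construction sketch (the spec is validated against the metainfo; an invalid
    spec raises :class:`RequiredValidationError`):

    .. code-block:: python

        reader = RequiredReader({'workflow': '*'})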
'''
__query_archive_key_pattern = re.compile(r'^([\s\w\-]+)(\[([-?0-9]*)(:([-?0-9]*))?\])?$')
def __init__(self, required: Union[dict, str], root_section_def: Section = None, resolve_inplace: bool = False):
if root_section_def is None:
from nomad import datamodel
self.root_section_def = datamodel.EntryArchive.m_def
else:
self.root_section_def = root_section_def
self.resolve_inplace = resolve_inplace
self.__result_root: dict = None
        self.__archive_root: dict = None  # it is actually an ArchiveReader, but we use it as dict
self.required = self.validate(required)
def __to_son(self, data):
if isinstance(data, (ArchiveList, List)):
data = [self.__to_son(item) for item in data]
elif isinstance(data, ArchiveObject):
data = data.to_dict()
return data
def __fix_index(self, index, length):
if index is None: