Commit c112c62f authored by Markus Scheidgen

Merge branch 'v1-archive-required' into 'v0.10.2'

V1 archive required

See merge request !308
parents 665eb802 2a79ac54
Pipeline #97948 passed with stages in 23 minutes and 10 seconds
@@ -891,9 +891,9 @@ _archive_required_field = Body(
The `required` part allows you to specify what parts of the requested archives
should be returned. The NOMAD Archive is a hierarchical data format and
you can *require* certain branches (i.e. *sections*) in the hierarchy.
By specifying certain sections with specific contents or all contents (via `"*"`),
you can determine what sections and what quantities should be returned.
The default is everything: `"*"`.
By specifying certain sections with specific contents or all contents (via
the directive `"*"`), you can determine what sections and what quantities should
be returned. The default is the whole archive, i.e., `"*"`.
For example, to specify that you are only interested in the `section_metadata`,
use:
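```
{
    "section_metadata": "*"
}
```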
@@ -931,16 +931,43 @@ _archive_required_field = Body(
calculation:
```
{
'section_workflow': {
'calculation_result_ref': {
'energy_total': '*',
'single_configuration_calculation_to_system_ref': {
'chemical_composition_reduced': '*'
"section_workflow": {
"calculation_result_ref": {
"energy_total": "*",
"single_configuration_calculation_to_system_ref": {
"chemical_composition_reduced": "*"
}
}
}
}
```
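A hedged sketch of how a client might send such a specification (the endpoint
URL, the query, and the response layout below are assumptions for illustration,
not part of this documentation):
```
import requests

response = requests.post(
    'http://localhost/api/v1/entries/archive/query',  # assumed endpoint
    json={
        'query': {'dft.code_name': 'VASP'},  # hypothetical query
        'required': {
            'section_workflow': {
                'calculation_result_ref': {
                    'energy_total': '*'
                }
            }
        }
    })
response.raise_for_status()
for entry in response.json()['data']:  # assumed response layout
    print(entry['calc_id'], entry['archive'])
```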
You can also resolve all references in a branch with the `include-resolved`
directive. This resolves every reference in the branch, including the references
found within the referenced sections themselves:
```
{
    "section_workflow": {
        "calculation_result_ref": "include-resolved"
    }
}
```
By default, the targets of "resolved" references are added to the archive at
their original hierarchy positions. All references thus remain references, but
they are resolvable within the returned data, since their targets are now part
of it. Alternatively, you can add `"resolve-inplace": true` to the root of
required; then the reference targets replace the references themselves:
```
{
    "resolve-inplace": true,
    "section_workflow": {
        "calculation_result_ref": "include-resolved"
    }
}
```
```
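To sketch the difference (the section names and the reference value here are
illustrative only, not actual output): with the default behavior the reference
quantity still holds a reference value and its target is returned at its
original place in the hierarchy, while with `resolve-inplace` the serialized
target takes the place of the reference value:
```
# default: the reference is kept, its target is added at its original position
{
    "section_workflow": {
        "calculation_result_ref": "/section_run/0/section_single_configuration_calculation/-1"
    },
    "section_run": [...]
}

# resolve-inplace: the target's contents replace the reference
{
    "section_workflow": {
        "calculation_result_ref": {
            "energy_total": ...
        }
    }
}
```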
'''),
example={
'section_run': {
......
@@ -16,7 +16,7 @@
# limitations under the License.
#
from typing import Optional, Union, Dict, Iterator, Any, List, Set, IO, cast
from typing import Optional, Union, Dict, Iterator, Any, List, Set, IO
from fastapi import APIRouter, Depends, Path, status, HTTPException, Request, Query as QueryParameter
from fastapi.responses import StreamingResponse
import os.path
@@ -29,9 +29,7 @@ import lzma
from nomad import search, files, config, utils
from nomad.utils import strip
from nomad.archive import (
query_archive, ArchiveQueryError, compute_required_with_referenced,
read_partial_archives_from_mongo, filter_archive)
from nomad.archive import RequiredReader, RequiredValidationError, ArchiveQueryError
from .auth import get_optional_user
from ..utils import create_streamed_zipfile, File, create_responses
@@ -446,7 +444,7 @@ async def get_entries_raw_download(
owner=with_query.owner, query=with_query.query, files=files, user=user)
def _read_archive(entry_metadata, uploads, required):
def _read_archive(entry_metadata, uploads, required_reader: RequiredReader):
calc_id = entry_metadata['calc_id']
upload_id = entry_metadata['upload_id']
upload_files = uploads.get_upload_files(upload_id)
@@ -457,12 +455,20 @@ def _read_archive(entry_metadata, uploads, required):
'calc_id': calc_id,
'upload_id': upload_id,
'parser_name': entry_metadata['parser_name'],
'archive': query_archive(archive, {calc_id: required})[calc_id]
'archive': required_reader.read(archive, calc_id)
}
except ArchiveQueryError as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
def _validate_required(required: ArchiveRequired) -> RequiredReader:
try:
return RequiredReader(required)
except RequiredValidationError as e:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=[dict(
msg=e.msg, loc=['required'] + e.loc)])
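# Note: an invalid `required` specification thus yields a 422 response whose body
# has the shape {'detail': [{'msg': ..., 'loc': ['required', ...]}]}, with `loc`
# pointing at the offending part of the specification.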
def _answer_entries_archive_request(
owner: Owner, query: Query, pagination: EntryPagination, required: ArchiveRequired,
user: User):
@@ -476,11 +482,7 @@ def _answer_entries_archive_request(
if required is None:
required = '*'
try:
required_with_references = compute_required_with_referenced(required)
except KeyError as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=(
'The required specification contains an unknown quantity or section: %s' % str(e)))
required_reader = _validate_required(required)
search_response = perform_search(
owner=owner, query=query,
@@ -488,35 +490,17 @@
required=MetadataRequired(include=['calc_id', 'upload_id', 'parser_name']),
user_id=user.user_id if user is not None else None)
if required_with_references is not None:
# We can produce all the required archive data from the partial archives stored
# in mongodb.
entry_ids = [entry['calc_id'] for entry in search_response.data]
partial_archives = cast(dict, read_partial_archives_from_mongo(entry_ids, as_dict=True))
uploads = _Uploads()
response_data = {}
for entry_metadata in search_response.data:
calc_id, upload_id = entry_metadata['calc_id'], entry_metadata['upload_id']
archive_data = None
if required_with_references is not None:
try:
partial_archive = partial_archives[calc_id]
archive_data = filter_archive(required, partial_archive, transform=lambda e: e)
except KeyError:
# the partial archive might not exist, e.g. due to processing problems
pass
except ArchiveQueryError as e:
detail = 'The required specification could not be understood: %s' % str(e)
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=detail)
if archive_data is None:
try:
archive_data = _read_archive(entry_metadata, uploads, required)['archive']
except KeyError as e:
logger.error('missing archive', exc_info=e, calc_id=calc_id)
continue
try:
archive_data = _read_archive(entry_metadata, uploads, required_reader)['archive']
except KeyError as e:
logger.error('missing archive', exc_info=e, calc_id=calc_id)
continue
response_data[calc_id] = {
'calc_id': calc_id,
@@ -609,13 +593,15 @@ def _answer_entries_archive_download_request(
manifest = []
search_includes = ['calc_id', 'upload_id', 'parser_name']
required_reader = RequiredReader('*')
# a generator of File objects to create the streamed zip from
def file_generator():
# go through all entries that match the query
for entry_metadata in _do_exaustive_search(owner, query, include=search_includes, user=user):
path = os.path.join(entry_metadata['upload_id'], '%s.json' % entry_metadata['calc_id'])
try:
archive_data = _read_archive(entry_metadata, uploads, '*')
archive_data = _read_archive(entry_metadata, uploads, required_reader)
f = io.BytesIO(orjson.dumps(
archive_data, option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS))
@@ -881,11 +867,7 @@ async def get_entry_raw_download_file(
def _answer_entry_archive_request(entry_id: str, required: ArchiveRequired, user: User):
try:
required_with_references = compute_required_with_referenced(required)
except KeyError as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=(
'The required specification contains an unknown quantity or section: %s' % str(e)))
required_reader = _validate_required(required)
query = dict(calc_id=entry_id)
response = perform_search(
@@ -900,32 +882,14 @@ def _answer_entry_archive_request(entry_id: str, required: ArchiveRequired, user
entry_metadata = response.data[0]
if required_with_references is not None:
# We can produce all the required archive data from the partial archives stored
# in mongodb.
partial_archives = cast(dict, read_partial_archives_from_mongo([entry_id], as_dict=True))
uploads = _Uploads()
try:
archive_data = None
if required_with_references is not None:
try:
partial_archive = partial_archives[entry_id]
archive_data = filter_archive(required, partial_archive, transform=lambda e: e)
except KeyError:
# the partial archive might not exist, e.g. due to processing problems
pass
except ArchiveQueryError as e:
detail = 'The required specification could not be understood: %s' % str(e)
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=detail)
if archive_data is None:
try:
archive_data = _read_archive(entry_metadata, uploads, required=required)['archive']
except KeyError:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail='The entry with the given id does exist, but it has no archive.')
try:
archive_data = _read_archive(entry_metadata, uploads, required_reader)['archive']
except KeyError:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail='The entry with the given id does exist, but it has no archive.')
return {
'entry_id': entry_id,
......
#
# Copyright The NOMAD Authors.
#
# This file is part of NOMAD. See https://nomad-lab.eu for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
'''
The archive storage is made from two tiers. First, the whole archive is stored in
files; second, parts of the archive are stored in mongodb documents.

The file storage is done in msg-pack files. Each file contains the archives of many
entries (all entries of an upload). These msg-pack files contain the JSON serialized
version of the metainfo archive (see module:`nomad.metainfo`). In addition, the msg-pack
contains TOC information for quicker access of individual sections. See :func:`write_archive`
and :func:`read_archive`. There is also query functionality to partially read
specified sections from an archive: :func:`query_archive`.

The mongo storage uses mongodb's native bson to store JSON serialized metainfo archive
data. Each document in mongodb holds the partial archive of a single entry. Which parts
of an archive are stored in mongo is determined by the metainfo and
section annotations/categories.
'''
from .storage import (
write_archive, read_archive, ArchiveError, ArchiveReader, ArchiveWriter,
ArchiveObject, ArchiveList, ArchiveItem)
from .query import query_archive, filter_archive, ArchiveQueryError
from .partial import (
read_partial_archive_from_mongo, read_partial_archives_from_mongo,
write_partial_archive_to_mongo, delete_partial_archives_from_mongo,
create_partial_archive, compute_required_with_referenced)
from .required import RequiredReader, RequiredValidationError
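# A usage sketch of the file tier (the write_archive signature below is an
# assumption inferred from the module docstring; query_archive accepts a path,
# reader, or BytesIO and a per-entry required dict, as documented in .query):
#
#   write_archive('example.msg', 1, [('entry-1', {'section_run': []})])
#   data = query_archive('example.msg', {'entry-1': {'section_run': '*'}})['entry-1']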
#
# Copyright The NOMAD Authors.
#
# This file is part of NOMAD. See https://nomad-lab.eu for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Any, Tuple, Dict, Union, List
from nomad import infrastructure, config
from nomad.metainfo import MSection, Definition, Quantity, Reference, SubSection, Section
from nomad.datamodel import EntryArchive
from nomad.datamodel.metainfo.common import FastAccess
def create_partial_archive(archive: EntryArchive) -> Dict:
'''
Creates a partial archive JSON serializable dict that can be stored directly.
The given archive is filtered based on the metainfo category ``FastAccess``.
Selected sections and other data that they reference (recursively) comprise the
resulting partial archive.
TODO at the moment this is hard coded and NOT informed by the metainfo. We simply
add the sections EntryMetadata and Workflow.
Arguments:
archive: The archive as an :class:`EntryArchive` instance.
Returns: the partial archive in JSON serializable dictionary form.
'''
# A list with all referenced sections that might not yet have been ensured to be
# in the resulting partial archive
referenceds: List[MSection] = []
# contents keeps track of all sections in the partial archive by keeping their
# JSON serializable form and placeholder status in a dict
contents: Dict[MSection, Tuple[dict, bool]] = dict()
def partial(definition: Definition, section: MSection) -> bool:
'''
``m_to_dict`` partial function that selects what goes into the partial archive
and what not. It also collects references as a side-effect.
'''
if section.m_def == EntryArchive.m_def:
if definition.m_def == Quantity:
return True
return FastAccess.m_def in definition.categories
if isinstance(definition, Quantity) and isinstance(definition.type, Reference) \
and FastAccess.m_def in definition.categories:
# Reference lists in partial archives are not supported
if definition.is_scalar:
referenced = getattr(section, definition.name)
if referenced is not None and referenced not in contents:
referenceds.append(referenced)
if isinstance(definition, SubSection):
return FastAccess.m_def in definition.categories
return True
# add the main content
partial_contents = archive.m_to_dict(partial=partial)
# add the referenced data
def add(section, placeholder=False) -> dict:
'''
Adds the given section to partial_contents at the right place. If not a placeholder,
the section's serialization is added (replacing an existing placeholder if present).
Otherwise, an empty dict is added as a placeholder for referenced children.
'''
result: Dict[str, Any] = None
content, content_is_placeholder = contents.get(section, (None, True))
if content is not None:
if content_is_placeholder and not placeholder:
# the placeholder gets replaced later
pass
else:
return content
if section.m_parent is None:
contents[section] = partial_contents, False
return partial_contents
parent_dict = add(section.m_parent, placeholder=True)
if placeholder:
result = {}
else:
result = section.m_to_dict(partial=partial)
sub_section = section.m_parent_sub_section
if sub_section.repeats:
sections = parent_dict.setdefault(sub_section.name, [])
while len(sections) < section.m_parent_index + 1:
sections.append(None)
sections[section.m_parent_index] = result
else:
parent_dict[sub_section.name] = result
contents[section] = result, placeholder
return result
# We keep adding referenced objects as long as serializing referenced sections
# yields further references, implementing the recursive nature of references
# within already referenced sections.
while len(referenceds) > 0:
referenced = referenceds.pop()
add(referenced)
return partial_contents
def write_partial_archive_to_mongo(archive: EntryArchive):
''' Partially writes the given archive to mongodb. '''
mongo_db = infrastructure.mongo_client[config.mongo.db_name]
mongo_collection = mongo_db['archive']
mongo_id = archive.section_metadata.calc_id
partial_archive_dict = create_partial_archive(archive)
partial_archive_dict['_id'] = mongo_id
mongo_collection.replace_one(dict(_id=mongo_id), partial_archive_dict, upsert=True)
def read_partial_archive_from_mongo(entry_id: str, as_dict=False) -> Union[EntryArchive, Dict]:
'''
Reads the partial archive for the given id from mongodb.
Arguments:
entry_id: The entry id for the entry.
as_dict: Return the JSON serializable dictionary form of the archive not the
:class:`EntryArchive` form.
'''
mongo_db = infrastructure.mongo_client[config.mongo.db_name]
mongo_collection = mongo_db['archive']
archive_dict = mongo_collection.find_one(dict(_id=entry_id))
if as_dict:
return archive_dict
return EntryArchive.m_from_dict(archive_dict)
def delete_partial_archives_from_mongo(entry_ids: List[str]):
mongo_db = infrastructure.mongo_client[config.mongo.db_name]
mongo_collection = mongo_db['archive']
mongo_collection.delete_many(dict(_id={'$in': entry_ids}))
def read_partial_archives_from_mongo(entry_ids: List[str], as_dict=False) -> Dict[str, Union[EntryArchive, Dict]]:
'''
Reads the partial archives for a set of entries.
Arguments:
entry_ids: A list of entry ids.
as_dict: Return the JSON serializable dictionary form of the archive not the
:class:`EntryArchive` form.
Returns:
A dictionary with entry_ids as keys.
'''
mongo_db = infrastructure.mongo_client[config.mongo.db_name]
mongo_collection = mongo_db['archive']
archive_dicts = mongo_collection.find(dict(_id={'$in': entry_ids}))
if as_dict:
return {archive_dict.pop('_id'): archive_dict for archive_dict in archive_dicts}
return {
archive_dict.pop('_id'): EntryArchive.m_from_dict(archive_dict)
for archive_dict in archive_dicts}
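# Note: entries without a partial archive (e.g. due to processing problems) are
# simply absent from the returned dict; callers need to handle missing keys.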
__all_parent_sections: Dict[Section, List[Tuple[str, Section]]] = {}
def _all_parent_sections():
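    '''
    Lazily builds and caches a map from each section definition to the
    (qualified sub section name, parent section definition) pairs under which
    it occurs within an EntryArchive.
    '''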
if len(__all_parent_sections) == 0:
def add(section):
for sub_section in section.all_sub_sections.values():
sub_section_section = sub_section.sub_section.m_resolved()
__all_parent_sections.setdefault(sub_section_section, []).append((sub_section.qualified_name(), section, ))
add(sub_section_section)
add(EntryArchive.m_def)
return __all_parent_sections
class _Incomplete(Exception): pass
def compute_required_with_referenced(required):
'''
Updates the given required dictionary to ensure that references to non-required
sections within a partial fast-access archive are included. Only references that
are directly contained in required are added. References from wildcarded sub sections
are ignored.

Returns: A new required dict or None. None is returned if it is unclear whether the
required specification only accesses information available in fast-access partial archives.
'''
# TODO this function should be based on the metainfo
if not isinstance(required, dict):
return None
if any(key.startswith('section_run') for key in required):
return None
required = dict(**required)
def add_parent_section(section, child_required):
parent_sections = _all_parent_sections().get(section, [])
if len(parent_sections) == 0:
return [required]
result = []
for name, parent_section in parent_sections:
child_key = name.split('.')[-1]
for parent_required in add_parent_section(parent_section, None):
result.append(parent_required.setdefault(child_key, child_required if child_required else {}))
return result
def traverse(
current: Union[dict, str],
parent: Section = EntryArchive.m_def):
if isinstance(current, str):
return
current_updates = {}
for key, value in current.items():
prop = key.split('[')[0]
prop_definition = parent.all_properties[prop]
if isinstance(prop_definition, SubSection):
if FastAccess.m_def not in prop_definition.categories:
raise _Incomplete()
traverse(value, prop_definition.sub_section)
if isinstance(prop_definition, Quantity) and isinstance(prop_definition.type, Reference):
current_updates[prop] = '*'
if FastAccess.m_def not in prop_definition.categories:
continue
target_section_def = prop_definition.type.target_section_def.m_resolved()
# TODO is this a bug, should the result of the call not be used to traverse?
add_parent_section(target_section_def, value)
traverse(value, target_section_def)
current.update(**current_updates)
try:
traverse(dict(**required))
except _Incomplete:
# We realized that something is required that is not in the partial archive
return None
return required
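# A sketch of the behavior (names illustrative): a required dict like
#   {'section_workflow': {'calculation_result_ref': {'energy_total': '*'}}}
# can be served from partial archives; the reference quantity is additionally
# wildcarded with '*'. Anything under 'section_run' makes this function return
# None, i.e. the full archive files have to be read instead.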
#
# Copyright The NOMAD Authors.
#
# This file is part of NOMAD. See https://nomad-lab.eu for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Any, Tuple, Dict, Callable, Union, List
from io import BytesIO
import re
from nomad import utils
from .storage import ArchiveReader, ArchiveList, ArchiveObject
class ArchiveQueryError(Exception):
'''
An error that indicates that an archive query is either not valid or does not fit to
the queried archive.
'''
pass
__query_archive_key_pattern = re.compile(r'^([\s\w\-]+)(\[([-?0-9]*)(:([-?0-9]*))?\])?$')
def query_archive(
f_or_archive_reader: Union[str, ArchiveReader, BytesIO], query_dict: dict,
**kwargs) -> Dict:
'''
Takes an open msg-pack based archive (either as str, reader, or BytesIO) and returns
the archive as JSON serializable dictionary filtered based on the given required
(query_dict) specification.
An example "required" (query_dict) specification demonstrating some of its features:
.. code-block:: json
{
    "results": "recursive-resolve-inplace",
    "workflow": {
        "final": "resolve"
    },
    "simulation": {
        "scc[-1]": {
            "method": "recursive-resolve",
            "system": {
                "symmetry": "exclude",
                "*": "include"
            },
            "dos": "recursive-include"
        }
    }
}
The different directives are: