Commit 2a79ac54 authored by Markus Scheidgen's avatar Markus Scheidgen

Added new RequiredReader to v1 archive API. Some fixes to RequiredReader. #519

parent d1e880e4
Pipeline #97944 passed with stages in 22 minutes and 52 seconds
@@ -891,9 +891,9 @@ _archive_required_field = Body(
The `required` part allows you to specify what parts of the requested archives
should be returned. The NOMAD Archive is a hierarchical data format and
you can *require* certain branches (i.e. *sections*) in the hierarchy.
By specifying certain sections with specific contents or all contents (via `"*"`),
you can determine what sections and what quantities should be returned.
The default is everything: `"*"`.
By specifying certain sections with specific contents or all contents (via
the directive `"*"`), you can determine what sections and what quantities should
be returned. The default is the whole archive, i.e., `"*"`.
For example, to specify that you are only interested in the `section_metadata`,
use:
@@ -931,16 +931,43 @@ _archive_required_field = Body(
calculation:
```
{
    'section_workflow': {
        'calculation_result_ref': {
            'energy_total': '*',
            'single_configuration_calculation_to_system_ref': {
                'chemical_composition_reduced': '*'
    "section_workflow": {
        "calculation_result_ref": {
            "energy_total": "*",
            "single_configuration_calculation_to_system_ref": {
                "chemical_composition_reduced": "*"
            }
        }
    }
}
```
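In practice, such a `required` specification is sent as part of the request body. Here is a minimal sketch in Python, assuming the v1 `entries/archive/query` endpoint (the per-entry variant `entries/{entry_id}/archive/query` appears in the tests further down) and a placeholder deployment URL; query and response field names are also assumptions.
```
# Hedged sketch: base URL and response field names are assumptions; only the
# shape of `required` follows the docstring above.
import requests

base_url = 'https://example.org/nomad/api/v1'  # hypothetical deployment

required = {
    "section_workflow": {
        "calculation_result_ref": {
            "energy_total": "*",
            "single_configuration_calculation_to_system_ref": {
                "chemical_composition_reduced": "*"
            }
        }
    }
}

response = requests.post(
    f'{base_url}/entries/archive/query',
    json={'required': required})
response.raise_for_status()

for entry in response.json().get('data', []):
    # each returned entry is expected to carry the requested archive subset
    print(entry.get('calc_id'), list(entry.get('archive', {}).keys()))
```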
You can also resolve all references in a branch with the `include-resolved`
directive. This resolves every reference in the branch, including the references
found within the referenced sections themselves:
```
{
    "section_workflow": {
        "calculation_result_ref": "include-resolved"
    }
}
```
By default, the targets of *resolved* references are added to the archive at
their original hierarchy positions.
This means all references remain references, but they can be resolved within
the returned data, since their targets are now part of it. Another option
is to add `"resolve-inplace": true` to the root of the required specification.
In this case, the reference targets replace the references:
```
{
    "resolve-inplace": true,
    "section_workflow": {
        "calculation_result_ref": "include-resolved"
    }
}
```
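To make the difference between the two modes concrete, here is a hedged sketch of the shape of the returned data in both cases. The section and quantity names are only illustrative, and the reference path and numeric value are made up.
```
# Default behavior: the reference quantity keeps its (archive path) value and the
# referenced section is added at its original position, so the path can be
# followed within the returned data. Path and values are illustrative.
default_result = {
    'section_workflow': {
        'calculation_result_ref': '/section_run/0/section_single_configuration_calculation/0'
    },
    'section_run': [{
        'section_single_configuration_calculation': [{
            'energy_total': -1.4e-18
        }]
    }]
}

# With "resolve-inplace": true the reference itself is replaced by its
# resolved target section.
inplace_result = {
    'section_workflow': {
        'calculation_result_ref': {
            'energy_total': -1.4e-18
        }
    }
}
```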
'''),
example={
'section_run': {
@@ -16,7 +16,7 @@
# limitations under the License.
#
from typing import Optional, Union, Dict, Iterator, Any, List, Set, IO, cast
from typing import Optional, Union, Dict, Iterator, Any, List, Set, IO
from fastapi import APIRouter, Depends, Path, status, HTTPException, Request, Query as QueryParameter
from fastapi.responses import StreamingResponse
import os.path
@@ -29,9 +29,7 @@ import lzma
from nomad import search, files, config, utils
from nomad.utils import strip
from nomad.archive import (
query_archive, ArchiveQueryError, compute_required_with_referenced,
read_partial_archives_from_mongo, filter_archive)
from nomad.archive import RequiredReader, RequiredValidationError, ArchiveQueryError
from .auth import get_optional_user
from ..utils import create_streamed_zipfile, File, create_responses
@@ -446,7 +444,7 @@ async def get_entries_raw_download(
owner=with_query.owner, query=with_query.query, files=files, user=user)
def _read_archive(entry_metadata, uploads, required):
def _read_archive(entry_metadata, uploads, required_reader: RequiredReader):
calc_id = entry_metadata['calc_id']
upload_id = entry_metadata['upload_id']
upload_files = uploads.get_upload_files(upload_id)
@@ -457,12 +455,20 @@ def _read_archive(entry_metadata, uploads, required):
'calc_id': calc_id,
'upload_id': upload_id,
'parser_name': entry_metadata['parser_name'],
'archive': query_archive(archive, {calc_id: required})[calc_id]
'archive': required_reader.read(archive, calc_id)
}
except ArchiveQueryError as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
def _validate_required(required: ArchiveRequired) -> RequiredReader:
try:
return RequiredReader(required)
except RequiredValidationError as e:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=[dict(
msg=e.msg, loc=['required'] + e.loc)])
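With `_validate_required`, an invalid specification now yields a 422 whose `detail` carries the `msg`/`loc` pair from `RequiredValidationError`. A hedged sketch of the response body a client might see for the `DOESNOTEXIST` case used in the tests below; the message wording is illustrative, only the structure follows the code above.
```
# Illustrative only: the msg text is an assumption, the structure
# (msg plus a loc path rooted at 'required') mirrors _validate_required.
error_body = {
    'detail': [{
        'msg': "'DOESNOTEXIST' is not a property of EntryArchive",
        'loc': ['required', 'DOESNOTEXIST']
    }]
}
```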
def _answer_entries_archive_request(
owner: Owner, query: Query, pagination: EntryPagination, required: ArchiveRequired,
user: User):
@@ -476,11 +482,7 @@ def _answer_entries_archive_request(
if required is None:
required = '*'
try:
required_with_references = compute_required_with_referenced(required)
except KeyError as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=(
'The required specification contains an unknown quantity or section: %s' % str(e)))
required_reader = _validate_required(required)
search_response = perform_search(
owner=owner, query=query,
@@ -488,32 +490,14 @@ def _answer_entries_archive_request(
required=MetadataRequired(include=['calc_id', 'upload_id', 'parser_name']),
user_id=user.user_id if user is not None else None)
if required_with_references is not None:
# We can produce all the required archive data from the partial archives stored
# in mongodb.
entry_ids = [entry['calc_id'] for entry in search_response.data]
partial_archives = cast(dict, read_partial_archives_from_mongo(entry_ids, as_dict=True))
uploads = _Uploads()
response_data = {}
for entry_metadata in search_response.data:
calc_id, upload_id = entry_metadata['calc_id'], entry_metadata['upload_id']
archive_data = None
if required_with_references is not None:
try:
partial_archive = partial_archives[calc_id]
archive_data = filter_archive(required, partial_archive, transform=lambda e: e)
except KeyError:
# the partial archive might not exist, e.g. due to processing problems
pass
except ArchiveQueryError as e:
detail = 'The required specification could not be understood: %s' % str(e)
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=detail)
if archive_data is None:
try:
archive_data = _read_archive(entry_metadata, uploads, required)['archive']
archive_data = _read_archive(entry_metadata, uploads, required_reader)['archive']
except KeyError as e:
logger.error('missing archive', exc_info=e, calc_id=calc_id)
continue
@@ -609,13 +593,15 @@ def _answer_entries_archive_download_request(
manifest = []
search_includes = ['calc_id', 'upload_id', 'parser_name']
required_reader = RequiredReader('*')
# a generator of File objects to create the streamed zip from
def file_generator():
# go through all entries that match the query
for entry_metadata in _do_exaustive_search(owner, query, include=search_includes, user=user):
path = os.path.join(entry_metadata['upload_id'], '%s.json' % entry_metadata['calc_id'])
try:
archive_data = _read_archive(entry_metadata, uploads, '*')
archive_data = _read_archive(entry_metadata, uploads, required_reader)
f = io.BytesIO(orjson.dumps(
archive_data, option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS))
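On the client side, the streamed result is an ordinary zip with one `<upload_id>/<calc_id>.json` member per entry, matching the `path` built in `file_generator` above. A hedged consumption sketch; the endpoint path, query parameters, and base URL are assumptions.
```
# Hedged sketch: endpoint path and query parameters are assumptions; only the
# zip layout (one '<upload_id>/<calc_id>.json' per entry) follows the code above.
import io
import json
import zipfile

import requests

base_url = 'https://example.org/nomad/api/v1'  # hypothetical deployment
response = requests.get(f'{base_url}/entries/archive/download', params={'owner': 'public'})
response.raise_for_status()

with zipfile.ZipFile(io.BytesIO(response.content)) as zip_file:
    for name in zip_file.namelist():
        archive_data = json.loads(zip_file.read(name))
        if isinstance(archive_data, dict):
            print(name, list(archive_data.keys()))
```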
@@ -881,11 +867,7 @@ async def get_entry_raw_download_file(
def _answer_entry_archive_request(entry_id: str, required: ArchiveRequired, user: User):
try:
required_with_references = compute_required_with_referenced(required)
except KeyError as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=(
'The required specification contains an unknown quantity or section: %s' % str(e)))
required_reader = _validate_required(required)
query = dict(calc_id=entry_id)
response = perform_search(
@@ -900,28 +882,10 @@ def _answer_entry_archive_request(entry_id: str, required: ArchiveRequired, user
entry_metadata = response.data[0]
if required_with_references is not None:
# We can produce all the required archive data from the partial archives stored
# in mongodb.
partial_archives = cast(dict, read_partial_archives_from_mongo([entry_id], as_dict=True))
uploads = _Uploads()
try:
archive_data = None
if required_with_references is not None:
try:
partial_archive = partial_archives[entry_id]
archive_data = filter_archive(required, partial_archive, transform=lambda e: e)
except KeyError:
# the partial archive might not exist, e.g. due to processing problems
pass
except ArchiveQueryError as e:
detail = 'The required specification could not be understood: %s' % str(e)
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=detail)
if archive_data is None:
try:
archive_data = _read_archive(entry_metadata, uploads, required=required)['archive']
archive_data = _read_archive(entry_metadata, uploads, required_reader)['archive']
except KeyError:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
@@ -78,12 +78,12 @@ class RequiredReader:
self.root_section_def = datamodel.EntryArchive.m_def
else:
self.root_section_def = root_section_def
self.resolve_inplace = resolve_inplace
self.__result_root: dict = None
self.__archive_root: dict = None # it is actually an ArchiveReader, but we use it as a dict
self.required = self.validate(required)
self.resolve_inplace = resolve_inplace
self.required = self.validate(required, is_root=True)
def __to_son(self, data):
if isinstance(data, (ArchiveList, List)):
@@ -103,8 +103,8 @@ class RequiredReader:
return min(length, index)
def validate(
self, required: Union[str, dict] = None, definition: Definition = None,
loc: list = None) -> dict:
self, required: Union[str, dict], definition: Definition = None,
loc: list = None, is_root=False) -> dict:
'''
Validates the required specification of this instance. It will replace all
string directives with dicts. Those will have keys `_def` and `_directive`. It
@@ -118,8 +118,14 @@
raises:
- RequiredValidationError
'''
if required is None:
required = self.required
if is_root and isinstance(required, dict):
resolve_inplace = required.get('resolve-inplace', None)
if isinstance(resolve_inplace, bool):
self.resolve_inplace = resolve_inplace
elif resolve_inplace is not None:
raise RequiredValidationError(
'resolve-inplace is not a bool', ['resolve-inplace'])
if definition is None:
definition = self.root_section_def
if loc is None:
@@ -150,8 +156,14 @@
result: Dict[str, Any] = dict(_def=definition)
for key, value in cast(dict, required).items():
if key == 'resolve-inplace':
continue
loc.append(key)
try:
prop, index = self.__parse_required_key(key)
except Exception:
raise RequiredValidationError(f'invalid key format {key}', loc)
if prop == '*':
# TODO support wildcards
raise RequiredValidationError('wildcard (*) keys are not supported yet', loc)
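A small usage sketch of the root-level directive, based only on the constructor and `validate` behavior shown in this diff:
```
# Sketch: the root 'resolve-inplace' key is consumed by validate(..., is_root=True)
# and overrides the constructor default; it is skipped again when the remaining
# keys are validated.
reader = RequiredReader({
    'resolve-inplace': True,
    'section_workflow': 'include-resolved'
})
assert reader.resolve_inplace is True

# An invalid value raises the error that the API layer maps to a 422:
# RequiredReader({'resolve-inplace': 'NotBool'})  -> RequiredValidationError
```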
@@ -209,19 +221,21 @@
index = None
else:
raise ArchiveQueryError('invalid key format: %s' % key)
raise Exception('invalid key format: %s' % key)
return key, index
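The accepted key syntax (a property name with an optional index or slice suffix) can be approximated with a small standalone sketch. This is not the implementation above, just an illustration of the accepted and rejected forms exercised by the tests (e.g. `section_system[NOTANINT]`):
```
import re
from typing import Optional, Tuple, Union

_key_re = re.compile(r'^([a-zA-Z_][a-zA-Z0-9_]*)(\[(-?\d+)(:(-?\d+))?\])?$')

def parse_required_key(key: str) -> Tuple[str, Optional[Union[int, Tuple[int, int]]]]:
    # Illustrative approximation:
    #   'section_run'            -> ('section_run', None)
    #   'section_run[0]'         -> ('section_run', 0)
    #   'section_run[0:5]'       -> ('section_run', (0, 5))
    #   'section_system[NOTANINT]' raises, like the real parser.
    match = _key_re.match(key.strip())
    if match is None:
        raise Exception('invalid key format: %s' % key)
    name, _, start, _, end = match.groups()
    if start is None:
        return name, None
    if end is None:
        return name, int(start)
    return name, (int(start), int(end))
```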
def __resolve_refs(self, archive: dict, section_def: Section) -> dict:
''' Resolves all references in archive. '''
archive = self.__to_son(archive)
result = {}
for prop, value in archive.items():
for prop in archive:
value = archive[prop]
prop_def = section_def.all_properties[prop]
handle_item: Callable[[Any], Any] = None
if isinstance(prop_def, SubSection):
handle_item = lambda value: self.__resolve_refs(value, prop_def.sub_section.m_resolved())
elif isinstance(prop_def.type, Reference):
elif isinstance(prop_def.type, Reference) and not isinstance(prop_def.type, QuantityReference):
target_section_def = prop_def.type.target_section_def.m_resolved()
required = dict(_directive='include-resolved', _def=target_section_def)
handle_item = lambda value: self.__resolve_ref(required, value)
@@ -242,7 +256,9 @@
# TODO the metainfo should also provide this implementation
# resolve to archive_item
assert path.startswith('/'), 'only absolute references are supported'
if not path.startswith('/'):
# TODO support custom reference resolution, e.g. user_id based
return path
resolved = self.__archive_root
path_stack = path.strip('/').split('/')
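The path walk can be sketched with plain nested dicts and lists (the real code walks the packed archive reader instead); it shows how an absolute archive reference such as `/section_run/0/section_system/1` is followed:
```
from typing import Any

def resolve_archive_path(archive_root: Any, path: str) -> Any:
    # Illustrative sketch of following an absolute archive reference like
    # '/section_run/0/section_system/1' through nested dicts and lists.
    if not path.startswith('/'):
        # non-absolute references (e.g. user_id based) are left untouched,
        # mirroring the early return above
        return path
    resolved = archive_root
    for segment in path.strip('/').split('/'):
        if isinstance(resolved, list):
            resolved = resolved[int(segment)]
        else:
            resolved = resolved[segment]
    return resolved
```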
@@ -750,9 +750,9 @@ def test_entries_archive_download(client, data, query, files, entries, status_code
@pytest.mark.parametrize('required, status_code', [
pytest.param('*', 200, id='full'),
pytest.param({'section_metadata': '*'}, 200, id='partial'),
pytest.param({'section_run': {'section_system[NOTANINT]': '*'}}, 400, id='bad-required-1'),
pytest.param({'section_metadata': {'owners[NOTANINT]': '*'}}, 400, id='bad-required-2'),
pytest.param({'DOESNOTEXIST': '*'}, 400, id='bad-required-3')
pytest.param({'section_run': {'section_system[NOTANINT]': '*'}}, 422, id='bad-required-1'),
pytest.param({'section_metadata': {'owners[NOTANINT]': '*'}}, 422, id='bad-required-2'),
pytest.param({'DOESNOTEXIST': '*'}, 422, id='bad-required-3')
])
def test_entries_archive(client, data, required, status_code):
perform_entries_archive_test(
@@ -774,9 +774,11 @@ def test_entry_archive(client, data, entry_id, status_code):
pytest.param('id_01', '*', 200, id='full'),
pytest.param('id_02', '*', 404, id='404'),
pytest.param('id_01', {'section_metadata': '*'}, 200, id='partial'),
pytest.param('id_01', {'section_run': {'section_system[NOTANINT]': '*'}}, 400, id='bad-required-1'),
pytest.param('id_01', {'section_metadata': {'owners[NOTANINT]': '*'}}, 400, id='bad-required-2'),
pytest.param('id_01', {'DOESNOTEXIST': '*'}, 400, id='bad-required-3')
pytest.param('id_01', {'section_run': {'section_system[NOTANINT]': '*'}}, 422, id='bad-required-1'),
pytest.param('id_01', {'section_metadata': {'owners[NOTANINT]': '*'}}, 422, id='bad-required-2'),
pytest.param('id_01', {'DOESNOTEXIST': '*'}, 422, id='bad-required-3'),
pytest.param('id_01', {'resolve-inplace': 'NotBool', 'section_workflow': '*'}, 422, id='bad-required-4'),
pytest.param('id_01', {'resolve-inplace': True, 'section_metadata': 'include-resolved'}, 200, id='resolve-inplace')
])
def test_entry_archive_query(client, data, entry_id, required, status_code):
response = client.post('entries/%s/archive/query' % entry_id, json={
@@ -356,7 +356,12 @@ def archive():
'energy_total': '*'
}
}
}, None, id='resolve'),
}, None, id='resolve-with-required'),
pytest.param({
'section_workflow': {
'calculation_result_ref': 'include-resolved'
}
}, None, id='resolve-with-directive'),
pytest.param({
'section_workflow': 'include-resolved'
}, None, id='include-resolved'),
@@ -377,7 +382,7 @@ def archive():
])
def test_required_reader(archive, required, error, resolve_inplace):
f = BytesIO()
write_archive(f, 1, [('entry_id', archive.m_to_dict())], entry_toc_depth=1)
write_archive(f, 1, [('entry_id', archive.m_to_dict())], entry_toc_depth=2)
packed_archive = f.getbuffer()
archive_reader = ArchiveReader(BytesIO(packed_archive))
@@ -402,7 +407,6 @@ def assert_required_results(
Asserts that the resulting dict from a :class:`RequiredReader` contains everything that
was requested and that it is consistent with the archive it was read from.
'''
# initialize recursion
if current_archive_serialized is None:
current_archive_serialized = archive.m_to_dict()