Commit d67cf42b authored by Markus Scheidgen

Fixed issues with missing referenced sections in partial archives. Fixed missing non-run content in the quantities search metadata.
parent c23881c2
@@ -2,9 +2,9 @@
# using an image that can do git, docker, docker-compose
image: gitlab-registry.mpcdf.mpg.de/nomad-lab/nomad-fair/ci-runner
# Uncomment the next lines, to run each pipline/job in its own docker environment.
# Uncomment the next lines, to run each pipeline/job in its own docker environment.
# Otherwise, it will use the docker of the gitlab runner host (e.g. enc-preprocessing...).
# This will give it access to a persitent layer cache, which will not be available
# This will give it access to a persistent layer cache, which will not be available
# with the docker service.
# services:
# - docker:dind
@@ -296,7 +296,7 @@ line size ruler, etc.
"git.enableSmartCommit": true,
"eslint.autoFixOnSave": true,
"python.linting.pylintArgs": [
"--load-plugins=pylint_mongoengine",
"--load-plugins=pylint_mongoengine,nomad/metainfo/pylint_plugin",
],
"python.linting.pep8Path": "pycodestyle",
"python.linting.pep8Enabled": true,
@@ -7,28 +7,35 @@ from nomad.metainfo import units
query = ArchiveQuery(
# url='http://nomad-lab.eu/prod/rae/beta/api',
query={
'dft.compound_type': 'binary',
'dft.crystal_system': 'cubic',
'dft.code_name': 'FHI-aims',
'atoms': ['O']
'$and': [
{'dft.code_name': 'VASP'},
{'$gte': {'n_atoms': 3}},
{'$lte': {'dft.workflow.section_relaxation.final_energy_difference': 1e-24}}
]
},
required={
'section_run[0]': {
'section_single_configuration_calculation[-2]': {
'energy_total': '*'
},
'section_system[-2]': '*'
'section_workflow': {
'section_relaxation': {
'final_calculation_ref': {
'energy_total': '*',
'single_configuration_calculation_to_system_ref': {
'chemical_composition_reduced': '*'
}
}
}
}
},
parallel=5,
per_page=20,
max=1000)
parallel=10,
per_page=1000,
max=10000)
print(query)
for i, result in enumerate(query):
if i < 10:
calc = result.section_workflow.section_relaxation.final_calculation_ref
energy = calc.energy_total
formula = calc.single_configuration_calculation_to_system_ref.chemical_composition_reduced
print('%s: energy %s' % (formula, energy.to(units.hartree)))
for result in query[0:100]:
run = result.section_run[0]
energy = run.section_single_configuration_calculation[0].energy_total
formula = run.section_system[0].chemical_composition_reduced
print('%s: energy %s' % (formula, energy.to(units.hartree)))
print(query)
@@ -26,7 +26,9 @@ import orjson
import urllib.parse
from nomad.files import UploadFiles, Restricted
from nomad.archive import query_archive, ArchiveQueryError
from nomad.archive import (
query_archive, ArchiveQueryError, filter_archive, read_partial_archives_from_mongo,
compute_required_with_referenced)
from nomad import search, config
from nomad.app import common
@@ -215,7 +217,8 @@ class ArchiveDownloadResource(Resource):
_archive_query_model = api.inherit('ArchiveSearch', search_model, {
'required': fields.Raw(description='A dictionary that defines what archive data to retrieve.'),
'query_schema': fields.Raw(description='Deprecated, use required instead.'),
'raise_errors': fields.Boolean(description='Return 404 on missing archives or 500 on other errors instead of skipping the entry.')
'raise_errors': fields.Boolean(
description='Return 404 on missing archives or 500 on other errors instead of skipping the entry.')
})
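As a rough, editorial illustration (not part of this commit), a POST body for this model could look as follows; the 'required' spec is borrowed from the example script above, search criteria inherited from search_model are omitted, and the endpoint URL/path are assumptions:

    import requests

    body = {
        # which archive data to return, same format as in the ArchiveQuery example above
        'required': {
            'section_workflow': {
                'section_relaxation': {
                    'final_calculation_ref': {'energy_total': '*'}}}},
        # fail with 404/500 instead of silently skipping entries with missing archives
        'raise_errors': True}
    # URL and path are assumptions, they are not part of this diff
    response = requests.post('http://nomad-lab.eu/prod/rae/api/archive/query', json=body)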
@@ -277,7 +280,8 @@ class ArchiveQueryResource(Resource):
apply_search_parameters(search_request, query)
if not aggregation:
search_request.include('calc_id', 'upload_id', 'with_embargo', 'published', 'parser_name')
search_request.include(
'calc_id', 'upload_id', 'with_embargo', 'published', 'parser_name')
if query_expression:
search_request.query_expression(query_expression)
@@ -299,6 +303,9 @@ class ArchiveQueryResource(Resource):
calcs = results['results']
upload_files = None
current_upload_id = None
required_with_references = compute_required_with_referenced(required)
archive_is_complete = required_with_references is not None
for entry in calcs:
with_embargo = entry['with_embargo']
@@ -309,13 +316,42 @@ class ArchiveQueryResource(Resource):
if upload_files is not None:
upload_files.close()
upload_files = UploadFiles.get(upload_id, create_authorization_predicate(upload_id))
upload_files = UploadFiles.get(
upload_id, create_authorization_predicate(upload_id))
if upload_files is None:
return []
if archive_is_complete:
upload_calc_ids = [
calc['calc_id'] for calc in calcs if calc['upload_id'] == upload_id]
upload_partial_archives = read_partial_archives_from_mongo(
upload_calc_ids, as_dict=True)
current_upload_id = upload_id
# TODO currently we either just use the partial archive from mongo or read the whole
# required data from the msg-pack archive on top.
# Ideally, we would only read what is left from the msg-pack archive and merge.
if archive_is_complete:
try:
partial_archive = upload_partial_archives[calc_id]
partial_archive = filter_archive(
required_with_references, partial_archive, transform=lambda e: e)
data.append({
'calc_id': calc_id,
'parser_name': entry['parser_name'],
'archive': partial_archive})
continue
except KeyError:
pass
except ArchiveQueryError as e:
abort(400, str(e))
except Exception as e:
common.logger.error(
str(e), upload_id=upload_id, calc_id=calc_id, exc_info=e)
if with_embargo:
access = 'restricted'
upload_files._is_authorized = create_authorization_predicate(
@@ -328,8 +364,7 @@ class ArchiveQueryResource(Resource):
data.append({
'calc_id': calc_id,
'parser_name': entry['parser_name'],
'archive': query_archive(
archive, {calc_id: required})[calc_id]
'archive': query_archive(archive, {calc_id: required})[calc_id]
})
except ArchiveQueryError as e:
abort(400, str(e))
@@ -340,11 +375,13 @@ class ArchiveQueryResource(Resource):
pass
except Restricted:
# this should not happen
common.logger.error('supposedly unreachable code', upload_id=upload_id, calc_id=calc_id)
common.logger.error(
'supposedly unreachable code', upload_id=upload_id, calc_id=calc_id)
except Exception as e:
if raise_errors:
raise e
common.logger.error(str(e), upload_id=upload_id, calc_id=calc_id, exc_info=e)
common.logger.error(
str(e), upload_id=upload_id, calc_id=calc_id, exc_info=e)
if upload_files is not None:
upload_files.close()
@@ -23,6 +23,7 @@ from flask import stream_with_context, Response, g, abort
from urllib.parse import urlencode
import pprint
import io
import json
import sys
import os.path
@@ -176,6 +177,9 @@ def add_search_parameters(request_parser):
request_parser.add_argument(
'dft.optimade', type=str,
help='A search query in the optimade filter language.')
request_parser.add_argument(
'query', type=str,
help='A JSON serialized structured search query (as used in POST requests).')
# main search parameters
for qualified_name, quantity in search.search_quantities.items():
@@ -227,7 +231,20 @@ def apply_search_parameters(search_request: search.SearchRequest, args: Dict[str
optimade, nomad_properties=domain, without_prefix=True)
search_request.query(q)
except filterparser.FilterException as e:
abort(400, 'Could not parse optimade query: %s' % (str(e)))
abort(400, 'Could not parse optimade query: %s' % str(e))
# search expression
query_str = args.get('query', None)
if query_str is not None:
try:
query = json.loads(query_str)
except Exception as e:
abort(400, 'Could not JSON parse query expression: %s' % str(e))
try:
search_request.query_expression(query)
except Exception as e:
abort(400, 'Invalid query expression: %s' % str(e))
# search parameter
search_request.search_parameters(**{
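A small client-side sketch (editorial, not part of this commit) of the new 'query' GET parameter; the operator structure mirrors the ArchiveQuery example above, and the endpoint path is an assumption:

    import json
    import urllib.parse

    structured_query = {'$and': [
        {'dft.code_name': 'VASP'},
        {'$gte': {'n_atoms': 3}}]}

    # the server json.loads this value and passes it to search_request.query_expression
    params = urllib.parse.urlencode({'query': json.dumps(structured_query)})
    url = 'http://nomad-lab.eu/prod/rae/api/repo/?%s' % params  # path is an assumption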
@@ -359,7 +359,7 @@ class RepoCalcsResource(Resource):
group_metrics = [
group_quantity.metric_name
for group_name, group_quantity in search_extension.groups.items()
if data_in.get(group_name, False)]
if group_name in data_in]
total_metrics = metrics + group_metrics
if len(total_metrics) > 0:
search_request.totals(metrics_to_use=total_metrics)
@@ -375,13 +375,13 @@ class RepoCalcsResource(Resource):
else:
for group_name, group_quantity in search_extension.groups.items():
if data_in.get(group_name, False):
if group_name in data_in:
kwargs: Dict[str, Any] = {}
if group_name == 'uploads_grouped':
kwargs.update(order_by='upload_time', order='desc')
search_request.quantity(
group_quantity.qualified_name, size=per_page, examples=1,
after=data_in.get('%s_after' % group_name, None),
after=data_in[group_name].get('after', None),
**kwargs)
results = search_request.execute_paginated(
@@ -397,7 +397,7 @@ class RepoCalcsResource(Resource):
quantities = results.pop('quantities')
for group_name, group_quantity in search_extension.groups.items():
if data_in.get(group_name, False):
if group_name in data_in:
results[group_name] = quantities[group_quantity.qualified_name]
# build python code/curl snippet
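To illustrate the change above (an editorial sketch, not part of this commit): grouped results are now requested by including the group name as a key in the request body, with the pagination cursor nested under 'after' instead of a flat '<group>_after' field:

    # before: data_in = {'uploads_grouped': True, 'uploads_grouped_after': '<last-bucket-key>'}
    # after this change ('<last-bucket-key>' is a placeholder):
    data_in = {'uploads_grouped': {'after': '<last-bucket-key>'}}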
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Iterable, Any, Tuple, Dict, BinaryIO, Union, List, cast
from typing import Iterable, Any, Tuple, Dict, Callable, BinaryIO, Union, List, cast
from io import BytesIO, BufferedReader
from collections.abc import Mapping, Sequence
import msgpack
@@ -22,8 +22,27 @@ import json
import math
import re
from nomad import utils
from nomad import utils, config, infrastructure
from nomad.metainfo import Quantity, Reference, Definition, MSection, Section, SubSection
from nomad.datamodel import EntryArchive
from nomad.datamodel.metainfo.public import fast_access
'''
The archive storage consists of two tiers. First, the whole archive is stored in
files; second, parts of the archive are stored in mongodb documents.

The file storage uses msg-pack files. Each file contains the archives of many
entries (all entries of an upload). These msg-pack files contain the JSON serialized
version of the metainfo archive (see module:`nomad.metainfo`). In addition, the
msg-pack files contain TOC information for quicker access to individual sections. See
:func:`write_archive` and :func:`read_archive`. There is also query functionality to
partially read specified sections from an archive: :func:`query_archive`.

The mongo storage uses mongodb's native bson to store JSON serialized metainfo archive
data. Each document in mongodb holds the partial archive of a single entry. Which parts
of an archive are stored in mongo is determined by the metainfo and
section annotations/categories.
'''
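A brief usage sketch of the two tiers described above (editorial, not part of the file); the file name and entry id are placeholders:

    from nomad.archive import read_archive, read_partial_archives_from_mongo

    # file tier: lazily read sections from an upload's msg-pack archive file
    with read_archive('archive.msg') as archive:
        section_run = archive['<calc-id>']['section_run']

    # mongo tier: read the partial archives of selected entries as plain dicts
    # (requires a configured mongo connection, see nomad.infrastructure)
    partials = read_partial_archives_from_mongo(['<calc-id>'], as_dict=True)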
__packer = msgpack.Packer(autoreset=True, use_bin_type=True)
@@ -389,7 +408,7 @@ class ArchiveReader(ArchiveObject):
raise KeyError(key)
positions = self._toc.get(key)
# TODO use hash algo instead of binary search
# TODO use hash algorithm instead of binary search
if positions is None:
r_start = 0
r_end = self._n_toc
@@ -426,14 +445,14 @@ class ArchiveReader(ArchiveObject):
def __iter__(self):
if self.toc_entry is None:
# is not necessarely read when using blocked toc
# is not necessarily read when using blocked toc
self.toc_entry = self._read(self.toc_position)
return self.toc_entry.__iter__()
def __len__(self):
if self.toc_entry is None:
# is not necessarely read when using blocked toc
# is not necessarily read when using blocked toc
self.toc_entry = self._read(self.toc_position)
return self.toc_entry.__len__()
@@ -499,7 +518,7 @@ def write_archive(
The top-level TOC positions are 2*5byte encoded integers. This will give the top-level TOC a
predictable layout and will allow partially reading this TOC.
The TOC of each entry will have the same structure than the data upto a certain
The TOC of each entry will have the same structure as the data up to a certain
TOC depth. A TOC object will hold the position of the object it refers to (key 'pos')
and further deeper TOC data (key 'toc'). Only data objects (dict instances) will
have TOC objects and only objects count towards the TOC depth. Positions in the entry
@@ -528,7 +547,43 @@ def read_archive(file_or_path: str, **kwargs) -> ArchiveReader:
Returns:
A mapping (dict-like) that can be used to access the archive data. The mapping
will lazyly load data as it is used. The mapping needs to be closed or used within
will lazily load data as it is used. The mapping needs to be closed or used within
a 'with' statement to free the underlying file resource after use.
'''
__query_archive_key_pattern = re.compile(r'^([\s\w\-]+)(\[([-?0-9]*)(:([-?0-9]*))?\])?$')
def query_archive(f_or_archive_reader: Union[str, ArchiveReader, BytesIO], query_dict: dict, **kwargs):
def query_archive(
f_or_archive_reader: Union[str, ArchiveReader, BytesIO], query_dict: dict,
**kwargs) -> Dict:
'''
Takes an open msg-pack based archive (either as str, reader, or BytesIO) and returns
the archive as a JSON serializable dictionary, filtered based on the given required
specification.
'''
def _to_son(data):
if isinstance(data, (ArchiveList, List)):
data = [_to_son(item) for item in data]
elif isinstance(data, ArchiveObject):
data = data.to_dict()
return data
def _load_data(query_dict: Dict[str, Any], archive_item: ArchiveObject) -> Dict:
query_dict_with_fixed_ids = {
adjust_uuid_size(key): value for key, value in query_dict.items()}
return filter_archive(query_dict_with_fixed_ids, archive_item, transform=_to_son)
if isinstance(f_or_archive_reader, ArchiveReader):
return _load_data(query_dict, f_or_archive_reader)
elif isinstance(f_or_archive_reader, (BytesIO, str)):
with ArchiveReader(f_or_archive_reader, **kwargs) as archive:
return _load_data(query_dict, archive)
else:
raise TypeError('%s is neither a file-like nor ArchiveReader' % f_or_archive_reader)
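For reference, an editorial sketch of the required dict that query_archive (and filter_archive) accept; keys may carry an index or slice matching __query_archive_key_pattern above, '*' requests everything below a section, and the section names are taken from the example script:

    required = {
        'section_run[0]': {
            # negative indices and slices are supported by the key pattern above
            'section_single_configuration_calculation[-2]': {'energy_total': '*'},
            'section_system[0:2]': '*'}}
    # 'archive.msg' and '<calc-id>' are placeholders
    data = query_archive('archive.msg', {'<calc-id>': required})['<calc-id>']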
def filter_archive(
required: Dict[str, Any], archive_item: Union[Dict, ArchiveObject],
transform: Callable) -> Dict:
def _fix_index(index, length):
if index is None:
return index
@@ -547,96 +602,297 @@ def query_archive(f_or_archive_reader: Union[str, ArchiveReader, BytesIO], query
else:
return min(length, index)
def _to_son(data):
if isinstance(data, (ArchiveList, List)):
data = [_to_son(item) for item in data]
if archive_item is None:
return None
elif isinstance(data, ArchiveObject):
data = data.to_dict()
if not isinstance(required, dict):
return transform(archive_item)
return data
result: Dict[str, Any] = {}
for key, val in required.items():
key = key.strip()
# process array indices
match = __query_archive_key_pattern.match(key)
index: Union[Tuple[int, int], int] = None
if match:
key = match.group(1)
def _load_data(query_dict: Dict[str, Any], archive_item: ArchiveObject, main_section: bool = False):
if not isinstance(query_dict, dict):
return _to_son(archive_item)
# check if we have indices
if match.group(2) is not None:
first_index, last_index = None, None
group = match.group(3)
first_index = None if group == '' else int(group)
result = {}
for key, val in query_dict.items():
key = key.strip()
if match.group(4) is not None:
group = match.group(5)
last_index = None if group == '' else int(group)
index = (0 if first_index is None else first_index, last_index)
# process array indices
match = __query_archive_key_pattern.match(key)
index: Union[Tuple[int, int], int] = None
if match:
key = match.group(1)
else:
index = first_index # one item
else:
index = None
# check if we have indices
if match.group(2) is not None:
first_index, last_index = None, None
group = match.group(3)
first_index = None if group == '' else int(group)
else:
raise ArchiveQueryError('invalid key format: %s' % key)
if match.group(4) is not None:
group = match.group(5)
last_index = None if group == '' else int(group)
index = (0 if first_index is None else first_index, last_index)
try:
archive_child = archive_item[key]
is_list = isinstance(archive_child, (ArchiveList, list))
else:
index = first_index # one item
if index is None and is_list:
index = (0, None)
else:
index = None
elif index is not None and not is_list:
raise ArchiveQueryError('cannot use list key on non-list %s' % key)
if index is None:
pass
else:
raise ArchiveQueryError('invalid key format: %s' % key)
length = len(archive_child)
if isinstance(index, tuple):
index = (_fix_index(index[0], length), _fix_index(index[1], length))
if index[0] == index[1]:
archive_child = [archive_child[index[0]]]
else:
archive_child = archive_child[index[0]: index[1]]
else:
archive_child = [archive_child[_fix_index(index, length)]]
# support for shorter uuids
if main_section:
archive_key = adjust_uuid_size(key)
if isinstance(archive_child, (ArchiveList, list)):
result[key] = [
filter_archive(val, item, transform=transform)
for item in archive_child]
else:
archive_key = key
result[key] = filter_archive(val, archive_child, transform=transform)
try:
archive_child = archive_item[archive_key]
is_list = isinstance(archive_child, (ArchiveList, list))
except (KeyError, IndexError):
continue
if index is None and is_list:
index = (0, None)
elif index is not None and not is_list:
raise ArchiveQueryError('cannot use list key on non-list %s' % key)
return result
if index is None:
pass
else:
length = len(archive_child)
if isinstance(index, tuple):
index = (_fix_index(index[0], length), _fix_index(index[1], length))
if index[0] == index[1]:
archive_child = [archive_child[index[0]]]
else:
archive_child = archive_child[index[0]: index[1]]
else:
archive_child = [archive_child[_fix_index(index, length)]]
if isinstance(archive_child, (ArchiveList, list)):
result[key] = [_load_data(val, item) for item in archive_child]
else:
result[key] = _load_data(val, archive_child)
def create_partial_archive(archive: EntryArchive) -> Dict:
'''
Creates a partial archive as a JSON serializable dict that can be stored directly.
The given archive is filtered based on the metainfo category ``fast_access``.
Selected sections and other data that they reference (recursively) comprise the
resulting partial archive.
TODO at the moment this is hard coded and NOT informed by the metainfo. We simply
add sections EntryMetadata and Workflow.
Arguments:
archive: The archive as an :class:`EntryArchive` instance.
except (KeyError, IndexError):
continue
Returns: the partial archive in JSON serializable dictionary form.
'''
# A list with all referenced sections that might not yet have been ensured to be in the
# resulting partial archive
referenceds: List[MSection] = []
# contents keeps track of all sections in the partial archive by keeping their
# JSON serializable form and placeholder status in a dict
contents: Dict[MSection, Tuple[dict, bool]] = dict()
def partial(definition: Definition, section: MSection) -> bool:
'''
``m_to_dict`` partial function that selects what goes into the partial archive
and what not. It also collects references as a side-effect.
'''
if section.m_def == EntryArchive.m_def:
if definition.m_def == Quantity:
return True
return fast_access.m_def in definition.categories
if isinstance(definition, Quantity) and isinstance(definition.type, Reference) \
and fast_access.m_def in definition.categories:
# Reference lists in partial archives are not supported
if definition.is_scalar:
referenced = getattr(section, definition.name)
if referenced is not None and referenced not in contents:
referenceds.append(referenced)
return True
# add the main content
partial_contents = archive.m_to_dict(partial=partial)
# add the referenced data
def add(section, placeholder=False) -> dict:
'''
Adds the given section to partial_contents at the right place. If not a placeholder,
the section's serialization is added (or replaces an existing placeholder).
Otherwise, an empty dict is added as a placeholder for referenced children.
'''
result: Dict[str, Any] = None
content, content_is_placeholder = contents.get(section, (None, True))
if content is not None:
if content_is_placeholder and not placeholder:
# the placeholder gets replaced later
pass
else:
return content
if section.m_parent is None:
contents[section] = partial_contents, False
return partial_contents
parent_dict = add(section.m_parent, placeholder=True)
if placeholder:
result = {}
else:
result = section.m_to_dict(partial=partial)
sub_section = section.m_parent_sub_section
if sub_section.repeats:
sections = parent_dict.setdefault(sub_section.name, [])
while len(sections) < section.m_parent_index + 1:
sections.append(None)
sections[section.m_parent_index] = result
else:
parent_dict[sub_section.name] = result
contents[section] = result, placeholder
return result
if isinstance(f_or_archive_reader, ArchiveReader):
return _load_data(query_dict, f_or_archive_reader, True)
# We keep adding referenced objects as long as the serialization of already referenced
# sections produces new references; this implements the recursive nature of references
# within referenced sections.
while len(referenceds) > 0: