Commit ae1d14dc authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Refactored the search API and adapted the GUI accordingly.

parent 7331d5e1
Pipeline #52253 failed with stages
in 6 minutes and 35 seconds
......@@ -7,7 +7,7 @@ import QuantityHistogram from '../search/QuantityHistogram'
class DFTSearchAggregations extends React.Component {
static propTypes = {
classes: PropTypes.object.isRequired,
aggregations: PropTypes.object.isRequired,
quantities: PropTypes.object.isRequired,
metric: PropTypes.string.isRequired,
searchValues: PropTypes.object.isRequired,
onChange: PropTypes.func.isRequired
......@@ -43,11 +43,11 @@ class DFTSearchAggregations extends React.Component {
}
render() {
const { classes, aggregations, metric, searchValues } = this.props
const { classes, quantities, metric, searchValues } = this.props
const quantity = (key, title) => (<QuantityHistogram
classes={{root: classes.quantity}} title={title || key} width={300}
data={aggregations[key]} metric={metric}
data={quantities[key]} metric={metric}
value={searchValues[key]}
onChanged={(selection) => this.handleQuantityChanged(key, selection)}/>)
......@@ -56,7 +56,7 @@ class DFTSearchAggregations extends React.Component {
<Card>
<CardContent>
<PeriodicTable
aggregations={aggregations.atoms} metric={metric}
aggregations={quantities.atoms} metric={metric}
values={searchValues.atoms || []}
onChanged={(selection) => this.handleAtomsChanged(selection)}
/>
......
......@@ -7,7 +7,7 @@ import QuantityHistogram from '../search/QuantityHistogram'
class EMSSearchAggregations extends React.Component {
static propTypes = {
classes: PropTypes.object.isRequired,
aggregations: PropTypes.object.isRequired,
quantities: PropTypes.object.isRequired,
metric: PropTypes.string.isRequired,
searchValues: PropTypes.object.isRequired,
onChange: PropTypes.func.isRequired
......@@ -43,11 +43,11 @@ class EMSSearchAggregations extends React.Component {
}
render() {
const { classes, aggregations, metric, searchValues } = this.props
const { classes, quantities, metric, searchValues } = this.props
const quantity = (key, title) => (<QuantityHistogram
classes={{root: classes.quantity}} title={title || key} width={300}
data={aggregations[key]} metric={metric}
data={quantities[key]} metric={metric}
value={searchValues[key]}
onChanged={(selection) => this.handleQuantityChanged(key, selection)}/>)
......@@ -56,7 +56,7 @@ class EMSSearchAggregations extends React.Component {
<Card>
<CardContent>
<PeriodicTable
aggregations={aggregations.atoms} metric={metric}
aggregations={quantities.atoms} metric={metric}
values={searchValues.atoms || []}
onChanged={(selection) => this.handleAtomsChanged(selection)}
/>
......
......@@ -9,8 +9,7 @@ class SearchAggregationsUnstyled extends React.Component {
classes: PropTypes.object.isRequired,
onChange: PropTypes.func.isRequired,
data: PropTypes.object.isRequired,
total_metrics: PropTypes.arrayOf(PropTypes.string).isRequired,
aggregation_metrics: PropTypes.arrayOf(PropTypes.string).isRequired,
metrics: PropTypes.arrayOf(PropTypes.string).isRequired,
searchValues: PropTypes.object.isRequired,
domain: PropTypes.object.isRequired,
showDetails: PropTypes.bool
......@@ -28,7 +27,7 @@ class SearchAggregationsUnstyled extends React.Component {
handleMetricChange(metric) {
const metrics = metric === 'code_runs' ? [] : [metric]
this.setState({metric: metric})
this.props.onChange({total_metrics: metrics, aggregation_metrics: metrics})
this.props.onChange({metrics: metrics})
}
handleSearchChanged(searchValues) {
......@@ -36,10 +35,10 @@ class SearchAggregationsUnstyled extends React.Component {
}
render() {
const { classes, data, total_metrics, searchValues, domain, onChange, showDetails } = this.props
const { aggregations, metrics } = data
const selectedMetric = total_metrics.length === 0 ? 'code_runs' : total_metrics[0]
const useMetric = Object.keys(metrics).find(metric => metric !== 'code_runs') || 'code_runs'
const { classes, data, metrics, searchValues, domain, onChange, showDetails } = this.props
const { quantities } = data
const selectedMetric = metrics.length === 0 ? 'code_runs' : metrics[0]
const useMetric = Object.keys(quantities.total.all).find(metric => metric !== 'code_runs') || 'code_runs'
const metricsDefinitions = domain.searchMetrics
return (
......@@ -60,7 +59,7 @@ class SearchAggregationsUnstyled extends React.Component {
))}
</FormGroup>
</FormControl>
<domain.SearchAggregations aggregations={aggregations} searchValues={searchValues} metric={useMetric} onChange={onChange} />
<domain.SearchAggregations quantities={quantities} searchValues={searchValues} metric={useMetric} onChange={onChange} />
</div>
</div>
)
......@@ -70,8 +69,7 @@ class SearchAggregationsUnstyled extends React.Component {
const SearchAggregations = compose(withDomain, withStyles(SearchAggregationsUnstyled.styles))(SearchAggregationsUnstyled)
Object.assign(SearchAggregations, {
defaultState: {
aggregation_metrics: [],
total_metrics: [],
metrics: [],
searchValues: {}
}
})
......
......@@ -124,15 +124,15 @@ class SearchBar extends React.Component {
getSuggestions(value) {
value = value.toLowerCase()
const { data: { aggregations } } = this.props
const { data: { quantities } } = this.props
const suggestions = []
Object.keys(aggregations).forEach(aggKey => {
Object.keys(aggregations[aggKey]).forEach(aggValue => {
if (aggValue.toLowerCase().startsWith(value)) {
Object.keys(quantities).forEach(quantity => {
Object.keys(quantities[quantity]).forEach(quantityValue => {
if (quantityValue.toLowerCase().startsWith(value)) {
suggestions.push({
key: aggKey,
value: aggValue
key: quantity,
value: quantityValue
})
}
})
......
......@@ -64,8 +64,12 @@ class SearchPage extends React.Component {
pagination: {
total: 0
},
aggregations: {},
metrics: {}
quantities: {
total: {
all: {
}
}
}
}
state = {
......@@ -161,7 +165,7 @@ class SearchPage extends React.Component {
const { classes, user, domain, loading } = this.props
const { data, searchState, searchResultListState, showDetails } = this.state
const { searchValues } = searchState
const { pagination: { total }, metrics } = data
const { pagination: { total }, quantities } = data
const ownerLabel = {
all: 'All entries',
......@@ -179,11 +183,11 @@ class SearchPage extends React.Component {
const withoutLogin = ['all']
const useMetric = Object.keys(metrics).find(metric => metric !== 'code_runs') || 'code_runs'
const useMetric = Object.keys(quantities.total.all).find(metric => metric !== 'code_runs') || 'code_runs'
const helperText = <span>
There are {Object.keys(domain.searchMetrics).map(key => {
return (key === useMetric || key === 'code_runs') ? <span key={key}>
{domain.searchMetrics[key].renderResultString(!loading && metrics[key] !== undefined ? metrics[key] : '...')}
{domain.searchMetrics[key].renderResultString(!loading && quantities.total.all[key] !== undefined ? quantities.total.all[key] : '...')}
</span> : ''
})}{Object.keys(searchValues).length ? ' left' : ''}.
</span>
......
......@@ -70,17 +70,18 @@ class RepoCalcResource(Resource):
repo_calcs_model = api.model('RepoCalculations', {
'pagination': fields.Nested(pagination_model),
'pagination': fields.Nested(pagination_model, allow_null=True),
'scroll': fields.Nested(allow_null=True, skip_none=True, model=api.model('Scroll', {
'total': fields.Integer(description='The total amount of hits for the search.'),
'scroll_id': fields.String(allow_null=True, description='The scroll_id that can be used to retrieve the next page.'),
'size': fields.Integer(help='The size of the returned scroll page.')})),
'results': fields.List(fields.Raw, description=(
'A list of search results. Each result is a dict with quantitie names as key and '
'values as values')),
'scroll_id': fields.String(description='Id of the current scroll view in scroll based search.'),
'aggregations': fields.Raw(description=(
'quantities': fields.Raw(description=(
'A dict with all aggregations. Each aggregation is dictionary with a metrics dict as '
'value and quantity value as key. The metrics are code runs(calcs), %s. ' %
', '.join(search.metrics_names))),
'metrics': fields.Raw(description=(
'A dict with the overall metrics. The metrics are code runs(calcs), %s.' %
'value and quantity value as key. The metrics are code runs(calcs), %s. '
'There is a pseudo quantity "total" with a single value "all" that contains the metrics over all results. ' %
', '.join(search.metrics_names)))
})
......@@ -99,12 +100,8 @@ repo_request_parser.add_argument(
repo_request_parser.add_argument(
'scroll_id', type=str, help='The id of the current scrolling window to use.')
repo_request_parser.add_argument(
'total_metrics', type=str, help=(
'Metrics to aggregate all search results over.'
'Possible values are %s.' % ', '.join(search.metrics_names)))
repo_request_parser.add_argument(
'aggregation_metrics', type=str, help=(
'Metrics to aggregate all aggregation buckets over as comma separated list. '
'metrics', type=str, help=(
'Metrics to aggregate over all quantities and their values as comma separated list. '
'Possible values are %s.' % ', '.join(search.metrics_names)))
for search_quantity in search.search_quantities.keys():
......@@ -157,19 +154,15 @@ class RepoCalcsResource(Resource):
page = int(request.args.get('page', 1))
per_page = int(request.args.get('per_page', 10 if not scroll else 1000))
order = int(request.args.get('order', -1))
total_metrics_str = request.args.get('total_metrics', '')
aggregation_metrics_str = request.args.get('aggregation_metrics', '')
metrics_str = request.args.get('metrics', '')
from_time = rfc3339DateTime.parse(request.args.get('from_time', '2000-01-01'))
until_time_str = request.args.get('until_time', None)
until_time = rfc3339DateTime.parse(until_time_str) if until_time_str is not None else datetime.datetime.now()
time_range = (from_time, until_time)
total_metrics = [
metric for metric in total_metrics_str.split(',')
if metric in search.metrics_names]
aggregation_metrics = [
metric for metric in aggregation_metrics_str.split(',')
metrics = [
metric for metric in metrics_str.split(',')
if metric in search.metrics_names]
except Exception:
abort(400, message='bad parameter types')
......@@ -218,47 +211,35 @@ class RepoCalcsResource(Resource):
without_currupted_mainfile = ~Q('term', code_name='currupted mainfile') # pylint: disable=invalid-unary-operand-type
q = q & without_currupted_mainfile if q is not None else without_currupted_mainfile
data = dict(**request.args)
data.pop('owner', None)
data.pop('scroll', None)
data.pop('scroll_id', None)
data.pop('per_page', None)
data.pop('page', None)
data.pop('order', None)
data.pop('order_by', None)
data.pop('total_metrics', None)
data.pop('aggregation_metrics', None)
data.pop('from_time', None)
data.pop('until_time', None)
if scroll:
data.update(scroll_id=scroll_id, size=per_page)
else:
data.update(
per_page=per_page, page=page, order=order, order_by=order_by, time_range=time_range,
total_metrics=total_metrics, aggregation_metrics=aggregation_metrics)
search_parameters = dict(**request.args)
search_parameters.pop('owner', None)
search_parameters.pop('scroll', None)
search_parameters.pop('scroll_id', None)
search_parameters.pop('per_page', None)
search_parameters.pop('page', None)
search_parameters.pop('order', None)
search_parameters.pop('order_by', None)
search_parameters.pop('metrics', None)
search_parameters.pop('from_time', None)
search_parameters.pop('until_time', None)
try:
if scroll:
page = -1
scroll_id, total, results = search.scroll_search(q=q, **data)
aggregations = {}
metrics = {}
results = search.scroll_search(
q=q, scroll_id=scroll_id, size=per_page, search_parameters=search_parameters)
else:
scroll_id = None
total, results, aggregations, metrics = search.aggregate_search(q=q, **data)
results = search.metrics_search(
q=q, per_page=per_page, page=page, order=order, order_by=order_by,
time_range=time_range, metrics_to_use=metrics, search_parameters=search_parameters)
# TODO just a work around to make things prettier
quantities = results['quantities']
if 'code_name' in quantities and 'currupted mainfile' in quantities['code_name']:
del(quantities['code_name']['currupted mainfile'])
return results, 200
except search.ScrollIdNotFound:
abort(400, 'The given scroll_id does not exist.')
except KeyError as e:
abort(400, str(e))
# TODO just a workaround to make things prettier
if 'code_name' in aggregations and 'currupted mainfile' in aggregations['code_name']:
del(aggregations['code_name']['currupted mainfile'])
return dict(
pagination=dict(total=total, page=page, per_page=per_page),
results=results,
scroll_id=scroll_id,
aggregations=aggregations,
metrics=metrics), 200
......@@ -1400,7 +1400,7 @@ class NomadCOEMigration:
scroll_args['scroll_id'] = scroll_id
search = self.call_api('repo.search', upload_id=upload_id, owner='admin', **scroll_args)
scroll_id = search.scroll_id
scroll_id = search.scroll.scroll_id
for calc in search.results:
yield calc
......
......@@ -16,7 +16,7 @@
This module represents calculations in elastic search.
"""
from typing import Iterable, Dict, Tuple, List
from typing import Iterable, Dict, Tuple, List, Any
from elasticsearch_dsl import Document, InnerDoc, Keyword, Text, Date, \
Object, Boolean, Search, Q, A, analyzer, tokenizer
from elasticsearch_dsl.document import IndexMeta
......@@ -195,7 +195,8 @@ search_quantities = {
'upload_id': ('term', 'upload_id', 'Search for the upload_id.'),
'calc_id': ('term', 'calc_id', 'Search for the calc_id.'),
'pid': ('term', 'pid', 'Search for the pid.'),
'mainfile': ('term', 'mainfile', 'Search for the mainfile.')
'mainfile': ('term', 'mainfile', 'Search for the mainfile.'),
'datasets': ('term', 'datasets.name', 'Search for a particular dataset by name.')
}
"""
The available search quantities in :func:`aggregate_search` as tuples with *search type*,
......@@ -229,7 +230,10 @@ for quantity in datamodel.Domain.instance.quantities.values():
order_default_quantity = quantity.name
def _construct_search(q: Q = None, time_range: Tuple[datetime, datetime] = None, **kwargs) -> Search:
def _construct_search(
q: Q = None, time_range: Tuple[datetime, datetime] = None,
search_parameters: Dict[str, Any] = {}, **kwargs) -> Search:
search = Search(index=config.elastic.index_name)
if q is not None:
......@@ -238,10 +242,13 @@ def _construct_search(q: Q = None, time_range: Tuple[datetime, datetime] = None,
if time_range is not None:
search = search.query('range', upload_time=dict(gte=time_range[0], lte=time_range[1]))
for key, value in kwargs.items():
for key, value in search_parameters.items():
query_type, field, _ = search_quantities.get(key, (None, None, None))
if query_type is None:
raise KeyError('Unknown quantity %s' % key)
if key in ['page', 'per_page', 'order', 'order_by']:
continue
else:
raise KeyError('Unknown quantity %s' % key)
if isinstance(value, list):
values = value
......@@ -263,40 +270,71 @@ def _construct_search(q: Q = None, time_range: Tuple[datetime, datetime] = None,
return search
def _execute_paginated_search(
        search: Search,
        page: int = 1, per_page: int = 10,
        order_by: str = order_default_quantity, order: int = -1,
        **kwargs) -> Tuple[Any, Dict[str, Any]]:
    """
    Sorts the given search, executes a single page of it, and packages the hits.

    Arguments:
        search: The search object to sort, slice, and execute.
        page: The page to return, starting with page 1.
        per_page: The number of hits per page.
        order_by: The name of the quantity to sort by. Must be a key of
            ``search_quantities``.
        order: ``1`` sorts ascending; any other value sorts descending.
        **kwargs: Accepted and ignored, so that callers can forward a shared
            set of keyword arguments.

    Returns:
        A tuple of the raw response and a dict with the keys 'pagination'
        (a dict with 'page', 'per_page', 'total') and 'results' (the hits as dicts).

    Raises:
        KeyError: If ``order_by`` is not a known search quantity.
    """
    if order_by not in search_quantities:
        raise KeyError('Unknown order quantity %s' % order_by)

    # a leading '-' requests descending order
    sort_key = order_by if order == 1 else '-%s' % order_by
    first_hit = (page - 1) * per_page
    last_hit = page * per_page

    response = search.sort(sort_key)[first_hit:last_hit].execute()  # pylint: disable=E1101

    pagination = dict(page=page, per_page=per_page, total=response.hits.total)
    entries = [hit.to_dict() for hit in response.hits]

    return response, dict(pagination=pagination, results=entries)
def scroll_search(
scroll_id: str = None, size: int = 1000, scroll: str = u'5m',
q: Q = None, **kwargs) -> Tuple[str, int, List[dict]]:
q: Q = None, search_parameters: Dict[str, Any] = {}) -> Dict[str, Any]:
"""
Alternative search based on ES scroll API. Can be used similar to
:func:`aggregate_search`, but pagination is replaced with scrolling, no ordering,
and no aggregation information is given.
no property, and no metrics information is available.
The search is limited to parameters :param:`q` and :param:`search_parameters`,
which work exactly as in :func:`entry_search`.
Scrolling is done by calling this function again and again with the same ``scroll_id``.
Each time, this function will return the next batch of search results. If the
``scroll_id`` is not available anymore, a new ``scroll_id`` is assigned and scrolling
starts from the beginning again.
See :func:`aggregate_search` for additional ``kwargs``
Arguments:
scroll_id: The scroll id to receive the next batch from. None will create a new
scroll.
size: The batch size in number of hits.
scroll: The time the scroll should be kept alive (i.e. the time between requests
to this method) in ES time units. Default is 5 minutes.
Returns: A tuple with ``scroll_id``, total amount of hits, and result list.
Returns:
A dict with keys 'scroll' and 'results'. The key 'scroll' holds a dict with
'total', 'scroll_id', 'size'.
"""
es = infrastructure.elastic_client
if scroll_id is None:
# initiate scroll
search = _construct_search(q, **kwargs)
search = _construct_search(q, search_parameters=search_parameters)
resp = es.search(body=search.to_dict(), scroll=scroll, size=size, index=config.elastic.index_name) # pylint: disable=E1123
scroll_id = resp.get('_scroll_id')
if scroll_id is None:
# no results for search query
return None, 0, []
return dict(scroll=dict(total=0, size=size), results=[])
else:
try:
resp = es.scroll(scroll_id, scroll=scroll) # pylint: disable=E1123
......@@ -304,7 +342,7 @@ def scroll_search(
raise ScrollIdNotFound()
total = resp['hits']['total']
results = [hit['_source'] for hit in resp['hits']['hits']]
results = list(hit['_source'] for hit in resp['hits']['hits'])
# since we are using the low level api here, we should check errors
if resp["_shards"]["successful"] < resp["_shards"]["total"]:
......@@ -313,122 +351,196 @@ def scroll_search(
if len(results) == 0:
es.clear_scroll(body={'scroll_id': [scroll_id]}, ignore=(404, )) # pylint: disable=E1123
return None, total, []
scroll_id = None
return scroll_id, total, results
scroll_info = dict(total=total, size=size)
if scroll_id is not None:
scroll_info.update(scroll_id=scroll_id)
return dict(scroll=scroll_info, results=results)
def aggregate_search(
page: int = 1, per_page: int = 10, order_by: str = order_default_quantity, order: int = -1,
def entry_search(
q: Q = None,
page: int = 1, per_page: int = 10,
order_by: str = order_default_quantity, order: int = -1,
time_range: Tuple[datetime, datetime] = None,
aggregations: Dict[str, int] = aggregations,
aggregation_metrics: List[str] = [],
total_metrics: List[str] = [],
**kwargs) -> Tuple[int, List[dict], Dict[str, Dict[str, Dict[str, int]]], Dict[str, int]]:
search_parameters: Dict[str, Any] = {}) -> Dict[str, Any]:
"""
Performs a search and returns paginated search results and aggregations. The aggregations
contain overall and per quantity value sums of code runs (calcs), unique code runs, datasets,
and additional domain specific metrics (e.g. total energies, and unique geometries for DFT
calculations).
Performs a search and returns a paginated list of search results.
The search is determined by the given elasticsearch_dsl query param:`q`,
param:`time_range` and additional :param:`search_parameters`.
The search_parameters have to match general or domain specific metadata quantities.
See module:`datamodel`.
The search results are paginated. Pagination is controlled by the pagination parameters
param:`page` and param:`per_page`. The results are ordered.
Arguments:
page: The page to return starting with page 1
per_page: Results per page
q: An *elasticsearch_dsl* query used to further filter the results (via ``and``)
time_range: A tuple to filter for uploads within with start, end ``upload_time``.
search_parameters: Adds a ``and`` search for each key, value pair. Where the key corresponds
to a quantity and the value is the value to search for in this quantity.
Returns:
A dict with keys 'pagination' and 'results' (similar to pagination in the REST API).
The pagination key holds a dict with keys 'total', 'page', 'per_page'. The
results key holds an array with the found entries.
"""
search = _construct_search(q, time_range, search_parameters=search_parameters)
_, results = _execute_paginated_search(search, page, per_page, order_by, order)
return results
def quantity_search(
        quantities: Dict[str, Any], with_entries: bool = True, size: int = 100,
        **kwargs) -> Dict[str, Any]:
    """
    Performs a search like :func:`entry_search`, but instead of entries, returns the values
    of the given quantities that are exhibited by the entries in the search results.
    In contrast to :func:`metrics_search` it allows to scroll through all values via
    elasticsearch's composite aggregations.
    Optionally, it will also return the entries.

    This can be used to implement continuous scrolling through authors, datasets, or uploads
    within the searched entries.

    Arguments:
        quantities: A dict, where the keys are quantity names, and the values are either
            None, or the 'after' value. This allows to scroll over various requests, by
            providing the 'after' value of the last search. The 'after' value is
            part of the return.
        with_entries: If True, the method will also return the entry search results. See
            :func:`entry_search`.
        size: The size of the quantity lists to return with each call.
        **kwargs: Additional arguments are passed to the underlying entry search.

    Returns:
        A dictionary with key 'quantities' (and optionally the keys of the
        return of :func:`entry_search` ). The 'quantities' key will hold a dict
        of quantities, each quantity is a dictionary with 'after' and 'values' key.
        The 'values' key holds a dict with actual values as keys and their entry count
        as values (i.e. number of entries with that value). The 'after' key is
        ``None`` if the response contains no ``after_key`` for that quantity.

    Raises:
        KeyError: If a key of ``quantities`` is not a known search quantity.
    """
    search = _construct_search(**kwargs)

    for quantity, after in quantities.items():
        if quantity not in search_quantities:
            # same message style as for unknown search parameters
            raise KeyError('Unknown quantity %s' % quantity)

        _, field, _ = search_quantities[quantity]
        terms = A('terms', field=field)

        composite = dict(sources={quantity: terms}, size=size)
        if after is not None:
            # Elasticsearch expects 'after' as an object keyed by source name. The
            # value returned by this function is the bare value, so wrap it; dicts
            # are passed through unchanged for callers that already provide the object.
            composite['after'] = after if isinstance(after, dict) else {quantity: after}

        search.aggs.bucket(quantity, 'composite', **composite)

    # the same kwargs are forwarded again; this helper picks out the pagination arguments
    response, entry_results = _execute_paginated_search(search, **kwargs)

    quantity_results = {}
    for quantity in quantities.keys():
        aggregation = getattr(response.aggregations, quantity)
        # 'after_key' is missing when the aggregation produced no buckets
        after_key = getattr(aggregation, 'after_key', None)
        quantity_results[quantity] = {
            'after': getattr(after_key, quantity) if after_key is not None else None,
            'values': {
                getattr(bucket.key, quantity): bucket.doc_count
                for bucket in aggregation.buckets
            }
        }

    results = dict(quantities=quantity_results)
    if with_entries:
        results.update(**entry_results)

    return results
def metrics_search(
quantities: Dict[str, int] = aggregations, metrics_to_use: List[str] = [],
with_entries: bool = True, **kwargs) -> Dict[str, Any]:
"""
Performs a search like :func:`entry_search`, but instead of entries, returns the given
metrics aggregated for (a limited set of values) of the given quantities calculated
from the entries in the search results.
In contrast to :func:`quantity_search` the amount of values for each quantity is
limited.
Optionally, it will also return the entries.
This can be used to display statistics over the searched entries and allows to
implement faceted search on the top values for each quantity.
The metrics contain overall and per quantity value sums of code runs (calcs), unique code runs,
datasets, and additional domain specific metrics (e.g. total energies, and unique geometries for DFT