Commit da7b1904 authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Completed refactor of search and search API.

parents 662e3702 7b64512a
......@@ -34,11 +34,29 @@ git clone git@gitlab.mpcdf.mpg.de:nomad-lab/nomad-FAIR.git
cd nomad-FAIR
```
### C libs
Even though the NOMAD infrastructure is written in python, there is a C library
required by one of our python dependencies.
#### libmagic
Libmagic allows to determine the MIME type of files. It should be installed on most
unix/linux systems. It can be installed on MacOS with homebrew:
```
brew install libmagic
```
### Virtual environment
#### pyenv
The nomad code currently targets python 3.6. If your host machine has 3.7 or later installed,
you can use [pyenv](https://github.com/pyenv/pyenv) to use python 3.6 in parallel.
While in principle everything should be compatible with 3.7 and later, there have been
issues with some dependencies and requirements not being compatible with 3.7.
#### virtualenv
We strongly recommend to use *virtualenv* to create a virtual environment. It will allow you
to keep nomad and its dependencies separate from your system's python installation.
Make sure to base the virtual environment on Python 3.
......@@ -49,6 +67,19 @@ virtualenv -p `which python3` .pyenv
source .pyenv/bin/activate
```
#### Conda
If you are a conda user, there is an equivalent, but you have to install pip and the
right python version while creating the environment.
```
conda create --name nomad_env pip python=3.6
conda activate nomad_env
```
To install libmagic for conda, you can use (other channels might also work):
```
conda install --name nomad_env -c conda-forge libmagic
```
The next steps can be done using the `setup.sh` script. If you prefer to understand all
the steps and run them manually, read on:
......
......@@ -32,7 +32,7 @@ from nomad.processing import Calc
from .app import api
from .auth import login_if_available, create_authorization_predicate, \
signature_token_argument, with_signature_token
from .repo import search_request_parser, create_search_kwargs
from .repo import search_request_parser, add_query
if sys.version_info >= (3, 7):
import zipfile
......@@ -371,10 +371,12 @@ class RawFileQueryResource(Resource):
except Exception:
abort(400, message='bad parameter types')
search_kwargs = create_search_kwargs()
search_request = search.SearchRequest()
add_query(search_request)
calcs = sorted([
(entry['upload_id'], entry['mainfile'])
for entry in search.entry_scan(**search_kwargs)], key=lambda x: x[0])
for entry in search_request.execute_scan()], key=lambda x: x[0])
def generator():
for upload_id, mainfile in calcs:
......
......@@ -20,9 +20,7 @@ meta-data.
from typing import List
from flask_restplus import Resource, abort, fields
from flask import request, g
from elasticsearch_dsl import Q
from elasticsearch.exceptions import NotFoundError
import datetime
from nomad import search, utils
......@@ -79,11 +77,11 @@ repo_calcs_model = api.model('RepoCalculations', {
'results': fields.List(fields.Raw, description=(
'A list of search results. Each result is a dict with quantitie names as key and '
'values as values')),
'quantities': fields.Raw(description=(
'A dict with all aggregations. Each aggregation is dictionary with a metrics dict as '
'value and quantity value as key. The metrics are code runs(calcs), %s. '
'There is a pseudo quantity "total" with a single value "all" that contains the metrics over all results. ' %
', '.join(search.metrics_names)))
'statistics': fields.Raw(description=(
'A dict with all statistics. Each statistic is dictionary with a metrics dict as '
'value and quantity value as key. The possible metrics are code runs(calcs), %s. '
'There is a pseudo quantity "total" with a single value "all" that contains the '
' metrics over all results. ' % ', '.join(search.metrics_names)))
})
......@@ -127,72 +125,36 @@ search_request_parser = api.parser()
add_common_parameters(search_request_parser)
def _create_owner_query():
    """
    Build the elasticsearch_dsl query that restricts results to entries the
    current user is allowed to see, based on the ``owner`` request argument.

    Supported owner values (default ``all``):
        all     -- public entries, plus the user's own entries when logged in
        public  -- only published entries without embargo
        user    -- all entries owned by the logged-in user (401 if anonymous)
        staging -- unpublished entries owned by the logged-in user (401 if anonymous)
        admin   -- no owner restriction at all (401 unless the user is admin)

    Returns the combined ``Q`` object, never ``None`` (the corrupted-mainfile
    exclusion below is always applied). Aborts the request with 400/401 on
    invalid input or insufficient authentication.
    """
    owner = request.args.get('owner', 'all')

    # TODO this should be removed after migration
    # if owner == 'migrated':
    #     q = Q('term', published=True) & Q('term', with_embargo=False)
    #     if g.user is not None:
    #         q = q | Q('term', owners__user_id=g.user.user_id)
    #     q = q & ~Q('term', **{'uploader.user_id': 1})  # pylint: disable=invalid-unary-operand-type
    if owner == 'all':
        # public data, widened with the user's own entries when authenticated
        q = Q('term', published=True) & Q('term', with_embargo=False)
        if g.user is not None:
            q = q | Q('term', owners__user_id=g.user.user_id)
    elif owner == 'public':
        q = Q('term', published=True) & Q('term', with_embargo=False)
    elif owner == 'user':
        if g.user is None:
            abort(401, message='Authentication required for owner value user.')
        q = Q('term', owners__user_id=g.user.user_id)
    elif owner == 'staging':
        # unpublished (staging) entries of the authenticated user only
        if g.user is None:
            abort(401, message='Authentication required for owner value user.')
        q = Q('term', published=False) & Q('term', owners__user_id=g.user.user_id)
    elif owner == 'admin':
        if g.user is None or not g.user.is_admin:
            abort(401, message='This can only be used by the admin user.')
        # None means "no owner restriction"; replaced by the filter below
        q = None
    else:
        abort(400, message='Invalid owner value. Valid values are all|user|staging, default is all')

    # TODO this should be removed after migration
    # always exclude migration artifacts with a corrupted mainfile
    without_currupted_mainfile = ~Q('term', code_name='currupted mainfile')  # pylint: disable=invalid-unary-operand-type
    q = q & without_currupted_mainfile if q is not None else without_currupted_mainfile

    return q
def _create_search_parameters():
    """
    Extract the search parameters from ``request.args``.

    Only keys that are known search quantities are kept. For quantities with a
    truthy entry in ``search.search_quantities`` all given values are collected
    as a list; otherwise only the single (first) value is used.
    """
    parameters = {}
    for name in request.args:
        if name not in search.search_quantities:
            continue
        if search.search_quantities[name]:
            parameters[name] = request.args.getlist(name)
        else:
            parameters[name] = request.args.get(name)
    return parameters
def _create_time_range():
def add_query(search_request: search.SearchRequest):
"""
Help that adds query relevant request parameters to the given SearchRequest.
"""
# owner
try:
search_request.owner(
request.args.get('owner', 'all'),
g.user.user_id if g.user is not None else None)
except ValueError as e:
abort(401, getattr(e, 'message', 'Invalid owner parameter'))
except Exception as e:
abort(400, getattr(e, 'message', 'Invalid owner parameter'))
# time range
from_time_str = request.args.get('from_time', None)
until_time_str = request.args.get('until_time', None)
try:
if from_time_str is None and until_time_str is None:
return None
else:
from_time = rfc3339DateTime.parse('2000-01-01' if from_time_str is None else from_time_str)
until_time = rfc3339DateTime.parse(until_time_str) if until_time_str is not None else datetime.datetime.utcnow()
return from_time, until_time
from_time = rfc3339DateTime.parse(from_time_str) if from_time_str is not None else None
until_time = rfc3339DateTime.parse(until_time_str) if until_time_str is not None else None
search_request.time_range(start=from_time, end=until_time)
except Exception:
abort(400, message='bad datetime format')
def create_search_kwargs():
return dict(
q=_create_owner_query(),
time_range=_create_time_range(),
search_parameters=_create_search_parameters())
# search parameter
search_request.search_parameters(**{
key: request.args.getlist(key) if search.search_quantities[key] else request.args.get(key)
for key in request.args.keys()
if key in search.search_quantities})
@ns.route('/')
......@@ -234,21 +196,24 @@ class RepoCalcsResource(Resource):
Ordering is determined by ``order_by`` and ``order`` parameters.
"""
search_request = search.SearchRequest()
add_query(search_request)
try:
scroll = bool(request.args.get('scroll', False))
date_histogram = bool(request.args.get('date_histogram', False))
scroll_id = request.args.get('scroll_id', None)
page = int(request.args.get('page', 1))
per_page = int(request.args.get('per_page', 10 if not scroll else 1000))
order = int(request.args.get('order', -1))
order_by = request.args.get('order_by', 'formula')
if bool(request.args.get('date_histogram', False)):
search_request.date_histogram()
metrics: List[str] = request.args.getlist('metrics')
except Exception:
abort(400, message='bad parameter types')
search_kwargs = create_search_kwargs()
order_by = request.args.get('order_by', 'formula')
try:
assert page >= 1
assert per_page > 0
......@@ -261,22 +226,20 @@ class RepoCalcsResource(Resource):
for metric in metrics:
if metric not in search.metrics_names:
abort(400, message='there is not metric %s' % metric)
search_request.statistics(metrics_to_use=metrics)
try:
if scroll:
results = search.scroll_search(
scroll_id=scroll_id, size=per_page, **search_kwargs)
results = search_request.execute_scrolled(scroll_id=scroll_id, size=per_page)
else:
results = search.metrics_search(
per_page=per_page, page=page, order=order, order_by=order_by,
metrics_to_use=metrics,
with_date_histogram=date_histogram, **search_kwargs)
results = search_request.execute_paginated(
per_page=per_page, page=page, order=order, order_by=order_by)
# TODO just a work around to make things prettier
quantities = results['quantities']
if 'code_name' in quantities and 'currupted mainfile' in quantities['code_name']:
del(quantities['code_name']['currupted mainfile'])
statistics = results['statistics']
if 'code_name' in statistics and 'currupted mainfile' in statistics['code_name']:
del(statistics['code_name']['currupted mainfile'])
return results, 200
except search.ScrollIdNotFound:
......@@ -288,11 +251,10 @@ class RepoCalcsResource(Resource):
repo_quantity_values_model = api.model('RepoQuantityValues', {
'quantities': fields.Raw(description='''
A dict with the requested quantity as single key.
The value is a dictionary with 'after' and 'values' keys.
The 'values' key holds a dict with actual values as keys and their entry count
as values (i.e. number of entries with that value). ''')
'quantity': fields.Nested(api.model('RepoQuantity', {
'after': fields.String(description='The after value that can be used to retrieve the next set of values.'),
'values': fields.Raw(description='A dict with values as key and entry count as values.')
}), allow_null=True)
})
repo_quantity_search_request_parser = api.parser()
......@@ -324,19 +286,17 @@ class RepoQuantityResource(Resource):
scrolling. The result will contain an 'after' value, that can be specified
for the next request. You can use the 'size' and 'after' parameters accordingly.
The result will contain a 'quantities' key with the given quantity and the
respective values (upto 'size' many). For the rest of the values use the
'after' parameter accordingly.
The result will contain a 'quantity' key with quantity values and the "after"
value. There will be upto 'size' many values. For the rest of the values use the
"after" parameter in another request.
"""
search_request = search.SearchRequest()
add_query(search_request)
try:
after = request.args.get('after', None)
size = int(request.args.get('size', 100))
from_time = rfc3339DateTime.parse(request.args.get('from_time', '2000-01-01'))
until_time_str = request.args.get('until_time', None)
until_time = rfc3339DateTime.parse(until_time_str) if until_time_str is not None else datetime.datetime.utcnow()
time_range = (from_time, until_time)
except Exception:
abort(400, message='bad parameter types')
......@@ -345,13 +305,12 @@ class RepoQuantityResource(Resource):
except AssertionError:
abort(400, message='invalid size')
q = _create_owner_query()
search_parameters = _create_search_parameters()
search_request.quantity(quantity, size=size, after=after)
try:
results = search.quantity_search(
q=q, time_range=time_range, search_parameters=search_parameters,
quantities={quantity: after}, size=size, with_entries=False)
results = search_request.execute()
quantities = results.pop('quantities')
results['quantity'] = quantities[quantity]
return results, 200
except KeyError as e:
......@@ -367,17 +326,25 @@ class RepoPidResource(Resource):
@api.marshal_with(repo_calc_id_model, skip_none=True, code=200, description='Entry resolved')
@login_if_available
def get(self, pid: int):
q = _create_owner_query()
results = search.entry_search(q, page=1, per_page=1, search_parameters=dict(pid=pid))
total = results['pagination']['total']
if total == 1:
return dict(
upload_id=results['results'][0]['upload_id'],
calc_id=results['results'][0]['calc_id'])
elif total == 0:
abort(404, 'Entry with PID %d does not exist' % pid)
search_request = search.SearchRequest()
if g.user is not None:
search_request.owner('all', user_id=g.user.user_id)
else:
search_request.owner('all')
search_request.search_parameter('pid', pid)
results = list(search_request.execute_scan())
total = len(results)
if total == 0:
abort(404, 'Entry with PID %d does not exist' % pid)
if total > 1:
utils.get_logger(__name__).error('Two entries for the same pid', pid=pid)
return dict(
upload_id=results['results'][0]['upload_id'],
calc_id=results['results'][0]['calc_id'])
result = results[0]
return dict(
upload_id=result['upload_id'],
calc_id=result['calc_id'])
......@@ -315,14 +315,14 @@ class SearchRequest:
raise ValueError('Authentication required for owner value user')
q = Q('term', published=False) & Q('term', owners__user_id=user_id)
elif owner_type == 'admin':
if user_id is None or not User.is_admin(user_id):
if user_id is None or not coe_repo.User.from_user_id(user_id).is_admin:
raise ValueError('This can only be used by the admin user.')
q = None
else:
raise KeyError('Unsupported owner value')
if q is not None:
self.q &= q
self.q = self.q & q
return self
......@@ -333,10 +333,12 @@ class SearchRequest:
in the domain's (DFT calculations) datamodel. Alternatively search parameters
can be set via attributes.
"""
for name, value in kwargs:
setattr(self, name, value)
for name, value in kwargs.items():
self.search_parameter(name, value)
def __setattr__(self, name, value):
return self
def search_parameter(self, name, value):
quantity = search_quantities.get(name, None)
if quantity is None:
raise KeyError('Unknown quantity %s' % name)
......@@ -352,7 +354,7 @@ class SearchRequest:
values = [value]
for item in values:
q &= Q(quantity.elastic_search_type, **{quantity.elastic_field: item})
self.q &= Q(quantity.elastic_search_type, **{quantity.elastic_field: item})
return self
......@@ -375,9 +377,11 @@ class SearchRequest:
""" The underlying elasticsearch_dsl query object """
if self._query is None:
return Q('match_all')
else:
return self._query
@q.setter
def q(self, q, value):
def q(self, q):
self._query = q
def statistics(
......@@ -491,7 +495,7 @@ class SearchRequest:
if after is not None:
composite['after'] = {name: after}
self._search.aggs.bucket('quantitiy:%s' % name, 'composite', **composite)
self._search.aggs.bucket('quantity:%s' % name, 'composite', **composite)
return self
......@@ -500,17 +504,17 @@ class SearchRequest:
Exectutes without returning actual results. Only makes sense if the request
was configured for statistics or quantity values.
"""
return self._response(self._search.execute())
return self._response(self._search.query(self.q)[0:0].execute())
def execute_scan(self):
"""
This execute the search as scan. The result will be a generator over the found
entries. Everything but the query part of this object, will be ignored.
"""
for hit in self._search.scan():
for hit in self._search.query(self.q).scan():
yield hit.to_dict()
def execute_pagenated(
def execute_paginated(
self, page: int = 1, per_page=10, order_by: str = order_default_quantity,
order: int = -1):
"""
......@@ -522,7 +526,7 @@ class SearchRequest:
order_by: The quantity to order by.
order: -1 or 1 for descending or ascending order.
"""
search = self._search
search = self._search.query(self.q)
if order_by not in search_quantities:
raise KeyError('Unknown order quantity %s' % order_by)
......@@ -535,8 +539,9 @@ class SearchRequest:
search = search.sort('-%s' % order_by_quantity.elastic_field)
search = search[(page - 1) * per_page: page * per_page]
result = self._response(search.execute())
result = self._response(search.execute(), with_hits=True)
result.update(pagination=dict(total=result['total'], page=page, per_page=per_page))
return result
def execute_scrolled(self, scroll_id: str = None, size: int = 1000, scroll: str = u'5m'):
"""
......@@ -563,7 +568,7 @@ class SearchRequest:
if scroll_id is None:
# initiate scroll
resp = es.search( # pylint: disable=E1123
body=self._search.to_dict(), scroll=scroll, size=size,
body=self._search.query(self.q).to_dict(), scroll=scroll, size=size,
index=config.elastic.index_name)
scroll_id = resp.get('_scroll_id')
......@@ -595,55 +600,64 @@ class SearchRequest:
return dict(scroll=scroll_info, results=results)
def _response(self, response) -> Dict[str, Any]:
def _response(self, response, with_hits: bool = False) -> Dict[str, Any]:
"""
Prepares a response object covering the total number of resutls, hits, statistics,
and quantities. Other aspects like pagination and scrolling have to be added
elsewhere.
"""
result: Dict[str, Any] = dict()
aggs = response.aggregations.to_dict()
# total
total = response.hits.total if hasattr(response, 'hits') else 0
result.update(total=total)
# hits
result.update(results=[hit.to_dict() for hit in response.hits])
if len(response.hits) > 0 or with_hits:
result.update(results=[hit.to_dict() for hit in response.hits])
# statistics
def get_metrics(bucket, code_runs):
result = {
metric: bucket[metric]['value']
for metric in vars(bucket)
for metric in metrics_names if metric in bucket
}
result.update(code_runs=code_runs)
return result
metrics_results = {
quantity_name: {
bucket.key: get_metrics(bucket, bucket.doc_count)
for bucket in getattr(response.aggregations, quantity_name).buckets
quantity_name[11:]: {
bucket['key']: get_metrics(bucket, bucket['doc_count'])
for bucket in quantity['buckets']
}
for quantity_name in vars(response.aggrgations)
for quantity_name, quantity in aggs.items()
if quantity_name.startswith('statistics:')
}
total_metrics_result = get_metrics(response.aggregations, total)
metrics_results['total'] = dict(all=total_metrics_result)
result.update(quantities=metrics_results)
if len(metrics_results) > 0:
total_metrics_result = get_metrics(aggs, total)
metrics_results['total'] = dict(all=total_metrics_result)
result.update(statistics=metrics_results)
# quantities
def create_quantity_result(quantity):
values = getattr(response.aggregations, quantity)
def create_quantity_result(quantity_name, quantity):
result = dict(values={
getattr(bucket.key, quantity): bucket.doc_count
for bucket in values.buckets})
bucket['key'][quantity_name]: bucket['doc_count']
for bucket in quantity['buckets']})
if hasattr(values, 'after_key'):
result.update(after=getattr(values.after_key, quantity))
if 'after_key' in quantity:
result.update(after=quantity['after_key'][quantity_name])
return result
quantity_results = {
quantity: create_quantity_result(quantity)
for quantity_name in vars(response.aggrgations)
if quantity_name.startswith('statistics:')
quantity_name[9:]: create_quantity_result(quantity_name[9:], quantity)
for quantity_name, quantity in aggs.items()
if quantity_name.startswith('quantity:')
}
result.update(quantities=quantity_results)
if len(quantity_results) > 0:
result.update(quantities=quantity_results)
return result
......@@ -775,20 +775,20 @@ class TestRepo():
(2, 'quantities', ['wyckoff_letters_primitive', 'hall_number']),
(0, 'quantities', 'dos')
])
def test_search_quantities(self, client, example_elastic_calcs, no_warn, test_user_auth, calcs, quantity, value):
def test_search_parameters(self, client, example_elastic_calcs, no_warn, test_user_auth, calcs, quantity, value):
query_string = urlencode({quantity: value}, doseq=True)
rv = client.get('/repo/?%s' % query_string, headers=test_user_auth)
logger.debug('run search quantities test', query_string=query_string)
data = self.assert_search(rv, calcs)
quantities = data.get('quantities', None)
assert quantities is not None
statistics = data.get('statistics', None)
assert statistics is not None
if quantity == 'system' and calcs != 0:
# for simplicity we only assert on quantities for this case
assert 'system' in quantities
assert len(quantities['system']) == 1
assert value in quantities['system']
assert 'system' in statistics
assert len(statistics['system']) == 1
assert value in statistics['system']
metrics_permutations = [[], search.metrics_names] + [[metric] for metric in search.metrics_names]
......@@ -808,7 +808,7 @@ class TestRepo():
rv = client.get('/repo/?%s' % urlencode(dict(metrics=metrics), doseq=True))
assert rv.status_code == 200, str(rv.data)
data = json.loads(rv.data)
total_metrics = data.get('quantities', {}).get('total', {}).get('all', None)
total_metrics = data.get('statistics', {}).get('total', {}).get('all', None)
assert total_metrics is not None
assert 'code_runs' in total_metrics
for metric in metrics:
......@@ -819,7 +819,7 @@ class TestRepo():
rv = client.get('/repo/?%s' % urlencode(dict(metrics=metrics), doseq=True))
assert rv.status_code == 200
data = json.loads(rv.data)
for name, quantity in data.get('quantities').items():
for name, quantity in data.get('statistics').items():
for metrics_result in quantity.values():
assert 'code_runs' in metrics_result
if name != 'authors':
......@@ -832,8 +832,7 @@ class TestRepo():
rv = client.get('/repo/?date_histogram=true&metrics=total_energies')
assert rv.status_code == 200
data = json.loads(rv.data)
histogram = data.get('quantities').get('date_histogram')
print(histogram)
histogram = data.get('statistics').get('date_histogram')
assert len(histogram) > 0
@pytest.mark.parametrize('n_results, page, per_page', [(2, 1, 5), (1, 1, 1), (0, 2, 3)])
......@@ -903,10 +902,7 @@ class TestRepo():
rv = client.get('/repo/%s' % quantity, headers=test_user_auth)
assert rv.status_code == 200
data = json.loads(rv.data)