Commit bb57b6bc authored by Markus Scheidgen

Implemented basic aggregation-based search functions.

parent 1e8eef8c
Pipeline #43848 failed with stages in 17 minutes and 40 seconds
@@ -99,16 +99,16 @@ class RepoCalcsResource(Resource):
             if g.user is None:
                 q = Q('term', published=True)
             else:
-                q = Q('term', published=True) | Q('term', uploader__user_id=g.user.user_id)
+                q = Q('term', published=True) | Q('term', owners__user_id=g.user.user_id)
         elif owner == 'user':
             if g.user is None:
                 abort(401, message='Authentication required for owner value user.')
-            q = Q('term', uploader__user_id=g.user.user_id)
+            q = Q('term', owners__user_id=g.user.user_id)
         elif owner == 'staging':
             if g.user is None:
                 abort(401, message='Authentication required for owner value staging.')
-            q = Q('term', published=False) & Q('term', uploader__user_id=g.user.user_id)
+            q = Q('term', published=False) & Q('term', owners__user_id=g.user.user_id)
         else:
             abort(400, message='Invalid owner value. Valid values are all|user|staging, default is all.')
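The owner filters above compose *elasticsearch_dsl* `Q` objects with `|` and `&`, which serialize into a single `bool` query. A minimal sketch of the `all` case for an authenticated user; the user id and index name are placeholders, not part of this diff:

```python
from elasticsearch_dsl import Q, Search

user_id = 42  # placeholder for g.user.user_id

# published entries, or entries owned by the current user
q = Q('term', published=True) | Q('term', owners__user_id=user_id)

# q.to_dict() yields one bool/should query:
# {'bool': {'should': [{'term': {'published': True}},
#                      {'term': {'owners.user_id': 42}}]}}
search = Search(index='calcs').query(q)  # 'calcs' is an assumed index name
```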
@@ -364,7 +364,7 @@ class Upload(Proc):
         with utils.timer(
                 logger, 'upload deleted from index', step='delete',
                 upload_size=self.upload_files.size):
-            search.Entry.delete_upload(self.upload_id)
+            search.delete_upload(self.upload_id)

         with utils.timer(
                 logger, 'staged upload deleted', step='delete',
@@ -402,11 +402,8 @@ class Upload(Proc):
                 upload_size=self.upload_files.size):
             coe_upload = coe_repo.Upload.from_upload_id(upload_with_metadata.upload_id)
             if coe_upload is not None:
-                for coe_calc in coe_upload.calcs:
-                    entry = search.Entry.from_calc_with_metadata(
-                        coe_calc.to_calc_with_metadata())
-                    entry.published = True
-                    entry.save(refresh=True)
+                search.publish(
+                    [coe_calc.to_calc_with_metadata() for coe_calc in coe_upload.calcs])

         with utils.timer(
                 logger, 'staged upload deleted', step='publish',
@@ -16,10 +16,13 @@
 This module represents calculations in elastic search.
 """

 from typing import Iterable, Dict, Tuple, List
-from elasticsearch_dsl import Document, InnerDoc, Keyword, Text, Date, \
-    Object, Boolean, Search, Integer
+from elasticsearch_dsl import Document, InnerDoc, Keyword, Text, Date, \
+    Object, Boolean, Search, Integer, Q, A
+import elasticsearch.helpers
 import ase.data

-from nomad import config, datamodel, infrastructure, datamodel, coe_repo
+from nomad import config, datamodel, infrastructure, coe_repo, parsing


 class AlreadyExists(Exception): pass
@@ -34,8 +37,9 @@ class User(InnerDoc):
         if 'first_name' not in user:
             user = coe_repo.User.from_user_id(user.id).to_popo()

-        self.name = '%s %s' % (user['first_name'], user['last_name'])
-        self.name_keyword = '%s %s' % (user['first_name'], user['last_name'])
+        name = '%s, %s' % (user['last_name'], user['first_name'])
+        self.name = name
+        self.name_keyword = name

         return self
@@ -74,8 +78,8 @@ class Entry(Document):
     with_embargo = Boolean()
     published = Boolean()
-    coauthors = Object(User)
-    shared_with = Object(User)
+    authors = Object(User, multi=True)
+    owners = Object(User, multi=True)
     comment = Text()
     references = Keyword()
     datasets = Object(Dataset)
@@ -95,6 +99,11 @@ class Entry(Document):
     geometries = Keyword(multi=True)
     quantities = Keyword(multi=True)

+    # def __init__(self, *args, **kwargs):
+    #     super().__init__(*args, **kwargs)
+    #     self.authors = []
+    #     self.owners = []
+
     @classmethod
     def from_calc_with_metadata(cls, source: datamodel.CalcWithMetadata) -> 'Entry':
         entry = Entry(meta=dict(id=source.calc_id))
@@ -113,8 +122,11 @@ class Entry(Document):
         self.with_embargo = source.with_embargo
         self.published = source.published
-        self.coauthors = [User.from_user_popo(user) for user in source.coauthors]
-        self.shared_with = [User.from_user_popo(user) for user in source.shared_with]
+        self.authors = [User.from_user_popo(user) for user in source.coauthors]
+        self.owners = [User.from_user_popo(user) for user in source.shared_with]
+        if self.uploader is not None:
+            self.authors.append(self.uploader)
+            self.owners.append(self.uploader)
         self.comment = source.comment
         self.references = [ref.value for ref in source.references]
         self.datasets = [Dataset.from_dataset_popo(ds) for ds in source.datasets]
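A plain-Python restatement of the new denormalization, to make the intent explicit: `authors` is meant for search and credit (coauthors plus the uploader), `owners` for visibility filtering (shared_with plus the uploader). The values below are illustrative:

```python
coauthors, shared_with, uploader = ['A'], ['S'], 'U'  # illustrative values

authors = list(coauthors)       # who to credit and search by
owners = list(shared_with)      # who may see the entry
if uploader is not None:
    authors.append(uploader)
    owners.append(uploader)

assert authors == ['A', 'U'] and owners == ['S', 'U']
```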
@@ -149,33 +161,116 @@
         self.n_total_energies = n_total_energies
         self.n_geometries = n_geometries

-    @classmethod
-    def update_by_query(cls, upload_id, script):
-        """ Update all entries of a given upload via elastic script. """
-        index = cls._default_index()
-        doc_type = cls._doc_type.name
-        conn = cls._get_connection()
-        body = {
-            'script': script,
-            'query': {
-                'match': {
-                    'upload_id': upload_id
-                }
-            }
-        }
-        conn.update_by_query(index, doc_type=[doc_type], body=body)
-
-    @classmethod
-    def publish_upload(cls, upload: datamodel.UploadWithMetadata):
-        cls.update_by_query(upload.upload_id, 'ctx._source["published"] = true')
-        # TODO run update on all calcs with their new metadata
-
-    @classmethod
-    def delete_upload(cls, upload_id):
-        index = cls._default_index()
-        Search(index=index).query('match', upload_id=upload_id).delete()
-
-    @staticmethod
-    def es_search(body):
-        """ Perform a raw *elasticsearch* (not *elasticsearch_dsl*) search on the Calc index. """
-        return infrastructure.elastic_client.search(index=config.elastic.index_name, body=body)
+
+def delete_upload(upload_id):
+    """ Delete all entries with the given ``upload_id`` from the index. """
+    index = Entry._default_index()
+    Search(index=index).query('match', upload_id=upload_id).delete()
+
+
+def publish(calcs: Iterable[datamodel.CalcWithMetadata]) -> None:
+    """ Update all given calcs with their metadata and set ``published = True``. """
+    def elastic_updates():
+        for calc in calcs:
+            entry = Entry.from_calc_with_metadata(calc)
+            entry.published = True
+            yield entry.to_dict(include_meta=True)
+
+    elasticsearch.helpers.bulk(infrastructure.elastic_client, elastic_updates())
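A usage sketch for these two module-level functions, assuming an initialized elastic connection and an `upload_with_metadata` object like the one used in the `Upload` processing code above (both assumptions, not part of this diff):

```python
from nomad import infrastructure, search

infrastructure.setup_elastic()  # assumes a running elastic instance

# index all calcs of an upload with their final metadata, marked as published
search.publish(upload_with_metadata.calcs)  # upload_with_metadata assumed in scope

# remove the upload's entries from the index again
search.delete_upload(upload_with_metadata.upload_id)
```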
+default_aggregations = {
+    'atoms': len(ase.data.chemical_symbols),
+    'system': 10,
+    'crystal_system': 10,
+    'code_name': len(parsing.parsers),
+    'xc_functional': 10,
+    'authors': 10
+}
+
+
+def aggregate_search(
+        page: int = 0, per_page: int = 10, q: Q = None,
+        aggregations: Dict[str, int] = default_aggregations,
+        **kwargs) -> Tuple[int, List[dict], Dict[str, Dict[str, int]]]:
+    """
+    Performs a search and returns paginated search results and aggregation bucket sizes
+    based on key quantities.
+
+    Arguments:
+        page: The page to return, starting with 0.
+        per_page: Results per page.
+        q: An *elasticsearch_dsl* query used to further filter the results (via ``and``).
+        aggregations: A customized list of aggregations to perform. Keys are index fields,
+            and values the number of buckets to return. Only works on *keyword* fields.
+        **kwargs: Field, value pairs to search for.
+
+    Returns: A tuple with the total number of hits, an array with the results, and a
+        dictionary with the aggregation data.
+    """
+    search = Search()
+    if q is not None:
+        search = search.query(q)
+
+    for key, value in kwargs.items():
+        if key == 'comment':
+            search = search.query(Q('match', **{key: value}))
+        else:
+            search = search.query(Q('term', **{key: value}))
+
+    for aggregation, size in aggregations.items():
+        if aggregation == 'authors':
+            search.aggs.bucket(aggregation, A('terms', field='authors.name_keyword', size=size))
+        else:
+            search.aggs.bucket(aggregation, A('terms', field=aggregation, size=size))
+
+    response = search[page * per_page: (page + 1) * per_page].execute()  # pylint: disable=no-member
+
+    total_results = response.hits.total
+    search_results = [hit.to_dict() for hit in response.hits]
+
+    aggregation_results = {
+        aggregation: {
+            bucket.key: bucket.doc_count
+            for bucket in getattr(response.aggregations, aggregation).buckets
+        }
+        for aggregation in aggregations.keys()
+    }
+
+    return total_results, search_results, aggregation_results
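An example call against `aggregate_search`, assuming entries are already indexed; keyword arguments become `term` filters on index fields (`comment` becomes a `match` query), and the filter value below is illustrative:

```python
from nomad.search import aggregate_search

total, results, aggs = aggregate_search(
    page=0, per_page=10,
    system='bulk',                      # illustrative filter value
    aggregations=dict(code_name=10))    # only bucket the code names

print(total)                            # total number of matching entries
for hit in results:
    print(hit['calc_id'], hit['code_name'])
print(aggs['code_name'])                # e.g. {'VASP': 3, 'exciting': 1} (illustrative)
```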
+def authors(per_page: int = 10, after: str = None, prefix: str = None) -> Tuple[Dict[str, int], str]:
+    """
+    Returns the name field for all authors with the number of their calculations in
+    their natural order.
+
+    The author buckets of :func:`aggregate_search` are limited to the top 10 authors. This
+    function, in contrast, allows paginating through all authors.
+
+    Arguments:
+        per_page: The number of authors to return.
+        after: Only return the authors after the given ``name_keyword`` field value
+            (for pagination).
+        prefix: Used to do a prefix search on authors. Be aware that the returned authors
+            also include the coauthors of the actual authors matching the prefix.
+
+    Returns: A tuple with an ordered dict containing author ``name_keyword`` field values
+        and numbers of calculations, and the ``name_keyword`` value of the last author
+        (to be used as the next ``after`` value).
+    """
+    composite = dict(
+        size=per_page,
+        sources=dict(authors=dict(terms=dict(field='authors.name_keyword'))))
+    if after is not None:
+        composite.update(after=dict(authors=after))
+
+    body = dict(size=0, aggs=dict(authors=dict(composite=composite)))
+    if prefix is not None:
+        body.update(query=dict(prefix={'authors.name': prefix}))
+
+    response = infrastructure.elastic_client.search(index=config.elastic.index_name, body=body)
+    response = response['aggregations']['authors']
+
+    return {
+        bucket['key']['authors']: bucket['doc_count']
+        for bucket in response['buckets']}, response.get('after_key', {'authors': None})['authors']
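Since `authors` is backed by a composite aggregation, the returned `after` value can seed the next call; a sketch that pages through all authors, relying only on the signature introduced above:

```python
from nomad.search import authors

after = None
while True:
    buckets, after = authors(per_page=100, after=after)
    for name, n_calcs in buckets.items():
        print('%s: %d' % (name, n_calcs))
    if after is None or len(buckets) < 100:
        break
```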
test_datamodel.py (new file)

# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import datetime

import names
from essential_generators import DocumentGenerator
from ase.data import chemical_symbols

from nomad import datamodel, parsing, utils

number_of = 20

random.seed(0)
gen = DocumentGenerator()

users = [(i + 1, names.get_first_name(), names.get_last_name(), gen.email()) for i in range(0, number_of)]
basis_sets = ['Numeric AOs', 'Gaussians', '(L)APW+lo', 'Plane waves']
xc_functionals = ['LDA', 'GGA', 'hybrid', 'meta-GGA', 'GW', 'unknown']
crystal_systems = ['triclinic', 'monoclinic', 'orthorhombic', 'tetragonal', 'hexagonal', 'cubic']
systems = ['atom', 'molecule/cluster', '2D/surface', 'bulk']
comments = [gen.sentence() for _ in range(0, number_of)]
references = [(i + 1, gen.url()) for i in range(0, number_of)]
datasets = [(i + 1, gen.slug()) for i in range(0, number_of)]
codes = [parser[8:] for parser in parsing.parser_dict.keys()]  # strip the 'parsers/' prefix
files = ['/'.join(gen.url().split('/')[3:]) for _ in range(0, number_of)]

low_numbers_for_atoms = [1, 1, 2, 2, 2, 2, 2, 3, 3, 4]
low_numbers_for_refs_and_datasets = [0, 0, 0, 0, 1, 1, 1, 2]
def _gen_user():
    id, first, last, email = random.choice(users)
    return utils.POPO(id=id, first_name=first, last_name=last, email=email)


def _gen_dataset():
    id, name = random.choice(datasets)
    return utils.POPO(id=id, name=name, doi=_gen_ref())


def _gen_ref():
    id, value = random.choice(references)
    return utils.POPO(id=id, value=value)
def generate_calc(pid: int = 0) -> datamodel.CalcWithMetadata:
    random.seed(pid)

    self = datamodel.CalcWithMetadata()

    self.upload_id = utils.create_uuid()
    self.calc_id = utils.create_uuid()
    self.upload_time = datetime.datetime.now()
    self.calc_hash = utils.create_uuid()
    self.pid = pid
    self.mainfile = random.choice(files)
    self.files = list([self.mainfile] + random.choices(files, k=random.choice(low_numbers_for_atoms)))
    self.uploader = _gen_user()

    self.with_embargo = random.choice([True, False])
    self.published = True
    self.coauthors = list(_gen_user() for _ in range(0, random.choice(low_numbers_for_refs_and_datasets)))
    self.shared_with = list(_gen_user() for _ in range(0, random.choice(low_numbers_for_refs_and_datasets)))
    self.comment = random.choice(comments)
    self.references = list(_gen_ref() for _ in range(0, random.choice(low_numbers_for_refs_and_datasets)))
    self.datasets = list(
        _gen_dataset()
        for _ in range(0, random.choice(low_numbers_for_refs_and_datasets)))

    self.atoms = list(random.choices(chemical_symbols[1:], k=random.choice(low_numbers_for_atoms)))
    self.formula = ''.join('%s%d' % (atom, random.choice(low_numbers_for_atoms)) for atom in self.atoms)
    self.formula = self.formula.replace('1', '')

    self.basis_set = random.choice(basis_sets)
    self.xc_functional = random.choice(xc_functionals)
    self.system = random.choice(systems)
    self.crystal_system = random.choice(crystal_systems)
    self.spacegroup = '1'
    self.code_name = random.choice(codes)
    self.code_version = '1.0.0'

    return self
if __name__ == '__main__':
    import time

    n = 2
    start = time.time()
    for pid in range(0, n):
        calc = generate_calc(pid)
        print(calc.to_dict())

    print('%f' % ((time.time() - start) / n))
@@ -14,8 +14,8 @@
 from elasticsearch_dsl import Q

-from nomad import datamodel, search, processing, parsing
-from nomad.search import Entry
+from nomad import datamodel, search, processing, parsing, infrastructure, config, coe_repo
+from nomad.search import Entry, aggregate_search, authors


 def test_init_mapping(elastic):
@@ -47,12 +47,44 @@ def test_index_upload(elastic, processed: processing.Upload):
     pass


+def test_search(elastic, normalized: parsing.LocalBackend):
+    calc_with_metadata = normalized.to_calc_with_metadata()
+    create_entry(calc_with_metadata)
+    refresh_index()
+
+    total, hits, aggs = aggregate_search()
+    assert total == 1
+    assert hits[0]['calc_id'] == calc_with_metadata.calc_id
+    assert 'Bulk' in aggs['system']
+    assert aggs['system']['Bulk'] == 1
+
+
+def test_authors(elastic, normalized: parsing.LocalBackend, test_user: coe_repo.User, other_test_user: coe_repo.User):
+    calc_with_metadata = normalized.to_calc_with_metadata()
+    calc_with_metadata.uploader = test_user.to_popo()
+    create_entry(calc_with_metadata)
+
+    calc_with_metadata.calc_id = 'other test calc'
+    calc_with_metadata.uploader = other_test_user.to_popo()
+    create_entry(calc_with_metadata)
+
+    refresh_index()
+
+    results, after = authors(per_page=1)
+    assert len(results) == 1
+    name = list(results.keys())[0]
+    assert after == name
+
+
+def refresh_index():
+    infrastructure.elastic_client.indices.refresh(index=config.elastic.index_name)
+
+
 def create_entry(calc_with_metadata: datamodel.CalcWithMetadata):
-    search.Entry.from_calc_with_metadata(calc_with_metadata).save(refresh=True)
+    search.Entry.from_calc_with_metadata(calc_with_metadata).save()
     assert_entry(calc_with_metadata.calc_id)


 def assert_entry(calc_id):
+    refresh_index()
     calc = Entry.get(calc_id)
     assert calc is not None
@@ -63,6 +95,7 @@ def assert_entry(calc_id):
 def assert_search_upload(upload_id, published: bool = False):
+    refresh_index()
     search = Entry.search().query('match_all')[0:10]

     if search.count() > 0:
         for hit in search:
@@ -73,3 +106,22 @@ def assert_search_upload(upload_id, published: bool = False):
             for coauthor in hit.get('coauthors', []):
                 assert coauthor.get('name', None) is not None
+
+
+if __name__ == '__main__':
+    from test_datamodel import generate_calc
+    from elasticsearch.helpers import bulk
+    import sys
+
+    print('Generate index with random example calculation data. First arg is the number of items.')
+
+    infrastructure.setup_elastic()
+
+    n = 100
+    if len(sys.argv) > 1:
+        n = int(sys.argv[1])
+
+    def gen_data():
+        for pid in range(0, n):
+            calc = generate_calc(pid)
+            calc = Entry.from_calc_with_metadata(calc)
+            yield calc.to_dict(include_meta=True)
+
+    bulk(infrastructure.elastic_client, gen_data())