From bb57b6bc7d62b73878610f5041e4c4e6d048b266 Mon Sep 17 00:00:00 2001
From: Markus Scheidgen <markus.scheidgen@gmail.com>
Date: Sun, 17 Feb 2019 22:36:46 +0100
Subject: [PATCH] Implemented basic aggregation-based search functions.

---
 nomad/api/repo.py        |   6 +-
 nomad/processing/data.py |   9 +--
 nomad/search.py          | 169 ++++++++++++++++++++++++++++++---------
 requirements.txt         |   1 +
 tests/test_datamodel.py  | 106 ++++++++++++++++++++++++
 tests/test_search.py     |  58 +++++++++++++-
 6 files changed, 300 insertions(+), 49 deletions(-)
 create mode 100644 tests/test_datamodel.py

diff --git a/nomad/api/repo.py b/nomad/api/repo.py
index 292ba8eecb..76b4c734df 100644
--- a/nomad/api/repo.py
+++ b/nomad/api/repo.py
@@ -99,16 +99,16 @@ class RepoCalcsResource(Resource):
             if g.user is None:
                 q = Q('term', published=True)
             else:
-                q = Q('term', published=True) | Q('term', uploader__user_id=g.user.user_id)
+                q = Q('term', published=True) | Q('term', owners__user_id=g.user.user_id)
         elif owner == 'user':
             if g.user is None:
                 abort(401, message='Authentication required for owner value user.')
-            q = Q('term', uploader__user_id=g.user.user_id)
+            q = Q('term', owners__user_id=g.user.user_id)
         elif owner == 'staging':
             if g.user is None:
-                abort(401, message='Authentication required for owner value user.')
-            q = Q('term', published=False) & Q('term', uploader__user_id=g.user.user_id)
+                abort(401, message='Authentication required for owner value staging.')
+            q = Q('term', published=False) & Q('term', owners__user_id=g.user.user_id)
         else:
             abort(400, message='Invalid owner value. Valid values are all|user|staging, default is all')

diff --git a/nomad/processing/data.py b/nomad/processing/data.py
index 0f0fb24ad8..c8616954c2 100644
--- a/nomad/processing/data.py
+++ b/nomad/processing/data.py
@@ -364,7 +364,7 @@ class Upload(Proc):
         with utils.timer(
                 logger, 'upload deleted from index', step='delete',
                 upload_size=self.upload_files.size):
-            search.Entry.delete_upload(self.upload_id)
+            search.delete_upload(self.upload_id)

         with utils.timer(
                 logger, 'staged upload deleted', step='delete',
@@ -402,11 +402,8 @@ class Upload(Proc):
                 upload_size=self.upload_files.size):
             coe_upload = coe_repo.Upload.from_upload_id(upload_with_metadata.upload_id)
             if coe_upload is not None:
-                for coe_calc in coe_upload.calcs:
-                    entry = search.Entry.from_calc_with_metadata(
-                        coe_calc.to_calc_with_metadata())
-                    entry.published = True
-                    entry.save(refresh=True)
+                search.publish(
+                    [coe_calc.to_calc_with_metadata() for coe_calc in coe_upload.calcs])

         with utils.timer(
                 logger, 'staged upload deleted', step='publish',
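For orientation, the owner filters in repo.py above compose *elasticsearch_dsl* `Q` objects, where `|` means *or* and `&` means *and*. A minimal sketch (not part of the patch) of the three owner modes against the new `owners` field, with a hypothetical `user_id`:

    from elasticsearch_dsl import Q

    user_id = 42  # hypothetical authenticated user's id

    # owner=all: everything published, plus everything this user owns
    q_all = Q('term', published=True) | Q('term', owners__user_id=user_id)
    # owner=user: everything this user owns, published or not
    q_user = Q('term', owners__user_id=user_id)
    # owner=staging: only this user's unpublished (staging) entries
    q_staging = Q('term', published=False) & Q('term', owners__user_id=user_id)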
""" +from typing import Iterable, Dict, Tuple, List from elasticsearch_dsl import Document, InnerDoc, Keyword, Text, Date, \ - Object, Boolean, Search, Integer + Object, Boolean, Search, Integer, Q, A +import elasticsearch.helpers +import ase.data -from nomad import config, datamodel, infrastructure, datamodel, coe_repo +from nomad import config, datamodel, infrastructure, datamodel, coe_repo, parsing class AlreadyExists(Exception): pass @@ -34,8 +37,9 @@ class User(InnerDoc): if 'first_name' not in user: user = coe_repo.User.from_user_id(user.id).to_popo() - self.name = '%s %s' % (user['first_name'], user['last_name']) - self.name_keyword = '%s %s' % (user['first_name'], user['last_name']) + name = '%s, %s' % (user['last_name'], user['first_name']) + self.name = name + self.name_keyword = name return self @@ -74,8 +78,8 @@ class Entry(Document): with_embargo = Boolean() published = Boolean() - coauthors = Object(User) - shared_with = Object(User) + authors = Object(User, multi=True) + owners = Object(User, multi=True) comment = Text() references = Keyword() datasets = Object(Dataset) @@ -95,6 +99,11 @@ class Entry(Document): geometries = Keyword(multi=True) quantities = Keyword(multi=True) + # def __init__(self, *args, **kwargs): + # super().__init__(*args, **kwargs) + # self.authors = [] + # self.owners = [] + @classmethod def from_calc_with_metadata(cls, source: datamodel.CalcWithMetadata) -> 'Entry': entry = Entry(meta=dict(id=source.calc_id)) @@ -113,8 +122,11 @@ class Entry(Document): self.with_embargo = source.with_embargo self.published = source.published - self.coauthors = [User.from_user_popo(user) for user in source.coauthors] - self.shared_with = [User.from_user_popo(user) for user in source.shared_with] + self.authors = [User.from_user_popo(user) for user in source.coauthors] + self.owners = [User.from_user_popo(user) for user in source.shared_with] + if self.uploader is not None: + self.authors.append(self.uploader) + self.owners.append(self.uploader) self.comment = source.comment self.references = [ref.value for ref in source.references] self.datasets = [Dataset.from_dataset_popo(ds) for ds in source.datasets] @@ -149,33 +161,116 @@ class Entry(Document): self.n_total_energies = n_total_energies self.n_geometries = n_geometries - @classmethod - def update_by_query(cls, upload_id, script): - """ Update all entries of a given upload via elastic script. """ - index = cls._default_index() - doc_type = cls._doc_type.name - conn = cls._get_connection() - body = { - 'script': script, - 'query': { - 'match': { - 'upload_id': upload_id - } - } - } - conn.update_by_query(index, doc_type=[doc_type], body=body) - @classmethod - def publish_upload(cls, upload: datamodel.UploadWithMetadata): - cls.update_by_query(upload.upload_id, 'ctx._source["published"] = true') - # TODO run update on all calcs with their new metadata - - @classmethod - def delete_upload(cls, upload_id): - index = cls._default_index() - Search(index=index).query('match', upload_id=upload_id).delete() - - @staticmethod - def es_search(body): - """ Perform an elasticsearch and not elasticsearch_dsl search on the Calc index. """ - return infrastructure.elastic_client.search(index=config.elastic.index_name, body=body) +def delete_upload(upload_id): + """ Delete all entries with given ``upload_id`` from the index. 
""" + index = Entry._default_index() + Search(index=index).query('match', upload_id=upload_id).delete() + + +def publish(calcs: Iterable[datamodel.CalcWithMetadata]) -> None: + """ Update all given calcs with their metadata and set ``publish = True``. """ + def elastic_updates(): + for calc in calcs: + entry = Entry.from_calc_with_metadata(calc) + entry.published = True + yield entry.to_dict(include_meta=True) + + elasticsearch.helpers.bulk(infrastructure.elastic_client, elastic_updates()) + + +default_aggregations = { + 'atoms': len(ase.data.chemical_symbols), + 'system': 10, + 'crystal_system': 10, + 'code_name': len(parsing.parsers), + 'xc_functional': 10, + 'authors': 10 +} + + +def aggregate_search( + page: int = 0, per_page: int = 10, q: Q = None, + aggregations: Dict[str, int] = default_aggregations, + **kwargs) -> Tuple[int, List[dict], Dict[str, Dict[str, int]]]: + """ + Performs a search and returns paginated search results and aggregation bucket sizes + based on key quantities. + + Arguments: + page: The page to return starting with 0 + per_page: Results per page + q: An *elasticsearch_dsl* query used to further filter the results (via `and`) + aggregations: A customized list of aggregations to perform. Keys are index fields, + and values the amount of buckets to return. Only works on *keyword* field. + **kwargs: Field, value pairs to search for. + + Returns: A tuple with the total hits, an array with the results, an dictionary with + the aggregation data. + """ + + search = Search() + if q is not None: + search.query(q) + for key, value in kwargs.items(): + if key == 'comment': + search = search.query(Q('match', **{key: value})) + else: + search = search.query(Q('term', **{key: value})) + + for aggregation, size in aggregations.items(): + if aggregation == 'authors': + search.aggs.bucket(aggregation, A('terms', field='authors.name_keyword', size=size)) + else: + search.aggs.bucket(aggregation, A('terms', field=aggregation, size=size)) + + response = search[page * per_page: (page + 1) * per_page].execute() # pylint: disable=no-member + + total_results = response.hits.total + search_results = [hit.to_dict() for hit in response.hits] + + aggregation_results = { + aggregation: { + bucket.key: bucket.doc_count + for bucket in getattr(response.aggregations, aggregation).buckets + } + for aggregation in aggregations.keys() + } + + return total_results, search_results, aggregation_results + + +def authors(per_page: int = 10, after: str = None, prefix: str = None) -> Tuple[Dict[str, int], str]: + """ + Returns the name field for all authors with the number of their calculations in + their natural order. + + The author buckets of :func:`search` is limit to the top 10 author. This function + in contrast, allows to paginate through all authors. + + Arguments: + per_page: The number of authors to return + after: Only return the authors after the given ``name_keyword`` field value + (for pagination). + prefix: Used to do a prefix search on authors. Be aware the return authors also + contain the coauthors of the actual authors with prefix. + + Returns: A tuple with an ordered dict containing author ``name_keyword`` field value + and number of calculations and the ``name_keyword`` value of the last author + (to be used as the next ``after`` value). 
+ """ + composite = dict( + size=per_page, + sources=dict(authors=dict(terms=dict(field='authors.name_keyword')))) + if after is not None: + composite.update(after=dict(authors=after)) + + body = dict(size=0, aggs=dict(authors=dict(composite=composite))) + if prefix is not None: + body.update(query=dict(prefix={'authors.name': prefix})) + + response = infrastructure.elastic_client.search(index=config.elastic.index_name, body=body) + response = response['aggregations']['authors'] + return { + bucket['key']['authors']: bucket['doc_count'] + for bucket in response['buckets']}, response.get('after_key', {'authors': None})['authors'] diff --git a/requirements.txt b/requirements.txt index d733c14988..fbf43a5253 100644 --- a/requirements.txt +++ b/requirements.txt @@ -61,3 +61,4 @@ pytest-timeout pytest-cov rope mongomock +names diff --git a/tests/test_datamodel.py b/tests/test_datamodel.py new file mode 100644 index 0000000000..fe9f4c718b --- /dev/null +++ b/tests/test_datamodel.py @@ -0,0 +1,106 @@ +# Copyright 2018 Markus Scheidgen +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an"AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import names +import random +from essential_generators import DocumentGenerator +import datetime +from ase.data import chemical_symbols + +from nomad import datamodel, parsing, utils + +number_of = 20 + +random.seed(0) +gen = DocumentGenerator() + +users = [(i + 1, names.get_first_name(), names.get_last_name(), gen.email()) for i in range(0, number_of)] +basis_sets = ['Numeric AOs', 'Gaussians', '(L)APW+lo', 'Plane waves'] +xc_functionals = ['LDA', 'GGA', 'hybrid', 'meta-GGA', 'GW', 'unknown'] +crystal_systems = ['triclinic', 'monoclinic', 'orthorombic', 'tetragonal', 'hexagonal', 'cubic'] +systems = ['atom', 'molecule/cluster', '2D/surface', 'bulk'] +comments = [gen.sentence() for _ in range(0, number_of)] +references = [(i + 1, gen.url()) for i in range(0, number_of)] +datasets = [(i + 1, gen.slug()) for i in range(0, number_of)] +codes = [parser[8:] for parser in parsing.parser_dict.keys()] +files = ['/'.join(gen.url().split('/')[3:]) for _ in range(0, number_of)] + +low_numbers_for_atoms = [1, 1, 2, 2, 2, 2, 2, 3, 3, 4] +low_numbers_for_refs_and_datasets = [0, 0, 0, 0, 1, 1, 1, 2] + + +def _gen_user(): + id, first, last, email = random.choice(users) + return utils.POPO(id=id, first_name=first, last_name=last, email=email) + + +def _gen_dataset(): + id, name = random.choice(datasets) + return utils.POPO(id=id, name=name, doi=_gen_ref()) + + +def _gen_ref(): + id, value = random.choice(references) + return utils.POPO(id=id, value=value) + + +def generate_calc(pid: int = 0) -> datamodel.CalcWithMetadata: + random.seed(pid) + + self = datamodel.CalcWithMetadata() + + self.upload_id = utils.create_uuid() + self.calc_id = utils.create_uuid() + + self.upload_time = datetime.datetime.now() + self.calc_hash = utils.create_uuid() + self.pid = pid + self.mainfile = random.choice(files) + self.files = list([self.mainfile] + random.choices(files, k=random.choice(low_numbers_for_atoms))) + self.uploader = _gen_user() + + 
diff --git a/requirements.txt b/requirements.txt
index d733c14988..fbf43a5253 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -61,3 +61,4 @@ pytest-timeout
 pytest-cov
 rope
 mongomock
+names
diff --git a/tests/test_datamodel.py b/tests/test_datamodel.py
new file mode 100644
index 0000000000..fe9f4c718b
--- /dev/null
+++ b/tests/test_datamodel.py
@@ -0,0 +1,106 @@
+# Copyright 2018 Markus Scheidgen
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import names
+import random
+from essential_generators import DocumentGenerator
+import datetime
+from ase.data import chemical_symbols
+
+from nomad import datamodel, parsing, utils
+
+number_of = 20
+
+random.seed(0)
+gen = DocumentGenerator()
+
+users = [(i + 1, names.get_first_name(), names.get_last_name(), gen.email()) for i in range(0, number_of)]
+basis_sets = ['Numeric AOs', 'Gaussians', '(L)APW+lo', 'Plane waves']
+xc_functionals = ['LDA', 'GGA', 'hybrid', 'meta-GGA', 'GW', 'unknown']
+crystal_systems = ['triclinic', 'monoclinic', 'orthorhombic', 'tetragonal', 'hexagonal', 'cubic']
+systems = ['atom', 'molecule/cluster', '2D/surface', 'bulk']
+comments = [gen.sentence() for _ in range(0, number_of)]
+references = [(i + 1, gen.url()) for i in range(0, number_of)]
+datasets = [(i + 1, gen.slug()) for i in range(0, number_of)]
+codes = [parser[8:] for parser in parsing.parser_dict.keys()]
+files = ['/'.join(gen.url().split('/')[3:]) for _ in range(0, number_of)]
+
+low_numbers_for_atoms = [1, 1, 2, 2, 2, 2, 2, 3, 3, 4]
+low_numbers_for_refs_and_datasets = [0, 0, 0, 0, 1, 1, 1, 2]
+
+
+def _gen_user():
+    id, first, last, email = random.choice(users)
+    return utils.POPO(id=id, first_name=first, last_name=last, email=email)
+
+
+def _gen_dataset():
+    id, name = random.choice(datasets)
+    return utils.POPO(id=id, name=name, doi=_gen_ref())
+
+
+def _gen_ref():
+    id, value = random.choice(references)
+    return utils.POPO(id=id, value=value)
+
+
+def generate_calc(pid: int = 0) -> datamodel.CalcWithMetadata:
+    random.seed(pid)
+
+    self = datamodel.CalcWithMetadata()
+
+    self.upload_id = utils.create_uuid()
+    self.calc_id = utils.create_uuid()
+
+    self.upload_time = datetime.datetime.now()
+    self.calc_hash = utils.create_uuid()
+    self.pid = pid
+    self.mainfile = random.choice(files)
+    self.files = list([self.mainfile] + random.choices(files, k=random.choice(low_numbers_for_atoms)))
+    self.uploader = _gen_user()
+
+    self.with_embargo = random.choice([True, False])
+    self.published = True
+    self.coauthors = list(_gen_user() for _ in range(0, random.choice(low_numbers_for_refs_and_datasets)))
+    self.shared_with = list(_gen_user() for _ in range(0, random.choice(low_numbers_for_refs_and_datasets)))
+    self.comment = random.choice(comments)
+    self.references = list(_gen_ref() for _ in range(0, random.choice(low_numbers_for_refs_and_datasets)))
+    self.datasets = list(
+        _gen_dataset()
+        for _ in range(0, random.choice(low_numbers_for_refs_and_datasets)))
+
+    self.atoms = list(random.choices(chemical_symbols[1:], k=random.choice(low_numbers_for_atoms)))
+    self.formula = ''.join('%s%d' % (atom, random.choice(low_numbers_for_atoms)) for atom in self.atoms)
+    self.formula = self.formula.replace('1', '')
+
+    self.basis_set = random.choice(basis_sets)
+    self.xc_functional = random.choice(xc_functionals)
+    self.system = random.choice(systems)
+    self.crystal_system = random.choice(crystal_systems)
+    self.spacegroup = '1'
+    self.code_name = random.choice(codes)
+    self.code_version = '1.0.0'
+
+    return self
+
+
+if __name__ == '__main__':
+    import time
+    n = 2
+    start = time.time()
+    for pid in range(0, n):
+        calc = generate_calc(pid)
+        print(calc.to_dict())
+
+    print('%f' % ((time.time() - start) / n))
diff --git a/tests/test_search.py b/tests/test_search.py
index 5f930cfd06..c0419b2cb4 100644
--- a/tests/test_search.py
+++ b/tests/test_search.py
@@ -14,8 +14,8 @@
 
 from elasticsearch_dsl import Q
 
-from nomad import datamodel, search, processing, parsing
-from nomad.search import Entry
+from nomad import datamodel, search, processing, parsing, infrastructure, config, coe_repo
+from nomad.search import Entry, aggregate_search, authors
 
 
 def test_init_mapping(elastic):
@@ -47,12 +47,44 @@ def test_index_upload(elastic, processed: processing.Upload):
     pass
 
 
+def test_search(elastic, normalized: parsing.LocalBackend):
+    calc_with_metadata = normalized.to_calc_with_metadata()
+    create_entry(calc_with_metadata)
+    refresh_index()
+
+    total, hits, aggs = aggregate_search()
+    assert total == 1
+    assert hits[0]['calc_id'] == calc_with_metadata.calc_id
+    assert 'Bulk' in aggs['system']
+    assert aggs['system']['Bulk'] == 1
+
+
+def test_authors(elastic, normalized: parsing.LocalBackend, test_user: coe_repo.User, other_test_user: coe_repo.User):
+    calc_with_metadata = normalized.to_calc_with_metadata()
+    calc_with_metadata.uploader = test_user.to_popo()
+    create_entry(calc_with_metadata)
+    calc_with_metadata.calc_id = 'other test calc'
+    calc_with_metadata.uploader = other_test_user.to_popo()
+    create_entry(calc_with_metadata)
+    refresh_index()
+
+    results, after = authors(per_page=1)
+    assert len(results) == 1
+    name = list(results.keys())[0]
+    assert after == name
+
+
+def refresh_index():
+    infrastructure.elastic_client.indices.refresh(index=config.elastic.index_name)
+
+
 def create_entry(calc_with_metadata: datamodel.CalcWithMetadata):
-    search.Entry.from_calc_with_metadata(calc_with_metadata).save(refresh=True)
+    search.Entry.from_calc_with_metadata(calc_with_metadata).save()
     assert_entry(calc_with_metadata.calc_id)
 
 
 def assert_entry(calc_id):
+    refresh_index()
     calc = Entry.get(calc_id)
     assert calc is not None
 
@@ -63,6 +95,7 @@
 
 def assert_search_upload(upload_id, published: bool = False):
+    refresh_index()
     search = Entry.search().query('match_all')[0:10]
     if search.count() > 0:
         for hit in search:
@@ -73,3 +106,22 @@
-            for coauthor in hit.get('coauthors', []):
-                assert coauthor.get('name', None) is not None
+            for author in hit.get('authors', []):
+                assert author.get('name', None) is not None
+
+
+if __name__ == '__main__':
+    from test_datamodel import generate_calc
+    from elasticsearch.helpers import bulk
+    import sys
+    print('Generates an index of random example calculation data; the first argument is the number of items.')
+    infrastructure.setup_elastic()
+    n = 100
+    if len(sys.argv) > 1:
+        n = int(sys.argv[1])
+
+    def gen_data():
+        for pid in range(0, n):
+            calc = generate_calc(pid)
+            calc = Entry.from_calc_with_metadata(calc)
+            yield calc.to_dict(include_meta=True)
+
+    bulk(infrastructure.elastic_client, gen_data())
-- 
GitLab