# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module represents calculations in Elasticsearch.
"""

from typing import Iterable, Dict, Tuple, List
from elasticsearch_dsl import Document, InnerDoc, Keyword, Text, Date, \
    Object, Boolean, Search, Q, A, analyzer, tokenizer
from elasticsearch_dsl.document import IndexMeta
import elasticsearch.helpers
from datetime import datetime

from nomad import config, datamodel, infrastructure, coe_repo, utils

path_analyzer = analyzer(
    'path_analyzer',
    tokenizer=tokenizer('path_tokenizer', 'pattern', pattern='/'))
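
# Illustrative note: the pattern tokenizer above splits indexed values on '/',
# so a file path like 'upload/dir/vasprun.xml' is indexed as the tokens
# 'upload', 'dir' and 'vasprun.xml'. The 'paths' search quantity defined
# further below relies on this to match individual path elements.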


class AlreadyExists(Exception): pass


class ElasticSearchError(Exception): pass


class User(InnerDoc):

    @classmethod
    def from_user_popo(cls, user):
        self = cls(user_id=user.id)

        if 'first_name' not in user:
            user = coe_repo.User.from_user_id(user.id).to_popo()

        name = '%s, %s' % (user['last_name'], user['first_name'])
        self.name = name

        return self

    user_id = Keyword()
    name = Text(fields={'keyword': Keyword()})


class Dataset(InnerDoc):

    @classmethod
    def from_dataset_popo(cls, dataset):
        return cls(
            id=dataset.id,
            doi=dataset.doi.value if dataset.doi is not None else None,
            name=dataset.name)

    id = Keyword()
    doi = Keyword()
    name = Keyword()


class WithDomain(IndexMeta):
    """ Override elasticsearch_dsl metaclass to sneak in domain specific mappings """
    def __new__(cls, name, bases, attrs):
        for quantity in datamodel.Domain.instance.quantities.values():
            attrs[quantity.name] = quantity.elastic_mapping
        return super(WithDomain, cls).__new__(cls, name, bases, attrs)
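

# For example (hypothetical domain quantity): if the active domain defines a
# quantity named 'formula' with a ``Keyword`` elastic mapping, the Entry class
# below effectively gains a ``formula = Keyword()`` attribute without it being
# spelled out in the class body.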


class Entry(Document, metaclass=WithDomain):

    class Index:
        name = config.elastic.index_name

    upload_id = Keyword()
    upload_time = Date()
    calc_id = Keyword()
    calc_hash = Keyword()
    pid = Keyword()
    mainfile = Keyword()
    files = Text(multi=True, analyzer=path_analyzer, fields={'keyword': Keyword()})
    uploader = Object(User)

    with_embargo = Boolean()
    published = Boolean()

    processed = Boolean()
    last_processing = Date()
    nomad_version = Keyword()
    nomad_commit = Keyword()

    authors = Object(User, multi=True)
    owners = Object(User, multi=True)
    comment = Text()
    references = Keyword()
    datasets = Object(Dataset)

    @classmethod
    def from_calc_with_metadata(cls, source: datamodel.CalcWithMetadata) -> 'Entry':
        entry = Entry(meta=dict(id=source.calc_id))
        entry.update(source)
        return entry

    def update(self, source: datamodel.CalcWithMetadata) -> None:
        self.upload_id = source.upload_id
        self.upload_time = source.upload_time
        self.calc_id = source.calc_id
        self.calc_hash = source.calc_hash
        self.pid = None if source.pid is None else str(source.pid)

        self.processed = source.processed
        self.last_processing = source.last_processing
        self.nomad_version = source.nomad_version
        self.nomad_commit = source.nomad_commit

        self.mainfile = source.mainfile
        if source.files is None:
            self.files = [self.mainfile]
        elif self.mainfile not in source.files:
            self.files = [self.mainfile] + source.files
        else:
            self.files = source.files

        self.uploader = User.from_user_popo(source.uploader) if source.uploader is not None else None

        self.with_embargo = source.with_embargo
        self.published = source.published
        self.authors = [User.from_user_popo(user) for user in source.coauthors]
        self.owners = [User.from_user_popo(user) for user in source.shared_with]
        if self.uploader is not None:
            self.authors.append(self.uploader)
            self.owners.append(self.uploader)
        self.comment = source.comment
        self.references = [ref.value for ref in source.references]
        self.datasets = [Dataset.from_dataset_popo(ds) for ds in source.datasets]

        for quantity in datamodel.Domain.instance.quantities.keys():
            setattr(self, quantity, getattr(source, quantity))
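

# Minimal usage sketch (illustrative; assumes a connected elastic client and a
# populated ``datamodel.CalcWithMetadata`` instance named ``calc``):
#
#     entry = Entry.from_calc_with_metadata(calc)
#     entry.save()  # ``Document.save`` persists the entry to the index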


def delete_upload(upload_id):
    """ Delete all entries with given ``upload_id`` from the index. """
    index = Entry._default_index()
    Search(index=index).query('match', upload_id=upload_id).delete()


def publish(calcs: Iterable[datamodel.CalcWithMetadata]) -> None:
    """ Update all given calcs with their metadata and set ``publish = True``. """
    def elastic_updates():
        for calc in calcs:
            entry = Entry.from_calc_with_metadata(calc)
            entry.published = True
            entry = entry.to_dict(include_meta=True)
            source = entry.pop('_source')
            entry['doc'] = source
            entry['_op_type'] = 'update'
            yield entry
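
    # Each action yielded above has the standard ``elasticsearch.helpers`` bulk
    # update shape, roughly (illustrative):
    #
    #     {'_op_type': 'update', '_index': config.elastic.index_name,
    #      '_id': '<calc_id>', 'doc': {...entry fields...}}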

    elasticsearch.helpers.bulk(infrastructure.elastic_client, elastic_updates())
    refresh()


def refresh():
    infrastructure.elastic_client.indices.refresh(config.elastic.index_name)


aggregations = datamodel.Domain.instance.aggregations
""" The available aggregations in :func:`aggregate_search` and their maximum aggregation size """

search_quantities = {
    'authors': ('term', 'authors.name.keyword', (
        'Search for the given author. Exact keyword matches in the form "Lastname, Firstname".')),

    'comment': ('match', 'comment', 'Search within the comments. This is a text search à la Google.'),
    'paths': ('match', 'files', (
        'Search for elements in one of the file paths. The paths are split at all "/".')),

    'files': ('term', 'files.keyword', 'Search for exact file name with full path.'),
    'quantities': ('term', 'quantities', 'Search for the existence of a certain meta-info quantity.'),
    'upload_id': ('term', 'upload_id', 'Search for the upload_id.'),
    'calc_id': ('term', 'calc_id', 'Search for the calc_id.'),
    'pid': ('term', 'pid', 'Search for the pid.'),
    'mainfile': ('term', 'mainfile', 'Search for the mainfile.')
}
"""
The available search quantities in :func:`aggregate_search` as tuples with *search type*,
elastic field and description.
"""

for quantity in datamodel.Domain.instance.quantities.values():
    search_spec = ('term', quantity.name, quantity.description)
    search_quantities[quantity.name] = search_spec


metrics = {
    'datasets': ('cardinality', 'datasets.id'),
    'unique_code_runs': ('cardinality', 'calc_hash')
}
"""
The available search metrics. Metrics are integer values given for each entry that can
be used in aggregations, e.g. the sum of all total energy calculations or cardinality of
all unique geometries.
"""

for key, value in datamodel.Domain.instance.metrics.items():
    metrics[key] = value

metrics_names = list(metrics.keys())


order_default_quantity = None
for quantity in datamodel.Domain.instance.quantities.values():
    if quantity.order_default:
        order_default_quantity = quantity.name


def _construct_search(q: Q = None, time_range: Tuple[datetime, datetime] = None, **kwargs) -> Search:
    search = Search(index=config.elastic.index_name)

    if q is not None:
        search = search.query(q)

    if time_range is not None:
        search = search.query('range', upload_time=dict(gte=time_range[0], lte=time_range[1]))

    for key, value in kwargs.items():
        query_type, field, _ = search_quantities.get(key, (None, None, None))
        if query_type is None:
            raise KeyError('Unknown quantity %s' % key)

        if isinstance(value, list):
            values = value
        else:
            values = [value]

        for item in values:
            # not every search quantity is a domain quantity; guard the lookup
            domain_quantity = datamodel.Domain.instance.quantities.get(key)
            if domain_quantity is not None and domain_quantity.multi:
                items = item.split(',')
            else:
                items = [item]
            for item in items:
                search = search.query(Q(query_type, **{field: item}))

    search = search.source(exclude=['quantities'])

    return search
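

# Usage sketch (values are hypothetical):
#
#     search = _construct_search(
#         q=Q('term', published=True), upload_id='some-upload-id')
#
# combines a free elasticsearch_dsl query with the 'upload_id' search quantity
# from ``search_quantities``.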


def scroll_search(
        scroll_id: str = None, size: int = 1000, scroll: str = u'5m',
        q: Q = None, **kwargs) -> Tuple[str, int, List[dict]]:
    """
    Alternative search based on the ES scroll API. It can be used similarly to
    :func:`aggregate_search`, but pagination is replaced with scrolling, there is
    no ordering, and no aggregation information is given.

    Scrolling is done by calling this function again and again with the same ``scroll_id``.
    Each time, this function will return the next batch of search results.

    See :func:`aggregate_search` for additional ``kwargs``.

    Arguments:
        scroll_id: The scroll id to receive the next batch from. None will create a new
            scroll.
        size: The batch size in number of hits.
        scroll: The time the scroll should be kept alive (i.e. the time between requests
            to this method) in ES time units. Default is 5 minutes.
    """
    es = infrastructure.elastic_client

    if scroll_id is None:
        # initiate scroll
        search = _construct_search(q, **kwargs)
        resp = es.search(body=search.to_dict(), scroll=scroll, size=size, index=config.elastic.index_name)  # pylint: disable=E1123

        scroll_id = resp.get('_scroll_id')
        if scroll_id is None:
            # no results for search query
            return None, 0, []
    else:
        resp = es.scroll(scroll_id, scroll=scroll)  # pylint: disable=E1123

    total = resp['hits']['total']
    results = [hit['_source'] for hit in resp['hits']['hits']]

    # since we are using the low level api here, we should check errors
    if resp["_shards"]["successful"] < resp["_shards"]["total"]:
        utils.get_logger(__name__).error('es operation was unsuccessful on at least one shard')
        raise ElasticSearchError('es operation was unsuccessful on at least one shard')

    if len(results) == 0:
        es.clear_scroll(body={'scroll_id': [scroll_id]}, ignore=(404, ))  # pylint: disable=E1123
        return None, total, []

    return scroll_id, total, results
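

# Typical scroll loop (illustrative sketch; ``process`` stands in for any
# consumer of the results):
#
#     scroll_id, total, results = scroll_search(q=some_query)
#     while len(results) > 0:
#         process(results)
#         scroll_id, total, results = scroll_search(scroll_id=scroll_id)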


def aggregate_search(
        page: int = 1, per_page: int = 10, order_by: str = order_default_quantity, order: int = -1,
        q: Q = None,
        time_range: Tuple[datetime, datetime] = None,
        aggregations: Dict[str, int] = aggregations,
        aggregation_metrics: List[str] = [],
        total_metrics: List[str] = [],
        **kwargs) -> Tuple[int, List[dict], Dict[str, Dict[str, Dict[str, int]]], Dict[str, int]]:
    """
    Performs a search and returns paginated search results and aggregations. The aggregations
    contain overall and per quantity value sums of code runs (calcs), unique code runs, datasets,
    and additional domain specific metrics (e.g. total energies, and unique geometries for DFT
    calculations).

    Arguments:
        page: The page to return starting with page 1
        per_page: Results per page
        q: An *elasticsearch_dsl* query used to further filter the results (via ``and``)
        time_range: A tuple of start and end ``upload_time`` to filter uploads within that range.
        aggregations: A customized list of aggregations to perform. Keys are index fields,
            and values the number of buckets to return. Only works on *keyword* fields.
        aggregation_metrics: The metrics used to aggregate over. Can be ``unique_code_runs``, ``datasets``,
            or other domain-specific metrics. The basic doc_count metric ``code_runs`` is always given.
        total_metrics: The metrics used for the total numbers (see ``aggregation_metrics``).
        **kwargs: Quantity, value pairs to search for.

    Returns: A tuple with the total hits, an array with the results, a dictionary with
        the aggregation data, and a dictionary with the overall metrics.
    """

    search = _construct_search(q, time_range, **kwargs)

    def add_metrics(parent, metrics_to_add):
        for metric in metrics_to_add:
            metric_kind, field = metrics[metric]
            parent.metric(metric, A(metric_kind, field=field))

    for aggregation, size in aggregations.items():

        if aggregation == 'authors':
            a = A('terms', field='authors.name.keyword', size=size)
        else:
            a = A('terms', field=aggregation, size=size, min_doc_count=0, order=dict(_key='asc'))

        buckets = search.aggs.bucket(aggregation, a)
        add_metrics(buckets, aggregation_metrics)

    add_metrics(search.aggs, total_metrics)

    if order_by not in search_quantities:
        raise KeyError('Unknown order quantity %s' % order_by)
    search = search.sort(order_by if order == 1 else '-%s' % order_by)

    response = search[(page - 1) * per_page: page * per_page].execute()  # pylint: disable=E1101

    total_results = response.hits.total
    search_results = [hit.to_dict() for hit in response.hits]

    def get_metrics(bucket, metrics_to_get, code_runs):
        result = {
            metric: bucket[metric]['value']
            for metric in metrics_to_get
        }
        result.update(code_runs=code_runs)
        return result

    aggregation_results = {
        aggregation: {
            bucket.key: get_metrics(bucket, aggregation_metrics, bucket.doc_count)
            for bucket in getattr(response.aggregations, aggregation).buckets
        }
        for aggregation in aggregations.keys()
        if aggregation not in metrics_names
    }

    total_metrics_result = get_metrics(response.aggregations, total_metrics, total_results)

    return total_results, search_results, aggregation_results, total_metrics_result
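

# Illustrative call (quantity values are hypothetical):
#
#     total, results, aggs, totals = aggregate_search(
#         page=1, per_page=10, aggregation_metrics=['datasets'],
#         upload_id='some-upload-id')
#
# ``aggs`` maps each aggregated quantity to its buckets with per-bucket metrics;
# ``totals`` holds the overall metrics, including the ``code_runs`` count.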


def authors(per_page: int = 10, after: str = None, prefix: str = None) -> Tuple[Dict[str, int], str]:
    """
    Returns the name field for all authors with the number of their calculations in
    their natural order.

    The author buckets of :func:`aggregate_search` are limited to the top 10
    authors. This function, in contrast, allows paginating through all authors.

    Arguments:
        per_page: The number of authors to return
        after: Only return the authors after the given ``name.keyword`` field value
            (for pagination).
        prefix: Used to do a prefix search on authors. Be aware that the returned
            authors also include the co-authors of the authors matching the prefix.

    Returns: A tuple with an ordered dict containing author ``name.keyword`` field
        values and calculation counts, and the ``name.keyword`` value of the last
        author (to be used as the next ``after`` value).
    """
    composite = dict(
        size=per_page,
        sources=dict(authors=dict(terms=dict(field='authors.name.keyword'))))
    if after is not None:
        composite.update(after=dict(authors=after))

    body = dict(size=0, aggs=dict(authors=dict(composite=composite)))
    if prefix is not None:
        body.update(query=dict(prefix={'authors.name': prefix}))

    response = infrastructure.elastic_client.search(index=config.elastic.index_name, body=body)
    response = response['aggregations']['authors']
    return {
        bucket['key']['authors']: bucket['doc_count']
        for bucket in response['buckets']}, response.get('after_key', {'authors': None})['authors']
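

# Pagination sketch (illustrative): walk through all authors in batches.
#
#     buckets, after = authors(per_page=100)
#     while after is not None:
#         more, after = authors(per_page=100, after=after)
#         buckets.update(more)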