# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

'''
This module represents calculations in elastic search.
'''

from typing import Iterable, Dict, List, Any
from elasticsearch_dsl import Search, Q, A, analyzer, tokenizer
import elasticsearch.helpers
from elasticsearch.exceptions import NotFoundError
from datetime import datetime
import json

from nomad import config, datamodel, infrastructure, datamodel, utils
from nomad.metainfo.search_extension import search_quantities, metrics, order_default_quantities, default_statistics

# Analyzer that splits path-like values (e.g. 'a/b/c') into their segments,
# so searches can match on individual path components.
_path_tokenizer = tokenizer('path_tokenizer', 'pattern', pattern='/')
path_analyzer = analyzer('path_analyzer', tokenizer=_path_tokenizer)

class AlreadyExists(Exception):
    ''' Raised when an index entry that is being created already exists. '''
    pass

class ElasticSearchError(Exception):
    ''' Raised when an elastic search operation fails (e.g. on a shard). '''
    pass

class ScrollIdNotFound(Exception):
    ''' Raised when a scroll id is no longer known to elastic search. '''
    pass

# The elasticsearch_dsl document class generated from the entry metadata definition.
entry_document = datamodel.EntryMetadata.m_def.a_elastic.document

# Ensure that every domain has an order-default quantity and a default statistics
# list by falling back to (and extending with) the '__all__' entries.
for domain_name in datamodel.domains:
    order_default_quantities.setdefault(domain_name, order_default_quantities.get('__all__'))
    default_statistics.setdefault(domain_name, []).extend(default_statistics.get('__all__'))

def delete_upload(upload_id):
    ''' Delete all entries with given ``upload_id`` from the index. '''
    upload_search = Search(index=entry_document._default_index())
    upload_search.query('match', upload_id=upload_id).delete()

def delete_entry(calc_id):
    ''' Delete the entry with the given ``calc_id`` from the index. '''
    entry_search = Search(index=entry_document._default_index())
    entry_search.query('match', calc_id=calc_id).delete()

def publish(calcs: Iterable[datamodel.EntryMetadata]) -> None:
    ''' Update all given calcs with their metadata and set ``publish = True``. '''
    def elastic_updates():
        # Generate partial-update actions for the bulk helper.
        for calc in calcs:
            index_entry = calc.a_elastic.create_index_entry()
            index_entry.published = True
            action = index_entry.to_dict(include_meta=True)
            # Bulk 'update' actions carry the document under 'doc', not '_source'.
            action['doc'] = action.pop('_source')
            action['_op_type'] = 'update'
            yield action

    elasticsearch.helpers.bulk(infrastructure.elastic_client, elastic_updates())
    refresh()

def index_all(calcs: Iterable[datamodel.EntryMetadata], do_refresh=True) -> int:
    '''
    Adds all given calcs with their metadata to the index.

    Arguments:
        calcs: The entries to index.
        do_refresh: If True (the default), refresh the index after the bulk insert
            so the new entries become visible to searches immediately.

    Returns:
        Number of failed entries.
    '''
    # Note: the return annotation was previously ``-> None`` although the
    # function returns the failed count; fixed to ``-> int``.
    def elastic_updates():
        # Generate 'index' actions for the bulk helper.
        for calc in calcs:
            entry = calc.a_elastic.create_index_entry()
            entry = entry.to_dict(include_meta=True)
            entry['_op_type'] = 'index'
            yield entry

    # stats_only=True makes bulk() return (success_count, failed_count).
    _, failed = elasticsearch.helpers.bulk(
        infrastructure.elastic_client, elastic_updates(), stats_only=True)

    if do_refresh:
        refresh()

    return failed

def refresh():
    ''' Refresh the search index so recent changes become visible to queries. '''
    elastic = infrastructure.elastic_client
    elastic.indices.refresh(config.elastic.index_name)

class SearchRequest:
    '''
    Represents a search request and allows to execute that request.
    It allows to compose the following features: a query;
    statistics (metrics and aggregations); quantity values; scrolling, pagination for entries;
    scrolling for quantity values.

    The query part filters NOMAD data before the other features come into effect. There
    are specialized methods for configuring the :func:`owner` and :func:`time_range` queries.
    Quantities can be searched for by setting them as attributes.

    The aggregations for statistics can be requested for pre-configured quantities. These
    bucket aggregations come with a metric calculated for each possible
    quantity value.

    The other possible form of aggregations allows to get quantity values as results
    (e.g. get all datasets, get all users, etc.). Each value can be accompanied by metrics
    (over all entries with that value) and an example value.

    Of course, searches can return a set of search results. Search objects can be
    configured with pagination or scrolling for these results. Pagination is the default
    and also allows ordering of results. Scrolling can be used if all entries need to be
    'scrolled through'. This might be necessary, since elastic search has limits on
    possible pages (e.g. 'from' must by smaller than 10000). On the downside, there is no
    ordering on scrolling.

    There is also scrolling for quantities to go through all quantity values. There is no
    paging for aggregations.
    '''
    def __init__(self, domain: str = config.default_domain, query=None):
        # The domain (e.g. dft) this request is restricted to; see :func:`domain`.
        self._domain = domain
        # The elasticsearch_dsl query object; None is treated as 'match_all'
        # by the :attr:`q` property.
        self._query = query
        # The underlying Search object all features are composed onto.
        self._search = Search(index=config.elastic.index_name)

    def domain(self, domain: str = None):
        '''
        Applies the domain of this request to the query. Allows to optionally update
        the domain of this request.
        '''
        self._domain = self._domain if domain is None else domain
        self.q &= Q('term', domain=self._domain)
        return self

    def owner(self, owner_type: str = 'all', user_id: str = None):
        '''
        Uses the query part of the search to restrict the results based on the owner.
        The possible types are: ``all`` for all calculations; ``public`` for
        calculations visible by everyone, excluding embargo-ed entries and entries only visible
        to the given user; ``visible`` all data that is visible by the user, excluding
        embargo-ed entries from other users; ``user`` for all calculations of to the given
        user; ``staging`` for all calculations in staging of the given user.

        Arguments:
            owner_type: The type of the owner query, see above.
            user_id: The 'owner' given as the user's unique id.

        Raises:
            KeyError: If the given owner_type is not supported
            ValueError: If the owner_type requires a user but none is given, or the
                given user is not allowed to use the given owner_type.
        '''
        if owner_type == 'all':
            q = Q('term', published=True)
            if user_id is not None:
                q = q | Q('term', owners__user_id=user_id)
        elif owner_type == 'public':
            q = Q('term', published=True) & Q('term', with_embargo=False)
        elif owner_type == 'visible':
            q = Q('term', published=True) & Q('term', with_embargo=False)
            if user_id is not None:
                q = q | Q('term', owners__user_id=user_id)
        elif owner_type == 'shared':
            if user_id is None:
                raise ValueError('Authentication required for owner value shared.')

            q = Q('term', owners__user_id=user_id)
        elif owner_type == 'user':
            if user_id is None:
                raise ValueError('Authentication required for owner value user.')

            q = Q('term', uploader__user_id=user_id)
        elif owner_type == 'staging':
            if user_id is None:
                # fixed: the message previously said 'user' for the staging owner type
                raise ValueError('Authentication required for owner value staging.')
            q = Q('term', published=False) & Q('term', owners__user_id=user_id)
        elif owner_type == 'admin':
            if user_id is None or not datamodel.User.get(user_id=user_id).is_admin:
                raise ValueError('This can only be used by the admin user.')
            q = None
        else:
            raise KeyError('Unsupported owner value')

        if q is not None:
            self.q = self.q & q

        return self

    def search_parameters(self, **kwargs):
        '''
        Configures the existing query with additional search parameters. Kwargs are
        interpreted as key value pairs. Keys have to correspond to valid entry quantities
        in the domain's (DFT calculations) datamodel. Alternatively search parameters
        can be set via attributes.
        '''
        for parameter in kwargs.items():
            self.search_parameter(*parameter)

        return self

    def search_parameter(self, name, value):
        '''
        Adds a match for a single search parameter to the query.

        Arguments:
            name: The name of a quantity in :data:`search_quantities`.
            value: The value (or list of values) to match.

        Raises:
            KeyError: If the given name is not a valid search quantity.
        '''
        quantity = search_quantities[name]

        if quantity.many and not isinstance(value, list):
            value = [value]

        # 'many or' quantities match entries that have any of the given values
        if quantity.many_or and isinstance(value, list):
            self.q &= Q('terms', **{quantity.search_field: value})
            return self

        if quantity.derived:
            # Note: the many/list normalization already happened above; the
            # duplicate check that used to be here was dead code and is removed.
            value = quantity.derived(value)

        values = value if isinstance(value, list) else [value]

        # Each value must match (an 'and' over all given values).
        for item in values:
            self.q &= Q('match', **{quantity.search_field: item})

        return self

    def query(self, query):
        ''' Adds the given query as a 'and' (i.e. 'must') clause to the request. '''
        # Use the q property so a still unset query (None) is treated as
        # 'match_all' instead of failing on ``None & query``.
        self.q = self.q & query

        return self

    def time_range(self, start: datetime, end: datetime):
        ''' Adds a time range to the query. '''
        if start is None and end is None:
            return self

        # Open-ended bounds default to the epoch and 'now' respectively.
        effective_start = datetime.fromtimestamp(0) if start is None else start
        effective_end = datetime.utcnow() if end is None else end

        self.q &= Q('range', upload_time=dict(gte=effective_start, lte=effective_end))
        return self

    @property
    def q(self):
        ''' The underlying elasticsearch_dsl query object '''
        if self._query is None:
            return Q('match_all')
        return self._query

    @q.setter
    def q(self, q):
        # Replaces the composed query wholesale.
        self._query = q

    def totals(self, metrics_to_use: List[str] = []):
        '''
        Configure the request to return overall totals for the given metrics.

        The statistics are returned with the other quantity statistics under the pseudo
        quantity name 'total'. 'total' contains the pseudo value 'all'. It is used to
        store the metrics aggregated over all entries in the search results.
        '''
        root_aggs = self._search.aggs
        self._add_metrics(root_aggs, metrics_to_use)
        return self

    def default_statistics(self, metrics_to_use: List[str] = []):
        '''
        Configures the domain's default statistics.
        '''
        for quantity in default_statistics[self._domain]:
            order_key = quantity.statistic_order
            # Keys are sorted ascending; any other order criterion descending.
            direction = 'asc' if order_key == '_key' else 'desc'
            self.statistic(
                quantity.qualified_name,
                quantity.statistic_size,
                metrics_to_use=metrics_to_use,
                order={order_key: direction})

        return self

    def statistic(
            self, quantity_name: str, size: int, metrics_to_use: List[str] = [],
            order: Dict[str, str] = dict(_key='asc')):
        '''
        Adds a bucket aggregation with metrics for the given quantity.

        This can be used to display statistics over the searched entries and allows to
        implement faceted search on the top values for each quantity.

        The metrics contain overall and per quantity value sums of code runs (calcs),
        unique code runs, datasets, and additional domain specific metrics
        (e.g. total energies, and unique geometries for DFT calculations). The quantities
        that can be aggregated to metrics are defined in module:`datamodel`.

        The search results will contain a dictionary ``statistics``. This has a key
        for each configured quantity. Each quantity key will hold a dict
        with a key for each quantity value. Each quantity value key will hold a dict
        with a key for each metric. The values will be the actual aggregated metric values.

        Arguments:
            quantity_name: The quantity to aggregate statistics for. Only works on *keyword* field.
            size: The maximum number of quantity value buckets.
            metrics_to_use: The metrics calculated over the aggregations. Can be
                ``unique_code_runs``, ``datasets``, other domain specific metrics.
                The basic doc_count metric ``code_runs`` is always given.
            order: The order dictionary is passed to the elastic search aggregation.
        '''
        quantity = search_quantities[quantity_name]
        bucket_agg = A('terms', field=quantity.search_field, size=size, order=order)

        bucket = self._search.aggs.bucket('statistics:%s' % quantity_name, bucket_agg)
        self._add_metrics(bucket, metrics_to_use)

        return self

    def _add_metrics(self, parent=None, metrics_to_use: List[str] = []):
        ''' Adds metric aggregations for the given metrics under the given parent aggregation. '''
        target = self._search.aggs if parent is None else parent

        for metric_name in metrics_to_use:
            metric_quantity = metrics[metric_name]
            target.metric(
                'metric:%s' % metric_quantity.metric_name,
                A(metric_quantity.metric, field=metric_quantity.search_field))

    def date_histogram(self, metrics_to_use: List[str] = [], interval: str = '1M'):
        '''
        Adds a date histogram on the given metrics to the statistics part.
        '''
        histogram_agg = A(
            'date_histogram', field='upload_time', interval=interval, format='yyyy-MM-dd')
        bucket = self._search.aggs.bucket('statistics:date_histogram', histogram_agg)
        self._add_metrics(bucket, metrics_to_use)

        return self

    def quantities(self, **kwargs):
        '''
        Shorthand for adding multiple quantities. See :func:`quantity`. Keyword argument
        keys are quantity name, values are tuples of size and after value.
        '''
        # fixed: iterating the dict itself only yields keys (and would try to
        # unpack the key string); we need the key/value pairs via .items()
        for name, spec in kwargs.items():
            size, after = spec
            self.quantity(name, after=after, size=size)

        return self

    def quantity(
            self, name, size=100, after=None, examples=0, examples_source=None,
            order_by: str = None, order: str = 'desc'):
        '''
        Adds a request for values of the given quantity.

        It allows to scroll through all values via elasticsearch's
        composite aggregations. The response will contain the quantity values and
        an example entry for each value.

        This can be used to implement continuous scrolling through authors, datasets,
        or uploads within the searched entries.

        If one or more quantities are specified,
        the search results will contain a dictionary ``quantities``. The keys are quantity
        name the values dictionary with 'after' and 'values' key.
        The 'values' key holds a dict with all the values as keys and their entry count
        as values (i.e. number of entries with that value).

        Arguments:
            name: The quantity name. Must be in :data:`quantities`.
            after: The 'after' value allows to scroll over various requests, by providing
                the 'after' value of the last search. The 'after' value is part of the
                response. Use ``None`` in the first request.
            size:
                The size gives the ammount of maximum values in the next scroll window.
                If the size is None, a maximum of 100 quantity values will be requested.
            examples:
                Number of results to return that has each value
            examples_source:
                Source filter (field includes) applied to the example hits.
            order_by:
                A sortable quantity that should be used to order. The max of each
                value bucket is used.
            order:
                "desc" or "asc"
        '''
        if size is None:
            size = 100

        quantity = search_quantities[name]
        terms = A('terms', field=quantity.search_field)

        # We are using elastic searchs 'composite aggregations' here. We do not really
        # compose aggregations, but only those pseudo composites allow us to use the
        # 'after' feature that allows to scan through all aggregation values.
        if order_by is None:
            composite = dict(sources={name: terms}, size=size)
        else:
            # The order_by source comes first so the composite is sorted by it.
            sort_terms = A('terms', field=order_by, order=order)
            composite = dict(sources=[{order_by: sort_terms}, {name: terms}], size=size)
        if after is not None:
            if order_by is None:
                composite['after'] = {name: after}
            else:
                # With an order_by source, 'after' refers to the order_by value;
                # the quantity part of the composite key is left empty.
                composite['after'] = {order_by: after, name: ''}

        composite_agg = self._search.aggs.bucket('quantity:%s' % name, 'composite', **composite)

        if examples > 0:
            kwargs: Dict[str, Any] = {}
            if examples_source is not None:
                kwargs.update(_source=dict(includes=examples_source))

            # top_hits provides example entries for each value bucket
            composite_agg.metric('examples', A('top_hits', size=examples, **kwargs))

        return self

    def exclude(self, *args):
        ''' Exclude certain elastic fields from the search results. '''
        filtered = self._search.source(excludes=args)
        self._search = filtered
        return self

    def include(self, *args):
        ''' Include only the given fields in the search results. '''
        filtered = self._search.source(includes=args)
        self._search = filtered
        return self

    def execute(self):
        '''
        Executes without returning actual results. Only makes sense if the request
        was configured for statistics or quantity values.
        '''
        # Restrict the hit window to zero entries; only aggregations matter here.
        empty_window = self._search.query(self.q)[0:0]
        return self._response(empty_window.execute())

    def execute_scan(self, order_by: str = None, order: int = -1, **kwargs):
        '''
        This execute the search as scan. The result will be a generator over the found
        entries. Everything but the query part of this object, will be ignored.
        '''
        search = self._search.query(self.q)

        if order_by is not None:
            sort_field = search_quantities[order_by].search_field
            if order != 1:
                sort_field = '-%s' % sort_field
            # preserve_order keeps the sort order across scroll batches
            search = search.sort(sort_field).params(preserve_order=True)

        for hit in search.params(**kwargs).scan():
            yield hit.to_dict()

    def execute_paginated(
            self, page: int = 1, per_page=10, order_by: str = None,
            order: int = -1):
        '''
        Executes the search and returns paginated results. Those are sorted.

        Arguments:
            page: The requested page, starts with 1.
            per_page: The number of entries per page.
            order_by: The quantity to order by.
            order: -1 or 1 for descending or ascending order.
        '''
        if order_by is None:
            order_by_quantity = order_default_quantities[self._domain]
        else:
            order_by_quantity = search_quantities[order_by]

        sort_field = order_by_quantity.search_field
        if order != 1:
            sort_field = '-%s' % sort_field

        search = self._search.query(self.q).sort(sort_field)
        search = search[(page - 1) * per_page: page * per_page]

        es_result = search.execute()
        result = self._response(es_result, with_hits=True)
        result.update(pagination=dict(total=result['total'], page=page, per_page=per_page))
        return result

    def execute_scrolled(
            self, scroll_id: str = None, size: int = 1000, scroll: str = u'5m',
            order_by: str = None, order: int = -1):
        '''
        Executes a scrolling search. based on ES scroll API. Pagination is replaced with
        scrolling, no ordering is available, no statistics, no quantities will be provided.

        Scrolling is done by calling this function again and again with the same ``scroll_id``.
        Each time, this function will return the next batch of search results. If the
        ``scroll_id`` is not available anymore, a new ``scroll_id`` is assigned and scrolling
        starts from the beginning again.

        The response will contain a 'scroll' part with attributes 'total', 'scroll_id',
        and 'size'.

        Arguments:
            scroll_id: The scroll id to receive the next batch from. None will create a new
                scroll.
            size: The batch size in number of hits.
            scroll: The time the scroll should be kept alive (i.e. the time between requests
                to this method) in ES time units. Default is 5 minutes.

        TODO support order and order_by
        '''
        es = infrastructure.elastic_client

        if scroll_id is None:
            # initiate scroll
            resp = es.search(  # pylint: disable=E1123
                body=self._search.query(self.q).to_dict(), scroll=scroll, size=size,
                index=config.elastic.index_name)

            scroll_id = resp.get('_scroll_id')
            if scroll_id is None:
                # no results for search query
                return dict(scroll=dict(total=0, size=size), results=[])
        else:
            # continue an existing scroll; a stale/expired id raises ScrollIdNotFound
            try:
                resp = es.scroll(scroll_id, scroll=scroll)  # pylint: disable=E1123
            except NotFoundError:
                raise ScrollIdNotFound()

        total = resp['hits']['total']
        results = list(hit['_source'] for hit in resp['hits']['hits'])

        # since we are using the low level api here, we should check errors
        if resp["_shards"]["successful"] < resp["_shards"]["total"]:
            utils.get_logger(__name__).error('es operation was unsuccessful on at least one shard')
            raise ElasticSearchError('es operation was unsuccessful on at least one shard')

        # an empty batch marks the end of the scroll; release the server-side context
        if len(results) == 0:
            es.clear_scroll(body={'scroll_id': [scroll_id]}, ignore=(404, ))  # pylint: disable=E1123
            scroll_id = None

        scroll_info = dict(total=total, size=size)
        if scroll_id is not None:
            scroll_info.update(scroll_id=scroll_id)

        return dict(scroll=scroll_info, results=results)

    def _response(self, response, with_hits: bool = False) -> Dict[str, Any]:
        '''
        Prepares a response object covering the total number of results, hits, statistics,
        and quantities. Other aspects like pagination and scrolling have to be added
        elsewhere.
        '''
        result: Dict[str, Any] = dict()
        aggs = response.aggregations.to_dict()

        # total
        total = response.hits.total if hasattr(response, 'hits') else 0
        result.update(total=total)

        # hits
        if len(response.hits) > 0 or with_hits:
            result.update(results=[hit.to_dict() for hit in response.hits])

        # statistics
        def get_metrics(bucket, code_runs):
            # Collect the 'metric:*' sub-aggregation values present in this bucket.
            result = {}
            # TODO optimize ... go through the buckets not the metrics
            for metric in metrics:
                agg_name = 'metric:%s' % metric
                if agg_name in bucket:
                    result[metric] = bucket[agg_name]['value']
            # The bucket's doc_count is always reported as 'code_runs'.
            result.update(code_runs=code_runs)
            return result

        # The 'statistics:' prefix (11 chars) is stripped from the aggregation names.
        statistics_results = {
            quantity_name[11:]: {
                str(bucket['key']): get_metrics(bucket, bucket['doc_count'])
                for bucket in quantity['buckets']
            }
            for quantity_name, quantity in aggs.items()
            if quantity_name.startswith('statistics:')
        }

        # totals
        totals_result = get_metrics(aggs, total)
        statistics_results['total'] = dict(all=totals_result)

        if len(statistics_results) > 0:
            result.update(statistics=statistics_results)

        # quantities
        def create_quantity_result(quantity_name, quantity):
            # Build the 'values' dict: value -> dict(total=..., examples=[...]).
            values = {}
            for bucket in quantity['buckets']:
                value = dict(
                    total=bucket['doc_count'])
                if 'examples' in bucket:
                    examples = [hit['_source'] for hit in bucket['examples']['hits']['hits']]
                    value.update(examples=examples)

                values[bucket['key'][quantity_name]] = value

            result = dict(values=values)
            if 'after_key' in quantity:
                # Composite aggregations provide 'after_key' for scrolling. With an
                # additional order_by source, pick the key that is not the quantity.
                after = quantity['after_key']
                if len(after) == 1:
                    result.update(after=after[quantity_name])
                else:
                    for key in after:
                        if key != quantity_name:
                            result.update(after=after[key])
                            break

            return result

        # The 'quantity:' prefix (9 chars) is stripped from the aggregation names.
        quantity_results = {
            quantity_name[9:]: create_quantity_result(quantity_name[9:], quantity)
            for quantity_name, quantity in aggs.items()
            if quantity_name.startswith('quantity:')
        }

        if len(quantity_results) > 0:
            result.update(quantities=quantity_results)

        return result

    def __str__(self):
        ''' The request's search body as pretty-printed JSON, useful for debugging. '''
        body = self._search.to_dict()
        return json.dumps(body, indent=2)

def flat(obj, prefix=None):
    '''
    Helper that translates nested result objects into flattened dicts with
    ``domain.quantity`` as keys.
    '''
    # Non-dict values are returned unchanged.
    if not isinstance(obj, dict):
        return obj

    flattened = {}
    for key, value in obj.items():
        if isinstance(value, dict):
            # Recurse first, then prefix the child keys with this key.
            for child_key, child_value in flat(value).items():
                flattened['%s.%s' % (key, child_key)] = child_value
        else:
            flattened[key] = value

    return flattened