search.py 28 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module represents calculations in elastic search.
"""

19
from typing import Iterable, Dict, List, Any
Markus Scheidgen's avatar
Markus Scheidgen committed
20
from elasticsearch_dsl import Document, InnerDoc, Keyword, Text, Date, \
21
22
    Object, Boolean, Search, Q, A, analyzer, tokenizer
from elasticsearch_dsl.document import IndexMeta
23
import elasticsearch.helpers
Markus Scheidgen's avatar
Markus Scheidgen committed
24
from elasticsearch.exceptions import NotFoundError
Markus Scheidgen's avatar
Markus Scheidgen committed
25
from datetime import datetime
26
import json
27

Markus Scheidgen's avatar
Markus Scheidgen committed
28
from nomad import config, datamodel, infrastructure, datamodel, utils, processing as proc
29

30

31
32
33
34
# Custom analyzer named 'path_analyzer'. It is built on a 'pattern' tokenizer with
# the pattern '/' and is used below for the ``files`` field of :class:`Entry`.
path_analyzer = analyzer(
    'path_analyzer',
    tokenizer=tokenizer('path_tokenizer', 'pattern', pattern='/'))

35
36

class AlreadyExists(Exception):
    """ Error for something that already exists (NOTE(review): no raise site in this module). """
37
38


39
40
41
class ElasticSearchError(Exception):
    """ Raised when an elasticsearch operation was not successful on all shards. """


Markus Scheidgen's avatar
Markus Scheidgen committed
42
43
44
class ScrollIdNotFound(Exception):
    """ Raised when elasticsearch does not know the scroll id given to a scrolled search. """


Markus Scheidgen's avatar
Markus Scheidgen committed
45
class User(InnerDoc):
    """ Inner document that holds the user data stored with an entry. """

    user_id = Keyword()
    email = Keyword()
    # full text searchable name, with an additional exact 'keyword' sub field
    name = Text(fields={'keyword': Keyword()})

    @classmethod
    def from_user(cls, user):
        """ Creates the inner document from an object with ``user_id``, ``name``, and ``email``. """
        user_doc = cls(user_id=user.user_id)
        user_doc.name = user.name
        user_doc.email = user.email

        return user_doc
Markus Scheidgen's avatar
Markus Scheidgen committed
58
59
60


class Dataset(InnerDoc):
    """ Inner document that holds the id, DOI, and name of a dataset. """

    id = Keyword()
    doi = Keyword()
    name = Keyword()

    @classmethod
    def from_dataset_id(cls, dataset_id):
        """ Looks up the dataset with the given id and creates the inner document from it. """
        found = datamodel.Dataset.m_def.m_x('me').get(dataset_id=dataset_id)
        fields = dict(id=found.dataset_id, doi=found.doi, name=found.name)
        return cls(**fields)


72
73
74
class WithDomain(IndexMeta):
    """ Override elasticsearch_dsl metaclass to sneak in domain specific mappings """
    def __new__(cls, name, bases, attrs):
        # Add one class attribute per domain quantity before elasticsearch_dsl's
        # metaclass processes the class attributes.
        domain_quantities = datamodel.Domain.instance.domain_quantities
        attrs.update(
            (quantity.name, quantity.elastic_mapping)
            for quantity in domain_quantities.values())
        return super().__new__(cls, name, bases, attrs)


class Entry(Document, metaclass=WithDomain):
    """
    The elasticsearch document for one entry. The fields below are complemented with the
    domain specific quantities that :class:`WithDomain` adds to the class.
    """

    class Index:
        name = config.elastic.index_name

    # upload related fields
    upload_id = Keyword()
    upload_time = Date()
    upload_name = Keyword()

    # ids
    calc_id = Keyword()
    calc_hash = Keyword()
    pid = Keyword()
    raw_id = Keyword()

    # files
    mainfile = Keyword()
    files = Text(multi=True, analyzer=path_analyzer, fields={'keyword': Keyword()})
    uploader = Object(User)

    # visibility
    with_embargo = Boolean()
    published = Boolean()

    # processing
    processed = Boolean()
    last_processing = Date()
    nomad_version = Keyword()
    nomad_commit = Keyword()

    # user provided metadata
    authors = Object(User, multi=True)
    owners = Object(User, multi=True)
    comment = Text()
    references = Keyword()
    datasets = Object(Dataset)
    external_id = Keyword()

    @classmethod
    def from_calc_with_metadata(cls, source: datamodel.CalcWithMetadata) -> 'Entry':
        """ Creates a new entry with ``source.calc_id`` as document id and fills it from ``source``. """
        new_entry = Entry(meta={'id': source.calc_id})
        new_entry.update(source)
        return new_entry

    def update(self, source: datamodel.CalcWithMetadata) -> None:
        """
        Copies the metadata of ``source`` into this document. User ids are resolved to
        users via ``datamodel.User.get``, dataset ids via :func:`Dataset.from_dataset_id`,
        and the domain quantities are converted with their ``elastic_value`` function.
        """
        def str_or_none(value):
            if value is None:
                return None
            return str(value)

        def by_name(user):
            return user.last_name + ' ' + user.first_name

        self.upload_id = source.upload_id
        self.upload_time = source.upload_time
        self.upload_name = source.upload_name
        self.calc_id = source.calc_id
        self.calc_hash = source.calc_hash
        self.pid = str_or_none(source.pid)
        self.raw_id = str_or_none(source.raw_id)

        self.processed = source.processed
        self.last_processing = source.last_processing
        self.nomad_version = source.nomad_version
        self.nomad_commit = source.nomad_commit

        self.mainfile = source.mainfile
        # the mainfile is always part of the files; it comes first if it had to be added
        files = source.files
        if files is None:
            files = [self.mainfile]
        elif self.mainfile not in files:
            files = [self.mainfile] + files
        self.files = files

        self.with_embargo = bool(source.with_embargo)
        self.published = source.published

        uploader = None
        if source.uploader is not None:
            uploader = datamodel.User.get(user_id=source.uploader)
        authors = [datamodel.User.get(user_id) for user_id in source.coauthors]
        owners = [datamodel.User.get(user_id) for user_id in source.shared_with]
        # the uploader is both an author and an owner
        if uploader is not None:
            authors.append(uploader)
            owners.append(uploader)
        authors.sort(key=by_name)
        owners.sort(key=by_name)

        if uploader is None:
            self.uploader = None
        else:
            self.uploader = User.from_user(uploader)
        self.authors = [User.from_user(author) for author in authors]
        self.owners = [User.from_user(owner) for owner in owners]

        self.comment = source.comment
        self.references = source.references
        self.datasets = [Dataset.from_dataset_id(dataset_id) for dataset_id in source.datasets]
        self.external_id = source.external_id

        for quantity in datamodel.Domain.instance.domain_quantities.values():
            source_value = getattr(source, quantity.metadata_field)
            setattr(self, quantity.name, quantity.elastic_value(source_value))
164
165


166
167
168
169
170
171
def delete_upload(upload_id):
    """ Delete all entries with given ``upload_id`` from the index. """
    upload_entries = Search(index=Entry._default_index()).query('match', upload_id=upload_id)
    upload_entries.delete()


172
173
174
175
176
177
def delete_entry(calc_id):
    """ Delete the entry with the given ``calc_id`` from the index. """
    matching_entries = Search(index=Entry._default_index()).query('match', calc_id=calc_id)
    matching_entries.delete()


178
179
180
181
182
183
def publish(calcs: Iterable[datamodel.CalcWithMetadata]) -> None:
    """
    Update all given calcs with their metadata and set ``published = True``. The updates
    are sent as one bulk operation and the index is refreshed afterwards.
    """
    def as_update_action(calc):
        # bulk 'update' actions carry the document under 'doc' instead of '_source'
        entry = Entry.from_calc_with_metadata(calc)
        entry.published = True
        action = entry.to_dict(include_meta=True)
        action['doc'] = action.pop('_source')
        action['_op_type'] = 'update'
        return action

    elasticsearch.helpers.bulk(
        infrastructure.elastic_client, (as_update_action(calc) for calc in calcs))
    refresh()


Markus Scheidgen's avatar
Markus Scheidgen committed
194
def index_all(calcs: Iterable[datamodel.CalcWithMetadata], do_refresh=True) -> int:
    """
    Adds all given calcs with their metadata to the index.

    Arguments:
        calcs: The calcs to index; each is turned into a bulk 'index' action.
        do_refresh: If true (default), the index is refreshed after the bulk operation.

    Returns:
        Number of failed entries.
    """
    def elastic_updates():
        for calc in calcs:
            action = Entry.from_calc_with_metadata(calc).to_dict(include_meta=True)
            action['_op_type'] = 'index'
            yield action

    # with stats_only=True the bulk helper returns counts: (successful, failed)
    _, failed = elasticsearch.helpers.bulk(
        infrastructure.elastic_client, elastic_updates(), stats_only=True)

    if do_refresh:
        refresh()

    return failed
214
215


216
217
def refresh():
    """ Refreshes the configured index (``config.elastic.index_name``). """
    index_api = infrastructure.elastic_client.indices
    index_api.refresh(config.elastic.index_name)
218
219


220
# The following module-level names are taken from the active domain
# (``datamodel.Domain.instance``) once, at import time.
quantities = datamodel.Domain.instance.quantities
"""The available search quantities """

metrics = datamodel.Domain.instance.metrics
"""
The available search metrics. Metrics are integer values given for each entry that can
be used in statistics (aggregations), e.g. the sum of all total energy calculations or cardinality of
all unique geometries.
"""

metrics_names = datamodel.Domain.instance.metrics_names
""" Names of all available metrics """

groups = datamodel.Domain.instance.groups
"""The available groupable quantities"""

# Name of the quantity flagged with ``order_default``. Stays None if no quantity has
# the flag; if several have it, the last one iterated wins. Used as the default
# ``order_by`` of SearchRequest.execute_paginated.
order_default_quantity = None
for quantity in datamodel.Domain.instance.quantities.values():
    if quantity.order_default:
        order_default_quantity = quantity.name
Markus Scheidgen's avatar
Markus Scheidgen committed
240

241

242
class SearchRequest:
243
    '''
    Represents a search request and allows to execute that request.
    It allows to compose the following features: a query;
    statistics (metrics and aggregations); quantity values; scrolling, pagination for entries;
    scrolling for quantity values.

    The query part filters NOMAD data before the other features come into effect. There
    are specialized methods for configuring the :func:`owner` and :func:`time_range` queries.
    Quantities can be searched for by setting them as attributes.

    The aggregations for statistics can be requested for pre-configured quantities. These
    bucket aggregations come with a metric calculated for each possible
    quantity value.

    The other possible form of aggregations allows to get quantity values as results
    (e.g. get all datasets, get all users, etc.). Each value can be accompanied by metrics
    (over all entries with that value) and an example value.

    Of course, searches can return a set of search results. Search objects can be
    configured with pagination or scrolling for these results. Pagination is the default
    and also allows ordering of results. Scrolling can be used if all entries need to be
    'scrolled through'. This might be necessary, since elastic search has limits on
    possible pages (e.g. 'from' must be smaller than 10000). On the downside, there is no
    ordering on scrolling.

    There is also scrolling for quantities to go through all quantity values. There is no
    paging for aggregations.
    '''
    def __init__(self, query=None):
        """
        Arguments:
            query: An optional initial query. With ``None`` the request matches all
                entries until queries are added (see :func:`q`).
        """
        self._search = Search(index=config.elastic.index_name)
        self._query = query
274
275
276
277
278
279

    def owner(self, owner_type: str = 'all', user_id: str = None):
        """
        Uses the query part of the search to restrict the results based on the owner.
        The possible types are: ``all`` for all published calculations without embargo
        plus, if a user is given, all calculations owned by that user; ``public`` for
        calculations visible by everyone, excluding entries only visible to the given user;
        ``shared`` for all calculations the given user is an owner of;
        ``user`` for all calculations uploaded by the given user; ``staging`` for all
        unpublished calculations of the given user; ``admin`` for no restriction at all.

        Arguments:
            owner_type: The type of the owner query, see above.
            user_id: The 'owner' given as the user's unique id.

        Raises:
            KeyError: If the given owner_type is not supported
            ValueError: If the owner_type requires a user but none is given, or the
                given user is not allowed to use the given owner_type.
        """
        # These owner types are meaningless without a user. The message names the
        # requested owner type (the original reported 'user' for 'staging').
        if owner_type in ('shared', 'user', 'staging') and user_id is None:
            raise ValueError('Authentication required for owner value %s.' % owner_type)

        if owner_type == 'all':
            q = Q('term', published=True) & Q('term', with_embargo=False)
            if user_id is not None:
                q = q | Q('term', owners__user_id=user_id)
        elif owner_type == 'public':
            q = Q('term', published=True) & Q('term', with_embargo=False)
        elif owner_type == 'shared':
            q = Q('term', owners__user_id=user_id)
        elif owner_type == 'user':
            q = Q('term', uploader__user_id=user_id)
        elif owner_type == 'staging':
            q = Q('term', published=False) & Q('term', owners__user_id=user_id)
        elif owner_type == 'admin':
            if user_id is None or not datamodel.User.get(user_id=user_id).is_admin:
                raise ValueError('This can only be used by the admin user.')
            # admins see everything, no owner restriction is added
            q = None
        else:
            raise KeyError('Unsupported owner value')

        if q is not None:
            self.q = self.q & q

        return self
323

324
325
326
327
    def search_parameters(self, **kwargs):
        """
        Configures the existing query with additional search parameters. Kwargs are
        interpreted as key value pairs. Keys have to correspond to valid entry quantities
        in the domain's (DFT calculations) datamodel. Alternatively search parameters
        can be set via attributes.
        """
        for quantity_name in kwargs:
            self.search_parameter(quantity_name, kwargs[quantity_name])

        return self
Markus Scheidgen's avatar
Markus Scheidgen committed
335

336
    def search_parameter(self, name, value):
        """
        Adds a search for the given value(s) of the quantity with the given name to the
        query. All added clauses are combined with 'and'.

        Arguments:
            name: The quantity name, must be a key in :data:`quantities`.
            value: The value or list of values to search for.

        Raises:
            KeyError: If there is no quantity with the given name.
        """
        quantity = quantities.get(name)
        if quantity is None:
            raise KeyError('Unknown quantity %s' % name)

        # multi valued quantities always get their values converted as list
        if quantity.multi and not isinstance(value, list):
            value = [value]
        value = quantity.elastic_value(value)

        search_values = value if isinstance(value, list) else [value]
        search_type, field = quantity.elastic_search_type, quantity.elastic_field

        if search_type == 'terms':
            # one clause with all values
            self.q &= Q('terms', **{field: search_values})
        else:
            # one clause per value
            for search_value in search_values:
                self.q &= Q(search_type, **{field: search_value})

        return self
362

363
364
365
    def query(self, query):
        """
        Adds the given query as a 'and' (i.e. 'must') clause to the request.

        Arguments:
            query: The query to add. If the request has no query yet, it becomes the
                request's query.
        """
        # Without a query so far there is nothing to combine with; ``None &= query``
        # would fail.
        if self._query is None:
            self._query = query
        else:
            self._query &= query

        return self
368

369
370
    def time_range(self, start: datetime, end: datetime):
        """
        Adds a time range on the upload time to the query.

        Arguments:
            start: The lower bound (inclusive); ``None`` for no lower bound.
            end: The upper bound (inclusive); ``None`` for 'now'.

        If both are ``None``, the query is not changed.
        """
        if start is None and end is None:
            return self

        if start is None:
            # The epoch as naive UTC datetime, consistent with the naive UTC upper
            # bound below (``fromtimestamp(0)`` would give the epoch in local time).
            start = datetime(1970, 1, 1)
        if end is None:
            end = datetime.utcnow()

        self.q &= Q('range', upload_time=dict(gte=start, lte=end))

        return self
382

383
384
385
386
387
    @property
    def q(self):
        """ The underlying elasticsearch_dsl query object; a 'match_all' query if none was set. """
        current = self._query
        return Q('match_all') if current is None else current

    @q.setter
    def q(self, q):
        self._query = q
394

395
396
397
    def totals(self, metrics_to_use: List[str] = []):
        """
        Configure the request to return overall totals for the given metrics.

        The statistics are returned with the other quantity statistics under the pseudo
        quantity name 'total'. 'total' contains the pseudo value 'all'. It is used to
        store the metrics aggregated over all entries in the search results.
        """
        # the totals are metrics directly on the top-level aggregations of the search
        top_level_aggs = self._search.aggs
        self._add_metrics(top_level_aggs, metrics_to_use)
        return self
405

406
407
408
409
410
411
412
413
414
    def default_statistics(self, metrics_to_use: List[str] = []):
        """
        Configures the domain's default statistics: one :func:`statistic` per quantity
        named in the domain's ``default_statistics``, sized by the quantity's
        ``aggregations`` value.
        """
        for quantity_name in datamodel.Domain.instance.default_statistics:
            size = quantities[quantity_name].aggregations
            self.statistic(quantity_name, size, metrics_to_use=metrics_to_use)

        return self
417

418
    def statistic(self, quantity_name: str, size: int, metrics_to_use: List[str] = []):
        """
        This can be used to display statistics over the searched entries and allows to
        implement faceted search on the top values for each quantity.

        The metrics contain overall and per quantity value sums of code runs (calcs),
        unique code runs, datasets, and additional domain specific metrics
        (e.g. total energies, and unique geometries for DFT calculations). The quantities
        that can be aggregated to metrics are defined in module :mod:`datamodel`.

        The search results will contain a dictionary ``statistics``. This has a key
        for each configured quantity. Each quantity key will hold a dict
        with a key for each quantity value. Each quantity value key will hold a dict
        with a key for each metric. The values will be the actual aggregated metric values.

        Arguments:
            quantity_name: The quantity to aggregate statistics for. Only works on *keyword* field.
            size: The maximum number of quantity values (buckets) in the statistics.
            metrics_to_use: The metrics calculated over the aggregations. Can be
                ``unique_code_runs``, ``datasets``, other domain specific metrics.
                The basic doc_count metric ``code_runs`` is always given.
        """
        quantity = quantities[quantity_name]

        # quantities with ``zero_aggs`` also report values without any matching entry
        if quantity.zero_aggs:
            min_doc_count = 0
        else:
            min_doc_count = 1

        terms = A(
            'terms', field=quantity.elastic_field, size=size,
            min_doc_count=min_doc_count, order={'_key': 'asc'})

        value_buckets = self._search.aggs.bucket('statistics:%s' % quantity_name, terms)
        self._add_metrics(value_buckets, metrics_to_use)

        return self
453

454
455
456
    def _add_metrics(self, parent=None, metrics_to_use: List[str] = []):
        """
        Adds one metric aggregation named ``metric:<name>`` for each given metric.

        Arguments:
            parent: The aggregation to add the metrics to. Default are the top-level
                aggregations of the search.
            metrics_to_use: Names of the metrics, must be keys in :data:`metrics`.
        """
        target = self._search.aggs if parent is None else parent

        for metric_name in metrics_to_use:
            field, metric_kind = metrics[metric_name]
            target.metric('metric:%s' % metric_name, A(metric_kind, field=field))
461

462
463
464
465
466
467
    def date_histogram(self, metrics_to_use: List[str] = []):
        """
        Adds a date histogram on the given metrics to the statistics part. The histogram
        is over the upload time with an interval of one month ('1M').
        """
        histogram = A('date_histogram', field='upload_time', interval='1M', format='yyyy-MM-dd')
        histogram_buckets = self._search.aggs.bucket('statistics:date_histogram', histogram)
        self._add_metrics(histogram_buckets, metrics_to_use)

        return self
470

471
472
    def quantities(self, **kwargs):
        """
        Shorthand for adding multiple quantities. See :func:`quantity`. Keyword argument
        keys are quantity names, values are tuples of size and after value.
        """
        # iterate the items: iterating the dict itself only yields the names
        for name, (size, after) in kwargs.items():
            self.quantity(name, after=after, size=size)

        return self
481

482
483
484
    def quantity(
            self, name, size=100, after=None, examples=0, examples_source=None,
            order_by: str = None, order: str = 'desc'):
        """
        Adds a request for values of the given quantity.
        It allows to scroll through all values via elasticsearch's
        composite aggregations. The response will contain the quantity values and
        an example entry for each value.

        This can be used to implement continuous scrolling through authors, datasets,
        or uploads within the searched entries.

        If one or more quantities are specified,
        the search results will contain a dictionary ``quantities``. The keys are quantity
        names, the values are dictionaries with 'after' and 'values' keys.
        The 'values' key holds a dict with all the values as keys and their entry count
        as values (i.e. number of entries with that value).

        Arguments:
            name: The quantity name. Must be in :data:`quantities`.
            after: The 'after' value allows to scroll over various requests, by providing
                the 'after' value of the last search. The 'after' value is part of the
                response. Use ``None`` in the first request.
            size:
                The size gives the maximum amount of values in the next scroll window.
                If the size is None, a maximum of 100 quantity values will be requested.
            examples:
                Number of results to return that has each value
            examples_source:
                If given, it is used as the ``_source`` includes of the examples.
            order_by:
                A sortable quantity that should be used to order. The max of each
                value bucket is used.
            order:
                "desc" or "asc"
        """
        if size is None:
            size = 100

        quantity = quantities[name]
        terms = A('terms', field=quantity.elastic_field)

        # We are using elastic searchs 'composite aggregations' here. We do not really
        # compose aggregations, but only those pseudo composites allow us to use the
        # 'after' feature that allows to scan through all aggregation values.
        ordered = order_by is not None
        if ordered:
            sort_terms = A('terms', field=order_by, order=order)
            composite = dict(sources=[{order_by: sort_terms}, {name: terms}], size=size)
        else:
            composite = dict(sources={name: terms}, size=size)

        if after is not None:
            if ordered:
                # with ordering, the given after value belongs to the order_by source
                composite['after'] = {order_by: after, name: ''}
            else:
                composite['after'] = {name: after}

        composite_agg = self._search.aggs.bucket('quantity:%s' % name, 'composite', **composite)

        if examples > 0:
            top_hits_kwargs = {}
            if examples_source is not None:
                top_hits_kwargs['_source'] = dict(includes=examples_source)

            composite_agg.metric('examples', A('top_hits', size=examples, **top_hits_kwargs))

        return self
546

547
548
549
550
551
    def execute(self):
        """
        Executes without returning actual results. Only makes sense if the request
        was configured for statistics or quantity values.
        """
        # the empty slice [0:0] requests no hits, only totals and aggregations
        search = self._search.query(self.q)
        response = search[0:0].execute()
        return self._response(response)
553

554
    def execute_scan(self, order_by: str = None, order: int = -1, **kwargs):
        """
        This executes the search as scan. The result will be a generator over the found
        entries. Statistics and quantities configured on this object are ignored.

        Arguments:
            order_by: An optional quantity to sort by, must be in :data:`quantities`.
            order: 1 for ascending order, everything else for descending order.
            kwargs: Passed on as parameters of the search.

        Raises:
            KeyError: If the ``order_by`` quantity does not exist. As this is a generator,
                the error is raised when the first entry is requested.
        """
        search = self._search.query(self.q)

        if order_by is not None:
            if order_by not in quantities:
                raise KeyError('Unknown order quantity %s' % order_by)

            sort_field = quantities[order_by].elastic_field
            # a leading '-' sorts in descending order
            search = search.sort(sort_field if order == 1 else '-%s' % sort_field)

        for found in search.params(**kwargs).scan():
            yield found.to_dict()
574

575
    def execute_paginated(
            self, page: int = 1, per_page=10, order_by: str = order_default_quantity,
            order: int = -1):
        """
        Executes the search and returns paginated results. Those are sorted.

        Arguments:
            page: The requested page, starts with 1.
            per_page: The number of entries per page.
            order_by: The quantity to order by.
            order: -1 or 1 for descending or ascending order.

        Raises:
            KeyError: If the ``order_by`` quantity does not exist.
        """
        search = self._search.query(self.q)

        if order_by not in quantities:
            raise KeyError('Unknown order quantity %s' % order_by)

        sort_field = quantities[order_by].elastic_field
        if order != 1:
            # a leading '-' sorts in descending order
            sort_field = '-%s' % sort_field

        first, last = (page - 1) * per_page, page * per_page
        search = search.sort(sort_field)[first:last]

        result = self._response(search.execute(), with_hits=True)
        result['pagination'] = dict(total=result['total'], page=page, per_page=per_page)
        return result

604
    def execute_scrolled(self, scroll_id: str = None, size: int = 1000, scroll: str = u'5m'):
        """
        Executes a scrolling search, based on the ES scroll API. Pagination is replaced with
        scrolling, no ordering is available, no statistics, no quantities will be provided.

        Scrolling is done by calling this function again and again with the same ``scroll_id``.
        Each time, this function will return the next batch of search results. Once a
        batch is empty, the scroll is cleared and no ``scroll_id`` is returned anymore.

        The response will contain a 'scroll' part with attributes 'total', 'scroll_id',
        and 'size'.

        Arguments:
            scroll_id: The scroll id to receive the next batch from. None will create a new
                scroll.
            size: The batch size in number of hits.
            scroll: The time the scroll should be kept alive (i.e. the time between requests
                to this method) in ES time units. Default is 5 minutes.

        Raises:
            ScrollIdNotFound: If elasticsearch does not know the given ``scroll_id``.
            ElasticSearchError: If the operation was not successful on all shards.
        """
        es = infrastructure.elastic_client

        if scroll_id is not None:
            # continue an existing scroll
            try:
                resp = es.scroll(scroll_id, scroll=scroll)  # pylint: disable=E1123
            except NotFoundError:
                raise ScrollIdNotFound()
        else:
            # initiate scroll
            resp = es.search(  # pylint: disable=E1123
                body=self._search.query(self.q).to_dict(), scroll=scroll, size=size,
                index=config.elastic.index_name)

            scroll_id = resp.get('_scroll_id')
            if scroll_id is None:
                # no results for search query
                return dict(scroll=dict(total=0, size=size), results=[])

        total = resp['hits']['total']
        results = [hit['_source'] for hit in resp['hits']['hits']]

        # since we are using the low level api here, we should check errors
        shards = resp["_shards"]
        if shards["successful"] < shards["total"]:
            message = 'es operation was unsuccessful on at least one shard'
            utils.get_logger(__name__).error(message)
            raise ElasticSearchError(message)

        if not results:
            # an empty batch marks the end of the scroll
            es.clear_scroll(body={'scroll_id': [scroll_id]}, ignore=(404, ))  # pylint: disable=E1123
            scroll_id = None

        scroll_info = dict(total=total, size=size)
        if scroll_id is not None:
            scroll_info['scroll_id'] = scroll_id

        return dict(scroll=scroll_info, results=results)

661
662
    def _response(self, response, with_hits: bool = False) -> Dict[str, Any]:
        """
        Prepares a response object covering the total number of results, hits, statistics,
        and quantities. Other aspects like pagination and scrolling have to be added
        elsewhere.

        Arguments:
            response: The elasticsearch_dsl response of the executed search.
            with_hits: If true, the ``results`` key is also given if there are no hits.
        """
        statistics_prefix, quantity_prefix = 'statistics:', 'quantity:'

        aggs = response.aggregations.to_dict()

        # total
        total = response.hits.total if hasattr(response, 'hits') else 0
        result: Dict[str, Any] = dict(total=total)

        # hits
        if len(response.hits) > 0 or with_hits:
            result['results'] = [hit.to_dict() for hit in response.hits]

        # statistics
        def collect_metrics(bucket, code_runs):
            # only metrics that were actually requested are part of the bucket
            collected = {
                metric: bucket['metric:%s' % metric]['value']
                for metric in metrics_names
                if 'metric:%s' % metric in bucket}
            collected.update(code_runs=code_runs)
            return collected

        statistics_results = {}
        for agg_name, agg in aggs.items():
            if not agg_name.startswith(statistics_prefix):
                continue

            statistics_results[agg_name[len(statistics_prefix):]] = {
                bucket['key']: collect_metrics(bucket, bucket['doc_count'])
                for bucket in agg['buckets']}

        # totals, the metrics on the top-level aggregations
        statistics_results['total'] = dict(all=collect_metrics(aggs, total))

        if len(statistics_results) > 0:
            result['statistics'] = statistics_results

        # quantities
        def create_quantity_result(quantity_name, quantity):
            values = {}
            for bucket in quantity['buckets']:
                value = dict(total=bucket['doc_count'])
                if 'examples' in bucket:
                    value['examples'] = [
                        hit['_source'] for hit in bucket['examples']['hits']['hits']]

                values[bucket['key'][quantity_name]] = value

            quantity_result = dict(values=values)
            if 'after_key' in quantity:
                after = quantity['after_key']
                if len(after) == 1:
                    quantity_result['after'] = after[quantity_name]
                else:
                    # with more than one source, the after value is taken from the
                    # first source other than the quantity itself
                    other_keys = [key for key in after if key != quantity_name]
                    if other_keys:
                        quantity_result['after'] = after[other_keys[0]]

            return quantity_result

        quantity_results = {}
        for agg_name, agg in aggs.items():
            if agg_name.startswith(quantity_prefix):
                quantity_name = agg_name[len(quantity_prefix):]
                quantity_results[quantity_name] = create_quantity_result(quantity_name, agg)

        if len(quantity_results) > 0:
            result['quantities'] = quantity_results

        return result
739

740
741
    def __str__(self):
        """ The underlying search (without the query of this request) as indented JSON. """
        search_dict = self._search.to_dict()
        return json.dumps(search_dict, indent=2)
742
743


744
745
746
747
748
749
def to_calc_with_metadata(results: List[Dict[str, Any]]):
    """ Translates search results into :class:`CalcWithMetadata` objects read from mongo. """
    calc_ids = [result['calc_id'] for result in results]
    calcs = proc.Calc.objects(calc_id__in=calc_ids)
    return [datamodel.CalcWithMetadata(**calc.metadata) for calc in calcs]