# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module represents calculations in Elasticsearch.
"""

from typing import Iterable, Dict, List, Any
from elasticsearch_dsl import Document, InnerDoc, Keyword, Text, Date, \
    Object, Boolean, Search, Q, A, analyzer, tokenizer
from elasticsearch_dsl.document import IndexMeta
import elasticsearch.helpers
from elasticsearch.exceptions import NotFoundError
from datetime import datetime
import json

from nomad import config, datamodel, infrastructure, utils, processing as proc


path_analyzer = analyzer(
    'path_analyzer',
    tokenizer=tokenizer('path_tokenizer', 'pattern', pattern='/'))
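
# For illustration only (not part of the original module): the analyzer above
# splits stored file paths on '/', so a value like 'upload/dir/vasprun.xml' is
# indexed as the tokens 'upload', 'dir' and 'vasprun.xml', which lets queries
# match individual path segments of the `files` field, e.g.:
#
#     Search(index=config.elastic.index_name).query('match', files='vasprun.xml')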


class AlreadyExists(Exception): pass


class ElasticSearchError(Exception): pass


class ScrollIdNotFound(Exception): pass


class User(InnerDoc):

    @classmethod
    def from_user(cls, user):
        self = cls(user_id=user.user_id)
        self.name = user.name
        self.email = user.email

        return self

    user_id = Keyword()
    email = Keyword()
    name = Text(fields={'keyword': Keyword()})


class Dataset(InnerDoc):

    @classmethod
    def from_dataset_id(cls, dataset_id):
        dataset = datamodel.Dataset.m_def.m_x('me').get(dataset_id=dataset_id)
        return cls(id=dataset.dataset_id, doi=dataset.doi, name=dataset.name)

    id = Keyword()
    doi = Keyword()
    name = Keyword()


class WithDomain(IndexMeta):
    """ Override elasticsearch_dsl metaclass to sneak in domain specific mappings """
    def __new__(cls, name, bases, attrs):
        for quantity in datamodel.Domain.instance.domain_quantities.values():
            attrs[quantity.name] = quantity.elastic_mapping
        return super(WithDomain, cls).__new__(cls, name, bases, attrs)


class Entry(Document, metaclass=WithDomain):

    class Index:
        name = config.elastic.index_name

    upload_id = Keyword()
    upload_time = Date()
    calc_id = Keyword()
    calc_hash = Keyword()
    pid = Keyword()
    raw_id = Keyword()
    mainfile = Keyword()
    files = Text(multi=True, analyzer=path_analyzer, fields={'keyword': Keyword()})
    uploader = Object(User)

    with_embargo = Boolean()
    published = Boolean()

    processed = Boolean()
    last_processing = Date()
    nomad_version = Keyword()
    nomad_commit = Keyword()

    authors = Object(User, multi=True)
    owners = Object(User, multi=True)
    comment = Text()
    references = Keyword()
    datasets = Object(Dataset)
    external_id = Keyword()

    @classmethod
    def from_calc_with_metadata(cls, source: datamodel.CalcWithMetadata) -> 'Entry':
        entry = Entry(meta=dict(id=source.calc_id))
        entry.update(source)
        return entry

    def update(self, source: datamodel.CalcWithMetadata) -> None:
        self.upload_id = source.upload_id
        self.upload_time = source.upload_time
        self.calc_id = source.calc_id
        self.calc_hash = source.calc_hash
        self.pid = None if source.pid is None else str(source.pid)
        self.raw_id = None if source.raw_id is None else str(source.raw_id)

        self.processed = source.processed
        self.last_processing = source.last_processing
        self.nomad_version = source.nomad_version
        self.nomad_commit = source.nomad_commit

        self.mainfile = source.mainfile
        if source.files is None:
            self.files = [self.mainfile]
        elif self.mainfile not in source.files:
            self.files = [self.mainfile] + source.files
        else:
            self.files = source.files

        self.with_embargo = bool(source.with_embargo)
        self.published = source.published

        uploader = datamodel.User.get(user_id=source.uploader) if source.uploader is not None else None
        authors = [datamodel.User.get(user_id) for user_id in source.coauthors]
        owners = [datamodel.User.get(user_id) for user_id in source.shared_with]
        if uploader is not None:
            authors.append(uploader)
            owners.append(uploader)
        authors.sort(key=lambda user: user.last_name + ' ' + user.first_name)
        owners.sort(key=lambda user: user.last_name + ' ' + user.first_name)

        self.uploader = User.from_user(uploader) if uploader is not None else None
        self.authors = [User.from_user(user) for user in authors]
        self.owners = [User.from_user(user) for user in owners]

        self.comment = source.comment
        self.references = source.references
        self.datasets = [Dataset.from_dataset_id(dataset_id) for dataset_id in source.datasets]
        self.external_id = source.external_id

        for quantity in datamodel.Domain.instance.domain_quantities.values():
            setattr(
                self, quantity.name,
                quantity.elastic_value(getattr(source, quantity.metadata_field)))


def delete_upload(upload_id):
    """ Delete all entries with given ``upload_id`` from the index. """
    index = Entry._default_index()
    Search(index=index).query('match', upload_id=upload_id).delete()


def publish(calcs: Iterable[datamodel.CalcWithMetadata]) -> None:
    """ Update all given calcs with their metadata and set ``publish = True``. """
    def elastic_updates():
        for calc in calcs:
            entry = Entry.from_calc_with_metadata(calc)
            entry.published = True
            entry = entry.to_dict(include_meta=True)
            source = entry.pop('_source')
            entry['doc'] = source
            entry['_op_type'] = 'update'
            yield entry

    elasticsearch.helpers.bulk(infrastructure.elastic_client, elastic_updates())
    refresh()


def index_all(calcs: Iterable[datamodel.CalcWithMetadata], do_refresh=True) -> int:
    """
    Adds all given calcs with their metadata to the index.

    Returns:
        Number of failed entries.
    """
    def elastic_updates():
        for calc in calcs:
            entry = Entry.from_calc_with_metadata(calc)
            entry = entry.to_dict(include_meta=True)
            entry['_op_type'] = 'index'
            yield entry

    _, failed = elasticsearch.helpers.bulk(infrastructure.elastic_client, elastic_updates(), stats_only=True)

    if do_refresh:
        refresh()

    return failed


def refresh():
    infrastructure.elastic_client.indices.refresh(config.elastic.index_name)
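
# Example (a hedged sketch, not from the original module): the typical write path,
# assuming `infrastructure.setup()` has connected the Elasticsearch client and
# `calcs` is an iterable of CalcWithMetadata:
#
#     failed = index_all(calcs, do_refresh=False)  # index entries, defer the refresh
#     publish(calcs)  # flip them to published=True; publish() refreshes the index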


quantities = datamodel.Domain.instance.quantities
"""The available search quantities."""

metrics = datamodel.Domain.instance.metrics
"""
The available search metrics. Metrics are integer values, given for each entry, that can
be used in statistics (aggregations), e.g. the sum of all total energy calculations or
the cardinality of all unique geometries.
"""

metrics_names = datamodel.Domain.instance.metrics_names
""" Names of all available metrics """

groups = datamodel.Domain.instance.groups
"""The available groupable quantities"""

order_default_quantity = None
for quantity in datamodel.Domain.instance.quantities.values():
    if quantity.order_default:
        order_default_quantity = quantity.name


class SearchRequest:
    '''
    Represents a search request and allows executing that request.
    It can compose the following features: a query;
    statistics (metrics and aggregations); quantity values; scrolling or pagination
    for entries; scrolling for quantity values.

    The query part filters NOMAD data before the other features come into effect. There
    are specialized methods for configuring the :func:`owner` and :func:`time_range` queries.
    Quantities can be searched for by setting them as attributes.

    The aggregations for statistics can be requested for pre-configured quantities. These
    bucket aggregations come with a metric calculated for each possible
    quantity value.

    The other possible form of aggregation allows retrieving quantity values as results
    (e.g. get all datasets, get all users, etc.). Each value can be accompanied by metrics
    (computed over all entries with that value) and an example value.

    Of course, searches can return a set of search results. Search objects can be
    configured with pagination or scrolling for these results. Pagination is the default
    and also allows ordering of results. Scrolling can be used if all entries need to be
    'scrolled through'. This might be necessary, since Elasticsearch has limits on
    possible pages (e.g. 'from' must be smaller than 10000). On the downside, there is no
    ordering on scrolling.

    There is also scrolling for quantities to go through all quantity values. There is no
    paging for aggregations.
    '''
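
    # Example (illustrative sketch): compose a query and execute it. The quantity
    # name 'atoms' is a domain specific assumption used for illustration only:
    #
    #     request = SearchRequest().owner('public').time_range(start, end)
    #     request.search_parameters(atoms=['Si', 'O'])
    #     request.default_statistics(metrics_to_use=['datasets'])
    #     results = request.execute()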
    def __init__(self, query=None):
        self._query = query
        self._search = Search(index=config.elastic.index_name)

    def owner(self, owner_type: str = 'all', user_id: str = None):
        """
        Uses the query part of the search to restrict the results based on the owner.
        The possible types are: ``all`` for all calculations; ``public`` for
        calculations visible by everyone, excluding entries only visible to the given user;
        ``shared`` for all calculations visible to the given user; ``user`` for all
        calculations of the given user; ``staging`` for all
        calculations in staging of the given user; ``admin`` for all calculations,
        usable by admin users only.

        Arguments:
            owner_type: The type of the owner query, see above.
            user_id: The 'owner' given as the user's unique id.

        Raises:
            KeyError: If the given owner_type is not supported
            ValueError: If the owner_type requires a user but none is given, or the
                given user is not allowed to use the given owner_type.
        """
        if owner_type == 'all':
            q = Q('term', published=True) & Q('term', with_embargo=False)
            if user_id is not None:
                q = q | Q('term', owners__user_id=user_id)
        elif owner_type == 'public':
            q = Q('term', published=True) & Q('term', with_embargo=False)
        elif owner_type == 'shared':
            if user_id is None:
                raise ValueError('Authentication required for owner value shared.')

            q = Q('term', owners__user_id=user_id)
        elif owner_type == 'user':
            if user_id is None:
                raise ValueError('Authentication required for owner value user.')

            q = Q('term', uploader__user_id=user_id)
        elif owner_type == 'staging':
            if user_id is None:
                raise ValueError('Authentication required for owner value staging.')
            q = Q('term', published=False) & Q('term', owners__user_id=user_id)
        elif owner_type == 'admin':
            if user_id is None or not datamodel.User.get(user_id=user_id).is_admin:
                raise ValueError('This can only be used by the admin user.')
            q = None
        else:
            raise KeyError('Unsupported owner value')

        if q is not None:
            self.q = self.q & q

        return self
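
    # For illustration: restrict a search to one user's unpublished (staging)
    # entries; `current_user_id` is a hypothetical value taken from the
    # authenticated request:
    #
    #     SearchRequest().owner('staging', user_id=current_user_id)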
    def search_parameters(self, **kwargs):
        """
        Configures the existing query with additional search parameters. Kwargs are
        interpreted as key value pairs. Keys have to correspond to valid entry quantities
        in the domain's (DFT calculations) datamodel. Alternatively, search parameters
        can be set via attributes.
        """
        for name, value in kwargs.items():
            self.search_parameter(name, value)

        return self

    def search_parameter(self, name, value):
        quantity = quantities.get(name, None)
        if quantity is None:
            raise KeyError('Unknown quantity %s' % name)

        if quantity.multi and not isinstance(value, list):
            value = [value]

        value = quantity.elastic_value(value)

        if quantity.elastic_search_type == 'terms':
            if not isinstance(value, list):
                value = [value]
            self.q &= Q('terms', **{quantity.elastic_field: value})

            return self

        if isinstance(value, list):
            values = value
        else:
            values = [value]

        for item in values:
            self.q &= Q(quantity.elastic_search_type, **{quantity.elastic_field: item})

        return self
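
    # For illustration: 'terms'-typed quantities match any of the given values in a
    # single clause, while other search types add one 'must' clause per value:
    #
    #     request.search_parameter('atoms', ['Fe', 'O'])  # 'atoms' is a hypothetical quantity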
    def query(self, query):
        """ Adds the given query as a 'and' (i.e. 'must') clause to the request. """
        self._query &= query

        return self

    def time_range(self, start: datetime, end: datetime):
        """ Adds a time range to the query. """
        if start is None and end is None:
            return self

        if start is None:
            start = datetime.fromtimestamp(0)
        if end is None:
            end = datetime.utcnow()

        self.q &= Q('range', upload_time=dict(gte=start, lte=end))

        return self

    @property
    def q(self):
        """ The underlying elasticsearch_dsl query object """
        if self._query is None:
            return Q('match_all')
        else:
            return self._query

    @q.setter
    def q(self, q):
        self._query = q

    def totals(self, metrics_to_use: List[str] = []):
        """
        Configure the request to return overall totals for the given metrics.
        The statistics are returned with the other quantity statistics under the pseudo
        quantity name 'total'. 'total' contains the pseudo value 'all'. It is used to
        store the metrics aggregated over all entries in the search results.
        """
        self._add_metrics(self._search.aggs, metrics_to_use)
        return self
    def default_statistics(self, metrics_to_use: List[str] = []):
        """
        Configures the domain's default statistics.
        """
        for name in datamodel.Domain.instance.default_statistics:
            self.statistic(
                name,
                quantities[name].aggregations,
                metrics_to_use=metrics_to_use)

        return self

    def statistic(self, quantity_name: str, size: int, metrics_to_use: List[str] = []):
        """
        This can be used to display statistics over the searched entries and allows to
        implement faceted search on the top values for each quantity.

        The metrics contain overall and per quantity value sums of code runs (calcs),
        unique code runs, datasets, and additional domain specific metrics
        (e.g. total energies, and unique geometries for DFT calculations). The quantities
        that can be aggregated to metrics are defined in module:`datamodel`. Aggregations
        and respective metrics are calculated for aggregations given in ``aggregations``
        and metrics in ``aggregation_metrics``. As a pseudo aggregation, ``total_metrics``
        are calculated over all search results. The ``aggregations`` gives tuples of
        quantities and default aggregation sizes.

        The search results will contain a dictionary ``statistics``. This has a key
        for each configured quantity. Each quantity key will hold a dict
        with a key for each quantity value. Each quantity value key will hold a dict
        with a key for each metric. The values will be the actual aggregated metric values.
        Arguments:
            quantity_name: The quantity to aggregate statistics for. Only works on *keyword* fields.
            size: The maximum number of quantity values to return statistics for.
            metrics_to_use: The metrics calculated over the aggregations. Can be
                ``unique_code_runs``, ``datasets``, other domain specific metrics.
                The basic doc_count metric ``code_runs`` is always given.
        """
        quantity = quantities[quantity_name]
        min_doc_count = 0 if quantity.zero_aggs else 1
        terms = A(
            'terms', field=quantity.elastic_field, size=size, min_doc_count=min_doc_count,
            order=dict(_key='asc'))

        buckets = self._search.aggs.bucket('statistics:%s' % quantity_name, terms)
        self._add_metrics(buckets, metrics_to_use)

        return self
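
    # For illustration (the quantity name 'code_name' is an assumption): bucket
    # entries by code and compute the datasets metric per bucket:
    #
    #     request.statistic('code_name', size=20, metrics_to_use=['datasets'])
    #     # response: statistics['code_name'][<value>] == {'code_runs': ..., 'datasets': ...}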
    def _add_metrics(self, parent=None, metrics_to_use: List[str] = []):
        if parent is None:
            parent = self._search.aggs

        for metric in metrics_to_use:
            field, metric_kind = metrics[metric]
            parent.metric('metric:%s' % metric, A(metric_kind, field=field))
    def date_histogram(self, metrics_to_use: List[str] = []):
        """
        Adds a date histogram on the given metrics to the statistics part.
        """
        histogram = A('date_histogram', field='upload_time', interval='1M', format='yyyy-MM-dd')
        self._add_metrics(self._search.aggs.bucket('statistics:date_histogram', histogram), metrics_to_use)

        return self

    def quantities(self, **kwargs):
        """
        Shorthand for adding multiple quantities. See :func:`quantity`. Keyword argument
        keys are quantity names, values are tuples of size and after value.
        """
        for name, spec in kwargs.items():
            size, after = spec
            self.quantity(name, after=after, size=size)

        return self

    def quantity(self, name, size=100, after=None, examples=0, examples_source=None):
        """
        Adds a request for values of the given quantity.
        It allows to scroll through all values via elasticsearch's
        composite aggregations. The response will contain the quantity values and
        an example entry for each value.
        This can be used to implement continuous scrolling through authors, datasets,
        or uploads within the searched entries.
        If one or more quantities are specified,
        the search results will contain a dictionary ``quantities``. The keys are quantity
        names, the values are dictionaries with 'after' and 'values' keys.
        The 'values' key holds a dict with all the values as keys and their entry count
        as values (i.e. number of entries with that value).

        Arguments:
            name: The quantity name. Must be in :data:`quantities`.
            after: The 'after' value allows to scroll over various requests, by providing
                the 'after' value of the last search. The 'after' value is part of the
                response. Use ``None`` in the first request.
            size:
                The size gives the maximum number of values in the next scroll window.
                If the size is None, a maximum of 100 quantity values will be requested.
        """
        if size is None:
            size = 100

        quantity = quantities[name]
        terms = A('terms', field=quantity.elastic_field)

        # We are using Elasticsearch's 'composite aggregations' here. We do not really
        # compose aggregations; these pseudo composites merely allow us to use the
        # 'after' feature for scanning through all aggregation values.
        composite = dict(sources={name: terms}, size=size)
        if after is not None:
            composite['after'] = {name: after}

        composite = self._search.aggs.bucket('quantity:%s' % name, 'composite', **composite)
        if examples > 0:
            kwargs = {}
            if examples_source is not None:
                kwargs.update(_source=dict(includes=examples_source))
            composite.metric('examples', A('top_hits', size=examples, **kwargs))
        return self
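
    # For illustration (the quantity name 'dataset_id' is an assumption): scroll
    # through all values, 100 at a time; the returned 'after' value seeds the next
    # request:
    #
    #     result = SearchRequest().quantity('dataset_id', size=100).execute()
    #     after = result['quantities']['dataset_id'].get('after')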
    def execute(self):
        """
        Executes the search without returning actual results. Only makes sense if the
        request was configured for statistics or quantity values.
        """
        return self._response(self._search.query(self.q)[0:0].execute())

    def execute_scan(self, order_by: str = None, order: int = -1):
        """
        This execute the search as scan. The result will be a generator over the found
        entries. Everything but the query part of this object, will be ignored.
        """
        search = self._search.query(self.q)

        if order_by is not None:
            if order_by not in quantities:
                raise KeyError('Unknown order quantity %s' % order_by)

            order_by_quantity = quantities[order_by]

            if order == 1:
                search = search.sort(order_by_quantity.elastic_field)
            else:
                search = search.sort('-%s' % order_by_quantity.elastic_field)

        for hit in search.scan():
            yield hit.to_dict()

    def execute_paginated(
            self, page: int = 1, per_page=10, order_by: str = order_default_quantity,
            order: int = -1):
        """
        Executes the search and returns paginated results. Those are sorted.

        Arguments:
            page: The requested page, starts with 1.
            per_page: The number of entries per page.
            order_by: The quantity to order by.
            order: -1 or 1 for descending or ascending order.
        """
        search = self._search.query(self.q)

        if order_by not in quantities:
            raise KeyError('Unknown order quantity %s' % order_by)

        order_by_quantity = quantities[order_by]

        if order == 1:
            search = search.sort(order_by_quantity.elastic_field)
        else:
            search = search.sort('-%s' % order_by_quantity.elastic_field)
        search = search[(page - 1) * per_page: page * per_page]

        result = self._response(search.execute(), with_hits=True)
        result.update(pagination=dict(total=result['total'], page=page, per_page=per_page))
        return result
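
    # For illustration: fetch the second page of ten results, ordered by upload time
    # (upload_time is a date field defined on Entry above):
    #
    #     SearchRequest().owner('public').execute_paginated(
    #         page=2, per_page=10, order_by='upload_time', order=-1)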

    def execute_scrolled(self, scroll_id: str = None, size: int = 1000, scroll: str = u'5m'):
        """
        Executes a scrolling search based on the ES scroll API. Pagination is replaced with
        scrolling; no ordering is available, and no statistics or quantities will be provided.
        Scrolling is done by calling this function again and again with the same ``scroll_id``.
        Each time, this function will return the next batch of search results. If the
        ``scroll_id`` is not available anymore, a new ``scroll_id`` is assigned and scrolling
        starts from the beginning again.
        The response will contain a 'scroll' part with attributes 'total', 'scroll_id',
        and 'size'.
        Arguments:
            scroll_id: The scroll id to receive the next batch from. None will create a new
                scroll.
            size: The batch size in number of hits.
            scroll: The time the scroll should be kept alive (i.e. the time between requests
                to this method) in ES time units. Default is 5 minutes.
        """
        es = infrastructure.elastic_client

        if scroll_id is None:
            # initiate scroll
            resp = es.search(  # pylint: disable=E1123
                body=self._search.query(self.q).to_dict(), scroll=scroll, size=size,
                index=config.elastic.index_name)

            scroll_id = resp.get('_scroll_id')
            if scroll_id is None:
                # no results for search query
                return dict(scroll=dict(total=0, size=size), results=[])
        else:
            try:
                resp = es.scroll(scroll_id, scroll=scroll)  # pylint: disable=E1123
            except NotFoundError:
                raise ScrollIdNotFound()

        total = resp['hits']['total']
        results = list(hit['_source'] for hit in resp['hits']['hits'])

        # since we are using the low level api here, we should check errors
        if resp["_shards"]["successful"] < resp["_shards"]["total"]:
            utils.get_logger(__name__).error('es operation was unsuccessful on at least one shard')
            raise ElasticSearchError('es operation was unsuccessful on at least one shard')

        if len(results) == 0:
            es.clear_scroll(body={'scroll_id': [scroll_id]}, ignore=(404, ))  # pylint: disable=E1123
            scroll_id = None

        scroll_info = dict(total=total, size=size)
        if scroll_id is not None:
            scroll_info.update(scroll_id=scroll_id)

        return dict(scroll=scroll_info, results=results)
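
    # For illustration: exhaustively scroll through all results of a query; the loop
    # ends when a batch comes back empty and the server-side scroll is cleared:
    #
    #     scroll_id, results = None, True
    #     while results:
    #         response = request.execute_scrolled(scroll_id=scroll_id)
    #         scroll_id = response['scroll'].get('scroll_id')
    #         results = response['results']
    #         ...  # process the batch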

    def _response(self, response, with_hits: bool = False) -> Dict[str, Any]:
        """
        Prepares a response object covering the total number of results, hits, statistics,
        and quantities. Other aspects like pagination and scrolling have to be added
        elsewhere.
        """
        result: Dict[str, Any] = dict()
        aggs = response.aggregations.to_dict()

        # total
        total = response.hits.total if hasattr(response, 'hits') else 0
        result.update(total=total)

        # hits
        if len(response.hits) > 0 or with_hits:
            result.update(results=[hit.to_dict() for hit in response.hits])

        # statistics
        def get_metrics(bucket, code_runs):
            result = {}
            for metric in metrics_names:
                agg_name = 'metric:%s' % metric
                if agg_name in bucket:
                    result[metric] = bucket[agg_name]['value']
            result.update(code_runs=code_runs)
            return result

        statistics_results = {
            quantity_name[11:]: {
                bucket['key']: get_metrics(bucket, bucket['doc_count'])
                for bucket in quantity['buckets']
            }
            for quantity_name, quantity in aggs.items()
            if quantity_name.startswith('statistics:')
        }
        # totals
        totals_result = get_metrics(aggs, total)
        statistics_results['total'] = dict(all=totals_result)
        if len(statistics_results) > 0:
            result.update(statistics=statistics_results)
        # quantities
        def create_quantity_result(quantity_name, quantity):
            values = {}
            for bucket in quantity['buckets']:
                value = dict(
                    total=bucket['doc_count'])
                if 'examples' in bucket:
                    examples = [hit['_source'] for hit in bucket['examples']['hits']['hits']]
                    value.update(examples=examples)
                values[bucket['key'][quantity_name]] = value
            result = dict(values=values)
            if 'after_key' in quantity:
                result.update(after=quantity['after_key'][quantity_name])
            return result
        quantity_results = {
            quantity_name[9:]: create_quantity_result(quantity_name[9:], quantity)
            for quantity_name, quantity in aggs.items()
            if quantity_name.startswith('quantity:')
        }

        if len(quantity_results) > 0:
            result.update(quantities=quantity_results)

        return result
    def __str__(self):
        return json.dumps(self._search.to_dict(), indent=2)


def to_calc_with_metadata(results: List[Dict[str, Any]]):
    """ Translates search results into :class:`CalcWithMetadata` objects read from mongo. """
    ids = [result['calc_id'] for result in results]
    return [
        datamodel.CalcWithMetadata(**calc.metadata)
        for calc in proc.Calc.objects(calc_id__in=ids)]