# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module represents calculations in elastic search.
"""

from typing import Iterable, Dict, List, Any
from elasticsearch_dsl import Document, InnerDoc, Keyword, Text, Date, \
    Object, Boolean, Search, Q, A, analyzer, tokenizer
from elasticsearch_dsl.document import IndexMeta
import elasticsearch.helpers
from elasticsearch.exceptions import NotFoundError
from datetime import datetime
import json

from nomad import config, datamodel, infrastructure, coe_repo, utils, processing as proc


path_analyzer = analyzer(
    'path_analyzer',
    tokenizer=tokenizer('path_tokenizer', 'pattern', pattern='/'))
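# The path analyzer splits file paths on '/', e.g. 'some/upload/mainfile.json'
# is tokenized into 'some', 'upload', 'mainfile.json'. This allows searches to
# match individual path segments.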


user_cache: Dict[str, Any] = dict()
"""
A cache for user popos used in the index. We will not retrieve names all the time.
This cache should be cleared, before larger re-index operations.
"""


class AlreadyExists(Exception): pass


class ElasticSearchError(Exception): pass


class ScrollIdNotFound(Exception): pass


class User(InnerDoc):

    @classmethod
    def from_user_popo(cls, user):
        self = user_cache.get(user.id, None)
        if self is None:
            self = cls(user_id=user.id)

            if 'first_name' not in user:
                user = coe_repo.User.from_user_id(user.id).to_popo()

            last_name = user['last_name'].strip()
            first_name = user['first_name'].strip()

            if len(last_name) > 0 and len(first_name) > 0:
                name = '%s, %s' % (last_name, first_name)
            elif len(last_name) != 0:
                name = last_name
            elif len(first_name) != 0:
                name = first_name
            else:
                name = 'unnamed user with id %d' % user.id

            self.name = name
            user_cache[user.id] = self

        return self

    user_id = Keyword()
    name = Text(fields={'keyword': Keyword()})
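
# For example (a sketch; user POPOs support both attribute and item access):
# a popo with first_name 'Jane' and last_name 'Doe' yields a cached User inner
# document with name 'Doe, Jane'.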


class Dataset(InnerDoc):

    @classmethod
    def from_dataset_popo(cls, dataset):
        return cls(
            id=dataset.id,
            doi=dataset.doi['value'] if dataset.doi is not None else None,
            name=dataset.name)

    id = Keyword()
    doi = Keyword()
    name = Keyword()


class WithDomain(IndexMeta):
    """ Override elasticsearch_dsl metaclass to sneak in domain specific mappings """
    def __new__(cls, name, bases, attrs):
        for quantity in datamodel.Domain.instance.domain_quantities.values():
            attrs[quantity.name] = quantity.elastic_mapping
        return super(WithDomain, cls).__new__(cls, name, bases, attrs)
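
# For example, a domain quantity 'formula' with a Keyword() elastic mapping ends
# up as an 'Entry.formula' field, as if it had been declared in the class body.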


class Entry(Document, metaclass=WithDomain):

    class Index:
        name = config.elastic.index_name

    upload_id = Keyword()
    upload_time = Date()
    calc_id = Keyword()
    calc_hash = Keyword()
    pid = Keyword()
    mainfile = Keyword()
    files = Text(multi=True, analyzer=path_analyzer, fields={'keyword': Keyword()})
    uploader = Object(User)

    with_embargo = Boolean()
    published = Boolean()

    processed = Boolean()
    last_processing = Date()
    nomad_version = Keyword()
    nomad_commit = Keyword()

    authors = Object(User, multi=True)
    owners = Object(User, multi=True)
    comment = Text()
    references = Keyword()
    datasets = Object(Dataset)
    external_id = Keyword()

    @classmethod
    def from_calc_with_metadata(cls, source: datamodel.CalcWithMetadata) -> 'Entry':
        entry = Entry(meta=dict(id=source.calc_id))
        entry.update(source)
        return entry

    def update(self, source: datamodel.CalcWithMetadata) -> None:
        self.upload_id = source.upload_id
        self.upload_time = source.upload_time
        self.calc_id = source.calc_id
        self.calc_hash = source.calc_hash
        self.pid = None if source.pid is None else str(source.pid)

        self.processed = source.processed
        self.last_processing = source.last_processing
        self.nomad_version = source.nomad_version
        self.nomad_commit = source.nomad_commit

        self.mainfile = source.mainfile
        if source.files is None:
            self.files = [self.mainfile]
        elif self.mainfile not in source.files:
            self.files = [self.mainfile] + source.files
        else:
            self.files = source.files

        self.uploader = User.from_user_popo(source.uploader) if source.uploader is not None else None

        self.with_embargo = bool(source.with_embargo)
        self.published = source.published
        self.authors = [User.from_user_popo(user) for user in source.coauthors]
        self.owners = [User.from_user_popo(user) for user in source.shared_with]
        if self.uploader is not None:
            if self.uploader not in self.authors:
                self.authors.append(self.uploader)
            if self.uploader not in self.owners:
                self.owners.append(self.uploader)
        self.comment = source.comment
        self.references = [ref.value for ref in source.references]
        self.datasets = [Dataset.from_dataset_popo(ds) for ds in source.datasets]
        self.external_id = source.external_id

        for quantity in datamodel.Domain.instance.domain_quantities.values():
            setattr(
                self, quantity.name,
                quantity.elastic_value(getattr(source, quantity.metadata_field)))


def delete_upload(upload_id):
    """ Delete all entries with given ``upload_id`` from the index. """
    index = Entry._default_index()
    Search(index=index).query('match', upload_id=upload_id).delete()


def publish(calcs: Iterable[datamodel.CalcWithMetadata]) -> None:
    """ Update all given calcs with their metadata and set ``publish = True``. """
    def elastic_updates():
        for calc in calcs:
            entry = Entry.from_calc_with_metadata(calc)
            entry.published = True
            entry = entry.to_dict(include_meta=True)
            source = entry.pop('_source')
            entry['doc'] = source
            entry['_op_type'] = 'update'
            yield entry
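
    # Each action yielded above is a bulk 'update' in the form expected by
    # elasticsearch.helpers.bulk, roughly:
    #     {'_op_type': 'update', '_index': <index>, '_id': <calc_id>, 'doc': {...}}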

    elasticsearch.helpers.bulk(infrastructure.elastic_client, elastic_updates())
    refresh()


def index_all(calcs: Iterable[datamodel.CalcWithMetadata]) -> None:
    """
    Adds all given calcs with their metadata to the index.

    Returns:
        Number of failed entries.
    """
    def elastic_updates():
        for calc in calcs:
            entry = Entry.from_calc_with_metadata(calc)
            entry = entry.to_dict(include_meta=True)
            entry['_op_type'] = 'index'
            yield entry

    _, failed = elasticsearch.helpers.bulk(infrastructure.elastic_client, elastic_updates(), stats_only=True)
    refresh()
    return failed


def refresh():
    infrastructure.elastic_client.indices.refresh(config.elastic.index_name)


quantities = datamodel.Domain.instance.quantities
"""The available search quantities """

metrics = datamodel.Domain.instance.metrics
"""
The available search metrics. Metrics are integer values given for each entry that can
be used in statistics (aggregations), e.g. the sum of all total energy calculations or
the cardinality of all unique geometries.
"""

metrics_names = datamodel.Domain.instance.metrics_names
""" Names of all available metrics """

order_default_quantity = None
for quantity in datamodel.Domain.instance.quantities.values():
    if quantity.order_default:
        order_default_quantity = quantity.name


class SearchRequest:
    '''
    Represents a search request and allows to execute that request.
    It allows to compose the following features: a query;
    statistics (metrics and aggregations); quantity values; scrolling, pagination for entries;
    scrolling for quantity values.

    The query part filters NOMAD data before the other features come into effect. There
    are specialized methods for configuring the :func:`owner` and :func:`time_range` queries.
    Quantities can be searched for by setting them as attributes.

    The aggregations for statistics can be requested for pre-configured quantities. These
    bucket aggregations come with a metric calculated for each possible
    quantity value.

    The other possible form of aggregations allows to get quantity values as results
    (e.g. get all datasets, get all users, etc.). Each value can be accompanied by metrics
    (over all entries with that value) and an example value.

    Of course, searches can return a set of search results. Search objects can be
    configured with pagination or scrolling for these results. Pagination is the default
    and also allows ordering of results. Scrolling can be used if all entries need to be
    'scrolled through'. This might be necessary, since elastic search has limits on
    possible pages (e.g. 'from' must be smaller than 10000). On the downside, there is no
    ordering on scrolling.

    There is also scrolling for quantities to go through all quantity values. There is no
    paging for aggregations.
    '''
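
    # Typical use (a sketch; quantity and metric names depend on the configured
    # domain):
    #
    #     results = SearchRequest() \
    #         .owner('public') \
    #         .search_parameters(code_name='VASP') \
    #         .default_statistics(metrics_to_use=['datasets']) \
    #         .execute_paginated(page=1, per_page=10)
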
    def __init__(self, query=None):
        self._query = query
        self._search = Search(index=config.elastic.index_name)

    def owner(self, owner_type: str = 'all', user_id: str = None):
        """
        Uses the query part of the search to restrict the results based on the owner.
        The possible types are: ``all`` for all calculations; ``public`` for
        calculations visible by everyone, excluding entries only visible to the given user;
        ``user`` for all calculations of the given user; ``staging`` for all
        calculations in staging of the given user; ``admin`` for all calculations
        (only available to the admin user).

        Arguments:
            owner_type: The type of the owner query, see above.
            user_id: The 'owner' given as the user's unique id.

        Raises:
            KeyError: If the given owner_type is not supported
            ValueError: If the owner_type requires a user but none is given, or the
                given user is not allowed to use the given owner_type.
        """
        if owner_type == 'all':
            q = Q('term', published=True) & Q('term', with_embargo=False)
            if user_id is not None:
                q = q | Q('term', owners__user_id=user_id)
        elif owner_type == 'public':
            q = Q('term', published=True) & Q('term', with_embargo=False)
        elif owner_type == 'user':
            if user_id is None:
                raise ValueError('Authentication required for owner value user.')

            q = Q('term', owners__user_id=user_id)
        elif owner_type == 'staging':
            if user_id is None:
                raise ValueError('Authentication required for owner value staging.')
            q = Q('term', published=False) & Q('term', owners__user_id=user_id)
        elif owner_type == 'admin':
            if user_id is None or not coe_repo.User.from_user_id(user_id).is_admin:
                raise ValueError('This can only be used by the admin user.')
            q = None
        else:
            raise KeyError('Unsupported owner value')

        if q is not None:
            self.q = self.q & q

        return self

    def search_parameters(self, **kwargs):
        """
        Configures the existing query with additional search parameters. Kwargs are
        interpreted as key value pairs. Keys have to correspond to valid entry quantities
        in the domain's (DFT calculations) datamodel. Alternatively search parameters
        can be set via attributes.
        """
        for name, value in kwargs.items():
            self.search_parameter(name, value)

        return self

    def search_parameter(self, name, value):
        quantity = quantities.get(name, None)
        if quantity is None:
            raise KeyError('Unknown quantity %s' % name)

        if quantity.multi and not isinstance(value, list):
            value = [value]

        value = quantity.elastic_value(value)

        if isinstance(value, list):
            values = value
        else:
            values = [value]

        for item in values:
352
            self.q &= Q(quantity.elastic_search_type, **{quantity.elastic_field: item})

        return self
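
    # E.g. search_parameter('atoms', ['Si', 'O']) (assuming an 'atoms' quantity
    # in the domain) adds one term query per value, i.e. entries must match all
    # of the given values.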

    def query(self, query):
        """ Adds the given query as a 'and' (i.e. 'must') clause to the request. """
        self._query &= query

        return self

    def time_range(self, start: datetime, end: datetime):
        """ Adds a time range to the query. """
        if start is None and end is None:
            return self

        if start is None:
            start = datetime.fromtimestamp(0)
        if end is None:
            end = datetime.utcnow()

        self.q &= Q('range', upload_time=dict(gte=start, lte=end))

        return self

    @property
    def q(self):
        """ The underlying elasticsearch_dsl query object """
        if self._query is None:
            return Q('match_all')
        else:
            return self._query

    @q.setter
    def q(self, q):
        self._query = q

    def totals(self, metrics_to_use: List[str] = []):
        """
        Configure the request to return overall totals for the given metrics.

        The statistics are returned with the other quantity statistics under the pseudo
        quantity name 'total'. 'total' contains the pseudo value 'all'. It is used to
        store the metrics aggregated over all entries in the search results.
        """
        self._add_metrics(self._search.aggs, metrics_to_use)
        return self

    def default_statistics(self, metrics_to_use: List[str] = []):
        """
        Configures the domain's default statistics.
        """
        for name in datamodel.Domain.instance.default_statistics:
            self.statistic(
                name,
                quantities[name].aggregations,
                metrics_to_use=metrics_to_use)

        return self

    def statistic(self, quantity_name: str, size: int, metrics_to_use: List[str] = []):
        """
        This can be used to display statistics over the searched entries and allows to
        implement faceted search on the top values for each quantity.

        The metrics contain overall and per quantity value sums of code runs (calcs),
        unique code runs, datasets, and additional domain specific metrics
        (e.g. total energies, and unique geometries for DFT calculations). The quantities
        that can be aggregated to metrics are defined in :mod:`datamodel`. Aggregations
        and respective metrics are calculated for aggregations given in ``aggregations``
        and metrics in ``aggregation_metrics``. As a pseudo aggregation, ``total_metrics``
        are calculated over all search results. The ``aggregations`` gives tuples of
        quantities and default aggregation sizes.

        The search results will contain a dictionary ``statistics``. This has a key
        for each configured quantity. Each quantity key will hold a dict
        with a key for each quantity value. Each quantity value key will hold a dict
        with a key for each metric. The values will be the actual aggregated metric values.

        Arguments:
            quantity_name: The quantity to aggregate statistics for. Only works on *keyword* fields.
            size: The maximum number of quantity values to consider for the aggregation.
            metrics_to_use: The metrics calculated over the aggregations. Can be
                ``unique_code_runs``, ``datasets``, other domain specific metrics.
                The basic doc_count metric ``code_runs`` is always given.
        """
        quantity = quantities[quantity_name]
        min_doc_count = 0 if quantity.zero_aggs else 1
        terms = A(
            'terms', field=quantity.elastic_field, size=size, min_doc_count=min_doc_count,
            order=dict(_key='asc'))

        buckets = self._search.aggs.bucket('statistics:%s' % quantity_name, terms)
        self._add_metrics(buckets, metrics_to_use)

        return self
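
    # E.g. statistic('code_name', size=10) (assuming a 'code_name' quantity) adds
    # a terms aggregation whose buckets _response renders as:
    #     statistics = {'code_name': {'VASP': {'code_runs': 10, ...}, ...}}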

    def _add_metrics(self, parent=None, metrics_to_use: List[str] = []):
        if parent is None:
            parent = self._search.aggs

        for metric in metrics_to_use:
            field, metric_kind = metrics[metric]
            parent.metric('metric:%s' % metric, A(metric_kind, field=field))

    def date_histogram(self, metrics_to_use: List[str] = []):
        """
        Adds a date histogram on the given metrics to the statistics part.
        """
        histogram = A('date_histogram', field='upload_time', interval='1M', format='yyyy-MM-dd')
        self._add_metrics(self._search.aggs.bucket('statistics:date_histogram', histogram), metrics_to_use)

        return self

    def quantities(self, **kwargs):
        """
        Shorthand for adding multiple quantities. See :func:`quantity`. Keyword argument
        keys are quantity name, values are tuples of size and after value.
        """
        for name, spec in kwargs.items():
            size, after = spec
            self.quantity(name, after=after, size=size)

        return self

    def quantity(self, name, size=100, after=None, examples=0, examples_source=None):
        """
        Adds a requests for values of the given quantity.
        It allows to scroll through all values via elasticsearch's
        composite aggregations. The response will contain the quantity values and
        an example entry for each value.

        This can be used to implement continuous scrolling through authors, datasets,
        or uploads within the searched entries.

        If one or more quantities are specified,
        the search results will contain a dictionary ``quantities``. The keys are quantity
        names, the values are dictionaries with 'after' and 'values' keys.
        The 'values' key holds a dict with all the values as keys and their entry count
        as values (i.e. number of entries with that value).

        Arguments:
            name: The quantity name. Must be in :data:`quantities`.
            after: The 'after' value allows to scroll over various requests, by providing
                the 'after' value of the last search. The 'after' value is part of the
                response. Use ``None`` in the first request.
            size:
                The size gives the maximum number of values in the next scroll window.
                If the size is None, a maximum of 100 quantity values will be requested.
        """
        if size is None:
            size = 100

        quantity = quantities[name]
        terms = A('terms', field=quantity.elastic_field)

        # We are using elasticsearch's 'composite aggregations' here. We do not really
        # compose aggregations, but only those pseudo composites allow us to use the
        # 'after' feature that allows to scan through all aggregation values.
        composite = dict(sources={name: terms}, size=size)
        if after is not None:
            composite['after'] = {name: after}

        composite = self._search.aggs.bucket('quantity:%s' % name, 'composite', **composite)
        if examples > 0:
            kwargs = {}
            if examples_source is not None:
                kwargs.update(_source=dict(includes=examples_source))

            composite.metric('examples', A('top_hits', size=examples, **kwargs))

        return self
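
    # Scrolling through all values of a quantity (a sketch, assuming an
    # 'authors' quantity):
    #
    #     after = None
    #     while True:
    #         result = SearchRequest().quantity('authors', size=1000, after=after).execute()
    #         after = result['quantities']['authors'].get('after')
    #         if after is None:
    #             break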

    def execute(self):
        """
        Executes without returning actual results. Only makes sense if the request
        was configured for statistics or quantity values.
        """
        return self._response(self._search.query(self.q)[0:0].execute())

    def execute_scan(self):
        """
        This executes the search as a scan. The result will be a generator over the found
        entries. Everything but the query part of this object will be ignored.
        """
        for hit in self._search.query(self.q).scan():
            yield hit.to_dict()

    def execute_paginated(
            self, page: int = 1, per_page=10, order_by: str = order_default_quantity,
            order: int = -1):
        """
        Executes the search and returns paginated results. Those are sorted.

        Arguments:
            page: The requested page, starts with 1.
            per_page: The number of entries per page.
            order_by: The quantity to order by.
            order: -1 or 1 for descending or ascending order.
        """
        search = self._search.query(self.q)

        if order_by not in quantities:
            raise KeyError('Unknown order quantity %s' % order_by)

        order_by_quantity = quantities[order_by]

        if order == 1:
            search = search.sort(order_by_quantity.elastic_field)
        else:
            search = search.sort('-%s' % order_by_quantity.elastic_field)
        search = search[(page - 1) * per_page: page * per_page]

        result = self._response(search.execute(), with_hits=True)
        result.update(pagination=dict(total=result['total'], page=page, per_page=per_page))
        return result

    def execute_scrolled(self, scroll_id: str = None, size: int = 1000, scroll: str = u'5m'):
        """
        Executes a scrolling search based on the ES scroll API. Pagination is replaced with
        scrolling, no ordering is available, no statistics, no quantities will be provided.

        Scrolling is done by calling this function again and again with the same ``scroll_id``.
        Each time, this function will return the next batch of search results. If the
        ``scroll_id`` is not available anymore, a new ``scroll_id`` is assigned and scrolling
        starts from the beginning again.

        The response will contain a 'scroll' part with attributes 'total', 'scroll_id',
        and 'size'.

        Arguments:
            scroll_id: The scroll id to receive the next batch from. None will create a new
                scroll.
            size: The batch size in number of hits.
            scroll: The time the scroll should be kept alive (i.e. the time between requests
                to this method) in ES time units. Default is 5 minutes.
        """
        es = infrastructure.elastic_client

        if scroll_id is None:
            # initiate scroll
            resp = es.search(  # pylint: disable=E1123
                body=self._search.query(self.q).to_dict(), scroll=scroll, size=size,
                index=config.elastic.index_name)

            scroll_id = resp.get('_scroll_id')
            if scroll_id is None:
                # no results for search query
                return dict(scroll=dict(total=0, size=size), results=[])

        else:
            try:
                resp = es.scroll(scroll_id, scroll=scroll)  # pylint: disable=E1123
            except NotFoundError:
                raise ScrollIdNotFound()

        total = resp['hits']['total']
        results = list(hit['_source'] for hit in resp['hits']['hits'])

        # since we are using the low level api here, we should check errors
        if resp["_shards"]["successful"] < resp["_shards"]["total"]:
            utils.get_logger(__name__).error('es operation was unsuccessful on at least one shard')
            raise ElasticSearchError('es operation was unsuccessful on at least one shard')

        if len(results) == 0:
            es.clear_scroll(body={'scroll_id': [scroll_id]}, ignore=(404, ))  # pylint: disable=E1123
            scroll_id = None

        scroll_info = dict(total=total, size=size)
        if scroll_id is not None:
            scroll_info.update(scroll_id=scroll_id)

        return dict(scroll=scroll_info, results=results)
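
    # Typical scroll loop (a sketch):
    #
    #     request, scroll_id = SearchRequest(), None
    #     while True:
    #         response = request.execute_scrolled(scroll_id=scroll_id)
    #         # ... process response['results'] ...
    #         scroll_id = response['scroll'].get('scroll_id')
    #         if scroll_id is None:
    #             break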

    def _response(self, response, with_hits: bool = False) -> Dict[str, Any]:
        """
        Prepares a response object covering the total number of results, hits, statistics,
        and quantities. Other aspects like pagination and scrolling have to be added
        elsewhere.
        """
        result: Dict[str, Any] = dict()
        aggs = response.aggregations.to_dict()

        # total
        total = response.hits.total if hasattr(response, 'hits') else 0
        result.update(total=total)

        # hits
        if len(response.hits) > 0 or with_hits:
            result.update(results=[hit.to_dict() for hit in response.hits])

        # statistics
        def get_metrics(bucket, code_runs):
            result = {}
            for metric in metrics_names:
                agg_name = 'metric:%s' % metric
                if agg_name in bucket:
                    result[metric] = bucket[agg_name]['value']
            result.update(code_runs=code_runs)
            return result

        statistics_results = {
            quantity_name[11:]: {
                bucket['key']: get_metrics(bucket, bucket['doc_count'])
                for bucket in quantity['buckets']
            }
            for quantity_name, quantity in aggs.items()
            if quantity_name.startswith('statistics:')
        }

        # totals
        totals_result = get_metrics(aggs, total)
        statistics_results['total'] = dict(all=totals_result)

        if len(statistics_results) > 0:
            result.update(statistics=statistics_results)

        # quantities
        def create_quantity_result(quantity_name, quantity):
            values = {}
            for bucket in quantity['buckets']:
                value = dict(
                    total=bucket['doc_count'])
                if 'examples' in bucket:
                    examples = [hit['_source'] for hit in bucket['examples']['hits']['hits']]
                    value.update(examples=examples)

                values[bucket['key'][quantity_name]] = value

            result = dict(values=values)
            if 'after_key' in quantity:
                result.update(after=quantity['after_key'][quantity_name])

            return result

        quantity_results = {
            quantity_name[9:]: create_quantity_result(quantity_name[9:], quantity)
            for quantity_name, quantity in aggs.items()
            if quantity_name.startswith('quantity:')
        }

        if len(quantity_results) > 0:
            result.update(quantities=quantity_results)

        return result

    def __str__(self):
        return json.dumps(self._search.to_dict(), indent=2)


def to_calc_with_metadata(results: List[Dict[str, Any]]):
    """ Translates search results into :class:`CalcWithMetadata` objects read from mongo. """
    ids = [result['calc_id'] for result in results]
    return [
        datamodel.CalcWithMetadata(**calc.metadata)
        for calc in proc.Calc.objects(calc_id__in=ids)]
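
# E.g. to_calc_with_metadata(request.execute_scan()) would turn the hits of a scan
# back into CalcWithMetadata objects read from the mongo processing collection.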