# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module represents calculations in Elasticsearch.
"""

from typing import Iterable, Dict, List, Any
from elasticsearch_dsl import Document, InnerDoc, Keyword, Text, Date, \
    Object, Boolean, Search, Q, A, analyzer, tokenizer
from elasticsearch_dsl.document import IndexMeta
import elasticsearch.helpers
from elasticsearch.exceptions import NotFoundError
from datetime import datetime
import json

from nomad import config, datamodel, infrastructure, coe_repo, utils, processing as proc

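# splits file paths on '/' so that searches can match individual path segments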
path_analyzer = analyzer(
    'path_analyzer',
    tokenizer=tokenizer('path_tokenizer', 'pattern', pattern='/'))

user_cache: Dict[str, Any] = dict()
"""
A cache for user popos used in the index. We will not retrieve names all the time.
This cache should be cleared, before larger re-index operations.
"""


class AlreadyExists(Exception): pass


class ElasticSearchError(Exception): pass


class ScrollIdNotFound(Exception): pass


class User(InnerDoc):

    @classmethod
    def from_user_popo(cls, user):
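        """ Returns a cached index ``User`` for the given user POPO, creating it if necessary. """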
        self = user_cache.get(user.id, None)
        if self is None:
            self = cls(user_id=user.id)

            if 'first_name' not in user:
                user = coe_repo.User.from_user_id(user.id).to_popo()

            last_name = user['last_name'].strip()
            first_name = user['first_name'].strip()

            if len(last_name) > 0 and len(first_name) > 0:
                name = '%s, %s' % (last_name, first_name)
            elif len(last_name) != 0:
                name = last_name
            elif len(first_name) != 0:
                name = first_name
            else:
                name = 'unnamed user with id %s' % user.id

            self.name = name
            user_cache[user.id] = self

        return self

    user_id = Keyword()
    name = Text(fields={'keyword': Keyword()})


class Dataset(InnerDoc):

    @classmethod
    def from_dataset_popo(cls, dataset):
        return cls(
            id=dataset.id,
            doi=dataset.doi['value'] if dataset.doi is not None else None,
            name=dataset.name)

    id = Keyword()
    doi = Keyword()
    name = Keyword()


class WithDomain(IndexMeta):
    """ Override elasticsearch_dsl metaclass to sneak in domain specific mappings """
    def __new__(cls, name, bases, attrs):
        for quantity in datamodel.Domain.instance.domain_quantities.values():
            attrs[quantity.name] = quantity.elastic_mapping
        return super(WithDomain, cls).__new__(cls, name, bases, attrs)


class Entry(Document, metaclass=WithDomain):

    class Index:
        name = config.elastic.index_name

    upload_id = Keyword()
    upload_time = Date()
    calc_id = Keyword()
    calc_hash = Keyword()
    pid = Keyword()
    raw_id = Keyword()
    mainfile = Keyword()
    files = Text(multi=True, analyzer=path_analyzer, fields={'keyword': Keyword()})
    uploader = Object(User)

    with_embargo = Boolean()
    published = Boolean()

    processed = Boolean()
    last_processing = Date()
    nomad_version = Keyword()
    nomad_commit = Keyword()

    authors = Object(User, multi=True)
    owners = Object(User, multi=True)
    comment = Text()
    references = Keyword()
    datasets = Object(Dataset)
    external_id = Keyword()

    @classmethod
    def from_calc_with_metadata(cls, source: datamodel.CalcWithMetadata) -> 'Entry':
        entry = Entry(meta=dict(id=source.calc_id))
        entry.update(source)
        return entry

    def update(self, source: datamodel.CalcWithMetadata) -> None:
        self.upload_id = source.upload_id
        self.upload_time = source.upload_time
        self.calc_id = source.calc_id
        self.calc_hash = source.calc_hash
        self.pid = None if source.pid is None else str(source.pid)
        self.raw_id = None if source.raw_id is None else str(source.raw_id)

        self.processed = source.processed
        self.last_processing = source.last_processing
        self.nomad_version = source.nomad_version
        self.nomad_commit = source.nomad_commit

        self.mainfile = source.mainfile
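        # make sure the mainfile is always part of the files list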
        if source.files is None:
            self.files = [self.mainfile]
        elif self.mainfile not in source.files:
            self.files = [self.mainfile] + source.files
        else:
            self.files = source.files

        self.uploader = User.from_user_popo(source.uploader) if source.uploader is not None else None

        self.with_embargo = bool(source.with_embargo)
        self.published = source.published
        self.authors = [User.from_user_popo(user) for user in source.coauthors]
        self.owners = [User.from_user_popo(user) for user in source.shared_with]
        if self.uploader is not None:
            if self.uploader not in self.authors:
                self.authors.append(self.uploader)
            if self.uploader not in self.owners:
                self.owners.append(self.uploader)
        self.comment = source.comment
        self.references = [ref.value for ref in source.references]
        self.datasets = [Dataset.from_dataset_popo(ds) for ds in source.datasets]
        self.external_id = source.external_id

        for quantity in datamodel.Domain.instance.domain_quantities.values():
            setattr(
                self, quantity.name,
                quantity.elastic_value(getattr(source, quantity.metadata_field)))


def delete_upload(upload_id):
    """ Delete all entries with given ``upload_id`` from the index. """
    index = Entry._default_index()
    Search(index=index).query('match', upload_id=upload_id).delete()


def publish(calcs: Iterable[datamodel.CalcWithMetadata]) -> None:
    """ Update all given calcs with their metadata and set ``publish = True``. """
    def elastic_updates():
        for calc in calcs:
            entry = Entry.from_calc_with_metadata(calc)
            entry.published = True
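            # turn the index document into a bulk partial-update action: the document
            # body moves from '_source' to 'doc', as required by the 'update' op type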
            entry = entry.to_dict(include_meta=True)
            source = entry.pop('_source')
            entry['doc'] = source
            entry['_op_type'] = 'update'
            yield entry

    elasticsearch.helpers.bulk(infrastructure.elastic_client, elastic_updates())
    refresh()


def index_all(calcs: Iterable[datamodel.CalcWithMetadata], do_refresh=True) -> int:
    """
    Adds all given calcs with their metadata to the index.

    Returns:
        Number of failed entries.
    """
    def elastic_updates():
        for calc in calcs:
            entry = Entry.from_calc_with_metadata(calc)
            entry = entry.to_dict(include_meta=True)
            entry['_op_type'] = 'index'
            yield entry

    _, failed = elasticsearch.helpers.bulk(infrastructure.elastic_client, elastic_updates(), stats_only=True)

    if do_refresh:
        refresh()

    return failed


def refresh():
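    """ Refreshes the index, so that recent changes become visible to searches. """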
    infrastructure.elastic_client.indices.refresh(config.elastic.index_name)


quantities = datamodel.Domain.instance.quantities
"""The available search quantities """

metrics = datamodel.Domain.instance.metrics
"""
The available search metrics. Metrics are integer values given for each entry that can
239
be used in statistics (aggregations), e.g. the sum of all total energy calculations or cardinality of
240
241
242
all unique geometries.
"""

metrics_names = datamodel.Domain.instance.metrics_names
""" Names of all available metrics """

order_default_quantity = None
for quantity in datamodel.Domain.instance.quantities.values():
    if quantity.order_default:
        order_default_quantity = quantity.name


class SearchRequest:
    '''
    Represents a search request and allows to execute that request.
    It allows to compose the following features: a query;
    statistics (metrics and aggregations); quantity values; scrolling, pagination for entries;
    scrolling for quantity values.

    The query part filters NOMAD data before the other features come into effect. There
    are specialized methods for configuring the :func:`owner` and :func:`time_range` queries.
    Quantities can be searched for by setting them as attributes.

    The aggregations for statistics can be requested for pre-configured quantities. These
    bucket aggregations come with a metric calculated for each possible
    quantity value.

    The other possible form of aggregations allows to get quantity values as results
    (e.g. get all datasets, get all users, etc.). Each value can be accompanied by metrics
    (over all entries with that value) and an example value.

    Of course, searches can return a set of search results. Search objects can be
    configured with pagination or scrolling for these results. Pagination is the default
    and also allows ordering of results. Scrolling can be used if all entries need to be
    'scrolled through'. This might be necessary, since Elasticsearch has limits on
    possible pages (e.g. 'from' must be smaller than 10000). On the downside, there is no
    ordering on scrolling.

    There is also scrolling for quantities to go through all quantity values. There is no
    paging for aggregations.
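
    A minimal usage sketch (``code_name`` is just an illustrative domain quantity;
    actual quantity names depend on the configured domain):

    .. code-block:: python

        request = SearchRequest().owner('public').time_range(start, end)
        request.statistic('code_name', size=10, metrics_to_use=['datasets'])
        results = request.execute_paginated(page=1, per_page=10)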
    '''
    def __init__(self, query=None):
        self._query = query
        self._search = Search(index=config.elastic.index_name)

    def owner(self, owner_type: str = 'all', user_id: str = None):
        """
        Uses the query part of the search to restrict the results based on the owner.
        The possible types are: ``all`` for all calculations; ``public`` for
        calculations visible to everyone, excluding entries only visible to the given user;
        ``user`` for all calculations of the given user; ``staging`` for all
        calculations in staging of the given user.

        Arguments:
            owner_type: The type of the owner query, see above.
            user_id: The 'owner' given as the user's unique id.

        Raises:
            KeyError: If the given owner_type is not supported
            ValueError: If the owner_type requires a user but none is given, or the
                given user is not allowed to use the given owner_type.
        """
        if owner_type == 'all':
            q = Q('term', published=True) & Q('term', with_embargo=False)
            if user_id is not None:
                q = q | Q('term', owners__user_id=user_id)
        elif owner_type == 'public':
            q = Q('term', published=True) & Q('term', with_embargo=False)
        elif owner_type == 'user':
            if user_id is None:
                raise ValueError('Authentication required for owner value user.')

            q = Q('term', owners__user_id=user_id)
        elif owner_type == 'staging':
            if user_id is None:
                raise ValueError('Authentication required for owner value staging.')
            q = Q('term', published=False) & Q('term', owners__user_id=user_id)
        elif owner_type == 'admin':
            if user_id is None or not coe_repo.User.from_user_id(user_id).is_admin:
                raise ValueError('This can only be used by the admin user.')
            q = None
        else:
            raise KeyError('Unsupported owner value')

        if q is not None:
            self.q = self.q & q

        return self

    def search_parameters(self, **kwargs):
        """
        Configures the existing query with additional search parameters. Kwargs are
        interpreted as key-value pairs. Keys have to correspond to valid entry quantities
        in the domain's (DFT calculations) datamodel. Alternatively, search parameters
        can be set via attributes.
        """
        for name, value in kwargs.items():
            self.search_parameter(name, value)

        return self

    def search_parameter(self, name, value):
        quantity = quantities.get(name, None)
        if quantity is None:
            raise KeyError('Unknown quantity %s' % name)

        if quantity.multi and not isinstance(value, list):
            value = [value]

        value = quantity.elastic_value(value)

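        # 'terms' queries can take the whole list of values at once; other query
        # types are applied once per value and AND-ed together below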
        if quantity.elastic_search_type == 'terms':
            if not isinstance(value, list):
                value = [value]
            self.q &= Q('terms', **{quantity.elastic_field: value})

            return self

        if isinstance(value, list):
            values = value
        else:
            values = [value]

        for item in values:
            self.q &= Q(quantity.elastic_search_type, **{quantity.elastic_field: item})

        return self

    def query(self, query):
        """ Adds the given query as an 'and' (i.e. 'must') clause to the request. """
        self.q = self.q & query

        return self

    def time_range(self, start: datetime, end: datetime):
        """ Adds a time range to the query. """
        if start is None and end is None:
            return self

        if start is None:
            start = datetime.fromtimestamp(0)
        if end is None:
            end = datetime.utcnow()

        self.q &= Q('range', upload_time=dict(gte=start, lte=end))

        return self

    @property
    def q(self):
        """ The underlying elasticsearch_dsl query object """
        if self._query is None:
            return Q('match_all')
        else:
            return self._query

    @q.setter
    def q(self, q):
        self._query = q

    def totals(self, metrics_to_use: List[str] = []):
        """
        Configure the request to return overall totals for the given metrics.

        The statistics are returned with the other quantity statistics under the pseudo
        quantity name 'total'. 'total' contains the pseudo value 'all'. It is used to
        store the metrics aggregated over all entries in the search results.
        """
        self._add_metrics(self._search.aggs, metrics_to_use)
        return self

    def default_statistics(self, metrics_to_use: List[str] = []):
        """
        Configures the domain's default statistics.
        """
        for name in datamodel.Domain.instance.default_statistics:
            self.statistic(
                name,
                quantities[name].aggregations,
                metrics_to_use=metrics_to_use)

        return self

    def statistic(self, quantity_name: str, size: int, metrics_to_use: List[str] = []):
        """
        This can be used to display statistics over the searched entries and allows to
        implement faceted search on the top values for each quantity.

        The metrics contain overall and per quantity value sums of code runs (calcs),
        unique code runs, datasets, and additional domain-specific metrics
        (e.g. total energies and unique geometries for DFT calculations). The quantities
        that can be aggregated to metrics are defined in :mod:`datamodel`. Aggregations
        and respective metrics are calculated for aggregations given in ``aggregations``
        and metrics in ``aggregation_metrics``. As a pseudo aggregation, ``total_metrics``
        are calculated over all search results. ``aggregations`` gives tuples of
        quantities and default aggregation sizes.

        The search results will contain a dictionary ``statistics``. This has a key
        for each configured quantity. Each quantity key will hold a dict
        with a key for each quantity value. Each quantity value key will hold a dict
        with a key for each metric. The values will be the actual aggregated metric values.
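
        For example (illustrative quantity, values, and counts):

        .. code-block:: python

            results['statistics'] = {
                'code_name': {
                    'VASP': {'code_runs': 150, 'datasets': 12}
                }
            }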

        Arguments:
            quantity_name: The quantity to aggregate statistics for. Only works on *keyword* fields.
            size: The number of aggregated quantity values (buckets) to return.
            metrics_to_use: The metrics calculated over the aggregations. Can be
                ``unique_code_runs``, ``datasets``, other domain specific metrics.
                The basic doc_count metric ``code_runs`` is always given.
        """
        quantity = quantities[quantity_name]
        min_doc_count = 0 if quantity.zero_aggs else 1
        terms = A(
            'terms', field=quantity.elastic_field, size=size, min_doc_count=min_doc_count,
            order=dict(_key='asc'))

        buckets = self._search.aggs.bucket('statistics:%s' % quantity_name, terms)
        self._add_metrics(buckets, metrics_to_use)

        return self

    def _add_metrics(self, parent=None, metrics_to_use: List[str] = []):
        if parent is None:
            parent = self._search.aggs

        for metric in metrics_to_use:
            field, metric_kind = metrics[metric]
            parent.metric('metric:%s' % metric, A(metric_kind, field=field))

    def date_histogram(self, metrics_to_use: List[str] = []):
        """
        Adds a date histogram on the given metrics to the statistics part.
        """
        histogram = A('date_histogram', field='upload_time', interval='1M', format='yyyy-MM-dd')
        self._add_metrics(self._search.aggs.bucket('statistics:date_histogram', histogram), metrics_to_use)

        return self

    def quantities(self, **kwargs):
        """
        Shorthand for adding multiple quantities. See :func:`quantity`. Keyword argument
        keys are quantity names, values are tuples of size and after value.
        """
        for name, spec in kwargs.items():
            size, after = spec
            self.quantity(name, after=after, size=size)

        return self

    def quantity(self, name, size=100, after=None, examples=0, examples_source=None):
        """
        Adds a request for values of the given quantity.
        It allows to scroll through all values via Elasticsearch's
        composite aggregations. The response will contain the quantity values and
        an example entry for each value.

        This can be used to implement continuous scrolling through authors, datasets,
        or uploads within the searched entries.

        If one or more quantities are specified,
        the search results will contain a dictionary ``quantities``. The keys are quantity
        names, the values are dictionaries with 'after' and 'values' keys.
        The 'values' key holds a dict with all the values as keys; each maps to a dict
        with the respective entry count under 'total' (i.e. the number of entries with
        that value).
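
        For example (illustrative names and counts):

        .. code-block:: python

            results['quantities']['datasets'] = {
                'values': {'some-dataset': {'total': 17}},
                'after': 'some-dataset'
            }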

        Arguments:
            name: The quantity name. Must be in :data:`quantities`.
            after: The 'after' value allows to scroll over various requests, by providing
                the 'after' value of the last search. The 'after' value is part of the
                response. Use ``None`` in the first request.
            size:
                The maximum number of quantity values in the next scroll window.
                If the size is None, a maximum of 100 quantity values will be requested.
            examples: The number of example entries to return for each value. Default is 0.
            examples_source: An optional list of fields to include in the example entries.
        """
        if size is None:
            size = 100

        quantity = quantities[name]
        terms = A('terms', field=quantity.elastic_field)

        # We are using Elasticsearch's 'composite aggregations' here. We do not really
        # compose aggregations, but only those pseudo composites allow us to use the
        # 'after' feature that allows to scan through all aggregation values.
        composite = dict(sources={name: terms}, size=size)
        if after is not None:
            composite['after'] = {name: after}

        composite = self._search.aggs.bucket('quantity:%s' % name, 'composite', **composite)
        if examples > 0:
            kwargs = {}
            if examples_source is not None:
                kwargs.update(_source=dict(includes=examples_source))

            composite.metric('examples', A('top_hits', size=examples, **kwargs))

        return self

    def execute(self):
        """
        Executes the search without returning actual results. Only makes sense if the request
        was configured for statistics or quantity values.
        """
        return self._response(self._search.query(self.q)[0:0].execute())

    def execute_scan(self):
        """
        This executes the search as a scan. The result will be a generator over the found
        entries. Everything but the query part of this object will be ignored.
        """
        for hit in self._search.query(self.q).scan():
            yield hit.to_dict()

    def execute_paginated(
            self, page: int = 1, per_page=10, order_by: str = order_default_quantity,
            order: int = -1):
        """
        Executes the search and returns paginated results. Those are sorted.

        Arguments:
            page: The requested page, starts with 1.
            per_page: The number of entries per page.
            order_by: The quantity to order by.
            order: -1 or 1 for descending or ascending order.
        """
        search = self._search.query(self.q)

        if order_by not in quantities:
            raise KeyError('Unknown order quantity %s' % order_by)

        order_by_quantity = quantities[order_by]

        if order == 1:
            search = search.sort(order_by_quantity.elastic_field)
        else:
            search = search.sort('-%s' % order_by_quantity.elastic_field)
        search = search[(page - 1) * per_page: page * per_page]

        result = self._response(search.execute(), with_hits=True)
        result.update(pagination=dict(total=result['total'], page=page, per_page=per_page))
        return result

    def execute_scrolled(self, scroll_id: str = None, size: int = 1000, scroll: str = u'5m'):
        """
        Executes a scrolling search based on the ES scroll API. Pagination is replaced with
        scrolling; no ordering, no statistics, and no quantities will be provided.

        Scrolling is done by calling this function again and again with the same ``scroll_id``.
        Each time, this function will return the next batch of search results. If the
        ``scroll_id`` is not available anymore, a new ``scroll_id`` is assigned and scrolling
        starts from the beginning again.

        The response will contain a 'scroll' part with attributes 'total', 'scroll_id',
        and 'size'.

        Arguments:
            scroll_id: The scroll id to receive the next batch from. None will create a new
                scroll.
            size: The batch size in number of hits.
            scroll: The time the scroll should be kept alive (i.e. the time between requests
                to this method) in ES time units. Default is 5 minutes.
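
        A hypothetical client loop (``request`` and ``handle`` are placeholders):

        .. code-block:: python

            scroll_id = None
            while True:
                response = request.execute_scrolled(scroll_id=scroll_id)
                for entry in response['results']:
                    handle(entry)
                scroll_id = response['scroll'].get('scroll_id')
                if scroll_id is None:
                    break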
        """
        es = infrastructure.elastic_client

        if scroll_id is None:
            # initiate scroll
            resp = es.search(  # pylint: disable=E1123
                body=self._search.query(self.q).to_dict(), scroll=scroll, size=size,
                index=config.elastic.index_name)

            scroll_id = resp.get('_scroll_id')
            if scroll_id is None:
                # no results for search query
                return dict(scroll=dict(total=0, size=size), results=[])

        else:
            try:
                resp = es.scroll(scroll_id, scroll=scroll)  # pylint: disable=E1123
            except NotFoundError:
                raise ScrollIdNotFound()

        total = resp['hits']['total']
        results = list(hit['_source'] for hit in resp['hits']['hits'])

        # since we are using the low level api here, we should check errors
        if resp["_shards"]["successful"] < resp["_shards"]["total"]:
            utils.get_logger(__name__).error('es operation was unsuccessful on at least one shard')
            raise ElasticSearchError('es operation was unsuccessful on at least one shard')

        if len(results) == 0:
            es.clear_scroll(body={'scroll_id': [scroll_id]}, ignore=(404, ))  # pylint: disable=E1123
            scroll_id = None

        scroll_info = dict(total=total, size=size)
        if scroll_id is not None:
            scroll_info.update(scroll_id=scroll_id)

        return dict(scroll=scroll_info, results=results)

    def _response(self, response, with_hits: bool = False) -> Dict[str, Any]:
        """
        Prepares a response object covering the total number of results, hits, statistics,
        and quantities. Other aspects like pagination and scrolling have to be added
        elsewhere.
        """
        result: Dict[str, Any] = dict()
        aggs = response.aggregations.to_dict()

        # total
        total = response.hits.total if hasattr(response, 'hits') else 0
        result.update(total=total)

        # hits
        if len(response.hits) > 0 or with_hits:
            result.update(results=[hit.to_dict() for hit in response.hits])

        # statistics
        def get_metrics(bucket, code_runs):
            result = {}
            for metric in metrics_names:
                agg_name = 'metric:%s' % metric
                if agg_name in bucket:
                    result[metric] = bucket[agg_name]['value']

            result.update(code_runs=code_runs)
            return result

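        # aggregation names carry a 'statistics:' prefix that is stripped here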
        statistics_results = {
            quantity_name[11:]: {
                bucket['key']: get_metrics(bucket, bucket['doc_count'])
                for bucket in quantity['buckets']
            }
            for quantity_name, quantity in aggs.items()
            if quantity_name.startswith('statistics:')
        }

        # totals
        totals_result = get_metrics(aggs, total)
        statistics_results['total'] = dict(all=totals_result)

        if len(statistics_results) > 0:
            result.update(statistics=statistics_results)

        # quantities
        def create_quantity_result(quantity_name, quantity):
            values = {}
            for bucket in quantity['buckets']:
                value = dict(
                    total=bucket['doc_count'])
                if 'examples' in bucket:
                    examples = [hit['_source'] for hit in bucket['examples']['hits']['hits']]
                    value.update(examples=examples)

                values[bucket['key'][quantity_name]] = value

            result = dict(values=values)
            if 'after_key' in quantity:
                result.update(after=quantity['after_key'][quantity_name])

            return result

        quantity_results = {
            quantity_name[9:]: create_quantity_result(quantity_name[9:], quantity)
            for quantity_name, quantity in aggs.items()
            if quantity_name.startswith('quantity:')
        }

        if len(quantity_results) > 0:
            result.update(quantities=quantity_results)

        return result

    def __str__(self):
        return json.dumps(self._search.to_dict(), indent=2)


def to_calc_with_metadata(results: List[Dict[str, Any]]):
    """ Translates search results into :class:`CalcWithMetadata` objects read from mongo. """
    ids = [result['calc_id'] for result in results]
    return [
        datamodel.CalcWithMetadata(**calc.metadata)
        for calc in proc.Calc.objects(calc_id__in=ids)]