search.py 28.3 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module represents calculations in elastic search.
"""

19
from typing import Iterable, Dict, List, Any
Markus Scheidgen's avatar
Markus Scheidgen committed
20
from elasticsearch_dsl import Document, InnerDoc, Keyword, Text, Date, \
21
22
    Object, Boolean, Search, Q, A, analyzer, tokenizer
from elasticsearch_dsl.document import IndexMeta
23
import elasticsearch.helpers
Markus Scheidgen's avatar
Markus Scheidgen committed
24
from elasticsearch.exceptions import NotFoundError
Markus Scheidgen's avatar
Markus Scheidgen committed
25
from datetime import datetime
26
import json
27

Markus Scheidgen's avatar
Markus Scheidgen committed
28
from nomad import config, datamodel, infrastructure, datamodel, utils, processing as proc
29

30

31
32
33
34
# Analyzer that tokenizes file paths on '/' so searches can match individual
# path segments of indexed file names (used by the ``files`` field below).
path_analyzer = analyzer(
    'path_analyzer',
    tokenizer=tokenizer('path_tokenizer', 'pattern', pattern='/'))

35
36

class AlreadyExists(Exception):
    """ Raised when something that is supposed to be created already exists. """
37
38


39
40
41
class ElasticSearchError(Exception):
    """ Raised when an elastic search operation did not complete successfully. """


Markus Scheidgen's avatar
Markus Scheidgen committed
42
43
44
class ScrollIdNotFound(Exception):
    """ Raised when a given elastic search scroll id is not (or no longer) known. """


Markus Scheidgen's avatar
Markus Scheidgen committed
45
class User(InnerDoc):
    """ Elastic inner document for user references (uploader, authors, owners). """

    @classmethod
    def from_user(cls, user):
        """ Creates a :class:`User` document from the given datamodel user object. """
        doc = cls(user_id=user.user_id)
        doc.name = user.name
        doc.email = user.email
        return doc

    user_id = Keyword()
    email = Keyword()
    name = Text(fields={'keyword': Keyword()})
Markus Scheidgen's avatar
Markus Scheidgen committed
58
59
60


class Dataset(InnerDoc):
    """ Elastic inner document for dataset references attached to an entry. """

    @classmethod
    def from_dataset_id(cls, dataset_id):
        """ Resolves ``dataset_id`` against the datamodel and creates a document. """
        resolved = datamodel.Dataset.m_def.m_x('me').get(dataset_id=dataset_id)
        return cls(
            id=resolved.dataset_id,
            doi=resolved.doi,
            name=resolved.name)

    id = Keyword()
    doi = Keyword()
    name = Keyword()


72
73
74
class WithDomain(IndexMeta):
    """ Override elasticsearch_dsl metaclass to sneak in domain specific mappings """

    def __new__(cls, name, bases, attrs):
        # add an elastic mapping for each domain quantity before the document
        # class is created
        domain_quantities = datamodel.Domain.instance.domain_quantities
        for quantity in domain_quantities.values():
            attrs[quantity.name] = quantity.elastic_mapping
        return super().__new__(cls, name, bases, attrs)


class Entry(Document, metaclass=WithDomain):
    """
    Elastic search document that stores the metadata of a single entry (calculation).

    The statically declared fields below are domain independent; additional domain
    specific fields are injected dynamically by the :class:`WithDomain` metaclass.
    """

    class Index:
        # the index name comes from the nomad configuration
        name = config.elastic.index_name

    # upload related metadata
    upload_id = Keyword()
    upload_time = Date()
    upload_name = Keyword()
    # entry identifiers
    calc_id = Keyword()
    calc_hash = Keyword()
    pid = Keyword()
    raw_id = Keyword()
    # file information; ``files`` uses the path analyzer so that individual
    # path segments can be matched
    mainfile = Keyword()
    files = Text(multi=True, analyzer=path_analyzer, fields={'keyword': Keyword()})
    uploader = Object(User)

    # visibility
    with_embargo = Boolean()
    published = Boolean()

    # processing related metadata
    processed = Boolean()
    last_processing = Date()
    nomad_version = Keyword()
    nomad_commit = Keyword()

    # user and curation related metadata
    authors = Object(User, multi=True)
    owners = Object(User, multi=True)
    comment = Text()
    references = Keyword()
    datasets = Object(Dataset)
    external_id = Keyword()

    @classmethod
    def from_calc_with_metadata(cls, source: datamodel.CalcWithMetadata) -> 'Entry':
        """ Creates a new document (elastic id set to the calc id) filled from ``source``. """
        entry = Entry(meta=dict(id=source.calc_id))
        entry.update(source)
        return entry

    def update(self, source: datamodel.CalcWithMetadata) -> None:
        """ Copies all metadata from the given ``source`` calc into this document. """
        self.upload_id = source.upload_id
        self.upload_time = source.upload_time
        self.upload_name = source.upload_name
        self.calc_id = source.calc_id
        self.calc_hash = source.calc_hash
        # pid and raw_id are indexed as strings
        self.pid = None if source.pid is None else str(source.pid)
        self.raw_id = None if source.raw_id is None else str(source.raw_id)

        self.processed = source.processed
        self.last_processing = source.last_processing
        self.nomad_version = source.nomad_version
        self.nomad_commit = source.nomad_commit

        self.mainfile = source.mainfile
        # the mainfile is always part of the indexed files list
        if source.files is None:
            self.files = [self.mainfile]
        elif self.mainfile not in source.files:
            self.files = [self.mainfile] + source.files
        else:
            self.files = source.files

        self.with_embargo = bool(source.with_embargo)
        self.published = source.published

        # resolve user references; the uploader also counts as author and owner
        uploader = datamodel.User.get(user_id=source.uploader) if source.uploader is not None else None
        authors = [datamodel.User.get(user_id) for user_id in source.coauthors]
        owners = [datamodel.User.get(user_id) for user_id in source.shared_with]
        if uploader is not None:
            authors.append(uploader)
            owners.append(uploader)
        authors.sort(key=lambda user: user.last_name + ' ' + user.first_name)
        owners.sort(key=lambda user: user.last_name + ' ' + user.first_name)

        self.uploader = User.from_user(uploader) if uploader is not None else None
        self.authors = [User.from_user(user) for user in authors]
        self.owners = [User.from_user(user) for user in owners]

        self.comment = source.comment
        self.references = source.references
        self.datasets = [Dataset.from_dataset_id(dataset_id) for dataset_id in source.datasets]
        self.external_id = source.external_id

        # fill the domain specific fields added by the WithDomain metaclass
        for quantity in datamodel.Domain.instance.domain_quantities.values():
            quantity_value = quantity.elastic_value(getattr(source, quantity.metadata_field))
            setattr(self, quantity.name, quantity_value)
163
164


165
166
167
168
169
170
def delete_upload(upload_id):
    """ Delete all entries with given ``upload_id`` from the index. """
    Search(index=Entry._default_index()).query('match', upload_id=upload_id).delete()


171
172
173
174
175
176
def delete_entry(calc_id):
    """ Delete the entry with the given ``calc_id`` from the index. """
    Search(index=Entry._default_index()).query('match', calc_id=calc_id).delete()


177
178
179
180
181
182
def publish(calcs: Iterable[datamodel.CalcWithMetadata]) -> None:
    """ Update all given calcs with their metadata and set ``publish = True``. """
    def updates():
        # generate bulk 'update' actions, one per calc
        for calc in calcs:
            doc = Entry.from_calc_with_metadata(calc)
            doc.published = True
            action = doc.to_dict(include_meta=True)
            action['doc'] = action.pop('_source')
            action['_op_type'] = 'update'
            yield action

    elasticsearch.helpers.bulk(infrastructure.elastic_client, updates())
    refresh()


Markus Scheidgen's avatar
Markus Scheidgen committed
193
def index_all(calcs: Iterable[datamodel.CalcWithMetadata], do_refresh=True) -> None:
194
195
196
197
198
199
    """
    Adds all given calcs with their metadata to the index.

    Returns:
        Number of failed entries.
    """
200
201
202
203
204
205
206
    def elastic_updates():
        for calc in calcs:
            entry = Entry.from_calc_with_metadata(calc)
            entry = entry.to_dict(include_meta=True)
            entry['_op_type'] = 'index'
            yield entry

207
    _, failed = elasticsearch.helpers.bulk(infrastructure.elastic_client, elastic_updates(), stats_only=True)
Markus Scheidgen's avatar
Markus Scheidgen committed
208

Markus Scheidgen's avatar
Markus Scheidgen committed
209
    if do_refresh:
Markus Scheidgen's avatar
Markus Scheidgen committed
210
211
        refresh()

212
    return failed
213
214


215
216
def refresh():
    """ Refresh the elastic index so that recent changes become visible to search. """
    infrastructure.elastic_client.indices.refresh(config.elastic.index_name)
217
218


219
quantities = datamodel.Domain.instance.quantities
"""The available search quantities """

metrics = datamodel.Domain.instance.metrics
"""
The available search metrics. Metrics are integer values given for each entry that can
be used in statistics (aggregations), e.g. the sum of all total energy calculations or cardinality of
all unique geometries.
"""

metrics_names = datamodel.Domain.instance.metrics_names
""" Names of all available metrics """

groups = datamodel.Domain.instance.groups
"""The available groupable quantities"""

# default quantity used for ordering search results; if several domain
# quantities set ``order_default``, the last one iterated wins (None if none)
order_default_quantity = None
for quantity in datamodel.Domain.instance.quantities.values():
    if quantity.order_default:
        order_default_quantity = quantity.name
Markus Scheidgen's avatar
Markus Scheidgen committed
239

240

241
class SearchRequest:
    '''
    Represents a search request and allows to execute that request.
    It allows to compose the following features: a query;
    statistics (metrics and aggregations); quantity values; scrolling, pagination for entries;
    scrolling for quantity values.

    The query part filters NOMAD data before the other features come into effect. There
    are specialized methods for configuring the :func:`owner` and :func:`time_range` queries.
    Quantities can be searched for by setting them as attributes.

    The aggregations for statistics can be requested for pre-configured quantities. These
    bucket aggregations come with a metric calculated for each possible
    quantity value.

    The other possible form of aggregations, allows to get quantity values as results
    (e.g. get all datasets, get all users, etc.). Each value can be accompanied by metrics
    (over all entries with that value) and an example value.

    Of course, searches can return a set of search results. Search objects can be
    configured with pagination or scrolling for these results. Pagination is the default
    and also allows ordering of results. Scrolling can be used if all entries need to be
    'scrolled through'. This might be necessary, since elastic search has limits on
    possible pages (e.g. 'from' must be smaller than 10000). On the downside, there is no
    ordering on scrolling.

    There is also scrolling for quantities to go through all quantity values. There is no
    paging for aggregations.
    '''
    def __init__(self, query=None):
        self._query = query
        self._search = Search(index=config.elastic.index_name)

    def owner(self, owner_type: str = 'all', user_id: str = None):
        """
        Uses the query part of the search to restrict the results based on the owner.
        The possible types are: ``all`` for all calculations; ``public`` for
        calculations visible by everyone, excluding entries only visible to the given user;
        ``user`` for all calculations of the given user; ``staging`` for all
        calculations in staging of the given user.

        Arguments:
            owner_type: The type of the owner query, see above.
            user_id: The 'owner' given as the user's unique id.

        Raises:
            KeyError: If the given owner_type is not supported
            ValueError: If the owner_type requires a user but none is given, or the
                given user is not allowed to use the given owner_type.
        """
        if owner_type == 'all':
            q = Q('term', published=True) & Q('term', with_embargo=False)
            if user_id is not None:
                # also include everything the user owns, published or not
                q = q | Q('term', owners__user_id=user_id)
        elif owner_type == 'public':
            q = Q('term', published=True) & Q('term', with_embargo=False)
        elif owner_type == 'shared':
            if user_id is None:
                raise ValueError('Authentication required for owner value shared.')

            q = Q('term', owners__user_id=user_id)
        elif owner_type == 'user':
            if user_id is None:
                raise ValueError('Authentication required for owner value user.')

            q = Q('term', uploader__user_id=user_id)
        elif owner_type == 'staging':
            if user_id is None:
                # fixed error message: it wrongly referred to owner value 'user'
                raise ValueError('Authentication required for owner value staging.')
            q = Q('term', published=False) & Q('term', owners__user_id=user_id)
        elif owner_type == 'admin':
            if user_id is None or not datamodel.User.get(user_id=user_id).is_admin:
                raise ValueError('This can only be used by the admin user.')
            # admin sees everything; no restriction is added
            q = None
        else:
            raise KeyError('Unsupported owner value')

        if q is not None:
            self.q = self.q & q

        return self

    def search_parameters(self, **kwargs):
        """
        Configures the existing query with additional search parameters. Kwargs are
        interpreted as key value pairs. Keys have to correspond to valid entry quantities
        in the domain's (DFT calculations) datamodel. Alternatively search parameters
        can be set via attributes.
        """
        for name, value in kwargs.items():
            self.search_parameter(name, value)

        return self

    def search_parameter(self, name, value):
        """
        Adds a query clause for the given quantity name and value.

        Raises:
            KeyError: If the given name is not a known quantity.
        """
        quantity = quantities.get(name, None)
        if quantity is None:
            raise KeyError('Unknown quantity %s' % name)

        if quantity.multi and not isinstance(value, list):
            value = [value]

        value = quantity.elastic_value(value)

        if quantity.elastic_search_type == 'terms':
            if not isinstance(value, list):
                value = [value]
            self.q &= Q('terms', **{quantity.elastic_field: value})

            return self

        if isinstance(value, list):
            values = value
        else:
            values = [value]

        # non 'terms' searches require each value to match ('and' semantics)
        for item in values:
            self.q &= Q(quantity.elastic_search_type, **{quantity.elastic_field: item})

        return self

    def query(self, query):
        """ Adds the given query as a 'and' (i.e. 'must') clause to the request. """
        self._query &= query

        return self

    def time_range(self, start: datetime, end: datetime):
        """ Adds a time range to the query. Open ends default to epoch / now. """
        if start is None and end is None:
            return self

        if start is None:
            start = datetime.fromtimestamp(0)
        if end is None:
            end = datetime.utcnow()

        self.q &= Q('range', upload_time=dict(gte=start, lte=end))

        return self

    @property
    def q(self):
        """ The underlying elasticsearch_dsl query object """
        if self._query is None:
            return Q('match_all')
        else:
            return self._query

    @q.setter
    def q(self, q):
        self._query = q

    def totals(self, metrics_to_use: List[str] = []):
        """
        Configure the request to return overall totals for the given metrics.

        The statistics are returned with the other quantity statistics under the pseudo
        quantity name 'total'. 'total' contains the pseudo value 'all'. It is used to
        store the metrics aggregated over all entries in the search results.
        """
        self._add_metrics(self._search.aggs, metrics_to_use)
        return self

    def default_statistics(self, metrics_to_use: List[str] = []):
        """
        Configures the domain's default statistics.
        """
        for name in datamodel.Domain.instance.default_statistics:
            self.statistic(
                name,
                quantities[name].aggregations,
                metrics_to_use=metrics_to_use)

        return self

    def statistic(self, quantity_name: str, size: int, metrics_to_use: List[str] = []):
        """
        This can be used to display statistics over the searched entries and allows to
        implement faceted search on the top values for each quantity.

        The metrics contain overall and per quantity value sums of code runs (calcs),
        unique code runs, datasets, and additional domain specific metrics
        (e.g. total energies, and unique geometries for DFT calculations). The quantities
        that can be aggregated to metrics are defined in module:`datamodel`. Aggregations
        and respective metrics are calculated for aggregations given in ``aggregations``
        and metrics in ``aggregation_metrics``. As a pseudo aggregation ``total_metrics``
        are calculated over all search results. The ``aggregations`` gives tuples of
        quantities and default aggregation sizes.

        The search results will contain a dictionary ``statistics``. This has a key
        for each configured quantity. Each quantity key will hold a dict
        with a key for each quantity value. Each quantity value key will hold a dict
        with a key for each metric. The values will be the actual aggregated metric values.

        Arguments:
            quantity_name: The quantity to aggregate statistics for. Only works on *keyword* field.
            metrics_to_use: The metrics calculated over the aggregations. Can be
                ``unique_code_runs``, ``datasets``, other domain specific metrics.
                The basic doc_count metric ``code_runs`` is always given.
        """
        quantity = quantities[quantity_name]
        terms = A('terms', field=quantity.elastic_field, size=size, order=dict(_key='asc'))

        buckets = self._search.aggs.bucket('statistics:%s' % quantity_name, terms)
        self._add_metrics(buckets, metrics_to_use)

        return self

    def _add_metrics(self, parent=None, metrics_to_use: List[str] = []):
        """ Adds a metric sub-aggregation for each requested metric to ``parent``. """
        if parent is None:
            parent = self._search.aggs

        for metric in metrics_to_use:
            field, metric_kind = metrics[metric]
            parent.metric('metric:%s' % metric, A(metric_kind, field=field))

    def date_histogram(self, metrics_to_use: List[str] = []):
        """
        Adds a date histogram on the given metrics to the statistics part.
        """
        histogram = A('date_histogram', field='upload_time', interval='1M', format='yyyy-MM-dd')
        self._add_metrics(self._search.aggs.bucket('statistics:date_histogram', histogram), metrics_to_use)

        return self

    def quantities(self, **kwargs):
        """
        Shorthand for adding multiple quantities. See :func:`quantity`. Keyword argument
        keys are quantity name, values are tuples of size and after value.
        """
        # bug fix: iterating kwargs itself yields only the keys and the
        # (name, spec) unpacking would raise; iterate the items instead
        for name, spec in kwargs.items():
            size, after = spec
            self.quantity(name, after=after, size=size)

        return self

    def quantity(
            self, name, size=100, after=None, examples=0, examples_source=None,
            order_by: str = None, order: str = 'desc'):
        """
        Adds a requests for values of the given quantity.

        It allows to scroll through all values via elasticsearch's
        composite aggregations. The response will contain the quantity values and
        an example entry for each value.

        This can be used to implement continuous scrolling through authors, datasets,
        or uploads within the searched entries.

        If one or more quantities are specified,
        the search results will contain a dictionary ``quantities``. The keys are quantity
        name the values dictionary with 'after' and 'values' key.
        The 'values' key holds a dict with all the values as keys and their entry count
        as values (i.e. number of entries with that value).

        Arguments:
            name: The quantity name. Must be in :data:`quantities`.
            after: The 'after' value allows to scroll over various requests, by providing
                the 'after' value of the last search. The 'after' value is part of the
                response. Use ``None`` in the first request.
            size:
                The size gives the amount of maximum values in the next scroll window.
                If the size is None, a maximum of 100 quantity values will be requested.
            examples:
                Number of results to return that has each value
            order_by:
                A sortable quantity that should be used to order. The max of each
                value bucket is used.
            order:
                "desc" or "asc"
        """
        if size is None:
            size = 100

        quantity = quantities[name]
        terms = A('terms', field=quantity.elastic_field)

        # We are using elastic searchs 'composite aggregations' here. We do not really
        # compose aggregations, but only those pseudo composites allow us to use the
        # 'after' feature that allows to scan through all aggregation values.
        if order_by is None:
            composite = dict(sources={name: terms}, size=size)
        else:
            sort_terms = A('terms', field=order_by, order=order)
            composite = dict(sources=[{order_by: sort_terms}, {name: terms}], size=size)
        if after is not None:
            if order_by is None:
                composite['after'] = {name: after}
            else:
                composite['after'] = {order_by: after, name: ''}

        composite_agg = self._search.aggs.bucket('quantity:%s' % name, 'composite', **composite)

        if examples > 0:
            kwargs: Dict[str, Any] = {}
            if examples_source is not None:
                kwargs.update(_source=dict(includes=examples_source))

            composite_agg.metric('examples', A('top_hits', size=examples, **kwargs))

        return self

    def exclude(self, *args):
        """ Exclude certain elastic fields from the search results. """
        self._search = self._search.source(excludes=args)
        return self

    def include(self, *args):
        """ Include only the given fields in the search results. """
        self._search = self._search.source(includes=args)
        return self

    def execute(self):
        """
        Executes without returning actual results. Only makes sense if the request
        was configured for statistics or quantity values.
        """
        return self._response(self._search.query(self.q)[0:0].execute())

    def execute_scan(self, order_by: str = None, order: int = -1, **kwargs):
        """
        This executes the search as a scan. The result will be a generator over the found
        entries. Everything but the query part of this object, will be ignored.
        """
        search = self._search.query(self.q)

        if order_by is not None:
            if order_by not in quantities:
                raise KeyError('Unknown order quantity %s' % order_by)

            order_by_quantity = quantities[order_by]

            if order == 1:
                search = search.sort(order_by_quantity.elastic_field)
            else:
                search = search.sort('-%s' % order_by_quantity.elastic_field)

            # sorted scans require preserve_order
            search = search.params(preserve_order=True)

        for hit in search.params(**kwargs).scan():
            yield hit.to_dict()

    def execute_paginated(
            self, page: int = 1, per_page=10, order_by: str = order_default_quantity,
            order: int = -1):
        """
        Executes the search and returns paginated results. Those are sorted.

        Arguments:
            page: The requested page, starts with 1.
            per_page: The number of entries per page.
            order_by: The quantity to order by.
            order: -1 or 1 for descending or ascending order.
        """
        search = self._search.query(self.q)

        if order_by not in quantities:
            raise KeyError('Unknown order quantity %s' % order_by)

        order_by_quantity = quantities[order_by]

        if order == 1:
            search = search.sort(order_by_quantity.elastic_field)
        else:
            search = search.sort('-%s' % order_by_quantity.elastic_field)
        search = search[(page - 1) * per_page: page * per_page]

        result = self._response(search.execute(), with_hits=True)
        result.update(pagination=dict(total=result['total'], page=page, per_page=per_page))
        return result

    def execute_scrolled(self, scroll_id: str = None, size: int = 1000, scroll: str = u'5m'):
        """
        Executes a scrolling search. based on ES scroll API. Pagination is replaced with
        scrolling, no ordering is available, no statistics, no quantities will be provided.

        Scrolling is done by calling this function again and again with the same ``scroll_id``.
        Each time, this function will return the next batch of search results. If the
        ``scroll_id`` is not available anymore, a new ``scroll_id`` is assigned and scrolling
        starts from the beginning again.

        The response will contain a 'scroll' part with attributes 'total', 'scroll_id',
        and 'size'.

        Arguments:
            scroll_id: The scroll id to receive the next batch from. None will create a new
                scroll.
            size: The batch size in number of hits.
            scroll: The time the scroll should be kept alive (i.e. the time between requests
                to this method) in ES time units. Default is 5 minutes.

        Raises:
            ScrollIdNotFound: If the given scroll id is not known to elastic search anymore.
            ElasticSearchError: If the operation failed on at least one shard.
        """
        es = infrastructure.elastic_client

        if scroll_id is None:
            # initiate scroll
            resp = es.search(  # pylint: disable=E1123
                body=self._search.query(self.q).to_dict(), scroll=scroll, size=size,
                index=config.elastic.index_name)

            scroll_id = resp.get('_scroll_id')
            if scroll_id is None:
                # no results for search query
                return dict(scroll=dict(total=0, size=size), results=[])
        else:
            try:
                resp = es.scroll(scroll_id, scroll=scroll)  # pylint: disable=E1123
            except NotFoundError:
                raise ScrollIdNotFound()

        total = resp['hits']['total']
        results = list(hit['_source'] for hit in resp['hits']['hits'])

        # since we are using the low level api here, we should check errors
        if resp["_shards"]["successful"] < resp["_shards"]["total"]:
            utils.get_logger(__name__).error('es operation was unsuccessful on at least one shard')
            raise ElasticSearchError('es operation was unsuccessful on at least one shard')

        if len(results) == 0:
            # scroll is exhausted; release it server side
            es.clear_scroll(body={'scroll_id': [scroll_id]}, ignore=(404, ))  # pylint: disable=E1123
            scroll_id = None

        scroll_info = dict(total=total, size=size)
        if scroll_id is not None:
            scroll_info.update(scroll_id=scroll_id)

        return dict(scroll=scroll_info, results=results)

    def _response(self, response, with_hits: bool = False) -> Dict[str, Any]:
        """
        Prepares a response object covering the total number of results, hits, statistics,
        and quantities. Other aspects like pagination and scrolling have to be added
        elsewhere.
        """
        result: Dict[str, Any] = dict()
        aggs = response.aggregations.to_dict()

        # total
        total = response.hits.total if hasattr(response, 'hits') else 0
        result.update(total=total)

        # hits
        if len(response.hits) > 0 or with_hits:
            result.update(results=[hit.to_dict() for hit in response.hits])

        # statistics
        def get_metrics(bucket, code_runs):
            result = {}
            for metric in metrics_names:
                agg_name = 'metric:%s' % metric
                if agg_name in bucket:
                    result[metric] = bucket[agg_name]['value']
            result.update(code_runs=code_runs)
            return result

        # 'statistics:' prefix (11 chars) is stripped from the agg names
        statistics_results = {
            quantity_name[11:]: {
                bucket['key']: get_metrics(bucket, bucket['doc_count'])
                for bucket in quantity['buckets']
            }
            for quantity_name, quantity in aggs.items()
            if quantity_name.startswith('statistics:')
        }

        # totals
        totals_result = get_metrics(aggs, total)
        statistics_results['total'] = dict(all=totals_result)

        if len(statistics_results) > 0:
            result.update(statistics=statistics_results)

        # quantities
        def create_quantity_result(quantity_name, quantity):
            values = {}
            for bucket in quantity['buckets']:
                value = dict(
                    total=bucket['doc_count'])
                if 'examples' in bucket:
                    examples = [hit['_source'] for hit in bucket['examples']['hits']['hits']]
                    value.update(examples=examples)

                values[bucket['key'][quantity_name]] = value

            result = dict(values=values)
            if 'after_key' in quantity:
                after = quantity['after_key']
                if len(after) == 1:
                    result.update(after=after[quantity_name])
                else:
                    # composite aggs with a sort source carry the 'after' value
                    # under the order_by key, not the quantity key
                    for key in after:
                        if key != quantity_name:
                            result.update(after=after[key])
                            break

            return result

        # 'quantity:' prefix (9 chars) is stripped from the agg names
        quantity_results = {
            quantity_name[9:]: create_quantity_result(quantity_name[9:], quantity)
            for quantity_name, quantity in aggs.items()
            if quantity_name.startswith('quantity:')
        }

        if len(quantity_results) > 0:
            result.update(quantities=quantity_results)

        return result

    def __str__(self):
        return json.dumps(self._search.to_dict(), indent=2)
750
751


752
753
754
755
756
757
def to_calc_with_metadata(results: List[Dict[str, Any]]):
    """ Translates search results into :class:`CalcWithMetadata` objects read from mongo. """
    calc_ids = [entry['calc_id'] for entry in results]
    calcs = proc.Calc.objects(calc_id__in=calc_ids)
    return [datamodel.CalcWithMetadata(**calc.metadata) for calc in calcs]