# Copyright 2018 Markus Scheidgen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

'''
This module represents calculations in Elasticsearch.
'''

from typing import Iterable, Dict, List, Any
from elasticsearch_dsl import Search, Q, A, analyzer, tokenizer
import elasticsearch.helpers
from elasticsearch.exceptions import NotFoundError
from datetime import datetime
import json

from nomad import config, datamodel, infrastructure, utils
from nomad.metainfo.search_extension import search_quantities, metrics, order_default_quantities


path_analyzer = analyzer(
    'path_analyzer',
    tokenizer=tokenizer('path_tokenizer', 'pattern', pattern='/'))


class AlreadyExists(Exception): pass


class ElasticSearchError(Exception): pass


class ScrollIdNotFound(Exception): pass


entry_document = datamodel.EntryMetadata.m_def.a_elastic.document

for domain in datamodel.domains:
    order_default_quantities.setdefault(domain, order_default_quantities.get('__all__'))


def delete_upload(upload_id):
    ''' Delete all entries with the given ``upload_id`` from the index. '''
    index = entry_document._default_index()
    Search(index=index).query('match', upload_id=upload_id).delete()


def delete_entry(calc_id):
    ''' Delete the entry with the given ``calc_id`` from the index. '''
    index = entry_document._default_index()
    Search(index=index).query('match', calc_id=calc_id).delete()


def publish(calcs: Iterable[datamodel.EntryMetadata]) -> None:
    ''' Update all given calcs with their metadata and set ``published = True``. '''
    def elastic_updates():
        for calc in calcs:
            entry = calc.a_elastic.create_index_entry()
            entry.published = True
            entry = entry.to_dict(include_meta=True)
            source = entry.pop('_source')
            entry['doc'] = source
            entry['_op_type'] = 'update'
            yield entry

    elasticsearch.helpers.bulk(infrastructure.elastic_client, elastic_updates())
    refresh()


def index_all(calcs: Iterable[datamodel.EntryMetadata], do_refresh=True) -> int:
    '''
    Adds all given calcs with their metadata to the index.

    Returns:
        Number of failed entries.
    '''
    def elastic_updates():
        for calc in calcs:
            entry = calc.a_elastic.create_index_entry()
            entry = entry.to_dict(include_meta=True)
            entry['_op_type'] = 'index'
            yield entry

    _, failed = elasticsearch.helpers.bulk(
        infrastructure.elastic_client, elastic_updates(), stats_only=True)

    if do_refresh:
        refresh()

    return failed


def refresh():
    infrastructure.elastic_client.indices.refresh(config.elastic.index_name)


class SearchRequest:
    '''
    Represents a search request and allows to execute that request.
    It allows to compose the following features: a query;
    statistics (metrics and aggregations); quantity values; scrolling, pagination for entries;
    scrolling for quantity values.

    The query part filters NOMAD data before the other features come into effect. There
    are specialized methods for configuring the :func:`owner` and :func:`time_range` queries.
    Quantities can be searched for by setting them as attributes.

    The aggregations for statistics can be requested for pre-configured quantities. These
    bucket aggregations come with a metric calculated for each possible
    quantity value.

    The other possible form of aggregation allows to get quantity values as results
    (e.g. get all datasets, get all users, etc.). Each value can be accompanied by metrics
    (over all entries with that value) and an example value.

    Of course, searches can return a set of search results. Search objects can be
    configured with pagination or scrolling for these results. Pagination is the default
    and also allows ordering of results. Scrolling can be used if all entries need to be
    'scrolled through'. This might be necessary, since Elasticsearch has limits on
    possible pages (e.g. 'from' must be smaller than 10000). On the downside, there is no
    ordering on scrolling.

    There is also scrolling for quantities to go through all quantity values. There is no
    paging for aggregations.
    '''
    def __init__(self, domain: str = config.meta.default_domain, query=None):
        self._domain = domain
        self._query = query
        self._search = Search(index=config.elastic.index_name)

    def domain(self, domain: str = None):
        '''
        Applies the domain of this request to the query. Allows to optionally update
        the domain of this request.
        '''
        if domain is not None:
            self._domain = domain

        self.q = self.q & Q('term', domain=self._domain)
        return self
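
    # Usage sketch (hypothetical parameter values; assumes a configured
    # Elasticsearch index). All configuration methods return ``self``, so
    # requests compose through chaining:
    #
    #     request = SearchRequest(domain='dft').owner('public')
    #     page = request.execute_paginated(page=1, per_page=10)
    #     # page['results'] holds the hits, page['pagination'] the page info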

    def owner(self, owner_type: str = 'all', user_id: str = None):
        '''
        Uses the query part of the search to restrict the results based on the owner.
        The possible types are: ``all`` for all calculations; ``public`` for
        calculations visible to everyone, excluding embargoed entries and entries only
        visible to the given user; ``visible`` for all data that is visible to the user,
        excluding embargoed entries from other users; ``shared`` for all calculations
        owned by the given user; ``user`` for all calculations uploaded by the given
        user; ``staging`` for all unpublished calculations of the given user; ``admin``
        for everything, available to admin users only.

        Arguments:
            owner_type: The type of the owner query, see above.
            user_id: The 'owner' given as the user's unique id.

        Raises:
            KeyError: If the given owner_type is not supported.
            ValueError: If the owner_type requires a user but none is given, or the
                given user is not allowed to use the given owner_type.
        '''
        if owner_type == 'all':
            q = Q('term', published=True)
            if user_id is not None:
                q = q | Q('term', owners__user_id=user_id)
        elif owner_type == 'public':
            q = Q('term', published=True) & Q('term', with_embargo=False)
        elif owner_type == 'visible':
            q = Q('term', published=True) & Q('term', with_embargo=False)
            if user_id is not None:
                q = q | Q('term', owners__user_id=user_id)
        elif owner_type == 'shared':
            if user_id is None:
                raise ValueError('Authentication required for owner value shared.')

            q = Q('term', owners__user_id=user_id)
        elif owner_type == 'user':
            if user_id is None:
                raise ValueError('Authentication required for owner value user.')

            q = Q('term', uploader__user_id=user_id)
        elif owner_type == 'staging':
            if user_id is None:
                raise ValueError('Authentication required for owner value staging.')
            q = Q('term', published=False) & Q('term', owners__user_id=user_id)
        elif owner_type == 'admin':
            if user_id is None or not datamodel.User.get(user_id=user_id).is_admin:
                raise ValueError('This can only be used by the admin user.')
            q = None
        else:
            raise KeyError('Unsupported owner value')

        if q is not None:
            self.q = self.q & q

        return self

    def search_parameters(self, **kwargs):
        '''
        Configures the existing query with additional search parameters. Kwargs are
        interpreted as key value pairs. Keys have to correspond to valid entry quantities
        in the domain's (e.g. DFT calculations) datamodel. Alternatively search parameters
        can be set via attributes.
        '''
        for name, value in kwargs.items():
            self.search_parameter(name, value)

        return self

    def search_parameter(self, name, value):
        quantity = search_quantities[name]

        if quantity.many and not isinstance(value, list):
            value = [value]

        if quantity.many_or and isinstance(value, list):
            self.q &= Q('terms', **{quantity.search_field: value})
            return self

        if quantity.derived:
            if quantity.many and not isinstance(value, list):
                value = [value]
            value = quantity.derived(value)

        if isinstance(value, list):
            values = value
        else:
            values = [value]

        for item in values:
            self.q &= Q('match', **{quantity.search_field: item})

        return self
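
    # Sketch of the resulting query semantics (hypothetical quantity name):
    # a ``many_or`` quantity becomes a single ``terms`` query, matching entries
    # with any of the given values; other quantities add one ``match`` clause
    # per value, so all values must match.
    #
    #     request = SearchRequest()
    #     request.search_parameter('atoms', ['Si', 'O'])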

    def query(self, query):
        ''' Adds the given query as an 'and' (i.e. 'must') clause to the request. '''
        self.q &= query

        return self

    def time_range(self, start: datetime, end: datetime):
        ''' Adds a time range to the query. '''
        if start is None and end is None:
            return self

        if start is None:
            start = datetime.fromtimestamp(0)
        if end is None:
            end = datetime.utcnow()

        self.q &= Q('range', upload_time=dict(gte=start, lte=end))

        return self

    @property
    def q(self):
        ''' The underlying elasticsearch_dsl query object. '''
        if self._query is None:
            return Q('match_all')
        else:
            return self._query

    @q.setter
    def q(self, q):
        self._query = q

    def totals(self, metrics_to_use: List[str] = []):
        '''
        Configure the request to return overall totals for the given metrics.

        The statistics are returned with the other quantity statistics under the pseudo
        quantity name 'total'. 'total' contains the pseudo value 'all'. It is used to
        store the metrics aggregated over all entries in the search results.
        '''
        self._add_metrics(self._search.aggs, metrics_to_use)
        return self

    def statistics(self, statistics: List[str], metrics_to_use: List[str] = []):
        '''
        Configures statistics for the given search quantities, using each quantity's
        default size and order.
        '''
        for statistic in statistics:
            search_quantity = search_quantities[statistic]
            statistic_order = search_quantity.statistic_order
            self.statistic(
                search_quantity.qualified_name,
                search_quantity.statistic_size,
                metrics_to_use=metrics_to_use,
                order={statistic_order: 'asc' if statistic_order == '_key' else 'desc'})

        return self

    def statistic(
            self, quantity_name: str, size: int, metrics_to_use: List[str] = [],
            order: Dict[str, str] = dict(_key='asc'), include: str = None):
        '''
        This can be used to display statistics over the searched entries and allows to
        implement faceted search on the top values for each quantity.

        The metrics contain overall and per quantity value sums of code runs (calcs),
        unique code runs, datasets, and additional domain specific metrics
        (e.g. total energies, and unique geometries for DFT calculations). The quantities
        that can be aggregated to metrics are defined in module:`datamodel`. Aggregations
        and respective metrics are calculated for aggregations given in ``aggregations``
        and metrics in ``aggregation_metrics``. As a pseudo aggregation, ``total_metrics``
        are calculated over all search results. The ``aggregations`` gives tuples of
        quantities and default aggregation sizes.

        The search results will contain a dictionary ``statistics``. This has a key
        for each configured quantity. Each quantity key will hold a dict
        with a key for each quantity value. Each quantity value key will hold a dict
        with a key for each metric. The values will be the actual aggregated metric values.

        Arguments:
            quantity_name: The quantity to aggregate statistics for. Only works on *keyword* fields.
            metrics_to_use: The metrics calculated over the aggregations. Can be
                ``unique_code_runs``, ``datasets``, or other domain specific metrics.
                The basic doc_count metric ``code_runs`` is always given.
            order: The order dictionary is passed to the Elasticsearch aggregation.
            include:
                Uses a regular expression in ES to only return values that include
                the given substring.
        '''
        quantity = search_quantities[quantity_name]
        terms_kwargs = {}
        if include is not None:
            terms_kwargs['include'] = '.*%s.*' % include
        terms = A('terms', field=quantity.search_field, size=size, order=order, **terms_kwargs)

        buckets = self._search.aggs.bucket('statistics:%s' % quantity_name, terms)
        self._add_metrics(buckets, metrics_to_use)

        return self
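
    # Faceted search sketch (hypothetical quantity name; assumes it is indexed
    # as a *keyword* field):
    #
    #     request = SearchRequest().owner('public')
    #     request.statistic('dft.code_name', size=10, metrics_to_use=['datasets'])
    #     statistics = request.execute()['statistics']
    #     # statistics['dft.code_name'] maps each value to its metrics,
    #     # e.g. {'code_runs': ..., 'datasets': ...}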

    def _add_metrics(self, parent=None, metrics_to_use: List[str] = []):
        if parent is None:
            parent = self._search.aggs

        for metric in metrics_to_use:
            metric_quantity = metrics[metric]
            field = metric_quantity.search_field
            parent.metric(
                'metric:%s' % metric_quantity.metric_name,
                A(metric_quantity.metric, field=field))

    def date_histogram(self, metrics_to_use: List[str] = [], interval: str = '1M'):
        '''
        Adds a date histogram on the given metrics to the statistics part.
        '''
        histogram = A('date_histogram', field='upload_time', interval=interval, format='yyyy-MM-dd')
        self._add_metrics(self._search.aggs.bucket('statistics:date_histogram', histogram), metrics_to_use)

        return self
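
    # Sketch: a monthly histogram of uploads with dataset counts per bucket.
    # The buckets appear in the response under statistics['date_histogram'].
    #
    #     request = SearchRequest().date_histogram(metrics_to_use=['datasets'])
    #     result = request.execute()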

    def quantities(self, **kwargs):
        '''
        Shorthand for adding multiple quantities. See :func:`quantity`. Keyword argument
        keys are quantity names, values are tuples of size and after value.
        '''
        for name, spec in kwargs.items():
            size, after = spec
            self.quantity(name, after=after, size=size)

        return self

    def quantity(
            self, name, size=100, after=None, examples=0, examples_source=None,
            order_by: str = None, order: str = 'desc'):
        '''
        Adds a request for values of the given quantity.

        It allows to scroll through all values via Elasticsearch's
        composite aggregations. The response will contain the quantity values and
        an example entry for each value.

        This can be used to implement continuous scrolling through authors, datasets,
        or uploads within the searched entries.

        If one or more quantities are specified,
        the search results will contain a dictionary ``quantities``. The keys are
        quantity names, the values are dictionaries with 'after' and 'values' keys.
        The 'values' key holds a dict with all the values as keys and their entry count
        as values (i.e. number of entries with that value).

        Arguments:
            name: The quantity name. Must be in :data:`quantities`.
            after: The 'after' value allows to scroll over various requests, by providing
                the 'after' value of the last search. The 'after' value is part of the
                response. Use ``None`` in the first request.
            size:
                The maximum number of values in the next scroll window.
                If the size is None, a maximum of 100 quantity values will be requested.
            examples:
                Number of results to return that have each value.
            examples_source:
                The source fields to include in the returned examples.
            order_by:
                A sortable quantity that should be used to order. The max of each
                value bucket is used.
            order:
                "desc" or "asc"
        '''
        if size is None:
            size = 100

        quantity = search_quantities[name]
        terms = A('terms', field=quantity.search_field)

        # We are using Elasticsearch's 'composite aggregations' here. We do not really
        # compose aggregations, but only those pseudo composites allow us to use the
        # 'after' feature that allows to scan through all aggregation values.
        if order_by is None:
            composite = dict(sources={name: terms}, size=size)
        else:
            sort_terms = A('terms', field=order_by, order=order)
            composite = dict(sources=[{order_by: sort_terms}, {name: terms}], size=size)
        if after is not None:
            if order_by is None:
                composite['after'] = {name: after}
            else:
                composite['after'] = {order_by: after, name: ''}

        composite_agg = self._search.aggs.bucket('quantity:%s' % name, 'composite', **composite)

        if examples > 0:
            kwargs: Dict[str, Any] = {}
            if examples_source is not None:
                kwargs.update(_source=dict(includes=examples_source))

            composite_agg.metric('examples', A('top_hits', size=examples, **kwargs))

        return self
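
    # Value scrolling sketch (hypothetical quantity name): repeat the request,
    # feeding the returned 'after' key back in, until none is returned.
    #
    #     after = None
    #     while True:
    #         result = SearchRequest().quantity('authors', size=100, after=after).execute()
    #         authors = result['quantities']['authors']
    #         ...  # consume authors['values']
    #         after = authors.get('after')
    #         if after is None:
    #             break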

    def global_statistics(self):
        '''
        Adds general statistics to the request. The results will have a key called
        ``global_statistics``.
        '''
        self._search.aggs.metric(
            'global_statistics:n_entries', A('value_count', field='calc_id'))
        self._search.aggs.metric(
            'global_statistics:n_uploads', A('cardinality', field='upload_id'))
        self._search.aggs.metric(
            'global_statistics:n_calculations', A('sum', field='dft.n_calculations'))
        self._search.aggs.metric(
            'global_statistics:n_quantities', A('sum', field='dft.n_quantities'))

        return self
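
    # Sketch: result['global_statistics'] then maps the aggregation names
    # ('n_entries', 'n_uploads', 'n_calculations', 'n_quantities') to values.
    #
    #     result = SearchRequest().owner('public').global_statistics().execute()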

    def exclude(self, *args):
        ''' Exclude certain elastic fields from the search results. '''
        self._search = self._search.source(excludes=args)
        return self

    def include(self, *args):
        ''' Include only the given fields in the search results. '''
        self._search = self._search.source(includes=args)
        return self

    def execute(self):
        '''
        Executes the search with size zero, i.e. without returning hit results.
        Only makes sense if the request was configured for statistics or quantity values.
        '''
        search = self._search.query(self.q)[0:0]
        response = search.execute()
        return self._response(response)

    def execute_scan(self, order_by: str = None, order: int = -1, **kwargs):
        '''
        This executes the search as a scan. The result will be a generator over the found
        entries. Everything but the query part of this object will be ignored.
        '''
        search = self._search.query(self.q)

        if order_by is not None:
            order_by_quantity = search_quantities[order_by]

            if order == 1:
                search = search.sort(order_by_quantity.search_field)
            else:
                search = search.sort('-%s' % order_by_quantity.search_field)

            search = search.params(preserve_order=True)

        for hit in search.params(**kwargs).scan():
            yield hit.to_dict()
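
    # Scan sketch: iterate over every matching entry, without the page-depth
    # limits of pagination (but without ranking).
    #
    #     for entry in SearchRequest().owner('public').execute_scan():
    #         ...  # each entry is a plain dict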

    def execute_paginated(
            self, page: int = 1, per_page=10, order_by: str = None,
            order: int = -1):
        '''
        Executes the search and returns paginated results. Those are sorted.

        Arguments:
            page: The requested page, starts with 1.
            per_page: The number of entries per page.
            order_by: The quantity to order by.
            order: -1 or 1 for descending or ascending order.
        '''
        if order_by is None:
            order_by_quantity = order_default_quantities[self._domain]
        else:
            order_by_quantity = search_quantities[order_by]

        search = self._search.query(self.q)

        if order == 1:
            search = search.sort(order_by_quantity.search_field)
        else:
            search = search.sort('-%s' % order_by_quantity.search_field)
        search = search[(page - 1) * per_page: page * per_page]

        es_result = search.execute()

        result = self._response(es_result, with_hits=True)

        result.update(pagination=dict(total=result['total'], page=page, per_page=per_page))
        return result

    def execute_scrolled(
            self, scroll_id: str = None, size: int = 1000, scroll: str = u'5m',
            order_by: str = None, order: int = -1):
        '''
        Executes a scrolling search based on the ES scroll API. Pagination is replaced
        with scrolling; no ordering, statistics, or quantities will be provided.

        Scrolling is done by calling this function again and again with the same ``scroll_id``.
        Each time, this function will return the next batch of search results. If the
        ``scroll_id`` is not available anymore, a :class:`ScrollIdNotFound` is raised.

        The response will contain a 'scroll' part with attributes 'total', 'scroll_id',
        and 'size'.

        Arguments:
            scroll_id: The scroll id to receive the next batch from. None will create a new
                scroll.
            size: The batch size in number of hits.
            scroll: The time the scroll should be kept alive (i.e. the time between requests
                to this method) in ES time units. Default is 5 minutes.

        TODO support order and order_by
        '''
        es = infrastructure.elastic_client

        if scroll_id is None:
            # initiate scroll
            resp = es.search(  # pylint: disable=E1123
                body=self._search.query(self.q).to_dict(), scroll=scroll, size=size,
                index=config.elastic.index_name)

            scroll_id = resp.get('_scroll_id')
            if scroll_id is None:
                # no results for search query
                return dict(scroll=dict(total=0, size=size), results=[])
        else:
            try:
                resp = es.scroll(scroll_id, scroll=scroll)  # pylint: disable=E1123
            except NotFoundError:
                raise ScrollIdNotFound()

        total = resp['hits']['total']
        results = list(hit['_source'] for hit in resp['hits']['hits'])

        # since we are using the low level api here, we should check errors
        if resp['_shards']['successful'] < resp['_shards']['total']:
            utils.get_logger(__name__).error('es operation was unsuccessful on at least one shard')
            raise ElasticSearchError('es operation was unsuccessful on at least one shard')

        if len(results) == 0:
            es.clear_scroll(body={'scroll_id': [scroll_id]}, ignore=(404, ))  # pylint: disable=E1123
            scroll_id = None

        scroll_info = dict(total=total, size=size)
        if scroll_id is not None:
            scroll_info.update(scroll_id=scroll_id)

        return dict(scroll=scroll_info, results=results)
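
    # Scroll loop sketch: keep passing the returned 'scroll_id' back in until
    # the response no longer contains one.
    #
    #     request = SearchRequest().owner('public')
    #     scroll_id = None
    #     while True:
    #         batch = request.execute_scrolled(scroll_id=scroll_id)
    #         ...  # consume batch['results']
    #         scroll_id = batch['scroll'].get('scroll_id')
    #         if scroll_id is None:
    #             break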

    def execute_aggregated(
            self, after: str = None, per_page: int = 1000, includes: List[str] = None):
        '''
        Uses a composite aggregation on top of the search to go through the result
        set. This allows to go arbitrarily deep without using scroll, but it will
        only return results with ``upload_id``, ``calc_id``, and the given
        quantities. The results will be 'ordered' by ``upload_id``.

        Arguments:
            after: The key that determines the start of the current page. This after
                key is returned with each response. Use None (default) for the first
                request.
            per_page: The size of each page.
            includes: A list of quantity names that should be returned in addition to
                ``upload_id`` and ``calc_id``.
        '''
        upload_id_agg = A('terms', field='upload_id')
        calc_id_agg = A('terms', field='calc_id')

        composite = dict(
            sources=[dict(upload_id=upload_id_agg), dict(calc_id=calc_id_agg)],
            size=per_page)

        if after is not None:
            upload_id, calc_id = after.split(':')
            composite['after'] = dict(upload_id=upload_id, calc_id=calc_id)

        composite_agg = self._search.aggs.bucket('ids', 'composite', **composite)
        if includes is not None:
            composite_agg.metric('examples', A('top_hits', size=1, _source=dict(includes=includes)))

        search = self._search.query(self.q)[0:0]
        response = search.execute()

        ids = response['aggregations']['ids']
        if 'after_key' in ids:
            after_dict = ids['after_key']
            after = '%s:%s' % (after_dict['upload_id'], after_dict['calc_id'])
        else:
            after = None

        id_agg_info = dict(total=response['hits']['total'], after=after, per_page=per_page)

        def transform_result(es_result):
            result = dict(
                upload_id=es_result['key']['upload_id'],
                calc_id=es_result['key']['calc_id'])

            if includes is not None:
                source = es_result['examples']['hits']['hits'][0]['_source']
                for key in source:
                    result[key] = source[key]

            return result

        results = [
            transform_result(item) for item in ids['buckets']]

        return dict(aggregation=id_agg_info, results=results)
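
    # Paging sketch: feed the returned 'after' key back in until it is None.
    #
    #     after = None
    #     while True:
    #         page = SearchRequest().owner('public').execute_aggregated(after=after)
    #         ...  # consume page['results'], dicts with 'upload_id' and 'calc_id'
    #         after = page['aggregation']['after']
    #         if after is None:
    #             break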

    def _response(self, response, with_hits: bool = False) -> Dict[str, Any]:
        '''
        Prepares a response object covering the total number of results, hits, statistics,
        and quantities. Other aspects like pagination and scrolling have to be added
        elsewhere.
        '''
        result: Dict[str, Any] = dict()
        aggs = response.aggregations.to_dict()

        # total
        total = response.hits.total if hasattr(response, 'hits') else 0
        result.update(total=total)

        # hits
        if len(response.hits) > 0 or with_hits:
            result.update(results=[hit.to_dict() for hit in response.hits])

        # statistics
        def get_metrics(bucket, code_runs):
            result = {}
            # TODO optimize ... go through the buckets not the metrics
            for metric in metrics:
                agg_name = 'metric:%s' % metric
                if agg_name in bucket:
                    result[metric] = bucket[agg_name]['value']
            result.update(code_runs=code_runs)
            return result

        statistics_results = {
            quantity_name[11:]: {  # strip the 'statistics:' prefix
                str(bucket['key']): get_metrics(bucket, bucket['doc_count'])
                for bucket in quantity['buckets']
            }
            for quantity_name, quantity in aggs.items()
            if quantity_name.startswith('statistics:')
        }

        # global statistics
        global_statistics_results = {
            agg_name[18:]: agg.get('value')  # strip the 'global_statistics:' prefix
            for agg_name, agg in aggs.items()
            if agg_name.startswith('global_statistics:')
        }
        if len(global_statistics_results) > 0:
            result.update(global_statistics=global_statistics_results)

        # totals
        totals_result = get_metrics(aggs, total)
        statistics_results['total'] = dict(all=totals_result)

        if len(statistics_results) > 0:
            result.update(statistics=statistics_results)

        # quantities
        def create_quantity_result(quantity_name, quantity):
            values = {}
            for bucket in quantity['buckets']:
                value = dict(
                    total=bucket['doc_count'])
                if 'examples' in bucket:
                    examples = [hit['_source'] for hit in bucket['examples']['hits']['hits']]
                    value.update(examples=examples)

                values[bucket['key'][quantity_name]] = value

            result = dict(values=values)
            if 'after_key' in quantity:
                after = quantity['after_key']
                if len(after) == 1:
                    result.update(after=after[quantity_name])
                else:
                    for key in after:
                        if key != quantity_name:
                            result.update(after=after[key])
                            break

            return result

        quantity_results = {
            quantity_name[9:]: create_quantity_result(quantity_name[9:], quantity)  # strip 'quantity:'
            for quantity_name, quantity in aggs.items()
            if quantity_name.startswith('quantity:')
        }

        if len(quantity_results) > 0:
            result.update(quantities=quantity_results)

        return result


    def __str__(self):
        return json.dumps(self._search.to_dict(), indent=2)


def flat(obj, prefix=None):
    '''
    Helper that translates nested result objects into flattened dicts with
    ``domain.quantity`` as keys.
    '''
    if isinstance(obj, dict):
        result = {}
        for key, value in obj.items():
            if isinstance(value, dict):
                value = flat(value)
                for child_key, child_value in value.items():
                    result['%s.%s' % (key, child_key)] = child_value
            else:
                result[key] = value

        return result
    else:
        return obj
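
# Example (hypothetical values): nesting of any depth collapses into dotted keys,
# e.g. flat({'dft': {'code_name': 'VASP'}}) == {'dft.code_name': 'VASP'}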


if __name__ == '__main__':
    # Due to this import, the parsing module will register all code_names based on parser
    # implementations.
    from nomad import parsing  # pylint: disable=unused-import

    def to_dict(search_quantity):
        result = {
            'name': search_quantity.qualified_name,
            'description': search_quantity.description,
            'many': search_quantity.many,
        }

        if search_quantity.statistic_fixed_size is not None:
            result['statistic_size'] = search_quantity.statistic_fixed_size
        if search_quantity.statistic_values is not None:
            result['statistic_values'] = search_quantity.statistic_values

        return result

    export = {
        search_quantity.qualified_name: to_dict(search_quantity)
        for search_quantity in search_quantities.values()
    }
    print(json.dumps(export, indent=2))