#
# Copyright The NOMAD Authors.
#
# This file is part of NOMAD. See https://nomad-lab.eu for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

'''
This elasticsearch extension for the Metainfo allows one to define how quantities are
added to Elasticsearch indices.

This extension supports two search indices: ``entry_index`` and ``material_index``.
There are three different types of "searchable documents": ``entry_type``, ``material_type``,
``material_entry_type``. Entry documents are indexed in the entry index; material documents
in the material index. The material entry documents are nested documents in material documents.

The document types are subsets of the metainfo schema; documents have the exact same
structure as archives, but with only some of the quantities. Which quantities are in these
documents can be defined in the metainfo by using the :class:`Elasticsearch` annotation on
quantity definitions.

Entry and material entry documents start with the metainfo entry root section.
Material documents start with the ``results.material`` sub-section. Nested material entry
documents are placed under the ``entries`` key within a material document. This is the only
exception where the material document structure deviates from the metainfo/archive structure.

A quantity can appear in multiple document types. All indexed quantities
appear in entry documents by default. If specified, quantities are also put in either
the material document or a nested material entry document within the material document.
The material quantities describe the material itself
(e.g. formula, elements, system type, symmetries). These quantities are always in all
entries of the same material. Material entry quantities describe individual results
and metadata that are contributed by the entries of this material (e.g. published, embargo,
band gap, available properties). The values contributed by different entries of the same
material may vary.

Here is a small metainfo example:

.. code-block:: python

    class Entry(MSection):

        entry_id = Quantity(
            type=str,
            a_elasticsearch=Elasticsearch(material_entry_type))

        upload_create_time = Quantity(
            type=Datetime,
            a_elasticsearch=Elasticsearch())

        results = SubSection(sub_section=Results.m_def, a_elasticsearch=Elasticsearch())


    class Results(MSection):

        material = SubSection(sub_section=Material.m_def, a_elasticsearch=Elasticsearch())
        properties = SubSection(sub_section=Properties.m_def, a_elasticsearch=Elasticsearch())


    class Material(MSection):

        material_id = Quantity(
            type=str,
            a_elasticsearch=Elasticsearch(material_type))

        formula = Quantity(
            type=str,
            a_elasticsearch=[
                Elasticsearch(material_type),
                Elasticsearch(material_type, field='text', mapping='text')])


    class Properties(MSection):

        available_properties = Quantity(
            type=str, shape=['*'],
            a_elasticsearch=Elasticsearch(material_entry_type))

        band_gap = Quantity(
            type=float, unit='J',
            a_elasticsearch=Elasticsearch(material_entry_type))


The resulting indices with a single entry in them would look like this. Entry index:

.. code-block:: json

    [
        {
            "entry_id": "de54f1",
            "upload_create_time": "2021-02-01 01:23:12",
            "results": {
                "material": {
                    "material_id": "23a8bf",
                    "formula": "H2O"
                },
                "properties": {
                    "available_properties": ["dos", "bs", "band_gap", "energy_total_0"],
                    "band_gap": 0.283e-12
                }
            }
        }
    ]


And material index:

.. code-block:: json

    [
        {
            "material_id": "23a8bf",
            "formula": "H2O"
            "entries": [
                {
                    "entry_id": "de54f1",
                    "results": {
                        "properties": {
                            "available_properties": ["dos", "bs", "band_gap", "energy_total_0"],
                            "band_gap": 0.283e-12
                        }
                    }
                }
            ]
        }
    ]

You can freely define sub-sections and quantities. The only fixed structures that are
required from the metainfo are:
- the root section has an ``entry_id``
- materials are placed in ``results.material``
- the ``results.material`` sub-section has a ``material_id``
- the ``results.material`` sub-section has no property called ``entries``

This extension resolves references during indexing and basically treats referenced
sub-sections as if they were direct sub-sections.
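
For example, a quantity with a section reference type can be annotated like any
other quantity; the indexed quantities of the referenced section then appear under
the reference's name (a hypothetical sketch, ``Method`` being a made-up section):

.. code-block:: python

    class Run(MSection):

        method_ref = Quantity(
            type=Reference(Method.m_def),
            a_elasticsearch=Elasticsearch())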

.. autofunction:: index_entry
.. autofunction:: index_entries
.. autofunction:: create_indices


.. autoclass:: Elasticsearch
.. autoclass:: DocumentType
.. autoclass:: Index
'''


from typing import Union, Any, Dict, cast, Set, List, Callable, Tuple, DefaultDict
from collections import defaultdict
import numpy as np
import re

from nomad import config, utils

from .metainfo import (
    MSectionBound, Section, Quantity, MSection, MEnum, Datetime, Reference, DefinitionAnnotation,
    Definition, QuantityReference)


class DocumentType():
    '''
    DocumentType allows one to create Elasticsearch index mappings and documents based on
    Metainfo definitions and instances. Generally, this class should not be used outside
    the elasticsearch_extension module.

    Attributes:
        root_section_def: The section definition that serves as the root for all documents.
        mapping: The elasticsearch mapping definition.
        indexed_properties: All definitions (quantities and sub sections) that are covered
            by documents of this type.
        quantities: All elasticsearch quantities that are in documents of this type. A
            dictionary with fully qualified names as keys and :class:`SearchQuantity`
            objects as values.
        metrics: All metrics in this document type. A dictionary with metric names as
            keys and tuples of elasticsearch metric aggregation and respective
            :class:`SearchQuantity` as values.
        id_field: The quantity (and elasticsearch field) name that is used as unique
            identifier for this type of documents.
    '''
    def __init__(self, name: str, id_field: str):
        self.name = name
        self.id_field = id_field

        self.root_section_def = None
        self.mapping: Dict[str, Any] = None
        self.indexed_properties: Set[Definition] = set()
        self.nested_object_keys: List[str] = []
        self.nested_sections: List[SearchQuantity] = []
        self.quantities: Dict[str, SearchQuantity] = {}
        self.suggestions: Dict[str, Elasticsearch] = {}
        self.metrics: Dict[str, Tuple[str, SearchQuantity]] = {}

    def _reset(self):
        self.indexed_properties.clear()
        self.nested_object_keys.clear()
        self.nested_sections.clear()
        self.quantities.clear()
        self.metrics.clear()

    def create_index_doc(self, root: MSection):
        '''
        Creates an indexable document from the given archive.
        '''
        suggestions: DefaultDict = defaultdict(list)

        def transform(quantity, section, value, path):
            '''
            Custom transform function that possibly transforms the indexed
            values and also gathers the suggestion values for later storage.
            '''
            elasticsearch_annotations = quantity.m_get_annotations(Elasticsearch, as_list=True)
            for elasticsearch_annotation in elasticsearch_annotations:
                if elasticsearch_annotation.field is None:
                    if elasticsearch_annotation.suggestion:
                        # The suggestions may have a different doc_type: we don't
                        # serialize them if the doc types don't match.
                        if self != entry_type and elasticsearch_annotation.doc_type != self:
                            continue

                        # The suggestion values are saved into a temporary
                        # dictionary. The actual path of the data in the metainfo
                        # is used as a key.
                        transform_function = elasticsearch_annotation.value
                        if transform_function is not None:
                            suggestion_value = transform_function(value)
                        else:
                            suggestion_value = value
                        section_path = section.m_path()[len(root.m_path()):]
                        name = elasticsearch_annotation.property_name
                        if path:
                            suggestion_path = f"{section_path}/{path}/{name}"
                        else:
                            suggestion_path = f"{section_path}/{name}"
                        suggestions[suggestion_path].extend(suggestion_value)
                    else:
                        transform_function = elasticsearch_annotation.value
                        if transform_function is not None:
                            return transform_function(section)

            return value

        def exclude(property_, section):
            return property_ not in self.indexed_properties

        kwargs: Dict[str, Any] = dict(
            with_meta=False,
            include_defaults=True,
            include_derived=True,
            resolve_references=True,
            exclude=exclude,
            transform=transform
        )

        result = root.m_to_dict(**kwargs)

        # Add the collected suggestion values
        for path, value in suggestions.items():
            parts = path.split("/")
            section = result
            for part in parts[:-1]:
                if part == "":
                    continue
                try:
                    part = int(part)
                except ValueError:
                    pass
                section = section[part]
            section[parts[-1]] = value
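
        # For example (a hypothetical path): a suggestion collected under
        # "/results/material/formula__suggestion" is written to
        # result['results']['material']['formula__suggestion'].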

        # TODO deal with metadata
        metadata = result.get('metadata')
        if metadata is not None:
            del result['metadata']
            result.update(**metadata)

        return result

    def create_mapping(
            self, section_def: Section, prefix: str = None,
            auto_include_subsections: bool = False):
        '''
        Creates an Elasticsearch mapping for the given root section. It traverses all
        sub-sections to create the mapping. It will not create the mapping for nested
        documents. These have to be created manually (e.g. by :func:`create_indices`).
        Will override the existing mapping.

        Arguments:
            section_def: The section definition to create a mapping for.
            prefix: The qualified name of the section within the search index. This
                is used to create the qualified names of quantities and sub-sections.
            auto_include_subsections: Considers all sub-sections (recursively), regardless
                of any annotation in the sub-section definitions. By default, only
                sub-sections with an elasticsearch annotation are traversed.
        '''
        mappings: Dict[str, Any] = {}

        if self == material_type and prefix is None:
            mappings['n_entries'] = {'type': 'integer'}

        for quantity_def in section_def.all_quantities.values():
            elasticsearch_annotations = quantity_def.m_get_annotations(Elasticsearch, as_list=True)
            for elasticsearch_annotation in elasticsearch_annotations:
                if self != entry_type and elasticsearch_annotation.doc_type != self:
                    continue

                if prefix is None:
                    qualified_name = quantity_def.name
                else:
                    qualified_name = f'{prefix}.{quantity_def.name}'
                if elasticsearch_annotation.suggestion:
                    self.suggestions[qualified_name] = elasticsearch_annotation

                is_section_reference = isinstance(quantity_def.type, Reference)
                is_section_reference &= not isinstance(quantity_def.type, QuantityReference)
                if is_section_reference:
                    # Treat referenced sections as sub-sections
                    assert quantity_def.type.target_section_def is not None
                    # TODO e.g. viewers, entry_coauthors, etc. ... should be treated as multiple inner docs
                    # assert quantity_def.is_scalar

                    reference_mapping = self.create_mapping(
                        cast(Section, quantity_def.type.target_section_def),
                        prefix=qualified_name)
                    if len(reference_mapping['properties']) > 0:
                        mappings[quantity_def.name] = reference_mapping
                else:
                    mapping = mappings.setdefault(elasticsearch_annotation.property_name, {})
                    fields = elasticsearch_annotation.fields
                    if len(fields) > 0:
                        mapping.setdefault('fields', {}).update(**fields)
                    else:
                        mapping.update(**elasticsearch_annotation.mapping)

                self.indexed_properties.add(quantity_def)
                self._register(elasticsearch_annotation, prefix)

        for sub_section_def in section_def.all_sub_sections.values():
            annotation = sub_section_def.m_get_annotations(Elasticsearch)
            if annotation is None and not auto_include_subsections:
                continue

            assert not isinstance(annotation, list), \
                'sub sections can only have one elasticsearch annotation'
            continue_with_auto_include_subsections = auto_include_subsections or (
                False if annotation is None else annotation.auto_include_subsections)

            if prefix is None:
                qualified_name = sub_section_def.name
            else:
                qualified_name = f'{prefix}.{sub_section_def.name}'

            # TODO deal with metadata
            qualified_name = re.sub(r'\.?metadata', '', qualified_name)
            qualified_name = None if qualified_name == '' else qualified_name

            sub_section_mapping = self.create_mapping(
                sub_section_def.sub_section, prefix=qualified_name,
                auto_include_subsections=continue_with_auto_include_subsections)

            nested = annotation is not None and annotation.nested
            if nested:
                sub_section_mapping['type'] = 'nested'

            if len(sub_section_mapping['properties']) > 0:
                if sub_section_def.name == 'metadata':
                    mappings.update(**sub_section_mapping['properties'])
                else:
                    mappings[sub_section_def.name] = sub_section_mapping
                self.indexed_properties.add(sub_section_def)
                if nested and qualified_name not in self.nested_object_keys:
                    self.nested_object_keys.append(qualified_name)

                    search_quantity = SearchQuantity(annotation=annotation, doc_type=self, prefix=prefix)
                    self.nested_sections.append(search_quantity)
                    self.nested_object_keys.sort(key=lambda item: len(item))

        self.mapping = dict(properties=mappings)
        return self.mapping

    def _register(self, annotation, prefix):
        search_quantity = SearchQuantity(annotation=annotation, doc_type=self, prefix=prefix)
        name = search_quantity.qualified_name

        assert name not in self.quantities or self.quantities[name] == search_quantity, \
            'Search quantity names must be unique: %s' % name

        self.quantities[name] = search_quantity

        if annotation.metrics is not None:
            for name, metric in annotation.metrics.items():
                assert name not in self.metrics, 'Metric names must be unique: %s' % name
                self.metrics[name] = (metric, search_quantity)

    def __repr__(self):
        return self.name


class Index():
    '''
    Allows access to an Elasticsearch index. It forwards method calls to Python's
    Elasticsearch package for Elasticsearch document APIs like search, index, get, mget,
    bulk, etc. It adds the necessary doc_type and index parameters for you.

    Arguments:
        doc_type: The :class:`DocumentType` instance that describes the document type
            of this index.
        index_config_key: The ``nomad.config.elastic`` config key that holds the name
            for this index.

    Attributes:
        elastic_client: The used Elasticsearch Python package client.
        index_name: The name of the index in Elasticsearch.
    '''
    def __init__(self, doc_type: DocumentType, index_config_key: str):
        self.doc_type = doc_type
        self.index_config_key = index_config_key

    def __elasticsearch_operation(self, name: str, *args, **kwargs):
        if 'doc_type' not in kwargs:
            kwargs['doc_type'] = self.doc_type.name
        if 'index' not in kwargs:
            kwargs['index'] = self.index_name

        results = getattr(self.elastic_client, name)(*args, **kwargs)
        return results

    @property
    def index_name(self):
        return getattr(config.elastic, self.index_config_key)

    @property
    def elastic_client(self):
        from nomad.infrastructure import elastic_client
        return elastic_client

    def __getattr__(self, name):
        if name not in ['get', 'index', 'mget', 'bulk', 'search']:
            return super().__getattribute__(name)

        def wrapper(*args, **kwargs):
            return self.__elasticsearch_operation(name, *args, **kwargs)

        return wrapper

    def create_index(self, upsert: bool = False):
        ''' Initially creates the index with the mapping of its document type. '''
        assert self.doc_type.mapping is not None, 'The mapping has to be created first.'
        logger = utils.get_logger(__name__, index=self.index_name)
        if not self.elastic_client.indices.exists(index=self.index_name):
            # TODO the settings emulate the path_analyzer used in the v0 elasticsearch_dsl
            # based index, configured by nomad.datamodel.datamodel::path_analyzer
            self.elastic_client.indices.create(index=self.index_name, body={
                'settings': {
                    'analysis': {
                        'analyzer': {
                            'path_analyzer': {'tokenizer': 'path_tokenizer', 'type': 'custom'}
                        },
                        'tokenizer': {
                            'path_tokenizer': {'pattern': '/', 'type': 'pattern'}
                        }
                    }
                },
                'mappings': {
                    self.doc_type.name: self.doc_type.mapping
                }
            })
            logger.info('elasticsearch index created')
        elif upsert:
            self.elastic_client.indices.put_mapping(
                index=self.index_name,
                doc_type=self.doc_type.name,
                body=self.doc_type.mapping)
            logger.info('elasticsearch index updated')
        else:
            logger.info('elasticsearch index exists')

    def delete(self):
        if self.elastic_client.indices.exists(index=self.index_name):
            self.elastic_client.indices.delete(index=self.index_name)

    def refresh(self):
        self.elastic_client.indices.refresh(index=self.index_name)
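
# A quick sketch of how the Index wrapper is meant to be used (illustrative only;
# the id and query below are made up):
#
#     entry_index.get(id='some-entry-id')
#     entry_index.search(body={'query': {'match_all': {}}})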


# TODO type 'doc' because it's the default used by elasticsearch_dsl and the v0 entries index.
# 'entry' would be more descriptive.
entry_type = DocumentType('doc', id_field='entry_id')
material_type = DocumentType('material', id_field='material_id')
material_entry_type = DocumentType('material_entry', id_field='entry_id')

entry_index = Index(entry_type, index_config_key='entries_index')
material_index = Index(material_type, index_config_key='materials_index')


def tokenizer_default(value):
    """The default suggestion tokenizer. Contains the full value and the value
    split into tokens using whitespace, dot and underscore.
    """
    tokens = [value]
    fragments = list(filter(lambda x: x != "", re.split(r'[\s_]', value)))
    if len(fragments) > 1:
        tokens += fragments
    return tokens


def tokenizer_formula(value):
    """Suggestion tokenizer for chemical formulas. Contains the full value and the value
    split into formula fragments.
    """
    tokens = [value]
    fragments = re.findall(r'[A-Z][a-z]?\d*', value)
    if len(fragments) > 1:
        tokens += fragments
    return tokens
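
# A quick illustration of the two tokenizers above (the input values are made up):
#
#     tokenizer_default('band_structure')  # -> ['band_structure', 'band', 'structure']
#     tokenizer_formula('H2O')             # -> ['H2O', 'H2', 'O']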


class Elasticsearch(DefinitionAnnotation):
    '''
    A metainfo annotation for quantity definitions. This annotation can be used multiple
    times on the same quantity (e.g. to define Elasticsearch fields with different mapping
    types). Each annotation will create a field in the respective elasticsearch document type.

    This annotation has to be used on all sub sections that lead to quantities that should
    be included. On sub sections an inner document mapping is applied and all other
    arguments are ignored.

    Arguments:
        doc_type: An additional document type: ``material_type`` or ``material_entry_type``.
            All quantities with this annotation are automatically placed in ``entry_type``.
        mapping: The Elasticsearch mapping for the underlying elasticsearch field. The
            default depends on the quantity type. You can provide the elasticsearch type
            name, a full dictionary with additional elasticsearch mapping parameters, or
            an elasticsearch_dsl mapping object.
        field: Allows to specify sub-field name. There has to be another annotation on the
            same quantity with the default name. The custom field name is concatenated
            to the default. This will create an additional mapping for this
            quantity. In queries this can be used like an additional field, but the
            quantity is only stored once (under the quantity name) in the source document.
        value:
            A callable that is applied to the containing section to get a value for
            this quantity when saving the section in the elasticsearch index. By default
            this will be the serialized quantity value.
        index:
            A boolean that indicates if this quantity should be indexed or merely be
            part of the elastic document ``_source`` without being indexed for search.
        values:
            If the quantity is used in aggregations for a fixed set of values,
            use this parameter to preset these values. On aggregation, elasticsearch
            will only return values that exist in the search results. This allows one to
            create 0-count statistic values and return a consistent set of values. If the
            underlying quantity is an Enum, the values are determined automatically.
        default_aggregation_size:
            The number of values to return by default if this quantity is used in an
            aggregation. If no value is given and there are no fixed values, 10 will be used.
        metrics:
            If the quantity is used as a metric for aggregating, this has to
            be used to define valid elasticsearch metric aggregations, e.g.
            'sum' or 'cardinality'. It is a dictionary with metric names as keys
            and elasticsearch aggregation names as values.
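            For example (a hypothetical metric): ``metrics={'n_entries': 'cardinality'}``.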
        many_all:
            Multiple values can be used to search based on a property. If no operator
            is given by the user, a logical or is used, i.e., all those entries that
            have at least one of the values are selected (any operator). Set many_all to true
            to alter the default behavior to a logical and (all operator). If the user
            provides an operator, the provided operator is used regardless. Usually
            many_all is only sensible for properties that can have multiple values per
            entry (e.g. elements).
        auto_include_subsections:
            If true, all sub-sections (recursively) are considered for search even if
            there are no elasticsearch annotations in the sub-section definitions.
            By default, only sub-sections with an elasticsearch annotation are considered
            during index mapping creation.
        nested:
            If true, the section is mapped to an elasticsearch nested object and all
            queries become nested queries. Only applicable to sub-sections.
        suggestion:
            If true, a mapping with postfix '__suggestion' or a field called
            'suggestion' is automatically created for this quantity (depending on
            whether you have specified a custom tokenizer with "value" or
            not). This stores autocompletion suggestions for this value.

    Attributes:
        name:
            The name of the quantity (plus additional field if set).
    '''
    def __init__(
            self,
            doc_type: DocumentType = entry_type,
            mapping: Union[str, Dict[str, Any]] = None,
            field: str = None,
            es_field: str = None,
            value: Callable[[MSectionBound], Any] = None,
            index: bool = True,
            values: List[str] = None,
            default_aggregation_size: int = None,
            metrics: Dict[str, str] = None,
            many_all: bool = False,
            auto_include_subsections: bool = False,
            nested: bool = False,
            suggestion: Union[str, Callable[[MSectionBound], Any]] = None,
            _es_field: str = None):

        # TODO remove _es_field if it is not necessary anymore to enforce a specific mapping
        # for v0 compatibility
        if suggestion:
            if doc_type != entry_type:
                raise ValueError("Suggestions should only be stored in the entry index.")
            for arg in [field, mapping, es_field, _es_field]:
                if arg is not None:
                    raise ValueError("You cannot modify the way suggestions are mapped or named.")
            # If no tokenizer is specified, the suggestion is stored as a field
            # that holds only the original value.
            if suggestion == "simple":
                field = "suggestion"
            elif suggestion == "formula":
                value = tokenizer_formula
            elif suggestion == "default":
                value = tokenizer_default
            elif callable(suggestion):
                value = suggestion
            else:
                raise ValueError(
                    "Please provide the suggestion as one of the predefined "
                    "shortcuts ('simple', 'formula', 'default') or a custom callable."
                )

        self._custom_mapping = mapping
        self.field = field
        self._es_field = field if _es_field is None else _es_field
        self.doc_type = doc_type
        self.value = value
        self.index = index
        self._mapping: Dict[str, Any] = None

        self.default_aggregation_size = default_aggregation_size
        self.values = values
        self.metrics = metrics
        self.many_all = many_all

        self.auto_include_subsections = auto_include_subsections
        self.nested = nested
        self.suggestion = suggestion

    @property
    def values(self):
        return self._values

    @values.setter
    def values(self, value):
        self._values = value
        if self.default_aggregation_size is None and self._values is not None:
            self.default_aggregation_size = len(self._values)

    @property
    def mapping(self) -> Dict[str, Any]:
        if self._mapping is not None:
            return self._mapping

        if self.suggestion:
            from elasticsearch_dsl import Completion
            self._mapping = Completion().to_dict()
            return self._mapping

        if self._custom_mapping is not None:
            from elasticsearch_dsl import Field

            if isinstance(self._custom_mapping, Field):
                self._mapping = self._custom_mapping.to_dict()
            elif isinstance(self._custom_mapping, str):
                self._mapping = dict(type=self._custom_mapping)
            else:
                self._mapping = self._custom_mapping

            return self._mapping

        def compute_mapping(quantity: Quantity) -> Dict[str, Any]:
            if quantity.type == str:
                return dict(type='keyword')
            elif quantity.type in [float, np.float64]:
                return dict(type='double')
            elif quantity.type == np.float32:
                return dict(type='float')
            elif quantity.type in [int, np.int32]:
                return dict(type='integer')
            elif quantity.type == np.int64:
                return dict(type='long')
            elif quantity.type == bool:
                return dict(type='boolean')
            elif quantity.type == Datetime:
                return dict(type='date')
            elif isinstance(quantity.type, QuantityReference):
                return compute_mapping(quantity.type.target_quantity_def)
            elif isinstance(quantity.type, Reference):
                raise NotImplementedError('Resolving section references is not supported.')
            elif isinstance(quantity.type, MEnum):
                return dict(type='keyword')
            else:
                raise NotImplementedError(
                    'Quantity type %s for quantity %s is not supported.' % (quantity.type, quantity))

        self._mapping = compute_mapping(cast(Quantity, self.definition))

        if not self.index:
            self._mapping['index'] = False

        return self._mapping
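
    # For illustration (not exhaustive): compute_mapping above yields
    # {'type': 'keyword'} for Quantity(type=str), {'type': 'double'} for
    # Quantity(type=np.float64), and {'type': 'date'} for Quantity(type=Datetime).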

    @property
    def fields(self) -> Dict[str, Any]:
        if self._es_field == '' or self._es_field is None:
            return {}

        return {
            self._es_field: self.mapping
        }

    @property
    def property_name(self) -> str:
        if self.suggestion and not self.field:
            return f'{self.definition.name}__suggestion'
        return self.definition.name

    @property
    def name(self) -> str:
        if self.field is not None:
            return f'{self.property_name}.{self.field}'
        elif self.suggestion:
            return f'{self.property_name}__suggestion'
        else:
            return self.property_name

    def __repr__(self):
        if self.definition is None:
            return super().__repr__()

        return f'Elasticsearch({self.definition})'


class SearchQuantity():
    '''
    This is used to represent search quantities. It is different from a metainfo quantity
    because the same metainfo quantity can appear multiple times at different places in
    an archive (and a search index document). A search quantity is uniquely identified by
    a qualified name that pinpoints its place in the sub-section hierarchy.

    Arguments:
        annotation: The elasticsearch annotation that this search quantity is based on.
        doc_type: The elasticsearch document type that this search quantity appears in.
        prefix: The prefix to build the full qualified name for this search quantity.

    Attributes:
        qualified_field:
            The fully qualified name of the resulting elasticsearch field in the entry
            document type. This will be the quantity name (plus additional
            field or suggestion postfix if set) with sub-section names up to the
            root of the metainfo data.
        search_field:
            The fully qualified name of the field in the elasticsearch index.
        qualified_name:
            Same name as qualified_field. This will be used to address the search
            property in our APIs.
        definition: The metainfo quantity definition that this search quantity is based on.
        aggregateable:
            A boolean that determines if this quantity can be used in aggregations.
    '''
    def __init__(self, annotation: Elasticsearch, doc_type: DocumentType, prefix: str):
        self.annotation = annotation
        self.doc_type = doc_type

        qualified_field = self.annotation.definition.name

        if prefix is not None:
            qualified_field = f'{prefix}.{qualified_field}'

        if annotation.field is not None:
            qualified_field = f'{qualified_field}.{annotation.field}'

        if annotation.suggestion:
            qualified_field = f'{qualified_field}__suggestion'

        self.qualified_field = qualified_field
        self.qualified_name = qualified_field

        self.search_field = qualified_field
        if annotation._es_field not in ('', None):
            self.search_field = f'{qualified_field}.{annotation._es_field}'

    @property
    def definition(self):
        return self.annotation.definition

    @property
    def aggregateable(self):
        if isinstance(self.definition.type, Reference):
            return False

        return self.annotation.mapping['type'] == 'keyword'

    def __repr__(self):
        if self.definition is None:
            return super().__repr__()

        return f'SearchQuantity({self.qualified_field})'

    def __getattr__(self, name):
        return getattr(self.annotation, name)


def create_indices(entry_section_def: Section = None, material_section_def: Section = None):
    '''
    Creates the mapping for all document types and creates the indices in Elasticsearch.
    The indices must not exist already. Previously created mappings will be replaced.
    '''
    if entry_section_def is None:
        from nomad.datamodel import EntryArchive
        entry_section_def = EntryArchive.m_def

    if material_section_def is None:
        from nomad.datamodel.results import Material
        material_section_def = Material.m_def

    entry_type._reset()
    material_type._reset()
    material_entry_type._reset()

    entry_type.create_mapping(entry_section_def)
    material_type.create_mapping(material_section_def, auto_include_subsections=True)
    material_entry_type.create_mapping(entry_section_def, prefix='entries')

    # Here we manually add the material_entry_type mapping as a nested field
    # inside the material index. We also need to manually specify the
    # additional nested fields that come with this: the entries + all
    # nested_object_keys from material_entry_type. Notice that we need to sort
    # the list: the API expects a list sorted by name length in ascending
    # order.
    material_entry_type.mapping['type'] = 'nested'
    material_type.mapping['properties']['entries'] = material_entry_type.mapping
    material_type.nested_object_keys += ['entries'] + material_entry_type.nested_object_keys
    material_type.nested_object_keys.sort(key=lambda item: len(item))

    entry_index.create_index(upsert=True)  # TODO update the existing v0 index
    material_index.create_index()


def delete_indices():
    entry_index.delete()
    material_index.delete()


def index_entry(entry: MSection, **kwargs):
    '''
    Upserts the given entry in the entry index. Additional kwargs are passed to
    :func:`index_entries`.
    '''
    index_entries([entry], **kwargs)


def index_entries_with_materials(entries: List, refresh: bool = False):
    index_entries(entries, refresh=refresh)
    update_materials(entries, refresh=refresh)
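
# Example high-level usage (a sketch; `archives` stands for a list of parsed
# EntryArchive instances, and the real call sites live elsewhere in NOMAD):
#
#     create_indices()
#     index_entries_with_materials(archives, refresh=True)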


def index_entries(entries: List, refresh: bool = False):
    '''
    Upserts the given entries in the entry index. Use
    :func:`index_entries_with_materials` to also update the materials index.
    '''
    # split into reasonably sized problems
    if len(entries) > config.elastic.bulk_size:
        for entries_part in [entries[i:i + config.elastic.bulk_size] for i in range(0, len(entries), config.elastic.bulk_size)]:
            index_entries(entries_part, refresh=refresh)
        return

    if len(entries) == 0:
        return

    logger = utils.get_logger('nomad.search', n_entries=len(entries))

    with utils.timer(logger, 'prepare bulk index of entries actions and docs'):
        actions_and_docs = []
        for entry in entries:
            actions_and_docs.append(dict(index=dict(_id=entry['entry_id'])))
            try:
                entry_index_doc = entry_type.create_index_doc(entry)
                actions_and_docs.append(entry_index_doc)
            except Exception as e:
                logger.error('could not create entry index doc', calc_id=entry['entry_id'], exc_info=e)

        timer_kwargs: Dict[str, Any] = {}
        try:
            import json
            timer_kwargs['size'] = len(json.dumps(actions_and_docs))
            timer_kwargs['n_actions'] = len(actions_and_docs)
        except Exception:
            pass

    with utils.timer(
        logger, 'perform bulk index of entries',
        lnr_event='failed to bulk index entries',
        **timer_kwargs
    ):
        entry_index.bulk(
            body=actions_and_docs, refresh=refresh,
            timeout=f'{config.elastic.bulk_timeout}s',
            request_timeout=config.elastic.bulk_timeout)


def update_materials(entries: List, refresh: bool = False):
    # split into reasonably sized problems
    if len(entries) > config.elastic.bulk_size:
        for entries_part in [entries[i:i + config.elastic.bulk_size] for i in range(0, len(entries), config.elastic.bulk_size)]:
            update_materials(entries_part, refresh=refresh)
        return

    if len(entries) == 0:
        return

    logger = utils.get_logger('nomad.search', n_entries=len(entries))

    def get_material_id(entry):
        material_id = None
        try:
            material_id = entry.results.material.material_id
        except AttributeError:
            pass
        return material_id

    # Get all entry and material ids.
    entry_ids, material_ids = set(), set()
    entries_dict = {}
    for entry in entries:
        entries_dict[entry.entry_id] = entry
        entry_ids.add(entry.entry_id)
        material_id = get_material_id(entry)
        if material_id is not None:
            material_ids.add(material_id)

    logger = logger.bind(n_materials=len(material_ids))

    # Get existing materials for entries' material ids (i.e. the entry needs to be added
    # or updated).
    with utils.timer(logger, 'get existing materials', lnr_event='failed to get existing materials'):
        if material_ids:
            elasticsearch_results = material_index.mget(
                body={
                    'docs': [dict(_id=material_id) for material_id in material_ids]
                },
                request_timeout=config.elastic.bulk_timeout)
            existing_material_docs = [
                doc['_source'] for doc in elasticsearch_results['docs'] if '_source' in doc]
        else:
            existing_material_docs = []

    # Get old materials that still have one of the entries, but the material id has changed
    # (i.e. the materials where entries need to be removed due to entries having different
    # materials now).
    with utils.timer(logger, 'get old materials', lnr_event='failed to get old materials'):
        elasticsearch_results = material_index.search(body={
            'size': len(entry_ids),
            'query': {
                'bool': {
                    'must': {
                        'nested': {
                            'path': 'entries',
                            'query': {
                                'terms': {
                                    'entries.entry_id': list(entry_ids)
                                }
                            }
                        }
                    },
                    'must_not': {
                        'terms': {
                            'material_id': list(material_ids)
                        }
                    }
                }
            }
        })
        old_material_docs = [hit['_source'] for hit in elasticsearch_results['hits']['hits']]

    # Compare and create the appropriate materials index actions
    # First, we go through the existing materials. The following cases need to be covered:
    # - an entry needs to be updated within its existing material (standard case)
    # - an entry needs to be added to an existing material (new entry case)
    # - there is an entry with no existing material (new material case)
    # - there is an entry that moves from one existing material to another (super rare
    #   case where an entry's material id changed within the set of other entries' material ids)
    # This is O(n + m) complexity with n = number of materials and m = number of entries.

    # We create lists of bulk operations. Each list only contains enough materials to
    # have the amount of entries in all these materials roughly match the desired bulk size.
    # Using materials as a measure might not be good enough, if a single material has
    # lots of nested entries.
    _actions_and_docs_bulks: List[List[Any]] = []
    _n_entries_in_bulk = [0]

    def add_action_or_doc(action_or_doc):
        if len(_actions_and_docs_bulks) == 0 or _n_entries_in_bulk[0] > config.elastic.bulk_size:
            _n_entries_in_bulk[0] = 0
            _actions_and_docs_bulks.append([])
        _actions_and_docs_bulks[-1].append(action_or_doc)
        if 'entries' in action_or_doc:
            _n_entries_in_bulk[0] = _n_entries_in_bulk[0] + len(action_or_doc['entries'])

    material_docs = []
    material_docs_dict = {}
    remaining_entry_ids = set(entry_ids)
    for material_doc in existing_material_docs:
        material_id = material_doc['material_id']
        material_docs_dict[material_id] = material_doc
        material_entries = material_doc['entries']
        material_entries_to_remove = []
        for index, material_entry in enumerate(material_entries):
            entry_id = material_entry['entry_id']
            entry = entries_dict.get(entry_id)
            if entry is None:
                # The entry was not changed.
                continue
            else:
                # Update the material, there might be slight changes even if it is made
                # from entry properties that are "material defining", e.g. changed external
                # material quantities like new AFLOW prototypes
                try:
                    material_doc.update(**material_type.create_index_doc(entry.results.material))
                except Exception as e:
                    logger.error('could not create material index doc', exc_info=e)

            new_material_id = get_material_id(entry)
            if new_material_id != material_id:
                # Remove the entry, it moved to another material. But the material cannot
                # run empty, because another entry had this material id.
                material_entries_to_remove.append(index)
            else:
                # Update the entry.
                try:
                    material_entries[index] = material_entry_type.create_index_doc(entry)
                except Exception as e:
                    logger.error('could not create material index doc', exc_info=e)
                remaining_entry_ids.remove(entry_id)
        for index in reversed(material_entries_to_remove):
            del material_entries[index]

        add_action_or_doc(dict(index=dict(_id=material_id)))
        add_action_or_doc(material_doc)
        material_docs.append(material_doc)

    for entry_id in remaining_entry_ids:
        entry = entries_dict.get(entry_id)
        material_id = get_material_id(entry)
        if material_id is not None:
            material_doc = material_docs_dict.get(material_id)
            if material_doc is None:
                # The material does not yet exist. Create it.
                try:
                    material_doc = material_type.create_index_doc(entry.results.material)
                except Exception as e:
                    logger.error('could not create material index doc', exc_info=e)
                material_docs_dict[material_id] = material_doc
                add_action_or_doc(dict(create=dict(_id=material_id)))
                add_action_or_doc(material_doc)
                material_docs.append(material_doc)
            # The material does exist (now), but the entry is new.
            try:
                material_doc.setdefault('entries', []).append(material_entry_type.create_index_doc(entry))
            except Exception as e:
                logger.error('could not create material entry index doc', exc_info=e)

    # Second, we go through the old materials. The following cases need to be covered:
    # - the old materials are empty (standard case)
    # - an entry needs to be removed but the material still has entries (new material id case 1)
    # - an entry needs to be removed and the material is now "empty" (new material id case 2)
    for material_doc in old_material_docs:
        material_id = material_doc['material_id']
        material_entries = material_doc['entries']
        material_entries_to_remove = []
        for index, material_entry in enumerate(material_entries):
            entry_id = material_entry['entry_id']
            if entry_id in entry_ids:
                # The entry does not belong to this material anymore and needs to be removed.
                material_entries_to_remove.append(index)
        for index in reversed(material_entries_to_remove):
            del material_entries[index]
        if len(material_entries) == 0:
            # The material is empty now and needs to be removed.
            add_action_or_doc(dict(delete=dict(_id=material_id)))
        else:
            # The material needs to be updated
            add_action_or_doc(dict(index=dict(_id=material_id)))
            add_action_or_doc(material_doc)
            material_docs.append(material_doc)

    # Third, we potentially cap the number of entries in a material. We ensure that only
    # a certain amount of entries is stored with all metadata. The rest is dropped from
    # the document; the total count is still recorded in n_entries.
    all_n_entries_capped = 0
    all_n_entries = 0
    for material_doc in material_docs:
        material_entries = material_doc.get('entries', [])
        material_doc['n_entries'] = len(material_entries)
        if len(material_entries) > config.elastic.entries_per_material_cap:
            material_doc['entries'] = material_entries[0:config.elastic.entries_per_material_cap]

        all_n_entries_capped += len(material_doc.get('entries', []))
        all_n_entries += material_doc['n_entries']

    # Execute the created actions in bulk.
    timer_kwargs: Dict[str, Any] = {}
    try:
        import json
        timer_kwargs['size'] = len(json.dumps(_actions_and_docs_bulks))
        timer_kwargs['n_actions'] = sum([len(bulk) for bulk in _actions_and_docs_bulks])
        timer_kwargs['n_entries'] = all_n_entries
        timer_kwargs['n_entries_capped'] = all_n_entries_capped
    except Exception:
        pass

    with utils.timer(
        logger, 'perform bulk index of materials',
        lnr_event='failed to bulk index materials',
        **timer_kwargs
    ):
        for bulk in _actions_and_docs_bulks:
            material_index.bulk(
                body=bulk, refresh=False,
                timeout=f'{config.elastic.bulk_timeout}s',
                request_timeout=config.elastic.bulk_timeout)

    if refresh:
        entry_index.refresh()
        material_index.refresh()