diff --git a/nomad/config.py b/nomad/config.py index b1a27638478a91ac3e4fb597b81a9d8e053bb11f..38341c1606391bdaa1f0bb8e64d8a257b2bf6ef2 100644 --- a/nomad/config.py +++ b/nomad/config.py @@ -136,7 +136,7 @@ elastic = NomadConfig( host='localhost', port=9200, timeout=60, - bulk_timeout='10m', + bulk_timeout=600, bulk_size=1000, entries_per_material_cap=1000, entries_index='nomad_entries_v1', diff --git a/nomad/metainfo/elasticsearch_extension.py b/nomad/metainfo/elasticsearch_extension.py index 45d1f793b7e87d8f92bc23a3d5f5ad007ace17b3..2f07b955dc1d9a7eec1fa33573ec515313fe7c8c 100644 --- a/nomad/metainfo/elasticsearch_extension.py +++ b/nomad/metainfo/elasticsearch_extension.py @@ -910,7 +910,10 @@ def index_entries(entries: List, refresh: bool = False): lnr_event='failed to bulk index entries', **timer_kwargs ): - entry_index.bulk(body=actions_and_docs, refresh=refresh, timeout=config.elastic.bulk_timeout) + entry_index.bulk( + body=actions_and_docs, refresh=refresh, + timeout=f'{config.elastic.bulk_timeout}s', + request_timeout=config.elastic.bulk_timeout) def update_materials(entries: List, refresh: bool = False): @@ -949,9 +952,11 @@ def update_materials(entries: List, refresh: bool = False): # or updated). 
with utils.timer(logger, 'get existing materials', lnr_event='failed to get existing materials'): if material_ids: - elasticsearch_results = material_index.mget(body={ - 'docs': [dict(_id=material_id) for material_id in material_ids] - }) + elasticsearch_results = material_index.mget( + body={ + 'docs': [dict(_id=material_id) for material_id in material_ids] + }, + request_timeout=config.elastic.bulk_timeout) existing_material_docs = [ doc['_source'] for doc in elasticsearch_results['docs'] if '_source' in doc] else: @@ -993,7 +998,22 @@ def update_materials(entries: List, refresh: bool = False): # - there is an entry that moves from one existing material to another (super rare # case where an entry's material id changed within the set of other entries' material ids) # This n + m complexity with n=number of materials and m=number of entries - actions_and_docs = [] + + # We create lists of bulk operations. Each list only contains enough materials to + # have the amount of entries in all these materials roughly match the desired bulk size. + # Using materials as a measure might not be good enough if a single material has + # lots of nested entries. 
+ _actions_and_docs_bulks: List[List[Any]] = [] + _n_entries_in_bulk = [0] + + def add_action_or_doc(action_or_doc): + if len(_actions_and_docs_bulks) == 0 or _n_entries_in_bulk[0] > config.elastic.bulk_size: + _n_entries_in_bulk[0] = 0 + _actions_and_docs_bulks.append([]) + _actions_and_docs_bulks[-1].append(action_or_doc) + if 'entries' in action_or_doc: + _n_entries_in_bulk[0] = _n_entries_in_bulk[0] + len(action_or_doc['entries']) + material_docs = [] material_docs_dict = {} remaining_entry_ids = set(entry_ids) @@ -1026,8 +1046,8 @@ def update_materials(entries: List, refresh: bool = False): for index in reversed(material_entries_to_remove): del(material_entries[index]) - actions_and_docs.append(dict(index=dict(_id=material_id))) - actions_and_docs.append(material_doc) + add_action_or_doc(dict(index=dict(_id=material_id))) + add_action_or_doc(material_doc) material_docs.append(material_doc) for entry_id in remaining_entry_ids: @@ -1039,8 +1059,8 @@ def update_materials(entries: List, refresh: bool = False): # The material does not yet exist. Create it. material_doc = material_type.create_index_doc(entry.results.material) material_docs_dict[material_id] = material_doc - actions_and_docs.append(dict(create=dict(_id=material_id))) - actions_and_docs.append(material_doc) + add_action_or_doc(dict(create=dict(_id=material_id))) + add_action_or_doc(material_doc) material_docs.append(material_doc) # The material does exist (now), but the entry is new. material_doc.setdefault('entries', []).append(material_entry_type.create_index_doc(entry)) @@ -1062,11 +1082,11 @@ def update_materials(entries: List, refresh: bool = False): del(material_entries[index]) if len(material_entries) == 0: # The material is empty now and needs to be removed. 
- actions_and_docs.append(dict(delete=dict(_id=material_id))) + add_action_or_doc(dict(delete=dict(_id=material_id))) else: # The material needs to be updated - actions_and_docs.append(dict(index=dict(_id=material_id))) - actions_and_docs.append(material_doc) + add_action_or_doc(dict(index=dict(_id=material_id))) + add_action_or_doc(material_doc) material_docs.append(material_doc) # Third, we potentially cap the number of entries in a material. We ensure that only @@ -1087,8 +1107,8 @@ def update_materials(entries: List, refresh: bool = False): timer_kwargs = {} try: import json - timer_kwargs['size'] = len(json.dumps(actions_and_docs)) - timer_kwargs['n_actions'] = len(actions_and_docs) + timer_kwargs['size'] = len(json.dumps(_actions_and_docs_bulks)) + timer_kwargs['n_actions'] = sum([len(bulk) for bulk in _actions_and_docs_bulks]) timer_kwargs['n_entries'] = all_n_entries timer_kwargs['n_entries_capped'] = all_n_entries_capped except Exception: @@ -1099,8 +1119,11 @@ def update_materials(entries: List, refresh: bool = False): lnr_event='failed to bulk index materials', **timer_kwargs ): - if len(actions_and_docs) > 0: - material_index.bulk(body=actions_and_docs, refresh=False, timeout=config.elastic.bulk_timeout) + for bulk in _actions_and_docs_bulks: + material_index.bulk( + body=bulk, refresh=False, + timeout=f'{config.elastic.bulk_timeout}s', + request_timeout=config.elastic.bulk_timeout) if refresh: entry_index.refresh() diff --git a/ops/helm/nomad/templates/nomad-configmap.yml b/ops/helm/nomad/templates/nomad-configmap.yml index fb5f040ff0438696ce533895b9826b36649d393f..5bd0b248ec15a35a07f014c2949277f81d12f16c 100644 --- a/ops/helm/nomad/templates/nomad-configmap.yml +++ b/ops/helm/nomad/templates/nomad-configmap.yml @@ -61,8 +61,8 @@ data: host: "{{ .Values.elastic.host }}" port: {{ .Values.elastic.port }} timeout: {{ .Values.elastic.timeout }} - bulk_timeout: "{{ .Values.elastic.bulkTimeout }}" - bulk_size=1000: {{ .Values.elastic.bulkSize }} + 
bulk_timeout: {{ .Values.elastic.bulkTimeout }} + bulk_size: {{ .Values.elastic.bulkSize }} entries_per_material_cap: {{ .Values.elastic.entriesPerMaterialCap }} entries_index: "{{ .Values.dbname }}_entries_v1" materials_index: "{{ .Values.dbname }}_materials_v1" diff --git a/ops/helm/nomad/values.yaml b/ops/helm/nomad/values.yaml index 0f000210d784d03a00ca435238b4e9776cd4f79c..09410953c63f7f588be5cc9ccc0c621ee94bcd4e 100644 --- a/ops/helm/nomad/values.yaml +++ b/ops/helm/nomad/values.yaml @@ -128,9 +128,9 @@ elastic: host: nomad-flink-01.esc port: 9200 timeout: 60 - bulkTimeout: '10m' - bulk_size: 1000 - entries_per_material_cap: 1000 + bulkTimeout: 600 + bulkSize: 1000 + entriesPerMaterialCap: 1000 logstash: enabled: true