Commit 0e362767 authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Optimize bulk operations for entries and materials indexing. #654

parent 0b45651f
......@@ -136,7 +136,7 @@ elastic = NomadConfig(
host='localhost',
port=9200,
timeout=60,
bulk_timeout='10m',
bulk_timeout=600,
bulk_size=1000,
entries_per_material_cap=1000,
entries_index='nomad_entries_v1',
......
......@@ -910,7 +910,10 @@ def index_entries(entries: List, refresh: bool = False):
lnr_event='failed to bulk index entries',
**timer_kwargs
):
entry_index.bulk(body=actions_and_docs, refresh=refresh, timeout=config.elastic.bulk_timeout)
entry_index.bulk(
body=actions_and_docs, refresh=refresh,
timeout=f'{config.elastic.bulk_timeout}s',
request_timeout=config.elastic.bulk_timeout)
def update_materials(entries: List, refresh: bool = False):
......@@ -949,9 +952,11 @@ def update_materials(entries: List, refresh: bool = False):
# or updated).
with utils.timer(logger, 'get existing materials', lnr_event='failed to get existing materials'):
if material_ids:
elasticsearch_results = material_index.mget(body={
'docs': [dict(_id=material_id) for material_id in material_ids]
})
elasticsearch_results = material_index.mget(
body={
'docs': [dict(_id=material_id) for material_id in material_ids]
},
request_timeout=config.elastic.bulk_timeout)
existing_material_docs = [
doc['_source'] for doc in elasticsearch_results['docs'] if '_source' in doc]
else:
......@@ -993,7 +998,22 @@ def update_materials(entries: List, refresh: bool = False):
# - there is an entry that moves from one existing material to another (super rare
# case where an entry's material id changed within the set of other entries' material ids)
# This n + m complexity with n=number of materials and m=number of entries
actions_and_docs = []
# We create lists of bulk operations. Each list only contains enough materials to
# have the amount of entries across all of these materials roughly match the desired
# bulk size. Using the number of materials alone as a measure might not be good
# enough, because a single material can contain lots of nested entries.
_actions_and_docs_bulks: List[List[Any]] = []
_n_entries_in_bulk = [0]
def add_action_or_doc(action_or_doc):
    # Append one bulk item (an action header like {'index': ...} or its document)
    # to the current bulk list, starting a new bulk list once the running entry
    # count exceeds config.elastic.bulk_size.
    #
    # NOTE(review): relies on closure state from the enclosing update_materials:
    # _actions_and_docs_bulks (list of bulk lists) and _n_entries_in_bulk (a
    # one-element list used as a mutable counter in place of `nonlocal`).
    if len(_actions_and_docs_bulks) == 0 or _n_entries_in_bulk[0] > config.elastic.bulk_size:
        # Start a fresh bulk list and reset the entry counter. The size check is
        # done before appending, so a single oversized material still lands in
        # one bulk list rather than being split.
        _n_entries_in_bulk[0] = 0
        _actions_and_docs_bulks.append([])
    _actions_and_docs_bulks[-1].append(action_or_doc)
    if 'entries' in action_or_doc:
        # Only material documents carry an 'entries' key; count their nested
        # entries so bulk size tracks entries, not materials.
        _n_entries_in_bulk[0] = _n_entries_in_bulk[0] + len(action_or_doc['entries'])
material_docs = []
material_docs_dict = {}
remaining_entry_ids = set(entry_ids)
......@@ -1026,8 +1046,8 @@ def update_materials(entries: List, refresh: bool = False):
for index in reversed(material_entries_to_remove):
del(material_entries[index])
actions_and_docs.append(dict(index=dict(_id=material_id)))
actions_and_docs.append(material_doc)
add_action_or_doc(dict(index=dict(_id=material_id)))
add_action_or_doc(material_doc)
material_docs.append(material_doc)
for entry_id in remaining_entry_ids:
......@@ -1039,8 +1059,8 @@ def update_materials(entries: List, refresh: bool = False):
# The material does not yet exist. Create it.
material_doc = material_type.create_index_doc(entry.results.material)
material_docs_dict[material_id] = material_doc
actions_and_docs.append(dict(create=dict(_id=material_id)))
actions_and_docs.append(material_doc)
add_action_or_doc(dict(create=dict(_id=material_id)))
add_action_or_doc(material_doc)
material_docs.append(material_doc)
# The material does exist (now), but the entry is new.
material_doc.setdefault('entries', []).append(material_entry_type.create_index_doc(entry))
......@@ -1062,11 +1082,11 @@ def update_materials(entries: List, refresh: bool = False):
del(material_entries[index])
if len(material_entries) == 0:
# The material is empty now and needs to be removed.
actions_and_docs.append(dict(delete=dict(_id=material_id)))
add_action_or_doc(dict(delete=dict(_id=material_id)))
else:
# The material needs to be updated
actions_and_docs.append(dict(index=dict(_id=material_id)))
actions_and_docs.append(material_doc)
add_action_or_doc(dict(index=dict(_id=material_id)))
add_action_or_doc(material_doc)
material_docs.append(material_doc)
# Third, we potentially cap the number of entries in a material. We ensure that only
......@@ -1087,8 +1107,8 @@ def update_materials(entries: List, refresh: bool = False):
timer_kwargs = {}
try:
import json
timer_kwargs['size'] = len(json.dumps(actions_and_docs))
timer_kwargs['n_actions'] = len(actions_and_docs)
timer_kwargs['size'] = len(json.dumps(_actions_and_docs_bulks))
timer_kwargs['n_actions'] = sum([len(bulk) for bulk in _actions_and_docs_bulks])
timer_kwargs['n_entries'] = all_n_entries
timer_kwargs['n_entries_capped'] = all_n_entries_capped
except Exception:
......@@ -1099,8 +1119,11 @@ def update_materials(entries: List, refresh: bool = False):
lnr_event='failed to bulk index materials',
**timer_kwargs
):
if len(actions_and_docs) > 0:
material_index.bulk(body=actions_and_docs, refresh=False, timeout=config.elastic.bulk_timeout)
for bulk in _actions_and_docs_bulks:
material_index.bulk(
body=bulk, refresh=False,
timeout=f'{config.elastic.bulk_timeout}s',
request_timeout=config.elastic.bulk_timeout)
if refresh:
entry_index.refresh()
......
......@@ -61,8 +61,8 @@ data:
host: "{{ .Values.elastic.host }}"
port: {{ .Values.elastic.port }}
timeout: {{ .Values.elastic.timeout }}
bulk_timeout: "{{ .Values.elastic.bulkTimeout }}"
bulk_size=1000: {{ .Values.elastic.bulkSize }}
bulk_timeout: {{ .Values.elastic.bulkTimeout }}
bulk_size: {{ .Values.elastic.bulkSize }}
entries_per_material_cap: {{ .Values.elastic.entriesPerMaterialCap }}
entries_index: "{{ .Values.dbname }}_entries_v1"
materials_index: "{{ .Values.dbname }}_materials_v1"
......
......@@ -128,9 +128,9 @@ elastic:
host: nomad-flink-01.esc
port: 9200
timeout: 60
bulkTimeout: '10m'
bulk_size: 1000
entries_per_material_cap: 1000
bulkTimeout: 600
bulkSize: 1000
entriesPerMaterialCap: 1000
logstash:
enabled: true
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment