Commit 2af64e8f authored by Lauri Himanen
Browse files

Added new options for more targeted materials indexing, small enc GUI update.

parent c118d872
Pipeline #85314 passed with stages
in 34 minutes and 41 seconds
Subproject commit 716280b014a422f809919fed17fdbb88b0704e0f
Subproject commit 88f453c827f0ec5c626c33ed11be977bd084ff07
......@@ -231,11 +231,13 @@ def index(threads, dry):
@admin.command()
@click.option('--threads', type=int, default=1, help='Number of threads to use.')
@click.option('--code', multiple=True, type=str, help='Index only calculcations of given codes.')
@click.option('--dry', is_flag=True, help='Do not index, just compute entries.')
@click.option('--in-place', is_flag=True, default=False, help='Perform indexing in the current elastic search index. Meant only for small reindex operations.')
@click.option('-n', type=int, default=None, help='Number of calculations to process. Leave undefined to process all calculations.')
@click.option('--source',
type=click.Choice(['mongo', 'es'], case_sensitive=True))
def index_materials(threads, n, dry, source):
def index_materials(threads, code, dry, in_place, n, source):
"""(Re-)index all materials.
This command will completely rebuild the materials index. The index is
......@@ -249,15 +251,18 @@ def index_materials(threads, n, dry, source):
# In order to do the reindexing with zero downtime, two different indices
# are rotated and an alias is used
old_index_name = list(client.indices.get(config.elastic.materials_index_name).keys())[0]
if old_index_name == config.elastic.materials_index_name + "_a":
new_index_name = config.elastic.materials_index_name + "_b"
elif old_index_name == config.elastic.materials_index_name + "_b":
new_index_name = config.elastic.materials_index_name + "_a"
if in_place:
target_index_name = old_index_name
else:
raise ValueError(
"Unrecognized index name accociated with the alias {}"
.format(config.elastic.materials_index_name)
)
if old_index_name == config.elastic.materials_index_name + "_a":
target_index_name = config.elastic.materials_index_name + "_b"
elif old_index_name == config.elastic.materials_index_name + "_b":
target_index_name = config.elastic.materials_index_name + "_a"
else:
raise ValueError(
"Unrecognized index name accociated with the alias {}"
.format(config.elastic.materials_index_name)
)
if source == "mongo":
all_calcs = proc.Calc.objects().count()
......@@ -364,7 +369,7 @@ def index_materials(threads, n, dry, source):
# nested subdocument
entry = {}
entry['_op_type'] = 'update'
entry['_index'] = new_index_name
entry['_index'] = target_index_name
entry['_id'] = material.material_id
entry['_type'] = 'doc'
entry['_source'] = {
......@@ -380,9 +385,12 @@ def index_materials(threads, n, dry, source):
yield entry
elif source == "es":
s = elasticsearch_dsl.Search(index=config.elastic.index_name)
filters = [elasticsearch_dsl.Q("term", encyclopedia__status="success")]
if code:
filters.append(elasticsearch_dsl.Q("terms", dft__code_name=code))
query = elasticsearch_dsl.Q(
"bool",
filter=[elasticsearch_dsl.Q("term", encyclopedia__status="success")],
filter=filters,
)
s = s.query(query)
s = s.extra(**{
......@@ -395,10 +403,12 @@ def index_materials(threads, n, dry, source):
with utils.ETA(all_calcs, ' index %10d of %10d calcs, ETA %s', chunk_size) as eta:
s = elasticsearch_dsl.Search(index=config.elastic.index_name)
filters = [elasticsearch_dsl.Q("term", encyclopedia__status="success")]
if code:
filters.append(elasticsearch_dsl.Q("terms", dft__code_name=code))
query = elasticsearch_dsl.Q(
"bool",
filter=[elasticsearch_dsl.Q("term", encyclopedia__status="success")],
filter=filters,
)
s = s.query(query)
s = s.extra(**{
......@@ -520,19 +530,22 @@ def index_materials(threads, n, dry, source):
# the nested subdocument
entry = {}
entry['_op_type'] = 'update'
entry['_index'] = new_index_name
entry['_index'] = target_index_name
entry['_id'] = material.material_id
entry['_type'] = 'doc'
entry['_source'] = {
"upsert": material.m_to_dict(include_defaults=False, partial="es"),
"doc_as_upsert": False,
"script": {
"source": "ctx._source.calculations.add(params.calc)",
"params": {
"calc": calc.m_to_dict(include_defaults=False, partial="es")
},
}
}
if in_place:
entry['_source']["script"]["source"] = "ctx._source.calculations.removeIf(x -> x.calc_id == params.calc.calc_id); ctx._source.calculations.add(params.calc)"
else:
entry['_source']["script"]["source"] = "ctx._source.calculations.add(params.calc)"
yield entry
if dry:
......@@ -541,7 +554,7 @@ def index_materials(threads, n, dry, source):
else:
# Create new index into which the data will be inserted. The old index will
# keep working while the new index is being built
material_document.init(index=new_index_name)
material_document.init(index=target_index_name)
if threads > 1:
print(' use %d threads' % threads)
......@@ -556,10 +569,11 @@ def index_materials(threads, n, dry, source):
# Change the materials index alias to point to the new index and remove the
# old index.
new_index = elasticsearch_dsl.Index(new_index_name)
new_index.put_alias(name=config.elastic.materials_index_name)
old_index = elasticsearch_dsl.Index(old_index_name)
old_index.delete()
if not in_place:
new_index = elasticsearch_dsl.Index(target_index_name)
new_index.put_alias(name=config.elastic.materials_index_name)
old_index = elasticsearch_dsl.Index(old_index_name)
old_index.delete()
print('')
print('indexing completed')
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment