Commit fc32f513 authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Added index transformation to fix optimade entries. #450 #461

parent c52c39c5
......@@ -192,44 +192,6 @@ def lift_embargo(dry, parallel):
__run_processing(uploads_to_repack, parallel, lambda upload: upload.re_pack(), 're-packing')
@admin.command(help='(Re-)index all calcs.')
@click.option('--threads', type=int, default=1, help='Number of threads to use.')
@click.option('--dry', is_flag=True, help='Do not index, just compute entries.')
def index(threads, dry):
all_calcs = proc.Calc.objects().count()
print('indexing %d ...' % all_calcs)
def elastic_updates():
with utils.ETA(all_calcs, ' index %10d or %10d calcs, ETA %s') as eta:
for calc in proc.Calc.objects():
entry_metadata = datamodel.EntryMetadata.m_from_dict(calc.metadata)
entry = entry_metadata.a_elastic.create_index_entry().to_dict(include_meta=True)
entry['_op_type'] = 'index'
yield entry
if dry:
for _ in elastic_updates():
if threads > 1:
print(' use %d threads' % threads)
for _ in elasticsearch.helpers.parallel_bulk(
infrastructure.elastic_client, elastic_updates(), chunk_size=500,
infrastructure.elastic_client, elastic_updates())
print('indexing completed')
@click.option('--threads', type=int, default=1, help='Number of threads to use.')
@click.option('--code', multiple=True, type=str, help='Index only calculcations of given codes.')
......@@ -247,19 +247,32 @@ def reset(ctx, uploads, with_calcs):
@uploads.command(help='(Re-)index all calcs of the given uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.option('--transformer', help='Qualified name to a Python function that should be applied to each EntryMetadata.')
def index(ctx, uploads, parallel):
def index(ctx, uploads, parallel, transformer):
transformer_func = None
if transformer is not None:
import importlib
module_name, func_name = transformer.rsplit('.', 1)
module = importlib.import_module(module_name)
transformer_func = getattr(module, func_name)
_, uploads = query_uploads(ctx, uploads)
def transform(calcs):
for calc in calcs:
calc = transformer_func(calc)
except Exception as e:
import traceback
print(f' ERROR failed to transform calc (stop transforming for upload): {str(e)}')
def index_upload(upload, logger):
with upload.entries_metadata() as calcs:
# This is just a temporary fix to update the group hash without re-processing
for calc in calcs:
if calc.dft is not None:
except Exception:
if transformer is not None:
failed = search.index_all(calcs)
if failed > 0:
print(' WARNING failed to index %d entries' % failed)
......@@ -33,6 +33,30 @@ from nomad.datamodel.metainfo.public import section_system
species_re = re.compile(r'^([A-Z][a-z]?)(\d*)$')
def transform_to_v1(entry: EntryMetadata) -> EntryMetadata:
Transformation function to use during re-indexing of entries with outdated optimade
format. Fixes formulas and periodic dimensions, removed entries with X in formula.
optimade = entry.dft.optimade if entry.dft is not None else None
if optimade is None:
return entry
if 'X' in optimade.chemical_formula_reduced:
entry.dft.m_remove_sub_section(DFTMetadata.optimade, -1)
return entry
optimade.chemical_formula_reduced = optimade_chemical_formula_reduced(optimade.chemical_formula_reduced)
optimade.chemical_formula_anonymous = optimade_chemical_formula_anonymous(optimade.chemical_formula_reduced)
optimade.chemical_formula_hill = optimade_chemical_formula_hill(optimade.chemical_formula_hill)
optimade.chemical_formula_descriptive = optimade.chemical_formula_hill
dimension_types = optimade.dimension_types
if isinstance(dimension_types, int):
optimade.dimension_types = [1] * dimension_types + [0] * (3 - dimension_types)
return entry
def optimade_chemical_formula_reduced(formula: str):
if formula is None:
return formula
......@@ -117,21 +117,6 @@ class TestAdmin:
with files.UploadFiles.get(upload_id=upload_id).read_archive(calc_id=calc.calc_id) as archive:
assert calc.calc_id in archive
def test_index(self, published):
upload_id = published.upload_id
calc = Calc.objects(upload_id=upload_id).first()
calc.metadata['comment'] = 'specific'
assert search.SearchRequest().search_parameter('comment', 'specific').execute()['total'] == 0
result = click.testing.CliRunner().invoke(
cli, ['admin', 'index', '--threads', '2'], catch_exceptions=False)
assert result.exit_code == 0
assert 'index' in result.stdout
assert search.SearchRequest().search_parameter('comment', 'specific').execute()['total'] == 1
def test_delete_entry(self, published):
upload_id = published.upload_id
calc = Calc.objects(upload_id=upload_id).first()
......@@ -145,6 +130,11 @@ class TestAdmin:
assert Calc.objects(calc_id=calc.calc_id).first() is None
def transform_for_index_test(calc):
calc.comment = 'specific'
return calc
@pytest.mark.usefixtures('reset_config', 'no_warn')
class TestAdminUploads:
......@@ -236,6 +226,21 @@ class TestAdminUploads:
assert search.SearchRequest().search_parameters(comment='specific').execute()['total'] == 1
def test_index_with_transform(self, published):
upload_id = published.upload_id
assert search.SearchRequest().search_parameters(comment='specific').execute()['total'] == 0
result = click.testing.CliRunner().invoke(
cli, [
'admin', 'uploads', 'index',
'--transformer', 'tests.test_cli.transform_for_index_test',
assert result.exit_code == 0
assert 'index' in result.stdout
assert search.SearchRequest().search_parameters(comment='specific').execute()['total'] == 1
def test_re_process(self, published, monkeypatch):
monkeypatch.setattr('nomad.config.meta.version', 'test_version')
upload_id = published.upload_id
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment