Commit 62e0ed4d authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Merge branch 'materials' into 'v0.9.3'

Materials index and new Encyclopedia API

See merge request !199
parents d341a647 9013fc84
Pipeline #85332 passed with stages
in 26 minutes and 28 seconds
Subproject commit bdffb455f21577435477daa9c871205e6c118efd
Subproject commit f19b84de11b1004548e8e5a324df78bb356e7896
Subproject commit 59ecd6cddb2bbf75000a1454fe40ceace2d2c207
Subproject commit bc9be2124405b87987e04d286e385f23b7b5712f
This diff is collapsed.
......@@ -26,7 +26,6 @@ import elasticsearch.helpers
from datetime import datetime
from nomad import search, utils, datamodel, processing as proc, infrastructure, files
from nomad.metainfo import search_extension
from nomad.datamodel import Dataset, User, EditableUserMetadata
from nomad.app import common
from nomad.app.common import RFC3339DateTime, DotKeyNested
......@@ -88,13 +87,13 @@ _search_request_parser.add_argument(
_search_request_parser.add_argument(
'metrics', type=str, action='append', help=(
'Metrics to aggregate over all quantities and their values as comma separated list. '
'Possible values are %s.' % ', '.join(search_extension.metrics.keys())))
'Possible values are %s.' % ', '.join(search.metrics.keys())))
_search_request_parser.add_argument(
'statistics', type=str, action='append', help=(
'Quantities for which to aggregate values and their metrics.'))
_search_request_parser.add_argument(
'exclude', type=str, action='split', help='Excludes the given keys in the returned data.')
for group_name in search_extension.groups:
for group_name in search.groups:
_search_request_parser.add_argument(
group_name, type=bool, help=('Return %s group data.' % group_name))
_search_request_parser.add_argument(
......@@ -106,15 +105,15 @@ _repo_calcs_model_fields = {
'A dict with all statistics. Each statistic is dictionary with a metrics dict as '
'value and quantity value as key. The possible metrics are code runs(calcs), %s. '
'There is a pseudo quantity "total" with a single value "all" that contains the '
' metrics over all results. ' % ', '.join(search_extension.metrics.keys())))}
' metrics over all results. ' % ', '.join(search.metrics.keys())))}
for group_name in search_extension.groups:
for group_name in search.groups:
_repo_calcs_model_fields[group_name] = (DotKeyNested if '.' in group_name else fields.Nested)(api.model('RepoGroup', {
'after': fields.String(description='The after value that can be used to retrieve the next %s.' % group_name),
'values': fields.Raw(description='A dict with %s as key. The values are dicts with "total" and "examples" keys.' % group_name)
}), skip_none=True)
for qualified_name, quantity in search_extension.search_quantities.items():
for qualified_name, quantity in search.search_quantities.items():
_repo_calcs_model_fields[qualified_name] = fields.Raw(
description=quantity.description, allow_null=True, skip_none=True)
......@@ -123,7 +122,7 @@ _repo_calcs_model_fields.update(**{
'interval': fields.String(description='Interval to use for upload time aggregation.', allow_null=True, skip_none=True),
'metrics': fields.List(fields.String, description=(
'Metrics to aggregate over all quantities and their values as comma separated list. '
'Possible values are %s.' % ', '.join(search_extension.metrics.keys())), allow_null=True, skip_none=True),
'Possible values are %s.' % ', '.join(search.metrics.keys())), allow_null=True, skip_none=True),
'statistics_required': fields.List(fields.String, description='Quantities for which to aggregate values and their metrics.', allow_null=True, skip_none=True),
'exclude': fields.List(fields.String, description='Excludes the given keys in the returned data.', allow_null=True, skip_none=True)
})
......@@ -192,7 +191,7 @@ class RepoCalcsResource(Resource):
abort(400, message='bad parameters: %s' % str(e))
for metric in metrics:
if metric not in search_extension.metrics:
if metric not in search.metrics:
abort(400, message='there is no metric %s' % metric)
search_request = search.SearchRequest()
......@@ -214,7 +213,7 @@ class RepoCalcsResource(Resource):
group_metrics = [
group_quantity.metric_name
for group_name, group_quantity in search_extension.groups.items()
for group_name, group_quantity in search.groups.items()
if args.get(group_name, False)]
total_metrics = metrics + group_metrics
if len(total_metrics) > 0:
......@@ -230,7 +229,7 @@ class RepoCalcsResource(Resource):
results = search_request.execute_scrolled(scroll_id=scroll_id, size=per_page)
else:
for group_name, group_quantity in search_extension.groups.items():
for group_name, group_quantity in search.groups.items():
if args.get(group_name, False):
kwargs: Dict[str, Any] = {}
if group_name == 'uploads_grouped':
......@@ -252,7 +251,7 @@ class RepoCalcsResource(Resource):
if 'quantities' in results:
quantities = results.pop('quantities')
for group_name, group_quantity in search_extension.groups.items():
for group_name, group_quantity in search.groups.items():
if args.get(group_name, False):
results[group_name] = quantities[group_quantity.qualified_name]
......@@ -335,7 +334,7 @@ class RepoCalcsResource(Resource):
abort(400, message='bad parameters: %s' % str(e))
for metric in metrics:
if metric not in search_extension.metrics:
if metric not in search.metrics:
abort(400, message='there is no metric %s' % metric)
search_request = search.SearchRequest()
......@@ -363,7 +362,7 @@ class RepoCalcsResource(Resource):
group_metrics = [
group_quantity.metric_name
for group_name, group_quantity in search_extension.groups.items()
for group_name, group_quantity in search.groups.items()
if group_name in data_in]
total_metrics = metrics + group_metrics
if len(total_metrics) > 0:
......@@ -379,7 +378,7 @@ class RepoCalcsResource(Resource):
results = search_request.execute_scrolled(scroll_id=scroll_id, size=per_page)
else:
for group_name, group_quantity in search_extension.groups.items():
for group_name, group_quantity in search.groups.items():
if group_name in data_in:
kwargs: Dict[str, Any] = {}
if group_name == 'uploads_grouped':
......@@ -401,7 +400,7 @@ class RepoCalcsResource(Resource):
if 'quantities' in results:
quantities = results.pop('quantities')
for group_name, group_quantity in search_extension.groups.items():
for group_name, group_quantity in search.groups.items():
if group_name in data_in:
results[group_name] = quantities[group_quantity.qualified_name]
......@@ -865,7 +864,7 @@ _repo_quantities_search_request_parser.add_argument(
_repo_quantities_model = api.model('RepoQuantitiesResponse', {
'quantities': fields.Nested(api.model('RepoQuantities', {
quantity: fields.List(fields.Nested(_repo_quantity_model))
for quantity in search_extension.search_quantities
for quantity in search.search_quantities
}))
})
......
......@@ -21,6 +21,10 @@ import threading
from nomad import processing as proc, search, datamodel, infrastructure, utils, config
from nomad.cli.cli import cli
from nomad.datamodel.material import Material, Calculation
from nomad.datamodel.encyclopedia import EncyclopediaMetadata
from nomad.search import material_document
from nomad.datamodel.material import Material, Calculation, Method, Properties, IdealizedStructure, Energies, Workflow, Bulk
def __run_parallel(
......@@ -209,16 +213,367 @@ def index(threads, dry):
if dry:
for _ in elastic_updates():
pass
if threads > 1:
print(' use %d threads' % threads)
for _ in elasticsearch.helpers.parallel_bulk(
infrastructure.elastic_client, elastic_updates(), chunk_size=500,
thread_count=threads):
else:
if threads > 1:
print(' use %d threads' % threads)
for _ in elasticsearch.helpers.parallel_bulk(
infrastructure.elastic_client, elastic_updates(), chunk_size=500,
thread_count=threads):
pass
else:
elasticsearch.helpers.bulk(
infrastructure.elastic_client, elastic_updates())
search.refresh()
print('')
print('indexing completed')
@admin.command()
@click.option('--threads', type=int, default=1, help='Number of threads to use.')
@click.option('--code', multiple=True, type=str, help='Index only calculations of given codes.')
@click.option('--dry', is_flag=True, help='Do not index, just compute entries.')
@click.option('--in-place', is_flag=True, default=False, help='Perform indexing in the current elastic search index. Meant only for small reindex operations.')
@click.option('-n', type=int, default=None, help='Number of calculations to process. Leave undefined to process all calculations.')
@click.option('--source',
              type=click.Choice(['mongo', 'es'], case_sensitive=True))
def index_materials(threads, code, dry, in_place, n, source):
    """(Re-)index all materials.

    This command will completely rebuild the materials index. The index is
    built either from the material metainfo stored in MongoDB
    (``--source mongo``) or from the entries in the calculations
    Elasticsearch index (``--source es``). The materials index can be used
    normally during the reindexing.
    """
    chunk_size = 500
    infrastructure.setup_mongo()
    client = infrastructure.setup_elastic()

    # In order to do the reindexing with zero downtime, two concrete indices
    # ("<name>_a" / "<name>_b") are rotated behind a single alias. Resolve the
    # concrete index the alias currently points to and pick the other one as
    # the rebuild target (unless indexing in place).
    old_index_name = list(client.indices.get(config.elastic.materials_index_name).keys())[0]
    if in_place:
        target_index_name = old_index_name
    else:
        if old_index_name == config.elastic.materials_index_name + "_a":
            target_index_name = config.elastic.materials_index_name + "_b"
        elif old_index_name == config.elastic.materials_index_name + "_b":
            target_index_name = config.elastic.materials_index_name + "_a"
        else:
            raise ValueError(
                "Unrecognized index name associated with the alias {}"
                .format(config.elastic.materials_index_name)
            )

    # NOTE(review): if --source is omitted, neither branch below defines
    # elastic_updates() and the command fails with a NameError — consider
    # making the option required.
    if source == "mongo":
        all_calcs = proc.Calc.objects().count()
        print('indexing materials from %d calculations ...' % all_calcs)

        # Bulk update: yields one ES "update" action per calculation read
        # from the MongoDB archive collection.
        def elastic_updates():
            with utils.ETA(all_calcs, ' index %10d of %10d calcs, ETA %s') as eta:
                mongo_db = infrastructure.mongo_client[config.mongo.db_name]
                mongo_collection = mongo_db['archive']
                i_calc = 0
                for mongo_archive in mongo_collection.find():
                    i_calc += 1
                    if n is not None:
                        if i_calc > n:
                            return
                    eta.add()

                    # Do not process entries that do not have the material
                    # information (missing or unsuccessful encyclopedia step).
                    try:
                        status = mongo_archive["section_metadata"]["encyclopedia"]["status"]
                        if status != EncyclopediaMetadata.status.type.success:
                            raise AttributeError
                    except (KeyError, AttributeError, IndexError):
                        continue

                    # Create material information
                    metadata = mongo_archive["section_metadata"]
                    encyclopedia = EncyclopediaMetadata.m_from_dict(metadata["encyclopedia"])
                    dft = metadata["dft"]
                    material: Material = Material()
                    material.material_id = encyclopedia.material.material_id
                    material.material_type = encyclopedia.material.material_type
                    material.material_name = encyclopedia.material.material_name
                    material.material_classification = encyclopedia.material.material_classification
                    material.formula = encyclopedia.material.formula
                    material.formula_reduced = encyclopedia.material.formula_reduced
                    material.species_and_counts = encyclopedia.material.species_and_counts
                    material.species = encyclopedia.material.species
                    enc_bulk = encyclopedia.material.bulk
                    if enc_bulk:
                        bulk = Bulk.m_from_dict(enc_bulk.m_to_dict())
                        material.m_add_sub_section(Material.bulk, bulk)

                    # Create calculation info for this entry. Access metadata
                    # (published/embargo/owners) comes from the processing DB.
                    calc = Calculation()
                    calc.calc_id = metadata["calc_id"]
                    calc.upload_id = metadata["upload_id"]
                    mongo_calc = proc.Calc.get(calc.calc_id)
                    calc.published = mongo_calc["metadata"]["published"]
                    calc.with_embargo = mongo_calc["metadata"]["with_embargo"]
                    calc.owners = [mongo_calc["metadata"]["uploader"]] + mongo_calc["metadata"]["shared_with"]
                    enc_idealized_structure = encyclopedia.material.idealized_structure
                    idealized_structure = IdealizedStructure()
                    cell_volume = enc_idealized_structure.cell_volume
                    if cell_volume is not None:
                        idealized_structure.cell_volume = cell_volume
                    idealized_structure.lattice_parameters = enc_idealized_structure.lattice_parameters
                    calc.m_add_sub_section(Calculation.idealized_structure, idealized_structure)
                    enc_method = encyclopedia.method
                    method = Method.m_from_dict(enc_method.m_to_dict())
                    method.program_name = dft["code_name"]
                    method.program_version = dft["code_version"]
                    method.basis_set = dft["basis_set"]
                    calc.m_add_sub_section(Calculation.method, method)
                    enc_props = encyclopedia.properties

                    # Properties may not exist at all
                    if enc_props is not None:
                        properties = Properties()

                        # Energies may not be present in all calculations
                        try:
                            energies = Energies.m_from_dict(enc_props.energies.m_to_dict())
                            properties.m_add_sub_section(Properties.energies, energies)
                        except AttributeError:
                            pass

                        properties.has_electronic_dos = enc_props.electronic_dos is not None
                        properties.has_electronic_band_structure = enc_props.electronic_band_structure is not None
                        properties.has_thermodynamical_properties = enc_props.thermodynamical_properties is not None
                        atomic_density = enc_props.atomic_density
                        if atomic_density is not None:
                            properties.atomic_density = atomic_density
                        mass_density = enc_props.mass_density
                        if mass_density is not None:
                            properties.mass_density = mass_density
                        band_gap = enc_props.band_gap
                        if band_gap is not None:
                            properties.band_gap = band_gap
                        band_gap_direct = enc_props.band_gap_direct
                        if band_gap_direct is not None:
                            properties.band_gap_direct = band_gap_direct
                        calc.m_add_sub_section(Calculation.properties, properties)

                    workflow = Workflow()
                    workflow.workflow_type = encyclopedia.calculation.calculation_type
                    calc.m_add_sub_section(Calculation.workflow, workflow)
                    material.m_add_sub_section(Material.calculations, calc)

                    # Update entry that inserts the full material info if entry
                    # does not exists, otherwise only adds the calculation into the
                    # nested subdocument
                    entry = {}
                    entry['_op_type'] = 'update'
                    entry['_index'] = target_index_name
                    entry['_id'] = material.material_id
                    entry['_type'] = 'doc'
                    entry['_source'] = {
                        "upsert": material.m_to_dict(include_defaults=False, partial="es"),
                        "doc_as_upsert": False,
                        "script": {
                            "source": "ctx._source.calculations.add(params.calc)",
                            "params": {
                                "calc": calc.m_to_dict(include_defaults=False, partial="es")
                            },
                        }
                    }
                    yield entry
    elif source == "es":
        # Count matching entries first (size 0 query) so the ETA is accurate.
        s = elasticsearch_dsl.Search(index=config.elastic.index_name)
        filters = [elasticsearch_dsl.Q("term", encyclopedia__status="success")]
        if code:
            filters.append(elasticsearch_dsl.Q("terms", dft__code_name=code))
        query = elasticsearch_dsl.Q(
            "bool",
            filter=filters,
        )
        s = s.query(query)
        s = s.extra(**{
            "size": 0,
        })
        all_calcs = s.execute().hits.total
        print('indexing materials from %d calculations ...' % all_calcs)

        # Bulk update: yields one ES "update" action per hit scanned from the
        # calculations index.
        def elastic_updates():
            with utils.ETA(all_calcs, ' index %10d of %10d calcs, ETA %s', chunk_size) as eta:
                s = elasticsearch_dsl.Search(index=config.elastic.index_name)
                filters = [elasticsearch_dsl.Q("term", encyclopedia__status="success")]
                if code:
                    filters.append(elasticsearch_dsl.Q("terms", dft__code_name=code))
                query = elasticsearch_dsl.Q(
                    "bool",
                    filter=filters,
                )
                s = s.query(query)
                s = s.extra(**{
                    "size": chunk_size,
                })
                i_calc = 0
                for hit in s.scan():
                    i_calc += 1
                    if n is not None:
                        if i_calc > n:
                            return
                    eta.add()
                    material: Material = Material()
                    calc = Calculation()

                    # Check that all required information exists. If not, the
                    # calculation is skipped.
                    try:
                        material.material_id = hit.encyclopedia.material.material_id
                        material.material_type = hit.encyclopedia.material.material_type
                        material.formula = hit.encyclopedia.material.formula
                        material.formula_reduced = hit.encyclopedia.material.formula_reduced
                        material.species_and_counts = hit.encyclopedia.material.species_and_counts
                        material.species = hit.encyclopedia.material.species
                        calc.calc_id = hit.calc_id
                        calc.upload_id = hit.upload_id
                        calc.published = hit.published
                        calc.with_embargo = hit.with_embargo
                        calc.owners = [x.user_id for x in hit.owners]
                        idealized_structure = IdealizedStructure.m_from_dict(hit.encyclopedia.material.idealized_structure.to_dict())
                        calc.m_add_sub_section(Calculation.idealized_structure, idealized_structure)
                        method = Method.m_from_dict(hit.encyclopedia.method.to_dict())
                        method.program_name = hit.dft.code_name
                        method.program_version = hit.dft.code_version
                        method.basis_set = hit.dft.basis_set
                        calc.m_add_sub_section(Calculation.method, method)
                        workflow = Workflow()
                        workflow.workflow_type = hit.encyclopedia.calculation.calculation_type
                        calc.m_add_sub_section(Calculation.workflow, workflow)
                    except AttributeError:
                        continue

                    # Not all materials have a name
                    try:
                        material.material_name = hit.encyclopedia.material.material_name
                    except AttributeError:
                        pass

                    # Not all materials have a bulk section
                    try:
                        bulk = Bulk.m_from_dict(hit.encyclopedia.material.bulk)
                        material.m_add_sub_section(Material.bulk, bulk)
                    except AttributeError:
                        pass

                    # Properties may not exist at all
                    try:
                        enc_properties = hit.encyclopedia.properties
                    except AttributeError:
                        pass
                    else:
                        properties = Properties()

                        # Energies may not be present in all calculations
                        try:
                            energies = Energies.m_from_dict(enc_properties.energies.to_dict())
                            properties.m_add_sub_section(Properties.energies, energies)
                        except AttributeError:
                            pass

                        # Gather the boolean flags that indicate the presence of
                        # certain properties
                        try:
                            properties.has_electronic_dos = enc_properties.electronic_dos is not None
                        except AttributeError:
                            properties.has_electronic_dos = False
                        try:
                            properties.has_electronic_band_structure = enc_properties.electronic_band_structure is not None
                        except AttributeError:
                            properties.has_electronic_band_structure = False
                        try:
                            properties.has_thermodynamical_properties = enc_properties.thermodynamical_properties is not None
                        except AttributeError:
                            properties.has_thermodynamical_properties = False

                        # Not all materials have an atomic density
                        try:
                            properties.atomic_density = enc_properties.atomic_density
                        except AttributeError:
                            pass

                        # Not all materials have a mass density
                        try:
                            properties.mass_density = enc_properties.mass_density
                        except AttributeError:
                            pass

                        # Not all materials have band gaps
                        try:
                            properties.band_gap = enc_properties.band_gap
                        except AttributeError:
                            pass

                        # Not all materials have band gap type
                        try:
                            properties.band_gap_direct = enc_properties.band_gap_direct
                        except AttributeError:
                            pass
                        calc.m_add_sub_section(Calculation.properties, properties)

                    material.m_add_sub_section(Material.calculations, calc)

                    # Update entry that inserts the full material info if entry
                    # does not exists, otherwise only adds the calculation into
                    # the nested subdocument
                    entry = {}
                    entry['_op_type'] = 'update'
                    entry['_index'] = target_index_name
                    entry['_id'] = material.material_id
                    entry['_type'] = 'doc'
                    entry['_source'] = {
                        "upsert": material.m_to_dict(include_defaults=False, partial="es"),
                        "doc_as_upsert": False,
                        "script": {
                            "params": {
                                "calc": calc.m_to_dict(include_defaults=False, partial="es")
                            },
                        }
                    }
                    # When reindexing in place the same calculation may already
                    # be stored in the nested list: remove it before re-adding.
                    if in_place:
                        entry['_source']["script"]["source"] = "ctx._source.calculations.removeIf(x -> x.calc_id == params.calc.calc_id); ctx._source.calculations.add(params.calc)"
                    else:
                        entry['_source']["script"]["source"] = "ctx._source.calculations.add(params.calc)"
                    yield entry

    if dry:
        for _ in elastic_updates():
            pass
    else:
        # Create new index into which the data will be inserted. The old index will
        # keep working while the new index is being built
        material_document.init(index=target_index_name)

        if threads > 1:
            print(' use %d threads' % threads)
            for _ in elasticsearch.helpers.parallel_bulk(
                    infrastructure.elastic_client, elastic_updates(), chunk_size=chunk_size,
                    thread_count=threads):
                pass
        else:
            elasticsearch.helpers.bulk(
                infrastructure.elastic_client, elastic_updates())
        search.refresh()

        # Changes materials index alias to point to the new index and remove the
        # old index.
        if not in_place:
            new_index = elasticsearch_dsl.Index(target_index_name)
            new_index.put_alias(name=config.elastic.materials_index_name)
            old_index = elasticsearch_dsl.Index(old_index_name)
            old_index.delete()

    print('')
    print('indexing completed')
......
......@@ -116,7 +116,8 @@ fs = NomadConfig(
elastic = NomadConfig(
host='localhost',
port=9200,
index_name='nomad_fairdi_calcs'
index_name='nomad_fairdi_calcs',
materials_index_name='nomad_fairdi_materials'
)
keycloak = NomadConfig(
......
......@@ -536,7 +536,7 @@ class EntryMetadata(metainfo.MSection):
ems = metainfo.SubSection(sub_section=EMSMetadata, a_search='ems')
dft = metainfo.SubSection(sub_section=DFTMetadata, a_search='dft', categories=[fast_access])
qcms = metainfo.SubSection(sub_section=QCMSMetadata, a_search='qcms')
encyclopedia = metainfo.SubSection(sub_section=EncyclopediaMetadata, a_search='encyclopedia')
encyclopedia = metainfo.SubSection(sub_section=EncyclopediaMetadata, categories=[fast_access], a_search='encyclopedia')
def apply_user_metadata(self, metadata: dict):
''' Applies a user provided metadata dict to this calc. '''
......
......@@ -64,7 +64,7 @@ class WyckoffSet(MSection):
Chemical element at this Wyckoff position.
"""
)
variables = SubSection(sub_section=WyckoffVariables.m_def, repeats=False)
variables = SubSection(sub_section=WyckoffVariables.m_def, repeats=False, categories=[fast_access])
class LatticeParameters(MSection):
......@@ -190,8 +190,8 @@ class IdealizedStructure(MSection):
""",
a_search=Search()
)
wyckoff_sets = SubSection(sub_section=WyckoffSet.m_def, repeats=True)
lattice_parameters = SubSection(sub_section=LatticeParameters.m_def)
wyckoff_sets = SubSection(sub_section=WyckoffSet.m_def, repeats=True, categories=[fast_access])
lattice_parameters = SubSection(sub_section=LatticeParameters.m_def, categories=[fast_access])
class Bulk(MSection):
......@@ -370,10 +370,10 @@ class Material(MSection):
)
# Bulk-specific properties
bulk = SubSection(sub_section=Bulk.m_def, repeats=False)
bulk = SubSection(sub_section=Bulk.m_def, repeats=False, categories=[fast_access])
# The idealized structure for this material
idealized_structure = SubSection(sub_section=IdealizedStructure.m_def, repeats=False)
idealized_structure = SubSection(sub_section=IdealizedStructure.m_def, repeats=False, categories=[fast_access])
class Method(MSection):
......@@ -575,11 +575,10 @@ class Properties(MSection):
""",
a_search=Search()
)
energies = SubSection(sub_section=Energies.m_def, repeats=False, a_search='energies')
energies = SubSection(sub_section=Energies.m_def, repeats=False, categories=[fast_access], a_search='energies')
electronic_band_structure = Quantity(
type=Reference(section_k_band.m_def),
shape=[],
categories=[fast_access],
description="""
Reference to an electronic band structure.
""",
......@@ -588,7 +587,6 @@ class Properties(MSection):
electronic_dos = Quantity(
type=Reference(section_dos.m_def),
shape=[],
categories=[fast_access],