Commit b18de659 authored by Markus Scheidgen

Added more logs and configurability to es indexing. #654

parent e3f99fa1
Pipeline #115559 passed with stages in 35 minutes and 59 seconds
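
In short: the diff below introduces two new Elasticsearch settings (bulk_size and entries_per_material_cap), groups process-related settings into a nested process config, and lets utils.timer log failures as errors. A minimal sketch of how the new settings are read, based on the NomadConfig blocks in the hunks below (the helper function itself is hypothetical):

# Sketch of the new configuration knobs introduced by this commit. Attribute
# names and defaults are taken from the NomadConfig blocks in the diff below;
# describe_indexing_settings is illustrative only.
from nomad import config

def describe_indexing_settings():
    # Entry lists are split into chunks of at most bulk_size before bulk indexing.
    chunk_size = config.elastic.bulk_size                   # default: 1000
    # Material documents keep at most this many entries with full metadata.
    cap = config.elastic.entries_per_material_cap           # default: 1000
    # Process settings moved from flat module attributes (the removed
    # config.process_reuse_parser etc.) into the nested config.process group.
    return dict(
        bulk_size=chunk_size,
        entries_per_material_cap=cap,
        reuse_parser=config.process.reuse_parser,
        index_materials=config.process.index_materials)
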
......@@ -137,6 +137,8 @@ elastic = NomadConfig(
port=9200,
timeout=60,
bulk_timeout='10m',
bulk_size=1000,
entries_per_material_cap=1000,
entries_index='nomad_entries_v1',
materials_index='nomad_materials_v1',
)
......@@ -336,6 +338,13 @@ reprocess = NomadConfig(
index_invidiual_entries=False
)
process = NomadConfig(
index_materials=True,
reuse_parser=True,
metadata_file_name='nomad',
metadata_file_extensions=('json', 'yaml', 'yml')
)
bundle_import = NomadConfig(
# Basic settings
allow_bundles_from_oasis=True, # If oasis admins can "push" bundles to this NOMAD deployment
......@@ -374,13 +383,9 @@ console_log_level = logging.WARNING
max_upload_size = 32 * (1024 ** 3)
raw_file_strip_cutoff = 1000
max_entry_download = 500000
use_empty_parsers = False
process_reuse_parser = True
metadata_file_name = 'nomad'
metadata_file_extensions = ('json', 'yaml', 'yml')
enable_lazy_import = True
encyclopedia_enabled = True
aitoolkit_enabled = False
use_empty_parsers = False
def normalize_loglevel(value, default_level=logging.INFO):
......
......@@ -308,6 +308,9 @@ class DocumentType():
'''
mappings: Dict[str, Any] = {}
if self == material_type and prefix is None:
mappings['n_entries'] = {'type': 'integer'}
for quantity_def in section_def.all_quantities.values():
elasticsearch_annotations = quantity_def.m_get_annotations(Elasticsearch, as_list=True)
for elasticsearch_annotation in elasticsearch_annotations:
......@@ -871,17 +874,14 @@ def index_entries_with_materials(entries: List, refresh: bool = False):
update_materials(entries, refresh=refresh)
_max_entries_index_size = 1000
def index_entries(entries: List, refresh: bool = False):
'''
Upserts the given entries in the entry index. Optionally updates the materials index
as well.
'''
# split into reasonably sized problems
if len(entries) > _max_entries_index_size:
for entries_part in [entries[i:i + _max_entries_index_size] for i in range(0, len(entries), _max_entries_index_size)]:
if len(entries) > config.elastic.bulk_size:
for entries_part in [entries[i:i + config.elastic.bulk_size] for i in range(0, len(entries), config.elastic.bulk_size)]:
index_entries(entries_part, refresh=refresh)
return
......@@ -901,17 +901,22 @@ def index_entries(entries: List, refresh: bool = False):
try:
import json
timer_kwargs['size'] = len(json.dumps(actions_and_docs))
timer_kwargs['n_actions'] = len(actions_and_docs)
except Exception:
pass
with utils.timer(logger, 'perform bulk index of entries', **timer_kwargs):
with utils.timer(
logger, 'perform bulk index of entries',
lnr_event='failed to bulk index entries',
**timer_kwargs
):
entry_index.bulk(body=actions_and_docs, refresh=refresh, timeout=config.elastic.bulk_timeout)
def update_materials(entries: List, refresh: bool = False):
# split into reasonably sized problems
if len(entries) > _max_entries_index_size:
for entries_part in [entries[i:i + _max_entries_index_size] for i in range(0, len(entries), _max_entries_index_size)]:
if len(entries) > config.elastic.bulk_size:
for entries_part in [entries[i:i + config.elastic.bulk_size] for i in range(0, len(entries), config.elastic.bulk_size)]:
update_materials(entries_part, refresh=refresh)
return
......@@ -938,9 +943,11 @@ def update_materials(entries: List, refresh: bool = False):
if material_id is not None:
material_ids.add(material_id)
logger = logger.bind(n_materials=len(material_ids))
# Get existing materials for entries' material ids (i.e. the entry needs to be added
# or updated).
with utils.timer(logger, 'get existing materials'):
with utils.timer(logger, 'get existing materials', lnr_event='failed to get existing materials'):
if material_ids:
elasticsearch_results = material_index.mget(body={
'docs': [dict(_id=material_id) for material_id in material_ids]
......@@ -953,7 +960,7 @@ def update_materials(entries: List, refresh: bool = False):
# Get old materials that still have one of the entries, but the material id has changed
# (i.e. the materials where entries need to be removed due to entries having different
# materials now).
with utils.timer(logger, 'get old materials'):
with utils.timer(logger, 'get old materials', lnr_event='failed to get old materials'):
elasticsearch_results = material_index.search(body={
'size': len(entry_ids),
'query': {
......@@ -987,6 +994,7 @@ def update_materials(entries: List, refresh: bool = False):
# case where an entry's material id changed within the set of other entries' material ids)
# This has O(n + m) complexity with n=number of materials and m=number of entries
actions_and_docs = []
material_docs = []
material_docs_dict = {}
remaining_entry_ids = set(entry_ids)
for material_doc in existing_material_docs:
......@@ -1020,6 +1028,7 @@ def update_materials(entries: List, refresh: bool = False):
actions_and_docs.append(dict(index=dict(_id=material_id)))
actions_and_docs.append(material_doc)
material_docs.append(material_doc)
for entry_id in remaining_entry_ids:
entry = entries_dict.get(entry_id)
......@@ -1032,6 +1041,7 @@ def update_materials(entries: List, refresh: bool = False):
material_docs_dict[material_id] = material_doc
actions_and_docs.append(dict(create=dict(_id=material_id)))
actions_and_docs.append(material_doc)
material_docs.append(material_doc)
# The material does exist (now), but the entry is new.
material_doc.setdefault('entries', []).append(material_entry_type.create_index_doc(entry))
......@@ -1057,17 +1067,38 @@ def update_materials(entries: List, refresh: bool = False):
# The material needs to be updated
actions_and_docs.append(dict(index=dict(_id=material_id)))
actions_and_docs.append(material_doc)
material_docs.append(material_doc)
# Third, we potentially cap the number of entries stored in a material. Only a certain
# amount of entries is stored with full metadata; the total number of entries is still
# recorded in n_entries.
all_n_entries_capped = 0
all_n_entries = 0
for material_doc in material_docs:
material_entries = material_doc.get('entries', [])
material_doc['n_entries'] = len(material_entries)
if len(material_entries) > config.elastic.entries_per_material_cap:
material_doc['entries'] = material_entries[0:config.elastic.entries_per_material_cap]
all_n_entries_capped += len(material_doc['entries'])
all_n_entries += material_doc['n_entries']
# Execute the created actions in bulk.
timer_kwargs = {}
try:
import json
timer_kwargs['size'] = len(json.dumps(actions_and_docs))
timer_kwargs['n_actions'] = len(actions_and_docs)
timer_kwargs['n_entries'] = all_n_entries
timer_kwargs['n_entries_capped'] = all_n_entries_capped
except Exception:
pass
with utils.timer(logger, 'perform bulk index of materials', **timer_kwargs):
with utils.timer(
logger, 'perform bulk index of materials',
lnr_event='failed to bulk index materials',
**timer_kwargs
):
if len(actions_and_docs) > 0:
material_index.bulk(body=actions_and_docs, refresh=False, timeout=config.elastic.bulk_timeout)
......
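
To make the new capping step concrete: a material document keeps at most entries_per_material_cap entry documents, while n_entries records the true total. A small, self-contained sketch of that behaviour (plain Python, not NOMAD code; cap_material_doc is a hypothetical helper):

def cap_material_doc(material_doc: dict, cap: int) -> dict:
    # Record the total number of entries, then keep only the first `cap` of them.
    entries = material_doc.get('entries', [])
    material_doc['n_entries'] = len(entries)       # total, also indexed as an integer field
    if len(entries) > cap:
        material_doc['entries'] = entries[:cap]    # only the first `cap` entries keep metadata
    return material_doc

doc = {'material_id': 'mat-1', 'entries': [{'entry_id': f'entry-{i}'} for i in range(5)]}
capped = cap_material_doc(doc, cap=2)
assert capped['n_entries'] == 5
assert len(capped['entries']) == 2
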
......@@ -250,7 +250,7 @@ class Calc(Proc):
# The metadata file name is defined in nomad.config (nomad_metadata.yaml/json). The file
# can be placed in the directory containing the mainfile or further up the directory
# tree; the directory with the mainfile has the highest priority.
metadata_file = config.metadata_file_name
metadata_file = config.process.metadata_file_name
metadata_dir = os.path.dirname(self.mainfile_file.os_path)
upload_raw_dir = self.upload_files._raw_dir.os_path
......@@ -587,7 +587,7 @@ class Calc(Proc):
self._entry_metadata.parser_name = self.parser_name
with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
if not config.process_reuse_parser:
if not config.process.reuse_parser:
if isinstance(parser, parsing.FairdiParser):
try:
parser = parser.__class__()
......@@ -849,7 +849,7 @@ class Upload(Proc):
@lru_cache()
def metadata_file_cached(self, path):
for ext in config.metadata_file_extensions:
for ext in config.process.metadata_file_extensions:
full_path = '%s.%s' % (path, ext)
if os.path.isfile(full_path):
try:
......@@ -1185,7 +1185,7 @@ class Upload(Proc):
staging_upload_files = self.staging_upload_files
metadata = self.metadata_file_cached(
os.path.join(self.upload_files.os_path, 'raw', config.metadata_file_name))
os.path.join(self.upload_files.os_path, 'raw', config.process.metadata_file_name))
skip_matching = metadata.get('skip_matching', False)
entries_metadata = metadata.get('entries', {})
......@@ -1364,7 +1364,9 @@ class Upload(Proc):
with self.entries_metadata() as entries:
with utils.timer(logger, 'upload entries and materials indexed'):
archives = [entry.m_parent for entry in entries]
search.index(archives, update_materials=True, refresh=True)
search.index(
archives, update_materials=config.process.index_materials,
refresh=True)
# send email about process finish
if not self.publish_directly:
......@@ -1526,7 +1528,9 @@ class Upload(Proc):
# Update entries and elastic search
with self.entries_metadata() as entries_metadata:
with utils.timer(logger, 'index updated'):
search.update_metadata(entries_metadata, update_materials=True, refresh=True)
search.update_metadata(
entries_metadata, update_materials=config.process.index_materials,
refresh=True)
def entry_ids(self) -> List[str]:
return [calc.calc_id for calc in Calc.objects(upload_id=self.upload_id)]
......@@ -1824,7 +1828,9 @@ class Upload(Proc):
# Index in elastic search
if entry_data_to_index:
search.index(entry_data_to_index, update_materials=True, refresh=True)
search.index(
entry_data_to_index, update_materials=config.process.index_materials,
refresh=True)
if settings.trigger_processing:
reprocess_settings = {
......
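
The processing changes above are mostly mechanical: flat settings such as config.process_reuse_parser become config.process.reuse_parser, and every indexing call now honours config.process.index_materials. A sketch of the resulting call pattern (search.index and its keyword arguments appear in the diff; the wrapper function is hypothetical):

# Hypothetical wrapper showing the new call pattern used throughout the
# processing code: whether materials are (re)indexed is now configurable.
from nomad import config, search

def index_with_configured_materials(archives):
    search.index(
        archives,
        update_materials=config.process.index_materials,
        refresh=True)
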
......@@ -208,13 +208,13 @@ def lnr(logger, event, **kwargs):
except Exception as e:
# ignore HTTPException as they are part of the normal app error handling
if e.__class__.__name__ == 'HTTPException':
if e.__class__.__name__ != 'HTTPException':
logger.error(event, exc_info=e, **kwargs)
raise e
@contextmanager
def timer(logger, event, method='info', **kwargs):
def timer(logger, event, method='info', lnr_event: str = None, **kwargs):
'''
A context manager that measures execution time and produces a log entry with said time.
......@@ -232,6 +232,11 @@ def timer(logger, event, method='info', **kwargs):
try:
yield kwargs
except Exception as e:
if lnr_event is not None:
stop = time.time()
logger.error(lnr_event, exc_info=e, exec_time=stop - start, **kwargs)
raise e
finally:
stop = time.time()
......
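
The utils.timer extension means that a failing block is logged as an error, including the elapsed time, whenever an lnr_event is supplied, in addition to the usual timing entry. A minimal standalone sketch with this behaviour, assuming a structlog-style logger (info/error methods accepting keyword arguments); the real implementation in nomad.utils is only partially shown in the hunk above:

import time
from contextlib import contextmanager

@contextmanager
def timer(logger, event, method='info', lnr_event=None, **kwargs):
    # Measure the execution time of the wrapped block and log it under `event`.
    # If the block raises and `lnr_event` is given, additionally log an error
    # entry with the exception and the elapsed time before re-raising.
    start = time.time()
    try:
        yield kwargs
    except Exception as e:
        if lnr_event is not None:
            logger.error(lnr_event, exc_info=e, exec_time=time.time() - start, **kwargs)
        raise
    finally:
        getattr(logger, method)(event, exec_time=time.time() - start, **kwargs)
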
......@@ -18,9 +18,11 @@ data:
{{ if .Values.meta.deployment }}
deployment: "{{ .Values.meta.deployment }}"
{{ end }}
process_reuse_parser: {{ .Values.processReuseParser }}
encyclopedia_enabled: {{ .Values.encyclopedia.enabled }}
aitoolkit_enabled: {{ .Values.aitoolkit.enabled }}
process:
reuse_parser: {{ .Values.process.reuseParser }}
index_materials: {{ .Values.process.indexMaterials }}
reprocess:
rematch_published: {{ .Values.reprocess.rematchPublished }}
reprocess_existing_entries: {{ .Values.reprocess.reprocessExistingEntries }}
......@@ -34,7 +36,6 @@ data:
isTest: {{ .Values.version.isTest }}
usesBetaData: {{ .Values.version.usesBetaData }}
officialUrl: "{{ .Values.version.officialUrl }}"
process_reuse_parser: {{ .Values.processReuseParser }}
fs:
tmp: ".volumes/fs/staging/tmp"
prefix_size: {{ .Values.volumes.prefixSize }}
......@@ -60,7 +61,9 @@ data:
host: "{{ .Values.elastic.host }}"
port: {{ .Values.elastic.port }}
timeout: {{ .Values.elastic.timeout }}
bulk_timeout: "{{ .Values.elastic.bulkTimeout }}""
bulk_timeout: "{{ .Values.elastic.bulkTimeout }}"
bulk_size: {{ .Values.elastic.bulkSize }}
entries_per_material_cap: {{ .Values.elastic.entriesPerMaterialCap }}
entries_index: "{{ .Values.dbname }}_entries_v1"
materials_index: "{{ .Values.dbname }}_materials_v1"
mongo:
......
......@@ -129,6 +129,8 @@ elastic:
port: 9200
timeout: 60
bulkTimeout: '10m'
bulkSize: 1000
entriesPerMaterialCap: 1000
logstash:
enabled: true
......@@ -175,7 +177,9 @@ reprocess:
deleteUnmatchedPublishedEntries: false
indexIndividualEntries: false
processReuseParser: true
process:
reuseParser: true
indexMaterials: true
datacite:
enabled: false
......
......@@ -344,6 +344,25 @@ def test_index_entry(elastic, indices, example_entry):
assert_entry_indexed(example_entry)
def create_entry(spec: str, material_kwargs: dict = None):
entry_id, material_id = spec.split('-')
changed = material_id.endswith('*')
if changed:
material_id = material_id[:-1]
entry = Entry(entry_id=entry_id)
entry.m_create(Results).m_create(
Material, material_id=material_id,
springer_labels=['A', 'B'] if changed else ['A'])
return entry
def create_entries(spec: str):
return [
create_entry(entry_spec.strip())
for entry_spec in spec.split(',')
if entry_spec.strip() != '']
# The parameters are before, to_index, after. Before and after describe what is in the index
# before and after the test; to_index describes what is to be indexed in the test.
# All three are comma-separated strings, where each value is a dash-separated pair of numbers. The
......@@ -358,24 +377,23 @@ def test_index_entry(elastic, indices, example_entry):
pytest.param('1-1', '1-1*', '1-1*', id='update-material-property')
])
def test_index_entries(elastic, indices, before, to_index, after):
def create_entry(spec: str, material_kwargs: dict = None):
entry_id, material_id = spec.split('-')
changed = material_id.endswith('*')
if changed:
material_id = material_id[:-1]
entry = Entry(entry_id=entry_id)
entry.m_create(Results).m_create(
Material, material_id=material_id,
springer_labels=['A', 'B'] if changed else ['A'])
return entry
def create_entries(spec: str):
return [
create_entry(entry_spec.strip())
for entry_spec in spec.split(',')
if entry_spec.strip() != '']
index_entries_with_materials(create_entries(before), refresh=True)
index_entries_with_materials(create_entries(to_index), refresh=True)
assert_entries_indexed(create_entries(after))
@pytest.mark.parametrize('cap, entries', [
pytest.param(2, 1, id='below-cap'),
pytest.param(2, 3, id='above-cap')
])
def test_index_materials_capped(elastic, indices, monkeypatch, cap, entries):
monkeypatch.setattr('nomad.config.elastic.entries_per_material_cap', cap)
index_entries_with_materials(create_entries(','.join([f'{i}-1' for i in range(1, entries + 1)])), refresh=True)
material_docs = [
hit['_source'] for hit in
material_index.search(body=dict(query=dict(match_all={})))['hits']['hits']]
for material_doc in material_docs:
assert len(material_doc['entries']) <= cap
assert material_doc['n_entries'] == entries
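
For readers of the parametrisation: each spec string encodes entries as <entry_id>-<material_id> pairs, and a trailing * marks an entry whose material carries a changed property. A short usage sketch of the helpers defined above (it relies on the test module's Entry, Results and Material imports):

# Illustration of the spec format handled by create_entry/create_entries above.
entries = create_entries('1-1, 2-1, 3-2*')
# Three entries: '1' and '2' share material '1' with springer_labels ['A'];
# '3' belongs to material '2' and gets the changed labels ['A', 'B'].
assert [entry.entry_id for entry in entries] == ['1', '2', '3']
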