Commit 8789e7d7 authored by Markus Scheidgen

Merge branch 'v1-process' into 'v1.0.0'

Re-processing related fixes

See merge request !456
parents f782e11e b18de659
@@ -1126,7 +1126,7 @@ async def post_upload_action_publish(
async def post_upload_action_process(
upload_id: str = Path(
...,
description='The unique id of the upload to re-process.'),
description='The unique id of the upload to process.'),
user: User = Depends(create_user_dependency(required=True))):
'''
Processes an upload, i.e. parses the files and updates the NOMAD archive. Only admins
@@ -16,5 +16,4 @@
# limitations under the License.
#
# from . import admin, uploads, entries, run, clean, users # noqa
from . import admin, run, clean, users, entries, uploads
@@ -111,9 +111,9 @@ def clean(dry, skip_calcs, skip_fs, skip_es, staging_too, force):
es_upload_buckets = quantity_values('upload_id', owner='all', return_buckets=True)
to_delete = list(
(bucket['value'], bucket['count'])
(bucket.value, bucket.count)
for bucket in es_upload_buckets
if processing.Upload.objects(upload_id=bucket['value']).first() is None)
if processing.Upload.objects(upload_id=bucket.value).first() is None)
calcs = 0
for _, upload_calcs in to_delete:
@@ -26,6 +26,7 @@ from pymongo.cursor import Cursor
from nomad.processing import ProcessStatus, Upload, Calc
from nomad.processing.data import generate_entry_id
from nomad.datamodel import Dataset
from nomad.parsing.parsers import parser_dict
_metadata_keys_to_flatten_v0 = (
@@ -289,6 +290,10 @@ def _convert_mongo_entry(entry_dict: Dict[str, Any], common_coauthors: Set, fix_
for field in ('_id', 'upload_id', 'entry_create_time', 'parser_name'):
assert entry_dict.get(field) is not None, f'Missing required entry field: {field}'
# Check if the parser exists
parser_name = entry_dict.get('parser_name')
assert parser_name in parser_dict, f'Parser does not exist: {parser_name}'
def _convert_mongo_proc(proc_dict: Dict[str, Any]):
if 'tasks_status' in proc_dict:
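The migration hunk above now imports `parser_dict` and asserts that every entry's `parser_name` still refers to a known parser before the entry is converted. A minimal sketch of the same guard as a reusable check; only the fact that `parser_dict` maps parser names to parser objects is taken from the diff, and `validate_parser_name` is a hypothetical helper, not part of the commit:

    # Hypothetical helper mirroring the assertion added above; parser_dict maps
    # parser names to parser instances, so membership means the parser exists.
    from typing import Any, Dict

    from nomad.parsing.parsers import parser_dict


    def validate_parser_name(entry_dict: Dict[str, Any]) -> str:
        parser_name = entry_dict.get('parser_name')
        if parser_name not in parser_dict:
            raise ValueError(f'Parser does not exist: {parser_name}')
        return parser_name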
@@ -83,7 +83,7 @@ def _run_parallel(uploads, parallel: int, callable, label: str):
def _run_processing(
uploads, parallel: int, process, label: str, reprocess_running: bool = False,
uploads, parallel: int, process, label: str, process_running: bool = False,
wait_until_complete: bool = True, reset_first: bool = False):
from nomad import processing as proc
@@ -93,7 +93,7 @@ def _run_processing(
'cli calls %s processing' % label,
current_process=upload.current_process,
last_status_message=upload.last_status_message, upload_id=upload.upload_id)
if upload.process_running and not reprocess_running:
if upload.process_running and not process_running:
logger.warn(
'cannot trigger %s, since the upload is already/still processing' % label,
current_process=upload.current_process,
@@ -121,12 +121,12 @@ def _run_processing(
@admin.group(help='Upload related commands')
@click.option('--user', help='Select uploads of user with given id', type=str)
@click.option('--uploads-mongo-query', type=str, help='A query')
@click.option('--entries-mongo-query', type=str, help='A query')
@click.option('--entries-es-query', type=str, help='A query')
@click.option('--unpublished', help='Select only uploads in staging', is_flag=True)
@click.option('--published', help='Select only uploads that are published', is_flag=True)
@click.option('--outdated', help='Select published uploads with older nomad version', is_flag=True)
@click.option('--program-name', multiple=True, type=str, help='Select only uploads with calcs of given codes')
@click.option('--query-mongo', is_flag=True, help='Select query mongo instead of elastic search.')
@click.option('--processing', help='Select only processing uploads', is_flag=True)
@click.option('--processing-failure-uploads', is_flag=True, help='Select uploads with failed processing')
@click.option('--processing-failure-calcs', is_flag=True, help='Select uploads with calcs with failed processing')
@@ -143,98 +143,104 @@ def uploads(ctx, **kwargs):
def _query_uploads(
uploads,
user: str, unpublished: bool, published: bool, processing: bool, outdated: bool,
program_name: typing.List[str], query_mongo: bool,
unpublished: bool, published: bool, processing: bool, outdated: bool,
uploads_mongo_query: str, entries_mongo_query: str, entries_es_query: str,
processing_failure_uploads: bool, processing_failure_calcs: bool,
processing_failure: bool, processing_incomplete_uploads: bool,
processing_incomplete_calcs: bool, processing_incomplete: bool,
processing_necessary: bool, unindexed: bool):
import mongoengine
'''
Produces a list of uploads (mongoengine proc.Upload objects) based on a given
list of upload ids and further filter parameters.
'''
from typing import Set, cast
import json
import elasticsearch_dsl as es
from mongoengine import Q
from nomad import infrastructure, processing as proc
from nomad import infrastructure, processing as proc, search
from nomad.app.v1 import models
mongo_client = infrastructure.setup_mongo()
infrastructure.setup_mongo()
infrastructure.setup_elastic()
query = mongoengine.Q()
calc_query = None
if user is not None:
query |= mongoengine.Q(user_id=user)
if unpublished:
query |= mongoengine.Q(published=False)
if published:
query |= mongoengine.Q(published=True)
if processing:
query |= mongoengine.Q(process_status__in=proc.ProcessStatus.STATUSES_PROCESSING)
if uploads is not None and len(uploads) == 0:
uploads = None # None meaning all uploads
else:
uploads = set(uploads)
entries_mongo_query_q = Q()
if entries_mongo_query:
entries_mongo_query_q = Q(**json.loads(entries_mongo_query))
entries_query_uploads: Set[str] = None
if entries_es_query is not None:
entries_es_query_dict = json.loads(entries_es_query)
results = search.search(
owner='admin',
query=entries_es_query_dict,
pagination=models.MetadataPagination(page_size=0),
user_id=config.services.admin_user_id,
aggregations={
'uploads': models.Aggregation(
terms=models.TermsAggregation(
quantity='upload_id',
pagination=models.AggregationPagination(
page_size=10000
)
)
)
})
entries_query_uploads = set([
cast(str, bucket.value)
for bucket in results.aggregations['uploads'].terms.data]) # pylint: disable=no-member
if outdated:
uploads = proc.Calc._get_collection().distinct(
'upload_id',
{'nomad_version': {'$ne': config.meta.version}})
query |= mongoengine.Q(upload_id__in=uploads)
entries_mongo_query_q &= Q(nomad_version={'$ne': config.meta.version})
if program_name is not None and len(program_name) > 0:
code_queries = [es.Q('match', **{'results.method.simulation.program_name': name}) for name in program_name]
code_query = es.Q('bool', should=code_queries, minimum_should_match=1)
if processing_failure_calcs or processing_failure or processing_necessary:
entries_mongo_query_q &= Q(process_status=proc.ProcessStatus.FAILURE)
code_search = es.Search(index=config.elastic.entries_index)
code_search = code_search.query(code_query)
code_search.aggs.bucket('uploads', es.A(
'terms', field='upload_id', size=10000, min_doc_count=1))
uploads = [
upload['key']
for upload in code_search.execute().aggs['uploads']['buckets']]
if processing_incomplete_calcs or processing_incomplete or processing_necessary:
entries_mongo_query_q &= Q(process_status__in=proc.ProcessStatus.STATUSES_PROCESSING)
query |= mongoengine.Q(upload_id__in=uploads)
mongo_entry_based_uploads = set(proc.Calc.objects(entries_mongo_query_q).distinct(field="upload_id"))
if entries_query_uploads is not None:
entries_query_uploads = entries_query_uploads.intersection(mongo_entry_based_uploads)
else:
entries_query_uploads = mongo_entry_based_uploads
if processing_failure_calcs or processing_failure or processing_necessary:
if calc_query is None:
calc_query = mongoengine.Q()
calc_query |= mongoengine.Q(process_status=proc.ProcessStatus.FAILURE)
if processing_failure_uploads or processing_failure or processing_necessary:
query |= mongoengine.Q(process_status=proc.ProcessStatus.FAILURE)
if processing_incomplete_calcs or processing_incomplete or processing_necessary:
if calc_query is None:
calc_query = mongoengine.Q()
calc_query |= mongoengine.Q(process_status__in=proc.ProcessStatus.STATUSES_PROCESSING)
if processing_incomplete_uploads or processing_incomplete or processing_necessary:
query |= mongoengine.Q(process_status__in=proc.ProcessStatus.STATUSES_PROCESSING)
if entries_query_uploads:
uploads_mongo_query_q = Q(upload_id__in=list(entries_query_uploads))
else:
uploads_mongo_query_q = Q()
if unindexed:
from nomad.search import quantity_values
uploads_in_es = set(quantity_values('upload_id', page_size=1000, owner='all'))
if uploads_mongo_query:
uploads_mongo_query_q &= Q(**json.loads(uploads_mongo_query))
if published:
uploads_mongo_query_q &= Q(publish_time__exists=True)
uploads_in_mongo = mongo_client[config.mongo.db_name]['calc'].distinct('upload_id')
if unpublished:
uploads_mongo_query_q &= Q(publish_time__exists=False)
uploads_not_in_es = []
for upload_id in uploads_in_mongo:
if upload_id not in uploads_in_es:
uploads_not_in_es.append(upload_id)
if processing:
uploads_mongo_query_q &= Q(process_status__in=proc.ProcessStatus.STATUSES_PROCESSING)
query |= mongoengine.Q(
upload_id__in=uploads_not_in_es)
if processing_failure_uploads or processing_failure or processing_necessary:
uploads_mongo_query_q &= Q(process_status=proc.ProcessStatus.FAILURE)
try:
json_query = json.loads(' '.join(uploads))
if query_mongo:
uploads = proc.Calc.objects(**json_query).distinct(field="upload_id")
else:
from nomad.search import quantity_values
uploads = list(quantity_values(
'upload_id', query=es.Q(json_query), page_size=1000, owner='all'))
except Exception:
pass
if processing_incomplete_uploads or processing_incomplete or processing_necessary:
uploads_mongo_query_q &= Q(process_status__in=proc.ProcessStatus.STATUSES_PROCESSING)
if calc_query is not None:
query |= mongoengine.Q(
upload_id__in=proc.Calc.objects(calc_query).distinct(field="upload_id"))
if len(uploads) > 0:
query |= mongoengine.Q(upload_id__in=uploads)
final_query = uploads_mongo_query_q
if uploads is not None:
final_query &= Q(upload_id__in=list(uploads))
return query, proc.Upload.objects(query)
return final_query, proc.Upload.objects(final_query)
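With this rework, the entry-level selections (`--entries-mongo-query` and `--entries-es-query`) are first resolved to a set of upload ids, intersected with the upload ids found via the Mongo entry query, and only then AND-ed into the upload-level mongoengine query. A condensed sketch of that composition, using only mongoengine `Q` objects as the function above does; the helper name and argument names are illustrative, not from the commit:

    # Illustrative condensation of the query composition above: entry-level
    # filters become a set of upload ids that is AND-ed into the upload query.
    import json
    from typing import Optional, Set

    from mongoengine import Q


    def compose_upload_query(
            uploads_mongo_query: Optional[str],
            entry_upload_ids: Optional[Set[str]],
            explicit_upload_ids: Optional[Set[str]]) -> Q:
        query = Q()
        if uploads_mongo_query:
            query &= Q(**json.loads(uploads_mongo_query))
        if entry_upload_ids is not None:
            query &= Q(upload_id__in=list(entry_upload_ids))
        if explicit_upload_ids is not None:
            query &= Q(upload_id__in=list(explicit_upload_ids))
        return query

`proc.Upload.objects(query)` then yields the selected uploads, exactly as in the return statement above.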
@uploads.command(help='List selected uploads')
@@ -342,8 +348,9 @@ def reset(ctx, uploads, with_calcs, success, failure):
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.option('--transformer', help='Qualified name to a Python function that should be applied to each EntryMetadata.')
@click.option('--skip-materials', is_flag=True, help='Only update the entries index.')
@click.pass_context
def index(ctx, uploads, parallel, transformer):
def index(ctx, uploads, parallel, transformer, skip_materials):
from nomad import search
transformer_func = None
@@ -369,7 +376,8 @@ def index(ctx, uploads, parallel, transformer):
with upload.entries_metadata() as entries:
if transformer is not None:
transform(entries)
search.index([entry.m_parent for entry in entries], update_materials=True, refresh=True)
archives = [entry.m_parent for entry in entries]
search.index(archives, update_materials=not skip_materials, refresh=True)
return True
@@ -421,13 +429,18 @@ def rm(ctx, uploads, skip_es, skip_mongo, skip_files):
@uploads.command(help='Reprocess selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
@click.option('--reprocess-running', is_flag=True, help='Also reprocess already running processes.')
@click.option('--process-running', is_flag=True, help='Also reprocess already running processes.')
@click.option('--setting', type=str, multiple=True, help='key=value to overwrite a default reprocess config setting.')
@click.pass_context
def re_process(ctx, uploads, parallel: int, reprocess_running: bool):
def process(ctx, uploads, parallel: int, process_running: bool, setting=typing.List[str]):
_, uploads = _query_uploads(uploads, **ctx.obj.uploads_kwargs)
settings: typing.Dict[str, bool] = {}
for settings_str in setting:
key, value = settings_str.split('=')
settings[key] = bool(value)
_run_processing(
uploads, parallel, lambda upload: upload.process_upload(), 'processing',
reprocess_running=reprocess_running, reset_first=True)
uploads, parallel, lambda upload: upload.process_upload(reprocess_settings=settings),
'processing', process_running=process_running, reset_first=True)
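Note that `bool(value)` in the settings loop above is truthy for any non-empty string, so `--setting rematch_published=False` would still come out as `True`. If stricter parsing is wanted, a sketch along these lines would do it (a suggestion, not part of this merge request):

    # Suggested stricter --setting parser (not part of the commit): interpret
    # common false-y spellings instead of relying on bool(str).
    from typing import Dict, List


    def parse_settings(setting: List[str]) -> Dict[str, bool]:
        settings: Dict[str, bool] = {}
        for settings_str in setting:
            key, value = settings_str.split('=', 1)
            settings[key] = value.strip().lower() not in ('false', '0', 'no', '')
        return settings


    # parse_settings(['rematch_published=False']) == {'rematch_published': False}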
@uploads.command(help='Repack selected uploads.')
@@ -135,6 +135,10 @@ fs = NomadConfig(
elastic = NomadConfig(
host='localhost',
port=9200,
timeout=60,
bulk_timeout='10m',
bulk_size=1000,
entries_per_material_cap=1000,
entries_index='nomad_entries_v1',
materials_index='nomad_materials_v1',
)
@@ -326,10 +330,19 @@ reprocess = NomadConfig(
# Configures standard behaviour when reprocessing.
# Note, the settings only matter for published uploads and entries. For uploads in
# staging, we always reparse, add newfound entries, and delete unmatched entries.
reparse_published_if_parser_unchanged=True,
reparse_published_if_parser_changed=True,
add_newfound_entries_to_published=True,
delete_unmatched_published_entries=False
rematch_published=True,
reprocess_existing_entries=True,
use_original_parser=False,
add_matched_entries_to_published=True,
delete_unmatched_published_entries=False,
index_invidiual_entries=False
)
process = NomadConfig(
index_materials=True,
reuse_parser=True,
metadata_file_name='nomad',
metadata_file_extensions=('json', 'yaml', 'yml')
)
bundle_import = NomadConfig(
@@ -356,10 +369,11 @@ bundle_import = NomadConfig(
# When importing with trigger_processing=True, the settings below control the
# initial processing behaviour (see the config for `reprocess` for more info).
reparse_published_if_parser_unchanged=True,
reparse_published_if_parser_changed=True,
add_newfound_entries_to_published=True,
delete_unmatched_published_entries=True
rematch_published=True,
reprocess_existing_entries=True,
use_original_parser=False,
add_matched_entries_to_published=True,
delete_unmatched_published_entries=False
)
)
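The renamed `reprocess` keys and the new `process` block are plain `NomadConfig` entries and are read via attribute access like the other settings in this file. As the CLI change above suggests, individual runs can also override them by passing a `reprocess_settings` dict to `process_upload`. A hedged sketch of that override path; the upload id is a placeholder, and `Upload.get` is assumed to look an upload up by its id:

    # A sketch of overriding the reprocess defaults for one run (assumptions:
    # Upload.get looks up an upload by id; '<upload_id>' is a placeholder).
    from nomad import processing as proc

    upload = proc.Upload.get('<upload_id>')
    upload.process_upload(reprocess_settings={
        'rematch_published': True,
        'delete_unmatched_published_entries': False,
    })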
@@ -369,13 +383,9 @@ console_log_level = logging.WARNING
max_upload_size = 32 * (1024 ** 3)
raw_file_strip_cutoff = 1000
max_entry_download = 500000
use_empty_parsers = False
process_reuse_parser = True
metadata_file_name = 'nomad'
metadata_file_extensions = ('json', 'yaml', 'yml')
enable_lazy_import = True
encyclopedia_enabled = True
aitoolkit_enabled = False
use_empty_parsers = False
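The module-level settings removed here (`process_reuse_parser`, `metadata_file_name`, `metadata_file_extensions`) now live in the new `process` NomadConfig above, so call sites presumably read them through `config.process`. A small sketch of the updated access pattern; the surrounding helper function is illustrative only:

    # Illustrative only: the settings removed above are now read via the new
    # `process` NomadConfig (see the `process = NomadConfig(...)` block earlier).
    from nomad import config


    def is_metadata_file(file_name: str) -> bool:
        return any(
            file_name == f'{config.process.metadata_file_name}.{ext}'
            for ext in config.process.metadata_file_extensions)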
def normalize_loglevel(value, default_level=logging.INFO):
@@ -321,7 +321,7 @@ class Structure(MSection):
n_sites = Quantity(
type=int,
default=0,
derived=lambda a: len(a.cartesian_site_positions),
derived=lambda a: len(a.cartesian_site_positions) if a.cartesian_site_positions is not None else 0,
description="""
An integer specifying the length of the cartesian_site_positions property.
""",
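The guard matters because `len(None)` raises a `TypeError`, so a `Structure` without `cartesian_site_positions` previously broke the derived `n_sites`. The derived callable now behaves like this stand-alone equivalent (a plain-function restatement for clarity, not new code in the commit):

    # Plain-function restatement of the derived n_sites above: fall back to 0
    # when no site positions are present instead of calling len(None).
    def derive_n_sites(structure) -> int:
        positions = structure.cartesian_site_positions
        return len(positions) if positions is not None else 0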
@@ -86,7 +86,7 @@ def setup_elastic(create_indices=True):
global elastic_client
elastic_client = connections.create_connection(
hosts=['%s:%d' % (config.elastic.host, config.elastic.port)],
timeout=60, max_retries=10, retry_on_timeout=True)
timeout=config.elastic.timeout, max_retries=10, retry_on_timeout=True)
logger.info('setup elastic connection')
# Setup materials index mapping. An alias is used to be able to reindex the
@@ -308,6 +308,9 @@ class DocumentType():
'''
mappings: Dict[str, Any] = {}
if self == material_type and prefix is None:
mappings['n_entries'] = {'type': 'integer'}
for quantity_def in section_def.all_quantities.values():
elasticsearch_annotations = quantity_def.m_get_annotations(Elasticsearch, as_list=True)
for elasticsearch_annotation in elasticsearch_annotations:
@@ -858,43 +861,70 @@ def delete_indices():
material_index.delete()
def index_entry(entry: MSection, update_material: bool = False):
def index_entry(entry: MSection, **kwargs):
'''
Upserts the given entry in the entry index. Optionally updates the materials index
as well.
'''
index_entries([entry], update_materials=update_material)
index_entries([entry], **kwargs)
_max_entries_index_size = 1000
def index_entries_with_materials(entries: List, refresh: bool = False):
index_entries(entries, refresh=refresh)
update_materials(entries, refresh=refresh)
def index_entries(entries: List, update_materials: bool = True, refresh: bool = False):
def index_entries(entries: List, refresh: bool = False):
'''
Upserts the given entries in the entry index. Optionally updates the materials index
as well.
'''
# split into reasonably sized problems
if len(entries) > _max_entries_index_size:
for entries_part in [entries[i:i + _max_entries_index_size] for i in range(0, len(entries), _max_entries_index_size)]:
index_entries(entries_part, update_materials=update_materials)
if len(entries) > config.elastic.bulk_size:
for entries_part in [entries[i:i + config.elastic.bulk_size] for i in range(0, len(entries), config.elastic.bulk_size)]:
index_entries(entries_part, refresh=refresh)
return
if len(entries) == 0:
return
# Index the entries themselves.
actions_and_docs = []
for entry in entries:
actions_and_docs.append(dict(index=dict(_id=entry['entry_id'])))
entry_index_doc = entry_type.create_index_doc(entry)
actions_and_docs.append(entry_index_doc)
logger = utils.get_logger('nomad.search', n_entries=len(entries))
elasticsearch_results = entry_index.bulk(body=actions_and_docs, refresh=True)
with utils.timer(logger, 'prepare bulk index of entries actions and docs'):
actions_and_docs = []
for entry in entries:
actions_and_docs.append(dict(index=dict(_id=entry['entry_id'])))
entry_index_doc = entry_type.create_index_doc(entry)
actions_and_docs.append(entry_index_doc)
timer_kwargs = {}
try:
import json
timer_kwargs['size'] = len(json.dumps(actions_and_docs))
timer_kwargs['n_actions'] = len(actions_and_docs)
except Exception:
pass
with utils.timer(
logger, 'perform bulk index of entries',
lnr_event='failed to bulk index entries',
**timer_kwargs
):
entry_index.bulk(body=actions_and_docs, refresh=refresh, timeout=config.elastic.bulk_timeout)
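Indexing is now split in two steps: `index_entries` only writes the entry index, chunked into `config.elastic.bulk_size` pieces and bounded by the new configurable `bulk_timeout`, while the material aggregation moved into `update_materials` below; `index_entries_with_materials` simply chains the two. A minimal usage sketch in the same module, using only the functions defined above (the wrapper itself is illustrative, not part of the commit):

    # Illustrative wrapper showing the intended call order of the split API:
    # write the entry documents first, then refresh the material aggregation.
    def reindex(entries, with_materials=True, refresh=False):
        index_entries(entries, refresh=refresh)
        if with_materials:
            update_materials(entries, refresh=refresh)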
def update_materials(entries: List, refresh: bool = False):
# split into reasonably sized problems
if len(entries) > config.elastic.bulk_size:
for entries_part in [entries[i:i + config.elastic.bulk_size] for i in range(0, len(entries), config.elastic.bulk_size)]:
update_materials(entries_part, refresh=refresh)
return
if not update_materials:
if len(entries) == 0:
return
logger = utils.get_logger('nomad.search', n_entries=len(entries))
def get_material_id(entry):
material_id = None
try:
@@ -913,43 +943,47 @@ def index_entries(entries: List, update_materials: bool = True, refresh: bool =
if material_id is not None:
material_ids.add(material_id)
logger = logger.bind(n_materials=len(material_ids))
# Get existing materials for entries' material ids (i.e. the entry needs to be added
# or updated).
if material_ids:
elasticsearch_results = material_index.mget(body={
'docs': [dict(_id=material_id) for material_id in material_ids]
})
existing_material_docs = [
doc['_source'] for doc in elasticsearch_results['docs'] if '_source' in doc]
else:
existing_material_docs = []
with utils.timer(logger, 'get existing materials', lnr_event='failed to get existing materials'):
if material_ids:
elasticsearch_results = material_index.mget(body={
'docs': [dict(_id=material_id) for material_id in material_ids]
})
existing_material_docs = [
doc['_source'] for doc in elasticsearch_results['docs'] if '_source' in doc]
else:
existing_material_docs = []
# Get old materials that still have one of the entries, but the material id has changed
# (i.e. the materials where entries need to be removed due to entries having different
# materials now).
elasticsearch_results = material_index.search(body={
'size': len(entry_ids),
'query': {
'bool': {
'must': {
'nested': {
'path': 'entries',
'query': {
'terms': {
'entries.entry_id': list(entry_ids)
with utils.timer(logger, 'get old materials', lnr_event='failed to get old materials'):
elasticsearch_results = material_index.search(body={
'size': len(entry_ids),
'query': {
'bool': {
'must': {
'nested': {
'path': 'entries',
'query': {
'terms': {
'entries.entry_id': list(entry_ids)
}
}
}
}
},
'must_not': {
'terms': {
'material_id': list(material_ids)
},
'must_not': {
'terms': {
'material_id': list(material_ids)
}
}
}
}
}
})
old_material_docs = [hit['_source'] for hit in elasticsearch_results['hits']['hits']]
})
old_material_docs = [hit['_source'] for hit in elasticsearch_results['hits']['hits']]
# Compare and create the appropriate materials index actions
# First, we go through the existing materials. The following cases need to be covered:
@@ -960,6 +994,7 @@ def index_entries(entries: List, update_materials: bool = True, refresh: bool =
# case where an entry's material id changed within the set of other entries' material ids)
# This has n + m complexity, with n = number of materials and m = number of entries
actions_and_docs = []
material_docs = []
material_docs_dict = {}
remaining_entry_ids = set(entry_ids)
for material_doc in existing_material_docs:
@@ -993,6 +1028,7 @@ def index_entries(entries: List, update_materials: bool = True, refresh: bool =
actions_and_docs.append(dict(index=dict(_id=material_id)))
actions_and_docs.append(material_doc)
material_docs.append(material_doc)
for entry_id in remaining_entry_ids:
entry = entries_dict.get(entry_id)
@@ -1005,6 +1041,7 @@ def index_entries(entries: List, update_materials: bool = True, refresh: bool =
material_docs_dict[material_id] = material_doc
actions_and_docs.append(dict(create=dict(_id=material_id)))
actions_and_docs.append(material_doc)
material_docs.append(material_doc)
# The material does exist (now), but the entry is new.
material_doc.setdefault('entries', []).append(material_entry_type.create_index_doc(entry))
@@ -1030,12 +1067,41 @@ def index_entries(entries: List, update_materials: bool = True, refresh: bool =
# The material needs to be updated
actions_and_docs.append(dict(index=dict(_id=material_id)))
actions_and_docs.append(material_doc)
material_docs.append(material_doc)
# Third, we potentially cap the number of entries in a material. We ensure that only
# a certain amount of entries are stored with all metadata. The rest will only
# have their entry id.
all_n_entries_capped = 0
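The comment above describes the capping driven by the new `config.elastic.entries_per_material_cap` setting: a material keeps full metadata for at most that many entries, and the remaining entries are reduced to just their `entry_id`. A hedged sketch of the idea; the exact material document shape and counter handling are assumptions, the real implementation follows in the (truncated) code of this file:

    # Sketch of the capping idea only: entries beyond the cap keep only their id,
    # and the number of capped entries is returned for the counter above.
    def cap_material_entries(material_doc: dict, cap: int) -> int:
        entries = material_doc.get('entries', [])
        if len(entries) <= cap:
            return 0
        material_doc['entries'] = (
            entries[:cap] + [dict(entry_id=e['entry_id']) for e in entries[cap:]])
        return len(entries) - cap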