Commit d23b198c authored by Markus Scheidgen, committed by Lauri Himanen

Consider the new entries index in most processing functions.

parent afa10eae
@@ -359,7 +359,7 @@ def search(
     more_response_data['es_query'] = es_query.to_dict()
 
     result = SearchResponse(
-        owner=owner,
+        owner='all' if owner is None else owner,
         query=query,
         pagination=pagination_response,
         required=required,
@@ -396,7 +396,8 @@ def update_by_query(
     if query is None:
         query = {}
     es_query = _api_to_es_query(query)
-    es_query &= _owner_es_query(owner=owner, user_id=user_id)
+    if owner is not None:
+        es_query &= _owner_es_query(owner=owner, user_id=user_id)
 
     body = {
         'script': {
@@ -421,3 +422,42 @@ def update_by_query(
         infrastructure.elastic_client.indices.refresh(index=index_name)
 
     return result
+
+
+def delete_by_query(
+        owner: str = 'public',
+        query: Query = None,
+        user_id: str = None,
+        index: Union[Index, str] = entry_index,
+        refresh: bool = False):
+    '''
+    Deletes all entries that match the given query.
+    '''
+    if isinstance(index, Index):
+        index_name = index.index_name
+    else:
+        index_name = index
+
+    if query is None:
+        query = {}
+
+    es_query = _api_to_es_query(query)
+    es_query &= _owner_es_query(owner=owner, user_id=user_id)
+
+    body = {
+        'query': es_query.to_dict()
+    }
+
+    try:
+        result = infrastructure.elastic_client.delete_by_query(
+            body=body, index=index_name)
+    except TransportError as e:
+        utils.get_logger(__name__).error(
+            'es delete_by_query error', exc_info=e,
+            es_info=json.dumps(e.info, indent=2))
+        raise SearchError(e)
+
+    if refresh:
+        infrastructure.elastic_client.indices.refresh(index=index_name)
+
+    return result
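The new `delete_by_query` mirrors `update_by_query` above. A minimal usage sketch (the ids are placeholders; per the `_owner_es_query` change further down, `owner=None` imposes no owner restriction at all):

    # delete every entry of an upload from the entries index, regardless of owner
    delete_by_query(owner=None, query=dict(upload_id='<upload_id>'), refresh=True)

    # delete only the entries the given user owns
    delete_by_query(owner='user', user_id='<user_id>', query=dict(upload_id='<upload_id>'))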
@@ -84,7 +84,7 @@ def setup_mongo(client=False):
     return mongo_client
 
 
-def setup_elastic(create_mappings=True):
+def setup_elastic(create_indices=True):
     ''' Creates connection to elastic search. '''
     from nomad.search import entry_document, material_document
     from elasticsearch_dsl import Index
@@ -99,7 +99,7 @@ def setup_elastic(create_mappings=True):
     # materials with zero downtime. First see to which index the alias points
     # to. If alias is not set, create it. Update the mapping in the index
     # pointed to by the alias.
-    if create_mappings:
+    if create_indices:
         try:
             if elastic_client.indices.exists_alias(config.elastic.materials_index_name):
                 index_name = list(elastic_client.indices.get(config.elastic.materials_index_name).keys())[0]
@@ -131,8 +131,8 @@ def setup_elastic(create_mappings=True):
         logger.info('initialized elastic index for calculations', index_name=config.elastic.index_name)
         logger.info('initialized elastic index for materials', index_name=config.elastic.materials_index_name)
 
-        from nomad.metainfo.elasticsearch_extension import create_indices
-        create_indices()
+        from nomad.metainfo.elasticsearch_extension import create_indices as create_v1_indices
+        create_v1_indices()
         logger.info('initialized v1 elastic indices')
 
     return elastic_client
@@ -461,6 +461,12 @@ def reset(remove: bool):
         if not remove:
             entry_document.init(index=config.elastic.index_name)
             material_document.init(index=material_index_name)
+
+        from nomad.metainfo.elasticsearch_extension import create_indices, delete_indices
+        delete_indices()
+        if not remove:
+            create_indices()
+
         logger.info('elastic index resetted')
     except Exception as e:
         logger.error('exception resetting elastic', exc_info=e)
@@ -392,6 +392,10 @@ class Index():
         else:
             logger.info('elasticsearch index exists')
 
+    def delete(self):
+        if self.elastic_client.indices.exists(index=self.index_name):
+            self.elastic_client.indices.delete(index=self.index_name)
+
     def refresh(self):
         self.elastic_client.indices.refresh(index=self.index_name)
@@ -636,8 +640,8 @@ class SearchQuantity():
 def create_indices(entry_section_def: Section = None, material_section_def: Section = None):
     '''
-    Initially creates the mapping for all document types and creates the indices in Elasticsearch.
-    The indices must not exist already.
+    Creates the mapping for all document types and creates the indices in Elasticsearch.
+    The indices must not exist already. Previously created mappings will be replaced.
     '''
     if entry_section_def is None:
         from nomad.datamodel import EntryArchive
@@ -661,6 +665,11 @@ def create_indices(entry_section_def: Section = None, material_section_def: Section = None):
     material_index.create_index()
 
 
+def delete_indices():
+    entry_index.delete()
+    material_index.delete()
+
+
 def index_entry(entry: MSection, update_material: bool = False):
     '''
     Upserts the given entry in the entry index. Optionally updates the materials index
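Together with `Index.delete` above, `delete_indices` gives `infrastructure.reset` (see the earlier hunk) a simple drop-and-recreate cycle for the v1 indices. A sketch of that cycle:

    from nomad.metainfo.elasticsearch_extension import create_indices, delete_indices

    delete_indices()   # drops the entries and materials indices, if they exist
    create_indices()   # re-creates both with freshly computed mappings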
@@ -57,6 +57,8 @@ from nomad.datamodel import (
 from nomad.archive import (
     write_partial_archive_to_mongo, delete_partial_archives_from_mongo)
 from nomad.datamodel.encyclopedia import EncyclopediaMetadata
+from nomad.metainfo.elasticsearch_extension import index_entry
+from nomad.app.v1.search import update_by_query, delete_by_query
 
 section_metadata = datamodel.EntryArchive.section_metadata.name
@@ -95,6 +97,22 @@ _log_processors = [
     TimeStamper(fmt="%Y-%m-%d %H:%M.%S", utc=False)]
 
 
+def _es_publish_upload(upload_id: str):
+    # TODO does this belong in this module
+    # TODO what about the materials index
+    update_by_query(
+        'ctx._source.published = true;',
+        query=dict(upload_id=upload_id),
+        owner=None, refresh=True)
+
+
+def _es_delete_upload(upload_id: str):
+    # TODO does this belong in this module
+    # TODO what about the materials index
+    # TODO implement
+    delete_by_query(owner=None, query=dict(upload_id=upload_id), refresh=True)
+
+
 def _normalize_oasis_upload_metadata(upload_id, metadata):
     # This is overwritten by the tests to do necessary id manipulations
     return upload_id, metadata
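For orientation: the first positional argument of `update_by_query` is a painless script, so `_es_publish_upload` boils down to roughly this Elasticsearch request (a sketch only; the actual query clause is whatever `_api_to_es_query` produces for the `upload_id` match):

    body = {
        'script': {'source': 'ctx._source.published = true;'},
        'query': {'match': {'upload_id': upload_id}}  # simplified
    }
    infrastructure.elastic_client.update_by_query(body=body, index=index_name)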
@@ -415,6 +433,8 @@ class Calc(Proc):
                     'could not apply domain metadata to entry', exc_info=e)
 
             self._entry_metadata.a_elastic.index()
+            assert self._parser_results.section_metadata == self._entry_metadata
+            index_entry(self._parser_results)
         except Exception as e:
             self.get_logger().error(
                 'could not index after processing failure', exc_info=e)
@@ -541,6 +561,8 @@ class Calc(Proc):
         # index in search
         with utils.timer(logger, 'calc metadata indexed'):
             self._entry_metadata.a_elastic.index()
+            assert self._parser_results.section_metadata == self._entry_metadata
+            index_entry(self._parser_results)
 
         # persist the archive
         with utils.timer(
@@ -656,6 +678,8 @@ class Calc(Proc):
         # index in search
         with utils.timer(logger, 'calc metadata indexed'):
             self._entry_metadata.a_elastic.index()
+            assert self._parser_results.section_metadata == self._entry_metadata
+            index_entry(self._parser_results)
 
         # persist the archive
         with utils.timer(
@@ -867,6 +891,7 @@ class Upload(Proc):
         with utils.lnr(logger, 'upload delete failed'):
             with utils.timer(logger, 'upload deleted from index'):
                 search.delete_upload(self.upload_id)
+                _es_delete_upload(self.upload_id)
 
             with utils.timer(logger, 'upload partial archives deleted'):
                 calc_ids = [calc.calc_id for calc in Calc.objects(upload_id=self.upload_id)]
@@ -918,6 +943,7 @@ class Upload(Proc):
             with utils.timer(logger, 'index updated'):
                 search.publish(calcs)
+                _es_publish_upload(self.upload_id)
 
             if isinstance(self.upload_files, StagingUploadFiles):
                 with utils.timer(logger, 'upload staging files deleted'):
@@ -112,18 +112,25 @@ for domain in datamodel.domains:
     order_default_quantities.setdefault(domain, order_default_quantities.get('__all__'))
 
 
+# Used by:
+# - processing to delete upload
+# - cli client mirror to delete uploads while testing
+# - cli admin uploads rm
 def delete_upload(upload_id):
     ''' Delete all entries with given ``upload_id`` from the index. '''
     index = entry_document._default_index()
     Search(index=index).query('match', upload_id=upload_id).delete()
 
 
+# Used by:
+# - cli admin entry delete
 def delete_entry(calc_id):
     ''' Delete the entry with the given ``calc_id`` from the index. '''
     index = entry_document._default_index()
     Search(index=index).query('match', calc_id=calc_id).delete()
 
 
+# Only used by processing when publishing uploads
 def publish(calcs: Iterable[datamodel.EntryMetadata]) -> None:
     ''' Update all given calcs with their metadata and set ``publish = True``. '''
     def elastic_updates():
@@ -140,6 +147,10 @@ def publish(calcs: Iterable[datamodel.EntryMetadata]) -> None:
     refresh()
 
 
+# Used by:
+# - cli admin uploads [chown, index]
+# - cli admin lift embargo
+# - cli client mirror
 def index_all(calcs: Iterable[datamodel.EntryMetadata], do_refresh=True) -> None:
     '''
     Adds all given calcs with their metadata to the index.
@@ -162,6 +173,7 @@ def index_all(calcs: Iterable[datamodel.EntryMetadata], do_refresh=True) -> None
     return failed
 
 
+# Used in a lot of places
 def refresh():
     infrastructure.elastic_client.indices.refresh(config.elastic.index_name)
@@ -195,6 +207,8 @@ def _owner_es_query(owner: str, user_id: str = None):
         if user_id is None or not datamodel.User.get(user_id=user_id).is_admin:
             raise AuthenticationRequiredError('This can only be used by the admin user.')
         q = None
+    elif owner is None:
+        q = None
     else:
         raise KeyError('Unsupported owner value')
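The new `elif` gives `owner=None` the same unrestricted `q = None` as the admin branch, but without requiring an admin user. Internal callers (such as `_es_delete_upload` above) use it to address all entries of an upload, e.g.:

    # matches published and unpublished entries alike; the id is a placeholder
    results = search(owner=None, query=dict(upload_id='<upload_id>'))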
@@ -209,15 +209,14 @@ def elastic_infra(monkeysession):
     monkeysession.setattr('nomad.config.elastic.entries_index', elastic_test_entries_index)
     monkeysession.setattr('nomad.config.elastic.materials_index', elastic_test_materials_index)
 
-    clear_elastic_infra()
-    try:
-        return infrastructure.setup_elastic()
-    except Exception:
-        # try to delete index, error might be caused by changed mapping
-        return clear_elastic_infra()
+    # attempt to remove and recreate all indices
+    return clear_elastic_infra()
 
 
 def clear_elastic_infra():
+    '''
+    Removes and re-creates all indices and mappings.
+    '''
     from elasticsearch_dsl import connections
     connection = connections.create_connection(
         hosts=['%s:%d' % (config.elastic.host, config.elastic.port)])
@@ -228,18 +227,20 @@ def clear_elastic_infra():
     except Exception:
         pass
 
-    return infrastructure.setup_elastic()
+    return infrastructure.setup_elastic(create_indices=True)
 
 
 def clear_elastic(elastic_infra):
+    '''
+    Removes all contents from the existing indices.
+    '''
     try:
         for index in indices:
             elastic_infra.delete_by_query(
                 index=index, body=dict(query=dict(match_all={})),
                 wait_for_completion=True, refresh=True)
     except elasticsearch.exceptions.NotFoundError:
-        # it is unclear why this happens, but it happens at least once, when all tests
-        # are executed
+        # Happens if a test removed indices without recreating them.
         clear_elastic_infra()
 
     assert infrastructure.elastic_client is not None
@@ -27,6 +27,8 @@ from nomad.metainfo.elasticsearch_extension import (
     Elasticsearch, create_indices, index_entry, index_entries,
     entry_type, material_type, material_entry_type, entry_index, material_index)
 
+from tests.conftest import clear_elastic_infra
+
 
 class Material(MSection):
@@ -187,21 +189,20 @@ def example_entry():
     return entry
 
 
-@pytest.fixture
-def elastic_client(elastic_function):
-    yield elastic_function
-
-
-@pytest.fixture
-def indices(elastic_client):
+@pytest.fixture(scope='module')
+def indices(elastic_infra):
     # remove whatever the infrastructure created by default
+    from nomad.infrastructure import elastic_client
     try:
         elastic_client.indices.delete(index=config.elastic.entries_index)
         elastic_client.indices.delete(index=config.elastic.materials_index)
     except Exception:
         pass
 
-    return create_indices(Entry.m_def, Material.m_def)
+    yield create_indices(Entry.m_def, Material.m_def)
+
+    # re-establish the default elasticsearch setup.
+    clear_elastic_infra()
 
 
 def test_mappings(indices):
@@ -302,7 +303,7 @@ def test_index_docs(indices):
     }
 
 
-def test_index_entry(elastic_client, indices, example_entry):
+def test_index_entry(elastic, indices, example_entry):
     index_entry(example_entry, update_material=True)
 
     assert_entry_indexed(example_entry)
@@ -320,7 +321,7 @@ def test_index_entry(elastic_client, indices, example_entry):
     pytest.param('1-1, 2-1', '1-2', '2-1, 1-2', id='moved-entry-between-materials-remaining'),
     pytest.param('1-1', '1-1*', '1-1*', id='update-material-property')
 ])
-def test_index_entries(elastic_client, indices, before, to_index, after):
+def test_index_entries(elastic, indices, before, to_index, after):
     def create_entry(spec: str, material_kwargs: dict = None):
         entry_id, material_id = spec.split('-')
         changed = material_id.endswith('*')
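For readers of these parameters: each spec is `<entry id>-<material id>`, and a trailing `*` marks a changed material property (cf. `spec.split('-')` and `endswith('*')` in `create_entry`). For instance:

    # entry 1 moves from material 1 to material 2; material 1 remains because
    # entry 2 still belongs to it
    pytest.param('1-1, 2-1', '1-2', '2-1, 1-2', id='moved-entry-between-materials-remaining')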
@@ -28,6 +28,7 @@ from nomad.archive import read_partial_archive_from_mongo
 from nomad.files import UploadFiles, StagingUploadFiles, PublicUploadFiles
 from nomad.processing import Upload, Calc
 from nomad.processing.base import task as task_decorator, FAILURE, SUCCESS
+from nomad.app.v1.search import search
 
 from tests.test_search import assert_search_upload
 from tests.test_files import assert_upload_files
@@ -127,6 +128,12 @@ def assert_processing(upload: Upload, published: bool = False):
     upload_files.close()
 
+    search_results = search(owner=None, query={'upload_id': upload.upload_id})
+    assert search_results.pagination.total == Calc.objects(upload_id=upload.upload_id).count()
+    for entry in search_results.data:
+        assert entry['published'] == published
+        assert entry['upload_id'] == upload.upload_id
+
 
 def test_processing(processed, no_warn, mails, monkeypatch):
     assert_processing(processed)
@@ -68,13 +68,19 @@ class TestAdmin:
             cli, ['admin', 'reset'], catch_exceptions=False)
         assert result.exit_code == 1
 
+    # TODO this has some kind of race condition in it and the test fails every other time
+    # on the CI/CD
     # def test_clean(self, published):
     #     upload_id = published.upload_id
     #     Upload.objects(upload_id=upload_id).delete()
 
     #     assert published.upload_files.exists()
     #     assert Calc.objects(upload_id=upload_id).first() is not None
+    #     search.refresh()
     #     assert search.SearchRequest().search_parameter('upload_id', upload_id).execute()['total'] > 0
+    #     # TODO test new index pair
+    #     # assert es_search(owner=None, query=dict(upload_id=upload_id)).pagination.total == 0
+    #     # assert es_search(owner=None, query=dict(upload_id=upload_id)).pagination.total == 0
 
     #     result = click.testing.CliRunner().invoke(
     #         cli, ['admin', 'clean', '--force', '--skip-es'], catch_exceptions=False)
@@ -82,7 +88,10 @@ class TestAdmin:
     #     assert result.exit_code == 0
     #     assert not published.upload_files.exists()
     #     assert Calc.objects(upload_id=upload_id).first() is None
+    #     search.refresh()
     #     assert search.SearchRequest().search_parameter('upload_id', upload_id).execute()['total'] > 0
+    #     # TODO test new index pair
+    #     # assert es_search(owner=None, query=dict(upload_id=upload_id)).pagination.total == 0
 
     @pytest.mark.parametrize('upload_time,dry,lifted', [
         (datetime.datetime.now(), False, False),