Commit 7512fcfb authored by Markus Scheidgen's avatar Markus Scheidgen

WIP for multiple domain support.

parent caebe599
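A quick check of the new dispatch behaviour (note that the domain keys are the lowercase names registered further below):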
from nomad import datamodel
print(datamodel.CalcWithMetadata(domain='dft', calc_id='test').__class__.__name__)  # DFTCalcWithMetadata
print(datamodel.CalcWithMetadata(calc_id='test').__class__.__name__)                # CalcWithMetadata
print(datamodel.CalcWithMetadata(domain='ems', calc_id='test').__class__.__name__)  # EMSEntryWithMetadata
......@@ -81,7 +81,7 @@ _search_request_parser.add_argument(
_search_request_parser.add_argument(
'metrics', type=str, action='append', help=(
'Metrics to aggregate over all quantities and their values as comma separated list. '
'Possible values are %s.' % ', '.join(datamodel.Domain.instance.metrics_names)))
'Possible values are %s.' % ', '.join(search.metrics_names)))
_search_request_parser.add_argument(
'statistics', type=bool, help=('Return statistics.'))
for group_name in search.groups:
......@@ -96,7 +96,7 @@ _repo_calcs_model_fields = {
'A dict with all statistics. Each statistic is a dictionary with a metrics dict as '
'value and quantity value as key. The possible metrics are code runs (calcs), %s. '
'There is a pseudo quantity "total" with a single value "all" that contains the '
' metrics over all results. ' % ', '.join(datamodel.Domain.instance.metrics_names))),
' metrics over all results. ' % ', '.join(search.metrics_names))),
'python': fields.String(description=(
'A string of python code snippet which can be executed to reproduce the api result.')),
'curl': fields.String(description=(
......
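For illustration, the statistics dict described above has roughly the following shape; the quantity, value, and metric names here are only examples, not the exact ones returned by the API:

statistics = {
    'total': {'all': {'code_runs': 42}},
    'dft.system': {
        'bulk': {'code_runs': 30},
        '2D': {'code_runs': 12}}}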
......@@ -173,6 +173,6 @@ def local(calc_id, show_backend, show_metadata, skip_normalizers, not_strict, **
backend.write_json(sys.stdout, pretty=True)
if show_metadata:
metadata = CalcWithMetadata()
metadata = CalcWithMetadata(domain=local.parser.domain)
metadata.apply_domain_metadata(backend)
ujson.dump(metadata.to_dict(), sys.stdout, indent=4)
......@@ -129,6 +129,6 @@ def _parse(
if show_backend:
backend.write_json(sys.stdout, pretty=True)
if show_metadata:
metadata = CalcWithMetadata()
metadata = CalcWithMetadata(domain='dft') # TODO take domain from matched parser
metadata.apply_domain_metadata(backend)
json.dump(metadata.to_dict(), sys.stdout, indent=4)
......@@ -211,7 +211,6 @@ datacite = NomadConfig(
version = '0.7.5'
commit = gitinfo.commit
release = 'devel'
domain = 'DFT'
service = 'unknown nomad service'
auxfile_cutoff = 100
parser_matching_size = 9128
......
......@@ -64,6 +64,3 @@ from nomad.datamodel import ems, dft
from nomad.datamodel.dft import DFTCalcWithMetadata
from nomad.datamodel.ems import EMSEntryWithMetadata
from nomad.datamodel.metainfo import Dataset, User, UserMetadata
# Override the CalcWithMetadata with the domain-specific descendant
setattr(sys.modules['nomad.datamodel'], 'CalcWithMetadata', Domain.instance.domain_entry_class)
......@@ -57,6 +57,8 @@ class CalcWithMetadata(Mapping):
to fill these attributes from processed entries, i.e. instances of :class:`nomad.parsing.LocalBackend`.
Attributes:
domain: Must be the key for a registered domain. This determines which actual
subclass is instantiated.
upload_id: The ``upload_id`` of the calculations upload (random UUID).
calc_id: The unique mainfile based calculation id.
calc_hash: The raw file content based checksum/hash of this calculation.
......@@ -79,7 +81,18 @@ class CalcWithMetadata(Mapping):
references: Objects describing user provided references, keys are ``id`` and ``value``.
datasets: A list of dataset ids. The corresponding :class:`Dataset`s must exist.
"""
def __init__(self, **kwargs):
def __new__(cls, domain: str = None, **kwargs):
if domain is not None:
domain_obj = Domain.instances.get(domain)
assert domain_obj is not None
return super().__new__(domain_obj.domain_entry_class)
else:
return super().__new__(cls)
def __init__(self, domain: str = None, **kwargs):
self.domain = domain
# id relevant metadata
self.upload_id: str = None
self.calc_id: str = None
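A minimal, self-contained sketch of the __new__-based dispatch used here; the names Entry/DftEntry are simplified stand-ins, not the actual NOMAD classes:

class Entry:
    instances = {}  # stands in for Domain.instances in the real code

    def __new__(cls, domain: str = None, **kwargs):
        if domain is not None:
            # dispatch to the subclass registered for this domain
            return super().__new__(Entry.instances[domain])
        return super().__new__(cls)

    def __init__(self, domain: str = None, **kwargs):
        self.domain = domain

class DftEntry(Entry):
    pass

Entry.instances['dft'] = DftEntry

assert type(Entry(domain='dft')) is DftEntry
assert type(Entry()) is Entry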
......@@ -226,6 +239,7 @@ class DomainQuantity:
elastic_value: Callable[[Any], Any] = None,
argparse_action: str = 'append'):
self.domain: str = None
self._name: str = None
self.description = description
self.multi = multi
......@@ -248,7 +262,10 @@ class DomainQuantity:
@property
def name(self) -> str:
return self._name
if self.domain is not None:
return '%s.%s' % (self.domain, self._name)
else:
return self._name
@name.setter
def name(self, name: str) -> None:
......@@ -256,7 +273,7 @@ class DomainQuantity:
if self.metadata_field is None:
self.metadata_field = name
if self.elastic_field is None:
self.elastic_field = name
self.elastic_field = self.name
class Domain:
......@@ -288,7 +305,6 @@ class Domain:
root_sections: The name of the possible root sections for this domain.
metainfo_all_package: The name of the full metainfo package for this domain.
"""
instance: 'Domain' = None
instances: Dict[str, 'Domain'] = {}
base_quantities = dict(
......@@ -362,14 +378,13 @@ class Domain:
root_sections=['section_run', 'section_entry_info'],
metainfo_all_package='all.nomadmetainfo.json') -> None:
for quantity in quantities.values():
quantity.domain = name
domain_quantities = quantities
domain_metrics = metrics
domain_groups = groups
if name == config.domain:
assert Domain.instance is None, 'you can only define one domain.'
Domain.instance = self
Domain.instances[name] = self
self.name = name
......@@ -379,8 +394,8 @@ class Domain:
self.metainfo_all_package = metainfo_all_package
self.default_statistics = default_statistics
reference_domain_calc = domain_entry_class()
reference_general_calc = CalcWithMetadata()
reference_domain_calc = CalcWithMetadata(domain=name)
reference_general_calc = CalcWithMetadata(domain=None)
# add non specified quantities from additional metadata class fields
for quantity_name in reference_domain_calc.__dict__.keys():
......@@ -440,6 +455,14 @@ class Domain:
""" Just the names of all metrics. """
return list(self.aggregations.keys())
@property
def order_default_quantity(self) -> str:
for quantity in self.quantities.values():
if quantity.order_default:
return quantity.name
assert False, 'each domain must define an order_default quantity'
def get_optional_backend_value(backend, key, section, unavailable_value=None, logger=None):
# Section is section_system, section_symmetry, etc...
......
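Since every Domain now registers itself in Domain.instances, domain-specific settings can be looked up by name. A hedged sketch; the printed values are illustrative and depend on the actual domain definitions below:

from nomad import datamodel

dft = datamodel.Domain.instances['dft']   # registered by the Domain('dft', ...) call below
print(dft.root_sections)                  # ['section_run', 'section_entry_info'] by default
print(dft.order_default_quantity)         # the quantity flagged order_default, e.g. 'dft.formula' (illustrative)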
......@@ -194,7 +194,7 @@ def only_atoms(atoms):
Domain(
'DFT', DFTCalcWithMetadata,
'dft', DFTCalcWithMetadata,
quantities=dict(
formula=DomainQuantity(
'The chemical (hill) formula of the simulated system.',
......
......@@ -112,7 +112,7 @@ class EMSEntryWithMetadata(CalcWithMetadata):
Domain(
'EMS', EMSEntryWithMetadata,
'ems', EMSEntryWithMetadata,
root_sections=['section_experiment', 'section_entry_info'],
metainfo_all_package='all.experimental.nomadmetainfo.json',
quantities=dict(
......
......@@ -149,9 +149,6 @@ def match_parser(mainfile: str, upload_files: Union[str, files.StagingUploadFile
if strict and (isinstance(parser, MissingParser) or isinstance(parser, EmptyParser)):
continue
if parser.domain != config.domain:
continue
if parser.is_mainfile(mainfile_path, mime_type, buffer, decoded_buffer, compression):
# potentially convert the file
if encoding in ['iso-8859-1']:
......@@ -415,48 +412,48 @@ parsers = [
r'Copyright \(C\) [0-9]+ TURBOMOLE GmbH, Karlsruhe')
),
LegacyParser(
name='parsers/skeleton', code_name='skeleton', domain='EMS',
name='parsers/skeleton', code_name='skeleton', domain='ems',
parser_class_name='skeletonparser.SkeletonParserInterface',
mainfile_mime_re=r'(application/json)|(text/.*)',
mainfile_contents_re=(r'skeleton experimental metadata format')
),
LegacyParser(
name='parsers/mpes', code_name='mpes', domain='EMS',
name='parsers/mpes', code_name='mpes', domain='ems',
parser_class_name='mpesparser.MPESParserInterface',
mainfile_mime_re=r'(application/json)|(text/.*)',
mainfile_name_re=(r'.*.meta'),
mainfile_contents_re=(r'"data_repository_name": "zenodo.org"')
),
LegacyParser(
name='parsers/aptfim', code_name='mpes', domain='EMS',
name='parsers/aptfim', code_name='mpes', domain='ems',
parser_class_name='aptfimparser.APTFIMParserInterface',
mainfile_mime_re=r'(application/json)|(text/.*)',
mainfile_name_re=(r'.*.aptfim')
),
LegacyParser(
name='parsers/qbox', code_name='qbox', domain='DFT',
name='parsers/qbox', code_name='qbox', domain='dft',
parser_class_name='qboxparser.QboxParser',
mainfile_mime_re=r'(application/xml)|(text/.*)',
mainfile_contents_re=(r'http://qboxcode.org')
),
LegacyParser(
name='parsers/dmol', code_name='DMol3', domain='DFT',
name='parsers/dmol', code_name='DMol3', domain='dft',
parser_class_name='dmol3parser.Dmol3Parser',
mainfile_name_re=r'.*\.outmol',
mainfile_contents_re=r'Materials Studio DMol\^3'
),
LegacyParser(
name='parser/fleur', code_name='fleur', domain='DFT',
name='parser/fleur', code_name='fleur', domain='dft',
parser_class_name='fleurparser.FleurParser',
mainfile_contents_re=r'This output is generated by fleur.'
),
LegacyParser(
name='parser/molcas', code_name='MOLCAS', domain='DFT',
name='parser/molcas', code_name='MOLCAS', domain='dft',
parser_class_name='molcasparser.MolcasParser',
mainfile_contents_re=r'M O L C A S'
),
LegacyParser(
name='parser/onetep', code_name='ONETEP', domain='DFT',
name='parser/onetep', code_name='ONETEP', domain='dft',
parser_class_name='onetepparser.OnetepParser',
mainfile_contents_re=r'####### # # ####### ####### ####### ######'
)
......@@ -468,7 +465,7 @@ if config.use_empty_parsers:
# to keep the PIDs. These parsers will not match for new, non migrated data.
parsers.extend([
EmptyParser(
name='missing/octopus', code_name='Octopus', domain='DFT',
name='missing/octopus', code_name='Octopus', domain='dft',
mainfile_name_re=r'(inp)|(.*/inp)'
),
EmptyParser(
......@@ -480,7 +477,7 @@ if config.use_empty_parsers:
mainfile_name_re=r'.*\.scf'
),
EmptyParser(
name='missing/fhi-aims', code_name='FHI-aims', domain='DFT',
name='missing/fhi-aims', code_name='FHI-aims', domain='dft',
mainfile_name_re=r'.*\.fhiaims'
)
])
......
......@@ -573,7 +573,12 @@ class LocalBackend(LegacyParserBackend, metaclass=DelegatingMeta):
# TODO the root sections should be determined programmatically
for root_section in root_sections:
json_writer.key(root_section)
self._write(json_writer, self._delegate.results[root_section], filter=filter)
try:
self._write(json_writer, self._delegate.results[root_section], filter=filter)
except LookupError:
# not every root section is necessarily present in the backend
self._write(json_writer, {})
for name, section in self.mi2_data.items():
json_writer.key_value(name, section.m_to_dict())
......
......@@ -35,7 +35,7 @@ class Parser(metaclass=ABCMeta):
"""
def __init__(self):
self.domain = 'DFT'
self.domain = 'dft'
@abstractmethod
def is_mainfile(
......@@ -105,7 +105,7 @@ class MatchingParser(Parser):
mainfile_contents_re: A regexp that is used to match the first 1024 bytes of a
potential mainfile.
mainfile_name_re: A regexp that is used to match the paths of potential mainfiles
domain: The domain that this parser should be used for. Default is 'DFT'.
domain: The domain that this parser should be used for. Default is 'dft'.
supported_compressions: A list of [gz, bz2], if the parser supports compressed files
"""
def __init__(
......@@ -114,7 +114,7 @@ class MatchingParser(Parser):
mainfile_binary_header: bytes = None,
mainfile_mime_re: str = r'text/.*',
mainfile_name_re: str = r'.*',
domain='DFT',
domain='dft',
supported_compressions: List[str] = []) -> None:
super().__init__()
......
......@@ -40,7 +40,7 @@ from nomad.files import PathObject, UploadFiles, ExtractError, ArchiveBasedStagi
from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing import parser_dict, match_parser, LocalBackend
from nomad.normalizing import normalizers
from nomad.datamodel import UploadWithMetadata, Domain
from nomad.datamodel import UploadWithMetadata
def _pack_log_event(logger, method_name, event_dict):
......@@ -66,6 +66,12 @@ _log_processors = [
JSONRenderer(sort_keys=True)]
_all_root_sections = []
for domain in datamodel.Domain.instances.values():
for root_section in domain.root_sections:
_all_root_sections.append(root_section)
class Calc(Proc):
"""
Instances of this class represent calculations. This class manages the elastic
......@@ -226,6 +232,7 @@ class Calc(Proc):
# save preliminary minimum calc metadata in case processing fails
# successful processing will replace it with the actual metadata
calc_with_metadata = datamodel.CalcWithMetadata(
domain=parser_dict[self.parser].domain,
upload_id=self.upload_id,
calc_id=self.calc_id,
calc_hash=self.upload_files.calc_hash(self.mainfile),
......@@ -367,7 +374,7 @@ class Calc(Proc):
def normalizing(self):
""" The *task* that encapsulates all normalizing related actions. """
for normalizer in normalizers:
if normalizer.domain != config.domain:
if normalizer.domain != parser_dict[self.parser].domain:
continue
normalizer_name = normalizer.__name__
......@@ -415,7 +422,7 @@ class Calc(Proc):
logger, 'archived', step='archive',
input_size=self.mainfile_file.size) as log_data:
with self.upload_files.archive_file(self.calc_id, 'wt') as out:
self._parser_backend.write_json(out, pretty=True, root_sections=Domain.instance.root_sections)
self._parser_backend.write_json(out, pretty=True, root_sections=_all_root_sections)
log_data.update(archive_size=self.upload_files.archive_file_object(self.calc_id).size)
......
......@@ -72,8 +72,10 @@ class Dataset(InnerDoc):
class WithDomain(IndexMeta):
""" Override elasticsearch_dsl metaclass to sneak in domain specific mappings """
def __new__(cls, name, bases, attrs):
for quantity in datamodel.Domain.instance.domain_quantities.values():
attrs[quantity.name] = quantity.elastic_mapping
for domain in datamodel.Domain.instances.values():
for quantity in domain.domain_quantities.values():
attrs[quantity.elastic_field] = quantity.elastic_mapping
return super(WithDomain, cls).__new__(cls, name, bases, attrs)
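Because DomainQuantity.name (and thus elastic_field) is now domain-prefixed, the mapping carries fields such as 'dft.atoms' side by side for all domains. A minimal sketch of the metaclass trick, simplified and not using elasticsearch_dsl; field names and values are hypothetical:

class WithFields(type):
    extra_fields = {'dft.atoms': 'keyword', 'ems.method': 'keyword'}  # hypothetical

    def __new__(mcs, name, bases, attrs):
        attrs.update(mcs.extra_fields)  # sneak the per-domain fields into the class namespace
        return super().__new__(mcs, name, bases, attrs)

class IndexedEntry(metaclass=WithFields):
    pass

assert getattr(IndexedEntry, 'dft.atoms') == 'keyword'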
......@@ -82,6 +84,7 @@ class Entry(Document, metaclass=WithDomain):
class Index:
name = config.elastic.index_name
domain = Keyword()
upload_id = Keyword()
upload_time = Date()
upload_name = Keyword()
......@@ -115,6 +118,7 @@ class Entry(Document, metaclass=WithDomain):
return entry
def update(self, source: datamodel.CalcWithMetadata) -> None:
self.domain = source.domain
self.upload_id = source.upload_id
self.upload_time = source.upload_time
self.upload_name = source.upload_name
......@@ -157,10 +161,11 @@ class Entry(Document, metaclass=WithDomain):
self.datasets = [Dataset.from_dataset_id(dataset_id) for dataset_id in source.datasets]
self.external_id = source.external_id
for quantity in datamodel.Domain.instance.domain_quantities.values():
setattr(
self, quantity.name,
quantity.elastic_value(getattr(source, quantity.metadata_field)))
if self.domain is not None:
for quantity in datamodel.Domain.instances[self.domain].domain_quantities.values():
setattr(
self, quantity.name,
quantity.elastic_value(getattr(source, quantity.metadata_field)))
def delete_upload(upload_id):
......@@ -217,26 +222,40 @@ def refresh():
infrastructure.elastic_client.indices.refresh(config.elastic.index_name)
quantities = datamodel.Domain.instance.quantities
quantities = {
quantity_name: quantity
for domain in datamodel.Domain.instances.values()
for quantity_name, quantity in domain.quantities.items()}
"""The available search quantities """
metrics = datamodel.Domain.instance.metrics
metrics = {
metric_name: metric
for domain in datamodel.Domain.instances.values()
for metric_name, metric in domain.metrics.items()}
"""
The available search metrics. Metrics are integer values given for each entry that can
be used in statistics (aggregations), e.g. the sum of all total energy calculations or cardinality of
all unique geometries.
"""
metrics_names = datamodel.Domain.instance.metrics_names
metrics_names = [metric_name for domain in datamodel.Domain.instances.values() for metric_name in domain.metrics_names]
""" Names of all available metrics """
groups = datamodel.Domain.instance.groups
groups = {
key: value
for domain in datamodel.Domain.instances.values()
for key, value in domain.groups.items()}
"""The available groupable quantities"""
order_default_quantity = None
for quantity in datamodel.Domain.instance.quantities.values():
if quantity.order_default:
order_default_quantity = quantity.name
order_default_quantities = {
domain_name: domain.order_default_quantity
for domain_name, domain in datamodel.Domain.instances.items()
}
default_statistics = {
domain_name: domain.default_statistics
for domain_name, domain in datamodel.Domain.instances.items()
}
class SearchRequest:
......@@ -268,7 +287,8 @@ class SearchRequest:
There is also scrolling for quantities to go through all quantity values. There is no
paging for aggregations.
'''
def __init__(self, query=None):
def __init__(self, domain: str, query=None):
self._domain = domain
self._query = query
self._search = Search(index=config.elastic.index_name)
......@@ -407,7 +427,7 @@ class SearchRequest:
"""
Configures the domain's default statistics.
"""
for name in datamodel.Domain.instance.default_statistics:
for name in default_statistics[self._domain]:
self.statistic(
name,
quantities[name].aggregations,
......@@ -575,7 +595,7 @@ class SearchRequest:
yield hit.to_dict()
def execute_paginated(
self, page: int = 1, per_page=10, order_by: str = order_default_quantity,
self, page: int = 1, per_page=10, order_by: str = None,
order: int = -1):
"""
Executes the search and returns paginated results. Those are sorted.
......@@ -586,6 +606,9 @@ class SearchRequest:
order_by: The quantity to order by.
order: -1 or 1 for descending or ascending order.
"""
if order_by is None:
order_by = order_default_quantities[self._domain]
search = self._search.query(self.q)
if order_by not in quantities:
......
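A hedged usage sketch of the reworked search module; it assumes an initialized Elasticsearch connection, and only SearchRequest.__init__ and execute_paginated are taken from the diff above:

from nomad import search

# SearchRequest now carries the domain, so per-domain defaults can be resolved
request = search.SearchRequest(domain='dft')
# with order_by=None, sorting falls back to order_default_quantities['dft']
results = request.execute_paginated(page=1, per_page=10)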
......@@ -149,8 +149,8 @@ volumes:
nomad: /nomad
## Everything else
# The domain configuration, currently there is DFT and EMS
domain: DFT
# The domain configuration, currently there is dft and ems
domain: dft
springerDbPath: /nomad/fairdi/db/data/springer.db
......
......@@ -60,7 +60,7 @@ def get_upload_with_metadata(upload: dict) -> UploadWithMetadata:
""" Create a :class:`UploadWithMetadata` from a API upload json record. """
return UploadWithMetadata(
upload_id=upload['upload_id'], calcs=[
CalcWithMetadata(calc_id=calc['calc_id'], mainfile=calc['mainfile'])
CalcWithMetadata(domain='dft', calc_id=calc['calc_id'], mainfile=calc['mainfile'])
for calc in upload['calcs']['results']])
......@@ -222,6 +222,12 @@ class TestUploads:
assert calc['tasks_status'] == SUCCESS
assert calc['current_task'] == 'archiving'
assert len(calc['tasks']) == 3
print('##########')
for key, value in calc['metadata'].items():
print(key, value)
print('##########')
assert 'dft.atoms' in calc['metadata']
assert api.get('/archive/logs/%s/%s' % (calc['upload_id'], calc['calc_id']), headers=test_user_auth).status_code == 200
if upload['calcs']['pagination']['total'] > 1:
......@@ -232,7 +238,7 @@ class TestUploads:
upload_with_metadata = get_upload_with_metadata(upload)
assert_upload_files(upload_with_metadata, files.StagingUploadFiles)
assert_search_upload(upload_with_metadata, additional_keys=['atoms', 'system'])
assert_search_upload(upload_with_metadata, additional_keys=['dft.atoms', 'dft.system'])
def assert_published(self, api, test_user_auth, upload_id, proc_infra, metadata={}):
rv = api.get('/uploads/%s' % upload_id, headers=test_user_auth)
......@@ -691,7 +697,7 @@ class TestRepo():
example_dataset.m_x('me').create()
calc_with_metadata = CalcWithMetadata(
upload_id='example_upload_id', calc_id='0', upload_time=today)
domain='dft', upload_id='example_upload_id', calc_id='0', upload_time=today)
calc_with_metadata.files = ['test/mainfile.txt']
calc_with_metadata.apply_domain_metadata(normalized)
......@@ -1699,7 +1705,8 @@ class TestDataset:
@pytest.fixture()
def example_dataset_with_entry(self, mongo, elastic, example_datasets):
calc = CalcWithMetadata(
calc_id='1', upload_id='1', published=True, with_embargo=False, datasets=['1'])
domain='dft', calc_id='1', upload_id='1', published=True, with_embargo=False,
datasets=['1'])
Calc(
calc_id='1', upload_id='1', create_time=datetime.datetime.now(),
metadata=calc.to_dict()).save()
......@@ -1731,7 +1738,8 @@ class TestDataset:
def test_assign_doi_unpublished(self, api, test_user_auth, example_datasets):
calc = CalcWithMetadata(
calc_id='1', upload_id='1', published=False, with_embargo=False, datasets=['1'])
domain='dft', calc_id='1', upload_id='1', published=False, with_embargo=False,
datasets=['1'])
Calc(
calc_id='1', upload_id='1', create_time=datetime.datetime.now(),
metadata=calc.to_dict()).save()
......
......@@ -167,21 +167,21 @@ def mongo(mongo_infra):
@pytest.fixture(scope='session')
def elastic_infra(monkeysession):
""" Provides elastic infrastructure to the session """
monkeysession.setattr('nomad.config.elastic.index_name', 'test_nomad_fairdi_0_6')
monkeysession.setattr('nomad.config.elastic.index_name', 'nomad_fairdi_test')
try:
return infrastructure.setup_elastic()
except Exception:
# try to delete index, error might be caused by changed mapping
from elasticsearch_dsl import connections
connections.create_connection(hosts=['%s:%d' % (config.elastic.host, config.elastic.port)]) \
.indices.delete(index='test_nomad_fairdi_0_6')
.indices.delete(index='nomad_fairdi_test')
return infrastructure.setup_elastic()
def clear_elastic(elastic):
try:
elastic.delete_by_query(
index='test_nomad_fairdi_0_6', body=dict(query=dict(match_all={})),
index='nomad_fairdi_test', body=dict(query=dict(match_all={})),
wait_for_completion=True, refresh=True)
except elasticsearch.exceptions.NotFoundError:
# it is unclear why this happens, but it happens at least once, when all tests
......@@ -658,8 +658,8 @@ def create_test_structure(
backend = run_normalize(backend)
calc = CalcWithMetadata(
upload_id='test_uload_id', calc_id='test_calc_id_%d' % id, mainfile='test_mainfile',
published=True, with_embargo=False)
domain='dft', upload_id='test_uload_id', calc_id='test_calc_id_%d' % id,
mainfile='test_mainfile', published=True, with_embargo=False)
calc.apply_domain_metadata(backend)
if metadata is not None:
calc.update(**metadata)
......
......@@ -379,8 +379,8 @@ def test_malicious_parser_task_failure(proc_infra, failure, test_user):
def test_ems_data(proc_infra, test_user, monkeypatch):
monkeypatch.setattr('nomad.config.domain', 'EMS')
monkeypatch.setattr('nomad.datamodel.Domain.instance', datamodel.Domain.instances['EMS'])
monkeypatch.setattr('nomad.config.domain', 'ems')
monkeypatch.setattr('nomad.datamodel.Domain.instance', datamodel.Domain.instances['ems'])
monkeypatch.setattr('nomad.datamodel.CalcWithMetadata', datamodel.Domain.instance.domain_entry_class)
upload = run_processing(('test_ems_upload', 'tests/data/proc/example_ems.zip'), test_user)
......