Commit 07fc2b39 authored by Theodore Chang, committed by David Sikter

Resolve "Hash based ids for metainfo definitions"

parent bf2e460b
......@@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import copy
import datetime
from typing import Any, Dict
......@@ -68,22 +69,24 @@ class PackageDefinition(MSection):
def __init__(self, package: Package, **kwargs):
super().__init__()
self.definition_id = package.m_def.definition_id
self.definition_id = package.definition_id
if 'upload_id' in kwargs:
self.upload_id = kwargs['upload_id']
del kwargs['upload_id']
self.qualified_name = package.m_def.qualified_name()
self.qualified_name = package.qualified_name()
self.date_created = datetime.datetime.utcnow()
self.package_definition = package.m_to_dict(**kwargs)
self.section_definition_ids = [section.definition_id for section in package.section_definitions]
self.quantity_definition_ids = [
quantity.definition_id for section in package.section_definitions for quantity in section.quantities]
def store_package_definition(package: Package, **kwargs):
if package is None:
return
if PackageDefinition.m_def.a_mongo.objects(definition_id=package.m_def.definition_id).count() > 0:
logger.info(f'Package {package.m_def.definition_id} already exists. Skipping.')
if PackageDefinition.m_def.a_mongo.objects(definition_id=package.definition_id).count() > 0:
logger.info(f'Package {package.definition_id} already exists. Skipping.')
return
mongo_package = PackageDefinition(package, **kwargs)
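For orientation, a minimal sketch of the store-and-skip behaviour above, with a plain dict standing in for the mongo collection (`store_package_sketch` and `_fake_mongo` are illustrative names): the hash-based `definition_id` makes packages content-addressable, so an already-stored package is skipped rather than duplicated.

```python
import datetime

_fake_mongo: dict = {}  # definition_id -> stored package document

def store_package_sketch(definition_id: str, package_dict: dict, upload_id: str = None):
    # skip if a package with the same hash-based id was stored before
    if definition_id in _fake_mongo:
        print(f'Package {definition_id} already exists. Skipping.')
        return
    _fake_mongo[definition_id] = {
        'definition_id': definition_id,
        'upload_id': upload_id,
        'date_created': datetime.datetime.utcnow(),
        'package_definition': package_dict,
    }

store_package_sketch('abc123', {'name': 'test schema package'}, upload_id='u1')
store_package_sketch('abc123', {'name': 'test schema package'})  # skipped
```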
......@@ -124,7 +127,13 @@ def get_package_by_section_definition_id(section_definition_id: str) -> dict:
detail='Package not found. The given section definition is not contained in any packages.'
)
return packages.first().package_definition
result = packages.first()
pkg_definition = result.package_definition
# add entry_id_based_name as a field; it will later be used as the package name
pkg_definition['entry_id_based_name'] = str(result.qualified_name)
return copy.deepcopy(pkg_definition)
@router.get(
......
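The `copy.deepcopy` matters because callers mutate the returned dict (see `Context.resolve_section_definition` further down, which deletes `entry_id_based_name` before deserializing). A self-contained sketch of that contract, with a module-level dict playing the role of the stored document (all names here are stand-ins):

```python
import copy

_stored_doc = {'name': 'test schema package', 'entry_id_based_name': 'entry_id:abc'}

def get_package_sketch() -> dict:
    # return a deep copy so caller-side mutations cannot leak back
    return copy.deepcopy(_stored_doc)

pkg_definition = get_package_sketch()
entry_id_based_name = pkg_definition.pop('entry_id_based_name')
assert 'entry_id_based_name' not in pkg_definition
assert 'entry_id_based_name' in _stored_doc  # the stored document is untouched
```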
......@@ -379,6 +379,8 @@ process = NomadConfig(
# Configures whether to attach the definition id to `m_def`; note this differs from `m_def_id`.
# The `m_def_id` is exported with `with_def_id=True` via `m_to_dict`.
add_definition_id_to_reference=False,
# Configures whether to write `m_def_id` to the archive.
write_definition_id_to_archive=False,
index_materials=True,
reuse_parser=True,
metadata_file_name='nomad',
......
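The three process flags involved in this change work together. A hedged sketch of enabling them programmatically, assuming a NOMAD environment (the tests below set the same flags via `monkeypatch.setattr`):

```python
from nomad import config

# store serialized schema packages in mongo, keyed by their hash-based id
config.process.store_package_definition_in_mongo = True
# append '@<definition_id>' to serialized section references and m_def
config.process.add_definition_id_to_reference = True
# pass with_def_id=True to m_to_dict when the msg-pack archive is written
config.process.write_definition_id_to_archive = True
```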
......@@ -142,6 +142,11 @@ class Context(MetainfoContext):
if target_root in self.urls:
return f'{self.urls[target_root]}#{fragment}'
# when the package is loaded from mongo, there is no metadata
# since the package is versioned, creating a reference based on processed data is not ideal
if getattr(target_root, 'metadata', None) is None:
return None # type: ignore
upload_id, entry_id = self._get_ids(target_root)
assert entry_id is not None, 'Only archives with entry_id can be referenced'
assert upload_id is not None, 'Only archives with upload_id can be referenced'
......@@ -353,7 +358,7 @@ class ClientContext(Context):
# TODO currently upload_id might be None
if upload_id is None:
# try to find a local file, useful when the context is used for local parsing
file_path = os.path.join(self.local_dir, path)
file_path = os.path.join(self.local_dir, path) if self.local_dir else path
if os.path.exists(file_path):
from nomad.parsing.parser import ArchiveParser
with open(file_path, 'rt') as f:
......@@ -388,11 +393,17 @@ class ClientContext(Context):
return f'<unavailable url>/#{value.m_path()}'
def retrieve_package_by_section_definition_id(self, definition_reference: str, definition_id: str) -> dict:
try:
url_parts = urlsplit(definition_reference)
url = urlunsplit((url_parts.scheme, url_parts.netloc, url_parts.path, f'metainfo/{definition_id}', '',))
except ValueError:
if definition_reference.startswith('http'):
try:
url_parts = urlsplit(definition_reference)
# it appears to be a valid remote url
# we assume the netloc is the installation_url
url = urlunsplit((url_parts.scheme, url_parts.netloc, f'api/v1/metainfo/{definition_id}', '', '',))
except ValueError:
# falls back to default installation_url
url = f'{self.installation_url}/metainfo/{definition_id}'
else:
# falls back to default installation_url
url = f'{self.installation_url}/metainfo/{definition_id}'
response = requests.get(url)
......
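A runnable sketch of the URL handling above: a reference that already looks like a remote URL keeps its scheme and host, everything else falls back to the current installation. The default `installation_url` value here is purely illustrative:

```python
from urllib.parse import urlsplit, urlunsplit

def build_metainfo_url(definition_reference: str, definition_id: str,
                       installation_url: str = 'https://example.org/nomad/api/v1') -> str:
    if definition_reference.startswith('http'):
        # looks like a remote url; treat its netloc as the installation host
        parts = urlsplit(definition_reference)
        return urlunsplit((parts.scheme, parts.netloc, f'api/v1/metainfo/{definition_id}', '', ''))
    # fall back to the default installation_url
    return f'{installation_url}/metainfo/{definition_id}'

print(build_metainfo_url('https://other-oasis.example/prod/v1/api', 'deadbeef'))
# -> https://other-oasis.example/api/v1/metainfo/deadbeef
```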
......@@ -499,6 +499,10 @@ class _QuantityType(DataType):
else:
type_data = value.target_section_def.m_path()
from nomad import config
if config.process.store_package_definition_in_mongo:
type_data += f'@{value.target_section_def.definition_id}'
return dict(type_kind='reference', type_data=type_data)
if isinstance(value, DataType):
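This changes the serialized shape of reference-typed quantities: the target path gains an `@<definition_id>` suffix that pins it to an exact, hash-identified version of the section definition. An illustrative example (the 40-character id is made up):

```python
serialized_type = {
    'type_kind': 'reference',
    'type_data': '/definitions/section_definitions/1@1f6e3c5a9b0d4e2f8a7c6b5d4e3f2a1b0c9d8e7f',
}

# consumers split on '@' to recover the path and the pinned definition id
path, _, definition_id = serialized_type['type_data'].partition('@')
assert path == '/definitions/section_definitions/1'
assert len(definition_id) == 40  # illustrative 40-character hex digest
```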
......@@ -760,6 +764,13 @@ class _SectionReference(Reference):
return super().set_normalize(section, quantity_def, value)
def serialize(self, section: 'MSection', quantity_def: 'Quantity', value: Any) -> Any:
from nomad import config
def _append_definition_id(section_name) -> str:
if config.process.store_package_definition_in_mongo:
return f'{section_name}@{value.definition_id}'
return section_name
# First we try to use a potentially available Python name to serialize
if isinstance(value, Section):
pkg: MSection = value.m_root()
......@@ -767,7 +778,7 @@ class _SectionReference(Reference):
return f'{pkg.name}.{value.name}'
# Default back to URL
return super().serialize(section, quantity_def, value)
return _append_definition_id(super().serialize(section, quantity_def, value))
def deserialize(self, section: 'MSection', quantity_def: 'Quantity', value: Any) -> Any:
proxy_type = quantity_def.type if quantity_def else SectionReference
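The `_append_definition_id` helper in isolation, as a runnable sketch (flag and id are passed as parameters instead of read from config): every serialized section reference gains the same `@<definition_id>` suffix, whether it was serialized as a Python name or as a URL.

```python
def _append_definition_id(section_name: str, definition_id: str,
                          store_in_mongo: bool) -> str:
    # mirror of the nested helper above, with config and value as parameters
    if store_in_mongo:
        return f'{section_name}@{definition_id}'
    return section_name

assert _append_definition_id('my_pkg.Sample', 'id1', True) == 'my_pkg.Sample@id1'
assert _append_definition_id('my_pkg.Sample', 'id1', False) == 'my_pkg.Sample'
```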
......@@ -1065,8 +1076,17 @@ class Context():
raise NotImplementedError()
def resolve_section_definition(self, definition_reference: str, definition_id: str) -> Type[MSectionBound]:
pkg = Package.m_from_dict(self.retrieve_package_by_section_definition_id(definition_reference, definition_id))
pkg_definition = self.retrieve_package_by_section_definition_id(definition_reference, definition_id)
entry_id_based_name = pkg_definition['entry_id_based_name']
del pkg_definition['entry_id_based_name']
pkg = Package.m_from_dict(pkg_definition)
if entry_id_based_name != '*':
pkg.entry_id_based_name = entry_id_based_name
pkg.m_context = self
pkg.init_metainfo()
for section in pkg.section_definitions:
if section.definition_id == definition_id:
return section.section_cls
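A self-contained sketch of the resolution flow above, with `SimpleNamespace` standing in for the metainfo classes: pop the transported `entry_id_based_name` (keeping it unless it is the `'*'` placeholder), rebuild the package, then scan its section definitions for the matching hash id.

```python
from types import SimpleNamespace

def resolve_sketch(pkg_definition: dict, definition_id: str):
    # the transported name is an extra field, not part of the package schema
    entry_id_based_name = pkg_definition.pop('entry_id_based_name')
    pkg = SimpleNamespace(
        entry_id_based_name=None,
        section_definitions=[
            SimpleNamespace(definition_id=d['definition_id'], section_cls=d['name'])
            for d in pkg_definition['section_definitions']])
    if entry_id_based_name != '*':
        pkg.entry_id_based_name = entry_id_based_name
    for section in pkg.section_definitions:
        if section.definition_id == definition_id:
            return section.section_cls
    return None

doc = {'entry_id_based_name': 'entry_id:abc',
       'section_definitions': [{'definition_id': 'id1', 'name': 'Chemical'}]}
assert resolve_sketch(doc, 'id1') == 'Chemical'
```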
......@@ -1888,9 +1908,17 @@ class MSection(metaclass=MObjectMeta): # TODO find a way to make this a subclas
definition_name = self.m_def.qualified_name()
if definition_name.startswith('entry_id:'):
# This is not from a python module, use archive reference instead
definition_name = self.m_def.m_root().m_context.create_reference(self, None, self.m_def)
if process.add_definition_id_to_reference:
# two cases:
# 1. loaded from file, so archive.definitions.archive is set by the parser
# 2. loaded from versioned mongo, so entry_id_based_name is set from mongo
# the second case has no metadata, so no reference is created
context = self.m_def.m_root().m_context
if context:
relative_name = context.create_reference(self, None, self.m_def)
if relative_name:
definition_name = relative_name
if process.add_definition_id_to_reference and '@' not in definition_name:
definition_name += '@' + self.m_def.definition_id
return definition_name
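A runnable miniature of the fallback chain above (all names illustrative): prefer an archive reference created by the context, keep the qualified name when the context yields none (the mongo case), and append `@<id>` only when no id is present yet, so already-suffixed names are not suffixed twice.

```python
def build_definition_name(qualified_name: str, context_reference,
                          add_id: bool, definition_id: str) -> str:
    definition_name = qualified_name
    if qualified_name.startswith('entry_id:') and context_reference:
        # case 1: loaded from file, the context can create an archive reference
        definition_name = context_reference
    if add_id and '@' not in definition_name:
        # case 2 lands here with context_reference None: the entry-id name is kept
        definition_name += '@' + definition_id
    return definition_name

assert build_definition_name('my_pkg.Sample', None, True, 'id1') == 'my_pkg.Sample@id1'
assert build_definition_name('entry_id:abc.Sample', None, True, 'id1') == 'entry_id:abc.Sample@id1'
```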
......@@ -3627,6 +3655,7 @@ class Package(Definition):
super().__init__(*args, **kwargs)
self.errors, self.warnings = [], []
self.archive = None
self.entry_id_based_name = None
def __init_metainfo__(self):
super().__init_metainfo__()
......@@ -3683,13 +3712,22 @@ class Package(Definition):
return super(Package, cls).m_from_dict(data, **kwargs)
def qualified_name(self):
# packages loaded from files get a 'hot' qualified name based on the entry id
# this name is not serialized, which yields a '*' name when reloaded from cold storage
# we store this name in a plain `str` so it survives the round trip
# see Context.resolve_section_definition()
if self.entry_id_based_name:
return self.entry_id_based_name
if self.archive:
# If the package was defined within a regular uploaded archive file, we
# use its id, which is a globally unique identifier for the package.
if self.archive.metadata and self.archive.metadata.entry_id:
return f'entry_id:{self.archive.metadata.entry_id}'
self.entry_id_based_name = f'entry_id:{self.archive.metadata.entry_id}'
else:
return f'entry_id:*'
self.entry_id_based_name = f'entry_id:*'
return self.entry_id_based_name
return super().qualified_name()
......
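The caching pattern behind `entry_id_based_name`, reduced to a runnable sketch: compute the entry-id-based name once while the archive is available ("hot"), keep it in a plain attribute so it can be restored after a mongo round trip ("cold").

```python
class PackageSketch:
    def __init__(self, entry_id=None):
        self.entry_id_based_name = None  # survives serialization via mongo
        self.entry_id = entry_id

    def qualified_name(self) -> str:
        if self.entry_id_based_name:  # restored from cold storage
            return self.entry_id_based_name
        # computed while 'hot', i.e. while the archive context is available
        self.entry_id_based_name = (
            f'entry_id:{self.entry_id}' if self.entry_id else 'entry_id:*')
        return self.entry_id_based_name

pkg = PackageSketch('abc')
assert pkg.qualified_name() == 'entry_id:abc'

restored = PackageSketch()
restored.entry_id_based_name = 'entry_id:abc'  # as set by resolve_section_definition
assert restored.qualified_name() == 'entry_id:abc'
```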
......@@ -1182,12 +1182,18 @@ class Entry(Proc):
archive.processing_logs = filter_processing_logs(self._proc_logs)
if config.process.store_package_definition_in_mongo and archive.definitions is not None:
store_package_definition(archive.definitions, upload_id=archive.metadata.upload_id)
if config.process.store_package_definition_in_mongo:
if archive.definitions is not None:
store_package_definition(archive.definitions, upload_id=archive.metadata.upload_id)
if archive.data is not None:
pkg_definitions = getattr(archive.data.m_def.m_root(), 'definitions', None)
if pkg_definitions is not None:
store_package_definition(pkg_definitions, upload_id=archive.metadata.upload_id)
# save the archive msg-pack
try:
return self.upload_files.write_archive(self.entry_id, archive.m_to_dict())
return self.upload_files.write_archive(
self.entry_id, archive.m_to_dict(with_def_id=config.process.write_definition_id_to_archive))
except Exception as e:
# most likely failed due to domain data, try to write metadata and processing logs
archive = datamodel.EntryArchive(m_context=self.upload.archive_context)
......
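Illustrative before/after shapes of the stored archive document (the id value is made up): with `write_definition_id_to_archive` off only `m_def` is written; with it on, `m_def_id` appears alongside, which the tests below read back as `entry_data['m_def_id']`.

```python
# flag off: only the m_def reference is stored
without_def_id = {
    'data': {'m_def': '../upload/raw/schema.json#/definitions/section_definitions/1'}}

# flag on: the hash-based id is stored alongside (value illustrative)
with_def_id = {
    'data': {
        'm_def': '../upload/raw/schema.json#/definitions/section_definitions/1',
        'm_def_id': '1f6e3c5a9b0d4e2f8a7c6b5d4e3f2a1b0c9d8e7f'}}
```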
......@@ -15,11 +15,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
import os
from zipfile import ZipFile
import pytest
from nomad import config
from nomad.app.v1.routers.metainfo import store_package_definition
from nomad.metainfo import MSection
from nomad.datamodel import EntryArchive, ClientContext
from nomad.metainfo import MSection, MetainfoReferenceError
from nomad.utils import generate_entry_id
from tests.processing.test_data import run_processing
@pytest.mark.parametrize('metainfo_data', [
......@@ -44,7 +51,248 @@ def test_metainfo_section_id_endpoint(metainfo_data, mongo_infra, client):
response = client.get(f'metainfo/{section_id}')
assert response.status_code == 200
assert response.json()['data'] == metainfo_data
pkg_definition = response.json()['data']
del pkg_definition['entry_id_based_name']
assert pkg_definition == metainfo_data
response = client.get(f'metainfo/{section_id[::-1]}')
assert response.status_code == 404
def simple_schema(name: str):
return {
"name": "test schema package",
"definitions": {
"section_definitions": [
{
"base_sections": [
"nomad.datamodel.data.EntryData"
],
"name": "Chemical"
},
{
"base_sections": [
"nomad.datamodel.data.EntryData"
],
"name": "Sample",
"quantities": [
{
"name": name,
"type": {
"type_kind": "python",
"type_data": "str"
}
}
]
}
]
}
}
def simple_data(name: str):
return {
"data": {
"m_def": "../upload/raw/schema.json#/definitions/section_definitions/1",
name: "this is my name"
}
}
def test_upload_and_download(client, test_user, proc_infra, mongo_infra, no_warn, monkeypatch, tmp):
monkeypatch.setattr('nomad.config.process.store_package_definition_in_mongo', True)
monkeypatch.setattr('nomad.config.process.add_definition_id_to_reference', True)
monkeypatch.setattr('nomad.config.process.write_definition_id_to_archive', True)
def j(fn: str) -> str:
return os.path.join(tmp, fn)
schema_file_name = 'schema.json'
data_file_name = 'sample.archive.json'
archive_name = 'example_versioned_metainfo.zip'
# 1. generate version one with 'chemicals' quantity
# 2. upload and record version one
def pack_and_publish(name: str):
jschema = j(schema_file_name)
jdata = j(data_file_name)
jarchive = j(archive_name)
with open(jschema, 'w') as f:
json.dump(simple_schema(name), f)
with open(jdata, 'w') as f:
json.dump(simple_data(name), f)
with ZipFile(jarchive, 'w') as zipObj:
zipObj.write(jschema, arcname=schema_file_name)
zipObj.write(jdata, arcname=data_file_name)
return run_processing((name, jarchive), test_user, publish_directly=True)
processed = pack_and_publish('chemicals')
upload_id = processed.upload_id
response = client.get(f'entries/{generate_entry_id(upload_id, data_file_name)}/archive')
entry_data = response.json()['data']['archive']['data']
# check if 'chemicals' quantity is in the entry
assert 'chemicals' in entry_data
# check if package is stored in mongo
response = client.get(f'metainfo/{entry_data["m_def_id"]}')
assert response.status_code == 200
# 3. prepare a new entry that refers to the previously uploaded package
data_file_name = 'new_' + data_file_name
with open(j(data_file_name), 'w') as f:
data = simple_data('chemicals')
data['data']['m_def'] += f'@{entry_data["m_def_id"]}'
data['data']['chemicals'] = 'this is my new name'
json.dump(data, f)
processed = run_processing(
(data_file_name.replace('.json', ''), j(data_file_name)), test_user,
publish_directly=True)
response = client.get(f'entries/{generate_entry_id(processed.upload_id, data_file_name)}/archive')
new_entry_data = response.json()['data']['archive']['data']
# 4. check if 'chemicals' quantity is in the entry and has the correct value
assert 'chemicals' in new_entry_data
assert new_entry_data['chemicals'] == 'this is my new name'
# 5. test that the client side can read the entry using the versioned package
new_entry_data = EntryArchive.m_from_dict(data['data'], m_context=ClientContext())
assert new_entry_data.chemicals == 'this is my new name'
# 6. test that the client side detects a wrong package version
definition_reference, definition_id = data['data']['m_def'].split('@')
data['data']['m_def'] = f'{definition_reference}@{definition_id[::-1]}'
with pytest.raises(MetainfoReferenceError):
EntryArchive.m_from_dict(data['data'], m_context=ClientContext())
# 7. now test that the client side can read the entry using the non-versioned package
data['data']['m_def'] = f'/upload/{upload_id}/raw/schema.json#/definitions/section_definitions/1'
new_entry_data = EntryArchive.m_from_dict(data['data'], m_context=ClientContext())
assert new_entry_data.chemicals == 'this is my new name'
# 8. generate version two with 'toxicchemicals' quantity
processed = pack_and_publish('toxicchemicals')
response = client.get(f'entries/{generate_entry_id(processed.upload_id, data_file_name)}/archive')
new_entry_data = response.json()['data']['archive']['data']
# check if 'toxicchemicals' quantity is in the entry
assert 'toxicchemicals' in new_entry_data
# check that the two sections have different ids
assert entry_data["m_def_id"] != new_entry_data["m_def_id"]
@pytest.fixture(scope='function')
def example_upload_two_schemas():
return {
"schema_1": {
"name": "test schema package",
"definitions": {
"section_definitions": [
{
"base_sections": [
"nomad.datamodel.data.EntryData"
],
"name": "Chemical"
},
{
"base_sections": [
"nomad.datamodel.data.EntryData"
],
"name": "Sample"
}
]
}
},
"schema_2": {
"definitions": {
"section_definitions": [
{
"base_sections": [
"../upload/raw/schema_1.archive.json#/definitions/section_definitions/1"
],
"name": "Sample",
"quantities": [
{
"name": "chemical",
"type": {
"type_kind": "reference",
"type_data": "../upload/raw/schema_1.archive.json#/definitions/section_definitions/0"
}
}
]
}
]
}
},
"chemical": {
"data": {
"m_def": "../upload/raw/schema_1.archive.json#/definitions/section_definitions/0",
"name": "NaCl",
}
},
"sample": {
"data": {
"m_def": "../upload/raw/schema_2.archive.json#/definitions/section_definitions/0",
"name": "MySample",
"chemical": "../upload/raw/chemical.archive.json#/data"
}
}
}
def test_two_schemas(
example_upload_two_schemas, client, test_user, proc_infra, mongo_infra, no_warn, monkeypatch, raw_files_infra):
monkeypatch.setattr('nomad.config.process.store_package_definition_in_mongo', True)
monkeypatch.setattr('nomad.config.process.add_definition_id_to_reference', True)
monkeypatch.setattr('nomad.config.process.write_definition_id_to_archive', True)
def tmp(fn: str) -> str:
return os.path.join(config.fs.tmp, fn)
def public(fn: str) -> str:
return os.path.join(config.fs.public, f"ex/{fn.replace('.zip', '')}/raw-public.plain.zip")
archive_name = 'example_upload_two_schemas.zip'
# 1. pack and process initial archive containing two schemas
with ZipFile(tmp(archive_name), 'w') as zipObj:
for k, v in example_upload_two_schemas.items():
zipObj.writestr(f'{k}.archive.json', json.dumps(v))
processed = run_processing(
(archive_name.replace('.zip', ''), tmp(archive_name)), test_user,
publish_directly=True)
# 2. manually remove schema files
with ZipFile(public(archive_name), 'w') as zipObj:
for k, v in example_upload_two_schemas.items():
if 'schema' in k:
continue
zipObj.writestr(f'{k}.archive.json', json.dumps(v))
# retrieve the archive
response = client.get(f'entries/{generate_entry_id(processed.upload_id, "sample.archive.json")}/archive')
entry_data = response.json()['data']['archive']['data']
entry_data['chemical'] = f'/entries/{generate_entry_id(processed.upload_id, "chemical.archive.json")}/archive#/data'
# get data in absence of original schema files
entry = EntryArchive.m_from_dict(entry_data, m_context=ClientContext())
assert entry.m_def.name == 'Sample'
chemical = entry.chemical.m_proxy_resolve()
assert chemical.m_def.name == 'Chemical'
......@@ -112,7 +112,9 @@ def test_from_dict(metainfo_data, monkeypatch, mongo_infra):
section_id = package.section_definitions[0].definition_id
assert get_package_by_section_definition_id(section_id) == metainfo_data
pkg_definition = get_package_by_section_definition_id(section_id)
del pkg_definition['entry_id_based_name']
assert pkg_definition == metainfo_data
def test_with_meta(example):
......