Commit 12b7c869 authored by Markus Scheidgen

Added basic processing of from_oasis uploads.

parent 1b57c4d6
Pipeline #89138 passed with stages in 24 minutes and 12 seconds
......@@ -29,7 +29,7 @@ from elasticsearch.exceptions import NotFoundError
import elasticsearch.helpers
from datetime import datetime
from nomad import search, utils, datamodel, processing as proc, infrastructure, files
from nomad import search, utils, datamodel, processing as proc, infrastructure, files, metainfo
from nomad.datamodel import Dataset, User, EditableUserMetadata
from nomad.app import common
from nomad.app.common import RFC3339DateTime, DotKeyNested
......@@ -465,6 +465,7 @@ _repo_edit_model = api.model('RepoEdit', {
api.model('RepoEditActions', {
quantity.name: repo_edit_action_field(quantity)
for quantity in EditableUserMetadata.m_def.definitions
if isinstance(quantity, metainfo.Quantity)
}), skip_none=True,
description='Each action specifies a single value (even for multi valued quantities).'),
'success': fields.Boolean(description='If the overall edit can/could be done. Only in API response.'),
......
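A note on the added isinstance filter above: once OasisMetadata is categorized under EditableUserMetadata, iterating the category's definitions presumably yields the nested category definition as well as quantities, so the API model must only build edit actions for actual quantities. A minimal sketch of that filtering, using the imports shown in the first hunk:

    # sketch only: build the name -> quantity mapping while skipping nested categories
    editable_quantities = {
        definition.name: definition
        for definition in EditableUserMetadata.m_def.definitions
        if isinstance(definition, metainfo.Quantity)  # e.g. skip the OasisMetadata category itself
    }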
......@@ -91,8 +91,8 @@ from .dft import DFTMetadata
from .ems import EMSMetadata
from .qcms import QCMSMetadata
from .datamodel import (
Dataset, User, Author, EditableUserMetadata, UserProvidableMetadata, MongoMetadata,
EntryMetadata, EntryArchive)
Dataset, User, Author, EditableUserMetadata, UserProvidableMetadata, OasisMetadata,
MongoMetadata, EntryMetadata, EntryArchive)
from .optimade import OptimadeEntry, Species
from .metainfo import m_env
......
......@@ -239,6 +239,11 @@ class EditableUserMetadata(metainfo.MCategory):
m_def = metainfo.Category(categories=[UserProvidableMetadata])
class OasisMetadata(metainfo.MCategory):
''' NOMAD entry metadata quantities that can be provided by an OASIS. '''
m_def = metainfo.Category(categories=[EditableUserMetadata])
class MongoMetadata(metainfo.MCategory):
''' NOMAD entry quantities that are stored in mongodb and not necessarily in the archive. '''
pass
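For context on how the new category is used: in the metainfo, a quantity opts into a category through its categories argument, and because OasisMetadata is itself categorized under EditableUserMetadata, an OASIS-providable quantity is also user-editable. A minimal, hypothetical illustration (ExampleSection and example_id are made up, not part of the commit):

    class ExampleSection(metainfo.MSection):
        # hypothetical quantity, only to illustrate category membership
        example_id = metainfo.Quantity(type=str, categories=[OasisMetadata])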
......@@ -305,12 +310,14 @@ class EntryMetadata(metainfo.MSection):
upload_id = metainfo.Quantity(
type=str,
description='The persistent and globally unique identifier for the upload of the entry',
categories=[OasisMetadata],
a_search=Search(
many_or='append', group='uploads_grouped', metric_name='uploads', metric='cardinality'))
calc_id = metainfo.Quantity(
type=str,
description='A persistent and globally unique identifier for the entry',
categories=[OasisMetadata],
a_search=Search(many_or='append'))
calc_hash = metainfo.Quantity(
......@@ -375,7 +382,7 @@ class EntryMetadata(metainfo.MSection):
published = metainfo.Quantity(
type=bool, default=False,
description='Indicates if the entry is published',
categories=[MongoMetadata],
categories=[MongoMetadata, OasisMetadata],
a_search=Search())
processed = metainfo.Quantity(
......@@ -478,7 +485,7 @@ class EntryMetadata(metainfo.MSection):
a_search=Search())
upload_time = metainfo.Quantity(
type=metainfo.Datetime, categories=[MongoMetadata],
type=metainfo.Datetime, categories=[MongoMetadata, OasisMetadata],
description='The date and time this entry was uploaded to nomad',
a_flask=dict(admin_only=True),
a_search=Search(order_default=True))
......@@ -511,7 +518,7 @@ class EntryMetadata(metainfo.MSection):
a_search=Search(many_or='split'))
last_edit = metainfo.Quantity(
type=metainfo.Datetime, categories=[MongoMetadata],
type=metainfo.Datetime, categories=[MongoMetadata, OasisMetadata],
description='The date and time the user metadata was edited last',
a_search=Search())
......
......@@ -40,8 +40,7 @@ import hashlib
from structlog.processors import StackInfoRenderer, format_exc_info, TimeStamper
import yaml
import json
from cachetools import cached, LRUCache
from cachetools.keys import hashkey
from functools import lru_cache
from nomad import utils, config, infrastructure, search, datamodel
from nomad.files import (
......@@ -50,7 +49,7 @@ from nomad.files import (
from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing.parsers import parser_dict, match_parser
from nomad.normalizing import normalizers
from nomad.datamodel import EntryArchive, EditableUserMetadata
from nomad.datamodel import EntryArchive, EditableUserMetadata, OasisMetadata
from nomad.archive import query_archive, write_partial_archive_to_mongo, delete_partial_archives_from_mongo
from nomad.datamodel.encyclopedia import EncyclopediaMetadata
......@@ -61,26 +60,8 @@ section_workflow = datamodel.EntryArchive.section_workflow.name
_editable_metadata = {
quantity.name: quantity for quantity in EditableUserMetadata.m_def.definitions}
@cached(cache=LRUCache(maxsize=100), key=lambda path, *args, **kwargs: hashkey(path))
def metadata_file_cached(path, logger):
for ext in config.metadata_file_extensions:
full_path = '%s.%s' % (path, ext)
if os.path.isfile(full_path):
try:
with open(full_path) as f:
if full_path.endswith('.json'):
return json.load(f)
elif full_path.endswith('.yaml') or full_path.endswith('.yml'):
return yaml.load(f, Loader=getattr(yaml, 'FullLoader'))
else:
return {}
except Exception as e:
logger.warn('could not parse nomad.yaml/json', path=path, exc_info=e)
# ignore the file contents if the file is not parsable
pass
return {}
_oasis_metadata = {
quantity.name: quantity for quantity in OasisMetadata.m_def.definitions}
def _pack_log_event(logger, method_name, event_dict):
......@@ -557,27 +538,49 @@ class Calc(Proc):
metadata_dir = os.path.dirname(self.mainfile_file.os_path)
metadata = {}
metadata_path = None
while metadata_dir:
metadata_part = metadata_file_cached(
os.path.join(metadata_dir, metadata_file), logger)
while True:
# top-level nomad file can also contain an entries dict with entry
# metadata per mainfile as key
if metadata_dir == self.upload_files.os_path:
entries = metadata_part.get('entries', {})
metadata_part = entries.get(self.mainfile, {})
for key, val in metadata_part.items():
metadata.setdefault(key, val)
# consider the nomad file of the current directory
metadata_part = self.upload.metadata_file_cached(
os.path.join(metadata_dir, metadata_file))
for key, val in metadata_part.items():
metadata.setdefault(key, val)
metadata_dir = os.path.dirname(metadata_dir)
if metadata_path is not None:
if metadata_dir == self.upload_files.os_path:
break
if len(metadata_dir) > 0:
metadata_dir = os.path.dirname(metadata_dir)
if len(metadata) > 0:
logger.info('Apply user metadata from nomad.yaml/json file')
for key, val in metadata.items():
if key == 'entries':
continue
definition = _editable_metadata.get(key, None)
if not definition:
logger.warn('Cannot set metadata %s' % key)
if definition is None and self.upload.from_oasis:
definition = _oasis_metadata.get(key, None)
if definition is None:
logger.warn('Users cannot set metadata', quantity=key)
continue
self._entry_metadata.m_set(definition, val)
try:
self._entry_metadata.m_set(definition, val)
if definition == datamodel.EntryMetadata.calc_id:
self.calc_id = val
except Exception as e:
logger.error(
'Could not apply user metadata from nomad.yaml/json file',
quantity=definition.name, exc_info=e)
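To summarize what the reworked loop above does: nomad.yaml/json files are read from the mainfile's directory upwards to the upload root, values found closer to the mainfile win because setdefault never overwrites, and the top-level file can additionally carry per-mainfile metadata under an 'entries' key. A rough, self-contained sketch of that precedence (collect_entry_metadata and load_metadata are hypothetical names, not the actual method):

    import os

    def collect_entry_metadata(mainfile_dir, upload_root, mainfile, load_metadata):
        # load_metadata(path_without_extension) -> dict, e.g. Upload.metadata_file_cached
        metadata = {}
        metadata_dir = mainfile_dir
        while True:
            metadata_part = load_metadata(os.path.join(metadata_dir, 'nomad'))
            if metadata_dir == upload_root:
                # the top-level nomad file may hold per-mainfile metadata under 'entries'
                for key, val in metadata_part.get('entries', {}).get(mainfile_dir, {}).items():
                    metadata.setdefault(key, val)
            for key, val in metadata_part.items():
                metadata.setdefault(key, val)  # values from deeper directories take precedence
            if metadata_dir == upload_root:
                break
            metadata_dir = os.path.dirname(metadata_dir)
        # note: the 'entries' key itself is skipped later when the metadata is applied
        return metadata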
@task
def archiving(self):
......@@ -680,6 +683,7 @@ class Upload(Proc):
publish_time = DateTimeField()
last_update = DateTimeField()
from_oasis = BooleanField(default=False)
joined = BooleanField(default=False)
meta: Any = {
......@@ -692,6 +696,25 @@ class Upload(Proc):
super().__init__(**kwargs)
self._upload_files: ArchiveBasedStagingUploadFiles = None
@lru_cache()
def metadata_file_cached(self, path):
for ext in config.metadata_file_extensions:
full_path = '%s.%s' % (path, ext)
if os.path.isfile(full_path):
try:
with open(full_path) as f:
if full_path.endswith('.json'):
return json.load(f)
elif full_path.endswith('.yaml') or full_path.endswith('.yml'):
return yaml.load(f, Loader=getattr(yaml, 'FullLoader'))
else:
return {}
except Exception as e:
self.get_logger().warn('could not parse nomad.yaml/json', path=path, exc_info=e)
# ignore the file contents if the file is not parsable
pass
return {}
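A note on the functools.lru_cache used for metadata_file_cached: applied to an instance method it caches per (self, path), so each nomad.yaml/json is parsed at most once per upload and process; the trade-off is that the cache keeps references to Upload instances, which is why the new test clears it explicitly before running:

    # reset the method-level cache between tests (as done in test_oasis_upload_processing below)
    Upload.metadata_file_cached.cache_clear()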
@property
def metadata(self) -> dict:
'''
......@@ -1060,12 +1083,26 @@ class Upload(Proc):
'''
logger = self.get_logger()
oasis_metadata = {}
if self.from_oasis:
oasis_metadata = self.metadata_file_cached(
os.path.join(self.upload_files.os_path, 'raw', config.metadata_file_name)).get('entries', {})
with utils.timer(
logger, 'upload extracted', step='matching',
upload_size=self.upload_files.size):
for filename, parser in self.match_mainfiles():
oasis_entry_metadata = oasis_metadata.get(filename)
if oasis_entry_metadata is not None:
calc_id = oasis_entry_metadata.get('calc_id')
if calc_id is None:
logger.warn('Oasis entry without id', mainfile=filename)
calc_id = self.upload_files.calc_id(filename)
else:
calc_id = self.upload_files.calc_id(filename)
calc = Calc.create(
calc_id=self.upload_files.calc_id(filename),
calc_id=calc_id,
mainfile=filename, parser=parser.name,
worker_hostname=self.worker_hostname,
upload_id=self.upload_id)
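The calc_id handling introduced here boils down to: a from_oasis upload can dictate each entry's id through the 'entries' section of the top-level nomad.json (so entries keep the id they had on the originating OASIS), falling back to the usual id derived from the upload files. As a hypothetical standalone helper, the decision looks roughly like this:

    def resolve_calc_id(filename, oasis_metadata, upload_files, logger):
        # oasis_metadata is the 'entries' dict of the top-level nomad file, keyed by mainfile
        oasis_entry_metadata = oasis_metadata.get(filename)
        if oasis_entry_metadata is not None:
            calc_id = oasis_entry_metadata.get('calc_id')
            if calc_id is not None:
                return calc_id
            logger.warn('Oasis entry without id', mainfile=filename)
        return upload_files.calc_id(filename)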
......@@ -1145,6 +1182,47 @@ class Upload(Proc):
# don't fail or present this error to clients
self.logger.error('could not send after processing email', exc_info=e)
def _cleanup_after_processing_oasis_upload(self):
'''
Moves the upload out of staging to the public area. It will
pack the staging upload files into public upload files.
'''
assert self.processed_calcs > 0
logger = self.get_logger()
logger.info('started to publish oasis upload')
with utils.lnr(logger, 'publish failed'):
metadata = self.metadata_file_cached(
os.path.join(self.upload_files.os_path, 'raw', config.metadata_file_name))
with self.entries_metadata(self.metadata) as calcs:
with utils.timer(
logger, 'staged upload files packed', step='pack',
upload_size=self.upload_files.size):
self.upload_files.pack(calcs)
with utils.timer(
logger, 'staged upload deleted', step='delete staged',
upload_size=self.upload_files.size):
self.upload_files.delete()
if metadata is not None:
self.publish_time = metadata.get('publish_time')
self.upload_time = metadata.get('upload_time')
if self.publish_time is None:
self.publish_time = datetime.utcnow()
logger.warn('oasis upload without publish time')
if self.upload_time is None:
self.upload_time = datetime.utcnow()
logger.warn('oasis upload without upload time')
self.published = True
self.last_update = datetime.utcnow()
self.save()
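For reference, the top-level nomad.json an OASIS would ship might look as follows, here as a Python dict mirroring the keys read above and the one constructed in the test below; all values are illustrative, and publish_time is optional (it defaults to now with a warning):

    oasis_metadata_example = {
        'upload_id': 'some_oasis_upload_id',            # illustrative id
        'upload_time': '2020-01-01 00:00:00',
        'publish_time': '2020-01-02 00:00:00',          # optional
        'published': True,
        'entries': {
            'examples_template/template.json': {'calc_id': 'some_oasis_calc_id'}
        }
    }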
def _cleanup_after_re_processing(self):
logger = self.get_logger()
if self.published:
......@@ -1179,7 +1257,10 @@ class Upload(Proc):
if self.current_process == 're_process_upload':
self._cleanup_after_re_processing()
else:
self._cleanup_after_processing()
if self.from_oasis:
self._cleanup_after_processing_oasis_upload()
else:
self._cleanup_after_processing()
def get_calc(self, calc_id) -> Calc:
''' Returns the upload calc with the given id or ``None``. '''
......
......@@ -22,6 +22,7 @@ from datetime import datetime
import os.path
import re
import shutil
import json
from nomad import utils, infrastructure, config
from nomad.archive import read_partial_archive_from_mongo
......@@ -204,6 +205,46 @@ def test_publish_failed(
assert_search_upload(entries, additional_keys, published=True, processed=False)
@pytest.mark.timeout(5)
def test_oasis_upload_processing(proc_infra, non_empty_uploaded: Tuple[str, str], test_user):
Upload.metadata_file_cached.cache_clear()
from shutil import copyfile
import zipfile
uploaded_id, uploaded_path = non_empty_uploaded
uploaded_zipfile = os.path.join(config.fs.tmp, 'upload.zip')
copyfile(uploaded_path, uploaded_zipfile)
metadata = {
'upload_id': uploaded_id,
'upload_time': '2020-01-01 00:00:00',
'published': True,
'entries': {
'examples_template/template.json': {
'calc_id': 'test_calc_id'
}
}
}
with zipfile.ZipFile(uploaded_zipfile, 'a') as zf:
with zf.open('nomad.json', 'w') as f:
f.write(json.dumps(metadata).encode())
upload = Upload.create(
upload_id=uploaded_id, user=test_user, upload_path=uploaded_zipfile)
upload.from_oasis = True
assert upload.tasks_status == 'RUNNING'
assert upload.current_task == 'uploading'
upload.process_upload() # pylint: disable=E1101
upload.block_until_complete(interval=.01)
assert upload.published
assert str(upload.upload_time) == metadata['upload_time']
assert_processing(upload, published=True)
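A small detail in the assertion above: str(upload.upload_time) == metadata['upload_time'] only works because '2020-01-01 00:00:00' is exactly Python's default str() rendering of the corresponding datetime, assuming the stored value comes back as a plain datetime without microseconds or timezone:

    from datetime import datetime

    assert str(datetime(2020, 1, 1, 0, 0, 0)) == '2020-01-01 00:00:00'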
@pytest.mark.timeout(config.tests.default_timeout)
def test_processing_with_warning(proc_infra, test_user, with_warn):
example_file = 'tests/data/proc/examples_with_warning_template.zip'
......