Commit 84eebe94 authored by Markus Scheidgen

Replaced metadata.json with mongodb #130.

parent 414320ce
Pipeline #45653 passed with stages in 27 minutes and 10 seconds
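
The gist of the change: per-calc repository metadata no longer lives in per-upload metadata.json(.gz) files behind the Metadata/StagingMetadata/PublicMetadata classes, but in a DictField on the Calc document in MongoDB, written at processing time and bulk-updated at publish time. A minimal sketch of the new storage pattern, with field names taken from this diff (the connection and the query helper are illustrative, not part of the commit):

    from mongoengine import Document, StringField, DictField, connect

    connect('nomad')  # assumed database name

    class Calc(Document):
        calc_id = StringField(primary_key=True)
        upload_id = StringField()
        metadata = DictField()  # replaces the per-upload metadata.json store

    def upload_metadata(upload_id: str) -> dict:
        # one mongo query instead of opening and unzipping metadata.json.gz
        return {calc.calc_id: calc.metadata for calc in Calc.objects(upload_id=upload_id)}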
@@ -20,12 +20,12 @@ meta-data.
 from flask_restplus import Resource, abort, fields
 from flask import request, g
 from elasticsearch_dsl import Q
+from elasticsearch.exceptions import NotFoundError
-from nomad.files import UploadFiles, Restricted
 from nomad import search
 from .app import api
-from .auth import login_if_available, create_authorization_predicate
+from .auth import login_if_available
 from .common import pagination_model, pagination_request_parser, calc_route

 ns = api.namespace('repo', description='Access repository metadata.')
@@ -42,21 +42,28 @@ class RepoCalcResource(Resource):
         """
         Get calculation metadata in repository form.

-        Repository metadata only entails the quanties shown in the repository.
+        Repository metadata only entails the quantities shown in the repository.
         Calcs are referenced via *upload_id*, *calc_id* pairs.
         """
-        # TODO use elastic search instead of the files
-        # TODO add missing user metadata (from elastic or repo db)
-        upload_files = UploadFiles.get(upload_id, create_authorization_predicate(upload_id, calc_id))
-        if upload_files is None:
-            abort(404, message='There is no upload %s' % upload_id)
         try:
-            return upload_files.metadata.get(calc_id), 200
-        except Restricted:
-            abort(401, message='Not authorized to access %s/%s.' % (upload_id, calc_id))
-        except KeyError:
-            abort(404, message='There is no calculation for %s/%s' % (upload_id, calc_id))
+            calc = search.Entry.get(calc_id)
+        except NotFoundError:
+            abort(404, message='There is no calculation %s/%s' % (upload_id, calc_id))
+
+        if calc.with_embargo or not calc.published:
+            if g.user is None:
+                abort(401, message='Not logged in to access %s/%s.' % (upload_id, calc_id))
+
+            is_owner = g.user.user_id == 0
+            if not is_owner:
+                for owner in calc.owners:
+                    if owner.user_id == str(g.user.user_id):
+                        is_owner = True
+                        break
+            if not is_owner:
+                abort(401, message='Not authorized to access %s/%s.' % (upload_id, calc_id))
+
+        return calc.to_dict(), 200


 repo_calcs_model = api.model('RepoCalculations', {
......
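
For orientation, a hedged sketch of how a client now hits the endpoint above (the exact route shape is assumed from the 'repo' namespace and the *upload_id*/*calc_id* pair in the docstring; the status codes come from the aborts in the diff):

    import requests

    upload_id, calc_id = 'some-upload-id', 'some-calc-id'  # placeholders
    resp = requests.get('https://example.org/api/repo/%s/%s' % (upload_id, calc_id))

    if resp.status_code == 404:
        pass  # calc_id not found in the elasticsearch index
    elif resp.status_code == 401:
        pass  # embargoed or unpublished, and caller is not owner/coauthor/shared-with
    else:
        repo_metadata = resp.json()  # the search.Entry document as a dict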
@@ -366,6 +366,8 @@ class UploadResource(Resource):
             abort(400, message='The upload is not processed yet')
         if upload.tasks_status == FAILURE:
             abort(400, message='Cannot publish an upload that failed processing')
+        if upload.processed_calcs == 0:
+            abort(400, message='Cannot publish an upload without calculations')
         try:
             upload.metadata = metadata
             upload.publish_upload()
......
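
A hedged sketch of what a client now sees when publishing (route and payload shape are assumptions, not taken from this diff; the new 400 for calc-less uploads comes from the check above):

    import requests

    upload_id = 'some-upload-id'  # placeholder
    resp = requests.post('https://example.org/api/uploads/%s' % upload_id,
                         json={'operation': 'publish'})  # assumed payload
    if resp.status_code == 400:
        print(resp.json()['message'])  # e.g. 'Cannot publish an upload without calculations'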
@@ -237,7 +237,7 @@ class Calc(Base):
         # user relations
         def add_users_to_relation(source_users, relation):
             for source_user in source_users:
-                coe_user = context.cache(User, user_id=source_user.id)
+                coe_user = context.cache(User, user_id=int(source_user.id))
                 if coe_user is None:
                     raise IllegalCalcMetadata(
                         'User with user_id %s does not exist.' % source_user.id)
......
@@ -145,6 +145,7 @@ console_log_level = logging.WARNING
 service = 'unknown nomad service'
 release = 'devel'
 auxfile_cutoff = 30
+version = '4.3'  # TODO replace with git hash?

 def get_loglevel_from_env(key, default_level=logging.INFO):
......
@@ -31,7 +31,7 @@ classes. These are the implemented transformations:
 .. image:: datamodel_transformations.png
 """

-from typing import Iterable, List
+from typing import Iterable, List, Dict
 import datetime

 from nomad import utils
@@ -47,11 +47,15 @@ class UploadWithMetadata():
         self.uploader: utils.POPO = None
         self.upload_time: datetime.datetime = None

-        self.calcs: Iterable[CalcWithMetadata] = list()
+        self.calcs: Iterable['CalcWithMetadata'] = list()

         for key, value in kwargs.items():
             setattr(self, key, value)

+    @property
+    def calcs_dict(self) -> Dict[str, 'CalcWithMetadata']:
+        return {calc.calc_id: calc for calc in self.calcs}
+

 class CalcWithMetadata():
     """
@@ -63,12 +67,19 @@ class CalcWithMetadata():
     Attributes:
         upload_id: The ``upload_id`` of the calculation's upload (random UUID).
         calc_id: The unique mainfile based calculation id.
-        upload_time: The time when the calc was uploaded.
         calc_hash: The raw file content based checksum/hash of this calculation.
         pid: The unique persistent id of this calculation.
         mainfile: The upload relative mainfile path.
         files: A list of all files, relative to upload.
+        upload_time: The time when the calc was uploaded.
         uploader: An object describing the uploading user, has at least ``user_id``
+        processed: Boolean indicating if this calc was successfully processed and archive
+            data and calc metadata is available.
+        last_processing: A datetime with the time of the last successful processing.
+        nomad_version: A string that describes the version of the nomad software that was
+            used to do the last successful processing.
         with_embargo: Show if user set an embargo on the calculation.
         coauthors: List of coauthor user objects with at least ``user_id``.
         shared_with: List of users this calc's ownership is shared with, objects with at least ``user_id``.
@@ -76,6 +87,7 @@ class CalcWithMetadata():
         references: Objects describing user provided references, keys are ``id`` and ``value``.
         datasets: Objects describing the datasets, keys are ``id``, ``name``, ``doi``.
            DOI is optional, is an object with keys ``id``, ``value``.
+        formula: The chemical formula.
         atoms: A list of all atoms, as labels. All atoms means the whole composition, with atom labels repeated.
         basis_set: The basis set type of this calculation.
@@ -87,16 +99,22 @@
         code_version: The version of the used code.
     """
     def __init__(self, **kwargs):
+        # id relevant metadata
         self.upload_id: str = None
         self.calc_id: str = None
-        self.upload_time: datetime.datetime = None
         self.calc_hash: str = None
-        self.pid: int = None
         self.mainfile: str = None
+        self.pid: int = None

+        # basic upload and processing related metadata
+        self.upload_time: datetime.datetime = None
         self.files: List[str] = None
         self.uploader: utils.POPO = None
+        self.processed: bool = False
+        self.last_processing: datetime.datetime = None
+        self.nomad_version: str = None

+        # user metadata, i.e. quantities given and editable by the user
         self.with_embargo: bool = None
         self.published: bool = False
         self.coauthors: List[utils.POPO] = []
@@ -105,6 +123,7 @@
         self.references: List[utils.POPO] = []
         self.datasets: List[utils.POPO] = []

+        # DFT specific calc metadata, derived from raw data through successful processing
        self.formula: str = None
         self.atoms: List[str] = []
         self.basis_set: str = None
@@ -116,6 +135,7 @@
         self.code_name: str = None
         self.code_version: str = None

+        # temporary reference to the backend after successful processing
         self.backend = None

         self.update(**kwargs)
@@ -128,6 +148,12 @@
     def update(self, **kwargs):
         for key, value in kwargs.items():
+            if isinstance(value, list):
+                if len(value) > 0 and isinstance(value[0], dict) and not isinstance(value[0], utils.POPO):
+                    value = list(utils.POPO(**item) for item in value)
+            if isinstance(value, dict) and not isinstance(value, utils.POPO):
+                value = utils.POPO(**value)
+
             setattr(self, key, value)

     def apply_user_metadata(self, metadata: dict):
@@ -139,13 +165,13 @@
         self.upload_time = metadata.get('_upload_time')

         uploader_id = metadata.get('_uploader')
         if uploader_id is not None:
-            self.uploader = utils.POPO(id=uploader_id)
+            self.uploader = utils.POPO(id=int(uploader_id))

         self.references = [utils.POPO(value=ref) for ref in metadata.get('references', [])]
         self.with_embargo = metadata.get('with_embargo', False)
         self.coauthors = [
-            utils.POPO(id=user) for user in metadata.get('coauthors', [])]
+            utils.POPO(id=int(user)) for user in metadata.get('coauthors', [])]
         self.shared_with = [
-            utils.POPO(id=user) for user in metadata.get('shared_with', [])]
+            utils.POPO(id=int(user)) for user in metadata.get('shared_with', [])]
         self.datasets = [
-            utils.POPO(id=ds['id'], doi=utils.POPO(value=ds.get('_doi')), name=ds.get('_name'))
+            utils.POPO(id=int(ds['id']), doi=utils.POPO(value=ds.get('_doi')), name=ds.get('_name'))
             for ds in metadata.get('datasets', [])]
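
The update() normalization above turns plain dicts, as they come back from Mongo, into attribute-accessible objects. A hedged usage example, assuming utils.POPO behaves as nomad's dict-with-attribute-access helper:

    from nomad.datamodel import CalcWithMetadata

    calc = CalcWithMetadata(
        calc_id='calc-1',
        uploader={'id': 1},                # plain dict -> utils.POPO
        coauthors=[{'id': 2}, {'id': 3}])  # list of dicts -> list of POPOs

    assert calc.uploader.id == 1           # attribute access after conversion
    assert [u.id for u in calc.coauthors] == [2, 3]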
@@ -52,8 +52,7 @@ being other mainfiles. Therefore, the aux files of a restricted calc might become
 """

 from abc import ABCMeta
-from typing import IO, Generator, Dict, Iterator, Iterable, Callable
-import json
+from typing import IO, Generator, Dict, Iterable, Callable
 import os.path
 import os
 import shutil
@@ -61,9 +60,9 @@ from zipfile import ZipFile, BadZipFile
 import tarfile
 import hashlib
 import io
-import gzip

 from nomad import config, utils
+from nomad.datamodel import UploadWithMetadata


 class PathObject:
@@ -155,117 +154,6 @@ class ExtractError(Exception):
     pass


-class Metadata(metaclass=ABCMeta):
-    """
-    An ABC for upload metadata classes that encapsulates access to a set of calc metadata.
-    """
-    def get(self, calc_id: str) -> dict:
-        """ Retrive the calc metadata for a given calc. """
-        raise NotImplementedError()
-
-    def __iter__(self) -> Iterator[dict]:
-        raise NotImplementedError()
-
-    def __len__(self) -> int:
-        raise NotImplementedError()
-
-
-class StagingMetadata(Metadata):
-    """
-    A Metadata implementation based on individual .json files per calc stored in a given
-    directory.
-
-    Arguments:
-        directory: The DirectoryObject for the directory to store the metadata in.
-    """
-    def __init__(self, directory: DirectoryObject) -> None:
-        self._dir = directory
-
-    def remove(self, calc: dict) -> None:
-        id = calc['calc_id']
-        path = self._dir.join_file('%s.json' % id)
-        assert path.exists()
-        os.remove(path.os_path)
-
-    def insert(self, calc: dict) -> None:
-        """ Insert a calc, using calc_id as key. """
-        id = calc['calc_id']
-        path = self._dir.join_file('%s.json' % id)
-        assert not path.exists()
-        with open(path.os_path, 'wt') as f:
-            json.dump(calc, f, sort_keys=True, default=str)
-
-    def update(self, calc_id: str, updates: dict) -> dict:
-        """ Updating a calc, using calc_id as key and running dict update with the given data. """
-        metadata = self.get(calc_id)
-        metadata.update(updates)
-        path = self._dir.join_file('%s.json' % calc_id)
-        with open(path.os_path, 'wt') as f:
-            json.dump(metadata, f, sort_keys=True, default=str)
-        return metadata
-
-    def get(self, calc_id: str) -> dict:
-        try:
-            with open(self._dir.join_file('%s.json' % calc_id).os_path, 'rt') as f:
-                return json.load(f)
-        except FileNotFoundError:
-            raise KeyError()
-
-    def __iter__(self) -> Iterator[dict]:
-        for root, _, files in os.walk(self._dir.os_path):
-            for file in files:
-                with open(os.path.join(root, file), 'rt') as f:
-                    yield json.load(f)
-
-    def __len__(self) -> int:
-        return len(os.listdir(self._dir.os_path))
-
-
-class PublicMetadata(Metadata):
-    """
-    A Metadata implementation based on a single .json file.
-
-    Arguments:
-        path: The parent directory for the metadata and lock file.
-    """
-    def __init__(self, path: str, lock_timeout=1) -> None:
-        self._db_file = os.path.join(path, 'metadata.json.gz')
-        self._modified = False
-        self._data: Dict[str, dict] = None
-
-    @property
-    def data(self):
-        if self._data is None:
-            with gzip.open(self._db_file, 'rt') as f:
-                self._data = json.load(f)
-        return self._data
-
-    def _create(self, calcs: Iterable[dict]) -> None:
-        assert not os.path.exists(self._db_file) and self._data is None
-        self._data = {data['calc_id']: data for data in calcs}
-        with gzip.open(self._db_file, 'wt') as f:
-            json.dump(self._data, f, sort_keys=True, default=str)
-
-    def insert(self, calc: dict) -> None:
-        assert self.data is not None, "Metadata is not open."
-        id = calc['calc_id']
-        assert id not in self.data
-        self.data[id] = calc
-        self._modified = True
-
-    def update(self, calc_id: str, updates: dict) -> dict:
-        raise NotImplementedError
-
-    def get(self, calc_id: str) -> dict:
-        return self.data[calc_id]
-
-    def __iter__(self) -> Iterator[dict]:
-        return self.data.values().__iter__()
-
-    def __len__(self) -> int:
-        return len(self.data)
-
-
 class Restricted(Exception):
     pass
@@ -297,11 +185,6 @@ class UploadFiles(DirectoryObject, metaclass=ABCMeta):
         else:
             return None

-    @property
-    def metadata(self) -> Metadata:
-        """ The calc metadata for this upload. """
-        raise NotImplementedError
-
     def raw_file(self, file_path: str, *args, **kwargs) -> IO:
         """
         Opens a raw file and returns a file-like object. Additional args, kwargs are
@@ -357,21 +240,12 @@ class StagingUploadFiles(UploadFiles):
         self._archive_dir = self.join_dir('archive')
         self._frozen_file = self.join_file('.frozen')

-        metadata_dir = self.join_dir('metadata')
-        self._metadata = StagingMetadata(metadata_dir)
-
         self._size = 0

     @property
     def size(self) -> int:
         return self._size

-    @property
-    def metadata(self) -> StagingMetadata:
-        if not self._is_authorized():
-            raise Restricted
-        return self._metadata
-
     def _file(self, path_object: PathObject, *args, **kwargs) -> IO:
         try:
             return open(path_object.os_path, *args, **kwargs)
@@ -456,13 +330,14 @@ class StagingUploadFiles(UploadFiles):
         """ Returns True if this upload is already *bagged*. """
         return self._frozen_file.exists()

-    def pack(self, bagit_metadata: dict = None) -> None:
+    def pack(self, upload: UploadWithMetadata) -> None:
         """
         Replaces the staging upload data with a public upload record by packing all
         data into files. It is only available if upload *is_bag*.
         This is potentially a long running operation.

         Arguments:
-            bagit_metadata: Additional data added to the bagit metadata.
+            calcs: The calculation metadata of the upload used to determine what files to
+                pack and what the embargo situation is.
         """
         self.logger.debug('started to pack upload')
@@ -491,16 +366,16 @@
         # 1. add all public raw files
         # 1.1 collect all public mainfiles and aux files
         public_files: Dict[str, str] = {}
-        for calc in self.metadata:
-            if not calc.get('with_embargo', False):
-                mainfile = calc['mainfile']
+        for calc in upload.calcs:
+            if not calc.with_embargo:
+                mainfile = calc.mainfile
                 assert mainfile is not None
                 for filepath in self.calc_files(mainfile):
                     public_files[filepath] = None
         # 1.2 remove the non public mainfiles that have been added as auxfiles of public mainfiles
-        for calc in self.metadata:
-            if calc.get('with_embargo', False):
-                mainfile = calc['mainfile']
+        for calc in upload.calcs:
+            if calc.with_embargo:
+                mainfile = calc.mainfile
                 assert mainfile is not None
                 if mainfile in public_files:
                     del(public_files[mainfile])
@@ -521,13 +396,13 @@
         archive_public_zip = create_zipfile('archive', 'public', self._archive_ext)
         archive_restricted_zip = create_zipfile('archive', 'restricted', self._archive_ext)

-        for calc in self.metadata:
-            archive_zip = archive_restricted_zip if calc.get('with_embargo', False) else archive_public_zip
+        for calc in upload.calcs:
+            archive_zip = archive_restricted_zip if calc.with_embargo else archive_public_zip

-            archive_filename = '%s.%s' % (calc['calc_id'], self._archive_ext)
+            archive_filename = '%s.%s' % (calc.calc_id, self._archive_ext)
             archive_zip.write(self._archive_dir.join_file(archive_filename).os_path, archive_filename)

-            archive_log_filename = '%s.%s' % (calc['calc_id'], 'log')
+            archive_log_filename = '%s.%s' % (calc.calc_id, 'log')
             log_file = self._archive_dir.join_file(archive_log_filename)
             if log_file.exists():
                 archive_zip.write(log_file.os_path, archive_log_filename)
@@ -536,11 +411,6 @@
         archive_public_zip.close()
         self.logger.debug('packed archives')

-        # pack metadata
-        packed_metadata = PublicMetadata(target_dir.os_path)
-        packed_metadata._create(self._metadata)
-        self.logger.debug('packed metadata')
-
         self.logger.debug('packed upload')

     def raw_file_manifest(self, path_prefix: str = None) -> Generator[str, None, None]:
@@ -650,12 +520,6 @@ class PublicUploadFiles(UploadFiles):
     def __init__(self, *args, **kwargs) -> None:
         super().__init__(config.fs.public, *args, **kwargs)

-        self._metadata = PublicMetadata(self.os_path)
-
-    @property
-    def metadata(self) -> Metadata:
-        return self._metadata
-
     def _file(self, prefix: str, ext: str, path: str, *args, **kwargs) -> IO:
         mode = kwargs.get('mode') if len(args) == 0 else args[0]
         if 'mode' in kwargs:
......
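
With the Metadata classes gone, pack() no longer reads a metadata store of its own; the caller hands it the calc metadata. A hypothetical sketch of the new call (UploadWithMetadata and CalcWithMetadata come from nomad.datamodel as imported above; the staging_upload_files object and all values are placeholders):

    from nomad.datamodel import UploadWithMetadata, CalcWithMetadata

    upload = UploadWithMetadata(
        upload_id='some-upload-id',
        calcs=[CalcWithMetadata(calc_id='calc-1', mainfile='vasp.xml', with_embargo=False)])

    # pack() splits raw and archive files into public/restricted zips based on
    # each calc's with_embargo flag, instead of consulting StagingMetadata.
    staging_upload_files.pack(upload)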
@@ -24,17 +24,19 @@ calculations, and files
 :members:
 """

-from typing import List, Any, ContextManager, Tuple, Generator, Dict
+from typing import List, Any, ContextManager, Tuple, Generator, Dict, cast
 from mongoengine import StringField, DateTimeField, DictField, BooleanField
 import logging
 from structlog import wrap_logger
 from contextlib import contextmanager
 import os.path
+from datetime import datetime
+from pymongo import UpdateOne

 from nomad import utils, coe_repo, config, infrastructure, search
-from nomad.files import PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles
+from nomad.files import PathObject, UploadFiles, ExtractError, ArchiveBasedStagingUploadFiles, PublicUploadFiles
 from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE
-from nomad.parsing import parser_dict, match_parser
+from nomad.parsing import parser_dict, match_parser, LocalBackend
 from nomad.normalizing import normalizers
 from nomad.datamodel import UploadWithMetadata, CalcWithMetadata
@@ -54,12 +56,16 @@ class Calc(Proc):
         parser: the name of the parser used to process this calc
         upload_id: the id of the upload used to create this calculation
         mainfile: the mainfile (including path in upload) that was used to create this calc
+        metadata: the metadata record with calc and user metadata, see :class:`CalcWithMetadata`
     """
     calc_id = StringField(primary_key=True)
     upload_id = StringField()
     mainfile = StringField()
     parser = StringField()
+    metadata = DictField()

     queue = 'calcs'

     meta: Any = {
@@ -70,7 +76,7 @@
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
-        self._parser_backend = None
+        self._parser_backend: LocalBackend = None
         self._upload: Upload = None
         self._upload_files: ArchiveBasedStagingUploadFiles = None
         self._calc_proc_logwriter = None
@@ -261,14 +267,19 @@
         logger = self.get_logger()

         calc_with_metadata = self._parser_backend.to_calc_with_metadata()
+        calc_with_metadata.published = False
+        calc_with_metadata.uploader = self.upload.uploader.to_popo()
+        calc_with_metadata.processed = True
+        calc_with_metadata.last_processing = datetime.now()
+        calc_with_metadata.nomad_version = config.version

         # persist the repository metadata
         with utils.timer(logger, 'saved repo metadata', step='metadata'):
-            self.upload_files.metadata.insert(calc_with_metadata.to_dict())
+            self.metadata = calc_with_metadata.to_dict()
+            self.save()

         # index in search
         with utils.timer(logger, 'indexed', step='index'):
-            calc_with_metadata.update(published=False, uploader=self.upload.uploader.to_popo())
             search.Entry.from_calc_with_metadata(calc_with_metadata).save()

         # persist the archive
@@ -318,6 +329,8 @@ class Upload(Proc):
     metadata = DictField(default=None)
     upload_time = DateTimeField()
     user_id = StringField(required=True)
+    published = BooleanField(default=False)
+    publish_time = DateTimeField()

     queue = 'uploads'
@@ -332,13 +345,18 @@
         self._upload_files: ArchiveBasedStagingUploadFiles = None

     @classmethod
-    def get(cls, id):
-        return cls.get_by_id(id, 'upload_id')
+    def get(cls, id: str, include_published: bool = False) -> 'Upload':
+        upload = cls.get_by_id(id, 'upload_id')
+        # TODO published uploads should not be hidden by this and API
+        if upload is not None and (not upload.published or include_published):
+            return upload
+        raise KeyError()

     @classmethod
     def user_uploads(cls, user: coe_repo.User) -> List['Upload']:
         """ Returns all uploads for the given user. Currently returns all uploads. """
-        return cls.objects(user_id=str(user.user_id))
+        return cls.objects(user_id=str(user.user_id), published=False)

     @property
     def uploader(self):
@@ -411,6 +429,8 @@
         coe repository db and remove this instance and its calculation from the
         processing state db.
         """
+        assert self.processed_calcs > 0
+
         logger = self.get_logger()
         logger.info('started to publish')
@@ -435,15 +455,19 @@
             with utils.timer(
                     logger, 'upload metadata updated', step='metadata',
                     upload_size=self.upload_files.size):
-                for calc in calcs:
+                def create_update(calc):
                     calc.published = True
-                    self.upload_files.metadata.update(
-                        calc_id=calc.calc_id, updates=calc.to_dict())
+                    return UpdateOne(
+                        {'_id': calc.calc_id},
+                        {'$set': {'metadata': calc.to_dict()}})
+
+                Calc._get_collection().bulk_write([create_update(calc) for calc in calcs])

             with utils.timer(
                     logger, 'staged upload files packed', step='pack',
                     upload_size=self.upload_files.size):
-                self.upload_files.pack()
+                self.upload_files.pack(upload_with_metadata)

             with utils.timer(
                     logger, 'index updated', step='index',
@@ -454,9 +478,9 @@
                     logger, 'staged upload deleted', step='delete staged',
                     upload_size=self.upload_files.size):
                 self.upload_files.delete()
-                self.delete()
-
-            return True  # do not save the process status on the delete upload
+                self.published = True
+                self.publish_time = datetime.now()
+                self.save()

     @process
     def process_upload(self):
@@ -468,12 +492,20 @@
         pass

     @property
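
For reference, the bulk metadata update in publish_upload, reduced to plain pymongo (client, database, and collection names are assumed; only UpdateOne and bulk_write are taken from the diff):

    from pymongo import MongoClient, UpdateOne

    calcs = MongoClient().nomad.calc  # assumed db/collection names

    published_metadata = {'calc-1': {'published': True}}  # placeholder data

    # one UpdateOne per calc, executed in a single round trip
    updates = [
        UpdateOne({'_id': calc_id}, {'$set': {'metadata': metadata}})
        for calc_id, metadata in published_metadata.items()]
    calcs.bulk_write(updates)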