Commit b5fddbe2 authored by Markus Scheidgen
Browse files

Simplified files.py metadata; public metadata is now read-only.

parent 375a62fc
......@@ -28,8 +28,7 @@ almost readonly (beside metadata) storage.
/.frozen
/.public
/.restricted
fs/public/<upload>/metadata.json
/metadata.json.lock
fs/public/<upload>/metadata.json.gz
/raw-public.bagit.zip
/raw-restricted.bagit.zip
/archive-public.hdf5.zip
......@@ -38,7 +37,6 @@ almost readonly (beside metadata) storage.
from abc import ABCMeta
from typing import IO, Generator, Dict, Iterator, Iterable, Callable
from filelock import Timeout, FileLock
import ujson
import os.path
import os
......@@ -48,6 +46,7 @@ from bagit import make_bag
import hashlib
import base64
import io
import gzip
from nomad import config, utils
......@@ -125,29 +124,8 @@ class MetadataTimeout(Exception):
class Metadata(metaclass=ABCMeta):
"""
An ABC for a contextmanager that encapsulates access to a set of calc metadata.
Allows to add, update, read metadata. Subclasses might deal with concurrent access.
An ABC for upload metadata classes that encapsulates access to a set of calc metadata.
"""
def __enter__(self) -> 'Metadata':
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
return None
def open(self):
pass
def close(self):
pass
def insert(self, calc: dict) -> None:
""" Insert a calc, using calc_id as key. """
raise NotImplementedError()
def update(self, calc_id: str, updates: dict) -> dict:
""" Updating a calc, using calc_id as key and running dict update with the given data. """
raise NotImplementedError()
def get(self, calc_id: str) -> dict:
""" Retrieve the calc metadata for a given calc. """
raise NotImplementedError()
......@@ -169,19 +147,14 @@ class StagingMetadata(Metadata):
def __init__(self, directory: DirectoryObject) -> None:
# Directory object that the per-calc '<calc_id>.json' file paths are resolved against.
self._dir = directory
def __enter__(self) -> 'Metadata':
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
return None
def open(self):
pass
def close(self):
pass
def remove(self, calc: dict) -> None:
    """
    Remove the metadata file of the given calc.

    Arguments:
        calc: The calc metadata dict; only its ``calc_id`` key is read.

    Raises:
        KeyError: If ``calc`` has no ``calc_id`` key.
        AssertionError: If there is no metadata file for this calc.
    """
    # Named calc_id (not id) to avoid shadowing the builtin.
    calc_id = calc['calc_id']
    path = self._dir.join_file('%s.json' % calc_id)
    assert path.exists()
    os.remove(path.os_path)
def insert(self, calc: dict) -> None:
""" Insert a calc, using calc_id as key. """
id = calc['calc_id']
path = self._dir.join_file('%s.json' % id)
assert not path.exists()
......@@ -189,6 +162,7 @@ class StagingMetadata(Metadata):
ujson.dump(calc, f)
def update(self, calc_id: str, updates: dict) -> dict:
""" Updating a calc, using calc_id as key and running dict update with the given data. """
metadata = self.get(calc_id)
metadata.update(updates)
path = self._dir.join_file('%s.json' % calc_id)
......@@ -215,51 +189,28 @@ class StagingMetadata(Metadata):
class PublicMetadata(Metadata):
"""
A Metadata implementation based on a single .json file. It loads and write
the metadata to the given path and uses a lock to deal with concurrent access.
A Metadata implementation based on a single .json file.
Arguments:
path: The parent directory for the metadata and lock file.
lock_timeout: Max timeout before __enter__ raises MetadataTimeout while waiting
for an available lock on the metadata file. Default is 1s.
"""
def __init__(self, path: str, lock_timeout=1) -> None:
self._db_file = os.path.join(path, 'metadata.json')
self._lock_file = os.path.join(path, 'metadata.json.lock')
self._lock: FileLock = FileLock(self._lock_file, timeout=lock_timeout)
self._db_file = os.path.join(path, 'metadata.json.gz')
self._modified = False
self.data: Dict[str, dict] = None
def __enter__(self) -> 'Metadata':
self.open()
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
self.close()
return None
def open(self):
assert self.data is None, "Metadata is already open."
try:
self._lock.acquire()
except Timeout:
raise MetadataTimeout()
self._data: Dict[str, dict] = None
if os.path.exists(self._db_file):
with open(self._db_file, 'rt') as f:
self.data = ujson.load(f)
else:
self.data = {}
self._modified = True
def close(self):
assert self.data is not None, "Metadata is not open."
if self._modified:
with open(self._db_file, 'wt') as f:
ujson.dump(self.data, f, ensure_ascii=False)
self.data = None
self._lock.release()
@property
def data(self):
    """
    The parsed content of the gzipped json metadata file. The file is only read
    on first access; the loaded object is kept and returned on later accesses.
    """
    if self._data is not None:
        return self._data

    with gzip.open(self._db_file, 'rt') as stream:
        loaded = ujson.load(stream)
    self._data = loaded
    return loaded
def _create(self, calcs: Iterable[dict]) -> None:
    """
    Write the gzipped json metadata file from the given calcs, keyed by their
    ``calc_id``. Asserts that the file does not exist yet and that no data has
    been loaded or created on this instance before.
    """
    file_missing = not os.path.exists(self._db_file)
    assert file_missing and self._data is None

    by_calc_id = {}
    for calc in calcs:
        by_calc_id[calc['calc_id']] = calc
    self._data = by_calc_id

    with gzip.open(self._db_file, 'wt') as stream:
        ujson.dump(self._data, stream)
def insert(self, calc: dict) -> None:
assert self.data is not None, "Metadata is not open."
......@@ -270,25 +221,15 @@ class PublicMetadata(Metadata):
self._modified = True
def update(self, calc_id: str, updates: dict) -> dict:
assert self.data is not None, "Metadata is not open."
if calc_id not in self.data:
raise KeyError()
self.data[calc_id].update(updates)
self._modified = True
return self.data[calc_id]
raise NotImplementedError
def get(self, calc_id: str) -> dict:
assert self.data is not None, "Metadata is not open."
return self.data[calc_id]
def __iter__(self) -> Iterator[dict]:
assert self.data is not None, "Metadata is not open."
return self.data.values().__iter__()
def __len__(self) -> int:
assert self.data is not None, "Metadata is not open."
return len(self.data)
......@@ -393,7 +334,7 @@ class StagingUploadFiles(UploadFiles):
return self._size
@property
def metadata(self) -> Metadata:
def metadata(self) -> StagingMetadata:
return self._metadata
def _file(self, path_object: PathObject, *args, **kwargs) -> IO:
......@@ -532,9 +473,8 @@ class StagingUploadFiles(UploadFiles):
archive_public_zip.close()
# pack metadata
with PublicMetadata(packed_dir.os_path) as packed_metadata:
for calc in self.metadata:
packed_metadata.insert(calc)
packed_metadata = PublicMetadata(packed_dir.os_path)
packed_metadata._create(self._metadata)
# move to public bucket
target_dir = DirectoryObject(config.files.public_bucket, self.upload_id, create=False, prefix=True)
......
......@@ -239,8 +239,8 @@ class Calc(Proc, datamodel.Calc):
user_id=self.upload.user_id,
aux_files=list(self.upload_files.calc_files(self.mainfile, with_mainfile=False)))
# persist the repository metadata
with utils.timer(logger, 'indexed', step='index'):
# persist to elastic search
repo_calc = RepoCalc.create_from_backend(
self._parser_backend,
additional=additional,
......@@ -248,11 +248,10 @@ class Calc(Proc, datamodel.Calc):
upload_id=self.upload_id)
repo_calc.persist()
# persist the archive
with utils.timer(
logger, 'archived', step='archive',
input_size=self.mainfile_file.size) as log_data:
# persist the archive
with self.upload_files.archive_file(self.calc_id, 'wt') as out:
self._parser_backend.write_json(out, pretty=True)
......
......@@ -21,7 +21,7 @@ import json
from nomad import config
from nomad.files import DirectoryObject, PathObject
from nomad.files import Metadata, MetadataTimeout, PublicMetadata, StagingMetadata
from nomad.files import Metadata, PublicMetadata, StagingMetadata
from nomad.files import StagingUploadFiles, PublicUploadFiles, UploadFiles, Restricted, \
ArchiveBasedStagingUploadFiles
......@@ -127,76 +127,72 @@ class MetadataContract:
def md(self, test_dir):
raise NotImplementedError()
def test_open_empty(self, md):
pass
def test_insert(self, md: Metadata):
md.insert(example_calc)
assert len(md) == 1
def test_get(self, md: Metadata):
assert_example_calc(md.get(example_calc_id))
def test_insert_fail(self, md: Metadata):
def test_get_fail(self, md: Metadata):
failed = False
md.insert(example_calc)
try:
md.insert(example_calc)
except Exception:
md.get('unknown')
except KeyError:
failed = True
assert failed
assert len(md) == 1
def test_update(self, md: Metadata):
class TestStagingMetadata(MetadataContract):
@pytest.fixture(scope='function')
def md(self, test_dir):
md = StagingMetadata(DirectoryObject(None, None, os_path=test_dir))
md.insert(example_calc)
md.update(example_calc_id, dict(data='updated'))
assert len(md) == 1
assert md.get(example_calc_id)['data'] == 'updated'
return md
def test_update_fail(self, md: Metadata):
def test_remove(self, md: StagingMetadata):
md.remove(example_calc)
failed = False
try:
md.update(example_calc_id, dict(data='updated'))
assert md.get(example_calc['calc_id'])
except KeyError:
failed = True
assert failed
assert len(md) == 0
def test_get(self, md: Metadata):
def test_insert(self, md: StagingMetadata):
md.remove(example_calc)
md.insert(example_calc)
assert len(md) == 1
assert_example_calc(md.get(example_calc_id))
def test_get_fail(self, md: Metadata):
def test_insert_fail(self, md: StagingMetadata):
failed = False
try:
md.get(example_calc_id)
except KeyError:
md.insert(example_calc)
except Exception:
failed = True
assert failed
assert len(md) == 1
def test_update(self, md: StagingMetadata):
md.update(example_calc_id, dict(data='updated'))
assert len(md) == 1
assert md.get(example_calc_id)['data'] == 'updated'
class TestStagingMetadata(MetadataContract):
@pytest.fixture(scope='function')
def md(self, test_dir):
with StagingMetadata(DirectoryObject(None, None, os_path=test_dir)) as md:
yield md
def test_update_fail(self, md: StagingMetadata):
failed = False
try:
md.update('unknown', dict(data='updated'))
except KeyError:
failed = True
assert failed
assert len(md) == 1
class TestPublicMetadata(MetadataContract):
@pytest.fixture(scope='function')
def md(self, test_dir):
with PublicMetadata(test_dir) as md:
yield md
def test_lock(self, test_dir):
timeout = False
with PublicMetadata(test_dir):
try:
with PublicMetadata(test_dir, lock_timeout=0.1):
pass
except MetadataTimeout:
timeout = True
assert timeout
md = PublicMetadata(test_dir)
md._create([example_calc])
return md
class UploadFilesFixtures:
......@@ -232,12 +228,10 @@ class UploadFilesContract(UploadFilesFixtures):
with test_upload.raw_file(example_file_mainfile) as f:
assert len(f.read()) > 0
if not test_upload._is_authorized():
with test_upload.metadata as md:
assert not md.get(example_calc_id).get('restricted', False)
assert not test_upload.metadata.get(example_calc_id).get('restricted', False)
except Restricted:
assert not test_upload._is_authorized()
with test_upload.metadata as md:
assert md.get(example_calc_id).get('restricted', False)
assert test_upload.metadata.get(example_calc_id).get('restricted', False)
@pytest.mark.parametrize('prefix', [None, 'examples'])
def test_raw_file_manifest(self, test_upload: StagingUploadFiles, prefix: str):
......@@ -255,23 +249,13 @@ class UploadFilesContract(UploadFilesFixtures):
assert json.load(f) == 'archive'
if not test_upload._is_authorized():
with test_upload.metadata as md:
assert not md.get(example_calc_id).get('restricted', False)
assert not test_upload.metadata.get(example_calc_id).get('restricted', False)
except Restricted:
assert not test_upload._is_authorized()
with test_upload.metadata as md:
assert md.get(example_calc_id).get('restricted', False)
assert test_upload.metadata.get(example_calc_id).get('restricted', False)
def test_metadata(self, test_upload):
with test_upload.metadata as md:
assert_example_calc(md.get(example_calc_id))
def test_update_metadata(self, test_upload):
with test_upload.metadata as md:
md.update(example_calc_id, dict(data='updated'))
with test_upload.metadata as md:
assert md.get(example_calc_id)['data'] == 'updated'
assert_example_calc(test_upload.metadata.get(example_calc_id))
def create_staging_upload(upload_id: str, calc_specs: str) -> StagingUploadFiles:
......@@ -314,8 +298,7 @@ def create_staging_upload(upload_id: str, calc_specs: str) -> StagingUploadFiles
public_only = False
upload._is_authorized = lambda: not public_only
with upload.metadata as md:
assert len(md) == len(calc_specs)
assert len(upload.metadata) == len(calc_specs)
return upload
......@@ -364,6 +347,10 @@ class TestStagingUploadFiles(UploadFilesContract):
test_upload.delete()
assert not test_upload.exists()
def test_update_metadata(self, test_upload):
    """ An update to a calc's metadata must be visible when the calc is read back. """
    test_upload.metadata.update(example_calc_id, dict(data='updated'))
    # The comparison has to be asserted; as a bare expression its result was
    # discarded and the test could never fail on a wrong value.
    assert test_upload.metadata.get(example_calc_id)['data'] == 'updated'
class TestArchiveBasedStagingUploadFiles(UploadFilesFixtures):
def test_create(self, test_upload_id):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment