Commit 39be1f85 authored by Markus Scheidgen

Implemented CoE Python API mimicking #159.

parent 04b8ada0
Pipeline #51164 passed with stages in 18 minutes and 6 seconds
......@@ -57,8 +57,10 @@ class CalcMetaData(Base): # type: ignore
calc_id = Column(Integer, ForeignKey('calculations.calc_id'), primary_key=True)
calc = relationship('Calc')
added = Column(DateTime)
oadate = Column(DateTime)
chemical_formula = Column(String)
filenames = Column(BYTEA)
download_size = Column(Integer)
location = Column(String)
version_id = Column(Integer, ForeignKey('codeversions.version_id'))
version = relationship('CodeVersion', lazy='joined', uselist=False)
......
......@@ -12,14 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List
from typing import List, Dict, Any
import json
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship, aliased
from sqlalchemy.sql.expression import literal
from datetime import datetime
import os.path
from nomad import infrastructure, utils, config
from nomad import infrastructure, utils, config, files
from nomad.datamodel import DFTCalcWithMetadata
from . import base
......@@ -56,9 +57,11 @@ class PublishContext:
Access to a logger with bound data about the upload, etc.
"""
def __init__(self, **kwargs):
self._cache = {}
self.logger = utils.get_logger(__name__, **kwargs)
def __init__(self, upload_id: str = None, **kwargs):
self._cache: Dict[str, Any] = {}
self.upload_id = upload_id
self.upload_files = None if upload_id is None else files.UploadFiles.get(upload_id, is_authorized=lambda: True)
self.logger = utils.get_logger(__name__, upload_id=upload_id, **kwargs)
def cache(self, entity, **kwargs):
key = json.dumps(dict(entity=entity.__class__.__name__, **kwargs))
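For context, a hedged usage sketch of the extended PublishContext. The module path and the upload id are assumptions for illustration; the permissive is_authorized lambda mirrors the constructor above.

from nomad.coe_repo.calc import PublishContext  # assumed module path

# 'abc123' is a hypothetical upload id; for a known upload, upload_files is
# resolved eagerly so later publish steps can query raw-file sizes.
context = PublishContext(upload_id='abc123')
if context.upload_files is not None:
    size = context.upload_files.raw_file_size('vasp/OUTCAR')  # hypothetical path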
......@@ -223,11 +226,31 @@ class Calc(Base):
else:
added_time = datetime.now()
upload_id = context.upload_id
upload_files = context.upload_files
coe_files = list()
if upload_files is None:
upload_size = -1
else:
upload_size = 0
for calc_file in calc.files:
if config.repository_db.mode == 'coe':
coe_file = os.path.join('$EXTRACTED', 'fairdi', upload_id, calc_file)
else:
coe_file = calc_file
if upload_files is not None:
upload_size += upload_files.raw_file_size(calc_file)
coe_files.append(coe_file)
metadata = CalcMetaData(
calc=self,
added=added_time,
oadate=added_time,
chemical_formula=calc.formula,
filenames=('[%s]' % ','.join(['"%s"' % filename for filename in calc.files])).encode('utf-8'),
filenames=('[%s]' % ','.join(['"%s"' % coe_file for coe_file in coe_files])).encode('utf-8'),
download_size=upload_size,
location=calc.mainfile,
version=code_version_obj)
repo_db.add(metadata)
......
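To make the coe-mode path rewriting above concrete, here is a minimal sketch of how the filenames column ends up serialized; the upload id and file names are made up.

import os.path

upload_id = 'abc123'                        # hypothetical
calc_files = ['vasp/OUTCAR', 'vasp/INCAR']  # hypothetical

# In 'coe' mode each path is prefixed so the old repository can resolve it
# against the extracted raw-file copy.
coe_files = [
    os.path.join('$EXTRACTED', 'fairdi', upload_id, calc_file)
    for calc_file in calc_files]
filenames = ('[%s]' % ','.join('"%s"' % f for f in coe_files)).encode('utf-8')
# b'["$EXTRACTED/fairdi/abc123/vasp/OUTCAR","$EXTRACTED/fairdi/abc123/vasp/INCAR"]'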
......@@ -86,6 +86,7 @@ class Upload(Base): # type: ignore
coe_upload_id = Column('upload_id', Integer, primary_key=True, autoincrement=True)
upload_name = Column(String)
target_path = Column(String)
user_id = Column(Integer, ForeignKey('users.user_id'))
is_processed = Column(Boolean)
created = Column(DateTime)
......@@ -140,6 +141,14 @@ class Upload(Base): # type: ignore
uploads-entry, respective calculation and property entries. Everything in one
transaction.
There are two modes (fairdi, coe). The coe mode mimics the old Python API
behaviour: an additional extracted raw-file copy needs to be stored for the old
CoE repository, and the respective file paths are added to the table here.
In fairdi mode, only the .zip-based raw-file archive is used; the file path data
is replaced with the information necessary to use nomad@fairdi's raw-file API.
This function only handles the postgres entries. Files are created elsewhere
(e.g. nomad.processing.data).
Arguments:
upload: The upload to add, including calculations with respective IDs, UMD, CMD.
......@@ -177,6 +186,7 @@ class Upload(Base): # type: ignore
# create upload
coe_upload = Upload(
upload_name=upload.upload_id,
target_path='$EXTRACTED/fairdi/%s' % upload.upload_id if config.repository_db.mode == 'coe' else None,
created=upload.upload_time,
user_id=upload.uploader.id,
is_processed=True)
......
......@@ -78,6 +78,7 @@ fs = NomadConfig(
tmp='.volumes/fs/tmp',
staging='.volumes/fs/staging',
public='.volumes/fs/public',
coe_extracted='.volumes/fs/extracted',
migration_packages='.volumes/fs/migration_packages',
local_tmp='/tmp',
prefix_size=2,
......@@ -98,7 +99,8 @@ repository_db = NomadConfig(
dbname='nomad_fairdi_repo_db',
user='postgres',
password='nomad',
handle_prefix='21.11132/'
handle_prefix='21.11132/',
mode='fairdi'
)
mongo = NomadConfig(
......
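A minimal sketch of switching the new mode setting at runtime, e.g. in tests; it assumes NomadConfig supports attribute assignment, as the monkeypatched fixtures further below suggest.

from nomad import config

assert config.repository_db.mode == 'fairdi'   # default added above
config.repository_db.mode = 'coe'              # enables the extracted raw-file copy
print(config.fs.coe_extracted)                 # '.volumes/fs/extracted' by default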
......@@ -85,6 +85,7 @@ class CalcWithMetadata():
# basic upload and processing related metadata
self.upload_time: datetime.datetime = None
self.files: List[str] = None
self.file_sizes: List[int] = None
self.uploader: utils.POPO = None
self.processed: bool = False
self.last_processing: datetime.datetime = None
......
......@@ -224,6 +224,13 @@ class UploadFiles(DirectoryObject, metaclass=ABCMeta):
"""
raise NotImplementedError()
def raw_file_size(self, file_path: str) -> int:
"""
Returns:
The size of the given raw file.
"""
raise NotImplementedError()
def raw_file_manifest(self, path_prefix: str = None) -> Generator[str, None, None]:
"""
Returns the path for all raw files in the archive (with a given prefix).
......@@ -291,6 +298,11 @@ class StagingUploadFiles(UploadFiles):
raise Restricted
return self._file(self.raw_file_object(file_path), *args, **kwargs)
def raw_file_size(self, file_path: str) -> int:
if not self._is_authorized():
raise Restricted
return self.raw_file_object(file_path).size
def raw_file_object(self, file_path: str) -> PathObject:
return self._raw_dir.join_file(file_path)
......@@ -364,6 +376,13 @@ class StagingUploadFiles(UploadFiles):
""" Returns True if this upload is already *bagged*. """
return self._frozen_file.exists()
def create_extracted_copy(self) -> None:
"""
Copies all raw files to the extracted bucket to mimic the behaviour of the old
CoE Python API. TODO: should be removed after migration.
"""
shutil.copytree(self._raw_dir.os_path, os.path.join(config.fs.coe_extracted, self.upload_id))
def pack(self, upload: UploadWithMetadata) -> None:
"""
Replaces the staging upload data with a public upload record by packing all
......@@ -596,6 +615,23 @@ class PublicUploadFiles(UploadFiles):
def raw_file(self, file_path: str, *args, **kwargs) -> IO:
return self._file('raw', 'plain', file_path, *args, **kwargs)
def raw_file_size(self, file_path: str) -> int:
for access in ['public', 'restricted']:
try:
zip_file = self.join_file('raw-%s.plain.zip' % access)
with ZipFile(zip_file.os_path) as zf:
info = zf.getinfo(file_path)
if (access == 'restricted' or always_restricted(file_path)) and not self._is_authorized():
raise Restricted
return info.file_size
except FileNotFoundError:
pass
except KeyError:
pass
raise KeyError(file_path)
def raw_file_manifest(self, path_prefix: str = None) -> Generator[str, None, None]:
for access in ['public', 'restricted']:
try:
......
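A hedged usage sketch of the new raw_file_size API; the upload id and file path are made up, and authorization is simply granted for illustration.

from nomad import files

upload_files = files.UploadFiles.get('abc123', is_authorized=lambda: True)
if upload_files is not None:
    try:
        size = upload_files.raw_file_size('vasp/OUTCAR')  # size in bytes
    except KeyError:
        size = -1  # not present in the public or restricted archives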
......@@ -532,6 +532,13 @@ class Upload(Proc):
upload_with_metadata = self.to_upload_with_metadata(self.metadata)
if config.repository_db.publish_enabled:
if config.repository_db.mode == 'coe' and isinstance(self.upload_files, StagingUploadFiles):
with utils.timer(
logger, 'coe extracted raw-file copy created', step='repo',
upload_size=self.upload_files.size):
self.upload_files.create_extracted_copy()
coe_upload = coe_repo.Upload.from_upload_id(upload_with_metadata.upload_id)
if coe_upload is None:
with utils.timer(
......
......@@ -13,6 +13,7 @@ data:
domain: "{{ .Values.domain }}"
fs:
tmp: "{{ .Values.volumes.tmp }}"
coe_extracted: "{{ .Values.volumes.coe_extracted }}"
prefix_size: {{ .Values.volumes.prefixSize }}
working_directory: /app
logstash:
......@@ -44,6 +45,7 @@ data:
dbname: "{{ .Values.dbname }}"
sequential_publish: {{ .Values.postgres.sequential_publish }}
publish_enabled: {{ .Values.postgres.publish_enabled }}
mode: "{{ .Values.postgres.mode }}"
mail:
host: "{{ .Values.mail.host }}"
port: {{ .Values.mail.port }}
......
......@@ -43,6 +43,8 @@ spec:
name: public-volume
- mountPath: /app/.volumes/fs/staging
name: staging-volume
- mountPath: /app/.volumes/fs/extracted
name: extracted-volume
- mountPath: /nomad
name: nomad-volume
env:
......@@ -86,6 +88,10 @@ spec:
hostPath:
path: {{ .Values.volumes.public }}
type: Directory
- name: extracted-volume
hostPath:
path: {{ .Values.volumes.coe_extracted }}
type: Directory
- name: staging-volume
{{ if (eq .Values.worker.storage "memory") }}
emptyDir:
......
......@@ -106,6 +106,9 @@ postgres:
publish_enabled: true
host: nomad-flink-01.esc
port: 5432
## CoE repository mode; values are 'fairdi' or 'coe'. fairdi stores raw-file paths
## for the new raw-file API; coe stores raw-file paths for the extracted raw-file copy.
mode: fairdi
logstash:
port: 5000
......
......@@ -61,13 +61,14 @@ def raw_files_infra():
config.fs.staging = '.volumes/test_fs/staging'
config.fs.public = '.volumes/test_fs/public'
config.fs.migration_packages = '.volumes/test_fs/migration_packages'
config.fs.coe_extracted = '.volumes/test_fs/extracted'
config.fs.prefix_size = 2
@pytest.fixture(scope='function')
def raw_files(raw_files_infra):
""" Provides cleaned out files directory structure per function. Clears files after test. """
directories = [config.fs.staging, config.fs.public, config.fs.migration_packages, config.fs.tmp]
directories = [config.fs.staging, config.fs.public, config.fs.migration_packages, config.fs.tmp, config.fs.coe_extracted]
for directory in directories:
if not os.path.exists(directory):
os.makedirs(directory)
......@@ -545,7 +546,10 @@ def non_empty_processed(non_empty_uploaded: Tuple[str, str], test_user: coe_repo
return test_processing.run_processing(non_empty_uploaded, test_user)
@pytest.fixture(scope='function', params=[False, True])
@pytest.fixture(scope='function', params=[None, 'fairdi', 'coe'])
def with_publish_to_coe_repo(monkeypatch, request):
monkeypatch.setattr('nomad.config.repository_db.publish_enabled', request.param)
return request.param
mode = request.param
if mode is not None:
monkeypatch.setattr('nomad.config.repository_db.publish_enabled', True)
monkeypatch.setattr('nomad.config.repository_db.mode', mode)
return request.param is not None
......@@ -101,7 +101,7 @@ def test_processing(processed, no_warn, mails, monkeypatch):
assert re.search(r'Processing completed', mails.messages[0].data.decode('utf-8')) is not None
def test_publish(non_empty_processed: Upload, no_warn, example_user_metadata, monkeypatch, with_publish_to_coe_repo):
def test_publish(non_empty_processed: Upload, no_warn, example_user_metadata, with_publish_to_coe_repo, monkeypatch):
processed = non_empty_processed
processed.compress_and_set_metadata(example_user_metadata)
......@@ -122,6 +122,9 @@ def test_publish(non_empty_processed: Upload, no_warn, example_user_metadata, mo
assert_upload_files(upload, PublicUploadFiles, published=True)
assert_search_upload(upload, additional_keys, published=True)
if with_publish_to_coe_repo and config.repository_db.mode == 'coe':
assert os.path.exists(os.path.join(config.fs.coe_extracted, upload.upload_id))
def test_republish(non_empty_processed: Upload, no_warn, example_user_metadata, monkeypatch, with_publish_to_coe_repo):
processed = non_empty_processed
......
......@@ -13,7 +13,7 @@
# limitations under the License.
import pytest
from typing import cast
from typing import cast, Tuple
from passlib.hash import bcrypt
from datetime import datetime
......@@ -97,8 +97,9 @@ def assert_coe_calc(coe_calc: Calc, calc: datamodel.DFTCalcWithMetadata, has_han
assert not coe_calc.with_embargo
def test_add_normalized_calc(postgres, normalized: parsing.LocalBackend, test_user):
calc_with_metadata = datamodel.DFTCalcWithMetadata()
def test_add_normalized_calc(postgres, example_mainfile: Tuple[str, str], normalized: parsing.LocalBackend, test_user):
_, mainfile = example_mainfile
calc_with_metadata = datamodel.DFTCalcWithMetadata(mainfile=mainfile)
calc_with_metadata.apply_domain_metadata(normalized)
calc_with_metadata.uploader = test_user.to_popo()
calc_with_metadata.files = [calc_with_metadata.mainfile, '1', '2', '3', '4']
......
......@@ -332,6 +332,13 @@ class TestStagingUploadFiles(UploadFilesContract):
upload_files.delete()
assert not upload_files.exists()
def test_create_extracted_copy(self, test_upload: StagingUploadWithFiles):
upload, upload_files = test_upload
upload_files.create_extracted_copy()
for calc in upload.calcs:
assert os.path.exists(os.path.join(
config.fs.coe_extracted, upload_files.upload_id, calc.mainfile))
class TestArchiveBasedStagingUploadFiles(UploadFilesFixtures):
def test_create(self, test_upload_id):
......