From 39be1f850a6596b32a8612cd81f5619f5cf8e200 Mon Sep 17 00:00:00 2001 From: Markus Scheidgen <markus.scheidgen@gmail.com> Date: Wed, 26 Jun 2019 18:01:39 +0200 Subject: [PATCH] Implemented coe python api mimicing #159. --- nomad/coe_repo/base.py | 2 ++ nomad/coe_repo/calc.py | 35 ++++++++++++++---- nomad/coe_repo/upload.py | 10 ++++++ nomad/config.py | 4 ++- nomad/datamodel/base.py | 1 + nomad/files.py | 36 +++++++++++++++++++ nomad/processing/data.py | 7 ++++ ops/helm/nomad/templates/nomad-configmap.yml | 2 ++ .../nomad/templates/worker-deployment.yaml | 6 ++++ ops/helm/nomad/values.yaml | 3 ++ tests/conftest.py | 12 ++++--- tests/processing/test_data.py | 5 ++- tests/test_coe_repo.py | 7 ++-- tests/test_files.py | 7 ++++ 14 files changed, 122 insertions(+), 15 deletions(-) diff --git a/nomad/coe_repo/base.py b/nomad/coe_repo/base.py index 039378ed2d..5f368f9694 100644 --- a/nomad/coe_repo/base.py +++ b/nomad/coe_repo/base.py @@ -57,8 +57,10 @@ class CalcMetaData(Base): # type: ignore calc_id = Column(Integer, ForeignKey('calculations.calc_id'), primary_key=True) calc = relationship('Calc') added = Column(DateTime) + oadate = Column(DateTime) chemical_formula = Column(String) filenames = Column(BYTEA) + download_size = Column(Integer) location = Column(String) version_id = Column(Integer, ForeignKey('codeversions.version_id')) version = relationship('CodeVersion', lazy='joined', uselist=False) diff --git a/nomad/coe_repo/calc.py b/nomad/coe_repo/calc.py index 10febfba61..12ea2fd088 100644 --- a/nomad/coe_repo/calc.py +++ b/nomad/coe_repo/calc.py @@ -12,14 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import List +from typing import List, Dict, Any import json from sqlalchemy import Column, Integer, String, ForeignKey from sqlalchemy.orm import relationship, aliased from sqlalchemy.sql.expression import literal from datetime import datetime +import os.path -from nomad import infrastructure, utils, config +from nomad import infrastructure, utils, config, files from nomad.datamodel import DFTCalcWithMetadata from . import base @@ -56,9 +57,11 @@ class PublishContext: Access to a logger with bound data about the upload, etc. """ - def __init__(self, **kwargs): - self._cache = {} - self.logger = utils.get_logger(__name__, **kwargs) + def __init__(self, upload_id: str = None, **kwargs): + self._cache: Dict[str, Any] = {} + self.upload_id = upload_id + self.upload_files = None if upload_id is None else files.UploadFiles.get(upload_id, is_authorized=lambda: True) + self.logger = utils.get_logger(__name__, upload_id=upload_id, **kwargs) def cache(self, entity, **kwargs): key = json.dumps(dict(entity=entity.__class__.__name__, **kwargs)) @@ -223,11 +226,31 @@ class Calc(Base): else: added_time = datetime.now() + upload_id = context.upload_id + upload_files = context.upload_files + coe_files = list() + if upload_files is None: + upload_size = -1 + else: + upload_size = 0 + + for calc_file in calc.files: + if config.repository_db.mode == 'coe': + coe_file = os.path.join('$EXTRACTED', 'fairdi', upload_id, calc_file) + else: + coe_file = calc_file + + if upload_files is not None: + upload_size += upload_files.raw_file_size(calc_file) + coe_files.append(coe_file) + metadata = CalcMetaData( calc=self, added=added_time, + oadate=added_time, chemical_formula=calc.formula, - filenames=('[%s]' % ','.join(['"%s"' % filename for filename in calc.files])).encode('utf-8'), + filenames=('[%s]' % ','.join(['"%s"' % coe_file for coe_file in coe_files])).encode('utf-8'), + download_size=upload_size, location=calc.mainfile, version=code_version_obj) repo_db.add(metadata) diff --git a/nomad/coe_repo/upload.py b/nomad/coe_repo/upload.py index 65ce0fcc5b..76e39e27f9 100644 --- a/nomad/coe_repo/upload.py +++ b/nomad/coe_repo/upload.py @@ -86,6 +86,7 @@ class Upload(Base): # type: ignore coe_upload_id = Column('upload_id', Integer, primary_key=True, autoincrement=True) upload_name = Column(String) + target_path = Column(String) user_id = Column(Integer, ForeignKey('users.user_id')) is_processed = Column(Boolean) created = Column(DateTime) @@ -140,6 +141,14 @@ class Upload(Base): # type: ignore uploads-entry, respective calculation and property entries. Everything in one transaction. + There are two modes (fairdi, coe). The coe mode will mimic the old python API + behaviour. An additional extracted raw-file copy needs to be stored for the old CoE + repository. Here, we will add the file path to the respective table. + In fairdi mode, only the .zip-based raw file archive is used. The file path data + will be replaced with information necessary to use nomad@fairdis raw-file API. + This function only handles the postgres entries. Files are created elsewhere + (e.g. nomad.processing.data). + Arguments: upload: The upload to add, including calculations with respective IDs, UMD, CMD. @@ -177,6 +186,7 @@ class Upload(Base): # type: ignore # create upload coe_upload = Upload( upload_name=upload.upload_id, + target_path='$EXTRACTED/fairdi/%s' % upload.upload_id if config.repository_db.mode == 'coe' else None, created=upload.upload_time, user_id=upload.uploader.id, is_processed=True) diff --git a/nomad/config.py b/nomad/config.py index c83855dc83..69170583e4 100644 --- a/nomad/config.py +++ b/nomad/config.py @@ -78,6 +78,7 @@ fs = NomadConfig( tmp='.volumes/fs/tmp', staging='.volumes/fs/staging', public='.volumes/fs/public', + coe_extracted='.volumes/fs/extracted', migration_packages='.volumes/fs/migration_packages', local_tmp='/tmp', prefix_size=2, @@ -98,7 +99,8 @@ repository_db = NomadConfig( dbname='nomad_fairdi_repo_db', user='postgres', password='nomad', - handle_prefix='21.11132/' + handle_prefix='21.11132/', + mode='fairdi' ) mongo = NomadConfig( diff --git a/nomad/datamodel/base.py b/nomad/datamodel/base.py index deae57fecb..b8f49a7740 100644 --- a/nomad/datamodel/base.py +++ b/nomad/datamodel/base.py @@ -85,6 +85,7 @@ class CalcWithMetadata(): # basic upload and processing related metadata self.upload_time: datetime.datetime = None self.files: List[str] = None + self.file_sizes: List[int] = None self.uploader: utils.POPO = None self.processed: bool = False self.last_processing: datetime.datetime = None diff --git a/nomad/files.py b/nomad/files.py index 5715e9c058..e00dc61664 100644 --- a/nomad/files.py +++ b/nomad/files.py @@ -224,6 +224,13 @@ class UploadFiles(DirectoryObject, metaclass=ABCMeta): """ raise NotImplementedError() + def raw_file_size(self, file_path: str) -> int: + """ + Returns: + The size of the given raw file. + """ + raise NotImplementedError() + def raw_file_manifest(self, path_prefix: str = None) -> Generator[str, None, None]: """ Returns the path for all raw files in the archive (with a given prefix). @@ -291,6 +298,11 @@ class StagingUploadFiles(UploadFiles): raise Restricted return self._file(self.raw_file_object(file_path), *args, **kwargs) + def raw_file_size(self, file_path: str) -> int: + if not self._is_authorized(): + raise Restricted + return self.raw_file_object(file_path).size + def raw_file_object(self, file_path: str) -> PathObject: return self._raw_dir.join_file(file_path) @@ -364,6 +376,13 @@ class StagingUploadFiles(UploadFiles): """ Returns True if this upload is already *bagged*. """ return self._frozen_file.exists() + def create_extracted_copy(self) -> None: + """ + Copies all raw-file to the extracted bucket to mimic the behaviour of the old + CoE python API. TODO: should be removed after migration. + """ + shutil.copytree(self._raw_dir.os_path, os.path.join(config.fs.extracted, self.upload_id)) + def pack(self, upload: UploadWithMetadata) -> None: """ Replaces the staging upload data with a public upload record by packing all @@ -596,6 +615,23 @@ class PublicUploadFiles(UploadFiles): def raw_file(self, file_path: str, *args, **kwargs) -> IO: return self._file('raw', 'plain', file_path, *args, *kwargs) + def raw_file_size(self, file_path: str) -> int: + for access in ['public', 'restricted']: + try: + zip_file = self.join_file('raw-%s.plain.zip' % access) + with ZipFile(zip_file.os_path) as zf: + info = zf.getinfo(file_path) + if (access == 'restricted' or always_restricted(file_path)) and not self._is_authorized(): + raise Restricted + + return info.file_size + except FileNotFoundError: + pass + except KeyError: + pass + + raise KeyError() + def raw_file_manifest(self, path_prefix: str = None) -> Generator[str, None, None]: for access in ['public', 'restricted']: try: diff --git a/nomad/processing/data.py b/nomad/processing/data.py index cc777817ec..f6845a741b 100644 --- a/nomad/processing/data.py +++ b/nomad/processing/data.py @@ -532,6 +532,13 @@ class Upload(Proc): upload_with_metadata = self.to_upload_with_metadata(self.metadata) if config.repository_db.publish_enabled: + if config.repository_db.mode == 'coe' and isinstance(self.upload_files, StagingUploadFiles): + with utils.timer( + logger, 'coe extracted raw-file copy created', step='repo', + upload_size=self.upload_files.size): + + self.upload_files.create_extracted_copy() + coe_upload = coe_repo.Upload.from_upload_id(upload_with_metadata.upload_id) if coe_upload is None: with utils.timer( diff --git a/ops/helm/nomad/templates/nomad-configmap.yml b/ops/helm/nomad/templates/nomad-configmap.yml index 279c1d9087..da4e8107de 100644 --- a/ops/helm/nomad/templates/nomad-configmap.yml +++ b/ops/helm/nomad/templates/nomad-configmap.yml @@ -13,6 +13,7 @@ data: domain: "{{ .Values.domain }}" fs: tmp: "{{ .Values.volumes.tmp }}" + coe_extracted: "{{ .Values.volumes.coe_extracted }}" prefix_size: {{ .Values.volumes.prefixSize }} working_directory: /app logstash: @@ -44,6 +45,7 @@ data: dbname: "{{ .Values.dbname }}" sequential_publish: {{ .Values.postgres.sequential_publish }} publish_enabled: {{ .Values.postgres.publish_enabled }} + mode: "{{ .Values.postgres.mode }}" mail: host: "{{ .Values.mail.host }}" port: {{ .Values.mail.port }} diff --git a/ops/helm/nomad/templates/worker-deployment.yaml b/ops/helm/nomad/templates/worker-deployment.yaml index 656afd23f2..51dcfdbf3f 100644 --- a/ops/helm/nomad/templates/worker-deployment.yaml +++ b/ops/helm/nomad/templates/worker-deployment.yaml @@ -43,6 +43,8 @@ spec: name: public-volume - mountPath: /app/.volumes/fs/staging name: staging-volume + - mountPath: /app/.volumes/fs/extracted + name: extracted-volume - mountPath: /nomad name: nomad-volume env: @@ -86,6 +88,10 @@ spec: hostPath: path: {{ .Values.volumes.public }} type: Directory + - name: extracted-volume + hostPath: + path: {{ .Values.volumes.extracted }} + type: Directory - name: staging-volume {{ if (eq .Values.worker.storage "memory") }} emptyDir: diff --git a/ops/helm/nomad/values.yaml b/ops/helm/nomad/values.yaml index c801523df8..31c0f8ced5 100644 --- a/ops/helm/nomad/values.yaml +++ b/ops/helm/nomad/values.yaml @@ -106,6 +106,9 @@ postgres: publish_enabled: true host: nomad-flink-01.esc port: 5432 + ## CoE repository mode, values are fairdi, coe. Fairdi stores raw-file path for + # new raw-file API; coe stores raw-file paths for extracted raw-file copy + mode: fairdi logstash: port: 5000 diff --git a/tests/conftest.py b/tests/conftest.py index 2117d05fd9..4adf0e7b2b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -61,13 +61,14 @@ def raw_files_infra(): config.fs.staging = '.volumes/test_fs/staging' config.fs.public = '.volumes/test_fs/public' config.fs.migration_packages = '.volumes/test_fs/migration_packages' + config.fs.extracted = '.volumes/test_fs/extracted' config.fs.prefix_size = 2 @pytest.fixture(scope='function') def raw_files(raw_files_infra): """ Provides cleaned out files directory structure per function. Clears files after test. """ - directories = [config.fs.staging, config.fs.public, config.fs.migration_packages, config.fs.tmp] + directories = [config.fs.staging, config.fs.public, config.fs.migration_packages, config.fs.tmp, config.fs.extracted] for directory in directories: if not os.path.exists(directory): os.makedirs(directory) @@ -545,7 +546,10 @@ def non_empty_processed(non_empty_uploaded: Tuple[str, str], test_user: coe_repo return test_processing.run_processing(non_empty_uploaded, test_user) -@pytest.fixture(scope='function', params=[False, True]) +@pytest.fixture(scope='function', params=[None, 'fairdi', 'coe']) def with_publish_to_coe_repo(monkeypatch, request): - monkeypatch.setattr('nomad.config.repository_db.publish_enabled', request.param) - return request.param + mode = request.param + if mode is not None: + monkeypatch.setattr('nomad.config.repository_db.publish_enabled', True) + monkeypatch.setattr('nomad.config.repository_db.mode', mode) + return request.param is not None diff --git a/tests/processing/test_data.py b/tests/processing/test_data.py index acdc7e02ab..28949ecc98 100644 --- a/tests/processing/test_data.py +++ b/tests/processing/test_data.py @@ -101,7 +101,7 @@ def test_processing(processed, no_warn, mails, monkeypatch): assert re.search(r'Processing completed', mails.messages[0].data.decode('utf-8')) is not None -def test_publish(non_empty_processed: Upload, no_warn, example_user_metadata, monkeypatch, with_publish_to_coe_repo): +def test_publish(non_empty_processed: Upload, no_warn, example_user_metadata, with_publish_to_coe_repo, monkeypatch): processed = non_empty_processed processed.compress_and_set_metadata(example_user_metadata) @@ -122,6 +122,9 @@ def test_publish(non_empty_processed: Upload, no_warn, example_user_metadata, mo assert_upload_files(upload, PublicUploadFiles, published=True) assert_search_upload(upload, additional_keys, published=True) + if with_publish_to_coe_repo and config.repository_db.mode == 'coe': + assert(os.path.exists(os.path.join(config.fs.extracted, upload.upload_id))) + def test_republish(non_empty_processed: Upload, no_warn, example_user_metadata, monkeypatch, with_publish_to_coe_repo): processed = non_empty_processed diff --git a/tests/test_coe_repo.py b/tests/test_coe_repo.py index 82ab496b91..b1a5d8c4d2 100644 --- a/tests/test_coe_repo.py +++ b/tests/test_coe_repo.py @@ -13,7 +13,7 @@ # limitations under the License. import pytest -from typing import cast +from typing import cast, Tuple from passlib.hash import bcrypt from datetime import datetime @@ -97,8 +97,9 @@ def assert_coe_calc(coe_calc: Calc, calc: datamodel.DFTCalcWithMetadata, has_han assert not coe_calc.with_embargo -def test_add_normalized_calc(postgres, normalized: parsing.LocalBackend, test_user): - calc_with_metadata = datamodel.DFTCalcWithMetadata() +def test_add_normalized_calc(postgres, example_mainfile: Tuple[str, str], normalized: parsing.LocalBackend, test_user): + _, mainfile = example_mainfile + calc_with_metadata = datamodel.DFTCalcWithMetadata(mainfile=mainfile) calc_with_metadata.apply_domain_metadata(normalized) calc_with_metadata.uploader = test_user.to_popo() calc_with_metadata.files = [calc_with_metadata.mainfile, '1', '2', '3', '4'] diff --git a/tests/test_files.py b/tests/test_files.py index 01ff452ccc..1b6bebd10a 100644 --- a/tests/test_files.py +++ b/tests/test_files.py @@ -332,6 +332,13 @@ class TestStagingUploadFiles(UploadFilesContract): upload_files.delete() assert not upload_files.exists() + def test_create_extracted_copy(self, test_upload: StagingUploadWithFiles): + upload, upload_files = test_upload + upload_files.create_extracted_copy() + for calc in upload.calcs: + assert os.path.exists(os.path.join( + config.fs.extracted, upload_files.upload_id, calc.mainfile)) + class TestArchiveBasedStagingUploadFiles(UploadFilesFixtures): def test_create(self, test_upload_id): -- GitLab