Commit ee4d5987 authored by Markus Scheidgen

Moved upload user metadata from mongo to files. Added special package creation for the OQMD migration.

parent a9415497
Pipeline #47335 passed with stages in 16 minutes and 57 seconds
......@@ -21,6 +21,7 @@ import re
import shutil
import multiprocessing
import queue
import json
from nomad import config, infrastructure
from nomad.migration import NomadCOEMigration, SourceCalc, Package
......@@ -191,3 +192,12 @@ def upload(
_Migration(threads=parallel).migrate(
*determine_upload_paths(upload_paths, pattern), delete_failed=delete_failed,
create_packages=create_packages)
@migration.command(help='Get a report about non-migrated calcs.')
def missing():
infrastructure.setup_logging()
infrastructure.setup_mongo()
report = SourceCalc.missing()
print(json.dumps(report, indent=2))
......@@ -57,11 +57,15 @@ from zipfile import ZipFile, BadZipFile
import tarfile
import hashlib
import io
import pickle
from nomad import config, utils
from nomad.datamodel import UploadWithMetadata
user_metadata_filename = 'user_metadata.pickle'
class PathObject:
"""
Object storage-like abstraction for paths in general.
......@@ -172,6 +176,20 @@ class UploadFiles(DirectoryObject, metaclass=ABCMeta):
self.upload_id = upload_id
self._is_authorized = is_authorized
self._user_metadata_file = self.join_file(user_metadata_filename)
@property
def user_metadata(self) -> dict:
if self._user_metadata_file.exists():
with open(self._user_metadata_file.os_path, 'rb') as f:
return pickle.load(f)
else:
return {}
@user_metadata.setter
def user_metadata(self, data: dict) -> None:
with open(self._user_metadata_file.os_path, 'wb') as f:
pickle.dump(data, f)
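A minimal usage sketch of the new property; the dict is pickled to user_metadata.pickle in the upload directory and read back lazily. The StagingUploadFiles constructor arguments and the upload id are assumptions for illustration only:

from nomad import files

# hypothetical upload id, for illustration only
upload_files = files.StagingUploadFiles('example_upload', create=True, is_authorized=lambda: True)

# the setter pickles the dict into <upload dir>/user_metadata.pickle
upload_files.user_metadata = dict(comment='test data', references=['http://example.com'])

# the getter unpickles it again; an empty dict is returned if the file does not exist yet
assert upload_files.user_metadata['comment'] == 'test data'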
@staticmethod
def get(upload_id: str, *args, **kwargs) -> 'UploadFiles':
......@@ -349,6 +367,12 @@ class StagingUploadFiles(UploadFiles):
create_prefix=True)
assert target_dir.exists()
# copy user metadata
if self._user_metadata_file.exists():
shutil.copyfile(
self._user_metadata_file.os_path,
target_dir.join_file(user_metadata_filename).os_path)
def create_zipfile(kind: str, prefix: str, ext: str) -> ZipFile:
file = target_dir.join_file('%s-%s.%s.zip' % (kind, prefix, ext))
return ZipFile(file.os_path, mode='w')
......
......@@ -38,6 +38,7 @@ import io
import threading
from contextlib import contextmanager
import shutil
import json
from nomad import utils, infrastructure, files, config
from nomad.coe_repo import User, Calc, LoginException
......@@ -316,6 +317,16 @@ class Package(Document):
if len(files) == 0:
continue
if len(files) < 20 and any(file.endswith('.tar.gz') for file in files):
# TODO the OQMD case: files are managed as a bunch of .tar.gz files
for file in files:
archive_path = os.path.join(root, file)
prefix = os.path.dirname(archive_path)[len(upload_path) + 1:]
with cls.extracted_archive(archive_path) as extracted_archive:
for paths, _, size in cls.iterate_upload_directory(extracted_archive):
yield [os.path.join(prefix, path) for path in paths], upload_path, size
continue
for file in files:
filepath = os.path.join(root, file)
filename = filepath[len(upload_path) + 1:]
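To illustrate the path handling in the OQMD branch above: for a .tar.gz archive below the upload root, prefix is the archive's parent directory relative to the upload, and every path from the extracted archive is re-rooted under it. A self-contained sketch with a made-up layout:

import os

upload_path = '/data/uploads/oqmd_upload'
archive_path = os.path.join(upload_path, 'oqmd', 'batch_1.tar.gz')

# same computation as in the OQMD branch above
prefix = os.path.dirname(archive_path)[len(upload_path) + 1:]
assert prefix == 'oqmd'

# a file 'calc_001/OUTCAR' found inside the extracted archive is yielded as
assert os.path.join(prefix, 'calc_001/OUTCAR') == 'oqmd/calc_001/OUTCAR'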
......@@ -382,6 +393,60 @@ class SourceCalc(Document):
_dataset_cache: dict = {}
@staticmethod
def missing():
"""
Produces a report about non-migrated calcs.
"""
tmp_data_path = '/tmp/nomad_migration_missing.json'
if os.path.exists(tmp_data_path):
with open(tmp_data_path, 'rt') as f:
data = utils.POPO(**json.load(f))
else:
data = utils.POPO(step=0)
try:
# get source_uploads that have non migrated calcs
if data.step < 1:
import re
data.source_uploads = SourceCalc._get_collection() \
.find({'migration_version': {'$lt': 0}, 'mainfile': {'$not': re.compile(r'^aflowlib_data.*')}}) \
.distinct('upload')
data.step = 1
if data.step < 2:
source_uploads = []
data.packages = utils.POPO()
data.uploads_with_no_package = []
for source_upload in data.source_uploads:
package = Package.objects(upload_id=source_upload).first()
if package is None:
data.uploads_with_no_package.append(source_upload)
else:
source_uploads.append(source_upload)
data.source_uploads = source_uploads
data.step = 2
if data.step < 3:
source_uploads = []
for source_upload in data.source_uploads:
count = SourceCalc.objects(upload=source_upload).count()
source_uploads.append(utils.POPO(id=source_upload, calcs=count))
data.source_uploads = sorted(source_uploads, key=lambda k: k['calcs'])
data.step = 3
if data.step < 4:
for source_upload in data.source_uploads:
count = Package.objects(upload_id=source_upload.get('id')).count()
source_upload['packages'] = count
data.step = 4
finally:
with open(tmp_data_path, 'wt') as f:
json.dump(data, f)
return data
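The report is a plain dict (utils.POPO) and is also cached in /tmp/nomad_migration_missing.json, so an interrupted run resumes at the last completed step. An illustrative shape after a full run, based on the fields populated above (all values made up):

{
    'step': 4,
    'packages': {},
    'uploads_with_no_package': ['upload_1'],
    'source_uploads': [
        {'id': 'upload_2', 'calcs': 3, 'packages': 1},
        {'id': 'upload_3', 'calcs': 120, 'packages': 2}
    ]
}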
@staticmethod
def index(source, drop: bool = False, with_metadata: bool = True, per_query: int = 100) \
-> Generator[Tuple['SourceCalc', int], None, None]:
......
......@@ -369,7 +369,6 @@ class Upload(Proc):
temporary = BooleanField(default=False)
name = StringField(default=None)
metadata = DictField(default=None)
upload_time = DateTimeField()
user_id = StringField(required=True)
published = BooleanField(default=False)
......@@ -385,6 +384,14 @@ class Upload(Proc):
super().__init__(**kwargs)
self._upload_files: ArchiveBasedStagingUploadFiles = None
@property
def metadata(self) -> dict:
return self.upload_files.user_metadata
@metadata.setter
def metadata(self, data: dict) -> None:
self.upload_files.user_metadata = data
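With this change the user metadata no longer lives in the mongo document at all; reads and writes go straight through to the upload's file area. A hedged sketch of the new behaviour (the upload id and metadata keys are made up):

upload = Upload.get('example_upload')

# writes user_metadata.pickle into the upload's files, not a mongo field
upload.metadata = dict(comment='curated subset')

# a freshly loaded Upload object reads the same data back from the files
assert Upload.get('example_upload').metadata['comment'] == 'curated subset'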
@classmethod
def get(cls, id: str, include_published: bool = False) -> 'Upload':
upload = cls.get_by_id(id, 'upload_id')
......@@ -535,10 +542,11 @@ class Upload(Proc):
@property
def upload_files(self) -> UploadFiles:
upload_files_class = ArchiveBasedStagingUploadFiles if not self.published else PublicUploadFiles
kwargs = dict(upload_path=self.upload_path) if not self.published else {}
if not self._upload_files or not isinstance(self._upload_files, upload_files_class):
self._upload_files = upload_files_class(
self.upload_id, is_authorized=lambda: True, upload_path=self.upload_path)
self.upload_id, is_authorized=lambda: True, **kwargs)
return self._upload_files
......
......@@ -95,15 +95,12 @@ def source_package(mongo, migration):
migration.package(*glob.glob('tests/data/migration/*'))
@pytest.mark.parametrize('archived', [False, True])
@pytest.mark.parametrize('variant', ['', '_archived', '_oqmd'])
@pytest.mark.parametrize('n_packages, restriction, upload', [(1, 36, 'baseline'), (2, 0, 'too_big'), (1, 24, 'restriction')])
def test_package(
mongo, migration: NomadCOEMigration, monkeypatch, n_packages, restriction, upload, archived):
mongo, migration: NomadCOEMigration, monkeypatch, n_packages, restriction, upload, variant):
monkeypatch.setattr('nomad.migration.max_package_size', 3)
if archived:
upload = os.path.join('tests/data/migration/packaging_archived', upload)
else:
upload = os.path.join('tests/data/migration/packaging', upload)
upload = os.path.join('tests/data/migration/packaging%s' % variant, upload)
migration.package_index(upload)
packages = Package.objects()
......
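The variant parametrization replaces the old archived boolean: '' maps to tests/data/migration/packaging, '_archived' to packaging_archived, and the new '_oqmd' to packaging_oqmd with the .tar.gz layout handled above. A small sketch of the resulting test input paths (using the 'baseline' upload from the parametrization):

import os

for variant in ['', '_archived', '_oqmd']:
    print(os.path.join('tests/data/migration/packaging%s' % variant, 'baseline'))
# tests/data/migration/packaging/baseline
# tests/data/migration/packaging_archived/baseline
# tests/data/migration/packaging_oqmd/baseline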