Commit 38d3de89 authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Refactoring creation of file names in public upload files. #466

parent 6f365dcb
......@@ -161,21 +161,14 @@ class MirrorFilesResource(Resource):
upload_files = PublicUploadFiles(upload_id)
if prefix == 'raw':
ext = 'plain'
ending = 'zip'
fileobj = upload_files._raw_file_object('public')
elif prefix == 'archive':
ext = 'msg'
ending = 'msg'
elif prefix == 'legacy-archive':
ext = 'json'
ending = 'zip'
fileobj = upload_files._msg_file_object('public')
else:
abort(400, message='Unsupported prefix.')
fileobj = upload_files._file_object(prefix, 'public', ext, ending)
if not fileobj.exists():
raise KeyError
......
......@@ -24,7 +24,7 @@ import pymongo
import elasticsearch_dsl as es
import json
from nomad import processing as proc, config, infrastructure, utils, search, files, datamodel, archive
from nomad import processing as proc, config, infrastructure, utils, search, files, datamodel
from .admin import admin, __run_processing, __run_parallel
......@@ -322,30 +322,6 @@ def rm(ctx, uploads, skip_es, skip_mongo, skip_files):
delete_upload(upload, skip_es=skip_es, skip_mongo=skip_mongo, skip_files=skip_files)
@uploads.command(help='Create msgpack file for upload')
@click.argument('UPLOADS', nargs=-1)
@click.pass_context
def msgpack(ctx, uploads):
    '''Convert the zipped-JSON archives of the selected published uploads
    into msgpack archive files (one per access level).'''
    _, uploads = query_uploads(ctx, uploads)

    for upload in uploads:
        upload_files = files.UploadFiles.get(upload_id=upload.upload_id)

        if isinstance(upload_files, files.PublicUploadFiles):
            def iterator(zf, names):
                # Yield (calc_id, parsed archive data) for each JSON member.
                for name in names:
                    # BUGFIX: str.strip('.json') strips any leading/trailing
                    # characters from the set {., j, s, o, n} and can mangle
                    # calc ids (e.g. 'son123.json' -> '123'); remove the exact
                    # '.json' suffix instead.
                    calc_id = name[:-len('.json')] if name.endswith('.json') else name
                    with zf.open(name) as f:
                        yield (calc_id, json.load(f))

            for access in ['public', 'restricted']:
                with upload_files._open_zip_file('archive', access, 'json') as zf:
                    archive_path = upload_files._file_object('archive', access, 'msg', 'msg').os_path
                    names = [name for name in zf.namelist() if name.endswith('json')]
                    archive.write_archive(archive_path, len(names), iterator(zf, names))
                print('wrote msgpack archive %s' % archive_path)
@uploads.command(help='Reprocess selected uploads.')
@click.argument('UPLOADS', nargs=-1)
@click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
......
......@@ -114,6 +114,7 @@ fs = NomadConfig(
public='.volumes/fs/public',
local_tmp='/tmp',
prefix_size=2,
archive_version_suffix=None,
working_directory=os.getcwd()
)
......
......@@ -455,14 +455,13 @@ class StagingUploadFiles(UploadFiles):
create_prefix=True)
assert target_dir.exists()
# In prior versions we used bagit on raw files. There was not much purpose for
# it, so it was removed. Check 0.3.x for the implementation
def create_zipfile(kind: str, prefix: str, ext: str) -> zipfile.ZipFile:
file = target_dir.join_file('%s-%s.%s.zip' % (kind, prefix, ext))
return zipfile.ZipFile(file.os_path, mode='w')
def create_zipfile(access: str):
return zipfile.ZipFile(
PublicUploadFiles._create_raw_file_object(target_dir, access).os_path,
mode='w')
def write_msgfile(kind: str, prefix: str, ext: str, size: int, data: Iterable[Tuple[str, Any]]):
file_object = target_dir.join_file('%s-%s.%s.msg' % (kind, prefix, ext))
def write_msgfile(access: str, size: int, data: Iterable[Tuple[str, Any]]):
file_object = PublicUploadFiles._create_msg_file_object(target_dir, access)
write_archive(file_object.os_path, size, data)
# zip archives
......@@ -495,8 +494,8 @@ class StagingUploadFiles(UploadFiles):
yield (calc.calc_id, {})
try:
write_msgfile('archive', 'public', 'msg', public, create_iterator(False))
write_msgfile('archive', 'restricted', 'msg', restricted, create_iterator(True))
write_msgfile('public', public, create_iterator(False))
write_msgfile('restricted', restricted, create_iterator(True))
except Exception as e:
self.logger.error('exception during packing archives', exc_info=e)
......@@ -504,8 +503,8 @@ class StagingUploadFiles(UploadFiles):
return restricted, public
def _pack_raw_files(self, entries: Iterable[datamodel.EntryMetadata], create_zipfile):
raw_public_zip = create_zipfile('raw', 'public', 'plain')
raw_restricted_zip = create_zipfile('raw', 'restricted', 'plain')
raw_public_zip = create_zipfile('public')
raw_restricted_zip = create_zipfile('restricted')
try:
# 1. add all public raw files
......@@ -694,12 +693,12 @@ class PublicUploadFilesBasedStagingUploadFiles(StagingUploadFiles):
def extract(self, include_archive: bool = False) -> None:
assert next(self.raw_file_manifest(), None) is None, 'can only extract once'
for access in ['public', 'restricted']:
raw_file_zip = self.public_upload_files._zip_file_object('raw', access, 'plain')
raw_file_zip = self.public_upload_files._raw_file_object(access)
if raw_file_zip.exists():
super().add_rawfiles(raw_file_zip.os_path, force_archive=True)
if include_archive:
with self.public_upload_files._open_msg_file('archive', access, 'msg') as archive:
with self.public_upload_files._open_msg_file(access) as archive:
for calc_id, data in archive.items():
calc_id = calc_id.strip()
self.write_archive(calc_id, data.to_dict())
......@@ -729,29 +728,42 @@ class PublicUploadFiles(UploadFiles):
for f in self._archive_msg_files.values():
f.close()
def _file_object(self, prefix: str, access: str, ext: str, ending: str) -> PathObject:
return self.join_file('%s-%s.%s.%s' % (prefix, access, ext, ending))
@staticmethod
def _create_raw_file_object(dir: DirectoryObject, access: str, suffix: str = '') -> PathObject:
    # Canonical name of a public raw-file zip, e.g. 'raw-public.plain.zip';
    # an optional suffix is inserted directly after the access part.
    name = 'raw-%s%s.plain.zip' % (access, suffix)
    return dir.join_file(name)
def _zip_file_object(self, prefix: str, access: str, ext: str) -> PathObject:
return self._file_object(prefix, access, ext, 'zip')
def _raw_file_object(self, access: str, **kwargs) -> PathObject:
    # This upload directory itself serves as the DirectoryObject that holds
    # the raw zip, hence ``self`` is passed as the target directory.
    create = PublicUploadFiles._create_raw_file_object
    return create(self, access, **kwargs)
def _open_zip_file(self, prefix: str, access: str, ext: str) -> zipfile.ZipFile:
def _open_raw_file(self, access: str) -> zipfile.ZipFile:
if access in self._raw_zip_files:
return self._raw_zip_files[access]
zip_path = self._zip_file_object(prefix, access, ext).os_path
zip_path = self._raw_file_object(access).os_path
f = zipfile.ZipFile(zip_path)
self._raw_zip_files[access] = f
return f
def _open_msg_file(self, prefix: str, access: str, ext: str) -> ArchiveReader:
@staticmethod
def _create_msg_file_object(dir: DirectoryObject, access: str, suffix: str = '') -> PathObject:
    # Canonical name of a msgpack archive file; when a version suffix is
    # configured (config.fs.archive_version_suffix), it is appended so that
    # differently versioned archives can coexist in the same directory.
    version = config.fs.archive_version_suffix
    parts = ['archive-%s%s' % (access, suffix)]
    if version:
        parts.append('-%s' % version)
    parts.append('.msg.msg')
    return dir.join_file(''.join(parts))
def _msg_file_object(self, access: str, **kwargs) -> PathObject:
    # Mirror of _raw_file_object for the msgpack archive: the upload
    # directory (``self``) is the DirectoryObject that holds the file.
    create = PublicUploadFiles._create_msg_file_object
    return create(self, access, **kwargs)
def _open_msg_file(self, access: str) -> ArchiveReader:
if access in self._archive_msg_files:
archive = self._archive_msg_files[access]
if not archive.is_closed():
return archive
msg_object = self._file_object(prefix, access, ext, 'msg')
msg_object = self._msg_file_object(access)
if not msg_object.exists():
raise FileNotFoundError()
......@@ -761,32 +773,6 @@ class PublicUploadFiles(UploadFiles):
return archive
def _file_in_zip(self, prefix: str, ext: str, path: str, *args, **kwargs) -> IO:
    '''
    Open ``path`` from one of this upload's zip files (public first, then
    restricted).

    An optional ``mode`` may be given positionally (first of *args) or as a
    keyword; it defaults to 'rb'. Remaining kwargs are forwarded to
    ``ZipFile.open``.

    Raises ``Restricted`` when the file lives in the restricted zip (or is
    always-restricted) and the caller is not authorized, and ``KeyError``
    when the path is present in neither zip.
    '''
    # Extract mode from either args or kwargs; it must be removed from
    # kwargs because ZipFile.open below is always called with mode 'r'.
    mode = kwargs.get('mode') if len(args) == 0 else args[0]
    if 'mode' in kwargs:
        del(kwargs['mode'])
    mode = mode if mode else 'rb'

    for access in ['public', 'restricted']:
        try:
            zf = self._open_zip_file(prefix, access, ext)
            f = zf.open(path, 'r', **kwargs)
            # Authorization is checked only once the file was actually
            # found in this zip; missing files fall through to the next
            # access level via the except clauses below.
            if (access == 'restricted' or always_restricted(path)) and not self._is_authorized():
                raise Restricted
            if 't' in mode:
                # Caller asked for text mode: wrap the binary zip stream.
                return io.TextIOWrapper(f)
            else:
                return f
        except FileNotFoundError:
            pass
        except IsADirectoryError:
            pass
        except KeyError:
            # ZipFile.open raises KeyError for members not in this zip.
            pass

    raise KeyError(path)
def to_staging_upload_files(self, create: bool = False, **kwargs) -> 'StagingUploadFiles':
exists = False
try:
......@@ -807,22 +793,46 @@ class PublicUploadFiles(UploadFiles):
return staging_upload_files
def add_metadata_file(self, metadata: dict):
zip_path = self._zip_file_object('raw', 'public', 'plain').os_path
zip_path = self._raw_file_object('public').os_path
with zipfile.ZipFile(zip_path, 'a') as zf:
with zf.open('nomad.json', 'w') as f:
f.write(json.dumps(metadata).encode())
@property
def public_raw_data_file(self):
return self._zip_file_object('raw', 'public', 'plain').os_path
return self._raw_file_object('public').os_path
def raw_file(self, file_path: str, *args, **kwargs) -> IO:
return self._file_in_zip('raw', 'plain', file_path, *args, *kwargs)
mode = kwargs.get('mode') if len(args) == 0 else args[0]
if 'mode' in kwargs:
del(kwargs['mode'])
mode = mode if mode else 'rb'
for access in ['public', 'restricted']:
try:
zf = self._open_raw_file(access)
f = zf.open(file_path, 'r', **kwargs)
if (access == 'restricted' or always_restricted(file_path)) and not self._is_authorized():
raise Restricted
if 't' in mode:
return io.TextIOWrapper(f)
else:
return f
except FileNotFoundError:
pass
except IsADirectoryError:
pass
except KeyError:
pass
raise KeyError(file_path)
def raw_file_size(self, file_path: str) -> int:
for access in ['public', 'restricted']:
try:
zf = self._open_zip_file('raw', access, 'plain')
zf = self._open_raw_file(access)
info = zf.getinfo(file_path)
if (access == 'restricted' or always_restricted(file_path)) and not self._is_authorized():
raise Restricted
......@@ -838,7 +848,7 @@ class PublicUploadFiles(UploadFiles):
def raw_file_manifest(self, path_prefix: str = None) -> Generator[str, None, None]:
for access in ['public', 'restricted']:
try:
zf = self._open_zip_file('raw', access, 'plain')
zf = self._open_raw_file(access)
for path in zf.namelist():
if path_prefix is None or path.startswith(path_prefix):
yield path
......@@ -850,7 +860,7 @@ class PublicUploadFiles(UploadFiles):
self._directories = dict()
for access in ['public', 'restricted']:
try:
zf = self._open_zip_file('raw', access, 'plain')
zf = self._open_raw_file(access)
for path in zf.namelist():
file_name = os.path.basename(path)
directory_path = os.path.dirname(path)
......@@ -884,7 +894,7 @@ class PublicUploadFiles(UploadFiles):
for access in accesses:
try:
archive = self._open_msg_file('archive', access, 'msg')
archive = self._open_msg_file(access)
if calc_id in archive:
if access == 'restricted' and not self._is_authorized():
raise Restricted
......@@ -905,17 +915,16 @@ class PublicUploadFiles(UploadFiles):
'''
# compute a list of files to repack
files = []
kinds = []
if not skip_archive:
kinds.append(('archive', 'msg'))
if not skip_raw:
kinds.append(('raw', 'plain'))
for kind, ext in kinds:
for prefix in ['public', 'restricted']:
ending = 'zip' if kind == 'raw' else 'msg'
for access in ['public', 'restricted']:
if not skip_archive:
files.append((
self._msg_file_object(access, suffix='repacked'),
self._msg_file_object(access)))
if not skip_raw:
files.append((
self.join_file('%s-%s.%s.repacked.%s' % (kind, prefix, ext, ending)),
self.join_file('%s-%s.%s.%s' % (kind, prefix, ext, ending))))
self._raw_file_object(access, suffix='repacked'),
self._raw_file_object(access)))
# check if there already is a running repack
for repacked_file, _ in files:
......@@ -925,12 +934,12 @@ class PublicUploadFiles(UploadFiles):
# create staging files
staging_upload = self.to_staging_upload_files(create=True, include_archive=True)
def create_zipfile(kind: str, prefix: str, ext: str) -> zipfile.ZipFile:
file = self.join_file('%s-%s.%s.repacked.zip' % (kind, prefix, ext))
def create_zipfile(access: str) -> zipfile.ZipFile:
file = self._raw_file_object(access, suffix='repacked')
return zipfile.ZipFile(file.os_path, mode='w')
def write_msgfile(kind: str, prefix: str, ext: str, size: int, data: Iterable[Tuple[str, Any]]):
file = self.join_file('%s-%s.%s.repacked.msg' % (kind, prefix, ext))
def write_msgfile(access: str, size: int, data: Iterable[Tuple[str, Any]]):
file = self._msg_file_object(access, suffix='repacked')
write_archive(file.os_path, size, data)
# perform the repacking
......
......@@ -21,7 +21,6 @@ import pytest
import click.testing
import json
import datetime
import zipfile
import time
from nomad import search, processing as proc, files
......@@ -193,24 +192,6 @@ class TestAdminUploads:
assert Upload.objects(upload_id=upload_id).first() is None
assert Calc.objects(upload_id=upload_id).first() is None
def test_msgpack(self, published):
    # Seed both access levels of a published upload with a small legacy
    # zipped-JSON archive, then check that the `admin uploads msgpack`
    # CLI command converts them into readable msgpack archives.
    upload_id = published.upload_id
    upload_files = files.UploadFiles.get(upload_id=upload_id)

    for access in ['public', 'restricted']:
        # Legacy archive file name scheme: archive-<access>.json.zip
        zip_path = upload_files._file_object('archive', access, 'json', 'zip').os_path
        with zipfile.ZipFile(zip_path, mode='w') as zf:
            for i in range(0, 2):
                with zf.open('%d_%s.json' % (i, access), 'w') as f:
                    f.write(json.dumps(dict(archive='test')).encode())

    result = click.testing.CliRunner().invoke(
        cli, ['admin', 'uploads', 'msgpack', upload_id], catch_exceptions=False)

    assert result.exit_code == 0
    assert 'wrote msgpack archive' in result.stdout

    # The converted msgpack archive must serve the seeded entry back.
    with upload_files.read_archive('0_public') as archive:
        assert archive['0_public'].to_dict() == dict(archive='test')
def test_index(self, published):
upload_id = published.upload_id
calc = Calc.objects(upload_id=upload_id).first()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment