Commit 856b026d authored by Markus Scheidgen

Optimized packaging. [skip ci]

parent 3a3ca31e
Pipeline #45760 skipped
@@ -68,6 +68,7 @@ fs = NomadConfig(
     staging='.volumes/fs/staging',
     public='.volumes/fs/public',
     migration_packages='.volumes/fs/migration_packages',
+    local_tmp='/tmp',
     prefix_size=2
 )
......
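The new fs.local_tmp setting provides node-local scratch space; the extracted_archive helper further down in this diff builds its temporary directories from it, and deployments override it via NOMAD_FS_LOCAL_TMP (see the exports at the end of the diff). A minimal sketch (not part of the commit) of that pattern, with stand-ins for the nomad helpers:

    import os
    import shutil
    import uuid

    local_tmp = os.environ.get('NOMAD_FS_LOCAL_TMP', '/tmp')    # stand-in for config.fs.local_tmp
    tmp_directory = os.path.join(local_tmp, str(uuid.uuid4()))  # stand-in for utils.create_uuid()
    os.mkdir(tmp_directory)
    # ... extract an archive into tmp_directory, work on it, then clean up:
    shutil.rmtree(tmp_directory)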
@@ -20,7 +20,7 @@ other/older nomad@FAIRDI instances to mass upload it to a new nomad@FAIRDI insta
 .. autoclass:: SourceCalc
 """
-from typing import Generator, Tuple, List, Iterable, Any, Dict, BinaryIO, Callable, ContextManager
+from typing import Generator, Tuple, List, Iterable, Any, Dict
 import os
 import os.path
 import zipfile
@@ -34,6 +34,7 @@ import runstats
 import io
 import threading
 from contextlib import contextmanager
+import shutil
 from nomad import utils, infrastructure, files, config
 from nomad.coe_repo import User, Calc, LoginException
@@ -87,9 +88,7 @@ def iterable_to_stream(iterable, buffer_size=io.DEFAULT_BUFFER_SIZE):
     return io.BufferedReader(IterStream(), buffer_size=buffer_size)
-OpenFunction = Callable[[Any], ContextManager[BinaryIO]]
-PathFunction = Callable[[Any], str]
-Directory = Tuple[List[Any], int]
+Directory = Tuple[List[str], str, int]
 class Package(Document):
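The reshaped Directory alias replaces the old fileobject/open-function indirection: each directory is now a tuple of upload-relative file paths, the parent directory they live in, and the accumulated directory size. A minimal illustration (values are hypothetical, not from the commit):

    # Directory = Tuple[List[str], str, int]
    example_directory: Directory = (
        ['some_upload/calc_1/INCAR', 'some_upload/calc_1/OUTCAR'],  # upload relative filepaths
        '/nomad/extracted/some_upload',                             # parent directory on disk
        52 * 1024,                                                  # directory size in bytes
    )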
@@ -187,39 +186,34 @@ class Package(Document):
         package_size = 0
         package_files = 0
         package_zip = open_package_zip(package_entry)
-        for (fileobjects, size), path, open in cls.iterate_directory(upload_path):
-            for fileobject in fileobjects:
-                filepath = path(fileobject)
-                basepath = os.path.basename(filepath)
-                if basepath.startswith('RESTRICTED'):
-                    restricted = 36
-                    try:
-                        restricted = min(36, int(basepath[len('RESTRICTED_'):]))
-                    except Exception:
-                        pass
-                with package_zip.open(filepath, "w") as target:
-                    with open(fileobject) as source:
-                        buffer = source.read(1024 * 1024)
-                        while len(buffer) > 0:
-                            target.write(buffer)
-                            buffer = source.read(1024 * 1024)
-                package_files += 1
-            if size > max_package_size:
-                logger.warn(
-                    'directory exceeds max package size',
-                    package_id=package_entry.package_id, size=package_size)
-            package_size += size
-            if package_size > max_package_size:
-                close_package(package_size, package_files)
-                package_size, package_files = 0, 0
-                package_entry = create_package_entry()
-                package_zip = open_package_zip(package_entry)
+        with cls.upload_iterator(upload_path) as directory:
+            for filepaths, parent_diretory, size in directory:
+                for filepath in filepaths:
+                    basepath = os.path.basename(filepath)
+                    if basepath.startswith('RESTRICTED'):
+                        restricted = 36
+                        try:
+                            restricted = min(36, int(basepath[len('RESTRICTED_'):]))
+                        except Exception:
+                            pass
+                    package_zip.write(os.path.join(parent_diretory, filepath), filepath)
+                    package_files += 1
+                if size > max_package_size:
+                    logger.warn(
+                        'directory exceeds max package size',
+                        package_id=package_entry.package_id, size=package_size)
+                package_size += size
+                if package_size > max_package_size:
+                    close_package(package_size, package_files)
+                    package_size, package_files = 0, 0
+                    package_entry = create_package_entry()
+                    package_zip = open_package_zip(package_entry)
         if package_files > 0:
             close_package(package_size, package_files)
         package_query = cls.objects(upload_id=upload_id)
         package_query.update(restricted=restricted)
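For reference, a hedged restatement (not part of the diff) of the RESTRICTED_<months> parsing used in the loop above; the helper name is illustrative:

    def parse_restricted(basepath, restricted):
        # 'RESTRICTED_12' -> 12, 'RESTRICTED_99' -> 36 (capped), 'RESTRICTED' -> 36,
        # any other basename leaves the previously determined value untouched.
        if basepath.startswith('RESTRICTED'):
            restricted = 36
            try:
                restricted = min(36, int(basepath[len('RESTRICTED_'):]))
            except Exception:
                pass
        return restricted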
@@ -232,38 +226,22 @@ class Package(Document):
         return package_query
     @classmethod
-    def iterate_directory(
-            cls, upload_path: str) \
-            -> Generator[Tuple[Directory, PathFunction, OpenFunction], None, None]:
+    @contextmanager
+    def upload_iterator(cls, upload_path: str) -> Generator[Generator[Directory, None, None], None, None]:
         """
         Traverses the given upload path and provides directory and file information.
         Returns:
-            A generator of tuples with the directory, a path function, and an open
-            function. The directory contains an iterable of fileobjects and the directory
-            size. The path function gives the pathname of fileobject. The open function
-            create a contextmanager for a readable file-like for a fileobject.
+            A contextmanager that opens the given upload and provides a generator for
+            directories. Directories are tuple of an iterable of upload relative filepaths
+            and the directory size.
         """
         potential_archive_path = os.path.join(upload_path, 'archive.tar.gz')
         if os.path.isfile(potential_archive_path):
-            with tarfile.TarFile.open(potential_archive_path) as tar_file:
-                @contextmanager
-                def open_function(info):
-                    file = tar_file.extractfile(info)
-                    yield file
-                    file.close()
-                for dir in cls._iterate_upload_archive(tar_file):
-                    yield dir, lambda o: o.name, open_function
+            with cls.extracted_archive(potential_archive_path) as extracted_archive:
+                yield cls.iterate_upload_directory(extracted_archive)
         else:
-            def open_function(filepath):
-                return open(os.path.join(upload_path, filepath), 'rb')
-            for dir in cls._iterate_upload_directory(upload_path):
-                yield dir, lambda p: p, open_function
+            yield cls.iterate_upload_directory(upload_path)
     @classmethod
-    def _iterate_upload_directory(cls, upload_path) -> Generator[Directory, None, None]:
+    def iterate_upload_directory(cls, upload_path) -> Generator[Directory, None, None]:
         """
         Interprets the given upload path as a directory. Files path are given as upload
         path relative paths.
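A short usage sketch (not part of the diff) of the new upload_iterator API, mirroring how create_packages consumes it above; the upload path is hypothetical. The contextmanager yields a generator of Directory tuples and transparently covers both plain upload directories and archive.tar.gz uploads:

    with Package.upload_iterator('/nomad/uploads/some_upload') as directories:
        for filepaths, parent_directory, size in directories:
            for filepath in filepaths:
                absolute_path = os.path.join(parent_directory, filepath)
                # e.g. add the file to a package zip; size is the whole directory's size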
@@ -290,33 +268,24 @@
                     filesize = stats.mean()
                 directory_size += filesize
-        yield directory_filepaths, directory_size
+        yield directory_filepaths, upload_path, directory_size
     @classmethod
-    def _iterate_upload_archive(cls, tar_file: tarfile.TarFile) -> Generator[Directory, None, None]:
+    @contextmanager
+    def extracted_archive(cls, archive_path: str) -> Generator[str, None, None]:
         """
-        Interprets the given upload path as an archive. File paths are as in the archive.
+        Temporarily extracts the given archive and returns directory with the extracted
+        data.
         """
-        current_directory = None
-        directory_infos: List[tarfile.TarInfo] = []
-        directory_size = 0
-        f = tar_file.next()
-        while f is not None:
-            if f.isfile():
-                directory = os.path.dirname(f.name)
-                if current_directory != directory and len(directory_infos) > 0:
-                    yield directory_infos, directory_size
-                    directory_infos = []
-                    directory_size = 0
-                current_directory = directory
-                directory_infos.append(f)
-                directory_size += f.size
-            f = tar_file.next()
-        yield directory_infos, directory_size
+        tmp_directory = os.path.join(config.fs.local_tmp, utils.create_uuid())
+        os.mkdir(tmp_directory)
+        with tarfile.TarFile.open(archive_path) as tar_file:
+            tar_file.extractall(tmp_directory)
+        yield tmp_directory
+        shutil.rmtree(tmp_directory)
 class SourceCalc(Document):
......
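A minimal usage sketch (not part of the diff) for the new extracted_archive contextmanager; the archive path is hypothetical. The archive is unpacked into a fresh directory under config.fs.local_tmp, and that directory is removed again when the with block exits:

    with Package.extracted_archive('/nomad/uploads/some_upload/archive.tar.gz') as tmp_dir:
        for filepaths, parent_directory, size in Package.iterate_upload_directory(tmp_dir):
            pass  # work on the extracted files; tmp_dir is deleted after the with block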
@@ -5,3 +5,4 @@ export NOMAD_FS_MIGRATION_PACKAGES=/nomad/fairdi/migration/fs/migration_packages
 export NOMAD_FS_STAGING=/nomad/fairdi/migration/fs/staging
 export NOMAD_FS_PUBLIC=/nomad/fairdi/migration/fs/public
 export NOMAD_FS_TMP=/nomad/fairdi/migration/fs/tmp
+export NOMAD_FS_LOCAL_TMP=/scratch/fairdi/tmp