Commit 3a3ca31e authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Optimized upload tar handling. [skip ci]

parent 8d4af687
Pipeline #45759 skipped
......@@ -24,6 +24,7 @@ from typing import Generator, Tuple, List, Iterable, Any, Dict, BinaryIO, Callab
import os
import os.path
import zipfile
import tarfile
import math
from mongoengine import Document, IntField, StringField, DictField
import datetime
......@@ -32,7 +33,6 @@ import os
import runstats
import io
import threading
from tarfile import TarFile
from contextlib import contextmanager
from nomad import utils, infrastructure, files, config
......@@ -87,8 +87,9 @@ def iterable_to_stream(iterable, buffer_size=io.DEFAULT_BUFFER_SIZE):
return io.BufferedReader(IterStream(), buffer_size=buffer_size)
OpenFunction = Callable[[str], ContextManager[BinaryIO]]
Directory = Tuple[List[str], int]
OpenFunction = Callable[[Any], ContextManager[BinaryIO]]
PathFunction = Callable[[Any], str]
Directory = Tuple[List[Any], int]
class Package(Document):
......@@ -186,8 +187,9 @@ class Package(Document):
package_size = 0
package_files = 0
package_zip = open_package_zip(package_entry)
for (filepaths, size), open in cls.iterate_directory(upload_path):
for filepath in filepaths:
for (fileobjects, size), path, open in cls.iterate_directory(upload_path):
for fileobject in fileobjects:
filepath = path(fileobject)
basepath = os.path.basename(filepath)
if basepath.startswith('RESTRICTED'):
restricted = 36
......@@ -197,7 +199,7 @@ class Package(Document):
pass
with package_zip.open(filepath, "w") as target:
with open(filepath) as source:
with open(fileobject) as source:
buffer = source.read(1024 * 1024)
while len(buffer) > 0:
target.write(buffer)
......@@ -230,34 +232,35 @@ class Package(Document):
return package_query
@classmethod
def iterate_directory(cls, upload_path: str) -> Generator[Tuple[Directory, OpenFunction], None, None]:
def iterate_directory(
cls, upload_path: str) \
-> Generator[Tuple[Directory, PathFunction, OpenFunction], None, None]:
"""
Traverses the given upload path and provides directory and file information.
Returns:
A tuple with an open function and directory generator. The open function
take a path as argument and create a context manager with a BinaryIO file like.
The path are as provided by the directory generator. The directory generator
yields directories, which are tuples of a path list and a approx. directory
size in bytes.
A generator of tuples with the directory, a path function, and an open
function. The directory contains an iterable of fileobjects and the directory
size. The path function gives the pathname of fileobject. The open function
create a contextmanager for a readable file-like for a fileobject.
"""
potential_archive_path = os.path.join(upload_path, 'archive.tar.gz')
if os.path.isfile(potential_archive_path):
with TarFile.open(potential_archive_path) as tar_file:
with tarfile.TarFile.open(potential_archive_path) as tar_file:
@contextmanager
def open_function(filepath):
file = tar_file.extractfile(filepath)
def open_function(info):
file = tar_file.extractfile(info)
yield file
file.close()
for dir in cls._iterate_upload_archive(tar_file):
yield dir, open_function
yield dir, lambda o: o.name, open_function
else:
def open_function(filepath):
return open(os.path.join(upload_path, filepath), 'rb')
for dir in cls._iterate_upload_directory(upload_path):
yield dir, open_function
yield dir, lambda p: p, open_function
@classmethod
def _iterate_upload_directory(cls, upload_path) -> Generator[Directory, None, None]:
......@@ -290,29 +293,30 @@ class Package(Document):
yield directory_filepaths, directory_size
@classmethod
def _iterate_upload_archive(cls, tar_file: TarFile) -> Generator[Directory, None, None]:
def _iterate_upload_archive(cls, tar_file: tarfile.TarFile) -> Generator[Directory, None, None]:
"""
Interprets the given upload path as an archive. File paths are as in the archive.
"""
current_directory = None
directory_filepaths: List[str] = []
directory_infos: List[tarfile.TarInfo] = []
directory_size = 0
for f in tar_file.getmembers():
f = tar_file.next()
while f is not None:
if f.isfile():
directory = os.path.dirname(f.name)
if current_directory != directory and len(directory_filepaths) > 0:
yield directory_filepaths, directory_size
directory_filepaths = []
if current_directory != directory and len(directory_infos) > 0:
yield directory_infos, directory_size
directory_infos = []
directory_size = 0
current_directory = directory
directory_filepaths.append(f.name)
directory_infos.append(f)
directory_size += f.size
f = tar_file.next()
yield directory_filepaths, directory_size
yield directory_infos, directory_size
class SourceCalc(Document):
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment