Commit 8d02075a authored by Markus Scheidgen

Improved upload file handling for various archive formats. Fixed #92

parent cf9f6a4f
Pipeline #41905 passed with stages in 15 minutes and 26 seconds
@@ -210,7 +210,7 @@ class Uploads extends React.Component {
<Paper className={classes.dropzoneContainer}>
<Dropzone
accept="application/zip"
accept={['application/zip', 'application/gzip', 'application/bz2']}
className={classes.dropzone}
activeClassName={classes.dropzoneAccept}
rejectClassName={classes.dropzoneReject}
......
@@ -157,38 +157,52 @@ class UploadListResource(Resource):
logger.info('upload created', upload_id=upload.upload_id)
upload_files = ArchiveBasedStagingUploadFiles(
upload.upload_id, create=True, local_path=local_path)
if local_path:
# file is already there and does not need to be received
pass
elif request.mimetype == 'application/multipart-formdata':
# multipart formdata, e.g. with curl -X put "url" -F file=@local_file
# might have performance issues for large files: https://github.com/pallets/flask/issues/2086
if 'file' not in request.files:
abort(400, message='Bad multipart-formdata, there is no file part.')
file = request.files['file']
if upload.name == '':
upload.name = file.filename
file.save(upload_files.upload_file_os_path)
else:
# simple streaming data in HTTP body, e.g. with curl "url" -T local_file
try:
with open(upload_files.upload_file_os_path, 'wb') as f:
while not request.stream.is_exhausted:
f.write(request.stream.read(1024))
except Exception as e:
logger.warning('Error on streaming upload', exc_info=e)
abort(400, message='Some IO went wrong, upload probably aborted/disrupted.')
if not upload_files.is_valid:
try:
if local_path:
# file is already there and does not need to be received
upload_files = ArchiveBasedStagingUploadFiles(
upload.upload_id, create=True, local_path=local_path)
elif request.mimetype == 'application/multipart-formdata':
# multipart formdata, e.g. with curl -X put "url" -F file=@local_file
# might have performance issues for large files: https://github.com/pallets/flask/issues/2086
if 'file' not in request.files:
abort(400, message='Bad multipart-formdata, there is no file part.')
file = request.files['file']
if not upload.name:
upload.name = file.filename
upload_files = ArchiveBasedStagingUploadFiles(
upload.upload_id, create=True, local_path=local_path,
file_name='.upload%s' % os.path.splitext(file.filename)[1])
file.save(upload_files.upload_file_os_path)
else:
# simple streaming data in HTTP body, e.g. with curl "url" -T local_file
file_name = '.upload'
try:
ext = os.path.splitext(upload.name)[1]
if ext:
file_name += ext
except Exception:
pass
upload_files = ArchiveBasedStagingUploadFiles(
upload.upload_id, create=True, local_path=local_path,
file_name=file_name)
try:
with open(upload_files.upload_file_os_path, 'wb') as f:
while not request.stream.is_exhausted:
f.write(request.stream.read(1024))
except Exception as e:
logger.warning('Error on streaming upload', exc_info=e)
abort(400, message='Some IO went wrong, upload probably aborted/disrupted.')
except Exception as e:
upload_files.delete()
upload.delete(force=True)
logger.info('Invalid upload')
abort(400, message='Bad file format, expected %s.' % ", ".join(upload_files.formats))
logger.info('Invalid or aborted upload')
raise e
logger.info('received uploaded file')
upload.upload_time = datetime.now()
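For illustration (not part of this commit): os.path.splitext keeps only the last extension and already includes the leading dot, and it returns an empty string rather than None when there is no extension. That is why the stored upload file for a .tar.gz body only carries a .gz suffix; tarfile later detects the compression on its own.

import os.path

print(os.path.splitext('example.tar.gz'))   # ('example.tar', '.gz')
print(os.path.splitext('example.zip'))      # ('example', '.zip')
print(os.path.splitext('example'))          # ('example', '') -> empty string, never None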
@@ -365,7 +379,7 @@ class UploadCommandResource(Resource):
config.services.api_port,
config.services.api_base_path)
upload_command = 'curl -H "X-Token: %s" "%s" --upload-file <local_file>' % (
upload_command = 'curl -X PUT -H "X-Token: %s" "%s" -F file=@<local_file>' % (
g.user.get_auth_token().decode('utf-8'), upload_url)
return dict(upload_url=upload_url, upload_command=upload_command), 200
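A minimal client-side sketch of the two upload paths the handler above distinguishes, mirroring the new curl command. The token, URL, and file name are placeholders; the requests library is assumed.

import requests

token = '<auth-token>'                              # placeholder, e.g. taken from the uploads/command response
upload_url = 'http://localhost/nomad/api/uploads/'  # placeholder, use the upload_url returned by the API

# multipart form data, like: curl -X PUT -H "X-Token: ..." "<url>" -F file=@example.tar.gz
with open('example.tar.gz', 'rb') as f:
    requests.put(upload_url, headers={'X-Token': token}, files={'file': f})

# raw streaming body, like: curl -H "X-Token: ..." "<url>" -T example.tar.gz
with open('example.tar.gz', 'rb') as f:
    requests.put(upload_url, headers={'X-Token': token}, data=f)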
@@ -42,6 +42,7 @@ import os.path
import os
import shutil
from zipfile import ZipFile, BadZipFile, is_zipfile
import tarfile
from bagit import make_bag
import hashlib
import base64
@@ -118,7 +119,7 @@ class DirectoryObject(PathObject):
return os.path.isdir(self.os_path)
class MetadataTimeout(Exception):
class ExtractError(Exception):
pass
@@ -369,13 +370,15 @@ class StagingUploadFiles(UploadFiles):
def archive_log_file_object(self, calc_id: str) -> PathObject:
return self._archive_dir.join_file('%s.log' % calc_id)
def add_rawfiles(self, path: str, move: bool = False, prefix: str = None) -> None:
def add_rawfiles(self, path: str, move: bool = False, prefix: str = None, force_archive: bool = False) -> None:
"""
Add rawfiles to the upload. The given file will be copied, moved, or extracted.
Arguments:
path: Path to a directory, file, or zip file. Zip files will be extracted.
move: Whether the file should be moved instead of copied. Zips will be extracted and then deleted.
prefix: Optional path prefix for the added files.
force_archive: Expect the file to be a zip or another supported archive file.
Usually such files are only extracted if extraction succeeds, and copied otherwise.
"""
assert not self.is_frozen
assert os.path.exists(path)
@@ -384,7 +387,7 @@ class StagingUploadFiles(UploadFiles):
if prefix is not None:
target_dir = target_dir.join_dir(prefix, create=True)
ext = os.path.splitext(path)[1]
if ext == '.zip':
if force_archive or ext == '.zip':
try:
with ZipFile(path) as zf:
zf.extractall(target_dir.os_path)
@@ -394,6 +397,19 @@ class StagingUploadFiles(UploadFiles):
except BadZipFile:
pass
if force_archive or ext in ['.tgz', '.tar.gz', '.tar.bz2']:
try:
with tarfile.open(path) as tf:
tf.extractall(target_dir.os_path)
if move:
os.remove(path)
return
except tarfile.TarError:
pass
if force_archive:
raise ExtractError
if move:
shutil.move(path, target_dir.os_path)
else:
@@ -566,10 +582,12 @@ class ArchiveBasedStagingUploadFiles(StagingUploadFiles):
formats = ['zip']
""" A human readable list of supported file formats. """
def __init__(self, upload_id: str, local_path: str = None, *args, **kwargs) -> None:
def __init__(
self, upload_id: str, local_path: str = None, file_name: str = '.upload',
*args, **kwargs) -> None:
super().__init__(upload_id, *args, **kwargs)
self._local_path = local_path
self._upload_file = self.join_file('.upload.zip')
self._upload_file = self.join_file(file_name)
@property
def upload_file_os_path(self):
@@ -589,9 +607,9 @@ class ArchiveBasedStagingUploadFiles(StagingUploadFiles):
def extract(self) -> None:
assert next(self.raw_file_manifest(), None) is None, 'can only extract once'
super().add_rawfiles(self.upload_file_os_path)
super().add_rawfiles(self.upload_file_os_path, force_archive=True)
def add_rawfiles(self, path: str, move: bool = False, prefix: str = None) -> None:
def add_rawfiles(self, path: str, move: bool = False, prefix: str = None, force_archive: bool = False) -> None:
assert False, 'do not add_rawfiles to a %s' % self.__class__.__name__
......
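Standalone sketch (file names are hypothetical): a single tarfile.open(path) call covers .tgz, .tar.gz and .tar.bz2 because the default read mode detects gzip/bzip2 compression transparently, so add_rawfiles does not need to branch on the compression type.

import tarfile

for archive in ['example.tar.gz', 'example.tgz', 'example.tar.bz2']:
    try:
        with tarfile.open(archive) as tf:    # default mode 'r' is 'r:*', auto-detects compression
            tf.extractall('/tmp/extracted')
    except (OSError, tarfile.TarError):
        print('%s is missing or not a readable tar archive' % archive)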
@@ -31,7 +31,7 @@ from structlog import wrap_logger
from contextlib import contextmanager
from nomad import utils, coe_repo, datamodel
from nomad.files import PathObject, ArchiveBasedStagingUploadFiles
from nomad.files import PathObject, ArchiveBasedStagingUploadFiles, ExtractError
from nomad.processing.base import Proc, Chord, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing import parsers, parser_dict
from nomad.normalizing import normalizers
@@ -390,7 +390,10 @@ class Upload(Chord, datamodel.Upload):
upload_size=self.upload_files.size):
self.upload_files.extract()
except KeyError:
self.fail('process request for non existing upload', level=logging.ERROR)
self.fail('process request for non existing upload', log_level=logging.ERROR)
return
except ExtractError:
self.fail('bad .zip/.tar file', log_level=logging.INFO)
return
def match_mainfiles(self) -> Generator[Tuple[str, object], None, None]:
......