Commit ffecdabd authored by Markus Scheidgen

Switch to new files implementation. Not all tests working.

parent 49851e8c
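
This commit replaces the per-artifact file classes from nomad.files (UploadFile, ArchiveFile, ArchiveLogFile, RepositoryFile) with the upload-centric classes of the new nomad.uploads module. For orientation, a rough sketch of the access pattern the call sites below switch to; the identifiers and paths are made up, see the hunks for the real signatures:

from nomad.uploads import UploadFiles

upload_hash, calc_hash = 'some-upload-id', 'some-calc-hash'  # illustrative only

# one UploadFiles object per upload replaces the per-artifact classes
upload_files = UploadFiles.get(upload_hash)

with upload_files.raw_file('some/path/in/upload', 'rt') as f:   # raw (repository) file
    print(f.read())
with upload_files.archive_file(calc_hash, 'rt') as f:           # processed archive
    print(f.read())
with upload_files.archive_log_file(calc_hash, 'rt') as f:       # processing log
    print(f.read())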
......@@ -44,7 +44,7 @@
"cwd": "${workspaceFolder}",
"program": "${workspaceFolder}/.pyenv/bin/pytest",
"args": [
"-sv", "tests/test_uploads.py::TestPublicUploadFiles::test_rawfile[Ppr]"
"-sv", "tests/processing/test_data.py::test_processing[tests/data/proc/examples_template.zip]"
]
},
{
......
......@@ -25,7 +25,7 @@ from flask_restplus import abort, Resource
import nomad_meta_info
from nomad import config
from nomad.files import ArchiveFile, ArchiveLogFile
from nomad.uploads import UploadFiles
from nomad.utils import get_logger
from .app import api
......@@ -52,17 +52,13 @@ class ArchiveCalcLogResource(Resource):
archive_id = '%s/%s' % (upload_hash, calc_hash)
try:
archive = ArchiveLogFile(archive_id)
if not archive.exists():
raise FileNotFoundError()
archive_path = archive.os_path
rv = send_file(
archive_path,
mimetype='text/plain',
as_attachment=True,
attachment_filename=os.path.basename(archive_path))
upload_files = UploadFiles.get(upload_hash)
with upload_files.archive_log_file(calc_hash, 'rt') as f:
rv = send_file(
f,
mimetype='text/plain',
as_attachment=True,
attachment_filename='%s.log' % archive_id)
return rv
except FileNotFoundError:
......@@ -90,23 +86,20 @@ class ArchiveCalcResource(Resource):
archive_id = '%s/%s' % (upload_hash, calc_hash)
try:
archive = ArchiveFile(archive_id)
if not archive.exists():
raise FileNotFoundError()
archive_path = archive.os_path
rv = send_file(
archive_path,
mimetype='application/json',
as_attachment=True,
attachment_filename=os.path.basename(archive_path))
upload_file = UploadFiles.get(upload_hash)
mode = 'rb' if config.files.compress_archive else 'rt'
with upload_file.archive_file(calc_hash, mode) as f:
rv = send_file(
f,
mimetype='application/json',
as_attachment=True,
attachment_filename='%s.json' % archive_id)
if config.files.compress_archive:
rv.headers['Content-Encoding'] = 'gzip'
return rv
except FileNotFoundError:
except KeyError:
abort(404, message='Archive %s does not exist.' % archive_id)
except Exception as e:
logger = get_logger(
......
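
One design detail in the archive endpoint above: when config.files.compress_archive is set, the archive is stored gzipped, so the file is opened in binary mode and the compressed bytes are sent unchanged, with a Content-Encoding header that lets HTTP clients decompress transparently. A condensed sketch of just that part; the helper name is made up:

from flask import send_file

from nomad import config

def send_archive(f, archive_id):
    """ Illustrative helper: send an (optionally gzipped) archive file as JSON. """
    rv = send_file(
        f,
        mimetype='application/json',
        as_attachment=True,
        attachment_filename='%s.json' % archive_id)
    if config.files.compress_archive:
        # bytes are already gzip compressed on disk; announce it instead of decompressing
        rv.headers['Content-Encoding'] = 'gzip'
    return rv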
......@@ -26,8 +26,8 @@ from flask import Response, request, send_file
from flask_restplus import abort, Resource, fields
from werkzeug.exceptions import HTTPException
from nomad.files import RepositoryFile
from nomad.utils import get_logger
from nomad.uploads import UploadFiles
from .app import api
from .auth import login_if_available
......@@ -69,15 +69,14 @@ class RawFileFromPathResource(Resource):
"""
upload_filepath = fix_file_paths(path)
repository_file = RepositoryFile(upload_hash)
if not repository_file.exists():
try:
upload_files = UploadFiles.get(upload_hash)
except KeyError:
abort(404, message='The upload with hash %s does not exist.' % upload_hash)
if upload_filepath[-1:] == '*':
upload_filepath = upload_filepath[0:-1]
files = list(
file for file in repository_file.manifest
if file.startswith(upload_filepath))
files = list(upload_files.raw_file_manifest(path_prefix=upload_filepath))
if len(files) == 0:
abort(404, message='There are no files for %s.' % upload_filepath)
else:
......@@ -85,8 +84,7 @@ class RawFileFromPathResource(Resource):
return respond_to_get_raw_files(upload_hash, files, compress)
try:
the_file = repository_file.get_file(upload_filepath)
with the_file.open() as f:
with upload_files.raw_file(upload_filepath) as f:
rv = send_file(
f,
mimetype='application/octet-stream',
......@@ -94,7 +92,7 @@ class RawFileFromPathResource(Resource):
attachment_filename=os.path.basename(upload_filepath))
return rv
except KeyError:
files = list(file for file in repository_file.manifest if file.startswith(upload_filepath))
files = list(file for file in upload_files.raw_file_manifest(upload_filepath))
if len(files) == 0:
abort(404, message='The file %s does not exist.' % upload_filepath)
else:
......@@ -161,8 +159,9 @@ class RawFilesResource(Resource):
def respond_to_get_raw_files(upload_hash, files, compress=False):
logger = get_logger(__name__, endpoint='raw', action='get files', upload_hash=upload_hash)
repository_file = RepositoryFile(upload_hash)
if not repository_file.exists():
try:
upload_file = UploadFiles.get(upload_hash)
except KeyError:
abort(404, message='The upload with hash %s does not exist.' % upload_hash)
def generator():
......@@ -170,22 +169,21 @@ def respond_to_get_raw_files(upload_hash, files, compress=False):
def iterator():
""" Replace the directory based iter of zipstream with an iter over all given files. """
try:
with repository_file.zipped_container.zip_file() as zf:
for filename in files:
# Write a file to the zipstream.
try:
with zf.open(repository_file.zipped_container.get_zip_path(filename)) as f:
def iter_content():
while True:
data = f.read(100000)
if not data:
break
yield data
yield dict(arcname=filename, iterable=iter_content())
except KeyError as e:
# files that are not found, will not be returned
pass
for filename in files:
# Write a file to the zipstream.
try:
with upload_file.raw_file(filename) as f:
def iter_content():
while True:
data = f.read(100000)
if not data:
break
yield data
yield dict(arcname=filename, iterable=iter_content())
except KeyError as e:
# files that are not found, will not be returned
pass
except Exception as e:
logger.error('Exception while accessing files.', exc_info=e)
......
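
The zipstream integration keeps its shape: the directory-based iterator is replaced by one that opens each requested file through upload_file.raw_file and yields arcname/iterable pairs, so downloads are streamed in roughly 100 kB chunks without staging a zip file on disk. The chunking helper, pulled out as a self-contained sketch:

from typing import IO, Iterator

def iter_content(f: IO, chunk_size: int = 100000) -> Iterator[bytes]:
    """ Yield an open file object's content in chunks, as the nested helper above does. """
    while True:
        data = f.read(chunk_size)
        if not data:
            break
        yield data

# each zip entry of the streamed download is then described as
#   dict(arcname=filename, iterable=iter_content(f))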
......@@ -13,8 +13,8 @@
# limitations under the License.
"""
The upload API of the nomad@FAIRDI APIs. Provides endpoints to create uploads, upload
files, and retrieve the processing status of uploads.
The upload API of the nomad@FAIRDI APIs. Provides endpoints to upload files and
get the processing status of uploads.
"""
from flask import g, request
......@@ -27,7 +27,7 @@ from nomad import config
from nomad.processing import Upload
from nomad.processing import NotAllowedDuringProcessing
from nomad.utils import get_logger
from nomad.files import UploadFile
from nomad.uploads import ArchiveBasedStagingUploadFiles
from .app import api
from .auth import login_really_required
......@@ -163,9 +163,11 @@ class UploadListResource(Resource):
logger = get_logger(__name__, endpoint='upload', action='put', upload_id=upload.upload_id)
logger.info('upload created')
uploadFile = UploadFile(upload.upload_id, local_path=local_path)
upload_files = ArchiveBasedStagingUploadFiles(
upload.upload_id, create=True, local_path=local_path)
if local_path:
# file is already there and does not need to be received
pass
elif request.mimetype == 'application/multipart-formdata':
# multipart formdata, e.g. with curl -X put "url" -F file=@local_file
......@@ -176,11 +178,11 @@ class UploadListResource(Resource):
if upload.name is '':
upload.name = file.filename
file.save(uploadFile.os_path)
file.save(upload_files.upload_file_os_path)
else:
# simple streaming data in HTTP body, e.g. with curl "url" -T local_file
try:
with uploadFile.open('wb') as f:
with open(upload_files.upload_file_os_path, 'wb') as f:
while not request.stream.is_exhausted:
f.write(request.stream.read(1024))
......@@ -188,10 +190,10 @@ class UploadListResource(Resource):
logger.error('Error on streaming upload', exc_info=e)
abort(400, message='Some IO went wrong, download probably aborted/disrupted.')
if not uploadFile.is_valid:
uploadFile.delete()
if not upload_files.is_valid:
upload_files.delete()
upload.delete()
abort(400, message='Bad file format, expected %s.' % ", ".join(UploadFile.formats))
abort(400, message='Bad file format, expected %s.' % ", ".join(upload_files.formats))
logger.info('received uploaded file')
upload.upload_time = datetime.now()
......
......@@ -16,7 +16,7 @@
This module contains classes that allow to represent the core
nomad data entities :class:`Upload` and :class:`Calc` on a high level of abstraction
independent from their representation in the different modules :py:mod:`nomad.repo`,
:py:mod:`nomad.processing`, :py:mod:`nomad.coe_repo`, :py:mod:`nomad.files`.
:py:mod:`nomad.processing`, :py:mod:`nomad.coe_repo`, :py:mod:`nomad.uploads`.
It is not about representing every detail, but those parts that are directly involved in
api, processing, migration, mirroring, or other 'infrastructure' operations.
"""
......
......@@ -32,7 +32,7 @@ from structlog import wrap_logger
from contextlib import contextmanager
from nomad import utils, coe_repo, datamodel
from nomad.files import UploadFile, ArchiveFile, ArchiveLogFile, File
from nomad.uploads import PathObject, ArchiveBasedStagingUploadFiles
from nomad.repo import RepoCalc, RepoUpload
from nomad.processing.base import Proc, Chord, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing import parsers, parser_dict
......@@ -58,13 +58,11 @@ class Calc(Proc, datamodel.Calc):
parser: the name of the parser used to process this calc
upload_id: the id of the upload used to create this calculation
mainfile: the mainfile (including path in upload) that was used to create this calc
mainfile_tmp_path: path to the mainfile extracted for processing
"""
archive_id = StringField(primary_key=True)
upload_id = StringField()
mainfile = StringField()
parser = StringField()
mainfile_tmp_path = StringField()
meta: Any = {
'indices': [
......@@ -75,9 +73,9 @@ class Calc(Proc, datamodel.Calc):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._parser_backend = None
self._upload = None
self._upload: Upload = None
self._upload_files: ArchiveBasedStagingUploadFiles = None
self._calc_proc_logwriter = None
self._calc_proc_logfile = None
self._calc_proc_logwriter_ctx: ContextManager = None
@classmethod
......@@ -85,8 +83,8 @@ class Calc(Proc, datamodel.Calc):
return cls.get_by_id(id, 'archive_id')
@property
def mainfile_file(self) -> File:
return File(self.mainfile_tmp_path)
def mainfile_file(self) -> PathObject:
return self.upload_files.raw_file_object(self.mainfile)
@property
def calc_hash(self) -> str:
......@@ -98,15 +96,24 @@ class Calc(Proc, datamodel.Calc):
self._upload = Upload.get(self.upload_id)
return self._upload
@property
def upload_files(self) -> ArchiveBasedStagingUploadFiles:
if not self._upload_files:
self._upload_files = ArchiveBasedStagingUploadFiles(self.upload_id, public_only=False)
return self._upload_files
@property
def upload_hash(self):
return utils.archive.upload_hash(self.archive_id)
def delete(self):
"""
Delete this calculation and all associated data. This includes all files,
the archive, and this search index entry.
TODO is this needed? Or do we always delete whole uploads in bulk?
"""
# delete the archive
if self.archive_id is not None:
ArchiveFile(self.archive_id).delete()
# delete all files
self.upload_files.delete()
# delete the search index entry
try:
......@@ -120,11 +127,10 @@ class Calc(Proc, datamodel.Calc):
super().delete()
def get_logger(self, **kwargs):
upload_hash, calc_hash = self.archive_id.split('/')
logger = super().get_logger()
logger = logger.bind(
upload_id=self.upload_id, mainfile=self.mainfile,
upload_hash=upload_hash, calc_hash=calc_hash,
upload_hash=self.upload_hash, calc_hash=self.calc_hash,
archive_id=self.archive_id, **kwargs)
return logger
......@@ -137,8 +143,7 @@ class Calc(Proc, datamodel.Calc):
logger = self.get_logger(**kwargs)
if self._calc_proc_logwriter is None:
self._calc_proc_logfile = ArchiveLogFile(self.archive_id)
self._calc_proc_logwriter_ctx = self._calc_proc_logfile.open('wt')
self._calc_proc_logwriter_ctx = self.upload_files.archive_log_file(self.calc_hash, 'wt')
self._calc_proc_logwriter = self._calc_proc_logwriter_ctx.__enter__() # pylint: disable=E1101
def save_to_calc_log(logger, method_name, event_dict):
......@@ -184,7 +189,8 @@ class Calc(Proc, datamodel.Calc):
parser = parser_dict[self.parser]
with utils.timer(logger, 'parser executed', input_size=self.mainfile_file.size):
self._parser_backend = parser.run(self.mainfile_tmp_path, logger=logger)
self._parser_backend = parser.run(
self.upload_files.raw_file_object(self.mainfile).os_path, logger=logger)
self._parser_backend.openNonOverlappingSection('section_calculation_info')
self._parser_backend.addValue('upload_id', self.upload_id)
......@@ -263,7 +269,7 @@ class Calc(Proc, datamodel.Calc):
staging=True,
restricted=False,
user_id=self.upload.user_id,
aux_files=list(self.upload.upload_file.get_siblings(self.mainfile)))
aux_files=list(self.upload_files.calc_files(self.mainfile, with_mainfile=False)))
with utils.timer(logger, 'indexed', step='index'):
# persist to elastic search
......@@ -280,11 +286,10 @@ class Calc(Proc, datamodel.Calc):
input_size=self.mainfile_file.size) as log_data:
# persist the archive
archive_file = ArchiveFile(self.archive_id)
with archive_file.write_archive_json() as out:
with self.upload_files.archive_file(self.calc_hash, 'wt') as out:
self._parser_backend.write_json(out, pretty=True)
log_data.update(archive_size=archive_file.size)
log_data.update(archive_size=self.upload_files.archive_file_object(self.calc_hash).size)
# close loghandler
if self._calc_proc_logwriter is not None:
......@@ -294,7 +299,7 @@ class Calc(Proc, datamodel.Calc):
self._calc_proc_logwriter_ctx.__exit__(None, None, None) # pylint: disable=E1101
self._calc_proc_logwriter = None
log_data.update(log_size=self._calc_proc_logfile.size)
log_data.update(log_size=self.upload_files.archive_log_file_object(self.calc_hash).size)
class Upload(Chord, datamodel.Upload):
......@@ -341,7 +346,7 @@ class Upload(Chord, datamodel.Upload):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._upload_file = None
self._upload_files: ArchiveBasedStagingUploadFiles = None
@classmethod
def get(cls, id):
......@@ -367,28 +372,17 @@ class Upload(Chord, datamodel.Upload):
if not (self.completed or self.current_task == 'uploading'):
raise NotAllowedDuringProcessing()
with lnr(logger, 'delete upload file'):
try:
UploadFile(self.upload_id, local_path=self.local_path).delete()
except KeyError:
if self.current_task == 'uploading':
logger.debug(
'Upload exist, but file does not exist. '
'It was probably aborted and deleted.')
else:
logger.debug('Upload exist, but uploaded file does not exist.')
with lnr(logger, 'deleting calcs'):
# delete archive files
ArchiveFile.delete_archives(upload_hash=self.upload_hash)
with lnr(logger, 'delete all files of upload'):
self.upload_files.delete()
with lnr(logger, 'deleting calcs db entries'):
# delete repo entries
self.to(RepoUpload).delete()
# delete calc processings
Calc.objects(upload_id=self.upload_id).delete()
with lnr(logger, 'deleting upload'):
with lnr(logger, 'deleting upload db entry'):
super().delete()
@classmethod
......@@ -433,11 +427,10 @@ class Upload(Chord, datamodel.Upload):
pass
@property
def upload_file(self):
""" The :class:`UploadFile` instance that represents the uploaded file of this upload. """
if not self._upload_file:
self._upload_file = UploadFile(self.upload_id, local_path=self.local_path)
return self._upload_file
def upload_files(self) -> ArchiveBasedStagingUploadFiles:
if not self._upload_files:
self._upload_files = ArchiveBasedStagingUploadFiles(self.upload_id, public_only=False)
return self._upload_files
@task
def extracting(self):
......@@ -451,15 +444,15 @@ class Upload(Chord, datamodel.Upload):
try:
with utils.timer(
logger, 'upload extracted', step='extracting',
upload_size=self.upload_file.size):
self.upload_file.extract()
upload_size=self.upload_files.size):
self.upload_files.extract()
except KeyError as e:
self.fail('process request for non existing upload', level=logging.INFO)
self.fail('process request for non existing upload', level=logging.ERROR)
return
# create and save a hash for the upload
try:
self.upload_hash = self.upload_file.upload_hash()
self.upload_hash = self.upload_id # TODO self.upload_file.upload_hash()
except Exception as e:
self.fail('could not create upload hash', e)
return
......@@ -469,7 +462,7 @@ class Upload(Chord, datamodel.Upload):
self.fail('The same file was already uploaded and processed.', level=logging.INFO)
return
def match_mainfiles(self) -> Generator[Tuple[File, str, object], None, None]:
def match_mainfiles(self) -> Generator[Tuple[str, object], None, None]:
"""
Generator function that matches all files in the upload to all parsers to
determine the upload's mainfiles.
......@@ -477,13 +470,12 @@ class Upload(Chord, datamodel.Upload):
Returns:
Tuples of filename and parser.
"""
for filename in self.upload_file.filelist:
potential_mainfile = self.upload_file.get_file(filename)
for filename in self.upload_files.raw_file_manifest():
for parser in parsers:
try:
with potential_mainfile.open('r') as mainfile_f:
with self.upload_files.raw_file(filename) as mainfile_f:
if parser.is_mainfile(filename, lambda fn: mainfile_f):
yield potential_mainfile, filename, parser
yield filename, parser
except Exception as e:
self.get_logger().error(
'exception while matching pot. mainfile',
......@@ -500,14 +492,12 @@ class Upload(Chord, datamodel.Upload):
# TODO: deal with multiple possible parser specs
with utils.timer(
logger, 'upload extracted', step='matching',
upload_size=self.upload_file.size,
upload_filecount=len(self.upload_file.filelist)):
upload_size=self.upload_files.size):
total_calcs = 0
for mainfile, filename, parser in self.match_mainfiles():
for filename, parser in self.match_mainfiles():
calc = Calc.create(
archive_id='%s/%s' % (self.upload_hash, utils.hash(filename)),
mainfile=filename, parser=parser.name,
mainfile_tmp_path=mainfile.os_path,
upload_id=self.upload_id)
calc.process()
......@@ -521,22 +511,13 @@ class Upload(Chord, datamodel.Upload):
@task
def cleanup(self):
try:
upload = UploadFile(self.upload_id, local_path=self.local_path)
with utils.timer(
self.get_logger(), 'upload persisted', step='cleaning',
upload_size=upload.size):
upload.persist()
with utils.timer(
self.get_logger(), 'processing cleaned up', step='cleaning',
upload_size=upload.size):
upload.remove_extract()
except KeyError as e:
self.fail('Upload does not exist', exc_info=e)
return
self.get_logger().debug('closed upload')
# TODO issue #83
with utils.timer(
self.get_logger(), 'pack staging upload', step='cleaning',
upload_size=self.upload_files.size):
pass
# self.upload_files.pack()
# self.upload_files.delete()
@property
def processed_calcs(self):
......
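
The processing code follows the same switch: a Calc no longer stores a mainfile_tmp_path but resolves everything through ArchiveBasedStagingUploadFiles, which also receives the processing log and the archive. A condensed sketch of that flow; identifiers are made up, the real code runs inside the Calc tasks above:

from nomad.uploads import ArchiveBasedStagingUploadFiles

upload_id, calc_hash, mainfile = 'some-upload', 'some-calc-hash', 'some/mainfile.out'

upload_files = ArchiveBasedStagingUploadFiles(upload_id, public_only=False)

# the parser reads the mainfile directly from the staging area
mainfile_path = upload_files.raw_file_object(mainfile).os_path

# per-calc processing log and archive are written through the same object
with upload_files.archive_log_file(calc_hash, 'wt') as log_f:
    log_f.write('processing started\n')
with upload_files.archive_file(calc_hash, 'wt') as archive_f:
    archive_f.write('{}')  # the parser backend's write_json(out) goes here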
......@@ -43,10 +43,11 @@ import ujson
import os.path
import os
import shutil
from zipfile import ZipFile, BadZipFile
from zipfile import ZipFile, BadZipFile, is_zipfile
from bagit import make_bag
import contextlib
import hashlib
import io
from nomad import config, utils
......@@ -79,6 +80,11 @@ class PathObject:
def exists(self) -> bool:
return os.path.exists(self.os_path)
@property
def size(self) -> int:
""" Returns the os determined file size. """
return os.stat(self.os_path).st_size
def __repr__(self) -> str:
return self.os_path
......@@ -290,38 +296,79 @@ class Restricted(Exception):
pass
class UploadFiles(metaclass=ABCMeta):
def __init__(self, upload_id: str, public_only: bool = True, archive_ext: str = 'json') -> None:
class UploadFiles(DirectoryObject, metaclass=ABCMeta):
def __init__(
self, bucket: str, upload_id: str, public_only: bool = True,
create: bool = False,
archive_ext: str = 'json.gz' if config.files.compress_archive else 'json') -> None:
self.logger = utils.get_logger(__name__, upload_id=upload_id)
super().__init__(bucket, upload_id, create=create, prefix=True)
if not create and not self.exists():
raise KeyError()
self.upload_id = upload_id
self.public_only = public_only
self._archive_ext = archive_ext
@staticmethod
def get(upload_id: str, *args, **kwargs) -> 'UploadFiles':
if DirectoryObject(config.files.staging_bucket, upload_id, prefix=True).exists():
return StagingUploadFiles(upload_id, *args, **kwargs)
elif DirectoryObject(config.files.public_bucket, upload_id, prefix=True).exists():
return PublicUploadFiles(upload_id, *args, **kwargs)
else:
return None
@property
def metadata(self) -> Metadata:
""" The calc metadata for this upload. """
raise NotImplementedError
@contextlib.contextmanager
def raw_file(self, file_path: str, read: bool = True) -> Generator[IO, None, None]:
def raw_file(self, file_path: str, *args, **kwargs) -> Generator[IO, None, None]:
"""
Opens a raw file and returns a file-like object.
Opens a raw file and returns a file-like object. Additional args and kwargs are
delegated to the respective `open` call.
Arguments:
file_path: The path to the file relative to the upload.
read: Open for read or write. Default is True=read.
Raises:
KeyError: If the file does not exist.
Restricted: If the file is restricted and upload access evaluated to False.
"""
raise NotImplementedError()
def raw_file_manifest(self, path_prefix: str = None) -> Generator[str, None, None]:
"""
Returns the paths for all raw files in the archive (with a given prefix).
Arguments:
path_prefix: An optional prefix; only returns those files that have the prefix.
Returns:
An iterable over all (matching) raw files.
"""
raise NotImplementedError()
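
The two raw-file methods above are typically combined, as the raw API does: list the matching paths first, then stream each file. A short usage sketch against this abstract interface; the upload id and prefix are made up:

from nomad.uploads import UploadFiles

upload_files = UploadFiles.get('some-upload-id')
for path in upload_files.raw_file_manifest(path_prefix='some/dir/'):
    with upload_files.raw_file(path, 'rb') as f:
        print(path, len(f.read()))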
@contextlib.contextmanager
def archive_file(self, calc_hash: str, *args, **kwargs) -> Generator[IO, None, None]:
"""