Commit 587c03ff authored by Markus Scheidgen

Implemented the API portion of the non-Minio version.

parent ff2b7248
MINIO_HOST_PORT=9007
RABBITMQ_HOST_PORT=5672
REDIS_HOST_PORT=6379
ELASTIC_HOST_PORT=9200
......
MINIO_HOST_PORT=19000
RABBITMQ_HOST_PORT=15672
REDIS_HOST_PORT=16379
ELASTIC_HOST_PORT=19200
......
@@ -15,43 +15,12 @@
version: '3.4'
x-common-variables: &nomad_backend_env
NOMAD_MINIO_PORT: 9000
NOMAD_MINIO_HOST: minio
NOMAD_RABBITMQ_HOST: rabbitmq
NOMAD_LOGSTASH_HOST: elk
NOMAD_ELASTIC_HOST: elastic
NOMAD_MONGO_HOST: mongo
services:
# an object store for uploads
minio:
restart: always
image: minio/minio:RELEASE.2018-06-08T03-49-38Z
# image: minio/minio
container_name: nomad_minio
ports:
- ${MINIO_HOST_PORT}:9000
volumes:
- ${VOLUME_BINDS}/minio:/data
environment:
- MINIO_ACCESS_KEY=AKIAIOSFODNN7EXAMPLE
- MINIO_SECRET_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
command: server /data
minio-config:
image: minio/mc
links:
- minio
entrypoint: /bin/sh -c "
sleep 5;
/usr/bin/mc config host add minio http://minio:9000 AKIAIOSFODNN7EXAMPLE wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY;
/usr/bin/mc mb minio/uploads -p;
/usr/bin/mc mb minio/archive -p;
/usr/bin/mc mb minio/repository -p;
/usr/bin/mc policy download minio/repository;
/usr/bin/mc policy download minio/archive;
/usr/bin/mc policy upload minio/uploads;" # bug #22
# broker for celery
rabbitmq:
restart: always
@@ -115,7 +84,6 @@ services:
<<: *nomad_backend_env
NOMAD_SERVICE: nomad_worker
links:
- minio
- rabbitmq
- elastic
- mongo
@@ -135,7 +103,6 @@ services:
<<: *nomad_backend_env
NOMAD_SERVICE: nomad_handler
links:
- minio
- rabbitmq
- elastic
- mongo
@@ -152,12 +119,10 @@ services:
environment:
<<: *nomad_backend_env
NOMAD_API_BASE_PATH: /nomad/api
NOMAD_OBJECTS_HOST: ${EXTERNAL_HOST}
NOMAD_OBJECTS_PORT: ${EXTERNAL_PORT}
NOMAD_OBJECTS_BASE_PATH: /nomad/objects
NOMAD_API_HOST: ${EXTERNAL_HOST}
NOMAD_API_PORT: ${EXTERNAL_PORT}
NOMAD_SERVICE: nomad_api
links:
- minio
- rabbitmq
- elastic
- mongo
@@ -187,7 +152,6 @@ services:
links:
- elk
- gui
- minio
- api
ports:
- ${APP_HOST_PORT}:80
@@ -196,7 +160,6 @@ services:
command: nginx -g 'daemon off;'
volumes:
nomad_minio:
nomad_mongo:
nomad_elastic:
nomad_redis:
......
@@ -12,17 +12,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from flask import Flask, request, redirect, g, jsonify
from flask import Flask, request, g, jsonify, send_file
from flask_restful import Resource, Api, abort
from flask_cors import CORS
from flask_httpauth import HTTPBasicAuth
from elasticsearch.exceptions import NotFoundError
from datetime import datetime
import os.path
from nomad import config, files
from nomad import config
from nomad.files import UploadFile, ArchiveFile
from nomad.utils import get_logger, create_uuid
from nomad.processing import Upload, Calc, NotAllowedDuringProcessing, SUCCESS, FAILURE
from nomad.processing import Upload, NotAllowedDuringProcessing
from nomad.repo import RepoCalc
from nomad.user import User, me
from nomad.user import User
base_path = config.services.api_base_path
@@ -382,6 +385,44 @@ class UploadRes(Resource):
abort(400, message='You must not delete an upload during processing.')
class UploadFileRes(Resource):
"""
Upload a file to an existing upload. Can be used to upload files via browser
or other HTTP clients like curl. This will start the processing of the upload.
.. :quickref: upload; Upload a file to an existing upload.
:param string upload_id: the upload_id of the upload
:resheader Content-Type: application/json
:status 200: upload successfully received.
:status 404: upload with given id does not exist
:returns: the upload (see GET /uploads/<upload_id>)
"""
def put(self, upload_id):
try:
upload = Upload.get(upload_id)
except KeyError:
abort(404, message='Upload with id %s does not exist.' % upload_id)
if 'file' not in request.files:
abort(400, message='No file part.')
file = request.files['file']
if file.filename != 'file':
abort(400, message='Wrong file name %s, expected "file" as name.' % (file.filename))
upload_file = UploadFile(upload_id)
file.save(upload_file.os_path)
logger = get_logger(__name__, endpoint='upload', action='put', upload_id=upload_id)
logger.debug('received uploaded file')
upload.upload_time = datetime.now()
upload.process()
logger.debug('initiated processing')
return upload.json_dict, 200
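A minimal client-side sketch of this endpoint, assuming the API defaults introduced further down (host localhost, port 8000, base path /nomad/api); the requests library, upload id, and file name are illustrative stand-ins, not part of this commit:

import requests  # assumed third-party HTTP client, not used by the commit itself

upload_id = 'some-upload-id'  # hypothetical id obtained from POST /uploads
url = 'http://localhost:8000/nomad/api/uploads/%s/file' % upload_id

# the endpoint requires a multipart part that is named 'file' and has filename 'file'
with open('example.zip', 'rb') as f:
    response = requests.put(url, files={'file': ('file', f)})
assert response.status_code == 200  # returns the upload json on success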
class RepoCalcRes(Resource):
def get(self, upload_hash, calc_hash):
"""
@@ -507,8 +548,6 @@ class RepoCalcsRes(Resource):
:status 200: calcs successfully retrieved
:returns: a list of repository entries in ``results`` and pagination info
"""
logger = get_logger(__name__, endpoint='repo', action='get')
# TODO use argparse? bad request response on bad params, pagination as decorator
page = int(request.args.get('page', 1))
per_page = int(request.args.get('per_page', 10))
@@ -568,16 +607,20 @@ def get_calc(upload_hash, calc_hash):
:status 404: calc with given hashes does not exist
:returns: the metainfo formatted JSON data of the requested calculation
"""
logger = get_logger(__name__, endpoint='archive', action='get', upload_hash=upload_hash, calc_hash=calc_hash)
archive_id = '%s/%s' % (upload_hash, calc_hash)
try:
url = files.external_objects_url(files.archive_url(archive_id))
return redirect(url, 302)
archive = ArchiveFile(archive_id)
archive_path = archive.os_path
return send_file(archive_path, attachment_filename=os.path.basename(archive_path))
except KeyError:
abort(404, message='Archive %s does not exist.' % archive_id)
except FileNotFoundError:
abort(404, message='Archive %s does not exist.' % archive_id)
except Exception as e:
logger = get_logger(
__name__, endpoint='archive', action='get',
upload_hash=upload_hash, calc_hash=calc_hash)
logger.error('Exception on accessing archive', exc_info=e)
abort(500, message='Could not access the archive.')
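For contrast with the removed redirect: a sketch of the new client-visible behaviour, where the archive JSON is served directly via send_file instead of a 302 to an object store. Host and hashes are placeholders:

import requests  # assumed HTTP client

upload_hash, calc_hash = 'some-upload-hash', 'some-calc-hash'  # hypothetical values
url = 'http://localhost:8000/nomad/api/archive/%s/%s' % (upload_hash, calc_hash)

response = requests.get(url)
assert response.status_code == 200  # the minio version answered 302 here
with open('archive.json', 'wb') as f:
    f.write(response.content)  # the archive file contents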
@@ -593,6 +636,7 @@ def call_admin_operation(operation):
api.add_resource(UploadsRes, '%s/uploads' % base_path)
api.add_resource(UploadRes, '%s/uploads/<string:upload_id>' % base_path)
api.add_resource(UploadFileRes, '%s/uploads/<string:upload_id>/file' % base_path)
api.add_resource(RepoCalcsRes, '%s/repo' % base_path)
api.add_resource(RepoCalcRes, '%s/repo/<string:upload_hash>/<string:calc_hash>' % base_path)
......
@@ -40,14 +40,14 @@ MongoConfig = namedtuple('MongoConfig', ['host', 'port', 'users_db'])
LogstashConfig = namedtuple('LogstashConfig', ['enabled', 'host', 'tcp_port', 'level'])
""" Used to configure and enable/disable the ELK based centralized logging. """
NomadServicesConfig = namedtuple('NomadServicesConfig', ['api_base_path', 'api_secret'])
NomadServicesConfig = namedtuple('NomadServicesConfig', ['api_host', 'api_port', 'api_base_path', 'api_secret'])
""" Used to configure nomad services: worker, handler, api """
files = FilesConfig(
uploads_bucket='uploads',
repository_bucket='repository',
archive_bucket='archive',
compress_archive=False
compress_archive=True
)
rabbit_host = os.environ.get('NOMAD_RABBITMQ_HOST', 'localhost')
@@ -83,6 +83,8 @@ logstash = LogstashConfig(
level=int(os.environ.get('NOMAD_LOGSTASH_LEVEL', logging.DEBUG))
)
services = NomadServicesConfig(
api_host=os.environ.get('NOMAD_API_HOST', 'localhost'),
api_port=int(os.environ.get('NOMAD_API_PORT', 8000)),
api_base_path=os.environ.get('NOMAD_API_BASE_PATH', '/nomad/api'),
api_secret='the quick fox jumps over something'
)
@@ -39,7 +39,7 @@ Uploads
:members:
"""
from typing import Callable, List, Any, Generator, IO, TextIO, cast, AnyStr
from typing import List, Any, Generator, IO, TextIO, cast
import os
import os.path
from zipfile import ZipFile, BadZipFile
@@ -51,95 +51,6 @@ import shutil
from nomad import config, utils
# _client = None
# if _client is None and 'sphinx' not in sys.modules:
# _client = Minio('%s:%s' % (config.minio.host, config.minio.port),
# access_key=config.minio.accesskey,
# secret_key=config.minio.secret,
# secure=False)
# # ensure all necessary buckets exist
# def ensure_bucket(name):
# try:
# _client.make_bucket(bucket_name=name)
# logger.info('Created uploads bucket', bucket=name)
# except minio.error.BucketAlreadyOwnedByYou:
# pass
# ensure_bucket(config.files.uploads_bucket)
# ensure_bucket(config.files.archive_bucket)
# def get_presigned_upload_url(upload_id: str) -> str:
# """
# Generates a presigned upload URL. Presigned URL allows users (and their client programs)
# to safely *PUT* a single file without further authorization or API to the *uploads* bucket
# using the given ``upload_id``. Example usages for presigned URLs include
# browser based uploads or simple *curl* commands (see also :func:`create_curl_upload_cmd`).
# Arguments:
# upload_id: The upload id for the uploaded file.
# Returns:
# The presigned URL string.
# """
# return _client.presigned_put_object(config.files.uploads_bucket, upload_id)
# def create_curl_upload_cmd(presigned_url: str, file_dummy: str='<ZIPFILE>') -> str:
# """Creates a readymade curl command for uploading.
# Arguments:
# presigned_url: The presigned URL to base the command on.
# Kwargs:
# file_dummy: A placeholder for the file that the user/client has to replace.
# Returns:
# The curl shell command with correct method, url, headers, etc.
# """
# return 'curl "%s" --upload-file %s' % (presigned_url, file_dummy)
# def upload_put_handler(func: Callable[[str], None]) -> Callable[[], None]:
# def upload_notifications(events: List[Any]) -> Generator[str, None, None]:
# for event in events:
# for event_record in event['Records']:
# try:
# event_name = event_record['eventName']
# if event_name == 's3:ObjectCreated:Put':
# upload_id = event_record['s3']['object']['key']
# logger.debug('Received bucket upload event', upload_id=upload_id)
# yield upload_id
# break # only one per record, pls
# else:
# logger.debug('Unhandled bucket event', bucket_event_name=event_name)
# except KeyError:
# logger.warning(
# 'Unhandled bucket event due to unexpected event format',
# bucket_event_record=event_record)
# def wrapper(*args, **kwargs) -> None:
# logger.info('Start listening to uploads notifications.')
# _client.remove_all_bucket_notification(config.files.uploads_bucket)
# events = _client.listen_bucket_notification(
# config.files.uploads_bucket,
# events=['s3:ObjectCreated:*'])
# upload_ids = upload_notifications(events)
# for upload_id in upload_ids:
# try:
# func(upload_id)
# except StopIteration:
# # Using StopIteration to allow clients to stop handling of events.
# logger.debug('Handling of upload notifications was stopped via StopIteration.')
# return
# except Exception:
# pass
# return wrapper
class Objects:
"""
@@ -160,7 +71,7 @@ class Objects:
if not os.path.isdir(directory):
os.makedirs(directory)
return path
return os.path.abspath(path)
@classmethod
def open(cls, bucket: str, name: str, ext: str=None, *args, **kwargs) -> IO:
......
@@ -37,7 +37,8 @@ from mongoengine import \
import mongoengine.errors
import logging
from nomad import config, files, utils
from nomad import config, utils
from nomad.files import UploadFile, ArchiveFile, FileError
from nomad.repo import RepoCalc
from nomad.user import User, me
from nomad.processing.base import Proc, Chord, process, task, PENDING, SUCCESS, FAILURE, RUNNING
@@ -95,7 +96,7 @@ class Calc(Proc):
"""
# delete the archive
if self.archive_id is not None:
files.delete_archive(self.archive_id)
ArchiveFile(self.archive_id).delete()
# delete the search index entry
try:
@@ -178,7 +179,7 @@
upload_id=self.upload_id)
# persist the archive
with files.write_archive_json(self.archive_id) as out:
with ArchiveFile(self.archive_id).write_archive_json() as out:
self._parser_backend.write_json(out, pretty=True)
@@ -249,7 +250,7 @@ class Upload(Chord):
with lnr(logger, 'delete upload file'):
try:
files.Upload(self.upload_id).delete()
UploadFile(self.upload_id).delete()
except KeyError:
if self.current_task == 'uploading':
logger.debug(
@@ -260,7 +261,7 @@
with lnr(logger, 'deleting calcs'):
# delete archive files
files.delete_archives(upload_hash=self.upload_hash)
ArchiveFile.delete_archives(upload_hash=self.upload_hash)
# delete repo entries
RepoCalc.delete_upload(upload_id=self.upload_id)
@@ -273,16 +274,10 @@ class Upload(Chord):
@classmethod
def _external_objects_url(cls, url):
""" Replaces the given internal object storage url (minio) with an URL that allows
""" Replaces the given internal object storage url with an URL that allows
external access.
"""
port_with_colon = ''
if config.services.objects_port > 0:
port_with_colon = ':%d' % config.services.objects_port
return url.replace(
'%s:%s' % (config.minio.host, config.minio.port),
'%s%s%s' % (config.services.objects_host, port_with_colon, config.services.objects_base_path))
return 'http://%s:%s%s%s' % (config.services.api_host, config.services.api_port, config.services.api_base_path, url)
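A worked example of what the rewritten helper produces, assuming the defaults from config (api_host='localhost', api_port=8000, api_base_path='/nomad/api'); the path is hypothetical:

# _external_objects_url('/uploads/abc123/file')
# -> 'http://localhost:8000/nomad/api/uploads/abc123/file'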
@classmethod
def create(cls, **kwargs) -> 'Upload':
@@ -292,8 +287,8 @@ class Upload(Chord):
The upload will be already saved to the database.
"""
self = super().create(**kwargs)
self.presigned_url = cls._external_objects_url(files.get_presigned_upload_url(self.upload_id))
self.upload_command = files.create_curl_upload_cmd(self.presigned_url, 'your_file')
self.presigned_url = cls._external_objects_url('/uploads/%s/file' % self.upload_id)
self.upload_command = 'curl "%s" --upload-file your_file' % self.presigned_url
self._continue_with('uploading')
return self
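Under the same assumed defaults, the generated upload_command comes out roughly as:

# curl "http://localhost:8000/nomad/api/uploads/<upload_id>/file" --upload-file your_file

Note that curl --upload-file issues a raw PUT body, while the new endpoint reads request.files, which Flask only populates for multipart form data, so this command may not be accepted as-is; the test below uses a multipart body instead.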
@@ -338,8 +333,8 @@ class Upload(Chord):
def extracting(self):
logger = self.get_logger()
try:
self._upload = files.Upload(self.upload_id)
self._upload.open()
self._upload = UploadFile(self.upload_id)
self._upload.extract()
logger.debug('extracted upload')
except KeyError as e:
self.fail('process request for non existing upload', level=logging.INFO)
@@ -347,7 +342,7 @@
try:
self.upload_hash = self._upload.hash()
except files.UploadError as e:
except Exception as e:
self.fail('could not create upload hash', e)
return
@@ -386,12 +381,12 @@
@task
def cleanup(self):
try:
upload = files.Upload(self.upload_id)
upload = UploadFile(self.upload_id)
except KeyError as e:
self.fail('Upload does not exist', exc_info=e)
return
upload.close()
upload.remove_extract()
self.get_logger().debug('removed upload extract')
@property
......
@@ -21,6 +21,31 @@ from nomad.processing.data import Upload
from nomad.utils import get_logger, lnr
def initiate_processing(upload_id: str):
logger = get_logger(__name__, upload_id=upload_id)
logger.debug('Initiate upload processing')
try:
with lnr(logger, 'Could not load'):
try:
upload = Upload.get(upload_id)
except KeyError as e:
logger.error('Upload does not exist')
raise e
if upload.upload_time is not None:
logger.warning('Ignore upload notification, since file is already uploaded')
raise StopIteration
with lnr(logger, 'Save upload time'):
upload.upload_time = datetime.now()
with lnr(logger, 'Start processing'):
upload.process()
except Exception as e:
logger.error('Exception while handling upload put notification.', exc_info=e)
def handle_uploads(ready=None, quit=False):
"""
Starts a daemon that will listen to files for new uploads. For each new
@@ -35,28 +60,7 @@ def handle_uploads(ready=None, quit=False):
@files.upload_put_handler
def handle_upload_put(received_upload_id: str):
logger = get_logger(__name__, upload_id=received_upload_id)
logger.debug('Initiate upload processing')
try:
with lnr(logger, 'Could not load'):
try:
upload = Upload.get(received_upload_id)
except KeyError as e:
logger.error('Upload does not exist')
raise e
if upload.upload_time is not None:
logger.warning('Ignore upload notification, since file is already uploaded')
raise StopIteration
with lnr(logger, 'Save upload time'):
upload.upload_time = datetime.now()
with lnr(logger, 'Start processing'):
upload.process()
except Exception as e:
logger.error('Exception while handling upload put notification.', exc_info=e)
initiate_processing(received_upload_id)
if quit:
raise StopIteration
......
import pytest
from threading import Thread
import subprocess
import shlex
import time
import json
from mongoengine import connect
@@ -15,21 +12,21 @@ services_config = config.services._asdict()
services_config.update(api_base_path='')
config.services = config.NomadServicesConfig(**services_config)
from nomad import api, files # noqa
from nomad.processing import Upload, handle_uploads_thread # noqa
from nomad import api # noqa
from nomad.files import UploadFile # noqa
from nomad.processing import Upload # noqa
from tests.processing.test_data import example_files # noqa
from tests.test_files import assert_exists # noqa
# import fixtures
from tests.test_files import clear_files, archive_id # noqa pylint: disable=unused-import
from tests.test_files import clear_files, archive, archive_config # noqa pylint: disable=unused-import
from tests.test_normalizing import normalized_template_example # noqa pylint: disable=unused-import
from tests.test_parsing import parsed_template_example # noqa pylint: disable=unused-import
from tests.test_repo import example_elastic_calc # noqa pylint: disable=unused-import
@pytest.fixture(scope='function')
def client():
def client(mockmongo):
disconnect()
connect('users_test', host=config.mongo.host, port=config.mongo.port, is_mock=True)
@@ -148,22 +145,24 @@ def test_delete_empty_upload(client, test_user_auth, no_warn):
@pytest.mark.parametrize("file", example_files)
@pytest.mark.timeout(10)
def test_processing(client, file, worker, mocksearch, test_user_auth, no_warn):
handler = handle_uploads_thread(quit=True)
rv = client.post('/uploads', headers=test_user_auth)
assert rv.status_code == 200
upload = assert_upload(rv.data)
upload_id = upload['upload_id']
upload_url = upload['presigned_url']
cmd = files.create_curl_upload_cmd(upload_url).replace('<ZIPFILE>', file)
subprocess.call(shlex.split(cmd))
upload_endpoint = '/uploads/%s' % upload_id
upload_file_endpoint = '%s/file' % upload_endpoint
handler.join()
assert upload_url.endswith(upload_file_endpoint)
rv = client.put(upload_file_endpoint, data=dict(file=(open(file, 'rb'), 'file')))
assert rv.status_code == 200
upload = assert_upload(rv.data)
while True:
time.sleep(0.1)
rv = client.get('/uploads/%s' % upload['upload_id'], headers=test_user_auth)
rv = client.get(upload_endpoint, headers=test_user_auth)
assert rv.status_code == 200
upload = assert_upload(rv.data)
assert 'upload_time' in upload
@@ -178,16 +177,16 @@ def test_processing(client, file, worker, mocksearch, test_user_auth, no_warn):
assert calc['status'] == 'SUCCESS'
assert calc['current_task'] == 'archiving'
assert len(calc['tasks']) == 3
assert_exists(config.files.uploads_bucket, upload['upload_id'])
assert UploadFile(upload['upload_id']).exists()
if upload['calcs']['pagination']['total'] > 1:
rv = client.get('/uploads/%s?page=2&per_page=1&order_by=status' % upload['upload_id'])
rv = client.get('%s?page=2&per_page=1&order_by=status' % upload_endpoint)
assert rv.status_code == 200
upload = assert_upload(rv.data)
assert len(upload['calcs']['results']) == 1
rv = client.post(
'/uploads/%s' % upload['upload_id'],
upload_endpoint,
headers=test_user_auth,
data=json.dumps(dict(operation='unstage')),
content_type='application/json')
@@ -252,9 +251,9 @@ def test_repo_calcs_user_invisible(client, example_elastic_calc, test_other_user
assert len(results) == 0
def test_get_archive(client, archive_id, no_warn):
rv = client.get('/archive/%s' % archive_id)
assert rv.status_code == 302
def test_get_archive(client, archive, no_warn):
rv = client.get('/archive/%s' % archive.object_id)
assert rv.status_code == 200
def test_get_non_existing_archive(client, no_warn):
......
@@ -69,22 +69,26 @@ class TestObjects:
assert not Objects.exists(example_bucket, name, ext)