Commit 21a451f4 authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Added processing information to API. Bugfixes.

parent 6dcdf36b
......@@ -2,18 +2,16 @@ from flask import Flask
from flask_restful import Resource, Api, abort
from datetime import datetime
from threading import Thread
import logging
import mongoengine.errors
from nomad import users, files, processing
logger = logging.getLogger(__name__)
from nomad.utils import get_logger
app = Flask(__name__)
api = Api(app)
# provida a fake user for testing
# provid a fake user for testing
me = users.User.objects(email='me@gmail.com').first()
if me is None:
me = users.User(email='me@gmail.com', name='Me Meyer')
......@@ -28,9 +26,20 @@ class Uploads(Resource):
'id': upload.upload_id,
'presigned_url': upload.presigned_url,
'create_time': upload.create_time.isoformat() if upload.create_time is not None else None,
'upload_time': upload.upload_time.isoformat() if upload.upload_time is not None else None
'upload_time': upload.upload_time.isoformat() if upload.upload_time is not None else None,
'upload_hash': upload.upload_hash,
}
if upload.processing is not None:
proc = processing.UploadProcessing.from_result_backend(upload.upload_id, upload.processing)
data['processing'] = {
'status': proc.status,
'parse_specs': proc.parse_specs,
'processing_results': proc.processing_results,
'current_task': proc.task_name,
'error': proc.cause.__str__()
}
return {key: value for key, value in data.items() if value is not None}
def get(self):
......@@ -63,27 +72,40 @@ class Upload(Resource):
api.add_resource(Uploads, '/uploads')
api.add_resource(Upload, '/uploads/<string:upload_id>')
if __name__ == '__main__':
def start_upload_handler(quit=False):
"""
Starts a notification handler for uploads in a different thread. This handler
will initiate processing for all received upload events. The processing status
will be saved to the users db.
Arguments:
quit: If true, will only handling one event and stop. Otherwise run forever.
"""
@files.upload_put_handler
def handle_upload_put(received_upload_id: str):
upload = users.Upload.objects(id=received_upload_id).first()
if upload is None:
logger.error(
'Received upload put event on non existing upload %s.' %
received_upload_id)
return
logger = get_logger(__name__, upload_id=received_upload_id)
upload.upload_time = datetime.now()
try:
proc = processing.UploadProcessing(received_upload_id)
proc.start()
upload.processing = proc.to_json()
except Exception as e:
logger.error(
'Unexpected exception while starting processing of upload %s.' %
received_upload_id, exc_info=e)
upload.save()
upload = users.Upload.objects(id=received_upload_id).first()
if upload is None:
logger.error('Upload does not exist')
raise Exception()
with logger.lnr_error('Save upload time'):
upload.upload_time = datetime.now()
upload.save()
with logger.lnr_error('Start processing'):
proc = processing.UploadProcessing(received_upload_id)
proc.start()
upload.processing = proc.result_tuple
upload.save()
except Exception:
pass
if quit:
raise StopIteration
def handle_uploads():
handle_upload_put(received_upload_id='provided by decorator')
......@@ -91,5 +113,10 @@ if __name__ == '__main__':
handle_uploads_thread = Thread(target=handle_uploads)
handle_uploads_thread.start()
return handle_uploads_thread
if __name__ == '__main__':
handle_uploads_thread = start_upload_handler()
app.run(debug=True)
handle_uploads_thread.join()
......@@ -146,9 +146,8 @@ def upload_put_handler(func: Callable[[str], None]) -> Callable[[], None]:
logging.debug(
'Handling of upload notifications was stopped via StopIteration.')
return
except Exception as e:
logger.error(
'Unexpected exception in upload handler for %s' % upload_id, exc_info=e)
except Exception:
pass
return wrapper
......
......@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List, Any
from .normalizer import Normalizer
from .system import SystemNormalizer
......@@ -22,7 +23,10 @@ After parsing calculations have to be normalized with a set of *normalizers*.
In NOMAD-coe those were programmed in python (we'll reuse) and scala (we'll rewrite).
"""
normalizers = [
# The loose explicit type is necessary to avoid a ABC class as item type that causes
# further errors on instantiating the normalizers. A solution would be to use objects
# instead of classes.
normalizers: List[Any] = [
SystemNormalizer,
SymmetryNormalizer,
SystemTypeNormalizer
......
......@@ -38,7 +38,6 @@ from celery.utils.log import get_task_logger
from celery.canvas import Signature
import logging
import logstash
import json
from datetime import datetime
import nomad.config as config
......@@ -128,6 +127,13 @@ class UploadProcessing():
self.cause: Exception = None
self.result_tuple: Any = None
@staticmethod
def from_result_backend(upload_id, result_tuple):
""" Loads the processing data from the results backend and returnes an updated instance. """
processing = UploadProcessing(upload_id)
processing.result_tuple = result_tuple
return processing.updated()
def start(self):
""" Initiates the processing tasks via celery canvas. """
assert not self._is_started, 'Cannot start a started or used processing.'
......@@ -185,11 +191,16 @@ class UploadProcessing():
assert self._is_started, 'Run is not yet started.'
async_result = self._async_result
is_last_task = True
while async_result is not None:
if async_result.ready():
async_result.result.status = async_result.status
status = async_result.status
if status == 'SUCCESS' and not is_last_task:
status == 'PROGRESS'
async_result.result.status = status
return self._update(async_result.result)
else:
is_last_task = False
async_result = async_result.parent
return self
......@@ -238,10 +249,6 @@ class UploadProcessing():
self.task_id = task.request.id
return True
def to_json(self) -> str:
""" Creates a representative JSON record as str. """
return json.dumps(self, indent=4)
@app.task(bind=True)
def open_upload(task: Task, processing: UploadProcessing) -> UploadProcessing:
......
......@@ -55,7 +55,7 @@ class Upload(Document):
upload_hash = StringField()
in_staging = BooleanField(default=True)
is_private = BooleanField(default=False)
processing = DictField()
processing = ListField(default=None)
upload_time = DateTimeField()
create_time = DateTimeField()
presigned_url = StringField()
......
from typing import Union, IO, cast
import hashlib
import base64
import json
import logging
from contextlib import contextmanager
def hash(obj: Union[IO, str]) -> str:
......@@ -13,3 +16,53 @@ def hash(obj: Union[IO, str]) -> str:
hash.update(obj.encode('utf-8'))
return base64.b64encode(hash.digest(), altchars=b'-_')[0:28].decode('utf-8')
class DataLogger():
def __init__(self, logger, **kwargs):
self._logger = logger
self.data = kwargs
def _prepare_msg(self, base_msg):
return '%s %s' % (base_msg, self._format_data())
def _format_data(self, ):
return json.dumps(self.data)
def debug(self, msg, *args, **kwargs):
self._logger.debug(self._prepare_msg(msg), *args, **kwargs)
def info(self, msg, *args, **kwargs):
self._logger.info(self._prepare_msg(msg), *args, **kwargs)
def warn(self, msg, *args, **kwargs):
self._logger.warn(self._prepare_msg(msg), *args, **kwargs)
def error(self, msg, *args, **kwargs):
self._logger.error(self._prepare_msg(msg), *args, **kwargs)
def crit(self, msg, *args, **kwargs):
self._logger.crit(self._prepare_msg(msg), *args, **kwargs)
@contextmanager
def lnr_error(self, msg, *args, **kwargs):
"""
Will *log and raise* with an error and the given message and args/kwargs
on all exceptions.
"""
try:
yield
except Exception as e:
self._logger.error(
self._prepare_msg('Exception while: %s' % msg),
exc_info=e, *args, **kwargs)
raise e
def get_logger(name, *args, **kwargs):
"""
Returns a :class:`DataLogger` with the data given as kwargs.
A data logger can be used like any other logger, but will add the data to all
log output. Allowing more structured logging.
"""
return DataLogger(logging.getLogger(name), *args, **kwargs)
......@@ -10,7 +10,8 @@ from minio.error import ResponseError
from nomad import config, api, files
from tests.test_files import example_file
from tests.test_processing import example_files
@pytest.fixture
def client():
......@@ -75,8 +76,9 @@ def test_create_upload(client):
assert_uploads(rv.data, count=1, id=upload_id)
@pytest.mark.parametrize("file", example_files)
@pytest.mark.timeout(10)
def test_upload_to_upload(client):
def test_upload_to_upload(client, file):
rv = client.post('/uploads')
assert rv.status_code == 200
upload = assert_upload(rv.data)
......@@ -94,7 +96,7 @@ def test_upload_to_upload(client):
time.sleep(1)
upload_url = upload['presigned_url']
cmd = files.create_curl_upload_cmd(upload_url).replace('<ZIPFILE>', example_file)
cmd = files.create_curl_upload_cmd(upload_url).replace('<ZIPFILE>', file)
subprocess.call(shlex.split(cmd))
handle_uploads_thread.join()
......@@ -103,3 +105,39 @@ def test_upload_to_upload(client):
files._client.remove_object(config.files.uploads_bucket, upload['id'])
except ResponseError:
assert False
@pytest.mark.parametrize("file", example_files)
@pytest.mark.timeout(10)
def test_processing(client, file):
handle_uploads_thread = api.start_upload_handler(quit=True)
rv = client.post('/uploads')
assert rv.status_code == 200
upload = assert_upload(rv.data)
time.sleep(1)
upload_url = upload['presigned_url']
cmd = files.create_curl_upload_cmd(upload_url).replace('<ZIPFILE>', file)
subprocess.call(shlex.split(cmd))
handle_uploads_thread.join()
while True:
time.sleep(1)
rv = client.get('/uploads/%s' % upload['id'])
assert rv.status_code == 200
upload = assert_upload(rv.data)
assert 'upload_time' in upload
assert 'processing' in upload
if upload['processing']['status'] in ['SUCCESS', 'FAILURE']:
break
assert upload['processing']['status'] == 'SUCCESS'
try:
files._client.remove_object(config.files.uploads_bucket, upload['id'])
except ResponseError:
assert False
......@@ -24,7 +24,8 @@ import json
import nomad.files as files
import nomad.config as config
example_file = './data/examples_vasp.zip'
example_file = 'data/examples_vasp.zip'
empty_file = 'data/empty.zip'
@pytest.fixture
......
......@@ -29,8 +29,9 @@ from nomad.processing import UploadProcessing
# delete search index after each test via imported fixture
from tests.test_search import index # pylint: disable=unused-import
from tests.test_files import example_file, empty_file
example_files = ['data/examples_vasp.zip', 'data/empty.zip']
example_files = [empty_file, example_file]
# disable test worker for now, see docstring above
......@@ -84,6 +85,11 @@ def test_processing(uploaded_id, celery_session_worker):
run = UploadProcessing(uploaded_id)
run.start()
# test that the instance can be reinstantiated from a persistable representation
run = UploadProcessing.from_result_backend(uploaded_id, run.result_tuple)
assert run.status in ['PENDING', 'PROGRESS']
run.get(timeout=10)
assert run.ready()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment