Commit 13c3891d authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Api saves updates on upload processings and provides stale upload information.

parent f67213e5
...@@ -46,7 +46,8 @@ class Uploads extends React.Component { ...@@ -46,7 +46,8 @@ class Uploads extends React.Component {
componentDidMount() { componentDidMount() {
api.getUploads() api.getUploads()
.then(uploads => { .then(uploads => {
this.setState({uploads: uploads}) const filteredUploads = uploads.filter(upload => !upload.is_state)
this.setState({uploads: filteredUploads})
}) })
.catch(error => { .catch(error => {
this.setState({uploads: []}) this.setState({uploads: []})
......
...@@ -39,31 +39,44 @@ def _external_objects_url(url): ...@@ -39,31 +39,44 @@ def _external_objects_url(url):
'%s%s%s' % (config.services.objects_host, port_with_colon, config.services.objects_base_path)) '%s%s%s' % (config.services.objects_host, port_with_colon, config.services.objects_base_path))
class Uploads(Resource): def _update_and_render(upload: users.Upload):
"""
If the given upload as a processing state attached, it will attempt to update this
state and store the results, before the upload is rendered for the client.
"""
is_stale = False
if upload.proc:
proc = UploadProc(**upload.proc)
if proc.update_from_backend():
upload.proc = proc
upload.save()
if proc.current_task_name == proc.task_names[0] and upload.upload_time is None:
is_stale = (datetime.now() - upload.create_time).days > 1
else:
proc = None
data = {
'name': upload.name,
'upload_id': upload.upload_id,
'presigned_url': _external_objects_url(upload.presigned_url),
'presigned_orig': upload.presigned_url,
'create_time': upload.create_time.isoformat() if upload.create_time is not None else None,
'upload_time': upload.upload_time.isoformat() if upload.upload_time is not None else None,
'proc_time': upload.proc_time.isoformat() if upload.proc_time is not None else None,
'is_stale': is_stale,
'proc': proc
}
return {key: value for key, value in data.items() if value is not None}
@staticmethod
def _render(upload: users.Upload):
if upload.proc:
proc = UploadProc(**upload.proc)
proc.update_from_backend()
else:
proc = None
data = {
'name': upload.name,
'upload_id': upload.upload_id,
'presigned_url': _external_objects_url(upload.presigned_url),
'presigned_orig': upload.presigned_url,
'create_time': upload.create_time.isoformat() if upload.create_time is not None else None,
'upload_time': upload.upload_time.isoformat() if upload.upload_time is not None else None,
'proc_time': upload.proc_time.isoformat() if upload.proc_time is not None else None,
'proc': proc
}
return {key: value for key, value in data.items() if value is not None} class Uploads(Resource):
def get(self): def get(self):
return [Uploads._render(user) for user in users.Upload.objects()], 200 return [_update_and_render(user) for user in users.Upload.objects()], 200
def post(self): def post(self):
json_data = request.get_json() json_data = request.get_json()
...@@ -78,7 +91,7 @@ class Uploads(Resource): ...@@ -78,7 +91,7 @@ class Uploads(Resource):
upload.proc = UploadProc(upload.upload_id) upload.proc = UploadProc(upload.upload_id)
upload.save() upload.save()
return Uploads._render(upload), 200 return _update_and_render(upload), 200
class Upload(Resource): class Upload(Resource):
...@@ -91,7 +104,7 @@ class Upload(Resource): ...@@ -91,7 +104,7 @@ class Upload(Resource):
if upload is None: if upload is None:
abort(404, message='Upload with id %s does not exist.' % upload_id) abort(404, message='Upload with id %s does not exist.' % upload_id)
return Uploads._render(upload), 200 return _update_and_render(upload), 200
class RepoCalc(Resource): class RepoCalc(Resource):
......
...@@ -118,17 +118,24 @@ class CalcProc(ProcPipeline): ...@@ -118,17 +118,24 @@ class CalcProc(ProcPipeline):
self.update(kwargs) self.update(kwargs)
def update_from_backend(self): def update_from_backend(self) -> bool:
""" Consults results backend and updates. Returns if object might have changed. """
if self.status in ['FAILED', 'SUCCESS']:
return False
if self.celery_task_id is None: if self.celery_task_id is None:
return return False
celery_task_result = AsyncResult(self.celery_task_id, app=app) celery_task_result = AsyncResult(self.celery_task_id, app=app)
if celery_task_result.ready(): if celery_task_result.ready():
self.update(celery_task_result.result) self.update(celery_task_result.result)
return True
else: else:
info = celery_task_result.info info = celery_task_result.info
if info is not None: if info is not None:
self.update(info) self.update(info)
return True
return False
class UploadProc(ProcPipeline): class UploadProc(ProcPipeline):
...@@ -208,25 +215,38 @@ class UploadProc(ProcPipeline): ...@@ -208,25 +215,38 @@ class UploadProc(ProcPipeline):
return result_from_tuple(self.celery_task_ids, app=app) return result_from_tuple(self.celery_task_ids, app=app)
def update_from_backend(self): def update_from_backend(self) -> bool:
""" Consults the result backend and updates itself with the available results. """ """
Consults the result backend and updates itself with the available results.
Will only update not completed processings.
Returns:
If object might have changed.
"""
assert self.is_started, 'Run is not yet started.' assert self.is_started, 'Run is not yet started.'
if self.status in ['SUCCESS', 'FAILED']:
return False
if self.celery_task_ids is None: if self.celery_task_ids is None:
return return False
celery_task_result = self._celery_task_result celery_task_result = self._celery_task_result
might_have_changed = False
while celery_task_result is not None: while celery_task_result is not None:
if celery_task_result.ready(): if celery_task_result.ready():
self.update(celery_task_result.result) self.update(celery_task_result.result)
might_have_changed = True
break break
else: else:
celery_task_result = celery_task_result.parent celery_task_result = celery_task_result.parent
if self.calc_procs is not None: if self.calc_procs is not None:
for calc_proc in self.calc_procs: for calc_proc in self.calc_procs:
calc_proc.update_from_backend() if calc_proc.update_from_backend():
might_have_changed = True
return might_have_changed
def forget(self) -> None: def forget(self) -> None:
""" Forget the results of a completed run; free all resources in the results backend. """ """ Forget the results of a completed run; free all resources in the results backend. """
...@@ -251,6 +271,8 @@ class UploadProc(ProcPipeline): ...@@ -251,6 +271,8 @@ class UploadProc(ProcPipeline):
Returns: An upadted instance of itself with all the results. Returns: An upadted instance of itself with all the results.
""" """
# TODO this is not a good idea, we wont catch failed parent processes and block
# forever
assert self.is_started, 'Run is not yet started.' assert self.is_started, 'Run is not yet started.'
self._celery_task_result.get(*args, **kwargs) self._celery_task_result.get(*args, **kwargs)
......
...@@ -6,20 +6,28 @@ import time ...@@ -6,20 +6,28 @@ import time
import json import json
from mongoengine import connect from mongoengine import connect
from mongoengine.connection import disconnect from mongoengine.connection import disconnect
from datetime import datetime, timedelta
from nomad import config, api, files, processing from nomad import config
# for convinience we test the api without path prefix
services_config = config.services._asdict()
services_config.update(api_base_path='')
config.services = config.NomadServicesConfig(**services_config)
from nomad import api, files, processing, users # noqa
from tests.test_processing import example_files # noqa
from tests.test_files import assert_exists # noqa
from tests.test_processing import example_files
from tests.test_files import assert_exists
# import fixtures # import fixtures
from tests.test_files import clear_files, archive_id # pylint: disable=unused-import from tests.test_files import clear_files, archive_id # noqa pylint: disable=unused-import
from tests.test_normalizing import normalized_vasp_example # pylint: disable=unused-import from tests.test_normalizing import normalized_vasp_example # noqa pylint: disable=unused-import
from tests.test_parsing import parsed_vasp_example # pylint: disable=unused-import from tests.test_parsing import parsed_vasp_example # noqa pylint: disable=unused-import
from tests.test_search import example_entry # pylint: disable=unused-import from tests.test_search import example_entry # noqa pylint: disable=unused-import
from tests.test_processing import celery_config, celery_includes # pylint: disable=unused-import from tests.test_processing import celery_config, celery_includes # noqa pylint: disable=unused-import
@pytest.fixture @pytest.fixture(scope='function')
def client(): def client():
disconnect() disconnect()
connect('users_test', host=config.mongo.host, is_mock=True) connect('users_test', host=config.mongo.host, is_mock=True)
...@@ -28,6 +36,7 @@ def client(): ...@@ -28,6 +36,7 @@ def client():
client = api.app.test_client() client = api.app.test_client()
yield client yield client
users.Upload._get_collection().drop()
def assert_uploads(upload_json_str, count=0, **kwargs): def assert_uploads(upload_json_str, count=0, **kwargs):
...@@ -39,7 +48,7 @@ def assert_uploads(upload_json_str, count=0, **kwargs): ...@@ -39,7 +48,7 @@ def assert_uploads(upload_json_str, count=0, **kwargs):
assert_upload(json.dumps(data[0]), **kwargs) assert_upload(json.dumps(data[0]), **kwargs)
def assert_upload(upload_json_str, id=None): def assert_upload(upload_json_str, id=None, **kwargs):
data = json.loads(upload_json_str) data = json.loads(upload_json_str)
assert 'upload_id' in data assert 'upload_id' in data
if id is not None: if id is not None:
...@@ -47,6 +56,9 @@ def assert_upload(upload_json_str, id=None): ...@@ -47,6 +56,9 @@ def assert_upload(upload_json_str, id=None):
assert 'create_time' in data assert 'create_time' in data
assert 'presigned_url' in data assert 'presigned_url' in data
for key, value in kwargs.items():
assert data.get(key, None) == value
return data return data
...@@ -67,6 +79,23 @@ def test_not_existing_upload(client): ...@@ -67,6 +79,23 @@ def test_not_existing_upload(client):
assert rv.status_code == 404 assert rv.status_code == 404
def test_stale_upload(client):
rv = client.post(
'/uploads',
data=json.dumps(dict(name='test_name')),
content_type='application/json')
assert rv.status_code == 200
upload_id = assert_upload(rv.data)['upload_id']
upload = users.Upload.objects(id=upload_id).first()
upload.create_time = datetime.now() - timedelta(days=2)
upload.save()
rv = client.get('/uploads/%s' % upload_id)
assert rv.status_code == 200
assert_upload(rv.data, is_stale=True)
def test_create_upload(client): def test_create_upload(client):
rv = client.post('/uploads') rv = client.post('/uploads')
...@@ -75,7 +104,7 @@ def test_create_upload(client): ...@@ -75,7 +104,7 @@ def test_create_upload(client):
rv = client.get('/uploads/%s' % upload_id) rv = client.get('/uploads/%s' % upload_id)
assert rv.status_code == 200 assert rv.status_code == 200
assert_upload(rv.data, id=upload_id) assert_upload(rv.data, id=upload_id, is_stale=False)
rv = client.get('/uploads') rv = client.get('/uploads')
assert rv.status_code == 200 assert rv.status_code == 200
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment