Commit 7ba22eda authored by Markus Scheidgen

Bugfixes.

parent 7f2dfd85
@@ -2,7 +2,7 @@ import { apiBase } from './config'
 class Upload {
   constructor(json) {
-    console.debug('Created local upload for ' + json.id)
+    console.debug('Created local upload for ' + json.upload_id)
     Object.assign(this, json)
   }
@@ -16,7 +16,7 @@ class Upload {
       },
       body: file
     }).then(() => {
-      console.log(`Uploaded ${file} to ${this.id}.`)
+      console.log(`Uploaded ${file} to ${this.upload_id}.`)
       return this
     }).catch(error => {
       console.error(`Could not upload ${file} to ${this.presigned_url}: ${error}.`)
@@ -25,7 +25,7 @@ class Upload {
   }

   update() {
-    return fetch(`${apiBase}/uploads/${this.id}`)
+    return fetch(`${apiBase}/uploads/${this.upload_id}`)
       .then(response => response.json())
       .then(uploadJson => {
         Object.assign(this, uploadJson)
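All three hunks above fix the same bug: the API serializes the identifier as `upload_id`, but the GUI read `json.id` and `this.id`, which were always undefined. A minimal Python sketch of the same client logic (the `API_BASE` URL and the use of `requests` are illustrative assumptions; the real client is the JavaScript above):

```python
import requests

API_BASE = 'http://localhost/nomad/api'  # illustrative stand-in for apiBase

class Upload:
    """Local mirror of an upload resource; attributes come from the API JSON."""

    def __init__(self, json):
        # the API names the identifier 'upload_id', not 'id'
        self.__dict__.update(json)
        print('Created local upload for %s' % self.upload_id)

    def update(self):
        # re-fetch the upload state and merge it into this object
        response = requests.get('%s/uploads/%s' % (API_BASE, self.upload_id))
        self.__dict__.update(response.json())
        return self
```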
@@ -29,9 +29,9 @@ class Upload extends React.Component {
     window.setTimeout(() => {
       this.state.upload.update()
         .then(upload => {
-          console.debug(`Successfully updated upload ${upload.id}.`)
+          console.debug(`Successfully updated upload ${upload.upload_id}.`)
           this.setState({upload: upload})
-          if (!(upload.processing && upload.processing.status == 'SUCCESS')) {
+          if (upload.proc.status != 'SUCCESS') {
             this.updateUpload()
           }
         })
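The component schedules itself with `window.setTimeout` and re-polls until the processing status is `SUCCESS`; the fix reads that status from the new `proc` sub-object. Note that the rewritten condition also keeps polling on `FAILURE`, while the test at the bottom of this commit treats both `SUCCESS` and `FAILURE` as terminal. The same polling pattern as a Python sketch (the helper name and timing parameters are hypothetical):

```python
import time

def wait_for_processing(upload, interval=1.0, timeout=60.0):
    """Poll an upload until its proc reaches a terminal status (sketch)."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        upload.update()  # re-fetches state, like Upload.update() above
        if upload.proc['status'] in ('SUCCESS', 'FAILURE'):
            return upload.proc['status']
        time.sleep(interval)
    raise TimeoutError('upload processing did not finish in time')
```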
@@ -58,9 +58,11 @@ class Upload extends React.Component {
       </Typography>
     )

+    const proc = upload.proc
+    console.assert(proc, 'Uploads must always have a proc')
     const stepper = (
-      <Stepper activeStep={upload.tasks.indexOf(upload.task)} orientation="vertical">
-        {upload.tasks.map((label, index) => (
+      <Stepper activeStep={proc.task_names.indexOf(proc.current_task_name)} orientation="vertical">
+        {proc.task_names.map((label, index) => (
           <Step key={label}>
             <StepLabel>{label}</StepLabel>
           </Step>
@@ -132,4 +132,4 @@ services:
 #      - elk
     volumes:
       - '../.volumes/fs:/app/.volumes/fs'
-    command: python -m nomad.handler
+    command: python -m nomad.processing.handlerdaemon
@@ -4,7 +4,6 @@ from datetime import datetime
 import mongoengine.errors
 from flask_cors import CORS
 import logging
-import json

 from nomad import users, files
 from nomad.processing import UploadProc
@@ -52,6 +51,7 @@ class Uploads(Resource):
         upload.presigned_url = files.get_presigned_upload_url(upload.upload_id)
         upload.create_time = datetime.now()
+        upload.proc = UploadProc(upload.upload_id)
         upload.save()

         return Uploads._render(upload), 200
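The one added line ensures that a fresh `UploadProc` is attached to the upload before the first `save()`, so every upload the API renders already carries a `proc` with its initial task state. A self-contained sketch of that create flow (the `UploadRecord` stand-in and the literal URL are illustrative; the real code uses the mongoengine document and `files.get_presigned_upload_url`):

```python
from datetime import datetime
from dataclasses import dataclass

@dataclass
class UploadRecord:
    """Illustrative stand-in for the mongoengine upload document."""
    upload_id: str
    presigned_url: str = None
    create_time: datetime = None
    proc: dict = None

def create_upload(upload_id: str) -> UploadRecord:
    upload = UploadRecord(upload_id=upload_id)
    upload.presigned_url = 'https://objects.example.org/%s' % upload_id
    upload.create_time = datetime.now()
    # the fix: the initial processing state exists before the record is
    # persisted and rendered, so clients always find a 'proc'
    upload.proc = {
        'upload_id': upload_id,
        'status': 'PENDING',
        'current_task_name': 'uploading',
        'task_names': ['uploading', 'extracting', 'parse_all', 'cleanup'],
    }
    return upload
```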
@@ -13,7 +13,6 @@
 # limitations under the License.

 from datetime import datetime
-import logging
 from threading import Thread

 from nomad import files, utils, users
@@ -22,16 +21,13 @@ from nomad.processing.tasks import extracting_task, cleanup_task, parse_all_task
 from nomad.processing.state import UploadProc


-def start_processing(upload_id) -> UploadProc:
+def start_processing(upload_id, proc: UploadProc=None) -> UploadProc:
     """ Starts the processing tasks via celery canvas. """

-    task_names = [
-        extracting_task.name,
-        parse_all_task.name,
-        cleanup_task.name
-    ]
-
-    proc = UploadProc(upload_id, task_names)
+    if proc is not None:
+        proc = UploadProc(**proc)
+    else:
+        proc = UploadProc(upload_id)

     # Keep the results of the last task in the workflow.
     # The last task is started by another task, therefore it
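`start_processing` no longer assembles the task-name list from the celery tasks (that moved into `UploadProc.__init__`, further down) and gains an optional `proc` parameter: a state object created earlier, for example by the API when the upload was registered, is revived from its serialized dict form via `UploadProc(**proc)`. A sketch of that revive-or-create pattern, assuming `UploadProc` is dict-backed, which the `**proc` expansion implies:

```python
class Proc(dict):
    """Dict-backed processing state, as UploadProc(**proc) suggests."""

    def __init__(self, upload_id=None, **kwargs):
        super().__init__(upload_id=upload_id, **kwargs)

def start_processing(upload_id, proc=None):
    if proc is not None:
        proc = Proc(**proc)  # revive the state the API created earlier
    else:
        proc = Proc(upload_id)  # first contact, create fresh state
    # ... build and send the celery canvas here ...
    return proc
```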
@@ -84,7 +80,7 @@ def handle_uploads(quit=False):
                 upload.save()

             with logger.lnr_error('Start processing'):
-                proc = start_processing(received_upload_id)
+                proc = start_processing(received_upload_id, proc=upload.proc)
                 assert proc.is_started
                 upload.proc = proc
                 upload.save()
@@ -103,8 +99,3 @@ def handle_uploads_thread(quit=True):
     thread = Thread(target=lambda: handle_uploads(quit))
     thread.start()
     return thread
-
-
-if __name__ == '__main__':
-    logging.basicConfig(level=logging.DEBUG)
-    handle_uploads()
 import logging

-from nomad.processing import handle_uploads
+from nomad.processing.handler import handle_uploads

 if __name__ == '__main__':
     logging.basicConfig(level=logging.DEBUG)
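The `__main__` block removed from `handler.py` above now lives in its own daemon module, which is exactly what the corrected docker-compose `command` runs. The diff is truncated here; assuming the block simply moved, the complete entry point would be:

```python
import logging

from nomad.processing.handler import handle_uploads

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    handle_uploads()  # blocks and consumes upload notifications
```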
@@ -157,8 +157,11 @@ class UploadProc(ProcPipeline):
         calc_procs: The state data for all child calc processings.
         celery_task_ids: Serialized form of the celery async_results tree for the processing.
     """
-    def __init__(self, upload_id: str, task_names: List[str], *args, **kwargs) -> None:
+    def __init__(self, upload_id: str, *args, **kwargs) -> None:
         assert upload_id is not None

+        # TODO there should be a way to read the names from the tasks,
+        # but currently this is not possible due to import cycles
+        task_names = ['uploading', 'extracting', 'parse_all', 'cleanup']
         super().__init__(task_names, *args)

         self.upload_id = upload_id
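`UploadProc` now hardcodes its four task names (the TODO explains that reading them from the celery tasks would create an import cycle) and, per the next hunk, immediately enters the first task on construction. The base class is not part of this diff; a hypothetical sketch of the pipeline state it implies:

```python
class ProcPipeline(dict):
    """Hypothetical sketch of the pipeline state UploadProc builds on."""

    def __init__(self, task_names, **kwargs):
        super().__init__(
            task_names=task_names, current_task_name=None,
            status='PENDING', **kwargs)

    @property
    def is_started(self):
        return self['current_task_name'] is not None

    def continue_with(self, task_name):
        # advance the pipeline to the given task
        assert task_name in self['task_names']
        self['current_task_name'] = task_name
        self['status'] = 'STARTED'

pipeline = ProcPipeline(['uploading', 'extracting', 'parse_all', 'cleanup'])
if not pipeline.is_started:
    pipeline.continue_with('uploading')  # what UploadProc.__init__ now does
```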
@@ -170,6 +173,9 @@ class UploadProc(ProcPipeline):
         self.update(kwargs)

+        if not self.is_started:
+            self.continue_with(task_names[0])
+
     @property
     def _celery_task_result(self) -> AsyncResult:
         """
@@ -185,10 +191,13 @@ class UploadProc(ProcPipeline):
         return result_from_tuple(self.celery_task_ids, app=app)

-    def update_from_backend(self) -> 'UploadProc':
+    def update_from_backend(self):
         """ Consults the result backend and updates itself with the available results. """
-        assert self.is_started, 'Run is not yet started.'
+        if self.celery_task_ids is None:
+            return

         celery_task_result = self._celery_task_result
         while celery_task_result is not None:
@@ -202,8 +211,6 @@ class UploadProc(ProcPipeline):
         for calc_proc in self.calc_procs:
             calc_proc.update_from_backend()

-        return self
-
     def forget(self) -> None:
         """ Forget the results of a completed run; free all resources in the results backend. """
         # TODO, this is not forgetting the parse task in the parse_all header, right?
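`update_from_backend` previously asserted that the run had started; now a proc without `celery_task_ids`, i.e. one created with the upload but not yet sent to celery, is simply left untouched. The `return self` and its return annotation are gone as well, so the method mutates in place. A simplified sketch of consulting the result backend by walking the `AsyncResult` chain via `.parent`, which is what `result_from_tuple` plus the `while` loop above suggest:

```python
from celery.result import result_from_tuple

def update_from_backend(proc, app):
    """Fold available celery results into a dict-backed proc (sketch)."""
    if proc.get('celery_task_ids') is None:
        return  # not started yet, nothing to consult

    result = result_from_tuple(proc['celery_task_ids'], app=app)
    while result is not None:
        if result.ready():
            # the latest finished task determines the overall status
            proc['status'] = result.state  # e.g. 'SUCCESS' or 'FAILURE'
            break
        result = result.parent  # walk towards earlier tasks in the canvas
```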
@@ -13,7 +13,7 @@ from tests.test_processing import example_files
 from tests.test_files import assert_exists

 # import fixtures
 from tests.test_files import clear_files, archive_id  # pylint: disable=unused-import
-from tests.test_processing import celery_config, celery_includes  # pylint: disable=unused-import
+from tests.test_processing import celery_session_worker, celery_config, celery_includes  # pylint: disable=unused-import


 @pytest.fixture
@@ -130,17 +130,15 @@ def test_processing(client, file, celery_session_worker):
         assert rv.status_code == 200
         upload = assert_upload(rv.data)
         assert 'upload_time' in upload
-        if 'proc' in upload:
-            assert 'status' in upload['proc']
-            if upload['proc']['status'] in ['SUCCESS', 'FAILURE']:
-                break
+        if upload['proc']['status'] in ['SUCCESS', 'FAILURE']:
+            break

     proc = upload['proc']
     assert proc['status'] == 'SUCCESS'
     assert 'calc_procs' in proc
     assert proc['calc_procs'] is not None
     assert proc['current_task_name'] == 'cleanup'
     assert len(proc['task_names']) == 4

     assert_exists(config.files.uploads_bucket, upload['upload_id'])
@@ -24,7 +24,7 @@ import time

 import nomad.config as config
 import nomad.files as files
-from nomad.processing import start_processing
+from nomad.processing import start_processing, app
 from tests.test_files import example_file, empty_file

 # import fixtures
@@ -34,6 +34,11 @@ from tests.test_files import clear_files  # pylint: disable=unused-import

 example_files = [empty_file, example_file]


+@pytest.fixture(scope='session')
+def celery_session_worker():
+    return app
+
+
 @pytest.fixture(scope='session')
 def celery_includes():
     return ['nomad.processing.tasks']
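The API tests above import these fixtures, so both test modules now share one session-scoped celery setup. Note that `celery_session_worker` is normally provided by the pytest-celery plugin; returning the nomad `app` here presumably stands in for it. A hypothetical test combining the fixtures:

```python
from nomad.processing import start_processing

def test_start_processing(clear_files, celery_session_worker):
    # requesting the fixtures makes sure the file buckets are clean and
    # the celery app with its task modules is configured before submitting
    proc = start_processing('hypothetical_upload_id')
    assert proc.is_started
```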