Commit 44ef3cd3 authored by Markus Scheidgen's avatar Markus Scheidgen
Browse files

Added upload status polling. Debuging double threaded notification deamon.

parent 515f2c6a
......@@ -11,7 +11,7 @@ cd nomad-xt
Second, create and source your own virtual python environment:
```
pip install virtualenv
virtualenv -p `which phyton3` .pyenv
virtualenv -p `which python3` .pyenv
source .pyenv/bin/activate
```
......
import { apiBase } from './config'
class Upload {
constructor(json) {
console.debug('Created local upload for ' + json.id)
Object.assign(this, json)
}
uploadFile(file) {
console.assert(this.presigned_url)
console.debug(`Upload ${file} to ${this.presigned_url}.`)
return fetch(this.presigned_url, {
method: 'PUT',
headers: {
'Content-Type': 'application/gzip'
},
body: file
}).then(() => {
console.log(`Uploaded ${file} to ${this.id}.`)
return this
}).catch(error => {
console.error(`Could not upload ${file} to ${this.presigned_url}: ${error}.`)
return this
})
}
update() {
return fetch(`${apiBase}/uploads/${this.id}`)
.then(response => response.json())
.then(uploadJson => {
Object.assign(this, uploadJson)
return this
})
}
}
function createUpload() {
console.debug('Request new upload.')
return fetch(`${apiBase}/uploads`, {method: 'POST'})
.then(response => response.json())
.then(uploadJson => new Upload(uploadJson))
}
const api = {
createUpload: createUpload
};
export default api;
\ No newline at end of file
import React from 'react';
import PropTypes from 'prop-types';
import { withStyles } from '@material-ui/core';
class Upload extends React.Component {
static propTypes = {
classes: PropTypes.object.isRequired,
upload: PropTypes.object.isRequired
}
static styles = theme => ({
root: {
background: 'red'
}
});
constructor(props) {
super(props)
this.state = {
upload: props.upload
}
}
updateUpload() {
window.setTimeout(() => {
this.state.upload.update()
.then(upload => {
console.debug(`Sucessfully updated upload ${upload.id}.`)
this.setState({upload: upload})
if (!(upload.processing && upload.processing.status == 'SUCCESS')) {
this.updateUpload()
}
})
}, 1000)
}
componentDidMount() {
this.updateUpload()
}
render() {
const { classes } = this.props;
const { upload } = this.state;
return (
<div className={classes.root}>
<pre>
{JSON.stringify(upload, null, 2)}
</pre>
</div>
)
}
}
export default withStyles(Upload.styles)(Upload);
\ No newline at end of file
......@@ -3,6 +3,8 @@ import Markdown from './Markdown';
import { withStyles, Paper } from '@material-ui/core';
import UploadIcon from '@material-ui/icons/CloudUpload';
import Dropzone from 'react-dropzone';
import api from '../api';
import Upload from './Upload'
const greetings = `
## Upload your own data to **nomad xt**
......@@ -34,11 +36,23 @@ var styles = theme => ({
}
});
class Upload extends React.Component {
class Uploads extends React.Component {
constructor(props) {
super(props)
this.state = {
uploads: []
}
}
onDrop(files, rejected) {
console.log(files)
onDrop(files) {
files.forEach(file => {
api.createUpload()
.then(upload => upload.uploadFile(file))
.then(upload => {
this.setState({uploads: [...this.state.uploads, upload]})
})
});
}
render() {
......@@ -59,9 +73,10 @@ class Upload extends React.Component {
<UploadIcon style={{fontSize: 36}}/>
</Dropzone>
</Paper>
{this.state.uploads.map((upload, key) => (<Upload key={key} upload={upload}/>))}
</div>
)
}
}
export default withStyles(styles)(Upload);
\ No newline at end of file
export default withStyles(styles)(Uploads);
\ No newline at end of file
export const apiBase = 'http://localhost:5000'
\ No newline at end of file
......@@ -3,11 +3,14 @@ from flask_restful import Resource, Api, abort
from datetime import datetime
from threading import Thread
import mongoengine.errors
from flask_cors import CORS
import logging
from nomad import users, files, processing
from nomad.utils import get_logger
app = Flask(__name__)
CORS(app)
api = Api(app)
......@@ -89,50 +92,6 @@ api.add_resource(Uploads, '/uploads')
api.add_resource(Upload, '/uploads/<string:upload_id>')
def start_upload_handler(quit=False):
"""
Starts a notification handler for uploads in a different thread. This handler
will initiate processing for all received upload events. The processing status
will be saved to the users db.
Arguments:
quit: If true, will only handling one event and stop. Otherwise run forever.
"""
@files.upload_put_handler
def handle_upload_put(received_upload_id: str):
logger = get_logger(__name__, upload_id=received_upload_id)
try:
upload = users.Upload.objects(id=received_upload_id).first()
if upload is None:
logger.error('Upload does not exist')
raise Exception()
with logger.lnr_error('Save upload time'):
upload.upload_time = datetime.now()
upload.save()
with logger.lnr_error('Start processing'):
proc = processing.UploadProcessing(received_upload_id)
proc.start()
upload.processing = proc.result_tuple
upload.save()
except Exception:
pass
if quit:
raise StopIteration
def handle_uploads():
handle_upload_put(received_upload_id='provided by decorator')
handle_uploads_thread = Thread(target=handle_uploads)
handle_uploads_thread.start()
return handle_uploads_thread
if __name__ == '__main__':
handle_uploads_thread = start_upload_handler()
logging.basicConfig(level=logging.DEBUG)
app.run(debug=True)
handle_uploads_thread.join()
......@@ -53,6 +53,7 @@ import base64
from contextlib import contextmanager
import gzip
import io
import json
import nomad.config as config
......@@ -112,30 +113,29 @@ def create_curl_upload_cmd(presigned_url: str, file_dummy: str='<ZIPFILE>') -> s
def upload_put_handler(func: Callable[[str], None]) -> Callable[[], None]:
def upload_notifications(events: List[Any]) -> Generator[str, None, None]:
# The given events is a generator that will block and yield indefinetely.
# Therefore, we have to use generator expressions and must not use list
# comprehension. Same for chain vs chain.from_iterable.
nested_event_records = (event['Records'] for event in events)
event_records = itertools.chain.from_iterable(nested_event_records)
for event_record in event_records:
try:
event_name = event_record['eventName']
if event_name == 's3:ObjectCreated:Put':
logger.debug('Received bucket upload event of type %s.' % event_name)
upload_id = event_record['s3']['object']['key']
yield upload_id
else:
logger.debug('Unhandled bucket event of type %s.' % event_name)
except KeyError:
logger.warning(
'Unhandled bucket event due to unexprected event format: %s' %
event_record)
for event in events:
for event_record in event['Records']:
try:
event_name = event_record['eventName']
if event_name == 's3:ObjectCreated:Put':
upload_id = event_record['s3']['object']['key']
logger.debug('Received bucket upload event of for upload %s.' % upload_id)
yield upload_id
break # only one per record, pls
else:
logger.debug('Unhanled bucket event %s.' % event_name)
except KeyError:
logger.warning(
'Unhandled bucket event due to unexprected event format: %s' %
event_record)
def wrapper(*args, **kwargs) -> None:
logger.info('Start listening to uploads notifications.')
events = _client.listen_bucket_notification(config.files.uploads_bucket)
_client.remove_all_bucket_notification(config.files.uploads_bucket)
events = _client.listen_bucket_notification(
config.files.uploads_bucket,
events=['s3:ObjectCreated:*'])
upload_ids = upload_notifications(events)
for upload_id in upload_ids:
......
......@@ -42,7 +42,7 @@ from datetime import datetime
import nomad.config as config
from nomad.files import Upload, UploadError
from nomad import files, utils
from nomad import files, utils, users
from nomad.parsing import parsers, parser_dict
from nomad.normalizing import normalizers
from nomad import search
......@@ -470,6 +470,51 @@ def parse(self, processing: CalcProcessing) -> CalcProcessing:
return processing
@app.task()
def mul(x, y):
return x * y
def start_upload_handler(quit=False):
"""
Starts a notification handler for uploads in a different thread. This handler
will initiate processing for all received upload events. The processing status
will be saved to the users db.
Arguments:
quit: If true, will only handling one event and stop. Otherwise run forever.
"""
@files.upload_put_handler
def handle_upload_put(received_upload_id: str):
logger = utils.get_logger(__name__, upload_id=received_upload_id)
logger.debug('Initiate upload processing')
try:
upload = users.Upload.objects(id=received_upload_id).first()
if upload is None:
logger.error('Upload does not exist')
raise Exception()
logger.error('%s' % upload.upload_time)
if upload.upload_time is not None:
logger.warn('Ignore upload notification, since file is already uploaded')
raise StopIteration
with logger.lnr_error('Save upload time'):
upload.upload_time = datetime.now()
upload.save()
with logger.lnr_error('Start processing'):
proc = UploadProcessing(received_upload_id)
proc.start()
upload.processing = proc.result_tuple
upload.save()
except Exception:
pass
if quit:
raise StopIteration
logger.debug('Initiated upload processing')
logger = logging.getLogger(__name__)
logger.debug('Start upload put notification handler.')
handle_upload_put(received_upload_id='provided by decorator')
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
start_upload_handler()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment