Commit 765bb3e9 authored by Markus Scheidgen

Some refactoring. Added the upload handler service to the docker setup.

parent 9c5c3656
@@ -5,4 +5,7 @@
 .volumes/
 data/
 docs/
-infrastructure/
\ No newline at end of file
+infrastructure/
+gui/
+__pycache__/
+*.pyc
\ No newline at end of file
@@ -77,20 +77,19 @@ services:
     command: mongod --smallfiles --logpath=/dev/null # --quiet

   # used for centralized logging
-  elk:
-    restart: always
-    build: ./elk/
-    container_name: nomad-xt-elk
-    ports:
-      - 5601:5601 # kibana web
-      - 9201:9200 # elastic search api
-      # - 9300:9300 # elastic transport api, not needed by other services or host
-      - 5044:5044 # logstash beats
-      - 5000:5000 # logstash tcp
+  # elk:
+  #   restart: always
+  #   build: ./elk/
+  #   container_name: nomad-xt-elk
+  #   ports:
+  #     - 5601:5601 # kibana web
+  #     - 9201:9200 # elastic search api
+  #     # - 9300:9300 # elastic transport api, not needed by other services or host
+  #     - 5044:5044 # logstash beats
+  #     - 5000:5000 # logstash tcp

-  # nomad processing worker (parsing/normalizing)
+  # nomad processing worker
   nomad-worker:
-    restart: always
     build: ../
     container_name: nomad-xt-worker
     environment:
@@ -99,12 +98,38 @@ services:
       - NOMAD_RABBITMQ_HOST=rabbitmq
       - NOMAD_REDIS_HOST=redis
       - NOMAD_LOGSTASH_HOST=elk
+      - NOMAD_ELASTIC_HOST=elastic
+      - NOMAD_MONGO_HOST=mongo
     links:
       - minio
       - redis
       - rabbitmq
+      - elastic
+      - mongo
       - elk
     volumes:
       - '../.volumes/fs:/app/.volumes/fs'
     command: python -m celery worker -l info -A nomad.processing
+
+  # nomad upload handler
+  nomad-handler:
+    build: ../
+    container_name: nomad-xt-handler
+    environment:
+      - NOMAD_MINIO_PORT=9000
+      - NOMAD_MINIO_HOST=minio
+      - NOMAD_RABBITMQ_HOST=rabbitmq
+      - NOMAD_REDIS_HOST=redis
+      - NOMAD_LOGSTASH_HOST=elk
+      - NOMAD_ELASTIC_HOST=elastic
+      - NOMAD_MONGO_HOST=mongo
+    links:
+      - minio
+      - redis
+      - rabbitmq
+      - elastic
+      - mongo
+      - elk
+    volumes:
+      - '../.volumes/fs:/app/.volumes/fs'
+    command: python -m nomad.handler
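The new nomad-handler service runs python -m nomad.handler. The handler module itself is not part of this diff; a minimal sketch of such an entry point, assuming it simply delegates to the handle_uploads daemon refactored further below:

# hypothetical nomad/handler.py entry point -- not shown in this commit
from nomad.processing import handle_uploads

if __name__ == '__main__':
    # block forever, processing one upload notification after another
    handle_uploads(quit=False)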
@@ -26,24 +26,19 @@ class Uploads(Resource):
     @staticmethod
     def _render(upload: users.Upload):
         data = {
-            'id': upload.upload_id,
+            'id': upload.upload_id,  # deprecated
+            'upload_id': upload.upload_id,
             'presigned_url': upload.presigned_url,
             'create_time': upload.create_time.isoformat() if upload.create_time is not None else None,
             'upload_time': upload.upload_time.isoformat() if upload.upload_time is not None else None,
             'upload_hash': upload.upload_hash,
         }

-        if upload.processing is not None:
-            proc = processing.UploadProcessing.from_result_backend(upload.upload_id, upload.processing)
-            processing_data = {
-                'status': proc.status,
-                'results': proc.calc_processings,
-                'current_task': proc.task_name,
-                'error': proc.cause.__str__()
-            }
-            data['processing'] = {
-                key: value for key, value in processing_data.items() if value is not None
-            }
+        if upload.proc_results is not None:
+            data['processing'] = upload.proc_results
+        elif upload.proc_task is not None:
+            proc = processing.UploadProcessing.from_result_backend(upload.upload_id, upload.proc_task)
+            data['processing'] = proc.to_dict()

         return {key: value for key, value in data.items() if value is not None}
......
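With this refactoring, _render prefers the stored proc_results over re-querying the Celery result backend via proc_task. A hypothetical rendered payload for a completed upload (all values invented for illustration):

# hypothetical _render output for a processed upload; all values are invented
{
    'id': 'examples_vasp',        # deprecated, same as upload_id
    'upload_id': 'examples_vasp',
    'create_time': '2018-08-01T12:00:00',
    'upload_time': '2018-08-01T12:05:00',
    'upload_hash': 'af3e8c',
    'processing': {
        'status': 'SUCCESS',
        'calcs': [],
        'current_task': 'nomad.processing.close_upload'
    }
}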
@@ -74,11 +74,11 @@ fs = FSConfig(
     tmp='.volumes/fs'
 )
 elastic = ElasticConfig(
-    host='localhost',
+    host=os.environ.get('NOMAD_ELASTIC_HOST', 'localhost'),
     calc_index='calcs'
 )
 mongo = MongoConfig(
-    host='localhost',
+    host=os.environ.get('NOMAD_MONGO_HOST', 'localhost'),
     users_db='users'
 )
 logstash = LogstashConfig(
......
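The elastic and mongo hosts now follow the same environment-override pattern already used for the other NOMAD_* variables: inside a container the compose service name is injected, outside the 'localhost' default applies. A condensed, self-contained sketch of the pattern, assuming NamedTuple-style config classes like the rest of this module:

# condensed sketch of the env-override pattern; MongoConfig is assumed
# to be a NamedTuple like the other *Config classes in nomad/config.py
import os
from typing import NamedTuple

class MongoConfig(NamedTuple):
    host: str
    users_db: str

mongo = MongoConfig(
    host=os.environ.get('NOMAD_MONGO_HOST', 'localhost'),
    users_db='users')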
@@ -157,10 +157,10 @@ dependencies = [
         name='parsers/vasp',
         git_url='https://gitlab.mpcdf.mpg.de/nomad-lab/parser-vasp.git',
         git_commit='nomad-xt'),
-    PythonGit(
-        name='parsers/exciting',
-        git_url='https://gitlab.mpcdf.mpg.de/nomad-lab/parser-exciting.git',
-        git_commit='master'),
+    # PythonGit(
+    #     name='parsers/exciting',
+    #     git_url='https://gitlab.mpcdf.mpg.de/nomad-lab/parser-exciting.git',
+    #     git_commit='master'),
     PythonGit(
         name='normalizers/stats',
         git_url='https://gitlab.mpcdf.mpg.de/nomad-lab/normalizer-stats.git',
......
@@ -631,12 +631,12 @@ parsers = [
             r'?\s*<i name="program" type="string">\s*vasp\s*</i>'
             r'?')
     ),
-    Parser(
-        python_git=dependencies['parsers/exciting'],
-        parser_class_name='vaspparser.VASPParser',
-        main_file_re=r'^.*\.todo$',
-        main_contents_re=(r'^todo')
-    ),
+    # Parser(
+    #     python_git=dependencies['parsers/exciting'],
+    #     parser_class_name='vaspparser.VASPParser',
+    #     main_file_re=r'^.*\.todo$',
+    #     main_contents_re=(r'^todo')
+    # ),
 ]
 """ Instantiation and constructor-based config of all parsers. """
......
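Each Parser couples a file-name pattern with a content pattern; a parser only applies when both match. A small hypothetical helper illustrating how the two regexes could be evaluated:

# hypothetical helper; Parser exposes main_file_re and main_contents_re
# as configured above
import re

def is_main_file(parser, path: str, contents: str) -> bool:
    """ True if path and contents match the parser's main-file patterns. """
    if re.match(parser.main_file_re, path) is None:
        return False
    return re.search(parser.main_contents_re, contents) is not None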
@@ -30,7 +30,7 @@ an upload processing in a serializable form.
 .. autoclass:: nomad.processing.UploadProcessing
 """
-from typing import List, Any
+from typing import List, Any, Dict
 from celery import Celery, Task, chord, group
 from celery.result import AsyncResult, result_from_tuple
 from celery.signals import after_setup_task_logger, after_setup_logger
@@ -309,6 +309,16 @@ class UploadProcessing():
         self.task_id = task.request.id
         return True

+    def to_dict(self) -> Dict[str, Any]:
+        """ Render processing information into a serializable dict. """
+        result = {
+            'status': self.status,
+            'calcs': self.calc_processings,
+            'current_task': self.task_name,
+            'error': self.cause.__str__()
+        }
+        return {key: value for key, value in result.items() if value is not None}
+

 @app.task(bind=True)
 def open_upload(task: Task, processing: UploadProcessing) -> UploadProcessing:
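Since to_dict drops None-valued entries, API clients only see fields that are actually set. A hedged usage example; the printed values are illustrative and depend on the task state:

# illustrative use of to_dict; actual field values depend on task state
proc = UploadProcessing('some_upload_id')
proc.start()
print(proc.to_dict())
# e.g. {'status': 'PENDING', 'current_task': 'nomad.processing.open_upload'}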
@@ -356,24 +366,22 @@ def close_upload(
         task, calc_processings: List[CalcProcessing], processing: UploadProcessing) \
         -> UploadProcessing:
-    if not processing.continue_with(task):
-        return processing
-
-    processing.calc_processings = calc_processings
+    if processing.continue_with(task):
+        processing.calc_processings = calc_processings

-    try:
-        upload = Upload(processing.upload_id)
-    except KeyError as e:
-        logger.warning('No upload %s' % processing.upload_id)
-        return processing.fail(e)
+        try:
+            upload = Upload(processing.upload_id)
+        except KeyError as e:
+            logger.warning('No upload %s' % processing.upload_id)
+            return processing.fail(e)

-    try:
-        upload.close()
-    except Exception as e:
-        logger.error('Could not close upload %s: %s' % (processing.upload_id, e))
-        return processing.fail(e)
+        try:
+            upload.close()
+        except Exception as e:
+            logger.error('Could not close upload %s: %s' % (processing.upload_id, e))
+            return processing.fail(e)

-    logger.debug('Closed upload %s' % processing.upload_id)
+        logger.debug('Closed upload %s' % processing.upload_id)

     return processing
@@ -472,11 +480,15 @@ def parse(self, processing: CalcProcessing) -> CalcProcessing:
 def handle_uploads(quit=False):
     """
-    Listens for new uploads in files and initiates their processing.
+    Starts a daemon that listens to files for new uploads. For each new upload
+    it will initiate the processing, save the task in the upload user data,
+    wait for the processing to complete, and store the results in the upload
+    user data.

     Arguments:
         quit: If true, will only handle one event and stop. Otherwise runs forever.
     """

     @files.upload_put_handler
     def handle_upload_put(received_upload_id: str):
         logger = utils.get_logger(__name__, upload_id=received_upload_id)
@@ -498,14 +510,14 @@ def handle_uploads(quit=False):
             with logger.lnr_error('Start processing'):
                 proc = UploadProcessing(received_upload_id)
                 proc.start()
-                upload.processing = proc.result_tuple
+                upload.proc_task = proc.result_tuple
                 upload.save()
         except Exception:
             pass

         if quit:
             raise StopIteration

-        logger.debug('Initiated upload processing')
+    logger.debug('Start upload put notification handler.')
     handle_upload_put(received_upload_id='provided by decorator')
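files.upload_put_handler turns handle_upload_put into a blocking loop: calling the decorated function consumes upload notifications and invokes the original function once per received upload id, until it raises StopIteration. A rough, hypothetical sketch of such a decorator, with the notification source stubbed out:

# rough, hypothetical sketch of a decorator like files.upload_put_handler
from typing import Callable, Iterator

def iter_upload_notifications() -> Iterator[str]:
    """ Stand-in for the real notification source, e.g. minio bucket events. """
    yield 'example_upload_id'

def upload_put_handler(func: Callable[[str], None]) -> Callable[..., None]:
    def wrapper(received_upload_id: str = None) -> None:
        # the placeholder argument is ignored; ids come from the notifications
        for upload_id in iter_upload_notifications():
            try:
                func(upload_id)
            except StopIteration:
                break  # raised by the handler when quit=True
    return wrapper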
@@ -55,10 +55,14 @@ class Upload(Document):
     upload_hash = StringField()
     in_staging = BooleanField(default=True)
     is_private = BooleanField(default=False)
-    processing = ListField(default=None)
-    presigned_url = StringField()
     upload_time = DateTimeField()
     create_time = DateTimeField()
+    presigned_url = StringField()
+
+    proc_time = DateTimeField()
+    proc_task = ListField(default=None)
+    proc_results = DictField(default=None)

     user = ReferenceField(User, required=True)
......
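The Upload document now keeps the Celery result tuple (proc_task) while processing runs, and the rendered results (proc_results) once it is done. A hedged sketch of how a handler could persist both, with the interfaces assumed from the diff above:

# hedged sketch: persisting processing state on a users.Upload document
from datetime import datetime

def store_processing_state(upload, proc) -> None:
    # upload: users.Upload, proc: processing.UploadProcessing (assumed)
    upload.proc_task = proc.result_tuple  # serialized celery result tuple
    results = proc.to_dict()
    if results.get('status') == 'SUCCESS':
        upload.proc_results = results     # final, serializable results
        upload.proc_time = datetime.utcnow()
    upload.save()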
@@ -138,8 +138,8 @@ def test_processing(client, file, celery_session_worker):
     proc = upload['processing']
     assert proc['status'] == 'SUCCESS'
-    assert 'results' in proc
-    assert proc['results'] is not None
+    assert 'calcs' in proc
+    assert proc['calcs'] is not None
     assert proc['current_task'] == 'nomad.processing.close_upload'
     assert_exists(config.files.uploads_bucket, upload['id'])
......