Commit 54d99974 authored by Markus Scheidgen

Improved processing synchronization.

Upload's calculation counters (processed_calcs, total_calcs, failed_calcs) are now derived from the Calc collection instead of incremented fields, with _initiated_parsers guarding cleanup until all parsers were spawned. Celery dispatch is tracked via a new _async_status field, and the elastic search create now also retries on ConflictError.

parent 96a163a8
@@ -286,16 +286,13 @@ class UploadRes(Resource):
order_by = ('-%s' if order == -1 else '+%s') % order_by
calcs = upload.all_calcs((page - 1) * per_page, page * per_page, order_by)
failed_calcs = upload.failed_calcs
result = upload.json_dict
all_calcs = Calc.objects(upload_id=upload_id)
total = all_calcs.count()
successes = Calc.objects(upload_id=upload_id, status=SUCCESS).count()
failures = Calc.objects(upload_id=upload_id, status=FAILURE).count()
calcs = all_calcs[(page - 1) * per_page:page * per_page].order_by(order_by)
result['calcs'] = {
'pagination': dict(
total=total, page=page, per_page=per_page,
successes=successes, failures=failures),
total=upload.total_calcs, page=page, per_page=per_page,
successes=upload.processed_calcs - failed_calcs, failures=failed_calcs),
'results': [calc.json_dict for calc in calcs]
}
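
Because added and removed lines are interleaved above, the new response assembly is easier to read in one piece. A condensed reconstruction of the added lines; the enclosing method of `UploadRes` is not shown in the hunk:

```python
# reconstruction of the new handler body: all counting is delegated to the
# Upload model instead of ad-hoc Calc queries in the API layer
order_by = ('-%s' if order == -1 else '+%s') % order_by
calcs = upload.all_calcs((page - 1) * per_page, page * per_page, order_by)
failed_calcs = upload.failed_calcs
result = upload.json_dict
result['calcs'] = {
    'pagination': dict(
        total=upload.total_calcs, page=page, per_page=per_page,
        successes=upload.processed_calcs - failed_calcs, failures=failed_calcs),
    'results': [calc.json_dict for calc in calcs]
}
```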
@@ -125,6 +125,8 @@ class Proc(Document, metaclass=ProcMetaclass):
create_time = DateTimeField(required=True)
complete_time = DateTimeField()
_async_status = StringField(default='UNCALLED')
@property
def completed(self) -> bool:
""" Returns True of the process has failed or succeeded. """
@@ -300,6 +302,7 @@ class Proc(Document, metaclass=ProcMetaclass):
'warnings': self.warnings,
'create_time': self.create_time.isoformat() if self.create_time is not None else None,
'complete_time': self.complete_time.isoformat() if self.complete_time is not None else None,
'_async_status': self._async_status
}
return {key: value for key, value in data.items() if value is not None}
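
With `_async_status` included in `json_dict`, API clients can see where a document stands in the celery round-trip. An illustrative payload, restricted to the keys visible in this hunk (the `None` filter in the return drops `complete_time` while processing is still running):

```python
# illustrative json_dict result for a document whose task was just dispatched
{
    'warnings': [],
    'create_time': '2018-10-17T10:00:00',
    '_async_status': 'CALLED-process_upload',
}
```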
@@ -368,6 +371,7 @@ def proc_task(task, cls_name, self_id, func_attr):
return
try:
self._async_status = 'RECEIVED-%s' % func.__name__
func(self)
except Exception as e:
self.fail(e)
@@ -382,6 +386,7 @@ def process(func):
"""
def wrapper(self, *args, **kwargs):
assert len(args) == 0 and len(kwargs) == 0, 'process functions must not have arguments'
self._async_status = 'CALLED-%s' % func.__name__
self.save()
self_id = self.id.__str__()
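
Taken together, the three hunks above give every `Proc` a visible dispatch state: `UNCALLED` on creation, `CALLED-<func>` once the caller has sent the celery task off, and `RECEIVED-<func>` once a worker has picked it up. A minimal, self-contained simulation of those transitions, with mongo and celery replaced by plain Python; only the state strings mirror the commit:

```python
# plain-Python stand-ins for mongoengine's Document and the celery task;
# the state transitions are the ones introduced by this commit
class Proc:
    def __init__(self):
        self._async_status = 'UNCALLED'  # StringField default
        self.errors = []

    def save(self):
        pass  # stands in for Document.save

    def fail(self, error):
        self.errors.append(str(error))

def proc_task(self, func_attr):
    # worker side: record receipt, then run the actual task function
    func = getattr(type(self), func_attr).unwrapped
    try:
        self._async_status = 'RECEIVED-%s' % func_attr
        func(self)
    except Exception as e:
        self.fail(e)

def process(func):
    # caller side: record dispatch and persist it before "sending" the task
    def wrapper(self):
        self._async_status = 'CALLED-%s' % func.__name__
        self.save()
        proc_task(self, func.__name__)  # proc_task.s(...).delay() originally
    wrapper.unwrapped = func
    return wrapper

class Upload(Proc):
    @process
    def process_upload(self):
        pass

upload = Upload()
upload.process_upload()
assert upload._async_status == 'RECEIVED-process_upload'
```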
@@ -40,7 +40,7 @@ import logging
from nomad import config, files, utils
from nomad.repo import RepoCalc
from nomad.user import User, me
from nomad.processing.base import Proc, process, task, PENDING
from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE
from nomad.parsing import LocalBackend, parsers, parser_dict
from nomad.normalizing import normalizers
from nomad.utils import get_logger, lnr
@@ -74,7 +74,7 @@ class Calc(Proc):
meta: Any = {
'indices': [
'upload_id', 'mainfile', 'code', 'parser'
'upload_id', 'mainfile', 'code', 'parser', 'status'
]
}
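
The `status` entry added to `Calc`'s (single-field) indexes is what keeps the count queries introduced further down cheap; they all filter on `upload_id` plus `status`, for example (a sketch, `FAILURE` as imported above):

```python
# an indexed count instead of a maintained counter field
Calc.objects(upload_id=upload_id, status=FAILURE).count()
```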
@@ -211,11 +211,10 @@ class Upload(Proc):
upload_time = DateTimeField()
upload_hash = StringField(default=None)
processed_calcs = IntField(default=0)
total_calcs = IntField(default=-1)
user_id = StringField(required=True)
_initiated_parsers = IntField(default=-1)
meta: Any = {
'indexes': [
'upload_hash',
@@ -305,6 +304,7 @@ class Upload(Proc):
return False
def unstage(self):
self.get_logger().info('unstage')
self.in_staging = False
RepoCalc.update_upload(upload_id=self.upload_id, staging=False)
self.save()
@@ -357,7 +357,7 @@ class Upload(Proc):
@task
def parse_all(self):
# TODO: deal with multiple possible parser specs
self.total_calcs = 0
total_calcs = 0
for filename in self._upload.filelist:
for parser in parsers:
try:
@@ -370,16 +370,17 @@
upload_id=self.upload_id)
calc.process()
self.total_calcs += 1
total_calcs += 1
except Exception as e:
self.warning(
'exception while matching pot. mainfile',
mainfile=filename, exc_info=e)
if self.total_calcs == 0:
if total_calcs == 0:
self.cleanup()
# have to save the total_calcs information
self._initiated_parsers = total_calcs
self.save()
@task
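
This is the core of the synchronization fix: while calcs are being spawned, the running total lives in a local variable, and only the final count is persisted as `_initiated_parsers` (default `-1`). Until that save happens, the completion check in the next hunk cannot consider the upload complete. A condensed reconstruction of the new `parse_all`, with the parser-matching details elided:

```python
@task
def parse_all(self):
    total_calcs = 0  # local; no longer a field that others can observe early
    for filename in self._upload.filelist:
        for parser in parsers:
            try:
                # ... match the parser, create the Calc, then:
                calc.process()
                total_calcs += 1
            except Exception as e:
                self.warning(
                    'exception while matching pot. mainfile',
                    mainfile=filename, exc_info=e)

    if total_calcs == 0:
        self.cleanup()

    # persisting the final count is what arms calc_proc_completed
    self._initiated_parsers = total_calcs
    self.save()
```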
@@ -394,12 +395,20 @@
self.get_logger().debug('closed upload')
def calc_proc_completed(self):
processed_calcs, (total_calcs,) = self.incr_counter(
'processed_calcs', other_fields=['total_calcs'])
if processed_calcs == total_calcs:
if self._initiated_parsers >= 0 and self.processed_calcs >= self.total_calcs:
self.cleanup()
@property
def calcs(self):
return Calc.objects(upload_id=self.upload_hash)
def processed_calcs(self):
return Calc.objects(upload_id=self.upload_id, status__in=[SUCCESS, FAILURE]).count()
@property
def total_calcs(self):
return Calc.objects(upload_id=self.upload_id).count()
@property
def failed_calcs(self):
return Calc.objects(upload_id=self.upload_id, status=FAILURE).count()
def all_calcs(self, start, end, order_by='mainfile'):
return Calc.objects(upload_id=self.upload_id)[start:end].order_by(order_by)
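
`processed_calcs`, `total_calcs`, and `failed_calcs` are now computed from the `Calc` collection on every access, so there is no counter field that could drift out of sync with reality (the removed `calcs` property had also filtered on `upload_id=self.upload_hash`, which the new queries correct to `upload_id=self.upload_id`). A self-contained demonstration of the derivation against a local mongod; the `SUCCESS`/`FAILURE` strings stand in for the imported constants:

```python
from mongoengine import Document, StringField, connect

SUCCESS, FAILURE = 'SUCCESS', 'FAILURE'

class Calc(Document):
    upload_id = StringField()
    status = StringField(default='PENDING')
    meta = {'indexes': ['upload_id', 'status']}

connect('counter_sketch')  # assumes a mongod on localhost
Calc.drop_collection()
for status in [SUCCESS, SUCCESS, FAILURE, 'PENDING']:
    Calc(upload_id='u1', status=status).save()

assert Calc.objects(upload_id='u1').count() == 4                                 # total_calcs
assert Calc.objects(upload_id='u1', status__in=[SUCCESS, FAILURE]).count() == 3  # processed_calcs
assert Calc.objects(upload_id='u1', status=FAILURE).count() == 1                 # failed_calcs
```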
@@ -94,8 +94,6 @@ class RepoCalc(ElasticDocument):
Create a new calculation instance in elastic search. The data from the given backend
will be used. Additional meta-data can be given as *kwargs*. ``upload_id``,
``upload_hash``, and ``calc_hash`` are mandatory.
This will create an elastic search entry and store the backend data to the
archive.
Arguments:
backend: The parsing/normalizing backend that contains the calculation data.
@@ -124,9 +122,13 @@ class RepoCalc(ElasticDocument):
try:
value = backend.get_value(property, 0)
except KeyError:
try:
program_name = backend.get_value('program_name', 0)
except KeyError:
program_name = 'unknown'
logger.warning(
'Missing property value', property=property, upload_id=upload_id,
upload_hash=upload_hash, calc_hash=calc_hash)
upload_hash=upload_hash, calc_hash=calc_hash, code=program_name)
continue
setattr(calc, property, value)
@@ -135,14 +137,23 @@
try:
# In practice es operations might fail due to timeouts under heavy loads /
# bad configuration. Retries with a small delay are a pragmatic solution.
e_after_retries = None
for _ in range(0, 2):
try:
calc.save(op_type='create')
e_after_retries = None
break
except ConnectionTimeout as e:
e_after_retries = e
time.sleep(1)
except ConflictError as e: # this should never happen, but happens
e_after_retries = e
time.sleep(1)
else:
raise e
if e_after_retries is not None:
# if we had an exception and could not fix it with retries, raise it
raise e_after_retries # pylint: disable=E0702
except ConflictError:
raise AlreadyExists('Calculation %s does already exist.' % (calc.archive_id))
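
Reconstructed from the interleaved lines, the retry logic now treats the "should never happen, but happens" `ConflictError` the same way as `ConnectionTimeout`, and only re-raises the last exception if no retry succeeded. The outer `try`/`except` that maps a final `ConflictError` to `AlreadyExists` is unchanged; only the loop is shown:

```python
# both exception types come from elasticsearch.exceptions; time is stdlib
e_after_retries = None
for _ in range(0, 2):
    try:
        calc.save(op_type='create')
        e_after_retries = None
        break
    except ConnectionTimeout as e:
        e_after_retries = e
        time.sleep(1)
    except ConflictError as e:  # this should never happen, but happens
        e_after_retries = e
        time.sleep(1)

if e_after_retries is not None:
    # we had an exception and could not fix it with retries, raise it
    raise e_after_retries  # pylint: disable=E0702
```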
@@ -75,7 +75,7 @@ if not _logging_is_configured:
def logger_factory(*args):
logger = default_factory(*args)
if 'pytest' not in sys.modules:
if 'pytest' in sys.modules:
logger.setLevel(logging.WARNING)
else:
logger.setLevel(logging.DEBUG)
@@ -27,7 +27,7 @@ def worker(celery_session_worker):
processes are finished. Therefore open task requests might bleed into the next test.
"""
yield
time.sleep(0.2)
time.sleep(0.5)
@pytest.fixture(scope='function', autouse=True)
@@ -138,11 +138,13 @@ def test_task_failure(monkeypatch, uploaded_id, worker, task, caplog):
assert upload.status == 'FAILURE'
assert upload.current_task == task
assert len(upload.errors) > 0
elif len(upload.calcs) > 0: # pylint: disable=E1101
assert upload.status == 'SUCCESS'
assert upload.current_task == 'cleanup'
assert len(upload.errors) > 0
for calc in upload.calcs: # pylint: disable=E1101
assert calc.status == 'FAILURE'
assert calc.current_task == 'parsing'
assert len(calc.errors) > 0
else:
# there is an empty example with no calcs, even if past parsing_all task
if upload.total_calcs > 0: # pylint: disable=E1101
assert upload.status == 'SUCCESS'
assert upload.current_task == 'cleanup'
assert len(upload.errors) == 0
for calc in upload.all_calcs(0, 100): # pylint: disable=E1101
assert calc.status == 'FAILURE'
assert calc.current_task == 'parsing'
assert len(calc.errors) > 0