diff --git a/nomad/api.py b/nomad/api.py index 88e969f12f56537af7960749aeae737d38061d55..7c14b98cc0417c06299c3aa88777f0122d70a0bf 100644 --- a/nomad/api.py +++ b/nomad/api.py @@ -286,16 +286,13 @@ class UploadRes(Resource): order_by = ('-%s' if order == -1 else '+%s') % order_by + calcs = upload.all_calcs((page - 1) * per_page, page * per_page, order_by) + failed_calcs = upload.failed_calcs result = upload.json_dict - all_calcs = Calc.objects(upload_id=upload_id) - total = all_calcs.count() - successes = Calc.objects(upload_id=upload_id, status=SUCCESS).count() - failures = Calc.objects(upload_id=upload_id, status=FAILURE).count() - calcs = all_calcs[(page - 1) * per_page:page * per_page].order_by(order_by) result['calcs'] = { 'pagination': dict( - total=total, page=page, per_page=per_page, - successes=successes, failures=failures), + total=upload.total_calcs, page=page, per_page=per_page, + successes=upload.processed_calcs - failed_calcs, failures=failed_calcs), 'results': [calc.json_dict for calc in calcs] } diff --git a/nomad/processing/base.py b/nomad/processing/base.py index bfa2eeb7ec2144fca66430ae9233d49ad6f8c77f..fb63977586893a2dc0ca85cae27541a017a5d30d 100644 --- a/nomad/processing/base.py +++ b/nomad/processing/base.py @@ -125,6 +125,8 @@ class Proc(Document, metaclass=ProcMetaclass): create_time = DateTimeField(required=True) complete_time = DateTimeField() + _async_status = StringField(default='UNCALLED') + @property def completed(self) -> bool: """ Returns True of the process has failed or succeeded. """ @@ -300,6 +302,7 @@ class Proc(Document, metaclass=ProcMetaclass): 'warnings': self.warnings, 'create_time': self.create_time.isoformat() if self.create_time is not None else None, 'complete_time': self.complete_time.isoformat() if self.complete_time is not None else None, + '_async_status': self._async_status } return {key: value for key, value in data.items() if value is not None} @@ -368,6 +371,7 @@ def proc_task(task, cls_name, self_id, func_attr): return try: + self._async_status = 'RECEIVED-%s' % func.__name__ func(self) except Exception as e: self.fail(e) @@ -382,6 +386,7 @@ def process(func): """ def wrapper(self, *args, **kwargs): assert len(args) == 0 and len(kwargs) == 0, 'process functions must not have arguments' + self._async_status = 'CALLED-%s' % func.__name__ self.save() self_id = self.id.__str__() diff --git a/nomad/processing/data.py b/nomad/processing/data.py index ae921ffbaac40ee1005532f8ed969702e0f2ad02..1cc9efc34eee807a04885d38054e0bc382e1ed6d 100644 --- a/nomad/processing/data.py +++ b/nomad/processing/data.py @@ -40,7 +40,7 @@ import logging from nomad import config, files, utils from nomad.repo import RepoCalc from nomad.user import User, me -from nomad.processing.base import Proc, process, task, PENDING +from nomad.processing.base import Proc, process, task, PENDING, SUCCESS, FAILURE from nomad.parsing import LocalBackend, parsers, parser_dict from nomad.normalizing import normalizers from nomad.utils import get_logger, lnr @@ -74,7 +74,7 @@ class Calc(Proc): meta: Any = { 'indices': [ - 'upload_id', 'mainfile', 'code', 'parser' + 'upload_id', 'mainfile', 'code', 'parser', 'status' ] } @@ -211,11 +211,10 @@ class Upload(Proc): upload_time = DateTimeField() upload_hash = StringField(default=None) - processed_calcs = IntField(default=0) - total_calcs = IntField(default=-1) - user_id = StringField(required=True) + _initiated_parsers = IntField(default=-1) + meta: Any = { 'indexes': [ 'upload_hash', @@ -305,6 +304,7 @@ class Upload(Proc): return False def unstage(self): + self.get_logger().info('unstage') self.in_staging = False RepoCalc.update_upload(upload_id=self.upload_id, staging=False) self.save() @@ -357,7 +357,7 @@ class Upload(Proc): @task def parse_all(self): # TODO: deal with multiple possible parser specs - self.total_calcs = 0 + total_calcs = 0 for filename in self._upload.filelist: for parser in parsers: try: @@ -370,16 +370,17 @@ class Upload(Proc): upload_id=self.upload_id) calc.process() - self.total_calcs += 1 + total_calcs += 1 except Exception as e: self.warning( 'exception while matching pot. mainfile', mainfile=filename, exc_info=e) - if self.total_calcs == 0: + if total_calcs == 0: self.cleanup() # have to save the total_calcs information + self._initiated_parsers = total_calcs self.save() @task @@ -394,12 +395,20 @@ class Upload(Proc): self.get_logger().debug('closed upload') def calc_proc_completed(self): - processed_calcs, (total_calcs,) = self.incr_counter( - 'processed_calcs', other_fields=['total_calcs']) - - if processed_calcs == total_calcs: + if self._initiated_parsers >= 0 and self.processed_calcs >= self.total_calcs: self.cleanup() @property - def calcs(self): - return Calc.objects(upload_id=self.upload_hash) + def processed_calcs(self): + return Calc.objects(upload_id=self.upload_id, status__in=[SUCCESS, FAILURE]).count() + + @property + def total_calcs(self): + return Calc.objects(upload_id=self.upload_id).count() + + @property + def failed_calcs(self): + return Calc.objects(upload_id=self.upload_id, status=FAILURE).count() + + def all_calcs(self, start, end, order_by='mainfile'): + return Calc.objects(upload_id=self.upload_id)[start:end].order_by(order_by) diff --git a/nomad/repo.py b/nomad/repo.py index 526255caabf149fd18cb892c960144e8f80da848..655c2ad5090c8f123d91def3a42ef9ae7c88c0de 100644 --- a/nomad/repo.py +++ b/nomad/repo.py @@ -94,8 +94,6 @@ class RepoCalc(ElasticDocument): Create a new calculation instance in elastic search. The data from the given backend will be used. Additional meta-data can be given as *kwargs*. ``upload_id``, ``upload_hash``, and ``calc_hash`` are mandatory. - This will create a elastic search entry and store the backend data to the - archive. Arguments: backend: The parsing/normalizing backend that contains the calculation data. @@ -124,9 +122,13 @@ class RepoCalc(ElasticDocument): try: value = backend.get_value(property, 0) except KeyError: + try: + program_name = backend.get_value('program_name', 0) + except KeyError: + program_name = 'unknown' logger.warning( 'Missing property value', property=property, upload_id=upload_id, - upload_hash=upload_hash, calc_hash=calc_hash) + upload_hash=upload_hash, calc_hash=calc_hash, code=program_name) continue setattr(calc, property, value) @@ -135,14 +137,23 @@ class RepoCalc(ElasticDocument): try: # In practive es operation might fail due to timeout under heavy loads/ # bad configuration. Retries with a small delay is a pragmatic solution. + e_after_retries = None for _ in range(0, 2): try: calc.save(op_type='create') + e_after_retries = None break except ConnectionTimeout as e: + e_after_retries = e + time.sleep(1) + except ConflictError as e: # this should never happen, but happens + e_after_retries = e time.sleep(1) else: raise e + if e_after_retries is not None: + # if we had and exception and could not fix with retries, throw it + raise e_after_retries # pylint: disable=E0702 except ConflictError: raise AlreadyExists('Calculation %s does already exist.' % (calc.archive_id)) diff --git a/nomad/utils.py b/nomad/utils.py index 620ff0ae5a556f920bb19cc51924db10550ec2a3..e4645243e88a39d58a465b50c168e4dca54ce75d 100644 --- a/nomad/utils.py +++ b/nomad/utils.py @@ -75,7 +75,7 @@ if not _logging_is_configured: def logger_factory(*args): logger = default_factory(*args) - if 'pytest' not in sys.modules: + if 'pytest' in sys.modules: logger.setLevel(logging.WARNING) else: logger.setLevel(logging.DEBUG) diff --git a/tests/conftest.py b/tests/conftest.py index 7105b99a36c3cefc33fb6ff4ec41886316341dd8..04fefe18574e7c091fcaa313442b00a4adbe3f91 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -27,7 +27,7 @@ def worker(celery_session_worker): processes are finished. Therefore open task request might bleed into the next test. """ yield - time.sleep(0.2) + time.sleep(0.5) @pytest.fixture(scope='function', autouse=True) diff --git a/tests/processing/test_data.py b/tests/processing/test_data.py index b52e71adc58f127813a6e3b5b438d924cf62ae61..20a34fdd377927eae13c93786fc2da746004efb7 100644 --- a/tests/processing/test_data.py +++ b/tests/processing/test_data.py @@ -138,11 +138,13 @@ def test_task_failure(monkeypatch, uploaded_id, worker, task, caplog): assert upload.status == 'FAILURE' assert upload.current_task == task assert len(upload.errors) > 0 - elif len(upload.calcs) > 0: # pylint: disable=E1101 - assert upload.status == 'SUCCESS' - assert upload.current_task == 'cleanup' - assert len(upload.errors) > 0 - for calc in upload.calcs: # pylint: disable=E1101 - assert calc.status == 'FAILURE' - assert calc.current_task == 'parsing' - assert len(calc.errors) > 0 + else: + # there is an empty example with no calcs, even if past parsing_all task + if upload.total_calcs > 0: # pylint: disable=E1101 + assert upload.status == 'SUCCESS' + assert upload.current_task == 'cleanup' + assert len(upload.errors) == 0 + for calc in upload.all_calcs(0, 100): # pylint: disable=E1101 + assert calc.status == 'FAILURE' + assert calc.current_task == 'parsing' + assert len(calc.errors) > 0