diff --git a/nomad/cli/admin/uploads.py b/nomad/cli/admin/uploads.py index 1d4fda87d2fb44115acc0764442cfea73965e9ab..0eb5186f8c38cfcf551f323711fbb050fcfa0ece 100644 --- a/nomad/cli/admin/uploads.py +++ b/nomad/cli/admin/uploads.py @@ -162,10 +162,9 @@ def reset(ctx, uploads, with_calcs): i = 0 for upload in uploads: - if with_calcs: - for calc in proc.Calc.objects(upload_id=upload.upload_id): - calc.reset() - calc.save() + proc.Calc._get_collection().update_many( + dict(upload_id=upload.upload_id), + {'$set': proc.Calc.reset_pymongo_update()}) upload.reset() upload.save() diff --git a/nomad/processing/base.py b/nomad/processing/base.py index aa73bc67d6426ae85a4b4ac2c4db21c602c5ee4b..35aecadbb9ac21fcfb11c4fbedb7243f2c3ce3a8 100644 --- a/nomad/processing/base.py +++ b/nomad/processing/base.py @@ -166,8 +166,8 @@ class Proc(Document, metaclass=ProcMetaclass): """ Returns True of an asynchrounous process is currently running. """ return self.process_status is not None and self.process_status != PROCESS_COMPLETED - @staticmethod - def process_running_mongoengine_query(): + @classmethod + def process_running_mongoengine_query(cls): """ Returns a mongoengine query dict (to be used in objects) to find running processes. """ return dict(process_status__in=[PROCESS_CALLED, PROCESS_RUNNING]) @@ -203,8 +203,8 @@ class Proc(Document, metaclass=ProcMetaclass): self.warnings = [] self.worker_hostname = worker_hostname - @staticmethod - def reset_pymongo_update(worker_hostname: str = None): + @classmethod + def reset_pymongo_update(cls, worker_hostname: str = None): """ Returns a pymongo update dict part to reset calculations. """ return dict( current_task=None, tasks_status=PENDING, errors=[], warnings=[], diff --git a/nomad/processing/data.py b/nomad/processing/data.py index e3cc9d7fcfb497ffa7b3b04d0e6d468ce0724670..495a9ea57ae47f1cbbc349b0f26aac2090393f77 100644 --- a/nomad/processing/data.py +++ b/nomad/processing/data.py @@ -515,9 +515,10 @@ class Upload(Proc): logger = super().get_logger() user = self.uploader user_name = '%s %s' % (user.first_name, user.last_name) + # We are not using 'user_id' because logstash (?) will filter these entries ?! logger = logger.bind( upload_id=self.upload_id, upload_name=self.name, user_name=user_name, - user_id=user.user_id, **kwargs) + user=user.user_id, **kwargs) return logger @classmethod @@ -536,8 +537,7 @@ class Upload(Proc): if 'upload_id' not in kwargs: kwargs.update(upload_id=utils.create_uuid()) - # We are not using 'user_id' because logstash (?) will filter these entries ?! - kwargs.update(user=user.user_id) + kwargs.update(user_id=user.user_id) self = super().create(**kwargs) self._continue_with('uploading') @@ -648,8 +648,6 @@ class Upload(Proc): logger = self.get_logger() logger.info('started to re-process') - logger.error('WHAAATTT') - raise Exception('break') # mock the steps of actual processing self._continue_with('uploading') @@ -668,7 +666,7 @@ class Upload(Proc): logger.warn('a process is already running on calc', calc_id=calc.calc_id) # reset all calcs - Calc._get_collection().update( + Calc._get_collection().update_many( dict(upload_id=self.upload_id), {'$set': Calc.reset_pymongo_update(worker_hostname=self.worker_hostname)}) @@ -886,6 +884,12 @@ class Upload(Proc): self.joined = False super().reset() + @classmethod + def reset_pymongo_update(cls, worker_hostname: str = None): + update = super().reset_pymongo_update() + update.update(joined=False) + return update + def _cleanup_after_processing(self): # send email about process finish user = self.uploader