diff --git a/nomad/processing/data.py b/nomad/processing/data.py
index 78762d940d774bd837f1da98006c0a3c809c8c5a..331b4c32b63cea65952070413e6f79a39637054c 100644
--- a/nomad/processing/data.py
+++ b/nomad/processing/data.py
@@ -1902,6 +1902,7 @@ class Upload(Proc):
                     old_entries.add(entry.entry_id)
 
         with utils.timer(logger, 'matching completed'):
+            entries = []
             for mainfile, mainfile_key, parser in self.match_mainfiles(path_filter, updated_files):
                 entry, was_created, metadata_handler = self._get_or_create_entry(
                     mainfile, mainfile_key, parser,
@@ -1910,10 +1911,20 @@ class Upload(Proc):
                     metadata_handler=metadata_handler,
                     logger=logger)
 
-                if not was_created and entry is not None:
+                if was_created:
+                    entries.append(entry)
+                    # batched write if memory is an issue
+                    # if len(entries) > 1000:
+                    #     Entry.objects.insert(entries)
+                    #     entries = []
+                elif entry is not None:
                     old_entries.remove(entry.entry_id)
 
-        # Delete old entries
+        with utils.timer(logger, 'storing entry metadata to mongo'):
+            if entries:
+                Entry.objects.insert(entries)
+
+        # Delete old entries
         if len(old_entries) > 0:
             logger.warn('Some entries did not match', count=len(old_entries))
             if not self.published or reprocess_settings.delete_unmatched_published_entries:
@@ -1928,7 +1939,7 @@ class Upload(Proc):
                 # to minimize problems (should be safe to do so).
                 if processing_entries:
                     logger.warn('Some entries are processing', count=len(processing_entries))
-                    with utils.timer(logger, 'processing entries resetted'):
+                    with utils.timer(logger, 'processing entries reset'):
                         Entry._get_collection().update_many(
                             {'_id': {'$in': processing_entries}},
                             {'$set': Entry.reset_pymongo_update(
@@ -1961,7 +1972,7 @@ class Upload(Proc):
         # No existing entry found
         if can_create:
             # Create new entry
-            entry = Entry.create(
+            entry = Entry(
                 entry_id=entry_id,
                 mainfile=mainfile,
                 mainfile_key=mainfile_key,
@@ -1975,7 +1986,6 @@ class Upload(Proc):
             entry_metadata = metadata_handler.get_entry_mongo_metadata(self, entry)
             for quantity_name, mongo_value in entry_metadata.items():
                 setattr(entry, quantity_name, mongo_value)
-            entry.save()
             was_created = True
         return entry, was_created, metadata_handler
 
@@ -2030,20 +2040,28 @@ class Upload(Proc):
 
         for mainfile, mainfile_key, parser in self.match_mainfiles(path, None):
             # File matched!
-            entry, _was_created, metadata_handler = self._get_or_create_entry(
+            entry, was_created, metadata_handler = self._get_or_create_entry(
                 mainfile, mainfile_key, parser,
                 raise_if_exists=not allow_modify or self.published,
                 can_create=not self.published,
                 metadata_handler=metadata_handler,
                 logger=logger)
+
+            if was_created:
+                entry.save()
+
             if entry:
                 if self.current_process_flags.is_local:
                     # Running locally
                     if entry.process_running:
                         # Should not happen, but if it does happen (which suggests that some jobs
                         # have been interrupted abnormally or the like) we reset it, to avoid problems.
-                        logger.warn('Running locally and entry is already processing, will reset it.', entry_id=entry.entry_id)
-                        entry.reset(force=True, worker_hostname=self.worker_hostname, process_status=ProcessStatus.FAILURE)
+                        logger.warn(
+                            'Running locally and entry is already processing, will reset it.',
+                            entry_id=entry.entry_id)
+                        entry.reset(
+                            force=True, worker_hostname=self.worker_hostname,
+                            process_status=ProcessStatus.FAILURE)
                         entry.save()
                     # Run also this entry processing locally. If it fails, an exception will be raised.
                     entry.process_entry_local()
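
For context, the commented-out hint in the first hunk sketches a batched flush for uploads with very many mainfiles. Below is a minimal, self-contained illustration of that pattern, assuming mongoengine (whose bulk `QuerySet.insert` the patch already uses for the final write); the `Entry` model, database name, and `BATCH_SIZE` are illustrative stand-ins, not NOMAD's actual definitions.

```python
# A sketch of the batched bulk-insert pattern from the commented-out hint.
# Assumes mongoengine; Entry, the database name, and BATCH_SIZE are
# hypothetical stand-ins for illustration only.
from mongoengine import Document, StringField, connect

connect('bulk_insert_sketch')  # hypothetical database

class Entry(Document):
    entry_id = StringField(primary_key=True)
    mainfile = StringField()

BATCH_SIZE = 1000  # threshold taken from the commented-out hint

buffer = []
for i in range(2500):
    buffer.append(Entry(entry_id=f'id-{i}', mainfile=f'calc-{i}/output.xml'))
    if len(buffer) >= BATCH_SIZE:
        # One bulk round trip to mongo instead of BATCH_SIZE individual saves.
        Entry.objects.insert(buffer)
        buffer = []

if buffer:
    Entry.objects.insert(buffer)  # flush the remainder
```

Bulk inserting amortizes the per-document round trip that `entry.save()` pays, which is what motivates dropping the save from `_get_or_create_entry` and collecting newly created entries for a single `Entry.objects.insert` instead.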