diff --git a/nomad/app/v1/routers/uploads.py b/nomad/app/v1/routers/uploads.py
index 2d1220f5258e06e1576ece25a58bc8055d1075ae..c2da674ea82e3e036552587d716720454fae8abe 100644
--- a/nomad/app/v1/routers/uploads.py
+++ b/nomad/app/v1/routers/uploads.py
@@ -1065,7 +1065,7 @@ async def post_upload_action_publish(
 async def post_upload_action_process(
         upload_id: str = Path(
             ...,
-            description='The unique id of the upload to re-process.'),
+            description='The unique id of the upload to process.'),
         user: User = Depends(create_user_dependency(required=True))):
     '''
     Processes an upload, i.e. parses the files and updates the NOMAD archive. Only admins
diff --git a/nomad/cli/admin/uploads.py b/nomad/cli/admin/uploads.py
index 2bb2812f823701b75fca2966a8788c55c2664ab5..efa1c163f5f8e59ee252d33194a067e5763fa745 100644
--- a/nomad/cli/admin/uploads.py
+++ b/nomad/cli/admin/uploads.py
@@ -83,7 +83,7 @@ def _run_parallel(uploads, parallel: int, callable, label: str):
 
 
 def _run_processing(
-        uploads, parallel: int, process, label: str, reprocess_running: bool = False,
+        uploads, parallel: int, process, label: str, process_running: bool = False,
         wait_until_complete: bool = True, reset_first: bool = False):
 
     from nomad import processing as proc
@@ -93,7 +93,7 @@ def _run_processing(
             'cli calls %s processing' % label,
             current_process=upload.current_process,
             last_status_message=upload.last_status_message, upload_id=upload.upload_id)
-        if upload.process_running and not reprocess_running:
+        if upload.process_running and not process_running:
             logger.warn(
                 'cannot trigger %s, since the upload is already/still processing' % label,
                 current_process=upload.current_process,
@@ -431,13 +431,18 @@ def rm(ctx, uploads, skip_es, skip_mongo, skip_files):
 @uploads.command(help='Reprocess selected uploads.')
 @click.argument('UPLOADS', nargs=-1)
 @click.option('--parallel', default=1, type=int, help='Use the given amount of parallel processes. Default is 1.')
-@click.option('--reprocess-running', is_flag=True, help='Also reprocess already running processes.')
+@click.option('--process-running', is_flag=True, help='Also reprocess already running processes.')
+@click.option('--setting', type=str, multiple=True, help='key=value to overwrite a default reprocess config setting.')
 @click.pass_context
-def re_process(ctx, uploads, parallel: int, reprocess_running: bool):
+def process(ctx, uploads, parallel: int, process_running: bool, setting: typing.List[str] = None):
     _, uploads = _query_uploads(uploads, **ctx.obj.uploads_kwargs)
+    settings: typing.Dict[str, bool] = {}
+    for settings_str in setting:
+        key, value = settings_str.split('=')
+        settings[key] = bool(value)
     _run_processing(
-        uploads, parallel, lambda upload: upload.process_upload(), 'processing',
-        reprocess_running=reprocess_running, reset_first=True)
+        uploads, parallel, lambda upload: upload.process_upload(reprocess_settings=settings),
+        'processing', process_running=process_running, reset_first=True)
 
 
 @uploads.command(help='Repack selected uploads.')
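For reference, the renamed CLI command can then be invoked roughly like this (illustrative upload id; note that each --setting value is passed through Python's bool(), so any non-empty string, including 'False', currently evaluates to True):

    nomad admin uploads process --parallel 2 --setting rematch_published=True <upload_id>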
diff --git a/nomad/config.py b/nomad/config.py
index d8ed6553d3b0d46f3fa26f1cdd4c4f7fc94c65da..bee9c7af056beca185359a265b5b0b8d502b484f 100644
--- a/nomad/config.py
+++ b/nomad/config.py
@@ -326,10 +326,10 @@ reprocess = NomadConfig(
     # Configures standard behaviour when reprocessing.
     # Note, the settings only matter for published uploads and entries. For uploads in
     # staging, we always reparse, add newfound entries, and delete unmatched entries.
-    reparse_published_if_parser_unchanged=True,
-    reparse_published_if_parser_changed=True,
-    reparse_with_changed_parser=True,
-    add_newfound_entries_to_published=True,
+    rematch_published=True,
+    reprocess_existing_entries=True,
+    use_original_parser=False,
+    add_matched_entries_to_published=True,
     delete_unmatched_published_entries=False
 )

@@ -357,11 +357,11 @@ bundle_import = NomadConfig(

         # When importing with trigger_processing=True, the settings below control the
         # initial processing behaviour (see the config for `reprocess` for more info).
-        reparse_published_if_parser_unchanged=True,
-        reparse_published_if_parser_changed=True,
-        reparse_with_changed_parser=True,
-        add_newfound_entries_to_published=True,
-        delete_unmatched_published_entries=True
+        rematch_published=True,
+        reprocess_existing_entries=True,
+        use_original_parser=False,
+        add_matched_entries_to_published=True,
+        delete_unmatched_published_entries=False
     )
 )

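These defaults are combined with per-call overrides via config.reprocess.customize(reprocess_settings), as used in nomad/processing/data.py below and fed by the CLI's --setting flag. A minimal sketch of the expected merge semantics, assuming NomadConfig behaves like a dict with attribute access (the class below is an illustrative stand-in, not the actual NomadConfig implementation):

# Illustrative stand-in for nomad.config.NomadConfig (a dict with attribute access).
# It only demonstrates how per-call overrides fall back to the configured defaults.
from typing import Dict, Optional


class ReprocessDefaults(dict):
    def __getattr__(self, name):
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name)

    def customize(self, custom_settings: Optional[Dict[str, bool]]) -> 'ReprocessDefaults':
        # Copy the defaults and overwrite only the keys that were explicitly given.
        result = ReprocessDefaults(self)
        if custom_settings:
            result.update(custom_settings)
        return result


reprocess = ReprocessDefaults(
    rematch_published=True,
    reprocess_existing_entries=True,
    use_original_parser=False,
    add_matched_entries_to_published=True,
    delete_unmatched_published_entries=False)

# A per-call override, e.g. coming from the CLI's --setting flag:
settings = reprocess.customize({'use_original_parser': True})
assert settings.use_original_parser and settings.rematch_published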
diff --git a/nomad/processing/data.py b/nomad/processing/data.py
index dc25671b27b4dbbbd509161ce8e77c76bd84dcf3..dc283fe8c1201282d7c3c275d53c7c926556efd1 100644
--- a/nomad/processing/data.py
+++ b/nomad/processing/data.py
@@ -449,6 +449,8 @@ class Calc(Proc):
         if self.upload is None:
             logger.error('calculation upload does not exist')

+        settings = config.reprocess.customize(reprocess_settings)  # Add default settings
+
         # 1. Determine if we should parse or not
         self.set_last_status_message('Determining action')
         # If this entry has been processed before, or imported from a bundle, nomad_version
@@ -456,41 +458,29 @@ class Calc(Proc):
         self._is_initial_processing = self.nomad_version is None
         if not self.upload.published or self._is_initial_processing:
             should_parse = True
-        else:
-            # This entry has already been published and has metadata.
-            # Determine if we should reparse or keep it.
+        elif not settings.reprocess_existing_entries:
             should_parse = False
-            settings = config.reprocess.customize(reprocess_settings)  # Add default settings
-            reparse_if_parser_unchanged = settings.reparse_published_if_parser_unchanged
-            reparse_if_parser_changed = settings.reparse_published_if_parser_changed
-            reparse_with_changed_parser = settings.reparse_with_changed_parser
-            if reparse_if_parser_unchanged or reparse_if_parser_changed:
+        else:
+            if settings.rematch_published and not settings.use_original_parser:
                 with utils.timer(logger, 'parser matching executed'):
                     parser = match_parser(
                         self.upload_files.raw_file_object(self.mainfile).os_path, strict=False)
-                if parser is None:
-                    # Should only be possible if the upload is published and we have
-                    # settings.delete_unmatched_published_entries == False
-                    logger.warn('no parser matches during re-process, not updating the entry')
-                    self.warnings = ['no matching parser found during processing']
-                else:
-                    parser_changed = self.parser_name != parser.name and parser_dict[self.parser_name].name != parser.name
-                    if reparse_if_parser_unchanged and not parser_changed:
-                        should_parse = True
-                    elif reparse_if_parser_changed and parser_changed:
-                        should_parse = True
-                    if should_parse and self.parser_name != parser.name:
-                        if parser_dict[self.parser_name].name == parser.name:
-                            logger.info(
-                                'parser renamed, using new parser name',
-                                parser=parser.name)
-                            self.parser_name = parser.name  # Parser renamed
-                        else:
-                            if not reparse_with_changed_parser:
-                                self.parser_name = parser.name  # Parser changed
-                                logger.info(
-                                    'different parser matches during re-process, use new parser',
-                                    parser=parser.name)
+            else:
+                parser = parser_dict[self.parser_name]
+
+            if parser is None:
+                # Should only be possible if the upload is published and delete_unmatched_published_entries is False
+                logger.warn('no parser matches during process, not updating the entry')
+                self.warnings = ['no matching parser found during processing']
+            else:
+                should_parse = True
+                parser_changed = self.parser_name != parser.name and parser_dict[self.parser_name].name != parser.name
+                if parser_changed:
+                    if not settings.use_original_parser:
+                        logger.info(
+                            'different parser matches during process, use new parser',
+                            parser=parser.name)
+                        self.parser_name = parser.name  # Parser changed

         # 2. Either parse the entry, or preserve it as it is.
         if should_parse:
@@ -515,7 +505,7 @@ class Calc(Proc):
                     self._parser_results.m_resource.unload()
             except Exception as e:
                 logger.error('could not unload processing results', exc_info=e)
-        else:
+        elif self.upload.published:
             # 2b. Keep published entry as it is
             self.set_last_status_message('Preserving entry data')
             try:
@@ -526,7 +516,9 @@ class Calc(Proc):
             except Exception as e:
                 logger.error('could not copy archive for non-reprocessed entry', exc_info=e)
                 raise
-            return
+        else:
+            # 2b. Keep staging entry as it is
+            pass

     def on_fail(self):
         # in case of failure, index a minimum set of metadata and mark
@@ -1208,84 +1200,64 @@ class Upload(Proc):
         self.set_last_status_message('Parsing all files')
         logger = self.get_logger()

-        with utils.timer(logger, 'calcs processing called'):
-            try:
-                settings = config.reprocess.customize(reprocess_settings)  # Add default settings
-
-                old_entries = Calc.objects(upload_id=self.upload_id)
-                has_old_entries = old_entries.count() > 0
-                matched_entries: Set[str] = set()
-                entries_to_delete: List[str] = []
-                count_already_processing = 0
-                for filename, parser in self.match_mainfiles():
-                    calc_id = generate_entry_id(self.upload_id, filename)
+        try:
+            settings = config.reprocess.customize(reprocess_settings)  # Add default settings

-                    try:
-                        entry = Calc.get(calc_id)
-                        # Matching entry already exists.
-                        if entry.process_running:
-                            count_already_processing += 1
-                        # Ensure that we update the parser if in staging
-                        if not self.published and parser.name != entry.parser_name:
-                            entry.parser_name = parser.name
-                            entry.save()
-                        matched_entries.add(calc_id)
-                    except KeyError:
-                        # No existing entry found
-                        if not self.published or settings.add_newfound_entries_to_published:
-                            entry = Calc.create(
-                                calc_id=calc_id,
-                                mainfile=filename,
-                                parser_name=parser.name,
-                                worker_hostname=self.worker_hostname,
-                                upload_id=self.upload_id)
-                            entry.save()
-                            matched_entries.add(calc_id)
-                # Done matching. Examine old unmatched entries.
-                for entry in old_entries:
-                    if entry.calc_id not in matched_entries:
-                        if entry.process_running:
-                            count_already_processing += 1
+            if not self.published or settings.rematch_published:
+                with utils.timer(logger, 'matching completed'):
+                    old_entries = set([entry.calc_id for entry in Calc.objects(upload_id=self.upload_id)])
+                    for filename, parser in self.match_mainfiles():
+                        calc_id = generate_entry_id(self.upload_id, filename)
+
+                        try:
+                            entry = Calc.get(calc_id)
+                            # Matching entry already exists.
+                            # Ensure that we update the parser if in staging
+                            if not self.published and parser.name != entry.parser_name:
+                                entry.parser_name = parser.name
+                                entry.save()
+
+                            old_entries.remove(calc_id)
+                        except KeyError:
+                            # No existing entry found
+                            if not self.published or settings.add_matched_entries_to_published:
+                                entry = Calc.create(
+                                    calc_id=calc_id,
+                                    mainfile=filename,
+                                    parser_name=parser.name,
+                                    worker_hostname=self.worker_hostname,
+                                    upload_id=self.upload_id)
+                                entry.save()
+
+                    # Delete old entries
+                    if len(old_entries) > 0:
+                        entries_to_delete: List[str] = list(old_entries)
+                        logger.warn('Some entries did not match', count=len(entries_to_delete))
                         if not self.published or settings.delete_unmatched_published_entries:
-                            entries_to_delete.append(entry.calc_id)
-
-                # Delete entries
-                if entries_to_delete:
-                    logger.warn(
-                        'Some entries are disappearing',
-                        count=len(entries_to_delete))
-                    delete_partial_archives_from_mongo(entries_to_delete)
-                    for calc_id in entries_to_delete:
-                        search.delete_entry(entry_id=calc_id, refresh=True, update_materials=True)
-                        entry = Calc.get(calc_id)
-                        entry.delete()
-
-                if has_old_entries:
-                    # Reset all entries on upload
-                    with utils.timer(logger, 'calcs resetted'):
-                        if count_already_processing > 0:
-                            logger.warn(
-                                'processes are still/already running some entries, they have been resetted',
-                                count=count_already_processing)
-
-                        # reset all calcs
-                        Calc._get_collection().update_many(
-                            dict(upload_id=self.upload_id),
-                            {'$set': Calc.reset_pymongo_update(worker_hostname=self.worker_hostname)})
-
-                with utils.timer(logger, 'calcs processing called'):
-                    # process call calcs
-                    Calc.process_all(
-                        Calc.process_calc, dict(upload_id=self.upload_id),
-                        process_kwargs=dict(reprocess_settings=settings))
-                    logger.info('completed to trigger process of all calcs')
+                            delete_partial_archives_from_mongo(entries_to_delete)
+                            for calc_id in entries_to_delete:
+                                search.delete_entry(entry_id=calc_id, refresh=True, update_materials=True)
+                                entry = Calc.get(calc_id)
+                                entry.delete()
+
+            # reset all calcs
+            with utils.timer(logger, 'calcs processing resetted'):
+                Calc._get_collection().update_many(
+                    dict(upload_id=self.upload_id),
+                    {'$set': Calc.reset_pymongo_update(worker_hostname=self.worker_hostname)})
+
+            # process all calcs
+            with utils.timer(logger, 'calcs processing called'):
+                Calc.process_all(
+                    Calc.process_calc, dict(upload_id=self.upload_id),
+                    process_kwargs=dict(reprocess_settings=settings))

-            except Exception as e:
-                # try to remove the staging copy in failure case
-                logger.error('failed to trigger processing of all entries', exc_info=e)
-                if self.published:
-                    self._cleanup_staging_files()
-                raise
+        except Exception as e:
+            # try to remove the staging copy in failure case
+            logger.error('failed to trigger processing of all entries', exc_info=e)
+            if self.published:
+                self._cleanup_staging_files()
+            raise

     def check_join(self):
         '''
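To summarize the per-entry logic implemented in Calc.process_calc above, here is a minimal, self-contained sketch of the decision for an already published entry; it deliberately ignores staging uploads, initial processing and parser-rename bookkeeping, and the function and its argument names are illustrative rather than part of the code base:

from typing import Optional, Tuple


def decide_published_entry(
        reprocess_existing_entries: bool,
        rematch_published: bool,
        use_original_parser: bool,
        original_parser: str,
        rematched_parser: Optional[str]) -> Tuple[bool, str]:
    '''
    Returns (should_parse, parser_name) for a published entry.
    `rematched_parser` is the parser that matches the mainfile now, or None if none matches.
    '''
    if not reprocess_existing_entries:
        return False, original_parser

    if not rematch_published or use_original_parser:
        # Keep and re-run the parser that produced the entry originally.
        return True, original_parser

    if rematched_parser is None:
        # No parser matches any more: the entry data is preserved; deleting it is a
        # separate, upload-level decision (delete_unmatched_published_entries).
        return False, original_parser

    # Re-parse with whatever parser matches now.
    return True, rematched_parser


# Examples:
assert decide_published_entry(False, True, False, 'parsers/vasp', 'parsers/vasp') == (False, 'parsers/vasp')
assert decide_published_entry(True, True, False, 'parsers/vasp', 'parsers/abinit') == (True, 'parsers/abinit')
assert decide_published_entry(True, True, True, 'parsers/vasp', None) == (True, 'parsers/vasp')

In short: reprocess_existing_entries gates whether published entries are touched at all, rematch_published and use_original_parser decide which parser runs, and unmatched published entries are only removed at the upload level when delete_unmatched_published_entries is set.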
diff --git a/ops/helm/nomad/templates/nomad-configmap.yml b/ops/helm/nomad/templates/nomad-configmap.yml
index 62cbb02c53fd4e2f8cca33f1ec1a98c212ec26f5..e1fe129d115c9d5a9a4e222023b3696371eb866d 100644
--- a/ops/helm/nomad/templates/nomad-configmap.yml
+++ b/ops/helm/nomad/templates/nomad-configmap.yml
@@ -22,17 +22,18 @@ data:
     encyclopedia_enabled: {{ .Values.encyclopedia.enabled }}
     aitoolkit_enabled: {{ .Values.aitoolkit.enabled }}
     reprocess:
-      reparse_published_if_parser_unchanged: "{{ .Values.reprocess.reparsePublishedIfParserUnchanged }}"
-      reparse_published_if_parser_changed: "{{ .Values.reprocess.reparsePublishedIfParserChanged }}"
-      reparse_with_changed_parser: "{{ .Values.reprocess.reparseWithChangedParser }}"
-      add_newfound_entries_to_published: "{{ .Values.reprocess.addNewfoundEntriesToPublished }}"
-      delete_unmatched_published_entries: "{{ .Values.reprocess.deleteUnmatchedPublishedEntries }}"
+      rematch_published: {{ .Values.reprocess.rematchPublished }}
+      reprocess_existing_entries: {{ .Values.reprocess.reprocessExistingEntries }}
+      use_original_parser: {{ .Values.reprocess.useOriginalParser }}
+      add_matched_entries_to_published: {{ .Values.reprocess.addMatchedEntriesToPublished }}
+      delete_unmatched_published_entries: {{ .Values.reprocess.deleteUnmatchedPublishedEntries }}
     beta:
       label: "{{ .Values.version.label }}"
       isBeta: {{ .Values.version.isBeta }}
       isTest: {{ .Values.version.isTest }}
       usesBetaData: {{ .Values.version.usesBetaData }}
       officialUrl: "{{ .Values.version.officialUrl }}"
+    process_reuse_parser: {{ .Values.processReuseParser }}
     fs:
       tmp: ".volumes/fs/staging/tmp"
       prefix_size: {{ .Values.volumes.prefixSize }}
diff --git a/ops/helm/nomad/values.yaml b/ops/helm/nomad/values.yaml
index 82e7dffb736d01cc7af7bb4402191bac7be9a969..906af73e9a0bb1bf4e0379e3eadf6b788b22acb1 100644
--- a/ops/helm/nomad/values.yaml
+++ b/ops/helm/nomad/values.yaml
@@ -166,11 +166,11 @@ volumes:
   springerDbPath: /nomad/fairdi/db/data/springer.msg

 reprocess:
-  reparsePublishedIfParserUnchanged: true
-  reparsePublishedIfParserChanged: true
-  reparseWithChangedParser: true
-  addNewfoundEntriesToPublished: true
-  deleteUnmatchedPublishedEntries: false
+  rematchPublished: true
+  reprocessExistingEntries: true
+  useOriginalParser: false
+  addMatchedEntriesToPublished: false
+  deleteUnmatchedPublishedEntries: false

 processReuseParser: true

diff --git a/tests/processing/test_data.py b/tests/processing/test_data.py
index 513cc7775dac6af9b3088d33c823b08427078f7c..95e549f27a2688087962d1dc0dd136e2f1fd2189 100644
--- a/tests/processing/test_data.py
+++ b/tests/processing/test_data.py
@@ -323,7 +323,7 @@ def test_process_non_existing(proc_infra, test_user, with_error):
 @pytest.mark.parametrize('with_failure', [None, 'before', 'after', 'not-matched'])
 def test_re_processing(published: Upload, internal_example_user_metadata, monkeypatch, tmp, with_failure):
     if with_failure == 'not-matched':
-        monkeypatch.setattr('nomad.config.reprocess.delete_unmatched_published_entries', False)
+        monkeypatch.setattr('nomad.config.reprocess.use_original_parser', True)

     if with_failure == 'before':
         calc = published.all_calcs(0, 1)[0]
@@ -411,10 +411,8 @@ def test_re_processing(published: Upload, internal_example_user_metadata, monkey
     assert_processing(Upload.get(published.upload_id, include_published=True), published=True)

     # assert changed calc data
-    if with_failure not in ['after', 'not-matched']:
+    if with_failure not in ['after']:
         assert archive.results.material.elements[0] == 'H'
-    elif with_failure == 'not-matched':
-        assert archive.results.material.elements[0] == 'Si'
     else:
         assert archive.results is None

diff --git a/tests/test_cli.py b/tests/test_cli.py
index 942cc18cf3f6a7fcd6c129f0a36b959b192f5255..b4925916a525922039a4c70b533e87a260a867f3 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -217,7 +217,7 @@ class TestAdminUploads:
         assert calc.nomad_version != 'test_version'

         result = invoke_cli(
-            cli, ['admin', 'uploads', 're-process', '--parallel', '2', upload_id], catch_exceptions=False)
+            cli, ['admin', 'uploads', 'process', '--parallel', '2', upload_id], catch_exceptions=False)

         assert result.exit_code == 0
         assert 'processing' in result.stdout