From ff046860eba421f1050adf7236fae292f48f9c47 Mon Sep 17 00:00:00 2001
From: David Sikter <david.sikter@physik.hu-berlin.de>
Date: Thu, 17 Feb 2022 14:32:17 +0100
Subject: [PATCH] Added functionality for local processing to the PUT
 uploads/{upload_id}/raw/{path} endpoint

---
 nomad/app/v1/routers/entries.py |  23 ++--
 nomad/app/v1/routers/uploads.py | 213 +++++++++++++++++++++++++-------
 nomad/processing/base.py        | 119 ++++++++++++++++--
 nomad/processing/data.py        |  75 ++++++++++-
 4 files changed, 362 insertions(+), 68 deletions(-)

diff --git a/nomad/app/v1/routers/entries.py b/nomad/app/v1/routers/entries.py
index a3bf7bdac4..849d4d2e67 100644
--- a/nomad/app/v1/routers/entries.py
+++ b/nomad/app/v1/routers/entries.py
@@ -1028,20 +1028,23 @@ async def get_entry_raw_file(
     return StreamingResponse(raw_file_content, media_type=mime_type)


-def answer_entry_archive_request(query: Dict[str, Any], required: ArchiveRequired, user: User):
+def answer_entry_archive_request(
+        query: Dict[str, Any], required: ArchiveRequired, user: User, entry_metadata=None):
     required_reader = _validate_required(required)

-    response = perform_search(
-        owner=Owner.visible, query=query,
-        required=MetadataRequired(include=['entry_id', 'upload_id', 'parser_name']),
-        user_id=user.user_id if user is not None else None)
+    if not entry_metadata:
+        response = perform_search(
+            owner=Owner.visible, query=query,
+            required=MetadataRequired(include=['entry_id', 'upload_id', 'parser_name']),
+            user_id=user.user_id if user is not None else None)

-    if response.pagination.total == 0:
-        raise HTTPException(
-            status_code=status.HTTP_404_NOT_FOUND,
-            detail='The entry does not exist or is not visible to you.')
+        if response.pagination.total == 0:
+            raise HTTPException(
+                status_code=status.HTTP_404_NOT_FOUND,
+                detail='The entry does not exist or is not visible to you.')
+
+        entry_metadata = response.data[0]

-    entry_metadata = response.data[0]
     entry_id = entry_metadata['entry_id']

     uploads = _Uploads()
diff --git a/nomad/app/v1/routers/uploads.py b/nomad/app/v1/routers/uploads.py
index 7c965f12b2..de9d994198 100644
--- a/nomad/app/v1/routers/uploads.py
+++ b/nomad/app/v1/routers/uploads.py
@@ -278,6 +278,23 @@ class RawDirResponse(BaseModel):
     pagination: Optional[PaginationResponse] = Field()


+class PutRawFileResults(BaseModel):
+    entry_id: Optional[str] = Field()
+    parser_name: Optional[str] = Field()
+    entry: Optional[EntryProcData] = Field()
+    archive: Optional[Dict[str, Any]] = Field()
+
+
+class PutRawFileResponse(BaseModel):
+    upload_id: str = Field(None, description=strip('''
+        Unique id of the upload.'''))
+    data: UploadProcData = Field(None, description=strip('''
+        The upload data as a dictionary.'''))
+    results: Optional[PutRawFileResults] = Field(None, description=strip('''
+        The results (generated entry and [optionally] archive) of processing the file.
+        If the file does not result in an entry, this will be left empty.'''))
+
+
 class UploadCommandExamplesResponse(BaseModel):
     upload_url: str = Field()
     upload_command: str = Field()
@@ -330,15 +347,28 @@ _upload_or_path_not_found = status.HTTP_404_NOT_FOUND, {
     The specified upload, or a resource with the specified path within the upload,
     could not be found.''')}

-_upload_response = 200, {
+_post_upload_response = 200, {
     'model': UploadProcDataResponse,
     'content': {
         'application/json': {},
         'text/plain': {'example': 'Thanks for uploading your data to nomad.'}
     },
     'description': strip('''
-        A json structure with upload data, if the request headers specifies
-        `Accept = application/json`, otherwise a plain text information string.''')}
+        A json structure with upload data, or a plain text information string.
+        It will be a json structure if the request headers specify `Accept = application/json`.''')}
+
+_put_raw_file_response = 200, {
+    'model': PutRawFileResponse,
+    'content': {
+        'application/json': {},
+        'text/plain': {'example': 'Thanks for uploading your data to nomad.'}
+    },
+    'description': strip('''
+        A json structure with upload data and possibly the result of processing,
+        or a plain text information string.
+        It will be a json structure if the request headers specify `Accept = application/json`
+        or if `wait_for_results` is set.''')}
+

 _raw_path_response = 200, {
     'content': {
@@ -560,22 +590,14 @@ async def get_upload_entry(
         raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=strip('''
             An entry by that id could not be found in the specified upload.'''))

-    # load entries's metadata from search
-    metadata_entries = search(
-        pagination=MetadataPagination(page_size=1),
-        owner='admin' if user.is_admin else 'visible',
-        user_id=user.user_id,
-        query=dict(entry_id=entry.entry_id))
-    data = _entry_to_pydantic(entry)
-    if len(metadata_entries.data) == 1:
-        data.entry_metadata = metadata_entries.data[0]
+    data = _entry_to_pydantic(entry, add_es_metadata=True, user=user)
     return EntryProcDataResponse(entry_id=entry_id, data=data)


 @router.get(
     '/{upload_id}/rawdir/{path:path}', tags=[raw_tag],
-    summary='Get the raw files and folders metadata for a given upload and path.',
+    summary='Get the metadata for the raw file or folder located at the specified path in the specified upload.',
     response_model=RawDirResponse,
     responses=create_responses(_upload_or_path_not_found, _not_authorized_to_upload, _bad_request),
     response_model_exclude_unset=True,
@@ -669,7 +691,7 @@ async def get_upload_rawdir_path(

 @router.get(
     '/{upload_id}/raw/{path:path}', tags=[raw_tag],
-    summary='Get the raw files and folders for a given upload and path.',
+    summary='Get the raw file or folder located at the specified path in the specified upload.',
     response_class=StreamingResponse,
     responses=create_responses(
         _raw_path_response, _upload_or_path_not_found, _not_authorized_to_upload, _bad_request),
@@ -778,10 +800,10 @@ async def get_upload_raw_path(

 @router.put(
     '/{upload_id}/raw/{path:path}', tags=[raw_tag],
-    summary='Put (add or replace) files to an upload at the specified path.',
+    summary='Put a raw file to the directory specified by path in the specified upload.',
     response_class=StreamingResponse,
     responses=create_responses(
-        _upload_response, _upload_not_found, _not_authorized_to_upload, _bad_request),
+        _put_raw_file_response, _upload_not_found, _not_authorized_to_upload, _bad_request),
     response_model_exclude_unset=True,
     response_model_exclude_none=True)
 async def put_upload_raw_path(
@@ -801,16 +823,34 @@ async def put_upload_raw_path(
             None,
             description=strip('''
                 Specifies the name of the file, when using method 2.''')),
+        wait_for_results: bool = FastApiQuery(
+            False,
+            description=strip('''
+                Waits for the processing to complete and return the results in the response (**USE WITH CARE**).''')),
+        include_archive: bool = FastApiQuery(
+            False,
+            description=strip('''
+                If the archive data should be included in the response when using `wait_for_results` (**USE WITH CARE**).''')),
         user: User = Depends(create_user_dependency(required=True, upload_token_auth_allowed=True))):
     '''
-    Upload files to an already existing upload (identified by upload_id). The files are
-    *merged* with the existing files, i.e. new files are added, if there is a collision
+    Uploads a file to a specified path within the upload identified by `upload_id`.
+    If the file is a zip or tar archive, it will first be extracted, and the content will be
+    *merged* into the path, i.e. new files are added, and if there is a collision
     (an old file with the same path and name as one of the new files), the old file will
-    be overwritten, but the rest of the old files will remain untouched.
+    be overwritten, but the rest of the old files will remain untouched. If the file is not
+    a zip or tar archive, the file will just be uploaded as it is, overwriting the existing
+    file if there is one.

     The `path` is interpreted as a directory. The empty string gives the "root" directory.

-    If the file is a zip or tar archive, it will first be extracted, then merged.
+    If a single file is uploaded (i.e. not a zip or tar archive), it is possible to specify
+    `wait_for_results`. This means that the file (and only this file) will be matched and
+    processed, and the result will be returned with the response. **NOTE**: this should be
+    used with caution! When this option is set, the call will block until processing is complete,
+    which may take some time. Also note that just processing the new/modified file may not be
+    enough in some cases (since adding/modifying a file somewhere in the directory structure
+    may affect other entries). Also note that results.entry.entry_metadata will not be populated
+    in the response.

     There are two basic ways to upload a file: in the multipart-formdata or streaming the
     file data in the http body. Both are supported. Note, however, that the second method
@@ -820,6 +860,11 @@ async def put_upload_raw_path(
     matter since they are extracted). See the POST `uploads` endpoint for examples of curl
     commands for uploading files.
     '''
+    if include_archive and not wait_for_results:
+        raise HTTPException(
+            status_code=status.HTTP_400_BAD_REQUEST,
+            detail='`include_archive` requires `wait_for_results`.')
+
     upload = _get_upload_with_write_access(upload_id, user, include_published=False)

     if not is_safe_relative_path(path):
@@ -835,38 +880,99 @@ async def put_upload_raw_path(
             status_code=status.HTTP_400_BAD_REQUEST,
             detail='No upload file provided.')

-    if files.zipfile.is_zipfile(upload_path) or files.tarfile.is_tarfile(upload_path):
-        # Uploading an compressed file -> reprocess the entire target directory
-        path_filter = path
-    else:
-        # Uploading a single file -> reprocess only the file
-        path_filter = os.path.join(path, os.path.basename(upload_path))
+    is_compressed = files.zipfile.is_zipfile(upload_path) or files.tarfile.is_tarfile(upload_path)

-    try:
-        upload.process_upload(
-            file_operation=dict(op='ADD', path=upload_path, target_dir=path, temporary=(method != 0)),
-            path_filter=path_filter)
-    except ProcessAlreadyRunning:
-        raise HTTPException(
-            status_code=status.HTTP_400_BAD_REQUEST,
-            detail='The upload is currently blocked by another process.')
+    if not wait_for_results:
+        # Process on worker (normal case)
+        if is_compressed:
+            # Uploading a compressed file -> reprocess the entire target directory
+            path_filter = path
+        else:
+            # Uploading a single file -> reprocess only the file
+            path_filter = os.path.join(path, os.path.basename(upload_path))

-    if request.headers.get('Accept') == 'application/json':
-        upload_proc_data_response = UploadProcDataResponse(
-            upload_id=upload_id,
-            data=_upload_to_pydantic(upload))
-        response_text = upload_proc_data_response.json()
-        media_type = 'application/json'
+        try:
+            upload.process_upload(
+                file_operation=dict(op='ADD', path=upload_path, target_dir=path, temporary=(method != 0)),
+                path_filter=path_filter)
+        except ProcessAlreadyRunning:
+            raise HTTPException(
+                status_code=status.HTTP_400_BAD_REQUEST,
+                detail='The upload is currently blocked by another process.')
+
+        if request.headers.get('Accept') == 'application/json':
+            response = PutRawFileResponse(
+                upload_id=upload_id,
+                data=_upload_to_pydantic(upload))
+            response_text = response.json()
+            media_type = 'application/json'
+        else:
+            response_text = _thank_you_message
+            media_type = 'text/plain'
     else:
-        response_text = _thank_you_message
-        media_type = 'text/plain'
+        # Process locally
+        if is_compressed:
+            raise HTTPException(
+                status_code=status.HTTP_400_BAD_REQUEST,
+                detail='`wait_for_results` can only be used with single files, not with compressed files.')
+
+        full_path = os.path.join(path, os.path.basename(upload_path))
+        try:
+            entry = upload.put_file_and_process_local(upload_path, path)
+            if upload.process_status == ProcessStatus.FAILURE:
+                # Should only happen if we fail to put the file, match the file, or to *initiate*
+                # entry processing - i.e. normally, this shouldn't happen.
+                raise HTTPException(
+                    status_code=status.HTTP_400_BAD_REQUEST,
+                    detail=f'Failed to put and process: {upload.errors[0]}')
+
+            archive = None
+            if entry and entry.process_status == ProcessStatus.SUCCESS and include_archive:
+                # NOTE: We can't rely on ES to get the metadata for the entry, since it may
+                # not have had enough time to update its index etc. For now, we will just
+                # ignore this, as we do not need it.
+                entry_metadata = dict(
+                    upload_id=upload_id,
+                    entry_id=entry.entry_id,
+                    parser_name=entry.parser_name)
+                archive = answer_entry_archive_request(
+                    dict(upload_id=upload_id, mainfile=full_path),
+                    required='*', user=user,
+                    entry_metadata=entry_metadata)['data']['archive']
+
+            response = PutRawFileResponse(
+                upload_id=upload_id,
+                data=_upload_to_pydantic(upload),
+                results=PutRawFileResults(
+                    entry_id=entry.entry_id if entry else None,
+                    parser_name=entry.parser_name if entry else None,
+                    entry=_entry_to_pydantic(entry) if entry else None,
+                    archive=archive))
+            response_text = response.json()
+            media_type = 'application/json'
+
+        except HTTPException:
+            raise
+        except ProcessAlreadyRunning:
+            raise HTTPException(
+                status_code=status.HTTP_400_BAD_REQUEST,
+                detail='The upload is currently being processed, operation not allowed.')
+        except Exception as e:
+            raise HTTPException(
+                status_code=status.HTTP_400_BAD_REQUEST,
+                detail=f'Unexpected exception occurred: {e}')
+        finally:
+            try:
+                shutil.rmtree(os.path.dirname(upload_path))
+            except Exception:
+                pass

     return StreamingResponse(create_stream_from_string(response_text), media_type=media_type)


 @router.delete(
     '/{upload_id}/raw/{path:path}', tags=[raw_tag],
-    summary='Delete file or folder located at the specified path in the specified upload.',
+    summary='Delete the raw file or folder located at the specified path in the specified upload.',
     response_model=UploadProcDataResponse,
     responses=create_responses(_upload_not_found, _not_authorized_to_upload, _bad_request),
     response_model_exclude_unset=True,
@@ -962,7 +1068,7 @@ async def get_upload_entry_archive(
     '', tags=[default_tag],
     summary='Submit a new upload',
     response_class=StreamingResponse,
-    responses=create_responses(_upload_response, _not_authorized, _bad_request),
+    responses=create_responses(_post_upload_response, _not_authorized, _bad_request),
     response_model_exclude_unset=True,
     response_model_exclude_none=True)
 async def post_upload(
@@ -1652,9 +1758,22 @@ def _upload_to_pydantic(upload: Upload) -> UploadProcData:
     return pydantic_upload


-def _entry_to_pydantic(entry: Entry) -> EntryProcData:
-    ''' Converts the mongo db object to an EntryProcData object'''
-    return EntryProcData.from_orm(entry)
+def _entry_to_pydantic(entry: Entry, add_es_metadata: bool = False, user=None) -> EntryProcData:
+    '''
+    Converts the mongo db object to an EntryProcData object, and optionally also adds metadata
+    from ES
+    '''
+    rv = EntryProcData.from_orm(entry)
+    if add_es_metadata:
+        # load the entry's metadata from search
+        metadata_entries = search(
+            pagination=MetadataPagination(page_size=1),
+            owner='admin' if user.is_admin else 'visible',
+            user_id=user.user_id,
+            query=dict(entry_id=entry.entry_id))
+        if len(metadata_entries.data) == 1:
+            rv.entry_metadata = metadata_entries.data[0]
+    return rv


 def _check_upload_not_processing(upload: Upload):
diff --git a/nomad/processing/base.py b/nomad/processing/base.py
index fd2e5df290..7b9e29321c 100644
--- a/nomad/processing/base.py
+++ b/nomad/processing/base.py
@@ -530,7 +530,7 @@ class Proc(Document):
         is running or has been added to the queue (blocking processes prevent further
         requests to be queued until they have been completed).

-        This is the first of two *sync operations*. These should be atomic and occur in
+        This is the first of three *sync operations*. These should be atomic and occur in
         sequence, each call fully seeing the relevant changes of the previous operation.
         Because of propagation delays and possible race conditions, we use `sync_counter` to
         detect update collisions. Such collisions should be very unusual, but if they occur
@@ -572,7 +572,7 @@ class Proc(Document):
                     ]
                 }, mongo_update)
             try_counter += 1
-            if old_record and old_record['sync_counter'] == self.sync_counter:
+            if old_record and old_record.get('sync_counter') == self.sync_counter:
                 # We have successfully scheduled the process!
                 self.reload()
                 return not prev_process_running
@@ -584,11 +584,61 @@ class Proc(Document):
             time.sleep(0.1)
             self.reload()

+    def _sync_start_local_process(self, func_name: str):
+        '''
+        Used to start a *local* process. If successful, the status transitions to RUNNING
+        atomically. The call will fail and raise a :class:`ProcessAlreadyRunning` if any
+        other process is currently running.
+
+        This is one of three *sync operations*. See :func:`_sync_schedule_process`
+        for more info.
+        '''
+        try_counter = 0
+        while True:
+            if self.process_running:
+                raise ProcessAlreadyRunning('Another process is running or waiting to run')
+            mongo_update = {
+                '$set': dict(
+                    sync_counter=self.sync_counter + 1,
+                    process_status=ProcessStatus.RUNNING,
+                    current_process=func_name,
+                    last_status_message='Started: ' + func_name,
+                    worker_hostname=None,
+                    celery_task_id=None,
+                    errors=[],
+                    warnings=[])}
+            # Try to update self atomically. Will fail if someone else has managed to write
+            # a sync op in between.
+            old_record = self._get_collection().find_one_and_update(
+                {
+                    '$and': [
+                        {'_id': self.id},
+                        {
+                            '$or': [
+                                {'sync_counter': self.sync_counter},
+                                {'sync_counter': {'$exists': False}}
+                            ]
+                        }
+                    ]
+                }, mongo_update)
+            try_counter += 1
+            if old_record and old_record.get('sync_counter') == self.sync_counter:
+                # We have successfully started the process!
+                self.reload()
+                return
+            # Someone else must have written a sync op (ticked up the sync_counter) in between
+            if try_counter >= 3:
+                # Three failed attempts - should be virtually impossible!
+                raise ProcessSyncFailure('Failed to start local process too many times - should not happen')
+            # Otherwise, sleep, reload, and try again
+            time.sleep(0.1)
+            self.reload()
+
     def _sync_complete_process(self) -> Tuple[str, List[Any], Dict[str, Any]]:
         '''
-        Used to complete a process (when done, successful or not). Should only be invoked
-        by a celery worker. Returns a triple containing information about the next process
-        to run (if any), of the form (func_name, args, kwargs).
+        Used to complete a process (when done, successful or not). Returns a triple
+        containing information about the next process to run (if any), of the
+        form (func_name, args, kwargs).

         There are 3 possibilities:
         1) There is something in the queue, and the current process was successful
@@ -598,7 +648,7 @@ class Proc(Document):
         3) There is nothing in the queue:
            -> We set the status to the provided value and return None

-        This is one of two *sync operations*. See :func:`_sync_schedule_process`
+        This is one of three *sync operations*. See :func:`_sync_schedule_process`
         for more info.
         '''
         assert self.process_status in ProcessStatus.STATUSES_COMPLETED
@@ -632,7 +682,7 @@ class Proc(Document):
             old_record = self._get_collection().find_one_and_update(
                 {'_id': self.id, 'sync_counter': self.sync_counter}, mongo_update)
             try_counter += 1
-            if old_record and old_record['sync_counter'] == self.sync_counter:
+            if old_record and old_record.get('sync_counter') == self.sync_counter:
                 # We have successfully completed the process
                 return next_process
             # Someone else must have written a sync op (ticked up the sync_counter) in between
@@ -927,3 +977,58 @@ def process(is_blocking: bool = False, is_child: bool = False):
         setattr(wrapper, '__is_child', is_child)
         return wrapper
     return process_decorator
+
+
+def process_local(func):
+    '''
+    The decorator for functions that process locally. These work similarly to functions
+    marked with the `@process` decorator, but they are executed directly, in the current
+    thread, not via celery. Consequently, they can only be started if no other process is
+    running. They are also implicitly blocking, i.e. while running, no other process
+    (local or celery-based) can be started or scheduled. If successful, a local process can
+    return a value to the caller (unlike celery processes). Invoking a local process should
+    only throw exceptions if it was not possible to start the process (because some other
+    process is running). Any other errors that happen during the running of the process are
+    handled in the usual way, by setting self.errors etc. The Proc object should not have
+    any unsaved changes when a local process is invoked.
+    '''
+    # Determine canonical class name
+    cls_name, func_name = func.__qualname__.split('.')
+    all_blocking_processes[cls_name].append(func_name)
+
+    def wrapper(self: Proc, *args, **kwargs):
+        logger = self.get_logger()
+        logger.debug('Executing local process')
+        self._sync_start_local_process(func_name)
+
+        try:
+            os.chdir(config.fs.working_directory)
+            with utils.timer(logger, 'process executed locally', log_memory=True):
+                # Actually call the process function
+                rv = func(self, *args, **kwargs)
+                if self.errors:
+                    # Should be impossible unless the process has tampered with self.errors, which
+                    # it should not do. We will treat it essentially as if it had raised an exception
+                    self.fail('completed with errors but no exception, should not happen', complete=False)
+                else:
+                    # All looks good
+                    self.on_success()
+                    self.process_status = ProcessStatus.SUCCESS
+                    self.complete_time = datetime.utcnow()
+                    if self.warnings:
+                        self.last_status_message = f'Process {func_name} completed with warnings'
+                    else:
+                        self.last_status_message = f'Process {func_name} completed successfully'
+                    logger.info('completed process')
+                return rv
+        except SystemExit as e:
+            self.fail(e, complete=False)
+        except ProcessFailure as e:
+            # Exception with details about how to call self.fail
+            self.fail(*e._errors, log_level=e._log_level, complete=False, **e._kwargs)
+        except Exception as e:
+            self.fail(e, complete=False)
+        finally:
+            self._sync_complete_process()  # Queue should be empty, so nothing more to do
+
+    return wrapper
diff --git a/nomad/processing/data.py b/nomad/processing/data.py
index 6ef80caec0..503c62baf7 100644
--- a/nomad/processing/data.py
+++ b/nomad/processing/data.py
@@ -48,7 +48,7 @@ from nomad.files import (
     PathObject, UploadFiles, PublicUploadFiles, StagingUploadFiles, UploadBundle,
     create_tmp_dir, is_safe_relative_path)
 from nomad.processing.base import (
-    Proc, process, ProcessStatus, ProcessFailure, ProcessAlreadyRunning, worker_hostname)
+    Proc, process, process_local, ProcessStatus, ProcessFailure, ProcessAlreadyRunning, worker_hostname)
 from nomad.parsing import Parser
 from nomad.parsing.parsers import parser_dict, match_parser
 from nomad.normalizing import normalizers
@@ -906,6 +906,14 @@ class Entry(Proc):
     @process(is_child=True)
     def process_entry(self):
         ''' Processes or reprocesses an entry. '''
+        self._process_entry_local()
+
+    @process_local
+    def process_entry_local(self):
+        ''' Processes or reprocesses an entry locally. '''
+        self._process_entry_local()
+
+    def _process_entry_local(self):
         logger = self.get_logger()
         if self.upload is None:
             logger.error('upload does not exist')
@@ -947,7 +955,7 @@ class Entry(Proc):
                 self.parser_name = parser.name  # Parser renamed
             else:
                 should_parse = False
-                logger.error('could not determine a perser for this entry')
+                logger.error('could not determine a parser for this entry')
                 self.errors = ['could not determine a parser for this entry']

         if should_parse:
@@ -1064,7 +1072,7 @@ class Entry(Proc):
             with utils.timer(logger, 'normalizer executed', input_size=self.mainfile_file.size):
                 try:
                     normalizer(self._parser_results).normalize(logger=logger)
-                    logger.info('normalizer completed successfull', **context)
+                    logger.info('normalizer completed successfully', **context)
                 except Exception as e:
                     raise ProcessFailure('normalizer failed with exception', exc_info=e, error=str(e), **context)

@@ -1477,6 +1485,65 @@ class Upload(Proc):
         else:
             self.cleanup()

+    @process_local
+    def put_file_and_process_local(self, path, target_dir, reprocess_settings: Dict[str, Any] = None) -> Entry:
+        '''
+        Pushes a raw file, matches it, and if matched, runs the processing - all as a local process.
+        If the target path exists, it will be overwritten. If matched, we return the
+        resulting Entry, otherwise None.
+        '''
+        assert not self.published, 'Upload cannot be published'
+        assert os.path.isfile(path), '`path` does not specify a file'
+        assert is_safe_relative_path(target_dir), 'Bad target path provided'
+        target_path = os.path.join(target_dir, os.path.basename(path))
+        staging_upload_files = self.staging_upload_files
+        if staging_upload_files.raw_path_exists(target_path):
+            assert staging_upload_files.raw_path_is_file(target_path), 'Target path is a directory'
+
+        self.reprocess_settings = reprocess_settings
+
+        # Push the file
+        self.set_last_status_message('Putting the file')
+        staging_upload_files.add_rawfiles(path, target_dir)
+
+        # Match
+        self.set_last_status_message('Matching')
+        parser = match_parser(staging_upload_files.raw_file_object(target_path).os_path)
+
+        # Process entry, if matched; remove existing entry if unmatched.
+        entry: Entry = Entry.objects(upload_id=self.upload_id, mainfile=target_path).first()
+        if parser:
+            if entry:
+                # Entry already exists. Reset it and the parser_name attribute
+                entry.parser_name = parser.name
+                entry.reset(force=True)
+                entry.save()
+            else:
+                # Create new entry
+                entry = Entry.create(
+                    entry_id=generate_entry_id(self.upload_id, target_path),
+                    mainfile=target_path,
+                    parser_name=parser.name,
+                    upload_id=self.upload_id)
+                # Apply entry level metadata from files, if provided
+                metadata_handler = MetadataEditRequestHandler(
+                    self.get_logger(), self.main_author_user, staging_upload_files, self.upload_id)
+                entry_metadata = metadata_handler.get_entry_mongo_metadata(self, entry)
+                for quantity_name, mongo_value in entry_metadata.items():
+                    setattr(entry, quantity_name, mongo_value)
+                entry.save()
+            # process locally
+            self.set_last_status_message('Processing')
+            entry.process_entry_local()
+            return entry
+        else:
+            if entry:
+                # The new file does not match a parser, but an old entry exists - delete it
+                delete_partial_archives_from_mongo([entry.entry_id])
+                search.delete_entry(entry_id=entry.entry_id, update_materials=True)
+                entry.delete()
+            return None
+
     @property
     def upload_files(self) -> UploadFiles:
         upload_files_class = StagingUploadFiles if not self.published else PublicUploadFiles
@@ -1674,7 +1741,7 @@ class Upload(Proc):
         for entry_id in entries_to_delete:
             search.delete_entry(entry_id=entry_id, update_materials=True)
             entry = Entry.get(entry_id)
-                entry.delete()
+            entry.delete()

         # No entries *should* be processing, but if there are, we reset them to
         # to minimize problems (should be safe to do so).
-- 
GitLab
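
Usage note (illustrative, not part of the patch): the sketch below shows how a client might call the new wait_for_results mode of the PUT endpoint. The endpoint path and the wait_for_results/include_archive query parameters come from the patch; the base URL, access token, file name and the file_name parameter name are assumptions for the example.

    import requests

    base_url = 'http://localhost/api/v1'   # assumed NOMAD API prefix
    upload_id = 'my_upload_id'              # assumed existing, unpublished upload
    token = '...'                           # assumed access token

    with open('vasprun.xml', 'rb') as f:    # assumed single (non zip/tar) raw file
        response = requests.put(
            f'{base_url}/uploads/{upload_id}/raw/some/target/dir',
            params=dict(
                file_name='vasprun.xml',    # assumed name of the streaming-upload parameter
                wait_for_results='true',    # block until the file is matched and processed
                include_archive='true'),    # also return the archive (requires wait_for_results)
            headers={'Authorization': f'Bearer {token}', 'Accept': 'application/json'},
            data=f)

    response.raise_for_status()
    payload = response.json()
    # PutRawFileResponse: upload_id, data (upload proc data) and, if the file produced an
    # entry, results with entry_id, parser_name, entry and archive.
    print(payload['results']['entry_id'], payload['results']['parser_name'])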
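Concurrency note (illustrative, not part of the patch): _sync_start_local_process relies on the same optimistic-locking idea as the other sync operations, i.e. the document is only updated if sync_counter still has the value that was last read; otherwise another writer won the race and the caller reloads and retries. The stand-alone sketch below mirrors that pattern with pymongo; the connection, database name and document layout are invented for the example and are not nomad code.

    import time
    from pymongo import MongoClient

    collection = MongoClient()['example_db']['proc']   # assumed collection

    def start_local_process(doc_id, func_name, max_tries=3):
        for _ in range(max_tries):
            doc = collection.find_one({'_id': doc_id})
            if doc.get('process_status') == 'RUNNING':
                raise RuntimeError('another process is running')
            old = collection.find_one_and_update(
                # guard: only update if nobody ticked sync_counter since we read it
                {'_id': doc_id, 'sync_counter': doc.get('sync_counter', 0)},
                {'$set': {
                    'sync_counter': doc.get('sync_counter', 0) + 1,
                    'process_status': 'RUNNING',
                    'current_process': func_name}})
            if old is not None:
                return                      # we won the race
            time.sleep(0.1)                 # collision: reload and retry
        raise RuntimeError('too many collisions')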
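Decorator note (illustrative, not part of the patch): based on the contract described in the process_local docstring, a local process runs in the calling thread, raises ProcessAlreadyRunning if any other process is active, and, unlike a celery process, can return a value. The class and method below are hypothetical examples to show that contract; only the imported names come from the patch.

    from nomad.processing.base import Proc, process_local, ProcessAlreadyRunning

    class MyProc(Proc):                     # hypothetical Proc subclass

        @process_local
        def quick_check(self, path):
            '''Runs synchronously; errors are recorded on self.errors as usual.'''
            return f'checked {path}'

    proc = MyProc.get('some_id')            # hypothetical existing document
    try:
        result = proc.quick_check('raw/file.json')   # blocks until done, returns the value
    except ProcessAlreadyRunning:
        result = None                        # another (celery or local) process holds the lock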