diff --git a/nomad/app/v1/routers/datasets.py b/nomad/app/v1/routers/datasets.py
index fff8ab95d5ac78b439574ede950a4d00f96df483..f0f9248ae822616bc47d9e5b2c81b9d50f96893e 100644
--- a/nomad/app/v1/routers/datasets.py
+++ b/nomad/app/v1/routers/datasets.py
@@ -40,7 +40,7 @@ from nomad.doi import DOI, DOIException
 from nomad.search import search, update_by_query
 
 from .auth import create_user_dependency
-from .entries import _do_exaustive_search
+from .entries import _do_exhaustive_search
 from ..utils import create_responses, parameter_dependency_from_model
 from ..models import (
     Pagination,
@@ -177,7 +177,7 @@ Dataset = datamodel.Dataset.m_def.a_pydantic.model
 
 def _delete_dataset(user: User, dataset_id, dataset):
     es_query = cast(Query, {'datasets.dataset_id': dataset_id})
-    entries = _do_exaustive_search(
+    entries = _do_exhaustive_search(
         owner=Owner.user, query=es_query, user=user, include=['entry_id']
     )
     entry_ids = [entry['entry_id'] for entry in entries]
@@ -406,7 +406,7 @@ async def post_datasets(
     if es_query is None:
         empty = True
     else:
-        entries = _do_exaustive_search(
+        entries = _do_exhaustive_search(
             owner=Owner.user, query=es_query, user=user, include=['entry_id']
         )
         entry_ids = [entry['entry_id'] for entry in entries]
diff --git a/nomad/app/v1/routers/entries.py b/nomad/app/v1/routers/entries.py
index 23f0f108b9facf718b7628774a1e79baf23dd60d..e26a2ebc3ed2bf1604b8e0f8392d4f00e134ee22 100644
--- a/nomad/app/v1/routers/entries.py
+++ b/nomad/app/v1/routers/entries.py
@@ -567,7 +567,7 @@ async def get_entries_metadata(
     return res
 
 
-def _do_exaustive_search(
+def _do_exhaustive_search(
     owner: Owner, query: Query, include: List[str], user: User
 ) -> Iterator[Dict[str, Any]]:
     page_after_value = None
@@ -646,7 +646,7 @@ def _answer_entries_rawdir_request(
 ):
     if owner == Owner.all_:
         raise HTTPException(
-            status_code=status.HTTP_401_UNAUTHORIZED,
+            status.HTTP_401_UNAUTHORIZED,
             detail=strip(
                 """
             The owner=all is not allowed for this operation as it will search for entries
@@ -680,7 +680,7 @@ def _answer_entries_rawdir_request(
 def _answer_entries_raw_request(owner: Owner, query: Query, files: Files, user: User):
     if owner == Owner.all_:
         raise HTTPException(
-            status_code=status.HTTP_401_UNAUTHORIZED,
+            status.HTTP_401_UNAUTHORIZED,
             detail=strip(
                 """
             The owner=all is not allowed for this operation as it will search for entries
@@ -699,7 +699,7 @@ def _answer_entries_raw_request(owner: Owner, query: Query, files: Files, user:
 
     if response.pagination.total > config.services.max_entry_download:
         raise HTTPException(
-            status_code=status.HTTP_400_BAD_REQUEST,
+            status.HTTP_400_BAD_REQUEST,
             detail='The limit of maximum number of entries in a single download (%d) has been exeeded (%d).'
             % (config.services.max_entry_download, response.pagination.total),
         )
@@ -711,7 +711,7 @@ def _answer_entries_raw_request(owner: Owner, query: Query, files: Files, user:
     # a generator of File objects to create the streamed zip from
     def download_items_generator():
         # go through all entries that match the query
-        for entry_metadata in _do_exaustive_search(
+        for entry_metadata in _do_exhaustive_search(
             owner, query, include=search_includes, user=user
         ):
             upload_id = entry_metadata['upload_id']
@@ -913,7 +913,7 @@ async def _answer_entries_archive_request(
 ):
     if owner == Owner.all_:
         raise HTTPException(
-            status_code=status.HTTP_401_UNAUTHORIZED,
+            status.HTTP_401_UNAUTHORIZED,
             detail=strip(
                 """The owner=all is not allowed for this operation as it will search for entries that you might now be allowed to access."""
             ),
@@ -1034,7 +1034,7 @@ def _answer_entries_archive_download_request(
 ):
     if owner == Owner.all_:
         raise HTTPException(
-            status_code=status.HTTP_401_UNAUTHORIZED,
+            status.HTTP_401_UNAUTHORIZED,
             detail=strip(
                 """
             The owner=all is not allowed for this operation as it will search for entries
@@ -1055,7 +1055,7 @@ def _answer_entries_archive_download_request(
 
     if response.pagination.total > config.services.max_entry_download:
         raise HTTPException(
-            status_code=status.HTTP_400_BAD_REQUEST,
+            status.HTTP_400_BAD_REQUEST,
             detail=f'The limit of maximum number of entries in a single download'
             f' ({config.services.max_entry_download}) has been exceeded ({response.pagination.total}).',
         )
@@ -1068,7 +1068,7 @@ def _answer_entries_archive_download_request(
     # a generator of StreamedFile objects to create the zipstream from
     def streamed_files():
         # go through all entries that match the query
-        for entry_metadata in _do_exaustive_search(
+        for entry_metadata in _do_exhaustive_search(
             owner, query, include=search_includes, user=user
         ):
             path = os.path.join(
@@ -1195,7 +1195,7 @@ async def get_entry_metadata(
 
     if response.pagination.total == 0:
         raise HTTPException(
-            status_code=status.HTTP_404_NOT_FOUND,
+            status.HTTP_404_NOT_FOUND,
             detail='The entry with the given id does not exist or is not visible to you.',
         )
@@ -1231,7 +1231,7 @@ async def get_entry_rawdir(
 
     if response.pagination.total == 0:
         raise HTTPException(
-            status_code=status.HTTP_404_NOT_FOUND,
+            status.HTTP_404_NOT_FOUND,
             detail='The entry with the given id does not exist or is not visible to you.',
         )
@@ -1268,7 +1268,7 @@ async def get_entry_raw(
 
     if response.pagination.total == 0:
         raise HTTPException(
-            status_code=status.HTTP_404_NOT_FOUND,
+            status.HTTP_404_NOT_FOUND,
             detail='The entry with the given id does not exist or is not visible to you.',
         )
@@ -1334,7 +1334,7 @@ async def get_entry_raw_file(
 
     if response.pagination.total == 0:
         raise HTTPException(
-            status_code=status.HTTP_404_NOT_FOUND,
+            status.HTTP_404_NOT_FOUND,
             detail='The entry with the given id does not exist or is not visible to you.',
         )
@@ -1348,7 +1348,7 @@ async def get_entry_raw_file(
 
     if not upload_files.raw_path_exists(path):
         raise HTTPException(
-            status_code=status.HTTP_404_NOT_FOUND,
+            status.HTTP_404_NOT_FOUND,
             detail='The requested file does not exist.',
         )
     # We only provide a specific mime-type, if the whole file is requested. Otherwise,
@@ -1378,7 +1378,7 @@ def answer_entry_archive_request(
 
     if response.pagination.total == 0:
         raise HTTPException(
-            status_code=status.HTTP_404_NOT_FOUND,
+            status.HTTP_404_NOT_FOUND,
             detail='The entry does not exist or is not visible to you.',
         )
@@ -1437,7 +1437,7 @@ async def post_entry_edit(
 
     if response.pagination.total == 0:
         raise HTTPException(
-            status_code=status.HTTP_404_NOT_FOUND,
+            status.HTTP_404_NOT_FOUND,
             detail='The entry with the given id does not exist or is not visible to you.',
         )
@@ -1451,13 +1451,13 @@ async def post_entry_edit(
 
     if not (is_admin or is_writer):
         raise HTTPException(
-            status_code=status.HTTP_401_UNAUTHORIZED,
+            status.HTTP_401_UNAUTHORIZED,
             detail='Not enough permissions to execute edit request.',
         )
 
     if entry_data.get('published', False):
         raise HTTPException(
-            status_code=status.HTTP_400_BAD_REQUEST,
+            status.HTTP_400_BAD_REQUEST,
             detail='Editing is only allowed for non published entries.',
         )
@@ -1473,7 +1473,7 @@ async def post_entry_edit(
             archive_data = yaml.load(f, Loader=yaml.SafeLoader)
         else:
             raise HTTPException(
-                status_code=status.HTTP_400_BAD_REQUEST,
+                status.HTTP_400_BAD_REQUEST,
                 detail='The entry mainfile in not in archive format.',
             )
@@ -1619,7 +1619,7 @@ def edit(
     entry_ids: List[str] = []
     upload_ids: Set[str] = set()
     with utils.timer(logger, 'edit query executed'):
-        all_entries = _do_exaustive_search(
+        all_entries = _do_exhaustive_search(
             owner=Owner.user, query=query, include=['entry_id', 'upload_id'], user=user
         )
@@ -1721,7 +1721,7 @@ async def post_entry_metadata_edit(
         quantity = _editable_quantities.get(action_quantity_name)
         if quantity is None:
             raise HTTPException(
-                status_code=status.HTTP_400_BAD_REQUEST,
+                status.HTTP_400_BAD_REQUEST,
                 detail='Unknown quantity %s' % action_quantity_name,
             )
@@ -1731,13 +1731,13 @@ async def post_entry_metadata_edit(
         if action_quantity_name in ['main_author', 'upload_create_time']:
             if not user.is_admin():
                 raise HTTPException(
-                    status_code=status.HTTP_400_BAD_REQUEST,
+                    status.HTTP_400_BAD_REQUEST,
                     detail='Only the admin user can set %s' % quantity.name,
                 )
 
         if isinstance(quantity_actions, list) == quantity.is_scalar:
             raise HTTPException(
-                status_code=status.HTTP_400_BAD_REQUEST,
+                status.HTTP_400_BAD_REQUEST,
                 detail='Wrong shape for quantity %s' % action_quantity_name,
             )
@@ -1759,7 +1759,7 @@ async def post_entry_metadata_edit(
 
         if action_quantity_name == 'with_embargo':
             raise HTTPException(
-                status_code=status.HTTP_400_BAD_REQUEST,
+                status.HTTP_400_BAD_REQUEST,
                 detail='Updating the embargo flag on entry level is no longer allowed.',
             )
@@ -1913,4 +1913,4 @@ async def post_entries_edit(
         raise  # A problem which we have handled explicitly. Fastapi does json conversion.
     except Exception as e:
         # The upload is processing or some kind of unexpected error has occured
-        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
+        raise HTTPException(status.HTTP_400_BAD_REQUEST, detail=str(e))
diff --git a/nomad/app/v1/routers/uploads.py b/nomad/app/v1/routers/uploads.py
index f1368d14b56caf82ccd213e0eb4bd5ce4f63d5f8..993e0a241887d41ced6c8721a7b7619fa1a5b8b0 100644
--- a/nomad/app/v1/routers/uploads.py
+++ b/nomad/app/v1/routers/uploads.py
@@ -106,6 +106,11 @@ bundle_tag = 'uploads/bundle'
 logger = utils.get_logger(__name__)
 
 
+async def async_wrapper(content):
+    for x in content:
+        yield x
+
+
 class UploadRole(str, Enum):
     main_author = 'main_author'
     reviewer = 'reviewer'
@@ -899,7 +904,7 @@ async def get_upload_entry(
     entry = upload.get_entry(entry_id)
     if not entry:
         raise HTTPException(
-            status_code=status.HTTP_404_NOT_FOUND,
+            status.HTTP_404_NOT_FOUND,
             detail=strip(
                 """
             An entry by that id could not be found in the specified upload."""
             ),
@@ -952,7 +957,7 @@ async def get_upload_rawdir_path(
     upload_files = upload.upload_files
     if not upload_files.raw_path_exists(path):
         raise HTTPException(
-            status_code=status.HTTP_404_NOT_FOUND,
+            status.HTTP_404_NOT_FOUND,
             detail=strip(
                 """
             Not found. Invalid path?"""
             ),
@@ -1061,7 +1066,7 @@ async def get_upload_raw(
     upload_files = upload.upload_files
     if not isinstance(upload_files, PublicUploadFiles):
         raise HTTPException(
-            status_code=status.HTTP_400_BAD_REQUEST,
+            status.HTTP_400_BAD_REQUEST,
             detail=strip(
                 """
             Cannot download raw files .zip from non published uploads. Use '/{upload_id}/raw/' instead
@@ -1147,7 +1152,7 @@ async def get_upload_raw_path(
     """
     if files_params.compress and (offset != 0 or length != -1):
         raise HTTPException(
-            status_code=status.HTTP_400_BAD_REQUEST,
+            status.HTTP_400_BAD_REQUEST,
             detail=strip(
                 """
             Cannot specify `offset` or `length` when `compress` is true"""
             ),
@@ -1160,7 +1165,7 @@ async def get_upload_raw_path(
     try:
         if not upload_files.raw_path_exists(path):
             raise HTTPException(
-                status_code=status.HTTP_404_NOT_FOUND,
+                status.HTTP_404_NOT_FOUND,
                 detail=strip(
                     """
                 Not found. Invalid path?"""
                 ),
@@ -1182,7 +1187,7 @@ async def get_upload_raw_path(
         else:
             if offset < 0:
                 raise HTTPException(
-                    status_code=status.HTTP_400_BAD_REQUEST,
+                    status.HTTP_400_BAD_REQUEST,
                     detail=strip(
                         """
                     Invalid offset provided."""
                     ),
                 )
             if length <= 0 and length != -1:
                 raise HTTPException(
-                    status_code=status.HTTP_400_BAD_REQUEST,
+                    status.HTTP_400_BAD_REQUEST,
                     detail=strip(
                         """
                     Invalid length provided. Should be greater than 0, or -1 if the remainder
@@ -1216,7 +1221,7 @@ async def get_upload_raw_path(
         # Directory
         if not files_params.compress:
             raise HTTPException(
-                status_code=status.HTTP_400_BAD_REQUEST,
+                status.HTTP_400_BAD_REQUEST,
                 detail=strip(
                     """
                 Path is a directory, `compress` must be set to true"""
                 ),
@@ -1270,63 +1275,45 @@ async def put_upload_raw_path(
     file: List[UploadFile] = File(None),
     local_path: str = FastApiQuery(
         None,
-        description=strip(
-            """
-            Internal/Admin use only."""
-        ),
+        description=strip("""Internal/Admin use only."""),
     ),
     file_name: str = FastApiQuery(
         None,
-        description=strip(
-            """
-            Specifies the name of the file, when using method 2."""
-        ),
+        description=strip("""Specifies the name of the file, when using method 2."""),
     ),
     overwrite_if_exists: bool = FastApiQuery(
         True,
         description=strip(
-            """
-            If set to True (default), overwrites the file if it already exists."""
+            """If set to True (default), overwrites the file if it already exists."""
         ),
     ),
     copy_or_move: str = FastApiQuery(
         None,
         description=strip(
-            """
-            If moving or copying a file within the same upload,
-            specify which operation to do: move or copy"""
+            """If moving or copying a file within the same upload, specify which operation to do: move or copy"""
         ),
     ),
     copy_or_move_source_path: str = FastApiQuery(
         None,
         description=strip(
-            """
-            If moving or copying a file within the same upload, specify the path to the
-            source file."""
+            """If moving or copying a file within the same upload, specify the path to the source file."""
        ),
     ),
     wait_for_processing: bool = FastApiQuery(
         False,
         description=strip(
-            """
-            Waits for the processing to complete and return information about the outcome
-            in the response (**USE WITH CARE**)."""
+            """Waits for the processing to complete and return information about the outcome in the response (**USE WITH CARE**)."""
         ),
     ),
     include_archive: bool = FastApiQuery(
         False,
         description=strip(
-            """
-            If the archive data should be included in the response when using
-            `wait_for_processing` (**USE WITH CARE**)."""
+            """If the archive data should be included in the response when using `wait_for_processing` (**USE WITH CARE**)."""
         ),
     ),
     entry_hash: str = FastApiQuery(
         None,
-        description=strip(
-            """
-            The hash code of the not modified entry."""
-        ),
+        description=strip("""The hash code of the not modified entry."""),
     ),
     user: User = Depends(
         create_user_dependency(required=True, upload_token_auth_allowed=True)
@@ -1369,28 +1356,25 @@ async def put_upload_raw_path(
     """
     if include_archive and not wait_for_processing:
         raise HTTPException(
-            status_code=status.HTTP_400_BAD_REQUEST,
+            status.HTTP_400_BAD_REQUEST,
             detail='`include_archive` requires `wait_for_processing`.',
         )
 
     upload = _get_upload_with_write_access(upload_id, user, include_published=False)
 
-    if local_path:
-        if not os.path.isfile(local_path):
-            raise HTTPException(
-                status_code=status.HTTP_400_BAD_REQUEST,
-                detail='Uploading folders with local_path is not yet supported.',
-            )
-
-    if not is_safe_relative_path(path):
+    if local_path and not os.path.isfile(local_path):
         raise HTTPException(
-            status_code=status.HTTP_400_BAD_REQUEST, detail='Bad path provided.'
+            status.HTTP_400_BAD_REQUEST,
+            detail='Uploading folders with local_path is not yet supported.',
         )
 
+    if not is_safe_relative_path(path):
+        raise HTTPException(status.HTTP_400_BAD_REQUEST, detail='Bad path provided.')
+
     if copy_or_move is not None or copy_or_move_source_path is not None:
         if copy_or_move not in ['copy', 'move']:
             raise HTTPException(
-                status_code=status.HTTP_400_BAD_REQUEST,
+                status.HTTP_400_BAD_REQUEST,
                 detail="The copy_or_move query parameter should be one of 'copy' or 'move' options.",
             )
@@ -1400,19 +1384,18 @@ async def put_upload_raw_path(
             or file_name is None
         ):
             raise HTTPException(
-                status_code=status.HTTP_400_BAD_REQUEST,
-                detail="""For a successful copy/move operation, all three query parameters:
-                    file_name, copy_or_move and copy_or_move_source_path are required.""",
+                status.HTTP_400_BAD_REQUEST,
+                detail="""For a successful copy/move operation, all three query parameters: file_name, copy_or_move and copy_or_move_source_path are required.""",
             )
 
         if not is_safe_basename(file_name):
             raise HTTPException(
-                status_code=status.HTTP_400_BAD_REQUEST, detail='Bad file name provided'
+                status.HTTP_400_BAD_REQUEST, detail='Bad file name provided'
             )
 
         if not is_safe_relative_path(copy_or_move_source_path):
             raise HTTPException(
-                status_code=status.HTTP_400_BAD_REQUEST,
+                status.HTTP_400_BAD_REQUEST,
                 detail='Bad source path provided.',
             )
@@ -1424,7 +1407,7 @@ async def put_upload_raw_path(
         copy_or_move and copy_or_move_source_path and file_name
     ):
         raise HTTPException(
-            status_code=status.HTTP_400_BAD_REQUEST,
+            status.HTTP_400_BAD_REQUEST,
             detail='Either an upload file or the query parameters for moving/copying a file should be provided.',
         )
@@ -1435,24 +1418,25 @@ async def put_upload_raw_path(
         entry = upload.get_entry(entry_id)
         if entry and entry_hash != entry.entry_hash or not entry:
             raise HTTPException(
-                status_code=status.HTTP_409_CONFLICT,
+                status.HTTP_409_CONFLICT,
                 detail='The provided hash did not match the current file.',
             )
 
     upload_files = StagingUploadFiles(upload_id)
+    decompress = None
     for upload_path in upload_paths:
         decompress = files.auto_decompress(upload_path)
         if decompress == 'error':
             raise HTTPException(
-                status_code=status.HTTP_400_BAD_REQUEST,
+                status.HTTP_400_BAD_REQUEST,
                 detail='Cannot extract file. Bad file format or file extension?',
             )
 
         if not decompress and not overwrite_if_exists:
             full_path = os.path.join(path, os.path.basename(upload_path))
             if upload_files.raw_path_exists(full_path):
                 raise HTTPException(
-                    status_code=status.HTTP_409_CONFLICT,
+                    status.HTTP_409_CONFLICT,
                     detail='The provided path already exists and overwrite_if_exists is set to False.',
                 )
@@ -1462,12 +1446,12 @@ async def put_upload_raw_path(
         path_to_target_file = os.path.join(path, file_name)
         if upload_files.raw_path_exists(path_to_target_file):
             raise HTTPException(
-                status_code=status.HTTP_409_CONFLICT,
+                status.HTTP_409_CONFLICT,
                 detail='The provided path already exists.',
             )
         if not upload_files.raw_path_exists(path):
             raise HTTPException(
-                status_code=status.HTTP_404_NOT_FOUND,
+                status.HTTP_404_NOT_FOUND,
                 detail='No file or folder with that path found.',
             )
         file_operations: Any = [
@@ -1492,7 +1476,7 @@ async def put_upload_raw_path(
             )
         except ProcessAlreadyRunning:
             raise HTTPException(
-                status_code=status.HTTP_400_BAD_REQUEST,
+                status.HTTP_400_BAD_REQUEST,
                 detail='The upload is currently blocked by another process.',
             )
         # Create response
@@ -1505,89 +1489,86 @@ async def put_upload_raw_path(
         else:
             response_text = _thank_you_message
             media_type = 'text/plain'
-    else:
-        # Process locally
-        if copy_or_move:  # case for move/copy an existing file
-            raise HTTPException(
-                status_code=status.HTTP_400_BAD_REQUEST,
-                detail='Cannot move/copy the file with wait_for_processing set to true.',
-            )
-        if len(upload_paths) != 1 or decompress:
-            raise HTTPException(
-                status_code=status.HTTP_400_BAD_REQUEST,
-                detail='`wait_for_processing` can only be used with single files, and not with compressed files.',
-            )
+        return StreamingResponse(
+            create_stream_from_string(response_text), media_type=media_type
+        )
 
-        upload_path = upload_paths[0]
-        full_path = os.path.join(path, os.path.basename(upload_path))
-        try:
-            reprocess_settings = Reprocess(
+    # Process locally
+    if copy_or_move:  # case for move/copy an existing file
+        raise HTTPException(
+            status.HTTP_400_BAD_REQUEST,
+            detail='Cannot move/copy the file with wait_for_processing set to true.',
+        )
+
+    if len(upload_paths) != 1 or decompress:
+        raise HTTPException(
+            status.HTTP_400_BAD_REQUEST,
+            detail='`wait_for_processing` can only be used with single files, and not with compressed files.',
+        )
+
+    upload_path = upload_paths[0]
+    full_path = os.path.join(path, os.path.basename(upload_path))
+    try:
+        entry = upload.put_file_and_process_local(
+            upload_path,
+            path,
+            reprocess_settings=Reprocess(
                 index_individual_entries=True, reprocess_existing_entries=True
-            )
-            entry = upload.put_file_and_process_local(
-                upload_path, path, reprocess_settings=reprocess_settings
-            )
+            ),
+        )
 
-            search_refresh()
-
-            archive = None
-            if (
-                entry
-                and entry.process_status == ProcessStatus.SUCCESS
-                and include_archive
-            ):
-                # NOTE: We can't rely on ES to get the metadata for the entry, since it may
-                # not have hade enough time to update its index etc. For now, we will just
-                # ignore this, as we do not need it.
-                entry_metadata = dict(
+        search_refresh()
+
+        archive = None
+        if entry and entry.process_status == ProcessStatus.SUCCESS and include_archive:
+            # NOTE: We can't rely on ES to get the metadata for the entry, since it may
+            # not have had enough time to update its index etc. For now, we will just
+            # ignore this, as we do not need it.
+            archive = answer_entry_archive_request(
+                dict(upload_id=upload_id, mainfile=full_path),
+                required='*',
+                user=user,
+                entry_metadata=dict(
                     upload_id=upload_id,
                     entry_id=entry.entry_id,
                     parser_name=entry.parser_name,
-                )
-                archive = answer_entry_archive_request(
-                    dict(upload_id=upload_id, mainfile=full_path),
-                    required='*',
-                    user=user,
-                    entry_metadata=entry_metadata,
-                )['data']['archive']
-
-            response = PutRawFileResponse(
-                upload_id=upload_id,
-                data=upload_to_pydantic(upload),
-                processing=ProcessingData(
-                    upload_id=upload_id,
-                    path=full_path,
-                    entry_id=entry.entry_id if entry else None,
-                    parser_name=entry.parser_name if entry else None,
-                    entry=entry_to_pydantic(entry) if entry else None,
-                    archive=archive,
                 ),
-            )
-            response_text = response.json()
-            media_type = 'application/json'
+            )['data']['archive']
 
-        except HTTPException:
-            raise
-        except ProcessAlreadyRunning:
-            raise HTTPException(
-                status_code=status.HTTP_400_BAD_REQUEST,
-                detail='The upload is currently being processed, operation not allowed.',
-            )
-        except Exception as e:
-            raise HTTPException(
-                status_code=status.HTTP_400_BAD_REQUEST,
-                detail=f'Unexpected exception occurred: {e}',
-            )
-        finally:
-            try:
-                shutil.rmtree(os.path.dirname(upload_path))
-            except Exception:
-                pass
+        response = PutRawFileResponse(
+            upload_id=upload_id,
+            data=upload_to_pydantic(upload),
+            processing=ProcessingData(
+                upload_id=upload_id,
+                path=full_path,
+                entry_id=entry.entry_id if entry else None,
+                parser_name=entry.parser_name if entry else None,
+                entry=entry_to_pydantic(entry) if entry else None,
+                archive=archive,
+            ),
+        )
 
-    return StreamingResponse(
-        create_stream_from_string(response_text), media_type=media_type
-    )
+        return StreamingResponse(
+            create_stream_from_string(response.json()), media_type='application/json'
+        )
+    except HTTPException:
+        raise
+    except ProcessAlreadyRunning:
+        raise HTTPException(
+            status.HTTP_400_BAD_REQUEST,
+            detail='The upload is currently being processed, operation not allowed.',
+        )
+    except Exception as e:
+        raise HTTPException(
+            status.HTTP_400_BAD_REQUEST,
+            detail=f'Unexpected exception occurred: {e}',
+        )
+    finally:
+        try:
+            shutil.rmtree(os.path.dirname(upload_path))
+        except Exception:  # noqa
+            pass
@@ -1616,15 +1597,13 @@ async def delete_upload_raw_path(
     upload = _get_upload_with_write_access(upload_id, user, include_published=False)
 
     if not is_safe_relative_path(path):
-        raise HTTPException(
-            status_code=status.HTTP_400_BAD_REQUEST, detail='Bad path provided.'
-        )
+        raise HTTPException(status.HTTP_400_BAD_REQUEST, detail='Bad path provided.')
 
     upload_files = StagingUploadFiles(upload_id)
 
     if not upload_files.raw_path_exists(path):
         raise HTTPException(
-            status_code=status.HTTP_404_NOT_FOUND,
+            status.HTTP_404_NOT_FOUND,
             detail='No file or folder with that path found.',
         )
@@ -1634,7 +1613,7 @@ async def delete_upload_raw_path(
         )
     except ProcessAlreadyRunning:
         raise HTTPException(
-            status_code=status.HTTP_400_BAD_REQUEST,
+            status.HTTP_400_BAD_REQUEST,
             detail='The upload is currently blocked by another process.',
         )
@@ -1662,17 +1641,15 @@ async def post_upload_raw_create_dir_path(
     """
     Create a new empty directory in the specified upload. The `path` should be the full path
     to the new directory (i.e. ending with the name of the new directory). The api call returns
-    immediately (no processing is neccessary).
+    immediately (no processing is necessary).
""" upload = _get_upload_with_write_access(upload_id, user, include_published=False) if not path or not is_safe_relative_path(path): - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, detail='Bad path provided.' - ) + raise HTTPException(status.HTTP_400_BAD_REQUEST, detail='Bad path provided.') if upload.staging_upload_files.raw_path_exists(path): raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, + status.HTTP_400_BAD_REQUEST, detail=f'Path `{path}` already exists.', ) try: @@ -1683,7 +1660,7 @@ async def post_upload_raw_create_dir_path( upload.save() except Exception as e: raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, + status.HTTP_400_BAD_REQUEST, detail=f'Failed to create directory: {e}', ) @@ -1836,7 +1813,7 @@ async def post_upload( >= config.services.upload_limit ): # type: ignore raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, + status.HTTP_400_BAD_REQUEST, detail=strip( """ Limit of unpublished uploads exceeded for user.""" @@ -1845,7 +1822,7 @@ async def post_upload( if not 0 <= embargo_length <= 36: raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, + status.HTTP_400_BAD_REQUEST, detail='`embargo_length` must be between 0 and 36 months.', ) @@ -1947,8 +1924,8 @@ async def post_upload_edit( except RequestValidationError: raise # A problem which we have handled explicitly. Fastapi does json conversion. except Exception as e: - # The upload is processing or some kind of unexpected error has occured - raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) + # The upload is processing or some kind of unexpected error has occurred + raise HTTPException(status.HTTP_400_BAD_REQUEST, detail=str(e)) @router.delete( @@ -1984,7 +1961,7 @@ async def delete_upload( upload.delete_upload() except ProcessAlreadyRunning: raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, + status.HTTP_400_BAD_REQUEST, detail=strip( """ The upload is still being processed.""" @@ -2047,24 +2024,24 @@ async def post_upload_action_publish( if upload.published and not user.is_admin and not to_central_nomad: raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, detail='Upload already published.' + status.HTTP_400_BAD_REQUEST, detail='Upload already published.' ) _check_upload_not_processing(upload) if upload.process_status == ProcessStatus.FAILURE: raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, + status.HTTP_400_BAD_REQUEST, detail='Cannot publish an upload that failed processing.', ) if upload.processed_entries_count == 0: raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, + status.HTTP_400_BAD_REQUEST, detail='Cannot publish an upload without any resulting entries.', ) if embargo_length is not None and not 0 <= embargo_length <= 36: raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, + status.HTTP_400_BAD_REQUEST, detail='Invalid embargo_length. 
Must be between 0 and 36 months.', ) @@ -2072,17 +2049,17 @@ async def post_upload_action_publish( # Publish from an OASIS to the central repository if not config.oasis.is_oasis: raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, + status.HTTP_400_BAD_REQUEST, detail='Must be on an OASIS to publish to the central NOMAD repository.', ) if not upload.published: raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, + status.HTTP_401_UNAUTHORIZED, detail='The upload must be published on the OASIS first.', ) if not user.is_admin: raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, + status.HTTP_401_UNAUTHORIZED, detail='Only admin of OASIS can publish to the central NOMAD.', ) # Everything looks ok, try to publish it to the central NOMAD! @@ -2091,14 +2068,14 @@ async def post_upload_action_publish( # Publish to this repository if upload.published: raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, + status.HTTP_401_UNAUTHORIZED, detail='The upload is already published.', ) try: upload.publish_upload(embargo_length=embargo_length) except ProcessAlreadyRunning: raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, + status.HTTP_400_BAD_REQUEST, detail='The upload is still/already processed.', ) @@ -2160,7 +2137,7 @@ async def post_upload_action_delete_entry_files( # Evaluate query if not data.query: raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, + status.HTTP_400_BAD_REQUEST, detail=strip( """ A query must be specified.""" @@ -2198,7 +2175,7 @@ async def post_upload_action_delete_entry_files( ) except ProcessAlreadyRunning: raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, + status.HTTP_400_BAD_REQUEST, detail='The upload is currently blocked by another process.', ) @@ -2229,7 +2206,7 @@ async def post_upload_action_lift_embargo( _check_upload_not_processing(upload) if not upload.published: raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, + status.HTTP_400_BAD_REQUEST, detail=strip( """ Upload is not published, no embargo to lift.""" @@ -2237,7 +2214,7 @@ async def post_upload_action_lift_embargo( ) if not upload.with_embargo: raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, + status.HTTP_400_BAD_REQUEST, detail=strip( """ Upload has no embargo.""" @@ -2254,7 +2231,7 @@ async def post_upload_action_lift_embargo( ) except Exception as e: # Should only happen if the upload just started processing or something unexpected happens - raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) + raise HTTPException(status.HTTP_400_BAD_REQUEST, detail=str(e)) @router.get( @@ -2324,15 +2301,11 @@ async def get_upload_bundle( ).export_bundle() except Exception as e: raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail=strip( - """ - Could not export due to error: """ - + str(e) - ), + status.HTTP_400_BAD_REQUEST, + detail=strip(f'Could not export due to error: {e}'), ) - return StreamingResponse(iter(stream), media_type='application/zip') + return StreamingResponse(async_wrapper(stream), media_type='application/zip') @router.post( @@ -2456,7 +2429,7 @@ async def post_upload_bundle( if local_path: if not os.path.isfile(local_path): raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, + status.HTTP_400_BAD_REQUEST, detail='You can only target a single bundle file using local_path.', ) @@ -2475,12 +2448,12 @@ async def post_upload_bundle( if not bundle_paths: raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, + 
status.HTTP_400_BAD_REQUEST, detail='No bundle file provided', ) if len(bundle_paths) > 1: raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, + status.HTTP_400_BAD_REQUEST, detail='Can only provide one bundle file at a time', ) bundle_path = bundle_paths[0] @@ -2506,8 +2479,8 @@ async def post_upload_bundle( if isinstance(e, HTTPException): raise raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail='Failed to import bundle: ' + str(e), + status.HTTP_400_BAD_REQUEST, + detail=f'Failed to import bundle: {str(e)}', ) @@ -2550,25 +2523,31 @@ async def _get_files_if_provided( if not user.is_admin: if not safe_path or not is_safe_path(local_path, safe_path): raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail=strip(""" - You are not authorized to access this path. - """), + status.HTTP_401_UNAUTHORIZED, + detail='You are not authorized to access this path.', ) if not os.path.exists(local_path): raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail=strip(""" - The specified local_path cannot be found. - """), + status.HTTP_400_BAD_REQUEST, + detail='The specified local_path cannot be found.', ) method = 0 elif file: # Method 1: Data provided as formdata method = 1 + + async def _async_reader(_f): + try: + while _data := await _f.read(io.DEFAULT_BUFFER_SIZE): + yield _data + except Exception as _e: + raise _e + finally: + await _f.close() + sources = [ - (_asyncronous_file_reader(multipart_file), unquote(multipart_file.filename)) + (_async_reader(multipart_file), unquote(multipart_file.filename)) for multipart_file in file ] else: @@ -2581,11 +2560,7 @@ async def _get_files_if_provided( for _, source_file_name in sources: if not files.is_safe_basename(source_file_name): raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail=strip( - """ - Bad file name provided.""" - ), + status.HTTP_400_BAD_REQUEST, detail='Bad file name provided.' ) # Forward the file path (if method == 0) or save the file(s) @@ -2639,7 +2614,7 @@ async def _get_files_if_provided( shutil.rmtree(tmp_dir) logger.warn('IO error receiving upload data', exc_info=e) raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, + status.HTTP_400_BAD_REQUEST, detail='Some IO went wrong, upload probably aborted/disrupted.', ) upload_paths.append(upload_path) @@ -2662,11 +2637,8 @@ async def _get_files_if_provided( ) if not ext: raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail=strip( - """ - No file name provided, and the file does not look like a zip or tar file.""" - ), + status.HTTP_400_BAD_REQUEST, + detail='No file name provided, and the file does not look like a zip or tar file.', ) # Add the correct extension shutil.move(upload_path, upload_path + ext) @@ -2676,20 +2648,6 @@ async def _get_files_if_provided( return upload_paths, upload_folders, method -async def _asyncronous_file_reader(f): - """Asynchronous generator to read file-like objects.""" - while True: - try: - data: bytes = await f.read(io.DEFAULT_BUFFER_SIZE) - except Exception: - await f.close() - raise - if not data: - await f.close() - return - yield data - - def _query_mongodb(**kwargs): return Upload.objects(**kwargs) @@ -2767,11 +2725,7 @@ def get_upload_with_read_access( upload = mongodb_query.first() if upload is None: raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail=strip( - """ - The specified upload_id was not found.""" - ), + status.HTTP_404_NOT_FOUND, detail='The specified upload_id was not found.' 
         )
 
     if is_user_upload_viewer(upload, user):
@@ -2779,37 +2733,25 @@ def get_upload_with_read_access(
 
     if not include_others:
         raise HTTPException(
-            status_code=status.HTTP_401_UNAUTHORIZED,
-            detail=strip(
-                """
-            You do not have access to the specified upload."""
-            ),
+            status.HTTP_401_UNAUTHORIZED,
+            detail='You do not have access to the specified upload.',
         )
 
     if not upload.published:
         if user:
             raise HTTPException(
-                status_code=status.HTTP_401_UNAUTHORIZED,
-                detail=strip(
-                    """
-                You do not have access to the specified upload."""
-                ),
+                status.HTTP_401_UNAUTHORIZED,
+                detail='You do not have access to the specified upload.',
            )
         raise HTTPException(
-            status_code=status.HTTP_401_UNAUTHORIZED,
-            detail=strip(
-                """
-            You need to log in to access the specified upload."""
-            ),
+            status.HTTP_401_UNAUTHORIZED,
+            detail='You need to log in to access the specified upload.',
        )
 
     if upload.with_embargo:
         raise HTTPException(
-            status_code=status.HTTP_401_UNAUTHORIZED,
-            detail=strip(
-                """
-            You do not have access to the specified upload - published with embargo."""
-            ),
+            status.HTTP_401_UNAUTHORIZED,
+            detail='You do not have access to the specified upload - published with embargo.',
         )
 
     return upload
@@ -2830,40 +2772,27 @@ def _get_upload_with_write_access(
     """
     if not user:
         raise HTTPException(
-            status_code=status.HTTP_401_UNAUTHORIZED,
-            detail=strip(
-                """
-            User authentication required to access uploads."""
-            ),
+            status.HTTP_401_UNAUTHORIZED,
+            detail='User authentication required to access uploads.',
         )
 
     mongodb_query = _query_mongodb(upload_id=upload_id)
     if not mongodb_query.count():
         raise HTTPException(
-            status_code=status.HTTP_404_NOT_FOUND,
-            detail=strip(
-                """
-            The specified upload_id was not found."""
-            ),
+            status.HTTP_404_NOT_FOUND, detail='The specified upload_id was not found.'
         )
 
     upload = mongodb_query.first()
 
     if not is_user_upload_writer(upload, user):
         raise HTTPException(
-            status_code=status.HTTP_401_UNAUTHORIZED,
-            detail=strip(
-                """
-            You do not have write access to the specified upload."""
-            ),
+            status.HTTP_401_UNAUTHORIZED,
+            detail='You do not have write access to the specified upload.',
         )
 
     if only_main_author and not user.is_admin and upload.main_author != user.user_id:
         raise HTTPException(
-            status_code=status.HTTP_401_UNAUTHORIZED,
-            detail=strip(
-                """
-            Only main author has permissions for this operation."""
-            ),
+            status.HTTP_401_UNAUTHORIZED,
+            detail='Only main author has permissions for this operation.',
         )
 
     if not upload.published:
@@ -2871,11 +2800,8 @@ def _get_upload_with_write_access(
 
     if not include_published:
         raise HTTPException(
-            status_code=status.HTTP_401_UNAUTHORIZED,
-            detail=strip(
-                """
-            Upload is already published, operation not possible."""
-            ),
+            status.HTTP_401_UNAUTHORIZED,
+            detail='Upload is already published, operation not possible.',
         )
 
     is_failed_import = (
@@ -2888,11 +2814,8 @@ def _get_upload_with_write_access(
         and not (is_failed_import and include_failed_imports)
     ):
         raise HTTPException(
-            status_code=status.HTTP_401_UNAUTHORIZED,
-            detail=strip(
-                """
-            Upload is already published, only admins can perform this operation."""
-            ),
+            status.HTTP_401_UNAUTHORIZED,
+            detail='Upload is already published, only admins can perform this operation.',
         )
 
     return upload
@@ -2938,6 +2861,6 @@ def _check_upload_not_processing(upload: Upload):
     """
     if upload.process_running:
         raise HTTPException(
-            status_code=status.HTTP_400_BAD_REQUEST,
+            status.HTTP_400_BAD_REQUEST,
             detail='The upload is currently being processed, operation not allowed.',
         )
diff --git a/nomad/app/v1/utils.py b/nomad/app/v1/utils.py
index ee71017f556452040c4da333152c399f07b32697..c67415c928b4c1ec0e28c6501681bea1f9802fa8 100644
--- a/nomad/app/v1/utils.py
+++ b/nomad/app/v1/utils.py
@@ -233,9 +233,10 @@ async def create_download_stream_raw_file(
     upload_files.close()
 
 
-def create_stream_from_string(content: str) -> io.BytesIO:
+async def create_stream_from_string(content: str):
     """For returning strings as content using"""
-    return io.BytesIO(content.encode())
+    for x in io.BytesIO(content.encode()):
+        yield x
 
 
 def create_responses(*args):
diff --git a/nomad/bundles.py b/nomad/bundles.py
index bee8ec2d0422d812f3684ebedae2bfe8c1647953..aef86ca14d53d191503444099235ef3b9c605b35 100644
--- a/nomad/bundles.py
+++ b/nomad/bundles.py
@@ -102,7 +102,7 @@ class BundleExporter:
         ), 'You must specify either `export_as_stream = True` or `export_path`.'
         assert self.overwrite or not os.path.exists(
             self.export_path
-        ), '`export_path` alredy exists.'
+        ), '`export_path` already exists.'
         assert (
             not self.upload.process_running
             or self.upload.current_process == 'publish_externally'