Spaces:
Paused
Paused
| import logging | |
| import os | |
| import uuid | |
| from pathlib import Path | |
| from typing import Optional | |
| from pydantic import BaseModel | |
| import mimetypes | |
| from open_webui.storage.provider import Storage | |
| from open_webui.apps.webui.models.files import ( | |
| FileForm, | |
| FileModel, | |
| FileModelResponse, | |
| Files, | |
| ) | |
| from open_webui.apps.retrieval.main import process_file, ProcessFileForm | |
| from open_webui.config import UPLOAD_DIR | |
| from open_webui.env import SRC_LOG_LEVELS | |
| from open_webui.constants import ERROR_MESSAGES | |
| from fastapi import APIRouter, Depends, File, HTTPException, UploadFile, status | |
| from fastapi.responses import FileResponse, StreamingResponse | |
| from open_webui.utils.utils import get_admin_user, get_verified_user | |
| log = logging.getLogger(__name__) | |
| log.setLevel(SRC_LOG_LEVELS["MODELS"]) | |
| router = APIRouter() | |
| ############################ | |
| # Upload File | |
| ############################ | |
| def upload_file(file: UploadFile = File(...), user=Depends(get_verified_user)): | |
| log.info(f"file.content_type: {file.content_type}") | |
| try: | |
| unsanitized_filename = file.filename | |
| filename = os.path.basename(unsanitized_filename) | |
| # replace filename with uuid | |
| id = str(uuid.uuid4()) | |
| name = filename | |
| filename = f"{id}_{filename}" | |
| contents, file_path = Storage.upload_file(file.file, filename) | |
| file_item = Files.insert_new_file( | |
| user.id, | |
| FileForm( | |
| **{ | |
| "id": id, | |
| "filename": name, | |
| "path": file_path, | |
| "meta": { | |
| "name": name, | |
| "content_type": file.content_type, | |
| "size": len(contents), | |
| }, | |
| } | |
| ), | |
| ) | |
| try: | |
| process_file(ProcessFileForm(file_id=id)) | |
| file_item = Files.get_file_by_id(id=id) | |
| except Exception as e: | |
| log.exception(e) | |
| log.error(f"Error processing file: {file_item.id}") | |
| file_item = FileModelResponse( | |
| **{ | |
| **file_item.model_dump(), | |
| "error": str(e.detail) if hasattr(e, "detail") else str(e), | |
| } | |
| ) | |
| if file_item: | |
| return file_item | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=ERROR_MESSAGES.DEFAULT("Error uploading file"), | |
| ) | |
| except Exception as e: | |
| log.exception(e) | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=ERROR_MESSAGES.DEFAULT(e), | |
| ) | |
| ############################ | |
| # List Files | |
| ############################ | |
| async def list_files(user=Depends(get_verified_user)): | |
| if user.role == "admin": | |
| files = Files.get_files() | |
| else: | |
| files = Files.get_files_by_user_id(user.id) | |
| return files | |
| ############################ | |
| # Delete All Files | |
| ############################ | |
| async def delete_all_files(user=Depends(get_admin_user)): | |
| result = Files.delete_all_files() | |
| if result: | |
| try: | |
| Storage.delete_all_files() | |
| except Exception as e: | |
| log.exception(e) | |
| log.error(f"Error deleting files") | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=ERROR_MESSAGES.DEFAULT("Error deleting files"), | |
| ) | |
| return {"message": "All files deleted successfully"} | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=ERROR_MESSAGES.DEFAULT("Error deleting files"), | |
| ) | |
| ############################ | |
| # Get File By Id | |
| ############################ | |
| async def get_file_by_id(id: str, user=Depends(get_verified_user)): | |
| file = Files.get_file_by_id(id) | |
| if file and (file.user_id == user.id or user.role == "admin"): | |
| return file | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| ############################ | |
| # Get File Data Content By Id | |
| ############################ | |
| async def get_file_data_content_by_id(id: str, user=Depends(get_verified_user)): | |
| file = Files.get_file_by_id(id) | |
| if file and (file.user_id == user.id or user.role == "admin"): | |
| return {"content": file.data.get("content", "")} | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| ############################ | |
| # Update File Data Content By Id | |
| ############################ | |
| class ContentForm(BaseModel): | |
| content: str | |
| async def update_file_data_content_by_id( | |
| id: str, form_data: ContentForm, user=Depends(get_verified_user) | |
| ): | |
| file = Files.get_file_by_id(id) | |
| if file and (file.user_id == user.id or user.role == "admin"): | |
| try: | |
| process_file(ProcessFileForm(file_id=id, content=form_data.content)) | |
| file = Files.get_file_by_id(id=id) | |
| except Exception as e: | |
| log.exception(e) | |
| log.error(f"Error processing file: {file.id}") | |
| return {"content": file.data.get("content", "")} | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| ############################ | |
| # Get File Content By Id | |
| ############################ | |
| async def get_file_content_by_id(id: str, user=Depends(get_verified_user)): | |
| file = Files.get_file_by_id(id) | |
| if file and (file.user_id == user.id or user.role == "admin"): | |
| try: | |
| file_path = Storage.get_file(file.path) | |
| file_path = Path(file_path) | |
| # Check if the file already exists in the cache | |
| if file_path.is_file(): | |
| print(f"file_path: {file_path}") | |
| headers = { | |
| "Content-Disposition": f'attachment; filename="{file.meta.get("name", file.filename)}"' | |
| } | |
| return FileResponse(file_path, headers=headers) | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| except Exception as e: | |
| log.exception(e) | |
| log.error(f"Error getting file content") | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=ERROR_MESSAGES.DEFAULT("Error getting file content"), | |
| ) | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| async def get_html_file_content_by_id(id: str, user=Depends(get_verified_user)): | |
| file = Files.get_file_by_id(id) | |
| if file and (file.user_id == user.id or user.role == "admin"): | |
| try: | |
| file_path = Storage.get_file(file.path) | |
| file_path = Path(file_path) | |
| # Check if the file already exists in the cache | |
| if file_path.is_file(): | |
| print(f"file_path: {file_path}") | |
| return FileResponse(file_path) | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| except Exception as e: | |
| log.exception(e) | |
| log.error(f"Error getting file content") | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=ERROR_MESSAGES.DEFAULT("Error getting file content"), | |
| ) | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| async def get_file_content_by_id(id: str, user=Depends(get_verified_user)): | |
| file = Files.get_file_by_id(id) | |
| if file and (file.user_id == user.id or user.role == "admin"): | |
| file_path = file.path | |
| if file_path: | |
| file_path = Storage.get_file(file_path) | |
| file_path = Path(file_path) | |
| # Check if the file already exists in the cache | |
| if file_path.is_file(): | |
| print(f"file_path: {file_path}") | |
| headers = { | |
| "Content-Disposition": f'attachment; filename="{file.meta.get("name", file.filename)}"' | |
| } | |
| return FileResponse(file_path, headers=headers) | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| else: | |
| # File path doesn’t exist, return the content as .txt if possible | |
| file_content = file.content.get("content", "") | |
| file_name = file.filename | |
| # Create a generator that encodes the file content | |
| def generator(): | |
| yield file_content.encode("utf-8") | |
| return StreamingResponse( | |
| generator(), | |
| media_type="text/plain", | |
| headers={"Content-Disposition": f"attachment; filename={file_name}"}, | |
| ) | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| ############################ | |
| # Delete File By Id | |
| ############################ | |
| async def delete_file_by_id(id: str, user=Depends(get_verified_user)): | |
| file = Files.get_file_by_id(id) | |
| if file and (file.user_id == user.id or user.role == "admin"): | |
| result = Files.delete_file_by_id(id) | |
| if result: | |
| try: | |
| Storage.delete_file(file.filename) | |
| except Exception as e: | |
| log.exception(e) | |
| log.error(f"Error deleting files") | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=ERROR_MESSAGES.DEFAULT("Error deleting files"), | |
| ) | |
| return {"message": "File deleted successfully"} | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=ERROR_MESSAGES.DEFAULT("Error deleting file"), | |
| ) | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |