Spaces:
Paused
Paused
| from fastapi import ( | |
| Depends, | |
| FastAPI, | |
| HTTPException, | |
| status, | |
| Request, | |
| UploadFile, | |
| File, | |
| Form, | |
| ) | |
| from datetime import datetime, timedelta | |
| from typing import List, Union, Optional | |
| from pathlib import Path | |
| from fastapi import APIRouter | |
| from fastapi.responses import StreamingResponse, JSONResponse, FileResponse | |
| from pydantic import BaseModel | |
| import json | |
| from apps.webui.models.files import ( | |
| Files, | |
| FileForm, | |
| FileModel, | |
| FileModelResponse, | |
| ) | |
| from utils.utils import get_verified_user, get_admin_user | |
| from constants import ERROR_MESSAGES | |
| from importlib import util | |
| import os | |
| import uuid | |
| import os, shutil, logging, re | |
| from config import SRC_LOG_LEVELS, UPLOAD_DIR | |
| log = logging.getLogger(__name__) | |
| log.setLevel(SRC_LOG_LEVELS["MODELS"]) | |
| router = APIRouter() | |
| ############################ | |
| # Upload File | |
| ############################ | |
| def upload_file(file: UploadFile = File(...), user=Depends(get_verified_user)): | |
| log.info(f"file.content_type: {file.content_type}") | |
| try: | |
| unsanitized_filename = file.filename | |
| filename = os.path.basename(unsanitized_filename) | |
| # replace filename with uuid | |
| id = str(uuid.uuid4()) | |
| name = filename | |
| filename = f"{id}_{filename}" | |
| file_path = f"{UPLOAD_DIR}/{filename}" | |
| contents = file.file.read() | |
| with open(file_path, "wb") as f: | |
| f.write(contents) | |
| f.close() | |
| file = Files.insert_new_file( | |
| user.id, | |
| FileForm( | |
| **{ | |
| "id": id, | |
| "filename": filename, | |
| "meta": { | |
| "name": name, | |
| "content_type": file.content_type, | |
| "size": len(contents), | |
| "path": file_path, | |
| }, | |
| } | |
| ), | |
| ) | |
| if file: | |
| return file | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=ERROR_MESSAGES.DEFAULT("Error uploading file"), | |
| ) | |
| except Exception as e: | |
| log.exception(e) | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=ERROR_MESSAGES.DEFAULT(e), | |
| ) | |
| ############################ | |
| # List Files | |
| ############################ | |
| async def list_files(user=Depends(get_verified_user)): | |
| files = Files.get_files() | |
| return files | |
| ############################ | |
| # Delete All Files | |
| ############################ | |
| async def delete_all_files(user=Depends(get_admin_user)): | |
| result = Files.delete_all_files() | |
| if result: | |
| folder = f"{UPLOAD_DIR}" | |
| try: | |
| # Check if the directory exists | |
| if os.path.exists(folder): | |
| # Iterate over all the files and directories in the specified directory | |
| for filename in os.listdir(folder): | |
| file_path = os.path.join(folder, filename) | |
| try: | |
| if os.path.isfile(file_path) or os.path.islink(file_path): | |
| os.unlink(file_path) # Remove the file or link | |
| elif os.path.isdir(file_path): | |
| shutil.rmtree(file_path) # Remove the directory | |
| except Exception as e: | |
| print(f"Failed to delete {file_path}. Reason: {e}") | |
| else: | |
| print(f"The directory {folder} does not exist") | |
| except Exception as e: | |
| print(f"Failed to process the directory {folder}. Reason: {e}") | |
| return {"message": "All files deleted successfully"} | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=ERROR_MESSAGES.DEFAULT("Error deleting files"), | |
| ) | |
| ############################ | |
| # Get File By Id | |
| ############################ | |
| async def get_file_by_id(id: str, user=Depends(get_verified_user)): | |
| file = Files.get_file_by_id(id) | |
| if file: | |
| return file | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| ############################ | |
| # Get File Content By Id | |
| ############################ | |
| async def get_file_content_by_id(id: str, user=Depends(get_verified_user)): | |
| file = Files.get_file_by_id(id) | |
| if file: | |
| file_path = Path(file.meta["path"]) | |
| # Check if the file already exists in the cache | |
| if file_path.is_file(): | |
| print(f"file_path: {file_path}") | |
| return FileResponse(file_path) | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| async def get_file_content_by_id(id: str, user=Depends(get_verified_user)): | |
| file = Files.get_file_by_id(id) | |
| if file: | |
| file_path = Path(file.meta["path"]) | |
| # Check if the file already exists in the cache | |
| if file_path.is_file(): | |
| print(f"file_path: {file_path}") | |
| return FileResponse(file_path) | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |
| ############################ | |
| # Delete File By Id | |
| ############################ | |
| async def delete_file_by_id(id: str, user=Depends(get_verified_user)): | |
| file = Files.get_file_by_id(id) | |
| if file: | |
| result = Files.delete_file_by_id(id) | |
| if result: | |
| return {"message": "File deleted successfully"} | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail=ERROR_MESSAGES.DEFAULT("Error deleting file"), | |
| ) | |
| else: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail=ERROR_MESSAGES.NOT_FOUND, | |
| ) | |