Spaces:
Sleeping
Sleeping
| import os | |
| import shutil | |
| import datetime as dt | |
| from pathlib import Path | |
| from typing import List, Optional | |
| from fastapi import APIRouter, Request, Depends, Form, HTTPException, status | |
| from fastapi.responses import HTMLResponse, RedirectResponse, Response | |
| from fastapi.templating import Jinja2Templates | |
| import aiofiles # For async file operations if needed in admin, though mostly sync for listing/deleting | |
# --- Admin Configuration ---
# All paths are resolved relative to the directory containing this module.
BASE_DIR = Path(__file__).resolve().parent
TEMPLATES_DIR = BASE_DIR / "templates"
FILES_DIR = BASE_DIR / "uploaded_files"
LOG_FILE = BASE_DIR / "app.log"

# Ensure directories exist (though app.py also does this)
FILES_DIR.mkdir(parents=True, exist_ok=True)

templates = Jinja2Templates(directory=str(TEMPLATES_DIR))

# Credentials and the session secret are read from the environment so that a
# deployment never has to keep them in source control. The fallbacks are the
# original demo values and are INSECURE: set ADMIN_EMAIL, ADMIN_PASSWORD and
# ADMIN_SESSION_SECRET in any real deployment. `or` (rather than a .get()
# default) is used so an empty variable cannot produce an empty credential.
ADMIN_EMAIL = os.environ.get("ADMIN_EMAIL") or "admin@example.com"
# The password is still compared in plain text; store a hash in a real application.
ADMIN_PASSWORD = os.environ.get("ADMIN_PASSWORD") or "adminpassword"
ADMIN_SESSION_COOKIE_NAME = "admin_session_id"
# This should be a very random, long, and secret string in a real app
ADMIN_SESSION_SECRET_VALUE = (
    os.environ.get("ADMIN_SESSION_SECRET")
    or "my_super_secret_admin_session_value_for_demo"
)

admin_router = APIRouter(
    prefix="/admin",
    tags=["Admin"],
)
| # --- Admin Authentication Dependencies --- | |
async def get_current_admin(request: Request):
    """
    Dependency to check if the admin is logged in.

    Returns one of three things, so callers must inspect the result:
      * ``{"email": ADMIN_EMAIL}`` when the session cookie matches the secret;
      * a ``RedirectResponse`` to ``/admin/login`` (307) when the request is
        unauthenticated and targets any path other than ``/admin/login`` or
        ``/admin/auth`` -- the response is *returned*, not raised;
      * ``None`` when the request is unauthenticated but is for the login/auth
        paths, so those handlers can proceed.
    """
    import secrets  # local import keeps this block independent of the file's import list

    session_cookie = request.cookies.get(ADMIN_SESSION_COOKIE_NAME)
    # The cookie is attacker-controlled, so compare in constant time instead of
    # with `==`. Both sides are encoded to bytes because compare_digest rejects
    # non-ASCII str arguments with a TypeError.
    if session_cookie is not None and secrets.compare_digest(
        session_cookie.encode("utf-8"),
        ADMIN_SESSION_SECRET_VALUE.encode("utf-8"),
    ):
        return {"email": ADMIN_EMAIL}  # Return a dummy admin object

    # If trying to access a protected page without being logged in, redirect to login.
    # The login page and its auth endpoint are exempt from this check.
    if request.url.path not in ("/admin/login", "/admin/auth"):
        return RedirectResponse(url="/admin/login", status_code=status.HTTP_307_TEMPORARY_REDIRECT)

    # If on login/auth page, don't raise/redirect, let the route handler proceed
    return None
async def verify_admin_auth(request: Request):
    """
    Stricter dependency for protected routes: failures are raised as
    HTTPException instead of being returned to the handler.

    Delegates to ``get_current_admin`` and then:
      * raises 401 when that result is falsy and the path is not
        ``/admin/login`` / ``/admin/auth``;
      * raises a 307 HTTPException carrying a ``Location: /admin/login`` header
        when that result is a ``RedirectResponse``;
      * otherwise returns the result unchanged.

    NOTE(review): get_current_admin only yields a falsy value on the login/auth
    paths, so the 401 branch looks unreachable if RedirectResponse instances
    are truthy -- confirm before relying on it.
    """
    principal = await get_current_admin(request)

    # Same short-circuit as a plain `not principal and path not in (...)` check.
    if not (principal or request.url.path in ("/admin/login", "/admin/auth")):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Not authenticated",
            headers={"WWW-Authenticate": "Bearer"},  # Though we are using cookies
        )

    # get_current_admin asked for a redirect: surface it as an exception.
    wants_redirect = isinstance(principal, RedirectResponse)
    if wants_redirect:
        raise HTTPException(
            status_code=status.HTTP_307_TEMPORARY_REDIRECT,
            detail="Not authenticated, redirecting.",
            headers={"Location": "/admin/login"},
        )

    return principal
| # --- Helper Functions for Admin Panel --- | |
def get_file_info(file_path: Path):
    """
    Describe a single file for display in the admin file listing.

    Returns a dict with the keys ``name``, ``size_bytes``, ``size_human``,
    ``modified_at`` (formatted ``YYYY-MM-DD HH:MM:SS``) and ``url`` when
    *file_path* is a regular file. Returns ``None`` when it is not a file, or
    when anything at all goes wrong while collecting the details (best-effort:
    every exception is swallowed).
    """
    try:
        if not file_path.is_file():
            return None

        stats = file_path.stat()
        filename = file_path.name
        size = stats.st_size
        modified = dt.datetime.fromtimestamp(stats.st_mtime)

        details = {
            "name": filename,
            "size_bytes": size,
            "size_human": format_bytes(size),
            "modified_at": modified.strftime('%Y-%m-%d %H:%M:%S'),
            "url": f"/download/{filename}",  # User-facing download URL
        }
        return details
    except Exception:
        # Deliberately broad: an unreadable entry is simply omitted.
        return None
def format_bytes(bytes_val, decimals=2):
    """
    Format a byte count as a human-readable string using 1024-based units.

    Args:
        bytes_val: Size in bytes.
        decimals: Digits after the decimal point; negative values are treated as 0.

    Returns:
        ``'0 Bytes'`` for zero, otherwise ``"<value> <unit>"`` where unit is one
        of Bytes, KB, MB, GB, TB. Values of 1024 TB or more stay expressed in TB.

    The previous implementation called ``dt.math``, which does not exist on the
    ``datetime`` module, so every non-zero size raised AttributeError. Repeated
    division needs no extra import, avoids float-log rounding at exact powers
    of 1024, and cannot index past the end of the unit list.
    """
    if bytes_val == 0:
        return '0 Bytes'

    step = 1024
    precision = decimals if decimals >= 0 else 0
    units = ['Bytes', 'KB', 'MB', 'GB', 'TB']

    scaled = bytes_val
    unit_index = 0
    # Stop at the last unit so very large values never overflow the list.
    while scaled >= step and unit_index < len(units) - 1:
        scaled /= step
        unit_index += 1

    return f"{scaled:.{precision}f} {units[unit_index]}"
def get_log_lines(num_lines: int = 200) -> List[str]:
    """
    Return the last *num_lines* lines of the application log, newest first.

    Args:
        num_lines: Maximum number of trailing lines to return. Zero or a
            negative value yields an empty list (previously ``lines[-0:]``
            returned the whole file).

    Returns:
        The trailing lines in reverse order, each still carrying its line
        ending. If LOG_FILE does not exist, or reading it fails, a
        single-element list holding a human-readable message is returned
        instead of raising.
    """
    from collections import deque  # local import keeps this block self-contained

    if not LOG_FILE.exists():
        return ["Log file not found or is empty."]
    if num_lines <= 0:
        return []
    try:
        with open(LOG_FILE, "r", encoding="utf-8") as f:
            # A bounded deque keeps only the tail while streaming the file,
            # so a large log is never held in memory in full.
            tail = deque(f, maxlen=num_lines)
    except (OSError, UnicodeError) as e:
        return [f"Error reading log file: {str(e)}"]
    # Display latest lines at the top for easier reading
    tail.reverse()
    return list(tail)
| # --- Admin Routes --- | |
async def admin_login_page(request: Request):
    """
    Render the ``admin_login.html`` template with only the request in its context.
    """
    context = {"request": request}
    return templates.TemplateResponse("admin_login.html", context)
async def admin_authenticate(request: Request, email: str = Form(...), password: str = Form(...)):
    """
    Check the submitted login form and start an admin session.

    On success, responds with a 303 redirect to ``/admin/dashboard`` and sets
    the session cookie (HTTP-only, SameSite=lax, 30 minute lifetime). On
    failure, re-renders ``admin_login.html`` with an error message and a 401
    status.
    """
    import secrets  # local import keeps this block independent of the file's import list

    # Compare in constant time and evaluate BOTH fields unconditionally, so the
    # response time does not reveal which field was wrong or how much of it
    # matched. Encoding to bytes avoids compare_digest's TypeError on non-ASCII str.
    email_ok = secrets.compare_digest(email.encode("utf-8"), ADMIN_EMAIL.encode("utf-8"))
    password_ok = secrets.compare_digest(password.encode("utf-8"), ADMIN_PASSWORD.encode("utf-8"))

    if not (email_ok and password_ok):
        error_msg: Optional[str] = "Invalid email or password."
        return templates.TemplateResponse(
            "admin_login.html",
            {"request": request, "error": error_msg},
            status_code=status.HTTP_401_UNAUTHORIZED,
        )

    response = RedirectResponse(url="/admin/dashboard", status_code=status.HTTP_303_SEE_OTHER)
    response.set_cookie(
        key=ADMIN_SESSION_COOKIE_NAME,
        value=ADMIN_SESSION_SECRET_VALUE,
        httponly=True,  # Makes it inaccessible to JavaScript
        samesite="lax",  # Protects against CSRF to some extent
        # secure=True,  # Uncomment if served over HTTPS
        max_age=1800,  # 30 minutes
    )
    return response
async def admin_logout(request: Request):
    """
    End the admin session: clear the session cookie and send the browser back
    to ``/admin/login`` with a 303 redirect.
    """
    logout_redirect = RedirectResponse(
        url="/admin/login",
        status_code=status.HTTP_303_SEE_OTHER,
    )
    # Cookie attributes mirror the ones used when the cookie was set. (secure=True is not passed.)
    logout_redirect.delete_cookie(
        ADMIN_SESSION_COOKIE_NAME,
        httponly=True,
        samesite="lax",
    )
    return logout_redirect
async def admin_dashboard(request: Request, admin_user: dict = Depends(get_current_admin)):
    """
    Render ``admin_dashboard.html`` with two summary figures: the number of
    regular files in FILES_DIR and the number of lines in LOG_FILE (0 when the
    log file does not exist).

    Unauthenticated requests are redirected to the login page.
    """
    # The dependency may hand back a ready-made redirect; pass it straight through.
    if isinstance(admin_user, RedirectResponse):
        return admin_user
    # Should be caught by the dependency, but kept as a fallback.
    if not admin_user:
        return RedirectResponse(url="/admin/login", status_code=status.HTTP_307_TEMPORARY_REDIRECT)

    file_total = sum(
        1 for entry in os.listdir(FILES_DIR) if (FILES_DIR / entry).is_file()
    )

    line_total = 0
    if LOG_FILE.exists():
        with open(LOG_FILE, "r", encoding="utf-8") as log_handle:
            for _ in log_handle:
                line_total += 1

    context = {
        "request": request,
        "num_files": file_total,
        "log_lines_count": line_total,
    }
    return templates.TemplateResponse("admin_dashboard.html", context)
async def admin_files_page(request: Request, admin_user: dict = Depends(get_current_admin)):
    """
    Render ``admin_files.html`` with one info dict per regular file in
    FILES_DIR, ordered by file name.

    Entries for which ``get_file_info`` returns nothing are skipped. If listing
    the directory fails, the error is printed and the page is rendered with
    whatever was collected so far. Unauthenticated requests are redirected to
    the login page.
    """
    if isinstance(admin_user, RedirectResponse):
        return admin_user
    if not admin_user:
        return RedirectResponse(url="/admin/login", status_code=status.HTTP_307_TEMPORARY_REDIRECT)

    listing = []
    try:
        for entry in sorted(os.listdir(FILES_DIR)):
            details = get_file_info(FILES_DIR / entry)
            if details:
                listing.append(details)
    except Exception as e:
        # Best-effort: report the problem and fall through to render the page.
        print(f"Error listing files: {e}")  # Replace with proper logging

    context = {
        "request": request,
        "files": listing,
    }
    return templates.TemplateResponse("admin_files.html", context)
async def admin_delete_file(request: Request, filename: str, admin_user: dict = Depends(verify_admin_auth)):
    """
    Delete one uploaded file and redirect back to ``/admin/files`` (303).

    Only the final path component of *filename* is used, and the resolved
    target must still lie inside FILES_DIR.

    Raises:
        HTTPException 400: the resolved path escapes FILES_DIR.
        HTTPException 404: the target is not an existing regular file.
        HTTPException 500: removing the file failed.
    """
    if isinstance(admin_user, RedirectResponse):
        return admin_user  # Should not happen with verify_admin_auth
    if not admin_user:
        return RedirectResponse(url="/admin/login", status_code=status.HTTP_307_TEMPORARY_REDIRECT)

    # Strip any directory components, even though the name comes from our own listing.
    target = FILES_DIR / os.path.basename(filename)

    # Extra check to ensure we are deleting within FILES_DIR
    inside_upload_dir = target.resolve().is_relative_to(FILES_DIR.resolve())
    if not inside_upload_dir:
        raise HTTPException(status_code=400, detail="Invalid file path for deletion.")

    if not target.is_file():
        raise HTTPException(status_code=404, detail="File not found for deletion.")

    try:
        target.unlink()
        # No success message is passed along; the listing page is simply reloaded.
        return RedirectResponse(url="/admin/files", status_code=status.HTTP_303_SEE_OTHER)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Could not delete file: {str(exc)}")
async def admin_logs_page(request: Request, admin_user: dict = Depends(get_current_admin)):
    """
    Render ``admin_logs.html`` with the result of ``get_log_lines(num_lines=500)``
    exposed to the template as ``log_entries``.

    Unauthenticated requests are redirected to the login page.
    """
    if isinstance(admin_user, RedirectResponse):
        return admin_user
    if not admin_user:
        return RedirectResponse(url="/admin/login", status_code=status.HTTP_307_TEMPORARY_REDIRECT)

    recent_entries = get_log_lines(num_lines=500)  # Show last 500 lines
    context = {
        "request": request,
        "log_entries": recent_entries,
    }
    return templates.TemplateResponse("admin_logs.html", context)