"""Admin panel routes: cookie-based login, uploaded-file management and log viewing."""

import datetime as dt
import hmac
import logging
import os
import shutil
from collections import deque
from pathlib import Path
from typing import List, Optional

import aiofiles  # For async file operations if needed in admin, though mostly sync for listing/deleting
from fastapi import APIRouter, Request, Depends, Form, HTTPException, status
from fastapi.responses import HTMLResponse, RedirectResponse, Response
from fastapi.templating import Jinja2Templates

logger = logging.getLogger(__name__)

# --- Admin Configuration ---
BASE_DIR = Path(__file__).resolve().parent
TEMPLATES_DIR = BASE_DIR / "templates"
FILES_DIR = BASE_DIR / "uploaded_files"
LOG_FILE = BASE_DIR / "app.log"

# Ensure directories exist (though app.py also does this)
FILES_DIR.mkdir(parents=True, exist_ok=True)

templates = Jinja2Templates(directory=str(TEMPLATES_DIR))

# INSECURE: Hardcoded credentials. Use environment variables in production!
ADMIN_EMAIL = "admin@example.com"
ADMIN_PASSWORD = "adminpassword"  # Store hashed passwords in a real application
ADMIN_SESSION_COOKIE_NAME = "admin_session_id"
# This should be a very random, long, and secret string in a real app
ADMIN_SESSION_SECRET_VALUE = "my_super_secret_admin_session_value_for_demo"

# Paths that must stay reachable without a valid session cookie.
_PUBLIC_ADMIN_PATHS = ("/admin/login", "/admin/auth")

admin_router = APIRouter(
    prefix="/admin",
    tags=["Admin"],
)


# --- Admin Authentication Dependencies ---
def _constant_time_equals(candidate: Optional[str], expected: str) -> bool:
    """Compare two strings without leaking the mismatch position via timing.

    Both sides are encoded to bytes first because ``hmac.compare_digest``
    rejects non-ASCII ``str`` arguments. A missing candidate never matches.
    """
    if candidate is None:
        return False
    return hmac.compare_digest(
        candidate.encode("utf-8", "surrogatepass"),
        expected.encode("utf-8", "surrogatepass"),
    )


def _login_redirect(
    admin_user,
    status_code: int = status.HTTP_307_TEMPORARY_REDIRECT,
) -> Optional[RedirectResponse]:
    """Return the response to send instead of a protected page, if any.

    Args:
        admin_user: Value produced by ``get_current_admin`` or
            ``verify_admin_auth``.
        status_code: Redirect status used when ``admin_user`` is falsy.

    Returns:
        The dependency's own ``RedirectResponse`` when it produced one, a
        fresh redirect to the login page when ``admin_user`` is falsy, or
        ``None`` when the request is authenticated.
    """
    if isinstance(admin_user, RedirectResponse):
        return admin_user
    if not admin_user:
        return RedirectResponse(url="/admin/login", status_code=status_code)
    return None


async def get_current_admin(request: Request):
    """
    Dependency to check if the admin is logged in.

    Returns a dummy admin dict when the session cookie matches, a
    ``RedirectResponse`` to the login page when it does not and the request
    targets a protected path, and ``None`` on the login/auth paths so their
    handlers can proceed. Route handlers must check for the redirect case.
    """
    session_cookie = request.cookies.get(ADMIN_SESSION_COOKIE_NAME)
    if _constant_time_equals(session_cookie, ADMIN_SESSION_SECRET_VALUE):
        return {"email": ADMIN_EMAIL}  # Return a dummy admin object

    # Protected page without a session: send the user to the login page.
    if request.url.path not in _PUBLIC_ADMIN_PATHS:
        return RedirectResponse(
            url="/admin/login",
            status_code=status.HTTP_307_TEMPORARY_REDIRECT,
        )

    # On the login/auth page itself: let the route handler proceed.
    return None


async def verify_admin_auth(request: Request):
    """
    Stricter dependency for protected routes.

    Raises an ``HTTPException`` carrying a redirect to the login page when
    the request is not authenticated, instead of returning the redirect.

    The redirect uses 303 (See Other) rather than 307: this dependency guards
    a POST route, and a 307 would make the client repeat the POST against
    ``/admin/login``, which only accepts GET.

    Returns:
        The admin dict, or ``None`` on the public login/auth paths.
    """
    admin = await get_current_admin(request)
    if isinstance(admin, RedirectResponse):
        raise HTTPException(
            status_code=status.HTTP_303_SEE_OTHER,
            detail="Not authenticated, redirecting.",
            headers={"Location": "/admin/login"},
        )
    return admin


# --- Helper Functions for Admin Panel ---
def get_file_info(file_path: Path):
    """Describe a regular file for the admin file listing.

    Returns:
        A dict with ``name``, ``size_bytes``, ``size_human``, ``modified_at``
        and ``url`` keys, or ``None`` when the path is not a regular file or
        its metadata cannot be read.
    """
    try:
        if not file_path.is_file():
            return None
        stat_info = file_path.stat()
        modified = dt.datetime.fromtimestamp(stat_info.st_mtime)
    except (OSError, OverflowError, ValueError):
        # File vanished, is unreadable, or has an out-of-range timestamp.
        return None
    return {
        "name": file_path.name,
        "size_bytes": stat_info.st_size,
        "size_human": format_bytes(stat_info.st_size),
        "modified_at": modified.strftime('%Y-%m-%d %H:%M:%S'),
        "url": f"/download/{file_path.name}",  # User-facing download URL
    }


def format_bytes(bytes_val, decimals=2):
    """Format a byte count as a human-readable string such as ``'1.50 KB'``.

    Args:
        bytes_val: Size in bytes.
        decimals: Digits after the decimal point; negative values count as 0.

    Returns:
        ``'0 Bytes'`` for zero, otherwise the value scaled by powers of 1024
        with its unit. Values of 1024 TB or more are reported in TB.
    """
    if bytes_val == 0:
        return '0 Bytes'
    k = 1024
    dm = max(decimals, 0)
    sizes = ['Bytes', 'KB', 'MB', 'GB', 'TB']
    # Pick the unit by repeated division instead of floating-point logs:
    # exact at power-of-1024 boundaries and never indexes outside `sizes`.
    i = 0
    scaled = bytes_val
    while scaled >= k and i < len(sizes) - 1:
        scaled /= k
        i += 1
    return f"{bytes_val / (k**i):.{dm}f} {sizes[i]}"


def get_log_lines(num_lines: int = 200) -> List[str]:
    """Return up to ``num_lines`` trailing lines of the log, newest first.

    Returns a single explanatory message when the log file does not exist or
    cannot be read, and an empty list when ``num_lines`` is not positive.
    """
    if not LOG_FILE.exists():
        return ["Log file not found or is empty."]
    if num_lines <= 0:
        return []
    try:
        with open(LOG_FILE, "r", encoding="utf-8", errors="replace") as f:
            # A bounded deque keeps only the tail while streaming the file.
            tail = deque(f, maxlen=num_lines)
    except OSError as e:
        return [f"Error reading log file: {str(e)}"]
    # Display latest lines at the top for easier reading
    tail.reverse()
    return list(tail)


# --- Admin Routes ---
@admin_router.get("/login", response_class=HTMLResponse)
async def admin_login_page(request: Request):
    """Render the admin login form."""
    return templates.TemplateResponse("admin_login.html", {"request": request})


@admin_router.post("/auth")
async def admin_authenticate(request: Request, email: str = Form(...), password: str = Form(...)):
    """Check submitted credentials and start an admin session on success.

    On success, redirects to the dashboard and sets the session cookie.
    On failure, re-renders the login form with an error and status 401.
    """
    # Evaluate both comparisons so timing does not reveal which field failed.
    email_ok = _constant_time_equals(email, ADMIN_EMAIL)
    password_ok = _constant_time_equals(password, ADMIN_PASSWORD)
    if not (email_ok and password_ok):
        error_msg = "Invalid email or password."
        return templates.TemplateResponse(
            "admin_login.html",
            {"request": request, "error": error_msg},
            status_code=status.HTTP_401_UNAUTHORIZED,
        )

    response = RedirectResponse(url="/admin/dashboard", status_code=status.HTTP_303_SEE_OTHER)
    response.set_cookie(
        key=ADMIN_SESSION_COOKIE_NAME,
        value=ADMIN_SESSION_SECRET_VALUE,
        httponly=True,  # Makes it inaccessible to JavaScript
        samesite="lax",  # Protects against CSRF to some extent
        # secure=True,  # Uncomment if served over HTTPS
        max_age=1800,  # 30 minutes
    )
    return response


@admin_router.get("/logout")
async def admin_logout(request: Request):
    """Clear the session cookie and redirect to the login page."""
    response = RedirectResponse(url="/admin/login", status_code=status.HTTP_303_SEE_OTHER)
    response.delete_cookie(ADMIN_SESSION_COOKIE_NAME, httponly=True, samesite="lax")  # , secure=True)
    return response


@admin_router.get("/dashboard", response_class=HTMLResponse)
async def admin_dashboard(request: Request, admin_user: dict = Depends(get_current_admin)):
    """Render the dashboard with the uploaded-file count and log line count."""
    redirect = _login_redirect(admin_user)
    if redirect is not None:
        return redirect

    num_files = sum(1 for entry in FILES_DIR.iterdir() if entry.is_file())

    log_lines_count = 0
    if LOG_FILE.exists():
        # errors="replace" keeps a stray non-UTF-8 byte from failing the page.
        with open(LOG_FILE, "r", encoding="utf-8", errors="replace") as f:
            log_lines_count = sum(1 for _ in f)

    return templates.TemplateResponse("admin_dashboard.html", {
        "request": request,
        "num_files": num_files,
        "log_lines_count": log_lines_count,
    })


@admin_router.get("/files", response_class=HTMLResponse)
async def admin_files_page(request: Request, admin_user: dict = Depends(get_current_admin)):
    """Render the list of uploaded files, sorted by name."""
    redirect = _login_redirect(admin_user)
    if redirect is not None:
        return redirect

    uploaded_file_objects = []
    try:
        for item_name in sorted(os.listdir(FILES_DIR)):
            info = get_file_info(FILES_DIR / item_name)
            if info:
                uploaded_file_objects.append(info)
    except OSError:
        # Best effort: still render the page with whatever was collected.
        logger.exception("Error listing files in %s", FILES_DIR)

    return templates.TemplateResponse("admin_files.html", {
        "request": request,
        "files": uploaded_file_objects,
    })


@admin_router.post("/files/delete/{filename}")
async def admin_delete_file(request: Request, filename: str, admin_user: dict = Depends(verify_admin_auth)):
    """Delete one uploaded file and redirect back to the file list.

    Raises:
        HTTPException: 400 if the resolved path falls outside ``FILES_DIR``,
            404 if no such regular file exists, 500 if deletion fails.
    """
    # verify_admin_auth raises for unauthenticated requests; this is a fallback.
    # 303 because this is a POST route and the login page only accepts GET.
    redirect = _login_redirect(admin_user, status_code=status.HTTP_303_SEE_OTHER)
    if redirect is not None:
        return redirect

    # Sanitize filename again, though it comes from our listed files
    safe_filename = os.path.basename(filename)
    file_path = FILES_DIR / safe_filename

    # Extra check to ensure we are deleting within FILES_DIR
    if not file_path.resolve().is_relative_to(FILES_DIR.resolve()):
        raise HTTPException(status_code=400, detail="Invalid file path for deletion.")

    if not file_path.is_file():
        raise HTTPException(status_code=404, detail="File not found for deletion.")

    try:
        file_path.unlink()
    except FileNotFoundError as e:
        # The file disappeared between the check above and the unlink.
        raise HTTPException(status_code=404, detail="File not found for deletion.") from e
    except OSError as e:
        logger.exception("Could not delete file %s", file_path)
        raise HTTPException(status_code=500, detail=f"Could not delete file: {str(e)}") from e

    # Optionally, add a success message to be displayed (e.g., via query params
    # or session flash if implemented). For now, just redirect.
    return RedirectResponse(url="/admin/files", status_code=status.HTTP_303_SEE_OTHER)


@admin_router.get("/logs", response_class=HTMLResponse)
async def admin_logs_page(request: Request, admin_user: dict = Depends(get_current_admin)):
    """Render the most recent log lines, newest first."""
    redirect = _login_redirect(admin_user)
    if redirect is not None:
        return redirect

    logs = get_log_lines(num_lines=500)  # Show last 500 lines
    return templates.TemplateResponse("admin_logs.html", {
        "request": request,
        "log_entries": logs,
    })