# uploadkro2 / admin_logic.py
# triflix's picture
# Upload 10 files
# ad0e3b3 verified
import os
import shutil
import datetime as dt
from pathlib import Path
from typing import List, Optional
from fastapi import APIRouter, Request, Depends, Form, HTTPException, status
from fastapi.responses import HTMLResponse, RedirectResponse, Response
from fastapi.templating import Jinja2Templates
import aiofiles # For async file operations if needed in admin, though mostly sync for listing/deleting
# --- Admin Configuration ---
# Every path below is anchored at the directory that contains this module.
BASE_DIR = Path(__file__).resolve().parent
TEMPLATES_DIR = BASE_DIR / "templates"
FILES_DIR = BASE_DIR / "uploaded_files"
LOG_FILE = BASE_DIR / "app.log"

# Ensure directories exist (though app.py also does this)
FILES_DIR.mkdir(parents=True, exist_ok=True)

templates = Jinja2Templates(directory=str(TEMPLATES_DIR))

# Credentials and the session secret are read from the environment when set
# (ADMIN_EMAIL, ADMIN_PASSWORD, ADMIN_SESSION_SECRET). The fallbacks are the
# original demo values and are INSECURE: override them in any real deployment.
# `or` (rather than a .get() default) also rejects variables set to "".
ADMIN_EMAIL = os.environ.get("ADMIN_EMAIL") or "admin@example.com"
# NOTE: still a plaintext password; store a hash in a real application.
ADMIN_PASSWORD = os.environ.get("ADMIN_PASSWORD") or "adminpassword"
ADMIN_SESSION_COOKIE_NAME = "admin_session_id"
# This should be a very random, long, and secret string in a real app.
ADMIN_SESSION_SECRET_VALUE = (
    os.environ.get("ADMIN_SESSION_SECRET")
    or "my_super_secret_admin_session_value_for_demo"
)

# All admin routes are mounted under the /admin prefix.
admin_router = APIRouter(
    prefix="/admin",
    tags=["Admin"],
)
# --- Admin Authentication Dependencies ---
async def get_current_admin(request: Request):
    """
    Dependency to check if the admin is logged in.

    Returns:
        - ``{"email": ADMIN_EMAIL}`` when the session cookie matches the
          configured secret value;
        - a ``RedirectResponse`` to ``/admin/login`` when it does not and the
          requested path is neither ``/admin/login`` nor ``/admin/auth``;
        - ``None`` when it does not and the request is for one of those two
          paths, so the route handler can proceed.

    Note that the redirect is *returned*, not raised: route handlers using
    this dependency must check for a ``RedirectResponse`` themselves.
    """
    import hmac  # local import: constant-time comparison of the secret

    session_cookie = request.cookies.get(ADMIN_SESSION_COOKIE_NAME)
    # compare_digest avoids leaking how many leading characters matched via
    # response timing; bytes are used because the str form only accepts ASCII.
    if session_cookie is not None and hmac.compare_digest(
        session_cookie.encode("utf-8"),
        ADMIN_SESSION_SECRET_VALUE.encode("utf-8"),
    ):
        return {"email": ADMIN_EMAIL}  # Return a dummy admin object

    # Unauthenticated access to a protected page: send the client to login.
    if request.url.path not in ("/admin/login", "/admin/auth"):
        return RedirectResponse(
            url="/admin/login",
            status_code=status.HTTP_307_TEMPORARY_REDIRECT,
        )

    # If on login/auth page, don't raise/redirect, let the route handler proceed
    return None
async def verify_admin_auth(request: Request):
    """
    Stricter dependency for protected routes: raises instead of returning a redirect.

    Delegates to ``get_current_admin`` and converts its outcome:

    - a returned ``RedirectResponse`` becomes an ``HTTPException`` carrying a
      ``Location: /admin/login`` header;
    - a falsy result on any path other than ``/admin/login`` / ``/admin/auth``
      becomes a 401 ``HTTPException``;
    - otherwise the value from ``get_current_admin`` is returned unchanged.

    Raises:
        HTTPException: 303 (redirect to login) or 401, as described above.
    """
    admin = await get_current_admin(request)

    # Check the redirect case first: a response object is truthy, so the
    # "not admin" test below never fires for it.
    if isinstance(admin, RedirectResponse):
        # 303 rather than 307: a 307 makes the client replay the original
        # method (e.g. POST) against /admin/login, which only serves GET.
        raise HTTPException(
            status_code=status.HTTP_303_SEE_OTHER,
            detail="Not authenticated, redirecting.",
            headers={"Location": "/admin/login"},
        )

    if not admin and request.url.path not in ("/admin/login", "/admin/auth"):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Not authenticated",
            headers={"WWW-Authenticate": "Bearer"},  # Though we are using cookies
        )

    return admin
# --- Helper Functions for Admin Panel ---
def get_file_info(file_path: Path):
    """
    Build a metadata dict for a regular file.

    Returns a dict with the keys ``name``, ``size_bytes``, ``size_human``,
    ``modified_at`` (local time, ``YYYY-MM-DD HH:MM:SS``) and ``url``.
    Returns ``None`` when ``file_path`` is not a regular file or when any
    error occurs while collecting the metadata.
    """
    try:
        if not file_path.is_file():
            return None

        stats = file_path.stat()
        size = stats.st_size
        modified = dt.datetime.fromtimestamp(stats.st_mtime)
        filename = file_path.name
        return {
            "name": filename,
            "size_bytes": size,
            "size_human": format_bytes(size),
            "modified_at": modified.strftime('%Y-%m-%d %H:%M:%S'),
            "url": f"/download/{filename}",  # User-facing download URL
        }
    except Exception:
        # Best effort: any failure just means "no info for this entry".
        return None
def format_bytes(bytes_val, decimals=2):
    """
    Format a byte count as a human-readable string using 1024-based units.

    Args:
        bytes_val: Number of bytes. Zero yields ``'0 Bytes'``; negative
            values are not scaled and are reported in ``Bytes``.
        decimals: Digits after the decimal point; negative values are
            treated as 0.

    Returns:
        A string such as ``'1.50 KB'``. Values of 1024 TB or more stay in
        ``TB`` because that is the largest unit in the table.
    """
    if bytes_val == 0:
        return '0 Bytes'

    k = 1024
    dm = decimals if decimals >= 0 else 0
    sizes = ['Bytes', 'KB', 'MB', 'GB', 'TB']

    # Find the unit by repeated division instead of a float logarithm: this
    # avoids rounding surprises at exact powers of 1024 and lets the index be
    # clamped to the last available unit.
    i = 0
    remaining = bytes_val
    while remaining >= k and i < len(sizes) - 1:
        remaining /= k
        i += 1

    return f"{bytes_val / (k ** i):.{dm}f} {sizes[i]}"
def get_log_lines(num_lines: int = 200) -> List[str]:
    """
    Return the last ``num_lines`` lines of the log file, newest first.

    Args:
        num_lines: Maximum number of trailing lines to return. Zero or a
            negative value yields an empty list.

    Returns:
        The trailing lines in reverse file order (latest line first), or a
        single-element list holding a message when the log file is missing
        or cannot be read.
    """
    from collections import deque  # local import: bounded tail buffer

    try:
        with open(LOG_FILE, "r", encoding="utf-8") as f:
            # A bounded deque keeps only the tail while streaming the file, so
            # the whole log is never held in memory. It also avoids the
            # lines[-0:] slice, which would return every line for num_lines=0.
            tail = deque(f, maxlen=max(num_lines, 0))
    except FileNotFoundError:
        return ["Log file not found or is empty."]
    except Exception as e:
        return [f"Error reading log file: {str(e)}"]

    # Display latest lines at the top for easier reading
    tail.reverse()
    return list(tail)
# --- Admin Routes ---
@admin_router.get("/login", response_class=HTMLResponse)
async def admin_login_page(request: Request):
    """Render the ``admin_login.html`` template for the login form."""
    context = {"request": request}
    return templates.TemplateResponse("admin_login.html", context)
@admin_router.post("/auth")
async def admin_authenticate(request: Request, email: str = Form(...), password: str = Form(...)):
    """
    Validate the submitted login form and start an admin session.

    On matching credentials, responds with a 303 redirect to
    ``/admin/dashboard`` and sets the session cookie (HTTP-only,
    ``SameSite=Lax``, 30-minute lifetime). Otherwise re-renders
    ``admin_login.html`` with an error message and a 401 status.
    """
    import hmac  # local import: constant-time credential comparison

    # Evaluate both comparisons unconditionally (no short-circuit) and in
    # constant time, so response timing does not reveal which field was wrong
    # or how much of it matched. Bytes are used because the str form of
    # compare_digest only accepts ASCII.
    email_ok = hmac.compare_digest(email.encode("utf-8"), ADMIN_EMAIL.encode("utf-8"))
    password_ok = hmac.compare_digest(password.encode("utf-8"), ADMIN_PASSWORD.encode("utf-8"))

    if not (email_ok and password_ok):
        return templates.TemplateResponse(
            "admin_login.html",
            {"request": request, "error": "Invalid email or password."},
            status_code=status.HTTP_401_UNAUTHORIZED,
        )

    response = RedirectResponse(url="/admin/dashboard", status_code=status.HTTP_303_SEE_OTHER)
    response.set_cookie(
        key=ADMIN_SESSION_COOKIE_NAME,
        value=ADMIN_SESSION_SECRET_VALUE,
        httponly=True,  # Makes it inaccessible to JavaScript
        samesite="lax",  # Protects against CSRF to some extent
        # secure=True,  # Uncomment if served over HTTPS
        max_age=1800,  # 30 minutes
    )
    return response
@admin_router.get("/logout")
async def admin_logout(request: Request):
    """Clear the admin session cookie and redirect (303) to the login page."""
    redirect = RedirectResponse(
        url="/admin/login",
        status_code=status.HTTP_303_SEE_OTHER,
    )
    # Cookie attributes mirror the ones used when the cookie was set.
    redirect.delete_cookie(
        ADMIN_SESSION_COOKIE_NAME,
        httponly=True,
        samesite="lax",
    )  # , secure=True)
    return redirect
@admin_router.get("/dashboard", response_class=HTMLResponse)
async def admin_dashboard(request: Request, admin_user: dict = Depends(get_current_admin)):
    """
    Render the dashboard with the uploaded-file count and the log line count.

    Unauthenticated requests are redirected to the login page.
    """
    # The dependency returns (rather than raises) its redirect; pass it on.
    if isinstance(admin_user, RedirectResponse):
        return admin_user
    # Should be caught by the dependency, but kept as a fallback.
    if not admin_user:
        return RedirectResponse(url="/admin/login", status_code=status.HTTP_307_TEMPORARY_REDIRECT)

    # Count regular files only; sub-directories are ignored.
    file_count = sum(
        1 for entry in os.listdir(FILES_DIR) if (FILES_DIR / entry).is_file()
    )

    line_total = 0
    if LOG_FILE.exists():
        with open(LOG_FILE, "r", encoding="utf-8") as log_handle:
            for _ in log_handle:
                line_total += 1

    context = {
        "request": request,
        "num_files": file_count,
        "log_lines_count": line_total,
    }
    return templates.TemplateResponse("admin_dashboard.html", context)
@admin_router.get("/files", response_class=HTMLResponse)
async def admin_files_page(request: Request, admin_user: dict = Depends(get_current_admin)):
    """
    Render the list of uploaded files, sorted by name.

    Unauthenticated requests are redirected to the login page. Errors while
    listing are printed and the page is rendered with whatever was collected.
    """
    if isinstance(admin_user, RedirectResponse):
        return admin_user
    if not admin_user:
        return RedirectResponse(url="/admin/login", status_code=status.HTTP_307_TEMPORARY_REDIRECT)

    file_rows = []
    try:
        candidate_paths = (FILES_DIR / entry for entry in sorted(os.listdir(FILES_DIR)))
        for details in map(get_file_info, candidate_paths):
            # get_file_info yields None for anything that is not a regular file.
            if details:
                file_rows.append(details)
    except Exception as e:
        # Log this error, maybe show it on the page
        print(f"Error listing files: {e}")  # Replace with proper logging

    context = {
        "request": request,
        "files": file_rows,
    }
    return templates.TemplateResponse("admin_files.html", context)
@admin_router.post("/files/delete/{filename}")
async def admin_delete_file(request: Request, filename: str, admin_user: dict = Depends(verify_admin_auth)):
    """
    Delete one uploaded file and redirect (303) back to the file list.

    Raises:
        HTTPException: 400 if the resolved path falls outside ``FILES_DIR``,
            404 if the target is not an existing regular file, 500 if the
            deletion itself fails.
    """
    # Should not happen with verify_admin_auth, kept as a safety net.
    if isinstance(admin_user, RedirectResponse):
        return admin_user
    if not admin_user:
        return RedirectResponse(url="/admin/login", status_code=status.HTTP_307_TEMPORARY_REDIRECT)

    # Strip any directory components, though the name comes from our own listing.
    target = FILES_DIR / os.path.basename(filename)

    # Extra check to ensure we are deleting within FILES_DIR
    resolved_target = target.resolve()
    uploads_root = FILES_DIR.resolve()
    if not resolved_target.is_relative_to(uploads_root):
        raise HTTPException(status_code=400, detail="Invalid file path for deletion.")

    if not target.is_file():
        raise HTTPException(status_code=404, detail="File not found for deletion.")

    try:
        target.unlink()
    except Exception as e:
        # Log this error
        raise HTTPException(status_code=500, detail=f"Could not delete file: {str(e)}")

    # No flash-message mechanism here; just go back to the listing.
    return RedirectResponse(url="/admin/files", status_code=status.HTTP_303_SEE_OTHER)
@admin_router.get("/logs", response_class=HTMLResponse)
async def admin_logs_page(request: Request, admin_user: dict = Depends(get_current_admin)):
    """
    Render the log viewer with up to the last 500 log lines.

    Unauthenticated requests are redirected to the login page.
    """
    if isinstance(admin_user, RedirectResponse):
        return admin_user
    if not admin_user:
        return RedirectResponse(url="/admin/login", status_code=status.HTTP_307_TEMPORARY_REDIRECT)

    recent_entries = get_log_lines(num_lines=500)  # Show last 500 lines
    context = {
        "request": request,
        "log_entries": recent_entries,
    }
    return templates.TemplateResponse("admin_logs.html", context)