uploadkro2 / app.py
triflix's picture
Update app.py
0053eb2 verified
import os
import shutil
import aiofiles
from fastapi import (
FastAPI, UploadFile, File, HTTPException, Request, Depends, Form, APIRouter, status
)
import datetime
from fastapi.responses import HTMLResponse, FileResponse, JSONResponse, RedirectResponse, Response
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pathlib import Path
import logging # Standard logging
import datetime # For admin logic timestamps
import math # For admin logic format_bytes
from typing import List, Optional # For admin logic type hinting
# --- Configuration ---
BASE_DIR = Path(__file__).resolve().parent
FILES_DIR = BASE_DIR / "uploaded_files"
STATIC_DIR = BASE_DIR / "static"
TEMPLATES_DIR = BASE_DIR / "templates"
# LOG_FILE constant removed
# Create directories if they don't exist
FILES_DIR.mkdir(parents=True, exist_ok=True)
STATIC_DIR.mkdir(parents=True, exist_ok=True)
TEMPLATES_DIR.mkdir(parents=True, exist_ok=True)
# --- Logging Configuration (Console Only) ---
logger = logging.getLogger("file_app")
logger.setLevel(logging.INFO)
# No file handler, no propagate=False. Logs will go to console via root logger.
logger.info("Application starting up... Logging to console.")
app = FastAPI(title="File Uploader/Downloader")
# Setup Jinja2 templates
templates = Jinja2Templates(directory=str(TEMPLATES_DIR))
# --- Admin Panel Configuration & Logic ---
ADMIN_EMAIL = "admin@example.com"
ADMIN_PASSWORD = "adminpassword"
ADMIN_SESSION_COOKIE_NAME = "admin_session_id"
ADMIN_SESSION_SECRET_VALUE = "my_super_secret_admin_session_value_for_demo_only_change_me"
admin_router = APIRouter(
prefix="/admin",
tags=["Admin"],
)
# --- Admin Helper Functions ---
def get_file_info(file_path: Path):
try:
if file_path.is_file():
stat_info = file_path.stat()
return {
"name": file_path.name,
"size_bytes": stat_info.st_size,
"size_human": format_bytes(stat_info.st_size),
"modified_at": datetime.datetime.fromtimestamp(stat_info.st_mtime).strftime('%Y-%m-%d %H:%M:%S UTC'),
"url": f"/download/{file_path.name}"
}
except Exception as e:
logger.error(f"Admin: Error getting file info for {file_path}: {e}")
return None
return None
def format_bytes(bytes_val, decimals=2):
if bytes_val == 0: return '0 Bytes'
k = 1024
dm = decimals if decimals >= 0 else 0
sizes = ['Bytes', 'KB', 'MB', 'GB', 'TB', 'PB']
i = 0
if bytes_val > 0:
i = int(math.floor(math.log(bytes_val) / math.log(k)))
if i >= len(sizes): i = len(sizes) -1
return f"{bytes_val / (k**i):.{dm}f} {sizes[i]}"
# get_log_lines function removed as file logging is removed.
# --- Admin Authentication Dependencies ---
async def get_current_admin(request: Request):
session_cookie = request.cookies.get(ADMIN_SESSION_COOKIE_NAME)
if session_cookie == ADMIN_SESSION_SECRET_VALUE:
return {"email": ADMIN_EMAIL}
current_path = request.url.path
if current_path not in [app.url_path_for("admin_login_page"), app.url_path_for("admin_authenticate")]:
logger.info(f"Admin: Unauthorized access attempt to {current_path}, redirecting to login.")
return RedirectResponse(url=app.url_path_for("admin_login_page"), status_code=status.HTTP_307_TEMPORARY_REDIRECT)
return None
async def verify_admin_auth(request: Request):
admin_user = await get_current_admin(request)
if isinstance(admin_user, RedirectResponse):
raise HTTPException(
status_code=status.HTTP_307_TEMPORARY_REDIRECT,
detail="Redirecting to login.",
headers={"Location": app.url_path_for("admin_login_page")},
)
if not admin_user:
logger.warning(f"Admin: Strict auth verification failed for {request.url.path}.")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
return admin_user
# --- Admin Routes ---
@admin_router.get("/login", response_class=HTMLResponse, name="admin_login_page")
async def admin_login_page(request: Request):
logger.info("Admin: Login page accessed.")
current_year = datetime.datetime.now(datetime.timezone.utc).year # Get current UTC year
return templates.TemplateResponse(
"admin_login.html",
{
"request": request,
"url_for": app.url_path_for,
"current_year": current_year # Add current year to context
}
)
@admin_router.post("/auth", name="admin_authenticate")
async def admin_authenticate(request: Request, email: str = Form(...), password: str = Form(...)):
if email == ADMIN_EMAIL and password == ADMIN_PASSWORD:
logger.info(f"Admin: Successful login for email {email}.")
response = RedirectResponse(url=app.url_path_for("admin_dashboard"), status_code=status.HTTP_303_SEE_OTHER)
response.set_cookie(
key=ADMIN_SESSION_COOKIE_NAME, value=ADMIN_SESSION_SECRET_VALUE,
httponly=True, samesite="lax", max_age=30 * 60
)
return response
else:
logger.warning(f"Admin: Failed login attempt for email {email}.")
error_msg = "Invalid email or password."
return templates.TemplateResponse(
"admin_login.html",
{"request": request, "error": error_msg, "url_for": app.url_path_for},
status_code=status.HTTP_401_UNAUTHORIZED
)
@admin_router.get("/logout", name="admin_logout")
async def admin_logout(request: Request):
logger.info("Admin: Logout initiated.")
response = RedirectResponse(url=app.url_path_for("admin_login_page"), status_code=status.HTTP_303_SEE_OTHER)
response.delete_cookie(ADMIN_SESSION_COOKIE_NAME, httponly=True, samesite="lax")
return response
@admin_router.get("/dashboard", response_class=HTMLResponse, name="admin_dashboard")
async def admin_dashboard(request: Request, admin_user: dict = Depends(get_current_admin)):
if isinstance(admin_user, RedirectResponse): return admin_user
if not admin_user: return RedirectResponse(url=app.url_path_for("admin_login_page"), status_code=status.HTTP_307_TEMPORARY_REDIRECT)
logger.info(f"Admin: Dashboard accessed by {admin_user.get('email')}.")
num_files = 0
try:
num_files = len([name for name in os.listdir(FILES_DIR) if (FILES_DIR / name).is_file()])
except Exception as e:
logger.error(f"Admin: Error counting files for dashboard: {e}")
# log_lines_count removed
return templates.TemplateResponse("admin_dashboard.html", {
"request": request,
"num_files": num_files,
"admin_user": admin_user,
"url_for": app.url_path_for
})
@admin_router.get("/files", response_class=HTMLResponse, name="admin_files_page")
async def admin_files_page(request: Request, admin_user: dict = Depends(get_current_admin)):
if isinstance(admin_user, RedirectResponse): return admin_user
if not admin_user: return RedirectResponse(url=app.url_path_for("admin_login_page"), status_code=status.HTTP_307_TEMPORARY_REDIRECT)
logger.info(f"Admin: Files page accessed by {admin_user.get('email')}.")
uploaded_file_objects = []
try:
for item_name in sorted(os.listdir(FILES_DIR)):
item_path = FILES_DIR / item_name
info = get_file_info(item_path)
if info:
uploaded_file_objects.append(info)
except Exception as e:
logger.error(f"Admin: Error listing files: {e}")
return templates.TemplateResponse("admin_files.html", {
"request": request,
"files": uploaded_file_objects,
"admin_user": admin_user,
"url_for": app.url_path_for
})
@admin_router.post("/files/delete/{filename}", name="admin_delete_file")
async def admin_delete_file(request: Request, filename: str, admin_user: dict = Depends(verify_admin_auth)):
logger.info(f"Admin: Attempting to delete file '{filename}' by {admin_user.get('email')}.")
safe_filename = sanitize_filename(filename)
file_path = FILES_DIR / safe_filename
if not file_path.resolve().is_relative_to(FILES_DIR.resolve()):
logger.error(f"Admin: Invalid file path for deletion (traversal attempt) '{filename}'.")
raise HTTPException(status_code=400, detail="Invalid file path for deletion.")
if file_path.is_file():
try:
file_path.unlink()
logger.info(f"Admin: File '{filename}' deleted successfully by {admin_user.get('email')}.")
return RedirectResponse(url=app.url_path_for("admin_files_page") + "?message=File+deleted+successfully", status_code=status.HTTP_303_SEE_OTHER)
except Exception as e:
logger.error(f"Admin: Could not delete file '{filename}': {str(e)}")
raise HTTPException(status_code=500, detail=f"Could not delete file: {str(e)}")
else:
logger.warning(f"Admin: File not found for deletion '{filename}'.")
raise HTTPException(status_code=404, detail="File not found for deletion.")
# /admin/logs route and admin_logs_page function removed
# --- Main Application Helper Functions (Original) ---
def sanitize_filename(filename: str) -> str:
return os.path.basename(filename).strip()
# --- Main Application Routes (Original functionality with added logging) ---
@app.get("/", response_class=HTMLResponse, name="read_root")
async def read_root(request: Request):
logger.info(f"Root page accessed by {request.client.host}")
return templates.TemplateResponse("index.html", {"request": request, "url_for": app.url_path_for})
@app.post("/upload/", name="upload_file")
async def upload_file(request: Request, file: UploadFile = File(...)):
original_filename = file.filename
logger.info(f"Upload attempt for '{original_filename}' from {request.client.host}")
if not original_filename:
logger.warning(f"Upload failed: No filename provided by {request.client.host}")
raise HTTPException(status_code=400, detail="No filename provided.")
filename = sanitize_filename(original_filename)
if not filename:
logger.warning(f"Upload failed: Invalid filename '{original_filename}' after sanitization by {request.client.host}")
raise HTTPException(status_code=400, detail="Invalid filename after sanitization.")
file_path = FILES_DIR / filename
if not file_path.resolve().is_relative_to(FILES_DIR.resolve()):
logger.error(f"Upload security violation: Attempted directory traversal for '{filename}' by {request.client.host}")
raise HTTPException(status_code=400, detail="Invalid file path (attempted directory traversal).")
try:
async with aiofiles.open(file_path, "wb") as buffer:
while content := await file.read(1024 * 1024):
await buffer.write(content)
file_size = file_path.stat().st_size
logger.info(f"File '{filename}' uploaded successfully by {request.client.host}. Size: {file_size} bytes.")
except Exception as e:
logger.error(f"Could not save file '{filename}' from {request.client.host}: {str(e)}")
if file_path.exists():
try:
file_path.unlink()
logger.info(f"Cleaned up partially uploaded file '{filename}'.")
except Exception as unlink_e:
logger.error(f"Error cleaning up partially uploaded file '{filename}': {unlink_e}")
raise HTTPException(status_code=500, detail=f"Could not save file: {str(e)}")
return JSONResponse(
content={
"message": "File uploaded successfully",
"filename": filename,
"download_url": app.url_path_for("download_file", filename=filename)
}
)
@app.get("/download/{filename}", name="download_file")
async def download_file(request: Request, filename: str):
clean_filename = sanitize_filename(filename)
logger.info(f"Download attempt for '{clean_filename}' by {request.client.host}")
if not clean_filename:
logger.warning(f"Download failed: Invalid filename '{filename}' by {request.client.host}")
raise HTTPException(status_code=400, detail="Invalid filename.")
file_path = FILES_DIR / clean_filename
if not file_path.resolve().is_relative_to(FILES_DIR.resolve()) or not file_path.is_file():
logger.warning(f"Download failed: File not found or access denied for '{clean_filename}' by {request.client.host}")
raise HTTPException(status_code=404, detail="File not found or access denied.")
logger.info(f"File '{clean_filename}' downloaded by {request.client.host}")
return FileResponse(path=file_path, filename=clean_filename, media_type='application/octet-stream')
app.include_router(admin_router)
logger.info("Admin panel router included. File-based logging is disabled.")
if __name__ == "__main__":
import uvicorn
logger.info("Starting Uvicorn server for local development...")
uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=True)