Spaces:
Sleeping
Sleeping
| import os | |
| import shutil | |
| import aiofiles | |
| from fastapi import ( | |
| FastAPI, UploadFile, File, HTTPException, Request, Depends, Form, APIRouter, status | |
| ) | |
| import datetime | |
| from fastapi.responses import HTMLResponse, FileResponse, JSONResponse, RedirectResponse, Response | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.templating import Jinja2Templates | |
| from pathlib import Path | |
| import logging # Standard logging | |
| import datetime # For admin logic timestamps | |
| import math # For admin logic format_bytes | |
| from typing import List, Optional # For admin logic type hinting | |
| # --- Configuration --- | |
| BASE_DIR = Path(__file__).resolve().parent | |
| FILES_DIR = BASE_DIR / "uploaded_files" | |
| STATIC_DIR = BASE_DIR / "static" | |
| TEMPLATES_DIR = BASE_DIR / "templates" | |
| # LOG_FILE constant removed | |
| # Create directories if they don't exist | |
| FILES_DIR.mkdir(parents=True, exist_ok=True) | |
| STATIC_DIR.mkdir(parents=True, exist_ok=True) | |
| TEMPLATES_DIR.mkdir(parents=True, exist_ok=True) | |
| # --- Logging Configuration (Console Only) --- | |
| logger = logging.getLogger("file_app") | |
| logger.setLevel(logging.INFO) | |
| # No file handler, no propagate=False. Logs will go to console via root logger. | |
| logger.info("Application starting up... Logging to console.") | |
| app = FastAPI(title="File Uploader/Downloader") | |
| # Setup Jinja2 templates | |
| templates = Jinja2Templates(directory=str(TEMPLATES_DIR)) | |
| # --- Admin Panel Configuration & Logic --- | |
| ADMIN_EMAIL = "admin@example.com" | |
| ADMIN_PASSWORD = "adminpassword" | |
| ADMIN_SESSION_COOKIE_NAME = "admin_session_id" | |
| ADMIN_SESSION_SECRET_VALUE = "my_super_secret_admin_session_value_for_demo_only_change_me" | |
| admin_router = APIRouter( | |
| prefix="/admin", | |
| tags=["Admin"], | |
| ) | |
| # --- Admin Helper Functions --- | |
| def get_file_info(file_path: Path): | |
| try: | |
| if file_path.is_file(): | |
| stat_info = file_path.stat() | |
| return { | |
| "name": file_path.name, | |
| "size_bytes": stat_info.st_size, | |
| "size_human": format_bytes(stat_info.st_size), | |
| "modified_at": datetime.datetime.fromtimestamp(stat_info.st_mtime).strftime('%Y-%m-%d %H:%M:%S UTC'), | |
| "url": f"/download/{file_path.name}" | |
| } | |
| except Exception as e: | |
| logger.error(f"Admin: Error getting file info for {file_path}: {e}") | |
| return None | |
| return None | |
| def format_bytes(bytes_val, decimals=2): | |
| if bytes_val == 0: return '0 Bytes' | |
| k = 1024 | |
| dm = decimals if decimals >= 0 else 0 | |
| sizes = ['Bytes', 'KB', 'MB', 'GB', 'TB', 'PB'] | |
| i = 0 | |
| if bytes_val > 0: | |
| i = int(math.floor(math.log(bytes_val) / math.log(k))) | |
| if i >= len(sizes): i = len(sizes) -1 | |
| return f"{bytes_val / (k**i):.{dm}f} {sizes[i]}" | |
| # get_log_lines function removed as file logging is removed. | |
| # --- Admin Authentication Dependencies --- | |
| async def get_current_admin(request: Request): | |
| session_cookie = request.cookies.get(ADMIN_SESSION_COOKIE_NAME) | |
| if session_cookie == ADMIN_SESSION_SECRET_VALUE: | |
| return {"email": ADMIN_EMAIL} | |
| current_path = request.url.path | |
| if current_path not in [app.url_path_for("admin_login_page"), app.url_path_for("admin_authenticate")]: | |
| logger.info(f"Admin: Unauthorized access attempt to {current_path}, redirecting to login.") | |
| return RedirectResponse(url=app.url_path_for("admin_login_page"), status_code=status.HTTP_307_TEMPORARY_REDIRECT) | |
| return None | |
| async def verify_admin_auth(request: Request): | |
| admin_user = await get_current_admin(request) | |
| if isinstance(admin_user, RedirectResponse): | |
| raise HTTPException( | |
| status_code=status.HTTP_307_TEMPORARY_REDIRECT, | |
| detail="Redirecting to login.", | |
| headers={"Location": app.url_path_for("admin_login_page")}, | |
| ) | |
| if not admin_user: | |
| logger.warning(f"Admin: Strict auth verification failed for {request.url.path}.") | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Not authenticated", | |
| headers={"WWW-Authenticate": "Bearer"}, | |
| ) | |
| return admin_user | |
| # --- Admin Routes --- | |
| async def admin_login_page(request: Request): | |
| logger.info("Admin: Login page accessed.") | |
| current_year = datetime.datetime.now(datetime.timezone.utc).year # Get current UTC year | |
| return templates.TemplateResponse( | |
| "admin_login.html", | |
| { | |
| "request": request, | |
| "url_for": app.url_path_for, | |
| "current_year": current_year # Add current year to context | |
| } | |
| ) | |
| async def admin_authenticate(request: Request, email: str = Form(...), password: str = Form(...)): | |
| if email == ADMIN_EMAIL and password == ADMIN_PASSWORD: | |
| logger.info(f"Admin: Successful login for email {email}.") | |
| response = RedirectResponse(url=app.url_path_for("admin_dashboard"), status_code=status.HTTP_303_SEE_OTHER) | |
| response.set_cookie( | |
| key=ADMIN_SESSION_COOKIE_NAME, value=ADMIN_SESSION_SECRET_VALUE, | |
| httponly=True, samesite="lax", max_age=30 * 60 | |
| ) | |
| return response | |
| else: | |
| logger.warning(f"Admin: Failed login attempt for email {email}.") | |
| error_msg = "Invalid email or password." | |
| return templates.TemplateResponse( | |
| "admin_login.html", | |
| {"request": request, "error": error_msg, "url_for": app.url_path_for}, | |
| status_code=status.HTTP_401_UNAUTHORIZED | |
| ) | |
| async def admin_logout(request: Request): | |
| logger.info("Admin: Logout initiated.") | |
| response = RedirectResponse(url=app.url_path_for("admin_login_page"), status_code=status.HTTP_303_SEE_OTHER) | |
| response.delete_cookie(ADMIN_SESSION_COOKIE_NAME, httponly=True, samesite="lax") | |
| return response | |
| async def admin_dashboard(request: Request, admin_user: dict = Depends(get_current_admin)): | |
| if isinstance(admin_user, RedirectResponse): return admin_user | |
| if not admin_user: return RedirectResponse(url=app.url_path_for("admin_login_page"), status_code=status.HTTP_307_TEMPORARY_REDIRECT) | |
| logger.info(f"Admin: Dashboard accessed by {admin_user.get('email')}.") | |
| num_files = 0 | |
| try: | |
| num_files = len([name for name in os.listdir(FILES_DIR) if (FILES_DIR / name).is_file()]) | |
| except Exception as e: | |
| logger.error(f"Admin: Error counting files for dashboard: {e}") | |
| # log_lines_count removed | |
| return templates.TemplateResponse("admin_dashboard.html", { | |
| "request": request, | |
| "num_files": num_files, | |
| "admin_user": admin_user, | |
| "url_for": app.url_path_for | |
| }) | |
| async def admin_files_page(request: Request, admin_user: dict = Depends(get_current_admin)): | |
| if isinstance(admin_user, RedirectResponse): return admin_user | |
| if not admin_user: return RedirectResponse(url=app.url_path_for("admin_login_page"), status_code=status.HTTP_307_TEMPORARY_REDIRECT) | |
| logger.info(f"Admin: Files page accessed by {admin_user.get('email')}.") | |
| uploaded_file_objects = [] | |
| try: | |
| for item_name in sorted(os.listdir(FILES_DIR)): | |
| item_path = FILES_DIR / item_name | |
| info = get_file_info(item_path) | |
| if info: | |
| uploaded_file_objects.append(info) | |
| except Exception as e: | |
| logger.error(f"Admin: Error listing files: {e}") | |
| return templates.TemplateResponse("admin_files.html", { | |
| "request": request, | |
| "files": uploaded_file_objects, | |
| "admin_user": admin_user, | |
| "url_for": app.url_path_for | |
| }) | |
| async def admin_delete_file(request: Request, filename: str, admin_user: dict = Depends(verify_admin_auth)): | |
| logger.info(f"Admin: Attempting to delete file '{filename}' by {admin_user.get('email')}.") | |
| safe_filename = sanitize_filename(filename) | |
| file_path = FILES_DIR / safe_filename | |
| if not file_path.resolve().is_relative_to(FILES_DIR.resolve()): | |
| logger.error(f"Admin: Invalid file path for deletion (traversal attempt) '{filename}'.") | |
| raise HTTPException(status_code=400, detail="Invalid file path for deletion.") | |
| if file_path.is_file(): | |
| try: | |
| file_path.unlink() | |
| logger.info(f"Admin: File '{filename}' deleted successfully by {admin_user.get('email')}.") | |
| return RedirectResponse(url=app.url_path_for("admin_files_page") + "?message=File+deleted+successfully", status_code=status.HTTP_303_SEE_OTHER) | |
| except Exception as e: | |
| logger.error(f"Admin: Could not delete file '{filename}': {str(e)}") | |
| raise HTTPException(status_code=500, detail=f"Could not delete file: {str(e)}") | |
| else: | |
| logger.warning(f"Admin: File not found for deletion '{filename}'.") | |
| raise HTTPException(status_code=404, detail="File not found for deletion.") | |
| # /admin/logs route and admin_logs_page function removed | |
| # --- Main Application Helper Functions (Original) --- | |
| def sanitize_filename(filename: str) -> str: | |
| return os.path.basename(filename).strip() | |
| # --- Main Application Routes (Original functionality with added logging) --- | |
| async def read_root(request: Request): | |
| logger.info(f"Root page accessed by {request.client.host}") | |
| return templates.TemplateResponse("index.html", {"request": request, "url_for": app.url_path_for}) | |
| async def upload_file(request: Request, file: UploadFile = File(...)): | |
| original_filename = file.filename | |
| logger.info(f"Upload attempt for '{original_filename}' from {request.client.host}") | |
| if not original_filename: | |
| logger.warning(f"Upload failed: No filename provided by {request.client.host}") | |
| raise HTTPException(status_code=400, detail="No filename provided.") | |
| filename = sanitize_filename(original_filename) | |
| if not filename: | |
| logger.warning(f"Upload failed: Invalid filename '{original_filename}' after sanitization by {request.client.host}") | |
| raise HTTPException(status_code=400, detail="Invalid filename after sanitization.") | |
| file_path = FILES_DIR / filename | |
| if not file_path.resolve().is_relative_to(FILES_DIR.resolve()): | |
| logger.error(f"Upload security violation: Attempted directory traversal for '{filename}' by {request.client.host}") | |
| raise HTTPException(status_code=400, detail="Invalid file path (attempted directory traversal).") | |
| try: | |
| async with aiofiles.open(file_path, "wb") as buffer: | |
| while content := await file.read(1024 * 1024): | |
| await buffer.write(content) | |
| file_size = file_path.stat().st_size | |
| logger.info(f"File '{filename}' uploaded successfully by {request.client.host}. Size: {file_size} bytes.") | |
| except Exception as e: | |
| logger.error(f"Could not save file '{filename}' from {request.client.host}: {str(e)}") | |
| if file_path.exists(): | |
| try: | |
| file_path.unlink() | |
| logger.info(f"Cleaned up partially uploaded file '{filename}'.") | |
| except Exception as unlink_e: | |
| logger.error(f"Error cleaning up partially uploaded file '{filename}': {unlink_e}") | |
| raise HTTPException(status_code=500, detail=f"Could not save file: {str(e)}") | |
| return JSONResponse( | |
| content={ | |
| "message": "File uploaded successfully", | |
| "filename": filename, | |
| "download_url": app.url_path_for("download_file", filename=filename) | |
| } | |
| ) | |
| async def download_file(request: Request, filename: str): | |
| clean_filename = sanitize_filename(filename) | |
| logger.info(f"Download attempt for '{clean_filename}' by {request.client.host}") | |
| if not clean_filename: | |
| logger.warning(f"Download failed: Invalid filename '{filename}' by {request.client.host}") | |
| raise HTTPException(status_code=400, detail="Invalid filename.") | |
| file_path = FILES_DIR / clean_filename | |
| if not file_path.resolve().is_relative_to(FILES_DIR.resolve()) or not file_path.is_file(): | |
| logger.warning(f"Download failed: File not found or access denied for '{clean_filename}' by {request.client.host}") | |
| raise HTTPException(status_code=404, detail="File not found or access denied.") | |
| logger.info(f"File '{clean_filename}' downloaded by {request.client.host}") | |
| return FileResponse(path=file_path, filename=clean_filename, media_type='application/octet-stream') | |
| app.include_router(admin_router) | |
| logger.info("Admin panel router included. File-based logging is disabled.") | |
| if __name__ == "__main__": | |
| import uvicorn | |
| logger.info("Starting Uvicorn server for local development...") | |
| uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=True) |