Spaces:
Running
Running
| """ | |
| Authentication router for simple login system | |
| """ | |
| import os | |
| import secrets | |
| from datetime import datetime, timedelta | |
| from typing import Dict, Optional | |
| from fastapi import APIRouter, HTTPException, Response, Cookie, Form, Request | |
| from fastapi.responses import JSONResponse | |
| from pydantic import BaseModel | |
| from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired | |
| import logging | |
| from urllib.parse import urlparse | |
# Module-level logger and the router that groups the handlers under "/auth".
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/auth", tags=["Authentication"])

# Session management
# The signing key is read from the SESSION_SECRET_KEY environment variable.
# When it is unset, a random key is generated at import time, so tokens signed
# by one process cannot be verified by another process or after a restart.
SESSION_SECRET_KEY = os.getenv("SESSION_SECRET_KEY", secrets.token_hex(32))
SESSION_MAX_AGE = 86400  # 24 hours in seconds
# Signs session ids and enforces the maximum token age when they are loaded.
serializer = URLSafeTimedSerializer(SESSION_SECRET_KEY)

# In-memory session store (for simple use case)
# For production, consider using Redis or database
# Maps session id -> {"username", "created_at", "expires_at"} (ISO strings).
active_sessions: Dict[str, dict] = {}
class LoginRequest(BaseModel):
    """Username/password pair describing a login attempt.

    NOTE(review): not referenced by the handlers visible in this section
    (``login`` reads form fields instead) -- confirm whether it is still used.
    """

    # Account name to authenticate.
    username: str
    # Password for that account.
    password: str
class LoginResponse(BaseModel):
    """Body returned by ``login`` after a successful authentication."""

    # True when the login succeeded.
    success: bool
    # Human-readable outcome, e.g. "Login successful".
    message: str
def _session_expired(session_data: dict, now: datetime) -> bool:
    """Return True when *session_data* carries an ``expires_at`` before *now*."""
    try:
        return datetime.fromisoformat(session_data["expires_at"]) < now
    except (KeyError, TypeError, ValueError):
        # Unknown or malformed entry: leave it alone rather than guess.
        return False


def create_session(username: str) -> str:
    """Create a server-side session for *username* and return its signed token.

    A random session id is stored in ``active_sessions`` together with the
    username and the creation/expiry timestamps (ISO-8601 strings). Sessions
    that have already expired are evicted first so the in-memory store does
    not grow without bound when clients never present their token again.

    Args:
        username: Name of the authenticated user to record in the session.

    Returns:
        The session id signed by the module-level ``serializer``; this is the
        value placed in the client's cookie.
    """
    # Read the clock once so created_at and expires_at are exactly
    # SESSION_MAX_AGE apart. Naive UTC is kept on purpose: verify_session
    # compares expires_at against datetime.utcnow().
    now = datetime.utcnow()

    # Collect first, then delete: the dict must not change while iterating.
    stale_ids = [
        sid for sid, data in active_sessions.items() if _session_expired(data, now)
    ]
    for sid in stale_ids:
        active_sessions.pop(sid, None)

    session_id = secrets.token_urlsafe(32)
    active_sessions[session_id] = {
        "username": username,
        "created_at": now.isoformat(),
        "expires_at": (now + timedelta(seconds=SESSION_MAX_AGE)).isoformat(),
    }

    # Only the signed session id leaves the server; the data stays in memory.
    return serializer.dumps(session_id)
def verify_session(token: Optional[str]) -> Optional[dict]:
    """Resolve a signed session token to its stored session data.

    Returns the session dict when the token is present, correctly signed,
    not older than ``SESSION_MAX_AGE``, known to ``active_sessions`` and not
    past its stored ``expires_at``. Returns ``None`` in every other case; an
    expired entry is removed from the store as a side effect.
    """
    if not token:
        return None

    try:
        # Signature and age are both enforced by the serializer.
        session_id = serializer.loads(token, max_age=SESSION_MAX_AGE)

        record = active_sessions.get(session_id)
        if not record:
            return None

        deadline = datetime.fromisoformat(record["expires_at"])
        if deadline < datetime.utcnow():
            # Past its expiry: forget the session and treat it as invalid.
            active_sessions.pop(session_id, None)
            return None

        return record
    except (BadSignature, SignatureExpired):
        # Tampered or too-old token: an expected failure, not worth logging.
        return None
    except Exception as e:
        logger.error(f"Session verification error: {e}")
        return None
def verify_credentials(username: str, password: str) -> bool:
    """Check a username/password pair against the configured credentials.

    The expected values are read from the ``AUTH_USERNAME`` and
    ``AUTH_PASSWORD`` environment variables. When a variable is unset the
    previous built-in value is used, so existing deployments keep working.

    Args:
        username: Username supplied by the client.
        password: Password supplied by the client.

    Returns:
        True only when both values match the expected credentials.
    """
    expected_username = os.getenv("AUTH_USERNAME", "volaris")
    expected_password = os.getenv("AUTH_PASSWORD", "volaris123")

    def _matches(supplied: str, expected: str) -> bool:
        # compare_digest only accepts ASCII str, so compare the encoded bytes;
        # this keeps non-ASCII input from raising TypeError.
        return secrets.compare_digest(
            supplied.encode("utf-8", "surrogatepass"),
            expected.encode("utf-8", "surrogatepass"),
        )

    # Evaluate both comparisons before combining them so the time taken does
    # not reveal which of the two fields was wrong.
    username_ok = _matches(username, expected_username)
    password_ok = _matches(password, expected_password)
    return username_ok and password_ok
async def login(
    response: Response,
    request: Request,
    username: str = Form(...),
    password: str = Form(...),
):
    """
    Login endpoint - checks the submitted form credentials and, on success,
    creates a session and stores its signed token in the "session_token" cookie.

    Raises HTTPException(401) when the credentials are rejected.
    """
    logger.info(f"Login attempt for username: {username}, Origin: {request.headers.get('origin')}")

    credentials_ok = verify_credentials(username, password)
    if not credentials_ok:
        logger.warning(f"Failed login attempt for username: {username}")
        raise HTTPException(status_code=401, detail="Invalid username or password")

    token = create_session(username)
    logger.info(f"Session created for user: {username}")

    # The request counts as HTTPS when either the URL scheme or the
    # x-forwarded-proto header says so (the latter covers a TLS-terminating proxy).
    is_https = request.url.scheme == "https" or request.headers.get("x-forwarded-proto") == "https"

    # HTTPS: SameSite=None + Secure so the cookie is sent cross-origin.
    # HTTP (local dev): SameSite=Lax without Secure.
    samesite, secure = ("none", True) if is_https else ("lax", False)
    logger.info(f"Setting cookie with samesite={samesite}, secure={secure}, is_https={is_https}")

    cookie_options = {
        "key": "session_token",
        "value": token,
        "httponly": True,
        "max_age": SESSION_MAX_AGE,
        "samesite": samesite,
        "secure": secure,
        "path": "/",
    }
    response.set_cookie(**cookie_options)

    logger.info(f"Successful login for user: {username}")
    return LoginResponse(success=True, message="Login successful")
async def logout(
    response: Response,
    session_token: Optional[str] = Cookie(None)
):
    """
    Logout endpoint - invalidates the session and clears the cookie.

    Invalidation is best effort: a missing, tampered or expired token never
    makes logout fail, and the "session_token" cookie is always cleared.

    Returns a dict with ``success`` and ``message`` keys.
    """
    if session_token:
        try:
            session_id = serializer.loads(session_token, max_age=SESSION_MAX_AGE)
            active_sessions.pop(session_id, None)
        except (BadSignature, SignatureExpired):
            # Expected for stale or forged cookies; there is nothing to remove.
            logger.debug("Logout with invalid or expired session token")
        except Exception:
            # Unexpected failure: keep logout working but leave a trace
            # instead of swallowing the error silently.
            logger.exception("Unexpected error while invalidating session on logout")

    # Clear cookie
    response.delete_cookie(key="session_token")
    return {"success": True, "message": "Logged out successfully"}
async def verify(session_token: Optional[str] = Cookie(None)):
    """
    Report the authenticated user for the current session cookie.

    Raises HTTPException(401) when the cookie does not resolve to a valid session.
    """
    session_data = verify_session(session_token)
    if session_data:
        return {
            "authenticated": True,
            "username": session_data.get("username"),
        }
    raise HTTPException(status_code=401, detail="Not authenticated")
async def status(request: Request, session_token: Optional[str] = Cookie(None)):
    """
    Report whether the current session cookie is valid, without ever raising.

    Returns a dict with ``authenticated`` (bool) and ``username`` (str or None).
    """
    logger.info(f"Status check - Cookie present: {session_token is not None}, Origin: {request.headers.get('origin')}")

    session_data = verify_session(session_token)
    if not session_data:
        logger.info("Status check - Not authenticated")
    else:
        logger.info(f"Status check - Authenticated as: {session_data.get('username')}")

    authenticated = session_data is not None
    username = session_data.get("username") if session_data else None
    return {"authenticated": authenticated, "username": username}