"""Authentication router for a simple cookie-based login system.

Sessions are kept in an in-process dictionary and referenced from the
browser through a signed, time-limited cookie.
"""

import logging
import os
import secrets
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional

from fastapi import APIRouter, HTTPException, Response, Cookie, Form
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/auth", tags=["Authentication"])

# Session management
# NOTE: when SESSION_SECRET_KEY is unset, a random key is generated at import
# time, so tokens signed by one process are rejected by any other process and
# after a restart.
SESSION_SECRET_KEY = os.getenv("SESSION_SECRET_KEY", secrets.token_hex(32))
SESSION_MAX_AGE = 86400  # 24 hours in seconds
# Must stay equal to the ``session_token`` parameter name of the endpoints
# below, because FastAPI derives the cookie name from the parameter name.
SESSION_COOKIE_NAME = "session_token"
serializer = URLSafeTimedSerializer(SESSION_SECRET_KEY)

# In-memory session store (for simple use case)
# For production, consider using Redis or database
active_sessions: Dict[str, dict] = {}

# Fallback credentials used when AUTH_USERNAME / AUTH_PASSWORD are not set.
# SECURITY: these defaults are kept only for backward compatibility; set the
# environment variables in any real deployment.
_DEFAULT_USERNAME = "volaris"
_DEFAULT_PASSWORD = "volaris123"


class LoginRequest(BaseModel):
    """JSON shape of a login request (username and password)."""

    username: str
    password: str


class LoginResponse(BaseModel):
    """Response body returned by the login endpoint."""

    success: bool
    message: str


def _utcnow() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


def _parse_expiry(session_data: dict) -> datetime:
    """Return the session's ``expires_at`` value as an aware UTC datetime.

    Timestamps without an offset are interpreted as UTC so entries written
    with naive datetimes remain comparable.

    Raises:
        KeyError, TypeError, ValueError: if the entry has no usable
            ``expires_at`` ISO-8601 string.
    """
    expires_at = datetime.fromisoformat(session_data["expires_at"])
    if expires_at.tzinfo is None:
        expires_at = expires_at.replace(tzinfo=timezone.utc)
    return expires_at


def _purge_expired_sessions(now: datetime) -> None:
    """Drop every stored session that is expired (or unparseable) at *now*."""
    stale_ids = []
    for session_id, session_data in active_sessions.items():
        try:
            is_stale = now > _parse_expiry(session_data)
        except (KeyError, TypeError, ValueError):
            # An entry without a readable expiry can never be verified.
            is_stale = True
        if is_stale:
            stale_ids.append(session_id)
    for session_id in stale_ids:
        active_sessions.pop(session_id, None)


def _cookie_security_attrs() -> dict:
    """Return the ``samesite``/``secure`` cookie attributes for this environment.

    In development (HTTP) the cookie is ``SameSite=Lax`` and not ``Secure``;
    when ENVIRONMENT is "production" (HTTPS) it is ``SameSite=None; Secure``.
    """
    is_production = os.getenv("ENVIRONMENT", "development") == "production"
    return {
        "samesite": "none" if is_production else "lax",
        "secure": is_production,  # Only secure in production with HTTPS
    }


def _constant_time_equals(left: str, right: str) -> bool:
    """Compare two strings without leaking where they first differ."""
    # compare_digest only accepts ASCII ``str``; bytes work for any text.
    return secrets.compare_digest(left.encode("utf-8"), right.encode("utf-8"))


def create_session(username: str) -> str:
    """Create a new session for *username* and return its signed token.

    The session is stored in ``active_sessions`` under a random id; the
    returned token is that id signed with ``serializer``.
    """
    now = _utcnow()
    # Abandoned sessions are otherwise only removed when their own token is
    # verified again, so prune on every login to keep the store bounded.
    _purge_expired_sessions(now)

    session_id = secrets.token_urlsafe(32)
    active_sessions[session_id] = {
        "username": username,
        "created_at": now.isoformat(),
        "expires_at": (now + timedelta(seconds=SESSION_MAX_AGE)).isoformat(),
    }

    # Create signed token
    return serializer.dumps(session_id)


def verify_session(token: Optional[str]) -> Optional[dict]:
    """Verify a session token and return its session data.

    Returns:
        The stored session dictionary, or ``None`` when the token is missing,
        badly signed, expired, or refers to an unknown/expired session.
    """
    if not token:
        return None

    try:
        # Verify signature and age
        session_id = serializer.loads(token, max_age=SESSION_MAX_AGE)

        # Check if session exists
        session_data = active_sessions.get(session_id)
        if not session_data:
            return None

        # Check expiration
        if _utcnow() > _parse_expiry(session_data):
            # Clean up expired session
            active_sessions.pop(session_id, None)
            return None

        return session_data
    except (BadSignature, SignatureExpired):
        return None
    except Exception as e:
        # Boundary handler: never let a malformed token or store entry
        # surface as a server error, but keep the traceback for diagnosis.
        logger.exception("Session verification error: %s", e)
        return None


def verify_credentials(username: str, password: str) -> bool:
    """Verify username and password against environment variables.

    The expected values are read from AUTH_USERNAME and AUTH_PASSWORD at call
    time, falling back to the built-in defaults when they are unset.
    """
    expected_username = os.getenv("AUTH_USERNAME", _DEFAULT_USERNAME)
    expected_password = os.getenv("AUTH_PASSWORD", _DEFAULT_PASSWORD)

    # Evaluate both comparisons before combining them so the response time
    # does not reveal whether the username alone was correct.
    username_ok = _constant_time_equals(username, expected_username)
    password_ok = _constant_time_equals(password, expected_password)
    return username_ok and password_ok


@router.post("/login", response_model=LoginResponse)
async def login(
    response: Response,
    username: str = Form(...),
    password: str = Form(...)
):
    """
    Login endpoint - validates credentials and creates session
    """
    # Verify credentials
    if not verify_credentials(username, password):
        logger.warning("Failed login attempt for username: %s", username)
        raise HTTPException(status_code=401, detail="Invalid username or password")

    # Create session
    token = create_session(username)

    # Set the session cookie; samesite/secure depend on the environment.
    response.set_cookie(
        key=SESSION_COOKIE_NAME,
        value=token,
        httponly=True,
        max_age=SESSION_MAX_AGE,
        **_cookie_security_attrs(),
    )

    logger.info("Successful login for user: %s", username)

    return LoginResponse(
        success=True,
        message="Login successful"
    )


@router.post("/logout")
async def logout(
    response: Response,
    session_token: Optional[str] = Cookie(None)
):
    """
    Logout endpoint - invalidates session
    """
    if session_token:
        try:
            session_id = serializer.loads(session_token, max_age=SESSION_MAX_AGE)
            active_sessions.pop(session_id, None)
        except (BadSignature, SignatureExpired):
            # Tampered or expired token: there is no session to invalidate.
            logger.debug("Logout called with an invalid or expired session token")
        except Exception:
            # Logout stays best-effort, but unexpected failures are recorded.
            logger.exception("Unexpected error while invalidating session on logout")

    # Clear cookie. The expiring cookie repeats the attributes used at login:
    # browsers reject a cross-site Set-Cookie that is not SameSite=None; Secure,
    # which would leave the stale cookie in place in production.
    response.set_cookie(
        key=SESSION_COOKIE_NAME,
        value="",
        max_age=0,
        expires=0,
        httponly=True,
        **_cookie_security_attrs(),
    )

    return {"success": True, "message": "Logged out successfully"}


@router.get("/verify")
async def verify(session_token: Optional[str] = Cookie(None)):
    """
    Verify if current session is valid
    """
    session_data = verify_session(session_token)

    if not session_data:
        raise HTTPException(status_code=401, detail="Not authenticated")

    return {
        "authenticated": True,
        "username": session_data.get("username")
    }


@router.get("/status")
async def status(session_token: Optional[str] = Cookie(None)):
    """
    Check authentication status without raising exception
    """
    session_data = verify_session(session_token)

    return {
        "authenticated": session_data is not None,
        "username": session_data.get("username") if session_data else None
    }