Spaces:
Running
Running
Enhance API security and functionality by adding authentication middleware and session management. Updated app.py to include the new auth router and integrated authentication checks for protected endpoints. Modified requirements.txt to include necessary libraries for session handling. Updated .env.example to include authentication credentials. Improved retrieval functions with query expansion for better medical term matching and enriched context in responses.
ddc9c77
| """ | |
| Authentication router for simple login system | |
| """ | |
| import os | |
| import secrets | |
| from datetime import datetime, timedelta | |
| from typing import Dict, Optional | |
| from fastapi import APIRouter, HTTPException, Response, Cookie, Form | |
| from fastapi.responses import JSONResponse | |
| from pydantic import BaseModel | |
| from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired | |
| import logging | |
| logger = logging.getLogger(__name__) | |
| router = APIRouter(prefix="/auth", tags=["Authentication"]) | |
| # Session management | |
| SESSION_SECRET_KEY = os.getenv("SESSION_SECRET_KEY", secrets.token_hex(32)) | |
| SESSION_MAX_AGE = 86400 # 24 hours in seconds | |
| serializer = URLSafeTimedSerializer(SESSION_SECRET_KEY) | |
| # In-memory session store (for simple use case) | |
| # For production, consider using Redis or database | |
| active_sessions: Dict[str, dict] = {} | |
| class LoginRequest(BaseModel): | |
| username: str | |
| password: str | |
| class LoginResponse(BaseModel): | |
| success: bool | |
| message: str | |
| def create_session(username: str) -> str: | |
| """Create a new session token""" | |
| session_id = secrets.token_urlsafe(32) | |
| session_data = { | |
| "username": username, | |
| "created_at": datetime.utcnow().isoformat(), | |
| "expires_at": (datetime.utcnow() + timedelta(seconds=SESSION_MAX_AGE)).isoformat() | |
| } | |
| # Store session | |
| active_sessions[session_id] = session_data | |
| # Create signed token | |
| token = serializer.dumps(session_id) | |
| return token | |
| def verify_session(token: Optional[str]) -> Optional[dict]: | |
| """Verify session token and return session data""" | |
| if not token: | |
| return None | |
| try: | |
| # Verify signature and age | |
| session_id = serializer.loads(token, max_age=SESSION_MAX_AGE) | |
| # Check if session exists | |
| session_data = active_sessions.get(session_id) | |
| if not session_data: | |
| return None | |
| # Check expiration | |
| expires_at = datetime.fromisoformat(session_data["expires_at"]) | |
| if datetime.utcnow() > expires_at: | |
| # Clean up expired session | |
| active_sessions.pop(session_id, None) | |
| return None | |
| return session_data | |
| except (BadSignature, SignatureExpired): | |
| return None | |
| except Exception as e: | |
| logger.error(f"Session verification error: {e}") | |
| return None | |
| def verify_credentials(username: str, password: str) -> bool: | |
| """Verify username and password against environment variables""" | |
| expected_username = os.getenv("AUTH_USERNAME", "volaris") | |
| expected_password = os.getenv("AUTH_PASSWORD", "volaris") | |
| return username == expected_username and password == expected_password | |
| async def login( | |
| response: Response, | |
| username: str = Form(...), | |
| password: str = Form(...) | |
| ): | |
| """ | |
| Login endpoint - validates credentials and creates session | |
| """ | |
| # Verify credentials | |
| if not verify_credentials(username, password): | |
| logger.warning(f"Failed login attempt for username: {username}") | |
| raise HTTPException(status_code=401, detail="Invalid username or password") | |
| # Create session | |
| token = create_session(username) | |
| # Set secure cookie | |
| response.set_cookie( | |
| key="session_token", | |
| value=token, | |
| httponly=True, | |
| max_age=SESSION_MAX_AGE, | |
| samesite="lax", | |
| secure=False # Set to True in production with HTTPS | |
| ) | |
| logger.info(f"Successful login for user: {username}") | |
| return LoginResponse( | |
| success=True, | |
| message="Login successful" | |
| ) | |
| async def logout( | |
| response: Response, | |
| session_token: Optional[str] = Cookie(None) | |
| ): | |
| """ | |
| Logout endpoint - invalidates session | |
| """ | |
| if session_token: | |
| try: | |
| session_id = serializer.loads(session_token, max_age=SESSION_MAX_AGE) | |
| active_sessions.pop(session_id, None) | |
| except Exception: | |
| pass | |
| # Clear cookie | |
| response.delete_cookie(key="session_token") | |
| return {"success": True, "message": "Logged out successfully"} | |
| async def verify(session_token: Optional[str] = Cookie(None)): | |
| """ | |
| Verify if current session is valid | |
| """ | |
| session_data = verify_session(session_token) | |
| if not session_data: | |
| raise HTTPException(status_code=401, detail="Not authenticated") | |
| return { | |
| "authenticated": True, | |
| "username": session_data.get("username") | |
| } | |
| async def status(session_token: Optional[str] = Cookie(None)): | |
| """ | |
| Check authentication status without raising exception | |
| """ | |
| session_data = verify_session(session_token) | |
| return { | |
| "authenticated": session_data is not None, | |
| "username": session_data.get("username") if session_data else None | |
| } | |