Spaces:
Sleeping
Sleeping
| """ | |
| Authentication and authorization middleware. | |
| Extracts tenant_id from JWT token in production mode. | |
| """ | |
| from fastapi import Request, HTTPException, Depends | |
| from typing import Optional, Dict, Any | |
| import logging | |
| from jose import JWTError, jwt | |
| from app.config import settings | |
| logger = logging.getLogger(__name__) | |
async def verify_tenant_access(
    request: Request,
    tenant_id: str,
    user_id: str
) -> bool:
    """
    Verify that the user has access to the specified tenant.

    DEV mode (settings.ENV == "dev"):
        - Always grants access.
    Any other mode:
        - Requires Authorization: Bearer <JWT>
        - Verifies the token signature with settings.JWT_SECRET (HS256)
        - Grants access only when the token's user_id (or "sub") and
          tenant_id claims match the values passed in

    NOTE(review): this replaces a placeholder that returned True for every
    request. Only the token claims are checked here; a lookup that the user
    really belongs to the tenant, and any permission check, still need a
    data source that is not visible from this module.

    Args:
        request: FastAPI request object
        tenant_id: Tenant ID from request
        user_id: User ID from request
    Returns:
        True if access is granted, False otherwise
    """
    if settings.ENV == "dev":
        logger.debug("Dev mode: skipping tenant access verification")
        return True

    auth_header = request.headers.get("Authorization", "")
    if not auth_header.startswith("Bearer "):
        logger.warning("Missing or invalid Authorization header")
        return False

    # Strip only the leading scheme; the rest of the header is the token.
    token = auth_header[len("Bearer "):].strip()
    if not token:
        return False

    # Fail closed: without a secret the signature cannot be checked.
    if not settings.JWT_SECRET:
        logger.error("JWT_SECRET not configured in production mode")
        return False

    try:
        decoded = jwt.decode(token, settings.JWT_SECRET, algorithms=["HS256"])
    except JWTError as e:
        logger.warning(f"JWT verification failed: {e}")
        return False

    token_user = decoded.get("user_id") or decoded.get("sub")
    token_tenant = decoded.get("tenant_id")
    if not token_user or not token_tenant:
        logger.warning("Token missing required claims (user_id, tenant_id)")
        return False

    # Compare as strings so a numeric claim still matches its string form.
    if str(token_user) != str(user_id) or str(token_tenant) != str(tenant_id):
        logger.warning("Token claims do not match requested user/tenant")
        return False

    return True
def get_tenant_from_token(request: Request) -> Optional[str]:
    """
    Extract tenant_id from authentication token.

    In dev mode, returns None (allows request tenant_id).
    In any other mode, reads the Bearer token from the Authorization header,
    verifies its signature with settings.JWT_SECRET (HS256) and returns the
    "tenant_id" claim. An unverified token is never trusted, because the
    tenant claim would otherwise be forgeable by the caller.

    Args:
        request: FastAPI request object
    Returns:
        Tenant ID if found in a verified token, None otherwise (dev mode,
        missing/malformed header, missing secret, invalid token, or no
        tenant_id claim)
    """
    if settings.ENV == "dev":
        # Dev mode: allow request tenant_id
        return None

    auth_header = request.headers.get("Authorization", "")
    if not auth_header.startswith("Bearer "):
        logger.warning("Missing or invalid Authorization header")
        return None

    # Strip only the leading scheme; the rest of the header is the token.
    token = auth_header[len("Bearer "):].strip()
    if not token:
        return None

    # Without a secret the signature cannot be checked, so no tenant is
    # reported rather than trusting unverified claims.
    if not settings.JWT_SECRET:
        logger.error("JWT_SECRET not configured in production mode")
        return None

    try:
        decoded = jwt.decode(token, settings.JWT_SECRET, algorithms=["HS256"])
    except JWTError:
        logger.warning("Failed to decode JWT token")
        return None
    except Exception as e:
        # Best-effort helper: callers expect None, never an exception.
        logger.error(f"Error extracting tenant from token: {e}")
        return None

    tenant_id = decoded.get("tenant_id")
    if tenant_id:
        logger.info(f"Extracted tenant_id from token: {tenant_id}")
        return tenant_id
    return None
async def get_auth_context(request: Request) -> Dict[str, Any]:
    """
    Get authentication context from request.

    DEV mode:
        - Allows X-Tenant-Id and X-User-Id headers
        - Falls back to defaults if missing
    PROD mode:
        - Requires Authorization: Bearer <JWT>
        - Verifies JWT using JWT_SECRET
        - Extracts tenant_id and user_id from token claims
        - NEVER accepts tenant_id from request body/query params

    Args:
        request: FastAPI request object
    Returns:
        Dictionary with user_id and tenant_id (plus email and role from the
        token claims in production mode; these may be None)
    Raises:
        HTTPException: 401 if authentication fails, 500 if JWT_SECRET is not
            configured (production mode only)
    """
    if settings.ENV == "dev":
        # Dev mode: allow headers, fallback to defaults
        return {
            "user_id": request.headers.get("X-User-Id", "dev_user"),
            "tenant_id": request.headers.get("X-Tenant-Id", "dev_tenant"),
        }

    # Production mode: require JWT token
    auth_header = request.headers.get("Authorization")
    if not auth_header or not auth_header.startswith("Bearer "):
        raise HTTPException(
            status_code=401,
            detail="Authentication required. Provide valid Bearer token in Authorization header."
        )

    # Strip only the leading scheme; the rest of the header is the token.
    token = auth_header[len("Bearer "):].strip()
    if not token:
        raise HTTPException(
            status_code=401,
            detail="Invalid token format."
        )

    if not settings.JWT_SECRET:
        logger.error("JWT_SECRET not configured in production mode")
        raise HTTPException(
            status_code=500,
            detail="Server configuration error: JWT_SECRET not set"
        )

    # Only the decode call is guarded. The claim check below raises its own
    # HTTPException, which must not be caught and rewritten by the broad
    # handler.
    try:
        decoded = jwt.decode(token, settings.JWT_SECRET, algorithms=["HS256"])
    except JWTError as e:
        logger.warning(f"JWT verification failed: {e}")
        raise HTTPException(
            status_code=401,
            detail="Invalid or expired token."
        ) from e
    except Exception as e:
        logger.error(f"Auth error: {e}", exc_info=True)
        raise HTTPException(
            status_code=401,
            detail="Authentication failed."
        ) from e

    user_id = decoded.get("user_id") or decoded.get("sub")
    tenant_id = decoded.get("tenant_id")
    if not user_id or not tenant_id:
        raise HTTPException(
            status_code=401,
            detail="Token missing required claims (user_id, tenant_id)."
        )

    logger.info(f"Authenticated user: {user_id}, tenant: {tenant_id}")
    return {
        "user_id": user_id,
        "tenant_id": tenant_id,
        "email": decoded.get("email"),
        "role": decoded.get("role")
    }
| # FastAPI dependency for easy use in endpoints | |
async def require_auth(request: Request) -> Dict[str, Any]:
    """
    Resolve the caller's authentication context for a FastAPI endpoint.

    Backward-compatible alias: delegates to get_auth_context and returns
    its result unchanged.
    """
    auth_context = await get_auth_context(request)
    return auth_context