""" Authentication and authorization middleware. Extracts tenant_id from JWT token in production mode. """ from fastapi import Request, HTTPException, Depends from typing import Optional, Dict, Any import logging from jose import JWTError, jwt from app.config import settings logger = logging.getLogger(__name__) async def verify_tenant_access( request: Request, tenant_id: str, user_id: str ) -> bool: """ Verify that the user has access to the specified tenant. TODO: Implement actual authentication logic: 1. Extract JWT token from Authorization header 2. Verify token signature 3. Extract user_id and tenant_id from token 4. Verify user belongs to tenant 5. Check permissions Args: request: FastAPI request object tenant_id: Tenant ID from request user_id: User ID from request Returns: True if access is granted, False otherwise """ # TODO: Implement actual authentication # For now, this is a placeholder that always returns True # In production, you MUST implement proper auth # Example implementation: # token = request.headers.get("Authorization", "").replace("Bearer ", "") # if not token: # return False # # decoded = verify_jwt_token(token) # if decoded["user_id"] != user_id or decoded["tenant_id"] != tenant_id: # return False # # return True logger.warning("⚠️ Authentication middleware not implemented - using placeholder") return True def get_tenant_from_token(request: Request) -> Optional[str]: """ Extract tenant_id from authentication token. In production mode, extracts tenant_id from JWT token. In dev mode, returns None (allows request tenant_id). Args: request: FastAPI request object Returns: Tenant ID if found in token, None otherwise """ if settings.ENV == "dev": # Dev mode: allow request tenant_id return None # Production mode: extract from JWT auth_header = request.headers.get("Authorization", "") if not auth_header.startswith("Bearer "): logger.warning("Missing or invalid Authorization header") return None token = auth_header.replace("Bearer ", "").strip() if not token: return None try: # TODO: Replace with your actual JWT secret key # For now, this is a placeholder that expects a specific token format # In production, you should: # 1. Get JWT_SECRET from environment # 2. Verify token signature # 3. Extract tenant_id from token payload # Example implementation (replace with your actual JWT verification): # JWT_SECRET = os.getenv("JWT_SECRET", "your-secret-key") # decoded = jwt.decode(token, JWT_SECRET, algorithms=["HS256"]) # return decoded.get("tenant_id") # Placeholder: Try to decode without verification (for testing) # In production, you MUST verify the signature try: decoded = jwt.decode(token, options={"verify_signature": False}) tenant_id = decoded.get("tenant_id") if tenant_id: logger.info(f"Extracted tenant_id from token: {tenant_id}") return tenant_id except jwt.DecodeError: logger.warning("Failed to decode JWT token") return None except Exception as e: logger.error(f"Error extracting tenant from token: {e}") return None return None async def get_auth_context(request: Request) -> Dict[str, Any]: """ Get authentication context from request. DEV mode: - Allows X-Tenant-Id and X-User-Id headers - Falls back to defaults if missing PROD mode: - Requires Authorization: Bearer - Verifies JWT using JWT_SECRET - Extracts tenant_id and user_id from token claims - NEVER accepts tenant_id from request body/query params Args: request: FastAPI request object Returns: Dictionary with user_id and tenant_id Raises: HTTPException: If authentication fails (production mode only) """ if settings.ENV == "dev": # Dev mode: allow headers, fallback to defaults tenant_id = request.headers.get("X-Tenant-Id", "dev_tenant") user_id = request.headers.get("X-User-Id", "dev_user") return { "user_id": user_id, "tenant_id": tenant_id } # Production mode: require JWT token auth_header = request.headers.get("Authorization") if not auth_header or not auth_header.startswith("Bearer "): raise HTTPException( status_code=401, detail="Authentication required. Provide valid Bearer token in Authorization header." ) token = auth_header.replace("Bearer ", "").strip() if not token: raise HTTPException( status_code=401, detail="Invalid token format." ) # Verify JWT token if not settings.JWT_SECRET: logger.error("JWT_SECRET not configured in production mode") raise HTTPException( status_code=500, detail="Server configuration error: JWT_SECRET not set" ) try: decoded = jwt.decode(token, settings.JWT_SECRET, algorithms=["HS256"]) user_id = decoded.get("user_id") or decoded.get("sub") tenant_id = decoded.get("tenant_id") if not user_id or not tenant_id: raise HTTPException( status_code=401, detail="Token missing required claims (user_id, tenant_id)." ) logger.info(f"Authenticated user: {user_id}, tenant: {tenant_id}") return { "user_id": user_id, "tenant_id": tenant_id, "email": decoded.get("email"), "role": decoded.get("role") } except JWTError as e: logger.warning(f"JWT verification failed: {e}") raise HTTPException( status_code=401, detail="Invalid or expired token." ) except Exception as e: logger.error(f"Auth error: {e}", exc_info=True) raise HTTPException( status_code=401, detail="Authentication failed." ) # FastAPI dependency for easy use in endpoints async def require_auth(request: Request) -> Dict[str, Any]: """ FastAPI dependency for requiring authentication. Alias for get_auth_context for backward compatibility. """ return await get_auth_context(request)