ACA050's picture
Upload 520 files
2ed8996 verified
"""
Authentication dependencies for AegisLM SaaS Backend.
Production-ready dependency injection for authentication,
authorization, and user context management.
"""
from typing import Optional
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from core.database import get_db
from services.auth_service import AuthService
from db_models.user import User
from db_models.api_key import ApiKey
# HTTP Bearer token scheme (optional for dual auth support)
security = HTTPBearer(auto_error=False)
async def get_current_user(
request: Request,
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
db: AsyncSession = Depends(get_db)
) -> User:
"""
Get current authenticated user from JWT token or session cookie.
Args:
request: HTTP request object
credentials: HTTP Authorization credentials (optional)
db: Database session
Returns:
User: Current authenticated user
Raises:
HTTPException: If authentication fails
"""
auth_service = AuthService(db)
user = None
# Try JWT authentication first
if credentials:
user = await auth_service.verify_token(credentials.credentials)
# Fallback to session-based authentication
if not user:
session_id = request.cookies.get("session_id")
if session_id:
session_data = await auth_service.get_session(session_id)
if session_data:
user = await auth_service.get_user_by_id(session_data.get("user_id"))
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User account is deactivated",
headers={"WWW-Authenticate": "Bearer"},
)
return user
async def get_current_active_user(
current_user: User = Depends(get_current_user)
) -> User:
"""
Get current active user.
Args:
current_user: Current authenticated user
Returns:
User: Current active user
Raises:
HTTPException: If user is not active
"""
if not current_user.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="User account is not active"
)
return current_user
async def get_current_verified_user(
current_user: User = Depends(get_current_active_user)
) -> User:
"""
Get current verified user.
Args:
current_user: Current active user
Returns:
User: Current verified user
Raises:
HTTPException: If user is not verified
"""
if not current_user.is_verified:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="User account is not verified"
)
return current_user
async def get_api_key_user(
api_key: str,
db: AsyncSession = Depends(get_db)
) -> User:
"""
Get user from API key.
Args:
api_key: API key string
db: Database session
Returns:
User: User associated with API key
Raises:
HTTPException: If API key is invalid
"""
# Find API key
result = await db.execute(
select(ApiKey).where(ApiKey.key == api_key)
)
api_key_obj = result.scalar_one_or_none()
if not api_key_obj:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid API key"
)
# Check if API key is valid
if not api_key_obj.is_valid():
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="API key is expired or inactive"
)
# Get user
user = await db.get(User, api_key_obj.user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found"
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User account is deactivated"
)
# Update API key usage
api_key_obj.update_usage()
await db.commit()
return user
async def get_optional_current_user(
request: Request,
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
db: AsyncSession = Depends(get_db)
) -> Optional[User]:
"""
Get current user if authenticated, otherwise None.
Supports both JWT and session-based authentication.
Args:
request: HTTP request object
credentials: HTTP Authorization credentials (optional)
db: Database session
Returns:
Optional[User]: Current user or None
"""
auth_service = AuthService(db)
user = None
# Try JWT authentication first
if credentials:
try:
user = await auth_service.verify_token(credentials.credentials)
except Exception:
pass
# Fallback to session-based authentication
if not user:
try:
session_id = request.cookies.get("session_id")
if session_id:
session_data = await auth_service.get_session(session_id)
if session_data:
user = await auth_service.get_user_by_id(session_data.get("user_id"))
except Exception:
pass
if user and user.is_active:
return user
return None
class RequirePermission:
"""
Permission requirement dependency.
This class can be used to create permission-based dependencies.
"""
def __init__(self, required_permission: str):
self.required_permission = required_permission
def __call__(self, current_user: User = Depends(get_current_active_user)) -> User:
"""
Check if user has required permission.
Args:
current_user: Current authenticated user
Returns:
User: Current user if has permission
Raises:
HTTPException: If user lacks permission
"""
if not current_user.has_permission(self.required_permission):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Insufficient permissions. Required: {self.required_permission}"
)
return current_user
# Common permission dependencies
RequireVerifiedUser = Depends(get_current_verified_user)
RequireActiveUser = Depends(get_current_active_user)
# Rate limiting dependencies
async def get_rate_limit_key(
current_user: Optional[User] = Depends(get_optional_current_user),
api_key_user: Optional[User] = None
) -> str:
"""
Get rate limit key for user.
Args:
current_user: Current authenticated user
api_key_user: User from API key
Returns:
str: Rate limit key
"""
if current_user:
return f"user:{current_user.id}"
elif api_key_user:
return f"api_key:{api_key_user.id}"
else:
# Use IP address or other identifier for anonymous users
return "anonymous"