silver / dependencies.py
Song
hi
238cf71
"""
Authentication and authorization dependencies for Silver Table Assistant.
Provides JWT verification and user role management using Supabase Auth.
"""
import os
import jwt
from typing import Optional, Dict, Any, Callable
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from supabase import create_client, Client
from sqlalchemy.ext.asyncio import AsyncSession
from database import get_session
from models import Profile
# Initialize Supabase client
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_SERVICE_ROLE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")
if not SUPABASE_URL or not SUPABASE_SERVICE_ROLE_KEY:
raise ValueError("SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY environment variables are required")
supabase: Client = create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY)
# Security scheme
security = HTTPBearer()
class AuthenticationError(HTTPException):
"""Custom exception for authentication failures."""
def __init__(self, detail: str = "Could not validate credentials"):
super().__init__(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=detail,
headers={"WWW-Authenticate": "Bearer"},
)
class AuthorizationError(HTTPException):
"""Custom exception for authorization failures."""
def __init__(self, detail: str = "Not enough permissions"):
super().__init__(
status_code=status.HTTP_403_FORBIDDEN,
detail=detail,
)
class User:
"""User class representing an authenticated user."""
def __init__(
self,
user_id: str,
email: Optional[str] = None,
role: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
raw_user: Optional[Dict[str, Any]] = None
):
self.user_id = user_id
self.email = email
self.role = role or "family" # Default to family
self.metadata = metadata or {}
self.raw_user = raw_user or {}
async def verify_jwt_token(token: str) -> Dict[str, Any]:
"""
Verify JWT token using Supabase Auth.
Args:
token: JWT token to verify
Returns:
Decoded user information
Raises:
AuthenticationError: If token is invalid
"""
try:
# Verify the token with Supabase
response = supabase.auth.get_user(token)
if response.user is None:
raise AuthenticationError("Invalid token")
return {
"user_id": response.user.id,
"email": response.user.email,
"role": response.user.user_metadata.get("role", "user"),
"metadata": response.user.user_metadata,
"raw_user": response.user.__dict__
}
except Exception as e:
raise AuthenticationError(f"Token verification failed: {str(e)}")
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security)
) -> User:
"""
Get current authenticated user from JWT token.
Args:
credentials: HTTP authorization credentials
Returns:
User object with authentication information
Raises:
AuthenticationError: If authentication fails
"""
if not credentials:
raise AuthenticationError("No credentials provided")
try:
# Extract token from Bearer scheme
token = credentials.credentials
# Verify token and get user info
user_data = await verify_jwt_token(token)
return User(
user_id=user_data["user_id"],
email=user_data["email"],
role=user_data["role"],
metadata=user_data["metadata"],
raw_user=user_data["raw_user"]
)
except AuthenticationError:
raise
except Exception as e:
raise AuthenticationError(f"Authentication failed: {str(e)}")
async def get_optional_user(
request: Request
) -> Optional[User]:
"""
Get current user if authenticated, otherwise return None.
Args:
request: FastAPI request object
Returns:
User object or None
"""
try:
# Extract token from Authorization header if present
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
return None
token = auth_header.split(" ", 1)[1]
credentials = HTTPAuthorizationCredentials(scheme="Bearer", credentials=token)
return await get_current_user(credentials)
except AuthenticationError:
return None
except Exception:
return None
def require_role(required_role: str) -> Callable:
"""
Dependency factory to require specific user role.
Args:
required_role: Required user role
Returns:
Dependency function
"""
async def role_checker(user: User = Depends(get_current_user)) -> User:
if user.role != required_role and user.role != "admin":
raise AuthorizationError(
f"Required role: {required_role}, your role: {user.role}"
)
return user
return role_checker
def require_roles(allowed_roles: list[str]) -> Callable:
"""
Dependency factory to require one of multiple user roles.
Args:
allowed_roles: List of allowed user roles
Returns:
Dependency function
"""
async def roles_checker(user: User = Depends(get_current_user)) -> User:
if user.role not in allowed_roles and user.role != "admin":
raise AuthorizationError(
f"Required roles: {allowed_roles}, your role: {user.role}"
)
return user
return roles_checker
async def get_user_profile(
user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_session)
) -> Optional[Profile]:
"""
Get user's first profile from database (legacy/helper).
"""
try:
from sqlmodel import select
from uuid import UUID
user_id_uuid = UUID(user.user_id)
result = await db.execute(
select(Profile).where(Profile.user_id == user_id_uuid)
)
profile = result.scalars().first()
return profile
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to fetch user profile: {str(e)}"
)
async def get_or_create_user_profile(
user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_session)
) -> Profile:
"""
Get existing user profile or create a generic one.
"""
profile = await get_user_profile(user, db)
if profile is None:
from models import Profile
from uuid import UUID, uuid4
user_id_uuid = UUID(user.user_id)
# Create new profile with default values matching new schema
profile = Profile(
id=uuid4(),
user_id=user_id_uuid,
name=user.email.split('@')[0] if user.email else "User",
age=70,
gender="male",
height=165.0,
weight=60.0,
chronic_diseases=[],
dietary_restrictions=[],
chewing_ability="normal"
)
db.add(profile)
await db.commit()
await db.refresh(profile)
return profile
def get_user_metadata(user: User = Depends(get_current_user)) -> Dict[str, Any]:
"""
Get user metadata for use in AI prompts and recommendations.
Args:
user: Authenticated user
Returns:
User metadata dictionary
"""
return {
"user_id": user.user_id,
"email": user.email,
"role": user.role,
**user.metadata
}
def get_admin_user(
admin_only: User = Depends(require_role("admin"))
) -> User:
"""
Dependency for admin-only endpoints.
Args:
admin_only: User with admin role
Returns:
Admin user
"""
return admin_only
# Utility functions for role-based access control
def is_admin(user: User) -> bool:
"""Check if user is an admin."""
return user.role == "admin"
def is_staff(user: User) -> bool:
"""Check if user is staff or admin."""
return user.role in ["staff", "admin"]
def can_access_health_data(user: User) -> bool:
"""Check if user can access health-related data."""
return user.role in ["admin", "staff", "user"]
def can_manage_orders(user: User) -> bool:
"""Check if user can manage orders."""
return user.role in ["admin", "staff"]
def can_view_analytics(user: User) -> bool:
"""Check if user can view analytics."""
return user.role == "admin"
# CORS and security utilities
def get_allowed_origins() -> list[str]:
"""Get allowed CORS origins from environment."""
from config import settings
return settings.cors_origins
def get_api_version() -> str:
"""Get API version from environment or default."""
from config import settings
return settings.api_version