from fastapi import Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from supabase import create_client, Client from typing import Optional import os from dotenv import load_dotenv from pydantic import BaseModel import jwt from datetime import datetime, timedelta # Load environment variables load_dotenv() # Supabase configuration SUPABASE_URL = os.getenv("SUPABASE_URL") SUPABASE_KEY = os.getenv("SUPABASE_KEY") # Development mode configuration DEV_MODE = os.getenv("DEV_MODE", "false").lower() == "true" DEV_SECRET_TOKEN = os.getenv("DEV_SECRET_TOKEN", "dev-secret-token-change-this") DEV_USER_EMAIL = os.getenv("DEV_USER_EMAIL") # Email of the dev user in Supabase # Validate environment variables if not SUPABASE_URL: raise RuntimeError("SUPABASE_URL environment variable is not set. Please check your .env file.") if not SUPABASE_KEY: raise RuntimeError("SUPABASE_KEY environment variable is not set. Please check your .env file.") try: supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY) except Exception as e: raise RuntimeError(f"Failed to initialize Supabase client: {str(e)}. Please check your Supabase credentials.") # Security scheme for JWT security = HTTPBearer() class User(BaseModel): id: str email: str role: str = "user" class TokenData(BaseModel): email: Optional[str] = None def get_dev_user() -> Optional[User]: """ Get a development user for testing purposes. This requires DEV_USER_EMAIL and optionally DEV_USER_ID to be set. If DEV_USER_ID is not provided, a default dev user ID will be used. """ if not DEV_USER_EMAIL: return None try: # Use DEV_USER_ID if provided, otherwise use a default dev ID dev_user_id = os.getenv("DEV_USER_ID", "dev-user-id-change-this") print(f"✅ Creating dev user: {DEV_USER_EMAIL} (ID: {dev_user_id})") return User( id=dev_user_id, email=DEV_USER_EMAIL, role="user" ) except Exception as e: print(f"⚠️ Could not create dev user: {str(e)}") # Return a fallback dev user return User( id="dev-user-fallback", email=DEV_USER_EMAIL or "dev@example.com", role="user" ) async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> User: """ Validate JWT token and return the current user. Supports backend JWT tokens, Supabase tokens, and development mode bypass. """ token = credentials.credentials # Development mode bypass if DEV_MODE and token == DEV_SECRET_TOKEN: print(f"🔧 Development mode: Using dev token for authentication") dev_user = get_dev_user() if dev_user: print(f"✅ Development mode authenticated for user: {dev_user.email}") return dev_user else: print(f"❌ Development mode enabled but no valid dev user found") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Development mode enabled but DEV_USER_EMAIL not set or user not found" ) # First, try to decode as a backend JWT token try: secret_key = os.getenv("JWT_SECRET_KEY", "your-secret-key-change-this") payload = jwt.decode(token, secret_key, algorithms=["HS256"]) user_id = payload.get("sub") email = payload.get("email") if user_id and email: print(f"✅ Successfully authenticated with backend JWT token for user: {email}") return User( id=user_id, email=email, role="user" ) except jwt.InvalidTokenError: # If JWT decoding fails, try Supabase token verification print("🔄 Backend JWT decode failed, trying Supabase token...") pass except Exception as e: # Log other JWT errors but continue to Supabase fallback print(f"⚠️ JWT decode error: {str(e)}") # Fallback to Supabase token verification try: user = supabase.auth.get_user(token) if not user or not user.user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials", headers={"WWW-Authenticate": "Bearer"}, ) # Use the user info from Supabase Auth directly print(f"✅ Successfully authenticated with Supabase token for user: {user.user.email}") return User( id=user.user.id, email=user.user.email, role="user" # Default role ) except Exception as e: print(f"❌ Both authentication methods failed: {str(e)}") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=f"Authentication failed: {str(e)}", headers={"WWW-Authenticate": "Bearer"}, ) def get_current_active_user(current_user: User = Depends(get_current_user)) -> User: """ Check if the current user is active """ if not current_user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Inactive user", headers={"WWW-Authenticate": "Bearer"}, ) return current_user # Role-based access control def require_role(required_role: str): def role_checker(current_user: User = Depends(get_current_active_user)): if current_user.role != required_role: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions" ) return current_user return role_checker async def get_supabase_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str: """ Extract Supabase token from Authorization header """ return credentials.credentials def verify_supabase_token(token: str) -> dict: """ Verify Supabase token and return user data """ try: user_response = supabase.auth.get_user(token) if not user_response or not user_response.user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid Supabase token" ) # Return user data as dictionary for easier handling return { "id": user_response.user.id, "email": user_response.user.email, "user_metadata": user_response.user.user_metadata or {}, "app_metadata": user_response.user.app_metadata or {}, "created_at": user_response.user.created_at, "updated_at": user_response.user.updated_at } except Exception as e: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=f"Token verification failed: {str(e)}" ) def create_backend_jwt_token(user_data: dict, expires_delta: Optional[timedelta] = None) -> str: """ Create a backend JWT token for the user """ if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(days=7) # Token expires in 7 days to_encode = { "sub": user_data["id"], "email": user_data["email"], "exp": expire, "iat": datetime.utcnow() } # Use a secret key for JWT signing (you should set this in your .env file) secret_key = os.getenv("JWT_SECRET_KEY", "your-secret-key-change-this") encoded_jwt = jwt.encode(to_encode, secret_key, algorithm="HS256") return encoded_jwt