Spaces:
Paused
Paused
| from fastapi import Depends, HTTPException, status | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from supabase import create_client, Client | |
| from typing import Optional | |
| import os | |
| from dotenv import load_dotenv | |
| from pydantic import BaseModel | |
| import jwt | |
| from datetime import datetime, timedelta | |
# Load environment variables from a .env file (if present) into os.environ
load_dotenv()

# Supabase configuration
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")

# Development mode configuration
# DEV_MODE is on only when the variable is literally "true" (case-insensitive).
DEV_MODE = os.getenv("DEV_MODE", "false").lower() == "true"
# NOTE(review): the default below is a well-known placeholder; set a real
# DEV_SECRET_TOKEN whenever DEV_MODE is enabled.
DEV_SECRET_TOKEN = os.getenv("DEV_SECRET_TOKEN", "dev-secret-token-change-this")
DEV_USER_EMAIL = os.getenv("DEV_USER_EMAIL")  # Email of the dev user in Supabase

# Validate environment variables: fail fast at import time when the Supabase
# settings are missing.
if not SUPABASE_URL:
    raise RuntimeError("SUPABASE_URL environment variable is not set. Please check your .env file.")
if not SUPABASE_KEY:
    raise RuntimeError("SUPABASE_KEY environment variable is not set. Please check your .env file.")

try:
    supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
except Exception as e:
    # Chain the original exception so its traceback is preserved for debugging.
    raise RuntimeError(
        f"Failed to initialize Supabase client: {str(e)}. Please check your Supabase credentials."
    ) from e

# Security scheme for JWT: reads the bearer token from the Authorization header
security = HTTPBearer()
class User(BaseModel):
    """User record returned by the authentication dependencies in this module."""

    # Identifier of the user
    id: str
    # Email address of the user
    email: str
    # Role used by the role-based access check; defaults to "user"
    role: str = "user"
class TokenData(BaseModel):
    """Token-related data model holding an optional email."""

    # Email associated with a token; None when not provided
    email: Optional[str] = None
def get_dev_user() -> Optional[User]:
    """
    Build the user returned by the development-mode authentication bypass.

    The email comes from DEV_USER_EMAIL; the ID comes from the DEV_USER_ID
    environment variable, with a placeholder ID when it is not set.

    Returns:
        None when DEV_USER_EMAIL is not configured, otherwise a User with
        role "user". If building that user raises, a fallback dev user is
        returned instead.
    """
    if not DEV_USER_EMAIL:
        return None

    try:
        # Use DEV_USER_ID if provided, otherwise use a default dev ID
        configured_id = os.getenv("DEV_USER_ID", "dev-user-id-change-this")
        print(f"β Creating dev user: {DEV_USER_EMAIL} (ID: {configured_id})")
        dev_user = User(id=configured_id, email=DEV_USER_EMAIL, role="user")
        return dev_user
    except Exception as err:
        print(f"β οΈ Could not create dev user: {str(err)}")
        # Fall back to a fixed dev identity
        fallback_email = DEV_USER_EMAIL or "dev@example.com"
        return User(id="dev-user-fallback", email=fallback_email, role="user")
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> User:
    """
    Validate the bearer token and return the current user.

    Three strategies are tried in order:

    1. Development bypass: when DEV_MODE is on and the token equals
       DEV_SECRET_TOKEN, the user built by get_dev_user() is returned.
    2. Backend JWT: the token is decoded with HS256 using JWT_SECRET_KEY and
       accepted when both the "sub" and "email" claims are present.
    3. Supabase: the token is passed to supabase.auth.get_user().

    Args:
        credentials: Bearer credentials extracted by the HTTPBearer scheme.

    Returns:
        The authenticated User (role is always "user").

    Raises:
        HTTPException: 500 when the dev token matches but no dev user could
            be built; 401 when no strategy accepts the token.
    """
    # Local import: only needed for the constant-time comparison below.
    import hmac

    token = credentials.credentials

    # Development mode bypass.
    # compare_digest avoids leaking how much of the secret matched via timing;
    # both sides are encoded because compare_digest rejects non-ASCII str.
    # NOTE(review): DEV_SECRET_TOKEN has a publicly known default value; make
    # sure DEV_MODE is never enabled with that default in a reachable deployment.
    if DEV_MODE and hmac.compare_digest(
        token.encode("utf-8"), DEV_SECRET_TOKEN.encode("utf-8")
    ):
        print("π§ Development mode: Using dev token for authentication")
        dev_user = get_dev_user()
        if dev_user:
            print(f"β Development mode authenticated for user: {dev_user.email}")
            return dev_user
        print("β Development mode enabled but no valid dev user found")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Development mode enabled but DEV_USER_EMAIL not set or user not found",
        )

    # First, try to decode as a backend JWT token.
    # NOTE(review): the fallback secret below is a hard-coded placeholder; any
    # deployment that does not set JWT_SECRET_KEY accepts tokens signed with
    # it. It is kept here so tokens issued with the same fallback elsewhere in
    # this file still verify — set JWT_SECRET_KEY in the environment.
    try:
        secret_key = os.getenv("JWT_SECRET_KEY", "your-secret-key-change-this")
        payload = jwt.decode(token, secret_key, algorithms=["HS256"])
        user_id = payload.get("sub")
        email = payload.get("email")
        if user_id and email:
            print(f"β Successfully authenticated with backend JWT token for user: {email}")
            return User(
                id=user_id,
                email=email,
                role="user",
            )
        # A decodable token without both claims falls through to Supabase.
    except jwt.InvalidTokenError:
        # If JWT decoding fails, try Supabase token verification
        print("π Backend JWT decode failed, trying Supabase token...")
    except Exception as e:
        # Log other JWT errors but continue to Supabase fallback
        print(f"β οΈ JWT decode error: {str(e)}")

    # Fallback to Supabase token verification
    try:
        user = supabase.auth.get_user(token)
        if not user or not user.user:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid authentication credentials",
                headers={"WWW-Authenticate": "Bearer"},
            )
        # Use the user info from Supabase Auth directly
        print(f"β Successfully authenticated with Supabase token for user: {user.user.email}")
        return User(
            id=user.user.id,
            email=user.user.email,
            role="user",  # Default role
        )
    except HTTPException:
        # Let the 401 raised above through unchanged; without this clause the
        # generic handler below would catch it and re-wrap it with a
        # different detail message.
        raise
    except Exception as e:
        print(f"β Both authentication methods failed: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Authentication failed: {str(e)}",
            headers={"WWW-Authenticate": "Bearer"},
        ) from e
def get_current_active_user(current_user: User = Depends(get_current_user)) -> User:
    """
    Return the authenticated user, rejecting a missing one.

    Raises:
        HTTPException: 401 with detail "Inactive user" when the resolved
            user is falsy.
    """
    if current_user:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Inactive user",
        headers={"WWW-Authenticate": "Bearer"},
    )
# Role-based access control
def require_role(required_role: str):
    """
    Build a dependency that only admits users with the given role.

    Args:
        required_role: Role the current user's `role` must equal.

    Returns:
        A callable that returns the current user when the role matches and
        raises a 403 HTTPException otherwise.
    """
    def role_checker(current_user: User = Depends(get_current_active_user)):
        # Exact string match; any other role is refused.
        if current_user.role == required_role:
            return current_user
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not enough permissions",
        )

    return role_checker
async def get_supabase_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str:
    """
    Return the raw bearer token from the Authorization header.

    The token is passed through as-is; no validation happens here.
    """
    bearer_token = credentials.credentials
    return bearer_token
def verify_supabase_token(token: str) -> dict:
    """
    Verify a Supabase token and return the user data as a dictionary.

    Args:
        token: Access token to pass to supabase.auth.get_user().

    Returns:
        A dict with the keys "id", "email", "user_metadata", "app_metadata",
        "created_at" and "updated_at" taken from the Supabase user; the two
        metadata entries default to empty dicts when falsy.

    Raises:
        HTTPException: 401 with detail "Invalid Supabase token" when no user
            is returned, or 401 with "Token verification failed: ..." when
            the lookup itself raises.
    """
    try:
        user_response = supabase.auth.get_user(token)
        if not user_response or not user_response.user:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid Supabase token",
            )
        supabase_user = user_response.user
        # Return user data as dictionary for easier handling
        return {
            "id": supabase_user.id,
            "email": supabase_user.email,
            "user_metadata": supabase_user.user_metadata or {},
            "app_metadata": supabase_user.app_metadata or {},
            "created_at": supabase_user.created_at,
            "updated_at": supabase_user.updated_at,
        }
    except HTTPException:
        # Propagate the 401 raised above untouched; the generic handler below
        # would otherwise catch it and replace its detail message.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Token verification failed: {str(e)}",
        ) from e
def create_backend_jwt_token(user_data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """
    Create a backend JWT token for the user.

    Args:
        user_data: Mapping that must contain "id" and "email"; they become
            the "sub" and "email" claims.
        expires_delta: Token lifetime. When None, the token expires in 7
            days. An explicit zero timedelta is honoured rather than being
            treated as "not provided".

    Returns:
        The HS256-signed token produced by jwt.encode().

    Raises:
        KeyError: if user_data lacks "id" or "email".
    """
    # Local import: the file-level import only brings in datetime and timedelta.
    from datetime import timezone

    # Take a single timezone-aware timestamp so "iat" and "exp" are derived
    # from the same instant (datetime.utcnow() is naive and deprecated).
    issued_at = datetime.now(timezone.utc)
    lifetime = expires_delta if expires_delta is not None else timedelta(days=7)

    to_encode = {
        "sub": user_data["id"],
        "email": user_data["email"],
        "exp": issued_at + lifetime,
        "iat": issued_at,
    }

    # Use a secret key for JWT signing (you should set this in your .env file).
    # NOTE(review): the fallback is a hard-coded placeholder; tokens signed
    # with it can be forged by anyone who knows it.
    secret_key = os.getenv("JWT_SECRET_KEY", "your-secret-key-change-this")
    encoded_jwt = jwt.encode(to_encode, secret_key, algorithm="HS256")
    return encoded_jwt