Spaces:
Paused
Paused
File size: 7,762 Bytes
7672657 b62d4a1 7672657 14c0c90 7672657 14c0c90 66ee4db 14c0c90 7672657 b62d4a1 14c0c90 7672657 b62d4a1 14c0c90 b62d4a1 7672657 b62d4a1 7672657 b62d4a1 7672657 b62d4a1 7672657 b62d4a1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from supabase import create_client, Client
from typing import Optional
import os
from dotenv import load_dotenv
from pydantic import BaseModel
import jwt
from datetime import datetime, timedelta
# Load environment variables
load_dotenv()
# Supabase configuration
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")
# Development mode configuration
DEV_MODE = os.getenv("DEV_MODE", "false").lower() == "true"
DEV_SECRET_TOKEN = os.getenv("DEV_SECRET_TOKEN", "dev-secret-token-change-this")
DEV_USER_EMAIL = os.getenv("DEV_USER_EMAIL") # Email of the dev user in Supabase
# Validate environment variables
if not SUPABASE_URL:
raise RuntimeError("SUPABASE_URL environment variable is not set. Please check your .env file.")
if not SUPABASE_KEY:
raise RuntimeError("SUPABASE_KEY environment variable is not set. Please check your .env file.")
try:
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
except Exception as e:
raise RuntimeError(f"Failed to initialize Supabase client: {str(e)}. Please check your Supabase credentials.")
# Security scheme for JWT
security = HTTPBearer()
class User(BaseModel):
id: str
email: str
role: str = "user"
class TokenData(BaseModel):
email: Optional[str] = None
def get_dev_user() -> Optional[User]:
"""
Get a development user for testing purposes.
This requires DEV_USER_EMAIL and optionally DEV_USER_ID to be set.
If DEV_USER_ID is not provided, a default dev user ID will be used.
"""
if not DEV_USER_EMAIL:
return None
try:
# Use DEV_USER_ID if provided, otherwise use a default dev ID
dev_user_id = os.getenv("DEV_USER_ID", "dev-user-id-change-this")
print(f"β
Creating dev user: {DEV_USER_EMAIL} (ID: {dev_user_id})")
return User(
id=dev_user_id,
email=DEV_USER_EMAIL,
role="user"
)
except Exception as e:
print(f"β οΈ Could not create dev user: {str(e)}")
# Return a fallback dev user
return User(
id="dev-user-fallback",
email=DEV_USER_EMAIL or "dev@example.com",
role="user"
)
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> User:
"""
Validate JWT token and return the current user.
Supports backend JWT tokens, Supabase tokens, and development mode bypass.
"""
token = credentials.credentials
# Development mode bypass
if DEV_MODE and token == DEV_SECRET_TOKEN:
print(f"π§ Development mode: Using dev token for authentication")
dev_user = get_dev_user()
if dev_user:
print(f"β
Development mode authenticated for user: {dev_user.email}")
return dev_user
else:
print(f"β Development mode enabled but no valid dev user found")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Development mode enabled but DEV_USER_EMAIL not set or user not found"
)
# First, try to decode as a backend JWT token
try:
secret_key = os.getenv("JWT_SECRET_KEY", "your-secret-key-change-this")
payload = jwt.decode(token, secret_key, algorithms=["HS256"])
user_id = payload.get("sub")
email = payload.get("email")
if user_id and email:
print(f"β
Successfully authenticated with backend JWT token for user: {email}")
return User(
id=user_id,
email=email,
role="user"
)
except jwt.InvalidTokenError:
# If JWT decoding fails, try Supabase token verification
print("π Backend JWT decode failed, trying Supabase token...")
pass
except Exception as e:
# Log other JWT errors but continue to Supabase fallback
print(f"β οΈ JWT decode error: {str(e)}")
# Fallback to Supabase token verification
try:
user = supabase.auth.get_user(token)
if not user or not user.user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
# Use the user info from Supabase Auth directly
print(f"β
Successfully authenticated with Supabase token for user: {user.user.email}")
return User(
id=user.user.id,
email=user.user.email,
role="user" # Default role
)
except Exception as e:
print(f"β Both authentication methods failed: {str(e)}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Authentication failed: {str(e)}",
headers={"WWW-Authenticate": "Bearer"},
)
def get_current_active_user(current_user: User = Depends(get_current_user)) -> User:
"""
Check if the current user is active
"""
if not current_user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Inactive user",
headers={"WWW-Authenticate": "Bearer"},
)
return current_user
# Role-based access control
def require_role(required_role: str):
def role_checker(current_user: User = Depends(get_current_active_user)):
if current_user.role != required_role:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions"
)
return current_user
return role_checker
async def get_supabase_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str:
"""
Extract Supabase token from Authorization header
"""
return credentials.credentials
def verify_supabase_token(token: str) -> dict:
"""
Verify Supabase token and return user data
"""
try:
user_response = supabase.auth.get_user(token)
if not user_response or not user_response.user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid Supabase token"
)
# Return user data as dictionary for easier handling
return {
"id": user_response.user.id,
"email": user_response.user.email,
"user_metadata": user_response.user.user_metadata or {},
"app_metadata": user_response.user.app_metadata or {},
"created_at": user_response.user.created_at,
"updated_at": user_response.user.updated_at
}
except Exception as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Token verification failed: {str(e)}"
)
def create_backend_jwt_token(user_data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""
Create a backend JWT token for the user
"""
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(days=7) # Token expires in 7 days
to_encode = {
"sub": user_data["id"],
"email": user_data["email"],
"exp": expire,
"iat": datetime.utcnow()
}
# Use a secret key for JWT signing (you should set this in your .env file)
secret_key = os.getenv("JWT_SECRET_KEY", "your-secret-key-change-this")
encoded_jwt = jwt.encode(to_encode, secret_key, algorithm="HS256")
return encoded_jwt |