3dai-backend / auth.py
Maksymilian Jankowski
deploy fix, removed celery and redis
66ee4db
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from supabase import create_client, Client
from typing import Optional
import os
from dotenv import load_dotenv
from pydantic import BaseModel
import jwt
from datetime import datetime, timedelta
# Load environment variables
load_dotenv()

# Supabase configuration
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")

# Development mode configuration
DEV_MODE = os.getenv("DEV_MODE", "false").lower() == "true"
DEV_SECRET_TOKEN = os.getenv("DEV_SECRET_TOKEN", "dev-secret-token-change-this")
DEV_USER_EMAIL = os.getenv("DEV_USER_EMAIL")  # Email of the dev user in Supabase

# With DEV_MODE on and no DEV_SECRET_TOKEN configured, the bypass token is the
# placeholder string above, which is visible in the source. Make that loud.
if DEV_MODE and not os.getenv("DEV_SECRET_TOKEN"):
    print("⚠️ DEV_MODE is enabled but DEV_SECRET_TOKEN is not set; the placeholder dev token is in use")

# Validate environment variables: fail at import time rather than on the first request
if not SUPABASE_URL:
    raise RuntimeError("SUPABASE_URL environment variable is not set. Please check your .env file.")
if not SUPABASE_KEY:
    raise RuntimeError("SUPABASE_KEY environment variable is not set. Please check your .env file.")

try:
    supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
except Exception as e:
    # Chain the original error so its traceback is reported as the direct cause
    raise RuntimeError(
        f"Failed to initialize Supabase client: {str(e)}. Please check your Supabase credentials."
    ) from e

# Security scheme for JWT
security = HTTPBearer()
class User(BaseModel):
    # Authenticated principal returned by the auth dependencies in this module.
    # (Documented with comments rather than a docstring so the generated
    # schema for this model is left exactly as it was.)
    id: str  # JWT "sub" claim, Supabase user id, or the configured dev user id
    email: str
    role: str = "user"  # compared against required_role in require_role()
class TokenData(BaseModel):
    # Holder for token payload data; email defaults to None when not supplied.
    email: Optional[str] = None
def get_dev_user() -> Optional[User]:
    """
    Build the user returned by the development-mode bypass.

    Returns None when DEV_USER_EMAIL is not configured. Otherwise the user's
    email is DEV_USER_EMAIL and its id is read from the DEV_USER_ID
    environment variable, with a placeholder id used when that is unset.
    If building that user fails, a fixed fallback dev user is returned.
    """
    if DEV_USER_EMAIL:
        try:
            # DEV_USER_ID wins when present; otherwise the placeholder id is used
            configured_id = os.getenv("DEV_USER_ID", "dev-user-id-change-this")
            print(f"βœ… Creating dev user: {DEV_USER_EMAIL} (ID: {configured_id})")
            return User(id=configured_id, email=DEV_USER_EMAIL, role="user")
        except Exception as exc:
            print(f"⚠️ Could not create dev user: {exc}")
            # Hand back a fixed fallback dev user instead of failing
            fallback_email = DEV_USER_EMAIL or "dev@example.com"
            return User(id="dev-user-fallback", email=fallback_email, role="user")
    return None
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> User:
    """
    Validate the bearer token and return the current user.

    Three strategies are tried in order:

    1. Development bypass: when DEV_MODE is enabled and the token equals
       DEV_SECRET_TOKEN, the user built by get_dev_user() is returned.
    2. Backend JWT: the token is decoded with HS256 using JWT_SECRET_KEY and
       accepted when it carries both "sub" and "email" claims.
    3. Supabase: the token is passed to supabase.auth.get_user().

    Raises:
        HTTPException: 500 when the dev token matched but get_dev_user()
            returned no user; 401 when neither the backend JWT nor Supabase
            accepts the token.
    """
    # Kept local so this block does not depend on a new file-level import.
    import hmac

    token = credentials.credentials

    # Development mode bypass. compare_digest takes the same time wherever the
    # first mismatch is, so response timing does not reveal how much of a
    # guessed token was right. Encoding to bytes lets it accept any str.
    if DEV_MODE and hmac.compare_digest(
        token.encode("utf-8"), DEV_SECRET_TOKEN.encode("utf-8")
    ):
        print("πŸ”§ Development mode: Using dev token for authentication")
        dev_user = get_dev_user()
        if dev_user:
            print(f"βœ… Development mode authenticated for user: {dev_user.email}")
            return dev_user
        print("❌ Development mode enabled but no valid dev user found")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Development mode enabled but DEV_USER_EMAIL not set or user not found",
        )

    # First, try to decode as a backend JWT token
    try:
        # NOTE(review): when JWT_SECRET_KEY is unset, tokens are verified with
        # the placeholder below, which is visible in the source. Confirm that
        # every deployment sets JWT_SECRET_KEY.
        secret_key = os.getenv("JWT_SECRET_KEY", "your-secret-key-change-this")
        payload = jwt.decode(token, secret_key, algorithms=["HS256"])
        user_id = payload.get("sub")
        email = payload.get("email")
        if user_id and email:
            print(f"βœ… Successfully authenticated with backend JWT token for user: {email}")
            return User(id=user_id, email=email, role="user")
        # A valid signature without both claims falls through to Supabase.
    except jwt.InvalidTokenError:
        # If JWT decoding fails, try Supabase token verification
        print("πŸ”„ Backend JWT decode failed, trying Supabase token...")
    except Exception as e:
        # Log other JWT errors but continue to Supabase fallback
        print(f"⚠️ JWT decode error: {str(e)}")

    # Fallback to Supabase token verification
    try:
        response = supabase.auth.get_user(token)
        if not response or not response.user:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid authentication credentials",
                headers={"WWW-Authenticate": "Bearer"},
            )
        # Use the user info from Supabase Auth directly
        print(f"βœ… Successfully authenticated with Supabase token for user: {response.user.email}")
        return User(
            id=response.user.id,
            email=response.user.email,
            role="user",  # Default role
        )
    except HTTPException as e:
        # This is the 401 raised just above. Re-raise it unchanged so the
        # generic handler below does not wrap it in a second 401 whose detail
        # embeds the first one.
        print(f"❌ Both authentication methods failed: {e.detail}")
        raise
    except Exception as e:
        print(f"❌ Both authentication methods failed: {str(e)}")
        # NOTE(review): the detail echoes the underlying error text to the
        # client; kept for backward compatibility.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Authentication failed: {str(e)}",
            headers={"WWW-Authenticate": "Bearer"},
        ) from e
def get_current_active_user(current_user: User = Depends(get_current_user)) -> User:
    """
    Return the user resolved by get_current_user.

    Raises:
        HTTPException: 401 with detail "Inactive user" when current_user is falsy.
    """
    if current_user:
        return current_user
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Inactive user",
        headers={"WWW-Authenticate": "Bearer"},
    )
# Role-based access control
def require_role(required_role: str):
    """
    Build a dependency that admits only users whose role equals required_role.

    The returned callable resolves the user through get_current_active_user,
    returns that user when the roles match, and raises HTTPException 403
    ("Not enough permissions") otherwise.
    """
    def role_checker(current_user: User = Depends(get_current_active_user)):
        if current_user.role == required_role:
            return current_user
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not enough permissions",
        )

    return role_checker
async def get_supabase_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str:
    """
    Return the bearer credential supplied with the request, without verifying it.
    """
    bearer_token = credentials.credentials
    return bearer_token
def verify_supabase_token(token: str) -> dict:
    """
    Verify a Supabase token and return the user's data as a dictionary.

    Args:
        token: Token passed to supabase.auth.get_user().

    Returns:
        A dict with the keys "id", "email", "user_metadata", "app_metadata",
        "created_at" and "updated_at". Missing metadata is returned as {}.

    Raises:
        HTTPException: 401 "Invalid Supabase token" when Supabase returns no
            user; 401 "Token verification failed: ..." when the lookup itself
            raises.
    """
    try:
        user_response = supabase.auth.get_user(token)
        if not user_response or not user_response.user:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid Supabase token",
            )
        supabase_user = user_response.user
        # Return user data as dictionary for easier handling
        return {
            "id": supabase_user.id,
            "email": supabase_user.email,
            "user_metadata": supabase_user.user_metadata or {},
            "app_metadata": supabase_user.app_metadata or {},
            "created_at": supabase_user.created_at,
            "updated_at": supabase_user.updated_at,
        }
    except HTTPException:
        # This is the 401 raised just above. Let it through unchanged so the
        # generic handler below does not wrap it in a second 401.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Token verification failed: {str(e)}",
        ) from e
def create_backend_jwt_token(user_data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """
    Create a backend JWT token for the user.

    Args:
        user_data: Mapping with at least "id" and "email"; these become the
            "sub" and "email" claims.
        expires_delta: Token lifetime. Defaults to 7 days when None.

    Returns:
        The token signed with HS256 using JWT_SECRET_KEY.

    Raises:
        KeyError: If user_data lacks "id" or "email".
    """
    # Kept local so this block does not depend on a new file-level import.
    from datetime import timezone

    # One timezone-aware timestamp, so "iat" and "exp" come from the same
    # instant and the deprecated naive datetime.utcnow() is avoided.
    issued_at = datetime.now(timezone.utc)

    # Compare against None: a plain truthiness test would treat an explicit
    # timedelta(0) as "not provided" and issue a 7-day token.
    lifetime = expires_delta if expires_delta is not None else timedelta(days=7)

    to_encode = {
        "sub": user_data["id"],
        "email": user_data["email"],
        "exp": issued_at + lifetime,
        "iat": issued_at,
    }

    # Use a secret key for JWT signing (you should set this in your .env file)
    # NOTE(review): when JWT_SECRET_KEY is unset, tokens are signed with the
    # placeholder below, which is visible in the source.
    secret_key = os.getenv("JWT_SECRET_KEY", "your-secret-key-change-this")
    return jwt.encode(to_encode, secret_key, algorithm="HS256")