File size: 7,762 Bytes
7672657
 
 
 
 
 
 
b62d4a1
 
7672657
 
 
 
 
 
 
 
14c0c90
 
 
 
 
7672657
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
14c0c90
 
 
 
 
 
 
 
 
 
 
66ee4db
14c0c90
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7672657
 
b62d4a1
14c0c90
7672657
b62d4a1
 
14c0c90
 
 
 
 
 
 
 
 
 
 
 
 
 
b62d4a1
7672657
b62d4a1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7672657
 
 
 
 
 
 
b62d4a1
7672657
 
 
 
 
 
b62d4a1
7672657
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b62d4a1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
import hmac
import os
from datetime import datetime, timedelta, timezone
from typing import Optional

import jwt
from dotenv import load_dotenv
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
from supabase import create_client, Client

# Load environment variables. This must run before the os.getenv() calls
# below so that they can see the loaded values.
load_dotenv()

# Supabase configuration (both values are required; validated below)
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")

# Development mode configuration
# DEV_MODE is enabled only when the variable equals "true" (case-insensitive);
# when unset it defaults to disabled.
DEV_MODE = os.getenv("DEV_MODE", "false").lower() == "true"
# NOTE(review): when DEV_SECRET_TOKEN is unset the token falls back to a fixed
# string that is visible in this source file. With DEV_MODE on, anyone who
# knows that string passes the dev bypass in get_current_user().
DEV_SECRET_TOKEN = os.getenv("DEV_SECRET_TOKEN", "dev-secret-token-change-this")
DEV_USER_EMAIL = os.getenv("DEV_USER_EMAIL")  # Email of the dev user in Supabase

# Validate environment variables: fail at import time with a clear message
# instead of failing later inside create_client().
if not SUPABASE_URL:
    raise RuntimeError("SUPABASE_URL environment variable is not set. Please check your .env file.")
if not SUPABASE_KEY:
    raise RuntimeError("SUPABASE_KEY environment variable is not set. Please check your .env file.")

# Module-level Supabase client shared by the functions in this file.
# Any failure while creating it is re-raised as RuntimeError.
try:
    supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
except Exception as e:
    raise RuntimeError(f"Failed to initialize Supabase client: {str(e)}. Please check your Supabase credentials.")

# HTTP Bearer security scheme; the dependencies below use Depends(security)
# to receive the credentials from which they read the token.
security = HTTPBearer()

class User(BaseModel):
    """User object returned by the authentication functions in this module."""
    # User identifier
    id: str
    # User email address
    email: str
    # Role name compared by require_role(); defaults to "user"
    role: str = "user"

class TokenData(BaseModel):
    """Token data model carrying an optional email."""
    # Email associated with the token; None when absent
    email: Optional[str] = None

def get_dev_user() -> Optional[User]:
    """
    Build the development user used by the dev-mode authentication bypass.

    The email comes from the module-level DEV_USER_EMAIL. The id comes from
    the DEV_USER_ID environment variable, read on every call, and falls back
    to "dev-user-id-change-this" when that variable is unset.

    Returns:
        A User with role "user", or None when DEV_USER_EMAIL is not set.
        If building the configured user fails, a user with id
        "dev-user-fallback" and the same email is returned instead.
    """
    if not DEV_USER_EMAIL:
        return None

    try:
        # Use DEV_USER_ID if provided, otherwise use a default dev ID
        dev_user_id = os.getenv("DEV_USER_ID", "dev-user-id-change-this")

        print(f"✅ Creating dev user: {DEV_USER_EMAIL} (ID: {dev_user_id})")
        return User(
            id=dev_user_id,
            email=DEV_USER_EMAIL,
            role="user"
        )
    except Exception as e:
        # Best-effort: dev mode should still yield a usable user.
        print(f"⚠️ Could not create dev user: {str(e)}")
        # DEV_USER_EMAIL is known to be non-empty here (guard above), so no
        # placeholder email is needed for the fallback user.
        return User(
            id="dev-user-fallback",
            email=DEV_USER_EMAIL,
            role="user"
        )

async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> User:
    """
    Validate the bearer token and return the current user.

    Authentication is attempted in this order:

    1. Development bypass: when DEV_MODE is on and the token equals
       DEV_SECRET_TOKEN, the user built by get_dev_user() is returned.
    2. Backend JWT: the token is decoded with HS256 using JWT_SECRET_KEY and
       accepted when it carries both a "sub" and an "email" claim.
    3. Supabase: the token is passed to supabase.auth.get_user().

    Every user returned by this function has role "user".

    Raises:
        HTTPException: 500 when the dev token matches but get_dev_user()
            returns no user; 401 when no method accepts the token.
    """
    token = credentials.credentials

    # Development mode bypass. The token is caller-supplied, so compare in
    # constant time; bytes are used because compare_digest() rejects
    # non-ASCII str arguments.
    if DEV_MODE and hmac.compare_digest(
        token.encode("utf-8"), DEV_SECRET_TOKEN.encode("utf-8")
    ):
        print("🔧 Development mode: Using dev token for authentication")
        dev_user = get_dev_user()
        if dev_user:
            print(f"✅ Development mode authenticated for user: {dev_user.email}")
            return dev_user
        print("❌ Development mode enabled but no valid dev user found")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Development mode enabled but DEV_USER_EMAIL not set or user not found"
        )

    # First, try to decode as a backend JWT token
    try:
        # NOTE(security): when JWT_SECRET_KEY is unset, tokens are verified
        # against a fixed string that is visible in this source file, so
        # anyone can forge a valid token. Set JWT_SECRET_KEY in every
        # deployed environment.
        secret_key = os.getenv("JWT_SECRET_KEY", "your-secret-key-change-this")
        payload = jwt.decode(token, secret_key, algorithms=["HS256"])
        user_id = payload.get("sub")
        email = payload.get("email")

        if user_id and email:
            print(f"✅ Successfully authenticated with backend JWT token for user: {email}")
            return User(
                id=user_id,
                email=email,
                role="user"
            )
        # A valid signature without both claims falls through to Supabase.
    except jwt.InvalidTokenError:
        # If JWT decoding fails, try Supabase token verification
        print("🔄 Backend JWT decode failed, trying Supabase token...")
    except Exception as e:
        # Log other JWT errors but continue to Supabase fallback
        print(f"⚠️ JWT decode error: {str(e)}")

    # Fallback to Supabase token verification
    # NOTE(review): get_user() is called synchronously inside this coroutine;
    # if it performs network I/O it will block the event loop - confirm.
    try:
        user = supabase.auth.get_user(token)
        if not user or not user.user:
            print("❌ Both authentication methods failed: no user returned for token")
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid authentication credentials",
                headers={"WWW-Authenticate": "Bearer"},
            )
        # Use the user info from Supabase Auth directly
        print(f"✅ Successfully authenticated with Supabase token for user: {user.user.email}")
        return User(
            id=user.user.id,
            email=user.user.email,
            role="user"  # Default role
        )
    except HTTPException:
        # The 401 raised just above must reach the client unchanged rather
        # than being caught and re-wrapped by the generic handler below.
        raise
    except Exception as e:
        print(f"❌ Both authentication methods failed: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Authentication failed: {str(e)}",
            headers={"WWW-Authenticate": "Bearer"},
        ) from e

def get_current_active_user(current_user: User = Depends(get_current_user)) -> User:
    """
    Pass through the user resolved by get_current_user.

    Returns:
        The given user unchanged when it is truthy.

    Raises:
        HTTPException: 401 with detail "Inactive user" when the user object
            is falsy.
    """
    if current_user:
        return current_user

    # No usable user object was supplied: reject with a Bearer challenge.
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Inactive user",
        headers={"WWW-Authenticate": "Bearer"},
    )

# Role-based access control
def require_role(required_role: str):
    """
    Build a dependency that admits only users whose role equals required_role.

    Args:
        required_role: role name the user's ``role`` must match exactly.

    Returns:
        A callable that takes the user from get_current_active_user and
        returns it, or raises HTTPException 403 ("Not enough permissions")
        when the role differs.
    """
    def role_checker(current_user: User = Depends(get_current_active_user)):
        role_matches = current_user.role == required_role
        if role_matches:
            return current_user
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not enough permissions"
        )

    return role_checker

async def get_supabase_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str:
    """
    Return the raw token string carried by the request's bearer credentials.

    The token is returned as-is; no verification is performed here.
    """
    bearer_token = credentials.credentials
    return bearer_token

def verify_supabase_token(token: str) -> dict:
    """
    Verify a Supabase token and return the user's data as a dictionary.

    Args:
        token: token passed to supabase.auth.get_user().

    Returns:
        A dict with the keys "id", "email", "user_metadata", "app_metadata",
        "created_at" and "updated_at". Empty or missing metadata is returned
        as an empty dict.

    Raises:
        HTTPException: 401 with detail "Invalid Supabase token" when no user
            is returned for the token; 401 with detail
            "Token verification failed: <error>" when the lookup itself
            raises.
    """
    try:
        user_response = supabase.auth.get_user(token)
        if not user_response or not user_response.user:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid Supabase token"
            )
        supabase_user = user_response.user
        # Return user data as dictionary for easier handling
        return {
            "id": supabase_user.id,
            "email": supabase_user.email,
            "user_metadata": supabase_user.user_metadata or {},
            "app_metadata": supabase_user.app_metadata or {},
            "created_at": supabase_user.created_at,
            "updated_at": supabase_user.updated_at
        }
    except HTTPException:
        # The 401 raised above must propagate unchanged; without this clause
        # the generic handler below would catch it and re-wrap its detail.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Token verification failed: {str(e)}"
        ) from e

def create_backend_jwt_token(user_data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """
    Create a backend JWT token for the user.

    Args:
        user_data: mapping that must contain "id" (stored as the "sub"
            claim) and "email" (stored as the "email" claim).
        expires_delta: token lifetime measured from now. When None, the
            token expires in 7 days. Zero or negative values are honoured
            and produce a token that is already expired.

    Returns:
        The encoded token, signed with HS256 using JWT_SECRET_KEY.

    Raises:
        KeyError: if user_data lacks "id" or "email".
    """
    # Take a single timezone-aware timestamp so "iat" and "exp" are derived
    # from the same instant (datetime.utcnow() is naive and deprecated).
    issued_at = datetime.now(timezone.utc)

    # Test against None explicitly: timedelta(0) is falsy, and a truthiness
    # check would silently replace it with the 7-day default.
    lifetime = expires_delta if expires_delta is not None else timedelta(days=7)

    to_encode = {
        "sub": user_data["id"],
        "email": user_data["email"],
        "exp": issued_at + lifetime,
        "iat": issued_at,
    }

    # Use a secret key for JWT signing (you should set this in your .env file)
    # NOTE(security): when JWT_SECRET_KEY is unset, tokens are signed with a
    # fixed string that is visible in this source file, so they can be forged
    # by anyone. Set JWT_SECRET_KEY in every deployed environment.
    secret_key = os.getenv("JWT_SECRET_KEY", "your-secret-key-change-this")
    return jwt.encode(to_encode, secret_key, algorithm="HS256")