File size: 1,788 Bytes
4dc70fb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
from typing import Optional
from fastapi import HTTPException, Depends, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
from jose import jwt, JWTError

from .config import settings
from .supabase import get_client, is_enabled as supabase_enabled


# Shared bearer-token scheme used as a dependency by get_current_user.
# auto_error=False: a request without usable credentials is handed to the
# dependant as None (get_current_user checks for that) instead of being
# rejected by the scheme itself.
security = HTTPBearer(auto_error=False)


class AuthUser(BaseModel):
    """Identity of an authenticated caller, built from decoded JWT claims."""

    # Required user identifier; get_current_user fills it from the "sub" claim.
    id: str
    # Optional; filled from the "email" claim when the token carries one.
    email: Optional[str] = None
    # Optional; filled from the "role" claim when the token carries one.
    role: Optional[str] = None


def decode_supabase_jwt(token: str) -> Optional[dict]:
    """Verify *token* against the configured JWT secret and return its claims.

    The token is checked as HS256 with the expected audience
    ``"authenticated"``.

    Args:
        token: The encoded JWT string taken from the request.

    Returns:
        The decoded claims, or ``None`` when no secret is configured or the
        library rejects the token with a ``JWTError``.
    """
    secret = settings.supabase_jwt_secret
    if not secret:
        # Without a secret there is nothing to verify against.
        return None

    try:
        claims = jwt.decode(
            token,
            secret,
            algorithms=["HS256"],
            audience="authenticated",
        )
    except JWTError:
        # Any verification failure is reported uniformly as "no claims".
        return None

    return claims


async def get_current_user(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)
) -> Optional[AuthUser]:
    """Resolve the caller's identity from the bearer token, if one is usable.

    This dependency never raises for authentication problems; every failure
    mode collapses to ``None`` so that callers can decide whether
    authentication is mandatory.

    Args:
        credentials: Bearer credentials extracted by ``security``; ``None``
            when the request carried none.

    Returns:
        An ``AuthUser`` built from the token's ``sub``, ``email`` and ``role``
        claims, or ``None`` when Supabase is disabled, no credentials were
        sent, the token fails verification, or the token has no usable
        ``sub`` claim.
    """
    if not supabase_enabled():
        return None

    if not credentials:
        return None

    payload = decode_supabase_jwt(credentials.credentials)
    if not payload:
        return None

    # A correctly signed token is not guaranteed to carry a subject. Because
    # AuthUser.id is a required str, passing a missing/non-string "sub"
    # through would raise a validation error (an HTTP 500) instead of
    # treating the caller as unauthenticated.
    subject = payload.get("sub")
    if not isinstance(subject, str) or not subject:
        return None

    return AuthUser(
        id=subject,
        email=payload.get("email"),
        role=payload.get("role"),
    )


async def require_auth(
    user: Optional[AuthUser] = Depends(get_current_user)
) -> AuthUser:
    """Dependency that only lets authenticated callers through.

    Args:
        user: Result of ``get_current_user``; ``None`` when the caller could
            not be authenticated.

    Returns:
        The authenticated ``AuthUser``.

    Raises:
        HTTPException: 503 when Supabase authentication is not enabled, or
            401 when no valid user could be resolved from the request.
    """
    if not supabase_enabled():
        raise HTTPException(status_code=503, detail="Authentication not configured")

    if not user:
        # A 401 must carry a WWW-Authenticate challenge (RFC 7235, section
        # 3.1) so clients know which scheme to retry with.
        raise HTTPException(
            status_code=401,
            detail="Invalid or missing authentication token",
            headers={"WWW-Authenticate": "Bearer"},
        )

    return user


async def optional_auth(
    user: Optional[AuthUser] = Depends(get_current_user)
) -> Optional[AuthUser]:
    """Dependency for routes where authentication is optional.

    Passes through whatever ``get_current_user`` resolved: an ``AuthUser``
    or ``None``. It raises nothing itself.
    """
    return user