File size: 7,873 Bytes
da64db2
e673ce2
 
da64db2
 
 
e673ce2
da64db2
76ca861
e673ce2
76ca861
da64db2
e673ce2
da64db2
 
 
 
e673ce2
da64db2
e673ce2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
da64db2
e673ce2
da64db2
e673ce2
da64db2
e673ce2
 
 
 
 
 
 
 
 
 
 
da64db2
 
 
 
 
 
e673ce2
da64db2
 
 
76ca861
 
e673ce2
 
76ca861
 
e673ce2
76ca861
e673ce2
 
 
 
 
 
 
 
 
 
86d79a3
 
 
 
 
 
e673ce2
86d79a3
 
 
 
 
 
76ca861
e673ce2
 
 
 
 
 
 
 
 
 
 
 
76ca861
da64db2
76ca861
 
 
 
 
 
da64db2
86d79a3
da64db2
e673ce2
 
 
da64db2
 
 
 
86d79a3
e673ce2
86d79a3
 
e673ce2
86d79a3
 
 
 
e673ce2
 
 
 
 
 
 
 
 
 
86d79a3
 
 
 
 
 
 
 
 
 
e673ce2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
"""
Authentication middleware for JWT token validation with Supabase database integration.
Supports dual authentication: JWT tokens for users and HuggingFace API key for admin access.
"""
import hmac
import logging
import os
from datetime import datetime
from typing import Optional, Dict, Any

import jwt
from dotenv import load_dotenv
from fastapi import HTTPException, status, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

from ..services.database import get_user_session, get_user_by_username

# Load variables from a local .env file (if present) into the process
# environment at import time; the getters below read SECRET_KEY, JWT_ISSUER,
# JWT_AUDIENCE and HF_API_KEY from the environment on every call.
load_dotenv()

# Bearer-token scheme shared by every dependency in this module.
# auto_error=False: a request without credentials is delivered to the
# dependency as None, so each dependency decides whether to reject (401) or
# to continue unauthenticated.
security = HTTPBearer(auto_error=False)
# Module-level logger used for authentication failure warnings.
logger = logging.getLogger(__name__)

def get_secret_key() -> str:
    """Return the JWT signing secret taken from the ``SECRET_KEY`` variable.

    Raises:
        ValueError: If ``SECRET_KEY`` is unset or empty.
    """
    key = os.environ.get("SECRET_KEY")
    if key:
        return key
    raise ValueError("SECRET_KEY environment variable not set. Cannot issue or verify JWTs.")

def get_jwt_issuer() -> Optional[str]:
    """Return the value of the ``JWT_ISSUER`` variable, or None when it is unset."""
    issuer = os.environ.get("JWT_ISSUER")
    return issuer

def get_jwt_audience() -> Optional[str]:
    """Return the value of the ``JWT_AUDIENCE`` variable, or None when it is unset."""
    audience = os.environ.get("JWT_AUDIENCE")
    return audience

def get_hf_api_key() -> Optional[str]:
    """Return the value of the ``HF_API_KEY`` variable, or None when it is unset."""
    api_key = os.environ.get("HF_API_KEY")
    return api_key

async def authenticate_request(credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)) -> bool:
    """
    Primary authentication dependency for protected endpoints.

    Implements a dual authentication strategy:
    1. HuggingFace API key (admin bypass) - constant-time string comparison
    2. JWT token (user authentication) - cryptographic validation + session verification

    For JWT tokens:
    - Validates the HS256 signature, expiration, audience, and issuer
    - When the token carries a 'jti' claim, looks the session up with
      get_user_session() and rejects the request if no session is found

    Args:
        credentials: Bearer credentials supplied by the ``security`` scheme,
            or None when the request carried none.

    Returns:
        True if authentication succeeds.

    Raises:
        HTTPException: 401 when credentials are missing, the token is expired
            or otherwise invalid, or the session lookup finds no session.
        ValueError: Propagated from get_secret_key() when SECRET_KEY is not
            configured.
    """
    if not credentials:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Authentication required. Please provide a valid JWT token or HuggingFace API key.",
            headers={"WWW-Authenticate": "Bearer"},
        )

    token = credentials.credentials

    # Check the HuggingFace API key first (admin bypass). compare_digest keeps
    # the comparison time independent of how many leading characters match, so
    # the key cannot be recovered through a timing side channel. Both sides are
    # encoded because compare_digest rejects non-ASCII str arguments.
    expected_token = get_hf_api_key()
    if expected_token and hmac.compare_digest(
        token.encode("utf-8"), expected_token.encode("utf-8")
    ):
        return True

    # Validate the JWT. Only the decode call sits inside the try block so that
    # unrelated errors are never mistaken for token errors.
    try:
        payload = jwt.decode(
            token,
            get_secret_key(),
            algorithms=["HS256"],
            audience=get_jwt_audience(),
            issuer=get_jwt_issuer(),
        )
    except jwt.ExpiredSignatureError as exc:
        logger.warning("JWT verification failed: Token has expired")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token has expired",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc
    except jwt.InvalidTokenError as exc:
        # The specific reason is logged for internal monitoring only; the
        # client receives a generic message so no validation detail leaks.
        logger.warning("JWT verification failed: Invalid token - %s", exc)
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication token",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc

    # Check that the session is still valid (not revoked).
    # NOTE(review): tokens without a 'jti' claim skip this lookup and therefore
    # cannot be revoked - confirm that this is intended.
    jti = payload.get("jti")
    if jti:
        session = await get_user_session(jti)
        if not session:
            logger.warning("JWT verification failed: Session has been revoked for jti: %s", jti)
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Session has been revoked",
                headers={"WWW-Authenticate": "Bearer"},
            )

    return True

async def optional_auth(credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)) -> bool:
    """
    Optional authentication - doesn't raise errors if no token provided.

    Used for endpoints that can work with or without authentication.

    Args:
        credentials: Bearer credentials supplied by the ``security`` scheme,
            or None when the request carried none.

    Returns:
        True if the token equals the HuggingFace API key, or is a valid JWT
        whose session (when a 'jti' claim is present) is found by
        get_user_session(). False when no credentials were provided, the JWT
        is invalid, or the session lookup finds no session.

    Raises:
        ValueError: Propagated from get_secret_key() when SECRET_KEY is not
            configured and a non-API-key token is presented.
    """
    if not credentials:
        return False

    token = credentials.credentials

    # Constant-time comparison so the API key cannot be recovered through a
    # timing side channel. Both sides are encoded because compare_digest
    # rejects non-ASCII str arguments.
    expected_token = get_hf_api_key()
    if expected_token and hmac.compare_digest(
        token.encode("utf-8"), expected_token.encode("utf-8")
    ):
        return True

    # Check the JWT; only the decode call can raise InvalidTokenError.
    try:
        payload = jwt.decode(
            token,
            get_secret_key(),
            algorithms=["HS256"],
            audience=get_jwt_audience(),
            issuer=get_jwt_issuer(),
        )
    except jwt.InvalidTokenError:
        return False

    # Tokens without a 'jti' claim have no session to look up.
    jti = payload.get("jti")
    if not jti:
        return True

    # Truthiness check, matching how authenticate_request and get_current_user
    # decide that a session is missing or revoked.
    session = await get_user_session(jti)
    return bool(session)

async def get_current_user(credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)) -> Optional[Dict[str, Any]]:
    """
    Extract authenticated user data from a JWT token.

    For JWT tokens:
    - Validates the HS256 signature, audience and issuer
    - When a 'jti' claim is present, requires get_user_session() to find the session
    - Looks the user up with get_user_by_username() using the 'sub' claim

    Args:
        credentials: Bearer credentials supplied by the ``security`` scheme,
            or None when the request carried none.

    Returns:
        - Whatever get_user_by_username() returns for the token's 'sub' claim
          when the JWT and its session are valid
        - None if the token is the HuggingFace API key (no user context)
        - None if no credentials were given, the token is invalid, the session
          is not found, or the token has no 'sub' claim

    Raises:
        ValueError: Propagated from get_secret_key() when SECRET_KEY is not
            configured and a non-API-key token is presented.
    """
    if not credentials:
        return None

    token = credentials.credentials

    # HF API key callers are authenticated but have no user context. The
    # comparison is constant-time so the key cannot be recovered through a
    # timing side channel; both sides are encoded because compare_digest
    # rejects non-ASCII str arguments.
    expected_hf_token = get_hf_api_key()
    if expected_hf_token and hmac.compare_digest(
        token.encode("utf-8"), expected_hf_token.encode("utf-8")
    ):
        return None

    # Decode the JWT; only the decode call can raise InvalidTokenError.
    try:
        payload = jwt.decode(
            token,
            get_secret_key(),
            algorithms=["HS256"],
            audience=get_jwt_audience(),
            issuer=get_jwt_issuer(),
        )
    except jwt.InvalidTokenError:
        return None

    # Reject tokens whose session can no longer be found.
    jti = payload.get("jti")
    if jti:
        session = await get_user_session(jti)
        if not session:
            return None

    # The 'sub' claim carries the username used for the database lookup.
    username = payload.get("sub")
    if not username:
        return None

    return await get_user_by_username(username)

async def require_current_user(credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)) -> Dict[str, Any]:
    """
    Mandatory variant of get_current_user for endpoints that need a user.

    Delegates to get_current_user() and converts any falsy result into a
    401 response. That covers every case in which get_current_user() yields
    no user:
    - No credentials provided
    - HuggingFace API key used (no user context)
    - Invalid or expired JWT token
    - Revoked session

    Returns: the user data produced by get_current_user()
    """
    current = await get_current_user(credentials)
    if current:
        return current

    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Authentication required. Please provide a valid JWT token.",
        headers={"WWW-Authenticate": "Bearer"},
    )
    raise unauthorized