File size: 5,797 Bytes
133609a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
from typing import Optional
from fastapi import HTTPException, Depends, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials, APIKeyHeader
from supabase import create_client, Client
import jwt
from typing import Optional
from fastapi import HTTPException, Depends, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials, APIKeyHeader
from supabase import create_client, Client
import jwt
from core.config import SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY, SECRET_KEY, ALGORITHM
from core.models import User, TokenData # Import TokenData
# Dependency to get Supabase client
def get_supabase_client() -> Client:
try:
return create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY)
except Exception as e:
print(f"Error initializing Supabase client: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to connect to Supabase. Please check server configuration."
)
# Authentication schemes
oauth2_scheme = HTTPBearer(auto_error=False)
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
# Dependency to get current user from JWT token
async def get_current_user_from_token(
credentials: Optional[HTTPAuthorizationCredentials] = Depends(oauth2_scheme),
supabase_client: Client = Depends(get_supabase_client)
) -> Optional[User]:
if not credentials:
return None
token = credentials.credentials
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id: str = payload.get("sub")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials (token payload)",
headers={"WWW-Authenticate": "Bearer"},
)
# Fetch user from sp_users table, including is_admin
res = supabase_client.table('sp_users').select('id, email, is_admin, email_verified').eq('id', user_id).single().execute()
if res.data:
return User(
id=res.data['id'],
email=res.data['email'],
is_admin=res.data.get('is_admin', False), # Default to False if not present
email_verified=res.data.get('email_verified', False)
)
else:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials (user not found)",
headers={"WWW-Authenticate": "Bearer"},
)
except jwt.PyJWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials (token invalid)",
headers={"WWW-Authenticate": "Bearer"},
)
except Exception as e:
print(f"Error in get_current_user_from_token: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Internal server error during token authentication: {e}",
)
# Dependency to get current user from API Key
async def get_current_user_from_api_key(
api_key: Optional[str] = Depends(api_key_header),
supabase_client: Client = Depends(get_supabase_client)
) -> Optional[User]:
if not api_key:
return None
try:
res = supabase_client.table('sp_user_api_keys').select('user_id').eq('api_key', api_key).single().execute()
if res.data and res.data['user_id']:
user_id = res.data['user_id']
# Fetch user details from sp_users table, including is_admin
user_res = supabase_client.table('sp_users').select('id, email, is_admin, email_verified').eq('id', user_id).single().execute()
if user_res.data:
return User(
id=user_res.data['id'],
email=user_res.data['email'],
is_admin=user_res.data.get('is_admin', False), # Default to False if not present
email_verified=user_res.data.get('email_verified', False)
)
else:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid API Key or user not found",
headers={"X-API-Key": "Invalid"},
)
else:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid API Key",
headers={"X-API-Key": "Invalid"},
)
except Exception as e:
print(f"Error in get_current_user_from_api_key: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Internal server error during API Key authentication: {e}",
)
# Combined dependency for authentication
async def get_current_active_user(
user_from_token: Optional[User] = Depends(get_current_user_from_token),
user_from_api_key: Optional[User] = Depends(get_current_user_from_api_key)
) -> User:
if user_from_token:
return user_from_token
if user_from_api_key:
return user_from_api_key
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer or X-API-Key"},
)
# New dependency to check for admin user
async def get_current_admin_user(current_user: User = Depends(get_current_active_user)) -> User:
if not current_user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Operation forbidden: Not an administrator."
)
return current_user
|