spwebsite / core /dependencies.py
geqintan's picture
update
133609a
from typing import Optional
from fastapi import HTTPException, Depends, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials, APIKeyHeader
from supabase import create_client, Client
import jwt
from typing import Optional
from fastapi import HTTPException, Depends, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials, APIKeyHeader
from supabase import create_client, Client
import jwt
from core.config import SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY, SECRET_KEY, ALGORITHM
from core.models import User, TokenData # Import TokenData
# Dependency to get Supabase client
def get_supabase_client() -> Client:
try:
return create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY)
except Exception as e:
print(f"Error initializing Supabase client: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to connect to Supabase. Please check server configuration."
)
# Authentication schemes
oauth2_scheme = HTTPBearer(auto_error=False)
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
# Dependency to get current user from JWT token
async def get_current_user_from_token(
credentials: Optional[HTTPAuthorizationCredentials] = Depends(oauth2_scheme),
supabase_client: Client = Depends(get_supabase_client)
) -> Optional[User]:
if not credentials:
return None
token = credentials.credentials
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id: str = payload.get("sub")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials (token payload)",
headers={"WWW-Authenticate": "Bearer"},
)
# Fetch user from sp_users table, including is_admin
res = supabase_client.table('sp_users').select('id, email, is_admin, email_verified').eq('id', user_id).single().execute()
if res.data:
return User(
id=res.data['id'],
email=res.data['email'],
is_admin=res.data.get('is_admin', False), # Default to False if not present
email_verified=res.data.get('email_verified', False)
)
else:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials (user not found)",
headers={"WWW-Authenticate": "Bearer"},
)
except jwt.PyJWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials (token invalid)",
headers={"WWW-Authenticate": "Bearer"},
)
except Exception as e:
print(f"Error in get_current_user_from_token: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Internal server error during token authentication: {e}",
)
# Dependency to get current user from API Key
async def get_current_user_from_api_key(
api_key: Optional[str] = Depends(api_key_header),
supabase_client: Client = Depends(get_supabase_client)
) -> Optional[User]:
if not api_key:
return None
try:
res = supabase_client.table('sp_user_api_keys').select('user_id').eq('api_key', api_key).single().execute()
if res.data and res.data['user_id']:
user_id = res.data['user_id']
# Fetch user details from sp_users table, including is_admin
user_res = supabase_client.table('sp_users').select('id, email, is_admin, email_verified').eq('id', user_id).single().execute()
if user_res.data:
return User(
id=user_res.data['id'],
email=user_res.data['email'],
is_admin=user_res.data.get('is_admin', False), # Default to False if not present
email_verified=user_res.data.get('email_verified', False)
)
else:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid API Key or user not found",
headers={"X-API-Key": "Invalid"},
)
else:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid API Key",
headers={"X-API-Key": "Invalid"},
)
except Exception as e:
print(f"Error in get_current_user_from_api_key: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Internal server error during API Key authentication: {e}",
)
# Combined dependency for authentication
async def get_current_active_user(
user_from_token: Optional[User] = Depends(get_current_user_from_token),
user_from_api_key: Optional[User] = Depends(get_current_user_from_api_key)
) -> User:
if user_from_token:
return user_from_token
if user_from_api_key:
return user_from_api_key
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer or X-API-Key"},
)
# New dependency to check for admin user
async def get_current_admin_user(current_user: User = Depends(get_current_active_user)) -> User:
if not current_user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Operation forbidden: Not an administrator."
)
return current_user