from typing import Optional from fastapi import HTTPException, Depends, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials, APIKeyHeader from supabase import create_client, Client import jwt from typing import Optional from fastapi import HTTPException, Depends, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials, APIKeyHeader from supabase import create_client, Client import jwt from core.config import SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY, SECRET_KEY, ALGORITHM from core.models import User, TokenData # Import TokenData # Dependency to get Supabase client def get_supabase_client() -> Client: try: return create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY) except Exception as e: print(f"Error initializing Supabase client: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to connect to Supabase. Please check server configuration." ) # Authentication schemes oauth2_scheme = HTTPBearer(auto_error=False) api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False) # Dependency to get current user from JWT token async def get_current_user_from_token( credentials: Optional[HTTPAuthorizationCredentials] = Depends(oauth2_scheme), supabase_client: Client = Depends(get_supabase_client) ) -> Optional[User]: if not credentials: return None token = credentials.credentials try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) user_id: str = payload.get("sub") if user_id is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials (token payload)", headers={"WWW-Authenticate": "Bearer"}, ) # Fetch user from sp_users table, including is_admin res = supabase_client.table('sp_users').select('id, email, is_admin, email_verified').eq('id', user_id).single().execute() if res.data: return User( id=res.data['id'], email=res.data['email'], is_admin=res.data.get('is_admin', False), # Default to False if not present email_verified=res.data.get('email_verified', False) ) else: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials (user not found)", headers={"WWW-Authenticate": "Bearer"}, ) except jwt.PyJWTError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials (token invalid)", headers={"WWW-Authenticate": "Bearer"}, ) except Exception as e: print(f"Error in get_current_user_from_token: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Internal server error during token authentication: {e}", ) # Dependency to get current user from API Key async def get_current_user_from_api_key( api_key: Optional[str] = Depends(api_key_header), supabase_client: Client = Depends(get_supabase_client) ) -> Optional[User]: if not api_key: return None try: res = supabase_client.table('sp_user_api_keys').select('user_id').eq('api_key', api_key).single().execute() if res.data and res.data['user_id']: user_id = res.data['user_id'] # Fetch user details from sp_users table, including is_admin user_res = supabase_client.table('sp_users').select('id, email, is_admin, email_verified').eq('id', user_id).single().execute() if user_res.data: return User( id=user_res.data['id'], email=user_res.data['email'], is_admin=user_res.data.get('is_admin', False), # Default to False if not present email_verified=user_res.data.get('email_verified', False) ) else: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API Key or user not found", headers={"X-API-Key": "Invalid"}, ) else: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API Key", headers={"X-API-Key": "Invalid"}, ) except Exception as e: print(f"Error in get_current_user_from_api_key: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Internal server error during API Key authentication: {e}", ) # Combined dependency for authentication async def get_current_active_user( user_from_token: Optional[User] = Depends(get_current_user_from_token), user_from_api_key: Optional[User] = Depends(get_current_user_from_api_key) ) -> User: if user_from_token: return user_from_token if user_from_api_key: return user_from_api_key raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated", headers={"WWW-Authenticate": "Bearer or X-API-Key"}, ) # New dependency to check for admin user async def get_current_admin_user(current_user: User = Depends(get_current_active_user)) -> User: if not current_user.is_admin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Operation forbidden: Not an administrator." ) return current_user