|
|
from typing import Optional |
|
|
from fastapi import HTTPException, Depends, status |
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials, APIKeyHeader |
|
|
from supabase import create_client, Client |
|
|
import jwt |
|
|
|
|
|
from typing import Optional |
|
|
from fastapi import HTTPException, Depends, status |
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials, APIKeyHeader |
|
|
from supabase import create_client, Client |
|
|
import jwt |
|
|
|
|
|
from core.config import SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY, SECRET_KEY, ALGORITHM |
|
|
from core.models import User, TokenData |
|
|
|
|
|
|
|
|
def get_supabase_client() -> Client: |
|
|
try: |
|
|
return create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY) |
|
|
except Exception as e: |
|
|
print(f"Error initializing Supabase client: {e}") |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
|
|
detail="Failed to connect to Supabase. Please check server configuration." |
|
|
) |
|
|
|
|
|
|
|
|
oauth2_scheme = HTTPBearer(auto_error=False) |
|
|
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False) |
|
|
|
|
|
|
|
|
async def get_current_user_from_token( |
|
|
credentials: Optional[HTTPAuthorizationCredentials] = Depends(oauth2_scheme), |
|
|
supabase_client: Client = Depends(get_supabase_client) |
|
|
) -> Optional[User]: |
|
|
if not credentials: |
|
|
return None |
|
|
|
|
|
token = credentials.credentials |
|
|
try: |
|
|
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) |
|
|
user_id: str = payload.get("sub") |
|
|
if user_id is None: |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_401_UNAUTHORIZED, |
|
|
detail="Invalid authentication credentials (token payload)", |
|
|
headers={"WWW-Authenticate": "Bearer"}, |
|
|
) |
|
|
|
|
|
|
|
|
res = supabase_client.table('sp_users').select('id, email, is_admin, email_verified').eq('id', user_id).single().execute() |
|
|
if res.data: |
|
|
return User( |
|
|
id=res.data['id'], |
|
|
email=res.data['email'], |
|
|
is_admin=res.data.get('is_admin', False), |
|
|
email_verified=res.data.get('email_verified', False) |
|
|
) |
|
|
else: |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_401_UNAUTHORIZED, |
|
|
detail="Invalid authentication credentials (user not found)", |
|
|
headers={"WWW-Authenticate": "Bearer"}, |
|
|
) |
|
|
except jwt.PyJWTError: |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_401_UNAUTHORIZED, |
|
|
detail="Invalid authentication credentials (token invalid)", |
|
|
headers={"WWW-Authenticate": "Bearer"}, |
|
|
) |
|
|
except Exception as e: |
|
|
print(f"Error in get_current_user_from_token: {e}") |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
|
|
detail=f"Internal server error during token authentication: {e}", |
|
|
) |
|
|
|
|
|
|
|
|
async def get_current_user_from_api_key( |
|
|
api_key: Optional[str] = Depends(api_key_header), |
|
|
supabase_client: Client = Depends(get_supabase_client) |
|
|
) -> Optional[User]: |
|
|
if not api_key: |
|
|
return None |
|
|
|
|
|
try: |
|
|
res = supabase_client.table('sp_user_api_keys').select('user_id').eq('api_key', api_key).single().execute() |
|
|
if res.data and res.data['user_id']: |
|
|
user_id = res.data['user_id'] |
|
|
|
|
|
user_res = supabase_client.table('sp_users').select('id, email, is_admin, email_verified').eq('id', user_id).single().execute() |
|
|
if user_res.data: |
|
|
return User( |
|
|
id=user_res.data['id'], |
|
|
email=user_res.data['email'], |
|
|
is_admin=user_res.data.get('is_admin', False), |
|
|
email_verified=user_res.data.get('email_verified', False) |
|
|
) |
|
|
else: |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_401_UNAUTHORIZED, |
|
|
detail="Invalid API Key or user not found", |
|
|
headers={"X-API-Key": "Invalid"}, |
|
|
) |
|
|
else: |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_401_UNAUTHORIZED, |
|
|
detail="Invalid API Key", |
|
|
headers={"X-API-Key": "Invalid"}, |
|
|
) |
|
|
except Exception as e: |
|
|
print(f"Error in get_current_user_from_api_key: {e}") |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
|
|
detail=f"Internal server error during API Key authentication: {e}", |
|
|
) |
|
|
|
|
|
|
|
|
async def get_current_active_user( |
|
|
user_from_token: Optional[User] = Depends(get_current_user_from_token), |
|
|
user_from_api_key: Optional[User] = Depends(get_current_user_from_api_key) |
|
|
) -> User: |
|
|
if user_from_token: |
|
|
return user_from_token |
|
|
if user_from_api_key: |
|
|
return user_from_api_key |
|
|
|
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_401_UNAUTHORIZED, |
|
|
detail="Not authenticated", |
|
|
headers={"WWW-Authenticate": "Bearer or X-API-Key"}, |
|
|
) |
|
|
|
|
|
|
|
|
async def get_current_admin_user(current_user: User = Depends(get_current_active_user)) -> User: |
|
|
if not current_user.is_admin: |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_403_FORBIDDEN, |
|
|
detail="Operation forbidden: Not an administrator." |
|
|
) |
|
|
return current_user |
|
|
|