Spaces:
Sleeping
Sleeping
| """ | |
| Authentication dependencies for FastAPI. | |
| Validates JWT tokens and provides user context. | |
| """ | |
| from typing import Optional | |
| from fastapi import Depends, HTTPException, status | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from jose import JWTError, jwt | |
| from pydantic import BaseModel | |
| from app.core.config import settings | |
# Shared HTTP bearer scheme instance; get_current_user depends on it to
# receive the request's Authorization credentials.
security = HTTPBearer()
class TokenUser(BaseModel):
    """Identity and role claims carried by a decoded JWT.

    Instances are built by ``get_current_user`` from the token payload.
    """

    # Required claims.
    user_id: str
    username: str
    role: str
    merchant_id: str
    # Optional claims; default to None when not supplied.
    merchant_type: Optional[str] = None
    metadata: Optional[dict] = None

    def has_role(self, *roles: str) -> bool:
        """Return True when the user's role equals any of ``roles``.

        The comparison is exact and case-sensitive; calling with no
        arguments returns False.
        """
        return any(self.role == candidate for candidate in roles)

    def is_admin(self) -> bool:
        """Return True when the lowercased role contains ``"admin"``.

        NOTE: this is a substring match, so any role name that contains
        "admin" (for example "super_admin") is treated as admin.
        """
        normalized_role = self.role.lower()
        return normalized_role.find("admin") != -1
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security)
) -> TokenUser:
    """Resolve the bearer token on the request into a ``TokenUser``.

    The token is decoded and verified with ``settings.SECRET_KEY`` and
    ``settings.ALGORITHM``. The ``sub``, ``username`` and ``merchant_id``
    claims are required; ``role`` falls back to ``"user"`` when missing or
    empty; ``merchant_type`` and ``metadata`` are passed through as-is.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            decoding fails, a required claim is absent, or any other error
            occurs while building the user.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        claims = jwt.decode(
            credentials.credentials,
            settings.SECRET_KEY,
            algorithms=[settings.ALGORITHM],
        )

        # Claims that must be present for the token to be accepted.
        required = {
            "user_id": claims.get("sub"),
            "username": claims.get("username"),
            "merchant_id": claims.get("merchant_id"),
        }
        if any(value is None for value in required.values()):
            raise unauthorized

        return TokenUser(
            role=claims.get("role") or "user",
            merchant_type=claims.get("merchant_type"),
            metadata=claims.get("metadata"),
            **required,
        )
    except Exception:
        # Fail closed: every failure inside the block (JWTError from
        # decoding, the missing-claim raise above, model construction
        # errors) surfaces as the same 401 response.
        raise unauthorized
async def get_current_active_user(
    current_user: TokenUser = Depends(get_current_user)
) -> TokenUser:
    """Return the authenticated user supplied by ``get_current_user``.

    No additional "active" check is performed here; the user is passed
    through unchanged.
    """
    return current_user