# MukeshKapoor25's picture
# Initial commit
# b143975
"""
Authentication dependencies for Analytics Microservice.
Validates JWT tokens issued by the Auth microservice.
"""
from typing import Optional

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from pydantic import BaseModel, ValidationError

from app.core.config import settings
# HTTP Bearer security scheme; get_current_user depends on it to obtain the
# request's bearer credentials.
security = HTTPBearer()
class TokenUser(BaseModel):
    """User identity, role and merchant details taken from a JWT payload."""

    # Required string claims.
    user_id: str
    username: str
    role_id: str
    merchant_id: str
    # Optional claims; None when not supplied.
    merchant_type: Optional[str] = None
    metadata: Optional[dict] = None

    def has_role(self, *roles: str) -> bool:
        """Return True when ``role_id`` equals one of ``roles`` (case-sensitive)."""
        return any(self.role_id == candidate for candidate in roles)

    def is_admin(self) -> bool:
        """Return True when the lower-cased ``role_id`` contains ``"admin"``.

        This is a substring match, so a role such as ``"super_admin"`` also
        counts as admin.
        """
        normalized_role = self.role_id.lower()
        return "admin" in normalized_role

    def is_super_admin(self) -> bool:
        """Return True when the lower-cased ``role_id`` contains ``"super_admin"``."""
        normalized_role = self.role_id.lower()
        return "super_admin" in normalized_role
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> TokenUser:
    """Decode the request's bearer token and return the user it describes.

    The token is decoded with ``settings.SECRET_KEY`` and
    ``settings.ALGORITHM``. The user identifier is read from the ``user_id``
    claim, falling back to ``sub``. Missing ``username``, ``role_id`` and
    ``merchant_id`` claims default to an empty string; ``merchant_type`` and
    ``metadata`` default to ``None``.

    Args:
        credentials: Bearer credentials provided by the ``security``
            dependency.

    Returns:
        A ``TokenUser`` populated from the token claims.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            the token cannot be decoded, carries no user identifier, or its
            claims do not pass ``TokenUser`` validation.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Keep the try body limited to the call that can raise JWTError.
    try:
        payload = jwt.decode(
            credentials.credentials,
            settings.SECRET_KEY,
            algorithms=[settings.ALGORITHM],
        )
    except JWTError as exc:
        raise credentials_exception from exc

    user_id: Optional[str] = payload.get("user_id") or payload.get("sub")
    if not user_id:
        raise credentials_exception

    try:
        return TokenUser(
            user_id=user_id,
            username=payload.get("username", ""),
            role_id=payload.get("role_id", ""),
            merchant_id=payload.get("merchant_id", ""),
            merchant_type=payload.get("merchant_type"),
            metadata=payload.get("metadata"),
        )
    except ValidationError as exc:
        # A correctly signed token can still carry claims of the wrong type
        # (for example a null ``username``). Report that as the same 401
        # rather than letting the validation error propagate out of the
        # dependency.
        raise credentials_exception from exc