MukeshKapoor25's picture
feat(auth): Implement JWT authentication for appointment endpoints
a7715d0
"""
Authentication dependencies for E-commerce microservice.
Validates customer JWT tokens issued by auth-ms.
"""
import logging
from typing import Optional, Dict, Any
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from pydantic import BaseModel
from app.core.config import settings
logger = logging.getLogger(__name__)
security = HTTPBearer()
class CustomerToken(BaseModel):
"""Customer user model from JWT token payload."""
customer_id: str
phone_number: Optional[str] = None
email: Optional[str] = None
name: Optional[str] = None
user_type: str = "customer" # To distinguish from system users
def verify_token(token: str) -> Optional[Dict[str, Any]]:
"""
Verify JWT token from auth microservice.
Args:
token: JWT token string
Returns:
Token payload dict if valid, None otherwise
"""
try:
payload = jwt.decode(
token,
settings.SECRET_KEY,
algorithms=[settings.ALGORITHM]
)
return payload
except JWTError as e:
logger.warning(f"Token verification failed: {str(e)}")
return None
except Exception as e:
logger.error(f"Unexpected error during token verification: {str(e)}")
return None
async def get_current_customer(
credentials: HTTPAuthorizationCredentials = Depends(security)
) -> CustomerToken:
"""
Get current authenticated customer from JWT token.
Extracts customer_id from the 'sub' claim in the JWT.
"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
# Verify token
payload = verify_token(credentials.credentials)
if payload is None:
logger.warning("Token verification returned None")
raise credentials_exception
# Extract customer_id from 'sub' claim
customer_id: str = payload.get("sub")
token_type: str = payload.get("type", "customer")
if customer_id is None:
logger.warning(f"Missing customer_id in token payload")
raise credentials_exception
# Ensure this is a customer token, not a system user token
if token_type != "customer":
logger.warning(f"Invalid token type: {token_type}, expected 'customer'")
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="This endpoint requires customer authentication"
)
logger.debug(f"Customer authenticated: {customer_id}")
return CustomerToken(
customer_id=customer_id,
phone_number=payload.get("mobile"),
email=payload.get("email"),
name=payload.get("name"),
user_type="customer"
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Unexpected error in get_current_customer: {str(e)}")
raise credentials_exception
async def get_optional_customer(
credentials: Optional[HTTPAuthorizationCredentials] = Depends(HTTPBearer(auto_error=False))
) -> Optional[CustomerToken]:
"""
Get current customer if token is provided, otherwise return None.
Useful for endpoints that work with or without authentication.
"""
if credentials is None:
return None
try:
# Verify token
payload = verify_token(credentials.credentials)
if payload is None:
return None
customer_id: str = payload.get("sub")
token_type: str = payload.get("type", "customer")
if customer_id is None or token_type != "customer":
return None
return CustomerToken(
customer_id=customer_id,
phone_number=payload.get("mobile"),
email=payload.get("email"),
name=payload.get("name"),
user_type="customer"
)
except Exception:
return None