emu-rag / src /api /routers /auth_microsoft.py
Sarp Bilgiç
local auth
83caf02
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi_sso.sso.microsoft import MicrosoftSSO
from sqlmodel.ext.asyncio.session import AsyncSession
import logging
import jwt
from typing import Annotated, Optional
from src.core.settings import settings
from src.api.dependencies.clients import get_db
from src.api.services.auth_service import AuthService
from src.api.selectors.user.get_or_create_user import get_or_create_user
from src.api.dependencies.auth import get_auth_service
from src.api.dependencies.rate_limit import login_rate_limiter
from fastapi.responses import JSONResponse
logger = logging.getLogger(__name__)
router = APIRouter(
prefix="/api/v1/auth",
tags=["auth"]
)
microsoft_sso = MicrosoftSSO(
client_id=settings.microsoft_client_id,
client_secret=settings.microsoft_client_secret,
tenant="common",
redirect_uri=settings.microsoft_redirect_uri,
allow_insecure_http=True,
scope=["openid", "email", "profile"]
)
@router.get("/microsoft/login", dependencies=[Depends(login_rate_limiter)])
async def microsoft_login():
async with microsoft_sso:
return await microsoft_sso.get_login_redirect()
@router.get("/microsoft/callback", dependencies=[Depends(login_rate_limiter)])
async def microsoft_callback(
request: Request,
db: Annotated[AsyncSession, Depends(get_db)],
auth_service: Annotated[AuthService, Depends(get_auth_service)]
):
try:
async with microsoft_sso:
user_data = await microsoft_sso.verify_and_process(request)
if not user_data:
logger.error("Microsoft SSO verification failed")
raise HTTPException(
status_code=401,
detail="Authentication failed: Unable to verify Microsoft account"
)
email: Optional[str] = None
user_id: Optional[str] = None
display_name: Optional[str] = None
first_name: Optional[str] = None
id_token = getattr(microsoft_sso, '_id_token', None)
if id_token:
try:
decoded_token = jwt.decode(id_token, options={"verify_signature": False})
email = auth_service.extract_email_from_token(decoded_token)
user_info = auth_service.extract_user_info_from_token(decoded_token)
user_id = user_info["user_id"]
display_name = user_info["display_name"]
first_name = user_info["first_name"]
except jwt.DecodeError as e:
logger.warning(f"Failed to decode ID token: {e}")
if not email:
user_info = auth_service.extract_user_info_from_sso_user_data(user_data)
email = user_info["email"]
user_id = user_info["user_id"]
display_name = user_info["display_name"]
first_name = user_info["first_name"]
if not email:
logger.error("No email found in Microsoft SSO response")
raise HTTPException(
status_code=400,
detail="Email not provided by Microsoft account"
)
auth_service.validate_emu_email(email)
user = await get_or_create_user(
email=email,
user_id=user_id,
display_name=display_name,
first_name=first_name,
db=db
)
access_token = auth_service.create_access_token(data={"sub": user.email})
response = JSONResponse(content={
"access_token": access_token,
"token_type": "bearer",
"user": {
"email": user.email,
"username": user.username,
}
})
response.set_cookie(
key="access_token",
value=access_token,
httponly=True,
secure=True,
samesite="lax",
max_age=settings.access_token_expire_minutes * 60
)
return response
except HTTPException:
raise
except Exception as e:
logger.exception(f"Unexpected error during Microsoft OAuth callback: {e}")
raise HTTPException(
status_code=500,
detail="An internal error occurred during authentication. Please try again later."
)