"""Google OAuth2 authentication endpoints.""" from __future__ import annotations import json from urllib.parse import urlencode from typing import Optional from fastapi import APIRouter, Request, HTTPException from fastapi.responses import RedirectResponse import httpx from .config import get_settings, GOOGLE_SCOPES from .database import save_oauth_tokens, get_oauth_tokens, delete_oauth_tokens router = APIRouter(prefix="/auth", tags=["authentication"]) # Store state tokens temporarily (in production, use Redis or similar) _state_store: dict[str, str] = {} def get_redirect_uri(request: Request, settings) -> str: """Get the OAuth redirect URI, auto-detecting from request if not configured.""" if settings.google_redirect_uri and settings.google_redirect_uri != "http://localhost:8000/auth/callback": return settings.google_redirect_uri # Auto-detect from request # Check for common proxy headers first forwarded_proto = request.headers.get("x-forwarded-proto", "http") forwarded_host = request.headers.get("x-forwarded-host") or request.headers.get("host", "localhost:8000") # Normalize 127.0.0.1 to localhost for local development (Google OAuth requires exact match) if forwarded_host.startswith("127.0.0.1"): forwarded_host = forwarded_host.replace("127.0.0.1", "localhost") # Build the redirect URI return f"{forwarded_proto}://{forwarded_host}/auth/callback" @router.get("/start") async def start_auth(teacher_email: str, request: Request): """ Start OAuth flow by redirecting to Google consent screen. Query params: teacher_email: The teacher's email address """ settings = get_settings() if not settings.google_client_id or not settings.google_client_secret: raise HTTPException( status_code=500, detail="Google OAuth not configured. Please set GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET." ) # Generate state token for CSRF protection import secrets state = secrets.token_urlsafe(32) # Get the redirect URI (auto-detect or from config) redirect_uri = get_redirect_uri(request, settings) # Store both email and redirect_uri in state for callback _state_store[state] = {"email": teacher_email, "redirect_uri": redirect_uri} # Build authorization URL # Use "select_account consent" to allow account selection first, then show consent # This is more flexible and works better when user isn't logged in auth_params = { "client_id": settings.google_client_id, "redirect_uri": redirect_uri, "response_type": "code", "scope": " ".join(GOOGLE_SCOPES), "access_type": "offline", # Get refresh token "prompt": "select_account consent", # Allow account selection, then show consent "state": state, # Only add login_hint if email is provided and valid # This helps pre-fill but doesn't break if user isn't logged in } # Add login_hint only if email looks valid (helps pre-fill but not required) if teacher_email and "@" in teacher_email: auth_params["login_hint"] = teacher_email print(f"[OAuth] Starting auth for {teacher_email}") print(f"[OAuth] Redirect URI: {redirect_uri}") print(f"[OAuth] Scopes: {GOOGLE_SCOPES}") print(f"[OAuth] Full auth URL: https://accounts.google.com/o/oauth2/v2/auth?{urlencode(auth_params)}") auth_url = f"https://accounts.google.com/o/oauth2/v2/auth?{urlencode(auth_params)}" return RedirectResponse(url=auth_url) def get_frontend_url(request: Request, settings) -> str: """Get the frontend URL, auto-detecting from request if not configured.""" if settings.frontend_url and settings.frontend_url != "http://localhost:3000": return settings.frontend_url # Auto-detect from request (same origin for full-stack deployment) forwarded_proto = request.headers.get("x-forwarded-proto", "http") forwarded_host = request.headers.get("x-forwarded-host") or request.headers.get("host", "localhost:3000") return f"{forwarded_proto}://{forwarded_host}" @router.get("/callback") async def auth_callback(request: Request, code: str = None, state: str = None, error: str = None, error_description: str = None): """ Handle OAuth callback from Google. Query params: code: Authorization code from Google state: State token for CSRF verification error: Error message if authorization failed error_description: Detailed error description from Google """ settings = get_settings() frontend_url = get_frontend_url(request, settings) if error: # Log the full error details error_msg = error if error_description: error_msg = f"{error}: {error_description}" print(f"[OAuth] Error callback: {error_msg}") print(f"[OAuth] Full query params: {dict(request.query_params)}") return RedirectResponse( url=f"{frontend_url}?auth_error={error_msg}" ) if not code or not state: return RedirectResponse( url=f"{frontend_url}?auth_error=missing_params" ) # Verify state and get stored data state_data = _state_store.pop(state, None) if not state_data: return RedirectResponse( url=f"{frontend_url}?auth_error=invalid_state" ) teacher_email = state_data["email"] redirect_uri = state_data["redirect_uri"] print(f"[OAuth] Callback for {teacher_email}") print(f"[OAuth] Using redirect_uri: {redirect_uri}") # Exchange code for tokens try: async with httpx.AsyncClient() as client: token_response = await client.post( "https://oauth2.googleapis.com/token", data={ "client_id": settings.google_client_id, "client_secret": settings.google_client_secret, "code": code, "grant_type": "authorization_code", "redirect_uri": redirect_uri, }, ) if token_response.status_code != 200: print(f"[OAuth] Token exchange failed: {token_response.text}") return RedirectResponse( url=f"{frontend_url}?auth_error=token_exchange_failed" ) tokens = token_response.json() # Verify the user's email matches userinfo_response = await client.get( "https://www.googleapis.com/oauth2/v2/userinfo", headers={"Authorization": f"Bearer {tokens['access_token']}"} ) if userinfo_response.status_code == 200: userinfo = userinfo_response.json() verified_email = userinfo.get("email", "") # Use the verified email from Google if verified_email: teacher_email = verified_email # Save tokens await save_oauth_tokens(teacher_email, { "access_token": tokens.get("access_token"), "refresh_token": tokens.get("refresh_token"), "expires_in": tokens.get("expires_in"), }) print(f"[OAuth] Success for {teacher_email}") return RedirectResponse( url=f"{frontend_url}?auth_success=true&email={teacher_email}" ) except Exception as e: print(f"[OAuth] Error: {e}") return RedirectResponse( url=f"{frontend_url}?auth_error={str(e)}" ) @router.get("/status") async def auth_status(teacher_email: str): """ Check if a teacher is authenticated. Query params: teacher_email: The teacher's email address """ tokens = await get_oauth_tokens(teacher_email) return { "authenticated": tokens is not None, "email": teacher_email if tokens else None } @router.post("/disconnect") async def disconnect(teacher_email: str): """ Disconnect a teacher's Google account. Query params: teacher_email: The teacher's email address """ await delete_oauth_tokens(teacher_email) return {"status": "ok", "message": "Google account disconnected"} @router.get("/debug") async def debug_oauth(request: Request): """Debug endpoint to check OAuth configuration.""" settings = get_settings() redirect_uri = get_redirect_uri(request, settings) frontend_url = get_frontend_url(request, settings) return { "google_client_id_configured": bool(settings.google_client_id), "google_client_secret_configured": bool(settings.google_client_secret), "configured_redirect_uri": settings.google_redirect_uri, "auto_detected_redirect_uri": redirect_uri, "configured_frontend_url": settings.frontend_url, "auto_detected_frontend_url": frontend_url, "request_headers": { "host": request.headers.get("host"), "x-forwarded-host": request.headers.get("x-forwarded-host"), "x-forwarded-proto": request.headers.get("x-forwarded-proto"), } }