chih.yikuan
email-done
5ee5085
"""Google OAuth2 authentication endpoints."""
from __future__ import annotations
import json
from urllib.parse import urlencode
from typing import Optional
from fastapi import APIRouter, Request, HTTPException
from fastapi.responses import RedirectResponse
import httpx
from .config import get_settings, GOOGLE_SCOPES
from .database import save_oauth_tokens, get_oauth_tokens, delete_oauth_tokens
# Router that groups every endpoint in this module under the "/auth" prefix.
router = APIRouter(prefix="/auth", tags=["authentication"])
# Store state tokens temporarily (in production, use Redis or similar).
# Each key is the random state token generated by start_auth; each value is
# the {"email": ..., "redirect_uri": ...} dict recorded for that flow.
# auth_callback pops the entry when the flow completes; nothing in this module
# removes entries for flows that never reach the callback.
_state_store: dict[str, dict[str, str]] = {}
def get_redirect_uri(request: Request, settings) -> str:
    """Return the OAuth redirect URI, auto-detecting it from the request if needed.

    A configured ``settings.google_redirect_uri`` is returned as-is unless it
    is empty or still equal to the local default
    ``http://localhost:8000/auth/callback``. Otherwise the URI is built from
    the request's ``X-Forwarded-Proto`` / ``X-Forwarded-Host`` / ``Host``
    headers.

    Args:
        request: Incoming request; only ``request.headers`` is read.
        settings: Settings object exposing ``google_redirect_uri``.

    Returns:
        The absolute callback URL, ending in ``/auth/callback``.
    """
    local_default = "http://localhost:8000/auth/callback"
    configured = settings.google_redirect_uri
    if configured and configured != local_default:
        return configured

    # Auto-detect from request, checking common proxy headers first.
    headers = request.headers
    # Chained proxies may send a comma-separated list ("https, http"); the
    # first entry is used. An empty value falls back to "http".
    proto = headers.get("x-forwarded-proto", "http").split(",")[0].strip() or "http"
    host = headers.get("x-forwarded-host") or headers.get("host", "localhost:8000")
    host = host.split(",")[0].strip()

    # Normalize 127.0.0.1 to localhost for local development (Google OAuth
    # requires exact match). The hostname is compared exactly so that other
    # addresses sharing the prefix (e.g. 127.0.0.10) are left untouched.
    hostname, separator, port = host.partition(":")
    if hostname == "127.0.0.1":
        host = f"localhost{separator}{port}"

    return f"{proto}://{host}/auth/callback"
@router.get("/start")
async def start_auth(teacher_email: str, request: Request):
    """
    Begin the OAuth flow by redirecting the browser to Google's consent screen.

    A random state token is generated and remembered together with the
    teacher's email and the redirect URI so the callback can recover them.

    Query params:
        teacher_email: The teacher's email address

    Raises:
        HTTPException: 500 when the Google client id or secret is not set.
    """
    settings = get_settings()
    oauth_configured = settings.google_client_id and settings.google_client_secret
    if not oauth_configured:
        raise HTTPException(
            status_code=500,
            detail="Google OAuth not configured. Please set GOOGLE_CLIENT_ID and GOOGLE_CLIENT_SECRET.",
        )

    # Random state token, used for CSRF protection.
    import secrets

    state = secrets.token_urlsafe(32)

    # Redirect URI comes from config or is auto-detected from the request.
    redirect_uri = get_redirect_uri(request, settings)

    # Remember email and redirect_uri under the state token for the callback.
    _state_store[state] = {"email": teacher_email, "redirect_uri": redirect_uri}

    # "select_account consent" lets the user pick an account first and then
    # shows the consent screen; "offline" access asks for a refresh token.
    auth_params = {
        "client_id": settings.google_client_id,
        "redirect_uri": redirect_uri,
        "response_type": "code",
        "scope": " ".join(GOOGLE_SCOPES),
        "access_type": "offline",
        "prompt": "select_account consent",
        "state": state,
    }

    # login_hint is optional: it is added only when the email looks valid.
    looks_like_email = teacher_email and "@" in teacher_email
    if looks_like_email:
        auth_params["login_hint"] = teacher_email

    auth_url = f"https://accounts.google.com/o/oauth2/v2/auth?{urlencode(auth_params)}"

    print(f"[OAuth] Starting auth for {teacher_email}")
    print(f"[OAuth] Redirect URI: {redirect_uri}")
    print(f"[OAuth] Scopes: {GOOGLE_SCOPES}")
    print(f"[OAuth] Full auth URL: {auth_url}")

    return RedirectResponse(url=auth_url)
def get_frontend_url(request: Request, settings) -> str:
    """Return the frontend base URL, auto-detecting it from the request if needed.

    A configured ``settings.frontend_url`` is returned as-is unless it is
    empty or still equal to the local default ``http://localhost:3000``.
    Otherwise the URL is built from the request's ``X-Forwarded-Proto`` /
    ``X-Forwarded-Host`` / ``Host`` headers (same origin as the API, for
    full-stack deployments).

    Args:
        request: Incoming request; only ``request.headers`` is read.
        settings: Settings object exposing ``frontend_url``.

    Returns:
        The frontend origin as ``scheme://host`` with no trailing slash added.
    """
    local_default = "http://localhost:3000"
    configured = settings.frontend_url
    if configured and configured != local_default:
        return configured

    headers = request.headers
    # Chained proxies may send a comma-separated list ("https, http"); the
    # first entry is used. An empty value falls back to "http".
    proto = headers.get("x-forwarded-proto", "http").split(",")[0].strip() or "http"
    host = headers.get("x-forwarded-host") or headers.get("host", "localhost:3000")
    host = host.split(",")[0].strip()
    return f"{proto}://{host}"
def _frontend_redirect(frontend_url: str, **params: str) -> RedirectResponse:
    """Redirect to the frontend with ``params`` as a URL-encoded query string."""
    return RedirectResponse(url=f"{frontend_url}?{urlencode(params)}")


@router.get("/callback")
async def auth_callback(
    request: Request,
    code: Optional[str] = None,
    state: Optional[str] = None,
    error: Optional[str] = None,
    error_description: Optional[str] = None,
):
    """
    Handle the OAuth callback from Google.

    Exchanges the authorization code for tokens, looks up the account email
    from Google's userinfo endpoint, stores the tokens, and redirects back to
    the frontend. Every outcome is reported as a redirect to the frontend with
    either ``auth_success=true&email=...`` or ``auth_error=...`` in the query
    string; all query values are URL-encoded.

    Query params:
        code: Authorization code from Google
        state: State token for CSRF verification
        error: Error message if authorization failed
        error_description: Detailed error description from Google
    """
    settings = get_settings()
    frontend_url = get_frontend_url(request, settings)

    if error:
        # Log the full error details.
        error_msg = f"{error}: {error_description}" if error_description else error
        print(f"[OAuth] Error callback: {error_msg}")
        print(f"[OAuth] Full query params: {dict(request.query_params)}")
        return _frontend_redirect(frontend_url, auth_error=error_msg)

    if not code or not state:
        return _frontend_redirect(frontend_url, auth_error="missing_params")

    # Verify state and get stored data. pop() makes each state single-use.
    state_data = _state_store.pop(state, None)
    if not state_data:
        return _frontend_redirect(frontend_url, auth_error="invalid_state")

    teacher_email = state_data["email"]
    # The token exchange must use the same redirect_uri that start_auth sent.
    redirect_uri = state_data["redirect_uri"]
    print(f"[OAuth] Callback for {teacher_email}")
    print(f"[OAuth] Using redirect_uri: {redirect_uri}")

    # Exchange code for tokens. Any failure in this section is turned into an
    # auth_error redirect rather than an HTTP error response.
    try:
        async with httpx.AsyncClient() as client:
            token_response = await client.post(
                "https://oauth2.googleapis.com/token",
                data={
                    "client_id": settings.google_client_id,
                    "client_secret": settings.google_client_secret,
                    "code": code,
                    "grant_type": "authorization_code",
                    "redirect_uri": redirect_uri,
                },
            )
            if token_response.status_code != 200:
                print(f"[OAuth] Token exchange failed: {token_response.text}")
                return _frontend_redirect(
                    frontend_url, auth_error="token_exchange_failed"
                )
            tokens = token_response.json()

            # Ask Google which account actually granted access and prefer
            # that email over the one supplied when the flow started.
            # NOTE(review): if this lookup fails or returns no email, the
            # tokens are stored under the caller-supplied email from
            # start_auth, which is unverified - confirm this is intended.
            userinfo_response = await client.get(
                "https://www.googleapis.com/oauth2/v2/userinfo",
                headers={"Authorization": f"Bearer {tokens['access_token']}"},
            )
            if userinfo_response.status_code == 200:
                verified_email = userinfo_response.json().get("email", "")
                if verified_email:
                    teacher_email = verified_email

            # Save tokens.
            await save_oauth_tokens(teacher_email, {
                "access_token": tokens.get("access_token"),
                "refresh_token": tokens.get("refresh_token"),
                "expires_in": tokens.get("expires_in"),
            })
            print(f"[OAuth] Success for {teacher_email}")
            return _frontend_redirect(
                frontend_url, auth_success="true", email=teacher_email
            )
    except Exception as e:
        print(f"[OAuth] Error: {e}")
        return _frontend_redirect(frontend_url, auth_error=str(e))
@router.get("/status")
async def auth_status(teacher_email: str):
    """
    Report whether OAuth tokens are stored for a teacher.

    Query params:
        teacher_email: The teacher's email address

    Returns a dict with ``authenticated`` (tokens were found) and ``email``
    (the given address when the stored tokens are truthy, otherwise ``None``).
    """
    stored = await get_oauth_tokens(teacher_email)
    is_authenticated = stored is not None
    reported_email = teacher_email if stored else None
    return {"authenticated": is_authenticated, "email": reported_email}
@router.post("/disconnect")
async def disconnect(teacher_email: str):
    """
    Remove the stored Google OAuth tokens for a teacher.

    Query params:
        teacher_email: The teacher's email address

    Always answers with the same confirmation payload once the delete call
    has returned.
    """
    await delete_oauth_tokens(teacher_email)
    confirmation = dict(status="ok", message="Google account disconnected")
    return confirmation
@router.get("/debug")
async def debug_oauth(request: Request):
    """
    Debug endpoint that reports the effective OAuth configuration.

    The client id and secret are reported only as booleans (set / not set).
    Both the configured and the resolved redirect/frontend URLs are included,
    along with the request headers that the auto-detection reads.
    """
    settings = get_settings()
    detected_redirect = get_redirect_uri(request, settings)
    detected_frontend = get_frontend_url(request, settings)

    # Headers consulted when auto-detecting URLs.
    inspected = ("host", "x-forwarded-host", "x-forwarded-proto")
    header_snapshot = {name: request.headers.get(name) for name in inspected}

    report = {
        "google_client_id_configured": bool(settings.google_client_id),
        "google_client_secret_configured": bool(settings.google_client_secret),
        "configured_redirect_uri": settings.google_redirect_uri,
        "auto_detected_redirect_uri": detected_redirect,
        "configured_frontend_url": settings.frontend_url,
        "auto_detected_frontend_url": detected_frontend,
        "request_headers": header_snapshot,
    }
    return report