wu981526092's picture
Security & HF Spaces fixes: Enable CSRF, auth middleware, persistent storage
ea856a6
"""
Authentication routes for Hugging Face OAuth integration.
These routes are only active when running in HF Spaces environment.
"""
import os
import logging
import secrets
from typing import Optional
from pathlib import Path
from fastapi import APIRouter, Request, Response, HTTPException
from fastapi.responses import RedirectResponse, HTMLResponse, JSONResponse
from fastapi.templating import Jinja2Templates
from utils.environment import should_enable_auth, get_oauth_config, is_huggingface_space
import requests
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/auth", tags=["authentication"])
# Setup templates
templates_dir = Path(__file__).parent.parent / "templates"
templates = Jinja2Templates(directory=str(templates_dir))
@router.get("/status")
async def auth_status(request: Request):
"""Get authentication status and configuration."""
config = get_oauth_config()
user = getattr(request.state, "user", None)
if not user:
try:
user = request.session.get("user")
except (AttributeError, AssertionError):
user = None
return {
"auth_enabled": should_enable_auth(),
"environment": "huggingface_spaces" if is_huggingface_space() else "local_development",
"oauth_available": bool(config),
"login_required": True, # Mandatory for OpenAI API protection
"authenticated": bool(user), # Frontend expects this field
"user": user, # Frontend expects complete user object
"user_authenticated": bool(user), # Legacy field for compatibility
"user_info": {
"auth_method": user.get("auth_method") if user else None,
"username": user.get("username") if user else None,
} if user else None,
"hf_sign_detected": bool(request.query_params.get("__sign")) if is_huggingface_space() else False,
}
@router.get("/oauth-config")
async def get_oauth_config_for_frontend():
"""Get OAuth configuration for frontend use (public information only)."""
if not should_enable_auth():
return {"oauth_enabled": False}
config = get_oauth_config()
if not config:
return {"oauth_enabled": False, "error": "OAuth not configured"}
# Only return public information (never return client_secret)
return {
"oauth_enabled": True,
"client_id": config["client_id"],
"scopes": config["scopes"],
"provider_url": config["provider_url"],
"is_hf_spaces": is_huggingface_space()
}
@router.get("/login")
async def login(request: Request):
"""
Initiate OAuth login flow.
Only available in HF Spaces environment.
"""
if not should_enable_auth():
return JSONResponse(
content={"message": "Authentication not required in local development"},
status_code=200
)
oauth_config = get_oauth_config()
if not oauth_config:
raise HTTPException(
status_code=500,
detail="OAuth not configured in this environment"
)
# Generate state for CSRF protection
state = secrets.token_urlsafe(32)
try:
request.session["oauth_state"] = state
logger.info(f"๐Ÿ”‘ OAuth state saved: {state[:8]}...")
except Exception as e:
logger.error(f"Failed to save OAuth state: {e}")
raise HTTPException(status_code=500, detail="Session configuration error")
# Get the current host for redirect URI (HF official way)
if is_huggingface_space():
# Use SPACE_HOST as recommended by HF docs
space_host = os.getenv("SPACE_HOST")
if space_host:
base_url = f"https://{space_host}"
else:
# Fallback to manual construction
space_id = os.getenv("SPACE_ID", "")
if space_id:
space_domain = space_id.replace("/", "-").lower()
base_url = f"https://{space_domain}.hf.space"
else:
base_url = str(request.base_url).rstrip('/')
else:
base_url = str(request.base_url).rstrip('/')
redirect_uri = f"{base_url}/auth/oauth-callback"
# Build authorization URL
auth_url = (
f"{oauth_config['provider_url']}/oauth/authorize"
f"?client_id={oauth_config['client_id']}"
f"&redirect_uri={redirect_uri}"
f"&response_type=code"
f"&scope=read-repos" # Use only supported scope
f"&state={state}"
)
return RedirectResponse(url=auth_url, status_code=302)
@router.get("/callback")
async def oauth_callback(request: Request, code: str, state: str):
"""Legacy callback endpoint (keep for compatibility)"""
return await handle_oauth_callback(request, code, state)
@router.get("/oauth-callback")
async def oauth_callback_new(request: Request, code: str, state: str):
"""New OAuth callback endpoint for direct OAuth flow"""
return await handle_oauth_callback(request, code, state)
async def handle_oauth_callback(request: Request, code: str, state: str):
"""
Handle OAuth callback from Hugging Face.
"""
if not should_enable_auth():
return RedirectResponse(url="/", status_code=302)
oauth_config = get_oauth_config()
if not oauth_config:
raise HTTPException(status_code=500, detail="OAuth not configured")
# Verify state parameter (CSRF protection)
try:
stored_state = request.session.get("oauth_state")
logger.info(f"๐Ÿ” Verifying OAuth state - stored: {stored_state[:8] if stored_state else 'None'}..., received: {state[:8]}...")
except Exception as e:
logger.error(f"Failed to access session for state verification: {e}")
stored_state = None
if not stored_state:
logger.error("๐Ÿšซ No stored OAuth state found - CSRF protection triggered")
raise HTTPException(
status_code=400,
detail="No stored state found. Your session may have expired. Please try logging in again."
)
elif stored_state != state:
logger.error(f"๐Ÿšซ OAuth state mismatch - CSRF protection triggered")
raise HTTPException(
status_code=400,
detail="State parameter mismatch. Please try logging in again."
)
else:
logger.info("โœ… OAuth state verification successful")
# Exchange code for tokens
# Get the current host for redirect URI (HF official way)
if is_huggingface_space():
# Use SPACE_HOST as recommended by HF docs
space_host = os.getenv("SPACE_HOST")
if space_host:
base_url = f"https://{space_host}"
else:
# Fallback to manual construction
space_id = os.getenv("SPACE_ID", "")
if space_id:
space_domain = space_id.replace("/", "-").lower()
base_url = f"https://{space_domain}.hf.space"
else:
base_url = str(request.base_url).rstrip('/')
else:
base_url = str(request.base_url).rstrip('/')
redirect_uri = f"{base_url}/auth/oauth-callback"
try:
token_response = requests.post(
f"{oauth_config['provider_url']}/oauth/token",
data={
"grant_type": "authorization_code",
"code": code,
"redirect_uri": redirect_uri,
"client_id": oauth_config['client_id'],
"client_secret": oauth_config['client_secret'],
},
timeout=10
)
token_response.raise_for_status()
tokens = token_response.json()
except requests.RequestException as e:
logger.error(f"Token exchange failed: {e}")
raise HTTPException(status_code=400, detail="Token exchange failed")
access_token = tokens.get("access_token")
if not access_token:
raise HTTPException(status_code=400, detail="No access token received")
# Get user information
try:
user_response = requests.get(
f"{oauth_config['provider_url']}/api/whoami-v2",
headers={"Authorization": f"Bearer {access_token}"},
timeout=10
)
user_response.raise_for_status()
user_info = user_response.json()
# Debug: log user info structure
logger.info(f"๐Ÿ” HF User info received: {list(user_info.keys())}")
logger.info(f"๐Ÿ“ User details - id: {user_info.get('id')}, name: {user_info.get('name')}, login: {user_info.get('login')}")
except requests.RequestException as e:
logger.error(f"User info fetch failed: {e}")
raise HTTPException(status_code=400, detail="Failed to fetch user information")
# Store user in session
# Try multiple fields for username as HF API might use different field names
username = (user_info.get("login") or
user_info.get("username") or
user_info.get("name") or
user_info.get("id") or
"unknown_user")
user_data = {
"id": user_info.get("id"),
"name": user_info.get("name") or user_info.get("fullName") or username,
"username": username,
"email": user_info.get("email"),
"avatar_url": user_info.get("avatarUrl") or user_info.get("avatar"),
"access_token": access_token, # Store for future API calls if needed
"auth_method": "oauth"
}
try:
request.session["user"] = user_data
logger.info(f"๐Ÿ’พ User data saved to session: {user_data['username']}")
# Verify session was saved
stored_user = request.session.get("user")
if stored_user:
logger.info(f"โœ… Session verification successful: {stored_user['username']}")
else:
logger.error("โŒ Session verification failed - user not found after saving")
# Debug session state after saving
if is_huggingface_space():
session_keys = list(request.session.keys()) if hasattr(request.session, 'keys') else []
logger.info(f"๐Ÿ” Auth callback - session keys after save: {session_keys}")
logger.info(f"๐Ÿ” Auth callback - full session: {dict(request.session)}")
except Exception as e:
logger.error(f"โŒ Failed to save user to session: {e}")
raise HTTPException(status_code=500, detail="Session save failed")
# Clean up state
request.session.pop("oauth_state", None)
logger.info(f"User logged in: {user_info.get('name')} ({user_info.get('login')})")
# Redirect to main application
return RedirectResponse(url="/", status_code=302)
@router.get("/logout")
async def logout(request: Request):
"""Log out the current user."""
if hasattr(request, "session"):
request.session.clear()
return RedirectResponse(url="/", status_code=302)
@router.get("/user")
async def get_current_user(request: Request):
"""Get current user information."""
if not should_enable_auth():
return {"message": "Authentication disabled in local development"}
user = getattr(request.state, "user", None)
if not user:
try:
user = request.session.get("user")
except (AttributeError, AssertionError):
user = None
if not user:
raise HTTPException(status_code=401, detail="Not authenticated")
# Return user info without sensitive data
return {
"id": user.get("id"),
"name": user.get("name"),
"username": user.get("username"),
"email": user.get("email"),
"avatar_url": user.get("avatar_url"),
"auth_method": user.get("auth_method", "unknown"),
"authenticated": True,
}
@router.get("/login-page")
async def login_page(request: Request):
"""
Serve the login page using Jinja2 template.
"""
if not should_enable_auth():
return RedirectResponse(url="/", status_code=302)
return templates.TemplateResponse("login.html", {"request": request})
@router.get("/debug")
async def debug_session(request: Request):
"""Debug endpoint to check session state."""
try:
session_data = dict(request.session) if hasattr(request, 'session') else {}
user = request.session.get("user") if hasattr(request, 'session') else None
return {
"session_available": hasattr(request, 'session'),
"session_data_keys": list(session_data.keys()),
"user_in_session": bool(user),
"user_info": {
"username": user.get("username") if user else None,
"auth_method": user.get("auth_method") if user else None
} if user else None,
"request_state_user": bool(getattr(request.state, "user", None)),
}
except Exception as e:
return {"error": str(e), "session_available": False}