# gcli2api / src/panel/auth.py
# a3216's picture
# sync: github -> hf space
# c50496f
"""
认证路由模块 - 处理 /auth/* 相关的HTTP请求
"""
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import JSONResponse
from log import log
from src.auth import (
asyncio_complete_auth_flow,
complete_auth_flow_from_callback_url,
create_auth_url,
get_auth_status,
verify_password,
)
from src.models import (
LoginRequest,
AuthStartRequest,
AuthCallbackRequest,
AuthCallbackUrlRequest,
)
from src.utils import verify_panel_token
# Router that groups the authentication endpoints defined in this module;
# every route registered on it is served under the "/auth" prefix.
router = APIRouter(prefix="/auth", tags=["auth"])
@router.post("/login")
async def login(request: LoginRequest):
    """Log a user in (simplified scheme: the password itself is the token).

    Args:
        request: Login payload; only ``request.password`` is read.

    Returns:
        A JSON response carrying ``token`` (the submitted password) and a
        success ``message``.

    Raises:
        HTTPException: 401 when ``verify_password`` rejects the password,
            500 when any unexpected error occurs while checking it.
    """
    try:
        password_ok = await verify_password(request.password)
        if not password_ok:
            raise HTTPException(status_code=401, detail="密码错误")

        # The password doubles as the token to keep the auth scheme simple.
        body = {"token": request.password, "message": "登录成功"}
        return JSONResponse(content=body)
    except HTTPException:
        # Deliberate HTTP errors pass through unchanged.
        raise
    except Exception as e:
        log.error(f"登录失败: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/start")
async def start_auth(request: AuthStartRequest, token: str = Depends(verify_panel_token)):
    """Begin the authentication flow and hand back the authorization URL.

    The project ID is optional; when it is missing a log line is written and
    the (possibly empty) value is still forwarded to ``create_auth_url``.

    Args:
        request: Start payload; ``project_id`` and ``mode`` are read.
        token: Value produced by the ``verify_panel_token`` dependency; it is
            reused as the user-session identifier.

    Returns:
        A JSON response with ``auth_url``, ``state``,
        ``auto_project_detection`` and ``detected_project_id``.

    Raises:
        HTTPException: 500 when ``create_auth_url`` reports failure or an
            unexpected error occurs.
    """
    try:
        requested_project = request.project_id
        if not requested_project:
            log.info("用户未提供项目ID,后续将使用自动检测...")

        # The panel token doubles as the session key; falsy values become None.
        session_key = token or None
        outcome = await create_auth_url(
            requested_project, session_key, mode=request.mode
        )

        if not outcome["success"]:
            raise HTTPException(status_code=500, detail=outcome["error"])

        payload = {
            "auth_url": outcome["auth_url"],
            "state": outcome["state"],
            "auto_project_detection": outcome.get("auto_project_detection", False),
            "detected_project_id": outcome.get("detected_project_id"),
        }
        return JSONResponse(content=payload)
    except HTTPException:
        # Deliberate HTTP errors pass through unchanged.
        raise
    except Exception as e:
        log.error(f"开始认证流程失败: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/callback")
async def auth_callback(request: AuthCallbackRequest, token: str = Depends(verify_panel_token)):
    """Wait for the OAuth callback to complete and report the outcome.

    Args:
        request: Callback payload; ``project_id`` (optional) and ``mode`` are
            read.
        token: Value produced by the ``verify_panel_token`` dependency; it is
            reused as the user-session identifier.

    Returns:
        On success, a JSON response with ``credentials``, ``file_path``,
        ``message`` and ``auto_detected_project``. When the flow asks for a
        manual project ID or a project selection, a 400 JSON response that
        carries the error plus the matching flag (and, for selection, the
        ``available_projects`` list).

    Raises:
        HTTPException: 400 for any other reported failure, 500 for unexpected
            errors.
    """
    try:
        requested_project = request.project_id
        # The panel token doubles as the session key; falsy values become None.
        session_key = token or None

        outcome = await asyncio_complete_auth_flow(
            requested_project, session_key, mode=request.mode
        )

        if outcome["success"]:
            success_body = {
                "credentials": outcome["credentials"],
                "file_path": outcome["file_path"],
                "message": "认证成功,凭证已保存",
                "auto_detected_project": outcome.get("auto_detected_project", False),
            }
            return JSONResponse(content=success_body)

        # Failure paths: some need structured hints so the client can react.
        if outcome.get("requires_manual_project_id"):
            manual_body = {"error": outcome["error"], "requires_manual_project_id": True}
            return JSONResponse(status_code=400, content=manual_body)

        if outcome.get("requires_project_selection"):
            selection_body = {
                "error": outcome["error"],
                "requires_project_selection": True,
                "available_projects": outcome["available_projects"],
            }
            return JSONResponse(status_code=400, content=selection_body)

        raise HTTPException(status_code=400, detail=outcome["error"])
    except HTTPException:
        # Deliberate HTTP errors pass through unchanged.
        raise
    except Exception as e:
        log.error(f"处理认证回调失败: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/callback-url")
async def auth_callback_url(request: AuthCallbackUrlRequest, token: str = Depends(verify_panel_token)):
    """Complete authentication directly from a pasted callback URL.

    Args:
        request: Payload; ``callback_url``, ``project_id`` and ``mode`` are
            read.
        token: Value produced by the ``verify_panel_token`` dependency; it is
            not otherwise used by this handler.

    Returns:
        On success, a JSON response with ``credentials``, ``file_path``,
        ``message`` and ``auto_detected_project``. When the flow asks for a
        manual project ID or a project selection, a 400 JSON response that
        carries the error plus the matching flag (and, for selection, the
        ``available_projects`` list).

    Raises:
        HTTPException: 400 when the URL is missing or does not start with
            ``http://`` / ``https://``, 400 for any other reported failure,
            500 for unexpected errors.
    """
    try:
        callback_url = request.callback_url
        url_looks_valid = callback_url and callback_url.startswith(("http://", "https://"))
        if not url_looks_valid:
            raise HTTPException(status_code=400, detail="请提供有效的回调URL")

        outcome = await complete_auth_flow_from_callback_url(
            request.callback_url, request.project_id, mode=request.mode
        )

        if outcome["success"]:
            success_body = {
                "credentials": outcome["credentials"],
                "file_path": outcome["file_path"],
                "message": "从回调URL认证成功,凭证已保存",
                "auto_detected_project": outcome.get("auto_detected_project", False),
            }
            return JSONResponse(content=success_body)

        # Failure paths: some need structured hints so the client can react.
        if outcome.get("requires_manual_project_id"):
            manual_body = {"error": outcome["error"], "requires_manual_project_id": True}
            return JSONResponse(status_code=400, content=manual_body)

        if outcome.get("requires_project_selection"):
            selection_body = {
                "error": outcome["error"],
                "requires_project_selection": True,
                "available_projects": outcome["available_projects"],
            }
            return JSONResponse(status_code=400, content=selection_body)

        raise HTTPException(status_code=400, detail=outcome["error"])
    except HTTPException:
        # Deliberate HTTP errors pass through unchanged.
        raise
    except Exception as e:
        log.error(f"从回调URL处理认证失败: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/status/{project_id}")
async def check_auth_status(project_id: str, token: str = Depends(verify_panel_token)):
    """Return the authentication status for a project.

    Args:
        project_id: Project identifier taken from the URL path.
        token: Value produced by the ``verify_panel_token`` dependency; it is
            not otherwise used by this handler.

    Returns:
        A JSON response whose body is whatever ``get_auth_status`` returns
        for ``project_id``.

    Raises:
        HTTPException: 400 when ``project_id`` is empty, 500 when looking up
            the status fails unexpectedly.
    """
    try:
        if not project_id:
            raise HTTPException(status_code=400, detail="Project ID 不能为空")
        status = get_auth_status(project_id)
        return JSONResponse(content=status)
    except HTTPException:
        # Without this clause the 400 above would be caught by the generic
        # handler below, logged as a failure and rewritten as a 500. The other
        # handlers in this module already let HTTPException pass through.
        raise
    except Exception as e:
        log.error(f"检查认证状态失败: {e}")
        raise HTTPException(status_code=500, detail=str(e))