""" Web路由模块 - 处理认证相关的HTTP请求和控制面板功能 用于与上级web.py集成 """ import asyncio import datetime import io import json import os import time import zipfile from collections import deque from typing import List from fastapi import ( APIRouter, Depends, File, HTTPException, Request, UploadFile, WebSocket, WebSocketDisconnect, ) from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, Response from starlette.websockets import WebSocketState import config from log import log from src.auth import ( asyncio_complete_auth_flow, complete_auth_flow_from_callback_url, create_auth_url, get_auth_status, verify_password, ) from src.credential_manager import CredentialManager from .models import ( LoginRequest, AuthStartRequest, AuthCallbackRequest, AuthCallbackUrlRequest, CredFileActionRequest, CredFileBatchActionRequest, ConfigSaveRequest, ) from src.storage_adapter import get_storage_adapter from src.utils import verify_panel_token, GEMINICLI_USER_AGENT, ANTIGRAVITY_USER_AGENT from src.api.antigravity import fetch_quota_info from src.google_oauth_api import Credentials, fetch_project_id from config import get_code_assist_endpoint, get_antigravity_api_url # 创建路由器 router = APIRouter() # 创建credential manager实例(延迟初始化,在首次使用时自动初始化) credential_manager = CredentialManager() # WebSocket连接管理 class ConnectionManager: def __init__(self, max_connections: int = 3): # 进一步降低最大连接数 # 使用双端队列严格限制内存使用 self.active_connections: deque = deque(maxlen=max_connections) self.max_connections = max_connections self._last_cleanup = 0 self._cleanup_interval = 120 # 120秒清理一次死连接 async def connect(self, websocket: WebSocket): # 自动清理死连接 self._auto_cleanup() # 限制最大连接数,防止内存无限增长 if len(self.active_connections) >= self.max_connections: await websocket.close(code=1008, reason="Too many connections") return False await websocket.accept() self.active_connections.append(websocket) log.debug(f"WebSocket连接建立,当前连接数: {len(self.active_connections)}") return True def disconnect(self, websocket: WebSocket): # 使用更高效的方式移除连接 try: self.active_connections.remove(websocket) except ValueError: pass # 连接已不存在 log.debug(f"WebSocket连接断开,当前连接数: {len(self.active_connections)}") async def send_personal_message(self, message: str, websocket: WebSocket): try: await websocket.send_text(message) except Exception: self.disconnect(websocket) async def broadcast(self, message: str): # 使用更高效的方式处理广播,避免索引操作 dead_connections = [] for conn in self.active_connections: try: await conn.send_text(message) except Exception: dead_connections.append(conn) # 批量移除死连接 for dead_conn in dead_connections: self.disconnect(dead_conn) def _auto_cleanup(self): """自动清理死连接""" current_time = time.time() if current_time - self._last_cleanup > self._cleanup_interval: self.cleanup_dead_connections() self._last_cleanup = current_time def cleanup_dead_connections(self): """清理已断开的连接""" original_count = len(self.active_connections) # 使用列表推导式过滤活跃连接,更高效 alive_connections = deque( [ conn for conn in self.active_connections if hasattr(conn, "client_state") and conn.client_state != WebSocketState.DISCONNECTED ], maxlen=self.max_connections, ) self.active_connections = alive_connections cleaned = original_count - len(self.active_connections) if cleaned > 0: log.debug(f"清理了 {cleaned} 个死连接,剩余连接数: {len(self.active_connections)}") manager = ConnectionManager() async def ensure_credential_manager_initialized(): """确保credential manager已初始化""" if not credential_manager._initialized: await credential_manager.initialize() async def get_credential_manager(): """获取全局凭证管理器实例(已废弃,直接使用模块级的 credential_manager)""" global credential_manager # 确保已初始化(在首次使用时自动初始化) await credential_manager._ensure_initialized() return credential_manager def is_mobile_user_agent(user_agent: str) -> bool: """检测是否为移动设备用户代理""" if not user_agent: return False user_agent_lower = user_agent.lower() mobile_keywords = [ "mobile", "android", "iphone", "ipad", "ipod", "blackberry", "windows phone", "samsung", "htc", "motorola", "nokia", "palm", "webos", "opera mini", "opera mobi", "fennec", "minimo", "symbian", "psp", "nintendo", "tablet", ] return any(keyword in user_agent_lower for keyword in mobile_keywords) @router.get("/", response_class=HTMLResponse) async def serve_control_panel(request: Request): """提供统一控制面板""" try: user_agent = request.headers.get("user-agent", "") is_mobile = is_mobile_user_agent(user_agent) if is_mobile: html_file_path = "front/control_panel_mobile.html" else: html_file_path = "front/control_panel.html" with open(html_file_path, "r", encoding="utf-8") as f: html_content = f.read() return HTMLResponse(content=html_content) except Exception as e: log.error(f"加载控制面板页面失败: {e}") raise HTTPException(status_code=500, detail="服务器内部错误") @router.post("/auth/login") async def login(request: LoginRequest): """用户登录(简化版:直接返回密码作为token)""" try: if await verify_password(request.password): # 直接使用密码作为token,简化认证流程 return JSONResponse(content={"token": request.password, "message": "登录成功"}) else: raise HTTPException(status_code=401, detail="密码错误") except HTTPException: raise except Exception as e: log.error(f"登录失败: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/auth/start") async def start_auth(request: AuthStartRequest, token: str = Depends(verify_panel_token)): """开始认证流程,支持自动检测项目ID""" try: # 如果没有提供项目ID,尝试自动检测 project_id = request.project_id if not project_id: log.info("用户未提供项目ID,后续将使用自动检测...") # 使用认证令牌作为用户会话标识 user_session = token if token else None result = await create_auth_url( project_id, user_session, mode=request.mode ) if result["success"]: return JSONResponse( content={ "auth_url": result["auth_url"], "state": result["state"], "auto_project_detection": result.get("auto_project_detection", False), "detected_project_id": result.get("detected_project_id"), } ) else: raise HTTPException(status_code=500, detail=result["error"]) except HTTPException: raise except Exception as e: log.error(f"开始认证流程失败: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/auth/callback") async def auth_callback(request: AuthCallbackRequest, token: str = Depends(verify_panel_token)): """处理认证回调,支持自动检测项目ID""" try: # 项目ID现在是可选的,在回调处理中进行自动检测 project_id = request.project_id # 使用认证令牌作为用户会话标识 user_session = token if token else None # 异步等待OAuth回调完成 result = await asyncio_complete_auth_flow( project_id, user_session, mode=request.mode ) if result["success"]: # 单项目认证成功 return JSONResponse( content={ "credentials": result["credentials"], "file_path": result["file_path"], "message": "认证成功,凭证已保存", "auto_detected_project": result.get("auto_detected_project", False), } ) else: # 如果需要手动项目ID或项目选择,在响应中标明 if result.get("requires_manual_project_id"): # 使用JSON响应 return JSONResponse( status_code=400, content={"error": result["error"], "requires_manual_project_id": True}, ) elif result.get("requires_project_selection"): # 返回项目列表供用户选择 return JSONResponse( status_code=400, content={ "error": result["error"], "requires_project_selection": True, "available_projects": result["available_projects"], }, ) else: raise HTTPException(status_code=400, detail=result["error"]) except HTTPException: raise except Exception as e: log.error(f"处理认证回调失败: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/auth/callback-url") async def auth_callback_url(request: AuthCallbackUrlRequest, token: str = Depends(verify_panel_token)): """从回调URL直接完成认证""" try: # 验证URL格式 if not request.callback_url or not request.callback_url.startswith(("http://", "https://")): raise HTTPException(status_code=400, detail="请提供有效的回调URL") # 从回调URL完成认证 result = await complete_auth_flow_from_callback_url( request.callback_url, request.project_id, mode=request.mode ) if result["success"]: # 单项目认证成功 return JSONResponse( content={ "credentials": result["credentials"], "file_path": result["file_path"], "message": "从回调URL认证成功,凭证已保存", "auto_detected_project": result.get("auto_detected_project", False), } ) else: # 处理各种错误情况 if result.get("requires_manual_project_id"): return JSONResponse( status_code=400, content={"error": result["error"], "requires_manual_project_id": True}, ) elif result.get("requires_project_selection"): return JSONResponse( status_code=400, content={ "error": result["error"], "requires_project_selection": True, "available_projects": result["available_projects"], }, ) else: raise HTTPException(status_code=400, detail=result["error"]) except HTTPException: raise except Exception as e: log.error(f"从回调URL处理认证失败: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.get("/auth/status/{project_id}") async def check_auth_status(project_id: str, token: str = Depends(verify_panel_token)): """检查认证状态""" try: if not project_id: raise HTTPException(status_code=400, detail="Project ID 不能为空") status = get_auth_status(project_id) return JSONResponse(content=status) except Exception as e: log.error(f"检查认证状态失败: {e}") raise HTTPException(status_code=500, detail=str(e)) # ============================================================================= # 工具函数 (Helper Functions) # ============================================================================= def validate_mode(mode: str = "geminicli") -> str: """ 验证 mode 参数 Args: mode: 模式字符串 ("geminicli" 或 "antigravity") Returns: str: 验证后的 mode 字符串 Raises: HTTPException: 如果 mode 参数无效 """ if mode not in ["geminicli", "antigravity"]: raise HTTPException( status_code=400, detail=f"无效的 mode 参数: {mode},只支持 'geminicli' 或 'antigravity'" ) return mode def get_env_locked_keys() -> set: """获取被环境变量锁定的配置键集合""" env_locked_keys = set() # 使用 config.py 中统一维护的映射表 for env_key, config_key in config.ENV_MAPPINGS.items(): if os.getenv(env_key): env_locked_keys.add(config_key) return env_locked_keys async def extract_json_files_from_zip(zip_file: UploadFile) -> List[dict]: """从ZIP文件中提取JSON文件""" try: # 读取ZIP文件内容 zip_content = await zip_file.read() # 不限制ZIP文件大小,只在处理时控制文件数量 files_data = [] with zipfile.ZipFile(io.BytesIO(zip_content), "r") as zip_ref: # 获取ZIP中的所有文件 file_list = zip_ref.namelist() json_files = [ f for f in file_list if f.endswith(".json") and not f.startswith("__MACOSX/") ] if not json_files: raise HTTPException(status_code=400, detail="ZIP文件中没有找到JSON文件") log.info(f"从ZIP文件 {zip_file.filename} 中找到 {len(json_files)} 个JSON文件") for json_filename in json_files: try: # 读取JSON文件内容 with zip_ref.open(json_filename) as json_file: content = json_file.read() try: content_str = content.decode("utf-8") except UnicodeDecodeError: log.warning(f"跳过编码错误的文件: {json_filename}") continue # 使用原始文件名(去掉路径) filename = os.path.basename(json_filename) files_data.append({"filename": filename, "content": content_str}) except Exception as e: log.warning(f"处理ZIP中的文件 {json_filename} 时出错: {e}") continue log.info(f"成功从ZIP文件中提取 {len(files_data)} 个有效的JSON文件") return files_data except zipfile.BadZipFile: raise HTTPException(status_code=400, detail="无效的ZIP文件格式") except Exception as e: log.error(f"处理ZIP文件失败: {e}") raise HTTPException(status_code=500, detail=f"处理ZIP文件失败: {str(e)}") async def upload_credentials_common( files: List[UploadFile], mode: str = "geminicli" ) -> JSONResponse: """批量上传凭证文件的通用函数""" mode = validate_mode(mode) if not files: raise HTTPException(status_code=400, detail="请选择要上传的文件") # 检查文件数量限制 if len(files) > 100: raise HTTPException( status_code=400, detail=f"文件数量过多,最多支持100个文件,当前:{len(files)}个" ) files_data = [] for file in files: # 检查文件类型:支持JSON和ZIP if file.filename.endswith(".zip"): zip_files_data = await extract_json_files_from_zip(file) files_data.extend(zip_files_data) log.info(f"从ZIP文件 {file.filename} 中提取了 {len(zip_files_data)} 个JSON文件") elif file.filename.endswith(".json"): # 处理单个JSON文件 - 流式读取 content_chunks = [] while True: chunk = await file.read(8192) if not chunk: break content_chunks.append(chunk) content = b"".join(content_chunks) try: content_str = content.decode("utf-8") except UnicodeDecodeError: raise HTTPException( status_code=400, detail=f"文件 {file.filename} 编码格式不支持" ) files_data.append({"filename": file.filename, "content": content_str}) else: raise HTTPException( status_code=400, detail=f"文件 {file.filename} 格式不支持,只支持JSON和ZIP文件" ) batch_size = 1000 all_results = [] total_success = 0 for i in range(0, len(files_data), batch_size): batch_files = files_data[i : i + batch_size] async def process_single_file(file_data): try: filename = file_data["filename"] # 确保文件名只保存basename,避免路径问题 filename = os.path.basename(filename) content_str = file_data["content"] credential_data = json.loads(content_str) # 根据凭证类型调用不同的添加方法 if mode == "antigravity": await credential_manager.add_antigravity_credential(filename, credential_data) else: await credential_manager.add_credential(filename, credential_data) log.debug(f"成功上传 {mode} 凭证文件: {filename}") return {"filename": filename, "status": "success", "message": "上传成功"} except json.JSONDecodeError as e: return { "filename": file_data["filename"], "status": "error", "message": f"JSON格式错误: {str(e)}", } except Exception as e: return { "filename": file_data["filename"], "status": "error", "message": f"处理失败: {str(e)}", } log.info(f"开始并发处理 {len(batch_files)} 个 {mode} 文件...") concurrent_tasks = [process_single_file(file_data) for file_data in batch_files] batch_results = await asyncio.gather(*concurrent_tasks, return_exceptions=True) processed_results = [] batch_uploaded_count = 0 for result in batch_results: if isinstance(result, Exception): processed_results.append( { "filename": "unknown", "status": "error", "message": f"处理异常: {str(result)}", } ) else: processed_results.append(result) if result["status"] == "success": batch_uploaded_count += 1 all_results.extend(processed_results) total_success += batch_uploaded_count batch_num = (i // batch_size) + 1 total_batches = (len(files_data) + batch_size - 1) // batch_size log.info( f"批次 {batch_num}/{total_batches} 完成: 成功 " f"{batch_uploaded_count}/{len(batch_files)} 个 {mode} 文件" ) if total_success > 0: return JSONResponse( content={ "uploaded_count": total_success, "total_count": len(files_data), "results": all_results, "message": f"批量上传完成: 成功 {total_success}/{len(files_data)} 个 {mode} 文件", } ) else: raise HTTPException(status_code=400, detail=f"没有 {mode} 文件上传成功") async def get_creds_status_common( offset: int, limit: int, status_filter: str, mode: str = "geminicli", error_code_filter: str = None, cooldown_filter: str = None ) -> JSONResponse: """获取凭证文件状态的通用函数""" mode = validate_mode(mode) # 验证分页参数 if offset < 0: raise HTTPException(status_code=400, detail="offset 必须大于等于 0") if limit not in [20, 50, 100, 200, 500, 1000]: raise HTTPException(status_code=400, detail="limit 只能是 20、50、100、200、500 或 1000") if status_filter not in ["all", "enabled", "disabled"]: raise HTTPException(status_code=400, detail="status_filter 只能是 all、enabled 或 disabled") if cooldown_filter and cooldown_filter not in ["all", "in_cooldown", "no_cooldown"]: raise HTTPException(status_code=400, detail="cooldown_filter 只能是 all、in_cooldown 或 no_cooldown") storage_adapter = await get_storage_adapter() backend_info = await storage_adapter.get_backend_info() backend_type = backend_info.get("backend_type", "unknown") # 优先使用高性能的分页摘要查询 if hasattr(storage_adapter._backend, 'get_credentials_summary'): result = await storage_adapter._backend.get_credentials_summary( offset=offset, limit=limit, status_filter=status_filter, mode=mode, error_code_filter=error_code_filter if error_code_filter and error_code_filter != "all" else None, cooldown_filter=cooldown_filter if cooldown_filter and cooldown_filter != "all" else None ) creds_list = [] for summary in result["items"]: cred_info = { "filename": os.path.basename(summary["filename"]), "user_email": summary["user_email"], "disabled": summary["disabled"], "error_codes": summary["error_codes"], "last_success": summary["last_success"], "backend_type": backend_type, "model_cooldowns": summary.get("model_cooldowns", {}), } creds_list.append(cred_info) return JSONResponse(content={ "items": creds_list, "total": result["total"], "offset": offset, "limit": limit, "has_more": (offset + limit) < result["total"], "stats": result.get("stats", {"total": 0, "normal": 0, "disabled": 0}), }) # 回退到传统方式(MongoDB/其他后端) all_credentials = await storage_adapter.list_credentials(mode=mode) all_states = await storage_adapter.get_all_credential_states(mode=mode) # 应用状态筛选 filtered_credentials = [] for filename in all_credentials: file_status = all_states.get(filename, {"disabled": False}) is_disabled = file_status.get("disabled", False) if status_filter == "all": filtered_credentials.append(filename) elif status_filter == "enabled" and not is_disabled: filtered_credentials.append(filename) elif status_filter == "disabled" and is_disabled: filtered_credentials.append(filename) total_count = len(filtered_credentials) paginated_credentials = filtered_credentials[offset:offset + limit] creds_list = [] for filename in paginated_credentials: file_status = all_states.get(filename, { "error_codes": [], "disabled": False, "last_success": time.time(), "user_email": None, }) cred_info = { "filename": os.path.basename(filename), "user_email": file_status.get("user_email"), "disabled": file_status.get("disabled", False), "error_codes": file_status.get("error_codes", []), "last_success": file_status.get("last_success", time.time()), "backend_type": backend_type, "model_cooldowns": file_status.get("model_cooldowns", {}), } creds_list.append(cred_info) return JSONResponse(content={ "items": creds_list, "total": total_count, "offset": offset, "limit": limit, "has_more": (offset + limit) < total_count, }) async def download_all_creds_common(mode: str = "geminicli") -> Response: """打包下载所有凭证文件的通用函数""" mode = validate_mode(mode) zip_filename = "antigravity_credentials.zip" if mode == "antigravity" else "credentials.zip" storage_adapter = await get_storage_adapter() credential_filenames = await storage_adapter.list_credentials(mode=mode) if not credential_filenames: raise HTTPException(status_code=404, detail=f"没有找到 {mode} 凭证文件") log.info(f"开始打包 {len(credential_filenames)} 个 {mode} 凭证文件...") zip_buffer = io.BytesIO() with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file: success_count = 0 for idx, filename in enumerate(credential_filenames, 1): try: credential_data = await storage_adapter.get_credential(filename, mode=mode) if credential_data: content = json.dumps(credential_data, ensure_ascii=False, indent=2) zip_file.writestr(os.path.basename(filename), content) success_count += 1 if idx % 10 == 0: log.debug(f"打包进度: {idx}/{len(credential_filenames)}") except Exception as e: log.warning(f"处理 {mode} 凭证文件 {filename} 时出错: {e}") continue log.info(f"打包完成: 成功 {success_count}/{len(credential_filenames)} 个文件") zip_buffer.seek(0) return Response( content=zip_buffer.getvalue(), media_type="application/zip", headers={"Content-Disposition": f"attachment; filename={zip_filename}"}, ) async def fetch_user_email_common(filename: str, mode: str = "geminicli") -> JSONResponse: """获取指定凭证文件用户邮箱的通用函数""" mode = validate_mode(mode) filename_only = os.path.basename(filename) if not filename_only.endswith(".json"): raise HTTPException(status_code=404, detail="无效的文件名") storage_adapter = await get_storage_adapter() credential_data = await storage_adapter.get_credential(filename_only, mode=mode) if not credential_data: raise HTTPException(status_code=404, detail="凭证文件不存在") email = await credential_manager.get_or_fetch_user_email(filename_only, mode=mode) if email: return JSONResponse( content={ "filename": filename_only, "user_email": email, "message": "成功获取用户邮箱", } ) else: return JSONResponse( content={ "filename": filename_only, "user_email": None, "message": "无法获取用户邮箱,可能凭证已过期或权限不足", }, status_code=400, ) async def refresh_all_user_emails_common(mode: str = "geminicli") -> JSONResponse: """刷新所有凭证文件用户邮箱的通用函数 - 只为没有邮箱的凭证获取 利用 get_all_credential_states 批量获取状态 """ mode = validate_mode(mode) storage_adapter = await get_storage_adapter() # 一次性批量获取所有凭证的状态 all_states = await storage_adapter.get_all_credential_states(mode=mode) results = [] success_count = 0 skipped_count = 0 # 在内存中筛选出需要获取邮箱的凭证 for filename, state in all_states.items(): try: cached_email = state.get("user_email") if cached_email: # 已有邮箱,跳过获取 skipped_count += 1 results.append({ "filename": os.path.basename(filename), "user_email": cached_email, "success": True, "skipped": True, }) continue # 没有邮箱,尝试获取 email = await credential_manager.get_or_fetch_user_email(filename, mode=mode) if email: success_count += 1 results.append({ "filename": os.path.basename(filename), "user_email": email, "success": True, }) else: results.append({ "filename": os.path.basename(filename), "user_email": None, "success": False, "error": "无法获取邮箱", }) except Exception as e: results.append({ "filename": os.path.basename(filename), "user_email": None, "success": False, "error": str(e), }) total_count = len(all_states) return JSONResponse( content={ "success_count": success_count, "total_count": total_count, "skipped_count": skipped_count, "results": results, "message": f"成功获取 {success_count}/{total_count} 个邮箱地址,跳过 {skipped_count} 个已有邮箱的凭证", } ) async def deduplicate_credentials_by_email_common(mode: str = "geminicli") -> JSONResponse: """批量去重凭证文件的通用函数 - 删除邮箱相同的凭证(只保留一个)""" mode = validate_mode(mode) storage_adapter = await get_storage_adapter() try: duplicate_info = await storage_adapter._backend.get_duplicate_credentials_by_email( mode=mode ) duplicate_groups = duplicate_info.get("duplicate_groups", []) no_email_files = duplicate_info.get("no_email_files", []) total_count = duplicate_info.get("total_count", 0) if not duplicate_groups: return JSONResponse( content={ "deleted_count": 0, "kept_count": total_count, "total_count": total_count, "unique_emails_count": duplicate_info.get("unique_email_count", 0), "no_email_count": len(no_email_files), "duplicate_groups": [], "delete_errors": [], "message": "没有发现重复的凭证(相同邮箱)", } ) # 执行删除操作 deleted_count = 0 delete_errors = [] result_duplicate_groups = [] for group in duplicate_groups: email = group["email"] kept_file = group["kept_file"] duplicate_files = group["duplicate_files"] deleted_files_in_group = [] for filename in duplicate_files: try: success = await credential_manager.remove_credential(filename, mode=mode) if success: deleted_count += 1 deleted_files_in_group.append(os.path.basename(filename)) log.info(f"去重删除凭证: {filename} (邮箱: {email}) (mode={mode})") else: delete_errors.append(f"{os.path.basename(filename)}: 删除失败") except Exception as e: delete_errors.append(f"{os.path.basename(filename)}: {str(e)}") log.error(f"去重删除凭证 {filename} 时出错: {e}") result_duplicate_groups.append({ "email": email, "kept_file": os.path.basename(kept_file), "deleted_files": deleted_files_in_group, "duplicate_count": len(deleted_files_in_group), }) kept_count = total_count - deleted_count return JSONResponse( content={ "deleted_count": deleted_count, "kept_count": kept_count, "total_count": total_count, "unique_emails_count": duplicate_info.get("unique_email_count", 0), "no_email_count": len(no_email_files), "duplicate_groups": result_duplicate_groups, "delete_errors": delete_errors, "message": f"去重完成:删除 {deleted_count} 个重复凭证,保留 {kept_count} 个凭证({duplicate_info.get('unique_email_count', 0)} 个唯一邮箱)", } ) except Exception as e: log.error(f"批量去重凭证时出错: {e}") return JSONResponse( status_code=500, content={ "deleted_count": 0, "kept_count": 0, "total_count": 0, "message": f"去重操作失败: {str(e)}", } ) # ============================================================================= # 路由处理函数 (Route Handlers) # ============================================================================= @router.post("/auth/upload") async def upload_credentials( files: List[UploadFile] = File(...), token: str = Depends(verify_panel_token), mode: str = "geminicli" ): """批量上传认证文件""" try: mode = validate_mode(mode) return await upload_credentials_common(files, mode=mode) except HTTPException: raise except Exception as e: log.error(f"批量上传失败: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.get("/creds/status") async def get_creds_status( token: str = Depends(verify_panel_token), offset: int = 0, limit: int = 50, status_filter: str = "all", error_code_filter: str = "all", cooldown_filter: str = "all", mode: str = "geminicli" ): """ 获取凭证文件的状态(轻量级摘要,不包含完整凭证数据,支持分页和状态筛选) Args: offset: 跳过的记录数(默认0) limit: 每页返回的记录数(默认50,可选:20, 50, 100, 200, 500, 1000) status_filter: 状态筛选(all=全部, enabled=仅启用, disabled=仅禁用) error_code_filter: 错误码筛选(all=全部, 或具体错误码如"400", "403") cooldown_filter: 冷却状态筛选(all=全部, in_cooldown=冷却中, no_cooldown=未冷却) mode: 凭证模式(geminicli 或 antigravity) Returns: 包含凭证列表、总数、分页信息的响应 """ try: mode = validate_mode(mode) return await get_creds_status_common( offset, limit, status_filter, mode=mode, error_code_filter=error_code_filter, cooldown_filter=cooldown_filter ) except HTTPException: raise except Exception as e: log.error(f"获取凭证状态失败: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.get("/creds/detail/{filename}") async def get_cred_detail( filename: str, token: str = Depends(verify_panel_token), mode: str = "geminicli" ): """ 按需获取单个凭证的详细数据(包含完整凭证内容) 用于用户查看/编辑凭证详情 """ try: mode = validate_mode(mode) # 验证文件名 if not filename.endswith(".json"): raise HTTPException(status_code=400, detail="无效的文件名") storage_adapter = await get_storage_adapter() backend_info = await storage_adapter.get_backend_info() backend_type = backend_info.get("backend_type", "unknown") # 获取凭证数据 credential_data = await storage_adapter.get_credential(filename, mode=mode) if not credential_data: raise HTTPException(status_code=404, detail="凭证不存在") # 获取状态信息 file_status = await storage_adapter.get_credential_state(filename, mode=mode) if not file_status: file_status = { "error_codes": [], "disabled": False, "last_success": time.time(), "user_email": None, } result = { "status": file_status, "content": credential_data, "filename": os.path.basename(filename), "backend_type": backend_type, "user_email": file_status.get("user_email"), "model_cooldowns": file_status.get("model_cooldowns", {}), } if backend_type == "file" and os.path.exists(filename): result.update({ "size": os.path.getsize(filename), "modified_time": os.path.getmtime(filename), }) return JSONResponse(content=result) except HTTPException: raise except Exception as e: log.error(f"获取凭证详情失败 {filename}: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/creds/action") async def creds_action( request: CredFileActionRequest, token: str = Depends(verify_panel_token), mode: str = "geminicli" ): """对凭证文件执行操作(启用/禁用/删除)""" try: mode = validate_mode(mode) log.info(f"Received request: {request}") filename = request.filename action = request.action log.info(f"Performing action '{action}' on file: {filename} (mode={mode})") # 验证文件名 if not filename.endswith(".json"): log.error(f"无效的文件名: {filename}(不是.json文件)") raise HTTPException(status_code=400, detail=f"无效的文件名: {filename}") # 获取存储适配器 storage_adapter = await get_storage_adapter() # 对于删除操作,不需要检查凭证数据是否完整,只需检查条目是否存在 # 对于其他操作,需要确保凭证数据存在且完整 if action != "delete": # 检查凭证数据是否存在 credential_data = await storage_adapter.get_credential(filename, mode=mode) if not credential_data: log.error(f"凭证未找到: {filename} (mode={mode})") raise HTTPException(status_code=404, detail="凭证文件不存在") if action == "enable": log.info(f"Web请求: 启用文件 {filename} (mode={mode})") result = await credential_manager.set_cred_disabled(filename, False, mode=mode) log.info(f"[WebRoute] set_cred_disabled 返回结果: {result}") if result: log.info(f"Web请求: 文件 {filename} 已成功启用 (mode={mode})") return JSONResponse(content={"message": f"已启用凭证文件 {os.path.basename(filename)}"}) else: log.error(f"Web请求: 文件 {filename} 启用失败 (mode={mode})") raise HTTPException(status_code=500, detail="启用凭证失败,可能凭证不存在") elif action == "disable": log.info(f"Web请求: 禁用文件 {filename} (mode={mode})") result = await credential_manager.set_cred_disabled(filename, True, mode=mode) log.info(f"[WebRoute] set_cred_disabled 返回结果: {result}") if result: log.info(f"Web请求: 文件 {filename} 已成功禁用 (mode={mode})") return JSONResponse(content={"message": f"已禁用凭证文件 {os.path.basename(filename)}"}) else: log.error(f"Web请求: 文件 {filename} 禁用失败 (mode={mode})") raise HTTPException(status_code=500, detail="禁用凭证失败,可能凭证不存在") elif action == "delete": try: # 使用 CredentialManager 删除凭证(包含队列/状态同步) success = await credential_manager.remove_credential(filename, mode=mode) if success: log.info(f"通过管理器成功删除凭证: {filename} (mode={mode})") return JSONResponse( content={"message": f"已删除凭证文件 {os.path.basename(filename)}"} ) else: raise HTTPException(status_code=500, detail="删除凭证失败") except Exception as e: log.error(f"删除凭证 {filename} 时出错: {e}") raise HTTPException(status_code=500, detail=f"删除文件失败: {str(e)}") else: raise HTTPException(status_code=400, detail="无效的操作类型") except HTTPException: raise except Exception as e: log.error(f"凭证文件操作失败: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/creds/batch-action") async def creds_batch_action( request: CredFileBatchActionRequest, token: str = Depends(verify_panel_token), mode: str = "geminicli" ): """批量对凭证文件执行操作(启用/禁用/删除)""" try: mode = validate_mode(mode) action = request.action filenames = request.filenames if not filenames: raise HTTPException(status_code=400, detail="文件名列表不能为空") log.info(f"对 {len(filenames)} 个文件执行批量操作 '{action}'") success_count = 0 errors = [] storage_adapter = await get_storage_adapter() for filename in filenames: try: # 验证文件名安全性 if not filename.endswith(".json"): errors.append(f"{filename}: 无效的文件类型") continue # 对于删除操作,不需要检查凭证数据完整性 # 对于其他操作,需要确保凭证数据存在 if action != "delete": credential_data = await storage_adapter.get_credential(filename, mode=mode) if not credential_data: errors.append(f"{filename}: 凭证不存在") continue # 执行相应操作 if action == "enable": await credential_manager.set_cred_disabled(filename, False, mode=mode) success_count += 1 elif action == "disable": await credential_manager.set_cred_disabled(filename, True, mode=mode) success_count += 1 elif action == "delete": try: delete_success = await credential_manager.remove_credential(filename, mode=mode) if delete_success: success_count += 1 log.info(f"成功删除批量中的凭证: {filename}") else: errors.append(f"{filename}: 删除失败") continue except Exception as e: errors.append(f"{filename}: 删除文件失败 - {str(e)}") continue else: errors.append(f"{filename}: 无效的操作类型") continue except Exception as e: log.error(f"处理 {filename} 时出错: {e}") errors.append(f"{filename}: 处理失败 - {str(e)}") continue # 构建返回消息 result_message = f"批量操作完成:成功处理 {success_count}/{len(filenames)} 个文件" if errors: result_message += "\n错误详情:\n" + "\n".join(errors) response_data = { "success_count": success_count, "total_count": len(filenames), "errors": errors, "message": result_message, } return JSONResponse(content=response_data) except HTTPException: raise except Exception as e: log.error(f"批量凭证文件操作失败: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.get("/creds/download/{filename}") async def download_cred_file( filename: str, token: str = Depends(verify_panel_token), mode: str = "geminicli" ): """下载单个凭证文件""" try: mode = validate_mode(mode) # 验证文件名安全性 if not filename.endswith(".json"): raise HTTPException(status_code=404, detail="无效的文件名") # 获取存储适配器 storage_adapter = await get_storage_adapter() # 从存储系统获取凭证数据 credential_data = await storage_adapter.get_credential(filename, mode=mode) if not credential_data: raise HTTPException(status_code=404, detail="文件不存在") # 转换为JSON字符串 content = json.dumps(credential_data, ensure_ascii=False, indent=2) from fastapi.responses import Response return Response( content=content, media_type="application/json", headers={"Content-Disposition": f"attachment; filename={filename}"}, ) except HTTPException: raise except Exception as e: log.error(f"下载凭证文件失败: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/creds/fetch-email/{filename}") async def fetch_user_email( filename: str, token: str = Depends(verify_panel_token), mode: str = "geminicli" ): """获取指定凭证文件的用户邮箱地址""" try: mode = validate_mode(mode) return await fetch_user_email_common(filename, mode=mode) except HTTPException: raise except Exception as e: log.error(f"获取用户邮箱失败: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/creds/refresh-all-emails") async def refresh_all_user_emails( token: str = Depends(verify_panel_token), mode: str = "geminicli" ): """刷新所有凭证文件的用户邮箱地址""" try: mode = validate_mode(mode) return await refresh_all_user_emails_common(mode=mode) except Exception as e: log.error(f"批量获取用户邮箱失败: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/creds/deduplicate-by-email") async def deduplicate_credentials_by_email( token: str = Depends(verify_panel_token), mode: str = "geminicli" ): """批量去重凭证文件 - 删除邮箱相同的凭证(只保留一个)""" try: mode = validate_mode(mode) return await deduplicate_credentials_by_email_common(mode=mode) except Exception as e: log.error(f"批量去重凭证失败: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.get("/creds/download-all") async def download_all_creds( token: str = Depends(verify_panel_token), mode: str = "geminicli" ): """ 打包下载所有凭证文件(流式处理,按需加载每个凭证数据) 只在实际下载时才加载完整凭证内容,最大化性能 """ try: mode = validate_mode(mode) return await download_all_creds_common(mode=mode) except HTTPException: raise except Exception as e: log.error(f"打包下载失败: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.get("/config/get") async def get_config(token: str = Depends(verify_panel_token)): """获取当前配置""" try: # 读取当前配置(包括环境变量和TOML文件中的配置) current_config = {} # 基础配置 current_config["code_assist_endpoint"] = await config.get_code_assist_endpoint() current_config["credentials_dir"] = await config.get_credentials_dir() current_config["proxy"] = await config.get_proxy_config() or "" # 代理端点配置 current_config["oauth_proxy_url"] = await config.get_oauth_proxy_url() current_config["googleapis_proxy_url"] = await config.get_googleapis_proxy_url() current_config["resource_manager_api_url"] = await config.get_resource_manager_api_url() current_config["service_usage_api_url"] = await config.get_service_usage_api_url() current_config["antigravity_api_url"] = await config.get_antigravity_api_url() # 自动封禁配置 current_config["auto_ban_enabled"] = await config.get_auto_ban_enabled() current_config["auto_ban_error_codes"] = await config.get_auto_ban_error_codes() # 429重试配置 current_config["retry_429_max_retries"] = await config.get_retry_429_max_retries() current_config["retry_429_enabled"] = await config.get_retry_429_enabled() current_config["retry_429_interval"] = await config.get_retry_429_interval() # 抗截断配置 current_config["anti_truncation_max_attempts"] = await config.get_anti_truncation_max_attempts() # 兼容性配置 current_config["compatibility_mode_enabled"] = await config.get_compatibility_mode_enabled() # 思维链返回配置 current_config["return_thoughts_to_frontend"] = await config.get_return_thoughts_to_frontend() # Antigravity流式转非流式配置 current_config["antigravity_stream2nostream"] = await config.get_antigravity_stream2nostream() # 服务器配置 current_config["host"] = await config.get_server_host() current_config["port"] = await config.get_server_port() current_config["api_password"] = await config.get_api_password() current_config["panel_password"] = await config.get_panel_password() current_config["password"] = await config.get_server_password() # 从存储系统读取配置 storage_adapter = await get_storage_adapter() storage_config = await storage_adapter.get_all_config() # 获取环境变量锁定的配置键 env_locked_keys = get_env_locked_keys() # 合并存储系统配置(不覆盖环境变量) for key, value in storage_config.items(): if key not in env_locked_keys: current_config[key] = value return JSONResponse(content={"config": current_config, "env_locked": list(env_locked_keys)}) except Exception as e: log.error(f"获取配置失败: {e}") raise HTTPException(status_code=500, detail=str(e)) @router.post("/config/save") async def save_config(request: ConfigSaveRequest, token: str = Depends(verify_panel_token)): """保存配置""" try: new_config = request.config log.debug(f"收到的配置数据: {list(new_config.keys())}") log.debug(f"收到的password值: {new_config.get('password', 'NOT_FOUND')}") # 验证配置项 if "retry_429_max_retries" in new_config: if ( not isinstance(new_config["retry_429_max_retries"], int) or new_config["retry_429_max_retries"] < 0 ): raise HTTPException(status_code=400, detail="最大429重试次数必须是大于等于0的整数") if "retry_429_enabled" in new_config: if not isinstance(new_config["retry_429_enabled"], bool): raise HTTPException(status_code=400, detail="429重试开关必须是布尔值") # 验证新的配置项 if "retry_429_interval" in new_config: try: interval = float(new_config["retry_429_interval"]) if interval < 0.01 or interval > 10: raise HTTPException(status_code=400, detail="429重试间隔必须在0.01-10秒之间") except (ValueError, TypeError): raise HTTPException(status_code=400, detail="429重试间隔必须是有效的数字") if "anti_truncation_max_attempts" in new_config: if ( not isinstance(new_config["anti_truncation_max_attempts"], int) or new_config["anti_truncation_max_attempts"] < 1 or new_config["anti_truncation_max_attempts"] > 10 ): raise HTTPException( status_code=400, detail="抗截断最大重试次数必须是1-10之间的整数" ) if "compatibility_mode_enabled" in new_config: if not isinstance(new_config["compatibility_mode_enabled"], bool): raise HTTPException(status_code=400, detail="兼容性模式开关必须是布尔值") if "return_thoughts_to_frontend" in new_config: if not isinstance(new_config["return_thoughts_to_frontend"], bool): raise HTTPException(status_code=400, detail="思维链返回开关必须是布尔值") if "antigravity_stream2nostream" in new_config: if not isinstance(new_config["antigravity_stream2nostream"], bool): raise HTTPException(status_code=400, detail="Antigravity流式转非流式开关必须是布尔值") # 验证服务器配置 if "host" in new_config: if not isinstance(new_config["host"], str) or not new_config["host"].strip(): raise HTTPException(status_code=400, detail="服务器主机地址不能为空") if "port" in new_config: if ( not isinstance(new_config["port"], int) or new_config["port"] < 1 or new_config["port"] > 65535 ): raise HTTPException(status_code=400, detail="端口号必须是1-65535之间的整数") if "api_password" in new_config: if not isinstance(new_config["api_password"], str): raise HTTPException(status_code=400, detail="API访问密码必须是字符串") if "panel_password" in new_config: if not isinstance(new_config["panel_password"], str): raise HTTPException(status_code=400, detail="控制面板密码必须是字符串") if "password" in new_config: if not isinstance(new_config["password"], str): raise HTTPException(status_code=400, detail="访问密码必须是字符串") # 获取环境变量锁定的配置键 env_locked_keys = get_env_locked_keys() # 直接使用存储适配器保存配置 storage_adapter = await get_storage_adapter() for key, value in new_config.items(): if key not in env_locked_keys: await storage_adapter.set_config(key, value) if key in ("password", "api_password", "panel_password"): log.debug(f"设置{key}字段为: {value}") # 重新加载配置缓存(关键!) await config.reload_config() # 验证保存后的结果 test_api_password = await config.get_api_password() test_panel_password = await config.get_panel_password() test_password = await config.get_server_password() log.debug(f"保存后立即读取的API密码: {test_api_password}") log.debug(f"保存后立即读取的面板密码: {test_panel_password}") log.debug(f"保存后立即读取的通用密码: {test_password}") # 构建响应消息 response_data = { "message": "配置保存成功", "saved_config": {k: v for k, v in new_config.items() if k not in env_locked_keys}, } return JSONResponse(content=response_data) except HTTPException: raise except Exception as e: log.error(f"保存配置失败: {e}") raise HTTPException(status_code=500, detail=str(e)) # ============================================================================= # 实时日志WebSocket (Real-time Logs WebSocket) # ============================================================================= @router.post("/auth/logs/clear") async def clear_logs(token: str = Depends(verify_panel_token)): """清空日志文件""" try: # 直接使用环境变量获取日志文件路径 log_file_path = os.getenv("LOG_FILE", "log.txt") # 检查日志文件是否存在 if os.path.exists(log_file_path): try: # 清空文件内容(保留文件),确保以UTF-8编码写入 with open(log_file_path, "w", encoding="utf-8", newline="") as f: f.write("") f.flush() # 强制刷新到磁盘 log.info(f"日志文件已清空: {log_file_path}") # 通知所有WebSocket连接日志已清空 await manager.broadcast("--- 日志文件已清空 ---") return JSONResponse( content={"message": f"日志文件已清空: {os.path.basename(log_file_path)}"} ) except Exception as e: log.error(f"清空日志文件失败: {e}") raise HTTPException(status_code=500, detail=f"清空日志文件失败: {str(e)}") else: return JSONResponse(content={"message": "日志文件不存在"}) except Exception as e: log.error(f"清空日志文件失败: {e}") raise HTTPException(status_code=500, detail=f"清空日志文件失败: {str(e)}") @router.get("/auth/logs/download") async def download_logs(token: str = Depends(verify_panel_token)): """下载日志文件""" try: # 直接使用环境变量获取日志文件路径 log_file_path = os.getenv("LOG_FILE", "log.txt") # 检查日志文件是否存在 if not os.path.exists(log_file_path): raise HTTPException(status_code=404, detail="日志文件不存在") # 检查文件是否为空 file_size = os.path.getsize(log_file_path) if file_size == 0: raise HTTPException(status_code=404, detail="日志文件为空") # 生成文件名(包含时间戳) timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"gcli2api_logs_{timestamp}.txt" log.info(f"下载日志文件: {log_file_path}") return FileResponse( path=log_file_path, filename=filename, media_type="text/plain", headers={"Content-Disposition": f"attachment; filename={filename}"}, ) except HTTPException: raise except Exception as e: log.error(f"下载日志文件失败: {e}") raise HTTPException(status_code=500, detail=f"下载日志文件失败: {str(e)}") @router.websocket("/auth/logs/stream") async def websocket_logs(websocket: WebSocket): """WebSocket端点,用于实时日志流""" # 检查连接数限制 if not await manager.connect(websocket): return try: # 直接使用环境变量获取日志文件路径 log_file_path = os.getenv("LOG_FILE", "log.txt") # 发送初始日志(限制为最后50行,减少内存占用) if os.path.exists(log_file_path): try: with open(log_file_path, "r", encoding="utf-8") as f: lines = f.readlines() # 只发送最后50行,减少初始内存消耗 for line in lines[-50:]: if line.strip(): await websocket.send_text(line.strip()) except Exception as e: await websocket.send_text(f"Error reading log file: {e}") # 监控日志文件变化 last_size = os.path.getsize(log_file_path) if os.path.exists(log_file_path) else 0 max_read_size = 8192 # 限制单次读取大小为8KB,防止大量日志造成内存激增 check_interval = 2 # 增加检查间隔,减少CPU和I/O开销 # 创建后台任务监听客户端断开 # 即使没有日志更新,receive_text() 也能即时感知断开 async def listen_for_disconnect(): try: while True: await websocket.receive_text() except Exception: pass listener_task = asyncio.create_task(listen_for_disconnect()) try: while websocket.client_state == WebSocketState.CONNECTED: # 使用 asyncio.wait 同时等待定时器和断开信号 # timeout=check_interval 替代了 asyncio.sleep done, pending = await asyncio.wait( [listener_task], timeout=check_interval, return_when=asyncio.FIRST_COMPLETED ) # 如果监听任务结束(通常是因为连接断开),则退出循环 if listener_task in done: break if os.path.exists(log_file_path): current_size = os.path.getsize(log_file_path) if current_size > last_size: # 限制读取大小,防止单次读取过多内容 read_size = min(current_size - last_size, max_read_size) try: with open(log_file_path, "r", encoding="utf-8", errors="replace") as f: f.seek(last_size) new_content = f.read(read_size) # 处理编码错误的情况 if not new_content: last_size = current_size continue # 分行发送,避免发送不完整的行 lines = new_content.splitlines(keepends=True) if lines: # 如果最后一行没有换行符,保留到下次处理 if not lines[-1].endswith("\n") and len(lines) > 1: # 除了最后一行,其他都发送 for line in lines[:-1]: if line.strip(): await websocket.send_text(line.rstrip()) # 更新位置,但要退回最后一行的字节数 last_size += len(new_content.encode("utf-8")) - len( lines[-1].encode("utf-8") ) else: # 所有行都发送 for line in lines: if line.strip(): await websocket.send_text(line.rstrip()) last_size += len(new_content.encode("utf-8")) except UnicodeDecodeError as e: # 遇到编码错误时,跳过这部分内容 log.warning(f"WebSocket日志读取编码错误: {e}, 跳过部分内容") last_size = current_size except Exception as e: await websocket.send_text(f"Error reading new content: {e}") # 发生其他错误时,重置文件位置 last_size = current_size # 如果文件被截断(如清空日志),重置位置 elif current_size < last_size: last_size = 0 await websocket.send_text("--- 日志已清空 ---") finally: # 确保清理监听任务 if not listener_task.done(): listener_task.cancel() try: await listener_task except asyncio.CancelledError: pass except WebSocketDisconnect: pass except Exception as e: log.error(f"WebSocket logs error: {e}") finally: manager.disconnect(websocket) async def verify_credential_project_common(filename: str, mode: str = "geminicli") -> JSONResponse: """验证并重新获取凭证的project id的通用函数""" mode = validate_mode(mode) # 验证文件名 if not filename.endswith(".json"): raise HTTPException(status_code=400, detail="无效的文件名") storage_adapter = await get_storage_adapter() # 获取凭证数据 credential_data = await storage_adapter.get_credential(filename, mode=mode) if not credential_data: raise HTTPException(status_code=404, detail="凭证不存在") # 创建凭证对象 credentials = Credentials.from_dict(credential_data) # 确保token有效(自动刷新) token_refreshed = await credentials.refresh_if_needed() # 如果token被刷新了,更新存储 if token_refreshed: log.info(f"Token已自动刷新: {filename} (mode={mode})") credential_data = credentials.to_dict() await storage_adapter.store_credential(filename, credential_data, mode=mode) # 获取API端点和对应的User-Agent if mode == "antigravity": api_base_url = await get_antigravity_api_url() user_agent = ANTIGRAVITY_USER_AGENT else: api_base_url = await get_code_assist_endpoint() user_agent = GEMINICLI_USER_AGENT # 重新获取project id project_id = await fetch_project_id( access_token=credentials.access_token, user_agent=user_agent, api_base_url=api_base_url ) if project_id: # 更新凭证数据中的project_id credential_data["project_id"] = project_id await storage_adapter.store_credential(filename, credential_data, mode=mode) # 检验成功后自动解除禁用状态并清除错误码 await storage_adapter.update_credential_state(filename, { "disabled": False, "error_codes": [] }, mode=mode) log.info(f"检验 {mode} 凭证成功: {filename} - Project ID: {project_id} - 已解除禁用并清除错误码") return JSONResponse(content={ "success": True, "filename": filename, "project_id": project_id, "message": "检验成功!Project ID已更新,已解除禁用状态并清除错误码,403错误应该已恢复" }) else: return JSONResponse( status_code=400, content={ "success": False, "filename": filename, "message": "检验失败:无法获取Project ID,请检查凭证是否有效" } ) @router.post("/creds/verify-project/{filename}") async def verify_credential_project( filename: str, token: str = Depends(verify_panel_token), mode: str = "geminicli" ): """ 检验凭证的project id,重新获取project id 检验成功可以使403错误恢复 """ try: mode = validate_mode(mode) return await verify_credential_project_common(filename, mode=mode) except HTTPException: raise except Exception as e: log.error(f"检验凭证Project ID失败 {filename}: {e}") raise HTTPException(status_code=500, detail=f"检验失败: {str(e)}") @router.get("/creds/quota/{filename}") async def get_credential_quota( filename: str, token: str = Depends(verify_panel_token), mode: str = "antigravity" ): """ 获取指定凭证的额度信息(仅支持 antigravity 模式) """ try: mode = validate_mode(mode) # 验证文件名 if not filename.endswith(".json"): raise HTTPException(status_code=400, detail="无效的文件名") storage_adapter = await get_storage_adapter() # 获取凭证数据 credential_data = await storage_adapter.get_credential(filename, mode=mode) if not credential_data: raise HTTPException(status_code=404, detail="凭证不存在") # 使用 Credentials 对象自动处理 token 刷新 from .google_oauth_api import Credentials creds = Credentials.from_dict(credential_data) # 自动刷新 token(如果需要) await creds.refresh_if_needed() # 如果 token 被刷新了,更新存储 updated_data = creds.to_dict() if updated_data != credential_data: log.info(f"Token已自动刷新: {filename}") await storage_adapter.store_credential(filename, updated_data, mode=mode) credential_data = updated_data # 获取访问令牌 access_token = credential_data.get("access_token") or credential_data.get("token") if not access_token: raise HTTPException(status_code=400, detail="凭证中没有访问令牌") # 获取额度信息 quota_info = await fetch_quota_info(access_token) if quota_info.get("success"): return JSONResponse(content={ "success": True, "filename": filename, "models": quota_info.get("models", {}) }) else: return JSONResponse( status_code=400, content={ "success": False, "filename": filename, "error": quota_info.get("error", "未知错误") } ) except HTTPException: raise except Exception as e: log.error(f"获取凭证额度失败 {filename}: {e}") raise HTTPException(status_code=500, detail=f"获取额度失败: {str(e)}") @router.get("/version/info") async def get_version_info(check_update: bool = False): """ 获取当前版本信息 - 从version.txt读取 可选参数 check_update: 是否检查GitHub上的最新版本 """ try: # 获取项目根目录 project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) version_file = os.path.join(project_root, "version.txt") # 读取version.txt if not os.path.exists(version_file): return JSONResponse({ "success": False, "error": "version.txt文件不存在" }) version_data = {} with open(version_file, 'r', encoding='utf-8') as f: for line in f: line = line.strip() if '=' in line: key, value = line.split('=', 1) version_data[key] = value # 检查必要字段 if 'short_hash' not in version_data: return JSONResponse({ "success": False, "error": "version.txt格式错误" }) response_data = { "success": True, "version": version_data.get('short_hash', 'unknown'), "full_hash": version_data.get('full_hash', ''), "message": version_data.get('message', ''), "date": version_data.get('date', '') } # 如果需要检查更新 if check_update: try: from src.httpx_client import get_async # 直接获取GitHub上的version.txt文件 github_version_url = "https://raw.githubusercontent.com/su-kaka/gcli2api/refs/heads/master/version.txt" # 使用统一的httpx客户端 resp = await get_async(github_version_url, timeout=10.0) if resp.status_code == 200: # 解析远程version.txt remote_version_data = {} for line in resp.text.strip().split('\n'): line = line.strip() if '=' in line: key, value = line.split('=', 1) remote_version_data[key] = value latest_hash = remote_version_data.get('full_hash', '') latest_short_hash = remote_version_data.get('short_hash', '') current_hash = version_data.get('full_hash', '') has_update = (current_hash != latest_hash) if current_hash and latest_hash else None response_data['check_update'] = True response_data['has_update'] = has_update response_data['latest_version'] = latest_short_hash response_data['latest_hash'] = latest_hash response_data['latest_message'] = remote_version_data.get('message', '') response_data['latest_date'] = remote_version_data.get('date', '') else: # GitHub获取失败,但不影响基本版本信息 response_data['check_update'] = False response_data['update_error'] = f"GitHub返回错误: {resp.status_code}" except Exception as e: log.debug(f"检查更新失败: {e}") response_data['check_update'] = False response_data['update_error'] = str(e) return JSONResponse(response_data) except Exception as e: log.error(f"获取版本信息失败: {e}") return JSONResponse({ "success": False, "error": str(e) })