""" API 代理服务 - 调用 Google Antigravity API """ import httpx import json import uuid from typing import AsyncGenerator from models import OpenAIChatRequest, Account from protocol_converter import convert_openai_to_gemini, map_model_name, convert_gemini_to_openai_chunk from load_balancer import load_balancer # Google Antigravity 内部 API 端点 GEMINI_API_URL = "https://daily-cloudcode-pa.sandbox.googleapis.com/v1internal:streamGenerateContent" async def stream_chat_completion( request: OpenAIChatRequest, account: Account ) -> AsyncGenerator[str, None]: """ 流式 Chat Completion 1. 将 OpenAI 请求转换为 Gemini 格式 2. 调用 Google Antigravity API 3. 将 Gemini 响应转换回 OpenAI 格式 """ # 转换请求 gemini_request = convert_openai_to_gemini(request) # 映射模型名 upstream_model = map_model_name(request.model) # 构建请求体 request_body = { "project": account.token.project_id or "default-project", "requestId": str(uuid.uuid4()), "model": upstream_model, "userAgent": "antigravity-hf", "request": { "contents": gemini_request["contents"], "systemInstruction": gemini_request["systemInstruction"], "generationConfig": gemini_request["generationConfig"], "sessionId": account.token.session_id } } # 发送请求 url = f"{GEMINI_API_URL}?alt=sse" async with httpx.AsyncClient(timeout=120.0) as client: async with client.stream( "POST", url, json=request_body, headers={ "Authorization": f"Bearer {account.token.access_token}", "Host": "daily-cloudcode-pa.sandbox.googleapis.com", "User-Agent": "antigravity-hf/1.0", "Content-Type": "application/json" } ) as response: if response.status_code != 200: error_text = await response.aread() error_msg = f"上游服务错误 ({response.status_code}): {error_text.decode()}" # 标记错误 load_balancer.mark_account_error( account.id, response.status_code, error_msg ) # 返回错误 yield f"data: {json.dumps({'error': error_msg})}\n\n" return # 处理 SSE 流 buffer = "" async for chunk in response.aiter_text(): buffer += chunk # 按行解析 SSE 事件 while "\n" in buffer: line, buffer = buffer.split("\n", 1) line = line.strip() if not line: continue if line.startswith("data: "): data = line[6:] if data == "[DONE]": yield "data: [DONE]\n\n" break try: gemini_data = json.loads(data) openai_chunk = convert_gemini_to_openai_chunk( gemini_data, request.model ) yield f"data: {json.dumps(openai_chunk)}\n\n" except json.JSONDecodeError: continue # 标记成功 load_balancer.mark_account_success(account.id) async def chat_completion( request: OpenAIChatRequest, account: Account ) -> dict: """ 非流式 Chat Completion """ from protocol_converter import convert_gemini_to_openai_response # 转换请求 gemini_request = convert_openai_to_gemini(request) upstream_model = map_model_name(request.model) request_body = { "project": account.token.project_id or "default-project", "requestId": str(uuid.uuid4()), "model": upstream_model, "userAgent": "antigravity-hf", "request": { "contents": gemini_request["contents"], "systemInstruction": gemini_request["systemInstruction"], "generationConfig": gemini_request["generationConfig"], "sessionId": account.token.session_id } } # 非流式 URL url = "https://daily-cloudcode-pa.sandbox.googleapis.com/v1internal:generateContent" async with httpx.AsyncClient(timeout=120.0) as client: response = await client.post( url, json=request_body, headers={ "Authorization": f"Bearer {account.token.access_token}", "Host": "daily-cloudcode-pa.sandbox.googleapis.com", "User-Agent": "antigravity-hf/1.0", "Content-Type": "application/json" } ) if response.status_code != 200: error_msg = f"上游服务错误 ({response.status_code}): {response.text}" load_balancer.mark_account_error(account.id, response.status_code, error_msg) return {"error": error_msg} load_balancer.mark_account_success(account.id) gemini_data = response.json() return convert_gemini_to_openai_response(gemini_data, request.model)