"""
API 代理服务 - 调用 Google Antigravity API
"""
import httpx
import json
import uuid
from typing import AsyncGenerator
from models import OpenAIChatRequest, Account
from protocol_converter import convert_openai_to_gemini, map_model_name, convert_gemini_to_openai_chunk
from load_balancer import load_balancer
# Google Antigravity 内部 API 端点
GEMINI_API_URL = "https://daily-cloudcode-pa.sandbox.googleapis.com/v1internal:streamGenerateContent"
async def stream_chat_completion(
    request: OpenAIChatRequest,
    account: Account
) -> AsyncGenerator[str, None]:
    """
    Streaming Chat Completion.

    1. Convert the OpenAI request to Gemini format.
    2. Call the Google Antigravity streamGenerateContent endpoint (SSE).
    3. Convert each Gemini SSE event back into an OpenAI-format chunk.

    Yields OpenAI-style ``data: ...\\n\\n`` SSE lines. On success the stream
    is always terminated with ``data: [DONE]`` (even if the upstream stream
    ends without sending one), so OpenAI clients can detect completion.
    On failure a single ``data: {"error": ...}`` line is yielded and the
    account is flagged via the load balancer.
    """
    # Convert the request payload
    gemini_request = convert_openai_to_gemini(request)
    # Map the public model name to the upstream model id
    upstream_model = map_model_name(request.model)

    # Build the upstream request body
    request_body = {
        "project": account.token.project_id or "default-project",
        "requestId": str(uuid.uuid4()),
        "model": upstream_model,
        "userAgent": "antigravity-hf",
        "request": {
            "contents": gemini_request["contents"],
            "systemInstruction": gemini_request["systemInstruction"],
            "generationConfig": gemini_request["generationConfig"],
            "sessionId": account.token.session_id
        }
    }

    url = f"{GEMINI_API_URL}?alt=sse"
    done_sent = False
    try:
        async with httpx.AsyncClient(timeout=120.0) as client:
            async with client.stream(
                "POST",
                url,
                json=request_body,
                headers={
                    "Authorization": f"Bearer {account.token.access_token}",
                    "Host": "daily-cloudcode-pa.sandbox.googleapis.com",
                    "User-Agent": "antigravity-hf/1.0",
                    "Content-Type": "application/json"
                }
            ) as response:
                if response.status_code != 200:
                    error_text = await response.aread()
                    error_msg = f"上游服务错误 ({response.status_code}): {error_text.decode()}"
                    # Flag the account so the load balancer can rotate away from it
                    load_balancer.mark_account_error(
                        account.id,
                        response.status_code,
                        error_msg
                    )
                    yield f"data: {json.dumps({'error': error_msg})}\n\n"
                    return

                # Parse the SSE stream line by line; events may be split
                # across chunks, so keep a rolling buffer.
                buffer = ""
                async for chunk in response.aiter_text():
                    buffer += chunk
                    while "\n" in buffer:
                        line, buffer = buffer.split("\n", 1)
                        line = line.strip()
                        if not line or not line.startswith("data: "):
                            continue
                        data = line[6:]
                        if data == "[DONE]":
                            # Fix: the original `break` only exited the inner
                            # while-loop and kept consuming the upstream stream;
                            # we are done, so finish the generator entirely.
                            yield "data: [DONE]\n\n"
                            done_sent = True
                            load_balancer.mark_account_success(account.id)
                            return
                        try:
                            gemini_data = json.loads(data)
                        except json.JSONDecodeError:
                            # Partial/keep-alive payload — skip it
                            continue
                        openai_chunk = convert_gemini_to_openai_chunk(
                            gemini_data,
                            request.model
                        )
                        yield f"data: {json.dumps(openai_chunk)}\n\n"
    except httpx.HTTPError as exc:
        # Network-level failure (timeout, connect error): flag the account
        # like the HTTP-status path does. Status code 0 = no HTTP response
        # received — NOTE(review): confirm load_balancer accepts 0 here.
        error_msg = f"上游服务错误 (network): {exc}"
        load_balancer.mark_account_error(account.id, 0, error_msg)
        yield f"data: {json.dumps({'error': error_msg})}\n\n"
        return

    # Mark the account healthy after a clean stream
    load_balancer.mark_account_success(account.id)
    # The upstream may close without an explicit [DONE]; OpenAI clients
    # expect the sentinel, so always send it on the success path.
    if not done_sent:
        yield "data: [DONE]\n\n"
async def chat_completion(
    request: OpenAIChatRequest,
    account: Account
) -> dict:
    """
    Non-streaming Chat Completion.

    Converts the OpenAI request to Gemini format, calls the Antigravity
    generateContent endpoint, and converts the response back to the
    OpenAI format. Returns an OpenAI-format response dict on success,
    or ``{"error": <message>}`` on any failure (HTTP error, network
    error, or a malformed upstream body); failures also flag the
    account via the load balancer.
    """
    # Local import kept to mirror the module's existing lazy-import style
    from protocol_converter import convert_gemini_to_openai_response

    # Convert the request payload
    gemini_request = convert_openai_to_gemini(request)
    upstream_model = map_model_name(request.model)

    request_body = {
        "project": account.token.project_id or "default-project",
        "requestId": str(uuid.uuid4()),
        "model": upstream_model,
        "userAgent": "antigravity-hf",
        "request": {
            "contents": gemini_request["contents"],
            "systemInstruction": gemini_request["systemInstruction"],
            "generationConfig": gemini_request["generationConfig"],
            "sessionId": account.token.session_id
        }
    }

    # Non-streaming endpoint
    url = "https://daily-cloudcode-pa.sandbox.googleapis.com/v1internal:generateContent"
    try:
        async with httpx.AsyncClient(timeout=120.0) as client:
            response = await client.post(
                url,
                json=request_body,
                headers={
                    "Authorization": f"Bearer {account.token.access_token}",
                    "Host": "daily-cloudcode-pa.sandbox.googleapis.com",
                    "User-Agent": "antigravity-hf/1.0",
                    "Content-Type": "application/json"
                }
            )
    except httpx.HTTPError as exc:
        # Network-level failure (timeout, connect error): return the same
        # error-dict shape as the HTTP-status path instead of raising.
        # Status code 0 = no HTTP response received — NOTE(review):
        # confirm load_balancer accepts 0 here.
        error_msg = f"上游服务错误 (network): {exc}"
        load_balancer.mark_account_error(account.id, 0, error_msg)
        return {"error": error_msg}

    if response.status_code != 200:
        error_msg = f"上游服务错误 ({response.status_code}): {response.text}"
        load_balancer.mark_account_error(account.id, response.status_code, error_msg)
        return {"error": error_msg}

    try:
        gemini_data = response.json()
    except ValueError:
        # 200 status but unparseable body — treat as an upstream error
        # rather than letting the exception escape to the caller.
        error_msg = f"上游服务错误 ({response.status_code}): invalid JSON body"
        load_balancer.mark_account_error(account.id, response.status_code, error_msg)
        return {"error": error_msg}

    load_balancer.mark_account_success(account.id)
    return convert_gemini_to_openai_response(gemini_data, request.model)