# antigravity-proxy / proxy_service.py
# Author: asemxin
# Initial commit: Antigravity API Proxy (ec41d51)
"""
API 代理服务 - 调用 Google Antigravity API
"""
import httpx
import json
import uuid
from typing import AsyncGenerator
from models import OpenAIChatRequest, Account
from protocol_converter import convert_openai_to_gemini, map_model_name, convert_gemini_to_openai_chunk
from load_balancer import load_balancer
# Google Antigravity internal API endpoint (streaming)
GEMINI_API_URL = "https://daily-cloudcode-pa.sandbox.googleapis.com/v1internal:streamGenerateContent"
async def stream_chat_completion(
    request: OpenAIChatRequest,
    account: Account
) -> AsyncGenerator[str, None]:
    """
    Streaming Chat Completion.

    1. Convert the OpenAI request into Gemini format.
    2. Call the Google Antigravity streaming API (SSE).
    3. Convert each Gemini SSE event back into an OpenAI-format chunk.

    Yields:
        SSE-framed strings ("data: ...\\n\\n"). On upstream failure a single
        error event is yielded and the account is marked failed in the
        load balancer; on completion the account is marked successful.
    """
    # Convert the request payload to Gemini format
    gemini_request = convert_openai_to_gemini(request)
    # Map the public model name to the upstream model name
    upstream_model = map_model_name(request.model)

    # Build the upstream request envelope
    request_body = {
        "project": account.token.project_id or "default-project",
        "requestId": str(uuid.uuid4()),
        "model": upstream_model,
        "userAgent": "antigravity-hf",
        "request": {
            "contents": gemini_request["contents"],
            "systemInstruction": gemini_request["systemInstruction"],
            "generationConfig": gemini_request["generationConfig"],
            "sessionId": account.token.session_id
        }
    }

    # alt=sse requests a Server-Sent Events response body
    url = f"{GEMINI_API_URL}?alt=sse"
    async with httpx.AsyncClient(timeout=120.0) as client:
        async with client.stream(
            "POST",
            url,
            json=request_body,
            headers={
                "Authorization": f"Bearer {account.token.access_token}",
                "Host": "daily-cloudcode-pa.sandbox.googleapis.com",
                "User-Agent": "antigravity-hf/1.0",
                "Content-Type": "application/json"
            }
        ) as response:
            if response.status_code != 200:
                # Body must be read explicitly inside a streaming context
                error_text = await response.aread()
                error_msg = f"上游服务错误 ({response.status_code}): {error_text.decode()}"
                # Record the failure so the load balancer can rotate accounts
                load_balancer.mark_account_error(
                    account.id,
                    response.status_code,
                    error_msg
                )
                # Surface the error to the client as a single SSE event
                yield f"data: {json.dumps({'error': error_msg})}\n\n"
                return

            # Parse the SSE stream: accumulate text and split on newlines
            buffer = ""
            async for chunk in response.aiter_text():
                buffer += chunk
                while "\n" in buffer:
                    line, buffer = buffer.split("\n", 1)
                    line = line.strip()
                    if not line:
                        continue
                    if line.startswith("data: "):
                        data = line[6:]
                        if data == "[DONE]":
                            # FIX: the original `break` only exited the inner
                            # while-loop, so the outer `async for` kept
                            # consuming (and could re-process) stream data
                            # after the terminal sentinel. Mark success and
                            # terminate the generator instead.
                            yield "data: [DONE]\n\n"
                            load_balancer.mark_account_success(account.id)
                            return
                        try:
                            gemini_data = json.loads(data)
                            openai_chunk = convert_gemini_to_openai_chunk(
                                gemini_data,
                                request.model
                            )
                            yield f"data: {json.dumps(openai_chunk)}\n\n"
                        except json.JSONDecodeError:
                            # Skip keep-alives / partial payloads silently
                            continue

            # Stream ended without an explicit [DONE]; still a success
            load_balancer.mark_account_success(account.id)
async def chat_completion(
    request: OpenAIChatRequest,
    account: Account
) -> dict:
    """
    Non-streaming Chat Completion.

    Converts the OpenAI request into Gemini format, performs a single call
    to the upstream generateContent endpoint, and converts the reply back
    into an OpenAI-format response dict. On upstream failure the account is
    marked failed and an {"error": ...} dict is returned instead.
    """
    from protocol_converter import convert_gemini_to_openai_response

    converted = convert_openai_to_gemini(request)

    # Upstream request envelope (model name mapped inline)
    payload = {
        "project": account.token.project_id or "default-project",
        "requestId": str(uuid.uuid4()),
        "model": map_model_name(request.model),
        "userAgent": "antigravity-hf",
        "request": {
            "contents": converted["contents"],
            "systemInstruction": converted["systemInstruction"],
            "generationConfig": converted["generationConfig"],
            "sessionId": account.token.session_id
        },
    }

    headers = {
        "Authorization": f"Bearer {account.token.access_token}",
        "Host": "daily-cloudcode-pa.sandbox.googleapis.com",
        "User-Agent": "antigravity-hf/1.0",
        "Content-Type": "application/json",
    }

    # One-shot (non-SSE) endpoint
    url = "https://daily-cloudcode-pa.sandbox.googleapis.com/v1internal:generateContent"

    async with httpx.AsyncClient(timeout=120.0) as client:
        response = await client.post(url, json=payload, headers=headers)

    # Guard clause: report upstream failures via the load balancer
    if response.status_code != 200:
        error_msg = f"上游服务错误 ({response.status_code}): {response.text}"
        load_balancer.mark_account_error(account.id, response.status_code, error_msg)
        return {"error": error_msg}

    load_balancer.mark_account_success(account.id)
    return convert_gemini_to_openai_response(response.json(), request.model)