# gcli2api / src/api/geminicli.py
# a3216 · sync: github -> hf space · c50496f
"""
GeminiCli API Client - Handles all communication with GeminiCli API.
This module is used by both OpenAI compatibility layer and native Gemini endpoints.
GeminiCli API 客户端 - 处理与 GeminiCli API 的所有通信
"""
import sys
from pathlib import Path
# Add the project root to the Python path (only when this file is run directly for testing).
if __name__ == "__main__":
    # The project root is three directory levels above this file.
    project_root = Path(__file__).resolve().parent.parent.parent
    if str(project_root) not in sys.path:
        sys.path.insert(0, str(project_root))
import asyncio
import json
from typing import Any, Dict, Optional, Callable, Tuple
from fastapi import Response
from config import get_code_assist_endpoint, get_auto_ban_error_codes
from log import log
from src.credential_manager import credential_manager
from src.httpx_client import stream_post_async, post_async
# 导入共同的基础功能
from src.api.utils import (
handle_error_with_retry,
get_retry_config,
record_api_call_success,
record_api_call_error,
parse_and_log_cooldown,
)
from src.utils import get_geminicli_user_agent
# ==================== 全局凭证管理器 ====================
# 使用全局单例 credential_manager,自动初始化
# ==================== 请求准备 ====================
async def prepare_request_headers_and_payload(
    payload: dict, credential_data: dict, target_url: str
):
    """
    Build the request headers and the wrapped payload from credential data.

    Args:
        payload: Incoming request payload; its "model" and "request" keys are read.
        credential_data: Credential dictionary holding the token and project ID.
        target_url: Target URL, handed back unchanged.

    Returns:
        Tuple of (headers, final_payload, target_url).

    Raises:
        Exception: If the credential has no access token or no project ID.
    """
    # Prefer "token"; fall back to "access_token" when it is missing or empty.
    access_token = credential_data.get("token")
    if not access_token:
        access_token = credential_data.get("access_token", "")
    if not access_token:
        raise Exception("凭证中没有找到有效的访问令牌(token或access_token字段)")

    inner_request = payload.get("request", {})

    # The internal API authenticates with a Bearer token plus a project ID.
    request_headers = {}
    request_headers["Authorization"] = f"Bearer {access_token}"
    request_headers["Content-Type"] = "application/json"
    request_headers["User-Agent"] = get_geminicli_user_agent(payload.get("model", ""))

    project = credential_data.get("project_id", "")
    if not project:
        raise Exception("项目ID不存在于凭证数据中")

    wrapped_payload = dict(
        model=payload.get("model"),
        project=project,
        request=inner_request,
    )
    return request_headers, wrapped_payload, target_url
def _is_retryable_status(status_code: int, disable_error_codes: list[int]) -> bool:
"""统一判断是否属于可重试状态码。"""
return status_code in (429, 503) or status_code in disable_error_codes
async def _switch_credential_for_retry(
*,
next_cred_task: Optional[asyncio.Task],
retry_interval: float,
refresh_credential_fast: Callable[[], Any],
apply_cred_result: Callable[[Tuple[str, Dict[str, Any]]], bool],
log_prefix: str,
) -> Tuple[bool, Optional[asyncio.Task]]:
"""优先使用预热凭证,失败后退回同步刷新。"""
if next_cred_task is not None:
try:
cred_result = await next_cred_task
next_cred_task = None
if cred_result and apply_cred_result(cred_result):
await asyncio.sleep(retry_interval)
return True, next_cred_task
except Exception as e:
log.warning(f"{log_prefix} 预热凭证任务失败: {e}")
next_cred_task = None
await asyncio.sleep(retry_interval)
if await refresh_credential_fast():
return True, next_cred_task
return False, next_cred_task
# ==================== 新的流式和非流式请求函数 ====================
async def stream_request(
    body: Dict[str, Any],
    native: bool = False,
    headers: Optional[Dict[str, str]] = None,
):
    """
    Send a streaming request and yield its chunks, retrying on retryable errors.

    Args:
        body: Request body; its "model" and "request" keys are read.
        native: Passed to stream_post_async; True yields a native bytes stream,
            False yields a str stream.
        headers: Extra request headers merged over the generated ones.

    Yields:
        A Response object (on error) or bytes/str stream chunks (on success).
    """
    # The model name is used for credential selection and logging
    model_name = body.get("model", "")

    # 1. Obtain a valid credential
    cred_result = await credential_manager.get_valid_credential(
        mode="geminicli", model_name=model_name
    )

    if not cred_result:
        # No credential was returned: yield a 500 error and stop
        yield Response(
            content=json.dumps({"error": "当前无可用凭证"}),
            status_code=500,
            media_type="application/json"
        )
        return

    current_file, credential_data = cred_result

    # 2. Build the URL and request headers
    try:
        auth_headers, final_payload, target_url = await prepare_request_headers_and_payload(
            body, credential_data,
            f"{await get_code_assist_endpoint()}/v1internal:streamGenerateContent?alt=sse"
        )

        # Merge caller-supplied headers (they override generated ones on key clash)
        if headers:
            auth_headers.update(headers)
    except Exception as e:
        log.error(f"准备请求失败: {e}")
        yield Response(
            content=json.dumps({"error": f"准备请求失败: {str(e)}"}),
            status_code=500,
            media_type="application/json"
        )
        return

    # 3. Issue the request through stream_post_async
    retry_config = await get_retry_config()
    max_retries = retry_config["max_retries"]
    retry_interval = retry_config["retry_interval"]

    DISABLE_ERROR_CODES = await get_auto_ban_error_codes()  # Error codes that disable a credential
    last_error_response = None  # Most recent error response seen
    next_cred_task = None  # Pre-warmed task fetching the next credential

    # Inner helper: fast credential refresh (updates only the token and
    # project_id instead of rebuilding the whole request)
    async def refresh_credential_fast():
        nonlocal current_file, credential_data, auth_headers, final_payload
        cred_result = await credential_manager.get_valid_credential(
            mode="geminicli", model_name=model_name
        )
        if not cred_result:
            return None

        current_file, credential_data = cred_result
        try:
            # Only the token and project_id change; headers and payload are reused
            token = credential_data.get("token") or credential_data.get("access_token", "")
            project_id = credential_data.get("project_id", "")
            if not token or not project_id:
                return None

            # Update the existing headers and payload in place
            auth_headers["Authorization"] = f"Bearer {token}"
            final_payload["project"] = project_id
            return True
        except Exception:
            return None

    # Inner helper: apply an already-fetched (file, credential_data) pair in place
    def apply_cred_result(cred_result: Tuple[str, Dict[str, Any]]) -> bool:
        nonlocal current_file, credential_data, auth_headers, final_payload
        current_file, credential_data = cred_result
        token = credential_data.get("token") or credential_data.get("access_token", "")
        project_id = credential_data.get("project_id", "")
        if not token or not project_id:
            return False
        auth_headers["Authorization"] = f"Bearer {token}"
        final_payload["project"] = project_id
        return True

    for attempt in range(max_retries + 1):
        success_recorded = False  # Whether success has already been recorded
        need_retry = False  # Whether this attempt must be retried

        try:
            async for chunk in stream_post_async(
                url=target_url,
                body=final_payload,
                native=native,
                headers=auth_headers
            ):
                # A Response object signals an error rather than stream data
                if isinstance(chunk, Response):
                    status_code = chunk.status_code
                    last_error_response = chunk  # Remember the latest error

                    # Decode the error body once and reuse it below
                    error_body = None
                    try:
                        error_body = chunk.body.decode('utf-8') if isinstance(chunk.body, bytes) else str(chunk.body)
                    except Exception:
                        error_body = ""

                    # 429, 503, or a disable code: record the failure, then retry
                    if _is_retryable_status(status_code, DISABLE_ERROR_CODES):
                        log.warning(f"[GEMINICLI STREAM] 流式请求失败 (status={status_code}), 凭证: {current_file}, 响应: {error_body[:500] if error_body else '无'}")

                        # Parse the cooldown time (best effort; failures are ignored)
                        cooldown_until = None
                        if (status_code == 429 or status_code == 503) and error_body:
                            try:
                                cooldown_until = await parse_and_log_cooldown(error_body, mode="geminicli")
                            except Exception:
                                pass

                        # Pre-warm the next credential
                        if next_cred_task is None and attempt < max_retries:
                            next_cred_task = asyncio.create_task(
                                credential_manager.get_valid_credential(
                                    mode="geminicli", model_name=model_name
                                )
                            )

                        # Record the error against the current credential
                        await record_api_call_error(
                            credential_manager, current_file, status_code,
                            cooldown_until, mode="geminicli", model_name=model_name,
                            error_message=error_body
                        )

                        # Decide whether a retry should happen
                        should_retry = await handle_error_with_retry(
                            credential_manager, status_code, current_file,
                            retry_config["retry_enabled"], attempt, max_retries, retry_interval,
                            mode="geminicli"
                        )

                        if should_retry and attempt < max_retries:
                            need_retry = True
                            break  # Leave the inner loop and prepare the retry
                        else:
                            # No retry: hand the original error back
                            log.error(f"[GEMINICLI STREAM] 达到最大重试次数或不应重试,返回原始错误")
                            yield chunk
                            return
                    elif status_code == 404 and "preview" in model_name.lower():
                        # Special case: a 404 on a preview model is treated as the
                        # credential not supporting preview models
                        log.warning(f"[GEMINICLI STREAM] Preview模型404错误,凭证不支持preview: {current_file}")

                        # Set the credential's preview state to False
                        try:
                            await credential_manager.update_credential_state(
                                current_file, {"preview": False}, mode="geminicli"
                            )
                            log.info(f"[GEMINICLI STREAM] 已将凭证 {current_file} 的preview状态设置为False")
                        except Exception as e:
                            log.error(f"[GEMINICLI STREAM] 更新凭证preview状态失败: {e}")

                        # Record the 404 error
                        await record_api_call_error(
                            credential_manager, current_file, status_code,
                            None, mode="geminicli", model_name=model_name,
                            error_message=error_body
                        )

                        # Pre-warm the next credential
                        # (presumably the manager skips preview=False credentials — TODO confirm)
                        if next_cred_task is None and attempt < max_retries:
                            next_cred_task = asyncio.create_task(
                                credential_manager.get_valid_credential(
                                    mode="geminicli", model_name=model_name
                                )
                            )

                        # Trigger a retry
                        if attempt < max_retries:
                            need_retry = True
                            break
                        else:
                            log.error(f"[GEMINICLI STREAM] 达到最大重试次数,返回404错误")
                            yield chunk
                            return
                    else:
                        # Not a retryable status: return the error without retrying
                        log.error(f"[GEMINICLI STREAM] 流式请求失败,非重试错误码 (status={status_code}), 凭证: {current_file}, 响应: {error_body[:500] if error_body else '无'}")
                        await record_api_call_error(
                            credential_manager, current_file, status_code,
                            None, mode="geminicli", model_name=model_name,
                            error_message=error_body
                        )
                        yield chunk
                        return
                else:
                    # Not a Response, so this is real stream data: yield it as-is.
                    # Success is recorded on the first chunk only.
                    if not success_recorded:
                        await record_api_call_success(
                            credential_manager, current_file, mode="geminicli", model_name=model_name
                        )
                        success_recorded = True
                        log.debug(f"[GEMINICLI STREAM] 开始接收流式响应,模型: {model_name}")

                    yield chunk

            # The stream has ended; check the outcome
            if success_recorded:
                log.debug(f"[GEMINICLI STREAM] 流式响应完成,模型: {model_name}")
                return

            # Common retry handling
            if need_retry:
                # On the last attempt, stop retrying and return the error.
                # NOTE(review): need_retry is only set when attempt < max_retries,
                # so this branch looks unreachable — confirm before relying on it.
                if attempt >= max_retries:
                    log.error(f"[GEMINICLI STREAM] 达到最大重试次数,返回错误")
                    if last_error_response:
                        yield last_error_response
                    else:
                        yield Response(
                            content=json.dumps({"error": "请求失败,所有重试均已耗尽"}),
                            status_code=429,
                            media_type="application/json"
                        )
                    return

                log.info(f"[GEMINICLI STREAM] 重试请求 (attempt {attempt + 2}/{max_retries + 1})...")

                switched, next_cred_task = await _switch_credential_for_retry(
                    next_cred_task=next_cred_task,
                    retry_interval=retry_interval,
                    refresh_credential_fast=refresh_credential_fast,
                    apply_cred_result=apply_cred_result,
                    log_prefix="[GEMINICLI STREAM]",
                )
                if not switched:
                    log.error("[GEMINICLI STREAM] 重试时无可用凭证或刷新失败")
                    yield Response(
                        content=json.dumps({"error": "当前无可用凭证"}),
                        status_code=500,
                        media_type="application/json"
                    )
                    return
                continue  # Retry

        except Exception as e:
            # NOTE(review): this handler also runs when an exception is raised after
            # chunks were already yielded; the retry then re-sends the request from
            # the start with the same credential — confirm callers tolerate that.
            log.error(f"[GEMINICLI STREAM] 流式请求异常: {e}, 凭证: {current_file}")
            if attempt < max_retries:
                log.info(f"[GEMINICLI STREAM] 异常后重试 (attempt {attempt + 2}/{max_retries + 1})...")
                await asyncio.sleep(retry_interval)
                continue
            else:
                # Every attempt failed: return the last recorded error, if any
                log.error(f"[GEMINICLI STREAM] 所有重试均失败,最后异常: {e}")
                if last_error_response:
                    yield last_error_response
                else:
                    # No error response was recorded: return a 500 error
                    yield Response(
                        content=json.dumps({"error": f"流式请求异常: {str(e)}"}),
                        status_code=500,
                        media_type="application/json"
                    )
                return

    # The for loop ended normally (all attempts used): return the last recorded error
    log.error("[GEMINICLI STREAM] 所有重试均失败")
    if last_error_response:
        yield last_error_response
    else:
        yield Response(
            content=json.dumps({"error": "请求失败,所有重试均已耗尽"}),
            status_code=429,
            media_type="application/json"
        )
async def non_stream_request(
    body: Dict[str, Any],
    headers: Optional[Dict[str, str]] = None,
) -> Response:
    """
    Send a non-streaming request, retrying on retryable errors.

    Args:
        body: Request body; its "model" and "request" keys are read.
        headers: Extra request headers merged over the generated ones.

    Returns:
        A Response object: the upstream body on success, otherwise the last
        upstream error or a locally generated JSON error. Never None.
    """
    # The model name is used for credential selection and logging
    model_name = body.get("model", "")

    # 1. Obtain a valid credential
    cred_result = await credential_manager.get_valid_credential(
        mode="geminicli", model_name=model_name
    )

    if not cred_result:
        # No credential was returned: answer with a 500 error
        return Response(
            content=json.dumps({"error": "当前无可用凭证"}),
            status_code=500,
            media_type="application/json"
        )

    current_file, credential_data = cred_result

    # 2. Build the URL and request headers
    try:
        auth_headers, final_payload, target_url = await prepare_request_headers_and_payload(
            body, credential_data,
            f"{await get_code_assist_endpoint()}/v1internal:generateContent"
        )

        # Merge caller-supplied headers (they override generated ones on key clash)
        if headers:
            auth_headers.update(headers)
    except Exception as e:
        log.error(f"准备请求失败: {e}")
        return Response(
            content=json.dumps({"error": f"准备请求失败: {str(e)}"}),
            status_code=500,
            media_type="application/json"
        )

    # 3. Issue the request through post_async
    retry_config = await get_retry_config()
    max_retries = retry_config["max_retries"]
    retry_interval = retry_config["retry_interval"]

    DISABLE_ERROR_CODES = await get_auto_ban_error_codes()  # Error codes that disable a credential
    last_error_response = None  # Most recent error response seen
    next_cred_task = None  # Pre-warmed task fetching the next credential

    # Inner helper: fast credential refresh (updates only the token and
    # project_id instead of rebuilding the whole request)
    async def refresh_credential_fast():
        nonlocal current_file, credential_data, auth_headers, final_payload
        cred_result = await credential_manager.get_valid_credential(
            mode="geminicli", model_name=model_name
        )
        if not cred_result:
            return None

        current_file, credential_data = cred_result
        try:
            # Only the token and project_id change; headers and payload are reused
            token = credential_data.get("token") or credential_data.get("access_token", "")
            project_id = credential_data.get("project_id", "")
            if not token or not project_id:
                return None

            # Update the existing headers and payload in place
            auth_headers["Authorization"] = f"Bearer {token}"
            final_payload["project"] = project_id
            return True
        except Exception:
            return None

    # Inner helper: apply an already-fetched (file, credential_data) pair in place
    def apply_cred_result(cred_result: Tuple[str, Dict[str, Any]]) -> bool:
        nonlocal current_file, credential_data, auth_headers, final_payload
        current_file, credential_data = cred_result
        token = credential_data.get("token") or credential_data.get("access_token", "")
        project_id = credential_data.get("project_id", "")
        if not token or not project_id:
            return False
        auth_headers["Authorization"] = f"Bearer {token}"
        final_payload["project"] = project_id
        return True

    for attempt in range(max_retries + 1):
        try:
            response = await post_async(
                url=target_url,
                json=final_payload,
                headers=auth_headers,
                timeout=300.0
            )

            status_code = response.status_code

            # Success
            if status_code == 200:
                await record_api_call_success(
                    credential_manager, current_file, mode="geminicli", model_name=model_name
                )
                # Copy the headers, dropping compression-related ones so the
                # body is not decompressed a second time downstream
                response_headers = dict(response.headers)
                response_headers.pop('content-encoding', None)
                response_headers.pop('content-length', None)

                return Response(
                    content=response.content,
                    status_code=200,
                    headers=response_headers
                )

            # Failure: remember the latest error response (same header cleanup)
            error_headers = dict(response.headers)
            error_headers.pop('content-encoding', None)
            error_headers.pop('content-length', None)

            last_error_response = Response(
                content=response.content,
                status_code=status_code,
                headers=error_headers
            )

            # Read the error text once and reuse it below
            error_text = ""
            try:
                error_text = response.text
            except Exception:
                pass

            # All retryable status codes (429, 503, disable codes) share one path
            if _is_retryable_status(status_code, DISABLE_ERROR_CODES):
                log.warning(f"[NON-STREAM] 非流式请求失败 (status={status_code}), 凭证: {current_file}, 响应: {error_text[:500] if error_text else '无'}")

                # Parse the cooldown time (best effort; failures are ignored)
                cooldown_until = None
                if (status_code == 429 or status_code == 503) and error_text:
                    try:
                        cooldown_until = await parse_and_log_cooldown(error_text, mode="geminicli")
                    except Exception:
                        pass

                # Pre-warm the next credential in the background
                if next_cred_task is None and attempt < max_retries:
                    next_cred_task = asyncio.create_task(
                        credential_manager.get_valid_credential(
                            mode="geminicli", model_name=model_name
                        )
                    )

                # Record the error against the current credential
                await record_api_call_error(
                    credential_manager, current_file, status_code,
                    cooldown_until, mode="geminicli", model_name=model_name,
                    error_message=error_text
                )

                # Decide whether a retry should happen
                should_retry = await handle_error_with_retry(
                    credential_manager, status_code, current_file,
                    retry_config["retry_enabled"], attempt, max_retries, retry_interval,
                    mode="geminicli"
                )

                if should_retry and attempt < max_retries:
                    # Switch credentials and retry
                    log.info(f"[NON-STREAM] 重试请求 (attempt {attempt + 2}/{max_retries + 1})...")

                    switched, next_cred_task = await _switch_credential_for_retry(
                        next_cred_task=next_cred_task,
                        retry_interval=retry_interval,
                        refresh_credential_fast=refresh_credential_fast,
                        apply_cred_result=apply_cred_result,
                        log_prefix="[NON-STREAM]",
                    )
                    if not switched:
                        log.error("[NON-STREAM] 重试时无可用凭证或刷新失败")
                        return Response(
                            content=json.dumps({"error": "当前无可用凭证"}),
                            status_code=500,
                            media_type="application/json"
                        )
                    continue  # Retry
                else:
                    # No retry: hand the original error back
                    log.error(f"[NON-STREAM] 达到最大重试次数或不应重试,返回原始错误")
                    return last_error_response

            elif status_code == 404 and "preview" in model_name.lower():
                # Special case: a 404 on a preview model is treated as the
                # credential not supporting preview models
                log.warning(f"[NON-STREAM] Preview模型404错误,凭证不支持preview: {current_file}")

                # Set the credential's preview state to False
                try:
                    await credential_manager.update_credential_state(
                        current_file, {"preview": False}, mode="geminicli"
                    )
                    log.info(f"[NON-STREAM] 已将凭证 {current_file} 的preview状态设置为False")
                except Exception as e:
                    log.error(f"[NON-STREAM] 更新凭证preview状态失败: {e}")

                # Record the 404 error
                await record_api_call_error(
                    credential_manager, current_file, status_code,
                    None, mode="geminicli", model_name=model_name,
                    error_message=error_text
                )

                # Pre-warm the next credential
                # (presumably the manager skips preview=False credentials — TODO confirm)
                if next_cred_task is None and attempt < max_retries:
                    next_cred_task = asyncio.create_task(
                        credential_manager.get_valid_credential(
                            mode="geminicli", model_name=model_name
                        )
                    )

                # Trigger a retry
                if attempt < max_retries:
                    log.info(f"[NON-STREAM] 重试请求 (attempt {attempt + 2}/{max_retries + 1})...")

                    switched, next_cred_task = await _switch_credential_for_retry(
                        next_cred_task=next_cred_task,
                        retry_interval=retry_interval,
                        refresh_credential_fast=refresh_credential_fast,
                        apply_cred_result=apply_cred_result,
                        log_prefix="[NON-STREAM]",
                    )
                    if not switched:
                        log.error("[NON-STREAM] 重试时无可用凭证或刷新失败")
                        return Response(
                            content=json.dumps({"error": "当前无可用凭证"}),
                            status_code=500,
                            media_type="application/json"
                        )
                    continue  # Retry
                else:
                    log.error(f"[NON-STREAM] 达到最大重试次数,返回404错误")
                    return last_error_response
            else:
                # Not a retryable status: return the error directly
                log.error(f"[NON-STREAM] 非流式请求失败,非重试错误码 (status={status_code}), 凭证: {current_file}, 响应: {error_text[:500] if error_text else '无'}")
                await record_api_call_error(
                    credential_manager, current_file, status_code,
                    None, mode="geminicli", model_name=model_name,
                    error_message=error_text
                )
                return last_error_response

        except Exception as e:
            log.error(f"非流式请求异常: {e}, 凭证: {current_file}")
            if attempt < max_retries:
                log.info(f"[NON-STREAM] 异常后重试 (attempt {attempt + 2}/{max_retries + 1})...")
                await asyncio.sleep(retry_interval)
                continue
            else:
                # Every attempt failed: return the last error if any, else a 500
                log.error(f"[NON-STREAM] 所有重试均失败,最后异常: {e}")
                if last_error_response:
                    return last_error_response
                else:
                    return Response(
                        content=json.dumps({"error": f"请求异常: {str(e)}"}),
                        status_code=500,
                        media_type="application/json"
                    )

    # The loop ended without returning. Every path inside the loop returns or
    # continues, so this is only reached when no attempt was made at all
    # (e.g. a negative max_retries). Fall back to a generic error instead of
    # returning None, matching stream_request.
    log.error("[NON-STREAM] 所有重试均失败")
    if last_error_response is not None:
        return last_error_response
    return Response(
        content=json.dumps({"error": "请求失败,所有重试均已耗尽"}),
        status_code=429,
        media_type="application/json"
    )
# ==================== 测试代码 ====================
if __name__ == "__main__":
    # Demo script: shows the streaming and non-streaming data the API returns.
    # Run with: python src/api/geminicli.py
    banner = "=" * 80
    divider = "-" * 80

    print(banner)
    print("GeminiCli API 测试")
    print(banner)

    # Request body shared by both demos
    user_turn = {
        "role": "user",
        "parts": [{"text": "Hello, tell me a joke in one sentence."}],
    }
    test_body = {
        "model": "gemini-2.5-flash",
        "request": {"contents": [user_turn]},
    }

    async def test_stream_request():
        """Run the streaming demo and print every chunk received."""
        print("\n" + banner)
        print("【测试1】流式请求 (stream_request with native=False)")
        print(banner)
        print(f"请求体: {json.dumps(test_body, indent=2, ensure_ascii=False)}\n")

        print("流式响应数据 (每个chunk):")
        print(divider)

        chunk_count = 0
        async for chunk in stream_request(body=test_body, native=False):
            chunk_count += 1

            if isinstance(chunk, Response):
                # Error response
                print("\n❌ 错误响应:")
                print(f" 状态码: {chunk.status_code}")
                print(f" Content-Type: {chunk.headers.get('content-type', 'N/A')}")
                try:
                    if isinstance(chunk.body, bytes):
                        content = chunk.body.decode('utf-8')
                    else:
                        content = str(chunk.body)
                    print(f" 内容: {content}")
                except Exception as e:
                    print(f" 内容解析失败: {e}")
                continue

            # Regular stream chunk (str when native=False)
            print(f"\nChunk #{chunk_count}:")
            print(f" 类型: {type(chunk).__name__}")
            print(f" 长度: {len(chunk) if hasattr(chunk, '__len__') else 'N/A'}")
            print(f" 内容预览: {repr(chunk[:200] if len(chunk) > 200 else chunk)}")

            # Only SSE-formatted text chunks are parsed further
            if not (isinstance(chunk, str) and chunk.startswith("data: ")):
                continue
            try:
                data_line = chunk.strip()
                if data_line.startswith("data: "):
                    # Drop the "data: " prefix before decoding the JSON
                    json_data = json.loads(data_line[6:])
                    print(f" 解析后的JSON: {json.dumps(json_data, indent=4, ensure_ascii=False)}")
            except Exception as e:
                print(f" SSE解析尝试失败: {e}")

        print(f"\n总共收到 {chunk_count} 个chunk")

    async def test_non_stream_request():
        """Run the non-streaming demo and print the response."""
        print("\n" + banner)
        print("【测试2】非流式请求 (non_stream_request)")
        print(banner)
        print(f"请求体: {json.dumps(test_body, indent=2, ensure_ascii=False)}\n")

        response = await non_stream_request(body=test_body)

        print("非流式响应数据:")
        print(divider)
        print(f"状态码: {response.status_code}")
        print(f"Content-Type: {response.headers.get('content-type', 'N/A')}")
        print(f"\n响应头: {dict(response.headers)}\n")

        try:
            if isinstance(response.body, bytes):
                content = response.body.decode('utf-8')
            else:
                content = str(response.body)
            print(f"响应内容 (原始):\n{content}\n")

            # Try to pretty-print the body as JSON
            try:
                json_data = json.loads(content)
            except json.JSONDecodeError:
                print("(非JSON格式)")
            else:
                print("响应内容 (格式化JSON):")
                print(json.dumps(json_data, indent=2, ensure_ascii=False))
        except Exception as e:
            print(f"内容解析失败: {e}")

    async def main():
        """Run both demos in order and report any exception."""
        try:
            # Streaming first, then non-streaming
            for demo in (test_stream_request, test_non_stream_request):
                await demo()

            print("\n" + banner)
            print("测试完成")
            print(banner)
        except Exception as e:
            print(f"\n❌ 测试过程中出现异常: {e}")
            import traceback
            traceback.print_exc()

    # Run the demos
    asyncio.run(main())