# src/converter/fake_stream.py
from typing import Any, Dict, List, Optional, Tuple
import json
import time
import uuid
from src.converter.utils import extract_content_and_reasoning
from log import log
from src.converter.openai2gemini import _convert_usage_metadata
def safe_get_nested(obj: Any, *keys: str, default: Any = None) -> Any:
"""安全获取嵌套字典值
Args:
obj: 字典对象
*keys: 嵌套键路径
default: 默认值
Returns:
获取到的值或默认值
"""
for key in keys:
if not isinstance(obj, dict):
return default
obj = obj.get(key, default)
if obj is default:
return default
return obj
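
# Illustrative behavior of safe_get_nested (the values are made up for the example):
#   safe_get_nested({"a": {"b": 1}}, "a", "b")             -> 1
#   safe_get_nested({"a": {"b": 1}}, "a", "x", default=0)  -> 0
#   safe_get_nested("not-a-dict", "a")                     -> None
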
def parse_response_for_fake_stream(response_data: Dict[str, Any]) -> Tuple[str, str, str, List[Dict[str, Any]]]:
    """Extract content and reasoning from a complete response (for fake streaming).

    Args:
        response_data: Gemini API response data.

    Returns:
        A ``(content, reasoning_content, finish_reason, images)`` tuple.
    """
    # Handle GeminiCLI's response wrapper format
    if "response" in response_data and "candidates" not in response_data:
        log.debug("[FAKE_STREAM] Unwrapping response field")
        response_data = response_data["response"]
candidates = response_data.get("candidates", [])
log.debug(f"[FAKE_STREAM] Found {len(candidates)} candidates")
if not candidates:
return "", "", "STOP", []
candidate = candidates[0]
finish_reason = candidate.get("finishReason", "STOP")
parts = safe_get_nested(candidate, "content", "parts", default=[])
log.debug(f"[FAKE_STREAM] Extracted {len(parts)} parts: {json.dumps(parts, ensure_ascii=False)}")
content, reasoning_content, images = extract_content_and_reasoning(parts)
log.debug(f"[FAKE_STREAM] Content length: {len(content)}, Reasoning length: {len(reasoning_content)}, Images count: {len(images)}")
return content, reasoning_content, finish_reason, images
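
# For reference, a minimal payload shape this parser accepts (the values are
# illustrative; the field names match what the code above reads):
#   {"response": {"candidates": [{"finishReason": "STOP",
#       "content": {"parts": [{"text": "visible answer"},
#                             {"text": "hidden thought", "thought": True}]}}]}}
# Assuming extract_content_and_reasoning splits parts on the "thought" flag,
# this would yield ("visible answer", "hidden thought", "STOP", []).
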
def extract_fake_stream_content(response: Any) -> Tuple[str, str, Optional[Dict[str, int]]]:
    """Extract content from a non-streaming Gemini response for fake streaming.

    Args:
        response: Gemini API response object.

    Returns:
        A ``(content, reasoning_content, usage)`` tuple; ``usage`` is ``None``
        when the body is not valid JSON.
    """
    # Parse the response body
if hasattr(response, "body"):
body_str = (
response.body.decode()
if isinstance(response.body, bytes)
else str(response.body)
)
elif hasattr(response, "content"):
body_str = (
response.content.decode()
if isinstance(response.content, bytes)
else str(response.content)
)
else:
body_str = str(response)
try:
response_data = json.loads(body_str)
        # GeminiCLI returns {"response": {...}, "traceId": "..."},
        # so unwrap the "response" field first
if "response" in response_data:
gemini_response = response_data["response"]
else:
gemini_response = response_data
        # Extract content from the Gemini response, separating chain-of-thought
        # (note: images are collected here but not included in the return value)
content = ""
reasoning_content = ""
images = []
if "candidates" in gemini_response and gemini_response["candidates"]:
            # Gemini-format response: separate chain-of-thought from the answer
candidate = gemini_response["candidates"][0]
if "content" in candidate and "parts" in candidate["content"]:
parts = candidate["content"]["parts"]
content, reasoning_content, images = extract_content_and_reasoning(parts)
elif "choices" in gemini_response and gemini_response["choices"]:
            # OpenAI-format response
content = gemini_response["choices"][0].get("message", {}).get("content", "")
        # If there is no normal content but there is thinking content, warn
        if not content and reasoning_content:
            log.warning("Fake stream response contains only thinking content")
            content = "[The model is still thinking; please retry or rephrase your question]"
        # If there is no content at all, provide a default reply
        if not content:
            log.warning(f"No content found in response: {gemini_response}")
            content = "[Empty response, please try again]"
        # Convert usageMetadata to OpenAI format
usage = _convert_usage_metadata(gemini_response.get("usageMetadata"))
return content, reasoning_content, usage
    except json.JSONDecodeError:
        # Not JSON; return the raw text as-is (no usage info available)
        return body_str, "", None
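
# A minimal wiring sketch (the surrounding handler is hypothetical; only the
# two calls below come from this module):
#   content, reasoning, usage = extract_fake_stream_content(upstream_response)
#   chunks = build_openai_fake_stream_chunks(content, reasoning, "STOP", model_name)
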
def _build_candidate(parts: List[Dict[str, Any]], finish_reason: Optional[str] = "STOP") -> Dict[str, Any]:
    """Build a standard candidate response structure.

    Args:
        parts: List of parts.
        finish_reason: Finish reason (``None`` for intermediate chunks).

    Returns:
        A candidate response dictionary.
    """
return {
"candidates": [{
"content": {"parts": parts, "role": "model"},
"finishReason": finish_reason,
"index": 0,
}]
}
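
# Example: _build_candidate([{"text": "hi"}]) returns
#   {"candidates": [{"content": {"parts": [{"text": "hi"}], "role": "model"},
#                    "finishReason": "STOP", "index": 0}]}
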
def create_openai_heartbeat_chunk() -> Dict[str, Any]:
    """Create an OpenAI-format heartbeat chunk (for fake streaming).

    Returns:
        A heartbeat response chunk dictionary.
    """
return {
"choices": [
{
"index": 0,
"delta": {"role": "assistant", "content": ""},
"finish_reason": None,
}
]
}
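
# Callers would typically frame this as a Server-Sent Event; the SSE layer
# lives outside this module, but a sketch of that framing would be:
#   f"data: {json.dumps(create_openai_heartbeat_chunk())}\n\n"
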
def build_gemini_fake_stream_chunks(content: str, reasoning_content: str, finish_reason: str, images: Optional[List[Dict[str, Any]]] = None, chunk_size: int = 50) -> List[Dict[str, Any]]:
    """Build the data chunks for a fake streaming response.

    Args:
        content: Main content.
        reasoning_content: Reasoning content.
        finish_reason: Finish reason.
        images: Optional list of image data.
        chunk_size: Characters per chunk (default 50).

    Returns:
        A list of response data chunks.
    """
if images is None:
images = []
log.debug(f"[build_gemini_fake_stream_chunks] Input - content: {repr(content)}, reasoning: {repr(reasoning_content)}, finish_reason: {finish_reason}, images count: {len(images)}")
chunks = []
    # With no normal content, fall back to a default reply
    if not content:
        default_text = "[The model is still thinking; please retry or rephrase your question]" if reasoning_content else "[Empty response, please try again]"
return [_build_candidate([{"text": default_text}], finish_reason)]
    # Send the main content in chunks
first_chunk = True
for i in range(0, len(content), chunk_size):
chunk_text = content[i:i + chunk_size]
is_last_chunk = (i + chunk_size >= len(content)) and not reasoning_content
chunk_finish_reason = finish_reason if is_last_chunk else None
        # If this is the first chunk and there are images, include them in parts
parts = []
if first_chunk and images:
            # In Gemini format, image_url entries must be converted to inlineData
for img in images:
if img.get("type") == "image_url":
url = img.get("image_url", {}).get("url", "")
                    # Parse the data URL: data:{mime_type};base64,{data}
if url.startswith("data:"):
parts_of_url = url.split(";base64,")
if len(parts_of_url) == 2:
mime_type = parts_of_url[0].replace("data:", "")
base64_data = parts_of_url[1]
parts.append({
"inlineData": {
"mimeType": mime_type,
"data": base64_data
}
})
first_chunk = False
parts.append({"text": chunk_text})
chunk_data = _build_candidate(parts, chunk_finish_reason)
log.debug(f"[build_gemini_fake_stream_chunks] Generated chunk: {chunk_data}")
chunks.append(chunk_data)
    # If there is reasoning content, send it in chunks
if reasoning_content:
for i in range(0, len(reasoning_content), chunk_size):
chunk_text = reasoning_content[i:i + chunk_size]
is_last_chunk = i + chunk_size >= len(reasoning_content)
chunk_finish_reason = finish_reason if is_last_chunk else None
chunks.append(_build_candidate([{"text": chunk_text, "thought": True}], chunk_finish_reason))
log.debug(f"[build_gemini_fake_stream_chunks] Total chunks generated: {len(chunks)}")
return chunks
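
# Chunking sketch (sizes are illustrative): with chunk_size=50, a
# 120-character answer yields three text chunks of 50/50/20 characters.
# finishReason is attached only to the last chunk overall, so when
# reasoning_content is non-empty every text chunk carries finishReason=None
# and the final {"thought": True} chunk carries it instead.
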
def create_gemini_heartbeat_chunk() -> Dict[str, Any]:
    """Create a Gemini-format heartbeat data chunk.

    Returns:
        A heartbeat data chunk.
    """
chunk = _build_candidate([{"text": ""}])
chunk["candidates"][0]["finishReason"] = None
return chunk
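
# The heartbeat is an ordinary candidate with empty text and a null
# finishReason, so Gemini-format clients treat it as an in-progress delta:
#   {"candidates": [{"content": {"parts": [{"text": ""}], "role": "model"},
#                    "finishReason": None, "index": 0}]}
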
def build_openai_fake_stream_chunks(content: str, reasoning_content: str, finish_reason: str, model: str, images: Optional[List[Dict[str, Any]]] = None, chunk_size: int = 50) -> List[Dict[str, Any]]:
    """Build OpenAI-format fake streaming data chunks.

    Args:
        content: Main content.
        reasoning_content: Reasoning content.
        finish_reason: Finish reason (e.g. "STOP", "MAX_TOKENS").
        model: Model name.
        images: Optional list of image data.
        chunk_size: Characters per chunk (default 50).

    Returns:
        A list of OpenAI-format response data chunks.
    """
if images is None:
images = []
log.debug(f"[build_openai_fake_stream_chunks] Input - content: {repr(content)}, reasoning: {repr(reasoning_content)}, finish_reason: {finish_reason}, images count: {len(images)}")
chunks = []
response_id = f"chatcmpl-{uuid.uuid4().hex[:24]}"
created = int(time.time())
    # Map Gemini finish_reason to OpenAI format
openai_finish_reason = None
if finish_reason == "STOP":
openai_finish_reason = "stop"
elif finish_reason == "MAX_TOKENS":
openai_finish_reason = "length"
elif finish_reason in ["SAFETY", "RECITATION"]:
openai_finish_reason = "content_filter"
    # With no normal content, fall back to a default reply
    if not content:
        default_text = "[The model is still thinking; please retry or rephrase your question]" if reasoning_content else "[Empty response, please try again]"
return [{
"id": response_id,
"object": "chat.completion.chunk",
"created": created,
"model": model,
"choices": [{
"index": 0,
"delta": {"content": default_text},
"finish_reason": openai_finish_reason,
}]
}]
    # Send the main content in chunks
first_chunk = True
for i in range(0, len(content), chunk_size):
chunk_text = content[i:i + chunk_size]
is_last_chunk = (i + chunk_size >= len(content)) and not reasoning_content
chunk_finish = openai_finish_reason if is_last_chunk else None
delta_content = {}
        # If this is the first chunk and there are images, build a content array that includes them
if first_chunk and images:
delta_content["content"] = images + [{"type": "text", "text": chunk_text}]
first_chunk = False
else:
delta_content["content"] = chunk_text
chunk_data = {
"id": response_id,
"object": "chat.completion.chunk",
"created": created,
"model": model,
"choices": [{
"index": 0,
"delta": delta_content,
"finish_reason": chunk_finish,
}]
}
log.debug(f"[build_openai_fake_stream_chunks] Generated chunk: {chunk_data}")
chunks.append(chunk_data)
    # If there is reasoning content, send it in chunks (via the reasoning_content field)
if reasoning_content:
for i in range(0, len(reasoning_content), chunk_size):
chunk_text = reasoning_content[i:i + chunk_size]
is_last_chunk = i + chunk_size >= len(reasoning_content)
chunk_finish = openai_finish_reason if is_last_chunk else None
chunks.append({
"id": response_id,
"object": "chat.completion.chunk",
"created": created,
"model": model,
"choices": [{
"index": 0,
"delta": {"reasoning_content": chunk_text},
"finish_reason": chunk_finish,
}]
})
log.debug(f"[build_openai_fake_stream_chunks] Total chunks generated: {len(chunks)}")
return chunks
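
# Example single chunk from build_openai_fake_stream_chunks("Hello", "", "STOP",
# "my-model") (the model name is illustrative; id/created vary per call):
#   {"id": "chatcmpl-<24 hex chars>", "object": "chat.completion.chunk",
#    "created": <unix timestamp>, "model": "my-model",
#    "choices": [{"index": 0, "delta": {"content": "Hello"},
#                 "finish_reason": "stop"}]}
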
def create_anthropic_heartbeat_chunk() -> Dict[str, Any]:
    """Create an Anthropic-format heartbeat chunk (for fake streaming).

    Returns:
        A heartbeat response chunk dictionary.
    """
return {
"type": "ping"
}
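
# In Anthropic's SSE framing this would typically be emitted as
#   event: ping
#   data: {"type": "ping"}
# though the framing itself is handled outside this module.
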
def build_anthropic_fake_stream_chunks(content: str, reasoning_content: str, finish_reason: str, model: str, images: Optional[List[Dict[str, Any]]] = None, chunk_size: int = 50) -> List[Dict[str, Any]]:
    """Build Anthropic-format fake streaming data chunks.

    Args:
        content: Main content.
        reasoning_content: Reasoning (thinking) content.
        finish_reason: Finish reason (e.g. "STOP", "MAX_TOKENS").
        model: Model name.
        images: Optional list of image data.
        chunk_size: Characters per chunk (default 50).

    Returns:
        A list of Anthropic SSE-format response data chunks.
    """
if images is None:
images = []
log.debug(f"[build_anthropic_fake_stream_chunks] Input - content: {repr(content)}, reasoning: {repr(reasoning_content)}, finish_reason: {finish_reason}, images count: {len(images)}")
chunks = []
message_id = f"msg_{uuid.uuid4().hex}"
    # Map Gemini finish_reason to Anthropic format
anthropic_stop_reason = "end_turn"
if finish_reason == "MAX_TOKENS":
anthropic_stop_reason = "max_tokens"
elif finish_reason in ["SAFETY", "RECITATION"]:
anthropic_stop_reason = "end_turn"
    # 1. Send the message_start event
chunks.append({
"type": "message_start",
"message": {
"id": message_id,
"type": "message",
"role": "assistant",
"model": model,
"content": [],
"stop_reason": None,
"stop_sequence": None,
"usage": {"input_tokens": 0, "output_tokens": 0}
}
})
    # With no normal content, fall back to a default reply
    if not content:
        default_text = "[The model is still thinking; please retry or rephrase your question]" if reasoning_content else "[Empty response, please try again]"
# content_block_start
chunks.append({
"type": "content_block_start",
"index": 0,
"content_block": {"type": "text", "text": ""}
})
# content_block_delta
chunks.append({
"type": "content_block_delta",
"index": 0,
"delta": {"type": "text_delta", "text": default_text}
})
# content_block_stop
chunks.append({
"type": "content_block_stop",
"index": 0
})
# message_delta
chunks.append({
"type": "message_delta",
"delta": {"stop_reason": anthropic_stop_reason, "stop_sequence": None},
"usage": {"output_tokens": 0}
})
# message_stop
chunks.append({
"type": "message_stop"
})
return chunks
block_index = 0
    # 2. If there is reasoning content, send the thinking block first
if reasoning_content:
# thinking content_block_start
chunks.append({
"type": "content_block_start",
"index": block_index,
"content_block": {"type": "thinking", "thinking": ""}
})
        # Send the reasoning content in chunks
for i in range(0, len(reasoning_content), chunk_size):
chunk_text = reasoning_content[i:i + chunk_size]
chunks.append({
"type": "content_block_delta",
"index": block_index,
"delta": {"type": "thinking_delta", "thinking": chunk_text}
})
# thinking content_block_stop
chunks.append({
"type": "content_block_stop",
"index": block_index
})
block_index += 1
    # 3. If there are images, send image blocks
if images:
for img in images:
if img.get("type") == "image_url":
url = img.get("image_url", {}).get("url", "")
                # Parse the data URL: data:{mime_type};base64,{data}
if url.startswith("data:"):
parts_of_url = url.split(";base64,")
if len(parts_of_url) == 2:
mime_type = parts_of_url[0].replace("data:", "")
base64_data = parts_of_url[1]
# image content_block_start
chunks.append({
"type": "content_block_start",
"index": block_index,
"content_block": {
"type": "image",
"source": {
"type": "base64",
"media_type": mime_type,
"data": base64_data
}
}
})
# image content_block_stop
chunks.append({
"type": "content_block_stop",
"index": block_index
})
block_index += 1
    # 4. Send the main content (text block)
    # text content_block_start
chunks.append({
"type": "content_block_start",
"index": block_index,
"content_block": {"type": "text", "text": ""}
})
    # Send the main content in chunks
for i in range(0, len(content), chunk_size):
chunk_text = content[i:i + chunk_size]
chunks.append({
"type": "content_block_delta",
"index": block_index,
"delta": {"type": "text_delta", "text": chunk_text}
})
# text content_block_stop
chunks.append({
"type": "content_block_stop",
"index": block_index
})
    # 5. Send message_delta (note: output_tokens below is a rough
    # character-count approximation, not a real token count)
chunks.append({
"type": "message_delta",
"delta": {"stop_reason": anthropic_stop_reason, "stop_sequence": None},
"usage": {"output_tokens": len(content) + len(reasoning_content)}
})
    # 6. Send message_stop
chunks.append({
"type": "message_stop"
})
log.debug(f"[build_anthropic_fake_stream_chunks] Total chunks generated: {len(chunks)}")
return chunks
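
# Event order produced by the builder above: message_start; then one
# content_block_start -> content_block_delta* -> content_block_stop group per
# thinking/image/text block; then message_delta (stop_reason) and message_stop.
# A minimal serialization sketch, assuming an SSE writer elsewhere:
#   for chunk in build_anthropic_fake_stream_chunks(text, thinking, "STOP", model):
#       yield f"event: {chunk['type']}\ndata: {json.dumps(chunk)}\n\n"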