"""Grok API 响应处理器 - 处理流式和非流式响应"""
import orjson
import uuid
import time
import asyncio
from typing import AsyncGenerator, Tuple, Any
from app.core.config import setting
from app.core.exception import GrokApiException
from app.core.logger import logger
from app.models.openai_schema import (
OpenAIChatCompletionResponse,
OpenAIChatCompletionChoice,
OpenAIChatCompletionMessage,
OpenAIChatCompletionChunkResponse,
OpenAIChatCompletionChunkChoice,
OpenAIChatCompletionChunkMessage
)
from app.services.grok.cache import image_cache_service, video_cache_service
class StreamTimeoutManager:
    """Tracks timeout conditions for a streaming response.

    Three independent limits are enforced:
    - first_timeout: max seconds to wait for the first data chunk
    - chunk_timeout: max seconds allowed between consecutive chunks
    - total_timeout: max total stream duration (0 disables this check)
    """
    def __init__(self, chunk_timeout: int = 120, first_timeout: int = 30, total_timeout: int = 600):
        self.chunk_timeout = chunk_timeout
        self.first_timeout = first_timeout
        self.total_timeout = total_timeout
        # time.monotonic() is the same monotonic clock asyncio loops use, but
        # unlike asyncio.get_event_loop().time() it neither requires nor
        # implicitly creates an event loop (deprecated behavior since 3.10).
        self.start_time = time.monotonic()
        self.last_chunk_time = self.start_time
        self.first_received = False

    def check_timeout(self) -> Tuple[bool, str]:
        """Return (timed_out, reason); reason is "" when no limit was hit."""
        now = time.monotonic()
        if not self.first_received and now - self.start_time > self.first_timeout:
            return True, f"首次响应超时({self.first_timeout}秒)"
        if self.total_timeout > 0 and now - self.start_time > self.total_timeout:
            return True, f"总超时({self.total_timeout}秒)"
        if self.first_received and now - self.last_chunk_time > self.chunk_timeout:
            return True, f"数据块超时({self.chunk_timeout}秒)"
        return False, ""

    def mark_received(self):
        """Record that a chunk arrived: refresh the inter-chunk clock."""
        self.last_chunk_time = time.monotonic()
        self.first_received = True

    def duration(self) -> float:
        """Return seconds elapsed since this manager was created."""
        return time.monotonic() - self.start_time
class GrokResponseProcessor:
    """Converts Grok's line-delimited JSON responses into OpenAI-compatible
    completion objects (non-streaming) or SSE chunk strings (streaming)."""

    @staticmethod
    async def process_normal(response, auth_token: str, model: str = None) -> OpenAIChatCompletionResponse:
        """Process a non-streaming response into a single completion object.

        Args:
            response: streaming HTTP response exposing aiter_lines()/close().
            auth_token: Grok auth token used to download generated assets.
            model: optional model-name override used for video responses.

        Returns:
            OpenAIChatCompletionResponse holding the final message content.

        Raises:
            GrokApiException: on upstream API error, model error, JSON parse
                failure, or when the stream ends without a model response.
        """
        response_closed = False
        try:
            async for line in response.aiter_lines():
                if not line:
                    continue
                data = orjson.loads(line)
                # Surface upstream API errors immediately
                if error := data.get("error"):
                    raise GrokApiException(
                        f"API错误: {error.get('message', '未知错误')}",
                        "API_ERROR",
                        {"code": error.get("code")}
                    )
                grok_resp = data.get("result", {}).get("response", {})
                # Video generation: the video URL is the terminal payload
                if video_resp := grok_resp.get("streamingVideoGenerationResponse"):
                    if video_url := video_resp.get("videoUrl"):
                        content = await GrokResponseProcessor._build_video_content(video_url, auth_token)
                        result = GrokResponseProcessor._build_response(content, model or "grok-imagine-0.9")
                        response_closed = True
                        response.close()
                        return result
                # Text generation: wait for the final modelResponse payload
                model_response = grok_resp.get("modelResponse")
                if not model_response:
                    continue
                if error_msg := model_response.get("error"):
                    raise GrokApiException(f"模型错误: {error_msg}", "MODEL_ERROR")
                content = model_response.get("message", "")
                model_name = model_response.get("model")
                # Append any generated images to the message as markdown
                if images := model_response.get("generatedImageUrls"):
                    content = await GrokResponseProcessor._append_images(content, images, auth_token)
                result = GrokResponseProcessor._build_response(content, model_name)
                response_closed = True
                response.close()
                return result
            raise GrokApiException("无响应数据", "NO_RESPONSE")
        except orjson.JSONDecodeError as e:
            logger.error(f"[Processor] JSON解析失败: {e}")
            raise GrokApiException(f"JSON解析失败: {e}", "JSON_ERROR") from e
        except Exception as e:
            logger.error(f"[Processor] 处理错误: {type(e).__name__}: {e}")
            raise GrokApiException(f"响应处理错误: {e}", "PROCESS_ERROR") from e
        finally:
            # Best-effort close for any exit path that did not close above
            if not response_closed and hasattr(response, 'close'):
                try:
                    response.close()
                except Exception as e:
                    logger.warning(f"[Processor] 关闭响应失败: {e}")

    @staticmethod
    async def process_stream(response, auth_token: str, session: Any = None) -> AsyncGenerator[str, None]:
        """Process a streaming response into OpenAI-style SSE chunks.

        Yields "data: {json}\\n\\n" strings and terminates the stream with
        "data: [DONE]\\n\\n". The HTTP response (and the optional session)
        are always closed on exit.

        Args:
            response: streaming HTTP response exposing aiter_lines()/close().
            auth_token: Grok auth token used to download generated assets.
            session: optional client session closed when the stream ends.
        """
        # Streaming state
        is_image = False            # set once Grok signals an image generation
        is_thinking = False         # whether the previous token was "thinking" output
        thinking_finished = False   # once thinking ended, late thinking tokens are dropped
        model = None
        # Drop empty entries: a missing config value would otherwise yield [""],
        # and "" is a substring of every token, filtering the whole stream.
        filtered_tags = [t for t in setting.grok_config.get("filtered_tags", "").split(",") if t]
        last_video_progress = -1
        show_thinking = setting.grok_config.get("show_thinking", True)
        timeout_mgr = StreamTimeoutManager(
            chunk_timeout=setting.grok_config.get("stream_chunk_timeout", 120),
            first_timeout=setting.grok_config.get("stream_first_response_timeout", 30),
            total_timeout=setting.grok_config.get("stream_total_timeout", 600)
        )

        def make_chunk(content: str, finish: str = None):
            """Build one SSE data line in OpenAI chunk format."""
            chunk_data = OpenAIChatCompletionChunkResponse(
                id=f"chatcmpl-{uuid.uuid4()}",
                created=int(time.time()),
                model=model or "grok-4-mini-thinking-tahoe",
                choices=[OpenAIChatCompletionChunkChoice(
                    index=0,
                    delta=OpenAIChatCompletionChunkMessage(
                        role="assistant",
                        content=content
                    ) if content else {},
                    finish_reason=finish
                )]
            )
            return f"data: {chunk_data.model_dump_json()}\n\n"

        try:
            async for line in response.aiter_lines():
                # NOTE: timeouts are only evaluated when a line arrives; a fully
                # hung connection is bounded by the transport's own read timeout.
                is_timeout, timeout_msg = timeout_mgr.check_timeout()
                if is_timeout:
                    logger.warning(f"[Processor] {timeout_msg}")
                    yield make_chunk("", "stop")
                    yield "data: [DONE]\n\n"
                    return
                logger.debug(f"[Processor] 收到数据块: {len(line)} bytes")
                if not line:
                    continue
                try:
                    data = orjson.loads(line)
                    # Upstream errors end the stream with an error message
                    if error := data.get("error"):
                        error_msg = error.get('message', '未知错误')
                        logger.error(f"[Processor] API错误: {error_msg}")
                        yield make_chunk(f"Error: {error_msg}", "stop")
                        yield "data: [DONE]\n\n"
                        return
                    grok_resp = data.get("result", {}).get("response", {})
                    logger.debug(f"[Processor] 解析响应: {len(grok_resp)} bytes")
                    if not grok_resp:
                        continue
                    timeout_mgr.mark_received()
                    # Track the model name reported by Grok
                    if user_resp := grok_resp.get("userResponse"):
                        if m := user_resp.get("model"):
                            model = m
                    # Video generation: progress updates, then the final URL
                    if video_resp := grok_resp.get("streamingVideoGenerationResponse"):
                        progress = video_resp.get("progress", 0)
                        v_url = video_resp.get("videoUrl")
                        if progress > last_video_progress:
                            last_video_progress = progress
                            if show_thinking:
                                yield make_chunk(f"视频已生成{progress}%\n")
                        if v_url:
                            logger.debug("[Processor] 视频生成完成")
                            video_content = await GrokResponseProcessor._build_video_content(v_url, auth_token)
                            yield make_chunk(video_content)
                        continue
                    if grok_resp.get("imageAttachmentInfo"):
                        is_image = True
                    token = grok_resp.get("token", "")
                    if is_image:
                        # Image generation: echo tokens until the final modelResponse
                        if model_resp := grok_resp.get("modelResponse"):
                            image_mode = setting.global_config.get("image_mode", "url")
                            content = ""
                            for img in model_resp.get("generatedImageUrls", []):
                                try:
                                    if image_mode == "base64":
                                        base64_str = await image_cache_service.download_base64(f"/{img}", auth_token)
                                        if base64_str:
                                            if not base64_str.startswith("data:"):
                                                # NOTE(review): markdown literals below are reconstructed
                                                # (the originals were corrupted); this branch also splits on
                                                # "," despite no "data:" prefix — confirm upstream format.
                                                parts = base64_str.split(",", 1)
                                                if len(parts) == 2:
                                                    # Send the payload in 8KB pieces to keep SSE events small
                                                    yield make_chunk(f"![image]({parts[0]},")
                                                    for i in range(0, len(parts[1]), 8192):
                                                        yield make_chunk(parts[1][i:i + 8192])
                                                    yield make_chunk(")\n")
                                                else:
                                                    yield make_chunk(f"![image](data:image/png;base64,{base64_str})\n")
                                            else:
                                                yield make_chunk(f"![image]({base64_str})\n")
                                        else:
                                            # Download failed: fall back to the remote asset URL
                                            yield make_chunk(f"![image](https://assets.grok.com/{img})\n")
                                    else:
                                        # URL mode: cache locally and reference the served path
                                        await image_cache_service.download_image(f"/{img}", auth_token)
                                        img_path = img.replace('/', '-')
                                        base_url = setting.global_config.get("base_url", "")
                                        img_url = f"{base_url}/images/{img_path}" if base_url else f"/images/{img_path}"
                                        content += f"![image]({img_url})\n"
                                except Exception as e:
                                    logger.warning(f"[Processor] 处理图片失败: {e}")
                                    content += f"![image](https://assets.grok.com/{img})\n"
                            yield make_chunk(content.strip(), "stop")
                            return
                        elif token:
                            yield make_chunk(token)
                    else:
                        # Plain conversation tokens
                        if isinstance(token, list):
                            continue
                        if token and any(tag in token for tag in filtered_tags):
                            continue
                        current_is_thinking = grok_resp.get("isThinking", False)
                        message_tag = grok_resp.get("messageTag")
                        if thinking_finished and current_is_thinking:
                            continue
                        # Web-search tool cards: render results as markdown links,
                        # but only while thinking output is shown
                        if grok_resp.get("toolUsageCardId"):
                            web_search = grok_resp.get("webSearchResults")
                            if not (web_search and current_is_thinking and show_thinking):
                                continue
                            for result in web_search.get("results", []):
                                title = result.get("title", "")
                                url = result.get("url", "")
                                preview = result.get("preview", "")
                                preview_clean = preview.replace("\n", "") if isinstance(preview, str) else ""
                                token += f'\n- [{title}]({url} "{preview_clean}")'
                            token += "\n"
                        if token:
                            content = token
                            if message_tag == "header":
                                content = f"\n\n{token}\n\n"
                            should_skip = False
                            # Wrap thinking output in <think> tags when shown.
                            # NOTE(review): the tag literals were corrupted in the
                            # original source and are reconstructed here — confirm.
                            if not is_thinking and current_is_thinking:
                                if show_thinking:
                                    content = f"<think>\n{content}"
                                else:
                                    should_skip = True
                            elif is_thinking and not current_is_thinking:
                                if show_thinking:
                                    content = f"</think>\n\n{content}"
                                thinking_finished = True
                            elif current_is_thinking:
                                if not show_thinking:
                                    should_skip = True
                            if not should_skip:
                                yield make_chunk(content)
                            is_thinking = current_is_thinking
                except (orjson.JSONDecodeError, UnicodeDecodeError) as e:
                    logger.warning(f"[Processor] 解析失败: {e}")
                    continue
                except Exception as e:
                    # Per-line failures must not kill the whole stream
                    logger.warning(f"[Processor] 处理出错: {e}")
                    continue
            yield make_chunk("", "stop")
            yield "data: [DONE]\n\n"
            logger.info(f"[Processor] 流式完成,耗时: {timeout_mgr.duration():.2f}秒")
        except Exception as e:
            logger.error(f"[Processor] 严重错误: {e}")
            yield make_chunk(f"处理错误: {e}", "error")
            yield "data: [DONE]\n\n"
        finally:
            # Always release the HTTP response and the optional session
            if hasattr(response, 'close'):
                try:
                    response.close()
                    logger.debug("[Processor] 响应已关闭")
                except Exception as e:
                    logger.warning(f"[Processor] 关闭失败: {e}")
            if session:
                try:
                    await session.close()
                    logger.debug("[Processor] 会话已关闭")
                except Exception as e:
                    logger.warning(f"[Processor] 关闭会话失败: {e}")

    @staticmethod
    async def _build_video_content(video_url: str, auth_token: str) -> str:
        """Return an HTML <video> tag for a generated video.

        Tries to cache the video locally and serve it from this host; falls
        back to the remote asset URL if caching fails. (Tag markup is
        reconstructed — the original literals were corrupted.)
        """
        logger.debug(f"[Processor] 检测到视频: {video_url}")
        full_url = f"https://assets.grok.com/{video_url}"
        try:
            cache_path = await video_cache_service.download_video(f"/{video_url}", auth_token)
            if cache_path:
                video_path = video_url.replace('/', '-')
                base_url = setting.global_config.get("base_url", "")
                local_url = f"{base_url}/images/{video_path}" if base_url else f"/images/{video_path}"
                return f'<video controls src="{local_url}"></video>\n'
        except Exception as e:
            logger.warning(f"[Processor] 缓存视频失败: {e}")
        return f'<video controls src="{full_url}"></video>\n'

    @staticmethod
    async def _append_images(content: str, images: list, auth_token: str) -> str:
        """Append generated images to *content* as markdown.

        In "base64" mode images are embedded inline; in "url" mode they are
        cached locally and referenced via this server's /images route. Any
        failure falls back to the remote asset URL. (Markdown literals are
        reconstructed — the originals were corrupted.)
        """
        image_mode = setting.global_config.get("image_mode", "url")
        for img in images:
            try:
                if image_mode == "base64":
                    base64_str = await image_cache_service.download_base64(f"/{img}", auth_token)
                    if base64_str:
                        content += f"![image]({base64_str})\n"
                    else:
                        content += f"![image](https://assets.grok.com/{img})\n"
                else:
                    cache_path = await image_cache_service.download_image(f"/{img}", auth_token)
                    if cache_path:
                        img_path = img.replace('/', '-')
                        base_url = setting.global_config.get("base_url", "")
                        img_url = f"{base_url}/images/{img_path}" if base_url else f"/images/{img_path}"
                        content += f"![image]({img_url})\n"
                    else:
                        content += f"![image](https://assets.grok.com/{img})\n"
            except Exception as e:
                logger.warning(f"[Processor] 处理图片失败: {e}")
                content += f"![image](https://assets.grok.com/{img})\n"
        return content

    @staticmethod
    def _build_response(content: str, model: str) -> OpenAIChatCompletionResponse:
        """Wrap final message content in an OpenAI chat-completion object."""
        return OpenAIChatCompletionResponse(
            id=f"chatcmpl-{uuid.uuid4()}",
            object="chat.completion",
            created=int(time.time()),
            model=model,
            choices=[OpenAIChatCompletionChoice(
                index=0,
                message=OpenAIChatCompletionMessage(
                    role="assistant",
                    content=content
                ),
                finish_reason="stop"
            )],
            usage=None
        )