"""
OpenAI API endpoints
"""
|
|
|
import asyncio
import json
import time
import traceback
from datetime import datetime
from typing import List, Dict, Any

import httpx
from fastapi import APIRouter, Header, HTTPException
from fastapi.responses import StreamingResponse

from app.core.config import settings
from app.core.zai_transformer import ZAITransformer, generate_uuid
from app.models.schemas import OpenAIRequest, Message, ModelsResponse, Model
from app.utils.helpers import debug_log
from app.utils.sse_tool_handler import SSEToolHandler
|
|
|
# Router on which the endpoints of this module are registered.
router = APIRouter()

# Module-level transformer instance shared by every request handled here.
# The endpoints below use it to convert incoming requests
# (``transform_request_in``), to obtain upstream tokens (``get_token``) and to
# report token outcomes (``mark_token_success`` / ``mark_token_failure``).
transformer = ZAITransformer()
|
|
|
|
|
@router.get("/v1/models")
async def list_models():
    """Return the catalogue of model identifiers exposed by this service.

    Every entry is stamped with the time of the call and attributed to
    ``z.ai``. The identifiers themselves come from ``settings``.
    """
    now = int(time.time())
    model_ids = (
        settings.PRIMARY_MODEL,
        settings.THINKING_MODEL,
        settings.SEARCH_MODEL,
        settings.AIR_MODEL,
        settings.GLM_46_MODEL,
        settings.GLM_46_THINKING_MODEL,
    )
    catalogue = [
        Model(id=model_id, created=now, owned_by="z.ai") for model_id in model_ids
    ]
    return ModelsResponse(data=catalogue)
|
|
|
|
|
# Terminator frame of an OpenAI-style SSE stream.
_SSE_DONE = "data: [DONE]\n\n"
# Pause before every retry of the upstream request.
_RETRY_DELAY_SECONDS = 2.0
# Upstream gateway failures that are worth retrying.
_RETRYABLE_GATEWAY_CODES = (502, 503, 504)
# Upper bound for an unterminated upstream line before it is discarded.
_MAX_PENDING_LINE_BYTES = 1024 * 1024


class _AttemptState:
    """Flags shared between the upstream events of one upstream attempt."""

    def __init__(self) -> None:
        self.role_sent = False         # the initial {"role": "assistant"} delta was emitted
        self.thinking_started = False  # a "<think>" opening tag was emitted
        self.last_phase = None         # last phase written to the debug log
        self.frames_sent = False       # at least one frame reached the client
        self.done_sent = False         # the "[DONE]" terminator reached the client


def _sse_frame(payload: Dict[str, Any]) -> str:
    """Serialize *payload* as one SSE ``data:`` frame."""
    return f"data: {json.dumps(payload)}\n\n"


def _error_frame(message: str, error_type: str, **extra: Any) -> str:
    """Build an SSE frame carrying an ``{"error": {...}}`` object.

    *extra* keyword arguments (for example ``code`` or ``details``) are
    appended to the error object in the order given.
    """
    return _sse_frame({"error": {"message": message, "type": error_type, **extra}})


def _completion_chunk(
    chat_id: Any,
    model: Any,
    delta: Dict[str, Any],
    finish_reason: Any = None,
    usage: Any = None,
) -> Dict[str, Any]:
    """Build one ``chat.completion.chunk`` object around *delta*.

    ``usage`` is only included when it is not ``None``; it is inserted right
    after ``choices`` so the serialized key order matches the hand-written
    literals this helper replaces.
    """
    chunk: Dict[str, Any] = {
        "choices": [
            {
                "delta": delta,
                "finish_reason": finish_reason,
                "index": 0,
                "logprobs": None,
            }
        ],
    }
    if usage is not None:
        chunk["usage"] = usage
    chunk["created"] = int(time.time())
    chunk["id"] = chat_id
    chunk["model"] = model
    chunk["object"] = "chat.completion.chunk"
    chunk["system_fingerprint"] = "fp_zai_001"
    return chunk


def _drain_complete_lines(buffer: bytearray) -> List[str]:
    """Remove every newline-terminated line from *buffer* and return them decoded.

    The unterminated tail (possibly empty) stays in *buffer* so the next
    network chunk can complete it. Splitting happens on raw bytes: the byte
    0x0A never occurs inside a multi-byte UTF-8 sequence, so a character that
    straddles two network chunks is never cut in half by the split. Undecodable
    bytes are replaced instead of blocking the whole stream.
    """
    *complete, remainder = bytes(buffer).split(b"\n")
    buffer[:] = remainder
    return [raw.decode("utf-8", errors="replace") for raw in complete]


def _forward_frames(frames: Any, state: _AttemptState) -> Any:
    """Yield *frames* until the ``[DONE]`` terminator has been forwarded once.

    Anything after the first terminator is consumed but dropped, because
    OpenAI clients stop reading at ``[DONE]``. The iterable is still exhausted
    so that generators supplied by the tool handler run to completion.
    """
    for frame in frames:
        if state.done_sent:
            continue
        state.frames_sent = True
        if frame.strip() == "data: [DONE]":
            state.done_sent = True
        yield frame


def _translate_event(
    data: Dict[str, Any],
    state: _AttemptState,
    tool_handler: Any,
    chat_id: Any,
    model: Any,
) -> Any:
    """Translate one upstream ``chat:completion`` payload into OpenAI SSE frames.

    NOTE(review): the source this was reconstructed from had lost its
    indentation. The usage/finish handling at the bottom is applied to every
    event regardless of phase - confirm against the original nesting.
    """
    phase = data.get("phase")
    if phase and phase != state.last_phase:
        debug_log(f"📈 SSE 阶段: {phase}")
        state.last_phase = phase

    if phase == "tool_call" and tool_handler:
        yield from tool_handler.process_tool_call_phase(data, True)

    elif phase == "other" and tool_handler:
        yield from tool_handler.process_other_phase(data, True)

    elif phase == "thinking":
        if not state.role_sent:
            state.role_sent = True
            yield _sse_frame(_completion_chunk(chat_id, model, {"role": "assistant"}))

        delta_content = data.get("delta_content", "")
        if delta_content:
            content = delta_content
            # Upstream wraps reasoning in a <details><summary> block; keep only
            # the text that follows the summary.
            if delta_content.startswith("<details") and "</summary>\n>" in delta_content:
                content = delta_content.split("</summary>\n>")[-1].strip()
            if not state.thinking_started:
                state.thinking_started = True
                content = f"<think>{content}"
            yield _sse_frame(
                _completion_chunk(chat_id, model, {"role": "assistant", "content": content})
            )

    elif phase == "answer":
        edit_content = data.get("edit_content", "")
        delta_content = data.get("delta_content", "")

        if not state.role_sent:
            state.role_sent = True
            debug_log("➡️ 发送初始角色chunk")
            yield _sse_frame(_completion_chunk(chat_id, model, {"role": "assistant"}))

        if edit_content and "</details>\n" in edit_content:
            # End of the reasoning block: close the <think> tag if one was
            # opened, then forward whatever answer text follows it.
            if state.thinking_started:
                yield _sse_frame(
                    _completion_chunk(
                        chat_id, model, {"role": "assistant", "content": "</think>"}
                    )
                )
            content_after = edit_content.split("</details>\n")[-1]
            if content_after:
                yield _sse_frame(
                    _completion_chunk(
                        chat_id, model, {"role": "assistant", "content": content_after}
                    )
                )
        elif delta_content:
            yield _sse_frame(_completion_chunk(chat_id, model, {"content": delta_content}))

    usage = data.get("usage")
    if usage:
        debug_log(f"📦 完成响应 - 使用统计: {json.dumps(usage)}")
        # When a tool call is in progress the regular finish signal is not
        # sent from here.
        tool_call_active = bool(tool_handler) and tool_handler.has_tool_call
        if not tool_call_active and not state.done_sent:
            debug_log("➡️ 发送完成信号")
            yield _sse_frame(
                _completion_chunk(chat_id, model, {}, finish_reason="stop", usage=usage)
            )
            debug_log("➡️ 发送 [DONE]")
            yield _SSE_DONE


@router.post("/v1/chat/completions")
async def chat_completions(request: OpenAIRequest, authorization: str = Header(...)):
    """Handle an OpenAI-style chat completion request via the ZAI transformer.

    The request is validated against the configured bearer token (unless
    ``settings.SKIP_AUTH_TOKEN`` is set), converted with
    ``transformer.transform_request_in`` and sent upstream. The upstream SSE
    stream is translated into OpenAI ``chat.completion.chunk`` frames.

    Args:
        request: Parsed OpenAI-compatible request body.
        authorization: Value of the ``Authorization`` header.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.

    Raises:
        HTTPException: 401 when the bearer token is missing or wrong, 500 when
            preparing the request fails for any other reason.
    """
    role = request.messages[0].role if request.messages else "unknown"
    debug_log(f"😶🌫️ 收到 客户端 请求 - 模型: {request.model}, 流式: {request.stream}, 消息数: {len(request.messages)}, 角色: {role}, 工具数: {len(request.tools) if request.tools else 0}")

    try:
        if not settings.SKIP_AUTH_TOKEN:
            if not authorization.startswith("Bearer "):
                raise HTTPException(status_code=401, detail="Missing or invalid Authorization header")

            api_key = authorization[7:]
            if api_key != settings.AUTH_TOKEN:
                raise HTTPException(status_code=401, detail="Invalid API key")

        request_dict = request.model_dump()
        debug_log("🔄 开始转换请求格式: OpenAI -> Z.AI")

        transformed = await transformer.transform_request_in(request_dict)

        async def stream_response():
            """Stream the translated upstream response, retrying failed attempts.

            A retry is only started while nothing has been sent to the client
            for the current request; once output has been emitted a failure
            ends the stream with an error frame, because replaying the response
            would duplicate content on the client side.
            """
            retry_count = 0
            last_error = None
            current_token = transformed.get("token", "")

            while retry_count <= settings.MAX_RETRIES:
                state = _AttemptState()
                try:
                    if retry_count > 0:
                        delay = _RETRY_DELAY_SECONDS
                        debug_log(f"重试请求 ({retry_count}/{settings.MAX_RETRIES}) - 等待 {delay:.1f}s")
                        await asyncio.sleep(delay)

                        # Report the token used by the failed attempt before
                        # asking for a replacement.
                        if current_token:
                            transformer.mark_token_failure(current_token, Exception(f"Retry {retry_count}: {last_error}"))

                        debug_log("🔑 重新获取令牌用于重试...")
                        new_token = await transformer.get_token()
                        if not new_token:
                            debug_log("❌ 重试时无法获取有效的认证令牌")
                            raise Exception("重试时无法获取有效的认证令牌")
                        transformed["config"]["headers"]["Authorization"] = f"Bearer {new_token}"
                        current_token = new_token

                    async with httpx.AsyncClient(timeout=60.0) as client:
                        async with client.stream(
                            "POST",
                            transformed["config"]["url"],
                            json=transformed["body"],
                            headers=transformed["config"]["headers"],
                        ) as response:
                            status = response.status_code

                            if status == 400:
                                error_text = await response.aread()
                                error_msg = error_text.decode("utf-8", errors="ignore")
                                debug_log(f"❌ 上游返回 400 错误 (尝试 {retry_count + 1}/{settings.MAX_RETRIES + 1})")
                                debug_log(f"上游错误响应: {error_msg}")

                                retry_count += 1
                                last_error = f"400 Bad Request: {error_msg}"
                                if retry_count <= settings.MAX_RETRIES:
                                    continue

                                debug_log(f"❌ 达到最大重试次数 ({settings.MAX_RETRIES}),请求失败")
                                yield _error_frame(
                                    f"Request failed after {settings.MAX_RETRIES} retries: {last_error}",
                                    "upstream_error",
                                    code=400,
                                )
                                yield _SSE_DONE
                                return

                            if status == 401:
                                debug_log("❌ 认证失败 (401),标记token失效")
                                if current_token:
                                    transformer.mark_token_failure(current_token, Exception("401 Unauthorized"))

                                retry_count += 1
                                last_error = "401 Unauthorized - Token may be invalid"
                                if retry_count <= settings.MAX_RETRIES:
                                    continue

                                yield _error_frame(
                                    "Authentication failed after retries", "auth_error", code=401
                                )
                                yield _SSE_DONE
                                return

                            if status == 429:
                                debug_log("❌ 速率限制 (429),将延长等待时间重试")
                                retry_count += 1
                                last_error = "429 Rate Limited"
                                if retry_count <= settings.MAX_RETRIES:
                                    continue

                                yield _error_frame("Rate limit exceeded", "rate_limit_error", code=429)
                                yield _SSE_DONE
                                return

                            if status != 200:
                                error_text = await response.aread()
                                error_msg = error_text.decode("utf-8", errors="ignore")
                                debug_log(f"❌ 上游返回错误: {status}, 详情: {error_msg}")

                                if status in _RETRYABLE_GATEWAY_CODES and retry_count < settings.MAX_RETRIES:
                                    retry_count += 1
                                    last_error = f"{status}: {error_msg}"
                                    debug_log(f"⚠️ 服务器错误 {status},准备重试")
                                    continue

                                yield _error_frame(
                                    f"Upstream error: {status}",
                                    "upstream_error",
                                    code=status,
                                    details=error_msg[:500],
                                )
                                yield _SSE_DONE
                                return

                            debug_log("✅ Z.AI 响应成功,开始处理 SSE 流")
                            if retry_count > 0:
                                debug_log(f"✨ 第 {retry_count} 次重试成功")

                            if current_token:
                                transformer.mark_token_success(current_token)

                            body = transformed["body"]
                            chat_id = body["chat_id"]
                            has_tools = body.get("tools") is not None
                            has_mcp_servers = bool(body.get("mcp_servers"))
                            tool_handler = None

                            if has_tools or has_mcp_servers:
                                tool_handler = SSEToolHandler(chat_id, request.model)

                                if has_tools and has_mcp_servers:
                                    debug_log(f"🔧 初始化工具处理器: {len(transformed['body'].get('tools', []))} 个OpenAI工具 + {len(transformed['body'].get('mcp_servers', []))} 个MCP服务器")
                                elif has_tools:
                                    debug_log(f"🔧 初始化工具处理器: {len(transformed['body'].get('tools', []))} 个OpenAI工具")
                                elif has_mcp_servers:
                                    debug_log(f"🔧 初始化工具处理器: {len(transformed['body'].get('mcp_servers', []))} 个MCP服务器")

                            buffer = bytearray()
                            line_count = 0
                            chunk_count = 0
                            debug_log("📡 开始接收 SSE 流数据...")

                            # Read inactivity is bounded by the 60 s client
                            # timeout configured above.
                            async for raw_chunk in response.aiter_bytes():
                                chunk_count += 1
                                if not raw_chunk:
                                    continue

                                buffer.extend(raw_chunk)
                                lines = _drain_complete_lines(buffer)

                                # Guard against an upstream that never ends a line.
                                if len(buffer) > _MAX_PENDING_LINE_BYTES:
                                    debug_log("❌ 缓冲区过大,清空重试")
                                    buffer.clear()

                                for current_line in lines:
                                    line_count += 1
                                    if not current_line.strip():
                                        continue
                                    if not current_line.startswith("data:"):
                                        continue

                                    chunk_str = current_line[5:].strip()
                                    if not chunk_str:
                                        continue

                                    if chunk_str == "[DONE]":
                                        debug_log("📡 收到 [DONE] 信号")
                                        for frame in _forward_frames((_SSE_DONE,), state):
                                            yield frame
                                        continue

                                    try:
                                        event = json.loads(chunk_str)
                                        if event.get("type") == "chat:completion":
                                            frames = _translate_event(
                                                event.get("data", {}),
                                                state,
                                                tool_handler,
                                                chat_id,
                                                request.model,
                                            )
                                            for frame in _forward_frames(frames, state):
                                                yield frame
                                    except json.JSONDecodeError as e:
                                        debug_log(f"❌ JSON解析错误: {e}, 内容: {chunk_str[:200]}")
                                    except Exception as e:
                                        debug_log(f"❌ 处理chunk错误: {e}")

                            debug_log(f"✅ SSE 流处理完成,共处理 {line_count} 行数据,{chunk_count} 个数据块")

                            completion_issues = []
                            if line_count == 0:
                                completion_issues.append("没有处理任何数据行")
                            else:
                                debug_log(f"📊 平均每个数据块包含 {line_count/chunk_count:.1f} 行")

                            tool_call_active = bool(tool_handler) and tool_handler.has_tool_call
                            if tool_call_active:
                                if not tool_handler.completed_tools:
                                    completion_issues.append("工具调用未正常完成")
                                else:
                                    debug_log(f"✅ 工具调用完成: {len(tool_handler.completed_tools)} 个工具")

                            if not completion_issues:
                                debug_log("✅ 响应完整性检查通过")
                            else:
                                debug_log(f"⚠️ 响应完整性问题: {', '.join(completion_issues)}")

                            # An empty upstream response is retried, but only
                            # before the terminator (or anything else) went out.
                            if (
                                line_count == 0
                                and not state.frames_sent
                                and retry_count < settings.MAX_RETRIES
                            ):
                                debug_log("🔄 检测到严重完整性问题,准备重试")
                                retry_count += 1
                                last_error = f"Incomplete response: {', '.join(completion_issues)}"
                                continue

                            if not tool_call_active and not state.done_sent:
                                debug_log("📤 发送最终 [DONE] 信号")
                                yield _SSE_DONE

                            return

                except Exception as e:
                    debug_log(f"❌ 流处理错误: {e}")
                    debug_log(traceback.format_exc())

                    if current_token:
                        transformer.mark_token_failure(current_token, e)

                    retry_count += 1
                    last_error = str(e)

                    if state.frames_sent:
                        # Part of the response already reached the client;
                        # replaying it from the start would duplicate output.
                        if not state.done_sent:
                            yield _error_frame(f"Stream interrupted: {last_error}", "stream_error")
                            yield _SSE_DONE
                        return

                    if retry_count > settings.MAX_RETRIES:
                        debug_log(f"❌ 达到最大重试次数 ({settings.MAX_RETRIES}),流处理失败")
                        yield _error_frame(
                            f"Stream processing failed after {settings.MAX_RETRIES} retries: {last_error}",
                            "stream_error",
                        )
                        yield _SSE_DONE
                        return

        debug_log("🚀 启动 SSE 流式响应")

        async def logged_stream():
            """Relay frames to the client and log how many were sent."""
            sent = 0
            try:
                debug_log("📤 开始向客户端流式传输数据...")
                async for frame in stream_response():
                    sent += 1
                    yield frame
                debug_log(f"✅ 流式传输完成,共发送 {sent} 个数据块")
            except Exception as e:
                debug_log(f"❌ 流式传输中断: {e}")
                raise

        return StreamingResponse(
            logged_stream(),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
            },
        )

    except HTTPException:
        raise
    except Exception as e:
        debug_log(f"❌ 处理请求时发生错误: {str(e)}")
        debug_log(f"❌ 错误堆栈: {traceback.format_exc()}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e