| |
|
|
| """ |
| OpenAI Compatible API Server for Z.AI |
| ===================================== |
| |
| This module provides an OpenAI-compatible API server that forwards requests |
| to the Z.AI chat service with proper authentication and response formatting. |
| """ |
|
|
| import json |
| import os |
| import re |
| import time |
| from datetime import datetime |
| from typing import Dict, List, Optional, Any, Union, Generator, Tuple |
|
|
| import requests |
| from fastapi import FastAPI, Request, Response, HTTPException, Header |
| from fastapi.responses import StreamingResponse, JSONResponse |
| from pydantic import BaseModel, Field |
|
|
|
|
| |
| |
| |
|
|
def _env_flag(name: str, default: str) -> bool:
    """Return True when env var *name* (or *default*) lower-cases to "true"."""
    return os.getenv(name, default).lower() == "true"


class ServerConfig:
    """Process-wide settings for the proxy.

    Every value is resolved from the environment once, when this class body
    runs; the second argument of each lookup is the fallback used when the
    variable is unset.
    """

    # Upstream endpoint and credentials.
    API_ENDPOINT: str = os.getenv("API_ENDPOINT", "https://chat.z.ai/api/chat/completions")
    AUTH_TOKEN: str = os.getenv("AUTH_TOKEN", "sk-your-api-key")
    BACKUP_TOKEN: str = os.getenv("BACKUP_TOKEN", "eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJpZCI6IjMxNmJjYjQ4LWZmMmYtNGExNS04NTNkLWYyYTI5YjY3ZmYwZiIsImVtYWlsIjoiR3Vlc3QtMTc1NTg0ODU4ODc4OEBndWVzdC5jb20ifQ.PktllDySS3trlyuFpTeIZf-7hl8Qu1qYF3BxjgIul0BrNux2nX9hVzIjthLXKMWAf9V0qM8Vm_iyDqkjPGsaiQ")

    # Model identifiers advertised to clients.
    PRIMARY_MODEL: str = os.getenv("PRIMARY_MODEL", "GLM-4.5")
    THINKING_MODEL: str = os.getenv("THINKING_MODEL", "GLM-4.5-Thinking")
    SEARCH_MODEL: str = os.getenv("SEARCH_MODEL", "GLM-4.5-Search")

    # Listener and logging.
    LISTEN_PORT: int = int(os.getenv("LISTEN_PORT", "8080"))
    DEBUG_LOGGING: bool = _env_flag("DEBUG_LOGGING", "true")

    # Feature switches.
    THINKING_PROCESSING: str = os.getenv("THINKING_PROCESSING", "think")
    ANONYMOUS_MODE: bool = _env_flag("ANONYMOUS_MODE", "true")
    TOOL_SUPPORT: bool = _env_flag("TOOL_SUPPORT", "true")
    SCAN_LIMIT: int = int(os.getenv("SCAN_LIMIT", "200000"))

    # Headers sent with every upstream request.
    CLIENT_HEADERS: Dict[str, str] = {
        "Content-Type": "application/json",
        "Accept": "application/json, text/event-stream",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36 Edg/139.0.0.0",
        "Accept-Language": "zh-CN",
        "sec-ch-ua": '"Not;A=Brand";v="99", "Microsoft Edge";v="139", "Chromium";v="139"',
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": '"Windows"',
        "X-FE-Version": "prod-fe-1.0.70",
        "Origin": "https://chat.z.ai",
    }
|
|
|
|
| |
| |
| |
|
|
class Message(BaseModel):
    """A single chat message.

    ``name`` and ``tool_call_id`` are declared so that tool/function result
    messages keep those fields after request validation; without them
    ``model_dump()`` omits ``name`` and ``process_messages_with_tools`` falls
    back to the tool name "unknown". Both default to ``None``, so dumps made
    with ``exclude_none=True`` are unchanged for messages that do not set them.
    """
    role: str
    content: Optional[str] = None
    reasoning_content: Optional[str] = None
    tool_calls: Optional[List[Dict[str, Any]]] = None
    # Identifies which tool/function produced a "tool"/"function" message.
    name: Optional[str] = None
    # Correlates a tool result with the tool call it answers.
    tool_call_id: Optional[str] = None
|
|
|
|
class OpenAIRequest(BaseModel):
    """Body accepted by the chat-completions endpoint."""
    # Required fields.
    model: str
    messages: List[Message]
    # Optional generation settings.
    stream: Optional[bool] = False
    temperature: Optional[float] = None
    max_tokens: Optional[int] = None
    # Optional tool definitions and the caller's tool selection.
    tools: Optional[List[Dict[str, Any]]] = None
    tool_choice: Optional[Any] = None
|
|
|
|
class ModelItem(BaseModel):
    """Model descriptor embedded in an upstream request."""
    id: str
    name: str
    owned_by: str
|
|
|
|
class UpstreamRequest(BaseModel):
    """Payload posted to the upstream chat service."""
    # Disable pydantic's protected "model_" prefix so `model_item` is allowed.
    model_config = {'protected_namespaces': ()}

    # Required fields.
    stream: bool
    model: str
    messages: List[Message]
    # Free-form option maps; empty by default.
    params: Dict[str, Any] = {}
    features: Dict[str, Any] = {}
    # Optional request metadata.
    background_tasks: Optional[Dict[str, bool]] = None
    chat_id: Optional[str] = None
    id: Optional[str] = None
    mcp_servers: Optional[List[str]] = None
    model_item: Optional[ModelItem] = None
    tool_servers: Optional[List[str]] = None
    variables: Optional[Dict[str, str]] = None
|
|
|
|
class Delta(BaseModel):
    """Incremental message fragment carried by one streaming chunk."""
    # Every field is optional; a chunk sets only the parts it carries.
    role: Optional[str] = None
    content: Optional[str] = None
    reasoning_content: Optional[str] = None
    tool_calls: Optional[List[Dict[str, Any]]] = None
|
|
|
|
class Choice(BaseModel):
    """One entry of a response's ``choices`` list."""
    index: int
    # Full message (non-streaming) or incremental delta (streaming).
    message: Optional[Message] = None
    delta: Optional[Delta] = None
    finish_reason: Optional[str] = None
|
|
|
|
class Usage(BaseModel):
    """Token counters; every counter defaults to zero."""
    prompt_tokens: int = 0
    completion_tokens: int = 0
    total_tokens: int = 0
|
|
|
|
class OpenAIResponse(BaseModel):
    """Response envelope used for both full completions and stream chunks."""
    id: str
    # Kind of payload, e.g. "chat.completion" or "chat.completion.chunk".
    object: str
    created: int
    model: str
    choices: List[Choice]
    usage: Optional[Usage] = None
|
|
|
|
class UpstreamError(BaseModel):
    """Error object reported by the upstream service."""
    detail: str
    code: int
|
|
|
|
class UpstreamDataInner(BaseModel):
    """Nested wrapper that may carry an upstream error."""
    error: Optional[UpstreamError] = None
|
|
|
|
class UpstreamDataData(BaseModel):
    """Content portion of one upstream stream event."""
    # Text carried by the event; both default to the empty string.
    delta_content: str = ""
    edit_content: str = ""
    # Stream phase label; the handlers test for "thinking", "answer", "done".
    phase: str = ""
    done: bool = False
    usage: Optional[Usage] = None
    # An error may be attached directly or inside `inner`.
    error: Optional[UpstreamError] = None
    inner: Optional[UpstreamDataInner] = None
|
|
|
|
class UpstreamData(BaseModel):
    """Top-level shape of one upstream stream event."""
    type: str
    data: UpstreamDataData
    error: Optional[UpstreamError] = None
|
|
|
|
class Model(BaseModel):
    """One entry of the model listing."""
    id: str
    # Fixed discriminator for listing entries.
    object: str = "model"
    created: int
    owned_by: str
|
|
|
|
class ModelsResponse(BaseModel):
    """Envelope returned by the model-listing endpoint."""
    object: str = "list"
    data: List[Model]
|
|
|
|
| |
| |
| |
|
|
class SSEParser:
    """Line-oriented reader for a Server-Sent Events response.

    Each non-empty, non-comment line of the form ``field: value`` is turned
    into one event dict. The parser is also a context manager that closes the
    wrapped response on exit.
    """

    def __init__(self, response: requests.Response, debug_mode: bool = False):
        """Wrap *response*.

        Args:
            response: Object exposing ``iter_lines()`` (and optionally
                ``close()``), e.g. a streamed ``requests.Response``.
            debug_mode: When True, diagnostic lines are printed.
        """
        self.response = response
        self.debug_mode = debug_mode
        self.buffer = ""
        self.line_count = 0

    def debug_log(self, format_str: str, *args) -> None:
        """Print a ``[SSE_PARSER]``-prefixed line when debug mode is on.

        *format_str* is %-formatted with *args* only when *args* is non-empty.
        """
        if not self.debug_mode:
            return
        rendered = format_str % args if args else format_str
        print(f"[SSE_PARSER] {rendered}")

    def iter_events(self) -> Generator[Dict[str, Any], None, None]:
        """Yield one dict per recognised SSE field line.

        Yields:
            dict: ``{'type': 'data', 'data': ..., 'raw': ...}`` for data lines
            (plus ``'is_json': False`` when the payload is not valid JSON), or
            ``{'type': 'event'|'id'|'retry', ...}`` for the other fields.
        """
        self.debug_log("开始解析 SSE 流")

        for raw_line in self.response.iter_lines():
            self.line_count += 1

            # Blank lines carry nothing to parse.
            if not raw_line:
                continue

            if isinstance(raw_line, bytes):
                try:
                    raw_line = raw_line.decode('utf-8')
                except UnicodeDecodeError:
                    self.debug_log(f"第{self.line_count}行解码失败,跳过")
                    continue

            # Skip comment lines and lines that have no field separator.
            if raw_line.startswith(':') or ':' not in raw_line:
                continue

            name, _, payload = raw_line.partition(':')
            name = name.strip()
            payload = payload.lstrip()

            if name == 'data':
                self.debug_log(f"收到数据 (第{self.line_count}行): {payload}")
                try:
                    decoded = json.loads(payload)
                except json.JSONDecodeError:
                    # Keep the raw text and mark it as non-JSON.
                    yield {
                        'type': 'data',
                        'data': payload,
                        'raw': payload,
                        'is_json': False
                    }
                else:
                    yield {
                        'type': 'data',
                        'data': decoded,
                        'raw': payload
                    }
            elif name == 'event':
                yield {'type': 'event', 'event': payload}
            elif name == 'id':
                yield {'type': 'id', 'id': payload}
            elif name == 'retry':
                try:
                    delay = int(payload)
                except ValueError:
                    self.debug_log(f"无效的 retry 值: {payload}")
                else:
                    yield {'type': 'retry', 'retry': delay}

    def iter_data_only(self) -> Generator[Dict[str, Any], None, None]:
        """Yield only the events whose type is ``'data'``."""
        return (event for event in self.iter_events() if event['type'] == 'data')

    def iter_json_data(self, model_class: Optional[type] = None) -> Generator[Dict[str, Any], None, None]:
        """Yield data events whose payload parsed as JSON.

        Args:
            model_class: Optional class providing ``model_validate_json``;
                when given, the raw payload is validated with it and the
                validated object replaces ``'data'``. Events that fail
                validation are logged and skipped.

        Yields:
            dict: JSON data events.
        """
        for event in self.iter_events():
            if event['type'] != 'data' or not event.get('is_json', True):
                continue

            if not model_class:
                yield event
                continue

            try:
                validated = model_class.model_validate_json(event['raw'])
            except Exception as exc:
                self.debug_log(f"数据验证失败: {exc}")
                continue

            yield {
                'type': 'data',
                'data': validated,
                'raw': event['raw']
            }

    def close(self) -> None:
        """Close the wrapped response if it offers ``close``."""
        closer = getattr(self.response, 'close', None)
        if closer is not None:
            closer()

    def __enter__(self):
        """Return the parser itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close the wrapped response when the ``with`` block ends."""
        self.close()
|
|
|
|
| |
| |
| |
|
|
def generate_tool_prompt(tools: List[Dict[str, Any]]) -> str:
    """Render the tool-description prompt for *tools*.

    Only entries whose ``type`` is ``"function"`` are described. Returns an
    empty string when *tools* is empty or contains no function entries;
    otherwise returns the function list followed by fixed usage instructions.
    """
    if not tools:
        return ""

    sections: List[str] = []
    for entry in tools:
        if entry.get("type") != "function":
            continue

        spec = entry.get("function", {}) or {}
        schema = spec.get("parameters", {}) or {}
        properties = schema.get("properties", {}) or {}
        required = set(schema.get("required", []) or [])

        lines = [
            f"## {spec.get('name', 'unknown')}",
            f"**Purpose**: {spec.get('description', '')}",
        ]
        if properties:
            lines.append("**Parameters**:")
            for key, detail in properties.items():
                detail = detail or {}
                kind = detail.get("type", "any")
                summary = detail.get("description", "")
                marker = "**Required**" if key in required else "*Optional*"
                lines.append(f"- `{key}` ({kind}) - {marker}: {summary}")

        sections.append("\n".join(lines))

    if not sections:
        return ""

    # Fixed instructions appended after the function list.
    usage_instructions = (
        "\n\n# USAGE INSTRUCTIONS\n"
        "When you need to execute a function, respond ONLY with a JSON object containing tool_calls:\n"
        "```json\n"
        "{\n"
        ' "tool_calls": [\n'
        " {\n"
        ' "id": "call_" + unique_id,\n'
        ' "type": "function",\n'
        ' "function": {\n'
        ' "name": "function_name",\n'
        ' "arguments": {\n'
        ' "param1": "value1"\n'
        ' }\n'
        " }\n"
        " }\n"
        " ]\n"
        "}\n"
        "```\n"
        "Important: No explanatory text before or after the JSON.\n"
    )

    header = "\n\n# AVAILABLE FUNCTIONS\n"
    return header + "\n\n---\n".join(sections) + usage_instructions
|
|
|
|
def process_messages_with_tools(
    messages: List[Dict[str, Any]],
    tools: Optional[List[Dict[str, Any]]] = None,
    tool_choice: Optional[Any] = None
) -> List[Dict[str, Any]]:
    """Inject the tool prompt and flatten tool results into assistant text.

    Args:
        messages: Chat messages as plain dicts; the input list and its dicts
            are never mutated.
        tools: Tool definitions; when present, tool support is enabled and
            *tool_choice* is not ``"none"``, the generated tool prompt is
            appended to every system message (or a new system message is
            prepended if there is none).
        tool_choice: ``"required"``/``"auto"`` or a ``{"type": "function"}``
            dict append a hint to the last message when it is a user message.

    Returns:
        A new list in which every ``tool``/``function`` message is replaced by
        an assistant message describing the tool result.
    """
    processed: List[Dict[str, Any]] = []

    if tools and ServerConfig.TOOL_SUPPORT and (tool_choice != "none"):
        tools_prompt = generate_tool_prompt(tools)

        if any(m.get("role") == "system" for m in messages):
            for m in messages:
                if m.get("role") == "system":
                    patched = dict(m)
                    patched["content"] = (patched.get("content") or "") + tools_prompt
                    processed.append(patched)
                else:
                    processed.append(m)
        else:
            processed = [{"role": "system", "content": "你是一个有用的助手。" + tools_prompt}] + messages

        # Pick the hint (if any) that nudges the model toward tool use.
        hint = None
        if tool_choice in ("required", "auto"):
            hint = "\n\n请根据需要使用提供的工具函数。"
        elif isinstance(tool_choice, dict) and tool_choice.get("type") == "function":
            fname = (tool_choice.get("function") or {}).get("name")
            if fname:
                hint = f"\n\n请使用 {fname} 函数来处理这个请求。"

        if hint and processed and processed[-1].get("role") == "user":
            last = dict(processed[-1])
            last["content"] = (last.get("content") or "") + hint
            processed[-1] = last
    else:
        processed = list(messages)

    final_msgs: List[Dict[str, Any]] = []
    for m in processed:
        if m.get("role") not in ("tool", "function"):
            final_msgs.append(m)
            continue

        tool_name = m.get("name", "unknown")
        tool_content = m.get("content", "")
        if isinstance(tool_content, (dict, list)):
            # Structured results are shown as JSON inside the ```json fence.
            tool_content = json.dumps(tool_content, ensure_ascii=False)
        elif tool_content is None:
            tool_content = ""

        # The emptiness check must look at the tool output itself: the
        # formatted wrapper text is never empty, so testing it could never
        # select the "completed" message.
        if str(tool_content).strip():
            content = f"工具 {tool_name} 返回结果:\n```json\n{tool_content}\n```"
        else:
            content = f"工具 {tool_name} 执行完成"

        final_msgs.append({
            "role": "assistant",
            "content": content,
        })

    return final_msgs
|
|
|
|
| |
# A ```json fenced block whose body starts with "{" and ends with "}".
TOOL_CALL_FENCE_PATTERN = re.compile(r"```json\s*(\{.*?\})\s*```", re.DOTALL)
# An inline object whose innermost opening brace is followed by a "tool_calls"
# key. The non-greedy tail stops at the FIRST "}", so for nested objects the
# match is only a prefix of the full JSON value.
TOOL_CALL_INLINE_PATTERN = re.compile(r"(\{[^{}]{0,10000}\"tool_calls\".*?\})", re.DOTALL)
# Natural-language form "调用函数: name 参数: {...}". Both the ASCII colon and
# the full-width colon (U+FF1A) are accepted; the character class previously
# listed the ASCII colon twice, so full-width punctuation never matched.
FUNCTION_CALL_PATTERN = re.compile(r"调用函数\s*[::]\s*([\w\-\.]+)\s*(?:参数|arguments)[::]\s*(\{.*?\})", re.DOTALL)
|
|
|
|
def extract_tool_invocations(text: str) -> Optional[List[Dict[str, Any]]]:
    """Extract tool calls from model output.

    Three forms are tried in order, on at most ``ServerConfig.SCAN_LIMIT``
    leading characters of *text*:

    1. a fenced ```json block containing a ``tool_calls`` list,
    2. an inline JSON object containing a ``tool_calls`` list,
    3. the natural-language form matched by ``FUNCTION_CALL_PATTERN``.

    For forms 2 and 3 the regex only locates where the JSON starts; the value
    itself is decoded with ``JSONDecoder.raw_decode`` because the patterns'
    non-greedy ``.*?\\}`` stops at the first closing brace and therefore
    truncates any nested object.

    Returned calls are normalised: ``function.arguments`` is always a JSON
    string, ``type`` defaults to ``"function"`` and a missing ``id`` is
    generated. Entries that are not dicts are dropped.

    Returns:
        A non-empty list of tool-call dicts, or ``None`` when nothing usable
        was found.
    """
    if not text:
        return None

    scannable_text = text[:ServerConfig.SCAN_LIMIT]
    decoder = json.JSONDecoder()

    def _normalized_calls(payload: Any) -> Optional[List[Dict[str, Any]]]:
        """Return normalised tool calls from a decoded payload, or None."""
        if not isinstance(payload, dict):
            return None
        raw_calls = payload.get("tool_calls")
        if not raw_calls or not isinstance(raw_calls, list):
            return None

        calls: List[Dict[str, Any]] = []
        for position, raw_call in enumerate(raw_calls):
            if not isinstance(raw_call, dict):
                continue
            call = dict(raw_call)
            function = call.get("function")
            if isinstance(function, dict):
                function = dict(function)
                arguments = function.get("arguments")
                if not isinstance(arguments, str):
                    # The prompt shows arguments as an object, so the model
                    # usually returns one; serialise it to a JSON string.
                    function["arguments"] = json.dumps(
                        {} if arguments is None else arguments,
                        ensure_ascii=False,
                    )
                call["function"] = function
            call.setdefault("type", "function")
            if not call.get("id"):
                call["id"] = f"call_{int(time.time() * 1000000)}_{position}"
            calls.append(call)
        return calls or None

    # 1) Fenced ```json blocks: first block with usable tool calls wins.
    for json_block in TOOL_CALL_FENCE_PATTERN.findall(scannable_text):
        try:
            parsed_data = json.loads(json_block)
        except json.JSONDecodeError:
            continue
        calls = _normalized_calls(parsed_data)
        if calls:
            return calls

    # 2) Inline JSON object.
    inline_match = TOOL_CALL_INLINE_PATTERN.search(scannable_text)
    if inline_match:
        try:
            parsed_data, _ = decoder.raw_decode(scannable_text, inline_match.start(1))
        except json.JSONDecodeError:
            parsed_data = None
        calls = _normalized_calls(parsed_data)
        if calls:
            return calls

    # 3) Natural-language "call function X with arguments {...}".
    natural_lang_match = FUNCTION_CALL_PATTERN.search(scannable_text)
    if natural_lang_match:
        function_name = natural_lang_match.group(1).strip()
        start = natural_lang_match.start(2)
        try:
            _, end = decoder.raw_decode(scannable_text, start)
        except json.JSONDecodeError:
            return None
        return [{
            "id": f"invoke_{int(time.time() * 1000000)}",
            "type": "function",
            "function": {
                "name": function_name,
                # Keep the model's own JSON text as the arguments string.
                "arguments": scannable_text[start:end]
            }
        }]

    return None
|
|
|
|
def remove_tool_json_content(text: str) -> str:
    """Strip tool-call JSON from *text* and return the trimmed remainder.

    Fenced ```json blocks are removed only when they decode to an object with
    a ``tool_calls`` key. Inline ``{... "tool_calls" ...}`` objects are
    removed as a whole JSON value: the regex match alone ends at the first
    ``}`` and would leave the closing braces/brackets of nested objects
    behind, so the end of the object is found with ``raw_decode``. When the
    text at a match is not valid JSON, the regex match itself is removed.
    """
    def remove_tool_call_block(match: re.Match) -> str:
        json_content = match.group(1)
        try:
            parsed_data = json.loads(json_content)
            if "tool_calls" in parsed_data:
                return ""
        except (json.JSONDecodeError, AttributeError):
            pass
        return match.group(0)

    cleaned_text = TOOL_CALL_FENCE_PATTERN.sub(remove_tool_call_block, text)

    decoder = json.JSONDecoder()
    kept_parts: List[str] = []
    cursor = 0
    for match in TOOL_CALL_INLINE_PATTERN.finditer(cleaned_text):
        start = match.start()
        if start < cursor:
            # Already removed as part of a longer, fully decoded object.
            continue

        end = match.end()
        try:
            parsed_data, decoded_end = decoder.raw_decode(cleaned_text, start)
        except json.JSONDecodeError:
            pass
        else:
            if isinstance(parsed_data, dict) and "tool_calls" in parsed_data:
                end = max(end, decoded_end)

        kept_parts.append(cleaned_text[cursor:start])
        cursor = end

    kept_parts.append(cleaned_text[cursor:])
    return "".join(kept_parts).strip()
|
|
|
|
| |
| |
| |
|
|
def debug_log(message: str, *args) -> None:
    """Print a ``[DEBUG]``-prefixed line when ``ServerConfig.DEBUG_LOGGING`` is on.

    *message* is %-formatted with *args* only when *args* is non-empty.
    """
    if not ServerConfig.DEBUG_LOGGING:
        return
    rendered = message % args if args else message
    print(f"[DEBUG] {rendered}")
|
|
|
|
def generate_request_ids() -> Tuple[str, str]:
    """Generate the chat ID and message ID for one upstream request.

    The IDs keep their previous shape — ``"<milliseconds>-<seconds>"`` and
    ``"<microseconds>"`` — but are now derived from a real sub-second clock
    reading. Previously both were computed from whole seconds, so every
    request handled within the same second received identical IDs.

    Returns:
        ``(chat_id, msg_id)`` as decimal strings. Uniqueness is limited by
        the resolution of the system clock.
    """
    now_ns = time.time_ns()
    chat_id = f"{now_ns // 1_000_000}-{now_ns // 1_000_000_000}"
    msg_id = str(now_ns // 1_000)
    return chat_id, msg_id
|
|
|
|
def get_browser_headers(referer_chat_id: str = "") -> Dict[str, str]:
    """Return a fresh copy of the configured client headers.

    When *referer_chat_id* is non-empty, a ``Referer`` header pointing at that
    chat (``<Origin>/c/<id>``) is added.
    """
    base = ServerConfig.CLIENT_HEADERS
    headers = dict(base)
    if not referer_chat_id:
        return headers

    headers["Referer"] = f"{base['Origin']}/c/{referer_chat_id}"
    return headers
|
|
|
|
def get_anonymous_token() -> str:
    """Fetch an anonymous token from the upstream auth endpoint.

    Returns:
        The ``token`` value of the JSON response.

    Raises:
        Exception: When the request fails, the status is not 200, or the
            response carries no token. The failure is logged, then re-raised.
    """
    origin = ServerConfig.CLIENT_HEADERS['Origin']
    headers = {
        **get_browser_headers(),
        "Accept": "*/*",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Referer": f"{origin}/",
    }

    try:
        response = requests.get(f"{origin}/api/v1/auths/", headers=headers, timeout=10.0)
        if response.status_code != 200:
            raise Exception(f"anon token status={response.status_code}")

        token = response.json().get("token")
        if not token:
            raise Exception("anon token empty")
        return token
    except Exception as e:
        debug_log(f"获取匿名token失败: {e}")
        raise
|
|
|
|
def get_auth_token() -> str:
    """Return the token used to authenticate against the upstream.

    With anonymous mode enabled an anonymous token is requested first; if
    that fails for any reason, or anonymous mode is disabled, the configured
    backup token is returned.
    """
    if not ServerConfig.ANONYMOUS_MODE:
        return ServerConfig.BACKUP_TOKEN

    try:
        token = get_anonymous_token()
        debug_log(f"匿名token获取成功: {token[:10]}...")
        return token
    except Exception as e:
        debug_log(f"匿名token获取失败,回退固定token: {e}")

    return ServerConfig.BACKUP_TOKEN
|
|
|
|
def transform_thinking_content(content: str) -> str:
    """Clean up upstream "thinking" markup according to configuration.

    Steps, in order:

    1. drop ``<summary>...</summary>`` blocks and the ``</thinking>``,
       ``<Full>``, ``</Full>`` markers;
    2. depending on ``ServerConfig.THINKING_PROCESSING``, turn
       ``<details ...>``/``</details>`` into ``<think>``/``</think>``
       (``"think"``), remove them (``"strip"``), or leave them untouched
       (any other value);
    3. remove the ``"> "`` quote marker at the start of the text and at the
       start of each following line.

    Returns:
        The transformed text with surrounding whitespace stripped.
    """
    content = re.sub(r'(?s)<summary>.*?</summary>', '', content)
    content = content.replace("</thinking>", "").replace("<Full>", "").replace("</Full>", "")
    content = content.strip()

    mode = ServerConfig.THINKING_PROCESSING
    if mode == "think":
        content = re.sub(r'<details[^>]*>', '<think>', content)
        content = content.replace("</details>", "</think>")
    elif mode == "strip":
        content = re.sub(r'<details[^>]*>', '', content)
        content = content.replace("</details>", "")

    # Remove exactly one leading "> " marker. str.lstrip("> ") would instead
    # strip every leading ">" and space character, eating real content such
    # as ">= 5".
    if content.startswith("> "):
        content = content[2:]
    content = content.replace("\n> ", "\n")

    return content.strip()
|
|
|
|
def create_openai_response_chunk(
    model: str,
    delta: Optional[Delta] = None,
    finish_reason: Optional[str] = None
) -> OpenAIResponse:
    """Build one ``chat.completion.chunk`` response.

    Args:
        model: Model name reported in the chunk.
        delta: Fragment to send; an empty ``Delta`` is used when omitted.
        finish_reason: Set on the closing chunk, otherwise ``None``.

    Returns:
        An ``OpenAIResponse`` with a single choice at index 0, whose ``id``
        and ``created`` come from the current time in whole seconds.
    """
    chunk_id = f"chatcmpl-{int(time.time())}"
    created_at = int(time.time())
    only_choice = Choice(
        index=0,
        delta=delta or Delta(),
        finish_reason=finish_reason,
    )
    return OpenAIResponse(
        id=chunk_id,
        object="chat.completion.chunk",
        created=created_at,
        model=model,
        choices=[only_choice],
    )
|
|
|
|
def handle_upstream_error(error: UpstreamError) -> Generator[str, None, None]:
    """Log an upstream error and terminate the client stream.

    Yields a closing chunk with ``finish_reason="stop"`` followed by the
    ``[DONE]`` marker; the error details are only logged, not forwarded.
    """
    debug_log(f"上游错误: code={error.code}, detail={error.detail}")

    closing = create_openai_response_chunk(
        model=ServerConfig.PRIMARY_MODEL,
        finish_reason="stop",
    )
    yield from (
        f"data: {closing.model_dump_json()}\n\n",
        "data: [DONE]\n\n",
    )
|
|
|
|
def call_upstream_api(
    upstream_req: UpstreamRequest,
    chat_id: str,
    auth_token: str
) -> requests.Response:
    """POST *upstream_req* to the configured endpoint as a streamed request.

    Args:
        upstream_req: Request body; ``None`` fields are omitted from the JSON.
        chat_id: Used to build the ``Referer`` header.
        auth_token: Sent as a ``Bearer`` token.

    Returns:
        The open, streamed ``requests.Response``; the caller must close it.
    """
    endpoint = ServerConfig.API_ENDPOINT
    headers = {
        **get_browser_headers(chat_id),
        "Authorization": f"Bearer {auth_token}",
    }

    debug_log(f"调用上游API: {endpoint}")
    debug_log(f"上游请求体: {upstream_req.model_dump_json()}")

    body = upstream_req.model_dump(exclude_none=True)
    response = requests.post(endpoint, json=body, headers=headers, timeout=60.0, stream=True)

    debug_log(f"上游响应状态: {response.status_code}")
    return response
|
|
|
|
| |
| |
| |
|
|
class ResponseHandler:
    """Shared plumbing for the streaming and non-streaming handlers."""

    def __init__(self, upstream_req: UpstreamRequest, chat_id: str, auth_token: str):
        """Store the upstream request and the credentials used to send it."""
        self.upstream_req = upstream_req
        self.chat_id = chat_id
        self.auth_token = auth_token

    def _call_upstream(self) -> requests.Response:
        """Send the request upstream; log and re-raise any failure."""
        try:
            return call_upstream_api(self.upstream_req, self.chat_id, self.auth_token)
        except Exception as e:
            debug_log(f"调用上游失败: {e}")
            raise

    def _handle_upstream_error(self, response: requests.Response) -> None:
        """Log a non-200 upstream response and release its connection.

        The response was opened with ``stream=True``; callers abandon it after
        this call, so it is closed here. Without the close, the connection
        stayed open whenever debug logging was off and the body was never read.
        """
        debug_log(f"上游返回错误状态: {response.status_code}")
        try:
            if ServerConfig.DEBUG_LOGGING:
                debug_log(f"上游错误响应: {response.text}")
        finally:
            response.close()
|
|
|
|
class StreamResponseHandler(ResponseHandler):
    """Translate the upstream SSE stream into OpenAI-style stream chunks.

    Without tools, content is forwarded chunk by chunk. With tools, all
    content is buffered and examined for tool calls when the stream ends.
    """

    def __init__(self, upstream_req: UpstreamRequest, chat_id: str, auth_token: str, has_tools: bool = False):
        super().__init__(upstream_req, chat_id, auth_token)
        self.has_tools = has_tools
        self.buffered_content = ""
        self.tool_calls = None
        # Whether the answer text embedded in `edit_content` was already sent.
        # Kept on the instance because a flag passed as an argument cannot be
        # updated from inside `_process_content`.
        self._sent_initial_answer = False

    def handle(self) -> Generator[str, None, None]:
        """Yield SSE ``data:`` lines for the client.

        Emits an error payload and stops when the upstream call fails or
        returns a non-200 status. Otherwise emits a role chunk, the translated
        content, and always a closing chunk plus ``[DONE]`` — including when
        the upstream closes the stream without signalling completion.
        """
        debug_log(f"开始处理流式响应 (chat_id={self.chat_id})")

        try:
            response = self._call_upstream()
        except Exception:
            yield "data: {\"error\": \"Failed to call upstream\"}\n\n"
            return

        if response.status_code != 200:
            self._handle_upstream_error(response)
            # Make sure the abandoned streamed response is released.
            response.close()
            yield "data: {\"error\": \"Upstream error\"}\n\n"
            return

        first_chunk = create_openai_response_chunk(
            model=ServerConfig.PRIMARY_MODEL,
            delta=Delta(role="assistant")
        )
        yield f"data: {first_chunk.model_dump_json()}\n\n"

        debug_log("开始读取上游SSE流")
        self._sent_initial_answer = False
        terminated = False

        with SSEParser(response, debug_mode=ServerConfig.DEBUG_LOGGING) as parser:
            for event in parser.iter_json_data(UpstreamData):
                upstream_data = event['data']

                if self._has_error(upstream_data):
                    error = self._get_error(upstream_data)
                    yield from handle_upstream_error(error)
                    terminated = True
                    break

                debug_log(f"解析成功 - 类型: {upstream_data.type}, 阶段: {upstream_data.data.phase}, "
                          f"内容长度: {len(upstream_data.data.delta_content)}, 完成: {upstream_data.data.done}")

                yield from self._process_content(upstream_data, self._sent_initial_answer)

                if upstream_data.data.done or upstream_data.data.phase == "done":
                    debug_log("检测到流结束信号")
                    yield from self._send_end_chunk()
                    terminated = True
                    break

        if not terminated:
            # The upstream closed the stream without a done signal: still
            # flush buffered tool output and send the closing chunk/[DONE].
            debug_log("upstream stream ended without a done signal")
            yield from self._send_end_chunk()

    def _has_error(self, upstream_data: UpstreamData) -> bool:
        """Return True when any of the three error slots is populated."""
        return bool(
            upstream_data.error or
            upstream_data.data.error or
            (upstream_data.data.inner and upstream_data.data.inner.error)
        )

    def _get_error(self, upstream_data: UpstreamData) -> UpstreamError:
        """Return the first populated error slot (outer, data, then inner)."""
        return (
            upstream_data.error or
            upstream_data.data.error or
            (upstream_data.data.inner.error if upstream_data.data.inner else None)
        )

    def _process_content(
        self,
        upstream_data: UpstreamData,
        sent_initial_answer: bool
    ) -> Generator[str, None, None]:
        """Forward (or buffer) the content of one upstream event.

        Args:
            upstream_data: The validated upstream event.
            sent_initial_answer: Caller's view of whether the initial answer
                was already sent; combined with the instance flag.

        Yields:
            SSE ``data:`` lines. Nothing is yielded in tool mode, where
            content is appended to ``self.buffered_content`` instead.
        """
        data = upstream_data.data
        delta_text = data.delta_content
        edit_text = data.edit_content
        is_thinking = data.phase == "thinking"

        if not (delta_text or edit_text):
            return

        if self.has_tools:
            buffered = delta_text or edit_text
            if is_thinking:
                buffered = transform_thinking_content(buffered)
            self.buffered_content += buffered
            return

        # Answer text that arrives inside `edit_content` is sent once only.
        already_sent = sent_initial_answer or self._sent_initial_answer
        if not already_sent and edit_text and data.phase == "answer":
            initial = self._extract_edit_content(edit_text)
            if initial:
                debug_log(f"发送普通内容: {initial}")
                chunk = create_openai_response_chunk(
                    model=ServerConfig.PRIMARY_MODEL,
                    delta=Delta(content=initial)
                )
                yield f"data: {chunk.model_dump_json()}\n\n"
                self._sent_initial_answer = True

        # The delta is handled with its own variable so that the edit extract
        # above can neither replace it nor be emitted a second time.
        if delta_text:
            text = transform_thinking_content(delta_text) if is_thinking else delta_text
            if text:
                if is_thinking:
                    debug_log(f"发送思考内容: {text}")
                    chunk = create_openai_response_chunk(
                        model=ServerConfig.PRIMARY_MODEL,
                        delta=Delta(reasoning_content=text)
                    )
                else:
                    debug_log(f"发送普通内容: {text}")
                    chunk = create_openai_response_chunk(
                        model=ServerConfig.PRIMARY_MODEL,
                        delta=Delta(content=text)
                    )
                yield f"data: {chunk.model_dump_json()}\n\n"

    def _extract_edit_content(self, edit_content: str) -> str:
        """Return the text between the first and second ``</details>``.

        If there is only one ``</details>``, this is everything after it.
        Returns an empty string when the marker is absent.
        """
        parts = edit_content.split("</details>")
        return parts[1] if len(parts) > 1 else ""

    def _send_end_chunk(self) -> Generator[str, None, None]:
        """Flush buffered tool output, then send the closing chunk and ``[DONE]``."""
        finish_reason = "stop"

        if self.has_tools:
            self.tool_calls = extract_tool_invocations(self.buffered_content)

            if self.tool_calls:
                tool_calls_list = [
                    {
                        "index": i,
                        "id": tc.get("id"),
                        "type": tc.get("type", "function"),
                        "function": tc.get("function", {}),
                    }
                    for i, tc in enumerate(self.tool_calls)
                ]
                out_chunk = create_openai_response_chunk(
                    model=ServerConfig.PRIMARY_MODEL,
                    delta=Delta(tool_calls=tool_calls_list)
                )
                yield f"data: {out_chunk.model_dump_json()}\n\n"
                finish_reason = "tool_calls"
            else:
                # No tool call found: send the buffered text as plain content.
                trimmed_content = remove_tool_json_content(self.buffered_content)
                if trimmed_content:
                    content_chunk = create_openai_response_chunk(
                        model=ServerConfig.PRIMARY_MODEL,
                        delta=Delta(content=trimmed_content)
                    )
                    yield f"data: {content_chunk.model_dump_json()}\n\n"

        end_chunk = create_openai_response_chunk(
            model=ServerConfig.PRIMARY_MODEL,
            finish_reason=finish_reason
        )
        yield f"data: {end_chunk.model_dump_json()}\n\n"
        yield "data: [DONE]\n\n"
        debug_log("流式响应完成")
|
|
|
|
class NonStreamResponseHandler(ResponseHandler):
    """Collect the whole upstream stream and answer with one JSON body."""

    def __init__(self, upstream_req: UpstreamRequest, chat_id: str, auth_token: str, has_tools: bool = False):
        super().__init__(upstream_req, chat_id, auth_token)
        self.has_tools = has_tools

    def handle(self) -> JSONResponse:
        """Return the completed chat response.

        Raises:
            HTTPException: 502 when the upstream call fails or returns a
                non-200 status.
        """
        debug_log(f"开始处理非流式响应 (chat_id={self.chat_id})")

        try:
            response = self._call_upstream()
        except Exception as e:
            debug_log(f"调用上游失败: {e}")
            raise HTTPException(status_code=502, detail="Failed to call upstream")

        if response.status_code != 200:
            self._handle_upstream_error(response)
            raise HTTPException(status_code=502, detail="Upstream error")

        debug_log("开始收集完整响应内容")
        pieces = []

        with SSEParser(response, debug_mode=ServerConfig.DEBUG_LOGGING) as parser:
            for event in parser.iter_json_data(UpstreamData):
                payload = event['data'].data

                text = payload.delta_content
                if text:
                    if payload.phase == "thinking":
                        text = transform_thinking_content(text)
                    if text:
                        pieces.append(text)

                if payload.done or payload.phase == "done":
                    debug_log("检测到完成信号,停止收集")
                    break

        final_content = "".join(pieces)
        debug_log(f"内容收集完成,最终长度: {len(final_content)}")

        # Decide between a tool-call answer and a plain-text answer.
        tool_calls = extract_tool_invocations(final_content) if self.has_tools else None
        if tool_calls:
            message_content, finish_reason = None, "tool_calls"
        elif self.has_tools:
            message_content, finish_reason = remove_tool_json_content(final_content), "stop"
        else:
            message_content, finish_reason = final_content, "stop"

        reply = Message(
            role="assistant",
            content=message_content,
            tool_calls=tool_calls,
        )
        response_data = OpenAIResponse(
            id=f"chatcmpl-{int(time.time())}",
            object="chat.completion",
            created=int(time.time()),
            model=ServerConfig.PRIMARY_MODEL,
            choices=[Choice(index=0, message=reply, finish_reason=finish_reason)],
            usage=Usage(),
        )

        debug_log("非流式响应发送完成")
        return JSONResponse(content=response_data.model_dump(exclude_none=True))
|
|
|
|
| |
| |
| |
|
|
# The ASGI application; routes and middleware are registered on it.
app = FastAPI(
    version="1.0.0",
    title="OpenAI Compatible API Server",
    description="An OpenAI-compatible API server for Z.AI chat service",
)
|
|
|
|
| |
@app.middleware("http")
async def add_cors_headers(request: Request, call_next):
    """Attach CORS headers to every response and answer CORS preflights.

    A preflight (``OPTIONS`` carrying ``Access-Control-Request-Method``) is
    answered here with an empty 200 response. Only ``/`` has an OPTIONS
    route, so a preflight for any other path would otherwise fall through to
    routing and be rejected, blocking browser clients.
    """
    is_preflight = (
        request.method == "OPTIONS"
        and "access-control-request-method" in request.headers
    )
    if is_preflight:
        response = Response(status_code=200)
    else:
        response = await call_next(request)

    # NOTE(review): browsers ignore "Allow-Credentials: true" combined with a
    # wildcard origin; kept unchanged for compatibility.
    response.headers.update({
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Methods": "GET, POST, PUT, DELETE, OPTIONS",
        "Access-Control-Allow-Headers": "Content-Type, Authorization",
        "Access-Control-Allow-Credentials": "true"
    })
    return response
|
|
|
|
| |
| |
| |
|
|
@app.options("/")
async def handle_options():
    """Answer ``OPTIONS /`` with an empty 200 response."""
    empty_ok = Response(status_code=200)
    return empty_ok
|
|
|
|
@app.get("/")
async def root():
    """Return a short banner identifying the service."""
    banner = {"message": "OpenAI Compatible API Server"}
    return banner
|
|
|
|
@app.get("/v1/models")
async def list_models():
    """Return the three configured models, all stamped with the current time."""
    now = int(time.time())
    model_ids = (
        ServerConfig.PRIMARY_MODEL,
        ServerConfig.THINKING_MODEL,
        ServerConfig.SEARCH_MODEL,
    )
    return ModelsResponse(
        data=[Model(id=model_id, created=now, owned_by="z.ai") for model_id in model_ids]
    )
|
|
|
|
@app.post("/v1/chat/completions")
async def chat_completions(
    request: OpenAIRequest,
    authorization: str = Header(...)
):
    """Handle an OpenAI-style chat completion request.

    Validates the bearer token, converts the request into an upstream request
    and returns either a streaming response or a single JSON response.

    Raises:
        HTTPException: 401 for a missing/invalid bearer token, 502 for
            upstream failures (non-streaming), 500 for unexpected errors.
    """
    # Local imports keep this block self-contained.
    import hmac
    from fastapi.concurrency import run_in_threadpool

    debug_log("收到chat completions请求")

    try:
        if not authorization.startswith("Bearer "):
            debug_log("缺少或无效的Authorization头")
            raise HTTPException(status_code=401, detail="Missing or invalid Authorization header")

        api_key = authorization[7:]
        # Constant-time comparison; the rejected key is deliberately not
        # written to the log.
        key_ok = hmac.compare_digest(
            api_key.encode("utf-8"),
            ServerConfig.AUTH_TOKEN.encode("utf-8"),
        )
        if not key_ok:
            debug_log("无效的API key")
            raise HTTPException(status_code=401, detail="Invalid API key")

        debug_log(f"API key验证通过,AUTH_TOKEN={api_key[:8]}......")
        debug_log(f"请求解析成功 - 模型: {request.model}, 流式: {request.stream}, 消息数: {len(request.messages)}")

        chat_id, msg_id = generate_request_ids()

        processed_messages = process_messages_with_tools(
            [m.model_dump() for m in request.messages],
            request.tools,
            request.tool_choice
        )

        # The upstream always receives string content, never None.
        upstream_messages = []
        for msg in processed_messages:
            content = msg.get("content")
            if content is None:
                content = ""
            upstream_messages.append(Message(
                role=msg["role"],
                content=content,
                reasoning_content=msg.get("reasoning_content")
            ))

        # The requested model name only selects upstream features.
        is_thinking = request.model == ServerConfig.THINKING_MODEL
        is_search = request.model == ServerConfig.SEARCH_MODEL
        search_mcp = "deep-web-search" if is_search else ""

        upstream_req = UpstreamRequest(
            stream=True,
            chat_id=chat_id,
            id=msg_id,
            model="0727-360B-API",
            messages=upstream_messages,
            params={},
            features={
                "enable_thinking": is_thinking,
                "web_search": is_search,
                "auto_web_search": is_search,
            },
            background_tasks={
                "title_generation": False,
                "tags_generation": False,
            },
            mcp_servers=[search_mcp] if search_mcp else [],
            model_item=ModelItem(
                id="0727-360B-API",
                name="GLM-4.5",
                owned_by="openai"
            ),
            tool_servers=[],
            variables={
                "{{USER_NAME}}": "User",
                "{{USER_LOCATION}}": "Unknown",
                "{{CURRENT_DATETIME}}": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            }
        )

        # get_auth_token performs a blocking HTTP request; run it in the
        # thread pool so the event loop keeps serving other requests.
        auth_token = await run_in_threadpool(get_auth_token)

        has_tools = bool(
            ServerConfig.TOOL_SUPPORT
            and request.tools
            and request.tool_choice != "none"
        )

        if request.stream:
            handler = StreamResponseHandler(upstream_req, chat_id, auth_token, has_tools)
            return StreamingResponse(
                handler.handle(),
                media_type="text/event-stream",
                headers={
                    "Cache-Control": "no-cache",
                    "Connection": "keep-alive",
                }
            )

        handler = NonStreamResponseHandler(upstream_req, chat_id, auth_token, has_tools)
        # handle() blocks while it reads the whole upstream stream.
        return await run_in_threadpool(handler.handle)

    except HTTPException:
        raise
    except Exception as e:
        debug_log(f"处理请求时发生错误: {str(e)}")
        import traceback
        debug_log(f"错误堆栈: {traceback.format_exc()}")
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
|
|
|
|
| |
| |
| |
|
|
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on the configured port.
    import uvicorn

    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=ServerConfig.LISTEN_PORT,
        reload=True,
    )