|
|
|
|
|
|
|
|
|
|
|
import json |
|
|
import time |
|
|
import uuid |
|
|
import random |
|
|
import hashlib |
|
|
import hmac |
|
|
import urllib.parse |
|
|
from datetime import datetime |
|
|
from typing import Dict, List, Any, Optional, Generator, AsyncGenerator |
|
|
import httpx |
|
|
import asyncio |
|
|
from fake_useragent import UserAgent |
|
|
|
|
|
from app.core.config import settings |
|
|
from app.utils.helpers import debug_log |
|
|
from app.core.token_manager import token_manager |
|
|
|
|
|
|
|
|
# Cached UserAgent generator; stays None until get_user_agent_instance() is
# called for the first time.
_user_agent_instance = None


def get_user_agent_instance() -> UserAgent:
    """Return the shared ``UserAgent`` object, creating it on first use.

    The instance is stored in the module-level ``_user_agent_instance`` so
    that every later call hands back the same object.
    """
    global _user_agent_instance

    if _user_agent_instance is not None:
        return _user_agent_instance

    _user_agent_instance = UserAgent()
    return _user_agent_instance
|
|
|
|
|
|
|
|
def get_dynamic_headers(chat_id: str = "", user_agent: str = "") -> Dict[str, str]:
    """Build browser-like request headers for chat.z.ai.

    Args:
        chat_id: Conversation id. When non-empty the ``Referer`` header points
            at ``https://chat.z.ai/c/<chat_id>``; otherwise at the site root.
        user_agent: User-Agent string to send. When empty, one is drawn from
            the shared ``UserAgent`` generator with a weighted preference for
            Chrome and Edge.

    Returns:
        The header dictionary. The ``Sec-Ch-Ua*`` client-hint headers are
        added only for Chromium-based User-Agents (those containing ``Edg/``
        or ``Chrome/``); Firefox, Safari and unrecognised agents get none, so
        the hints never contradict the User-Agent they travel with.
    """
    if not user_agent:
        ua = get_user_agent_instance()
        # Weighted pick: Chrome 3/7, Edge 2/7, Firefox 1/7, Safari 1/7.
        browser_type = random.choice(
            ["chrome", "chrome", "chrome", "edge", "edge", "firefox", "safari"]
        )
        try:
            user_agent = getattr(ua, browser_type)
        except Exception:
            # Best effort: if the generator cannot serve the requested
            # browser family, settle for any User-Agent it can produce.
            user_agent = ua.random

    chrome_version = _major_version(user_agent, "Chrome/", "140")
    edge_version = _major_version(user_agent, "Edg/", "140")

    if "Edg/" in user_agent:
        sec_ch_ua = (
            f'"Microsoft Edge";v="{edge_version}", '
            f'"Chromium";v="{chrome_version}", "Not=A?Brand";v="24"'
        )
    elif "Chrome/" in user_agent:
        sec_ch_ua = (
            f'"Chromium";v="{chrome_version}", "Not=A?Brand";v="24", '
            f'"Google Chrome";v="{chrome_version}"'
        )
    else:
        # Not a Chromium-based agent: send no client hints at all.
        sec_ch_ua = None

    headers = {
        "Accept": "*/*",
        "Accept-Encoding": "gzip, deflate, br, zstd",
        "Accept-Language": "zh-CN",
        "Content-Type": "application/json",
        "User-Agent": user_agent,
        "X-Fe-Version": "prod-fe-1.0.83",
        "Origin": "https://chat.z.ai",
        "Connection": "keep-alive",
        "Sec-Fetch-Dest": "empty",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Site": "same-origin",
    }

    if sec_ch_ua:
        headers["Sec-Ch-Ua"] = sec_ch_ua
        headers["Sec-Ch-Ua-Mobile"] = "?1" if "Mobile" in user_agent else "?0"
        headers["Sec-Ch-Ua-Platform"] = f'"{_client_hint_platform(user_agent)}"'

    if chat_id:
        headers["Referer"] = f"https://chat.z.ai/c/{chat_id}"
    else:
        headers["Referer"] = "https://chat.z.ai/"

    return headers


def _major_version(user_agent: str, marker: str, default: str) -> str:
    """Return the numeric major version that follows ``marker``, else ``default``."""
    _, found, tail = user_agent.partition(marker)
    if not found:
        return default
    major = tail.split(".", 1)[0]
    return major if major.isdigit() else default


def _client_hint_platform(user_agent: str) -> str:
    """Map a User-Agent string to a ``Sec-Ch-Ua-Platform`` value (default Windows)."""
    # Android and Chrome OS agents also mention Linux/X11, so test them first.
    if "Android" in user_agent:
        return "Android"
    if "Windows" in user_agent:
        return "Windows"
    if "Macintosh" in user_agent or "Mac OS X" in user_agent:
        return "macOS"
    if "CrOS" in user_agent:
        return "Chrome OS"
    if "Linux" in user_agent or "X11" in user_agent:
        return "Linux"
    return "Windows"
|
|
|
|
|
|
|
|
def generate_uuid() -> str:
    """Return a newly generated random (version 4) UUID in string form."""
    fresh_id = uuid.uuid4()
    return str(fresh_id)
|
|
|
|
|
|
|
|
def generate_signature(data: str, timestamp: str, secret_key: str = "") -> str:
    """Compute the hex digest used to sign a request.

    The digest is taken over the UTF-8 encoding of the plain concatenation
    ``data + timestamp + secret_key``.

    Args:
        data: Request payload to sign.
        timestamp: Timestamp string appended after the payload.
        secret_key: Signing secret. When empty,
            ``settings.SIGNATURE_SECRET_KEY`` is used instead.

    Returns:
        The hexadecimal digest, or an empty string when
        ``settings.ENABLE_SIGNATURE`` is falsy.
    """
    if not settings.ENABLE_SIGNATURE:
        return ""

    effective_key = secret_key if secret_key else settings.SIGNATURE_SECRET_KEY
    payload = f"{data}{timestamp}{effective_key}".encode('utf-8')

    # "md5" and "sha1" are honoured (case-insensitively); any other
    # configured value falls back to SHA-256.
    hashers = {"md5": hashlib.md5, "sha1": hashlib.sha1}
    hasher = hashers.get(settings.SIGNATURE_ALGORITHM.lower(), hashlib.sha256)
    return hasher(payload).hexdigest()
|
|
|
|
|
|
|
|
def build_query_params(
    timestamp: int,
    request_id: str,
    token: str,
    user_agent: str,
    chat_id: str = ""
) -> Dict[str, str]:
    """Build the query parameters that imitate a real browser request.

    Args:
        timestamp: Request timestamp in milliseconds.
        request_id: Request id.
        token: User token; also used to derive the synthetic ``user_id``.
        user_agent: User-Agent string.
        chat_id: Chat id; selects ``/c/<chat_id>`` instead of ``/`` for the
            reported page URL and pathname.

    Returns:
        Dictionary of query parameters. ``user_agent`` and ``current_url`` are
        already URL-encoded with ``quote_plus``; every other value is raw.
    """
    # The file only imports ``datetime`` itself, so pull ``timezone`` in here.
    from datetime import timezone

    # Built-in hash() is salted per process for str, which made the id change
    # on every restart; a SHA-256 digest keeps it stable for a given token.
    token_digest = hashlib.sha256(token.encode("utf-8")).digest()
    user_id = "guest-user-" + str(int.from_bytes(token_digest[:8], "big") % 1000000)

    encoded_user_agent = urllib.parse.quote_plus(user_agent)

    # Both stamps are labelled as UTC ("Z" / "GMT"), so they must be taken
    # from a UTC clock rather than the host's naive local time.
    now_utc = datetime.now(timezone.utc)
    # Fixed-width millisecond precision, e.g. 2024-01-01T12:00:00.000Z.
    local_time = now_utc.replace(tzinfo=None).isoformat(timespec="milliseconds") + "Z"
    utc_time = now_utc.strftime("%a, %d %b %Y %H:%M:%S GMT")

    current_url = f"https://chat.z.ai/c/{chat_id}" if chat_id else "https://chat.z.ai/"
    pathname = f"/c/{chat_id}" if chat_id else "/"

    query_params = {
        "timestamp": str(timestamp),
        "requestId": request_id,
        "version": "0.0.1",
        "platform": "web",
        "user_id": user_id,
        "token": token,
        "user_agent": encoded_user_agent,
        "language": "zh-CN",
        "languages": "zh-CN,en,en-GB,en-US",
        "timezone": "Asia/Shanghai",
        "cookie_enabled": "true",
        "screen_width": "1536",
        "screen_height": "864",
        "screen_resolution": "1536x864",
        "viewport_height": "331",
        "viewport_width": "1528",
        "viewport_size": "1528x331",
        "color_depth": "24",
        "pixel_ratio": "1.25",
        "current_url": urllib.parse.quote_plus(current_url),
        "pathname": pathname,
        "search": "",
        "hash": "",
        "host": "chat.z.ai",
        "hostname": "chat.z.ai",
        "protocol": "https:",
        "referrer": "",
        "title": "Chat with Z.ai - Free AI Chatbot powered by GLM-4.5",
        "timezone_offset": "-480",
        "local_time": local_time,
        "utc_time": utc_time,
        "is_mobile": "false",
        "is_touch": "false",
        "max_touch_points": "10",
        "browser_name": "Chrome",
        "os_name": "Windows",
    }

    return query_params
|
|
|
|
|
|
|
|
def get_auth_token_sync() -> str:
    """Obtain an auth token synchronously (for non-async callers).

    When ``settings.ANONYMOUS_MODE`` is on, a guest token is first requested
    from the chat.z.ai auth endpoint. If that yields nothing (non-200 status,
    no token in the reply, or any exception) or anonymous mode is off, the
    next token from ``token_manager`` is used.

    Returns:
        The token, or an empty string when no token could be obtained.
    """
    if settings.ANONYMOUS_MODE:
        try:
            request_headers = get_dynamic_headers()
            with httpx.Client() as http_client:
                reply = http_client.get(
                    "https://chat.z.ai/api/v1/auths/",
                    headers=request_headers,
                    timeout=10.0,
                )
                guest_token = (
                    reply.json().get("token", "") if reply.status_code == 200 else ""
                )
                if guest_token:
                    debug_log(f"获取访客令牌成功: {guest_token[:20]}...")
                    return guest_token
        except Exception as exc:
            # Best effort: any failure falls through to the token pool.
            debug_log(f"获取访客令牌失败: {exc}")

    pooled_token = token_manager.get_next_token()
    if not pooled_token:
        debug_log("⚠️ 没有可用的备份token")
        return ""

    debug_log(f"从token池获取令牌: {pooled_token[:20]}...")
    return pooled_token
|
|
|
|
|
|
|
|
class ZAITransformer:
    """Translate OpenAI-style chat requests into chat.z.ai upstream requests."""

    # Text put in front of system messages once they are re-labelled as user
    # messages.
    _SYSTEM_PREFIX = "This is a system command, you must enforce compliance."

    def __init__(self):
        """Set the upstream endpoints and the model-name mapping table."""
        self.name = "zai"
        self.base_url = "https://chat.z.ai"
        self.api_url = settings.API_ENDPOINT
        self.auth_url = f"{self.base_url}/api/v1/auths/"

        # Public model names (taken from settings) -> upstream model ids.
        self.model_mapping = {
            settings.PRIMARY_MODEL: "0727-360B-API",
            settings.THINKING_MODEL: "0727-360B-API",
            settings.SEARCH_MODEL: "0727-360B-API",
            settings.AIR_MODEL: "0727-106B-API",
            settings.GLM_46_MODEL: "GLM-4-6-API-V1",
            settings.GLM_46_THINKING_MODEL: "GLM-4-6-API-V1",
        }

    async def get_token(self) -> str:
        """Obtain an auth token asynchronously.

        When ``settings.ANONYMOUS_MODE`` is on, a guest token is first
        requested from ``self.auth_url``. If that yields nothing (non-200
        status, no token in the reply, or any exception) or anonymous mode is
        off, the next token from ``token_manager`` is used.

        Returns:
            The token, or an empty string when no token could be obtained.
        """
        if settings.ANONYMOUS_MODE:
            try:
                headers = get_dynamic_headers()
                async with httpx.AsyncClient() as client:
                    response = await client.get(self.auth_url, headers=headers, timeout=10.0)
                    if response.status_code == 200:
                        token = response.json().get("token", "")
                        if token:
                            debug_log(f"获取访客令牌成功: {token[:20]}...")
                            return token
            except Exception as e:
                # Best effort: any failure falls through to the token pool.
                debug_log(f"异步获取访客令牌失败: {e}")

        token = token_manager.get_next_token()
        if token:
            debug_log(f"从token池获取令牌: {token[:20]}...")
            return token

        debug_log("⚠️ 没有可用的备份token")
        return ""

    def mark_token_success(self, token: str) -> None:
        """Report to the token manager that ``token`` was used successfully."""
        token_manager.mark_token_success(token)

    def mark_token_failure(self, token: str, error: Optional[Exception] = None) -> None:
        """Report to the token manager that a request using ``token`` failed.

        ``error`` is accepted for caller convenience but is not forwarded.
        """
        token_manager.mark_token_failed(token)

    @classmethod
    def _convert_message(cls, idx: int, orig_msg: Dict[str, Any]) -> Dict[str, Any]:
        """Return a shallow copy of one OpenAI message adapted for the upstream."""
        msg = orig_msg.copy()
        role = msg.get("role")
        content = msg.get("content")

        if role == "system":
            # System messages are sent as user messages carrying a fixed prefix.
            msg["role"] = "user"
            if isinstance(content, list):
                msg["content"] = [{"type": "text", "text": cls._SYSTEM_PREFIX}] + content
            elif isinstance(content, str):
                msg["content"] = f"{cls._SYSTEM_PREFIX}{content}"
        elif role == "user" and isinstance(content, list):
            # Multimodal parts are forwarded unchanged; image parts are only
            # logged. Non-dict parts are tolerated instead of raising.
            for part_idx, part in enumerate(content):
                if not isinstance(part, dict) or part.get("type") != "image_url":
                    continue
                image = part.get("image_url")
                if isinstance(image, dict) and image.get("url") and isinstance(image["url"], str):
                    debug_log(f" 消息[{idx}]内容[{part_idx}]: 检测到图片URL")
            msg["content"] = list(content)
        # Every other message (including assistant messages that carry
        # reasoning_content) passes through untouched.

        return msg

    async def transform_request_in(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Convert an OpenAI-format chat request into the z.ai format.

        Covers model mapping, message conversion, the search MCP server,
        optional tool forwarding, and the browser-like URL and headers.

        Args:
            request: OpenAI-style request dictionary. The keys read are
                ``model``, ``messages``, ``reasoning`` and ``tools``.

        Returns:
            A dictionary with ``body`` (upstream JSON payload), ``config``
            (``url`` and ``headers`` for the upstream call) and ``token``
            (the auth token that was used).

        Raises:
            Exception: If no auth token could be obtained.
        """
        # ``or`` also covers an explicit null/empty model in the request.
        requested_model = request.get("model") or settings.PRIMARY_MODEL
        debug_log(f"🔄 开始转换 OpenAI 请求到 Z.AI 格式: {requested_model} -> Z.AI")

        token = await self.get_token()
        debug_log(f" 使用令牌: {token[:20] if token else 'None'}...")

        if not token:
            debug_log("❌ 无法获取有效的认证令牌")
            raise Exception("无法获取有效的认证令牌,请检查匿名模式配置或token池配置")

        # bool() keeps "enable_thinking" a real boolean even when the caller
        # passes a non-boolean "reasoning" value.
        is_thinking = (
            requested_model == settings.THINKING_MODEL
            or requested_model == settings.GLM_46_THINKING_MODEL
            or bool(request.get("reasoning"))
        )
        is_search = requested_model == settings.SEARCH_MODEL
        is_air = requested_model == settings.AIR_MODEL

        upstream_model_id = self.model_mapping.get(requested_model, "0727-360B-API")
        debug_log(f" 模型映射: {requested_model} -> {upstream_model_id}")
        debug_log(f" 模型特性检测: is_search={is_search}, is_thinking={is_thinking}, is_air={is_air}")

        # ``or []`` also covers an explicit null "messages" value.
        incoming_messages = request.get("messages") or []
        debug_log(f" 开始处理 {len(incoming_messages)} 条消息")
        messages = [
            self._convert_message(idx, orig_msg)
            for idx, orig_msg in enumerate(incoming_messages)
        ]

        mcp_servers = []
        if is_search:
            mcp_servers.append("deep-web-search")
            debug_log("🔍 检测到搜索模型,添加 deep-web-search MCP 服务器")

        debug_log(f" MCP服务器列表: {mcp_servers}")

        chat_id = generate_uuid()

        # One clock reading, so the date/time variables cannot disagree across
        # a second or midnight boundary.
        now = datetime.now()

        body = {
            "stream": True,
            "model": upstream_model_id,
            "messages": messages,
            "params": {},
            "features": {
                "image_generation": False,
                "web_search": is_search,
                "auto_web_search": is_search,
                "preview_mode": False,
                "flags": [],
                "features": [],
                "enable_thinking": is_thinking,
            },
            "background_tasks": {
                "title_generation": False,
                "tags_generation": False,
            },
            "mcp_servers": mcp_servers,
            "variables": {
                "{{USER_NAME}}": "Guest",
                "{{USER_LOCATION}}": "Unknown",
                "{{CURRENT_DATETIME}}": now.strftime("%Y-%m-%d %H:%M:%S"),
                "{{CURRENT_DATE}}": now.strftime("%Y-%m-%d"),
                "{{CURRENT_TIME}}": now.strftime("%H:%M:%S"),
                "{{CURRENT_WEEKDAY}}": now.strftime("%A"),
                "{{CURRENT_TIMEZONE}}": "Asia/Shanghai",
                "{{USER_LANGUAGE}}": "zh-CN",
            },
            "model_item": {
                "id": upstream_model_id,
                "name": requested_model,
                "owned_by": "z.ai"
            },
            "chat_id": chat_id,
            "id": generate_uuid(),
        }

        # Tools are forwarded only when enabled in settings and the request is
        # not in thinking mode.
        tools = request.get("tools")
        if settings.TOOL_SUPPORT and not is_thinking and tools:
            body["tools"] = tools
            debug_log(f"启用工具支持: {len(tools)} 个工具")
        else:
            body["tools"] = None

        timestamp = int(time.time() * 1000)
        request_id = generate_uuid()

        # A fixed Edge-on-Windows User-Agent is used for both the headers and
        # the query parameters of the chat request.
        user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36 Edg/140.0.0.0"
        dynamic_headers = get_dynamic_headers(chat_id, user_agent)

        query_params = build_query_params(timestamp, request_id, token, user_agent, chat_id)

        # Values are joined as returned by build_query_params (which
        # pre-encodes user_agent and current_url); nothing is encoded here.
        query_string = "&".join(f"{k}={v}" for k, v in query_params.items())
        url_with_params = f"{self.api_url}?" + query_string

        headers = {
            **dynamic_headers,
            "Authorization": f"Bearer {token}",
            "Cache-Control": "no-cache",
            "Pragma": "no-cache",
        }

        debug_log(" 🔓 签名验证已禁用")

        config = {
            "url": url_with_params,
            "headers": headers,
        }

        debug_log("✅ 请求转换完成")

        debug_log(" 📋 发送到Z.AI的关键信息:")
        debug_log(f" - 上游模型: {body['model']}")
        debug_log(f" - MCP服务器: {body['mcp_servers']}")
        debug_log(f" - web_search: {body['features']['web_search']}")
        debug_log(f" - auto_web_search: {body['features']['auto_web_search']}")
        debug_log(f" - 消息数量: {len(body['messages'])}")
        tools_count = len(body.get('tools') or [])
        debug_log(f" - 工具数量: {tools_count}")

        return {
            "body": body,
            "config": config,
            "token": token
        }
|
|
|