# zai / app.py — hongshi-files, "Update app.py", commit eb32949 (verified)
# -*- coding: utf-8 -*-
"""
Z.ai 2 API - 带可视化监控页面的优化版本
将 Z.ai 代理为 OpenAI Compatible 格式,支持免令牌、智能处理思考链、图片上传(仅登录后)等功能
基于 https://github.com/kbykb/OpenAI-Compatible-API-Proxy-for-Z 重构。
"""
import os, json, re, requests, logging, uuid, base64
from datetime import datetime, timedelta
from flask import Flask, request, Response, jsonify, make_response, render_template_string
from concurrent.futures import ThreadPoolExecutor
import threading
import time
from collections import defaultdict, deque
from dotenv import load_dotenv
load_dotenv()
# --- Configuration (all values overridable via environment variables / .env) ---
BASE = str(os.getenv("BASE", "https://chat.z.ai"))  # upstream Z.ai origin
PORT = int(os.getenv("PORT", "8080"))
MODEL = str(os.getenv("MODEL", "GLM-4.5"))  # default model id
TOKEN = str(os.getenv("TOKEN", "")).strip()  # fixed bearer token (used when not anonymous)
DEBUG_MODE = str(os.getenv("DEBUG", "false")).lower() == "true"
THINK_TAGS_MODE = str(os.getenv("THINK_TAGS_MODE", "reasoning"))  # reasoning | think | strip | details
ANONYMOUS_MODE = str(os.getenv("ANONYMOUS_MODE", "true")).lower() == "true"
CLEAN_SEARCH_REFS = str(os.getenv("CLEAN_SEARCH_REFS", "true")).lower() == "true"  # strip [ref_id=...] citations from search output
# --- Performance tuning ---
MAX_WORKERS = int(os.getenv("MAX_WORKERS", "10"))
TOKEN_CACHE_TIMEOUT = int(os.getenv("TOKEN_CACHE_TIMEOUT", "300"))  # anonymous-token cache TTL, seconds (5 min)
REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", "60"))  # upstream request timeout, seconds
# --- tiktoken preload: prefer the locally bundled cl100k_base cache ---
cache_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'tiktoken') + os.sep
os.environ["TIKTOKEN_CACHE_DIR"] = cache_dir
try:
    # The hash-named file is the cached cl100k_base.tiktoken vocabulary.
    assert os.path.exists(os.path.join(cache_dir, "9b5ad71b2ce5302211f9c61530b329a4922fc6a4"))  # cl100k_base.tiktoken
    import tiktoken
    enc = tiktoken.get_encoding("cl100k_base")
except:
    # NOTE(review): bare except also hides genuine import errors; on a cache
    # miss tiktoken downloads the encoding online instead.
    print("警告:tiktoken缓存文件不存在,将在线下载")
    import tiktoken
    enc = tiktoken.get_encoding("cl100k_base")
# --- Shared HTTP session with connection pooling ---
session = requests.Session()
session.mount('https://', requests.adapters.HTTPAdapter(
    pool_connections=MAX_WORKERS,
    pool_maxsize=MAX_WORKERS,
    max_retries=3,
    pool_block=False
))
session.mount('http://', requests.adapters.HTTPAdapter(
    pool_connections=MAX_WORKERS,
    pool_maxsize=MAX_WORKERS,
    max_retries=3,
    pool_block=False
))
# NOTE(review): requests.Session has no `timeout` attribute — this assignment
# is inert; timeouts must be (and are) passed per request below.
session.timeout = REQUEST_TIMEOUT
# Headers mimicking a desktop Chromium browser so the upstream accepts requests.
BROWSER_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36",
    "Accept": "*/*",
    "Accept-Language": "zh-CN,zh;q=0.9",
    "X-FE-Version": "prod-fe-1.0.76",
    "sec-ch-ua": '"Not;A=Brand";v="99", "Edge";v="139"',
    "sec-ch-ua-mobile": "?0",
    "sec-ch-ua-platform": '"Windows"',
    "Origin": BASE,
}
# --- Logging ---
logging.basicConfig(level=logging.DEBUG if DEBUG_MODE else logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
log = logging.getLogger(__name__)
def debug(msg, *args):
    """Emit *msg* via the module logger at DEBUG level, only when DEBUG_MODE is on."""
    if DEBUG_MODE:
        log.debug(msg, *args)
# --- Flask application ---
app = Flask(__name__)
# Thread pool for background work.
executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
# Caches for the anonymous token and the model list, each with its own lock.
token_cache = {"token": None, "timestamp": 0, "lock": threading.Lock()}
model_cache = {"models": None, "timestamp": 0, "lock": threading.Lock()}
# --- In-memory statistics for the dashboard (guarded by stats_lock) ---
stats = {
    "total_requests": 0,
    "successful_requests": 0,
    "failed_requests": 0,
    "total_tokens": 0,
    "prompt_tokens": 0,
    "completion_tokens": 0,
    "model_usage": defaultdict(int),
    "hourly_stats": defaultdict(lambda: {"requests": 0, "tokens": 0}),
    "response_times": deque(maxlen=100),  # last 100 response times, in ms
    "active_connections": 0,
    "start_time": datetime.now(),
    "last_request_time": None,
    "error_types": defaultdict(int),
    "search_usage": 0,
    "thinking_usage": 0
}
stats_lock = threading.Lock()
# Phase of the previously processed stream chunk ("thinking"/"answer"),
# used by utils.response.format to detect phase transitions.
phaseBak = "thinking"
# --- Pre-compiled regular expressions for rewriting the thinking markup ---
REASONING_RE = re.compile(r"(?s)<details[^>]*?>.*?</details>")
SUMMARY_RE = re.compile(r'\n*<summary>.*?</summary>\n*')
DETAILS_OPEN_RE = re.compile(r"<details[^>]*>\n*")
DETAILS_CLOSE_RE = re.compile(r"\n*</details>")
REASONING_CONTENT_RE = re.compile(r"(?s)^(.*?</reasoning>)(.*)$")
SUMMARY_MATCH_RE = re.compile(r"(?s)<summary>.*?</summary>")
DURATION_MATCH_RE = re.compile(r'duration="(\d+)"')
NEWLINE_ARROW_RE = re.compile(r'\n>\s?')
# Regexes for stripping search-result citations
SEARCH_REF_RE = re.compile(r'\[ref_id=[^\]]+\]\n?')  # "[ref_id=...]" citation markers
SEARCH_URL_RE = re.compile(r'†https?://[^\s\n]+')  # dagger-prefixed raw URLs
# HTML模板
HTML_TEMPLATE = """
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Z.ai API 监控面板</title>
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
<style>
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
min-height: 100vh;
padding: 20px;
}
.container {
max-width: 1400px;
margin: 0 auto;
}
.header {
background: rgba(255, 255, 255, 0.95);
border-radius: 15px;
padding: 25px;
margin-bottom: 25px;
box-shadow: 0 10px 30px rgba(0, 0, 0, 0.1);
backdrop-filter: blur(10px);
}
.header h1 {
color: #333;
font-size: 2.5em;
margin-bottom: 10px;
background: linear-gradient(45deg, #667eea, #764ba2);
-webkit-background-clip: text;
-webkit-text-fill-color: transparent;
background-clip: text;
}
.status-indicator {
display: inline-block;
width: 12px;
height: 12px;
border-radius: 50%;
margin-right: 8px;
animation: pulse 2s infinite;
}
.status-online {
background: #4CAF50;
}
.status-offline {
background: #f44336;
}
@keyframes pulse {
0% { opacity: 1; }
50% { opacity: 0.5; }
100% { opacity: 1; }
}
.stats-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(280px, 1fr));
gap: 20px;
margin-bottom: 25px;
}
.stat-card {
background: rgba(255, 255, 255, 0.95);
border-radius: 15px;
padding: 25px;
box-shadow: 0 10px 30px rgba(0, 0, 0, 0.1);
backdrop-filter: blur(10px);
transition: transform 0.3s ease, box-shadow 0.3s ease;
}
.stat-card:hover {
transform: translateY(-5px);
box-shadow: 0 15px 40px rgba(0, 0, 0, 0.15);
}
.stat-card h3 {
color: #666;
font-size: 0.9em;
margin-bottom: 10px;
text-transform: uppercase;
letter-spacing: 1px;
}
.stat-value {
font-size: 2.5em;
font-weight: bold;
color: #333;
margin-bottom: 5px;
}
.stat-change {
font-size: 0.9em;
color: #666;
}
.chart-container {
background: rgba(255, 255, 255, 0.95);
border-radius: 15px;
padding: 25px;
margin-bottom: 25px;
box-shadow: 0 10px 30px rgba(0, 0, 0, 0.1);
backdrop-filter: blur(10px);
}
.chart-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(500px, 1fr));
gap: 20px;
margin-bottom: 25px;
}
.chart-box {
background: rgba(255, 255, 255, 0.95);
border-radius: 15px;
padding: 25px;
box-shadow: 0 10px 30px rgba(0, 0, 0, 0.1);
backdrop-filter: blur(10px);
}
.info-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(300px, 1fr));
gap: 20px;
}
.info-card {
background: rgba(255, 255, 255, 0.95);
border-radius: 15px;
padding: 25px;
box-shadow: 0 10px 30px rgba(0, 0, 0, 0.1);
backdrop-filter: blur(10px);
}
.info-card h3 {
color: #333;
margin-bottom: 15px;
font-size: 1.2em;
}
.info-item {
display: flex;
justify-content: space-between;
padding: 8px 0;
border-bottom: 1px solid #eee;
}
.info-item:last-child {
border-bottom: none;
}
.info-label {
color: #666;
font-size: 0.9em;
}
.info-value {
color: #333;
font-weight: 500;
}
.refresh-btn {
background: linear-gradient(45deg, #667eea, #764ba2);
color: white;
border: none;
padding: 12px 24px;
border-radius: 25px;
cursor: pointer;
font-size: 1em;
transition: all 0.3s ease;
margin-top: 15px;
}
.refresh-btn:hover {
transform: translateY(-2px);
box-shadow: 0 5px 15px rgba(102, 126, 234, 0.4);
}
.loading {
display: inline-block;
width: 20px;
height: 20px;
border: 3px solid #f3f3f3;
border-top: 3px solid #667eea;
border-radius: 50%;
animation: spin 1s linear infinite;
margin-left: 10px;
}
@keyframes spin {
0% { transform: rotate(0deg); }
100% { transform: rotate(360deg); }
}
.alert {
background: #ff6b6b;
color: white;
padding: 15px;
border-radius: 10px;
margin-bottom: 20px;
display: none;
}
.alert.success {
background: #4CAF50;
}
.alert.warning {
background: #ff9800;
}
.test-btn {
background: #4CAF50;
color: white;
border: none;
padding: 10px 20px;
border-radius: 20px;
cursor: pointer;
font-size: 0.9em;
margin: 5px;
transition: all 0.3s ease;
}
.test-btn:hover {
background: #45a049;
transform: translateY(-1px);
}
.connection-status {
background: rgba(255, 255, 255, 0.95);
border-radius: 10px;
padding: 15px;
margin-bottom: 20px;
text-align: center;
}
.connection-status.connected {
border-left: 4px solid #4CAF50;
}
.connection-status.disconnected {
border-left: 4px solid #f44336;
}
.test-section {
background: rgba(255, 255, 255, 0.95);
border-radius: 15px;
padding: 20px;
margin-bottom: 20px;
box-shadow: 0 10px 30px rgba(0, 0, 0, 0.1);
}
.test-section h3 {
margin-bottom: 15px;
color: #333;
}
.test-options {
display: flex;
flex-wrap: wrap;
gap: 10px;
margin-bottom: 15px;
}
.model-option {
background: #f0f0f0;
border: none;
padding: 8px 15px;
border-radius: 20px;
cursor: pointer;
font-size: 0.9em;
transition: all 0.3s ease;
}
.model-option:hover {
background: #e0e0e0;
}
.model-option.selected {
background: #667eea;
color: white;
}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>
<span class="status-indicator status-online"></span>
Z.ai API 监控面板
</h1>
<p style="color: #666; margin-top: 10px;">
实时监控API调用状态、Token使用情况和系统性能
</p>
<div style="margin-top: 15px;">
<button class="test-btn" onclick="testConnection()">测试连接</button>
<button class="refresh-btn" onclick="refreshData()">
刷新数据
<span class="loading" id="loading" style="display: none;"></span>
</button>
</div>
</div>
<div class="connection-status" id="connection-status">
<strong>连接状态:</strong>
<span id="connection-text">检查中...</span>
</div>
<div class="alert" id="alert"></div>
<div class="test-section">
<h3>API测试</h3>
<div class="test-options">
<button class="model-option selected" data-model="{{MODEL}}" onclick="selectModel(this)">基础模型</button>
<button class="model-option" data-model="{{MODEL}}-search" onclick="selectModel(this)">搜索功能</button>
<button class="model-option" data-model="{{MODEL}}-think" onclick="selectModel(this)">思考功能</button>
<button class="model-option" data-model="{{MODEL}}-search-think" onclick="selectModel(this)">搜索+思考</button>
</div>
<button class="test-btn" onclick="testAPI()">测试选中的模型</button>
</div>
<div class="stats-grid">
<div class="stat-card">
<h3>总请求数</h3>
<div class="stat-value" id="total-requests">0</div>
<div class="stat-change">成功率: <span id="success-rate">0%</span></div>
</div>
<div class="stat-card">
<h3>Token使用量</h3>
<div class="stat-value" id="total-tokens">0</div>
<div class="stat-change">提示: <span id="prompt-tokens">0</span> | 完成: <span id="completion-tokens">0</span></div>
</div>
<div class="stat-card">
<h3>活跃连接</h3>
<div class="stat-value" id="active-connections">0</div>
<div class="stat-change">平均响应时间: <span id="avg-response-time">0ms</span></div>
</div>
<div class="stat-card">
<h3>运行时间</h3>
<div class="stat-value" id="uptime">0h</div>
<div class="stat-change">最后请求: <span id="last-request">从未</span></div>
</div>
</div>
<div class="chart-grid">
<div class="chart-box">
<h3 style="margin-bottom: 20px; color: #333;">每小时请求统计</h3>
<canvas id="hourly-chart"></canvas>
</div>
<div class="chart-box">
<h3 style="margin-bottom: 20px; color: #333;">模型使用分布</h3>
<canvas id="model-chart"></canvas>
</div>
</div>
<div class="chart-container">
<h3 style="margin-bottom: 20px; color: #333;">响应时间趋势</h3>
<canvas id="response-time-chart"></canvas>
</div>
<div class="info-grid">
<div class="info-card">
<h3>系统信息</h3>
<div class="info-item">
<span class="info-label">API版本</span>
<span class="info-value">v2.0</span>
</div>
<div class="info-item">
<span class="info-label">基础URL</span>
<span class="info-value" id="base-url">{{BASE}}</span>
</div>
<div class="info-item">
<span class="info-label">默认模型</span>
<span class="info-value" id="default-model">{{MODEL}}</span>
</div>
<div class="info-item">
<span class="info-label">工作线程</span>
<span class="info-value" id="max-workers">{{MAX_WORKERS}}</span>
</div>
<div class="info-item">
<span class="info-label">调试模式</span>
<span class="info-value" id="debug-mode">{{DEBUG_MODE}}</span>
</div>
<div class="info-item">
<span class="info-label">清理搜索引用</span>
<span class="info-value" id="clean-refs">{{CLEAN_SEARCH_REFS}}</span>
</div>
</div>
<div class="info-card">
<h3>功能使用统计</h3>
<div class="info-item">
<span class="info-label">联网搜索</span>
<span class="info-value" id="search-usage">0</span>
</div>
<div class="info-item">
<span class="info-label">深度思考</span>
<span class="info-value" id="thinking-usage">0</span>
</div>
<div class="info-item">
<span class="info-label">匿名模式</span>
<span class="info-value" id="anonymous-mode">{{ANONYMOUS_MODE}}</span>
</div>
<div class="info-item">
<span class="info-label">思考模式</span>
<span class="info-value" id="think-mode">{{THINK_TAGS_MODE}}</span>
</div>
</div>
<div class="info-card">
<h3>错误统计</h3>
<div class="info-item">
<span class="info-label">失败请求</span>
<span class="info-value" id="failed-requests">0</span>
</div>
<div class="info-item">
<span class="info-label">超时错误</span>
<span class="info-value" id="timeout-errors">0</span>
</div>
<div class="info-item">
<span class="info-label">上游错误</span>
<span class="info-value" id="upstream-errors">0</span>
</div>
<div class="info-item">
<span class="info-label">内部错误</span>
<span class="info-value" id="internal-errors">0</span>
</div>
</div>
</div>
</div>
<script>
let charts = {};
let selectedModel = '{{MODEL}}';
function initCharts() {
// 每小时统计图表
const hourlyCtx = document.getElementById('hourly-chart').getContext('2d');
charts.hourly = new Chart(hourlyCtx, {
type: 'bar',
data: {
labels: [],
datasets: [{
label: '请求数',
data: [],
backgroundColor: 'rgba(102, 126, 234, 0.8)',
borderColor: 'rgba(102, 126, 234, 1)',
borderWidth: 1
}]
},
options: {
responsive: true,
scales: {
y: {
beginAtZero: true
}
}
}
});
// 模型使用分布图表
const modelCtx = document.getElementById('model-chart').getContext('2d');
charts.model = new Chart(modelCtx, {
type: 'doughnut',
data: {
labels: [],
datasets: [{
data: [],
backgroundColor: [
'rgba(255, 99, 132, 0.8)',
'rgba(54, 162, 235, 0.8)',
'rgba(255, 206, 86, 0.8)',
'rgba(75, 192, 192, 0.8)',
'rgba(153, 102, 255, 0.8)'
]
}]
},
options: {
responsive: true
}
});
// 响应时间趋势图表
const responseTimeCtx = document.getElementById('response-time-chart').getContext('2d');
charts.responseTime = new Chart(responseTimeCtx, {
type: 'line',
data: {
labels: [],
datasets: [{
label: '响应时间 (ms)',
data: [],
borderColor: 'rgba(75, 192, 192, 1)',
backgroundColor: 'rgba(75, 192, 192, 0.2)',
tension: 0.4
}]
},
options: {
responsive: true,
scales: {
y: {
beginAtZero: true
}
}
}
});
}
function selectModel(element) {
// 移除所有选中状态
document.querySelectorAll('.model-option').forEach(el => {
el.classList.remove('selected');
});
// 添加选中状态
element.classList.add('selected');
selectedModel = element.getAttribute('data-model');
}
function updateStats(data) {
console.log('更新统计数据:', data);
// 更新基本统计
document.getElementById('total-requests').textContent = data.total_requests.toLocaleString();
document.getElementById('total-tokens').textContent = data.total_tokens.toLocaleString();
document.getElementById('prompt-tokens').textContent = data.prompt_tokens.toLocaleString();
document.getElementById('completion-tokens').textContent = data.completion_tokens.toLocaleString();
document.getElementById('active-connections').textContent = data.active_connections;
// 计算成功率
const successRate = data.total_requests > 0 ?
((data.successful_requests / data.total_requests) * 100).toFixed(1) : 0;
document.getElementById('success-rate').textContent = successRate + '%';
// 计算平均响应时间
const avgResponseTime = data.response_times.length > 0 ?
(data.response_times.reduce((a, b) => a + b, 0) / data.response_times.length).toFixed(0) : 0;
document.getElementById('avg-response-time').textContent = avgResponseTime + 'ms';
// 更新运行时间
const uptime = Math.floor((Date.now() / 1000 - data.start_time) / 3600);
document.getElementById('uptime').textContent = uptime + 'h';
// 更新最后请求时间
const lastRequest = data.last_request_time ?
new Date(data.last_request_time * 1000).toLocaleString() : '从未';
document.getElementById('last-request').textContent = lastRequest;
// 更新功能使用统计
document.getElementById('search-usage').textContent = data.search_usage;
document.getElementById('thinking-usage').textContent = data.thinking_usage;
// 更新错误统计
document.getElementById('failed-requests').textContent = data.failed_requests;
document.getElementById('timeout-errors').textContent = data.error_types['timeout'] || 0;
document.getElementById('upstream-errors').textContent = data.error_types['upstream_error'] || 0;
document.getElementById('internal-errors').textContent = data.error_types['internal_error'] || 0;
// 更新图表
updateCharts(data);
}
function updateCharts(data) {
// 更新每小时统计图表
const hourlyLabels = Object.keys(data.hourly_stats).sort().slice(-24);
const hourlyData = hourlyLabels.map(hour => data.hourly_stats[hour].requests);
charts.hourly.data.labels = hourlyLabels.map(hour => {
const date = new Date(hour);
return date.getHours() + ':00';
});
charts.hourly.data.datasets[0].data = hourlyData;
charts.hourly.update();
// 更新模型使用分布图表
const modelLabels = Object.keys(data.model_usage);
const modelData = Object.values(data.model_usage);
charts.model.data.labels = modelLabels.length > 0 ? modelLabels : ['暂无数据'];
charts.model.data.datasets[0].data = modelData.length > 0 ? modelData : [1];
charts.model.update();
// 更新响应时间趋势图表
const responseTimeLabels = data.response_times.map((_, index) => index + 1);
charts.responseTime.data.labels = responseTimeLabels;
charts.responseTime.data.datasets[0].data = data.response_times.length > 0 ? data.response_times : [0];
charts.responseTime.update();
}
function testConnection() {
const statusDiv = document.getElementById('connection-status');
const statusText = document.getElementById('connection-text');
statusText.textContent = '测试中...';
statusDiv.className = 'connection-status';
fetch('/api/test-connection')
.then(response => response.json())
.then(data => {
if (data.success) {
statusText.textContent = '连接正常';
statusDiv.className = 'connection-status connected';
showAlert('连接测试成功', 'success');
} else {
statusText.textContent = '连接失败: ' + data.error;
statusDiv.className = 'connection-status disconnected';
showAlert('连接测试失败: ' + data.error, 'error');
}
})
.catch(error => {
statusText.textContent = '连接测试失败: ' + error.message;
statusDiv.className = 'connection-status disconnected';
showAlert('连接测试失败: ' + error.message, 'error');
});
}
function refreshData() {
const loading = document.getElementById('loading');
loading.style.display = 'inline-block';
fetch('/api/stats')
.then(response => response.json())
.then(data => {
console.log('获取到的数据:', data);
updateStats(data);
showAlert('数据刷新成功', 'success');
})
.catch(error => {
console.error('Error:', error);
showAlert('数据刷新失败: ' + error.message, 'error');
})
.finally(() => {
loading.style.display = 'none';
});
}
function testAPI() {
const loading = document.getElementById('loading');
loading.style.display = 'inline-block';
fetch('/v1/chat/completions', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
model: selectedModel,
messages: [
{role: 'user', content: '你好,这是一个测试请求'}
],
stream: false
})
})
.then(response => response.json())
.then(data => {
console.log('测试API响应:', data);
if (data.error) {
showAlert('API测试失败: ' + data.error.message, 'error');
} else {
showAlert('API测试成功!', 'success');
setTimeout(refreshData, 1000); // 1秒后刷新数据
}
})
.catch(error => {
console.error('API测试失败:', error);
showAlert('API测试失败: ' + error.message, 'error');
})
.finally(() => {
loading.style.display = 'none';
});
}
function showAlert(message, type) {
const alert = document.getElementById('alert');
alert.textContent = message;
alert.className = 'alert ' + type;
alert.style.display = 'block';
setTimeout(() => {
alert.style.display = 'none';
}, 3000);
}
// 初始化
document.addEventListener('DOMContentLoaded', function() {
initCharts();
testConnection();
refreshData();
// 自动刷新
setInterval(refreshData, 10000); // 10秒刷新一次
setInterval(testConnection, 30000); // 30秒测试一次连接
});
</script>
</body>
</html>
"""
# Utility helpers
class utils:
@staticmethod
class request:
        @staticmethod
        def chat(data, chat_id):
            """POST *data* to the upstream chat-completions endpoint as a stream.

            Returns the raw streaming requests.Response. Records the response
            time and last-request timestamp into the shared stats; on failure,
            increments the matching error-type counter and re-raises.
            """
            start_time = time.time()
            try:
                debug("发送请求到: %s", f"{BASE}/api/chat/completions")
                debug("请求数据: %s", json.dumps(data))
                # NOTE(review): utils.request.token() is called here and again
                # below, so the debug line may trigger an extra token fetch.
                debug("请求头: %s", {**BROWSER_HEADERS, "Authorization": f"Bearer {utils.request.token()}", "Referer": f"{BASE}/c/{chat_id}"})
                response = session.post(
                    f"{BASE}/api/chat/completions",
                    json=data,
                    headers={**BROWSER_HEADERS, "Authorization": f"Bearer {utils.request.token()}", "Referer": f"{BASE}/c/{chat_id}"},
                    stream=True,
                    timeout=REQUEST_TIMEOUT,
                    verify=False  # TLS verification disabled to work around upstream SSL issues
                )
                debug("响应状态码: %d", response.status_code)
                debug("响应头: %s", dict(response.headers))
                # Record the response time (ms) for the dashboard.
                response_time = int((time.time() - start_time) * 1000)
                with stats_lock:
                    stats["response_times"].append(response_time)
                    stats["last_request_time"] = datetime.now().timestamp()
                return response
            except requests.exceptions.ConnectionError as e:
                debug("连接错误: %s", e)
                with stats_lock:
                    stats["error_types"]["connection_error"] += 1
                raise
            except requests.exceptions.Timeout as e:
                debug("请求超时: %s", e)
                with stats_lock:
                    stats["error_types"]["timeout"] += 1
                raise
            except requests.exceptions.RequestException as e:
                debug("请求异常: %s", e)
                with stats_lock:
                    stats["error_types"]["request_exception"] += 1
                raise
            except Exception as e:
                debug("未知错误: %s", e)
                with stats_lock:
                    stats["error_types"]["unknown_error"] += 1
                raise
        @staticmethod
        def image(data_url, chat_id):
            """Upload a base64 data-URL image to the Z.ai file API.

            Returns "<file_id>_<filename>" on success, or None when anonymous
            mode is active, *data_url* is not a data URL, or the upload fails
            (all errors are swallowed and logged via debug()).
            """
            try:
                # Uploads require a logged-in token; skip entirely in anonymous mode.
                if ANONYMOUS_MODE or not data_url.startswith("data:"):
                    return None
                header, encoded = data_url.split(",", 1)
                # e.g. "data:image/png;base64" -> "image/png"; default to JPEG.
                mime_type = header.split(";")[0].split(":")[1] if ":" in header else "image/jpeg"
                image_data = base64.b64decode(encoded)
                filename = str(uuid.uuid4())
                debug("上传文件:%s", filename)
                response = session.post(
                    f"{BASE}/api/v1/files/",
                    files={"file": (filename, image_data, mime_type)},
                    headers={**BROWSER_HEADERS, "Authorization": f"Bearer {utils.request.token()}", "Referer": f"{BASE}/c/{chat_id}"},
                    timeout=30,
                    verify=False
                )
                if response.status_code == 200:
                    result = response.json()
                    return f"{result.get('id')}_{result.get('filename')}"
                else:
                    raise Exception(response.text)
            except Exception as e:
                debug("图片上传失败: %s", e)
                return None
@staticmethod
def id(prefix = "msg") -> str:
return f"{prefix}-{int(datetime.now().timestamp()*1e9)}"
        @staticmethod
        def token() -> str:
            """Return the bearer token for upstream requests.

            Non-anonymous mode always returns the configured TOKEN. Anonymous
            mode fetches a guest token from /api/v1/auths/ and caches it for
            TOKEN_CACHE_TIMEOUT seconds, falling back to TOKEN on any failure.
            """
            if not ANONYMOUS_MODE:
                return TOKEN
            # Serve from the cache while it is still fresh.
            with token_cache["lock"]:
                if (token_cache["token"] and
                    datetime.now().timestamp() - token_cache["timestamp"] < TOKEN_CACHE_TIMEOUT):
                    return token_cache["token"]
            try:
                debug("获取匿名令牌从: %s", f"{BASE}/api/v1/auths/")
                r = session.get(f"{BASE}/api/v1/auths/", headers=BROWSER_HEADERS, timeout=8, verify=False)
                debug("令牌响应状态码: %d", r.status_code)
                if r.status_code == 200:
                    token = r.json().get("token")
                    if token:
                        # NOTE(review): the cache is written without re-acquiring
                        # the lock — concurrent refreshes may interleave; confirm
                        # this best-effort behavior is intentional.
                        token_cache["token"] = token
                        token_cache["timestamp"] = datetime.now().timestamp()
                        debug("获取匿名令牌成功: %s...", token[:15])
                        return token
                else:
                    debug("获取匿名令牌失败: %s", r.text)
            except Exception as e:
                debug("匿名令牌获取异常: %s", e)
            # Fallback: the configured token (possibly empty).
            return TOKEN
@staticmethod
def response(resp):
resp.headers.update({
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET, POST, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type, Authorization",
})
return resp
@staticmethod
class response:
@staticmethod
def clean_search_references(content):
"""
清理搜索结果中的引用标记
去除 [ref_id=...] 格式的引用,只保留实际内容
"""
if not content:
return content
# 去除所有 [ref_id=...] 格式的引用
cleaned = SEARCH_REF_RE.sub('', content)
# 清理带†符号的URL
cleaned = SEARCH_URL_RE.sub('', cleaned)
# 清理多余的空行
cleaned = re.sub(r'\n{3,}', '\n\n', cleaned)
# 去除开头和结尾的空白
cleaned = cleaned.strip()
return cleaned
@staticmethod
def parse(stream):
for line in stream.iter_lines():
if not line or not line.startswith(b"data: "):
continue
try:
data = json.loads(line[6:].decode("utf-8", "ignore"))
except:
continue
yield data
@staticmethod
def format(data):
data_obj = data.get("data", "")
if not data_obj:
return None
phase = data_obj.get("phase", "other")
content = data_obj.get("delta_content") or data_obj.get("edit_content") or ""
# 处理工具调用类型
if phase == "tool_call":
return None
if not content:
return None
# 检查是否是工具调用完成信号
if "finish_reason" in data_obj and data_obj["finish_reason"] == "tool_calls":
return None
# 清理搜索引用(如果存在且启用了清理功能)
if CLEAN_SEARCH_REFS and '[ref_id=' in content:
debug("检测到搜索引用,正在清理...")
original_length = len(content)
content = utils.response.clean_search_references(content)
debug("清理搜索引用完成,原长度: %d, 新长度: %d", original_length, len(content))
# 如果清理后内容为空,返回 None
if not content:
return None
global phaseBak
before = ""
if phase == "thinking" or (phase == "answer" and "summary>" in content):
# 使用预编译的正则表达式
content = REASONING_RE.sub("", content)
content = content.replace("</thinking>", "").replace("<Full>", "").replace("</Full>", "")
if phase == "thinking":
content = SUMMARY_RE.sub('\n\n', content)
# 以 <reasoning> 为基底
content = DETAILS_OPEN_RE.sub("<reasoning>\n\n", content)
content = DETAILS_CLOSE_RE.sub("\n\n</reasoning>", content)
if phase == "answer":
match = REASONING_CONTENT_RE.match(content)
if match:
before, after = match.groups()
if after.strip():
if phaseBak == "thinking":
stripped_after = after.lstrip('\n')
content = "\n\n</reasoning>\n\n" + stripped_after
elif phaseBak == "answer":
content = ""
else:
content = "\n\n</reasoning>"
# 优化思考标签处理
if THINK_TAGS_MODE == "reasoning":
if phase == "thinking":
content = NEWLINE_ARROW_RE.sub('\n', content)
content = SUMMARY_RE.sub('', content)
content = content.replace("<reasoning>", "").replace("</reasoning>", "")
elif THINK_TAGS_MODE == "think":
if phase == "thinking":
content = NEWLINE_ARROW_RE.sub('\n', content)
content = SUMMARY_RE.sub('', content)
content = content.replace("<reasoning>", "").replace("</reasoning>", "")
elif THINK_TAGS_MODE == "strip":
content = SUMMARY_RE.sub('', content)
content = content.replace("<reasoning>", "").replace("</reasoning>", "")
elif THINK_TAGS_MODE == "details":
if phase == "thinking":
content = NEWLINE_ARROW_RE.sub('\n', content)
content = content.replace("<reasoning>", "<details type=\"reasoning\" open><div>", content)
thoughts = ""
if phase == "answer":
summary_match = SUMMARY_MATCH_RE.search(before)
duration_match = DURATION_MATCH_RE.search(before)
if summary_match:
thoughts = "\n\n" + summary_match.group()
elif duration_match:
thoughts = '\n\n<summary>Thought for ' + duration_match.group(1) + ' seconds</summary>'
content = content.replace("</reasoning>", "</div>" + thoughts + "</details>")
else:
content = content.replace("</reasoning>", "</reasoning>\n\n")
debug("警告:THINK_TAGS_MODE 传入了未知的替换模式,将使用 <reasoning> 标签。")
phaseBak = phase
if phase == "thinking" and THINK_TAGS_MODE == "reasoning":
return {"role": "assistant", "reasoning_content": content}
elif content:
return {"role": "assistant", "content": content}
else:
return None
@staticmethod
def count(text):
if not text:
return 0
return len(enc.encode(text))
# --- Statistics recording ---
def update_stats(model, prompt_tokens, completion_tokens, enable_search, enable_thinking, success=True):
    """Record one finished request into the shared stats (thread-safe)."""
    with stats_lock:
        stats["total_requests"] += 1
        if success:
            stats["successful_requests"] += 1
        else:
            stats["failed_requests"] += 1
        stats["total_tokens"] += prompt_tokens + completion_tokens
        stats["prompt_tokens"] += prompt_tokens
        stats["completion_tokens"] += completion_tokens
        stats["model_usage"][model] += 1
        if enable_search:
            stats["search_usage"] += 1
        if enable_thinking:
            stats["thinking_usage"] += 1
        # Per-hour bucket keyed "YYYY-MM-DD HH:00".
        current_hour = datetime.now().strftime("%Y-%m-%d %H:00")
        stats["hourly_stats"][current_hour]["requests"] += 1
        stats["hourly_stats"][current_hour]["tokens"] += prompt_tokens + completion_tokens
        # Track when the most recent request happened.
        stats["last_request_time"] = datetime.now().timestamp()
    debug("统计更新: 模型=%s, 提示token=%d, 完成token=%d, 搜索=%s, 思考=%s",
          model, prompt_tokens, completion_tokens, enable_search, enable_thinking)
# --- Model-name parsing ---
def parse_model_name(model_name):
    """Parse a model id with optional feature suffixes.

    Examples:
        "GLM-4.5-search"       -> ("GLM-4.5", True,  True)
        "GLM-4.5-no-think"     -> ("GLM-4.5", False, False)
        "GLM-4.5-search-think" -> ("GLM-4.5", True,  True)

    Returns (base_model, enable_search, enable_thinking); thinking defaults
    to enabled.
    """
    base_model = model_name
    enable_search = False
    enable_thinking = True
    # BUG FIX: check "-no-think" BEFORE "-think". The original tested
    # '"-think" in model_name' first, which also matches inside "-no-think"
    # and left the base model mangled (e.g. "GLM-4.5-no-think" -> "GLM-4.5-no").
    if "-no-think" in base_model:
        enable_thinking = False
        base_model = base_model.replace("-no-think", "")
    elif "-think" in base_model:
        enable_thinking = True
        base_model = base_model.replace("-think", "")
    if "-search" in base_model:
        enable_search = True
        base_model = base_model.replace("-search", "")
    debug("解析模型名称: %s -> 基础模型=%s, 搜索=%s, 思考=%s",
          model_name, base_model, enable_search, enable_thinking)
    return base_model, enable_search, enable_thinking
# Routes
@app.route("/")
def index():
    """Serve the monitoring dashboard, interpolating the current config values."""
    return render_template_string(HTML_TEMPLATE,
        BASE=BASE,
        MODEL=MODEL,
        MAX_WORKERS=MAX_WORKERS,
        DEBUG_MODE=DEBUG_MODE,
        ANONYMOUS_MODE=ANONYMOUS_MODE,
        THINK_TAGS_MODE=THINK_TAGS_MODE,
        CLEAN_SEARCH_REFS=CLEAN_SEARCH_REFS
    )
@app.route("/api/test-connection")
def test_connection():
    """Probe the upstream origin and its auth endpoint; report the result as JSON."""
    try:
        # Basic reachability of the upstream origin.
        debug("测试基础连接到: %s", BASE)
        r = session.get(BASE, timeout=5, verify=False)
        debug("基础连接响应状态码: %d", r.status_code)
        # API-level check against the auth endpoint (its status decides success).
        debug("测试API连接到: %s", f"{BASE}/api/v1/auths/")
        r = session.get(f"{BASE}/api/v1/auths/", headers=BROWSER_HEADERS, timeout=5, verify=False)
        debug("API连接响应状态码: %d", r.status_code)
        if r.status_code == 200:
            return jsonify({"success": True, "message": "连接正常"})
        else:
            return jsonify({"success": False, "error": f"HTTP {r.status_code}: {r.text}"})
    except requests.exceptions.ConnectionError as e:
        debug("连接测试失败: %s", e)
        return jsonify({"success": False, "error": f"连接失败: {str(e)}"})
    except requests.exceptions.Timeout as e:
        debug("连接测试超时: %s", e)
        return jsonify({"success": False, "error": f"连接超时: {str(e)}"})
    except Exception as e:
        debug("连接测试异常: %s", e)
        return jsonify({"success": False, "error": f"未知错误: {str(e)}"})
@app.route("/api/stats")
def get_stats():
    """Return a JSON snapshot of the in-memory statistics for the dashboard."""
    with stats_lock:
        # Snapshot into plain types so jsonify can serialize them and the
        # caller cannot mutate the live stats structures.
        stats_copy = {
            "total_requests": stats["total_requests"],
            "successful_requests": stats["successful_requests"],
            "failed_requests": stats["failed_requests"],
            "total_tokens": stats["total_tokens"],
            "prompt_tokens": stats["prompt_tokens"],
            "completion_tokens": stats["completion_tokens"],
            "model_usage": dict(stats["model_usage"]),
            "hourly_stats": dict(stats["hourly_stats"]),
            "response_times": list(stats["response_times"]),
            "active_connections": stats["active_connections"],
            "start_time": stats["start_time"].timestamp(),
            "last_request_time": stats["last_request_time"],
            "error_types": dict(stats["error_types"]),
            "search_usage": stats["search_usage"],
            "thinking_usage": stats["thinking_usage"]
        }
    debug("返回统计数据: %s", stats_copy)
    return jsonify(stats_copy)
@app.route("/v1/models", methods=["GET", "POST", "OPTIONS"])
def models():
    """List upstream models plus -search/-think/-no-think/-search-think variants.

    Fetches the model list from the upstream /api/models endpoint, filters
    inactive models, prettifies display names, appends the four feature
    variants per base model, and caches the result for five minutes.
    All responses carry permissive CORS headers.
    """
    if request.method == "OPTIONS":
        return utils.request.response(make_response())
    try:
        # Serve from the five-minute cache while fresh.
        with model_cache["lock"]:
            if (model_cache["models"] and
                datetime.now().timestamp() - model_cache["timestamp"] < 300):  # 5-minute cache
                return utils.request.response(jsonify(model_cache["models"]))
        def format_model_name(name: str) -> str:
            # Pretty-print a model id: first dash-part upper-cased, later
            # alphabetic parts capitalised, numeric parts kept as-is.
            if not name:
                return ""
            parts = name.split('-')
            if len(parts) == 1:
                return parts[0].upper()
            formatted = [parts[0].upper()]
            for p in parts[1:]:
                if not p:
                    formatted.append("")
                elif p.isdigit():
                    formatted.append(p)
                elif any(c.isalpha() for c in p):
                    formatted.append(p.capitalize())
                else:
                    formatted.append(p)
            return "-".join(formatted)
        def is_english_letter(ch: str) -> bool:
            # True for ASCII A-Z / a-z only.
            return 'A' <= ch <= 'Z' or 'a' <= ch <= 'z'
        headers = {**BROWSER_HEADERS, "Authorization": f"Bearer {utils.request.token()}"}
        debug("获取模型列表从: %s", f"{BASE}/api/models")
        r = session.get(f"{BASE}/api/models", headers=headers, timeout=8, verify=False)
        debug("模型列表响应状态码: %d", r.status_code)
        if r.status_code != 200:
            debug("获取模型列表失败: %s", r.text)
            return utils.request.response(jsonify({"error":"fetch models failed"})), 500
        r = r.json()
        models = []
        base_models = []
        for m in r.get("data", []):
            # Skip models the upstream marks as inactive.
            if not m.get("info", {}).get("is_active", True):
                continue
            model_id, model_name = m.get("id"), m.get("name")
            # GLM/Z ids are already presentable; otherwise fall back to a
            # prettified id when the name is missing or non-ASCII-leading.
            if model_id.startswith(("GLM", "Z")):
                model_name = model_id
            if not model_name or not is_english_letter(model_name[0]):
                model_name = format_model_name(model_id)
            base_models.append({
                "id": model_id,
                "object": "model",
                "name": model_name,
                "created": m.get("info", {}).get("created_at", int(datetime.now().timestamp())),
                "owned_by": "z.ai"
            })
        # Expose each base model followed by its feature variants.
        for base_model in base_models:
            models.append(base_model)
            base_id = base_model["id"]
            base_name = base_model["name"]
            created = base_model["created"]
            # The four feature-suffix variants appended per base model.
            variants = [
                {"id": f"{base_id}-search", "name": f"{base_name} (Search)", "description": "启用联网搜索功能"},
                {"id": f"{base_id}-think", "name": f"{base_name} (Deep Thinking)", "description": "启用深度思考功能"},
                {"id": f"{base_id}-no-think", "name": f"{base_name} (No Thinking)", "description": "禁用深度思考功能"},
                {"id": f"{base_id}-search-think", "name": f"{base_name} (Search + Deep Thinking)", "description": "启用联网搜索和深度思考功能"}
            ]
            for variant in variants:
                models.append({
                    "id": variant["id"],
                    "object": "model",
                    "name": variant["name"],
                    "created": created,
                    "owned_by": "z.ai",
                    "description": variant["description"]
                })
        result = {"object":"list","data":models}
        # Refresh the cache.
        with model_cache["lock"]:
            model_cache["models"] = result
            model_cache["timestamp"] = datetime.now().timestamp()
        return utils.request.response(jsonify(result))
    except Exception as e:
        debug("模型列表失败: %s", e)
        return utils.request.response(jsonify({"error":"fetch models failed"})), 500
@app.route("/v1/chat/completions", methods=["GET", "POST", "OPTIONS"])
def OpenAI_Compatible():
if request.method == "OPTIONS":
return utils.request.response(make_response())
start_time = time.time()
success = False
prompt_tokens = 0
completion_tokens = 0
enable_search = False
enable_thinking = True
try:
with stats_lock:
stats["active_connections"] += 1
odata = request.get_json(force=True, silent=True) or {}
debug("收到请求: %s", json.dumps(odata))
id = utils.request.id("chat")
model = odata.get("model", MODEL)
messages = odata.get("messages", [])
features = odata.get("features", { "enable_thinking": True })
stream = odata.get("stream", False)
include_usage = stream and odata.get("stream_options", {}).get("include_usage", False)
# 解析模型名称,提取基础模型和功能标志
base_model, enable_search, enable_thinking = parse_model_name(model)
# 更新features参数
features = {
"enable_search": enable_search,
"enable_thinking": enable_thinking,
"web_search": enable_search,
"auto_web_search": enable_search,
"preview_mode": True,
"flags": []
}
# 处理图片上传(异步)
def process_images():
for message in messages:
if isinstance(message.get("content"), list):
for content_item in message["content"]:
if content_item.get("type") == "image_url":
url = content_item.get("image_url", {}).get("url", "")
if url.startswith("data:"):
file_url = utils.request.image(url, id)
if file_url:
content_item["image_url"]["url"] = file_url
# 如果有图片,异步处理
if any(isinstance(msg.get("content"), list) and
any(item.get("type") == "image_url" for item in msg.get("content", []))
for msg in messages):
executor.submit(process_images)
else:
process_images()
# 构建请求数据,使用基础模型名称
data = {
**odata,
"stream": True,
"chat_id": id,
"id": utils.request.id(),
"model": base_model, # 使用基础模型名称
"messages": messages,
"features": features
}
debug("准备发送请求到上游,基础模型: %s, 功能: 搜索=%s, 思考=%s",
base_model, enable_search, enable_thinking)
try:
response = utils.request.chat(data, id)
if response.status_code != 200:
debug("上游响应错误: %s", response.text)
with stats_lock:
stats["error_types"]["upstream_error"] += 1
return utils.request.response(jsonify({
"error": {
"message": f"上游服务器错误: {response.status_code}",
"type": "upstream_error",
"code": "upstream_error"
}
})), response.status_code
if not response.content:
debug("上游响应为空")
return utils.request.response(jsonify({
"error": {
"message": "上游服务器返回空响应",
"type": "empty_response",
"code": "empty_response"
}
})), 500
except requests.exceptions.ConnectionError as e:
debug("连接错误: %s", e)
with stats_lock:
stats["error_types"]["connection_error"] += 1
return utils.request.response(jsonify({
"error": {
"message": f"连接错误: {str(e)}",
"type": "connection_error",
"code": "connection_error"
}
})), 500
except requests.exceptions.Timeout:
debug("请求超时")
return utils.request.response(jsonify({
"error": {
"message": "请求超时",
"type": "timeout",
"code": "timeout"
}
})), 500
except requests.exceptions.RequestException as e:
debug("请求异常: %s", e)
return utils.request.response(jsonify({
"error": {
"message": f"请求异常: {str(e)}",
"type": "request_exception",
"code": "request_exception"
}
})), 500
# 优化token计算
prompt_text = "".join(
c if isinstance(c, str) else (c.get("text", "") if isinstance(c, dict) and c.get("type") == "text" else "")
for m in messages
for c in ([m["content"]] if isinstance(m.get("content"), str) else (m.get("content") or []))
)
prompt_tokens = utils.response.count(prompt_text)
if stream:
def stream():
nonlocal completion_tokens, success
completion_str = ""
tool_call_detected = False
try:
for data in utils.response.parse(response):
raw_data = data.get("data", {})
is_done = raw_data.get("done", False)
phase = raw_data.get("phase", "other")
# 快速跳过工具调用
if phase == "tool_call":
tool_call_detected = True
continue
if "finish_reason" in raw_data and raw_data["finish_reason"] == "tool_calls":
continue
delta = utils.response.format(data)
if delta:
yield "data: " + json.dumps({
"id": utils.request.id('chatcmpl'),
"object": "chat.completion.chunk",
"created": int(datetime.now().timestamp()),
"model": model, # 返回原始请求的模型名称
"choices": [{"index": 0, "delta": delta, "message": delta, "finish_reason": "stop" if is_done else None}]
}) + "\n\n"
# 累积内容
if "content" in delta:
completion_str += delta["content"]
if "reasoning_content" in delta:
completion_str += delta["reasoning_content"]
if is_done:
yield "data: " + json.dumps({
'id': utils.request.id('chatcmpl'),
'object': 'chat.completion.chunk',
'created': int(datetime.now().timestamp()),
'model': model, # 返回原始请求的模型名称
'choices': [{'index': 0, 'delta': {"role": "assistant"}, 'message': {"role": "assistant"}, 'finish_reason': "stop"}]
}) + "\n\n"
break
# 发送usage
if include_usage and completion_str:
completion_tokens = utils.response.count(completion_str)
update_stats(model, prompt_tokens, completion_tokens, enable_search, enable_thinking, True)
success = True
yield "data: " + json.dumps({
"id": utils.request.id('chatcmpl'),
"object": "chat.completion.chunk",
"created": int(datetime.now().timestamp()),
"model": model, # 返回原始请求的模型名称
"choices": [],
"usage": {
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": prompt_tokens + completion_tokens
}
}) + "\n\n"
yield "data: [DONE]\n\n"
except Exception as e:
debug("流式响应处理异常: %s", e)
yield "data: " + json.dumps({
"error": {
"message": f"流式响应处理异常: {str(e)}",
"type": "stream_error",
"code": "stream_error"
}
}) + "\n\n"
return Response(stream(), mimetype="text/event-stream")
else:
# 非流式处理
contents = {"content": [], "reasoning_content": []}
for odata in utils.response.parse(response):
if odata.get("data", {}).get("done"):
break
delta = utils.response.format(odata)
if delta:
if "content" in delta:
contents["content"].append(delta["content"])
if "reasoning_content" in delta:
contents["reasoning_content"].append(delta["reasoning_content"])
final_message = {"role": "assistant"}
completion_str = ""
if contents["reasoning_content"]:
final_message["reasoning_content"] = "".join(contents["reasoning_content"])
completion_str += "".join(contents["reasoning_content"])
if contents["content"]:
final_message["content"] = "".join(contents["content"])
completion_str += "".join(contents["content"])
completion_tokens = utils.response.count(completion_str)
update_stats(model, prompt_tokens, completion_tokens, enable_search, enable_thinking, True)
success = True
return utils.request.response(jsonify({
"id": utils.request.id("chatcmpl"),
"object": "chat.completion",
"created": int(datetime.now().timestamp()),
"model": model, # 返回原始请求的模型名称
"choices": [{
"index": 0,
"delta": final_message,
"message": final_message,
"finish_reason": "stop"
}],
"usage": {
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": prompt_tokens + completion_tokens
}
}))
except Exception as e:
debug("处理请求时出错: %s", e)
with stats_lock:
stats["error_types"]["internal_error"] += 1
return utils.request.response(jsonify({
"error": {
"message": str(e),
"type": "internal_error",
"code": "internal_error"
}
})), 500
finally:
with stats_lock:
stats["active_connections"] -= 1
# 更新统计(即使失败也要记录)
if not success:
update_stats(model, prompt_tokens, completion_tokens, enable_search, enable_thinking, False)
# Health check endpoint.
@app.route("/health", methods=["GET"])
def health():
    """Liveness probe: report service status, current time, and uptime."""
    body = {
        "status": "ok",
        "timestamp": datetime.now().timestamp(),
        # Uptime in seconds since the stats counter was initialized.
        "uptime": datetime.now().timestamp() - stats["start_time"].timestamp(),
    }
    return jsonify(body)
# Entry point: log the effective configuration, then start the server.
if __name__ == "__main__":
    log.info("---------------------------------------------------------------------")
    log.info("Z.ai 2 API - 带可视化监控页面的优化版本")
    log.info("将 Z.ai 代理为 OpenAI Compatible 格式")
    log.info("基于 https://github.com/kbykb/OpenAI-Compatible-API-Proxy-for-Z 重构")
    log.info("---------------------------------------------------------------------")
    log.info("服务端口:%s", PORT)
    log.info("上游地址:%s", BASE)
    log.info("备选模型:%s", MODEL)
    log.info("思考处理:%s", THINK_TAGS_MODE)
    log.info("访客模式:%s", ANONYMOUS_MODE)
    log.info("清理搜索引用:%s", CLEAN_SEARCH_REFS)
    log.info("显示调试:%s", DEBUG_MODE)
    log.info("最大工作线程:%s", MAX_WORKERS)
    log.info("请求超时:%s秒", REQUEST_TIMEOUT)
    log.info("监控面板:http://localhost:%s", PORT)
    # Force HTTP/1.1 so chunked/streaming (SSE) responses work through werkzeug.
    # NOTE(review): app.run is still the Werkzeug development server, not a
    # production WSGI server — consider gunicorn/uwsgi for production use.
    from werkzeug.serving import WSGIRequestHandler
    WSGIRequestHandler.protocol_version = "HTTP/1.1"
    app.run(host="0.0.0.0", port=PORT, threaded=True, debug=DEBUG_MODE)