bluewinliang committed on
Commit
40459ae
·
verified ·
1 Parent(s): d47fbbf

Upload proxy_handler.py

Browse files
Files changed (1) hide show
  1. proxy_handler.py +321 -103
proxy_handler.py CHANGED
@@ -1,12 +1,13 @@
1
  """
2
- Proxy handler for Z.AI API requests (OpenAI-compatible)
3
  """
 
4
  import json
5
  import logging
 
6
  import time
7
  import uuid
8
  from typing import AsyncGenerator, Dict, Any, Optional
9
-
10
  import httpx
11
  from fastapi import HTTPException
12
  from fastapi.responses import StreamingResponse
@@ -24,7 +25,6 @@ logger = logging.getLogger(__name__)
24
 
25
  class ProxyHandler:
26
  def __init__(self):
27
- # Z.AI 端連線逾時 60 秒
28
  self.client = httpx.AsyncClient(timeout=60.0)
29
 
30
  async def __aenter__(self):
@@ -33,137 +33,355 @@ class ProxyHandler:
33
  async def __aexit__(self, exc_type, exc_val, exc_tb):
34
  await self.client.aclose()
35
 
36
- # --------- 文字前處理 ---------
37
  def transform_content(self, content: str) -> str:
38
- """
39
- 依照專案設定將 Z.AI 傳回的 HTML / THINK TAG 等轉成純文字
40
- """
41
  if not content:
42
  return content
43
- # 例:過濾 <br/> 與 <think>
44
- content = content.replace("<br/>", "\n")
 
 
45
  if not settings.SHOW_THINK_TAGS:
46
- content = content.replace("<think>", "").replace("</think>", "")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
47
  return content.strip()
48
 
49
- # --------- 主要進入點 ---------
50
- async def proxy_request(self, request: ChatCompletionRequest):
51
- """
52
- OpenAI API 相容的 proxy 入口
53
- """
54
  cookie = await cookie_manager.get_next_cookie()
55
  if not cookie:
56
  raise HTTPException(status_code=503, detail="No available cookies")
57
 
58
- # 若對外聲稱的 model 與內部實際 model 不同,在此轉換
59
  target_model = (
60
  settings.UPSTREAM_MODEL
61
  if request.model == settings.MODEL_NAME
62
  else request.model
63
  )
64
 
65
- # 決定是否串流
66
- is_streaming: bool = (
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
67
  request.stream if request.stream is not None else settings.DEFAULT_STREAM
68
  )
69
 
70
- # 向 Z.AI 串流或一次性取資料
71
  if is_streaming:
72
- # 建立 SSE StreamingResponse
73
  return StreamingResponse(
74
- self.stream_response(request, target_model, cookie),
75
  media_type="text/event-stream",
76
  headers={
77
  "Cache-Control": "no-cache",
78
  "Connection": "keep-alive",
79
- # 若有 nginx 建議加上 X-Accel-Buffering: no
80
  },
81
  )
82
  else:
83
- # 非串流:拿到完整內容後包成 ChatCompletionResponse
84
- content = await self.get_full_response(request, target_model, cookie)
85
- return ChatCompletionResponse(
86
- id=f"chatcmpl-{uuid.uuid4()}",
87
- created=int(time.time()),
88
- model=target_model,
89
- choices=[
90
- {
91
- "index": 0,
92
- "message": {"role": "assistant", "content": content},
93
- "finish_reason": "stop",
94
- }
95
- ],
96
- )
97
 
98
- # --------- 非串流邏輯 ---------
99
- async def get_full_response(
100
- self,
101
- request: ChatCompletionRequest,
102
- target_model: str,
103
- cookie: str,
104
- ) -> str:
105
- """
106
- 向 Z.AI 取完整回覆並回傳轉換後文字
107
- """
108
- resp = await self.client.post(
109
- settings.ZAI_ENDPOINT,
110
- headers={"Cookie": cookie},
111
- json=request.model_dump(exclude_none=True),
112
- )
113
- resp.raise_for_status()
114
- data = resp.json()
115
- return self.transform_content(data["choices"][0]["message"]["content"])
116
 
117
- # --------- 串流邏輯 ---------
118
- async def stream_response(
119
- self,
120
- request: ChatCompletionRequest,
121
- target_model: str,
122
- cookie: str,
123
- ) -> AsyncGenerator[str, None]:
124
- """
125
- 將 Z.AI 串流資料即時轉成 OpenAI SSE 片段
126
- """
127
- # 呼叫 Z.AI 串流端點(假設支援 HTTP chunk)
128
- async with self.client.stream(
129
- "POST",
130
- settings.ZAI_STREAM_ENDPOINT,
131
- headers={"Cookie": cookie},
132
- json=request.model_dump(exclude_none=True),
133
- ) as resp:
134
- resp.raise_for_status()
135
- async for line in resp.aiter_lines():
136
- if not line:
137
- continue
138
- # Z.AI 每行可能已是 json;自行視格式解析
139
- try:
140
- raw = json.loads(line)
141
- except json.JSONDecodeError:
142
- logger.debug("skip non-json line from Z.AI: %s", line)
143
- continue
144
 
145
- # 取得文字增量
146
- delta_text = self.transform_content(raw.get("delta", ""))
147
- if delta_text == "":
148
- continue
149
 
150
- # 組成 OpenAI stream chunk
151
- chunk: Dict[str, Any] = {
152
- "id": raw.get("id", f"chatcmpl-{uuid.uuid4()}"),
153
- "object": "chat.completion.chunk",
154
- "created": int(time.time()),
155
- "model": target_model,
156
- "choices": [
157
- {
158
- "index": 0,
159
- "delta": {"content": delta_text},
160
- "finish_reason": None,
161
- }
162
- ],
163
- }
 
 
 
 
164
 
165
- # 送出 SSE formatted line
166
- yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"
167
 
168
- # Z.AI 結束後送出 [DONE]
169
- yield "data: [DONE]\n\n"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  """
2
+ Proxy handler for Z.AI API requests
3
  """
4
+
5
  import json
6
  import logging
7
+ import re
8
  import time
9
  import uuid
10
  from typing import AsyncGenerator, Dict, Any, Optional
 
11
  import httpx
12
  from fastapi import HTTPException
13
  from fastapi.responses import StreamingResponse
 
25
 
26
  class ProxyHandler:
27
  def __init__(self):
 
28
  self.client = httpx.AsyncClient(timeout=60.0)
29
 
30
  async def __aenter__(self):
 
33
  async def __aexit__(self, exc_type, exc_val, exc_tb):
34
  await self.client.aclose()
35
 
 
36
  def transform_content(self, content: str) -> str:
37
+ """Transform content by replacing HTML tags and optionally removing think tags"""
 
 
38
  if not content:
39
  return content
40
+
41
+ logger.debug(f"SHOW_THINK_TAGS setting: {settings.SHOW_THINK_TAGS}")
42
+
43
+ # Optionally remove thinking content based on configuration
44
  if not settings.SHOW_THINK_TAGS:
45
+ logger.debug("Removing thinking content from response")
46
+ original_length = len(content)
47
+
48
+ # Remove <details> blocks (thinking content) - handle both closed and unclosed tags
49
+ # First try to remove complete <details>...</details> blocks
50
+ content = re.sub(
51
+ r"<details[^>]*>.*?</details>", "", content, flags=re.DOTALL
52
+ )
53
+
54
+ # Then remove any remaining <details> opening tags and everything after them until we hit answer content
55
+ # Look for pattern: <details...><summary>...</summary>...content... and remove the thinking part
56
+ content = re.sub(
57
+ r"<details[^>]*>.*?(?=\s*[A-Z]|\s*\d|\s*$)",
58
+ "",
59
+ content,
60
+ flags=re.DOTALL,
61
+ )
62
+
63
+ content = content.strip()
64
+
65
+ logger.debug(
66
+ f"Content length after removing thinking content: {original_length} -> {len(content)}"
67
+ )
68
+ else:
69
+ logger.debug("Keeping thinking content, converting to <think> tags")
70
+
71
+ # Replace <details> with <think>
72
+ content = re.sub(r"<details[^>]*>", "<think>", content)
73
+ content = content.replace("</details>", "</think>")
74
+
75
+ # Remove <summary> tags and their content
76
+ content = re.sub(r"<summary>.*?</summary>", "", content, flags=re.DOTALL)
77
+
78
+ # If there's no closing </think>, add it at the end of thinking content
79
+ if "<think>" in content and "</think>" not in content:
80
+ # Find where thinking ends and answer begins
81
+ think_start = content.find("<think>")
82
+ if think_start != -1:
83
+ # Look for the start of the actual answer (usually starts with a capital letter or number)
84
+ answer_match = re.search(r"\n\s*[A-Z0-9]", content[think_start:])
85
+ if answer_match:
86
+ insert_pos = think_start + answer_match.start()
87
+ content = (
88
+ content[:insert_pos] + "</think>\n" + content[insert_pos:]
89
+ )
90
+ else:
91
+ content += "</think>"
92
+
93
  return content.strip()
94
 
95
+ async def proxy_request(self, request: ChatCompletionRequest) -> Dict[str, Any]:
96
+ """Proxy request to Z.AI API"""
 
 
 
97
  cookie = await cookie_manager.get_next_cookie()
98
  if not cookie:
99
  raise HTTPException(status_code=503, detail="No available cookies")
100
 
101
+ # Transform model name
102
  target_model = (
103
  settings.UPSTREAM_MODEL
104
  if request.model == settings.MODEL_NAME
105
  else request.model
106
  )
107
 
108
+ # Determine if this should be a streaming response
109
+ is_streaming = (
110
+ request.stream if request.stream is not None else settings.DEFAULT_STREAM
111
+ )
112
+
113
+ # Validate parameter compatibility
114
+ if is_streaming and not settings.SHOW_THINK_TAGS:
115
+ logger.warning("SHOW_THINK_TAGS=false is ignored for streaming responses")
116
+
117
+ # Prepare request data
118
+ request_data = request.model_dump(exclude_none=True)
119
+ request_data["model"] = target_model
120
+
121
+ # Build request data based on actual Z.AI format from zai-messages.md
122
+ request_data = {
123
+ "stream": True, # Always request streaming from Z.AI for processing
124
+ "model": target_model,
125
+ "messages": request_data["messages"],
126
+ "background_tasks": {"title_generation": True, "tags_generation": True},
127
+ "chat_id": str(uuid.uuid4()),
128
+ "features": {
129
+ "image_generation": False,
130
+ "code_interpreter": False,
131
+ "web_search": False,
132
+ "auto_web_search": False,
133
+ },
134
+ "id": str(uuid.uuid4()),
135
+ "mcp_servers": ["deep-web-search"],
136
+ "model_item": {"id": target_model, "name": "GLM-4.5", "owned_by": "openai"},
137
+ "params": {},
138
+ "tool_servers": [],
139
+ "variables": {
140
+ "{{USER_NAME}}": "User",
141
+ "{{USER_LOCATION}}": "Unknown",
142
+ "{{CURRENT_DATETIME}}": "2025-08-04 16:46:56",
143
+ },
144
+ }
145
+
146
+ logger.debug(f"Sending request data: {request_data}")
147
+
148
+ headers = {
149
+ "Content-Type": "application/json",
150
+ "Authorization": f"Bearer {cookie}",
151
+ "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36",
152
+ "Accept": "application/json, text/event-stream",
153
+ "Accept-Language": "zh-CN",
154
+ "sec-ch-ua": '"Not)A;Brand";v="8", "Chromium";v="138", "Google Chrome";v="138"',
155
+ "sec-ch-ua-mobile": "?0",
156
+ "sec-ch-ua-platform": '"macOS"',
157
+ "x-fe-version": "prod-fe-1.0.53",
158
+ "Origin": "https://chat.z.ai",
159
+ "Referer": "https://chat.z.ai/c/069723d5-060b-404f-992c-4705f1554c4c",
160
+ }
161
+
162
+ try:
163
+ response = await self.client.post(
164
+ settings.UPSTREAM_URL, json=request_data, headers=headers
165
+ )
166
+
167
+ if response.status_code == 401:
168
+ await cookie_manager.mark_cookie_failed(cookie)
169
+ raise HTTPException(status_code=401, detail="Invalid authentication")
170
+
171
+ if response.status_code != 200:
172
+ raise HTTPException(
173
+ status_code=response.status_code,
174
+ detail=f"Upstream error: {response.text}",
175
+ )
176
+
177
+ await cookie_manager.mark_cookie_success(cookie)
178
+ return {"response": response, "cookie": cookie}
179
+
180
+ except httpx.RequestError as e:
181
+ logger.error(f"Request error: {e}")
182
+ logger.error(f"Request error type: {type(e).__name__}")
183
+ logger.error(f"Request URL: {settings.UPSTREAM_URL}")
184
+ logger.error(f"Request timeout: {self.client.timeout}")
185
+ await cookie_manager.mark_cookie_failed(cookie)
186
+ raise HTTPException(
187
+ status_code=503, detail=f"Upstream service unavailable: {str(e)}"
188
+ )
189
+
190
+ async def process_streaming_response(
191
+ self, response: httpx.Response
192
+ ) -> AsyncGenerator[Dict[str, Any], None]:
193
+ """Process streaming response from Z.AI"""
194
+ buffer = ""
195
+
196
+ async for chunk in response.aiter_text():
197
+ buffer += chunk
198
+ lines = buffer.split("\n")
199
+ buffer = lines[-1] # Keep incomplete line in buffer
200
+
201
+ for line in lines[:-1]:
202
+ line = line.strip()
203
+ if not line.startswith("data: "):
204
+ continue
205
+
206
+ payload = line[6:].strip()
207
+ if payload == "[DONE]":
208
+ return
209
+
210
+ try:
211
+ parsed = json.loads(payload)
212
+
213
+ # Check for errors first
214
+ if parsed.get("error") or (parsed.get("data", {}).get("error")):
215
+ error_detail = (
216
+ parsed.get("error", {}).get("detail")
217
+ or parsed.get("data", {}).get("error", {}).get("detail")
218
+ or "Unknown error from upstream"
219
+ )
220
+ logger.error(f"Upstream error: {error_detail}")
221
+ raise HTTPException(
222
+ status_code=400, detail=f"Upstream error: {error_detail}"
223
+ )
224
+
225
+ # Transform the response
226
+ if parsed.get("data"):
227
+ # Remove unwanted fields
228
+ parsed["data"].pop("edit_index", None)
229
+ parsed["data"].pop("edit_content", None)
230
+
231
+ # Note: We don't transform delta_content here because <think> tags
232
+ # might span multiple chunks. We'll transform the final aggregated content.
233
+
234
+ yield parsed
235
+
236
+ except json.JSONDecodeError:
237
+ continue # Skip non-JSON lines
238
+
239
+ async def stream_response(
240
+ self, response: httpx.Response, model: str
241
+ ) -> AsyncGenerator[str, None]:
242
+ """Generate OpenAI-compatible streaming response"""
243
+ try:
244
+ async for parsed in self.process_streaming_response(response):
245
+ # 取得增量內容
246
+ delta_content = parsed.get("data", {}).get("delta_content", "")
247
+
248
+ # 根據設定決定是否過濾思考內容
249
+ if not settings.SHOW_THINK_TAGS:
250
+ # 只在回答階段輸出內容
251
+ phase = parsed.get("data", {}).get("phase", "")
252
+ if phase != "answer":
253
+ continue
254
+
255
+ # 如果有內容才輸出
256
+ if delta_content:
257
+ # 建立 OpenAI 格式的 chunk
258
+ chunk = {
259
+ "id": parsed.get("data", {}).get("id", f"chatcmpl-{uuid.uuid4()}"),
260
+ "object": "chat.completion.chunk",
261
+ "created": int(time.time()),
262
+ "model": model,
263
+ "choices": [
264
+ {
265
+ "index": 0,
266
+ "delta": {"content": delta_content},
267
+ "finish_reason": None,
268
+ }
269
+ ],
270
+ }
271
+
272
+ yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"
273
+
274
+ # 發送完成標記
275
+ final_chunk = {
276
+ "id": f"chatcmpl-{uuid.uuid4()}",
277
+ "object": "chat.completion.chunk",
278
+ "created": int(time.time()),
279
+ "model": model,
280
+ "choices": [
281
+ {
282
+ "index": 0,
283
+ "delta": {},
284
+ "finish_reason": "stop",
285
+ }
286
+ ],
287
+ }
288
+ yield f"data: {json.dumps(final_chunk, ensure_ascii=False)}\n\n"
289
+ yield "data: [DONE]\n\n"
290
+
291
+ except Exception as e:
292
+ logger.error(f"Error in stream_response: {e}")
293
+ # 發送錯誤訊息
294
+ error_chunk = {
295
+ "id": f"chatcmpl-{uuid.uuid4()}",
296
+ "object": "chat.completion.chunk",
297
+ "created": int(time.time()),
298
+ "model": model,
299
+ "choices": [
300
+ {
301
+ "index": 0,
302
+ "delta": {"content": f"Error: {str(e)}"},
303
+ "finish_reason": "stop",
304
+ }
305
+ ],
306
+ }
307
+ yield f"data: {json.dumps(error_chunk, ensure_ascii=False)}\n\n"
308
+ yield "data: [DONE]\n\n"
309
+
310
+ async def handle_chat_completion(self, request: ChatCompletionRequest):
311
+ """Handle chat completion request"""
312
+ proxy_result = await self.proxy_request(request)
313
+ response = proxy_result["response"]
314
+
315
+ # Determine final streaming mode
316
+ is_streaming = (
317
  request.stream if request.stream is not None else settings.DEFAULT_STREAM
318
  )
319
 
 
320
  if is_streaming:
321
+ # For streaming responses, SHOW_THINK_TAGS setting is ignored
322
  return StreamingResponse(
323
+ self.stream_response(response, request.model),
324
  media_type="text/event-stream",
325
  headers={
326
  "Cache-Control": "no-cache",
327
  "Connection": "keep-alive",
328
+ "X-Accel-Buffering": "no", # 對 nginx 有用
329
  },
330
  )
331
  else:
332
+ # For non-streaming responses, SHOW_THINK_TAGS setting applies
333
+ return await self.non_stream_response(response, request.model)
 
 
 
 
 
 
 
 
 
 
 
 
334
 
335
+ async def non_stream_response(
336
+ self, response: httpx.Response, model: str
337
+ ) -> ChatCompletionResponse:
338
+ """Generate non-streaming response"""
339
+ chunks = []
340
+ async for parsed in self.process_streaming_response(response):
341
+ chunks.append(parsed)
342
+ logger.debug(f"Received chunk: {parsed}") # Debug log
 
 
 
 
 
 
 
 
 
 
343
 
344
+ if not chunks:
345
+ raise HTTPException(status_code=500, detail="No response from upstream")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
346
 
347
+ logger.info(f"Total chunks received: {len(chunks)}")
348
+ logger.debug(f"First chunk structure: {chunks[0] if chunks else 'None'}")
 
 
349
 
350
+ # Aggregate content based on SHOW_THINK_TAGS setting
351
+ if settings.SHOW_THINK_TAGS:
352
+ # Include all content
353
+ full_content = "".join(
354
+ chunk.get("data", {}).get("delta_content", "") for chunk in chunks
355
+ )
356
+ else:
357
+ # Only include answer phase content
358
+ full_content = "".join(
359
+ chunk.get("data", {}).get("delta_content", "")
360
+ for chunk in chunks
361
+ if chunk.get("data", {}).get("phase") == "answer"
362
+ )
363
+
364
+ logger.info(f"Aggregated content length: {len(full_content)}")
365
+ logger.debug(
366
+ f"Full aggregated content: {full_content}"
367
+ ) # Show full content for debugging
368
 
369
+ # Apply content transformation (including think tag filtering)
370
+ transformed_content = self.transform_content(full_content)
371
 
372
+ logger.info(f"Transformed content length: {len(transformed_content)}")
373
+ logger.debug(f"Transformed content: {transformed_content[:200]}...")
374
+
375
+ # Create OpenAI-compatible response
376
+ return ChatCompletionResponse(
377
+ id=chunks[0].get("data", {}).get("id", "chatcmpl-unknown"),
378
+ created=int(time.time()),
379
+ model=model,
380
+ choices=[
381
+ {
382
+ "index": 0,
383
+ "message": {"role": "assistant", "content": transformed_content},
384
+ "finish_reason": "stop",
385
+ }
386
+ ],
387
+ )