bluewinliang committed on
Commit
d47fbbf
·
verified ·
1 Parent(s): 16171fa

Upload proxy_handler.py

Browse files
Files changed (1) hide show
  1. proxy_handler.py +110 -400
proxy_handler.py CHANGED
@@ -1,12 +1,12 @@
1
  """
2
- Proxy handler for Z.AI API requests
3
  """
4
-
5
  import json
6
  import logging
7
- import re
8
  import time
 
9
  from typing import AsyncGenerator, Dict, Any, Optional
 
10
  import httpx
11
  from fastapi import HTTPException
12
  from fastapi.responses import StreamingResponse
@@ -24,6 +24,7 @@ logger = logging.getLogger(__name__)
24
 
25
  class ProxyHandler:
26
  def __init__(self):
 
27
  self.client = httpx.AsyncClient(timeout=60.0)
28
 
29
  async def __aenter__(self):
@@ -32,428 +33,137 @@ class ProxyHandler:
32
  async def __aexit__(self, exc_type, exc_val, exc_tb):
33
  await self.client.aclose()
34
 
 
35
  def transform_content(self, content: str) -> str:
36
- """Transform content by replacing HTML tags and optionally removing think tags"""
 
 
37
  if not content:
38
  return content
39
-
40
- logger.debug(f"SHOW_THINK_TAGS setting: {settings.SHOW_THINK_TAGS}")
41
-
42
- # Optionally remove thinking content based on configuration
43
  if not settings.SHOW_THINK_TAGS:
44
- logger.debug("Removing thinking content from response")
45
- original_length = len(content)
46
-
47
- # Remove <details> blocks (thinking content) - handle both closed and unclosed tags
48
- content = re.sub(
49
- r"<details[^>]*>.*?</details>", "", content, flags=re.DOTALL
50
- )
51
- content = re.sub(
52
- r"<details[^>]*>.*?(?=\s*[A-Z]|\s*\d|\s*$)",
53
- "",
54
- content,
55
- flags=re.DOTALL,
56
- )
57
- content = content.strip()
58
-
59
- logger.debug(
60
- f"Content length after removing thinking content: {original_length} -> {len(content)}"
61
- )
62
- else:
63
- logger.debug("Keeping thinking content, converting to <think> tags")
64
-
65
- # Replace <details> with <think>
66
- content = re.sub(r"<details[^>]*>", "<think>", content)
67
- content = content.replace("</details>", "</think>")
68
- # Remove <summary> tags and their content
69
- content = re.sub(r"<summary>.*?</summary>", "", content, flags=re.DOTALL)
70
-
71
- # If there's no closing </think>, add it at the end of thinking content
72
- if "<think>" in content and "</think>" not in content:
73
- think_start = content.find("<think>")
74
- if think_start != -1:
75
- answer_match = re.search(r"\n\s*[A-Z0-9]", content[think_start:])
76
- if answer_match:
77
- insert_pos = think_start + answer_match.start()
78
- content = (
79
- content[:insert_pos] + "</think>\n" + content[insert_pos:]
80
- )
81
- else:
82
- content += "</think>"
83
-
84
  return content.strip()
85
 
86
- def transform_delta_content(self, content: str) -> str:
87
- """Transform delta content for streaming"""
88
- if not content:
89
- return content
90
-
91
- # Convert <details> to <think> and remove summary tags
92
- content = re.sub(r"<details[^>]*>", "<think>", content)
93
- content = content.replace("</details>", "</think>")
94
- content = re.sub(r"<summary>.*?</summary>", "", content, flags=re.DOTALL)
95
-
96
- return content
97
-
98
- async def proxy_request(self, request: ChatCompletionRequest) -> Dict[str, Any]:
99
- """Proxy request to Z.AI API"""
100
-
101
- cookie = await cookie_manager.get_next_cookie()
102
- if not cookie:
103
- raise HTTPException(status_code=503, detail="No available cookies")
104
-
105
- # Transform model name
106
- target_model = (
107
- settings.UPSTREAM_MODEL
108
- if request.model == settings.MODEL_NAME
109
- else request.model
110
- )
111
-
112
- # Build request data based on the actual Z.AI API format
113
- import uuid
114
- from datetime import datetime
115
-
116
- current_time = datetime.now()
117
-
118
- # Generate unique IDs for the request
119
- chat_id = str(uuid.uuid4())
120
- request_id = str(uuid.uuid4())
121
-
122
- # Transform messages to include message_id
123
- messages_with_ids = []
124
- for msg in request.model_dump()["messages"]:
125
- message_with_id = {
126
- **msg,
127
- "message_id": str(uuid.uuid4()), # Add message_id to each message
128
- }
129
- messages_with_ids.append(message_with_id)
130
-
131
- request_data = {
132
- "stream": True,
133
- "model": target_model,
134
- "messages": messages_with_ids, # Use messages with IDs
135
- "chat_id": chat_id, # Add chat_id
136
- "id": request_id, # Add request ID
137
- "params": {},
138
- "tool_servers": [],
139
- "features": {
140
- "image_generation": False,
141
- "code_interpreter": False,
142
- "web_search": False,
143
- "auto_web_search": False,
144
- "preview_mode": True,
145
- "flags": [],
146
- "features": [
147
- {"type": "mcp", "server": "vibe-coding", "status": "hidden"},
148
- {"type": "mcp", "server": "ppt-maker", "status": "hidden"},
149
- {"type": "mcp", "server": "image-search", "status": "hidden"},
150
- ],
151
- "enable_thinking": True,
152
- },
153
- "variables": {
154
- "{{USER_NAME}}": "User",
155
- "{{USER_LOCATION}}": "Unknown",
156
- "{{CURRENT_DATETIME}}": current_time.strftime("%Y-%m-%d %H:%M:%S"),
157
- "{{CURRENT_DATE}}": current_time.strftime("%Y-%m-%d"),
158
- "{{CURRENT_TIME}}": current_time.strftime("%H:%M:%S"),
159
- "{{CURRENT_WEEKDAY}}": current_time.strftime("%A"),
160
- "{{CURRENT_TIMEZONE}}": "Asia/Taipei",
161
- "{{USER_LANGUAGE}}": "zh-CN",
162
- },
163
- "model_item": {
164
- "id": target_model,
165
- "name": "GLM-4.5",
166
- "owned_by": "openai",
167
- "openai": {
168
- "id": target_model,
169
- "name": target_model,
170
- "owned_by": "openai",
171
- "openai": {"id": target_model},
172
- "urlIdx": 1,
173
- },
174
- "urlIdx": 1,
175
- "info": {
176
- "id": target_model,
177
- "user_id": "7080a6c5-5fcc-4ea4-a85f-3b3fac905cf2",
178
- "base_model_id": None,
179
- "name": "GLM-4.5",
180
- "params": {"top_p": 0.95, "temperature": 0.6, "max_tokens": 80000},
181
- "meta": {
182
- "profile_image_url": "/static/favicon.png",
183
- "description": "Most advanced model, proficient in coding and tool use",
184
- "capabilities": {
185
- "vision": False,
186
- "citations": False,
187
- "preview_mode": False,
188
- "web_search": False,
189
- "language_detection": False,
190
- "restore_n_source": False,
191
- "mcp": True,
192
- "file_qa": True,
193
- "returnFc": True,
194
- "returnThink": True,
195
- "think": True,
196
- },
197
- "mcpServerIds": [
198
- "deep-web-search",
199
- "ppt-maker",
200
- "image-search",
201
- "vibe-coding",
202
- ],
203
- },
204
- },
205
- },
206
- }
207
-
208
- logger.debug(f"Sending request data: {json.dumps(request_data, indent=2)}")
209
-
210
- # Use the exact headers from your curl request
211
- headers = {
212
- "Accept": "*/*",
213
- "Accept-Language": "zh-CN",
214
- "Authorization": f"Bearer {cookie}",
215
- "Connection": "keep-alive",
216
- "Content-Type": "application/json",
217
- "Cookie": f"token={cookie}",
218
- "Origin": "https://chat.z.ai",
219
- "Referer": "https://chat.z.ai/",
220
- "Sec-Fetch-Dest": "empty",
221
- "Sec-Fetch-Mode": "cors",
222
- "Sec-Fetch-Site": "same-origin",
223
- "User-Agent": "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Mobile Safari/537.36",
224
- "X-FE-Version": "prod-fe-1.0.57",
225
- "sec-ch-ua": '"Chromium";v="137", "Not/A)Brand";v="24"',
226
- "sec-ch-ua-mobile": "?1",
227
- "sec-ch-ua-platform": '"Android"',
228
- }
229
-
230
- try:
231
- response = await self.client.post(
232
- settings.UPSTREAM_URL, json=request_data, headers=headers
233
  )
234
 
235
- if response.status_code == 401:
236
- await cookie_manager.mark_cookie_failed(cookie)
237
- raise HTTPException(status_code=401, detail="Invalid authentication")
238
-
239
- if response.status_code != 200:
240
- logger.error(f"Upstream error: {response.status_code} - {response.text}")
241
- raise HTTPException(
242
- status_code=response.status_code,
243
- detail=f"Upstream error: {response.text}",
244
- )
245
-
246
- await cookie_manager.mark_cookie_success(cookie)
247
- return {"response": response, "cookie": cookie}
248
-
249
- except httpx.RequestError as e:
250
- logger.error(f"Request error: {e}")
251
- await cookie_manager.mark_cookie_failed(cookie)
252
- raise HTTPException(status_code=503, detail="Upstream service unavailable")
253
-
254
- async def process_streaming_response_real_time(
255
- self, response: httpx.Response
256
- ) -> AsyncGenerator[Dict[str, Any], None]:
257
- """Process streaming response in real time - truly streaming"""
258
- buffer = ""
259
-
260
- async for chunk in response.aiter_text():
261
- if not chunk:
262
- continue
263
-
264
- buffer += chunk
265
- lines = buffer.split("\n")
266
- buffer = lines[-1] # Keep incomplete line in buffer
267
-
268
- for line in lines[:-1]:
269
- line = line.strip()
270
- if not line.startswith("data: "):
271
- continue
272
-
273
- payload = line[6:].strip()
274
- if payload == "[DONE]":
275
- return
276
-
277
- try:
278
- parsed = json.loads(payload)
279
-
280
- # Check for errors first
281
- if parsed.get("error") or (parsed.get("data", {}).get("error")):
282
- error_detail = (
283
- parsed.get("error", {}).get("detail")
284
- or parsed.get("data", {}).get("error", {}).get("detail")
285
- or "Unknown error from upstream"
286
- )
287
- logger.error(f"Upstream error: {error_detail}")
288
- raise HTTPException(
289
- status_code=400, detail=f"Upstream error: {error_detail}"
290
- )
291
-
292
- # Transform the response immediately
293
- if parsed.get("data"):
294
- # Remove unwanted fields
295
- parsed["data"].pop("edit_index", None)
296
- parsed["data"].pop("edit_content", None)
297
-
298
- # Transform delta_content immediately for streaming
299
- delta_content = parsed["data"].get("delta_content", "")
300
- if delta_content and settings.SHOW_THINK_TAGS:
301
- transformed = self.transform_delta_content(delta_content)
302
- parsed["data"]["delta_content"] = transformed.lstrip()
303
-
304
- # Yield immediately - this is true streaming!
305
- yield parsed
306
-
307
- except json.JSONDecodeError as e:
308
- logger.debug(
309
- f"JSON decode error for line: {line[:100]}... Error: {e}"
310
- )
311
- continue # Skip non-JSON lines
312
-
313
- async def handle_chat_completion(self, request: ChatCompletionRequest):
314
- """Handle chat completion request"""
315
- proxy_result = await self.proxy_request(request)
316
- response = proxy_result["response"]
317
-
318
- # Determine final streaming mode
319
- is_streaming = (
320
  request.stream if request.stream is not None else settings.DEFAULT_STREAM
321
  )
322
 
 
323
  if is_streaming:
 
324
  return StreamingResponse(
325
- self.stream_response_real_time(response, request.model),
326
  media_type="text/event-stream",
327
  headers={
328
  "Cache-Control": "no-cache",
329
  "Connection": "keep-alive",
 
330
  },
331
  )
332
  else:
333
- return await self.non_stream_response(response, request.model)
 
 
 
 
 
 
 
 
 
 
 
 
 
334
 
335
- async def stream_response_real_time(
336
- self, response: httpx.Response, model: str
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
337
  ) -> AsyncGenerator[str, None]:
338
- """Generate truly real-time streaming response in OpenAI format"""
339
- import uuid
340
- import time
341
-
342
- # Generate a unique completion ID
343
- completion_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
344
-
345
- try:
346
- # Process each chunk immediately as it arrives - true streaming!
347
- async for parsed in self.process_streaming_response_real_time(response):
 
 
 
 
 
348
  try:
349
- data = parsed.get("data", {})
350
- delta_content = data.get("delta_content", "")
351
- phase = data.get("phase", "")
352
-
353
- # For SHOW_THINK_TAGS=false, filter out non-answer content
354
- if (
355
- not settings.SHOW_THINK_TAGS
356
- and phase != "answer"
357
- and delta_content
358
- ):
359
- logger.debug(
360
- f"Skipping content in {phase} phase (SHOW_THINK_TAGS=false)"
361
- )
362
- continue
363
-
364
- # Send content immediately if available
365
- if delta_content:
366
- openai_chunk = {
367
- "id": completion_id,
368
- "object": "chat.completion.chunk",
369
- "created": int(time.time()),
370
- "model": model,
371
- "choices": [
372
- {
373
- "index": 0,
374
- "delta": {"content": delta_content},
375
- "finish_reason": None,
376
- }
377
- ],
378
- }
379
-
380
- chunk_json = json.dumps(openai_chunk)
381
- yield f"data: {chunk_json}\n\n"
382
- logger.debug(f"Sent chunk: {chunk_json[:100]}...")
383
-
384
- except Exception as e:
385
- logger.error(f"Error processing streaming chunk: {e}")
386
  continue
387
 
388
- # Send final completion chunk
389
- final_chunk = {
390
- "id": completion_id,
391
- "object": "chat.completion.chunk",
392
- "created": int(time.time()),
393
- "model": model,
394
- "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}],
395
- }
396
-
397
- yield f"data: {json.dumps(final_chunk)}\n\n"
398
- yield "data: [DONE]\n\n"
399
-
400
- except Exception as e:
401
- logger.error(f"Streaming error: {e}")
402
- # Send error in OpenAI format
403
- error_chunk = {"error": {"message": str(e), "type": "server_error"}}
404
- yield f"data: {json.dumps(error_chunk)}\n\n"
405
-
406
- async def non_stream_response(
407
- self, response: httpx.Response, model: str
408
- ) -> ChatCompletionResponse:
409
- """Generate non-streaming response by collecting all chunks"""
410
- chunks = []
411
-
412
- # For non-streaming, we still collect all chunks first
413
- async for parsed in self.process_streaming_response_real_time(response):
414
- chunks.append(parsed)
415
- logger.debug(
416
- f"Collected chunk: {parsed.get('data', {}).get('delta_content', '')[:50]}..."
417
- )
418
-
419
- if not chunks:
420
- raise HTTPException(status_code=500, detail="No response from upstream")
421
-
422
- logger.info(f"Total chunks collected: {len(chunks)}")
423
-
424
- # Aggregate content based on SHOW_THINK_TAGS setting
425
- if settings.SHOW_THINK_TAGS:
426
- full_content = "".join(
427
- chunk.get("data", {}).get("delta_content", "") for chunk in chunks
428
- )
429
- else:
430
- full_content = "".join(
431
- chunk.get("data", {}).get("delta_content", "")
432
- for chunk in chunks
433
- if chunk.get("data", {}).get("phase") == "answer"
434
- )
435
-
436
- logger.info(f"Aggregated content length: {len(full_content)}")
437
 
438
- # Apply final content transformation for non-streaming
439
- transformed_content = self.transform_content(full_content)
 
 
 
 
 
 
 
 
 
 
 
 
440
 
441
- logger.info(f"Final transformed content length: {len(transformed_content)}")
 
442
 
443
- # Create OpenAI-compatible response
444
- return ChatCompletionResponse(
445
- id=(
446
- chunks[0].get("data", {}).get("id", "chatcmpl-unknown")
447
- if chunks
448
- else "chatcmpl-unknown"
449
- ),
450
- created=int(time.time()),
451
- model=model,
452
- choices=[
453
- {
454
- "index": 0,
455
- "message": {"role": "assistant", "content": transformed_content},
456
- "finish_reason": "stop",
457
- }
458
- ],
459
- )
 
1
  """
2
+ Proxy handler for Z.AI API requests (OpenAI-compatible)
3
  """
 
4
import json
import logging
import re
import time
import uuid
from typing import AsyncGenerator, Dict, Any, Optional

import httpx
from fastapi import HTTPException
from fastapi.responses import StreamingResponse
 
24
 
25
  class ProxyHandler:
26
    def __init__(self):
        # 60-second timeout for all calls to the Z.AI endpoint.
        self.client = httpx.AsyncClient(timeout=60.0)
29
 
30
  async def __aenter__(self):
 
33
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Release the shared AsyncClient's connection pool on context exit.
        await self.client.aclose()
35
 
36
+ # --------- 文字前處理 ---------
37
  def transform_content(self, content: str) -> str:
38
+ """
39
+ 依照專案設定將 Z.AI 傳回的 HTML / THINK TAG 等轉成純文字
40
+ """
41
  if not content:
42
  return content
43
+ # 例:過濾 <br/> 與 <think>
44
+ content = content.replace("<br/>", "\n")
 
 
45
  if not settings.SHOW_THINK_TAGS:
46
+ content = content.replace("<think>", "").replace("</think>", "")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
47
  return content.strip()
48
 
49
+ # --------- 主要進入點 ---------
50
+ async def proxy_request(self, request: ChatCompletionRequest):
51
+ """
52
+ OpenAI API 相容的 proxy 入口
53
+ """
54
+ cookie = await cookie_manager.get_next_cookie()
55
+ if not cookie:
56
+ raise HTTPException(status_code=503, detail="No available cookies")
57
+
58
+ # 若對外聲稱的 model 與內部實際 model 不同,在此轉換
59
+ target_model = (
60
+ settings.UPSTREAM_MODEL
61
+ if request.model == settings.MODEL_NAME
62
+ else request.model
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
63
  )
64
 
65
+ # 決定是否串流
66
+ is_streaming: bool = (
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
67
  request.stream if request.stream is not None else settings.DEFAULT_STREAM
68
  )
69
 
70
+ # 向 Z.AI 串流或一次性取資料
71
  if is_streaming:
72
+ # 建立 SSE StreamingResponse
73
  return StreamingResponse(
74
+ self.stream_response(request, target_model, cookie),
75
  media_type="text/event-stream",
76
  headers={
77
  "Cache-Control": "no-cache",
78
  "Connection": "keep-alive",
79
+ # 若有 nginx 建議加上 X-Accel-Buffering: no
80
  },
81
  )
82
  else:
83
+ # 非串流:拿到完整內容後包成 ChatCompletionResponse
84
+ content = await self.get_full_response(request, target_model, cookie)
85
+ return ChatCompletionResponse(
86
+ id=f"chatcmpl-{uuid.uuid4()}",
87
+ created=int(time.time()),
88
+ model=target_model,
89
+ choices=[
90
+ {
91
+ "index": 0,
92
+ "message": {"role": "assistant", "content": content},
93
+ "finish_reason": "stop",
94
+ }
95
+ ],
96
+ )
97
 
98
+ # --------- 非串流邏輯 ---------
99
+ async def get_full_response(
100
+ self,
101
+ request: ChatCompletionRequest,
102
+ target_model: str,
103
+ cookie: str,
104
+ ) -> str:
105
+ """
106
+ 向 Z.AI 取完整回覆並回傳轉換後文字
107
+ """
108
+ resp = await self.client.post(
109
+ settings.ZAI_ENDPOINT,
110
+ headers={"Cookie": cookie},
111
+ json=request.model_dump(exclude_none=True),
112
+ )
113
+ resp.raise_for_status()
114
+ data = resp.json()
115
+ return self.transform_content(data["choices"][0]["message"]["content"])
116
+
117
+ # --------- 串流邏輯 ---------
118
+ async def stream_response(
119
+ self,
120
+ request: ChatCompletionRequest,
121
+ target_model: str,
122
+ cookie: str,
123
  ) -> AsyncGenerator[str, None]:
124
+ """
125
+ Z.AI 串流資料即時轉成 OpenAI SSE 片段
126
+ """
127
+ # 呼叫 Z.AI 串流端點(假設支援 HTTP chunk)
128
+ async with self.client.stream(
129
+ "POST",
130
+ settings.ZAI_STREAM_ENDPOINT,
131
+ headers={"Cookie": cookie},
132
+ json=request.model_dump(exclude_none=True),
133
+ ) as resp:
134
+ resp.raise_for_status()
135
+ async for line in resp.aiter_lines():
136
+ if not line:
137
+ continue
138
+ # Z.AI 每行可能已是 json;自行視格式解析
139
  try:
140
+ raw = json.loads(line)
141
+ except json.JSONDecodeError:
142
+ logger.debug("skip non-json line from Z.AI: %s", line)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
143
  continue
144
 
145
+ # 取得文字增量
146
+ delta_text = self.transform_content(raw.get("delta", ""))
147
+ if delta_text == "":
148
+ continue
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
149
 
150
+ # 組成 OpenAI stream chunk
151
+ chunk: Dict[str, Any] = {
152
+ "id": raw.get("id", f"chatcmpl-{uuid.uuid4()}"),
153
+ "object": "chat.completion.chunk",
154
+ "created": int(time.time()),
155
+ "model": target_model,
156
+ "choices": [
157
+ {
158
+ "index": 0,
159
+ "delta": {"content": delta_text},
160
+ "finish_reason": None,
161
+ }
162
+ ],
163
+ }
164
 
165
+ # 送出 SSE formatted line
166
+ yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"
167
 
168
+ # Z.AI 結束後送出 [DONE]
169
+ yield "data: [DONE]\n\n"