bluewinliang committed on
Commit
900bbdd
·
verified ·
1 Parent(s): aabc532

Upload proxy_handler.py

Browse files
Files changed (1) hide show
  1. proxy_handler.py +623 -598
proxy_handler.py CHANGED
@@ -1,598 +1,623 @@
1
- """Proxy handler for Z.AI API requests"""
2
- import json
3
- import logging
4
- import re
5
- import time
6
- from typing import AsyncGenerator, Dict, Any, Optional
7
-
8
- import httpx
9
- from fastapi import HTTPException
10
- from fastapi.responses import StreamingResponse
11
-
12
- from config import settings
13
- from cookie_manager import cookie_manager
14
- from models import (
15
- ChatCompletionRequest,
16
- ChatCompletionResponse,
17
- ChatCompletionStreamResponse,
18
- )
19
-
20
- logger = logging.getLogger(__name__)
21
-
22
-
23
class ProxyHandler:
    """Proxies OpenAI-style chat completion requests to the Z.AI upstream API.

    Converts incoming ChatCompletionRequest objects into Z.AI's chat payload,
    consumes the upstream Server-Sent-Events stream, and re-emits it either as
    OpenAI-compatible streaming chunks or as a single collected response.
    """

    def __init__(self):
        # Long read timeout and HTTP/2 because upstream responses are
        # long-lived SSE streams.
        self.client = httpx.AsyncClient(
            timeout=httpx.Timeout(60.0, read=300.0),
            limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
            http2=True,
        )

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Close pooled connections when used as an async context manager.
        await self.client.aclose()

    def _build_upstream_payload(self, messages, target_model: str) -> Dict[str, Any]:
        """Build the request body in the format the Z.AI chat endpoint expects.

        Shared by proxy_request() and stream_proxy_response(), which previously
        duplicated this literal.
        """
        import uuid

        return {
            "stream": True,  # Always request streaming from Z.AI for processing
            "model": target_model,
            "messages": messages,
            "background_tasks": {"title_generation": True, "tags_generation": True},
            "chat_id": str(uuid.uuid4()),
            "features": {
                "image_generation": False,
                "code_interpreter": False,
                "web_search": False,
                "auto_web_search": False,
            },
            "id": str(uuid.uuid4()),
            "mcp_servers": ["deep-web-search"],
            "model_item": {"id": target_model, "name": "GLM-4.5", "owned_by": "openai"},
            "params": {},
            "tool_servers": [],
            "variables": {
                "{{USER_NAME}}": "User",
                "{{USER_LOCATION}}": "Unknown",
                "{{CURRENT_DATETIME}}": "2025-08-04 16:46:56",
            },
        }

    def _build_headers(self, cookie: str) -> Dict[str, str]:
        """Browser-like headers; presumably the upstream rejects obviously
        non-browser clients — confirm against upstream behavior."""
        return {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {cookie}",
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36",
            "Accept": "application/json, text/event-stream",
            "Accept-Language": "zh-CN",
            "sec-ch-ua": '"Not)A;Brand";v="8", "Chromium";v="138", "Google Chrome";v="138"',
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": '"macOS"',
            "x-fe-version": "prod-fe-1.0.53",
            "Origin": "https://chat.z.ai",
            "Referer": "https://chat.z.ai/c/069723d5-060b-404f-992c-4705f1554c4c",
        }

    @staticmethod
    def _sse_chunk(
        completion_id: str,
        model: str,
        delta: Dict[str, Any],
        finish_reason: Optional[str],
    ) -> str:
        """Format one OpenAI `chat.completion.chunk` as an SSE `data:` line."""
        chunk = {
            "id": completion_id,
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": model,
            "choices": [
                {"index": 0, "delta": delta, "finish_reason": finish_reason}
            ],
        }
        return f"data: {json.dumps(chunk)}\n\n"

    def transform_content(self, content: str) -> str:
        """Transform content by replacing HTML tags and optionally removing think tags.

        When SHOW_THINK_TAGS is false the <details> "thinking" blocks are
        stripped entirely; otherwise they are rewritten as <think>...</think>.
        """
        if not content:
            return content

        logger.debug(f"SHOW_THINK_TAGS setting: {settings.SHOW_THINK_TAGS}")

        if not settings.SHOW_THINK_TAGS:
            logger.debug("Removing thinking content from response")
            original_length = len(content)

            # Remove complete <details>...</details> blocks (thinking content).
            content = re.sub(
                r"<details[^>]*>.*?</details>", "", content, flags=re.DOTALL
            )
            # Then drop any unclosed <details> tail up until apparent answer
            # content (Latin uppercase, digit, or CJK character).
            content = re.sub(
                r"<details[^>]*>.*?(?=\s*[A-Z0-9\u4e00-\u9fff]|\s*$)",
                "",
                content,
                flags=re.DOTALL,
            )
            content = content.strip()
            logger.debug(
                f"Content length after removing thinking content: {original_length} -> {len(content)}"
            )
        else:
            logger.debug("Keeping thinking content, converting to <think> tags")
            # Replace <details> with <think> and drop <summary> headers.
            content = re.sub(r"<details[^>]*>", "<think>", content)
            content = content.replace("</details>", "</think>")
            content = re.sub(r"<summary>.*?</summary>", "", content, flags=re.DOTALL)
            # Auto-close <think> when the closing tag is missing.
            if content.count("<think>") > content.count("</think>"):
                content += "</think>"

        return content.strip()

    async def proxy_request(self, request: ChatCompletionRequest) -> Dict[str, Any]:
        """Open a streaming request to the Z.AI API.

        Returns {"response": <open httpx.Response>, "cookie": <cookie used>}.
        The caller owns the response and must close it when done.
        """
        cookie = await cookie_manager.get_next_cookie()
        if not cookie:
            raise HTTPException(status_code=503, detail="No available cookies")

        # Map the public model name onto the upstream model identifier.
        target_model = (
            settings.UPSTREAM_MODEL
            if request.model == settings.MODEL_NAME
            else request.model
        )

        # Determine if this should be a streaming response.
        is_streaming = (
            request.stream if request.stream is not None else settings.DEFAULT_STREAM
        )
        if is_streaming and not settings.SHOW_THINK_TAGS:
            logger.warning("SHOW_THINK_TAGS=false is ignored for streaming responses")

        messages = request.model_dump(exclude_none=True)["messages"]
        request_data = self._build_upstream_payload(messages, target_model)
        logger.debug(f"Sending request data: {request_data}")
        headers = self._build_headers(cookie)

        try:
            # BUGFIX: the previous version returned the response from inside
            # `async with self.client.stream(...)`, so the context manager
            # closed the connection before the caller could read the stream.
            # build_request + send(stream=True) keeps the response open.
            req = self.client.build_request(
                "POST",
                settings.UPSTREAM_URL,
                json=request_data,
                headers=headers,
                timeout=httpx.Timeout(60.0, read=300.0),
            )
            response = await self.client.send(req, stream=True)
        except httpx.RequestError as e:
            logger.error(f"Request error: {e}")
            logger.error(f"Request error type: {type(e).__name__}")
            logger.error(f"Request URL: {settings.UPSTREAM_URL}")
            logger.error(f"Request timeout: {self.client.timeout}")
            await cookie_manager.mark_cookie_failed(cookie)
            raise HTTPException(
                status_code=503, detail=f"Upstream service unavailable: {str(e)}"
            )

        try:
            if response.status_code == 401:
                await cookie_manager.mark_cookie_failed(cookie)
                raise HTTPException(status_code=401, detail="Invalid authentication")
            if response.status_code != 200:
                # For streaming, we need to read the error response properly.
                try:
                    error_detail = (await response.aread()).decode("utf-8")
                except Exception:  # narrowed from a bare except
                    error_detail = f"HTTP {response.status_code}"
                raise HTTPException(
                    status_code=response.status_code,
                    detail=f"Upstream error: {error_detail}",
                )
        except HTTPException:
            # Don't leak the connection on an error path.
            await response.aclose()
            raise

        await cookie_manager.mark_cookie_success(cookie)
        return {"response": response, "cookie": cookie}

    async def process_streaming_response(
        self, response: httpx.Response
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """Yield parsed SSE JSON payloads from a Z.AI streaming response.

        Stops on the `[DONE]` sentinel; raises HTTPException when the upstream
        embeds an error object in the stream.
        """
        buffer = ""
        # Small chunks keep latency low for real-time processing.
        async for chunk in response.aiter_text(chunk_size=1024):
            if not chunk:
                continue

            buffer += chunk
            # Drain every complete line currently buffered.
            while "\n" in buffer:
                line, buffer = buffer.split("\n", 1)
                line = line.strip()

                if not line.startswith("data: "):
                    continue

                payload = line[6:].strip()
                if payload == "[DONE]":
                    return

                try:
                    parsed = json.loads(payload)
                except json.JSONDecodeError as e:
                    logger.debug(f"JSON decode error (skipping): {e}")
                    continue  # Skip non-JSON lines

                # Surface upstream-reported errors as HTTP errors.
                if parsed.get("error") or (parsed.get("data", {}).get("error")):
                    error_detail = (
                        parsed.get("error", {}).get("detail")
                        or parsed.get("data", {}).get("error", {}).get("detail")
                        or "Unknown error from upstream"
                    )
                    logger.error(f"Upstream error: {error_detail}")
                    raise HTTPException(
                        status_code=400, detail=f"Upstream error: {error_detail}"
                    )

                # Remove editing metadata we never use downstream.
                if parsed.get("data"):
                    parsed["data"].pop("edit_index", None)
                    parsed["data"].pop("edit_content", None)

                yield parsed

    async def handle_chat_completion(self, request: ChatCompletionRequest):
        """Handle a chat completion request in streaming or non-streaming mode.

        Streaming mode returns a StreamingResponse of SSE chunks; otherwise
        the stream is drained and collapsed into one ChatCompletionResponse.
        """
        import uuid

        is_streaming = (
            request.stream if request.stream is not None else settings.DEFAULT_STREAM
        )

        if is_streaming:
            return StreamingResponse(
                self.stream_proxy_response(request),
                media_type="text/event-stream",
                headers={
                    "Cache-Control": "no-cache",
                    "Connection": "keep-alive",
                },
            )

        # Non-streaming: drain the SSE stream and concatenate the deltas.
        chunks = []
        async for chunk_data in self.stream_proxy_response(request):
            if not chunk_data.startswith("data: ") or chunk_data.startswith("data: [DONE]"):
                continue
            try:
                chunk_json = json.loads(chunk_data[6:])
                content = (
                    chunk_json.get("choices", [{}])[0].get("delta", {}).get("content")
                )
            except (json.JSONDecodeError, IndexError, AttributeError):
                # narrowed from a bare except: only malformed chunks are skipped
                continue
            if content:
                chunks.append(content)

        full_content = "".join(chunks)

        return ChatCompletionResponse(
            id=f"chatcmpl-{uuid.uuid4().hex[:29]}",
            created=int(time.time()),
            model=request.model,
            choices=[
                {
                    "index": 0,
                    "message": {"role": "assistant", "content": full_content},
                    "finish_reason": "stop",
                }
            ],
        )

    async def stream_response(
        self, response: httpx.Response, model: str
    ) -> AsyncGenerator[str, None]:
        """Translate a Z.AI stream into OpenAI chat.completion.chunk SSE events."""
        import uuid

        completion_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
        current_phase = None
        role_sent = False

        try:
            # Real-time streaming: process each event as it arrives.
            async for parsed in self.process_streaming_response(response):
                try:
                    data = parsed.get("data", {})
                    delta_content = data.get("delta_content", "")
                    # NOTE(review): process_streaming_response pops
                    # "edit_content", so this fallback is always empty here —
                    # confirm whether the pop or this read is intended.
                    edit_content = data.get("edit_content", "")
                    phase = data.get("phase", "")
                    content_piece = delta_content if delta_content else edit_content

                    if phase != current_phase:
                        current_phase = phase
                        logger.debug(f"Phase changed to: {phase}")

                    # Send the role chunk once at the beginning.
                    if not role_sent:
                        role_sent = True
                        yield self._sse_chunk(
                            completion_id, model, {"role": "assistant"}, None
                        )

                    # Drop thinking-phase content when configured to hide it.
                    should_send_content = not (
                        not settings.SHOW_THINK_TAGS and phase == "thinking"
                    )

                    if content_piece and should_send_content:
                        transformed_delta = content_piece
                        if settings.SHOW_THINK_TAGS:
                            transformed_delta = re.sub(
                                r"<details[^>]*>", "<think>", transformed_delta
                            )
                            transformed_delta = transformed_delta.replace(
                                "</details>", "</think>"
                            )
                            transformed_delta = re.sub(
                                r"<summary>.*?</summary>",
                                "",
                                transformed_delta,
                                flags=re.DOTALL,
                            )
                            # Close <think> once the phase switches to the answer.
                            if (
                                phase != "thinking"
                                and transformed_delta
                                and not transformed_delta.startswith("</think>")
                            ):
                                if transformed_delta.lstrip().startswith(">"):
                                    transformed_delta = "</think>\n" + transformed_delta

                        yield self._sse_chunk(
                            completion_id, model, {"content": transformed_delta}, None
                        )
                except Exception as e:
                    logger.error(f"Error processing streaming chunk: {e}")
                    continue

            # Send the final completion chunk and the terminator.
            yield self._sse_chunk(completion_id, model, {}, "stop")
            yield "data: [DONE]\n\n"
        except Exception as e:
            logger.error(f"Streaming error: {e}")
            error_chunk = {"error": {"message": str(e), "type": "server_error"}}
            yield f"data: {json.dumps(error_chunk)}\n\n"

    async def non_stream_response(
        self, response: httpx.Response, model: str
    ) -> ChatCompletionResponse:
        """Collect the full upstream stream into one non-streaming response."""
        chunks = []
        async for parsed in self.process_streaming_response(response):
            chunks.append(parsed)
            logger.debug(f"Received chunk: {parsed}")

        if not chunks:
            raise HTTPException(status_code=500, detail="No response from upstream")

        logger.info(f"Total chunks received: {len(chunks)}")
        logger.debug(f"First chunk structure: {chunks[0] if chunks else 'None'}")

        # NOTE(review): process_streaming_response removes "edit_content", so
        # only delta_content actually contributes here — confirm.
        def piece(c: Dict[str, Any]) -> str:
            d = c.get("data", {})
            return d.get("delta_content", "") or d.get("edit_content", "")

        if settings.SHOW_THINK_TAGS:
            full_content = "".join(piece(c) for c in chunks)
        else:
            # Hide thinking: keep only answer-phase content.
            full_content = "".join(
                piece(c)
                for c in chunks
                if c.get("data", {}).get("phase") == "answer"
            )

        transformed_content = self.transform_content(full_content)

        return ChatCompletionResponse(
            id=chunks[0].get("data", {}).get("id", "chatcmpl-unknown"),
            created=int(time.time()),
            model=model,
            choices=[
                {
                    "index": 0,
                    "message": {"role": "assistant", "content": transformed_content},
                    "finish_reason": "stop",
                }
            ],
        )

    async def stream_proxy_response(
        self, request: ChatCompletionRequest
    ) -> AsyncGenerator[str, None]:
        """Streaming proxy: fetch from Z.AI and re-emit OpenAI-format SSE chunks."""
        import uuid

        cookie = await cookie_manager.get_next_cookie()
        if not cookie:
            raise HTTPException(
                status_code=503, detail="No valid authentication available"
            )

        # NOTE(review): model is hard-coded here instead of using
        # settings.UPSTREAM_MODEL like proxy_request() does — confirm intent.
        target_model = "0727-360B-API"
        messages = request.model_dump(exclude_none=True)["messages"]
        payload = self._build_upstream_payload(messages, target_model)
        headers = self._build_headers(cookie)

        completion_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
        role_sent = False

        try:
            # Reuse the pooled class-level client instead of building a fresh
            # AsyncClient per request (previous version leaked that cost).
            async with self.client.stream(
                "POST", settings.UPSTREAM_URL, json=payload, headers=headers
            ) as response:
                if response.status_code == 401:
                    await cookie_manager.mark_cookie_failed(cookie)
                    raise HTTPException(
                        status_code=401, detail="Invalid authentication"
                    )
                if response.status_code != 200:
                    await cookie_manager.mark_cookie_failed(cookie)
                    raise HTTPException(
                        status_code=response.status_code, detail="Upstream error"
                    )

                await cookie_manager.mark_cookie_success(cookie)

                buffer = ""
                async for chunk in response.aiter_text(chunk_size=1024):
                    if not chunk:
                        continue
                    buffer += chunk
                    while "\n" in buffer:
                        line, buffer = buffer.split("\n", 1)
                        line = line.strip()

                        if not line.startswith("data: "):
                            continue

                        payload_line = line[6:].strip()
                        if payload_line == "[DONE]":
                            yield self._sse_chunk(
                                completion_id, request.model, {}, "stop"
                            )
                            yield "data: [DONE]\n\n"
                            return

                        try:
                            parsed = json.loads(payload_line)
                        except json.JSONDecodeError:
                            continue

                        data = parsed.get("data", {})
                        delta_content = data.get("delta_content", "")
                        edit_content = data.get("edit_content", "")
                        phase = data.get("phase", "")
                        content_piece = (
                            delta_content if delta_content else edit_content
                        )

                        if not role_sent:
                            role_sent = True
                            yield self._sse_chunk(
                                completion_id,
                                request.model,
                                {"role": "assistant"},
                                None,
                            )

                        should_send_content = not (
                            not settings.SHOW_THINK_TAGS and phase == "thinking"
                        )

                        if content_piece and should_send_content:
                            transformed_delta = content_piece
                            if settings.SHOW_THINK_TAGS:
                                transformed_delta = re.sub(
                                    r"<details[^>]*>", "<think>", transformed_delta
                                )
                                transformed_delta = transformed_delta.replace(
                                    "</details>", "</think>"
                                )
                                transformed_delta = re.sub(
                                    r"<summary>.*?</summary>",
                                    "",
                                    transformed_delta,
                                    flags=re.DOTALL,
                                )
                                # Close <think> once answer-phase text begins.
                                if (
                                    phase != "thinking"
                                    and transformed_delta
                                    and not transformed_delta.startswith("</think>")
                                ):
                                    if (
                                        transformed_delta.lstrip().startswith(">")
                                        or transformed_delta[0].isalnum()
                                    ):
                                        transformed_delta = (
                                            "</think>\n" + transformed_delta
                                        )

                            yield self._sse_chunk(
                                completion_id,
                                request.model,
                                {"content": transformed_delta},
                                None,
                            )
        except httpx.RequestError as e:
            logger.error(f"Streaming request error: {e}")
            await cookie_manager.mark_cookie_failed(cookie)
            raise HTTPException(
                status_code=503, detail=f"Upstream service unavailable: {str(e)}"
            )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Proxy handler for Z.AI API requests
3
+ """
4
+
5
+ import json
6
+ import logging
7
+ import re
8
+ import time
9
+ from typing import AsyncGenerator, Dict, Any, Optional
10
+ import httpx
11
+ from fastapi import HTTPException
12
+ from fastapi.responses import StreamingResponse
13
+
14
+ from config import settings
15
+ from cookie_manager import cookie_manager
16
+ from models import (
17
+ ChatCompletionRequest,
18
+ ChatCompletionResponse,
19
+ ChatCompletionStreamResponse,
20
+ )
21
+
22
+ logger = logging.getLogger(__name__)
23
+
24
+
25
class ProxyHandler:
    """Proxy layer that forwards chat completion requests to the Z.AI API."""

    def __init__(self):
        # Streaming-friendly client: generous read timeout for long SSE
        # responses, pooled connections, and HTTP/2 enabled.
        self.client = httpx.AsyncClient(
            timeout=httpx.Timeout(60.0, read=300.0),
            limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
            http2=True,
        )

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Release pooled connections when used as an async context manager.
        await self.client.aclose()
40
+ def transform_content(self, content: str) -> str:
41
+ """Transform content by replacing HTML tags and optionally removing think tags"""
42
+ if not content:
43
+ return content
44
+
45
+ logger.debug(f"SHOW_THINK_TAGS setting: {settings.SHOW_THINK_TAGS}")
46
+
47
+ # Optionally remove thinking content based on configuration
48
+ if not settings.SHOW_THINK_TAGS:
49
+ logger.debug("Removing thinking content from response")
50
+ original_length = len(content)
51
+
52
+ # Remove <thinking>...</thinking> blocks (thinking content) - handle both closed and unclosed tags
53
+ # First try to remove complete <thinking>...</thinking> blocks
54
+ content = re.sub(
55
+ r"<thinking>.*?</thinking>", "", content, flags=re.DOTALL
56
+ )
57
+
58
+ # Then remove any remaining <thinking> opening tags and everything after them until we hit answer content
59
+ # Look for pattern: <thinking>...content... and remove the thinking part
60
+ content = re.sub(
61
+ r"<thinking>.*?(?=\s[A-Z]|\s\d|\s*$)",
62
+ "",
63
+ content,
64
+ flags=re.DOTALL,
65
+ )
66
+
67
+ content = content.strip()
68
+
69
+ logger.debug(
70
+ f"Content length after removing thinking content: {original_length} -> {len(content)}"
71
+ )
72
+ else:
73
+ logger.debug("Keeping thinking content, converting to <thinking>...</thinking>")
74
+
75
+ # Remove <thinking> tags and their content
76
+ content = re.sub(r"<thinking>(.*?)</thinking>", r"<thinking>\1</thinking>", content, flags=re.DOTALL)
77
+
78
+ # If there's no closing </thinking>, add it at the end of thinking content
79
+ if "<thinking>" in content:
80
+ # Find where thinking ends and answer begins
81
+ think_start = content.find("<thinking>")
82
+ if think_start != -1:
83
+ # Look for typical answer patterns after thinking
84
+ patterns = [r"\n[A-Z]", r"\n\d+\.", r"\nTo ", r"\nThe "]
85
+
86
+ for pattern in patterns:
87
+ match = re.search(pattern, content[think_start:])
88
+ if match:
89
+ insert_pos = think_start + match.start()
90
+ content = (
91
+ content[:insert_pos].replace("<thinking>", "<thinking>") +
92
+ "</thinking>\n" + content[insert_pos:]
93
+ )
94
+ else:
95
+ content += "</thinking>"
96
+
97
+ return content.strip()
98
+
99
+ async def proxy_request(self, request: ChatCompletionRequest) -> Dict[str, Any]:
100
+ """Proxy request to Z.AI API"""
101
+ cookie = await cookie_manager.get_next_cookie()
102
+ if not cookie:
103
+ raise HTTPException(status_code=503, detail="No available cookies")
104
+
105
+ # Transform model name
106
+ target_model = (
107
+ settings.UPSTREAM_MODEL
108
+ if request.model == settings.MODEL_NAME
109
+ else request.model
110
+ )
111
+
112
+ # Determine if this should be a streaming response
113
+ is_streaming = (
114
+ request.stream if request.stream is not None else settings.DEFAULT_STREAM
115
+ )
116
+
117
+ # Validate parameter compatibility
118
+ if is_streaming and not settings.SHOW_THINK_TAGS:
119
+ logger.warning("SHOW_THINK_TAGS=false is ignored for streaming responses")
120
+
121
+ # Prepare request data
122
+ request_data = request.model_dump(exclude_none=True)
123
+ request_data["model"] = target_model
124
+
125
+ # Build request data based on actual Z.AI format from zai-messages.md
126
+ import uuid
127
+
128
+ request_data = {
129
+ "stream": True, # Always request streaming from Z.AI for processing
130
+ "model": target_model,
131
+ "messages": request_data["messages"],
132
+ "background_tasks": {"title_generation": True, "tags_generation": True},
133
+ "chat_id": str(uuid.uuid4()),
134
+ "features": {
135
+ "image_generation": False,
136
+ "code_interpreter": False,
137
+ "web_search": False,
138
+ "auto_web_search": False,
139
+ },
140
+ "id": str(uuid.uuid4()),
141
+ "mcp_servers": ["deep-web-search"],
142
+ "model_item": {"id": target_model, "name": "GLM-4.5", "owned_by": "openai"},
143
+ "params": {},
144
+ "tool_servers": [],
145
+ "variables": {
146
+ "{{USER_NAME}}": "User",
147
+ "{{USER_LOCATION}}": "Unknown",
148
+ "{{CURRENT_DATETIME}}": "2025-08-04 16:46:56",
149
+ },
150
+ }
151
+
152
+ logger.debug(f"Sending request data: {request_data}")
153
+
154
+ headers = {
155
+ "Content-Type": "application/json",
156
+ "Authorization": f"Bearer {cookie}",
157
+ "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36",
158
+ "Accept": "application/json, text/event-stream",
159
+ "Accept-Language": "zh-CN",
160
+ "sec-ch-ua": '"Not)A;Brand";v="8", "Chromium";v="138", "Google Chrome";v="138"',
161
+ "sec-ch-ua-mobile": "?0",
162
+ "sec-ch-ua-platform": '"macOS"',
163
+ "x-fe-version": "prod-fe-1.0.53",
164
+ "Origin": "https://chat.z.ai",
165
+ "Referer": "https://chat.z.ai/c/069723d5-060b-404f-992c-4705f1554c4c",
166
+ }
167
+
168
+ try:
169
+ # Use client.stream() for TRUE streaming response - this is the key fix!
170
+ async with self.client.stream(
171
+ "POST",
172
+ settings.UPSTREAM_URL,
173
+ json=request_data,
174
+ headers=headers,
175
+ timeout=httpx.Timeout(60.0, read=300.0)
176
+ ) as response:
177
+
178
+ if response.status_code == 401:
179
+ await cookie_manager.mark_cookie_failed(cookie)
180
+ raise HTTPException(status_code=401, detail="Invalid authentication")
181
+
182
+ if response.status_code != 200:
183
+ # For streaming, we need to read the error response properly
184
+ try:
185
+ error_text = await response.aread()
186
+ error_detail = error_text.decode('utf-8')
187
+ except:
188
+ error_detail = f"HTTP {response.status_code}"
189
+
190
+ raise HTTPException(
191
+ status_code=response.status_code,
192
+ detail=f"Upstream error: {error_detail}",
193
+ )
194
+
195
+ await cookie_manager.mark_cookie_success(cookie)
196
+ return {"response": response, "cookie": cookie}
197
+
198
+ except httpx.RequestError as e:
199
+ logger.error(f"Request error: {e}")
200
+ logger.error(f"Request error type: {type(e).__name__}")
201
+ logger.error(f"Request URL: {settings.UPSTREAM_URL}")
202
+ logger.error(f"Request timeout: {self.client.timeout}")
203
+ await cookie_manager.mark_cookie_failed(cookie)
204
+ raise HTTPException(
205
+ status_code=503, detail=f"Upstream service unavailable: {str(e)}"
206
+ )
207
+
208
+ async def process_streaming_response(
209
+ self, response: httpx.Response
210
+ ) -> AsyncGenerator[Dict[str, Any], None]:
211
+ """Process streaming response from Z.AI - TRUE real-time processing"""
212
+ buffer = ""
213
+
214
+ # Use aiter_text with small chunk size for real-time processing
215
+ async for chunk in response.aiter_text(chunk_size=1024): # Small chunks for responsiveness
216
+ if not chunk:
217
+ continue
218
+
219
+ buffer += chunk
220
+
221
+ # Process complete lines immediately
222
+ while "\n" in buffer:
223
+ line, buffer = buffer.split("\n", 1)
224
+ line = line.strip()
225
+
226
+ if not line.startswith("data: "):
227
+ continue
228
+
229
+ payload = line[6:].strip()
230
+ if payload == "[DONE]":
231
+ return
232
+
233
+ try:
234
+ parsed = json.loads(payload)
235
+
236
+ # Check for errors first
237
+ if parsed.get("error") or (parsed.get("data", {}).get("error")):
238
+ error_detail = (
239
+ parsed.get("error", {}).get("detail")
240
+ or parsed.get("data", {}).get("error", {}).get("detail")
241
+ or "Unknown error from upstream"
242
+ )
243
+ logger.error(f"Upstream error: {error_detail}")
244
+ raise HTTPException(
245
+ status_code=400, detail=f"Upstream error: {error_detail}"
246
+ )
247
+
248
+ # Clean up response data
249
+ if parsed.get("data"):
250
+ # Remove unwanted fields for cleaner processing
251
+ parsed["data"].pop("edit_index", None)
252
+ parsed["data"].pop("edit_content", None)
253
+
254
+ # Yield immediately for real-time streaming
255
+ yield parsed
256
+
257
+ except json.JSONDecodeError as e:
258
+ logger.debug(f"JSON decode error (skipping): {e}")
259
+ continue # Skip non-JSON lines
260
+
261
+ async def handle_chat_completion(self, request: ChatCompletionRequest):
262
+ """Handle chat completion request"""
263
+ # Determine final streaming mode
264
+ is_streaming = (
265
+ request.stream if request.stream is not None else settings.DEFAULT_STREAM
266
+ )
267
+
268
+ if is_streaming:
269
+ # For streaming responses, use direct streaming proxy
270
+ return StreamingResponse(
271
+ self.stream_proxy_response(request),
272
+ media_type="text/event-stream",
273
+ headers={
274
+ "Cache-Control": "no-cache",
275
+ "Connection": "keep-alive",
276
+ },
277
+ )
278
+ else:
279
+ # For non-streaming responses, collect all streaming data first
280
+ chunks = []
281
+ async for chunk_data in self.stream_proxy_response(request):
282
+ if chunk_data.startswith("data: ") and not chunk_data.startswith("data: [DONE]"):
283
+ try:
284
+ chunk_json = json.loads(chunk_data[6:])
285
+ if chunk_json.get("choices", [{}])[0].get("delta", {}).get("content"):
286
+ chunks.append(chunk_json["choices"][0]["delta"]["content"])
287
+ except:
288
+ continue
289
+
290
+ # Combine all content
291
+ full_content = "".join(chunks)
292
+
293
+ # Return as non-streaming response
294
+ import time
295
+ import uuid
296
+ return ChatCompletionResponse(
297
+ id=f"chatcmpl-{uuid.uuid4().hex[:29]}",
298
+ created=int(time.time()),
299
+ model=request.model,
300
+ choices=[
301
+ {
302
+ "index": 0,
303
+ "message": {"role": "assistant", "content": full_content},
304
+ "finish_reason": "stop",
305
+ }
306
+ ],
307
+ )
308
+
309
+ async def stream_response(self, response: httpx.Response, model: str) -> AsyncGenerator[str, None]:
310
+ """Generate TRUE streaming response in OpenAI format - real-time processing"""
311
+ import uuid
312
+ import time
313
+
314
+ # Generate a unique completion ID
315
+ completion_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
316
+ current_phase = None
317
+
318
+ try:
319
+ # Real-time streaming: process each chunk immediately as it arrives
320
+ async for parsed in self.process_streaming_response(response):
321
+ try:
322
+ data = parsed.get("data", {})
323
+ delta_content = data.get("delta_content", "")
324
+ phase = data.get("phase", "")
325
+
326
+ # Track phase changes
327
+ if phase != current_phase:
328
+ current_phase = phase
329
+ logger.debug(f"Phase changed to: {phase}")
330
+
331
+ # Apply filtering based on SHOW_THINK_TAGS and phase
332
+ should_send_content = True
333
+
334
+ if not settings.SHOW_THINK_TAGS and phase == "thinking":
335
+ # Skip thinking content when SHOW_THINK_TAGS=false
336
+ should_send_content = False
337
+ logger.debug(f"Skipping thinking content (SHOW_THINK_TAGS=false)")
338
+
339
+ # Process and send content immediately if we should
340
+ if delta_content and should_send_content:
341
+ # Minimal transformation for real-time streaming
342
+ transformed_delta = delta_content
343
+
344
+ if settings.SHOW_THINK_TAGS:
345
+ # Simple tag replacement for streaming
346
+ transformed_delta = re.sub(r'<thinking[^>]*>', '<thinking>', transformed_delta)
347
+ # Note: Skip complex regex for streaming performance
348
+
349
+ # Create and send OpenAI-compatible chunk immediately
350
+ openai_chunk = {
351
+ "id": completion_id,
352
+ "object": "chat.completion.chunk",
353
+ "created": int(time.time()),
354
+ "model": model,
355
+ "choices": [{
356
+ "index": 0,
357
+ "delta": {
358
+ "content": transformed_delta
359
+ },
360
+ "finish_reason": None
361
+ }]
362
+ }
363
+
364
+ # Yield immediately for real-time streaming
365
+ yield f"data: {json.dumps(openai_chunk)}\n\n"
366
+
367
+ except Exception as e:
368
+ logger.error(f"Error processing streaming chunk: {e}")
369
+ continue
370
+
371
+ # Send final completion chunk
372
+ final_chunk = {
373
+ "id": completion_id,
374
+ "object": "chat.completion.chunk",
375
+ "created": int(time.time()),
376
+ "model": model,
377
+ "choices": [{
378
+ "index": 0,
379
+ "delta": {},
380
+ "finish_reason": "stop"
381
+ }]
382
+ }
383
+
384
+ yield f"data: {json.dumps(final_chunk)}\n\n"
385
+ yield "data: [DONE]\n\n"
386
+
387
+ except Exception as e:
388
+ logger.error(f"Streaming error: {e}")
389
+ # Send error in OpenAI format
390
+ error_chunk = {
391
+ "error": {
392
+ "message": str(e),
393
+ "type": "server_error"
394
+ }
395
+ }
396
+ yield f"data: {json.dumps(error_chunk)}\n\n"
397
+
398
+ async def non_stream_response(
399
+ self, response: httpx.Response, model: str
400
+ ) -> ChatCompletionResponse:
401
+ """Generate non-streaming response"""
402
+ chunks = []
403
+ async for parsed in self.process_streaming_response(response):
404
+ chunks.append(parsed)
405
+ logger.debug(f"Received chunk: {parsed}") # Debug log
406
+
407
+ if not chunks:
408
+ raise HTTPException(status_code=500, detail="No response from upstream")
409
+
410
+ logger.info(f"Total chunks received: {len(chunks)}")
411
+ logger.debug(f"First chunk structure: {chunks[0] if chunks else 'None'}")
412
+
413
+ # Aggregate content based on SHOW_THINK_TAGS setting
414
+ if settings.SHOW_THINK_TAGS:
415
+ # Include all content
416
+ full_content = "".join(
417
+ chunk.get("data", {}).get("delta_content", "") for chunk in chunks
418
+ )
419
+ else:
420
+ # Only include answer phase content
421
+ full_content = "".join(
422
+ chunk.get("data", {}).get("delta_content", "")
423
+ for chunk in chunks
424
+ if chunk.get("data", {}).get("phase") == "answer"
425
+ )
426
+
427
+ logger.info(f"Aggregated content length: {len(full_content)}")
428
+ logger.debug(
429
+ f"Full aggregated content: {full_content}"
430
+ ) # Show full content for debugging
431
+
432
+ # Apply content transformation (including think tag filtering)
433
+ transformed_content = self.transform_content(full_content)
434
+
435
+ logger.info(f"Transformed content length: {len(transformed_content)}")
436
+ logger.debug(f"Transformed content: {transformed_content[:200]}...")
437
+
438
+ # Create OpenAI-compatible response
439
+ return ChatCompletionResponse(
440
+ id=chunks[0].get("data", {}).get("id", "chatcmpl-unknown"),
441
+ created=int(time.time()),
442
+ model=model,
443
+ choices=[
444
+ {
445
+ "index": 0,
446
+ "message": {"role": "assistant", "content": transformed_content},
447
+ "finish_reason": "stop",
448
+ }
449
+ ],
450
+ )
451
+
452
+ async def stream_proxy_response(self, request: ChatCompletionRequest) -> AsyncGenerator[str, None]:
453
+ """TRUE streaming proxy - direct pass-through with minimal processing"""
454
+ import uuid
455
+ import time
456
+
457
+ # Get cookie
458
+ cookie = await cookie_manager.get_next_cookie()
459
+ if not cookie:
460
+ raise HTTPException(status_code=503, detail="No valid authentication available")
461
+
462
+ # Prepare request data
463
+ request_data = request.model_dump(exclude_none=True)
464
+ target_model = "0727-360B-API" # Map GLM-4.5 to Z.AI model
465
+
466
+ # Build Z.AI request format
467
+ request_data = {
468
+ "stream": True, # Always request streaming from Z.AI
469
+ "model": target_model,
470
+ "messages": request_data["messages"],
471
+ "background_tasks": {
472
+ "title_generation": True,
473
+ "tags_generation": True
474
+ },
475
+ "chat_id": str(uuid.uuid4()),
476
+ "features": {
477
+ "image_generation": False,
478
+ "code_interpreter": False,
479
+ "web_search": False,
480
+ "auto_web_search": False
481
+ },
482
+ "id": str(uuid.uuid4()),
483
+ "mcp_servers": ["deep-web-search"],
484
+ "model_item": {
485
+ "id": target_model,
486
+ "name": "GLM-4.5",
487
+ "owned_by": "openai"
488
+ },
489
+ "params": {},
490
+ "tool_servers": [],
491
+ "variables": {
492
+ "{{USER_NAME}}": "User",
493
+ "{{USER_LOCATION}}": "Unknown",
494
+ "{{CURRENT_DATETIME}}": "2025-08-04 16:46:56"
495
+ }
496
+ }
497
+
498
+ headers = {
499
+ "Content-Type": "application/json",
500
+ "Authorization": f"Bearer {cookie}",
501
+ "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36",
502
+ "Accept": "application/json, text/event-stream",
503
+ "Accept-Language": "zh-CN",
504
+ "sec-ch-ua": '"Not)A;Brand";v="8", "Chromium";v="138", "Google Chrome";v="138"',
505
+ "sec-ch-ua-mobile": "?0",
506
+ "sec-ch-ua-platform": '"macOS"',
507
+ "x-fe-version": "prod-fe-1.0.53",
508
+ "Origin": "https://chat.z.ai",
509
+ "Referer": "https://chat.z.ai/c/069723d5-060b-404f-992c-4705f1554c4c",
510
+ }
511
+
512
+ completion_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
513
+ current_phase = None
514
+
515
+ try:
516
+ # Create a new client for this streaming request to avoid conflicts
517
+ async with httpx.AsyncClient(
518
+ timeout=httpx.Timeout(60.0, read=300.0),
519
+ limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
520
+ http2=True
521
+ ) as stream_client:
522
+ async with stream_client.stream(
523
+ "POST",
524
+ settings.UPSTREAM_URL,
525
+ json=request_data,
526
+ headers=headers
527
+ ) as response:
528
+
529
+ if response.status_code == 401:
530
+ await cookie_manager.mark_cookie_failed(cookie)
531
+ raise HTTPException(status_code=401, detail="Invalid authentication")
532
+
533
+ if response.status_code != 200:
534
+ await cookie_manager.mark_cookie_failed(cookie)
535
+ raise HTTPException(status_code=response.status_code, detail="Upstream error")
536
+
537
+ await cookie_manager.mark_cookie_success(cookie)
538
+
539
+ # Process streaming response in real-time
540
+ buffer = ""
541
+ async for chunk in response.aiter_text(chunk_size=1024):
542
+ if not chunk:
543
+ continue
544
+
545
+ buffer += chunk
546
+
547
+ # Process complete lines immediately
548
+ while "\n" in buffer:
549
+ line, buffer = buffer.split("\n", 1)
550
+ line = line.strip()
551
+
552
+ if not line.startswith("data: "):
553
+ continue
554
+
555
+ payload = line[6:].strip()
556
+ if payload == "[DONE]":
557
+ # Send final chunk and done
558
+ final_chunk = {
559
+ "id": completion_id,
560
+ "object": "chat.completion.chunk",
561
+ "created": int(time.time()),
562
+ "model": request.model,
563
+ "choices": [{
564
+ "index": 0,
565
+ "delta": {},
566
+ "finish_reason": "stop"
567
+ }]
568
+ }
569
+ yield f"data: {json.dumps(final_chunk)}\n\n"
570
+ yield "data: [DONE]\n\n"
571
+ return
572
+
573
+ try:
574
+ parsed = json.loads(payload)
575
+ data = parsed.get("data", {})
576
+ delta_content = data.get("delta_content", "")
577
+ phase = data.get("phase", "")
578
+
579
+ # Track phase changes
580
+ if phase != current_phase:
581
+ current_phase = phase
582
+ logger.debug(f"Phase changed to: {phase}")
583
+
584
+ # Apply filtering based on SHOW_THINK_TAGS and phase
585
+ should_send_content = True
586
+
587
+ if not settings.SHOW_THINK_TAGS and phase == "thinking":
588
+ should_send_content = False
589
+
590
+ # Process and send content immediately if we should
591
+ if delta_content and should_send_content:
592
+ # Minimal transformation for real-time streaming
593
+ transformed_delta = delta_content
594
+
595
+ if settings.SHOW_THINK_TAGS:
596
+ # Simple tag replacement for streaming
597
+ transformed_delta = re.sub(r'<thinking[^>]*>', '<thinking>', transformed_delta)
598
+
599
+ # Create and send OpenAI-compatible chunk immediately
600
+ openai_chunk = {
601
+ "id": completion_id,
602
+ "object": "chat.completion.chunk",
603
+ "created": int(time.time()),
604
+ "model": request.model,
605
+ "choices": [{
606
+ "index": 0,
607
+ "delta": {
608
+ "content": transformed_delta
609
+ },
610
+ "finish_reason": None
611
+ }]
612
+ }
613
+
614
+ # Yield immediately for real-time streaming
615
+ yield f"data: {json.dumps(openai_chunk)}\n\n"
616
+
617
+ except json.JSONDecodeError:
618
+ continue # Skip non-JSON lines
619
+
620
+ except httpx.RequestError as e:
621
+ logger.error(f"Streaming request error: {e}")
622
+ await cookie_manager.mark_cookie_failed(cookie)
623
+ raise HTTPException(status_code=503, detail=f"Upstream service unavailable: {str(e)}")