bluewinliang commited on
Commit
57c7071
·
verified ·
1 Parent(s): e1d2fea

Upload proxy_handler.py

Browse files
Files changed (1) hide show
  1. proxy_handler.py +218 -553
proxy_handler.py CHANGED
@@ -6,7 +6,9 @@ import json
6
  import logging
7
  import re
8
  import time
9
- from typing import AsyncGenerator, Dict, Any, Optional
 
 
10
  import httpx
11
  from fastapi import HTTPException
12
  from fastapi.responses import StreamingResponse
@@ -16,7 +18,6 @@ from cookie_manager import cookie_manager
16
  from models import (
17
  ChatCompletionRequest,
18
  ChatCompletionResponse,
19
- ChatCompletionStreamResponse,
20
  )
21
 
22
  logger = logging.getLogger(__name__)
@@ -24,11 +25,10 @@ logger = logging.getLogger(__name__)
24
 
25
  class ProxyHandler:
26
  def __init__(self):
27
- # Configure httpx client for streaming support
28
  self.client = httpx.AsyncClient(
29
- timeout=httpx.Timeout(60.0, read=300.0), # Longer read timeout for streaming
30
  limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
31
- http2=True # Enable HTTP/2 for better streaming performance
32
  )
33
 
34
  async def __aenter__(self):
@@ -37,125 +37,43 @@ class ProxyHandler:
37
  async def __aexit__(self, exc_type, exc_val, exc_tb):
38
  await self.client.aclose()
39
 
40
- def transform_content(self, content: str) -> str:
41
- """Transform content by replacing HTML tags and optionally removing think tags"""
42
- if not content:
43
- return content
44
-
45
- logger.debug(f"SHOW_THINK_TAGS setting: {settings.SHOW_THINK_TAGS}")
46
- logger.debug(f"Original content length: {len(content)}")
47
-
48
- # Optionally remove thinking content based on configuration
49
- if not settings.SHOW_THINK_TAGS:
50
- logger.debug("Removing thinking content from response")
51
- original_length = len(content)
52
-
53
- # Remove <details> blocks (thinking content) - handle both closed and unclosed tags
54
- # First try to remove complete <details>...</details> blocks
55
- content = re.sub(
56
- r"<details[^>]*>.*?</details>", "", content, flags=re.DOTALL
57
- )
58
-
59
- # Then remove any remaining unclosed <details> tags and their content
60
- # Look for <details> tag followed by content until we find what looks like answer content
61
- content = re.sub(
62
- r"<details[^>]*>.*?(?=\n\s*[A-Z][^<\n]*[.!?]|\n\s*\d+\.|\n\s*[-*]\s|$)",
63
- "",
64
- content,
65
- flags=re.DOTALL,
66
- )
67
 
68
- # Remove any remaining <summary> blocks
 
 
 
 
 
 
69
  content = re.sub(r"<summary>.*?</summary>", "", content, flags=re.DOTALL)
70
-
71
- content = content.strip()
72
-
73
- logger.debug(
74
- f"Content length after removing thinking content: {original_length} -> {len(content)}"
75
- )
76
  else:
77
- logger.debug("Keeping thinking content, converting to <think> tags")
78
-
79
- # First, remove <summary> tags and their content
80
- content = re.sub(r"<summary>.*?</summary>", "", content, flags=re.DOTALL)
81
 
82
- # Replace <details> with <think>
83
- content = re.sub(r"<details[^>]*>", "<think>", content)
84
- content = content.replace("</details>", "</think>")
85
-
86
- # Handle unclosed <think> tags - need better logic to find where thinking ends
87
- if "<think>" in content and "</think>" not in content:
88
- logger.debug("Found unclosed <think> tag, attempting to close it")
89
-
90
- # Find the position of <think>
91
- think_start = content.find("<think>")
92
- if think_start != -1:
93
- # Look for where the actual answer begins
94
- search_content = content[think_start + 7:] # Skip "<think>"
95
-
96
- # Look for patterns that indicate the start of the answer:
97
- patterns = [
98
- r'\n\n+([A-Z][^<\n]*)', # New paragraph starting with capital
99
- r'\n\n+(\d+\.)', # Numbered list
100
- r'\n\n+([A-Z][^<\n]*[.!?])', # Complete sentence
101
- r'\n\n+([*-]\s)', # Bullet point
102
- ]
103
-
104
- answer_start = None
105
- for pattern in patterns:
106
- match = re.search(pattern, search_content)
107
- if match:
108
- answer_start = think_start + 7 + match.start()
109
- logger.debug(f"Found answer start at position {answer_start}")
110
- break
111
-
112
- if answer_start:
113
- # Insert </think> before the answer
114
- content = content[:answer_start] + "\n</think>\n" + content[answer_start:].lstrip()
115
- else:
116
- # If no clear answer found, close at the end
117
- logger.debug("No clear answer pattern found, closing think tag at end")
118
- content = content.rstrip() + "\n</think>"
119
-
120
- # Clean up multiple newlines and spacing
121
- content = re.sub(r'\n\s*\n\s*\n+', '\n\n', content)
122
-
123
- logger.debug(f"Final transformed content length: {len(content)}")
124
  return content.strip()
125
 
126
- async def proxy_request(self, request: ChatCompletionRequest) -> Dict[str, Any]:
127
- """Proxy request to Z.AI API"""
 
128
  cookie = await cookie_manager.get_next_cookie()
129
  if not cookie:
130
  raise HTTPException(status_code=503, detail="No available cookies")
131
 
132
- # Transform model name
133
- target_model = (
134
- settings.UPSTREAM_MODEL
135
- if request.model == settings.MODEL_NAME
136
- else request.model
137
- )
138
-
139
- # Determine if this should be a streaming response
140
- is_streaming = (
141
- request.stream if request.stream is not None else settings.DEFAULT_STREAM
142
- )
143
-
144
- # Validate parameter compatibility
145
- if is_streaming and not settings.SHOW_THINK_TAGS:
146
- logger.warning("SHOW_THINK_TAGS=false is ignored for streaming responses")
147
 
148
- # Prepare request data
149
- request_data = request.model_dump(exclude_none=True)
150
- request_data["model"] = target_model
151
-
152
- # Build request data based on actual Z.AI format from zai-messages.md
153
- import uuid
154
-
155
- request_data = {
156
- "stream": True, # Always request streaming from Z.AI for processing
157
  "model": target_model,
158
- "messages": request_data["messages"],
159
  "background_tasks": {"title_generation": True, "tags_generation": True},
160
  "chat_id": str(uuid.uuid4()),
161
  "features": {
@@ -163,6 +81,7 @@ class ProxyHandler:
163
  "code_interpreter": False,
164
  "web_search": False,
165
  "auto_web_search": False,
 
166
  },
167
  "id": str(uuid.uuid4()),
168
  "mcp_servers": ["deep-web-search"],
@@ -172,16 +91,17 @@ class ProxyHandler:
172
  "variables": {
173
  "{{USER_NAME}}": "User",
174
  "{{USER_LOCATION}}": "Unknown",
175
- "{{CURRENT_DATETIME}}": "2025-08-04 16:46:56",
176
  },
177
  }
178
 
179
- logger.debug(f"Sending request data: {request_data}")
180
-
181
  headers = {
182
  "Content-Type": "application/json",
183
  "Authorization": f"Bearer {cookie}",
184
- "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36",
 
 
 
185
  "Accept": "application/json, text/event-stream",
186
  "Accept-Language": "zh-CN",
187
  "sec-ch-ua": '"Not)A;Brand";v="8", "Chromium";v="138", "Google Chrome";v="138"',
@@ -189,410 +109,54 @@ class ProxyHandler:
189
  "sec-ch-ua-platform": '"macOS"',
190
  "x-fe-version": "prod-fe-1.0.53",
191
  "Origin": "https://chat.z.ai",
192
- "Referer": "https://chat.z.ai/c/069723d5-060b-404f-992c-4705f1554c4c",
193
  }
 
194
 
195
- try:
196
- # Use client.stream() for TRUE streaming response - this is the key fix!
197
- async with self.client.stream(
198
- "POST",
199
- settings.UPSTREAM_URL,
200
- json=request_data,
201
- headers=headers,
202
- timeout=httpx.Timeout(60.0, read=300.0)
203
- ) as response:
204
-
205
- if response.status_code == 401:
206
- await cookie_manager.mark_cookie_failed(cookie)
207
- raise HTTPException(status_code=401, detail="Invalid authentication")
208
-
209
- if response.status_code != 200:
210
- # For streaming, we need to read the error response properly
211
- try:
212
- error_text = await response.aread()
213
- error_detail = error_text.decode('utf-8')
214
- except:
215
- error_detail = f"HTTP {response.status_code}"
216
-
217
- raise HTTPException(
218
- status_code=response.status_code,
219
- detail=f"Upstream error: {error_detail}",
220
- )
221
-
222
- await cookie_manager.mark_cookie_success(cookie)
223
- return {"response": response, "cookie": cookie}
224
-
225
- except httpx.RequestError as e:
226
- logger.error(f"Request error: {e}")
227
- logger.error(f"Request error type: {type(e).__name__}")
228
- logger.error(f"Request URL: {settings.UPSTREAM_URL}")
229
- logger.error(f"Request timeout: {self.client.timeout}")
230
- await cookie_manager.mark_cookie_failed(cookie)
231
- raise HTTPException(
232
- status_code=503, detail=f"Upstream service unavailable: {str(e)}"
233
- )
234
-
235
- async def process_streaming_response(
236
- self, response: httpx.Response
237
- ) -> AsyncGenerator[Dict[str, Any], None]:
238
- """Process streaming response from Z.AI - TRUE real-time processing"""
239
- buffer = ""
240
-
241
- # Use aiter_text with small chunk size for real-time processing
242
- async for chunk in response.aiter_text(chunk_size=1024): # Small chunks for responsiveness
243
- if not chunk:
244
- continue
245
-
246
- buffer += chunk
247
-
248
- # Process complete lines immediately
249
- while "\n" in buffer:
250
- line, buffer = buffer.split("\n", 1)
251
- line = line.strip()
252
-
253
- if not line.startswith("data: "):
254
- continue
255
-
256
- payload = line[6:].strip()
257
- if payload == "[DONE]":
258
- return
259
-
260
- try:
261
- parsed = json.loads(payload)
262
-
263
- # Check for errors first
264
- if parsed.get("error") or (parsed.get("data", {}).get("error")):
265
- error_detail = (
266
- parsed.get("error", {}).get("detail")
267
- or parsed.get("data", {}).get("error", {}).get("detail")
268
- or "Unknown error from upstream"
269
- )
270
- logger.error(f"Upstream error: {error_detail}")
271
- raise HTTPException(
272
- status_code=400, detail=f"Upstream error: {error_detail}"
273
- )
274
-
275
- # Clean up response data
276
- if parsed.get("data"):
277
- # Remove unwanted fields for cleaner processing
278
- parsed["data"].pop("edit_index", None)
279
- parsed["data"].pop("edit_content", None)
280
-
281
- # Yield immediately for real-time streaming
282
- yield parsed
283
-
284
- except json.JSONDecodeError as e:
285
- logger.debug(f"JSON decode error (skipping): {e}")
286
- continue # Skip non-JSON lines
287
-
288
- async def handle_chat_completion(self, request: ChatCompletionRequest):
289
- """Handle chat completion request"""
290
- # Determine final streaming mode
291
- is_streaming = (
292
- request.stream if request.stream is not None else settings.DEFAULT_STREAM
293
- )
294
-
295
- if is_streaming:
296
- # For streaming responses, use direct streaming proxy
297
- return StreamingResponse(
298
- self.stream_proxy_response(request),
299
- media_type="text/event-stream",
300
- headers={
301
- "Cache-Control": "no-cache",
302
- "Connection": "keep-alive",
303
- },
304
- )
305
- else:
306
- # For non-streaming responses, collect all streaming data first
307
- chunks = []
308
- async for chunk_data in self.stream_proxy_response(request):
309
- if chunk_data.startswith("data: ") and not chunk_data.startswith("data: [DONE]"):
310
- try:
311
- chunk_json = json.loads(chunk_data[6:])
312
- if chunk_json.get("choices", [{}])[0].get("delta", {}).get("content"):
313
- chunks.append(chunk_json["choices"][0]["delta"]["content"])
314
- except:
315
- continue
316
-
317
- # Combine all content
318
- full_content = "".join(chunks)
319
-
320
- # Apply final transformation for non-streaming response
321
- transformed_content = self.transform_content(full_content)
322
-
323
- # Return as non-streaming response
324
- import time
325
- import uuid
326
- return ChatCompletionResponse(
327
- id=f"chatcmpl-{uuid.uuid4().hex[:29]}",
328
- created=int(time.time()),
329
- model=request.model,
330
- choices=[
331
- {
332
- "index": 0,
333
- "message": {"role": "assistant", "content": transformed_content},
334
- "finish_reason": "stop",
335
- }
336
- ],
337
- )
338
-
339
- async def stream_response(self, response: httpx.Response, model: str) -> AsyncGenerator[str, None]:
340
- """Generate TRUE streaming response in OpenAI format - real-time processing"""
341
- import uuid
342
- import time
343
-
344
- # Generate a unique completion ID
345
- completion_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
346
- current_phase = None
347
-
348
- try:
349
- # Real-time streaming: process each chunk immediately as it arrives
350
- async for parsed in self.process_streaming_response(response):
351
- try:
352
- data = parsed.get("data", {})
353
- delta_content = data.get("delta_content", "")
354
- phase = data.get("phase", "")
355
-
356
- # Track phase changes
357
- if phase != current_phase:
358
- current_phase = phase
359
- logger.debug(f"Phase changed to: {phase}")
360
-
361
- # Apply filtering based on SHOW_THINK_TAGS and phase
362
- should_send_content = True
363
-
364
- if not settings.SHOW_THINK_TAGS and phase == "thinking":
365
- # Skip thinking content when SHOW_THINK_TAGS=false
366
- should_send_content = False
367
- logger.debug(f"Skipping thinking content (SHOW_THINK_TAGS=false)")
368
-
369
- # Process and send content immediately if we should
370
- if delta_content and should_send_content:
371
- # Minimal transformation for real-time streaming - NO TAG PROCESSING!
372
- transformed_delta = delta_content
373
-
374
- # Only do basic replacements without complex regex for streaming
375
- if settings.SHOW_THINK_TAGS and ('<details' in transformed_delta or '</details>' in transformed_delta):
376
- # Simple string replacement only
377
- if '<details' in transformed_delta:
378
- transformed_delta = '<think>'
379
- elif '</details>' in transformed_delta:
380
- transformed_delta = '</think>'
381
- # Remove summary content if present
382
- if '<summary>' in transformed_delta or '</summary>' in transformed_delta:
383
- transformed_delta = '' # Skip summary content entirely
384
-
385
- # Create and send OpenAI-compatible chunk immediately
386
- if transformed_delta: # Only send if there's actual content
387
- openai_chunk = {
388
- "id": completion_id,
389
- "object": "chat.completion.chunk",
390
- "created": int(time.time()),
391
- "model": model,
392
- "choices": [{
393
- "index": 0,
394
- "delta": {
395
- "content": transformed_delta
396
- },
397
- "finish_reason": None
398
- }]
399
- }
400
-
401
- # Yield immediately for real-time streaming
402
- yield f"data: {json.dumps(openai_chunk)}\n\n"
403
-
404
- except Exception as e:
405
- logger.error(f"Error processing streaming chunk: {e}")
406
- continue
407
-
408
- # Send final completion chunk
409
- final_chunk = {
410
- "id": completion_id,
411
- "object": "chat.completion.chunk",
412
- "created": int(time.time()),
413
- "model": model,
414
- "choices": [{
415
- "index": 0,
416
- "delta": {},
417
- "finish_reason": "stop"
418
- }]
419
- }
420
-
421
- yield f"data: {json.dumps(final_chunk)}\n\n"
422
- yield "data: [DONE]\n\n"
423
-
424
- except Exception as e:
425
- logger.error(f"Streaming error: {e}")
426
- # Send error in OpenAI format
427
- error_chunk = {
428
- "error": {
429
- "message": str(e),
430
- "type": "server_error"
431
- }
432
- }
433
- yield f"data: {json.dumps(error_chunk)}\n\n"
434
-
435
- async def non_stream_response(
436
- self, response: httpx.Response, model: str
437
- ) -> ChatCompletionResponse:
438
- """Generate non-streaming response"""
439
- chunks = []
440
- async for parsed in self.process_streaming_response(response):
441
- chunks.append(parsed)
442
- logger.debug(f"Received chunk: {parsed}") # Debug log
443
-
444
- if not chunks:
445
- raise HTTPException(status_code=500, detail="No response from upstream")
446
-
447
- logger.info(f"Total chunks received: {len(chunks)}")
448
- logger.debug(f"First chunk structure: {chunks[0] if chunks else 'None'}")
449
-
450
- # Aggregate content based on SHOW_THINK_TAGS setting
451
- if settings.SHOW_THINK_TAGS:
452
- # Include all content
453
- full_content = "".join(
454
- chunk.get("data", {}).get("delta_content", "") for chunk in chunks
455
- )
456
- else:
457
- # Only include answer phase content
458
- full_content = "".join(
459
- chunk.get("data", {}).get("delta_content", "")
460
- for chunk in chunks
461
- if chunk.get("data", {}).get("phase") == "answer"
462
- )
463
-
464
- logger.info(f"Aggregated content length: {len(full_content)}")
465
- logger.debug(
466
- f"Full aggregated content: {full_content}"
467
- ) # Show full content for debugging
468
-
469
- # Apply content transformation (including think tag filtering)
470
- transformed_content = self.transform_content(full_content)
471
-
472
- logger.info(f"Transformed content length: {len(transformed_content)}")
473
- logger.debug(f"Transformed content: {transformed_content[:200]}...")
474
-
475
- # Create OpenAI-compatible response
476
- return ChatCompletionResponse(
477
- id=chunks[0].get("data", {}).get("id", "chatcmpl-unknown"),
478
- created=int(time.time()),
479
- model=model,
480
- choices=[
481
- {
482
- "index": 0,
483
- "message": {"role": "assistant", "content": transformed_content},
484
- "finish_reason": "stop",
485
- }
486
- ],
487
- )
488
-
489
  async def stream_proxy_response(self, request: ChatCompletionRequest) -> AsyncGenerator[str, None]:
490
- """TRUE streaming proxy - direct pass-through with minimal processing"""
491
- import uuid
492
- import time
493
-
494
- # Get cookie
495
- cookie = await cookie_manager.get_next_cookie()
496
- if not cookie:
497
- raise HTTPException(status_code=503, detail="No valid authentication available")
498
-
499
- # Prepare request data
500
- request_data = request.model_dump(exclude_none=True)
501
- target_model = "0727-360B-API" # Map GLM-4.5 to Z.AI model
502
-
503
- # Build Z.AI request format
504
- request_data = {
505
- "stream": True, # Always request streaming from Z.AI
506
- "model": target_model,
507
- "messages": request_data["messages"],
508
- "background_tasks": {
509
- "title_generation": True,
510
- "tags_generation": True
511
- },
512
- "chat_id": str(uuid.uuid4()),
513
- "features": {
514
- "image_generation": False,
515
- "code_interpreter": False,
516
- "web_search": False,
517
- "auto_web_search": False,
518
- "enable_thinking": True,
519
- },
520
- "id": str(uuid.uuid4()),
521
- "mcp_servers": ["deep-web-search"],
522
- "model_item": {
523
- "id": target_model,
524
- "name": "GLM-4.5",
525
- "owned_by": "openai"
526
- },
527
- "params": {},
528
- "tool_servers": [],
529
- "variables": {
530
- "{{USER_NAME}}": "User",
531
- "{{USER_LOCATION}}": "Unknown",
532
- "{{CURRENT_DATETIME}}": "2025-08-04 16:46:56"
533
- }
534
- }
535
-
536
- headers = {
537
- "Content-Type": "application/json",
538
- "Authorization": f"Bearer {cookie}",
539
- "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36",
540
- "Accept": "application/json, text/event-stream",
541
- "Accept-Language": "zh-CN",
542
- "sec-ch-ua": '"Not)A;Brand";v="8", "Chromium";v="138", "Google Chrome";v="138"',
543
- "sec-ch-ua-mobile": "?0",
544
- "sec-ch-ua-platform": '"macOS"',
545
- "x-fe-version": "prod-fe-1.0.53",
546
- "Origin": "https://chat.z.ai",
547
- "Referer": "https://chat.z.ai/c/069723d5-060b-404f-992c-4705f1554c4c",
548
- }
549
-
550
  completion_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
 
551
  current_phase = None
552
 
553
  try:
554
- # Create a new client for this streaming request to avoid conflicts
555
- async with httpx.AsyncClient(
556
- timeout=httpx.Timeout(60.0, read=300.0),
557
- limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
558
- http2=True
559
- ) as stream_client:
560
- async with stream_client.stream(
561
- "POST",
562
- settings.UPSTREAM_URL,
563
- json=request_data,
564
- headers=headers
565
- ) as response:
566
-
567
- if response.status_code == 401:
568
- await cookie_manager.mark_cookie_failed(cookie)
569
- raise HTTPException(status_code=401, detail="Invalid authentication")
570
-
571
- if response.status_code != 200:
572
- await cookie_manager.mark_cookie_failed(cookie)
573
- raise HTTPException(status_code=response.status_code, detail="Upstream error")
574
-
575
- await cookie_manager.mark_cookie_success(cookie)
576
-
577
- # Process streaming response in real-time
578
- buffer = ""
579
- async for chunk in response.aiter_text(chunk_size=1024):
580
- if not chunk:
581
- continue
582
 
583
- buffer += chunk
 
 
584
 
585
- # Process complete lines immediately
586
- while "\n" in buffer:
587
- line, buffer = buffer.split("\n", 1)
588
- line = line.strip()
 
 
589
 
590
- if not line.startswith("data: "):
591
- continue
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
592
 
593
- payload = line[6:].strip()
594
- if payload == "[DONE]":
595
- # Send final chunk and done
596
  final_chunk = {
597
  "id": completion_id,
598
  "object": "chat.completion.chunk",
@@ -608,62 +172,163 @@ class ProxyHandler:
608
  yield "data: [DONE]\n\n"
609
  return
610
 
 
611
  try:
612
  parsed = json.loads(payload)
613
- data = parsed.get("data", {})
614
- delta_content = data.get("delta_content", "")
615
- phase = data.get("phase", "")
616
-
617
- # Track phase changes
618
- if phase != current_phase:
619
- current_phase = phase
620
- logger.debug(f"Phase changed to: {phase}")
621
-
622
- # Apply filtering based on SHOW_THINK_TAGS and phase
623
- should_send_content = True
624
-
625
- if not settings.SHOW_THINK_TAGS and phase == "thinking":
626
- should_send_content = False
627
-
628
- # Process and send content immediately if we should
629
- if delta_content and should_send_content:
630
- # CRITICAL FIX: For streaming, do MINIMAL processing
631
- transformed_delta = delta_content
632
-
633
- # Only do safe replacements for streaming
634
- if settings.SHOW_THINK_TAGS:
635
- # Check for complete tag patterns only
636
- if transformed_delta == '<details>' or '<details ' in transformed_delta:
637
- transformed_delta = '<think>'
638
- elif transformed_delta == '</details>':
639
- transformed_delta = '</think>'
640
- elif '<summary>' in transformed_delta or '</summary>' in transformed_delta:
641
- # Skip summary content entirely
642
- continue
643
-
644
- # Create and send OpenAI-compatible chunk immediately
645
- if transformed_delta: # Only send if there's content
646
- openai_chunk = {
 
647
  "id": completion_id,
648
  "object": "chat.completion.chunk",
649
  "created": int(time.time()),
650
  "model": request.model,
651
  "choices": [{
652
  "index": 0,
653
- "delta": {
654
- "content": transformed_delta
655
- },
656
  "finish_reason": None
657
  }]
658
  }
 
 
659
 
660
- # Yield immediately for real-time streaming
661
- yield f"data: {json.dumps(openai_chunk)}\n\n"
662
 
663
- except json.JSONDecodeError:
664
- continue # Skip non-JSON lines
 
 
 
 
 
 
 
 
 
 
665
 
666
  except httpx.RequestError as e:
667
- logger.error(f"Streaming request error: {e}")
668
- await cookie_manager.mark_cookie_failed(cookie)
669
- raise HTTPException(status_code=503, detail=f"Upstream service unavailable: {str(e)}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6
  import logging
7
  import re
8
  import time
9
+ import uuid
10
+ from typing import AsyncGenerator, Dict, Any
11
+
12
  import httpx
13
  from fastapi import HTTPException
14
  from fastapi.responses import StreamingResponse
 
18
  from models import (
19
  ChatCompletionRequest,
20
  ChatCompletionResponse,
 
21
  )
22
 
23
  logger = logging.getLogger(__name__)
 
25
 
26
  class ProxyHandler:
27
    def __init__(self):
        # Shared async HTTP client for all upstream calls. The long read
        # timeout (300s) keeps slow upstream SSE streams alive while
        # connect/write/pool stay at the default 60s.
        self.client = httpx.AsyncClient(
            timeout=httpx.Timeout(60.0, read=300.0),
            limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
            http2=True,  # NOTE: requires the optional httpx[http2] (h2) extra
        )
33
 
34
  async def __aenter__(self):
 
37
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Close the pooled client when the handler leaves its async context.
        await self.client.aclose()
39
 
40
+ # Content transformation utilities
41
+ def _balance_think_tag(self, text: str) -> str:
42
+ """Ensure matching number of <think> and </think> tags"""
43
+ open_cnt = len(re.findall(r"<think>", text))
44
+ close_cnt = len(re.findall(r"</think>", text))
45
+ if open_cnt > close_cnt:
46
+ text += "</think>" * (open_cnt - close_cnt)
47
+ return text
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
48
 
49
+ def transform_content(self, content: str) -> str:
50
+ """Transform upstream HTML to <think> format"""
51
+ if settings.SHOW_THINK_TAGS:
52
+ # Replace <details> with <think>
53
+ content = re.sub(r"<details[^>]*>", "<think>", content)
54
+ content = re.sub(r"</details>", "</think>", content)
55
+ # Remove <summary> tags
56
  content = re.sub(r"<summary>.*?</summary>", "", content, flags=re.DOTALL)
57
+ content = self._balance_think_tag(content)
 
 
 
 
 
58
  else:
59
+ # Remove entire <details> blocks
60
+ content = re.sub(r"<details[^>]*>.*?</details>", "", content, flags=re.DOTALL)
 
 
61
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
62
  return content.strip()
63
 
64
+ # Upstream communication
65
+ async def _prepare_upstream(self, request: ChatCompletionRequest) -> (Dict[str, Any], str, str):
66
+ """Prepare request body and headers for upstream API"""
67
  cookie = await cookie_manager.get_next_cookie()
68
  if not cookie:
69
  raise HTTPException(status_code=503, detail="No available cookies")
70
 
71
+ target_model = settings.UPSTREAM_MODEL if request.model == settings.MODEL_NAME else request.model
 
 
 
 
 
 
 
 
 
 
 
 
 
 
72
 
73
+ req_body = {
74
+ "stream": True,
 
 
 
 
 
 
 
75
  "model": target_model,
76
+ "messages": request.messages,
77
  "background_tasks": {"title_generation": True, "tags_generation": True},
78
  "chat_id": str(uuid.uuid4()),
79
  "features": {
 
81
  "code_interpreter": False,
82
  "web_search": False,
83
  "auto_web_search": False,
84
+ "enable_thinking": True,
85
  },
86
  "id": str(uuid.uuid4()),
87
  "mcp_servers": ["deep-web-search"],
 
91
  "variables": {
92
  "{{USER_NAME}}": "User",
93
  "{{USER_LOCATION}}": "Unknown",
94
+ "{{CURRENT_DATETIME}}": time.strftime("%Y-%m-%d %H:%M:%S"),
95
  },
96
  }
97
 
 
 
98
  headers = {
99
  "Content-Type": "application/json",
100
  "Authorization": f"Bearer {cookie}",
101
+ "User-Agent": (
102
+ "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
103
+ "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"
104
+ ),
105
  "Accept": "application/json, text/event-stream",
106
  "Accept-Language": "zh-CN",
107
  "sec-ch-ua": '"Not)A;Brand";v="8", "Chromium";v="138", "Google Chrome";v="138"',
 
109
  "sec-ch-ua-platform": '"macOS"',
110
  "x-fe-version": "prod-fe-1.0.53",
111
  "Origin": "https://chat.z.ai",
112
+ "Referer": "https://chat.z.ai/",
113
  }
114
+ return req_body, headers, cookie
115
 
116
+ # Main streaming logic
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
117
  async def stream_proxy_response(self, request: ChatCompletionRequest) -> AsyncGenerator[str, None]:
118
+ """Handle streaming proxy response"""
119
+ req_body, headers, cookie = await self._prepare_upstream(request)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
120
  completion_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
121
+ think_open = False # Track if <think> tag is open
122
  current_phase = None
123
 
124
  try:
125
+ async with self.client.stream("POST", settings.UPSTREAM_URL, json=req_body, headers=headers) as response:
126
+ if response.status_code != 200:
127
+ await cookie_manager.mark_cookie_invalid(cookie)
128
+ raise HTTPException(status_code=response.status_code, detail="Upstream API error")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
129
 
130
+ async for raw_chunk in response.aiter_text():
131
+ if not raw_chunk or raw_chunk.isspace():
132
+ continue
133
 
134
+ # Handle Server-Sent Events format
135
+ lines = raw_chunk.split('\n')
136
+ for line in lines:
137
+ line = line.strip()
138
+ if not line or line.startswith(':'):
139
+ continue
140
 
141
+ if line.startswith('data: '):
142
+ payload = line[6:]
143
+
144
+ if payload == '[DONE]':
145
+ # Close thinking tag if still open
146
+ if think_open and settings.SHOW_THINK_TAGS:
147
+ close_chunk = {
148
+ "id": completion_id,
149
+ "object": "chat.completion.chunk",
150
+ "created": int(time.time()),
151
+ "model": request.model,
152
+ "choices": [{
153
+ "index": 0,
154
+ "delta": {"content": "</think>"},
155
+ "finish_reason": None
156
+ }]
157
+ }
158
+ yield f"data: {json.dumps(close_chunk)}\n\n"
159
 
 
 
 
160
  final_chunk = {
161
  "id": completion_id,
162
  "object": "chat.completion.chunk",
 
172
  yield "data: [DONE]\n\n"
173
  return
174
 
175
+ # Handle normal JSON
176
  try:
177
  parsed = json.loads(payload)
178
+ except json.JSONDecodeError:
179
+ continue
180
+
181
+ data = parsed.get("data", {})
182
+ delta_content = data.get("delta_content", "")
183
+ phase = data.get("phase")
184
+
185
+ # Handle phase changes
186
+ if phase != current_phase:
187
+ current_phase = phase
188
+ if phase == "answer" and think_open and settings.SHOW_THINK_TAGS:
189
+ # Auto-close thinking phase
190
+ auto_close = {
191
+ "id": completion_id,
192
+ "object": "chat.completion.chunk",
193
+ "created": int(time.time()),
194
+ "model": request.model,
195
+ "choices": [{
196
+ "index": 0,
197
+ "delta": {"content": "</think>"},
198
+ "finish_reason": None
199
+ }]
200
+ }
201
+ yield f"data: {json.dumps(auto_close)}\n\n"
202
+ think_open = False
203
+
204
+ # Decide whether to output content
205
+ if phase == "thinking" and not settings.SHOW_THINK_TAGS:
206
+ continue # Skip thinking content
207
+
208
+ if delta_content:
209
+ if phase == "thinking" and settings.SHOW_THINK_TAGS:
210
+ # First time entering thinking phase, add <think> tag
211
+ if not think_open:
212
+ think_chunk = {
213
  "id": completion_id,
214
  "object": "chat.completion.chunk",
215
  "created": int(time.time()),
216
  "model": request.model,
217
  "choices": [{
218
  "index": 0,
219
+ "delta": {"content": "<think>"},
 
 
220
  "finish_reason": None
221
  }]
222
  }
223
+ yield f"data: {json.dumps(think_chunk)}\n\n"
224
+ think_open = True
225
 
226
+ # Transform content
227
+ transformed_content = self.transform_content(delta_content)
228
 
229
+ chunk = {
230
+ "id": completion_id,
231
+ "object": "chat.completion.chunk",
232
+ "created": int(time.time()),
233
+ "model": request.model,
234
+ "choices": [{
235
+ "index": 0,
236
+ "delta": {"content": transformed_content},
237
+ "finish_reason": None
238
+ }]
239
+ }
240
+ yield f"data: {json.dumps(chunk)}\n\n"
241
 
242
  except httpx.RequestError as e:
243
+ await cookie_manager.mark_cookie_invalid(cookie)
244
+ logger.error(f"Request error: {e}")
245
+ raise HTTPException(status_code=502, detail="Upstream connection error")
246
+ except Exception as e:
247
+ logger.error(f"Unexpected error: {e}")
248
+ raise HTTPException(status_code=500, detail="Internal server error")
249
+
250
+ async def non_stream_proxy_response(self, request: ChatCompletionRequest) -> ChatCompletionResponse:
251
+ """Handle non-streaming proxy response"""
252
+ req_body, headers, cookie = await self._prepare_upstream(request)
253
+ thinking_buf = []
254
+ answer_buf = []
255
+ current_phase = None
256
+
257
+ try:
258
+ async with self.client.stream("POST", settings.UPSTREAM_URL, json=req_body, headers=headers) as response:
259
+ if response.status_code != 200:
260
+ await cookie_manager.mark_cookie_invalid(cookie)
261
+ raise HTTPException(status_code=response.status_code, detail="Upstream API error")
262
+
263
+ async for raw_chunk in response.aiter_text():
264
+ if not raw_chunk or raw_chunk.isspace():
265
+ continue
266
+
267
+ lines = raw_chunk.split('\n')
268
+ for line in lines:
269
+ line = line.strip()
270
+ if not line or line.startswith(':') or not line.startswith('data: '):
271
+ continue
272
+
273
+ payload = line[6:]
274
+ if payload == '[DONE]':
275
+ break
276
+
277
+ try:
278
+ parsed = json.loads(payload)
279
+ except json.JSONDecodeError:
280
+ continue
281
+
282
+ data = parsed.get("data", {})
283
+ delta_content = data.get("delta_content", "")
284
+ phase = data.get("phase")
285
+
286
+ if phase != current_phase:
287
+ current_phase = phase
288
+
289
+ if delta_content:
290
+ if phase == "thinking":
291
+ thinking_buf.append(delta_content)
292
+ elif phase == "answer":
293
+ answer_buf.append(delta_content)
294
+
295
+ # Combine thinking and answer content
296
+ if settings.SHOW_THINK_TAGS and thinking_buf:
297
+ thinking_text = "<think>" + "".join(thinking_buf) + "</think>" if thinking_buf else ""
298
+ final_text = thinking_text + "".join(answer_buf)
299
+ else:
300
+ final_text = "".join(answer_buf)
301
+
302
+ final_text = self.transform_content(final_text)
303
+
304
+ return ChatCompletionResponse(
305
+ id=f"chatcmpl-{uuid.uuid4().hex[:29]}",
306
+ created=int(time.time()),
307
+ model=request.model,
308
+ choices=[{
309
+ "index": 0,
310
+ "message": {"role": "assistant", "content": final_text},
311
+ "finish_reason": "stop"
312
+ }],
313
+ )
314
+
315
+ except httpx.RequestError as e:
316
+ await cookie_manager.mark_cookie_invalid(cookie)
317
+ logger.error(f"Request error: {e}")
318
+ raise HTTPException(status_code=502, detail="Upstream connection error")
319
+ except Exception as e:
320
+ logger.error(f"Unexpected error: {e}")
321
+ raise HTTPException(status_code=500, detail="Internal server error")
322
+
323
+ # FastAPI entry point
324
+ async def handle_chat_completion(self, request: ChatCompletionRequest):
325
+ """Main handler for chat completion requests"""
326
+ stream = bool(request.stream) if request.stream is not None else settings.DEFAULT_STREAM
327
+ if stream:
328
+ return StreamingResponse(
329
+ self.stream_proxy_response(request),
330
+ media_type="text/event-stream",
331
+ headers={"Cache-Control": "no-cache", "Connection": "keep-alive"},
332
+ )
333
+ else:
334
+ return await self.non_stream_proxy_response(request)