bluewinliang committed on
Commit
9cc9909
·
verified ·
1 Parent(s): 8cbffea

Upload proxy_handler.py

Browse files
Files changed (1) hide show
  1. proxy_handler.py +533 -109
proxy_handler.py CHANGED
@@ -6,8 +6,9 @@ import json
6
  import logging
7
  import re
8
  import time
9
- from typing import AsyncGenerator, Dict, Any
10
  import httpx
 
11
  from fastapi.responses import StreamingResponse
12
 
13
  from config import settings
@@ -15,6 +16,7 @@ from cookie_manager import cookie_manager
15
  from models import (
16
  ChatCompletionRequest,
17
  ChatCompletionResponse,
 
18
  )
19
 
20
  logger = logging.getLogger(__name__)
@@ -22,25 +24,251 @@ logger = logging.getLogger(__name__)
22
 
23
  class ProxyHandler:
24
  def __init__(self):
25
- pass
 
 
 
 
 
26
 
27
  async def __aenter__(self):
28
  return self
29
 
30
  async def __aexit__(self, exc_type, exc_val, exc_tb):
31
- pass
32
 
33
- async def handle_chat_completion(self, request: ChatCompletionRequest):
34
- """Handle chat completion request by streaming or aggregating."""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
35
  is_streaming = (
36
  request.stream if request.stream is not None else settings.DEFAULT_STREAM
37
  )
38
 
39
- response_generator = self._stream_proxy_logic(request)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
40
 
41
  if is_streaming:
 
42
  return StreamingResponse(
43
- response_generator,
44
  media_type="text/event-stream",
45
  headers={
46
  "Cache-Control": "no-cache",
@@ -48,159 +276,355 @@ class ProxyHandler:
48
  },
49
  )
50
  else:
51
- full_content = ""
52
- final_id = f"chatcmpl-{time.time()}"
53
- async for chunk_data in response_generator:
54
  if chunk_data.startswith("data: ") and not chunk_data.startswith("data: [DONE]"):
55
  try:
56
- chunk_json_str = chunk_data[6:].strip()
57
- if not chunk_json_str:
58
- continue
59
- chunk_json = json.loads(chunk_json_str)
60
- final_id = chunk_json.get("id", final_id)
61
- delta = chunk_json.get("choices", [{}])[0].get("delta", {})
62
- if delta and delta.get("content"):
63
- full_content += delta["content"]
64
- except json.JSONDecodeError:
65
- logger.warning(f"Could not decode JSON from stream chunk: {chunk_data}")
66
  continue
67
-
 
 
 
 
 
 
 
 
 
68
  return ChatCompletionResponse(
69
- id=final_id,
70
  created=int(time.time()),
71
  model=request.model,
72
  choices=[
73
  {
74
  "index": 0,
75
- "message": {"role": "assistant", "content": full_content.strip()},
76
  "finish_reason": "stop",
77
  }
78
  ],
79
  )
80
 
81
- async def _stream_proxy_logic(self, request: ChatCompletionRequest) -> AsyncGenerator[str, None]:
82
- """
83
- Core logic for streaming from Z.AI and transforming to OpenAI format.
84
- """
85
- cookie = await cookie_manager.get_next_cookie()
86
- if not cookie:
87
- error_chunk = {"error": {"message": "No available cookies", "type": "server_error", "code": 503}}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
88
  yield f"data: {json.dumps(error_chunk)}\n\n"
89
- return
90
-
91
- target_model = settings.UPSTREAM_MODEL
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
92
  import uuid
93
-
94
- # KEY FIX: Add "enable_thinking": True to the features object to force the thinking phase.
 
 
 
 
 
 
 
 
 
 
95
  request_data = {
96
- "stream": True,
97
  "model": target_model,
98
- "messages": request.model_dump(exclude_none=True)["messages"],
 
 
 
 
 
99
  "features": {
100
  "image_generation": False,
101
  "code_interpreter": False,
102
  "web_search": False,
103
  "auto_web_search": False,
104
- "enable_thinking": True, # This is the critical flag.
105
  },
106
  "id": str(uuid.uuid4()),
107
- "model_item": {"id": target_model, "name": "GLM-4.5", "owned_by": "openai"},
 
 
 
 
 
 
 
108
  "variables": {
109
  "{{USER_NAME}}": "User",
110
  "{{USER_LOCATION}}": "Unknown",
111
- "{{CURRENT_DATETIME}}": time.strftime("%Y-%m-%d %H:%M:%S"),
112
- },
113
  }
114
 
115
  headers = {
116
- "Content-Type": "application/json", "Authorization": f"Bearer {cookie}",
 
117
  "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36",
118
- "Accept": "application/json, text/event-stream", "Accept-Language": "zh-CN",
119
- "sec-ch-ua": '"Not)A;Brand";v="8", "Chromium";v="138", "Google Chrome";v="138"', "sec-ch-ua-mobile": "?0",
120
- "sec-ch-ua-platform": '"macOS"', "x-fe-version": "prod-fe-1.0.53", "Origin": "https://chat.z.ai",
121
- "Referer": "https://chat.z.ai/",
 
 
 
 
122
  }
123
 
124
  completion_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
125
- buffer = ""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
126
 
127
- async with httpx.AsyncClient(
128
- timeout=httpx.Timeout(60.0, read=300.0), limits=httpx.Limits(max_connections=100, max_keepalive_connections=20), http2=True
129
- ) as client:
130
- try:
131
- async with client.stream("POST", settings.UPSTREAM_URL, json=request_data, headers=headers) as response:
132
  if response.status_code != 200:
133
  await cookie_manager.mark_cookie_failed(cookie)
134
- error_text = await response.aread()
135
- error_detail = error_text.decode('utf-8', errors='ignore')
136
- error_code = response.status_code
137
- error_type = "auth_error" if error_code == 401 else "server_error"
138
- error_chunk = {"error": {"message": f"Upstream error ({error_code}): {error_detail}", "type": error_type, "code": error_code}}
139
- yield f"data: {json.dumps(error_chunk)}\n\n"
140
- return
141
 
142
  await cookie_manager.mark_cookie_success(cookie)
143
 
144
- async for chunk in response.aiter_text():
 
 
 
 
 
145
  buffer += chunk
 
 
146
  while "\n" in buffer:
147
  line, buffer = buffer.split("\n", 1)
148
  line = line.strip()
149
 
150
- if not line.startswith("data:"):
151
- continue
152
-
153
- payload_str = line[6:].strip()
154
- if payload_str == "[DONE]":
155
  continue
156
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
157
  try:
158
- parsed = json.loads(payload_str)
159
  data = parsed.get("data", {})
160
  delta_content = data.get("delta_content", "")
161
- phase = data.get("phase", "").strip()
162
-
163
- if not delta_content:
164
- continue
165
-
166
- output_content = ""
167
- if settings.SHOW_THINK_TAGS:
168
- temp_content = re.sub(r'<details[^>]*>\s*<summary>.*?</summary>', '<think>', delta_content, flags=re.DOTALL)
169
- output_content = temp_content.replace('</details>', '</think>')
170
- else:
171
- if phase == "thinking":
172
- output_content = ""
173
- elif phase == "answer":
174
- if '</details>' in delta_content:
175
- output_content = delta_content.split('</details>', 1)[1]
176
- else:
177
- output_content = delta_content
178
-
179
- if output_content:
180
- yield f"data: {json.dumps(self._create_openai_chunk(completion_id, request.model, output_content))}\n\n"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
181
 
182
  except json.JSONDecodeError:
183
- logger.debug(f"Skipping non-JSON data line: {line}")
184
- continue
185
 
186
- final_chunk = self._create_openai_chunk(completion_id, request.model, "", is_final=True)
187
- yield f"data: {json.dumps(final_chunk)}\n\n"
188
- yield "data: [DONE]\n\n"
189
-
190
- except httpx.RequestError as e:
191
- logger.error(f"Streaming request error: {e}")
192
- await cookie_manager.mark_cookie_failed(cookie)
193
- error_chunk = {"error": {"message": f"Upstream service unavailable: {str(e)}", "type": "server_error"}}
194
- yield f"data: {json.dumps(error_chunk)}\n\n"
195
-
196
- def _create_openai_chunk(self, completion_id: str, model: str, content: str, is_final: bool = False) -> Dict[str, Any]:
197
- """A helper to create a standard OpenAI stream chunk."""
198
- chunk = {
199
- "id": completion_id, "object": "chat.completion.chunk", "created": int(time.time()),
200
- "model": model, "choices": [{"index": 0, "delta": {}, "finish_reason": None}]
201
- }
202
- if is_final:
203
- chunk["choices"][0]["finish_reason"] = "stop"
204
- else:
205
- chunk["choices"][0]["delta"] = {"content": content}
206
- return chunk
 
6
  import logging
7
  import re
8
  import time
9
+ from typing import AsyncGenerator, Dict, Any, Optional
10
  import httpx
11
+ from fastapi import HTTPException
12
  from fastapi.responses import StreamingResponse
13
 
14
  from config import settings
 
16
  from models import (
17
  ChatCompletionRequest,
18
  ChatCompletionResponse,
19
+ ChatCompletionStreamResponse,
20
  )
21
 
22
  logger = logging.getLogger(__name__)
 
24
 
25
  class ProxyHandler:
26
  def __init__(self):
27
+ # Configure httpx client for streaming support
28
+ self.client = httpx.AsyncClient(
29
+ timeout=httpx.Timeout(60.0, read=300.0), # Longer read timeout for streaming
30
+ limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
31
+ http2=True # Enable HTTP/2 for better streaming performance
32
+ )
33
 
34
  async def __aenter__(self):
35
  return self
36
 
37
  async def __aexit__(self, exc_type, exc_val, exc_tb):
38
+ await self.client.aclose()
39
 
40
+ def transform_content(self, content: str) -> str:
41
+ """Transform content by replacing HTML tags and optionally removing think tags"""
42
+ if not content:
43
+ return content
44
+
45
+ logger.debug(f"SHOW_THINK_TAGS setting: {settings.SHOW_THINK_TAGS}")
46
+
47
+ # Optionally remove thinking content based on configuration
48
+ if not settings.SHOW_THINK_TAGS:
49
+ logger.debug("Removing thinking content from response")
50
+ original_length = len(content)
51
+
52
+ # Remove <details> blocks (thinking content) - handle both closed and unclosed tags
53
+ # First try to remove complete <details>...</details> blocks
54
+ content = re.sub(
55
+ r"<details[^>]*>.*?</details>", "", content, flags=re.DOTALL
56
+ )
57
+
58
+ # Then remove any remaining <details> opening tags and everything after them until we hit answer content
59
+ # Look for pattern: <details...><summary>...</summary>...content... and remove the thinking part
60
+ content = re.sub(
61
+ r"<details[^>]*>.*?(?=\s*[A-Z]|\s*\d|\s*$)",
62
+ "",
63
+ content,
64
+ flags=re.DOTALL,
65
+ )
66
+
67
+ content = content.strip()
68
+
69
+ logger.debug(
70
+ f"Content length after removing thinking content: {original_length} -> {len(content)}"
71
+ )
72
+ else:
73
+ logger.debug("Keeping thinking content, converting to <think> tags")
74
+
75
+ # Replace <details> with <think>
76
+ content = re.sub(r"<details[^>]*>", "<think>", content)
77
+ content = content.replace("</details>", "</think>")
78
+
79
+ # Remove <summary> tags and their content
80
+ content = re.sub(r"<summary>.*?</summary>", "", content, flags=re.DOTALL)
81
+
82
+ # If there's no closing </think>, add it at the end of thinking content
83
+ if "<think>" in content and "</think>" not in content:
84
+ # Find where thinking ends and answer begins
85
+ think_start = content.find("<think>")
86
+ if think_start != -1:
87
+ # Look for the start of the actual answer (usually starts with a capital letter or number)
88
+ answer_match = re.search(r"\n\s*[A-Z0-9]", content[think_start:])
89
+ if answer_match:
90
+ insert_pos = think_start + answer_match.start()
91
+ content = (
92
+ content[:insert_pos] + "</think>\n" + content[insert_pos:]
93
+ )
94
+ else:
95
+ content += "</think>"
96
+
97
+ return content.strip()
98
+
99
+ async def proxy_request(self, request: ChatCompletionRequest) -> Dict[str, Any]:
100
+ """Proxy request to Z.AI API"""
101
+ cookie = await cookie_manager.get_next_cookie()
102
+ if not cookie:
103
+ raise HTTPException(status_code=503, detail="No available cookies")
104
+
105
+ # Transform model name
106
+ target_model = (
107
+ settings.UPSTREAM_MODEL
108
+ if request.model == settings.MODEL_NAME
109
+ else request.model
110
+ )
111
+
112
+ # Determine if this should be a streaming response
113
  is_streaming = (
114
  request.stream if request.stream is not None else settings.DEFAULT_STREAM
115
  )
116
 
117
+ # Validate parameter compatibility
118
+ if is_streaming and not settings.SHOW_THINK_TAGS:
119
+ logger.warning("SHOW_THINK_TAGS=false is ignored for streaming responses")
120
+
121
+ # Prepare request data
122
+ request_data = request.model_dump(exclude_none=True)
123
+ request_data["model"] = target_model
124
+
125
+ # Build request data based on actual Z.AI format from zai-messages.md
126
+ import uuid
127
+
128
+ request_data = {
129
+ "stream": True, # Always request streaming from Z.AI for processing
130
+ "model": target_model,
131
+ "messages": request_data["messages"],
132
+ "background_tasks": {"title_generation": True, "tags_generation": True},
133
+ "chat_id": str(uuid.uuid4()),
134
+ "features": {
135
+ "image_generation": False,
136
+ "code_interpreter": False,
137
+ "web_search": False,
138
+ "auto_web_search": False,
139
+ },
140
+ "id": str(uuid.uuid4()),
141
+ "mcp_servers": ["deep-web-search"],
142
+ "model_item": {"id": target_model, "name": "GLM-4.5", "owned_by": "openai"},
143
+ "params": {},
144
+ "tool_servers": [],
145
+ "variables": {
146
+ "{{USER_NAME}}": "User",
147
+ "{{USER_LOCATION}}": "Unknown",
148
+ "{{CURRENT_DATETIME}}": "2025-08-04 16:46:56",
149
+ },
150
+ }
151
+
152
+ logger.debug(f"Sending request data: {request_data}")
153
+
154
+ headers = {
155
+ "Content-Type": "application/json",
156
+ "Authorization": f"Bearer {cookie}",
157
+ "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36",
158
+ "Accept": "application/json, text/event-stream",
159
+ "Accept-Language": "zh-CN",
160
+ "sec-ch-ua": '"Not)A;Brand";v="8", "Chromium";v="138", "Google Chrome";v="138"',
161
+ "sec-ch-ua-mobile": "?0",
162
+ "sec-ch-ua-platform": '"macOS"',
163
+ "x-fe-version": "prod-fe-1.0.53",
164
+ "Origin": "https://chat.z.ai",
165
+ "Referer": "https://chat.z.ai/c/069723d5-060b-404f-992c-4705f1554c4c",
166
+ }
167
+
168
+ try:
169
+ # Use client.stream() for TRUE streaming response - this is the key fix!
170
+ async with self.client.stream(
171
+ "POST",
172
+ settings.UPSTREAM_URL,
173
+ json=request_data,
174
+ headers=headers,
175
+ timeout=httpx.Timeout(60.0, read=300.0)
176
+ ) as response:
177
+
178
+ if response.status_code == 401:
179
+ await cookie_manager.mark_cookie_failed(cookie)
180
+ raise HTTPException(status_code=401, detail="Invalid authentication")
181
+
182
+ if response.status_code != 200:
183
+ # For streaming, we need to read the error response properly
184
+ try:
185
+ error_text = await response.aread()
186
+ error_detail = error_text.decode('utf-8')
187
+ except:
188
+ error_detail = f"HTTP {response.status_code}"
189
+
190
+ raise HTTPException(
191
+ status_code=response.status_code,
192
+ detail=f"Upstream error: {error_detail}",
193
+ )
194
+
195
+ await cookie_manager.mark_cookie_success(cookie)
196
+ return {"response": response, "cookie": cookie}
197
+
198
+ except httpx.RequestError as e:
199
+ logger.error(f"Request error: {e}")
200
+ logger.error(f"Request error type: {type(e).__name__}")
201
+ logger.error(f"Request URL: {settings.UPSTREAM_URL}")
202
+ logger.error(f"Request timeout: {self.client.timeout}")
203
+ await cookie_manager.mark_cookie_failed(cookie)
204
+ raise HTTPException(
205
+ status_code=503, detail=f"Upstream service unavailable: {str(e)}"
206
+ )
207
+
208
+ async def process_streaming_response(
209
+ self, response: httpx.Response
210
+ ) -> AsyncGenerator[Dict[str, Any], None]:
211
+ """Process streaming response from Z.AI - TRUE real-time processing"""
212
+ buffer = ""
213
+
214
+ # Use aiter_text with small chunk size for real-time processing
215
+ async for chunk in response.aiter_text(chunk_size=1024): # Small chunks for responsiveness
216
+ if not chunk:
217
+ continue
218
+
219
+ buffer += chunk
220
+
221
+ # Process complete lines immediately
222
+ while "\n" in buffer:
223
+ line, buffer = buffer.split("\n", 1)
224
+ line = line.strip()
225
+
226
+ if not line.startswith("data: "):
227
+ continue
228
+
229
+ payload = line[6:].strip()
230
+ if payload == "[DONE]":
231
+ return
232
+
233
+ try:
234
+ parsed = json.loads(payload)
235
+
236
+ # Check for errors first
237
+ if parsed.get("error") or (parsed.get("data", {}).get("error")):
238
+ error_detail = (
239
+ parsed.get("error", {}).get("detail")
240
+ or parsed.get("data", {}).get("error", {}).get("detail")
241
+ or "Unknown error from upstream"
242
+ )
243
+ logger.error(f"Upstream error: {error_detail}")
244
+ raise HTTPException(
245
+ status_code=400, detail=f"Upstream error: {error_detail}"
246
+ )
247
+
248
+ # Clean up response data
249
+ if parsed.get("data"):
250
+ # Remove unwanted fields for cleaner processing
251
+ parsed["data"].pop("edit_index", None)
252
+ parsed["data"].pop("edit_content", None)
253
+
254
+ # Yield immediately for real-time streaming
255
+ yield parsed
256
+
257
+ except json.JSONDecodeError as e:
258
+ logger.debug(f"JSON decode error (skipping): {e}")
259
+ continue # Skip non-JSON lines
260
+
261
+ async def handle_chat_completion(self, request: ChatCompletionRequest):
262
+ """Handle chat completion request"""
263
+ # Determine final streaming mode
264
+ is_streaming = (
265
+ request.stream if request.stream is not None else settings.DEFAULT_STREAM
266
+ )
267
 
268
  if is_streaming:
269
+ # For streaming responses, use direct streaming proxy
270
  return StreamingResponse(
271
+ self.stream_proxy_response(request),
272
  media_type="text/event-stream",
273
  headers={
274
  "Cache-Control": "no-cache",
 
276
  },
277
  )
278
  else:
279
+ # For non-streaming responses, collect all streaming data first
280
+ chunks = []
281
+ async for chunk_data in self.stream_proxy_response(request):
282
  if chunk_data.startswith("data: ") and not chunk_data.startswith("data: [DONE]"):
283
  try:
284
+ chunk_json = json.loads(chunk_data[6:])
285
+ if chunk_json.get("choices", [{}])[0].get("delta", {}).get("content"):
286
+ chunks.append(chunk_json["choices"][0]["delta"]["content"])
287
+ except:
 
 
 
 
 
 
288
  continue
289
+
290
+ # Combine all content
291
+ full_content = "".join(chunks)
292
+
293
+ # Apply final transformation for non-streaming response
294
+ transformed_content = self.transform_content(full_content)
295
+
296
+ # Return as non-streaming response
297
+ import time
298
+ import uuid
299
  return ChatCompletionResponse(
300
+ id=f"chatcmpl-{uuid.uuid4().hex[:29]}",
301
  created=int(time.time()),
302
  model=request.model,
303
  choices=[
304
  {
305
  "index": 0,
306
+ "message": {"role": "assistant", "content": transformed_content},
307
  "finish_reason": "stop",
308
  }
309
  ],
310
  )
311
 
312
+ async def stream_response(self, response: httpx.Response, model: str) -> AsyncGenerator[str, None]:
313
+ """Generate TRUE streaming response in OpenAI format - real-time processing"""
314
+ import uuid
315
+ import time
316
+
317
+ # Generate a unique completion ID
318
+ completion_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
319
+ current_phase = None
320
+
321
+ try:
322
+ # Real-time streaming: process each chunk immediately as it arrives
323
+ async for parsed in self.process_streaming_response(response):
324
+ try:
325
+ data = parsed.get("data", {})
326
+ delta_content = data.get("delta_content", "")
327
+ phase = data.get("phase", "")
328
+
329
+ # Track phase changes
330
+ if phase != current_phase:
331
+ current_phase = phase
332
+ logger.debug(f"Phase changed to: {phase}")
333
+
334
+ # Apply filtering based on SHOW_THINK_TAGS and phase
335
+ should_send_content = True
336
+
337
+ if not settings.SHOW_THINK_TAGS and phase == "thinking":
338
+ # Skip thinking content when SHOW_THINK_TAGS=false
339
+ should_send_content = False
340
+ logger.debug(f"Skipping thinking content (SHOW_THINK_TAGS=false)")
341
+
342
+ # Process and send content immediately if we should
343
+ if delta_content and should_send_content:
344
+ # Minimal transformation for real-time streaming
345
+ transformed_delta = delta_content
346
+
347
+ if settings.SHOW_THINK_TAGS:
348
+ # Simple tag replacement for streaming
349
+ transformed_delta = re.sub(r'<details[^>]*>', '<think>', transformed_delta)
350
+ transformed_delta = transformed_delta.replace('</details>', '</think>')
351
+ # Note: Skip complex regex for streaming performance
352
+
353
+ # Create and send OpenAI-compatible chunk immediately
354
+ openai_chunk = {
355
+ "id": completion_id,
356
+ "object": "chat.completion.chunk",
357
+ "created": int(time.time()),
358
+ "model": model,
359
+ "choices": [{
360
+ "index": 0,
361
+ "delta": {
362
+ "content": transformed_delta
363
+ },
364
+ "finish_reason": None
365
+ }]
366
+ }
367
+
368
+ # Yield immediately for real-time streaming
369
+ yield f"data: {json.dumps(openai_chunk)}\n\n"
370
+
371
+ except Exception as e:
372
+ logger.error(f"Error processing streaming chunk: {e}")
373
+ continue
374
+
375
+ # Send final completion chunk
376
+ final_chunk = {
377
+ "id": completion_id,
378
+ "object": "chat.completion.chunk",
379
+ "created": int(time.time()),
380
+ "model": model,
381
+ "choices": [{
382
+ "index": 0,
383
+ "delta": {},
384
+ "finish_reason": "stop"
385
+ }]
386
+ }
387
+
388
+ yield f"data: {json.dumps(final_chunk)}\n\n"
389
+ yield "data: [DONE]\n\n"
390
+
391
+ except Exception as e:
392
+ logger.error(f"Streaming error: {e}")
393
+ # Send error in OpenAI format
394
+ error_chunk = {
395
+ "error": {
396
+ "message": str(e),
397
+ "type": "server_error"
398
+ }
399
+ }
400
  yield f"data: {json.dumps(error_chunk)}\n\n"
401
+
402
+ async def non_stream_response(
403
+ self, response: httpx.Response, model: str
404
+ ) -> ChatCompletionResponse:
405
+ """Generate non-streaming response"""
406
+ chunks = []
407
+ async for parsed in self.process_streaming_response(response):
408
+ chunks.append(parsed)
409
+ logger.debug(f"Received chunk: {parsed}") # Debug log
410
+
411
+ if not chunks:
412
+ raise HTTPException(status_code=500, detail="No response from upstream")
413
+
414
+ logger.info(f"Total chunks received: {len(chunks)}")
415
+ logger.debug(f"First chunk structure: {chunks[0] if chunks else 'None'}")
416
+
417
+ # Aggregate content based on SHOW_THINK_TAGS setting
418
+ if settings.SHOW_THINK_TAGS:
419
+ # Include all content
420
+ full_content = "".join(
421
+ chunk.get("data", {}).get("delta_content", "") for chunk in chunks
422
+ )
423
+ else:
424
+ # Only include answer phase content
425
+ full_content = "".join(
426
+ chunk.get("data", {}).get("delta_content", "")
427
+ for chunk in chunks
428
+ if chunk.get("data", {}).get("phase") == "answer"
429
+ )
430
+
431
+ logger.info(f"Aggregated content length: {len(full_content)}")
432
+ logger.debug(
433
+ f"Full aggregated content: {full_content}"
434
+ ) # Show full content for debugging
435
+
436
+ # Apply content transformation (including think tag filtering)
437
+ transformed_content = self.transform_content(full_content)
438
+
439
+ logger.info(f"Transformed content length: {len(transformed_content)}")
440
+ logger.debug(f"Transformed content: {transformed_content[:200]}...")
441
+
442
+ # Create OpenAI-compatible response
443
+ return ChatCompletionResponse(
444
+ id=chunks[0].get("data", {}).get("id", "chatcmpl-unknown"),
445
+ created=int(time.time()),
446
+ model=model,
447
+ choices=[
448
+ {
449
+ "index": 0,
450
+ "message": {"role": "assistant", "content": transformed_content},
451
+ "finish_reason": "stop",
452
+ }
453
+ ],
454
+ )
455
+
456
+ async def stream_proxy_response(self, request: ChatCompletionRequest) -> AsyncGenerator[str, None]:
457
+ """TRUE streaming proxy - direct pass-through with minimal processing"""
458
  import uuid
459
+ import time
460
+
461
+ # Get cookie
462
+ cookie = await cookie_manager.get_next_cookie()
463
+ if not cookie:
464
+ raise HTTPException(status_code=503, detail="No valid authentication available")
465
+
466
+ # Prepare request data
467
+ request_data = request.model_dump(exclude_none=True)
468
+ target_model = "0727-360B-API" # Map GLM-4.5 to Z.AI model
469
+
470
+ # Build Z.AI request format
471
  request_data = {
472
+ "stream": True, # Always request streaming from Z.AI
473
  "model": target_model,
474
+ "messages": request_data["messages"],
475
+ "background_tasks": {
476
+ "title_generation": True,
477
+ "tags_generation": True
478
+ },
479
+ "chat_id": str(uuid.uuid4()),
480
  "features": {
481
  "image_generation": False,
482
  "code_interpreter": False,
483
  "web_search": False,
484
  "auto_web_search": False,
485
+ "enable_thinking": True,
486
  },
487
  "id": str(uuid.uuid4()),
488
+ "mcp_servers": ["deep-web-search"],
489
+ "model_item": {
490
+ "id": target_model,
491
+ "name": "GLM-4.5",
492
+ "owned_by": "openai"
493
+ },
494
+ "params": {},
495
+ "tool_servers": [],
496
  "variables": {
497
  "{{USER_NAME}}": "User",
498
  "{{USER_LOCATION}}": "Unknown",
499
+ "{{CURRENT_DATETIME}}": "2025-08-04 16:46:56"
500
+ }
501
  }
502
 
503
  headers = {
504
+ "Content-Type": "application/json",
505
+ "Authorization": f"Bearer {cookie}",
506
  "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36",
507
+ "Accept": "application/json, text/event-stream",
508
+ "Accept-Language": "zh-CN",
509
+ "sec-ch-ua": '"Not)A;Brand";v="8", "Chromium";v="138", "Google Chrome";v="138"',
510
+ "sec-ch-ua-mobile": "?0",
511
+ "sec-ch-ua-platform": '"macOS"',
512
+ "x-fe-version": "prod-fe-1.0.53",
513
+ "Origin": "https://chat.z.ai",
514
+ "Referer": "https://chat.z.ai/c/069723d5-060b-404f-992c-4705f1554c4c",
515
  }
516
 
517
  completion_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
518
+ current_phase = None
519
+
520
+ try:
521
+ # Create a new client for this streaming request to avoid conflicts
522
+ async with httpx.AsyncClient(
523
+ timeout=httpx.Timeout(60.0, read=300.0),
524
+ limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
525
+ http2=True
526
+ ) as stream_client:
527
+ async with stream_client.stream(
528
+ "POST",
529
+ settings.UPSTREAM_URL,
530
+ json=request_data,
531
+ headers=headers
532
+ ) as response:
533
+
534
+ if response.status_code == 401:
535
+ await cookie_manager.mark_cookie_failed(cookie)
536
+ raise HTTPException(status_code=401, detail="Invalid authentication")
537
 
 
 
 
 
 
538
  if response.status_code != 200:
539
  await cookie_manager.mark_cookie_failed(cookie)
540
+ raise HTTPException(status_code=response.status_code, detail="Upstream error")
 
 
 
 
 
 
541
 
542
  await cookie_manager.mark_cookie_success(cookie)
543
 
544
+ # Process streaming response in real-time
545
+ buffer = ""
546
+ async for chunk in response.aiter_text(chunk_size=1024):
547
+ if not chunk:
548
+ continue
549
+
550
  buffer += chunk
551
+
552
+ # Process complete lines immediately
553
  while "\n" in buffer:
554
  line, buffer = buffer.split("\n", 1)
555
  line = line.strip()
556
 
557
+ if not line.startswith("data: "):
 
 
 
 
558
  continue
559
 
560
+ payload = line[6:].strip()
561
+ if payload == "[DONE]":
562
+ # Send final chunk and done
563
+ final_chunk = {
564
+ "id": completion_id,
565
+ "object": "chat.completion.chunk",
566
+ "created": int(time.time()),
567
+ "model": request.model,
568
+ "choices": [{
569
+ "index": 0,
570
+ "delta": {},
571
+ "finish_reason": "stop"
572
+ }]
573
+ }
574
+ yield f"data: {json.dumps(final_chunk)}\n\n"
575
+ yield "data: [DONE]\n\n"
576
+ return
577
+
578
  try:
579
+ parsed = json.loads(payload)
580
  data = parsed.get("data", {})
581
  delta_content = data.get("delta_content", "")
582
+ phase = data.get("phase", "")
583
+
584
+ # Track phase changes
585
+ if phase != current_phase:
586
+ current_phase = phase
587
+ logger.debug(f"Phase changed to: {phase}")
588
+
589
+ # Apply filtering based on SHOW_THINK_TAGS and phase
590
+ should_send_content = True
591
+
592
+ if not settings.SHOW_THINK_TAGS and phase == "thinking":
593
+ should_send_content = False
594
+
595
+ # Process and send content immediately if we should
596
+ if delta_content and should_send_content:
597
+ # Minimal transformation for real-time streaming
598
+ transformed_delta = delta_content
599
+
600
+ if settings.SHOW_THINK_TAGS:
601
+ # Simple tag replacement for streaming
602
+ transformed_delta = re.sub(r'<details[^>]*>', '<think>', transformed_delta)
603
+ transformed_delta = transformed_delta.replace('</details>', '</think>')
604
+ transformed_delta = re.sub(r"<summary>.*?</summary>", "", transformed_delta, flags=re.DOTALL)
605
+
606
+ # Create and send OpenAI-compatible chunk immediately
607
+ openai_chunk = {
608
+ "id": completion_id,
609
+ "object": "chat.completion.chunk",
610
+ "created": int(time.time()),
611
+ "model": request.model,
612
+ "choices": [{
613
+ "index": 0,
614
+ "delta": {
615
+ "content": transformed_delta
616
+ },
617
+ "finish_reason": None
618
+ }]
619
+ }
620
+
621
+ # Yield immediately for real-time streaming
622
+ yield f"data: {json.dumps(openai_chunk)}\n\n"
623
 
624
  except json.JSONDecodeError:
625
+ continue # Skip non-JSON lines
 
626
 
627
+ except httpx.RequestError as e:
628
+ logger.error(f"Streaming request error: {e}")
629
+ await cookie_manager.mark_cookie_failed(cookie)
630
+ raise HTTPException(status_code=503, detail=f"Upstream service unavailable: {str(e)}")