bluewinliang committed on
Commit
afc2d88
·
verified ·
1 Parent(s): 11a0c6e

Upload proxy_handler.py

Browse files
Files changed (1) hide show
  1. proxy_handler.py +564 -193
proxy_handler.py CHANGED
@@ -37,6 +37,262 @@ class ProxyHandler:
37
  async def __aexit__(self, exc_type, exc_val, exc_tb):
38
  await self.client.aclose()
39
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
40
  async def handle_chat_completion(self, request: ChatCompletionRequest):
41
  """Handle chat completion request"""
42
  # Determine final streaming mode
@@ -55,21 +311,203 @@ class ProxyHandler:
55
  },
56
  )
57
  else:
58
- # For non-streaming, collect and process the full response
59
- return await self.aggregate_non_stream_response(request)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
60
 
61
- async def _build_request_payload(self, request: ChatCompletionRequest) -> Dict[str, Any]:
62
- """Helper to build the Z.AI request payload"""
63
  import uuid
64
- target_model = "0727-360B-API"
65
-
66
- # Correctly serialize ChatMessage objects to dictionaries
67
- messages_as_dicts = [msg.model_dump() for msg in request.messages]
68
 
69
- return {
70
- "stream": True,
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
71
  "model": target_model,
72
- "messages": messages_as_dicts,
73
  "background_tasks": {
74
  "title_generation": True,
75
  "tags_generation": True
@@ -97,10 +535,8 @@ class ProxyHandler:
97
  "{{CURRENT_DATETIME}}": "2025-08-04 16:46:56"
98
  }
99
  }
100
-
101
- def _build_request_headers(self, cookie: str) -> Dict[str, str]:
102
- """Helper to build request headers"""
103
- return {
104
  "Content-Type": "application/json",
105
  "Authorization": f"Bearer {cookie}",
106
  "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36",
@@ -111,191 +547,126 @@ class ProxyHandler:
111
  "sec-ch-ua-platform": '"macOS"',
112
  "x-fe-version": "prod-fe-1.0.53",
113
  "Origin": "https://chat.z.ai",
114
- "Referer": "https://chat.z.ai/",
115
  }
116
 
117
- async def stream_proxy_response(self, request: ChatCompletionRequest) -> AsyncGenerator[str, None]:
118
- """Handle true streaming response, transforming Z.AI stream to OpenAI format."""
119
- import uuid
120
-
121
- cookie = await cookie_manager.get_next_cookie()
122
- if not cookie:
123
- raise HTTPException(status_code=503, detail="No valid authentication available")
124
-
125
- request_data = await self._build_request_payload(request)
126
- headers = self._build_request_headers(cookie)
127
  completion_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
128
-
129
- is_in_think_block = False
130
 
131
  try:
132
- async with self.client.stream("POST", settings.UPSTREAM_URL, json=request_data, headers=headers) as response:
133
- if response.status_code == 401:
134
- await cookie_manager.mark_cookie_failed(cookie)
135
- raise HTTPException(status_code=401, detail="Invalid authentication")
136
- if response.status_code != 200:
137
- await cookie_manager.mark_cookie_failed(cookie)
138
- error_text = await response.aread()
139
- raise HTTPException(status_code=response.status_code, detail=f"Upstream error: {error_text.decode()}")
140
-
141
- await cookie_manager.mark_cookie_success(cookie)
142
-
143
- buffer = ""
144
- async for chunk in response.aiter_text():
145
- buffer += chunk
146
- while "\n" in buffer:
147
- line, buffer = buffer.split("\n", 1)
148
- if not line.startswith("data:"):
 
 
 
 
 
 
 
 
 
 
149
  continue
150
 
151
- payload = line[6:].strip()
152
- if payload == "[DONE]":
153
- break
154
-
155
- try:
156
- parsed = json.loads(payload)
157
- data = parsed.get("data", {})
158
- delta_content = data.get("delta_content", "")
159
- phase = data.get("phase", "")
160
-
161
- content_to_send = ""
162
-
163
- if settings.SHOW_THINK_TAGS:
164
- # State-based think tag management
165
- if phase == "thinking" and not is_in_think_block:
166
- content_to_send += "<think>"
167
- is_in_think_block = True
168
- elif phase == "answer" and is_in_think_block:
169
- content_to_send += "</think>\n"
170
- is_in_think_block = False
171
-
172
- # Clean up original tags regardless of phase
173
- cleaned_delta = re.sub(r"<details[^>]*>.*?</details>", "", delta_content, flags=re.DOTALL)
174
- cleaned_delta = re.sub(r"<summary>.*?</summary>", "", cleaned_delta, flags=re.DOTALL)
175
- content_to_send += cleaned_delta
176
-
177
- else: # SHOW_THINK_TAGS is false
178
- if phase == "answer":
179
- # Still need to clean the first answer chunk which might contain </details>
180
- cleaned_delta = re.sub(r"</details>", "", delta_content)
181
- content_to_send += cleaned_delta
182
-
183
- if content_to_send:
184
- openai_chunk = {
185
- "id": completion_id,
186
- "object": "chat.completion.chunk",
187
- "created": int(time.time()),
188
- "model": request.model,
189
- "choices": [{"index": 0, "delta": {"content": content_to_send}, "finish_reason": None}]
190
- }
191
- yield f"data: {json.dumps(openai_chunk)}\n\n"
192
-
193
- except json.JSONDecodeError:
194
- continue
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
195
 
196
  except httpx.RequestError as e:
 
197
  await cookie_manager.mark_cookie_failed(cookie)
198
- raise HTTPException(status_code=503, detail=f"Upstream service unavailable: {str(e)}")
199
- finally:
200
- if is_in_think_block:
201
- # Ensure the think tag is closed if stream ends unexpectedly
202
- closing_chunk = {
203
- "id": completion_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": request.model,
204
- "choices": [{"index": 0, "delta": {"content": "\n</think>"}, "finish_reason": None}]
205
- }
206
- yield f"data: {json.dumps(closing_chunk)}\n\n"
207
-
208
- # Send final completion chunk
209
- final_chunk = {
210
- "id": completion_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": request.model,
211
- "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]
212
- }
213
- yield f"data: {json.dumps(final_chunk)}\n\n"
214
- yield "data: [DONE]\n\n"
215
-
216
-
217
- async def aggregate_non_stream_response(self, request: ChatCompletionRequest) -> ChatCompletionResponse:
218
- """Handle non-streaming response by aggregating the stream."""
219
- import uuid
220
-
221
- cookie = await cookie_manager.get_next_cookie()
222
- if not cookie:
223
- raise HTTPException(status_code=503, detail="No valid authentication available")
224
-
225
- request_data = await self._build_request_payload(request)
226
- headers = self._build_request_headers(cookie)
227
-
228
- content_parts = []
229
- is_in_think_block = False
230
-
231
- try:
232
- async with self.client.stream("POST", settings.UPSTREAM_URL, json=request_data, headers=headers) as response:
233
- if response.status_code == 401:
234
- await cookie_manager.mark_cookie_failed(cookie)
235
- raise HTTPException(status_code=401, detail="Invalid authentication")
236
- if response.status_code != 200:
237
- await cookie_manager.mark_cookie_failed(cookie)
238
- error_text = await response.aread()
239
- raise HTTPException(status_code=response.status_code, detail=f"Upstream error: {error_text.decode()}")
240
-
241
- await cookie_manager.mark_cookie_success(cookie)
242
-
243
- buffer = ""
244
- async for chunk in response.aiter_text():
245
- buffer += chunk
246
- while "\n" in buffer:
247
- line, buffer = buffer.split("\n", 1)
248
- if not line.startswith("data:"):
249
- continue
250
-
251
- payload = line[6:].strip()
252
- if payload == "[DONE]":
253
- break
254
-
255
- try:
256
- parsed = json.loads(payload)
257
- data = parsed.get("data", {})
258
- delta_content = data.get("delta_content", "")
259
- phase = data.get("phase", "")
260
-
261
- if settings.SHOW_THINK_TAGS:
262
- if phase == "thinking" and not is_in_think_block:
263
- content_parts.append("<think>")
264
- is_in_think_block = True
265
- elif phase == "answer" and is_in_think_block:
266
- content_parts.append("</think>\n")
267
- is_in_think_block = False
268
-
269
- cleaned_delta = re.sub(r"<details[^>]*>.*?</details>", "", delta_content, flags=re.DOTALL)
270
- cleaned_delta = re.sub(r"<summary>.*?</summary>", "", cleaned_delta, flags=re.DOTALL)
271
- content_parts.append(cleaned_delta)
272
-
273
- else: # SHOW_THINK_TAGS is false
274
- if phase == "answer":
275
- cleaned_delta = re.sub(r"</details>", "", delta_content)
276
- content_parts.append(cleaned_delta)
277
-
278
- except json.JSONDecodeError:
279
- continue
280
-
281
- if is_in_think_block:
282
- content_parts.append("</think>")
283
-
284
- except httpx.RequestError as e:
285
- await cookie_manager.mark_cookie_failed(cookie)
286
- raise HTTPException(status_code=503, detail=f"Upstream service unavailable: {str(e)}")
287
-
288
- full_content = "".join(content_parts).strip()
289
-
290
- return ChatCompletionResponse(
291
- id=f"chatcmpl-{uuid.uuid4().hex[:29]}",
292
- created=int(time.time()),
293
- model=request.model,
294
- choices=[
295
- {
296
- "index": 0,
297
- "message": {"role": "assistant", "content": full_content},
298
- "finish_reason": "stop",
299
- }
300
- ],
301
- )
 
37
  async def __aexit__(self, exc_type, exc_val, exc_tb):
38
  await self.client.aclose()
39
 
40
def transform_content(self, content: str) -> str:
    """Rewrite upstream ``<details>`` reasoning markup for the client.

    When ``settings.SHOW_THINK_TAGS`` is false, the reasoning markup is
    removed: complete ``<details>...</details>`` blocks first, then an
    unclosed ``<details>`` up to the first line that looks like answer text,
    then any leftover ``<summary>`` blocks.

    Otherwise ``<summary>`` blocks are dropped, ``<details>`` tags become
    ``<think>`` tags, and an unclosed ``<think>`` is closed heuristically in
    front of the first paragraph that looks like the start of the answer
    (or at the very end when no such paragraph is found).

    Args:
        content: Text received from the upstream service.

    Returns:
        The transformed text with surrounding whitespace stripped. Falsy
        input is returned unchanged.
    """
    if not content:
        return content

    logger.debug(f"SHOW_THINK_TAGS setting: {settings.SHOW_THINK_TAGS}")
    logger.debug(f"Original content: {content[:200]}...")

    if settings.SHOW_THINK_TAGS:
        logger.debug("Keeping thinking content, converting to <think> tags")

        # Summaries are removed before the tag rename so their text never
        # ends up inside the <think> block.
        without_summary = re.sub(
            r"<summary>.*?</summary>", "", content, flags=re.DOTALL
        )
        content = re.sub(r"<details[^>]*>", "<think>", without_summary).replace(
            "</details>", "</think>"
        )

        open_tag = "<think>"
        if open_tag in content and "</think>" not in content:
            logger.debug("Found unclosed <think> tag, attempting to close it")

            # Everything after the opening tag is searched for the first
            # paragraph break that looks like the beginning of the answer.
            body_offset = content.find(open_tag) + len(open_tag)
            thinking_tail = content[body_offset:]

            # Tried in this order; the first pattern that matches wins.
            boundary_patterns = (
                r'\n\n+([A-Z][^<\n]*)',  # new paragraph starting with a capital
                r'\n\n+(\d+\.)',  # numbered list
                r'\n\n+([A-Z][^<\n]*[.!?])',  # complete sentence
                r'\n\n+([*-]\s)',  # bullet point
            )

            answer_start = None
            for pattern in boundary_patterns:
                hit = re.search(pattern, thinking_tail)
                if hit is None:
                    continue
                answer_start = body_offset + hit.start()
                logger.debug(f"Found answer start at position {answer_start} with pattern: {pattern}")
                break

            if answer_start is None:
                # No recognisable answer: treat the whole remainder as thinking.
                logger.debug("No clear answer pattern found, closing think tag at end")
                content = content.rstrip() + "\n</think>"
            else:
                thinking_part = content[:answer_start]
                answer_part = content[answer_start:]
                content = thinking_part + "\n</think>\n" + answer_part.lstrip()

        # Collapse runs of three or more (possibly whitespace-only) line
        # breaks into a single blank line.
        # NOTE(review): assumed to apply only on this keep-thinking path —
        # confirm against the properly indented file.
        content = re.sub(r'\n\s*\n\s*\n+', '\n\n', content)
    else:
        logger.debug("Removing thinking content from response")
        original_length = len(content)

        # 1) complete <details>...</details> blocks
        content = re.sub(
            r"<details[^>]*>.*?</details>", "", content, flags=re.DOTALL
        )
        # 2) an unclosed <details> up to the first line that looks like answer
        #    text (sentence, numbered item, bullet) or the end of the string
        content = re.sub(
            r"<details[^>]*>.*?(?=\n\s*[A-Z][^<\n]*[.!?]|\n\s*\d+\.|\n\s*[-*]\s|$)",
            "",
            content,
            flags=re.DOTALL,
        )
        # 3) leftover <summary> blocks
        content = re.sub(r"<summary>.*?</summary>", "", content, flags=re.DOTALL)

        content = content.strip()
        logger.debug(
            f"Content length after removing thinking content: {original_length} -> {len(content)}"
        )

    logger.debug(f"Final transformed content: {content[:200]}...")
    return content.strip()
133
+
134
async def proxy_request(self, request: ChatCompletionRequest) -> Dict[str, Any]:
    """Open a streaming request to the upstream Z.AI endpoint.

    The upstream call is always made with ``"stream": True``; the response is
    handed back *unread and still open* so the caller can iterate it.

    The request is sent with ``client.send(..., stream=True)`` rather than
    ``async with client.stream(...)``: returning from inside the context
    manager would close the response before the caller could read it.

    Args:
        request: Incoming OpenAI-style chat completion request.

    Returns:
        ``{"response": <open httpx.Response>, "cookie": <cookie used>}``.
        The caller owns the response and must ``await response.aclose()``
        once it has finished reading, to release the connection.

    Raises:
        HTTPException: 503 when no cookie is available or the request could
            not be sent; 401 when upstream rejects the cookie; otherwise the
            upstream status code for any non-200 reply.
    """
    import uuid

    cookie = await cookie_manager.get_next_cookie()
    if not cookie:
        raise HTTPException(status_code=503, detail="No available cookies")

    # Map the public model name onto the upstream one; other names pass through.
    target_model = (
        settings.UPSTREAM_MODEL
        if request.model == settings.MODEL_NAME
        else request.model
    )

    # Effective streaming mode of the *client* request (upstream always streams).
    is_streaming = (
        request.stream if request.stream is not None else settings.DEFAULT_STREAM
    )
    if is_streaming and not settings.SHOW_THINK_TAGS:
        logger.warning("SHOW_THINK_TAGS=false is ignored for streaming responses")

    messages = request.model_dump(exclude_none=True)["messages"]

    # Build request data based on actual Z.AI format from zai-messages.md
    # NOTE(review): "{{CURRENT_DATETIME}}" is a fixed timestamp, not the
    # current time — confirm whether upstream expects a live value.
    request_data = {
        "stream": True,  # Always request streaming from Z.AI for processing
        "model": target_model,
        "messages": messages,
        "background_tasks": {"title_generation": True, "tags_generation": True},
        "chat_id": str(uuid.uuid4()),
        "features": {
            "image_generation": False,
            "code_interpreter": False,
            "web_search": False,
            "auto_web_search": False,
        },
        "id": str(uuid.uuid4()),
        "mcp_servers": ["deep-web-search"],
        "model_item": {"id": target_model, "name": "GLM-4.5", "owned_by": "openai"},
        "params": {},
        "tool_servers": [],
        "variables": {
            "{{USER_NAME}}": "User",
            "{{USER_LOCATION}}": "Unknown",
            "{{CURRENT_DATETIME}}": "2025-08-04 16:46:56",
        },
    }

    logger.debug(f"Sending request data: {request_data}")

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {cookie}",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36",
        "Accept": "application/json, text/event-stream",
        "Accept-Language": "zh-CN",
        "sec-ch-ua": '"Not)A;Brand";v="8", "Chromium";v="138", "Google Chrome";v="138"',
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": '"macOS"',
        "x-fe-version": "prod-fe-1.0.53",
        "Origin": "https://chat.z.ai",
        "Referer": "https://chat.z.ai/c/069723d5-060b-404f-992c-4705f1554c4c",
    }

    try:
        upstream_request = self.client.build_request(
            "POST",
            settings.UPSTREAM_URL,
            json=request_data,
            headers=headers,
            timeout=httpx.Timeout(60.0, read=300.0),
        )
        # stream=True returns as soon as the headers arrive and leaves the
        # body unread; the response stays open until aclose() is called.
        response = await self.client.send(upstream_request, stream=True)
    except httpx.RequestError as e:
        logger.error(f"Request error: {e}")
        logger.error(f"Request error type: {type(e).__name__}")
        logger.error(f"Request URL: {settings.UPSTREAM_URL}")
        logger.error(f"Request timeout: {self.client.timeout}")
        await cookie_manager.mark_cookie_failed(cookie)
        raise HTTPException(
            status_code=503, detail=f"Upstream service unavailable: {str(e)}"
        ) from e

    try:
        if response.status_code == 401:
            await cookie_manager.mark_cookie_failed(cookie)
            raise HTTPException(status_code=401, detail="Invalid authentication")

        if response.status_code != 200:
            # Best effort: include the upstream error body when it can be read.
            try:
                error_detail = (await response.aread()).decode("utf-8")
            except Exception:
                error_detail = f"HTTP {response.status_code}"

            raise HTTPException(
                status_code=response.status_code,
                detail=f"Upstream error: {error_detail}",
            )

        await cookie_manager.mark_cookie_success(cookie)
    except BaseException:
        # The caller never sees the response on a failure path (including
        # cancellation), so it has to be released here.
        await response.aclose()
        raise

    return {"response": response, "cookie": cookie}
242
+
243
async def process_streaming_response(
    self, response: httpx.Response
) -> AsyncGenerator[Dict[str, Any], None]:
    """Yield the JSON events carried by an upstream server-sent-event stream.

    Text is read from ``response.aiter_text`` and split on newlines. Only
    ``data:`` lines are considered (with or without a space after the colon);
    everything else is skipped. Each event is yielded as soon as its line is
    complete, and a final line that is not newline-terminated is still
    processed when the stream ends.

    Args:
        response: Open upstream response; only ``aiter_text`` is used.

    Yields:
        The decoded JSON object of each event. When ``data`` is an object,
        its ``edit_index`` and ``edit_content`` keys are removed first.
        Payloads that are not valid JSON, or not JSON objects, are skipped.

    Raises:
        HTTPException: 400 when an event carries a truthy ``error`` at the
            top level or under ``data``.

    The generator finishes at the ``[DONE]`` marker or at end of stream.
    """
    done = object()  # sentinel returned by decode() for the "[DONE]" marker

    def detail_of(error):
        """Extract a printable detail from an upstream error value."""
        if isinstance(error, dict):
            return error.get("detail")
        # Upstream may send a bare string instead of {"detail": ...}.
        return str(error) if error else None

    def decode(raw_line):
        """Map one line to ``done``, ``None`` (skip) or an event dict."""
        line = raw_line.strip()
        if not line.startswith("data:"):
            return None

        payload = line[len("data:"):].strip()
        if payload == "[DONE]":
            return done

        try:
            parsed = json.loads(payload)
        except json.JSONDecodeError as e:
            logger.debug(f"JSON decode error (skipping): {e}")
            return None
        if not isinstance(parsed, dict):
            logger.debug(f"Ignoring non-object event: {payload[:200]}")
            return None

        # "data" may be absent, null or (defensively) not an object at all.
        data = parsed.get("data")
        errors = [
            parsed.get("error"),
            data.get("error") if isinstance(data, dict) else None,
        ]
        if any(errors):
            error_detail = next(
                (detail for detail in map(detail_of, errors) if detail),
                "Unknown error from upstream",
            )
            logger.error(f"Upstream error: {error_detail}")
            raise HTTPException(
                status_code=400, detail=f"Upstream error: {error_detail}"
            )

        if isinstance(data, dict):
            # Remove unwanted fields for cleaner processing
            data.pop("edit_index", None)
            data.pop("edit_content", None)
        return parsed

    buffer = ""
    async for chunk in response.aiter_text(chunk_size=1024):
        if not chunk:
            continue

        buffer += chunk

        # Emit every line completed by this chunk before reading more.
        while "\n" in buffer:
            line, buffer = buffer.split("\n", 1)
            event = decode(line)
            if event is done:
                return
            if event is not None:
                yield event

    # The stream may end without a trailing newline; do not drop that line.
    event = decode(buffer)
    if event is not None and event is not done:
        yield event
295
+
296
  async def handle_chat_completion(self, request: ChatCompletionRequest):
297
  """Handle chat completion request"""
298
  # Determine final streaming mode
 
311
  },
312
  )
313
  else:
314
+ # For non-streaming responses, collect all streaming data first
315
+ chunks = []
316
+ async for chunk_data in self.stream_proxy_response(request):
317
+ if chunk_data.startswith("data: ") and not chunk_data.startswith("data: [DONE]"):
318
+ try:
319
+ chunk_json = json.loads(chunk_data[6:])
320
+ if chunk_json.get("choices", [{}])[0].get("delta", {}).get("content"):
321
+ chunks.append(chunk_json["choices"][0]["delta"]["content"])
322
+ except:
323
+ continue
324
+
325
+ # Combine all content
326
+ full_content = "".join(chunks)
327
+
328
+ # Apply final transformation for non-streaming response
329
+ transformed_content = self.transform_content(full_content)
330
+
331
+ # Return as non-streaming response
332
+ import time
333
+ import uuid
334
+ return ChatCompletionResponse(
335
+ id=f"chatcmpl-{uuid.uuid4().hex[:29]}",
336
+ created=int(time.time()),
337
+ model=request.model,
338
+ choices=[
339
+ {
340
+ "index": 0,
341
+ "message": {"role": "assistant", "content": transformed_content},
342
+ "finish_reason": "stop",
343
+ }
344
+ ],
345
+ )
346
 
347
async def stream_response(self, response: httpx.Response, model: str) -> AsyncGenerator[str, None]:
    """Re-emit an upstream event stream as OpenAI-style SSE chunks.

    Each upstream event with non-empty ``delta_content`` becomes one
    ``chat.completion.chunk`` line, sent as soon as it is available.

    * With ``settings.SHOW_THINK_TAGS`` enabled, ``<details>`` tags are
      renamed to ``<think>`` and ``<summary>`` blocks contained in a single
      delta are removed.
    * With it disabled, deltas whose phase is ``"thinking"`` are dropped and
      a stray ``</details>`` is stripped from the remaining deltas.

    Args:
        response: Open upstream response, read through
            ``self.process_streaming_response``.
        model: Model name reported in every emitted chunk.

    Yields:
        ``data: ...\\n\\n`` lines: content chunks, then a final chunk with
        ``finish_reason="stop"`` and the ``[DONE]`` marker. If reading the
        stream fails, an ``{"error": ...}`` payload is sent instead, also
        followed by ``[DONE]`` so clients waiting for the terminator stop.
    """
    import uuid
    import time

    # One id shared by every chunk of this completion.
    completion_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
    current_phase = None

    def sse_chunk(delta, finish_reason):
        """Serialise one OpenAI-compatible chunk as an SSE data line."""
        body = {
            "id": completion_id,
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": model,
            "choices": [{
                "index": 0,
                "delta": delta,
                "finish_reason": finish_reason,
            }],
        }
        return f"data: {json.dumps(body)}\n\n"

    try:
        async for parsed in self.process_streaming_response(response):
            try:
                # "data" / "delta_content" may be missing or null.
                data = parsed.get("data") or {}
                delta_content = data.get("delta_content") or ""
                phase = data.get("phase", "")

                if phase != current_phase:
                    current_phase = phase
                    logger.debug(f"Phase changed to: {phase}")

                if settings.SHOW_THINK_TAGS:
                    # Simple per-delta tag replacement; tags split across
                    # deltas are not recognised here.
                    transformed_delta = re.sub(r'<details[^>]*>', '<think>', delta_content)
                    transformed_delta = transformed_delta.replace('</details>', '</think>')
                    transformed_delta = re.sub(
                        r"<summary>.*?</summary>", "", transformed_delta, flags=re.DOTALL
                    )
                elif phase == "thinking":
                    logger.debug("Skipping thinking content (SHOW_THINK_TAGS=false)")
                    transformed_delta = ""
                else:
                    # NOTE(review): presumably the first answer delta can carry
                    # the closing </details> of the hidden reasoning block;
                    # confirm against real upstream traffic.
                    transformed_delta = delta_content.replace("</details>", "")

                outgoing = (
                    sse_chunk({"content": transformed_delta}, None)
                    if transformed_delta
                    else None
                )
            except Exception as e:
                # A malformed event must not abort the whole stream.
                logger.error(f"Error processing streaming chunk: {e}")
                continue

            # Yield outside the try block so exceptions raised at the yield
            # point are not mistaken for chunk-processing errors.
            if outgoing is not None:
                yield outgoing

        yield sse_chunk({}, "stop")
        yield "data: [DONE]\n\n"

    except Exception as e:
        logger.error(f"Streaming error: {e}")
        # Send error in OpenAI format
        error_chunk = {
            "error": {
                "message": str(e),
                "type": "server_error"
            }
        }
        yield f"data: {json.dumps(error_chunk)}\n\n"
        yield "data: [DONE]\n\n"
437
+
438
async def non_stream_response(
    self, response: httpx.Response, model: str
) -> ChatCompletionResponse:
    """Aggregate an upstream event stream into one chat completion.

    All events are read through ``self.process_streaming_response`` and
    their ``delta_content`` values are concatenated. With
    ``settings.SHOW_THINK_TAGS`` enabled every event contributes; otherwise
    only events whose phase is ``"answer"`` do. The joined text is then
    passed through ``self.transform_content``.

    Args:
        response: Open upstream response.
        model: Model name reported in the result.

    Returns:
        A ``ChatCompletionResponse`` with a single assistant message and
        ``finish_reason="stop"``. Its id is taken from the first event's
        ``data.id`` when present, else ``"chatcmpl-unknown"``.

    Raises:
        HTTPException: 500 when the upstream stream produced no events.
    """
    import time

    chunks = []
    async for parsed in self.process_streaming_response(response):
        chunks.append(parsed)
        logger.debug(f"Received chunk: {parsed}")  # Debug log

    if not chunks:
        raise HTTPException(status_code=500, detail="No response from upstream")

    logger.info(f"Total chunks received: {len(chunks)}")
    logger.debug(f"First chunk structure: {chunks[0]}")

    # "data" may be missing, null or not an object; normalise once so the
    # lookups below cannot fail on a malformed event.
    payloads = [
        data if isinstance(data, dict) else {}
        for data in (chunk.get("data") for chunk in chunks)
    ]

    show_thinking = settings.SHOW_THINK_TAGS
    full_content = "".join(
        payload.get("delta_content") or ""
        for payload in payloads
        if show_thinking or payload.get("phase") == "answer"
    )

    logger.info(f"Aggregated content length: {len(full_content)}")
    logger.debug(
        f"Full aggregated content: {full_content}"
    )  # Show full content for debugging

    # Apply content transformation (including think tag filtering)
    transformed_content = self.transform_content(full_content)

    if not show_thinking:
        # The opening <details> lives in the filtered-out thinking phase, so
        # a closing tag left in the answer text is unmatched and would not
        # be removed by transform_content.
        # NOTE(review): presumably upstream attaches </details> to the first
        # answer delta; confirm against real upstream traffic.
        transformed_content = transformed_content.replace("</details>", "").strip()

    logger.info(f"Transformed content length: {len(transformed_content)}")
    logger.debug(f"Transformed content: {transformed_content[:200]}...")

    # Create OpenAI-compatible response
    return ChatCompletionResponse(
        id=payloads[0].get("id") or "chatcmpl-unknown",
        created=int(time.time()),
        model=model,
        choices=[
            {
                "index": 0,
                "message": {"role": "assistant", "content": transformed_content},
                "finish_reason": "stop",
            }
        ],
    )
491
+
492
+ async def stream_proxy_response(self, request: ChatCompletionRequest) -> AsyncGenerator[str, None]:
493
+ """TRUE streaming proxy - direct pass-through with minimal processing"""
494
+ import uuid
495
+ import time
496
+
497
+ # Get cookie
498
+ cookie = await cookie_manager.get_next_cookie()
499
+ if not cookie:
500
+ raise HTTPException(status_code=503, detail="No valid authentication available")
501
+
502
+ # Prepare request data
503
+ request_data = request.model_dump(exclude_none=True)
504
+ target_model = "0727-360B-API" # Map GLM-4.5 to Z.AI model
505
+
506
+ # Build Z.AI request format
507
+ request_data = {
508
+ "stream": True, # Always request streaming from Z.AI
509
  "model": target_model,
510
+ "messages": request_data["messages"],
511
  "background_tasks": {
512
  "title_generation": True,
513
  "tags_generation": True
 
535
  "{{CURRENT_DATETIME}}": "2025-08-04 16:46:56"
536
  }
537
  }
538
+
539
+ headers = {
 
 
540
  "Content-Type": "application/json",
541
  "Authorization": f"Bearer {cookie}",
542
  "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36",
 
547
  "sec-ch-ua-platform": '"macOS"',
548
  "x-fe-version": "prod-fe-1.0.53",
549
  "Origin": "https://chat.z.ai",
550
+ "Referer": "https://chat.z.ai/c/069723d5-060b-404f-992c-4705f1554c4c",
551
  }
552
 
 
 
 
 
 
 
 
 
 
 
553
  completion_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
554
+ current_phase = None
555
+ collected_content = "" # For post-processing in non-streaming mode
556
 
557
  try:
558
+ # Create a new client for this streaming request to avoid conflicts
559
+ async with httpx.AsyncClient(
560
+ timeout=httpx.Timeout(60.0, read=300.0),
561
+ limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
562
+ http2=True
563
+ ) as stream_client:
564
+ async with stream_client.stream(
565
+ "POST",
566
+ settings.UPSTREAM_URL,
567
+ json=request_data,
568
+ headers=headers
569
+ ) as response:
570
+
571
+ if response.status_code == 401:
572
+ await cookie_manager.mark_cookie_failed(cookie)
573
+ raise HTTPException(status_code=401, detail="Invalid authentication")
574
+
575
+ if response.status_code != 200:
576
+ await cookie_manager.mark_cookie_failed(cookie)
577
+ raise HTTPException(status_code=response.status_code, detail="Upstream error")
578
+
579
+ await cookie_manager.mark_cookie_success(cookie)
580
+
581
+ # Process streaming response in real-time
582
+ buffer = ""
583
+ async for chunk in response.aiter_text(chunk_size=1024):
584
+ if not chunk:
585
  continue
586
 
587
+ buffer += chunk
588
+
589
+ # Process complete lines immediately
590
+ while "\n" in buffer:
591
+ line, buffer = buffer.split("\n", 1)
592
+ line = line.strip()
593
+
594
+ if not line.startswith("data: "):
595
+ continue
596
+
597
+ payload = line[6:].strip()
598
+ if payload == "[DONE]":
599
+ # For streaming mode, just send the final chunk and done
600
+ if request.stream:
601
+ final_chunk = {
602
+ "id": completion_id,
603
+ "object": "chat.completion.chunk",
604
+ "created": int(time.time()),
605
+ "model": request.model,
606
+ "choices": [{
607
+ "index": 0,
608
+ "delta": {},
609
+ "finish_reason": "stop"
610
+ }]
611
+ }
612
+ yield f"data: {json.dumps(final_chunk)}\n\n"
613
+ yield "data: [DONE]\n\n"
614
+ return
615
+
616
+ try:
617
+ parsed = json.loads(payload)
618
+ data = parsed.get("data", {})
619
+ delta_content = data.get("delta_content", "")
620
+ phase = data.get("phase", "")
621
+
622
+ # Track phase changes
623
+ if phase != current_phase:
624
+ current_phase = phase
625
+ logger.debug(f"Phase changed to: {phase}")
626
+
627
+ # Collect content for potential post-processing
628
+ if delta_content:
629
+ collected_content += delta_content
630
+
631
+ # Apply filtering based on SHOW_THINK_TAGS and phase
632
+ should_send_content = True
633
+
634
+ if not settings.SHOW_THINK_TAGS and phase == "thinking":
635
+ should_send_content = False
636
+
637
+ # Process and send content immediately if we should (for streaming)
638
+ if delta_content and should_send_content and request.stream:
639
+ # Minimal transformation for real-time streaming
640
+ transformed_delta = delta_content
641
+
642
+ if settings.SHOW_THINK_TAGS:
643
+ # Simple tag replacement for streaming
644
+ transformed_delta = re.sub(r'<details[^>]*>', '<think>', transformed_delta)
645
+ transformed_delta = transformed_delta.replace('</details>', '</think>')
646
+ transformed_delta = re.sub(r"<summary>.*?</summary>", "", transformed_delta, flags=re.DOTALL)
647
+
648
+ # Create and send OpenAI-compatible chunk immediately
649
+ openai_chunk = {
650
+ "id": completion_id,
651
+ "object": "chat.completion.chunk",
652
+ "created": int(time.time()),
653
+ "model": request.model,
654
+ "choices": [{
655
+ "index": 0,
656
+ "delta": {
657
+ "content": transformed_delta
658
+ },
659
+ "finish_reason": None
660
+ }]
661
+ }
662
+
663
+ # Yield immediately for real-time streaming
664
+ yield f"data: {json.dumps(openai_chunk)}\n\n"
665
+
666
+ except json.JSONDecodeError:
667
+ continue # Skip non-JSON lines
668
 
669
  except httpx.RequestError as e:
670
+ logger.error(f"Streaming request error: {e}")
671
  await cookie_manager.mark_cookie_failed(cookie)
672
+ raise HTTPException(status_code=503, detail=f"Upstream service unavailable: {str(e)}")