bluewinliang committed on
Commit
be9067e
·
verified ·
1 Parent(s): 827f3c3

Upload proxy_handler.py

Browse files
Files changed (1) hide show
  1. proxy_handler.py +83 -100
proxy_handler.py CHANGED
@@ -26,65 +26,50 @@ class ProxyHandler:
26
  if not self.client.is_closed:
27
  await self.client.aclose()
28
 
29
- def _clean_thinking_content(self, text: str) -> str:
30
  """
31
- Aggressively cleans raw thinking content strings based on observed patterns
32
- from the Z.AI API, inspired by a reference Cloudflare implementation.
33
- Removes tool calls, specific HTML-like tags, and other metadata while preserving
34
- the core thought process content.
35
  """
36
  if not text:
37
  return ""
38
-
39
- cleaned_text = text
40
-
41
- # 1. Remove entire blocks where the content is also unwanted metadata.
42
- # e.g., <summary>Thinking...</summary> or <glm_block>...</glm_block>
43
- cleaned_text = re.sub(r'<summary>.*?</summary>', '', cleaned_text, flags=re.DOTALL)
44
  cleaned_text = re.sub(r'<glm_block.*?</glm_block>', '', cleaned_text, flags=re.DOTALL)
45
 
46
- # 2. Remove specific structural tags, but keep the content between them.
47
- # Inspired by the reference implementation's targeted replaces.
48
- # e.g., <details> content </details> becomes just 'content'
49
- cleaned_text = cleaned_text.replace("</thinking>", "")
50
- cleaned_text = cleaned_text.replace("<Full>", "")
51
- cleaned_text = cleaned_text.replace("</Full>", "")
52
- # This regex handles <details>, <details open>, and </details>
53
- cleaned_text = re.sub(r'</?details[^>]*>', '', cleaned_text)
54
-
55
- # 3. Handle markdown blockquotes.
56
- cleaned_text = re.sub(r'^\s*>\s*(?!>)', '', cleaned_text, flags=re.MULTILINE)
57
-
58
- # 4. Remove other known text artifacts.
59
- cleaned_text = cleaned_text.replace("Thinking…", "")
60
 
61
- # 5. Final strip to clean up residual whitespace from removed elements.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
62
  return cleaned_text.strip()
63
 
64
- def _clean_answer_content(self, text: str) -> str:
65
- """
66
- Cleans unwanted tags from answer content.
67
- Does NOT strip whitespace to preserve markdown in streams.
68
- """
69
- if not text:
70
- return ""
71
- # Remove tool call blocks
72
- cleaned_text = re.sub(r'<glm_block.*?</glm_block>', '', text, flags=re.DOTALL)
73
- # Remove any residual details/summary blocks that might leak into the answer
74
- cleaned_text = re.sub(r'<details[^>]*>.*?</details>', '', cleaned_text, flags=re.DOTALL)
75
- cleaned_text = re.sub(r'<summary>.*?</summary>', '', cleaned_text, flags=re.DOTALL)
76
- return cleaned_text
77
-
78
- def _serialize_msgs(self, msgs) -> list:
79
- """Converts message objects to a list of dictionaries."""
80
- out = []
81
- for m in msgs:
82
- if hasattr(m, "dict"): out.append(m.dict())
83
- elif hasattr(m, "model_dump"): out.append(m.model_dump())
84
- elif isinstance(m, dict): out.append(m)
85
- else: out.append({"role": getattr(m, "role", "user"), "content": getattr(m, "content", str(m))})
86
- return out
87
-
88
  async def _prep_upstream(self, req: ChatCompletionRequest) -> Tuple[Dict[str, Any], Dict[str, str], str]:
89
  """Prepares the request body, headers, and cookie for the upstream API."""
90
  ck = await cookie_manager.get_next_cookie()
@@ -102,6 +87,7 @@ class ProxyHandler:
102
  think_open = False
103
  yielded_think_buffer = ""
104
  current_raw_thinking = ""
 
105
 
106
  async def yield_delta(content_type: str, text: str):
107
  nonlocal think_open, yielded_think_buffer
@@ -141,39 +127,41 @@ class ProxyHandler:
141
 
142
  payload_str = line[6:]
143
  if payload_str == '[DONE]':
144
- if think_open:
145
- yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '</think>'}, 'finish_reason': None}]})}\n\n"
146
- yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {}, 'finish_reason': 'stop'}]})}\n\n";
147
- yield "data: [DONE]\n\n";
148
- return
149
  try:
150
  dat = json.loads(payload_str).get("data", {})
151
- except (json.JSONDecodeError, AttributeError):
152
- continue
153
 
154
- # --- START OF REFACTORED LOGIC ---
155
- phase = dat.get("phase")
156
- content_chunk = dat.get("delta_content") or dat.get("edit_content")
157
-
158
- if not content_chunk:
 
159
  continue
160
 
161
- if phase == "thinking":
162
- # Accumulate raw thinking content. `edit_content` replaces the buffer.
163
- if dat.get("edit_content") is not None:
164
- current_raw_thinking = content_chunk
165
- else:
166
- current_raw_thinking += content_chunk
167
- # Yield the processed delta of the accumulated thinking content
168
- async for item in yield_delta("thinking", current_raw_thinking):
169
- yield item
170
 
171
- elif phase == "answer":
172
- # Directly yield the answer chunk for processing
173
- async for item in yield_delta("answer", content_chunk):
 
 
 
 
 
 
 
 
 
 
 
174
  yield item
175
- # --- END OF REFACTORED LOGIC ---
176
-
177
  except Exception:
178
  logger.exception("Stream error"); raise
179
 
@@ -202,32 +190,27 @@ class ProxyHandler:
202
  dat = json.loads(payload_str).get("data", {})
203
  except (json.JSONDecodeError, AttributeError): continue
204
 
205
- # Use the more robust phase-based logic for non-stream as well
206
- phase = dat.get("phase")
207
- content_chunk = dat.get("delta_content") or dat.get("edit_content")
208
-
209
- if not content_chunk:
210
  continue
211
 
212
- if phase == "thinking":
213
- answer_started = False # Ensure we are in thinking mode
214
- if dat.get("edit_content") is not None:
215
- current_raw_thinking = content_chunk
216
- else:
217
- current_raw_thinking += content_chunk
218
- last_thinking_content = current_raw_thinking
219
 
220
- elif phase == "answer":
221
- if not answer_started:
222
- # First answer chunk might contain leftover thinking part, clean it.
223
- cleaned_chunk = self._clean_answer_content(content_chunk)
224
- if cleaned_chunk:
225
- raw_answer_parts.append(cleaned_chunk)
226
- answer_started = True
227
- else:
228
- raw_answer_parts.append(content_chunk)
229
- else:
230
- continue
231
  break
232
 
233
  full_answer = ''.join(raw_answer_parts)
@@ -252,4 +235,4 @@ class ProxyHandler:
252
  if stream:
253
  return StreamingResponse(self.stream_proxy_response(req), media_type="text/event-stream",
254
  headers={"Cache-Control": "no-cache", "Connection": "keep-alive"})
255
-
 
26
  if not self.client.is_closed:
27
  await self.client.aclose()
28
 
29
+ def _clean_thinking_content(self, text: str) -> str:
30
  """
31
+ Aggressively cleans raw thinking content strings.
32
+ Removes tool calls, HTML-like tags, and other metadata.
33
+ Based on the Cloudflare Workers cleanThinkingContent implementation.
 
34
  """
35
  if not text:
36
  return ""
37
+
38
+ # Remove <summary>...</summary> tags and content
39
+ cleaned_text = re.sub(r'<summary>.*?</summary>', '', text, flags=re.DOTALL)
40
+
41
+ # Remove tool call blocks
 
42
  cleaned_text = re.sub(r'<glm_block.*?</glm_block>', '', cleaned_text, flags=re.DOTALL)
43
 
44
+ # Clean up specific thinking-related tags
45
+ cleaned_text = cleaned_text.replace('</thinking>', '')
46
+ cleaned_text = cleaned_text.replace('<Full>', '')
47
+ cleaned_text = cleaned_text.replace('</Full>', '')
48
+
49
+ # Remove details tags (both opening and closing)
50
+ cleaned_text = re.sub(r'<details[^>]*>', '', cleaned_text)
51
+ cleaned_text = re.sub(r'</details>', '', cleaned_text)
52
+
53
+ # Remove other HTML-like tags
54
+ cleaned_text = re.sub(r'<[^>]+>', '', cleaned_text)
 
 
 
55
 
56
+ # Remove metadata patterns
57
+ cleaned_text = re.sub(r'true"\s+duration="\d+"[^>]*>\s*Thought for \d+ seconds', '', cleaned_text)
58
+
59
+ # Handle quote symbols (按 Cloudflare Workers 逻辑)
60
+ # First handle leading "> "
61
+ if cleaned_text.startswith('> '):
62
+ cleaned_text = cleaned_text[2:]
63
+ # Then handle "\n> " patterns (but preserve ">>" for nested quotes)
64
+ cleaned_text = re.sub(r'\n>\s+(?!>)', '\n', cleaned_text)
65
+
66
+ # Remove "Thinking..." headers (various encodings)
67
+ cleaned_text = cleaned_text.replace("Thinking…", "")
68
+ cleaned_text = cleaned_text.replace("Thinking...", "")
69
+
70
+ # Final trim
71
  return cleaned_text.strip()
72
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
73
  async def _prep_upstream(self, req: ChatCompletionRequest) -> Tuple[Dict[str, Any], Dict[str, str], str]:
74
  """Prepares the request body, headers, and cookie for the upstream API."""
75
  ck = await cookie_manager.get_next_cookie()
 
87
  think_open = False
88
  yielded_think_buffer = ""
89
  current_raw_thinking = ""
90
+ answer_started = False
91
 
92
  async def yield_delta(content_type: str, text: str):
93
  nonlocal think_open, yielded_think_buffer
 
127
 
128
  payload_str = line[6:]
129
  if payload_str == '[DONE]':
130
+ if think_open: yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '</think>'}, 'finish_reason': None}]})}\n\n"
131
+ yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {}, 'finish_reason': 'stop'}]})}\n\n"; yield "data: [DONE]\n\n"; return
 
 
 
132
  try:
133
  dat = json.loads(payload_str).get("data", {})
134
+ except (json.JSONDecodeError, AttributeError): continue
 
135
 
136
+ if answer_started:
137
+ content = dat.get("delta_content") or dat.get("edit_content")
138
+ # FIX: Expanded to a proper indented block
139
+ if content:
140
+ async for item in yield_delta("answer", content):
141
+ yield item
142
  continue
143
 
144
+ content_chunk = dat.get("edit_content") or dat.get("delta_content") or ""
145
+ if dat.get("edit_content") is not None:
146
+ current_raw_thinking = content_chunk
147
+ else:
148
+ current_raw_thinking += content_chunk
 
 
 
 
149
 
150
+ match = re.search(r'(.*</details>)(.*)', current_raw_thinking, flags=re.DOTALL)
151
+ if match:
152
+ thinking_part, answer_part = match.groups()
153
+ # FIX: Expanded to a proper indented block
154
+ async for item in yield_delta("thinking", thinking_part):
155
+ yield item
156
+ # FIX: Expanded to a proper indented block
157
+ if answer_part:
158
+ async for item in yield_delta("answer", answer_part):
159
+ yield item
160
+ answer_started = True
161
+ else:
162
+ # FIX: Expanded to a proper indented block
163
+ async for item in yield_delta("thinking", current_raw_thinking):
164
  yield item
 
 
165
  except Exception:
166
  logger.exception("Stream error"); raise
167
 
 
190
  dat = json.loads(payload_str).get("data", {})
191
  except (json.JSONDecodeError, AttributeError): continue
192
 
193
+ if answer_started:
194
+ content = dat.get("delta_content") or dat.get("edit_content")
195
+ if content: raw_answer_parts.append(content)
 
 
196
  continue
197
 
198
+ content_chunk = dat.get("edit_content") or dat.get("delta_content")
199
+ if not content_chunk: continue
200
+
201
+ if dat.get("edit_content") is not None:
202
+ current_raw_thinking = content_chunk
203
+ else:
204
+ current_raw_thinking += content_chunk
205
 
206
+ match = re.search(r'(.*</details>)(.*)', current_raw_thinking, flags=re.DOTALL)
207
+ if match:
208
+ last_thinking_content, answer_part = match.groups()
209
+ if answer_part: raw_answer_parts.append(answer_part)
210
+ answer_started = True
211
+ else:
212
+ last_thinking_content = current_raw_thinking
213
+ else: continue
 
 
 
214
  break
215
 
216
  full_answer = ''.join(raw_answer_parts)
 
235
  if stream:
236
  return StreamingResponse(self.stream_proxy_response(req), media_type="text/event-stream",
237
  headers={"Cache-Control": "no-cache", "Connection": "keep-alive"})
238
+ return await self.non_stream_proxy_response(req)