bluewinliang committed on
Commit
15da475
·
verified ·
1 Parent(s): 4f7eefe

Upload proxy_handler.py

Browse files
Files changed (1) hide show
  1. proxy_handler.py +45 -37
proxy_handler.py CHANGED
@@ -34,14 +34,14 @@ class ProxyHandler:
34
  """
35
  if not text:
36
  return ""
37
- # Remove tool call blocks first
38
  cleaned_text = re.sub(r'<glm_block.*?</glm_block>', '', text, flags=re.DOTALL)
39
  # Remove all other HTML-like tags
40
  cleaned_text = re.sub(r'<[^>]+>', '', cleaned_text)
41
  # Remove specific metadata patterns
42
  cleaned_text = re.sub(r'true" duration="\d+">\s*Thought for \d+ seconds', '', cleaned_text)
43
- # Remove leading markdown quote symbols
44
- # cleaned_text = re.sub(r'^\s*>\s*', '', cleaned_text, flags=re.MULTILINE)
45
  # Remove "Thinking..." headers
46
  cleaned_text = cleaned_text.replace("Thinking…", "")
47
  # Final strip to clean up residual whitespace
@@ -56,7 +56,7 @@ class ProxyHandler:
56
  return ""
57
  # Remove only tool call blocks
58
  cleaned_text = re.sub(r'<glm_block.*?</glm_block>', '', text, flags=re.DOTALL)
59
- return cleaned_text # <-- FIX: Removed .strip() to preserve whitespace in chunks
60
 
61
  def _serialize_msgs(self, msgs) -> list:
62
  """Converts message objects to a list of dictionaries."""
@@ -81,39 +81,43 @@ class ProxyHandler:
81
  ck = None
82
  try:
83
  body, headers, ck = await self._prep_upstream(req)
84
- comp_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"; think_open = False; phase_cur = None
 
 
 
 
85
 
86
  async def yield_content(content_type: str, text: str):
87
- nonlocal think_open
 
88
 
89
- # Apply cleaning based on content type
90
- cleaned_text = ""
91
- if content_type == "thinking":
92
- cleaned_text = self._clean_thinking_content(text)
93
- elif content_type == "answer":
94
- # Use the non-stripping cleaner for stream chunks
95
- cleaned_text = self._clean_answer_content(text)
96
-
97
- if not cleaned_text and not (content_type == "answer" and text):
98
- # For answer, even if cleaning results in empty (e.g. only a glm_block),
99
- # the original might have been just whitespace, so we allow it to pass if text was present.
100
- # This logic is now simpler: if there's no text, don't yield.
101
- if not text: return
102
-
103
  if content_type == "thinking" and settings.SHOW_THINK_TAGS:
104
  if not think_open:
105
  yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '<think>'}, 'finish_reason': None}]})}\n\n"
106
  think_open = True
107
- if cleaned_text:
108
- yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': cleaned_text}, 'finish_reason': None}]})}\n\n"
109
-
 
 
 
 
 
 
 
 
 
 
 
 
110
  elif content_type == "answer":
111
  if think_open:
112
  yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '</think>'}, 'finish_reason': None}]})}\n\n"
113
  think_open = False
114
 
115
- # Yield the cleaned text, which now preserves crucial whitespace
116
- yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': cleaned_text}, 'finish_reason': None}]})}\n\n"
 
117
 
118
  async with self.client.stream("POST", settings.UPSTREAM_URL, json=body, headers=headers) as resp:
119
  if resp.status_code != 200:
@@ -140,16 +144,17 @@ class ProxyHandler:
140
  if new_phase: phase_cur = new_phase
141
  if not phase_cur: continue
142
 
143
- content = dat.get("delta_content") or dat.get("edit_content")
 
144
  if not content: continue
145
 
146
  match = re.search(r'(.*</details>)(.*)', content, flags=re.DOTALL)
147
  if match:
148
  thinking_part, answer_part = match.groups()
149
- async for item in yield_content("thinking", thinking_part): yield item
150
- async for item in yield_content("answer", answer_part): yield item
151
  else:
152
- async for item in yield_content(phase_cur, content): yield item
153
  except Exception:
154
  logger.exception("Stream error"); raise
155
 
@@ -157,7 +162,10 @@ class ProxyHandler:
157
  ck = None
158
  try:
159
  body, headers, ck = await self._prep_upstream(req)
160
- raw_thinking_parts = []; raw_answer_parts = []; phase_cur = None
 
 
 
161
  async with self.client.stream("POST", settings.UPSTREAM_URL, json=body, headers=headers) as resp:
162
  if resp.status_code != 200:
163
  await cookie_manager.mark_cookie_failed(ck); error_detail = await resp.text()
@@ -179,17 +187,17 @@ class ProxyHandler:
179
  if new_phase: phase_cur = new_phase
180
  if not phase_cur: continue
181
 
182
- content = dat.get("delta_content") or dat.get("edit_content")
183
  if not content: continue
184
 
185
  match = re.search(r'(.*</details>)(.*)', content, flags=re.DOTALL)
186
  if match:
187
  thinking_part, answer_part = match.groups()
188
- raw_thinking_parts.append(thinking_part)
189
  raw_answer_parts.append(answer_part)
190
  else:
191
  if phase_cur == "thinking":
192
- raw_thinking_parts.append(content)
193
  elif phase_cur == "answer":
194
  raw_answer_parts.append(content)
195
  else: continue
@@ -197,12 +205,12 @@ class ProxyHandler:
197
 
198
  # Clean the joined answer text, then strip the final result.
199
  full_answer = ''.join(raw_answer_parts)
200
- cleaned_ans_text = self._clean_answer_content(full_answer).strip() # <-- FIX: Apply .strip() here
201
  final_content = cleaned_ans_text
202
 
203
- if settings.SHOW_THINK_TAGS and raw_thinking_parts:
204
- # Aggressively clean the thinking part.
205
- cleaned_think_text = self._clean_thinking_content(''.join(raw_thinking_parts))
206
  if cleaned_think_text:
207
  final_content = f"<think>{cleaned_think_text}</think>{cleaned_ans_text}"
208
 
 
34
  """
35
  if not text:
36
  return ""
37
+ # Remove tool call blocks
38
  cleaned_text = re.sub(r'<glm_block.*?</glm_block>', '', text, flags=re.DOTALL)
39
  # Remove all other HTML-like tags
40
  cleaned_text = re.sub(r'<[^>]+>', '', cleaned_text)
41
  # Remove specific metadata patterns
42
  cleaned_text = re.sub(r'true" duration="\d+">\s*Thought for \d+ seconds', '', cleaned_text)
43
+ # FIX: Remove only first-level markdown quotes, preserving sub-quotes like '>>'
44
+ cleaned_text = re.sub(r'^\s*>\s*(?!>)', '', cleaned_text, flags=re.MULTILINE)
45
  # Remove "Thinking..." headers
46
  cleaned_text = cleaned_text.replace("Thinking…", "")
47
  # Final strip to clean up residual whitespace
 
56
  return ""
57
  # Remove only tool call blocks
58
  cleaned_text = re.sub(r'<glm_block.*?</glm_block>', '', text, flags=re.DOTALL)
59
+ return cleaned_text
60
 
61
  def _serialize_msgs(self, msgs) -> list:
62
  """Converts message objects to a list of dictionaries."""
 
81
  ck = None
82
  try:
83
  body, headers, ck = await self._prep_upstream(req)
84
+ comp_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
85
+ think_open = False
86
+ phase_cur = None
87
+ # FIX: Buffer to track streamed thinking content and prevent duplication.
88
+ streamed_think_buffer = ""
89
 
90
  async def yield_content(content_type: str, text: str):
91
+ nonlocal think_open, streamed_think_buffer
92
+ if not text: return
93
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
94
  if content_type == "thinking" and settings.SHOW_THINK_TAGS:
95
  if not think_open:
96
  yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '<think>'}, 'finish_reason': None}]})}\n\n"
97
  think_open = True
98
+
99
+ # Clean the full incoming text first.
100
+ cleaned_full_text = self._clean_thinking_content(text)
101
+
102
+ # Send only the new part of the content.
103
+ if cleaned_full_text.startswith(streamed_think_buffer):
104
+ delta_to_send = cleaned_full_text[len(streamed_think_buffer):]
105
+ if delta_to_send:
106
+ yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': delta_to_send}, 'finish_reason': None}]})}\n\n"
107
+ else: # If content radically changes, send the whole new content.
108
+ yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': cleaned_full_text}, 'finish_reason': None}]})}\n\n"
109
+
110
+ # Update the buffer with the latest full content.
111
+ streamed_think_buffer = cleaned_full_text
112
+
113
  elif content_type == "answer":
114
  if think_open:
115
  yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '</think>'}, 'finish_reason': None}]})}\n\n"
116
  think_open = False
117
 
118
+ cleaned_text = self._clean_answer_content(text)
119
+ if cleaned_text:
120
+ yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': cleaned_text}, 'finish_reason': None}]})}\n\n"
121
 
122
  async with self.client.stream("POST", settings.UPSTREAM_URL, json=body, headers=headers) as resp:
123
  if resp.status_code != 200:
 
144
  if new_phase: phase_cur = new_phase
145
  if not phase_cur: continue
146
 
147
+ # Use edit_content first as it's the complete version, fallback to delta.
148
+ content = dat.get("edit_content") or dat.get("delta_content")
149
  if not content: continue
150
 
151
  match = re.search(r'(.*</details>)(.*)', content, flags=re.DOTALL)
152
  if match:
153
  thinking_part, answer_part = match.groups()
154
+ await yield_content("thinking", thinking_part)
155
+ await yield_content("answer", answer_part)
156
  else:
157
+ await yield_content(phase_cur, content)
158
  except Exception:
159
  logger.exception("Stream error"); raise
160
 
 
162
  ck = None
163
  try:
164
  body, headers, ck = await self._prep_upstream(req)
165
+ # FIX: Use single variables to store the latest content, preventing duplication.
166
+ last_thinking_content = ""
167
+ raw_answer_parts = []
168
+ phase_cur = None
169
  async with self.client.stream("POST", settings.UPSTREAM_URL, json=body, headers=headers) as resp:
170
  if resp.status_code != 200:
171
  await cookie_manager.mark_cookie_failed(ck); error_detail = await resp.text()
 
187
  if new_phase: phase_cur = new_phase
188
  if not phase_cur: continue
189
 
190
+ content = dat.get("edit_content") or dat.get("delta_content")
191
  if not content: continue
192
 
193
  match = re.search(r'(.*</details>)(.*)', content, flags=re.DOTALL)
194
  if match:
195
  thinking_part, answer_part = match.groups()
196
+ last_thinking_content = thinking_part
197
  raw_answer_parts.append(answer_part)
198
  else:
199
  if phase_cur == "thinking":
200
+ last_thinking_content = content # Overwrite with the latest
201
  elif phase_cur == "answer":
202
  raw_answer_parts.append(content)
203
  else: continue
 
205
 
206
  # Clean the joined answer text, then strip the final result.
207
  full_answer = ''.join(raw_answer_parts)
208
+ cleaned_ans_text = self._clean_answer_content(full_answer).strip()
209
  final_content = cleaned_ans_text
210
 
211
+ if settings.SHOW_THINK_TAGS and last_thinking_content:
212
+ # Aggressively clean the final thinking content.
213
+ cleaned_think_text = self._clean_thinking_content(last_thinking_content)
214
  if cleaned_think_text:
215
  final_content = f"<think>{cleaned_think_text}</think>{cleaned_ans_text}"
216