bluewinliang commited on
Commit
d5e67d1
·
verified ·
1 Parent(s): 13cb7f1

Upload proxy_handler.py

Browse files
Files changed (1) hide show
  1. proxy_handler.py +46 -46
proxy_handler.py CHANGED
@@ -83,13 +83,11 @@ class ProxyHandler:
83
  body, headers, ck = await self._prep_upstream(req)
84
  comp_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
85
  think_open = False
86
- phase_cur = None
87
- # Buffer for what has been sent to the client to calculate deltas.
88
  yielded_think_buffer = ""
89
- # Authoritative state of the current full thinking content.
90
- current_thinking_content = ""
91
 
92
- async def yield_delta_content(content_type: str, text: str):
93
  nonlocal think_open, yielded_think_buffer
94
  if content_type == "thinking" and settings.SHOW_THINK_TAGS:
95
  if not think_open:
@@ -97,15 +95,10 @@ class ProxyHandler:
97
  think_open = True
98
 
99
  cleaned_full_text = self._clean_thinking_content(text)
100
-
101
- if cleaned_full_text.startswith(yielded_think_buffer):
102
- delta_to_send = cleaned_full_text[len(yielded_think_buffer):]
103
- else:
104
- delta_to_send = cleaned_full_text
105
 
106
  if delta_to_send:
107
  yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': delta_to_send}, 'finish_reason': None}]})}\n\n"
108
-
109
  yielded_think_buffer = cleaned_full_text
110
 
111
  elif content_type == "answer":
@@ -138,25 +131,26 @@ class ProxyHandler:
138
  dat = json.loads(payload_str).get("data", {})
139
  except (json.JSONDecodeError, AttributeError): continue
140
 
141
- new_phase = dat.get("phase")
142
- if new_phase: phase_cur = new_phase
143
- if not phase_cur: continue
144
-
145
- # Correctly handle delta_content (append) vs edit_content (replace).
146
- if phase_cur == "thinking":
147
- if "edit_content" in dat and dat["edit_content"] is not None:
148
- current_thinking_content = dat["edit_content"]
149
- elif "delta_content" in dat and dat["delta_content"] is not None:
150
- current_thinking_content += dat["delta_content"]
151
-
152
- async for item in yield_delta_content("thinking", current_thinking_content):
153
- yield item
154
-
155
- elif phase_cur == "answer":
156
  content = dat.get("delta_content") or dat.get("edit_content")
157
- if content:
158
- async for item in yield_delta_content("answer", content):
159
- yield item
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
160
  except Exception:
161
  logger.exception("Stream error"); raise
162
 
@@ -164,42 +158,48 @@ class ProxyHandler:
164
  ck = None
165
  try:
166
  body, headers, ck = await self._prep_upstream(req)
167
- # Use a single string to hold the latest full thinking content.
168
  last_thinking_content = ""
169
  raw_answer_parts = []
170
- phase_cur = None
171
  async with self.client.stream("POST", settings.UPSTREAM_URL, json=body, headers=headers) as resp:
172
  if resp.status_code != 200:
173
  await cookie_manager.mark_cookie_failed(ck); error_detail = await resp.text()
174
  raise HTTPException(resp.status_code, f"Upstream error: {error_detail}")
175
  await cookie_manager.mark_cookie_success(ck)
176
 
 
 
 
 
177
  async for raw in resp.aiter_text():
178
  for line in raw.strip().split('\n'):
179
  line = line.strip()
180
  if not line.startswith('data: '): continue
181
-
182
  payload_str = line[6:]
183
  if payload_str == '[DONE]': break
184
  try:
185
  dat = json.loads(payload_str).get("data", {})
186
  except (json.JSONDecodeError, AttributeError): continue
187
 
188
- new_phase = dat.get("phase")
189
- if new_phase: phase_cur = new_phase
190
- if not phase_cur: continue
191
-
192
- if phase_cur == "thinking":
193
- if "edit_content" in dat and dat["edit_content"] is not None:
194
- last_thinking_content = dat["edit_content"]
195
- elif "delta_content" in dat and dat["delta_content"] is not None:
196
- # In non-stream, we only need the final complete version.
197
- # 'edit_content' is usually that. We can build it as a fallback.
198
- last_thinking_content += dat["delta_content"]
199
- elif phase_cur == "answer":
200
  content = dat.get("delta_content") or dat.get("edit_content")
201
- if content:
202
- raw_answer_parts.append(content)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
203
  else: continue
204
  break
205
 
 
83
  body, headers, ck = await self._prep_upstream(req)
84
  comp_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
85
  think_open = False
 
 
86
  yielded_think_buffer = ""
87
+ current_raw_thinking = ""
88
+ answer_started = False
89
 
90
+ async def yield_delta(content_type: str, text: str):
91
  nonlocal think_open, yielded_think_buffer
92
  if content_type == "thinking" and settings.SHOW_THINK_TAGS:
93
  if not think_open:
 
95
  think_open = True
96
 
97
  cleaned_full_text = self._clean_thinking_content(text)
98
+ delta_to_send = cleaned_full_text[len(yielded_think_buffer):] if cleaned_full_text.startswith(yielded_think_buffer) else cleaned_full_text
 
 
 
 
99
 
100
  if delta_to_send:
101
  yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': delta_to_send}, 'finish_reason': None}]})}\n\n"
 
102
  yielded_think_buffer = cleaned_full_text
103
 
104
  elif content_type == "answer":
 
131
  dat = json.loads(payload_str).get("data", {})
132
  except (json.JSONDecodeError, AttributeError): continue
133
 
134
+ if answer_started:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
135
  content = dat.get("delta_content") or dat.get("edit_content")
136
+ if content: async for item in yield_delta("answer", content): yield item
137
+ continue
138
+
139
+ content_chunk = dat.get("edit_content") or dat.get("delta_content") or ""
140
+ if dat.get("edit_content") is not None:
141
+ current_raw_thinking = content_chunk
142
+ else:
143
+ current_raw_thinking += content_chunk
144
+
145
+ match = re.search(r'(.*</details>)(.*)', current_raw_thinking, flags=re.DOTALL)
146
+ if match:
147
+ thinking_part, answer_part = match.groups()
148
+ async for item in yield_delta("thinking", thinking_part): yield item
149
+ if answer_part:
150
+ async for item in yield_delta("answer", answer_part): yield item
151
+ answer_started = True
152
+ else:
153
+ async for item in yield_delta("thinking", current_raw_thinking): yield item
154
  except Exception:
155
  logger.exception("Stream error"); raise
156
 
 
158
  ck = None
159
  try:
160
  body, headers, ck = await self._prep_upstream(req)
 
161
  last_thinking_content = ""
162
  raw_answer_parts = []
 
163
  async with self.client.stream("POST", settings.UPSTREAM_URL, json=body, headers=headers) as resp:
164
  if resp.status_code != 200:
165
  await cookie_manager.mark_cookie_failed(ck); error_detail = await resp.text()
166
  raise HTTPException(resp.status_code, f"Upstream error: {error_detail}")
167
  await cookie_manager.mark_cookie_success(ck)
168
 
169
+ phase_cur = None
170
+ current_raw_thinking = ""
171
+ answer_started = False
172
+
173
  async for raw in resp.aiter_text():
174
  for line in raw.strip().split('\n'):
175
  line = line.strip()
176
  if not line.startswith('data: '): continue
 
177
  payload_str = line[6:]
178
  if payload_str == '[DONE]': break
179
  try:
180
  dat = json.loads(payload_str).get("data", {})
181
  except (json.JSONDecodeError, AttributeError): continue
182
 
183
+ if answer_started:
 
 
 
 
 
 
 
 
 
 
 
184
  content = dat.get("delta_content") or dat.get("edit_content")
185
+ if content: raw_answer_parts.append(content)
186
+ continue
187
+
188
+ content_chunk = dat.get("edit_content") or dat.get("delta_content")
189
+ if not content_chunk: continue
190
+
191
+ if dat.get("edit_content") is not None:
192
+ current_raw_thinking = content_chunk
193
+ else:
194
+ current_raw_thinking += content_chunk
195
+
196
+ match = re.search(r'(.*</details>)(.*)', current_raw_thinking, flags=re.DOTALL)
197
+ if match:
198
+ last_thinking_content, answer_part = match.groups()
199
+ if answer_part: raw_answer_parts.append(answer_part)
200
+ answer_started = True
201
+ else:
202
+ last_thinking_content = current_raw_thinking
203
  else: continue
204
  break
205