bluewinliang committed on
Commit
13cb7f1
·
verified ·
1 Parent(s): 916ea12

Upload proxy_handler.py

Browse files
Files changed (1) hide show
  1. proxy_handler.py +55 -35
proxy_handler.py CHANGED
@@ -84,8 +84,38 @@ class ProxyHandler:
84
  comp_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
85
  think_open = False
86
  phase_cur = None
87
- # FIX: Buffer to hold the LATEST (aggregate) thinking content.
88
- last_thinking_content = ""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
89
 
90
  async with self.client.stream("POST", settings.UPSTREAM_URL, json=body, headers=headers) as resp:
91
  if resp.status_code != 200:
@@ -102,9 +132,7 @@ class ProxyHandler:
102
 
103
  payload_str = line[6:]
104
  if payload_str == '[DONE]':
105
- # If stream ends but </think> was not sent, send it now.
106
- if think_open:
107
- yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '</think>'}, 'finish_reason': None}]})}\n\n"
108
  yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {}, 'finish_reason': 'stop'}]})}\n\n"; yield "data: [DONE]\n\n"; return
109
  try:
110
  dat = json.loads(payload_str).get("data", {})
@@ -114,33 +142,21 @@ class ProxyHandler:
114
  if new_phase: phase_cur = new_phase
115
  if not phase_cur: continue
116
 
117
- content = dat.get("edit_content") or dat.get("delta_content")
118
- if not content: continue
119
-
120
- # FIX: Implement "wait-then-flush" logic.
121
  if phase_cur == "thinking":
122
- # Keep overwriting with the latest content. Only the final, aggregate version will be stored.
123
- last_thinking_content = content
124
-
125
- elif phase_cur == "answer":
126
- # When the first answer chunk arrives, flush the buffered thinking content.
127
- if last_thinking_content and settings.SHOW_THINK_TAGS and not think_open:
128
- cleaned_thought = self._clean_thinking_content(last_thinking_content)
129
- if cleaned_thought:
130
- yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '<think>'}, 'finish_reason': None}]})}\n\n"
131
- yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': cleaned_thought}, 'finish_reason': None}]})}\n\n"
132
- think_open = True # Mark as flushed
133
 
134
- # Close the think tag if it's open.
135
- if think_open:
136
- yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '</think>'}, 'finish_reason': None}]})}\n\n"
137
- think_open = False # Ensure it's only closed once
138
-
139
- # Process and yield the answer content.
140
- cleaned_answer = self._clean_answer_content(content)
141
- if cleaned_answer:
142
- yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': cleaned_answer}, 'finish_reason': None}]})}\n\n"
143
 
 
 
 
 
 
144
  except Exception:
145
  logger.exception("Stream error"); raise
146
 
@@ -148,6 +164,7 @@ class ProxyHandler:
148
  ck = None
149
  try:
150
  body, headers, ck = await self._prep_upstream(req)
 
151
  last_thinking_content = ""
152
  raw_answer_parts = []
153
  phase_cur = None
@@ -172,14 +189,17 @@ class ProxyHandler:
172
  if new_phase: phase_cur = new_phase
173
  if not phase_cur: continue
174
 
175
- content = dat.get("edit_content") or dat.get("delta_content")
176
- if not content: continue
177
-
178
- # This logic correctly captures the last thinking content and all answer parts.
179
  if phase_cur == "thinking":
180
- last_thinking_content = content
 
 
 
 
 
181
  elif phase_cur == "answer":
182
- raw_answer_parts.append(content)
 
 
183
  else: continue
184
  break
185
 
 
84
  comp_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
85
  think_open = False
86
  phase_cur = None
87
+ # Buffer for what has been sent to the client to calculate deltas.
88
+ yielded_think_buffer = ""
89
+ # Authoritative state of the current full thinking content.
90
+ current_thinking_content = ""
91
+
92
+ async def yield_delta_content(content_type: str, text: str):
93
+ nonlocal think_open, yielded_think_buffer
94
+ if content_type == "thinking" and settings.SHOW_THINK_TAGS:
95
+ if not think_open:
96
+ yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '<think>'}, 'finish_reason': None}]})}\n\n"
97
+ think_open = True
98
+
99
+ cleaned_full_text = self._clean_thinking_content(text)
100
+
101
+ if cleaned_full_text.startswith(yielded_think_buffer):
102
+ delta_to_send = cleaned_full_text[len(yielded_think_buffer):]
103
+ else:
104
+ delta_to_send = cleaned_full_text
105
+
106
+ if delta_to_send:
107
+ yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': delta_to_send}, 'finish_reason': None}]})}\n\n"
108
+
109
+ yielded_think_buffer = cleaned_full_text
110
+
111
+ elif content_type == "answer":
112
+ if think_open:
113
+ yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '</think>'}, 'finish_reason': None}]})}\n\n"
114
+ think_open = False
115
+
116
+ cleaned_text = self._clean_answer_content(text)
117
+ if cleaned_text:
118
+ yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': cleaned_text}, 'finish_reason': None}]})}\n\n"
119
 
120
  async with self.client.stream("POST", settings.UPSTREAM_URL, json=body, headers=headers) as resp:
121
  if resp.status_code != 200:
 
132
 
133
  payload_str = line[6:]
134
  if payload_str == '[DONE]':
135
+ if think_open: yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '</think>'}, 'finish_reason': None}]})}\n\n"
 
 
136
  yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {}, 'finish_reason': 'stop'}]})}\n\n"; yield "data: [DONE]\n\n"; return
137
  try:
138
  dat = json.loads(payload_str).get("data", {})
 
142
  if new_phase: phase_cur = new_phase
143
  if not phase_cur: continue
144
 
145
+ # Correctly handle delta_content (append) vs edit_content (replace).
 
 
 
146
  if phase_cur == "thinking":
147
+ if "edit_content" in dat and dat["edit_content"] is not None:
148
+ current_thinking_content = dat["edit_content"]
149
+ elif "delta_content" in dat and dat["delta_content"] is not None:
150
+ current_thinking_content += dat["delta_content"]
 
 
 
 
 
 
 
151
 
152
+ async for item in yield_delta_content("thinking", current_thinking_content):
153
+ yield item
 
 
 
 
 
 
 
154
 
155
+ elif phase_cur == "answer":
156
+ content = dat.get("delta_content") or dat.get("edit_content")
157
+ if content:
158
+ async for item in yield_delta_content("answer", content):
159
+ yield item
160
  except Exception:
161
  logger.exception("Stream error"); raise
162
 
 
164
  ck = None
165
  try:
166
  body, headers, ck = await self._prep_upstream(req)
167
+ # Use a single string to hold the latest full thinking content.
168
  last_thinking_content = ""
169
  raw_answer_parts = []
170
  phase_cur = None
 
189
  if new_phase: phase_cur = new_phase
190
  if not phase_cur: continue
191
 
 
 
 
 
192
  if phase_cur == "thinking":
193
+ if "edit_content" in dat and dat["edit_content"] is not None:
194
+ last_thinking_content = dat["edit_content"]
195
+ elif "delta_content" in dat and dat["delta_content"] is not None:
196
+ # In non-stream, we only need the final complete version.
197
+ # 'edit_content' is usually that. We can build it as a fallback.
198
+ last_thinking_content += dat["delta_content"]
199
  elif phase_cur == "answer":
200
+ content = dat.get("delta_content") or dat.get("edit_content")
201
+ if content:
202
+ raw_answer_parts.append(content)
203
  else: continue
204
  break
205