bluewinliang committed on
Commit
5ac14df
·
verified ·
1 Parent(s): 5b62571

Upload proxy_handler.py

Browse files
Files changed (1) hide show
  1. proxy_handler.py +22 -25
proxy_handler.py CHANGED
@@ -24,15 +24,12 @@ class ProxyHandler:
24
  http2=True,
25
  )
26
 
27
- # FIX: 移除 __aenter__ 和 __aexit__,改用顯式的 aclose 方法
28
- # aenter/aexit 模式不適用於需要跨越請求生命週期的流式響應
29
  async def aclose(self):
30
  """Closes the underlying httpx client."""
31
  if not self.client.is_closed:
32
  await self.client.aclose()
33
 
34
  # ---------- text utilities ----------
35
- # _balance_think_tag, _clean_thinking, _clean_answer, _serialize_msgs 方法保持不變
36
  def _balance_think_tag(self, txt: str) -> str:
37
  opens = len(re.findall(r"<think>", txt))
38
  closes = len(re.findall(r"</think>", txt))
@@ -66,7 +63,6 @@ class ProxyHandler:
66
 
67
  # ---------- upstream ----------
68
  async def _prep_upstream(self, req: ChatCompletionRequest) -> Tuple[Dict[str, Any], Dict[str, str], str]:
69
- # 此方法保持不變
70
  ck = await cookie_manager.get_next_cookie()
71
  if not ck: raise HTTPException(503, "No available cookies")
72
 
@@ -96,8 +92,6 @@ class ProxyHandler:
96
  body, headers, ck = await self._prep_upstream(req)
97
  comp_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
98
  think_open = False
99
-
100
- # FIX: 維護一個持久的 phase 狀態
101
  phase_cur = None
102
 
103
  async with self.client.stream("POST", settings.UPSTREAM_URL, json=body, headers=headers) as resp:
@@ -120,8 +114,11 @@ class ProxyHandler:
120
  payload = line[6:]
121
  if payload == '[DONE]':
122
  if think_open:
123
- yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '</think>'}, 'finish_reason': None}]})}\n\n"
124
- yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {}, 'finish_reason': 'stop'}]})}\n\n"
 
 
 
125
  yield "data: [DONE]\n\n"; return
126
 
127
  try:
@@ -132,45 +129,49 @@ class ProxyHandler:
132
  dat = parsed.get("data", {})
133
  delta, new_phase = dat.get("delta_content", ""), dat.get("phase")
134
 
135
- # FIX: 正確的狀態管理邏輯
136
- # 1. 如果收到了新的 phase,更新當前 phase
137
  if new_phase and new_phase != phase_cur:
138
- # 處理從 thinking 到 answer 的過渡
139
  if phase_cur == "thinking" and new_phase == "answer" and think_open and settings.SHOW_THINK_TAGS:
140
- yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '</think>\n'}, 'finish_reason': None}]})}\n\n"
 
 
 
 
 
 
 
 
141
  think_open = False
142
  phase_cur = new_phase
143
 
144
- if not delta: continue # 如果沒有內容,則跳過
145
 
146
- # 2. 根據當前的 phase_cur 處理內容
147
  text_to_yield = ""
148
  if phase_cur == "thinking":
149
  if settings.SHOW_THINK_TAGS:
150
  if not think_open:
151
- yield f"data: {json.dumps({'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '<think>'}, 'finish_reason': None}]})}\n\n"
 
152
  think_open = True
153
  text_to_yield = self._clean_thinking(delta)
154
  elif phase_cur == "answer":
155
  text_to_yield = self._clean_answer(delta)
156
 
157
- # 3. 發送處理後的內容
158
  if text_to_yield:
159
- out = {
160
  "id": comp_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": req.model,
161
  "choices": [{"index": 0, "delta": {"content": text_to_yield}, "finish_reason": None}],
162
  }
163
- yield f"data: {json.dumps(out)}\n\n"
164
 
165
  except httpx.RequestError as e:
166
  if ck: await cookie_manager.mark_cookie_invalid(ck)
167
  logger.error(f"Request error: {e}")
168
  err_msg = f"Connection error: {e}"
169
- err = {"choices": [{"delta": {"content": err_msg}, "finish_reason": "stop"}]}
170
  yield f"data: {json.dumps(err)}\n\n"; yield "data: [DONE]\n\n"
171
  except Exception as e:
172
  logger.exception(f"Unexpected error in stream_proxy_response")
173
- err = {"choices": [{"delta": {"content": f"Internal error in stream"}, "finish_reason": "stop"}]}
174
  yield f"data: {json.dumps(err)}\n\n"; yield "data: [DONE]\n\n"
175
 
176
  # ---------- non-stream ----------
@@ -180,7 +181,6 @@ class ProxyHandler:
180
  body, headers, ck = await self._prep_upstream(req)
181
  think_buf, answer_buf = [], []
182
 
183
- # FIX: 維護一個持久的 phase 狀態
184
  phase_cur = None
185
 
186
  async with self.client.stream("POST", settings.UPSTREAM_URL, json=body, headers=headers) as resp:
@@ -203,19 +203,16 @@ class ProxyHandler:
203
  dat = parsed.get("data", {})
204
  delta, new_phase = dat.get("delta_content", ""), dat.get("phase")
205
 
206
- # FIX: 正確的狀態管理邏輯
207
- # 1. 更新當前 phase
208
  if new_phase:
209
  phase_cur = new_phase
210
 
211
  if not delta: continue
212
 
213
- # 2. 根據當前的 phase_cur 存儲內容
214
  if phase_cur == "thinking":
215
  think_buf.append(delta)
216
  elif phase_cur == "answer":
217
  answer_buf.append(delta)
218
- else: # for-else, will be executed if loop finishes without break
219
  pass
220
 
221
  raw_answer = ''.join(answer_buf)
 
24
  http2=True,
25
  )
26
 
 
 
27
  async def aclose(self):
28
  """Closes the underlying httpx client."""
29
  if not self.client.is_closed:
30
  await self.client.aclose()
31
 
32
  # ---------- text utilities ----------
 
33
  def _balance_think_tag(self, txt: str) -> str:
34
  opens = len(re.findall(r"<think>", txt))
35
  closes = len(re.findall(r"</think>", txt))
 
63
 
64
  # ---------- upstream ----------
65
  async def _prep_upstream(self, req: ChatCompletionRequest) -> Tuple[Dict[str, Any], Dict[str, str], str]:
 
66
  ck = await cookie_manager.get_next_cookie()
67
  if not ck: raise HTTPException(503, "No available cookies")
68
 
 
92
  body, headers, ck = await self._prep_upstream(req)
93
  comp_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
94
  think_open = False
 
 
95
  phase_cur = None
96
 
97
  async with self.client.stream("POST", settings.UPSTREAM_URL, json=body, headers=headers) as resp:
 
114
  payload = line[6:]
115
  if payload == '[DONE]':
116
  if think_open:
117
+ close_think_chunk = {'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '</think>'}, 'finish_reason': None}]}
118
+ yield f"data: {json.dumps(close_think_chunk)}\n\n"
119
+
120
+ final_chunk = {'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {}, 'finish_reason': 'stop'}]}
121
+ yield f"data: {json.dumps(final_chunk)}\n\n"
122
  yield "data: [DONE]\n\n"; return
123
 
124
  try:
 
129
  dat = parsed.get("data", {})
130
  delta, new_phase = dat.get("delta_content", ""), dat.get("phase")
131
 
 
 
132
  if new_phase and new_phase != phase_cur:
 
133
  if phase_cur == "thinking" and new_phase == "answer" and think_open and settings.SHOW_THINK_TAGS:
134
+ # --- FIX START ---
135
+ # 將包含反斜線的字典先創建好,再放入 f-string
136
+ close_payload = {
137
+ 'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()),
138
+ 'model': req.model,
139
+ 'choices': [{'index': 0, 'delta': {'content': '</think>\n'}, 'finish_reason': None}]
140
+ }
141
+ yield f"data: {json.dumps(close_payload)}\n\n"
142
+ # --- FIX END ---
143
  think_open = False
144
  phase_cur = new_phase
145
 
146
+ if not delta: continue
147
 
 
148
  text_to_yield = ""
149
  if phase_cur == "thinking":
150
  if settings.SHOW_THINK_TAGS:
151
  if not think_open:
152
+ open_think_chunk = {'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '<think>'}, 'finish_reason': None}]}
153
+ yield f"data: {json.dumps(open_think_chunk)}\n\n"
154
  think_open = True
155
  text_to_yield = self._clean_thinking(delta)
156
  elif phase_cur == "answer":
157
  text_to_yield = self._clean_answer(delta)
158
 
 
159
  if text_to_yield:
160
+ out_chunk = {
161
  "id": comp_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": req.model,
162
  "choices": [{"index": 0, "delta": {"content": text_to_yield}, "finish_reason": None}],
163
  }
164
+ yield f"data: {json.dumps(out_chunk)}\n\n"
165
 
166
  except httpx.RequestError as e:
167
  if ck: await cookie_manager.mark_cookie_invalid(ck)
168
  logger.error(f"Request error: {e}")
169
  err_msg = f"Connection error: {e}"
170
+ err = {"id": f"chatcmpl-{uuid.uuid4().hex[:29]}", "choices": [{"delta": {"content": err_msg}, "finish_reason": "stop"}]}
171
  yield f"data: {json.dumps(err)}\n\n"; yield "data: [DONE]\n\n"
172
  except Exception as e:
173
  logger.exception(f"Unexpected error in stream_proxy_response")
174
+ err = {"id": f"chatcmpl-{uuid.uuid4().hex[:29]}", "choices": [{"delta": {"content": f"Internal error in stream"}, "finish_reason": "stop"}]}
175
  yield f"data: {json.dumps(err)}\n\n"; yield "data: [DONE]\n\n"
176
 
177
  # ---------- non-stream ----------
 
181
  body, headers, ck = await self._prep_upstream(req)
182
  think_buf, answer_buf = [], []
183
 
 
184
  phase_cur = None
185
 
186
  async with self.client.stream("POST", settings.UPSTREAM_URL, json=body, headers=headers) as resp:
 
203
  dat = parsed.get("data", {})
204
  delta, new_phase = dat.get("delta_content", ""), dat.get("phase")
205
 
 
 
206
  if new_phase:
207
  phase_cur = new_phase
208
 
209
  if not delta: continue
210
 
 
211
  if phase_cur == "thinking":
212
  think_buf.append(delta)
213
  elif phase_cur == "answer":
214
  answer_buf.append(delta)
215
+ else:
216
  pass
217
 
218
  raw_answer = ''.join(answer_buf)