bluewinliang commited on
Commit
b5903c1
·
verified ·
1 Parent(s): 7b42cea

Upload proxy_handler.py

Browse files
Files changed (1) hide show
  1. proxy_handler.py +85 -73
proxy_handler.py CHANGED
@@ -112,24 +112,26 @@ class ProxyHandler:
112
 
113
  # ---------- stream ----------
114
  async def stream_proxy_response(self, req: ChatCompletionRequest) -> AsyncGenerator[str, None]:
115
- client = None
116
  try:
117
- client = httpx.AsyncClient(
118
- timeout=httpx.Timeout(60.0, read=300.0),
119
- limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
120
- http2=True,
121
- )
122
  body, headers, ck = await self._prep_upstream(req)
123
  comp_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
124
- think_open, phase_cur, first_answer_chunk = False, None, True
 
125
 
126
- async with client.stream("POST", settings.UPSTREAM_URL, json=body, headers=headers) as resp:
127
  if resp.status_code != 200:
128
  await cookie_manager.mark_cookie_invalid(ck)
 
 
 
 
 
 
129
  err = {
130
  "id": comp_id, "object": "chat.completion.chunk", "created": int(time.time()),
131
  "model": req.model,
132
- "choices": [{"index": 0, "delta": {"content": f"Error: {resp.status_code}"}, "finish_reason": "stop"}],
133
  }
134
  yield f"data: {json.dumps(err)}\n\n"; yield "data: [DONE]\n\n"; return
135
 
@@ -141,13 +143,15 @@ class ProxyHandler:
141
 
142
  payload = line[6:]
143
  if payload == '[DONE]':
144
- if think_open and settings.SHOW_THINK_TAGS:
145
  close_c = {
146
  "id": comp_id, "object": "chat.completion.chunk", "created": int(time.time()),
147
  "model": req.model,
148
  "choices": [{"index": 0, "delta": {"content": "</think>"}, "finish_reason": None}],
149
  }
150
  yield f"data: {json.dumps(close_c)}\n\n"
 
 
151
  final_c = {
152
  "id": comp_id, "object": "chat.completion.chunk", "created": int(time.time()),
153
  "model": req.model, "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}],
@@ -157,57 +161,56 @@ class ProxyHandler:
157
  try:
158
  parsed = json.loads(payload)
159
  except json.JSONDecodeError:
 
160
  continue
 
161
  dat = parsed.get("data", {})
162
  delta, phase = dat.get("delta_content", ""), dat.get("phase")
163
- if not delta: continue
164
 
165
- # phase change handling
166
- if phase != phase_cur:
167
- phase_cur = phase
168
- if phase == "answer":
 
169
  if think_open and settings.SHOW_THINK_TAGS:
170
- # 關閉 think 標籤
171
- auto_close = {
172
  "id": comp_id, "object": "chat.completion.chunk", "created": int(time.time()),
173
  "model": req.model,
174
- "choices": [{"index": 0, "delta": {"content": "</think>"}, "finish_reason": None}],
175
  }
176
- yield f"data: {json.dumps(auto_close)}\n\n"
177
  think_open = False
178
- first_answer_chunk = True # 標記第一個answer chunk
179
 
180
- if phase == "thinking" and not settings.SHOW_THINK_TAGS:
181
- continue
 
 
 
 
 
 
 
 
 
 
 
 
 
182
 
183
- # 處理內容,但不要過度清理
184
- if phase == "thinking":
185
- if settings.SHOW_THINK_TAGS and not think_open:
186
- open_c = {
187
- "id": comp_id, "object": "chat.completion.chunk", "created": int(time.time()),
188
- "model": req.model,
189
- "choices": [{"index": 0, "delta": {"content": "<think>"}, "finish_reason": None}],
190
- }
191
- yield f"data: {json.dumps(open_c)}\n\n"
192
- think_open = True
193
- # 對thinking內容進行最小化清理
194
- text = self._clean_thinking(delta) if delta else ""
195
- else: # answer phase
196
- # 對answer內容進行最小化清理,主要只移除details標籤
197
- text = self._clean_answer(delta) if delta else ""
198
- # 如果是第一個answer chunk且顯示think tags,在前面加上換行
199
- if first_answer_chunk and settings.SHOW_THINK_TAGS and text.strip():
200
- text = "\n" + text
201
- first_answer_chunk = False
202
 
203
- # 只有在text有內容時才輸出
204
- if text:
 
205
  out = {
206
  "id": comp_id, "object": "chat.completion.chunk", "created": int(time.time()),
207
  "model": req.model,
208
- "choices": [{"index": 0, "delta": {"content": text}, "finish_reason": None}],
209
  }
210
  yield f"data: {json.dumps(out)}\n\n"
 
211
 
212
  except httpx.RequestError as e:
213
  logger.error(f"Request error: {e}")
@@ -218,32 +221,31 @@ class ProxyHandler:
218
  }
219
  yield f"data: {json.dumps(err)}\n\n"; yield "data: [DONE]\n\n"
220
  except Exception as e:
221
- logger.error(f"Unexpected error: {e}")
222
  err = {
223
  "id": f"chatcmpl-{uuid.uuid4().hex[:29]}", "object": "chat.completion.chunk",
224
  "created": int(time.time()), "model": req.model,
225
- "choices": [{"index": 0, "delta": {"content": f"Internal error: {e}"}, "finish_reason": "stop"}],
226
  }
227
  yield f"data: {json.dumps(err)}\n\n"; yield "data: [DONE]\n\n"
228
- finally:
229
- if client: await client.aclose()
230
 
231
  # ---------- non-stream ----------
232
  async def non_stream_proxy_response(self, req: ChatCompletionRequest) -> ChatCompletionResponse:
233
- client = None
 
 
234
  try:
235
- client = httpx.AsyncClient(
236
- timeout=httpx.Timeout(60.0, read=300.0),
237
- limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
238
- http2=True,
239
- )
240
  body, headers, ck = await self._prep_upstream(req)
241
  think_buf, answer_buf = [], []
242
-
243
- async with client.stream("POST", settings.UPSTREAM_URL, json=body, headers=headers) as resp:
 
244
  if resp.status_code != 200:
245
  await cookie_manager.mark_cookie_invalid(ck)
246
- raise HTTPException(resp.status_code, "Upstream error")
 
 
 
247
  async for raw in resp.aiter_text():
248
  if not raw or raw.isspace(): continue
249
  for line in raw.split('\n'):
@@ -251,46 +253,56 @@ class ProxyHandler:
251
  if not line or line.startswith(':') or not line.startswith('data: '): continue
252
  payload = line[6:]
253
  if payload == '[DONE]': break
254
- try: parsed = json.loads(payload)
255
- except json.JSONDecodeError: continue
 
 
256
  dat = parsed.get("data", {})
257
  delta, phase = dat.get("delta_content", ""), dat.get("phase")
 
258
  if not delta: continue
259
- # 直接保存原始內容,不在這裡進行清理
260
  if phase == "thinking":
261
  think_buf.append(delta)
262
  elif phase == "answer":
263
  answer_buf.append(delta)
 
 
 
264
 
265
  # 合併內容後再進行清理
266
  raw_answer = ''.join(answer_buf)
267
- ans_text = self._clean_answer(raw_answer) if raw_answer else ""
268
-
 
269
  if settings.SHOW_THINK_TAGS and think_buf:
270
  raw_thinking = ''.join(think_buf)
271
- think_text = self._clean_thinking(raw_thinking) if raw_thinking else ""
272
- final = f"<think>{think_text}</think>\n{ans_text}"
273
- else:
274
- final = ans_text
275
 
276
  return ChatCompletionResponse(
277
  id=f"chatcmpl-{uuid.uuid4().hex[:29]}",
278
  created=int(time.time()),
279
  model=req.model,
280
- choices=[{"index": 0, "message": {"role": "assistant", "content": final}, "finish_reason": "stop"}],
 
281
  )
282
  except httpx.RequestError as e:
283
  logger.error(f"Non-stream request error: {e}")
284
- if "ck" in locals(): await cookie_manager.mark_cookie_invalid(ck)
285
- raise HTTPException(502, "Connection error")
286
  except Exception as e:
287
- logger.error(f"Non-stream unexpected error: {e}")
288
- raise HTTPException(500, "Internal error")
289
- finally:
290
- if client: await client.aclose()
291
 
292
  # ---------- FastAPI entry ----------
293
  async def handle_chat_completion(self, req: ChatCompletionRequest):
 
 
 
294
  stream = bool(req.stream) if req.stream is not None else settings.DEFAULT_STREAM
295
  if stream:
296
  return StreamingResponse(
 
112
 
113
  # ---------- stream ----------
114
  async def stream_proxy_response(self, req: ChatCompletionRequest) -> AsyncGenerator[str, None]:
115
+ # 使用 self.client 而不是每次都創建新的 client,以利用連接池
116
  try:
 
 
 
 
 
117
  body, headers, ck = await self._prep_upstream(req)
118
  comp_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
119
+ think_open, phase_cur = False, None
120
+ # FIX: 移除了 first_answer_chunk 標誌,改用更可靠的狀態管理
121
 
122
+ async with self.client.stream("POST", settings.UPSTREAM_URL, json=body, headers=headers, timeout=self.client.timeout) as resp:
123
  if resp.status_code != 200:
124
  await cookie_manager.mark_cookie_invalid(ck)
125
+ err_msg = f"Upstream error: {resp.status_code}"
126
+ try:
127
+ err_body = await resp.aread()
128
+ err_msg += f" - {err_body.decode()}"
129
+ except Exception:
130
+ pass
131
  err = {
132
  "id": comp_id, "object": "chat.completion.chunk", "created": int(time.time()),
133
  "model": req.model,
134
+ "choices": [{"index": 0, "delta": {"content": err_msg}, "finish_reason": "stop"}],
135
  }
136
  yield f"data: {json.dumps(err)}\n\n"; yield "data: [DONE]\n\n"; return
137
 
 
143
 
144
  payload = line[6:]
145
  if payload == '[DONE]':
146
+ if think_open: # 如果 <think> 標籤仍然是打開的,在結束前關閉它
147
  close_c = {
148
  "id": comp_id, "object": "chat.completion.chunk", "created": int(time.time()),
149
  "model": req.model,
150
  "choices": [{"index": 0, "delta": {"content": "</think>"}, "finish_reason": None}],
151
  }
152
  yield f"data: {json.dumps(close_c)}\n\n"
153
+ think_open = False
154
+
155
  final_c = {
156
  "id": comp_id, "object": "chat.completion.chunk", "created": int(time.time()),
157
  "model": req.model, "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}],
 
161
  try:
162
  parsed = json.loads(payload)
163
  except json.JSONDecodeError:
164
+ logger.warning(f"Failed to decode JSON payload: {payload}")
165
  continue
166
+
167
  dat = parsed.get("data", {})
168
  delta, phase = dat.get("delta_content", ""), dat.get("phase")
 
169
 
170
+ # --- 主要修復邏輯開始 ---
171
+ # 1. 處理階段變化 (Phase Transition)
172
+ if phase and phase != phase_cur:
173
+ # 當從 'thinking' 切換到 'answer' 時,關閉 <think> 標籤
174
+ if phase_cur == "thinking" and phase == "answer":
175
  if think_open and settings.SHOW_THINK_TAGS:
176
+ close_c = {
 
177
  "id": comp_id, "object": "chat.completion.chunk", "created": int(time.time()),
178
  "model": req.model,
179
+ "choices": [{"index": 0, "delta": {"content": "</think>\n"}, "finish_reason": None}],
180
  }
181
+ yield f"data: {json.dumps(close_c)}\n\n"
182
  think_open = False
183
+ phase_cur = phase
184
 
185
+ # 2. 處理內容 (Content Processing)
186
+ # 這個邏輯塊獨立於階段變化,確保當前 chunk 的內容總是被處理
187
+ text_to_yield = ""
188
+ if phase_cur == "thinking":
189
+ if settings.SHOW_THINK_TAGS:
190
+ # 如果 think 標籤還沒打開,就打開它
191
+ if not think_open:
192
+ open_c = {
193
+ "id": comp_id, "object": "chat.completion.chunk", "created": int(time.time()),
194
+ "model": req.model,
195
+ "choices": [{"index": 0, "delta": {"content": "<think>"}, "finish_reason": None}],
196
+ }
197
+ yield f"data: {json.dumps(open_c)}\n\n"
198
+ think_open = True
199
+ text_to_yield = self._clean_thinking(delta)
200
 
201
+ elif phase_cur == "answer":
202
+ text_to_yield = self._clean_answer(delta)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
203
 
204
+ # 3. 發送內容 (Yield Content)
205
+ # 只有在 text_to_yield 有實際內容時才發送,避免發送空 chunk
206
+ if text_to_yield:
207
  out = {
208
  "id": comp_id, "object": "chat.completion.chunk", "created": int(time.time()),
209
  "model": req.model,
210
+ "choices": [{"index": 0, "delta": {"content": text_to_yield}, "finish_reason": None}],
211
  }
212
  yield f"data: {json.dumps(out)}\n\n"
213
+ # --- 主要修復邏輯結束 ---
214
 
215
  except httpx.RequestError as e:
216
  logger.error(f"Request error: {e}")
 
221
  }
222
  yield f"data: {json.dumps(err)}\n\n"; yield "data: [DONE]\n\n"
223
  except Exception as e:
224
+ logger.exception(f"Unexpected error in stream_proxy_response: {e}") # 使用 exception 打印 traceback
225
  err = {
226
  "id": f"chatcmpl-{uuid.uuid4().hex[:29]}", "object": "chat.completion.chunk",
227
  "created": int(time.time()), "model": req.model,
228
+ "choices": [{"index": 0, "delta": {"content": f"Internal server error."}, "finish_reason": "stop"}],
229
  }
230
  yield f"data: {json.dumps(err)}\n\n"; yield "data: [DONE]\n\n"
 
 
231
 
232
  # ---------- non-stream ----------
233
  async def non_stream_proxy_response(self, req: ChatCompletionRequest) -> ChatCompletionResponse:
234
+ # 非流式邏輯本身比較穩健,因為它先收集所有數據再處理。
235
+ # 此處的邏輯已是最佳實踐,無需大改。
236
+ ck = None # 在 try 外部定義,以便 finally 中可以訪問
237
  try:
 
 
 
 
 
238
  body, headers, ck = await self._prep_upstream(req)
239
  think_buf, answer_buf = [], []
240
+
241
+ # 確保使用實例的 client 和其 timeout 設置
242
+ async with self.client.stream("POST", settings.UPSTREAM_URL, json=body, headers=headers, timeout=self.client.timeout) as resp:
243
  if resp.status_code != 200:
244
  await cookie_manager.mark_cookie_invalid(ck)
245
+ error_detail = await resp.text()
246
+ logger.error(f"Upstream error {resp.status_code}: {error_detail}")
247
+ raise HTTPException(resp.status_code, f"Upstream error: {error_detail}")
248
+
249
  async for raw in resp.aiter_text():
250
  if not raw or raw.isspace(): continue
251
  for line in raw.split('\n'):
 
253
  if not line or line.startswith(':') or not line.startswith('data: '): continue
254
  payload = line[6:]
255
  if payload == '[DONE]': break
256
+ try:
257
+ parsed = json.loads(payload)
258
+ except json.JSONDecodeError:
259
+ continue
260
  dat = parsed.get("data", {})
261
  delta, phase = dat.get("delta_content", ""), dat.get("phase")
262
+
263
  if not delta: continue
264
+
265
  if phase == "thinking":
266
  think_buf.append(delta)
267
  elif phase == "answer":
268
  answer_buf.append(delta)
269
+ # 循環結束後 break
270
+ else: # for-else 語句,如果 for 循環正常結束(非 break),則執行
271
+ pass # 此處不需要做任何事
272
 
273
  # 合併內容後再進行清理
274
  raw_answer = ''.join(answer_buf)
275
+ ans_text = self._clean_answer(raw_answer)
276
+
277
+ final_content = ans_text
278
  if settings.SHOW_THINK_TAGS and think_buf:
279
  raw_thinking = ''.join(think_buf)
280
+ think_text = self._clean_thinking(raw_thinking)
281
+ # 確保 thinking 內容不為空時才添加標籤和換行
282
+ if think_text:
283
+ final_content = f"<think>{think_text}</think>\n{ans_text}"
284
 
285
  return ChatCompletionResponse(
286
  id=f"chatcmpl-{uuid.uuid4().hex[:29]}",
287
  created=int(time.time()),
288
  model=req.model,
289
+ choices=[{"index": 0, "message": {"role": "assistant", "content": final_content}, "finish_reason": "stop"}],
290
+ # 可以在此添加 usage 信息,如果 API 返回的話
291
  )
292
  except httpx.RequestError as e:
293
  logger.error(f"Non-stream request error: {e}")
294
+ if ck: await cookie_manager.mark_cookie_invalid(ck)
295
+ raise HTTPException(502, f"Connection error to upstream: {e}")
296
  except Exception as e:
297
+ logger.exception(f"Non-stream unexpected error: {e}") # 使用 exception 打印 traceback
298
+ raise HTTPException(500, "Internal server error")
299
+
 
300
 
301
  # ---------- FastAPI entry ----------
302
  async def handle_chat_completion(self, req: ChatCompletionRequest):
303
+ # 移除對 self.client 的重複創建,改用 __aenter__ 和 __aexit__
304
+ # 在 FastAPI 中,通常使用 Depends 來管理依賴項的生命週期
305
+ # 但這裡 ProxyHandler 作為一個普通類,這樣的寫法也是可以的
306
  stream = bool(req.stream) if req.stream is not None else settings.DEFAULT_STREAM
307
  if stream:
308
  return StreamingResponse(