bluewinliang commited on
Commit
d64c814
·
verified ·
1 Parent(s): 57d07dc

Upload proxy_handler.py

Browse files
Files changed (1) hide show
  1. proxy_handler.py +34 -121
proxy_handler.py CHANGED
@@ -6,7 +6,6 @@ import json
6
  import logging
7
  import re
8
  import time
9
- import uuid
10
  from typing import AsyncGenerator, Dict, Any, Optional
11
  import httpx
12
  from fastapi import HTTPException
@@ -39,8 +38,6 @@ class ProxyHandler:
39
  return content
40
 
41
  logger.debug(f"SHOW_THINK_TAGS setting: {settings.SHOW_THINK_TAGS}")
42
- logger.debug(f"Original content length: {len(content)}")
43
- logger.debug(f"Original content preview: {content[:200]}...")
44
 
45
  # Optionally remove thinking content based on configuration
46
  if not settings.SHOW_THINK_TAGS:
@@ -92,8 +89,6 @@ class ProxyHandler:
92
  else:
93
  content += "</think>"
94
 
95
- logger.debug(f"Final transformed content length: {len(content)}")
96
- logger.debug(f"Final transformed content preview: {content[:200]}...")
97
  return content.strip()
98
 
99
  async def proxy_request(self, request: ChatCompletionRequest) -> Dict[str, Any]:
@@ -114,10 +109,17 @@ class ProxyHandler:
114
  request.stream if request.stream is not None else settings.DEFAULT_STREAM
115
  )
116
 
 
 
 
 
117
  # Prepare request data
118
  request_data = request.model_dump(exclude_none=True)
 
119
 
120
  # Build request data based on actual Z.AI format from zai-messages.md
 
 
121
  request_data = {
122
  "stream": True, # Always request streaming from Z.AI for processing
123
  "model": target_model,
@@ -208,7 +210,6 @@ class ProxyHandler:
208
 
209
  try:
210
  parsed = json.loads(payload)
211
- logger.debug(f"Parsed chunk: {parsed}")
212
 
213
  # Check for errors first
214
  if parsed.get("error") or (parsed.get("data", {}).get("error")):
@@ -228,107 +229,16 @@ class ProxyHandler:
228
  parsed["data"].pop("edit_index", None)
229
  parsed["data"].pop("edit_content", None)
230
 
 
 
 
231
  yield parsed
232
 
233
- except json.JSONDecodeError as e:
234
- logger.debug(f"Failed to parse JSON: {line}, error: {e}")
235
  continue # Skip non-JSON lines
236
 
237
- async def stream_response(
238
- self, response: httpx.Response, model: str
239
- ) -> AsyncGenerator[str, None]:
240
- """Generate OpenAI-compatible streaming response"""
241
- try:
242
- chunk_id = f"chatcmpl-{uuid.uuid4()}"
243
-
244
- async for parsed in self.process_streaming_response(response):
245
- # 取得增量內容
246
- delta_content = parsed.get("data", {}).get("delta_content", "")
247
- phase = parsed.get("data", {}).get("phase", "")
248
-
249
- logger.debug(f"Processing chunk - phase: {phase}, content: {delta_content[:50]}...")
250
-
251
- # 如果沒有內容就跳過
252
- if not delta_content:
253
- continue
254
-
255
- # 根據 SHOW_THINK_TAGS 設定決定是否輸出
256
- should_output = True
257
-
258
- if not settings.SHOW_THINK_TAGS:
259
- # 如果不顯示思考標籤,只輸出答案階段的內容
260
- if phase != "answer":
261
- logger.debug(f"Skipping non-answer phase: {phase}")
262
- should_output = False
263
- else:
264
- # 如果顯示思考標籤,對思考內容進行轉換
265
- if phase == "thinking":
266
- # 將 <details> 轉換為 <think>
267
- delta_content = delta_content.replace("<details", "<think")
268
- delta_content = delta_content.replace("</details>", "</think>")
269
- # 移除 <summary> 標籤
270
- delta_content = re.sub(r"<summary>.*?</summary>", "", delta_content, flags=re.DOTALL)
271
-
272
- if should_output and delta_content:
273
- # 建立 OpenAI 格式的 chunk
274
- chunk = {
275
- "id": chunk_id,
276
- "object": "chat.completion.chunk",
277
- "created": int(time.time()),
278
- "model": model,
279
- "choices": [
280
- {
281
- "index": 0,
282
- "delta": {"content": delta_content},
283
- "finish_reason": None,
284
- }
285
- ],
286
- }
287
-
288
- chunk_json = json.dumps(chunk, ensure_ascii=False)
289
- logger.debug(f"Yielding chunk: {chunk_json}")
290
- yield f"data: {chunk_json}\n\n"
291
-
292
- # 發送完成標記
293
- final_chunk = {
294
- "id": chunk_id,
295
- "object": "chat.completion.chunk",
296
- "created": int(time.time()),
297
- "model": model,
298
- "choices": [
299
- {
300
- "index": 0,
301
- "delta": {},
302
- "finish_reason": "stop",
303
- }
304
- ],
305
- }
306
- yield f"data: {json.dumps(final_chunk, ensure_ascii=False)}\n\n"
307
- yield "data: [DONE]\n\n"
308
-
309
- except Exception as e:
310
- logger.error(f"Error in stream_response: {e}")
311
- # 發送錯誤訊息
312
- error_chunk = {
313
- "id": f"chatcmpl-{uuid.uuid4()}",
314
- "object": "chat.completion.chunk",
315
- "created": int(time.time()),
316
- "model": model,
317
- "choices": [
318
- {
319
- "index": 0,
320
- "delta": {"content": f"Error: {str(e)}"},
321
- "finish_reason": "stop",
322
- }
323
- ],
324
- }
325
- yield f"data: {json.dumps(error_chunk, ensure_ascii=False)}\n\n"
326
- yield "data: [DONE]\n\n"
327
-
328
  async def handle_chat_completion(self, request: ChatCompletionRequest):
329
  """Handle chat completion request"""
330
- logger.info(f"Handling chat completion request - stream: {request.stream}, SHOW_THINK_TAGS: {settings.SHOW_THINK_TAGS}")
331
-
332
  proxy_result = await self.proxy_request(request)
333
  response = proxy_result["response"]
334
 
@@ -337,23 +247,25 @@ class ProxyHandler:
337
  request.stream if request.stream is not None else settings.DEFAULT_STREAM
338
  )
339
 
340
- logger.info(f"Final streaming mode: {is_streaming}")
341
-
342
  if is_streaming:
343
- # For streaming responses
344
  return StreamingResponse(
345
  self.stream_response(response, request.model),
346
  media_type="text/event-stream",
347
  headers={
348
  "Cache-Control": "no-cache",
349
  "Connection": "keep-alive",
350
- "X-Accel-Buffering": "no", # 對 nginx 有用
351
  },
352
  )
353
  else:
354
  # For non-streaming responses, SHOW_THINK_TAGS setting applies
355
  return await self.non_stream_response(response, request.model)
356
 
 
 
 
 
 
357
  async def non_stream_response(
358
  self, response: httpx.Response, model: str
359
  ) -> ChatCompletionResponse:
@@ -361,7 +273,7 @@ class ProxyHandler:
361
  chunks = []
362
  async for parsed in self.process_streaming_response(response):
363
  chunks.append(parsed)
364
- logger.debug(f"Received chunk: {parsed}")
365
 
366
  if not chunks:
367
  raise HTTPException(status_code=500, detail="No response from upstream")
@@ -370,28 +282,29 @@ class ProxyHandler:
370
  logger.debug(f"First chunk structure: {chunks[0] if chunks else 'None'}")
371
 
372
  # Aggregate content based on SHOW_THINK_TAGS setting
373
- full_content = ""
374
-
375
- for chunk in chunks:
376
- delta_content = chunk.get("data", {}).get("delta_content", "")
377
- phase = chunk.get("data", {}).get("phase", "")
378
-
379
- if settings.SHOW_THINK_TAGS:
380
- # Include all content
381
- full_content += delta_content
382
- else:
383
- # Only include answer phase content
384
- if phase == "answer":
385
- full_content += delta_content
386
 
387
  logger.info(f"Aggregated content length: {len(full_content)}")
388
- logger.debug(f"Full aggregated content preview: {full_content[:200]}...")
 
 
389
 
390
  # Apply content transformation (including think tag filtering)
391
  transformed_content = self.transform_content(full_content)
392
 
393
  logger.info(f"Transformed content length: {len(transformed_content)}")
394
- logger.debug(f"Transformed content preview: {transformed_content[:200]}...")
395
 
396
  # Create OpenAI-compatible response
397
  return ChatCompletionResponse(
 
6
  import logging
7
  import re
8
  import time
 
9
  from typing import AsyncGenerator, Dict, Any, Optional
10
  import httpx
11
  from fastapi import HTTPException
 
38
  return content
39
 
40
  logger.debug(f"SHOW_THINK_TAGS setting: {settings.SHOW_THINK_TAGS}")
 
 
41
 
42
  # Optionally remove thinking content based on configuration
43
  if not settings.SHOW_THINK_TAGS:
 
89
  else:
90
  content += "</think>"
91
 
 
 
92
  return content.strip()
93
 
94
  async def proxy_request(self, request: ChatCompletionRequest) -> Dict[str, Any]:
 
109
  request.stream if request.stream is not None else settings.DEFAULT_STREAM
110
  )
111
 
112
+ # Validate parameter compatibility
113
+ if is_streaming and not settings.SHOW_THINK_TAGS:
114
+ logger.warning("SHOW_THINK_TAGS=false is ignored for streaming responses")
115
+
116
  # Prepare request data
117
  request_data = request.model_dump(exclude_none=True)
118
+ request_data["model"] = target_model
119
 
120
  # Build request data based on actual Z.AI format from zai-messages.md
121
+ import uuid
122
+
123
  request_data = {
124
  "stream": True, # Always request streaming from Z.AI for processing
125
  "model": target_model,
 
210
 
211
  try:
212
  parsed = json.loads(payload)
 
213
 
214
  # Check for errors first
215
  if parsed.get("error") or (parsed.get("data", {}).get("error")):
 
229
  parsed["data"].pop("edit_index", None)
230
  parsed["data"].pop("edit_content", None)
231
 
232
+ # Note: We don't transform delta_content here because <think> tags
233
+ # might span multiple chunks. We'll transform the final aggregated content.
234
+
235
  yield parsed
236
 
237
+ except json.JSONDecodeError:
 
238
  continue # Skip non-JSON lines
239
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
240
  async def handle_chat_completion(self, request: ChatCompletionRequest):
241
  """Handle chat completion request"""
 
 
242
  proxy_result = await self.proxy_request(request)
243
  response = proxy_result["response"]
244
 
 
247
  request.stream if request.stream is not None else settings.DEFAULT_STREAM
248
  )
249
 
 
 
250
  if is_streaming:
251
+ # For streaming responses, SHOW_THINK_TAGS setting is ignored
252
  return StreamingResponse(
253
  self.stream_response(response, request.model),
254
  media_type="text/event-stream",
255
  headers={
256
  "Cache-Control": "no-cache",
257
  "Connection": "keep-alive",
 
258
  },
259
  )
260
  else:
261
  # For non-streaming responses, SHOW_THINK_TAGS setting applies
262
  return await self.non_stream_response(response, request.model)
263
 
264
+ """Generate streaming response"""
265
+ async for parsed in self.process_streaming_response(response):
266
+ yield f"data: {json.dumps(parsed)}\n\n"
267
+ yield "data: [DONE]\n\n"
268
+
269
  async def non_stream_response(
270
  self, response: httpx.Response, model: str
271
  ) -> ChatCompletionResponse:
 
273
  chunks = []
274
  async for parsed in self.process_streaming_response(response):
275
  chunks.append(parsed)
276
+ logger.debug(f"Received chunk: {parsed}") # Debug log
277
 
278
  if not chunks:
279
  raise HTTPException(status_code=500, detail="No response from upstream")
 
282
  logger.debug(f"First chunk structure: {chunks[0] if chunks else 'None'}")
283
 
284
  # Aggregate content based on SHOW_THINK_TAGS setting
285
+ if settings.SHOW_THINK_TAGS:
286
+ # Include all content
287
+ full_content = "".join(
288
+ chunk.get("data", {}).get("delta_content", "") for chunk in chunks
289
+ )
290
+ else:
291
+ # Only include answer phase content
292
+ full_content = "".join(
293
+ chunk.get("data", {}).get("delta_content", "")
294
+ for chunk in chunks
295
+ if chunk.get("data", {}).get("phase") == "answer"
296
+ )
 
297
 
298
  logger.info(f"Aggregated content length: {len(full_content)}")
299
+ logger.debug(
300
+ f"Full aggregated content: {full_content}"
301
+ ) # Show full content for debugging
302
 
303
  # Apply content transformation (including think tag filtering)
304
  transformed_content = self.transform_content(full_content)
305
 
306
  logger.info(f"Transformed content length: {len(transformed_content)}")
307
+ logger.debug(f"Transformed content: {transformed_content[:200]}...")
308
 
309
  # Create OpenAI-compatible response
310
  return ChatCompletionResponse(