bluewinliang committed on
Commit
1f7adcb
·
verified ·
1 Parent(s): 6b2e3a0

Upload proxy_handler.py

Browse files
Files changed (1) hide show
  1. proxy_handler.py +77 -101
proxy_handler.py CHANGED
@@ -8,7 +8,6 @@ import httpx
8
  from fastapi import HTTPException
9
  from fastapi.responses import StreamingResponse
10
 
11
- # 確保這些導入與您的項目結構匹配
12
  from config import settings
13
  from cookie_manager import cookie_manager
14
  from models import ChatCompletionRequest, ChatCompletionResponse
@@ -28,24 +27,25 @@ class ProxyHandler:
28
  if not self.client.is_closed:
29
  await self.client.aclose()
30
 
31
- # _balance_think_tag, _clean_thinking, _clean_answer, _serialize_msgs, _prep_upstream 保持不變
32
- def _balance_think_tag(self, txt: str) -> str:
33
- opens = len(re.findall(r"<think>", txt))
34
- closes = len(re.findall(r"</think>", txt))
35
- if opens > closes: txt += "</think>" * (opens - closes)
36
- elif closes > opens:
37
- for _ in range(closes - opens): txt = re.sub(r"</think>(?!.*</think>)", "", txt, 1)
38
- return txt
39
  def _clean_thinking(self, s: str) -> str:
40
- if not s: return s
 
 
41
  s = re.sub(r'<details[^>]*>.*?</details>', '', s, flags=re.DOTALL)
42
  s = re.sub(r'<summary[^>]*>.*?</summary>', '', s, flags=re.DOTALL)
 
43
  s = re.sub(r'<[^>]+>', '', s)
 
44
  s = re.sub(r'^\s*>\s*', '', s, flags=re.MULTILINE)
45
- return s.strip()
 
46
  def _clean_answer(self, s: str) -> str:
47
- if not s: return s
 
48
  return re.sub(r"<details[^>]*>.*?</details>", "", s, flags=re.DOTALL)
 
 
49
  def _serialize_msgs(self, msgs) -> list:
50
  out = []
51
  for m in msgs:
@@ -62,8 +62,7 @@ class ProxyHandler:
62
  headers = { "Content-Type": "application/json", "Authorization": f"Bearer {ck}", "User-Agent": ("Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"), "Accept": "application/json, text/event-stream", "Accept-Language": "zh-CN", "sec-ch-ua": '"Not)A;Brand";v="8", "Chromium";v="138", "Google Chrome";v="138"', "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": '"macOS"', "x-fe-version": "prod-fe-1.0.53", "Origin": "https://chat.z.ai", "Referer": "https://chat.z.ai/",}
63
  return body, headers, ck
64
 
65
-
66
- # ---------- stream ----------
67
  async def stream_proxy_response(self, req: ChatCompletionRequest) -> AsyncGenerator[str, None]:
68
  ck = None
69
  try:
@@ -75,9 +74,7 @@ class ProxyHandler:
75
  async with self.client.stream("POST", settings.UPSTREAM_URL, json=body, headers=headers) as resp:
76
  if resp.status_code != 200:
77
  await cookie_manager.mark_cookie_invalid(ck)
78
- # ... (error handling code remains the same)
79
- err_body = await resp.aread()
80
- err_msg = f"Error: {resp.status_code} - {err_body.decode(errors='ignore')}"
81
  err = {"id": comp_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": req.model, "choices": [{"index": 0, "delta": {"content": err_msg}, "finish_reason": "stop"}],}
82
  yield f"data: {json.dumps(err)}\n\n"; yield "data: [DONE]\n\n"; return
83
 
@@ -85,149 +82,128 @@ class ProxyHandler:
85
  for line in raw.strip().split('\n'):
86
  line = line.strip()
87
  if not line or not line.startswith('data: '): continue
88
-
89
  payload_str = line[6:]
90
  if payload_str == '[DONE]':
91
  if think_open:
92
- close_payload = {'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '</think>'}, 'finish_reason': None}]}
93
- yield f"data: {json.dumps(close_payload)}\n\n"
94
- final_payload = {'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {}, 'finish_reason': 'stop'}]}
95
- yield f"data: {json.dumps(final_payload)}\n\n"
96
- yield "data: [DONE]\n\n"; return
97
-
98
  try:
99
- parsed = json.loads(payload_str)
100
- dat = parsed.get("data", {})
101
- except json.JSONDecodeError:
102
  continue
103
-
104
- # --- FIX START: REVISED LOGIC ---
105
  delta = dat.get("delta_content", "")
106
  new_phase = dat.get("phase")
107
 
108
- # Step 1: Determine the effective phase for THIS chunk
109
- effective_phase = new_phase or phase_cur
110
-
111
- # Step 2: Handle phase transition side-effects (closing tags) BEFORE processing content
112
- if new_phase and new_phase != phase_cur:
113
- if phase_cur == "thinking" and new_phase == "answer":
114
- if think_open and settings.SHOW_THINK_TAGS:
115
- close_payload = {'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '</think>\n'}, 'finish_reason': None}]}
116
- yield f"data: {json.dumps(close_payload)}\n\n"
117
- think_open = False
118
- # Update phase_cur AFTER handling the transition
119
  phase_cur = new_phase
120
 
121
- # Step 3: Process the delta content of THIS chunk based on its effective phase
 
 
 
122
  text_to_yield = ""
123
- if effective_phase == "thinking":
124
- if settings.SHOW_THINK_TAGS:
125
  if not think_open:
126
  open_payload = {'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '<think>'}, 'finish_reason': None}]}
127
  yield f"data: {json.dumps(open_payload)}\n\n"
128
  think_open = True
129
  text_to_yield = self._clean_thinking(delta)
130
-
131
- elif effective_phase == "answer":
132
  text_to_yield = self._clean_answer(delta)
133
-
134
- # Step 4: Yield the processed content if it exists
135
  if text_to_yield:
136
- chunk_payload = {"id": comp_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": req.model, "choices": [{"index": 0, "delta": {"content": text_to_yield}, "finish_reason": None}],}
137
- yield f"data: {json.dumps(chunk_payload)}\n\n"
138
- # --- FIX END ---
139
  except httpx.RequestError as e:
140
- # ... (error handling code remains the same)
141
  if ck: await cookie_manager.mark_cookie_invalid(ck)
142
- logger.error(f"Request error: {e}")
143
- err_msg = f"Connection error: {e}"
144
  err = {"id": f"chatcmpl-{uuid.uuid4().hex[:29]}", "choices": [{"delta": {"content": err_msg}, "finish_reason": "stop"}]}
145
  yield f"data: {json.dumps(err)}\n\n"; yield "data: [DONE]\n\n"
146
- except Exception as e:
147
- logger.exception(f"Unexpected error in stream_proxy_response")
148
- err = {"id": f"chatcmpl-{uuid.uuid4().hex[:29]}", "choices": [{"delta": {"content": f"Internal error in stream"}, "finish_reason": "stop"}]}
149
  yield f"data: {json.dumps(err)}\n\n"; yield "data: [DONE]\n\n"
150
 
151
-
152
- # ---------- non-stream ----------
153
  async def non_stream_proxy_response(self, req: ChatCompletionRequest) -> ChatCompletionResponse:
154
  ck = None
155
  try:
156
  body, headers, ck = await self._prep_upstream(req)
157
- think_buf, answer_buf = [], []
 
158
  phase_cur = None
159
 
160
  async with self.client.stream("POST", settings.UPSTREAM_URL, json=body, headers=headers) as resp:
161
  if resp.status_code != 200:
162
- await cookie_manager.mark_cookie_invalid(ck)
163
- error_detail = await resp.text()
164
  raise HTTPException(resp.status_code, f"Upstream error: {error_detail}")
165
 
166
  async for raw in resp.aiter_text():
167
  for line in raw.strip().split('\n'):
168
  line = line.strip()
169
  if not line or not line.startswith('data: '): continue
170
-
171
  payload_str = line[6:]
172
  if payload_str == '[DONE]': break
173
-
174
  try:
175
- parsed = json.loads(payload_str)
176
- dat = parsed.get("data", {})
177
- except json.JSONDecodeError:
178
- continue
179
 
180
- # --- FIX START: REVISED LOGIC ---
181
- delta = dat.get("delta_content", "")
182
  new_phase = dat.get("phase")
183
-
184
- # Step 1: Update the current phase if a new one is provided
185
  if new_phase:
186
  phase_cur = new_phase
187
 
188
- # Step 2: Append the delta to the correct buffer based on the updated phase
189
- if not delta: continue
190
-
191
- if phase_cur == "thinking":
192
- think_buf.append(delta)
193
- elif phase_cur == "answer":
194
- answer_buf.append(delta)
195
- # --- FIX END ---
196
- else: # continue for inner loop
197
- continue
198
- break # break for outer loop
199
 
200
- raw_answer = ''.join(answer_buf)
201
- ans_text = self._clean_answer(raw_answer)
202
-
 
 
 
 
 
 
 
203
  final_content = ans_text
 
204
  if settings.SHOW_THINK_TAGS and think_buf:
205
- raw_thinking = ''.join(think_buf)
206
- think_text = self._clean_thinking(raw_thinking)
207
  if think_text:
208
- final_content = f"<think>{think_text}</think>\n{ans_text}"
 
 
209
 
210
  return ChatCompletionResponse(
211
  id=f"chatcmpl-{uuid.uuid4().hex[:29]}", created=int(time.time()), model=req.model,
212
  choices=[{"index": 0, "message": {"role": "assistant", "content": final_content}, "finish_reason": "stop"}],
213
  )
214
  except httpx.RequestError as e:
215
- # ... (error handling code remains the same)
216
  if ck: await cookie_manager.mark_cookie_invalid(ck)
217
- logger.error(f"Non-stream request error: {e}")
218
- raise HTTPException(502, f"Connection error: {e}")
219
- except Exception as e:
220
- logger.exception(f"Non-stream unexpected error")
221
- raise HTTPException(500, "Internal server error")
222
-
223
 
224
- # ---------- FastAPI entry ----------
225
  async def handle_chat_completion(self, req: ChatCompletionRequest):
226
  stream = bool(req.stream) if req.stream is not None else settings.DEFAULT_STREAM
227
  if stream:
228
- return StreamingResponse(
229
- self.stream_proxy_response(req),
230
- media_type="text/event-stream",
231
- headers={"Cache-Control": "no-cache", "Connection": "keep-alive"},
232
- )
233
  return await self.non_stream_proxy_response(req)
 
8
  from fastapi import HTTPException
9
  from fastapi.responses import StreamingResponse
10
 
 
11
  from config import settings
12
  from cookie_manager import cookie_manager
13
  from models import ChatCompletionRequest, ChatCompletionResponse
 
27
  if not self.client.is_closed:
28
  await self.client.aclose()
29
 
30
+ # ---------- text utilities (REVISED) ----------
 
 
 
 
 
 
 
31
  def _clean_thinking(self, s: str) -> str:
32
+ # 只做最核心的清理,暫時移除 .strip() 以觀察影響
33
+ if not s: return ""
34
+ # 移除可能包含敏感信息的 details/summary 塊
35
  s = re.sub(r'<details[^>]*>.*?</details>', '', s, flags=re.DOTALL)
36
  s = re.sub(r'<summary[^>]*>.*?</summary>', '', s, flags=re.DOTALL)
37
+ # 移除其他HTML標籤,保留內容
38
  s = re.sub(r'<[^>]+>', '', s)
39
+ # 移除行首的markdown引用符號
40
  s = re.sub(r'^\s*>\s*', '', s, flags=re.MULTILINE)
41
+ return s # FIX: Removed .strip()
42
+
43
  def _clean_answer(self, s: str) -> str:
44
+ # 只做最核心的清理,即移除 details 塊,保留所有原始間距
45
+ if not s: return ""
46
  return re.sub(r"<details[^>]*>.*?</details>", "", s, flags=re.DOTALL)
47
+
48
+ # _serialize_msgs and _prep_upstream remain the same
49
  def _serialize_msgs(self, msgs) -> list:
50
  out = []
51
  for m in msgs:
 
62
  headers = { "Content-Type": "application/json", "Authorization": f"Bearer {ck}", "User-Agent": ("Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"), "Accept": "application/json, text/event-stream", "Accept-Language": "zh-CN", "sec-ch-ua": '"Not)A;Brand";v="8", "Chromium";v="138", "Google Chrome";v="138"', "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": '"macOS"', "x-fe-version": "prod-fe-1.0.53", "Origin": "https://chat.z.ai", "Referer": "https://chat.z.ai/",}
63
  return body, headers, ck
64
 
65
+ # ---------- stream (REVISED) ----------
 
66
  async def stream_proxy_response(self, req: ChatCompletionRequest) -> AsyncGenerator[str, None]:
67
  ck = None
68
  try:
 
74
  async with self.client.stream("POST", settings.UPSTREAM_URL, json=body, headers=headers) as resp:
75
  if resp.status_code != 200:
76
  await cookie_manager.mark_cookie_invalid(ck)
77
+ err_body = await resp.aread(); err_msg = f"Error: {resp.status_code} - {err_body.decode(errors='ignore')}"
 
 
78
  err = {"id": comp_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": req.model, "choices": [{"index": 0, "delta": {"content": err_msg}, "finish_reason": "stop"}],}
79
  yield f"data: {json.dumps(err)}\n\n"; yield "data: [DONE]\n\n"; return
80
 
 
82
  for line in raw.strip().split('\n'):
83
  line = line.strip()
84
  if not line or not line.startswith('data: '): continue
 
85
  payload_str = line[6:]
86
  if payload_str == '[DONE]':
87
  if think_open:
88
+ payload = {'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '</think>'}, 'finish_reason': None}]}
89
+ yield f"data: {json.dumps(payload)}\n\n"
90
+ payload = {'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {}, 'finish_reason': 'stop'}]}
91
+ yield f"data: {json.dumps(payload)}\n\n"; yield "data: [DONE]\n\n"; return
 
 
92
  try:
93
+ dat = json.loads(payload_str).get("data", {})
94
+ except (json.JSONDecodeError, AttributeError):
 
95
  continue
96
+
97
+ # --- Final Revised Logic ---
98
  delta = dat.get("delta_content", "")
99
  new_phase = dat.get("phase")
100
 
101
+ # 1. Detect phase transition
102
+ is_transition = new_phase and new_phase != phase_cur
103
+ if is_transition:
104
+ # Close the <think> tag if we are moving from thinking to answer
105
+ if phase_cur == "thinking" and new_phase == "answer" and think_open and settings.SHOW_THINK_TAGS:
106
+ close_payload = {'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '</think>'}, 'finish_reason': None}]}
107
+ yield f"data: {json.dumps(close_payload)}\n\n" # FIX: No longer adding '\n' here
108
+ think_open = False
 
 
 
109
  phase_cur = new_phase
110
 
111
+ # 2. Determine the phase for the current delta
112
+ current_content_phase = phase_cur or new_phase
113
+
114
+ # 3. Process and yield content based on its phase
115
  text_to_yield = ""
116
+ if current_content_phase == "thinking":
117
+ if delta and settings.SHOW_THINK_TAGS:
118
  if not think_open:
119
  open_payload = {'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '<think>'}, 'finish_reason': None}]}
120
  yield f"data: {json.dumps(open_payload)}\n\n"
121
  think_open = True
122
  text_to_yield = self._clean_thinking(delta)
123
+ elif current_content_phase == "answer":
 
124
  text_to_yield = self._clean_answer(delta)
125
+
 
126
  if text_to_yield:
127
+ content_payload = {"id": comp_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": req.model, "choices": [{"index": 0, "delta": {"content": text_to_yield}, "finish_reason": None}],}
128
+ yield f"data: {json.dumps(content_payload)}\n\n"
 
129
  except httpx.RequestError as e:
 
130
  if ck: await cookie_manager.mark_cookie_invalid(ck)
131
+ logger.error(f"Request error: {e}"); err_msg = f"Connection error: {e}"
 
132
  err = {"id": f"chatcmpl-{uuid.uuid4().hex[:29]}", "choices": [{"delta": {"content": err_msg}, "finish_reason": "stop"}]}
133
  yield f"data: {json.dumps(err)}\n\n"; yield "data: [DONE]\n\n"
134
+ except Exception:
135
+ logger.exception("Unexpected error in stream_proxy_response")
136
+ err = {"id": f"chatcmpl-{uuid.uuid4().hex[:29]}", "choices": [{"delta": {"content": "Internal error in stream"}, "finish_reason": "stop"}]}
137
  yield f"data: {json.dumps(err)}\n\n"; yield "data: [DONE]\n\n"
138
 
139
+ # ---------- non-stream (REVISED) ----------
 
140
  async def non_stream_proxy_response(self, req: ChatCompletionRequest) -> ChatCompletionResponse:
141
  ck = None
142
  try:
143
  body, headers, ck = await self._prep_upstream(req)
144
+ # Use a list of tuples to store (phase, content) to preserve order and context
145
+ full_content = []
146
  phase_cur = None
147
 
148
  async with self.client.stream("POST", settings.UPSTREAM_URL, json=body, headers=headers) as resp:
149
  if resp.status_code != 200:
150
+ await cookie_manager.mark_cookie_invalid(ck); error_detail = await resp.text()
 
151
  raise HTTPException(resp.status_code, f"Upstream error: {error_detail}")
152
 
153
  async for raw in resp.aiter_text():
154
  for line in raw.strip().split('\n'):
155
  line = line.strip()
156
  if not line or not line.startswith('data: '): continue
 
157
  payload_str = line[6:]
158
  if payload_str == '[DONE]': break
 
159
  try:
160
+ dat = json.loads(payload_str).get("data", {})
161
+ except (json.JSONDecodeError, AttributeError): continue
 
 
162
 
163
+ delta = dat.get("delta_content")
 
164
  new_phase = dat.get("phase")
165
+
 
166
  if new_phase:
167
  phase_cur = new_phase
168
 
169
+ if delta and phase_cur:
170
+ full_content.append((phase_cur, delta))
171
+ else: continue
172
+ break
 
 
 
 
 
 
 
173
 
174
+ # Post-processing after collecting all chunks
175
+ think_buf = []
176
+ answer_buf = []
177
+ for phase, content in full_content:
178
+ if phase == "thinking":
179
+ think_buf.append(self._clean_thinking(content))
180
+ elif phase == "answer":
181
+ answer_buf.append(self._clean_answer(content))
182
+
183
+ ans_text = ''.join(answer_buf)
184
  final_content = ans_text
185
+
186
  if settings.SHOW_THINK_TAGS and think_buf:
187
+ think_text = ''.join(think_buf).strip() # .strip() here is safe
 
188
  if think_text:
189
+ # Manually add a newline if the answer doesn't start with one
190
+ newline = "\n" if ans_text and not ans_text.startswith(('\n', '\r')) else ""
191
+ final_content = f"<think>{think_text}</think>{newline}{ans_text}"
192
 
193
  return ChatCompletionResponse(
194
  id=f"chatcmpl-{uuid.uuid4().hex[:29]}", created=int(time.time()), model=req.model,
195
  choices=[{"index": 0, "message": {"role": "assistant", "content": final_content}, "finish_reason": "stop"}],
196
  )
197
  except httpx.RequestError as e:
 
198
  if ck: await cookie_manager.mark_cookie_invalid(ck)
199
+ logger.error(f"Non-stream request error: {e}"); raise HTTPException(502, f"Connection error: {e}")
200
+ except Exception:
201
+ logger.exception("Non-stream unexpected error"); raise HTTPException(500, "Internal server error")
 
 
 
202
 
203
+ # FastAPI entry point remains the same
204
  async def handle_chat_completion(self, req: ChatCompletionRequest):
205
  stream = bool(req.stream) if req.stream is not None else settings.DEFAULT_STREAM
206
  if stream:
207
+ return StreamingResponse(self.stream_proxy_response(req), media_type="text/event-stream",
208
+ headers={"Cache-Control": "no-cache", "Connection": "keep-alive"})
 
 
 
209
  return await self.non_stream_proxy_response(req)