bluewinliang committed on
Commit
6b2e3a0
·
verified ·
1 Parent(s): 5ac14df

Upload proxy_handler.py

Browse files
Files changed (1) hide show
  1. proxy_handler.py +66 -82
proxy_handler.py CHANGED
@@ -25,21 +25,17 @@ class ProxyHandler:
25
  )
26
 
27
  async def aclose(self):
28
- """Closes the underlying httpx client."""
29
  if not self.client.is_closed:
30
  await self.client.aclose()
31
 
32
- # ---------- text utilities ----------
33
  def _balance_think_tag(self, txt: str) -> str:
34
  opens = len(re.findall(r"<think>", txt))
35
  closes = len(re.findall(r"</think>", txt))
36
- if opens > closes:
37
- txt += "</think>" * (opens - closes)
38
  elif closes > opens:
39
- for _ in range(closes - opens):
40
- txt = re.sub(r"</think>(?!.*</think>)", "", txt, 1)
41
  return txt
42
-
43
  def _clean_thinking(self, s: str) -> str:
44
  if not s: return s
45
  s = re.sub(r'<details[^>]*>.*?</details>', '', s, flags=re.DOTALL)
@@ -47,11 +43,9 @@ class ProxyHandler:
47
  s = re.sub(r'<[^>]+>', '', s)
48
  s = re.sub(r'^\s*>\s*', '', s, flags=re.MULTILINE)
49
  return s.strip()
50
-
51
  def _clean_answer(self, s: str) -> str:
52
  if not s: return s
53
  return re.sub(r"<details[^>]*>.*?</details>", "", s, flags=re.DOTALL)
54
-
55
  def _serialize_msgs(self, msgs) -> list:
56
  out = []
57
  for m in msgs:
@@ -60,31 +54,15 @@ class ProxyHandler:
60
  elif isinstance(m, dict): out.append(m)
61
  else: out.append({"role": getattr(m, "role", "user"), "content": getattr(m, "content", str(m))})
62
  return out
63
-
64
- # ---------- upstream ----------
65
  async def _prep_upstream(self, req: ChatCompletionRequest) -> Tuple[Dict[str, Any], Dict[str, str], str]:
66
  ck = await cookie_manager.get_next_cookie()
67
  if not ck: raise HTTPException(503, "No available cookies")
68
-
69
  model = settings.UPSTREAM_MODEL if req.model == settings.MODEL_NAME else req.model
70
- body = {
71
- "stream": True, "model": model, "messages": self._serialize_msgs(req.messages),
72
- "background_tasks": {"title_generation": True, "tags_generation": True}, "chat_id": str(uuid.uuid4()),
73
- "features": {"image_generation": False, "code_interpreter": False, "web_search": False, "auto_web_search": False, "enable_thinking": True,},
74
- "id": str(uuid.uuid4()), "mcp_servers": ["deep-web-search"],
75
- "model_item": {"id": model, "name": "GLM-4.5", "owned_by": "openai"}, "params": {}, "tool_servers": [],
76
- "variables": {"{{USER_NAME}}": "User", "{{USER_LOCATION}}": "Unknown", "{{CURRENT_DATETIME}}": time.strftime("%Y-%m-%d %H:%M:%S"),},
77
- }
78
- headers = {
79
- "Content-Type": "application/json", "Authorization": f"Bearer {ck}",
80
- "User-Agent": ("Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"),
81
- "Accept": "application/json, text/event-stream", "Accept-Language": "zh-CN",
82
- "sec-ch-ua": '"Not)A;Brand";v="8", "Chromium";v="138", "Google Chrome";v="138"',
83
- "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": '"macOS"', "x-fe-version": "prod-fe-1.0.53",
84
- "Origin": "https://chat.z.ai", "Referer": "https://chat.z.ai/",
85
- }
86
  return body, headers, ck
87
 
 
88
  # ---------- stream ----------
89
  async def stream_proxy_response(self, req: ChatCompletionRequest) -> AsyncGenerator[str, None]:
90
  ck = None
@@ -97,73 +75,69 @@ class ProxyHandler:
97
  async with self.client.stream("POST", settings.UPSTREAM_URL, json=body, headers=headers) as resp:
98
  if resp.status_code != 200:
99
  await cookie_manager.mark_cookie_invalid(ck)
 
100
  err_body = await resp.aread()
101
  err_msg = f"Error: {resp.status_code} - {err_body.decode(errors='ignore')}"
102
- err = {
103
- "id": comp_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": req.model,
104
- "choices": [{"index": 0, "delta": {"content": err_msg}, "finish_reason": "stop"}],
105
- }
106
  yield f"data: {json.dumps(err)}\n\n"; yield "data: [DONE]\n\n"; return
107
 
108
  async for raw in resp.aiter_text():
109
- if not raw or raw.isspace(): continue
110
- for line in raw.split('\n'):
111
  line = line.strip()
112
  if not line or not line.startswith('data: '): continue
113
 
114
- payload = line[6:]
115
- if payload == '[DONE]':
116
  if think_open:
117
- close_think_chunk = {'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '</think>'}, 'finish_reason': None}]}
118
- yield f"data: {json.dumps(close_think_chunk)}\n\n"
119
-
120
- final_chunk = {'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {}, 'finish_reason': 'stop'}]}
121
- yield f"data: {json.dumps(final_chunk)}\n\n"
122
  yield "data: [DONE]\n\n"; return
123
 
124
  try:
125
- parsed = json.loads(payload)
 
126
  except json.JSONDecodeError:
127
  continue
128
-
129
- dat = parsed.get("data", {})
130
- delta, new_phase = dat.get("delta_content", ""), dat.get("phase")
131
 
 
 
 
 
 
 
 
 
132
  if new_phase and new_phase != phase_cur:
133
- if phase_cur == "thinking" and new_phase == "answer" and think_open and settings.SHOW_THINK_TAGS:
134
- # --- FIX START ---
135
- # Build the dict (which contains backslashes) first, then place it into the f-string
136
- close_payload = {
137
- 'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()),
138
- 'model': req.model,
139
- 'choices': [{'index': 0, 'delta': {'content': '</think>\n'}, 'finish_reason': None}]
140
- }
141
- yield f"data: {json.dumps(close_payload)}\n\n"
142
- # --- FIX END ---
143
- think_open = False
144
  phase_cur = new_phase
145
 
146
- if not delta: continue
147
-
148
  text_to_yield = ""
149
- if phase_cur == "thinking":
150
  if settings.SHOW_THINK_TAGS:
151
  if not think_open:
152
- open_think_chunk = {'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '<think>'}, 'finish_reason': None}]}
153
- yield f"data: {json.dumps(open_think_chunk)}\n\n"
154
  think_open = True
155
  text_to_yield = self._clean_thinking(delta)
156
- elif phase_cur == "answer":
157
- text_to_yield = self._clean_answer(delta)
158
 
 
 
 
 
159
  if text_to_yield:
160
- out_chunk = {
161
- "id": comp_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": req.model,
162
- "choices": [{"index": 0, "delta": {"content": text_to_yield}, "finish_reason": None}],
163
- }
164
- yield f"data: {json.dumps(out_chunk)}\n\n"
165
-
166
  except httpx.RequestError as e:
 
167
  if ck: await cookie_manager.mark_cookie_invalid(ck)
168
  logger.error(f"Request error: {e}")
169
  err_msg = f"Connection error: {e}"
@@ -174,13 +148,13 @@ class ProxyHandler:
174
  err = {"id": f"chatcmpl-{uuid.uuid4().hex[:29]}", "choices": [{"delta": {"content": f"Internal error in stream"}, "finish_reason": "stop"}]}
175
  yield f"data: {json.dumps(err)}\n\n"; yield "data: [DONE]\n\n"
176
 
 
177
  # ---------- non-stream ----------
178
  async def non_stream_proxy_response(self, req: ChatCompletionRequest) -> ChatCompletionResponse:
179
  ck = None
180
  try:
181
  body, headers, ck = await self._prep_upstream(req)
182
  think_buf, answer_buf = [], []
183
-
184
  phase_cur = None
185
 
186
  async with self.client.stream("POST", settings.UPSTREAM_URL, json=body, headers=headers) as resp:
@@ -190,30 +164,38 @@ class ProxyHandler:
190
  raise HTTPException(resp.status_code, f"Upstream error: {error_detail}")
191
 
192
  async for raw in resp.aiter_text():
193
- if not raw or raw.isspace(): continue
194
- for line in raw.split('\n'):
195
  line = line.strip()
196
  if not line or not line.startswith('data: '): continue
197
- payload = line[6:]
198
- if payload == '[DONE]': break
199
 
200
- try: parsed = json.loads(payload)
201
- except json.JSONDecodeError: continue
202
 
203
- dat = parsed.get("data", {})
204
- delta, new_phase = dat.get("delta_content", ""), dat.get("phase")
 
 
 
205
 
 
 
 
 
 
206
  if new_phase:
207
  phase_cur = new_phase
208
 
 
209
  if not delta: continue
210
 
211
- if phase_cur == "thinking":
212
  think_buf.append(delta)
213
- elif phase_cur == "answer":
214
  answer_buf.append(delta)
215
- else:
216
- pass
 
 
217
 
218
  raw_answer = ''.join(answer_buf)
219
  ans_text = self._clean_answer(raw_answer)
@@ -230,6 +212,7 @@ class ProxyHandler:
230
  choices=[{"index": 0, "message": {"role": "assistant", "content": final_content}, "finish_reason": "stop"}],
231
  )
232
  except httpx.RequestError as e:
 
233
  if ck: await cookie_manager.mark_cookie_invalid(ck)
234
  logger.error(f"Non-stream request error: {e}")
235
  raise HTTPException(502, f"Connection error: {e}")
@@ -237,6 +220,7 @@ class ProxyHandler:
237
  logger.exception(f"Non-stream unexpected error")
238
  raise HTTPException(500, "Internal server error")
239
 
 
240
  # ---------- FastAPI entry ----------
241
  async def handle_chat_completion(self, req: ChatCompletionRequest):
242
  stream = bool(req.stream) if req.stream is not None else settings.DEFAULT_STREAM
 
25
  )
26
 
27
  async def aclose(self):
 
28
  if not self.client.is_closed:
29
  await self.client.aclose()
30
 
31
+ # _balance_think_tag, _clean_thinking, _clean_answer, _serialize_msgs, _prep_upstream remain unchanged
32
  def _balance_think_tag(self, txt: str) -> str:
33
  opens = len(re.findall(r"<think>", txt))
34
  closes = len(re.findall(r"</think>", txt))
35
+ if opens > closes: txt += "</think>" * (opens - closes)
 
36
  elif closes > opens:
37
+ for _ in range(closes - opens): txt = re.sub(r"</think>(?!.*</think>)", "", txt, 1)
 
38
  return txt
 
39
  def _clean_thinking(self, s: str) -> str:
40
  if not s: return s
41
  s = re.sub(r'<details[^>]*>.*?</details>', '', s, flags=re.DOTALL)
 
43
  s = re.sub(r'<[^>]+>', '', s)
44
  s = re.sub(r'^\s*>\s*', '', s, flags=re.MULTILINE)
45
  return s.strip()
 
46
  def _clean_answer(self, s: str) -> str:
47
  if not s: return s
48
  return re.sub(r"<details[^>]*>.*?</details>", "", s, flags=re.DOTALL)
 
49
  def _serialize_msgs(self, msgs) -> list:
50
  out = []
51
  for m in msgs:
 
54
  elif isinstance(m, dict): out.append(m)
55
  else: out.append({"role": getattr(m, "role", "user"), "content": getattr(m, "content", str(m))})
56
  return out
 
 
57
  async def _prep_upstream(self, req: ChatCompletionRequest) -> Tuple[Dict[str, Any], Dict[str, str], str]:
58
  ck = await cookie_manager.get_next_cookie()
59
  if not ck: raise HTTPException(503, "No available cookies")
 
60
  model = settings.UPSTREAM_MODEL if req.model == settings.MODEL_NAME else req.model
61
+ body = { "stream": True, "model": model, "messages": self._serialize_msgs(req.messages), "background_tasks": {"title_generation": True, "tags_generation": True}, "chat_id": str(uuid.uuid4()), "features": {"image_generation": False, "code_interpreter": False, "web_search": False, "auto_web_search": False, "enable_thinking": True,}, "id": str(uuid.uuid4()), "mcp_servers": ["deep-web-search"], "model_item": {"id": model, "name": "GLM-4.5", "owned_by": "openai"}, "params": {}, "tool_servers": [], "variables": {"{{USER_NAME}}": "User", "{{USER_LOCATION}}": "Unknown", "{{CURRENT_DATETIME}}": time.strftime("%Y-%m-%d %H:%M:%S"),},}
62
+ headers = { "Content-Type": "application/json", "Authorization": f"Bearer {ck}", "User-Agent": ("Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"), "Accept": "application/json, text/event-stream", "Accept-Language": "zh-CN", "sec-ch-ua": '"Not)A;Brand";v="8", "Chromium";v="138", "Google Chrome";v="138"', "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": '"macOS"', "x-fe-version": "prod-fe-1.0.53", "Origin": "https://chat.z.ai", "Referer": "https://chat.z.ai/",}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
63
  return body, headers, ck
64
 
65
+
66
  # ---------- stream ----------
67
  async def stream_proxy_response(self, req: ChatCompletionRequest) -> AsyncGenerator[str, None]:
68
  ck = None
 
75
  async with self.client.stream("POST", settings.UPSTREAM_URL, json=body, headers=headers) as resp:
76
  if resp.status_code != 200:
77
  await cookie_manager.mark_cookie_invalid(ck)
78
+ # ... (error handling code remains the same)
79
  err_body = await resp.aread()
80
  err_msg = f"Error: {resp.status_code} - {err_body.decode(errors='ignore')}"
81
+ err = {"id": comp_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": req.model, "choices": [{"index": 0, "delta": {"content": err_msg}, "finish_reason": "stop"}],}
 
 
 
82
  yield f"data: {json.dumps(err)}\n\n"; yield "data: [DONE]\n\n"; return
83
 
84
  async for raw in resp.aiter_text():
85
+ for line in raw.strip().split('\n'):
 
86
  line = line.strip()
87
  if not line or not line.startswith('data: '): continue
88
 
89
+ payload_str = line[6:]
90
+ if payload_str == '[DONE]':
91
  if think_open:
92
+ close_payload = {'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '</think>'}, 'finish_reason': None}]}
93
+ yield f"data: {json.dumps(close_payload)}\n\n"
94
+ final_payload = {'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {}, 'finish_reason': 'stop'}]}
95
+ yield f"data: {json.dumps(final_payload)}\n\n"
 
96
  yield "data: [DONE]\n\n"; return
97
 
98
  try:
99
+ parsed = json.loads(payload_str)
100
+ dat = parsed.get("data", {})
101
  except json.JSONDecodeError:
102
  continue
 
 
 
103
 
104
+ # --- FIX START: REVISED LOGIC ---
105
+ delta = dat.get("delta_content", "")
106
+ new_phase = dat.get("phase")
107
+
108
+ # Step 1: Determine the effective phase for THIS chunk
109
+ effective_phase = new_phase or phase_cur
110
+
111
+ # Step 2: Handle phase transition side-effects (closing tags) BEFORE processing content
112
  if new_phase and new_phase != phase_cur:
113
+ if phase_cur == "thinking" and new_phase == "answer":
114
+ if think_open and settings.SHOW_THINK_TAGS:
115
+ close_payload = {'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '</think>\n'}, 'finish_reason': None}]}
116
+ yield f"data: {json.dumps(close_payload)}\n\n"
117
+ think_open = False
118
+ # Update phase_cur AFTER handling the transition
 
 
 
 
 
119
  phase_cur = new_phase
120
 
121
+ # Step 3: Process the delta content of THIS chunk based on its effective phase
 
122
  text_to_yield = ""
123
+ if effective_phase == "thinking":
124
  if settings.SHOW_THINK_TAGS:
125
  if not think_open:
126
+ open_payload = {'id': comp_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': req.model, 'choices': [{'index': 0, 'delta': {'content': '<think>'}, 'finish_reason': None}]}
127
+ yield f"data: {json.dumps(open_payload)}\n\n"
128
  think_open = True
129
  text_to_yield = self._clean_thinking(delta)
 
 
130
 
131
+ elif effective_phase == "answer":
132
+ text_to_yield = self._clean_answer(delta)
133
+
134
+ # Step 4: Yield the processed content if it exists
135
  if text_to_yield:
136
+ chunk_payload = {"id": comp_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": req.model, "choices": [{"index": 0, "delta": {"content": text_to_yield}, "finish_reason": None}],}
137
+ yield f"data: {json.dumps(chunk_payload)}\n\n"
138
+ # --- FIX END ---
 
 
 
139
  except httpx.RequestError as e:
140
+ # ... (error handling code remains the same)
141
  if ck: await cookie_manager.mark_cookie_invalid(ck)
142
  logger.error(f"Request error: {e}")
143
  err_msg = f"Connection error: {e}"
 
148
  err = {"id": f"chatcmpl-{uuid.uuid4().hex[:29]}", "choices": [{"delta": {"content": f"Internal error in stream"}, "finish_reason": "stop"}]}
149
  yield f"data: {json.dumps(err)}\n\n"; yield "data: [DONE]\n\n"
150
 
151
+
152
  # ---------- non-stream ----------
153
  async def non_stream_proxy_response(self, req: ChatCompletionRequest) -> ChatCompletionResponse:
154
  ck = None
155
  try:
156
  body, headers, ck = await self._prep_upstream(req)
157
  think_buf, answer_buf = [], []
 
158
  phase_cur = None
159
 
160
  async with self.client.stream("POST", settings.UPSTREAM_URL, json=body, headers=headers) as resp:
 
164
  raise HTTPException(resp.status_code, f"Upstream error: {error_detail}")
165
 
166
  async for raw in resp.aiter_text():
167
+ for line in raw.strip().split('\n'):
 
168
  line = line.strip()
169
  if not line or not line.startswith('data: '): continue
 
 
170
 
171
+ payload_str = line[6:]
172
+ if payload_str == '[DONE]': break
173
 
174
+ try:
175
+ parsed = json.loads(payload_str)
176
+ dat = parsed.get("data", {})
177
+ except json.JSONDecodeError:
178
+ continue
179
 
180
+ # --- FIX START: REVISED LOGIC ---
181
+ delta = dat.get("delta_content", "")
182
+ new_phase = dat.get("phase")
183
+
184
+ # Step 1: Update the current phase if a new one is provided
185
  if new_phase:
186
  phase_cur = new_phase
187
 
188
+ # Step 2: Append the delta to the correct buffer based on the updated phase
189
  if not delta: continue
190
 
191
+ if phase_cur == "thinking":
192
  think_buf.append(delta)
193
+ elif phase_cur == "answer":
194
  answer_buf.append(delta)
195
+ # --- FIX END ---
196
+ else: # continue for inner loop
197
+ continue
198
+ break # break for outer loop
199
 
200
  raw_answer = ''.join(answer_buf)
201
  ans_text = self._clean_answer(raw_answer)
 
212
  choices=[{"index": 0, "message": {"role": "assistant", "content": final_content}, "finish_reason": "stop"}],
213
  )
214
  except httpx.RequestError as e:
215
+ # ... (error handling code remains the same)
216
  if ck: await cookie_manager.mark_cookie_invalid(ck)
217
  logger.error(f"Non-stream request error: {e}")
218
  raise HTTPException(502, f"Connection error: {e}")
 
220
  logger.exception(f"Non-stream unexpected error")
221
  raise HTTPException(500, "Internal server error")
222
 
223
+
224
  # ---------- FastAPI entry ----------
225
  async def handle_chat_completion(self, req: ChatCompletionRequest):
226
  stream = bool(req.stream) if req.stream is not None else settings.DEFAULT_STREAM