bluewinliang committed on
Commit
c24f65c
·
verified ·
1 Parent(s): d64c814

Upload proxy_handler.py

Browse files
Files changed (1) hide show
  1. proxy_handler.py +344 -40
proxy_handler.py CHANGED
@@ -24,7 +24,12 @@ logger = logging.getLogger(__name__)
24
 
25
  class ProxyHandler:
26
  def __init__(self):
27
- self.client = httpx.AsyncClient(timeout=60.0)
 
 
 
 
 
28
 
29
  async def __aenter__(self):
30
  return self
@@ -161,22 +166,34 @@ class ProxyHandler:
161
  }
162
 
163
  try:
164
- response = await self.client.post(
165
- settings.UPSTREAM_URL, json=request_data, headers=headers
166
- )
167
-
168
- if response.status_code == 401:
169
- await cookie_manager.mark_cookie_failed(cookie)
170
- raise HTTPException(status_code=401, detail="Invalid authentication")
171
-
172
- if response.status_code != 200:
173
- raise HTTPException(
174
- status_code=response.status_code,
175
- detail=f"Upstream error: {response.text}",
176
- )
177
-
178
- await cookie_manager.mark_cookie_success(cookie)
179
- return {"response": response, "cookie": cookie}
 
 
 
 
 
 
 
 
 
 
 
 
180
 
181
  except httpx.RequestError as e:
182
  logger.error(f"Request error: {e}")
@@ -191,16 +208,21 @@ class ProxyHandler:
191
  async def process_streaming_response(
192
  self, response: httpx.Response
193
  ) -> AsyncGenerator[Dict[str, Any], None]:
194
- """Process streaming response from Z.AI"""
195
  buffer = ""
196
 
197
- async for chunk in response.aiter_text():
 
 
 
 
198
  buffer += chunk
199
- lines = buffer.split("\n")
200
- buffer = lines[-1] # Keep incomplete line in buffer
201
 
202
- for line in lines[:-1]:
 
 
203
  line = line.strip()
 
204
  if not line.startswith("data: "):
205
  continue
206
 
@@ -223,34 +245,30 @@ class ProxyHandler:
223
  status_code=400, detail=f"Upstream error: {error_detail}"
224
  )
225
 
226
- # Transform the response
227
  if parsed.get("data"):
228
- # Remove unwanted fields
229
  parsed["data"].pop("edit_index", None)
230
  parsed["data"].pop("edit_content", None)
231
 
232
- # Note: We don't transform delta_content here because <think> tags
233
- # might span multiple chunks. We'll transform the final aggregated content.
234
-
235
  yield parsed
236
 
237
- except json.JSONDecodeError:
 
238
  continue # Skip non-JSON lines
239
 
240
  async def handle_chat_completion(self, request: ChatCompletionRequest):
241
  """Handle chat completion request"""
242
- proxy_result = await self.proxy_request(request)
243
- response = proxy_result["response"]
244
-
245
  # Determine final streaming mode
246
  is_streaming = (
247
  request.stream if request.stream is not None else settings.DEFAULT_STREAM
248
  )
249
 
250
  if is_streaming:
251
- # For streaming responses, SHOW_THINK_TAGS setting is ignored
252
  return StreamingResponse(
253
- self.stream_response(response, request.model),
254
  media_type="text/event-stream",
255
  headers={
256
  "Cache-Control": "no-cache",
@@ -258,13 +276,125 @@ class ProxyHandler:
258
  },
259
  )
260
  else:
261
- # For non-streaming responses, SHOW_THINK_TAGS setting applies
262
- return await self.non_stream_response(response, request.model)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
263
 
264
- """Generate streaming response"""
265
- async for parsed in self.process_streaming_response(response):
266
- yield f"data: {json.dumps(parsed)}\n\n"
267
- yield "data: [DONE]\n\n"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
268
 
269
  async def non_stream_response(
270
  self, response: httpx.Response, model: str
@@ -318,4 +448,178 @@ class ProxyHandler:
318
  "finish_reason": "stop",
319
  }
320
  ],
321
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
24
 
25
  class ProxyHandler:
26
  def __init__(self):
27
+ # Configure httpx client for streaming support
28
+ self.client = httpx.AsyncClient(
29
+ timeout=httpx.Timeout(60.0, read=300.0), # Longer read timeout for streaming
30
+ limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
31
+ http2=True # Enable HTTP/2 for better streaming performance
32
+ )
33
 
34
  async def __aenter__(self):
35
  return self
 
166
  }
167
 
168
  try:
169
+ # Use client.stream() for TRUE streaming response - this is the key fix!
170
+ async with self.client.stream(
171
+ "POST",
172
+ settings.UPSTREAM_URL,
173
+ json=request_data,
174
+ headers=headers,
175
+ timeout=httpx.Timeout(60.0, read=300.0)
176
+ ) as response:
177
+
178
+ if response.status_code == 401:
179
+ await cookie_manager.mark_cookie_failed(cookie)
180
+ raise HTTPException(status_code=401, detail="Invalid authentication")
181
+
182
+ if response.status_code != 200:
183
+ # For streaming, we need to read the error response properly
184
+ try:
185
+ error_text = await response.aread()
186
+ error_detail = error_text.decode('utf-8')
187
+ except:
188
+ error_detail = f"HTTP {response.status_code}"
189
+
190
+ raise HTTPException(
191
+ status_code=response.status_code,
192
+ detail=f"Upstream error: {error_detail}",
193
+ )
194
+
195
+ await cookie_manager.mark_cookie_success(cookie)
196
+ return {"response": response, "cookie": cookie}
197
 
198
  except httpx.RequestError as e:
199
  logger.error(f"Request error: {e}")
 
208
  async def process_streaming_response(
209
  self, response: httpx.Response
210
  ) -> AsyncGenerator[Dict[str, Any], None]:
211
+ """Process streaming response from Z.AI - TRUE real-time processing"""
212
  buffer = ""
213
 
214
+ # Use aiter_text with small chunk size for real-time processing
215
+ async for chunk in response.aiter_text(chunk_size=1024): # Small chunks for responsiveness
216
+ if not chunk:
217
+ continue
218
+
219
  buffer += chunk
 
 
220
 
221
+ # Process complete lines immediately
222
+ while "\n" in buffer:
223
+ line, buffer = buffer.split("\n", 1)
224
  line = line.strip()
225
+
226
  if not line.startswith("data: "):
227
  continue
228
 
 
245
  status_code=400, detail=f"Upstream error: {error_detail}"
246
  )
247
 
248
+ # Clean up response data
249
  if parsed.get("data"):
250
+ # Remove unwanted fields for cleaner processing
251
  parsed["data"].pop("edit_index", None)
252
  parsed["data"].pop("edit_content", None)
253
 
254
+ # Yield immediately for real-time streaming
 
 
255
  yield parsed
256
 
257
+ except json.JSONDecodeError as e:
258
+ logger.debug(f"JSON decode error (skipping): {e}")
259
  continue # Skip non-JSON lines
260
 
261
  async def handle_chat_completion(self, request: ChatCompletionRequest):
262
  """Handle chat completion request"""
 
 
 
263
  # Determine final streaming mode
264
  is_streaming = (
265
  request.stream if request.stream is not None else settings.DEFAULT_STREAM
266
  )
267
 
268
  if is_streaming:
269
+ # For streaming responses, use direct streaming proxy
270
  return StreamingResponse(
271
+ self.stream_proxy_response(request),
272
  media_type="text/event-stream",
273
  headers={
274
  "Cache-Control": "no-cache",
 
276
  },
277
  )
278
  else:
279
+ # For non-streaming responses, collect all streaming data first
280
+ chunks = []
281
+ async for chunk_data in self.stream_proxy_response(request):
282
+ if chunk_data.startswith("data: ") and not chunk_data.startswith("data: [DONE]"):
283
+ try:
284
+ chunk_json = json.loads(chunk_data[6:])
285
+ if chunk_json.get("choices", [{}])[0].get("delta", {}).get("content"):
286
+ chunks.append(chunk_json["choices"][0]["delta"]["content"])
287
+ except:
288
+ continue
289
+
290
+ # Combine all content
291
+ full_content = "".join(chunks)
292
+
293
+ # Return as non-streaming response
294
+ import time
295
+ import uuid
296
+ return ChatCompletionResponse(
297
+ id=f"chatcmpl-{uuid.uuid4().hex[:29]}",
298
+ created=int(time.time()),
299
+ model=request.model,
300
+ choices=[
301
+ {
302
+ "index": 0,
303
+ "message": {"role": "assistant", "content": full_content},
304
+ "finish_reason": "stop",
305
+ }
306
+ ],
307
+ )
308
 
309
+ async def stream_response(self, response: httpx.Response, model: str) -> AsyncGenerator[str, None]:
310
+ """Generate TRUE streaming response in OpenAI format - real-time processing"""
311
+ import uuid
312
+ import time
313
+
314
+ # Generate a unique completion ID
315
+ completion_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
316
+ current_phase = None
317
+
318
+ try:
319
+ # Real-time streaming: process each chunk immediately as it arrives
320
+ async for parsed in self.process_streaming_response(response):
321
+ try:
322
+ data = parsed.get("data", {})
323
+ delta_content = data.get("delta_content", "")
324
+ phase = data.get("phase", "")
325
+
326
+ # Track phase changes
327
+ if phase != current_phase:
328
+ current_phase = phase
329
+ logger.debug(f"Phase changed to: {phase}")
330
+
331
+ # Apply filtering based on SHOW_THINK_TAGS and phase
332
+ should_send_content = True
333
+
334
+ if not settings.SHOW_THINK_TAGS and phase == "thinking":
335
+ # Skip thinking content when SHOW_THINK_TAGS=false
336
+ should_send_content = False
337
+ logger.debug(f"Skipping thinking content (SHOW_THINK_TAGS=false)")
338
+
339
+ # Process and send content immediately if we should
340
+ if delta_content and should_send_content:
341
+ # Minimal transformation for real-time streaming
342
+ transformed_delta = delta_content
343
+
344
+ if settings.SHOW_THINK_TAGS:
345
+ # Simple tag replacement for streaming
346
+ transformed_delta = re.sub(r'<details[^>]*>', '<think>', transformed_delta)
347
+ transformed_delta = transformed_delta.replace('</details>', '</think>')
348
+ # Note: Skip complex regex for streaming performance
349
+
350
+ # Create and send OpenAI-compatible chunk immediately
351
+ openai_chunk = {
352
+ "id": completion_id,
353
+ "object": "chat.completion.chunk",
354
+ "created": int(time.time()),
355
+ "model": model,
356
+ "choices": [{
357
+ "index": 0,
358
+ "delta": {
359
+ "content": transformed_delta
360
+ },
361
+ "finish_reason": None
362
+ }]
363
+ }
364
+
365
+ # Yield immediately for real-time streaming
366
+ yield f"data: {json.dumps(openai_chunk)}\n\n"
367
+
368
+ except Exception as e:
369
+ logger.error(f"Error processing streaming chunk: {e}")
370
+ continue
371
+
372
+ # Send final completion chunk
373
+ final_chunk = {
374
+ "id": completion_id,
375
+ "object": "chat.completion.chunk",
376
+ "created": int(time.time()),
377
+ "model": model,
378
+ "choices": [{
379
+ "index": 0,
380
+ "delta": {},
381
+ "finish_reason": "stop"
382
+ }]
383
+ }
384
+
385
+ yield f"data: {json.dumps(final_chunk)}\n\n"
386
+ yield "data: [DONE]\n\n"
387
+
388
+ except Exception as e:
389
+ logger.error(f"Streaming error: {e}")
390
+ # Send error in OpenAI format
391
+ error_chunk = {
392
+ "error": {
393
+ "message": str(e),
394
+ "type": "server_error"
395
+ }
396
+ }
397
+ yield f"data: {json.dumps(error_chunk)}\n\n"
398
 
399
  async def non_stream_response(
400
  self, response: httpx.Response, model: str
 
448
  "finish_reason": "stop",
449
  }
450
  ],
451
+ )
452
+
453
+ async def stream_proxy_response(self, request: ChatCompletionRequest) -> AsyncGenerator[str, None]:
454
+ """TRUE streaming proxy - direct pass-through with minimal processing"""
455
+ import uuid
456
+ import time
457
+
458
+ # Get cookie
459
+ cookie = await cookie_manager.get_next_cookie()
460
+ if not cookie:
461
+ raise HTTPException(status_code=503, detail="No valid authentication available")
462
+
463
+ # Prepare request data
464
+ request_data = request.model_dump(exclude_none=True)
465
+ target_model = "0727-360B-API" # Map GLM-4.5 to Z.AI model
466
+
467
+ # Build Z.AI request format
468
+ request_data = {
469
+ "stream": True, # Always request streaming from Z.AI
470
+ "model": target_model,
471
+ "messages": request_data["messages"],
472
+ "background_tasks": {
473
+ "title_generation": True,
474
+ "tags_generation": True
475
+ },
476
+ "chat_id": str(uuid.uuid4()),
477
+ "features": {
478
+ "image_generation": False,
479
+ "code_interpreter": False,
480
+ "web_search": False,
481
+ "auto_web_search": False
482
+ },
483
+ "id": str(uuid.uuid4()),
484
+ "mcp_servers": ["deep-web-search"],
485
+ "model_item": {
486
+ "id": target_model,
487
+ "name": "GLM-4.5",
488
+ "owned_by": "openai"
489
+ },
490
+ "params": {},
491
+ "tool_servers": [],
492
+ "variables": {
493
+ "{{USER_NAME}}": "User",
494
+ "{{USER_LOCATION}}": "Unknown",
495
+ "{{CURRENT_DATETIME}}": "2025-08-04 16:46:56"
496
+ }
497
+ }
498
+
499
+ headers = {
500
+ "Content-Type": "application/json",
501
+ "Authorization": f"Bearer {cookie}",
502
+ "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36",
503
+ "Accept": "application/json, text/event-stream",
504
+ "Accept-Language": "zh-CN",
505
+ "sec-ch-ua": '"Not)A;Brand";v="8", "Chromium";v="138", "Google Chrome";v="138"',
506
+ "sec-ch-ua-mobile": "?0",
507
+ "sec-ch-ua-platform": '"macOS"',
508
+ "x-fe-version": "prod-fe-1.0.53",
509
+ "Origin": "https://chat.z.ai",
510
+ "Referer": "https://chat.z.ai/c/069723d5-060b-404f-992c-4705f1554c4c",
511
+ }
512
+
513
+ completion_id = f"chatcmpl-{uuid.uuid4().hex[:29]}"
514
+ current_phase = None
515
+
516
+ try:
517
+ # Create a new client for this streaming request to avoid conflicts
518
+ async with httpx.AsyncClient(
519
+ timeout=httpx.Timeout(60.0, read=300.0),
520
+ limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
521
+ http2=True
522
+ ) as stream_client:
523
+ async with stream_client.stream(
524
+ "POST",
525
+ settings.UPSTREAM_URL,
526
+ json=request_data,
527
+ headers=headers
528
+ ) as response:
529
+
530
+ if response.status_code == 401:
531
+ await cookie_manager.mark_cookie_failed(cookie)
532
+ raise HTTPException(status_code=401, detail="Invalid authentication")
533
+
534
+ if response.status_code != 200:
535
+ await cookie_manager.mark_cookie_failed(cookie)
536
+ raise HTTPException(status_code=response.status_code, detail="Upstream error")
537
+
538
+ await cookie_manager.mark_cookie_success(cookie)
539
+
540
+ # Process streaming response in real-time
541
+ buffer = ""
542
+ async for chunk in response.aiter_text(chunk_size=1024):
543
+ if not chunk:
544
+ continue
545
+
546
+ buffer += chunk
547
+
548
+ # Process complete lines immediately
549
+ while "\n" in buffer:
550
+ line, buffer = buffer.split("\n", 1)
551
+ line = line.strip()
552
+
553
+ if not line.startswith("data: "):
554
+ continue
555
+
556
+ payload = line[6:].strip()
557
+ if payload == "[DONE]":
558
+ # Send final chunk and done
559
+ final_chunk = {
560
+ "id": completion_id,
561
+ "object": "chat.completion.chunk",
562
+ "created": int(time.time()),
563
+ "model": request.model,
564
+ "choices": [{
565
+ "index": 0,
566
+ "delta": {},
567
+ "finish_reason": "stop"
568
+ }]
569
+ }
570
+ yield f"data: {json.dumps(final_chunk)}\n\n"
571
+ yield "data: [DONE]\n\n"
572
+ return
573
+
574
+ try:
575
+ parsed = json.loads(payload)
576
+ data = parsed.get("data", {})
577
+ delta_content = data.get("delta_content", "")
578
+ phase = data.get("phase", "")
579
+
580
+ # Track phase changes
581
+ if phase != current_phase:
582
+ current_phase = phase
583
+ logger.debug(f"Phase changed to: {phase}")
584
+
585
+ # Apply filtering based on SHOW_THINK_TAGS and phase
586
+ should_send_content = True
587
+
588
+ if not settings.SHOW_THINK_TAGS and phase == "thinking":
589
+ should_send_content = False
590
+
591
+ # Process and send content immediately if we should
592
+ if delta_content and should_send_content:
593
+ # Minimal transformation for real-time streaming
594
+ transformed_delta = delta_content
595
+
596
+ if settings.SHOW_THINK_TAGS:
597
+ # Simple tag replacement for streaming
598
+ transformed_delta = re.sub(r'<details[^>]*>', '<think>', transformed_delta)
599
+ transformed_delta = transformed_delta.replace('</details>', '</think>')
600
+
601
+ # Create and send OpenAI-compatible chunk immediately
602
+ openai_chunk = {
603
+ "id": completion_id,
604
+ "object": "chat.completion.chunk",
605
+ "created": int(time.time()),
606
+ "model": request.model,
607
+ "choices": [{
608
+ "index": 0,
609
+ "delta": {
610
+ "content": transformed_delta
611
+ },
612
+ "finish_reason": None
613
+ }]
614
+ }
615
+
616
+ # Yield immediately for real-time streaming
617
+ yield f"data: {json.dumps(openai_chunk)}\n\n"
618
+
619
+ except json.JSONDecodeError:
620
+ continue # Skip non-JSON lines
621
+
622
+ except httpx.RequestError as e:
623
+ logger.error(f"Streaming request error: {e}")
624
+ await cookie_manager.mark_cookie_failed(cookie)
625
+ raise HTTPException(status_code=503, detail=f"Upstream service unavailable: {str(e)}")