Spaces:
Sleeping
Sleeping
File size: 5,244 Bytes
"""
httpx_engine.py — talk to the Qwen API directly through curl_cffi (Chrome TLS fingerprint).
Advantages: the TLS fingerprint matches real Chrome, no encoding issues, and streaming can be aborted early.
Optimization: uses a global connection pool to avoid repeated TLS handshakes.
"""
import asyncio
import json
import logging
from curl_cffi.requests import AsyncSession
# Module logger; the name is a fixed string rather than derived from __name__.
log = logging.getLogger("qwen2api.httpx_engine")

# Origin of the Qwen web chat. Request paths are appended to it, and the
# Referer/Origin headers below are derived from it.
BASE_URL = "https://chat.qwen.ai"

# Browser-like headers attached to every request. The User-Agent and the
# sec-ch-ua client hints all describe Chrome 124 on Windows, in line with
# the _IMPERSONATE profile defined below.
_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/124.0.0.0 Safari/537.36"
    ),
    "Accept": "application/json, text/plain, */*",
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
    "Referer": BASE_URL + "/",
    "Origin": BASE_URL,
    "sec-ch-ua": '"Chromium";v="124", "Google Chrome";v="124", "Not-A.Brand";v="99"',
    "sec-ch-ua-mobile": "?0",
    "sec-ch-ua-platform": '"Windows"',
    "sec-fetch-dest": "empty",
    "sec-fetch-mode": "cors",
    "sec-fetch-site": "same-origin",
}

# curl_cffi impersonation profile handed to AsyncSession(impersonate=...).
_IMPERSONATE = "chrome124"
# Global connection pool: one shared session reused by every request so the
# TLS handshake is not repeated per call. Stays None until
# _get_global_session() creates it; reset to None by _close_global_session().
_global_session: "AsyncSession | None" = None
# Lock serializing first-time creation of the shared session. Created lazily
# inside _get_global_session() rather than at import time.
_session_lock: "asyncio.Lock | None" = None
async def _get_global_session() -> AsyncSession:
    """Return the module-wide ``AsyncSession``, creating it on first use.

    The session is built with the ``_IMPERSONATE`` profile and a 30 second
    timeout. Once created, the same object is returned on every later call
    until ``_close_global_session()`` resets it.
    """
    global _global_session, _session_lock

    # The lock itself is created lazily, the first time this coroutine runs.
    if _session_lock is None:
        _session_lock = asyncio.Lock()

    if _global_session is None:
        async with _session_lock:
            # Re-check after acquiring the lock: another task may have
            # created the session while this one was waiting.
            if _global_session is None:
                _global_session = AsyncSession(impersonate=_IMPERSONATE, timeout=30)
                log.info("[HttpxEngine] ✅ 全局连接池已初始化")

    return _global_session
async def _close_global_session():
    """Close the shared session if one exists, then forget it.

    A failure while closing is logged and not re-raised. In every case the
    module-level reference is reset to ``None``, so the next call to
    ``_get_global_session()`` builds a fresh session.
    """
    global _global_session

    # Nothing to do when no session was ever created (or it is already closed).
    if not _global_session:
        return

    try:
        await _global_session.close()
        log.info("[HttpxEngine] ✅ 全局连接池已关闭")
    except Exception as exc:
        log.error(f"[HttpxEngine] 关闭连接池失败: {exc}")
    finally:
        # Drop the reference whether or not close() succeeded.
        _global_session = None
class HttpxEngine:
    """Direct curl_cffi engine — Chrome TLS fingerprint, same interface as BrowserEngine.

    All requests go through the module-level shared ``AsyncSession`` obtained
    from ``_get_global_session()``; the engine itself only holds the base URL
    and two lifecycle flags.

    NOTE(review): ``BrowserEngine`` is not defined in this module; the
    "same interface" statement comes from the original docstring — confirm
    against that class.
    """

    def __init__(self, pool_size: int = 3, base_url: str = BASE_URL):
        """Create an engine bound to ``base_url``.

        Args:
            pool_size: Accepted but not used by this engine. Presumably kept
                for signature compatibility with BrowserEngine — TODO confirm.
            base_url: Scheme and host that request paths are appended to.
        """
        self.base_url = base_url
        # Lifecycle markers: written by start()/stop(); not read inside this class.
        self._started = False
        self._ready = asyncio.Event()

    async def start(self):
        """Initialize the shared session and mark the engine as started and ready."""
        await _get_global_session()
        self._started = True
        self._ready.set()
        log.info("[HttpxEngine] 已启动(curl_cffi Chrome指纹直连模式 + 全局连接池)")

    async def stop(self):
        """Mark the engine as stopped and close the shared session.

        The session is module-global, so this closes it for every engine
        instance in the process, not only this one.
        """
        self._started = False
        await _close_global_session()
        log.info("[HttpxEngine] 已停止")

    def _auth_headers(self, token: str, cookies: str = "") -> dict:
        """Return a copy of the base headers plus bearer auth and optional cookies.

        Args:
            token: Value placed after ``Bearer `` in the Authorization header.
            cookies: Raw Cookie header value; the header is omitted when empty.
        """
        headers = {**_HEADERS, "Authorization": f"Bearer {token}"}
        if cookies:
            headers["Cookie"] = cookies
        return headers

    async def api_call(self, method: str, path: str, token: str, body: "dict | None" = None, cookies: str = "") -> dict:
        """Perform one non-streaming JSON request through the shared session.

        Args:
            method: HTTP method passed to ``session.request``.
            path: Path appended verbatim to ``self.base_url``.
            token: Bearer token for the Authorization header.
            body: JSON-serializable payload. A falsy value (``None`` or an
                empty dict) sends the request without a body.
            cookies: Optional raw Cookie header value.

        Returns:
            ``{"status": <HTTP status code>, "body": <response text>}`` on
            success, or ``{"status": 0, "body": <error message>}`` if the
            request raised. Exceptions are logged and never propagated.
        """
        url = self.base_url + path
        headers = {**self._auth_headers(token, cookies=cookies), "Content-Type": "application/json"}
        # ensure_ascii=False keeps non-ASCII text as raw UTF-8 in the payload.
        data = json.dumps(body, ensure_ascii=False).encode() if body else None
        try:
            session = await _get_global_session()
            resp = await session.request(method, url, headers=headers, data=data)
            return {"status": resp.status_code, "body": resp.text}
        except Exception as e:
            log.error(f"[HttpxEngine] api_call error: {e}")
            return {"status": 0, "body": str(e)}

    async def fetch_chat(self, token: str, chat_id: str, payload: dict, buffered: bool = False, cookies: str = ""):
        """Stream Qwen SSE via curl_cffi with Chrome TLS fingerprint (with global connection pool).

        Async generator. POSTs ``payload`` to the chat-completions endpoint
        and yields dicts:

        * ``{"status": "streamed", "chunk": <text>}`` for each piece of
          decoded response text when the server answers 200;
        * ``{"status": <code>, "body": <text>}`` exactly once for a non-200
          answer, with the body truncated to 2000 characters;
        * ``{"status": 0, "body": <error message>}`` if the request raised.

        Args:
            token: Bearer token for the Authorization header.
            chat_id: Inserted verbatim into the ``chat_id`` query parameter.
            payload: JSON-serializable request body.
            buffered: Accepted but not used by this engine.
            cookies: Optional raw Cookie header value.
        """
        # Function-scope import: codecs is not imported at the top of the file.
        import codecs

        url = self.base_url + f"/api/v2/chat/completions?chat_id={chat_id}"
        headers = {
            **self._auth_headers(token, cookies=cookies),
            "Content-Type": "application/json",
            "Accept": "text/event-stream",
        }
        body_bytes = json.dumps(payload, ensure_ascii=False).encode()
        try:
            session = await _get_global_session()
            async with session.stream("POST", url, headers=headers, data=body_bytes) as resp:
                if resp.status_code != 200:
                    # Error responses are read fully and reported as one item.
                    body_chunks = [chunk async for chunk in resp.aiter_content()]
                    body_text = b"".join(body_chunks).decode(errors="replace")[:2000]
                    yield {"status": resp.status_code, "body": body_text}
                    return

                # Network chunks can end in the middle of a multi-byte UTF-8
                # character. A stateful decoder holds the incomplete bytes
                # until the next chunk arrives instead of turning both halves
                # into U+FFFD, which per-chunk bytes.decode() would do.
                decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
                async for chunk in resp.aiter_content():
                    decoded = decoder.decode(chunk)
                    # A chunk holding only part of a character decodes to "";
                    # there is nothing to forward yet.
                    if decoded:
                        yield {"status": "streamed", "chunk": decoded}

                # Flush whatever is still buffered: a stream that ends on a
                # truncated character yields the replacement character here.
                tail = decoder.decode(b"", final=True)
                if tail:
                    yield {"status": "streamed", "chunk": tail}
        except Exception as e:
            log.error(f"[HttpxEngine] fetch_chat error: {e}")
            yield {"status": 0, "body": str(e)}
|