File size: 5,244 Bytes
d3f91ab
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
"""
httpx_engine.py — 用 curl_cffi 直连 Qwen API(Chrome TLS 指纹)
优点:TLS 指纹与真实 Chrome 一致,无编码问题,支持流式早期中止
优化:使用全局连接池,避免频繁 TLS 握手
"""

import asyncio
import json
import logging
from curl_cffi.requests import AsyncSession

log = logging.getLogger("qwen2api.httpx_engine")

BASE_URL = "https://chat.qwen.ai"

_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
    "Accept": "application/json, text/plain, */*",
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
    "Referer": "https://chat.qwen.ai/",
    "Origin": "https://chat.qwen.ai",
    "sec-ch-ua": '"Chromium";v="124", "Google Chrome";v="124", "Not-A.Brand";v="99"',
    "sec-ch-ua-mobile": "?0",
    "sec-ch-ua-platform": '"Windows"',
    "sec-fetch-dest": "empty",
    "sec-fetch-mode": "cors",
    "sec-fetch-site": "same-origin",
}

_IMPERSONATE = "chrome124"

# ✅ 全局连接池(避免频繁 TLS 握手)
_global_session: AsyncSession = None
_session_lock: asyncio.Lock = None


async def _get_global_session() -> AsyncSession:
    """获取全局 AsyncSession"""
    global _global_session, _session_lock

    if _session_lock is None:
        _session_lock = asyncio.Lock()

    if _global_session is not None:
        return _global_session

    async with _session_lock:
        if _global_session is not None:
            return _global_session

        _global_session = AsyncSession(impersonate=_IMPERSONATE, timeout=30)
        log.info("[HttpxEngine] ✅ 全局连接池已初始化")
        return _global_session


async def _close_global_session():
    """关闭全局 session"""
    global _global_session
    if _global_session:
        try:
            await _global_session.close()
            log.info("[HttpxEngine] ✅ 全局连接池已关闭")
        except Exception as e:
            log.error(f"[HttpxEngine] 关闭连接池失败: {e}")
        finally:
            _global_session = None


class HttpxEngine:
    """Direct curl_cffi engine — Chrome TLS fingerprint, same interface as BrowserEngine."""

    def __init__(self, pool_size: int = 3, base_url: str = BASE_URL):
        self.base_url = base_url
        self._started = False
        self._ready = asyncio.Event()

    async def start(self):
        # ✅ 初始化全局连接池
        await _get_global_session()
        self._started = True
        self._ready.set()
        log.info("[HttpxEngine] 已启动(curl_cffi Chrome指纹直连模式 + 全局连接池)")

    async def stop(self):
        self._started = False
        # ✅ 关闭全局连接池
        await _close_global_session()
        log.info("[HttpxEngine] 已停止")

    def _auth_headers(self, token: str, cookies: str = "") -> dict:
        headers = {**_HEADERS, "Authorization": f"Bearer {token}"}
        if cookies:
            headers["Cookie"] = cookies
        return headers

    async def api_call(self, method: str, path: str, token: str, body: dict = None, cookies: str = "") -> dict:
        # ✅ 改进:使用全局 session 而不是创建新的
        url = self.base_url + path
        headers = {**self._auth_headers(token, cookies=cookies), "Content-Type": "application/json"}
        data = json.dumps(body, ensure_ascii=False).encode() if body else None
        try:
            session = await _get_global_session()
            resp = await session.request(method, url, headers=headers, data=data)
            return {"status": resp.status_code, "body": resp.text}
        except Exception as e:
            log.error(f"[HttpxEngine] api_call error: {e}")
            return {"status": 0, "body": str(e)}

    async def fetch_chat(self, token: str, chat_id: str, payload: dict, buffered: bool = False, cookies: str = ""):
        """Stream Qwen SSE via curl_cffi with Chrome TLS fingerprint (with global connection pool)."""
        # ✅ 改进:使用全局 session 而不是创建新的
        url = self.base_url + f"/api/v2/chat/completions?chat_id={chat_id}"
        headers = {
            **self._auth_headers(token, cookies=cookies),
            "Content-Type": "application/json",
            "Accept": "text/event-stream",
        }
        body_bytes = json.dumps(payload, ensure_ascii=False).encode()

        try:
            session = await _get_global_session()
            async with session.stream("POST", url, headers=headers, data=body_bytes) as resp:
                if resp.status_code != 200:
                    body_chunks = []
                    async for chunk in resp.aiter_content():
                        body_chunks.append(chunk)
                    body_text = b"".join(body_chunks).decode(errors="replace")[:2000]
                    yield {"status": resp.status_code, "body": body_text}
                    return

                async for chunk in resp.aiter_content():
                    decoded = chunk.decode("utf-8", errors="replace")
                    yield {"status": "streamed", "chunk": decoded}

        except Exception as e:
            log.error(f"[HttpxEngine] fetch_chat error: {e}")
            yield {"status": 0, "body": str(e)}