File size: 8,677 Bytes
f4baae1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
"""
Utility functions for the application
"""

import json
import re
import time
import random
from typing import Dict, List, Optional, Any, Tuple, Generator
import requests
from fake_useragent import UserAgent

from app.core.config import settings
from app.core.token_manager import token_manager

# Module-wide UserAgent instance, kept so that a new one is not created on
# every call; it stays None until the first request for it.
_user_agent_instance = None


def get_user_agent_instance() -> UserAgent:
    """Return the shared ``UserAgent`` object, creating it on first use.

    Returns:
        The module-level ``UserAgent`` instance stored in
        ``_user_agent_instance``.
    """
    global _user_agent_instance
    if _user_agent_instance is not None:
        return _user_agent_instance
    _user_agent_instance = UserAgent()
    return _user_agent_instance


def debug_log(message: str, *args) -> None:
    """Print ``message`` with a ``[DEBUG]`` prefix when debug logging is on.

    Args:
        message: Text to print. When ``args`` are supplied it is used as a
            ``%``-style format string.
        *args: Optional values interpolated into ``message`` with ``%``.
    """
    if not settings.DEBUG_LOGGING:
        return
    # Only apply %-formatting when arguments were given, so a plain message
    # containing a literal '%' is printed unchanged.
    rendered = message % args if args else message
    print(f"[DEBUG] {rendered}")


def generate_request_ids() -> Tuple[str, str]:
    """Generate time-based IDs for a chat and a message.

    The IDs keep their historical layout but are derived from a single
    nanosecond clock reading, so the millisecond and microsecond parts carry
    real sub-second precision. Previously both IDs were built from whole
    seconds, which made every request issued within the same second share
    identical IDs.

    Returns:
        A ``(chat_id, msg_id)`` tuple where ``chat_id`` is
        ``"<epoch milliseconds>-<epoch seconds>"`` and ``msg_id`` is the
        epoch time in microseconds, both as decimal strings.
    """
    # One clock reading keeps the three derived values mutually consistent.
    now_ns = time.time_ns()
    epoch_seconds = now_ns // 1_000_000_000
    epoch_millis = now_ns // 1_000_000
    epoch_micros = now_ns // 1_000

    chat_id = f"{epoch_millis}-{epoch_seconds}"
    msg_id = str(epoch_micros)
    return chat_id, msg_id


def get_browser_headers(referer_chat_id: str = "") -> Dict[str, str]:
    """Build browser-like request headers with a randomly chosen User-Agent.

    Args:
        referer_chat_id: When non-empty, a ``Referer`` header of the form
            ``<Origin>/c/<referer_chat_id>`` is added, where ``Origin`` comes
            from ``settings.CLIENT_HEADERS``.

    Returns:
        A new header dict. ``sec-ch-ua`` is included only when the selected
        User-Agent is Chromium-based (contains ``Edg/`` or ``Chrome/``);
        Firefox, Safari and any other User-Agent get no ``sec-ch-ua``.
    """
    # Used for the Chromium version when the User-Agent does not carry one.
    default_version = "139"
    # Last resort if fake_useragent cannot supply any User-Agent at all.
    fallback_user_agent = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36"
    )

    ua = get_user_agent_instance()

    # Weighted pick: Chrome and Edge appear more often, so they are favoured.
    browser_choices = ['chrome', 'chrome', 'chrome', 'edge', 'edge', 'firefox', 'safari']
    browser_type = random.choice(browser_choices)

    try:
        user_agent = getattr(ua, browser_type)
    except Exception:
        # The specific browser lookup failed; fall back to any User-Agent,
        # and to a static string if even that is unavailable.
        try:
            user_agent = ua.random
        except Exception:
            user_agent = fallback_user_agent

    # Major versions as they appear in the User-Agent string.
    chrome_match = re.search(r"Chrome/(\d+)", user_agent)
    edge_match = re.search(r"Edg/(\d+)", user_agent)
    chrome_version = chrome_match.group(1) if chrome_match else default_version

    if edge_match:
        # Edge is Chromium-based and advertises its own brand entry.
        sec_ch_ua = (
            f'"Microsoft Edge";v="{edge_match.group(1)}", '
            f'"Chromium";v="{chrome_version}", "Not_A Brand";v="24"'
        )
    elif chrome_match:
        sec_ch_ua = (
            f'"Not_A Brand";v="8", "Chromium";v="{chrome_version}", '
            f'"Google Chrome";v="{chrome_version}"'
        )
    else:
        # Firefox, Safari, etc.: advertising a Chrome brand list here would
        # contradict the User-Agent header, so omit sec-ch-ua.
        sec_ch_ua = None

    # NOTE(review): sec-ch-ua-mobile / sec-ch-ua-platform stay hard-coded for
    # every User-Agent, as before — confirm whether they should follow the UA.
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json, text/event-stream",
        "User-Agent": user_agent,
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-US;q=0.7",
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": '"Windows"',
        "sec-fetch-dest": "empty",
        "sec-fetch-mode": "cors",
        "sec-fetch-site": "same-origin",
        "X-FE-Version": "prod-fe-1.0.70",
        "Origin": settings.CLIENT_HEADERS["Origin"],
        "Cache-Control": "no-cache",
        "Pragma": "no-cache",
    }

    if sec_ch_ua:
        headers["sec-ch-ua"] = sec_ch_ua

    if referer_chat_id:
        headers["Referer"] = f"{settings.CLIENT_HEADERS['Origin']}/c/{referer_chat_id}"

    # Guarded so the log string is not built when debug logging is off.
    if settings.DEBUG_LOGGING:
        debug_log(f"使用 User-Agent: {user_agent[:100]}...")

    return headers


def get_anonymous_token() -> str:
    """Fetch an anonymous auth token from ``<Origin>/api/v1/auths/``.

    Returns:
        The non-empty ``token`` value from the JSON response.

    Raises:
        Exception: If the response status is not 200, the token is missing
            or empty, or the request/JSON decoding fails. Every failure is
            passed to ``debug_log`` and then re-raised.
    """
    request_headers = get_browser_headers()
    origin = settings.CLIENT_HEADERS['Origin']
    # Override the defaults from get_browser_headers for this endpoint.
    request_headers["Accept"] = "*/*"
    request_headers["Accept-Language"] = "zh-CN,zh;q=0.9"
    request_headers["Referer"] = f"{origin}/"

    try:
        resp = requests.get(
            f"{origin}/api/v1/auths/",
            headers=request_headers,
            timeout=10.0,
        )

        if resp.status_code != 200:
            raise Exception(f"anon token status={resp.status_code}")

        payload = resp.json()
        anon_token = payload.get("token")
        if not anon_token:
            raise Exception("anon token empty")
    except Exception as err:
        debug_log(f"获取匿名token失败: {err}")
        raise

    return anon_token


def get_auth_token() -> str:
    """Return a token for upstream authentication.

    When ``settings.ANONYMOUS_MODE`` is set, an anonymous token is tried
    first; if that fails, the error is logged and the lookup continues with
    ``token_manager.get_next_token()``. If that yields nothing truthy,
    ``settings.BACKUP_TOKEN`` is returned.

    Returns:
        The anonymous token, a token from ``token_manager``, or the backup
        token from settings, in that order of preference.
    """
    if settings.ANONYMOUS_MODE:
        try:
            anon_token = get_anonymous_token()
            debug_log(f"匿名token获取成功: {anon_token[:10]}...")
            return anon_token
        except Exception as err:
            # Best effort: fall through to the token manager below.
            debug_log(f"匿名token获取失败,使用token池: {err}")

    pooled_token = token_manager.get_next_token()
    if not pooled_token:
        debug_log("token池无可用token,使用配置文件备用token")
        return settings.BACKUP_TOKEN

    debug_log(f"从token池获取token: {pooled_token[:10]}...")
    return pooled_token


def transform_thinking_content(content: str) -> str:
    """Clean up upstream "thinking" markup according to configuration.

    Steps, in order:
      1. Remove every ``<summary>...</summary>`` section.
      2. Remove ``</thinking>``, ``<Full>`` and ``</Full>`` tags and trim.
      3. Depending on ``settings.THINKING_PROCESSING``: ``"think"`` rewrites
         ``<details ...>``/``</details>`` to ``<span>``/``</span>``;
         ``"strip"`` removes them; any other value leaves them untouched.
      4. Remove the ``"> "`` quote prefix from the first line and from every
         following line.

    Args:
        content: Raw thinking text.

    Returns:
        The transformed text with surrounding whitespace trimmed.
    """
    # (?s) lets '.' match newlines so multi-line summaries are removed too.
    content = re.sub(r'(?s)<summary>.*?</summary>', '', content)
    for tag in ("</thinking>", "<Full>", "</Full>"):
        content = content.replace(tag, "")
    content = content.strip()

    mode = settings.THINKING_PROCESSING
    if mode == "think":
        content = re.sub(r'<details[^>]*>', '<span>', content)
        content = content.replace("</details>", "</span>")
    elif mode == "strip":
        content = re.sub(r'<details[^>]*>', '', content)
        content = content.replace("</details>", "")

    # Drop exactly one leading "> " prefix. str.lstrip("> ") must not be used
    # here: it strips any run of '>' and ' ' characters and would eat real
    # content such as a leading ">>> " prompt or a nested "> > " quote.
    if content.startswith("> "):
        content = content[2:]
    content = content.replace("\n> ", "\n")

    return content.strip()


def call_upstream_api(
    upstream_req: Any,
    chat_id: str,
    auth_token: str
) -> requests.Response:
    """POST the request to ``settings.API_ENDPOINT`` and return the response.

    Args:
        upstream_req: Request model exposing ``model_dump`` and
            ``model_dump_json`` (presumably a Pydantic v2 model — confirm
            against the caller).
        chat_id: Chat identifier used to build the ``Referer`` header.
        auth_token: Bearer token placed in the ``Authorization`` header.

    Returns:
        The ``requests.Response``, opened with ``stream=True``. The HTTP
        status is logged but not checked here.

    Raises:
        Exception: On timeout, connection failure or any other
            ``requests`` error, wrapped with a descriptive message and
            chained to the original exception. Other errors are logged and
            re-raised unchanged.
    """
    headers = get_browser_headers(chat_id)
    headers["Authorization"] = f"Bearer {auth_token}"

    # Body actually sent upstream: fields that are None are left out.
    request_data = upstream_req.model_dump(exclude_none=True)

    debug_log(f"调用上游API: {settings.API_ENDPOINT}")

    # Serialize for logging only when debug logging is on, and with the same
    # exclude_none setting so the log reflects the body that is really sent.
    if settings.DEBUG_LOGGING:
        request_json = upstream_req.model_dump_json(exclude_none=True)
        debug_log(f"请求体大小: {len(request_json)} 字符")

        # Large bodies are logged as head + tail only.
        if len(request_json) > 1000:
            debug_log(f"上游请求体 (截断): {request_json[:500]}...{request_json[-200:]}")
        else:
            debug_log(f"上游请求体: {request_json}")

    # Only configured proxies are passed along.
    proxies = {}
    if settings.HTTP_PROXY:
        proxies['http'] = settings.HTTP_PROXY
    if settings.HTTPS_PROXY:
        proxies['https'] = settings.HTTPS_PROXY

    try:
        response = requests.post(
            settings.API_ENDPOINT,
            json=request_data,
            headers=headers,
            # requests interprets a tuple as (connect timeout, read timeout).
            timeout=(settings.CONNECTION_TIMEOUT, settings.REQUEST_TIMEOUT),
            stream=True,
            proxies=proxies or None,
            verify=True,
        )

        debug_log(f"上游响应状态: {response.status_code}")

        if settings.DEBUG_LOGGING:
            content_type = response.headers.get('content-type', 'unknown')
            content_length = response.headers.get('content-length', 'unknown')
            debug_log(f"响应类型: {content_type}, 长度: {content_length}")

        return response

    # Timeout is handled before ConnectionError so that a connect timeout,
    # which is both, is reported as a timeout.
    except requests.exceptions.Timeout as e:
        debug_log(f"请求超时: {e}")
        raise Exception(f"上游API请求超时: {e}") from e
    except requests.exceptions.ConnectionError as e:
        debug_log(f"连接错误: {e}")
        raise Exception(f"上游API连接失败: {e}") from e
    except requests.exceptions.RequestException as e:
        debug_log(f"请求异常: {e}")
        raise Exception(f"上游API请求失败: {e}") from e
    except Exception as e:
        debug_log(f"未知错误: {e}")
        raise