"""
Utility functions for the application
"""
import json
import re
import time
import random
from typing import Dict, List, Optional, Any, Tuple, Generator
import requests
from fake_useragent import UserAgent
from app.core.config import settings
from app.core.token_manager import token_manager
# 全局 UserAgent 实例,避免每次调用都创建新实例
_user_agent_instance = None
def get_user_agent_instance() -> UserAgent:
"""获取或创建 UserAgent 实例(单例模式)"""
global _user_agent_instance
if _user_agent_instance is None:
_user_agent_instance = UserAgent()
return _user_agent_instance
def debug_log(message: str, *args) -> None:
"""Log debug message if debug mode is enabled"""
if settings.DEBUG_LOGGING:
if args:
print(f"[DEBUG] {message % args}")
else:
print(f"[DEBUG] {message}")
def generate_request_ids() -> Tuple[str, str]:
"""Generate unique IDs for chat and message"""
timestamp = int(time.time())
chat_id = f"{timestamp * 1000}-{timestamp}"
msg_id = str(timestamp * 1000000)
return chat_id, msg_id
def get_browser_headers(referer_chat_id: str = "") -> Dict[str, str]:
"""Get browser headers for API requests with dynamic User-Agent"""
# 获取 UserAgent 实例
ua = get_user_agent_instance()
# 随机选择一个浏览器类型,偏向使用 Chrome 和 Edge
browser_choices = ['chrome', 'chrome', 'chrome', 'edge', 'edge', 'firefox', 'safari']
browser_type = random.choice(browser_choices)
try:
# 根据浏览器类型获取 User-Agent
if browser_type == 'chrome':
user_agent = ua.chrome
elif browser_type == 'edge':
user_agent = ua.edge
elif browser_type == 'firefox':
user_agent = ua.firefox
elif browser_type == 'safari':
user_agent = ua.safari
else:
user_agent = ua.random
except:
# 如果获取失败,使用随机 User-Agent
user_agent = ua.random
# 提取浏览器版本信息
chrome_version = "139" # 默认版本
edge_version = "139"
if "Chrome/" in user_agent:
try:
chrome_version = user_agent.split("Chrome/")[1].split(".")[0]
except:
pass
if "Edg/" in user_agent:
try:
edge_version = user_agent.split("Edg/")[1].split(".")[0]
# Edge 基于 Chromium,使用 Edge 特定的 sec-ch-ua
sec_ch_ua = f'"Microsoft Edge";v="{edge_version}", "Chromium";v="{chrome_version}", "Not_A Brand";v="24"'
except:
sec_ch_ua = f'"Not_A Brand";v="8", "Chromium";v="{chrome_version}", "Google Chrome";v="{chrome_version}"'
elif "Firefox/" in user_agent:
# Firefox 不使用 sec-ch-ua
sec_ch_ua = None
else:
# Chrome 或其他基于 Chromium 的浏览器
sec_ch_ua = f'"Not_A Brand";v="8", "Chromium";v="{chrome_version}", "Google Chrome";v="{chrome_version}"'
# 构建动态 Headers
headers = {
"Content-Type": "application/json",
"Accept": "application/json, text/event-stream",
"User-Agent": user_agent,
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-US;q=0.7",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"Windows"',
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-origin",
"X-FE-Version": "prod-fe-1.0.70",
"Origin": settings.CLIENT_HEADERS["Origin"],
"Cache-Control": "no-cache",
"Pragma": "no-cache",
}
# 只有基于 Chromium 的浏览器才添加 sec-ch-ua
if sec_ch_ua:
headers["sec-ch-ua"] = sec_ch_ua
# 添加 Referer
if referer_chat_id:
headers["Referer"] = f"{settings.CLIENT_HEADERS['Origin']}/c/{referer_chat_id}"
# 调试日志
if settings.DEBUG_LOGGING:
debug_log(f"使用 User-Agent: {user_agent[:100]}...")
return headers
def get_anonymous_token() -> str:
"""Get anonymous token for authentication"""
headers = get_browser_headers()
headers.update({
"Accept": "*/*",
"Accept-Language": "zh-CN,zh;q=0.9",
"Referer": f"{settings.CLIENT_HEADERS['Origin']}/",
})
try:
response = requests.get(
f"{settings.CLIENT_HEADERS['Origin']}/api/v1/auths/",
headers=headers,
timeout=10.0
)
if response.status_code != 200:
raise Exception(f"anon token status={response.status_code}")
data = response.json()
token = data.get("token")
if not token:
raise Exception("anon token empty")
return token
except Exception as e:
debug_log(f"获取匿名token失败: {e}")
raise
def get_auth_token() -> str:
"""Get authentication token (anonymous or from token pool)"""
if settings.ANONYMOUS_MODE:
try:
token = get_anonymous_token()
debug_log(f"匿名token获取成功: {token[:10]}...")
return token
except Exception as e:
debug_log(f"匿名token获取失败,使用token池: {e}")
# Use token pool for load balancing
token = token_manager.get_next_token()
if token:
debug_log(f"从token池获取token: {token[:10]}...")
return token
else:
debug_log("token池无可用token,使用配置文件备用token")
return settings.BACKUP_TOKEN
def transform_thinking_content(content: str) -> str:
"""Transform thinking content according to configuration"""
# Remove summary tags
content = re.sub(r'(?s).*?', '', content)
# Clean up remaining tags
content = content.replace("", "").replace("", "").replace("", "")
content = content.strip()
if settings.THINKING_PROCESSING == "think":
content = re.sub(r']*>', '', content)
content = content.replace(" ", "")
elif settings.THINKING_PROCESSING == "strip":
content = re.sub(r']*>', '', content)
content = content.replace(" ", "")
# Remove line prefixes
content = content.lstrip("> ")
content = content.replace("\n> ", "\n")
return content.strip()
def call_upstream_api(
upstream_req: Any,
chat_id: str,
auth_token: str
) -> requests.Response:
"""Call upstream API with proper headers"""
headers = get_browser_headers(chat_id)
headers["Authorization"] = f"Bearer {auth_token}"
# 准备请求数据
request_data = upstream_req.model_dump(exclude_none=True)
request_json = upstream_req.model_dump_json()
debug_log(f"调用上游API: {settings.API_ENDPOINT}")
debug_log(f"请求体大小: {len(request_json)} 字符")
# 如果请求体太大,只显示部分内容
if len(request_json) > 1000:
debug_log(f"上游请求体 (截断): {request_json[:500]}...{request_json[-200:]}")
else:
debug_log(f"上游请求体: {request_json}")
# 设置代理(如果配置了)
proxies = {}
if settings.HTTP_PROXY:
proxies['http'] = settings.HTTP_PROXY
if settings.HTTPS_PROXY:
proxies['https'] = settings.HTTPS_PROXY
try:
response = requests.post(
settings.API_ENDPOINT,
json=request_data,
headers=headers,
timeout=(settings.CONNECTION_TIMEOUT, settings.REQUEST_TIMEOUT),
stream=True,
proxies=proxies if proxies else None,
verify=True,
)
debug_log(f"上游响应状态: {response.status_code}")
# 检查响应头
if settings.DEBUG_LOGGING:
content_type = response.headers.get('content-type', 'unknown')
content_length = response.headers.get('content-length', 'unknown')
debug_log(f"响应类型: {content_type}, 长度: {content_length}")
return response
except requests.exceptions.Timeout as e:
debug_log(f"请求超时: {e}")
raise Exception(f"上游API请求超时: {e}")
except requests.exceptions.ConnectionError as e:
debug_log(f"连接错误: {e}")
raise Exception(f"上游API连接失败: {e}")
except requests.exceptions.RequestException as e:
debug_log(f"请求异常: {e}")
raise Exception(f"上游API请求失败: {e}")
except Exception as e:
debug_log(f"未知错误: {e}")
raise