|
|
""" |
|
|
Utility functions for the application |
|
|
""" |
|
|
|
|
|
import json |
|
|
import re |
|
|
import time |
|
|
import random |
|
|
from typing import Dict, List, Optional, Any, Tuple, Generator |
|
|
import requests |
|
|
from fake_useragent import UserAgent |
|
|
|
|
|
from app.core.config import settings |
|
|
from app.core.token_manager import token_manager |
|
|
|
|
|
|
|
|
# Module-wide cache for the UserAgent generator; filled in on first use.
_user_agent_instance = None


def get_user_agent_instance() -> UserAgent:
    """Return the shared ``UserAgent`` instance, creating it on first call.

    The instance is stored in the module-level ``_user_agent_instance`` so
    that every later call returns the same object.
    """
    global _user_agent_instance
    if _user_agent_instance is not None:
        return _user_agent_instance

    _user_agent_instance = UserAgent()
    return _user_agent_instance
|
|
|
|
|
|
|
|
def debug_log(message: str, *args) -> None:
    """Print a ``[DEBUG]``-prefixed message when debug logging is enabled.

    Args:
        message: Text to print. When ``args`` are supplied it is treated as a
            printf-style template and interpolated with ``message % args``.
        *args: Optional values for the ``%`` placeholders in ``message``.

    Nothing is printed unless ``settings.DEBUG_LOGGING`` is truthy.
    """
    if not settings.DEBUG_LOGGING:
        return

    if args:
        try:
            message = message % args
        except (TypeError, ValueError):
            # A template/argument mismatch must not crash the caller merely
            # because debug output is switched on; show the raw parts instead.
            message = f"{message} {args!r}"

    print(f"[DEBUG] {message}")
|
|
|
|
|
|
|
|
def generate_request_ids() -> Tuple[str, str]:
    """Generate the chat ID and message ID for a new request.

    Returns:
        A ``(chat_id, msg_id)`` tuple. ``chat_id`` has the form
        ``"<epoch milliseconds>-<epoch seconds>"`` and ``msg_id`` is the epoch
        time in microseconds; both are decimal strings.
    """
    # Derive every component from a single nanosecond reading. Using whole
    # seconds only (as before) made all IDs generated within the same second
    # identical; keeping the sub-second digits preserves the ID format while
    # making collisions far less likely.
    now_ns = time.time_ns()
    seconds = now_ns // 1_000_000_000
    millis = now_ns // 1_000_000
    micros = now_ns // 1_000

    chat_id = f"{millis}-{seconds}"
    msg_id = str(micros)
    return chat_id, msg_id
|
|
|
|
|
|
|
|
# Weighted pool for choosing a browser family: the more often a name appears,
# the more likely it is picked (chrome 3/7, edge 2/7, firefox 1/7, safari 1/7).
_BROWSER_CHOICES = ('chrome', 'chrome', 'chrome', 'edge', 'edge', 'firefox', 'safari')

# Major version reported in sec-ch-ua when the User-Agent does not carry one.
_DEFAULT_CHROMIUM_VERSION = "139"


def _pick_user_agent() -> str:
    """Return a User-Agent string for a randomly chosen browser family."""
    ua = get_user_agent_instance()
    browser_type = random.choice(_BROWSER_CHOICES)
    try:
        # Each pool entry is also the name of the UserAgent attribute that
        # yields a User-Agent for that browser family.
        return getattr(ua, browser_type)
    except Exception:
        # Best-effort fallback to any browser. ``Exception`` (not a bare
        # ``except``) so KeyboardInterrupt/SystemExit still propagate.
        return ua.random


def _major_version(user_agent: str, marker: str) -> str:
    """Return the text between ``marker`` and the next ``.`` in ``user_agent``.

    The caller must have checked that ``marker`` occurs in ``user_agent``.
    """
    return user_agent.split(marker)[1].split(".")[0]


def _build_sec_ch_ua(user_agent: str) -> Optional[str]:
    """Build the ``sec-ch-ua`` value matching ``user_agent``.

    Returns ``None`` for Firefox User-Agents (no ``sec-ch-ua`` header is sent
    for them), an Edge-branded value when the string contains ``Edg/``, and a
    Chrome-branded value for everything else.
    """
    chrome_version = _DEFAULT_CHROMIUM_VERSION
    if "Chrome/" in user_agent:
        chrome_version = _major_version(user_agent, "Chrome/")

    if "Edg/" in user_agent:
        edge_version = _major_version(user_agent, "Edg/")
        return f'"Microsoft Edge";v="{edge_version}", "Chromium";v="{chrome_version}", "Not_A Brand";v="24"'

    if "Firefox/" in user_agent:
        return None

    # NOTE(review): every remaining User-Agent, Safari included, receives
    # Chrome-branded hints here - confirm this is what upstream expects.
    return f'"Not_A Brand";v="8", "Chromium";v="{chrome_version}", "Google Chrome";v="{chrome_version}"'


def get_browser_headers(referer_chat_id: str = "") -> Dict[str, str]:
    """Build browser-like request headers with a randomly chosen User-Agent.

    Args:
        referer_chat_id: When non-empty, a ``Referer`` header pointing at
            ``<Origin>/c/<referer_chat_id>`` is added.

    Returns:
        A new header dict. ``sec-ch-ua`` is present only when a value could be
        derived for the chosen User-Agent (it is omitted for Firefox).
    """
    user_agent = _pick_user_agent()
    sec_ch_ua = _build_sec_ch_ua(user_agent)
    origin = settings.CLIENT_HEADERS["Origin"]

    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json, text/event-stream",
        "User-Agent": user_agent,
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-US;q=0.7",
        "sec-ch-ua-mobile": "?0",
        # NOTE(review): the platform hint is fixed to Windows regardless of
        # which User-Agent was picked.
        "sec-ch-ua-platform": '"Windows"',
        "sec-fetch-dest": "empty",
        "sec-fetch-mode": "cors",
        "sec-fetch-site": "same-origin",
        "X-FE-Version": "prod-fe-1.0.70",
        "Origin": origin,
        "Cache-Control": "no-cache",
        "Pragma": "no-cache",
    }

    if sec_ch_ua:
        headers["sec-ch-ua"] = sec_ch_ua

    if referer_chat_id:
        headers["Referer"] = f"{origin}/c/{referer_chat_id}"

    # debug_log is itself a no-op unless debug logging is enabled.
    debug_log(f"使用 User-Agent: {user_agent[:100]}...")

    return headers
|
|
|
|
|
|
|
|
def get_anonymous_token() -> str:
    """Fetch an anonymous auth token from ``<Origin>/api/v1/auths/``.

    Returns:
        The non-empty ``token`` field of the JSON response.

    Raises:
        Exception: If the request fails, the status code is not 200, the body
            is not usable JSON, or the ``token`` field is missing or empty.
            The failure is debug-logged before being re-raised.
    """
    origin = settings.CLIENT_HEADERS['Origin']

    headers = get_browser_headers()
    headers.update({
        "Accept": "*/*",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Referer": f"{origin}/",
    })

    # Honour the configured proxies, as call_upstream_api does for the main
    # request, so the token request does not bypass them.
    proxies = {}
    if settings.HTTP_PROXY:
        proxies['http'] = settings.HTTP_PROXY
    if settings.HTTPS_PROXY:
        proxies['https'] = settings.HTTPS_PROXY

    try:
        response = requests.get(
            f"{origin}/api/v1/auths/",
            headers=headers,
            timeout=10.0,
            proxies=proxies if proxies else None,
        )

        if response.status_code != 200:
            raise Exception(f"anon token status={response.status_code}")

        token = response.json().get("token")
        if not token:
            raise Exception("anon token empty")

        return token
    except Exception as e:
        debug_log(f"获取匿名token失败: {e}")
        raise
|
|
|
|
|
|
|
|
def get_auth_token() -> str:
    """Return the token to authenticate the next upstream request with.

    Sources are tried in order:

    1. A freshly fetched anonymous token, when ``settings.ANONYMOUS_MODE`` is
       enabled and the fetch succeeds.
    2. The next token handed out by ``token_manager``.
    3. ``settings.BACKUP_TOKEN`` when the token manager returns nothing.
    """
    if settings.ANONYMOUS_MODE:
        try:
            anonymous = get_anonymous_token()
            debug_log(f"匿名token获取成功: {anonymous[:10]}...")
            return anonymous
        except Exception as exc:
            # Any failure here just means we fall through to the token pool.
            debug_log(f"匿名token获取失败,使用token池: {exc}")

    pooled = token_manager.get_next_token()
    if not pooled:
        debug_log("token池无可用token,使用配置文件备用token")
        return settings.BACKUP_TOKEN

    debug_log(f"从token池获取token: {pooled[:10]}...")
    return pooled
|
|
|
|
|
|
|
|
def transform_thinking_content(content: str) -> str:
    """Clean up "thinking" markup according to ``settings.THINKING_PROCESSING``.

    Steps, in order:

    1. Remove every ``<summary>...</summary>`` section (may span lines).
    2. Remove ``</thinking>``, ``<Full>`` and ``</Full>`` markers and trim.
    3. For mode ``"think"`` turn ``<details ...>``/``</details>`` into
       ``<span>``/``</span>``; for mode ``"strip"`` delete those tags; any
       other mode leaves them untouched.
    4. Remove the ``"> "`` blockquote prefix from the first line and from
       every following line that starts with it.

    Args:
        content: Raw thinking text.

    Returns:
        The transformed text with surrounding whitespace stripped.
    """
    content = re.sub(r'(?s)<summary>.*?</summary>', '', content)
    content = content.replace("</thinking>", "").replace("<Full>", "").replace("</Full>", "")
    content = content.strip()

    mode = settings.THINKING_PROCESSING
    if mode == "think":
        content = re.sub(r'<details[^>]*>', '<span>', content)
        content = content.replace("</details>", "</span>")
    elif mode == "strip":
        content = re.sub(r'<details[^>]*>', '', content)
        content = content.replace("</details>", "")

    # Drop exactly one leading "> " marker. str.lstrip("> ") would instead
    # strip *any* run of '>' and ' ' characters and so eat real content
    # (e.g. "> >= 3" would become "= 3").
    if content.startswith("> "):
        content = content[2:]
    content = content.replace("\n> ", "\n")

    return content.strip()
|
|
|
|
|
|
|
|
def call_upstream_api(
    upstream_req: Any,
    chat_id: str,
    auth_token: str
) -> requests.Response:
    """POST the request to ``settings.API_ENDPOINT`` and return the response.

    Args:
        upstream_req: Request model; must provide ``model_dump`` and (when
            debug logging is on) ``model_dump_json``.
        chat_id: Chat ID used to build the ``Referer`` header.
        auth_token: Token sent as ``Authorization: Bearer <token>``.

    Returns:
        The ``requests.Response``, opened with ``stream=True`` so the body has
        not been read yet. The status code is logged but not checked here.

    Raises:
        Exception: On timeout, connection failure, or any other ``requests``
            error; the original exception is attached as ``__cause__``. Other
            exceptions are logged and re-raised unchanged.
    """
    headers = get_browser_headers(chat_id)
    headers["Authorization"] = f"Bearer {auth_token}"

    # Fields set to None are left out of the body that is actually sent.
    request_data = upstream_req.model_dump(exclude_none=True)

    debug_log(f"调用上游API: {settings.API_ENDPOINT}")

    if settings.DEBUG_LOGGING:
        # Serialising to JSON is needed only for the log output, so skip the
        # extra work entirely when debug logging is off.
        request_json = upstream_req.model_dump_json()
        debug_log(f"请求体大小: {len(request_json)} 字符")

        if len(request_json) > 1000:
            # Long bodies: log only the head and the tail.
            debug_log(f"上游请求体 (截断): {request_json[:500]}...{request_json[-200:]}")
        else:
            debug_log(f"上游请求体: {request_json}")

    proxies = {}
    if settings.HTTP_PROXY:
        proxies['http'] = settings.HTTP_PROXY
    if settings.HTTPS_PROXY:
        proxies['https'] = settings.HTTPS_PROXY

    try:
        response = requests.post(
            settings.API_ENDPOINT,
            json=request_data,
            headers=headers,
            # (connect timeout, read timeout)
            timeout=(settings.CONNECTION_TIMEOUT, settings.REQUEST_TIMEOUT),
            stream=True,
            proxies=proxies if proxies else None,
            verify=True,
        )

        debug_log(f"上游响应状态: {response.status_code}")

        if settings.DEBUG_LOGGING:
            content_type = response.headers.get('content-type', 'unknown')
            content_length = response.headers.get('content-length', 'unknown')
            debug_log(f"响应类型: {content_type}, 长度: {content_length}")

        return response

    except requests.exceptions.Timeout as e:
        debug_log(f"请求超时: {e}")
        raise Exception(f"上游API请求超时: {e}") from e
    except requests.exceptions.ConnectionError as e:
        debug_log(f"连接错误: {e}")
        raise Exception(f"上游API连接失败: {e}") from e
    except requests.exceptions.RequestException as e:
        debug_log(f"请求异常: {e}")
        raise Exception(f"上游API请求失败: {e}") from e
    except Exception as e:
        debug_log(f"未知错误: {e}")
        raise
|
|
|