grok2api / app /core /config.py
JXJBing's picture
Upload 45 files
1a9e2c2 verified
"""配置管理器 - 管理应用配置的读写"""
import toml
from pathlib import Path
from typing import Dict, Any, Optional, Literal
# 默认配置
DEFAULT_GROK = {
"api_key": "",
"proxy_url": "",
"proxy_pool_url": "",
"proxy_pool_interval": 300,
"cache_proxy_url": "",
"cf_clearance": "",
"x_statsig_id": "",
"dynamic_statsig": True,
"filtered_tags": "xaiartifact,xai:tool_usage_card",
"show_thinking": True,
"temporary": False,
"max_upload_concurrency": 20,
"max_request_concurrency": 100,
"stream_first_response_timeout": 30,
"stream_chunk_timeout": 120,
"stream_total_timeout": 600,
"retry_status_codes": [401, 429], # 可重试的HTTP状态码
}
DEFAULT_GLOBAL = {
"base_url": "http://localhost:8000",
"log_level": "INFO",
"image_mode": "url",
"admin_password": "admin",
"admin_username": "admin",
"image_cache_max_size_mb": 512,
"video_cache_max_size_mb": 1024,
"max_upload_concurrency": 20, # 最大并发上传数
"max_request_concurrency": 50, # 最大并发请求数
"batch_save_interval": 1.0, # 批量保存间隔(秒)
"batch_save_threshold": 10 # 触发批量保存的变更数阈值
}
class ConfigManager:
"""配置管理器"""
def __init__(self) -> None:
"""初始化配置"""
self.config_path: Path = Path(__file__).parents[2] / "data" / "setting.toml"
self._storage: Optional[Any] = None
self._ensure_exists()
self.global_config: Dict[str, Any] = self.load("global")
self.grok_config: Dict[str, Any] = self.load("grok")
def _ensure_exists(self) -> None:
"""确保配置存在"""
if not self.config_path.exists():
self.config_path.parent.mkdir(parents=True, exist_ok=True)
self._create_default()
def _create_default(self) -> None:
"""创建默认配置"""
default = {"grok": DEFAULT_GROK.copy(), "global": DEFAULT_GLOBAL.copy()}
with open(self.config_path, "w", encoding="utf-8") as f:
toml.dump(default, f)
def _normalize_proxy(self, proxy: str) -> str:
"""标准化代理URL(sock5/socks5 → socks5h://)"""
if not proxy:
return proxy
proxy = proxy.strip()
if proxy.startswith("sock5h://"):
proxy = proxy.replace("sock5h://", "socks5h://", 1)
if proxy.startswith("sock5://"):
proxy = proxy.replace("sock5://", "socks5://", 1)
if proxy.startswith("socks5://"):
return proxy.replace("socks5://", "socks5h://", 1)
return proxy
def _normalize_cf(self, cf: str) -> str:
"""标准化CF Clearance(自动添加前缀)"""
if cf and not cf.startswith("cf_clearance="):
return f"cf_clearance={cf}"
return cf
def set_storage(self, storage: Any) -> None:
"""设置存储实例"""
self._storage = storage
def load(self, section: Literal["global", "grok"]) -> Dict[str, Any]:
"""加载配置节"""
try:
with open(self.config_path, "r", encoding="utf-8") as f:
config = toml.load(f)[section]
# 标准化Grok配置
if section == "grok":
if "proxy_url" in config:
config["proxy_url"] = self._normalize_proxy(config["proxy_url"])
if "cache_proxy_url" in config:
config["cache_proxy_url"] = self._normalize_proxy(config["cache_proxy_url"])
if "cf_clearance" in config:
config["cf_clearance"] = self._normalize_cf(config["cf_clearance"])
return config
except Exception as e:
raise Exception(f"[Setting] 配置加载失败: {e}") from e
async def reload(self) -> None:
"""重新加载配置"""
self.global_config = self.load("global")
self.grok_config = self.load("grok")
async def _save_file(self, updates: Dict[str, Dict[str, Any]]) -> None:
"""保存到文件"""
import aiofiles
async with aiofiles.open(self.config_path, "r", encoding="utf-8") as f:
config = toml.loads(await f.read())
for section, data in updates.items():
if section in config:
config[section].update(data)
async with aiofiles.open(self.config_path, "w", encoding="utf-8") as f:
await f.write(toml.dumps(config))
async def _save_storage(self, updates: Dict[str, Dict[str, Any]]) -> None:
"""保存到存储"""
config = await self._storage.load_config()
for section, data in updates.items():
if section in config:
config[section].update(data)
await self._storage.save_config(config)
def _prepare_grok(self, grok: Dict[str, Any]) -> Dict[str, Any]:
"""准备Grok配置(移除前缀)"""
processed = grok.copy()
if "cf_clearance" in processed:
cf = processed["cf_clearance"]
if cf and cf.startswith("cf_clearance="):
processed["cf_clearance"] = cf.replace("cf_clearance=", "", 1)
return processed
async def save(self, global_config: Optional[Dict[str, Any]] = None, grok_config: Optional[Dict[str, Any]] = None) -> None:
"""保存配置"""
updates = {}
if global_config:
updates["global"] = global_config
if grok_config:
updates["grok"] = self._prepare_grok(grok_config)
# 选择存储方式
if self._storage:
await self._save_storage(updates)
else:
await self._save_file(updates)
await self.reload()
async def get_proxy_async(self, proxy_type: Literal["service", "cache"] = "service") -> str:
"""异步获取代理URL(支持代理池)
Args:
proxy_type: 代理类型
- service: 服务代理(client/upload)
- cache: 缓存代理(cache)
"""
from app.core.proxy_pool import proxy_pool
if proxy_type == "cache":
cache_proxy = self.grok_config.get("cache_proxy_url", "")
if cache_proxy:
return cache_proxy
# 从代理池获取
return await proxy_pool.get_proxy() or ""
def get_proxy(self, proxy_type: Literal["service", "cache"] = "service") -> str:
"""获取代理URL(同步方法,用于向后兼容)
Args:
proxy_type: 代理类型
- service: 服务代理(client/upload)
- cache: 缓存代理(cache)
"""
from app.core.proxy_pool import proxy_pool
if proxy_type == "cache":
cache_proxy = self.grok_config.get("cache_proxy_url", "")
if cache_proxy:
return cache_proxy
# 返回当前代理(如果是代理池,返回最后一次获取的)
return proxy_pool.get_current_proxy() or self.grok_config.get("proxy_url", "")
# 全局实例
setting = ConfigManager()