Spaces:
Paused
Paused
| """配置管理器 - 管理应用配置的读写""" | |
| import toml | |
| from pathlib import Path | |
| from typing import Dict, Any, Optional, Literal | |
| # 默认配置 | |
| DEFAULT_GROK = { | |
| "api_key": "", | |
| "proxy_url": "", | |
| "proxy_pool_url": "", | |
| "proxy_pool_interval": 300, | |
| "cache_proxy_url": "", | |
| "cf_clearance": "", | |
| "x_statsig_id": "", | |
| "dynamic_statsig": True, | |
| "filtered_tags": "xaiartifact,xai:tool_usage_card", | |
| "show_thinking": True, | |
| "temporary": False, | |
| "max_upload_concurrency": 20, | |
| "max_request_concurrency": 100, | |
| "stream_first_response_timeout": 30, | |
| "stream_chunk_timeout": 120, | |
| "stream_total_timeout": 600, | |
| "retry_status_codes": [401, 429], # 可重试的HTTP状态码 | |
| } | |
| DEFAULT_GLOBAL = { | |
| "base_url": "http://localhost:8000", | |
| "log_level": "INFO", | |
| "image_mode": "url", | |
| "admin_password": "admin", | |
| "admin_username": "admin", | |
| "image_cache_max_size_mb": 512, | |
| "video_cache_max_size_mb": 1024, | |
| "max_upload_concurrency": 20, # 最大并发上传数 | |
| "max_request_concurrency": 50, # 最大并发请求数 | |
| "batch_save_interval": 1.0, # 批量保存间隔(秒) | |
| "batch_save_threshold": 10 # 触发批量保存的变更数阈值 | |
| } | |
| class ConfigManager: | |
| """配置管理器""" | |
| def __init__(self) -> None: | |
| """初始化配置""" | |
| self.config_path: Path = Path(__file__).parents[2] / "data" / "setting.toml" | |
| self._storage: Optional[Any] = None | |
| self._ensure_exists() | |
| self.global_config: Dict[str, Any] = self.load("global") | |
| self.grok_config: Dict[str, Any] = self.load("grok") | |
| def _ensure_exists(self) -> None: | |
| """确保配置存在""" | |
| if not self.config_path.exists(): | |
| self.config_path.parent.mkdir(parents=True, exist_ok=True) | |
| self._create_default() | |
| def _create_default(self) -> None: | |
| """创建默认配置""" | |
| default = {"grok": DEFAULT_GROK.copy(), "global": DEFAULT_GLOBAL.copy()} | |
| with open(self.config_path, "w", encoding="utf-8") as f: | |
| toml.dump(default, f) | |
| def _normalize_proxy(self, proxy: str) -> str: | |
| """标准化代理URL(sock5/socks5 → socks5h://)""" | |
| if not proxy: | |
| return proxy | |
| proxy = proxy.strip() | |
| if proxy.startswith("sock5h://"): | |
| proxy = proxy.replace("sock5h://", "socks5h://", 1) | |
| if proxy.startswith("sock5://"): | |
| proxy = proxy.replace("sock5://", "socks5://", 1) | |
| if proxy.startswith("socks5://"): | |
| return proxy.replace("socks5://", "socks5h://", 1) | |
| return proxy | |
| def _normalize_cf(self, cf: str) -> str: | |
| """标准化CF Clearance(自动添加前缀)""" | |
| if cf and not cf.startswith("cf_clearance="): | |
| return f"cf_clearance={cf}" | |
| return cf | |
| def set_storage(self, storage: Any) -> None: | |
| """设置存储实例""" | |
| self._storage = storage | |
| def load(self, section: Literal["global", "grok"]) -> Dict[str, Any]: | |
| """加载配置节""" | |
| try: | |
| with open(self.config_path, "r", encoding="utf-8") as f: | |
| config = toml.load(f)[section] | |
| # 标准化Grok配置 | |
| if section == "grok": | |
| if "proxy_url" in config: | |
| config["proxy_url"] = self._normalize_proxy(config["proxy_url"]) | |
| if "cache_proxy_url" in config: | |
| config["cache_proxy_url"] = self._normalize_proxy(config["cache_proxy_url"]) | |
| if "cf_clearance" in config: | |
| config["cf_clearance"] = self._normalize_cf(config["cf_clearance"]) | |
| return config | |
| except Exception as e: | |
| raise Exception(f"[Setting] 配置加载失败: {e}") from e | |
| async def reload(self) -> None: | |
| """重新加载配置""" | |
| self.global_config = self.load("global") | |
| self.grok_config = self.load("grok") | |
| async def _save_file(self, updates: Dict[str, Dict[str, Any]]) -> None: | |
| """保存到文件""" | |
| import aiofiles | |
| async with aiofiles.open(self.config_path, "r", encoding="utf-8") as f: | |
| config = toml.loads(await f.read()) | |
| for section, data in updates.items(): | |
| if section in config: | |
| config[section].update(data) | |
| async with aiofiles.open(self.config_path, "w", encoding="utf-8") as f: | |
| await f.write(toml.dumps(config)) | |
| async def _save_storage(self, updates: Dict[str, Dict[str, Any]]) -> None: | |
| """保存到存储""" | |
| config = await self._storage.load_config() | |
| for section, data in updates.items(): | |
| if section in config: | |
| config[section].update(data) | |
| await self._storage.save_config(config) | |
| def _prepare_grok(self, grok: Dict[str, Any]) -> Dict[str, Any]: | |
| """准备Grok配置(移除前缀)""" | |
| processed = grok.copy() | |
| if "cf_clearance" in processed: | |
| cf = processed["cf_clearance"] | |
| if cf and cf.startswith("cf_clearance="): | |
| processed["cf_clearance"] = cf.replace("cf_clearance=", "", 1) | |
| return processed | |
| async def save(self, global_config: Optional[Dict[str, Any]] = None, grok_config: Optional[Dict[str, Any]] = None) -> None: | |
| """保存配置""" | |
| updates = {} | |
| if global_config: | |
| updates["global"] = global_config | |
| if grok_config: | |
| updates["grok"] = self._prepare_grok(grok_config) | |
| # 选择存储方式 | |
| if self._storage: | |
| await self._save_storage(updates) | |
| else: | |
| await self._save_file(updates) | |
| await self.reload() | |
| async def get_proxy_async(self, proxy_type: Literal["service", "cache"] = "service") -> str: | |
| """异步获取代理URL(支持代理池) | |
| Args: | |
| proxy_type: 代理类型 | |
| - service: 服务代理(client/upload) | |
| - cache: 缓存代理(cache) | |
| """ | |
| from app.core.proxy_pool import proxy_pool | |
| if proxy_type == "cache": | |
| cache_proxy = self.grok_config.get("cache_proxy_url", "") | |
| if cache_proxy: | |
| return cache_proxy | |
| # 从代理池获取 | |
| return await proxy_pool.get_proxy() or "" | |
| def get_proxy(self, proxy_type: Literal["service", "cache"] = "service") -> str: | |
| """获取代理URL(同步方法,用于向后兼容) | |
| Args: | |
| proxy_type: 代理类型 | |
| - service: 服务代理(client/upload) | |
| - cache: 缓存代理(cache) | |
| """ | |
| from app.core.proxy_pool import proxy_pool | |
| if proxy_type == "cache": | |
| cache_proxy = self.grok_config.get("cache_proxy_url", "") | |
| if cache_proxy: | |
| return cache_proxy | |
| # 返回当前代理(如果是代理池,返回最后一次获取的) | |
| return proxy_pool.get_current_proxy() or self.grok_config.get("proxy_url", "") | |
| # 全局实例 | |
| setting = ConfigManager() | |