Spaces:
Sleeping
Sleeping
| """ | |
| MongoDB数据库管理器,使用单文档设计和统一缓存。 | |
| 所有凭证数据存储在一个文档中,配置数据存储在另一个文档中,类似TOML文件结构。 | |
| """ | |
| import asyncio | |
| import os | |
| import time | |
| from datetime import datetime, timezone | |
| from typing import Dict, Any, List, Optional | |
| from collections import deque | |
| import motor.motor_asyncio | |
| from log import log | |
| from .cache_manager import UnifiedCacheManager, CacheBackend | |
| class MongoDBCacheBackend(CacheBackend): | |
| """MongoDB缓存后端实现""" | |
| def __init__(self, db, collection_name: str, doc_key: str): | |
| self._db = db | |
| self._collection_name = collection_name | |
| self._doc_key = doc_key | |
| async def load_data(self) -> Dict[str, Any]: | |
| """从MongoDB文档加载数据""" | |
| try: | |
| collection = self._db[self._collection_name] | |
| doc = await collection.find_one({"key": self._doc_key}) | |
| if doc and "data" in doc: | |
| return doc["data"] | |
| return {} | |
| except Exception as e: | |
| log.error(f"Error loading data from MongoDB document {self._doc_key}: {e}") | |
| return {} | |
| async def write_data(self, data: Dict[str, Any]) -> bool: | |
| """将数据写入MongoDB文档""" | |
| try: | |
| collection = self._db[self._collection_name] | |
| doc = { | |
| "key": self._doc_key, | |
| "data": data, | |
| "updated_at": datetime.now(timezone.utc) | |
| } | |
| await collection.replace_one( | |
| {"key": self._doc_key}, | |
| doc, | |
| upsert=True | |
| ) | |
| return True | |
| except Exception as e: | |
| log.error(f"Error writing data to MongoDB document {self._doc_key}: {e}") | |
| return False | |
| class MongoDBManager: | |
| """MongoDB数据库管理器""" | |
| def __init__(self): | |
| self._client: Optional[motor.motor_asyncio.AsyncIOMotorClient] = None | |
| self._db: Optional[motor.motor_asyncio.AsyncIOMotorDatabase] = None | |
| self._initialized = False | |
| self._lock = asyncio.Lock() | |
| # 配置 | |
| self._connection_uri = None | |
| self._database_name = None | |
| # 单文档设计 - 所有凭证存在一个文档中(类似TOML文件) | |
| self._collection_name = "credentials_data" | |
| # 性能监控 | |
| self._operation_count = 0 | |
| self._operation_times = deque(maxlen=5000) | |
| # 统一缓存管理器 | |
| self._credentials_cache_manager: Optional[UnifiedCacheManager] = None | |
| self._config_cache_manager: Optional[UnifiedCacheManager] = None | |
| # 文档key定义 | |
| self._credentials_doc_key = "all_credentials" | |
| self._config_doc_key = "config_data" | |
| # 写入配置参数 | |
| self._write_delay = 1.0 # 写入延迟(秒) | |
| self._cache_ttl = 300 # 缓存TTL(秒) | |
| async def initialize(self): | |
| """初始化MongoDB连接""" | |
| async with self._lock: | |
| if self._initialized: | |
| return | |
| try: | |
| # 获取连接配置 | |
| self._connection_uri = os.getenv("MONGODB_URI") | |
| self._database_name = os.getenv("MONGODB_DATABASE", "gcli2api") | |
| if not self._connection_uri: | |
| raise ValueError("MONGODB_URI environment variable is required") | |
| # 建立连接 | |
| self._client = motor.motor_asyncio.AsyncIOMotorClient( | |
| self._connection_uri, | |
| serverSelectionTimeoutMS=5000, | |
| maxPoolSize=100, | |
| minPoolSize=10, | |
| maxIdleTimeMS=45000, | |
| waitQueueTimeoutMS=10000, | |
| ) | |
| # 验证连接 | |
| await self._client.admin.command('ping') | |
| # 获取数据库 | |
| self._db = self._client[self._database_name] | |
| # 创建索引 | |
| await self._create_indexes() | |
| # 创建缓存管理器 | |
| credentials_backend = MongoDBCacheBackend(self._db, self._collection_name, self._credentials_doc_key) | |
| config_backend = MongoDBCacheBackend(self._db, self._collection_name, self._config_doc_key) | |
| self._credentials_cache_manager = UnifiedCacheManager( | |
| credentials_backend, | |
| cache_ttl=self._cache_ttl, | |
| write_delay=self._write_delay, | |
| name="credentials" | |
| ) | |
| self._config_cache_manager = UnifiedCacheManager( | |
| config_backend, | |
| cache_ttl=self._cache_ttl, | |
| write_delay=self._write_delay, | |
| name="config" | |
| ) | |
| # 启动缓存管理器 | |
| await self._credentials_cache_manager.start() | |
| await self._config_cache_manager.start() | |
| self._initialized = True | |
| log.info(f"MongoDB connection established to {self._database_name} with unified cache") | |
| except Exception as e: | |
| log.error(f"Error initializing MongoDB: {e}") | |
| raise | |
| async def _create_indexes(self): | |
| """创建简单索引(单文档设计)""" | |
| try: | |
| # 单文档设计只需要主键索引 | |
| await self._db[self._collection_name].create_index("key", unique=True) | |
| await self._db[self._collection_name].create_index("updated_at") | |
| log.info("MongoDB indexes created for single-document design") | |
| except Exception as e: | |
| log.error(f"Error creating MongoDB indexes: {e}") | |
| async def close(self): | |
| """关闭MongoDB连接""" | |
| # 停止缓存管理器 | |
| if self._credentials_cache_manager: | |
| await self._credentials_cache_manager.stop() | |
| if self._config_cache_manager: | |
| await self._config_cache_manager.stop() | |
| if self._client: | |
| self._client.close() | |
| self._initialized = False | |
| log.info("MongoDB connection closed with unified cache flushed") | |
| def _ensure_initialized(self): | |
| """确保已初始化""" | |
| if not self._initialized: | |
| raise RuntimeError("MongoDB manager not initialized") | |
| def _get_default_state(self) -> Dict[str, Any]: | |
| """获取默认状态数据""" | |
| return { | |
| "error_codes": [], | |
| "disabled": False, | |
| "last_success": time.time(), | |
| "user_email": None, | |
| } | |
| def _get_default_stats(self) -> Dict[str, Any]: | |
| """获取默认统计数据""" | |
| return { | |
| "gemini_2_5_pro_calls": 0, | |
| "total_calls": 0, | |
| "next_reset_time": None, | |
| "daily_limit_gemini_2_5_pro": 100, | |
| "daily_limit_total": 1000 | |
| } | |
| # ============ 凭证管理 ============ | |
| async def store_credential(self, filename: str, credential_data: Dict[str, Any]) -> bool: | |
| """存储凭证数据到统一缓存""" | |
| self._ensure_initialized() | |
| start_time = time.time() | |
| try: | |
| # 获取现有数据或创建新数据 | |
| existing_data = await self._credentials_cache_manager.get(filename, {}) | |
| credential_entry = { | |
| "credential": credential_data, | |
| "state": existing_data.get("state", self._get_default_state()), | |
| "stats": existing_data.get("stats", self._get_default_stats()) | |
| } | |
| success = await self._credentials_cache_manager.set(filename, credential_entry) | |
| # 性能监控 | |
| self._operation_count += 1 | |
| operation_time = time.time() - start_time | |
| self._operation_times.append(operation_time) | |
| log.debug(f"Stored credential to unified cache: {filename} in {operation_time:.3f}s") | |
| return success | |
| except Exception as e: | |
| operation_time = time.time() - start_time | |
| log.error(f"Error storing credential {filename} in {operation_time:.3f}s: {e}") | |
| return False | |
| async def get_credential(self, filename: str) -> Optional[Dict[str, Any]]: | |
| """从统一缓存获取凭证数据""" | |
| self._ensure_initialized() | |
| start_time = time.time() | |
| try: | |
| credential_entry = await self._credentials_cache_manager.get(filename) | |
| # 性能监控 | |
| self._operation_count += 1 | |
| operation_time = time.time() - start_time | |
| self._operation_times.append(operation_time) | |
| if credential_entry and "credential" in credential_entry: | |
| return credential_entry["credential"] | |
| return None | |
| except Exception as e: | |
| operation_time = time.time() - start_time | |
| log.error(f"Error retrieving credential {filename} in {operation_time:.3f}s: {e}") | |
| return None | |
| async def list_credentials(self) -> List[str]: | |
| """从统一缓存列出所有凭证文件名""" | |
| self._ensure_initialized() | |
| start_time = time.time() | |
| try: | |
| all_data = await self._credentials_cache_manager.get_all() | |
| filenames = list(all_data.keys()) | |
| # 性能监控 | |
| self._operation_count += 1 | |
| operation_time = time.time() - start_time | |
| self._operation_times.append(operation_time) | |
| log.debug(f"Listed {len(filenames)} credentials from unified cache in {operation_time:.3f}s") | |
| return filenames | |
| except Exception as e: | |
| operation_time = time.time() - start_time | |
| log.error(f"Error listing credentials in {operation_time:.3f}s: {e}") | |
| return [] | |
| async def delete_credential(self, filename: str) -> bool: | |
| """从统一缓存删除凭证及所有相关数据""" | |
| self._ensure_initialized() | |
| start_time = time.time() | |
| try: | |
| success = await self._credentials_cache_manager.delete(filename) | |
| # 性能监控 | |
| self._operation_count += 1 | |
| operation_time = time.time() - start_time | |
| self._operation_times.append(operation_time) | |
| log.debug(f"Deleted credential from unified cache: {filename} in {operation_time:.3f}s") | |
| return success | |
| except Exception as e: | |
| operation_time = time.time() - start_time | |
| log.error(f"Error deleting credential {filename} in {operation_time:.3f}s: {e}") | |
| return False | |
| # ============ 状态管理 ============ | |
| async def update_credential_state(self, filename: str, state_updates: Dict[str, Any]) -> bool: | |
| """更新凭证状态(使用统一缓存)""" | |
| self._ensure_initialized() | |
| start_time = time.time() | |
| try: | |
| # 获取现有数据或创建新数据 | |
| existing_data = await self._credentials_cache_manager.get(filename, {}) | |
| if not existing_data: | |
| existing_data = { | |
| "credential": {}, | |
| "state": self._get_default_state(), | |
| "stats": self._get_default_stats() | |
| } | |
| # 更新状态数据 | |
| existing_data["state"].update(state_updates) | |
| success = await self._credentials_cache_manager.set(filename, existing_data) | |
| # 性能监控 | |
| self._operation_count += 1 | |
| operation_time = time.time() - start_time | |
| self._operation_times.append(operation_time) | |
| log.debug(f"Updated credential state in unified cache: {filename} in {operation_time:.3f}s") | |
| return success | |
| except Exception as e: | |
| operation_time = time.time() - start_time | |
| log.error(f"Error updating credential state {filename} in {operation_time:.3f}s: {e}") | |
| return False | |
| async def get_credential_state(self, filename: str) -> Dict[str, Any]: | |
| """从统一缓存获取凭证状态""" | |
| self._ensure_initialized() | |
| start_time = time.time() | |
| try: | |
| credential_entry = await self._credentials_cache_manager.get(filename) | |
| # 性能监控 | |
| self._operation_count += 1 | |
| operation_time = time.time() - start_time | |
| self._operation_times.append(operation_time) | |
| if credential_entry and "state" in credential_entry: | |
| log.debug(f"Retrieved credential state from unified cache: {filename} in {operation_time:.3f}s") | |
| return credential_entry["state"] | |
| else: | |
| # 返回默认状态 | |
| return self._get_default_state() | |
| except Exception as e: | |
| operation_time = time.time() - start_time | |
| log.error(f"Error getting credential state {filename} in {operation_time:.3f}s: {e}") | |
| return self._get_default_state() | |
| async def get_all_credential_states(self) -> Dict[str, Dict[str, Any]]: | |
| """从统一缓存获取所有凭证状态""" | |
| self._ensure_initialized() | |
| start_time = time.time() | |
| try: | |
| all_data = await self._credentials_cache_manager.get_all() | |
| states = {} | |
| for filename, cred_data in all_data.items(): | |
| states[filename] = cred_data.get("state", self._get_default_state()) | |
| # 性能监控 | |
| self._operation_count += 1 | |
| operation_time = time.time() - start_time | |
| self._operation_times.append(operation_time) | |
| log.debug(f"Retrieved all credential states from unified cache ({len(states)}) in {operation_time:.3f}s") | |
| return states | |
| except Exception as e: | |
| operation_time = time.time() - start_time | |
| log.error(f"Error getting all credential states in {operation_time:.3f}s: {e}") | |
| return {} | |
| # ============ 配置管理 ============ | |
| async def set_config(self, key: str, value: Any) -> bool: | |
| """设置配置到统一缓存""" | |
| self._ensure_initialized() | |
| return await self._config_cache_manager.set(key, value) | |
| async def get_config(self, key: str, default: Any = None) -> Any: | |
| """从统一缓存获取配置""" | |
| self._ensure_initialized() | |
| return await self._config_cache_manager.get(key, default) | |
| async def get_all_config(self) -> Dict[str, Any]: | |
| """从统一缓存获取所有配置""" | |
| self._ensure_initialized() | |
| return await self._config_cache_manager.get_all() | |
| async def delete_config(self, key: str) -> bool: | |
| """从统一缓存删除配置""" | |
| self._ensure_initialized() | |
| return await self._config_cache_manager.delete(key) | |
| # ============ 使用统计管理 ============ | |
| async def update_usage_stats(self, filename: str, stats_updates: Dict[str, Any]) -> bool: | |
| """更新使用统计(使用统一缓存)""" | |
| self._ensure_initialized() | |
| start_time = time.time() | |
| try: | |
| # 获取现有数据或创建新数据 | |
| existing_data = await self._credentials_cache_manager.get(filename, {}) | |
| if not existing_data: | |
| existing_data = { | |
| "credential": {}, | |
| "state": self._get_default_state(), | |
| "stats": self._get_default_stats() | |
| } | |
| # 更新统计数据 | |
| existing_data["stats"].update(stats_updates) | |
| success = await self._credentials_cache_manager.set(filename, existing_data) | |
| # 性能监控 | |
| self._operation_count += 1 | |
| operation_time = time.time() - start_time | |
| self._operation_times.append(operation_time) | |
| log.debug(f"Updated usage stats in unified cache: {filename} in {operation_time:.3f}s") | |
| return success | |
| except Exception as e: | |
| operation_time = time.time() - start_time | |
| log.error(f"Error updating usage stats {filename} in {operation_time:.3f}s: {e}") | |
| return False | |
| async def get_usage_stats(self, filename: str) -> Dict[str, Any]: | |
| """从统一缓存获取使用统计""" | |
| self._ensure_initialized() | |
| start_time = time.time() | |
| try: | |
| credential_entry = await self._credentials_cache_manager.get(filename) | |
| # 性能监控 | |
| self._operation_count += 1 | |
| operation_time = time.time() - start_time | |
| self._operation_times.append(operation_time) | |
| if credential_entry and "stats" in credential_entry: | |
| log.debug(f"Retrieved usage stats from unified cache: {filename} in {operation_time:.3f}s") | |
| return credential_entry["stats"] | |
| else: | |
| return self._get_default_stats() | |
| except Exception as e: | |
| operation_time = time.time() - start_time | |
| log.error(f"Error getting usage stats {filename} in {operation_time:.3f}s: {e}") | |
| return self._get_default_stats() | |
| async def get_all_usage_stats(self) -> Dict[str, Dict[str, Any]]: | |
| """从统一缓存获取所有使用统计""" | |
| self._ensure_initialized() | |
| start_time = time.time() | |
| try: | |
| all_data = await self._credentials_cache_manager.get_all() | |
| stats = {} | |
| for filename, cred_data in all_data.items(): | |
| if "stats" in cred_data: | |
| stats[filename] = cred_data["stats"] | |
| # 性能监控 | |
| self._operation_count += 1 | |
| operation_time = time.time() - start_time | |
| self._operation_times.append(operation_time) | |
| log.debug(f"Retrieved all usage stats from unified cache ({len(stats)}) in {operation_time:.3f}s") | |
| return stats | |
| except Exception as e: | |
| operation_time = time.time() - start_time | |
| log.error(f"Error getting all usage stats in {operation_time:.3f}s: {e}") | |
| return {} |