Spaces:
Sleeping
Sleeping
| """ | |
| Redis数据库管理器,使用哈希表设计和统一缓存。 | |
| 所有凭证数据存储在一个哈希表中,配置数据存储在另一个哈希表中。 | |
| """ | |
| import asyncio | |
| import json | |
| import os | |
| import time | |
| from typing import Dict, Any, List, Optional | |
| from collections import deque | |
| import redis.asyncio as redis | |
| from log import log | |
| from .cache_manager import UnifiedCacheManager, CacheBackend | |
| class RedisCacheBackend(CacheBackend): | |
| """Redis缓存后端实现""" | |
| def __init__(self, redis_client: redis.Redis, hash_name: str): | |
| self._client = redis_client | |
| self._hash_name = hash_name | |
| async def load_data(self) -> Dict[str, Any]: | |
| """从Redis哈希表加载数据""" | |
| try: | |
| hash_data = await self._client.hgetall(self._hash_name) | |
| if not hash_data: | |
| return {} | |
| result = {} | |
| for key, value_str in hash_data.items(): | |
| try: | |
| result[key] = json.loads(value_str) | |
| except json.JSONDecodeError as e: | |
| log.error(f"Error deserializing Redis data for key {key}: {e}") | |
| continue | |
| return result | |
| except Exception as e: | |
| log.error(f"Error loading data from Redis hash {self._hash_name}: {e}") | |
| return {} | |
| async def write_data(self, data: Dict[str, Any]) -> bool: | |
| """将数据写入Redis哈希表""" | |
| try: | |
| if not data: | |
| await self._client.delete(self._hash_name) | |
| return True | |
| hash_data = {} | |
| for key, value in data.items(): | |
| try: | |
| hash_data[key] = json.dumps(value, ensure_ascii=False) | |
| except (TypeError, ValueError) as e: | |
| log.error(f"Error serializing data for key {key}: {e}") | |
| continue | |
| if not hash_data: | |
| return True | |
| pipe = self._client.pipeline() | |
| pipe.delete(self._hash_name) | |
| pipe.hset(self._hash_name, mapping=hash_data) | |
| await pipe.execute() | |
| return True | |
| except Exception as e: | |
| log.error(f"Error writing data to Redis hash {self._hash_name}: {e}") | |
| return False | |
| class RedisManager: | |
| """Redis数据库管理器""" | |
| def __init__(self): | |
| self._client: Optional[redis.Redis] = None | |
| self._initialized = False | |
| self._lock = asyncio.Lock() | |
| # 配置 | |
| self._connection_uri = None | |
| self._database_index = 0 | |
| # 哈希表设计 - 所有凭证存在一个哈希表中 | |
| self._credentials_hash_name = "gcli2api:credentials" | |
| self._config_hash_name = "gcli2api:config" | |
| # 性能监控 | |
| self._operation_count = 0 | |
| self._operation_times = deque(maxlen=5000) | |
| # 统一缓存管理器 | |
| self._credentials_cache_manager: Optional[UnifiedCacheManager] = None | |
| self._config_cache_manager: Optional[UnifiedCacheManager] = None | |
| # 写入配置参数 | |
| self._write_delay = 1.0 # 写入延迟(秒) | |
| self._cache_ttl = 300 # 缓存TTL(秒) | |
| async def initialize(self): | |
| """初始化Redis连接""" | |
| async with self._lock: | |
| if self._initialized: | |
| return | |
| try: | |
| # 获取连接配置 | |
| self._connection_uri = os.getenv("REDIS_URI", "redis://localhost:6379") | |
| self._database_index = int(os.getenv("REDIS_DATABASE", "0")) | |
| # 建立连接 - 使用最简配置确保兼容性 | |
| # 检查是否需要 SSL | |
| if self._connection_uri.startswith("rediss://"): | |
| # SSL 连接 | |
| self._client = redis.from_url( | |
| self._connection_uri, | |
| db=self._database_index, | |
| decode_responses=True, | |
| ssl_cert_reqs=None, | |
| ssl_check_hostname=False, | |
| ssl_ca_certs=None | |
| ) | |
| else: | |
| # 普通连接 | |
| self._client = redis.from_url( | |
| self._connection_uri, | |
| db=self._database_index, | |
| decode_responses=True | |
| ) | |
| # 验证连接 | |
| await self._client.ping() | |
| # 创建缓存管理器 | |
| credentials_backend = RedisCacheBackend(self._client, self._credentials_hash_name) | |
| config_backend = RedisCacheBackend(self._client, self._config_hash_name) | |
| self._credentials_cache_manager = UnifiedCacheManager( | |
| credentials_backend, | |
| cache_ttl=self._cache_ttl, | |
| write_delay=self._write_delay, | |
| name="credentials" | |
| ) | |
| self._config_cache_manager = UnifiedCacheManager( | |
| config_backend, | |
| cache_ttl=self._cache_ttl, | |
| write_delay=self._write_delay, | |
| name="config" | |
| ) | |
| # 启动缓存管理器 | |
| await self._credentials_cache_manager.start() | |
| await self._config_cache_manager.start() | |
| self._initialized = True | |
| log.info(f"Redis connection established to database {self._database_index} with unified cache") | |
| except Exception as e: | |
| log.error(f"Error initializing Redis: {e}") | |
| raise | |
| async def close(self): | |
| """关闭Redis连接""" | |
| # 停止缓存管理器 | |
| if self._credentials_cache_manager: | |
| await self._credentials_cache_manager.stop() | |
| if self._config_cache_manager: | |
| await self._config_cache_manager.stop() | |
| if self._client: | |
| await self._client.close() | |
| self._initialized = False | |
| log.info("Redis connection closed with unified cache flushed") | |
| def _ensure_initialized(self): | |
| """确保已初始化""" | |
| if not self._initialized: | |
| raise RuntimeError("Redis manager not initialized") | |
| def _get_default_state(self) -> Dict[str, Any]: | |
| """获取默认状态数据""" | |
| return { | |
| "error_codes": [], | |
| "disabled": False, | |
| "last_success": time.time(), | |
| "user_email": None, | |
| } | |
| def _get_default_stats(self) -> Dict[str, Any]: | |
| """获取默认统计数据""" | |
| return { | |
| "gemini_2_5_pro_calls": 0, | |
| "total_calls": 0, | |
| "next_reset_time": None, | |
| "daily_limit_gemini_2_5_pro": 100, | |
| "daily_limit_total": 1000 | |
| } | |
| # ============ 凭证管理 ============ | |
| async def store_credential(self, filename: str, credential_data: Dict[str, Any]) -> bool: | |
| """存储凭证数据到统一缓存""" | |
| self._ensure_initialized() | |
| start_time = time.time() | |
| try: | |
| # 获取现有数据或创建新数据 | |
| existing_data = await self._credentials_cache_manager.get(filename, {}) | |
| credential_entry = { | |
| "credential": credential_data, | |
| "state": existing_data.get("state", self._get_default_state()), | |
| "stats": existing_data.get("stats", self._get_default_stats()) | |
| } | |
| success = await self._credentials_cache_manager.set(filename, credential_entry) | |
| # 性能监控 | |
| self._operation_count += 1 | |
| operation_time = time.time() - start_time | |
| self._operation_times.append(operation_time) | |
| log.debug(f"Stored credential to unified cache: {filename} in {operation_time:.3f}s") | |
| return success | |
| except Exception as e: | |
| operation_time = time.time() - start_time | |
| log.error(f"Error storing credential {filename} in {operation_time:.3f}s: {e}") | |
| return False | |
| async def get_credential(self, filename: str) -> Optional[Dict[str, Any]]: | |
| """从统一缓存获取凭证数据""" | |
| self._ensure_initialized() | |
| start_time = time.time() | |
| try: | |
| credential_entry = await self._credentials_cache_manager.get(filename) | |
| # 性能监控 | |
| self._operation_count += 1 | |
| operation_time = time.time() - start_time | |
| self._operation_times.append(operation_time) | |
| if credential_entry and "credential" in credential_entry: | |
| return credential_entry["credential"] | |
| return None | |
| except Exception as e: | |
| operation_time = time.time() - start_time | |
| log.error(f"Error retrieving credential {filename} in {operation_time:.3f}s: {e}") | |
| return None | |
| async def list_credentials(self) -> List[str]: | |
| """从统一缓存列出所有凭证文件名""" | |
| self._ensure_initialized() | |
| start_time = time.time() | |
| try: | |
| all_data = await self._credentials_cache_manager.get_all() | |
| filenames = list(all_data.keys()) | |
| # 性能监控 | |
| self._operation_count += 1 | |
| operation_time = time.time() - start_time | |
| self._operation_times.append(operation_time) | |
| log.debug(f"Listed {len(filenames)} credentials from unified cache in {operation_time:.3f}s") | |
| return filenames | |
| except Exception as e: | |
| operation_time = time.time() - start_time | |
| log.error(f"Error listing credentials in {operation_time:.3f}s: {e}") | |
| return [] | |
| async def delete_credential(self, filename: str) -> bool: | |
| """从统一缓存删除凭证及所有相关数据""" | |
| self._ensure_initialized() | |
| start_time = time.time() | |
| try: | |
| success = await self._credentials_cache_manager.delete(filename) | |
| # 性能监控 | |
| self._operation_count += 1 | |
| operation_time = time.time() - start_time | |
| self._operation_times.append(operation_time) | |
| log.debug(f"Deleted credential from unified cache: {filename} in {operation_time:.3f}s") | |
| return success | |
| except Exception as e: | |
| operation_time = time.time() - start_time | |
| log.error(f"Error deleting credential {filename} in {operation_time:.3f}s: {e}") | |
| return False | |
| # ============ 状态管理 ============ | |
| async def update_credential_state(self, filename: str, state_updates: Dict[str, Any]) -> bool: | |
| """更新凭证状态(使用统一缓存)""" | |
| self._ensure_initialized() | |
| start_time = time.time() | |
| try: | |
| # 获取现有数据或创建新数据 | |
| existing_data = await self._credentials_cache_manager.get(filename, {}) | |
| if not existing_data: | |
| existing_data = { | |
| "credential": {}, | |
| "state": self._get_default_state(), | |
| "stats": self._get_default_stats() | |
| } | |
| # 更新状态数据 | |
| existing_data["state"].update(state_updates) | |
| success = await self._credentials_cache_manager.set(filename, existing_data) | |
| # 性能监控 | |
| self._operation_count += 1 | |
| operation_time = time.time() - start_time | |
| self._operation_times.append(operation_time) | |
| log.debug(f"Updated credential state in unified cache: {filename} in {operation_time:.3f}s") | |
| return success | |
| except Exception as e: | |
| operation_time = time.time() - start_time | |
| log.error(f"Error updating credential state {filename} in {operation_time:.3f}s: {e}") | |
| return False | |
| async def get_credential_state(self, filename: str) -> Dict[str, Any]: | |
| """从统一缓存获取凭证状态""" | |
| self._ensure_initialized() | |
| start_time = time.time() | |
| try: | |
| credential_entry = await self._credentials_cache_manager.get(filename) | |
| # 性能监控 | |
| self._operation_count += 1 | |
| operation_time = time.time() - start_time | |
| self._operation_times.append(operation_time) | |
| if credential_entry and "state" in credential_entry: | |
| log.debug(f"Retrieved credential state from unified cache: {filename} in {operation_time:.3f}s") | |
| return credential_entry["state"] | |
| else: | |
| # 返回默认状态 | |
| return self._get_default_state() | |
| except Exception as e: | |
| operation_time = time.time() - start_time | |
| log.error(f"Error getting credential state {filename} in {operation_time:.3f}s: {e}") | |
| return self._get_default_state() | |
| async def get_all_credential_states(self) -> Dict[str, Dict[str, Any]]: | |
| """从统一缓存获取所有凭证状态""" | |
| self._ensure_initialized() | |
| start_time = time.time() | |
| try: | |
| all_data = await self._credentials_cache_manager.get_all() | |
| states = {} | |
| for filename, cred_data in all_data.items(): | |
| states[filename] = cred_data.get("state", self._get_default_state()) | |
| # 性能监控 | |
| self._operation_count += 1 | |
| operation_time = time.time() - start_time | |
| self._operation_times.append(operation_time) | |
| log.debug(f"Retrieved all credential states from unified cache ({len(states)}) in {operation_time:.3f}s") | |
| return states | |
| except Exception as e: | |
| operation_time = time.time() - start_time | |
| log.error(f"Error getting all credential states in {operation_time:.3f}s: {e}") | |
| return {} | |
| # ============ 配置管理 ============ | |
| async def set_config(self, key: str, value: Any) -> bool: | |
| """设置配置到统一缓存""" | |
| self._ensure_initialized() | |
| start_time = time.time() | |
| try: | |
| success = await self._config_cache_manager.set(key, value) | |
| # 性能监控 | |
| self._operation_count += 1 | |
| operation_time = time.time() - start_time | |
| self._operation_times.append(operation_time) | |
| log.debug(f"Set config to unified cache: {key} in {operation_time:.3f}s") | |
| return success | |
| except Exception as e: | |
| operation_time = time.time() - start_time | |
| log.error(f"Error setting config {key} in {operation_time:.3f}s: {e}") | |
| return False | |
| async def get_config(self, key: str, default: Any = None) -> Any: | |
| """从统一缓存获取配置""" | |
| self._ensure_initialized() | |
| return await self._config_cache_manager.get(key, default) | |
| async def get_all_config(self) -> Dict[str, Any]: | |
| """从统一缓存获取所有配置""" | |
| self._ensure_initialized() | |
| return await self._config_cache_manager.get_all() | |
| async def delete_config(self, key: str) -> bool: | |
| """从统一缓存删除配置""" | |
| self._ensure_initialized() | |
| return await self._config_cache_manager.delete(key) | |
| # ============ 使用统计管理 ============ | |
| async def update_usage_stats(self, filename: str, stats_updates: Dict[str, Any]) -> bool: | |
| """更新使用统计(使用统一缓存)""" | |
| self._ensure_initialized() | |
| start_time = time.time() | |
| try: | |
| # 获取现有数据或创建新数据 | |
| existing_data = await self._credentials_cache_manager.get(filename, {}) | |
| if not existing_data: | |
| existing_data = { | |
| "credential": {}, | |
| "state": self._get_default_state(), | |
| "stats": self._get_default_stats() | |
| } | |
| # 更新统计数据 | |
| existing_data["stats"].update(stats_updates) | |
| success = await self._credentials_cache_manager.set(filename, existing_data) | |
| # 性能监控 | |
| self._operation_count += 1 | |
| operation_time = time.time() - start_time | |
| self._operation_times.append(operation_time) | |
| log.debug(f"Updated usage stats in unified cache: {filename} in {operation_time:.3f}s") | |
| return success | |
| except Exception as e: | |
| operation_time = time.time() - start_time | |
| log.error(f"Error updating usage stats {filename} in {operation_time:.3f}s: {e}") | |
| return False | |
| async def get_usage_stats(self, filename: str) -> Dict[str, Any]: | |
| """从统一缓存获取使用统计""" | |
| self._ensure_initialized() | |
| start_time = time.time() | |
| try: | |
| credential_entry = await self._credentials_cache_manager.get(filename) | |
| # 性能监控 | |
| self._operation_count += 1 | |
| operation_time = time.time() - start_time | |
| self._operation_times.append(operation_time) | |
| if credential_entry and "stats" in credential_entry: | |
| log.debug(f"Retrieved usage stats from unified cache: {filename} in {operation_time:.3f}s") | |
| return credential_entry["stats"] | |
| else: | |
| return self._get_default_stats() | |
| except Exception as e: | |
| operation_time = time.time() - start_time | |
| log.error(f"Error getting usage stats {filename} in {operation_time:.3f}s: {e}") | |
| return self._get_default_stats() | |
| async def get_all_usage_stats(self) -> Dict[str, Dict[str, Any]]: | |
| """从统一缓存获取所有使用统计""" | |
| self._ensure_initialized() | |
| start_time = time.time() | |
| try: | |
| all_data = await self._credentials_cache_manager.get_all() | |
| stats = {} | |
| for filename, cred_data in all_data.items(): | |
| if "stats" in cred_data: | |
| stats[filename] = cred_data["stats"] | |
| # 性能监控 | |
| self._operation_count += 1 | |
| operation_time = time.time() - start_time | |
| self._operation_times.append(operation_time) | |
| log.debug(f"Retrieved all usage stats from unified cache ({len(stats)}) in {operation_time:.3f}s") | |
| return stats | |
| except Exception as e: | |
| operation_time = time.time() - start_time | |
| log.error(f"Error getting all usage stats in {operation_time:.3f}s: {e}") | |
| return {} |