gcli2api / src /storage /redis_manager.py
lightspeed's picture
Upload 22 files
5868187 verified
"""
Redis数据库管理器,使用哈希表设计和统一缓存。
所有凭证数据存储在一个哈希表中,配置数据存储在另一个哈希表中。
"""
import asyncio
import json
import os
import time
from typing import Dict, Any, List, Optional
from collections import deque
import redis.asyncio as redis
from log import log
from .cache_manager import UnifiedCacheManager, CacheBackend
class RedisCacheBackend(CacheBackend):
"""Redis缓存后端实现"""
def __init__(self, redis_client: redis.Redis, hash_name: str):
self._client = redis_client
self._hash_name = hash_name
async def load_data(self) -> Dict[str, Any]:
"""从Redis哈希表加载数据"""
try:
hash_data = await self._client.hgetall(self._hash_name)
if not hash_data:
return {}
result = {}
for key, value_str in hash_data.items():
try:
result[key] = json.loads(value_str)
except json.JSONDecodeError as e:
log.error(f"Error deserializing Redis data for key {key}: {e}")
continue
return result
except Exception as e:
log.error(f"Error loading data from Redis hash {self._hash_name}: {e}")
return {}
async def write_data(self, data: Dict[str, Any]) -> bool:
"""将数据写入Redis哈希表"""
try:
if not data:
await self._client.delete(self._hash_name)
return True
hash_data = {}
for key, value in data.items():
try:
hash_data[key] = json.dumps(value, ensure_ascii=False)
except (TypeError, ValueError) as e:
log.error(f"Error serializing data for key {key}: {e}")
continue
if not hash_data:
return True
pipe = self._client.pipeline()
pipe.delete(self._hash_name)
pipe.hset(self._hash_name, mapping=hash_data)
await pipe.execute()
return True
except Exception as e:
log.error(f"Error writing data to Redis hash {self._hash_name}: {e}")
return False
class RedisManager:
"""Redis数据库管理器"""
def __init__(self):
self._client: Optional[redis.Redis] = None
self._initialized = False
self._lock = asyncio.Lock()
# 配置
self._connection_uri = None
self._database_index = 0
# 哈希表设计 - 所有凭证存在一个哈希表中
self._credentials_hash_name = "gcli2api:credentials"
self._config_hash_name = "gcli2api:config"
# 性能监控
self._operation_count = 0
self._operation_times = deque(maxlen=5000)
# 统一缓存管理器
self._credentials_cache_manager: Optional[UnifiedCacheManager] = None
self._config_cache_manager: Optional[UnifiedCacheManager] = None
# 写入配置参数
self._write_delay = 1.0 # 写入延迟(秒)
self._cache_ttl = 300 # 缓存TTL(秒)
async def initialize(self):
"""初始化Redis连接"""
async with self._lock:
if self._initialized:
return
try:
# 获取连接配置
self._connection_uri = os.getenv("REDIS_URI", "redis://localhost:6379")
self._database_index = int(os.getenv("REDIS_DATABASE", "0"))
# 建立连接 - 使用最简配置确保兼容性
# 检查是否需要 SSL
if self._connection_uri.startswith("rediss://"):
# SSL 连接
self._client = redis.from_url(
self._connection_uri,
db=self._database_index,
decode_responses=True,
ssl_cert_reqs=None,
ssl_check_hostname=False,
ssl_ca_certs=None
)
else:
# 普通连接
self._client = redis.from_url(
self._connection_uri,
db=self._database_index,
decode_responses=True
)
# 验证连接
await self._client.ping()
# 创建缓存管理器
credentials_backend = RedisCacheBackend(self._client, self._credentials_hash_name)
config_backend = RedisCacheBackend(self._client, self._config_hash_name)
self._credentials_cache_manager = UnifiedCacheManager(
credentials_backend,
cache_ttl=self._cache_ttl,
write_delay=self._write_delay,
name="credentials"
)
self._config_cache_manager = UnifiedCacheManager(
config_backend,
cache_ttl=self._cache_ttl,
write_delay=self._write_delay,
name="config"
)
# 启动缓存管理器
await self._credentials_cache_manager.start()
await self._config_cache_manager.start()
self._initialized = True
log.info(f"Redis connection established to database {self._database_index} with unified cache")
except Exception as e:
log.error(f"Error initializing Redis: {e}")
raise
async def close(self):
"""关闭Redis连接"""
# 停止缓存管理器
if self._credentials_cache_manager:
await self._credentials_cache_manager.stop()
if self._config_cache_manager:
await self._config_cache_manager.stop()
if self._client:
await self._client.close()
self._initialized = False
log.info("Redis connection closed with unified cache flushed")
def _ensure_initialized(self):
"""确保已初始化"""
if not self._initialized:
raise RuntimeError("Redis manager not initialized")
def _get_default_state(self) -> Dict[str, Any]:
"""获取默认状态数据"""
return {
"error_codes": [],
"disabled": False,
"last_success": time.time(),
"user_email": None,
}
def _get_default_stats(self) -> Dict[str, Any]:
"""获取默认统计数据"""
return {
"gemini_2_5_pro_calls": 0,
"total_calls": 0,
"next_reset_time": None,
"daily_limit_gemini_2_5_pro": 100,
"daily_limit_total": 1000
}
# ============ 凭证管理 ============
async def store_credential(self, filename: str, credential_data: Dict[str, Any]) -> bool:
"""存储凭证数据到统一缓存"""
self._ensure_initialized()
start_time = time.time()
try:
# 获取现有数据或创建新数据
existing_data = await self._credentials_cache_manager.get(filename, {})
credential_entry = {
"credential": credential_data,
"state": existing_data.get("state", self._get_default_state()),
"stats": existing_data.get("stats", self._get_default_stats())
}
success = await self._credentials_cache_manager.set(filename, credential_entry)
# 性能监控
self._operation_count += 1
operation_time = time.time() - start_time
self._operation_times.append(operation_time)
log.debug(f"Stored credential to unified cache: {filename} in {operation_time:.3f}s")
return success
except Exception as e:
operation_time = time.time() - start_time
log.error(f"Error storing credential {filename} in {operation_time:.3f}s: {e}")
return False
async def get_credential(self, filename: str) -> Optional[Dict[str, Any]]:
"""从统一缓存获取凭证数据"""
self._ensure_initialized()
start_time = time.time()
try:
credential_entry = await self._credentials_cache_manager.get(filename)
# 性能监控
self._operation_count += 1
operation_time = time.time() - start_time
self._operation_times.append(operation_time)
if credential_entry and "credential" in credential_entry:
return credential_entry["credential"]
return None
except Exception as e:
operation_time = time.time() - start_time
log.error(f"Error retrieving credential {filename} in {operation_time:.3f}s: {e}")
return None
async def list_credentials(self) -> List[str]:
"""从统一缓存列出所有凭证文件名"""
self._ensure_initialized()
start_time = time.time()
try:
all_data = await self._credentials_cache_manager.get_all()
filenames = list(all_data.keys())
# 性能监控
self._operation_count += 1
operation_time = time.time() - start_time
self._operation_times.append(operation_time)
log.debug(f"Listed {len(filenames)} credentials from unified cache in {operation_time:.3f}s")
return filenames
except Exception as e:
operation_time = time.time() - start_time
log.error(f"Error listing credentials in {operation_time:.3f}s: {e}")
return []
async def delete_credential(self, filename: str) -> bool:
"""从统一缓存删除凭证及所有相关数据"""
self._ensure_initialized()
start_time = time.time()
try:
success = await self._credentials_cache_manager.delete(filename)
# 性能监控
self._operation_count += 1
operation_time = time.time() - start_time
self._operation_times.append(operation_time)
log.debug(f"Deleted credential from unified cache: {filename} in {operation_time:.3f}s")
return success
except Exception as e:
operation_time = time.time() - start_time
log.error(f"Error deleting credential {filename} in {operation_time:.3f}s: {e}")
return False
# ============ 状态管理 ============
async def update_credential_state(self, filename: str, state_updates: Dict[str, Any]) -> bool:
"""更新凭证状态(使用统一缓存)"""
self._ensure_initialized()
start_time = time.time()
try:
# 获取现有数据或创建新数据
existing_data = await self._credentials_cache_manager.get(filename, {})
if not existing_data:
existing_data = {
"credential": {},
"state": self._get_default_state(),
"stats": self._get_default_stats()
}
# 更新状态数据
existing_data["state"].update(state_updates)
success = await self._credentials_cache_manager.set(filename, existing_data)
# 性能监控
self._operation_count += 1
operation_time = time.time() - start_time
self._operation_times.append(operation_time)
log.debug(f"Updated credential state in unified cache: {filename} in {operation_time:.3f}s")
return success
except Exception as e:
operation_time = time.time() - start_time
log.error(f"Error updating credential state {filename} in {operation_time:.3f}s: {e}")
return False
async def get_credential_state(self, filename: str) -> Dict[str, Any]:
"""从统一缓存获取凭证状态"""
self._ensure_initialized()
start_time = time.time()
try:
credential_entry = await self._credentials_cache_manager.get(filename)
# 性能监控
self._operation_count += 1
operation_time = time.time() - start_time
self._operation_times.append(operation_time)
if credential_entry and "state" in credential_entry:
log.debug(f"Retrieved credential state from unified cache: {filename} in {operation_time:.3f}s")
return credential_entry["state"]
else:
# 返回默认状态
return self._get_default_state()
except Exception as e:
operation_time = time.time() - start_time
log.error(f"Error getting credential state {filename} in {operation_time:.3f}s: {e}")
return self._get_default_state()
async def get_all_credential_states(self) -> Dict[str, Dict[str, Any]]:
"""从统一缓存获取所有凭证状态"""
self._ensure_initialized()
start_time = time.time()
try:
all_data = await self._credentials_cache_manager.get_all()
states = {}
for filename, cred_data in all_data.items():
states[filename] = cred_data.get("state", self._get_default_state())
# 性能监控
self._operation_count += 1
operation_time = time.time() - start_time
self._operation_times.append(operation_time)
log.debug(f"Retrieved all credential states from unified cache ({len(states)}) in {operation_time:.3f}s")
return states
except Exception as e:
operation_time = time.time() - start_time
log.error(f"Error getting all credential states in {operation_time:.3f}s: {e}")
return {}
# ============ 配置管理 ============
async def set_config(self, key: str, value: Any) -> bool:
"""设置配置到统一缓存"""
self._ensure_initialized()
start_time = time.time()
try:
success = await self._config_cache_manager.set(key, value)
# 性能监控
self._operation_count += 1
operation_time = time.time() - start_time
self._operation_times.append(operation_time)
log.debug(f"Set config to unified cache: {key} in {operation_time:.3f}s")
return success
except Exception as e:
operation_time = time.time() - start_time
log.error(f"Error setting config {key} in {operation_time:.3f}s: {e}")
return False
async def get_config(self, key: str, default: Any = None) -> Any:
"""从统一缓存获取配置"""
self._ensure_initialized()
return await self._config_cache_manager.get(key, default)
async def get_all_config(self) -> Dict[str, Any]:
"""从统一缓存获取所有配置"""
self._ensure_initialized()
return await self._config_cache_manager.get_all()
async def delete_config(self, key: str) -> bool:
"""从统一缓存删除配置"""
self._ensure_initialized()
return await self._config_cache_manager.delete(key)
# ============ 使用统计管理 ============
async def update_usage_stats(self, filename: str, stats_updates: Dict[str, Any]) -> bool:
"""更新使用统计(使用统一缓存)"""
self._ensure_initialized()
start_time = time.time()
try:
# 获取现有数据或创建新数据
existing_data = await self._credentials_cache_manager.get(filename, {})
if not existing_data:
existing_data = {
"credential": {},
"state": self._get_default_state(),
"stats": self._get_default_stats()
}
# 更新统计数据
existing_data["stats"].update(stats_updates)
success = await self._credentials_cache_manager.set(filename, existing_data)
# 性能监控
self._operation_count += 1
operation_time = time.time() - start_time
self._operation_times.append(operation_time)
log.debug(f"Updated usage stats in unified cache: {filename} in {operation_time:.3f}s")
return success
except Exception as e:
operation_time = time.time() - start_time
log.error(f"Error updating usage stats {filename} in {operation_time:.3f}s: {e}")
return False
async def get_usage_stats(self, filename: str) -> Dict[str, Any]:
"""从统一缓存获取使用统计"""
self._ensure_initialized()
start_time = time.time()
try:
credential_entry = await self._credentials_cache_manager.get(filename)
# 性能监控
self._operation_count += 1
operation_time = time.time() - start_time
self._operation_times.append(operation_time)
if credential_entry and "stats" in credential_entry:
log.debug(f"Retrieved usage stats from unified cache: {filename} in {operation_time:.3f}s")
return credential_entry["stats"]
else:
return self._get_default_stats()
except Exception as e:
operation_time = time.time() - start_time
log.error(f"Error getting usage stats {filename} in {operation_time:.3f}s: {e}")
return self._get_default_stats()
async def get_all_usage_stats(self) -> Dict[str, Dict[str, Any]]:
"""从统一缓存获取所有使用统计"""
self._ensure_initialized()
start_time = time.time()
try:
all_data = await self._credentials_cache_manager.get_all()
stats = {}
for filename, cred_data in all_data.items():
if "stats" in cred_data:
stats[filename] = cred_data["stats"]
# 性能监控
self._operation_count += 1
operation_time = time.time() - start_time
self._operation_times.append(operation_time)
log.debug(f"Retrieved all usage stats from unified cache ({len(stats)}) in {operation_time:.3f}s")
return stats
except Exception as e:
operation_time = time.time() - start_time
log.error(f"Error getting all usage stats in {operation_time:.3f}s: {e}")
return {}