gcli2api / src /storage /mongodb_manager.py
lightspeed's picture
Upload 22 files
5868187 verified
"""
MongoDB数据库管理器,使用单文档设计和统一缓存。
所有凭证数据存储在一个文档中,配置数据存储在另一个文档中,类似TOML文件结构。
"""
import asyncio
import os
import time
from datetime import datetime, timezone
from typing import Dict, Any, List, Optional
from collections import deque
import motor.motor_asyncio
from log import log
from .cache_manager import UnifiedCacheManager, CacheBackend
class MongoDBCacheBackend(CacheBackend):
"""MongoDB缓存后端实现"""
def __init__(self, db, collection_name: str, doc_key: str):
self._db = db
self._collection_name = collection_name
self._doc_key = doc_key
async def load_data(self) -> Dict[str, Any]:
"""从MongoDB文档加载数据"""
try:
collection = self._db[self._collection_name]
doc = await collection.find_one({"key": self._doc_key})
if doc and "data" in doc:
return doc["data"]
return {}
except Exception as e:
log.error(f"Error loading data from MongoDB document {self._doc_key}: {e}")
return {}
async def write_data(self, data: Dict[str, Any]) -> bool:
"""将数据写入MongoDB文档"""
try:
collection = self._db[self._collection_name]
doc = {
"key": self._doc_key,
"data": data,
"updated_at": datetime.now(timezone.utc)
}
await collection.replace_one(
{"key": self._doc_key},
doc,
upsert=True
)
return True
except Exception as e:
log.error(f"Error writing data to MongoDB document {self._doc_key}: {e}")
return False
class MongoDBManager:
"""MongoDB数据库管理器"""
def __init__(self):
self._client: Optional[motor.motor_asyncio.AsyncIOMotorClient] = None
self._db: Optional[motor.motor_asyncio.AsyncIOMotorDatabase] = None
self._initialized = False
self._lock = asyncio.Lock()
# 配置
self._connection_uri = None
self._database_name = None
# 单文档设计 - 所有凭证存在一个文档中(类似TOML文件)
self._collection_name = "credentials_data"
# 性能监控
self._operation_count = 0
self._operation_times = deque(maxlen=5000)
# 统一缓存管理器
self._credentials_cache_manager: Optional[UnifiedCacheManager] = None
self._config_cache_manager: Optional[UnifiedCacheManager] = None
# 文档key定义
self._credentials_doc_key = "all_credentials"
self._config_doc_key = "config_data"
# 写入配置参数
self._write_delay = 1.0 # 写入延迟(秒)
self._cache_ttl = 300 # 缓存TTL(秒)
async def initialize(self):
"""初始化MongoDB连接"""
async with self._lock:
if self._initialized:
return
try:
# 获取连接配置
self._connection_uri = os.getenv("MONGODB_URI")
self._database_name = os.getenv("MONGODB_DATABASE", "gcli2api")
if not self._connection_uri:
raise ValueError("MONGODB_URI environment variable is required")
# 建立连接
self._client = motor.motor_asyncio.AsyncIOMotorClient(
self._connection_uri,
serverSelectionTimeoutMS=5000,
maxPoolSize=100,
minPoolSize=10,
maxIdleTimeMS=45000,
waitQueueTimeoutMS=10000,
)
# 验证连接
await self._client.admin.command('ping')
# 获取数据库
self._db = self._client[self._database_name]
# 创建索引
await self._create_indexes()
# 创建缓存管理器
credentials_backend = MongoDBCacheBackend(self._db, self._collection_name, self._credentials_doc_key)
config_backend = MongoDBCacheBackend(self._db, self._collection_name, self._config_doc_key)
self._credentials_cache_manager = UnifiedCacheManager(
credentials_backend,
cache_ttl=self._cache_ttl,
write_delay=self._write_delay,
name="credentials"
)
self._config_cache_manager = UnifiedCacheManager(
config_backend,
cache_ttl=self._cache_ttl,
write_delay=self._write_delay,
name="config"
)
# 启动缓存管理器
await self._credentials_cache_manager.start()
await self._config_cache_manager.start()
self._initialized = True
log.info(f"MongoDB connection established to {self._database_name} with unified cache")
except Exception as e:
log.error(f"Error initializing MongoDB: {e}")
raise
async def _create_indexes(self):
"""创建简单索引(单文档设计)"""
try:
# 单文档设计只需要主键索引
await self._db[self._collection_name].create_index("key", unique=True)
await self._db[self._collection_name].create_index("updated_at")
log.info("MongoDB indexes created for single-document design")
except Exception as e:
log.error(f"Error creating MongoDB indexes: {e}")
async def close(self):
"""关闭MongoDB连接"""
# 停止缓存管理器
if self._credentials_cache_manager:
await self._credentials_cache_manager.stop()
if self._config_cache_manager:
await self._config_cache_manager.stop()
if self._client:
self._client.close()
self._initialized = False
log.info("MongoDB connection closed with unified cache flushed")
def _ensure_initialized(self):
"""确保已初始化"""
if not self._initialized:
raise RuntimeError("MongoDB manager not initialized")
def _get_default_state(self) -> Dict[str, Any]:
"""获取默认状态数据"""
return {
"error_codes": [],
"disabled": False,
"last_success": time.time(),
"user_email": None,
}
def _get_default_stats(self) -> Dict[str, Any]:
"""获取默认统计数据"""
return {
"gemini_2_5_pro_calls": 0,
"total_calls": 0,
"next_reset_time": None,
"daily_limit_gemini_2_5_pro": 100,
"daily_limit_total": 1000
}
# ============ 凭证管理 ============
async def store_credential(self, filename: str, credential_data: Dict[str, Any]) -> bool:
"""存储凭证数据到统一缓存"""
self._ensure_initialized()
start_time = time.time()
try:
# 获取现有数据或创建新数据
existing_data = await self._credentials_cache_manager.get(filename, {})
credential_entry = {
"credential": credential_data,
"state": existing_data.get("state", self._get_default_state()),
"stats": existing_data.get("stats", self._get_default_stats())
}
success = await self._credentials_cache_manager.set(filename, credential_entry)
# 性能监控
self._operation_count += 1
operation_time = time.time() - start_time
self._operation_times.append(operation_time)
log.debug(f"Stored credential to unified cache: {filename} in {operation_time:.3f}s")
return success
except Exception as e:
operation_time = time.time() - start_time
log.error(f"Error storing credential {filename} in {operation_time:.3f}s: {e}")
return False
async def get_credential(self, filename: str) -> Optional[Dict[str, Any]]:
"""从统一缓存获取凭证数据"""
self._ensure_initialized()
start_time = time.time()
try:
credential_entry = await self._credentials_cache_manager.get(filename)
# 性能监控
self._operation_count += 1
operation_time = time.time() - start_time
self._operation_times.append(operation_time)
if credential_entry and "credential" in credential_entry:
return credential_entry["credential"]
return None
except Exception as e:
operation_time = time.time() - start_time
log.error(f"Error retrieving credential {filename} in {operation_time:.3f}s: {e}")
return None
async def list_credentials(self) -> List[str]:
"""从统一缓存列出所有凭证文件名"""
self._ensure_initialized()
start_time = time.time()
try:
all_data = await self._credentials_cache_manager.get_all()
filenames = list(all_data.keys())
# 性能监控
self._operation_count += 1
operation_time = time.time() - start_time
self._operation_times.append(operation_time)
log.debug(f"Listed {len(filenames)} credentials from unified cache in {operation_time:.3f}s")
return filenames
except Exception as e:
operation_time = time.time() - start_time
log.error(f"Error listing credentials in {operation_time:.3f}s: {e}")
return []
async def delete_credential(self, filename: str) -> bool:
"""从统一缓存删除凭证及所有相关数据"""
self._ensure_initialized()
start_time = time.time()
try:
success = await self._credentials_cache_manager.delete(filename)
# 性能监控
self._operation_count += 1
operation_time = time.time() - start_time
self._operation_times.append(operation_time)
log.debug(f"Deleted credential from unified cache: {filename} in {operation_time:.3f}s")
return success
except Exception as e:
operation_time = time.time() - start_time
log.error(f"Error deleting credential {filename} in {operation_time:.3f}s: {e}")
return False
# ============ 状态管理 ============
async def update_credential_state(self, filename: str, state_updates: Dict[str, Any]) -> bool:
"""更新凭证状态(使用统一缓存)"""
self._ensure_initialized()
start_time = time.time()
try:
# 获取现有数据或创建新数据
existing_data = await self._credentials_cache_manager.get(filename, {})
if not existing_data:
existing_data = {
"credential": {},
"state": self._get_default_state(),
"stats": self._get_default_stats()
}
# 更新状态数据
existing_data["state"].update(state_updates)
success = await self._credentials_cache_manager.set(filename, existing_data)
# 性能监控
self._operation_count += 1
operation_time = time.time() - start_time
self._operation_times.append(operation_time)
log.debug(f"Updated credential state in unified cache: {filename} in {operation_time:.3f}s")
return success
except Exception as e:
operation_time = time.time() - start_time
log.error(f"Error updating credential state {filename} in {operation_time:.3f}s: {e}")
return False
async def get_credential_state(self, filename: str) -> Dict[str, Any]:
"""从统一缓存获取凭证状态"""
self._ensure_initialized()
start_time = time.time()
try:
credential_entry = await self._credentials_cache_manager.get(filename)
# 性能监控
self._operation_count += 1
operation_time = time.time() - start_time
self._operation_times.append(operation_time)
if credential_entry and "state" in credential_entry:
log.debug(f"Retrieved credential state from unified cache: {filename} in {operation_time:.3f}s")
return credential_entry["state"]
else:
# 返回默认状态
return self._get_default_state()
except Exception as e:
operation_time = time.time() - start_time
log.error(f"Error getting credential state {filename} in {operation_time:.3f}s: {e}")
return self._get_default_state()
async def get_all_credential_states(self) -> Dict[str, Dict[str, Any]]:
"""从统一缓存获取所有凭证状态"""
self._ensure_initialized()
start_time = time.time()
try:
all_data = await self._credentials_cache_manager.get_all()
states = {}
for filename, cred_data in all_data.items():
states[filename] = cred_data.get("state", self._get_default_state())
# 性能监控
self._operation_count += 1
operation_time = time.time() - start_time
self._operation_times.append(operation_time)
log.debug(f"Retrieved all credential states from unified cache ({len(states)}) in {operation_time:.3f}s")
return states
except Exception as e:
operation_time = time.time() - start_time
log.error(f"Error getting all credential states in {operation_time:.3f}s: {e}")
return {}
# ============ 配置管理 ============
async def set_config(self, key: str, value: Any) -> bool:
"""设置配置到统一缓存"""
self._ensure_initialized()
return await self._config_cache_manager.set(key, value)
async def get_config(self, key: str, default: Any = None) -> Any:
"""从统一缓存获取配置"""
self._ensure_initialized()
return await self._config_cache_manager.get(key, default)
async def get_all_config(self) -> Dict[str, Any]:
"""从统一缓存获取所有配置"""
self._ensure_initialized()
return await self._config_cache_manager.get_all()
async def delete_config(self, key: str) -> bool:
"""从统一缓存删除配置"""
self._ensure_initialized()
return await self._config_cache_manager.delete(key)
# ============ 使用统计管理 ============
async def update_usage_stats(self, filename: str, stats_updates: Dict[str, Any]) -> bool:
"""更新使用统计(使用统一缓存)"""
self._ensure_initialized()
start_time = time.time()
try:
# 获取现有数据或创建新数据
existing_data = await self._credentials_cache_manager.get(filename, {})
if not existing_data:
existing_data = {
"credential": {},
"state": self._get_default_state(),
"stats": self._get_default_stats()
}
# 更新统计数据
existing_data["stats"].update(stats_updates)
success = await self._credentials_cache_manager.set(filename, existing_data)
# 性能监控
self._operation_count += 1
operation_time = time.time() - start_time
self._operation_times.append(operation_time)
log.debug(f"Updated usage stats in unified cache: {filename} in {operation_time:.3f}s")
return success
except Exception as e:
operation_time = time.time() - start_time
log.error(f"Error updating usage stats {filename} in {operation_time:.3f}s: {e}")
return False
async def get_usage_stats(self, filename: str) -> Dict[str, Any]:
"""从统一缓存获取使用统计"""
self._ensure_initialized()
start_time = time.time()
try:
credential_entry = await self._credentials_cache_manager.get(filename)
# 性能监控
self._operation_count += 1
operation_time = time.time() - start_time
self._operation_times.append(operation_time)
if credential_entry and "stats" in credential_entry:
log.debug(f"Retrieved usage stats from unified cache: {filename} in {operation_time:.3f}s")
return credential_entry["stats"]
else:
return self._get_default_stats()
except Exception as e:
operation_time = time.time() - start_time
log.error(f"Error getting usage stats {filename} in {operation_time:.3f}s: {e}")
return self._get_default_stats()
async def get_all_usage_stats(self) -> Dict[str, Dict[str, Any]]:
"""从统一缓存获取所有使用统计"""
self._ensure_initialized()
start_time = time.time()
try:
all_data = await self._credentials_cache_manager.get_all()
stats = {}
for filename, cred_data in all_data.items():
if "stats" in cred_data:
stats[filename] = cred_data["stats"]
# 性能监控
self._operation_count += 1
operation_time = time.time() - start_time
self._operation_times.append(operation_time)
log.debug(f"Retrieved all usage stats from unified cache ({len(stats)}) in {operation_time:.3f}s")
return stats
except Exception as e:
operation_time = time.time() - start_time
log.error(f"Error getting all usage stats in {operation_time:.3f}s: {e}")
return {}