gcli2api / src /storage_adapter.py
lightspeed's picture
Upload 22 files
5868187 verified
"""
存储适配器,提供统一的接口来处理Redis、MongoDB和本地文件存储。
根据配置自动选择存储后端,优先级:Redis > MongoDB > 本地文件。
"""
import asyncio
import os
import json
from typing import Dict, Any, List, Optional, Protocol
from log import log
class StorageBackend(Protocol):
"""存储后端协议"""
async def initialize(self) -> None:
"""初始化存储后端"""
...
async def close(self) -> None:
"""关闭存储后端"""
...
# 凭证管理
async def store_credential(self, filename: str, credential_data: Dict[str, Any]) -> bool:
"""存储凭证数据"""
...
async def get_credential(self, filename: str) -> Optional[Dict[str, Any]]:
"""获取凭证数据"""
...
async def list_credentials(self) -> List[str]:
"""列出所有凭证文件名"""
...
async def delete_credential(self, filename: str) -> bool:
"""删除凭证"""
...
# 状态管理
async def update_credential_state(self, filename: str, state_updates: Dict[str, Any]) -> bool:
"""更新凭证状态"""
...
async def get_credential_state(self, filename: str) -> Dict[str, Any]:
"""获取凭证状态"""
...
async def get_all_credential_states(self) -> Dict[str, Dict[str, Any]]:
"""获取所有凭证状态"""
...
# 配置管理
async def set_config(self, key: str, value: Any) -> bool:
"""设置配置项"""
...
async def get_config(self, key: str, default: Any = None) -> Any:
"""获取配置项"""
...
async def get_all_config(self) -> Dict[str, Any]:
"""获取所有配置"""
...
async def delete_config(self, key: str) -> bool:
"""删除配置项"""
...
# 使用统计管理
async def update_usage_stats(self, filename: str, stats_updates: Dict[str, Any]) -> bool:
"""更新使用统计"""
...
async def get_usage_stats(self, filename: str) -> Dict[str, Any]:
"""获取使用统计"""
...
async def get_all_usage_stats(self) -> Dict[str, Dict[str, Any]]:
"""获取所有使用统计"""
...
class StorageAdapter:
"""存储适配器,根据配置选择存储后端"""
def __init__(self):
self._backend: Optional["StorageBackend"] = None
self._initialized = False
self._lock = asyncio.Lock()
async def initialize(self) -> None:
"""初始化存储适配器"""
async with self._lock:
if self._initialized:
return
# 按优先级检查存储后端:Redis > MongoDB > 本地文件
redis_uri = os.getenv("REDIS_URI", "")
mongodb_uri = os.getenv("MONGODB_URI", "")
# 优先尝试Redis存储
if redis_uri:
try:
from .storage.redis_manager import RedisManager
self._backend = RedisManager()
await self._backend.initialize()
log.info("Using Redis storage backend")
except ImportError as e:
log.error(f"Failed to import Redis backend: {e}")
log.info("Falling back to next available storage backend")
except Exception as e:
log.error(f"Failed to initialize Redis backend: {e}")
log.info("Falling back to next available storage backend")
# 如果Redis不可用或未配置,接下来尝试Postgres(优先级低于Redis)
postgres_dsn = os.getenv("POSTGRES_DSN", "")
if not self._backend and postgres_dsn:
try:
from .storage.postgres_manager import PostgresManager
self._backend = PostgresManager()
await self._backend.initialize()
log.info("Using Postgres storage backend")
except ImportError as e:
log.error(f"Failed to import Postgres backend: {e}")
log.info("Falling back to next available storage backend")
except Exception as e:
log.error(f"Failed to initialize Postgres backend: {e}")
log.info("Falling back to next available storage backend")
# 如果Redis和Postgres不可用,尝试MongoDB存储
if not self._backend and mongodb_uri:
try:
from .storage.mongodb_manager import MongoDBManager
self._backend = MongoDBManager()
await self._backend.initialize()
log.info("Using MongoDB storage backend")
except ImportError as e:
log.error(f"Failed to import MongoDB backend: {e}")
log.info("Falling back to file storage backend")
except Exception as e:
log.error(f"Failed to initialize MongoDB backend: {e}")
log.info("Falling back to file storage backend")
# 如果Redis和MongoDB都不可用,使用文件存储
if not self._backend:
from .storage.file_storage_manager import FileStorageManager
self._backend = FileStorageManager()
await self._backend.initialize()
log.info("Using file storage backend")
self._initialized = True
async def close(self) -> None:
"""关闭存储适配器"""
if self._backend:
await self._backend.close()
self._backend = None
self._initialized = False
def _ensure_initialized(self):
"""确保存储适配器已初始化"""
if not self._initialized or not self._backend:
raise RuntimeError("Storage adapter not initialized")
# ============ 凭证管理 ============
async def store_credential(self, filename: str, credential_data: Dict[str, Any]) -> bool:
"""存储凭证数据"""
self._ensure_initialized()
return await self._backend.store_credential(filename, credential_data)
async def get_credential(self, filename: str) -> Optional[Dict[str, Any]]:
"""获取凭证数据"""
self._ensure_initialized()
return await self._backend.get_credential(filename)
async def list_credentials(self) -> List[str]:
"""列出所有凭证文件名"""
self._ensure_initialized()
return await self._backend.list_credentials()
async def delete_credential(self, filename: str) -> bool:
"""删除凭证"""
self._ensure_initialized()
return await self._backend.delete_credential(filename)
# ============ 状态管理 ============
async def update_credential_state(self, filename: str, state_updates: Dict[str, Any]) -> bool:
"""更新凭证状态"""
self._ensure_initialized()
return await self._backend.update_credential_state(filename, state_updates)
async def get_credential_state(self, filename: str) -> Dict[str, Any]:
"""获取凭证状态"""
self._ensure_initialized()
return await self._backend.get_credential_state(filename)
async def get_all_credential_states(self) -> Dict[str, Dict[str, Any]]:
"""获取所有凭证状态"""
self._ensure_initialized()
return await self._backend.get_all_credential_states()
# ============ 配置管理 ============
async def set_config(self, key: str, value: Any) -> bool:
"""设置配置项"""
self._ensure_initialized()
return await self._backend.set_config(key, value)
async def get_config(self, key: str, default: Any = None) -> Any:
"""获取配置项"""
self._ensure_initialized()
return await self._backend.get_config(key, default)
async def get_all_config(self) -> Dict[str, Any]:
"""获取所有配置"""
self._ensure_initialized()
return await self._backend.get_all_config()
async def delete_config(self, key: str) -> bool:
"""删除配置项"""
self._ensure_initialized()
return await self._backend.delete_config(key)
# ============ 使用统计管理 ============
async def update_usage_stats(self, filename: str, stats_updates: Dict[str, Any]) -> bool:
"""更新使用统计"""
self._ensure_initialized()
return await self._backend.update_usage_stats(filename, stats_updates)
async def get_usage_stats(self, filename: str) -> Dict[str, Any]:
"""获取使用统计"""
self._ensure_initialized()
return await self._backend.get_usage_stats(filename)
async def get_all_usage_stats(self) -> Dict[str, Dict[str, Any]]:
"""获取所有使用统计"""
self._ensure_initialized()
return await self._backend.get_all_usage_stats()
# ============ 工具方法 ============
async def export_credential_to_json(self, filename: str, output_path: str = None) -> bool:
"""将凭证导出为JSON文件"""
self._ensure_initialized()
if hasattr(self._backend, 'export_credential_to_json'):
return await self._backend.export_credential_to_json(filename, output_path)
# MongoDB后端的fallback实现
credential_data = await self.get_credential(filename)
if credential_data is None:
return False
if output_path is None:
output_path = f"{filename}.json"
import aiofiles
try:
async with aiofiles.open(output_path, "w", encoding="utf-8") as f:
await f.write(json.dumps(credential_data, indent=2, ensure_ascii=False))
return True
except Exception:
return False
async def import_credential_from_json(self, json_path: str, filename: str = None) -> bool:
"""从JSON文件导入凭证"""
self._ensure_initialized()
if hasattr(self._backend, 'import_credential_from_json'):
return await self._backend.import_credential_from_json(json_path, filename)
# MongoDB后端的fallback实现
try:
import aiofiles
async with aiofiles.open(json_path, "r", encoding="utf-8") as f:
content = await f.read()
credential_data = json.loads(content)
if filename is None:
filename = os.path.basename(json_path)
return await self.store_credential(filename, credential_data)
except Exception:
return False
def get_backend_type(self) -> str:
"""获取当前存储后端类型"""
if not self._backend:
return "none"
# 检查后端类型
backend_class_name = self._backend.__class__.__name__
if "File" in backend_class_name or "file" in backend_class_name.lower():
return "file"
elif "MongoDB" in backend_class_name or "mongo" in backend_class_name.lower():
return "mongodb"
elif "Redis" in backend_class_name or "redis" in backend_class_name.lower():
return "redis"
else:
return "unknown"
async def get_backend_info(self) -> Dict[str, Any]:
"""获取存储后端信息"""
self._ensure_initialized()
backend_type = self.get_backend_type()
info = {
"backend_type": backend_type,
"initialized": self._initialized
}
# 获取底层存储信息
if hasattr(self._backend, 'get_database_info'):
try:
db_info = await self._backend.get_database_info()
info.update(db_info)
except Exception as e:
info["database_error"] = str(e)
else:
backend_type = self.get_backend_type()
if backend_type == "file":
info.update({
"credentials_dir": getattr(self._backend, '_credentials_dir', None),
"state_file": getattr(self._backend, '_state_file', None),
"config_file": getattr(self._backend, '_config_file', None)
})
elif backend_type == "redis":
info.update({
"redis_url": getattr(self._backend, '_redis_url', None),
"connection_pool_size": getattr(self._backend, '_pool_size', None)
})
return info
# 全局存储适配器实例
_storage_adapter: Optional[StorageAdapter] = None
async def get_storage_adapter() -> StorageAdapter:
"""获取全局存储适配器实例"""
global _storage_adapter
if _storage_adapter is None:
_storage_adapter = StorageAdapter()
await _storage_adapter.initialize()
return _storage_adapter
async def close_storage_adapter():
"""关闭全局存储适配器"""
global _storage_adapter
if _storage_adapter:
await _storage_adapter.close()
_storage_adapter = None