Spaces:
Sleeping
Sleeping
| """ | |
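| Storage adapter that provides a unified interface over Redis, Postgres, MongoDB and local file storage. | |
| The backend is selected automatically from configuration, in priority order: Redis > Postgres > MongoDB > local file. | |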
| """ | |
| import asyncio | |
| import os | |
| import json | |
| from typing import Dict, Any, List, Optional, Protocol | |
| from log import log | |
class StorageBackend(Protocol):
    """Structural interface that every storage backend is expected to satisfy."""

    async def initialize(self) -> None:
        """Initialize the storage backend."""

    async def close(self) -> None:
        """Close the storage backend."""

    # ---- Credential management ----

    async def store_credential(self, filename: str, credential_data: Dict[str, Any]) -> bool:
        """Store the credential data under ``filename``."""

    async def get_credential(self, filename: str) -> Optional[Dict[str, Any]]:
        """Fetch the credential data stored under ``filename``."""

    async def list_credentials(self) -> List[str]:
        """List the file names of all stored credentials."""

    async def delete_credential(self, filename: str) -> bool:
        """Delete the credential stored under ``filename``."""

    # ---- State management ----

    async def update_credential_state(self, filename: str, state_updates: Dict[str, Any]) -> bool:
        """Apply ``state_updates`` to the state of credential ``filename``."""

    async def get_credential_state(self, filename: str) -> Dict[str, Any]:
        """Fetch the state of credential ``filename``."""

    async def get_all_credential_states(self) -> Dict[str, Dict[str, Any]]:
        """Fetch the state of every credential."""

    # ---- Configuration management ----

    async def set_config(self, key: str, value: Any) -> bool:
        """Set configuration item ``key`` to ``value``."""

    async def get_config(self, key: str, default: Any = None) -> Any:
        """Fetch configuration item ``key``; ``default`` is the fallback value."""

    async def get_all_config(self) -> Dict[str, Any]:
        """Fetch all configuration items."""

    async def delete_config(self, key: str) -> bool:
        """Delete configuration item ``key``."""

    # ---- Usage statistics management ----

    async def update_usage_stats(self, filename: str, stats_updates: Dict[str, Any]) -> bool:
        """Apply ``stats_updates`` to the usage statistics of ``filename``."""

    async def get_usage_stats(self, filename: str) -> Dict[str, Any]:
        """Fetch the usage statistics of ``filename``."""

    async def get_all_usage_stats(self) -> Dict[str, Dict[str, Any]]:
        """Fetch the usage statistics of every credential."""
class StorageAdapter:
    """Facade that selects one storage backend and forwards every call to it.

    ``initialize()`` tries the backends in priority order -- Redis
    (``REDIS_URI``), Postgres (``POSTGRES_DSN``), MongoDB (``MONGODB_URI``) --
    and falls back to local file storage when none of them is configured or
    usable. The data methods raise ``RuntimeError`` until ``initialize()`` has
    completed successfully.
    """

    # (substring of the lower-cased backend class name, reported backend type).
    # Checked in order; the first match wins.
    _BACKEND_TYPE_MARKERS = (
        ("file", "file"),
        ("mongo", "mongodb"),
        ("redis", "redis"),
        ("postgres", "postgres"),
    )

    def __init__(self):
        self._backend: Optional["StorageBackend"] = None
        self._initialized = False
        # Serializes initialize() so the backend is selected only once.
        self._lock = asyncio.Lock()

    # ============ Backend selection ============

    @staticmethod
    def _load_redis_backend():
        """Import and return the Redis backend class (imported lazily)."""
        from .storage.redis_manager import RedisManager
        return RedisManager

    @staticmethod
    def _load_postgres_backend():
        """Import and return the Postgres backend class (imported lazily)."""
        from .storage.postgres_manager import PostgresManager
        return PostgresManager

    @staticmethod
    def _load_mongodb_backend():
        """Import and return the MongoDB backend class (imported lazily)."""
        from .storage.mongodb_manager import MongoDBManager
        return MongoDBManager

    @staticmethod
    async def _try_backend(label, load_class, fallback_note):
        """Create and initialize one candidate backend.

        Args:
            label: Backend name used in log messages (e.g. ``"Redis"``).
            load_class: Zero-argument callable that imports and returns the
                backend class.
            fallback_note: Message logged at info level when the candidate
                cannot be used.

        Returns:
            The initialized backend, or ``None`` if importing or initializing
            it failed (the failure is logged, not raised).
        """
        try:
            backend = load_class()()
            await backend.initialize()
        except ImportError as e:
            log.error(f"Failed to import {label} backend: {e}")
        except Exception as e:
            log.error(f"Failed to initialize {label} backend: {e}")
        else:
            log.info(f"Using {label} storage backend")
            return backend
        log.info(fallback_note)
        return None

    async def initialize(self) -> None:
        """Select and initialize the storage backend (idempotent).

        A backend is only adopted after its own ``initialize()`` succeeded, so
        a configured-but-broken backend never blocks the fallback chain.

        Raises:
            Exception: whatever the file storage backend raises if even the
                last-resort fallback cannot be imported or initialized.
        """
        async with self._lock:
            if self._initialized:
                return

            next_note = "Falling back to next available storage backend"
            file_note = "Falling back to file storage backend"
            # Priority order: Redis > Postgres > MongoDB > local file.
            candidates = (
                ("Redis", "REDIS_URI", self._load_redis_backend, next_note),
                ("Postgres", "POSTGRES_DSN", self._load_postgres_backend, next_note),
                ("MongoDB", "MONGODB_URI", self._load_mongodb_backend, file_note),
            )

            backend = None
            for label, env_var, load_class, fallback_note in candidates:
                if not os.getenv(env_var, ""):
                    continue  # backend not configured
                backend = await self._try_backend(label, load_class, fallback_note)
                if backend is not None:
                    break

            if backend is None:
                # Last resort: failures here propagate to the caller.
                from .storage.file_storage_manager import FileStorageManager
                backend = FileStorageManager()
                await backend.initialize()
                log.info("Using file storage backend")

            self._backend = backend
            self._initialized = True

    async def close(self) -> None:
        """Close the backend (if any) and mark the adapter uninitialized.

        The adapter state is reset even when the backend's ``close()`` raises;
        the exception still propagates to the caller.
        """
        try:
            if self._backend:
                await self._backend.close()
        finally:
            self._backend = None
            self._initialized = False

    def _ensure_initialized(self):
        """Raise ``RuntimeError`` unless a backend has been initialized."""
        if not self._initialized or not self._backend:
            raise RuntimeError("Storage adapter not initialized")

    # ============ Credential management ============

    async def store_credential(self, filename: str, credential_data: Dict[str, Any]) -> bool:
        """Store credential data via the backend."""
        self._ensure_initialized()
        return await self._backend.store_credential(filename, credential_data)

    async def get_credential(self, filename: str) -> Optional[Dict[str, Any]]:
        """Fetch credential data via the backend."""
        self._ensure_initialized()
        return await self._backend.get_credential(filename)

    async def list_credentials(self) -> List[str]:
        """List all credential file names via the backend."""
        self._ensure_initialized()
        return await self._backend.list_credentials()

    async def delete_credential(self, filename: str) -> bool:
        """Delete a credential via the backend."""
        self._ensure_initialized()
        return await self._backend.delete_credential(filename)

    # ============ State management ============

    async def update_credential_state(self, filename: str, state_updates: Dict[str, Any]) -> bool:
        """Update a credential's state via the backend."""
        self._ensure_initialized()
        return await self._backend.update_credential_state(filename, state_updates)

    async def get_credential_state(self, filename: str) -> Dict[str, Any]:
        """Fetch a credential's state via the backend."""
        self._ensure_initialized()
        return await self._backend.get_credential_state(filename)

    async def get_all_credential_states(self) -> Dict[str, Dict[str, Any]]:
        """Fetch the state of every credential via the backend."""
        self._ensure_initialized()
        return await self._backend.get_all_credential_states()

    # ============ Configuration management ============

    async def set_config(self, key: str, value: Any) -> bool:
        """Set a configuration item via the backend."""
        self._ensure_initialized()
        return await self._backend.set_config(key, value)

    async def get_config(self, key: str, default: Any = None) -> Any:
        """Fetch a configuration item via the backend."""
        self._ensure_initialized()
        return await self._backend.get_config(key, default)

    async def get_all_config(self) -> Dict[str, Any]:
        """Fetch all configuration items via the backend."""
        self._ensure_initialized()
        return await self._backend.get_all_config()

    async def delete_config(self, key: str) -> bool:
        """Delete a configuration item via the backend."""
        self._ensure_initialized()
        return await self._backend.delete_config(key)

    # ============ Usage statistics management ============

    async def update_usage_stats(self, filename: str, stats_updates: Dict[str, Any]) -> bool:
        """Update usage statistics via the backend."""
        self._ensure_initialized()
        return await self._backend.update_usage_stats(filename, stats_updates)

    async def get_usage_stats(self, filename: str) -> Dict[str, Any]:
        """Fetch usage statistics via the backend."""
        self._ensure_initialized()
        return await self._backend.get_usage_stats(filename)

    async def get_all_usage_stats(self) -> Dict[str, Dict[str, Any]]:
        """Fetch the usage statistics of every credential via the backend."""
        self._ensure_initialized()
        return await self._backend.get_all_usage_stats()

    # ============ Utilities ============

    async def export_credential_to_json(self, filename: str, output_path: Optional[str] = None) -> bool:
        """Export a credential to a JSON file.

        Delegates to the backend's own ``export_credential_to_json`` when it
        has one; otherwise reads the credential and writes it with aiofiles.

        Args:
            filename: Name of the credential to export.
            output_path: Destination path; defaults to ``"<filename>.json"``.

        Returns:
            True on success; False if the credential does not exist or the
            file could not be written (the error is logged).
        """
        self._ensure_initialized()
        if hasattr(self._backend, 'export_credential_to_json'):
            return await self._backend.export_credential_to_json(filename, output_path)

        # Generic fallback for backends without their own exporter.
        credential_data = await self.get_credential(filename)
        if credential_data is None:
            return False
        if output_path is None:
            output_path = f"{filename}.json"
        try:
            # Imported inside the try so a missing aiofiles is reported the
            # same way as in import_credential_from_json.
            import aiofiles
            async with aiofiles.open(output_path, "w", encoding="utf-8") as f:
                await f.write(json.dumps(credential_data, indent=2, ensure_ascii=False))
            return True
        except Exception as e:
            log.error(f"Failed to export credential {filename} to {output_path}: {e}")
            return False

    async def import_credential_from_json(self, json_path: str, filename: Optional[str] = None) -> bool:
        """Import a credential from a JSON file.

        Delegates to the backend's own ``import_credential_from_json`` when it
        has one; otherwise reads the file with aiofiles and stores the parsed
        content.

        Args:
            json_path: Path of the JSON file to read.
            filename: Name to store the credential under; defaults to the
                base name of ``json_path``.

        Returns:
            The result of storing the credential, or False if reading,
            parsing or storing raised (the error is logged).
        """
        self._ensure_initialized()
        if hasattr(self._backend, 'import_credential_from_json'):
            return await self._backend.import_credential_from_json(json_path, filename)

        # Generic fallback for backends without their own importer.
        try:
            import aiofiles
            async with aiofiles.open(json_path, "r", encoding="utf-8") as f:
                content = await f.read()
            credential_data = json.loads(content)
            if filename is None:
                filename = os.path.basename(json_path)
            return await self.store_credential(filename, credential_data)
        except Exception as e:
            log.error(f"Failed to import credential from {json_path}: {e}")
            return False

    def get_backend_type(self) -> str:
        """Return the current backend type, derived from its class name.

        Returns:
            ``"none"`` when no backend is set; otherwise ``"file"``,
            ``"mongodb"``, ``"redis"`` or ``"postgres"``, or ``"unknown"`` if
            the class name matches none of them.
        """
        if not self._backend:
            return "none"
        class_name = self._backend.__class__.__name__.lower()
        for marker, backend_type in self._BACKEND_TYPE_MARKERS:
            if marker in class_name:
                return backend_type
        return "unknown"

    async def get_backend_info(self) -> Dict[str, Any]:
        """Return a dict describing the active backend.

        Always contains ``backend_type`` and ``initialized``. If the backend
        offers ``get_database_info()`` its result is merged in (or
        ``database_error`` is set when that call raises); otherwise a few
        known attributes of the file / Redis backends are reported.

        Raises:
            RuntimeError: if the adapter has not been initialized.
        """
        self._ensure_initialized()
        backend_type = self.get_backend_type()
        info = {
            "backend_type": backend_type,
            "initialized": self._initialized,
        }

        if hasattr(self._backend, 'get_database_info'):
            try:
                db_info = await self._backend.get_database_info()
                info.update(db_info)
            except Exception as e:
                info["database_error"] = str(e)
        elif backend_type == "file":
            info.update({
                "credentials_dir": getattr(self._backend, '_credentials_dir', None),
                "state_file": getattr(self._backend, '_state_file', None),
                "config_file": getattr(self._backend, '_config_file', None),
            })
        elif backend_type == "redis":
            info.update({
                "redis_url": getattr(self._backend, '_redis_url', None),
                "connection_pool_size": getattr(self._backend, '_pool_size', None),
            })
        return info
# Global storage adapter instance (module-level singleton); None until one is created.
_storage_adapter: Optional[StorageAdapter] = None
async def get_storage_adapter() -> StorageAdapter:
    """Return the global storage adapter, creating and initializing it on demand.

    ``StorageAdapter.initialize()`` is awaited on every call. It is guarded by
    the adapter's own lock and returns immediately once initialization has
    succeeded, so this guarantees that:

    * a caller arriving while another coroutine is still initializing waits
      for that initialization instead of receiving an unusable adapter;
    * a failed initialization is retried on the next call rather than leaving
      a permanently uninitialized singleton behind.

    Raises:
        Exception: whatever ``StorageAdapter.initialize()`` raises.
    """
    global _storage_adapter
    if _storage_adapter is None:
        _storage_adapter = StorageAdapter()
    # Keep a local reference: the global may be reset while we await.
    adapter = _storage_adapter
    await adapter.initialize()
    return adapter
async def close_storage_adapter():
    """Close the global storage adapter, if there is one, and clear the global."""
    global _storage_adapter
    adapter = _storage_adapter
    if not adapter:
        # Nothing was ever created (or it was already closed).
        return
    await adapter.close()
    _storage_adapter = None