Spaces:
Sleeping
Sleeping
| """ | |
| Postgres数据库管理器,采用单行设计并兼容 UnifiedCacheManager。 | |
| 实现与 mongodb_manager.py 风格一致的接口(异步)。 | |
| 需要环境变量: POSTGRES_DSN (例如: postgresql://user:pass@host:port/dbname) | |
| """ | |
| import asyncio | |
| import os | |
| import time | |
| import json | |
| from datetime import datetime, timezone | |
| from typing import Dict, Any, List, Optional | |
| from collections import deque | |
| import asyncpg | |
| from log import log | |
| from .cache_manager import UnifiedCacheManager, CacheBackend | |
class PostgresCacheBackend(CacheBackend):
    """Cache backend that persists one JSON document in a single Postgres row.

    The row lives in ``table_name`` and is addressed by ``row_key``. The SQL
    below uses three columns: ``key``, ``data`` (cast to JSONB on write) and
    ``updated_at``.
    """

    def __init__(self, conn_pool, table_name: str, row_key: str):
        """Remember the pool, table and row this backend operates on.

        Args:
            conn_pool: Object whose ``acquire()`` is an async context manager
                yielding a connection with ``fetchrow`` / ``execute``.
            table_name: Table name. It is interpolated into the SQL text
                as-is (identifiers cannot be bound as parameters), so it must
                come from trusted code, never from user input.
            row_key: Value of the ``key`` column identifying the row.
        """
        self._pool = conn_pool
        self._table_name = table_name
        self._row_key = row_key

    async def load_data(self) -> Dict[str, Any]:
        """Return the stored document as a dict.

        Returns:
            The decoded ``data`` column, or ``{}`` when the row is missing,
            the column is NULL, the value is not a JSON object, or any error
            occurs (errors are logged, not raised).
        """
        # NOTE(review): a failed read is indistinguishable from "no data yet"
        # for the caller; confirm the cache layer cannot persist that empty
        # result over real data after a transient database error.
        try:
            async with self._pool.acquire() as conn:
                row = await conn.fetchrow(
                    f"SELECT data FROM {self._table_name} WHERE key = $1",
                    self._row_key
                )
            if row is None or row.get('data') is None:
                return {}

            data = row['data']
            # The JSONB column may arrive as JSON text (no codec registered on
            # the connection) or already decoded; normalise both to a dict.
            if isinstance(data, str):
                data = json.loads(data)
            if isinstance(data, dict):
                return data

            # Covers both unexpected driver types and JSON text that decodes
            # to a list/scalar instead of an object.
            log.warning(f"Unexpected data type from JSONB field: {type(data)}")
            return {}
        except Exception as e:
            log.error(f"Error loading data from Postgres row {self._row_key}: {e}")
            return {}

    async def write_data(self, data: Dict[str, Any]) -> bool:
        """Upsert ``data`` into the row and stamp ``updated_at`` with UTC now.

        Args:
            data: Document to store. Values ``json`` cannot encode natively
                are converted with ``str`` (``default=str``), so they do not
                round-trip to their original type.

        Returns:
            ``True`` when the statement executed, ``False`` if anything
            raised (the error is logged).
        """
        try:
            payload = json.dumps(data, default=str)
            async with self._pool.acquire() as conn:
                await conn.execute(
                    f"INSERT INTO {self._table_name}(key, data, updated_at) VALUES($1, $2::jsonb, $3)"
                    " ON CONFLICT (key) DO UPDATE SET data = EXCLUDED.data, updated_at = EXCLUDED.updated_at",
                    self._row_key, payload, datetime.now(timezone.utc)
                )
            return True
        except Exception as e:
            log.error(f"Error writing data to Postgres row {self._row_key}: {e}")
            return False
class PostgresManager:
    """Async storage manager keeping credentials and config in Postgres.

    All credentials share one row and all config values share another row of
    a single table. Each row is accessed through its own
    ``UnifiedCacheManager`` backed by a ``PostgresCacheBackend``.
    """

    def __init__(self):
        """Set defaults only; no I/O happens until ``initialize()``."""
        self._pool: Optional[asyncpg.pool.Pool] = None
        self._initialized = False
        # Serialises initialize() so concurrent callers build one pool.
        self._lock = asyncio.Lock()
        self._dsn = None
        self._table_name = 'unified_storage'

        # Simple operation bookkeeping (count and recent durations).
        self._operation_count = 0
        self._operation_times = deque(maxlen=5000)

        self._credentials_cache_manager: Optional[UnifiedCacheManager] = None
        self._config_cache_manager: Optional[UnifiedCacheManager] = None

        # Keys of the two rows holding all credentials / all config values.
        self._credentials_row_key = 'all_credentials'
        self._config_row_key = 'config_data'

        # Forwarded to UnifiedCacheManager as write_delay / cache_ttl
        # (presumably seconds; confirm against cache_manager).
        self._write_delay = 1.0
        self._cache_ttl = 300

    async def initialize(self):
        """Connect to Postgres and start both cache managers.

        Does nothing when already initialized. On failure, everything that
        was brought up during this call is torn down again, so a later retry
        starts clean instead of leaking the pool.

        Raises:
            ValueError: If ``POSTGRES_DSN`` is unset or empty.
            Exception: Any error from pool creation, table creation or cache
                manager start-up, after it has been logged.
        """
        async with self._lock:
            if self._initialized:
                return

            started = []  # cache managers whose start() completed
            try:
                self._dsn = os.getenv('POSTGRES_DSN')
                if not self._dsn:
                    raise ValueError('POSTGRES_DSN environment variable is required')

                self._pool = await asyncpg.create_pool(dsn=self._dsn, max_size=20, min_size=1)

                # Make sure the storage table exists.
                await self._ensure_table()

                # One backend (and cache manager) per storage row.
                credentials_backend = PostgresCacheBackend(self._pool, self._table_name, self._credentials_row_key)
                config_backend = PostgresCacheBackend(self._pool, self._table_name, self._config_row_key)

                self._credentials_cache_manager = UnifiedCacheManager(
                    credentials_backend, cache_ttl=self._cache_ttl, write_delay=self._write_delay, name='credentials'
                )
                self._config_cache_manager = UnifiedCacheManager(
                    config_backend, cache_ttl=self._cache_ttl, write_delay=self._write_delay, name='config'
                )

                for manager in (self._credentials_cache_manager, self._config_cache_manager):
                    await manager.start()
                    started.append(manager)

                self._initialized = True
                log.info('Postgres connection established with unified cache')
            except Exception as e:
                log.error(f'Error initializing Postgres: {e}')
                await self._abort_initialization(started)
                raise

    async def _abort_initialization(self, started) -> None:
        """Best-effort rollback of a failed ``initialize()``.

        Stops the cache managers in ``started`` (newest first), closes the
        pool if one was created and clears the references. Errors raised
        while cleaning up are logged and suppressed so the original failure
        is the one that propagates.
        """
        for manager in reversed(started):
            try:
                await manager.stop()
            except Exception as e:
                log.error(f'Error stopping cache manager after failed initialization: {e}')

        if self._pool is not None:
            try:
                await self._pool.close()
            except Exception as e:
                log.error(f'Error closing Postgres pool after failed initialization: {e}')

        self._pool = None
        self._credentials_cache_manager = None
        self._config_cache_manager = None

    async def _ensure_table(self):
        """Create the storage table if it does not exist; log and re-raise errors."""
        try:
            async with self._pool.acquire() as conn:
                await conn.execute(
                    f"CREATE TABLE IF NOT EXISTS {self._table_name}(\n key TEXT PRIMARY KEY,\n data JSONB,\n updated_at TIMESTAMPTZ\n )"
                )
        except Exception as e:
            log.error(f'Error ensuring Postgres table: {e}')
            raise

    async def close(self):
        """Stop both cache managers, then close the pool.

        Every step is attempted even if an earlier one fails, so a failing
        cache manager cannot keep the pool open. The manager is marked
        uninitialized in all cases.

        Raises:
            Exception: The first error raised by any step, after the
                remaining steps have run.
        """
        steps = []
        if self._credentials_cache_manager:
            steps.append(self._credentials_cache_manager.stop)
        if self._config_cache_manager:
            steps.append(self._config_cache_manager.stop)
        if self._pool:
            steps.append(self._pool.close)

        first_error = None
        for step in steps:
            try:
                await step()
            except Exception as e:
                log.error(f'Error while closing Postgres manager: {e}')
                if first_error is None:
                    first_error = e

        self._initialized = False
        if first_error is not None:
            raise first_error
        log.info('Postgres connection closed with unified cache flushed')

    def _ensure_initialized(self):
        """Raise ``RuntimeError`` unless ``initialize()`` has completed."""
        if not self._initialized:
            raise RuntimeError('Postgres manager not initialized')

    def _get_default_state(self) -> Dict[str, Any]:
        """Return a fresh default ``state`` section for a credential entry."""
        return {
            'error_codes': [],
            'disabled': False,
            'last_success': time.time(),
            'user_email': None,
        }

    def _get_default_stats(self) -> Dict[str, Any]:
        """Return a fresh default ``stats`` section for a credential entry."""
        return {
            'gemini_2_5_pro_calls': 0,
            'total_calls': 0,
            'next_reset_time': None,
            'daily_limit_gemini_2_5_pro': 100,
            'daily_limit_total': 1000
        }

    async def _update_entry_section(self, filename: str, section: str,
                                    updates: Dict[str, Any], default_factory) -> bool:
        """Merge ``updates`` into one section of a credential entry and store it.

        Args:
            filename: Key of the credential entry.
            section: ``'state'`` or ``'stats'``.
            updates: Keys to overwrite inside that section.
            default_factory: Zero-argument callable producing the section's
                defaults, used when the entry has no usable section yet.

        Returns:
            Whatever the cache manager's ``set`` returns.
        """
        existing = await self._credentials_cache_manager.get(filename, {})
        if existing:
            # Copy so the object handed back by the cache is never mutated
            # before set() is called.
            entry = dict(existing)
        else:
            entry = {
                'credential': {},
                'state': self._get_default_state(),
                'stats': self._get_default_stats(),
            }

        current = entry.get(section)
        # Entries stored without this section start from the defaults
        # instead of failing with KeyError.
        base = current if isinstance(current, dict) else default_factory()
        entry[section] = {**base, **updates}
        return await self._credentials_cache_manager.set(filename, entry)

    # The methods below delegate to the UnifiedCacheManager instances.

    async def store_credential(self, filename: str, credential_data: Dict[str, Any]) -> bool:
        """Store ``credential_data`` under ``filename``, keeping existing state and stats.

        Returns:
            The cache manager's ``set`` result, or ``False`` if an error was
            raised (and logged).

        Raises:
            RuntimeError: If the manager is not initialized.
        """
        self._ensure_initialized()
        # perf_counter is monotonic, so the duration is immune to clock changes.
        start_time = time.perf_counter()
        try:
            existing_data = await self._credentials_cache_manager.get(filename, {}) or {}
            credential_entry = {
                'credential': credential_data,
                'state': existing_data.get('state', self._get_default_state()),
                'stats': existing_data.get('stats', self._get_default_stats()),
            }
            success = await self._credentials_cache_manager.set(filename, credential_entry)
            self._operation_count += 1
            self._operation_times.append(time.perf_counter() - start_time)
            log.debug(f'Stored credential to unified cache (postgres): {filename}')
            return success
        except Exception as e:
            log.error(f'Error storing credential {filename} in Postgres: {e}')
            return False

    async def get_credential(self, filename: str) -> Optional[Dict[str, Any]]:
        """Return the ``credential`` section stored for ``filename``.

        Returns:
            The credential dict, or ``None`` when the entry is missing, has
            no ``credential`` key, or an error occurred (logged).

        Raises:
            RuntimeError: If the manager is not initialized.
        """
        self._ensure_initialized()
        try:
            credential_entry = await self._credentials_cache_manager.get(filename)
            self._operation_count += 1
            if credential_entry and 'credential' in credential_entry:
                return credential_entry['credential']
            return None
        except Exception as e:
            log.error(f'Error retrieving credential {filename} from Postgres: {e}')
            return None

    async def list_credentials(self) -> List[str]:
        """Return the keys of all stored credential entries (``[]`` on error)."""
        self._ensure_initialized()
        try:
            all_data = await self._credentials_cache_manager.get_all()
            return list(all_data.keys())
        except Exception as e:
            log.error(f'Error listing credentials from Postgres: {e}')
            return []

    async def delete_credential(self, filename: str) -> bool:
        """Delete the entry for ``filename``; return ``False`` on error."""
        self._ensure_initialized()
        try:
            return await self._credentials_cache_manager.delete(filename)
        except Exception as e:
            log.error(f'Error deleting credential {filename} from Postgres: {e}')
            return False

    async def update_credential_state(self, filename: str, state_updates: Dict[str, Any]) -> bool:
        """Merge ``state_updates`` into the ``state`` section of ``filename``.

        A missing entry is created with an empty credential and default
        state/stats; an entry without a ``state`` section starts from the
        default state.

        Returns:
            The cache manager's ``set`` result, or ``False`` on error (logged).

        Raises:
            RuntimeError: If the manager is not initialized.
        """
        self._ensure_initialized()
        try:
            return await self._update_entry_section(
                filename, 'state', state_updates, self._get_default_state
            )
        except Exception as e:
            log.error(f'Error updating credential state {filename} in Postgres: {e}')
            return False

    async def get_credential_state(self, filename: str) -> Dict[str, Any]:
        """Return the ``state`` section for ``filename``, or the default state."""
        self._ensure_initialized()
        try:
            credential_entry = await self._credentials_cache_manager.get(filename)
            if credential_entry and 'state' in credential_entry:
                return credential_entry['state']
            return self._get_default_state()
        except Exception as e:
            log.error(f'Error getting credential state {filename} from Postgres: {e}')
            return self._get_default_state()

    async def get_all_credential_states(self) -> Dict[str, Dict[str, Any]]:
        """Return ``{filename: state}`` for every entry (``{}`` on error).

        Entries without a ``state`` section are reported with the default
        state.
        """
        self._ensure_initialized()
        try:
            all_data = await self._credentials_cache_manager.get_all()
            return {
                fn: data.get('state', self._get_default_state())
                for fn, data in all_data.items()
            }
        except Exception as e:
            log.error(f'Error getting all credential states from Postgres: {e}')
            return {}

    # Config accessors: thin pass-throughs. Unlike the credential methods
    # they do not catch exceptions, so cache manager errors propagate.

    async def set_config(self, key: str, value: Any) -> bool:
        """Store ``value`` under ``key`` in the config row."""
        self._ensure_initialized()
        return await self._config_cache_manager.set(key, value)

    async def get_config(self, key: str, default: Any = None) -> Any:
        """Return the config value for ``key``, or ``default``."""
        self._ensure_initialized()
        return await self._config_cache_manager.get(key, default)

    async def get_all_config(self) -> Dict[str, Any]:
        """Return every config value as returned by the cache manager."""
        self._ensure_initialized()
        return await self._config_cache_manager.get_all()

    async def delete_config(self, key: str) -> bool:
        """Delete the config value stored under ``key``."""
        self._ensure_initialized()
        return await self._config_cache_manager.delete(key)

    async def update_usage_stats(self, filename: str, stats_updates: Dict[str, Any]) -> bool:
        """Merge ``stats_updates`` into the ``stats`` section of ``filename``.

        A missing entry is created with an empty credential and default
        state/stats; an entry without a ``stats`` section starts from the
        default stats.

        Returns:
            The cache manager's ``set`` result, or ``False`` on error (logged).

        Raises:
            RuntimeError: If the manager is not initialized.
        """
        self._ensure_initialized()
        try:
            return await self._update_entry_section(
                filename, 'stats', stats_updates, self._get_default_stats
            )
        except Exception as e:
            log.error(f'Error updating usage stats for {filename} in Postgres: {e}')
            return False

    async def get_usage_stats(self, filename: str) -> Dict[str, Any]:
        """Return the ``stats`` section for ``filename``, or the default stats."""
        self._ensure_initialized()
        try:
            credential_entry = await self._credentials_cache_manager.get(filename)
            if credential_entry and 'stats' in credential_entry:
                return credential_entry['stats']
            return self._get_default_stats()
        except Exception as e:
            log.error(f'Error getting usage stats for {filename} from Postgres: {e}')
            return self._get_default_stats()

    async def get_all_usage_stats(self) -> Dict[str, Dict[str, Any]]:
        """Return ``{filename: stats}`` for every entry (``{}`` on error).

        Entries without a ``stats`` section are reported with the default
        stats.
        """
        self._ensure_initialized()
        try:
            all_data = await self._credentials_cache_manager.get_all()
            return {
                fn: data.get('stats', self._get_default_stats())
                for fn, data in all_data.items()
            }
        except Exception as e:
            log.error(f'Error getting all usage stats from Postgres: {e}')
            return {}