File size: 12,379 Bytes
5868187
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
"""
Postgres数据库管理器,采用单行设计并兼容 UnifiedCacheManager。
实现与 mongodb_manager.py 风格一致的接口(异步)。
需要环境变量: POSTGRES_DSN (例如: postgresql://user:pass@host:port/dbname)
"""
import asyncio
import os
import time
import json
from datetime import datetime, timezone
from typing import Dict, Any, List, Optional
from collections import deque

import asyncpg
from log import log
from .cache_manager import UnifiedCacheManager, CacheBackend


class PostgresCacheBackend(CacheBackend):
    """Postgres缓存后端,数据存储为key, data(JSONB), updated_at
    单行/单表设计:表名由管理器指定,每行以key区分。
    """

    def __init__(self, conn_pool, table_name: str, row_key: str):
        self._pool = conn_pool
        self._table_name = table_name
        self._row_key = row_key

    async def load_data(self) -> Dict[str, Any]:
        try:
            async with self._pool.acquire() as conn:
                row = await conn.fetchrow(
                    f"SELECT data FROM {self._table_name} WHERE key = $1",
                    self._row_key
                )
                if row and row.get('data') is not None:
                    data = row['data']
                    # JSONB字段返回JSON字符串,需要解析为字典
                    if isinstance(data, str):
                        return json.loads(data)
                    elif isinstance(data, dict):
                        return data
                    else:
                        log.warning(f"Unexpected data type from JSONB field: {type(data)}")
                        return {}
                return {}
        except Exception as e:
            log.error(f"Error loading data from Postgres row {self._row_key}: {e}")
            return {}

    async def write_data(self, data: Dict[str, Any]) -> bool:
        try:
            async with self._pool.acquire() as conn:
                await conn.execute(
                    f"INSERT INTO {self._table_name}(key, data, updated_at) VALUES($1, $2::jsonb, $3)"
                    " ON CONFLICT (key) DO UPDATE SET data = EXCLUDED.data, updated_at = EXCLUDED.updated_at",
                    self._row_key, json.dumps(data, default=str), datetime.now(timezone.utc)
                )
                return True
        except Exception as e:
            log.error(f"Error writing data to Postgres row {self._row_key}: {e}")
            return False


class PostgresManager:
    """Postgres管理器。
    使用单表单行设计存储凭证和配置数据。
    """

    def __init__(self):
        self._pool: Optional[asyncpg.pool.Pool] = None
        self._initialized = False
        self._lock = asyncio.Lock()

        self._dsn = None
        self._table_name = 'unified_storage'

        self._operation_count = 0

        self._operation_times = deque(maxlen=5000)

        self._credentials_cache_manager: Optional[UnifiedCacheManager] = None
        self._config_cache_manager: Optional[UnifiedCacheManager] = None

        self._credentials_row_key = 'all_credentials'
        self._config_row_key = 'config_data'

        self._write_delay = 1.0
        self._cache_ttl = 300

    async def initialize(self):
        async with self._lock:
            if self._initialized:
                return
            try:
                self._dsn = os.getenv('POSTGRES_DSN')
                if not self._dsn:
                    raise ValueError('POSTGRES_DSN environment variable is required')

                self._pool = await asyncpg.create_pool(dsn=self._dsn, max_size=20, min_size=1)

                # 确保表存在
                await self._ensure_table()

                # 创建缓存管理器后端
                credentials_backend = PostgresCacheBackend(self._pool, self._table_name, self._credentials_row_key)
                config_backend = PostgresCacheBackend(self._pool, self._table_name, self._config_row_key)

                self._credentials_cache_manager = UnifiedCacheManager(
                    credentials_backend, cache_ttl=self._cache_ttl, write_delay=self._write_delay, name='credentials'
                )
                self._config_cache_manager = UnifiedCacheManager(
                    config_backend, cache_ttl=self._cache_ttl, write_delay=self._write_delay, name='config'
                )

                await self._credentials_cache_manager.start()
                await self._config_cache_manager.start()

                self._initialized = True
                log.info('Postgres connection established with unified cache')
            except Exception as e:
                log.error(f'Error initializing Postgres: {e}')
                raise

    async def _ensure_table(self):
        try:
            async with self._pool.acquire() as conn:
                await conn.execute(
                    f"CREATE TABLE IF NOT EXISTS {self._table_name}(\n                        key TEXT PRIMARY KEY,\n                        data JSONB,\n                        updated_at TIMESTAMPTZ\n                    )"
                )
        except Exception as e:
            log.error(f'Error ensuring Postgres table: {e}')
            raise

    async def close(self):
        if self._credentials_cache_manager:
            await self._credentials_cache_manager.stop()
        if self._config_cache_manager:
            await self._config_cache_manager.stop()
        if self._pool:
            await self._pool.close()
            self._initialized = False
            log.info('Postgres connection closed with unified cache flushed')

    def _ensure_initialized(self):
        if not self._initialized:
            raise RuntimeError('Postgres manager not initialized')

    def _get_default_state(self) -> Dict[str, Any]:
        return {
            'error_codes': [],
            'disabled': False,
            'last_success': time.time(),
            'user_email': None,
        }

    def _get_default_stats(self) -> Dict[str, Any]:
        return {
            'gemini_2_5_pro_calls': 0,
            'total_calls': 0,
            'next_reset_time': None,
            'daily_limit_gemini_2_5_pro': 100,
            'daily_limit_total': 1000
        }

    # 以下方法委托给 UnifiedCacheManager
    async def store_credential(self, filename: str, credential_data: Dict[str, Any]) -> bool:
        self._ensure_initialized()
        start_time = time.time()
        try:
            existing_data = await self._credentials_cache_manager.get(filename, {})
            credential_entry = {
                'credential': credential_data,
                'state': existing_data.get('state', self._get_default_state()),
                'stats': existing_data.get('stats', self._get_default_stats())
            }
            success = await self._credentials_cache_manager.set(filename, credential_entry)
            self._operation_count += 1
            self._operation_times.append(time.time() - start_time)
            log.debug(f'Stored credential to unified cache (postgres): {filename}')
            return success
        except Exception as e:
            log.error(f'Error storing credential {filename} in Postgres: {e}')
            return False

    async def get_credential(self, filename: str) -> Optional[Dict[str, Any]]:
        self._ensure_initialized()
        try:
            credential_entry = await self._credentials_cache_manager.get(filename)
            self._operation_count += 1
            if credential_entry and 'credential' in credential_entry:
                return credential_entry['credential']
            return None
        except Exception as e:
            log.error(f'Error retrieving credential {filename} from Postgres: {e}')
            return None

    async def list_credentials(self) -> List[str]:
        self._ensure_initialized()
        try:
            all_data = await self._credentials_cache_manager.get_all()
            return list(all_data.keys())
        except Exception as e:
            log.error(f'Error listing credentials from Postgres: {e}')
            return []

    async def delete_credential(self, filename: str) -> bool:
        self._ensure_initialized()
        try:
            return await self._credentials_cache_manager.delete(filename)
        except Exception as e:
            log.error(f'Error deleting credential {filename} from Postgres: {e}')
            return False

    async def update_credential_state(self, filename: str, state_updates: Dict[str, Any]) -> bool:
        self._ensure_initialized()
        try:
            existing_data = await self._credentials_cache_manager.get(filename, {})
            if not existing_data:
                existing_data = {'credential': {}, 'state': self._get_default_state(), 'stats': self._get_default_stats()}
            existing_data['state'].update(state_updates)
            return await self._credentials_cache_manager.set(filename, existing_data)
        except Exception as e:
            log.error(f'Error updating credential state {filename} in Postgres: {e}')
            return False

    async def get_credential_state(self, filename: str) -> Dict[str, Any]:
        self._ensure_initialized()
        try:
            credential_entry = await self._credentials_cache_manager.get(filename)
            if credential_entry and 'state' in credential_entry:
                return credential_entry['state']
            return self._get_default_state()
        except Exception as e:
            log.error(f'Error getting credential state {filename} from Postgres: {e}')
            return self._get_default_state()

    async def get_all_credential_states(self) -> Dict[str, Dict[str, Any]]:
        self._ensure_initialized()
        try:
            all_data = await self._credentials_cache_manager.get_all()
            states = {fn: data.get('state', self._get_default_state()) for fn, data in all_data.items()}
            return states
        except Exception as e:
            log.error(f'Error getting all credential states from Postgres: {e}')
            return {}

    async def set_config(self, key: str, value: Any) -> bool:
        self._ensure_initialized()
        return await self._config_cache_manager.set(key, value)

    async def get_config(self, key: str, default: Any = None) -> Any:
        self._ensure_initialized()
        return await self._config_cache_manager.get(key, default)

    async def get_all_config(self) -> Dict[str, Any]:
        self._ensure_initialized()
        return await self._config_cache_manager.get_all()

    async def delete_config(self, key: str) -> bool:
        self._ensure_initialized()
        return await self._config_cache_manager.delete(key)

    async def update_usage_stats(self, filename: str, stats_updates: Dict[str, Any]) -> bool:
        self._ensure_initialized()
        try:
            existing_data = await self._credentials_cache_manager.get(filename, {})
            if not existing_data:
                existing_data = {'credential': {}, 'state': self._get_default_state(), 'stats': self._get_default_stats()}
            existing_data['stats'].update(stats_updates)
            return await self._credentials_cache_manager.set(filename, existing_data)
        except Exception as e:
            log.error(f'Error updating usage stats for {filename} in Postgres: {e}')
            return False

    async def get_usage_stats(self, filename: str) -> Dict[str, Any]:
        self._ensure_initialized()
        try:
            credential_entry = await self._credentials_cache_manager.get(filename)
            if credential_entry and 'stats' in credential_entry:
                return credential_entry['stats']
            return self._get_default_stats()
        except Exception as e:
            log.error(f'Error getting usage stats for {filename} from Postgres: {e}')
            return self._get_default_stats()

    async def get_all_usage_stats(self) -> Dict[str, Dict[str, Any]]:
        self._ensure_initialized()
        try:
            all_data = await self._credentials_cache_manager.get_all()
            stats = {fn: data.get('stats', self._get_default_stats()) for fn, data in all_data.items()}
            return stats
        except Exception as e:
            log.error(f'Error getting all usage stats from Postgres: {e}')
            return {}