# gemini2api / core / storage.py
# xiaoyukkkk's picture
# Upload 10 files
# ba0f03a verified
"""
Storage abstraction supporting file and PostgreSQL backends.
If DATABASE_URL is set, PostgreSQL is used.
"""
import asyncio
import json
import logging
import os
import threading
from typing import Optional
from dotenv import load_dotenv
# Load variables from a .env file (python-dotenv), if one is present.
load_dotenv()
logger = logging.getLogger(__name__)
# Connection pool created on first use by _get_pool(); None until then.
_db_pool = None
# asyncio.Lock created lazily inside _get_pool() to serialize pool creation.
_db_pool_lock = None
# Background event loop and the daemon thread that runs it. Both are set by
# _ensure_db_loop() and used by _run_in_db_loop().
_db_loop = None
_db_thread = None
# Guards creation of the background loop/thread pair in _ensure_db_loop().
_db_loop_lock = threading.Lock()
def _get_database_url() -> str:
    """Return the DATABASE_URL environment variable, stripped of whitespace.

    An unset variable yields the empty string.
    """
    raw_url = os.getenv("DATABASE_URL", "")
    return raw_url.strip()
def is_database_enabled() -> bool:
    """Report whether a non-blank DATABASE_URL is configured."""
    return _get_database_url() != ""
def _ensure_db_loop() -> asyncio.AbstractEventLoop:
    """Return the background database event loop, starting it when needed.

    The loop runs forever on a daemon thread named "storage-db-loop". A new
    loop/thread pair is created when none has been recorded yet or when the
    recorded thread is no longer alive.
    """
    global _db_loop, _db_thread

    def _loop_is_usable() -> bool:
        return bool(_db_loop and _db_thread and _db_thread.is_alive())

    # Unlocked fast path; the same check is repeated under the lock below.
    if _loop_is_usable():
        return _db_loop

    with _db_loop_lock:
        if not _loop_is_usable():
            new_loop = asyncio.new_event_loop()

            def _serve() -> None:
                asyncio.set_event_loop(new_loop)
                new_loop.run_forever()

            worker = threading.Thread(
                target=_serve, name="storage-db-loop", daemon=True
            )
            worker.start()
            _db_loop, _db_thread = new_loop, worker
        return _db_loop
def _run_in_db_loop(coro):
    """Run *coro* on the background database loop and wait for it to finish.

    Blocks the calling thread until the coroutine completes, then returns
    its result.
    """
    pending = asyncio.run_coroutine_threadsafe(coro, _ensure_db_loop())
    return pending.result()
async def _get_pool():
    """Get (or create) the shared asyncpg connection pool.

    On first use the pool is created and its tables are initialized. The
    pool is published in the module-level ``_db_pool`` only after both steps
    succeed, so a failed table initialization never leaves a half-initialized
    pool cached for later callers; the failed pool is terminated instead and
    the next call retries from scratch.

    Returns:
        The initialized connection pool.

    Raises:
        ValueError: DATABASE_URL is not set.
        ImportError: asyncpg is not installed.
        Exception: any error from pool creation or table initialization is
            logged and re-raised unchanged.
    """
    # NOTE(review): the pool is created on whichever event loop first awaits
    # this function -- confirm that direct async callers and the *_sync
    # wrappers never end up sharing it across different loops.
    global _db_pool, _db_pool_lock
    if _db_pool is not None:
        return _db_pool
    if _db_pool_lock is None:
        # Created lazily on first call rather than at import time.
        _db_pool_lock = asyncio.Lock()
    async with _db_pool_lock:
        # Another task may have finished initialization while we waited.
        if _db_pool is not None:
            return _db_pool
        db_url = _get_database_url()
        if not db_url:
            raise ValueError("DATABASE_URL is not set")
        try:
            import asyncpg
        except ImportError:
            logger.error("[STORAGE] asyncpg is required for database storage")
            raise
        pool = None
        try:
            pool = await asyncpg.create_pool(
                db_url,
                min_size=1,
                max_size=10,
                command_timeout=30,
            )
            await _init_tables(pool)
        except Exception as e:
            logger.error(f"[STORAGE] Database connection failed: {e}")
            if pool is not None:
                # Drop the connections of a pool whose setup failed; a
                # cleanup error must not mask the original exception.
                try:
                    pool.terminate()
                except Exception as cleanup_error:
                    logger.warning(
                        f"[STORAGE] Failed to terminate database pool: {cleanup_error}"
                    )
            raise
        # Publish only a fully initialized pool.
        _db_pool = pool
        logger.info("[STORAGE] PostgreSQL pool initialized")
        return _db_pool
async def _init_tables(pool) -> None:
    """Create the ``kv_store`` table when it does not exist yet.

    Args:
        pool: connection pool from which one connection is borrowed.
    """
    create_kv_store = (
        "\n"
        "CREATE TABLE IF NOT EXISTS kv_store (\n"
        "key TEXT PRIMARY KEY,\n"
        "value JSONB NOT NULL,\n"
        "updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP\n"
        ")\n"
    )
    async with pool.acquire() as connection:
        await connection.execute(create_kv_store)
        logger.info("[STORAGE] Database tables initialized")
async def db_get(key: str) -> Optional[dict]:
    """Look up *key* in ``kv_store`` and return its stored value.

    Returns None when no row exists for the key. A value that comes back as
    a string is parsed as JSON; any other value is returned unchanged.
    """
    pool = await _get_pool()
    async with pool.acquire() as connection:
        record = await connection.fetchrow(
            "SELECT value FROM kv_store WHERE key = $1", key
        )
        if not record:
            return None
        stored = record["value"]
        return json.loads(stored) if isinstance(stored, str) else stored
async def db_set(key: str, value: dict) -> None:
    """Insert or overwrite the ``kv_store`` row for *key*.

    *value* is serialized to JSON text (non-ASCII characters kept as-is) and
    ``updated_at`` is set to the database's current timestamp.
    """
    upsert_sql = (
        "\n"
        "INSERT INTO kv_store (key, value, updated_at)\n"
        "VALUES ($1, $2, CURRENT_TIMESTAMP)\n"
        "ON CONFLICT (key) DO UPDATE SET\n"
        "value = EXCLUDED.value,\n"
        "updated_at = CURRENT_TIMESTAMP\n"
    )
    pool = await _get_pool()
    async with pool.acquire() as connection:
        payload = json.dumps(value, ensure_ascii=False)
        await connection.execute(upsert_sql, key, payload)
# ==================== Accounts storage ====================
async def load_accounts() -> Optional[list]:
    """Read the "accounts" entry from the database.

    Returns:
        None when the database is disabled or the read fails (the caller is
        expected to fall back to file storage), the stored value when it is
        non-empty, and an empty list otherwise.
    """
    if not is_database_enabled():
        return None
    try:
        data = await db_get("accounts")
        if not data:
            logger.info("[STORAGE] No accounts found in database")
            return []
        logger.info(f"[STORAGE] Loaded {len(data)} accounts from database")
        return data
    except Exception as e:
        logger.error(f"[STORAGE] Database read failed: {e}")
        return None
async def get_accounts_updated_at() -> Optional[float]:
    """Return the ``updated_at`` of the "accounts" row as epoch seconds.

    Returns None when the database is disabled, the row or its timestamp is
    missing, or the query fails.
    """
    if not is_database_enabled():
        return None
    query = "SELECT EXTRACT(EPOCH FROM updated_at) AS ts FROM kv_store WHERE key = $1"
    try:
        pool = await _get_pool()
        async with pool.acquire() as connection:
            record = await connection.fetchrow(query, "accounts")
            if record and record["ts"] is not None:
                return float(record["ts"])
            return None
    except Exception as e:
        logger.error(f"[STORAGE] Database accounts updated_at failed: {e}")
        return None
def get_accounts_updated_at_sync() -> Optional[float]:
    """Blocking variant of get_accounts_updated_at, run on the database loop."""
    timestamp = _run_in_db_loop(get_accounts_updated_at())
    return timestamp
async def save_accounts(accounts: list) -> bool:
    """Store *accounts* under the "accounts" key.

    Returns True on success, False when the database is disabled or the
    write fails.
    """
    if is_database_enabled():
        try:
            await db_set("accounts", accounts)
            logger.info(f"[STORAGE] Saved {len(accounts)} accounts to database")
            return True
        except Exception as e:
            logger.error(f"[STORAGE] Database write failed: {e}")
    return False
def load_accounts_sync() -> Optional[list]:
    """Blocking variant of load_accounts, run on the database loop."""
    loaded = _run_in_db_loop(load_accounts())
    return loaded
def save_accounts_sync(accounts: list) -> bool:
    """Blocking variant of save_accounts, run on the database loop."""
    saved = _run_in_db_loop(save_accounts(accounts))
    return saved
# ==================== Settings storage ====================
async def load_settings() -> Optional[dict]:
    """Read the "settings" entry from the database.

    Returns None when the database is disabled, the entry is missing, or
    the read fails.
    """
    if not is_database_enabled():
        return None
    try:
        stored_settings = await db_get("settings")
    except Exception as e:
        logger.error(f"[STORAGE] Settings read failed: {e}")
        return None
    return stored_settings
async def save_settings(settings: dict) -> bool:
    """Store *settings* under the "settings" key.

    Returns True on success, False when the database is disabled or the
    write fails.
    """
    if is_database_enabled():
        try:
            await db_set("settings", settings)
            logger.info("[STORAGE] Settings saved to database")
            return True
        except Exception as e:
            logger.error(f"[STORAGE] Settings write failed: {e}")
    return False
# ==================== Stats storage ====================
async def load_stats() -> Optional[dict]:
    """Read the "stats" entry from the database.

    Returns None when the database is disabled, the entry is missing, or
    the read fails.
    """
    if not is_database_enabled():
        return None
    try:
        stored_stats = await db_get("stats")
    except Exception as e:
        logger.error(f"[STORAGE] Stats read failed: {e}")
        return None
    return stored_stats
async def save_stats(stats: dict) -> bool:
    """Store *stats* under the "stats" key.

    Returns True on success, False when the database is disabled or the
    write fails.
    """
    if is_database_enabled():
        try:
            await db_set("stats", stats)
            return True
        except Exception as e:
            logger.error(f"[STORAGE] Stats write failed: {e}")
    return False
def load_settings_sync() -> Optional[dict]:
    """Blocking variant of load_settings, run on the database loop."""
    loaded = _run_in_db_loop(load_settings())
    return loaded
def save_settings_sync(settings: dict) -> bool:
    """Blocking variant of save_settings, run on the database loop."""
    saved = _run_in_db_loop(save_settings(settings))
    return saved
def load_stats_sync() -> Optional[dict]:
    """Blocking variant of load_stats, run on the database loop."""
    loaded = _run_in_db_loop(load_stats())
    return loaded
def save_stats_sync(stats: dict) -> bool:
    """Blocking variant of save_stats, run on the database loop."""
    saved = _run_in_db_loop(save_stats(stats))
    return saved