| """
|
| Storage abstraction supporting file and PostgreSQL backends.
|
|
|
| If DATABASE_URL is set, PostgreSQL is used.
|
| """
|
|
|
| import asyncio
|
| import json
|
| import logging
|
| import os
|
| import threading
|
| from typing import Optional
|
|
|
| from dotenv import load_dotenv
|
|
|
# Load variables from a .env file (python-dotenv) into the process
# environment before any configuration is read from os.environ.
load_dotenv()

logger = logging.getLogger(__name__)

# Lazily created database state; every slot starts empty and is filled on
# first use by the helpers below.
_db_pool = None  # connection pool, set by _get_pool()
_db_pool_lock: Optional[asyncio.Lock] = None  # serialises pool creation
_db_loop: Optional[asyncio.AbstractEventLoop] = None  # loop used by sync wrappers
_db_thread: Optional[threading.Thread] = None  # thread running _db_loop
_db_loop_lock = threading.Lock()  # guards creation of _db_loop / _db_thread
|
|
|
|
|
def _get_database_url() -> str:
    """Return DATABASE_URL with surrounding whitespace removed ("" if unset)."""
    raw_url = os.environ.get("DATABASE_URL", "")
    return raw_url.strip()
|
|
|
|
|
def is_database_enabled() -> bool:
    """Tell whether a non-empty DATABASE_URL is configured."""
    configured_url = _get_database_url()
    return configured_url != ""
|
|
|
|
|
def _ensure_db_loop() -> asyncio.AbstractEventLoop:
    """Return the background event loop used for database work.

    The loop runs forever on a daemon thread named "storage-db-loop". It is
    created on first call, and created again if the recorded thread is no
    longer alive.
    """
    global _db_loop, _db_thread

    def _is_running() -> bool:
        # A usable loop needs both the loop object and a live thread.
        return bool(_db_loop and _db_thread and _db_thread.is_alive())

    # Fast path: no locking once the loop is up.
    if _is_running():
        return _db_loop

    with _db_loop_lock:
        # Re-check under the lock; another thread may have started the loop
        # while this one was waiting.
        if not _is_running():
            new_loop = asyncio.new_event_loop()

            def _serve() -> None:
                asyncio.set_event_loop(new_loop)
                new_loop.run_forever()

            worker = threading.Thread(
                target=_serve, name="storage-db-loop", daemon=True
            )
            worker.start()
            _db_loop = new_loop
            _db_thread = worker
        return _db_loop
|
|
|
|
|
def _run_in_db_loop(coro):
    """Run *coro* on the storage event loop and return its result.

    The calling thread blocks until the coroutine has finished.

    Args:
        coro: Coroutine object to execute on the storage loop.

    Returns:
        The value returned by the coroutine.

    Raises:
        RuntimeError: If called from the storage loop's own thread. Waiting
            there would block the very loop that has to run the coroutine,
            so the call could never return.
        Exception: Any exception raised by the coroutine is re-raised here.
    """
    loop = _ensure_db_loop()
    if threading.current_thread() is _db_thread:
        # The coroutine will never be scheduled; close it so Python does not
        # emit a "coroutine was never awaited" warning.
        if asyncio.iscoroutine(coro):
            coro.close()
        raise RuntimeError(
            "_run_in_db_loop() was called from the storage event loop thread; "
            "await the async function directly instead of using a sync wrapper"
        )
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    return future.result()
|
|
|
|
|
async def _get_pool():
    """Return the shared asyncpg connection pool, creating it on first use.

    The pool is stored in the module-level ``_db_pool`` only after
    ``_init_tables`` has completed. Callers taking the lock-free fast path
    therefore never receive a pool whose tables are missing, and a failed
    initialisation is retried on the next call instead of leaving a
    half-initialised pool cached.

    Returns:
        The asyncpg pool shared by all storage calls.

    Raises:
        ValueError: If DATABASE_URL is not set.
        ImportError: If asyncpg is not installed.
        Exception: Any error raised while connecting or creating tables.
    """
    global _db_pool, _db_pool_lock
    if _db_pool is not None:
        return _db_pool
    # The lock is created on first call (inside a coroutine), not at import.
    if _db_pool_lock is None:
        _db_pool_lock = asyncio.Lock()
    async with _db_pool_lock:
        # Another task may have finished initialisation while we waited.
        if _db_pool is not None:
            return _db_pool
        db_url = _get_database_url()
        if not db_url:
            raise ValueError("DATABASE_URL is not set")
        try:
            import asyncpg
        except ImportError:
            logger.error("[STORAGE] asyncpg is required for database storage")
            raise
        pool = None
        try:
            pool = await asyncpg.create_pool(
                db_url,
                min_size=1,
                max_size=10,
                command_timeout=30,
            )
            await _init_tables(pool)
        except Exception as e:
            logger.error(f"[STORAGE] Database connection failed: {e}")
            if pool is not None:
                # Table setup failed after connecting: release the
                # connections instead of leaking an unusable pool.
                try:
                    await pool.close()
                except Exception as close_error:
                    logger.warning(
                        f"[STORAGE] Failed to close database pool: {close_error}"
                    )
            raise
        # Publish only a fully initialised pool.
        _db_pool = pool
        logger.info("[STORAGE] PostgreSQL pool initialized")
        return _db_pool
|
|
|
|
|
async def _init_tables(pool) -> None:
    """Create the kv_store table if it does not exist yet.

    Args:
        pool: Connection pool to acquire a connection from.
    """
    create_kv_store = """
        CREATE TABLE IF NOT EXISTS kv_store (
            key TEXT PRIMARY KEY,
            value JSONB NOT NULL,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """
    async with pool.acquire() as connection:
        await connection.execute(create_kv_store)
        logger.info("[STORAGE] Database tables initialized")
|
|
|
|
|
async def db_get(key: str) -> Optional[dict]:
    """Read the value stored under *key* in kv_store.

    Args:
        key: Primary key of the row to read.

    Returns:
        The stored value, JSON-decoded when the driver returns it as a
        string, or None when no row exists for *key*.
    """
    select_value = "SELECT value FROM kv_store WHERE key = $1"
    pool = await _get_pool()
    async with pool.acquire() as connection:
        record = await connection.fetchrow(select_value, key)
    if not record:
        return None
    stored = record["value"]
    # The value may come back as raw JSON text; decode it in that case.
    return json.loads(stored) if isinstance(stored, str) else stored
|
|
|
|
|
async def db_set(key: str, value: dict) -> None:
    """Insert or overwrite the value stored under *key* in kv_store.

    The value is serialised to JSON text and updated_at is refreshed on
    every write.

    Args:
        key: Primary key of the row to write.
        value: JSON-serialisable object to store.
    """
    upsert = """
        INSERT INTO kv_store (key, value, updated_at)
        VALUES ($1, $2, CURRENT_TIMESTAMP)
        ON CONFLICT (key) DO UPDATE SET
            value = EXCLUDED.value,
            updated_at = CURRENT_TIMESTAMP
    """
    pool = await _get_pool()
    async with pool.acquire() as connection:
        payload = json.dumps(value, ensure_ascii=False)
        await connection.execute(upsert, key, payload)
|
|
|
|
|
|
|
|
|
async def load_accounts() -> Optional[list]:
    """Read the account configuration stored under the "accounts" key.

    Returns:
        The stored accounts, an empty list when the database holds none, or
        None when the database is disabled or the read failed (the caller
        should then fall back to file-based storage).
    """
    if not is_database_enabled():
        return None
    try:
        accounts = await db_get("accounts")
        if not accounts:
            logger.info("[STORAGE] No accounts found in database")
            return []
        logger.info(f"[STORAGE] Loaded {len(accounts)} accounts from database")
        return accounts
    except Exception as e:
        logger.error(f"[STORAGE] Database read failed: {e}")
    return None
|
|
|
|
|
async def get_accounts_updated_at() -> Optional[float]:
    """Return when the "accounts" row was last written, as epoch seconds.

    Returns:
        The timestamp as a float, or None when the database is disabled, the
        row (or its timestamp) is missing, or the query failed.
    """
    if not is_database_enabled():
        return None
    # NOTE(review): kv_store.updated_at is declared as TIMESTAMP without a
    # time zone, so this epoch value presumably depends on the session time
    # zone in effect when the row was written. Confirm.
    epoch_query = (
        "SELECT EXTRACT(EPOCH FROM updated_at) AS ts FROM kv_store WHERE key = $1"
    )
    try:
        pool = await _get_pool()
        async with pool.acquire() as connection:
            record = await connection.fetchrow(epoch_query, "accounts")
        timestamp = record["ts"] if record else None
        return None if timestamp is None else float(timestamp)
    except Exception as e:
        logger.error(f"[STORAGE] Database accounts updated_at failed: {e}")
    return None
|
|
|
|
|
def get_accounts_updated_at_sync() -> Optional[float]:
    """Blocking variant of get_accounts_updated_at, run on the storage loop."""
    pending = get_accounts_updated_at()
    return _run_in_db_loop(pending)
|
|
|
|
|
async def save_accounts(accounts: list) -> bool:
    """Store the account configuration under the "accounts" key.

    Args:
        accounts: Account entries to persist.

    Returns:
        True when the write succeeded; False when the database is disabled
        or the write failed.
    """
    if not is_database_enabled():
        return False
    saved = False
    try:
        await db_set("accounts", accounts)
        logger.info(f"[STORAGE] Saved {len(accounts)} accounts to database")
        saved = True
    except Exception as e:
        logger.error(f"[STORAGE] Database write failed: {e}")
    return saved
|
|
|
|
|
def load_accounts_sync() -> Optional[list]:
    """Blocking variant of load_accounts, run on the storage loop."""
    pending = load_accounts()
    return _run_in_db_loop(pending)
|
|
|
|
|
def save_accounts_sync(accounts: list) -> bool:
    """Blocking variant of save_accounts, run on the storage loop."""
    pending = save_accounts(accounts)
    return _run_in_db_loop(pending)
|
|
|
|
|
|
|
|
|
async def load_settings() -> Optional[dict]:
    """Read the value stored under the "settings" key.

    Returns:
        The stored settings, or None when the database is disabled, no row
        exists, or the read failed.
    """
    settings = None
    if is_database_enabled():
        try:
            settings = await db_get("settings")
        except Exception as e:
            logger.error(f"[STORAGE] Settings read failed: {e}")
    return settings
|
|
|
|
|
async def save_settings(settings: dict) -> bool:
    """Store *settings* under the "settings" key.

    Returns:
        True when the write succeeded; False when the database is disabled
        or the write failed.
    """
    if not is_database_enabled():
        return False
    stored = False
    try:
        await db_set("settings", settings)
        logger.info("[STORAGE] Settings saved to database")
        stored = True
    except Exception as e:
        logger.error(f"[STORAGE] Settings write failed: {e}")
    return stored
|
|
|
|
|
|
|
|
|
async def load_stats() -> Optional[dict]:
    """Read the value stored under the "stats" key.

    Returns:
        The stored stats, or None when the database is disabled, no row
        exists, or the read failed.
    """
    stats = None
    if is_database_enabled():
        try:
            stats = await db_get("stats")
        except Exception as e:
            logger.error(f"[STORAGE] Stats read failed: {e}")
    return stats
|
|
|
|
|
async def save_stats(stats: dict) -> bool:
    """Store *stats* under the "stats" key.

    Returns:
        True when the write succeeded; False when the database is disabled
        or the write failed.
    """
    if not is_database_enabled():
        return False
    stored = False
    try:
        await db_set("stats", stats)
        stored = True
    except Exception as e:
        logger.error(f"[STORAGE] Stats write failed: {e}")
    return stored
|
|
|
|
|
def load_settings_sync() -> Optional[dict]:
    """Blocking variant of load_settings, run on the storage loop."""
    pending = load_settings()
    return _run_in_db_loop(pending)
|
|
|
|
|
def save_settings_sync(settings: dict) -> bool:
    """Blocking variant of save_settings, run on the storage loop."""
    pending = save_settings(settings)
    return _run_in_db_loop(pending)
|
|
|
|
|
def load_stats_sync() -> Optional[dict]:
    """Blocking variant of load_stats, run on the storage loop."""
    pending = load_stats()
    return _run_in_db_loop(pending)
|
|
|
|
|
def save_stats_sync(stats: dict) -> bool:
    """Blocking variant of save_stats, run on the storage loop."""
    pending = save_stats(stats)
    return _run_in_db_loop(pending)
|
|
|