| | """
|
| | Storage abstraction supporting file and PostgreSQL backends.
|
| |
|
| | If DATABASE_URL is set, PostgreSQL is used.
|
| | """
|
| |
|
| | import asyncio
|
| | import json
|
| | import logging
|
| | import os
|
| | import threading
|
| | from typing import Optional
|
| |
|
| | from dotenv import load_dotenv
|
| |
|
| | load_dotenv()
|
| |
|
| | logger = logging.getLogger(__name__)
|
| |
|
| | _db_pool = None
|
| | _db_pool_lock = None
|
| | _db_loop = None
|
| | _db_thread = None
|
| | _db_loop_lock = threading.Lock()
|
| |
|
| |
|
| | def _get_database_url() -> str:
|
| | return os.environ.get("DATABASE_URL", "").strip()
|
| |
|
| |
|
| | def is_database_enabled() -> bool:
|
| | """Return True when DATABASE_URL is configured."""
|
| | return bool(_get_database_url())
|
| |
|
| |
|
| | def _ensure_db_loop() -> asyncio.AbstractEventLoop:
|
| | global _db_loop, _db_thread
|
| | if _db_loop and _db_thread and _db_thread.is_alive():
|
| | return _db_loop
|
| | with _db_loop_lock:
|
| | if _db_loop and _db_thread and _db_thread.is_alive():
|
| | return _db_loop
|
| | loop = asyncio.new_event_loop()
|
| |
|
| | def _runner() -> None:
|
| | asyncio.set_event_loop(loop)
|
| | loop.run_forever()
|
| |
|
| | thread = threading.Thread(target=_runner, name="storage-db-loop", daemon=True)
|
| | thread.start()
|
| | _db_loop = loop
|
| | _db_thread = thread
|
| | return _db_loop
|
| |
|
| |
|
| | def _run_in_db_loop(coro):
|
| | loop = _ensure_db_loop()
|
| | future = asyncio.run_coroutine_threadsafe(coro, loop)
|
| | return future.result()
|
| |
|
| |
|
| | async def _get_pool():
|
| | """Get (or create) the asyncpg connection pool."""
|
| | global _db_pool, _db_pool_lock
|
| | if _db_pool is not None:
|
| | return _db_pool
|
| | if _db_pool_lock is None:
|
| | _db_pool_lock = asyncio.Lock()
|
| | async with _db_pool_lock:
|
| | if _db_pool is not None:
|
| | return _db_pool
|
| | db_url = _get_database_url()
|
| | if not db_url:
|
| | raise ValueError("DATABASE_URL is not set")
|
| | try:
|
| | import asyncpg
|
| | _db_pool = await asyncpg.create_pool(
|
| | db_url,
|
| | min_size=1,
|
| | max_size=10,
|
| | command_timeout=30,
|
| | )
|
| | await _init_tables(_db_pool)
|
| | logger.info("[STORAGE] PostgreSQL pool initialized")
|
| | except ImportError:
|
| | logger.error("[STORAGE] asyncpg is required for database storage")
|
| | raise
|
| | except Exception as e:
|
| | logger.error(f"[STORAGE] Database connection failed: {e}")
|
| | raise
|
| | return _db_pool
|
| |
|
| |
|
| | async def _init_tables(pool) -> None:
|
| | """Initialize database tables."""
|
| | async with pool.acquire() as conn:
|
| | await conn.execute(
|
| | """
|
| | CREATE TABLE IF NOT EXISTS kv_store (
|
| | key TEXT PRIMARY KEY,
|
| | value JSONB NOT NULL,
|
| | updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
| | )
|
| | """
|
| | )
|
| | logger.info("[STORAGE] Database tables initialized")
|
| |
|
| |
|
| | async def db_get(key: str) -> Optional[dict]:
|
| | """Fetch a value from the database."""
|
| | pool = await _get_pool()
|
| | async with pool.acquire() as conn:
|
| | row = await conn.fetchrow(
|
| | "SELECT value FROM kv_store WHERE key = $1", key
|
| | )
|
| | if not row:
|
| | return None
|
| | value = row["value"]
|
| | if isinstance(value, str):
|
| | return json.loads(value)
|
| | return value
|
| |
|
| |
|
| | async def db_set(key: str, value: dict) -> None:
|
| | """Persist a value to the database."""
|
| | pool = await _get_pool()
|
| | async with pool.acquire() as conn:
|
| | await conn.execute(
|
| | """
|
| | INSERT INTO kv_store (key, value, updated_at)
|
| | VALUES ($1, $2, CURRENT_TIMESTAMP)
|
| | ON CONFLICT (key) DO UPDATE SET
|
| | value = EXCLUDED.value,
|
| | updated_at = CURRENT_TIMESTAMP
|
| | """,
|
| | key,
|
| | json.dumps(value, ensure_ascii=False),
|
| | )
|
| |
|
| |
|
| |
|
| |
|
| | async def load_accounts() -> Optional[list]:
|
| | """
|
| | Load account configuration from database when enabled.
|
| | Return None to indicate file-based fallback.
|
| | """
|
| | if not is_database_enabled():
|
| | return None
|
| | try:
|
| | data = await db_get("accounts")
|
| | if data:
|
| | logger.info(f"[STORAGE] Loaded {len(data)} accounts from database")
|
| | return data
|
| | logger.info("[STORAGE] No accounts found in database")
|
| | return []
|
| | except Exception as e:
|
| | logger.error(f"[STORAGE] Database read failed: {e}")
|
| | return None
|
| |
|
| |
|
| | async def get_accounts_updated_at() -> Optional[float]:
|
| | """
|
| | Get the accounts updated_at timestamp (epoch seconds).
|
| | Return None if database is not enabled or failed.
|
| | """
|
| | if not is_database_enabled():
|
| | return None
|
| | try:
|
| | pool = await _get_pool()
|
| | async with pool.acquire() as conn:
|
| | row = await conn.fetchrow(
|
| | "SELECT EXTRACT(EPOCH FROM updated_at) AS ts FROM kv_store WHERE key = $1",
|
| | "accounts",
|
| | )
|
| | if not row or row["ts"] is None:
|
| | return None
|
| | return float(row["ts"])
|
| | except Exception as e:
|
| | logger.error(f"[STORAGE] Database accounts updated_at failed: {e}")
|
| | return None
|
| |
|
| |
|
| | def get_accounts_updated_at_sync() -> Optional[float]:
|
| | """Sync wrapper for get_accounts_updated_at."""
|
| | return _run_in_db_loop(get_accounts_updated_at())
|
| |
|
| |
|
| | async def save_accounts(accounts: list) -> bool:
|
| | """Save account configuration to database when enabled."""
|
| | if not is_database_enabled():
|
| | return False
|
| | try:
|
| | await db_set("accounts", accounts)
|
| | logger.info(f"[STORAGE] Saved {len(accounts)} accounts to database")
|
| | return True
|
| | except Exception as e:
|
| | logger.error(f"[STORAGE] Database write failed: {e}")
|
| | return False
|
| |
|
| |
|
| | def load_accounts_sync() -> Optional[list]:
|
| | """Sync wrapper for load_accounts (safe in sync/async call sites)."""
|
| | return _run_in_db_loop(load_accounts())
|
| |
|
| |
|
| | def save_accounts_sync(accounts: list) -> bool:
|
| | """Sync wrapper for save_accounts (safe in sync/async call sites)."""
|
| | return _run_in_db_loop(save_accounts(accounts))
|
| |
|
| |
|
| |
|
| |
|
| | async def load_settings() -> Optional[dict]:
|
| | if not is_database_enabled():
|
| | return None
|
| | try:
|
| | return await db_get("settings")
|
| | except Exception as e:
|
| | logger.error(f"[STORAGE] Settings read failed: {e}")
|
| | return None
|
| |
|
| |
|
| | async def save_settings(settings: dict) -> bool:
|
| | if not is_database_enabled():
|
| | return False
|
| | try:
|
| | await db_set("settings", settings)
|
| | logger.info("[STORAGE] Settings saved to database")
|
| | return True
|
| | except Exception as e:
|
| | logger.error(f"[STORAGE] Settings write failed: {e}")
|
| | return False
|
| |
|
| |
|
| |
|
| |
|
| | async def load_stats() -> Optional[dict]:
|
| | if not is_database_enabled():
|
| | return None
|
| | try:
|
| | return await db_get("stats")
|
| | except Exception as e:
|
| | logger.error(f"[STORAGE] Stats read failed: {e}")
|
| | return None
|
| |
|
| |
|
| | async def save_stats(stats: dict) -> bool:
|
| | if not is_database_enabled():
|
| | return False
|
| | try:
|
| | await db_set("stats", stats)
|
| | return True
|
| | except Exception as e:
|
| | logger.error(f"[STORAGE] Stats write failed: {e}")
|
| | return False
|
| |
|
| |
|
| | def load_settings_sync() -> Optional[dict]:
|
| | return _run_in_db_loop(load_settings())
|
| |
|
| |
|
| | def save_settings_sync(settings: dict) -> bool:
|
| | return _run_in_db_loop(save_settings(settings))
|
| |
|
| |
|
| | def load_stats_sync() -> Optional[dict]:
|
| | return _run_in_db_loop(load_stats())
|
| |
|
| |
|
| | def save_stats_sync(stats: dict) -> bool:
|
| | return _run_in_db_loop(save_stats(stats))
|
| |
|