""" Storage abstraction supporting file and PostgreSQL backends. If DATABASE_URL is set, PostgreSQL is used. """ import asyncio import json import logging import os import threading from typing import Optional from dotenv import load_dotenv load_dotenv() logger = logging.getLogger(__name__) _db_pool = None _db_pool_lock = None _db_loop = None _db_thread = None _db_loop_lock = threading.Lock() def _get_database_url() -> str: return os.environ.get("DATABASE_URL", "").strip() def is_database_enabled() -> bool: """Return True when DATABASE_URL is configured.""" return bool(_get_database_url()) def _ensure_db_loop() -> asyncio.AbstractEventLoop: global _db_loop, _db_thread if _db_loop and _db_thread and _db_thread.is_alive(): return _db_loop with _db_loop_lock: if _db_loop and _db_thread and _db_thread.is_alive(): return _db_loop loop = asyncio.new_event_loop() def _runner() -> None: asyncio.set_event_loop(loop) loop.run_forever() thread = threading.Thread(target=_runner, name="storage-db-loop", daemon=True) thread.start() _db_loop = loop _db_thread = thread return _db_loop def _run_in_db_loop(coro): loop = _ensure_db_loop() future = asyncio.run_coroutine_threadsafe(coro, loop) return future.result() async def _get_pool(): """Get (or create) the asyncpg connection pool.""" global _db_pool, _db_pool_lock if _db_pool is not None: return _db_pool if _db_pool_lock is None: _db_pool_lock = asyncio.Lock() async with _db_pool_lock: if _db_pool is not None: return _db_pool db_url = _get_database_url() if not db_url: raise ValueError("DATABASE_URL is not set") try: import asyncpg _db_pool = await asyncpg.create_pool( db_url, min_size=1, max_size=10, command_timeout=30, ) await _init_tables(_db_pool) logger.info("[STORAGE] PostgreSQL pool initialized") except ImportError: logger.error("[STORAGE] asyncpg is required for database storage") raise except Exception as e: logger.error(f"[STORAGE] Database connection failed: {e}") raise return _db_pool async def _init_tables(pool) -> None: """Initialize database tables.""" async with pool.acquire() as conn: await conn.execute( """ CREATE TABLE IF NOT EXISTS kv_store ( key TEXT PRIMARY KEY, value JSONB NOT NULL, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """ ) logger.info("[STORAGE] Database tables initialized") async def db_get(key: str) -> Optional[dict]: """Fetch a value from the database.""" pool = await _get_pool() async with pool.acquire() as conn: row = await conn.fetchrow( "SELECT value FROM kv_store WHERE key = $1", key ) if not row: return None value = row["value"] if isinstance(value, str): return json.loads(value) return value async def db_set(key: str, value: dict) -> None: """Persist a value to the database.""" pool = await _get_pool() async with pool.acquire() as conn: await conn.execute( """ INSERT INTO kv_store (key, value, updated_at) VALUES ($1, $2, CURRENT_TIMESTAMP) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, updated_at = CURRENT_TIMESTAMP """, key, json.dumps(value, ensure_ascii=False), ) # ==================== Accounts storage ==================== async def load_accounts() -> Optional[list]: """ Load account configuration from database when enabled. Return None to indicate file-based fallback. """ if not is_database_enabled(): return None try: data = await db_get("accounts") if data: logger.info(f"[STORAGE] Loaded {len(data)} accounts from database") return data logger.info("[STORAGE] No accounts found in database") return [] except Exception as e: logger.error(f"[STORAGE] Database read failed: {e}") return None async def get_accounts_updated_at() -> Optional[float]: """ Get the accounts updated_at timestamp (epoch seconds). Return None if database is not enabled or failed. """ if not is_database_enabled(): return None try: pool = await _get_pool() async with pool.acquire() as conn: row = await conn.fetchrow( "SELECT EXTRACT(EPOCH FROM updated_at) AS ts FROM kv_store WHERE key = $1", "accounts", ) if not row or row["ts"] is None: return None return float(row["ts"]) except Exception as e: logger.error(f"[STORAGE] Database accounts updated_at failed: {e}") return None def get_accounts_updated_at_sync() -> Optional[float]: """Sync wrapper for get_accounts_updated_at.""" return _run_in_db_loop(get_accounts_updated_at()) async def save_accounts(accounts: list) -> bool: """Save account configuration to database when enabled.""" if not is_database_enabled(): return False try: await db_set("accounts", accounts) logger.info(f"[STORAGE] Saved {len(accounts)} accounts to database") return True except Exception as e: logger.error(f"[STORAGE] Database write failed: {e}") return False def load_accounts_sync() -> Optional[list]: """Sync wrapper for load_accounts (safe in sync/async call sites).""" return _run_in_db_loop(load_accounts()) def save_accounts_sync(accounts: list) -> bool: """Sync wrapper for save_accounts (safe in sync/async call sites).""" return _run_in_db_loop(save_accounts(accounts)) # ==================== Settings storage ==================== async def load_settings() -> Optional[dict]: if not is_database_enabled(): return None try: return await db_get("settings") except Exception as e: logger.error(f"[STORAGE] Settings read failed: {e}") return None async def save_settings(settings: dict) -> bool: if not is_database_enabled(): return False try: await db_set("settings", settings) logger.info("[STORAGE] Settings saved to database") return True except Exception as e: logger.error(f"[STORAGE] Settings write failed: {e}") return False # ==================== Stats storage ==================== async def load_stats() -> Optional[dict]: if not is_database_enabled(): return None try: return await db_get("stats") except Exception as e: logger.error(f"[STORAGE] Stats read failed: {e}") return None async def save_stats(stats: dict) -> bool: if not is_database_enabled(): return False try: await db_set("stats", stats) return True except Exception as e: logger.error(f"[STORAGE] Stats write failed: {e}") return False def load_settings_sync() -> Optional[dict]: return _run_in_db_loop(load_settings()) def save_settings_sync(settings: dict) -> bool: return _run_in_db_loop(save_settings(settings)) def load_stats_sync() -> Optional[dict]: return _run_in_db_loop(load_stats()) def save_stats_sync(stats: dict) -> bool: return _run_in_db_loop(save_stats(stats))