"""In-memory balance cache — keeps the gist OFF the request hot path. The router holds every active user's balance in RAM. Reads are instant (no gist hit). The tracker's flusher writes the cached balance back to the gist every 30s. On startup / first request from a user, we lazily load their balance from the gist. Trade-off: if the process crashes, we lose up to 30s of un-flushed burns — i.e. users get a tiny amount of free service. Acceptable for an MVP on free upstreams. """ from __future__ import annotations import asyncio import logging from typing import Optional from . import registry, user_store logger = logging.getLogger("router.cache") # user_id -> current credit balance (the hot counter) _balances: dict[str, int] = {} # user_id -> gist_id (loaded from the registry cache, avoids re-lookup) _gist_ids: dict[str, str] = {} # which users we've already loaded from the gist (don't re-read each request) _loaded: set[str] = set() # serialize flushes so concurrent writes don't interleave _lock = asyncio.Lock() def _user_record(user_id: str) -> Optional[dict]: """Look up a user in the registry cache + memoize their gist_id.""" if user_id not in _gist_ids: rec = registry.find_user(user_id) if not rec: return None _gist_ids[user_id] = rec["gist_id"] return rec return {"gist_id": _gist_ids[user_id]} def gist_id_for(user_id: str) -> Optional[str]: """Return the gist_id for a user (from cache), or None if unknown.""" rec = _user_record(user_id) return rec["gist_id"] if rec else None async def get_balance(user_id: str) -> int: """Return the user's balance, loading from the gist on first access.""" if user_id not in _loaded: gid = gist_id_for(user_id) if not gid: return 0 bal = await user_store.read_balance(gid) _balances[user_id] = bal.get("credits", 0) _loaded.add(user_id) return _balances.get(user_id, 0) def spend(user_id: str, amount: int) -> None: """Decrement the cached balance by (clamped at 0). Instant, no gist hit.""" cur = _balances.get(user_id, 0) _balances[user_id] = max(0, cur - amount) def credit(user_id: str, amount: int) -> None: """Increment the cached balance (called when a payment is confirmed).""" _balances[user_id] = _balances.get(user_id, 0) + amount def set_balance(user_id: str, credits: int) -> None: """Force-set the cached balance (used after a full gist reload).""" _balances[user_id] = credits _loaded.add(user_id) async def flush() -> int: """Write every dirty cached balance back to its gist. Returns the count written.""" async with _lock: written = 0 for user_id, credits in list(_balances.items()): gid = gist_id_for(user_id) if not gid: continue try: bal = await user_store.read_balance(gid) bal["credits"] = credits await user_store.write_balance(gid, bal) written += 1 except Exception as e: # don't let one bad gist abort the whole flush logger.warning("flush failed for %s: %s", user_id, e) return written def snapshot() -> dict[str, int]: """Read-only copy of the cache (for status/diagnostics).""" return dict(_balances)