| """In-memory balance cache — keeps the gist OFF the request hot path. |
| |
| The router holds every active user's balance in RAM. Reads are instant (no gist |
| hit). The tracker's flusher writes the cached balance back to the gist every 30s. |
| On startup / first request from a user, we lazily load their balance from the gist. |
| |
| Trade-off: if the process crashes, we lose up to 30s of un-flushed burns — i.e. |
| users get a tiny amount of free service. Acceptable for an MVP on free upstreams. |
| """ |
| from __future__ import annotations |
|
|
| import asyncio |
| import logging |
| from typing import Optional |
|
|
| from . import registry, user_store |
|
|
| logger = logging.getLogger("router.cache") |
|
|
| |
| _balances: dict[str, int] = {} |
| |
| _gist_ids: dict[str, str] = {} |
| |
| _loaded: set[str] = set() |
| |
| _lock = asyncio.Lock() |
|
|
|
|
| def _user_record(user_id: str) -> Optional[dict]: |
| """Look up a user in the registry cache + memoize their gist_id.""" |
| if user_id not in _gist_ids: |
| rec = registry.find_user(user_id) |
| if not rec: |
| return None |
| _gist_ids[user_id] = rec["gist_id"] |
| return rec |
| return {"gist_id": _gist_ids[user_id]} |
|
|
|
|
| def gist_id_for(user_id: str) -> Optional[str]: |
| """Return the gist_id for a user (from cache), or None if unknown.""" |
| rec = _user_record(user_id) |
| return rec["gist_id"] if rec else None |
|
|
|
|
| async def get_balance(user_id: str) -> int: |
| """Return the user's balance, loading from the gist on first access.""" |
| if user_id not in _loaded: |
| gid = gist_id_for(user_id) |
| if not gid: |
| return 0 |
| bal = await user_store.read_balance(gid) |
| _balances[user_id] = bal.get("credits", 0) |
| _loaded.add(user_id) |
| return _balances.get(user_id, 0) |
|
|
|
|
| def spend(user_id: str, amount: int) -> None: |
| """Decrement the cached balance by <amount> (clamped at 0). Instant, no gist hit.""" |
| cur = _balances.get(user_id, 0) |
| _balances[user_id] = max(0, cur - amount) |
|
|
|
|
| def credit(user_id: str, amount: int) -> None: |
| """Increment the cached balance (called when a payment is confirmed).""" |
| _balances[user_id] = _balances.get(user_id, 0) + amount |
|
|
|
|
| def set_balance(user_id: str, credits: int) -> None: |
| """Force-set the cached balance (used after a full gist reload).""" |
| _balances[user_id] = credits |
| _loaded.add(user_id) |
|
|
|
|
| async def flush() -> int: |
| """Write every dirty cached balance back to its gist. Returns the count written.""" |
| async with _lock: |
| written = 0 |
| for user_id, credits in list(_balances.items()): |
| gid = gist_id_for(user_id) |
| if not gid: |
| continue |
| try: |
| bal = await user_store.read_balance(gid) |
| bal["credits"] = credits |
| await user_store.write_balance(gid, bal) |
| written += 1 |
| except Exception as e: |
| logger.warning("flush failed for %s: %s", user_id, e) |
| return written |
|
|
|
|
| def snapshot() -> dict[str, int]: |
| """Read-only copy of the cache (for status/diagnostics).""" |
| return dict(_balances) |
|
|