DevilsDozen / src /realtime /sync_manager.py
legomaheggo's picture
feat: Add config, database, realtime, and UI layers with full multiplayer gameplay
a9b6601
"""
Devil's Dozen - Realtime Sync Manager
High-level manager that ties together channel subscriptions with
game state reconciliation. Provides convenience functions for the UI layer
and a polling fallback when WebSocket connections fail.
"""
from __future__ import annotations
import logging
import threading
import time
from typing import Any, Callable
from supabase import Client
from src.database.game_state import GameStateManager
from src.database.lobby import LobbyManager
from src.database.models import GameState, Lobby, Player
from src.database.player import PlayerManager
from src.realtime.events import EventPayload, GameEvent
from src.realtime.subscriptions import ChannelManager
logger = logging.getLogger(__name__)
class RealtimeManager:
"""Coordinates realtime subscriptions and state reconciliation.
Wraps ChannelManager with higher-level game logic: snapshot
fetching on subscribe, polling fallback, and clean teardown.
"""
def __init__(self, client: Client) -> None:
self._client = client
self._channel_mgr = ChannelManager(client)
self._lobby_mgr = LobbyManager(client)
self._player_mgr = PlayerManager(client)
self._game_state_mgr = GameStateManager(client)
self._poll_threads: dict[str, threading.Event] = {}
def subscribe(
self,
lobby_id: str,
on_event: Callable[[EventPayload], None],
*,
use_polling_fallback: bool = True,
poll_interval: float = 2.0,
) -> None:
"""Subscribe to live updates for a lobby.
Attempts WebSocket subscription first. If it fails and
use_polling_fallback is True, starts a polling thread instead.
Args:
lobby_id: UUID of the lobby to watch.
on_event: Callback receiving EventPayload for each change.
use_polling_fallback: Fall back to polling on WS failure.
poll_interval: Seconds between polls (fallback only).
"""
try:
self._channel_mgr.subscribe(lobby_id, on_event)
logger.info("Realtime subscription active for lobby %s", lobby_id)
except Exception:
logger.exception("WebSocket subscription failed for lobby %s", lobby_id)
if use_polling_fallback:
logger.info("Falling back to polling for lobby %s", lobby_id)
self._start_polling(lobby_id, on_event, poll_interval)
else:
raise
def unsubscribe(self, lobby_id: str) -> None:
"""Unsubscribe from a lobby (both WS and polling)."""
self._channel_mgr.unsubscribe(lobby_id)
self._stop_polling(lobby_id)
def get_snapshot(self, lobby_id: str) -> dict[str, Any]:
"""Fetch the current full state of a lobby from the database.
Useful for initial state load on subscribe and for
reconciliation after reconnects.
Returns:
Dict with 'lobby', 'players', and 'game_state' keys.
"""
lobby = self._lobby_mgr.get_by_id(lobby_id)
players = self._player_mgr.list_by_lobby(lobby_id)
game_state = self._game_state_mgr.get(lobby_id)
return {
"lobby": lobby,
"players": players,
"game_state": game_state,
}
def shutdown(self) -> None:
"""Clean up all subscriptions and background threads."""
for lobby_id in list(self._poll_threads.keys()):
self._stop_polling(lobby_id)
self._channel_mgr.shutdown()
# -- Polling fallback ------------------------------------------------
def _start_polling(
self,
lobby_id: str,
on_event: Callable[[EventPayload], None],
interval: float,
) -> None:
"""Start a background polling thread for a lobby."""
if lobby_id in self._poll_threads:
return
stop_event = threading.Event()
self._poll_threads[lobby_id] = stop_event
thread = threading.Thread(
target=self._poll_loop,
args=(lobby_id, on_event, interval, stop_event),
daemon=True,
name=f"poll-{lobby_id[:8]}",
)
thread.start()
def _stop_polling(self, lobby_id: str) -> None:
"""Signal a polling thread to stop."""
stop_event = self._poll_threads.pop(lobby_id, None)
if stop_event:
stop_event.set()
def _poll_loop(
self,
lobby_id: str,
on_event: Callable[[EventPayload], None],
interval: float,
stop_event: threading.Event,
) -> None:
"""Poll the database for changes and emit events."""
last_state: dict[str, Any] | None = None
while not stop_event.is_set():
try:
game_state = self._game_state_mgr.get(lobby_id)
lobby = self._lobby_mgr.get_by_id(lobby_id)
if game_state and last_state:
self._diff_and_emit(
lobby_id, game_state, lobby, last_state, on_event
)
last_state = {
"game_state": game_state,
"lobby": lobby,
}
except Exception:
logger.exception("Polling error for lobby %s", lobby_id)
stop_event.wait(interval)
def _diff_and_emit(
self,
lobby_id: str,
game_state: GameState,
lobby: Lobby | None,
last_state: dict[str, Any],
on_event: Callable[[EventPayload], None],
) -> None:
"""Compare current state to previous and emit events for differences."""
prev_gs: GameState | None = last_state.get("game_state")
prev_lobby: Lobby | None = last_state.get("lobby")
if prev_gs and game_state:
if game_state.is_bust and not prev_gs.is_bust:
on_event(EventPayload(
event=GameEvent.PLAYER_BUST,
lobby_id=lobby_id,
data={"game_state": game_state.model_dump()},
))
elif game_state.active_dice != prev_gs.active_dice:
on_event(EventPayload(
event=GameEvent.DICE_ROLLED,
lobby_id=lobby_id,
data={"game_state": game_state.model_dump()},
))
elif game_state.held_indices != prev_gs.held_indices:
on_event(EventPayload(
event=GameEvent.DICE_HELD,
lobby_id=lobby_id,
data={"game_state": game_state.model_dump()},
))
elif game_state.updated_at != prev_gs.updated_at:
on_event(EventPayload(
event=GameEvent.STATE_UPDATED,
lobby_id=lobby_id,
data={"game_state": game_state.model_dump()},
))
if prev_lobby and lobby:
if lobby.status != prev_lobby.status:
if lobby.status == "playing":
on_event(EventPayload(
event=GameEvent.GAME_STARTED,
lobby_id=lobby_id,
data={"lobby": lobby.model_dump()},
))
elif lobby.status == "finished":
on_event(EventPayload(
event=GameEvent.GAME_WON,
lobby_id=lobby_id,
data={"lobby": lobby.model_dump()},
))
elif lobby.current_turn_index != prev_lobby.current_turn_index:
on_event(EventPayload(
event=GameEvent.TURN_ADVANCED,
lobby_id=lobby_id,
data={"lobby": lobby.model_dump()},
))
# -- Module-level convenience functions ----------------------------------
_manager_instance: RealtimeManager | None = None
_manager_lock = threading.Lock()
def _get_manager(client: Client) -> RealtimeManager:
"""Get or create the singleton RealtimeManager."""
global _manager_instance
with _manager_lock:
if _manager_instance is None:
_manager_instance = RealtimeManager(client)
return _manager_instance
def subscribe_to_lobby(
client: Client,
lobby_id: str,
on_event: Callable[[EventPayload], None],
) -> RealtimeManager:
"""Subscribe to realtime updates for a lobby.
Convenience function for use in the UI layer.
Args:
client: Supabase client instance.
lobby_id: UUID of the lobby to watch.
on_event: Callback for game events.
Returns:
The RealtimeManager instance (for snapshot access, etc.)
"""
manager = _get_manager(client)
manager.subscribe(lobby_id, on_event)
return manager
def unsubscribe_from_lobby(client: Client, lobby_id: str) -> None:
"""Unsubscribe from realtime updates for a lobby."""
manager = _get_manager(client)
manager.unsubscribe(lobby_id)