Spaces:
Sleeping
Sleeping
| """ | |
| Devil's Dozen - Realtime Sync Manager | |
| High-level manager that ties together channel subscriptions with | |
| game state reconciliation. Provides convenience functions for the UI layer | |
| and a polling fallback when WebSocket connections fail. | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| import threading | |
| import time | |
| from typing import Any, Callable | |
| from supabase import Client | |
| from src.database.game_state import GameStateManager | |
| from src.database.lobby import LobbyManager | |
| from src.database.models import GameState, Lobby, Player | |
| from src.database.player import PlayerManager | |
| from src.realtime.events import EventPayload, GameEvent | |
| from src.realtime.subscriptions import ChannelManager | |
| logger = logging.getLogger(__name__) | |
| class RealtimeManager: | |
| """Coordinates realtime subscriptions and state reconciliation. | |
| Wraps ChannelManager with higher-level game logic: snapshot | |
| fetching on subscribe, polling fallback, and clean teardown. | |
| """ | |
| def __init__(self, client: Client) -> None: | |
| self._client = client | |
| self._channel_mgr = ChannelManager(client) | |
| self._lobby_mgr = LobbyManager(client) | |
| self._player_mgr = PlayerManager(client) | |
| self._game_state_mgr = GameStateManager(client) | |
| self._poll_threads: dict[str, threading.Event] = {} | |
| def subscribe( | |
| self, | |
| lobby_id: str, | |
| on_event: Callable[[EventPayload], None], | |
| *, | |
| use_polling_fallback: bool = True, | |
| poll_interval: float = 2.0, | |
| ) -> None: | |
| """Subscribe to live updates for a lobby. | |
| Attempts WebSocket subscription first. If it fails and | |
| use_polling_fallback is True, starts a polling thread instead. | |
| Args: | |
| lobby_id: UUID of the lobby to watch. | |
| on_event: Callback receiving EventPayload for each change. | |
| use_polling_fallback: Fall back to polling on WS failure. | |
| poll_interval: Seconds between polls (fallback only). | |
| """ | |
| try: | |
| self._channel_mgr.subscribe(lobby_id, on_event) | |
| logger.info("Realtime subscription active for lobby %s", lobby_id) | |
| except Exception: | |
| logger.exception("WebSocket subscription failed for lobby %s", lobby_id) | |
| if use_polling_fallback: | |
| logger.info("Falling back to polling for lobby %s", lobby_id) | |
| self._start_polling(lobby_id, on_event, poll_interval) | |
| else: | |
| raise | |
| def unsubscribe(self, lobby_id: str) -> None: | |
| """Unsubscribe from a lobby (both WS and polling).""" | |
| self._channel_mgr.unsubscribe(lobby_id) | |
| self._stop_polling(lobby_id) | |
| def get_snapshot(self, lobby_id: str) -> dict[str, Any]: | |
| """Fetch the current full state of a lobby from the database. | |
| Useful for initial state load on subscribe and for | |
| reconciliation after reconnects. | |
| Returns: | |
| Dict with 'lobby', 'players', and 'game_state' keys. | |
| """ | |
| lobby = self._lobby_mgr.get_by_id(lobby_id) | |
| players = self._player_mgr.list_by_lobby(lobby_id) | |
| game_state = self._game_state_mgr.get(lobby_id) | |
| return { | |
| "lobby": lobby, | |
| "players": players, | |
| "game_state": game_state, | |
| } | |
| def shutdown(self) -> None: | |
| """Clean up all subscriptions and background threads.""" | |
| for lobby_id in list(self._poll_threads.keys()): | |
| self._stop_polling(lobby_id) | |
| self._channel_mgr.shutdown() | |
| # -- Polling fallback ------------------------------------------------ | |
| def _start_polling( | |
| self, | |
| lobby_id: str, | |
| on_event: Callable[[EventPayload], None], | |
| interval: float, | |
| ) -> None: | |
| """Start a background polling thread for a lobby.""" | |
| if lobby_id in self._poll_threads: | |
| return | |
| stop_event = threading.Event() | |
| self._poll_threads[lobby_id] = stop_event | |
| thread = threading.Thread( | |
| target=self._poll_loop, | |
| args=(lobby_id, on_event, interval, stop_event), | |
| daemon=True, | |
| name=f"poll-{lobby_id[:8]}", | |
| ) | |
| thread.start() | |
| def _stop_polling(self, lobby_id: str) -> None: | |
| """Signal a polling thread to stop.""" | |
| stop_event = self._poll_threads.pop(lobby_id, None) | |
| if stop_event: | |
| stop_event.set() | |
| def _poll_loop( | |
| self, | |
| lobby_id: str, | |
| on_event: Callable[[EventPayload], None], | |
| interval: float, | |
| stop_event: threading.Event, | |
| ) -> None: | |
| """Poll the database for changes and emit events.""" | |
| last_state: dict[str, Any] | None = None | |
| while not stop_event.is_set(): | |
| try: | |
| game_state = self._game_state_mgr.get(lobby_id) | |
| lobby = self._lobby_mgr.get_by_id(lobby_id) | |
| if game_state and last_state: | |
| self._diff_and_emit( | |
| lobby_id, game_state, lobby, last_state, on_event | |
| ) | |
| last_state = { | |
| "game_state": game_state, | |
| "lobby": lobby, | |
| } | |
| except Exception: | |
| logger.exception("Polling error for lobby %s", lobby_id) | |
| stop_event.wait(interval) | |
| def _diff_and_emit( | |
| self, | |
| lobby_id: str, | |
| game_state: GameState, | |
| lobby: Lobby | None, | |
| last_state: dict[str, Any], | |
| on_event: Callable[[EventPayload], None], | |
| ) -> None: | |
| """Compare current state to previous and emit events for differences.""" | |
| prev_gs: GameState | None = last_state.get("game_state") | |
| prev_lobby: Lobby | None = last_state.get("lobby") | |
| if prev_gs and game_state: | |
| if game_state.is_bust and not prev_gs.is_bust: | |
| on_event(EventPayload( | |
| event=GameEvent.PLAYER_BUST, | |
| lobby_id=lobby_id, | |
| data={"game_state": game_state.model_dump()}, | |
| )) | |
| elif game_state.active_dice != prev_gs.active_dice: | |
| on_event(EventPayload( | |
| event=GameEvent.DICE_ROLLED, | |
| lobby_id=lobby_id, | |
| data={"game_state": game_state.model_dump()}, | |
| )) | |
| elif game_state.held_indices != prev_gs.held_indices: | |
| on_event(EventPayload( | |
| event=GameEvent.DICE_HELD, | |
| lobby_id=lobby_id, | |
| data={"game_state": game_state.model_dump()}, | |
| )) | |
| elif game_state.updated_at != prev_gs.updated_at: | |
| on_event(EventPayload( | |
| event=GameEvent.STATE_UPDATED, | |
| lobby_id=lobby_id, | |
| data={"game_state": game_state.model_dump()}, | |
| )) | |
| if prev_lobby and lobby: | |
| if lobby.status != prev_lobby.status: | |
| if lobby.status == "playing": | |
| on_event(EventPayload( | |
| event=GameEvent.GAME_STARTED, | |
| lobby_id=lobby_id, | |
| data={"lobby": lobby.model_dump()}, | |
| )) | |
| elif lobby.status == "finished": | |
| on_event(EventPayload( | |
| event=GameEvent.GAME_WON, | |
| lobby_id=lobby_id, | |
| data={"lobby": lobby.model_dump()}, | |
| )) | |
| elif lobby.current_turn_index != prev_lobby.current_turn_index: | |
| on_event(EventPayload( | |
| event=GameEvent.TURN_ADVANCED, | |
| lobby_id=lobby_id, | |
| data={"lobby": lobby.model_dump()}, | |
| )) | |
| # -- Module-level convenience functions ---------------------------------- | |
| _manager_instance: RealtimeManager | None = None | |
| _manager_lock = threading.Lock() | |
| def _get_manager(client: Client) -> RealtimeManager: | |
| """Get or create the singleton RealtimeManager.""" | |
| global _manager_instance | |
| with _manager_lock: | |
| if _manager_instance is None: | |
| _manager_instance = RealtimeManager(client) | |
| return _manager_instance | |
| def subscribe_to_lobby( | |
| client: Client, | |
| lobby_id: str, | |
| on_event: Callable[[EventPayload], None], | |
| ) -> RealtimeManager: | |
| """Subscribe to realtime updates for a lobby. | |
| Convenience function for use in the UI layer. | |
| Args: | |
| client: Supabase client instance. | |
| lobby_id: UUID of the lobby to watch. | |
| on_event: Callback for game events. | |
| Returns: | |
| The RealtimeManager instance (for snapshot access, etc.) | |
| """ | |
| manager = _get_manager(client) | |
| manager.subscribe(lobby_id, on_event) | |
| return manager | |
| def unsubscribe_from_lobby(client: Client, lobby_id: str) -> None: | |
| """Unsubscribe from realtime updates for a lobby.""" | |
| manager = _get_manager(client) | |
| manager.unsubscribe(lobby_id) | |