| from __future__ import annotations |
|
|
| import asyncio |
| import random |
| import secrets |
| import string |
| import time |
| from dataclasses import dataclass, field |
| from pathlib import Path |
| from typing import Any |
|
|
| from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect |
| from fastapi.responses import FileResponse, JSONResponse, Response |
| from fastapi.staticfiles import StaticFiles |
| from pydantic import BaseModel, Field |
|
|
|
|
| ROOT = Path(__file__).resolve().parent |
| STATIC_DIR = ROOT / "static" |
| UNO_HTML = ROOT / "uno.html" |
|
|
| COLORS = ["red", "blue", "green", "yellow"] |
| ACTION_VALUES = {"skip", "reverse", "draw2", "wild", "wild_draw4"} |
| BOT_DELAYS = {"easy": 0.6, "medium": 1.0, "hard": 1.4} |
| MIN_PLAYERS = 3 |
| MAX_PLAYERS = 15 |
| ROOM_IDLE_SECONDS = 30 * 60 |
| RECONNECT_SECONDS = 2 * 60 |
| AUTO_DRAW_DELAY_SECONDS = 0.42 |
|
|
|
|
| class GameError(Exception): |
| def __init__(self, code: str, message: str): |
| super().__init__(message) |
| self.code = code |
| self.message = message |
|
|
|
|
| @dataclass |
| class Card: |
| id: str |
| color: str |
| value: str |
|
|
| def public(self) -> dict[str, str]: |
| return {"id": self.id, "color": self.color, "value": self.value} |
|
|
|
|
| @dataclass |
| class Player: |
| id: str |
| name: str |
| token: str |
| is_bot: bool = False |
| connected: bool = False |
| hand: list[Card] = field(default_factory=list) |
| said_uno: bool = False |
| finished: bool = False |
| rank: int | None = None |
| disconnected_at: float | None = None |
|
|
|
|
| @dataclass |
| class Room: |
| code: str |
| host_player_id: str |
| total_players: int |
| bot_difficulty: str = "hard" |
| bot_jump_in: bool = True |
| phase: str = "lobby" |
| players: list[Player] = field(default_factory=list) |
| deck: list[Card] = field(default_factory=list) |
| discard_pile: list[Card] = field(default_factory=list) |
| current_player_id: str | None = None |
| direction: int = 1 |
| current_color: str = "red" |
| pending_draw: int = 0 |
| finish_order: list[str] = field(default_factory=list) |
| awaiting_color_player_id: str | None = None |
| awaiting_color_card_value: str | None = None |
| jump_in_enabled: bool = True |
| connections: dict[str, WebSocket] = field(default_factory=dict) |
| lock: asyncio.Lock = field(default_factory=asyncio.Lock) |
| created_at: float = field(default_factory=time.time) |
| last_active: float = field(default_factory=time.time) |
| version: int = 0 |
| card_uid_seed: int = 1 |
|
|
| def touch(self) -> None: |
| self.last_active = time.time() |
|
|
| def next_card_id(self) -> str: |
| card_id = f"card_{self.card_uid_seed}" |
| self.card_uid_seed += 1 |
| return card_id |
|
|
| def active_players(self) -> list[Player]: |
| return [p for p in self.players if not p.finished] |
|
|
| def human_players(self) -> list[Player]: |
| return [p for p in self.players if not p.is_bot] |
|
|
| def get_player(self, player_id: str) -> Player: |
| for player in self.players: |
| if player.id == player_id: |
| return player |
| raise GameError("player_not_found", "Player is not in this room.") |
|
|
| def get_player_index(self, player_id: str) -> int: |
| for idx, player in enumerate(self.players): |
| if player.id == player_id: |
| return idx |
| raise GameError("player_not_found", "Player is not in this room.") |
|
|
| def get_top_card(self) -> Card: |
| if not self.discard_pile: |
| raise GameError("no_discard", "The discard pile is empty.") |
| return self.discard_pile[-1] |
|
|
| def make_deck(self) -> list[Card]: |
| deck: list[Card] = [] |
| for color in COLORS: |
| deck.append(Card(self.next_card_id(), color, "0")) |
| for value in range(1, 10): |
| deck.append(Card(self.next_card_id(), color, str(value))) |
| deck.append(Card(self.next_card_id(), color, str(value))) |
| for value in ("skip", "reverse", "draw2"): |
| deck.append(Card(self.next_card_id(), color, value)) |
| deck.append(Card(self.next_card_id(), color, value)) |
| for _ in range(4): |
| deck.append(Card(self.next_card_id(), "wild", "wild")) |
| deck.append(Card(self.next_card_id(), "wild", "wild_draw4")) |
| random.shuffle(deck) |
| return deck |
|
|
| def start_game(self, actor_id: str) -> dict[str, Any]: |
| self.require_host(actor_id) |
| if self.phase == "playing": |
| raise GameError("already_started", "This room is already playing.") |
| if not (MIN_PLAYERS <= len(self.players) <= MAX_PLAYERS): |
| raise GameError("bad_player_count", "A game needs 3 to 15 players.") |
| disconnected = [p for p in self.players if not p.is_bot and not p.connected] |
| if disconnected: |
| raise GameError("player_offline", "All human players must be connected before starting.") |
|
|
| self.card_uid_seed = 1 |
| self.deck = self.make_deck() |
| self.discard_pile = [] |
| self.direction = 1 |
| self.pending_draw = 0 |
| self.finish_order = [] |
| self.awaiting_color_player_id = None |
| self.awaiting_color_card_value = None |
| self.phase = "playing" |
|
|
| for player in self.players: |
| player.hand = [] |
| player.said_uno = False |
| player.finished = False |
| player.rank = None |
|
|
| for _ in range(7): |
| for player in self.players: |
| player.hand.append(self.deck.pop()) |
|
|
| start_card: Card | None = None |
| while self.deck: |
| candidate = self.deck.pop() |
| if candidate.color == "wild": |
| self.deck.insert(0, candidate) |
| random.shuffle(self.deck) |
| continue |
| start_card = candidate |
| break |
| if start_card is None: |
| raise GameError("deck_error", "Could not choose a starting card.") |
|
|
| self.discard_pile.append(start_card) |
| self.current_color = start_card.color |
| self.current_player_id = self.active_players()[0].id |
| self.sync_uno_flags() |
| return {"type": "game_started", "roomCode": self.code} |
|
|
| def require_host(self, actor_id: str) -> None: |
| if actor_id != self.host_player_id: |
| raise GameError("not_host", "Only the host can do that.") |
|
|
| def set_room_options(self, actor_id: str, payload: dict[str, Any]) -> dict[str, Any]: |
| self.require_host(actor_id) |
| if self.phase != "lobby": |
| raise GameError("game_started", "Room options can only be changed in the lobby.") |
| total = int(payload.get("totalPlayers", self.total_players)) |
| if not (MIN_PLAYERS <= total <= MAX_PLAYERS): |
| raise GameError("bad_total_players", "Total players must be between 3 and 15.") |
| if total < len(self.players): |
| raise GameError("too_small", "Total players cannot be lower than current seats.") |
| difficulty = payload.get("botDifficulty", self.bot_difficulty) |
| if difficulty not in BOT_DELAYS: |
| raise GameError("bad_difficulty", "Bot difficulty must be easy, medium, or hard.") |
| self.total_players = total |
| self.bot_difficulty = difficulty |
| self.bot_jump_in = bool(payload.get("botJumpIn", self.bot_jump_in)) |
| return {"type": "room_options_changed"} |
|
|
| def add_bot(self, actor_id: str) -> dict[str, Any]: |
| self.require_host(actor_id) |
| if self.phase != "lobby": |
| raise GameError("game_started", "Bots can only be added in the lobby.") |
| if len(self.players) >= self.total_players: |
| raise GameError("room_full", "The room is already full.") |
| bot_num = 1 + sum(1 for p in self.players if p.is_bot) |
| bot = Player( |
| id=make_id("bot"), |
| name=f"Bot {bot_num}", |
| token=secrets.token_urlsafe(16), |
| is_bot=True, |
| connected=True, |
| ) |
| self.players.append(bot) |
| return {"type": "bot_added", "playerId": bot.id, "name": bot.name} |
|
|
| def remove_bot(self, actor_id: str) -> dict[str, Any]: |
| self.require_host(actor_id) |
| if self.phase != "lobby": |
| raise GameError("game_started", "Bots can only be removed in the lobby.") |
| for idx in range(len(self.players) - 1, -1, -1): |
| if self.players[idx].is_bot: |
| bot = self.players.pop(idx) |
| return {"type": "bot_removed", "playerId": bot.id, "name": bot.name} |
| raise GameError("no_bot", "There are no bots to remove.") |
|
|
| def is_valid_play(self, card: Card) -> bool: |
| top_card = self.get_top_card() |
| if self.pending_draw > 0: |
| if top_card.value == "draw2": |
| return card.value in {"draw2", "wild_draw4"} |
| if top_card.value == "wild_draw4": |
| return card.value == "wild_draw4" |
| return False |
| if card.color == "wild": |
| return True |
| if card.color == self.current_color: |
| return True |
| return card.value == top_card.value |
|
|
| def is_strict_jump_in(self, card: Card) -> bool: |
| top_card = self.get_top_card() |
| return card.color == top_card.color and card.value == top_card.value |
|
|
| def legal_card_ids_for(self, player_id: str) -> list[str]: |
| if self.phase != "playing" or self.awaiting_color_player_id: |
| return [] |
| player = self.get_player(player_id) |
| if player.finished: |
| return [] |
| if player_id == self.current_player_id: |
| return [card.id for card in player.hand if self.is_valid_play(card)] |
| return [card.id for card in player.hand if self.is_strict_jump_in(card)] |
|
|
| def has_valid_turn_play(self, player_id: str) -> bool: |
| if self.phase != "playing" or self.awaiting_color_player_id: |
| return False |
| player = self.get_player(player_id) |
| if player.finished: |
| return False |
| return any(self.is_valid_play(card) for card in player.hand) |
|
|
| def can_draw(self, player_id: str) -> bool: |
| return ( |
| self.phase == "playing" |
| and not self.awaiting_color_player_id |
| and player_id == self.current_player_id |
| and not self.get_player(player_id).finished |
| ) |
|
|
| def find_card(self, player: Player, card_id: str) -> Card: |
| for card in player.hand: |
| if card.id == card_id: |
| return card |
| raise GameError("card_not_found", "That card is not in your hand.") |
|
|
| def remove_card(self, player: Player, card: Card) -> None: |
| for idx, candidate in enumerate(player.hand): |
| if candidate.id == card.id: |
| player.hand.pop(idx) |
| return |
| raise GameError("card_not_found", "That card is not in your hand.") |
|
|
| def choose_bot_color(self, bot: Player) -> str: |
| counts = {color: 0 for color in COLORS} |
| for card in bot.hand: |
| if card.color in counts: |
| counts[card.color] += 1 |
| return max(COLORS, key=lambda color: counts[color]) |
|
|
| def pick_bot_card(self, bot: Player) -> Card | None: |
| legal = [card for card in bot.hand if self.is_valid_play(card)] |
| if not legal: |
| return None |
| if self.bot_difficulty == "easy": |
| return random.choice(legal) |
| if self.bot_difficulty == "medium": |
| action = next((c for c in legal if c.color == self.current_color and c.value in {"skip", "reverse", "draw2"}), None) |
| if action: |
| return action |
| number = next((c for c in legal if c.color == self.current_color and c.value.isdigit()), None) |
| return number or legal[0] |
|
|
| has_current_color = any(c.color == self.current_color for c in bot.hand if c.color != "wild") |
| if not has_current_color: |
| wild_draw4 = next((c for c in legal if c.value == "wild_draw4"), None) |
| if wild_draw4: |
| return wild_draw4 |
| action = next((c for c in legal if c.color == self.current_color and c.value in {"skip", "reverse"}), None) |
| if action: |
| return action |
| draw2 = next((c for c in legal if c.color == self.current_color and c.value == "draw2"), None) |
| if draw2: |
| return draw2 |
| numbers = [c for c in legal if c.color == self.current_color and c.value.isdigit()] |
| if numbers: |
| return sorted(numbers, key=lambda c: int(c.value), reverse=True)[0] |
| non_wild = next((c for c in legal if c.color != "wild"), None) |
| return non_wild or legal[0] |
|
|
| def play_card( |
| self, |
| player_id: str, |
| card_id: str, |
| *, |
| bot_auto_color: bool = False, |
| expected_top_card_id: str | None = None, |
| expected_version: int | None = None, |
| ) -> dict[str, Any]: |
| if self.phase != "playing": |
| raise GameError("not_playing", "The game is not currently playing.") |
| if self.awaiting_color_player_id: |
| raise GameError("awaiting_color", "A wild card color must be chosen first.") |
|
|
| player = self.get_player(player_id) |
| if player.finished: |
| raise GameError("player_finished", "Finished players cannot play cards.") |
| card = self.find_card(player, card_id) |
| if expected_version is not None and expected_version != self.version: |
| raise GameError("stale_table", "The table changed before your play arrived.") |
| if expected_top_card_id is not None and expected_top_card_id != self.get_top_card().id: |
| raise GameError("stale_table", "The discard pile changed before your play arrived.") |
| jump_in = player_id != self.current_player_id |
| if jump_in: |
| if expected_version is None or expected_top_card_id is None: |
| raise GameError("stale_jump_in", "Jump-in requires the latest table snapshot.") |
| if not self.jump_in_enabled: |
| raise GameError("jump_in_disabled", "Jump-in is not enabled.") |
| if not self.is_strict_jump_in(card): |
| raise GameError("bad_jump_in", "Jump-in requires the exact same card.") |
| self.current_player_id = player_id |
| elif not self.is_valid_play(card): |
| raise GameError("bad_play", "That card cannot be played right now.") |
|
|
| self.remove_card(player, card) |
| self.discard_pile.append(card) |
|
|
| if card.color != "wild": |
| self.current_color = card.color |
| if card.value == "reverse": |
| self.direction *= -1 |
| elif card.value == "draw2": |
| self.pending_draw += 2 |
| elif card.value == "wild_draw4": |
| self.pending_draw += 4 |
| self.current_color = "wild" |
| elif card.value == "wild": |
| self.current_color = "wild" |
|
|
| self.sync_uno_flags() |
| self.mark_finished_if_needed(player) |
|
|
| chosen_color: str | None = None |
| if self.phase != "ended": |
| if card.value in {"wild", "wild_draw4"}: |
| if bot_auto_color or player.is_bot: |
| chosen_color = self.choose_bot_color(player) |
| self.current_color = chosen_color |
| self.finish_turn_after(player_id, card) |
| else: |
| self.awaiting_color_player_id = player_id |
| self.awaiting_color_card_value = card.value |
| else: |
| self.finish_turn_after(player_id, card) |
|
|
| return { |
| "type": "card_played", |
| "playerId": player_id, |
| "playerName": player.name, |
| "card": card.public(), |
| "jumpIn": jump_in, |
| "chosenColor": chosen_color, |
| } |
|
|
| def choose_color(self, player_id: str, color: str) -> dict[str, Any]: |
| if color not in COLORS: |
| raise GameError("bad_color", "Color must be red, blue, green, or yellow.") |
| if self.awaiting_color_player_id != player_id: |
| raise GameError("not_awaiting_you", "This room is not waiting for you to choose a color.") |
| value = self.awaiting_color_card_value or "wild" |
| self.current_color = color |
| self.awaiting_color_player_id = None |
| self.awaiting_color_card_value = None |
| if self.phase != "ended": |
| self.finish_turn_after(player_id, Card("color_choice", "wild", value)) |
| player = self.get_player(player_id) |
| return {"type": "color_changed", "playerId": player_id, "playerName": player.name, "color": color} |
|
|
| def draw_for_player(self, player_id: str, *, auto: bool = False, reason: str | None = None) -> dict[str, Any]: |
| if self.phase != "playing": |
| raise GameError("not_playing", "The game is not currently playing.") |
| if self.awaiting_color_player_id: |
| raise GameError("awaiting_color", "A wild card color must be chosen first.") |
| if player_id != self.current_player_id: |
| raise GameError("not_your_turn", "Only the current player can draw.") |
| player = self.get_player(player_id) |
| if player.finished: |
| raise GameError("player_finished", "Finished players cannot draw.") |
|
|
| count = self.pending_draw if self.pending_draw > 0 else 1 |
| drawn = self.take_cards(count) |
| player.hand.extend(drawn) |
| self.pending_draw = 0 |
| self.sync_uno_flags() |
| self.finish_turn_after(player_id, None) |
| event: dict[str, Any] = { |
| "type": "cards_drawn", |
| "playerId": player_id, |
| "playerName": player.name, |
| "count": len(drawn), |
| "cards": [card.public() for card in drawn], |
| } |
| if auto: |
| event["auto"] = True |
| event["reason"] = reason or "no_playable" |
| return event |
|
|
| def take_cards(self, count: int) -> list[Card]: |
| drawn: list[Card] = [] |
| for _ in range(count): |
| if not self.deck: |
| self.reshuffle_deck() |
| if not self.deck: |
| break |
| drawn.append(self.deck.pop()) |
| return drawn |
|
|
| def reshuffle_deck(self) -> None: |
| if len(self.discard_pile) <= 1: |
| return |
| top_card = self.discard_pile[-1] |
| recycled = self.discard_pile[:-1] |
| random.shuffle(recycled) |
| self.deck.extend(recycled) |
| self.discard_pile = [top_card] |
|
|
| def sync_uno_flags(self) -> None: |
| for player in self.players: |
| if len(player.hand) != 1: |
| player.said_uno = False |
|
|
| def call_uno(self, player_id: str) -> dict[str, Any]: |
| player = self.get_player(player_id) |
| if len(player.hand) != 1: |
| raise GameError("not_uno", "UNO can only be called with exactly one card.") |
| player.said_uno = True |
| return {"type": "uno_called", "playerId": player_id, "playerName": player.name} |
|
|
| def rename_player(self, player_id: str, name: str) -> dict[str, Any]: |
| player = self.get_player(player_id) |
| if player.is_bot: |
| raise GameError("bot_rename", "Bots cannot be renamed from a socket.") |
| new_name = normalize_name(name) |
| if player.name == new_name: |
| return {"type": "player_renamed", "playerId": player_id, "playerName": player.name} |
| player.name = new_name |
| return {"type": "player_renamed", "playerId": player_id, "playerName": player.name} |
|
|
| def mark_finished_if_needed(self, player: Player) -> None: |
| if player.hand or player.finished: |
| return |
| player.finished = True |
| player.rank = len(self.finish_order) + 1 |
| self.finish_order.append(player.id) |
| remaining = [p for p in self.players if not p.finished] |
| if len(remaining) == 1: |
| last = remaining[0] |
| last.finished = True |
| last.rank = len(self.finish_order) + 1 |
| self.finish_order.append(last.id) |
| self.phase = "ended" |
| self.current_player_id = None |
| self.awaiting_color_player_id = None |
| self.awaiting_color_card_value = None |
|
|
| def advance_steps_from(self, player_id: str, steps: int) -> str: |
| if not self.active_players(): |
| return player_id |
| idx = self.get_player_index(player_id) |
| for _ in range(steps): |
| for _ in range(len(self.players)): |
| idx = (idx + self.direction + len(self.players)) % len(self.players) |
| candidate = self.players[idx] |
| if not candidate.finished: |
| break |
| return self.players[idx].id |
|
|
| def finish_turn_after(self, player_id: str, card: Card | None) -> None: |
| if self.phase == "ended": |
| return |
| steps = 1 |
| if card and card.value == "skip": |
| steps = 2 |
| elif card and card.value == "reverse" and len(self.active_players()) == 2: |
| steps = 2 |
| self.current_player_id = self.advance_steps_from(player_id, steps) |
|
|
| def snapshot_for(self, recipient_id: str) -> dict[str, Any]: |
| legal_ids = self.legal_card_ids_for(recipient_id) if self.phase == "playing" else [] |
| players_payload: list[dict[str, Any]] = [] |
| for player in self.players: |
| payload: dict[str, Any] = { |
| "id": player.id, |
| "name": player.name, |
| "isBot": player.is_bot, |
| "connected": player.connected, |
| "handCount": len(player.hand), |
| "saidUno": player.said_uno, |
| "finished": player.finished, |
| "rank": player.rank, |
| } |
| if player.id == recipient_id: |
| payload["hand"] = [card.public() for card in player.hand] |
| players_payload.append(payload) |
| top_card = self.discard_pile[-1].public() if self.discard_pile else None |
| return { |
| "roomCode": self.code, |
| "phase": self.phase, |
| "myPlayerId": recipient_id, |
| "hostPlayerId": self.host_player_id, |
| "settings": { |
| "totalPlayers": self.total_players, |
| "botDifficulty": self.bot_difficulty, |
| "botJumpIn": self.bot_jump_in, |
| }, |
| "players": players_payload, |
| "deckCount": len(self.deck), |
| "discardPile": [card.public() for card in self.discard_pile], |
| "topCard": top_card, |
| "currentPlayerId": self.current_player_id, |
| "direction": self.direction, |
| "currentColor": self.current_color, |
| "pendingDraw": self.pending_draw, |
| "finishOrder": list(self.finish_order), |
| "awaitingColorPlayerId": self.awaiting_color_player_id, |
| "legalCardIds": legal_ids, |
| "canDraw": self.can_draw(recipient_id) if self.phase == "playing" else False, |
| "version": self.version, |
| } |
|
|
|
|
| class CreateRoomRequest(BaseModel): |
| name: str = Field(min_length=1, max_length=24) |
| totalPlayers: int = Field(default=3, ge=MIN_PLAYERS, le=MAX_PLAYERS) |
| botDifficulty: str = "hard" |
| botJumpIn: bool = True |
|
|
|
|
| class JoinRoomRequest(BaseModel): |
| name: str = Field(min_length=1, max_length=24) |
|
|
|
|
| rooms: dict[str, Room] = {} |
| app = FastAPI(title="Multiplayer UNO", version="1.0.0") |
| app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static") |
|
|
|
|
| def make_id(prefix: str) -> str: |
| return f"{prefix}_{secrets.token_urlsafe(8)}" |
|
|
|
|
| def make_room_code() -> str: |
| alphabet = string.ascii_uppercase + string.digits |
| while True: |
| code = "".join(secrets.choice(alphabet) for _ in range(5)) |
| if code not in rooms: |
| return code |
|
|
|
|
| def normalize_name(name: str) -> str: |
| cleaned = " ".join(name.strip().split()) |
| return cleaned[:24] or "Player" |
|
|
|
|
| def validate_difficulty(value: str) -> str: |
| if value not in BOT_DELAYS: |
| raise HTTPException(status_code=400, detail="botDifficulty must be easy, medium, or hard") |
| return value |
|
|
|
|
| def is_room_code(value: str) -> bool: |
| return len(value) == 5 and all(ch in string.ascii_letters + string.digits for ch in value) |
|
|
|
|
| def public_join_payload(room: Room, player: Player) -> dict[str, str]: |
| return {"roomCode": room.code, "playerId": player.id, "token": player.token} |
|
|
|
|
| @app.get("/") |
| async def index() -> FileResponse: |
| return FileResponse(STATIC_DIR / "index.html") |
|
|
|
|
| @app.get("/healthz") |
| async def healthz() -> JSONResponse: |
| return JSONResponse({"ok": True, "rooms": len(rooms)}) |
|
|
|
|
| @app.get("/favicon.ico") |
| async def favicon() -> Response: |
| return Response(status_code=204) |
|
|
|
|
| @app.get("/uno-style.css") |
| async def uno_style() -> Response: |
| if not UNO_HTML.exists(): |
| return Response("", media_type="text/css") |
| text = UNO_HTML.read_text(encoding="utf-8", errors="replace") |
| start = text.find("<style>") |
| end = text.find("</style>") |
| css = text[start + len("<style>") : end] if start != -1 and end != -1 else "" |
| return Response(css, media_type="text/css") |
|
|
|
|
| @app.get("/{room_code}") |
| async def room_link(room_code: str) -> FileResponse: |
| if not is_room_code(room_code): |
| raise HTTPException(status_code=404, detail="Not found") |
| return FileResponse(STATIC_DIR / "index.html") |
|
|
|
|
| @app.post("/api/rooms") |
| async def create_room(request: CreateRoomRequest) -> JSONResponse: |
| difficulty = validate_difficulty(request.botDifficulty) |
| code = make_room_code() |
| host = Player( |
| id=make_id("player"), |
| name=normalize_name(request.name), |
| token=secrets.token_urlsafe(24), |
| connected=False, |
| ) |
| room = Room( |
| code=code, |
| host_player_id=host.id, |
| total_players=request.totalPlayers, |
| bot_difficulty=difficulty, |
| bot_jump_in=request.botJumpIn, |
| players=[host], |
| ) |
| rooms[code] = room |
| return JSONResponse(public_join_payload(room, host)) |
|
|
|
|
| @app.post("/api/rooms/{room_code}/join") |
| async def join_room(room_code: str, request: JoinRoomRequest) -> JSONResponse: |
| code = room_code.upper() |
| room = rooms.get(code) |
| if not room: |
| raise HTTPException(status_code=404, detail="Room not found") |
| async with room.lock: |
| if room.phase != "lobby": |
| raise HTTPException(status_code=409, detail="This room has already started") |
| if len(room.players) >= room.total_players: |
| raise HTTPException(status_code=409, detail="This room is full") |
| player = Player( |
| id=make_id("player"), |
| name=normalize_name(request.name), |
| token=secrets.token_urlsafe(24), |
| connected=False, |
| ) |
| room.players.append(player) |
| room.touch() |
| room.version += 1 |
| await broadcast_room(room, {"type": "player_joined", "playerId": player.id, "playerName": player.name}) |
| return JSONResponse(public_join_payload(room, player)) |
|
|
|
|
| @app.websocket("/ws/{room_code}") |
| async def websocket_endpoint(websocket: WebSocket, room_code: str) -> None: |
| code = room_code.upper() |
| player_id = websocket.query_params.get("playerId", "") |
| token = websocket.query_params.get("token", "") |
| await websocket.accept() |
|
|
| room = rooms.get(code) |
| if not room: |
| await websocket.send_json({"type": "error", "code": "room_not_found", "message": "Room not found."}) |
| await websocket.close() |
| return |
|
|
| try: |
| async with room.lock: |
| player = room.get_player(player_id) |
| if player.token != token: |
| raise GameError("bad_token", "Invalid reconnect token.") |
| if player.is_bot: |
| raise GameError("bot_socket", "Bots cannot open sockets.") |
| player.connected = True |
| player.disconnected_at = None |
| room.connections[player_id] = websocket |
| room.touch() |
| room.version += 1 |
| await broadcast_room(room, {"type": "player_connected", "playerId": player.id, "playerName": player.name}) |
| schedule_room_automation(room) |
|
|
| while True: |
| data = await websocket.receive_json() |
| await handle_command(room, player_id, data) |
| except WebSocketDisconnect: |
| pass |
| except GameError as exc: |
| await websocket.send_json({"type": "error", "code": exc.code, "message": exc.message}) |
| finally: |
| async with room.lock: |
| existing = room.connections.get(player_id) |
| if existing is websocket: |
| room.connections.pop(player_id, None) |
| try: |
| player = room.get_player(player_id) |
| player.connected = False |
| player.disconnected_at = time.time() |
| room.version += 1 |
| except GameError: |
| player = None |
| if player: |
| await broadcast_room(room, {"type": "player_disconnected", "playerId": player.id, "playerName": player.name}) |
|
|
|
|
| async def handle_command(room: Room, player_id: str, data: dict[str, Any]) -> None: |
| command = data.get("type") |
| payload = data.get("payload") or {} |
| event: dict[str, Any] | None = None |
| error: GameError | None = None |
| changed = False |
|
|
| async with room.lock: |
| try: |
| room.touch() |
| if command == "set_room_options": |
| event = room.set_room_options(player_id, payload) |
| changed = True |
| elif command == "add_bot": |
| event = room.add_bot(player_id) |
| changed = True |
| elif command == "remove_bot": |
| event = room.remove_bot(player_id) |
| changed = True |
| elif command == "start_game": |
| event = room.start_game(player_id) |
| changed = True |
| elif command == "play_card": |
| card_id = str(payload.get("cardId", "")) |
| expected_top = payload.get("expectedTopCardId") |
| expected_version = payload.get("expectedVersion") |
| if expected_version is not None: |
| expected_version = int(expected_version) |
| event = room.play_card( |
| player_id, |
| card_id, |
| expected_top_card_id=str(expected_top) if expected_top is not None else None, |
| expected_version=expected_version, |
| ) |
| changed = True |
| elif command == "draw": |
| event = room.draw_for_player(player_id) |
| changed = True |
| elif command == "choose_color": |
| color = str(payload.get("color", "")) |
| event = room.choose_color(player_id, color) |
| changed = True |
| elif command == "call_uno": |
| event = room.call_uno(player_id) |
| changed = True |
| elif command == "rename": |
| event = room.rename_player(player_id, str(payload.get("name", ""))) |
| changed = True |
| elif command == "restart": |
| event = room.start_game(player_id) |
| changed = True |
| else: |
| raise GameError("unknown_command", "Unknown command.") |
| if changed: |
| room.version += 1 |
| except GameError as exc: |
| error = exc |
|
|
| if error: |
| await send_error(room, player_id, error) |
| return |
| if changed: |
| await broadcast_room(room, event) |
| schedule_room_automation(room) |
|
|
|
|
| async def send_error(room: Room, player_id: str, error: GameError) -> None: |
| websocket = room.connections.get(player_id) |
| if not websocket: |
| return |
| await websocket.send_json({"type": "error", "code": error.code, "message": error.message}) |
| async with room.lock: |
| snapshot = room.snapshot_for(player_id) |
| await websocket.send_json({"type": "snapshot", "state": snapshot}) |
|
|
|
|
| def event_for_recipient(event: dict[str, Any] | None, recipient_id: str) -> dict[str, Any] | None: |
| if not event: |
| return None |
| filtered = dict(event) |
| if filtered.get("type") == "cards_drawn" and filtered.get("playerId") != recipient_id: |
| filtered.pop("cards", None) |
| return filtered |
|
|
|
|
| async def broadcast_room(room: Room, event: dict[str, Any] | None = None) -> None: |
| async with room.lock: |
| recipients = list(room.connections.items()) |
| snapshots = {player_id: room.snapshot_for(player_id) for player_id, _ in recipients} |
| dead: list[str] = [] |
| for player_id, websocket in recipients: |
| try: |
| recipient_event = event_for_recipient(event, player_id) |
| if recipient_event: |
| await websocket.send_json({"type": "event", "event": recipient_event}) |
| await websocket.send_json({"type": "snapshot", "state": snapshots[player_id]}) |
| except Exception: |
| dead.append(player_id) |
| if dead: |
| async with room.lock: |
| for player_id in dead: |
| room.connections.pop(player_id, None) |
|
|
|
|
| def schedule_room_automation(room: Room) -> None: |
| if room.phase != "playing" or room.awaiting_color_player_id: |
| return |
| version = room.version |
| current = None |
| try: |
| current = room.get_player(room.current_player_id or "") |
| except GameError: |
| pass |
| if current and current.is_bot: |
| delay = BOT_DELAYS.get(room.bot_difficulty, 1.0) |
| asyncio.create_task(run_bot_turn(room.code, current.id, version, delay)) |
| elif current and not current.finished and not room.has_valid_turn_play(current.id): |
| top_card = room.get_top_card() |
| asyncio.create_task( |
| run_auto_draw( |
| room.code, |
| current.id, |
| version, |
| top_card.id, |
| room.current_color, |
| room.pending_draw, |
| AUTO_DRAW_DELAY_SECONDS, |
| ) |
| ) |
| if room.bot_jump_in: |
| for player in room.players: |
| if player.is_bot and player.id != room.current_player_id and not player.finished: |
| if any(room.is_strict_jump_in(card) for card in player.hand): |
| delay = random.uniform(0.45, 0.9) |
| asyncio.create_task(run_bot_jump_in(room.code, player.id, version, delay)) |
|
|
|
|
| def schedule_bots(room: Room) -> None: |
| schedule_room_automation(room) |
|
|
|
|
| async def run_bot_turn(room_code: str, player_id: str, expected_version: int, delay: float) -> None: |
| await asyncio.sleep(delay) |
| room = rooms.get(room_code) |
| if not room: |
| return |
| event: dict[str, Any] | None = None |
| async with room.lock: |
| if room.version != expected_version or room.phase != "playing" or room.awaiting_color_player_id: |
| return |
| if room.current_player_id != player_id: |
| return |
| bot = room.get_player(player_id) |
| card = room.pick_bot_card(bot) |
| if card: |
| event = room.play_card(player_id, card.id, bot_auto_color=True) |
| else: |
| event = room.draw_for_player(player_id) |
| room.touch() |
| room.version += 1 |
| await broadcast_room(room, event) |
| schedule_room_automation(room) |
|
|
|
|
| async def run_bot_jump_in(room_code: str, player_id: str, expected_version: int, delay: float) -> None: |
| await asyncio.sleep(delay) |
| room = rooms.get(room_code) |
| if not room: |
| return |
| event: dict[str, Any] | None = None |
| async with room.lock: |
| if room.version != expected_version or room.phase != "playing" or room.awaiting_color_player_id: |
| return |
| if room.current_player_id == player_id: |
| return |
| bot = room.get_player(player_id) |
| card = next((candidate for candidate in bot.hand if room.is_strict_jump_in(candidate)), None) |
| if not card: |
| return |
| event = room.play_card( |
| player_id, |
| card.id, |
| bot_auto_color=True, |
| expected_top_card_id=room.get_top_card().id, |
| expected_version=expected_version, |
| ) |
| room.touch() |
| room.version += 1 |
| await broadcast_room(room, event) |
| schedule_room_automation(room) |
|
|
|
|
| async def run_auto_draw( |
| room_code: str, |
| player_id: str, |
| expected_version: int, |
| expected_top_card_id: str, |
| expected_color: str, |
| expected_pending_draw: int, |
| delay: float = AUTO_DRAW_DELAY_SECONDS, |
| ) -> None: |
| await asyncio.sleep(delay) |
| room = rooms.get(room_code) |
| if not room: |
| return |
| event: dict[str, Any] | None = None |
| async with room.lock: |
| if room.version != expected_version or room.phase != "playing" or room.awaiting_color_player_id: |
| return |
| if room.current_player_id != player_id: |
| return |
| if room.current_color != expected_color or room.pending_draw != expected_pending_draw: |
| return |
| if room.get_top_card().id != expected_top_card_id: |
| return |
| player = room.get_player(player_id) |
| if player.is_bot or player.finished or room.has_valid_turn_play(player_id): |
| return |
| event = room.draw_for_player(player_id, auto=True, reason="no_playable") |
| room.touch() |
| room.version += 1 |
| await broadcast_room(room, event) |
| schedule_room_automation(room) |
|
|
|
|
| async def cleanup_rooms() -> None: |
| while True: |
| await asyncio.sleep(30) |
| now = time.time() |
| for code, room in list(rooms.items()): |
| async with room.lock: |
| if now - room.last_active > ROOM_IDLE_SECONDS: |
| rooms.pop(code, None) |
| continue |
| if room.phase == "lobby": |
| for player in list(room.players): |
| if ( |
| not player.is_bot |
| and not player.connected |
| and player.disconnected_at |
| and now - player.disconnected_at > RECONNECT_SECONDS |
| and player.id != room.host_player_id |
| ): |
| room.players.remove(player) |
| room.version += 1 |
|
|
|
|
| @app.on_event("startup") |
| async def on_startup() -> None: |
| asyncio.create_task(cleanup_rooms()) |
|
|
|
|
| def create_deck_for_tests() -> list[Card]: |
| room = Room(code="TEST", host_player_id="host", total_players=3) |
| return room.make_deck() |
|
|