from __future__ import annotations import asyncio import random import secrets import string import time from dataclasses import dataclass, field from pathlib import Path from typing import Any from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect from fastapi.responses import FileResponse, JSONResponse, Response from fastapi.staticfiles import StaticFiles from pydantic import BaseModel, Field ROOT = Path(__file__).resolve().parent STATIC_DIR = ROOT / "static" UNO_HTML = ROOT / "uno.html" COLORS = ["red", "blue", "green", "yellow"] ACTION_VALUES = {"skip", "reverse", "draw2", "wild", "wild_draw4"} BOT_DELAYS = {"easy": 0.6, "medium": 1.0, "hard": 1.4} MIN_PLAYERS = 3 MAX_PLAYERS = 15 ROOM_IDLE_SECONDS = 30 * 60 RECONNECT_SECONDS = 2 * 60 AUTO_DRAW_DELAY_SECONDS = 0.42 class GameError(Exception): def __init__(self, code: str, message: str): super().__init__(message) self.code = code self.message = message @dataclass class Card: id: str color: str value: str def public(self) -> dict[str, str]: return {"id": self.id, "color": self.color, "value": self.value} @dataclass class Player: id: str name: str token: str is_bot: bool = False connected: bool = False hand: list[Card] = field(default_factory=list) said_uno: bool = False finished: bool = False rank: int | None = None disconnected_at: float | None = None @dataclass class Room: code: str host_player_id: str total_players: int bot_difficulty: str = "hard" bot_jump_in: bool = True phase: str = "lobby" players: list[Player] = field(default_factory=list) deck: list[Card] = field(default_factory=list) discard_pile: list[Card] = field(default_factory=list) current_player_id: str | None = None direction: int = 1 current_color: str = "red" pending_draw: int = 0 finish_order: list[str] = field(default_factory=list) awaiting_color_player_id: str | None = None awaiting_color_card_value: str | None = None jump_in_enabled: bool = True connections: dict[str, WebSocket] = field(default_factory=dict) lock: asyncio.Lock = field(default_factory=asyncio.Lock) created_at: float = field(default_factory=time.time) last_active: float = field(default_factory=time.time) version: int = 0 card_uid_seed: int = 1 def touch(self) -> None: self.last_active = time.time() def next_card_id(self) -> str: card_id = f"card_{self.card_uid_seed}" self.card_uid_seed += 1 return card_id def active_players(self) -> list[Player]: return [p for p in self.players if not p.finished] def human_players(self) -> list[Player]: return [p for p in self.players if not p.is_bot] def get_player(self, player_id: str) -> Player: for player in self.players: if player.id == player_id: return player raise GameError("player_not_found", "Player is not in this room.") def get_player_index(self, player_id: str) -> int: for idx, player in enumerate(self.players): if player.id == player_id: return idx raise GameError("player_not_found", "Player is not in this room.") def get_top_card(self) -> Card: if not self.discard_pile: raise GameError("no_discard", "The discard pile is empty.") return self.discard_pile[-1] def make_deck(self) -> list[Card]: deck: list[Card] = [] for color in COLORS: deck.append(Card(self.next_card_id(), color, "0")) for value in range(1, 10): deck.append(Card(self.next_card_id(), color, str(value))) deck.append(Card(self.next_card_id(), color, str(value))) for value in ("skip", "reverse", "draw2"): deck.append(Card(self.next_card_id(), color, value)) deck.append(Card(self.next_card_id(), color, value)) for _ in range(4): deck.append(Card(self.next_card_id(), "wild", "wild")) deck.append(Card(self.next_card_id(), "wild", "wild_draw4")) random.shuffle(deck) return deck def start_game(self, actor_id: str) -> dict[str, Any]: self.require_host(actor_id) if self.phase == "playing": raise GameError("already_started", "This room is already playing.") if not (MIN_PLAYERS <= len(self.players) <= MAX_PLAYERS): raise GameError("bad_player_count", "A game needs 3 to 15 players.") disconnected = [p for p in self.players if not p.is_bot and not p.connected] if disconnected: raise GameError("player_offline", "All human players must be connected before starting.") self.card_uid_seed = 1 self.deck = self.make_deck() self.discard_pile = [] self.direction = 1 self.pending_draw = 0 self.finish_order = [] self.awaiting_color_player_id = None self.awaiting_color_card_value = None self.phase = "playing" for player in self.players: player.hand = [] player.said_uno = False player.finished = False player.rank = None for _ in range(7): for player in self.players: player.hand.append(self.deck.pop()) start_card: Card | None = None while self.deck: candidate = self.deck.pop() if candidate.color == "wild": self.deck.insert(0, candidate) random.shuffle(self.deck) continue start_card = candidate break if start_card is None: raise GameError("deck_error", "Could not choose a starting card.") self.discard_pile.append(start_card) self.current_color = start_card.color self.current_player_id = self.active_players()[0].id self.sync_uno_flags() return {"type": "game_started", "roomCode": self.code} def require_host(self, actor_id: str) -> None: if actor_id != self.host_player_id: raise GameError("not_host", "Only the host can do that.") def set_room_options(self, actor_id: str, payload: dict[str, Any]) -> dict[str, Any]: self.require_host(actor_id) if self.phase != "lobby": raise GameError("game_started", "Room options can only be changed in the lobby.") total = int(payload.get("totalPlayers", self.total_players)) if not (MIN_PLAYERS <= total <= MAX_PLAYERS): raise GameError("bad_total_players", "Total players must be between 3 and 15.") if total < len(self.players): raise GameError("too_small", "Total players cannot be lower than current seats.") difficulty = payload.get("botDifficulty", self.bot_difficulty) if difficulty not in BOT_DELAYS: raise GameError("bad_difficulty", "Bot difficulty must be easy, medium, or hard.") self.total_players = total self.bot_difficulty = difficulty self.bot_jump_in = bool(payload.get("botJumpIn", self.bot_jump_in)) return {"type": "room_options_changed"} def add_bot(self, actor_id: str) -> dict[str, Any]: self.require_host(actor_id) if self.phase != "lobby": raise GameError("game_started", "Bots can only be added in the lobby.") if len(self.players) >= self.total_players: raise GameError("room_full", "The room is already full.") bot_num = 1 + sum(1 for p in self.players if p.is_bot) bot = Player( id=make_id("bot"), name=f"Bot {bot_num}", token=secrets.token_urlsafe(16), is_bot=True, connected=True, ) self.players.append(bot) return {"type": "bot_added", "playerId": bot.id, "name": bot.name} def remove_bot(self, actor_id: str) -> dict[str, Any]: self.require_host(actor_id) if self.phase != "lobby": raise GameError("game_started", "Bots can only be removed in the lobby.") for idx in range(len(self.players) - 1, -1, -1): if self.players[idx].is_bot: bot = self.players.pop(idx) return {"type": "bot_removed", "playerId": bot.id, "name": bot.name} raise GameError("no_bot", "There are no bots to remove.") def is_valid_play(self, card: Card) -> bool: top_card = self.get_top_card() if self.pending_draw > 0: if top_card.value == "draw2": return card.value in {"draw2", "wild_draw4"} if top_card.value == "wild_draw4": return card.value == "wild_draw4" return False if card.color == "wild": return True if card.color == self.current_color: return True return card.value == top_card.value def is_strict_jump_in(self, card: Card) -> bool: top_card = self.get_top_card() return card.color == top_card.color and card.value == top_card.value def legal_card_ids_for(self, player_id: str) -> list[str]: if self.phase != "playing" or self.awaiting_color_player_id: return [] player = self.get_player(player_id) if player.finished: return [] if player_id == self.current_player_id: return [card.id for card in player.hand if self.is_valid_play(card)] return [card.id for card in player.hand if self.is_strict_jump_in(card)] def has_valid_turn_play(self, player_id: str) -> bool: if self.phase != "playing" or self.awaiting_color_player_id: return False player = self.get_player(player_id) if player.finished: return False return any(self.is_valid_play(card) for card in player.hand) def can_draw(self, player_id: str) -> bool: return ( self.phase == "playing" and not self.awaiting_color_player_id and player_id == self.current_player_id and not self.get_player(player_id).finished ) def find_card(self, player: Player, card_id: str) -> Card: for card in player.hand: if card.id == card_id: return card raise GameError("card_not_found", "That card is not in your hand.") def remove_card(self, player: Player, card: Card) -> None: for idx, candidate in enumerate(player.hand): if candidate.id == card.id: player.hand.pop(idx) return raise GameError("card_not_found", "That card is not in your hand.") def choose_bot_color(self, bot: Player) -> str: counts = {color: 0 for color in COLORS} for card in bot.hand: if card.color in counts: counts[card.color] += 1 return max(COLORS, key=lambda color: counts[color]) def pick_bot_card(self, bot: Player) -> Card | None: legal = [card for card in bot.hand if self.is_valid_play(card)] if not legal: return None if self.bot_difficulty == "easy": return random.choice(legal) if self.bot_difficulty == "medium": action = next((c for c in legal if c.color == self.current_color and c.value in {"skip", "reverse", "draw2"}), None) if action: return action number = next((c for c in legal if c.color == self.current_color and c.value.isdigit()), None) return number or legal[0] has_current_color = any(c.color == self.current_color for c in bot.hand if c.color != "wild") if not has_current_color: wild_draw4 = next((c for c in legal if c.value == "wild_draw4"), None) if wild_draw4: return wild_draw4 action = next((c for c in legal if c.color == self.current_color and c.value in {"skip", "reverse"}), None) if action: return action draw2 = next((c for c in legal if c.color == self.current_color and c.value == "draw2"), None) if draw2: return draw2 numbers = [c for c in legal if c.color == self.current_color and c.value.isdigit()] if numbers: return sorted(numbers, key=lambda c: int(c.value), reverse=True)[0] non_wild = next((c for c in legal if c.color != "wild"), None) return non_wild or legal[0] def play_card( self, player_id: str, card_id: str, *, bot_auto_color: bool = False, expected_top_card_id: str | None = None, expected_version: int | None = None, ) -> dict[str, Any]: if self.phase != "playing": raise GameError("not_playing", "The game is not currently playing.") if self.awaiting_color_player_id: raise GameError("awaiting_color", "A wild card color must be chosen first.") player = self.get_player(player_id) if player.finished: raise GameError("player_finished", "Finished players cannot play cards.") card = self.find_card(player, card_id) if expected_version is not None and expected_version != self.version: raise GameError("stale_table", "The table changed before your play arrived.") if expected_top_card_id is not None and expected_top_card_id != self.get_top_card().id: raise GameError("stale_table", "The discard pile changed before your play arrived.") jump_in = player_id != self.current_player_id if jump_in: if expected_version is None or expected_top_card_id is None: raise GameError("stale_jump_in", "Jump-in requires the latest table snapshot.") if not self.jump_in_enabled: raise GameError("jump_in_disabled", "Jump-in is not enabled.") if not self.is_strict_jump_in(card): raise GameError("bad_jump_in", "Jump-in requires the exact same card.") self.current_player_id = player_id elif not self.is_valid_play(card): raise GameError("bad_play", "That card cannot be played right now.") self.remove_card(player, card) self.discard_pile.append(card) if card.color != "wild": self.current_color = card.color if card.value == "reverse": self.direction *= -1 elif card.value == "draw2": self.pending_draw += 2 elif card.value == "wild_draw4": self.pending_draw += 4 self.current_color = "wild" elif card.value == "wild": self.current_color = "wild" self.sync_uno_flags() self.mark_finished_if_needed(player) chosen_color: str | None = None if self.phase != "ended": if card.value in {"wild", "wild_draw4"}: if bot_auto_color or player.is_bot: chosen_color = self.choose_bot_color(player) self.current_color = chosen_color self.finish_turn_after(player_id, card) else: self.awaiting_color_player_id = player_id self.awaiting_color_card_value = card.value else: self.finish_turn_after(player_id, card) return { "type": "card_played", "playerId": player_id, "playerName": player.name, "card": card.public(), "jumpIn": jump_in, "chosenColor": chosen_color, } def choose_color(self, player_id: str, color: str) -> dict[str, Any]: if color not in COLORS: raise GameError("bad_color", "Color must be red, blue, green, or yellow.") if self.awaiting_color_player_id != player_id: raise GameError("not_awaiting_you", "This room is not waiting for you to choose a color.") value = self.awaiting_color_card_value or "wild" self.current_color = color self.awaiting_color_player_id = None self.awaiting_color_card_value = None if self.phase != "ended": self.finish_turn_after(player_id, Card("color_choice", "wild", value)) player = self.get_player(player_id) return {"type": "color_changed", "playerId": player_id, "playerName": player.name, "color": color} def draw_for_player(self, player_id: str, *, auto: bool = False, reason: str | None = None) -> dict[str, Any]: if self.phase != "playing": raise GameError("not_playing", "The game is not currently playing.") if self.awaiting_color_player_id: raise GameError("awaiting_color", "A wild card color must be chosen first.") if player_id != self.current_player_id: raise GameError("not_your_turn", "Only the current player can draw.") player = self.get_player(player_id) if player.finished: raise GameError("player_finished", "Finished players cannot draw.") count = self.pending_draw if self.pending_draw > 0 else 1 drawn = self.take_cards(count) player.hand.extend(drawn) self.pending_draw = 0 self.sync_uno_flags() self.finish_turn_after(player_id, None) event: dict[str, Any] = { "type": "cards_drawn", "playerId": player_id, "playerName": player.name, "count": len(drawn), "cards": [card.public() for card in drawn], } if auto: event["auto"] = True event["reason"] = reason or "no_playable" return event def take_cards(self, count: int) -> list[Card]: drawn: list[Card] = [] for _ in range(count): if not self.deck: self.reshuffle_deck() if not self.deck: break drawn.append(self.deck.pop()) return drawn def reshuffle_deck(self) -> None: if len(self.discard_pile) <= 1: return top_card = self.discard_pile[-1] recycled = self.discard_pile[:-1] random.shuffle(recycled) self.deck.extend(recycled) self.discard_pile = [top_card] def sync_uno_flags(self) -> None: for player in self.players: if len(player.hand) != 1: player.said_uno = False def call_uno(self, player_id: str) -> dict[str, Any]: player = self.get_player(player_id) if len(player.hand) != 1: raise GameError("not_uno", "UNO can only be called with exactly one card.") player.said_uno = True return {"type": "uno_called", "playerId": player_id, "playerName": player.name} def rename_player(self, player_id: str, name: str) -> dict[str, Any]: player = self.get_player(player_id) if player.is_bot: raise GameError("bot_rename", "Bots cannot be renamed from a socket.") new_name = normalize_name(name) if player.name == new_name: return {"type": "player_renamed", "playerId": player_id, "playerName": player.name} player.name = new_name return {"type": "player_renamed", "playerId": player_id, "playerName": player.name} def mark_finished_if_needed(self, player: Player) -> None: if player.hand or player.finished: return player.finished = True player.rank = len(self.finish_order) + 1 self.finish_order.append(player.id) remaining = [p for p in self.players if not p.finished] if len(remaining) == 1: last = remaining[0] last.finished = True last.rank = len(self.finish_order) + 1 self.finish_order.append(last.id) self.phase = "ended" self.current_player_id = None self.awaiting_color_player_id = None self.awaiting_color_card_value = None def advance_steps_from(self, player_id: str, steps: int) -> str: if not self.active_players(): return player_id idx = self.get_player_index(player_id) for _ in range(steps): for _ in range(len(self.players)): idx = (idx + self.direction + len(self.players)) % len(self.players) candidate = self.players[idx] if not candidate.finished: break return self.players[idx].id def finish_turn_after(self, player_id: str, card: Card | None) -> None: if self.phase == "ended": return steps = 1 if card and card.value == "skip": steps = 2 elif card and card.value == "reverse" and len(self.active_players()) == 2: steps = 2 self.current_player_id = self.advance_steps_from(player_id, steps) def snapshot_for(self, recipient_id: str) -> dict[str, Any]: legal_ids = self.legal_card_ids_for(recipient_id) if self.phase == "playing" else [] players_payload: list[dict[str, Any]] = [] for player in self.players: payload: dict[str, Any] = { "id": player.id, "name": player.name, "isBot": player.is_bot, "connected": player.connected, "handCount": len(player.hand), "saidUno": player.said_uno, "finished": player.finished, "rank": player.rank, } if player.id == recipient_id: payload["hand"] = [card.public() for card in player.hand] players_payload.append(payload) top_card = self.discard_pile[-1].public() if self.discard_pile else None return { "roomCode": self.code, "phase": self.phase, "myPlayerId": recipient_id, "hostPlayerId": self.host_player_id, "settings": { "totalPlayers": self.total_players, "botDifficulty": self.bot_difficulty, "botJumpIn": self.bot_jump_in, }, "players": players_payload, "deckCount": len(self.deck), "discardPile": [card.public() for card in self.discard_pile], "topCard": top_card, "currentPlayerId": self.current_player_id, "direction": self.direction, "currentColor": self.current_color, "pendingDraw": self.pending_draw, "finishOrder": list(self.finish_order), "awaitingColorPlayerId": self.awaiting_color_player_id, "legalCardIds": legal_ids, "canDraw": self.can_draw(recipient_id) if self.phase == "playing" else False, "version": self.version, } class CreateRoomRequest(BaseModel): name: str = Field(min_length=1, max_length=24) totalPlayers: int = Field(default=3, ge=MIN_PLAYERS, le=MAX_PLAYERS) botDifficulty: str = "hard" botJumpIn: bool = True class JoinRoomRequest(BaseModel): name: str = Field(min_length=1, max_length=24) rooms: dict[str, Room] = {} app = FastAPI(title="Multiplayer UNO", version="1.0.0") app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static") def make_id(prefix: str) -> str: return f"{prefix}_{secrets.token_urlsafe(8)}" def make_room_code() -> str: alphabet = string.ascii_uppercase + string.digits while True: code = "".join(secrets.choice(alphabet) for _ in range(5)) if code not in rooms: return code def normalize_name(name: str) -> str: cleaned = " ".join(name.strip().split()) return cleaned[:24] or "Player" def validate_difficulty(value: str) -> str: if value not in BOT_DELAYS: raise HTTPException(status_code=400, detail="botDifficulty must be easy, medium, or hard") return value def is_room_code(value: str) -> bool: return len(value) == 5 and all(ch in string.ascii_letters + string.digits for ch in value) def public_join_payload(room: Room, player: Player) -> dict[str, str]: return {"roomCode": room.code, "playerId": player.id, "token": player.token} @app.get("/") async def index() -> FileResponse: return FileResponse(STATIC_DIR / "index.html") @app.get("/healthz") async def healthz() -> JSONResponse: return JSONResponse({"ok": True, "rooms": len(rooms)}) @app.get("/favicon.ico") async def favicon() -> Response: return Response(status_code=204) @app.get("/uno-style.css") async def uno_style() -> Response: if not UNO_HTML.exists(): return Response("", media_type="text/css") text = UNO_HTML.read_text(encoding="utf-8", errors="replace") start = text.find("") css = text[start + len("