from __future__ import annotations
import asyncio
import random
import secrets
import string
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.responses import FileResponse, JSONResponse, Response
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field
ROOT = Path(__file__).resolve().parent
STATIC_DIR = ROOT / "static"
UNO_HTML = ROOT / "uno.html"
COLORS = ["red", "blue", "green", "yellow"]
ACTION_VALUES = {"skip", "reverse", "draw2", "wild", "wild_draw4"}
BOT_DELAYS = {"easy": 0.6, "medium": 1.0, "hard": 1.4}
MIN_PLAYERS = 3
MAX_PLAYERS = 15
ROOM_IDLE_SECONDS = 30 * 60
RECONNECT_SECONDS = 2 * 60
AUTO_DRAW_DELAY_SECONDS = 0.42
class GameError(Exception):
def __init__(self, code: str, message: str):
super().__init__(message)
self.code = code
self.message = message
@dataclass
class Card:
id: str
color: str
value: str
def public(self) -> dict[str, str]:
return {"id": self.id, "color": self.color, "value": self.value}
@dataclass
class Player:
id: str
name: str
token: str
is_bot: bool = False
connected: bool = False
hand: list[Card] = field(default_factory=list)
said_uno: bool = False
finished: bool = False
rank: int | None = None
disconnected_at: float | None = None
@dataclass
class Room:
code: str
host_player_id: str
total_players: int
bot_difficulty: str = "hard"
bot_jump_in: bool = True
phase: str = "lobby"
players: list[Player] = field(default_factory=list)
deck: list[Card] = field(default_factory=list)
discard_pile: list[Card] = field(default_factory=list)
current_player_id: str | None = None
direction: int = 1
current_color: str = "red"
pending_draw: int = 0
finish_order: list[str] = field(default_factory=list)
awaiting_color_player_id: str | None = None
awaiting_color_card_value: str | None = None
jump_in_enabled: bool = True
connections: dict[str, WebSocket] = field(default_factory=dict)
lock: asyncio.Lock = field(default_factory=asyncio.Lock)
created_at: float = field(default_factory=time.time)
last_active: float = field(default_factory=time.time)
version: int = 0
card_uid_seed: int = 1
def touch(self) -> None:
self.last_active = time.time()
def next_card_id(self) -> str:
card_id = f"card_{self.card_uid_seed}"
self.card_uid_seed += 1
return card_id
def active_players(self) -> list[Player]:
return [p for p in self.players if not p.finished]
def human_players(self) -> list[Player]:
return [p for p in self.players if not p.is_bot]
def get_player(self, player_id: str) -> Player:
for player in self.players:
if player.id == player_id:
return player
raise GameError("player_not_found", "Player is not in this room.")
def get_player_index(self, player_id: str) -> int:
for idx, player in enumerate(self.players):
if player.id == player_id:
return idx
raise GameError("player_not_found", "Player is not in this room.")
def get_top_card(self) -> Card:
if not self.discard_pile:
raise GameError("no_discard", "The discard pile is empty.")
return self.discard_pile[-1]
def make_deck(self) -> list[Card]:
deck: list[Card] = []
for color in COLORS:
deck.append(Card(self.next_card_id(), color, "0"))
for value in range(1, 10):
deck.append(Card(self.next_card_id(), color, str(value)))
deck.append(Card(self.next_card_id(), color, str(value)))
for value in ("skip", "reverse", "draw2"):
deck.append(Card(self.next_card_id(), color, value))
deck.append(Card(self.next_card_id(), color, value))
for _ in range(4):
deck.append(Card(self.next_card_id(), "wild", "wild"))
deck.append(Card(self.next_card_id(), "wild", "wild_draw4"))
random.shuffle(deck)
return deck
def start_game(self, actor_id: str) -> dict[str, Any]:
self.require_host(actor_id)
if self.phase == "playing":
raise GameError("already_started", "This room is already playing.")
if not (MIN_PLAYERS <= len(self.players) <= MAX_PLAYERS):
raise GameError("bad_player_count", "A game needs 3 to 15 players.")
disconnected = [p for p in self.players if not p.is_bot and not p.connected]
if disconnected:
raise GameError("player_offline", "All human players must be connected before starting.")
self.card_uid_seed = 1
self.deck = self.make_deck()
self.discard_pile = []
self.direction = 1
self.pending_draw = 0
self.finish_order = []
self.awaiting_color_player_id = None
self.awaiting_color_card_value = None
self.phase = "playing"
for player in self.players:
player.hand = []
player.said_uno = False
player.finished = False
player.rank = None
for _ in range(7):
for player in self.players:
player.hand.append(self.deck.pop())
start_card: Card | None = None
while self.deck:
candidate = self.deck.pop()
if candidate.color == "wild":
self.deck.insert(0, candidate)
random.shuffle(self.deck)
continue
start_card = candidate
break
if start_card is None:
raise GameError("deck_error", "Could not choose a starting card.")
self.discard_pile.append(start_card)
self.current_color = start_card.color
self.current_player_id = self.active_players()[0].id
self.sync_uno_flags()
return {"type": "game_started", "roomCode": self.code}
def require_host(self, actor_id: str) -> None:
if actor_id != self.host_player_id:
raise GameError("not_host", "Only the host can do that.")
def set_room_options(self, actor_id: str, payload: dict[str, Any]) -> dict[str, Any]:
self.require_host(actor_id)
if self.phase != "lobby":
raise GameError("game_started", "Room options can only be changed in the lobby.")
total = int(payload.get("totalPlayers", self.total_players))
if not (MIN_PLAYERS <= total <= MAX_PLAYERS):
raise GameError("bad_total_players", "Total players must be between 3 and 15.")
if total < len(self.players):
raise GameError("too_small", "Total players cannot be lower than current seats.")
difficulty = payload.get("botDifficulty", self.bot_difficulty)
if difficulty not in BOT_DELAYS:
raise GameError("bad_difficulty", "Bot difficulty must be easy, medium, or hard.")
self.total_players = total
self.bot_difficulty = difficulty
self.bot_jump_in = bool(payload.get("botJumpIn", self.bot_jump_in))
return {"type": "room_options_changed"}
def add_bot(self, actor_id: str) -> dict[str, Any]:
self.require_host(actor_id)
if self.phase != "lobby":
raise GameError("game_started", "Bots can only be added in the lobby.")
if len(self.players) >= self.total_players:
raise GameError("room_full", "The room is already full.")
bot_num = 1 + sum(1 for p in self.players if p.is_bot)
bot = Player(
id=make_id("bot"),
name=f"Bot {bot_num}",
token=secrets.token_urlsafe(16),
is_bot=True,
connected=True,
)
self.players.append(bot)
return {"type": "bot_added", "playerId": bot.id, "name": bot.name}
def remove_bot(self, actor_id: str) -> dict[str, Any]:
self.require_host(actor_id)
if self.phase != "lobby":
raise GameError("game_started", "Bots can only be removed in the lobby.")
for idx in range(len(self.players) - 1, -1, -1):
if self.players[idx].is_bot:
bot = self.players.pop(idx)
return {"type": "bot_removed", "playerId": bot.id, "name": bot.name}
raise GameError("no_bot", "There are no bots to remove.")
def is_valid_play(self, card: Card) -> bool:
top_card = self.get_top_card()
if self.pending_draw > 0:
if top_card.value == "draw2":
return card.value in {"draw2", "wild_draw4"}
if top_card.value == "wild_draw4":
return card.value == "wild_draw4"
return False
if card.color == "wild":
return True
if card.color == self.current_color:
return True
return card.value == top_card.value
def is_strict_jump_in(self, card: Card) -> bool:
top_card = self.get_top_card()
return card.color == top_card.color and card.value == top_card.value
def legal_card_ids_for(self, player_id: str) -> list[str]:
if self.phase != "playing" or self.awaiting_color_player_id:
return []
player = self.get_player(player_id)
if player.finished:
return []
if player_id == self.current_player_id:
return [card.id for card in player.hand if self.is_valid_play(card)]
return [card.id for card in player.hand if self.is_strict_jump_in(card)]
def has_valid_turn_play(self, player_id: str) -> bool:
if self.phase != "playing" or self.awaiting_color_player_id:
return False
player = self.get_player(player_id)
if player.finished:
return False
return any(self.is_valid_play(card) for card in player.hand)
def can_draw(self, player_id: str) -> bool:
return (
self.phase == "playing"
and not self.awaiting_color_player_id
and player_id == self.current_player_id
and not self.get_player(player_id).finished
)
def find_card(self, player: Player, card_id: str) -> Card:
for card in player.hand:
if card.id == card_id:
return card
raise GameError("card_not_found", "That card is not in your hand.")
def remove_card(self, player: Player, card: Card) -> None:
for idx, candidate in enumerate(player.hand):
if candidate.id == card.id:
player.hand.pop(idx)
return
raise GameError("card_not_found", "That card is not in your hand.")
def choose_bot_color(self, bot: Player) -> str:
counts = {color: 0 for color in COLORS}
for card in bot.hand:
if card.color in counts:
counts[card.color] += 1
return max(COLORS, key=lambda color: counts[color])
def pick_bot_card(self, bot: Player) -> Card | None:
legal = [card for card in bot.hand if self.is_valid_play(card)]
if not legal:
return None
if self.bot_difficulty == "easy":
return random.choice(legal)
if self.bot_difficulty == "medium":
action = next((c for c in legal if c.color == self.current_color and c.value in {"skip", "reverse", "draw2"}), None)
if action:
return action
number = next((c for c in legal if c.color == self.current_color and c.value.isdigit()), None)
return number or legal[0]
has_current_color = any(c.color == self.current_color for c in bot.hand if c.color != "wild")
if not has_current_color:
wild_draw4 = next((c for c in legal if c.value == "wild_draw4"), None)
if wild_draw4:
return wild_draw4
action = next((c for c in legal if c.color == self.current_color and c.value in {"skip", "reverse"}), None)
if action:
return action
draw2 = next((c for c in legal if c.color == self.current_color and c.value == "draw2"), None)
if draw2:
return draw2
numbers = [c for c in legal if c.color == self.current_color and c.value.isdigit()]
if numbers:
return sorted(numbers, key=lambda c: int(c.value), reverse=True)[0]
non_wild = next((c for c in legal if c.color != "wild"), None)
return non_wild or legal[0]
def play_card(
self,
player_id: str,
card_id: str,
*,
bot_auto_color: bool = False,
expected_top_card_id: str | None = None,
expected_version: int | None = None,
) -> dict[str, Any]:
if self.phase != "playing":
raise GameError("not_playing", "The game is not currently playing.")
if self.awaiting_color_player_id:
raise GameError("awaiting_color", "A wild card color must be chosen first.")
player = self.get_player(player_id)
if player.finished:
raise GameError("player_finished", "Finished players cannot play cards.")
card = self.find_card(player, card_id)
if expected_version is not None and expected_version != self.version:
raise GameError("stale_table", "The table changed before your play arrived.")
if expected_top_card_id is not None and expected_top_card_id != self.get_top_card().id:
raise GameError("stale_table", "The discard pile changed before your play arrived.")
jump_in = player_id != self.current_player_id
if jump_in:
if expected_version is None or expected_top_card_id is None:
raise GameError("stale_jump_in", "Jump-in requires the latest table snapshot.")
if not self.jump_in_enabled:
raise GameError("jump_in_disabled", "Jump-in is not enabled.")
if not self.is_strict_jump_in(card):
raise GameError("bad_jump_in", "Jump-in requires the exact same card.")
self.current_player_id = player_id
elif not self.is_valid_play(card):
raise GameError("bad_play", "That card cannot be played right now.")
self.remove_card(player, card)
self.discard_pile.append(card)
if card.color != "wild":
self.current_color = card.color
if card.value == "reverse":
self.direction *= -1
elif card.value == "draw2":
self.pending_draw += 2
elif card.value == "wild_draw4":
self.pending_draw += 4
self.current_color = "wild"
elif card.value == "wild":
self.current_color = "wild"
self.sync_uno_flags()
self.mark_finished_if_needed(player)
chosen_color: str | None = None
if self.phase != "ended":
if card.value in {"wild", "wild_draw4"}:
if bot_auto_color or player.is_bot:
chosen_color = self.choose_bot_color(player)
self.current_color = chosen_color
self.finish_turn_after(player_id, card)
else:
self.awaiting_color_player_id = player_id
self.awaiting_color_card_value = card.value
else:
self.finish_turn_after(player_id, card)
return {
"type": "card_played",
"playerId": player_id,
"playerName": player.name,
"card": card.public(),
"jumpIn": jump_in,
"chosenColor": chosen_color,
}
def choose_color(self, player_id: str, color: str) -> dict[str, Any]:
if color not in COLORS:
raise GameError("bad_color", "Color must be red, blue, green, or yellow.")
if self.awaiting_color_player_id != player_id:
raise GameError("not_awaiting_you", "This room is not waiting for you to choose a color.")
value = self.awaiting_color_card_value or "wild"
self.current_color = color
self.awaiting_color_player_id = None
self.awaiting_color_card_value = None
if self.phase != "ended":
self.finish_turn_after(player_id, Card("color_choice", "wild", value))
player = self.get_player(player_id)
return {"type": "color_changed", "playerId": player_id, "playerName": player.name, "color": color}
def draw_for_player(self, player_id: str, *, auto: bool = False, reason: str | None = None) -> dict[str, Any]:
if self.phase != "playing":
raise GameError("not_playing", "The game is not currently playing.")
if self.awaiting_color_player_id:
raise GameError("awaiting_color", "A wild card color must be chosen first.")
if player_id != self.current_player_id:
raise GameError("not_your_turn", "Only the current player can draw.")
player = self.get_player(player_id)
if player.finished:
raise GameError("player_finished", "Finished players cannot draw.")
count = self.pending_draw if self.pending_draw > 0 else 1
drawn = self.take_cards(count)
player.hand.extend(drawn)
self.pending_draw = 0
self.sync_uno_flags()
self.finish_turn_after(player_id, None)
event: dict[str, Any] = {
"type": "cards_drawn",
"playerId": player_id,
"playerName": player.name,
"count": len(drawn),
"cards": [card.public() for card in drawn],
}
if auto:
event["auto"] = True
event["reason"] = reason or "no_playable"
return event
def take_cards(self, count: int) -> list[Card]:
drawn: list[Card] = []
for _ in range(count):
if not self.deck:
self.reshuffle_deck()
if not self.deck:
break
drawn.append(self.deck.pop())
return drawn
def reshuffle_deck(self) -> None:
if len(self.discard_pile) <= 1:
return
top_card = self.discard_pile[-1]
recycled = self.discard_pile[:-1]
random.shuffle(recycled)
self.deck.extend(recycled)
self.discard_pile = [top_card]
def sync_uno_flags(self) -> None:
for player in self.players:
if len(player.hand) != 1:
player.said_uno = False
def call_uno(self, player_id: str) -> dict[str, Any]:
player = self.get_player(player_id)
if len(player.hand) != 1:
raise GameError("not_uno", "UNO can only be called with exactly one card.")
player.said_uno = True
return {"type": "uno_called", "playerId": player_id, "playerName": player.name}
def rename_player(self, player_id: str, name: str) -> dict[str, Any]:
player = self.get_player(player_id)
if player.is_bot:
raise GameError("bot_rename", "Bots cannot be renamed from a socket.")
new_name = normalize_name(name)
if player.name == new_name:
return {"type": "player_renamed", "playerId": player_id, "playerName": player.name}
player.name = new_name
return {"type": "player_renamed", "playerId": player_id, "playerName": player.name}
def mark_finished_if_needed(self, player: Player) -> None:
if player.hand or player.finished:
return
player.finished = True
player.rank = len(self.finish_order) + 1
self.finish_order.append(player.id)
remaining = [p for p in self.players if not p.finished]
if len(remaining) == 1:
last = remaining[0]
last.finished = True
last.rank = len(self.finish_order) + 1
self.finish_order.append(last.id)
self.phase = "ended"
self.current_player_id = None
self.awaiting_color_player_id = None
self.awaiting_color_card_value = None
def advance_steps_from(self, player_id: str, steps: int) -> str:
if not self.active_players():
return player_id
idx = self.get_player_index(player_id)
for _ in range(steps):
for _ in range(len(self.players)):
idx = (idx + self.direction + len(self.players)) % len(self.players)
candidate = self.players[idx]
if not candidate.finished:
break
return self.players[idx].id
def finish_turn_after(self, player_id: str, card: Card | None) -> None:
if self.phase == "ended":
return
steps = 1
if card and card.value == "skip":
steps = 2
elif card and card.value == "reverse" and len(self.active_players()) == 2:
steps = 2
self.current_player_id = self.advance_steps_from(player_id, steps)
def snapshot_for(self, recipient_id: str) -> dict[str, Any]:
legal_ids = self.legal_card_ids_for(recipient_id) if self.phase == "playing" else []
players_payload: list[dict[str, Any]] = []
for player in self.players:
payload: dict[str, Any] = {
"id": player.id,
"name": player.name,
"isBot": player.is_bot,
"connected": player.connected,
"handCount": len(player.hand),
"saidUno": player.said_uno,
"finished": player.finished,
"rank": player.rank,
}
if player.id == recipient_id:
payload["hand"] = [card.public() for card in player.hand]
players_payload.append(payload)
top_card = self.discard_pile[-1].public() if self.discard_pile else None
return {
"roomCode": self.code,
"phase": self.phase,
"myPlayerId": recipient_id,
"hostPlayerId": self.host_player_id,
"settings": {
"totalPlayers": self.total_players,
"botDifficulty": self.bot_difficulty,
"botJumpIn": self.bot_jump_in,
},
"players": players_payload,
"deckCount": len(self.deck),
"discardPile": [card.public() for card in self.discard_pile],
"topCard": top_card,
"currentPlayerId": self.current_player_id,
"direction": self.direction,
"currentColor": self.current_color,
"pendingDraw": self.pending_draw,
"finishOrder": list(self.finish_order),
"awaitingColorPlayerId": self.awaiting_color_player_id,
"legalCardIds": legal_ids,
"canDraw": self.can_draw(recipient_id) if self.phase == "playing" else False,
"version": self.version,
}
class CreateRoomRequest(BaseModel):
name: str = Field(min_length=1, max_length=24)
totalPlayers: int = Field(default=3, ge=MIN_PLAYERS, le=MAX_PLAYERS)
botDifficulty: str = "hard"
botJumpIn: bool = True
class JoinRoomRequest(BaseModel):
name: str = Field(min_length=1, max_length=24)
rooms: dict[str, Room] = {}
app = FastAPI(title="Multiplayer UNO", version="1.0.0")
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
def make_id(prefix: str) -> str:
return f"{prefix}_{secrets.token_urlsafe(8)}"
def make_room_code() -> str:
alphabet = string.ascii_uppercase + string.digits
while True:
code = "".join(secrets.choice(alphabet) for _ in range(5))
if code not in rooms:
return code
def normalize_name(name: str) -> str:
cleaned = " ".join(name.strip().split())
return cleaned[:24] or "Player"
def validate_difficulty(value: str) -> str:
if value not in BOT_DELAYS:
raise HTTPException(status_code=400, detail="botDifficulty must be easy, medium, or hard")
return value
def is_room_code(value: str) -> bool:
return len(value) == 5 and all(ch in string.ascii_letters + string.digits for ch in value)
def public_join_payload(room: Room, player: Player) -> dict[str, str]:
return {"roomCode": room.code, "playerId": player.id, "token": player.token}
@app.get("/")
async def index() -> FileResponse:
return FileResponse(STATIC_DIR / "index.html")
@app.get("/healthz")
async def healthz() -> JSONResponse:
return JSONResponse({"ok": True, "rooms": len(rooms)})
@app.get("/favicon.ico")
async def favicon() -> Response:
return Response(status_code=204)
@app.get("/uno-style.css")
async def uno_style() -> Response:
if not UNO_HTML.exists():
return Response("", media_type="text/css")
text = UNO_HTML.read_text(encoding="utf-8", errors="replace")
start = text.find("")
css = text[start + len("