# uno / app.py
# cacodex's picture
# Fix auto draw and room deep links
# 9f14abf verified
# Raw
# History Blame Contribute Delete
# 38.2 kB
from __future__ import annotations
import asyncio
import random
import secrets
import string
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.responses import FileResponse, JSONResponse, Response
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field
# Filesystem layout, resolved from this module's own location.
ROOT = Path(__file__).resolve().parent
STATIC_DIR = ROOT.joinpath("static")
UNO_HTML = ROOT.joinpath("uno.html")

# Card vocabulary: the four playable colors and the non-numeric card values.
COLORS = ["red", "blue", "green", "yellow"]
ACTION_VALUES = {"draw2", "reverse", "skip", "wild", "wild_draw4"}

# Seconds a bot waits before taking its turn, keyed by difficulty name.
BOT_DELAYS = dict(easy=0.6, medium=1.0, hard=1.4)

# Inclusive bounds on the number of seats in a room.
MIN_PLAYERS, MAX_PLAYERS = 3, 15

# Rooms untouched for this long are dropped by the cleanup loop (30 minutes).
ROOM_IDLE_SECONDS = 1800
# Lobby players disconnected for longer than this lose their seat (2 minutes).
RECONNECT_SECONDS = 120
# Pause before a human with no playable card is made to draw automatically.
AUTO_DRAW_DELAY_SECONDS = 0.42
class GameError(Exception):
    """Rule or precondition violation reported back to a client.

    ``code`` is a short machine-readable identifier and ``message`` is the
    human-readable explanation; the message is also the exception text.
    """

    def __init__(self, code: str, message: str):
        self.code, self.message = code, message
        super().__init__(message)
@dataclass
class Card:
    """A single UNO card identified by ``id``, with a color and a value."""

    id: str
    color: str
    value: str

    def public(self) -> dict[str, str]:
        """Return the card as a plain dictionary suitable for JSON output."""
        return dict(id=self.id, color=self.color, value=self.value)
@dataclass
class Player:
    """One seat in a room: either a human (socket-backed) or a bot."""

    id: str
    name: str
    # Secret compared against the ``token`` query parameter on the WebSocket.
    token: str
    is_bot: bool = False
    # True while a WebSocket is registered for this player (bots are created
    # with connected=True).
    connected: bool = False
    hand: list[Card] = field(default_factory=list)
    # Cleared automatically whenever the hand size is not exactly one.
    said_uno: bool = False
    finished: bool = False
    # 1-based finishing position, assigned when the player runs out of cards.
    rank: int | None = None
    # time.time() of the most recent disconnect; None while connected.
    disconnected_at: float | None = None
@dataclass
class Room:
    """State and rule engine for one UNO table.

    Action methods mutate the room in place, raise ``GameError`` when a rule
    or precondition is violated, and return a small event dictionary that
    describes what happened.  The methods never acquire ``lock`` themselves;
    the callers in this module hold it around every mutation.
    """

    code: str
    host_player_id: str
    total_players: int
    bot_difficulty: str = "hard"
    bot_jump_in: bool = True
    # One of "lobby", "playing" or "ended".
    phase: str = "lobby"
    players: list[Player] = field(default_factory=list)
    # Draw pile; cards are taken from the end of the list.
    deck: list[Card] = field(default_factory=list)
    # Discard pile; the last element is the top card.
    discard_pile: list[Card] = field(default_factory=list)
    current_player_id: str | None = None
    # +1 or -1: the step applied to the seat index when the turn advances.
    direction: int = 1
    # Color to match; temporarily "wild" until a wild's color has been chosen.
    current_color: str = "red"
    # Accumulated draw2 / wild_draw4 penalty waiting for the next drawer.
    pending_draw: int = 0
    finish_order: list[str] = field(default_factory=list)
    # Set while a human who played a wild still has to pick a color.
    awaiting_color_player_id: str | None = None
    awaiting_color_card_value: str | None = None
    jump_in_enabled: bool = True
    connections: dict[str, WebSocket] = field(default_factory=dict)
    lock: asyncio.Lock = field(default_factory=asyncio.Lock)
    created_at: float = field(default_factory=time.time)
    last_active: float = field(default_factory=time.time)
    # Bumped by callers after every state change; used to detect stale input.
    version: int = 0
    card_uid_seed: int = 1

    # ------------------------------------------------------------------
    # Bookkeeping and lookups
    # ------------------------------------------------------------------

    def touch(self) -> None:
        """Record the current time as the room's last activity."""
        self.last_active = time.time()

    def next_card_id(self) -> str:
        """Return the next sequential card id (``card_<n>``) and advance the counter."""
        card_id = f"card_{self.card_uid_seed}"
        self.card_uid_seed += 1
        return card_id

    def active_players(self) -> list[Player]:
        """Return the players who have not finished yet, in seat order."""
        return [p for p in self.players if not p.finished]

    def human_players(self) -> list[Player]:
        """Return the non-bot players, in seat order."""
        return [p for p in self.players if not p.is_bot]

    def get_player(self, player_id: str) -> Player:
        """Return the player with ``player_id``.

        Raises:
            GameError: ``player_not_found`` if no such player is seated.
        """
        for player in self.players:
            if player.id == player_id:
                return player
        raise GameError("player_not_found", "Player is not in this room.")

    def get_player_index(self, player_id: str) -> int:
        """Return the seat index of ``player_id``.

        Raises:
            GameError: ``player_not_found`` if no such player is seated.
        """
        for idx, player in enumerate(self.players):
            if player.id == player_id:
                return idx
        raise GameError("player_not_found", "Player is not in this room.")

    def get_top_card(self) -> Card:
        """Return the top of the discard pile.

        Raises:
            GameError: ``no_discard`` if the discard pile is empty.
        """
        if not self.discard_pile:
            raise GameError("no_discard", "The discard pile is empty.")
        return self.discard_pile[-1]

    # ------------------------------------------------------------------
    # Setup
    # ------------------------------------------------------------------

    def make_deck(self) -> list[Card]:
        """Build and shuffle a 108-card deck.

        Per color: one "0", two each of "1".."9", two each of skip / reverse /
        draw2.  Plus four "wild" and four "wild_draw4" cards.  Card ids are
        taken from ``next_card_id`` in creation order.
        """
        deck: list[Card] = []
        for color in COLORS:
            deck.append(Card(self.next_card_id(), color, "0"))
            for value in range(1, 10):
                deck.append(Card(self.next_card_id(), color, str(value)))
                deck.append(Card(self.next_card_id(), color, str(value)))
            for value in ("skip", "reverse", "draw2"):
                deck.append(Card(self.next_card_id(), color, value))
                deck.append(Card(self.next_card_id(), color, value))
        for _ in range(4):
            deck.append(Card(self.next_card_id(), "wild", "wild"))
            deck.append(Card(self.next_card_id(), "wild", "wild_draw4"))
        random.shuffle(deck)
        return deck

    def start_game(self, actor_id: str) -> dict[str, Any]:
        """Deal a fresh game (also used for restarts) and return the start event.

        Raises:
            GameError: if the actor is not the host, a game is in progress,
                the seat count is out of range, a human is offline, or no
                starting card can be chosen.
        """
        self.require_host(actor_id)
        if self.phase == "playing":
            raise GameError("already_started", "This room is already playing.")
        if not (MIN_PLAYERS <= len(self.players) <= MAX_PLAYERS):
            raise GameError("bad_player_count", "A game needs 3 to 15 players.")
        if any(not p.is_bot and not p.connected for p in self.players):
            raise GameError("player_offline", "All human players must be connected before starting.")

        cards_per_hand = 7
        self.card_uid_seed = 1
        deck = self.make_deck()

        # Reserve the starting card BEFORE dealing.  Picking it afterwards by
        # "pop, and reshuffle if wild" never terminates when only wild cards
        # are left in the deck (possible with 15 players, where just three
        # cards remain after the deal).  The deck is already shuffled, so the
        # non-wild card closest to the top is a uniformly random choice.
        start_index = next(
            (idx for idx in range(len(deck) - 1, -1, -1) if deck[idx].color != "wild"),
            None,
        )
        if start_index is None or len(deck) - 1 < cards_per_hand * len(self.players):
            raise GameError("deck_error", "Could not choose a starting card.")
        start_card = deck.pop(start_index)

        # Every check has passed: commit the new game state.
        self.deck = deck
        self.discard_pile = [start_card]
        self.direction = 1
        self.pending_draw = 0
        self.finish_order = []
        self.awaiting_color_player_id = None
        self.awaiting_color_card_value = None
        self.phase = "playing"
        for player in self.players:
            player.hand = []
            player.said_uno = False
            player.finished = False
            player.rank = None
        # Deal one card at a time around the table.
        for _ in range(cards_per_hand):
            for player in self.players:
                player.hand.append(self.deck.pop())
        self.current_color = start_card.color
        self.current_player_id = self.active_players()[0].id
        self.sync_uno_flags()
        return {"type": "game_started", "roomCode": self.code}

    def require_host(self, actor_id: str) -> None:
        """Raise ``GameError('not_host')`` unless ``actor_id`` is the room host."""
        if actor_id != self.host_player_id:
            raise GameError("not_host", "Only the host can do that.")

    def set_room_options(self, actor_id: str, payload: dict[str, Any]) -> dict[str, Any]:
        """Update seat count, bot difficulty and bot jump-in from ``payload``.

        Recognized keys are ``totalPlayers``, ``botDifficulty`` and
        ``botJumpIn``; missing keys keep their current value.  Nothing is
        changed unless every supplied value is valid.

        Raises:
            GameError: if the actor is not the host, the game has started, or
                a supplied value is invalid.
        """
        self.require_host(actor_id)
        if self.phase != "lobby":
            raise GameError("game_started", "Room options can only be changed in the lobby.")
        try:
            total = int(payload.get("totalPlayers", self.total_players))
        except (TypeError, ValueError, OverflowError):
            # Client-supplied junk (None, "abc", Infinity, ...) must surface as
            # a GameError rather than crash the socket handler.
            raise GameError("bad_total_players", "Total players must be between 3 and 15.") from None
        if not (MIN_PLAYERS <= total <= MAX_PLAYERS):
            raise GameError("bad_total_players", "Total players must be between 3 and 15.")
        if total < len(self.players):
            raise GameError("too_small", "Total players cannot be lower than current seats.")
        difficulty = payload.get("botDifficulty", self.bot_difficulty)
        # The isinstance check also guards the membership test against
        # unhashable values such as lists.
        if not isinstance(difficulty, str) or difficulty not in BOT_DELAYS:
            raise GameError("bad_difficulty", "Bot difficulty must be easy, medium, or hard.")
        self.total_players = total
        self.bot_difficulty = difficulty
        self.bot_jump_in = bool(payload.get("botJumpIn", self.bot_jump_in))
        return {"type": "room_options_changed"}

    def add_bot(self, actor_id: str) -> dict[str, Any]:
        """Seat a new bot named ``Bot <n>`` (host only, lobby only)."""
        self.require_host(actor_id)
        if self.phase != "lobby":
            raise GameError("game_started", "Bots can only be added in the lobby.")
        if len(self.players) >= self.total_players:
            raise GameError("room_full", "The room is already full.")
        bot_num = 1 + sum(1 for p in self.players if p.is_bot)
        bot = Player(
            id=make_id("bot"),
            name=f"Bot {bot_num}",
            token=secrets.token_urlsafe(16),
            is_bot=True,
            connected=True,
        )
        self.players.append(bot)
        return {"type": "bot_added", "playerId": bot.id, "name": bot.name}

    def remove_bot(self, actor_id: str) -> dict[str, Any]:
        """Remove the most recently seated bot (host only, lobby only)."""
        self.require_host(actor_id)
        if self.phase != "lobby":
            raise GameError("game_started", "Bots can only be removed in the lobby.")
        # Walk the seats backwards so the last bot added is the one removed.
        for idx in range(len(self.players) - 1, -1, -1):
            if self.players[idx].is_bot:
                bot = self.players.pop(idx)
                return {"type": "bot_removed", "playerId": bot.id, "name": bot.name}
        raise GameError("no_bot", "There are no bots to remove.")

    # ------------------------------------------------------------------
    # Legality checks
    # ------------------------------------------------------------------

    def is_valid_play(self, card: Card) -> bool:
        """Return whether ``card`` may be played by the current player.

        While a draw penalty is pending only stacking is allowed: a draw2 or
        wild_draw4 on a draw2, and only a wild_draw4 on a wild_draw4.
        """
        top_card = self.get_top_card()
        if self.pending_draw > 0:
            if top_card.value == "draw2":
                return card.value in {"draw2", "wild_draw4"}
            if top_card.value == "wild_draw4":
                return card.value == "wild_draw4"
            return False
        if card.color == "wild":
            return True
        if card.color == self.current_color:
            return True
        return card.value == top_card.value

    def is_strict_jump_in(self, card: Card) -> bool:
        """Return whether ``card`` has the same color and value as the top card."""
        top_card = self.get_top_card()
        return card.color == top_card.color and card.value == top_card.value

    def legal_card_ids_for(self, player_id: str) -> list[str]:
        """Return the ids of the cards ``player_id`` could play right now.

        The current player gets normal plays; everyone else gets jump-in
        candidates, and nothing when jump-ins are disabled.
        """
        if self.phase != "playing" or self.awaiting_color_player_id:
            return []
        player = self.get_player(player_id)
        if player.finished:
            return []
        if player_id == self.current_player_id:
            return [card.id for card in player.hand if self.is_valid_play(card)]
        if not self.jump_in_enabled:
            # play_card rejects every jump-in in this mode, so do not
            # advertise cards that cannot actually be played.
            return []
        return [card.id for card in player.hand if self.is_strict_jump_in(card)]

    def has_valid_turn_play(self, player_id: str) -> bool:
        """Return whether ``player_id`` holds at least one card passing ``is_valid_play``."""
        if self.phase != "playing" or self.awaiting_color_player_id:
            return False
        player = self.get_player(player_id)
        if player.finished:
            return False
        return any(self.is_valid_play(card) for card in player.hand)

    def can_draw(self, player_id: str) -> bool:
        """Return whether ``player_id`` is the unfinished current player and may draw."""
        return (
            self.phase == "playing"
            and not self.awaiting_color_player_id
            and player_id == self.current_player_id
            and not self.get_player(player_id).finished
        )

    def find_card(self, player: Player, card_id: str) -> Card:
        """Return the card with ``card_id`` from the player's hand.

        Raises:
            GameError: ``card_not_found`` if the hand holds no such card.
        """
        for card in player.hand:
            if card.id == card_id:
                return card
        raise GameError("card_not_found", "That card is not in your hand.")

    def remove_card(self, player: Player, card: Card) -> None:
        """Remove ``card`` (matched by id) from the player's hand.

        Raises:
            GameError: ``card_not_found`` if the hand holds no such card.
        """
        for idx, candidate in enumerate(player.hand):
            if candidate.id == card.id:
                player.hand.pop(idx)
                return
        raise GameError("card_not_found", "That card is not in your hand.")

    # ------------------------------------------------------------------
    # Bot decisions
    # ------------------------------------------------------------------

    def choose_bot_color(self, bot: Player) -> str:
        """Return the color the bot holds most of (ties go to the earliest in ``COLORS``)."""
        counts = {color: 0 for color in COLORS}
        for card in bot.hand:
            if card.color in counts:
                counts[card.color] += 1
        return max(COLORS, key=lambda color: counts[color])

    def pick_bot_card(self, bot: Player) -> Card | None:
        """Choose the card a bot plays this turn, or ``None`` if it must draw.

        "easy" plays a random legal card.  "medium" prefers an action card of
        the current color, then a number of the current color, then the first
        legal card.  Any other difficulty uses the priority order spelled out
        in the comments below.
        """
        legal = [card for card in bot.hand if self.is_valid_play(card)]
        if not legal:
            return None
        if self.bot_difficulty == "easy":
            return random.choice(legal)
        if self.bot_difficulty == "medium":
            action = next(
                (c for c in legal if c.color == self.current_color and c.value in {"skip", "reverse", "draw2"}),
                None,
            )
            if action:
                return action
            number = next((c for c in legal if c.color == self.current_color and c.value.isdigit()), None)
            return number or legal[0]
        # 1. Without any card of the current color, lead with a wild_draw4.
        has_current_color = any(c.color == self.current_color for c in bot.hand if c.color != "wild")
        if not has_current_color:
            wild_draw4 = next((c for c in legal if c.value == "wild_draw4"), None)
            if wild_draw4:
                return wild_draw4
        # 2. Skip / reverse of the current color.
        action = next((c for c in legal if c.color == self.current_color and c.value in {"skip", "reverse"}), None)
        if action:
            return action
        # 3. draw2 of the current color.
        draw2 = next((c for c in legal if c.color == self.current_color and c.value == "draw2"), None)
        if draw2:
            return draw2
        # 4. Highest number of the current color.
        numbers = [c for c in legal if c.color == self.current_color and c.value.isdigit()]
        if numbers:
            return sorted(numbers, key=lambda c: int(c.value), reverse=True)[0]
        # 5. Any other non-wild card, keeping wilds for last.
        non_wild = next((c for c in legal if c.color != "wild"), None)
        return non_wild or legal[0]

    # ------------------------------------------------------------------
    # Player actions
    # ------------------------------------------------------------------

    def play_card(
        self,
        player_id: str,
        card_id: str,
        *,
        bot_auto_color: bool = False,
        expected_top_card_id: str | None = None,
        expected_version: int | None = None,
    ) -> dict[str, Any]:
        """Play ``card_id`` from the player's hand and return a ``card_played`` event.

        A play by anyone other than the current player is a jump-in: it
        requires both ``expected_version`` and ``expected_top_card_id`` and a
        card identical to the top card.  When supplied, the expected values
        must match the room or the play is rejected as stale.

        A wild played by a bot (or with ``bot_auto_color``) picks its color
        immediately; a human's wild leaves the room waiting for
        ``choose_color`` and the turn does not advance until then.

        Raises:
            GameError: for any rule violation; the room is left unchanged
                except that a rejected jump-in never gets this far.
        """
        if self.phase != "playing":
            raise GameError("not_playing", "The game is not currently playing.")
        if self.awaiting_color_player_id:
            raise GameError("awaiting_color", "A wild card color must be chosen first.")
        player = self.get_player(player_id)
        if player.finished:
            raise GameError("player_finished", "Finished players cannot play cards.")
        card = self.find_card(player, card_id)
        if expected_version is not None and expected_version != self.version:
            raise GameError("stale_table", "The table changed before your play arrived.")
        if expected_top_card_id is not None and expected_top_card_id != self.get_top_card().id:
            raise GameError("stale_table", "The discard pile changed before your play arrived.")
        jump_in = player_id != self.current_player_id
        if jump_in:
            if expected_version is None or expected_top_card_id is None:
                raise GameError("stale_jump_in", "Jump-in requires the latest table snapshot.")
            if not self.jump_in_enabled:
                raise GameError("jump_in_disabled", "Jump-in is not enabled.")
            if not self.is_strict_jump_in(card):
                raise GameError("bad_jump_in", "Jump-in requires the exact same card.")
            # The jumper takes over the turn; play continues from their seat.
            self.current_player_id = player_id
        elif not self.is_valid_play(card):
            raise GameError("bad_play", "That card cannot be played right now.")

        self.remove_card(player, card)
        self.discard_pile.append(card)
        if card.color != "wild":
            self.current_color = card.color
        if card.value == "reverse":
            self.direction *= -1
        elif card.value == "draw2":
            self.pending_draw += 2
        elif card.value == "wild_draw4":
            self.pending_draw += 4
            self.current_color = "wild"
        elif card.value == "wild":
            self.current_color = "wild"
        self.sync_uno_flags()
        self.mark_finished_if_needed(player)

        chosen_color: str | None = None
        if self.phase != "ended":
            if card.value in {"wild", "wild_draw4"}:
                if bot_auto_color or player.is_bot:
                    chosen_color = self.choose_bot_color(player)
                    self.current_color = chosen_color
                    self.finish_turn_after(player_id, card)
                else:
                    self.awaiting_color_player_id = player_id
                    self.awaiting_color_card_value = card.value
            else:
                self.finish_turn_after(player_id, card)
        return {
            "type": "card_played",
            "playerId": player_id,
            "playerName": player.name,
            "card": card.public(),
            "jumpIn": jump_in,
            "chosenColor": chosen_color,
        }

    def choose_color(self, player_id: str, color: str) -> dict[str, Any]:
        """Resolve a pending wild by setting the color, then advance the turn.

        Raises:
            GameError: if ``color`` is not one of ``COLORS`` or the room is
                not waiting for this player.
        """
        if color not in COLORS:
            raise GameError("bad_color", "Color must be red, blue, green, or yellow.")
        if self.awaiting_color_player_id != player_id:
            raise GameError("not_awaiting_you", "This room is not waiting for you to choose a color.")
        value = self.awaiting_color_card_value or "wild"
        self.current_color = color
        self.awaiting_color_player_id = None
        self.awaiting_color_card_value = None
        if self.phase != "ended":
            # A placeholder card carries the wild's value into the turn logic.
            self.finish_turn_after(player_id, Card("color_choice", "wild", value))
        player = self.get_player(player_id)
        return {"type": "color_changed", "playerId": player_id, "playerName": player.name, "color": color}

    def draw_for_player(self, player_id: str, *, auto: bool = False, reason: str | None = None) -> dict[str, Any]:
        """Make the current player draw, then pass the turn.

        The player draws the pending penalty if there is one, otherwise a
        single card; fewer cards are drawn if the piles run out.  With
        ``auto`` the event is tagged with ``auto`` and a ``reason``.

        Raises:
            GameError: if the game is not in progress, a color choice is
                pending, it is not this player's turn, or they have finished.
        """
        if self.phase != "playing":
            raise GameError("not_playing", "The game is not currently playing.")
        if self.awaiting_color_player_id:
            raise GameError("awaiting_color", "A wild card color must be chosen first.")
        if player_id != self.current_player_id:
            raise GameError("not_your_turn", "Only the current player can draw.")
        player = self.get_player(player_id)
        if player.finished:
            raise GameError("player_finished", "Finished players cannot draw.")
        count = self.pending_draw if self.pending_draw > 0 else 1
        drawn = self.take_cards(count)
        player.hand.extend(drawn)
        self.pending_draw = 0
        self.sync_uno_flags()
        self.finish_turn_after(player_id, None)
        event: dict[str, Any] = {
            "type": "cards_drawn",
            "playerId": player_id,
            "playerName": player.name,
            "count": len(drawn),
            "cards": [card.public() for card in drawn],
        }
        if auto:
            event["auto"] = True
            event["reason"] = reason or "no_playable"
        return event

    def take_cards(self, count: int) -> list[Card]:
        """Pop up to ``count`` cards from the deck, recycling the discard pile when empty."""
        drawn: list[Card] = []
        for _ in range(count):
            if not self.deck:
                self.reshuffle_deck()
            if not self.deck:
                # Nothing left to recycle either: return what was drawn so far.
                break
            drawn.append(self.deck.pop())
        return drawn

    def reshuffle_deck(self) -> None:
        """Shuffle everything under the top discard back into the deck."""
        if len(self.discard_pile) <= 1:
            return
        top_card = self.discard_pile[-1]
        recycled = self.discard_pile[:-1]
        random.shuffle(recycled)
        self.deck.extend(recycled)
        self.discard_pile = [top_card]

    def sync_uno_flags(self) -> None:
        """Clear ``said_uno`` for every player not holding exactly one card."""
        for player in self.players:
            if len(player.hand) != 1:
                player.said_uno = False

    def call_uno(self, player_id: str) -> dict[str, Any]:
        """Mark the player as having called UNO; they must hold exactly one card."""
        player = self.get_player(player_id)
        if len(player.hand) != 1:
            raise GameError("not_uno", "UNO can only be called with exactly one card.")
        player.said_uno = True
        return {"type": "uno_called", "playerId": player_id, "playerName": player.name}

    def rename_player(self, player_id: str, name: str) -> dict[str, Any]:
        """Set a human player's name to the normalized form of ``name``."""
        player = self.get_player(player_id)
        if player.is_bot:
            raise GameError("bot_rename", "Bots cannot be renamed from a socket.")
        # Assigning an unchanged name is a no-op, so no separate branch is
        # needed for that case; the event is identical either way.
        player.name = normalize_name(name)
        return {"type": "player_renamed", "playerId": player_id, "playerName": player.name}

    # ------------------------------------------------------------------
    # Turn order and game end
    # ------------------------------------------------------------------

    def mark_finished_if_needed(self, player: Player) -> None:
        """Rank ``player`` if their hand is empty, ending the game when one player is left."""
        if player.hand or player.finished:
            return
        player.finished = True
        player.rank = len(self.finish_order) + 1
        self.finish_order.append(player.id)
        remaining = [p for p in self.players if not p.finished]
        if len(remaining) == 1:
            # The last player standing takes the final rank and the game ends.
            last = remaining[0]
            last.finished = True
            last.rank = len(self.finish_order) + 1
            self.finish_order.append(last.id)
            self.phase = "ended"
            self.current_player_id = None
            self.awaiting_color_player_id = None
            self.awaiting_color_card_value = None

    def advance_steps_from(self, player_id: str, steps: int) -> str:
        """Return the id reached by moving ``steps`` unfinished seats from ``player_id``.

        Movement follows ``direction`` and passes over finished players.
        """
        if not self.active_players():
            return player_id
        seat_count = len(self.players)
        idx = self.get_player_index(player_id)
        for _ in range(steps):
            # At most one full lap is needed to find the next unfinished seat.
            for _attempt in range(seat_count):
                idx = (idx + self.direction + seat_count) % seat_count
                if not self.players[idx].finished:
                    break
        return self.players[idx].id

    def finish_turn_after(self, player_id: str, card: Card | None) -> None:
        """Advance ``current_player_id`` after ``player_id`` acted with ``card``.

        A skip moves two seats, as does a reverse while exactly two players
        are still active; everything else (including a draw, ``card=None``)
        moves one seat.
        """
        if self.phase == "ended":
            return
        steps = 1
        if card and card.value == "skip":
            steps = 2
        elif card and card.value == "reverse" and len(self.active_players()) == 2:
            steps = 2
        self.current_player_id = self.advance_steps_from(player_id, steps)

    # ------------------------------------------------------------------
    # Serialization
    # ------------------------------------------------------------------

    def snapshot_for(self, recipient_id: str) -> dict[str, Any]:
        """Return the room state as seen by ``recipient_id``.

        Every player is listed with a hand count; only the recipient's own
        entry includes the actual cards.
        """
        legal_ids = self.legal_card_ids_for(recipient_id) if self.phase == "playing" else []
        players_payload: list[dict[str, Any]] = []
        for player in self.players:
            payload: dict[str, Any] = {
                "id": player.id,
                "name": player.name,
                "isBot": player.is_bot,
                "connected": player.connected,
                "handCount": len(player.hand),
                "saidUno": player.said_uno,
                "finished": player.finished,
                "rank": player.rank,
            }
            if player.id == recipient_id:
                payload["hand"] = [card.public() for card in player.hand]
            players_payload.append(payload)
        top_card = self.discard_pile[-1].public() if self.discard_pile else None
        return {
            "roomCode": self.code,
            "phase": self.phase,
            "myPlayerId": recipient_id,
            "hostPlayerId": self.host_player_id,
            "settings": {
                "totalPlayers": self.total_players,
                "botDifficulty": self.bot_difficulty,
                "botJumpIn": self.bot_jump_in,
            },
            "players": players_payload,
            "deckCount": len(self.deck),
            "discardPile": [card.public() for card in self.discard_pile],
            "topCard": top_card,
            "currentPlayerId": self.current_player_id,
            "direction": self.direction,
            "currentColor": self.current_color,
            "pendingDraw": self.pending_draw,
            "finishOrder": list(self.finish_order),
            "awaitingColorPlayerId": self.awaiting_color_player_id,
            "legalCardIds": legal_ids,
            "canDraw": self.can_draw(recipient_id) if self.phase == "playing" else False,
            "version": self.version,
        }
class CreateRoomRequest(BaseModel):
    # Request body for POST /api/rooms.
    # Host display name, 1-24 characters (normalized again by the handler).
    name: str = Field(min_length=1, max_length=24)
    # Number of seats in the room, bounded by MIN_PLAYERS and MAX_PLAYERS.
    totalPlayers: int = Field(default=3, ge=MIN_PLAYERS, le=MAX_PLAYERS)
    # Not validated by the model; the handler checks it against BOT_DELAYS.
    botDifficulty: str = "hard"
    botJumpIn: bool = True
class JoinRoomRequest(BaseModel):
    # Request body for POST /api/rooms/{room_code}/join.
    # Display name of the joining player, 1-24 characters.
    name: str = Field(min_length=1, max_length=24)
# In-memory registry of live rooms, keyed by room code.
rooms: dict[str, Room] = {}
app = FastAPI(title="Multiplayer UNO", version="1.0.0")
# Serve the files under STATIC_DIR at /static.
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
def make_id(prefix: str) -> str:
    """Return ``prefix`` joined by an underscore to a random URL-safe suffix."""
    suffix = secrets.token_urlsafe(8)
    return "_".join((prefix, suffix))
def make_room_code() -> str:
    """Return a random 5-character code (A-Z, 0-9) not currently used by any room."""
    symbols = string.ascii_uppercase + string.digits
    candidate = "".join(secrets.choice(symbols) for _ in range(5))
    # Redraw on the (rare) collision with a live room.
    while candidate in rooms:
        candidate = "".join(secrets.choice(symbols) for _ in range(5))
    return candidate
def normalize_name(name: str) -> str:
    """Return a cleaned display name.

    Runs of whitespace are collapsed to single spaces, surrounding whitespace
    is removed, and the result is limited to 24 characters.  An empty result
    falls back to ``"Player"``.
    """
    collapsed = " ".join(name.split())
    # Cutting at 24 characters can land right after a word separator; strip
    # again so the stored name never ends with a space.
    trimmed = collapsed[:24].rstrip()
    return trimmed or "Player"
def validate_difficulty(value: str) -> str:
    """Return ``value`` if it names a known bot difficulty, else raise HTTP 400."""
    if value in BOT_DELAYS:
        return value
    raise HTTPException(status_code=400, detail="botDifficulty must be easy, medium, or hard")
def is_room_code(value: str) -> bool:
    """Return whether ``value`` is exactly five ASCII letters or digits."""
    if len(value) != 5:
        return False
    allowed = set(string.ascii_letters + string.digits)
    return set(value) <= allowed
def public_join_payload(room: Room, player: Player) -> dict[str, str]:
    """Return the credentials a client needs to open its WebSocket for this seat."""
    return dict(roomCode=room.code, playerId=player.id, token=player.token)
@app.get("/")
async def index() -> FileResponse:
    # Landing page: the single-page client shipped in the static directory.
    landing_page = STATIC_DIR / "index.html"
    return FileResponse(landing_page)
@app.get("/healthz")
async def healthz() -> JSONResponse:
    # Liveness probe; also reports how many rooms are currently registered.
    status = {"ok": True, "rooms": len(rooms)}
    return JSONResponse(status)
@app.get("/favicon.ico")
async def favicon() -> Response:
    # There is no icon to serve: answer 204 No Content. Registering this route
    # before the "/{room_code}" route keeps the request from reaching it.
    no_content = Response(status_code=204)
    return no_content
@app.get("/uno-style.css")
async def uno_style() -> Response:
    # Serve the contents of the first <style> block of uno.html as a
    # stylesheet; an empty stylesheet is returned when the file or the block
    # is missing.
    try:
        text = UNO_HTML.read_text(encoding="utf-8", errors="replace")
    except FileNotFoundError:
        # Read first and handle absence, rather than exists()-then-read, so a
        # file removed between the two steps cannot turn into a 500.
        return Response("", media_type="text/css")
    open_tag, close_tag = "<style>", "</style>"
    start = text.find(open_tag)
    # Look for the closing tag only AFTER the opening tag; a stray "</style>"
    # earlier in the document must not produce an empty result.
    end = text.find(close_tag, start + len(open_tag)) if start != -1 else -1
    css = text[start + len(open_tag) : end] if end != -1 else ""
    return Response(css, media_type="text/css")
@app.get("/{room_code}")
async def room_link(room_code: str) -> FileResponse:
    # Deep link to a room: any well-formed room code gets the same
    # single-page client; everything else is a 404.
    if is_room_code(room_code):
        return FileResponse(STATIC_DIR / "index.html")
    raise HTTPException(status_code=404, detail="Not found")
@app.post("/api/rooms")
async def create_room(request: CreateRoomRequest) -> JSONResponse:
    # Create a room with the requester seated as its (not yet connected)
    # host and return the credentials for the host's WebSocket.
    difficulty = validate_difficulty(request.botDifficulty)
    code = make_room_code()
    host_id = make_id("player")
    host_name = normalize_name(request.name)
    host_token = secrets.token_urlsafe(24)
    host = Player(id=host_id, name=host_name, token=host_token, connected=False)
    new_room = Room(
        code=code,
        host_player_id=host.id,
        total_players=request.totalPlayers,
        bot_difficulty=difficulty,
        bot_jump_in=request.botJumpIn,
        players=[host],
    )
    rooms[code] = new_room
    credentials = public_join_payload(new_room, host)
    return JSONResponse(credentials)
@app.post("/api/rooms/{room_code}/join")
async def join_room(room_code: str, request: JoinRoomRequest) -> JSONResponse:
    # Seat a new human player in a lobby room and return their credentials.
    # 404 for an unknown room; 409 when the game has started or the room is full.
    room = rooms.get(room_code.upper())
    if room is None:
        raise HTTPException(status_code=404, detail="Room not found")
    async with room.lock:
        if room.phase != "lobby":
            raise HTTPException(status_code=409, detail="This room has already started")
        seats_taken = len(room.players)
        if seats_taken >= room.total_players:
            raise HTTPException(status_code=409, detail="This room is full")
        newcomer = Player(
            id=make_id("player"),
            name=normalize_name(request.name),
            token=secrets.token_urlsafe(24),
            connected=False,
        )
        room.players.append(newcomer)
        room.touch()
        room.version += 1
    # broadcast_room takes the room lock itself, so it runs after release.
    joined_event = {"type": "player_joined", "playerId": newcomer.id, "playerName": newcomer.name}
    await broadcast_room(room, joined_event)
    return JSONResponse(public_join_payload(room, newcomer))
@app.websocket("/ws/{room_code}")
async def websocket_endpoint(websocket: WebSocket, room_code: str) -> None:
    """Authenticate a player's socket, then relay their commands until it closes.

    The ``playerId`` and ``token`` query parameters must match a human player
    seated in the room.  On success the player is marked connected and every
    received JSON message is passed to ``handle_command``.  When the socket
    goes away the player is marked disconnected and the room is notified.
    """
    code = room_code.upper()
    player_id = websocket.query_params.get("playerId", "")
    token = websocket.query_params.get("token", "")
    await websocket.accept()
    room = rooms.get(code)
    if not room:
        await websocket.send_json({"type": "error", "code": "room_not_found", "message": "Room not found."})
        await websocket.close()
        return

    # True only once THIS socket has been installed as the player's
    # connection.  A failed handshake must never touch the real player.
    registered = False
    try:
        async with room.lock:
            player = room.get_player(player_id)
            # Constant-time comparison; bytes are used because
            # compare_digest rejects non-ASCII str arguments.
            if not secrets.compare_digest(player.token.encode("utf-8"), token.encode("utf-8")):
                raise GameError("bad_token", "Invalid reconnect token.")
            if player.is_bot:
                raise GameError("bot_socket", "Bots cannot open sockets.")
            player.connected = True
            player.disconnected_at = None
            room.connections[player_id] = websocket
            registered = True
            room.touch()
            room.version += 1
        await broadcast_room(room, {"type": "player_connected", "playerId": player.id, "playerName": player.name})
        schedule_room_automation(room)
        while True:
            data = await websocket.receive_json()
            await handle_command(room, player_id, data)
    except WebSocketDisconnect:
        pass
    except GameError as exc:
        # Only the handshake above can raise GameError here; handle_command
        # reports its own errors to the client.
        await websocket.send_json({"type": "error", "code": exc.code, "message": exc.message})
    finally:
        departed: Player | None = None
        if registered:
            async with room.lock:
                existing = room.connections.get(player_id)
                # ``None`` means broadcast_room already dropped this socket
                # as dead.  Any other socket means the player has reconnected
                # elsewhere and must stay marked as connected.
                if existing is websocket or existing is None:
                    room.connections.pop(player_id, None)
                    try:
                        departed = room.get_player(player_id)
                    except GameError:
                        departed = None
                    else:
                        departed.connected = False
                        departed.disconnected_at = time.time()
                        room.version += 1
        if departed:
            await broadcast_room(
                room,
                {"type": "player_disconnected", "playerId": departed.id, "playerName": departed.name},
            )
            # The version bump above invalidates every pending bot / auto-draw
            # task (they check the version they were scheduled with), so the
            # automation has to be scheduled again or the game would stall.
            schedule_room_automation(room)
def _apply_command(room: Room, player_id: str, command: Any, payload: dict[str, Any]) -> dict[str, Any]:
    """Run one client command against ``room`` and return the resulting event.

    Must be called with ``room.lock`` held.  Raises ``GameError`` for unknown
    commands, malformed arguments and rule violations.
    """
    if command == "set_room_options":
        return room.set_room_options(player_id, payload)
    if command == "add_bot":
        return room.add_bot(player_id)
    if command == "remove_bot":
        return room.remove_bot(player_id)
    if command in ("start_game", "restart"):
        # Restarting is the same operation as starting: a fresh deal.
        return room.start_game(player_id)
    if command == "play_card":
        card_id = str(payload.get("cardId", ""))
        expected_top = payload.get("expectedTopCardId")
        expected_version = payload.get("expectedVersion")
        if expected_version is not None:
            try:
                expected_version = int(expected_version)
            except (TypeError, ValueError, OverflowError):
                raise GameError("bad_payload", "expectedVersion must be an integer.") from None
        return room.play_card(
            player_id,
            card_id,
            expected_top_card_id=str(expected_top) if expected_top is not None else None,
            expected_version=expected_version,
        )
    if command == "draw":
        return room.draw_for_player(player_id)
    if command == "choose_color":
        return room.choose_color(player_id, str(payload.get("color", "")))
    if command == "call_uno":
        return room.call_uno(player_id)
    if command == "rename":
        return room.rename_player(player_id, str(payload.get("name", "")))
    raise GameError("unknown_command", "Unknown command.")


async def handle_command(room: Room, player_id: str, data: dict[str, Any]) -> None:
    """Apply one message received from ``player_id``'s socket.

    ``data`` is expected to look like ``{"type": <command>, "payload": {...}}``.
    On success the room version is bumped, the event and fresh snapshots are
    broadcast, and bot / auto-draw automation is rescheduled.  On failure the
    sender alone receives the error (see ``send_error``) and nothing else
    happens.
    """
    event: dict[str, Any] | None = None
    error: GameError | None = None
    async with room.lock:
        try:
            room.touch()
            # Clients control the message shape entirely; anything that is
            # not a JSON object is reported as an error instead of raising
            # AttributeError and tearing down the connection.
            if not isinstance(data, dict):
                raise GameError("bad_message", "Commands must be JSON objects.")
            payload = data.get("payload") or {}
            if not isinstance(payload, dict):
                raise GameError("bad_payload", "Command payload must be a JSON object.")
            event = _apply_command(room, player_id, data.get("type"), payload)
            room.version += 1
        except GameError as exc:
            error = exc
    # Both helpers below take the room lock themselves, so they are called
    # only after it has been released.
    if error:
        await send_error(room, player_id, error)
        return
    await broadcast_room(room, event)
    schedule_room_automation(room)
async def send_error(room: Room, player_id: str, error: GameError) -> None:
    """Send ``error`` plus a fresh snapshot to one player, if they are connected."""
    websocket = room.connections.get(player_id)
    if websocket:
        error_message = {"type": "error", "code": error.code, "message": error.message}
        await websocket.send_json(error_message)
        # Follow up with the authoritative state so the client can resync.
        async with room.lock:
            state = room.snapshot_for(player_id)
        await websocket.send_json({"type": "snapshot", "state": state})
def event_for_recipient(event: dict[str, Any] | None, recipient_id: str) -> dict[str, Any] | None:
    """Return a per-recipient shallow copy of ``event`` (``None`` for an empty event).

    The ``cards`` of a ``cards_drawn`` event are only included for the player
    who drew them.
    """
    if not event:
        return None
    drawn_by_someone_else = event.get("type") == "cards_drawn" and event.get("playerId") != recipient_id
    if drawn_by_someone_else:
        return {key: value for key, value in event.items() if key != "cards"}
    return dict(event)
async def broadcast_room(room: Room, event: dict[str, Any] | None = None) -> None:
    """Send ``event`` (if any) and a personal snapshot to every connected player.

    Snapshots are computed under the room lock; the actual sends happen after
    it is released.  Sockets that fail to send are removed from the room's
    connection table.
    """
    async with room.lock:
        recipients = list(room.connections.items())
        snapshots = {player_id: room.snapshot_for(player_id) for player_id, _ in recipients}
    dead: list[tuple[str, WebSocket]] = []
    for player_id, websocket in recipients:
        try:
            recipient_event = event_for_recipient(event, player_id)
            if recipient_event:
                await websocket.send_json({"type": "event", "event": recipient_event})
            await websocket.send_json({"type": "snapshot", "state": snapshots[player_id]})
        except Exception:
            # Deliberately best-effort: one broken socket must not prevent
            # the remaining players from being updated.
            dead.append((player_id, websocket))
    if dead:
        async with room.lock:
            for player_id, websocket in dead:
                # The player may have reconnected while the sends above were
                # awaited; only evict the exact socket that failed.
                if room.connections.get(player_id) is websocket:
                    room.connections.pop(player_id, None)
# Strong references to in-flight automation tasks.  The event loop itself
# keeps only weak references, so a task nobody else references could be
# garbage-collected before it finishes.
_background_tasks: set[asyncio.Task] = set()


def _spawn_background(coro: Any) -> None:
    """Start ``coro`` as a task and keep it referenced until it completes."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)


def schedule_room_automation(room: Room) -> None:
    """Schedule whatever should happen next without player input.

    Depending on the current player this is a bot turn, or an automatic draw
    for a player who holds no playable card.  Independently, bots holding a
    card identical to the top card get a jump-in attempt when bot jump-ins
    are enabled.  Every task is tagged with the room version seen here and
    does nothing if the room has changed by the time it runs.
    """
    if room.phase != "playing" or room.awaiting_color_player_id:
        return
    version = room.version
    current = None
    try:
        current = room.get_player(room.current_player_id or "")
    except GameError:
        pass
    if current and current.is_bot:
        delay = BOT_DELAYS.get(room.bot_difficulty, 1.0)
        _spawn_background(run_bot_turn(room.code, current.id, version, delay))
    elif current and not current.finished and not room.has_valid_turn_play(current.id):
        top_card = room.get_top_card()
        _spawn_background(
            run_auto_draw(
                room.code,
                current.id,
                version,
                top_card.id,
                room.current_color,
                room.pending_draw,
                AUTO_DRAW_DELAY_SECONDS,
            )
        )
    if room.bot_jump_in:
        for player in room.players:
            if player.is_bot and player.id != room.current_player_id and not player.finished:
                if any(room.is_strict_jump_in(card) for card in player.hand):
                    # A randomized delay so bots do not all jump in at once.
                    delay = random.uniform(0.45, 0.9)
                    _spawn_background(run_bot_jump_in(room.code, player.id, version, delay))
def schedule_bots(room: Room) -> None:
    """Alias that forwards to ``schedule_room_automation``."""
    return schedule_room_automation(room)
async def run_bot_turn(room_code: str, player_id: str, expected_version: int, delay: float) -> None:
    """After ``delay`` seconds, let the bot play a card or draw.

    Nothing happens if the room is gone, its version differs from
    ``expected_version``, or it is no longer this bot's turn.
    """
    await asyncio.sleep(delay)
    room = rooms.get(room_code)
    if room is None:
        return
    async with room.lock:
        stale = (
            room.version != expected_version
            or room.phase != "playing"
            or room.awaiting_color_player_id
        )
        if stale or room.current_player_id != player_id:
            return
        bot = room.get_player(player_id)
        chosen = room.pick_bot_card(bot)
        if chosen is None:
            event = room.draw_for_player(player_id)
        else:
            event = room.play_card(player_id, chosen.id, bot_auto_color=True)
        room.touch()
        room.version += 1
    # The lock is released before broadcasting, which acquires it again.
    await broadcast_room(room, event)
    schedule_room_automation(room)
async def run_bot_jump_in(room_code: str, player_id: str, expected_version: int, delay: float) -> None:
    """After ``delay`` seconds, let an off-turn bot jump in with a matching card.

    Nothing happens if the room is gone or has changed since scheduling, if
    it has become this bot's own turn, or if the bot no longer holds a card
    identical to the top card.
    """
    await asyncio.sleep(delay)
    room = rooms.get(room_code)
    if room is None:
        return
    async with room.lock:
        stale = (
            room.version != expected_version
            or room.phase != "playing"
            or room.awaiting_color_player_id
        )
        if stale or room.current_player_id == player_id:
            return
        bot = room.get_player(player_id)
        matching = [candidate for candidate in bot.hand if room.is_strict_jump_in(candidate)]
        if not matching:
            return
        event = room.play_card(
            player_id,
            matching[0].id,
            bot_auto_color=True,
            expected_top_card_id=room.get_top_card().id,
            expected_version=expected_version,
        )
        room.touch()
        room.version += 1
    # The lock is released before broadcasting, which acquires it again.
    await broadcast_room(room, event)
    schedule_room_automation(room)
async def run_auto_draw(
    room_code: str,
    player_id: str,
    expected_version: int,
    expected_top_card_id: str,
    expected_color: str,
    expected_pending_draw: int,
    delay: float = AUTO_DRAW_DELAY_SECONDS,
) -> None:
    """After ``delay`` seconds, draw on behalf of a human with no playable card.

    The draw only happens if the table is exactly as it was when the task was
    scheduled (version, current player, color, pending penalty, top card) and
    the player still has nothing to play.
    """
    await asyncio.sleep(delay)
    room = rooms.get(room_code)
    if room is None:
        return
    async with room.lock:
        if room.version != expected_version:
            return
        if room.phase != "playing" or room.awaiting_color_player_id:
            return
        if room.current_player_id != player_id:
            return
        table_unchanged = (
            room.current_color == expected_color
            and room.pending_draw == expected_pending_draw
            and room.get_top_card().id == expected_top_card_id
        )
        if not table_unchanged:
            return
        player = room.get_player(player_id)
        if player.is_bot or player.finished or room.has_valid_turn_play(player_id):
            return
        event = room.draw_for_player(player_id, auto=True, reason="no_playable")
        room.touch()
        room.version += 1
    # The lock is released before broadcasting, which acquires it again.
    await broadcast_room(room, event)
    schedule_room_automation(room)
async def cleanup_rooms() -> None:
    """Background loop that expires idle rooms and frees abandoned lobby seats.

    Every 30 seconds: rooms idle for more than ``ROOM_IDLE_SECONDS`` are
    removed from the registry, and in lobby rooms any human other than the
    host who has been disconnected for more than ``RECONNECT_SECONDS`` loses
    their seat.
    """
    while True:
        await asyncio.sleep(30)
        now = time.time()
        for code, room in list(rooms.items()):
            seats_freed = False
            async with room.lock:
                if now - room.last_active > ROOM_IDLE_SECONDS:
                    rooms.pop(code, None)
                    continue
                if room.phase == "lobby":
                    # Iterate over a copy: seats are removed while looping.
                    for player in list(room.players):
                        if (
                            not player.is_bot
                            and not player.connected
                            and player.disconnected_at
                            and now - player.disconnected_at > RECONNECT_SECONDS
                            and player.id != room.host_player_id
                        ):
                            room.players.remove(player)
                            room.version += 1
                            seats_freed = True
            if seats_freed:
                # Push fresh snapshots so connected clients see the freed
                # seats and the new version.  broadcast_room takes the room
                # lock itself, hence the call after the lock is released.
                await broadcast_room(room)
@app.on_event("startup")
async def on_startup() -> None:
    """Start the room cleanup loop when the application starts."""
    # Keep a strong reference on the application state: the event loop only
    # holds weak references to tasks, so an unreferenced task may be
    # garbage-collected before it is done.
    app.state.cleanup_task = asyncio.create_task(cleanup_rooms())
def create_deck_for_tests() -> list[Card]:
    """Return a freshly shuffled deck built by a throwaway room."""
    scratch_room = Room(code="TEST", host_player_id="host", total_players=3)
    deck = scratch_room.make_deck()
    return deck