Spaces:
Sleeping
Sleeping
| # coding: utf-8 | |
| # Author: Du Mingzhe (mingzhe@nus.edu.sg) | |
| # Date: 2025-09-09 | |
| import os | |
| import uuid | |
| import random | |
| import logging | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| from enum import Enum | |
| from pathlib import Path | |
| from openai import OpenAI | |
| from scipy.stats import norm | |
| from pydantic import BaseModel | |
| from dotenv import load_dotenv | |
| from abc import ABC, abstractmethod | |
| from typing import List, Tuple, Dict | |
| from trueskill import Rating, quality, rate | |
| from openai_cost_calculator import estimate_cost_typed | |
# Logging configuration: the root logger runs at INFO and, unless handlers are
# already installed on it, writes to both 'dixit.log' and a stream handler.
formatter = logging.Formatter('%(asctime)s %(levelname)s:%(name)s:%(message)s')

# Both handlers share the same record layout.
file_handler = logging.FileHandler('dixit.log')
stream_handler = logging.StreamHandler()
file_handler.setFormatter(formatter)
stream_handler.setFormatter(formatter)

logger = logging.getLogger()
logger.setLevel(logging.INFO)
if len(logger.handlers) == 0:
    # Only attach when nothing is configured yet, so handlers are not duplicated.
    logger.addHandler(file_handler)
    logger.addHandler(stream_handler)

# The "httpx" logger only reports WARNING and above.
logging.getLogger("httpx").setLevel(logging.WARNING)
# Load settings from a .env file before any player is created;
# OpenAIPlayer reads OPENAI_API_KEY through os.getenv in its constructor.
load_dotenv()
class Role(Enum):
    """Role a player holds within a round."""

    STORYTELLER = 1
    GUESSER = 2
    UNKNOWN = 3
# Structured-output schema for the storyteller turn; passed as `text_format`
# to `responses.parse` in OpenAIPlayer.storytell_inference.
class StorytellResponse(BaseModel):
    clue: str  # the clue given for the storyteller's card
    explaination: str  # (sic) spelling is part of the schema; StorytellPrompt and the parsing code use the same name
# Structured-output schema for the guessing (decoy selection) turn; passed as
# `text_format` to `responses.parse` in OpenAIPlayer.guess_inference.
class GuessResponse(BaseModel):
    card_id: int  # 0-based index into the list of images sent with the prompt (applied modulo the list length)
    explaination: str  # (sic) spelling is part of the schema; GuessPrompt and the parsing code use the same name
# Structured-output schema for the voting turn; passed as `text_format` to
# `responses.parse` in OpenAIPlayer.vote_inference.
class VoteResponse(BaseModel):
    card_id: int  # 0-based index into the list of images sent with the prompt (applied modulo the list length)
    explaination: str  # (sic) spelling is part of the schema; VotePrompt and the parsing code use the same name
# Prompt sent together with the storyteller's card image in
# OpenAIPlayer.storytell_inference. It is used verbatim (no format placeholders).
StorytellPrompt = """
You are a master storyteller in the board game Dixit, known for your clever and evocative clues.
Your challenge is to create a hard and ambiguous clue for the card below that strikes a perfect balance, as your score depends on it.
You can get 3 points if someone but not all of them guesses your card. Otherwise, you get 0 points.
# The Goal:
At least one player must guess your card, but not all of them. If everyone or no one guesses correctly, you score zero points.
# Your Task:
- Provide a clue for the card. This can be a word, a phrase, a quote, or a concise fragment of a story.
- Briefly explain the strategy behind your clue: Why do you think it's not too obvious, yet not too obscure?
# Response Format
class StorytellResponse(BaseModel):
clue: str
explaination: str
"""
# Prompt for the decoy-selection turn. The `{story}` placeholder is filled with
# the storyteller's clue via str.format in OpenAIPlayer.guess_inference, and the
# guesser's hand is attached as a list of images after the text.
GuessPrompt = """
You are a master player in the creative game of Dixit. The storyteller has provided an ambiguous clue.
Your objective is to analyze this clue and select one card from your hand to serve as a decoy.
Your goal is to mislead other players into voting for your card. You will earn 1 point for each vote your card receives.
Storyteller's Clue: {story}
# Response Format
class GuessResponse(BaseModel):
card_id: int # the index of the image in the list of provided images (starts from 0)
explaination: str
"""
# Prompt for the voting turn. The `{story}` placeholder is filled with the
# storyteller's clue via str.format in OpenAIPlayer.vote_inference, and the
# cards on the table are attached as a list of images after the text.
VotePrompt = """
You are a master player in the creative game of Dixit. You are in the voting phase of the game.
The storyteller has provided an ambiguous clue, and a set of cards are on the table—one is the storyteller's, and the rest are decoys from other guessers.
Your goal is to analyze the clue and the art on each card to deduce which one belongs to the storyteller.
Storyteller's Clue: {story}
# Response Format
class VoteResponse(BaseModel):
card_id: int # the index of the image in the list of provided images (starts from 0)
explaination: str
"""
class Card:
    """A single card plus the bookkeeping the game attaches to it.

    Attributes:
        card_id: Identifier of the card.
        image_path: Location of the card image.
        guessed_by: Id of the player who put the card on the table
            (written by ``Player.guess`` and ``Game.dealing``); ``None`` until then.
        voted_by: Ids of the players who voted for this card
            (appended by ``Player.vote``).
    """

    def __init__(self, card_id: int, image_path: str):
        # Identity of the card
        self.card_id, self.image_path = card_id, image_path
        # Bookkeeping, filled in while a round is played
        self.guessed_by = None
        self.voted_by = []
class Player(ABC):
    """Base class for a participant in the game.

    Subclasses provide the three ``*_inference`` hooks. The public
    ``storytell`` / ``guess`` / ``vote`` wrappers check the player's role,
    add the reported cost to ``self.cost``, record the chosen card id and log
    the move.

    Attributes:
        player_id: Identifier of the player.
        role: Current ``Role``; starts as ``Role.UNKNOWN``.
        selected_card_id: Id of the card this player told a story about.
        guessed_card_id: Id of the card this player put on the table.
        voted_card_id: Id of the card this player voted for.
        cost: Running total of the costs reported by the inference hooks
            (logged as USD).
    """

    def __init__(self, player_id: str):
        self.player_id = player_id
        self.role = Role.UNKNOWN
        self.selected_card_id = None
        self.guessed_card_id = None
        self.voted_card_id = None
        self.cost = 0
        logger.debug(f"[Player] Player {self.player_id} initialized with role {self.role}")

    @staticmethod
    def _format_card_ids(cards: List[Card]) -> str:
        """Return the ids of ``cards`` as a comma-separated string for log lines."""
        # str() keeps the join working whether ids are strings or ints.
        return ', '.join(str(card.card_id) for card in cards)

    def storytell_inference(self, card: Card) -> Tuple[str, str, float]:
        """Return ``(clue, explanation, cost)`` for ``card``; implemented by subclasses."""
        raise NotImplementedError

    def guess_inference(self, story: str, cards: List[Card]) -> Tuple[Card, str, float]:
        """Return ``(chosen card, explanation, cost)`` for a hand; implemented by subclasses."""
        raise NotImplementedError

    def vote_inference(self, story: str, cards: List[Card]) -> Tuple[Card, str, float]:
        """Return ``(voted card, explanation, cost)`` for the table; implemented by subclasses."""
        raise NotImplementedError

    def storytell(self, card: Card) -> str:
        """Produce a clue for ``card`` as the storyteller and return it."""
        assert self.role == Role.STORYTELLER
        story, _explaination, cost = self.storytell_inference(card)
        self.cost += cost
        self.selected_card_id = card.card_id
        logger.info(f"[Storytelling] [{self.player_id}] [{card.card_id}] -> '{story}' (cost: {cost:.6f} USD)")
        return story

    def guess(self, story: str, cards: List[Card]) -> Card:
        """Pick one card from ``cards`` to put on the table and mark it as this player's."""
        assert self.role == Role.GUESSER
        card, _explaination, cost = self.guess_inference(story, cards)
        self.cost += cost
        self.guessed_card_id = card.card_id
        card.guessed_by = self.player_id
        hand_ids = self._format_card_ids(cards)
        logger.info(f"[Guessing] [{self.player_id}] [{hand_ids}] -> [{card.card_id}] (cost: {cost:.6f} USD)")
        return card

    def vote(self, story: str, cards: List[Card]) -> Card:
        """Vote for one of ``cards`` and register this player in the card's ``voted_by``."""
        assert self.role == Role.GUESSER
        card, _explaination, cost = self.vote_inference(story, cards)
        self.cost += cost
        self.voted_card_id = card.card_id
        card.voted_by.append(self.player_id)
        table_ids = self._format_card_ids(cards)
        logger.info(f"[Voting] [{self.player_id}] [{table_ids}] -> [{card.card_id}] (cost: {cost:.6f} USD)")
        return card
class HumanPlayer(Player):
    """Stand-in for a human participant.

    All three hooks are placeholders: the story is a fixed string, card
    choices are drawn with ``random.choice`` and every move costs 0.0.
    """

    def __init__(self, player_id: str):
        super().__init__(player_id)

    def storytell_inference(self, card: Card) -> Tuple[str, str, float]:
        # TODO: implement human storytell inference
        story = "This is a test story"
        explaination = "This is a test explaination"
        return story, explaination, 0.0

    def guess_inference(self, story: str, cards: List[Card]) -> Tuple[Card, str, float]:
        # TODO: implement human guess inference
        picked = random.choice(cards)
        return picked, "This is a test explaination", 0.0

    def vote_inference(self, story: str, cards: List[Card]) -> Tuple[Card, str, float]:
        # TODO: implement human vote inference
        picked = random.choice(cards)
        return picked, "This is a test explaination", 0.0
class OpenAIPlayer(Player):
    """Player whose moves are produced by an OpenAI model.

    Card images are uploaded with ``client.files.create`` and referenced by
    file id in a ``client.responses.parse`` call whose output is parsed into
    the matching ``*Response`` model. With ``debug_mode=True`` no request is
    sent: the hooks return fixed values at zero cost.

    Args:
        player_id: Identifier of the player.
        model_name: Model name passed to ``responses.parse``.
        debug_mode: When true, skip all API calls.
    """

    def __init__(self, player_id: str, model_name: str, debug_mode: bool = False):
        super().__init__(player_id)
        self.model_name = model_name
        self.debug_mode = debug_mode
        self.client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

    def _upload_card(self, card: Card) -> Dict[str, str]:
        """Upload the image of ``card`` and return the ``input_image`` item that references it."""
        # The context manager closes the local file handle once the upload call returns.
        with open(Path(card.image_path).resolve(), "rb") as image_file:
            card_file = self.client.files.create(file=image_file, purpose="user_data")
        return {"type": "input_image", "file_id": card_file.id}

    def _ask(self, prompt: str, image_items: List[Dict[str, str]], text_format):
        """Send ``prompt`` followed by ``image_items`` as one user message.

        Returns:
            ``(parsed output, total cost)`` where the parsed output is an
            instance of ``text_format`` and the cost comes from
            ``estimate_cost_typed``.
        """
        response = self.client.responses.parse(
            model=self.model_name,
            input=[
                {
                    "role": "user",
                    "content": [{"type": "input_text", "text": prompt}] + image_items,
                }
            ],
            text_format=text_format,
        )
        cost = estimate_cost_typed(response)
        return response.output_parsed, cost.total_cost

    def _choose_card(self, prompt: str, cards: List[Card], text_format) -> Tuple[Card, str, float]:
        """Show ``cards`` to the model and return the card it picked, its explanation and the cost."""
        image_items = [self._upload_card(card) for card in cards]
        parsed, total_cost = self._ask(prompt, image_items, text_format)
        # The model answers with an index into ``cards``; the modulo keeps an
        # out-of-range answer inside the list instead of raising.
        chosen = cards[parsed.card_id % len(cards)]
        return chosen, parsed.explaination, total_cost

    def storytell_inference(self, card: Card) -> Tuple[str, str, float]:
        """Ask the model for a clue about ``card``; returns ``(clue, explanation, cost)``."""
        if self.debug_mode:
            return "test clue", "explaination", 0.0
        parsed, total_cost = self._ask(StorytellPrompt, [self._upload_card(card)], StorytellResponse)
        return parsed.clue, parsed.explaination, total_cost

    def guess_inference(self, story: str, cards: List[Card]) -> Tuple[Card, str, float]:
        """Ask the model which card of the hand to play as a decoy for ``story``."""
        if self.debug_mode:
            return cards[0], "explaination", 0.0
        return self._choose_card(GuessPrompt.format(story=story), cards, GuessResponse)

    def vote_inference(self, story: str, cards: List[Card]) -> Tuple[Card, str, float]:
        """Ask the model which card on the table belongs to the storyteller of ``story``."""
        if self.debug_mode:
            return cards[0], "explaination", 0.0
        return self._choose_card(VotePrompt.format(story=story), cards, VoteResponse)
class Leaderboard():
    """Keeps a TrueSkill rating history for every player.

    ``player_rating_dict`` maps each player id to the list of ratings the
    player has held, oldest first; the first entry is a default ``Rating()``.
    """

    def __init__(self, player_list: List["Player"]):
        self.player_list = player_list
        self.player_rating_dict = {player.player_id: [Rating()] for player in player_list}

    @staticmethod
    def get_ranking(score_dict: Dict[str, int]) -> Dict[str, int]:
        """Convert scores into dense ranks (1 = highest score, ties share a rank).

        Args:
            score_dict: Score per player id.

        Returns:
            Rank per player id; an empty input gives an empty result.
        """
        # Distinct scores, best first; the position in this list is the rank.
        unique_scores = sorted(set(score_dict.values()), reverse=True)
        score_to_rank = {score: rank for rank, score in enumerate(unique_scores, start=1)}
        return {player: score_to_rank[score] for player, score in score_dict.items()}

    def get_rating_trace(self, player_id: str) -> List["Rating"]:
        """Return the full rating history of ``player_id`` (oldest first)."""
        return self.player_rating_dict[player_id]

    def get_latest_rating(self) -> Dict[str, "Rating"]:
        """Return the most recent rating of every player, keyed by player id."""
        return {player.player_id: self.player_rating_dict[player.player_id][-1] for player in self.player_list}

    def rating_update(self, score_dict: Dict[str, int]):
        """Append a new rating for every player based on one game's scores.

        Each player forms a one-member group for ``trueskill.rate``; the ranks
        come from ``get_ranking`` so equal scores are treated as a draw.

        Raises:
            KeyError: If ``score_dict`` lacks an entry for one of the players.
        """
        player_ranking_dict = self.get_ranking(score_dict)
        player_rating_groups = [[self.player_rating_dict[player.player_id][-1]] for player in self.player_list]
        player_ranks = [player_ranking_dict[player.player_id] for player in self.player_list]
        new_player_rating_list = rate(rating_groups=player_rating_groups, ranks=player_ranks)
        for player, new_rating in zip(self.player_list, new_player_rating_list):
            # rate() returns one group per player; take the single member.
            self.player_rating_dict[player.player_id].append(new_rating[0])

    def visualize_rating(self, game_id: str) -> None:
        """Plot every player's latest rating as a normal curve and save it as a PNG.

        The image is written to ``player_skill_distributions_{game_id}.png``
        in the current working directory.
        """
        fig, ax = plt.subplots(figsize=(24, 8))
        try:
            for player_id, rating_trace in self.player_rating_dict.items():
                rating = rating_trace[-1]
                mu = rating.mu
                sigma = rating.sigma
                # Draw the density over mu +/- 4 sigma.
                x = np.linspace(mu - 4*sigma, mu + 4*sigma, 200)
                pdf = norm(loc=mu, scale=sigma).pdf(x)
                ax.plot(x, pdf, label=f'{player_id} (μ={mu:.2f}, σ={sigma:.2f})')
            ax.set_title("Player Skill Distributions (TrueSkill Ratings)", fontsize=16)
            ax.set_xlabel("Skill Level (μ)", fontsize=12)
            ax.set_ylabel("Probability Density", fontsize=12)
            ax.legend()
            ax.grid(True, linestyle='--', alpha=0.6)
            fig.tight_layout()
            fig.savefig(f"player_skill_distributions_{game_id}.png")
        finally:
            # This method runs once per round; without closing, every figure
            # stays registered with pyplot and its memory is never released.
            plt.close(fig)
class Game:
    """One table of the game: deals cards, plays a round and scores it.

    A round (``loop``) consists of role assignment, dealing, storytelling,
    guessing (every guesser puts one card on the table), voting and scoring.

    Attributes:
        game_id: Random UUID string identifying this ``Game`` instance.
        game_cost: Cost accumulated by all players during the latest round.
        player_list: The players, in the order of the latest role shuffle.
        storyteller / guesser_list: Roles of the current round.
        table_cards: Cards on the table in the current round, keyed by card id.
        deck: All cards found in ``card_dir``.
        score_dict: Score per player id for the latest round.
    """

    # Number of cards dealt to each guesser.
    HAND_SIZE = 6
    # Player counts the scoring rules are defined for.
    SUPPORTED_PLAYER_COUNTS = (5, 6)

    def __init__(self, player_list: List[Player], card_dir: Path):
        self.game_id = str(uuid.uuid4())
        self.game_cost = 0
        self.card_dir = card_dir
        # Private copy: role_assignment() shuffles this list in place, and the
        # caller's list must not be reordered behind its back.
        self.player_list = list(player_list)
        self.num_players = len(self.player_list)
        self.storyteller = None
        self.guesser_list = list()
        self.table_cards = dict()
        self.deck = self.initialize_deck(card_dir)
        self.score_dict = {player.player_id: 0 for player in self.player_list}
        logger.info(f"[Game] Game {self.game_id} initialized (num_players: {self.num_players})")

    def initialize_deck(self, card_dir: Path):
        """Build a shuffled deck with one ``Card`` per ``*.png`` file in ``card_dir``.

        The file stem is used as the card id.
        """
        # Sort first so the shuffle result depends only on the random state,
        # not on the directory listing order.
        temp_deck = [Card(card_file.stem, card_file) for card_file in sorted(card_dir.glob("*.png"))]
        random.shuffle(temp_deck)
        logger.info(f"[Game] Game {self.game_id} deck initialized ({len(temp_deck)} cards)")
        return temp_deck

    def pick_cards(self, num_cards: int):
        """Return ``num_cards`` distinct cards drawn at random from the deck.

        Raises:
            ValueError: If the deck holds fewer than ``num_cards`` cards.
        """
        return random.sample(self.deck, num_cards)

    def get_card_by_id(self, card_id: int) -> Card:
        """Return the deck card whose id equals ``card_id``.

        Raises:
            ValueError: If no such card exists.
        """
        for card in self.deck:
            if card.card_id == card_id:
                return card
        raise ValueError(f"[Game] Card {card_id} not found")

    def role_assignment(self) -> None:
        """Shuffle the players; the first becomes storyteller, the rest guessers."""
        random.shuffle(self.player_list)
        self.storyteller = self.player_list[0]
        self.guesser_list = self.player_list[1:]
        # Drop the choices of the previous round so nothing stale is scored.
        for player in self.player_list:
            player.selected_card_id = None
            player.guessed_card_id = None
            player.voted_card_id = None
        self.storyteller.role = Role.STORYTELLER
        for guesser in self.guesser_list:
            guesser.role = Role.GUESSER
        logger.info(f"[Game] Game {self.game_id} role assigned (storyteller: {self.storyteller.player_id}, guesser: {', '.join([guesser.player_id for guesser in self.guesser_list])})")

    def dealing(self) -> Tuple[Card, Dict[str, List[Card]]]:
        """Draw the storyteller's card and a hand of ``HAND_SIZE`` cards per guesser.

        Returns:
            ``(storyteller card, {guesser id: hand})``.
        """
        hand_size = self.HAND_SIZE
        picked_cards = self.pick_cards(1 + len(self.guesser_list) * hand_size)
        # Card objects are reused across rounds; clear ownership and votes
        # left over from an earlier round.
        for card in picked_cards:
            card.guessed_by = None
            card.voted_by = []
        storyteller_card = picked_cards.pop(0)
        storyteller_card.guessed_by = self.storyteller.player_id
        guesser_card_dict = {
            guesser.player_id: picked_cards[index * hand_size:(index + 1) * hand_size]
            for index, guesser in enumerate(self.guesser_list)
        }
        return storyteller_card, guesser_card_dict

    def _check_player_count(self) -> None:
        """Raise ``ValueError`` when the scoring rules do not cover the player count."""
        if self.num_players not in self.SUPPORTED_PLAYER_COUNTS:
            raise ValueError(f"[Game] Unsupported number of players {self.num_players}")

    def calculate_score(self):
        """Add this round's points to ``score_dict``.

        Rules:
            * Some but not all guessers found the storyteller's card: the
              storyteller and every guesser who found it get 3 points.
            * Nobody or everybody found it: the storyteller gets 0 and every
              guesser gets 2 points.
            * In addition, each vote on a guesser's card gives that card's
              owner 1 point; a vote on one's own card gives nothing.

        Raises:
            ValueError: If the number of players is not supported.
        """
        self._check_player_count()
        storyteller_id = self.storyteller.player_id
        correct_voted_list = [
            guesser for guesser in self.guesser_list
            if guesser.voted_card_id == self.storyteller.selected_card_id
        ]

        # Storyteller's card found by some, but not all, guessers.
        if 0 < len(correct_voted_list) < len(self.guesser_list):
            self.score_dict[storyteller_id] += 3
            for guesser in correct_voted_list:
                self.score_dict[guesser.player_id] += 3
        else:
            # Found by nobody or by everybody: all guessers are rewarded alike.
            for guesser in self.guesser_list:
                self.score_dict[guesser.player_id] += 2

        # Bonus for decoys that attracted votes.
        for guesser in self.guesser_list:
            voted_card = self.get_card_by_id(guesser.voted_card_id)
            owner_id = voted_card.guessed_by
            if owner_id != storyteller_id and owner_id != guesser.player_id:
                self.score_dict[owner_id] += 1

    def loop(self):
        """Play one full round and return its ``score_dict``.

        Raises:
            ValueError: If the player count is unsupported (checked before any
                player is asked to move) or the deck is too small to deal.
        """
        # Fail before any player inference is run for a round that cannot be scored.
        self._check_player_count()
        logger.info(f"[Game] Game {self.game_id} started")
        self.game_cost = 0
        self.table_cards = dict()
        self.score_dict = {player.player_id: 0 for player in self.player_list}
        # Player.cost keeps growing across rounds; remember the starting point
        # so game_cost reflects this round only.
        cost_before = sum(player.cost for player in self.player_list)

        # Step 1: Role assignment
        self.role_assignment()
        logger.info("[+] [Role Assignment]" + "="*50)

        # Step 2: Dealing
        storyteller_card, guesser_card_dict = self.dealing()
        logger.info("[+] [Dealing]" + "="*50)

        # Step 3: Storytelling
        story = self.storyteller.storytell(storyteller_card)
        logger.info(f"Game {self.game_id} storyteller card: {storyteller_card.card_id} -> '{story}'")
        logger.info("[+] [Storytelling]" + "="*50)

        # Step 4: Guessing
        for guesser in self.guesser_list:
            guessed_card = guesser.guess(story, guesser_card_dict[guesser.player_id])
            guessed_card.guessed_by = guesser.player_id
            self.table_cards[guessed_card.card_id] = guessed_card
        self.table_cards[storyteller_card.card_id] = storyteller_card
        table_ids = ', '.join(str(card.card_id) for card in self.table_cards.values())
        logger.info(f"Game {self.game_id} guessed cards: {table_ids}")
        logger.info("[+] [Guessing]" + "="*50)

        # Step 5: Voting
        # Shuffle the table so the position of the storyteller's card carries
        # no information, and never offer a guesser their own card.
        table = list(self.table_cards.values())
        random.shuffle(table)
        for guesser in self.guesser_list:
            candidates = [card for card in table if card.guessed_by != guesser.player_id]
            guesser.vote(story, candidates)
        logger.info(f"Game {self.game_id} voted cards: {table_ids}")
        logger.info("[+] [Voting]" + "="*50)

        # Step 6: Calculate score
        self.calculate_score()
        logger.info(f"Game {self.game_id} scores calculated: {self.score_dict}")
        logger.info("[+] [Score Calculation]" + "="*50)

        # Step 7: Calculate cost
        self.game_cost = sum(player.cost for player in self.player_list) - cost_before
        logger.info(f"Game {self.game_id} ended (cost: {self.game_cost:.6f} USD)")
        logger.info("[+] [End]" + "="*50)
        return self.score_dict
def main(num_rounds: int = 20) -> None:
    """Play ``num_rounds`` rounds between five OpenAI models and track their ratings.

    After every round the leaderboard is updated and a rating plot is saved.
    A round that raises is logged with its traceback and the tournament
    continues with the next round.
    """
    player_list = [
        OpenAIPlayer(player_id="player_0", model_name="gpt-4.1"),
        OpenAIPlayer(player_id="player_1", model_name="gpt-4o-mini"),
        OpenAIPlayer(player_id="player_2", model_name="gpt-5"),
        OpenAIPlayer(player_id="player_3", model_name="gpt-5-mini"),
        OpenAIPlayer(player_id="player_4", model_name="gpt-5-nano"),
    ]
    card_dir = Path('asset/cards')
    leaderboard = Leaderboard(player_list)
    game = Game(player_list, card_dir)
    for i in range(num_rounds):
        print(f"==================== Game {i} started ====================")
        try:
            score_dict = game.loop()
            leaderboard.rating_update(score_dict)
            leaderboard.visualize_rating(f"round_{i}_{game.game_id}")
            logger.info(f"Game {game.game_id} leaderboard: {leaderboard.get_latest_rating()}")
        except Exception:
            # Best effort: a failed round must not stop the remaining rounds,
            # but the traceback is kept in the log instead of being discarded.
            logger.exception(f"Game {i} failed")
        print(f"==================== Game {i} ended ====================")


if __name__ == "__main__":
    main()