rabukasim / tools /_legacy_scripts /agent_tournament_fast.py
trioskosmos's picture
Upload folder using huggingface_hub
463f868 verified
"""
FAST CPU-Only Agent Tournament
Optimized for maximum throughput with heuristic agents.
Key Optimizations:
1. Mutable game state (no copy per step) - HUGE speedup
2. Disabled verbose logging
3. Disabled loop detection (saves tuple creation + hashing)
4. Larger chunksizes for reduced IPC overhead
5. Minimal object creation per turn
"""
import argparse
import os
import random
import subprocess
import sys
import time
from enum import IntEnum
from multiprocessing import Pool, cpu_count
from typing import Dict, List
import numpy as np
# Make project modules importable: ``root`` is the parent of the directory
# that contains this script; ``root/engine`` and ``root/ai`` are added too.
root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(root)
sys.path.append(os.path.join(root, "engine"))
sys.path.append(os.path.join(root, "ai"))
from game.data_loader import CardDataLoader
# ============================================================================
# INLINE MINIMAL GAME STATE (Optimized for Speed)
# ============================================================================
class Phase(IntEnum):
    """Integer phase identifiers used by the fast game state."""

    # Pre-game phases.
    SETUP = -2
    MULLIGAN_P1 = -1
    MULLIGAN_P2 = 0

    # Phases cycled through on every turn.
    ACTIVE = 1
    ENERGY = 2
    DRAW = 3
    MAIN = 4
    LIVE_SET = 5

    # Live resolution.
    PERFORMANCE_P1 = 6
    PERFORMANCE_P2 = 7
    LIVE_RESULT = 8
class FastPlayerState:
    """Minimal player state for fast simulation.

    Card zones are plain lists of card ids. The three stage areas and the
    tap flags are small fixed-size NumPy arrays. ``__slots__`` keeps the
    instances free of a per-object ``__dict__``.
    """

    __slots__ = (
        "player_id",
        "hand",
        "main_deck",
        "energy_deck",
        "discard",
        "energy_zone",
        "success_lives",
        "live_zone",
        "stage",
        "stage_energy",
        "tapped_energy",
        "tapped_members",
        "members_played_this_turn",
        "mulligan_selection",
    )

    def __init__(self, player_id: int):
        self.player_id = player_id
        self.hand: List[int] = []
        self.main_deck: List[int] = []
        self.energy_deck: List[int] = []
        self.discard: List[int] = []
        self.energy_zone: List[int] = []
        self.success_lives: List[int] = []
        self.live_zone: List[int] = []
        # One entry per stage area; -1 marks an empty area.
        self.stage: np.ndarray = np.full(3, -1, dtype=np.int32)
        self.stage_energy: List[List[int]] = [[], [], []]
        # Tap flags are indexed in parallel with ``energy_zone`` (50 slots).
        self.tapped_energy: np.ndarray = np.zeros(50, dtype=bool)
        self.tapped_members: np.ndarray = np.zeros(3, dtype=bool)
        self.members_played_this_turn: np.ndarray = np.zeros(3, dtype=bool)
        # Hand indices currently marked to be returned during the mulligan.
        self.mulligan_selection: set = set()

    def untap_all(self):
        """Clear every tap flag on both energy and stage members."""
        self.tapped_energy[:] = False
        self.tapped_members[:] = False

    def count_untapped_energy(self) -> int:
        """Return how many cards in ``energy_zone`` are not tapped.

        Only the flags that correspond to cards actually in the zone are
        counted. The result is a plain ``int`` (not a NumPy scalar), matching
        the annotation.
        """
        zone_size = len(self.energy_zone)
        return zone_size - int(np.sum(self.tapped_energy[:zone_size]))

    def get_total_hearts(self, member_db: Dict) -> np.ndarray:
        """Sum the hearts of all untapped stage members known to ``member_db``.

        Returns:
            An ``int32`` array of length 7. Only the first six entries are
            accumulated from ``member.hearts``; the seventh stays zero.
        """
        total = np.zeros(7, dtype=np.int32)
        for area, card_id in enumerate(self.stage):
            if card_id >= 0 and not self.tapped_members[area] and card_id in member_db:
                total[:6] += member_db[card_id].hearts
        return total

    def get_total_blades(self, member_db: Dict) -> int:
        """Sum ``blades`` over all untapped stage members known to ``member_db``."""
        total = 0
        for area, card_id in enumerate(self.stage):
            if card_id >= 0 and not self.tapped_members[area] and card_id in member_db:
                total += member_db[card_id].blades
        return total
class FastGameState:
    """Minimal game state - MUTABLE (no copies) for speed.

    Actions are integer ids into a boolean mask of length 500:

    * ``0``        - pass / confirm / advance the current phase
    * ``1..180``   - MAIN: play hand card ``(id - 1) // 3`` into stage area
      ``(id - 1) % 3``
    * ``300..359`` - MULLIGAN: toggle hand index ``id - 300`` for return
    * ``400..459`` - LIVE_SET: move hand card ``id - 400`` to the live zone
    """

    _ACTION_SPACE = 500
    # step_inplace() only decodes hand indices 0..59, so the masks never
    # advertise an action for a hand slot beyond that.
    _MAX_HAND_SLOTS = 60

    def __init__(self, member_db: Dict, live_db: Dict):
        self.member_db = member_db
        self.live_db = live_db
        self.players = [FastPlayerState(0), FastPlayerState(1)]
        self.current_player = 0
        self.first_player = 0
        self.phase = Phase.ACTIVE
        self.turn_number = 1
        self.game_over = False
        # -1 until a winner is decided by _check_win().
        self.winner = -1

    def get_legal_actions(self) -> np.ndarray:
        """Returns legal action mask - simplified for speed.

        The mask is all-False once the game is over; otherwise action 0 is
        always legal and phase-specific actions are added for the current
        player.
        """
        mask = np.zeros(self._ACTION_SPACE, dtype=bool)
        if self.game_over:
            return mask

        mask[0] = True
        p = self.players[self.current_player]
        hand_slots = min(len(p.hand), self._MAX_HAND_SLOTS)

        # MULLIGAN: any hand card may be toggled.
        if self.phase in (Phase.MULLIGAN_P1, Phase.MULLIGAN_P2):
            mask[300 : 300 + hand_slots] = True
            return mask

        # MAIN: play an affordable member into an area not yet used this turn.
        if self.phase == Phase.MAIN:
            available_energy = p.count_untapped_energy()
            for i in range(hand_slots):
                card_id = p.hand[i]
                if card_id not in self.member_db:
                    continue
                member = self.member_db[card_id]
                for area in range(3):
                    if p.members_played_this_turn[area]:
                        continue
                    active_cost = member.cost
                    occupant = p.stage[area]
                    # Replacing a known member discounts the cost by that
                    # member's own cost (never below zero).
                    if occupant >= 0 and occupant in self.member_db:
                        active_cost = max(0, active_cost - self.member_db[occupant].cost)
                    if active_cost <= available_energy:
                        mask[1 + i * 3 + area] = True
            return mask

        # LIVE_SET: live cards may be set while fewer than 3 are in the zone.
        if self.phase == Phase.LIVE_SET:
            if len(p.live_zone) < 3:
                for i in range(hand_slots):
                    if p.hand[i] in self.live_db:
                        mask[400 + i] = True
            return mask

        # Every other phase auto-advances: only action 0 is available.
        return mask

    def step_inplace(self, action_id: int):
        """Execute action IN-PLACE (no copy) for speed.

        The action is not re-validated against get_legal_actions(); callers
        are expected to pick from the mask. Ids that the current phase does
        not recognise are ignored.
        """
        p = self.players[self.current_player]
        phase = self.phase

        # MULLIGAN
        if phase in (Phase.MULLIGAN_P1, Phase.MULLIGAN_P2):
            if 300 <= action_id < 360:
                idx = action_id - 300
                if idx < len(p.hand):
                    # Toggle the card's "return to deck" mark.
                    if idx in p.mulligan_selection:
                        p.mulligan_selection.discard(idx)
                    else:
                        p.mulligan_selection.add(idx)
            elif action_id == 0:
                self._execute_mulligan(p)
                if phase == Phase.MULLIGAN_P1:
                    self.phase = Phase.MULLIGAN_P2
                    # Hand over to the *other* player. A hard-coded seat 1
                    # would let player 1 mulligan twice (and player 0 never)
                    # whenever player 1 moves first.
                    self.current_player = 1 - self.first_player
                else:
                    self.phase = Phase.ACTIVE
                    self.current_player = self.first_player
            return

        # Auto-advance phases
        if phase == Phase.ACTIVE:
            p.untap_all()
            p.members_played_this_turn[:] = False
            self.phase = Phase.ENERGY
            return

        if phase == Phase.ENERGY:
            if p.energy_deck:
                p.energy_zone.append(p.energy_deck.pop(0))
            self.phase = Phase.DRAW
            return

        if phase == Phase.DRAW:
            for _ in range(2):
                if p.main_deck:
                    p.hand.append(p.main_deck.pop())
            self.phase = Phase.MAIN
            return

        # MAIN phase actions
        if phase == Phase.MAIN:
            if action_id == 0:
                self.phase = Phase.LIVE_SET
            elif 1 <= action_id <= 180:
                hand_idx, area = divmod(action_id - 1, 3)
                self._play_member(p, hand_idx, area)
            return

        # LIVE_SET
        if phase == Phase.LIVE_SET:
            if action_id == 0:
                self.phase = Phase.PERFORMANCE_P1
                self.current_player = self.first_player
            elif 400 <= action_id < 460:
                idx = action_id - 400
                if idx < len(p.hand) and p.hand[idx] in self.live_db:
                    p.live_zone.append(p.hand.pop(idx))
            return

        # PERFORMANCE phases (simplified - just check and resolve)
        if phase == Phase.PERFORMANCE_P1:
            self._resolve_performance(self.first_player)
            self.phase = Phase.PERFORMANCE_P2
            self.current_player = 1 - self.first_player
            return

        if phase == Phase.PERFORMANCE_P2:
            self._resolve_performance(1 - self.first_player)
            self.phase = Phase.LIVE_RESULT
            return

        if phase == Phase.LIVE_RESULT:
            self._check_win()
            # Next turn: the other player becomes the first player.
            self.turn_number += 1
            self.first_player = 1 - self.first_player
            self.current_player = self.first_player
            self.phase = Phase.ACTIVE
            return

    @staticmethod
    def _execute_mulligan(p: "FastPlayerState"):
        """Return the marked hand cards to the deck, reshuffle and redraw."""
        if p.mulligan_selection:
            # Pop from the highest index down so earlier pops do not shift
            # the positions of cards still to be removed.
            to_return = sorted(p.mulligan_selection, reverse=True)
            for idx in to_return:
                if idx < len(p.hand):
                    p.main_deck.insert(0, p.hand.pop(idx))
            random.shuffle(p.main_deck)
            for _ in range(len(to_return)):
                if p.main_deck:
                    p.hand.append(p.main_deck.pop())
        p.mulligan_selection.clear()

    def _play_member(self, p: "FastPlayerState", hand_idx: int, area: int):
        """Move a member card from hand to a stage area, tapping energy for it."""
        if hand_idx >= len(p.hand):
            return
        card_id = p.hand[hand_idx]
        if card_id not in self.member_db:
            return

        cost = self.member_db[card_id].cost
        occupant = p.stage[area]
        # Baton touch cost reduction
        if occupant >= 0 and occupant in self.member_db:
            cost = max(0, cost - self.member_db[occupant].cost)
            # Send old member to discard (as a plain int, like every other
            # entry of the list).
            p.discard.append(int(occupant))

        # Pay cost by tapping the first untapped energy cards.
        paid = 0
        for i in range(len(p.energy_zone)):
            if paid >= cost:
                break
            if not p.tapped_energy[i]:
                p.tapped_energy[i] = True
                paid += 1

        # Place member
        p.hand.pop(hand_idx)
        p.stage[area] = card_id
        p.members_played_this_turn[area] = True

    def _resolve_performance(self, pid: int):
        """Simplified performance resolution.

        Every card in the player's live zone is checked independently against
        the player's full heart total (hearts are not consumed between
        lives). Passed lives go to ``success_lives``, the rest to ``discard``;
        the live zone is emptied.
        """
        p = self.players[pid]
        if not p.live_zone:
            return

        hearts = p.get_total_hearts(self.member_db)
        passed = []
        failed = []
        for live_id in p.live_zone:
            if live_id not in self.live_db:
                failed.append(live_id)
                continue
            req = self.live_db[live_id].required_hearts
            have = hearts.copy()
            # Check colored hearts
            ok = True
            for c in range(6):
                if have[c] >= req[c]:
                    have[c] -= req[c]
                else:
                    ok = False
                    break
            # Check 'any' hearts against whatever is left over.
            if ok and np.sum(have) >= req[6]:
                passed.append(live_id)
            else:
                failed.append(live_id)

        p.success_lives.extend(passed)
        p.discard.extend(failed)
        p.live_zone = []

    def _check_win(self):
        """End the game when a player has 3+ successful lives (player 0 first)."""
        if len(self.players[0].success_lives) >= 3:
            self.game_over = True
            self.winner = 0
        elif len(self.players[1].success_lives) >= 3:
            self.game_over = True
            self.winner = 1
# ============================================================================
# FAST AGENTS (Simplified for Speed)
# ============================================================================
class FastTrueRandomAgent:
    """Agent that picks uniformly at random among all legal actions."""

    def choose_action(self, state: FastGameState, pid: int) -> int:
        """Return a random legal action id, or 0 when nothing is legal."""
        legal_ids = np.flatnonzero(state.get_legal_actions())
        if legal_ids.size == 0:
            return 0
        return int(np.random.choice(legal_ids))
class FastRandomAgent:
    """Random agent that prefers doing something over passing."""

    def choose_action(self, state: FastGameState, pid: int) -> int:
        """Pick a legal action, favouring non-pass actions 80% of the time.

        Returns 0 when the mask has no legal action at all.
        """
        legal_ids = np.flatnonzero(state.get_legal_actions())
        if legal_ids.size == 0:
            return 0

        # Everything except the pass action (id 0).
        active_ids = legal_ids[legal_ids != 0].tolist()
        if active_ids and random.random() < 0.8:
            return int(random.choice(active_ids))

        # Otherwise fall back to a uniform pick over all legal ids.
        return int(np.random.choice(legal_ids))
class FastSmartAgent:
    """Heuristic agent with a simple rule for each decision phase."""

    def choose_action(self, state: FastGameState, pid: int) -> int:
        """Choose an action for player ``pid`` from the current legal mask.

        * Mulligan: mark every card that is not a member of cost <= 3.
        * Live set: set the first live whose heart requirement is already met.
        * Main: play a random legal member.
        * Otherwise: a random non-pass action if one exists, else pass (0).
        """
        legal_ids = np.flatnonzero(state.get_legal_actions())
        if legal_ids.size == 0:
            return 0

        player = state.players[pid]
        phase = state.phase

        if phase in (Phase.MULLIGAN_P1, Phase.MULLIGAN_P2):
            return self._mulligan_action(state, player)

        if phase == Phase.LIVE_SET:
            return self._live_set_action(state, player, legal_ids)

        if phase == Phase.MAIN:
            play_ids = [a for a in legal_ids if 1 <= a <= 180]
            if play_ids:
                return int(random.choice(play_ids))

        # Default: non-pass if available
        active_ids = [a for a in legal_ids if a != 0]
        if active_ids:
            return int(random.choice(active_ids))
        return 0

    @staticmethod
    def _mulligan_action(state, player):
        """Toggle the first hand card whose mark disagrees with the keep rule."""
        for slot, card_id in enumerate(player.hand):
            # Keep low cost members
            keep = card_id in state.member_db and state.member_db[card_id].cost <= 3
            marked = slot in player.mulligan_selection
            # A kept card must be unmarked and a rejected card marked; when
            # the two flags are equal the mark is wrong and gets toggled.
            if bool(keep) == marked:
                return 300 + slot
        return 0

    @classmethod
    def _live_set_action(cls, state, player, legal_ids):
        """Return the first legal live-set action that is already satisfiable."""
        live_ids = [a for a in legal_ids if 400 <= a < 460]
        if not live_ids:
            return 0
        hearts = player.get_total_hearts(state.member_db)
        for action in live_ids:
            slot = action - 400
            if slot >= len(player.hand):
                continue
            live_id = player.hand[slot]
            if live_id not in state.live_db:
                continue
            if cls._meets_requirement(hearts, state.live_db[live_id].required_hearts):
                return action
        return 0

    @staticmethod
    def _meets_requirement(hearts, req):
        """Check six colored heart counts, then the 'any' count from the surplus."""
        spare = hearts.copy()
        for color in range(6):
            if not spare[color] >= req[color]:
                return False
            spare[color] -= req[color]
        return np.sum(spare) >= req[6]
# ============================================================================
# TOURNAMENT LOGIC
# ============================================================================
class EloRating:
    """Elo ladder that also tracks matches, wins and draws per agent."""

    def __init__(self, k_factor=32):
        self.k_factor = k_factor
        self.ratings = {}
        self.matches = {}
        self.wins = {}
        self.draws = {}

    def init_agent(self, name):
        """Register ``name`` at rating 1000 with zeroed counters (idempotent)."""
        if name in self.ratings:
            return
        self.ratings[name] = 1000
        for counter in (self.matches, self.wins, self.draws):
            counter[name] = 0

    def update(self, agent_a, agent_b, score_a):
        """Record one game and adjust both ratings.

        ``score_a`` is agent A's score: 1 is a win for A, 0 a win for B and
        anything else is counted as a draw. The K-factor is doubled while
        agent A has played at most 20 matches (this game included); agent B's
        match count is not consulted.
        """
        for contestant in (agent_a, agent_b):
            self.init_agent(contestant)
            self.matches[contestant] += 1

        if score_a == 1 or score_a == 0:
            victor = agent_a if score_a == 1 else agent_b
            self.wins[victor] += 1
        else:
            for contestant in (agent_a, agent_b):
                self.draws[contestant] += 1

        rating_a = self.ratings[agent_a]
        rating_b = self.ratings[agent_b]
        expected_a = 1 / (1 + 10 ** ((rating_b - rating_a) / 400))
        expected_b = 1 - expected_a

        if self.matches[agent_a] <= 20:
            step = self.k_factor * 2
        else:
            step = self.k_factor

        self.ratings[agent_a] = rating_a + step * (score_a - expected_a)
        self.ratings[agent_b] = rating_b + step * ((1 - score_a) - expected_b)
def get_free_ram() -> int:
    """Return the free physical memory in MB, or 4096 if it cannot be read.

    The value is obtained from the Windows ``wmic`` tool, whose
    ``FreePhysicalMemory`` figure is divided by 1024 here. On systems
    without ``wmic``, or when its output cannot be parsed, the fixed
    fallback of 4096 is returned instead of raising.
    """
    fallback_mb = 4096
    try:
        output = subprocess.check_output(
            ["wmic", "OS", "get", "FreePhysicalMemory", "/Value"], encoding="utf-8"
        )
    except (OSError, subprocess.SubprocessError, ValueError):
        # Tool missing, non-zero exit status, or undecodable output. Only
        # these expected failures are absorbed, so KeyboardInterrupt and
        # SystemExit still propagate.
        return fallback_mb

    for line in output.splitlines():
        if "FreePhysicalMemory" in line:
            try:
                return int(line.split("=")[1].strip()) // 1024
            except (IndexError, ValueError):
                # Line present but not in the expected "name=value" form.
                return fallback_mb
    return fallback_mb
# Per-process globals for pool workers: assigned by init_worker() and read by
# run_single_game(), so tasks only need to carry agent names and a seed.
G_MEMBER_DB = {}
G_LIVE_DB = {}
G_AGENTS = {}
def init_worker(member_db, live_db):
    """Pool initializer: store the card databases and build the agents.

    Fills the module-level ``G_MEMBER_DB``, ``G_LIVE_DB`` and ``G_AGENTS``
    of the calling process.
    """
    global G_MEMBER_DB, G_LIVE_DB, G_AGENTS
    G_MEMBER_DB, G_LIVE_DB = member_db, live_db

    agent_types = (
        ("TrueRandom", FastTrueRandomAgent),
        ("Random", FastRandomAgent),
        ("Smart", FastSmartAgent),
    )
    G_AGENTS = {label: agent_cls() for label, agent_cls in agent_types}
def run_single_game(args_tuple):
    """Play one seeded game between two named agents.

    Args:
        args_tuple: ``(agent_name_a, agent_name_b, game_seed)``. The names
            are keys of ``G_AGENTS``; the seed is applied to both ``random``
            and ``numpy.random``.

    Returns:
        Agent A's score: 1.0 for a win, 0.0 for a loss, 0.5 for a draw. A
        game that is still running after 2000 steps is decided by the number
        of successful lives.
    """
    agent_name_a, agent_name_b, game_seed = args_tuple
    random.seed(game_seed)
    np.random.seed(game_seed)

    state = FastGameState(G_MEMBER_DB, G_LIVE_DB)
    agent_a = G_AGENTS[agent_name_a]
    agent_b = G_AGENTS[agent_name_b]

    # Setup decks
    m_ids = list(G_MEMBER_DB)
    l_ids = list(G_LIVE_DB)
    for p in state.players:
        p.energy_deck = [2000] * 12
        p.main_deck = [random.choice(m_ids) for _ in range(40)] + [random.choice(l_ids) for _ in range(10)]
        random.shuffle(p.main_deck)
        for _ in range(5):
            if p.main_deck:
                p.hand.append(p.main_deck.pop())
        for _ in range(3):
            if p.energy_deck:
                p.energy_zone.append(p.energy_deck.pop(0))

    first_player = random.randint(0, 1)
    state.first_player = first_player
    state.current_player = first_player
    state.phase = Phase.MULLIGAN_P1

    # Agent A always sits in the seat that moves first. The seat assignment
    # is fixed for the whole game, so it is resolved once, outside the loop.
    agent_by_seat = (agent_a, agent_b) if first_player == 0 else (agent_b, agent_a)

    # Run game
    for _ in range(2000):
        if state.game_over:
            break
        mask = state.get_legal_actions()
        if not np.any(mask):
            # No legal action at all: treat as a draw.
            state.game_over = True
            state.winner = 2
            break
        pid = state.current_player
        action = agent_by_seat[pid].choose_action(state, pid)
        state.step_inplace(action)

    # Result
    if not state.game_over:
        s0 = len(state.players[0].success_lives)
        s1 = len(state.players[1].success_lives)
        winner = 0 if s0 > s1 else (1 if s1 > s0 else 2)
    else:
        winner = state.winner

    if winner == 2:
        return 0.5
    # Translate the winning seat back into agent A's point of view.
    if first_player == 0:
        return 1.0 if winner == 0 else 0.0
    return 0.0 if winner == 0 else 1.0
def main():
    """Run a round-robin tournament between the fast agents and print Elo.

    Command-line options: ``--games_per_pair`` (games for each agent pair),
    ``--seed`` (base seed; game ``k`` uses ``seed + k``) and ``--workers``
    (process count; 0 picks one from free RAM and the core count).
    """
    parser = argparse.ArgumentParser(description="FAST CPU-Only Agent Tournament")
    parser.add_argument("--games_per_pair", type=int, default=10)
    parser.add_argument("--seed", type=int, default=42)
    parser.add_argument("--workers", type=int, default=0)
    args = parser.parse_args()

    random.seed(args.seed)
    np.random.seed(args.seed)

    # Only 3 fast agents for now
    agent_names = ["TrueRandom", "Random", "Smart"]
    elo = EloRating()
    for name in agent_names:
        elo.init_agent(name)

    # Every unordered pair once; the earlier-listed agent is "agent A".
    pairs = [(a, b) for i, a in enumerate(agent_names) for b in agent_names[i + 1 :]]
    # Per-pair [A wins, B wins, draws]. Tallied below but not printed.
    matchups = {pair: [0, 0, 0] for pair in pairs}

    loader = CardDataLoader("data/cards.json")
    # The third value returned by the loader is not used by this script.
    member_db, live_db, _ = loader.load()

    # Workers
    cores = cpu_count()
    ram = get_free_ram()
    max_workers = min(ram // 50, cores - 1)
    num_workers = args.workers if args.workers > 0 else max(1, max_workers)
    print(f"FAST Tournament: {cores} cores, {ram}MB RAM, {num_workers} workers")

    # Build tasks: each game gets its own seed, offset by its position.
    tasks = []
    for a, b in pairs:
        for _ in range(args.games_per_pair):
            tasks.append((a, b, args.seed + len(tasks)))

    print(f"Running {len(tasks)} games...")
    if not tasks:
        # Nothing to schedule (games_per_pair <= 0). Stop before the
        # per-game statistics, which would divide by zero.
        print("No games to run.")
        return

    start = time.time()
    with Pool(num_workers, init_worker, (member_db, live_db)) as pool:
        results = list(pool.imap(run_single_game, tasks, chunksize=8))
    elapsed = time.time() - start

    for result, (a, b, _seed) in zip(results, tasks):
        elo.update(a, b, result)
        if result == 1.0:
            matchups[(a, b)][0] += 1
        elif result == 0.0:
            matchups[(a, b)][1] += 1
        else:
            matchups[(a, b)][2] += 1

    # Guard against a zero reading from a coarse clock.
    games_per_sec = len(tasks) / elapsed if elapsed > 0 else float("inf")
    print(f"\nCompleted in {elapsed:.2f}s ({elapsed / len(tasks) * 1000:.1f}ms/game)")
    print(f"Throughput: {games_per_sec:.1f} games/sec")

    print("\n" + "=" * 50)
    print(f"{'Agent':<12} | {'ELO':<6} | {'Wins':<5} | {'Win%'}")
    print("-" * 50)
    for name in sorted(agent_names, key=lambda x: elo.ratings[x], reverse=True):
        e_score = int(elo.ratings[name])
        w = elo.wins[name]
        m_count = elo.matches[name]
        wr = f"{w / m_count * 100:.1f}%" if m_count > 0 else "N/A"
        print(f"{name:<12} | {e_score:<6} | {w:<5} | {wr}")
    print("=" * 50)
# Script entry point: run the tournament only when executed directly.
if __name__ == "__main__":
    main()