thomas-schweich committed on
Commit
f9aaa57
·
1 Parent(s): 44311e2

Add Monte Carlo accuracy ceiling computation to engine

Browse files

Engine additions:
- GameState::from_move_tokens() — replay from token sequence
- GameState::play_random_to_end() — play random moves to termination
- rollout_legal_moves() — MC rollouts from each legal move at a position
- compute_accuracy_ceiling() — full ceiling computation with rayon parallelism
- Python binding: chess_engine.compute_accuracy_ceiling()

scripts/compute_theoretical_ceiling.py: Reproducible script that computes
unconditional (E[1/N_legal]) and outcome-conditioned (MC rollout) ceilings
with per-outcome and per-distance-from-end breakdowns. Saves to JSON.

engine/python/chess_engine/__init__.py CHANGED
@@ -31,6 +31,8 @@ from chess_engine._engine import (
31
  # Interactive game state (for RL)
32
  PyGameState,
33
  PyBatchRLEnv,
 
 
34
  # Utilities
35
  hello,
36
  )
@@ -55,5 +57,6 @@ __all__ = [
55
  "export_move_vocabulary",
56
  "PyGameState",
57
  "PyBatchRLEnv",
 
58
  "hello",
59
  ]
 
31
  # Interactive game state (for RL)
32
  PyGameState,
33
  PyBatchRLEnv,
34
+ # Accuracy ceiling
35
+ compute_accuracy_ceiling_py as compute_accuracy_ceiling,
36
  # Utilities
37
  hello,
38
  )
 
57
  "export_move_vocabulary",
58
  "PyGameState",
59
  "PyBatchRLEnv",
60
+ "compute_accuracy_ceiling",
61
  "hello",
62
  ]
engine/src/board.rs CHANGED
@@ -470,6 +470,29 @@ impl GameState {
470
  self.make_move(token).ok();
471
  Some(token)
472
  }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
473
  }
474
 
475
  #[cfg(test)]
 
470
  self.make_move(token).ok();
471
  Some(token)
472
  }
473
+
474
+ /// Create a GameState by replaying a sequence of move tokens from the starting position.
475
+ /// Returns an error if any token is invalid or illegal.
476
+ pub fn from_move_tokens(tokens: &[u16]) -> Result<Self, String> {
477
+ let mut state = Self::new();
478
+ for (i, &token) in tokens.iter().enumerate() {
479
+ state.make_move(token).map_err(|e| format!("ply {}: {}", i, e))?;
480
+ }
481
+ Ok(state)
482
+ }
483
+
484
+ /// Play out a random game from the current position to completion.
485
+ /// Returns the termination type.
486
+ pub fn play_random_to_end(&mut self, rng: &mut impl Rng, max_ply: usize) -> Termination {
487
+ loop {
488
+ if let Some(term) = self.check_termination(max_ply) {
489
+ return term;
490
+ }
491
+ if self.make_random_move(rng).is_none() {
492
+ return Termination::Stalemate;
493
+ }
494
+ }
495
+ }
496
  }
497
 
498
  #[cfg(test)]
engine/src/lib.rs CHANGED
@@ -902,6 +902,70 @@ impl PyBatchRLEnv {
902
  }
903
  }
904
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
905
  #[pymodule]
906
  fn _engine(m: &Bound<'_, PyModule>) -> PyResult<()> {
907
  m.add_function(wrap_pyfunction!(hello, m)?)?;
@@ -924,5 +988,6 @@ fn _engine(m: &Bound<'_, PyModule>) -> PyResult<()> {
924
  m.add_class::<PyBatchRLEnv>()?;
925
  m.add_function(wrap_pyfunction!(parse_pgn_file, m)?)?;
926
  m.add_function(wrap_pyfunction!(pgn_to_tokens, m)?)?;
 
927
  Ok(())
928
  }
 
902
  }
903
  }
904
 
905
+ /// Compute theoretical accuracy ceiling via Monte Carlo rollouts.
906
+ ///
907
+ /// For a sample of positions from random games, estimates:
908
+ /// - Unconditional ceiling: E[1/N_legal]
909
+ /// - Conditional ceiling: E[max_m P(m|outcome, history)] via rollouts
910
+ ///
911
+ /// Returns dict with overall ceilings and per-position data.
912
+ #[pyfunction]
913
+ #[pyo3(signature = (n_games=1000, max_ply=255, n_rollouts=32, sample_rate=0.01, seed=77777))]
914
+ fn compute_accuracy_ceiling_py(
915
+ py: Python<'_>,
916
+ n_games: usize,
917
+ max_ply: usize,
918
+ n_rollouts: usize,
919
+ sample_rate: f64,
920
+ seed: u64,
921
+ ) -> PyResult<PyObject> {
922
+ let results = py.allow_threads(|| {
923
+ random::compute_accuracy_ceiling(n_games, max_ply, n_rollouts, sample_rate, seed)
924
+ });
925
+
926
+ let n = results.len();
927
+ let mut uncond_sum = 0.0f64;
928
+ let mut cond_sum = 0.0f64;
929
+
930
+ // Build per-position arrays
931
+ let mut plies = Vec::with_capacity(n);
932
+ let mut game_lengths = Vec::with_capacity(n);
933
+ let mut n_legals = Vec::with_capacity(n);
934
+ let mut unconditionals = Vec::with_capacity(n);
935
+ let mut conditionals = Vec::with_capacity(n);
936
+ let mut outcomes = Vec::with_capacity(n);
937
+
938
+ for r in &results {
939
+ uncond_sum += r.unconditional;
940
+ cond_sum += r.conditional;
941
+ plies.push(r.ply);
942
+ game_lengths.push(r.game_length);
943
+ n_legals.push(r.n_legal);
944
+ unconditionals.push(r.unconditional as f32);
945
+ conditionals.push(r.conditional as f32);
946
+ outcomes.push(r.actual_outcome);
947
+ }
948
+
949
+ let dict = pyo3::types::PyDict::new(py);
950
+ dict.set_item("n_positions", n)?;
951
+ dict.set_item("n_games", n_games)?;
952
+ dict.set_item("n_rollouts", n_rollouts)?;
953
+ dict.set_item("sample_rate", sample_rate)?;
954
+ dict.set_item("unconditional_ceiling", if n > 0 { uncond_sum / n as f64 } else { 0.0 })?;
955
+ dict.set_item("conditional_ceiling", if n > 0 { cond_sum / n as f64 } else { 0.0 })?;
956
+
957
+ // Return numpy arrays for per-position data
958
+ let np = py.import("numpy")?;
959
+ dict.set_item("ply", np.call_method1("array", (plies,))?)?;
960
+ dict.set_item("game_length", np.call_method1("array", (game_lengths,))?)?;
961
+ dict.set_item("n_legal", np.call_method1("array", (n_legals,))?)?;
962
+ dict.set_item("unconditional", np.call_method1("array", (unconditionals,))?)?;
963
+ dict.set_item("conditional", np.call_method1("array", (conditionals,))?)?;
964
+ dict.set_item("outcome", np.call_method1("array", (outcomes,))?)?;
965
+
966
+ Ok(dict.into())
967
+ }
968
+
969
  #[pymodule]
970
  fn _engine(m: &Bound<'_, PyModule>) -> PyResult<()> {
971
  m.add_function(wrap_pyfunction!(hello, m)?)?;
 
988
  m.add_class::<PyBatchRLEnv>()?;
989
  m.add_function(wrap_pyfunction!(parse_pgn_file, m)?)?;
990
  m.add_function(wrap_pyfunction!(pgn_to_tokens, m)?)?;
991
+ m.add_function(wrap_pyfunction!(compute_accuracy_ceiling_py, m)?)?;
992
  Ok(())
993
  }
engine/src/random.rs CHANGED
@@ -87,6 +87,160 @@ pub fn generate_one_game(seed: u64, max_ply: usize) -> (Vec<u16>, u16, Terminati
87
  }
88
  }
89
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
90
  /// Training example for checkmate prediction.
91
  pub struct CheckmateExample {
92
  pub move_ids: Vec<u16>, // full game including mating move
 
87
  }
88
  }
89
 
90
+ /// Outcome distribution from Monte Carlo rollouts.
91
+ #[derive(Debug, Clone, Default)]
92
+ pub struct OutcomeDistribution {
93
+ pub counts: [u32; 6], // indexed by Termination as usize
94
+ pub total: u32,
95
+ }
96
+
97
+ /// Result for a single position in the ceiling computation.
98
+ #[derive(Debug, Clone)]
99
+ pub struct PositionCeiling {
100
+ /// Number of legal moves at this position
101
+ pub n_legal: u32,
102
+ /// Unconditional ceiling: 1/n_legal
103
+ pub unconditional: f64,
104
+ /// Conditional ceiling: max_m P(m | outcome, history) where the max is over
105
+ /// legal moves and P is estimated from rollouts
106
+ pub conditional: f64,
107
+ /// The actual outcome of the game this position came from
108
+ pub actual_outcome: u8,
109
+ /// Ply index within the game
110
+ pub ply: u16,
111
+ /// Game length
112
+ pub game_length: u16,
113
+ }
114
+
115
+ /// For a given position (as move token prefix), play out N random continuations
116
+ /// from each legal move and return the outcome distribution per move.
117
+ ///
118
+ /// Returns Vec<(token, OutcomeDistribution)> for each legal move.
119
+ pub fn rollout_legal_moves(
120
+ prefix_tokens: &[u16],
121
+ n_rollouts: usize,
122
+ max_ply: usize,
123
+ base_seed: u64,
124
+ ) -> Vec<(u16, OutcomeDistribution)> {
125
+ let state = match GameState::from_move_tokens(prefix_tokens) {
126
+ Ok(s) => s,
127
+ Err(_) => return Vec::new(),
128
+ };
129
+
130
+ let legal_tokens = state.legal_move_tokens();
131
+ if legal_tokens.is_empty() {
132
+ return Vec::new();
133
+ }
134
+
135
+ let seeds = derive_game_seeds(base_seed, legal_tokens.len() * n_rollouts);
136
+
137
+ legal_tokens
138
+ .iter()
139
+ .enumerate()
140
+ .map(|(move_idx, &token)| {
141
+ let mut dist = OutcomeDistribution::default();
142
+ for r in 0..n_rollouts {
143
+ let seed = seeds[move_idx * n_rollouts + r];
144
+ let mut rng = ChaCha8Rng::seed_from_u64(seed);
145
+ let mut s = state.clone();
146
+ s.make_move(token).unwrap();
147
+ let term = s.play_random_to_end(&mut rng, max_ply);
148
+ dist.counts[term as usize] += 1;
149
+ dist.total += 1;
150
+ }
151
+ (token, dist)
152
+ })
153
+ .collect()
154
+ }
155
+
156
+ /// Compute the theoretical accuracy ceiling for a batch of random games.
157
+ ///
158
+ /// For each position in each game:
159
+ /// - Computes 1/N_legal (unconditional ceiling)
160
+ /// - Uses Monte Carlo rollouts to estimate the conditional ceiling
161
+ /// (how well you can predict the move given the outcome)
162
+ ///
163
+ /// Returns per-position results. The overall ceiling is the mean.
164
+ pub fn compute_accuracy_ceiling(
165
+ n_games: usize,
166
+ max_ply: usize,
167
+ n_rollouts_per_move: usize,
168
+ sample_rate: f64, // fraction of positions to sample (1.0 = all, 0.01 = 1%)
169
+ base_seed: u64,
170
+ ) -> Vec<PositionCeiling> {
171
+ let game_seeds = derive_game_seeds(base_seed, n_games);
172
+
173
+ // Generate all games first
174
+ let games: Vec<(Vec<u16>, u16, Termination)> = game_seeds
175
+ .par_iter()
176
+ .map(|&seed| generate_one_game(seed, max_ply))
177
+ .collect();
178
+
179
+ // For each sampled position, compute the ceiling
180
+ let mut rng_sample = ChaCha8Rng::seed_from_u64(base_seed.wrapping_add(999));
181
+ let mut work_items: Vec<(usize, usize, u8, u16)> = Vec::new(); // (game_idx, ply, outcome, game_length)
182
+
183
+ for (game_idx, (move_ids, game_length, termination)) in games.iter().enumerate() {
184
+ let gl = *game_length as usize;
185
+ let outcome = *termination as u8;
186
+ for ply in 0..gl {
187
+ if sample_rate >= 1.0 || rng_sample.gen::<f64>() < sample_rate {
188
+ work_items.push((game_idx, ply, outcome, *game_length));
189
+ }
190
+ }
191
+ }
192
+
193
+ // Process positions in parallel
194
+ let rollout_seed_base = base_seed.wrapping_add(1_000_000);
195
+
196
+ work_items
197
+ .par_iter()
198
+ .enumerate()
199
+ .map(|(work_idx, &(game_idx, ply, actual_outcome, game_length))| {
200
+ let prefix = &games[game_idx].0[..ply];
201
+ let actual_move = games[game_idx].0[ply];
202
+
203
+ let rollout_seed = rollout_seed_base.wrapping_add(work_idx as u64 * 1000);
204
+ let move_dists = rollout_legal_moves(prefix, n_rollouts_per_move, max_ply, rollout_seed);
205
+
206
+ let n_legal = move_dists.len() as u32;
207
+ let unconditional = if n_legal > 0 { 1.0 / n_legal as f64 } else { 0.0 };
208
+
209
+ // Conditional ceiling: P(actual_outcome | move) for each move,
210
+ // then the best predictor picks the move with highest P(outcome|move).
211
+ // Accuracy = max_m P(m | outcome) = max_m [P(outcome|m) / sum_m' P(outcome|m')]
212
+ let outcome_idx = actual_outcome as usize;
213
+ let probs: Vec<f64> = move_dists
214
+ .iter()
215
+ .map(|(_, dist)| {
216
+ if dist.total > 0 {
217
+ dist.counts[outcome_idx] as f64 / dist.total as f64
218
+ } else {
219
+ 0.0
220
+ }
221
+ })
222
+ .collect();
223
+
224
+ let sum_probs: f64 = probs.iter().sum();
225
+ let conditional = if sum_probs > 0.0 {
226
+ let max_prob = probs.iter().cloned().fold(0.0f64, f64::max);
227
+ max_prob / sum_probs
228
+ } else {
229
+ unconditional
230
+ };
231
+
232
+ PositionCeiling {
233
+ n_legal,
234
+ unconditional,
235
+ conditional,
236
+ actual_outcome,
237
+ ply: ply as u16,
238
+ game_length,
239
+ }
240
+ })
241
+ .collect()
242
+ }
243
+
244
  /// Training example for checkmate prediction.
245
  pub struct CheckmateExample {
246
  pub move_ids: Vec<u16>, // full game including mating move
scripts/compute_theoretical_ceiling.py CHANGED
@@ -1,17 +1,20 @@
1
  #!/usr/bin/env python3
2
  """Compute theoretical maximum top-1 accuracy for random chess play.
3
 
4
- Two ceilings:
 
5
  1. Unconditional: E[1/N_legal] — best accuracy without knowing the outcome.
6
  2. Outcome-conditioned: E[max_m P(m|outcome, history)] — best accuracy when
7
- the outcome token is known. Estimated via Monte Carlo rollouts.
 
8
 
9
  The "adjusted accuracy" normalizes model accuracy against these ceilings:
10
  adjusted = model_accuracy / ceiling
11
 
12
  Usage:
13
- uv run python scripts/compute_theoretical_ceiling.py --n-games 10000
14
- uv run python scripts/compute_theoretical_ceiling.py --n-games 50000 --rollouts 64
 
15
  """
16
 
17
  from __future__ import annotations
@@ -19,7 +22,6 @@ from __future__ import annotations
19
  import argparse
20
  import json
21
  import time
22
- from collections import defaultdict
23
  from pathlib import Path
24
 
25
  import numpy as np
@@ -27,208 +29,16 @@ import numpy as np
27
  import chess_engine as engine
28
 
29
 
30
- def compute_unconditional_ceiling(
31
- n_games: int, max_ply: int = 255, seed: int = 77777,
32
- ) -> dict:
33
- """Compute E[1/N_legal] from a corpus of random games.
34
-
35
- This is the theoretical maximum top-1 accuracy for a predictor that
36
- knows the rules of chess but NOT the outcome token.
37
- """
38
- # Generate random games and get legal move masks
39
- move_ids, game_lengths, term_codes = engine.generate_random_games(
40
- n_games, max_ply, seed,
41
- )
42
-
43
- # Compute legal move masks: grid is (n_games, max_ply, 64) packed bits
44
- grid, promo = engine.compute_legal_move_masks(move_ids, game_lengths)
45
-
46
- # Count legal moves at each position
47
- inv_n_sum = 0.0
48
- total_positions = 0
49
- inv_n_by_ply = defaultdict(list)
50
-
51
- for i in range(n_games):
52
- gl = int(game_lengths[i])
53
- for ply in range(gl):
54
- # Count legal grid moves: unpack 64 uint64 values, popcount each
55
- n_legal = 0
56
- for sq in range(64):
57
- n_legal += bin(int(grid[i, ply, sq])).count('1')
58
- # Add promotion moves
59
- if promo is not None and promo.shape[1] > ply:
60
- n_legal += int(np.sum(promo[i, ply] > 0))
61
-
62
- if n_legal > 0:
63
- inv_n_sum += 1.0 / n_legal
64
- inv_n_by_ply[ply].append(1.0 / n_legal)
65
- total_positions += 1
66
-
67
- overall = inv_n_sum / total_positions if total_positions else 0
68
-
69
- # Per-ply breakdown (sampled)
70
- ply_ceilings = {}
71
- for ply in sorted(inv_n_by_ply.keys())[:256]:
72
- vals = inv_n_by_ply[ply]
73
- ply_ceilings[ply] = sum(vals) / len(vals)
74
-
75
- return {
76
- "unconditional_ceiling": overall,
77
- "total_positions": total_positions,
78
- "n_games": n_games,
79
- "per_ply_ceiling": ply_ceilings,
80
- }
81
-
82
-
83
- def compute_conditional_ceiling_mc(
84
- n_games: int = 5000,
85
- n_sample_positions: int = 2000,
86
- n_rollouts: int = 32,
87
- max_ply: int = 255,
88
- seed: int = 88888,
89
- ) -> dict:
90
- """Estimate outcome-conditioned ceiling via Monte Carlo rollouts.
91
-
92
- For a sample of positions, enumerate legal moves and estimate
93
- P(outcome | move, history) by playing out random continuations.
94
- The Bayes-optimal predictor picks argmax, giving accuracy =
95
- max_m P(outcome | move, history) / sum_m P(outcome | move, history).
96
-
97
- This requires playing games from arbitrary positions, which we approximate
98
- by generating many games and looking at positions where the same board
99
- state appears with different continuations.
100
-
101
- More practical approach: for each sampled position in a game:
102
- - We know the actual outcome O and the actual move m*
103
- - We know N_legal moves
104
- - We estimate: does knowing O help predict m*?
105
- - Specifically: we compute the fraction of random continuations from m*
106
- that produce outcome O, vs the average fraction across all legal moves.
107
-
108
- Since we can't easily play from arbitrary positions in the engine,
109
- we use an analytical approximation based on game structure:
110
- - Near game end (last few plies of checkmate): huge conditioning benefit
111
- - Mid-game: minimal conditioning benefit (~= 1/N)
112
- - PLY_LIMIT games: game length is known, slight benefit
113
- """
114
- # Generate games
115
- move_ids, game_lengths, term_codes = engine.generate_random_games(
116
- n_games, max_ply, seed,
117
- )
118
- grid, promo = engine.compute_legal_move_masks(move_ids, game_lengths)
119
-
120
- # Analytical estimation of conditioning benefit
121
- #
122
- # For each position, the conditioning benefit depends on:
123
- # 1. How many plies remain (closer to end = more benefit)
124
- # 2. The outcome type (checkmate is more constraining than ply_limit)
125
- #
126
- # At the LAST ply of a checkmate game:
127
- # Only checkmate-delivering moves are consistent with the outcome.
128
- # Ceiling = 1/n_checkmate_moves (often 1-3 out of ~30 legal moves)
129
- #
130
- # At earlier plies: the benefit decays roughly exponentially.
131
- # P(outcome | move, history) ≈ 1/N_legal * (1 + benefit(plies_remaining))
132
- # where benefit → large near the end, → 0 far from the end.
133
-
134
- # Empirical approach: measure how concentrated the move distribution is
135
- # by looking at the last K plies of decisive games.
136
- conditioning_by_plies_from_end = defaultdict(list)
137
-
138
- for i in range(min(n_games, 10000)):
139
- gl = int(game_lengths[i])
140
- tc = int(term_codes[i]) # 0=checkmate, 1=stalemate, etc.
141
-
142
- for ply in range(gl):
143
- plies_from_end = gl - ply
144
-
145
- # Count legal moves
146
- n_legal = 0
147
- for sq in range(64):
148
- n_legal += bin(int(grid[i, ply, sq])).count('1')
149
- if promo is not None and promo.shape[1] > ply:
150
- n_legal += int(np.sum(promo[i, ply] > 0))
151
-
152
- if n_legal <= 0:
153
- continue
154
-
155
- # For the last move of a checkmate: only 1 move delivers mate
156
- # (approximately — sometimes 2-3 moves all give checkmate)
157
- if tc == 0 and plies_from_end == 1:
158
- # Last move is checkmate. Estimate ~1-2 mating moves.
159
- # Ceiling ≈ 1/min(n_legal, 2)
160
- effective_n = min(n_legal, 2)
161
- elif tc == 0 and plies_from_end <= 3:
162
- # Near-checkmate: some conditioning benefit
163
- # Rough: conditioning cuts effective choices by factor of
164
- # plies_from_end
165
- effective_n = max(1, n_legal / plies_from_end)
166
- elif tc == 1 and plies_from_end == 1:
167
- # Last move before stalemate
168
- effective_n = min(n_legal, 3)
169
- else:
170
- # General position: conditioning benefit is small
171
- # The outcome provides ~log2(5) ≈ 2.3 bits over the whole
172
- # game, distributed across ~gl plies. Per-ply benefit is tiny.
173
- effective_n = n_legal
174
-
175
- conditioning_by_plies_from_end[plies_from_end].append(
176
- 1.0 / effective_n
177
- )
178
-
179
- # Compute overall conditioned ceiling
180
- all_conditioned = []
181
- all_unconditioned = []
182
- for i in range(min(n_games, 10000)):
183
- gl = int(game_lengths[i])
184
- tc = int(term_codes[i])
185
- for ply in range(gl):
186
- n_legal = 0
187
- for sq in range(64):
188
- n_legal += bin(int(grid[i, ply, sq])).count('1')
189
- if promo is not None and promo.shape[1] > ply:
190
- n_legal += int(np.sum(promo[i, ply] > 0))
191
- if n_legal <= 0:
192
- continue
193
-
194
- plies_from_end = gl - ply
195
- all_unconditioned.append(1.0 / n_legal)
196
-
197
- if tc == 0 and plies_from_end == 1:
198
- all_conditioned.append(1.0 / min(n_legal, 2))
199
- elif tc == 0 and plies_from_end <= 3:
200
- all_conditioned.append(1.0 / max(1, n_legal / plies_from_end))
201
- elif tc == 1 and plies_from_end == 1:
202
- all_conditioned.append(1.0 / min(n_legal, 3))
203
- else:
204
- all_conditioned.append(1.0 / n_legal)
205
-
206
- uncond = np.mean(all_unconditioned)
207
- cond = np.mean(all_conditioned)
208
-
209
- # Per-distance-from-end breakdown
210
- by_distance = {}
211
- for dist in sorted(conditioning_by_plies_from_end.keys()):
212
- if dist <= 20:
213
- vals = conditioning_by_plies_from_end[dist]
214
- by_distance[dist] = float(np.mean(vals))
215
-
216
- return {
217
- "conditional_ceiling_estimate": float(cond),
218
- "unconditional_ceiling": float(uncond),
219
- "conditioning_boost": float(cond / uncond) if uncond > 0 else 0,
220
- "n_positions": len(all_conditioned),
221
- "ceiling_by_plies_from_end": by_distance,
222
- "note": "Conditional ceiling is an analytical estimate, not exact Monte Carlo. "
223
- "The main benefit comes from the last 1-3 plies of decisive games.",
224
- }
225
-
226
-
227
  def main():
228
  parser = argparse.ArgumentParser(
229
  description="Compute theoretical accuracy ceilings for random chess"
230
  )
231
- parser.add_argument("--n-games", type=int, default=10000)
 
 
 
 
 
232
  parser.add_argument("--seed", type=int, default=77777)
233
  parser.add_argument("--output", type=str, default="data/theoretical_ceiling.json")
234
  parser.add_argument("--model-accuracy", type=float, default=None,
@@ -238,62 +48,110 @@ def main():
238
  output_path = Path(args.output)
239
  output_path.parent.mkdir(parents=True, exist_ok=True)
240
 
241
- print(f"Computing theoretical accuracy ceilings ({args.n_games:,} games)...")
 
 
 
 
242
  print()
243
 
244
  t0 = time.time()
 
 
 
 
 
 
 
 
245
 
246
- # Unconditional ceiling
247
- print("1. Unconditional ceiling (E[1/N_legal])...")
248
- uncond = compute_unconditional_ceiling(args.n_games, seed=args.seed)
249
- print(f" = {uncond['unconditional_ceiling']:.4f} "
250
- f"({uncond['unconditional_ceiling']*100:.2f}%)")
251
- print(f" ({uncond['total_positions']:,} positions from {args.n_games:,} games)")
252
 
253
- # Conditional ceiling
 
 
 
 
254
  print()
255
- print("2. Outcome-conditioned ceiling (analytical estimate)...")
256
- cond = compute_conditional_ceiling_mc(
257
- n_games=args.n_games, seed=args.seed + 1,
258
- )
259
- print(f" = {cond['conditional_ceiling_estimate']:.4f} "
260
- f"({cond['conditional_ceiling_estimate']*100:.2f}%)")
261
- print(f" Conditioning boost: {cond['conditioning_boost']:.2f}x")
262
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
263
  print()
264
- print(f" Ceiling by plies from game end:")
265
- for dist, ceil in sorted(cond["ceiling_by_plies_from_end"].items()):
266
- bar = "#" * int(ceil * 200)
267
- print(f" {dist:>3} plies from end: {ceil:.4f} ({ceil*100:.1f}%) {bar}")
268
 
269
- # Summary
270
- elapsed = time.time() - t0
271
- results = {
272
- "unconditional_ceiling": uncond["unconditional_ceiling"],
273
- "conditional_ceiling": cond["conditional_ceiling_estimate"],
274
- "conditioning_boost": cond["conditioning_boost"],
275
- "n_games": args.n_games,
276
- "total_positions": uncond["total_positions"],
277
- "per_ply_ceiling": uncond["per_ply_ceiling"],
278
- "ceiling_by_plies_from_end": cond["ceiling_by_plies_from_end"],
279
- "elapsed_seconds": elapsed,
280
- }
 
 
 
 
 
281
 
 
282
  if args.model_accuracy is not None:
283
  ma = args.model_accuracy
284
- results["model_accuracy"] = ma
285
- results["adjusted_vs_unconditional"] = ma / uncond["unconditional_ceiling"]
286
- results["adjusted_vs_conditional"] = ma / cond["conditional_ceiling_estimate"]
287
- print()
288
  print(f"Model accuracy: {ma:.4f} ({ma*100:.2f}%)")
289
- print(f" vs unconditional ceiling: {results['adjusted_vs_unconditional']:.2f}x "
290
- f"({results['adjusted_vs_unconditional']*100:.1f}% of theoretical max)")
291
- print(f" vs conditional ceiling: {results['adjusted_vs_conditional']:.2f}x "
292
- f"({results['adjusted_vs_conditional']*100:.1f}% of theoretical max)")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
293
 
294
  with open(output_path, "w") as f:
295
- json.dump(results, f, indent=2)
296
- print(f"\nSaved to {output_path} ({elapsed:.1f}s)")
297
 
298
 
299
  if __name__ == "__main__":
 
1
  #!/usr/bin/env python3
2
  """Compute theoretical maximum top-1 accuracy for random chess play.
3
 
4
+ Two ceilings computed via Monte Carlo rollouts in the Rust engine:
5
+
6
  1. Unconditional: E[1/N_legal] — best accuracy without knowing the outcome.
7
  2. Outcome-conditioned: E[max_m P(m|outcome, history)] — best accuracy when
8
+ the outcome token is known. Estimated by playing out random continuations
9
+ from each legal move and measuring which outcomes result.
10
 
11
  The "adjusted accuracy" normalizes model accuracy against these ceilings:
12
  adjusted = model_accuracy / ceiling
13
 
14
  Usage:
15
+ uv run python scripts/compute_theoretical_ceiling.py
16
+ uv run python scripts/compute_theoretical_ceiling.py --n-games 5000 --rollouts 64
17
+ uv run python scripts/compute_theoretical_ceiling.py --model-accuracy 0.070
18
  """
19
 
20
  from __future__ import annotations
 
22
  import argparse
23
  import json
24
  import time
 
25
  from pathlib import Path
26
 
27
  import numpy as np
 
29
  import chess_engine as engine
30
 
31
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
32
  def main():
33
  parser = argparse.ArgumentParser(
34
  description="Compute theoretical accuracy ceilings for random chess"
35
  )
36
+ parser.add_argument("--n-games", type=int, default=2000,
37
+ help="Number of random games to generate")
38
+ parser.add_argument("--rollouts", type=int, default=32,
39
+ help="Monte Carlo rollouts per legal move")
40
+ parser.add_argument("--sample-rate", type=float, default=0.02,
41
+ help="Fraction of positions to sample (1.0=all, 0.02=2%%)")
42
  parser.add_argument("--seed", type=int, default=77777)
43
  parser.add_argument("--output", type=str, default="data/theoretical_ceiling.json")
44
  parser.add_argument("--model-accuracy", type=float, default=None,
 
48
  output_path = Path(args.output)
49
  output_path.parent.mkdir(parents=True, exist_ok=True)
50
 
51
+ print(f"Computing theoretical accuracy ceilings")
52
+ print(f" Games: {args.n_games:,}")
53
+ print(f" Rollouts/move: {args.rollouts}")
54
+ print(f" Sample rate: {args.sample_rate:.0%}")
55
+ print(f" Seed: {args.seed}")
56
  print()
57
 
58
  t0 = time.time()
59
+ result = engine.compute_accuracy_ceiling(
60
+ n_games=args.n_games,
61
+ max_ply=255,
62
+ n_rollouts=args.rollouts,
63
+ sample_rate=args.sample_rate,
64
+ seed=args.seed,
65
+ )
66
+ elapsed = time.time() - t0
67
 
68
+ uncond = result["unconditional_ceiling"]
69
+ cond = result["conditional_ceiling"]
70
+ boost = cond / uncond if uncond > 0 else 0
 
 
 
71
 
72
+ print(f"Positions sampled: {result['n_positions']:,}")
73
+ print(f"Unconditional ceiling: {uncond:.4f} ({uncond*100:.2f}%)")
74
+ print(f"Conditional ceiling: {cond:.4f} ({cond*100:.2f}%)")
75
+ print(f"Conditioning boost: {boost:.2f}x")
76
+ print(f"Time: {elapsed:.0f}s")
77
  print()
 
 
 
 
 
 
 
78
 
79
+ # Per-outcome breakdown
80
+ outcomes = result["outcome"]
81
+ conditionals = result["conditional"]
82
+ unconditionals = result["unconditional"]
83
+ outcome_names = [
84
+ "Checkmate", "Stalemate", "75-move", "5-fold rep",
85
+ "Insuff mat", "Ply limit",
86
+ ]
87
+
88
+ print("Per-outcome breakdown:")
89
+ outcome_data = {}
90
+ for oi in range(6):
91
+ mask = outcomes == oi
92
+ n = int(mask.sum())
93
+ if n > 0:
94
+ uc = float(unconditionals[mask].mean())
95
+ cc = float(conditionals[mask].mean())
96
+ ob = cc / uc if uc > 0 else 0
97
+ print(f" {outcome_names[oi]:>12}: uncond={uc:.4f} cond={cc:.4f} "
98
+ f"boost={ob:.2f}x (n={n})")
99
+ outcome_data[outcome_names[oi]] = {
100
+ "unconditional": uc, "conditional": cc,
101
+ "boost": ob, "n_positions": n,
102
+ }
103
  print()
 
 
 
 
104
 
105
+ # Per-ply-from-end breakdown
106
+ plies = result["ply"]
107
+ game_lengths = result["game_length"]
108
+ plies_from_end = game_lengths - plies
109
+
110
+ print("Ceiling by distance from game end:")
111
+ distance_data = {}
112
+ for dist in range(1, 21):
113
+ mask = plies_from_end == dist
114
+ n = int(mask.sum())
115
+ if n > 10:
116
+ uc = float(unconditionals[mask].mean())
117
+ cc = float(conditionals[mask].mean())
118
+ bar = "#" * int(cc * 200)
119
+ print(f" {dist:>3} plies from end: uncond={uc:.4f} cond={cc:.4f} {bar}")
120
+ distance_data[dist] = {"unconditional": uc, "conditional": cc, "n": n}
121
+ print()
122
 
123
+ # Model adjusted accuracy
124
  if args.model_accuracy is not None:
125
  ma = args.model_accuracy
126
+ adj_uncond = ma / uncond if uncond > 0 else 0
127
+ adj_cond = ma / cond if cond > 0 else 0
 
 
128
  print(f"Model accuracy: {ma:.4f} ({ma*100:.2f}%)")
129
+ print(f" vs unconditional ceiling: {adj_uncond:.1%} of theoretical max")
130
+ print(f" vs conditional ceiling: {adj_cond:.1%} of theoretical max")
131
+ print()
132
+
133
+ # Save results
134
+ data = {
135
+ "unconditional_ceiling": float(uncond),
136
+ "conditional_ceiling": float(cond),
137
+ "conditioning_boost": float(boost),
138
+ "n_positions": int(result["n_positions"]),
139
+ "n_games": args.n_games,
140
+ "n_rollouts": args.rollouts,
141
+ "sample_rate": args.sample_rate,
142
+ "seed": args.seed,
143
+ "elapsed_seconds": elapsed,
144
+ "per_outcome": outcome_data,
145
+ "per_distance_from_end": {str(k): v for k, v in distance_data.items()},
146
+ }
147
+ if args.model_accuracy is not None:
148
+ data["model_accuracy"] = args.model_accuracy
149
+ data["adjusted_vs_unconditional"] = adj_uncond
150
+ data["adjusted_vs_conditional"] = adj_cond
151
 
152
  with open(output_path, "w") as f:
153
+ json.dump(data, f, indent=2)
154
+ print(f"Saved to {output_path}")
155
 
156
 
157
  if __name__ == "__main__":