Chess_Analyzer / analyzer.py
Fu01978's picture
Rename analyzer (14).py to analyzer.py
f1b847a verified
"""
Chess game analyzer using Stockfish + Expected Points model.
Classification hierarchy (highest priority first):
Book → Brilliant → Great → Miss → Best → Excellent → Good → Inaccuracy → Mistake → Blunder
"""
import chess
import chess.pgn
import chess.engine
import io
import math
import json
import random
import shutil
import os
def _find_stockfish() -> str:
"""Locate stockfish binary on common paths."""
# Try PATH first
found = shutil.which("stockfish")
if found:
return found
for path in [
"/usr/games/stockfish",
"/usr/bin/stockfish",
"/usr/local/bin/stockfish",
"/opt/homebrew/bin/stockfish",
"/opt/homebrew/games/stockfish",
]:
if os.path.isfile(path) and os.access(path, os.X_OK):
return path
return "stockfish" # Let it fail with a clear error
# ── ECO opening book ───────────────────────────────────────────────────────────
def _load_eco_db() -> dict:
"""
Load ECO JSON files (ecoA-D.json) from an eco/ folder next to this file.
Keys are FEN strings; values contain name, eco code, and moves.
Returns an empty dict gracefully if the folder or files are missing.
"""
db: dict = {}
eco_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "eco")
if not os.path.isdir(eco_dir):
return db
for letter in "ABCD":
path = os.path.join(eco_dir, f"eco{letter}.json")
if not os.path.isfile(path):
continue
try:
with open(path, encoding="utf-8") as f:
data = json.load(f)
db.update(data)
except Exception:
pass
return db
ECO_DB: dict = _load_eco_db()
def lookup_book(fen_after: str) -> dict | None:
"""Return opening entry if position is in ECO db, else None."""
return ECO_DB.get(fen_after)
# ── Expected Points ────────────────────────────────────────────────────────────
def cp_to_ep(cp: float, k: float = 0.4) -> float:
"""Convert centipawn score to Expected Points from White's perspective."""
return 1.0 / (1.0 + math.exp(-k * (cp / 100.0)))
def score_to_ep_white(score: chess.engine.Score) -> float:
"""Convert a PovScore (already in White's POV) to EP."""
if score.is_mate():
m = score.mate()
return 1.0 if (m is not None and m > 0) else 0.0
cp = score.score()
return cp_to_ep(cp if cp is not None else 0)
def get_ep_white(info: dict) -> float:
return score_to_ep_white(info["score"].white())
# ── Sacrifice detection ────────────────────────────────────────────────────────
PIECE_VALUES = {
chess.PAWN: 1, chess.KNIGHT: 3, chess.BISHOP: 3,
chess.ROOK: 5, chess.QUEEN: 9, chess.KING: 99
}
def _see(board: chess.Board, square: int, attacker: chess.Color, target_val: int) -> int:
"""
Recursive Static Exchange Evaluation.
Returns the net material gain for `attacker` if they initiate a capture
sequence on `square`, where `target_val` is the value of the piece there.
Positive = attacker profits, 0 = even trade, negative = attacker loses.
"""
attackers = board.attackers(attacker, square)
if not attackers:
return 0
# Always capture with the least valuable piece first
best_sq = min(
(sq for sq in attackers if board.piece_at(sq)),
key=lambda sq: PIECE_VALUES.get(board.piece_at(sq).piece_type, 99),
default=None,
)
if best_sq is None:
return 0
capturing_piece = board.piece_at(best_sq)
capturing_val = PIECE_VALUES.get(capturing_piece.piece_type, 0)
# Simulate the capture by manually updating a board copy
b2 = board.copy()
b2.remove_piece_at(best_sq)
b2.set_piece_at(square, chess.Piece(capturing_piece.piece_type, attacker))
# The other side may now recapture — they'll only do so if it's profitable
recapture_gain = _see(b2, square, not attacker, capturing_val)
# Our gain: captured target_val, but may lose capturing_val if opponent recaptures
return target_val - max(0, recapture_gain)
def _is_hanging(board: chess.Board, square: int, our_color: chess.Color) -> bool:
"""
Returns True if the piece on `square` is en prise for the opponent —
i.e. the opponent comes out ahead if they capture it (SEE > 0).
"""
piece = board.piece_at(square)
if piece is None or piece.color != our_color:
return False
piece_val = PIECE_VALUES.get(piece.piece_type, 0)
if piece_val < 3:
return False # Ignore pawns and kings
opponent = not our_color
return _see(board, square, opponent, piece_val) > 0
def check_sacrifice(board: chess.Board, move: chess.Move) -> bool:
"""
Returns True if this move involves a genuine piece sacrifice — either:
(a) The moved piece itself lands en prise (SEE favours the opponent), OR
(b) A friendly piece is newly left hanging after the move — e.g. a queen
that was shielded by the moving piece (pin scenario) is now exposed.
A knight defended by a pawn attacked by a bishop is NOT a sacrifice:
Bxn, pxB = even trade → SEE = 0 → not flagged.
"""
our_color = board.turn
piece = board.piece_at(move.from_square)
if piece is None:
return False
piece_val = PIECE_VALUES.get(piece.piece_type, 0)
# ── (a) Moved-piece sacrifice ──────────────────────────────────────────
if piece_val >= 3:
# What we already captured by making this move (0 if not a capture)
captured = board.piece_at(move.to_square)
captured_val = PIECE_VALUES.get(captured.piece_type, 0) if captured else 0
board_after = board.copy()
board_after.push(move)
opponent = board_after.turn
# The opponent's net gain = what they take from us minus what we already took.
# e.g. Qxd8 Rxd8: opponent SEE=9, but we already took 9 → net gain=0 → not a sac.
# e.g. Bh6 gxh6: opponent SEE=3, we took nothing → net gain=3 → is a sac.
net_gain_for_opponent = _see(board_after, move.to_square, opponent, piece_val) - captured_val
if net_gain_for_opponent > 0:
return True
# ── (b) Piece-left-behind sacrifice (e.g. walking out of a pin) ───────
# Collect which friendly pieces were already hanging BEFORE the move,
# so we only flag pieces that are *newly* exposed.
hanging_before: set[int] = set()
for sq in board.pieces(chess.QUEEN, our_color) | \
board.pieces(chess.ROOK, our_color) | \
board.pieces(chess.BISHOP, our_color) | \
board.pieces(chess.KNIGHT, our_color):
if sq == move.from_square:
continue # This is the piece we're moving — skip
if _is_hanging(board, sq, our_color):
hanging_before.add(sq)
board_after = board.copy()
board_after.push(move)
for sq in board_after.pieces(chess.QUEEN, our_color) | \
board_after.pieces(chess.ROOK, our_color) | \
board_after.pieces(chess.BISHOP, our_color) | \
board_after.pieces(chess.KNIGHT, our_color):
if sq == move.to_square:
continue # Already handled in (a)
if sq not in hanging_before and _is_hanging(board_after, sq, our_color):
return True # This piece is newly hanging — it's the real sacrifice
# ── (c) Deliberately ignored threat ────────────────────────────────────
# Our piece was already hanging before this move (opponent attacked it last
# turn), and our move neither moved it nor added a defender — we intentionally
# left it en prise to play something more important elsewhere.
# _is_hanging uses SEE so a piece that is now defended returns False.
opponent = board_after.turn
for sq in hanging_before:
if _is_hanging(board_after, sq, our_color):
return True # Still hanging after our move — deliberate sacrifice
return False
# ── Move classification ────────────────────────────────────────────────────────
COMMENTS = {
"Book": [
"A well-known opening move.",
"Theory — this position has been played thousands of times.",
"A mainline opening move.",
],
"Brilliant": [
"A stunning sacrifice that seizes a lasting advantage.",
"Extraordinary piece sacrifice — the position rewards bold play.",
"An unexpected sacrifice with deep positional compensation.",
],
"Great": [
"A critical move that shifts the balance of the game.",
"Finding this move takes real precision — it dramatically improves the position.",
"The only move that keeps things in hand. Well found.",
],
"Miss": [
"A missed opportunity — the opponent's error went unpunished.",
"After the opponent's mistake, this fails to press the advantage.",
"The position offered a winning shot, but it slipped away here.",
],
"Best": [
"The engine's top choice. Precise and principled.",
"Perfect play — the ideal move in this position.",
"Exactly what the position demanded.",
],
"Excellent": [
"A very strong move that keeps the position under control.",
"Nearly best — a fine practical choice.",
"A sharp, high-quality response.",
],
"Good": [
"A solid move that maintains the balance.",
"Reasonable play — the position stays roughly equal.",
"A sensible continuation with no serious drawbacks.",
],
"Inaccuracy": [
"A slight imprecision — a better option was available.",
"Not a serious error, but leaves something on the table.",
"The position allowed for more here.",
],
"Mistake": [
"An error that hands the opponent a meaningful advantage.",
"This weakens the position more than it needed to.",
"A significant misstep — the opponent can now press hard.",
],
"Blunder": [
"A serious blunder that could cost the game.",
"A major error — this dramatically changes the evaluation.",
"Devastating. The position collapses after this move.",
],
}
def _net_exchange(board: 'chess.Board', move: chess.Move, our_color: chess.Color) -> int:
"""
Net material result of this move for our_color, accounting for the full
exchange sequence via SEE. Unlike a raw board snapshot, this correctly
handles trades and recaptures.
Returns:
positive — we come out ahead in material (e.g. win-back tactical combo)
zero — even exchange (true trade)
negative — we lose material net (positional / speculative sacrifice)
Brilliant gate uses this to distinguish:
gain <= 0 → real sacrifice for compensation → Brilliant candidate
1 <= gain <= 2 → "cheap" win of a pawn or two → downgrade to Great
gain >= 3 → significant material-winning combo → Brilliant candidate
"""
piece = board.piece_at(move.from_square)
if piece is None:
return 0
piece_val = PIECE_VALUES.get(piece.piece_type, 0)
# What we capture immediately, if anything
captured = board.piece_at(move.to_square)
captured_val = PIECE_VALUES.get(captured.piece_type, 0) if captured else 0
board_after = board.copy()
board_after.push(move)
opponent = not our_color
# How much does opponent gain by recapturing our piece on to_square?
opp_gain_dest = max(0, _see(board_after, move.to_square, opponent, piece_val))
# Net from the primary exchange on the destination square
dest_net = captured_val - opp_gain_dest
# Also account for any piece newly left hanging (path-b sacrifice —
# e.g. walking a knight out of a pin, exposing the queen).
# We already know check_sacrifice flagged this, so find the newly hanging
# piece and subtract its SEE loss.
hanging_before: set = set()
for sq in (board.pieces(chess.QUEEN, our_color) |
board.pieces(chess.ROOK, our_color) |
board.pieces(chess.BISHOP, our_color) |
board.pieces(chess.KNIGHT, our_color)):
if sq == move.from_square:
continue
if _is_hanging(board, sq, our_color):
hanging_before.add(sq)
newly_hanging_loss = 0
for sq in (board_after.pieces(chess.QUEEN, our_color) |
board_after.pieces(chess.ROOK, our_color) |
board_after.pieces(chess.BISHOP, our_color) |
board_after.pieces(chess.KNIGHT, our_color)):
if sq == move.to_square:
continue
if sq not in hanging_before and _is_hanging(board_after, sq, our_color):
p = board_after.piece_at(sq)
pv = PIECE_VALUES.get(p.piece_type, 0)
newly_hanging_loss += max(0, _see(board_after, sq, opponent, pv))
return dest_net - newly_hanging_loss
def get_comment(classification: str) -> str:
return random.choice(COMMENTS.get(classification, ["—"]))
def classify_move(
player_ep_before: float,
player_ep_after: float,
player_ep_second_best: float | None,
is_best: bool,
move_rank: int | None,
is_sacrifice: bool,
opponent_blunder_swing: float | None,
ep_before_opponent_blunder: float | None,
board_before: 'chess.Board | None' = None,
board_after: 'chess.Board | None' = None,
our_color: 'chess.Color | None' = None,
move: 'chess.Move | None' = None,
) -> str:
ep_loss = player_ep_before - player_ep_after
# ── Brilliant ──────────────────────────────────────────────────────────────
# Best move + piece sacrifice + lands in a good position + significant material swing.
# Two paths to Brilliant:
# 1. Normal: wasn't already near-completely-winning (< 0.97).
# 2. Was already winning but the sacrifice dramatically improves the position
# (ep jump >= 0.15) -- e.g. forcing bishop sac into a mating attack.
# Downgrade to Great if the material net gain is <= 2 pawns: a tiny material
# pickup doesn't justify Brilliant even if every other condition is met.
if is_best and is_sacrifice:
ep_jump = player_ep_after - player_ep_before
already_winning = player_ep_before >= 0.97
lands_well = player_ep_after >= 0.45 # equal or better after sac is fine
if lands_well and (not already_winning or ep_jump >= 0.15):
# Material gate:
# net < 0 → real material sacrifice (gives up more than gains) → Brilliant
# net 0..2 → equal trade or trivial pickup (e.g. queen swap, +1 pawn) → Great
# net >= 3 → significant material-winning combination → Brilliant
# Trades (net=0) and small wins are explicitly excluded from Brilliant.
if board_before is not None and our_color is not None and move is not None:
net = _net_exchange(board_before, move, our_color)
if 0 <= net <= 2:
return "Great"
return "Brilliant"
# ── Great ──────────────────────────────────────────────────────────────────
if is_best:
was_losing = player_ep_before < 0.45
is_equal = 0.44 <= player_ep_after <= 0.62
is_winning = player_ep_after > 0.55
major_swing = was_losing and (is_equal or is_winning)
only_good = (
player_ep_second_best is not None
and player_ep_before - player_ep_second_best > 0.15
)
if major_swing or only_good:
return "Great"
# ── Miss ───────────────────────────────────────────────────────────────────
if (
opponent_blunder_swing is not None
and opponent_blunder_swing > 0.10
and ep_before_opponent_blunder is not None
and player_ep_after <= ep_before_opponent_blunder + 0.02
):
return "Miss"
# ── EP table ───────────────────────────────────────────────────────────────
ep_loss = max(0.0, ep_loss) # cap negatives (position improved beyond eval)
# is_best is checked first — float rounding can produce a tiny non-zero
# ep_loss even for the engine's top move, so we must not let the threshold
# bands override a confirmed best-move result.
if is_best:
return "Best"
if ep_loss <= 0.02:
return "Excellent"
if ep_loss <= 0.05:
return "Good"
if ep_loss <= 0.10:
return "Inaccuracy"
if ep_loss <= 0.20:
return "Mistake"
return "Blunder"
# ── Continuation formatting ────────────────────────────────────────────────────
def format_continuation(moves_san: list[str], move_number: int, player_color: str) -> str:
"""Format a list of SAN continuation moves with proper move numbers.
After white plays move N → continuation starts with black at N, then white at N+1
After black plays move N → continuation starts with white at N+1
"""
if not moves_san:
return ""
parts: list[str] = []
if player_color == "white":
# Next is black's response at the same move number
next_is_white = False
num = move_number
else:
# Next is white's move, which is move_number + 1
next_is_white = True
num = move_number + 1
for i, san in enumerate(moves_san):
if next_is_white:
parts.append(f"{num}.")
parts.append(san)
next_is_white = False
else:
if i == 0:
# First black continuation move: needs the "N..." prefix
parts.append(f"{num}...")
parts.append(san)
next_is_white = True
num += 1 # After black plays, white's next turn is N+1
return " ".join(parts)
# ── Main analysis entry point ──────────────────────────────────────────────────
def analyze_game(pgn_text: str, depth: int, progress_cb):
"""
Analyze a full PGN game. Calls progress_cb({type, message, progress, [data]}).
"""
pgn_io = io.StringIO(pgn_text)
game = chess.pgn.read_game(pgn_io)
if game is None:
progress_cb({"type": "error", "message": "Could not parse PGN. Please check the notation."})
return
white = game.headers.get("White", "White")
if not white or white == "?": white = "White"
black = game.headers.get("Black", "Black")
if not black or black == "?": black = "Black"
moves_list = list(game.mainline_moves())
total = len(moves_list)
if total == 0:
progress_cb({"type": "error", "message": "The PGN contains no moves."})
return
progress_cb({"type": "progress", "message": "Initializing Stockfish engine…", "progress": 0.0})
sf_path = _find_stockfish()
try:
engine = chess.engine.SimpleEngine.popen_uci(sf_path)
except FileNotFoundError:
progress_cb({"type": "error",
"message": f"Stockfish not found (tried '{sf_path}'). Install Stockfish and ensure it is on your PATH."})
return
try:
# Build board snapshots: boards[i] is the state BEFORE move i
boards: list[chess.Board] = []
b = game.board()
for mv in moves_list:
boards.append(b.copy())
b.push(mv)
boards.append(b.copy()) # final position after all moves
# Analyse every position with MultiPV=5
multipv: list[list[dict]] = []
ep_white: list[float] = []
for i, board_snap in enumerate(boards):
if i < total:
msg = f"Analyzing move {i + 1} of {total}…"
prog = i / total * 0.90
else:
msg = "Finalizing analysis…"
prog = 0.93
progress_cb({"type": "progress", "message": msg, "progress": prog})
infos = engine.analyse(board_snap, chess.engine.Limit(depth=depth), multipv=5)
multipv.append(infos)
ep_white.append(get_ep_white(infos[0]))
# Classify each move
progress_cb({"type": "progress", "message": "Classifying moves…", "progress": 0.96})
results = []
for i, move in enumerate(moves_list):
board_snap = boards[i]
turn = board_snap.turn
color = "white" if turn == chess.WHITE else "black"
move_san = board_snap.san(move)
move_uci = move.uci()
fen_before = board_snap.fen()
fen_after = boards[i + 1].fen()
ep_w_before = ep_white[i]
ep_w_after = ep_white[i + 1]
if turn == chess.WHITE:
player_ep_before = ep_w_before
player_ep_after = ep_w_after
else:
player_ep_before = 1.0 - ep_w_before
player_ep_after = 1.0 - ep_w_after
# Best move
best_move_obj = multipv[i][0]["pv"][0]
best_move_san = board_snap.san(best_move_obj)
best_move_uci = best_move_obj.uci()
is_best = (move.uci() == best_move_uci)
# Second-best EP (for "only good move" detection)
player_ep_second_best: float | None = None
if len(multipv[i]) > 1:
ep_sb_w = get_ep_white(multipv[i][1])
player_ep_second_best = ep_sb_w if turn == chess.WHITE else 1.0 - ep_sb_w
# Rank among top-5
move_rank: int | None = None
for rank, info in enumerate(multipv[i], 1):
if info["pv"][0].uci() == move.uci():
move_rank = rank
break
# Sacrifice?
is_sacrifice = check_sacrifice(board_snap, move)
# Miss detection
opponent_blunder_swing: float | None = None
ep_before_opponent_blunder: float | None = None
if i >= 1:
ep_w_prev = ep_white[i - 1]
if turn == chess.WHITE:
swing = ep_w_before - ep_w_prev
pre_blunder = ep_w_prev
else:
swing = (1.0 - ep_w_before) - (1.0 - ep_w_prev)
pre_blunder = 1.0 - ep_w_prev
if swing > 0.10:
opponent_blunder_swing = swing
ep_before_opponent_blunder = pre_blunder
# Book move check — if the resulting position is in the ECO db,
# classify immediately regardless of EP; opening theory trumps evaluation.
book_entry = lookup_book(fen_after)
if book_entry:
classification = "Book"
opening_name = book_entry.get("name", "")
opening_eco = book_entry.get("eco", "")
else:
opening_name = ""
opening_eco = ""
classification = classify_move(
player_ep_before=player_ep_before,
player_ep_after=player_ep_after,
player_ep_second_best=player_ep_second_best,
is_best=is_best,
move_rank=move_rank,
is_sacrifice=is_sacrifice,
opponent_blunder_swing=opponent_blunder_swing,
ep_before_opponent_blunder=ep_before_opponent_blunder,
board_before=board_snap,
board_after=boards[i + 1],
our_color=turn,
move=move,
)
# Continuation: engine's best line from the position after this move
continuation_san: list[str] = []
if multipv[i + 1] and "pv" in multipv[i + 1][0]:
temp = boards[i + 1].copy()
for cont_mv in multipv[i + 1][0]["pv"][:6]:
try:
continuation_san.append(temp.san(cont_mv))
temp.push(cont_mv)
except Exception:
break
ep_loss = max(0.0, player_ep_before - player_ep_after)
results.append({
"move_number": (i // 2) + 1,
"ply": i,
"color": color,
"san": move_san,
"uci": move_uci,
"from_square": chess.square_name(move.from_square),
"to_square": chess.square_name(move.to_square),
"classification": classification,
"ep_loss": round(ep_loss, 4),
"ep_before": round(player_ep_before, 4),
"ep_after": round(player_ep_after, 4),
"best_move_san": best_move_san if not is_best else None,
"best_move_uci": best_move_uci if not is_best else None,
"continuation": continuation_san,
"continuation_fmt": format_continuation(continuation_san,
(i // 2) + 1, color),
"fen_before": fen_before,
"fen_after": fen_after,
"is_best": is_best,
"comment": get_comment(classification),
"opening_name": opening_name,
"opening_eco": opening_eco,
})
progress_cb({
"type": "complete",
"message": "Analysis complete!",
"progress": 1.0,
"data": {
"white": white,
"black": black,
"initial_fen": game.board().fen(),
"moves": results,
"summary": _compute_summary(results),
}
})
finally:
engine.quit()
# ── Summary stats ──────────────────────────────────────────────────────────────
ALL_CLASSIFICATIONS = [
"Book", "Brilliant", "Great", "Best", "Excellent", "Good",
"Inaccuracy", "Mistake", "Blunder", "Miss",
]
def _compute_summary(moves: list[dict]) -> dict:
"""Return per-player classification counts and accuracy."""
stats = {}
for color in ("white", "black"):
player_moves = [m for m in moves if m["color"] == color]
counts = {cls: 0 for cls in ALL_CLASSIFICATIONS}
for m in player_moves:
cls = m["classification"]
if cls in counts:
counts[cls] += 1
# Accuracy: average fraction of winning chances preserved each move.
# For each move: score = ep_after / max(ep_before, 0.01), clamped 0-1.
# Gives 100% for Best/Brilliant, degrades proportionally with EP loss.
if player_moves:
scores = [
max(0.0, min(1.0, m["ep_after"] / max(m["ep_before"], 0.01)))
for m in player_moves
]
accuracy = round(sum(scores) / len(scores) * 100, 1)
else:
accuracy = 0.0
stats[color] = {"accuracy": accuracy, "counts": counts}
return stats