""" Chess game analyzer using Stockfish + Expected Points model. Classification hierarchy (highest priority first): Book → Brilliant → Great → Miss → Best → Excellent → Good → Inaccuracy → Mistake → Blunder """ import chess import chess.pgn import chess.engine import io import math import json import random import shutil import os def _find_stockfish() -> str: """Locate stockfish binary on common paths.""" # Try PATH first found = shutil.which("stockfish") if found: return found for path in [ "/usr/games/stockfish", "/usr/bin/stockfish", "/usr/local/bin/stockfish", "/opt/homebrew/bin/stockfish", "/opt/homebrew/games/stockfish", ]: if os.path.isfile(path) and os.access(path, os.X_OK): return path return "stockfish" # Let it fail with a clear error # ── ECO opening book ─────────────────────────────────────────────────────────── def _load_eco_db() -> dict: """ Load ECO JSON files (ecoA-D.json) from an eco/ folder next to this file. Keys are FEN strings; values contain name, eco code, and moves. Returns an empty dict gracefully if the folder or files are missing. """ db: dict = {} eco_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "eco") if not os.path.isdir(eco_dir): return db for letter in "ABCD": path = os.path.join(eco_dir, f"eco{letter}.json") if not os.path.isfile(path): continue try: with open(path, encoding="utf-8") as f: data = json.load(f) db.update(data) except Exception: pass return db ECO_DB: dict = _load_eco_db() def lookup_book(fen_after: str) -> dict | None: """Return opening entry if position is in ECO db, else None.""" return ECO_DB.get(fen_after) # ── Expected Points ──────────────────────────────────────────────────────────── def cp_to_ep(cp: float, k: float = 0.4) -> float: """Convert centipawn score to Expected Points from White's perspective.""" return 1.0 / (1.0 + math.exp(-k * (cp / 100.0))) def score_to_ep_white(score: chess.engine.Score) -> float: """Convert a PovScore (already in White's POV) to EP.""" if score.is_mate(): m = score.mate() return 1.0 if (m is not None and m > 0) else 0.0 cp = score.score() return cp_to_ep(cp if cp is not None else 0) def get_ep_white(info: dict) -> float: return score_to_ep_white(info["score"].white()) # ── Sacrifice detection ──────────────────────────────────────────────────────── PIECE_VALUES = { chess.PAWN: 1, chess.KNIGHT: 3, chess.BISHOP: 3, chess.ROOK: 5, chess.QUEEN: 9, chess.KING: 99 } def _see(board: chess.Board, square: int, attacker: chess.Color, target_val: int) -> int: """ Recursive Static Exchange Evaluation. Returns the net material gain for `attacker` if they initiate a capture sequence on `square`, where `target_val` is the value of the piece there. Positive = attacker profits, 0 = even trade, negative = attacker loses. """ attackers = board.attackers(attacker, square) if not attackers: return 0 # Always capture with the least valuable piece first best_sq = min( (sq for sq in attackers if board.piece_at(sq)), key=lambda sq: PIECE_VALUES.get(board.piece_at(sq).piece_type, 99), default=None, ) if best_sq is None: return 0 capturing_piece = board.piece_at(best_sq) capturing_val = PIECE_VALUES.get(capturing_piece.piece_type, 0) # Simulate the capture by manually updating a board copy b2 = board.copy() b2.remove_piece_at(best_sq) b2.set_piece_at(square, chess.Piece(capturing_piece.piece_type, attacker)) # The other side may now recapture — they'll only do so if it's profitable recapture_gain = _see(b2, square, not attacker, capturing_val) # Our gain: captured target_val, but may lose capturing_val if opponent recaptures return target_val - max(0, recapture_gain) def _is_hanging(board: chess.Board, square: int, our_color: chess.Color) -> bool: """ Returns True if the piece on `square` is en prise for the opponent — i.e. the opponent comes out ahead if they capture it (SEE > 0). """ piece = board.piece_at(square) if piece is None or piece.color != our_color: return False piece_val = PIECE_VALUES.get(piece.piece_type, 0) if piece_val < 3: return False # Ignore pawns and kings opponent = not our_color return _see(board, square, opponent, piece_val) > 0 def check_sacrifice(board: chess.Board, move: chess.Move) -> bool: """ Returns True if this move involves a genuine piece sacrifice — either: (a) The moved piece itself lands en prise (SEE favours the opponent), OR (b) A friendly piece is newly left hanging after the move — e.g. a queen that was shielded by the moving piece (pin scenario) is now exposed. A knight defended by a pawn attacked by a bishop is NOT a sacrifice: Bxn, pxB = even trade → SEE = 0 → not flagged. """ our_color = board.turn piece = board.piece_at(move.from_square) if piece is None: return False piece_val = PIECE_VALUES.get(piece.piece_type, 0) # ── (a) Moved-piece sacrifice ────────────────────────────────────────── if piece_val >= 3: # What we already captured by making this move (0 if not a capture) captured = board.piece_at(move.to_square) captured_val = PIECE_VALUES.get(captured.piece_type, 0) if captured else 0 board_after = board.copy() board_after.push(move) opponent = board_after.turn # The opponent's net gain = what they take from us minus what we already took. # e.g. Qxd8 Rxd8: opponent SEE=9, but we already took 9 → net gain=0 → not a sac. # e.g. Bh6 gxh6: opponent SEE=3, we took nothing → net gain=3 → is a sac. net_gain_for_opponent = _see(board_after, move.to_square, opponent, piece_val) - captured_val if net_gain_for_opponent > 0: return True # ── (b) Piece-left-behind sacrifice (e.g. walking out of a pin) ─────── # Collect which friendly pieces were already hanging BEFORE the move, # so we only flag pieces that are *newly* exposed. hanging_before: set[int] = set() for sq in board.pieces(chess.QUEEN, our_color) | \ board.pieces(chess.ROOK, our_color) | \ board.pieces(chess.BISHOP, our_color) | \ board.pieces(chess.KNIGHT, our_color): if sq == move.from_square: continue # This is the piece we're moving — skip if _is_hanging(board, sq, our_color): hanging_before.add(sq) board_after = board.copy() board_after.push(move) for sq in board_after.pieces(chess.QUEEN, our_color) | \ board_after.pieces(chess.ROOK, our_color) | \ board_after.pieces(chess.BISHOP, our_color) | \ board_after.pieces(chess.KNIGHT, our_color): if sq == move.to_square: continue # Already handled in (a) if sq not in hanging_before and _is_hanging(board_after, sq, our_color): return True # This piece is newly hanging — it's the real sacrifice # ── (c) Deliberately ignored threat ──────────────────────────────────── # Our piece was already hanging before this move (opponent attacked it last # turn), and our move neither moved it nor added a defender — we intentionally # left it en prise to play something more important elsewhere. # _is_hanging uses SEE so a piece that is now defended returns False. opponent = board_after.turn for sq in hanging_before: if _is_hanging(board_after, sq, our_color): return True # Still hanging after our move — deliberate sacrifice return False # ── Move classification ──────────────────────────────────────────────────────── COMMENTS = { "Book": [ "A well-known opening move.", "Theory — this position has been played thousands of times.", "A mainline opening move.", ], "Brilliant": [ "A stunning sacrifice that seizes a lasting advantage.", "Extraordinary piece sacrifice — the position rewards bold play.", "An unexpected sacrifice with deep positional compensation.", ], "Great": [ "A critical move that shifts the balance of the game.", "Finding this move takes real precision — it dramatically improves the position.", "The only move that keeps things in hand. Well found.", ], "Miss": [ "A missed opportunity — the opponent's error went unpunished.", "After the opponent's mistake, this fails to press the advantage.", "The position offered a winning shot, but it slipped away here.", ], "Best": [ "The engine's top choice. Precise and principled.", "Perfect play — the ideal move in this position.", "Exactly what the position demanded.", ], "Excellent": [ "A very strong move that keeps the position under control.", "Nearly best — a fine practical choice.", "A sharp, high-quality response.", ], "Good": [ "A solid move that maintains the balance.", "Reasonable play — the position stays roughly equal.", "A sensible continuation with no serious drawbacks.", ], "Inaccuracy": [ "A slight imprecision — a better option was available.", "Not a serious error, but leaves something on the table.", "The position allowed for more here.", ], "Mistake": [ "An error that hands the opponent a meaningful advantage.", "This weakens the position more than it needed to.", "A significant misstep — the opponent can now press hard.", ], "Blunder": [ "A serious blunder that could cost the game.", "A major error — this dramatically changes the evaluation.", "Devastating. The position collapses after this move.", ], } def _net_exchange(board: 'chess.Board', move: chess.Move, our_color: chess.Color) -> int: """ Net material result of this move for our_color, accounting for the full exchange sequence via SEE. Unlike a raw board snapshot, this correctly handles trades and recaptures. Returns: positive — we come out ahead in material (e.g. win-back tactical combo) zero — even exchange (true trade) negative — we lose material net (positional / speculative sacrifice) Brilliant gate uses this to distinguish: gain <= 0 → real sacrifice for compensation → Brilliant candidate 1 <= gain <= 2 → "cheap" win of a pawn or two → downgrade to Great gain >= 3 → significant material-winning combo → Brilliant candidate """ piece = board.piece_at(move.from_square) if piece is None: return 0 piece_val = PIECE_VALUES.get(piece.piece_type, 0) # What we capture immediately, if anything captured = board.piece_at(move.to_square) captured_val = PIECE_VALUES.get(captured.piece_type, 0) if captured else 0 board_after = board.copy() board_after.push(move) opponent = not our_color # How much does opponent gain by recapturing our piece on to_square? opp_gain_dest = max(0, _see(board_after, move.to_square, opponent, piece_val)) # Net from the primary exchange on the destination square dest_net = captured_val - opp_gain_dest # Also account for any piece newly left hanging (path-b sacrifice — # e.g. walking a knight out of a pin, exposing the queen). # We already know check_sacrifice flagged this, so find the newly hanging # piece and subtract its SEE loss. hanging_before: set = set() for sq in (board.pieces(chess.QUEEN, our_color) | board.pieces(chess.ROOK, our_color) | board.pieces(chess.BISHOP, our_color) | board.pieces(chess.KNIGHT, our_color)): if sq == move.from_square: continue if _is_hanging(board, sq, our_color): hanging_before.add(sq) newly_hanging_loss = 0 for sq in (board_after.pieces(chess.QUEEN, our_color) | board_after.pieces(chess.ROOK, our_color) | board_after.pieces(chess.BISHOP, our_color) | board_after.pieces(chess.KNIGHT, our_color)): if sq == move.to_square: continue if sq not in hanging_before and _is_hanging(board_after, sq, our_color): p = board_after.piece_at(sq) pv = PIECE_VALUES.get(p.piece_type, 0) newly_hanging_loss += max(0, _see(board_after, sq, opponent, pv)) return dest_net - newly_hanging_loss def get_comment(classification: str) -> str: return random.choice(COMMENTS.get(classification, ["—"])) def classify_move( player_ep_before: float, player_ep_after: float, player_ep_second_best: float | None, is_best: bool, move_rank: int | None, is_sacrifice: bool, opponent_blunder_swing: float | None, ep_before_opponent_blunder: float | None, board_before: 'chess.Board | None' = None, board_after: 'chess.Board | None' = None, our_color: 'chess.Color | None' = None, move: 'chess.Move | None' = None, ) -> str: ep_loss = player_ep_before - player_ep_after # ── Brilliant ────────────────────────────────────────────────────────────── # Best move + piece sacrifice + lands in a good position + significant material swing. # Two paths to Brilliant: # 1. Normal: wasn't already near-completely-winning (< 0.97). # 2. Was already winning but the sacrifice dramatically improves the position # (ep jump >= 0.15) -- e.g. forcing bishop sac into a mating attack. # Downgrade to Great if the material net gain is <= 2 pawns: a tiny material # pickup doesn't justify Brilliant even if every other condition is met. if is_best and is_sacrifice: ep_jump = player_ep_after - player_ep_before already_winning = player_ep_before >= 0.97 lands_well = player_ep_after >= 0.45 # equal or better after sac is fine if lands_well and (not already_winning or ep_jump >= 0.15): # Material gate: # net < 0 → real material sacrifice (gives up more than gains) → Brilliant # net 0..2 → equal trade or trivial pickup (e.g. queen swap, +1 pawn) → Great # net >= 3 → significant material-winning combination → Brilliant # Trades (net=0) and small wins are explicitly excluded from Brilliant. if board_before is not None and our_color is not None and move is not None: net = _net_exchange(board_before, move, our_color) if 0 <= net <= 2: return "Great" return "Brilliant" # ── Great ────────────────────────────────────────────────────────────────── if is_best: was_losing = player_ep_before < 0.45 is_equal = 0.44 <= player_ep_after <= 0.62 is_winning = player_ep_after > 0.55 major_swing = was_losing and (is_equal or is_winning) only_good = ( player_ep_second_best is not None and player_ep_before - player_ep_second_best > 0.15 ) if major_swing or only_good: return "Great" # ── Miss ─────────────────────────────────────────────────────────────────── if ( opponent_blunder_swing is not None and opponent_blunder_swing > 0.10 and ep_before_opponent_blunder is not None and player_ep_after <= ep_before_opponent_blunder + 0.02 ): return "Miss" # ── EP table ─────────────────────────────────────────────────────────────── ep_loss = max(0.0, ep_loss) # cap negatives (position improved beyond eval) # is_best is checked first — float rounding can produce a tiny non-zero # ep_loss even for the engine's top move, so we must not let the threshold # bands override a confirmed best-move result. if is_best: return "Best" if ep_loss <= 0.02: return "Excellent" if ep_loss <= 0.05: return "Good" if ep_loss <= 0.10: return "Inaccuracy" if ep_loss <= 0.20: return "Mistake" return "Blunder" # ── Continuation formatting ──────────────────────────────────────────────────── def format_continuation(moves_san: list[str], move_number: int, player_color: str) -> str: """Format a list of SAN continuation moves with proper move numbers. After white plays move N → continuation starts with black at N, then white at N+1 After black plays move N → continuation starts with white at N+1 """ if not moves_san: return "" parts: list[str] = [] if player_color == "white": # Next is black's response at the same move number next_is_white = False num = move_number else: # Next is white's move, which is move_number + 1 next_is_white = True num = move_number + 1 for i, san in enumerate(moves_san): if next_is_white: parts.append(f"{num}.") parts.append(san) next_is_white = False else: if i == 0: # First black continuation move: needs the "N..." prefix parts.append(f"{num}...") parts.append(san) next_is_white = True num += 1 # After black plays, white's next turn is N+1 return " ".join(parts) # ── Main analysis entry point ────────────────────────────────────────────────── def analyze_game(pgn_text: str, depth: int, progress_cb): """ Analyze a full PGN game. Calls progress_cb({type, message, progress, [data]}). """ pgn_io = io.StringIO(pgn_text) game = chess.pgn.read_game(pgn_io) if game is None: progress_cb({"type": "error", "message": "Could not parse PGN. Please check the notation."}) return white = game.headers.get("White", "White") if not white or white == "?": white = "White" black = game.headers.get("Black", "Black") if not black or black == "?": black = "Black" moves_list = list(game.mainline_moves()) total = len(moves_list) if total == 0: progress_cb({"type": "error", "message": "The PGN contains no moves."}) return progress_cb({"type": "progress", "message": "Initializing Stockfish engine…", "progress": 0.0}) sf_path = _find_stockfish() try: engine = chess.engine.SimpleEngine.popen_uci(sf_path) except FileNotFoundError: progress_cb({"type": "error", "message": f"Stockfish not found (tried '{sf_path}'). Install Stockfish and ensure it is on your PATH."}) return try: # Build board snapshots: boards[i] is the state BEFORE move i boards: list[chess.Board] = [] b = game.board() for mv in moves_list: boards.append(b.copy()) b.push(mv) boards.append(b.copy()) # final position after all moves # Analyse every position with MultiPV=5 multipv: list[list[dict]] = [] ep_white: list[float] = [] for i, board_snap in enumerate(boards): if i < total: msg = f"Analyzing move {i + 1} of {total}…" prog = i / total * 0.90 else: msg = "Finalizing analysis…" prog = 0.93 progress_cb({"type": "progress", "message": msg, "progress": prog}) infos = engine.analyse(board_snap, chess.engine.Limit(depth=depth), multipv=5) multipv.append(infos) ep_white.append(get_ep_white(infos[0])) # Classify each move progress_cb({"type": "progress", "message": "Classifying moves…", "progress": 0.96}) results = [] for i, move in enumerate(moves_list): board_snap = boards[i] turn = board_snap.turn color = "white" if turn == chess.WHITE else "black" move_san = board_snap.san(move) move_uci = move.uci() fen_before = board_snap.fen() fen_after = boards[i + 1].fen() ep_w_before = ep_white[i] ep_w_after = ep_white[i + 1] if turn == chess.WHITE: player_ep_before = ep_w_before player_ep_after = ep_w_after else: player_ep_before = 1.0 - ep_w_before player_ep_after = 1.0 - ep_w_after # Best move best_move_obj = multipv[i][0]["pv"][0] best_move_san = board_snap.san(best_move_obj) best_move_uci = best_move_obj.uci() is_best = (move.uci() == best_move_uci) # Second-best EP (for "only good move" detection) player_ep_second_best: float | None = None if len(multipv[i]) > 1: ep_sb_w = get_ep_white(multipv[i][1]) player_ep_second_best = ep_sb_w if turn == chess.WHITE else 1.0 - ep_sb_w # Rank among top-5 move_rank: int | None = None for rank, info in enumerate(multipv[i], 1): if info["pv"][0].uci() == move.uci(): move_rank = rank break # Sacrifice? is_sacrifice = check_sacrifice(board_snap, move) # Miss detection opponent_blunder_swing: float | None = None ep_before_opponent_blunder: float | None = None if i >= 1: ep_w_prev = ep_white[i - 1] if turn == chess.WHITE: swing = ep_w_before - ep_w_prev pre_blunder = ep_w_prev else: swing = (1.0 - ep_w_before) - (1.0 - ep_w_prev) pre_blunder = 1.0 - ep_w_prev if swing > 0.10: opponent_blunder_swing = swing ep_before_opponent_blunder = pre_blunder # Book move check — if the resulting position is in the ECO db, # classify immediately regardless of EP; opening theory trumps evaluation. book_entry = lookup_book(fen_after) if book_entry: classification = "Book" opening_name = book_entry.get("name", "") opening_eco = book_entry.get("eco", "") else: opening_name = "" opening_eco = "" classification = classify_move( player_ep_before=player_ep_before, player_ep_after=player_ep_after, player_ep_second_best=player_ep_second_best, is_best=is_best, move_rank=move_rank, is_sacrifice=is_sacrifice, opponent_blunder_swing=opponent_blunder_swing, ep_before_opponent_blunder=ep_before_opponent_blunder, board_before=board_snap, board_after=boards[i + 1], our_color=turn, move=move, ) # Continuation: engine's best line from the position after this move continuation_san: list[str] = [] if multipv[i + 1] and "pv" in multipv[i + 1][0]: temp = boards[i + 1].copy() for cont_mv in multipv[i + 1][0]["pv"][:6]: try: continuation_san.append(temp.san(cont_mv)) temp.push(cont_mv) except Exception: break ep_loss = max(0.0, player_ep_before - player_ep_after) results.append({ "move_number": (i // 2) + 1, "ply": i, "color": color, "san": move_san, "uci": move_uci, "from_square": chess.square_name(move.from_square), "to_square": chess.square_name(move.to_square), "classification": classification, "ep_loss": round(ep_loss, 4), "ep_before": round(player_ep_before, 4), "ep_after": round(player_ep_after, 4), "best_move_san": best_move_san if not is_best else None, "best_move_uci": best_move_uci if not is_best else None, "continuation": continuation_san, "continuation_fmt": format_continuation(continuation_san, (i // 2) + 1, color), "fen_before": fen_before, "fen_after": fen_after, "is_best": is_best, "comment": get_comment(classification), "opening_name": opening_name, "opening_eco": opening_eco, }) progress_cb({ "type": "complete", "message": "Analysis complete!", "progress": 1.0, "data": { "white": white, "black": black, "initial_fen": game.board().fen(), "moves": results, "summary": _compute_summary(results), } }) finally: engine.quit() # ── Summary stats ────────────────────────────────────────────────────────────── ALL_CLASSIFICATIONS = [ "Book", "Brilliant", "Great", "Best", "Excellent", "Good", "Inaccuracy", "Mistake", "Blunder", "Miss", ] def _compute_summary(moves: list[dict]) -> dict: """Return per-player classification counts and accuracy.""" stats = {} for color in ("white", "black"): player_moves = [m for m in moves if m["color"] == color] counts = {cls: 0 for cls in ALL_CLASSIFICATIONS} for m in player_moves: cls = m["classification"] if cls in counts: counts[cls] += 1 # Accuracy: average fraction of winning chances preserved each move. # For each move: score = ep_after / max(ep_before, 0.01), clamped 0-1. # Gives 100% for Best/Brilliant, degrades proportionally with EP loss. if player_moves: scores = [ max(0.0, min(1.0, m["ep_after"] / max(m["ep_before"], 0.01))) for m in player_moves ] accuracy = round(sum(scores) / len(scores) * 100, 1) else: accuracy = 0.0 stats[color] = {"accuracy": accuracy, "counts": counts} return stats