Spaces:
Sleeping
Sleeping
| from pathlib import Path | |
| from typing import Optional | |
| import chess | |
| import chess.engine | |
| import chess.svg | |
| import chess.pgn | |
| from src.llm.sambanova_wrapper import SambaNovaWrapper | |
| from src.util.pgn_util import add_variation, format_pv | |
class ThinkSquareEngine:
    """Stockfish-backed helpers for move suggestion, game annotation and rendering.

    No method takes ``self``; they are all static methods, so the class is used
    as a namespace (``ThinkSquareEngine.get_best_move(...)``).
    """

    # Absolute path of the Stockfish binary, resolved against the working
    # directory when the class body is executed.
    _ENGINE = str(Path("bin/stockfish").resolve())
    # Commentator used by ``annotate`` when an ``llm_character`` is supplied.
    llm_commentator = SambaNovaWrapper()
    # Centipawn value substituted for forced mates when reading engine scores.
    _MATE_SCORE = 100000

    @staticmethod
    def get_best_move(fen: Optional[str] = None, time_limit=0.1):
        """Return the engine's chosen move for a position, in SAN.

        Args:
            fen: Position to search; the standard starting position when ``None``.
            time_limit: Time limit handed to the engine, in seconds.

        Returns:
            The move in SAN, or ``None`` when the engine reports no move.
        """
        board = chess.Board(chess.STARTING_FEN if fen is None else fen)
        with chess.engine.SimpleEngine.popen_uci(ThinkSquareEngine._ENGINE) as engine:
            result = engine.play(board, chess.engine.Limit(time=time_limit))
        # ``PlayResult.move`` may be None; ``board.san`` needs a real move.
        if result.move is None:
            return None
        return board.san(result.move)

    @staticmethod
    def get_engine_analysis(board, analysis_time=0.1, engine=None):
        """Analyse ``board`` and return the engine's info dictionary.

        Args:
            board: Position to analyse.
            analysis_time: Time limit handed to the engine, in seconds.
            engine: Optional already-running engine to reuse. When ``None`` a
                fresh engine process is started for this call and closed after.

        Returns:
            The result of ``engine.analyse`` for the position.
        """
        limit = chess.engine.Limit(time=analysis_time)
        if engine is not None:
            return engine.analyse(board, limit)
        with chess.engine.SimpleEngine.popen_uci(ThinkSquareEngine._ENGINE) as own_engine:
            return own_engine.analyse(board, limit)

    @staticmethod
    def _classify_eval_drop(eval_drop):
        """Map the mover's evaluation loss to a label, or ``None`` if unremarkable."""
        if eval_drop > 200:
            return "Blunder"
        if eval_drop > 100:
            return "Mistake"
        if eval_drop > 50:
            return "Inaccuracy"
        if eval_drop < -150:
            return "Brilliant"
        if eval_drop < -60:
            return "Very Good"
        return None

    @staticmethod
    def _describe_situation(post_eval):
        """Summarise a White-relative evaluation in words, or ``None`` if unknown."""
        if post_eval is None:
            return None
        if post_eval > 200:
            return "White is better"
        if post_eval > 100:
            return "White has a slight advantage"
        if post_eval < -200:
            return "Black is better"
        if post_eval < -100:
            return "Black has a slight advantage"
        return "No side has a significant advantage"

    @staticmethod
    def _perform_post_analysis_and_add_comment(
        analysis_time,
        board,
        played_node,
        pre_eval,
        engine_best_move_san,
        pv,
        engine=None,
    ):
        """Evaluate the position after the played move and draft a comment.

        Args:
            analysis_time: Time limit for the engine analysis, in seconds.
            board: Board with the played move already pushed.
            played_node: Game node of the played move.
            pre_eval: White-relative evaluation before the move (or ``None``).
            engine_best_move_san: SAN of the engine's preferred move (or ``None``).
            pv: Engine principal variation from the position before the move.
            engine: Optional running engine to reuse for the analysis.

        Returns:
            A tuple ``(node_reference, comment, variation, variation_san,
            post_eval_score)``. ``node_reference`` is ``played_node`` when the
            move earned a label and ``None`` otherwise; ``variation`` and
            ``variation_san`` are only set when a better move is suggested.
        """
        post_info = ThinkSquareEngine.get_engine_analysis(board, analysis_time, engine)
        post_eval = post_info["score"].white().score(
            mate_score=ThinkSquareEngine._MATE_SCORE
        )

        if pre_eval is None or post_eval is None:
            eval_drop = 0
        else:
            eval_drop = pre_eval - post_eval
            # Both scores are from White's point of view. After the push it is
            # the opponent's turn, so White to move means Black just played and
            # the loss must be measured from Black's side.
            if board.turn == chess.WHITE:
                eval_drop = -eval_drop

        label = ThinkSquareEngine._classify_eval_drop(eval_drop)
        overall_situation = ThinkSquareEngine._describe_situation(post_eval)

        node_reference = None
        comment = None
        variation = None
        variation_san = None

        if label is not None:
            node_reference = played_node
            comment = f"{label}. "
            if eval_drop > 0 and engine_best_move_san is not None:
                comment += f"Better was {engine_best_move_san} "
                if pv is not None:
                    variation = pv
                    variation_san = format_pv(pv, played_node.parent.board())

        if overall_situation is not None:
            if comment is not None:
                comment += f"\n Overall, {overall_situation}."
            else:
                comment = f"Overall, {overall_situation}."

        return node_reference, comment, variation, variation_san, post_eval

    @staticmethod
    def annotate(game, analysis_time: float = 0.1, llm_character: Optional[str] = None):
        """Annotate the main line of ``game`` with engine-based comments.

        Each main-line move is compared with the engine's preferred move.
        Moves that match it, or that earn a label from the evaluation change,
        get a comment; where a better move is suggested, the engine line is
        added as a variation. When ``llm_character`` is given, the drafted
        comments are passed to ``llm_commentator`` and replaced by its output.

        Args:
            game: The ``chess.pgn.Game`` to annotate (modified in place).
            analysis_time: Engine time per analysed position, in seconds.
            llm_character: Optional character passed to the LLM commentator.

        Returns:
            The same ``game`` object, annotated.

        Raises:
            ValueError: If ``game`` is not a ``chess.pgn.Game``, has no moves,
                ``analysis_time`` is not positive, or the commentator returns
                an unknown comment reference.
        """
        if not isinstance(game, chess.pgn.Game):
            raise ValueError("Input must be a chess.pgn.Game object")
        if not game.variations:
            raise ValueError("Game must have at least one variation")
        if analysis_time <= 0:
            raise ValueError("Analysis time must be greater than 0")

        comment_refs = []
        node_refs = []
        comments = []
        variations = []
        variation_sans = []
        move_numbers = []
        played_moves = []
        played_by = []
        pre_eval_scores = []
        post_eval_scores = []

        node = game
        # One engine process serves the whole game instead of one per analysis.
        with chess.engine.SimpleEngine.popen_uci(ThinkSquareEngine._ENGINE) as engine:
            while node.variations:
                board = node.board()
                played_node = node.variation(0)
                played_move = played_node.move

                # Engine view of the position BEFORE the played move.
                pre_info = ThinkSquareEngine.get_engine_analysis(
                    board, analysis_time, engine
                )
                pre_eval = pre_info["score"].white().score(
                    mate_score=ThinkSquareEngine._MATE_SCORE
                )

                # A missing or empty principal variation means no suggestion.
                pv = pre_info.get("pv") or []
                engine_best_move = pv[0] if pv else None
                engine_best_move_san = (
                    board.san(engine_best_move) if engine_best_move else None
                )

                # Read mover-dependent facts before the move changes the board.
                move_number = board.fullmove_number
                mover = "white" if board.turn else "black"
                played_move_san = board.san(played_move) if played_move else None
                board.push(played_move)

                if played_move_san != engine_best_move_san:
                    (
                        node_reference,
                        comment,
                        variation,
                        variation_san,
                        post_eval_score,
                    ) = ThinkSquareEngine._perform_post_analysis_and_add_comment(
                        analysis_time,
                        board,
                        played_node,
                        pre_eval,
                        engine_best_move_san,
                        pv,
                        engine,
                    )
                else:
                    node_reference = played_node
                    comment = "Best move played."
                    variation = None
                    variation_san = None
                    post_info = ThinkSquareEngine.get_engine_analysis(
                        board, analysis_time, engine
                    )
                    post_eval_score = post_info["score"].white().score(
                        mate_score=ThinkSquareEngine._MATE_SCORE
                    )

                if node_reference is not None:
                    node_refs.append(node_reference)
                    comments.append(comment)
                    variations.append(variation)
                    variation_sans.append(variation_san)
                    move_numbers.append(move_number)
                    played_moves.append(played_move_san)
                    played_by.append(mover)
                    pre_eval_scores.append(pre_eval)
                    post_eval_scores.append(post_eval_score)
                    # References are 1-based positions in the parallel lists.
                    comment_refs.append(len(comment_refs) + 1)

                node = played_node

        if llm_character is not None:
            formatted_comments = ThinkSquareEngine.llm_commentator.comment(
                character=llm_character,
                game=str(game),
                comment_refs=comment_refs,
                move_nums=move_numbers,
                comments=comments,
                move_suggestions=variation_sans,
                played_moves=played_moves,
                played_by=played_by,
                pre_eval_scores=pre_eval_scores,
                post_eval_scores=post_eval_scores,
            )
            for entry in formatted_comments["comments"]:
                comment_ref = entry["comment_ref"]
                if comment_ref not in comment_refs:
                    raise ValueError(
                        f"Comment reference {comment_ref} not found in comment_refs."
                    )
                comments[comment_refs.index(comment_ref)] = entry["comment"]

        # Write to the game tree only after all analysis (and LLM work) is done.
        for node_ref, comment, variation in zip(node_refs, comments, variations):
            node_ref.comment = comment
            if variation is not None:
                add_variation(node_ref.parent, variation)
        return game

    @staticmethod
    def is_valid_move(
        move_san: str,
        fen: Optional[str] = None,
    ) -> bool:
        """Return whether ``move_san`` is a legal move in the given position.

        Args:
            move_san: Move in SAN.
            fen: Position to check against; the starting position when ``None``.

        Returns:
            ``True`` if the SAN parses and the move is legal, else ``False``.
        """
        board = chess.Board(chess.STARTING_FEN if fen is None else fen)
        try:
            move = board.parse_san(move_san)
        except ValueError:
            return False
        return board.is_legal(move)

    @staticmethod
    def get_fen_after_move(
        move_san: str,
        fen: Optional[str] = None,
    ) -> Optional[str]:
        """Return the FEN reached by playing ``move_san``.

        Args:
            move_san: Move in SAN.
            fen: Position to play from; the starting position when ``None``.

        Returns:
            The resulting FEN, or ``None`` if the SAN does not parse or the
            move is not legal.
        """
        board = chess.Board(chess.STARTING_FEN if fen is None else fen)
        try:
            move = board.parse_san(move_san)
        except ValueError:
            return None
        if not board.is_legal(move):
            return None
        board.push(move)
        return board.fen()

    @staticmethod
    def render_board_ascii(fen: Optional[str] = None) -> str:
        """Return an ASCII diagram of the position, seen from the side to move.

        Args:
            fen: Position to draw; the starting position when ``None``.
        """
        board = chess.Board(chess.STARTING_FEN if fen is None else fen)
        rows = str(board).split("\n")
        if board.turn == chess.BLACK:
            # Reverse both the row order and each row to view from Black's side.
            rows = [row[::-1] for row in rows[::-1]]
        return "\n".join(rows)

    @staticmethod
    def render_board_svg(fen: Optional[str] = None):
        """Return an SVG rendering of the position, oriented to the side to move.

        Args:
            fen: Position to draw; the starting position when ``None``.
        """
        board = chess.Board(chess.STARTING_FEN if fen is None else fen)
        return chess.svg.board(
            board=board, orientation=board.turn, size=400, coordinates=True
        )

    @staticmethod
    def render_board_unicode(fen: Optional[str] = None) -> str:
        """Return a Unicode diagram of the position, oriented to the side to move.

        Args:
            fen: Position to draw; the starting position when ``None``.
        """
        board = chess.Board(chess.STARTING_FEN if fen is None else fen)
        return board.unicode(
            invert_color=False, borders=True, empty_square=".", orientation=board.turn
        )