"""DecodeChess-IA: annotate a PGN move by move with Stockfish and a RandomForest.

The module exposes a Gradio UI (``demo``). A small RandomForest classifier is
trained on engine self-play the first time it is needed and cached on disk.
"""

import io
import math
import os
import random
import re
import tempfile
import time
import traceback
from io import StringIO

import gradio as gr
import numpy as np
import pandas as pd
from joblib import dump, load
from sklearn.ensemble import RandomForestClassifier

import chess
import chess.engine
import chess.pgn

APP_TITLE = "DecodeChess‑IA — Doctor Linux"
ENGINE_CANDIDATES = ["stockfish", "/usr/bin/stockfish", "/usr/games/stockfish"]

# Centipawn magnitude used to represent any forced mate.
MATE_SCORE_CP = 100000
# On-disk cache of the trained classifier (relative to the working directory).
MODEL_PATH = "model_rf.joblib"


# ==============================
# Engine utilities
# ==============================
def load_engine():
    """Start the first Stockfish binary from ENGINE_CANDIDATES that launches.

    Returns:
        An open ``chess.engine.SimpleEngine``; the caller must ``quit()`` it.

    Raises:
        RuntimeError: if none of the candidates could be started.
    """
    last = None
    for path in ENGINE_CANDIDATES:
        try:
            return chess.engine.SimpleEngine.popen_uci(path)
        except Exception as exc:  # best effort: try the next candidate
            last = exc
    raise RuntimeError(f"No pude iniciar Stockfish. Último error: {last}") from last


def score_cp(score) -> float:
    """Return centipawns from White's point of view (+ = White is better).

    Accepts either a ``chess.engine.PovScore`` (converted to White's view) or
    a plain ``chess.engine.Score`` that is already from White's view. Any
    forced mate is clamped to +/- MATE_SCORE_CP.
    """
    if isinstance(score, chess.engine.PovScore):
        score = score.white()
    value = score.score(mate_score=MATE_SCORE_CP)
    if score.is_mate():
        # The sign of score() tells a delivered mate (MateGiven) apart from
        # being mated in 0; mate() returns 0 for both.
        return float(MATE_SCORE_CP) if value > 0 else -float(MATE_SCORE_CP)
    return float(value)


# ==============================
# Simple PGN repair
# ==============================
def repair_pgn_text(text: str) -> str:
    """Normalise newlines, add minimal headers and fix a few OCR-style typos."""
    text = text.replace("\r\n", "\n").replace("\r", "\n")
    # Add the minimal header set when the text has no [Event ...] tag.
    if "[Event " not in text:
        head = [
            '[Event "?"]',
            '[Site "?"]',
            '[Date "????.??.??"]',
            '[Round "?"]',
            '[White "?"]',
            '[Black "?"]',
            '[Result "*"]',
            '',
            '',
        ]
        text = "\n".join(head) + text
    # Minor fixes for misread tags/results.
    text = re.sub(r'\[Ulnite', '[White', text)
    text = (
        text.replace('[Result "I-0"]', '[Result "1-0"]')
        .replace('[Result "O-I"]', '[Result "0-1"]')
        .replace('[Result "I/2-I/2"]', '[Result "1/2-1/2"]')
    )
    return text.strip() + "\n"


# ==============================
# Position features
# ==============================
PIECE_VALUES = {chess.PAWN: 1, chess.KNIGHT: 3, chess.BISHOP: 3, chess.ROOK: 5, chess.QUEEN: 9}
CENTER = [chess.D4, chess.E4, chess.D5, chess.E5]


def material_eval(board: chess.Board, color=True):
    """Return the material balance in PIECE_VALUES units.

    Positive means White is ahead when ``color`` is truthy; the sign is
    flipped when ``color`` is falsy.
    """
    balance = 0
    for piece_type, value in PIECE_VALUES.items():
        balance += len(board.pieces(piece_type, chess.WHITE)) * value
        balance -= len(board.pieces(piece_type, chess.BLACK)) * value
    return balance if color else -balance


def mobility(board: chess.Board):
    """Return the number of legal moves for the side to move."""
    return board.legal_moves.count()


def king_safety(board: chess.Board, color=chess.WHITE):
    """Return minus the number of squares next to ``color``'s king that the
    opponent attacks (0 when that king is not on the board)."""
    king_sq = board.king(color)
    if king_sq is None:
        return 0
    # For a king, the squares it attacks are exactly the adjacent squares.
    danger = sum(
        1 for sq in board.attacks(king_sq) if board.is_attacked_by(not color, sq)
    )
    return -danger


def hanging_pieces(board: chess.Board, color=chess.WHITE):
    """Count ``color``'s pieces that are attacked by the opponent and not
    defended by ``color``."""
    return sum(
        1
        for sq in chess.SquareSet(board.occupied_co[color])
        if board.is_attacked_by(not color, sq) and not board.is_attacked_by(color, sq)
    )


def basic_features(board: chess.Board):
    """Build the per-position feature dict (without ``eval_before_cp``)."""
    return {
        "turn_white": 1 if board.turn else 0,
        "mat_cp": material_eval(board),  # + = White
        "mobility": mobility(board),
        "king_safety_w": king_safety(board, chess.WHITE),
        "king_safety_b": king_safety(board, chess.BLACK),
        "hanging_w": hanging_pieces(board, chess.WHITE),
        "hanging_b": hanging_pieces(board, chess.BLACK),
        # 1 when at least one of the four centre squares holds a pawn.
        "center_pawns": int(
            any(
                board.piece_at(sq) and board.piece_at(sq).piece_type == chess.PAWN
                for sq in CENTER
            )
        ),
        "in_check": 1 if board.is_check() else 0,
        "phase": len(board.move_stack),  # current ply
    }


FEATURE_ORDER = [
    "turn_white",
    "mat_cp",
    "mobility",
    "king_safety_w",
    "king_safety_b",
    "hanging_w",
    "hanging_b",
    "center_pawns",
    "in_check",
    "phase",
    "eval_before_cp",
]


# ==============================
# Category labels from the evaluation delta
# ==============================
def delta_to_label(delta_cp: float) -> str:
    """Map an evaluation change (mover's point of view, in cp) to a label.

    ``delta_cp`` is eval_after - eval_before for the side that moved, so a
    negative value means the move lost evaluation.
    """
    drop = -delta_cp  # evaluation lost by the mover (larger = worse)
    if drop < 20:
        return "Best"
    if drop < 60:
        return "Good"
    if drop < 120:
        return "Inaccuracy"
    if drop < 300:
        return "Mistake"
    return "Blunder"


# ==============================
# Dataset generation (quick training on start-up)
# ==============================
def generate_training(engine, games=20, plies_per_game=50, time_per=0.1):
    """Play semi-random games from the start position and label each move.

    At every ply the engine's first PV move is played with probability 0.5,
    otherwise a uniformly random legal move, to get a spread of labels.

    Args:
        engine: an open ``chess.engine.SimpleEngine``.
        games: number of games to generate.
        plies_per_game: maximum plies per game.
        time_per: engine time limit per analysis, in seconds.

    Returns:
        A DataFrame with the FEATURE_ORDER columns plus ``delta_cp``,
        ``label`` and ``played_san``.
    """
    limit = chess.engine.Limit(time=time_per)
    rows = []
    for _ in range(games):
        board = chess.Board()
        info_before = engine.analyse(board, limit)
        for _ in range(plies_per_game):
            if board.is_game_over():
                break
            legal = list(board.legal_moves)
            if not legal:
                break
            eval_before = score_cp(info_before["score"])
            # Engine's preferred move; fall back to the first legal move.
            pv = info_before.get("pv")
            best = pv[0] if pv else legal[0]
            move = best if random.random() < 0.5 else random.choice(legal)

            feats = basic_features(board)
            feats["eval_before_cp"] = eval_before
            mover_is_white = board.turn
            san_played = board.san(move)

            board.push(move)
            info_after = engine.analyse(board, limit)
            eval_after = score_cp(info_after["score"])

            # Evaluations are from White's view; flip for a Black move.
            delta_cp = eval_after - eval_before
            if not mover_is_white:
                delta_cp = -delta_cp

            row = {k: feats[k] for k in FEATURE_ORDER}
            row["delta_cp"] = delta_cp
            row["label"] = delta_to_label(delta_cp)
            row["played_san"] = san_played
            rows.append(row)

            # The position after this move is the position before the next.
            info_before = info_after
    return pd.DataFrame(rows)


def train_model_if_needed():
    """Load the cached classifier, or train and cache a new one.

    Returns:
        A fitted ``RandomForestClassifier``.

    Raises:
        RuntimeError: if Stockfish cannot be started or no training rows
            were produced.
    """
    if os.path.exists(MODEL_PATH):
        # NOTE(review): joblib.load unpickles the file; only safe while
        # MODEL_PATH is written by this application.
        try:
            return load(MODEL_PATH)
        except Exception as exc:  # unreadable cache: fall through and retrain
            print("⚠️ No pude cargar el modelo guardado; reentrenando:", exc)

    # Quick training run.
    eng = load_engine()
    try:
        df = generate_training(eng, games=24, plies_per_game=40, time_per=0.06)
    finally:
        eng.quit()
    if df.empty:
        raise RuntimeError("No se generaron datos de entrenamiento.")

    X = df[FEATURE_ORDER].astype(float).values
    y = df["label"].values
    clf = RandomForestClassifier(
        n_estimators=140,
        max_depth=None,
        random_state=42,
        n_jobs=-1,
        class_weight="balanced",  # simple class balancing
    )
    clf.fit(X, y)
    dump(clf, MODEL_PATH)
    return clf


# ==============================
# Natural-language explanations (simple templates)
# ==============================
_LABEL_TIPS = {
    "Blunder": "Grave caída de evaluación; probablemente táctica inmediata o pieza colgando.",
    "Mistake": "Movimiento impreciso que cede ventaja significativa.",
    "Inaccuracy": "Había alternativas más fuertes según el motor.",
    "Good": "Jugada razonable; mantiene la evaluación.",
}
# Used for "Best" and for any label not listed above.
_DEFAULT_TIP = "Excelente jugada; coincide con la mejor línea del motor."


def explain(label: str, delta_cp: float, in_check: int, hanging_w: int, hanging_b: int) -> str:
    """Build the one-line Spanish explanation shown next to a move."""
    tips = [_LABEL_TIPS.get(label, _DEFAULT_TIP)]
    if in_check:
        tips.append("El rey estaba en jaque o vulnerable.")
    if hanging_w or hanging_b:
        tips.append("Detecto pieza(s) atacadas sin defensa; revisa táctica.")
    tips.append(f"Δ={round(delta_cp,1)} cp.")
    return " ".join(tips)


# ==============================
# Analysis of one PGN game
# ==============================
def analyze_pgn(pgn_text: str, time_per_move=0.2, depth_limit=0):
    """Annotate the first game in ``pgn_text`` that has moves.

    Args:
        pgn_text: PGN text (repaired with ``repair_pgn_text`` first).
        time_per_move: engine time limit per position, in seconds.
        depth_limit: engine depth limit; values <= 0 mean "time only".

    Returns:
        ``(annotated_pgn, summary_markdown, csv_text)`` on success, or
        ``(None, None, message)`` when no game with moves is found.

    Raises:
        RuntimeError: if Stockfish cannot be started.
    """
    handle = StringIO(repair_pgn_text(pgn_text or ""))

    # Take the first game that actually has mainline moves.
    game = chess.pgn.read_game(handle)
    while game is not None and next(iter(game.mainline_moves()), None) is None:
        game = chess.pgn.read_game(handle)
    if game is None:
        return None, None, "No se encontró una partida válida."

    # Load (or train) the model before opening the analysis engine, so that
    # training does not start a second engine while this one is running.
    clf = train_model_if_needed()

    # The UI slider may deliver the depth as a float.
    depth = int(depth_limit) if depth_limit and depth_limit > 0 else None
    limit = chess.engine.Limit(time=time_per_move, depth=depth)

    rows = []
    engine = load_engine()
    try:
        board = game.board()
        info_before = engine.analyse(board, limit)
        for ply, node in enumerate(game.mainline(), start=1):
            move = node.move
            turn_white = board.turn
            eval_b = score_cp(info_before["score"])

            feats = basic_features(board)
            feats["eval_before_cp"] = eval_b
            X = np.array([[feats.get(k, 0.0) for k in FEATURE_ORDER]], dtype=float)
            # NOTE(review): the label is predicted from pre-move features
            # only, so it can disagree with the measured delta.
            pred = str(clf.predict(X)[0])

            # Engine's suggestion: first PV move, or the played move when
            # the engine returned no (or an empty) PV.
            pv = info_before.get("pv") or [move]
            best_san = board.san(pv[0])
            played_san = board.san(move)

            board.push(move)
            info_after = engine.analyse(board, limit)
            eval_a = score_cp(info_after["score"])

            # Evaluations are from White's view; flip for a Black move.
            delta = eval_a - eval_b
            if not turn_white:
                delta = -delta

            text = explain(pred, delta, feats["in_check"], feats["hanging_w"], feats["hanging_b"])

            # Append the annotation to the played move's comment.
            prefix = node.comment + " " if node.comment else ""
            node.comment = prefix + f"[{pred}] Δ={round(delta,1)} | Mejor: {best_san}. {text}"

            rows.append({
                "ply": ply,
                "turn": "White" if turn_white else "Black",
                "played": played_san,
                "best": best_san,
                "eval_before_cp": round(eval_b, 1),
                "eval_after_cp": round(eval_a, 1),
                "delta_cp": round(delta, 1),
                "ml_label": pred,
                "explanation": text,
            })

            # The position after this move is the position before the next.
            info_before = info_after

        exporter = chess.pgn.StringExporter(headers=True, comments=True, variations=False)
        annotated = game.accept(exporter)
    finally:
        engine.quit()

    # Quick summary: the ten largest evaluation drops.
    worst = sorted(rows, key=lambda r: r["delta_cp"])[:10]
    md = [
        "### Principales caídas de evaluación",
        "| Ply | Turno | Jugada | Mejor | Δcp | ML |",
        "|---:|:---:|:---|:---|---:|:---|",
    ]
    for r in worst:
        md.append(
            f"| {r['ply']} | {r['turn']} | {r['played']} | {r.get('best','')} | {r['delta_cp']} | {r['ml_label']} |"
        )
    summary_md = "\n".join(md)
    df = pd.DataFrame(rows)
    return annotated, summary_md, df.to_csv(index=False)


# ==============================
# Gradio interface
# ==============================
EXAMPLE_PGN = (
    "[Event \"Ejemplo\"]\n[Site \"?\"]\n[Date \"2024.??.??\"]\n[Round \"?\"]\n"
    "[White \"Alice\"]\n[Black \"Bob\"]\n[Result \"1-0\"]\n\n"
    "1. e4 e5 2. Nf3 Nc6 3. Bb5 a6 4. Ba4 Nf6 5. O-O Be7 6. Re1 b5 7. Bb3 O-O "
    "8. c3 d5 9. exd5 Nxd5 10. Nxe5 Nxe5 11. Rxe5 c6 12. d4 Bd6 13. Re1 Qh4 "
    "14. g3 Qh3 15. Qf3 Bg4 16. Qg2 Rae8 17. Be3 Qh5 18. Nd2 Bh3 19. Qf3 Bg4 "
    "20. Qg2 f5 21. Bxd5+ cxd5 22. Qxd5+ Kh8 23. Qxd6 f4 24. Bxf4 Bf3 "
    "25. Rxe8 Rxe8 26. Be5 Qh3 27. Nxf3 Qf5 28. Qc6 Rf8 29. Qb7 Qg6 30. Nh4 Qc2 "
    "31. Qxg7# 1-0"
)

with gr.Blocks(title=APP_TITLE) as demo:
    gr.Markdown(f"# {APP_TITLE}\n**Motor + Machine Learning**\n1) Entrenamiento rápido en el arranque (RandomForest).\n2) Analiza tu PGN y comenta jugada por jugada (estilo DecodeChess).\n3) Descarga el PGN anotado y el CSV de jugadas.")
    pgn_in = gr.Textbox(lines=18, label="PGN (pega tu partida aquí)", placeholder="Pega aquí tu PGN...")
    # Bound to the real input box so that clicking the example fills it in.
    eg = gr.Examples(examples=[[EXAMPLE_PGN]], inputs=[pgn_in])
    with gr.Row():
        tpm = gr.Slider(0.05, 0.5, value=0.2, step=0.05, label="Tiempo de análisis por jugada (seg)")
        depth = gr.Slider(0, 30, value=0, step=1, label="Profundidad máxima (0 = solo por tiempo)")
    run = gr.Button("Analizar con IA")
    annotated_out = gr.Textbox(lines=12, label="PGN anotado por IA")
    summary_out = gr.Markdown(label="Resumen IA")
    files_out = gr.Files(label="Descargas (PGN + CSV)")

    def _analyze(pgn_text, t, d):
        """Click handler: run the analysis and write the downloadable files.

        Returns ``(annotated_text, summary_markdown, file_paths_or_None)``.
        """
        try:
            annotated, summary_md, csv_txt = analyze_pgn(pgn_text, time_per_move=t, depth_limit=d)
            if annotated is None:
                # No usable game: the third element carries the message.
                return csv_txt, "", None
            # One directory per request so concurrent runs cannot overwrite
            # each other's downloads.
            out_dir = tempfile.mkdtemp(prefix="decodechess_")
            files = []
            for name, content in (("anotado.pgn", annotated), ("jugadas.csv", csv_txt)):
                if content:
                    path = os.path.join(out_dir, name)
                    with open(path, "w", encoding="utf-8") as fh:
                        fh.write(content)
                    files.append(path)
            return annotated, summary_md, files
        except Exception as e:  # UI boundary: show the error instead of crashing
            tb = traceback.format_exc(limit=2)
            return f"Error: {e}\n{tb}", "", None

    run.click(_analyze, inputs=[pgn_in, tpm, depth], outputs=[annotated_out, summary_out, files_out])

if __name__ == "__main__":
    # Train on start-up (only when no cached model exists).
    try:
        _ = train_model_if_needed()
    except Exception as e:
        print("⚠️ Entrenamiento en arranque falló:", e)
    demo.launch()