Spaces:
Configuration error
Configuration error
| import os | |
| import io | |
| import re | |
| import math | |
| import time | |
| import random | |
| import traceback | |
| from io import StringIO | |
| import gradio as gr | |
| import numpy as np | |
| import pandas as pd | |
| from joblib import dump, load | |
| from sklearn.ensemble import RandomForestClassifier | |
| import chess, chess.pgn, chess.engine | |
# Title used for the Gradio page and its top-level heading.
APP_TITLE = "DecodeChess‑IA — Doctor Linux"

# Stockfish commands tried in order: the bare command name first, then two
# absolute paths.
ENGINE_CANDIDATES = [
    "stockfish",
    "/usr/bin/stockfish",
    "/usr/games/stockfish",
]
| # ============================== | |
| # Utilidades de Motor | |
| # ============================== | |
def load_engine():
    """Start the first Stockfish candidate that launches successfully.

    The entries of ``ENGINE_CANDIDATES`` are tried in order.

    Returns:
        The ``chess.engine.SimpleEngine`` opened for the first candidate
        that starts.

    Raises:
        RuntimeError: If no candidate could be started; the message carries
            the last error that was seen.
    """
    failure = None
    for candidate in ENGINE_CANDIDATES:
        try:
            return chess.engine.SimpleEngine.popen_uci(candidate)
        except Exception as exc:
            # Remember why this candidate failed and move on to the next one.
            failure = exc
    raise RuntimeError(f"No pude iniciar Stockfish. Último error: {failure}")
def score_cp(score: chess.engine.PovScore) -> float:
    """Return the evaluation in centipawns from White's point of view.

    Positive values favour White. Forced mates are flattened to a fixed
    +/-100000 regardless of their distance.

    Args:
        score: Engine score with point-of-view information.

    Returns:
        The centipawn value as a float, or +/-100000.0 for mate scores.
    """
    white = score.white()
    mate_cp = 100000
    if white.is_mate():
        # Take the sign from score(), not from mate(): both "mate delivered"
        # (MateGiven) and "side to move is mated" (Mate(0)) report
        # mate() == 0, so testing ``mate() > 0`` scored a mate that White had
        # just delivered as -100000.
        return float(mate_cp) if white.score(mate_score=mate_cp) > 0 else -float(mate_cp)
    return float(white.score(mate_score=mate_cp))
| # ============================== | |
| # Reparador PGN simple | |
| # ============================== | |
def repair_pgn_text(text: str) -> str:
    """Apply a few cheap repairs to raw PGN text before parsing.

    Steps, in order:
      1. Normalise ``\\r\\n`` and bare ``\\r`` line endings to ``\\n``.
      2. Prepend a placeholder seven-tag header when no ``[Event `` tag is
         present.
      3. Fix a handful of known misreadings (``[Ulnite`` for ``[White`` and
         letter look-alikes inside the ``Result`` tag).

    Returns:
        The repaired text, stripped of surrounding whitespace and terminated
        by exactly one newline.
    """
    repaired = text.replace("\r\n", "\n").replace("\r", "\n")

    # Without the minimal headers, add placeholder ones followed by the blank
    # line that separates the tag section from the movetext.
    if "[Event " not in repaired:
        placeholder_header = (
            '[Event "?"]\n'
            '[Site "?"]\n'
            '[Date "????.??.??"]\n'
            '[Round "?"]\n'
            '[White "?"]\n'
            '[Black "?"]\n'
            '[Result "*"]\n'
            "\n"
        )
        repaired = placeholder_header + repaired

    # Literal (wrong, right) substitutions; they do not overlap, so the order
    # in which they are applied does not matter.
    literal_fixes = (
        ('[Ulnite', '[White'),
        ('[Result "I-0"]', '[Result "1-0"]'),
        ('[Result "O-I"]', '[Result "0-1"]'),
        ('[Result "I/2-I/2"]', '[Result "1/2-1/2"]'),
    )
    for wrong, right in literal_fixes:
        repaired = repaired.replace(wrong, right)

    return repaired.strip() + "\n"
| # ============================== | |
| # Features de posición | |
| # ============================== | |
# Material value per piece type, in pawn units (the king has no entry).
PIECE_VALUES = {
    chess.PAWN: 1,
    chess.KNIGHT: 3,
    chess.BISHOP: 3,
    chess.ROOK: 5,
    chess.QUEEN: 9,
}

# The four central squares.
CENTER = [
    chess.D4,
    chess.E4,
    chess.D5,
    chess.E5,
]
def material_eval(board: chess.Board, color=True):
    """Return the material balance of ``board`` in ``PIECE_VALUES`` units.

    Args:
        board: Position to evaluate.
        color: Perspective of the result. Truthy gives White minus Black,
            falsy gives the negated value.

    Returns:
        The integer material difference from the requested perspective.
    """
    white_minus_black = sum(
        value
        * (
            len(board.pieces(piece_type, chess.WHITE))
            - len(board.pieces(piece_type, chess.BLACK))
        )
        for piece_type, value in PIECE_VALUES.items()
    )
    if color:
        return white_minus_black
    return -white_minus_black
def mobility(board: chess.Board):
    """Return the number of legal moves available in ``board``."""
    legal = board.legal_moves
    return legal.count()
def king_safety(board: chess.Board, color=chess.WHITE):
    """Score king safety as minus the number of attacked neighbouring squares.

    Args:
        board: Position to inspect.
        color: Side whose king is evaluated.

    Returns:
        ``0`` when ``color`` has no king on the board, otherwise the negated
        count of squares adjacent to the king that the opponent attacks
        (so ``0`` is safest and more negative is more exposed).
    """
    king_sq = board.king(color)
    if king_sq is None:
        return 0
    # python-chess has no ``SquareSet.ring``; the precomputed king-attack
    # bitboard for a square is exactly the set of squares adjacent to it.
    neighbours = chess.SquareSet(chess.BB_KING_ATTACKS[king_sq])
    danger = sum(1 for sq in neighbours if board.is_attacked_by(not color, sq))
    return -danger
def hanging_pieces(board: chess.Board, color=chess.WHITE):
    """Count ``color``'s occupied squares that are attacked but not defended.

    A square counts when the opponent attacks it and no piece of ``color``
    attacks it.
    """
    opponent = not color
    own_squares = chess.SquareSet(board.occupied_co[color])
    return sum(
        1
        for square in own_squares
        if board.is_attacked_by(opponent, square)
        and not board.is_attacked_by(color, square)
    )
def basic_features(board: chess.Board):
    """Build the dictionary of hand-crafted features for ``board``.

    Returns:
        A dict with these keys, in this order: ``turn_white``, ``mat_cp``
        (material balance, positive = White), ``mobility``,
        ``king_safety_w``, ``king_safety_b``, ``hanging_w``, ``hanging_b``,
        ``center_pawns`` (1 if any square in ``CENTER`` holds a pawn),
        ``in_check`` and ``phase`` (current ply, taken from the move stack).
    """

    def _holds_pawn(square):
        occupant = board.piece_at(square)
        return occupant is not None and occupant.piece_type == chess.PAWN

    features = {}
    features["turn_white"] = int(board.turn)
    features["mat_cp"] = material_eval(board)  # positive = White ahead
    features["mobility"] = mobility(board)
    features["king_safety_w"] = king_safety(board, chess.WHITE)
    features["king_safety_b"] = king_safety(board, chess.BLACK)
    features["hanging_w"] = hanging_pieces(board, chess.WHITE)
    features["hanging_b"] = hanging_pieces(board, chess.BLACK)
    features["center_pawns"] = int(any(_holds_pawn(sq) for sq in CENTER))
    features["in_check"] = int(board.is_check())
    features["phase"] = len(board.move_stack)  # current ply
    return features
# Column order of the feature vector given to the classifier; the same order
# is used for training and for prediction.
FEATURE_ORDER = [
    "turn_white",
    "mat_cp",
    "mobility",
    "king_safety_w",
    "king_safety_b",
    "hanging_w",
    "hanging_b",
    "center_pawns",
    "in_check",
    "phase",
    "eval_before_cp",
]
| # ============================== | |
| # Etiquetado de categorías por delta de evaluación | |
| # ============================== | |
def delta_to_label(delta_cp: float) -> str:
    """Map an evaluation change to a move-quality label.

    Args:
        delta_cp: Evaluation after the move minus evaluation before it, both
            from the point of view of the side that moved (negative means
            the move lost ground).

    Returns:
        ``"Best"``, ``"Good"``, ``"Inaccuracy"``, ``"Mistake"`` or
        ``"Blunder"``, chosen by how many centipawns were lost.
    """
    lost_cp = -delta_cp  # positive = the mover's evaluation got worse
    # (exclusive upper bound on the loss, label), checked from mildest up.
    bands = (
        (20, "Best"),
        (60, "Good"),
        (120, "Inaccuracy"),
        (300, "Mistake"),
    )
    for upper_bound, label in bands:
        if lost_cp < upper_bound:
            return label
    return "Blunder"
| # ============================== | |
| # Generación de dataset (entrenamiento rápido on‑startup) | |
| # ============================== | |
def generate_training(engine, games=20, plies_per_game=50, time_per=0.1):
    """Generate a labelled training set from engine-evaluated self-play.

    Each game starts from the initial position. On every ply the position is
    evaluated, a move is chosen (the engine's first PV move half of the time,
    a uniformly random legal move otherwise), and the resulting position is
    evaluated again. The evaluation change, seen from the mover's side, gives
    the label.

    Args:
        engine: Engine object exposing ``analyse(board, limit)``.
        games: Number of games to play.
        plies_per_game: Maximum plies per game (a game stops early when over).
        time_per: Seconds of engine time for each evaluation.

    Returns:
        A ``pandas.DataFrame`` with one row per played ply: the columns of
        ``FEATURE_ORDER`` plus ``delta_cp``, ``label`` and ``played_san``.
    """
    limit = chess.engine.Limit(time=time_per)
    samples = []

    for _ in range(games):
        board = chess.Board()
        for _ in range(plies_per_game):
            if board.is_game_over():
                break

            # Evaluation before the move, always from White's side.
            analysis_before = engine.analyse(board, limit)
            eval_before = score_cp(analysis_before["score"].pov(chess.WHITE))

            candidates = list(board.legal_moves)
            if not candidates:
                break

            # Engine's preferred move; fall back to the first legal move when
            # the engine reported no principal variation.
            principal = analysis_before.get("pv")
            best = principal[0] if principal else candidates[0]

            # Mix best moves with random ones so the labels vary.
            move = best if random.random() < 0.5 else random.choice(candidates)

            snapshot = board.copy()
            feats = basic_features(snapshot)
            feats["eval_before_cp"] = eval_before
            san_played = board.san(move) if move else None

            board.push(move)
            analysis_after = engine.analyse(board, limit)
            eval_after = score_cp(analysis_after["score"].pov(chess.WHITE))

            # Express the change from the point of view of the side that moved.
            change = eval_after - eval_before
            delta_cp = change if snapshot.turn else -change

            sample = {name: feats[name] for name in FEATURE_ORDER if name in feats}
            sample.update(
                delta_cp=delta_cp,
                label=delta_to_label(delta_cp),
                played_san=san_played,
            )
            samples.append(sample)

    return pd.DataFrame(samples)
def train_model_if_needed():
    """Return the move-quality classifier, training it when no usable cache exists.

    A model cached in ``model_rf.joblib`` is loaded and returned when
    possible. Otherwise a fresh ``RandomForestClassifier`` is trained on data
    from ``generate_training``, saved to that file and returned.

    Returns:
        The loaded or newly trained classifier.

    Raises:
        RuntimeError: If Stockfish cannot be started (from ``load_engine``)
            or if data generation produced no rows to train on.
    """
    model_path = "model_rf.joblib"
    if os.path.exists(model_path):
        try:
            # NOTE: joblib.load unpickles the file; only use with a cache
            # file this application wrote itself.
            return load(model_path)
        except Exception as exc:
            # The cache is best-effort: report why it was unusable instead of
            # hiding the failure, then fall through and retrain.
            print(f"⚠️ No pude cargar {model_path}; se reentrena el modelo. Error: {exc}")

    # Quick training run; the engine is always shut down afterwards.
    eng = load_engine()
    try:
        df = generate_training(eng, games=24, plies_per_game=40, time_per=0.06)
    finally:
        eng.quit()

    if df.empty:
        # Without this guard the column selection below fails with an opaque
        # KeyError on a frame that has no columns.
        raise RuntimeError("El entrenamiento no generó datos; no se puede ajustar el modelo.")

    X = df[FEATURE_ORDER].astype(float).values
    y = df["label"].values
    # class_weight="balanced" compensates for uneven label frequencies.
    clf = RandomForestClassifier(
        n_estimators=140,
        max_depth=None,
        random_state=42,
        n_jobs=-1,
        class_weight="balanced",
    )
    clf.fit(X, y)
    dump(clf, model_path)
    return clf
| # ============================== | |
| # Explicaciones naturales (plantillas simples) | |
| # ============================== | |
def explain(label: str, delta_cp: float, in_check: int, hanging_w: int, hanging_b: int) -> str:
    """Compose a short natural-language comment for one move.

    Args:
        label: Move-quality label; anything other than ``"Blunder"``,
            ``"Mistake"``, ``"Inaccuracy"`` or ``"Good"`` gets the
            "excellent move" sentence.
        delta_cp: Evaluation change in centipawns, shown rounded to one
            decimal.
        in_check: Truthy when the side to move was in check.
        hanging_w: Number of undefended attacked White pieces.
        hanging_b: Number of undefended attacked Black pieces.

    Returns:
        The sentences joined by single spaces.
    """
    by_label = {
        "Blunder": "Grave caída de evaluación; probablemente táctica inmediata o pieza colgando.",
        "Mistake": "Movimiento impreciso que cede ventaja significativa.",
        "Inaccuracy": "Había alternativas más fuertes según el motor.",
        "Good": "Jugada razonable; mantiene la evaluación.",
    }
    fallback = "Excelente jugada; coincide con la mejor línea del motor."

    sentences = [by_label.get(label, fallback)]
    if in_check:
        sentences.append("El rey estaba en jaque o vulnerable.")
    if hanging_w or hanging_b:
        sentences.append("Detecto pieza(s) atacadas sin defensa; revisa táctica.")
    sentences.append(f"Δ={round(delta_cp,1)} cp.")
    return " ".join(sentences)
| # ============================== | |
| # Análisis de una partida PGN | |
| # ============================== | |
def analyze_pgn(pgn_text: str, time_per_move=0.2, depth_limit=0):
    """Analyse the first game with moves in ``pgn_text`` and annotate it.

    For every mainline move the position is evaluated before and after the
    move, a label is predicted by the classifier from the pre-move features,
    and a comment is attached to the move in the game tree.

    Args:
        pgn_text: Raw PGN; it is passed through ``repair_pgn_text`` first.
        time_per_move: Engine time, in seconds, for each evaluation.
        depth_limit: Maximum search depth; ``0`` (or less, or ``None``) means
            the search is limited by time only.

    Returns:
        ``(annotated_pgn, summary_markdown, csv_text)`` on success, or
        ``(None, None, message)`` when no game with moves was found.

    Raises:
        RuntimeError: If Stockfish cannot be started.
    """
    pgn_text = repair_pgn_text(pgn_text)
    f = StringIO(pgn_text)

    # Take the first game that actually contains moves.
    game = chess.pgn.read_game(f)
    while game is not None and sum(1 for _ in game.mainline_moves()) == 0:
        game = chess.pgn.read_game(f)
    if game is None:
        return None, None, "No se encontró una partida válida."

    # The depth may arrive as a float (e.g. from a UI slider); the engine
    # limit is given an integer. The same limit is used for every evaluation.
    depth = int(depth_limit) if depth_limit and depth_limit > 0 else None
    limit = chess.engine.Limit(time=time_per_move, depth=depth)

    engine = load_engine()
    try:
        # Load the model (training it when no cached one exists).
        clf = train_model_if_needed()
        board = game.board()
        rows = []
        exporter = chess.pgn.StringExporter(headers=True, comments=True, variations=False)
        node = game
        ply = 0
        while node.variations:
            move = node.variation(0).move
            turn_white = board.turn

            # Evaluation and features before the move.
            info_b = engine.analyse(board, limit)
            eval_b = score_cp(info_b["score"].pov(chess.WHITE))
            feats = basic_features(board)
            feats["eval_before_cp"] = eval_b
            X = np.array([[feats.get(k, 0.0) for k in FEATURE_ORDER]], dtype=float)
            pred = clf.predict(X)[0]

            # Engine's suggestion: first PV move. ``or`` also covers a PV
            # that is present but empty, which previously raised IndexError.
            pv = info_b.get("pv") or [move]
            best_move = pv[0]
            best_san = board.san(best_move) if best_move else None
            played_san = board.san(move)

            board.push(move)
            info_a = engine.analyse(board, limit)
            eval_a = score_cp(info_a["score"].pov(chess.WHITE))

            # Evaluation change from the point of view of the side that moved.
            delta = (eval_a - eval_b) if turn_white else -(eval_a - eval_b)
            text = explain(pred, delta, feats["in_check"], feats["hanging_w"], feats["hanging_b"])

            # Attach the comment to the node of the move just played.
            node = node.variation(0)
            node.comment = (node.comment + " " if node.comment else "") + f"[{pred}] Δ={round(delta,1)} | Mejor: {best_san}. {text}"
            ply += 1
            rows.append({
                "ply": ply,
                "turn": "White" if turn_white else "Black",
                "played": played_san,
                "best": best_san,
                "eval_before_cp": round(eval_b, 1),
                "eval_after_cp": round(eval_a, 1),
                "delta_cp": round(delta, 1),
                "ml_label": pred,
                "explanation": text,
            })
        annotated = game.accept(exporter)
    finally:
        engine.quit()

    # Quick summary: the ten largest evaluation drops.
    worst = sorted(rows, key=lambda r: r["delta_cp"])[:10]
    md = ["### Principales caídas de evaluación",
          "| Ply | Turno | Jugada | Mejor | Δcp | ML |",
          "|---:|:---:|:---|:---|---:|:---|"]
    for r in worst:
        md.append(f"| {r['ply']} | {r['turn']} | {r['played']} | {r.get('best','')} | {r['delta_cp']} | {r['ml_label']} |")
    summary_md = "\n".join(md)
    df = pd.DataFrame(rows)
    return annotated, summary_md, df.to_csv(index=False)
| # ============================== | |
| # Interfaz Gradio | |
| # ============================== | |
# Gradio UI: PGN input, analysis settings, and the annotated outputs.
with gr.Blocks(title=APP_TITLE) as demo:
    gr.Markdown(f"# {APP_TITLE}\n**Motor + Machine Learning**\n1) Entrenamiento rápido en el arranque (RandomForest).\n2) Analiza tu PGN y comenta jugada por jugada (estilo DecodeChess).\n3) Descarga el PGN anotado y el CSV de jugadas.")
    pgn_in = gr.Textbox(lines=18, label="PGN (pega tu partida aquí)", placeholder="Pega aquí tu PGN...")
    # The examples are bound to the visible PGN box so that clicking one
    # pre-loads it; previously they filled a hidden throw-away textbox and
    # had no visible effect. The input box is therefore created first.
    eg = gr.Examples(
        examples=[
            ["[Event \"Ejemplo\"]\n[Site \"?\"]\n[Date \"2024.??.??\"]\n[Round \"?\"]\n[White \"Alice\"]\n[Black \"Bob\"]\n[Result \"1-0\"]\n\n1. e4 e5 2. Nf3 Nc6 3. Bb5 a6 4. Ba4 Nf6 5. O-O Be7 6. Re1 b5 7. Bb3 O-O 8. c3 d5 9. exd5 Nxd5 10. Nxe5 Nxe5 11. Rxe5 c6 12. d4 Bd6 13. Re1 Qh4 14. g3 Qh3 15. Qf3 Bg4 16. Qg2 Rae8 17. Be3 Qh5 18. Nd2 Bh3 19. Qf3 Bg4 20. Qg2 f5 21. Bxd5+ cxd5 22. Qxd5+ Kh8 23. Qxd6 f4 24. Bxf4 Bf3 25. Rxe8 Rxe8 26. Be5 Qh3 27. Nxf3 Qf5 28. Qc6 Rf8 29. Qb7 Qg6 30. Nh4 Qc2 31. Qxg7# 1-0"],
        ],
        inputs=[pgn_in],
    )
    with gr.Row():
        tpm = gr.Slider(0.05, 0.5, value=0.2, step=0.05, label="Tiempo de análisis por jugada (seg)")
        depth = gr.Slider(0, 30, value=0, step=1, label="Profundidad máxima (0 = solo por tiempo)")
    run = gr.Button("Analizar con IA")
    annotated_out = gr.Textbox(lines=12, label="PGN anotado por IA")
    summary_out = gr.Markdown(label="Resumen IA")
    files_out = gr.Files(label="Descargas (PGN + CSV)")

    def _analyze(pgn_text, t, d):
        """Run the analysis and return (annotated PGN, summary, download files).

        Any failure is shown in the annotated-PGN box instead of being raised.
        """
        try:
            annotated, summary_md, csv_txt = analyze_pgn(pgn_text, time_per_move=t, depth_limit=d)
            if annotated is None:
                # analyze_pgn reports "no game found" as (None, None, message):
                # show the message rather than saving it as a CSV download.
                return csv_txt, "", None
            files = []
            for filename, content in (("anotado.pgn", annotated), ("jugadas.csv", csv_txt)):
                if content:
                    # A context manager closes the handle before the path is
                    # handed to Gradio.
                    with open(filename, "w", encoding="utf-8") as fh:
                        fh.write(content)
                    files.append(filename)
            return annotated, summary_md, files
        except Exception as e:
            tb = traceback.format_exc(limit=2)
            # None leaves the file list empty; an empty string is not a
            # valid file value.
            return f"Error: {e}\n{tb}", "", None

    run.click(_analyze, inputs=[pgn_in, tpm, depth], outputs=[annotated_out, summary_out, files_out])
if __name__ == "__main__":
    # Train (or load) the model once at start-up so the first analysis does
    # not pay for it; a failure here is reported but does not stop the app.
    try:
        train_model_if_needed()
    except Exception as exc:
        print("⚠️ Entrenamiento en arranque falló:", exc)
    demo.launch()