# ml / app.py
# doctorlinux's picture
# Upload 5 files
# 8a71b3f verified
import os
import io
import re
import math
import time
import random
import traceback
from io import StringIO
import gradio as gr
import numpy as np
import pandas as pd
from joblib import dump, load
from sklearn.ensemble import RandomForestClassifier
import chess, chess.pgn, chess.engine
# Title used for the Gradio page and its top heading.
APP_TITLE = "DecodeChess‑IA — Doctor Linux"

# Stockfish executables to try, in order.
ENGINE_CANDIDATES = [
    "stockfish",
    "/usr/bin/stockfish",
    "/usr/games/stockfish",
]
# ==============================
# Utilidades de Motor
# ==============================
def load_engine():
    """Open a UCI engine using the first entry of ENGINE_CANDIDATES that starts.

    Returns:
        The opened ``chess.engine.SimpleEngine``.

    Raises:
        RuntimeError: If no candidate could be started; the message includes
            the last error that was seen.
    """
    last_error = None
    for candidate in ENGINE_CANDIDATES:
        try:
            return chess.engine.SimpleEngine.popen_uci(candidate)
        except Exception as exc:
            # Remember why this candidate failed and move on to the next one.
            last_error = exc
    raise RuntimeError(f"No pude iniciar Stockfish. Último error: {last_error}")
def score_cp(score: "chess.engine.PovScore | chess.engine.Score") -> float:
    """Return a centipawn value from White's point of view (+ = White better).

    Args:
        score: Either a ``PovScore`` (as found in ``info["score"]``) or a
            plain ``Score`` that is already relative to White (the result of
            ``PovScore.pov(chess.WHITE)`` / ``PovScore.white()``).

    Returns:
        The centipawn evaluation as a float. Any forced mate is clamped to
        exactly +100000.0 (White mates) or -100000.0 (Black mates).
    """
    # A plain Score has no .white(); only PovScore does. Accept both so that
    # callers may pass either form without raising AttributeError.
    white = score.white() if hasattr(score, "white") else score
    cp = white.score(mate_score=100000)
    if white.is_mate():
        # Use the sign of the converted score rather than the mate distance:
        # a delivered mate has distance 0 for both the winner and the loser,
        # so the distance alone cannot tell them apart.
        return 100000.0 if cp > 0 else -100000.0
    return float(cp)
# ==============================
# Reparador PGN simple
# ==============================
def repair_pgn_text(text: str) -> str:
    """Normalise a PGN string so that it is easier to parse.

    Line endings are converted to ``\\n``, a placeholder header block is
    prepended when no ``[Event `` tag is present, and a few known header
    corruptions (``[Ulnite``, results written with the letters I/O) are fixed.

    Args:
        text: Raw PGN text.

    Returns:
        The repaired text, stripped and terminated by a single newline.
    """
    fixed = text.replace("\r\n", "\n").replace("\r", "\n")

    # Without at least an Event tag, prepend a full placeholder header block
    # followed by the blank line that separates headers from movetext.
    if "[Event " not in fixed:
        placeholder_headers = (
            '[Event "?"]\n'
            '[Site "?"]\n'
            '[Date "????.??.??"]\n'
            '[Round "?"]\n'
            '[White "?"]\n'
            '[Black "?"]\n'
            '[Result "*"]\n'
            "\n"
        )
        fixed = placeholder_headers + fixed

    # Minor repairs of mangled tags.
    fixed = re.sub(r'\[Ulnite', '[White', fixed)
    result_fixes = (
        ('[Result "I-0"]', '[Result "1-0"]'),
        ('[Result "O-I"]', '[Result "0-1"]'),
        ('[Result "I/2-I/2"]', '[Result "1/2-1/2"]'),
    )
    for broken, correct in result_fixes:
        fixed = fixed.replace(broken, correct)

    return fixed.strip() + "\n"
# ==============================
# Features de posición
# ==============================
# Material weight per piece type; kings are not part of the table.
PIECE_VALUES = {
    chess.PAWN: 1,
    chess.KNIGHT: 3,
    chess.BISHOP: 3,
    chess.ROOK: 5,
    chess.QUEEN: 9,
}

# The four central squares.
CENTER = [
    chess.D4,
    chess.E4,
    chess.D5,
    chess.E5,
]
def material_eval(board: chess.Board, color=True):
    """Return the material balance weighted by PIECE_VALUES.

    Args:
        board: Position to evaluate.
        color: When truthy the balance is White minus Black; otherwise the
            sign is flipped.

    Returns:
        The weighted piece-count difference as an integer.
    """
    white_minus_black = sum(
        weight
        * (
            len(board.pieces(piece_type, chess.WHITE))
            - len(board.pieces(piece_type, chess.BLACK))
        )
        for piece_type, weight in PIECE_VALUES.items()
    )
    if color:
        return white_minus_black
    return -white_minus_black
def mobility(board: chess.Board):
    """Return the number of legal moves in the given position."""
    legal = board.legal_moves
    return legal.count()
def king_safety(board: chess.Board, color=chess.WHITE):
    """Score king safety as minus the number of attacked squares next to the king.

    Args:
        board: Position to inspect.
        color: Side whose king is evaluated.

    Returns:
        0 when that side has no king on the board; otherwise a value <= 0
        whose magnitude is the count of squares adjacent to the king that
        are attacked by the opponent.
    """
    king_sq = board.king(color)
    if king_sq is None:
        return 0
    # Board.attacks() on the king's square yields exactly the adjacent
    # squares (a king attacks one step in every direction); python-chess has
    # no SquareSet.ring helper.
    danger = sum(
        1 for sq in board.attacks(king_sq) if board.is_attacked_by(not color, sq)
    )
    return -danger
def hanging_pieces(board: chess.Board, color=chess.WHITE):
    """Count pieces of ``color`` that are attacked and not defended.

    A piece counts when its square is attacked by the opponent and is not
    attacked by its own side.
    """
    opponent = not color
    own_squares = chess.SquareSet(board.occupied_co[color])
    return sum(
        1
        for sq in own_squares
        if board.is_attacked_by(opponent, sq) and not board.is_attacked_by(color, sq)
    )
def basic_features(board: chess.Board):
    """Build the position feature dictionary for ``board``.

    Returns:
        A dict with the keys ``turn_white``, ``mat_cp``, ``mobility``,
        ``king_safety_w``, ``king_safety_b``, ``hanging_w``, ``hanging_b``,
        ``center_pawns``, ``in_check`` and ``phase``.
    """
    # 1 when at least one of the CENTER squares holds a pawn of either colour.
    pawn_in_center = any(
        board.piece_type_at(sq) == chess.PAWN for sq in CENTER
    )
    return {
        "turn_white": int(bool(board.turn)),
        "mat_cp": material_eval(board),  # positive favours White
        "mobility": mobility(board),
        "king_safety_w": king_safety(board, chess.WHITE),
        "king_safety_b": king_safety(board, chess.BLACK),
        "hanging_w": hanging_pieces(board, chess.WHITE),
        "hanging_b": hanging_pieces(board, chess.BLACK),
        "center_pawns": int(pawn_in_center),
        "in_check": int(bool(board.is_check())),
        "phase": len(board.move_stack),  # number of plies on the move stack
    }
# Column order of the feature vector used for both training and prediction.
FEATURE_ORDER = [
    "turn_white",
    "mat_cp",
    "mobility",
    "king_safety_w",
    "king_safety_b",
    "hanging_w",
    "hanging_b",
    "center_pawns",
    "in_check",
    "phase",
    "eval_before_cp",
]
# ==============================
# Etiquetado de categorías por delta de evaluación
# ==============================
def delta_to_label(delta_cp: float) -> str:
    """Map an evaluation change to a move-quality label.

    Args:
        delta_cp: Evaluation after the move minus evaluation before it, from
            the mover's point of view (negative = the move lost ground).

    Returns:
        ``"Best"``, ``"Good"``, ``"Inaccuracy"``, ``"Mistake"`` or
        ``"Blunder"``, chosen by how many centipawns were lost.
    """
    loss = -delta_cp  # centipawns given up by the move
    thresholds = (
        (20, "Best"),
        (60, "Good"),
        (120, "Inaccuracy"),
        (300, "Mistake"),
    )
    for upper_bound, name in thresholds:
        if loss < upper_bound:
            return name
    return "Blunder"
# ==============================
# Generación de dataset (entrenamiento rápido on‑startup)
# ==============================
def generate_training(engine, games=20, plies_per_game=50, time_per=0.1):
    """Self-play a few games and label every move by its evaluation change.

    In each position the engine is asked for an evaluation and a principal
    variation; the move actually played is the engine's first PV move half of
    the time and a uniformly random legal move otherwise, so that the labels
    vary.

    Args:
        engine: An object exposing ``analyse(board, limit)`` (an opened UCI
            engine).
        games: Number of games to generate.
        plies_per_game: Maximum number of half-moves per game.
        time_per: Engine time per analysis call, in seconds.

    Returns:
        A ``pandas.DataFrame`` with one row per played move holding the
        FEATURE_ORDER columns plus ``delta_cp``, ``label`` and ``played_san``.
    """
    limit = chess.engine.Limit(time=time_per)
    rows = []
    for _ in range(games):
        board = chess.Board()
        for _ in range(plies_per_game):
            if board.is_game_over():
                break
            legal = list(board.legal_moves)
            if not legal:
                break

            # Evaluation before the move. info["score"] is a PovScore, which
            # is what score_cp expects; converting it with .pov() first would
            # hand score_cp a plain Score that has no .white().
            info_before = engine.analyse(board, limit)
            eval_before = score_cp(info_before["score"])

            # Engine's preferred move, falling back to the first legal move
            # when no principal variation was reported.
            pv = info_before.get("pv")
            best = pv[0] if pv else legal[0]

            # 50% engine move, 50% random legal move.
            move = best if random.random() < 0.5 else random.choice(legal)

            # Everything that depends on the pre-move position must be read
            # before the move is pushed.
            white_moved = board.turn
            feats = basic_features(board)
            feats["eval_before_cp"] = eval_before
            san_played = board.san(move)

            board.push(move)
            info_after = engine.analyse(board, limit)
            eval_after = score_cp(info_after["score"])

            # Both evaluations are White-relative; flip the sign for Black so
            # the delta is from the mover's point of view.
            delta_cp = eval_after - eval_before
            if not white_moved:
                delta_cp = -delta_cp

            row = {key: feats[key] for key in FEATURE_ORDER if key in feats}
            row["eval_before_cp"] = eval_before
            row["delta_cp"] = delta_cp
            row["label"] = delta_to_label(delta_cp)
            row["played_san"] = san_played
            rows.append(row)
    return pd.DataFrame(rows)
def train_model_if_needed():
    """Return the move-quality classifier, training and caching it if needed.

    A model stored at ``model_rf.joblib`` is loaded when present. If it is
    missing or cannot be loaded, a fresh training set is generated with the
    engine, a RandomForest is fitted on it and the result is saved to that
    path.

    Returns:
        The loaded or newly fitted classifier.

    Raises:
        RuntimeError: If the engine cannot be started or no training rows
            were produced.
    """
    model_path = "model_rf.joblib"
    if os.path.exists(model_path):
        try:
            # NOTE(security): joblib.load unpickles the file, which can run
            # arbitrary code. model_path must only ever contain a file
            # written by this function.
            return load(model_path)
        except Exception as exc:
            # A corrupt or incompatible cache is not fatal: report it and
            # retrain instead of hiding the failure.
            print("⚠️ No pude cargar el modelo guardado; reentrenando:", exc)

    # Quick training run.
    eng = load_engine()
    try:
        df = generate_training(eng, games=24, plies_per_game=40, time_per=0.06)
    finally:
        eng.quit()

    if df.empty:
        # An empty frame has no feature columns; fail with a clear message
        # rather than a KeyError from the column selection below.
        raise RuntimeError("No se generaron datos de entrenamiento.")

    X = df[FEATURE_ORDER].astype(float).values
    y = df["label"].values
    # class_weight="balanced" compensates for uneven label frequencies.
    clf = RandomForestClassifier(
        n_estimators=140,
        max_depth=None,
        random_state=42,
        n_jobs=-1,
        class_weight="balanced",
    )
    clf.fit(X, y)
    dump(clf, model_path)
    return clf
# ==============================
# Explicaciones naturales (plantillas simples)
# ==============================
def explain(label: str, delta_cp: float, in_check: int, hanging_w: int, hanging_b: int) -> str:
    """Compose a short natural-language comment for a move.

    Args:
        label: Move-quality label; anything other than ``"Blunder"``,
            ``"Mistake"``, ``"Inaccuracy"`` or ``"Good"`` gets the "excellent
            move" sentence.
        delta_cp: Evaluation change, appended rounded to one decimal.
        in_check: Truthy to add the sentence about the king.
        hanging_w: Truthy (together with ``hanging_b``, either one suffices)
            to add the sentence about undefended pieces.
        hanging_b: See ``hanging_w``.

    Returns:
        The sentences joined by single spaces.
    """
    headline_by_label = {
        "Blunder": "Grave caída de evaluación; probablemente táctica inmediata o pieza colgando.",
        "Mistake": "Movimiento impreciso que cede ventaja significativa.",
        "Inaccuracy": "Había alternativas más fuertes según el motor.",
        "Good": "Jugada razonable; mantiene la evaluación.",
    }
    fallback = "Excelente jugada; coincide con la mejor línea del motor."

    sentences = [headline_by_label.get(label, fallback)]
    if in_check:
        sentences.append("El rey estaba en jaque o vulnerable.")
    if hanging_w or hanging_b:
        sentences.append("Detecto pieza(s) atacadas sin defensa; revisa táctica.")
    sentences.append(f"Δ={round(delta_cp,1)} cp.")
    return " ".join(sentences)
# ==============================
# Análisis de una partida PGN
# ==============================
def analyze_pgn(pgn_text: str, time_per_move=0.2, depth_limit=0):
    """Annotate the first game with moves found in ``pgn_text``.

    Every move of the main line is evaluated with the engine before and
    after it is played, classified by the model and given a comment.

    Args:
        pgn_text: PGN text; it is passed through ``repair_pgn_text`` first.
            ``None`` is treated as empty text.
        time_per_move: Engine time per analysis call, in seconds.
        depth_limit: Maximum search depth; 0 (or less) means time only.

    Returns:
        ``(annotated_pgn, summary_markdown, csv_text)`` on success. When no
        game with moves is found the tuple is ``(None, None, message)``.

    Raises:
        RuntimeError: If the engine cannot be started.
    """
    stream = StringIO(repair_pgn_text(pgn_text or ""))

    # Take the first game that actually contains moves.
    game = chess.pgn.read_game(stream)
    while game is not None and not game.variations:
        game = chess.pgn.read_game(stream)
    if game is None:
        return None, None, "No se encontró una partida válida."

    # Build the search limit once. The depth may arrive as a float from a UI
    # slider, so it is coerced to int.
    depth = int(depth_limit) if depth_limit and depth_limit > 0 else None
    limit = chess.engine.Limit(time=time_per_move, depth=depth)

    engine = load_engine()
    try:
        # Load the model (trains it when no saved model exists).
        clf = train_model_if_needed()
        board = game.board()
        rows = []
        node = game
        while node.variations:
            next_node = node.variation(0)
            move = next_node.move
            turn_white = board.turn

            # Evaluation before the move. info["score"] is a PovScore, which
            # is what score_cp expects; converting it with .pov() first would
            # hand score_cp a plain Score that has no .white().
            info_b = engine.analyse(board, limit)
            eval_b = score_cp(info_b["score"])

            feats = basic_features(board)
            feats["eval_before_cp"] = eval_b
            X = np.array([[feats.get(k, 0.0) for k in FEATURE_ORDER]], dtype=float)
            pred = clf.predict(X)[0]

            # Engine's suggestion: first PV move, or the played move when the
            # PV is missing or empty.
            pv = info_b.get("pv") or [move]
            best_san = board.san(pv[0])
            played_san = board.san(move)

            board.push(move)
            info_a = engine.analyse(board, limit)
            eval_a = score_cp(info_a["score"])

            # Both evaluations are White-relative; flip for Black so the
            # delta is from the mover's point of view.
            delta = eval_a - eval_b
            if not turn_white:
                delta = -delta

            text = explain(pred, delta, feats["in_check"], feats["hanging_w"], feats["hanging_b"])

            # Attach the comment to the node of the move just played.
            node = next_node
            node.comment = (node.comment + " " if node.comment else "") + f"[{pred}] Δ={round(delta,1)} | Mejor: {best_san}. {text}"

            rows.append({
                "ply": len(rows) + 1,
                "turn": "White" if turn_white else "Black",
                "played": played_san,
                "best": best_san,
                "eval_before_cp": round(eval_b, 1),
                "eval_after_cp": round(eval_a, 1),
                "delta_cp": round(delta, 1),
                "ml_label": pred,
                "explanation": text,
            })

        exporter = chess.pgn.StringExporter(headers=True, comments=True, variations=False)
        annotated = game.accept(exporter)
    finally:
        engine.quit()

    # Quick summary: the ten moves with the lowest (most negative) delta.
    worst = sorted(rows, key=lambda r: r["delta_cp"])[:10]
    md = [
        "### Principales caídas de evaluación",
        "| Ply | Turno | Jugada | Mejor | Δcp | ML |",
        "|---:|:---:|:---|:---|---:|:---|",
    ]
    md.extend(
        f"| {r['ply']} | {r['turn']} | {r['played']} | {r.get('best','')} | {r['delta_cp']} | {r['ml_label']} |"
        for r in worst
    )
    summary_md = "\n".join(md)
    df = pd.DataFrame(rows)
    return annotated, summary_md, df.to_csv(index=False)
# ==============================
# Interfaz Gradio
# ==============================
# Gradio page: PGN input, analysis controls and the three outputs.
with gr.Blocks(title=APP_TITLE) as demo:
    gr.Markdown(f"# {APP_TITLE}\n**Motor + Machine Learning**\n1) Entrenamiento rápido en el arranque (RandomForest).\n2) Analiza tu PGN y comenta jugada por jugada (estilo DecodeChess).\n3) Descarga el PGN anotado y el CSV de jugadas.")
    pgn_in = gr.Textbox(lines=18, label="PGN (pega tu partida aquí)", placeholder="Pega aquí tu PGN...")
    # The examples must target the visible PGN box; pointing them at a
    # hidden throwaway Textbox means clicking an example changes nothing the
    # user can see or analyse.
    eg = gr.Examples(
        examples=[
            ["[Event \"Ejemplo\"]\n[Site \"?\"]\n[Date \"2024.??.??\"]\n[Round \"?\"]\n[White \"Alice\"]\n[Black \"Bob\"]\n[Result \"1-0\"]\n\n1. e4 e5 2. Nf3 Nc6 3. Bb5 a6 4. Ba4 Nf6 5. O-O Be7 6. Re1 b5 7. Bb3 O-O 8. c3 d5 9. exd5 Nxd5 10. Nxe5 Nxe5 11. Rxe5 c6 12. d4 Bd6 13. Re1 Qh4 14. g3 Qh3 15. Qf3 Bg4 16. Qg2 Rae8 17. Be3 Qh5 18. Nd2 Bh3 19. Qf3 Bg4 20. Qg2 f5 21. Bxd5+ cxd5 22. Qxd5+ Kh8 23. Qxd6 f4 24. Bxf4 Bf3 25. Rxe8 Rxe8 26. Be5 Qh3 27. Nxf3 Qf5 28. Qc6 Rf8 29. Qb7 Qg6 30. Nh4 Qc2 31. Qxg7# 1-0"],
        ],
        inputs=[pgn_in],
    )
    with gr.Row():
        tpm = gr.Slider(0.05, 0.5, value=0.2, step=0.05, label="Tiempo de análisis por jugada (seg)")
        depth = gr.Slider(0, 30, value=0, step=1, label="Profundidad máxima (0 = solo por tiempo)")
    run = gr.Button("Analizar con IA")
    annotated_out = gr.Textbox(lines=12, label="PGN anotado por IA")
    summary_out = gr.Markdown(label="Resumen IA")
    files_out = gr.Files(label="Descargas (PGN + CSV)")

    def _analyze(pgn_text, t, d):
        """Run the analysis and return (annotated PGN, summary, download files).

        Any exception is reported in the first output instead of being
        raised; in that case no files are offered.
        """
        try:
            annotated, summary_md, csv_txt = analyze_pgn(pgn_text, time_per_move=t, depth_limit=d)
            if annotated is None:
                # analyze_pgn signals "no game found" by returning its
                # message in the third slot; show it instead of saving it as
                # a CSV download.
                return csv_txt, "", None
            files = []
            if annotated:
                with open("anotado.pgn", "w", encoding="utf-8") as fh:
                    fh.write(annotated)
                files.append("anotado.pgn")
            if csv_txt:
                with open("jugadas.csv", "w", encoding="utf-8") as fh:
                    fh.write(csv_txt)
                files.append("jugadas.csv")
            return annotated, summary_md, files
        except Exception as e:
            tb = traceback.format_exc(limit=2)
            # None (not "") leaves the file output empty.
            return f"Error: {e}\n{tb}", "", None

    run.click(_analyze, inputs=[pgn_in, tpm, depth], outputs=[annotated_out, summary_out, files_out])
if __name__ == "__main__":
    # Make sure a model exists before serving (trains one when none is
    # saved). A failure here is only reported; the UI is launched either way.
    try:
        train_model_if_needed()
    except Exception as exc:
        print("⚠️ Entrenamiento en arranque falló:", exc)
    demo.launch()