File size: 13,988 Bytes
8a71b3f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
import os
import io
import re
import math
import time
import random
import traceback
from io import StringIO

import gradio as gr
import numpy as np
import pandas as pd
from joblib import dump, load
from sklearn.ensemble import RandomForestClassifier

import chess, chess.pgn, chess.engine

APP_TITLE = "DecodeChess‑IA — Doctor Linux"
ENGINE_CANDIDATES = ["stockfish", "/usr/bin/stockfish", "/usr/games/stockfish"]

# ==============================
# Utilidades de Motor
# ==============================
def load_engine():
    last = None
    for p in ENGINE_CANDIDATES:
        try:
            eng = chess.engine.SimpleEngine.popen_uci(p)
            return eng
        except Exception as e:
            last = e
    raise RuntimeError(f"No pude iniciar Stockfish. Último error: {last}")

def score_cp(score: chess.engine.PovScore) -> float:
    """Devuelve centipawns desde la perspectiva de las blancas (+ = blancas mejor)."""
    if score.is_mate():
        m = score.white().mate()
        if m is None:
            return 0.0
        return 100000.0 if m > 0 else -100000.0
    return float(score.white().score(mate_score=100000))

# ==============================
# Reparador PGN simple
# ==============================
def repair_pgn_text(text: str) -> str:
    text = text.replace("\r\n", "\n").replace("\r", "\n")
    # Si no hay headers mínimos, agrégalos
    if "[Event " not in text:
        head = ['[Event "?"]','[Site "?"]','[Date "????.??.??"]','[Round "?"]','[White "?"]','[Black "?"]','[Result "*"]','','']
        text = "\n".join(head) + text
    # Arreglos menores
    text = re.sub(r'\[Ulnite', '[White', text)
    text = text.replace('[Result "I-0"]','[Result "1-0"]').replace('[Result "O-I"]','[Result "0-1"]').replace('[Result "I/2-I/2"]','[Result "1/2-1/2"]')
    return text.strip() + "\n"

# ==============================
# Features de posición
# ==============================
PIECE_VALUES = {chess.PAWN:1, chess.KNIGHT:3, chess.BISHOP:3, chess.ROOK:5, chess.QUEEN:9}
CENTER = [chess.D4, chess.E4, chess.D5, chess.E5]

def material_eval(board: chess.Board, color=True):
    s = 0
    for pt, v in PIECE_VALUES.items():
        s += len(board.pieces(pt, chess.WHITE))*v
        s -= len(board.pieces(pt, chess.BLACK))*v
    return s if color else -s

def mobility(board: chess.Board):
    return board.legal_moves.count()

def king_safety(board: chess.Board, color=chess.WHITE):
    king_sq = board.king(color)
    if king_sq is None:
        return 0
    # casillas alrededor del rey libres del enemigo
    danger = 0
    for sq in chess.SquareSet.ring(king_sq):
        if board.is_attacked_by(not color, sq):
            danger += 1
    return -danger

def hanging_pieces(board: chess.Board, color=chess.WHITE):
    # piezas atacadas y no defendidas
    count = 0
    for sq in chess.SquareSet(board.occupied_co[color]):
        if board.is_attacked_by(not color, sq) and not board.is_attacked_by(color, sq):
            count += 1
    return count

def basic_features(board: chess.Board):
    turn_white = board.turn
    feats = {
        "turn_white": 1 if turn_white else 0,
        "mat_cp": material_eval(board),  # + = blancas
        "mobility": mobility(board),
        "king_safety_w": king_safety(board, chess.WHITE),
        "king_safety_b": king_safety(board, chess.BLACK),
        "hanging_w": hanging_pieces(board, chess.WHITE),
        "hanging_b": hanging_pieces(board, chess.BLACK),
        "center_pawns": int(any(board.piece_at(sq) and board.piece_at(sq).piece_type==chess.PAWN for sq in CENTER)),
        "in_check": 1 if board.is_check() else 0,
        "phase": len(board.move_stack)  # ply actual
    }
    return feats

FEATURE_ORDER = ["turn_white","mat_cp","mobility","king_safety_w","king_safety_b","hanging_w","hanging_b","center_pawns","in_check","phase","eval_before_cp"]

# ==============================
# Etiquetado de categorías por delta de evaluación
# ==============================
def delta_to_label(delta_cp: float) -> str:
    # delta_cp = eval_after(POV side-to-move) - eval_before(POV side-to-move)
    drop = -delta_cp  # caída (negativo = peor)
    if drop < 20:   return "Best"
    if drop < 60:   return "Good"
    if drop < 120:  return "Inaccuracy"
    if drop < 300:  return "Mistake"
    return "Blunder"

# ==============================
# Generación de dataset (entrenamiento rápido on‑startup)
# ==============================
def generate_training(engine, games=20, plies_per_game=50, time_per=0.1):
    rows = []
    for g in range(games):
        board = chess.Board()
        for _ in range(plies_per_game):
            if board.is_game_over():
                break
            # Eval antes
            info_before = engine.analyse(board, chess.engine.Limit(time=time_per))
            eval_before = score_cp(info_before["score"].pov(chess.WHITE))
            # mejor jugada (para usarla como "buena")
            pv = info_before.get("pv")
            if pv:
                best = pv[0]
            else:
                # fallback: escoger la primera legal
                legal = list(board.legal_moves)
                if not legal: break
                best = legal[0]

            # jugar movimiento aleatorio razonable (50% best, 50% otro random) para variar etiquetas
            legal = list(board.legal_moves)
            if not legal: break
            if random.random() < 0.5:
                move = best
            else:
                move = random.choice(legal)

            before_board = board.copy()
            before_feats = basic_features(before_board)
            before_feats["eval_before_cp"] = eval_before

            san_played = board.san(move) if move else None
            board.push(move)

            info_after = engine.analyse(board, chess.engine.Limit(time=time_per))
            eval_after = score_cp(info_after["score"].pov(chess.WHITE))

            # delta desde POV del bando que movía
            if before_board.turn:  # blancas movían
                delta_cp = eval_after - eval_before
            else:                  # negras movían
                delta_cp = -(eval_after - eval_before)

            label = delta_to_label(delta_cp)

            row = {k: before_feats[k] for k in FEATURE_ORDER if k in before_feats}
            row["eval_before_cp"] = before_feats["eval_before_cp"]
            row["delta_cp"] = delta_cp
            row["label"] = label
            row["played_san"] = san_played
            rows.append(row)
    df = pd.DataFrame(rows)
    return df

def train_model_if_needed():
    model_path = "model_rf.joblib"
    if os.path.exists(model_path):
        try:
            clf = load(model_path)
            return clf
        except Exception:
            pass
    # Entrenamiento rápido
    eng = load_engine()
    try:
        df = generate_training(eng, games=24, plies_per_game=40, time_per=0.06)
    finally:
        eng.quit()
    # Balance simple
    X = df[FEATURE_ORDER].astype(float).values
    y = df["label"].values
    clf = RandomForestClassifier(n_estimators=140, max_depth=None, random_state=42, n_jobs=-1, class_weight="balanced")
    clf.fit(X, y)
    dump(clf, model_path)
    return clf

# ==============================
# Explicaciones naturales (plantillas simples)
# ==============================
def explain(label: str, delta_cp: float, in_check: int, hanging_w: int, hanging_b: int) -> str:
    tips = []
    if label == "Blunder":
        tips.append("Grave caída de evaluación; probablemente táctica inmediata o pieza colgando.")
    elif label == "Mistake":
        tips.append("Movimiento impreciso que cede ventaja significativa.")
    elif label == "Inaccuracy":
        tips.append("Había alternativas más fuertes según el motor.")
    elif label == "Good":
        tips.append("Jugada razonable; mantiene la evaluación.")
    else:
        tips.append("Excelente jugada; coincide con la mejor línea del motor.")
    if in_check:
        tips.append("El rey estaba en jaque o vulnerable.")
    if hanging_w or hanging_b:
        tips.append("Detecto pieza(s) atacadas sin defensa; revisa táctica.")
    tips.append(f"Δ={round(delta_cp,1)} cp.")
    return " ".join(tips)

# ==============================
# Análisis de una partida PGN
# ==============================
def analyze_pgn(pgn_text: str, time_per_move=0.2, depth_limit=0):
    pgn_text = repair_pgn_text(pgn_text)
    f = StringIO(pgn_text)

    # Tomar la primera partida válida con movimientos
    game = chess.pgn.read_game(f)
    while game is not None and sum(1 for _ in game.mainline_moves()) == 0:
        game = chess.pgn.read_game(f)
    if game is None:
        return None, None, "No se encontró una partida válida."

    engine = load_engine()
    try:
        # Cargar modelo (entrenar si no existe)
        clf = train_model_if_needed()
        board = game.board()
        rows = []
        # Exporter para comentarios
        exporter = chess.pgn.StringExporter(headers=True, comments=True, variations=False)

        node = game
        ply = 0
        while node.variations:
            move = node.variation(0).move
            turn_white = board.turn
            # eval antes
            info_b = engine.analyse(board, chess.engine.Limit(time=time_per_move, depth=None if depth_limit<=0 else depth_limit))
            eval_b = score_cp(info_b["score"].pov(chess.WHITE))

            feats = basic_features(board)
            feats["eval_before_cp"] = eval_b
            X = np.array([[feats.get(k,0.0) for k in FEATURE_ORDER]], dtype=float)
            pred = clf.predict(X)[0]

            # mejor SAN propuesto (primera PV)
            best_move = info_b.get("pv", [move])[0]
            best_san = board.san(best_move) if best_move else None

            played_san = board.san(move)
            board.push(move)

            info_a = engine.analyse(board, chess.engine.Limit(time=time_per_move, depth=None if depth_limit<=0 else depth_limit))
            eval_a = score_cp(info_a["score"].pov(chess.WHITE))

            if turn_white:
                delta = eval_a - eval_b
            else:
                delta = -(eval_a - eval_b)

            text = explain(pred, delta, feats["in_check"], feats["hanging_w"], feats["hanging_b"])
            # Añadir comentario al nodo siguiente
            node = node.variation(0)
            node.comment = (node.comment + " " if node.comment else "") + f"[{pred}] Δ={round(delta,1)} | Mejor: {best_san}. {text}"

            ply += 1
            rows.append({
                "ply": ply,
                "turn": "White" if turn_white else "Black",
                "played": played_san,
                "best": best_san,
                "eval_before_cp": round(eval_b,1),
                "eval_after_cp": round(eval_a,1),
                "delta_cp": round(delta,1),
                "ml_label": pred,
                "explanation": text
            })

        annotated = game.accept(exporter)
    finally:
        engine.quit()

    # Resumen rápido
    worst = sorted(rows, key=lambda r: r["delta_cp"])[:10]
    md = ["### Principales caídas de evaluación",
          "| Ply | Turno | Jugada | Mejor | Δcp | ML |",
          "|---:|:---:|:---|:---|---:|:---|"]
    for r in worst:
        md.append(f"| {r['ply']} | {r['turn']} | {r['played']} | {r.get('best','')} | {r['delta_cp']} | {r['ml_label']} |")
    summary_md = "\n".join(md)

    df = pd.DataFrame(rows)
    return annotated, summary_md, df.to_csv(index=False)

# ==============================
# Interfaz Gradio
# ==============================
with gr.Blocks(title=APP_TITLE) as demo:
    gr.Markdown(f"# {APP_TITLE}\n**Motor + Machine Learning**\n1) Entrenamiento rápido en el arranque (RandomForest).\n2) Analiza tu PGN y comenta jugada por jugada (estilo DecodeChess).\n3) Descarga el PGN anotado y el CSV de jugadas.")

    eg = gr.Examples(
        examples=[
            ["[Event \"Ejemplo\"]\n[Site \"?\"]\n[Date \"2024.??.??\"]\n[Round \"?\"]\n[White \"Alice\"]\n[Black \"Bob\"]\n[Result \"1-0\"]\n\n1. e4 e5 2. Nf3 Nc6 3. Bb5 a6 4. Ba4 Nf6 5. O-O Be7 6. Re1 b5 7. Bb3 O-O 8. c3 d5 9. exd5 Nxd5 10. Nxe5 Nxe5 11. Rxe5 c6 12. d4 Bd6 13. Re1 Qh4 14. g3 Qh3 15. Qf3 Bg4 16. Qg2 Rae8 17. Be3 Qh5 18. Nd2 Bh3 19. Qf3 Bg4 20. Qg2 f5 21. Bxd5+ cxd5 22. Qxd5+ Kh8 23. Qxd6 f4 24. Bxf4 Bf3 25. Rxe8 Rxe8 26. Be5 Qh3 27. Nxf3 Qf5 28. Qc6 Rf8 29. Qb7 Qg6 30. Nh4 Qc2 31. Qxg7# 1-0"],
        inputs=[gr.Textbox(visible=False)]  # solo para pre-cargar
    )

    pgn_in = gr.Textbox(lines=18, label="PGN (pega tu partida aquí)", placeholder="Pega aquí tu PGN...")

    with gr.Row():
        tpm = gr.Slider(0.05, 0.5, value=0.2, step=0.05, label="Tiempo de análisis por jugada (seg)")
        depth = gr.Slider(0, 30, value=0, step=1, label="Profundidad máxima (0 = solo por tiempo)")

    run = gr.Button("Analizar con IA")

    annotated_out = gr.Textbox(lines=12, label="PGN anotado por IA")
    summary_out = gr.Markdown(label="Resumen IA")
    files_out = gr.Files(label="Descargas (PGN + CSV)")

    def _analyze(pgn_text, t, d):
        try:
            annotated, summary_md, csv_txt = analyze_pgn(pgn_text, time_per_move=t, depth_limit=d)
            files = []
            if annotated:
                open("anotado.pgn","w",encoding="utf-8").write(annotated)
                files.append("anotado.pgn")
            if csv_txt:
                open("jugadas.csv","w",encoding="utf-8").write(csv_txt)
                files.append("jugadas.csv")
            return annotated, summary_md, files
        except Exception as e:
            tb = traceback.format_exc(limit=2)
            return f"Error: {e}\n{tb}", "",""

    run.click(_analyze, inputs=[pgn_in, tpm, depth], outputs=[annotated_out, summary_out, files_out])

if __name__ == "__main__":
    # Entrenar en arranque (si no existe)
    try:
        _ = train_model_if_needed()
    except Exception as e:
        print("⚠️ Entrenamiento en arranque falló:", e)
    demo.launch()