static-embedding-chess / scripts /convert_to_english.py
oneryalcin's picture
Add files using upload-large-folder tool
f8392aa verified
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.10"
# dependencies = ["chess", "datasets>=2.19", "tqdm"]
# ///
"""Deterministic chess→English converter for puzzles.
Generates a standardized English-readable description of each puzzle WITHOUT
any LLM. Uses python-chess for UCI→SAN conversion (with board context), regex
for decamelizing themes, and a fixed template.
For each puzzle, produces a doc like:
"White to move. Short middlegame puzzle with crushing, fork, hanging
piece motifs. Opening: Kings Pawn Game. Moves: Bxg3 Rxe7 Qb1+ Nc1 Qxc1+
Qxc1"
Pretrained English cross-encoders have seen SAN notation in chess web content
during pretraining, so this doc is semantically meaningful to them — unlike
the raw UCI form (`f2g3`) which gets fragmented into character pieces.
Output: parquet at models/puzzles_english.parquet with columns:
PuzzleId, anchor_en (themes + opening rendered as English), doc_en
Run:
SMOKE_TEST=1 uv run --exclude-newer=2026-05-12 convert_to_english.py
uv run --exclude-newer=2026-05-12 convert_to_english.py
"""
from __future__ import annotations
import os
import re
import sys
import chess
from datasets import Dataset, load_dataset
from tqdm import tqdm
# Destination of the generated parquet file.
OUTPUT_PATH = "models/puzzles_english.parquet"
# Smoke-test mode is enabled only when the SMOKE_TEST environment variable
# is exactly "1".
SMOKE_TEST = os.getenv("SMOKE_TEST", "") == "1"
# Switch stdout to line buffering.
sys.stdout.reconfigure(line_buffering=True)
# Puzzle-length theme tag -> English phrase used in the generated text.
LENGTH_MAP = dict(
    oneMove="single-move",
    short="short",
    long="long",
    veryLong="very long",
)
# Theme tags that name the game phase; they are emitted verbatim.
PHASE_TAGS = set(("opening", "middlegame", "endgame"))
# Every theme tag that has an entry in LENGTH_MAP.
LENGTH_TAGS = set(LENGTH_MAP)
# Matches the `mateInN` theme tags (`mateIn1`, `mateIn2`, ...) and captures N.
MATE_IN_PATTERN = re.compile(r"^mateIn(\d+)$")
# Motif tags, including named mate patterns such as `backRankMate`, need no
# special casing: their English form is just the decamelized tag.
# A word boundary sits (a) before an uppercase letter that follows a lowercase
# letter or a digit, and (b) before the last capital of an uppercase run that
# is followed by a lowercase letter (`XMLParser` -> `XML Parser`).
# The digit case is what separates the two squares in `attackingF2F7`.
_CAMEL_BOUNDARY = re.compile(
    r"(?<=[a-z0-9])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])"
)


def decamelize(tag: str) -> str:
    """Turn a camelCase theme tag into lowercase, space-separated words.

    `backRankMate` -> 'back rank mate'. `attackingF2F7` -> 'attacking f2 f7'.

    Args:
        tag: A camelCase identifier.

    Returns:
        The tag with a space inserted at every word boundary, lowercased.
        A tag without boundaries (including the empty string) is returned
        lowercased and otherwise unchanged.
    """
    return _CAMEL_BOUNDARY.sub(" ", tag).lower()
def themes_to_english(themes: list[str]) -> tuple[str, str, str, list[str]]:
    """Split puzzle theme tags into structural parts and English motif phrases.

    Args:
        themes: Theme tags such as ``["short", "middlegame", "fork"]``. A
            whitespace-separated string of tags is accepted as well and is
            split first. ``None`` or an empty value yields the empty result.

    Returns:
        A 4-tuple ``(phase, length_phrase, side_phrase, motifs)``:
        ``phase`` is the last tag found in ``PHASE_TAGS`` (or ``""``);
        ``length_phrase`` is the ``LENGTH_MAP`` phrase of the last length tag
        (or ``""``); ``side_phrase`` is always ``""`` because callers derive
        the side to move from the FEN; ``motifs`` holds every other tag
        decamelized, in input order, followed by ``"mate in N"`` when a
        ``mateInN`` tag was present.
    """
    if isinstance(themes, str):
        # Iterating a string would treat every character as a tag.
        themes = themes.split()
    if not themes:
        return ("", "", "", [])

    phase = ""
    length = ""
    mate_in = None
    motifs = []
    for tag in themes:
        if tag in PHASE_TAGS:
            phase = tag
        elif tag in LENGTH_TAGS:
            length = LENGTH_MAP[tag]
        elif (match := MATE_IN_PATTERN.match(tag)):
            mate_in = int(match.group(1))
        else:
            motifs.append(decamelize(tag))

    # Mate-in-N is folded into the motifs as a natural-language phrase and
    # always comes last, regardless of where the tag appeared.
    if mate_in is not None:
        motifs.append(f"mate in {mate_in}")
    return phase, length, "", motifs
def opening_tags_to_english(opening_tags: list[str]) -> str:
    """Render the most specific opening tag as a space-separated name.

    `['Kings_Pawn_Game', 'Kings_Pawn_Game_Leonardis_Variation']`
    -> 'Kings Pawn Game Leonardis Variation'.

    The longest tag is taken as the most specific one, which also removes
    the duplication between a family tag and its variation tag. Underscores
    become spaces; nothing else is rewritten (apostrophes are not restored).

    Args:
        opening_tags: Opening tags. A whitespace-separated string of tags is
            accepted as well and is split first.

    Returns:
        The longest tag with underscores replaced by spaces, or ``""`` when
        there are no tags (``None``, empty list, blank string).
    """
    if isinstance(opening_tags, str):
        # max() over a bare string would pick a single character.
        opening_tags = opening_tags.split()
    if not opening_tags:
        return ""
    return max(opening_tags, key=len).replace("_", " ")
def uci_to_san_sequence(fen: str, uci_moves: str) -> str:
    """Render a space-separated UCI move list as SAN, replaying it from `fen`.

    Each move is converted against the current board and then played, so the
    SAN reflects the position it is made in. Conversion is best-effort:

    * if the board cannot be set up from `fen` (or the move list cannot be
      split), `uci_moves` is returned unchanged;
    * if any move fails to parse, convert or play, the sequence is cut at
      that move and only the moves converted so far are returned.
    """
    try:
        board = chess.Board(fen)
        tokens = uci_moves.split()
    except Exception:
        # No usable starting position: fall back to the raw UCI text.
        return uci_moves

    rendered = []
    for token in tokens:
        try:
            move = chess.Move.from_uci(token)
            rendered.append(board.san(move))
            board.push(move)
        except Exception:
            # Invalid move: drop it and everything after it.
            break
    return " ".join(rendered)
def side_to_move(fen: str) -> str:
    """Return "White" when the second FEN field is "w", otherwise "Black".

    Any other value, including a FEN with fewer than two whitespace-separated
    fields, is reported as "Black".
    """
    fields = fen.split()
    white_to_play = len(fields) >= 2 and fields[1] == "w"
    return "White" if white_to_play else "Black"
def build_english_doc(row: dict) -> str:
    """Assemble the fixed-template English description of one puzzle row.

    Reads `FEN`, `Themes`, `Moves` and (optionally) `OpeningTags` from `row`
    and joins up to four space-separated sentences:

        "<Side> to move. <Length> <phase> puzzle with <m1, m2> motifs.
         Opening: <opening>. Moves: <SAN moves>"

    The length, phase, motif, opening and moves parts are each left out when
    empty; the moves sentence carries no trailing period.
    """
    # NOTE(review): the side comes from the FEN's active-colour field. Lichess
    # documents the puzzle FEN as the position before the opponent's first
    # move, so this names the side playing the first listed move. Confirm
    # that is the intended reading of "to move".
    mover = side_to_move(row["FEN"])
    phase, length, _, motifs = themes_to_english(row["Themes"] or [])
    opening = opening_tags_to_english(row.get("OpeningTags") or [])
    san_line = uci_to_san_sequence(row["FEN"], row["Moves"])

    # e.g. "short middlegame puzzle with crushing, hanging piece motifs"
    headline = " ".join([word for word in (length, phase) if word] + ["puzzle"])
    if motifs:
        headline = f"{headline} with {', '.join(motifs)} motifs"

    sentences = [f"{mover} to move.", headline.capitalize() + "."]
    if opening:
        sentences.append(f"Opening: {opening}.")
    if san_line:
        sentences.append(f"Moves: {san_line}")
    return " ".join(sentences)
def build_english_anchor(row: dict) -> str:
    """Build the query-side text for a puzzle: its themes and opening in English.

    The result is the space-joined sequence of the comma-separated motifs,
    the length phrase, the phase and the opening name, skipping whichever of
    them is empty. Used as the query for retrieval/reranker training.
    """
    phase, length, _, motifs = themes_to_english(row["Themes"] or [])
    opening = opening_tags_to_english(row.get("OpeningTags") or [])

    # (include?, text) pairs in output order.
    candidates = (
        (bool(motifs), ", ".join(motifs)),
        (bool(length), length),
        (bool(phase), phase),
        (bool(opening), opening),
    )
    return " ".join(text for keep, text in candidates if keep).strip()
def main():
    """Convert the Lichess puzzle dataset to English text and save it as parquet.

    Loads the ``train`` split of ``Lichess/chess-puzzles`` (at most the first
    2,000 rows when ``SMOKE_TEST`` is set), builds an English anchor and
    document for every puzzle that has themes, prints a few sample rows, and
    writes the columns ``PuzzleId``, ``anchor_en`` and ``doc_en`` to
    ``OUTPUT_PATH``.
    """
    print("Loading puzzles...")
    puzzles = load_dataset("Lichess/chess-puzzles", split="train")
    if SMOKE_TEST:
        # Clamp so a split shorter than the smoke-test size does not raise.
        puzzles = puzzles.select(range(min(2_000, len(puzzles))))
    print(f" {len(puzzles):,} rows")
    print("Converting to English (deterministic)...")

    def proc(batch):
        """Turn one column-oriented batch into the three output columns."""
        columns = list(batch)
        ids, anchors, docs = [], [], []
        # Walk the batch row by row without materialising a list of row dicts.
        for values in zip(*(batch[name] for name in columns)):
            row = dict(zip(columns, values))
            if not row["Themes"]:
                continue  # puzzles without themes are dropped from the output
            ids.append(row["PuzzleId"])
            anchors.append(build_english_anchor(row))
            docs.append(build_english_doc(row))
        return {"PuzzleId": ids, "anchor_en": anchors, "doc_en": docs}

    out = puzzles.map(
        proc,
        batched=True,
        batch_size=10_000,
        remove_columns=puzzles.column_names,
        num_proc=4,
    )
    print(f" produced {len(out):,} English-converted rows")

    print("\n=== Sample conversions ===")
    for i in (0, 100, 1000):
        if i >= len(out):
            # Small outputs (e.g. a heavily filtered smoke test) must not
            # crash here, before the result has been saved.
            break
        r = out[i]
        print(f"\nPuzzleId: {r['PuzzleId']}")
        print(f" anchor: {r['anchor_en']!r}")
        print(f" doc: {r['doc_en'][:200]!r}")

    # Make sure the target directory exists before writing the file.
    out_dir = os.path.dirname(OUTPUT_PATH)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    out.to_parquet(OUTPUT_PATH)
    print(f"\nSaved to {OUTPUT_PATH} ({os.path.getsize(OUTPUT_PATH) / 1e6:.1f} MB)")


if __name__ == "__main__":
    main()