# turk-tokenizer / turk_tokenizer/_root_validator.py
# Initial release: TurkTokenizer v1.0.0 — TR-MMLU 92% (commit ca41c16, verified)
"""Zemberek-based root validation and correction (Fix 4)."""
from __future__ import annotations
import os
from pathlib import Path
# ── Zemberek JAR: bundled with package ───────────────────────────────────────
# Directory shipped alongside this module that holds bundled resources.
_DATA_DIR = Path(__file__).parent / "data"
# Expected location of the bundled Zemberek fat JAR used for morphology.
JAR_PATH = _DATA_DIR / "zemberek-full.jar"
# Flipped to True by _init_zemberek() once the JVM and morphology are loaded.
ZEMBEREK_AVAILABLE = False
# Holds the zemberek.morphology.TurkishMorphology instance after init (else None).
_morphology = None
def _init_zemberek() -> None:
    """Start the JVM and load Zemberek's ``TurkishMorphology``.

    On success sets the module globals ``ZEMBEREK_AVAILABLE = True`` and
    ``_morphology``.  Every failure mode (missing JAR, missing jpype1, any
    JVM/Zemberek error) is non-fatal: a diagnostic is printed and the module
    continues with root validation disabled.
    """
    global ZEMBEREK_AVAILABLE, _morphology
    if not JAR_PATH.exists():
        # Fix: the user-facing messages below contained mojibake ("β€”" for
        # "—", "β†’" for "→") from a bad encoding round-trip; restored the
        # intended Unicode characters.
        print(
            f"[TurkTokenizer] zemberek-full.jar not found at {JAR_PATH}\n"
            " Root validation disabled — morphological fixes will be limited."
        )
        return
    try:
        import jpype  # noqa: PLC0415

        # Only start a JVM if none is running; a second startJVM() would raise.
        if not jpype.isJVMStarted():
            jpype.startJVM(
                jpype.getDefaultJVMPath(),
                "-ea",
                f"-Djava.class.path={JAR_PATH}",
                convertStrings=False,
            )
        TurkishMorphology = jpype.JClass("zemberek.morphology.TurkishMorphology")
        _morphology = TurkishMorphology.createWithDefaults()
        ZEMBEREK_AVAILABLE = True
    except ImportError:
        print("[TurkTokenizer] jpype1 not installed → pip install jpype1")
    except Exception as exc:  # noqa: BLE001
        print(f"[TurkTokenizer] Zemberek init failed: {exc}")


# Attempt initialisation once at import time.
_init_zemberek()
# ── Zemberek API helpers ──────────────────────────────────────────────────────
def _jstr(s: str):
    """Convert a Python string into a Java ``JString`` for Zemberek calls."""
    import jpype  # noqa: PLC0415

    return jpype.JString(s)
def analyze_word(word: str) -> list[dict]:
    """Return every Zemberek analysis of *word* as a list of plain dicts.

    Each dict carries ``lemma``, ``pos``, ``morphemes`` and ``surface``.
    Returns an empty list when Zemberek is unavailable or analysis fails.
    """
    if not ZEMBEREK_AVAILABLE:
        return []
    analyses: list[dict] = []
    try:
        result = _morphology.analyze(_jstr(word))
        for analysis in result.getAnalysisResults():
            dictionary_item = analysis.getDictionaryItem()
            analyses.append(
                {
                    "lemma": str(dictionary_item.lemma),
                    "pos": str(analysis.getPos().shortForm),
                    "morphemes": [str(m) for m in analysis.getMorphemes()],
                    "surface": str(analysis.surfaceForm()),
                }
            )
    except Exception:  # noqa: BLE001
        return []
    return analyses
def get_root_and_suffixes(word: str) -> dict | None:
    """Return ``{"root", "suffixes", "pos"}`` for *word*, or None if unknown.

    Uses the first (highest-ranked) Zemberek analysis; the suffix list is
    every morpheme after the root morpheme.
    """
    candidates = analyze_word(word)
    if not candidates:
        return None
    first = candidates[0]
    return {
        "root": first["lemma"],
        "suffixes": first["morphemes"][1:],
        "pos": first["pos"],
    }
# ── Heuristic fallback (no Zemberek) ─────────────────────────────────────────
_SPURIOUS_SHORT_ROOTS = {"oğ", "gâk", "zo", "me", "im", "pro", "go", "da", "al"}
def _is_spurious_root(root: str, next_tokens: list[dict]) -> bool:
if root.strip().lower() not in _SPURIOUS_SHORT_ROOTS:
return False
return sum(1 for t in next_tokens[:3] if t["type"] == "BPE") >= 2
# ── Main validation ───────────────────────────────────────────────────────────
def build_correction_map(
    original_words: list[str], base_tokenizer
) -> dict[str, str]:
    """Map each tokenizer-produced root to the Zemberek root it should be.

    Only conservative corrections are recorded: the Zemberek root must share
    the tokenizer root as a prefix and extend it by at most four characters.
    """
    corrections: dict[str, str] = {}
    for raw_word in original_words:
        cleaned = raw_word.lower().strip("'\".,!?;:()")
        # Very short (or fully stripped) words carry no reliable signal.
        if not cleaned or len(cleaned) < 3:
            continue
        analysis = get_root_and_suffixes(cleaned)
        if analysis is None or analysis["root"] == "UNK":
            continue
        zemberek_root = analysis["root"].lower()
        try:
            pieces = base_tokenizer.tokenize_text(cleaned)
            tokenizer_root = None
            for piece in pieces:
                if piece["type"] == "ROOT":
                    tokenizer_root = piece["token"].strip().lower()
                    break
        except Exception:  # noqa: BLE001
            continue
        if not tokenizer_root or tokenizer_root == zemberek_root:
            continue
        growth = len(zemberek_root) - len(tokenizer_root)
        if not 0 <= growth <= 4:
            continue
        if zemberek_root.startswith(tokenizer_root):
            corrections[tokenizer_root] = zemberek_root
    return corrections
def validate_roots(
    tokens: list[dict],
    original_words: list[str],
    base_tokenizer=None,
) -> list[dict]:
    """Apply Zemberek root corrections to the token stream.

    Without Zemberek, falls back to a heuristic that only *flags* suspicious
    short roots with ``_suspicious`` instead of rewriting them.  With
    Zemberek, ROOT tokens found in the correction map are replaced and
    annotated with ``_original_token`` / ``_root_corrected`` / ``_note``.
    Special tokens (surface starting with "<") are never touched.
    """
    if not ZEMBEREK_AVAILABLE:
        result = []
        for i, tok in enumerate(tokens):
            if tok["type"] == "ROOT" and not tok["token"].strip().startswith("<"):
                # Look at the next few tokens for BPE fallbacks.
                if _is_spurious_root(tok["token"], tokens[i + 1 : i + 5]):
                    tok = {**tok, "_suspicious": True}
            result.append(tok)
        return result
    corr = (
        build_correction_map(original_words, base_tokenizer)
        if base_tokenizer is not None
        else {}
    )
    result = []
    for tok in tokens:
        if tok["type"] != "ROOT" or tok["token"].strip().startswith("<"):
            result.append(tok)
            continue
        surface = tok["token"].strip().lower()
        correct = corr.get(surface)
        if correct and correct != surface:
            # Preserve the tokenizer's leading-space marker on the new root.
            leading = " " if tok["token"].startswith(" ") else ""
            tok = {
                **tok,
                "token": leading + correct,
                "_original_token": tok["token"],
                "_root_corrected": True,
                # Fix: restored "→" (was mojibake "β†’") in the note text.
                "_note": f"root corrected: '{surface}' → '{correct}'",
            }
        result.append(tok)
    return result
def disambiguate_sentence(words: list[str]) -> list[dict | None]:
    """Sentence-level Zemberek disambiguation.

    Returns one analysis dict (``lemma`` / ``pos`` / ``morphemes``) per input
    word, with ``None`` for words Zemberek cannot analyse.  The result list
    always has exactly ``len(words)`` entries.
    """
    if not ZEMBEREK_AVAILABLE:
        return [None] * len(words)
    try:
        sa_result = _morphology.analyzeAndDisambiguate(_jstr(" ".join(words)))
        best = sa_result.bestAnalysis()
        out: list[dict | None] = []
        for i in range(best.size()):
            try:
                sa = best.get(i)
                item = sa.getDictionaryItem()
                out.append({
                    "lemma": str(item.lemma),
                    "pos": str(sa.getPos().shortForm),
                    "morphemes": [str(m) for m in sa.getMorphemes()],
                })
            except Exception:  # noqa: BLE001
                out.append(None)
        # Pad/truncate so the output aligns 1:1 with the input words.
        while len(out) < len(words):
            out.append(None)
        return out[: len(words)]
    except Exception:  # noqa: BLE001
        # Per-word fallback.  Fix: the original evaluated analyze_word(w)
        # twice per word; analyse each word exactly once.
        fallback: list[dict | None] = []
        for w in words:
            analyses = analyze_word(w)
            fallback.append(analyses[0] if analyses else None)
        return fallback