"""Zemberek-based root validation and correction (Fix 4).""" from __future__ import annotations import os from pathlib import Path # ── Zemberek JAR: bundled with package ─────────────────────────────────────── _DATA_DIR = Path(__file__).parent / "data" JAR_PATH = _DATA_DIR / "zemberek-full.jar" ZEMBEREK_AVAILABLE = False _morphology = None def _init_zemberek() -> None: global ZEMBEREK_AVAILABLE, _morphology if not JAR_PATH.exists(): print( f"[TurkTokenizer] zemberek-full.jar not found at {JAR_PATH}\n" " Root validation disabled — morphological fixes will be limited." ) return try: import jpype # noqa: PLC0415 if not jpype.isJVMStarted(): jpype.startJVM( jpype.getDefaultJVMPath(), "-ea", f"-Djava.class.path={JAR_PATH}", convertStrings=False, ) TurkishMorphology = jpype.JClass("zemberek.morphology.TurkishMorphology") _morphology = TurkishMorphology.createWithDefaults() ZEMBEREK_AVAILABLE = True except ImportError: print("[TurkTokenizer] jpype1 not installed → pip install jpype1") except Exception as exc: # noqa: BLE001 print(f"[TurkTokenizer] Zemberek init failed: {exc}") _init_zemberek() # ── Zemberek API helpers ────────────────────────────────────────────────────── def _jstr(s: str): import jpype # noqa: PLC0415 return jpype.JString(s) def analyze_word(word: str) -> list[dict]: """Return all Zemberek analyses for a single word.""" if not ZEMBEREK_AVAILABLE: return [] try: wa = _morphology.analyze(_jstr(word)) return [ { "lemma": str(sa.getDictionaryItem().lemma), "pos": str(sa.getPos().shortForm), "morphemes":[str(m) for m in sa.getMorphemes()], "surface": str(sa.surfaceForm()), } for sa in wa.getAnalysisResults() ] except Exception: # noqa: BLE001 return [] def get_root_and_suffixes(word: str) -> dict | None: """Return root + suffix list for a word, or None if unknown.""" analyses = analyze_word(word) if not analyses: return None a = analyses[0] return {"root": a["lemma"], "suffixes": a["morphemes"][1:], "pos": a["pos"]} # ── Heuristic fallback (no Zemberek) ───────────────────────────────────────── _SPURIOUS_SHORT_ROOTS = {"oğ", "gök", "zo", "me", "im", "pro", "go", "da", "al"} def _is_spurious_root(root: str, next_tokens: list[dict]) -> bool: if root.strip().lower() not in _SPURIOUS_SHORT_ROOTS: return False return sum(1 for t in next_tokens[:3] if t["type"] == "BPE") >= 2 # ── Main validation ─────────────────────────────────────────────────────────── def build_correction_map( original_words: list[str], base_tokenizer ) -> dict[str, str]: """Build a {tokenizer_root → zemberek_root} correction map.""" correction_map: dict[str, str] = {} for word in original_words: w = word.lower().strip("'\".,!?;:()") if not w or len(w) < 3: continue z = get_root_and_suffixes(w) if not z or z["root"] == "UNK": continue z_root = z["root"].lower() try: toks = base_tokenizer.tokenize_text(w) t_root = next( (t["token"].strip().lower() for t in toks if t["type"] == "ROOT"), None, ) except Exception: # noqa: BLE001 continue if not t_root or t_root == z_root: continue diff = len(z_root) - len(t_root) if diff < 0 or diff > 4: continue if not z_root.startswith(t_root): continue correction_map[t_root] = z_root return correction_map def validate_roots( tokens: list[dict], original_words: list[str], base_tokenizer=None, ) -> list[dict]: """Apply Zemberek root corrections to the token stream.""" if not ZEMBEREK_AVAILABLE: result = [] for i, tok in enumerate(tokens): if tok["type"] == "ROOT" and not tok["token"].strip().startswith("<"): if _is_spurious_root(tok["token"], tokens[i + 1 : i + 5]): tok = {**tok, "_suspicious": True} result.append(tok) return result corr = ( build_correction_map(original_words, base_tokenizer) if base_tokenizer is not None else {} ) result = [] for tok in tokens: if tok["type"] != "ROOT" or tok["token"].strip().startswith("<"): result.append(tok) continue surface = tok["token"].strip().lower() correct = corr.get(surface) if correct and correct != surface: leading = " " if tok["token"].startswith(" ") else "" tok = { **tok, "token": leading + correct, "_original_token": tok["token"], "_root_corrected": True, "_note": f"root corrected: '{surface}' → '{correct}'", } result.append(tok) return result def disambiguate_sentence(words: list[str]) -> list[dict | None]: """Sentence-level Zemberek disambiguation.""" if not ZEMBEREK_AVAILABLE: return [None] * len(words) try: sa_result = _morphology.analyzeAndDisambiguate(_jstr(" ".join(words))) best = sa_result.bestAnalysis() out = [] for i in range(best.size()): try: sa = best.get(i) item = sa.getDictionaryItem() out.append({ "lemma": str(item.lemma), "pos": str(sa.getPos().shortForm), "morphemes": [str(m) for m in sa.getMorphemes()], }) except Exception: # noqa: BLE001 out.append(None) while len(out) < len(words): out.append(None) return out[: len(words)] except Exception: # noqa: BLE001 return [analyze_word(w)[0] if analyze_word(w) else None for w in words]