| """Zemberek-based root validation and correction (Fix 4).""" |
|
|
| from __future__ import annotations |
|
|
| import os |
| from pathlib import Path |
|
|
| |
|
|
# Location of the bundled Zemberek jar (expected under ./data next to this file).
_DATA_DIR = Path(__file__).parent / "data"
JAR_PATH = _DATA_DIR / "zemberek-full.jar"


# Set by _init_zemberek(): True once the JVM is started and TurkishMorphology loaded.
ZEMBEREK_AVAILABLE = False
# Zemberek TurkishMorphology instance; stays None when initialisation fails.
_morphology = None
|
|
|
|
def _init_zemberek() -> None:
    """Start the JVM and load Zemberek's TurkishMorphology.

    Sets the module globals ``ZEMBEREK_AVAILABLE`` and ``_morphology``.
    All failures are non-fatal: a diagnostic is printed and root
    validation simply stays disabled.
    """
    global ZEMBEREK_AVAILABLE, _morphology

    if not JAR_PATH.exists():
        print(
            f"[TurkTokenizer] zemberek-full.jar not found at {JAR_PATH}\n"
            # fixed mojibake: "β" was a mis-decoded "→"
            " Root validation disabled → morphological fixes will be limited."
        )
        return

    try:
        import jpype  # deferred import: jpype1 is an optional dependency

        # Start a single shared JVM with the Zemberek jar on the classpath.
        if not jpype.isJVMStarted():
            jpype.startJVM(
                jpype.getDefaultJVMPath(),
                "-ea",
                f"-Djava.class.path={JAR_PATH}",
                convertStrings=False,
            )

        TurkishMorphology = jpype.JClass("zemberek.morphology.TurkishMorphology")
        _morphology = TurkishMorphology.createWithDefaults()
        ZEMBEREK_AVAILABLE = True

    except ImportError:
        # fixed mojibake: "β" was a mis-decoded "→"
        print("[TurkTokenizer] jpype1 not installed → pip install jpype1")
    except Exception as exc:  # JVM/classpath problems — keep the tokenizer usable
        print(f"[TurkTokenizer] Zemberek init failed: {exc}")
|
|
|
|
| _init_zemberek() |
|
|
|
|
| |
|
|
def _jstr(s: str):
    """Wrap a Python string as a Java ``String`` for Zemberek calls."""
    from jpype import JString
    return JString(s)
|
|
|
|
def analyze_word(word: str) -> list[dict]:
    """Return all Zemberek analyses for a single word.

    Each analysis is a dict with ``lemma``, ``pos``, ``morphemes`` and
    ``surface`` keys; an empty list means Zemberek is unavailable or the
    analysis failed.
    """
    if not ZEMBEREK_AVAILABLE:
        return []
    try:
        results: list[dict] = []
        for analysis in _morphology.analyze(_jstr(word)).getAnalysisResults():
            results.append(
                {
                    "lemma": str(analysis.getDictionaryItem().lemma),
                    "pos": str(analysis.getPos().shortForm),
                    "morphemes": [str(m) for m in analysis.getMorphemes()],
                    "surface": str(analysis.surfaceForm()),
                }
            )
        return results
    except Exception:
        # Best-effort: any JVM-side failure degrades to "no analyses".
        return []
|
|
|
|
def get_root_and_suffixes(word: str) -> dict | None:
    """Return root + suffix list for a word, or None if unknown."""
    analyses = analyze_word(word)
    if analyses:
        # Take the first (highest-ranked) analysis; skip its root morpheme.
        best = analyses[0]
        return {
            "root": best["lemma"],
            "suffixes": best["morphemes"][1:],
            "pos": best["pos"],
        }
    return None
|
|
|
|
| |
|
|
| _SPURIOUS_SHORT_ROOTS = {"oΔ", "gΓΆk", "zo", "me", "im", "pro", "go", "da", "al"} |
|
|
|
|
def _is_spurious_root(root: str, next_tokens: list[dict]) -> bool:
    """Heuristic: a known short root followed mostly by BPE pieces is suspect."""
    if root.strip().lower() in _SPURIOUS_SHORT_ROOTS:
        # At least 2 of the next 3 tokens being BPE fragments suggests the
        # tokenizer split a longer word at the wrong point.
        bpe_count = sum(1 for tok in next_tokens[:3] if tok["type"] == "BPE")
        return bpe_count >= 2
    return False
|
|
|
|
| |
|
|
def build_correction_map(
    original_words: list[str], base_tokenizer
) -> dict[str, str]:
    """Build a {tokenizer_root → zemberek_root} correction map.

    For each input word, compare the ROOT token produced by
    *base_tokenizer* against Zemberek's lemma; when the tokenizer root
    is a strict prefix of the Zemberek root (at most 4 chars shorter),
    record a correction.

    Args:
        original_words: surface words from the input text.
        base_tokenizer: object exposing ``tokenize_text(word) -> list[dict]``.

    Returns:
        Mapping of lowercase tokenizer roots to corrected Zemberek roots.
    """
    correction_map: dict[str, str] = {}
    # Process each distinct word once — Zemberek/tokenizer calls are costly
    # and repeated words produce identical results.
    seen: set[str] = set()

    for word in original_words:
        w = word.lower().strip("'\".,!?;:()")
        if not w or len(w) < 3 or w in seen:
            continue
        seen.add(w)

        z = get_root_and_suffixes(w)
        if not z or z["root"] == "UNK":
            continue
        z_root = z["root"].lower()

        try:
            toks = base_tokenizer.tokenize_text(w)
            t_root = next(
                (t["token"].strip().lower() for t in toks if t["type"] == "ROOT"),
                None,
            )
        except Exception:
            # Best-effort: skip words the base tokenizer cannot handle.
            continue

        if not t_root or t_root == z_root:
            continue

        # Only trust corrections that extend the tokenizer root by 0-4 chars
        # and keep it as a prefix (e.g. "gel" → "gelmek"-style truncation).
        diff = len(z_root) - len(t_root)
        if diff < 0 or diff > 4:
            continue
        if not z_root.startswith(t_root):
            continue

        correction_map[t_root] = z_root

    return correction_map
|
|
|
|
def validate_roots(
    tokens: list[dict],
    original_words: list[str],
    base_tokenizer=None,
) -> list[dict]:
    """Apply Zemberek root corrections to the token stream.

    When Zemberek is unavailable, only flags likely-spurious short roots
    with ``_suspicious``; otherwise rewrites ROOT tokens using the map
    from ``build_correction_map``. Non-ROOT and special ``<...>`` tokens
    pass through unchanged.
    """
    if not ZEMBEREK_AVAILABLE:
        # Degraded mode: no corrections, just mark suspicious roots.
        result = []
        for i, tok in enumerate(tokens):
            if tok["type"] == "ROOT" and not tok["token"].strip().startswith("<"):
                if _is_spurious_root(tok["token"], tokens[i + 1 : i + 5]):
                    tok = {**tok, "_suspicious": True}
            result.append(tok)
        return result

    corr = (
        build_correction_map(original_words, base_tokenizer)
        if base_tokenizer is not None
        else {}
    )

    result = []
    for tok in tokens:
        if tok["type"] != "ROOT" or tok["token"].strip().startswith("<"):
            result.append(tok)
            continue

        surface = tok["token"].strip().lower()
        correct = corr.get(surface)

        if correct and correct != surface:
            # Preserve the leading-space marker of the original token.
            leading = " " if tok["token"].startswith(" ") else ""
            tok = {
                **tok,
                "token": leading + correct,
                "_original_token": tok["token"],
                "_root_corrected": True,
                # fixed mojibake: "β" was a mis-decoded "→"
                "_note": f"root corrected: '{surface}' → '{correct}'",
            }

        result.append(tok)

    return result
|
|
|
|
def disambiguate_sentence(words: list[str]) -> list[dict | None]:
    """Sentence-level Zemberek disambiguation.

    Returns one analysis dict (``lemma``/``pos``/``morphemes``) per input
    word, or ``None`` where analysis failed. Output length always equals
    ``len(words)``. Falls back to per-word analysis if sentence-level
    disambiguation raises.
    """
    if not ZEMBEREK_AVAILABLE:
        return [None] * len(words)
    try:
        sa_result = _morphology.analyzeAndDisambiguate(_jstr(" ".join(words)))
        best = sa_result.bestAnalysis()
        out = []
        for i in range(best.size()):
            try:
                sa = best.get(i)
                item = sa.getDictionaryItem()
                out.append({
                    "lemma": str(item.lemma),
                    "pos": str(sa.getPos().shortForm),
                    "morphemes": [str(m) for m in sa.getMorphemes()],
                })
            except Exception:
                out.append(None)
        # Pad/truncate so output length matches input length exactly.
        while len(out) < len(words):
            out.append(None)
        return out[: len(words)]
    except Exception:
        # Fallback: per-word analysis. Analyze each word only once
        # (the previous code called analyze_word twice per word).
        fallback: list[dict | None] = []
        for w in words:
            analyses = analyze_word(w)
            fallback.append(analyses[0] if analyses else None)
        return fallback
|
|