Spaces:
Running
Running
| """ | |
| Deteksi urutan kata yang janggal dalam kalimat Bahasa Indonesia, | |
| mis. "makan suka aku" yang mestinya "aku suka makan". | |
| Karena urutan kata Indonesia cukup fleksibel, deteksi tidak memakai aturan tata bahasa | |
| kaku. Tiap kalimat diberi skor kewajaran dengan IndoBERT (pseudo-log-likelihood: tiap | |
| token ditutup bergantian lalu ditebak modelnya), lalu dibandingkan dengan beberapa | |
| permutasi acak kata-katanya sendiri. Kalimat yang wajar mengungguli acakannya; kalau | |
| justru kalah, kalimat ditandai UNUSUAL_WORD_ORDER. Cara ini bebas bias panjang dan | |
| kosakata. Temuannya bersifat saran, bukan vonis. | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| import random | |
| import re | |
| import threading | |
| from dataclasses import dataclass | |
| logger = logging.getLogger(__name__) | |
| # Konstanta & Konfigurasi | |
| _MODEL_ID = "indolem/indobert-base-uncased" | |
| # Panjang kalimat minimum (dalam kata) yang diperiksa. Kalimat 1–2 kata tidak | |
| # memiliki urutan yang bermakna untuk dibandingkan: 2 kata hanya menghasilkan | |
| # 1 permutasi sehingga rasio check menjadi biner (0% atau 100%) dan rawan false | |
| # positive. Nilai 3 memastikan permutasi cukup beragam untuk evaluasi yang andal. | |
| _MIN_WORDS = 3 | |
| _MAX_WORDS = 500 | |
| # Token wordpiece maksimum per kalimat. Ini batas arsitektur model IndoBERT | |
| # (maksimum 512 posisi), bukan batas yang kita pilih: kalimat sangat panjang | |
| # tetap diproses, hanya dipotong agar model tidak gagal. | |
| _MAX_TOKENS = 256 | |
| # Jumlah kalimat maksimum per pemanggilan. Dibuat longgar agar field panjang | |
| # tetap diperiksa seluruhnya; nilai ini hanya pelindung latensi. | |
| _MAX_SENTENCES = 100 | |
| # Jumlah permutasi acak per kalimat untuk pembanding self-normalizing. | |
| _SHUFFLE_COUNT = 6 | |
| # Ambang rasio: tandai jika fraksi permutasi acak yang LEBIH WAJAR dari urutan | |
| # asli mencapai nilai ini. 0.66 = mayoritas (≥2/3) acakan mengalahkan urutan asli | |
| # → indikasi kuat urutan kata bermasalah. Makin tinggi → makin konservatif. | |
| _RATIO_THRESHOLD = 0.66 | |
| # Seed tetap agar hasil deterministik antar-pemanggilan. | |
| _RNG_SEED = 1234 | |
| # Tipe Data | |
| class SyntaxFinding: | |
| """Satu kalimat dengan urutan kata yang terindikasi janggal.""" | |
| sentence: str # kalimat yang ditandai (dipotong jika > 80 karakter di pipeline) | |
| start: int # offset karakter awal dalam teks asli | |
| end: int # offset karakter akhir (eksklusif) | |
| score: float # rata-rata log-probabilitas per token (makin rendah makin janggal) | |
| reason: str # penjelasan untuk pengguna | |
| confidence: float # skor kepercayaan 0.0–1.0 | |
| # Tokenisasi Kalimat | |
| # Pisah teks menjadi kalimat berdasarkan tanda akhir (. ! ?) atau baris baru, | |
| # sambil mempertahankan offset karakter di teks asli. | |
| _SENTENCE_SPLIT = re.compile(r'(?:[^.!?\n]|(?<=\d)\.(?=\d))+(?:[.!?]+|\n|$)') | |
| # Pola kalimat yang berisi identifier/alamat/data PII. Kalimat semacam ini | |
| # bukan bahasa naratif sehingga PLL scoring menghasilkan false positive. | |
| _PII_ADDRESS_SKIP = re.compile( | |
| r'(?:' | |
| r'NIK|NPWP|BPJS|SIM\b|KTP|Paspor' # dokumen identitas | |
| r'|Jl\b|Jln\b|Jalan\b|No\.\s*\d' # alamat | |
| r'|(?:0\d{2,3}[-\s]?\d{4,})' # nomor telepon | |
| r'|\b\d{10,}' # deretan angka panjang (NIK, rekening) | |
| r'|\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b' # alamat IP | |
| r'|\b[A-Z]\s?\d{4}\s?[A-Z]{1,3}\b' # plat nomor | |
| r'|\b\d{4}\s?\d{4}\s?\d{4}\s?\d{4}\b' # nomor kartu kredit | |
| r')', | |
| re.IGNORECASE, | |
| ) | |
| def _is_pii_like(sentence: str) -> bool: | |
| """True jika kalimat didominasi data PII/alamat, bukan bahasa naratif.""" | |
| if _PII_ADDRESS_SKIP.search(sentence): | |
| return True | |
| # Kalimat yang lebih dari separuh tokennya angka/simbol → bukan naratif | |
| tokens = sentence.split() | |
| if not tokens: | |
| return False | |
| non_alpha = sum(1 for t in tokens if not re.search(r'[a-zA-Z]{2,}', t)) | |
| return non_alpha > len(tokens) * 0.5 | |
| def _align_norm_sentences(orig_sentences: list[tuple[str, int, int]], | |
| norm_text: str) -> dict[int, str]: | |
| """Pasangkan tiap kalimat asli (berdasarkan indeks) ke kalimat ternormalisasi. | |
| Diasumsikan koreksi Word Quality tidak mengubah batas kalimat (tanda .!?\\n), | |
| sehingga indeks kalimat antara teks asli dan ternormalisasi tetap sejajar. | |
| """ | |
| norm_sents = _split_sentences(norm_text) | |
| return {i: ns for i, (ns, _, _) in enumerate(norm_sents[:len(orig_sentences)])} | |
| def _split_sentences(text: str) -> list[tuple[str, int, int]]: | |
| """ | |
| Pecah teks menjadi (kalimat, start, end) dengan offset di teks asli. | |
| Hanya kalimat dengan jumlah kata dalam rentang [_MIN_WORDS, _MAX_WORDS] | |
| yang dikembalikan. Kalimat yang didominasi data PII/alamat dilewati | |
| karena bukan bahasa naratif dan rawan false positive. | |
| """ | |
| results: list[tuple[str, int, int]] = [] | |
| for m in _SENTENCE_SPLIT.finditer(text): | |
| raw = m.group() | |
| stripped = raw.strip() | |
| if not stripped: | |
| continue | |
| word_count = len(stripped.split()) | |
| if word_count < _MIN_WORDS or word_count > _MAX_WORDS: | |
| continue | |
| if _is_pii_like(stripped): | |
| continue | |
| # Sesuaikan offset agar menunjuk teks tanpa spasi pinggir | |
| lead = len(raw) - len(raw.lstrip()) | |
| start = m.start() + lead | |
| end = start + len(stripped) | |
| results.append((stripped, start, end)) | |
| return results | |
| def _make_shuffles(words: list[str], count: int, rng: random.Random) -> list[list[str]]: | |
| """ | |
| Hasilkan hingga `count` permutasi acak unik dari `words`, semuanya berbeda | |
| dari urutan asli. Mengembalikan list kosong hanya bila kata kurang dari dua, | |
| karena satu kata tidak punya urutan alternatif untuk dibandingkan. | |
| """ | |
| if len(words) < 2: | |
| return [] | |
| original = tuple(words) | |
| seen: set[tuple[str, ...]] = {original} | |
| shuffles: list[list[str]] = [] | |
| # Batasi percobaan agar tidak loop selamanya pada kalimat dengan kata berulang. | |
| for _ in range(count * 6): | |
| if len(shuffles) >= count: | |
| break | |
| candidate = words[:] | |
| rng.shuffle(candidate) | |
| key = tuple(candidate) | |
| if key in seen: | |
| continue | |
| seen.add(key) | |
| shuffles.append(candidate) | |
| return shuffles | |
| # Kelas Utama | |
| class SyntaxChecker: | |
| """ | |
| Detektor urutan kata janggal Bahasa Indonesia berbasis perplexity IndoBERT. | |
| Contoh penggunaan:: | |
| chk = SyntaxChecker() | |
| chk.load() | |
| for f in chk.check("makan suka aku nasi goreng"): | |
| print(f.sentence, f.score, f.confidence) | |
| """ | |
| def __init__(self, use_ml: bool = True, model_id: str = _MODEL_ID) -> None: | |
| """ | |
| Args: | |
| use_ml: Aktifkan deteksi (butuh transformers + torch). Jika False | |
| atau dependensi tidak ada, check() selalu mengembalikan []. | |
| model_id: ID model HuggingFace masked-LM untuk skoring. | |
| """ | |
| self._use_ml = use_ml | |
| self._model_id = model_id | |
| self._tokenizer = None | |
| self._model = None | |
| self._torch = None | |
| # Model torch tidak thread-safe; lock menyerialkan inferensi saat beberapa | |
| # field (task/context/references) dievaluasi paralel di pipeline. | |
| self._ml_lock = threading.Lock() | |
| self._loaded = False | |
| # Public API | |
| def load(self) -> bool: | |
| """ | |
| Muat tokenizer + model masked-LM. | |
| Returns: | |
| True jika model berhasil dimuat (deteksi aktif), False jika tidak. | |
| """ | |
| if self._loaded: | |
| return self._model is not None | |
| self._loaded = True | |
| if not self._use_ml: | |
| return False | |
| try: | |
| import torch | |
| from transformers import AutoModelForMaskedLM, AutoTokenizer | |
| logger.info("Memuat model Syntax Checker '%s'...", self._model_id) | |
| self._torch = torch | |
| self._tokenizer = AutoTokenizer.from_pretrained(self._model_id) | |
| self._model = AutoModelForMaskedLM.from_pretrained(self._model_id) | |
| self._model.eval() | |
| logger.info("Syntax Checker siap (model IndoBERT aktif).") | |
| return True | |
| except Exception as exc: | |
| logger.warning("Gagal memuat Syntax Checker: %s — deteksi dinonaktifkan.", exc) | |
| self._model = None | |
| return False | |
| def check(self, text: str, language: str = "id", | |
| norm_text: str | None = None) -> list[SyntaxFinding]: | |
| """ | |
| Periksa teks dan kembalikan kalimat dengan urutan kata janggal. | |
| Args: | |
| text: Teks yang akan diperiksa. | |
| language: Penanda bahasa ("id"/"unknown"). | |
| norm_text: Teks ternormalisasi (koreksi Word Quality sudah diterapkan) | |
| untuk dipakai dalam scoring PLL. Offset dan tampilan kalimat | |
| di temuan tetap mengacu ke `text` asli. | |
| Returns: | |
| Daftar SyntaxFinding diurutkan berdasarkan posisi (start ascending). | |
| """ | |
| if not text or not text.strip(): | |
| return [] | |
| if not self._loaded: | |
| self.load() | |
| if self._model is None: | |
| return [] | |
| sentences = _split_sentences(text)[:_MAX_SENTENCES] | |
| if not sentences: | |
| return [] | |
| norm_map: dict[int, str] = {} | |
| if norm_text and norm_text.strip() != text.strip(): | |
| norm_map = _align_norm_sentences(sentences, norm_text) | |
| rng = random.Random(_RNG_SEED) | |
| findings: list[SyntaxFinding] = [] | |
| for i, (sentence, start, end) in enumerate(sentences): | |
| # Gunakan kalimat ternormalisasi untuk scoring agar kata alay/singkatan | |
| # tidak menyebabkan false positive urutan kata. | |
| score_sent = norm_map.get(i, sentence) | |
| words = score_sent.split() | |
| if len(words) < _MIN_WORDS: | |
| continue | |
| # Bangun permutasi acak yang berbeda dari urutan asli. | |
| shuffles = _make_shuffles(words, _SHUFFLE_COUNT, rng) | |
| if not shuffles: | |
| continue | |
| # Skor urutan asli + semua permutasi dalam satu forward pass. | |
| variants = [score_sent] + [" ".join(p) for p in shuffles] | |
| scores = self._pll_many(variants) | |
| orig_score = scores[0] | |
| shuffle_scores = [s for s in scores[1:] if s is not None] | |
| if orig_score is None or not shuffle_scores: | |
| continue | |
| # Fraksi permutasi acak yang lebih wajar daripada urutan asli. | |
| better = sum(1 for s in shuffle_scores if s > orig_score) | |
| ratio = better / len(shuffle_scores) | |
| if ratio < _RATIO_THRESHOLD: | |
| continue | |
| # Kepercayaan proporsional terhadap rasio (0.66→~0.55, 1.0→~0.85). | |
| confidence = max(0.50, min(0.85, 0.30 + ratio * 0.55)) | |
| findings.append(SyntaxFinding( | |
| sentence=sentence, # selalu tampilkan teks asli | |
| start=start, | |
| end=end, | |
| score=round(orig_score, 3), | |
| reason="Susunan kata kalimat ini terasa tidak wajar dan mungkin sulit " | |
| "dipahami AI. Periksa kembali urutan kata — pastikan mengikuti " | |
| "pola yang lazim (mis. subjek–predikat–objek).", | |
| confidence=round(confidence, 3), | |
| )) | |
| return findings | |
| # Properties | |
| def is_loaded(self) -> bool: | |
| """True jika checker sudah mencoba memuat model.""" | |
| return self._loaded | |
| def ml_active(self) -> bool: | |
| """True jika model berhasil dimuat dan deteksi aktif.""" | |
| return self._model is not None | |
| # Internal | |
| def _pll_many(self, variants: list[str]) -> list[float | None]: | |
| """ | |
| Hitung rata-rata pseudo-log-likelihood per token untuk beberapa kalimat | |
| sekaligus dalam SATU forward pass. | |
| Untuk setiap varian, tiap token (kecuali token spesial) di-mask bergantian; | |
| model memprediksi token aslinya. Semua baris ter-mask dari semua varian | |
| digabung ke satu batch agar efisien, lalu hasilnya dipisah kembali. | |
| Returns: | |
| Daftar skor rata-rata log-prob (sejajar dengan `variants`); elemen | |
| None untuk varian yang terlalu pendek / tak punya token termaskable. | |
| """ | |
| torch = self._torch | |
| tok = self._tokenizer | |
| model = self._model | |
| mask_id = tok.mask_token_id | |
| pad_id = tok.pad_token_id if tok.pad_token_id is not None else 0 | |
| if mask_id is None: | |
| return [None] * len(variants) | |
| special_ids = set(tok.all_special_ids) | |
| # Kumpulkan semua baris ter-mask dari semua varian. | |
| # Tiap entri: (variant_idx, masked_ids, true_token_id, pos) | |
| masked_rows: list[tuple[int, list[int], int, int]] = [] | |
| for vi, text in enumerate(variants): | |
| ids = tok(text, truncation=True, max_length=_MAX_TOKENS)["input_ids"] | |
| positions = [i for i, t in enumerate(ids) if t not in special_ids] | |
| for pos in positions: | |
| masked = list(ids) | |
| true_tok = masked[pos] | |
| masked[pos] = mask_id | |
| masked_rows.append((vi, masked, true_tok, pos)) | |
| if not masked_rows: | |
| return [None] * len(variants) | |
| max_len = max(len(m[1]) for m in masked_rows) | |
| n_rows = len(masked_rows) | |
| input_ids = torch.full((n_rows, max_len), pad_id, dtype=torch.long) | |
| attn = torch.zeros((n_rows, max_len), dtype=torch.long) | |
| for r, (_vi, masked, _tok, _pos) in enumerate(masked_rows): | |
| L = len(masked) | |
| input_ids[r, :L] = torch.tensor(masked, dtype=torch.long) | |
| attn[r, :L] = 1 | |
| with self._ml_lock, torch.no_grad(): | |
| logits = model(input_ids=input_ids, attention_mask=attn).logits | |
| sums: list[float] = [0.0] * len(variants) | |
| counts: list[int] = [0] * len(variants) | |
| for r, (vi, _masked, true_tok, pos) in enumerate(masked_rows): | |
| log_probs = torch.log_softmax(logits[r, pos], dim=-1) | |
| sums[vi] += log_probs[true_tok].item() | |
| counts[vi] += 1 | |
| return [ | |
| (sums[i] / counts[i]) if counts[i] else None | |
| for i in range(len(variants)) | |
| ] | |
| # Singleton | |
| _default_checker: SyntaxChecker | None = None | |
| def get_checker(use_ml: bool = True, load: bool = True) -> SyntaxChecker: | |
| """ | |
| Kembalikan instance SyntaxChecker singleton (lazy-initialized). | |
| Args: | |
| use_ml: Aktifkan deteksi berbasis ML. | |
| load: Jika True, panggil load() otomatis sebelum dikembalikan. | |
| """ | |
| global _default_checker | |
| if _default_checker is None: | |
| _default_checker = SyntaxChecker(use_ml=use_ml) | |
| if load and not _default_checker.is_loaded: | |
| _default_checker.load() | |
| return _default_checker | |
| # Demo CLI | |
| if __name__ == "__main__": | |
| import sys | |
| logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s") | |
| SAMPLES = [ | |
| "Aku suka makan nasi goreng.", # normal | |
| "Makan suka aku nasi goreng.", # janggal | |
| "Saya sudah menyelesaikan laporan ini.", # normal | |
| "Laporan ini saya sudah selesaikan.", # agak janggal | |
| "Tolong buatkan ringkasan artikel ini.", # normal | |
| "Ringkasan ini artikel buatkan tolong.", # janggal | |
| ] | |
| texts = sys.argv[1:] or SAMPLES | |
| chk = SyntaxChecker() | |
| if not chk.load(): | |
| print("[WARN] Model tidak tersedia — deteksi nonaktif.") | |
| sys.exit(0) | |
| print(f"Permutasi/kalimat: {_SHUFFLE_COUNT} | ambang rasio: {_RATIO_THRESHOLD:.0%}\n{'-' * 60}") | |
| for text in texts: | |
| findings = chk.check(text) | |
| print(f"\n> {text}") | |
| if not findings: | |
| print(" (urutan kata wajar)") | |
| for f in findings: | |
| print(f" [JANGGAL] skor={f.score:.2f} conf={f.confidence:.0%} {f.sentence!r}") | |