| import sys | |
| import os | |
| import re | |
| import json | |
| import math | |
| import logging | |
| from collections import Counter, defaultdict | |
| from datetime import datetime | |
| from typing import List, Dict, Tuple, Optional | |
| import nltk | |
| import numpy as np | |
| import pandas as pd | |
| from flask import Flask, request, jsonify | |
try:
    from langdetect import detect
except Exception:
    # Simple fallback when langdetect is unavailable
    def detect(_text: str) -> str:
        return "id"
# --- Additional libraries (deep learning & emoji) ---
import emoji
import torch
from transformers import AutoTokenizer, AutoModel
from sklearn.cluster import KMeans
from sklearn.feature_extraction.text import TfidfVectorizer  # still needed as a fallback
| # NLTK & RAKE | |
| from nltk.corpus import stopwords | |
| from nltk.sentiment import SentimentIntensityAnalyzer | |
| from rake_nltk import Rake | |
| try: | |
| # Optional Indonesian stemmer (improves recall) | |
| from Sastrawi.Stemmer.StemmerFactory import StemmerFactory # type: ignore | |
| _sastrawi_factory = StemmerFactory() | |
| _sastrawi_stemmer = _sastrawi_factory.create_stemmer() | |
| def _stem_id(word: str) -> str: | |
| try: | |
| return _sastrawi_stemmer.stem(word) | |
| except Exception: | |
| return word | |
| except Exception: | |
| _sastrawi_stemmer = None | |
| def _stem_id(word: str) -> str: | |
| return word | |
| # Setup Logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
# Check NLTK resources safely - skip downloading if they are missing (offline mode)
def ensure_nltk_safe():
    """Check NLTK packages; skip download if missing (offline mode)."""
    needed = {
        "punkt": "tokenizers/punkt",
        "punkt_tab": "tokenizers/punkt_tab",
        "stopwords": "corpora/stopwords",
    }
    for pkg, path in needed.items():
        try:
            nltk.data.find(path)
            print(f"✅ {pkg} ready")
        except LookupError:
            print(f"⚠️ {pkg} not found - continuing in offline mode")
# Call without automatic downloads
try:
    ensure_nltk_safe()
    print("=" * 60)
except Exception as e:
    print(f"⚠️ NLTK check error: {e}")
app = Flask(__name__)
| # --- GLOBAL VARIABLES --- | |
| # Initialize SentimentIntensityAnalyzer safely (skip jika vader_lexicon tidak ada) | |
| try: | |
| sia = SentimentIntensityAnalyzer() | |
| print("✅ VADER sentiment analyzer ready") | |
| except Exception as e: | |
| print(f"⚠️ VADER not available, using custom lexicon only: {e}") | |
| sia = None | |
try:
    STOPWORDS_ID_CHAT = set(stopwords.words('indonesian')) | set(stopwords.words('english'))
except Exception:
    # Offline mode: NLTK stopwords corpus unavailable; fall back to the chat fillers below
    STOPWORDS_ID_CHAT = set()
| _CHAT_FILLERS = { | |
| "sih", "dong", "kok", "kan", "tuh", "deh", "lah", "yah", "ni", "tu", | |
| "ya", "yak", "yuk", "loh", "masa", "mana", "tapi", "kalo", "kalau", | |
| "biar", "buat", "bikin", "bilang", "gak", "ga", "nggak", "enggak", | |
| "kagak", "tak", "ndak", "udah", "sudah", "blm", "belum", "pas", | |
| "lagi", "lg", "td", "tadi", "km", "kamu", "aku", "saya", "gw", "gue", | |
| "lu", "lo", "elu", "kita", "kalian", "mereka", "dia", "ini", "itu", | |
| "sini", "situ", "sana", "bgt", "banget", "aja", "saja", "cuma", | |
| "doang", "terus", "trs", "jd", "jadi", "karna", "karena", "krn", | |
| "bisa", "bs", "mau", "mo", "pengen", "ingin", "ada", "tiada", | |
| "sama", "dgn", "dengan", "dr", "dari", "ke", "di", "pd", "pada", | |
| "kapan", "dimana", "siapa", "mengapa", "kenapa", "gimana", "bagaimana", | |
| "wkwk", "haha", "hehe", "huhu", "anjir", "njir", "anjing", | |
| "apalah", "apa", "aduh", "wah", "nah", "kek", "kayak", "macam" | |
| } | |
| STOPWORDS_ID_CHAT.update(_CHAT_FILLERS) | |
# ==== Additional TALA stopwords integration ====
try:
    _TALA_PATH = os.path.join(os.path.dirname(__file__), 'tala-stopwords-indonesia.txt')
    if os.path.exists(_TALA_PATH):
        with open(_TALA_PATH, 'r', encoding='utf-8') as _tf:
            tala_words = {w.strip().lower() for w in _tf if w.strip() and not w.startswith('#')}
        # Drop single-character entries so we do not over-filter
        tala_words = {w for w in tala_words if len(w) > 1}
        STOPWORDS_ID_CHAT.update(tala_words)
        logger.info(f"Loaded TALA stopwords: +{len(tala_words)} terms (total={len(STOPWORDS_ID_CHAT)})")
    else:
        logger.warning('TALA stopwords file not found, skipping integration.')
except Exception as e:
    logger.warning(f'Failed loading TALA stopwords: {e}')
# Simple Indonesian/Kupang lexicon in the standard [-1, +1] range
ID_EXTRA = {
    # Common negative emotions
    "capek": -0.7, "capai": -0.5, "pusing": -0.7, "marah": -0.8, "sedih": -0.7,
    "murung": -0.7, "galau": -0.6, "bingung": -0.5, "takut": -0.7, "cemas": -0.7,
    "kecewa": -0.7, "kesal": -0.6, "jengkel": -0.6, "frustasi": -0.8, "frustrasi": -0.8, "depresi": -0.9,
    "stres": -0.8, "tegang": -0.6, "resah": -0.7, "gelisah": -0.7, "sendirian": -0.5,
    # Common positive emotions
    "senang": 0.7, "bahagia": 0.8, "semangat": 0.7, "hepi": 0.7, "gembira": 0.8,
    "excited": 0.7, "antusias": 0.7, "optimis": 0.6, "tenang": 0.5, "damai": 0.6,
    "puas": 0.6, "lega": 0.6, "syukur": 0.7, "bangga": 0.7,
    # School problems
    "telat": -0.6, "bolos": -0.8, "berantem": -0.9, "ribut": -0.7, "gaduh": -0.6,
    "berkelahi": -0.9, "bertengkar": -0.8, "keributan": -0.7, "masalah": -0.5,
    "PR": -0.3, "tugas": -0.2, "banyak": -0.2, "malas": -0.5, "rajin": 0.5,
    "skip": -0.6, "cabut": -0.6, "pontang": -0.7, "mangkir": -0.7,
    # Family & home ("berantem" and "bertengkar" are already listed under school problems)
    "cekcok": -0.8, "marahan": -0.7,
    "berisik": -0.5, "berantakan": -0.4, "kacau": -0.7, "chaos": -0.7,
    "pisah": -0.7, "bercerai": -0.8, "kabur": -0.7, "minggat": -0.8, "pergi": -0.3,
    # Kupang/Manado dialect with sentiment
    "sonde": -0.3, "tara": -0.2, "teda": -0.2, "pigi": -0.1,  # Kupang negation / "pergi"
    "kaco": -0.5, "cungkel": -0.5, "bongkar": -0.2, "kobo": -0.4, "susa": -0.6,
    "dolo": -0.4, "molo": -0.4, "nda": -0.3,  # Manado negation ("so" is listed with the neutral pronouns below)
    "bodo": -0.6, "bodoh": -0.7, "tolol": -0.8, "goblok": -0.8,  # Insults
    # Neutral pronouns (score 0 won't affect sentiment)
    "beta": 0.0, "ko": 0.0, "torang": 0.0, "katong": 0.0, "deng": 0.0,
    "dong": 0.0, "de": 0.0, "so": 0.0, "pe": 0.0, "pung": 0.0,
    "tanta": 0.0, "oma": 0.0, "opa": 0.0, "mama": 0.0, "papa": 0.0,
}
# Add to VADER (if available)
if sia:
    sia.lexicon.update({k.lower(): v for k, v in ID_EXTRA.items()})
| API_KEY = os.environ.get("ML_API_KEY") # optional | |
| FEEDBACK_FILE = os.environ.get("ML_FEEDBACK_FILE", os.path.join(os.path.dirname(__file__), "feedback_weights.json")) | |
| LEXICON_DIR = os.environ.get("ML_LEXICON_DIR", os.path.join(os.path.dirname(__file__), "lexicons")) | |
| ENABLE_BERT = os.environ.get("ML_ENABLE_BERT", "false").lower() in ("1","true","yes") | |
| BERT_MODEL_NAME = os.environ.get("ML_BERT_MODEL", "indobenchmark/indobert-base-p1") | |
| ENABLE_BERT_WARMUP = os.environ.get("ML_BERT_WARMUP", "false").lower() in ("1","true","yes") | |
| BERT_L2_NORMALIZE = os.environ.get("ML_BERT_L2_NORMALIZE", "true").lower() in ("1", "true", "yes") | |
| SERVICE_VERSION = os.environ.get("ML_VERSION", "ml-rasaya:2025.11.0") | |
| def check_key(): | |
| if API_KEY: | |
| # accept both header casings/variants for compatibility | |
| key = request.headers.get("X-API-KEY") or request.headers.get("X-API-Key") | |
| if key != API_KEY: | |
| return False | |
| return True | |
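# Minimal client sketch (assumptions: the base URL/port comes from the __main__ block below and
# an "/analyze" route exists; neither is confirmed by this snippet). When ML_API_KEY is unset,
# check_key() accepts every request; when set, the same key must be sent in the X-API-KEY header.
def _demo_client_call():
    import requests  # assumed to be installed in the calling environment
    resp = requests.post(
        "http://localhost:7860/analyze",
        headers={"X-API-KEY": os.environ.get("ML_API_KEY", "")},
        json={"text": "beta su capek deng tugas"},
    )
    print(resp.status_code, resp.json())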
| def detect_lang(txt, hint=None): | |
| if hint: | |
| return hint | |
| try: | |
| return detect(txt) if txt and txt.strip() else "id" | |
| except Exception: | |
| return "id" | |
| def label_from_score(compound: float) -> str: | |
| if compound >= 0.05: return "positif" | |
| if compound <= -0.05: return "negatif" | |
| return "netral" | |
| # Legacy default map removed in favor of taxonomy-derived categories | |
| def load_feedback_weights(): | |
| try: | |
| with open(FEEDBACK_FILE, 'r', encoding='utf-8') as f: | |
| return json.load(f) | |
| except Exception: | |
| return {} | |
| def save_feedback_weights(weights: dict): | |
| try: | |
| with open(FEEDBACK_FILE, 'w', encoding='utf-8') as f: | |
| json.dump(weights, f, ensure_ascii=False, indent=2) | |
| except Exception: | |
| pass | |
def score_categories_for_text(txt: str, categories_map: dict, feedback: dict):
    """Token/n-gram based category scoring.
    - Tokenize + optional stemming (Sastrawi) for better generalization.
    - Match unigrams/bigrams/trigrams exactly (not as free substrings).
    - Base weight is divided by the number of categories sharing the keyword (1/n_cats).
    - Boost n-grams (bi = 1.4x, tri = 1.6x); down-weight very short tokens (<= 3 chars: 0.5x).
    - Add feedback weights when present, then normalize to proportions of the total.
    """
| clean = clean_text(txt) | |
| toks = _tokenize_and_stem(clean) | |
| uni, bi, tri = _build_ngram_sets(toks) | |
| # Invert index: keyword -> categories | |
| inv = defaultdict(list) | |
| for cat, kws in categories_map.items(): | |
| for kw in kws: | |
| k = (kw or '').strip().lower() | |
| if k: | |
| inv[k].append(cat) | |
| scores = {cat: 0.0 for cat in categories_map.keys()} | |
| reasons = defaultdict(list) | |
| for kw, cats in inv.items(): | |
| parts = [p for p in kw.split() if p] | |
| parts_stem = [_stem_id(p) for p in parts] | |
| gram = len(parts_stem) | |
| present = False | |
| if gram == 1: | |
| present = parts_stem[0] in uni | |
| elif gram == 2: | |
| present = (parts_stem[0] + ' ' + parts_stem[1]) in bi | |
| else: | |
| seq = ' '.join(parts_stem[:3]) | |
| present = seq in tri if len(parts_stem) >= 3 else False | |
| if not present: | |
| continue | |
| base = 1.0 / max(1, len(cats)) | |
| if gram == 1 and len(parts_stem[0]) <= 3: | |
| base *= 0.5 | |
| if gram == 2: | |
| base *= 1.4 | |
| elif gram >= 3: | |
| base *= 1.6 | |
| for cat in cats: | |
| adj = base + float(feedback.get(kw, {}).get(cat, 0.0)) | |
| scores[cat] += adj | |
| reasons[cat].append(kw) | |
| total = sum(scores.values()) | |
| if total > 0: | |
| for k in scores.keys(): | |
| scores[k] = round(scores[k] / total, 4) | |
| return scores, {k: sorted(set(v))[:5] for k, v in reasons.items()} | |
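# Illustrative sketch of the n-gram scorer above. The category names and keywords here are
# made-up examples (not the real taxonomy); wrapped in a helper so nothing runs at import time.
def _demo_score_categories():
    demo_map = {
        "AKADEMIK": ["tugas", "tidak paham", "nilai jelek"],
        "KELUARGA": ["orang tua", "bertengkar"],
    }
    scores, reasons = score_categories_for_text(
        "beta pusing karena tugas banyak dan sonde paham materi", demo_map, feedback={}
    )
    # Matched keywords accumulate weight (the bigram "tidak paham" gets the 1.4x boost),
    # then scores are normalized so they sum to ~1.0 across categories.
    print(scores, reasons)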
| """ | |
| Cleaning & Lexicon Loader (InSet + optional Barasa) | |
| """ | |
| # Regex patterns | |
| _RE_URL = re.compile(r"https?://\S+|www\.\S+") | |
| _RE_MENTION = re.compile(r"[@#]\w+") | |
_RE_REPEAT = re.compile(r"(.)\1{2,}")  # three or more repeated characters
| _RE_MULTISPACE = re.compile(r"\s+") | |
def clean_text(t: str) -> str:
    """
    Clean text while keeping emoji meaning and the punctuation that matters for sentiment.
    """
    if not t: return ""
    # 1. Demojize: turn emoji into text, then map a few common ones to Indonesian words
    t = emoji.demojize(t, delimiters=(" ", " "))
    t = t.replace("loudly_crying_face", "menangis") \
         .replace("crying_face", "sedih") \
         .replace("pensive_face", "murung") \
         .replace("angry_face", "marah") \
         .replace("rolling_on_the_floor_laughing", "tertawa") \
         .replace("face_with_rolling_eyes", "bosan") \
         .replace("broken_heart", "patah hati")
    t = t.lower().strip()
    # 2. Remove URLs & mentions
    t = _RE_URL.sub(" ", t)
    t = _RE_MENTION.sub(" ", t)
    # 3. Keep punctuation important for emotion (?!.,); drop other non-alphanumeric characters
    t = re.sub(r"[^a-z0-9\?\!\.\,\s]", " ", t)
    # Split punctuation into separate tokens
    t = re.sub(r"([\?\!\.\,])", r" \1 ", t)
    # 4. Normalize character repeats (bangeeet -> banget)
    t = _RE_REPEAT.sub(r"\1", t)
    # 5. Slang & dialect normalization (Indonesian + Kupang + Manado + Ambon)
| dialect = { | |
| # Standard Indonesian slang | |
| "gw": "saya", "gue": "saya", "lu": "kamu", "lo": "kamu", "elu": "kamu", | |
| "ak": "aku", "aq": "aku", "sy": "saya", "w": "saya", "ane": "saya", | |
| "gak": "tidak", "ga": "tidak", "nggak": "tidak", "kaga": "tidak", "ndak": "tidak", | |
| "enggak": "tidak", "engga": "tidak", "ngga": "tidak", "kagak": "tidak", | |
| "krn": "karena", "karna": "karena", "bgt": "banget", "bgtt": "banget", | |
| "tdk": "tidak", "jgn": "jangan", "udh": "sudah", "sdh": "sudah", | |
| "blm": "belum", "trus": "terus", "jd": "jadi", "dgn": "dengan", | |
| "sm": "sama", "yg": "yang", "kalo": "kalau", "kl": "kalau", | |
| "mager": "malas gerak", "baper": "bawa perasaan", "gabut": "bosan", | |
| "anjir": "kaget", "njir": "kaget", "anjay": "hebat", | |
| "mantul": "mantap", "santuy": "santai", "sans": "santai", | |
| "gajelas": "tidak jelas", "gaje": "tidak jelas", | |
| # Kupang/NTT dialect | |
| # --- KATA GANTI ORANG (PRONOUNS) --- | |
| "beta": "saya", "b": "saya", "bt": "saya", # Kupang/Ambon | |
| "kita": "saya", # Manado (konteks santai) | |
| "ana": "saya", "awak": "saya", "sa": "saya", "sy": "saya", | |
| "ak": "aku", "aq": "aku", "gw": "saya", "gue": "saya", | |
| "lu": "kamu", "lo": "kamu", "elu": "kamu", | |
| "ose": "kamu", "os": "kamu", "ale": "kamu", # Ambon | |
| "ngana": "kamu", "nga": "kamu", # Manado | |
| "ko": "kamu", "kau": "kamu", "ju": "kamu", # Kupang/Papua | |
| "bo": "kamu", # Bima/Dompu kadang masuk | |
| "dia": "dia", "de": "dia", "i": "dia", # Papua/Kupang (De pung rumah) | |
| "antua": "beliau", # Ambon (respektif) | |
| "katong": "kita", "ketong": "kita", "ktg": "kita", # Kupang/Ambon | |
| "torang": "kita", "tong": "kita", # Manado/Papua | |
| "dorang": "mereka", "dong": "mereka", "drg": "mereka", # Manado/Kupang/Ambon | |
| "besong": "kalian", "basong": "kalian", "kamorang": "kalian", # Kupang/Papua | |
| "ngoni": "kalian", # Manado | |
| # --- NEGASI (TIDAK/BUKAN) --- | |
| "sonde": "tidak", "son": "tidak", "snd": "tidak", "sond": "tidak", # Kupang | |
| "seng": "tidak", "sing": "tidak", "tra": "tidak", "trada": "tidak", # Ambon/Papua | |
| "tara": "tidak", "tar": "tidak", | |
| "nyanda": "tidak", "nda": "tidak", "ndak": "tidak", # Manado/Jawa | |
| "gak": "tidak", "ga": "tidak", "nggak": "tidak", "kaga": "tidak", | |
| "bukang": "bukan", | |
| # --- KATA KERJA & KETERANGAN (VERBS & ADVERBS) --- | |
| "pi": "pergi", "p": "pergi", "pig": "pergi", # Kupang/Ambon (saya kabur 'pi'...) | |
| "su": "sudah", "so": "sudah", # Kupang/Manado/Ambon | |
| "sdh": "sudah", "udh": "sudah", "udah": "sudah", | |
| "blm": "belum", "balom": "belum", | |
| "mo": "mau", "mau": "mau", | |
| "kasi": "beri", "kase": "beri", "kas": "beri", # Kase tinggal -> Beri tinggal | |
| "omong": "bicara", "baomong": "bicara", "bakata": "berkata", | |
| "dapa": "dapat", "dap": "dapat", | |
| "baku": "saling", # Baku pukul -> Saling pukul | |
| "bae": "baik", "baek": "baik", | |
| "ancor": "hancur", | |
| "ambe": "ambil", "pigi": "pergi", | |
| # --- KEPEMILIKAN & PENGHUBUNG --- | |
| "pung": "punya", "puny": "punya", "pu": "punya", "pe": "punya", # Beta pung -> Saya punya | |
| "deng": "dengan", "dg": "dengan", "dng": "dengan", | |
| "par": "untuk", "for": "untuk", # Ambon/Manado (For ngana) | |
| "vor": "untuk", | |
| "kek": "seperti", "mcam": "macam", "kek": "kayak", | |
| # --- KATA SIFAT & LAINNYA --- | |
| "talalu": "terlalu", "tlalu": "terlalu", | |
| "sadiki": "sedikit", "sadikit": "sedikit", | |
| "banya": "banyak", | |
| "skali": "sekali", | |
| "samua": "semua", | |
| "karna": "karena", "krn": "karena", "gara": "karena", | |
| # --- GENERAL SLANG INDONESIA --- | |
| "bgt": "banget", "bgtt": "banget", | |
| "trus": "terus", "trs": "terus", | |
| "jd": "jadi", "jdi": "jadi", | |
| "yg": "yang", "kalo": "kalau", "kl": "kalau", | |
| "mager": "malas gerak", "baper": "bawa perasaan", "gabut": "bosan", | |
| "anjir": "kaget", "njir": "kaget", "anjay": "hebat", | |
| "mantul": "mantap", "santuy": "santai", "sans": "santai", | |
| "gajelas": "tidak jelas", "gaje": "tidak jelas", | |
| "ortu": "orang tua", "mksd": "maksud", | |
| "knp": "kenapa", "np": "kenapa", "napa": "kenapa", | |
| "utk": "untuk" | |
| } | |
| toks = [] | |
| for tk in t.split(): | |
| toks.append(dialect.get(tk, tk)) | |
| t = " ".join(toks) | |
| t = _RE_MULTISPACE.sub(" ", t).strip() | |
| return t | |
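# Small sketch of the cleaner end-to-end (emoji -> Indonesian word, slang/dialect normalization,
# punctuation splitting, repeat squashing). The sample sentences are assumptions for illustration.
def _demo_clean_text():
    samples = [
        "Beta su capek skali deng tugas bangeeet!!",  # expected roughly: "saya sudah capek sekali dengan tugas banget ! !"
        "mantul bgt kelas hari ini 🙄",                # emoji becomes "bosan", "mantul bgt" -> "mantap banget"
    ]
    for s in samples:
        print(clean_text(s))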
| # Tokenization + optional stemming helpers | |
| def _tokenize_and_stem(t: str) -> list[str]: | |
| toks = [w for w in t.split() if w] | |
| if _sastrawi_stemmer is None: | |
| return toks | |
| return [_stem_id(w) for w in toks] | |
| def _build_ngram_sets(tokens: list[str]) -> tuple[set[str], set[str], set[str]]: | |
| uni = set(tokens) | |
| bi = set([tokens[i] + " " + tokens[i+1] for i in range(len(tokens)-1)]) if len(tokens) >= 2 else set() | |
| tri = set([tokens[i] + " " + tokens[i+1] + " " + tokens[i+2] for i in range(len(tokens)-2)]) if len(tokens) >= 3 else set() | |
| return uni, bi, tri | |
def detect_sarcasm_heuristic(text_clean, raw_text, current_sentiment):
    """
    Detect potential sarcasm from sentiment contrast, emoji, and punctuation.
    Returns: (is_sarcasm: bool, confidence: float)
    """
    is_sarcasm = False
    confidence = 0.0
    text_clean = text_clean.lower()
    # Heuristic word lists
    intensifiers = ["banget", "bgt", "kali", "sumpah", "bener", "bet", "parah", "amat"]
    positives = ["hebat", "bagus", "pinter", "jenius", "mantap", "enak", "keren", "rajin", "suci"]
    negatives = ["pusing", "capek", "stres", "gila", "mati", "rusak", "hancur", "sebel", "benci", "malas", "bodoh", "tolol"]
    # Features
    has_pos = any(p in text_clean for p in positives)
    has_neg = any(n in text_clean for n in negatives)
    has_intensifier = any(i in text_clean for i in intensifiers)
    has_exclamation = "!" in raw_text or "?" in raw_text
    # LOGIC 1: sentence contains both positive AND negative cues ("Hebat banget lo bikin gue stres")
    if has_pos and has_neg:
        return True, 0.75
    # LOGIC 2: positive wording + aggressive punctuation ("Pinter ya lo??")
    # Genuine praise rarely comes with '??'
    if has_pos and ("??" in raw_text or "!!" in raw_text):
        return True, 0.6
    # LOGIC 3: positive wording + negative emoji (check the raw text for common sarcastic emojis)
    # Emoji: rolling eyes, unamused face, upside-down face, etc.
    sarcastic_emojis = ["🙄", "😒", "🙃", "😤", "🤡"]
    if has_pos and any(e in raw_text for e in sarcastic_emojis):
        return True, 0.9
    return False, 0.0
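# Quick sketch of the heuristic above on assumed example sentences: mixed positive/negative
# wording, praise with aggressive punctuation, and a plain positive control.
def _demo_detect_sarcasm():
    examples = [
        "hebat banget lo bikin gue stres",  # positive + negative cue -> (True, 0.75)
        "pinter ya lo??",                   # positive + "??" -> (True, 0.6)
        "aku rajin belajar hari ini",       # positive only -> (False, 0.0)
    ]
    for raw in examples:
        print(raw, "->", detect_sarcasm_heuristic(clean_text(raw), raw, 0.0))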
| def load_inset_lexicon(base_dir: str) -> dict[str, float]: | |
| """Load InSet format: lexicons/inset/{positive.tsv,negative.tsv}.""" | |
| out: dict[str, float] = {} | |
| inset_dir = os.path.join(base_dir, "inset") | |
| pos = os.path.join(inset_dir, "positive.tsv") | |
| neg = os.path.join(inset_dir, "negative.tsv") | |
| if os.path.exists(pos): | |
| with open(pos, "r", encoding="utf-8") as f: | |
| for line in f: | |
| w = line.strip().split("\t")[0] | |
| if w: | |
| out[w.lower()] = 1.0 | |
| if os.path.exists(neg): | |
| with open(neg, "r", encoding="utf-8") as f: | |
| for line in f: | |
| w = line.strip().split("\t")[0] | |
| if w: | |
| out[w.lower()] = -1.0 | |
| return out | |
| def load_barasa_csv(path: str) -> dict[str, float]: | |
| """Load Barasa CSV with headers; expects at least a 'lemma' column and | |
| either a 'score' column (float, negative to positive) or separate | |
| 'pos'/'neg' columns that can be combined (score = pos - neg). | |
| Values are clamped to [-1, 1]. | |
| """ | |
| lex: dict[str, float] = {} | |
| try: | |
| import csv | |
| with open(path, encoding="utf-8") as f: | |
| r = csv.DictReader(f) | |
| for row in r: | |
| lemma = (row.get("lemma") or row.get("word") or row.get("token") or "").strip().lower() | |
| if not lemma: | |
| continue | |
| score_val = None | |
| # Prefer unified score | |
| if row.get("score") not in (None, ""): | |
| try: | |
| score_val = float(row.get("score")) | |
| except Exception: | |
| score_val = None | |
| # Else try pos/neg columns | |
| if score_val is None: | |
| try: | |
| pos = float(row.get("pos") or row.get("positive") or 0) | |
| neg = float(row.get("neg") or row.get("negative") or 0) | |
| score_val = pos - neg | |
| except Exception: | |
| score_val = 0.0 | |
| score_val = max(-1.0, min(1.0, float(score_val))) | |
| lex[lemma] = score_val | |
| except Exception: | |
| pass | |
| return lex | |
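# Assumed example of a compatible Barasa-style CSV (illustrative only; no such file ships with
# this snippet). A unified "score" column is preferred; otherwise "pos"/"neg" are combined:
#
#   lemma,score
#   senang,0.8
#   kecewa,-0.7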
| def load_barasa_optional(base_dir: str) -> dict[str, float]: | |
| """ | |
| Try to read Barasa resources if available. The provided file wn-msa-all.tab | |
| is a WordNet-style tab file (no explicit polarity). We don't assign scores | |
| from it directly; instead we just return empty dict so it doesn't affect | |
| sentiment unless in the future we add mapping rules. | |
| If you later provide barasa.csv (word,score), we can extend this loader. | |
| """ | |
| barasa_dir = os.path.join(base_dir, "barasa") | |
| wn_file = os.path.join(barasa_dir, "wn-msa-all.tab") | |
| # Placeholder: no direct sentiment; return empty for now. | |
| # Future: map synonyms of existing sentiment words and inherit score * 0.8 | |
| if os.path.exists(wn_file): | |
| return {} | |
| # also support barasa.csv if added by user | |
| csv_file = os.path.join(base_dir, "barasa.csv") | |
| if os.path.exists(csv_file): | |
| out: dict[str, float] = {} | |
| with open(csv_file, "r", encoding="utf-8") as f: | |
| for line in f: | |
| if "," in line: | |
| w, sc = line.strip().split(",", 1) | |
| try: | |
| out[w.lower()] = max(-1.0, min(1.0, float(sc))) | |
| except Exception: | |
| continue | |
| return out | |
| return {} | |
| def build_lexicon() -> dict[str, float]: | |
| # Start from InSet if available | |
| lex = load_inset_lexicon(LEXICON_DIR) | |
| # Merge Barasa if CSV provided; else try optional WordNet source (no polarity) | |
| barasa_csv = os.path.join(LEXICON_DIR, "barasa", "barasa_lexicon.csv") | |
| if os.path.exists(barasa_csv): | |
| lex.update(load_barasa_csv(barasa_csv)) | |
| else: | |
| bar = load_barasa_optional(LEXICON_DIR) | |
| lex.update(bar) | |
    # Add custom Kupang/ID extras (already within the [-1, +1] range)
| for k, v in ID_EXTRA.items(): | |
| lex[k.lower()] = max(-1.0, min(1.0, float(v))) | |
| return lex | |
| LEXICON_ID = build_lexicon() | |
| def score_with_lexicon(text: str, lex: Dict[str, float]) -> float: | |
| toks = clean_text(text).split() | |
| if not toks: | |
| return 0.0 | |
| # Context-aware scoring: handle negation (pre & post), intensifiers | |
| negation_words = {"tidak", "bukan", "belum", "jangan", "tanpa", "sonde", "tara", "teda", "nda", "tra"} | |
| intensifiers = {"banget", "sangat", "amat", "sekali", "parah", "bener", "pisan"} | |
| s = 0.0 | |
| neg_window = 0 # number of next tokens to negate | |
| intensify = 1.0 | |
| # track last scored token to handle patterns like "paham ... belum" | |
| last_score_val = 0.0 | |
| last_score_idx = -10 | |
| for i, tok in enumerate(toks): | |
| # Negation token: start negation window and optionally flip previous positive nearby | |
| if tok in negation_words: | |
| # If a positive word occurred recently (within 2 tokens), flip it retroactively | |
| if last_score_val > 0 and (i - last_score_idx) <= 2: | |
| # subtract a bit more than added to reflect negation of previous positive | |
| s -= last_score_val * 1.2 | |
| last_score_val = 0.0 | |
| neg_window = 3 | |
| continue | |
| # Intensifier affects next scored word only | |
| if tok in intensifiers: | |
| intensify = 1.5 | |
| continue | |
| # Base lexical score | |
| score = lex.get(tok, 0.0) | |
| # Apply active negation window | |
| if neg_window > 0 and score != 0.0: | |
| score = -score * 0.8 | |
| neg_window -= 1 | |
| elif neg_window > 0: | |
| # consume window even if current token has no score | |
| neg_window -= 1 | |
| # Apply intensifier | |
| if intensify > 1.0 and score != 0.0: | |
| score = score * intensify | |
| intensify = 1.0 | |
| s += score | |
| if score != 0.0: | |
| last_score_val = score | |
| last_score_idx = i | |
| # Dampen by sqrt length to avoid bias for long texts | |
| normalized = s / max(1.0, math.sqrt(len(toks))) | |
| return max(-1.0, min(1.0, normalized)) | |
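# Sketch of the context-aware scorer: a negation word opens a 3-token window that flips the next
# scored word, and an intensifier boosts the next scored word. The phrases are assumed examples;
# exact values depend on the loaded lexicons, so the comments only indicate the expected tendency.
def _demo_score_with_lexicon():
    print(score_with_lexicon("hari ini saya senang", LEXICON_ID))          # should lean positive
    print(score_with_lexicon("saya tidak senang hari ini", LEXICON_ID))    # negation pushes it negative
    print(score_with_lexicon("beta capek banget deng tugas", LEXICON_ID))  # dialect + intensifier, negative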
| INTENSIFIERS = {"banget": 1.0, "sangat": 0.8, "parah": 0.9, "amat": 0.5} | |
| def negative_gate(aggregate: float, raw_txt: str) -> tuple[bool, float]: | |
| # severity from magnitude + intensifiers + punctuation and repeats | |
| clean = clean_text(raw_txt) | |
| toks = clean.split() | |
| intens = sum(INTENSIFIERS.get(t, 0.0) for t in toks) | |
| exclam = min(raw_txt.count("!"), 3) * 0.1 | |
| repeat = 0.1 if _RE_REPEAT.search(raw_txt) else 0.0 | |
| sev = max(0.0, min(1.0, (-aggregate) * 0.7 + intens * 0.2 + exclam + repeat)) | |
| return (aggregate <= -0.05), round(sev, 3) | |
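# Sketch of the gate above: the flag fires at aggregate <= -0.05, while severity grows with the
# score magnitude, intensifiers, exclamation marks, and character repeats (inputs are assumed).
def _demo_negative_gate():
    print(negative_gate(-0.6, "capek banget hari ini!!!"))  # (True, high severity)
    print(negative_gate(0.4, "hari ini lumayan"))           # (False, 0.0)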
| # ===================== | |
| # Taxonomy (topics/subtopics) for semi-supervised labeling | |
| # ===================== | |
| TAXONOMY_PATH = os.path.join(os.path.dirname(__file__), "taxonomy.json") | |
| _TAX_MTIME: float = -1.0 | |
| try: | |
| with open(TAXONOMY_PATH, "r", encoding="utf-8") as _f: | |
| _TAX = json.load(_f) | |
| except Exception: | |
| _TAX = {"topics": []} | |
| def _reload_taxonomy_if_changed(force: bool = False) -> None: | |
| """Reload taxonomy.json if file changed. | |
| Needed so DB soft-deletes / is_active toggles (synced into taxonomy.json) | |
| take effect without restarting the ML service. | |
| """ | |
| global _TAX, _TAX_MTIME, BUCKET_KW, SUBTOPICS | |
| try: | |
| mtime = os.path.getmtime(TAXONOMY_PATH) | |
| except Exception: | |
| return | |
| if (not force) and _TAX_MTIME >= 0 and mtime <= _TAX_MTIME: | |
| return | |
| try: | |
| with open(TAXONOMY_PATH, "r", encoding="utf-8") as f: | |
| data = json.load(f) | |
| if not isinstance(data, dict): | |
| return | |
| _TAX = data | |
| _TAX_MTIME = mtime | |
| # refresh derived indices (best-effort) | |
| try: | |
| BUCKET_KW, SUBTOPICS = _taxonomy_keywords() | |
| except Exception: | |
| pass | |
| logger.info("taxonomy.json reloaded", extra={"mtime": mtime}) | |
| except Exception as e: | |
| logger.warning(f"Failed reloading taxonomy.json: {e}") | |
| def _taxonomy_keywords(): | |
| buckets = {} | |
| subtopics = {} | |
| for tp in _TAX.get("topics", []): | |
| bucket = tp.get("bucket") or "" | |
| topic_id = tp.get("id") or bucket or "TOPIC" | |
| topic_name = tp.get("name") or topic_id | |
| buckets.setdefault(bucket, set()).update([str(w).lower() for w in tp.get("keywords", []) if w]) | |
| for st in tp.get("subtopics", []) or []: | |
| # Maintain internal id (taxonomy id) and external 'code' matching kategori_masalahs.kode | |
| st_id = st.get("id") or st.get("code") or st.get("name") | |
| st_code = st.get("code") or st_id | |
| if not st_id: | |
| continue | |
| subtopics[st_id] = { | |
| "name": st.get("name") or st_id, | |
| "bucket": bucket, | |
| "topic_id": topic_id, | |
| "topic_name": topic_name, | |
| "code": st_code, | |
| "keywords": set([str(w).lower() for w in st.get("keywords", []) if w]), | |
| "examples": st.get("examples", []) or [] | |
| } | |
| return buckets, subtopics | |
| BUCKET_KW, SUBTOPICS = _taxonomy_keywords() | |
def build_topic_index_and_categories_map():
    """HYBRID APPROACH (OLD METHOD + NEW DATA):
    Builds multi-level keyword matching from the database-synced taxonomy.
    Returns: (topic_index, categories_map, bucket_map)
    - topic_index: metadata per small category {UPPER(name): {id, name, bucket, kode}}
    - categories_map: keywords per small category {UPPER(name): [keywords]}
    - bucket_map: keywords per large category {UPPER(bucket): [aggregated keywords]}
    WHY THIS IS BETTER:
    - Multi-level matching: keywords are checked at the small-category level AND the bucket level
    - Redundancy: a miss at the small-category level can still match the aggregated bucket
    - Better coverage: keywords from all small categories are aggregated into their bucket
    """
| topic_index = {} | |
| categories_map = {} | |
| bucket_map = defaultdict(set) # Agregasi keywords per bucket | |
| # Process topics (kategori kecil) dari database | |
| for tp in _TAX.get("topics", []): | |
| topic_id = tp.get("id") or tp.get("code") or "TOPIC" | |
| topic_name = tp.get("name") or topic_id | |
| bucket = tp.get("bucket") or "" | |
| key = str(topic_name).upper() | |
| # Collect keywords from topic level (kategori kecil) | |
| kw = set([str(w).lower().strip() for w in (tp.get("keywords") or []) if w]) | |
| # Legacy support: subtopics (backward compatibility) | |
| for st in tp.get("subtopics", []) or []: | |
| for w in st.get("keywords", []) or []: | |
| if w: | |
| kw.add(str(w).lower().strip()) | |
| # Store kategori kecil metadata & keywords | |
| topic_index[key] = { | |
| "id": topic_id, | |
| "name": topic_name, | |
| "bucket": bucket, | |
| "kode": topic_id # Match dengan kategori_masalahs.kode | |
| } | |
| categories_map[key] = sorted(list(kw)) | |
        # Aggregate keywords into the bucket (large category)
        # This aggregation is what made the old method more accurate
| if bucket: | |
| bucket_map[bucket.upper()].update(kw) | |
| # Convert bucket_map sets to sorted lists | |
| bucket_keywords = {k: sorted(list(v)) for k, v in bucket_map.items()} | |
| return topic_index, categories_map, bucket_keywords | |
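# Illustrative sketch of the taxonomy.json shape these builders read (all names, codes and
# keywords below are invented for the example): each topic carries id/name/bucket plus keywords,
# with optional legacy "subtopics" still merged in for backward compatibility.
_EXAMPLE_TAXONOMY_TOPIC = {
    "id": "AKD-01",
    "name": "Kesulitan Belajar",
    "bucket": "AKADEMIK",
    "keywords": ["tidak paham", "nilai jelek", "tugas menumpuk"],
    "subtopics": [
        {"id": "AKD-01-A", "code": "AKD-01-A", "name": "Tugas", "keywords": ["pr", "deadline"], "examples": []},
    ],
}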
def extract_keyphrases(texts, lang="id"):
    # RAKE uses English stopwords by default; for Indonesian we also pass Indonesian stopwords
    sw = set(stopwords.words('indonesian')) | set(stopwords.words('english'))
    r = Rake(stopwords=sw)
| joined = " . ".join(texts) | |
| r.extract_keywords_from_text(joined) | |
| ranked = r.get_ranked_phrases_with_scores() | |
| out = [] | |
| for score, phrase in ranked[:20]: | |
| out.append({"term": phrase, "weight": float(score)}) | |
| return out | |
def extract_core_tokens(texts):
    """Extract core tokens with cleanup:
    - lowercase & clean_text
    - drop stopwords (ID + EN) & common fillers
    - drop tokens shorter than 3 characters
    - count frequencies, keep the top 10
    """
| freq = Counter() | |
| try: | |
| sw_id = set(stopwords.words('indonesian')) | |
| except Exception: | |
| sw_id = set() | |
| try: | |
| sw_en = set(stopwords.words('english')) | |
| except Exception: | |
| sw_en = set() | |
| filler = { | |
| 'dan','atau','yang','di','ke','dengan','pada','untuk','dari','lagi','sih','deh','lah','ya','kok','kan','udah','aja','pun','itu','ini','jadi','kalau','kalo','bahwa','sementara','sering','kayak','kayakny','nih','tuh','dong','de','si','mungkin','masih','bisa','harus','karena','seperti','kaya','gitu','buat' | |
| } | |
| for t in texts: | |
| for tok in clean_text(t).split(): | |
| if len(tok) < 3: continue | |
| if tok in sw_id or tok in sw_en or tok in filler: continue | |
| freq[tok] += 1 | |
| return [w for w,_ in freq.most_common(10)] | |
| def _build_cluster_vectorizer(): | |
| """Vectorizer for clustering top-terms: single-word tokens, heavy stopwords cleanup.""" | |
| try: | |
| sw_id = set(stopwords.words('indonesian')) | |
| except Exception: | |
| sw_id = set() | |
| try: | |
| sw_en = set(stopwords.words('english')) | |
| except Exception: | |
| sw_en = set() | |
| extra = { | |
| # connectors/intensifiers/pronouns/common fillers | |
| 'dan','atau','yang','di','ke','dengan','pada','untuk','dari','lagi','banget','sekali','paling','sih','deh','dong','lah','ya', | |
| 'aku','saya','gue','gua','dia','kamu','kau','ko','kami','kita','mereka', | |
| 'punya','dengar','dng','sm','nih','tuh','kok','kan','udah','lagi','aja','de','si', | |
| } | |
| stopset = sw_id | sw_en | extra | |
| # Use our cleaner as preprocessor; single-word tokens only | |
| vec = TfidfVectorizer( | |
| preprocessor=clean_text, | |
| tokenizer=str.split, | |
| token_pattern=None, | |
| lowercase=True, | |
| stop_words=list(stopset), | |
| ngram_range=(1,1), | |
| max_df=0.95, | |
| min_df=1, | |
| max_features=1000, | |
| ) | |
| return vec | |
@app.route("/health", methods=["GET"])  # route decorator missing in this snippet; the path is an assumption
def health():
| return jsonify({"status": "ok", "version": SERVICE_VERSION, "bert": ENABLE_BERT}) | |
| # ===================== | |
| # IndoBERT caching & optional warmup | |
| # ===================== | |
| BERT_CACHE = {"tok": None, "mdl": None, "device": "cpu"} | |
| def _l2_normalize_rows_dense(X: np.ndarray, eps: float = 1e-12) -> np.ndarray: | |
| """Row-wise L2 normalization for dense matrices. | |
| This is used to make IndoBERT embeddings comparable for distance-based | |
| clustering (KMeans) and to reduce magnitude effects. | |
| """ | |
| if X is None: | |
| return X | |
| X = np.asarray(X, dtype=np.float32) | |
| if X.ndim != 2 or X.shape[0] == 0: | |
| return X | |
| norms = np.linalg.norm(X, axis=1, keepdims=True) | |
| norms = np.maximum(norms, eps) | |
| return X / norms | |
| # --- GLOBAL BERT VARIABLES --- | |
| _bert_tokenizer = None | |
| _bert_model = None | |
| _bert_device = None | |
| def get_bert(): | |
| global _bert_tokenizer, _bert_model, _bert_device | |
| if _bert_tokenizer is None: | |
| print("⏳ Loading IndoBERT model... (First run might take a while)") | |
| try: | |
| model_name = "indobenchmark/indobert-base-p1" | |
| _bert_tokenizer = AutoTokenizer.from_pretrained(model_name) | |
| _bert_model = AutoModel.from_pretrained(model_name) | |
| _bert_device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
| _bert_model.to(_bert_device) | |
| _bert_model.eval() | |
| print(f"✅ IndoBERT loaded on {_bert_device}") | |
| except Exception as e: | |
| print(f"❌ Failed to load IndoBERT: {e}") | |
| return None, None, None | |
| return _bert_tokenizer, _bert_model, _bert_device | |
| # Warmup at startup if requested (download/load once) | |
| if ENABLE_BERT and ENABLE_BERT_WARMUP: | |
| tok, mdl, dev = get_bert() | |
| try: | |
| if tok is not None and mdl is not None: | |
| import torch # type: ignore | |
| with torch.no_grad(): | |
| enc = tok(["warmup"], padding=True, truncation=True, max_length=16, return_tensors="pt") | |
| _ = mdl(**enc.to(dev)) | |
| except Exception: | |
| pass | |
@app.route("/warmup", methods=["POST"])  # route decorator missing in this snippet; the path is an assumption
def warmup():
| """Optionally trigger BERT load and a tiny forward pass to avoid first-request latency.""" | |
| if not ENABLE_BERT: | |
| return jsonify({"bert": "disabled"}) | |
| tok, mdl, dev = get_bert() | |
| if tok is None or mdl is None: | |
| return jsonify({"bert": "unavailable"}), 500 | |
| try: | |
| import torch # type: ignore | |
| with torch.no_grad(): | |
| enc = tok(["warmup"], padding=True, truncation=True, max_length=16, return_tensors="pt") | |
| _ = mdl(**enc.to(dev)) | |
| return jsonify({"bert": "ready", "device": dev}) | |
| except Exception as e: | |
| return jsonify({"bert": "error", "message": str(e)}), 500 | |
# (Other helpers such as check_key, load_feedback, taxonomy, etc. are kept as in the previous version of this file)
# ... (make sure the functions check_key, load_feedback_weights, build_topic_index..., load_inset_lexicon exist) ...
@app.route("/analyze", methods=["POST"])  # route decorator missing in this snippet; the path is an assumption
def analyze():
| if not check_key(): | |
| return jsonify({"error": "unauthorized"}), 401 | |
| data = request.get_json(force=True) or {} | |
| items = data.get("items") | |
| if items is None: | |
| items = [{ | |
| "id": data.get("id") or "item-1", | |
| "text": data.get("text") or "", | |
| "lang_hint": (data.get("context") or {}).get("lang_hint") if isinstance(data.get("context"), dict) else None | |
| }] | |
| if not isinstance(items, list) or not items: | |
| return jsonify({"error": "items required"}), 422 | |
| # Reload taxonomy if it changed (e.g., admin disabled/restored categories) | |
| _reload_taxonomy_if_changed() | |
| # Setup Taxonomy & Feedback (HYBRID APPROACH) | |
| categories_override = data.get("categories") | |
| TOPIC_INDEX, TAXONOMY_CATEGORIES, BUCKET_KEYWORDS = build_topic_index_and_categories_map() | |
| categories_map = {} | |
| bucket_map = {} | |
| if isinstance(categories_override, dict) and categories_override: | |
| for k, v in categories_override.items(): | |
| if isinstance(v, list): | |
| categories_map[str(k).upper()] = [str(x) for x in v if isinstance(x, (str, int))] | |
| if not categories_map: | |
| categories_map = TAXONOMY_CATEGORIES | |
| bucket_map = BUCKET_KEYWORDS | |
| feedback = load_feedback_weights() | |
| # Setup Variables | |
| results = [] | |
| per_legacy = [] | |
| all_texts = [] | |
| negatives = [] | |
| per_entry_cats = {} | |
| # Load IndoBERT Model (only if enabled) | |
| tok, mdl, dev = get_bert() if ENABLE_BERT else (None, None, None) | |
| # --- PROCESS PER ITEM --- | |
| for it in items: | |
| item_id = it.get("id") | |
| raw_txt = (it.get("text") or "").strip() | |
| lang_hint = it.get("lang_hint") | |
| # 1. Text Cleaning (New Logic) | |
| clean = clean_text(raw_txt) | |
| if not clean: | |
| continue | |
| # 2. Sentiment Scoring (Hybrid) | |
| s_lex = score_with_lexicon(clean, LEXICON_ID) | |
| s_vad = sia.polarity_scores(raw_txt).get("compound", 0.0) if sia else 0.0 | |
| aggregate = float(0.7 * s_lex + 0.3 * s_vad) if sia else s_lex | |
| # Fallback: keyword-based detection if aggregate is neutral (0) | |
| if abs(aggregate) < 0.05: | |
| negative_keywords = [ | |
| "berkelahi", "bertengkar", "murung", "sedih", "marah", "kabur", | |
| "masalah", "ribut", "berantem", "stress", "stres", "pusing", | |
| "takut", "cemas", "galau", "kecewa", "frustrasi", "frustasi", | |
| "jelek", "drop", "sendiri", "sendirian", "tidak paham" | |
| ] | |
| positive_keywords = ["senang", "bahagia", "gembira", "semangat", "excited", "bagus", "oke", "mantap", "suka", "hebat"] | |
| neg_count = sum(1 for kw in negative_keywords if kw in clean) | |
| pos_count = sum(1 for kw in positive_keywords if kw in clean) | |
| if neg_count > pos_count and neg_count > 0: | |
| aggregate = -0.35 # Set mild negative | |
| elif pos_count > neg_count and pos_count > 0: | |
| aggregate = 0.3 # Set mild positive | |
| # 3. Sarcasm Detection (New Logic) | |
| is_sarcasm, sarc_conf = detect_sarcasm_heuristic(clean, raw_txt, aggregate) | |
| if is_sarcasm: | |
| # Flip score: Positive -> Negative | |
| if aggregate > 0: | |
| aggregate = -0.5 * aggregate - 0.3 | |
| elif aggregate == 0: | |
| aggregate = -0.4 | |
| lbl = "negatif" | |
| else: | |
| lbl = label_from_score(aggregate) | |
| # 4. Negative Gate & Severity | |
| # Check severity based on flipped score | |
| neg_flag, severity = negative_gate(aggregate, raw_txt) | |
| if is_sarcasm: | |
| neg_flag = True | |
| severity = max(severity, 0.6) # Sarkasme biasanya sakit | |
| # 5. Category Scoring (ONLY FOR NEGATIVE CONTENT) | |
| # Skip kategorisasi jika semua input positif (aggregate > 0 dan tidak ada sarkasme) | |
| cat_scores = {} | |
| reasons = {} | |
| bucket_scores = defaultdict(float) | |
| best_cat = None | |
| best_bucket = None | |
| cluster = None | |
| if neg_flag or aggregate <= 0: | |
| # HYBRID: Kategori Kecil + Bucket Agregat (ONLY FOR NEGATIVE) | |
| cat_scores, reasons = score_categories_for_text(clean, categories_map, feedback) | |
| # BOOST: Aggregate bucket scores dari kategori kecil | |
| for cat, score in cat_scores.items(): | |
| tp_meta = TOPIC_INDEX.get(str(cat).upper()) | |
| if tp_meta and tp_meta.get("bucket"): | |
| bucket_scores[tp_meta["bucket"]] += score * 0.8 # Slightly dampen aggregated | |
| # Also score directly against bucket keywords (OLD METHOD) | |
| if bucket_map: | |
| bucket_direct, _ = score_categories_for_text(clean, bucket_map, feedback) | |
| for bucket, score in bucket_direct.items(): | |
| bucket_scores[bucket] += score * 1.2 # Boost direct matches | |
| # Find best kategori kecil | |
| best_cat = max(cat_scores, key=cat_scores.get) if cat_scores else None | |
| best_bucket = max(bucket_scores, key=bucket_scores.get) if bucket_scores else None | |
| # Apply minimum confidence thresholds to reduce false positives | |
| if best_cat and cat_scores.get(best_cat, 0.0) < 0.22: | |
| best_cat = None | |
| if best_bucket and bucket_scores.get(best_bucket, 0.0) < 0.25: | |
| best_bucket = None | |
| # 6. Cluster Labeling (Prioritize Kategori Kecil, fallback to Bucket) | |
| if best_cat: | |
| tp_meta = TOPIC_INDEX.get(str(best_cat).upper()) | |
| if tp_meta: | |
| cluster = { | |
| "id": tp_meta.get("kode"), # Match dengan kategori_masalahs.kode | |
| "label": tp_meta.get("name"), | |
| "bucket": tp_meta.get("bucket"), | |
| "topic_id": tp_meta.get("kode"), | |
| "topic_name": tp_meta.get("name"), | |
| "confidence": round(cat_scores[best_cat], 3) | |
| } | |
| elif best_bucket: | |
| # Fallback: Use bucket if no specific kategori kecil matched | |
| cluster = { | |
| "id": best_bucket, | |
| "label": best_bucket, | |
| "bucket": best_bucket, | |
| "topic_id": None, | |
| "topic_name": None, | |
| "confidence": round(bucket_scores[best_bucket], 3) | |
| } | |
| # Else: Skip kategorisasi untuk input positif | |
| # 7. Keywords Extraction | |
| try: | |
| rk = Rake(stopwords=STOPWORDS_ID_CHAT, min_length=1, max_length=3) | |
| rk.extract_keywords_from_text(clean) # Use clean text | |
| raw_phrases = [p.lower() for p in rk.get_ranked_phrases()[:8]] | |
| except Exception: | |
| raw_phrases = [] | |
| # Filter phrases | |
| phrases = sorted(list(set(raw_phrases)), key=len)[:5] | |
| # 8. Summary Text | |
| if is_sarcasm: | |
| summary_text = f"Terdeteksi sarkasme/sindiran. Inti keluhan: {', '.join(phrases[:3])}." | |
| elif neg_flag and cluster: | |
| summary_text = f"Masalah utama: {cluster['label']}. Gejala: {', '.join(phrases[:3])}." | |
| elif neg_flag: | |
| summary_text = f"Inti keluhan: {', '.join(phrases[:3])}." | |
| else: | |
| # Positive input - no categorization needed | |
| summary_text = f"Ekspresi positif. Kata kunci: {', '.join(phrases[:3]) if phrases else 'tidak ada keluhan'}." | |
| results.append({ | |
| "id": item_id, | |
| "clean_text": clean, | |
| "sentiment": { | |
| "barasa": s_lex, "english": s_vad, "aggregate": aggregate, "label": lbl | |
| }, | |
| "negative_flag": neg_flag, | |
| "is_sarcasm": is_sarcasm, # Field Baru | |
| "severity": severity, | |
| "cluster": cluster, | |
| "summary": summary_text, | |
| "key_phrases": phrases, | |
| "recommendations": [], | |
| "cat_scores": cat_scores, | |
| "cat_reasons": reasons, | |
| }) | |
| per_legacy.append({ | |
| "id": item_id, "text": raw_txt, "sentiment": aggregate, | |
| "label": lbl, "keywords": phrases | |
| }) | |
| all_texts.append(clean) | |
| # Collect negatives for clustering | |
| if neg_flag: | |
| negatives.append(clean) | |
| ranked = sorted([(c, s) for c, s in cat_scores.items() if s > 0], key=lambda x: x[1], reverse=True) | |
| per_entry_cats[item_id] = { | |
| "ranked": ranked[:3], | |
| "reasons": {c: reasons.get(c, []) for c, _ in ranked[:3]} | |
| } | |
| # --- AGGREGATION & CLUSTERING --- | |
| # Global Keywords | |
| keyphrases = extract_keyphrases(all_texts) if all_texts else [] | |
| # Clustering with IndoBERT | |
| clusters = [] | |
| if len(negatives) >= 2: | |
| used_engine = "tfidf" | |
| X = None | |
| # Try BERT | |
| if tok and mdl: | |
| try: | |
| with torch.no_grad(): | |
| enc = tok(negatives, padding=True, truncation=True, max_length=128, return_tensors="pt").to(dev) | |
| out = mdl(**enc) | |
| cls = out.last_hidden_state[:, 0, :] | |
| X = cls.detach().cpu().numpy() | |
| if BERT_L2_NORMALIZE: | |
| X = _l2_normalize_rows_dense(X) | |
| used_engine = "bert" | |
| except Exception as e: | |
| print(f"⚠️ BERT error, falling back: {e}") | |
| X = None | |
| # Fallback TF-IDF | |
| if X is None: | |
            vec = _build_cluster_vectorizer()  # make sure this helper (from the older file) exists
| X = vec.fit_transform(negatives) | |
| k = 2 if len(negatives) == 2 else min(4, max(2, len(negatives)//2)) | |
| km = KMeans(n_clusters=k, n_init='auto', random_state=42) | |
| y = km.fit_predict(X) | |
| n_total = max(1, len(negatives)) | |
| for ci in range(k): | |
| idxs = [i for i in range(len(negatives)) if y[i] == ci] | |
| ex = [negatives[i] for i in idxs][:5] | |
| size = len(idxs) | |
| ratio = float(size) / float(n_total) if n_total else 0.0 | |
| # Simple label for UI hinting: top tokens from examples | |
| try: | |
| toks = extract_core_tokens(ex) | |
| label = ", ".join(toks[:3]) if toks else "" | |
| except Exception: | |
| label = "" | |
| # Taxonomy-based hint: map this cluster to the most likely topic/bucket | |
| hint = {} | |
| try: | |
| joined = " . ".join([t for t in ex if isinstance(t, str) and t.strip()]) | |
| # Prefer kategori kecil (topic) | |
| cc, rr = score_categories_for_text(joined, categories_map, feedback) | |
| best_cat = max(cc, key=cc.get) if cc else None | |
| if best_cat and cc.get(best_cat, 0.0) < 0.22: | |
| best_cat = None | |
| best_bucket = None | |
| bb = {} | |
| br = {} | |
| if not best_cat and bucket_map: | |
| bb, br = score_categories_for_text(joined, bucket_map, feedback) | |
| best_bucket = max(bb, key=bb.get) if bb else None | |
| if best_bucket and bb.get(best_bucket, 0.0) < 0.25: | |
| best_bucket = None | |
| if best_cat: | |
| meta = TOPIC_INDEX.get(str(best_cat).upper()) or {} | |
| hint = { | |
| "type": "topic", | |
| "name": meta.get("name") or str(best_cat), | |
| "bucket": meta.get("bucket") or "", | |
| "confidence": round(float(cc.get(best_cat, 0.0)), 3), | |
| "keywords": (rr.get(best_cat) or [])[:5], | |
| } | |
| elif best_bucket: | |
| hint = { | |
| "type": "bucket", | |
| "name": str(best_bucket), | |
| "bucket": str(best_bucket), | |
| "confidence": round(float(bb.get(best_bucket, 0.0)), 3), | |
| "keywords": (br.get(best_bucket) or [])[:5], | |
| } | |
| except Exception: | |
| hint = {} | |
| clusters.append({ | |
| "cluster": int(ci), | |
| "engine": used_engine, | |
| "size": int(size), | |
| "ratio": round(ratio, 4), | |
| "label": label, | |
| "hint": hint, | |
| "examples": ex | |
| }) | |
| # Overview Weighted by Severity & Sarcasm (KATEGORI KECIL - NEGATIVE ONLY) | |
| cat_counter = Counter() | |
| for r in results: | |
| # ONLY count negative items for categorization | |
| if not r.get("negative_flag"): | |
| continue | |
| sev = r.get("severity", 0.0) | |
| weight = 1.0 + sev | |
| # Aggregate by kategori kecil (topic) | |
| cluster = r.get("cluster") or {} | |
| topic_name = cluster.get("topic_name") or cluster.get("label") | |
| if topic_name: | |
| # Use cluster confidence as base score | |
| score = cluster.get("confidence", 0.5) | |
| cat_counter[topic_name] += score * weight | |
| categories_overview = [ | |
| {"category": cat, "score": round(val, 4)} for cat, val in cat_counter.most_common() | |
| ] | |
| # Summary Stats | |
| avg = sum([x["sentiment"] for x in per_legacy]) / len(per_legacy) if per_legacy else 0.0 | |
| summary = { | |
| "avg_sentiment": round(avg, 3), | |
| "negative_ratio": round(sum(1 for x in per_legacy if x["label"]=="negatif")/len(per_legacy), 3) if per_legacy else 0.0 | |
| } | |
    # NEW: recommendation generation PER SMALL CATEGORY (granular)
    # Laravel will filter further based on master_rekomendasis.rules
    def recommend_by_topic(topic_id: str, topic_name: str, bucket: str, severity_val: float, negative: bool, sarcasm: bool):
        """Generate recommendations based on the small category (topic).
        Returns structured data that Laravel can match against master_rekomendasis.
        Return format:
        {
            "kategori_kode": topic_id,  # matches kategori_masalahs.kode
            "kategori_nama": topic_name,
            "bucket": bucket,
            "severity": severity_val,
            "negative": negative,
            "sarcasm": sarcasm,
            "suggested_actions": [...]  # heuristic suggestions (optional)
        }
        """
| rec = { | |
| "kategori_kode": topic_id, | |
| "kategori_nama": topic_name, | |
| "bucket": bucket, | |
| "severity": severity_val, | |
| "negative": negative, | |
| "sarcasm": sarcasm, | |
| "suggested_actions": [] | |
| } | |
| # Heuristic suggestions (Laravel akan filter sesuai master_rekomendasis) | |
| if (negative or sarcasm) and severity_val >= 0.6: | |
| rec["suggested_actions"].append({ | |
| "type": "URGENT", | |
| "reason": "Severity tinggi atau terdeteksi sarkasme" | |
| }) | |
| elif negative and severity_val >= 0.4: | |
| rec["suggested_actions"].append({ | |
| "type": "MODERATE", | |
| "reason": "Indikasi masalah perlu perhatian" | |
| }) | |
| return rec | |
| # Assign Recs per item (GRANULAR: Per Kategori Kecil) | |
| for r in results: | |
| cluster = r.get("cluster") or {} | |
| topic_id = cluster.get("topic_id") or cluster.get("id") | |
| topic_name = cluster.get("topic_name") or cluster.get("label") | |
| bucket = cluster.get("bucket", "") | |
| if topic_id: | |
| # Return kategori kecil info untuk Laravel matching | |
| r["recommendations"] = [recommend_by_topic( | |
| topic_id, | |
| topic_name, | |
| bucket, | |
| r.get("severity", 0), | |
| r.get("negative_flag", False), | |
| r.get("is_sarcasm", False) | |
| )] | |
| else: | |
| # Fallback: No specific kategori detected | |
| r["recommendations"] = [] | |
| # Global Recs (PER KATEGORI KECIL - Granular) | |
| abs_sent = abs(avg) | |
| global_recommendations = [] | |
| valid_cats = [c for c in categories_overview if c["score"] >= 0.05] | |
| is_neg_avg = avg < -0.05 | |
| for cat in valid_cats: | |
| cname = cat["category"] | |
| meta = TOPIC_INDEX.get(cname.upper()) or {} | |
| topic_id = meta.get("kode") or meta.get("id") | |
| topic_name = meta.get("name", cname) | |
| bucket = meta.get("bucket", "") | |
| if topic_id: | |
| rec_data = recommend_by_topic( | |
| topic_id, | |
| topic_name, | |
| bucket, | |
| max(0.3, abs_sent), | |
| is_neg_avg, | |
| False # No global sarcasm flag | |
| ) | |
| global_recommendations.append({ | |
| "category": cname, | |
| "kategori_kode": topic_id, | |
| "score": cat["score"], | |
| "recommendation": rec_data | |
| }) | |
| return jsonify({ | |
| "version": SERVICE_VERSION, | |
| "items": results, | |
| "summary": summary, | |
| "keyphrases": keyphrases, | |
| "clusters": clusters, | |
| "categories_overview": categories_overview, | |
| "global_recommendations": global_recommendations, | |
| }) | |
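# Request/response sketch for the analyzer above (route path and field values are assumptions
# used for illustration; the real contract is defined by the Laravel caller):
#
#   POST /analyze
#   {"items": [{"id": "c1", "text": "beta su capek, sonde mau sekolah", "lang_hint": "id"}]}
#
#   -> {"version": ..., "items": [{"sentiment": {...}, "negative_flag": true, "is_sarcasm": false,
#       "severity": ..., "cluster": {...}, "recommendations": [...]}, ...],
#       "summary": {...}, "keyphrases": [...], "clusters": [...],
#       "categories_overview": [...], "global_recommendations": [...]}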
@app.route("/feedback", methods=["POST"])  # route decorator missing in this snippet; the path is an assumption
def feedback():
| if not check_key(): | |
| return jsonify({"error": "unauthorized"}), 401 | |
| data = request.get_json(force=True) or {} | |
| # expected: { keywords: ["telat","bolos"], from_category?: "AKADEMIK", to_category?: "DISIPLIN", delta?: 0.2 } | |
| kws = data.get("keywords") or [] | |
| from_cat = str(data.get("from_category") or "").upper() | |
| to_cat = str(data.get("to_category") or "").upper() | |
| delta = float(data.get("delta") or 0.2) | |
| if not kws or (not from_cat and not to_cat): | |
| return jsonify({"error": "invalid payload"}), 422 | |
| weights = load_feedback_weights() | |
| for kw in kws: | |
| k = str(kw).lower().strip() | |
| if not k: | |
| continue | |
| entry = weights.get(k, {}) | |
| # penalize from_cat slightly, reward to_cat (if provided) | |
| if from_cat: | |
| entry[from_cat] = float(entry.get(from_cat, 0.0)) - (delta / 2.0) | |
| if to_cat: | |
| entry[to_cat] = float(entry.get(to_cat, 0.0)) + delta | |
| weights[k] = entry | |
| save_feedback_weights(weights) | |
| return jsonify({"ok": True, "updated": len(kws)}) | |
@app.route("/feedback/revision", methods=["POST"])  # route decorator missing in this snippet; the path is an assumption
def receive_feedback():
| """ | |
| Receive teacher revision feedback for continuous learning. | |
| Expected payload: | |
| { | |
| "revision_id": 123, | |
| "original_text": "...", | |
| "original_kategori": "AKADEMIK", | |
| "original_rekomendasi": [...], | |
| "revised_kategori": "DISIPLIN", | |
| "revised_rekomendasi": [...], | |
| "revision_notes": "..." (optional) | |
| } | |
| This endpoint will: | |
| 1. Extract keywords from original text | |
| 2. Penalize weights for original_kategori | |
| 3. Reward weights for revised_kategori | |
| 4. Learn from the correction pattern | |
| """ | |
| if not check_key(): | |
| return jsonify({"error": "unauthorized"}), 401 | |
| try: | |
| data = request.get_json(force=True) or {} | |
| revision_id = data.get("revision_id") | |
| original_text = data.get("original_text", "") | |
| original_kategori = str(data.get("original_kategori", "")).upper() | |
| revised_kategori = str(data.get("revised_kategori", "")).upper() | |
| if not original_text or not revised_kategori: | |
| return jsonify({"error": "Missing required fields"}), 422 | |
| # Only learn if kategori was changed (not just rekomendasi) | |
| if original_kategori == revised_kategori: | |
| logger.info(f"Revision #{revision_id}: Kategori unchanged, skipping weight update") | |
| return jsonify({ | |
| "ok": True, | |
| "message": "Kategori unchanged, no weight update needed", | |
| "revision_id": revision_id | |
| }) | |
| # Extract keywords from original text | |
| keywords = [] | |
| try: | |
| # Simple keyword extraction - tokenize and filter stopwords | |
| tokens = nltk.word_tokenize(original_text.lower()) | |
| filtered_tokens = [ | |
| t for t in tokens | |
| if t.isalnum() and len(t) > 2 | |
| and t not in STOPWORDS_ID_CHAT | |
| and t not in _CHAT_FILLERS | |
| ] | |
| # Get top 10 most meaningful words | |
| word_counts = Counter(filtered_tokens) | |
| keywords = [word for word, _ in word_counts.most_common(10)] | |
| logger.info(f"Revision #{revision_id}: Extracted keywords: {keywords}") | |
| except Exception as e: | |
| logger.warning(f"Failed to extract keywords: {e}") | |
| # Fallback: split by space | |
| keywords = [w for w in original_text.lower().split() if len(w) > 2][:10] | |
| if not keywords: | |
| return jsonify({ | |
| "ok": False, | |
| "error": "Could not extract keywords from text" | |
| }), 422 | |
| # Update feedback weights | |
| weights = load_feedback_weights() | |
| delta = 0.3 # Learning rate | |
| for kw in keywords: | |
| k = str(kw).lower().strip() | |
| entry = weights.get(k, {}) | |
| # Penalize original (wrong) kategori | |
| if original_kategori: | |
| entry[original_kategori] = float(entry.get(original_kategori, 0.0)) - (delta / 2.0) | |
| # Reward revised (correct) kategori | |
| entry[revised_kategori] = float(entry.get(revised_kategori, 0.0)) + delta | |
| weights[k] = entry | |
| save_feedback_weights(weights) | |
| logger.info(f"Revision #{revision_id}: Updated weights for {len(keywords)} keywords " | |
| f"from {original_kategori} → {revised_kategori}") | |
| return jsonify({ | |
| "ok": True, | |
| "message": "Feedback learned successfully", | |
| "revision_id": revision_id, | |
| "keywords_updated": len(keywords), | |
| "correction": f"{original_kategori} → {revised_kategori}" | |
| }) | |
| except Exception as e: | |
| logger.error(f"Error processing feedback: {e}", exc_info=True) | |
| return jsonify({ | |
| "ok": False, | |
| "error": str(e) | |
| }), 500 | |
@app.route("/taxonomy/sync", methods=["POST"])  # route decorator missing in this snippet; the path is an assumption
def sync_taxonomy():
| if not check_key(): | |
| return jsonify({"error": "unauthorized"}), 401 | |
| data = request.get_json(force=True) or {} | |
| topics = data.get("topics", []) | |
| buckets = data.get("buckets", []) | |
| if not topics: | |
| return jsonify({"error": "topics required"}), 422 | |
| try: | |
| # Update taxonomy.json | |
| taxonomy = { | |
| "topics": topics, | |
| "buckets": buckets, | |
| "meta": { | |
| "last_synced": datetime.now().isoformat(), | |
| "version": "2.0", | |
| "source": "api_sync" | |
| } | |
| } | |
| with open(TAXONOMY_PATH, "w", encoding="utf-8") as f: | |
| json.dump(taxonomy, f, ensure_ascii=False, indent=2) | |
| # Force reload cache | |
| _reload_taxonomy_if_changed(force=True) | |
| logger.info(f"Taxonomy synced: {len(topics)} topics, {len(buckets)} buckets") | |
| return jsonify({ | |
| "ok": True, | |
| "topics_count": len(topics), | |
| "buckets_count": len(buckets), | |
| "message": "Taxonomy synced via API" | |
| }) | |
| except Exception as e: | |
| logger.error(f"Sync taxonomy error: {e}") | |
| return jsonify({"error": str(e)}), 500 | |
@app.route("/taxonomy", methods=["GET"])  # route decorator missing in this snippet; the path is an assumption
def get_current_taxonomy():
    """Endpoint to inspect the taxonomy currently active in the container's memory."""
    _reload_taxonomy_if_changed()  # make sure we read the latest version
| return jsonify({ | |
| "status": "ok", | |
| "last_mtime": _TAX_MTIME, | |
| "topics_count": len(_TAX.get("topics", [])), | |
| "buckets_count": len(_TAX.get("buckets", [])), | |
| "taxonomy_data": _TAX # Hati-hati kalau datanya kegedean | |
| }) | |
# Added at the bottom, before if __name__ == "__main__":
@app.route("/taxonomy/raw", methods=["GET"])  # route decorator missing in this snippet; the path is an assumption
def intip_taxonomy():
    # Read the taxonomy file as it exists inside the currently running container
    try:
        with open(TAXONOMY_PATH, "r", encoding="utf-8") as f:
            isi_sekarang = json.load(f)
| return jsonify({ | |
| "status": "Ini isi file di dalam container yang sedang jalan", | |
| "waktu_cek": datetime.now().isoformat(), | |
| "data": isi_sekarang | |
| }) | |
| except Exception as e: | |
| return jsonify({"error": str(e)}) | |
| if __name__ == "__main__": | |
    port = int(os.environ.get("PORT", 7860))  # HF Space default port
    app.run(host="0.0.0.0", port=port, debug=False)  # debug=False for production