# api-rasaya-ml / app.py
# (Hugging Face Space header residue, commented out so the module parses)
# adoravelc's picture
# Normalisasi BERT
# f167ad4 verified
import sys
import os
import re
import json
import math
import logging
from collections import Counter, defaultdict
from datetime import datetime
from typing import List, Dict, Tuple, Optional
import nltk
import numpy as np
import pandas as pd
from flask import Flask, request, jsonify
# Simple fallback used when langdetect cannot be imported: always report "id".
def detect(_text: str) -> str:
    return "id"

try:
    from langdetect import detect  # noqa: F811 -- replaces the fallback when available
except Exception:
    # langdetect unavailable: keep the fallback defined above.
    pass
# --- LIBRARY BARU (Deep Learning & Emoji) ---
import emoji
import torch
from transformers import AutoTokenizer, AutoModel
from sklearn.cluster import KMeans
from sklearn.feature_extraction.text import TfidfVectorizer # Tetap butuh untuk fallback
# NLTK & RAKE
from nltk.corpus import stopwords
from nltk.sentiment import SentimentIntensityAnalyzer
from rake_nltk import Rake
try:
    # Optional Indonesian stemmer (improves recall when installed)
    from Sastrawi.Stemmer.StemmerFactory import StemmerFactory  # type: ignore
    _sastrawi_factory = StemmerFactory()
    _sastrawi_stemmer = _sastrawi_factory.create_stemmer()

    def _stem_id(word: str) -> str:
        """Stem an Indonesian word via Sastrawi; return the input on any error."""
        try:
            return _sastrawi_stemmer.stem(word)
        except Exception:
            return word
except Exception:
    # Sastrawi not installed: stemming degrades to the identity function.
    _sastrawi_stemmer = None

    def _stem_id(word: str) -> str:
        """Identity stemmer used when Sastrawi is unavailable."""
        return word
# Logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Check NLTK resources without ever downloading (offline-friendly startup)
def ensure_nltk_safe():
    """Report which required NLTK data packages are present.

    Never triggers a download; a missing package is only reported so the
    service can still boot in offline environments.
    """
    required = {
        "punkt": "tokenizers/punkt",
        "punkt_tab": "tokenizers/punkt_tab",
        "stopwords": "corpora/stopwords",
    }
    for name, resource in required.items():
        try:
            nltk.data.find(resource)
            print(f"✅ {name} ready")
        except LookupError:
            print(f"⚠️ {name} not found - continuing in offline mode")

# Run the check at import time; never let it abort startup.
try:
    ensure_nltk_safe()
    print("=" * 60)
except Exception as e:
    print(f"⚠️ NLTK check error: {e}")
app = Flask(__name__)
# Configuration
# NOTE(review): `app`, API_KEY and SERVICE_VERSION are all re-assigned further
# down in this module (second Flask(__name__) / ML_API_KEY block), so these
# values are effectively superseded -- confirm before relying on them.
API_KEY = os.getenv("FLASK_API_KEY", "rahasia-negara-123")  # default later replaced by ML_API_KEY
SERVICE_VERSION = "1.2.0-bert-sarcasm"  # Version bump; later replaced by ML_VERSION
# --- GLOBAL VARIABLES ---
# Initialize SentimentIntensityAnalyzer safely: the vader_lexicon NLTK data
# may be absent in offline deployments, in which case `sia` stays None and the
# custom lexicon is the only scorer.
try:
    sia = SentimentIntensityAnalyzer()
    print("✅ VADER sentiment analyzer ready")
except Exception as e:
    print(f"⚠️ VADER not available, using custom lexicon only: {e}")
    sia = None
# Combined NLTK Indonesian + English stopwords, extended below with chat fillers.
STOPWORDS_ID_CHAT = set(stopwords.words('indonesian')) | set(stopwords.words('english'))
# Informal particles, pronouns, abbreviations and interjections common in
# Indonesian chat text; treated as stopwords for keyword extraction.
_CHAT_FILLERS = {
    "sih", "dong", "kok", "kan", "tuh", "deh", "lah", "yah", "ni", "tu",
    "ya", "yak", "yuk", "loh", "masa", "mana", "tapi", "kalo", "kalau",
    "biar", "buat", "bikin", "bilang", "gak", "ga", "nggak", "enggak",
    "kagak", "tak", "ndak", "udah", "sudah", "blm", "belum", "pas",
    "lagi", "lg", "td", "tadi", "km", "kamu", "aku", "saya", "gw", "gue",
    "lu", "lo", "elu", "kita", "kalian", "mereka", "dia", "ini", "itu",
    "sini", "situ", "sana", "bgt", "banget", "aja", "saja", "cuma",
    "doang", "terus", "trs", "jd", "jadi", "karna", "karena", "krn",
    "bisa", "bs", "mau", "mo", "pengen", "ingin", "ada", "tiada",
    "sama", "dgn", "dengan", "dr", "dari", "ke", "di", "pd", "pada",
    "kapan", "dimana", "siapa", "mengapa", "kenapa", "gimana", "bagaimana",
    "wkwk", "haha", "hehe", "huhu", "anjir", "njir", "anjing",
    "apalah", "apa", "aduh", "wah", "nah", "kek", "kayak", "macam"
}
STOPWORDS_ID_CHAT.update(_CHAT_FILLERS)
# ==== Extra TALA stopword integration ====
try:
    _TALA_PATH = os.path.join(os.path.dirname(__file__), 'tala-stopwords-indonesia.txt')
    if os.path.exists(_TALA_PATH):
        with open(_TALA_PATH, 'r', encoding='utf-8') as _tf:
            # Strip BEFORE testing for the comment marker: the previous check
            # ran startswith('#') on the raw line, so indented comment lines
            # (e.g. "  # note") slipped into the stopword set.
            tala_words = set()
            for _line in _tf:
                _w = _line.strip().lower()
                if _w and not _w.startswith('#'):
                    tala_words.add(_w)
        # Drop single-character entries so we don't over-filter.
        tala_words = {w for w in tala_words if len(w) > 1}
        STOPWORDS_ID_CHAT.update(tala_words)
        logger.info(f"Loaded TALA stopwords: +{len(tala_words)} terms (total={len(STOPWORDS_ID_CHAT)})")
    else:
        logger.warning('TALA stopwords file not found, skipping integration.')
except Exception as e:
    logger.warning(f'Failed loading TALA stopwords: {e}')
# Simple lexicon for Indonesian/Kupang terms, scores in the standard [-1, +1] range.
ID_EXTRA = {
    # Common negative emotions
    "capek": -0.7, "capai": -0.5, "pusing": -0.7, "marah": -0.8, "sedih": -0.7,
    "murung": -0.7, "galau": -0.6, "bingung": -0.5, "takut": -0.7, "cemas": -0.7,
    "kecewa": -0.7, "kesal": -0.6, "jengkel": -0.6, "frustasi": -0.8, "frustrasi": -0.8, "depresi": -0.9,
    "stres": -0.8, "tegang": -0.6, "resah": -0.7, "gelisah": -0.7, "sendirian": -0.5,
    # Common positive emotions
    "senang": 0.7, "bahagia": 0.8, "semangat": 0.7, "hepi": 0.7, "gembira": 0.8,
    "excited": 0.7, "antusias": 0.7, "optimis": 0.6, "tenang": 0.5, "damai": 0.6,
    "puas": 0.6, "lega": 0.6, "syukur": 0.7, "bangga": 0.7,
    # School problems
    "telat": -0.6, "bolos": -0.8, "berantem": -0.9, "ribut": -0.7, "gaduh": -0.6,
    "berkelahi": -0.9, "bertengkar": -0.8, "keributan": -0.7, "masalah": -0.5,
    "PR": -0.3, "tugas": -0.2, "banyak": -0.2, "malas": -0.5, "rajin": 0.5,
    "skip": -0.6, "cabut": -0.6, "pontang": -0.7, "mangkir": -0.7,
    # Family & home
    # NOTE(review): "berantem"/"bertengkar" duplicate keys from the school
    # section (same values, harmless), and "so" below appears twice with
    # different values (-0.3, then 0.0 in the pronoun section) -- the later
    # 0.0 silently wins. Confirm which value is intended.
    "berantem": -0.9, "cekcok": -0.8, "bertengkar": -0.8, "marahan": -0.7,
    "berisik": -0.5, "berantakan": -0.4, "kacau": -0.7, "chaos": -0.7,
    "pisah": -0.7, "bercerai": -0.8, "kabur": -0.7, "minggat": -0.8, "pergi": -0.3,
    # Kupang/Manado dialect with sentiment
    "sonde": -0.3, "tara": -0.2, "teda": -0.2, "pigi": -0.1, # Kupang negation / "go"
    "kaco": -0.5, "cungkel": -0.5, "bongkar": -0.2, "kobo": -0.4, "susa": -0.6,
    "dolo": -0.4, "molo": -0.4, "so": -0.3, "nda": -0.3, # Manado negation
    "bodo": -0.6, "bodoh": -0.7, "tolol": -0.8, "goblok": -0.8, # Insults
    # Neutral pronouns (score 0 won't affect sentiment)
    "beta": 0.0, "ko": 0.0, "torang": 0.0, "katong": 0.0, "deng": 0.0,
    "dong": 0.0, "de": 0.0, "so": 0.0, "pe": 0.0, "pung": 0.0,
    "tanta": 0.0, "oma": 0.0, "opa": 0.0, "mama": 0.0, "papa": 0.0,
}
# Merge the custom entries into VADER's lexicon (when VADER is available).
if sia:
    sia.lexicon.update({k.lower(): v for k, v in ID_EXTRA.items()})
# NOTE(review): duplicate initialization -- `app`, API_KEY and SERVICE_VERSION
# were already defined earlier in this module; the assignments below override
# those earlier values (routes are registered on this later `app` instance).
app = Flask(__name__)
API_KEY = os.environ.get("ML_API_KEY")  # optional; unset/None disables auth in check_key()
FEEDBACK_FILE = os.environ.get("ML_FEEDBACK_FILE", os.path.join(os.path.dirname(__file__), "feedback_weights.json"))
LEXICON_DIR = os.environ.get("ML_LEXICON_DIR", os.path.join(os.path.dirname(__file__), "lexicons"))
ENABLE_BERT = os.environ.get("ML_ENABLE_BERT", "false").lower() in ("1","true","yes")
BERT_MODEL_NAME = os.environ.get("ML_BERT_MODEL", "indobenchmark/indobert-base-p1")
ENABLE_BERT_WARMUP = os.environ.get("ML_BERT_WARMUP", "false").lower() in ("1","true","yes")
BERT_L2_NORMALIZE = os.environ.get("ML_BERT_L2_NORMALIZE", "true").lower() in ("1", "true", "yes")
SERVICE_VERSION = os.environ.get("ML_VERSION", "ml-rasaya:2025.11.0")
def check_key():
    """Validate the request's API key header; auth is disabled when no key is set."""
    if not API_KEY:
        return True
    # Accept both header casings/variants for compatibility.
    supplied = request.headers.get("X-API-KEY") or request.headers.get("X-API-Key")
    return supplied == API_KEY
def detect_lang(txt, hint=None):
    """Return a language code for `txt`, preferring an explicit hint.

    Falls back to "id" for empty input or on any detection error.
    """
    if hint:
        return hint
    try:
        if txt and txt.strip():
            return detect(txt)
        return "id"
    except Exception:
        return "id"
def label_from_score(compound: float) -> str:
    """Map a compound sentiment score to 'positif' / 'negatif' / 'netral'."""
    if compound >= 0.05:
        return "positif"
    if compound <= -0.05:
        return "negatif"
    return "netral"
# Legacy default map removed in favor of taxonomy-derived categories
def load_feedback_weights():
    """Read persisted keyword->category feedback weights; {} on any failure."""
    try:
        with open(FEEDBACK_FILE, encoding='utf-8') as fh:
            return json.load(fh)
    except Exception:
        # Missing/corrupt file just means "no feedback yet".
        return {}
def save_feedback_weights(weights: dict):
    """Best-effort persistence of feedback weights; errors are swallowed."""
    try:
        with open(FEEDBACK_FILE, 'w', encoding='utf-8') as fh:
            json.dump(weights, fh, ensure_ascii=False, indent=2)
    except Exception:
        # Persistence is advisory; never crash the request path.
        pass
def score_categories_for_text(txt: str, categories_map: dict, feedback: dict):
    """Token/n-gram based category scoring.

    - Tokenize + optional stemming (Sastrawi) for generalization.
    - Match unigrams/bigrams/trigrams exactly (not loose substring matching).
    - Base weight is split across the categories sharing a keyword (1/n_cats).
    - Boost n-grams (bi=1.4x, tri=1.6x); downweight very short tokens (<=3 chars: 0.5x).
    - Add per-(keyword, category) feedback weight, then normalize to proportions.

    Returns:
        (scores, reasons): scores maps category -> proportion (sums to ~1 when
        any keyword matched); reasons maps category -> up to 5 matched keywords.
    """
    clean = clean_text(txt)
    toks = _tokenize_and_stem(clean)
    uni, bi, tri = _build_ngram_sets(toks)
    # Inverted index: keyword -> list of categories declaring it
    inv = defaultdict(list)
    for cat, kws in categories_map.items():
        for kw in kws:
            k = (kw or '').strip().lower()
            if k:
                inv[k].append(cat)
    scores = {cat: 0.0 for cat in categories_map.keys()}
    reasons = defaultdict(list)
    for kw, cats in inv.items():
        parts = [p for p in kw.split() if p]
        # Stem each keyword part so it matches the stemmed text n-grams
        parts_stem = [_stem_id(p) for p in parts]
        gram = len(parts_stem)
        present = False
        if gram == 1:
            present = parts_stem[0] in uni
        elif gram == 2:
            present = (parts_stem[0] + ' ' + parts_stem[1]) in bi
        else:
            # Keywords of 3+ words are matched by their first three stems only
            seq = ' '.join(parts_stem[:3])
            present = seq in tri if len(parts_stem) >= 3 else False
        if not present:
            continue
        # Keywords shared by many categories contribute less to each one
        base = 1.0 / max(1, len(cats))
        if gram == 1 and len(parts_stem[0]) <= 3:
            base *= 0.5
        if gram == 2:
            base *= 1.4
        elif gram >= 3:
            base *= 1.6
        for cat in cats:
            # Additive feedback adjustment per (keyword, category) pair
            adj = base + float(feedback.get(kw, {}).get(cat, 0.0))
            scores[cat] += adj
            reasons[cat].append(kw)
    total = sum(scores.values())
    if total > 0:
        # Normalize so category scores form a proportion distribution
        for k in scores.keys():
            scores[k] = round(scores[k] / total, 4)
    return scores, {k: sorted(set(v))[:5] for k, v in reasons.items()}
"""
Cleaning & Lexicon Loader (InSet + optional Barasa)
"""
# Regex patterns
_RE_URL = re.compile(r"https?://\S+|www\.\S+")
_RE_MENTION = re.compile(r"[@#]\w+")
_RE_REPEAT = re.compile(r"(.)\1{2,}") # 3 kali atau lebih
_RE_MULTISPACE = re.compile(r"\s+")
def clean_text(t: str) -> str:
    """
    Clean text while preserving emoji meaning and sentiment-bearing punctuation.

    Pipeline: demojize -> lowercase -> strip URLs/mentions -> keep only
    alphanumerics + ?!., -> collapse repeated chars -> normalize slang and
    regional dialect (Kupang/Manado/Ambon/Papua) to standard Indonesian.
    """
    if not t: return ""
    # 1. Demojize: turn emoji into text tokens, then map a few common ones
    #    to Indonesian sentiment words.
    t = emoji.demojize(t, delimiters=(" ", " "))
    t = t.replace("loudly_crying_face", "menangis") \
        .replace("crying_face", "sedih") \
        .replace("pensive_face", "murung") \
        .replace("angry_face", "marah") \
        .replace("rolling_on_the_floor_laughing", "tertawa") \
        .replace("face_with_rolling_eyes", "bosan") \
        .replace("broken_heart", "patah hati")
    t = t.lower().strip()
    # 2. Remove URLs & mentions/hashtags
    t = _RE_URL.sub(" ", t)
    t = _RE_MENTION.sub(" ", t)
    # 3. Keep punctuation important for emotion (?!.,); drop everything else
    #    that is not alphanumeric.
    t = re.sub(r"[^a-z0-9\?\!\.\,\s]", " ", t)
    # Split punctuation off into separate tokens
    t = re.sub(r"([\?\!\.\,])", r" \1 ", t)
    # 4. Normalize repeated characters (bangeeet -> banget)
    t = _RE_REPEAT.sub(r"\1", t)
    # 5. Slang & dialect normalization (Indonesian + Kupang + Manado + Ambon).
    # NOTE(review): several keys repeat across sections; for duplicates Python
    # keeps the LAST literal value. Most duplicates agree, but "kek" is mapped
    # to "seperti" and then to "kayak" on the same line -- "kayak" wins.
    dialect = {
        # Standard Indonesian slang
        "gw": "saya", "gue": "saya", "lu": "kamu", "lo": "kamu", "elu": "kamu",
        "ak": "aku", "aq": "aku", "sy": "saya", "w": "saya", "ane": "saya",
        "gak": "tidak", "ga": "tidak", "nggak": "tidak", "kaga": "tidak", "ndak": "tidak",
        "enggak": "tidak", "engga": "tidak", "ngga": "tidak", "kagak": "tidak",
        "krn": "karena", "karna": "karena", "bgt": "banget", "bgtt": "banget",
        "tdk": "tidak", "jgn": "jangan", "udh": "sudah", "sdh": "sudah",
        "blm": "belum", "trus": "terus", "jd": "jadi", "dgn": "dengan",
        "sm": "sama", "yg": "yang", "kalo": "kalau", "kl": "kalau",
        "mager": "malas gerak", "baper": "bawa perasaan", "gabut": "bosan",
        "anjir": "kaget", "njir": "kaget", "anjay": "hebat",
        "mantul": "mantap", "santuy": "santai", "sans": "santai",
        "gajelas": "tidak jelas", "gaje": "tidak jelas",
        # Kupang/NTT dialect
        # --- PRONOUNS ---
        "beta": "saya", "b": "saya", "bt": "saya", # Kupang/Ambon
        "kita": "saya", # Manado (casual context)
        "ana": "saya", "awak": "saya", "sa": "saya", "sy": "saya",
        "ak": "aku", "aq": "aku", "gw": "saya", "gue": "saya",
        "lu": "kamu", "lo": "kamu", "elu": "kamu",
        "ose": "kamu", "os": "kamu", "ale": "kamu", # Ambon
        "ngana": "kamu", "nga": "kamu", # Manado
        "ko": "kamu", "kau": "kamu", "ju": "kamu", # Kupang/Papua
        "bo": "kamu", # Bima/Dompu occasionally
        "dia": "dia", "de": "dia", "i": "dia", # Papua/Kupang ("De pung rumah")
        "antua": "beliau", # Ambon (respectful)
        "katong": "kita", "ketong": "kita", "ktg": "kita", # Kupang/Ambon
        "torang": "kita", "tong": "kita", # Manado/Papua
        "dorang": "mereka", "dong": "mereka", "drg": "mereka", # Manado/Kupang/Ambon
        "besong": "kalian", "basong": "kalian", "kamorang": "kalian", # Kupang/Papua
        "ngoni": "kalian", # Manado
        # --- NEGATION ---
        "sonde": "tidak", "son": "tidak", "snd": "tidak", "sond": "tidak", # Kupang
        "seng": "tidak", "sing": "tidak", "tra": "tidak", "trada": "tidak", # Ambon/Papua
        "tara": "tidak", "tar": "tidak",
        "nyanda": "tidak", "nda": "tidak", "ndak": "tidak", # Manado/Javanese
        "gak": "tidak", "ga": "tidak", "nggak": "tidak", "kaga": "tidak",
        "bukang": "bukan",
        # --- VERBS & ADVERBS ---
        "pi": "pergi", "p": "pergi", "pig": "pergi", # Kupang/Ambon ("I ran off 'pi'...")
        "su": "sudah", "so": "sudah", # Kupang/Manado/Ambon
        "sdh": "sudah", "udh": "sudah", "udah": "sudah",
        "blm": "belum", "balom": "belum",
        "mo": "mau", "mau": "mau",
        "kasi": "beri", "kase": "beri", "kas": "beri", # "Kase tinggal" -> "Beri tinggal"
        "omong": "bicara", "baomong": "bicara", "bakata": "berkata",
        "dapa": "dapat", "dap": "dapat",
        "baku": "saling", # "Baku pukul" -> "Saling pukul"
        "bae": "baik", "baek": "baik",
        "ancor": "hancur",
        "ambe": "ambil", "pigi": "pergi",
        # --- POSSESSIVES & CONNECTORS ---
        "pung": "punya", "puny": "punya", "pu": "punya", "pe": "punya", # "Beta pung" -> "Saya punya"
        "deng": "dengan", "dg": "dengan", "dng": "dengan",
        "par": "untuk", "for": "untuk", # Ambon/Manado ("For ngana")
        "vor": "untuk",
        "kek": "seperti", "mcam": "macam", "kek": "kayak",
        # --- ADJECTIVES & MISC ---
        "talalu": "terlalu", "tlalu": "terlalu",
        "sadiki": "sedikit", "sadikit": "sedikit",
        "banya": "banyak",
        "skali": "sekali",
        "samua": "semua",
        "karna": "karena", "krn": "karena", "gara": "karena",
        # --- GENERAL INDONESIAN SLANG ---
        "bgt": "banget", "bgtt": "banget",
        "trus": "terus", "trs": "terus",
        "jd": "jadi", "jdi": "jadi",
        "yg": "yang", "kalo": "kalau", "kl": "kalau",
        "mager": "malas gerak", "baper": "bawa perasaan", "gabut": "bosan",
        "anjir": "kaget", "njir": "kaget", "anjay": "hebat",
        "mantul": "mantap", "santuy": "santai", "sans": "santai",
        "gajelas": "tidak jelas", "gaje": "tidak jelas",
        "ortu": "orang tua", "mksd": "maksud",
        "knp": "kenapa", "np": "kenapa", "napa": "kenapa",
        "utk": "untuk"
    }
    # Token-by-token replacement using the dialect map
    toks = []
    for tk in t.split():
        toks.append(dialect.get(tk, tk))
    t = " ".join(toks)
    t = _RE_MULTISPACE.sub(" ", t).strip()
    return t
# Tokenization + optional stemming helpers
def _tokenize_and_stem(t: str) -> list[str]:
    """Whitespace-tokenize cleaned text, stemming each token when Sastrawi is loaded."""
    words = t.split()
    if _sastrawi_stemmer is None:
        # No stemmer available: return raw tokens unchanged.
        return words
    return [_stem_id(w) for w in words]
def _build_ngram_sets(tokens: list[str]) -> tuple[set[str], set[str], set[str]]:
    """Return (unigram, bigram, trigram) sets, n-grams space-joined as strings."""
    unigrams = set(tokens)
    bigrams = {a + " " + b for a, b in zip(tokens, tokens[1:])}
    trigrams = {a + " " + b + " " + c for a, b, c in zip(tokens, tokens[1:], tokens[2:])}
    return unigrams, bigrams, trigrams
def detect_sarcasm_heuristic(text_clean, raw_text, current_sentiment):
    """Detect likely sarcasm from sentiment contrast, punctuation and emoji.

    Args:
        text_clean: cleaned/normalized text (lowercased again defensively here).
        raw_text: original text, inspected for punctuation and emoji cues.
        current_sentiment: unused; kept for call-site compatibility.

    Returns:
        (is_sarcasm: bool, confidence: float)

    Note: removed dead code from the previous version -- the unused
    `is_sarcasm`/`confidence` locals and the `intensifiers`/`has_intensifier`/
    `has_exclamation` features that were computed but never read.
    """
    text_clean = text_clean.lower()
    # Heuristic word lists
    positives = ["hebat", "bagus", "pinter", "jenius", "mantap", "enak", "keren", "rajin", "suci"]
    negatives = ["pusing", "capek", "stres", "gila", "mati", "rusak", "hancur", "sebel", "benci", "malas", "bodoh", "tolol"]
    has_pos = any(p in text_clean for p in positives)
    has_neg = any(n in text_clean for n in negatives)
    # LOGIC 1: both positive AND negative cues ("Hebat banget lo bikin gue stres")
    if has_pos and has_neg:
        return True, 0.75
    # LOGIC 2: praise + aggressive punctuation ("Pinter ya lo??") --
    # genuine compliments rarely use '??' / '!!'
    if has_pos and ("??" in raw_text or "!!" in raw_text):
        return True, 0.6
    # LOGIC 3: praise + typically sarcastic emoji in the raw text
    # (rolling eyes, unamused, upside-down, steam, clown)
    sarcastic_emojis = ["🙄", "😒", "🙃", "😤", "🤡"]
    if has_pos and any(e in raw_text for e in sarcastic_emojis):
        return True, 0.9
    return False, 0.0
def load_inset_lexicon(base_dir: str) -> dict[str, float]:
    """Load the InSet lexicon from lexicons/inset/{positive.tsv,negative.tsv}.

    The first tab-separated field of each line is the word; positives map to
    +1.0 and negatives to -1.0. Missing files are silently skipped.
    """
    lexicon: dict[str, float] = {}
    inset_dir = os.path.join(base_dir, "inset")
    for fname, polarity in (("positive.tsv", 1.0), ("negative.tsv", -1.0)):
        path = os.path.join(inset_dir, fname)
        if not os.path.exists(path):
            continue
        with open(path, "r", encoding="utf-8") as fh:
            for line in fh:
                word = line.strip().split("\t")[0]
                if word:
                    lexicon[word.lower()] = polarity
    return lexicon
def load_barasa_csv(path: str) -> dict[str, float]:
"""Load Barasa CSV with headers; expects at least a 'lemma' column and
either a 'score' column (float, negative to positive) or separate
'pos'/'neg' columns that can be combined (score = pos - neg).
Values are clamped to [-1, 1].
"""
lex: dict[str, float] = {}
try:
import csv
with open(path, encoding="utf-8") as f:
r = csv.DictReader(f)
for row in r:
lemma = (row.get("lemma") or row.get("word") or row.get("token") or "").strip().lower()
if not lemma:
continue
score_val = None
# Prefer unified score
if row.get("score") not in (None, ""):
try:
score_val = float(row.get("score"))
except Exception:
score_val = None
# Else try pos/neg columns
if score_val is None:
try:
pos = float(row.get("pos") or row.get("positive") or 0)
neg = float(row.get("neg") or row.get("negative") or 0)
score_val = pos - neg
except Exception:
score_val = 0.0
score_val = max(-1.0, min(1.0, float(score_val)))
lex[lemma] = score_val
except Exception:
pass
return lex
def load_barasa_optional(base_dir: str) -> dict[str, float]:
    """Optionally load Barasa resources from base_dir.

    wn-msa-all.tab is a WordNet-style tab file with no polarity information,
    so its presence yields an empty dict (no scores are derived from it).
    Otherwise a user-supplied base_dir/barasa.csv of "word,score" lines is
    parsed, with scores clamped to [-1, 1] and unparseable lines skipped.
    """
    wn_file = os.path.join(base_dir, "barasa", "wn-msa-all.tab")
    if os.path.exists(wn_file):
        # Placeholder: no direct sentiment mapping from the WordNet file.
        # Future: inherit score * 0.8 via synonyms of known sentiment words.
        return {}
    csv_file = os.path.join(base_dir, "barasa.csv")
    if not os.path.exists(csv_file):
        return {}
    scores: dict[str, float] = {}
    with open(csv_file, "r", encoding="utf-8") as fh:
        for line in fh:
            if "," not in line:
                continue
            word, raw = line.strip().split(",", 1)
            try:
                scores[word.lower()] = max(-1.0, min(1.0, float(raw)))
            except Exception:
                continue
    return scores
def build_lexicon() -> dict[str, float]:
    """Compose the word->score lexicon: InSet, then Barasa, then ID_EXTRA (last wins)."""
    lex = load_inset_lexicon(LEXICON_DIR)
    # Barasa: prefer the curated CSV; otherwise the optional WordNet loader.
    barasa_csv = os.path.join(LEXICON_DIR, "barasa", "barasa_lexicon.csv")
    if os.path.exists(barasa_csv):
        lex.update(load_barasa_csv(barasa_csv))
    else:
        lex.update(load_barasa_optional(LEXICON_DIR))
    # Custom Kupang/Indonesian entries override everything; values are already
    # within [-1, +1] but clamp defensively anyway.
    for word, score in ID_EXTRA.items():
        lex[word.lower()] = max(-1.0, min(1.0, float(score)))
    return lex
LEXICON_ID = build_lexicon()
def score_with_lexicon(text: str, lex: Dict[str, float]) -> float:
    """Score text in [-1, 1] using a word lexicon with negation/intensifier context.

    Rules implemented below:
    - A negation word opens a 3-token window in which scored words are flipped
      (at 0.8x), and retroactively flips a positive word scored within the
      previous 2 tokens (handles "paham ... belum"-style trailing negation).
    - An intensifier multiplies the NEXT scored word by 1.5.
    - The sum is dampened by sqrt(token count) and clamped to [-1, 1].
    """
    toks = clean_text(text).split()
    if not toks:
        return 0.0
    # Context-aware scoring: handle negation (pre & post), intensifiers
    negation_words = {"tidak", "bukan", "belum", "jangan", "tanpa", "sonde", "tara", "teda", "nda", "tra"}
    intensifiers = {"banget", "sangat", "amat", "sekali", "parah", "bener", "pisan"}
    s = 0.0
    neg_window = 0 # number of next tokens to negate
    intensify = 1.0
    # track last scored token to handle patterns like "paham ... belum"
    last_score_val = 0.0
    last_score_idx = -10
    for i, tok in enumerate(toks):
        # Negation token: start negation window and optionally flip previous positive nearby
        if tok in negation_words:
            # If a positive word occurred recently (within 2 tokens), flip it retroactively
            if last_score_val > 0 and (i - last_score_idx) <= 2:
                # subtract a bit more than added to reflect negation of previous positive
                s -= last_score_val * 1.2
                last_score_val = 0.0
            neg_window = 3
            continue
        # Intensifier affects next scored word only
        if tok in intensifiers:
            intensify = 1.5
            continue
        # Base lexical score
        score = lex.get(tok, 0.0)
        # Apply active negation window
        if neg_window > 0 and score != 0.0:
            score = -score * 0.8
            neg_window -= 1
        elif neg_window > 0:
            # consume window even if current token has no score
            neg_window -= 1
        # Apply intensifier
        if intensify > 1.0 and score != 0.0:
            score = score * intensify
            intensify = 1.0
        s += score
        if score != 0.0:
            last_score_val = score
            last_score_idx = i
    # Dampen by sqrt length to avoid bias for long texts
    normalized = s / max(1.0, math.sqrt(len(toks)))
    return max(-1.0, min(1.0, normalized))
# Intensifier tokens and their severity contribution weights
INTENSIFIERS = {"banget": 1.0, "sangat": 0.8, "parah": 0.9, "amat": 0.5}
def negative_gate(aggregate: float, raw_txt: str) -> tuple[bool, float]:
    """Decide if text passes the negative gate and estimate its severity.

    Severity = 0.7*(-aggregate) + 0.2*intensifier_weight + exclamation bonus
    (0.1 per '!', capped at 3) + 0.1 repetition bonus, clamped to [0, 1].
    """
    tokens = clean_text(raw_txt).split()
    intensity = sum(INTENSIFIERS.get(tok, 0.0) for tok in tokens)
    exclaim = min(raw_txt.count("!"), 3) * 0.1
    repeat_bonus = 0.1 if _RE_REPEAT.search(raw_txt) else 0.0
    severity = (-aggregate) * 0.7 + intensity * 0.2 + exclaim + repeat_bonus
    severity = max(0.0, min(1.0, severity))
    return (aggregate <= -0.05), round(severity, 3)
# =====================
# Taxonomy (topics/subtopics) for semi-supervised labeling
# =====================
TAXONOMY_PATH = os.path.join(os.path.dirname(__file__), "taxonomy.json")
_TAX_MTIME: float = -1.0  # last seen file mtime; -1 means "never loaded from disk"
try:
    with open(TAXONOMY_PATH, "r", encoding="utf-8") as _f:
        _TAX = json.load(_f)
except Exception:
    # Missing/invalid taxonomy.json: start with an empty topic list.
    _TAX = {"topics": []}
def _reload_taxonomy_if_changed(force: bool = False) -> None:
    """Reload taxonomy.json if the file changed on disk.

    Needed so DB soft-deletes / is_active toggles (synced into taxonomy.json)
    take effect without restarting the ML service.

    Args:
        force: when True, reload even if the recorded mtime suggests no change.
    """
    global _TAX, _TAX_MTIME, BUCKET_KW, SUBTOPICS
    try:
        mtime = os.path.getmtime(TAXONOMY_PATH)
    except Exception:
        # File missing/unreadable: keep the currently loaded taxonomy.
        return
    if (not force) and _TAX_MTIME >= 0 and mtime <= _TAX_MTIME:
        # Nothing newer on disk than what we already hold.
        return
    try:
        with open(TAXONOMY_PATH, "r", encoding="utf-8") as f:
            data = json.load(f)
            if not isinstance(data, dict):
                # Malformed payload: do not clobber the working taxonomy.
                return
            _TAX = data
            _TAX_MTIME = mtime
            # refresh derived indices (best-effort)
            try:
                BUCKET_KW, SUBTOPICS = _taxonomy_keywords()
            except Exception:
                pass
            logger.info("taxonomy.json reloaded", extra={"mtime": mtime})
    except Exception as e:
        logger.warning(f"Failed reloading taxonomy.json: {e}")
def _taxonomy_keywords():
    """Derive keyword indices from the loaded taxonomy (_TAX).

    Returns:
        (buckets, subtopics): buckets maps bucket name -> set of lowercase
        keywords aggregated over its topics; subtopics maps internal subtopic
        id -> metadata dict (name, bucket, topic id/name, external 'code'
        matching kategori_masalahs.kode, keyword set, examples).
    """
    buckets = {}
    subtopics = {}
    for topic in _TAX.get("topics", []):
        bucket = topic.get("bucket") or ""
        topic_id = topic.get("id") or bucket or "TOPIC"
        topic_name = topic.get("name") or topic_id
        topic_kw = [str(w).lower() for w in topic.get("keywords", []) if w]
        buckets.setdefault(bucket, set()).update(topic_kw)
        for sub in topic.get("subtopics", []) or []:
            # Keep the internal taxonomy id distinct from the external 'code'
            # that matches kategori_masalahs.kode.
            sub_id = sub.get("id") or sub.get("code") or sub.get("name")
            sub_code = sub.get("code") or sub_id
            if not sub_id:
                continue
            subtopics[sub_id] = {
                "name": sub.get("name") or sub_id,
                "bucket": bucket,
                "topic_id": topic_id,
                "topic_name": topic_name,
                "code": sub_code,
                "keywords": {str(w).lower() for w in sub.get("keywords", []) if w},
                "examples": sub.get("examples", []) or [],
            }
    return buckets, subtopics
BUCKET_KW, SUBTOPICS = _taxonomy_keywords()
def build_topic_index_and_categories_map():
    """HYBRID topic index builder (old multi-level method + DB-synced data).

    Returns:
        topic_index: per small category, {UPPER(name): {id, name, bucket, kode}}.
        categories_map: per small category, {UPPER(name): sorted keyword list}.
        bucket_keywords: per big category, {UPPER(bucket): sorted aggregated list}.

    Why this shape: keywords are checked at BOTH levels. If a text misses every
    fine-grained category, the aggregated bucket keywords can still match,
    giving redundancy and better coverage.
    """
    topic_index = {}
    categories_map = {}
    aggregated = defaultdict(set)  # bucket -> union of its categories' keywords
    for topic in _TAX.get("topics", []):
        topic_id = topic.get("id") or topic.get("code") or "TOPIC"
        topic_name = topic.get("name") or topic_id
        bucket = topic.get("bucket") or ""
        key = str(topic_name).upper()
        # Keywords declared directly on the topic (small category)
        keywords = {str(w).lower().strip() for w in (topic.get("keywords") or []) if w}
        # Legacy support: also absorb keywords from nested subtopics
        for sub in topic.get("subtopics", []) or []:
            keywords.update(
                str(w).lower().strip() for w in (sub.get("keywords", []) or []) if w
            )
        topic_index[key] = {
            "id": topic_id,
            "name": topic_name,
            "bucket": bucket,
            "kode": topic_id,  # matches kategori_masalahs.kode
        }
        categories_map[key] = sorted(keywords)
        # Aggregate into the big bucket -- this is what made the old method accurate
        if bucket:
            aggregated[bucket.upper()].update(keywords)
    bucket_keywords = {b: sorted(kws) for b, kws in aggregated.items()}
    return topic_index, categories_map, bucket_keywords
def extract_keyphrases(texts, lang="id"):
    """Extract the top-20 RAKE keyphrases from a list of texts.

    RAKE defaults to English stopwords only, so Indonesian stopwords are
    supplied as well. `lang` is currently unused; kept for compatibility.
    """
    combined_sw = set(stopwords.words('indonesian')) | set(stopwords.words('english'))
    rake = Rake(stopwords=combined_sw)
    rake.extract_keywords_from_text(" . ".join(texts))
    return [
        {"term": phrase, "weight": float(score)}
        for score, phrase in rake.get_ranked_phrases_with_scores()[:20]
    ]
def extract_core_tokens(texts):
    """Return up to 10 most frequent "core" tokens across the given texts.

    Each text is normalized via clean_text; stopwords (NLTK ID + EN), common
    filler words and tokens shorter than 3 characters are discarded before
    frequency counting.
    """
    try:
        sw_id = set(stopwords.words('indonesian'))
    except Exception:
        sw_id = set()
    try:
        sw_en = set(stopwords.words('english'))
    except Exception:
        sw_en = set()
    filler = {
        'dan','atau','yang','di','ke','dengan','pada','untuk','dari','lagi','sih','deh','lah','ya','kok','kan','udah','aja','pun','itu','ini','jadi','kalau','kalo','bahwa','sementara','sering','kayak','kayakny','nih','tuh','dong','de','si','mungkin','masih','bisa','harus','karena','seperti','kaya','gitu','buat'
    }
    drop = sw_id | sw_en | filler
    counts = Counter(
        tok
        for text in texts
        for tok in clean_text(text).split()
        if len(tok) >= 3 and tok not in drop
    )
    return [word for word, _ in counts.most_common(10)]
def _build_cluster_vectorizer():
    """Build the TF-IDF vectorizer used to pick cluster top-terms.

    Single-word tokens only, with heavy stopword cleanup (NLTK ID + EN plus
    chat fillers); clean_text serves as the preprocessor.
    """
    try:
        sw_id = set(stopwords.words('indonesian'))
    except Exception:
        sw_id = set()
    try:
        sw_en = set(stopwords.words('english'))
    except Exception:
        sw_en = set()
    extra = {
        # connectors/intensifiers/pronouns/common fillers
        'dan','atau','yang','di','ke','dengan','pada','untuk','dari','lagi','banget','sekali','paling','sih','deh','dong','lah','ya',
        'aku','saya','gue','gua','dia','kamu','kau','ko','kami','kita','mereka',
        'punya','dengar','dng','sm','nih','tuh','kok','kan','udah','lagi','aja','de','si',
    }
    # Our cleaner is the preprocessor; tokens are plain whitespace splits.
    return TfidfVectorizer(
        preprocessor=clean_text,
        tokenizer=str.split,
        token_pattern=None,
        lowercase=True,
        stop_words=list(sw_id | sw_en | extra),
        ngram_range=(1, 1),
        max_df=0.95,
        min_df=1,
        max_features=1000,
    )
@app.get("/health")
def health():
return jsonify({"status": "ok", "version": SERVICE_VERSION, "bert": ENABLE_BERT})
# =====================
# IndoBERT caching & optional warmup
# =====================
BERT_CACHE = {"tok": None, "mdl": None, "device": "cpu"}
def _l2_normalize_rows_dense(X: np.ndarray, eps: float = 1e-12) -> np.ndarray:
    """L2-normalize each row of a dense matrix.

    Used so IndoBERT embeddings are comparable for distance-based clustering
    (KMeans), removing magnitude effects. None passes through unchanged;
    non-2D or zero-row arrays are returned after float32 conversion only.
    """
    if X is None:
        return X
    mat = np.asarray(X, dtype=np.float32)
    if mat.ndim != 2 or mat.shape[0] == 0:
        return mat
    row_norms = np.linalg.norm(mat, axis=1, keepdims=True)
    # eps floor avoids division by zero for all-zero rows
    return mat / np.maximum(row_norms, eps)
# --- GLOBAL BERT VARIABLES ---
# Lazily populated by get_bert(); None until the first successful load.
_bert_tokenizer = None
_bert_model = None
_bert_device = None
def get_bert():
    """Lazily load the IndoBERT tokenizer/model, caching them in module globals.

    Returns:
        (tokenizer, model, device) on success, or (None, None, None) when
        loading fails (e.g. no network / missing weights).

    NOTE(review): the model name is hard-coded here; the ML_BERT_MODEL /
    BERT_MODEL_NAME configuration defined elsewhere in this module is not
    consulted -- confirm whether that is intentional.
    """
    global _bert_tokenizer, _bert_model, _bert_device
    if _bert_tokenizer is None:
        print("⏳ Loading IndoBERT model... (First run might take a while)")
        try:
            model_name = "indobenchmark/indobert-base-p1"
            _bert_tokenizer = AutoTokenizer.from_pretrained(model_name)
            _bert_model = AutoModel.from_pretrained(model_name)
            # Prefer GPU when available; model is set to eval (inference) mode.
            _bert_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            _bert_model.to(_bert_device)
            _bert_model.eval()
            print(f"✅ IndoBERT loaded on {_bert_device}")
        except Exception as e:
            print(f"❌ Failed to load IndoBERT: {e}")
            return None, None, None
    return _bert_tokenizer, _bert_model, _bert_device
# Warmup at startup if requested (download/load once so the first real
# request does not pay the model-loading latency)
if ENABLE_BERT and ENABLE_BERT_WARMUP:
    tok, mdl, dev = get_bert()
    try:
        if tok is not None and mdl is not None:
            import torch  # type: ignore  # local re-import; torch is already imported at module top
            with torch.no_grad():
                # Tiny forward pass to materialize weights/kernels
                enc = tok(["warmup"], padding=True, truncation=True, max_length=16, return_tensors="pt")
                _ = mdl(**enc.to(dev))
    except Exception:
        # Warmup is best-effort; any failure will resurface on first use.
        pass
@app.get("/warmup")
def warmup():
"""Optionally trigger BERT load and a tiny forward pass to avoid first-request latency."""
if not ENABLE_BERT:
return jsonify({"bert": "disabled"})
tok, mdl, dev = get_bert()
if tok is None or mdl is None:
return jsonify({"bert": "unavailable"}), 500
try:
import torch # type: ignore
with torch.no_grad():
enc = tok(["warmup"], padding=True, truncation=True, max_length=16, return_tensors="pt")
_ = mdl(**enc.to(dev))
return jsonify({"bert": "ready", "device": dev})
except Exception as e:
return jsonify({"bert": "error", "message": str(e)}), 500
# (Load helpers lain seperti check_key, load_feedback, taxonomy, dll biarkan seperti file lama Anda)
# ... (Pastikan functions: check_key, load_feedback_weights, build_topic_index..., load_inset_lexicon ada) ...
@app.post("/analyze")
def analyze():
    """Main analysis endpoint.

    Accepts either a batch payload {"items": [{id, text, lang_hint}, ...]} or
    a single-item payload {id, text, context: {lang_hint}}. For each item it
    runs: text cleaning, hybrid sentiment scoring (Indonesian lexicon +
    VADER), a keyword fallback for near-neutral scores, sarcasm heuristics,
    negative gating/severity, taxonomy category scoring (negative content
    only), RAKE keyword extraction, and a per-item summary. Negative items
    are then clustered (IndoBERT embeddings with a TF-IDF fallback) and
    rolled up into a category overview plus per-item and global
    recommendation stubs that the Laravel backend resolves further.
    """
    if not check_key():
        return jsonify({"error": "unauthorized"}), 401
    data = request.get_json(force=True) or {}
    items = data.get("items")
    # Normalize a single-item payload into a one-element batch.
    if items is None:
        items = [{
            "id": data.get("id") or "item-1",
            "text": data.get("text") or "",
            "lang_hint": (data.get("context") or {}).get("lang_hint") if isinstance(data.get("context"), dict) else None
        }]
    if not isinstance(items, list) or not items:
        return jsonify({"error": "items required"}), 422
    # Reload taxonomy if it changed (e.g., admin disabled/restored categories).
    _reload_taxonomy_if_changed()
    # Taxonomy & feedback setup (hybrid approach): the caller may override the
    # category->keywords map; otherwise the taxonomy-derived maps are used.
    categories_override = data.get("categories")
    TOPIC_INDEX, TAXONOMY_CATEGORIES, BUCKET_KEYWORDS = build_topic_index_and_categories_map()
    categories_map = {}
    bucket_map = {}
    if isinstance(categories_override, dict) and categories_override:
        for k, v in categories_override.items():
            if isinstance(v, list):
                categories_map[str(k).upper()] = [str(x) for x in v if isinstance(x, (str, int))]
    if not categories_map:
        categories_map = TAXONOMY_CATEGORIES
        bucket_map = BUCKET_KEYWORDS
    feedback = load_feedback_weights()
    # Accumulators for the per-item and aggregate sections of the response.
    results = []
    per_legacy = []
    all_texts = []
    negatives = []
    per_entry_cats = {}
    # Load IndoBERT lazily, and only when enabled.
    tok, mdl, dev = get_bert() if ENABLE_BERT else (None, None, None)
    # --- PROCESS PER ITEM ---
    for it in items:
        item_id = it.get("id")
        raw_txt = (it.get("text") or "").strip()
        lang_hint = it.get("lang_hint")  # NOTE(review): not used downstream in this view
        # 1. Text cleaning; items that clean down to nothing are skipped entirely.
        clean = clean_text(raw_txt)
        if not clean:
            continue
        # 2. Hybrid sentiment: Indonesian lexicon (70%) + VADER compound (30%);
        # lexicon-only when the VADER analyzer is unavailable.
        s_lex = score_with_lexicon(clean, LEXICON_ID)
        s_vad = sia.polarity_scores(raw_txt).get("compound", 0.0) if sia else 0.0
        aggregate = float(0.7 * s_lex + 0.3 * s_vad) if sia else s_lex
        # Fallback: keyword-count polarity when the hybrid score is ~neutral.
        if abs(aggregate) < 0.05:
            negative_keywords = [
                "berkelahi", "bertengkar", "murung", "sedih", "marah", "kabur",
                "masalah", "ribut", "berantem", "stress", "stres", "pusing",
                "takut", "cemas", "galau", "kecewa", "frustrasi", "frustasi",
                "jelek", "drop", "sendiri", "sendirian", "tidak paham"
            ]
            positive_keywords = ["senang", "bahagia", "gembira", "semangat", "excited", "bagus", "oke", "mantap", "suka", "hebat"]
            neg_count = sum(1 for kw in negative_keywords if kw in clean)
            pos_count = sum(1 for kw in positive_keywords if kw in clean)
            if neg_count > pos_count and neg_count > 0:
                aggregate = -0.35  # set mild negative
            elif pos_count > neg_count and pos_count > 0:
                aggregate = 0.3  # set mild positive
        # 3. Sarcasm detection; sarcasm flips apparent positivity to negative.
        is_sarcasm, sarc_conf = detect_sarcasm_heuristic(clean, raw_txt, aggregate)
        if is_sarcasm:
            # Flip score: positive -> negative
            if aggregate > 0:
                aggregate = -0.5 * aggregate - 0.3
            elif aggregate == 0:
                aggregate = -0.4
            lbl = "negatif"
        else:
            lbl = label_from_score(aggregate)
        # 4. Negative gate & severity, computed on the (possibly flipped) score.
        neg_flag, severity = negative_gate(aggregate, raw_txt)
        if is_sarcasm:
            neg_flag = True
            severity = max(severity, 0.6)  # sarcasm usually signals real hurt
        # 5. Category scoring — ONLY for negative/neutral content; purely
        # positive items (aggregate > 0, no sarcasm) skip categorization.
        cat_scores = {}
        reasons = {}
        bucket_scores = defaultdict(float)
        best_cat = None
        best_bucket = None
        cluster = None
        if neg_flag or aggregate <= 0:
            # Hybrid: fine-grained categories + aggregated buckets.
            cat_scores, reasons = score_categories_for_text(clean, categories_map, feedback)
            # Roll fine-grained category scores up into their parent buckets.
            for cat, score in cat_scores.items():
                tp_meta = TOPIC_INDEX.get(str(cat).upper())
                if tp_meta and tp_meta.get("bucket"):
                    bucket_scores[tp_meta["bucket"]] += score * 0.8  # slightly dampen aggregated
            # Also score directly against bucket keywords (legacy method).
            if bucket_map:
                bucket_direct, _ = score_categories_for_text(clean, bucket_map, feedback)
                for bucket, score in bucket_direct.items():
                    bucket_scores[bucket] += score * 1.2  # boost direct matches
            # Pick the best fine-grained category and best bucket.
            best_cat = max(cat_scores, key=cat_scores.get) if cat_scores else None
            best_bucket = max(bucket_scores, key=bucket_scores.get) if bucket_scores else None
            # Apply minimum confidence thresholds to reduce false positives.
            if best_cat and cat_scores.get(best_cat, 0.0) < 0.22:
                best_cat = None
            if best_bucket and bucket_scores.get(best_bucket, 0.0) < 0.25:
                best_bucket = None
        # 6. Cluster labeling: prefer the fine-grained category, fall back to bucket.
        if best_cat:
            tp_meta = TOPIC_INDEX.get(str(best_cat).upper())
            if tp_meta:
                cluster = {
                    "id": tp_meta.get("kode"),  # matches kategori_masalahs.kode
                    "label": tp_meta.get("name"),
                    "bucket": tp_meta.get("bucket"),
                    "topic_id": tp_meta.get("kode"),
                    "topic_name": tp_meta.get("name"),
                    "confidence": round(cat_scores[best_cat], 3)
                }
        elif best_bucket:
            # Fallback: use the bucket when no specific category matched.
            cluster = {
                "id": best_bucket,
                "label": best_bucket,
                "bucket": best_bucket,
                "topic_id": None,
                "topic_name": None,
                "confidence": round(bucket_scores[best_bucket], 3)
            }
        # Else: cluster stays None for positive input (no categorization).
        # 7. Keyword extraction via RAKE on the cleaned text.
        try:
            rk = Rake(stopwords=STOPWORDS_ID_CHAT, min_length=1, max_length=3)
            rk.extract_keywords_from_text(clean)  # use clean text
            raw_phrases = [p.lower() for p in rk.get_ranked_phrases()[:8]]
        except Exception:
            raw_phrases = []
        # Dedupe and keep the five shortest phrases.
        phrases = sorted(list(set(raw_phrases)), key=len)[:5]
        # 8. Human-readable summary (Indonesian strings consumed by the UI).
        if is_sarcasm:
            summary_text = f"Terdeteksi sarkasme/sindiran. Inti keluhan: {', '.join(phrases[:3])}."
        elif neg_flag and cluster:
            summary_text = f"Masalah utama: {cluster['label']}. Gejala: {', '.join(phrases[:3])}."
        elif neg_flag:
            summary_text = f"Inti keluhan: {', '.join(phrases[:3])}."
        else:
            # Positive input — no categorization needed.
            summary_text = f"Ekspresi positif. Kata kunci: {', '.join(phrases[:3]) if phrases else 'tidak ada keluhan'}."
        results.append({
            "id": item_id,
            "clean_text": clean,
            "sentiment": {
                "barasa": s_lex, "english": s_vad, "aggregate": aggregate, "label": lbl
            },
            "negative_flag": neg_flag,
            "is_sarcasm": is_sarcasm,  # newer field
            "severity": severity,
            "cluster": cluster,
            "summary": summary_text,
            "key_phrases": phrases,
            "recommendations": [],  # filled in after aggregation below
            "cat_scores": cat_scores,
            "cat_reasons": reasons,
        })
        per_legacy.append({
            "id": item_id, "text": raw_txt, "sentiment": aggregate,
            "label": lbl, "keywords": phrases
        })
        all_texts.append(clean)
        # Collect negatives for clustering.
        if neg_flag:
            negatives.append(clean)
        # Top-3 ranked categories per entry (diagnostic; not in the response).
        ranked = sorted([(c, s) for c, s in cat_scores.items() if s > 0], key=lambda x: x[1], reverse=True)
        per_entry_cats[item_id] = {
            "ranked": ranked[:3],
            "reasons": {c: reasons.get(c, []) for c, _ in ranked[:3]}
        }
    # --- AGGREGATION & CLUSTERING ---
    # Global keywords across all cleaned texts.
    keyphrases = extract_keyphrases(all_texts) if all_texts else []
    # Cluster negative texts with IndoBERT embeddings (TF-IDF fallback).
    clusters = []
    if len(negatives) >= 2:
        used_engine = "tfidf"
        X = None
        # Try BERT embeddings first.
        if tok and mdl:
            try:
                with torch.no_grad():
                    enc = tok(negatives, padding=True, truncation=True, max_length=128, return_tensors="pt").to(dev)
                    out = mdl(**enc)
                    cls = out.last_hidden_state[:, 0, :]  # [CLS] token embedding per text
                    X = cls.detach().cpu().numpy()
                if BERT_L2_NORMALIZE:
                    X = _l2_normalize_rows_dense(X)
                used_engine = "bert"
            except Exception as e:
                print(f"⚠️ BERT error, falling back: {e}")
                X = None
        # Fallback: TF-IDF features.
        if X is None:
            vec = _build_cluster_vectorizer()  # helper defined earlier in the file
            X = vec.fit_transform(negatives)
        # k grows with the number of negatives but is capped at 4.
        k = 2 if len(negatives) == 2 else min(4, max(2, len(negatives)//2))
        km = KMeans(n_clusters=k, n_init='auto', random_state=42)
        y = km.fit_predict(X)
        n_total = max(1, len(negatives))
        for ci in range(k):
            idxs = [i for i in range(len(negatives)) if y[i] == ci]
            ex = [negatives[i] for i in idxs][:5]
            size = len(idxs)
            ratio = float(size) / float(n_total) if n_total else 0.0
            # Simple label for UI hinting: top tokens from the examples.
            try:
                toks = extract_core_tokens(ex)
                label = ", ".join(toks[:3]) if toks else ""
            except Exception:
                label = ""
            # Taxonomy-based hint: map this cluster to the most likely topic/bucket.
            hint = {}
            try:
                joined = " . ".join([t for t in ex if isinstance(t, str) and t.strip()])
                # Prefer the fine-grained category (topic).
                cc, rr = score_categories_for_text(joined, categories_map, feedback)
                best_cat = max(cc, key=cc.get) if cc else None
                if best_cat and cc.get(best_cat, 0.0) < 0.22:
                    best_cat = None
                best_bucket = None
                bb = {}
                br = {}
                if not best_cat and bucket_map:
                    bb, br = score_categories_for_text(joined, bucket_map, feedback)
                    best_bucket = max(bb, key=bb.get) if bb else None
                    if best_bucket and bb.get(best_bucket, 0.0) < 0.25:
                        best_bucket = None
                if best_cat:
                    meta = TOPIC_INDEX.get(str(best_cat).upper()) or {}
                    hint = {
                        "type": "topic",
                        "name": meta.get("name") or str(best_cat),
                        "bucket": meta.get("bucket") or "",
                        "confidence": round(float(cc.get(best_cat, 0.0)), 3),
                        "keywords": (rr.get(best_cat) or [])[:5],
                    }
                elif best_bucket:
                    hint = {
                        "type": "bucket",
                        "name": str(best_bucket),
                        "bucket": str(best_bucket),
                        "confidence": round(float(bb.get(best_bucket, 0.0)), 3),
                        "keywords": (br.get(best_bucket) or [])[:5],
                    }
            except Exception:
                hint = {}
            clusters.append({
                "cluster": int(ci),
                "engine": used_engine,
                "size": int(size),
                "ratio": round(ratio, 4),
                "label": label,
                "hint": hint,
                "examples": ex
            })
    # Category overview weighted by severity & sarcasm (negative items only).
    cat_counter = Counter()
    for r in results:
        # ONLY count negative items for categorization.
        if not r.get("negative_flag"):
            continue
        sev = r.get("severity", 0.0)
        weight = 1.0 + sev
        # Aggregate by fine-grained category (topic).
        cluster = r.get("cluster") or {}
        topic_name = cluster.get("topic_name") or cluster.get("label")
        if topic_name:
            # Use the cluster confidence as the base score.
            score = cluster.get("confidence", 0.5)
            cat_counter[topic_name] += score * weight
    categories_overview = [
        {"category": cat, "score": round(val, 4)} for cat, val in cat_counter.most_common()
    ]
    # Summary stats over the legacy per-item list.
    avg = sum([x["sentiment"] for x in per_legacy]) / len(per_legacy) if per_legacy else 0.0
    summary = {
        "avg_sentiment": round(avg, 3),
        "negative_ratio": round(sum(1 for x in per_legacy if x["label"]=="negatif")/len(per_legacy), 3) if per_legacy else 0.0
    }
    # Recommendation generation per fine-grained category (granular);
    # Laravel filters further against master_rekomendasis.rules.
    def recommend_by_topic(topic_id: str, topic_name: str, bucket: str, severity_val: float, negative: bool, sarcasm: bool):
        """Build a recommendation stub for one fine-grained category (topic).

        Returns structured data that Laravel matches against
        master_rekomendasis:
        {
            "kategori_kode": topic_id,   # matches kategori_masalahs.kode
            "kategori_nama": topic_name,
            "bucket": bucket,
            "severity": severity_val,
            "negative": negative,
            "sarcasm": sarcasm,
            "suggested_actions": [...]   # heuristic suggestions (optional)
        }
        """
        rec = {
            "kategori_kode": topic_id,
            "kategori_nama": topic_name,
            "bucket": bucket,
            "severity": severity_val,
            "negative": negative,
            "sarcasm": sarcasm,
            "suggested_actions": []
        }
        # Heuristic urgency flags (Laravel filters per master_rekomendasis).
        if (negative or sarcasm) and severity_val >= 0.6:
            rec["suggested_actions"].append({
                "type": "URGENT",
                "reason": "Severity tinggi atau terdeteksi sarkasme"
            })
        elif negative and severity_val >= 0.4:
            rec["suggested_actions"].append({
                "type": "MODERATE",
                "reason": "Indikasi masalah perlu perhatian"
            })
        return rec
    # Assign recommendations per item (granular, per category).
    for r in results:
        cluster = r.get("cluster") or {}
        topic_id = cluster.get("topic_id") or cluster.get("id")
        topic_name = cluster.get("topic_name") or cluster.get("label")
        bucket = cluster.get("bucket", "")
        if topic_id:
            # Return category info for Laravel-side matching.
            r["recommendations"] = [recommend_by_topic(
                topic_id,
                topic_name,
                bucket,
                r.get("severity", 0),
                r.get("negative_flag", False),
                r.get("is_sarcasm", False)
            )]
        else:
            # Fallback: no specific category detected.
            r["recommendations"] = []
    # Global recommendations (per fine-grained category).
    abs_sent = abs(avg)
    global_recommendations = []
    valid_cats = [c for c in categories_overview if c["score"] >= 0.05]
    is_neg_avg = avg < -0.05
    for cat in valid_cats:
        cname = cat["category"]
        meta = TOPIC_INDEX.get(cname.upper()) or {}
        topic_id = meta.get("kode") or meta.get("id")
        topic_name = meta.get("name", cname)
        bucket = meta.get("bucket", "")
        if topic_id:
            rec_data = recommend_by_topic(
                topic_id,
                topic_name,
                bucket,
                max(0.3, abs_sent),
                is_neg_avg,
                False  # no global sarcasm flag
            )
            global_recommendations.append({
                "category": cname,
                "kategori_kode": topic_id,
                "score": cat["score"],
                "recommendation": rec_data
            })
    return jsonify({
        "version": SERVICE_VERSION,
        "items": results,
        "summary": summary,
        "keyphrases": keyphrases,
        "clusters": clusters,
        "categories_overview": categories_overview,
        "global_recommendations": global_recommendations,
    })
@app.post("/feedback")
def feedback():
    """Apply a manual keyword-to-category correction to the feedback weights.

    Expected payload:
        { "keywords": ["telat", "bolos"], "from_category": "AKADEMIK",
          "to_category": "DISIPLIN", "delta": 0.2 }

    Each keyword is penalized by delta/2 on from_category and rewarded by
    delta on to_category. At least one of the two categories is required.
    """
    if not check_key():
        return jsonify({"error": "unauthorized"}), 401
    payload = request.get_json(force=True) or {}
    keyword_list = payload.get("keywords") or []
    source_cat = str(payload.get("from_category") or "").upper()
    target_cat = str(payload.get("to_category") or "").upper()
    step = float(payload.get("delta") or 0.2)
    if not keyword_list or (not source_cat and not target_cat):
        return jsonify({"error": "invalid payload"}), 422
    weights = load_feedback_weights()
    for raw_kw in keyword_list:
        key = str(raw_kw).lower().strip()
        if not key:
            continue
        entry = weights.get(key, {})
        # Penalize the old category slightly, reward the new one (if given).
        if source_cat:
            entry[source_cat] = float(entry.get(source_cat, 0.0)) - (step / 2.0)
        if target_cat:
            entry[target_cat] = float(entry.get(target_cat, 0.0)) + step
        weights[key] = entry
    save_feedback_weights(weights)
    return jsonify({"ok": True, "updated": len(keyword_list)})
@app.route("/feedback", methods=["POST"])
def receive_feedback():
    """
    Receive teacher revision feedback for continuous learning.

    NOTE(review): this registers POST /feedback a second time — the
    feedback() view above already claims the same rule and method, so only
    one of the two handlers can ever be dispatched for this path. Confirm
    which handler the backend actually calls and consolidate them, or move
    this one to a distinct path (e.g. /feedback/revision).

    Expected payload:
    {
        "revision_id": 123,
        "original_text": "...",
        "original_kategori": "AKADEMIK",
        "original_rekomendasi": [...],
        "revised_kategori": "DISIPLIN",
        "revised_rekomendasi": [...],
        "revision_notes": "..." (optional)
    }
    This endpoint will:
    1. Extract keywords from original text
    2. Penalize weights for original_kategori
    3. Reward weights for revised_kategori
    4. Learn from the correction pattern
    """
    if not check_key():
        return jsonify({"error": "unauthorized"}), 401
    try:
        data = request.get_json(force=True) or {}
        revision_id = data.get("revision_id")
        original_text = data.get("original_text", "")
        original_kategori = str(data.get("original_kategori", "")).upper()
        revised_kategori = str(data.get("revised_kategori", "")).upper()
        if not original_text or not revised_kategori:
            return jsonify({"error": "Missing required fields"}), 422
        # Only learn if the kategori was changed (not just the rekomendasi).
        if original_kategori == revised_kategori:
            logger.info(f"Revision #{revision_id}: Kategori unchanged, skipping weight update")
            return jsonify({
                "ok": True,
                "message": "Kategori unchanged, no weight update needed",
                "revision_id": revision_id
            })
        # Extract keywords from the original text.
        keywords = []
        try:
            # Simple keyword extraction: tokenize, then drop stopwords,
            # chat fillers, short tokens, and non-alphanumerics.
            tokens = nltk.word_tokenize(original_text.lower())
            filtered_tokens = [
                t for t in tokens
                if t.isalnum() and len(t) > 2
                and t not in STOPWORDS_ID_CHAT
                and t not in _CHAT_FILLERS
            ]
            # Keep the 10 most frequent remaining words.
            word_counts = Counter(filtered_tokens)
            keywords = [word for word, _ in word_counts.most_common(10)]
            logger.info(f"Revision #{revision_id}: Extracted keywords: {keywords}")
        except Exception as e:
            logger.warning(f"Failed to extract keywords: {e}")
            # Fallback: naive whitespace split.
            keywords = [w for w in original_text.lower().split() if len(w) > 2][:10]
        if not keywords:
            return jsonify({
                "ok": False,
                "error": "Could not extract keywords from text"
            }), 422
        # Update feedback weights: shift each keyword away from the wrong
        # category and toward the teacher-corrected one.
        weights = load_feedback_weights()
        delta = 0.3  # learning rate
        for kw in keywords:
            k = str(kw).lower().strip()
            entry = weights.get(k, {})
            # Penalize the original (wrong) kategori.
            if original_kategori:
                entry[original_kategori] = float(entry.get(original_kategori, 0.0)) - (delta / 2.0)
            # Reward the revised (correct) kategori.
            entry[revised_kategori] = float(entry.get(revised_kategori, 0.0)) + delta
            weights[k] = entry
        save_feedback_weights(weights)
        logger.info(f"Revision #{revision_id}: Updated weights for {len(keywords)} keywords "
                    f"from {original_kategori}{revised_kategori}")
        return jsonify({
            "ok": True,
            "message": "Feedback learned successfully",
            "revision_id": revision_id,
            "keywords_updated": len(keywords),
            "correction": f"{original_kategori}{revised_kategori}"
        })
    except Exception as e:
        logger.error(f"Error processing feedback: {e}", exc_info=True)
        return jsonify({
            "ok": False,
            "error": str(e)
        }), 500
@app.post("/sync-taxonomy")
def sync_taxonomy():
    """Overwrite taxonomy.json with topics/buckets pushed by the backend.

    Writes the payload to TAXONOMY_PATH with sync metadata, then forces the
    in-memory taxonomy cache to reload. Topics are required; buckets default
    to an empty list.
    """
    if not check_key():
        return jsonify({"error": "unauthorized"}), 401
    payload = request.get_json(force=True) or {}
    topics = payload.get("topics", [])
    buckets = payload.get("buckets", [])
    if not topics:
        return jsonify({"error": "topics required"}), 422
    try:
        snapshot = {
            "topics": topics,
            "buckets": buckets,
            "meta": {
                "last_synced": datetime.now().isoformat(),
                "version": "2.0",
                "source": "api_sync",
            },
        }
        with open(TAXONOMY_PATH, "w", encoding="utf-8") as fh:
            json.dump(snapshot, fh, ensure_ascii=False, indent=2)
        # Force the cached taxonomy to pick up the new file immediately.
        _reload_taxonomy_if_changed(force=True)
        logger.info(f"Taxonomy synced: {len(topics)} topics, {len(buckets)} buckets")
        return jsonify({
            "ok": True,
            "topics_count": len(topics),
            "buckets_count": len(buckets),
            "message": "Taxonomy synced via API",
        })
    except Exception as e:
        logger.error(f"Sync taxonomy error: {e}")
        return jsonify({"error": str(e)}), 500
@app.get("/debug-taxonomy")
def get_current_taxonomy():
    """Debug endpoint: report the taxonomy currently cached in this container's memory."""
    # Refresh the cache first so the response reflects the latest file on disk.
    _reload_taxonomy_if_changed()
    response = {
        "status": "ok",
        "last_mtime": _TAX_MTIME,
        "topics_count": len(_TAX.get("topics", [])),
        "buckets_count": len(_TAX.get("buckets", [])),
        "taxonomy_data": _TAX,  # beware: this payload can get large
    }
    return jsonify(response)
# Keep this near the bottom, before the __main__ guard.
@app.get("/intip-taxonomy")
def intip_taxonomy():
    """Debug endpoint: read TAXONOMY_PATH directly from the running container's disk.

    Unlike /debug-taxonomy (which reports the in-memory cache), this reads
    the file fresh on every request.
    """
    try:
        with open(TAXONOMY_PATH, "r", encoding="utf-8") as f:
            isi_sekarang = json.load(f)
        return jsonify({
            "status": "Ini isi file di dalam container yang sedang jalan",
            "waktu_cek": datetime.now().isoformat(),
            "data": isi_sekarang
        })
    except Exception as e:
        # Bug fix: previously returned HTTP 200 with an error payload, which
        # makes failures invisible to status-code checks; return 500 instead.
        return jsonify({"error": str(e)}), 500
if __name__ == "__main__":
    port = int(os.environ.get("PORT", 7860))  # Hugging Face Spaces default port
    app.run(host="0.0.0.0", port=port, debug=False)  # debug=False for production