TA-FINAL-SPACE / app.py
fjrmhri's picture
Update app.py
b3036fa verified
"""
Indo Hoax Detector API — v3.3.1
[FIX-RC1] v3.2.0: Hapus dual inference, gunakan agregasi kalimat.
[FIX-RC2] v3.3.0: Tambah _aggregate_verdict dengan tie-breaking → hoaks.
[FIX-RC3] v3.3.1: Perbaiki bias Rule-1 di _aggregate_verdict:
v3.3.0 Rule-1 — max(P_hoaks) >= THRESH_HIGH (0.80) → auto hoaks.
Akibat: 6 kalimat fakta + 1 kalimat hoaks (P=0.93) → verdict HOAKS (SALAH).
Rule-1 terlalu bias ke 1 kalimat ekstrem tanpa memperhitungkan mayoritas.
Fix: Hapus Rule-1. Gunakan murni majority vote dengan tie-breaking → hoaks.
Logika akhir:
- hoax_count >= not_hoax_count AND hoax_count > 0 → hoaks (tie → hoaks)
- Sisanya → fakta
Confidence dari sisi yang menang:
- Hoaks → mean P(hoaks) kalimat berlabel hoaks
- Fakta → mean P(fakta) kalimat berlabel fakta
p_hoax_doc → selalu mean seluruh kalimat (representatif & informatif).
[FIX-PC1] v3.3.0: Perluas PETA_KATEGORI — tambah sinonim, negara, militer,
keamanan, dan variasi bahasa Indonesia.
"""
import json
import os
import random
import re
import threading
from collections import Counter, defaultdict
from threading import Lock
from typing import Any, Dict, Iterable, List, Optional, Tuple
import numpy as np
import torch
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from transformers import AutoModelForSequenceClassification, AutoTokenizer
# =========================
# Konfigurasi
# =========================
# Runtime configuration. Every value can be overridden through an environment
# variable; the second argument of each os.getenv call is the default.

# Hub repo id of the classifier and an optional subfolder inside it
# (an empty MODEL_SUBFOLDER is turned into None, i.e. "no subfolder").
MODEL_ID = os.getenv("MODEL_ID", "fjrmhri/deteksi_hoaks_indobert")
SUBFOLDER = os.getenv("MODEL_SUBFOLDER", "") or None
# Passed to the tokenizer as max_length (inputs are truncated to it).
MAX_LENGTH = int(os.getenv("MAX_LENGTH", "256"))
# Risk-level cut-offs on P(hoax) used by analyze_risk().
THRESH_HIGH = float(os.getenv("HOAX_THRESH_HIGH", "0.80"))
THRESH_MED = float(os.getenv("HOAX_THRESH_MED", "0.50"))
# Sentences with fewer than MIN_KATA_KALIMAT words are labelled with the
# THRESH_KALIMAT_PENDEK threshold instead of the default one
# (see _to_canonical_label).
MIN_KATA_KALIMAT = int(os.getenv("MIN_KATA_KALIMAT", "8"))
THRESH_KALIMAT_PENDEK = float(os.getenv("THRESH_KALIMAT_PENDEK", "0.70"))
# Classifier batch sizes for /predict-batch and for sentence-level inference.
PREDICT_BATCH_SIZE = int(os.getenv("PREDICT_BATCH_SIZE", "64"))
SENTENCE_BATCH_SIZE = int(os.getenv("SENTENCE_BATCH_SIZE", "64"))
# Non-hoax sentences whose confidence is below this value are coloured "amber".
SENTENCE_AMBER_CONF = float(os.getenv("SENTENCE_AMBER_CONF", "0.70"))
# Batch size for the sentence-embedding model and the number of topic
# keywords kept per BERTopic topic.
BERTOPIC_EMBED_BATCH = int(os.getenv("BERTOPIC_EMBED_BATCH", "32"))
TOPIC_KEYWORDS_TOPK = int(os.getenv("TOPIC_KEYWORDS_TOPK", "3"))
TOPIC_BERTOPIC_MODEL_ID = os.getenv(
    "TOPIC_BERTOPIC_MODEL_ID", "fjrmhri/deteksi_hoaks_bertopic"
).strip()
BERTOPIC_EMBED_MODEL_ID = os.getenv(
    "BERTOPIC_EMBED_MODEL_ID",
    "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
)
# Sampled stdout logging of predictions: enabled only when the variable is
# exactly "1"; LOG_SAMPLE_RATE is compared against random.random() in _maybe_log.
ENABLE_LOGGING = os.getenv("ENABLE_HOAX_LOGGING", "0") == "1"
LOG_SAMPLE_RATE = float(os.getenv("HOAX_LOG_SAMPLE_RATE", "0.2"))
# Run the classifier on the GPU when one is available, otherwise on the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if torch.cuda.is_available():
    torch.set_float32_matmul_precision("high")
# Startup banner summarising the effective configuration.
print("======================================")
print(f"Loading IndoBERT dari Hub : {MODEL_ID}")
print(f"Device : {DEVICE}")
print(f"THRESH_HIGH/MED : {THRESH_HIGH} / {THRESH_MED}")
print(f"THRESH_KALIMAT_PENDEK : {THRESH_KALIMAT_PENDEK} (< {MIN_KATA_KALIMAT} kata)")
print(f"BERTopic embed model : {BERTOPIC_EMBED_MODEL_ID}")
print("======================================")
# =========================
# Load IndoBERT (singleton)
# =========================
def _load_model_artifacts():
    """Load the tokenizer and the sequence-classification model from the Hub.

    When SUBFOLDER is configured it is tried first; if that attempt raises,
    a warning is printed and both artifacts are loaded again from the repo
    root. Without a SUBFOLDER any loading error is re-raised unchanged.

    Returns:
        A ``(tokenizer, model)`` tuple.
    """
    hub_kwargs = {"subfolder": SUBFOLDER} if SUBFOLDER else {}
    try:
        return (
            AutoTokenizer.from_pretrained(MODEL_ID, **hub_kwargs),
            AutoModelForSequenceClassification.from_pretrained(MODEL_ID, **hub_kwargs),
        )
    except Exception as e:
        if not SUBFOLDER:
            # Nothing to fall back to: surface the original error.
            raise
        print(f"[WARN] Gagal load subfolder='{SUBFOLDER}', retry: {e}")
        return (
            AutoTokenizer.from_pretrained(MODEL_ID),
            AutoModelForSequenceClassification.from_pretrained(MODEL_ID),
        )
# Module-level singletons: the model is loaded once at import time, moved to
# DEVICE and switched to eval mode.
tokenizer, model = _load_model_artifacts()
model.to(DEVICE)
model.eval()
# Maps the classifier's output index to the label name used in responses.
ID2LABEL: Dict[int, str] = {0: "not_hoax", 1: "hoax"}
# Default decision threshold on P(hoax); replaced below when the model repo
# ships an inference_config.json containing "threshold_optimal".
_THRESHOLD_OPTIMAL: float = 0.62
try:
    from huggingface_hub import hf_hub_download
    _cfg_path = hf_hub_download(MODEL_ID, "inference_config.json")
    with open(_cfg_path, encoding="utf-8") as _f:
        _inf_cfg = json.load(_f)
    _THRESHOLD_OPTIMAL = float(_inf_cfg.get("threshold_optimal", 0.62))
    print(f"[INFO] threshold_optimal dari inference_config.json: {_THRESHOLD_OPTIMAL}")
except Exception as _e:
    # Best effort: any failure (missing file, import error, bad JSON, ...)
    # keeps the default threshold and is only reported on stdout.
    print(f"[INFO] inference_config.json tidak tersedia ({_e}). Pakai {_THRESHOLD_OPTIMAL}")
# =========================
# [FIX-PC1] PETA_KATEGORI — category map, extended with synonyms & context
# =========================
# Ordered list of (category name, keyword set) pairs consumed by
# _kategorisasi_teks(). Keywords containing a space are looked up as phrases,
# the others as single tokens. List order matters only for ties: the scorer
# uses a strict ">" comparison, so the earliest best-scoring category wins.
PETA_KATEGORI: List[Tuple[str, set]] = [
    ("Kriminal & Hukum", {
        "polisi", "tersangka", "pengadilan", "hukum", "penjara", "korupsi",
        "kpk", "pembunuhan", "penipuan", "sidang", "vonis", "kriminal",
        "penyidikan", "jaksa", "hakim", "ditangkap", "ditahan", "terdakwa",
        "dakwaan", "kejaksaan", "mahkamah", "peradilan", "pidana", "perdata",
        "polri", "rutan", "lapas", "napi", "tahanan", "bui", "sel",
        "persidangan", "putusan", "hukuman", "denda", "banding", "kasasi",
        "penggeledahan", "penyitaan", "rekonstruksi", "otopsi", "visum",
        "tipikor", "suap", "gratifikasi", "pencucian", "pemalsuan",
        "penganiayaan", "pencurian", "perampokan", "narkoba", "narkotika",
        "pelecehan", "pemerkosaan", "kejahatan", "pelaku", "korban kriminal",
    }),
    ("Politik", {
        "pemilu", "pilkada", "dpr", "partai", "kampanye", "bawaslu", "kpu",
        "pilpres", "caleg", "koalisi", "oposisi", "legislasi", "debat",
        "konstitusi", "suara", "demokrat", "golkar", "pdip", "gerindra",
        "pks", "dpd", "mpr", "fraksi", "legislatif", "senator",
        "dprd", "pilwalkot", "pilgub", "pilbup", "capres", "cawapres",
        "paslon", "petahana", "tim sukses", "quick count", "real count",
        "rekap suara", "money politics", "politik uang", "black campaign",
        "kampanye hitam", "hoaks politik", "propaganda", "agitasi",
        "referendum", "demokrasi", "oligarki", "populisme", "nasionalisme",
        "pkb", "ppp", "pan", "nasdem", "hanura", "perindo", "psi",
        "pemilih", "suara rakyat", "kebijakan publik", "anggaran negara",
    }),
    ("Nasional & Pemerintahan", {
        "kementerian", "menteri", "kebijakan", "asn", "pns", "pemerintah",
        "presiden", "ibukota", "otonomi", "daerah", "regulasi", "proyek",
        "pembangunan", "gubernur", "bupati", "walikota", "dprd", "pemda",
        "anggaran", "apbn", "apbd", "perpres", "perda", "kabinet",
        "wapres", "jokowi", "prabowo",
        "sekretariat", "lembaga", "badan", "komisi", "dirjen", "direktorat",
        "keppres", "inpres", "pp", "uu", "ruu", "peraturan", "undang-undang",
        "ibu kota nusantara", "ikn", "brin", "bpk", "bpn", "bps",
        "kemendag", "kemenhub", "kemenkes", "kemendikbud", "kementan",
        "aparatur", "birokrasi", "reformasi birokrasi", "e-government",
        "pengadaan", "tender", "proyek nasional", "infrastruktur nasional",
        "bansos", "bantuan sosial", "subsidi", "blt", "pkh",
    }),
    ("Ekonomi & Bisnis", {
        "ekonomi", "saham", "investasi", "inflasi", "bank", "keuangan",
        "pajak", "ihsg", "umkm", "harga", "pasar", "ekspor", "impor",
        "startup", "bisnis", "perdagangan", "rupiah", "dolar", "kurs",
        "bi", "ojk", "bumn", "swasta", "perusahaan", "modal", "aset",
        "defisit", "surplus", "neraca", "pdb", "gdp",
        "inflasi", "deflasi", "resesi", "stagflasi", "suku bunga",
        "kredit", "pinjaman", "utang", "obligasi", "saham", "dividen",
        "bursa efek", "bei", "forex", "valuta", "mata uang",
        "pertumbuhan ekonomi", "kemiskinan", "pengangguran", "lapangan kerja",
        "upah", "gaji", "phk", "tenaga kerja", "buruh", "pekerja",
        "industri", "manufaktur", "produksi", "ekspansi", "merger",
        "akuisisi", "ipo", "go public", "e-commerce", "marketplace",
        "fintech", "kripto", "bitcoin", "blockchain", "digital economy",
        "harga bahan pokok", "sembako", "beras", "minyak goreng", "bbm",
    }),
    ("Kesehatan", {
        "kesehatan", "penyakit", "dokter", "virus", "vaksin",
        "obat", "bpjs", "pandemi", "medis", "gejala", "terapi", "pasien",
        "klinis", "covid", "kemenkes", "epidemi", "wabah", "imunisasi",
        "apotek", "farmasi", "faskes", "puskesmas", "nakes",
        "rumah sakit", "rs", "poliklinik", "igd", "icu", "rawat inap",
        "rawat jalan", "operasi", "bedah", "diagnosa", "resep",
        "kanker", "diabetes", "hipertensi", "jantung", "stroke",
        "dbd", "malaria", "tbc", "hiv", "aids", "hepatitis",
        "mpox", "cacar", "flu", "demam", "batuk", "sesak napas",
        "lockdown", "karantina", "isolasi", "klaster", "herd immunity",
        "booster", "dosis", "suntik", "vaksinasi", "pfizer", "sinovac",
        "herbal", "jamu", "suplemen", "vitamin", "nutrisi", "gizi",
        "stunting", "gizi buruk", "obesitas", "kesehatan jiwa",
    }),
    ("Teknologi & Sains", {
        "teknologi", "internet", "aplikasi", "digital", "siber", "hacker",
        "inovasi", "satelit", "algoritma", "data", "ai", "kecerdasan",
        "buatan", "software", "hardware", "smartphone", "kominfo", "server",
        "cloud", "robot",
        "artificial intelligence", "machine learning", "deep learning",
        "big data", "iot", "internet of things", "5g", "metaverse",
        "virtual reality", "vr", "augmented reality", "ar",
        "keamanan siber", "cybersecurity", "ransomware", "phishing",
        "kebocoran data", "privasi digital", "enkripsi", "firewall",
        "coding", "programming", "developer", "startup teknologi",
        "komputasi", "prosesor", "chip", "semikonduktor",
        "drone", "luar angkasa", "roket", "wahana", "lapan", "brin",
        "riset", "penelitian", "jurnal", "ilmiah", "laboratorium",
    }),
    ("Bencana & Cuaca", {
        "gempa", "banjir", "cuaca", "bmkg", "tsunami", "longsor", "erupsi",
        "badai", "evakuasi", "korban", "mitigasi", "iklim", "hujan", "angin",
        "kebakaran", "bencana", "bnpb", "bpbd", "kekeringan", "rob", "topan",
        "bencana alam", "force majeure", "tanah bergerak", "abrasi",
        "angin puting beliung", "tornado", "siklon", "hujan es",
        "banjir bandang", "banjir rob", "banjir lahar", "awan panas",
        "gunung berapi", "vulkanik", "aktivitas seismik", "magnitudo",
        "skala richter", "peringatan dini", "sirine", "tsunami warning",
        "pengungsian", "shelter", "posko", "bantuan bencana",
        "cuaca ekstrem", "el nino", "la nina", "perubahan iklim",
    }),
    ("Olahraga", {
        "olahraga", "sepakbola", "futsal", "basket", "bulutangkis", "atlet",
        "turnamen", "medali", "piala", "fifa", "aff", "liga", "stadion",
        "pertandingan", "klub", "pssi", "pbsi", "olimpiade",
        "voli", "tenis", "badminton", "pemain", "pelatih",
        "sea games", "asian games", "world cup", "euro", "copa",
        "premier league", "serie a", "la liga", "bundesliga", "liga 1",
        "timnas", "persib", "persija", "arema", "bali united",
        "gol", "kartu merah", "kartu kuning", "offside", "penalti",
        "skor", "klasemen", "degradasi", "promosi", "transfer pemain",
        "sprint", "maraton", "lari", "renang", "senam", "tinju", "mma",
        "e-sports", "gaming kompetitif", "esports",
    }),
    ("Keamanan & Pertahanan", {
        "militer", "tni", "angkatan darat", "angkatan laut", "angkatan udara",
        "tentara", "prajurit", "pasukan", "batalyon", "komando",
        "pertahanan", "senjata", "amunisi", "peluru", "meriam", "tank",
        "pesawat tempur", "kapal perang", "kapal selam", "frigate",
        "operasi militer", "latihan militer", "manuver", "gelar pasukan",
        "konflik bersenjata", "perang saudara", "gerilya", "insurgensi",
        "teror", "teroris", "bom", "ledakan", "serangan", "penembakan",
        "separatis", "papua", "kkb", "opm", "kelompok bersenjata",
        "natuna", "laut china selatan", "kedaulatan wilayah", "perbatasan",
        "pertahanan nasional", "kemenhan", "mabes tni", "panglima",
        "densus 88", "brimob", "kopassus", "kostrad", "marinir",
        "intel", "intelijen", "bais", "bnpt", "deradikalisasi",
        "pangkalan militer", "alutsista", "alutsista baru",
    }),
    ("Internasional", {
        "diplomasi", "perang", "konflik", "pbb", "nato", "geopolitik",
        "internasional", "sanksi", "asean", "g20", "kedutaan", "wna", "visa",
        "rusia", "russia", "ukraina", "ukraine", "amerika", "as", "usa",
        "china", "cina", "tiongkok", "taiwan", "hongkong",
        "eropa", "uni eropa", "inggris", "jerman", "perancis", "italia",
        "jepang", "korea selatan", "korea utara", "india", "pakistan",
        "iran", "arab saudi", "israel", "palestina", "gaza", "lebanon",
        "suriah", "irak", "afghanistan", "turki", "mesir", "nigeria",
        "australia", "kanada", "brazil", "meksiko",
        "hubungan bilateral", "hubungan multilateral", "perjanjian",
        "kerja sama internasional", "kunjungan kenegaraan", "state visit",
        "konferensi internasional", "summit", "ktt",
        "embargo", "blokade", "resolusi pbb", "dewan keamanan pbb",
        "who", "imf", "world bank", "wto", "apec",
        "pengungsi", "imigran", "asylum", "deportasi",
        "hak asasi manusia", "ham internasional", "amnesty international",
        "mata-mata", "espionase", "perang proxy", "perang dagang",
    }),
    ("Pendidikan", {
        "sekolah", "guru", "siswa", "mahasiswa", "kampus", "universitas",
        "beasiswa", "kurikulum", "ujian", "akademik", "riset",
        "kemendikbud", "snbp", "snbt", "sma", "smp", "sd", "dosen",
        "rektor", "fakultas",
        "pelajar", "murid", "pengajar", "pendidik", "tenaga pendidik",
        "perguruan tinggi", "pt", "prodi", "jurusan", "semester",
        "ipk", "skripsi", "tesis", "disertasi", "wisuda", "ijazah",
        "akreditasi", "bsnp", "kemdikbud", "dikti",
        "un", "ujian nasional", "seleksi masuk", "snmptn", "sbmptn",
        "ppdb", "penerimaan peserta didik", "zonasi", "jalur prestasi",
        "literasi", "numerasi", "kompetensi", "sertifikasi guru",
        "tunjangan guru", "p3k", "cpns guru",
        "bimbel", "les", "kursus", "pelatihan", "vokasi", "smk",
        "pendidikan karakter", "anti bullying", "perundungan",
    }),
    ("Transportasi & Infrastruktur", {
        "jalan", "tol", "kereta", "bandara", "pelabuhan", "transportasi",
        "kendaraan", "mrt", "lrt", "bus", "pesawat", "kapal",
        "terminal", "stasiun", "garuda", "kemenhub",
        "krl", "kereta cepat", "whoosh", "kai", "damri", "transjakarta",
        "ojek online", "gojek", "grab", "taksi", "angkutan umum",
        "jalan tol", "tol trans jawa", "tol trans sumatera",
        "jembatan", "flyover", "underpass", "terowongan",
        "bandara soetta", "bandara internasional", "runway",
        "maskapai", "lion air", "batik air", "citilink", "airasia",
        "kapal laut", "pelni", "asdp", "ferry",
        "kecelakaan lalu lintas", "kemacetan", "tilang",
        "sim", "stnk", "kir", "emisi kendaraan",
        "bbm", "spbu", "subsidi bbm", "pertamax", "pertalite",
    }),
    ("Lingkungan & Energi", {
        "lingkungan", "energi", "listrik", "minyak", "gas", "emisi",
        "polusi", "tambang", "pln", "pertamina", "karbon",
        "hutan", "deforestasi", "sawit", "sampah",
        "perubahan iklim", "climate change", "pemanasan global",
        "emisi karbon", "co2", "gas rumah kaca", "net zero",
        "energi terbarukan", "panel surya", "turbin angin", "pltm",
        "pltu", "pltn", "nuklir", "geothermal", "panas bumi",
        "batu bara", "batubara", "gas alam", "lng", "lpg",
        "illegal logging", "pembalakan liar", "kebakaran hutan",
        "asap", "kabut asap", "karhutla",
        "pencemaran", "polusi udara", "polusi air", "polusi tanah",
        "limbah", "limbah industri", "limbah b3", "sampah plastik",
        "daur ulang", "zero waste", "bank sampah",
        "konservasi", "satwa liar", "biodiversitas", "ekosistem",
        "mangrove", "terumbu karang", "laut bersih",
        "tambang nikel", "tambang emas", "tambang batu bara",
    }),
    ("Hiburan & Gaya Hidup", {
        "artis", "film", "musik", "konser", "selebritas", "bioskop", "drama",
        "viral", "sinetron", "festival", "influencer", "lifestyle", "seleb",
        "youtube", "instagram", "tiktok", "kuliner", "wisata",
        "aktor", "aktris", "penyanyi", "band", "idol", "kpop", "anime",
        "streaming", "netflix", "disney", "spotify", "podcast",
        "game", "gaming", "esports", "twitch",
        "fashion", "mode", "tren", "beauty", "skincare", "makeup",
        "diet", "fitness", "gym", "olahraga gaya hidup",
        "restoran", "kafe", "cafe", "food vlogger", "street food",
        "destinasi wisata", "hotel", "resort", "villa",
        "selebgram", "youtuber", "content creator", "buzzer",
        "gosip", "scandal", "perceraian", "pernikahan seleb",
        "award", "festival film", "box office",
    }),
]
def _kategorisasi_teks(teks: str) -> Optional[Tuple[str, float]]:
teks_lower = teks.lower()
teks_clean = re.sub(r"[^\w\s]", " ", teks_lower)
token_set = set(teks_clean.split())
total = max(len(token_set), 1)
best_nama: Optional[str] = None
best_skor: float = 0.0
for nama, kata_kunci in PETA_KATEGORI:
hit = 0
for kw in kata_kunci:
if " " in kw:
if kw in teks_lower:
hit += 1
else:
if kw in token_set:
hit += 1
if hit == 0:
continue
skor = hit / total
if skor > best_skor:
best_skor = skor
best_nama = nama
if best_nama is None:
return None
return best_nama, _round6(best_skor)
# =========================
# BERTopic + SentenceTransformer Singleton
# =========================
# Shared state for the topic model; written by _load_bertopic_background().
# Both objects stay None when loading fails.
_bertopic_model = None
_st_embedder = None
# Guards the three variables around it.
_bertopic_lock = Lock()
# Becomes True once a load attempt has finished, whether it succeeded or not.
_bertopic_ready = False
def _load_bertopic_background():
    """Load BERTopic and its sentence embedder once, then mark them ready.

    Runs under ``_bertopic_lock``. A second call after a finished attempt is a
    no-op. On any ``Exception`` both globals are reset to ``None``; the ready
    flag is set in every case so callers stop waiting for a model that will
    not arrive.
    """
    global _bertopic_model, _st_embedder, _bertopic_ready
    with _bertopic_lock:
        if _bertopic_ready:
            return
        topic_model = None
        embedder = None
        try:
            # Imported lazily so a missing optional dependency only disables
            # topic modelling instead of breaking the whole service.
            from bertopic import BERTopic
            from sentence_transformers import SentenceTransformer
            print(f"[INFO] Loading BERTopic dari: {TOPIC_BERTOPIC_MODEL_ID}")
            topic_model = BERTopic.load(TOPIC_BERTOPIC_MODEL_ID)
            print(f"[INFO] Loading SentenceTransformer: {BERTOPIC_EMBED_MODEL_ID}")
            embedder = SentenceTransformer(BERTOPIC_EMBED_MODEL_ID, device="cpu")
            print("[INFO] BERTopic + embedder berhasil dimuat.")
        except Exception as e:
            print(f"[WARN] Gagal load BERTopic/embedder: {e}.")
            topic_model = None
            embedder = None
        finally:
            _bertopic_model = topic_model
            _st_embedder = embedder
            _bertopic_ready = True


# Kick off loading in a daemon thread at import time.
threading.Thread(target=_load_bertopic_background, daemon=True).start()
def _get_bertopic_components() -> Tuple[Optional[Any], Optional[Any]]:
    """Return ``(bertopic_model, embedder)``, or ``(None, None)`` if not ready.

    The background loader holds ``_bertopic_lock`` for the whole download and
    load, which can take a long time. Taking the lock unconditionally would
    make every request wait for the loader instead of falling back, so the
    ready flag is checked first without the lock. Once the flag is True the
    loader is already leaving its critical section, so the locked read below
    waits at most momentarily.

    Both returned values are ``None`` when the load attempt failed.
    """
    if not _bertopic_ready:
        return None, None
    with _bertopic_lock:
        return _bertopic_model, _st_embedder
# =========================
# FastAPI
# =========================
# ASGI application object; the routes below are registered on it.
app = FastAPI(title="Indo Hoax Detector API", version="3.3.1")
# CORS is fully open: any origin, method and header is accepted.
# NOTE(review): a wildcard origin combined with allow_credentials=True is a
# very permissive setup — confirm it is intended for this deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# =========================
# Schemas
# =========================
# Request body of POST /predict.
class PredictRequest(BaseModel):
    # Text to classify as a single unit.
    text: str
# Request body of POST /predict-batch.
class BatchPredictRequest(BaseModel):
    # Texts to classify; each one gets its own entry in the response.
    texts: List[str]
# Classification result for one text (built by _build_predict_response).
class PredictResponse(BaseModel):
    # Label with the highest probability, and that probability.
    label: str
    score: float
    # Probability per label name.
    probabilities: Dict[str, float]
    # P(hoax) extracted from `probabilities`.
    hoax_probability: float
    # Level and human-readable explanation returned by analyze_risk().
    risk_level: str
    risk_explanation: str
# Response body of POST /predict-batch.
class BatchPredictResponse(BaseModel):
    # One result per input text, in input order.
    results: List[PredictResponse]
# Request body of POST /analyze.
class AnalyzeRequest(BaseModel):
    # Full document text.
    text: str
    # When True, the response carries no document-wide topic (topics_global
    # is None); per-paragraph topics are returned either way.
    topic_per_paragraph: bool = False
    # NOTE(review): not read anywhere in the visible code — confirm whether
    # it is still needed.
    sentence_level: bool = True
# Counts describing an analysed document.
class DocumentSummary(BaseModel):
    paragraph_count: int
    sentence_count: int
    # Number of sentences labelled "hoax" / "not_hoax".
    hoax_sentence_count: int
    not_hoax_sentence_count: int
# Document-level verdict as filled in by analyze().
class DocumentAnalysis(BaseModel):
    # "hoax" or "not_hoax", decided by _aggregate_verdict().
    label: str
    # Mean P(hoax) over all sentences.
    hoax_probability: float
    # Mean probability of the winning side among the sentences on that side.
    confidence: float
    # Level and explanation from analyze_risk() applied to hoax_probability.
    risk_level: str
    risk_explanation: str
    # Set to the same value as `label` by analyze().
    sentence_aggregate_label: str
    summary: DocumentSummary
# Topic assigned to a paragraph or to the whole document.
class TopicInfo(BaseModel):
    # Category name (rule-based) or joined top keywords (BERTopic).
    label: str
    # Rule-based hit ratio, or the weight of the topic's first keyword.
    score: float
    keywords: List[str]
# Classification result for one sentence inside a paragraph.
class SentenceAnalysis(BaseModel):
    # Position of the sentence within its paragraph (0-based).
    sentence_index: int
    text: str
    # "hoax" or "not_hoax" (see _to_canonical_label).
    label: str
    # Keys "not_hoax" and "hoax".
    probabilities: Dict[str, float]
    hoax_probability: float
    # Larger of the two class probabilities.
    confidence: float
    # "red", "amber" or "green" (see _sentence_color).
    color: str
# Per-paragraph result as filled in by analyze().
class ParagraphAnalysis(BaseModel):
    paragraph_index: int
    text: str
    # Majority vote over the paragraph's sentences; a tie counts as "hoax".
    label: str
    # Highest sentence-level P(hoax) in the paragraph.
    hoax_probability: float
    # That maximum for a "hoax" paragraph, one minus it otherwise.
    confidence: float
    topic: TopicInfo
    sentences: List[SentenceAnalysis]
# A topic label that occurs in more than one paragraph.
class SharedTopic(BaseModel):
    label: str
    # Indices of the paragraphs carrying this label.
    paragraph_indices: List[int]
# Settings echoed back with every /analyze response.
class AnalyzeMeta(BaseModel):
    model_id: str
    max_length: int
    sentence_batch_size: int
    # Default decision threshold on P(hoax) in effect for the request.
    threshold_used: Optional[float] = None
    topic_model_used: str = "bertopic+rules"
# Response body of POST /analyze.
class AnalyzeResponse(BaseModel):
    document: DocumentAnalysis
    paragraphs: List[ParagraphAnalysis]
    shared_topics: List[SharedTopic]
    # None when the request asked for topic_per_paragraph (or the text is empty).
    topics_global: Optional[TopicInfo] = None
    meta: AnalyzeMeta
# =========================
# Util
# =========================
# Paragraph boundary: two or more consecutive line breaks (LF or CRLF).
PARAGRAPH_SPLIT_RE = re.compile(r"(?:\r?\n){2,}")
# Sentence: a run of non-terminator characters followed by one or more of
# . ! ? (plus optional closing quote/bracket), or a trailing run without a
# terminator at the end of the text.
SENTENCE_SPLIT_RE = re.compile(r'[^.!?]+(?:[.!?]+(?:[")\]]+)?)|[^.!?]+$')
# Any run of whitespace; used to collapse it to a single space.
WS_RE = re.compile(r"\s+")
# Topic returned when neither the rules nor BERTopic produce a usable topic.
_FALLBACK_TOPIC = TopicInfo(label="topik_umum", score=0.0, keywords=["topik_umum"])
def _round6(v: float) -> float:
return float(round(float(v), 6))
def _iter_chunks(items: List[str], chunk_size: int) -> Iterable[List[str]]:
chunk_size = max(1, chunk_size)
for i in range(0, len(items), chunk_size):
yield items[i:i + chunk_size]
def _normalize_unit_text(text: str) -> str:
    """Return *text* as a string with whitespace runs collapsed and ends trimmed."""
    collapsed = WS_RE.sub(" ", str(text))
    return collapsed.strip()
def _prepare_texts(texts: List[str]) -> List[str]:
    """Normalise every text and replace blank ones with ``"[EMPTY]"``.

    The placeholder is applied after normalisation, so whitespace-only inputs
    get it too. Previously only falsy inputs did, and a string such as
    ``"   "`` was passed on as an empty string.
    """
    prepared: List[str] = []
    for raw in texts:
        normalized = _normalize_unit_text(raw) if raw else ""
        prepared.append(normalized or "[EMPTY]")
    return prepared
# =========================
# IndoBERT inference
# =========================
def _predict_proba(texts: List[str], batch_size: int = SENTENCE_BATCH_SIZE) -> List[Dict[str, float]]:
    """Return one ``{label: probability}`` dict per input text, in input order.

    Texts are prepared with ``_prepare_texts`` and de-duplicated so each
    distinct text is run through the model once. The distinct texts are
    tokenised (padded, truncated to MAX_LENGTH) and classified in batches of
    *batch_size*; softmax over the logits gives the probabilities. Every
    returned dict is a fresh copy, also for repeated texts.
    """
    if not texts:
        return []

    cleaned = _prepare_texts(texts)

    # Map each input position to the slot of its distinct text.
    slot_by_text: Dict[str, int] = {}
    distinct: List[str] = []
    slots: List[int] = []
    for item in cleaned:
        slot = slot_by_text.get(item)
        if slot is None:
            slot = len(distinct)
            slot_by_text[item] = slot
            distinct.append(item)
        slots.append(slot)

    per_distinct: List[Dict[str, float]] = []
    for batch in _iter_chunks(distinct, batch_size):
        encoded = tokenizer(
            batch,
            padding=True,
            truncation=True,
            max_length=MAX_LENGTH,
            return_tensors="pt",
        )
        on_device = {name: tensor.to(DEVICE) for name, tensor in encoded.items()}
        with torch.inference_mode():
            logits = model(**on_device).logits
            batch_probs = torch.softmax(logits, dim=-1).cpu().numpy()
        per_distinct.extend(
            {ID2LABEL.get(cls, str(cls)): float(p) for cls, p in enumerate(row)}
            for row in batch_probs
        )

    return [dict(per_distinct[slot]) for slot in slots]
def _extract_hoax_probability(prob_dict: Dict[str, float]) -> float:
if not prob_dict:
return 0.0
for k in prob_dict:
nk = re.sub(r"[^a-z0-9]+", "_", k.lower()).strip("_")
if nk in {"hoax", "hoaks"} or ("hoax" in nk and "not" not in nk and "non" not in nk):
return float(prob_dict[k])
if len(prob_dict) == 2:
for k in prob_dict:
nk = re.sub(r"[^a-z0-9]+", "_", k.lower()).strip("_")
if nk in {"not_hoax", "non_hoax", "fakta", "fact", "valid"}:
return float(1.0 - float(prob_dict[k]))
return 0.0
def _extract_not_hoax_probability(prob_dict: Dict[str, float], p_hoax: float) -> float:
if not prob_dict:
return float(1.0 - p_hoax)
for k in prob_dict:
nk = re.sub(r"[^a-z0-9]+", "_", k.lower()).strip("_")
if nk in {"not_hoax", "non_hoax", "fakta", "fact", "valid"}:
return float(prob_dict[k])
return float(max(0.0, min(1.0, 1.0 - p_hoax)))
def analyze_risk(p_hoax: float, original_text: Optional[str] = None) -> Tuple[str, str]:
    """Translate P(hoax) into a risk level and a user-facing explanation.

    Levels:
      * "high"   when ``p_hoax > THRESH_HIGH``;
      * "medium" when ``p_hoax > max(THRESH_MED, _THRESHOLD_OPTIMAL)``;
      * "low"    otherwise.

    When *original_text* has fewer than five whitespace-separated words, a
    "low" level is raised to "medium" and a caveat about short texts is
    appended to the explanation.

    Args:
        p_hoax: Probability that the text is a hoax.
        original_text: Text the probability refers to; used only for the
            short-text check.

    Returns:
        ``(level, explanation)``; the explanation is written in Indonesian.
    """
    # The "≈" below replaces a mis-encoded byte sequence that was being shown
    # to users verbatim.
    if p_hoax > THRESH_HIGH:
        level = "high"
        explanation = (
            f"Model sangat yakin teks ini hoaks (P(hoaks) ≈ {p_hoax:.2%}). "
            "Jangan disebarkan sebelum ada klarifikasi resmi."
        )
    elif p_hoax > max(THRESH_MED, _THRESHOLD_OPTIMAL):
        level = "medium"
        explanation = (
            f"Model menilai teks ini berpotensi hoaks (P(hoaks) ≈ {p_hoax:.2%}). "
            "Cek ulang ke sumber resmi."
        )
    else:
        level = "low"
        explanation = (
            f"Model menilai teks ini cenderung bukan hoaks (P(hoaks) ≈ {p_hoax:.2%}). "
            "Tetap gunakan literasi kritis."
        )
    if original_text is not None and len(str(original_text).strip().split()) < 5:
        # Very short inputs: never report "low", and always add the caveat.
        if level == "low":
            level = "medium"
        explanation += " Teks sangat pendek (< 5 kata), prediksi bisa kurang stabil."
    return level, explanation
def _split_paragraphs(text: str) -> List[str]:
    """Split *text* into trimmed, non-empty paragraphs.

    Paragraphs are separated by blank lines. If that yields at most one
    paragraph but the text still contains line breaks, each non-empty line is
    treated as a paragraph, provided there is more than one. Blank input
    gives an empty list.
    """
    raw = str(text).strip()
    if not raw:
        return []

    blocks: List[str] = []
    for piece in PARAGRAPH_SPLIT_RE.split(raw):
        piece = piece.strip()
        if piece:
            blocks.append(piece)

    if len(blocks) <= 1 and "\n" in raw:
        lines = [ln.strip() for ln in raw.splitlines() if ln.strip()]
        if len(lines) > 1:
            return lines

    return blocks if blocks else [raw]
def _split_sentences(paragraph: str) -> List[str]:
    """Split a paragraph into trimmed sentences using SENTENCE_SPLIT_RE.

    The paragraph is whitespace-normalised first. Blank input gives an empty
    list; if the pattern finds nothing usable, the whole normalised paragraph
    is returned as a single sentence.
    """
    flat = _normalize_unit_text(paragraph)
    if not flat:
        return []

    found: List[str] = []
    for match in SENTENCE_SPLIT_RE.finditer(flat):
        piece = match.group(0).strip()
        if piece:
            found.append(piece)
    return found if found else [flat]
def _sentence_color(label: str, confidence: float) -> str:
    """Return the highlight colour for a sentence.

    "red" for hoax sentences; otherwise "amber" when the confidence is below
    SENTENCE_AMBER_CONF and "green" when it is not.
    """
    if label == "hoax":
        return "red"
    return "amber" if confidence < SENTENCE_AMBER_CONF else "green"
def _to_canonical_label(p_hoax: float, teks: Optional[str] = None) -> str:
    """Turn P(hoax) into "hoax" or "not_hoax".

    The cut-off is _THRESHOLD_OPTIMAL, except when *teks* is given and has
    fewer than MIN_KATA_KALIMAT words, in which case THRESH_KALIMAT_PENDEK is
    used. A probability equal to the cut-off counts as "hoax".
    """
    is_short = (
        teks is not None
        and len(str(teks).strip().split()) < MIN_KATA_KALIMAT
    )
    cutoff = THRESH_KALIMAT_PENDEK if is_short else _THRESHOLD_OPTIMAL
    if p_hoax >= cutoff:
        return "hoax"
    return "not_hoax"
# =========================
# BERTopic inference
# =========================
def _st_encode(texts: List[str], embedder) -> np.ndarray:
    """Encode *texts* with *embedder* and return the result of ``encode``.

    The call requests NumPy output with normalised embeddings, uses
    BERTOPIC_EMBED_BATCH as batch size and disables the progress bar.
    """
    encode_options = {
        "batch_size": BERTOPIC_EMBED_BATCH,
        "show_progress_bar": False,
        "convert_to_numpy": True,
        "normalize_embeddings": True,
    }
    return embedder.encode(texts, **encode_options)
def _infer_topic_per_paragraf(texts: List[str]) -> List[TopicInfo]:
    """Assign a topic to every text, rules first and BERTopic as a fallback.

    1. Each text goes through the keyword rules (``_kategorisasi_teks``).
    2. Texts without a rule match are embedded and passed to BERTopic, if it
       has finished loading. Topic id -1 maps to the generic fallback topic;
       otherwise the label is built from the topic's first keywords and the
       score is the weight of its first keyword.
    3. Anything still unresolved gets the fallback topic. This includes the
       case where BERTopic is unavailable or raises; the error is printed and
       topics resolved before it are kept.

    Returns:
        One TopicInfo per input text, in input order.
    """
    rule_results: List[Optional[Tuple[str, float]]] = [
        _kategorisasi_teks(t) for t in texts
    ]
    idx_perlu_bertopic = [i for i, r in enumerate(rule_results) if r is None]

    bertopic_map: Dict[int, TopicInfo] = {}
    if idx_perlu_bertopic:
        btm, embedder = _get_bertopic_components()
        if btm is not None and embedder is not None:
            try:
                teks_subset = [texts[i] for i in idx_perlu_bertopic]
                embeddings = _st_encode(teks_subset, embedder)
                topic_ids, _ = btm.transform(teks_subset, embeddings=embeddings)
                for global_i, tid in zip(idx_perlu_bertopic, topic_ids):
                    if tid == -1:
                        bertopic_map[global_i] = _FALLBACK_TOPIC
                        continue
                    topic_words = btm.get_topic(tid) or []
                    keywords = [w for w, _ in topic_words[:TOPIC_KEYWORDS_TOPK]]
                    score = float(topic_words[0][1]) if topic_words else 0.0
                    label = " / ".join(keywords[:2]) if keywords else f"topik_{tid}"
                    bertopic_map[global_i] = TopicInfo(
                        label=label, score=_round6(score), keywords=keywords
                    )
            except Exception as e:
                # Topic modelling is best effort; the verdict must not fail
                # because of it.
                print(f"[WARN] BERTopic inference error: {e}")

    final: List[TopicInfo] = []
    for i, rule_match in enumerate(rule_results):
        if rule_match is not None:
            nama, skor = rule_match
            final.append(TopicInfo(label=nama, score=skor, keywords=[nama]))
        else:
            final.append(bertopic_map.get(i, _FALLBACK_TOPIC))
    return final
def _build_predict_response(prob_dict: Dict[str, float], original_text: str) -> PredictResponse:
    """Build a PredictResponse from one probability dict.

    The label is the entry with the highest probability (the first one on a
    tie). The risk level and explanation come from ``analyze_risk`` applied
    to the extracted P(hoax) and the original text.
    """
    top_label, top_prob = max(prob_dict.items(), key=lambda item: item[1])
    hoax_prob = _extract_hoax_probability(prob_dict)
    level, explanation = analyze_risk(hoax_prob, original_text=original_text)
    return PredictResponse(
        label=top_label,
        score=float(top_prob),
        probabilities=prob_dict,
        hoax_probability=float(hoax_prob),
        risk_level=level,
        risk_explanation=explanation,
    )
def _maybe_log(sample_info: Dict):
    """Print *sample_info* for a random sample of calls.

    Nothing is printed when logging is disabled. Otherwise the record is
    printed when a fresh ``random.random()`` draw does not exceed
    LOG_SAMPLE_RATE.
    """
    if ENABLE_LOGGING and random.random() <= LOG_SAMPLE_RATE:
        print("[HOAX_LOG]", sample_info)
# =========================
# [FIX-RC2] Fungsi agregasi verdict dari kalimat
# =========================
def _aggregate_verdict(
all_sentences: List[SentenceAnalysis],
) -> Tuple[str, float, float]:
"""
Kembalikan (doc_label, p_hoax_doc, doc_conf).
[FIX-RC3] Logika final β€” murni majority vote:
Hoaks menang jika hoax_count >= not_hoax_count AND hoax_count > 0.
Tie (sama banyak) β†’ hoaks (lebih aman untuk sistem deteksi).
Selain itu β†’ fakta.
p_hoax_doc:
Selalu = mean P(hoaks) seluruh kalimat β€” representatif & informatif.
Ditampilkan frontend sebagai "P(hoaks): XX%".
doc_conf:
Confidence dari sisi yang menang, bukan dari mean keseluruhan:
- Hoaks β†’ mean P(hoaks) kalimat berlabel hoaks
- Fakta β†’ mean P(fakta) kalimat berlabel fakta
Contoh verifikasi:
6 fakta (Pβ‰ˆ0.0002) + 1 hoaks (P=0.9346):
hoax_count=1 < not_hoax_count=6 β†’ FAKTA
p_hoax_doc = 0.1352, doc_conf = 0.9998 (P(fakta) rata-rata) βœ“
1 hoaks (P=0.9346) + 1 fakta (Pβ‰ˆ0.0002):
hoax_count=1 == not_hoax_count=1 β†’ tie β†’ HOAKS
p_hoax_doc = 0.9346, doc_conf = 0.9346 βœ“
3 hoaks + 4 fakta:
hoax_count=3 < not_hoax_count=4 β†’ FAKTA βœ“
3 hoaks + 3 fakta:
tie β†’ HOAKS βœ“
"""
if not all_sentences:
return "not_hoax", 0.0, 0.0
hoax_sents = [s for s in all_sentences if s.label == "hoax"]
not_hoax_sents = [s for s in all_sentences if s.label == "not_hoax"]
hoax_count = len(hoax_sents)
not_hoax_count = len(not_hoax_sents)
mean_p_hoax = float(
sum(s.hoax_probability for s in all_sentences) / len(all_sentences)
)
if hoax_count >= not_hoax_count and hoax_count > 0:
# Hoaks menang atau tie β†’ hoaks
p_hoax_doc = float(
sum(s.hoax_probability for s in hoax_sents) / hoax_count
)
return "hoax", mean_p_hoax, p_hoax_doc
# Fakta menang
p_fakta_doc = float(
sum(1.0 - s.hoax_probability for s in not_hoax_sents) / not_hoax_count
) if not_hoax_sents else 0.5
return "not_hoax", mean_p_hoax, p_fakta_doc
# =========================
# Routes
# =========================
@app.get("/")
def read_root():
    """Return a status message plus the effective configuration of the service."""
    kategori_names = [entry[0] for entry in PETA_KATEGORI]
    info = {
        "message": "Indo Hoax Detector API is running.",
        "version": "3.3.1",
        "model_id": MODEL_ID,
        "id2label": ID2LABEL,
        "threshold_optimal": _THRESHOLD_OPTIMAL,
        "thresh_high": THRESH_HIGH,
        "thresh_kalimat_pendek": THRESH_KALIMAT_PENDEK,
        "min_kata_kalimat": MIN_KATA_KALIMAT,
        "device": str(DEVICE),
        "bertopic_ready": _bertopic_ready,
        "bertopic_embed_model": BERTOPIC_EMBED_MODEL_ID,
        "topic_model": "bertopic+rule-based",
        "kategori": kategori_names,
    }
    return info
@app.get("/health")
def health_check():
    """Liveness probe; also reports whether the topic-model load attempt is done."""
    payload = {"status": "ok"}
    payload["bertopic_ready"] = _bertopic_ready
    return payload
@app.post("/predict", response_model=PredictResponse)
def predict(request: PredictRequest):
    """Classify a single text and return its label, probabilities and risk."""
    text = request.text
    probs = _predict_proba([text], batch_size=1)
    if probs:
        result = _build_predict_response(probs[0], original_text=str(text))
        _maybe_log({"route": "/predict", "label": result.label, "p_hoax": result.hoax_probability})
        return result
    # No prediction came back: answer with a neutral placeholder.
    return PredictResponse(
        label="unknown",
        score=0.0,
        probabilities={},
        hoax_probability=0.0,
        risk_level="low",
        risk_explanation="Teks kosong.",
    )
@app.post("/predict-batch", response_model=BatchPredictResponse)
def predict_batch(request: BatchPredictRequest):
    """Classify several texts at once; results follow the input order."""
    texts = request.texts or []
    prob_list = _predict_proba(texts, batch_size=PREDICT_BATCH_SIZE)
    results = []
    for text, probs in zip(texts, prob_list):
        results.append(_build_predict_response(probs, original_text=str(text)))
    return BatchPredictResponse(results=results)
@app.post("/analyze", response_model=AnalyzeResponse)
def analyze(request: AnalyzeRequest):
    """Analyse a document sentence by sentence and aggregate the verdict.

    Steps:
      1. Split the raw text into paragraphs, then each paragraph into sentences.
      2. Classify all sentences in batches.
      3. Build one SentenceAnalysis per sentence.
      4. Aggregate the document verdict by majority vote (``_aggregate_verdict``).
      5. Infer a topic per paragraph, plus a document-wide topic (the most
         frequent paragraph label) unless ``topic_per_paragraph`` is set.
      6. Build per-paragraph results and list topics shared by several paragraphs.

    Blank input returns an empty "not_hoax" analysis. ``request.sentence_level``
    is not consulted here.
    """
    raw_text = str(request.text)
    # Normalised copy: used for the emptiness check and the short-text rule only.
    original_text = _normalize_unit_text(raw_text)
    base_meta = AnalyzeMeta(
        model_id=MODEL_ID, max_length=MAX_LENGTH,
        sentence_batch_size=SENTENCE_BATCH_SIZE,
        threshold_used=_THRESHOLD_OPTIMAL,
        topic_model_used="bertopic+rules",
    )
    if not original_text:
        return AnalyzeResponse(
            document=DocumentAnalysis(
                label="not_hoax", hoax_probability=0.0, confidence=0.0,
                risk_level="low", risk_explanation="Teks kosong.",
                sentence_aggregate_label="not_hoax",
                summary=DocumentSummary(
                    paragraph_count=0, sentence_count=0,
                    hoax_sentence_count=0, not_hoax_sentence_count=0,
                ),
            ),
            paragraphs=[], shared_topics=[], topics_global=None, meta=base_meta,
        )

    # Step 1: split.
    # Paragraphs must be detected on the RAW text. Whitespace normalisation
    # collapses the line breaks that separate them, which previously turned
    # every document into a single paragraph. Each paragraph is normalised
    # afterwards so the returned text has the same shape as before.
    paragraph_texts = [
        normalized
        for normalized in (
            _normalize_unit_text(p) for p in _split_paragraphs(raw_text)
        )
        if normalized
    ]
    if not paragraph_texts:
        paragraph_texts = [original_text]

    sentence_texts: List[str] = []
    sentence_map: List[Tuple[int, int]] = []
    for p_idx, paragraph in enumerate(paragraph_texts):
        for s_idx, sentence in enumerate(_split_sentences(paragraph)):
            sentence_texts.append(sentence)
            sentence_map.append((p_idx, s_idx))

    # Step 2: sentence-level inference
    sentence_prob_list = _predict_proba(sentence_texts, batch_size=SENTENCE_BATCH_SIZE)

    # Step 3: build SentenceAnalysis objects, grouped by paragraph
    paragraph_sentences: List[List[SentenceAnalysis]] = [[] for _ in paragraph_texts]
    for (p_idx, s_idx), sent_text, sent_prob_dict in zip(
        sentence_map, sentence_texts, sentence_prob_list
    ):
        p_hoax = _extract_hoax_probability(sent_prob_dict)
        p_not_hoax = _extract_not_hoax_probability(sent_prob_dict, p_hoax)
        sent_label = _to_canonical_label(p_hoax, teks=sent_text)
        sent_conf = max(p_hoax, p_not_hoax)
        paragraph_sentences[p_idx].append(SentenceAnalysis(
            sentence_index=int(s_idx),
            text=sent_text,
            label=sent_label,
            probabilities={"not_hoax": _round6(p_not_hoax), "hoax": _round6(p_hoax)},
            hoax_probability=_round6(p_hoax),
            confidence=_round6(sent_conf),
            color=_sentence_color(sent_label, sent_conf),
        ))

    # Step 4: [FIX-RC2] aggregate the verdict from the sentences
    all_sentences_flat = [s for plist in paragraph_sentences for s in plist]
    hoax_sentence_count = sum(1 for s in all_sentences_flat if s.label == "hoax")
    not_hoax_sentence_count = sum(1 for s in all_sentences_flat if s.label == "not_hoax")
    doc_label, p_hoax_doc, doc_conf = _aggregate_verdict(all_sentences_flat)
    sentence_aggregate_label = doc_label
    risk_level, risk_explanation = analyze_risk(p_hoax_doc, original_text=original_text)

    # Step 5: topics
    per_paragraph_topics = _infer_topic_per_paragraf(paragraph_texts)
    if request.topic_per_paragraph:
        topics_global = None
    else:
        label_counts: Counter = Counter(t.label for t in per_paragraph_topics)
        most_common_label = label_counts.most_common(1)[0][0]
        topics_global = next(
            (t for t in per_paragraph_topics if t.label == most_common_label),
            _FALLBACK_TOPIC,
        )

    # Step 6: build ParagraphAnalysis objects
    paragraphs: List[ParagraphAnalysis] = []
    for p_idx, p_text in enumerate(paragraph_texts):
        sents = sorted(paragraph_sentences[p_idx], key=lambda x: x.sentence_index)
        n_hoax = sum(1 for s in sents if s.label == "hoax")
        n_not = sum(1 for s in sents if s.label == "not_hoax")
        if sents:
            p_max_hoax = max(s.hoax_probability for s in sents)
            # Same majority rule as the document: a tie counts as hoax.
            p_label = "hoax" if n_hoax >= n_not and n_hoax > 0 else "not_hoax"
            p_conf = p_max_hoax if p_label == "hoax" else (1.0 - p_max_hoax)
        else:
            p_max_hoax = 0.0
            p_label = "not_hoax"
            p_conf = 0.0
        topic_info = (
            per_paragraph_topics[p_idx]
            if p_idx < len(per_paragraph_topics)
            else _FALLBACK_TOPIC
        )
        paragraphs.append(ParagraphAnalysis(
            paragraph_index=int(p_idx),
            text=p_text,
            label=p_label,
            hoax_probability=_round6(p_max_hoax),
            confidence=_round6(p_conf),
            topic=topic_info,
            sentences=sents,
        ))

    # Topics that occur in more than one paragraph, ordered by first occurrence.
    shared_topic_map: Dict[str, List[int]] = defaultdict(list)
    for p in paragraphs:
        shared_topic_map[p.topic.label].append(int(p.paragraph_index))
    shared_topics = sorted(
        [SharedTopic(label=lbl, paragraph_indices=idxs)
         for lbl, idxs in shared_topic_map.items() if len(idxs) > 1],
        key=lambda x: (x.paragraph_indices[0], x.label),
    )

    _maybe_log({
        "route": "/analyze",
        "doc_label": doc_label,
        "doc_p_hoax": p_hoax_doc,
        "hoax_sentence_count": hoax_sentence_count,
        "paragraph_count": len(paragraphs),
    })
    return AnalyzeResponse(
        document=DocumentAnalysis(
            label=doc_label,
            hoax_probability=_round6(p_hoax_doc),
            confidence=_round6(doc_conf),
            risk_level=risk_level,
            risk_explanation=risk_explanation,
            sentence_aggregate_label=sentence_aggregate_label,
            summary=DocumentSummary(
                paragraph_count=len(paragraphs),
                sentence_count=hoax_sentence_count + not_hoax_sentence_count,
                hoax_sentence_count=hoax_sentence_count,
                not_hoax_sentence_count=not_hoax_sentence_count,
            ),
        ),
        paragraphs=paragraphs,
        shared_topics=shared_topics,
        topics_global=topics_global,
        meta=base_meta,
    )
# Script entry point: serve the app with uvicorn when run directly.
if __name__ == "__main__":
    # Imported here so uvicorn is only required when running as a script.
    import uvicorn
    # Port comes from the PORT environment variable, defaulting to 7860.
    port = int(os.getenv("PORT", "7860"))
    # Listens on all interfaces, with auto-reload disabled.
    uvicorn.run("app:app", host="0.0.0.0", port=port, reload=False)