# gapura-oneclick/data/nlp_service.py
"""
NLP Model Service for Gapura AI
Loads and uses trained NLP models for severity classification, issue type classification, etc.
Falls back to rule-based logic if models are not available.
"""
import os
import logging
import pickle
from typing import List, Dict, Any, Optional
import numpy as np
import inspect
logger = logging.getLogger(__name__)
_nlp_instance = None
def get_nlp_service():
"""Get or create singleton NLPModelService instance"""
global _nlp_instance
if _nlp_instance is None:
_nlp_instance = NLPModelService()
return _nlp_instance
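
# Note: the singleton above is not lock-guarded; two concurrent first calls
# may each construct a service, with the last assignment winning. That is
# wasteful (models load twice) but not unsafe for read-only inference.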
class NLPModelService:
"""Service for NLP predictions using trained models or rule-based fallback"""
def __init__(self):
self.severity_model = None
self.severity_tokenizer = None
self.severity_label_encoder = None
self.severity_vectorizer = None
self.severity_classifier = None # For TF-IDF classifier
self.issue_model = None
self.issue_tokenizer = None
self.issue_label_encoder = None
self.summarizer = None
self.version = "1.0.0-rule-based"
self.models_loaded = False
self._load_models()
def _load_models(self):
"""Load or create NLP models"""
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
nlp_dir = os.path.join(base_dir, "models", "nlp")
multitask_onnx_path = os.path.join(
base_dir, "models", "multi_task_transformer.onnx"
)
multitask_pt_path = os.path.join(
base_dir, "models", "multi_task_transformer.pt"
)
encoders_pkl = os.path.join(base_dir, "models", "multi_task_label_encoders.pkl")
encoders_json = os.path.join(
base_dir, "models", "multi_task_label_encoders.json"
)
disable_multitask = os.getenv("NLP_DISABLE_MULTITASK", "").lower() in {
"1",
"true",
"yes",
}
allow_transformers = os.getenv("NLP_ALLOW_TRANSFORMERS", "").lower() in {
"1",
"true",
"yes",
}
prefer_multitask = os.getenv("NLP_PREFER_MULTITASK", "").lower() in {
"1",
"true",
"yes",
}
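        # Flag semantics (as used below): NLP_DISABLE_MULTITASK skips the
        # multi-task transformer entirely; NLP_ALLOW_TRANSFORMERS must be set
        # for the multi-task and late IndoBERT paths; NLP_PREFER_MULTITASK
        # keeps loading past a successful severity model so the multi-task
        # model can take priority.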
# Try severity_bert (fine-tuned) first, then severity_classifier
severity_bert_dir = os.path.join(nlp_dir, "severity_bert")
severity_classifier_dir = os.path.join(nlp_dir, "severity_classifier")
# Try fine-tuned severity_bert first
if os.path.exists(severity_bert_dir):
try:
cfg = os.path.join(severity_bert_dir, "config.json")
has_model = os.path.exists(
os.path.join(severity_bert_dir, "model.safetensors")
)
has_tok = os.path.exists(
os.path.join(severity_bert_dir, "tokenizer.json")
)
if os.path.exists(cfg) and has_model and has_tok:
from transformers import (
AutoTokenizer,
AutoModelForSequenceClassification,
)
import torch
logger.info("Loading fine-tuned severity_bert model...")
self.severity_tokenizer = AutoTokenizer.from_pretrained(
severity_bert_dir
)
self.severity_model = (
AutoModelForSequenceClassification.from_pretrained(
severity_bert_dir
)
)
self.severity_model.to(
torch.device("cuda" if torch.cuda.is_available() else "cpu")
)
self.severity_model.eval()
try:
with open(
os.path.join(severity_bert_dir, "label_encoder.pkl"), "rb"
) as f:
self.severity_label_encoder = pickle.load(f)
except Exception:
self.severity_label_encoder = {
"Low": 0,
"Medium": 1,
"High": 2,
"Critical": 3,
}
self.models_loaded = True
self.version = "3.0.0-severity-bert"
logger.info(
f"Loaded fine-tuned severity_bert model (v{self.version})"
)
if not prefer_multitask:
return
except Exception as e:
logger.warning(f"Failed to load severity_bert: {e}")
# Fallback to severity_classifier (TF-IDF or HF)
if not self.models_loaded and os.path.exists(severity_classifier_dir):
try:
cfg = os.path.join(severity_classifier_dir, "config.json")
has_pt = os.path.exists(
os.path.join(severity_classifier_dir, "pytorch_model.bin")
) or os.path.exists(
os.path.join(severity_classifier_dir, "model.safetensors")
)
has_tok = os.path.exists(
os.path.join(severity_classifier_dir, "tokenizer.json")
) or os.path.exists(os.path.join(severity_classifier_dir, "vocab.txt"))
if os.path.exists(cfg) and (has_pt or has_tok):
from transformers import (
AutoTokenizer,
AutoModelForSequenceClassification,
)
import torch
self.severity_tokenizer = AutoTokenizer.from_pretrained(
severity_classifier_dir
)
self.severity_model = (
AutoModelForSequenceClassification.from_pretrained(
severity_classifier_dir
)
)
self.severity_model.to(
torch.device("cuda" if torch.cuda.is_available() else "cpu")
)
try:
with open(
os.path.join(severity_classifier_dir, "label_encoder.pkl"),
"rb",
) as f:
self.severity_label_encoder = pickle.load(f)
except Exception:
self.severity_label_encoder = None
self.models_loaded = True
self.version = "2.0.0-hf"
logger.info("Loaded HF severity classifier")
if not prefer_multitask:
return
except Exception as e:
logger.warning(f"Failed to load HF severity classifier: {e}")
if self.severity_model is None and self.severity_vectorizer is None:
try:
with open(
os.path.join(severity_classifier_dir, "vectorizer.pkl"), "rb"
) as f:
self.severity_vectorizer = pickle.load(f)
with open(
os.path.join(severity_classifier_dir, "classifier.pkl"), "rb"
) as f:
self.severity_model = pickle.load(f)
try:
with open(
os.path.join(severity_classifier_dir, "label_encoder.pkl"),
"rb",
) as f:
label_data = pickle.load(f)
self.severity_label_encoder = label_data.get(
"reverse_map",
{0: "Critical", 1: "High", 2: "Medium", 3: "Low"},
)
except Exception:
self.severity_label_encoder = None
self.models_loaded = True
self.version = "1.0.0-trained-tfidf"
logger.info(
"Loaded trained severity classifier (TF-IDF + RandomForest)"
)
except Exception as e:
logger.warning(f"Failed to load TF-IDF classifier: {e}")
# Prefer ONNX multi-task only if explicitly preferred
if (
prefer_multitask
and not disable_multitask
and allow_transformers
and os.path.exists(multitask_onnx_path)
):
try:
from transformers import DistilBertTokenizer, AutoTokenizer
import onnxruntime as ort
import json
try:
self.multi_task_tokenizer = DistilBertTokenizer.from_pretrained(
"distilbert-base-uncased", local_files_only=True
)
except Exception:
self.multi_task_tokenizer = AutoTokenizer.from_pretrained(
"distilbert-base-uncased"
)
sess_options = ort.SessionOptions()
sess_options.intra_op_num_threads = 1
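                # Single intra-op thread per session; presumably to avoid CPU
                # oversubscription when the API runs multiple workers.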
sess_options.graph_optimization_level = (
ort.GraphOptimizationLevel.ORT_ENABLE_ALL
)
self.onnx_session = ort.InferenceSession(
multitask_onnx_path, sess_options
)
self.multi_task_label_encoders = {}
if os.path.exists(encoders_pkl):
try:
with open(encoders_pkl, "rb") as f:
self.multi_task_label_encoders = pickle.load(f)
except Exception:
self.multi_task_label_encoders = {}
elif os.path.exists(encoders_json):
try:
with open(encoders_json, "r", encoding="utf-8") as f:
self.multi_task_label_encoders = json.load(f)
except Exception:
self.multi_task_label_encoders = {}
self.models_loaded = True
self.version = "4.0.0-onnx"
logger.info(f"Loaded optimized ONNX multi-task model (v{self.version})")
return
except Exception as e:
logger.warning(f"Failed to load ONNX multi-task model: {e}")
# Fallback: PyTorch checkpoint (may be heavy)
if (
prefer_multitask
and not disable_multitask
and allow_transformers
and os.path.exists(multitask_pt_path)
):
try:
import torch
from transformers import DistilBertTokenizer
# Load metadata from PyTorch checkpoint
checkpoint = torch.load(
multitask_pt_path, weights_only=False, map_location="cpu"
)
self.multi_task_label_encoders = checkpoint["label_encoders"]
try:
self.multi_task_tokenizer = DistilBertTokenizer.from_pretrained(
"distilbert-base-uncased", local_files_only=True
)
except Exception:
raise RuntimeError("DistilBert tokenizer not available locally")
# Fallback to PyTorch model
from data.transformer_architecture import MultiTaskDistilBert
num_labels_dict = checkpoint["num_labels_dict"]
self.multi_task_model = MultiTaskDistilBert(num_labels_dict)
self.multi_task_model.load_state_dict(checkpoint["model_state_dict"])
self.multi_task_model.eval()
self.models_loaded = True
self.version = "4.0.0-synthetic-plus"
logger.info(
f"Loaded trained multi-task transformer model (v{self.version})"
)
return
except Exception as e:
logger.warning(f"Failed to load multi-task transformer: {e}")
        # Try the fine-tuned IndoBERT severity classifier (slower but more
        # accurate); skip it if a severity model was already loaded above.
        bert_dir = os.path.join(nlp_dir, "severity_bert")
        if (
            not self.models_loaded
            and allow_transformers
            and os.path.exists(bert_dir)
        ):
try:
from transformers import (
AutoTokenizer,
AutoModelForSequenceClassification,
)
import torch
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.severity_tokenizer = AutoTokenizer.from_pretrained(bert_dir)
self.severity_model = (
AutoModelForSequenceClassification.from_pretrained(bert_dir)
)
self.severity_model.to(device)
self.severity_model.eval()
with open(os.path.join(bert_dir, "label_encoder.pkl"), "rb") as f:
self.severity_label_encoder = pickle.load(f)
self.models_loaded = True
self.version = "1.0.0-trained-bert"
logger.info("Loaded trained IndoBERT severity classifier")
return
except Exception as e:
logger.warning(f"Failed to load IndoBERT classifier: {e}")
        # If any model loaded successfully, keep it; otherwise fall back.
        if self.models_loaded:
            logger.info("Using loaded NLP models")
            return
        logger.info("No trained NLP models found, using rule-based fallback")
def classify_severity(self, texts: List[str]) -> List[Dict[str, Any]]:
"""Classify severity of reports"""
        # The multi-task model provides the 'report' category (Irregularity vs
        # Complaint) rather than severity, so severity is served by the
        # dedicated model when one is loaded, otherwise by the rule-based
        # fallback below.
# Check if IndoBERT model is loaded (dedicated severity)
if (
hasattr(self, "severity_model")
and self.severity_model is not None
and self.severity_tokenizer is not None
):
return self._classify_with_model(
texts,
self.severity_model,
self.severity_tokenizer,
self.severity_label_encoder,
)
# Check if TF-IDF model is loaded
if (
hasattr(self, "severity_vectorizer")
and self.severity_vectorizer is not None
):
return self._classify_with_tfidf(texts)
return self._classify_severity_rule_based(texts)
def _classify_with_tfidf(self, texts: List[str]) -> List[Dict[str, Any]]:
"""Use trained TF-IDF + classifier for classification"""
results = []
X = self.severity_vectorizer.transform(texts)
predictions = self.severity_model.predict(X)
probabilities = self.severity_model.predict_proba(X)
        for pred, probs in zip(predictions, probabilities):
            if isinstance(self.severity_label_encoder, dict):
                # The loader stores the reverse map {index: label} directly.
                if isinstance(pred, (int, np.integer)):
                    label = self.severity_label_encoder.get(int(pred), "Low")
                else:
                    label = str(pred)
            else:
                label = str(pred)
confidence = float(max(probs))
results.append(
{
"severity": label,
"confidence": round(confidence, 2),
}
)
return results
def _classify_with_model(
self, texts: List[str], model, tokenizer, label_encoder
    ) -> List[Dict[str, Any]]:
"""Use trained model for classification"""
import torch
device = next(model.parameters()).device
results = []
with torch.no_grad():
for text in texts:
inputs = tokenizer(
text,
padding=True,
truncation=True,
max_length=512,
return_tensors="pt",
)
inputs = {k: v.to(device) for k, v in inputs.items()}
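                # Tokenizers may emit kwargs (e.g. token_type_ids) that
                # DistilBERT-style forward() signatures do not accept; keep
                # only the arguments the model's forward actually declares.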
sig = inspect.signature(model.forward)
allowed = set(sig.parameters.keys())
filtered_inputs = {k: v for k, v in inputs.items() if k in allowed}
outputs = model(**filtered_inputs)
probs = torch.softmax(outputs.logits, dim=1)
confidence, pred = torch.max(probs, dim=1)
if label_encoder is not None and hasattr(
label_encoder, "inverse_transform"
):
label = label_encoder.inverse_transform([pred.item()])[0]
elif hasattr(model, "config") and hasattr(model.config, "id2label"):
label = model.config.id2label.get(pred.item(), str(pred.item()))
else:
label = str(pred.item())
results.append(
{
"severity": label,
"confidence": confidence.item(),
}
)
return results
def _classify_severity_rule_based(self, texts: List[str]) -> List[Dict[str, Any]]:
"""Rule-based severity classification fallback with bilingual keywords"""
critical_keywords = [
"emergency",
"darurat",
"critical",
"kritis",
"genting",
"severe",
"parah",
"serius",
"injury",
"cedera",
"luka",
"accident",
"kecelakaan",
"safety issue",
"masalah keselamatan",
"fire",
"kebakaran",
"api",
"explosion",
"ledakan",
"death",
"kematian",
"meninggal",
]
high_keywords = [
"damage",
"rusak",
"kerusakan",
"torn",
"robek",
"sobek",
"broken",
"pecah",
"patah",
"urgent",
"mendesak",
"segera",
"lost",
"hilang",
"stolen",
"dicuri",
"theft",
"pencurian",
"security",
"keamanan",
"safety",
"keselamatan",
]
medium_keywords = [
"delay",
"terlambat",
"keterlambatan",
"telat",
"late",
"terlambat",
"wrong",
"salah",
"incorrect",
"tidak benar",
"keliru",
"missing",
"hilang",
"tidak ada",
"error",
"kesalahan",
"galat",
"fail",
"gagal",
"kegagalan",
"problem",
"masalah",
"issue",
"isu",
"complaint",
"keluhan",
"reject",
"tolak",
"ditolak",
"complain",
"komplain",
"keluh",
]
low_keywords = [
"minor",
"kecil",
"ringan",
"small",
"sedikit",
"slight",
"tipis",
"normal",
"biasa",
"routine",
"rutin",
]
results = []
for text in texts:
text_lower = text.lower()
critical_count = sum(1 for kw in critical_keywords if kw in text_lower)
high_count = sum(1 for kw in high_keywords if kw in text_lower)
medium_count = sum(1 for kw in medium_keywords if kw in text_lower)
low_count = sum(1 for kw in low_keywords if kw in text_lower)
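            # Precedence: any critical hit wins outright; otherwise two hits
            # in a tier earn higher confidence than one; default is Low, 0.80.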
if critical_count >= 1:
severity = "Critical"
confidence = 0.92
elif high_count >= 2:
severity = "High"
confidence = 0.88
elif high_count >= 1:
severity = "High"
confidence = 0.82
elif medium_count >= 2:
severity = "Medium"
confidence = 0.78
elif medium_count >= 1:
severity = "Medium"
confidence = 0.72
elif low_count >= 1:
severity = "Low"
confidence = 0.85
else:
severity = "Low"
confidence = 0.80
results.append(
{
"severity": severity,
"confidence": confidence,
"keyword_counts": {
"critical": critical_count,
"high": high_count,
"medium": medium_count,
"low": low_count,
},
}
)
return results
def classify_issue_type(self, texts: List[str]) -> List[Dict[str, Any]]:
"""Classify issue type using Multi-Task (issue_type) or legacy models"""
has_multitask = hasattr(self, "onnx_session") or (
hasattr(self, "multi_task_model") and self.multi_task_model is not None
)
if has_multitask and "issue_type" in self.multi_task_label_encoders:
multi_results = self._classify_with_multitask(texts)
return [
res.get("issue_type", {"label": "Unknown", "confidence": 0.0})
for res in multi_results
]
if self.issue_model is not None and self.issue_tokenizer is not None:
return self._classify_with_model(
texts, self.issue_model, self.issue_tokenizer, self.issue_label_encoder
)
return [{"label": "Unknown", "confidence": 0.5} for _ in texts]
def classify_root_cause(self, texts: List[str]) -> List[Dict[str, Any]]:
"""Classify root cause using Multi-Task head"""
has_multitask = hasattr(self, "onnx_session") or (
hasattr(self, "multi_task_model") and self.multi_task_model is not None
)
if has_multitask and "root_cause" in self.multi_task_label_encoders:
multi_results = self._classify_with_multitask(texts)
return [
res.get("root_cause", {"label": "Unknown", "confidence": 0.0})
for res in multi_results
]
return [{"label": "Unknown", "confidence": 0.0} for _ in texts]
def predict_multi_task(self, texts: List[str]) -> Optional[List[Dict[str, Any]]]:
"""Public method for multi-task predictions"""
has_multitask = hasattr(self, "onnx_session") or (
hasattr(self, "multi_task_model") and self.multi_task_model is not None
)
if has_multitask:
return self._classify_with_multitask(texts)
return None
def _classify_with_multitask(
self, texts: List[str], batch_size: int = 16
) -> List[Dict[str, Any]]:
"""Inference using Multi-Task Transformer (ONNX or PyTorch)"""
results = []
# ONNX Inference
if hasattr(self, "onnx_session") and self.onnx_session is not None:
import numpy as np
output_names = [output.name for output in self.onnx_session.get_outputs()]
for i in range(0, len(texts), batch_size):
chunk = texts[i : i + batch_size]
# Tokenize
inputs = self.multi_task_tokenizer(
chunk,
padding=True,
truncation=True,
max_length=256,
return_tensors="np",
)
onnx_inputs = {
"input_ids": inputs["input_ids"].astype(np.int64),
"attention_mask": inputs["attention_mask"].astype(np.int64),
}
# Run inference
onnx_outputs = self.onnx_session.run(None, onnx_inputs)
# Process outputs (Vectorized)
chunk_results = [{} for _ in range(len(chunk))]
for k, output_name in enumerate(output_names):
logits_batch = onnx_outputs[k]
# Vectorized Softmax
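                    # Stable softmax: exp(x - max(x)) / sum(exp(x - max(x)));
                    # subtracting the row max avoids float overflow in exp().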
max_logits = np.max(logits_batch, axis=1, keepdims=True)
exp_logits = np.exp(logits_batch - max_logits)
probs_batch = exp_logits / np.sum(exp_logits, axis=1, keepdims=True)
pred_indices = np.argmax(probs_batch, axis=1)
confidences = np.max(probs_batch, axis=1)
if output_name in self.multi_task_label_encoders:
le = self.multi_task_label_encoders[output_name]
try:
labels = le.inverse_transform(pred_indices)
                        except Exception:
labels = pred_indices.astype(str)
else:
labels = pred_indices.astype(str)
for j in range(len(chunk)):
chunk_results[j][output_name] = {
"label": str(labels[j]),
"confidence": round(float(confidences[j]), 3),
}
results.extend(chunk_results)
return results
        # PyTorch inference (import torch only once we know a model is loaded)
        if not hasattr(self, "multi_task_model") or self.multi_task_model is None:
            return []
        import torch

        device = next(self.multi_task_model.parameters()).device
        results = []
# Process in chunks to balance speed and memory
for i in range(0, len(texts), batch_size):
chunk = texts[i : i + batch_size]
with torch.no_grad():
inputs = self.multi_task_tokenizer(
chunk,
padding=True,
truncation=True,
max_length=256,
return_tensors="pt",
)
inputs = {k: v.to(device) for k, v in inputs.items()}
logits_dict = self.multi_task_model(**inputs)
# Extract results for each item in the chunk
for j in range(len(chunk)):
prediction = {}
for key, logits in logits_dict.items():
# Get probabilities for the j-th record in batch
probs = torch.softmax(logits[j : j + 1], dim=1)
confidence, pred = torch.max(probs, dim=1)
                    if key in self.multi_task_label_encoders:
                        le = self.multi_task_label_encoders[key]
                        label = le.inverse_transform([pred.item()])[0]
                    else:
                        # Mirror the ONNX path: fall back to the raw class index.
                        label = str(pred.item())
                    prediction[key] = {
                        "label": str(label),
                        "confidence": round(float(confidence.item()), 3),
                    }
results.append(prediction)
return results
def summarize(self, text: str) -> Dict[str, Any]:
"""Generate summary of report"""
if self.summarizer is not None:
return self.summarizer.summarize(text)
return self._simple_summarize(text)
def _simple_summarize(self, text: str) -> Dict[str, Any]:
"""Simple extractive summarization fallback"""
max_chars = int(os.getenv("SUMMARY_MAX_CHARS", "600"))
if not text or len(text) < 50:
return {
"executiveSummary": (text[:max_chars] + "...")
if len(text) > max_chars
else text,
"keyPoints": [],
}
important_keywords = [
"damage",
"torn",
"broken",
"cargo",
"baggage",
"passenger",
"delay",
"late",
"error",
"fail",
]
sentences = text.replace("!", ".").replace("?", ".").split(".")
sentences = [s.strip() for s in sentences if len(s.strip()) > 20]
sentence_scores = []
for sent in sentences:
score = sum(1 for kw in important_keywords if kw in sent.lower())
sentence_scores.append((sent, score))
sentence_scores.sort(key=lambda x: x[1], reverse=True)
top_sentences = [s[0] for s in sentence_scores[:3]]
summary = ". ".join(top_sentences) + "." if top_sentences else text
key_points = []
if any(kw in text.lower() for kw in ["cargo", "uld", "kargo"]):
key_points.append("Cargo-related issue")
if any(kw in text.lower() for kw in ["baggage", "bag", "bagasi"]):
key_points.append("Baggage handling issue")
if any(kw in text.lower() for kw in ["passenger", "pax", "penumpang"]):
key_points.append("Passenger service issue")
if any(
kw in text.lower()
for kw in ["damage", "torn", "broken", "rusak", "pecah", "robek"]
):
key_points.append("Physical damage reported")
if any(kw in text.lower() for kw in ["delay", "terlambat", "telat"]):
key_points.append("Delay reported")
return {
"executiveSummary": (summary[:max_chars] + "...")
if len(summary) > max_chars
else summary,
"keyPoints": key_points[:5],
}
def analyze_urgency(self, texts: List[str]) -> List[Dict[str, Any]]:
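        """Keyword-weighted urgency scoring with intensifier/negation handling"""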
critical = {
"emergency": 1.2,
"darurat": 1.2,
"critical": 1.2,
"kritis": 1.2,
"severe": 1.1,
"parah": 1.1,
"injury": 1.2,
"cedera": 1.2,
"death": 1.3,
"kematian": 1.3,
"fire": 1.3,
"kebakaran": 1.3,
"explosion": 1.3,
"ledakan": 1.3,
"evacuate": 1.2,
"evakuasi": 1.2,
"safety": 1.1,
"keselamatan": 1.1,
"security": 1.0,
"keamanan": 1.0,
"accident": 1.2,
"kecelakaan": 1.2,
}
high = {
"damage": 1.0,
"rusak": 1.0,
"broken": 1.0,
"pecah": 1.0,
"patah": 1.0,
"torn": 0.9,
"robek": 0.9,
"spillage": 0.9,
"bocor": 0.9,
"lost": 0.8,
"hilang": 0.8,
"stolen": 0.9,
"dicuri": 0.9,
"theft": 0.9,
"pencurian": 0.9,
"unsafe": 1.0,
"berbahaya": 1.0,
}
medium = {
"delay": 0.6,
"terlambat": 0.6,
"telat": 0.6,
"late": 0.6,
"misload": 0.6,
"salah muat": 0.6,
"wrong": 0.5,
"incorrect": 0.5,
"keliru": 0.5,
"missing": 0.5,
"tidak ada": 0.5,
"error": 0.5,
"kesalahan": 0.5,
"fail": 0.5,
"gagal": 0.5,
"complaint": 0.5,
"keluhan": 0.5,
"complain": 0.5,
"komplain": 0.5,
}
low = {
"minor": 0.3,
"kecil": 0.3,
"ringan": 0.3,
"small": 0.3,
"slight": 0.3,
"normal": 0.2,
"rutin": 0.2,
"routine": 0.2,
}
intensifiers = {
"very",
"sangat",
"extremely",
"sangatlah",
"urgent",
"mendesak",
"segera",
"immediately",
"secepatnya",
}
deintensifiers = {"slight", "sedikit", "minor", "low", "ringan"}
negations = {"no", "not", "tidak", "bukan", "tanpa", "false alarm"}
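        # Worked example: "Urgent: baggage damage" hits "damage" (1.0) and the
        # intensifier "urgent" (x1.15) -> score 1.15 -> norm 0.38 ->
        # "Somewhat Negative".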
results = []
for text in texts:
tl = (text or "").lower()
found = set()
score = 0.0
for kw, w in critical.items():
if kw in tl:
found.add(kw)
score += w
for kw, w in high.items():
if kw in tl:
found.add(kw)
score += w
for kw, w in medium.items():
if kw in tl:
found.add(kw)
score += w
for kw, w in low.items():
if kw in tl:
found.add(kw)
score += w
intens_count = sum(1 for t in intensifiers if t in tl)
if intens_count > 0 and score > 0:
score *= min(1.4, 1.0 + 0.15 * intens_count)
deint_count = sum(1 for t in deintensifiers if t in tl)
if deint_count > 0:
score *= max(0.6, 1.0 - 0.1 * deint_count)
if any(n in tl for n in negations) and any(
k in tl
for k in [
"damage",
"rusak",
"broken",
"pecah",
"injury",
"cedera",
"fire",
"kebakaran",
]
):
score *= 0.6
excl_bonus = min(0.1, tl.count("!") * 0.03)
caps_ratio = 0.0
letters = [c for c in text if c.isalpha()]
if letters:
caps_ratio = sum(1 for c in letters if c.isupper()) / len(letters)
caps_bonus = 0.1 if caps_ratio > 0.35 else 0.0
score += excl_bonus + caps_bonus
norm = min(1.0, score / 3.0)
if norm >= 0.75:
sentiment = "Critical Negative"
elif norm >= 0.5:
sentiment = "Negative"
elif norm >= 0.25:
sentiment = "Somewhat Negative"
else:
sentiment = "Neutral"
results.append(
{
"urgency_score": round(norm, 2),
"sentiment": sentiment,
"keywords": sorted(list(found))[:15],
}
)
return results
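

# Minimal smoke test (a sketch): with no model artifacts on disk, every call
# below resolves through the rule-based / extractive fallbacks, so this runs
# without torch, transformers, or onnxruntime installed (numpy is still
# required by the module-level import).
if __name__ == "__main__":
    svc = get_nlp_service()
    samples = [
        "URGENT: fire near the cargo hold, evacuate immediately!",
        "Baggage arrived late and one bag is torn.",
        "Routine shift report, nothing unusual.",
    ]
    for sample, sev, urg in zip(
        samples, svc.classify_severity(samples), svc.analyze_urgency(samples)
    ):
        print(
            f"{sample!r} -> severity={sev['severity']} "
            f"(conf {sev['confidence']}), urgency={urg['urgency_score']}"
        )
    print(svc.summarize(" ".join(samples))["executiveSummary"])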