"""
NLP Model Service for Gapura AI
Loads and uses trained NLP models for severity classification, issue type classification, etc.
Falls back to rule-based logic if models are not available.
"""
import os
import logging
import pickle
import hashlib
import time
from typing import List, Dict, Any, Optional, Tuple
import numpy as np
logger = logging.getLogger(__name__)
EMBEDDING_CACHE: Dict[str, Tuple[np.ndarray, float]] = {}
EMBEDDING_CACHE_TTL = 3600
EMBEDDING_CACHE_MAX_SIZE = 5000
_nlp_instance = None
def get_nlp_service():
"""Get or create singleton NLPModelService instance"""
global _nlp_instance
if _nlp_instance is None:
_nlp_instance = NLPModelService()
return _nlp_instance
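# Illustrative use of the singleton accessor (a minimal sketch; the package
# import path `data.nlp_service` is an assumption and may differ per deployment):
#
#   from data.nlp_service import get_nlp_service
#   svc = get_nlp_service()          # first call constructs NLPModelService
#   assert svc is get_nlp_service()  # later calls return the same instance
#
# Note: construction is not lock-guarded, so concurrent first calls from
# multiple threads could race; initialize once at startup if that matters.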
class NLPModelService:
"""Service for NLP predictions using trained models or rule-based fallback"""
def __init__(self):
labels_env = os.getenv("MULTITASK_SEVERITY_LABELS")
if labels_env:
parts = [p.strip() for p in labels_env.split(",") if p.strip()]
self._severity_index_to_label = {str(i): parts[i] for i in range(len(parts))}
else:
self._severity_index_to_label = {
"0": "Low",
"1": "Medium",
"2": "High",
"3": "Critical",
}
self.urgency_critical_threshold = float(os.getenv("URGENCY_CRITICAL_THRESHOLD", "0.85"))
self.urgency_negative_threshold = float(os.getenv("URGENCY_NEGATIVE_THRESHOLD", "0.6"))
self.urgency_somewhat_threshold = float(os.getenv("URGENCY_SOMEWHAT_THRESHOLD", "0.35"))
self.urgency_norm_denom = float(os.getenv("URGENCY_NORM_DENOM", "3.5"))
self.urgency_intensifier_scale = float(os.getenv("URGENCY_INTENSIFIER_SCALE", "0.1"))
self.urgency_deintensifier_scale = float(os.getenv("URGENCY_DEINTENSIFIER_SCALE", "0.15"))
self.urgency_negation_multiplier = float(os.getenv("URGENCY_NEGATION_MULTIPLIER", "0.5"))
self.severity_model = None
self.severity_tokenizer = None
self.severity_label_encoder = None
self.severity_vectorizer = None
        self.severity_classifier = None  # Legacy slot; the TF-IDF classifier is actually stored in self.severity_model
self.issue_model = None
self.issue_tokenizer = None
self.issue_label_encoder = None
self.summarizer = None
self.version = "1.0.0-rule-based"
self.models_loaded = False
from .remote_client import get_remote_client
self.remote = get_remote_client()
self._load_models()
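    # Example environment overrides read above (a sketch; set them before the
    # service is first constructed, otherwise the defaults apply):
    #
    #   os.environ["MULTITASK_SEVERITY_LABELS"] = "Low,Medium,High,Critical"
    #   os.environ["URGENCY_NORM_DENOM"] = "4.0"        # softer urgency scores
    #   os.environ["URGENCY_CRITICAL_THRESHOLD"] = "0.9"
    #   svc = NLPModelService()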
def _load_models(self):
"""Load or create NLP models"""
        if (
            getattr(self, "remote", None)
            and self.remote.enabled
            and os.getenv("NLP_REMOTE_ONLY", "1").lower() in {"1", "true", "yes"}
        ):
self.version = "remote-dl"
self.models_loaded = False
return
rid = os.getenv("NLP_MODEL_REPO_ID") or os.getenv("MODEL_REPO_ID")
roots = []
app_models = os.path.join("/app", "models")
if os.path.isdir(app_models):
roots.append(app_models)
if rid:
try:
from huggingface_hub import snapshot_download
cache_dir = snapshot_download(repo_id=rid)
roots.append(cache_dir)
except Exception:
pass
roots.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
def _path(*parts):
for r in roots:
p1 = os.path.join(r, *parts)
if os.path.exists(p1):
return p1
p2 = os.path.join(r, "models", *parts)
if os.path.exists(p2):
return p2
return os.path.join(roots[0], *parts)
nlp_dir = _path("nlp")
multitask_onnx_path = _path("multi_task_transformer.onnx")
multitask_pt_path = _path("multi_task_transformer.pt")
encoders_pkl = _path("multi_task_label_encoders.pkl")
encoders_json = _path("multi_task_label_encoders.json")
disable_multitask = os.getenv("NLP_DISABLE_MULTITASK", "").lower() in {
"1",
"true",
"yes",
}
allow_transformers = os.getenv("NLP_ALLOW_TRANSFORMERS", "").lower() in {
"1",
"true",
"yes",
}
prefer_multitask = os.getenv("NLP_PREFER_MULTITASK", "").lower() in {
"1",
"true",
"yes",
}
if (
prefer_multitask
and not disable_multitask
and os.path.exists(multitask_onnx_path)
):
try:
from transformers import AutoTokenizer
import onnxruntime as ort
import json
try:
self.multi_task_tokenizer = AutoTokenizer.from_pretrained(
"distilbert-base-uncased", local_files_only=True, use_fast=True
)
except Exception:
self.multi_task_tokenizer = AutoTokenizer.from_pretrained(
"distilbert-base-uncased", use_fast=True
)
sess_options = ort.SessionOptions()
sess_options.intra_op_num_threads = int(os.getenv("ONNX_THREADS", "1"))
sess_options.graph_optimization_level = (
ort.GraphOptimizationLevel.ORT_ENABLE_ALL
)
sess_options.execution_mode = ort.ExecutionMode.ORT_SEQUENTIAL
int8_onnx_path = _path("multi_task_transformer_int8.onnx")
use_int8 = os.path.exists(int8_onnx_path) and os.getenv(
"ONNX_USE_INT8", "1"
).lower() in {"1", "true", "yes"}
selected_onnx_path = int8_onnx_path if use_int8 else multitask_onnx_path
self.onnx_session = ort.InferenceSession(
selected_onnx_path, sess_options
)
self.onnx_int8 = use_int8
self.multi_task_label_encoders = {}
if os.path.exists(encoders_pkl):
try:
with open(encoders_pkl, "rb") as f:
self.multi_task_label_encoders = pickle.load(f)
except Exception:
self.multi_task_label_encoders = {}
elif os.path.exists(encoders_json):
try:
with open(encoders_json, "r", encoding="utf-8") as f:
self.multi_task_label_encoders = json.load(f)
except Exception:
self.multi_task_label_encoders = {}
self.models_loaded = True
self.version = "4.0.0-onnx-int8" if use_int8 else "4.0.0-onnx"
logger.info(f"Loaded optimized ONNX multi-task model (v{self.version})")
return
except Exception as e:
logger.warning(f"Failed to load ONNX multi-task model: {e}")
        # Next, try a dedicated severity classifier: HF transformer format first, then a lightweight TF-IDF artifact
severity_dir = os.path.join(nlp_dir, "severity_classifier")
if os.path.exists(severity_dir):
# Try HuggingFace format first (only if real HF weights/tokenizer exist)
try:
cfg = os.path.join(severity_dir, "config.json")
has_pt = os.path.exists(
os.path.join(severity_dir, "pytorch_model.bin")
) or os.path.exists(os.path.join(severity_dir, "model.safetensors"))
has_tok = os.path.exists(
os.path.join(severity_dir, "tokenizer.json")
) or os.path.exists(os.path.join(severity_dir, "vocab.txt"))
if os.path.exists(cfg) and (has_pt or has_tok):
from transformers import (
AutoTokenizer,
AutoModelForSequenceClassification,
)
import torch
self.severity_tokenizer = AutoTokenizer.from_pretrained(
severity_dir
)
self.severity_model = (
AutoModelForSequenceClassification.from_pretrained(severity_dir)
)
self.severity_model.to(
torch.device("cuda" if torch.cuda.is_available() else "cpu")
)
try:
with open(
os.path.join(severity_dir, "label_encoder.pkl"), "rb"
) as f:
self.severity_label_encoder = pickle.load(f)
except Exception:
self.severity_label_encoder = None
self.models_loaded = True
ver = os.getenv("NLP_SEVERITY_MODEL_VERSION")
if not ver:
try:
import json
cfg_path = os.path.join(severity_dir, "config.json")
if os.path.exists(cfg_path):
with open(cfg_path, "r", encoding="utf-8") as f:
cfg = json.load(f)
ver = cfg.get("version")
except Exception:
ver = None
self.version = ver or "2.0.0-hf"
logger.info("Loaded HF severity classifier")
if not prefer_multitask:
return
except Exception as e:
logger.warning(f"Failed to load HF severity classifier: {e}")
# Fallback to TF-IDF artifacts if present
try:
with open(os.path.join(severity_dir, "vectorizer.pkl"), "rb") as f:
self.severity_vectorizer = pickle.load(f)
with open(os.path.join(severity_dir, "classifier.pkl"), "rb") as f:
self.severity_model = pickle.load(f)
try:
with open(
os.path.join(severity_dir, "label_encoder.pkl"), "rb"
) as f:
label_obj = pickle.load(f)
# Support either a dict-based mapping or a sklearn LabelEncoder
if isinstance(label_obj, dict) and "reverse_map" in label_obj:
self.severity_label_encoder = label_obj
elif hasattr(label_obj, "classes_"):
self.severity_label_encoder = label_obj
else:
self.severity_label_encoder = None
except Exception:
self.severity_label_encoder = None
self.models_loaded = True
self.version = "1.0.0-trained-tfidf"
logger.info(
"Loaded trained severity classifier (TF-IDF + RandomForest)"
)
if not prefer_multitask:
return
except Exception as e:
logger.warning(f"Failed to load TF-IDF classifier: {e}")
# Prefer ONNX multi-task only if explicitly preferred
if (
prefer_multitask
and not disable_multitask
and allow_transformers
and os.path.exists(multitask_onnx_path)
):
try:
from transformers import AutoTokenizer
import onnxruntime as ort
import json
self.multi_task_tokenizer = AutoTokenizer.from_pretrained(
"distilbert-base-uncased", local_files_only=True, use_fast=True
)
sess_options = ort.SessionOptions()
sess_options.intra_op_num_threads = 1
sess_options.graph_optimization_level = (
ort.GraphOptimizationLevel.ORT_ENABLE_ALL
)
self.onnx_session = ort.InferenceSession(
multitask_onnx_path, sess_options
)
self.multi_task_label_encoders = {}
if os.path.exists(encoders_pkl):
try:
with open(encoders_pkl, "rb") as f:
self.multi_task_label_encoders = pickle.load(f)
except Exception:
self.multi_task_label_encoders = {}
elif os.path.exists(encoders_json):
try:
with open(encoders_json, "r", encoding="utf-8") as f:
self.multi_task_label_encoders = json.load(f)
except Exception:
self.multi_task_label_encoders = {}
self.models_loaded = True
self.version = "4.0.0-onnx"
logger.info(f"Loaded optimized ONNX multi-task model (v{self.version})")
return
except Exception as e:
logger.warning(f"Failed to load ONNX multi-task model: {e}")
# Fallback: PyTorch checkpoint (may be heavy)
if (
prefer_multitask
and not disable_multitask
and allow_transformers
and os.path.exists(multitask_pt_path)
):
try:
import torch
from transformers import DistilBertTokenizer
# Load metadata from PyTorch checkpoint
checkpoint = torch.load(
multitask_pt_path, weights_only=False, map_location="cpu"
)
self.multi_task_label_encoders = checkpoint["label_encoders"]
try:
self.multi_task_tokenizer = DistilBertTokenizer.from_pretrained(
"distilbert-base-uncased", local_files_only=True
)
except Exception:
raise RuntimeError("DistilBert tokenizer not available locally")
# Fallback to PyTorch model
from data.transformer_architecture import MultiTaskDistilBert
num_labels_dict = checkpoint["num_labels_dict"]
self.multi_task_model = MultiTaskDistilBert(num_labels_dict)
self.multi_task_model.load_state_dict(checkpoint["model_state_dict"])
self.multi_task_model.eval()
self.models_loaded = True
self.version = "4.0.0-synthetic-plus"
logger.info(
f"Loaded trained multi-task transformer model (v{self.version})"
)
return
except Exception as e:
logger.warning(f"Failed to load multi-task transformer: {e}")
# Try to load HF Issue Classifier
issue_dir = os.path.join(nlp_dir, "issue_classifier")
if allow_transformers and os.path.exists(issue_dir):
try:
if os.path.exists(os.path.join(issue_dir, "config.json")):
from transformers import (
AutoTokenizer,
AutoModelForSequenceClassification,
)
import torch
self.issue_tokenizer = AutoTokenizer.from_pretrained(issue_dir)
self.issue_model = (
AutoModelForSequenceClassification.from_pretrained(issue_dir)
)
self.issue_model.to(
torch.device("cuda" if torch.cuda.is_available() else "cpu")
)
try:
with open(
os.path.join(issue_dir, "label_encoder.pkl"), "rb"
) as f:
self.issue_label_encoder = pickle.load(f)
except Exception:
self.issue_label_encoder = None
self.models_loaded = True
logger.info("Loaded HF issue classifier")
except Exception as e:
logger.warning(f"Failed to load HF issue classifier: {e}")
# Legacy IndoBERT path support
bert_dir = os.path.join(nlp_dir, "severity_bert")
if allow_transformers and os.path.exists(bert_dir):
try:
from transformers import (
AutoTokenizer,
AutoModelForSequenceClassification,
)
import torch
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.severity_tokenizer = AutoTokenizer.from_pretrained(bert_dir)
self.severity_model = (
AutoModelForSequenceClassification.from_pretrained(bert_dir)
)
self.severity_model.to(device)
self.severity_model.eval()
with open(os.path.join(bert_dir, "label_encoder.pkl"), "rb") as f:
self.severity_label_encoder = pickle.load(f)
self.models_loaded = True
self.version = "1.0.0-trained-bert"
logger.info("Loaded trained IndoBERT severity classifier")
return
except Exception as e:
logger.warning(f"Failed to load IndoBERT classifier: {e}")
try:
summ_path = os.path.join(nlp_dir, "summarizer.pkl")
if os.path.exists(summ_path):
with open(summ_path, "rb") as f:
self.summarizer = pickle.load(f)
except Exception as e:
logger.warning(f"Failed to load summarizer: {e}")
logger.info("No trained NLP models found, using rule-based fallback")
self.models_loaded = False
def _get_embedding_cache_key(self, text: str) -> str:
return hashlib.md5(text.encode("utf-8")).hexdigest()
def _get_cached_embedding(self, text: str) -> Optional[np.ndarray]:
key = self._get_embedding_cache_key(text)
if key in EMBEDDING_CACHE:
embedding, expiry = EMBEDDING_CACHE[key]
if time.time() < expiry:
return embedding
del EMBEDDING_CACHE[key]
return None
def _cache_embedding(self, text: str, embedding: np.ndarray):
if len(EMBEDDING_CACHE) >= EMBEDDING_CACHE_MAX_SIZE:
now = time.time()
expired = [k for k, (v, e) in EMBEDDING_CACHE.items() if e < now]
for k in expired:
del EMBEDDING_CACHE[k]
if len(EMBEDDING_CACHE) >= EMBEDDING_CACHE_MAX_SIZE:
oldest_key = min(
EMBEDDING_CACHE.keys(), key=lambda k: EMBEDDING_CACHE[k][1]
)
del EMBEDDING_CACHE[oldest_key]
key = self._get_embedding_cache_key(text)
EMBEDDING_CACHE[key] = (embedding, time.time() + EMBEDDING_CACHE_TTL)
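    # Cache round-trip sketch (the 384-dim vector is a hypothetical embedding
    # size; entries expire after EMBEDDING_CACHE_TTL seconds, i.e. one hour):
    #
    #   svc = get_nlp_service()
    #   vec = np.zeros(384, dtype=np.float32)
    #   svc._cache_embedding("bag torn on belt 3", vec)
    #   svc._get_cached_embedding("bag torn on belt 3")  # -> vec (until TTL)
    #   svc._get_cached_embedding("unseen text")         # -> None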
def predict_multi_task_combined(
self, texts: List[str], tasks: Optional[List[str]] = None
) -> List[Dict[str, Any]]:
"""
Combined multi-task inference - runs all classification tasks in a single pass.
Returns severity, issue_type, root_cause, and report_type in one call.
"""
if tasks is None:
tasks = ["severity", "issue_type", "root_cause", "report"]
has_multitask = (
hasattr(self, "onnx_session") and self.onnx_session is not None
) or (hasattr(self, "multi_task_model") and self.multi_task_model is not None)
if has_multitask:
bs = int(os.getenv("ONNX_BATCH", "32"))
multitask_results = self._classify_with_multitask(texts, batch_size=bs)
# Cheap rule-based severity for reinforcement without extra model cost
rb_sev_list: List[Dict[str, Any]] = self._classify_severity_rule_based(texts)
conf_thr = float(os.getenv("SEVERITY_CONF_THRESHOLD", "0.6"))
enable_norm = os.getenv("ISSUE_LABEL_NORMALIZE", "1").lower() in {"1", "true", "yes"}
results = []
for idx, res in enumerate(multitask_results):
combined = {}
for task in tasks:
if task in res:
combined[task] = res[task]
else:
combined[task] = {"label": "Unknown", "confidence": 0.0}
# Normalize issue/root_cause labels
if enable_norm and "issue_type" in combined:
lbl = (combined["issue_type"].get("label") or "").strip()
norm_map = {
"Baggage": "Baggage Handling",
"Passenger": "Pax Handling",
"Pax": "Pax Handling",
"Cargo": "Cargo Problems",
"Cargo Problem": "Cargo Problems",
}
combined["issue_type"]["label"] = norm_map.get(lbl, lbl)
if enable_norm and "root_cause" in combined:
lbl = (combined["root_cause"].get("label") or "").strip()
norm_map = {
"Doc": "Documentation",
"Document": "Documentation",
}
combined["root_cause"]["label"] = norm_map.get(lbl, lbl)
# Reinforce severity with rule-based if confidence is low but signals are strong
try:
rb = rb_sev_list[idx] if idx < len(rb_sev_list) else None
if rb:
rb_label = rb.get("severity")
rb_conf = rb.get("confidence", 0.6)
sev_pred = combined.get("severity", {})
sev_label = sev_pred.get("label")
sev_conf = float(sev_pred.get("confidence", 0.0))
kc = rb.get("keyword_counts", {})
high_hits = int(kc.get("high", 0)) + int(kc.get("critical", 0))
if sev_conf < conf_thr and rb_label in {"High", "Critical"}:
combined["severity"] = {
"label": rb_label,
"confidence": round(min(0.9, max(rb_conf, conf_thr)), 2),
}
elif str(sev_label).lower() in {"low", "0"} and high_hits >= 1:
combined["severity"] = {
"label": rb_label,
"confidence": round(max(rb_conf, sev_conf), 2),
}
except Exception:
pass
results.append(combined)
return results
results = []
for text in texts:
combined = {}
if "severity" in tasks:
sev = self.classify_severity([text])[0]
combined["severity"] = sev
if "issue_type" in tasks:
issue = self.classify_issue_type([text])[0]
combined["issue_type"] = issue
if "root_cause" in tasks:
rc = self.classify_root_cause([text])[0]
combined["root_cause"] = rc
if "report" in tasks:
combined["report"] = {"label": "Unknown", "confidence": 0.5}
results.append(combined)
return results
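    # Expected output shape (illustrative values; actual labels and confidences
    # depend on which backend is loaded, ONNX/PyTorch multi-task or the
    # per-task fallbacks):
    #
    #   svc.predict_multi_task_combined(["Cargo pallet damaged during loading"])
    #   # -> [{"severity":   {"label": "High", "confidence": 0.82},
    #   #      "issue_type": {"label": "Cargo Problems", "confidence": 0.77},
    #   #      "root_cause": {"label": "Handling", "confidence": 0.54},
    #   #      "report":     {"label": "Irregularity", "confidence": 0.91}}]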
def classify_severity(self, texts: List[str]) -> List[Dict[str, Any]]:
"""Classify severity of reports"""
if getattr(self, "remote", None) and self.remote.enabled:
r = self.remote.severity(texts)
if r is not None and isinstance(r, list):
out = []
for item in r:
s = item.get("severity") or item.get("label") or "Medium"
c = item.get("confidence") or item.get("score") or 0.8
out.append({"severity": s, "confidence": c})
return out
        # Note: the multi-task model's 'report' head covers Irregularity vs
        # Complaint; severity itself prefers the dedicated model below, then
        # the TF-IDF artifact, then the rule-based fallback.
# Check if IndoBERT model is loaded (dedicated severity)
if (
hasattr(self, "severity_model")
and self.severity_model is not None
and self.severity_tokenizer is not None
):
return self._classify_with_model(
texts,
self.severity_model,
self.severity_tokenizer,
self.severity_label_encoder,
)
# Check if TF-IDF model is loaded
if (
hasattr(self, "severity_vectorizer")
and self.severity_vectorizer is not None
):
return self._classify_with_tfidf(texts)
return self._classify_severity_rule_based(texts)
def _classify_with_tfidf(self, texts: List[str]) -> List[Dict[str, Any]]:
"""Use trained TF-IDF + classifier for classification"""
results = []
X = self.severity_vectorizer.transform(texts)
predictions = self.severity_model.predict(X)
probabilities = self.severity_model.predict_proba(X)
        for pred, probs in zip(predictions, probabilities):
label = None
try:
if isinstance(self.severity_label_encoder, dict):
# Expect a mapping dict or reverse_map style
label_map = (
self.severity_label_encoder.get("label_map")
or self.severity_label_encoder.get("reverse_map")
or {}
)
if isinstance(pred, (int, np.integer)):
label = label_map.get(int(pred))
elif hasattr(self.severity_label_encoder, "inverse_transform"):
label = self.severity_label_encoder.inverse_transform([pred])[0]
except Exception:
label = None
if label is None:
label = str(pred)
confidence = float(max(probs))
results.append(
{
"severity": label,
"confidence": round(confidence, 2),
}
)
return results
def _classify_with_model(
self, texts: List[str], model, tokenizer, label_encoder
) -> List[Dict]:
"""Use trained model for classification"""
import torch
device = next(model.parameters()).device
results = []
with torch.no_grad():
for text in texts:
inputs = tokenizer(
text,
padding=True,
truncation=True,
max_length=512,
return_tensors="pt",
)
if "token_type_ids" in inputs:
inputs.pop("token_type_ids")
inputs = {k: v.to(device) for k, v in inputs.items()}
outputs = model(**inputs)
probs = torch.softmax(outputs.logits, dim=1)
confidence, pred = torch.max(probs, dim=1)
label = None
try:
if label_encoder is not None:
label = label_encoder.inverse_transform([pred.item()])[0]
elif hasattr(model, "config") and hasattr(model.config, "id2label"):
id2label = getattr(model.config, "id2label", {})
label = id2label.get(str(int(pred.item()))) or id2label.get(
int(pred.item())
)
except Exception:
label = None
if label is None:
label = str(int(pred.item()))
results.append(
{
"severity": label,
"confidence": confidence.item(),
}
)
return results
def _classify_severity_rule_based(self, texts: List[str]) -> List[Dict[str, Any]]:
"""Enhanced rule-based severity classification with context-aware matching and calibrated confidence"""
critical_keywords = [
"emergency",
"darurat",
"critical",
"kritis",
"genting",
"severe",
"parah",
"serius",
"injury",
"cedera",
"luka",
"accident",
"kecelakaan",
"safety issue",
"masalah keselamatan",
"fire",
"kebakaran",
"api",
"explosion",
"ledakan",
"death",
"kematian",
"meninggal",
"fatality",
"fatal",
"evacuation",
"evakuasi",
"collapse",
"runtuh",
"crash",
"tabrakan",
]
high_keywords = [
"damage",
"rusak",
"kerusakan",
"torn",
"robek",
"sobek",
"broken",
"pecah",
"patah",
"urgent",
"mendesak",
"segera",
"lost",
"hilang",
"stolen",
"dicuri",
"theft",
"pencurian",
"security",
"keamanan",
"safety",
"keselamatan",
"spillage",
"tumpahan",
"contamination",
"kontaminasi",
"unsafe",
"berbahaya",
"risk",
"risiko",
"threat",
"ancaman",
]
medium_keywords = [
"delay",
"terlambat",
"keterlambatan",
"telat",
"late",
"wrong",
"salah",
"incorrect",
"tidak benar",
"keliru",
"missing",
"tidak ada",
"error",
"kesalahan",
"galat",
"fail",
"gagal",
"kegagalan",
"problem",
"masalah",
"issue",
"isu",
"complaint",
"keluhan",
"reject",
"tolak",
"ditolak",
"complain",
"komplain",
"keluh",
"misload",
"overload",
]
low_keywords = [
"minor",
"kecil",
"ringan",
"small",
"sedikit",
"slight",
"tipis",
"normal",
"biasa",
"routine",
"rutin",
"scheduled",
"terjadwal",
"planned",
"direncanakan",
]
negation_patterns = [
"no damage",
"tidak rusak",
"no injury",
"tidak cedera",
"not broken",
"tidak pecah",
"no accident",
"tidak kecelakaan",
"false alarm",
"salah alarm",
"near miss",
"hampir terjadi",
"almost",
"nearly",
"minor damage",
"kerusakan kecil",
"slight damage",
"sedikit rusak",
"no safety",
"tidak keselamatan",
"not urgent",
"tidak mendesak",
"no emergency",
"tidak darurat",
"potential only",
"potensi saja",
]
intensifier_patterns = [
"very",
"sangat",
"extremely",
"amat",
"highly",
"urgent",
"mendesak",
"segera",
"immediately",
"secepatnya",
"asap",
"serious",
"serius",
"major",
"besar",
]
deintensifier_patterns = [
"slight",
"sedikit",
"minor",
"kecil",
"small",
"partial",
"sebagian",
"temporary",
"sementara",
"minor",
"ringan",
"almost resolved",
"hampir selesai",
]
results = []
for text in texts:
text_lower = (text or "").lower()
critical_count = sum(1 for kw in critical_keywords if kw in text_lower)
high_count = sum(1 for kw in high_keywords if kw in text_lower)
medium_count = sum(1 for kw in medium_keywords if kw in text_lower)
low_count = sum(1 for kw in low_keywords if kw in text_lower)
negation_count = sum(1 for neg in negation_patterns if neg in text_lower)
intensifier_count = sum(
1 for intens in intensifier_patterns if intens in text_lower
)
deintensifier_count = sum(
1 for deint in deintensifier_patterns if deint in text_lower
)
base_severity = "Low"
base_confidence = 0.5
total_matches = critical_count + high_count + medium_count + low_count
if critical_count >= 1:
base_severity = "Critical"
base_confidence = min(0.95, 0.85 + critical_count * 0.03)
elif high_count >= 2:
base_severity = "High"
base_confidence = min(0.90, 0.80 + high_count * 0.03)
elif high_count >= 1:
base_severity = "High"
base_confidence = 0.75
elif medium_count >= 2:
base_severity = "Medium"
base_confidence = min(0.85, 0.70 + medium_count * 0.04)
elif medium_count >= 1:
base_severity = "Medium"
base_confidence = 0.65
elif low_count >= 1:
base_severity = "Low"
base_confidence = 0.70
else:
base_severity = "Low"
base_confidence = 0.50
if negation_count > 0 and base_severity in ["Critical", "High"]:
severity_downgrade = {
"Critical": "Medium",
"High": "Medium",
"Medium": "Low",
"Low": "Low",
}
base_severity = severity_downgrade.get(base_severity, base_severity)
base_confidence = max(0.50, base_confidence - 0.15 * negation_count)
if intensifier_count > 0:
base_confidence = min(0.95, base_confidence + 0.05 * intensifier_count)
if base_severity == "Medium" and intensifier_count >= 2:
base_severity = "High"
base_confidence = min(0.85, base_confidence + 0.10)
if deintensifier_count > 0:
base_confidence = max(
0.40, base_confidence - 0.05 * deintensifier_count
)
base_confidence = round(base_confidence, 2)
results.append(
{
"severity": base_severity,
"confidence": base_confidence,
"keyword_counts": {
"critical": critical_count,
"high": high_count,
"medium": medium_count,
"low": low_count,
},
"modifiers": {
"negations": negation_count,
"intensifiers": intensifier_count,
"deintensifiers": deintensifier_count,
},
}
)
return results
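    # Worked example of the rules above (default weights): "No damage to the
    # aircraft after the tow" contains the high keyword "damage" (base High,
    # confidence 0.75) and the negation pattern "no damage", so it is
    # downgraded to Medium with confidence max(0.50, 0.75 - 0.15) = 0.60.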
def classify_severity_with_tfidf_fallback(
self, texts: List[str]
) -> List[Dict[str, Any]]:
"""Classify severity with TF-IDF similarity fallback for low-confidence cases"""
results = self.classify_severity(texts)
if not hasattr(self, "_severity_reference_texts"):
self._init_severity_reference_texts()
if not hasattr(self, "_severity_tfidf_vectorizer"):
try:
from sklearn.feature_extraction.text import TfidfVectorizer
self._severity_tfidf_vectorizer = TfidfVectorizer(
max_features=1000, ngram_range=(1, 2), stop_words="english"
)
self._severity_tfidf_matrix = (
self._severity_tfidf_vectorizer.fit_transform(
self._severity_reference_texts["all"]
)
)
except Exception as e:
logger.warning(f"Failed to initialize TF-IDF fallback: {e}")
return results
        for text, result in zip(texts, results):
if result.get("confidence", 0) < 0.6:
try:
tfidf_fallback = self._tfidf_severity_fallback(text)
if tfidf_fallback and tfidf_fallback.get(
"confidence", 0
) > result.get("confidence", 0):
result["severity"] = tfidf_fallback["severity"]
result["confidence"] = tfidf_fallback["confidence"]
result["method"] = "tfidf_fallback"
except Exception:
pass
return results
def _init_severity_reference_texts(self):
"""Initialize reference texts for TF-IDF similarity"""
self._severity_reference_texts = {
"Critical": [
"emergency situation requiring immediate evacuation",
"critical safety issue fire explosion severe injury",
"fatal accident death kecelakaan fatal kematian",
"kebakaran darurat critical emergency parah severe",
"explosion ledakan collapse runtuh crash tabrakan",
],
"High": [
"significant damage to equipment or property",
"urgent security issue theft stolen dicuri pencurian",
"broken equipment rusak pecah patah torn robek",
"lost baggage hilang unsafe berbahaya risk risiko",
"spillage contamination tumpahan kontaminasi",
],
"Medium": [
"delay in operations terlambat keterlambatan",
"wrong incorrect salah keliru error kesalahan",
"fail gagal kegagalan problem masalah issue",
"complaint keluhan reject ditolak misload",
"missing tidak ada galat mistake",
],
"Low": [
"minor issue kecil ringan small slight",
"routine scheduled terjadwal normal biasa",
"planned direncanakan routine rutin",
"minor damage kerusakan kecil sedikit",
],
"all": [],
}
for severity in ["Critical", "High", "Medium", "Low"]:
self._severity_reference_texts["all"].extend(
self._severity_reference_texts[severity]
)
def _tfidf_severity_fallback(self, text: str) -> Optional[Dict[str, Any]]:
"""Use TF-IDF similarity to find similar reference texts"""
if not hasattr(self, "_severity_tfidf_vectorizer"):
return None
try:
from sklearn.metrics.pairwise import cosine_similarity
text_vector = self._severity_tfidf_vectorizer.transform([text])
similarities = cosine_similarity(text_vector, self._severity_tfidf_matrix)[
0
]
best_idx = similarities.argmax()
best_similarity = similarities[best_idx]
if best_similarity < 0.2:
return None
ref_texts = self._severity_reference_texts["all"]
matched_text = ref_texts[best_idx]
severity = "Medium"
for sev in ["Critical", "High", "Medium", "Low"]:
if matched_text in self._severity_reference_texts.get(sev, []):
severity = sev
break
for ref in self._severity_reference_texts.get(sev, []):
if matched_text in ref or ref in matched_text:
severity = sev
break
            confidence = min(0.75, 0.4 + float(best_similarity) * 0.35)
            return {
                "severity": severity,
                "confidence": round(confidence, 2),
                "similarity": round(float(best_similarity), 2),
            }
except Exception:
return None
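    # Fallback flow: classify_severity_with_tfidf_fallback only consults this
    # similarity lookup when the primary prediction's confidence is below 0.6,
    # and keeps the fallback only when it is more confident. Its confidence is
    # min(0.75, 0.4 + similarity * 0.35), so even a perfect cosine match of
    # 1.0 yields at most 0.75.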
def classify_issue_type(self, texts: List[str]) -> List[Dict[str, Any]]:
"""Classify issue type using Multi-Task (issue_type) or legacy models"""
        has_multitask = (
            getattr(self, "onnx_session", None) is not None
            or getattr(self, "multi_task_model", None) is not None
        )
if has_multitask and "issue_type" in self.multi_task_label_encoders:
multi_results = self._classify_with_multitask(texts)
return [
res.get("issue_type", {"label": "Unknown", "confidence": 0.0})
for res in multi_results
]
if self.issue_model is not None and self.issue_tokenizer is not None:
return self._classify_with_model(
texts, self.issue_model, self.issue_tokenizer, self.issue_label_encoder
)
return [{"label": "Unknown", "confidence": 0.5} for _ in texts]
def classify_root_cause(self, texts: List[str]) -> List[Dict[str, Any]]:
"""Classify root cause using Multi-Task head"""
        has_multitask = (
            getattr(self, "onnx_session", None) is not None
            or getattr(self, "multi_task_model", None) is not None
        )
if has_multitask and "root_cause" in self.multi_task_label_encoders:
multi_results = self._classify_with_multitask(texts)
return [
res.get("root_cause", {"label": "Unknown", "confidence": 0.0})
for res in multi_results
]
return [{"label": "Unknown", "confidence": 0.0} for _ in texts]
def predict_multi_task(self, texts: List[str]) -> Optional[List[Dict[str, Any]]]:
"""Public method for multi-task predictions"""
if getattr(self, "remote", None) and self.remote.enabled:
res = self.remote.multitask_classify(texts)
if res is not None:
                out = []
                for r in res:
                    o = {}
                    if "severity" in r:
                        sev = r.get("severity")
                        if isinstance(sev, dict):
                            o["severity"] = {
                                "label": sev.get("label") or sev.get("severity"),
                                "confidence": sev.get("confidence") or sev.get("score"),
                            }
                        else:
                            o["severity"] = {
                                "label": sev,
                                "confidence": r.get("severity_confidence")
                                or r.get("confidence")
                                or 0.8,
                            }
                    if "area" in r:
                        area = r.get("area")
                        if isinstance(area, dict):
                            o["area"] = {
                                "label": area.get("label"),
                                "confidence": area.get("confidence") or area.get("score"),
                            }
                        else:
                            o["area"] = {
                                "label": area,
                                "confidence": r.get("area_confidence") or 0.8,
                            }
                    if "issue_type" in r or "irregularity" in r:
                        it = r.get("issue_type") if "issue_type" in r else r.get("irregularity")
                        if isinstance(it, dict):
                            o["irregularity"] = {
                                "label": it.get("label"),
                                "confidence": it.get("confidence") or it.get("score"),
                            }
                        else:
                            o["irregularity"] = {
                                "label": it,
                                "confidence": r.get("issue_type_confidence") or 0.8,
                            }
                    if "root_cause" in r:
                        rc = r.get("root_cause")
                        if isinstance(rc, dict):
                            o["root_cause"] = {
                                "label": rc.get("label"),
                                "confidence": rc.get("confidence") or rc.get("score"),
                            }
                        else:
                            o["root_cause"] = {
                                "label": rc,
                                "confidence": r.get("root_cause_confidence") or 0.8,
                            }
                    out.append(o)
                return out
        has_multitask = (
            getattr(self, "onnx_session", None) is not None
            or getattr(self, "multi_task_model", None) is not None
        )
if has_multitask:
return self._classify_with_multitask(texts)
return None
def _classify_with_multitask(
self, texts: List[str], batch_size: int = 16
) -> List[Dict[str, Any]]:
"""Inference using Multi-Task Transformer (ONNX or PyTorch)"""
results = []
# ONNX Inference
if hasattr(self, "onnx_session") and self.onnx_session is not None:
            output_names = [output.name for output in self.onnx_session.get_outputs()]
            temperature = float(os.getenv("ONNX_TEMPERATURE", "1.0"))
            if temperature <= 0:
                temperature = 1.0
            # Read label-normalization config once per call rather than per item
            enable_norm = os.getenv("ISSUE_LABEL_NORMALIZE", "1").lower() in {"1", "true", "yes"}
            issue_norm_map = {
                "Baggage": "Baggage Handling",
                "Passenger": "Pax Handling",
                "Pax": "Pax Handling",
                "Cargo": "Cargo Problems",
                "Cargo Problem": "Cargo Problems",
            }
            root_cause_norm_map = {"Doc": "Documentation", "Document": "Documentation"}
for i in range(0, len(texts), batch_size):
chunk = texts[i : i + batch_size]
# Tokenize
                max_len = int(os.getenv("ONNX_MAX_LEN", "192"))
                inputs = self.multi_task_tokenizer(
                    chunk,
                    padding=True,
                    truncation=True,
                    max_length=max_len,
                    return_tensors="np",
                )
onnx_inputs = {
"input_ids": inputs["input_ids"].astype(np.int64),
"attention_mask": inputs["attention_mask"].astype(np.int64),
}
# Run inference
onnx_outputs = self.onnx_session.run(None, onnx_inputs)
# Process outputs (Vectorized)
chunk_results = [{} for _ in range(len(chunk))]
for k, output_name in enumerate(output_names):
logits_batch = onnx_outputs[k]
if temperature != 1.0:
logits_batch = logits_batch / temperature
# Vectorized Softmax
max_logits = np.max(logits_batch, axis=1, keepdims=True)
exp_logits = np.exp(logits_batch - max_logits)
probs_batch = exp_logits / np.sum(exp_logits, axis=1, keepdims=True)
pred_indices = np.argmax(probs_batch, axis=1)
confidences = np.max(probs_batch, axis=1)
                    if output_name in self.multi_task_label_encoders:
                        le = self.multi_task_label_encoders[output_name]
                        try:
                            # JSON-loaded encoders are plain containers without
                            # inverse_transform; fall back to stringified indices.
                            if hasattr(le, "inverse_transform"):
                                labels = le.inverse_transform(pred_indices)
                            else:
                                labels = pred_indices.astype(str)
                        except Exception:
                            labels = pred_indices.astype(str)
                    else:
                        labels = pred_indices.astype(str)
                    for j in range(len(chunk)):
                        label_val = str(labels[j])
                        if output_name == "severity":
                            label_val = self._severity_index_to_label.get(label_val, label_val)
                        elif output_name == "issue_type" and enable_norm:
                            label_val = issue_norm_map.get(label_val, label_val)
                        elif output_name == "root_cause" and enable_norm:
                            label_val = root_cause_norm_map.get(label_val, label_val)
                        chunk_results[j][output_name] = {
                            "label": label_val,
                            "confidence": round(float(confidences[j]), 3),
                        }
results.extend(chunk_results)
return results
        # PyTorch inference path (import torch only once we know a model is loaded)
        if not hasattr(self, "multi_task_model") or self.multi_task_model is None:
            return []
        import torch
        device = next(self.multi_task_model.parameters()).device
results = []
# Process in chunks to balance speed and memory
for i in range(0, len(texts), batch_size):
chunk = texts[i : i + batch_size]
with torch.no_grad():
inputs = self.multi_task_tokenizer(
chunk,
padding=True,
truncation=True,
max_length=256,
return_tensors="pt",
)
inputs = {k: v.to(device) for k, v in inputs.items()}
logits_dict = self.multi_task_model(**inputs)
# Extract results for each item in the chunk
for j in range(len(chunk)):
prediction = {}
for key, logits in logits_dict.items():
# Get probabilities for the j-th record in batch
probs = torch.softmax(logits[j : j + 1], dim=1)
confidence, pred = torch.max(probs, dim=1)
if key in self.multi_task_label_encoders:
le = self.multi_task_label_encoders[key]
label = le.inverse_transform([pred.item()])[0]
prediction[key] = {
"label": str(label),
"confidence": round(float(confidence.item()), 3),
}
results.append(prediction)
return results
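    # Inference-time tuning knobs read by this method and its caller (defaults
    # in parentheses): ONNX_BATCH (32), ONNX_MAX_LEN (192), ONNX_TEMPERATURE
    # (1.0, plain softmax), ISSUE_LABEL_NORMALIZE (on). Temperatures above 1.0
    # flatten the softmax and lower reported confidences; values <= 0 are
    # coerced back to 1.0.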
def summarize(self, text: str) -> Dict[str, Any]:
"""Generate summary of report"""
if getattr(self, "remote", None) and self.remote.enabled:
res = self.remote.summarize([text])
if res and isinstance(res, list) and res:
item = res[0]
es = item.get("executive_summary") or item.get("summary") or item.get("executiveSummary") or ""
kp = item.get("key_points") or item.get("keyPoints") or []
return {"executiveSummary": es, "keyPoints": kp}
if self.summarizer is not None:
return self.summarizer.summarize(text)
return self._simple_summarize(text)
def _simple_summarize(self, text: str) -> Dict[str, Any]:
"""Simple extractive summarization fallback"""
if not text or len(text) < 50:
return {
"executiveSummary": text[:200] + "..." if len(text) > 200 else text,
"keyPoints": [],
}
important_keywords = [
"damage",
"torn",
"broken",
"cargo",
"baggage",
"passenger",
"delay",
"late",
"error",
"fail",
]
sentences = text.replace("!", ".").replace("?", ".").split(".")
sentences = [s.strip() for s in sentences if len(s.strip()) > 20]
sentence_scores = []
for sent in sentences:
score = sum(1 for kw in important_keywords if kw in sent.lower())
sentence_scores.append((sent, score))
sentence_scores.sort(key=lambda x: x[1], reverse=True)
top_sentences = [s[0] for s in sentence_scores[:3]]
summary = ". ".join(top_sentences) + "." if top_sentences else text[:200]
key_points = []
if any(kw in text.lower() for kw in ["cargo", "uld", "kargo"]):
key_points.append("Cargo-related issue")
if any(kw in text.lower() for kw in ["baggage", "bag", "bagasi"]):
key_points.append("Baggage handling issue")
if any(kw in text.lower() for kw in ["passenger", "pax", "penumpang"]):
key_points.append("Passenger service issue")
if any(
kw in text.lower()
for kw in ["damage", "torn", "broken", "rusak", "pecah", "robek"]
):
key_points.append("Physical damage reported")
if any(kw in text.lower() for kw in ["delay", "terlambat", "telat"]):
key_points.append("Delay reported")
return {
"executiveSummary": summary[:300] + "..."
if len(summary) > 300
else summary,
"keyPoints": key_points[:5],
}
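    # Illustrative fallback summary (rule-based path, no summarizer model; the
    # exact sentence selection depends on keyword hits):
    #
    #   svc._simple_summarize(
    #       "The passenger's baggage was torn during transfer and several "
    #       "items inside were damaged beyond repair."
    #   )
    #   # -> {"executiveSummary": "<top keyword-scored sentences>",
    #   #     "keyPoints": ["Baggage handling issue",
    #   #                   "Passenger service issue",
    #   #                   "Physical damage reported"]}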
def analyze_urgency(self, texts: List[str]) -> List[Dict[str, Any]]:
if getattr(self, "remote", None) and self.remote.enabled:
res = self.remote.urgency(texts)
if res is not None and isinstance(res, list):
out = []
for r in res:
out.append(
{
"urgency_score": r.get("urgency_score") or r.get("score") or 0.0,
"sentiment": r.get("sentiment") or "Neutral",
"keywords": r.get("keywords") or [],
}
)
return out
critical = {
"emergency": 1.2,
"darurat": 1.2,
"critical": 1.2,
"kritis": 1.2,
"severe": 1.1,
"parah": 1.1,
"injury": 1.2,
"cedera": 1.2,
"death": 1.3,
"kematian": 1.3,
"fire": 1.3,
"kebakaran": 1.3,
"explosion": 1.3,
"ledakan": 1.3,
"evacuate": 1.2,
"evakuasi": 1.2,
"safety": 1.1,
"keselamatan": 1.1,
"security": 1.0,
"keamanan": 1.0,
"accident": 1.2,
"kecelakaan": 1.2,
"hazard": 1.1,
"dangerous": 1.1,
"unsafe": 1.1,
}
high = {
"damage": 1.0,
"rusak": 1.0,
"broken": 1.0,
"pecah": 1.0,
"patah": 1.0,
"torn": 0.9,
"robek": 0.9,
"spillage": 0.9,
"bocor": 0.9,
"lost": 0.8,
"hilang": 0.8,
"stolen": 0.9,
"dicuri": 0.9,
"theft": 0.9,
"pencurian": 0.9,
"unsafe": 1.0,
"berbahaya": 1.0,
"major": 1.0,
"significant": 0.9,
}
medium = {
"delay": 0.5,
"terlambat": 0.5,
"telat": 0.5,
"late": 0.5,
"misload": 0.6,
"salah muat": 0.6,
"wrong": 0.45,
"incorrect": 0.45,
"keliru": 0.45,
"missing": 0.45,
"tidak ada": 0.45,
"error": 0.45,
"kesalahan": 0.45,
"fail": 0.45,
"gagal": 0.45,
"complaint": 0.45,
"keluhan": 0.45,
"complain": 0.45,
"komplain": 0.45,
}
low = {
"minor": 0.25,
"kecil": 0.25,
"ringan": 0.25,
"small": 0.25,
"slight": 0.25,
"normal": 0.15,
"rutin": 0.15,
"routine": 0.15,
}
intensifiers = {
"very",
"sangat",
"extremely",
"sangatlah",
"urgent",
"mendesak",
"segera",
"immediately",
"secepatnya",
"asap",
"as soon as possible",
"right now",
}
        deintensifiers = {
            "slight", "sedikit", "minor", "low", "ringan",
            "slightly", "partially", "temporary",
        }
        # NOTE: matching is substring-based, so very short negation tokens can
        # overmatch (e.g. "no" inside "normal"); the damage-keyword guard below
        # limits how often this actually changes the score.
        negations = {
            "no", "not", "tidak", "bukan", "tanpa",
            "false alarm", "no damage", "no injury",
            "not urgent", "no issue", "no problem",
        }
results = []
for text in texts:
tl = (text or "").lower()
found = set()
score = 0.0
for kw, w in critical.items():
if kw in tl:
found.add(kw)
score += w
for kw, w in high.items():
if kw in tl:
found.add(kw)
score += w
for kw, w in medium.items():
if kw in tl:
found.add(kw)
score += w
for kw, w in low.items():
if kw in tl:
found.add(kw)
score += w
intens_count = sum(1 for t in intensifiers if t in tl)
if intens_count > 0 and score > 0:
score *= min(1.35, 1.0 + self.urgency_intensifier_scale * intens_count)
deint_count = sum(1 for t in deintensifiers if t in tl)
if deint_count > 0:
score *= max(0.5, 1.0 - self.urgency_deintensifier_scale * deint_count)
if any(n in tl for n in negations) and any(
k in tl
for k in [
"damage",
"rusak",
"broken",
"pecah",
"injury",
"cedera",
"fire",
"kebakaran",
]
):
score *= self.urgency_negation_multiplier
excl_bonus = min(0.08, tl.count("!") * 0.02)
caps_ratio = 0.0
letters = [c for c in text if c.isalpha()]
if letters:
caps_ratio = sum(1 for c in letters if c.isupper()) / len(letters)
caps_bonus = 0.08 if caps_ratio > 0.35 else 0.0
score += excl_bonus + caps_bonus
norm = min(1.0, score / self.urgency_norm_denom)
if norm >= self.urgency_critical_threshold:
sentiment = "Critical Negative"
elif norm >= self.urgency_negative_threshold:
sentiment = "Negative"
elif norm >= self.urgency_somewhat_threshold:
sentiment = "Somewhat Negative"
else:
sentiment = "Neutral"
results.append(
{
"urgency_score": round(norm, 2),
"sentiment": sentiment,
"keywords": sorted(list(found))[:15],
}
)
return results
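    # Worked example with the default weights and thresholds: "Fire near the
    # fuel truck, respond immediately" hits the critical keyword "fire" (1.3)
    # and the intensifier "immediately", so score = 1.3 * 1.1 = 1.43 and
    # urgency = min(1.0, 1.43 / 3.5) = 0.41 (rounded), which falls in
    # [0.35, 0.6) and is reported as "Somewhat Negative".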