| """ | |
| NLP Model Service for Gapura AI | |
| Loads and uses trained NLP models for severity classification, issue type classification, etc. | |
| Falls back to rule-based logic if models are not available. | |
| """ | |
| import os | |
| import logging | |
| import pickle | |
| import hashlib | |
| import time | |
| from typing import List, Dict, Any, Optional, Tuple | |
| import numpy as np | |
| from functools import lru_cache | |
| logger = logging.getLogger(__name__) | |
| EMBEDDING_CACHE: Dict[str, Tuple[np.ndarray, float]] = {} | |
| EMBEDDING_CACHE_TTL = 3600 | |
| EMBEDDING_CACHE_MAX_SIZE = 5000 | |
| _nlp_instance = None | |
| def get_nlp_service(): | |
| """Get or create singleton NLPModelService instance""" | |
| global _nlp_instance | |
| if _nlp_instance is None: | |
| _nlp_instance = NLPModelService() | |
| return _nlp_instance | |
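
# Illustrative usage sketch (the report text below is hypothetical; when no model
# artifacts or remote backend are available, the rule-based fallback answers):
#
#     svc = get_nlp_service()
#     svc.classify_severity(["Bagasi penumpang robek dan isinya hilang"])
#     # -> [{"severity": "High", "confidence": 0.86, "keyword_counts": {...}, "modifiers": {...}}]
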
class NLPModelService:
    """Service for NLP predictions using trained models or rule-based fallback"""

    def __init__(self):
        labels_env = os.getenv("MULTITASK_SEVERITY_LABELS")
        if labels_env:
            parts = [p.strip() for p in labels_env.split(",") if p.strip()]
            self._severity_index_to_label = {str(i): parts[i] for i in range(len(parts))}
        else:
            self._severity_index_to_label = {
                "0": "Low",
                "1": "Medium",
                "2": "High",
                "3": "Critical",
            }
        self.urgency_critical_threshold = float(os.getenv("URGENCY_CRITICAL_THRESHOLD", "0.85"))
        self.urgency_negative_threshold = float(os.getenv("URGENCY_NEGATIVE_THRESHOLD", "0.6"))
        self.urgency_somewhat_threshold = float(os.getenv("URGENCY_SOMEWHAT_THRESHOLD", "0.35"))
        self.urgency_norm_denom = float(os.getenv("URGENCY_NORM_DENOM", "3.5"))
        self.urgency_intensifier_scale = float(os.getenv("URGENCY_INTENSIFIER_SCALE", "0.1"))
        self.urgency_deintensifier_scale = float(os.getenv("URGENCY_DEINTENSIFIER_SCALE", "0.15"))
        self.urgency_negation_multiplier = float(os.getenv("URGENCY_NEGATION_MULTIPLIER", "0.5"))
        self.severity_model = None
        self.severity_tokenizer = None
        self.severity_label_encoder = None
        self.severity_vectorizer = None
        self.severity_classifier = None  # For TF-IDF classifier
        self.issue_model = None
        self.issue_tokenizer = None
        self.issue_label_encoder = None
        self.summarizer = None
        self.version = "1.0.0-rule-based"
        self.models_loaded = False
        from .remote_client import get_remote_client

        self.remote = get_remote_client()
        self._load_models()
    def _load_models(self):
        """Load or create NLP models"""
        if (
            getattr(self, "remote", None)
            and self.remote.enabled
            and os.getenv("NLP_REMOTE_ONLY", "1").lower() in {"1", "true", "yes"}
        ):
            self.version = "remote-dl"
            self.models_loaded = False
            return
        rid = os.getenv("NLP_MODEL_REPO_ID") or os.getenv("MODEL_REPO_ID")
        roots = []
        app_models = os.path.join("/app", "models")
        if os.path.isdir(app_models):
            roots.append(app_models)
        if rid:
            try:
                from huggingface_hub import snapshot_download

                cache_dir = snapshot_download(repo_id=rid)
                roots.append(cache_dir)
            except Exception:
                pass
        roots.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

        def _path(*parts):
            for r in roots:
                p1 = os.path.join(r, *parts)
                if os.path.exists(p1):
                    return p1
                p2 = os.path.join(r, "models", *parts)
                if os.path.exists(p2):
                    return p2
            return os.path.join(roots[0], *parts)

        nlp_dir = _path("nlp")
        multitask_onnx_path = _path("multi_task_transformer.onnx")
        multitask_pt_path = _path("multi_task_transformer.pt")
        encoders_pkl = _path("multi_task_label_encoders.pkl")
        encoders_json = _path("multi_task_label_encoders.json")
        disable_multitask = os.getenv("NLP_DISABLE_MULTITASK", "").lower() in {"1", "true", "yes"}
        allow_transformers = os.getenv("NLP_ALLOW_TRANSFORMERS", "").lower() in {"1", "true", "yes"}
        prefer_multitask = os.getenv("NLP_PREFER_MULTITASK", "").lower() in {"1", "true", "yes"}
        if prefer_multitask and not disable_multitask and os.path.exists(multitask_onnx_path):
            try:
                from transformers import AutoTokenizer
                import onnxruntime as ort
                import json

                try:
                    self.multi_task_tokenizer = AutoTokenizer.from_pretrained(
                        "distilbert-base-uncased", local_files_only=True, use_fast=True
                    )
                except Exception:
                    self.multi_task_tokenizer = AutoTokenizer.from_pretrained(
                        "distilbert-base-uncased", use_fast=True
                    )
                sess_options = ort.SessionOptions()
                sess_options.intra_op_num_threads = int(os.getenv("ONNX_THREADS", "1"))
                sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
                sess_options.execution_mode = ort.ExecutionMode.ORT_SEQUENTIAL
                int8_onnx_path = _path("multi_task_transformer_int8.onnx")
                use_int8 = os.path.exists(int8_onnx_path) and os.getenv(
                    "ONNX_USE_INT8", "1"
                ).lower() in {"1", "true", "yes"}
                selected_onnx_path = int8_onnx_path if use_int8 else multitask_onnx_path
                self.onnx_session = ort.InferenceSession(selected_onnx_path, sess_options)
                self.onnx_int8 = use_int8
                self.multi_task_label_encoders = {}
                if os.path.exists(encoders_pkl):
                    try:
                        with open(encoders_pkl, "rb") as f:
                            self.multi_task_label_encoders = pickle.load(f)
                    except Exception:
                        self.multi_task_label_encoders = {}
                elif os.path.exists(encoders_json):
                    try:
                        with open(encoders_json, "r", encoding="utf-8") as f:
                            self.multi_task_label_encoders = json.load(f)
                    except Exception:
                        self.multi_task_label_encoders = {}
                self.models_loaded = True
                self.version = "4.0.0-onnx-int8" if use_int8 else "4.0.0-onnx"
                logger.info(f"Loaded optimized ONNX multi-task model (v{self.version})")
                return
            except Exception as e:
                logger.warning(f"Failed to load ONNX multi-task model: {e}")
        # First try a lightweight trained TF-IDF severity classifier
        severity_dir = os.path.join(nlp_dir, "severity_classifier")
        if os.path.exists(severity_dir):
            # Try HuggingFace format first (only if real HF weights/tokenizer exist)
            try:
                cfg = os.path.join(severity_dir, "config.json")
                has_pt = os.path.exists(
                    os.path.join(severity_dir, "pytorch_model.bin")
                ) or os.path.exists(os.path.join(severity_dir, "model.safetensors"))
                has_tok = os.path.exists(
                    os.path.join(severity_dir, "tokenizer.json")
                ) or os.path.exists(os.path.join(severity_dir, "vocab.txt"))
                if os.path.exists(cfg) and (has_pt or has_tok):
                    from transformers import AutoTokenizer, AutoModelForSequenceClassification
                    import torch

                    self.severity_tokenizer = AutoTokenizer.from_pretrained(severity_dir)
                    self.severity_model = AutoModelForSequenceClassification.from_pretrained(severity_dir)
                    self.severity_model.to(
                        torch.device("cuda" if torch.cuda.is_available() else "cpu")
                    )
                    try:
                        with open(os.path.join(severity_dir, "label_encoder.pkl"), "rb") as f:
                            self.severity_label_encoder = pickle.load(f)
                    except Exception:
                        self.severity_label_encoder = None
                    self.models_loaded = True
                    ver = os.getenv("NLP_SEVERITY_MODEL_VERSION")
                    if not ver:
                        try:
                            import json

                            cfg_path = os.path.join(severity_dir, "config.json")
                            if os.path.exists(cfg_path):
                                with open(cfg_path, "r", encoding="utf-8") as f:
                                    cfg = json.load(f)
                                ver = cfg.get("version")
                        except Exception:
                            ver = None
                    self.version = ver or "2.0.0-hf"
                    logger.info("Loaded HF severity classifier")
                    if not prefer_multitask:
                        return
            except Exception as e:
                logger.warning(f"Failed to load HF severity classifier: {e}")
            # Fallback to TF-IDF artifacts if present
            try:
                with open(os.path.join(severity_dir, "vectorizer.pkl"), "rb") as f:
                    self.severity_vectorizer = pickle.load(f)
                with open(os.path.join(severity_dir, "classifier.pkl"), "rb") as f:
                    self.severity_model = pickle.load(f)
                try:
                    with open(os.path.join(severity_dir, "label_encoder.pkl"), "rb") as f:
                        label_obj = pickle.load(f)
                    # Support either a dict-based mapping or a sklearn LabelEncoder
                    if isinstance(label_obj, dict) and "reverse_map" in label_obj:
                        self.severity_label_encoder = label_obj
                    elif hasattr(label_obj, "classes_"):
                        self.severity_label_encoder = label_obj
                    else:
                        self.severity_label_encoder = None
                except Exception:
                    self.severity_label_encoder = None
                self.models_loaded = True
                self.version = "1.0.0-trained-tfidf"
                logger.info("Loaded trained severity classifier (TF-IDF + RandomForest)")
                if not prefer_multitask:
                    return
            except Exception as e:
                logger.warning(f"Failed to load TF-IDF classifier: {e}")
        # Prefer ONNX multi-task only if explicitly preferred
        if (
            prefer_multitask
            and not disable_multitask
            and allow_transformers
            and os.path.exists(multitask_onnx_path)
        ):
            try:
                from transformers import AutoTokenizer
                import onnxruntime as ort
                import json

                self.multi_task_tokenizer = AutoTokenizer.from_pretrained(
                    "distilbert-base-uncased", local_files_only=True, use_fast=True
                )
                sess_options = ort.SessionOptions()
                sess_options.intra_op_num_threads = 1
                sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
                self.onnx_session = ort.InferenceSession(multitask_onnx_path, sess_options)
                self.multi_task_label_encoders = {}
                if os.path.exists(encoders_pkl):
                    try:
                        with open(encoders_pkl, "rb") as f:
                            self.multi_task_label_encoders = pickle.load(f)
                    except Exception:
                        self.multi_task_label_encoders = {}
                elif os.path.exists(encoders_json):
                    try:
                        with open(encoders_json, "r", encoding="utf-8") as f:
                            self.multi_task_label_encoders = json.load(f)
                    except Exception:
                        self.multi_task_label_encoders = {}
                self.models_loaded = True
                self.version = "4.0.0-onnx"
                logger.info(f"Loaded optimized ONNX multi-task model (v{self.version})")
                return
            except Exception as e:
                logger.warning(f"Failed to load ONNX multi-task model: {e}")
        # Fallback: PyTorch checkpoint (may be heavy)
        if (
            prefer_multitask
            and not disable_multitask
            and allow_transformers
            and os.path.exists(multitask_pt_path)
        ):
            try:
                import torch
                from transformers import DistilBertTokenizer

                # Load metadata from PyTorch checkpoint
                checkpoint = torch.load(multitask_pt_path, weights_only=False, map_location="cpu")
                self.multi_task_label_encoders = checkpoint["label_encoders"]
                try:
                    self.multi_task_tokenizer = DistilBertTokenizer.from_pretrained(
                        "distilbert-base-uncased", local_files_only=True
                    )
                except Exception:
                    raise RuntimeError("DistilBert tokenizer not available locally")
                # Fallback to PyTorch model
                from data.transformer_architecture import MultiTaskDistilBert

                num_labels_dict = checkpoint["num_labels_dict"]
                self.multi_task_model = MultiTaskDistilBert(num_labels_dict)
                self.multi_task_model.load_state_dict(checkpoint["model_state_dict"])
                self.multi_task_model.eval()
                self.models_loaded = True
                self.version = "4.0.0-synthetic-plus"
                logger.info(f"Loaded trained multi-task transformer model (v{self.version})")
                return
            except Exception as e:
                logger.warning(f"Failed to load multi-task transformer: {e}")
        # Try to load HF Issue Classifier
        issue_dir = os.path.join(nlp_dir, "issue_classifier")
        if allow_transformers and os.path.exists(issue_dir):
            try:
                if os.path.exists(os.path.join(issue_dir, "config.json")):
                    from transformers import AutoTokenizer, AutoModelForSequenceClassification
                    import torch

                    self.issue_tokenizer = AutoTokenizer.from_pretrained(issue_dir)
                    self.issue_model = AutoModelForSequenceClassification.from_pretrained(issue_dir)
                    self.issue_model.to(
                        torch.device("cuda" if torch.cuda.is_available() else "cpu")
                    )
                    try:
                        with open(os.path.join(issue_dir, "label_encoder.pkl"), "rb") as f:
                            self.issue_label_encoder = pickle.load(f)
                    except Exception:
                        self.issue_label_encoder = None
                    self.models_loaded = True
                    logger.info("Loaded HF issue classifier")
            except Exception as e:
                logger.warning(f"Failed to load HF issue classifier: {e}")
        # Legacy IndoBERT path support
        bert_dir = os.path.join(nlp_dir, "severity_bert")
        if allow_transformers and os.path.exists(bert_dir):
            try:
                from transformers import AutoTokenizer, AutoModelForSequenceClassification
                import torch

                device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
                self.severity_tokenizer = AutoTokenizer.from_pretrained(bert_dir)
                self.severity_model = AutoModelForSequenceClassification.from_pretrained(bert_dir)
                self.severity_model.to(device)
                self.severity_model.eval()
                with open(os.path.join(bert_dir, "label_encoder.pkl"), "rb") as f:
                    self.severity_label_encoder = pickle.load(f)
                self.models_loaded = True
                self.version = "1.0.0-trained-bert"
                logger.info("Loaded trained IndoBERT severity classifier")
                return
            except Exception as e:
                logger.warning(f"Failed to load IndoBERT classifier: {e}")
        try:
            summ_path = os.path.join(nlp_dir, "summarizer.pkl")
            if os.path.exists(summ_path):
                with open(summ_path, "rb") as f:
                    self.summarizer = pickle.load(f)
        except Exception as e:
            logger.warning(f"Failed to load summarizer: {e}")
        logger.info("No trained NLP models found, using rule-based fallback")
        self.models_loaded = False
    def _get_embedding_cache_key(self, text: str) -> str:
        return hashlib.md5(text.encode("utf-8")).hexdigest()

    def _get_cached_embedding(self, text: str) -> Optional[np.ndarray]:
        key = self._get_embedding_cache_key(text)
        if key in EMBEDDING_CACHE:
            embedding, expiry = EMBEDDING_CACHE[key]
            if time.time() < expiry:
                return embedding
            del EMBEDDING_CACHE[key]
        return None

    def _cache_embedding(self, text: str, embedding: np.ndarray):
        if len(EMBEDDING_CACHE) >= EMBEDDING_CACHE_MAX_SIZE:
            now = time.time()
            expired = [k for k, (v, e) in EMBEDDING_CACHE.items() if e < now]
            for k in expired:
                del EMBEDDING_CACHE[k]
            if len(EMBEDDING_CACHE) >= EMBEDDING_CACHE_MAX_SIZE:
                oldest_key = min(EMBEDDING_CACHE.keys(), key=lambda k: EMBEDDING_CACHE[k][1])
                del EMBEDDING_CACHE[oldest_key]
        key = self._get_embedding_cache_key(text)
        EMBEDDING_CACHE[key] = (embedding, time.time() + EMBEDDING_CACHE_TTL)
    def predict_multi_task_combined(
        self, texts: List[str], tasks: Optional[List[str]] = None
    ) -> List[Dict[str, Any]]:
        """
        Combined multi-task inference - runs all classification tasks in a single pass.
        Returns severity, issue_type, root_cause, and report_type in one call.
        """
        if tasks is None:
            tasks = ["severity", "issue_type", "root_cause", "report"]
        has_multitask = (
            hasattr(self, "onnx_session") and self.onnx_session is not None
        ) or (hasattr(self, "multi_task_model") and self.multi_task_model is not None)
        if has_multitask:
            bs = int(os.getenv("ONNX_BATCH", "32"))
            multitask_results = self._classify_with_multitask(texts, batch_size=bs)
            # Cheap rule-based severity for reinforcement without extra model cost
            rb_sev_list: List[Dict[str, Any]] = self._classify_severity_rule_based(texts)
            conf_thr = float(os.getenv("SEVERITY_CONF_THRESHOLD", "0.6"))
            enable_norm = os.getenv("ISSUE_LABEL_NORMALIZE", "1").lower() in {"1", "true", "yes"}
            results = []
            for idx, res in enumerate(multitask_results):
                combined = {}
                for task in tasks:
                    if task in res:
                        combined[task] = res[task]
                    else:
                        combined[task] = {"label": "Unknown", "confidence": 0.0}
                # Normalize issue/root_cause labels
                if enable_norm and "issue_type" in combined:
                    lbl = (combined["issue_type"].get("label") or "").strip()
                    norm_map = {
                        "Baggage": "Baggage Handling",
                        "Passenger": "Pax Handling",
                        "Pax": "Pax Handling",
                        "Cargo": "Cargo Problems",
                        "Cargo Problem": "Cargo Problems",
                    }
                    combined["issue_type"]["label"] = norm_map.get(lbl, lbl)
                if enable_norm and "root_cause" in combined:
                    lbl = (combined["root_cause"].get("label") or "").strip()
                    norm_map = {
                        "Doc": "Documentation",
                        "Document": "Documentation",
                    }
                    combined["root_cause"]["label"] = norm_map.get(lbl, lbl)
                # Reinforce severity with rule-based if confidence is low but signals are strong
                try:
                    rb = rb_sev_list[idx] if idx < len(rb_sev_list) else None
                    if rb:
                        rb_label = rb.get("severity")
                        rb_conf = rb.get("confidence", 0.6)
                        sev_pred = combined.get("severity", {})
                        sev_label = sev_pred.get("label")
                        sev_conf = float(sev_pred.get("confidence", 0.0))
                        kc = rb.get("keyword_counts", {})
                        high_hits = int(kc.get("high", 0)) + int(kc.get("critical", 0))
                        if sev_conf < conf_thr and rb_label in {"High", "Critical"}:
                            combined["severity"] = {
                                "label": rb_label,
                                "confidence": round(min(0.9, max(rb_conf, conf_thr)), 2),
                            }
                        elif str(sev_label).lower() in {"low", "0"} and high_hits >= 1:
                            combined["severity"] = {
                                "label": rb_label,
                                "confidence": round(max(rb_conf, sev_conf), 2),
                            }
                except Exception:
                    pass
                results.append(combined)
            return results
        results = []
        for text in texts:
            combined = {}
            if "severity" in tasks:
                sev = self.classify_severity([text])[0]
                combined["severity"] = sev
            if "issue_type" in tasks:
                issue = self.classify_issue_type([text])[0]
                combined["issue_type"] = issue
            if "root_cause" in tasks:
                rc = self.classify_root_cause([text])[0]
                combined["root_cause"] = rc
            if "report" in tasks:
                combined["report"] = {"label": "Unknown", "confidence": 0.5}
            results.append(combined)
        return results
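
    # Illustrative result shape from the multi-task path (a sketch; the exact labels
    # depend on which label encoders ship with the model artifacts):
    #
    #     svc.predict_multi_task_combined(["ULD damaged during loading"])
    #     # -> [{"severity": {"label": "High", "confidence": 0.82},
    #     #      "issue_type": {"label": "Cargo Problems", "confidence": 0.77},
    #     #      "root_cause": {"label": "Documentation", "confidence": 0.41},
    #     #      "report": {"label": "Irregularity", "confidence": 0.90}}]
    #
    # In the no-model fallback path the "severity" entry instead carries the
    # {"severity": ..., "confidence": ...} shape returned by classify_severity().
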
    def classify_severity(self, texts: List[str]) -> List[Dict[str, Any]]:
        """Classify severity of reports"""
        if getattr(self, "remote", None) and self.remote.enabled:
            r = self.remote.severity(texts)
            if r is not None and isinstance(r, list):
                out = []
                for item in r:
                    s = item.get("severity") or item.get("label") or "Medium"
                    c = item.get("confidence") or item.get("score") or 0.8
                    out.append({"severity": s, "confidence": c})
                return out
        # The multi-task model covers the 'report' category (Irregularity vs Complaint);
        # severity itself still goes through a dedicated severity model when one is
        # loaded, then TF-IDF, then the rule-based fallback below.
        # Check if a dedicated severity model (e.g. IndoBERT/HF) is loaded
        if (
            hasattr(self, "severity_model")
            and self.severity_model is not None
            and self.severity_tokenizer is not None
        ):
            return self._classify_with_model(
                texts,
                self.severity_model,
                self.severity_tokenizer,
                self.severity_label_encoder,
            )
        # Check if the TF-IDF model is loaded
        if hasattr(self, "severity_vectorizer") and self.severity_vectorizer is not None:
            return self._classify_with_tfidf(texts)
        return self._classify_severity_rule_based(texts)
    def _classify_with_tfidf(self, texts: List[str]) -> List[Dict[str, Any]]:
        """Use trained TF-IDF + classifier for classification"""
        results = []
        X = self.severity_vectorizer.transform(texts)
        predictions = self.severity_model.predict(X)
        probabilities = self.severity_model.predict_proba(X)
        for i, (pred, probs) in enumerate(zip(predictions, probabilities)):
            label = None
            try:
                if isinstance(self.severity_label_encoder, dict):
                    # Expect a mapping dict or reverse_map style
                    label_map = (
                        self.severity_label_encoder.get("label_map")
                        or self.severity_label_encoder.get("reverse_map")
                        or {}
                    )
                    if isinstance(pred, (int, np.integer)):
                        label = label_map.get(int(pred))
                elif hasattr(self.severity_label_encoder, "inverse_transform"):
                    label = self.severity_label_encoder.inverse_transform([pred])[0]
            except Exception:
                label = None
            if label is None:
                label = str(pred)
            confidence = float(max(probs))
            results.append(
                {
                    "severity": label,
                    "confidence": round(confidence, 2),
                }
            )
        return results
    def _classify_with_model(
        self, texts: List[str], model, tokenizer, label_encoder
    ) -> List[Dict]:
        """Use trained model for classification"""
        import torch

        device = next(model.parameters()).device
        results = []
        with torch.no_grad():
            for text in texts:
                inputs = tokenizer(
                    text,
                    padding=True,
                    truncation=True,
                    max_length=512,
                    return_tensors="pt",
                )
                if "token_type_ids" in inputs:
                    inputs.pop("token_type_ids")
                inputs = {k: v.to(device) for k, v in inputs.items()}
                outputs = model(**inputs)
                probs = torch.softmax(outputs.logits, dim=1)
                confidence, pred = torch.max(probs, dim=1)
                label = None
                try:
                    if label_encoder is not None:
                        label = label_encoder.inverse_transform([pred.item()])[0]
                    elif hasattr(model, "config") and hasattr(model.config, "id2label"):
                        id2label = getattr(model.config, "id2label", {})
                        label = id2label.get(str(int(pred.item()))) or id2label.get(
                            int(pred.item())
                        )
                except Exception:
                    label = None
                if label is None:
                    label = str(int(pred.item()))
                results.append(
                    {
                        "severity": label,
                        "confidence": confidence.item(),
                    }
                )
        return results
    def _classify_severity_rule_based(self, texts: List[str]) -> List[Dict[str, Any]]:
        """Enhanced rule-based severity classification with context-aware matching and calibrated confidence"""
        critical_keywords = [
            "emergency", "darurat", "critical", "kritis", "genting", "severe", "parah",
            "serius", "injury", "cedera", "luka", "accident", "kecelakaan", "safety issue",
            "masalah keselamatan", "fire", "kebakaran", "api", "explosion", "ledakan",
            "death", "kematian", "meninggal", "fatality", "fatal", "evacuation", "evakuasi",
            "collapse", "runtuh", "crash", "tabrakan",
        ]
        high_keywords = [
            "damage", "rusak", "kerusakan", "torn", "robek", "sobek", "broken", "pecah",
            "patah", "urgent", "mendesak", "segera", "lost", "hilang", "stolen", "dicuri",
            "theft", "pencurian", "security", "keamanan", "safety", "keselamatan",
            "spillage", "tumpahan", "contamination", "kontaminasi", "unsafe", "berbahaya",
            "risk", "risiko", "threat", "ancaman",
        ]
        medium_keywords = [
            "delay", "terlambat", "keterlambatan", "telat", "late", "wrong", "salah",
            "incorrect", "tidak benar", "keliru", "missing", "tidak ada", "error",
            "kesalahan", "galat", "fail", "gagal", "kegagalan", "problem", "masalah",
            "issue", "isu", "complaint", "keluhan", "reject", "tolak", "ditolak",
            "complain", "komplain", "keluh", "misload", "overload",
        ]
        low_keywords = [
            "minor", "kecil", "ringan", "small", "sedikit", "slight", "tipis", "normal",
            "biasa", "routine", "rutin", "scheduled", "terjadwal", "planned", "direncanakan",
        ]
        negation_patterns = [
            "no damage", "tidak rusak", "no injury", "tidak cedera", "not broken",
            "tidak pecah", "no accident", "tidak kecelakaan", "false alarm", "salah alarm",
            "near miss", "hampir terjadi", "almost", "nearly", "minor damage",
            "kerusakan kecil", "slight damage", "sedikit rusak", "no safety",
            "tidak keselamatan", "not urgent", "tidak mendesak", "no emergency",
            "tidak darurat", "potential only", "potensi saja",
        ]
        intensifier_patterns = [
            "very", "sangat", "extremely", "amat", "highly", "urgent", "mendesak",
            "segera", "immediately", "secepatnya", "asap", "serious", "serius",
            "major", "besar",
        ]
        deintensifier_patterns = [
            "slight", "sedikit", "minor", "kecil", "small", "partial", "sebagian",
            "temporary", "sementara", "minor", "ringan", "almost resolved", "hampir selesai",
        ]
        results = []
        for text in texts:
            text_lower = (text or "").lower()
            critical_count = sum(1 for kw in critical_keywords if kw in text_lower)
            high_count = sum(1 for kw in high_keywords if kw in text_lower)
            medium_count = sum(1 for kw in medium_keywords if kw in text_lower)
            low_count = sum(1 for kw in low_keywords if kw in text_lower)
            negation_count = sum(1 for neg in negation_patterns if neg in text_lower)
            intensifier_count = sum(1 for intens in intensifier_patterns if intens in text_lower)
            deintensifier_count = sum(1 for deint in deintensifier_patterns if deint in text_lower)
            base_severity = "Low"
            base_confidence = 0.5
            total_matches = critical_count + high_count + medium_count + low_count
            if critical_count >= 1:
                base_severity = "Critical"
                base_confidence = min(0.95, 0.85 + critical_count * 0.03)
            elif high_count >= 2:
                base_severity = "High"
                base_confidence = min(0.90, 0.80 + high_count * 0.03)
            elif high_count >= 1:
                base_severity = "High"
                base_confidence = 0.75
            elif medium_count >= 2:
                base_severity = "Medium"
                base_confidence = min(0.85, 0.70 + medium_count * 0.04)
            elif medium_count >= 1:
                base_severity = "Medium"
                base_confidence = 0.65
            elif low_count >= 1:
                base_severity = "Low"
                base_confidence = 0.70
            else:
                base_severity = "Low"
                base_confidence = 0.50
            if negation_count > 0 and base_severity in ["Critical", "High"]:
                severity_downgrade = {
                    "Critical": "Medium",
                    "High": "Medium",
                    "Medium": "Low",
                    "Low": "Low",
                }
                base_severity = severity_downgrade.get(base_severity, base_severity)
                base_confidence = max(0.50, base_confidence - 0.15 * negation_count)
            if intensifier_count > 0:
                base_confidence = min(0.95, base_confidence + 0.05 * intensifier_count)
                if base_severity == "Medium" and intensifier_count >= 2:
                    base_severity = "High"
                    base_confidence = min(0.85, base_confidence + 0.10)
            if deintensifier_count > 0:
                base_confidence = max(0.40, base_confidence - 0.05 * deintensifier_count)
            base_confidence = round(base_confidence, 2)
            results.append(
                {
                    "severity": base_severity,
                    "confidence": base_confidence,
                    "keyword_counts": {
                        "critical": critical_count,
                        "high": high_count,
                        "medium": medium_count,
                        "low": low_count,
                    },
                    "modifiers": {
                        "negations": negation_count,
                        "intensifiers": intensifier_count,
                        "deintensifiers": deintensifier_count,
                    },
                }
            )
        return results
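
    # Worked example of the rule-based scoring above (hypothetical report text):
    # "Urgent: baggage torn and contents lost" hits three high keywords
    # ("urgent", "torn", "lost"), so the base is High at min(0.90, 0.80 + 3*0.03) = 0.89;
    # "urgent" also counts as an intensifier, lifting confidence to
    # min(0.95, 0.89 + 0.05) = 0.94. No negation or deintensifier patterns apply.
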
    def classify_severity_with_tfidf_fallback(
        self, texts: List[str]
    ) -> List[Dict[str, Any]]:
        """Classify severity with TF-IDF similarity fallback for low-confidence cases"""
        results = self.classify_severity(texts)
        if not hasattr(self, "_severity_reference_texts"):
            self._init_severity_reference_texts()
        if not hasattr(self, "_severity_tfidf_vectorizer"):
            try:
                from sklearn.feature_extraction.text import TfidfVectorizer

                self._severity_tfidf_vectorizer = TfidfVectorizer(
                    max_features=1000, ngram_range=(1, 2), stop_words="english"
                )
                self._severity_tfidf_matrix = self._severity_tfidf_vectorizer.fit_transform(
                    self._severity_reference_texts["all"]
                )
            except Exception as e:
                logger.warning(f"Failed to initialize TF-IDF fallback: {e}")
                return results
        for i, (text, result) in enumerate(zip(texts, results)):
            if result.get("confidence", 0) < 0.6:
                try:
                    tfidf_fallback = self._tfidf_severity_fallback(text)
                    if tfidf_fallback and tfidf_fallback.get(
                        "confidence", 0
                    ) > result.get("confidence", 0):
                        result["severity"] = tfidf_fallback["severity"]
                        result["confidence"] = tfidf_fallback["confidence"]
                        result["method"] = "tfidf_fallback"
                except Exception:
                    pass
        return results
    def _init_severity_reference_texts(self):
        """Initialize reference texts for TF-IDF similarity"""
        self._severity_reference_texts = {
            "Critical": [
                "emergency situation requiring immediate evacuation",
                "critical safety issue fire explosion severe injury",
                "fatal accident death kecelakaan fatal kematian",
                "kebakaran darurat critical emergency parah severe",
                "explosion ledakan collapse runtuh crash tabrakan",
            ],
            "High": [
                "significant damage to equipment or property",
                "urgent security issue theft stolen dicuri pencurian",
                "broken equipment rusak pecah patah torn robek",
                "lost baggage hilang unsafe berbahaya risk risiko",
                "spillage contamination tumpahan kontaminasi",
            ],
            "Medium": [
                "delay in operations terlambat keterlambatan",
                "wrong incorrect salah keliru error kesalahan",
                "fail gagal kegagalan problem masalah issue",
                "complaint keluhan reject ditolak misload",
                "missing tidak ada galat mistake",
            ],
            "Low": [
                "minor issue kecil ringan small slight",
                "routine scheduled terjadwal normal biasa",
                "planned direncanakan routine rutin",
                "minor damage kerusakan kecil sedikit",
            ],
            "all": [],
        }
        for severity in ["Critical", "High", "Medium", "Low"]:
            self._severity_reference_texts["all"].extend(
                self._severity_reference_texts[severity]
            )
    def _tfidf_severity_fallback(self, text: str) -> Optional[Dict[str, Any]]:
        """Use TF-IDF similarity to find similar reference texts"""
        if not hasattr(self, "_severity_tfidf_vectorizer"):
            return None
        try:
            from sklearn.metrics.pairwise import cosine_similarity

            text_vector = self._severity_tfidf_vectorizer.transform([text])
            similarities = cosine_similarity(text_vector, self._severity_tfidf_matrix)[0]
            best_idx = similarities.argmax()
            best_similarity = similarities[best_idx]
            if best_similarity < 0.2:
                return None
            ref_texts = self._severity_reference_texts["all"]
            matched_text = ref_texts[best_idx]
            severity = "Medium"
            for sev in ["Critical", "High", "Medium", "Low"]:
                if matched_text in self._severity_reference_texts.get(sev, []):
                    severity = sev
                    break
                for ref in self._severity_reference_texts.get(sev, []):
                    if matched_text in ref or ref in matched_text:
                        severity = sev
                        break
            confidence = min(0.75, 0.4 + best_similarity * 0.35)
            return {
                "severity": severity,
                "confidence": round(confidence, 2),
                "similarity": round(best_similarity, 2),
            }
        except Exception:
            return None
    def classify_issue_type(self, texts: List[str]) -> List[Dict[str, Any]]:
        """Classify issue type using Multi-Task (issue_type) or legacy models"""
        has_multitask = hasattr(self, "onnx_session") or (
            hasattr(self, "multi_task_model") and self.multi_task_model is not None
        )
        if has_multitask and "issue_type" in self.multi_task_label_encoders:
            multi_results = self._classify_with_multitask(texts)
            return [
                res.get("issue_type", {"label": "Unknown", "confidence": 0.0})
                for res in multi_results
            ]
        if self.issue_model is not None and self.issue_tokenizer is not None:
            return self._classify_with_model(
                texts, self.issue_model, self.issue_tokenizer, self.issue_label_encoder
            )
        return [{"label": "Unknown", "confidence": 0.5} for _ in texts]

    def classify_root_cause(self, texts: List[str]) -> List[Dict[str, Any]]:
        """Classify root cause using Multi-Task head"""
        has_multitask = hasattr(self, "onnx_session") or (
            hasattr(self, "multi_task_model") and self.multi_task_model is not None
        )
        if has_multitask and "root_cause" in self.multi_task_label_encoders:
            multi_results = self._classify_with_multitask(texts)
            return [
                res.get("root_cause", {"label": "Unknown", "confidence": 0.0})
                for res in multi_results
            ]
        return [{"label": "Unknown", "confidence": 0.0} for _ in texts]
    def predict_multi_task(self, texts: List[str]) -> Optional[List[Dict[str, Any]]]:
        """Public method for multi-task predictions"""
        if getattr(self, "remote", None) and self.remote.enabled:
            res = self.remote.multitask_classify(texts)
            if res is not None:
                out = []
                for r in res:
                    o = {}
                    if "severity" in r:
                        sev = r.get("severity")
                        if isinstance(sev, dict):
                            o["severity"] = {
                                "label": sev.get("label") or sev.get("severity"),
                                "confidence": sev.get("confidence") or sev.get("score"),
                            }
                        else:
                            o["severity"] = {
                                "label": sev,
                                "confidence": r.get("severity_confidence") or r.get("confidence") or 0.8,
                            }
                    if "area" in r:
                        area = r.get("area")
                        if isinstance(area, dict):
                            o["area"] = {
                                "label": area.get("label"),
                                "confidence": area.get("confidence") or area.get("score"),
                            }
                        else:
                            o["area"] = {"label": area, "confidence": r.get("area_confidence") or 0.8}
                    if "issue_type" in r or "irregularity" in r:
                        it = r.get("issue_type") if "issue_type" in r else r.get("irregularity")
                        if isinstance(it, dict):
                            o["irregularity"] = {
                                "label": it.get("label"),
                                "confidence": it.get("confidence") or it.get("score"),
                            }
                        else:
                            o["irregularity"] = {
                                "label": it,
                                "confidence": r.get("issue_type_confidence") or 0.8,
                            }
                    if "root_cause" in r:
                        rc = r.get("root_cause")
                        if isinstance(rc, dict):
                            o["root_cause"] = {
                                "label": rc.get("label"),
                                "confidence": rc.get("confidence") or rc.get("score"),
                            }
                        else:
                            o["root_cause"] = {
                                "label": rc,
                                "confidence": r.get("root_cause_confidence") or 0.8,
                            }
                    out.append(o)
                return out
        has_multitask = hasattr(self, "onnx_session") or (
            hasattr(self, "multi_task_model") and self.multi_task_model is not None
        )
        if has_multitask:
            return self._classify_with_multitask(texts)
        return None
    def _classify_with_multitask(
        self, texts: List[str], batch_size: int = 16
    ) -> List[Dict[str, Any]]:
        """Inference using Multi-Task Transformer (ONNX or PyTorch)"""
        results = []
        # ONNX inference
        if hasattr(self, "onnx_session") and self.onnx_session is not None:
            output_names = [output.name for output in self.onnx_session.get_outputs()]
            temperature = float(os.getenv("ONNX_TEMPERATURE", "1.0"))
            if temperature <= 0:
                temperature = 1.0
            for i in range(0, len(texts), batch_size):
                chunk = texts[i : i + batch_size]
                # Tokenize
                max_len = int(os.getenv("ONNX_MAX_LEN", "192"))
                inputs = self.multi_task_tokenizer(
                    chunk, padding=True, truncation=True, max_length=max_len, return_tensors="np"
                )
                onnx_inputs = {
                    "input_ids": inputs["input_ids"].astype(np.int64),
                    "attention_mask": inputs["attention_mask"].astype(np.int64),
                }
                # Run inference
                onnx_outputs = self.onnx_session.run(None, onnx_inputs)
                # Process outputs (vectorized)
                chunk_results = [{} for _ in range(len(chunk))]
                for k, output_name in enumerate(output_names):
                    logits_batch = onnx_outputs[k]
                    if temperature != 1.0:
                        logits_batch = logits_batch / temperature
                    # Vectorized softmax
                    max_logits = np.max(logits_batch, axis=1, keepdims=True)
                    exp_logits = np.exp(logits_batch - max_logits)
                    probs_batch = exp_logits / np.sum(exp_logits, axis=1, keepdims=True)
                    pred_indices = np.argmax(probs_batch, axis=1)
                    confidences = np.max(probs_batch, axis=1)
                    if output_name in self.multi_task_label_encoders:
                        le = self.multi_task_label_encoders[output_name]
                        try:
                            labels = le.inverse_transform(pred_indices)
                        except Exception:
                            labels = pred_indices.astype(str)
                    else:
                        labels = pred_indices.astype(str)
                    for j in range(len(chunk)):
                        label_val = str(labels[j])
                        if output_name == "severity":
                            label_val = self._severity_index_to_label.get(label_val, label_val)
                        elif output_name == "issue_type":
                            if os.getenv("ISSUE_LABEL_NORMALIZE", "1").lower() in {"1", "true", "yes"}:
                                norm_map = {
                                    "Baggage": "Baggage Handling",
                                    "Passenger": "Pax Handling",
                                    "Pax": "Pax Handling",
                                    "Cargo": "Cargo Problems",
                                    "Cargo Problem": "Cargo Problems",
                                }
                                label_val = norm_map.get(label_val, label_val)
                        elif output_name == "root_cause":
                            if os.getenv("ISSUE_LABEL_NORMALIZE", "1").lower() in {"1", "true", "yes"}:
                                norm_map = {
                                    "Doc": "Documentation",
                                    "Document": "Documentation",
                                }
                                label_val = norm_map.get(label_val, label_val)
                        chunk_results[j][output_name] = {
                            "label": label_val,
                            "confidence": round(float(confidences[j]), 3),
                        }
                results.extend(chunk_results)
            return results
        # PyTorch inference
        import torch

        if not hasattr(self, "multi_task_model") or self.multi_task_model is None:
            return []
        device = next(self.multi_task_model.parameters()).device
        results = []
        # Process in chunks to balance speed and memory
        for i in range(0, len(texts), batch_size):
            chunk = texts[i : i + batch_size]
            with torch.no_grad():
                inputs = self.multi_task_tokenizer(
                    chunk,
                    padding=True,
                    truncation=True,
                    max_length=256,
                    return_tensors="pt",
                )
                inputs = {k: v.to(device) for k, v in inputs.items()}
                logits_dict = self.multi_task_model(**inputs)
            # Extract results for each item in the chunk
            for j in range(len(chunk)):
                prediction = {}
                for key, logits in logits_dict.items():
                    # Get probabilities for the j-th record in batch
                    probs = torch.softmax(logits[j : j + 1], dim=1)
                    confidence, pred = torch.max(probs, dim=1)
                    if key in self.multi_task_label_encoders:
                        le = self.multi_task_label_encoders[key]
                        label = le.inverse_transform([pred.item()])[0]
                        prediction[key] = {
                            "label": str(label),
                            "confidence": round(float(confidence.item()), 3),
                        }
                results.append(prediction)
        return results
    def summarize(self, text: str) -> Dict[str, Any]:
        """Generate summary of report"""
        if getattr(self, "remote", None) and self.remote.enabled:
            res = self.remote.summarize([text])
            if res and isinstance(res, list) and res:
                item = res[0]
                es = (
                    item.get("executive_summary")
                    or item.get("summary")
                    or item.get("executiveSummary")
                    or ""
                )
                kp = item.get("key_points") or item.get("keyPoints") or []
                return {"executiveSummary": es, "keyPoints": kp}
        if self.summarizer is not None:
            return self.summarizer.summarize(text)
        return self._simple_summarize(text)

    def _simple_summarize(self, text: str) -> Dict[str, Any]:
        """Simple extractive summarization fallback"""
        if not text or len(text) < 50:
            return {
                "executiveSummary": text[:200] + "..." if len(text) > 200 else text,
                "keyPoints": [],
            }
        important_keywords = [
            "damage", "torn", "broken", "cargo", "baggage",
            "passenger", "delay", "late", "error", "fail",
        ]
        sentences = text.replace("!", ".").replace("?", ".").split(".")
        sentences = [s.strip() for s in sentences if len(s.strip()) > 20]
        sentence_scores = []
        for sent in sentences:
            score = sum(1 for kw in important_keywords if kw in sent.lower())
            sentence_scores.append((sent, score))
        sentence_scores.sort(key=lambda x: x[1], reverse=True)
        top_sentences = [s[0] for s in sentence_scores[:3]]
        summary = ". ".join(top_sentences) + "." if top_sentences else text[:200]
        key_points = []
        if any(kw in text.lower() for kw in ["cargo", "uld", "kargo"]):
            key_points.append("Cargo-related issue")
        if any(kw in text.lower() for kw in ["baggage", "bag", "bagasi"]):
            key_points.append("Baggage handling issue")
        if any(kw in text.lower() for kw in ["passenger", "pax", "penumpang"]):
            key_points.append("Passenger service issue")
        if any(
            kw in text.lower()
            for kw in ["damage", "torn", "broken", "rusak", "pecah", "robek"]
        ):
            key_points.append("Physical damage reported")
        if any(kw in text.lower() for kw in ["delay", "terlambat", "telat"]):
            key_points.append("Delay reported")
        return {
            "executiveSummary": summary[:300] + "..." if len(summary) > 300 else summary,
            "keyPoints": key_points[:5],
        }
    def analyze_urgency(self, texts: List[str]) -> List[Dict[str, Any]]:
        if getattr(self, "remote", None) and self.remote.enabled:
            res = self.remote.urgency(texts)
            if res is not None and isinstance(res, list):
                out = []
                for r in res:
                    out.append(
                        {
                            "urgency_score": r.get("urgency_score") or r.get("score") or 0.0,
                            "sentiment": r.get("sentiment") or "Neutral",
                            "keywords": r.get("keywords") or [],
                        }
                    )
                return out
        critical = {
            "emergency": 1.2, "darurat": 1.2, "critical": 1.2, "kritis": 1.2,
            "severe": 1.1, "parah": 1.1, "injury": 1.2, "cedera": 1.2,
            "death": 1.3, "kematian": 1.3, "fire": 1.3, "kebakaran": 1.3,
            "explosion": 1.3, "ledakan": 1.3, "evacuate": 1.2, "evakuasi": 1.2,
            "safety": 1.1, "keselamatan": 1.1, "security": 1.0, "keamanan": 1.0,
            "accident": 1.2, "kecelakaan": 1.2, "hazard": 1.1, "dangerous": 1.1,
            "unsafe": 1.1,
        }
        high = {
            "damage": 1.0, "rusak": 1.0, "broken": 1.0, "pecah": 1.0, "patah": 1.0,
            "torn": 0.9, "robek": 0.9, "spillage": 0.9, "bocor": 0.9,
            "lost": 0.8, "hilang": 0.8, "stolen": 0.9, "dicuri": 0.9,
            "theft": 0.9, "pencurian": 0.9, "unsafe": 1.0, "berbahaya": 1.0,
            "major": 1.0, "significant": 0.9,
        }
        medium = {
            "delay": 0.5, "terlambat": 0.5, "telat": 0.5, "late": 0.5,
            "misload": 0.6, "salah muat": 0.6, "wrong": 0.45, "incorrect": 0.45,
            "keliru": 0.45, "missing": 0.45, "tidak ada": 0.45, "error": 0.45,
            "kesalahan": 0.45, "fail": 0.45, "gagal": 0.45, "complaint": 0.45,
            "keluhan": 0.45, "complain": 0.45, "komplain": 0.45,
        }
        low = {
            "minor": 0.25, "kecil": 0.25, "ringan": 0.25, "small": 0.25,
            "slight": 0.25, "normal": 0.15, "rutin": 0.15, "routine": 0.15,
        }
        intensifiers = {
            "very", "sangat", "extremely", "sangatlah", "urgent", "mendesak", "segera",
            "immediately", "secepatnya", "asap", "as soon as possible", "right now",
        }
        deintensifiers = {
            "slight", "sedikit", "minor", "low", "ringan", "slightly", "partially", "temporary",
        }
        negations = {
            "no", "not", "tidak", "bukan", "tanpa", "false alarm", "no damage",
            "no injury", "not urgent", "no issue", "no problem",
        }
        results = []
        for text in texts:
            tl = (text or "").lower()
            found = set()
            score = 0.0
            for kw, w in critical.items():
                if kw in tl:
                    found.add(kw)
                    score += w
            for kw, w in high.items():
                if kw in tl:
                    found.add(kw)
                    score += w
            for kw, w in medium.items():
                if kw in tl:
                    found.add(kw)
                    score += w
            for kw, w in low.items():
                if kw in tl:
                    found.add(kw)
                    score += w
            intens_count = sum(1 for t in intensifiers if t in tl)
            if intens_count > 0 and score > 0:
                score *= min(1.35, 1.0 + self.urgency_intensifier_scale * intens_count)
            deint_count = sum(1 for t in deintensifiers if t in tl)
            if deint_count > 0:
                score *= max(0.5, 1.0 - self.urgency_deintensifier_scale * deint_count)
            if any(n in tl for n in negations) and any(
                k in tl
                for k in [
                    "damage", "rusak", "broken", "pecah",
                    "injury", "cedera", "fire", "kebakaran",
                ]
            ):
                score *= self.urgency_negation_multiplier
            excl_bonus = min(0.08, tl.count("!") * 0.02)
            caps_ratio = 0.0
            letters = [c for c in text if c.isalpha()]
            if letters:
                caps_ratio = sum(1 for c in letters if c.isupper()) / len(letters)
            caps_bonus = 0.08 if caps_ratio > 0.35 else 0.0
            score += excl_bonus + caps_bonus
            norm = min(1.0, score / self.urgency_norm_denom)
            if norm >= self.urgency_critical_threshold:
                sentiment = "Critical Negative"
            elif norm >= self.urgency_negative_threshold:
                sentiment = "Negative"
            elif norm >= self.urgency_somewhat_threshold:
                sentiment = "Somewhat Negative"
            else:
                sentiment = "Neutral"
            results.append(
                {
                    "urgency_score": round(norm, 2),
                    "sentiment": sentiment,
                    "keywords": sorted(list(found))[:15],
                }
            )
        return results
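

# Minimal smoke test (a sketch, not part of the service API): assumes no model
# artifacts or remote backend are configured, so the rule-based classifiers answer,
# and that the module is run with `python -m <package>.nlp_model_service` so the
# relative import of remote_client resolves. The report strings are hypothetical.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    svc = get_nlp_service()
    samples = [
        "Urgent: baggage torn and contents lost at arrival belt",
        "Routine check, minor delay on cargo documentation",
    ]
    for sev, urg in zip(svc.classify_severity(samples), svc.analyze_urgency(samples)):
        # Each severity dict carries "severity"/"confidence" (plus keyword counts in
        # the rule-based path); urgency carries "urgency_score", "sentiment", "keywords".
        print(sev, urg)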