Spaces:
Sleeping
Sleeping
| """ | |
| NLP Model Service for Gapura AI | |
| Loads and uses trained NLP models for severity classification, issue type classification, etc. | |
| Falls back to rule-based logic if models are not available. | |
| """ | |
| import os | |
| import logging | |
| import pickle | |
| from typing import List, Dict, Any, Optional | |
| import numpy as np | |
| import inspect | |
# Module-level logger for this service.
logger = logging.getLogger(__name__)

# Process-wide NLPModelService instance, created lazily on first request.
_nlp_instance = None


def get_nlp_service():
    """Return the shared NLPModelService, creating it on first call."""
    global _nlp_instance
    if _nlp_instance is None:
        _nlp_instance = NLPModelService()
    return _nlp_instance
| class NLPModelService: | |
| """Service for NLP predictions using trained models or rule-based fallback""" | |
    def __init__(self) -> None:
        """Initialize all model slots empty, then attempt to load trained models."""
        # Dedicated severity classifier: either a HF transformer
        # (model + tokenizer) or a pickled TF-IDF pipeline (vectorizer + model).
        self.severity_model = None
        self.severity_tokenizer = None
        self.severity_label_encoder = None
        self.severity_vectorizer = None
        self.severity_classifier = None  # For TF-IDF classifier
        # Legacy dedicated issue-type classifier (optional).
        self.issue_model = None
        self.issue_tokenizer = None
        self.issue_label_encoder = None
        # Optional abstractive summarizer; None -> extractive fallback.
        self.summarizer = None
        # Overwritten by _load_models when a trained model loads successfully.
        self.version = "1.0.0-rule-based"
        self.models_loaded = False
        self._load_models()
    def _load_models(self) -> None:
        """Load the best available trained NLP models, in order of preference.

        Attempt order:
          1. Fine-tuned ``severity_bert`` (config + safetensors + tokenizer).
          2. ``severity_classifier`` as a HuggingFace sequence classifier.
          3. ``severity_classifier`` as pickled TF-IDF vectorizer + classifier.
          4. Multi-task transformer, ONNX then PyTorch — only when
             NLP_PREFER_MULTITASK is set and transformers are allowed.
          5. IndoBERT severity classifier (gated by NLP_ALLOW_TRANSFORMERS).

        If nothing loads, ``models_loaded`` stays False and the public
        classify methods fall back to rule-based logic.
        """
        # Model artifacts live in <package parent>/models.
        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        nlp_dir = os.path.join(base_dir, "models", "nlp")
        multitask_onnx_path = os.path.join(
            base_dir, "models", "multi_task_transformer.onnx"
        )
        multitask_pt_path = os.path.join(
            base_dir, "models", "multi_task_transformer.pt"
        )
        encoders_pkl = os.path.join(base_dir, "models", "multi_task_label_encoders.pkl")
        encoders_json = os.path.join(
            base_dir, "models", "multi_task_label_encoders.json"
        )
        # Environment feature flags; all default to off when unset.
        disable_multitask = os.getenv("NLP_DISABLE_MULTITASK", "").lower() in {
            "1",
            "true",
            "yes",
        }
        allow_transformers = os.getenv("NLP_ALLOW_TRANSFORMERS", "").lower() in {
            "1",
            "true",
            "yes",
        }
        prefer_multitask = os.getenv("NLP_PREFER_MULTITASK", "").lower() in {
            "1",
            "true",
            "yes",
        }
        # Try severity_bert (fine-tuned) first, then severity_classifier
        severity_bert_dir = os.path.join(nlp_dir, "severity_bert")
        severity_classifier_dir = os.path.join(nlp_dir, "severity_classifier")
        # Try fine-tuned severity_bert first
        if os.path.exists(severity_bert_dir):
            try:
                cfg = os.path.join(severity_bert_dir, "config.json")
                has_model = os.path.exists(
                    os.path.join(severity_bert_dir, "model.safetensors")
                )
                has_tok = os.path.exists(
                    os.path.join(severity_bert_dir, "tokenizer.json")
                )
                if os.path.exists(cfg) and has_model and has_tok:
                    # Lazy imports: environments without transformers/torch
                    # can still run the rule-based fallback.
                    from transformers import (
                        AutoTokenizer,
                        AutoModelForSequenceClassification,
                    )
                    import torch

                    logger.info("Loading fine-tuned severity_bert model...")
                    self.severity_tokenizer = AutoTokenizer.from_pretrained(
                        severity_bert_dir
                    )
                    self.severity_model = (
                        AutoModelForSequenceClassification.from_pretrained(
                            severity_bert_dir
                        )
                    )
                    self.severity_model.to(
                        torch.device("cuda" if torch.cuda.is_available() else "cpu")
                    )
                    self.severity_model.eval()
                    try:
                        # NOTE(review): pickle.load on a repo-local artifact —
                        # safe only if the models directory is trusted.
                        with open(
                            os.path.join(severity_bert_dir, "label_encoder.pkl"), "rb"
                        ) as f:
                            self.severity_label_encoder = pickle.load(f)
                    except Exception:
                        # Default label mapping when no encoder was shipped.
                        self.severity_label_encoder = {
                            "Low": 0,
                            "Medium": 1,
                            "High": 2,
                            "Critical": 3,
                        }
                    self.models_loaded = True
                    self.version = "3.0.0-severity-bert"
                    logger.info(
                        f"Loaded fine-tuned severity_bert model (v{self.version})"
                    )
                    # Continue only if the multi-task model should take
                    # precedence; otherwise this loader is done.
                    if not prefer_multitask:
                        return
            except Exception as e:
                logger.warning(f"Failed to load severity_bert: {e}")
        # Fallback to severity_classifier (TF-IDF or HF)
        if not self.models_loaded and os.path.exists(severity_classifier_dir):
            try:
                cfg = os.path.join(severity_classifier_dir, "config.json")
                has_pt = os.path.exists(
                    os.path.join(severity_classifier_dir, "pytorch_model.bin")
                ) or os.path.exists(
                    os.path.join(severity_classifier_dir, "model.safetensors")
                )
                has_tok = os.path.exists(
                    os.path.join(severity_classifier_dir, "tokenizer.json")
                ) or os.path.exists(os.path.join(severity_classifier_dir, "vocab.txt"))
                if os.path.exists(cfg) and (has_pt or has_tok):
                    from transformers import (
                        AutoTokenizer,
                        AutoModelForSequenceClassification,
                    )
                    import torch

                    self.severity_tokenizer = AutoTokenizer.from_pretrained(
                        severity_classifier_dir
                    )
                    self.severity_model = (
                        AutoModelForSequenceClassification.from_pretrained(
                            severity_classifier_dir
                        )
                    )
                    self.severity_model.to(
                        torch.device("cuda" if torch.cuda.is_available() else "cpu")
                    )
                    try:
                        with open(
                            os.path.join(severity_classifier_dir, "label_encoder.pkl"),
                            "rb",
                        ) as f:
                            self.severity_label_encoder = pickle.load(f)
                    except Exception:
                        # _classify_with_model then falls back to the model
                        # config's id2label mapping.
                        self.severity_label_encoder = None
                    self.models_loaded = True
                    self.version = "2.0.0-hf"
                    logger.info("Loaded HF severity classifier")
                    if not prefer_multitask:
                        return
            except Exception as e:
                logger.warning(f"Failed to load HF severity classifier: {e}")
            # The same directory may instead contain a pickled TF-IDF pipeline.
            if self.severity_model is None and self.severity_vectorizer is None:
                try:
                    with open(
                        os.path.join(severity_classifier_dir, "vectorizer.pkl"), "rb"
                    ) as f:
                        self.severity_vectorizer = pickle.load(f)
                    with open(
                        os.path.join(severity_classifier_dir, "classifier.pkl"), "rb"
                    ) as f:
                        self.severity_model = pickle.load(f)
                    try:
                        with open(
                            os.path.join(severity_classifier_dir, "label_encoder.pkl"),
                            "rb",
                        ) as f:
                            label_data = pickle.load(f)
                        # reverse_map: class index -> label name.
                        self.severity_label_encoder = label_data.get(
                            "reverse_map",
                            {0: "Critical", 1: "High", 2: "Medium", 3: "Low"},
                        )
                    except Exception:
                        self.severity_label_encoder = None
                    self.models_loaded = True
                    self.version = "1.0.0-trained-tfidf"
                    logger.info(
                        "Loaded trained severity classifier (TF-IDF + RandomForest)"
                    )
                except Exception as e:
                    logger.warning(f"Failed to load TF-IDF classifier: {e}")
        # Prefer ONNX multi-task only if explicitly preferred
        if (
            prefer_multitask
            and not disable_multitask
            and allow_transformers
            and os.path.exists(multitask_onnx_path)
        ):
            try:
                from transformers import DistilBertTokenizer, AutoTokenizer
                import onnxruntime as ort
                import json

                # Prefer a cached local tokenizer; fall back to a download.
                try:
                    self.multi_task_tokenizer = DistilBertTokenizer.from_pretrained(
                        "distilbert-base-uncased", local_files_only=True
                    )
                except Exception:
                    self.multi_task_tokenizer = AutoTokenizer.from_pretrained(
                        "distilbert-base-uncased"
                    )
                # Single-threaded session keeps CPU usage predictable in a
                # multi-worker service.
                sess_options = ort.SessionOptions()
                sess_options.intra_op_num_threads = 1
                sess_options.graph_optimization_level = (
                    ort.GraphOptimizationLevel.ORT_ENABLE_ALL
                )
                self.onnx_session = ort.InferenceSession(
                    multitask_onnx_path, sess_options
                )
                # Label encoders may ship as pickle or JSON; missing/broken
                # files degrade to raw class indices at prediction time.
                self.multi_task_label_encoders = {}
                if os.path.exists(encoders_pkl):
                    try:
                        with open(encoders_pkl, "rb") as f:
                            self.multi_task_label_encoders = pickle.load(f)
                    except Exception:
                        self.multi_task_label_encoders = {}
                elif os.path.exists(encoders_json):
                    try:
                        with open(encoders_json, "r", encoding="utf-8") as f:
                            self.multi_task_label_encoders = json.load(f)
                    except Exception:
                        self.multi_task_label_encoders = {}
                self.models_loaded = True
                self.version = "4.0.0-onnx"
                logger.info(f"Loaded optimized ONNX multi-task model (v{self.version})")
                return
            except Exception as e:
                logger.warning(f"Failed to load ONNX multi-task model: {e}")
        # Fallback: PyTorch checkpoint (may be heavy)
        if (
            prefer_multitask
            and not disable_multitask
            and allow_transformers
            and os.path.exists(multitask_pt_path)
        ):
            try:
                import torch
                from transformers import DistilBertTokenizer

                # Load metadata from PyTorch checkpoint.
                # NOTE(review): weights_only=False unpickles arbitrary objects;
                # only load checkpoints from a trusted source.
                checkpoint = torch.load(
                    multitask_pt_path, weights_only=False, map_location="cpu"
                )
                self.multi_task_label_encoders = checkpoint["label_encoders"]
                try:
                    self.multi_task_tokenizer = DistilBertTokenizer.from_pretrained(
                        "distilbert-base-uncased", local_files_only=True
                    )
                except Exception:
                    raise RuntimeError("DistilBert tokenizer not available locally")
                # Fallback to PyTorch model
                from data.transformer_architecture import MultiTaskDistilBert

                num_labels_dict = checkpoint["num_labels_dict"]
                self.multi_task_model = MultiTaskDistilBert(num_labels_dict)
                self.multi_task_model.load_state_dict(checkpoint["model_state_dict"])
                self.multi_task_model.eval()
                self.models_loaded = True
                self.version = "4.0.0-synthetic-plus"
                logger.info(
                    f"Loaded trained multi-task transformer model (v{self.version})"
                )
                return
            except Exception as e:
                logger.warning(f"Failed to load multi-task transformer: {e}")
        # Try to load IndoBERT classifier (slower but more accurate).
        # NOTE(review): same directory as the earlier severity_bert attempt,
        # but this path is gated by NLP_ALLOW_TRANSFORMERS and requires no
        # specific artifact layout.
        bert_dir = os.path.join(nlp_dir, "severity_bert")
        if allow_transformers and os.path.exists(bert_dir):
            try:
                from transformers import (
                    AutoTokenizer,
                    AutoModelForSequenceClassification,
                )
                import torch

                device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
                self.severity_tokenizer = AutoTokenizer.from_pretrained(bert_dir)
                self.severity_model = (
                    AutoModelForSequenceClassification.from_pretrained(bert_dir)
                )
                self.severity_model.to(device)
                self.severity_model.eval()
                with open(os.path.join(bert_dir, "label_encoder.pkl"), "rb") as f:
                    self.severity_label_encoder = pickle.load(f)
                self.models_loaded = True
                self.version = "1.0.0-trained-bert"
                logger.info("Loaded trained IndoBERT severity classifier")
                return
            except Exception as e:
                logger.warning(f"Failed to load IndoBERT classifier: {e}")
        # If any model has been loaded successfully, keep it and skip fallback
        if self.models_loaded:
            logger.info("Using loaded NLP models")
            return
        else:
            logger.info("No trained NLP models found, using rule-based fallback")
            self.models_loaded = False
| def classify_severity(self, texts: List[str]) -> List[Dict[str, Any]]: | |
| """Classify severity of reports""" | |
| # Multi-task model provides 'report' category (Irregularity vs Complaint) | |
| if hasattr(self, "multi_task_model") and self.multi_task_model is not None: | |
| # For severity, we still might want to use the dedicated severity model if it exists | |
| # but here we can at least show we have the multi-task data. | |
| # Actually, the user request was to upgrade to multi-task for ALL targets. | |
| # If severity is NOT one of the multi-task targets, we use the fallback or dedicated model. | |
| pass | |
| # Check if IndoBERT model is loaded (dedicated severity) | |
| if ( | |
| hasattr(self, "severity_model") | |
| and self.severity_model is not None | |
| and self.severity_tokenizer is not None | |
| ): | |
| return self._classify_with_model( | |
| texts, | |
| self.severity_model, | |
| self.severity_tokenizer, | |
| self.severity_label_encoder, | |
| ) | |
| # Check if TF-IDF model is loaded | |
| if ( | |
| hasattr(self, "severity_vectorizer") | |
| and self.severity_vectorizer is not None | |
| ): | |
| return self._classify_with_tfidf(texts) | |
| return self._classify_severity_rule_based(texts) | |
| def _classify_with_tfidf(self, texts: List[str]) -> List[Dict[str, Any]]: | |
| """Use trained TF-IDF + classifier for classification""" | |
| results = [] | |
| X = self.severity_vectorizer.transform(texts) | |
| predictions = self.severity_model.predict(X) | |
| probabilities = self.severity_model.predict_proba(X) | |
| for i, (pred, probs) in enumerate(zip(predictions, probabilities)): | |
| if isinstance(self.severity_label_encoder, dict): | |
| label_map = self.severity_label_encoder.get("label_map", {}) | |
| if isinstance(pred, (int, np.integer)): | |
| label = label_map.get(pred, "Low") | |
| else: | |
| label = str(pred) | |
| else: | |
| label = str(pred) | |
| confidence = float(max(probs)) | |
| results.append( | |
| { | |
| "severity": label, | |
| "confidence": round(confidence, 2), | |
| } | |
| ) | |
| return results | |
    def _classify_with_model(
        self, texts: List[str], model, tokenizer, label_encoder
    ) -> List[Dict]:
        """Run per-text inference with a HF sequence-classification model.

        Returns one dict per text with ``severity`` (decoded label) and
        ``confidence`` (softmax probability of the argmax class). Texts are
        processed one at a time and inputs are moved to the model's device.
        """
        import torch

        # Infer the device from the model's own parameters.
        device = next(model.parameters()).device
        results = []
        with torch.no_grad():
            for text in texts:
                inputs = tokenizer(
                    text,
                    padding=True,
                    truncation=True,
                    max_length=512,
                    return_tensors="pt",
                )
                inputs = {k: v.to(device) for k, v in inputs.items()}
                # Drop tokenizer outputs the model's forward() does not
                # accept (e.g. token_type_ids for DistilBERT-style models).
                sig = inspect.signature(model.forward)
                allowed = set(sig.parameters.keys())
                filtered_inputs = {k: v for k, v in inputs.items() if k in allowed}
                outputs = model(**filtered_inputs)
                probs = torch.softmax(outputs.logits, dim=1)
                confidence, pred = torch.max(probs, dim=1)
                # Decode the class index: sklearn-style encoder first, then
                # the model config's id2label, then the raw index as string.
                if label_encoder is not None and hasattr(
                    label_encoder, "inverse_transform"
                ):
                    label = label_encoder.inverse_transform([pred.item()])[0]
                elif hasattr(model, "config") and hasattr(model.config, "id2label"):
                    label = model.config.id2label.get(pred.item(), str(pred.item()))
                else:
                    label = str(pred.item())
                results.append(
                    {
                        "severity": label,
                        "confidence": confidence.item(),
                    }
                )
        return results
| def _classify_severity_rule_based(self, texts: List[str]) -> List[Dict[str, Any]]: | |
| """Rule-based severity classification fallback with bilingual keywords""" | |
| critical_keywords = [ | |
| "emergency", | |
| "darurat", | |
| "critical", | |
| "kritis", | |
| "genting", | |
| "severe", | |
| "parah", | |
| "serius", | |
| "injury", | |
| "cedera", | |
| "luka", | |
| "accident", | |
| "kecelakaan", | |
| "safety issue", | |
| "masalah keselamatan", | |
| "fire", | |
| "kebakaran", | |
| "api", | |
| "explosion", | |
| "ledakan", | |
| "death", | |
| "kematian", | |
| "meninggal", | |
| ] | |
| high_keywords = [ | |
| "damage", | |
| "rusak", | |
| "kerusakan", | |
| "torn", | |
| "robek", | |
| "sobek", | |
| "broken", | |
| "pecah", | |
| "patah", | |
| "urgent", | |
| "mendesak", | |
| "segera", | |
| "lost", | |
| "hilang", | |
| "stolen", | |
| "dicuri", | |
| "theft", | |
| "pencurian", | |
| "security", | |
| "keamanan", | |
| "safety", | |
| "keselamatan", | |
| ] | |
| medium_keywords = [ | |
| "delay", | |
| "terlambat", | |
| "keterlambatan", | |
| "telat", | |
| "late", | |
| "terlambat", | |
| "wrong", | |
| "salah", | |
| "incorrect", | |
| "tidak benar", | |
| "keliru", | |
| "missing", | |
| "hilang", | |
| "tidak ada", | |
| "error", | |
| "kesalahan", | |
| "galat", | |
| "fail", | |
| "gagal", | |
| "kegagalan", | |
| "problem", | |
| "masalah", | |
| "issue", | |
| "isu", | |
| "complaint", | |
| "keluhan", | |
| "reject", | |
| "tolak", | |
| "ditolak", | |
| "complain", | |
| "komplain", | |
| "keluh", | |
| ] | |
| low_keywords = [ | |
| "minor", | |
| "kecil", | |
| "ringan", | |
| "small", | |
| "sedikit", | |
| "slight", | |
| "tipis", | |
| "normal", | |
| "biasa", | |
| "routine", | |
| "rutin", | |
| ] | |
| results = [] | |
| for text in texts: | |
| text_lower = text.lower() | |
| critical_count = sum(1 for kw in critical_keywords if kw in text_lower) | |
| high_count = sum(1 for kw in high_keywords if kw in text_lower) | |
| medium_count = sum(1 for kw in medium_keywords if kw in text_lower) | |
| low_count = sum(1 for kw in low_keywords if kw in text_lower) | |
| if critical_count >= 1: | |
| severity = "Critical" | |
| confidence = 0.92 | |
| elif high_count >= 2: | |
| severity = "High" | |
| confidence = 0.88 | |
| elif high_count >= 1: | |
| severity = "High" | |
| confidence = 0.82 | |
| elif medium_count >= 2: | |
| severity = "Medium" | |
| confidence = 0.78 | |
| elif medium_count >= 1: | |
| severity = "Medium" | |
| confidence = 0.72 | |
| elif low_count >= 1: | |
| severity = "Low" | |
| confidence = 0.85 | |
| else: | |
| severity = "Low" | |
| confidence = 0.80 | |
| results.append( | |
| { | |
| "severity": severity, | |
| "confidence": confidence, | |
| "keyword_counts": { | |
| "critical": critical_count, | |
| "high": high_count, | |
| "medium": medium_count, | |
| "low": low_count, | |
| }, | |
| } | |
| ) | |
| return results | |
| def classify_issue_type(self, texts: List[str]) -> List[Dict[str, Any]]: | |
| """Classify issue type using Multi-Task (issue_type) or legacy models""" | |
| has_multitask = hasattr(self, "onnx_session") or ( | |
| hasattr(self, "multi_task_model") and self.multi_task_model is not None | |
| ) | |
| if has_multitask and "issue_type" in self.multi_task_label_encoders: | |
| multi_results = self._classify_with_multitask(texts) | |
| return [ | |
| res.get("issue_type", {"label": "Unknown", "confidence": 0.0}) | |
| for res in multi_results | |
| ] | |
| if self.issue_model is not None and self.issue_tokenizer is not None: | |
| return self._classify_with_model( | |
| texts, self.issue_model, self.issue_tokenizer, self.issue_label_encoder | |
| ) | |
| return [{"label": "Unknown", "confidence": 0.5} for _ in texts] | |
| def classify_root_cause(self, texts: List[str]) -> List[Dict[str, Any]]: | |
| """Classify root cause using Multi-Task head""" | |
| has_multitask = hasattr(self, "onnx_session") or ( | |
| hasattr(self, "multi_task_model") and self.multi_task_model is not None | |
| ) | |
| if has_multitask and "root_cause" in self.multi_task_label_encoders: | |
| multi_results = self._classify_with_multitask(texts) | |
| return [ | |
| res.get("root_cause", {"label": "Unknown", "confidence": 0.0}) | |
| for res in multi_results | |
| ] | |
| return [{"label": "Unknown", "confidence": 0.0} for _ in texts] | |
| def predict_multi_task(self, texts: List[str]) -> Optional[List[Dict[str, Any]]]: | |
| """Public method for multi-task predictions""" | |
| has_multitask = hasattr(self, "onnx_session") or ( | |
| hasattr(self, "multi_task_model") and self.multi_task_model is not None | |
| ) | |
| if has_multitask: | |
| return self._classify_with_multitask(texts) | |
| return None | |
| def _classify_with_multitask( | |
| self, texts: List[str], batch_size: int = 16 | |
| ) -> List[Dict[str, Any]]: | |
| """Inference using Multi-Task Transformer (ONNX or PyTorch)""" | |
| results = [] | |
| # ONNX Inference | |
| if hasattr(self, "onnx_session") and self.onnx_session is not None: | |
| import numpy as np | |
| output_names = [output.name for output in self.onnx_session.get_outputs()] | |
| for i in range(0, len(texts), batch_size): | |
| chunk = texts[i : i + batch_size] | |
| # Tokenize | |
| inputs = self.multi_task_tokenizer( | |
| chunk, | |
| padding=True, | |
| truncation=True, | |
| max_length=256, | |
| return_tensors="np", | |
| ) | |
| onnx_inputs = { | |
| "input_ids": inputs["input_ids"].astype(np.int64), | |
| "attention_mask": inputs["attention_mask"].astype(np.int64), | |
| } | |
| # Run inference | |
| onnx_outputs = self.onnx_session.run(None, onnx_inputs) | |
| # Process outputs (Vectorized) | |
| chunk_results = [{} for _ in range(len(chunk))] | |
| for k, output_name in enumerate(output_names): | |
| logits_batch = onnx_outputs[k] | |
| # Vectorized Softmax | |
| max_logits = np.max(logits_batch, axis=1, keepdims=True) | |
| exp_logits = np.exp(logits_batch - max_logits) | |
| probs_batch = exp_logits / np.sum(exp_logits, axis=1, keepdims=True) | |
| pred_indices = np.argmax(probs_batch, axis=1) | |
| confidences = np.max(probs_batch, axis=1) | |
| if output_name in self.multi_task_label_encoders: | |
| le = self.multi_task_label_encoders[output_name] | |
| try: | |
| labels = le.inverse_transform(pred_indices) | |
| except: | |
| labels = pred_indices.astype(str) | |
| else: | |
| labels = pred_indices.astype(str) | |
| for j in range(len(chunk)): | |
| chunk_results[j][output_name] = { | |
| "label": str(labels[j]), | |
| "confidence": round(float(confidences[j]), 3), | |
| } | |
| results.extend(chunk_results) | |
| return results | |
| # PyTorch Inference | |
| import torch | |
| if not hasattr(self, "multi_task_model") or self.multi_task_model is None: | |
| return [] | |
| device = next(self.multi_task_model.parameters()).device | |
| results = [] | |
| # Process in chunks to balance speed and memory | |
| for i in range(0, len(texts), batch_size): | |
| chunk = texts[i : i + batch_size] | |
| with torch.no_grad(): | |
| inputs = self.multi_task_tokenizer( | |
| chunk, | |
| padding=True, | |
| truncation=True, | |
| max_length=256, | |
| return_tensors="pt", | |
| ) | |
| inputs = {k: v.to(device) for k, v in inputs.items()} | |
| logits_dict = self.multi_task_model(**inputs) | |
| # Extract results for each item in the chunk | |
| for j in range(len(chunk)): | |
| prediction = {} | |
| for key, logits in logits_dict.items(): | |
| # Get probabilities for the j-th record in batch | |
| probs = torch.softmax(logits[j : j + 1], dim=1) | |
| confidence, pred = torch.max(probs, dim=1) | |
| if key in self.multi_task_label_encoders: | |
| le = self.multi_task_label_encoders[key] | |
| label = le.inverse_transform([pred.item()])[0] | |
| prediction[key] = { | |
| "label": str(label), | |
| "confidence": round(float(confidence.item()), 3), | |
| } | |
| results.append(prediction) | |
| return results | |
| def summarize(self, text: str) -> Dict[str, Any]: | |
| """Generate summary of report""" | |
| if self.summarizer is not None: | |
| return self.summarizer.summarize(text) | |
| return self._simple_summarize(text) | |
| def _simple_summarize(self, text: str) -> Dict[str, Any]: | |
| """Simple extractive summarization fallback""" | |
| max_chars = int(os.getenv("SUMMARY_MAX_CHARS", "600")) | |
| if not text or len(text) < 50: | |
| return { | |
| "executiveSummary": (text[:max_chars] + "...") | |
| if len(text) > max_chars | |
| else text, | |
| "keyPoints": [], | |
| } | |
| important_keywords = [ | |
| "damage", | |
| "torn", | |
| "broken", | |
| "cargo", | |
| "baggage", | |
| "passenger", | |
| "delay", | |
| "late", | |
| "error", | |
| "fail", | |
| ] | |
| sentences = text.replace("!", ".").replace("?", ".").split(".") | |
| sentences = [s.strip() for s in sentences if len(s.strip()) > 20] | |
| sentence_scores = [] | |
| for sent in sentences: | |
| score = sum(1 for kw in important_keywords if kw in sent.lower()) | |
| sentence_scores.append((sent, score)) | |
| sentence_scores.sort(key=lambda x: x[1], reverse=True) | |
| top_sentences = [s[0] for s in sentence_scores[:3]] | |
| summary = ". ".join(top_sentences) + "." if top_sentences else text | |
| key_points = [] | |
| if any(kw in text.lower() for kw in ["cargo", "uld", "kargo"]): | |
| key_points.append("Cargo-related issue") | |
| if any(kw in text.lower() for kw in ["baggage", "bag", "bagasi"]): | |
| key_points.append("Baggage handling issue") | |
| if any(kw in text.lower() for kw in ["passenger", "pax", "penumpang"]): | |
| key_points.append("Passenger service issue") | |
| if any( | |
| kw in text.lower() | |
| for kw in ["damage", "torn", "broken", "rusak", "pecah", "robek"] | |
| ): | |
| key_points.append("Physical damage reported") | |
| if any(kw in text.lower() for kw in ["delay", "terlambat", "telat"]): | |
| key_points.append("Delay reported") | |
| return { | |
| "executiveSummary": (summary[:max_chars] + "...") | |
| if len(summary) > max_chars | |
| else summary, | |
| "keyPoints": key_points[:5], | |
| } | |
| def analyze_urgency(self, texts: List[str]) -> List[Dict[str, Any]]: | |
| critical = { | |
| "emergency": 1.2, | |
| "darurat": 1.2, | |
| "critical": 1.2, | |
| "kritis": 1.2, | |
| "severe": 1.1, | |
| "parah": 1.1, | |
| "injury": 1.2, | |
| "cedera": 1.2, | |
| "death": 1.3, | |
| "kematian": 1.3, | |
| "fire": 1.3, | |
| "kebakaran": 1.3, | |
| "explosion": 1.3, | |
| "ledakan": 1.3, | |
| "evacuate": 1.2, | |
| "evakuasi": 1.2, | |
| "safety": 1.1, | |
| "keselamatan": 1.1, | |
| "security": 1.0, | |
| "keamanan": 1.0, | |
| "accident": 1.2, | |
| "kecelakaan": 1.2, | |
| } | |
| high = { | |
| "damage": 1.0, | |
| "rusak": 1.0, | |
| "broken": 1.0, | |
| "pecah": 1.0, | |
| "patah": 1.0, | |
| "torn": 0.9, | |
| "robek": 0.9, | |
| "spillage": 0.9, | |
| "bocor": 0.9, | |
| "lost": 0.8, | |
| "hilang": 0.8, | |
| "stolen": 0.9, | |
| "dicuri": 0.9, | |
| "theft": 0.9, | |
| "pencurian": 0.9, | |
| "unsafe": 1.0, | |
| "berbahaya": 1.0, | |
| } | |
| medium = { | |
| "delay": 0.6, | |
| "terlambat": 0.6, | |
| "telat": 0.6, | |
| "late": 0.6, | |
| "misload": 0.6, | |
| "salah muat": 0.6, | |
| "wrong": 0.5, | |
| "incorrect": 0.5, | |
| "keliru": 0.5, | |
| "missing": 0.5, | |
| "tidak ada": 0.5, | |
| "error": 0.5, | |
| "kesalahan": 0.5, | |
| "fail": 0.5, | |
| "gagal": 0.5, | |
| "complaint": 0.5, | |
| "keluhan": 0.5, | |
| "complain": 0.5, | |
| "komplain": 0.5, | |
| } | |
| low = { | |
| "minor": 0.3, | |
| "kecil": 0.3, | |
| "ringan": 0.3, | |
| "small": 0.3, | |
| "slight": 0.3, | |
| "normal": 0.2, | |
| "rutin": 0.2, | |
| "routine": 0.2, | |
| } | |
| intensifiers = { | |
| "very", | |
| "sangat", | |
| "extremely", | |
| "sangatlah", | |
| "urgent", | |
| "mendesak", | |
| "segera", | |
| "immediately", | |
| "secepatnya", | |
| } | |
| deintensifiers = {"slight", "sedikit", "minor", "low", "ringan"} | |
| negations = {"no", "not", "tidak", "bukan", "tanpa", "false alarm"} | |
| results = [] | |
| for text in texts: | |
| tl = (text or "").lower() | |
| found = set() | |
| score = 0.0 | |
| for kw, w in critical.items(): | |
| if kw in tl: | |
| found.add(kw) | |
| score += w | |
| for kw, w in high.items(): | |
| if kw in tl: | |
| found.add(kw) | |
| score += w | |
| for kw, w in medium.items(): | |
| if kw in tl: | |
| found.add(kw) | |
| score += w | |
| for kw, w in low.items(): | |
| if kw in tl: | |
| found.add(kw) | |
| score += w | |
| intens_count = sum(1 for t in intensifiers if t in tl) | |
| if intens_count > 0 and score > 0: | |
| score *= min(1.4, 1.0 + 0.15 * intens_count) | |
| deint_count = sum(1 for t in deintensifiers if t in tl) | |
| if deint_count > 0: | |
| score *= max(0.6, 1.0 - 0.1 * deint_count) | |
| if any(n in tl for n in negations) and any( | |
| k in tl | |
| for k in [ | |
| "damage", | |
| "rusak", | |
| "broken", | |
| "pecah", | |
| "injury", | |
| "cedera", | |
| "fire", | |
| "kebakaran", | |
| ] | |
| ): | |
| score *= 0.6 | |
| excl_bonus = min(0.1, tl.count("!") * 0.03) | |
| caps_ratio = 0.0 | |
| letters = [c for c in text if c.isalpha()] | |
| if letters: | |
| caps_ratio = sum(1 for c in letters if c.isupper()) / len(letters) | |
| caps_bonus = 0.1 if caps_ratio > 0.35 else 0.0 | |
| score += excl_bonus + caps_bonus | |
| norm = min(1.0, score / 3.0) | |
| if norm >= 0.75: | |
| sentiment = "Critical Negative" | |
| elif norm >= 0.5: | |
| sentiment = "Negative" | |
| elif norm >= 0.25: | |
| sentiment = "Somewhat Negative" | |
| else: | |
| sentiment = "Neutral" | |
| results.append( | |
| { | |
| "urgency_score": round(norm, 2), | |
| "sentiment": sentiment, | |
| "keywords": sorted(list(found))[:15], | |
| } | |
| ) | |
| return results | |