""" Root Cause Classification Service for Gapura AI Auto-classifies root causes into standard categories """ import os import logging import pickle import re from typing import List, Dict, Any, Optional, Tuple from collections import Counter, defaultdict import numpy as np logger = logging.getLogger(__name__) ROOT_CAUSE_CATEGORIES = { "Equipment Failure": { "keywords": [ "equipment", "mesin", "alat", "rusak", "broken", "damage", "malfunction", "scanner", "printer", "computer", "komputer", "conveyor", "conveyor belt", "cbl", "belt", "forklift", "pallet", "scale", "timbangan", "x-ray", "scanner", "system down", "hardware", "software", "error system", "gangguan sistem", "kerusakan", "breakdown", "tidak berfungsi", "tidak bisa digunakan", "trouble", "hose", "hll", "leak", "bocor", "oil", "fuel", "solenoid", "selenoid", ], "severity_multiplier": 1.3, "description": "Issues caused by equipment malfunction or failure", }, "Staff Competency": { "keywords": [ "staff", "karyawan", "pegawai", "operator", "kurang", "tidak paham", "tidak mengerti", "salah", "mistake", "human error", "kelalaian", "incompetence", "skill", "pengalaman", "baru", "training", "pelatihan", "tidak fokus", "tidak teliti", "teledor", "lupa", "inexperienced", "inappropriate handling", "careless", "negligence", "tanpa supervisor", ], "severity_multiplier": 1.2, "description": "Issues caused by staff knowledge or skill gaps", }, "Process/Procedure": { "keywords": [ "procedure", "prosedur", "process", "proses", "sop", "standard", "tidak sesuai", "non-compliance", "pelanggaran", "violation", "bypass", "skip", "langkah", "step", "urutan", "sequence", "tidak patuh", "melanggar", "aturan", "rules", "policy", "inconsistency", "tidak konsisten", "deviasi", "deviation", "flow", "workflow", "ketidaksesuaian", ], "severity_multiplier": 1.1, "description": "Issues caused by procedure violations or process failures", }, "Communication": { "keywords": [ "communication", "komunikasi", "informasi", "koordinasi", "coordination", "miscommunication", "misunderstanding", "tidak jelas", "unclear", "konfirmasi", "confirmation", "notif", "notification", "pemberitahuan", "sosialisasi", "tidak terima", "tidak sampai", "lost in translation", "bahasa", "language", "interpretasi", "interpretation", "beda informasi", "informasi berbeda", "salah paham", "miskomunikasi", "koordinasi kurang", ], "severity_multiplier": 1.0, "description": "Issues caused by communication breakdowns", }, "External Factors": { "keywords": [ "weather", "cuaca", "rain", "hujan", "storm", "badai", "lightning", "flight delay", "delay", "airport", "bandara", "customs", "bea cukai", "airline", "maskapai", "vendor", "third party", "pihak ketiga", "schedule", "jadwal", "traffic", "macet", "force majeure", "bencana", "disaster", "pandemic", "covid", "external", "di luar kendali", "beyond control", "unforeseen", "unexpected", "konflik", "demo", "strikes", "pemogokan", ], "severity_multiplier": 0.8, "description": "Issues caused by external events or parties", }, "Documentation": { "keywords": [ "document", "dokumen", "paperwork", "awb", "air waybill", "manifest", "label", "tag", "sticker", "barcode", "missing", "hilang", "lost", "incomplete", "tidak lengkap", "wrong", "salah", "error", "typo", "incorrect", "tidak sesuai", "mismatch", "data entry", "input", "recording", "pencatatan", "reporting", "pelaporan", "faktur", "invoice", "packing list", "dokumen tidak lengkap", "admin error", ], "severity_multiplier": 1.0, "description": "Issues caused by documentation errors or missing documents", }, "Training Gap": { "keywords": [ "training", "pelatihan", "education", "edukasi", "briefing", "arahan", "new employee", "karyawan baru", "orientation", "orientasi", "tidak dilatih", "untrained", "refresher", "update", "perbaruan", "knowledge gap", "kesenjangan pengetahuan", "competency", "kompetensi", "sertifikasi", "certification", "qualification", "kualifikasi", "belum pernah", "never done", "jarak training terakhir", ], "severity_multiplier": 1.1, "description": "Issues caused by lack of training or refresher", }, "Resource/Manpower": { "keywords": [ "manpower", "manpower shortage", "kekurangan", "shortage", "understaffed", "kurang staf", "overwhelmed", "kewalahan", "peak hour", "jam sibuk", "high volume", "volume tinggi", "many flights", "banyak penerbangan", "resource", "sumber daya", "allocation", "alokasi", "overtime", "lembur", "shift", "jadwal", "schedule conflict", "konflik jadwal", "double task", "multitasking", "terlalu banyak", "overload", ], "severity_multiplier": 1.0, "description": "Issues caused by resource or manpower constraints", }, } class RootCauseService: """ Root cause classification service Uses keyword matching + TF-IDF similarity for classification """ def __init__(self): self.classifier = None self.vectorizer = None self.patterns = {} self._classification_cache = {} # In-memory cache for speed self._load_model() def _load_model(self): """Load trained classifier if available""" base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) model_dir = os.path.join(base_dir, "models", "root_cause") clf_path = os.path.join(model_dir, "classifier.pkl") vec_path = os.path.join(model_dir, "vectorizer.pkl") if os.path.exists(clf_path) and os.path.exists(vec_path): try: with open(clf_path, "rb") as f: self.classifier = pickle.load(f) with open(vec_path, "rb") as f: self.vectorizer = pickle.load(f) logger.info("✓ Root cause ML classifier loaded") except Exception as e: logger.warning(f"Failed to load root cause ML model: {e}") def _classify_ml(self, root_cause_text: str, report_text: str) -> Optional[Dict[str, Any]]: """ Classify using trained TF-IDF + classifier if available """ if self.classifier is None or self.vectorizer is None: return None combined = f"{root_cause_text} {report_text}".strip() if not combined or len(combined) < 6: return None try: X = self.vectorizer.transform([combined]) if hasattr(self.classifier, "predict_proba"): proba = self.classifier.predict_proba(X)[0] idx = int(np.argmax(proba)) label = self.classifier.classes_[idx] conf = float(proba[idx]) else: label = self.classifier.predict(X)[0] conf = 0.7 if conf < 0.4: return None return { "primary_category": str(label), "confidence": round(conf, 2), "all_scores": {str(label): round(conf, 2)}, "keywords_matched": [], "secondary_categories": [], "description": ROOT_CAUSE_CATEGORIES.get(str(label), {}).get("description", ""), "method": "tfidf-ml", } except Exception as e: logger.warning(f"ML RC classification failed: {e}") return None def _classify_keyword(self, root_cause_text: str, report_text: str) -> Dict[str, Any]: """ Classify using keyword matching (Fast fallback) """ combined_text = f"{root_cause_text} {report_text}".lower() scores = {} matched_keywords = defaultdict(list) for category, config in ROOT_CAUSE_CATEGORIES.items(): keywords = config["keywords"] score = 0 matched = [] for keyword in keywords: keyword_lower = keyword.lower() if keyword_lower in combined_text: score += 1 matched.append(keyword) if score > 0: score *= config["severity_multiplier"] scores[category] = round(score, 2) matched_keywords[category] = matched if not scores: return { "primary_category": "Unknown", "confidence": 0.0, "all_scores": {}, "keywords_matched": [], "secondary_categories": [], } sorted_scores = sorted(scores.items(), key=lambda x: -x[1]) primary_category = sorted_scores[0][0] primary_score = sorted_scores[0][1] total_score = sum(scores.values()) confidence = min(0.95, primary_score / max(total_score, 1)) secondary = [ {"category": cat, "score": score} for cat, score in sorted_scores[1:3] if score > 0 ] return { "primary_category": primary_category, "confidence": round(confidence, 2), "all_scores": scores, "keywords_matched": matched_keywords.get(primary_category, []), "secondary_categories": secondary, "description": ROOT_CAUSE_CATEGORIES.get(primary_category, {}).get( "description", "" ), "method": "keyword" } def classify( self, root_cause_text: str, report_text: str = "", context: Optional[Dict] = None, ) -> Dict[str, Any]: """ Classify a root cause text into categories using Transformer + Rule-based hybrid """ if not root_cause_text and not report_text: return { "primary_category": "Unknown", "confidence": 0.0, "all_scores": {}, "keywords_matched": [], "secondary_categories": [], } # Check Cache cache_key = f"{report_text}|{root_cause_text}" if cache_key in self._classification_cache: return self._classification_cache[cache_key].copy() result = None # 1. Try Transformer (High Accuracy) try: from data.nlp_service import get_nlp_service nlp = get_nlp_service() if nlp.models_loaded: combined = f"{report_text} [SEP] {root_cause_text}" rc_pred = nlp.classify_root_cause([combined])[0] if rc_pred["label"] != "Unknown" and rc_pred["confidence"] > 0.4: result = { "primary_category": rc_pred["label"], "confidence": rc_pred["confidence"], "all_scores": {rc_pred["label"]: rc_pred["confidence"]}, "keywords_matched": [], "secondary_categories": [], "description": ROOT_CAUSE_CATEGORIES.get(rc_pred["label"], {}).get("description", ""), "method": "transformer" } except Exception as e: logger.warning(f"Transformer RC classification failed: {e}") # 2. Try ML classifier if available if result is None: result = self._classify_ml(root_cause_text, report_text) # 3. Fallback to Keyword Matching (Legacy) if result is None: result = self._classify_keyword(root_cause_text, report_text) # Update Cache self._classification_cache[cache_key] = result.copy() return result def classify_batch( self, records: List[Dict], text_field: str = "Root_Caused" ) -> List[Dict[str, Any]]: """ Classify multiple records using optimized batch inference """ results = [] texts_for_transformer = [] valid_indices = [] for i, record in enumerate(records): root_cause = record.get(text_field, "") report = record.get("Report", "") # 0. Check Cache First (O(1) vs heavy O(Transformer)) cache_key = f"{report}|{root_cause}" if cache_key in self._classification_cache: results.append({**self._classification_cache[cache_key], "row_id": record.get("_row_id")}) continue # Prepare context for transformer if root_cause or report: combined = f"{report} [SEP] {root_cause}" texts_for_transformer.append(combined) valid_indices.append(i) # Initialize results with placeholders results.append({ "primary_category": "Unknown", "confidence": 0.0, "all_scores": {}, "keywords_matched": [], "secondary_categories": [], "row_id": record.get("_row_id") }) # 1. Batch Transformer Inference (High Efficiency) if texts_for_transformer: try: from data.nlp_service import get_nlp_service nlp = get_nlp_service() if nlp.models_loaded: mt_results = nlp.classify_root_cause(texts_for_transformer) for idx, mt_res in zip(valid_indices, mt_results): if mt_res["label"] != "Unknown" and mt_res["confidence"] > 0.4: results[idx].update({ "primary_category": mt_res["label"], "confidence": mt_res["confidence"], "all_scores": {mt_res["label"]: mt_res["confidence"]}, "description": ROOT_CAUSE_CATEGORIES.get(mt_res["label"], {}).get("description", ""), "method": "transformer" }) # Update cache record = records[idx] cache_key = f"{record.get('Report', '')}|{record.get(text_field, '')}" self._classification_cache[cache_key] = results[idx].copy() del self._classification_cache[cache_key]["row_id"] # Don't cache row_id except Exception as e: logger.warning(f"Batch transformer classification failed, falling back: {e}") # 2. ML classification for remaining Unknowns if self.classifier is not None and self.vectorizer is not None: unknown_indices = [i for i, r in enumerate(results) if r["primary_category"] == "Unknown"] if unknown_indices: texts = [] for i in unknown_indices: rc = records[i].get(text_field, "") rep = records[i].get("Report", "") texts.append(f"{rc} {rep}".strip()) try: X = self.vectorizer.transform(texts) if hasattr(self.classifier, "predict_proba"): proba = self.classifier.predict_proba(X) preds = np.argmax(proba, axis=1) labels = self.classifier.classes_[preds] confs = proba[np.arange(len(preds)), preds] else: labels = self.classifier.predict(X) confs = np.full(len(labels), 0.7) for j, i in enumerate(unknown_indices): label = str(labels[j]) conf = float(confs[j]) if conf >= 0.4 and label: results[i].update({ "primary_category": label, "confidence": round(conf, 2), "all_scores": {label: round(conf, 2)}, "description": ROOT_CAUSE_CATEGORIES.get(label, {}).get("description", ""), "method": "tfidf-ml" }) except Exception as e: logger.warning(f"Batch ML classification failed: {e}") # 3. Fallback to Keyword Matching for any remaining Unknowns for i, record in enumerate(records): if results[i]["primary_category"] == "Unknown": root_cause = record.get(text_field, "") report = record.get("Report", "") if root_cause or report: res = self._classify_keyword(root_cause, report) if res["primary_category"] != "Unknown": row_id = results[i]["row_id"] results[i] = res results[i]["row_id"] = row_id return results def train_from_data(self, records: List[Dict]) -> Dict[str, Any]: """ Train/improve classifier from labeled data using batch processing """ from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.linear_model import LogisticRegression from sklearn.model_selection import train_test_split from sklearn.utils.class_weight import compute_class_weight from sklearn.metrics import classification_report logger.info(f"Training root cause classifier from {len(records)} records using batch labeling...") # Bulk classify to get labels classifications = self.classify_batch(records) texts = [] labels = [] for record, classification in zip(records, classifications): root_cause = record.get("Root_Caused", "") report = record.get("Report", "") combined = f"{root_cause} {report}".strip() if not combined or len(combined) < 10: continue category = classification["primary_category"] if category != "Unknown": texts.append(combined) labels.append(category) if len(set(labels)) < 2: logger.warning("Not enough variety in labels for training") return { "status": "insufficient_data", "records_processed": len(records), "usable_records": len(texts), } X_train, X_test, y_train, y_test = train_test_split( texts, labels, test_size=0.2, random_state=42 ) vectorizer = TfidfVectorizer( max_features=20000, ngram_range=(1, 2), sublinear_tf=True, lowercase=True, min_df=2 ) X_train_vec = vectorizer.fit_transform(X_train) X_test_vec = vectorizer.transform(X_test) classes = np.unique(y_train) try: class_weights = compute_class_weight(class_weight="balanced", classes=classes, y=y_train) cw = {c: w for c, w in zip(classes, class_weights)} except Exception: cw = None classifier = LogisticRegression(max_iter=2000) if cw: try: classifier.set_params(class_weight=cw) except Exception: pass classifier.fit(X_train_vec, y_train) accuracy = classifier.score(X_test_vec, y_test) logger.info(f"Root cause classifier trained with accuracy: {accuracy:.2%}") try: y_pred = classifier.predict(X_test_vec) logger.info("Root cause classification report:\n" + classification_report(y_test, y_pred)) except Exception: pass base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) model_dir = os.path.join(base_dir, "models", "root_cause") os.makedirs(model_dir, exist_ok=True) with open(os.path.join(model_dir, "classifier.pkl"), "wb") as f: pickle.dump(classifier, f) with open(os.path.join(model_dir, "vectorizer.pkl"), "wb") as f: pickle.dump(vectorizer, f) self.classifier = classifier self.vectorizer = vectorizer self._classification_cache = {} # Clear cache as models changed category_dist = Counter(labels) return { "status": "success", "records_processed": len(records), "usable_records": len(texts), "accuracy": round(float(accuracy), 3), "category_distribution": dict(category_dist), } def get_statistics(self, records: List[Dict]) -> Dict[str, Any]: """ Get root cause statistics from records Args: records: List of records to analyze Returns: Dict with statistics by category """ stats = defaultdict( lambda: { "count": 0, "categories": Counter(), "areas": Counter(), "airlines": Counter(), } ) total_classified = 0 total_unknown = 0 # Use batch classification for performance (O(N/batch_size) instead of O(N)) classifications = self.classify_batch(records) for record, classification in zip(records, classifications): category = classification["primary_category"] if category != "Unknown": total_classified += 1 stats[category]["count"] += 1 stats[category]["categories"][ record.get("Irregularity_Complain_Category", "Unknown") ] += 1 stats[category]["areas"][record.get("Area", "Unknown")] += 1 stats[category]["airlines"][record.get("Airlines", "Unknown")] += 1 else: total_unknown += 1 result = {} for category, data in stats.items(): result[category] = { "count": data["count"], "percentage": round(data["count"] / max(total_classified, 1) * 100, 1), "top_issue_categories": dict(data["categories"].most_common(3)), "top_areas": dict(data["areas"].most_common(3)), "top_airlines": dict(data["airlines"].most_common(3)), "description": ROOT_CAUSE_CATEGORIES.get(category, {}).get( "description", "" ), } return { "total_records": len(records), "classified": total_classified, "unknown": total_unknown, "classification_rate": round( total_classified / max(len(records), 1) * 100, 1 ), "by_category": result, "top_categories": sorted(result.items(), key=lambda x: -x[1]["count"])[:5], } def get_categories(self) -> Dict[str, Dict]: """Get all available root cause categories""" return { cat: { "name": cat, "description": config["description"], "keyword_count": len(config["keywords"]), "severity_multiplier": config["severity_multiplier"], } for cat, config in ROOT_CAUSE_CATEGORIES.items() } _root_cause_service: Optional[RootCauseService] = None def get_root_cause_service() -> RootCauseService: """Get singleton instance""" global _root_cause_service if _root_cause_service is None: _root_cause_service = RootCauseService() return _root_cause_service