Spaces:
Running
Running
| """ | |
| Root Cause Classification Service for Gapura AI | |
| Auto-classifies root causes into standard categories | |
| """ | |
| import os | |
| import logging | |
| import pickle | |
| import re | |
| from typing import List, Dict, Any, Optional, Tuple | |
| from collections import Counter, defaultdict | |
| import numpy as np | |
| logger = logging.getLogger(__name__) | |
# Category -> matching keywords (mixed English/Indonesian), a severity weight
# applied to the raw keyword-hit count, and a human-readable description.
# NOTE: keywords are matched by case-insensitive substring containment, and
# each list entry scores at most once per text — lists must not contain
# duplicates (a duplicated entry would silently double-count one keyword;
# "scanner" was previously listed twice under Equipment Failure).
ROOT_CAUSE_CATEGORIES = {
    "Equipment Failure": {
        "keywords": [
            "equipment", "mesin", "alat", "rusak", "broken", "damage",
            "malfunction", "scanner", "printer", "computer", "komputer",
            "conveyor", "conveyor belt", "cbl", "belt", "forklift", "pallet",
            "scale", "timbangan", "x-ray", "system down", "hardware",
            "software", "error system", "gangguan sistem", "kerusakan",
            "breakdown", "tidak berfungsi", "tidak bisa digunakan", "trouble",
            "hose", "hll", "leak", "bocor", "oil", "fuel", "solenoid",
            "selenoid",
        ],
        "severity_multiplier": 1.3,
        "description": "Issues caused by equipment malfunction or failure",
    },
    "Staff Competency": {
        "keywords": [
            "staff", "karyawan", "pegawai", "operator", "kurang",
            "tidak paham", "tidak mengerti", "salah", "mistake", "human error",
            "kelalaian", "incompetence", "skill", "pengalaman", "baru",
            "training", "pelatihan", "tidak fokus", "tidak teliti", "teledor",
            "lupa", "inexperienced", "inappropriate handling", "careless",
            "negligence", "tanpa supervisor",
        ],
        "severity_multiplier": 1.2,
        "description": "Issues caused by staff knowledge or skill gaps",
    },
    "Process/Procedure": {
        "keywords": [
            "procedure", "prosedur", "process", "proses", "sop", "standard",
            "tidak sesuai", "non-compliance", "pelanggaran", "violation",
            "bypass", "skip", "langkah", "step", "urutan", "sequence",
            "tidak patuh", "melanggar", "aturan", "rules", "policy",
            "inconsistency", "tidak konsisten", "deviasi", "deviation",
            "flow", "workflow", "ketidaksesuaian",
        ],
        "severity_multiplier": 1.1,
        "description": "Issues caused by procedure violations or process failures",
    },
    "Communication": {
        "keywords": [
            "communication", "komunikasi", "informasi", "koordinasi",
            "coordination", "miscommunication", "misunderstanding",
            "tidak jelas", "unclear", "konfirmasi", "confirmation", "notif",
            "notification", "pemberitahuan", "sosialisasi", "tidak terima",
            "tidak sampai", "lost in translation", "bahasa", "language",
            "interpretasi", "interpretation", "beda informasi",
            "informasi berbeda", "salah paham", "miskomunikasi",
            "koordinasi kurang",
        ],
        "severity_multiplier": 1.0,
        "description": "Issues caused by communication breakdowns",
    },
    "External Factors": {
        "keywords": [
            "weather", "cuaca", "rain", "hujan", "storm", "badai", "lightning",
            "flight delay", "delay", "airport", "bandara", "customs",
            "bea cukai", "airline", "maskapai", "vendor", "third party",
            "pihak ketiga", "schedule", "jadwal", "traffic", "macet",
            "force majeure", "bencana", "disaster", "pandemic", "covid",
            "external", "di luar kendali", "beyond control", "unforeseen",
            "unexpected", "konflik", "demo", "strikes", "pemogokan",
        ],
        "severity_multiplier": 0.8,
        "description": "Issues caused by external events or parties",
    },
    "Documentation": {
        "keywords": [
            "document", "dokumen", "paperwork", "awb", "air waybill",
            "manifest", "label", "tag", "sticker", "barcode", "missing",
            "hilang", "lost", "incomplete", "tidak lengkap", "wrong", "salah",
            "error", "typo", "incorrect", "tidak sesuai", "mismatch",
            "data entry", "input", "recording", "pencatatan", "reporting",
            "pelaporan", "faktur", "invoice", "packing list",
            "dokumen tidak lengkap", "admin error",
        ],
        "severity_multiplier": 1.0,
        "description": "Issues caused by documentation errors or missing documents",
    },
    "Training Gap": {
        "keywords": [
            "training", "pelatihan", "education", "edukasi", "briefing",
            "arahan", "new employee", "karyawan baru", "orientation",
            "orientasi", "tidak dilatih", "untrained", "refresher", "update",
            "perbaruan", "knowledge gap", "kesenjangan pengetahuan",
            "competency", "kompetensi", "sertifikasi", "certification",
            "qualification", "kualifikasi", "belum pernah", "never done",
            "jarak training terakhir",
        ],
        "severity_multiplier": 1.1,
        "description": "Issues caused by lack of training or refresher",
    },
    "Resource/Manpower": {
        "keywords": [
            "manpower", "manpower shortage", "kekurangan", "shortage",
            "understaffed", "kurang staf", "overwhelmed", "kewalahan",
            "peak hour", "jam sibuk", "high volume", "volume tinggi",
            "many flights", "banyak penerbangan", "resource", "sumber daya",
            "allocation", "alokasi", "overtime", "lembur", "shift", "jadwal",
            "schedule conflict", "konflik jadwal", "double task",
            "multitasking", "terlalu banyak", "overload",
        ],
        "severity_multiplier": 1.0,
        "description": "Issues caused by resource or manpower constraints",
    },
}
class RootCauseService:
    """
    Root cause classification service.

    Classifies free-text root-cause descriptions into the categories
    defined in ``ROOT_CAUSE_CATEGORIES`` using a tiered strategy:

      1. Transformer model (``data.nlp_service``), when its models are loaded.
      2. Pickled TF-IDF vectorizer + scikit-learn classifier, when present
         on disk (see ``_load_model`` / ``train_from_data``).
      3. Keyword matching (``_classify_keyword``), always available.

    Results are memoized per ``"{report}|{root_cause}"`` text pair in an
    in-process dict; the cache is cleared whenever a new model is trained.
    """
    def __init__(self):
        # Both stay None unless _load_model() finds pickled artifacts on disk.
        self.classifier = None
        self.vectorizer = None
        # NOTE(review): `patterns` is never read or written anywhere else in
        # this class — looks like dead state; confirm before removing.
        self.patterns = {}
        # NOTE(review): unbounded — grows with every distinct input pair and
        # is only reset by train_from_data().
        self._classification_cache = {}  # In-memory cache for speed
        self._load_model()
    def _load_model(self):
        """Load the trained classifier + vectorizer pair if both files exist.

        Looks in ``<package_root>/models/root_cause/``. Load failures are
        logged and swallowed so keyword classification still works.
        """
        # Resolve ../models/root_cause relative to this source file's package.
        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        model_dir = os.path.join(base_dir, "models", "root_cause")
        clf_path = os.path.join(model_dir, "classifier.pkl")
        vec_path = os.path.join(model_dir, "vectorizer.pkl")
        if os.path.exists(clf_path) and os.path.exists(vec_path):
            try:
                # SECURITY: pickle.load executes arbitrary code on load. These
                # files must only ever come from train_from_data() on a
                # trusted host — never from untrusted sources.
                with open(clf_path, "rb") as f:
                    self.classifier = pickle.load(f)
                with open(vec_path, "rb") as f:
                    self.vectorizer = pickle.load(f)
                logger.info("✓ Root cause ML classifier loaded")
            except Exception as e:
                logger.warning(f"Failed to load root cause ML model: {e}")
    def _classify_ml(self, root_cause_text: str, report_text: str) -> Optional[Dict[str, Any]]:
        """
        Classify using the trained TF-IDF + classifier, if available.

        Returns None when no model is loaded, the combined input is shorter
        than 6 characters, the prediction confidence is below 0.4, or
        inference raises — callers then fall through to keyword matching.
        """
        if self.classifier is None or self.vectorizer is None:
            return None
        combined = f"{root_cause_text} {report_text}".strip()
        if not combined or len(combined) < 6:
            return None
        try:
            X = self.vectorizer.transform([combined])
            if hasattr(self.classifier, "predict_proba"):
                proba = self.classifier.predict_proba(X)[0]
                idx = int(np.argmax(proba))
                label = self.classifier.classes_[idx]
                conf = float(proba[idx])
            else:
                # Classifier without probability support: assume a fixed,
                # moderately high confidence.
                label = self.classifier.predict(X)[0]
                conf = 0.7
            if conf < 0.4:
                return None  # too uncertain; let a later tier decide
            return {
                "primary_category": str(label),
                "confidence": round(conf, 2),
                "all_scores": {str(label): round(conf, 2)},
                "keywords_matched": [],
                "secondary_categories": [],
                "description": ROOT_CAUSE_CATEGORIES.get(str(label), {}).get("description", ""),
                "method": "tfidf-ml",
            }
        except Exception as e:
            logger.warning(f"ML RC classification failed: {e}")
            return None
    def _classify_keyword(self, root_cause_text: str, report_text: str) -> Dict[str, Any]:
        """
        Classify via case-insensitive substring keyword matching (fast fallback).

        Each matched keyword adds 1 to its category's score; the category
        total is then weighted by its severity_multiplier. Confidence is the
        primary category's share of the summed scores, capped at 0.95.
        """
        combined_text = f"{root_cause_text} {report_text}".lower()
        scores = {}
        matched_keywords = defaultdict(list)
        for category, config in ROOT_CAUSE_CATEGORIES.items():
            keywords = config["keywords"]
            score = 0
            matched = []
            for keyword in keywords:
                keyword_lower = keyword.lower()
                # Substring containment — e.g. "belt" also matches inside
                # "conveyor belt".
                if keyword_lower in combined_text:
                    score += 1
                    matched.append(keyword)
            if score > 0:
                score *= config["severity_multiplier"]
                scores[category] = round(score, 2)
                matched_keywords[category] = matched
        if not scores:
            # Nothing matched at all — report Unknown with zero confidence.
            return {
                "primary_category": "Unknown",
                "confidence": 0.0,
                "all_scores": {},
                "keywords_matched": [],
                "secondary_categories": [],
            }
        sorted_scores = sorted(scores.items(), key=lambda x: -x[1])
        primary_category = sorted_scores[0][0]
        primary_score = sorted_scores[0][1]
        total_score = sum(scores.values())
        confidence = min(0.95, primary_score / max(total_score, 1))
        # Up to two runner-up categories with non-zero scores.
        secondary = [
            {"category": cat, "score": score}
            for cat, score in sorted_scores[1:3]
            if score > 0
        ]
        return {
            "primary_category": primary_category,
            "confidence": round(confidence, 2),
            "all_scores": scores,
            "keywords_matched": matched_keywords.get(primary_category, []),
            "secondary_categories": secondary,
            "description": ROOT_CAUSE_CATEGORIES.get(primary_category, {}).get(
                "description", ""
            ),
            "method": "keyword"
        }
    def classify(
        self,
        root_cause_text: str,
        report_text: str = "",
        context: Optional[Dict] = None,
    ) -> Dict[str, Any]:
        """
        Classify a single root-cause text (transformer -> ML -> keyword).

        Args:
            root_cause_text: Free-text root cause description.
            report_text: Optional surrounding report text for extra context.
            context: NOTE(review): accepted but never read in this method.

        Returns:
            Dict with primary_category, confidence, all_scores,
            keywords_matched, secondary_categories, and — when a tier
            classified successfully — description and method.
        """
        if not root_cause_text and not report_text:
            return {
                "primary_category": "Unknown",
                "confidence": 0.0,
                "all_scores": {},
                "keywords_matched": [],
                "secondary_categories": [],
            }
        # Check Cache (key format must match classify_batch's).
        cache_key = f"{report_text}|{root_cause_text}"
        if cache_key in self._classification_cache:
            # Return a copy so callers can't mutate the cached entry.
            return self._classification_cache[cache_key].copy()
        result = None
        # 1. Try Transformer (High Accuracy)
        try:
            from data.nlp_service import get_nlp_service
            nlp = get_nlp_service()
            if nlp.models_loaded:
                # Report first, then root cause, joined with a [SEP] marker —
                # presumably the format the transformer was trained on; must
                # stay consistent with classify_batch.
                combined = f"{report_text} [SEP] {root_cause_text}"
                rc_pred = nlp.classify_root_cause([combined])[0]
                # Only accept confident, known-category predictions.
                if rc_pred["label"] != "Unknown" and rc_pred["confidence"] > 0.4:
                    result = {
                        "primary_category": rc_pred["label"],
                        "confidence": rc_pred["confidence"],
                        "all_scores": {rc_pred["label"]: rc_pred["confidence"]},
                        "keywords_matched": [],
                        "secondary_categories": [],
                        "description": ROOT_CAUSE_CATEGORIES.get(rc_pred["label"], {}).get("description", ""),
                        "method": "transformer"
                    }
        except Exception as e:
            logger.warning(f"Transformer RC classification failed: {e}")
        # 2. Try ML classifier if available
        if result is None:
            result = self._classify_ml(root_cause_text, report_text)
        # 3. Fallback to Keyword Matching (Legacy)
        if result is None:
            result = self._classify_keyword(root_cause_text, report_text)
        # Update Cache (store a copy so later caller mutations don't leak in).
        self._classification_cache[cache_key] = result.copy()
        return result
    def classify_batch(
        self, records: List[Dict], text_field: str = "Root_Caused"
    ) -> List[Dict[str, Any]]:
        """
        Classify multiple records using optimized batch inference.

        Produces one result dict per record, in input order, each carrying
        the record's "_row_id" under "row_id". Cache hits are served
        immediately; the rest go through batched transformer inference,
        then batched TF-IDF ML, then per-record keyword matching.

        Args:
            records: Record dicts; the root-cause text is read from
                ``text_field`` and extra context from "Report".
            text_field: Key holding the root-cause text.
        """
        results = []
        texts_for_transformer = []
        valid_indices = []
        for i, record in enumerate(records):
            root_cause = record.get(text_field, "")
            report = record.get("Report", "")
            # 0. Check Cache First (O(1) vs heavy O(Transformer)).
            # Exactly one result is appended per record, so results[i]
            # always corresponds to records[i].
            cache_key = f"{report}|{root_cause}"
            if cache_key in self._classification_cache:
                results.append({**self._classification_cache[cache_key], "row_id": record.get("_row_id")})
                continue
            # Prepare context for transformer (skip fully empty records).
            if root_cause or report:
                combined = f"{report} [SEP] {root_cause}"
                texts_for_transformer.append(combined)
                valid_indices.append(i)
            # Initialize results with placeholders; later tiers overwrite
            # any entry still marked "Unknown".
            results.append({
                "primary_category": "Unknown",
                "confidence": 0.0,
                "all_scores": {},
                "keywords_matched": [],
                "secondary_categories": [],
                "row_id": record.get("_row_id")
            })
        # 1. Batch Transformer Inference (High Efficiency)
        if texts_for_transformer:
            try:
                from data.nlp_service import get_nlp_service
                nlp = get_nlp_service()
                if nlp.models_loaded:
                    mt_results = nlp.classify_root_cause(texts_for_transformer)
                    # valid_indices and mt_results are aligned one-to-one.
                    for idx, mt_res in zip(valid_indices, mt_results):
                        if mt_res["label"] != "Unknown" and mt_res["confidence"] > 0.4:
                            results[idx].update({
                                "primary_category": mt_res["label"],
                                "confidence": mt_res["confidence"],
                                "all_scores": {mt_res["label"]: mt_res["confidence"]},
                                "description": ROOT_CAUSE_CATEGORIES.get(mt_res["label"], {}).get("description", ""),
                                "method": "transformer"
                            })
                            # Update cache: store a copy, then strip the
                            # per-call row_id from the cached copy only.
                            record = records[idx]
                            cache_key = f"{record.get('Report', '')}|{record.get(text_field, '')}"
                            self._classification_cache[cache_key] = results[idx].copy()
                            del self._classification_cache[cache_key]["row_id"]  # Don't cache row_id
            except Exception as e:
                logger.warning(f"Batch transformer classification failed, falling back: {e}")
        # 2. ML classification for remaining Unknowns
        if self.classifier is not None and self.vectorizer is not None:
            unknown_indices = [i for i, r in enumerate(results) if r["primary_category"] == "Unknown"]
            if unknown_indices:
                texts = []
                for i in unknown_indices:
                    rc = records[i].get(text_field, "")
                    rep = records[i].get("Report", "")
                    texts.append(f"{rc} {rep}".strip())
                try:
                    X = self.vectorizer.transform(texts)
                    if hasattr(self.classifier, "predict_proba"):
                        proba = self.classifier.predict_proba(X)
                        preds = np.argmax(proba, axis=1)
                        labels = self.classifier.classes_[preds]
                        # Confidence of each row's winning class.
                        confs = proba[np.arange(len(preds)), preds]
                    else:
                        labels = self.classifier.predict(X)
                        confs = np.full(len(labels), 0.7)
                    for j, i in enumerate(unknown_indices):
                        label = str(labels[j])
                        conf = float(confs[j])
                        # Same 0.4 acceptance threshold as _classify_ml.
                        if conf >= 0.4 and label:
                            results[i].update({
                                "primary_category": label,
                                "confidence": round(conf, 2),
                                "all_scores": {label: round(conf, 2)},
                                "description": ROOT_CAUSE_CATEGORIES.get(label, {}).get("description", ""),
                                "method": "tfidf-ml"
                            })
                except Exception as e:
                    logger.warning(f"Batch ML classification failed: {e}")
        # 3. Fallback to Keyword Matching for any remaining Unknowns
        # (these results are not written back to the cache).
        for i, record in enumerate(records):
            if results[i]["primary_category"] == "Unknown":
                root_cause = record.get(text_field, "")
                report = record.get("Report", "")
                if root_cause or report:
                    res = self._classify_keyword(root_cause, report)
                    if res["primary_category"] != "Unknown":
                        # Preserve row_id across the replacement dict.
                        row_id = results[i]["row_id"]
                        results[i] = res
                        results[i]["row_id"] = row_id
        return results
    def train_from_data(self, records: List[Dict]) -> Dict[str, Any]:
        """
        Train/improve the TF-IDF classifier from records using batch labeling.

        Weak supervision: labels come from classify_batch() itself (i.e. the
        transformer/keyword tiers), so this distills the current pipeline
        into a fast TF-IDF + LogisticRegression model, persisted as pickles.

        Returns:
            Dict with status ("success" or "insufficient_data"), record
            counts, and — on success — held-out accuracy and the label
            distribution.
        """
        from sklearn.feature_extraction.text import TfidfVectorizer
        from sklearn.linear_model import LogisticRegression
        from sklearn.model_selection import train_test_split
        from sklearn.utils.class_weight import compute_class_weight
        from sklearn.metrics import classification_report
        logger.info(f"Training root cause classifier from {len(records)} records using batch labeling...")
        # Bulk classify to get labels
        classifications = self.classify_batch(records)
        texts = []
        labels = []
        for record, classification in zip(records, classifications):
            root_cause = record.get("Root_Caused", "")
            report = record.get("Report", "")
            combined = f"{root_cause} {report}".strip()
            # Skip empty/near-empty texts — too little signal to train on.
            if not combined or len(combined) < 10:
                continue
            category = classification["primary_category"]
            if category != "Unknown":
                texts.append(combined)
                labels.append(category)
        # Need at least two distinct classes for a classifier.
        if len(set(labels)) < 2:
            logger.warning("Not enough variety in labels for training")
            return {
                "status": "insufficient_data",
                "records_processed": len(records),
                "usable_records": len(texts),
            }
        X_train, X_test, y_train, y_test = train_test_split(
            texts, labels, test_size=0.2, random_state=42
        )
        vectorizer = TfidfVectorizer(
            max_features=20000, ngram_range=(1, 2), sublinear_tf=True, lowercase=True, min_df=2
        )
        X_train_vec = vectorizer.fit_transform(X_train)
        X_test_vec = vectorizer.transform(X_test)
        classes = np.unique(y_train)
        # Balanced class weights to counter label imbalance; best-effort.
        try:
            class_weights = compute_class_weight(class_weight="balanced", classes=classes, y=y_train)
            cw = {c: w for c, w in zip(classes, class_weights)}
        except Exception:
            cw = None
        classifier = LogisticRegression(max_iter=2000)
        if cw:
            try:
                classifier.set_params(class_weight=cw)
            except Exception:
                pass
        classifier.fit(X_train_vec, y_train)
        accuracy = classifier.score(X_test_vec, y_test)
        logger.info(f"Root cause classifier trained with accuracy: {accuracy:.2%}")
        # Per-class report is informational only; never fail training over it.
        try:
            y_pred = classifier.predict(X_test_vec)
            logger.info("Root cause classification report:\n" + classification_report(y_test, y_pred))
        except Exception:
            pass
        # Persist artifacts where _load_model() expects them.
        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        model_dir = os.path.join(base_dir, "models", "root_cause")
        os.makedirs(model_dir, exist_ok=True)
        with open(os.path.join(model_dir, "classifier.pkl"), "wb") as f:
            pickle.dump(classifier, f)
        with open(os.path.join(model_dir, "vectorizer.pkl"), "wb") as f:
            pickle.dump(vectorizer, f)
        self.classifier = classifier
        self.vectorizer = vectorizer
        self._classification_cache = {}  # Clear cache as models changed
        category_dist = Counter(labels)
        return {
            "status": "success",
            "records_processed": len(records),
            "usable_records": len(texts),
            "accuracy": round(float(accuracy), 3),
            "category_distribution": dict(category_dist),
        }
    def get_statistics(self, records: List[Dict]) -> Dict[str, Any]:
        """
        Get root cause statistics from records.

        Args:
            records: List of records to analyze (expects keys like
                "Irregularity_Complain_Category", "Area", "Airlines").

        Returns:
            Dict with overall counts, classification rate, per-category
            breakdowns (top issue categories/areas/airlines), and the
            top five categories by count.
        """
        stats = defaultdict(
            lambda: {
                "count": 0,
                "categories": Counter(),
                "areas": Counter(),
                "airlines": Counter(),
            }
        )
        total_classified = 0
        total_unknown = 0
        # Use batch classification for performance (O(N/batch_size) instead of O(N))
        classifications = self.classify_batch(records)
        for record, classification in zip(records, classifications):
            category = classification["primary_category"]
            if category != "Unknown":
                total_classified += 1
                stats[category]["count"] += 1
                stats[category]["categories"][
                    record.get("Irregularity_Complain_Category", "Unknown")
                ] += 1
                stats[category]["areas"][record.get("Area", "Unknown")] += 1
                stats[category]["airlines"][record.get("Airlines", "Unknown")] += 1
            else:
                total_unknown += 1
        result = {}
        for category, data in stats.items():
            result[category] = {
                "count": data["count"],
                # Percentage of classified records (not of all records).
                "percentage": round(data["count"] / max(total_classified, 1) * 100, 1),
                "top_issue_categories": dict(data["categories"].most_common(3)),
                "top_areas": dict(data["areas"].most_common(3)),
                "top_airlines": dict(data["airlines"].most_common(3)),
                "description": ROOT_CAUSE_CATEGORIES.get(category, {}).get(
                    "description", ""
                ),
            }
        return {
            "total_records": len(records),
            "classified": total_classified,
            "unknown": total_unknown,
            "classification_rate": round(
                total_classified / max(len(records), 1) * 100, 1
            ),
            "by_category": result,
            "top_categories": sorted(result.items(), key=lambda x: -x[1]["count"])[:5],
        }
    def get_categories(self) -> Dict[str, Dict]:
        """Get all available root cause categories with summary metadata."""
        return {
            cat: {
                "name": cat,
                "description": config["description"],
                "keyword_count": len(config["keywords"]),
                "severity_multiplier": config["severity_multiplier"],
            }
            for cat, config in ROOT_CAUSE_CATEGORIES.items()
        }
# Module-level singleton holder, created lazily on first access.
_root_cause_service: Optional[RootCauseService] = None


def get_root_cause_service() -> RootCauseService:
    """Return the shared RootCauseService, constructing it on first use."""
    global _root_cause_service
    if _root_cause_service is not None:
        return _root_cause_service
    _root_cause_service = RootCauseService()
    return _root_cause_service