# gapura-oneclick / data / root_cause_service.py
# Author: Muhammad Ridzki Nugraha — "Deploy API and config (Batch 3)" (commit 07476a1)
"""
Root Cause Classification Service for Gapura AI
Auto-classifies root causes into standard categories
"""
import os
import logging
import pickle
import re
from typing import List, Dict, Any, Optional, Tuple
from collections import Counter, defaultdict
import numpy as np
logger = logging.getLogger(__name__)
# Taxonomy of root-cause categories used by the keyword fallback classifier.
#
# Schema per category:
#   keywords            - lowercase substrings (mixed English/Indonesian) matched
#                         against the combined root-cause + report text; each hit
#                         adds 1 to the raw score, so a keyword must appear only
#                         once per list (duplicates would double-count a single
#                         occurrence in the text).
#   severity_multiplier - scales the raw keyword-hit count when ranking
#                         categories (>1.0 boosts, <1.0 dampens).
#   description         - human-readable summary surfaced in API responses.
ROOT_CAUSE_CATEGORIES = {
    "Equipment Failure": {
        # FIX: "scanner" was listed twice, inflating this category's score
        # whenever a scanner was mentioned; it now appears once.
        "keywords": [
            "equipment", "mesin", "alat", "rusak", "broken", "damage",
            "malfunction", "scanner", "printer", "computer", "komputer",
            "conveyor", "conveyor belt", "cbl", "belt", "forklift", "pallet",
            "scale", "timbangan", "x-ray", "system down", "hardware",
            "software", "error system", "gangguan sistem", "kerusakan",
            "breakdown", "tidak berfungsi", "tidak bisa digunakan", "trouble",
            "hose", "hll", "leak", "bocor", "oil", "fuel", "solenoid",
            "selenoid",
        ],
        "severity_multiplier": 1.3,
        "description": "Issues caused by equipment malfunction or failure",
    },
    "Staff Competency": {
        "keywords": [
            "staff", "karyawan", "pegawai", "operator", "kurang",
            "tidak paham", "tidak mengerti", "salah", "mistake", "human error",
            "kelalaian", "incompetence", "skill", "pengalaman", "baru",
            "training", "pelatihan", "tidak fokus", "tidak teliti", "teledor",
            "lupa", "inexperienced", "inappropriate handling", "careless",
            "negligence", "tanpa supervisor",
        ],
        "severity_multiplier": 1.2,
        "description": "Issues caused by staff knowledge or skill gaps",
    },
    "Process/Procedure": {
        "keywords": [
            "procedure", "prosedur", "process", "proses", "sop", "standard",
            "tidak sesuai", "non-compliance", "pelanggaran", "violation",
            "bypass", "skip", "langkah", "step", "urutan", "sequence",
            "tidak patuh", "melanggar", "aturan", "rules", "policy",
            "inconsistency", "tidak konsisten", "deviasi", "deviation", "flow",
            "workflow", "ketidaksesuaian",
        ],
        "severity_multiplier": 1.1,
        "description": "Issues caused by procedure violations or process failures",
    },
    "Communication": {
        "keywords": [
            "communication", "komunikasi", "informasi", "koordinasi",
            "coordination", "miscommunication", "misunderstanding",
            "tidak jelas", "unclear", "konfirmasi", "confirmation", "notif",
            "notification", "pemberitahuan", "sosialisasi", "tidak terima",
            "tidak sampai", "lost in translation", "bahasa", "language",
            "interpretasi", "interpretation", "beda informasi",
            "informasi berbeda", "salah paham", "miskomunikasi",
            "koordinasi kurang",
        ],
        "severity_multiplier": 1.0,
        "description": "Issues caused by communication breakdowns",
    },
    "External Factors": {
        "keywords": [
            "weather", "cuaca", "rain", "hujan", "storm", "badai", "lightning",
            "flight delay", "delay", "airport", "bandara", "customs",
            "bea cukai", "airline", "maskapai", "vendor", "third party",
            "pihak ketiga", "schedule", "jadwal", "traffic", "macet",
            "force majeure", "bencana", "disaster", "pandemic", "covid",
            "external", "di luar kendali", "beyond control", "unforeseen",
            "unexpected", "konflik", "demo", "strikes", "pemogokan",
        ],
        # Dampened: external causes are outside operational control.
        "severity_multiplier": 0.8,
        "description": "Issues caused by external events or parties",
    },
    "Documentation": {
        "keywords": [
            "document", "dokumen", "paperwork", "awb", "air waybill",
            "manifest", "label", "tag", "sticker", "barcode", "missing",
            "hilang", "lost", "incomplete", "tidak lengkap", "wrong", "salah",
            "error", "typo", "incorrect", "tidak sesuai", "mismatch",
            "data entry", "input", "recording", "pencatatan", "reporting",
            "pelaporan", "faktur", "invoice", "packing list",
            "dokumen tidak lengkap", "admin error",
        ],
        "severity_multiplier": 1.0,
        "description": "Issues caused by documentation errors or missing documents",
    },
    "Training Gap": {
        "keywords": [
            "training", "pelatihan", "education", "edukasi", "briefing",
            "arahan", "new employee", "karyawan baru", "orientation",
            "orientasi", "tidak dilatih", "untrained", "refresher", "update",
            "perbaruan", "knowledge gap", "kesenjangan pengetahuan",
            "competency", "kompetensi", "sertifikasi", "certification",
            "qualification", "kualifikasi", "belum pernah", "never done",
            "jarak training terakhir",
        ],
        "severity_multiplier": 1.1,
        "description": "Issues caused by lack of training or refresher",
    },
    "Resource/Manpower": {
        "keywords": [
            "manpower", "manpower shortage", "kekurangan", "shortage",
            "understaffed", "kurang staf", "overwhelmed", "kewalahan",
            "peak hour", "jam sibuk", "high volume", "volume tinggi",
            "many flights", "banyak penerbangan", "resource", "sumber daya",
            "allocation", "alokasi", "overtime", "lembur", "shift", "jadwal",
            "schedule conflict", "konflik jadwal", "double task",
            "multitasking", "terlalu banyak", "overload",
        ],
        "severity_multiplier": 1.0,
        "description": "Issues caused by resource or manpower constraints",
    },
}
class RootCauseService:
    """
    Root cause classification service
    Uses keyword matching + TF-IDF similarity for classification

    Three strategies are attempted in priority order:
      1. "transformer" — external NLP service (``data.nlp_service``), used
         when its models are loaded and confidence > 0.4;
      2. "tfidf-ml"    — pickled TF-IDF vectorizer + classifier produced by
         :meth:`train_from_data`, used when loaded and confidence >= 0.4;
      3. "keyword"     — substring matching against ``ROOT_CAUSE_CATEGORIES``
         (always available, no dependencies).

    Results are memoized in an in-process dict keyed on
    ``f"{report_text}|{root_cause_text}"``.
    NOTE(review): the cache is unbounded and is only cleared on retraining —
    it grows with every distinct text pair seen.
    """

    def __init__(self) -> None:
        # ML artifacts stay None until _load_model() finds pickles on disk;
        # all classify paths check for None before using them.
        self.classifier = None
        self.vectorizer = None
        self.patterns = {}
        self._classification_cache = {}  # In-memory cache for speed
        self._load_model()

    def _load_model(self) -> None:
        """Load trained classifier if available"""
        # Artifacts are expected at <parent-of-this-package>/models/root_cause/.
        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        model_dir = os.path.join(base_dir, "models", "root_cause")
        clf_path = os.path.join(model_dir, "classifier.pkl")
        vec_path = os.path.join(model_dir, "vectorizer.pkl")
        # Both files must exist; a lone classifier without its vectorizer is useless.
        if os.path.exists(clf_path) and os.path.exists(vec_path):
            try:
                # SECURITY NOTE(review): pickle.load executes arbitrary code if
                # the files are attacker-controlled; safe only for a trusted
                # model directory.
                with open(clf_path, "rb") as f:
                    self.classifier = pickle.load(f)
                with open(vec_path, "rb") as f:
                    self.vectorizer = pickle.load(f)
                logger.info("✓ Root cause ML classifier loaded")
            except Exception as e:
                # Corrupt/incompatible pickles degrade gracefully to the
                # keyword fallback instead of failing startup.
                logger.warning(f"Failed to load root cause ML model: {e}")

    def _classify_ml(self, root_cause_text: str, report_text: str) -> Optional[Dict[str, Any]]:
        """
        Classify using trained TF-IDF + classifier if available

        Returns a result dict (same shape as _classify_keyword plus
        ``method="tfidf-ml"``), or None when the model is unavailable, the
        input is too short, confidence is below 0.4, or inference fails —
        callers then fall through to the keyword matcher.
        """
        if self.classifier is None or self.vectorizer is None:
            return None
        combined = f"{root_cause_text} {report_text}".strip()
        # Very short texts carry too little signal for TF-IDF to be meaningful.
        if not combined or len(combined) < 6:
            return None
        try:
            X = self.vectorizer.transform([combined])
            if hasattr(self.classifier, "predict_proba"):
                proba = self.classifier.predict_proba(X)[0]
                idx = int(np.argmax(proba))
                label = self.classifier.classes_[idx]
                conf = float(proba[idx])
            else:
                # Classifier without probabilities: accept its label with a
                # fixed nominal confidence.
                label = self.classifier.predict(X)[0]
                conf = 0.7
            # Below the 0.4 threshold, defer to the keyword fallback.
            if conf < 0.4:
                return None
            return {
                "primary_category": str(label),
                "confidence": round(conf, 2),
                "all_scores": {str(label): round(conf, 2)},
                "keywords_matched": [],
                "secondary_categories": [],
                "description": ROOT_CAUSE_CATEGORIES.get(str(label), {}).get("description", ""),
                "method": "tfidf-ml",
            }
        except Exception as e:
            logger.warning(f"ML RC classification failed: {e}")
            return None

    def _classify_keyword(self, root_cause_text: str, report_text: str) -> Dict[str, Any]:
        """
        Classify using keyword matching (Fast fallback)

        Scores each category by counting keyword substrings found in the
        lowercased combined text, scaled by the category's
        severity_multiplier. Confidence is the winner's share of the total
        score, capped at 0.95. NOTE: substring matching means e.g.
        "conveyor belt" in the text also hits "conveyor" and "belt".
        """
        combined_text = f"{root_cause_text} {report_text}".lower()
        scores = {}
        matched_keywords = defaultdict(list)
        for category, config in ROOT_CAUSE_CATEGORIES.items():
            keywords = config["keywords"]
            score = 0
            matched = []
            for keyword in keywords:
                keyword_lower = keyword.lower()
                if keyword_lower in combined_text:
                    score += 1
                    matched.append(keyword)
            # Only categories with at least one hit participate in ranking.
            if score > 0:
                score *= config["severity_multiplier"]
                scores[category] = round(score, 2)
                matched_keywords[category] = matched
        if not scores:
            # No keyword hit at all — report Unknown with zero confidence.
            # (NOTE(review): this branch omits "description"/"method" keys
            # that the hit branch includes.)
            return {
                "primary_category": "Unknown",
                "confidence": 0.0,
                "all_scores": {},
                "keywords_matched": [],
                "secondary_categories": [],
            }
        sorted_scores = sorted(scores.items(), key=lambda x: -x[1])
        primary_category = sorted_scores[0][0]
        primary_score = sorted_scores[0][1]
        total_score = sum(scores.values())
        # Winner's share of the total, capped so keyword matches never claim
        # near-certain confidence.
        confidence = min(0.95, primary_score / max(total_score, 1))
        # Up to two runner-up categories with non-zero scores.
        secondary = [
            {"category": cat, "score": score}
            for cat, score in sorted_scores[1:3]
            if score > 0
        ]
        return {
            "primary_category": primary_category,
            "confidence": round(confidence, 2),
            "all_scores": scores,
            "keywords_matched": matched_keywords.get(primary_category, []),
            "secondary_categories": secondary,
            "description": ROOT_CAUSE_CATEGORIES.get(primary_category, {}).get(
                "description", ""
            ),
            "method": "keyword"
        }

    def classify(
        self,
        root_cause_text: str,
        report_text: str = "",
        context: Optional[Dict] = None,
    ) -> Dict[str, Any]:
        """
        Classify a root cause text into categories using Transformer + Rule-based hybrid

        Args:
            root_cause_text: free-text root cause description
            report_text: optional surrounding report text for extra context
            context: accepted for API compatibility; currently unused

        Returns:
            Result dict with primary_category, confidence, all_scores,
            keywords_matched, secondary_categories and (on success paths)
            description/method. Cached per (report_text, root_cause_text) pair.
        """
        if not root_cause_text and not report_text:
            return {
                "primary_category": "Unknown",
                "confidence": 0.0,
                "all_scores": {},
                "keywords_matched": [],
                "secondary_categories": [],
            }
        # Check Cache
        cache_key = f"{report_text}|{root_cause_text}"
        if cache_key in self._classification_cache:
            # Shallow copy so callers mutating top-level keys don't corrupt
            # the cache (nested dicts are still shared).
            return self._classification_cache[cache_key].copy()
        result = None
        # 1. Try Transformer (High Accuracy)
        try:
            # Imported lazily so this module works without the NLP stack.
            from data.nlp_service import get_nlp_service
            nlp = get_nlp_service()
            if nlp.models_loaded:
                # "[SEP]" joins report and root cause — presumably the
                # transformer's expected segment separator (TODO confirm
                # against nlp_service).
                combined = f"{report_text} [SEP] {root_cause_text}"
                rc_pred = nlp.classify_root_cause([combined])[0]
                if rc_pred["label"] != "Unknown" and rc_pred["confidence"] > 0.4:
                    result = {
                        "primary_category": rc_pred["label"],
                        "confidence": rc_pred["confidence"],
                        "all_scores": {rc_pred["label"]: rc_pred["confidence"]},
                        "keywords_matched": [],
                        "secondary_categories": [],
                        "description": ROOT_CAUSE_CATEGORIES.get(rc_pred["label"], {}).get("description", ""),
                        "method": "transformer"
                    }
        except Exception as e:
            logger.warning(f"Transformer RC classification failed: {e}")
        # 2. Try ML classifier if available
        if result is None:
            result = self._classify_ml(root_cause_text, report_text)
        # 3. Fallback to Keyword Matching (Legacy)
        if result is None:
            result = self._classify_keyword(root_cause_text, report_text)
        # Update Cache
        self._classification_cache[cache_key] = result.copy()
        return result

    def classify_batch(
        self, records: List[Dict], text_field: str = "Root_Caused"
    ) -> List[Dict[str, Any]]:
        """
        Classify multiple records using optimized batch inference

        Args:
            records: dicts carrying the root-cause text (under ``text_field``),
                optional "Report" text, and optional "_row_id".
            text_field: key holding the root-cause text in each record.

        Returns:
            One result dict per input record, in order, each tagged with the
            record's "row_id". Strategy: serve cache hits first, then batch
            transformer inference, then batched TF-IDF for remaining
            Unknowns, then per-record keyword matching.
        """
        results = []
        texts_for_transformer = []
        valid_indices = []
        for i, record in enumerate(records):
            root_cause = record.get(text_field, "")
            report = record.get("Report", "")
            # 0. Check Cache First (O(1) vs heavy O(Transformer))
            cache_key = f"{report}|{root_cause}"
            if cache_key in self._classification_cache:
                results.append({**self._classification_cache[cache_key], "row_id": record.get("_row_id")})
                continue
            # Prepare context for transformer
            if root_cause or report:
                combined = f"{report} [SEP] {root_cause}"
                texts_for_transformer.append(combined)
                valid_indices.append(i)
            # Initialize results with placeholders
            # (appended for every non-cached record, so results[i] stays
            # aligned with records[i] for the later passes).
            results.append({
                "primary_category": "Unknown",
                "confidence": 0.0,
                "all_scores": {},
                "keywords_matched": [],
                "secondary_categories": [],
                "row_id": record.get("_row_id")
            })
        # 1. Batch Transformer Inference (High Efficiency)
        if texts_for_transformer:
            try:
                from data.nlp_service import get_nlp_service
                nlp = get_nlp_service()
                if nlp.models_loaded:
                    mt_results = nlp.classify_root_cause(texts_for_transformer)
                    for idx, mt_res in zip(valid_indices, mt_results):
                        # Same acceptance rule as classify(): known label with
                        # confidence above 0.4.
                        if mt_res["label"] != "Unknown" and mt_res["confidence"] > 0.4:
                            results[idx].update({
                                "primary_category": mt_res["label"],
                                "confidence": mt_res["confidence"],
                                "all_scores": {mt_res["label"]: mt_res["confidence"]},
                                "description": ROOT_CAUSE_CATEGORIES.get(mt_res["label"], {}).get("description", ""),
                                "method": "transformer"
                            })
                            # Update cache
                            record = records[idx]
                            cache_key = f"{record.get('Report', '')}|{record.get(text_field, '')}"
                            self._classification_cache[cache_key] = results[idx].copy()
                            del self._classification_cache[cache_key]["row_id"]  # Don't cache row_id
            except Exception as e:
                logger.warning(f"Batch transformer classification failed, falling back: {e}")
        # 2. ML classification for remaining Unknowns
        if self.classifier is not None and self.vectorizer is not None:
            unknown_indices = [i for i, r in enumerate(results) if r["primary_category"] == "Unknown"]
            if unknown_indices:
                texts = []
                for i in unknown_indices:
                    rc = records[i].get(text_field, "")
                    rep = records[i].get("Report", "")
                    texts.append(f"{rc} {rep}".strip())
                try:
                    # Vectorized inference over all unknowns in one call.
                    X = self.vectorizer.transform(texts)
                    if hasattr(self.classifier, "predict_proba"):
                        proba = self.classifier.predict_proba(X)
                        preds = np.argmax(proba, axis=1)
                        labels = self.classifier.classes_[preds]
                        # Per-row probability of the predicted class.
                        confs = proba[np.arange(len(preds)), preds]
                    else:
                        labels = self.classifier.predict(X)
                        confs = np.full(len(labels), 0.7)
                    for j, i in enumerate(unknown_indices):
                        label = str(labels[j])
                        conf = float(confs[j])
                        if conf >= 0.4 and label:
                            results[i].update({
                                "primary_category": label,
                                "confidence": round(conf, 2),
                                "all_scores": {label: round(conf, 2)},
                                "description": ROOT_CAUSE_CATEGORIES.get(label, {}).get("description", ""),
                                "method": "tfidf-ml"
                            })
                except Exception as e:
                    logger.warning(f"Batch ML classification failed: {e}")
        # 3. Fallback to Keyword Matching for any remaining Unknowns
        for i, record in enumerate(records):
            if results[i]["primary_category"] == "Unknown":
                root_cause = record.get(text_field, "")
                report = record.get("Report", "")
                if root_cause or report:
                    res = self._classify_keyword(root_cause, report)
                    if res["primary_category"] != "Unknown":
                        # Replace the placeholder but preserve its row_id.
                        row_id = results[i]["row_id"]
                        results[i] = res
                        results[i]["row_id"] = row_id
        return results

    def train_from_data(self, records: List[Dict]) -> Dict[str, Any]:
        """
        Train/improve classifier from labeled data using batch processing

        Self-training loop: labels come from classify_batch() (transformer /
        existing model / keywords), then a TF-IDF + LogisticRegression model
        is fit on those labels, evaluated on a 20% holdout, pickled to disk,
        and swapped in. Returns a summary dict; status is
        "insufficient_data" when fewer than two distinct labels were found.
        """
        # sklearn imported lazily — only needed for training, not inference.
        from sklearn.feature_extraction.text import TfidfVectorizer
        from sklearn.linear_model import LogisticRegression
        from sklearn.model_selection import train_test_split
        from sklearn.utils.class_weight import compute_class_weight
        from sklearn.metrics import classification_report
        logger.info(f"Training root cause classifier from {len(records)} records using batch labeling...")
        # Bulk classify to get labels
        classifications = self.classify_batch(records)
        texts = []
        labels = []
        for record, classification in zip(records, classifications):
            root_cause = record.get("Root_Caused", "")
            report = record.get("Report", "")
            combined = f"{root_cause} {report}".strip()
            # Skip records too short to carry signal.
            if not combined or len(combined) < 10:
                continue
            category = classification["primary_category"]
            if category != "Unknown":
                texts.append(combined)
                labels.append(category)
        # Need at least two classes for a meaningful classifier.
        if len(set(labels)) < 2:
            logger.warning("Not enough variety in labels for training")
            return {
                "status": "insufficient_data",
                "records_processed": len(records),
                "usable_records": len(texts),
            }
        X_train, X_test, y_train, y_test = train_test_split(
            texts, labels, test_size=0.2, random_state=42
        )
        # Word unigrams + bigrams; min_df=2 drops hapax terms.
        vectorizer = TfidfVectorizer(
            max_features=20000, ngram_range=(1, 2), sublinear_tf=True, lowercase=True, min_df=2
        )
        X_train_vec = vectorizer.fit_transform(X_train)
        X_test_vec = vectorizer.transform(X_test)
        classes = np.unique(y_train)
        try:
            # Balanced class weights counter label imbalance from self-training.
            class_weights = compute_class_weight(class_weight="balanced", classes=classes, y=y_train)
            cw = {c: w for c, w in zip(classes, class_weights)}
        except Exception:
            cw = None
        classifier = LogisticRegression(max_iter=2000)
        if cw:
            try:
                classifier.set_params(class_weight=cw)
            except Exception:
                pass
        classifier.fit(X_train_vec, y_train)
        accuracy = classifier.score(X_test_vec, y_test)
        logger.info(f"Root cause classifier trained with accuracy: {accuracy:.2%}")
        try:
            # Per-class precision/recall is log-only diagnostics; failures here
            # must not abort training.
            y_pred = classifier.predict(X_test_vec)
            logger.info("Root cause classification report:\n" + classification_report(y_test, y_pred))
        except Exception:
            pass
        # Persist to the same location _load_model() reads from.
        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        model_dir = os.path.join(base_dir, "models", "root_cause")
        os.makedirs(model_dir, exist_ok=True)
        with open(os.path.join(model_dir, "classifier.pkl"), "wb") as f:
            pickle.dump(classifier, f)
        with open(os.path.join(model_dir, "vectorizer.pkl"), "wb") as f:
            pickle.dump(vectorizer, f)
        self.classifier = classifier
        self.vectorizer = vectorizer
        self._classification_cache = {}  # Clear cache as models changed
        category_dist = Counter(labels)
        return {
            "status": "success",
            "records_processed": len(records),
            "usable_records": len(texts),
            "accuracy": round(float(accuracy), 3),
            "category_distribution": dict(category_dist),
        }

    def get_statistics(self, records: List[Dict]) -> Dict[str, Any]:
        """
        Get root cause statistics from records
        Args:
            records: List of records to analyze
        Returns:
            Dict with statistics by category
        """
        # Per-category accumulator: hit count plus frequency counters over
        # the record's issue category, area and airline fields.
        stats = defaultdict(
            lambda: {
                "count": 0,
                "categories": Counter(),
                "areas": Counter(),
                "airlines": Counter(),
            }
        )
        total_classified = 0
        total_unknown = 0
        # Use batch classification for performance (O(N/batch_size) instead of O(N))
        classifications = self.classify_batch(records)
        for record, classification in zip(records, classifications):
            category = classification["primary_category"]
            if category != "Unknown":
                total_classified += 1
                stats[category]["count"] += 1
                stats[category]["categories"][
                    record.get("Irregularity_Complain_Category", "Unknown")
                ] += 1
                stats[category]["areas"][record.get("Area", "Unknown")] += 1
                stats[category]["airlines"][record.get("Airlines", "Unknown")] += 1
            else:
                total_unknown += 1
        result = {}
        for category, data in stats.items():
            result[category] = {
                "count": data["count"],
                # Percentage of classified (not total) records; max() guards
                # against division by zero.
                "percentage": round(data["count"] / max(total_classified, 1) * 100, 1),
                "top_issue_categories": dict(data["categories"].most_common(3)),
                "top_areas": dict(data["areas"].most_common(3)),
                "top_airlines": dict(data["airlines"].most_common(3)),
                "description": ROOT_CAUSE_CATEGORIES.get(category, {}).get(
                    "description", ""
                ),
            }
        return {
            "total_records": len(records),
            "classified": total_classified,
            "unknown": total_unknown,
            "classification_rate": round(
                total_classified / max(len(records), 1) * 100, 1
            ),
            "by_category": result,
            "top_categories": sorted(result.items(), key=lambda x: -x[1]["count"])[:5],
        }

    def get_categories(self) -> Dict[str, Dict]:
        """Get all available root cause categories"""
        # Summarizes the static taxonomy without exposing raw keyword lists.
        return {
            cat: {
                "name": cat,
                "description": config["description"],
                "keyword_count": len(config["keywords"]),
                "severity_multiplier": config["severity_multiplier"],
            }
            for cat, config in ROOT_CAUSE_CATEGORIES.items()
        }
# Module-level holder for the shared service; populated on first access.
_root_cause_service: Optional[RootCauseService] = None


def get_root_cause_service() -> RootCauseService:
    """Get singleton instance"""
    global _root_cause_service
    service = _root_cause_service
    if service is None:
        # First call pays the construction (and model loading) cost once.
        service = RootCauseService()
        _root_cause_service = service
    return service