Spaces:
Build error
Build error
"""
Action Recommendation Service for Gapura AI.

Recommends actions based on historical patterns and similarity.
"""
| import os | |
| import logging | |
| import pickle | |
| from typing import List, Dict, Any, Optional, Tuple | |
| from collections import Counter, defaultdict | |
| import re | |
logger = logging.getLogger(__name__)


class ActionRecommendationService:
    """
    Recommend follow-up actions for an irregularity report.

    Recommendations are assembled from four sources, in this order:
    - severity protocol (``SEVERITY_ACTIONS``)
    - category templates (``ACTION_TEMPLATES``)
    - actions taken on similar historical reports (similarity service)
    - keyword matching against actions stored by ``train_from_data``
    """

    ACTION_TEMPLATES = {
        "Cargo Problems": [
            "Coordinate with warehouse team to verify cargo condition",
            "Document damage with photos and report to supervisor",
            "Contact airline cargo department for claim process",
            "Segregate damaged cargo and notify relevant parties",
            "Review loading/unloading procedures",
            "Update cargo manifest and notify destination",
        ],
        "GSE": [
            "Report equipment issue to maintenance team",
            "Request replacement equipment if available",
            "Document equipment malfunction details",
            "Implement temporary workaround procedure",
            "Schedule preventive maintenance check",
            "Review equipment inspection logs",
        ],
        "Operation": [
            "Review and update standard operating procedures",
            "Brief staff on correct procedures",
            "Implement additional checks and verification",
            "Coordinate with relevant departments",
            "Document incident for training purposes",
            "Schedule follow-up review meeting",
        ],
        "Pax Handling": [
            "Assist passenger with immediate needs",
            "Coordinate with airline customer service",
            "Document passenger complaint details",
            "Escalate to supervisor if needed",
            "Provide explanation and apology to passenger",
            "Follow up on passenger compensation if applicable",
        ],
        "Baggage Handling": [
            "Initiate baggage tracing process",
            "Document baggage details and damage",
            "Coordinate with airline baggage services",
            "Assist passenger with baggage claim",
            "Review baggage handling procedures",
            "Implement additional baggage checks",
        ],
        "Flight Document Handling": [
            "Verify and correct documentation errors",
            "Coordinate with flight operations team",
            "Update all relevant parties on corrections",
            "Review document handling procedures",
            "Implement additional verification steps",
            "Train staff on proper documentation",
        ],
        "Procedure Competencies": [
            "Review current SOP with staff involved",
            "Provide additional training if needed",
            "Update procedures if gaps identified",
            "Implement supervision until competency verified",
            "Document incident for training records",
            "Schedule regular competency assessments",
        ],
    }

    SEVERITY_ACTIONS = {
        "Critical": [
            "IMMEDIATE: Notify supervisor and management",
            "IMMEDIATE: Secure the area and ensure safety",
            "IMMEDIATE: Document all details thoroughly",
            "IMMEDIATE: Implement containment measures",
            "URGENT: Coordinate emergency response if needed",
            "URGENT: Prepare detailed incident report",
        ],
        "High": [
            "Notify supervisor immediately",
            "Implement corrective action",
            "Document incident with photos if applicable",
            "Inform relevant departments",
            "Schedule follow-up review",
        ],
        "Medium": [
            "Document the incident",
            "Review procedures",
            "Inform team lead",
            "Monitor for recurrence",
        ],
        "Low": [
            "Log the incident",
            "Brief staff on prevention",
            "Update records",
        ],
    }

    # Common Indonesian/English stop words ignored by _extract_keywords.
    # Built once at class creation instead of on every call.
    _STOP_WORDS = frozenset(
        {
            "dan", "yang", "di", "ke", "dari", "untuk", "pada", "dengan",
            "the", "and", "to", "for", "from", "with", "by", "at", "in",
            "adalah", "ini", "itu", "tidak", "ada", "oleh", "jika",
            "a", "an", "is", "are", "was", "were", "be", "been",
        }
    )

    # Sort rank of each priority label (lower sorts first).
    _PRIORITY_ORDER = {"HIGH": 0, "MEDIUM": 1, "LOW": 2}

    # Upper bound applied to computed confidence values.
    _MAX_CONFIDENCE = 0.95

    def __init__(self):
        self.historical_actions = {}
        self.action_effectiveness = {}
        self._load_data()

    @staticmethod
    def _data_path() -> str:
        """Return the pickle path: <parent of this file's directory>/models/action_recommendations.pkl."""
        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        return os.path.join(base_dir, "models", "action_recommendations.pkl")

    @staticmethod
    def _is_closed(status: Any) -> bool:
        """Return True when *status* is the string "closed" (case-insensitive).

        Non-string values (e.g. ``None`` from a missing cell) count as not closed.
        """
        return isinstance(status, str) and status.lower() == "closed"

    @classmethod
    def _match_category(cls, issue_type: Any) -> Optional[str]:
        """Map a free-form issue type onto an ``ACTION_TEMPLATES`` key.

        An exact case-insensitive match wins; otherwise the first template
        whose name contains, or is contained in, the issue type is used.
        Empty or non-string input never matches: an empty string is a
        substring of everything and would select the first template.
        """
        if not isinstance(issue_type, str):
            return None
        needle = issue_type.strip().lower()
        if not needle:
            return None
        for category in cls.ACTION_TEMPLATES:
            if category.lower() == needle:
                return category
        for category in cls.ACTION_TEMPLATES:
            key = category.lower()
            if key in needle or needle in key:
                return category
        return None

    def _load_data(self):
        """Load historical action data from the pickle file, if it exists.

        Loading is best-effort: any failure is logged as a warning and the
        in-memory state is left as it was.
        """
        data_path = self._data_path()
        if not os.path.exists(data_path):
            return
        try:
            # SECURITY: pickle.load can execute arbitrary code. This file must
            # only ever be produced by _save_data on a trusted deployment.
            with open(data_path, "rb") as f:
                data = pickle.load(f)
            self.historical_actions = data.get("historical_actions", {})
            self.action_effectiveness = data.get("action_effectiveness", {})
            logger.info("Action recommendation data loaded")
        except Exception as e:
            logger.warning(f"Failed to load action data: {e}")

    def _save_data(self):
        """Persist historical actions and effectiveness scores to the pickle file.

        Creates the target directory when missing. I/O errors propagate to
        the caller.
        """
        data_path = self._data_path()
        os.makedirs(os.path.dirname(data_path), exist_ok=True)
        payload = {
            "historical_actions": self.historical_actions,
            "action_effectiveness": self.action_effectiveness,
        }
        with open(data_path, "wb") as f:
            pickle.dump(payload, f)

    def train_from_data(self, records: List[Dict]):
        """
        Train recommendation system from historical data.

        New actions are appended to whatever is already held in
        ``historical_actions``; effectiveness scores are then recomputed for
        every issue type and the result is saved.

        Args:
            records: List of historical records with Action_Taken
        """
        logger.info(f"Training action recommender from {len(records)} records...")

        for record in records:
            issue_type = record.get("Irregularity_Complain_Category", "Unknown")
            if not issue_type or issue_type == "Unknown":
                continue

            status = record.get("Status", "")
            bucket = self.historical_actions.setdefault(issue_type, [])

            action = record.get("Action_Taken", "")
            if action and action != "-":
                bucket.append(
                    {
                        "action": action,
                        "status": status,
                        "airline": record.get("Airlines", ""),
                        "area": record.get("Area", ""),
                    }
                )

            gapura_action = record.get("Gapura_KPS_Action_Taken", "")
            if gapura_action and gapura_action != "-":
                bucket.append(
                    {
                        "action": gapura_action,
                        "status": status,
                        "is_company_action": True,
                    }
                )

        # Effectiveness = share of stored actions whose record is closed.
        for issue_type, actions in self.historical_actions.items():
            if not actions:
                continue
            # _is_closed tolerates a None status instead of raising on .lower().
            closed_count = sum(1 for a in actions if self._is_closed(a.get("status")))
            self.action_effectiveness[issue_type] = closed_count / len(actions)

        self._save_data()
        logger.info("Action recommender trained and saved")

    def recommend(
        self,
        report: str,
        issue_type: str,
        severity: str = "Medium",
        area: Optional[str] = None,
        airline: Optional[str] = None,
        top_n: int = 5,
    ) -> Dict[str, Any]:
        """
        Get action recommendations.

        Args:
            report: Report text for context
            issue_type: Type of issue (Cargo Problems, GSE, etc.); matched
                case-insensitively against the template categories
            severity: Severity level (Critical, High, Medium, Low)
            area: Area type (Terminal Area, Apron Area)
            airline: Airline name
            top_n: Number of recommendations to return (negative is treated as 0)

        Returns:
            Dict with keys ``recommendations`` (sorted by priority, then by
            descending confidence), ``issue_type``, ``severity``,
            ``effectiveness_score`` and ``historical_cases``.
        """
        top_n = max(0, top_n)
        issue_text = issue_type if isinstance(issue_type, str) else ""
        matched_category = self._match_category(issue_text)

        recommendations = self._severity_recommendations(severity)
        recommendations.extend(self._category_recommendations(matched_category))

        # NOTE: retrieval and historical results only fill the slots left
        # after severity (up to 2) and category (up to 3) recommendations, so
        # with the default top_n=5 they appear only when one of those two
        # sources comes up short.
        remaining = top_n - len(recommendations)
        if remaining > 0:
            candidates = self._collect_similar_actions(report, issue_text)
            recommendations.extend(
                self._retrieval_recommendations(candidates, area, airline, remaining)
            )

        historical = self.historical_actions.get(issue_type, [])
        self._append_historical(recommendations, report, historical, top_n)

        recommendations.sort(
            key=lambda r: (self._PRIORITY_ORDER.get(r["priority"], 2), -r["confidence"])
        )

        # Drop entries whose first 50 characters repeat an earlier one.
        seen = set()
        unique_recommendations = []
        for rec in recommendations:
            action_key = rec["action"][:50]
            if action_key in seen:
                continue
            seen.add(action_key)
            unique_recommendations.append(rec)

        return {
            "recommendations": unique_recommendations[:top_n],
            "issue_type": issue_type,
            "severity": severity,
            "effectiveness_score": self.action_effectiveness.get(issue_type, 0.5),
            "historical_cases": len(historical),
        }

    def _severity_recommendations(self, severity: str) -> List[Dict[str, Any]]:
        """Return the first two protocol actions for *severity* (empty if unknown)."""
        protocol = self.SEVERITY_ACTIONS.get(severity, [])
        return [
            {
                "action": action,
                "priority": "HIGH" if "IMMEDIATE" in action else "MEDIUM",
                "source": "severity_protocol",
                "rationale": f"Standar protokol untuk tingkat keparahan {severity}",
                "confidence": 0.9,
            }
            for action in protocol[:2]
        ]

    def _category_recommendations(self, category: Optional[str]) -> List[Dict[str, Any]]:
        """Return the first three template actions for a matched category."""
        if not category:
            return []
        templates = self.ACTION_TEMPLATES.get(category, [])
        return [
            {
                "action": action,
                "priority": "MEDIUM",
                "source": "category_template",
                "rationale": f"Tindakan berdasarkan kategori {category}",
                # Confidence decays with template position: 0.8, 0.7, 0.6.
                "confidence": round(0.8 - rank * 0.1, 2),
            }
            for rank, action in enumerate(templates[:3])
        ]

    def _collect_similar_actions(
        self, report: str, issue_text: str
    ) -> List[Tuple[str, float, Dict[str, Any]]]:
        """Collect (action text, similarity, metadata) from similar reports.

        Best-effort: if the similarity service cannot be imported or fails,
        the problem is logged and whatever was collected so far is returned.
        """
        candidates: List[Tuple[str, float, Dict[str, Any]]] = []
        try:
            from data.similarity_service import get_similarity_service

            similar = get_similarity_service().find_similar(
                report, top_k=30, threshold=0.2
            )

            # Prefer matches within the same category when available.
            issue_key = issue_text.lower()
            same_category = []
            if issue_key:
                for match in similar:
                    meta = match.get("report_metadata") or {}
                    cat = (meta.get("category") or "").lower()
                    if cat and (cat in issue_key or issue_key in cat):
                        same_category.append(match)
            pool = same_category if same_category else similar

            for match in pool:
                meta = match.get("report_metadata") or {}
                for field in ("action_taken", "gapura_action"):
                    text = (meta.get(field) or "").strip()
                    if text:
                        candidates.append(
                            (text, float(match.get("similarity", 0.0)), meta)
                        )
        except Exception as exc:
            # Retrieval is optional; report the failure instead of hiding it.
            logger.warning("Similarity retrieval skipped: %s", exc)
        return candidates

    def _retrieval_recommendations(
        self,
        candidates: List[Tuple[str, float, Dict[str, Any]]],
        area: Optional[str],
        airline: Optional[str],
        limit: int,
    ) -> List[Dict[str, Any]]:
        """Aggregate retrieved actions by normalized text and return the best *limit*."""
        if not candidates or limit <= 0:
            return []

        aggregated: Dict[str, Dict[str, Any]] = {}
        for action_text, sim, meta in candidates:
            key = self._normalize_action(action_text)
            if not key:
                continue
            entry = aggregated.get(key)
            if entry is None:
                entry = aggregated[key] = {
                    "text": action_text[:400],
                    "score": 0.0,
                    "count": 0,
                    "closed": 0,
                    "matches": [],
                    "airline_match": 0,
                    "area_match": 0,
                }

            # Similarity is the base weight; closed cases and matching
            # airline/area context add small bonuses.
            weight = sim
            if self._is_closed(meta.get("status")):
                weight += 0.1
                entry["closed"] += 1
            meta_airline = meta.get("airline")
            if (
                airline
                and isinstance(meta_airline, str)
                and airline.lower() == meta_airline.lower()
            ):
                weight += 0.05
                entry["airline_match"] += 1
            meta_area = meta.get("area")
            if area and isinstance(meta_area, str) and area.lower() in meta_area.lower():
                weight += 0.05
                entry["area_match"] += 1

            entry["score"] += max(0.0, weight)
            entry["count"] += 1
            # Keep at most three supporting cases per action.
            if len(entry["matches"]) < 3:
                entry["matches"].append(
                    {
                        "row_id": meta.get("row_id"),
                        "similarity": round(sim, 3),
                        "airline": meta.get("airline"),
                        "area": meta.get("area"),
                        "status": meta.get("status"),
                    }
                )

        # Rank by mean weight, then by support count, then by closed count.
        ranked = sorted(
            aggregated.values(),
            key=lambda x: (-(x["score"] / max(1, x["count"])), -x["count"], -x["closed"]),
        )

        results = []
        for item in ranked[:limit]:
            mean_score = item["score"] / max(1, item["count"])
            conf = min(
                self._MAX_CONFIDENCE,
                0.6 + mean_score * 0.3 + min(item["closed"], 3) * 0.03,
            )
            rationale_bits = []
            if item["closed"] > 0:
                rationale_bits.append(f"{item['closed']} kasus closed")
            if item["airline_match"] > 0:
                rationale_bits.append("konteks maskapai cocok")
            if item["area_match"] > 0:
                rationale_bits.append("konteks area cocok")
            results.append(
                {
                    "action": item["text"],
                    "priority": "MEDIUM" if item["closed"] > 0 else "LOW",
                    "source": "retrieval",
                    "rationale": (
                        " ; ".join(rationale_bits)
                        if rationale_bits
                        else "berdasarkan kemiripan historis"
                    ),
                    "confidence": round(conf, 3),
                    "supportingCases": item["matches"],
                }
            )
        return results

    def _append_historical(
        self,
        recommendations: List[Dict[str, Any]],
        report: str,
        historical: List[Dict[str, Any]],
        top_n: int,
    ) -> None:
        """Fill *recommendations* in place from trained history, up to *top_n* entries.

        An action qualifies when it shares a keyword with the report, or
        unconditionally when fewer than 10 historical entries exist.
        """
        if not historical or len(recommendations) >= top_n:
            return

        report_keywords = self._extract_keywords(report)
        accept_unmatched = len(historical) < 10
        # Dedup on the stored (truncated) text so long actions are not added twice.
        seen_actions = {rec["action"] for rec in recommendations}

        for hist in historical:
            if len(recommendations) >= top_n:
                break
            raw_action = hist.get("action", "")
            if not isinstance(raw_action, str) or not raw_action or raw_action == "-":
                continue
            common_keywords = report_keywords & self._extract_keywords(raw_action)
            if not common_keywords and not accept_unmatched:
                continue
            action_text = raw_action[:200]
            if action_text in seen_actions:
                continue
            seen_actions.add(action_text)
            recommendations.append(
                {
                    "action": action_text,
                    "priority": "MEDIUM" if self._is_closed(hist.get("status")) else "LOW",
                    "source": "historical",
                    "rationale": "tindakan historis relevan",
                    # Capped so many shared keywords cannot push confidence above 1.
                    "confidence": round(
                        min(self._MAX_CONFIDENCE, 0.65 + len(common_keywords) * 0.05), 3
                    ),
                    "from_airline": hist.get("airline"),
                }
            )

    def _extract_keywords(self, text: str) -> set:
        """Return the lowercase alphabetic words of *text* longer than two
        characters that are not stop words. Empty or non-string input yields
        an empty set."""
        if not isinstance(text, str) or not text:
            return set()
        words = re.findall(r"[a-zA-Z]+", text.lower())
        return {w for w in words if len(w) > 2 and w not in self._STOP_WORDS}

    def _normalize_action(self, text: str) -> str:
        """Return a grouping key: lowercase, punctuation removed, whitespace collapsed.

        Punctuation is stripped before whitespace is collapsed, so that
        "a - b" and "a b" produce the same key.
        """
        if not isinstance(text, str) or not text:
            return ""
        stripped = re.sub(r"[^\w\s]", "", text.lower())
        return re.sub(r"\s+", " ", stripped).strip()

    def get_quick_actions(self, issue_type: str) -> List[str]:
        """Get up to three quick action suggestions for an issue type.

        Uses the same case-insensitive category matching as ``recommend``;
        returns an empty list when no category matches.
        """
        category = self._match_category(issue_type)
        if category is None:
            return []
        return self.ACTION_TEMPLATES[category][:3]
# Module-level cache for the shared service instance; populated on first use.
_action_service: Optional[ActionRecommendationService] = None


def get_action_service() -> ActionRecommendationService:
    """Return the shared ActionRecommendationService, creating it on the first call."""
    global _action_service
    if _action_service is not None:
        return _action_service
    _action_service = ActionRecommendationService()
    return _action_service