| """ | |
| Risk Scoring Service for Gapura AI | |
| Calculates risk scores for airlines, routes, branches, and hubs | |
| """ | |
| import os | |
| import logging | |
| import pickle | |
| import re | |
| from typing import List, Dict, Any, Optional | |
| from datetime import datetime, timedelta | |
| from collections import Counter, defaultdict | |
| import numpy as np | |
| logger = logging.getLogger(__name__) | |
class RiskScoringService:
    """
    Comprehensive risk scoring system for irregularity reports

    Risk Factors:
    - Issue frequency (normalized)
    - Severity distribution (critical/high weight)
    - Resolution time (longer = higher risk)
    - Trend direction (increasing = higher risk)
    - Category diversity (more types = higher risk)
    """

    # Per-report multiplier applied when aggregating severities into a score.
    # "Unknown" sits between Low and Medium: unclassified reports carry some
    # uncertainty but should not dominate.
    SEVERITY_WEIGHTS = {
        "Critical": 10.0,
        "High": 5.0,
        "Medium": 2.0,
        "Low": 1.0,
        "Unknown": 1.5,
    }

    # Relative risk multiplier per irregularity category; values above 1.0
    # mark categories treated as operationally riskier. Categories not listed
    # here default to 1.0 at lookup time.
    ISSUE_TYPE_RISK = {
        "Cargo Problems": 1.5,
        "GSE": 1.3,
        "Operation": 1.2,
        "Pax Handling": 1.0,
        "Baggage Handling": 1.1,
        "Flight Document Handling": 0.9,
        "Procedure Competencies": 1.0,
    }
| def __init__(self): | |
| self.risk_data = { | |
| "airlines": {}, | |
| "routes": {}, | |
| "branches": {}, | |
| "hubs": {}, | |
| "categories": {}, | |
| "last_updated": None, | |
| } | |
| self._cached_summary = None | |
| self._compile_regex() | |
| self._load_risk_data() | |
| def _compile_regex(self): | |
| """Compile regex for severity matching""" | |
| critical_keywords = [ | |
| "darurat", "kritis", "emergency", "kecelakaan", "parah", "serius" | |
| ] | |
| high_keywords = ["rusak", "damage", "torn", "broken", "hilang", "lost"] | |
| medium_keywords = ["delay", "terlambat", "salah", "wrong", "error"] | |
| self.critical_regex = re.compile("|".join(critical_keywords), re.IGNORECASE) | |
| self.high_regex = re.compile("|".join(high_keywords), re.IGNORECASE) | |
| self.medium_regex = re.compile("|".join(medium_keywords), re.IGNORECASE) | |
| def _load_risk_data(self): | |
| """Load pre-calculated risk data""" | |
| base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) | |
| risk_path = os.path.join(base_dir, "models", "risk_scores.pkl") | |
| if os.path.exists(risk_path): | |
| try: | |
| with open(risk_path, "rb") as f: | |
| self.risk_data = pickle.load(f) | |
| self._cached_summary = None | |
| logger.info("Risk data loaded successfully") | |
| except Exception as e: | |
| logger.warning(f"Failed to load risk data: {e}") | |
| def _save_risk_data(self): | |
| """Save risk data to disk""" | |
| base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) | |
| risk_path = os.path.join(base_dir, "models", "risk_scores.pkl") | |
| os.makedirs(os.path.dirname(risk_path), exist_ok=True) | |
| with open(risk_path, "wb") as f: | |
| pickle.dump(self.risk_data, f) | |
| self._cached_summary = None | |
| logger.info(f"Risk data saved to {risk_path}") | |
| def calculate_all_risk_scores(self, data: List[Dict]) -> Dict[str, Any]: | |
| """ | |
| Calculate risk scores for all entities from data | |
| Args: | |
| data: List of report dictionaries | |
| Returns: | |
| Dict with risk scores for airlines, routes, branches, hubs | |
| """ | |
| logger.info(f"Calculating risk scores for {len(data)} records...") | |
| # Aggregate data by entity | |
| airline_data = defaultdict(list) | |
| route_data = defaultdict(list) | |
| branch_data = defaultdict(list) | |
| hub_data = defaultdict(list) | |
| category_data = defaultdict(list) | |
| for record in data: | |
| airline = record.get("Airlines", "Unknown") | |
| route = record.get("Route", "Unknown") | |
| branch = record.get("Branch", "Unknown") | |
| hub = record.get("HUB", "Unknown") | |
| category = record.get("Irregularity_Complain_Category", "Unknown") | |
| record_info = { | |
| "severity": self._extract_severity(record), | |
| "category": category, | |
| "area": record.get("Area", "Unknown"), | |
| "has_root_cause": bool(record.get("Root_Caused")), | |
| "has_action": bool(record.get("Action_Taken")), | |
| } | |
| airline_data[airline].append(record_info) | |
| route_data[route].append(record_info) | |
| branch_data[branch].append(record_info) | |
| hub_data[hub].append(record_info) | |
| category_data[category].append(record_info) | |
| # Calculate scores | |
| self.risk_data["airlines"] = self._calculate_entity_scores( | |
| airline_data, "airline" | |
| ) | |
| self.risk_data["routes"] = self._calculate_entity_scores(route_data, "route") | |
| self.risk_data["branches"] = self._calculate_entity_scores( | |
| branch_data, "branch" | |
| ) | |
| self.risk_data["hubs"] = self._calculate_entity_scores(hub_data, "hub") | |
| self.risk_data["categories"] = self._calculate_entity_scores( | |
| category_data, "category" | |
| ) | |
| self.risk_data["last_updated"] = datetime.now().isoformat() | |
| self._save_risk_data() | |
| return self.risk_data | |
| def _extract_severity(self, record: Dict) -> str: | |
| """Extract severity from record using optimized regex""" | |
| report = ( | |
| record.get("Report", "") + " " + record.get("Root_Caused", "") | |
| ) | |
| if self.critical_regex.search(report): | |
| return "Critical" | |
| elif self.high_regex.search(report): | |
| return "High" | |
| elif self.medium_regex.search(report): | |
| return "Medium" | |
| return "Low" | |
| def _calculate_entity_scores( | |
| self, entity_data: Dict[str, List], entity_type: str | |
| ) -> Dict[str, Dict]: | |
| """Calculate risk scores for entities""" | |
| scores = {} | |
| total_records = sum(len(v) for v in entity_data.values()) | |
| entity_count = len(entity_data) | |
| if entity_count == 0: | |
| return scores | |
| avg_per_entity = total_records / entity_count | |
| for entity, records in entity_data.items(): | |
| if len(records) == 0: | |
| continue | |
| # Optimized single pass aggregation | |
| severity_counts = Counter() | |
| categories = set() | |
| missing_data_count = 0 | |
| for r in records: | |
| severity_counts[r["severity"]] += 1 | |
| cat = r.get("category") | |
| if cat: | |
| categories.add(cat) | |
| if not r.get("has_root_cause") or not r.get("has_action"): | |
| missing_data_count += 1 | |
| # Frequency score (normalized) | |
| freq_score = min(len(records) / max(avg_per_entity, 1), 5.0) | |
| # Severity score | |
| severity_score = sum( | |
| count * self.SEVERITY_WEIGHTS.get(sev, 1.0) | |
| for sev, count in severity_counts.items() | |
| ) / len(records) | |
| # Category diversity score | |
| category_risk = sum( | |
| self.ISSUE_TYPE_RISK.get(cat, 1.0) for cat in categories | |
| ) / max(len(categories), 1) | |
| # Data quality score (lower is better for missing data) | |
| quality_score = 1.0 + (missing_data_count / len(records)) | |
| # Composite risk score (0-100 scale) | |
| risk_score = ( | |
| freq_score * 10 # Max 50 | |
| + severity_score * 5 # Max ~25 | |
| + category_risk * 10 # Max ~15 | |
| + quality_score * 5 # Max ~10 | |
| ) | |
| risk_score = min(100, max(0, risk_score)) | |
| # Determine risk level | |
| if risk_score >= 70: | |
| risk_level = "Critical" | |
| elif risk_score >= 50: | |
| risk_level = "High" | |
| elif risk_score >= 30: | |
| risk_level = "Medium" | |
| else: | |
| risk_level = "Low" | |
| scores[entity] = { | |
| "risk_score": round(risk_score, 2), | |
| "risk_level": risk_level, | |
| "total_issues": len(records), | |
| "severity_distribution": dict(severity_counts), | |
| "issue_categories": list(categories), | |
| "category_count": len(categories), | |
| "data_quality_score": round(quality_score, 2), | |
| "frequency_score": round(freq_score, 2), | |
| "severity_score": round(severity_score, 2), | |
| } | |
| return scores | |
| def get_airline_risk(self, airline: str) -> Optional[Dict]: | |
| """Get risk score for a specific airline""" | |
| return self.risk_data["airlines"].get(airline) | |
| def get_route_risk(self, route: str) -> Optional[Dict]: | |
| """Get risk score for a specific route""" | |
| return self.risk_data["routes"].get(route) | |
| def get_branch_risk(self, branch: str) -> Optional[Dict]: | |
| """Get risk score for a specific branch""" | |
| return self.risk_data["branches"].get(branch) | |
| def get_hub_risk(self, hub: str) -> Optional[Dict]: | |
| """Get risk score for a specific hub""" | |
| return self.risk_data["hubs"].get(hub) | |
| def get_all_airline_risks(self) -> Dict[str, Dict]: | |
| """Get all airline risk scores sorted by risk""" | |
| airlines = self.risk_data.get("airlines", {}) | |
| return dict(sorted(airlines.items(), key=lambda x: -x[1]["risk_score"])) | |
| def get_all_branch_risks(self) -> Dict[str, Dict]: | |
| """Get all branch risk scores sorted by risk""" | |
| branches = self.risk_data.get("branches", {}) | |
| return dict(sorted(branches.items(), key=lambda x: -x[1]["risk_score"])) | |
| def get_all_hub_risks(self) -> Dict[str, Dict]: | |
| """Get all hub risk scores sorted by risk""" | |
| hubs = self.risk_data.get("hubs", {}) | |
| return dict(sorted(hubs.items(), key=lambda x: -x[1]["risk_score"])) | |
| def get_risk_summary(self) -> Dict[str, Any]: | |
| """Get overall risk summary""" | |
| if self._cached_summary: | |
| return self._cached_summary | |
| airlines = self.risk_data.get("airlines", {}) | |
| branches = self.risk_data.get("branches", {}) | |
| hubs = self.risk_data.get("hubs", {}) | |
| def get_risk_counts(data): | |
| counts = {"Critical": 0, "High": 0, "Medium": 0, "Low": 0} | |
| for v in data.values(): | |
| level = v.get("risk_level", "Low") | |
| counts[level] = counts.get(level, 0) + 1 | |
| return counts | |
| def get_entity_details(data): | |
| details = [] | |
| for name, v in data.items(): | |
| details.append({ | |
| "name": name, | |
| "risk_score": v.get("risk_score", 0), | |
| "risk_level": v.get("risk_level", "Low"), | |
| "severity_distribution": v.get("severity_distribution", {}), | |
| "issue_categories": v.get("issue_categories", []) | |
| }) | |
| return sorted(details, key=lambda x: -x["risk_score"]) | |
| summary = { | |
| "last_updated": self.risk_data.get("last_updated"), | |
| "airline_risks": get_risk_counts(airlines), | |
| "branch_risks": get_risk_counts(branches), | |
| "hub_risks": get_risk_counts(hubs), | |
| "top_risky_airlines": list( | |
| dict( | |
| sorted(airlines.items(), key=lambda x: -x[1]["risk_score"])[:5] | |
| ).keys() | |
| ), | |
| "top_risky_branches": list( | |
| dict( | |
| sorted(branches.items(), key=lambda x: -x[1]["risk_score"])[:5] | |
| ).keys() | |
| ), | |
| "airline_details": get_entity_details(airlines), | |
| "branch_details": get_entity_details(branches), | |
| "hub_details": get_entity_details(hubs), | |
| "total_airlines": len(airlines), | |
| "total_branches": len(branches), | |
| "total_hubs": len(hubs), | |
| } | |
| self._cached_summary = summary | |
| return summary | |
| def get_risk_recommendations(self, entity_type: str, entity_name: str) -> List[str]: | |
| """Get recommendations based on risk score""" | |
| entity_data = None | |
| if entity_type == "airline": | |
| entity_data = self.risk_data["airlines"].get(entity_name) | |
| elif entity_type == "branch": | |
| entity_data = self.risk_data["branches"].get(entity_name) | |
| elif entity_type == "hub": | |
| entity_data = self.risk_data["hubs"].get(entity_name) | |
| if not entity_data: | |
| return [] | |
| recommendations = [] | |
| risk_level = entity_data.get("risk_level", "Low") | |
| risk_score = entity_data.get("risk_score", 0) | |
| if risk_level in ["Critical", "High"]: | |
| recommendations.append( | |
| f"URGENT: {entity_name} has {risk_level} risk level (score: {risk_score})" | |
| ) | |
| recommendations.append("Consider conducting immediate review of operations") | |
| # Category-based recommendations | |
| categories = entity_data.get("issue_categories", []) | |
| if "Cargo Problems" in categories: | |
| recommendations.append("Review cargo handling procedures and equipment") | |
| if "GSE" in categories: | |
| recommendations.append( | |
| "Inspect Ground Support Equipment and maintenance schedules" | |
| ) | |
| if "Pax Handling" in categories: | |
| recommendations.append("Review passenger service training and protocols") | |
| # Severity-based recommendations | |
| severity_dist = entity_data.get("severity_distribution", {}) | |
| if severity_dist.get("Critical", 0) > 0: | |
| recommendations.append( | |
| f"Address {severity_dist['Critical']} critical issues immediately" | |
| ) | |
| # Data quality recommendations | |
| quality_score = entity_data.get("data_quality_score", 1) | |
| if quality_score > 1.5: | |
| recommendations.append( | |
| "Improve documentation: ensure root cause and action taken are recorded" | |
| ) | |
| return recommendations | |
# Lazily constructed module-level singleton.
_risk_service: Optional[RiskScoringService] = None


def get_risk_service() -> RiskScoringService:
    """Return the process-wide RiskScoringService, creating it on first use."""
    global _risk_service
    if _risk_service is not None:
        return _risk_service
    _risk_service = RiskScoringService()
    return _risk_service