# gapura-ai-api / data/risk_service.py
# Author: Muhammad Ridzki Nugraha
# Upload folder using huggingface_hub
# Commit: 13c3f2c (verified)
"""
Risk Scoring Service for Gapura AI
Calculates risk scores for airlines, routes, branches, and hubs
"""
import os
import logging
import pickle
import re
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
from collections import Counter, defaultdict
import numpy as np
logger = logging.getLogger(__name__)
class RiskScoringService:
    """
    Comprehensive risk scoring system for irregularity reports.

    Each entity (airline, route, branch, hub, category) receives a
    composite 0-100 risk score built from four factors:
    - Issue frequency, normalized against the mean issues-per-entity
    - Severity distribution (Critical/High reports weigh more)
    - Category diversity (average risk weight of distinct issue types)
    - Data quality (missing root cause / action taken raises the score)
    """

    # Per-report weight by severity; averaged into the severity factor.
    SEVERITY_WEIGHTS = {
        "Critical": 10.0,
        "High": 5.0,
        "Medium": 2.0,
        "Low": 1.0,
        "Unknown": 1.5,
    }

    # Relative risk multiplier for each irregularity category.
    ISSUE_TYPE_RISK = {
        "Cargo Problems": 1.5,
        "GSE": 1.3,
        "Operation": 1.2,
        "Pax Handling": 1.0,
        "Baggage Handling": 1.1,
        "Flight Document Handling": 0.9,
        "Procedure Competencies": 1.0,
    }

    # Maps the singular entity_type used by callers to the risk_data bucket.
    _ENTITY_KEYS = {"airline": "airlines", "branch": "branches", "hub": "hubs"}

    def __init__(self):
        # Scores keyed by entity name, one sub-dict per entity dimension;
        # populated by calculate_all_risk_scores() or loaded from disk.
        self.risk_data: Dict[str, Any] = {
            "airlines": {},
            "routes": {},
            "branches": {},
            "hubs": {},
            "categories": {},
            "last_updated": None,
        }
        # Memoized result of get_risk_summary(); invalidated on load/save.
        self._cached_summary: Optional[Dict[str, Any]] = None
        self._compile_regex()
        self._load_risk_data()

    def _compile_regex(self) -> None:
        """Compile severity keyword patterns once (hot path in _extract_severity)."""
        critical_keywords = [
            "darurat", "kritis", "emergency", "kecelakaan", "parah", "serius"
        ]
        high_keywords = ["rusak", "damage", "torn", "broken", "hilang", "lost"]
        medium_keywords = ["delay", "terlambat", "salah", "wrong", "error"]
        self.critical_regex = re.compile("|".join(critical_keywords), re.IGNORECASE)
        self.high_regex = re.compile("|".join(high_keywords), re.IGNORECASE)
        self.medium_regex = re.compile("|".join(medium_keywords), re.IGNORECASE)

    @staticmethod
    def _risk_path() -> str:
        """Return the absolute path of the pickled risk-score file.

        Resolves to <project root>/models/risk_scores.pkl, where the project
        root is one directory above this module's directory.
        """
        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        return os.path.join(base_dir, "models", "risk_scores.pkl")

    def _load_risk_data(self) -> None:
        """Load pre-calculated risk data from disk, if the file exists.

        Best-effort: on any failure the empty defaults from __init__ remain.
        """
        risk_path = self._risk_path()
        if os.path.exists(risk_path):
            try:
                # NOTE(review): pickle.load can execute arbitrary code if the
                # file is tampered with — acceptable only because this file is
                # produced locally by _save_risk_data(), never from user input.
                with open(risk_path, "rb") as f:
                    self.risk_data = pickle.load(f)
                self._cached_summary = None  # summary is stale after a reload
                logger.info("Risk data loaded successfully")
            except Exception as e:
                logger.warning(f"Failed to load risk data: {e}")

    def _save_risk_data(self) -> None:
        """Persist risk data to disk and invalidate the cached summary."""
        risk_path = self._risk_path()
        os.makedirs(os.path.dirname(risk_path), exist_ok=True)
        with open(risk_path, "wb") as f:
            pickle.dump(self.risk_data, f)
        self._cached_summary = None
        logger.info(f"Risk data saved to {risk_path}")

    def calculate_all_risk_scores(self, data: List[Dict]) -> Dict[str, Any]:
        """
        Calculate risk scores for all entities from data.

        Args:
            data: List of report dictionaries.

        Returns:
            Dict with risk scores for airlines, routes, branches, hubs and
            categories, plus a "last_updated" ISO timestamp. Also persists
            the result to disk as a side effect.
        """
        logger.info(f"Calculating risk scores for {len(data)} records...")
        # bucket name -> source field in the raw record.
        field_for = {
            "airlines": "Airlines",
            "routes": "Route",
            "branches": "Branch",
            "hubs": "HUB",
            "categories": "Irregularity_Complain_Category",
        }
        # Singular labels passed through to _calculate_entity_scores.
        type_label = {
            "airlines": "airline",
            "routes": "route",
            "branches": "branch",
            "hubs": "hub",
            "categories": "category",
        }
        buckets: Dict[str, Dict[str, List[Dict[str, Any]]]] = {
            name: defaultdict(list) for name in field_for
        }
        for record in data:
            # Shared per-record summary appended under every dimension.
            record_info = {
                "severity": self._extract_severity(record),
                "category": record.get("Irregularity_Complain_Category", "Unknown"),
                "area": record.get("Area", "Unknown"),
                "has_root_cause": bool(record.get("Root_Caused")),
                "has_action": bool(record.get("Action_Taken")),
            }
            for bucket_name, field in field_for.items():
                buckets[bucket_name][record.get(field, "Unknown")].append(record_info)
        for bucket_name, entity_data in buckets.items():
            self.risk_data[bucket_name] = self._calculate_entity_scores(
                entity_data, type_label[bucket_name]
            )
        self.risk_data["last_updated"] = datetime.now().isoformat()
        self._save_risk_data()
        return self.risk_data

    def _extract_severity(self, record: Dict) -> str:
        """Classify a record's severity from keyword matches in its free text.

        Scans Report + Root_Caused with the pre-compiled patterns; returns
        the first (most severe) match, falling back to "Low".
        """
        # `or ""` guards against keys present with an explicit None value —
        # dict.get's default only covers *missing* keys.
        report = (
            (record.get("Report") or "") + " " + (record.get("Root_Caused") or "")
        )
        if self.critical_regex.search(report):
            return "Critical"
        if self.high_regex.search(report):
            return "High"
        if self.medium_regex.search(report):
            return "Medium"
        return "Low"

    @staticmethod
    def _risk_level(risk_score: float) -> str:
        """Map a 0-100 composite score onto a discrete risk level."""
        if risk_score >= 70:
            return "Critical"
        if risk_score >= 50:
            return "High"
        if risk_score >= 30:
            return "Medium"
        return "Low"

    def _calculate_entity_scores(
        self, entity_data: Dict[str, List], entity_type: str
    ) -> Dict[str, Dict]:
        """Compute composite risk scores for one entity dimension.

        Args:
            entity_data: entity name -> list of record-info dicts
                (see calculate_all_risk_scores).
            entity_type: singular label ("airline", "route", ...); kept for
                interface symmetry, not used in the math.

        Returns:
            entity name -> score breakdown dict.
        """
        scores: Dict[str, Dict] = {}
        total_records = sum(len(v) for v in entity_data.values())
        entity_count = len(entity_data)
        if entity_count == 0:
            return scores
        avg_per_entity = total_records / entity_count
        for entity, records in entity_data.items():
            if not records:
                continue
            # Single pass: severity histogram, distinct categories, and the
            # number of records missing root cause or action taken.
            severity_counts: Counter = Counter()
            categories = set()
            missing_data_count = 0
            for r in records:
                severity_counts[r["severity"]] += 1
                cat = r.get("category")
                if cat:
                    categories.add(cat)
                if not r.get("has_root_cause") or not r.get("has_action"):
                    missing_data_count += 1
            # Frequency relative to the mean entity, capped at 5x.
            freq_score = min(len(records) / max(avg_per_entity, 1), 5.0)
            # Mean severity weight per record (1.0 .. 10.0).
            severity_score = sum(
                count * self.SEVERITY_WEIGHTS.get(sev, 1.0)
                for sev, count in severity_counts.items()
            ) / len(records)
            # Mean category risk weight across distinct categories (0.9 .. 1.5).
            category_risk = sum(
                self.ISSUE_TYPE_RISK.get(cat, 1.0) for cat in categories
            ) / max(len(categories), 1)
            # 1.0 (all records complete) .. 2.0 (every record missing data).
            quality_score = 1.0 + (missing_data_count / len(records))
            # Composite; max contributions: freq 50, severity 50,
            # category 15, quality 10 — then clamped to the 0-100 scale.
            risk_score = (
                freq_score * 10
                + severity_score * 5
                + category_risk * 10
                + quality_score * 5
            )
            risk_score = min(100, max(0, risk_score))
            scores[entity] = {
                "risk_score": round(risk_score, 2),
                "risk_level": self._risk_level(risk_score),
                "total_issues": len(records),
                "severity_distribution": dict(severity_counts),
                "issue_categories": list(categories),
                "category_count": len(categories),
                "data_quality_score": round(quality_score, 2),
                "frequency_score": round(freq_score, 2),
                "severity_score": round(severity_score, 2),
            }
        return scores

    def get_airline_risk(self, airline: str) -> Optional[Dict]:
        """Get risk score for a specific airline, or None if unknown."""
        return self.risk_data.get("airlines", {}).get(airline)

    def get_route_risk(self, route: str) -> Optional[Dict]:
        """Get risk score for a specific route, or None if unknown."""
        return self.risk_data.get("routes", {}).get(route)

    def get_branch_risk(self, branch: str) -> Optional[Dict]:
        """Get risk score for a specific branch, or None if unknown."""
        return self.risk_data.get("branches", {}).get(branch)

    def get_hub_risk(self, hub: str) -> Optional[Dict]:
        """Get risk score for a specific hub, or None if unknown."""
        return self.risk_data.get("hubs", {}).get(hub)

    def _sorted_by_risk(self, bucket: str) -> Dict[str, Dict]:
        """Entities in risk_data[bucket], sorted by descending risk_score."""
        entities = self.risk_data.get(bucket, {})
        return dict(sorted(entities.items(), key=lambda x: -x[1]["risk_score"]))

    def get_all_airline_risks(self) -> Dict[str, Dict]:
        """Get all airline risk scores sorted by risk (highest first)."""
        return self._sorted_by_risk("airlines")

    def get_all_branch_risks(self) -> Dict[str, Dict]:
        """Get all branch risk scores sorted by risk (highest first)."""
        return self._sorted_by_risk("branches")

    def get_all_hub_risks(self) -> Dict[str, Dict]:
        """Get all hub risk scores sorted by risk (highest first)."""
        return self._sorted_by_risk("hubs")

    def get_risk_summary(self) -> Dict[str, Any]:
        """Get overall risk summary (memoized until the next load/save)."""
        if self._cached_summary:
            return self._cached_summary
        airlines = self.risk_data.get("airlines", {})
        branches = self.risk_data.get("branches", {})
        hubs = self.risk_data.get("hubs", {})

        def get_risk_counts(data: Dict[str, Dict]) -> Dict[str, int]:
            # Histogram of entities per risk level.
            counts = {"Critical": 0, "High": 0, "Medium": 0, "Low": 0}
            for v in data.values():
                level = v.get("risk_level", "Low")
                counts[level] = counts.get(level, 0) + 1
            return counts

        def get_entity_details(data: Dict[str, Dict]) -> List[Dict]:
            # Flattened per-entity view, sorted by descending risk.
            details = [
                {
                    "name": name,
                    "risk_score": v.get("risk_score", 0),
                    "risk_level": v.get("risk_level", "Low"),
                    "severity_distribution": v.get("severity_distribution", {}),
                    "issue_categories": v.get("issue_categories", []),
                }
                for name, v in data.items()
            ]
            return sorted(details, key=lambda x: -x["risk_score"])

        def top_names(data: Dict[str, Dict], limit: int = 5) -> List[str]:
            # Names of the highest-risk entities, descending.
            ranked = sorted(data.items(), key=lambda x: -x[1]["risk_score"])
            return [name for name, _ in ranked[:limit]]

        summary = {
            "last_updated": self.risk_data.get("last_updated"),
            "airline_risks": get_risk_counts(airlines),
            "branch_risks": get_risk_counts(branches),
            "hub_risks": get_risk_counts(hubs),
            "top_risky_airlines": top_names(airlines),
            "top_risky_branches": top_names(branches),
            "airline_details": get_entity_details(airlines),
            "branch_details": get_entity_details(branches),
            "hub_details": get_entity_details(hubs),
            "total_airlines": len(airlines),
            "total_branches": len(branches),
            "total_hubs": len(hubs),
        }
        self._cached_summary = summary
        return summary

    def get_risk_recommendations(self, entity_type: str, entity_name: str) -> List[str]:
        """Get recommendations based on an entity's risk score.

        Args:
            entity_type: one of "airline", "branch", "hub".
            entity_name: entity key within that dimension.

        Returns:
            List of recommendation strings; empty when the entity (or the
            entity_type) is unknown.
        """
        bucket = self._ENTITY_KEYS.get(entity_type)
        entity_data = (
            self.risk_data.get(bucket, {}).get(entity_name) if bucket else None
        )
        if not entity_data:
            return []
        recommendations = []
        risk_level = entity_data.get("risk_level", "Low")
        risk_score = entity_data.get("risk_score", 0)
        if risk_level in ["Critical", "High"]:
            recommendations.append(
                f"URGENT: {entity_name} has {risk_level} risk level (score: {risk_score})"
            )
            recommendations.append("Consider conducting immediate review of operations")
        # Category-based recommendations.
        categories = entity_data.get("issue_categories", [])
        if "Cargo Problems" in categories:
            recommendations.append("Review cargo handling procedures and equipment")
        if "GSE" in categories:
            recommendations.append(
                "Inspect Ground Support Equipment and maintenance schedules"
            )
        if "Pax Handling" in categories:
            recommendations.append("Review passenger service training and protocols")
        # Severity-based recommendations.
        severity_dist = entity_data.get("severity_distribution", {})
        if severity_dist.get("Critical", 0) > 0:
            recommendations.append(
                f"Address {severity_dist['Critical']} critical issues immediately"
            )
        # Data quality recommendations (> 1.5 means most records incomplete).
        quality_score = entity_data.get("data_quality_score", 1)
        if quality_score > 1.5:
            recommendations.append(
                "Improve documentation: ensure root cause and action taken are recorded"
            )
        return recommendations
# Module-level singleton; created lazily on first access.
_risk_service: Optional[RiskScoringService] = None


def get_risk_service() -> RiskScoringService:
    """Return the shared RiskScoringService, creating it on first use."""
    global _risk_service
    if _risk_service is not None:
        return _risk_service
    _risk_service = RiskScoringService()
    return _risk_service