# gapura-ai-api / data / branch_analytics_service.py
# Muhammad Ridzki Nugraha
# Upload folder using huggingface_hub
# 13c3f2c verified
"""
Branch Analytics Service for Gapura AI
Comprehensive branch performance dashboard
Separated by: Landside & Airside (Non-Cargo) and CGO (Cargo)
"""
import os
import logging
import pickle
from typing import List, Dict, Any, Optional
from collections import Counter, defaultdict
from datetime import datetime
import numpy as np
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Display metadata for each category type key: a human-readable name, a short
# description, and the sheet label associated with that category.
CATEGORY_TYPES = {
    "landside_airside": dict(
        name="Landside & Airside",
        description="Non-cargo operations (terminal, passenger, baggage handling)",
        sheet="NON CARGO",
    ),
    "cgo": dict(
        name="CGO",
        description="Cargo operations",
        sheet="CGO",
    ),
}
class BranchAnalyticsService:
    """
    Branch performance analytics service.

    Aggregates incident records per branch, separated by category type
    ("landside_airside" for the "NON CARGO" sheet, "cgo" for the "CGO"
    sheet), and caches the computed metrics in a pickle file.

    Attributes:
        branch_data: ``{category_type: {branch_name: metrics_dict}}``.
        last_updated: ISO-format timestamp of the last calculation, or None.
    """

    # Category type keys, in the order they are reported.
    _CATEGORY_KEYS = ("landside_airside", "cgo")

    # Sheet label -> category type key. Anything else falls back to
    # "landside_airside" (see _get_category_type).
    _SHEET_TO_CATEGORY = {"NON CARGO": "landside_airside", "CGO": "cgo"}

    # (severity label, trigger keywords), checked in order; first match wins.
    _SEVERITY_KEYWORDS = (
        (
            "Critical",
            ("darurat", "kritis", "emergency", "kecelakaan", "parah", "serius"),
        ),
        ("High", ("rusak", "damage", "torn", "broken", "hilang", "lost")),
        ("Medium", ("delay", "terlambat", "salah", "wrong", "error")),
    )

    def __init__(self):
        """Create the service and load any previously cached metrics."""
        self.branch_data = {"landside_airside": {}, "cgo": {}}
        self.last_updated = None
        self._load_data()

    @staticmethod
    def _cache_path() -> str:
        """Return the path of the pickle cache: ``<parent dir>/models/branch_analytics.pkl``."""
        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        return os.path.join(base_dir, "models", "branch_analytics.pkl")

    def _load_data(self):
        """Load cached branch data (best effort: failures are logged, not raised)."""
        data_path = self._cache_path()
        if not os.path.exists(data_path):
            return
        try:
            # SECURITY: unpickling executes arbitrary code. This file must only
            # ever be produced by _save_data on a trusted filesystem.
            with open(data_path, "rb") as f:
                cached = pickle.load(f)
            branch_data = cached.get("branch_data")
            # Ignore a malformed cache rather than break every later lookup.
            if isinstance(branch_data, dict):
                self.branch_data = branch_data
            self.last_updated = cached.get("last_updated")
            logger.info("Branch analytics data loaded")
        except Exception as e:
            logger.warning("Failed to load branch analytics: %s", e)

    def _save_data(self):
        """
        Save branch data to the pickle cache.

        Writes to a temporary file first and then renames it over the cache,
        so an interrupted write cannot leave a truncated cache behind.

        Raises:
            OSError / pickle.PicklingError: if the cache cannot be written.
        """
        data_path = self._cache_path()
        os.makedirs(os.path.dirname(data_path), exist_ok=True)
        payload = {
            "branch_data": self.branch_data,
            "last_updated": self.last_updated,
        }
        tmp_path = f"{data_path}.tmp"
        try:
            with open(tmp_path, "wb") as f:
                pickle.dump(payload, f)
            os.replace(tmp_path, data_path)
        except Exception:
            # Do not leave a partial temp file around; then surface the error.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
            raise
        logger.info("Branch analytics saved to %s", data_path)

    def _get_category_type(self, record: Dict) -> str:
        """
        Determine the category type of a record from its sheet label.

        Reads ``_sheet_name`` and falls back to ``_source_sheet`` when the
        former is missing or empty. Unrecognised labels map to
        "landside_airside".
        """
        sheet = record.get("_sheet_name") or record.get("_source_sheet") or ""
        try:
            return self._SHEET_TO_CATEGORY.get(sheet, "landside_airside")
        except TypeError:
            # Unhashable sheet value: treat like any other unrecognised label.
            return "landside_airside"

    @staticmethod
    def _month_key(date_value: Any) -> Optional[str]:
        """
        Convert a date-like value to a ``"YYYY-MM"`` key.

        Returns:
            The month key, or None when the value is empty or unparseable.

        Raises:
            ImportError: if pandas is not installed.
        """
        if not date_value:
            return None
        import pandas as pd  # local import: the module does not import pandas at top level

        try:
            dt = pd.to_datetime(date_value, errors="coerce")
            if pd.isna(dt):
                return None
            return f"{dt.year}-{dt.month:02d}"
        except (ValueError, TypeError, OverflowError, AttributeError):
            # Non-scalar or otherwise unusable date values are simply skipped.
            return None

    def _summarize_branch(
        self, records_list: List[Dict[str, Any]], months: Dict[str, int]
    ) -> Dict[str, Any]:
        """
        Build the metrics dict for one branch.

        Args:
            records_list: Non-empty list of per-record info dicts as built by
                calculate_branch_metrics.
            months: ``{"YYYY-MM": issue_count}`` for the branch (may be empty).

        Returns:
            Metrics dict stored under ``branch_data[category_type][branch]``.
        """
        total = len(records_list)
        severity_dist = Counter(r["severity"] for r in records_list)
        category_dist = Counter(r["category"] for r in records_list)
        airline_dist = Counter(r["airline"] for r in records_list)
        status_dist = Counter(r["status"] for r in records_list)
        critical_high = severity_dist.get("Critical", 0) + severity_dist.get("High", 0)
        with_root_cause = sum(1 for r in records_list if r["has_root_cause"])
        with_action = sum(1 for r in records_list if r["has_action"])

        if months:
            # "YYYY-MM" keys sort chronologically; the trend must compare the
            # earliest months with the latest, not the order records arrived in.
            month_values = [months[key] for key in sorted(months)]
            avg_monthly = float(np.mean(month_values))
            trend = self._calculate_trend(month_values)
        else:
            avg_monthly = total
            trend = "stable"

        risk_score = self._calculate_branch_risk(
            total, critical_high, with_root_cause, with_action, trend
        )
        return {
            "total_issues": total,
            "severity_distribution": dict(severity_dist),
            "top_categories": dict(category_dist.most_common(5)),
            "top_airlines": dict(airline_dist.most_common(5)),
            "status_distribution": dict(status_dist),
            "critical_high_count": critical_high,
            "critical_high_percentage": round(critical_high / total * 100, 1),
            "resolution_rate": round(status_dist.get("Closed", 0) / total * 100, 1),
            # Each record can contribute two documented fields
            # (root cause and action), hence the ``total * 2`` denominator.
            "documentation_rate": round(
                (with_root_cause + with_action) / (total * 2) * 100, 1
            ),
            "avg_monthly_issues": round(avg_monthly, 1),
            "trend": trend,
            "risk_score": risk_score,
            "risk_level": self._get_risk_level(risk_score),
            # Sorted so the output is deterministic across runs.
            "hubs": sorted({r["hub"] for r in records_list if r["hub"]}, key=str),
            "areas": sorted({r["area"] for r in records_list if r["area"]}, key=str),
        }

    def calculate_branch_metrics(
        self, records: List[Dict], category_type: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Calculate comprehensive metrics for branches and persist them.

        Existing metrics for the selected category type(s) are discarded and
        rebuilt from ``records``. Records without a branch (or with branch
        "Unknown") are skipped.

        Args:
            records: List of all records
            category_type: "landside_airside", "cgo", or None for both

        Returns:
            Dict with "status", "last_updated" and the number of branches
            computed per category type.
        """
        logger.info(f"Calculating branch metrics for {len(records)} records...")
        category_types = (
            [category_type] if category_type else list(self._CATEGORY_KEYS)
        )
        for cat_type in category_types:
            self.branch_data[cat_type] = {}

        # Monthly trends need pandas for date parsing; degrade gracefully
        # (and say so once) instead of failing silently per record.
        try:
            import pandas  # noqa: F401

            can_parse_dates = True
        except ImportError:
            can_parse_dates = False
            logger.warning(
                "pandas is not available; monthly averages and trends are skipped"
            )

        branch_aggregates = {ct: defaultdict(list) for ct in category_types}
        monthly_by_branch = {ct: defaultdict(Counter) for ct in category_types}

        for record in records:
            ct = self._get_category_type(record)
            if ct not in category_types:
                continue
            branch = record.get("Branch", "Unknown")
            if not branch or branch == "Unknown":
                continue
            date_str = record.get("Date_of_Event", "")
            branch_aggregates[ct][branch].append(
                {
                    "severity": self._extract_severity(record),
                    "category": record.get("Irregularity_Complain_Category", "Unknown"),
                    "airline": record.get("Airlines", "Unknown"),
                    "hub": record.get("HUB", "Unknown"),
                    "area": record.get("Area", "Unknown"),
                    "status": record.get("Status", "Unknown"),
                    "has_root_cause": bool(record.get("Root_Caused")),
                    "has_action": bool(record.get("Action_Taken")),
                    "date": date_str,
                }
            )
            if can_parse_dates:
                month_key = self._month_key(date_str)
                if month_key:
                    monthly_by_branch[ct][branch][month_key] += 1

        for ct in category_types:
            for branch, records_list in branch_aggregates[ct].items():
                if not records_list:
                    continue
                self.branch_data[ct][branch] = self._summarize_branch(
                    records_list, monthly_by_branch[ct][branch]
                )

        self.last_updated = datetime.now().isoformat()
        self._save_data()
        return {
            "status": "success",
            "last_updated": self.last_updated,
            "category_types": {
                ct: {"branches": len(self.branch_data[ct])} for ct in category_types
            },
        }

    def _extract_severity(self, record: Dict) -> str:
        """
        Extract severity from the record's "Report" and "Root_Caused" text.

        The combined, lower-cased text is searched for keyword substrings;
        the first matching level among Critical, High, Medium wins, otherwise
        "Low". Missing, None or non-string values are tolerated.
        """
        parts = (record.get("Report"), record.get("Root_Caused"))
        report = " ".join("" if p is None else str(p) for p in parts).lower()
        for level, keywords in self._SEVERITY_KEYWORDS:
            if any(kw in report for kw in keywords):
                return level
        return "Low"

    def _calculate_trend(self, values: List[int]) -> str:
        """
        Calculate trend direction from time series values (oldest first).

        Compares the mean of the last three values with the mean of the first
        three; a change beyond +/-20% is "increasing"/"decreasing". With fewer
        than three values the result is "stable". Note that for 3-5 values the
        two windows overlap (and are identical for exactly three values).
        """
        if len(values) < 3:
            return "stable"
        recent = np.mean(values[-3:])
        earlier = np.mean(values[:3])
        if earlier == 0:
            return "stable"
        change_pct = (recent - earlier) / earlier * 100
        if change_pct > 20:
            return "increasing"
        if change_pct < -20:
            return "decreasing"
        return "stable"

    def _calculate_branch_risk(
        self,
        total: int,
        critical_high: int,
        with_root_cause: int,
        with_action: int,
        trend: str,
    ) -> float:
        """
        Calculate the risk score (0-100) for a branch.

        Sum of four components, clamped to [0, 100]:
        volume (up to 40, saturating at 100 issues), share of Critical/High
        issues (up to 30), missing documentation (up to 25), and trend
        (increasing 25, stable or unknown 10, decreasing 0).
        """
        volume_score = min(total / 50, 2.0) * 20
        severity_score = (critical_high / max(total, 1)) * 30
        doc_rate = (with_root_cause + with_action) / max(total * 2, 1)
        doc_score = (1 - doc_rate) * 25
        trend_score = {"increasing": 25, "stable": 10, "decreasing": 0}.get(trend, 10)
        risk_score = volume_score + severity_score + doc_score + trend_score
        return round(min(100, max(0, risk_score)), 2)

    def _get_risk_level(self, score: float) -> str:
        """Convert risk score to level: >=70 Critical, >=50 High, >=30 Medium, else Low."""
        if score >= 70:
            return "Critical"
        if score >= 50:
            return "High"
        if score >= 30:
            return "Medium"
        return "Low"

    def get_branch(
        self, branch: str, category_type: Optional[str] = None
    ) -> Optional[Dict[str, Any]]:
        """
        Get metrics for a specific branch.

        The returned dicts are copies, so callers may modify them without
        altering the cached metrics.

        Args:
            branch: Branch name
            category_type: "landside_airside", "cgo", or None for combined

        Returns:
            Branch metrics or None if not found. Without ``category_type``:
            if the branch exists in only one category, that category's metrics
            plus a "category_type" key; if it exists in both, a combined dict
            with both metric sets, the summed issue count and the mean risk
            score.
        """
        if category_type:
            data = self.branch_data.get(category_type, {}).get(branch)
            return None if data is None else dict(data)

        ls_data = self.branch_data.get("landside_airside", {}).get(branch, {})
        cgo_data = self.branch_data.get("cgo", {}).get(branch, {})
        if not ls_data and not cgo_data:
            return None
        if not cgo_data:
            # Copy before tagging: the cached dict must not gain extra keys.
            return {**ls_data, "category_type": "landside_airside"}
        if not ls_data:
            return {**cgo_data, "category_type": "cgo"}
        return {
            "branch": branch,
            "total_issues": ls_data.get("total_issues", 0)
            + cgo_data.get("total_issues", 0),
            "landside_airside": dict(ls_data),
            "cgo": dict(cgo_data),
            "combined_risk_score": round(
                (ls_data.get("risk_score", 0) + cgo_data.get("risk_score", 0)) / 2, 2
            ),
        }

    def get_ranking(
        self,
        category_type: Optional[str] = None,
        sort_by: str = "risk_score",
        limit: int = 20,
    ) -> List[Dict[str, Any]]:
        """
        Get branch ranking.

        Args:
            category_type: "landside_airside", "cgo", or None for both
            sort_by: Field to sort by (risk_score, total_issues,
                critical_high_count). These three sort descending; any other
                field sorts ascending. Missing values count as 0.
            limit: Maximum branches to return

        Returns:
            List of branch metrics (each with "branch" and "category_type"
            added) sorted by the specified field.
        """
        selected = [category_type] if category_type else list(self._CATEGORY_KEYS)
        ranking = [
            {"branch": name, "category_type": ct, **data}
            for ct in selected
            for name, data in self.branch_data.get(ct, {}).items()
        ]
        descending = sort_by in ("risk_score", "total_issues", "critical_high_count")
        ranking.sort(key=lambda entry: entry.get(sort_by, 0), reverse=descending)
        return ranking[:limit]

    def get_comparison(self) -> Dict[str, Any]:
        """
        Compare all branches across category types.

        Returns:
            Dict with the counts of branches present in one or both category
            types and a per-branch list sorted by combined issue count
            (descending, ties broken by branch name). A category entry is None
            when the branch has no metrics for it.
        """
        ls_branches = self.branch_data.get("landside_airside", {})
        cgo_branches = self.branch_data.get("cgo", {})
        ls_names = set(ls_branches)
        cgo_names = set(cgo_branches)
        all_branch_names = ls_names | cgo_names

        def brief(metrics: Dict[str, Any]) -> Optional[Dict[str, Any]]:
            # Condensed view of one category's metrics; None when absent/empty.
            if not metrics:
                return None
            return {
                "total_issues": metrics.get("total_issues", 0),
                "risk_score": metrics.get("risk_score", 0),
                "trend": metrics.get("trend", "N/A"),
            }

        comparison = []
        for branch in all_branch_names:
            ls = ls_branches.get(branch, {})
            cgo = cgo_branches.get(branch, {})
            comparison.append(
                {
                    "branch": branch,
                    "landside_airside": brief(ls),
                    "cgo": brief(cgo),
                    "total_combined": ls.get("total_issues", 0)
                    + cgo.get("total_issues", 0),
                }
            )
        # Secondary key keeps tied branches in a deterministic order.
        comparison.sort(key=lambda row: (-row["total_combined"], str(row["branch"])))

        return {
            "last_updated": self.last_updated,
            "total_branches": len(all_branch_names),
            "landside_airside_only": len(ls_names - cgo_names),
            "cgo_only": len(cgo_names - ls_names),
            "both": len(ls_names & cgo_names),
            "branches": comparison,
        }

    def get_summary(self, category_type: Optional[str] = None) -> Dict[str, Any]:
        """
        Get overall summary of branch analytics.

        Args:
            category_type: Optional filter by category type

        Returns:
            For one category type: branch count, total issues, average risk
            score and the distributions of risk level and trend. Without a
            filter: both per-category summaries plus a side-by-side comparison.
        """
        if category_type:
            branches = self.branch_data.get(category_type, {})
            metrics = list(branches.values())
            avg_risk = (
                float(np.mean([b.get("risk_score", 0) for b in metrics]))
                if metrics
                else 0
            )
            return {
                "category_type": category_type,
                "total_branches": len(branches),
                "total_issues": sum(b.get("total_issues", 0) for b in metrics),
                "avg_risk_score": round(avg_risk, 2),
                "risk_level_distribution": dict(
                    Counter(b.get("risk_level", "Unknown") for b in metrics)
                ),
                "trend_distribution": dict(
                    Counter(b.get("trend", "Unknown") for b in metrics)
                ),
                "last_updated": self.last_updated,
            }

        ls_summary = self.get_summary("landside_airside")
        cgo_summary = self.get_summary("cgo")
        return {
            "landside_airside": ls_summary,
            "cgo": cgo_summary,
            "comparison": {
                "ls_total_issues": ls_summary.get("total_issues", 0),
                "cgo_total_issues": cgo_summary.get("total_issues", 0),
                "ls_avg_risk": ls_summary.get("avg_risk_score", 0),
                "cgo_avg_risk": cgo_summary.get("avg_risk_score", 0),
            },
            "last_updated": self.last_updated,
        }
# Lazily created, process-wide service instance (None until first requested).
_branch_analytics_service: Optional[BranchAnalyticsService] = None


def get_branch_analytics_service() -> BranchAnalyticsService:
    """Return the shared BranchAnalyticsService, creating it on first use."""
    global _branch_analytics_service
    service = _branch_analytics_service
    if service is not None:
        return service
    service = BranchAnalyticsService()
    _branch_analytics_service = service
    return service