"""
Branch Analytics Service for Gapura AI
Comprehensive branch performance dashboard
Separated by: Landside & Airside (Non-Cargo) and CGO (Cargo)
"""

import os
import logging
import pickle
from typing import List, Dict, Any, Optional
from collections import Counter, defaultdict
from datetime import datetime

import numpy as np

logger = logging.getLogger(__name__)

# Category types keyed by the identifier used in ``branch_data``; ``sheet`` is
# the sheet-name value that records of that category carry.
CATEGORY_TYPES = {
    "landside_airside": {
        "name": "Landside & Airside",
        "description": "Non-cargo operations (terminal, passenger, baggage handling)",
        "sheet": "NON CARGO",
    },
    "cgo": {"name": "CGO", "description": "Cargo operations", "sheet": "CGO"},
}


class BranchAnalyticsService:
    """
    Branch performance analytics service

    Provides comprehensive metrics for each branch with category type separation.
    Computed metrics live in ``self.branch_data`` as
    ``{category_type: {branch_name: metrics_dict}}`` and are persisted to a
    pickle cache so they survive process restarts.
    """

    def __init__(self):
        """Initialise empty per-category storage, then load the cache if present."""
        self.branch_data = {"landside_airside": {}, "cgo": {}}
        self.last_updated = None
        self._load_data()

    @staticmethod
    def _cache_path() -> str:
        """Return the cache file path: ``<parent of this file's dir>/models/branch_analytics.pkl``."""
        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        return os.path.join(base_dir, "models", "branch_analytics.pkl")

    def _load_data(self):
        """
        Load cached branch data

        Best-effort: a missing cache file is ignored, and any failure while
        reading it is logged as a warning and leaves the current state as is.
        """
        data_path = self._cache_path()
        if not os.path.exists(data_path):
            return

        try:
            with open(data_path, "rb") as f:
                # NOTE(security): pickle.load can execute arbitrary code from
                # the file. This is only safe while the cache file is written
                # exclusively by this service and is not user-controllable.
                cached = pickle.load(f)
            self.branch_data = cached.get("branch_data", self.branch_data)
            self.last_updated = cached.get("last_updated")
            logger.info("Branch analytics data loaded")
        except Exception as e:
            # Deliberately broad: a corrupt or incompatible cache must never
            # prevent the service from starting.
            logger.warning("Failed to load branch analytics: %s", e)

    def _save_data(self):
        """
        Save branch data to cache

        Creates the target directory when needed. I/O and pickling errors
        propagate to the caller.
        """
        data_path = self._cache_path()
        os.makedirs(os.path.dirname(data_path), exist_ok=True)

        with open(data_path, "wb") as f:
            pickle.dump(
                {
                    "branch_data": self.branch_data,
                    "last_updated": self.last_updated,
                },
                f,
            )
        logger.info("Branch analytics saved to %s", data_path)

    def _get_category_type(self, record: Dict) -> str:
        """
        Determine category type from record

        Reads ``_sheet_name`` (falling back to ``_source_sheet``). Only the CGO
        sheet maps to ``"cgo"``; every other value, including a missing sheet
        name, maps to ``"landside_airside"``.
        """
        sheet = record.get("_sheet_name", record.get("_source_sheet", ""))
        if sheet == CATEGORY_TYPES["cgo"]["sheet"]:
            return "cgo"
        return "landside_airside"

    def calculate_branch_metrics(
        self, records: List[Dict], category_type: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Calculate comprehensive metrics for branches

        Existing metrics of the selected category type(s) are discarded and
        rebuilt from ``records``; the result is stored in ``self.branch_data``
        and written to the cache file.

        Args:
            records: List of all records
            category_type: "landside_airside", "cgo", or None for both

        Returns:
            Dict with ``status``, ``last_updated`` and, per processed category
            type, the number of branches found
        """
        logger.info(f"Calculating branch metrics for {len(records)} records...")

        category_types = [category_type] if category_type else list(CATEGORY_TYPES)

        for cat_type in category_types:
            self.branch_data[cat_type] = {}

        branch_aggregates = {ct: defaultdict(list) for ct in category_types}
        monthly_by_branch = {
            ct: defaultdict(lambda: defaultdict(int)) for ct in category_types
        }

        # pandas is only needed for date parsing; import it once per call
        # rather than once per record, and degrade gracefully without it.
        try:
            import pandas as pd
        except ImportError:
            pd = None
            logger.warning(
                "pandas is not available; monthly issue counts will not be computed"
            )

        for record in records:
            ct = self._get_category_type(record)
            if ct not in category_types:
                continue

            branch = record.get("Branch", "Unknown")
            if not branch or branch == "Unknown":
                continue

            date_str = record.get("Date_of_Event", "")

            branch_aggregates[ct][branch].append(
                {
                    "severity": self._extract_severity(record),
                    "category": record.get("Irregularity_Complain_Category", "Unknown"),
                    "airline": record.get("Airlines", "Unknown"),
                    "hub": record.get("HUB", "Unknown"),
                    "area": record.get("Area", "Unknown"),
                    "status": record.get("Status", "Unknown"),
                    "has_root_cause": bool(record.get("Root_Caused")),
                    "has_action": bool(record.get("Action_Taken")),
                    "date": date_str,
                }
            )

            if pd is not None and date_str:
                month_key = self._to_month_key(pd, date_str)
                if month_key is not None:
                    monthly_by_branch[ct][branch][month_key] += 1

        for ct in category_types:
            for branch, records_list in branch_aggregates[ct].items():
                if not records_list:
                    continue
                self.branch_data[ct][branch] = self._summarize_branch(
                    records_list, monthly_by_branch[ct][branch]
                )

        self.last_updated = datetime.now().isoformat()
        self._save_data()

        return {
            "status": "success",
            "last_updated": self.last_updated,
            "category_types": {
                ct: {"branches": len(self.branch_data[ct])} for ct in category_types
            },
        }

    @staticmethod
    def _to_month_key(pd, date_value) -> Optional[str]:
        """Return the ``"YYYY-MM"`` bucket for a date value, or None if it cannot be parsed.

        ``pd`` is the already-imported pandas module.
        """
        try:
            dt = pd.to_datetime(date_value, errors="coerce")
            if pd.isna(dt):
                return None
            return f"{dt.year}-{dt.month:02d}"
        except Exception as exc:
            # Best-effort on dirty data: an unparseable date only excludes the
            # record from monthly counts, it must not abort the calculation.
            logger.debug("Could not derive month from %r: %s", date_value, exc)
            return None

    def _summarize_branch(
        self, records_list: List[Dict[str, Any]], months: Dict[str, int]
    ) -> Dict[str, Any]:
        """Build the metrics dict for one branch from its record summaries and monthly counts.

        ``records_list`` must be non-empty; ``months`` maps ``"YYYY-MM"`` to the
        number of issues in that month and may be empty.
        """
        total = len(records_list)

        severity_dist = Counter(r["severity"] for r in records_list)
        category_dist = Counter(r["category"] for r in records_list)
        airline_dist = Counter(r["airline"] for r in records_list)
        status_dist = Counter(r["status"] for r in records_list)

        critical_high = severity_dist.get("Critical", 0) + severity_dist.get("High", 0)
        with_root_cause = sum(1 for r in records_list if r["has_root_cause"])
        with_action = sum(1 for r in records_list if r["has_action"])

        if months:
            # "YYYY-MM" keys sort chronologically; the trend must be computed
            # over time order, not over the order records happened to arrive.
            month_values = [months[key] for key in sorted(months)]
            avg_monthly = float(np.mean(month_values))
            trend = self._calculate_trend(month_values)
        else:
            avg_monthly = total
            trend = "stable"

        risk_score = self._calculate_branch_risk(
            total, critical_high, with_root_cause, with_action, trend
        )

        return {
            "total_issues": total,
            "severity_distribution": dict(severity_dist),
            "top_categories": dict(category_dist.most_common(5)),
            "top_airlines": dict(airline_dist.most_common(5)),
            "status_distribution": dict(status_dist),
            "critical_high_count": critical_high,
            "critical_high_percentage": round(critical_high / total * 100, 1),
            "resolution_rate": round(status_dist.get("Closed", 0) / total * 100, 1),
            # Root cause and action each count as half of a record's documentation.
            "documentation_rate": round(
                (with_root_cause + with_action) / (total * 2) * 100, 1
            ),
            "avg_monthly_issues": round(avg_monthly, 1),
            "trend": trend,
            "risk_score": risk_score,
            "risk_level": self._get_risk_level(risk_score),
            # Sorted (by string form) so the output is deterministic across runs.
            "hubs": sorted({r["hub"] for r in records_list if r["hub"]}, key=str),
            "areas": sorted({r["area"] for r in records_list if r["area"]}, key=str),
        }

    def _extract_severity(self, record: Dict) -> str:
        """
        Extract severity from record text

        Performs a case-insensitive substring search over the ``Report`` and
        ``Root_Caused`` fields. Missing, ``None`` or non-string values are
        tolerated. The first matching level wins, checked in the order
        Critical, High, Medium; "Low" is returned when nothing matches.
        """
        # Coerce defensively: a field may be present but None or non-string.
        report = (
            str(record.get("Report") or "")
            + " "
            + str(record.get("Root_Caused") or "")
        ).lower()

        critical_keywords = [
            "darurat",
            "kritis",
            "emergency",
            "kecelakaan",
            "parah",
            "serius",
        ]
        high_keywords = ["rusak", "damage", "torn", "broken", "hilang", "lost"]
        medium_keywords = ["delay", "terlambat", "salah", "wrong", "error"]

        if any(kw in report for kw in critical_keywords):
            return "Critical"
        if any(kw in report for kw in high_keywords):
            return "High"
        if any(kw in report for kw in medium_keywords):
            return "Medium"
        return "Low"

    def _calculate_trend(self, values: List[int]) -> str:
        """
        Calculate trend direction from time series values

        Compares the mean of the last three values with the mean of the first
        three. A change above +20% is "increasing", below -20% is "decreasing",
        anything else (including fewer than three values or a zero baseline) is
        "stable".

        NOTE: with fewer than six values the two windows overlap, and with
        exactly three they are identical, so the result is always "stable".
        """
        if len(values) < 3:
            return "stable"

        recent = np.mean(values[-3:])
        earlier = np.mean(values[:3])

        if earlier == 0:
            return "stable"

        change_pct = (recent - earlier) / earlier * 100

        if change_pct > 20:
            return "increasing"
        if change_pct < -20:
            return "decreasing"
        return "stable"

    def _calculate_branch_risk(
        self,
        total: int,
        critical_high: int,
        with_root_cause: int,
        with_action: int,
        trend: str,
    ) -> float:
        """
        Calculate risk score for a branch

        The score is the sum of four components, clamped to [0, 100] and
        rounded to two decimals:
            volume   - up to 40 points, saturating at 100 issues
            severity - up to 30 points, share of critical/high issues
            docs     - up to 25 points, share of missing root cause/action
            trend    - 25 (increasing), 10 (stable or unknown), 0 (decreasing)
        """
        volume_score = min(total / 50, 2.0) * 20
        severity_score = (critical_high / max(total, 1)) * 30
        doc_rate = (with_root_cause + with_action) / max(total * 2, 1)
        doc_score = (1 - doc_rate) * 25
        trend_score = {"increasing": 25, "stable": 10, "decreasing": 0}.get(trend, 10)

        risk_score = volume_score + severity_score + doc_score + trend_score
        return round(min(100, max(0, risk_score)), 2)

    def _get_risk_level(self, score: float) -> str:
        """Convert risk score to level (>=70 Critical, >=50 High, >=30 Medium, else Low)"""
        if score >= 70:
            return "Critical"
        if score >= 50:
            return "High"
        if score >= 30:
            return "Medium"
        return "Low"

    def get_branch(
        self, branch: str, category_type: Optional[str] = None
    ) -> Optional[Dict[str, Any]]:
        """
        Get metrics for a specific branch

        The returned dicts are shallow copies, so callers cannot accidentally
        modify the cached metrics.

        Args:
            branch: Branch name
            category_type: "landside_airside", "cgo", or None for combined

        Returns:
            Branch metrics or None if not found. Without ``category_type``: if
            the branch exists in only one category, that category's metrics
            plus a ``category_type`` key; if it exists in both, a combined dict
            with both metric sets, the summed issue count and the mean of the
            two risk scores.
        """
        if category_type:
            data = self.branch_data.get(category_type, {}).get(branch)
            return dict(data) if data is not None else None

        ls_data = self.branch_data.get("landside_airside", {}).get(branch, {})
        cgo_data = self.branch_data.get("cgo", {}).get(branch, {})

        if not ls_data and not cgo_data:
            return None

        if not cgo_data:
            return {**ls_data, "category_type": "landside_airside"}

        if not ls_data:
            return {**cgo_data, "category_type": "cgo"}

        return {
            "branch": branch,
            "total_issues": ls_data.get("total_issues", 0)
            + cgo_data.get("total_issues", 0),
            "landside_airside": dict(ls_data),
            "cgo": dict(cgo_data),
            "combined_risk_score": round(
                (ls_data.get("risk_score", 0) + cgo_data.get("risk_score", 0)) / 2, 2
            ),
        }

    def get_ranking(
        self,
        category_type: Optional[str] = None,
        sort_by: str = "risk_score",
        limit: int = 20,
    ) -> List[Dict[str, Any]]:
        """
        Get branch ranking

        Args:
            category_type: "landside_airside", "cgo", or None for both
            sort_by: Field to sort by (risk_score, total_issues, critical_high_count)
            limit: Maximum branches to return

        Returns:
            List of branch metrics sorted by specified field. The three fields
            named above sort descending; any other field sorts ascending.
            Entries missing the field sort as 0.
        """
        selected_types = [category_type] if category_type else list(CATEGORY_TYPES)

        ranking = [
            {"branch": name, "category_type": ct, **data}
            for ct in selected_types
            for name, data in self.branch_data.get(ct, {}).items()
        ]

        reverse_sort = sort_by in ("risk_score", "total_issues", "critical_high_count")
        ranking.sort(key=lambda x: x.get(sort_by, 0), reverse=reverse_sort)

        return ranking[:limit]

    def get_comparison(self) -> Dict[str, Any]:
        """
        Compare all branches across category types

        Returns:
            Dict with comparison data: one entry per branch (sorted by combined
            issue count, descending) plus counts of branches present only in
            landside/airside, only in CGO, or in both
        """
        ls_branches = self.branch_data.get("landside_airside", {})
        cgo_branches = self.branch_data.get("cgo", {})

        ls_names = set(ls_branches)
        cgo_names = set(cgo_branches)
        all_branch_names = ls_names | cgo_names

        def _brief(metrics: Dict[str, Any]) -> Optional[Dict[str, Any]]:
            """Reduce one category's metrics to the compared fields, or None if absent."""
            if not metrics:
                return None
            return {
                "total_issues": metrics.get("total_issues", 0),
                "risk_score": metrics.get("risk_score", 0),
                "trend": metrics.get("trend", "N/A"),
            }

        comparison = []
        # Iterate in name order so branches with equal totals keep a
        # deterministic order after the (stable) sort below.
        for branch in sorted(all_branch_names, key=str):
            ls = ls_branches.get(branch, {})
            cgo = cgo_branches.get(branch, {})
            comparison.append(
                {
                    "branch": branch,
                    "landside_airside": _brief(ls),
                    "cgo": _brief(cgo),
                    "total_combined": ls.get("total_issues", 0)
                    + cgo.get("total_issues", 0),
                }
            )

        comparison.sort(key=lambda x: -x["total_combined"])

        return {
            "last_updated": self.last_updated,
            "total_branches": len(all_branch_names),
            "landside_airside_only": len(ls_names - cgo_names),
            "cgo_only": len(cgo_names - ls_names),
            "both": len(ls_names & cgo_names),
            "branches": comparison,
        }

    def get_summary(self, category_type: Optional[str] = None) -> Dict[str, Any]:
        """
        Get overall summary of branch analytics

        Args:
            category_type: Optional filter by category type

        Returns:
            Summary statistics. With ``category_type``: totals, average risk
            score and risk-level/trend distributions for that category.
            Without: both category summaries plus a side-by-side comparison.
        """
        if category_type:
            branches = self.branch_data.get(category_type, {})

            total_issues = sum(b.get("total_issues", 0) for b in branches.values())
            avg_risk = (
                float(np.mean([b.get("risk_score", 0) for b in branches.values()]))
                if branches
                else 0
            )
            risk_levels = Counter(
                b.get("risk_level", "Unknown") for b in branches.values()
            )
            trends = Counter(b.get("trend", "Unknown") for b in branches.values())

            return {
                "category_type": category_type,
                "total_branches": len(branches),
                "total_issues": total_issues,
                "avg_risk_score": round(avg_risk, 2),
                "risk_level_distribution": dict(risk_levels),
                "trend_distribution": dict(trends),
                "last_updated": self.last_updated,
            }

        ls_summary = self.get_summary("landside_airside")
        cgo_summary = self.get_summary("cgo")

        return {
            "landside_airside": ls_summary,
            "cgo": cgo_summary,
            "comparison": {
                "ls_total_issues": ls_summary.get("total_issues", 0),
                "cgo_total_issues": cgo_summary.get("total_issues", 0),
                "ls_avg_risk": ls_summary.get("avg_risk_score", 0),
                "cgo_avg_risk": cgo_summary.get("avg_risk_score", 0),
            },
            "last_updated": self.last_updated,
        }


_branch_analytics_service: Optional[BranchAnalyticsService] = None


def get_branch_analytics_service() -> BranchAnalyticsService:
    """Get singleton instance, creating it on first use"""
    global _branch_analytics_service
    if _branch_analytics_service is None:
        _branch_analytics_service = BranchAnalyticsService()
    return _branch_analytics_service