Spaces:
Build error
Build error
| """ | |
| Branch Analytics Service for Gapura AI | |
| Comprehensive branch performance dashboard | |
| Separated by: Landside & Airside (Non-Cargo) and CGO (Cargo) | |
| """ | |
| import os | |
| import logging | |
| import pickle | |
| from typing import List, Dict, Any, Optional | |
| from collections import Counter, defaultdict | |
| from datetime import datetime | |
| import numpy as np | |
| logger = logging.getLogger(__name__) | |
| CATEGORY_TYPES = { | |
| "landside_airside": { | |
| "name": "Landside & Airside", | |
| "description": "Non-cargo operations (terminal, passenger, baggage handling)", | |
| "sheet": "NON CARGO", | |
| }, | |
| "cgo": {"name": "CGO", "description": "Cargo operations", "sheet": "CGO"}, | |
| } | |
| class BranchAnalyticsService: | |
| """ | |
| Branch performance analytics service | |
| Provides comprehensive metrics for each branch with category type separation | |
| """ | |
| def __init__(self): | |
| self.branch_data = {"landside_airside": {}, "cgo": {}} | |
| self.last_updated = None | |
| self._load_data() | |
| def _load_data(self): | |
| """Load cached branch data""" | |
| base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) | |
| data_path = os.path.join(base_dir, "models", "branch_analytics.pkl") | |
| if os.path.exists(data_path): | |
| try: | |
| with open(data_path, "rb") as f: | |
| cached = pickle.load(f) | |
| self.branch_data = cached.get("branch_data", self.branch_data) | |
| self.last_updated = cached.get("last_updated") | |
| logger.info("Branch analytics data loaded") | |
| except Exception as e: | |
| logger.warning(f"Failed to load branch analytics: {e}") | |
| def _save_data(self): | |
| """Save branch data to cache""" | |
| base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) | |
| data_path = os.path.join(base_dir, "models", "branch_analytics.pkl") | |
| os.makedirs(os.path.dirname(data_path), exist_ok=True) | |
| with open(data_path, "wb") as f: | |
| pickle.dump( | |
| { | |
| "branch_data": self.branch_data, | |
| "last_updated": self.last_updated, | |
| }, | |
| f, | |
| ) | |
| logger.info(f"Branch analytics saved to {data_path}") | |
| def _get_category_type(self, record: Dict) -> str: | |
| """Determine category type from record""" | |
| sheet = record.get("_sheet_name", record.get("_source_sheet", "")) | |
| if sheet == "NON CARGO": | |
| return "landside_airside" | |
| elif sheet == "CGO": | |
| return "cgo" | |
| return "landside_airside" | |
| def calculate_branch_metrics( | |
| self, records: List[Dict], category_type: str = None | |
| ) -> Dict[str, Any]: | |
| """ | |
| Calculate comprehensive metrics for branches | |
| Args: | |
| records: List of all records | |
| category_type: "landside_airside", "cgo", or None for both | |
| Returns: | |
| Dict with branch metrics | |
| """ | |
| logger.info(f"Calculating branch metrics for {len(records)} records...") | |
| if category_type: | |
| category_types = [category_type] | |
| else: | |
| category_types = ["landside_airside", "cgo"] | |
| for cat_type in category_types: | |
| self.branch_data[cat_type] = {} | |
| branch_aggregates = {ct: defaultdict(list) for ct in category_types} | |
| monthly_by_branch = { | |
| ct: defaultdict(lambda: defaultdict(int)) for ct in category_types | |
| } | |
| for record in records: | |
| ct = self._get_category_type(record) | |
| if ct not in category_types: | |
| continue | |
| branch = record.get("Branch", "Unknown") | |
| if not branch or branch == "Unknown": | |
| continue | |
| severity = self._extract_severity(record) | |
| date_str = record.get("Date_of_Event", "") | |
| record_info = { | |
| "severity": severity, | |
| "category": record.get("Irregularity_Complain_Category", "Unknown"), | |
| "airline": record.get("Airlines", "Unknown"), | |
| "hub": record.get("HUB", "Unknown"), | |
| "area": record.get("Area", "Unknown"), | |
| "status": record.get("Status", "Unknown"), | |
| "has_root_cause": bool(record.get("Root_Caused")), | |
| "has_action": bool(record.get("Action_Taken")), | |
| "date": date_str, | |
| } | |
| branch_aggregates[ct][branch].append(record_info) | |
| if date_str: | |
| try: | |
| import pandas as pd | |
| dt = pd.to_datetime(date_str, errors="coerce") | |
| if not pd.isna(dt): | |
| month_key = f"{dt.year}-{dt.month:02d}" | |
| monthly_by_branch[ct][branch][month_key] += 1 | |
| except: | |
| pass | |
| for ct in category_types: | |
| for branch, records_list in branch_aggregates[ct].items(): | |
| if not records_list: | |
| continue | |
| total = len(records_list) | |
| severity_dist = Counter(r["severity"] for r in records_list) | |
| category_dist = Counter(r["category"] for r in records_list) | |
| airline_dist = Counter(r["airline"] for r in records_list) | |
| status_dist = Counter(r["status"] for r in records_list) | |
| critical_high = severity_dist.get("Critical", 0) + severity_dist.get( | |
| "High", 0 | |
| ) | |
| with_root_cause = sum(1 for r in records_list if r["has_root_cause"]) | |
| with_action = sum(1 for r in records_list if r["has_action"]) | |
| months = monthly_by_branch[ct][branch] | |
| if months: | |
| month_values = list(months.values()) | |
| avg_monthly = np.mean(month_values) | |
| trend = self._calculate_trend(month_values) | |
| else: | |
| avg_monthly = total | |
| trend = "stable" | |
| risk_score = self._calculate_branch_risk( | |
| total, critical_high, with_root_cause, with_action, trend | |
| ) | |
| self.branch_data[ct][branch] = { | |
| "total_issues": total, | |
| "severity_distribution": dict(severity_dist), | |
| "top_categories": dict(category_dist.most_common(5)), | |
| "top_airlines": dict(airline_dist.most_common(5)), | |
| "status_distribution": dict(status_dist), | |
| "critical_high_count": critical_high, | |
| "critical_high_percentage": round(critical_high / total * 100, 1), | |
| "resolution_rate": round( | |
| status_dist.get("Closed", 0) / total * 100, 1 | |
| ), | |
| "documentation_rate": round( | |
| (with_root_cause + with_action) / (total * 2) * 100, 1 | |
| ), | |
| "avg_monthly_issues": round(avg_monthly, 1), | |
| "trend": trend, | |
| "risk_score": risk_score, | |
| "risk_level": self._get_risk_level(risk_score), | |
| "hubs": list(set(r["hub"] for r in records_list if r["hub"])), | |
| "areas": list(set(r["area"] for r in records_list if r["area"])), | |
| } | |
| self.last_updated = datetime.now().isoformat() | |
| self._save_data() | |
| return { | |
| "status": "success", | |
| "last_updated": self.last_updated, | |
| "category_types": { | |
| ct: {"branches": len(self.branch_data[ct])} for ct in category_types | |
| }, | |
| } | |
| def _extract_severity(self, record: Dict) -> str: | |
| """Extract severity from record text""" | |
| report = ( | |
| record.get("Report", "") + " " + record.get("Root_Caused", "") | |
| ).lower() | |
| critical_keywords = [ | |
| "darurat", | |
| "kritis", | |
| "emergency", | |
| "kecelakaan", | |
| "parah", | |
| "serius", | |
| ] | |
| high_keywords = ["rusak", "damage", "torn", "broken", "hilang", "lost"] | |
| medium_keywords = ["delay", "terlambat", "salah", "wrong", "error"] | |
| if any(kw in report for kw in critical_keywords): | |
| return "Critical" | |
| elif any(kw in report for kw in high_keywords): | |
| return "High" | |
| elif any(kw in report for kw in medium_keywords): | |
| return "Medium" | |
| return "Low" | |
| def _calculate_trend(self, values: List[int]) -> str: | |
| """Calculate trend direction from time series values""" | |
| if len(values) < 3: | |
| return "stable" | |
| recent = np.mean(values[-3:]) | |
| earlier = np.mean(values[:3]) | |
| if earlier == 0: | |
| return "stable" | |
| change_pct = (recent - earlier) / earlier * 100 | |
| if change_pct > 20: | |
| return "increasing" | |
| elif change_pct < -20: | |
| return "decreasing" | |
| return "stable" | |
| def _calculate_branch_risk( | |
| self, | |
| total: int, | |
| critical_high: int, | |
| with_root_cause: int, | |
| with_action: int, | |
| trend: str, | |
| ) -> float: | |
| """Calculate risk score for a branch""" | |
| volume_score = min(total / 50, 2.0) * 20 | |
| severity_score = (critical_high / max(total, 1)) * 30 | |
| doc_rate = (with_root_cause + with_action) / max(total * 2, 1) | |
| doc_score = (1 - doc_rate) * 25 | |
| trend_score = {"increasing": 25, "stable": 10, "decreasing": 0}.get(trend, 10) | |
| risk_score = volume_score + severity_score + doc_score + trend_score | |
| return round(min(100, max(0, risk_score)), 2) | |
| def _get_risk_level(self, score: float) -> str: | |
| """Convert risk score to level""" | |
| if score >= 70: | |
| return "Critical" | |
| elif score >= 50: | |
| return "High" | |
| elif score >= 30: | |
| return "Medium" | |
| return "Low" | |
| def get_branch( | |
| self, branch: str, category_type: str = None | |
| ) -> Optional[Dict[str, Any]]: | |
| """ | |
| Get metrics for a specific branch | |
| Args: | |
| branch: Branch name | |
| category_type: "landside_airside", "cgo", or None for combined | |
| Returns: | |
| Branch metrics or None if not found | |
| """ | |
| if category_type: | |
| return self.branch_data.get(category_type, {}).get(branch) | |
| ls_data = self.branch_data.get("landside_airside", {}).get(branch, {}) | |
| cgo_data = self.branch_data.get("cgo", {}).get(branch, {}) | |
| if not ls_data and not cgo_data: | |
| return None | |
| if ls_data and not cgo_data: | |
| ls_data["category_type"] = "landside_airside" | |
| return ls_data | |
| if cgo_data and not ls_data: | |
| cgo_data["category_type"] = "cgo" | |
| return cgo_data | |
| combined = { | |
| "branch": branch, | |
| "total_issues": ls_data.get("total_issues", 0) | |
| + cgo_data.get("total_issues", 0), | |
| "landside_airside": ls_data, | |
| "cgo": cgo_data, | |
| "combined_risk_score": round( | |
| (ls_data.get("risk_score", 0) + cgo_data.get("risk_score", 0)) / 2, 2 | |
| ), | |
| } | |
| return combined | |
| def get_ranking( | |
| self, | |
| category_type: str = None, | |
| sort_by: str = "risk_score", | |
| limit: int = 20, | |
| ) -> List[Dict[str, Any]]: | |
| """ | |
| Get branch ranking | |
| Args: | |
| category_type: "landside_airside", "cgo", or None for both | |
| sort_by: Field to sort by (risk_score, total_issues, critical_high_count) | |
| limit: Maximum branches to return | |
| Returns: | |
| List of branch metrics sorted by specified field | |
| """ | |
| if category_type: | |
| branches = self.branch_data.get(category_type, {}) | |
| ranking = [ | |
| {"branch": name, "category_type": category_type, **data} | |
| for name, data in branches.items() | |
| ] | |
| else: | |
| ranking = [] | |
| for ct in ["landside_airside", "cgo"]: | |
| for name, data in self.branch_data.get(ct, {}).items(): | |
| ranking.append({"branch": name, "category_type": ct, **data}) | |
| reverse_sort = sort_by in ["risk_score", "total_issues", "critical_high_count"] | |
| ranking.sort(key=lambda x: x.get(sort_by, 0), reverse=reverse_sort) | |
| return ranking[:limit] | |
| def get_comparison(self) -> Dict[str, Any]: | |
| """ | |
| Compare all branches across category types | |
| Returns: | |
| Dict with comparison data | |
| """ | |
| ls_branches = self.branch_data.get("landside_airside", {}) | |
| cgo_branches = self.branch_data.get("cgo", {}) | |
| all_branch_names = set(ls_branches.keys()) | set(cgo_branches.keys()) | |
| comparison = [] | |
| for branch in all_branch_names: | |
| ls = ls_branches.get(branch, {}) | |
| cgo = cgo_branches.get(branch, {}) | |
| comparison.append( | |
| { | |
| "branch": branch, | |
| "landside_airside": { | |
| "total_issues": ls.get("total_issues", 0), | |
| "risk_score": ls.get("risk_score", 0), | |
| "trend": ls.get("trend", "N/A"), | |
| } | |
| if ls | |
| else None, | |
| "cgo": { | |
| "total_issues": cgo.get("total_issues", 0), | |
| "risk_score": cgo.get("risk_score", 0), | |
| "trend": cgo.get("trend", "N/A"), | |
| } | |
| if cgo | |
| else None, | |
| "total_combined": ls.get("total_issues", 0) | |
| + cgo.get("total_issues", 0), | |
| } | |
| ) | |
| comparison.sort(key=lambda x: -x["total_combined"]) | |
| return { | |
| "last_updated": self.last_updated, | |
| "total_branches": len(all_branch_names), | |
| "landside_airside_only": len( | |
| [ | |
| b | |
| for b in all_branch_names | |
| if b in ls_branches and b not in cgo_branches | |
| ] | |
| ), | |
| "cgo_only": len( | |
| [ | |
| b | |
| for b in all_branch_names | |
| if b in cgo_branches and b not in ls_branches | |
| ] | |
| ), | |
| "both": len( | |
| [b for b in all_branch_names if b in ls_branches and b in cgo_branches] | |
| ), | |
| "branches": comparison, | |
| } | |
| def get_summary(self, category_type: str = None) -> Dict[str, Any]: | |
| """ | |
| Get overall summary of branch analytics | |
| Args: | |
| category_type: Optional filter by category type | |
| Returns: | |
| Summary statistics | |
| """ | |
| if category_type: | |
| branches = self.branch_data.get(category_type, {}) | |
| total_issues = sum(b.get("total_issues", 0) for b in branches.values()) | |
| avg_risk = ( | |
| np.mean([b.get("risk_score", 0) for b in branches.values()]) | |
| if branches | |
| else 0 | |
| ) | |
| risk_levels = Counter( | |
| b.get("risk_level", "Unknown") for b in branches.values() | |
| ) | |
| trends = Counter(b.get("trend", "Unknown") for b in branches.values()) | |
| return { | |
| "category_type": category_type, | |
| "total_branches": len(branches), | |
| "total_issues": total_issues, | |
| "avg_risk_score": round(avg_risk, 2), | |
| "risk_level_distribution": dict(risk_levels), | |
| "trend_distribution": dict(trends), | |
| "last_updated": self.last_updated, | |
| } | |
| ls_summary = self.get_summary("landside_airside") | |
| cgo_summary = self.get_summary("cgo") | |
| return { | |
| "landside_airside": ls_summary, | |
| "cgo": cgo_summary, | |
| "comparison": { | |
| "ls_total_issues": ls_summary.get("total_issues", 0), | |
| "cgo_total_issues": cgo_summary.get("total_issues", 0), | |
| "ls_avg_risk": ls_summary.get("avg_risk_score", 0), | |
| "cgo_avg_risk": cgo_summary.get("avg_risk_score", 0), | |
| }, | |
| "last_updated": self.last_updated, | |
| } | |
| _branch_analytics_service: Optional[BranchAnalyticsService] = None | |
| def get_branch_analytics_service() -> BranchAnalyticsService: | |
| """Get singleton instance""" | |
| global _branch_analytics_service | |
| if _branch_analytics_service is None: | |
| _branch_analytics_service = BranchAnalyticsService() | |
| return _branch_analytics_service | |