""" Category Summarization Service Provides aggregated summaries for Non-cargo and CGO categories """ import os import logging from typing import Dict, Any, List, Optional from collections import Counter from datetime import datetime, timedelta import pandas as pd logger = logging.getLogger(__name__) class CategorySummarizationService: """Service for generating category-based summaries""" def __init__(self): self.severity_keywords = { "critical": [ "emergency", "darurat", "critical", "kritis", "accident", "kecelakaan", "injury", "cedera", ], "high": [ "damage", "rusak", "torn", "robek", "broken", "pecah", "urgent", "mendesak", "lost", "hilang", "stolen", "dicuri", ], "medium": [ "delay", "terlambat", "wrong", "salah", "error", "kesalahan", "missing", "problem", "masalah", ], "low": ["minor", "kecil", "small", "sedikit", "normal", "biasa"], } self._data_cache = {} self._last_updated = None def summarize_category( self, data: List[Dict], category_type: str = "all" ) -> Dict[str, Any]: """ Generate summary for a specific category Args: data: List of records with _sheet_name field category_type: "non_cargo", "cgo", or "all" """ if category_type == "all": return { "non_cargo": self._summarize_single_category(data, "NON CARGO"), "cgo": self._summarize_single_category(data, "CGO"), "comparison": self._compare_categories(data), } elif category_type == "non_cargo": return self._summarize_single_category(data, "NON CARGO") elif category_type == "cgo": return self._summarize_single_category(data, "CGO") else: return {"error": f"Unknown category type: {category_type}"} def _summarize_single_category( self, data: List[Dict], sheet_name: str ) -> Dict[str, Any]: """Generate summary for a single category""" filtered_data = [r for r in data if r.get("_sheet_name") == sheet_name] if not filtered_data: return { "sheet_name": sheet_name, "total_records": 0, "message": "No data available for this category", } total_records = len(filtered_data) severity_dist = Counter() category_dist = Counter() airline_dist = Counter() hub_dist = Counter() branch_dist = Counter() area_dist = Counter() status_dist = Counter() issue_type_dist = Counter() root_cause_dist = Counter() monthly_trend = Counter() reports_text = [] root_causes_text = [] actions_text = [] for record in filtered_data: report_text = record.get("Report", "") or "" root_cause = record.get("Root_Caused", "") or "" combined = f"{report_text} {root_cause}".lower() severity = self._classify_severity(combined) severity_dist[severity] += 1 category = record.get("Irregularity_Complain_Category", "Unknown") category_dist[category] += 1 airline = record.get("Airlines", "Unknown") airline_dist[airline] += 1 hub = record.get("HUB", "Unknown") hub_dist[hub] += 1 branch = record.get("Branch", "Unknown") branch_dist[branch] += 1 area = record.get("Area", "Unknown") area_dist[area] += 1 status = record.get("Status", "Unknown") status_dist[status] += 1 if category and category != "Unknown": issue_type_dist[category] += 1 if root_cause: root_cause_dist[self._categorize_root_cause(root_cause)] += 1 date_str = record.get("Date_of_Event", "") if date_str: try: date_obj = pd.to_datetime(date_str, errors="coerce") if not pd.isna(date_obj): month_key = date_obj.strftime("%Y-%m") monthly_trend[month_key] += 1 except: pass if report_text: reports_text.append(report_text) if root_cause: root_causes_text.append(root_cause) action = record.get("Action_Taken", "") if action: actions_text.append(action) critical_high_count = severity_dist.get("Critical", 0) + severity_dist.get( "High", 0 ) critical_high_pct = ( round((critical_high_count / total_records) * 100, 1) if total_records > 0 else 0 ) open_count = status_dist.get("Open", 0) + status_dist.get("In Progress", 0) open_pct = ( round((open_count / total_records) * 100, 1) if total_records > 0 else 0 ) key_insights = self._generate_key_insights( sheet_name, total_records, severity_dist, category_dist, airline_dist, critical_high_pct, open_pct, ) common_issues = self._extract_common_issues(reports_text) return { "sheet_name": sheet_name, "total_records": total_records, "severity_distribution": dict(severity_dist), "critical_high_percentage": critical_high_pct, "open_issues_percentage": open_pct, "top_categories": dict(category_dist.most_common(5)), "top_airlines": dict(airline_dist.most_common(5)), "top_hubs": dict(hub_dist.most_common(5)), "top_branches": dict(branch_dist.most_common(5)), "area_distribution": dict(area_dist), "status_distribution": dict(status_dist), "root_cause_categories": dict(root_cause_dist.most_common(5)), "monthly_trend": dict(sorted(monthly_trend.items())[-6:]), "key_insights": key_insights, "common_issues": common_issues, "recommendations": self._generate_recommendations( severity_dist, category_dist, root_cause_dist ), "last_updated": datetime.now().isoformat(), } def _classify_severity(self, text: str) -> str: """Classify severity based on keywords""" text_lower = text.lower() for level, keywords in self.severity_keywords.items(): for kw in keywords: if kw in text_lower: return level.capitalize() return "Low" def _categorize_root_cause(self, root_cause: str) -> str: """Categorize root cause into categories""" rc_lower = root_cause.lower() categories = { "Equipment Failure": [ "equipment", "mesin", "alat", "tool", "machine", "device", "broken", "rusak", "malfunction", ], "Staff Competency": [ "staff", "staffing", "kompetensi", "skill", "training", "pelatihan", "human error", "kurang", ], "Process/Procedure": [ "procedure", "prosedur", "process", "proses", "sop", "workflow", "system", ], "Communication": [ "communication", "komunikasi", "informasi", "koordinasi", "coordination", "miscommunication", ], "External Factors": [ "weather", "cuaca", "external", "flight delay", "airline", "airline delay", "faktor luar", ], "Resource/Manpower": [ "manpower", "tenaga", "shortage", "kurang", "resource", "sumber daya", "lack of", ], "Documentation": [ "document", "dokumen", "paperwork", "paper", "label", "tag", "manifest", ], } for category, keywords in categories.items(): for kw in keywords: if kw in rc_lower: return category return "Other" def _generate_key_insights( self, sheet_name: str, total_records: int, severity_dist: Counter, category_dist: Counter, airline_dist: Counter, critical_high_pct: float, open_pct: float, ) -> List[str]: """Generate key insights from the data""" insights = [] category_label = "Non-Cargo" if sheet_name == "NON CARGO" else "Cargo" insights.append(f"Total {total_records} {category_label} reports analyzed") if critical_high_pct > 20: insights.append( f"High priority attention needed: {critical_high_pct}% Critical/High severity issues" ) elif critical_high_pct > 10: insights.append( f"Moderate concern: {critical_high_pct}% Critical/High severity issues" ) else: insights.append( f"Severity levels manageable: Only {critical_high_pct}% Critical/High severity" ) if open_pct > 30: insights.append(f"Action required: {open_pct}% issues still open/pending") top_category = category_dist.most_common(1) if top_category: insights.append( f"Most common issue type: {top_category[0][0]} ({top_category[0][1]} occurrences)" ) top_airline = airline_dist.most_common(1) if top_airline and top_airline[0][0] != "Unknown": insights.append( f"Highest reporting airline: {top_airline[0][0]} ({top_airline[0][1]} reports)" ) critical_count = severity_dist.get("Critical", 0) if critical_count > 0: insights.append( f"ATTENTION: {critical_count} Critical severity issues require immediate action" ) return insights def _extract_common_issues(self, reports: List[str]) -> List[Dict[str, Any]]: """Extract common issues from reports""" issue_keywords = { "Damage/Destruction": [ "damage", "rusak", "broken", "pecah", "torn", "robek", "destroyed", ], "Delay/Late": ["delay", "terlambat", "late", "telat", "waiting", "tunggu"], "Missing/Lost Items": [ "missing", "hilang", "lost", "not found", "tidak ada", ], "Documentation Error": [ "wrong document", "salah dokumen", "incorrect", "label error", "tag salah", ], "Handling Issue": [ "handling", "penanganan", "mishandled", "rough", "kasar", ], "Communication Issue": [ "communication", "komunikasi", "information", "informasi", "coordinate", ], "Equipment Problem": [ "equipment", "alat", "mesin", "machine", "device", "tool", ], "Security Concern": [ "security", "keamanan", "unauthorized", "access", "theft", "pencurian", ], } issue_counts = Counter() for report in reports: report_lower = report.lower() for issue_type, keywords in issue_keywords.items(): for kw in keywords: if kw in report_lower: issue_counts[issue_type] += 1 break return [ {"issue": issue, "count": count} for issue, count in issue_counts.most_common(8) ] def _generate_recommendations( self, severity_dist: Counter, category_dist: Counter, root_cause_dist: Counter ) -> List[str]: """Generate actionable recommendations""" recommendations = [] critical_count = severity_dist.get("Critical", 0) + severity_dist.get("High", 0) if critical_count > 10: recommendations.append( "Establish dedicated task force for critical/high severity issues" ) top_root_cause = root_cause_dist.most_common(1) if top_root_cause: rc = top_root_cause[0][0] if rc == "Equipment Failure": recommendations.append("Schedule preventive maintenance for equipment") elif rc == "Staff Competency": recommendations.append( "Implement refresher training programs for staff" ) elif rc == "Process/Procedure": recommendations.append( "Review and update standard operating procedures" ) elif rc == "Communication": recommendations.append( "Improve inter-department communication protocols" ) elif rc == "Resource/Manpower": recommendations.append( "Evaluate resource allocation and staffing levels" ) top_category = category_dist.most_common(1) if top_category: cat = top_category[0][0] if "damage" in cat.lower() or "broken" in cat.lower(): recommendations.append( "Implement enhanced handling protocols to reduce damage" ) elif "delay" in cat.lower(): recommendations.append("Optimize workflow to minimize delays") if not recommendations: recommendations.append( "Continue monitoring trends and maintain current performance" ) return recommendations def _compare_categories(self, data: List[Dict]) -> Dict[str, Any]: """Compare Non-cargo and CGO categories""" non_cargo_data = [r for r in data if r.get("_sheet_name") == "NON CARGO"] cgo_data = [r for r in data if r.get("_sheet_name") == "CGO"] non_cargo_summary = self._summarize_single_category(data, "NON CARGO") cgo_summary = self._summarize_single_category(data, "CGO") comparison = { "total_records": { "non_cargo": len(non_cargo_data), "cgo": len(cgo_data), "difference": len(non_cargo_data) - len(cgo_data), }, "critical_high_percentage": { "non_cargo": non_cargo_summary.get("critical_high_percentage", 0), "cgo": cgo_summary.get("critical_high_percentage", 0), }, "open_issues_percentage": { "non_cargo": non_cargo_summary.get("open_issues_percentage", 0), "cgo": cgo_summary.get("open_issues_percentage", 0), }, "top_categories_comparison": { "non_cargo": list(non_cargo_summary.get("top_categories", {}).keys())[ :3 ], "cgo": list(cgo_summary.get("top_categories", {}).keys())[:3], }, "severity_comparison": { "non_cargo": non_cargo_summary.get("severity_distribution", {}), "cgo": cgo_summary.get("severity_distribution", {}), }, "insights": [], } if len(non_cargo_data) > len(cgo_data): comparison["insights"].append( f"Non-Cargo has {len(non_cargo_data) - len(cgo_data)} more reports than Cargo" ) elif len(cgo_data) > len(non_cargo_data): comparison["insights"].append( f"Cargo has {len(cgo_data) - len(non_cargo_data)} more reports than Non-Cargo" ) nc_crit = non_cargo_summary.get("critical_high_percentage", 0) cgo_crit = cgo_summary.get("critical_high_percentage", 0) if nc_crit > cgo_crit + 5: comparison["insights"].append( "Non-Cargo has higher critical/high severity rate - needs attention" ) elif cgo_crit > nc_crit + 5: comparison["insights"].append( "Cargo has higher critical/high severity rate - needs attention" ) return comparison _service_instance = None def get_category_summarization_service() -> CategorySummarizationService: """Get or create singleton service instance""" global _service_instance if _service_instance is None: _service_instance = CategorySummarizationService() return _service_instance