# gapura-ai-api / data / category_summarization_service.py
# Muhammad Ridzki Nugraha
# Upload folder using huggingface_hub
# 13c3f2c verified
"""
Category Summarization Service
Provides aggregated summaries for Non-cargo and CGO categories
"""
import os
import logging
from typing import Dict, Any, List, Optional
from collections import Counter
from datetime import datetime, timedelta
import pandas as pd
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class CategorySummarizationService:
    """Build aggregated summaries of report records per sheet category.

    Records are plain dicts.  The ``_sheet_name`` field selects the category
    (``"NON CARGO"`` or ``"CGO"``).  Severity, root-cause and issue
    classification are keyword based: a keyword matches when it occurs as a
    substring of the lower-cased text, so inflected forms that contain the
    keyword (e.g. "kerusakan" for "rusak") also match.
    """

    # Label used whenever a dimension value is missing, blank or NaN.
    _UNKNOWN = "Unknown"

    # Status labels (compared case-insensitively) counted as still open.
    _OPEN_STATUSES = frozenset({"open", "in progress"})

    # Root-cause category -> keywords.  Insertion order is the match priority:
    # the first category with a matching keyword wins.
    # NOTE(review): "kurang" is listed under both "Staff Competency" and
    # "Resource/Manpower"; the earlier category always wins for it.
    _ROOT_CAUSE_KEYWORDS = {
        "Equipment Failure": [
            "equipment", "mesin", "alat", "tool", "machine", "device",
            "broken", "rusak", "malfunction",
        ],
        "Staff Competency": [
            "staff", "staffing", "kompetensi", "skill", "training",
            "pelatihan", "human error", "kurang",
        ],
        "Process/Procedure": [
            "procedure", "prosedur", "process", "proses", "sop", "workflow",
            "system",
        ],
        "Communication": [
            "communication", "komunikasi", "informasi", "koordinasi",
            "coordination", "miscommunication",
        ],
        "External Factors": [
            "weather", "cuaca", "external", "flight delay", "airline",
            "airline delay", "faktor luar",
        ],
        "Resource/Manpower": [
            "manpower", "tenaga", "shortage", "kurang", "resource",
            "sumber daya", "lack of",
        ],
        "Documentation": [
            "document", "dokumen", "paperwork", "paper", "label", "tag",
            "manifest",
        ],
    }

    # Issue type -> keywords searched in the report text.  A single report may
    # count towards several issue types, but at most once per type.
    _ISSUE_KEYWORDS = {
        "Damage/Destruction": [
            "damage", "rusak", "broken", "pecah", "torn", "robek", "destroyed",
        ],
        "Delay/Late": ["delay", "terlambat", "late", "telat", "waiting", "tunggu"],
        "Missing/Lost Items": ["missing", "hilang", "lost", "not found", "tidak ada"],
        "Documentation Error": [
            "wrong document", "salah dokumen", "incorrect", "label error",
            "tag salah",
        ],
        "Handling Issue": ["handling", "penanganan", "mishandled", "rough", "kasar"],
        "Communication Issue": [
            "communication", "komunikasi", "information", "informasi",
            "coordinate",
        ],
        "Equipment Problem": [
            "equipment", "alat", "mesin", "machine", "device", "tool",
        ],
        "Security Concern": [
            "security", "keamanan", "unauthorized", "access", "theft",
            "pencurian",
        ],
    }

    # Recommendation emitted when the given root-cause category is the most
    # frequent one.  Categories without an entry produce no recommendation.
    _ROOT_CAUSE_RECOMMENDATIONS = {
        "Equipment Failure": "Schedule preventive maintenance for equipment",
        "Staff Competency": "Implement refresher training programs for staff",
        "Process/Procedure": "Review and update standard operating procedures",
        "Communication": "Improve inter-department communication protocols",
        "Resource/Manpower": "Evaluate resource allocation and staffing levels",
    }

    def __init__(self):
        # Severity level -> keywords.  Insertion order is the match priority,
        # so "critical" keywords are checked before "high", and so on.
        self.severity_keywords = {
            "critical": [
                "emergency", "darurat", "critical", "kritis", "accident",
                "kecelakaan", "injury", "cedera",
            ],
            "high": [
                "damage", "rusak", "torn", "robek", "broken", "pecah",
                "urgent", "mendesak", "lost", "hilang", "stolen", "dicuri",
            ],
            "medium": [
                "delay", "terlambat", "wrong", "salah", "error", "kesalahan",
                "missing", "problem", "masalah",
            ],
            "low": ["minor", "kecil", "small", "sedikit", "normal", "biasa"],
        }
        # Not read anywhere in this class; kept because they are existing
        # instance attributes.
        self._data_cache = {}
        self._last_updated = None

    # ------------------------------------------------------------------
    # Small value-normalisation helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _is_missing(value: Any) -> bool:
        """Return True for None, float NaN and pandas NaT."""
        if value is None or value is pd.NaT:
            return True
        return isinstance(value, float) and value != value  # NaN != NaN

    @classmethod
    def _clean_text(cls, value: Any) -> str:
        """Return *value* as stripped text, or "" when it is missing."""
        if cls._is_missing(value):
            return ""
        return str(value).strip()

    @classmethod
    def _clean_label(cls, value: Any) -> str:
        """Return *value* as a label, or "Unknown" when missing or blank."""
        return cls._clean_text(value) or cls._UNKNOWN

    @staticmethod
    def _month_key(value: Any) -> Optional[str]:
        """Return the "YYYY-MM" key for a date-like value, else None.

        Parsing is best effort: anything that cannot be turned into a single
        valid timestamp is ignored rather than raised.
        """
        try:
            if not value:
                return None
            parsed = pd.to_datetime(value, errors="coerce")
            if pd.isna(parsed):
                return None
            return parsed.strftime("%Y-%m")
        except Exception:
            # Deliberate best effort: a bad date must not break the summary.
            return None

    @staticmethod
    def _first_match(
        text: str, keyword_map: Dict[str, List[str]], default: str
    ) -> str:
        """Return the first label whose keyword occurs in *text*."""
        for label, keywords in keyword_map.items():
            if any(kw in text for kw in keywords):
                return label
        return default

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def summarize_category(
        self, data: List[Dict], category_type: str = "all"
    ) -> Dict[str, Any]:
        """Generate a summary for one category or for both.

        Args:
            data: List of records with a ``_sheet_name`` field.
            category_type: "non_cargo", "cgo", or "all".

        Returns:
            For "non_cargo" / "cgo": the summary dict of that category.
            For "all": a dict with keys "non_cargo", "cgo" and "comparison".
            For anything else: ``{"error": ...}``.
        """
        if category_type == "all":
            non_cargo = self._summarize_single_category(data, "NON CARGO")
            cgo = self._summarize_single_category(data, "CGO")
            return {
                "non_cargo": non_cargo,
                "cgo": cgo,
                # Reuse the summaries instead of computing them a second time.
                "comparison": self._compare_categories(data, non_cargo, cgo),
            }
        if category_type == "non_cargo":
            return self._summarize_single_category(data, "NON CARGO")
        if category_type == "cgo":
            return self._summarize_single_category(data, "CGO")
        return {"error": f"Unknown category type: {category_type}"}

    # ------------------------------------------------------------------
    # Per-category summary
    # ------------------------------------------------------------------
    def _summarize_single_category(
        self, data: List[Dict], sheet_name: str
    ) -> Dict[str, Any]:
        """Generate the summary for the records whose sheet is *sheet_name*.

        Missing, blank or NaN dimension values are counted under "Unknown".
        When no record matches, a short dict with ``total_records == 0`` and a
        message is returned instead of the full summary.
        """
        records = [r for r in data if r.get("_sheet_name") == sheet_name]
        if not records:
            return {
                "sheet_name": sheet_name,
                "total_records": 0,
                "message": "No data available for this category",
            }

        total_records = len(records)
        severity_dist = Counter()
        category_dist = Counter()
        airline_dist = Counter()
        hub_dist = Counter()
        branch_dist = Counter()
        area_dist = Counter()
        status_dist = Counter()
        root_cause_dist = Counter()
        monthly_trend = Counter()
        reports_text = []

        label = self._clean_label
        for record in records:
            report_text = self._clean_text(record.get("Report"))
            root_cause = self._clean_text(record.get("Root_Caused"))

            # Severity looks at the report and its root cause together.
            severity = self._classify_severity(f"{report_text} {root_cause}")
            severity_dist[severity] += 1

            category_dist[label(record.get("Irregularity_Complain_Category"))] += 1
            airline_dist[label(record.get("Airlines"))] += 1
            hub_dist[label(record.get("HUB"))] += 1
            branch_dist[label(record.get("Branch"))] += 1
            area_dist[label(record.get("Area"))] += 1
            status_dist[label(record.get("Status"))] += 1

            if root_cause:
                root_cause_dist[self._categorize_root_cause(root_cause)] += 1

            month_key = self._month_key(record.get("Date_of_Event"))
            if month_key:
                monthly_trend[month_key] += 1

            if report_text:
                reports_text.append(report_text)

        critical_high_count = severity_dist.get("Critical", 0) + severity_dist.get(
            "High", 0
        )
        critical_high_pct = round((critical_high_count / total_records) * 100, 1)

        # Status labels are compared case-insensitively so "OPEN" / "open"
        # are counted like "Open".
        open_count = sum(
            count
            for status, count in status_dist.items()
            if status.strip().lower() in self._OPEN_STATUSES
        )
        open_pct = round((open_count / total_records) * 100, 1)

        return {
            "sheet_name": sheet_name,
            "total_records": total_records,
            "severity_distribution": dict(severity_dist),
            "critical_high_percentage": critical_high_pct,
            "open_issues_percentage": open_pct,
            "top_categories": dict(category_dist.most_common(5)),
            "top_airlines": dict(airline_dist.most_common(5)),
            "top_hubs": dict(hub_dist.most_common(5)),
            "top_branches": dict(branch_dist.most_common(5)),
            "area_distribution": dict(area_dist),
            "status_distribution": dict(status_dist),
            "root_cause_categories": dict(root_cause_dist.most_common(5)),
            # "YYYY-MM" keys sort chronologically; keep the latest six months.
            "monthly_trend": dict(sorted(monthly_trend.items())[-6:]),
            "key_insights": self._generate_key_insights(
                sheet_name,
                total_records,
                severity_dist,
                category_dist,
                airline_dist,
                critical_high_pct,
                open_pct,
            ),
            "common_issues": self._extract_common_issues(reports_text),
            "recommendations": self._generate_recommendations(
                severity_dist, category_dist, root_cause_dist
            ),
            "last_updated": datetime.now().isoformat(),
        }

    # ------------------------------------------------------------------
    # Keyword classifiers
    # ------------------------------------------------------------------
    def _classify_severity(self, text: str) -> str:
        """Return "Critical", "High", "Medium" or "Low" for *text*.

        Levels are tried in the order of ``self.severity_keywords``; text
        without any keyword is classified as "Low".
        """
        level = self._first_match(
            (text or "").lower(), self.severity_keywords, "low"
        )
        return level.capitalize()

    def _categorize_root_cause(self, root_cause: str) -> str:
        """Map a free-text root cause to a root-cause category or "Other"."""
        return self._first_match(
            str(root_cause or "").lower(), self._ROOT_CAUSE_KEYWORDS, "Other"
        )

    def _extract_common_issues(self, reports: List[str]) -> List[Dict[str, Any]]:
        """Count issue types mentioned in *reports*.

        Returns:
            Up to eight ``{"issue": ..., "count": ...}`` dicts, most frequent
            first.  Each report counts at most once per issue type.
        """
        issue_counts = Counter()
        for report in reports:
            report_lower = str(report).lower()
            for issue_type, keywords in self._ISSUE_KEYWORDS.items():
                if any(kw in report_lower for kw in keywords):
                    issue_counts[issue_type] += 1
        return [
            {"issue": issue, "count": count}
            for issue, count in issue_counts.most_common(8)
        ]

    # ------------------------------------------------------------------
    # Narrative output
    # ------------------------------------------------------------------
    def _generate_key_insights(
        self,
        sheet_name: str,
        total_records: int,
        severity_dist: Counter,
        category_dist: Counter,
        airline_dist: Counter,
        critical_high_pct: float,
        open_pct: float,
    ) -> List[str]:
        """Generate human-readable key insights from the aggregated counts."""
        category_label = "Non-Cargo" if sheet_name == "NON CARGO" else "Cargo"
        insights = [f"Total {total_records} {category_label} reports analyzed"]

        if critical_high_pct > 20:
            insights.append(
                f"High priority attention needed: {critical_high_pct}% Critical/High severity issues"
            )
        elif critical_high_pct > 10:
            insights.append(
                f"Moderate concern: {critical_high_pct}% Critical/High severity issues"
            )
        else:
            insights.append(
                f"Severity levels manageable: Only {critical_high_pct}% Critical/High severity"
            )

        if open_pct > 30:
            insights.append(f"Action required: {open_pct}% issues still open/pending")

        # The "Unknown" placeholder is not a useful insight, so it is skipped
        # for categories exactly as it is for airlines.
        top_category = category_dist.most_common(1)
        if top_category and top_category[0][0] != self._UNKNOWN:
            insights.append(
                f"Most common issue type: {top_category[0][0]} ({top_category[0][1]} occurrences)"
            )

        top_airline = airline_dist.most_common(1)
        if top_airline and top_airline[0][0] != self._UNKNOWN:
            insights.append(
                f"Highest reporting airline: {top_airline[0][0]} ({top_airline[0][1]} reports)"
            )

        critical_count = severity_dist.get("Critical", 0)
        if critical_count > 0:
            insights.append(
                f"ATTENTION: {critical_count} Critical severity issues require immediate action"
            )
        return insights

    def _generate_recommendations(
        self, severity_dist: Counter, category_dist: Counter, root_cause_dist: Counter
    ) -> List[str]:
        """Generate actionable recommendations from the aggregated counts.

        Always returns at least one recommendation; a generic "continue
        monitoring" message is used when no specific rule applies.
        """
        recommendations = []

        critical_count = severity_dist.get("Critical", 0) + severity_dist.get("High", 0)
        if critical_count > 10:
            recommendations.append(
                "Establish dedicated task force for critical/high severity issues"
            )

        top_root_cause = root_cause_dist.most_common(1)
        if top_root_cause:
            advice = self._ROOT_CAUSE_RECOMMENDATIONS.get(top_root_cause[0][0])
            if advice:
                recommendations.append(advice)

        top_category = category_dist.most_common(1)
        if top_category:
            # str() guards against non-string keys passed in by direct callers.
            cat = str(top_category[0][0]).lower()
            if "damage" in cat or "broken" in cat:
                recommendations.append(
                    "Implement enhanced handling protocols to reduce damage"
                )
            elif "delay" in cat:
                recommendations.append("Optimize workflow to minimize delays")

        if not recommendations:
            recommendations.append(
                "Continue monitoring trends and maintain current performance"
            )
        return recommendations

    # ------------------------------------------------------------------
    # Cross-category comparison
    # ------------------------------------------------------------------
    def _compare_categories(
        self,
        data: List[Dict],
        non_cargo_summary: Optional[Dict[str, Any]] = None,
        cgo_summary: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Compare the Non-cargo and CGO categories.

        Args:
            data: List of records with a ``_sheet_name`` field.
            non_cargo_summary: Already computed "NON CARGO" summary; computed
                from *data* when omitted.
            cgo_summary: Already computed "CGO" summary; computed from *data*
                when omitted.

        Returns:
            Dict with record totals, severity and open-issue percentages, the
            top three categories of each side and a list of textual insights.
        """
        if non_cargo_summary is None:
            non_cargo_summary = self._summarize_single_category(data, "NON CARGO")
        if cgo_summary is None:
            cgo_summary = self._summarize_single_category(data, "CGO")

        nc_total = non_cargo_summary.get("total_records", 0)
        cgo_total = cgo_summary.get("total_records", 0)
        nc_crit = non_cargo_summary.get("critical_high_percentage", 0)
        cgo_crit = cgo_summary.get("critical_high_percentage", 0)

        insights = []
        if nc_total > cgo_total:
            insights.append(
                f"Non-Cargo has {nc_total - cgo_total} more reports than Cargo"
            )
        elif cgo_total > nc_total:
            insights.append(
                f"Cargo has {cgo_total - nc_total} more reports than Non-Cargo"
            )

        # Only flag a severity gap of more than five percentage points.
        if nc_crit > cgo_crit + 5:
            insights.append(
                "Non-Cargo has higher critical/high severity rate - needs attention"
            )
        elif cgo_crit > nc_crit + 5:
            insights.append(
                "Cargo has higher critical/high severity rate - needs attention"
            )

        return {
            "total_records": {
                "non_cargo": nc_total,
                "cgo": cgo_total,
                "difference": nc_total - cgo_total,
            },
            "critical_high_percentage": {"non_cargo": nc_crit, "cgo": cgo_crit},
            "open_issues_percentage": {
                "non_cargo": non_cargo_summary.get("open_issues_percentage", 0),
                "cgo": cgo_summary.get("open_issues_percentage", 0),
            },
            "top_categories_comparison": {
                "non_cargo": list(non_cargo_summary.get("top_categories", {}))[:3],
                "cgo": list(cgo_summary.get("top_categories", {}))[:3],
            },
            # Copies, so the comparison never aliases the summaries' dicts.
            "severity_comparison": {
                "non_cargo": dict(non_cargo_summary.get("severity_distribution", {})),
                "cgo": dict(cgo_summary.get("severity_distribution", {})),
            },
            "insights": insights,
        }
# Lazily created, process-wide service instance (None until first requested).
_service_instance = None


def get_category_summarization_service() -> CategorySummarizationService:
    """Return the shared service instance, creating it on first use."""
    global _service_instance
    if _service_instance is not None:
        return _service_instance
    _service_instance = CategorySummarizationService()
    return _service_instance