Muhammad Ridzki Nugraha
Deploy API and config (Batch 3)
07476a1 verified
"""
Gapura AI Analysis API
FastAPI server for regression and NLP analysis of irregularity reports
Uses real trained models from ai-model/models/
"""
from fastapi import FastAPI, HTTPException, BackgroundTasks, Request, Body
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field, field_validator
from pydantic_core import ValidationError
from typing import List, Optional, Dict, Any, Tuple
from collections import Counter
import os
import json
import logging
from datetime import datetime
import numpy as np
import pickle
import pandas as pd
import sys
from sklearn.preprocessing import LabelEncoder, StandardScaler, LabelBinarizer
from sklearn.metrics import roc_curve, roc_auc_score, brier_score_loss
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import io, base64
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from data.cache_service import get_cache, CacheService
from data.nlp_service import NLPModelService
from data.shap_service import get_shap_explainer
from data.anomaly_service import get_anomaly_detector
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
tags_metadata = [
{
"name": "Analysis",
"description": "Core AI analysis endpoints for irregularity reports.",
},
{
"name": "Health",
"description": "System health and model status checks.",
},
{
"name": "Jobs",
"description": "Asynchronous job management.",
},
{
"name": "Training",
"description": "Model retraining and lifecycle management.",
},
]
app = FastAPI(
title="Gapura AI Analysis API",
description="""
Gapura AI Analysis API provides advanced machine learning capabilities for analyzing irregularity reports.
## Features
* **Regression Analysis**: Predict resolution time (days) based on report details.
* **NLP Classification**: Determine severity (Critical, High, Medium, Low) and categorize issues.
* **Entity Extraction**: Extract key entities like Airlines, Flight Numbers, and Dates.
* **Summarization**: Generate executive summaries and key points from long reports.
* **Trend Analysis**: Analyze trends by Airline, Hub, and Category.
* **Anomaly Detection**: Identify unusual patterns in resolution times.
## Models
* **Regression**: Random Forest Regressor (v1.0.0-trained)
* **NLP**: Hybrid Transformer + Rule-based System (v4.0.0-onnx)
""",
version="2.1.0",
openapi_tags=tags_metadata,
docs_url="/docs",
redoc_url="/redoc",
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.add_middleware(GZipMiddleware, minimum_size=500)
@app.exception_handler(ValidationError)
async def validation_exception_handler(request: Request, exc: ValidationError):
return JSONResponse(
status_code=422,
content={
"detail": "Validation error",
"errors": exc.errors(),
"body": exc.json(),
},
)
# ============== Pydantic Models ==============
from enum import Enum
from datetime import date as date_type
class ReportCategoryEnum(str, Enum):
IRREGULARITY = "Irregularity"
COMPLAINT = "Complaint"
class AreaEnum(str, Enum):
APRON = "Apron Area"
TERMINAL = "Terminal Area"
GENERAL = "General"
class StatusEnum(str, Enum):
OPEN = "Open"
CLOSED = "Closed"
IN_PROGRESS = "In Progress"
class IrregularityReport(BaseModel):
Date_of_Event: Optional[str] = Field(None, description="Date of the event")
Airlines: Optional[str] = Field(None, max_length=100)
Flight_Number: Optional[str] = Field(None, max_length=20)
Branch: Optional[str] = Field(None, max_length=10)
HUB: Optional[str] = Field(None, max_length=20)
Route: Optional[str] = Field(None, max_length=50)
Report_Category: Optional[str] = Field(None, max_length=50)
Irregularity_Complain_Category: Optional[str] = Field(None, max_length=100)
Report: Optional[str] = Field(None, max_length=2000)
Root_Caused: Optional[str] = Field(None, max_length=2000)
Action_Taken: Optional[str] = Field(None, max_length=2000)
Area: Optional[str] = Field(None, max_length=50)
Status: Optional[str] = Field(None, max_length=50)
Reported_By: Optional[str] = Field(None, max_length=100)
Upload_Irregularity_Photo: Optional[str] = Field(None)
model_config = {"extra": "allow"}
class AnalysisOptions(BaseModel):
predictResolutionTime: bool = Field(
default=True, description="Run regression model"
)
classifySeverity: bool = Field(
default=True, description="Classify severity using NLP"
)
extractEntities: bool = Field(
default=True, description="Extract entities using NER"
)
generateSummary: bool = Field(default=True, description="Generate text summaries")
analyzeTrends: bool = Field(default=True, description="Analyze trends")
bypassCache: bool = Field(
default=False, description="Bypass cache and fetch fresh data"
)
includeRisk: bool = Field(default=False, description="Include risk assessment in analysis")
class AnalysisRequest(BaseModel):
sheetId: Optional[str] = Field(None, description="Google Sheet ID")
sheetName: Optional[str] = Field(None, description="Sheet name (NON CARGO or CGO)")
rowRange: Optional[str] = Field(None, description="Row range (e.g., A2:Z100)")
data: Optional[List[IrregularityReport]] = Field(
None, description="Direct data upload"
)
options: AnalysisOptions = Field(default_factory=AnalysisOptions)
@field_validator("data")
@classmethod
def validate_data(cls, v):
if v is not None and len(v) == 0:
raise ValueError("data array cannot be empty")
return v
class ShapExplanation(BaseModel):
baseValue: float = Field(description="Base/expected value from model")
predictionExplained: bool = Field(
description="Whether SHAP explanation is available"
)
topFactors: List[Dict[str, Any]] = Field(
default_factory=list, description="Top contributing features"
)
explanation: str = Field(default="", description="Human-readable explanation")
class AnomalyResult(BaseModel):
isAnomaly: bool = Field(description="Whether prediction is anomalous")
anomalyScore: float = Field(description="Anomaly score (0-1)")
anomalies: List[Dict[str, Any]] = Field(
default_factory=list, description="List of detected anomalies"
)
class RegressionPrediction(BaseModel):
reportId: str
predictedDays: float
confidenceInterval: Tuple[float, float]
featureImportance: Dict[str, float]
hasUnknownCategories: bool = Field(
default=False, description="True if unknown categories were used in prediction"
)
shapExplanation: Optional[ShapExplanation] = Field(
default=None, description="SHAP-based explanation for prediction"
)
anomalyDetection: Optional[AnomalyResult] = Field(
default=None, description="Anomaly detection results"
)
class RegressionResult(BaseModel):
predictions: List[RegressionPrediction]
modelMetrics: Dict[str, Any]
class ClassificationResult(BaseModel):
reportId: str
severity: str
severityConfidence: float
areaType: str
issueType: str
issueTypeConfidence: float
class Entity(BaseModel):
text: str
label: str
start: int
end: int
confidence: float
class EntityResult(BaseModel):
reportId: str
entities: List[Entity]
class SummaryResult(BaseModel):
reportId: str
executiveSummary: str
keyPoints: List[str]
class SentimentResult(BaseModel):
reportId: str
urgencyScore: float
sentiment: str
keywords: List[str]
class NLPResult(BaseModel):
classifications: List[ClassificationResult]
entities: List[EntityResult]
summaries: List[SummaryResult]
sentiment: List[SentimentResult]
class TrendData(BaseModel):
count: int
avgResolutionDays: Optional[float]
topIssues: List[str]
class TrendResult(BaseModel):
byAirline: Dict[str, TrendData]
byHub: Dict[str, TrendData]
byCategory: Dict[str, Dict[str, Any]]
timeSeries: List[Dict[str, Any]]
class Metadata(BaseModel):
totalRecords: int
processingTime: float
modelVersions: Dict[str, str]
class AnalysisResponse(BaseModel):
regression: Optional[RegressionResult] = None
nlp: Optional[NLPResult] = None
trends: Optional[TrendResult] = None
risk: Optional["RiskAssessmentResponse"] = None
metadata: Metadata
class RiskItem(BaseModel):
reportId: str
severity: str
severityConfidence: float
predictedDays: float
anomalyScore: float
category: str
hub: str
area: str
riskScore: float
priority: str
recommendedActions: List[Dict[str, Any]] = Field(default_factory=list)
preventiveSuggestions: List[str] = Field(default_factory=list)
class RiskAssessmentResponse(BaseModel):
items: List[RiskItem]
topPatterns: List[Dict[str, Any]]
metadata: Dict[str, Any]
AnalysisResponse.model_rebuild()
def _severity_to_score(level: str) -> float:
m = {"Critical": 1.0, "High": 0.8, "Medium": 0.5, "Low": 0.2}
return m.get(level, 0.3)
def _normalize_days(d: float) -> float:
return max(0.0, min(1.0, float(d) / 7.0))
def _priority_from_score(s: float) -> str:
if s >= 0.75:
return "HIGH"
if s >= 0.45:
return "MEDIUM"
return "LOW"
def _extract_prevention(texts: List[str]) -> List[str]:
kws = ["review", "prosedur", "procedure", "training", "pelatihan", "prevent", "pencegahan", "maintenance", "inspection", "inspeksi", "briefing", "supervision", "checklist", "verify", "verifikasi"]
out = []
seen = set()
for t in texts:
lt = t.lower()
for k in kws:
if k in lt:
if t not in seen:
seen.add(t)
out.append(t)
return out[:5]
# ============== Real Model Service ==============
class ModelService:
"""Service that loads and uses real trained models"""
def __init__(self):
self.regression_version = "1.0.0-trained"
self.nlp_version = "1.0.0-mock"
self.regression_model = None
self.regression_onnx_session = None
self.label_encoders = {}
self.scaler = None
self.feature_names = []
self.model_metrics = {}
self.model_loaded = False
self.nlp_service = None
self._load_regression_model()
self._load_nlp_service()
def _resolve_model_dir(self, subdir: str) -> str:
rid = os.getenv("REGRESSION_MODEL_REPO_ID") or os.getenv("MODEL_REPO_ID")
if rid:
try:
from huggingface_hub import snapshot_download
cache_dir = snapshot_download(repo_id=rid)
# Prefer models/<subdir>, then <subdir>, else cache root
p_models = os.path.join(cache_dir, "models", subdir)
if os.path.isdir(p_models):
return p_models
p_direct = os.path.join(cache_dir, subdir)
if os.path.isdir(p_direct):
return p_direct
return cache_dir
except Exception:
pass
return os.path.join(os.path.dirname(__file__), "..", "models", subdir)
def _load_nlp_service(self):
"""Load NLP service with trained models or fallback"""
try:
from data.nlp_service import get_nlp_service
self.nlp_service = get_nlp_service()
self.nlp_version = self.nlp_service.version
logger.info(f"NLP service loaded (version: {self.nlp_version})")
except Exception as e:
logger.warning(f"Failed to load NLP service: {e}")
def _load_regression_model(self):
"""Load the trained regression model from file"""
try:
if self._try_load_regression_from_artifacts():
return
base_dir = self._resolve_model_dir("regression")
model_path = os.path.join(base_dir, "resolution_predictor_latest.pkl")
if not os.path.exists(model_path):
logger.warning(f"Model file not found at {model_path}")
if self._try_load_regression_from_artifacts():
return
return
logger.info(f"Loading regression model from {model_path}")
with open(model_path, "rb") as f:
model_data = pickle.load(f)
self.regression_model = model_data.get("model")
self.label_encoders = model_data.get("label_encoders", {})
self.scaler = model_data.get("scaler")
self.feature_names = model_data.get("feature_names", [])
self.model_metrics = model_data.get("metrics", {})
self.model_loaded = True
logger.info(f"✓ Regression model loaded successfully")
logger.info(f" - Features: {len(self.feature_names)}")
logger.info(f" - Metrics: MAE={self.model_metrics.get('test_mae', 'N/A')}")
# Try to load ONNX model for faster inference
onnx_path = os.path.join(base_dir, "resolution_predictor.onnx")
if os.path.exists(onnx_path):
try:
import onnxruntime as ort
sess_options = ort.SessionOptions()
sess_options.intra_op_num_threads = 1
sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
self.regression_onnx_session = ort.InferenceSession(onnx_path, sess_options)
logger.info("✓ Regression ONNX model loaded successfully")
except Exception as e:
logger.warning(f"Failed to load Regression ONNX model: {e}")
except Exception as e:
logger.error(f"Failed to load regression model: {e}")
# Fallback to artifact-based loading (XGBoost UBJ + JSON)
if not self._try_load_regression_from_artifacts():
self.model_loaded = False
def _try_load_regression_from_artifacts(self) -> bool:
"""Attempt to load regression model from version-stable artifacts"""
try:
base_dir = self._resolve_model_dir("regression")
ubj_path = os.path.join(base_dir, "resolution_predictor.ubj")
enc_path = os.path.join(base_dir, "label_encoders.json")
scaler_path = os.path.join(base_dir, "scaler.json")
feat_path = os.path.join(base_dir, "feature_names.json")
metrics_path = os.path.join(base_dir, "resolution_predictor_latest_metrics.json")
if not os.path.exists(feat_path):
# Backcompat: try metrics sidecar saved by training
alt_feat_path = os.path.join(base_dir, "feature_names.json")
if os.path.exists(alt_feat_path):
feat_path = alt_feat_path
if not os.path.exists(ubj_path):
return False
try:
import xgboost as xgb
model = xgb.XGBRegressor()
model.load_model(ubj_path)
self.regression_model = model
except Exception as e:
logger.warning(f"Failed to load XGBoost UBJ: {e}")
return False
if os.path.exists(enc_path):
try:
with open(enc_path, "r") as f:
enc_map = json.load(f)
self.label_encoders = {}
for name, classes in enc_map.items():
le = LabelEncoder()
le.classes_ = np.array(classes, dtype=object)
self.label_encoders[name] = le
except Exception as e:
logger.warning(f"Failed to load label encoders json: {e}")
self.label_encoders = {}
if os.path.exists(scaler_path):
try:
with open(scaler_path, "r") as f:
scb = json.load(f)
sc = StandardScaler(with_mean=scb.get("with_mean", True), with_std=scb.get("with_std", True))
if "mean_" in scb:
sc.mean_ = np.array(scb["mean_"], dtype=float)
if "scale_" in scb:
sc.scale_ = np.array(scb["scale_"], dtype=float)
if "var_" in scb:
sc.var_ = np.array(scb["var_"], dtype=float)
n_features = len(scb.get("mean_", [])) or len(scb.get("scale_", [])) or 0
if n_features:
sc.n_features_in_ = n_features
self.scaler = sc
except Exception as e:
logger.warning(f"Failed to load scaler json: {e}")
self.scaler = None
if os.path.exists(feat_path):
try:
with open(feat_path, "r") as f:
self.feature_names = json.load(f)
except Exception as e:
logger.warning(f"Failed to load feature names: {e}")
self.feature_names = []
if os.path.exists(metrics_path):
try:
with open(metrics_path, "r") as f:
self.model_metrics = json.load(f)
except Exception:
self.model_metrics = {}
self.model_loaded = True
logger.info("✓ Regression model loaded from UBJ/JSON artifacts")
return True
except Exception as e:
logger.warning(f"Artifact-based regression load failed: {e}")
return False
def _extract_features(self, report: Dict) -> Optional[np.ndarray]:
"""Extract features from a single report matching training preprocessing"""
try:
# Parse date
date_str = report.get("Date_of_Event", "")
try:
date_obj = pd.to_datetime(date_str, errors="coerce")
if pd.isna(date_obj):
date_obj = datetime.now()
day_of_week = date_obj.dayofweek
month = date_obj.month
is_weekend = day_of_week in [5, 6]
week_of_year = date_obj.isocalendar().week
day_of_year = date_obj.dayofyear
except:
day_of_week = 0
month = 1
is_weekend = False
week_of_year = 1
day_of_year = 1
sin_day_of_week = np.sin(2 * np.pi * day_of_week / 7)
cos_day_of_week = np.cos(2 * np.pi * day_of_week / 7)
sin_month = np.sin(2 * np.pi * month / 12)
cos_month = np.cos(2 * np.pi * month / 12)
sin_day_of_year = np.sin(2 * np.pi * day_of_year / 365)
cos_day_of_year = np.cos(2 * np.pi * day_of_year / 365)
# Text features
report_text = report.get("Report", "")
root_cause = report.get("Root_Caused", "")
action_taken = report.get("Action_Taken", "")
# Categorical
airline = report.get("Airlines", "Unknown")
hub = report.get("HUB", "Unknown")
branch = report.get("Branch", "Unknown")
category = report.get("Irregularity_Complain_Category", "Unknown")
area = report.get("Area", "Unknown")
# Binary features
has_photos = bool(report.get("Upload_Irregularity_Photo", ""))
is_complaint = report.get("Report_Category", "") == "Complaint"
# Encode categorical features
categorical_values = {
"airline": airline,
"hub": hub,
"branch": branch,
"category": category,
"area": area,
}
encoded_values = {}
unknown_flags = {}
for col, value in categorical_values.items():
if col in self.label_encoders:
le = self.label_encoders[col]
value_str = str(value)
if value_str in le.classes_:
encoded_values[f"{col}_encoded"] = le.transform([value_str])[0]
unknown_flags[col] = False
else:
unknown_idx = (
le.transform(["Unknown"])[0]
if "Unknown" in le.classes_
else 0
)
encoded_values[f"{col}_encoded"] = unknown_idx
unknown_flags[col] = True
logger.warning(
f"Unknown {col} value: '{value_str}' - using Unknown category"
)
else:
encoded_values[f"{col}_encoded"] = 0
unknown_flags[col] = True
# Build feature vector in correct order
feature_dict = {
"day_of_week": day_of_week,
"month": month,
"is_weekend": int(is_weekend),
"week_of_year": week_of_year,
"sin_day_of_week": sin_day_of_week,
"cos_day_of_week": cos_day_of_week,
"sin_month": sin_month,
"cos_month": cos_month,
"sin_day_of_year": sin_day_of_year,
"cos_day_of_year": cos_day_of_year,
"report_length": len(report_text),
"report_word_count": len(report_text.split()) if report_text else 0,
"root_cause_length": len(root_cause),
"action_taken_length": len(action_taken),
"has_photos": int(has_photos),
"is_complaint": int(is_complaint),
"text_complexity": (len(report_text) * len(report_text.split()) / 100)
if report_text
else 0,
"has_root_cause": int(bool(root_cause)),
"has_action_taken": int(bool(action_taken)),
}
feature_dict.update(encoded_values)
has_unknown_categories = any(unknown_flags.values())
# Create feature array in correct order
features = []
for feature_name in self.feature_names:
features.append(feature_dict.get(feature_name, 0))
X = np.array([features])
# Scale features
if self.scaler:
X = self.scaler.transform(X)
return X, has_unknown_categories
except Exception as e:
logger.error(f"Feature extraction error: {e}")
return None, True
def _extract_features_batch(self, df: pd.DataFrame) -> Tuple[Optional[np.ndarray], np.ndarray]:
"""Extract features from a dataframe matching training preprocessing (Batch optimized)"""
try:
# Ensure required columns exist
required_cols = [
"Date_of_Event", "Report", "Root_Caused", "Action_Taken",
"Upload_Irregularity_Photo", "Report_Category",
"Airlines", "HUB", "Branch", "Irregularity_Complain_Category", "Area"
]
for col in required_cols:
if col not in df.columns:
df[col] = None
# Copy to avoid modifying original
df = df.copy()
# Parse date (normalize to tz-naive)
df["Date_of_Event"] = pd.to_datetime(df["Date_of_Event"], errors="coerce", utc=True)
df["Date_of_Event"] = df["Date_of_Event"].dt.tz_convert(None)
now = datetime.now()
df["Date_of_Event"] = df["Date_of_Event"].fillna(now)
df["day_of_week"] = df["Date_of_Event"].dt.dayofweek
df["month"] = df["Date_of_Event"].dt.month
df["is_weekend"] = df["day_of_week"].isin([5, 6]).astype(int)
df["week_of_year"] = df["Date_of_Event"].dt.isocalendar().week.astype(int)
df["day_of_year"] = df["Date_of_Event"].dt.dayofyear
# Sin/Cos transforms
df["sin_day_of_week"] = np.sin(2 * np.pi * df["day_of_week"] / 7)
df["cos_day_of_week"] = np.cos(2 * np.pi * df["day_of_week"] / 7)
df["sin_month"] = np.sin(2 * np.pi * df["month"] / 12)
df["cos_month"] = np.cos(2 * np.pi * df["month"] / 12)
df["sin_day_of_year"] = np.sin(2 * np.pi * df["day_of_year"] / 365)
df["cos_day_of_year"] = np.cos(2 * np.pi * df["day_of_year"] / 365)
# Text features
df["Report"] = df["Report"].fillna("").astype(str)
df["Root_Caused"] = df["Root_Caused"].fillna("").astype(str)
df["Action_Taken"] = df["Action_Taken"].fillna("").astype(str)
df["report_length"] = df["Report"].str.len()
df["report_word_count"] = df["Report"].apply(lambda x: len(x.split()) if x else 0)
df["root_cause_length"] = df["Root_Caused"].str.len()
df["action_taken_length"] = df["Action_Taken"].str.len()
df["has_photos"] = df["Upload_Irregularity_Photo"].fillna("").astype(bool).astype(int)
df["is_complaint"] = (df["Report_Category"] == "Complaint").astype(int)
df["text_complexity"] = np.where(
df["Report"].str.len() > 0,
(df["report_length"] * df["report_word_count"] / 100),
0
)
df["has_root_cause"] = (df["Root_Caused"].str.len() > 0).astype(int)
df["has_action_taken"] = (df["Action_Taken"].str.len() > 0).astype(int)
# Categorical encoding
categorical_cols = {
"airline": "Airlines",
"hub": "HUB",
"branch": "Branch",
"category": "Irregularity_Complain_Category",
"area": "Area"
}
unknown_flags = np.zeros(len(df), dtype=bool)
for feature_name, col_name in categorical_cols.items():
df[col_name] = df[col_name].fillna("Unknown").astype(str)
if feature_name in self.label_encoders:
le = self.label_encoders[feature_name]
# Create mapping for fast lookup
mapping = {label: idx for idx, label in enumerate(le.classes_)}
unknown_idx = mapping.get("Unknown", 0)
if "Unknown" in le.classes_:
unknown_idx = mapping["Unknown"]
# Map values
encoded_col = df[col_name].map(mapping)
# Track unknowns (NaN after map means unknown)
is_unknown = encoded_col.isna()
unknown_flags |= is_unknown.values
# Fill unknowns
df[f"{feature_name}_encoded"] = encoded_col.fillna(unknown_idx).astype(int)
else:
df[f"{feature_name}_encoded"] = 0
unknown_flags[:] = True
# Select features in order
for f in self.feature_names:
if f not in df.columns:
df[f] = 0
X = df[self.feature_names].values
# Scale
if self.scaler:
X = self.scaler.transform(X)
return X, unknown_flags
except Exception as e:
logger.error(f"Batch feature extraction error: {e}")
return None, np.ones(len(df), dtype=bool)
def predict_regression(self, data: List[Dict]) -> List[RegressionPrediction]:
"""Predict resolution time using trained model"""
predictions = []
shap_explainer = get_shap_explainer()
anomaly_detector = get_anomaly_detector()
# Batch processing
try:
df = pd.DataFrame(data)
X_batch, unknown_flags_batch = self._extract_features_batch(df)
if X_batch is not None:
if self.regression_onnx_session:
# Use ONNX model
input_name = self.regression_onnx_session.get_inputs()[0].name
predicted_batch = self.regression_onnx_session.run(None, {input_name: X_batch.astype(np.float32)})[0]
predicted_batch = predicted_batch.ravel() # Flatten to 1D array
elif self.regression_model is not None:
# Use Pickle model
predicted_batch = self.regression_model.predict(X_batch)
else:
predicted_batch = None
unknown_flags_batch = [True] * len(data)
else:
predicted_batch = None
unknown_flags_batch = [True] * len(data)
except Exception as e:
logger.error(f"Batch prediction setup failed: {e}")
predicted_batch = None
unknown_flags_batch = [True] * len(data)
for i, item in enumerate(data):
# Use batch results
has_unknown = unknown_flags_batch[i]
features = X_batch[i:i+1] if X_batch is not None else None
category = item.get("Irregularity_Complain_Category", "Unknown")
hub = item.get("HUB", "Unknown")
if predicted_batch is not None:
predicted = predicted_batch[i]
mae = self.model_metrics.get("test_mae", 0.5)
lower = max(0.1, predicted - mae)
upper = predicted + mae
shap_exp = None
if shap_explainer.explainer is not None and features is not None:
try:
shap_result = shap_explainer.explain_prediction(features)
shap_exp = ShapExplanation(
baseValue=shap_result.get("base_value", 0),
predictionExplained=shap_result.get(
"prediction_explained", False
),
topFactors=shap_result.get("top_factors", [])[:5],
explanation=shap_result.get("explanation", ""),
)
except Exception as e:
logger.debug(f"SHAP explanation failed: {e}")
anomaly_result = None
try:
anomaly_data = anomaly_detector.detect_prediction_anomaly(
predicted, category, hub
)
anomaly_result = AnomalyResult(
isAnomaly=anomaly_data.get("is_anomaly", False),
anomalyScore=anomaly_data.get("anomaly_score", 0),
anomalies=anomaly_data.get("anomalies", []),
)
except Exception as e:
logger.debug(f"Anomaly detection failed: {e}")
else:
base_days = {
"Cargo Problems": 2.5,
"Pax Handling": 1.8,
"GSE": 3.2,
"Operation": 2.1,
"Baggage Handling": 1.5,
}.get(category, 2.0)
predicted = base_days + np.random.normal(0, 0.3)
lower = max(0.1, predicted - 0.5)
upper = predicted + 0.5
has_unknown = True
shap_exp = None
anomaly_result = None
if self.model_metrics and "feature_importance" in self.model_metrics:
importance = self.model_metrics["feature_importance"]
else:
importance = {
"category": 0.35,
"airline": 0.28,
"hub": 0.15,
"reportLength": 0.12,
"hasPhotos": 0.10,
}
predictions.append(
RegressionPrediction(
reportId=f"row_{i}",
predictedDays=round(max(0.1, predicted), 2),
confidenceInterval=(round(lower, 2), round(upper, 2)),
featureImportance=importance,
hasUnknownCategories=has_unknown,
shapExplanation=shap_exp,
anomalyDetection=anomaly_result,
)
)
return predictions
def classify_text(self, data: List[Dict]) -> List[ClassificationResult]:
"""Classify text using trained NLP models or rule-based fallback"""
results = []
texts = [
(item.get("Report") or "") + " " + (item.get("Root_Caused") or "")
for item in data
]
# Get multi-task predictions if available
mt_results = None
if self.nlp_service:
mt_results = self.nlp_service.predict_multi_task(texts)
severity_results = self.nlp_service.classify_severity(texts)
else:
severity_results = self._classify_severity_fallback(texts)
for i, (item, sev_result) in enumerate(zip(data, severity_results)):
severity = sev_result.get("severity", "Low")
severity_conf = sev_result.get("confidence", 0.8)
# Use multi-task predictions for area and issue type if available
if mt_results and i < len(mt_results):
mt_res = mt_results[i]
area = mt_res.get("area", {}).get("label", item.get("Area", "Unknown")).replace(" Area", "")
area_conf = mt_res.get("area", {}).get("confidence", 0.85)
issue = mt_res.get("irregularity", {}).get("label", item.get("Irregularity_Complain_Category", "Unknown"))
issue_conf = mt_res.get("irregularity", {}).get("confidence", 0.85)
else:
area = item.get("Area", "Unknown").replace(" Area", "")
area_conf = 0.85
issue = item.get("Irregularity_Complain_Category", "Unknown")
issue_conf = 0.85
results.append(
ClassificationResult(
reportId=f"row_{i}",
severity=severity,
severityConfidence=severity_conf,
areaType=area,
issueType=issue,
issueTypeConfidence=issue_conf,
)
)
return results
def _classify_severity_fallback(self, texts: List[str]) -> List[Dict]:
"""Fallback severity classification"""
results = []
for text in texts:
report = text.lower()
if any(
kw in report
for kw in ["damage", "torn", "broken", "critical", "emergency"]
):
severity = "High"
severity_conf = 0.89
elif any(kw in report for kw in ["delay", "late", "wrong", "error"]):
severity = "Medium"
severity_conf = 0.75
else:
severity = "Low"
severity_conf = 0.82
results.append({"severity": severity, "confidence": severity_conf})
return results
def extract_entities(self, data: List[Dict]) -> List[EntityResult]:
"""Extract entities from reports"""
results = []
for i, item in enumerate(data):
entities = []
report_text = str(item.get("Report", "")) + " " + str(item.get("Root_Caused", ""))
# Extract airline
airline = str(item.get("Airlines", ""))
if airline and airline != "Unknown":
# Find position in text
idx = report_text.lower().find(airline.lower())
start = max(0, idx) if idx >= 0 else 0
entities.append(
Entity(
text=airline,
label="AIRLINE",
start=start,
end=start + len(airline),
confidence=0.95,
)
)
# Extract flight number
flight_val = item.get("Flight_Number", "")
flight = str(flight_val) if flight_val is not None else ""
if flight and flight != "#N/A":
entities.append(
Entity(
text=flight,
label="FLIGHT_NUMBER",
start=0,
end=len(flight),
confidence=0.92,
)
)
# Extract dates
date_val = item.get("Date_of_Event", "")
date_str = str(date_val) if date_val is not None else ""
if date_str:
entities.append(
Entity(
text=date_str,
label="DATE",
start=0,
end=len(date_str),
confidence=0.90,
)
)
results.append(EntityResult(reportId=f"row_{i}", entities=entities))
return results
def generate_summary(self, data: List[Dict]) -> List[SummaryResult]:
"""Generate summaries using NLP service or fallback"""
results = []
for i, item in enumerate(data):
combined_text = (
item.get("Report", "")
+ " "
+ item.get("Root_Caused", "")
+ " "
+ item.get("Action_Taken", "")
)
if self.nlp_service and len(combined_text) > 100:
summary_result = self.nlp_service.summarize(combined_text)
executive_summary = summary_result.get("executiveSummary", "")
key_points = summary_result.get("keyPoints", [])
else:
category = item.get("Irregularity_Complain_Category", "Issue")
max_chars = int(os.getenv("SUMMARY_MAX_CHARS", "600"))
report = str(item.get("Report", ""))[: max_chars // 2]
root_cause = str(item.get("Root_Caused", ""))[: max_chars // 3]
action = str(item.get("Action_Taken", ""))[: max_chars // 3]
executive_summary = f"{category}: {report}"
if root_cause:
executive_summary += f" Root cause: {root_cause}."
key_points = [
f"Category: {category}",
f"Status: {item.get('Status', 'Unknown')}",
f"Area: {item.get('Area', 'Unknown')}",
]
if action:
key_points.append(f"Action: {action[:100]}...")
if len(executive_summary) > max_chars:
executive_summary = executive_summary[:max_chars] + "..."
results.append(
SummaryResult(
reportId=f"row_{i}",
executiveSummary=executive_summary,
keyPoints=key_points,
)
)
return results
def analyze_sentiment(self, data: List[Dict]) -> List[SentimentResult]:
"""Analyze sentiment/urgency using NLP service or fallback"""
results = []
texts = [
item.get("Report", "") + " " + item.get("Root_Caused", "") for item in data
]
if self.nlp_service:
urgency_results = self.nlp_service.analyze_urgency(texts)
else:
urgency_results = self._analyze_urgency_fallback(texts)
for i, (item, urg_result) in enumerate(zip(data, urgency_results)):
results.append(
SentimentResult(
reportId=f"row_{i}",
urgencyScore=urg_result.get("urgency_score", 0.0),
sentiment=urg_result.get("sentiment", "Neutral"),
keywords=urg_result.get("keywords", []),
)
)
return results
def _analyze_urgency_fallback(self, texts: List[str]) -> List[Dict]:
"""Fallback urgency analysis"""
urgency_keywords = [
"damage",
"broken",
"emergency",
"critical",
"urgent",
"torn",
"severe",
]
results = []
for text in texts:
report = text.lower()
keyword_matches = [kw for kw in urgency_keywords if kw in report]
urgency_count = len(keyword_matches)
urgency_score = min(1.0, urgency_count / 3.0)
results.append(
{
"urgency_score": round(urgency_score, 2),
"sentiment": "Negative" if urgency_score > 0.3 else "Neutral",
"keywords": keyword_matches,
}
)
return results
# Initialize model service
model_service = ModelService()
# ============== API Endpoints ==============
@app.get(
"/",
tags=["Health"],
summary="API Root & Status",
)
async def root():
"""Returns basic API status, version, and model availability."""
return {
"status": "healthy",
"service": "Gapura AI Analysis API",
"version": "1.0.0",
"models": {
"regression": "loaded" if model_service.model_loaded else "unavailable",
"nlp": model_service.nlp_service.version if model_service.nlp_service and model_service.nlp_service.models_loaded else "unavailable",
},
"timestamp": datetime.now().isoformat(),
}
@app.get(
"/health",
tags=["Health"],
summary="Detailed Health Check",
)
async def health_check():
"""
Returns detailed health status including:
- **Models**: Version and load status of Regression and NLP models.
- **Cache**: Redis/Local cache connectivity.
- **Metrics**: Current model performance metrics (MAE, RMSE, R2).
"""
cache = get_cache()
cache_health = cache.health_check()
return {
"status": "healthy",
"models": {
"regression": {
"version": model_service.regression_version,
"loaded": model_service.model_loaded,
"metrics": model_service.model_metrics
if model_service.model_loaded
else None,
},
"nlp": {
"version": model_service.nlp_version,
"status": "rule_based",
},
},
"cache": cache_health,
"timestamp": datetime.now().isoformat(),
}
@app.post("/api/ai/risk/assess", response_model=RiskAssessmentResponse, tags=["Analysis"])
async def assess_risk(
request: Optional[AnalysisRequest] = Body(None),
sheetId: Optional[str] = None,
sheetName: Optional[str] = None,
rowRange: Optional[str] = None,
bypass_cache: bool = False,
top_k_patterns: int = 5,
):
from data.sheets_service import GoogleSheetsService
from data.action_service import get_action_service
items_data: List[Dict[str, Any]] = []
if request and request.data:
items_data = [r.model_dump(exclude_none=True) for r in request.data]
elif sheetId and sheetName and rowRange:
cache = get_cache() if not bypass_cache else None
sheets_service = GoogleSheetsService(cache=cache)
items_data = sheets_service.fetch_sheet_data(sheetId, sheetName, rowRange, bypass_cache=bypass_cache)
else:
raise HTTPException(status_code=400, detail="sheetId, sheetName, and rowRange are required, or provide data in body")
if len(items_data) == 0:
return RiskAssessmentResponse(items=[], topPatterns=[], metadata={"count": 0})
preds = model_service.predict_regression(items_data)
classes = model_service.classify_text(items_data)
try:
action_service = get_action_service()
eff = action_service.action_effectiveness or {}
except Exception:
eff = {}
items: List[RiskItem] = []
for i, item in enumerate(items_data):
cat = item.get("Irregularity_Complain_Category", "Unknown") or "Unknown"
hub = item.get("HUB", "Unknown") or "Unknown"
area = (item.get("Area", "Unknown") or "Unknown").replace(" Area", "")
pr = preds[i]
cl = classes[i]
sev = cl.severity
sev_conf = cl.severityConfidence
pdays = pr.predictedDays
anom = 0.0
if pr.anomalyDetection:
anom = pr.anomalyDetection.anomalyScore
sev_s = _severity_to_score(sev)
d_s = _normalize_days(pdays)
cat_w = 1.0 - float(eff.get(cat, 0.8))
risk = min(1.0, 0.5 * sev_s + 0.25 * d_s + 0.15 * anom + 0.10 * cat_w)
recs: List[Dict[str, Any]] = []
try:
recs_resp = action_service.recommend(
report=item.get("Report", "") or "",
issue_type=cat,
severity=sev,
area=area if area else None,
airline=item.get("Airlines") or None,
branch=item.get("Branch") or None,
top_n=5,
)
recs = recs_resp.get("recommendations", [])
except Exception:
recs = []
prev = _extract_prevention([r.get("action", "") for r in recs])
items.append(
RiskItem(
reportId=f"row_{i}",
severity=sev,
severityConfidence=sev_conf,
predictedDays=pdays,
anomalyScore=anom,
category=cat,
hub=hub,
area=area,
riskScore=round(risk, 3),
priority=_priority_from_score(risk),
recommendedActions=recs[:5],
preventiveSuggestions=prev,
)
)
groups: Dict[str, Dict[str, Any]] = {}
for it, raw in zip(items, items_data):
key = f"{it.category}|{it.hub}|{it.area}"
g = groups.get(key) or {"key": key, "category": it.category, "hub": it.hub, "area": it.area, "count": 0, "avgRisk": 0.0, "avgDays": 0.0, "highSeverityShare": 0.0}
g["count"] += 1
g["avgRisk"] += it.riskScore
g["avgDays"] += it.predictedDays
g["highSeverityShare"] += 1.0 if it.severity in ("Critical", "High") else 0.0
groups[key] = g
patterns = []
for g in groups.values():
c = g["count"]
g["avgRisk"] = round(g["avgRisk"] / max(1, c), 3)
g["avgDays"] = round(g["avgDays"] / max(1, c), 2)
g["highSeverityShare"] = round(g["highSeverityShare"] / max(1, c), 3)
patterns.append(g)
patterns.sort(key=lambda x: (-x["avgRisk"], -x["highSeverityShare"], -x["avgDays"], -x["count"]))
return RiskAssessmentResponse(
items=sorted(items, key=lambda x: -x.riskScore),
topPatterns=patterns[:top_k_patterns],
metadata={"count": len(items)},
)
from data.job_service import JobService, JobStatus
# Initialize job service
job_service = JobService()
def perform_analysis(data: List[Dict], options: AnalysisOptions, compact: bool) -> AnalysisResponse:
"""Core analysis logic reused by sync and async endpoints"""
start_time = datetime.now()
total_records = len(data)
logger.info(f"Analyzing {total_records} records...")
# Initialize response
response = AnalysisResponse(
metadata=Metadata(
totalRecords=total_records,
processingTime=0.0,
modelVersions={
"regression": model_service.regression_version,
"nlp": model_service.nlp_version,
},
)
)
# Regression Analysis
predictions: List[RegressionPrediction] = []
if options.predictResolutionTime or options.includeRisk:
logger.info(f"Running regression analysis...")
predictions = model_service.predict_regression(data)
# Use real metrics if available
if model_service.model_loaded and model_service.model_metrics:
metrics = {
"mae": round(model_service.model_metrics.get("test_mae", 1.2), 3),
"rmse": round(model_service.model_metrics.get("test_rmse", 1.8), 3),
"r2": round(model_service.model_metrics.get("test_r2", 0.78), 3),
"model_loaded": True,
"note": "Using trained model"
if model_service.model_loaded
else "Using fallback",
}
else:
metrics = {
"mae": None,
"rmse": None,
"r2": None,
"model_loaded": False,
"note": "Model not available - using fallback predictions",
}
if options.predictResolutionTime:
response.regression = RegressionResult(
predictions=predictions,
modelMetrics=metrics,
)
# NLP Analysis
classifications: List[ClassificationResult] = []
if any(
[
options.classifySeverity,
options.extractEntities,
options.generateSummary,
options.includeRisk,
]
):
logger.info(f"Running NLP analysis...")
entities = []
summaries = []
sentiment = []
if options.classifySeverity or options.includeRisk:
classifications = model_service.classify_text(data)
if options.extractEntities:
entities = model_service.extract_entities(data)
if options.generateSummary:
summaries = model_service.generate_summary(data)
sentiment = model_service.analyze_sentiment(data)
response.nlp = NLPResult(
classifications=classifications,
entities=entities,
summaries=summaries,
sentiment=sentiment,
)
# Trend Analysis
if options.analyzeTrends:
logger.info(f"Running trend analysis...")
by_airline = {}
by_hub = {}
by_category = {}
for item in data:
airline = item.get("Airlines", "Unknown")
hub = item.get("HUB", "Unknown")
category = item.get("Irregularity_Complain_Category", "Unknown")
# Airline aggregation
if airline not in by_airline:
by_airline[airline] = {"count": 0, "issues": []}
by_airline[airline]["count"] += 1
by_airline[airline]["issues"].append(category)
# Hub aggregation
if hub not in by_hub:
by_hub[hub] = {"count": 0, "issues": []}
by_hub[hub]["count"] += 1
by_hub[hub]["issues"].append(category)
# Category aggregation
if category not in by_category:
by_category[category] = {"count": 0}
by_category[category]["count"] += 1
# Convert to TrendData format
by_airline_trend = {
k: TrendData(
count=v["count"],
avgResolutionDays=2.0 + np.random.random(),
topIssues=list(set(v["issues"]))[:3],
)
for k, v in by_airline.items()
}
by_hub_trend = {
k: TrendData(
count=v["count"],
avgResolutionDays=2.0 + np.random.random(),
topIssues=list(set(v["issues"]))[:3],
)
for k, v in by_hub.items()
}
by_category_trend = {
k: {"count": v["count"], "trend": "stable"}
for k, v in by_category.items()
}
response.trends = TrendResult(
byAirline=by_airline_trend,
byHub=by_hub_trend,
byCategory=by_category_trend,
timeSeries=[],
)
# Risk Assessment
if options.includeRisk:
try:
from data.action_service import get_action_service
action_service = get_action_service()
eff = action_service.action_effectiveness or {}
except Exception:
eff = {}
action_service = None
items: List[RiskItem] = []
for i, item in enumerate(data):
cat = item.get("Irregularity_Complain_Category", "Unknown") or "Unknown"
hub = item.get("HUB", "Unknown") or "Unknown"
area = (item.get("Area", "Unknown") or "Unknown").replace(" Area", "")
pr = predictions[i] if i < len(predictions) else None
cl = classifications[i] if i < len(classifications) else None
sev = cl.severity if cl else "Low"
sev_conf = cl.severityConfidence if cl else 0.6
pdays = pr.predictedDays if pr else 0.0
anom = pr.anomalyDetection.anomalyScore if pr and pr.anomalyDetection else 0.0
sev_s = _severity_to_score(sev)
d_s = _normalize_days(pdays)
cat_w = 1.0 - float(eff.get(cat, 0.8))
risk = min(1.0, 0.5 * sev_s + 0.25 * d_s + 0.15 * anom + 0.10 * cat_w)
recs: List[Dict[str, Any]] = []
if action_service:
try:
recs_resp = action_service.recommend(
report=item.get("Report", "") or "",
issue_type=cat,
severity=sev,
area=area if area else None,
airline=item.get("Airlines") or None,
branch=item.get("Branch") or None,
top_n=5,
)
recs = recs_resp.get("recommendations", [])
except Exception:
recs = []
prev = _extract_prevention([r.get("action", "") for r in recs])
items.append(
RiskItem(
reportId=f"row_{i}",
severity=sev,
severityConfidence=sev_conf,
predictedDays=pdays,
anomalyScore=anom,
category=cat,
hub=hub,
area=area,
riskScore=round(risk, 3),
priority=_priority_from_score(risk),
recommendedActions=recs[:5],
preventiveSuggestions=prev,
)
)
groups: Dict[str, Dict[str, Any]] = {}
for it, raw in zip(items, data):
key = f"{it.category}|{it.hub}|{it.area}"
g = groups.get(key) or {"key": key, "category": it.category, "hub": it.hub, "area": it.area, "count": 0, "avgRisk": 0.0, "avgDays": 0.0, "highSeverityShare": 0.0}
g["count"] += 1
g["avgRisk"] += it.riskScore
g["avgDays"] += it.predictedDays
g["highSeverityShare"] += 1.0 if it.severity in ("Critical", "High") else 0.0
groups[key] = g
patterns = []
for g in groups.values():
c = g["count"]
g["avgRisk"] = round(g["avgRisk"] / max(1, c), 3)
g["avgDays"] = round(g["avgDays"] / max(1, c), 2)
g["highSeverityShare"] = round(g["highSeverityShare"] / max(1, c), 3)
patterns.append(g)
patterns.sort(key=lambda x: (-x["avgRisk"], -x["highSeverityShare"], -x["avgDays"], -x["count"]))
response.risk = RiskAssessmentResponse(
items=sorted(items, key=lambda x: -x.riskScore),
topPatterns=patterns[:5],
metadata={"count": len(items)},
)
if compact:
if response.regression and response.regression.predictions:
for p in response.regression.predictions:
p.shapExplanation = None
p.anomalyDetection = None
if response.nlp:
response.nlp.entities = []
response.nlp.summaries = []
# Calculate processing time
processing_time = (datetime.now() - start_time).total_seconds() * 1000
response.metadata.processingTime = round(processing_time, 2)
logger.info(f"Analysis completed in {processing_time:.2f}ms")
return response
@app.post(
"/api/ai/analyze",
response_model=AnalysisResponse,
tags=["Analysis"],
summary="Analyze Irregularity Reports",
response_description="Analysis results including predictions, severity, and entities.",
)
async def analyze_reports(request: AnalysisRequest, compact: bool = False):
"""
Perform comprehensive AI analysis on a batch of irregularity reports.
- **Regression**: Predicts days to resolve based on category and description.
- **NLP**: Classifies severity, extracts entities (Flight No, Airline), and summarizes text.
- **Trends**: Aggregates data by Airline, Hub, and Category.
The endpoint accepts a list of `IrregularityReport` objects.
"""
try:
# Use direct data
if not request.data:
raise HTTPException(
status_code=400,
detail="No data provided. Either sheetId or data must be specified.",
)
# Convert IrregularityReport objects to dicts
data = [report.model_dump(exclude_none=True) for report in request.data]
return perform_analysis(data, request.options, compact)
except HTTPException:
raise
except Exception as e:
logger.error(f"Analysis error: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
def background_analysis_task(job_id: str, data: List[Dict], options: AnalysisOptions, compact: bool):
"""Background task for analysis"""
try:
job_service.update_job(job_id, JobStatus.PROCESSING)
response = perform_analysis(data, options, compact)
job_service.update_job(job_id, JobStatus.COMPLETED, result=response.model_dump())
except Exception as e:
logger.error(f"Job {job_id} failed: {e}")
job_service.update_job(job_id, JobStatus.FAILED, error=str(e))
@app.post(
"/api/ai/analyze-async",
response_model=Dict[str, str],
tags=["Analysis", "Jobs"],
summary="Start Async Analysis Job",
)
async def analyze_async(
request: AnalysisRequest, background_tasks: BackgroundTasks, compact: bool = False
):
"""
Start a background analysis job for large datasets.
Returns a `jobId` immediately, which can be used to poll status via `/api/ai/jobs/{jobId}`.
"""
if not request.data:
raise HTTPException(status_code=400, detail="No data provided")
data = [report.model_dump(exclude_none=True) for report in request.data]
job_id = job_service.create_job()
background_tasks.add_task(background_analysis_task, job_id, data, request.options, compact)
return {"job_id": job_id, "status": "queued"}
@app.get(
"/api/ai/jobs/{job_id}",
tags=["Jobs"],
summary="Get Job Status",
)
async def get_job_status(job_id: str):
"""
Retrieve the status and results of a background analysis job.
Possible statuses: `queued`, `processing`, `completed`, `failed`.
"""
job = job_service.get_job(job_id)
if not job:
raise HTTPException(status_code=404, detail="Job not found")
return job
@app.post(
"/api/ai/predict-single",
tags=["Analysis"],
summary="Real-time Single Prediction",
)
async def predict_single(report: IrregularityReport):
"""
Get immediate AI predictions for a single irregularity report.
Useful for real-time validation or "what-if" analysis in the UI.
"""
try:
report_dict = report.model_dump(exclude_none=True)
predictions = model_service.predict_regression([report_dict])
classifications = model_service.classify_text([report_dict])
entities = model_service.extract_entities([report_dict])
summaries = model_service.generate_summary([report_dict])
sentiment = model_service.analyze_sentiment([report_dict])
return {
"prediction": predictions[0],
"classification": classifications[0],
"entities": entities[0],
"summary": summaries[0],
"sentiment": sentiment[0],
"modelLoaded": model_service.model_loaded,
}
except Exception as e:
logger.error(f"Single prediction error: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@app.post(
"/api/ai/train",
tags=["Training"],
summary="Trigger Model Retraining",
)
async def train_models(background_tasks: BackgroundTasks, force: bool = False):
"""
Trigger a background task to retrain AI models.
Checks if new data is available in Google Sheets before training, unless `force=True`.
"""
from scripts.scheduled_training import TrainingScheduler
def run_training_task():
scheduler = TrainingScheduler()
result = scheduler.run_training(force=force)
logger.info(f"Training completed: {result}")
background_tasks.add_task(run_training_task)
return {
"status": "training_queued",
"message": "Model retraining has been started in the background",
"force": force,
"timestamp": datetime.now().isoformat(),
}
@app.get(
"/api/ai/train/status",
tags=["Training"],
summary="Get Training Status",
)
async def training_status():
"""
Get the status of the latest training job and training history.
"""
from scripts.scheduled_training import TrainingScheduler
scheduler = TrainingScheduler()
status = scheduler.get_status()
return {
"status": "success",
"data": status,
"timestamp": datetime.now().isoformat(),
}
@app.get("/api/ai/model-info")
async def model_info():
"""Get current model information"""
return {
"regression": {
"version": model_service.regression_version,
"type": "GradientBoostingRegressor",
"status": "loaded" if model_service.model_loaded else "unavailable",
"last_trained": "2025-01-15",
"metrics": model_service.model_metrics
if model_service.model_loaded
else None,
},
"nlp": {
"version": model_service.nlp_version,
"type": "Rule-based + Keyword extraction",
"status": "active",
"tasks": ["classification", "ner", "summarization", "sentiment"],
"note": "Full ML NLP models coming soon",
},
}
@app.post("/api/ai/cache/invalidate")
async def invalidate_cache(sheet_name: Optional[str] = None):
"""Invalidate cache for sheets data"""
cache = get_cache()
if sheet_name:
pattern = f"sheets:*{sheet_name}*"
deleted = cache.delete_pattern(pattern)
return {
"status": "success",
"message": f"Invalidated cache for sheet: {sheet_name}",
"keys_deleted": deleted,
}
else:
deleted = cache.delete_pattern("sheets:*")
return {
"status": "success",
"message": "Invalidated all sheets cache",
"keys_deleted": deleted,
}
@app.get("/api/ai/cache/status")
async def cache_status():
"""Get cache status and statistics"""
cache = get_cache()
return cache.health_check()
class AnalyzeAllResponse(BaseModel):
status: str
metadata: Dict[str, Any]
sheets: Dict[str, Any]
results: List[Dict[str, Any]]
summary: Dict[str, Any]
timestamp: str
class EvalRequest(BaseModel):
y_true: List[str]
y_prob: List[List[float]]
class_names: Optional[List[str]] = None
feature_importance: Optional[Dict[str, float]] = None
shap: Optional[Dict[str, Any]] = None
latency: Optional[Dict[str, Any]] = None
ablation: Optional[Dict[str, Any]] = None
def _fig_to_b64(fig) -> str:
buf = io.BytesIO()
fig.savefig(buf, format="png", bbox_inches="tight")
plt.close(fig)
return base64.b64encode(buf.getvalue()).decode()
@app.get("/api/ai/analyze-all", response_model=AnalyzeAllResponse)
async def analyze_all_sheets(
bypass_cache: bool = False,
include_regression: bool = True,
include_nlp: bool = True,
include_trends: bool = True,
max_rows_per_sheet: int = 10000,
compact: bool = False,
branch: Optional[str] = None,
fast_mode: bool = False,
):
"""
Analyze ALL rows from all Google Sheets
Fetches data from both NON CARGO and CGO sheets, analyzes each row,
and returns comprehensive results.
Args:
bypass_cache: Skip cache and fetch fresh data
include_regression: Include regression predictions
include_nlp: Include NLP analysis (severity, entities, summary)
include_trends: Include trend analysis
max_rows_per_sheet: Maximum rows to process per sheet
"""
start_time = datetime.now()
try:
from data.sheets_service import GoogleSheetsService
cache = get_cache() if not bypass_cache else None
sheets_service = GoogleSheetsService(cache=cache)
spreadsheet_id = os.getenv("GOOGLE_SHEET_ID")
if not spreadsheet_id:
raise HTTPException(
status_code=500, detail="GOOGLE_SHEET_ID not configured"
)
all_data = []
sheet_info = {}
if fast_mode:
requests = [
{
"name": "NON CARGO",
"required_headers": [
"Date of Event",
"Airlines",
"Flight Number",
"Branch",
"HUB",
"Route",
"Report Category",
"Irregularity/Complain Category",
"Report",
"Status",
"Root Caused",
"Action Taken",
"Upload Irregularity Photo",
],
"max_rows": max_rows_per_sheet,
},
{
"name": "CGO",
"required_headers": [
"Date of Event",
"Airlines",
"Flight Number",
"Branch",
"HUB",
"Route",
"Report Category",
"Irregularity/Complain Category",
"Report",
"Status",
"Root Caused",
"Action Taken",
"Upload Irregularity Photo",
],
"max_rows": max_rows_per_sheet,
},
]
result_map = sheets_service.fetch_sheets_selected_columns(
spreadsheet_id, requests, bypass_cache=bypass_cache
)
for sheet_name, data in result_map.items():
if branch:
data = [
r
for r in data
if (r.get("Branch") or "").strip().lower()
== branch.strip().lower()
]
for row in data:
row["_source_sheet"] = sheet_name
all_data.append(row)
sheet_info[sheet_name] = {
"rows_fetched": len(data),
"status": "success",
"mode": "fast",
}
else:
sheets_to_fetch = [
{"name": "NON CARGO", "range": f"A1:AA{max_rows_per_sheet + 1}"},
{"name": "CGO", "range": f"A1:Z{max_rows_per_sheet + 1}"},
]
try:
result_map = sheets_service.fetch_sheets_batch_data(
spreadsheet_id, sheets_to_fetch, bypass_cache=bypass_cache
)
for sheet_name, data in result_map.items():
if branch:
data = [
r
for r in data
if (r.get("Branch") or "").strip().lower()
== branch.strip().lower()
]
for row in data:
row["_source_sheet"] = sheet_name
all_data.append(row)
sheet_info[sheet_name] = {
"rows_fetched": len(data),
"status": "success",
"mode": "full",
}
except Exception as e:
for sheet in sheets_to_fetch:
try:
sheet_name = sheet["name"]
range_str = sheet["range"]
data = sheets_service.fetch_sheet_data(
spreadsheet_id, sheet_name, range_str, bypass_cache=bypass_cache
)
if branch:
data = [
r
for r in data
if (r.get("Branch") or "").strip().lower()
== branch.strip().lower()
]
for row in data:
row["_source_sheet"] = sheet_name
all_data.append(row)
sheet_info[sheet_name] = {
"rows_fetched": len(data),
"status": "success",
"mode": "full",
}
except Exception as inner:
sheet_info[sheet["name"]] = {
"rows_fetched": 0,
"status": "error",
"error": str(inner),
}
total_records = len(all_data)
if total_records == 0:
raise HTTPException(status_code=404, detail="No data found in any sheet")
logger.info(f"Analyzing {total_records} total records...")
results = []
batch_size = 300
for i in range(0, total_records, batch_size):
batch = all_data[i : i + batch_size]
if include_regression:
regression_preds = model_service.predict_regression(batch)
else:
regression_preds = [None] * len(batch)
if include_nlp:
classifications = model_service.classify_text(batch)
entities = model_service.extract_entities(batch)
summaries = model_service.generate_summary(batch)
sentiments = model_service.analyze_sentiment(batch)
else:
classifications = [None] * len(batch)
entities = [None] * len(batch)
summaries = [None] * len(batch)
sentiments = [None] * len(batch)
for j, row in enumerate(batch):
result = {
"rowId": row.get("_row_id", f"row_{i + j}"),
"sourceSheet": row.get("_source_sheet", "Unknown"),
"originalData": {
"date": row.get("Date_of_Event"),
"airline": row.get("Airlines"),
"flightNumber": row.get("Flight_Number"),
"branch": row.get("Branch"),
"hub": row.get("HUB"),
"route": row.get("Route"),
"category": row.get("Report_Category"),
"issueType": row.get("Irregularity_Complain_Category"),
"report": row.get("Report"),
"status": row.get("Status"),
},
}
if regression_preds[j]:
pred = {
"predictedDays": regression_preds[j].predictedDays,
"confidenceInterval": regression_preds[j].confidenceInterval,
"hasUnknownCategories": regression_preds[j].hasUnknownCategories,
}
if not compact:
pred["shapExplanation"] = (
regression_preds[j].shapExplanation.model_dump()
if regression_preds[j].shapExplanation
else None
)
pred["anomalyDetection"] = (
regression_preds[j].anomalyDetection.model_dump()
if regression_preds[j].anomalyDetection
else None
)
result["prediction"] = pred
if classifications[j]:
result["classification"] = classifications[j].model_dump()
if entities[j] and not compact:
result["entities"] = entities[j].model_dump()
if summaries[j] and not compact:
result["summary"] = summaries[j].model_dump()
if sentiments[j] and not compact:
result["sentiment"] = sentiments[j].model_dump()
results.append(result)
summary = {
"totalRecords": total_records,
"sheetsProcessed": len(
[s for s in sheet_info.values() if s["status"] == "success"]
),
"regressionEnabled": include_regression,
"nlpEnabled": include_nlp,
}
if include_nlp and results:
severity_counts = {}
for r in results:
sev = r.get("classification", {}).get("severity", "Unknown")
severity_counts[sev] = severity_counts.get(sev, 0) + 1
summary["severityDistribution"] = severity_counts
if include_regression and results:
predictions = [
r["prediction"]["predictedDays"] for r in results if r.get("prediction")
]
if predictions:
summary["predictionStats"] = {
"min": round(min(predictions), 2),
"max": round(max(predictions), 2),
"mean": round(sum(predictions) / len(predictions), 2),
}
processing_time = (datetime.now() - start_time).total_seconds()
return AnalyzeAllResponse(
status="success",
metadata={
"totalRecords": total_records,
"processingTimeSeconds": round(processing_time, 2),
"recordsPerSecond": round(total_records / processing_time, 2)
if processing_time > 0
else 0,
"modelVersions": {
"regression": model_service.regression_version,
"nlp": model_service.nlp_version,
},
},
sheets=sheet_info,
results=results,
summary=summary,
timestamp=datetime.now().isoformat(),
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Analyze all error: {str(e)}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/ai/analyze-all/fast", response_model=AnalyzeAllResponse)
async def analyze_all_sheets_fast(
bypass_cache: bool = False,
include_regression: bool = True,
include_nlp: bool = True,
include_trends: bool = True,
max_rows_per_sheet: int = 10000,
compact: bool = False,
branch: Optional[str] = None,
):
return await analyze_all_sheets(
bypass_cache=bypass_cache,
include_regression=include_regression,
include_nlp=include_nlp,
include_trends=include_trends,
max_rows_per_sheet=max_rows_per_sheet,
compact=compact,
branch=branch,
fast_mode=True,
)
@app.get("/api/analyze-all", response_model=AnalyzeAllResponse)
@app.get("/analyze-all", response_model=AnalyzeAllResponse)
@app.get("/api/ai/analyze_all", response_model=AnalyzeAllResponse)
@app.get("/api/analyze_all", response_model=AnalyzeAllResponse)
@app.get("/analyze_all", response_model=AnalyzeAllResponse)
async def analyze_all_alias(
bypass_cache: bool = False,
include_regression: bool = True,
include_nlp: bool = True,
include_trends: bool = True,
max_rows_per_sheet: int = 10000,
compact: bool = False,
branch: Optional[str] = None,
fast_mode: bool = False,
):
return await analyze_all_sheets(
bypass_cache=bypass_cache,
include_regression=include_regression,
include_nlp=include_nlp,
include_trends=include_trends,
max_rows_per_sheet=max_rows_per_sheet,
compact=compact,
branch=branch,
fast_mode=fast_mode,
)
@app.get("/api/analyze-all/fast", response_model=AnalyzeAllResponse)
@app.get("/analyze-all/fast", response_model=AnalyzeAllResponse)
@app.get("/api/ai/analyze_all/fast", response_model=AnalyzeAllResponse)
@app.get("/api/analyze_all/fast", response_model=AnalyzeAllResponse)
@app.get("/analyze_all/fast", response_model=AnalyzeAllResponse)
async def analyze_all_fast_alias(
bypass_cache: bool = False,
include_regression: bool = True,
include_nlp: bool = True,
include_trends: bool = True,
max_rows_per_sheet: int = 10000,
compact: bool = False,
branch: Optional[str] = None,
):
return await analyze_all_sheets(
bypass_cache=bypass_cache,
include_regression=include_regression,
include_nlp=include_nlp,
include_trends=include_trends,
max_rows_per_sheet=max_rows_per_sheet,
compact=compact,
branch=branch,
fast_mode=True,
)
# ============== Risk Scoring Endpoints ==============
@app.get("/api/ai/risk/summary")
async def risk_summary():
"""Get overall risk summary for all entities"""
from data.risk_service import get_risk_service
risk_service = get_risk_service()
return risk_service.get_risk_summary()
@app.get("/api/ai/risk/airlines")
async def airline_risks():
"""Get risk scores for all airlines"""
from data.risk_service import get_risk_service
risk_service = get_risk_service()
return risk_service.get_all_airline_risks()
@app.get("/api/ai/risk/airlines/{airline_name}")
async def airline_risk(airline_name: str):
"""Get risk score for a specific airline"""
from data.risk_service import get_risk_service
risk_service = get_risk_service()
risk_data = risk_service.get_airline_risk(airline_name)
if not risk_data:
raise HTTPException(
status_code=404, detail=f"Airline '{airline_name}' not found"
)
recommendations = risk_service.get_risk_recommendations("airline", airline_name)
return {
"airline": airline_name,
"risk_data": risk_data,
"recommendations": recommendations,
}
@app.get("/api/ai/risk/branches")
async def branch_risks():
"""Get risk scores for all branches"""
from data.risk_service import get_risk_service
risk_service = get_risk_service()
return risk_service.get_all_branch_risks()
@app.get("/api/ai/risk/hubs")
async def hub_risks():
"""Get risk scores for all hubs"""
from data.risk_service import get_risk_service
risk_service = get_risk_service()
return risk_service.get_all_hub_risks()
@app.post("/api/ai/risk/calculate")
async def calculate_risk_scores(bypass_cache: bool = False):
"""Calculate risk scores from current Google Sheets data"""
from data.risk_service import get_risk_service
from data.sheets_service import GoogleSheetsService
cache = get_cache() if not bypass_cache else None
sheets_service = GoogleSheetsService(cache=cache)
spreadsheet_id = os.getenv("GOOGLE_SHEET_ID")
if not spreadsheet_id:
raise HTTPException(status_code=500, detail="GOOGLE_SHEET_ID not configured")
# Fetch all data
non_cargo = sheets_service.fetch_sheet_data(
spreadsheet_id, "NON CARGO", "A1:AA2000", bypass_cache=bypass_cache
)
cargo = sheets_service.fetch_sheet_data(
spreadsheet_id, "CGO", "A1:Z2000", bypass_cache=bypass_cache
)
all_data = non_cargo + cargo
risk_service = get_risk_service()
risk_data = risk_service.calculate_all_risk_scores(all_data)
return {
"status": "success",
"records_processed": len(all_data),
"risk_summary": risk_service.get_risk_summary(),
}
# ============== Subcategory Classification Endpoints ==============
@app.post("/api/ai/subcategory")
async def classify_subcategory(
report: str,
area: Optional[str] = None,
issue_type: Optional[str] = None,
root_cause: Optional[str] = None,
):
"""Classify report into subcategory"""
from data.subcategory_service import get_subcategory_classifier
classifier = get_subcategory_classifier()
result = classifier.classify(report, area, issue_type, root_cause)
return result
@app.get("/api/ai/subcategory/categories")
async def get_subcategories(area: Optional[str] = None):
"""Get list of available subcategories"""
from data.subcategory_service import get_subcategory_classifier
classifier = get_subcategory_classifier()
return classifier.get_available_categories(area)
# ============== Action Recommendation Endpoints ==============
@app.post("/api/ai/action/recommend")
async def recommend_actions(
report: str,
issue_type: str,
severity: str = "Medium",
area: Optional[str] = None,
airline: Optional[str] = None,
branch: Optional[str] = None,
top_n: int = 5,
):
"""Get action recommendations for an issue"""
from data.action_service import get_action_service
action_service = get_action_service()
recommendations = action_service.recommend(
report=report,
issue_type=issue_type,
severity=severity,
area=area,
airline=airline,
branch=branch,
top_n=top_n,
)
return recommendations
@app.post("/api/ai/action/train")
async def train_action_recommender(
bypass_cache: bool = False, background_tasks: BackgroundTasks = None
):
"""Train action recommender from historical data"""
from data.action_service import get_action_service
from data.sheets_service import GoogleSheetsService
from data.similarity_service import get_similarity_service
cache = get_cache() if not bypass_cache else None
sheets_service = GoogleSheetsService(cache=cache)
spreadsheet_id = os.getenv("GOOGLE_SHEET_ID")
if not spreadsheet_id:
raise HTTPException(status_code=500, detail="GOOGLE_SHEET_ID not configured")
non_cargo = sheets_service.fetch_sheet_data(
spreadsheet_id, "NON CARGO", "A1:AA2000", bypass_cache=bypass_cache
)
cargo = sheets_service.fetch_sheet_data(
spreadsheet_id, "CGO", "A1:Z2000", bypass_cache=bypass_cache
)
all_data = non_cargo + cargo
similarity_service = get_similarity_service()
similarity_service.build_index(all_data)
action_service = get_action_service()
action_service.train_from_data(all_data)
return {
"status": "success",
"records_processed": len(all_data),
}
# ============== Advanced NER Endpoints ==============
@app.post("/api/ai/ner/extract")
async def extract_entities(text: str):
"""Extract entities from text"""
from data.advanced_ner_service import get_advanced_ner
ner = get_advanced_ner()
entities = ner.extract(text)
summary = ner.extract_summary(text)
return {
"entities": entities,
"summary": summary,
}
# ============== Similarity Endpoints ==============
@app.post("/api/ai/similar")
async def find_similar_reports(
text: str,
top_k: int = 5,
threshold: float = 0.3,
):
"""Find similar reports"""
from data.similarity_service import get_similarity_service
similarity_service = get_similarity_service()
similar = similarity_service.find_similar(text, top_k, threshold)
return {
"query_preview": text[:100],
"similar_reports": similar,
}
@app.post("/api/ai/similar/build-index")
async def build_similarity_index(
bypass_cache: bool = False,
method: Optional[str] = None,
max_rows_per_sheet: int = 2000,
branch: Optional[str] = None,
):
"""Build similarity index from Google Sheets data"""
from data.similarity_service import get_similarity_service
from data.sheets_service import GoogleSheetsService
cache = get_cache() if not bypass_cache else None
sheets_service = GoogleSheetsService(cache=cache)
spreadsheet_id = os.getenv("GOOGLE_SHEET_ID")
if not spreadsheet_id:
raise HTTPException(status_code=500, detail="GOOGLE_SHEET_ID not configured")
non_cargo = sheets_service.fetch_sheet_data(
spreadsheet_id, "NON CARGO", f"A1:AA{max_rows_per_sheet}", bypass_cache=bypass_cache
)
cargo = sheets_service.fetch_sheet_data(
spreadsheet_id, "CGO", f"A1:Z{max_rows_per_sheet}", bypass_cache=bypass_cache
)
all_data = non_cargo + cargo
if branch:
all_data = [
r for r in all_data if (r.get("Branch") or "").strip().lower() == branch.strip().lower()
]
similarity_service = get_similarity_service()
similarity_service.build_index(all_data, method=method)
return {
"status": "success",
"records_indexed": len(all_data),
"method": ("embedding" if (method or "").lower() in {"embedding","embed"} else ("embedding" if similarity_service.use_embedding else "tfidf")),
}
@app.post("/api/ai/eval/plots")
async def generate_eval_plots(req: EvalRequest):
y_true = req.y_true
y_prob = np.array(req.y_prob, dtype=float)
if req.class_names:
classes = req.class_names
else:
classes = ["Critical", "High", "Medium", "Low"]
y_pred_idx = np.argmax(y_prob, axis=1).tolist()
y_true_idx = [classes.index(y) if y in classes else -1 for y in y_true]
valid = [i for i, v in enumerate(y_true_idx) if v >= 0]
y_true_idx = [y_true_idx[i] for i in valid]
y_prob = y_prob[valid]
y_pred_idx = [y_pred_idx[i] for i in valid]
cm = confusion_matrix(y_true_idx, y_pred_idx, labels=list(range(len(classes))))
fig1, ax1 = plt.subplots(figsize=(5, 4))
im = ax1.imshow(cm, cmap="Blues")
ax1.set_xticks(range(len(classes)))
ax1.set_yticks(range(len(classes)))
ax1.set_xticklabels(classes, rotation=45, ha="right")
ax1.set_yticklabels(classes)
for i in range(len(classes)):
for j in range(len(classes)):
ax1.text(j, i, str(cm[i, j]), ha="center", va="center", color="black")
fig1.colorbar(im, ax=ax1)
ax1.set_xlabel("Pred")
ax1.set_ylabel("True")
cm_b64 = _fig_to_b64(fig1)
lb = LabelBinarizer()
lb.fit(list(range(len(classes))))
Y_true = lb.transform(y_true_idx)
aucs = {}
fig2, ax2 = plt.subplots(figsize=(6, 5))
for i, cname in enumerate(classes):
yi = Y_true[:, i]
pi = y_prob[:, i]
fpr, tpr, _ = roc_curve(yi, pi)
auci = roc_auc_score(yi, pi)
aucs[cname] = float(auci)
ax2.plot(fpr, tpr, label=f"{cname} AUC={auci:.3f}")
ax2.plot([0, 1], [0, 1], "k--")
ax2.set_xlabel("FPR")
ax2.set_ylabel("TPR")
ax2.legend(loc="lower right", fontsize=8)
roc_b64 = _fig_to_b64(fig2)
bs_list = []
fig3, ax3 = plt.subplots(figsize=(6, 5))
bins = np.linspace(0.0, 1.0, 11)
for i, cname in enumerate(classes):
yi = Y_true[:, i]
pi = y_prob[:, i]
brier = brier_score_loss(yi, pi)
bs_list.append(float(brier))
binids = np.digitize(pi, bins) - 1
bin_true = [np.mean(yi[binids == b]) if np.any(binids == b) else np.nan for b in range(len(bins) - 1)]
bin_pred = [np.mean(pi[binids == b]) if np.any(binids == b) else np.nan for b in range(len(bins) - 1)]
ax3.plot(bin_pred, bin_true, marker="o", linewidth=1, label=f"{cname}")
ax3.plot([0, 1], [0, 1], "k--")
ax3.set_xlabel("Predicted")
ax3.set_ylabel("Observed")
ax3.legend(fontsize=8)
rel_b64 = _fig_to_b64(fig3)
fi_b64 = None
if req.feature_importance:
items = sorted(req.feature_importance.items(), key=lambda x: -x[1])[:20]
labels = [k for k, _ in items]
vals = [v for _, v in items]
fig4, ax4 = plt.subplots(figsize=(7, 5))
ax4.barh(range(len(vals)), vals)
ax4.set_yticks(range(len(labels)))
ax4.set_yticklabels(labels)
ax4.invert_yaxis()
ax4.set_xlabel("Importance")
fi_b64 = _fig_to_b64(fig4)
shap_b64 = None
if req.shap and isinstance(req.shap, dict):
shap_vals = req.shap.get("shap_values") or []
feat_names = req.shap.get("feature_names") or [f"f{i}" for i in range(len(shap_vals))]
pairs = list(zip(feat_names, shap_vals))
pairs.sort(key=lambda x: -abs(x[1]))
pairs = pairs[:15]
labels = [p[0] for p in pairs]
vals = [p[1] for p in pairs]
colors = ["#d62728" if v > 0 else "#1f77b4" for v in vals]
fig5, ax5 = plt.subplots(figsize=(7, 5))
ax5.barh(range(len(vals)), vals, color=colors)
ax5.set_yticks(range(len(labels)))
ax5.set_yticklabels(labels)
ax5.invert_yaxis()
ax5.set_xlabel("SHAP value")
shap_b64 = _fig_to_b64(fig5)
lat_b64 = None
if req.latency and isinstance(req.latency, dict):
def agg(x):
if isinstance(x, list):
arr = np.array(x, dtype=float)
return float(np.mean(arr)), float(np.percentile(arr, 95))
try:
v = float(x)
return v, v
except Exception:
return None, None
b_m, b_p = agg(req.latency.get("baseline"))
o_m, o_p = agg(req.latency.get("onnx"))
q_m, q_p = agg(req.latency.get("onnx_q"))
labels = []
means = []
p95 = []
if b_m is not None:
labels.append("Baseline"); means.append(b_m); p95.append(b_p)
if o_m is not None:
labels.append("ONNX"); means.append(o_m); p95.append(o_p)
if q_m is not None:
labels.append("ONNX Q"); means.append(q_m); p95.append(q_p)
if labels:
x = np.arange(len(labels))
fig6, ax6 = plt.subplots(figsize=(6, 4))
ax6.bar(x, means, yerr=np.maximum(0, np.array(p95) - np.array(means)), capsize=4)
ax6.set_xticks(x); ax6.set_xticklabels(labels)
ax6.set_ylabel("ms")
lat_b64 = _fig_to_b64(fig6)
ab_b64 = None
if req.ablation and isinstance(req.ablation, dict):
cfgs = req.ablation.get("configs") or []
accs = req.ablation.get("accuracy") or []
if cfgs and accs and len(cfgs) == len(accs):
x = np.arange(len(cfgs))
fig7, ax7 = plt.subplots(figsize=(6, 4))
ax7.bar(x, accs)
ax7.set_xticks(x); ax7.set_xticklabels(cfgs, rotation=15, ha="right")
ax7.set_ylabel("Accuracy")
ab_b64 = _fig_to_b64(fig7)
macro_auc = float(np.mean(list(aucs.values()))) if aucs else None
brier_macro = float(np.mean(bs_list)) if bs_list else None
return {
"confusion_matrix_png": cm_b64,
"roc_curves_png": roc_b64,
"reliability_png": rel_b64,
"feature_importance_png": fi_b64,
"shap_force_like_png": shap_b64,
"latency_png": lat_b64,
"ablation_png": ab_b64,
"metrics": {
"macro_auc": macro_auc,
"brier_macro": brier_macro,
"per_class_auc": aucs,
},
}
# ============== Forecasting Endpoints ==============
@app.get("/api/ai/forecast/issues")
async def forecast_issues(periods: int = 4):
"""Forecast issue volume for next periods"""
from data.forecast_service import get_forecast_service
forecast_service = get_forecast_service()
forecast = forecast_service.forecast_issues(periods)
return forecast
@app.get("/api/ai/forecast/trends")
async def predict_trends():
"""Predict category trends"""
from data.forecast_service import get_forecast_service
forecast_service = get_forecast_service()
trends = forecast_service.predict_category_trends()
return trends
@app.get("/api/ai/forecast/seasonal")
async def get_seasonal_patterns():
"""Get seasonal patterns"""
from data.forecast_service import get_forecast_service
forecast_service = get_forecast_service()
patterns = forecast_service.get_seasonal_patterns()
return patterns
@app.post("/api/ai/forecast/build")
async def build_forecast_data(bypass_cache: bool = False):
"""Build forecast historical data from Google Sheets"""
from data.forecast_service import get_forecast_service
from data.sheets_service import GoogleSheetsService
cache = get_cache() if not bypass_cache else None
sheets_service = GoogleSheetsService(cache=cache)
spreadsheet_id = os.getenv("GOOGLE_SHEET_ID")
if not spreadsheet_id:
raise HTTPException(status_code=500, detail="GOOGLE_SHEET_ID not configured")
non_cargo = sheets_service.fetch_sheet_data(
spreadsheet_id, "NON CARGO", "A1:AA2000", bypass_cache=bypass_cache
)
cargo = sheets_service.fetch_sheet_data(
spreadsheet_id, "CGO", "A1:Z2000", bypass_cache=bypass_cache
)
all_data = non_cargo + cargo
forecast_service = get_forecast_service()
forecast_service.build_historical_data(all_data)
return {
"status": "success",
"records_processed": len(all_data),
"forecast_summary": forecast_service.get_forecast_summary(),
}
# ============== Report Generation Endpoints ==============
@app.post("/api/ai/report/generate")
async def generate_report(
row_id: str,
bypass_cache: bool = False,
):
"""Generate formal incident report"""
from data.report_generator_service import get_report_generator
from data.sheets_service import GoogleSheetsService
from data.risk_service import get_risk_service
cache = get_cache() if not bypass_cache else None
sheets_service = GoogleSheetsService(cache=cache)
spreadsheet_id = os.getenv("GOOGLE_SHEET_ID")
if not spreadsheet_id:
raise HTTPException(status_code=500, detail="GOOGLE_SHEET_ID not configured")
# Fetch all data and find the record
non_cargo = sheets_service.fetch_sheet_data(
spreadsheet_id, "NON CARGO", "A1:AA2000", bypass_cache=bypass_cache
)
cargo = sheets_service.fetch_sheet_data(
spreadsheet_id, "CGO", "A1:Z2000", bypass_cache=bypass_cache
)
all_data = non_cargo + cargo
record = None
for r in all_data:
if r.get("_row_id") == row_id:
record = r
break
if not record:
raise HTTPException(status_code=404, detail=f"Record '{row_id}' not found")
# Generate analysis
report_text = record.get("Report", "") + " " + record.get("Root_Caused", "")
analysis = {
"severity": model_service._classify_severity_fallback([report_text])[0].get(
"severity", "Medium"
),
"issueType": record.get("Irregularity_Complain_Category", ""),
}
# Get risk data
risk_service = get_risk_service()
airline = record.get("Airlines", "")
risk_data = risk_service.get_airline_risk(airline)
# Generate report
report_gen = get_report_generator()
formal_report = report_gen.generate_incident_report(record, analysis, risk_data)
exec_summary = report_gen.generate_executive_summary(record, analysis)
json_report = report_gen.generate_json_report(record, analysis, risk_data)
return {
"row_id": row_id,
"formal_report": formal_report,
"executive_summary": exec_summary,
"structured_report": json_report,
}
# ============== Dashboard Endpoints ==============
@app.get("/api/ai/dashboard/summary")
async def dashboard_summary(bypass_cache: bool = False):
"""Get comprehensive dashboard summary"""
from data.risk_service import get_risk_service
from data.forecast_service import get_forecast_service
from data.sheets_service import GoogleSheetsService
cache = get_cache() if not bypass_cache else None
sheets_service = GoogleSheetsService(cache=cache)
spreadsheet_id = os.getenv("GOOGLE_SHEET_ID")
if not spreadsheet_id:
raise HTTPException(status_code=500, detail="GOOGLE_SHEET_ID not configured")
# Fetch data
non_cargo = sheets_service.fetch_sheet_data(
spreadsheet_id, "NON CARGO", "A1:AA2000", bypass_cache=bypass_cache
)
cargo = sheets_service.fetch_sheet_data(
spreadsheet_id, "CGO", "A1:Z2000", bypass_cache=bypass_cache
)
all_data = non_cargo + cargo
# Get risk summary
risk_service = get_risk_service()
risk_summary = risk_service.get_risk_summary()
# Get forecast summary
forecast_service = get_forecast_service()
forecast_summary = forecast_service.get_forecast_summary()
# Calculate statistics
severity_dist = Counter()
category_dist = Counter()
airline_dist = Counter()
for record in all_data:
report_text = record.get("Report", "") + " " + record.get("Root_Caused", "")
sev = model_service._classify_severity_fallback([report_text])[0].get(
"severity", "Low"
)
severity_dist[sev] += 1
category_dist[record.get("Irregularity_Complain_Category", "Unknown")] += 1
airline_dist[record.get("Airlines", "Unknown")] += 1
return {
"total_records": len(all_data),
"sheets": {
"non_cargo": len(non_cargo),
"cargo": len(cargo),
},
"severity_distribution": dict(severity_dist),
"category_distribution": dict(category_dist.most_common(10)),
"top_airlines": dict(airline_dist.most_common(10)),
"risk_summary": risk_summary,
"forecast_summary": forecast_summary,
"model_status": {
"regression": model_service.model_loaded,
"nlp": model_service.nlp_service is not None,
},
"last_updated": datetime.now().isoformat(),
}
# ============== Seasonality Endpoints ==============
@app.get("/api/ai/seasonality/summary")
async def seasonality_summary(category_type: Optional[str] = None):
"""
Get seasonality summary and patterns
Args:
category_type: "landside_airside", "cgo", or None for both
"""
from data.seasonality_service import get_seasonality_service
service = get_seasonality_service()
return service.get_seasonality_summary(category_type)
@app.get("/api/ai/seasonality/forecast")
async def seasonality_forecast(
category_type: Optional[str] = None,
periods: int = 4,
granularity: str = "weekly",
):
"""
Forecast issue volumes
Args:
category_type: "landside_airside", "cgo", or None for both
periods: Number of periods to forecast
granularity: "daily", "weekly", or "monthly"
"""
from data.seasonality_service import get_seasonality_service
service = get_seasonality_service()
return service.forecast(category_type, periods, granularity)
@app.get("/api/ai/seasonality/peaks")
async def seasonality_peaks(
category_type: Optional[str] = None, threshold: float = 1.2
):
"""
Identify peak periods
Args:
category_type: "landside_airside", "cgo", or None for both
threshold: Multiplier above average (1.2 = 20% above)
"""
from data.seasonality_service import get_seasonality_service
service = get_seasonality_service()
return service.get_peak_periods(category_type, threshold)
@app.post("/api/ai/seasonality/build")
async def build_seasonality_patterns(bypass_cache: bool = False):
"""Build seasonality patterns from Google Sheets data"""
from data.seasonality_service import get_seasonality_service
from data.sheets_service import GoogleSheetsService
cache = get_cache() if not bypass_cache else None
sheets_service = GoogleSheetsService(cache=cache)
spreadsheet_id = os.getenv("GOOGLE_SHEET_ID")
if not spreadsheet_id:
raise HTTPException(status_code=500, detail="GOOGLE_SHEET_ID not configured")
non_cargo = sheets_service.fetch_sheet_data(
spreadsheet_id, "NON CARGO", "A1:AA5000", bypass_cache=bypass_cache
)
cargo = sheets_service.fetch_sheet_data(
spreadsheet_id, "CGO", "A1:Z5000", bypass_cache=bypass_cache
)
for row in non_cargo:
row["_sheet_name"] = "NON CARGO"
for row in cargo:
row["_sheet_name"] = "CGO"
all_data = non_cargo + cargo
service = get_seasonality_service()
result = service.build_patterns(all_data)
return {
"status": "success",
"records_processed": len(all_data),
"patterns": result,
}
# ============== Root Cause Endpoints ==============
@app.post("/api/ai/root-cause/classify")
async def classify_root_cause(
root_cause: str,
report: Optional[str] = None,
area: Optional[str] = None,
category: Optional[str] = None,
):
"""
Classify a root cause text into categories
Categories: Equipment Failure, Staff Competency, Process/Procedure,
Communication, External Factors, Documentation, Training Gap, Resource/Manpower
"""
from data.root_cause_service import get_root_cause_service
service = get_root_cause_service()
context = {"area": area, "category": category}
result = service.classify(root_cause, report or "", context)
return result
@app.post("/api/ai/root-cause/classify-batch")
async def classify_root_cause_batch(bypass_cache: bool = False):
"""Classify root causes for all records"""
from data.root_cause_service import get_root_cause_service
from data.sheets_service import GoogleSheetsService
cache = get_cache() if not bypass_cache else None
sheets_service = GoogleSheetsService(cache=cache)
spreadsheet_id = os.getenv("GOOGLE_SHEET_ID")
if not spreadsheet_id:
raise HTTPException(status_code=500, detail="GOOGLE_SHEET_ID not configured")
non_cargo = sheets_service.fetch_sheet_data(
spreadsheet_id, "NON CARGO", "A1:AA5000", bypass_cache=bypass_cache
)
cargo = sheets_service.fetch_sheet_data(
spreadsheet_id, "CGO", "A1:Z5000", bypass_cache=bypass_cache
)
all_data = non_cargo + cargo
service = get_root_cause_service()
results = service.classify_batch(all_data)
return {
"status": "success",
"records_processed": len(all_data),
"classifications": results[:100],
"total_classified": len(
[r for r in results if r["primary_category"] != "Unknown"]
),
}
@app.get("/api/ai/root-cause/categories")
async def get_root_cause_categories():
"""Get all available root cause categories"""
from data.root_cause_service import get_root_cause_service
service = get_root_cause_service()
return service.get_categories()
@app.get("/api/ai/root-cause/stats")
async def get_root_cause_stats(source: Optional[str] = None, bypass_cache: bool = False):
"""
Get root cause statistics from data
Args:
source: "NON CARGO", "CGO", or None for both
bypass_cache: Skip cache and fetch fresh data
"""
from data.root_cause_service import get_root_cause_service
from data.sheets_service import GoogleSheetsService
cache = get_cache() if not bypass_cache else None
sheets_service = GoogleSheetsService(cache=cache)
spreadsheet_id = os.getenv("GOOGLE_SHEET_ID")
if not spreadsheet_id:
raise HTTPException(status_code=500, detail="GOOGLE_SHEET_ID not configured")
all_data = []
# Conditional fetching based on source to reduce I/O and processing
if not source or source.upper() == "NON CARGO":
non_cargo = sheets_service.fetch_sheet_data(
spreadsheet_id, "NON CARGO", "A1:AA5000", bypass_cache=bypass_cache
)
all_data.extend(non_cargo)
if not source or source.upper() == "CGO":
cargo = sheets_service.fetch_sheet_data(
spreadsheet_id, "CGO", "A1:Z5000", bypass_cache=bypass_cache
)
all_data.extend(cargo)
service = get_root_cause_service()
stats = service.get_statistics(all_data)
return stats
@app.post("/api/ai/root-cause/dominant")
async def get_dominant_root_cause(
request: Optional[AnalysisRequest] = Body(None),
sheetId: Optional[str] = None,
sheetName: Optional[str] = None,
rowRange: Optional[str] = None,
bypass_cache: bool = False,
max_rows_per_sheet: int = 2000,
branch: Optional[str] = None,
):
from data.root_cause_service import get_root_cause_service
from data.sheets_service import GoogleSheetsService
data: List[Dict[str, Any]] = []
if request and request.data:
data = [r.model_dump(exclude_none=True) for r in request.data]
elif sheetId and sheetName and rowRange:
cache = get_cache() if not bypass_cache else None
sheets_service = GoogleSheetsService(cache=cache)
data = sheets_service.fetch_sheet_data(sheetId, sheetName, rowRange, bypass_cache=bypass_cache)
else:
cache = get_cache() if not bypass_cache else None
sheets_service = GoogleSheetsService(cache=cache)
spreadsheet_id = os.getenv("GOOGLE_SHEET_ID")
if not spreadsheet_id:
raise HTTPException(status_code=500, detail="GOOGLE_SHEET_ID not configured")
non_cargo = sheets_service.fetch_sheet_data(spreadsheet_id, "NON CARGO", f"A1:AA{max_rows_per_sheet+1}", bypass_cache=bypass_cache)
cargo = sheets_service.fetch_sheet_data(spreadsheet_id, "CGO", f"A1:Z{max_rows_per_sheet+1}", bypass_cache=bypass_cache)
data = non_cargo + cargo
if branch:
data = [r for r in data if (r.get("Branch") or "").strip().lower() == branch.strip().lower()]
service = get_root_cause_service()
stats = service.get_statistics(data)
top = stats.get("top_categories", [])
if not top:
return {"category": "Unknown", "count": 0, "percentage": 0.0, "classified": stats.get("classified", 0), "total_records": stats.get("total_records", 0)}
name, info = top[0]
return {
"category": name,
"count": info.get("count", 0),
"percentage": info.get("percentage"),
"classified": stats.get("classified", 0),
"total_records": stats.get("total_records", 0),
}
@app.post("/api/ai/root-cause/train")
async def train_root_cause_classifier(background_tasks: BackgroundTasks, bypass_cache: bool = False):
"""Train root cause classifier from historical data"""
from data.root_cause_service import get_root_cause_service
from data.sheets_service import GoogleSheetsService
cache = get_cache() if not bypass_cache else None
sheets_service = GoogleSheetsService(cache=cache)
spreadsheet_id = os.getenv("GOOGLE_SHEET_ID")
if not spreadsheet_id:
raise HTTPException(status_code=500, detail="GOOGLE_SHEET_ID not configured")
non_cargo = sheets_service.fetch_sheet_data(
spreadsheet_id, "NON CARGO", "A1:AA5000", bypass_cache=bypass_cache
)
cargo = sheets_service.fetch_sheet_data(
spreadsheet_id, "CGO", "A1:Z5000", bypass_cache=bypass_cache
)
all_data = non_cargo + cargo
service = get_root_cause_service()
# Offload the intensive training process to the background
background_tasks.add_task(service.train_from_data, all_data)
return {
"status": "training_started",
"records_fetched": len(all_data),
"message": "Classification training is now running in the background. The model will be automatically updated once complete."
}
def _gse_bucket(record: Dict) -> str:
txt = " ".join(
[
str(record.get("Report", "")),
str(record.get("Root_Caused", "")),
str(record.get("Action_Taken", "")),
]
).lower()
maint_kw = [
"maintenance",
"perawatan",
"perbaikan",
"rfm",
"request for maintenance",
"work order",
"wo ",
"wo-",
"rusak",
"trouble",
"engine",
"atw",
"push back",
"pushback",
"gpu",
"tractor",
"towbar",
"flat",
"ban",
"filter",
"oli",
"hydraulic",
"hidrolik",
"wiring",
"kabel",
"gearbox",
"starter",
"alternator",
"relay",
"bearing",
"accu",
"battery",
"leak",
"bocor",
"hose",
"hll",
"servis",
"service",
"repair",
]
sdm_kw = [
"sdm",
"staff",
"karyawan",
"pegawai",
"porter",
"operator",
"tenaga",
"manpower",
"distribusi",
"pemetaan",
"kompetensi",
"training",
"pelatihan",
"kurang",
"understaff",
"late show",
"late positioning",
"misslook",
"controller",
"leader",
"human error",
"capek",
"lelah",
"shift",
"schedule",
]
sda_kw = [
"sda",
"empty cart",
"cart kosong",
"empty baggage cart",
"cart",
"trolley",
"peralatan",
"alat",
"kendaraan",
"unit",
"gse",
"availability",
"ketersediaan alat",
"kurang unit",
"unit kurang",
"standby",
"ketersediaan",
"tidak siap",
"tidak tersedia",
]
if any(k in txt for k in maint_kw):
return "Maintenance"
if any(k in txt for k in sdm_kw):
return "SDM"
if any(k in txt for k in sda_kw):
return "SDA"
return "Other"
@app.get("/api/ai/gse/top")
async def get_gse_top(
source: Optional[str] = None,
bypass_cache: bool = False,
branch: Optional[str] = None,
top_k: int = 3,
examples_per_bucket: int = 2,
include_timeline: bool = True,
exclude_zero: bool = True,
):
from data.sheets_service import GoogleSheetsService
cache = get_cache() if not bypass_cache else None
sheets_service = GoogleSheetsService(cache=cache)
spreadsheet_id = os.getenv("GOOGLE_SHEET_ID")
if not spreadsheet_id:
raise HTTPException(status_code=500, detail="GOOGLE_SHEET_ID not configured")
data: List[Dict[str, Any]] = []
if source and source.upper() in {"NON CARGO", "CGO"}:
if source.upper() == "NON CARGO":
data.extend(
sheets_service.fetch_sheet_data(spreadsheet_id, "NON CARGO", "A1:AA5000", bypass_cache=bypass_cache)
)
else:
data.extend(
sheets_service.fetch_sheet_data(spreadsheet_id, "CGO", "A1:Z5000", bypass_cache=bypass_cache)
)
else:
batch = sheets_service.fetch_sheets_batch_data(
spreadsheet_id,
[
{"name": "NON CARGO", "range": "A1:AA5000"},
{"name": "CGO", "range": "A1:Z5000"},
],
bypass_cache=bypass_cache,
)
data.extend(batch.get("NON CARGO", []))
data.extend(batch.get("CGO", []))
if branch:
b = branch.strip().lower()
data = [r for r in data if (str(r.get("Branch", "")).strip().lower() == b)]
gse_data = [
r
for r in data
if str(r.get("Irregularity_Complain_Category", "")).strip().lower() == "gse"
or " gse " in f" {str(r.get('Report','')).lower()} "
]
totals = {"SDA": 0, "SDM": 0, "Maintenance": 0, "Other": 0}
buckets: Dict[str, Dict[str, Any]] = {
"SDA": {
"areas": Counter(),
"branches": Counter(),
"airlines": Counter(),
"keywords": Counter(),
"examples": [],
"months": Counter(),
},
"SDM": {
"areas": Counter(),
"branches": Counter(),
"airlines": Counter(),
"keywords": Counter(),
"examples": [],
"months": Counter(),
},
"Maintenance": {
"areas": Counter(),
"branches": Counter(),
"airlines": Counter(),
"keywords": Counter(),
"examples": [],
"months": Counter(),
},
"Other": {
"areas": Counter(),
"branches": Counter(),
"airlines": Counter(),
"keywords": Counter(),
"examples": [],
"months": Counter(),
},
}
maint_kw = [
"maintenance",
"perawatan",
"perbaikan",
"rfm",
"request for maintenance",
"work order",
"wo ",
"wo-",
"rusak",
"trouble",
"engine",
"atw",
"push back",
"pushback",
"gpu",
"tractor",
"towbar",
"flat",
"ban",
"filter",
"oli",
"hydraulic",
"hidrolik",
"wiring",
"kabel",
"gearbox",
"starter",
"alternator",
"relay",
"bearing",
"accu",
"battery",
"leak",
"bocor",
"hose",
"hll",
"servis",
"service",
"repair",
]
sdm_kw = [
"sdm",
"staff",
"karyawan",
"pegawai",
"porter",
"operator",
"tenaga",
"manpower",
"distribusi",
"pemetaan",
"kompetensi",
"training",
"pelatihan",
"kurang",
"understaff",
"late show",
"late positioning",
"misslook",
"controller",
"leader",
"human error",
"capek",
"lelah",
"shift",
"schedule",
]
sda_kw = [
"sda",
"empty cart",
"cart kosong",
"empty baggage cart",
"cart",
"trolley",
"peralatan",
"alat",
"kendaraan",
"unit",
"gse",
"availability",
"ketersediaan alat",
"kurang unit",
"unit kurang",
"standby",
"ketersediaan",
"tidak siap",
"tidak tersedia",
]
def _month_key(r: Dict[str, Any]) -> Optional[str]:
from datetime import datetime
val = r.get("Date_of_Event") or r.get("dateOfEvent") or r.get("Date")
if not val:
return None
s = str(val).strip()
fmts = ["%B %d, %Y", "%b %d, %Y", "%Y-%m-%d", "%d/%m/%Y", "%m/%d/%Y"]
for f in fmts:
try:
d = datetime.strptime(s, f)
return f"{d.year:04d}-{d.month:02d}"
except Exception:
continue
try:
d = datetime.fromisoformat(s)
return f"{d.year:04d}-{d.month:02d}"
except Exception:
return None
for r in gse_data:
bucket = _gse_bucket(r)
totals[bucket] = totals.get(bucket, 0) + 1
area = str(r.get("Area", "") or "Unknown")
br = str(r.get("Branch", "") or "Unknown")
al = str(r.get("Airlines", "") or "Unknown")
txt = " ".join(
[str(r.get("Report", "")), str(r.get("Root_Caused", "")), str(r.get("Action_Taken", ""))]
).lower()
bdata = buckets[bucket]
bdata["areas"][area] += 1
bdata["branches"][br] += 1
bdata["airlines"][al] += 1
if bucket == "Maintenance":
for kw in maint_kw:
if kw in txt:
bdata["keywords"][kw] += 1
elif bucket == "SDM":
for kw in sdm_kw:
if kw in txt:
bdata["keywords"][kw] += 1
elif bucket == "SDA":
for kw in sda_kw:
if kw in txt:
bdata["keywords"][kw] += 1
if len(bdata["examples"]) < max(0, int(examples_per_bucket)):
bdata["examples"].append(
{
"row_id": r.get("_row_id"),
"report": str(r.get("Report", ""))[:180],
"root_cause": str(r.get("Root_Caused", ""))[:180],
"action_taken": str(r.get("Action_Taken", ""))[:180],
"area": area,
"branch": br,
"airlines": al,
}
)
if include_timeline:
mk = _month_key(r)
if mk:
bdata["months"][mk] += 1
total_count = sum(totals.values())
by_subcategory: Dict[str, Any] = {}
for k in ("SDA", "SDM", "Maintenance"):
c = totals.get(k, 0)
pct = round((c / max(total_count, 1)) * 100, 1)
b = buckets[k]
recs: List[str] = []
if b["keywords"]:
kw_items = list(b["keywords"].items())
rules: List[Tuple[List[str], str]] = []
if k == "SDA":
rules = [
(["empty cart", "cart", "cart kosong", "unit kurang", "ketersediaan", "standby"], "Tambah/realokasi unit cart di jam sibuk dan pastikan standby H-15."),
(["trolley", "alat", "kendaraan", "gse"], "Audit ketersediaan alat GSE dan siapkan backup di area padat."),
]
if k == "SDM":
rules = [
(["understaff", "kurang", "lelah", "capek"], "Redistribusi SDM dan tambah shift malam saat peak hour."),
(["late show", "misslook", "schedule", "shift", "controller", "leader"], "Perkuat kontrol jadwal dan monitoring oleh leader/controller."),
(["training", "pelatihan", "kompetensi"], "Lakukan refresh training terarah sesuai kompetensi unit."),
]
if k == "Maintenance":
rules = [
(["rfm", "work order", "wo", "servis", "service"], "Percepat proses RFM/WO, prioritaskan spare part kritikal."),
(["hose", "leak", "bocor", "hll"], "Cek hose dan kebocoran, tambah inspeksi preventif berkala."),
(["engine", "starter", "alternator", "relay", "wiring", "kabel"], "Jadwalkan inspeksi komponen engine/elektrikal dan penggantian preventif."),
(["atw", "push back", "gpu", "tractor", "towbar"], "Pastikan ATW/GPU/Tractor/Towbar siap pakai dan sediakan backup unit."),
(["flat", "ban", "bearing", "gearbox", "filter", "oli", "hydraulic", "hidrolik"], "Perkuat maintenance rutin: ban, bearing, gearbox, filter, oli/hidrolik."),
]
scored: List[Tuple[int, str]] = []
for kws, msg in rules:
s = 0
for kw, cnt in kw_items:
for key in kws:
if key in kw:
s += cnt
if s > 0:
scored.append((s, msg))
scored.sort(key=lambda x: -x[0])
recs = [m for _, m in scored[:max(1, min(3, top_k))]]
by_subcategory[k] = {
"count": c,
"percentage": pct,
"top_areas": dict(b["areas"].most_common(top_k)),
"top_branches": dict(b["branches"].most_common(top_k)),
"top_airlines": dict(b["airlines"].most_common(top_k)),
"top_keywords": dict(b["keywords"].most_common(top_k)),
"examples": b["examples"],
"recommendations": recs,
}
if include_timeline:
by_subcategory[k]["timeline_monthly"] = dict(sorted(b["months"].items()))
if exclude_zero:
by_subcategory = {k: v for k, v in by_subcategory.items() if v["count"] > 0}
sorted_items = sorted([(k, v) for k, v in by_subcategory.items()], key=lambda x: -x[1]["count"])[:top_k]
top = [{"subcategory": k, "count": v["count"], "percentage": v["percentage"]} for k, v in sorted_items]
return {
"total_records": len(data),
"gse_records": len(gse_data),
"by_subcategory": by_subcategory,
"top": top,
}
def _gse_unit(record: Dict) -> str:
txt = " ".join(
[
str(record.get("Report", "")),
str(record.get("Root_Caused", "")),
str(record.get("Action_Taken", "")),
]
).lower()
mapping = {
"ATW": ["atw", "push back", "pushback"],
"GPU": ["gpu", "ground power unit"],
"Tractor/Tug": ["tractor", "tug", "towing"],
"Towbar": ["towbar", "tow bar"],
"Belt Loader": ["belt loader", "conveyor", "belt"],
"Baggage Cart": ["baggage cart", "cart", "trolley"],
"HLL Hose": ["hll", "hose", "selang"],
"Stair": ["stair", "passenger step", "tangga"],
"Dolly/Palette": ["pallet", "palette", "dolly"],
"Air Start Unit": ["asu", "air start unit"],
}
for unit, kws in mapping.items():
for kw in kws:
if kw in txt:
return unit
return "Other"
def _gse_status(record: Dict) -> str:
txt = " ".join(
[
str(record.get("Report", "")),
str(record.get("Root_Caused", "")),
str(record.get("Action_Taken", "")),
]
).lower()
if any(
k in txt
for k in [
"manual handling",
"manual tanpa alat",
"tanpa alat",
"manual bge",
"tanpa gse",
]
):
return "No Equipment/Manual"
if any(
k in txt
for k in [
"rfm",
"work order",
"wo ",
"wo-",
"maintenance",
"perawatan",
"perbaikan",
"servis",
"service",
"diperbaiki",
"menunggu spare",
"waiting spare",
]
):
return "Maintenance"
if any(
k in txt
for k in [
"unserviceable",
"u/s",
"tidak bisa",
"tidak berfungsi",
"macet",
"rusak",
"error",
"gagal",
"leak",
"bocor",
"flat",
"ban",
"trouble",
]
):
return "Unserviceable"
if any(
k in txt
for k in [
"serviceable",
"normal",
"ok",
"sudah diperbaiki",
"berfungsi",
"ready",
"siap",
]
):
return "Serviceable"
return "Unknown"
def _cgo_bucket(record: Dict) -> str:
txt = " ".join(
[
str(record.get("Report", "")),
str(record.get("Root_Caused", "")),
str(record.get("Action_Taken", "")),
]
).lower()
damage_kw = [
"damage",
"damaged",
"rusak",
"pecah",
"robek",
"torn",
"dented",
"broken",
"loss",
"hilang",
]
delivery_kw = [
"tidak terkirim",
"not delivered",
"misroute",
"missroute",
"missrout",
"salah alamat",
"wrong destination",
"door to door",
"d2d",
"keterlambatan kirim",
"late delivery",
"undelivered",
]
transport_kw = [
"transport",
"truck",
"truk",
"driver",
"kendaraan",
"armada",
"rute",
"route",
"jadwal",
"schedule",
"macet",
"traffic",
"bongkar muat",
"loading unloading",
"loading",
"unloading",
]
doc_kw = ["awb", "smu", "manifest", "dokumen", "document"]
if any(k in txt for k in damage_kw):
return "Damage"
if any(k in txt for k in delivery_kw):
return "Delivery"
if any(k in txt for k in transport_kw):
return "Transport"
if any(k in txt for k in doc_kw):
return "Documentation"
return "Other"
@app.get("/api/ai/cgo/irregularities/top")
async def cgo_irregularities_top(
bypass_cache: bool = False,
branch: Optional[str] = None,
top_k: int = 3,
examples_per_bucket: int = 2,
include_timeline: bool = True,
):
from data.sheets_service import GoogleSheetsService
from data.root_cause_service import get_root_cause_service
cache = get_cache() if not bypass_cache else None
sheets_service = GoogleSheetsService(cache=cache)
spreadsheet_id = os.getenv("GOOGLE_SHEET_ID")
if not spreadsheet_id:
raise HTTPException(status_code=500, detail="GOOGLE_SHEET_ID not configured")
batch = sheets_service.fetch_sheets_batch_data(
spreadsheet_id,
[{"name": "CGO", "range": "A1:Z5000"}],
bypass_cache=bypass_cache,
)
data = batch.get("CGO", [])
if branch:
b = branch.strip().lower()
data = [r for r in data if (str(r.get("Branch", "")).strip().lower() == b)]
items = []
for r in data:
cat = str(r.get("Irregularity_Complain_Category", "")).strip().lower()
if cat in {"cargo", "cgo"} or "cargo" in (str(r.get("Report", ""))).lower():
items.append(r)
svc = get_root_cause_service()
rc_records = []
for r in items:
rc_records.append(
{
"_row_id": r.get("_row_id"),
"Report": str(r.get("Report", "")),
"Root_Caused": str(r.get("Root_Caused", "")),
}
)
rc_map: Dict[str, Dict[str, Any]] = {}
try:
rc_res = svc.classify_batch(rc_records)
rc_map = {x.get("row_id"): x for x in rc_res}
except Exception:
rc_map = {}
totals = {"Damage": 0, "Delivery": 0, "Transport": 0, "Documentation": 0, "Other": 0}
buckets: Dict[str, Dict[str, Any]] = {
k: {"areas": Counter(), "branches": Counter(), "airlines": Counter(), "keywords": Counter(), "examples": [], "months": Counter()}
for k in totals.keys()
}
damage_kw = ["damage", "damaged", "rusak", "pecah", "robek", "torn", "dented", "broken", "loss", "hilang"]
delivery_kw = ["tidak terkirim", "not delivered", "misroute", "missroute", "salah alamat", "wrong destination", "door to door", "d2d", "keterlambatan kirim", "late delivery", "undelivered"]
transport_kw = ["transport", "truck", "truk", "driver", "kendaraan", "armada", "rute", "route", "jadwal", "schedule", "macet", "traffic", "bongkar muat", "loading unloading", "loading", "unloading"]
doc_kw = ["awb", "smu", "manifest", "dokumen", "document"]
def _month_key(r: Dict[str, Any]) -> Optional[str]:
from datetime import datetime
val = r.get("Date_of_Event") or r.get("dateOfEvent") or r.get("Date")
if not val:
return None
s = str(val).strip()
fmts = ["%B %d, %Y", "%b %d, %Y", "%Y-%m-%d", "%d/%m/%Y", "%m/%d/%Y"]
for f in fmts:
try:
d = datetime.strptime(s, f)
return f"{d.year:04d}-{d.month:02d}"
except Exception:
continue
try:
d = datetime.fromisoformat(s)
return f"{d.year:04d}-{d.month:02d}"
except Exception:
return None
def _bucket_with_ml(r: Dict[str, Any]) -> str:
b0 = _cgo_bucket(r)
rid = r.get("_row_id")
rc = rc_map.get(rid)
if not rc:
return b0
cat = str(rc.get("primary_category", "")).lower()
conf = float(rc.get("confidence") or 0.0)
mlb = None
if any(k in cat for k in ["damage", "rusak", "pecah", "robek", "dented", "broken", "loss", "hilang"]):
mlb = "Damage"
elif any(k in cat for k in ["delivery", "delivered", "undelivered", "misroute", "missroute", "door to door", "wrong destination"]):
mlb = "Delivery"
elif any(k in cat for k in ["transport", "truck", "truk", "driver", "armada", "rute", "route"]):
mlb = "Transport"
elif any(k in cat for k in ["document", "awb", "smu", "manifest", "dokumen"]):
mlb = "Documentation"
if mlb and (rc.get("method") == "ml" or conf >= 0.6):
return mlb
return b0
for r in items:
bucket = _bucket_with_ml(r)
totals[bucket] = totals.get(bucket, 0) + 1
area = str(r.get("Area", "") or "Unknown")
br = str(r.get("Branch", "") or "Unknown")
al = str(r.get("Airlines", "") or "Unknown")
txt = " ".join([str(r.get("Report", "")), str(r.get("Root_Caused", "")), str(r.get("Action_Taken", ""))]).lower()
bdata = buckets[bucket]
bdata["areas"][area] += 1
bdata["branches"][br] += 1
bdata["airlines"][al] += 1
for kw in damage_kw:
if kw in txt and bucket == "Damage":
bdata["keywords"][kw] += 1
for kw in delivery_kw:
if kw in txt and bucket == "Delivery":
bdata["keywords"][kw] += 1
for kw in transport_kw:
if kw in txt and bucket == "Transport":
bdata["keywords"][kw] += 1
for kw in doc_kw:
if kw in txt and bucket == "Documentation":
bdata["keywords"][kw] += 1
if len(bdata["examples"]) < max(0, int(examples_per_bucket)):
bdata["examples"].append(
{
"row_id": r.get("_row_id"),
"report": str(r.get("Report", ""))[:180],
"root_cause": str(r.get("Root_Caused", ""))[:180],
"action_taken": str(r.get("Action_Taken", ""))[:180],
"area": area,
"branch": br,
"airlines": al,
}
)
if include_timeline:
mk = _month_key(r)
if mk:
bdata["months"][mk] += 1
total_count = sum(totals.values())
by_bucket: Dict[str, Any] = {}
for k in ("Damage", "Delivery", "Transport", "Documentation"):
c = totals.get(k, 0)
pct = round((c / max(total_count, 1)) * 100, 1)
b = buckets[k]
recs: List[str] = []
if b["keywords"]:
kw_items = list(b["keywords"].items())
rules: List[Tuple[List[str], str]] = []
if k == "Damage":
rules = [
(["rusak", "pecah", "robek", "torn", "dented", "fragile", "broken"], "Perkuat SOP handling fragile, tambah signage dan verifikasi kemasan."),
(["loss", "hilang"], "Perketat kontrol rantai serah terima dan segel kemasan."),
]
if k == "Delivery":
rules = [
(["tidak terkirim", "not delivered", "undelivered", "door to door"], "Perbaiki alur D2D dan monitoring SLA last-mile."),
(["misroute", "missroute", "wrong destination"], "Validasi alamat/routing dan digitalisasi manifest."),
]
if k == "Transport":
rules = [
(["truck", "truk", "driver", "armada"], "Perkuat kontrol armada dan ketersediaan driver."),
(["macet", "traffic", "rute", "route", "jadwal"], "Optimalkan rute/jadwal dan siapkan buffer waktu."),
(["loading", "unloading", "bongkar muat"], "Standarisasi proses bongkar muat dan alat bantu."),
]
if k == "Documentation":
rules = [
(["awb", "smu", "manifest", "dokumen", "document"], "Validasi AWB/SMU sebelum dispatch dan checklist dokumen."),
]
scored: List[Tuple[int, str]] = []
for kws, msg in rules:
s = 0
for kw, cnt in kw_items:
for key in kws:
if key in kw:
s += cnt
if s > 0:
scored.append((s, msg))
scored.sort(key=lambda x: -x[0])
recs = [m for _, m in scored[:max(1, min(3, top_k))]]
by_bucket[k] = {
"count": c,
"percentage": pct,
"top_areas": dict(b["areas"].most_common(top_k)),
"top_branches": dict(b["branches"].most_common(top_k)),
"top_airlines": dict(b["airlines"].most_common(top_k)),
"top_keywords": dict(b["keywords"].most_common(top_k)),
"examples": b["examples"],
}
if include_timeline:
by_bucket[k]["timeline_monthly"] = dict(sorted(b["months"].items()))
by_bucket[k]["recommendations"] = recs
sorted_items = sorted([(k, v) for k, v in by_bucket.items()], key=lambda x: -x[1]["count"])[:top_k]
top = [{"subcategory": k, "count": v["count"], "percentage": v["percentage"]} for k, v in sorted_items]
return {
"total_records": len(data),
"cargo_irregularity_records": len(items),
"by_bucket": by_bucket,
"top": top,
}
@app.get("/api/ai/cgo/production")
async def cgo_production_summary(
bypass_cache: bool = False,
branch: Optional[str] = None,
include_timeline: bool = True,
top_k: int = 5,
scan_rows: int = 1000,
):
from data.sheets_service import GoogleSheetsService
cache = get_cache() if not bypass_cache else None
sheets_service = GoogleSheetsService(cache=cache)
spreadsheet_id = os.getenv("GOOGLE_SHEET_ID")
if not spreadsheet_id:
raise HTTPException(status_code=500, detail="GOOGLE_SHEET_ID not configured")
batch = sheets_service.fetch_sheets_batch_data(
spreadsheet_id,
[{"name": "CGO", "range": f"A1:Z{scan_rows}"}],
bypass_cache=bypass_cache,
)
rows = batch.get("CGO", [])
if branch:
b = branch.strip().lower()
rows = [r for r in rows if (str(r.get("Branch", "")).strip().lower() == b)]
def _is_num(x: Any) -> bool:
try:
float(str(x).replace(",", "").strip())
return True
except Exception:
return False
headers = set()
for r in rows[:200]:
headers.update(r.keys())
cand_kw = ["ton", "tonnage", "kg", "weight", "berat", "volume", "qty", "jumlah", "pcs", "colly"]
num_cols = []
for h in headers:
hl = h.lower()
if any(k in hl for k in cand_kw):
vals = [r.get(h) for r in rows[:500]]
nums = [v for v in vals if _is_num(v)]
if len(nums) >= max(5, int(0.2 * max(1, len(vals)))):
num_cols.append(h)
totals: Dict[str, float] = {}
for h in num_cols:
s = 0.0
for r in rows:
v = r.get(h)
if _is_num(v):
try:
s += float(str(v).replace(",", "").strip())
except Exception:
continue
totals[h] = round(s, 2)
areas = Counter()
branches = Counter()
airlines = Counter()
routes = Counter()
months = Counter()
def _month_key(r: Dict[str, Any]) -> Optional[str]:
from datetime import datetime
val = r.get("Date_of_Event") or r.get("dateOfEvent") or r.get("Date")
if not val:
return None
s = str(val).strip()
fmts = ["%B %d, %Y", "%b %d, %Y", "%Y-%m-%d", "%d/%m/%Y", "%m/%d/%Y"]
for f in fmts:
try:
d = datetime.strptime(s, f)
return f"{d.year:04d}-{d.month:02d}"
except Exception:
continue
try:
d = datetime.fromisoformat(s)
return f"{d.year:04d}-{d.month:02d}"
except Exception:
return None
for r in rows:
areas[str(r.get("Area", "") or "Unknown")] += 1
branches[str(r.get("Branch", "") or "Unknown")] += 1
airlines[str(r.get("Airlines", "") or "Unknown")] += 1
routes[str(r.get("Route", "") or "Unknown")] += 1
if include_timeline:
mk = _month_key(r)
if mk:
months[mk] += 1
summary = {
"records_scanned": len(rows),
"numeric_totals": totals,
"top_areas": dict(areas.most_common(top_k)),
"top_branches": dict(branches.most_common(top_k)),
"top_airlines": dict(airlines.most_common(top_k)),
"top_routes": dict(routes.most_common(top_k)),
}
if include_timeline:
summary["timeline_monthly_count"] = dict(sorted(months.items()))
if not totals:
summary["note"] = "Tidak ditemukan kolom produksi numerik; menampilkan proxy hitungan baris."
return summary
@app.get("/api/ai/cgo/cases")
async def cgo_cases(
bypass_cache: bool = False,
branch: Optional[str] = None,
bucket: Optional[str] = None,
date_from: Optional[str] = None,
date_to: Optional[str] = None,
limit: int = 50,
offset: int = 0,
include_severity: bool = True,
include_root_cause: bool = True,
):
from data.sheets_service import GoogleSheetsService
from data.root_cause_service import get_root_cause_service
cache = get_cache() if not bypass_cache else None
sheets_service = GoogleSheetsService(cache=cache)
spreadsheet_id = os.getenv("GOOGLE_SHEET_ID")
if not spreadsheet_id:
raise HTTPException(status_code=500, detail="GOOGLE_SHEET_ID not configured")
batch = sheets_service.fetch_sheets_batch_data(
spreadsheet_id,
[{"name": "CGO", "range": "A1:Z5000"}],
bypass_cache=bypass_cache,
)
rows = batch.get("CGO", [])
if branch:
b = branch.strip().lower()
rows = [r for r in rows if (str(r.get("Branch", "")).strip().lower() == b)]
cargo_rows = []
for r in rows:
cat = str(r.get("Irregularity_Complain_Category", "")).strip().lower()
if cat in {"cargo", "cgo"} or "cargo" in (str(r.get("Report", ""))).lower():
cargo_rows.append(r)
svc = get_root_cause_service()
rc_records = []
for r in cargo_rows:
rc_records.append(
{
"_row_id": r.get("_row_id"),
"Report": str(r.get("Report", "")),
"Root_Caused": str(r.get("Root_Caused", "")),
}
)
rc_map: Dict[str, Dict[str, Any]] = {}
try:
rc_res = svc.classify_batch(rc_records)
rc_map = {x.get("row_id"): x for x in rc_res}
except Exception:
rc_map = {}
def _bucket_with_ml(r: Dict[str, Any]) -> str:
b0 = _cgo_bucket(r)
rid = r.get("_row_id")
rc = rc_map.get(rid)
if not rc:
return b0
cat = str(rc.get("primary_category", "")).lower()
conf = float(rc.get("confidence") or 0.0)
mlb = None
if any(k in cat for k in ["damage", "rusak", "pecah", "robek", "dented", "broken", "loss", "hilang"]):
mlb = "Damage"
elif any(k in cat for k in ["delivery", "delivered", "undelivered", "misroute", "missroute", "door to door", "wrong destination"]):
mlb = "Delivery"
elif any(k in cat for k in ["transport", "truck", "truk", "driver", "armada", "rute", "route"]):
mlb = "Transport"
elif any(k in cat for k in ["document", "awb", "smu", "manifest", "dokumen"]):
mlb = "Documentation"
if mlb and (rc.get("method") == "ml" or conf >= 0.6):
return mlb
return b0
from datetime import datetime
def _parse_date(s: str) -> Optional[datetime]:
if not s:
return None
s = s.strip()
fmts = ["%B %d, %Y", "%b %d, %Y", "%Y-%m-%d", "%d/%m/%Y", "%m/%d/%Y"]
for f in fmts:
try:
return datetime.strptime(s, f)
except Exception:
continue
try:
return datetime.fromisoformat(s)
except Exception:
return None
df = _parse_date(date_from) if date_from else None
dt = _parse_date(date_to) if date_to else None
items: List[Dict[str, Any]] = []
for r in cargo_rows:
buck = _bucket_with_ml(r)
if bucket and buck.lower() != bucket.strip().lower():
continue
d = r.get("Date_of_Event") or r.get("dateOfEvent") or r.get("Date")
if df or dt:
pd = _parse_date(str(d)) if d else None
if df and (not pd or pd < df):
continue
if dt and (not pd or pd > dt):
continue
items.append(
{
"row_id": r.get("_row_id"),
"date": d,
"airlines": r.get("Airlines"),
"branch": r.get("Branch"),
"area": r.get("Area"),
"bucket": buck,
"report": str(r.get("Report", ""))[:220],
"root_cause_text": str(r.get("Root_Caused", ""))[:220],
}
)
total_filtered = len(items)
items = items[offset : offset + max(0, int(limit))]
if include_severity and items:
try:
from data.nlp_service import get_nlp_service
nlp = get_nlp_service()
texts = [f'{it.get("report","")}' for it in items]
sev = nlp.classify_severity(texts)
for it, sv in zip(items, sev):
it["severity"] = sv.get("severity")
it["severity_confidence"] = sv.get("confidence")
except Exception:
pass
if include_root_cause and items:
try:
svc2 = get_root_cause_service()
records = []
for it in items:
records.append(
{
"_row_id": it["row_id"],
"Report": it.get("report", ""),
"Root_Caused": it.get("root_cause_text", ""),
}
)
rc = svc2.classify_batch(records)
rc_map2 = {r.get("row_id"): r for r in rc}
for it in items:
rid = it["row_id"]
rcr = rc_map2.get(rid)
if rcr:
it["root_cause_primary"] = rcr.get("primary_category")
it["root_cause_confidence"] = rcr.get("confidence")
it["root_cause_method"] = rcr.get("method")
except Exception:
pass
return {
"total_records": len(rows),
"cargo_irregularity_records": len(cargo_rows),
"filtered_count": total_filtered,
"items": items,
}
@app.get("/api/ai/gse/serviceability")
async def get_gse_serviceability(
source: Optional[str] = None,
bypass_cache: bool = False,
branch: Optional[str] = None,
unit_filter: Optional[str] = None,
status_filter: Optional[str] = None,
top_k: int = 3,
examples_per_unit: int = 2,
include_timeline: bool = True,
):
from data.sheets_service import GoogleSheetsService
cache = get_cache() if not bypass_cache else None
sheets_service = GoogleSheetsService(cache=cache)
spreadsheet_id = os.getenv("GOOGLE_SHEET_ID")
if not spreadsheet_id:
raise HTTPException(status_code=500, detail="GOOGLE_SHEET_ID not configured")
data: List[Dict[str, Any]] = []
if source and source.upper() in {"NON CARGO", "CGO"}:
if source.upper() == "NON CARGO":
data.extend(
sheets_service.fetch_sheet_data(
spreadsheet_id, "NON CARGO", "A1:AA5000", bypass_cache=bypass_cache
)
)
else:
data.extend(
sheets_service.fetch_sheet_data(
spreadsheet_id, "CGO", "A1:Z5000", bypass_cache=bypass_cache
)
)
else:
batch = sheets_service.fetch_sheets_batch_data(
spreadsheet_id,
[
{"name": "NON CARGO", "range": "A1:AA5000"},
{"name": "CGO", "range": "A1:Z5000"},
],
bypass_cache=bypass_cache,
)
data.extend(batch.get("NON CARGO", []))
data.extend(batch.get("CGO", []))
if branch:
b = branch.strip().lower()
data = [r for r in data if (str(r.get("Branch", "")).strip().lower() == b)]
gse_data = [
r
for r in data
if str(r.get("Irregularity_Complain_Category", "")).strip().lower() == "gse"
or " gse " in f" {str(r.get('Report','')).lower()} "
]
from datetime import datetime
def _month_key(r: Dict[str, Any]) -> Optional[str]:
val = r.get("Date_of_Event") or r.get("dateOfEvent") or r.get("Date")
if not val:
return None
s = str(val).strip()
fmts = ["%B %d, %Y", "%b %d, %Y", "%Y-%m-%d", "%d/%m/%Y", "%m/%d/%Y"]
for f in fmts:
try:
d = datetime.strptime(s, f)
return f"{d.year:04d}-{d.month:02d}"
except Exception:
continue
try:
d = datetime.fromisoformat(s)
return f"{d.year:04d}-{d.month:02d}"
except Exception:
return None
overall_status = Counter()
units: Dict[str, Dict[str, Any]] = {}
for r in gse_data:
unit = _gse_unit(r)
status = _gse_status(r)
overall_status[status] += 1
if unit_filter and unit.lower() != unit_filter.strip().lower():
continue
if status_filter and status.lower() != status_filter.strip().lower():
continue
if unit not in units:
units[unit] = {
"statuses": Counter(),
"areas": Counter(),
"branches": Counter(),
"airlines": Counter(),
"keywords": Counter(),
"examples": [],
"months": Counter(),
}
u = units[unit]
u["statuses"][status] += 1
u["areas"][str(r.get("Area", "") or "Unknown")] += 1
u["branches"][str(r.get("Branch", "") or "Unknown")] += 1
u["airlines"][str(r.get("Airlines", "") or "Unknown")] += 1
txt = " ".join(
[str(r.get("Report", "")), str(r.get("Root_Caused", "")), str(r.get("Action_Taken", ""))]
).lower()
for kw in ["rfm", "wo", "hose", "leak", "bocor", "engine", "atw", "gpu", "tractor", "towbar", "empty cart", "understaff", "manual"]:
if kw in txt:
u["keywords"][kw] += 1
if len(u["examples"]) < max(0, int(examples_per_unit)):
u["examples"].append(
{
"row_id": r.get("_row_id"),
"date": r.get("Date_of_Event") or r.get("dateOfEvent") or r.get("Date"),
"unit": unit,
"status": status,
"report": str(r.get("Report", ""))[:180],
"root_cause": str(r.get("Root_Caused", ""))[:180],
"action_taken": str(r.get("Action_Taken", ""))[:180],
"area": str(r.get("Area", "") or "Unknown"),
"branch": str(r.get("Branch", "") or "Unknown"),
"airlines": str(r.get("Airlines", "") or "Unknown"),
}
)
if include_timeline:
mk = _month_key(r)
if mk:
u["months"][mk] += 1
total_units = sum(sum(v["statuses"].values()) for v in units.values())
by_unit: Dict[str, Any] = {}
for unit, v in units.items():
cnt = sum(v["statuses"].values())
by_unit[unit] = {
"count": cnt,
"percentage": round((cnt / max(total_units, 1)) * 100, 1),
"statuses": dict(v["statuses"]),
"top_areas": dict(v["areas"].most_common(top_k)),
"top_branches": dict(v["branches"].most_common(top_k)),
"top_airlines": dict(v["airlines"].most_common(top_k)),
"top_keywords": dict(v["keywords"].most_common(top_k)),
"examples": v["examples"],
}
if include_timeline:
by_unit[unit]["timeline_monthly"] = dict(sorted(v["months"].items()))
top_units = sorted([(k, v["count"]) for k, v in by_unit.items()], key=lambda x: -x[1])[:top_k]
return {
"total_records": len(data),
"gse_records": len(gse_data),
"overall_status": dict(overall_status),
"by_unit": by_unit,
"top_units": top_units,
}
@app.get("/api/ai/gse/cases")
async def get_gse_cases(
source: Optional[str] = None,
bypass_cache: bool = False,
branch: Optional[str] = None,
unit_type: Optional[str] = None,
status: Optional[str] = None,
bucket: Optional[str] = None,
date_from: Optional[str] = None,
date_to: Optional[str] = None,
limit: int = 50,
offset: int = 0,
include_severity: bool = True,
include_root_cause: bool = True,
):
from data.sheets_service import GoogleSheetsService
cache = get_cache() if not bypass_cache else None
sheets_service = GoogleSheetsService(cache=cache)
spreadsheet_id = os.getenv("GOOGLE_SHEET_ID")
if not spreadsheet_id:
raise HTTPException(status_code=500, detail="GOOGLE_SHEET_ID not configured")
data: List[Dict[str, Any]] = []
if source and source.upper() in {"NON CARGO", "CGO"}:
if source.upper() == "NON CARGO":
data.extend(
sheets_service.fetch_sheet_data(
spreadsheet_id, "NON CARGO", "A1:AA5000", bypass_cache=bypass_cache
)
)
else:
data.extend(
sheets_service.fetch_sheet_data(
spreadsheet_id, "CGO", "A1:Z5000", bypass_cache=bypass_cache
)
)
else:
batch = sheets_service.fetch_sheets_batch_data(
spreadsheet_id,
[
{"name": "NON CARGO", "range": "A1:AA5000"},
{"name": "CGO", "range": "A1:Z5000"},
],
bypass_cache=bypass_cache,
)
data.extend(batch.get("NON CARGO", []))
data.extend(batch.get("CGO", []))
if branch:
b = branch.strip().lower()
data = [r for r in data if (str(r.get("Branch", "")).strip().lower() == b)]
gse_data = [
r
for r in data
if str(r.get("Irregularity_Complain_Category", "")).strip().lower() == "gse"
or " gse " in f" {str(r.get('Report','')).lower()} "
]
from datetime import datetime
def _parse_date(s: str) -> Optional[datetime]:
if not s:
return None
s = s.strip()
fmts = ["%B %d, %Y", "%b %d, %Y", "%Y-%m-%d", "%d/%m/%Y", "%m/%d/%Y"]
for f in fmts:
try:
return datetime.strptime(s, f)
except Exception:
continue
try:
return datetime.fromisoformat(s)
except Exception:
return None
df = _parse_date(date_from) if date_from else None
dt = _parse_date(date_to) if date_to else None
items: List[Dict[str, Any]] = []
for r in gse_data:
unit = _gse_unit(r)
st = _gse_status(r)
buck = _gse_bucket(r)
if unit_type and unit.lower() != unit_type.strip().lower():
continue
if status and st.lower() != status.strip().lower():
continue
if bucket and buck.lower() != bucket.strip().lower():
continue
d = r.get("Date_of_Event") or r.get("dateOfEvent") or r.get("Date")
if df or dt:
pd = _parse_date(str(d)) if d else None
if df and (not pd or pd < df):
continue
if dt and (not pd or pd > dt):
continue
items.append(
{
"row_id": r.get("_row_id"),
"date": d,
"airlines": r.get("Airlines"),
"branch": r.get("Branch"),
"area": r.get("Area"),
"unit_type": unit,
"status": st,
"bucket": buck,
"report": str(r.get("Report", ""))[:220],
"root_cause_text": str(r.get("Root_Caused", ""))[:220],
"_idx": len(items),
}
)
total_filtered = len(items)
items = items[offset : offset + max(0, int(limit))]
if include_severity and items:
try:
from data.nlp_service import get_nlp_service
nlp = get_nlp_service()
texts = [f'{it.get("report","")}' for it in items]
sev = nlp.classify_severity(texts)
for it, sv in zip(items, sev):
it["severity"] = sv.get("severity")
it["severity_confidence"] = sv.get("confidence")
except Exception:
pass
if include_root_cause and items:
try:
from data.root_cause_service import get_root_cause_service
svc = get_root_cause_service()
records = []
for it in items:
records.append(
{
"_row_id": it["row_id"],
"Report": it.get("report", ""),
"Root_Caused": it.get("root_cause_text", ""),
}
)
rc = svc.classify_batch(records)
rc_map = {r.get("row_id"): r for r in rc}
for it in items:
rid = it["row_id"]
rcr = rc_map.get(rid)
if rcr:
it["root_cause_primary"] = rcr.get("primary_category")
it["root_cause_confidence"] = rcr.get("confidence")
it["root_cause_method"] = rcr.get("method")
except Exception:
pass
return {
"total_records": len(data),
"gse_records": len(gse_data),
"filtered_count": total_filtered,
"items": items,
}
@app.get("/api/ai/sheets/diagnostics")
async def sheets_diagnostics(bypass_cache: bool = False):
from data.sheets_service import GoogleSheetsService
cache = get_cache() if not bypass_cache else None
sheets_service = GoogleSheetsService(cache=cache)
spreadsheet_id = os.getenv("GOOGLE_SHEET_ID")
if not spreadsheet_id:
raise HTTPException(status_code=500, detail="GOOGLE_SHEET_ID not configured")
sample = sheets_service.fetch_sheets_batch_data(
spreadsheet_id,
[
{"name": "NON CARGO", "range": "A1:Z50"},
{"name": "CGO", "range": "A1:Z50"},
],
bypass_cache=bypass_cache,
)
def _headers(rows: List[Dict[str, Any]]) -> List[str]:
if not rows:
return []
return list(rows[0].keys())
return {
"authenticated": True,
"sheets": {
"NON CARGO": {
"rows": len(sample.get("NON CARGO", [])),
"headers": _headers(sample.get("NON CARGO", []))[:30],
},
"CGO": {
"rows": len(sample.get("CGO", [])),
"headers": _headers(sample.get("CGO", []))[:30],
},
},
}
# ============== Category Summarization Endpoints ==============
class CategorySummaryResponse(BaseModel):
status: str
category_type: str
summary: Dict[str, Any]
timestamp: str
@app.get("/api/ai/summarize", response_model=CategorySummaryResponse)
async def summarize_by_category(category: str = "all", bypass_cache: bool = False):
"""
Get summarized insights for Non-cargo and/or CGO categories
Query Parameters:
category: "non_cargo", "cgo", or "all" (default: "all")
bypass_cache: Skip cache and fetch fresh data (default: false)
Returns aggregated summary including:
- Severity distribution
- Top categories, airlines, hubs, branches
- Key insights and recommendations
- Common issues
- Monthly trends
"""
from data.category_summarization_service import get_category_summarization_service
from data.sheets_service import GoogleSheetsService
valid_categories = ["non_cargo", "cgo", "all"]
if category.lower() not in valid_categories:
raise HTTPException(
status_code=400,
detail=f"Invalid category. Must be one of: {valid_categories}",
)
cache = get_cache() if not bypass_cache else None
sheets_service = GoogleSheetsService(cache=cache)
spreadsheet_id = os.getenv("GOOGLE_SHEET_ID")
if not spreadsheet_id:
raise HTTPException(status_code=500, detail="GOOGLE_SHEET_ID not configured")
non_cargo = sheets_service.fetch_sheet_data(
spreadsheet_id, "NON CARGO", "A1:AA5000", bypass_cache=bypass_cache
)
cargo = sheets_service.fetch_sheet_data(
spreadsheet_id, "CGO", "A1:Z5000", bypass_cache=bypass_cache
)
for row in non_cargo:
row["_sheet_name"] = "NON CARGO"
for row in cargo:
row["_sheet_name"] = "CGO"
all_data = non_cargo + cargo
summarization_service = get_category_summarization_service()
summary = summarization_service.summarize_category(all_data, category.lower())
return CategorySummaryResponse(
status="success",
category_type=category.lower(),
summary=summary,
timestamp=datetime.now().isoformat(),
)
@app.get("/api/ai/summarize/non-cargo")
async def summarize_non_cargo(bypass_cache: bool = False):
"""Quick endpoint for Non-cargo summary"""
return await summarize_by_category(category="non_cargo", bypass_cache=bypass_cache)
@app.get("/api/ai/summarize/cgo")
async def summarize_cgo(bypass_cache: bool = False):
"""Quick endpoint for CGO (Cargo) summary"""
return await summarize_by_category(category="cgo", bypass_cache=bypass_cache)
@app.get("/api/ai/summarize/compare")
async def compare_categories(bypass_cache: bool = False):
"""Compare Non-cargo and CGO categories side by side"""
return await summarize_by_category(category="all", bypass_cache=bypass_cache)
# ============== Branch Analytics Endpoints ==============
@app.get("/api/ai/branch/summary")
async def branch_analytics_summary(category_type: Optional[str] = None):
"""
Get branch analytics summary
Args:
category_type: "landside_airside", "cgo", or None for both
"""
from data.branch_analytics_service import get_branch_analytics_service
service = get_branch_analytics_service()
return service.get_summary(category_type)
@app.get("/api/ai/branch/{branch_name}")
async def get_branch_metrics(branch_name: str, category_type: Optional[str] = None):
"""
Get metrics for a specific branch
Args:
branch_name: Branch name
category_type: "landside_airside", "cgo", or None for combined
"""
from data.branch_analytics_service import get_branch_analytics_service
service = get_branch_analytics_service()
data = service.get_branch(branch_name, category_type)
if not data:
raise HTTPException(status_code=404, detail=f"Branch '{branch_name}' not found")
return data
@app.get("/api/ai/branch/ranking")
async def branch_ranking(
category_type: Optional[str] = None,
sort_by: str = "risk_score",
limit: int = 20,
):
"""
Get branch ranking
Args:
category_type: "landside_airside", "cgo", or None for both
sort_by: Field to sort by (risk_score, total_issues, critical_high_count)
limit: Maximum branches to return
"""
from data.branch_analytics_service import get_branch_analytics_service
service = get_branch_analytics_service()
return service.get_ranking(category_type, sort_by, limit)
@app.get("/api/ai/branch/comparison")
async def branch_comparison():
"""Compare all branches across category types"""
from data.branch_analytics_service import get_branch_analytics_service
service = get_branch_analytics_service()
return service.get_comparison()
@app.post("/api/ai/branch/calculate")
async def calculate_branch_metrics(bypass_cache: bool = False):
"""Calculate branch metrics from Google Sheets data"""
from data.branch_analytics_service import get_branch_analytics_service
from data.sheets_service import GoogleSheetsService
cache = get_cache() if not bypass_cache else None
sheets_service = GoogleSheetsService(cache=cache)
spreadsheet_id = os.getenv("GOOGLE_SHEET_ID")
if not spreadsheet_id:
raise HTTPException(status_code=500, detail="GOOGLE_SHEET_ID not configured")
non_cargo = sheets_service.fetch_sheet_data(
spreadsheet_id, "NON CARGO", "A1:AA5000", bypass_cache=bypass_cache
)
cargo = sheets_service.fetch_sheet_data(
spreadsheet_id, "CGO", "A1:Z5000", bypass_cache=bypass_cache
)
for row in non_cargo:
row["_sheet_name"] = "NON CARGO"
for row in cargo:
row["_sheet_name"] = "CGO"
all_data = non_cargo + cargo
service = get_branch_analytics_service()
result = service.calculate_branch_metrics(all_data)
return {
"status": "success",
"records_processed": len(all_data),
"metrics": result,
}
# ============== Main ==============
if __name__ == "__main__":
import uvicorn
port = int(os.getenv("API_PORT", 8000))
uvicorn.run(app, host="0.0.0.0", port=port)