# gapura-ai-api / data / forecast_service.py
# Muhammad Ridzki Nugraha
# Upload folder using huggingface_hub
# 13c3f2c verified
"""
Forecasting Service for Gapura AI
Predicts future issue volumes and trends
"""
import os
import logging
import pickle
from typing import List, Dict, Any, Optional, Tuple
from collections import Counter, defaultdict
from datetime import datetime, timedelta
import math
logger = logging.getLogger(__name__)
class ForecastingService:
    """
    Time series forecasting for irregularity reports.

    Features:
    - Weekly issue volume forecasting (moving average)
    - Category trend prediction
    - Seasonal (month-of-year) pattern detection

    Aggregated history is kept in ``self.historical_data`` and persisted as a
    pickle file under ``<parent of this file's directory>/models``.
    """

    # Number of most recent weeks averaged to form the forecast baseline.
    _MOVING_AVERAGE_WINDOW = 4
    # Growth added per forecast step, as a fraction of the baseline.
    _GROWTH_PER_PERIOD = 0.02
    # Half-width of the forecast band, in multiples of the volatility.
    _BAND_WIDTH = 1.5
    # Percentage change beyond which a category is increasing/decreasing.
    _TREND_THRESHOLD_PCT = 20
    _MONTH_NAMES = {
        1: "January",
        2: "February",
        3: "March",
        4: "April",
        5: "May",
        6: "June",
        7: "July",
        8: "August",
        9: "September",
        10: "October",
        11: "November",
        12: "December",
    }

    def __init__(self):
        """Create the service and load any previously saved history."""
        self.historical_data: Dict[str, Any] = {}
        self.forecast_models: Dict[str, Any] = {}
        self._load_data()

    def _data_path(self) -> str:
        """Return the path of the pickle file used for persistence."""
        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        return os.path.join(base_dir, "models", "forecast_data.pkl")

    def _load_data(self):
        """Load historical data and models.

        Loading is best-effort: a missing file is ignored and any failure
        while reading it is logged as a warning, leaving the current
        (empty) state in place.
        """
        data_path = self._data_path()
        if not os.path.exists(data_path):
            return
        try:
            with open(data_path, "rb") as f:
                # NOTE(security): unpickling can execute arbitrary code.
                # This is only safe while the file is written exclusively
                # by _save_data(); never point it at untrusted input.
                data = pickle.load(f)
            self.historical_data = data.get("historical_data", {})
            self.forecast_models = data.get("forecast_models", {})
            logger.info("Forecast data loaded")
        except Exception as e:
            logger.warning(f"Failed to load forecast data: {e}")

    def _save_data(self):
        """Save historical data and models.

        Creates the target directory if needed. I/O errors propagate to
        the caller.
        """
        data_path = self._data_path()
        os.makedirs(os.path.dirname(data_path), exist_ok=True)
        payload = {
            "historical_data": self.historical_data,
            "forecast_models": self.forecast_models,
        }
        with open(data_path, "wb") as f:
            pickle.dump(payload, f)

    def _parse_timestamp(self, date_str: Any) -> Optional[datetime]:
        """Parse a date value into a single timestamp.

        Args:
            date_str: Value to parse, normally a date string.

        Returns:
            The parsed timestamp, or None when the value is empty, cannot
            be parsed, or does not parse to one scalar timestamp.
        """
        if not date_str:
            return None
        try:
            # Imported lazily so the module can be imported without pandas.
            import pandas as pd

            ts = pd.to_datetime(date_str, errors="coerce")
            # The isinstance check runs first so that pd.isna() is only
            # applied to scalar results.
            if not isinstance(ts, datetime) or pd.isna(ts):
                return None
            return ts
        except Exception as exc:
            # Best-effort parsing: an unparsable record is skipped by the
            # caller. Unlike a bare ``except``, this does not swallow
            # KeyboardInterrupt or SystemExit.
            logger.debug("Could not parse date %r: %s", date_str, exc)
            return None

    @staticmethod
    def _week_key(dt: datetime) -> str:
        """Return the ISO week key (``"<ISO year>-W<week>"``) for *dt*.

        The ISO year is used rather than the calendar year so that days
        around New Year land in the right week: 2024-12-30 belongs to
        2025-W01, not to a "2024-W01" that would sort before the rest of
        2024.
        """
        iso_year, iso_week = dt.isocalendar()[:2]
        return f"{iso_year}-W{iso_week:02d}"

    def _parse_date(self, date_str: str) -> Optional[Tuple[int, int, int]]:
        """Parse date string to (year, month, week).

        ``year`` and ``month`` are calendar values and ``week`` is the ISO
        week number, so near a year boundary ``week`` may belong to the
        adjacent ISO year.

        Returns:
            The tuple, or None when the value is empty or unparsable.
        """
        ts = self._parse_timestamp(date_str)
        if ts is None:
            return None
        return (ts.year, ts.month, ts.isocalendar()[1])

    def build_historical_data(self, records: List[Dict]):
        """
        Build historical data from records and persist it.

        Records whose "Date_of_Event" is missing or unparsable are skipped.

        Args:
            records: List of report dictionaries. The keys read are
                "Date_of_Event", "Irregularity_Complain_Category" and
                "Airlines".
        """
        logger.info(f"Building historical data from {len(records)} records...")

        # Aggregate by time periods
        monthly_data = defaultdict(lambda: defaultdict(int))
        weekly_data = defaultdict(lambda: defaultdict(int))
        category_monthly = defaultdict(lambda: defaultdict(int))
        airline_monthly = defaultdict(lambda: defaultdict(int))

        for record in records:
            ts = self._parse_timestamp(record.get("Date_of_Event", ""))
            if ts is None:
                continue

            month_key = f"{ts.year}-{ts.month:02d}"
            week_key = self._week_key(ts)
            category = record.get("Irregularity_Complain_Category", "Unknown")
            airline = record.get("Airlines", "Unknown")

            monthly_data[month_key]["total"] += 1
            weekly_data[week_key]["total"] += 1
            category_monthly[month_key][category] += 1
            airline_monthly[month_key][airline] += 1

        def as_plain(nested):
            # Store plain dicts so later reads cannot create keys by accident.
            return {key: dict(inner) for key, inner in nested.items()}

        self.historical_data = {
            "monthly": as_plain(monthly_data),
            "weekly": as_plain(weekly_data),
            "category_monthly": as_plain(category_monthly),
            "airline_monthly": as_plain(airline_monthly),
            "last_updated": datetime.now().isoformat(),
        }
        self._save_data()
        logger.info("Historical data built and saved")

    def forecast_issues(self, periods: int = 4) -> Dict[str, Any]:
        """
        Forecast issue volume for next periods.

        The baseline is the mean of the most recent weeks (up to four).
        Each later period adds 2% of the baseline, and the band is
        1.5 times the mean absolute week-to-week change. Only weeks present
        in the history are used; a week with no records is absent from the
        series rather than counted as zero.

        Args:
            periods: Number of periods to forecast (weeks)

        Returns:
            Dict with forecast values and confidence intervals, or a dict
            with an "error" key when fewer than three weeks are available.
        """
        if "weekly" not in self.historical_data:
            return {"error": "No historical data available"}
        weekly = self.historical_data["weekly"]

        # Week keys are zero-padded, so lexical order is chronological.
        sorted_weeks = sorted(weekly.keys())
        if len(sorted_weeks) < 3:
            return {"error": "Not enough historical data for forecasting"}

        values = [weekly[w].get("total", 0) for w in sorted_weeks]

        # Simple moving average forecast
        window = min(self._MOVING_AVERAGE_WINDOW, len(values))
        avg = sum(values[-window:]) / window

        # Volatility for the confidence interval. At least three values
        # exist here, so there are always at least two differences.
        diffs = [abs(curr - prev) for prev, curr in zip(values, values[1:])]
        volatility = sum(diffs) / len(diffs)

        forecasts = []
        for i in range(periods):
            # Add slight trend adjustment (2% growth per period)
            trend = self._GROWTH_PER_PERIOD * i * avg
            forecast_val = avg + trend
            lower = max(0, forecast_val - volatility * self._BAND_WIDTH)
            upper = forecast_val + volatility * self._BAND_WIDTH
            forecasts.append(
                {
                    "period": i + 1,
                    "predicted": round(forecast_val),
                    "lower_bound": round(lower),
                    "upper_bound": round(upper),
                    # Confidence decays with the horizon, floored at 0.5.
                    "confidence": max(0.5, 0.9 - i * 0.1),
                }
            )

        return {
            "forecasts": forecasts,
            "method": "moving_average",
            "baseline": round(avg, 1),
            "volatility": round(volatility, 1),
            "historical_periods": len(sorted_weeks),
        }

    def predict_category_trends(self) -> Dict[str, Any]:
        """
        Predict which categories will increase/decrease.

        For each category, the mean of the last two months is compared with
        the mean of all earlier months. A change above +20% is
        "increasing", below -20% is "decreasing", otherwise "stable". With
        only two months of history the direction is "unknown".

        Returns:
            Dict with category trend predictions ("trends", ordered by
            total issues descending, plus "increasing", "decreasing" and
            "stable" name lists), or a dict with an "error" key.
        """
        if "category_monthly" not in self.historical_data:
            return {"error": "No historical data available"}
        category_data = self.historical_data["category_monthly"]

        sorted_months = sorted(category_data.keys())
        if len(sorted_months) < 2:
            return {"error": "Not enough data for trend analysis"}

        # Sorted so tie ordering and the result lists are reproducible;
        # key=str tolerates non-string category values such as None.
        categories = sorted(
            {cat for month in category_data.values() for cat in month},
            key=str,
        )

        trends = {}
        for category in categories:
            values = [category_data[m].get(category, 0) for m in sorted_months]
            total = sum(values)
            if total == 0:
                continue

            recent_window = values[-2:]
            recent = sum(recent_window) / len(recent_window)

            if len(values) >= 3:
                earlier_window = values[:-2]
                earlier = sum(earlier_window) / len(earlier_window)
                if earlier > 0:
                    change_pct = ((recent - earlier) / earlier) * 100
                else:
                    # No earlier activity: treat any recent activity as +100%.
                    change_pct = 100 if recent > 0 else 0

                if change_pct > self._TREND_THRESHOLD_PCT:
                    direction = "increasing"
                elif change_pct < -self._TREND_THRESHOLD_PCT:
                    direction = "decreasing"
                else:
                    direction = "stable"
            else:
                direction = "unknown"
                change_pct = 0

            trends[category] = {
                "total_issues": total,
                "trend_direction": direction,
                "change_percentage": round(change_pct, 1),
                # Same two-month mean that drives the trend direction.
                "recent_avg": round(recent, 1),
            }

        # Sort by total issues (stable, so ties keep the sorted name order)
        sorted_trends = dict(
            sorted(trends.items(), key=lambda item: -item[1]["total_issues"])
        )

        def names_with(direction: str) -> List[Any]:
            return [
                name
                for name, info in sorted_trends.items()
                if info["trend_direction"] == direction
            ]

        return {
            "trends": sorted_trends,
            "increasing": names_with("increasing"),
            "decreasing": names_with("decreasing"),
            "stable": names_with("stable"),
        }

    def get_seasonal_patterns(self) -> Dict[str, Any]:
        """
        Detect seasonal patterns in the data.

        Monthly totals are averaged per month of the year across all years
        in the history. The three highest and three lowest averages are
        reported; with fewer than six distinct months the two lists
        overlap.

        Returns:
            Dict with "monthly_averages", "peak_months" and "low_months"
            (month names), or a dict with an "error" key.
        """
        if "monthly" not in self.historical_data:
            return {"error": "No historical data available"}
        monthly = self.historical_data["monthly"]

        # Aggregate by month number
        month_totals = defaultdict(int)
        month_counts = defaultdict(int)
        for month_key, data in monthly.items():
            try:
                month_num = int(month_key.split("-")[1])
                month_total = data.get("total", 0)
            except (AttributeError, IndexError, TypeError, ValueError):
                # Skip malformed keys or entries instead of failing.
                continue
            month_totals[month_num] += month_total
            month_counts[month_num] += 1

        month_averages = {
            month: month_totals[month] / month_counts[month]
            for month in range(1, 13)
            if month_counts[month] > 0
        }

        # Find peak and low months (both taken from the descending ranking)
        ranked = sorted(month_averages.items(), key=lambda item: -item[1])
        peak_months = [month for month, _ in ranked[:3]]
        low_months = [month for month, _ in ranked[-3:]]

        names = self._MONTH_NAMES
        return {
            "monthly_averages": {
                names.get(m, str(m)): round(avg, 1)
                for m, avg in month_averages.items()
            },
            "peak_months": [names.get(m, str(m)) for m in peak_months],
            "low_months": [names.get(m, str(m)) for m in low_months],
        }

    def get_forecast_summary(self) -> Dict[str, Any]:
        """Get comprehensive forecast summary.

        Combines the weekly forecast (at most four periods), the category
        trends and the seasonal patterns. Sections whose source method
        returned an error come back empty, except "seasonal_patterns",
        which carries the error dict itself.
        """
        issue_forecast = self.forecast_issues()
        category_trends = self.predict_category_trends()
        seasonal = self.get_seasonal_patterns()
        return {
            "issue_forecast": issue_forecast.get("forecasts", [])[:4],
            "category_trends": category_trends.get("trends", {}),
            "increasing_categories": category_trends.get("increasing", []),
            "seasonal_patterns": seasonal,
            "last_updated": self.historical_data.get("last_updated"),
        }
# Module-level cache holding the shared service instance (created lazily).
_forecast_service: Optional[ForecastingService] = None


def get_forecast_service() -> ForecastingService:
    """Return the shared ForecastingService, creating it on first use."""
    global _forecast_service
    if _forecast_service is not None:
        return _forecast_service
    _forecast_service = ForecastingService()
    return _forecast_service