""" Forecasting Service for Gapura AI Predicts future issue volumes and trends """ import os import logging import pickle from typing import List, Dict, Any, Optional, Tuple from collections import Counter, defaultdict from datetime import datetime, timedelta import math logger = logging.getLogger(__name__) class ForecastingService: """ Time series forecasting for irregularity reports Features: - Issue volume forecasting - Category trend prediction - Seasonal pattern detection - Anomaly detection in trends """ def __init__(self): self.historical_data = {} self.forecast_models = {} self._load_data() def _load_data(self): """Load historical data and models""" base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) data_path = os.path.join(base_dir, "models", "forecast_data.pkl") if os.path.exists(data_path): try: with open(data_path, "rb") as f: data = pickle.load(f) self.historical_data = data.get("historical_data", {}) self.forecast_models = data.get("forecast_models", {}) logger.info("Forecast data loaded") except Exception as e: logger.warning(f"Failed to load forecast data: {e}") def _save_data(self): """Save historical data and models""" base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) data_path = os.path.join(base_dir, "models", "forecast_data.pkl") os.makedirs(os.path.dirname(data_path), exist_ok=True) with open(data_path, "wb") as f: pickle.dump( { "historical_data": self.historical_data, "forecast_models": self.forecast_models, }, f, ) def _parse_date(self, date_str: str) -> Optional[Tuple[int, int, int]]: """Parse date string to (year, month, week)""" if not date_str: return None try: import pandas as pd dt = pd.to_datetime(date_str, errors="coerce") if pd.isna(dt): return None return (dt.year, dt.month, dt.isocalendar().week) except: return None def build_historical_data(self, records: List[Dict]): """ Build historical data from records Args: records: List of report dictionaries """ logger.info(f"Building historical data from {len(records)} records...") # Aggregate by time periods monthly_data = defaultdict(lambda: defaultdict(int)) weekly_data = defaultdict(lambda: defaultdict(int)) category_monthly = defaultdict(lambda: defaultdict(int)) airline_monthly = defaultdict(lambda: defaultdict(int)) for record in records: date_str = record.get("Date_of_Event", "") parsed = self._parse_date(date_str) if not parsed: continue year, month, week = parsed month_key = f"{year}-{month:02d}" week_key = f"{year}-W{week:02d}" category = record.get("Irregularity_Complain_Category", "Unknown") airline = record.get("Airlines", "Unknown") monthly_data[month_key]["total"] += 1 weekly_data[week_key]["total"] += 1 category_monthly[month_key][category] += 1 airline_monthly[month_key][airline] += 1 # Store self.historical_data = { "monthly": dict(monthly_data), "weekly": dict(weekly_data), "category_monthly": dict(category_monthly), "airline_monthly": dict(airline_monthly), "last_updated": datetime.now().isoformat(), } self._save_data() logger.info("Historical data built and saved") def forecast_issues(self, periods: int = 4) -> Dict[str, Any]: """ Forecast issue volume for next periods Args: periods: Number of periods to forecast (weeks) Returns: Dict with forecast values and confidence intervals """ if "weekly" not in self.historical_data: return {"error": "No historical data available"} weekly = self.historical_data["weekly"] # Sort by week sorted_weeks = sorted(weekly.keys()) if len(sorted_weeks) < 3: return {"error": "Not enough historical data for forecasting"} # Get values values = [weekly[w].get("total", 0) for w in sorted_weeks] # Simple moving average forecast window = min(4, len(values)) avg = sum(values[-window:]) / window # Calculate volatility for confidence interval if len(values) >= 2: diffs = [abs(values[i] - values[i - 1]) for i in range(1, len(values))] volatility = sum(diffs) / len(diffs) else: volatility = avg * 0.3 # Generate forecast forecasts = [] for i in range(periods): # Add slight trend adjustment trend = 0.02 * i * avg # 2% growth per period forecast_val = avg + trend lower = max(0, forecast_val - volatility * 1.5) upper = forecast_val + volatility * 1.5 forecasts.append( { "period": i + 1, "predicted": round(forecast_val), "lower_bound": round(lower), "upper_bound": round(upper), "confidence": max(0.5, 0.9 - i * 0.1), } ) return { "forecasts": forecasts, "method": "moving_average", "baseline": round(avg, 1), "volatility": round(volatility, 1), "historical_periods": len(sorted_weeks), } def predict_category_trends(self) -> Dict[str, Any]: """ Predict which categories will increase/decrease Returns: Dict with category trend predictions """ if "category_monthly" not in self.historical_data: return {"error": "No historical data available"} category_data = self.historical_data["category_monthly"] # Sort months sorted_months = sorted(category_data.keys()) if len(sorted_months) < 2: return {"error": "Not enough data for trend analysis"} # Calculate trends trends = {} for category in set( cat for month in category_data.values() for cat in month.keys() ): values = [category_data[m].get(category, 0) for m in sorted_months] if sum(values) == 0: continue # Calculate trend direction if len(values) >= 3: recent = sum(values[-2:]) / 2 earlier = sum(values[:-2]) / max(len(values) - 2, 1) if earlier > 0: change_pct = ((recent - earlier) / earlier) * 100 else: change_pct = 100 if recent > 0 else 0 if change_pct > 20: direction = "increasing" elif change_pct < -20: direction = "decreasing" else: direction = "stable" else: direction = "unknown" change_pct = 0 trends[category] = { "total_issues": sum(values), "trend_direction": direction, "change_percentage": round(change_pct, 1), "recent_avg": round(values[-1], 1) if values else 0, } # Sort by total issues sorted_trends = dict( sorted(trends.items(), key=lambda x: -x[1]["total_issues"]) ) return { "trends": sorted_trends, "increasing": [ k for k, v in trends.items() if v["trend_direction"] == "increasing" ], "decreasing": [ k for k, v in trends.items() if v["trend_direction"] == "decreasing" ], "stable": [ k for k, v in trends.items() if v["trend_direction"] == "stable" ], } def get_seasonal_patterns(self) -> Dict[str, Any]: """ Detect seasonal patterns in the data Returns: Dict with seasonal patterns """ if "monthly" not in self.historical_data: return {"error": "No historical data available"} monthly = self.historical_data["monthly"] # Aggregate by month number month_totals = defaultdict(int) month_counts = defaultdict(int) for month_key, data in monthly.items(): try: month_num = int(month_key.split("-")[1]) month_totals[month_num] += data.get("total", 0) month_counts[month_num] += 1 except: continue # Calculate averages month_averages = {} for month in range(1, 13): if month_counts[month] > 0: month_averages[month] = month_totals[month] / month_counts[month] # Find peak and low months if month_averages: sorted_months = sorted(month_averages.items(), key=lambda x: -x[1]) peak_months = [m[0] for m in sorted_months[:3]] low_months = [m[0] for m in sorted_months[-3:]] else: peak_months = [] low_months = [] month_names = { 1: "January", 2: "February", 3: "March", 4: "April", 5: "May", 6: "June", 7: "July", 8: "August", 9: "September", 10: "October", 11: "November", 12: "December", } return { "monthly_averages": { month_names.get(m, str(m)): round(avg, 1) for m, avg in month_averages.items() }, "peak_months": [month_names.get(m, str(m)) for m in peak_months], "low_months": [month_names.get(m, str(m)) for m in low_months], } def get_forecast_summary(self) -> Dict[str, Any]: """Get comprehensive forecast summary""" issue_forecast = self.forecast_issues() category_trends = self.predict_category_trends() seasonal = self.get_seasonal_patterns() return { "issue_forecast": issue_forecast.get("forecasts", [])[:4], "category_trends": category_trends.get("trends", {}), "increasing_categories": category_trends.get("increasing", []), "seasonal_patterns": seasonal, "last_updated": self.historical_data.get("last_updated"), } _forecast_service: Optional[ForecastingService] = None def get_forecast_service() -> ForecastingService: """Get singleton forecast service instance""" global _forecast_service if _forecast_service is None: _forecast_service = ForecastingService() return _forecast_service