# NOTE(review): removed build-tool artifacts ("Spaces:" / "Build error" lines) that were not part of this module.
"""
Forecasting Service for Gapura AI
Predicts future issue volumes and trends
"""
# Standard-library imports, grouped and sorted per PEP 8.
import logging
import math
import os
import pickle
from collections import Counter, defaultdict
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple

# Module-level logger; handlers/levels are configured by the application.
logger = logging.getLogger(__name__)
class ForecastingService:
    """
    Time series forecasting for irregularity reports

    Features:
    - Issue volume forecasting
    - Category trend prediction
    - Seasonal pattern detection
    - Anomaly detection in trends
    """

    def __init__(self):
        # Aggregated counts keyed by period strings ("YYYY-MM" / "YYYY-Wnn");
        # populated by build_historical_data() or restored from disk.
        self.historical_data = {}
        # Persisted alongside the data; reserved for fitted model artifacts.
        self.forecast_models = {}
        self._load_data()

    def _data_path(self) -> str:
        """Return the on-disk location of the pickled forecast state.

        Resolves to <parent-of-this-package>/models/forecast_data.pkl.
        """
        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        return os.path.join(base_dir, "models", "forecast_data.pkl")

    def _load_data(self):
        """Load historical data and models from disk, if a saved file exists."""
        data_path = self._data_path()
        if os.path.exists(data_path):
            try:
                # SECURITY NOTE(review): pickle.load is only acceptable here
                # because the file is produced by _save_data(); never point
                # this at untrusted input.
                with open(data_path, "rb") as f:
                    data = pickle.load(f)
                self.historical_data = data.get("historical_data", {})
                self.forecast_models = data.get("forecast_models", {})
                logger.info("Forecast data loaded")
            except Exception as e:
                # Best-effort load: a corrupt or unreadable file just means
                # we start from an empty state.
                logger.warning(f"Failed to load forecast data: {e}")

    def _save_data(self):
        """Persist historical data and models to disk (creates the dir if needed)."""
        data_path = self._data_path()
        os.makedirs(os.path.dirname(data_path), exist_ok=True)
        with open(data_path, "wb") as f:
            pickle.dump(
                {
                    "historical_data": self.historical_data,
                    "forecast_models": self.forecast_models,
                },
                f,
            )

    def _parse_date(self, date_str: str) -> Optional[Tuple[int, int, int]]:
        """Parse a date string to (year, month, ISO week) or None if unparseable."""
        if not date_str:
            return None
        try:
            # Local import so pandas is only required when dates are parsed.
            import pandas as pd

            dt = pd.to_datetime(date_str, errors="coerce")
            if pd.isna(dt):
                return None
            return (dt.year, dt.month, dt.isocalendar().week)
        except Exception:
            # Was a bare `except:`; narrowed so SystemExit/KeyboardInterrupt
            # are no longer swallowed.
            return None

    def build_historical_data(self, records: List[Dict]):
        """
        Build historical data from records and persist it.

        Args:
            records: List of report dictionaries; each may carry
                "Date_of_Event", "Irregularity_Complain_Category", "Airlines".
                Records with a missing/unparseable date are skipped.
        """
        logger.info(f"Building historical data from {len(records)} records...")
        # Aggregate counts per month, per ISO week, and per category/airline
        # within each month.
        monthly_data = defaultdict(lambda: defaultdict(int))
        weekly_data = defaultdict(lambda: defaultdict(int))
        category_monthly = defaultdict(lambda: defaultdict(int))
        airline_monthly = defaultdict(lambda: defaultdict(int))
        for record in records:
            date_str = record.get("Date_of_Event", "")
            parsed = self._parse_date(date_str)
            if not parsed:
                continue  # no usable date -> record cannot be bucketed
            year, month, week = parsed
            month_key = f"{year}-{month:02d}"
            week_key = f"{year}-W{week:02d}"
            category = record.get("Irregularity_Complain_Category", "Unknown")
            airline = record.get("Airlines", "Unknown")
            monthly_data[month_key]["total"] += 1
            weekly_data[week_key]["total"] += 1
            category_monthly[month_key][category] += 1
            airline_monthly[month_key][airline] += 1
        # Store plain dicts so the pickle does not capture defaultdict lambdas.
        self.historical_data = {
            "monthly": dict(monthly_data),
            "weekly": dict(weekly_data),
            "category_monthly": dict(category_monthly),
            "airline_monthly": dict(airline_monthly),
            "last_updated": datetime.now().isoformat(),
        }
        self._save_data()
        logger.info("Historical data built and saved")

    def forecast_issues(self, periods: int = 4) -> Dict[str, Any]:
        """
        Forecast issue volume for the next periods using a moving average.

        Args:
            periods: Number of periods to forecast (weeks)
        Returns:
            Dict with forecast values and confidence intervals, or an
            {"error": ...} dict when there is insufficient history.
        """
        if "weekly" not in self.historical_data:
            return {"error": "No historical data available"}
        weekly = self.historical_data["weekly"]
        # "YYYY-Wnn" keys sort chronologically as strings.
        sorted_weeks = sorted(weekly.keys())
        if len(sorted_weeks) < 3:
            return {"error": "Not enough historical data for forecasting"}
        values = [weekly[w].get("total", 0) for w in sorted_weeks]
        # Baseline = moving average over up to the last 4 weeks.
        window = min(4, len(values))
        avg = sum(values[-window:]) / window
        # Volatility = mean absolute week-over-week change; drives the
        # confidence interval width.
        if len(values) >= 2:
            diffs = [abs(values[i] - values[i - 1]) for i in range(1, len(values))]
            volatility = sum(diffs) / len(diffs)
        else:
            volatility = avg * 0.3
        forecasts = []
        for i in range(periods):
            # Slight upward drift: 2% of baseline per period ahead.
            trend = 0.02 * i * avg
            forecast_val = avg + trend
            lower = max(0, forecast_val - volatility * 1.5)
            upper = forecast_val + volatility * 1.5
            forecasts.append(
                {
                    "period": i + 1,
                    "predicted": round(forecast_val),
                    "lower_bound": round(lower),
                    "upper_bound": round(upper),
                    # Confidence decays 0.1 per period, floored at 0.5.
                    "confidence": max(0.5, 0.9 - i * 0.1),
                }
            )
        return {
            "forecasts": forecasts,
            "method": "moving_average",
            "baseline": round(avg, 1),
            "volatility": round(volatility, 1),
            "historical_periods": len(sorted_weeks),
        }

    def predict_category_trends(self) -> Dict[str, Any]:
        """
        Predict which categories will increase/decrease.

        Compares the mean of the last 2 months against the mean of all
        earlier months; a +/-20% change classifies the direction.

        Returns:
            Dict with per-category trends plus increasing/decreasing/stable
            category lists, or an {"error": ...} dict.
        """
        if "category_monthly" not in self.historical_data:
            return {"error": "No historical data available"}
        category_data = self.historical_data["category_monthly"]
        sorted_months = sorted(category_data.keys())
        if len(sorted_months) < 2:
            return {"error": "Not enough data for trend analysis"}
        trends = {}
        for category in set(
            cat for month in category_data.values() for cat in month.keys()
        ):
            values = [category_data[m].get(category, 0) for m in sorted_months]
            if sum(values) == 0:
                continue  # category never actually occurred
            if len(values) >= 3:
                recent = sum(values[-2:]) / 2
                earlier = sum(values[:-2]) / max(len(values) - 2, 1)
                if earlier > 0:
                    change_pct = ((recent - earlier) / earlier) * 100
                else:
                    # Appeared from nothing: treat as a 100% jump.
                    change_pct = 100 if recent > 0 else 0
                if change_pct > 20:
                    direction = "increasing"
                elif change_pct < -20:
                    direction = "decreasing"
                else:
                    direction = "stable"
            else:
                direction = "unknown"
                change_pct = 0
            trends[category] = {
                "total_issues": sum(values),
                "trend_direction": direction,
                "change_percentage": round(change_pct, 1),
                # NOTE(review): despite the key name this is the latest
                # month's count, not an average; kept for schema stability.
                "recent_avg": round(values[-1], 1) if values else 0,
            }
        # Most impactful (highest total) categories first.
        sorted_trends = dict(
            sorted(trends.items(), key=lambda x: -x[1]["total_issues"])
        )
        return {
            "trends": sorted_trends,
            "increasing": [
                k for k, v in trends.items() if v["trend_direction"] == "increasing"
            ],
            "decreasing": [
                k for k, v in trends.items() if v["trend_direction"] == "decreasing"
            ],
            "stable": [
                k for k, v in trends.items() if v["trend_direction"] == "stable"
            ],
        }

    def get_seasonal_patterns(self) -> Dict[str, Any]:
        """
        Detect seasonal patterns in the data.

        Returns:
            Dict with per-month averages and the top-3 peak / bottom-3 low
            months (by average volume), or an {"error": ...} dict.
        """
        if "monthly" not in self.historical_data:
            return {"error": "No historical data available"}
        monthly = self.historical_data["monthly"]
        # Collapse "YYYY-MM" keys onto the month number (1..12) across years.
        month_totals = defaultdict(int)
        month_counts = defaultdict(int)
        for month_key, data in monthly.items():
            try:
                month_num = int(month_key.split("-")[1])
                month_totals[month_num] += data.get("total", 0)
                month_counts[month_num] += 1
            except (ValueError, IndexError):
                # Was a bare `except:`; only malformed keys are skipped now.
                continue
        month_averages = {}
        for month in range(1, 13):
            if month_counts[month] > 0:
                month_averages[month] = month_totals[month] / month_counts[month]
        if month_averages:
            sorted_months = sorted(month_averages.items(), key=lambda x: -x[1])
            peak_months = [m[0] for m in sorted_months[:3]]
            low_months = [m[0] for m in sorted_months[-3:]]
        else:
            peak_months = []
            low_months = []
        month_names = {
            1: "January",
            2: "February",
            3: "March",
            4: "April",
            5: "May",
            6: "June",
            7: "July",
            8: "August",
            9: "September",
            10: "October",
            11: "November",
            12: "December",
        }
        return {
            "monthly_averages": {
                month_names.get(m, str(m)): round(avg, 1)
                for m, avg in month_averages.items()
            },
            "peak_months": [month_names.get(m, str(m)) for m in peak_months],
            "low_months": [month_names.get(m, str(m)) for m in low_months],
        }

    def get_forecast_summary(self) -> Dict[str, Any]:
        """Get comprehensive forecast summary combining all analyses."""
        issue_forecast = self.forecast_issues()
        category_trends = self.predict_category_trends()
        seasonal = self.get_seasonal_patterns()
        return {
            "issue_forecast": issue_forecast.get("forecasts", [])[:4],
            "category_trends": category_trends.get("trends", {}),
            "increasing_categories": category_trends.get("increasing", []),
            "seasonal_patterns": seasonal,
            "last_updated": self.historical_data.get("last_updated"),
        }
# Lazily-created singleton instance (string annotation avoids evaluating the
# class name before the module has finished importing).
_forecast_service: Optional["ForecastingService"] = None


def get_forecast_service() -> ForecastingService:
    """Return the process-wide ForecastingService, creating it on first use.

    Not thread-safe; concurrent first calls could construct two instances.
    """
    global _forecast_service
    if _forecast_service is None:
        _forecast_service = ForecastingService()
    return _forecast_service