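"""Advanced analytics over a tabular dataset: detects date and metric
columns, builds a monthly time series, and reports trend, anomalies,
forecasts, and statistical tests."""
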
import math
import warnings
from typing import Dict, List, Optional

import numpy as np
import pandas as pd
from scipy import stats

try:
    from statsmodels.tsa.holtwinters import ExponentialSmoothing
    from statsmodels.tsa.stattools import adfuller
    HAS_STATSMODELS = True
except ImportError:
    HAS_STATSMODELS = False

DATE_HINTS = ("date", "time", "month", "year", "period")
METRIC_HINTS = ("sale", "amount", "revenue", "profit", "price", "total")
MAX_POINTS = 60


def _detect_datetime_column(df: pd.DataFrame) -> Optional[str]:
    """Return the first column usable as a datetime axis, or None."""
    # Prefer columns already datetime typed
    for col in df.columns:
        if pd.api.types.is_datetime64_any_dtype(df[col]):
            return col
    # Fall back to columns whose names hint at date/time
    for col in df.columns:
        low = str(col).lower()
        if any(hint in low for hint in DATE_HINTS):
            try:
                pd.to_datetime(df[col])
                return col
            except Exception:
                continue
    return None


def _detect_metric_column(df: pd.DataFrame) -> Optional[str]:
    """Return the most likely metric column: name-hinted, else first numeric."""
    numeric_cols = [c for c in df.columns if pd.api.types.is_numeric_dtype(df[c])]
    if not numeric_cols:
        return None
    for col in numeric_cols:
        low = str(col).lower()
        if any(hint in low for hint in METRIC_HINTS):
            return col
    return numeric_cols[0]


def _build_time_series(df: pd.DataFrame) -> Optional[pd.Series]:
    """Aggregate the detected metric by calendar month; keep the last MAX_POINTS."""
    if df is None or df.empty:
        return None
    date_col = _detect_datetime_column(df)
    metric_col = _detect_metric_column(df)
    if not date_col or not metric_col:
        return None
    ts = df[[date_col, metric_col]].copy()
    ts[date_col] = pd.to_datetime(ts[date_col], errors="coerce")
    ts = ts.dropna(subset=[date_col, metric_col])
    if ts.empty:
        return None
    # Aggregate by month for smoother signals
    ts["period"] = ts[date_col].dt.to_period("M").dt.to_timestamp()
    grouped = ts.groupby("period")[metric_col].sum().sort_index()
    if len(grouped) > MAX_POINTS:
        grouped = grouped.iloc[-MAX_POINTS:]
    return grouped
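
# Note: unlike series.resample("MS").sum(), the groupby-on-period aggregation
# above leaves months with no rows absent rather than zero-filled; with such
# gaps pd.infer_freq() in _forecast_next_periods returns None and its "MS"
# fallback is used.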


def _linear_trend(series: pd.Series) -> Optional[Dict[str, object]]:
    """Fit an OLS line and report slope CI, fit quality, and prediction intervals."""
    if series is None or len(series) < 3:
        return None
    x = np.arange(len(series))
    y = series.values.astype(float)
    # Linear regression with confidence intervals
    slope, intercept = np.polyfit(x, y, 1)
    y_pred = slope * x + intercept
    # Residual standard error (n - 2 degrees of freedom for simple regression)
    residuals = y - y_pred
    n = len(y)
    degrees_freedom = n - 2
    residual_std_error = np.sqrt(np.sum(residuals**2) / degrees_freedom) if degrees_freedom > 0 else 0.0
    # Standard error of the slope
    x_mean = np.mean(x)
    ss_x = np.sum((x - x_mean) ** 2)
    se_slope = residual_std_error / np.sqrt(ss_x) if ss_x > 0 else 0.0
    # 95% confidence interval for the slope
    t_val = stats.t.ppf(0.975, degrees_freedom) if degrees_freedom > 0 else 1.96
    slope_ci_lower = slope - t_val * se_slope
    slope_ci_upper = slope + t_val * se_slope
    # Prediction intervals along the fitted line
    prediction_intervals = []
    for i in range(n):
        se_pred = residual_std_error * np.sqrt(1 + 1 / n + (x[i] - x_mean) ** 2 / ss_x)
        prediction_intervals.append({
            "lower": float(y_pred[i] - t_val * se_pred),
            "upper": float(y_pred[i] + t_val * se_pred),
        })
    start_val = float(y[0])
    end_val = float(y[-1])
    change_pct = None
    if not math.isclose(start_val, 0.0):
        change_pct = ((end_val - start_val) / abs(start_val)) * 100
    # Call the trend directional only if the slope exceeds 2% of the mean
    # level per period; abs() keeps the threshold positive for negative means
    threshold = 0.02 * abs(np.mean(y))
    direction = "flat"
    if slope > threshold:
        direction = "upward"
    elif slope < -threshold:
        direction = "downward"
    summary = f"{direction.capitalize()} trend detected" if direction != "flat" else "Minimal trend detected"
    if change_pct is not None:
        summary += f" ({change_pct:+.1f}% over period)"
    summary += f" with 95% confidence [slope: {slope_ci_lower:.2f} to {slope_ci_upper:.2f}]"
    return {
        "summary": summary,
        "start": start_val,
        "end": end_val,
        "slope": float(slope),
        "slope_ci_lower": float(slope_ci_lower),
        "slope_ci_upper": float(slope_ci_upper),
        "std_error": float(residual_std_error),
        "r_squared": float(1 - np.sum(residuals**2) / np.sum((y - np.mean(y)) ** 2)) if np.sum((y - np.mean(y)) ** 2) > 0 else 0.0,
        "change_pct": change_pct,
        "points": [
            {"period": period.strftime("%Y-%m"), "value": float(value)}
            for period, value in series.items()
        ],
        "prediction_intervals": prediction_intervals,
    }
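
# Note: scipy.stats.linregress(x, y) would return the same slope, intercept,
# and slope standard error (its `stderr` uses the same n-2 d.o.f. formula as
# se_slope above); the manual computation additionally exposes the
# intermediate terms (x_mean, ss_x, residual_std_error) that the prediction
# intervals reuse.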


def _anomaly_scan(series: pd.Series) -> Optional[Dict[str, object]]:
    """Flag points at least 2 standard deviations from the series mean."""
    if series is None or len(series) < 4:
        return None
    values = series.values.astype(float)
    mean = float(np.mean(values))
    std = float(np.std(values))
    if math.isclose(std, 0.0):
        return None
    z_scores = (values - mean) / std
    anomalies: List[Dict[str, object]] = []
    for idx, z in enumerate(z_scores):
        if abs(z) >= 2.0:
            period = series.index[idx]
            anomalies.append(
                {
                    "period": period.strftime("%Y-%m"),
                    "value": float(values[idx]),
                    "z_score": float(z),
                }
            )
    if not anomalies:
        return None
    top = sorted(anomalies, key=lambda a: abs(a["z_score"]), reverse=True)[:3]
    summary = "Anomalies detected at " + ", ".join(
        f"{a['period']} (z={a['z_score']:+.1f})" for a in top
    )
    return {"summary": summary, "anomalies": anomalies, "mean": mean, "std": std}


def run_advanced_analytics(df: pd.DataFrame) -> Dict[str, Optional[Dict[str, object]]]:
    """Run every analysis over the detected time series; entries are None
    when an analysis is not applicable (too few points, no signal)."""
    series = _build_time_series(df)
    trend = _linear_trend(series)
    anomaly = _anomaly_scan(series)
    forecast = _forecast_next_periods(series)
    statistical_tests = _run_statistical_tests(series)
    return {
        "trend": trend,
        "anomaly": anomaly,
        "forecast": forecast,
        "statistical_tests": statistical_tests,
    }


def _forecast_next_periods(series: pd.Series, periods: int = 3) -> Optional[Dict[str, object]]:
    """Generate forecasts using exponential smoothing with prediction intervals."""
    if series is None or len(series) < 6:
        return None
    if not HAS_STATSMODELS:
        return {
            "summary": "Forecasting unavailable (statsmodels not installed)",
            "forecasts": [],
        }
    try:
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            # Holt's exponential smoothing (additive trend, no seasonality)
            model = ExponentialSmoothing(
                series.values,
                trend="add",
                seasonal=None,
                initialization_method="estimated",
            )
            fitted = model.fit(optimized=True, remove_bias=False)
        # Generate forecasts
        forecast_values = fitted.forecast(steps=periods)
        # Approximate prediction intervals from the in-sample residual std
        residuals = series.values - fitted.fittedvalues
        residual_std = np.std(residuals)
        # Generate future periods
        last_period = series.index[-1]
        freq = pd.infer_freq(series.index) or "MS"
        future_periods = pd.date_range(start=last_period, periods=periods + 1, freq=freq)[1:]
        forecasts = []
        for i, (period, value) in enumerate(zip(future_periods, forecast_values)):
            # 95% interval widens with the forecast horizon
            interval_width = residual_std * np.sqrt(i + 1) * 1.96
            forecasts.append({
                "period": period.strftime("%Y-%m"),
                "value": float(value),
                "lower_bound": float(value - interval_width),
                "upper_bound": float(value + interval_width),
            })
        summary = f"Forecast for next {periods} periods using exponential smoothing"
        if forecasts:
            first_forecast = forecasts[0]["value"]
            last_actual = float(series.values[-1])
            change = ((first_forecast - last_actual) / abs(last_actual)) * 100 if last_actual != 0 else 0.0
            summary += f" (next period: {first_forecast:.1f}, {change:+.1f}% vs current)"
        return {
            "summary": summary,
            "method": "Exponential Smoothing (Holt)",
            "forecasts": forecasts,
| "model_params": { | |
| "alpha": float(fitted.params.get('smoothing_level', 0)), | |
| "beta": float(fitted.params.get('smoothing_trend', 0)) if fitted.params.get('smoothing_trend') else None, | |
| } | |
| } | |
    except Exception as e:
        return {
            "summary": f"Forecasting failed: {str(e)[:100]}",
            "forecasts": [],
        }


def _run_statistical_tests(series: pd.Series) -> Optional[Dict[str, object]]:
    """Run statistical comparison tests on time series data."""
    if series is None or len(series) < 6:
        return None
    results = {}
    # Split into two halves for comparison (first half vs second half)
    mid = len(series) // 2
    first_half = series.values[:mid]
    second_half = series.values[mid:]
    # T-test: are the two periods significantly different?
    try:
        t_stat, p_value = stats.ttest_ind(first_half, second_half)
        results["period_comparison"] = {
            "test": "Independent t-test",
            "comparison": "First half vs second half",
            "t_statistic": float(t_stat),
            "p_value": float(p_value),
            "significant": p_value < 0.05,
            "summary": f"{'Significant' if p_value < 0.05 else 'No significant'} difference between periods (p={p_value:.4f})",
        }
    except Exception:
        pass
    # Test for stationarity (Augmented Dickey-Fuller test)
    if HAS_STATSMODELS:
        try:
            adf_result = adfuller(series.values, autolag="AIC")
            results["stationarity"] = {
                "test": "Augmented Dickey-Fuller",
                "adf_statistic": float(adf_result[0]),
                "p_value": float(adf_result[1]),
                "is_stationary": adf_result[1] < 0.05,
                "summary": f"Series is {'stationary' if adf_result[1] < 0.05 else 'non-stationary'} (p={adf_result[1]:.4f})",
            }
        except Exception:
            pass
    # Segment comparison: split the timeline into four equal parts (time
    # quartiles, not calendar quarters) and test variance across them
    try:
        quartiles = pd.qcut(series.index.to_series(), q=4, labels=False, duplicates="drop")
        groups = [series.values[quartiles == i] for i in range(4) if len(series.values[quartiles == i]) > 0]
        if len(groups) >= 2:
            f_stat, p_value = stats.f_oneway(*groups)
            results["quarterly_variance"] = {
                "test": "One-way ANOVA",
                "comparison": "Across time quartiles",
                "f_statistic": float(f_stat),
                "p_value": float(p_value),
                "significant": p_value < 0.05,
                "summary": f"{'Significant' if p_value < 0.05 else 'No significant'} variance across time quartiles (p={p_value:.4f})",
            }
    except Exception:
        pass
    if not results:
        return None
    # Overall summary: count tests whose boolean flag marked significance
    # (substring-matching "significant" in the summary text would also match
    # "No significant ...")
    sig_tests = [
        v["summary"]
        for v in results.values()
        if v.get("significant") or v.get("is_stationary")
    ]
    overall = (
        f"{len(sig_tests)} significant finding(s): " + "; ".join(sig_tests[:2])
        if sig_tests
        else "No significant patterns detected"
    )
    return {
        "summary": overall,
        "tests": results,
    }
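

if __name__ == "__main__":
    # Minimal usage sketch, not part of the pipeline: builds a synthetic
    # DataFrame whose hypothetical "order_date"/"sales_amount" columns the
    # hint-based detectors above should pick up, then prints each analysis
    # summary. Assumes numpy/pandas are installed; forecasting and the ADF
    # test additionally need statsmodels.
    rng = np.random.default_rng(42)
    dates = pd.date_range("2021-01-01", periods=730, freq="D")
    demo = pd.DataFrame(
        {
            "order_date": dates,
            "sales_amount": 100 + 0.1 * np.arange(len(dates)) + rng.normal(0, 5, len(dates)),
        }
    )
    results = run_advanced_analytics(demo)
    for name, result in results.items():
        print(f"{name}: {result['summary'] if result else 'n/a'}")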