Spaces:
Paused
Paused
File size: 12,013 Bytes
eff8aa5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 |
import math
from typing import Dict, List, Optional, Tuple
import warnings
import numpy as np
import pandas as pd
from scipy import stats
try:
from statsmodels.tsa.holtwinters import ExponentialSmoothing
from statsmodels.tsa.stattools import adfuller
HAS_STATSMODELS = True
except ImportError:
HAS_STATSMODELS = False
DATE_HINTS = ("date", "time", "month", "year", "period")
METRIC_HINTS = ("sale", "amount", "revenue", "profit", "price", "total")
MAX_POINTS = 60
def _detect_datetime_column(df: pd.DataFrame) -> Optional[str]:
# Prefer columns already datetime typed
for col in df.columns:
if pd.api.types.is_datetime64_any_dtype(df[col]):
return col
# Fallback to columns whose names hint at date/time
for col in df.columns:
low = col.lower()
if any(hint in low for hint in DATE_HINTS):
try:
pd.to_datetime(df[col])
return col
except Exception:
continue
return None
def _detect_metric_column(df: pd.DataFrame) -> Optional[str]:
numeric_cols = [c for c in df.columns if pd.api.types.is_numeric_dtype(df[c])]
if not numeric_cols:
return None
for col in numeric_cols:
low = col.lower()
if any(hint in low for hint in METRIC_HINTS):
return col
return numeric_cols[0]
def _build_time_series(df: pd.DataFrame) -> Optional[pd.Series]:
if df is None or df.empty:
return None
date_col = _detect_datetime_column(df)
metric_col = _detect_metric_column(df)
if not date_col or not metric_col:
return None
ts = df[[date_col, metric_col]].copy()
ts[date_col] = pd.to_datetime(ts[date_col], errors="coerce")
ts = ts.dropna(subset=[date_col, metric_col])
if ts.empty:
return None
# Aggregate by month for smoother signals
ts["period"] = ts[date_col].dt.to_period("M").dt.to_timestamp()
grouped = ts.groupby("period")[metric_col].sum().sort_index()
if len(grouped) > MAX_POINTS:
grouped = grouped[-MAX_POINTS:]
return grouped
def _linear_trend(series: pd.Series) -> Optional[Dict[str, object]]:
if series is None or len(series) < 3:
return None
x = np.arange(len(series))
y = series.values.astype(float)
# Linear regression with confidence intervals
slope, intercept = np.polyfit(x, y, 1)
y_pred = slope * x + intercept
# Calculate standard error and confidence intervals
residuals = y - y_pred
n = len(y)
degrees_freedom = n - 2
residual_std_error = np.sqrt(np.sum(residuals**2) / degrees_freedom) if degrees_freedom > 0 else 0
# Standard error of slope
x_mean = np.mean(x)
se_slope = residual_std_error / np.sqrt(np.sum((x - x_mean)**2)) if np.sum((x - x_mean)**2) > 0 else 0
# 95% confidence interval for slope
t_val = stats.t.ppf(0.975, degrees_freedom) if degrees_freedom > 0 else 1.96
slope_ci_lower = slope - t_val * se_slope
slope_ci_upper = slope + t_val * se_slope
# Prediction intervals for the trend line
prediction_intervals = []
for i in range(len(x)):
se_pred = residual_std_error * np.sqrt(1 + 1/n + (x[i] - x_mean)**2 / np.sum((x - x_mean)**2))
pi_lower = y_pred[i] - t_val * se_pred
pi_upper = y_pred[i] + t_val * se_pred
prediction_intervals.append({
"lower": float(pi_lower),
"upper": float(pi_upper)
})
start_val = float(y[0])
end_val = float(y[-1])
change_pct = None
if not math.isclose(start_val, 0.0):
change_pct = ((end_val - start_val) / abs(start_val)) * 100
direction = "flat"
if slope > 0.02 * np.mean(y):
direction = "upward"
elif slope < -0.02 * np.mean(y):
direction = "downward"
summary = f"{direction.capitalize()} trend detected" if direction != "flat" else "Minimal trend detected"
if change_pct is not None:
summary += f" ({change_pct:+.1f}% over period)"
summary += f" with 95% confidence [slope: {slope_ci_lower:.2f} to {slope_ci_upper:.2f}]"
return {
"summary": summary,
"start": start_val,
"end": end_val,
"slope": float(slope),
"slope_ci_lower": float(slope_ci_lower),
"slope_ci_upper": float(slope_ci_upper),
"std_error": float(residual_std_error),
"r_squared": float(1 - np.sum(residuals**2) / np.sum((y - np.mean(y))**2)) if np.sum((y - np.mean(y))**2) > 0 else 0,
"change_pct": change_pct,
"points": [
{"period": period.strftime("%Y-%m"), "value": float(value)}
for period, value in series.items()
],
"prediction_intervals": prediction_intervals,
}
def _anomaly_scan(series: pd.Series) -> Optional[Dict[str, object]]:
if series is None or len(series) < 4:
return None
values = series.values.astype(float)
mean = float(np.mean(values))
std = float(np.std(values))
if math.isclose(std, 0.0):
return None
z_scores = (values - mean) / std
anomalies: List[Dict[str, object]] = []
for idx, z in enumerate(z_scores):
if abs(z) >= 2.0:
period = series.index[idx]
anomalies.append(
{
"period": period.strftime("%Y-%m"),
"value": float(values[idx]),
"z_score": float(z),
}
)
if not anomalies:
return None
top = sorted(anomalies, key=lambda a: abs(a["z_score"]), reverse=True)[:3]
summary = "Anomalies detected at " + ", ".join(
[f"{a['period']} (z={a['z_score']:+.1f})" for a in top]
)
return {"summary": summary, "anomalies": anomalies, "mean": mean, "std": std}
def run_advanced_analytics(df: pd.DataFrame) -> Dict[str, Optional[Dict[str, object]]]:
series = _build_time_series(df)
trend = _linear_trend(series)
anomaly = _anomaly_scan(series)
forecast = _forecast_next_periods(series)
statistical_tests = _run_statistical_tests(series)
return {
"trend": trend,
"anomaly": anomaly,
"forecast": forecast,
"statistical_tests": statistical_tests,
}
def _forecast_next_periods(series: pd.Series, periods: int = 3) -> Optional[Dict[str, object]]:
"""Generate forecasts using exponential smoothing with prediction intervals."""
if series is None or len(series) < 6:
return None
if not HAS_STATSMODELS:
return {
"summary": "Forecasting unavailable (statsmodels not installed)",
"forecasts": [],
}
try:
with warnings.catch_warnings():
warnings.simplefilter("ignore")
# Try Holt's exponential smoothing (trend method)
model = ExponentialSmoothing(
series.values,
trend='add',
seasonal=None,
initialization_method="estimated"
)
fitted = model.fit(optimized=True, remove_bias=False)
# Generate forecasts
forecast_values = fitted.forecast(steps=periods)
# Calculate prediction intervals using residual std
residuals = series.values - fitted.fittedvalues
residual_std = np.std(residuals)
# Generate future periods
last_period = series.index[-1]
freq = pd.infer_freq(series.index) or 'MS'
future_periods = pd.date_range(start=last_period, periods=periods + 1, freq=freq)[1:]
forecasts = []
for i, (period, value) in enumerate(zip(future_periods, forecast_values)):
# Prediction interval widens with forecast horizon
interval_width = residual_std * np.sqrt(i + 1) * 1.96
forecasts.append({
"period": period.strftime("%Y-%m"),
"value": float(value),
"lower_bound": float(value - interval_width),
"upper_bound": float(value + interval_width),
})
summary = f"Forecast for next {periods} periods using exponential smoothing"
if len(forecasts) > 0:
first_forecast = forecasts[0]["value"]
last_actual = float(series.values[-1])
change = ((first_forecast - last_actual) / abs(last_actual)) * 100 if last_actual != 0 else 0
summary += f" (next period: {first_forecast:.1f}, {change:+.1f}% vs current)"
return {
"summary": summary,
"method": "Exponential Smoothing (Holt)",
"forecasts": forecasts,
"model_params": {
"alpha": float(fitted.params.get('smoothing_level', 0)),
"beta": float(fitted.params.get('smoothing_trend', 0)) if fitted.params.get('smoothing_trend') else None,
}
}
except Exception as e:
return {
"summary": f"Forecasting failed: {str(e)[:100]}",
"forecasts": [],
}
def _run_statistical_tests(series: pd.Series) -> Optional[Dict[str, object]]:
"""Run statistical comparison tests on time series data."""
if series is None or len(series) < 6:
return None
results = {}
# Split into two halves for comparison (e.g., first half vs second half)
mid = len(series) // 2
first_half = series.values[:mid]
second_half = series.values[mid:]
# T-test: Are the two periods significantly different?
try:
t_stat, p_value = stats.ttest_ind(first_half, second_half)
results["period_comparison"] = {
"test": "Independent t-test",
"comparison": "First half vs Second half",
"t_statistic": float(t_stat),
"p_value": float(p_value),
"significant": p_value < 0.05,
"summary": f"{'Significant' if p_value < 0.05 else 'No significant'} difference between periods (p={p_value:.4f})"
}
except Exception:
pass
# Test for stationarity (Augmented Dickey-Fuller test)
if HAS_STATSMODELS:
try:
adf_result = adfuller(series.values, autolag='AIC')
results["stationarity"] = {
"test": "Augmented Dickey-Fuller",
"adf_statistic": float(adf_result[0]),
"p_value": float(adf_result[1]),
"is_stationary": adf_result[1] < 0.05,
"summary": f"Series is {'stationary' if adf_result[1] < 0.05 else 'non-stationary'} (p={adf_result[1]:.4f})"
}
except Exception:
pass
# Quartile-based comparison (ANOVA-like for segments)
try:
quartiles = pd.qcut(series.index.to_series(), q=4, labels=False, duplicates='drop')
groups = [series.values[quartiles == i] for i in range(4) if len(series.values[quartiles == i]) > 0]
if len(groups) >= 2:
f_stat, p_value = stats.f_oneway(*groups)
results["quarterly_variance"] = {
"test": "One-way ANOVA",
"comparison": "Across quarters",
"f_statistic": float(f_stat),
"p_value": float(p_value),
"significant": p_value < 0.05,
"summary": f"{'Significant' if p_value < 0.05 else 'No significant'} variance across quarters (p={p_value:.4f})"
}
except Exception:
pass
if not results:
return None
# Overall summary
sig_tests = [v["summary"] for v in results.values() if "significant" in v.get("summary", "").lower()]
overall = f"{len(sig_tests)} significant finding(s): " + "; ".join(sig_tests[:2]) if sig_tests else "No significant patterns detected"
return {
"summary": overall,
"tests": results,
}
|