Spaces:
Sleeping
Sleeping
| # app.py — OCI Consumption Forecasting (Markdown Bullets + LLM Narrative) | |
| import os | |
| import tempfile | |
| import numpy as np | |
| import pandas as pd | |
| import gradio as gr | |
| import matplotlib | |
| matplotlib.use("Agg") # headless for Spaces | |
| import matplotlib.pyplot as plt | |
| from statsmodels.tsa.holtwinters import Holt | |
| from statsmodels.tsa.statespace.sarimax import SARIMAX | |
| # --- Provider-targeted bullet summary (OCI default) --- | |
| # Ensure llm_consumption_analysis.py (OCI-first, bullet-enforced) is in the same dir. | |
| from llm_consumption_analysis import analyze_consumption | |
| # --- Optional OpenAI import (only used if OPENAI_API_KEY present) --- | |
| try: | |
| from openai import OpenAI # openai>=1.0 | |
| except Exception: | |
| OpenAI = None # type: ignore | |
| # ---------- Data loading & preparation ---------- | |
def _read_table(file_obj):
    """Load an uploaded CSV or Excel file into a pandas DataFrame.

    Args:
        file_obj: Either a filesystem path (``str`` / ``os.PathLike``) or an
            object that exposes its path through a ``name`` attribute (the
            upload object handed over by the UI).

    Returns:
        The parsed table as a ``pandas.DataFrame``.

    Raises:
        gr.Error: If no path can be determined, or the extension is not
            ``.csv``, ``.xls`` or ``.xlsx`` (case-insensitive).
    """
    given_as_path = isinstance(file_obj, (str, os.PathLike))
    location = str(file_obj) if given_as_path else getattr(file_obj, "name", None)
    if not location:
        raise gr.Error("Upload a .csv, .xls, or .xlsx file.")

    # Dispatch on the lower-cased extension; guard clauses instead of elif.
    lowered = location.lower()
    if lowered.endswith(".csv"):
        return pd.read_csv(location)
    if lowered.endswith((".xls", ".xlsx")):
        return pd.read_excel(location)
    raise gr.Error("Please upload a .csv, .xls, or .xlsx file.")
def _prepare_series(df: pd.DataFrame) -> pd.Series:
    """Turn an uploaded table into a gap-free monthly series.

    Accepted schemas (header whitespace is ignored for both):
      - ``['Metered Service Date', 'Computed Amount CD']`` (raw usage export)
      - ``['Date', 'Amount']`` (already grouped)

    Rows whose date or amount cannot be parsed are dropped. Every remaining
    row is assigned to the month-end of its calendar month (time-of-day is
    discarded) and amounts are summed per month.

    Args:
        df: The table as read from the upload.

    Returns:
        A ``pandas.Series`` named ``"Amount"`` whose index is a month-end
        ``DatetimeIndex`` named ``"Date"`` with an explicit monthly frequency.

    Raises:
        gr.Error: If the expected columns are absent, no row is usable, one
            or more months are missing between the first and last month, or
            fewer than 24 months are present.
    """
    # Map stripped header -> original header; str() guards non-string labels.
    cols_map = {str(c).strip(): c for c in df.columns}
    if "Metered Service Date" in cols_map and "Computed Amount CD" in cols_map:
        date_col = cols_map["Metered Service Date"]
        amount_col = cols_map["Computed Amount CD"]
    elif "Date" in cols_map and "Amount" in cols_map:
        date_col = cols_map["Date"]
        amount_col = cols_map["Amount"]
    else:
        raise gr.Error(
            "File must contain either "
            "['Metered Service Date','Computed Amount CD'] or ['Date','Amount']."
        )
    tmp = df[[date_col, amount_col]].rename(
        columns={date_col: "Date", amount_col: "Amount"}
    )

    # Coerce types; unparseable cells become NaT/NaN and are dropped.
    tmp["Date"] = pd.to_datetime(tmp["Date"], errors="coerce")
    tmp["Amount"] = pd.to_numeric(tmp["Amount"], errors="coerce")
    tmp = tmp.dropna(subset=["Date", "Amount"])
    if tmp.empty:
        # Without this guard date_range() below fails on NaT bounds.
        raise gr.Error("No usable rows: dates or amounts could not be parsed.")

    # Normalize to midnight first: MonthEnd(0) keeps the time-of-day, which
    # would otherwise split one month into several groups.
    tmp["Date"] = tmp["Date"].dt.normalize() + pd.offsets.MonthEnd(0)
    monthly = tmp.groupby("Date")["Amount"].sum().sort_index()

    # Continuous monthly index (no gaps). An offset object is used instead of
    # the "M" alias, which newer pandas versions deprecate.
    full_idx = pd.date_range(
        monthly.index.min(),
        monthly.index.max(),
        freq=pd.offsets.MonthEnd(),
        name="Date",
    )
    missing = full_idx.difference(monthly.index)
    if len(missing) > 0:
        raise gr.Error(
            f"Detected missing months ({len(missing)}). Provide continuous monthly data."
        )
    # Always adopt full_idx so the result carries an explicit frequency.
    y = monthly.reindex(full_idx)

    if len(y) < 24:
        raise gr.Error("Need at least two full calendar years (>= 24 months) of monthly totals.")
    return y
| # ---------- Forecast & intervals ---------- | |
def _forecast_holt_with_ci(
    y: pd.Series,
    H: int = 12,
    n_boot: int = 800,
    alpha: float = 0.05,
    interval_method: str = "bootstrap",
):
    """Forecast ``H`` months ahead with a damped Holt model plus an interval band.

    The point forecast always comes from Holt's damped-trend model with
    estimated initialization. The band comes from one of two sources:

    - ``"bootstrap"``: historical in-sample residuals are resampled with
      replacement, independently for each horizon step, and added to the
      point forecast; the band is the ``alpha/2`` and ``1 - alpha/2``
      quantiles of those simulations. With fewer than 5 residuals a normal
      approximation is used instead. The generator is seeded with 42, so
      results are reproducible.
    - ``"sarima"``: a SARIMAX(1,0,0)x(0,1,1,12) model is fitted and only its
      forecast confidence interval is used (its mean is discarded).

    Args:
        y: Monthly history with a month-end ``DatetimeIndex``.
        H: Number of months to forecast.
        n_boot: Number of bootstrap simulations.
        alpha: Significance level of the band (0.05 gives a 95% band); applied
            to both interval methods.
        interval_method: ``"bootstrap"`` or ``"sarima"``.

    Returns:
        ``(fc_mean, ci_lower, ci_upper)``, three Series indexed by the ``H``
        month-ends following the last observation.

    Raises:
        ValueError: If ``interval_method`` is not a supported value.
    """
    # Fail fast, before the comparatively expensive model fit.
    if interval_method not in ("bootstrap", "sarima"):
        raise ValueError("interval_method must be 'bootstrap' or 'sarima'.")

    # Fit Holt (damped)
    holt = Holt(y, damped_trend=True, initialization_method="estimated")
    fit = holt.fit(optimized=True, use_brute=True)

    # An offset object is used instead of the deprecated "M" alias.
    month_end = pd.offsets.MonthEnd()
    future_index = pd.date_range(y.index[-1] + pd.offsets.MonthEnd(1), periods=H, freq=month_end)
    fc_mean = fit.forecast(H)
    fc_mean.index = future_index

    if interval_method == "bootstrap":
        resid = (y - fit.fittedvalues).dropna().values
        rng = np.random.default_rng(42)
        if len(resid) >= 5:
            sims = rng.choice(resid, size=(n_boot, H), replace=True) + fc_mean.values
        else:
            # Too few residuals to resample: fall back to a normal spread.
            sigma = np.std(resid) if len(resid) else max(1.0, 0.10 * (y.mean() or 1.0))
            sims = rng.normal(fc_mean.values, sigma, size=(n_boot, H))
        lower = np.quantile(sims, alpha / 2, axis=0)
        upper = np.quantile(sims, 1 - alpha / 2, axis=0)
    else:  # "sarima"
        sar = SARIMAX(
            y, order=(1, 0, 0), seasonal_order=(0, 1, 1, 12),
            enforce_stationarity=False, enforce_invertibility=False
        ).fit(disp=False)
        pred = sar.get_forecast(steps=H)
        # Honor the caller's alpha; conf_int() alone always uses its default.
        ci = pred.conf_int(alpha=alpha)
        lower = ci.iloc[:, 0].to_numpy()
        upper = ci.iloc[:, 1].to_numpy()

    ci_lower = pd.Series(lower, index=future_index, name="Lower")
    ci_upper = pd.Series(upper, index=future_index, name="Upper")
    return fc_mean, ci_lower, ci_upper
def _make_figure(y: pd.Series, fc_mean: pd.Series, ci_lower: pd.Series, ci_upper: pd.Series):
    """Plot history, forecast and interval band on a single axes.

    Args:
        y: Observed series (solid line).
        fc_mean: Point forecast (dashed line).
        ci_lower: Lower edge of the shaded band, aligned with ``fc_mean``.
        ci_upper: Upper edge of the shaded band, aligned with ``fc_mean``.

    Returns:
        The Matplotlib figure holding the chart.
    """
    figure, axes = plt.subplots(figsize=(14, 6))

    axes.plot(y.index, y.values, label="Actual", linewidth=2)
    axes.plot(fc_mean.index, fc_mean.values, "--", label="Forecast (next 12m)")
    axes.fill_between(
        fc_mean.index,
        ci_lower.values,
        ci_upper.values,
        alpha=0.2,
        label="95% CI",
    )
    # Vertical rule at the last observation separates history from forecast.
    axes.axvline(y.index[-1], color="k", linewidth=1, alpha=0.6)

    axes.set(
        title="12-Month Forecast with Confidence Interval",
        xlabel="Date",
        ylabel="Amount",
    )
    axes.legend()
    figure.tight_layout()
    return figure
| # ---------- LLM prose narrative (variables & impact) ---------- | |
| def _series_stats(y: pd.Series, fc_mean: pd.Series, ci_lower: pd.Series | None, ci_upper: pd.Series | None): | |
| """Compact stats used for the explanatory narrative.""" | |
| y = y.sort_index() | |
| n = len(y) | |
| last12 = y.iloc[-12:] if n >= 12 else y | |
| prev12 = y.iloc[-24:-12] if n >= 24 else y.iloc[:-12] | |
| yoy = None | |
| if len(last12) and len(prev12) and prev12.mean() != 0: | |
| yoy = 100.0 * (last12.mean() - prev12.mean()) / prev12.mean() | |
| months = max(1, (y.index[-1].to_period("M") - y.index[0].to_period("M")).n) | |
| years = months / 12.0 | |
| first, last = float(y.iloc[0]), float(y.iloc[-1]) | |
| cagr = ((last / first) ** (1 / years) - 1) * 100.0 if (first > 0 and years > 0) else 0.0 | |
| x = np.arange(len(y)) | |
| m, _b = np.polyfit(x, y.values.astype(float), 1) | |
| df = y.to_frame("val") | |
| df["month"] = df.index.month | |
| by_month = df.groupby("month")["val"].mean() | |
| seas_strength = 0.0 | |
| if df["val"].std() != 0 and by_month.std() != 0: | |
| seas_strength = float(by_month.std() / df["val"].std()) | |
| def _acf_at_lag(series, lag): | |
| if len(series) <= lag: | |
| return 0.0 | |
| s0 = series - series.mean() | |
| num = (s0.iloc[lag:] * s0.iloc[:-lag]).sum() | |
| den = (s0 * s0).sum() | |
| return float(num / den) if den != 0 else 0.0 | |
| acf12 = _acf_at_lag(y, 12) | |
| avg_ci_w = None | |
| avg_ci_rel = None | |
| if ci_lower is not None and ci_upper is not None and len(ci_lower) == len(fc_mean): | |
| widths = (ci_upper.values - ci_lower.values) | |
| avg_ci_w = float(np.mean(widths)) | |
| denom = np.maximum(np.abs(fc_mean.values), 1e-9) | |
| avg_ci_rel = float(np.mean(widths / denom) * 100.0) | |
| fc_vs_last12 = None | |
| if len(fc_mean): | |
| base = float(last12.mean()) if len(last12) else float(y.mean()) | |
| if base == 0: | |
| base = 1e-9 | |
| fc_vs_last12 = (float(fc_mean.mean()) - base) / base * 100.0 | |
| return { | |
| "n_months": n, | |
| "start": y.index[0].date(), | |
| "end": y.index[-1].date(), | |
| "mean": float(y.mean()), | |
| "median": float(y.median()), | |
| "stdev": float(y.std(ddof=1) if len(y) > 1 else 0.0), | |
| "slope_per_month": float(m), | |
| "cagr_pct": float(cagr), | |
| "last6": float(y.iloc[-6:].mean()) if n >= 6 else float(y.mean()), | |
| "prev6": float(y.iloc[-12:-6].mean()) if n >= 12 else float("nan"), | |
| "yoy_pct": None if yoy is None else float(yoy), | |
| "seas_strength": float(seas_strength), | |
| "acf12": float(acf12), | |
| "avg_ci_w": avg_ci_w, | |
| "avg_ci_rel": avg_ci_rel, | |
| "fc_vs_last12": fc_vs_last12, | |
| } | |
def _build_prompt(y: pd.Series, fc_mean: pd.Series, ci_lower: pd.Series | None, ci_upper: pd.Series | None) -> str:
    """Assemble the LLM prompt: fixed instructions plus a bulleted stats context.

    The statistics come from ``_series_stats`` and are rendered as one bullet
    per metric. Values that are unavailable are shown as ``n/a`` (or
    ``not available`` for the interval).

    Returns:
        The prompt text with surrounding whitespace stripped.
    """
    stats = _series_stats(y, fc_mean, ci_lower, ci_upper)

    def _num(value, digits=2):
        # Thousands-separated fixed-point; anything unformattable is str()-ed.
        try:
            return f"{value:,.{digits}f}"
        except Exception:
            return str(value)

    yoy_text = "n/a" if stats["yoy_pct"] is None else _num(stats["yoy_pct"]) + "%"
    fc_text = "n/a" if stats["fc_vs_last12"] is None else _num(stats["fc_vs_last12"]) + "%"
    if stats["avg_ci_w"] is not None:
        ci_text = "width ~" + _num(stats["avg_ci_w"]) + " (" + _num(stats["avg_ci_rel"]) + "% of forecast)"
    else:
        ci_text = "not available"

    context_lines = [
        f"- Coverage: {stats['n_months']} months from {stats['start']} to {stats['end']}",
        f"- Central tendency: mean {_num(stats['mean'])}, median {_num(stats['median'])}",
        f"- Volatility: stdev {_num(stats['stdev'])}",
        f"- Trend: slope {_num(stats['slope_per_month'])}/month; CAGR {_num(stats['cagr_pct'])}%",
        f"- 6m momentum: last 6m avg {_num(stats['last6'])} vs prior 6m {_num(stats['prev6'])}",
        f"- YoY: {yoy_text}",
        f"- Seasonality: strength {_num(stats['seas_strength'])}, ACF(12) {_num(stats['acf12'])}",
        f"- Forecast vs last-12m: {fc_text}",
        f"- CI: {ci_text}",
    ]
    bullet = "\n".join(context_lines)

    prompt = f"""
You are a concise FinOps analyst. Write 8–12 sentences explaining **why** the metrics matter:
- Tie slope and CAGR to long-term consumption pressure.
- Explain seasonality strength/ACF and how peaks/troughs drive reservation/commit decisions.
- Compare the 12-month forecast to the prior year and discuss planning implications.
- Interpret CI width (uncertainty) and what risk controls to put in place.
- End with 2–3 concrete actions (commitments, rightsizing, budget guardrails).
Do NOT use bullet points in your answer.
Context (do not repeat verbatim; use to ground your reasoning):
{bullet}
"""
    return prompt.strip()
| def _call_openai(prompt: str, model: str = None, temperature: float = 0.2, max_tokens: int = 650) -> str | None: | |
| api_key = os.environ.get("OPENAI_API_KEY") | |
| if not api_key or OpenAI is None: | |
| return None | |
| try: | |
| client = OpenAI(api_key=api_key) | |
| model = os.environ.get("OPENAI_MODEL", model or "gpt-4o-mini") | |
| resp = client.chat.completions.create( | |
| model=model, | |
| temperature=temperature, | |
| max_tokens=max_tokens, | |
| messages=[ | |
| {"role": "system", "content": "You are a concise, numerate FinOps analyst."}, | |
| {"role": "user", "content": prompt}, | |
| ], | |
| ) | |
| return (resp.choices[0].message.content or "").strip() | |
| except Exception: | |
| return None | |
def _local_narrative(y: pd.Series, fc_mean: pd.Series, ci_lower: pd.Series, ci_upper: pd.Series) -> str:
    """Build a deterministic prose narrative from the series statistics.

    Used when no LLM text is available. The wording is chosen with fixed
    thresholds applied to the values returned by ``_series_stats``:

    - trend: sign of the linear slope;
    - variability: stdev below 15% of the mean is "low", above 40% is
      "elevated", otherwise "moderate";
    - seasonality: based on the seasonal strength and ``|ACF(12)|``;
    - interval: an average relative width above 35% is "wide".

    Returns:
        A single paragraph ending with three recommended actions.
    """
    st = _series_stats(y, fc_mean, ci_lower, ci_upper)

    trend_word = "rising" if st["slope_per_month"] > 0 else ("declining" if st["slope_per_month"] < 0 else "flat")
    vol = "low" if st["stdev"] < 0.15 * st["mean"] else ("elevated" if st["stdev"] > 0.4 * st["mean"] else "moderate")
    seas_word = (
        "pronounced" if st["seas_strength"] >= 0.75 or abs(st["acf12"]) >= 0.4
        else "moderate" if st["seas_strength"] >= 0.35 or abs(st["acf12"]) >= 0.2
        else "minimal"
    )

    # The direction is carried by the word "higher"/"lower", so the magnitude
    # is printed unsigned (a negative value used to read "-5.00% lower").
    yoy_phrase = (
        f"Year over year, the last twelve months averaged {abs(st['yoy_pct']):.2f}% "
        f"{'higher' if (st['yoy_pct'] or 0) > 0 else 'lower'} than the prior year. "
        if st["yoy_pct"] is not None else ""
    )
    fc_phrase = (
        f"The next twelve months are projected "
        f"{'above' if (st['fc_vs_last12'] or 0) > 0 else 'below'} the last year by "
        f"{abs(st['fc_vs_last12'] or 0):.2f}%. "
        if st["fc_vs_last12"] is not None else ""
    )
    ci_phrase = (
        f"Uncertainty averages ~{st['avg_ci_rel']:.1f}% of the forecast, suggesting "
        f"{'wide' if (st['avg_ci_rel'] or 0) > 35 else 'tight'} bands. "
        if st["avg_ci_rel"] is not None else "Confidence intervals were not computed for this run. "
    )

    return (
        f"Consumption shows a {trend_word} long-term trend (slope {st['slope_per_month']:,.2f}/month; "
        f"CAGR {st['cagr_pct']:.2f}%). Variability is {vol} "
        f"(stdev {st['stdev']:,.2f} vs mean {st['mean']:,.2f}). "
        f"Seasonality appears {seas_word} with ACF(12)={st['acf12']:.2f} and dispersion index {st['seas_strength']:.2f}, "
        f"which should guide when to time reservations and rightsizing. "
        f"Recent momentum: last six months averaged {st['last6']:,.2f} versus {st['prev6']:,.2f}. "
        f"{yoy_phrase}{fc_phrase}"
        f"{ci_phrase}"
        "Actions: (1) Align commitments to seasonal crest months; "
        "(2) Right-size persistently under-utilized services ahead of troughs; "
        "(3) Set budget guardrails near the forecast mean plus half the average CI width."
    ).strip()
def _generate_narrative(y: pd.Series, fc_mean: pd.Series, ci_lower: pd.Series, ci_upper: pd.Series) -> str:
    """Return the explanatory narrative, preferring the LLM over the local text.

    The prompt is sent through ``_call_openai``; if that yields anything other
    than a non-blank string, the deterministic ``_local_narrative`` is used.
    """
    llm_text = _call_openai(_build_prompt(y, fc_mean, ci_lower, ci_upper))
    # None (no key / request failed) is not a str, so one check covers it.
    if isinstance(llm_text, str) and llm_text.strip():
        return llm_text
    return _local_narrative(y, fc_mean, ci_lower, ci_upper)
| # ---------- Gradio pipeline ---------- | |
def run_pipeline(file_obj, interval_source: str = "Bootstrap (recommended)"):
    """Run the full upload -> forecast -> report pipeline for the UI.

    Args:
        file_obj: Uploaded file (path or object with a ``name`` attribute).
        interval_source: Label of the interval method chosen in the UI. Any
            label containing ``"Bootstrap"`` selects the residual bootstrap;
            every other label selects the SARIMA band. A missing value falls
            back to the bootstrap default.

    Returns:
        ``(figure, csv_path, markdown)``: the forecast chart, the path of a
        temporary CSV holding history, forecast and interval columns, and a
        Markdown block combining the bullet summary with the narrative.

    Raises:
        gr.Error: If no file was uploaded (validation errors from the helper
            functions propagate as well).
    """
    if file_obj is None:
        raise gr.Error("Please upload your CSV/XLS file first.")

    df = _read_table(file_obj)
    y = _prepare_series(df)

    # A cleared dropdown may deliver None; `in` on None would raise TypeError.
    source_label = interval_source or "Bootstrap (recommended)"
    method = "bootstrap" if "Bootstrap" in source_label else "sarima"
    fc_mean, ci_lower, ci_upper = _forecast_holt_with_ci(y, H=12, n_boot=800, interval_method=method)

    # Build combined CSV (history + 12m forecast + CI)
    actual_df = pd.DataFrame({"Actual": y})
    fc_df = pd.DataFrame({"Forecast": fc_mean, "Lower": ci_lower, "Upper": ci_upper})
    out_df = pd.concat([actual_df, fc_df], axis=0)

    # Reserve a unique path and close the handle BEFORE pandas writes to it:
    # a file that is still open cannot be reopened by name on Windows.
    with tempfile.NamedTemporaryFile(
        delete=False, suffix=".csv", prefix="forecast_12m_with_ci_"
    ) as tmp:
        out_path = tmp.name
    out_df.to_csv(out_path, index=True)

    # Render figure
    fig = _make_figure(y, fc_mean, ci_lower, ci_upper)

    # ---- Provider-targeted bullet summary (Markdown) ----
    bullets_md = analyze_consumption(
        y=y,
        fc_mean=fc_mean,
        ci_lower=ci_lower,
        ci_upper=ci_upper,
        provider="auto",       # uses OpenAI if key present; else local deterministic
        cloud_provider="oci",  # default is OCI; invalid inputs also resolve to OCI
        render="md",           # Markdown bullets
    )

    # ---- Explanatory narrative (Markdown prose) ----
    narrative_md = _generate_narrative(y, fc_mean, ci_lower, ci_upper)

    # Combine into one Markdown block
    combined_md = (
        "### Provider-Targeted Summary\n"
        f"{bullets_md}\n\n"
        "---\n\n"
        "### Narrative: What the variables mean and why they matter\n"
        f"{narrative_md}\n"
    )
    return fig, out_path, combined_md
| # ---------- UI ---------- | |
# Page definition: components are declared top to bottom in the order in
# which they are created inside the Blocks context.
with gr.Blocks(title="OCI Consumption Forecasting") as demo:
    # Header with the accepted input schemas and the data requirements that
    # _prepare_series enforces.
    gr.Markdown(
        "# OCI Consumption Forecasting\n"
        "Upload your **CSV/XLS** with either:\n"
        "- `Metered Service Date` + `Computed Amount CD` (raw export), or\n"
        "- `Date` + `Amount` (already grouped).\n\n"
        "**Requirements:** continuous **monthly** data, at least **24 months**."
    )
    # Inputs: the upload and the interval-method selector.
    # NOTE(review): the original indentation was lost; the two inputs are
    # presumed to be the only children of this Row. Confirm against the
    # deployed layout.
    with gr.Row():
        file_in = gr.File(label="Upload CSV/XLS", file_types=[".csv", ".xls", ".xlsx"])
        interval_src = gr.Dropdown(
            ["Bootstrap (recommended)", "SARIMA (intervals only)"],
            value="Bootstrap (recommended)",
            label="Confidence Interval Method"
        )
    run_btn = gr.Button("Run Forecast", variant="primary")
    # Outputs, in the same order as run_pipeline's return tuple.
    plot_out = gr.Plot(label="12-Month Forecast")
    csv_out = gr.File(label="Download forecast_12m_with_ci.csv")
    # Markdown output (bullets + narrative)
    analysis_out = gr.Markdown(label="Analysis (Markdown)")
    # Wire the button: (file, method label) -> (figure, csv path, markdown).
    run_btn.click(
        fn=run_pipeline,
        inputs=[file_in, interval_src],
        outputs=[plot_out, csv_out, analysis_out]
    )
if __name__ == "__main__":
    # On Hugging Face Spaces, share=True is fine; no _js hooks used.
    # NOTE(review): when run outside Spaces, share=True presumably requests a
    # public share link for the app. Confirm this is intended for local runs.
    demo.launch(share=True)