# kgauvin603's picture
# Rename app-v2.py to app.py
# a8f5aba verified
# app.py — OCI Consumption Forecasting (Markdown Bullets + LLM Narrative)
import os
import tempfile
import numpy as np
import pandas as pd
import gradio as gr
import matplotlib
matplotlib.use("Agg") # headless for Spaces
import matplotlib.pyplot as plt
from statsmodels.tsa.holtwinters import Holt
from statsmodels.tsa.statespace.sarimax import SARIMAX
# --- Provider-targeted bullet summary (OCI default) ---
# Ensure llm_consumption_analysis.py (OCI-first, bullet-enforced) is in the same dir.
from llm_consumption_analysis import analyze_consumption
# --- Optional OpenAI import (only used if OPENAI_API_KEY present) ---
try:
from openai import OpenAI # openai>=1.0
except Exception:
OpenAI = None # type: ignore
# ---------- Data loading & preparation ----------
def _read_table(file_obj):
    """Load an uploaded spreadsheet into a pandas DataFrame.

    ``file_obj`` may be a filesystem path (``str`` / ``os.PathLike``) or an
    object that exposes its path through a ``name`` attribute (e.g. a gradio
    File object). The extension is compared case-insensitively and selects
    the pandas reader.

    Raises:
        gr.Error: if no path can be determined, or the extension is not
            .csv, .xls or .xlsx.
    """
    if isinstance(file_obj, (str, os.PathLike)):
        location = str(file_obj)
    else:
        location = getattr(file_obj, "name", None)
    if not location:
        raise gr.Error("Upload a .csv, .xls, or .xlsx file.")

    lowered = location.lower()
    if lowered.endswith(".csv"):
        return pd.read_csv(location)
    if lowered.endswith((".xls", ".xlsx")):
        return pd.read_excel(location)
    raise gr.Error("Please upload a .csv, .xls, or .xlsx file.")
def _prepare_series(df: pd.DataFrame) -> pd.Series:
    """Turn an uploaded table into a gap-free monthly Series of amounts.

    Accepted column layouts (header text is matched after stripping
    surrounding whitespace):
      - ['Metered Service Date', 'Computed Amount CD'] (raw usage export)
      - ['Date', 'Amount'] (already grouped)

    Rows whose date or amount cannot be parsed are dropped. Every remaining
    row is assigned to the last day of its month (time of day discarded) and
    amounts are summed per month.

    Returns:
        Series named "Amount", indexed by consecutive month-end timestamps
        with an explicit month-end frequency.

    Raises:
        gr.Error: if the required columns are absent, no usable rows remain,
            a month is missing inside the covered range, or fewer than 24
            months are available.
    """
    # Stripped header text -> actual column label. str() keeps non-string
    # labels (e.g. integer headers) from crashing on .strip().
    cols_map = {str(c).strip(): c for c in df.columns}
    if "Metered Service Date" in cols_map and "Computed Amount CD" in cols_map:
        date_col = cols_map["Metered Service Date"]
        amount_col = cols_map["Computed Amount CD"]
    elif "Date" in cols_map and "Amount" in cols_map:
        # Use the stripped lookup here too so ' Date ' / 'Amount ' are accepted,
        # consistent with the raw-export branch.
        date_col = cols_map["Date"]
        amount_col = cols_map["Amount"]
    else:
        raise gr.Error(
            "File must contain either "
            "['Metered Service Date','Computed Amount CD'] or ['Date','Amount']."
        )
    tmp = df[[date_col, amount_col]].copy()
    tmp = tmp.rename(columns={date_col: "Date", amount_col: "Amount"})

    # Coerce types; unparseable cells become NaT/NaN and are dropped.
    tmp["Date"] = pd.to_datetime(tmp["Date"], errors="coerce")
    tmp["Amount"] = pd.to_numeric(tmp["Amount"], errors="coerce")
    tmp = tmp.dropna(subset=["Date", "Amount"])
    if tmp.empty:
        # Without this guard the date_range call below fails on NaT bounds
        # with an opaque pandas error.
        raise gr.Error("No rows with a valid date and a numeric amount were found.")

    # Normalize to month-end. MonthEnd(0) alone preserves the time of day, so
    # timestamps are first truncated to midnight; otherwise rows from the same
    # month with different times would land in different groups.
    tmp["Date"] = tmp["Date"].dt.normalize() + pd.offsets.MonthEnd(0)
    y = tmp.groupby("Date")["Amount"].sum().sort_index()

    # Continuous monthly index (no gaps). An offset object is used instead of
    # the "M" alias, which newer pandas versions deprecate.
    full_idx = pd.date_range(
        y.index.min(), y.index.max(), freq=pd.offsets.MonthEnd(), name="Date"
    )
    missing = full_idx.difference(y.index)
    if len(missing) > 0:
        raise gr.Error(
            f"Detected missing months ({len(missing)}). Provide continuous monthly data."
        )
    # Reindex unconditionally so the result always carries the month-end
    # frequency of full_idx (the grouped index itself has none).
    y = y.reindex(full_idx)

    if len(y) < 24:
        raise gr.Error("Need at least two full calendar years (>= 24 months) of monthly totals.")
    return y
# ---------- Forecast & intervals ----------
def _forecast_holt_with_ci(
    y: pd.Series,
    H: int = 12,
    n_boot: int = 800,
    alpha: float = 0.05,
    interval_method: str = "bootstrap",
):
    """Forecast ``H`` months ahead with a damped Holt model plus an interval band.

    Point forecast: Holt with damped trend, ``initialization_method="estimated"``.
    Intervals (``1 - alpha`` coverage):
      - "bootstrap": Holt in-sample residuals are resampled (fixed seed 42)
        and added to the point forecast; the band is the alpha/2 and
        1 - alpha/2 quantiles of ``n_boot`` simulations. With fewer than
        five residuals a normal approximation is used instead.
      - "sarima": a SARIMAX(1,0,0)x(0,1,1,12) model is fitted only to obtain
        the interval bounds; the point forecast still comes from Holt.

    Args:
        y: monthly history indexed by month-end timestamps.
        H: number of months to forecast.
        n_boot: number of bootstrap simulations.
        alpha: significance level of the interval.
        interval_method: "bootstrap" or "sarima".

    Returns:
        (fc_mean, ci_lower, ci_upper): three Series sharing the future
        month-end index.

    Raises:
        ValueError: if ``interval_method`` is not recognised.
    """
    # Fail fast on a bad method name, before spending time on model fitting.
    if interval_method not in ("bootstrap", "sarima"):
        raise ValueError("interval_method must be 'bootstrap' or 'sarima'.")

    # Fit Holt (damped)
    holt = Holt(y, damped_trend=True, initialization_method="estimated")
    fit = holt.fit(optimized=True, use_brute=True)

    # Offset object instead of the "M" alias, which newer pandas deprecates.
    future_index = pd.date_range(
        y.index[-1] + pd.offsets.MonthEnd(1), periods=H, freq=pd.offsets.MonthEnd()
    )
    fc_mean = fit.forecast(H)
    fc_mean.index = future_index

    if interval_method == "bootstrap":
        resid = (y - fit.fittedvalues).dropna().values
        rng = np.random.default_rng(42)  # fixed seed -> reproducible bands
        if len(resid) >= 5:
            sims = rng.choice(resid, size=(n_boot, H), replace=True) + fc_mean.values
        else:
            # Too few residuals to resample: fall back to a normal spread.
            sigma = np.std(resid) if len(resid) else max(1.0, 0.10 * (y.mean() or 1.0))
            sims = rng.normal(fc_mean.values, sigma, size=(n_boot, H))
        lower = np.quantile(sims, alpha / 2, axis=0)
        upper = np.quantile(sims, 1 - alpha / 2, axis=0)
    else:  # "sarima"
        sar = SARIMAX(
            y, order=(1, 0, 0), seasonal_order=(0, 1, 1, 12),
            enforce_stationarity=False, enforce_invertibility=False
        ).fit(disp=False)
        pred = sar.get_forecast(steps=H)
        # Pass alpha through; conf_int() otherwise always uses its own default
        # and silently ignores the caller's requested coverage.
        ci = pred.conf_int(alpha=alpha).reindex(pred.predicted_mean.index)
        lower = ci.iloc[:, 0].to_numpy()
        upper = ci.iloc[:, 1].to_numpy()

    ci_lower = pd.Series(lower, index=future_index, name="Lower")
    ci_upper = pd.Series(upper, index=future_index, name="Upper")
    return fc_mean, ci_lower, ci_upper
def _make_figure(y: pd.Series, fc_mean: pd.Series, ci_lower: pd.Series, ci_upper: pd.Series):
    """Build the forecast chart and return it as a Matplotlib figure.

    The chart shows the historical series, the forecast as a dashed line, a
    shaded band between ``ci_lower`` and ``ci_upper``, and a vertical marker
    at the last observed date.

    The figure is detached from pyplot's global registry before it is
    returned, so repeated calls in a long-running process do not accumulate
    open figures. The returned object can still be rendered or saved.
    """
    fig, ax = plt.subplots(figsize=(14, 6))
    ax.plot(y.index, y.values, label="Actual", linewidth=2)
    ax.plot(fc_mean.index, fc_mean.values, "--", label="Forecast (next 12m)")
    ax.fill_between(fc_mean.index, ci_lower.values, ci_upper.values, alpha=0.2, label="95% CI")
    # Vertical line marks where history ends and the forecast begins.
    ax.axvline(y.index[-1], color="k", linewidth=1, alpha=0.6)
    ax.set_title("12-Month Forecast with Confidence Interval")
    ax.set_xlabel("Date")
    ax.set_ylabel("Amount")
    ax.legend()
    fig.tight_layout()
    # plt.subplots registers the figure with pyplot; without closing it every
    # call would keep one more figure alive for the life of the process.
    plt.close(fig)
    return fig
# ---------- LLM prose narrative (variables & impact) ----------
def _series_stats(y: pd.Series, fc_mean: pd.Series, ci_lower: pd.Series | None, ci_upper: pd.Series | None):
"""Compact stats used for the explanatory narrative."""
y = y.sort_index()
n = len(y)
last12 = y.iloc[-12:] if n >= 12 else y
prev12 = y.iloc[-24:-12] if n >= 24 else y.iloc[:-12]
yoy = None
if len(last12) and len(prev12) and prev12.mean() != 0:
yoy = 100.0 * (last12.mean() - prev12.mean()) / prev12.mean()
months = max(1, (y.index[-1].to_period("M") - y.index[0].to_period("M")).n)
years = months / 12.0
first, last = float(y.iloc[0]), float(y.iloc[-1])
cagr = ((last / first) ** (1 / years) - 1) * 100.0 if (first > 0 and years > 0) else 0.0
x = np.arange(len(y))
m, _b = np.polyfit(x, y.values.astype(float), 1)
df = y.to_frame("val")
df["month"] = df.index.month
by_month = df.groupby("month")["val"].mean()
seas_strength = 0.0
if df["val"].std() != 0 and by_month.std() != 0:
seas_strength = float(by_month.std() / df["val"].std())
def _acf_at_lag(series, lag):
if len(series) <= lag:
return 0.0
s0 = series - series.mean()
num = (s0.iloc[lag:] * s0.iloc[:-lag]).sum()
den = (s0 * s0).sum()
return float(num / den) if den != 0 else 0.0
acf12 = _acf_at_lag(y, 12)
avg_ci_w = None
avg_ci_rel = None
if ci_lower is not None and ci_upper is not None and len(ci_lower) == len(fc_mean):
widths = (ci_upper.values - ci_lower.values)
avg_ci_w = float(np.mean(widths))
denom = np.maximum(np.abs(fc_mean.values), 1e-9)
avg_ci_rel = float(np.mean(widths / denom) * 100.0)
fc_vs_last12 = None
if len(fc_mean):
base = float(last12.mean()) if len(last12) else float(y.mean())
if base == 0:
base = 1e-9
fc_vs_last12 = (float(fc_mean.mean()) - base) / base * 100.0
return {
"n_months": n,
"start": y.index[0].date(),
"end": y.index[-1].date(),
"mean": float(y.mean()),
"median": float(y.median()),
"stdev": float(y.std(ddof=1) if len(y) > 1 else 0.0),
"slope_per_month": float(m),
"cagr_pct": float(cagr),
"last6": float(y.iloc[-6:].mean()) if n >= 6 else float(y.mean()),
"prev6": float(y.iloc[-12:-6].mean()) if n >= 12 else float("nan"),
"yoy_pct": None if yoy is None else float(yoy),
"seas_strength": float(seas_strength),
"acf12": float(acf12),
"avg_ci_w": avg_ci_w,
"avg_ci_rel": avg_ci_rel,
"fc_vs_last12": fc_vs_last12,
}
def _build_prompt(y: pd.Series, fc_mean: pd.Series, ci_lower: pd.Series | None, ci_upper: pd.Series | None) -> str:
    """Compose the LLM prompt that requests a prose narrative.

    The statistics from ``_series_stats`` are rendered as a bullet list and
    embedded as grounding context beneath fixed writing instructions. The
    returned text has no leading or trailing whitespace.
    """
    stats = _series_stats(y, fc_mean, ci_lower, ci_upper)

    def _num(value, digits=2):
        # Thousands-separated fixed-point text; values that cannot be
        # formatted that way (e.g. None) fall back to their str() form.
        try:
            return f"{value:,.{digits}f}"
        except Exception:
            return str(value)

    def _pct_or_na(value):
        # Percentage text, or "n/a" when the statistic is unavailable.
        if value is None:
            return "n/a"
        return _num(value) + "%"

    if stats["avg_ci_w"] is not None:
        ci_text = (
            "width ~" + _num(stats["avg_ci_w"])
            + " (" + _num(stats["avg_ci_rel"]) + "% of forecast)"
        )
    else:
        ci_text = "not available"

    context_lines = [
        f"- Coverage: {stats['n_months']} months from {stats['start']} to {stats['end']}",
        f"- Central tendency: mean {_num(stats['mean'])}, median {_num(stats['median'])}",
        f"- Volatility: stdev {_num(stats['stdev'])}",
        f"- Trend: slope {_num(stats['slope_per_month'])}/month; CAGR {_num(stats['cagr_pct'])}%",
        f"- 6m momentum: last 6m avg {_num(stats['last6'])} vs prior 6m {_num(stats['prev6'])}",
        f"- YoY: {_pct_or_na(stats['yoy_pct'])}",
        f"- Seasonality: strength {_num(stats['seas_strength'])}, ACF(12) {_num(stats['acf12'])}",
        f"- Forecast vs last-12m: {_pct_or_na(stats['fc_vs_last12'])}",
        f"- CI: {ci_text}",
    ]
    context = "\n".join(context_lines)

    prompt = f"""
You are a concise FinOps analyst. Write 8–12 sentences explaining **why** the metrics matter:
- Tie slope and CAGR to long-term consumption pressure.
- Explain seasonality strength/ACF and how peaks/troughs drive reservation/commit decisions.
- Compare the 12-month forecast to the prior year and discuss planning implications.
- Interpret CI width (uncertainty) and what risk controls to put in place.
- End with 2–3 concrete actions (commitments, rightsizing, budget guardrails).
Do NOT use bullet points in your answer.
Context (do not repeat verbatim; use to ground your reasoning):
{context}
"""
    return prompt.strip()
def _call_openai(prompt: str, model: str = None, temperature: float = 0.2, max_tokens: int = 650) -> str | None:
api_key = os.environ.get("OPENAI_API_KEY")
if not api_key or OpenAI is None:
return None
try:
client = OpenAI(api_key=api_key)
model = os.environ.get("OPENAI_MODEL", model or "gpt-4o-mini")
resp = client.chat.completions.create(
model=model,
temperature=temperature,
max_tokens=max_tokens,
messages=[
{"role": "system", "content": "You are a concise, numerate FinOps analyst."},
{"role": "user", "content": prompt},
],
)
return (resp.choices[0].message.content or "").strip()
except Exception:
return None
def _local_narrative(y: pd.Series, fc_mean: pd.Series, ci_lower: pd.Series, ci_upper: pd.Series) -> str:
    """Build a deterministic prose narrative from the series statistics.

    Needs no external service. Qualitative wording is chosen from fixed
    thresholds on the statistics returned by ``_series_stats``:
      - trend: sign of the linear slope
      - volatility: stdev below 15% of the mean is "low", above 40% is
        "elevated", otherwise "moderate"
      - seasonality: seasonal strength and |ACF(12)| cut-offs
      - interval width: "wide" above 35% of the forecast, else "tight"

    Returns:
        A single paragraph ending with three recommended actions.
    """
    st = _series_stats(y, fc_mean, ci_lower, ci_upper)

    slope = st["slope_per_month"]
    if slope > 0:
        trend_word = "rising"
    elif slope < 0:
        trend_word = "declining"
    else:
        trend_word = "flat"

    if st["stdev"] < 0.15 * st["mean"]:
        vol = "low"
    elif st["stdev"] > 0.4 * st["mean"]:
        vol = "elevated"
    else:
        vol = "moderate"

    if st["seas_strength"] >= 0.75 or abs(st["acf12"]) >= 0.4:
        seas_word = "pronounced"
    elif st["seas_strength"] >= 0.35 or abs(st["acf12"]) >= 0.2:
        seas_word = "moderate"
    else:
        seas_word = "minimal"

    yoy_phrase = ""
    if st["yoy_pct"] is not None:
        # Report the magnitude and let "higher"/"lower" carry the direction;
        # printing the signed value produced text like "-5.00% lower".
        yoy_phrase = (
            f"Year over year, the last twelve months averaged {abs(st['yoy_pct']):.2f}% "
            f"{'higher' if st['yoy_pct'] > 0 else 'lower'} than the prior year. "
        )

    fc_phrase = ""
    if st["fc_vs_last12"] is not None:
        fc_phrase = (
            f"The next twelve months are projected "
            f"{'above' if st['fc_vs_last12'] > 0 else 'below'} the last year by "
            f"{abs(st['fc_vs_last12']):.2f}%. "
        )

    if st["avg_ci_rel"] is not None:
        ci_phrase = (
            f"Uncertainty averages ~{st['avg_ci_rel']:.1f}% of the forecast, suggesting "
            f"{'wide' if st['avg_ci_rel'] > 35 else 'tight'} bands. "
        )
    else:
        ci_phrase = "Confidence intervals were not computed for this run. "

    return (
        f"Consumption shows a {trend_word} long-term trend (slope {st['slope_per_month']:,.2f}/month; "
        f"CAGR {st['cagr_pct']:.2f}%). Variability is {vol} "
        f"(stdev {st['stdev']:,.2f} vs mean {st['mean']:,.2f}). "
        f"Seasonality appears {seas_word} with ACF(12)={st['acf12']:.2f} and dispersion index {st['seas_strength']:.2f}, "
        f"which should guide when to time reservations and rightsizing. "
        f"Recent momentum: last six months averaged {st['last6']:,.2f} versus {st['prev6']:,.2f}. "
        f"{yoy_phrase}{fc_phrase}"
        f"{ci_phrase}"
        "Actions: (1) Align commitments to seasonal crest months; "
        "(2) Right-size persistently under-utilized services ahead of troughs; "
        "(3) Set budget guardrails near the forecast mean plus half the average CI width."
    ).strip()
def _generate_narrative(y: pd.Series, fc_mean: pd.Series, ci_lower: pd.Series, ci_upper: pd.Series) -> str:
    """Return the narrative text, preferring the LLM and falling back locally.

    The prompt is sent through ``_call_openai``. If that yields a non-blank
    string it is returned unchanged; otherwise (None, a non-string, or only
    whitespace) the text from ``_local_narrative`` is used.
    """
    llm_text = _call_openai(_build_prompt(y, fc_mean, ci_lower, ci_upper))
    if isinstance(llm_text, str) and llm_text.strip():
        return llm_text
    return _local_narrative(y, fc_mean, ci_lower, ci_upper)
# ---------- Gradio pipeline ----------
def run_pipeline(file_obj, interval_source: str = "Bootstrap (recommended)"):
    """Run the full upload -> forecast -> report flow behind the Run button.

    Args:
        file_obj: uploaded file (path or object with ``.name``).
        interval_source: UI label of the interval method. Any label that
            contains "Bootstrap" selects the bootstrap band; every other
            label selects SARIMA intervals. A missing value is treated as
            the bootstrap default.

    Returns:
        (fig, out_path, combined_md):
          fig         - Matplotlib figure of history, forecast and band
          out_path    - path of a temporary CSV with history + forecast + CI
          combined_md - Markdown with the bullet summary and the narrative

    Raises:
        gr.Error: if no file was provided (plus any error raised while
            reading or validating the data).
    """
    if file_obj is None:
        raise gr.Error("Please upload your CSV/XLS file first.")

    df = _read_table(file_obj)
    y = _prepare_series(df)

    # Guard against a cleared dropdown (None) before the substring test.
    label = interval_source or "Bootstrap (recommended)"
    method = "bootstrap" if "Bootstrap" in label else "sarima"
    fc_mean, ci_lower, ci_upper = _forecast_holt_with_ci(y, H=12, n_boot=800, interval_method=method)

    # Build combined CSV (history + 12m forecast + CI)
    actual_df = pd.DataFrame({"Actual": y})
    fc_df = pd.DataFrame({"Forecast": fc_mean, "Lower": ci_lower, "Upper": ci_upper})
    out_df = pd.concat([actual_df, fc_df], axis=0)

    # Reserve a temp-file name and close the handle before pandas writes to
    # the path: the handle cannot leak if the write fails, and no second
    # open handle exists while the file is written.
    with tempfile.NamedTemporaryFile(
        delete=False, suffix=".csv", prefix="forecast_12m_with_ci_"
    ) as tmp:
        out_path = tmp.name
    out_df.to_csv(out_path, index=True)

    # Render figure
    fig = _make_figure(y, fc_mean, ci_lower, ci_upper)

    # ---- Provider-targeted bullet summary (Markdown) ----
    bullets_md = analyze_consumption(
        y=y,
        fc_mean=fc_mean,
        ci_lower=ci_lower,
        ci_upper=ci_upper,
        provider="auto",  # uses OpenAI if key present; else local deterministic
        cloud_provider="oci",  # default is OCI; invalid inputs also resolve to OCI
        render="md",  # Markdown bullets
    )

    # ---- Explanatory narrative (Markdown prose) ----
    narrative_md = _generate_narrative(y, fc_mean, ci_lower, ci_upper)

    # Combine into one Markdown block
    combined_md = (
        "### Provider-Targeted Summary\n"
        f"{bullets_md}\n\n"
        "---\n\n"
        "### Narrative: What the variables mean and why they matter\n"
        f"{narrative_md}\n"
    )
    return fig, out_path, combined_md
# ---------- UI ----------
# Top-level Gradio layout, bound to the module-level name `demo`.
# NOTE(review): block indentation is missing in this copy of the file, so the
# nesting below cannot be read off the text. Presumably everything through
# run_btn.click(...) sits inside `with gr.Blocks(...)` and only the file input
# and the dropdown sit inside `with gr.Row()` — confirm against the original.
with gr.Blocks(title="OCI Consumption Forecasting") as demo:
# Intro text: the two accepted column layouts and the data requirements.
gr.Markdown(
"# OCI Consumption Forecasting\n"
"Upload your **CSV/XLS** with either:\n"
"- `Metered Service Date` + `Computed Amount CD` (raw export), or\n"
"- `Date` + `Amount` (already grouped).\n\n"
"**Requirements:** continuous **monthly** data, at least **24 months**."
)
with gr.Row():
# Inputs: the uploaded file and the interval method passed to run_pipeline.
file_in = gr.File(label="Upload CSV/XLS", file_types=[".csv", ".xls", ".xlsx"])
# run_pipeline maps any label containing "Bootstrap" to the bootstrap band
# and every other label to SARIMA intervals.
interval_src = gr.Dropdown(
["Bootstrap (recommended)", "SARIMA (intervals only)"],
value="Bootstrap (recommended)",
label="Confidence Interval Method"
)
run_btn = gr.Button("Run Forecast", variant="primary")
# Outputs, in the same order as run_pipeline's return tuple:
# figure, CSV path, Markdown text.
plot_out = gr.Plot(label="12-Month Forecast")
csv_out = gr.File(label="Download forecast_12m_with_ci.csv")
# Markdown output (bullets + narrative)
analysis_out = gr.Markdown(label="Analysis (Markdown)")
# Wire the button: inputs feed run_pipeline positionally, outputs receive its
# three return values.
run_btn.click(
fn=run_pipeline,
inputs=[file_in, interval_src],
outputs=[plot_out, csv_out, analysis_out]
)
if __name__ == "__main__":
# Start the app only when this file is run as a script. share=True asks
# Gradio for a public share link; no _js hooks are used by this app.
# NOTE(review): the earlier comment called share=True "fine" on Hugging Face
# Spaces; Gradio reportedly ignores the flag there — confirm, and consider
# whether a public link is wanted for local runs.
demo.launch(share=True)