OCIconsumptionForecaster / llm_consumption_analysis.py
kgauvin603's picture
Update llm_consumption_analysis.py
d73b0d2 verified
"""
llm_consumption_analysis.py
Drop-in module to generate a *bulleted* FinOps narrative focused on the CLOUD PROVIDER lens
(OCI, AWS, Azure, or GCP). Supports Markdown or HTML rendering.
Bullets cover:
- Provider lens / coverage
- Trend & growth
- Seasonality
- Momentum & YoY
- 12-month forecast vs last-12m
- Confidence interval & risk
- Vendor-specific actions
Quick start (after you compute y, fc_mean, ci_lower, ci_upper):
from llm_consumption_analysis import analyze_consumption
# Markdown (default)
md_text = analyze_consumption(y, fc_mean, ci_lower, ci_upper)
# in Gradio:
# gr.Markdown(md_text)
# HTML (true <ul><li> bullets)
html_text = analyze_consumption(y, fc_mean, ci_lower, ci_upper, render="html")
# in Gradio:
# gr.HTML(html_text)
OpenAI env (optional):
- OPENAI_API_KEY (required for provider="openai"; with provider="auto" the LLM is used only when this key is set)
- OPENAI_MODEL (optional; overrides `model`)
"""
from __future__ import annotations
import os
import html as _html
from dataclasses import dataclass
from typing import Dict, Optional
import numpy as np
import pandas as pd
# Optional OpenAI import (only used if key present)
try:
from openai import OpenAI # openai>=1.0
except Exception: # pragma: no cover
OpenAI = None # type: ignore
# ============================
# Stats & Feature Engineering
# ============================
@dataclass
class SeriesStats:
    """Summary statistics of a historical series plus forecast-interval widths.

    Instances are built by ``summarize``; the series-derived fields describe
    the history after it has been sorted by index.
    """

    start: pd.Timestamp  # first index label of the sorted series
    end: pd.Timestamp  # last index label of the sorted series
    n_months: int  # number of observations in the series
    total_sum: float  # sum of all observations
    mean: float  # arithmetic mean of all observations
    median: float  # median of all observations
    std: float  # sample standard deviation (ddof=1); 0.0 for a single observation
    min_val: float  # smallest observation
    max_val: float  # largest observation
    cagr_pct: float  # compound annual growth rate, in percent (see ``_cagr``)
    slope_per_month: float  # least-squares linear slope per observation step (see ``_slope``)
    last_6m_mean: float  # mean of the last six observations (whole series if shorter)
    prev_6m_mean: float  # mean of the observations before the last six; NaN when there are none
    yoy_change_pct: Optional[float]  # % change of last-12 mean vs prior-12 mean; None if unavailable
    seasonality_strength: float  # std of month-of-year means divided by overall std
    acf12: float  # autocorrelation at lag 12 (see ``_acf_at_lag``)
    avg_ci_width: Optional[float]  # mean of (ci_upper - ci_lower); None without a usable CI
    avg_ci_rel_width_pct: Optional[float]  # mean CI width relative to |forecast|, in percent
def _pct(a: float, b: float) -> Optional[float]:
try:
if b == 0 or np.isnan(a) or np.isnan(b):
return None
return 100.0 * (a - b) / b
except Exception:
return None
def _seasonality_strength(y: pd.Series) -> float:
"""Normalized variance of month-of-year means. 0≈none; higher=stronger."""
df = y.to_frame("val")
df["month"] = df.index.month
grp = df.groupby("month")["val"]
by_month = grp.mean()
if by_month.std() == 0 or df["val"].std() == 0:
return 0.0
return float((by_month.std() / df["val"].std()))
def _acf_at_lag(y: pd.Series, lag: int) -> float:
if len(y) <= lag:
return 0.0
y0 = y - y.mean()
num = (y0.iloc[lag:] * y0.iloc[:-lag]).sum()
den = (y0 * y0).sum()
return float(num / den) if den != 0 else 0.0
def _cagr(y: pd.Series) -> float:
months = max(1, (y.index[-1].to_period("M") - y.index[0].to_period("M")).n)
years = months / 12.0
first, last = float(y.iloc[0]), float(y.iloc[-1])
if first <= 0 or years <= 0:
return 0.0
return float((last / first) ** (1 / years) - 1) * 100.0
def _slope(y: pd.Series) -> float:
x = np.arange(len(y))
m, _b = np.polyfit(x, y.values.astype(float), 1)
return float(m) # units per month
def summarize(y: pd.Series, fc_mean: pd.Series,
              ci_lower: Optional[pd.Series] = None,
              ci_upper: Optional[pd.Series] = None) -> SeriesStats:
    """Compute descriptive statistics for ``y`` and the forecast interval.

    Args:
        y: Historical series; it is sorted by index before any windowing.
        fc_mean: Forecast point estimates.
        ci_lower: Optional lower confidence bound, one value per forecast step.
        ci_upper: Optional upper confidence bound, one value per forecast step.

    Returns:
        A ``SeriesStats`` instance. ``yoy_change_pct`` is ``None`` when there
        is no prior-12 window or the change is not a finite number. The CI
        fields are ``None`` unless both bounds are given, non-empty, and the
        same length as ``fc_mean``.
    """
    y = y.sort_index()
    n = len(y)

    # Trailing windows; shorter histories fall back to whatever is available.
    last_6 = y.iloc[-6:] if n >= 6 else y
    prev_6 = y.iloc[-12:-6] if n >= 12 else y.iloc[:-6]
    last_12 = y.iloc[-12:] if n >= 12 else y
    prev_12 = y.iloc[-24:-12] if n >= 24 else y.iloc[:-12]

    yoy_change = None
    if len(last_12) and len(prev_12):
        prev_mean = prev_12.mean()
        if prev_mean != 0:
            candidate = 100.0 * (last_12.mean() - prev_mean) / prev_mean
            # A NaN window mean must read as "not available", not as "nan%".
            if np.isfinite(candidate):
                yoy_change = candidate

    avg_ci_w = None
    avg_ci_rel = None
    if ci_lower is not None and ci_upper is not None:
        centers = np.asarray(fc_mean, dtype=float)
        lower = np.asarray(ci_lower, dtype=float)
        upper = np.asarray(ci_upper, dtype=float)
        # Both bounds must line up with the forecast; otherwise numpy would
        # either raise or silently broadcast a length-1 bound.
        if len(centers) > 0 and len(lower) == len(centers) and len(upper) == len(centers):
            widths = upper - lower
            avg_ci_w = float(np.mean(widths))
            denom = np.maximum(np.abs(centers), 1e-9)  # avoid division by zero
            avg_ci_rel = float(np.mean(widths / denom) * 100.0)

    return SeriesStats(
        start=y.index[0],
        end=y.index[-1],
        n_months=n,
        total_sum=float(y.sum()),
        mean=float(y.mean()),
        median=float(y.median()),
        std=float(y.std(ddof=1) if n > 1 else 0.0),
        min_val=float(y.min()),
        max_val=float(y.max()),
        cagr_pct=_cagr(y),
        slope_per_month=_slope(y),
        last_6m_mean=float(last_6.mean()),
        prev_6m_mean=float(prev_6.mean()) if len(prev_6) else float("nan"),
        yoy_change_pct=yoy_change,
        seasonality_strength=_seasonality_strength(y),
        acf12=_acf_at_lag(y, 12),
        avg_ci_width=avg_ci_w,
        avg_ci_rel_width_pct=avg_ci_rel,
    )
# ===============
# Provider Lens
# ===============
# Per-provider display data, keyed by the lowercase provider code that
# ``_provider_key`` resolves to. Each entry holds:
#   "name"    - the provider's display name used in prompts and bullets
#   "actions" - a ready-made Markdown bullet of provider-specific actions,
#               appended as the last line of the deterministic analysis
_VENDOR_LENS: Dict[str, Dict[str, str]] = {
    "oci": {
        "name": "Oracle Cloud Infrastructure (OCI)",
        "actions": (
            "- **Actions (OCI):** Align Universal Credit commitments to peak months; apply **Autoscaling** and **Right-Sizing** on OCPU/eCPU heavy services; "
            "evaluate **Block Volume tiering** and **Object Storage lifecycle**; consider **Capacity Reservations** for steady compute; "
            "govern spend with **Budgets/Alerts** and **Compartment** chargeback."
        ),
    },
    "aws": {
        "name": "Amazon Web Services (AWS)",
        "actions": (
            "- **Actions (AWS):** Use **Compute Savings Plans/EC2 RIs** for steady cores; enable **EBS gp3** migration and **S3 Intelligent-Tiering**; "
            "apply **Graviton** where viable; enforce **Cost Categories/Budgets**; leverage **Compute Optimizer** rightsizing."
        ),
    },
    "azure": {
        "name": "Microsoft Azure",
        "actions": (
            "- **Actions (Azure):** Leverage **Reservations** and **Savings Plan for Compute**; apply **Azure Hybrid Benefit**; "
            "use **Advisor** recommendations; optimize **Managed Disks**/**Blob access tiers**; set **Cost Management + Budgets**."
        ),
    },
    "gcp": {
        "name": "Google Cloud Platform (GCP)",
        "actions": (
            "- **Actions (GCP):** Apply **Committed Use Discounts (CUDs)**; capture **Sustained Use** where applicable; "
            "adopt **Active Assist Recommender** for rightsizing; use **Object Lifecycle** policies; set **Budgets/Alerts**."
        ),
    },
}
def _provider_key(cloud_provider: Optional[str]) -> str:
if not cloud_provider:
return "oci"
key = cloud_provider.strip().lower()
return key if key in _VENDOR_LENS else "oci"
# ===============
# Prompt Builder
# ===============
def build_prompt(
    y: pd.Series,
    fc_mean: pd.Series,
    ci_lower: Optional[pd.Series],
    ci_upper: Optional[pd.Series],
    cloud_provider: str,
) -> str:
    """Build the LLM prompt asking for a bulleted, provider-focused narrative.

    Args:
        y: Historical series.
        fc_mean: Forecast point estimates.
        ci_lower: Optional lower confidence bound of the forecast.
        ci_upper: Optional upper confidence bound of the forecast.
        cloud_provider: Provider name; unknown values resolve to OCI.

    Returns:
        The prompt text: fixed instructions, a context section of computed
        statistics rendered as Markdown bullets, and a closing instruction
        naming the provider.
    """
    # Sort once so the "last 12 months" window here matches the windows that
    # ``summarize`` computes on its own sorted copy.
    y = y.sort_index()

    st = summarize(y, fc_mean, ci_lower, ci_upper)
    vendor = _VENDOR_LENS[_provider_key(cloud_provider)]

    fc12_mean = float(fc_mean.mean())
    last12_mean = float(y.iloc[-12:].mean() if len(y) >= 12 else y.mean())
    fc_vs_last12_pct = _pct(fc12_mean, last12_mean)

    ci_note = (
        f"Average CI width ≈ {st.avg_ci_width:,.2f} (~{st.avg_ci_rel_width_pct:.1f}% of forecast)."
        if st.avg_ci_width is not None and st.avg_ci_rel_width_pct is not None
        else "Confidence interval width not available."
    )

    details: Dict[str, str] = {
        "Provider lens": f"{vendor['name']} (provider-focused view; not customer-specific).",
        "Coverage": f"{st.n_months} months from {st.start.date()} to {st.end.date()}",
        "Trend & growth": f"slope {st.slope_per_month:,.2f} units/month; CAGR {st.cagr_pct:.2f}%",
        "Central tendency": f"mean {st.mean:,.2f}, median {st.median:,.2f}",
        "Volatility": f"stdev {st.std:,.2f}",
        "Momentum (6m)": f"last 6m avg {st.last_6m_mean:,.2f} vs prior 6m {st.prev_6m_mean:,.2f}",
        "YoY (last 12m vs prior 12m)": (f"{st.yoy_change_pct:.2f}%" if st.yoy_change_pct is not None else "n/a"),
        "Seasonality": f"strength {st.seasonality_strength:.2f}, ACF(12) {st.acf12:.2f}",
        "Forecast vs last-12m": (f"{fc_vs_last12_pct:.2f}%" if fc_vs_last12_pct is not None else "n/a"),
        "CI": ci_note,
    }
    bullet_lines = "\n".join(f"- **{k}:** {v}" for k, v in details.items())

    prompt_lines = [
        "You are a FinOps analyst writing for an executive audience. Produce **only Markdown bullet points**.",
        "Each variable must be a single bullet (no paragraphs, no tables):",
        "- Provider lens (explicitly reference the cloud provider)",
        "- Trend & growth",
        "- Seasonality",
        "- Momentum & YoY",
        "- 12-month forecast vs last-12m",
        "- Confidence interval & risk",
        "- 2–3 **provider-specific** action bullets (use native terms)",
        "Rules:",
        "- Keep bullets crisp and numeric when useful.",
        "- No customer-specific language; focus on the cloud provider’s constructs.",
        "- No extra commentary outside bullets.",
        "Context (use to inform the bullets; do not copy verbatim):",
        bullet_lines,
        f"After those bullets, add exactly one additional bullet line with **provider-specific actions** using native terms for {vendor['name']}.",
    ]
    return "\n".join(prompt_lines).strip()
# ======================
# LLM Provider Routines
# ======================
def _maybe_openai_client(model: Optional[str] = None):
api_key = os.environ.get("OPENAI_API_KEY")
if not api_key or OpenAI is None:
return None, None
try:
client = OpenAI(api_key=api_key)
m = os.environ.get("OPENAI_MODEL", model or "gpt-4o-mini")
return client, m
except Exception:
return None, None
def call_openai(prompt: str, model: Optional[str] = None, temperature: float = 0.2, max_tokens: int = 700) -> Optional[str]:
    """Send ``prompt`` to the OpenAI chat completions API and return the reply.

    Returns the stripped text of the first choice (an empty string if the
    reply has no content), or ``None`` when no client is available or the
    request fails for any reason.
    """
    client, resolved_model = _maybe_openai_client(model)
    if client is None:
        return None

    conversation = [
        {"role": "system", "content": "You are a concise, numerate FinOps analyst. Output only Markdown bullet points."},
        {"role": "user", "content": prompt},
    ]
    try:
        response = client.chat.completions.create(
            model=resolved_model,
            temperature=temperature,
            max_tokens=max_tokens,
            messages=conversation,
        )
        reply = response.choices[0].message.content
        return (reply or "").strip()
    except Exception:
        # Best-effort: callers treat None as "no LLM answer available".
        return None
# ======================
# Bullet Formatting Helpers
# ======================
def _looks_like_bullets(text: str) -> bool:
if not text:
return False
lines = [ln.strip() for ln in text.strip().splitlines() if ln.strip()]
if len(lines) < 4:
return False
bulletish = sum(1 for ln in lines if ln.startswith(("-", "*")))
return bulletish >= max(4, int(0.7 * len(lines)))
def _md_to_html_bullets(md: str) -> str:
"""Very small converter: lines starting with '-' or '*' become <li>."""
items = []
for ln in md.strip().splitlines():
s = ln.strip()
if s.startswith(("-", "*")):
item = s.lstrip("-*").strip()
items.append(f"<li>{_html.escape(item)}</li>")
return f"<ul>{''.join(items)}</ul>" if items else "<ul></ul>"
def _html_list_from_lines(lines: list[str]) -> str:
items = []
for ln in lines:
s = ln.strip()
if s.startswith(("-", "*")):
s = s.lstrip("-*").strip()
items.append(f"<li>{_html.escape(s)}</li>")
return f"<ul>{''.join(items)}</ul>"
# ======================
# Public Entry Point
# ======================
def analyze_consumption(
    y: pd.Series,
    fc_mean: pd.Series,
    ci_lower: Optional[pd.Series] = None,
    ci_upper: Optional[pd.Series] = None,
    provider: str = "auto",  # "openai" | "none" | "auto"
    model: Optional[str] = None,
    cloud_provider: str = "oci",  # default strictly OCI (invalid inputs resolve to OCI)
    temperature: float = 0.2,
    render: str = "md",  # "md" or "html"
) -> str:
    """Return a provider-targeted bulleted analysis (Markdown or HTML).

    Args:
        y: Historical series.
        fc_mean: Forecast point estimates.
        ci_lower: Optional lower confidence bound of the forecast.
        ci_upper: Optional upper confidence bound of the forecast.
        provider: ``"openai"`` always tries the LLM; ``"auto"`` tries it only
            when ``OPENAI_API_KEY`` is set; any other value skips it.
        model: Model name passed to the LLM call.
        cloud_provider: Cloud vendor lens; unknown values resolve to OCI.
        temperature: Sampling temperature passed to the LLM call.
        render: ``"html"`` for a ``<ul>`` list; anything else gives Markdown.

    Returns:
        The LLM's bullets when the LLM path is taken and its reply looks like
        a bullet list; otherwise bullets computed locally from the statistics.
    """
    use_openai = (provider == "openai") or (
        provider == "auto" and bool(os.environ.get("OPENAI_API_KEY"))
    )

    # ---- LLM path ----
    if use_openai:
        # The prompt is only needed here, so it is not built for local runs.
        prompt = build_prompt(y, fc_mean, ci_lower, ci_upper, cloud_provider=cloud_provider)
        text = call_openai(prompt=prompt, model=model, temperature=temperature)
        if text and _looks_like_bullets(text):
            return _md_to_html_bullets(text) if render == "html" else text
        # else fall through to local deterministic bullets

    # ---- Local deterministic bullets ----
    # Sort so the "last 12 months" window below matches the windows that
    # ``summarize`` computes on its own sorted copy.
    y = y.sort_index()
    st = summarize(y, fc_mean, ci_lower, ci_upper)
    vendor = _VENDOR_LENS[_provider_key(cloud_provider)]

    fc12_mean = float(fc_mean.mean())
    last12_mean = float(y.iloc[-12:].mean() if len(y) >= 12 else y.mean())
    fc_vs_last12_pct = _pct(fc12_mean, last12_mean)

    trend_word = "rising" if st.slope_per_month > 0 else ("declining" if st.slope_per_month < 0 else "flat")
    seas_word = (
        "pronounced" if st.seasonality_strength >= 0.75 or abs(st.acf12) >= 0.4
        else "moderate" if st.seasonality_strength >= 0.35 or abs(st.acf12) >= 0.2
        else "minimal"
    )
    vol_word = "low" if st.std < 0.15 * st.mean else ("elevated" if st.std > 0.4 * st.mean else "moderate")
    yoy_txt = "n/a" if st.yoy_change_pct is None else f"{st.yoy_change_pct:.2f}%"
    fc_txt = "n/a" if fc_vs_last12_pct is None else f"{fc_vs_last12_pct:.2f}%"
    ci_txt = (
        f"avg width {st.avg_ci_width:,.2f} (~{st.avg_ci_rel_width_pct:.1f}% of forecast)"
        if st.avg_ci_width is not None and st.avg_ci_rel_width_pct is not None
        else "not available"
    )

    md_lines = [
        f"- **Provider lens:** {vendor['name']} (provider-focused view; not customer-specific).",
        # Start and end dates need a separator, otherwise they run together.
        f"- **Coverage:** {st.n_months} months ({st.start.date()} to {st.end.date()}).",
        f"- **Trend & growth:** {trend_word}; slope {st.slope_per_month:,.2f}/mo; CAGR {st.cagr_pct:.2f}%.",
        f"- **Seasonality:** {seas_word}; strength {st.seasonality_strength:.2f}; ACF(12) {st.acf12:.2f}.",
        f"- **Momentum & YoY:** last 6m {st.last_6m_mean:,.2f} vs prior 6m {st.prev_6m_mean:,.2f}; YoY {yoy_txt}.",
        f"- **12-mo forecast vs last-12m:** {fc_txt}.",
        f"- **Confidence & risk:** {ci_txt}; volatility {vol_word} (stdev {st.std:,.2f} vs mean {st.mean:,.2f}).",
        vendor["actions"],
    ]

    if render == "html":
        return _html_list_from_lines(md_lines)
    # default: Markdown bullets
    return "\n".join(md_lines)
def analyze_from_outputs(outputs: Dict[str, pd.Series], **kwargs) -> str:
    """Convenience wrapper: run ``analyze_consumption`` from a dict of series.

    ``outputs`` must contain ``"y"`` and ``"fc_mean"``; ``"ci_lower"`` and
    ``"ci_upper"`` are optional. Extra keyword arguments are forwarded
    unchanged.
    """
    series_args = (
        outputs["y"],
        outputs["fc_mean"],
        outputs.get("ci_lower"),
        outputs.get("ci_upper"),
    )
    return analyze_consumption(*series_args, **kwargs)