Spaces:
Sleeping
Sleeping
| """ | |
| llm_consumption_analysis.py | |
| Drop-in module to generate a *bulleted* FinOps narrative focused on the CLOUD PROVIDER lens | |
| (OCI, AWS, Azure, or GCP). Supports Markdown or HTML rendering. | |
| Bullets cover: | |
| - Provider lens / coverage | |
| - Trend & growth | |
| - Seasonality | |
| - Momentum & YoY | |
| - 12-month forecast vs last-12m | |
| - Confidence interval & risk | |
| - Vendor-specific actions | |
| Quick start (after you compute y, fc_mean, ci_lower, ci_upper): | |
| from llm_consumption_analysis import analyze_consumption | |
| # Markdown (default) | |
| md_text = analyze_consumption(y, fc_mean, ci_lower, ci_upper) | |
| # in Gradio: | |
| # gr.Markdown(md_text) | |
| # HTML (true <ul><li> bullets) | |
| html_text = analyze_consumption(y, fc_mean, ci_lower, ci_upper, render="html") | |
| # in Gradio: | |
| # gr.HTML(html_text) | |
| OpenAI env (optional): | |
| - OPENAI_API_KEY (required for provider="openai" or "auto" when key present) | |
| - OPENAI_MODEL (optional; overrides `model`) | |
| """ | |
| from __future__ import annotations | |
| import os | |
| import html as _html | |
| from dataclasses import dataclass | |
| from typing import Dict, Optional | |
| import numpy as np | |
| import pandas as pd | |
| # Optional OpenAI import (only used if key present) | |
| try: | |
| from openai import OpenAI # openai>=1.0 | |
| except Exception: # pragma: no cover | |
| OpenAI = None # type: ignore | |
| # ============================ | |
| # Stats & Feature Engineering | |
| # ============================ | |
@dataclass
class SeriesStats:
    """Summary statistics for a historical series and its forecast interval.

    ``summarize`` builds instances with keyword arguments; the ``@dataclass``
    decorator generates the keyword ``__init__`` that call depends on.
    """

    start: pd.Timestamp  # first index label of the sorted history
    end: pd.Timestamp  # last index label of the sorted history
    n_months: int  # number of observations in the history
    total_sum: float
    mean: float
    median: float
    std: float  # sample standard deviation (0.0 for a single observation)
    min_val: float
    max_val: float
    cagr_pct: float  # compound annual growth rate, in percent
    slope_per_month: float  # least-squares slope per observation step
    last_6m_mean: float
    prev_6m_mean: float  # NaN when there is no earlier window to compare with
    yoy_change_pct: Optional[float]  # None when no prior window is available
    seasonality_strength: float
    acf12: float  # autocorrelation at lag 12
    avg_ci_width: Optional[float]  # None when no usable interval was supplied
    avg_ci_rel_width_pct: Optional[float]  # interval width as % of |forecast|
def _pct(a: float, b: float) -> Optional[float]:
    """Return the percentage change of ``a`` relative to ``b``.

    Args:
        a: New value.
        b: Baseline value.

    Returns:
        ``100 * (a - b) / b``, or ``None`` when the change is undefined:
        the baseline is zero, either value is NaN, or either value cannot be
        interpreted as a number (for example ``None``).
    """
    # Only the conversion is guarded, so genuine programming errors elsewhere
    # are not silently turned into "n/a".
    try:
        new_val, base_val = float(a), float(b)
    except (TypeError, ValueError):
        return None
    if base_val == 0 or np.isnan(new_val) or np.isnan(base_val):
        return None
    return 100.0 * (new_val - base_val) / base_val
def _seasonality_strength(y: pd.Series) -> float:
    """Ratio of the spread of month-of-year means to the overall spread.

    The series is grouped by the calendar month of its index; the standard
    deviation of the per-month means is divided by the standard deviation of
    all values. 0 means no measurable seasonality; higher means stronger.

    Args:
        y: Series whose index exposes a ``.month`` attribute.

    Returns:
        The ratio as a float, or 0.0 when either spread is zero or undefined
        (a single observation, or data from only one calendar month).
    """
    frame = y.to_frame("val")
    frame["month"] = frame.index.month
    month_means = frame.groupby("month")["val"].mean()
    between_months = month_means.std()
    overall = frame["val"].std()
    # std() is NaN with fewer than two values; NaN != 0, so test for it
    # explicitly instead of letting NaN leak into the report.
    if not (np.isfinite(between_months) and np.isfinite(overall)):
        return 0.0
    if between_months == 0 or overall == 0:
        return 0.0
    return float(between_months / overall)
def _acf_at_lag(y: pd.Series, lag: int) -> float:
    """Return the sample autocorrelation of ``y`` at the given lag.

    Args:
        y: Observations in chronological order.
        lag: Positive number of steps between the paired observations.

    Returns:
        Sum of lagged products of the mean-centred values divided by the sum
        of squared centred values. 0.0 when the lag is not positive, the
        series is not longer than the lag, or the series has no variance.
    """
    values = np.asarray(y, dtype=float)
    if lag <= 0 or len(values) <= lag:
        return 0.0
    centred = values - np.nanmean(values)
    # Work on plain arrays: multiplying two pandas slices aligns them on index
    # labels, which pairs each value with itself instead of its lagged partner.
    # nansum keeps the skip-missing behaviour of pandas reductions.
    denominator = float(np.nansum(centred * centred))
    if denominator == 0:
        return 0.0
    numerator = float(np.nansum(centred[lag:] * centred[:-lag]))
    return numerator / denominator
def _cagr(y: pd.Series) -> float:
    """Return the compound annual growth rate of ``y`` in percent.

    The time span is the number of calendar months between the first and last
    index labels (at least one month), converted to years.

    Args:
        y: Chronologically ordered series whose index labels support
            ``to_period("M")``.

    Returns:
        ``((last / first) ** (1 / years) - 1) * 100``, or 0.0 when growth is
        undefined: empty series, non-positive first value, or negative last
        value.
    """
    if len(y) == 0:
        return 0.0
    span = y.index[-1].to_period("M") - y.index[0].to_period("M")
    years = max(1, span.n) / 12.0
    first, last = float(y.iloc[0]), float(y.iloc[-1])
    # A negative ratio raised to a fractional power is complex in Python and
    # cannot be converted to float, so treat it as "no defined growth rate".
    if first <= 0 or last < 0:
        return 0.0
    return float((last / first) ** (1 / years) - 1) * 100.0
def _slope(y: pd.Series) -> float:
    """Return the least-squares linear slope of ``y`` per observation step.

    Positions 0..n-1 are used as the x-axis, so the result is "units per
    step" (per month for a monthly series).

    Args:
        y: Observations in chronological order.

    Returns:
        The fitted slope, or 0.0 when fewer than two finite observations are
        available (a line cannot be fitted).
    """
    values = np.asarray(y, dtype=float)
    positions = np.arange(len(values))
    usable = np.isfinite(values)
    # polyfit needs at least two points and cannot handle NaN/inf values.
    if int(usable.sum()) < 2:
        return 0.0
    gradient, _intercept = np.polyfit(positions[usable], values[usable], 1)
    return float(gradient)
def summarize(y: pd.Series, fc_mean: pd.Series,
              ci_lower: Optional[pd.Series] = None,
              ci_upper: Optional[pd.Series] = None) -> SeriesStats:
    """Compute descriptive statistics for the history and forecast interval.

    Args:
        y: Historical series; it is sorted by index before any windowing.
        fc_mean: Forecast point estimates.
        ci_lower: Optional lower interval bound, positionally matched to
            ``fc_mean``.
        ci_upper: Optional upper interval bound, positionally matched to
            ``fc_mean``.

    Returns:
        A ``SeriesStats`` instance. The interval fields are ``None`` unless
        both bounds are given, non-empty, and the same length as ``fc_mean``.

    Raises:
        IndexError: If ``y`` is empty (its first index label is read).
    """
    y = y.sort_index()
    n = len(y)

    # Trailing windows; shorter histories fall back to whatever is available.
    last_6 = y.iloc[-6:] if n >= 6 else y
    prev_6 = y.iloc[-12:-6] if n >= 12 else y.iloc[:-6]
    last_12 = y.iloc[-12:] if n >= 12 else y
    # NOTE: with 13-23 observations the "prior" window holds fewer than 12
    # points, so the YoY figure compares against a partial year.
    prev_12 = y.iloc[-24:-12] if n >= 24 else y.iloc[:-12]

    yoy_change = None
    if len(last_12) and len(prev_12) and prev_12.mean() != 0:
        yoy_change = 100.0 * (last_12.mean() - prev_12.mean()) / prev_12.mean()

    avg_ci_w = None
    avg_ci_rel = None
    # Both bounds must line up with the forecast; a length mismatch on either
    # side would otherwise fail (or broadcast) in the element-wise arithmetic.
    has_interval = (
        ci_lower is not None
        and ci_upper is not None
        and len(fc_mean) > 0
        and len(ci_lower) == len(fc_mean)
        and len(ci_upper) == len(fc_mean)
    )
    if has_interval:
        widths = ci_upper.values - ci_lower.values
        avg_ci_w = float(np.mean(widths))
        # Floor the denominator so a zero forecast cannot divide by zero.
        denom = np.maximum(np.abs(fc_mean.values), 1e-9)
        avg_ci_rel = float(np.mean(widths / denom) * 100.0)

    return SeriesStats(
        start=y.index[0],
        end=y.index[-1],
        n_months=n,
        total_sum=float(y.sum()),
        mean=float(y.mean()),
        median=float(y.median()),
        std=float(y.std(ddof=1) if n > 1 else 0.0),
        min_val=float(y.min()),
        max_val=float(y.max()),
        cagr_pct=_cagr(y),
        slope_per_month=_slope(y),
        last_6m_mean=float(last_6.mean()),
        prev_6m_mean=float(prev_6.mean()) if len(prev_6) else float("nan"),
        yoy_change_pct=yoy_change,
        seasonality_strength=_seasonality_strength(y),
        acf12=_acf_at_lag(y, 12),
        avg_ci_width=avg_ci_w,
        avg_ci_rel_width_pct=avg_ci_rel,
    )
| # =============== | |
| # Provider Lens | |
| # =============== | |
# Per-provider display name and ready-made Markdown "actions" bullet, keyed by
# the lowercase provider code.
_VENDOR_LENS: Dict[str, Dict[str, str]] = {
    "oci": dict(
        name="Oracle Cloud Infrastructure (OCI)",
        actions=(
            "- **Actions (OCI):** Align Universal Credit commitments to peak months; apply **Autoscaling** and **Right-Sizing** on OCPU/eCPU heavy services; "
            "evaluate **Block Volume tiering** and **Object Storage lifecycle**; consider **Capacity Reservations** for steady compute; "
            "govern spend with **Budgets/Alerts** and **Compartment** chargeback."
        ),
    ),
    "aws": dict(
        name="Amazon Web Services (AWS)",
        actions=(
            "- **Actions (AWS):** Use **Compute Savings Plans/EC2 RIs** for steady cores; enable **EBS gp3** migration and **S3 Intelligent-Tiering**; "
            "apply **Graviton** where viable; enforce **Cost Categories/Budgets**; leverage **Compute Optimizer** rightsizing."
        ),
    ),
    "azure": dict(
        name="Microsoft Azure",
        actions=(
            "- **Actions (Azure):** Leverage **Reservations** and **Savings Plan for Compute**; apply **Azure Hybrid Benefit**; "
            "use **Advisor** recommendations; optimize **Managed Disks**/**Blob access tiers**; set **Cost Management + Budgets**."
        ),
    ),
    "gcp": dict(
        name="Google Cloud Platform (GCP)",
        actions=(
            "- **Actions (GCP):** Apply **Committed Use Discounts (CUDs)**; capture **Sustained Use** where applicable; "
            "adopt **Active Assist Recommender** for rightsizing; use **Object Lifecycle** policies; set **Budgets/Alerts**."
        ),
    ),
}
def _provider_key(cloud_provider: Optional[str]) -> str:
    """Normalise a provider name to a key of ``_VENDOR_LENS``.

    Args:
        cloud_provider: Provider code in any case, with optional surrounding
            whitespace.

    Returns:
        The lowercase key when it is known; ``"oci"`` for anything else,
        including ``None``, empty strings, and non-string values.
    """
    # Non-strings have no .strip(); treat them as "invalid" rather than crash.
    if not isinstance(cloud_provider, str):
        return "oci"
    key = cloud_provider.strip().lower()
    return key if key in _VENDOR_LENS else "oci"
| # =============== | |
| # Prompt Builder | |
| # =============== | |
def build_prompt(
    y: pd.Series,
    fc_mean: pd.Series,
    ci_lower: Optional[pd.Series],
    ci_upper: Optional[pd.Series],
    cloud_provider: str,
) -> str:
    """Build the LLM prompt for a provider-focused bulleted FinOps summary.

    Args:
        y: Historical series.
        fc_mean: Forecast point estimates.
        ci_lower: Optional lower interval bound for the forecast.
        ci_upper: Optional upper interval bound for the forecast.
        cloud_provider: Provider code; unknown values resolve to OCI.

    Returns:
        The prompt text: output instructions, a "Context" section with one
        Markdown bullet per computed statistic, and a closing instruction
        naming the provider.
    """
    # Sort first so the trailing 12-observation window is chronological, the
    # same ordering summarize() applies internally.
    y = y.sort_index()
    st = summarize(y, fc_mean, ci_lower, ci_upper)
    vendor = _VENDOR_LENS[_provider_key(cloud_provider)]

    # Mean of the whole forecast compared with the mean of the last 12
    # observations (or all of them when fewer are available).
    fc12_mean = float(fc_mean.mean())
    last12_mean = float(y.iloc[-12:].mean() if len(y) >= 12 else y.mean())
    fc_vs_last12_pct = _pct(fc12_mean, last12_mean)

    ci_note = (
        f"Average CI width ≈ {st.avg_ci_width:,.2f} (~{st.avg_ci_rel_width_pct:.1f}% of forecast)."
        if st.avg_ci_width is not None and st.avg_ci_rel_width_pct is not None
        else "Confidence interval width not available."
    )

    details: Dict[str, str] = {
        "Provider lens": f"{vendor['name']} (provider-focused view; not customer-specific).",
        "Coverage": f"{st.n_months} months from {st.start.date()} to {st.end.date()}",
        "Trend & growth": f"slope {st.slope_per_month:,.2f} units/month; CAGR {st.cagr_pct:.2f}%",
        "Central tendency": f"mean {st.mean:,.2f}, median {st.median:,.2f}",
        "Volatility": f"stdev {st.std:,.2f}",
        "Momentum (6m)": f"last 6m avg {st.last_6m_mean:,.2f} vs prior 6m {st.prev_6m_mean:,.2f}",
        "YoY (last 12m vs prior 12m)": (f"{st.yoy_change_pct:.2f}%" if st.yoy_change_pct is not None else "n/a"),
        "Seasonality": f"strength {st.seasonality_strength:.2f}, ACF(12) {st.acf12:.2f}",
        "Forecast vs last-12m": (f"{fc_vs_last12_pct:.2f}%" if fc_vs_last12_pct is not None else "n/a"),
        "CI": ci_note,
    }
    bullet_lines = "\n".join(f"- **{k}:** {v}" for k, v in details.items())

    # NOTE(review): the list below asks for 2–3 action bullets while the
    # closing line asks for exactly one; wording is left as authored.
    prompt_lines = [
        "You are a FinOps analyst writing for an executive audience. Produce **only Markdown bullet points**.",
        "Each variable must be a single bullet (no paragraphs, no tables):",
        "- Provider lens (explicitly reference the cloud provider)",
        "- Trend & growth",
        "- Seasonality",
        "- Momentum & YoY",
        "- 12-month forecast vs last-12m",
        "- Confidence interval & risk",
        "- 2–3 **provider-specific** action bullets (use native terms)",
        "Rules:",
        "- Keep bullets crisp and numeric when useful.",
        "- No customer-specific language; focus on the cloud provider’s constructs.",
        "- No extra commentary outside bullets.",
        "Context (use to inform the bullets; do not copy verbatim):",
        bullet_lines,
        f"After those bullets, add exactly one additional bullet line with **provider-specific actions** using native terms for {vendor['name']}.",
    ]
    return "\n".join(prompt_lines).strip()
| # ====================== | |
| # LLM Provider Routines | |
| # ====================== | |
def _maybe_openai_client(model: Optional[str] = None):
    """Return ``(client, model_name)`` for OpenAI, or ``(None, None)``.

    The model name is chosen in this order: a non-empty ``OPENAI_MODEL``
    environment variable, the ``model`` argument, then ``"gpt-4o-mini"``.

    Args:
        model: Preferred model name used when ``OPENAI_MODEL`` is unset or
            empty.

    Returns:
        A client and model name, or ``(None, None)`` when ``OPENAI_API_KEY``
        is not set, the ``openai`` package is unavailable, or the client
        cannot be constructed.
    """
    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key or OpenAI is None:
        return None, None
    # `or` rather than a .get() default: an empty OPENAI_MODEL must fall back
    # instead of being sent to the API as an empty model name.
    model_name = os.environ.get("OPENAI_MODEL") or model or "gpt-4o-mini"
    try:
        client = OpenAI(api_key=api_key)
    except Exception:
        # Best effort: callers fall back to the deterministic summary.
        return None, None
    return client, model_name
def call_openai(prompt: str, model: Optional[str] = None, temperature: float = 0.2, max_tokens: int = 700) -> Optional[str]:
    """Send ``prompt`` to the OpenAI chat completions API.

    Args:
        prompt: User message content.
        model: Preferred model name, passed to ``_maybe_openai_client``.
        temperature: Sampling temperature forwarded to the API.
        max_tokens: Completion token limit forwarded to the API.

    Returns:
        The stripped text of the first choice (an empty string when the
        message has no content), or ``None`` when no client is available or
        the request raises.
    """
    client, resolved_model = _maybe_openai_client(model)
    if client is None:
        return None

    system_message = {
        "role": "system",
        "content": "You are a concise, numerate FinOps analyst. Output only Markdown bullet points.",
    }
    user_message = {"role": "user", "content": prompt}

    try:
        response = client.chat.completions.create(
            model=resolved_model,
            temperature=temperature,
            max_tokens=max_tokens,
            messages=[system_message, user_message],
        )
        content = response.choices[0].message.content
        return (content or "").strip()
    except Exception:
        # Any failure is reported as "no answer" so the caller can fall back.
        return None
| # ====================== | |
| # Bullet Formatting Helpers | |
| # ====================== | |
def _looks_like_bullets(text: str) -> bool:
    """Heuristically decide whether ``text`` is a Markdown bullet list.

    Blank lines are ignored. The text qualifies when it has at least four
    non-blank lines and the number of lines starting with ``-`` or ``*`` is
    at least ``max(4, int(0.7 * line_count))``.
    """
    if not text:
        return False

    stripped_lines = (raw.strip() for raw in text.strip().splitlines())
    nonblank = [piece for piece in stripped_lines if piece]
    total = len(nonblank)
    if total < 4:
        return False

    marked = len([piece for piece in nonblank if piece[0] in "-*"])
    required = max(4, int(0.7 * total))
    return marked >= required
def _md_to_html_bullets(md: str) -> str:
    """Convert Markdown bullet lines into an HTML ``<ul>`` list.

    Only lines that start with ``-`` or ``*`` (after stripping whitespace)
    become ``<li>`` items; every other line is dropped. Item text is
    HTML-escaped, then ``**bold**`` spans are rendered as ``<strong>`` so the
    Markdown markers do not show up literally in the HTML.

    Args:
        md: Markdown text.

    Returns:
        ``<ul>...</ul>`` markup; ``<ul></ul>`` when there are no bullet lines.
    """
    import re  # local import: the module does not import ``re`` at file level

    items = []
    for raw_line in md.strip().splitlines():
        line = raw_line.strip()
        if not line.startswith(("-", "*")):
            continue
        escaped = _html.escape(line.lstrip("-*").strip())
        # Escaping leaves asterisks untouched, so bold can be converted after.
        rendered = re.sub(r"\*\*(.+?)\*\*", r"<strong>\1</strong>", escaped)
        items.append(f"<li>{rendered}</li>")
    return f"<ul>{''.join(items)}</ul>"
def _html_list_from_lines(lines: list[str]) -> str:
    """Render every line as an ``<li>`` inside one HTML ``<ul>``.

    A leading ``-``/``*`` bullet marker is removed when present; lines
    without a marker are kept as items too. Item text is HTML-escaped, then
    ``**bold**`` spans are rendered as ``<strong>`` so the Markdown markers do
    not show up literally in the HTML.

    Args:
        lines: Lines of text, typically Markdown bullets.

    Returns:
        ``<ul>...</ul>`` markup; ``<ul></ul>`` for an empty list.
    """
    import re  # local import: the module does not import ``re`` at file level

    items = []
    for raw_line in lines:
        text = raw_line.strip()
        if text.startswith(("-", "*")):
            text = text.lstrip("-*").strip()
        escaped = _html.escape(text)
        # Escaping leaves asterisks untouched, so bold can be converted after.
        rendered = re.sub(r"\*\*(.+?)\*\*", r"<strong>\1</strong>", escaped)
        items.append(f"<li>{rendered}</li>")
    return f"<ul>{''.join(items)}</ul>"
| # ====================== | |
| # Public Entry Point | |
| # ====================== | |
def analyze_consumption(
    y: pd.Series,
    fc_mean: pd.Series,
    ci_lower: Optional[pd.Series] = None,
    ci_upper: Optional[pd.Series] = None,
    provider: str = "auto",  # "openai" | "none" | "auto"
    model: Optional[str] = None,
    cloud_provider: str = "oci",  # default strictly OCI (invalid inputs resolve to OCI)
    temperature: float = 0.2,
    render: str = "md",  # "md" or "html"
) -> str:
    """Return a provider-targeted bulleted analysis (Markdown or HTML).

    Args:
        y: Historical series.
        fc_mean: Forecast point estimates.
        ci_lower: Optional lower interval bound for the forecast.
        ci_upper: Optional upper interval bound for the forecast.
        provider: ``"openai"`` always tries the LLM; ``"auto"`` tries it only
            when ``OPENAI_API_KEY`` is set; any other value skips it.
        model: Preferred OpenAI model name.
        cloud_provider: Provider code; unknown values resolve to OCI.
        temperature: Sampling temperature for the LLM call.
        render: ``"html"`` returns a ``<ul>`` list; anything else returns
            Markdown.

    Returns:
        The LLM's bullets when the call succeeds and the reply looks like a
        bullet list; otherwise bullets computed locally from the statistics.
    """
    use_openai = (provider == "openai") or (provider == "auto" and os.environ.get("OPENAI_API_KEY"))

    # ---- LLM path ----
    if use_openai:
        # Built only here: the prompt is unused by the deterministic path.
        prompt = build_prompt(y, fc_mean, ci_lower, ci_upper, cloud_provider=cloud_provider)
        text = call_openai(prompt=prompt, model=model, temperature=temperature)
        if text and _looks_like_bullets(text):
            return _md_to_html_bullets(text) if render == "html" else text
        # else fall through to local deterministic bullets

    # ---- Local deterministic bullets ----
    st = summarize(y, fc_mean, ci_lower, ci_upper)
    vendor = _VENDOR_LENS[_provider_key(cloud_provider)]

    # Sort so the trailing window is chronological, as summarize() does.
    ordered = y.sort_index()
    fc12_mean = float(fc_mean.mean())
    last12_mean = float(ordered.iloc[-12:].mean() if len(ordered) >= 12 else ordered.mean())
    fc_vs_last12_pct = _pct(fc12_mean, last12_mean)

    trend_word = "rising" if st.slope_per_month > 0 else ("declining" if st.slope_per_month < 0 else "flat")
    seas_word = (
        "pronounced" if st.seasonality_strength >= 0.75 or abs(st.acf12) >= 0.4
        else "moderate" if st.seasonality_strength >= 0.35 or abs(st.acf12) >= 0.2
        else "minimal"
    )
    # Compare spread with the magnitude of the mean so a negative mean does
    # not flip the thresholds.
    mean_size = abs(st.mean)
    vol_word = "low" if st.std < 0.15 * mean_size else ("elevated" if st.std > 0.4 * mean_size else "moderate")

    yoy_txt = "n/a" if st.yoy_change_pct is None else f"{st.yoy_change_pct:.2f}%"
    fc_txt = "n/a" if fc_vs_last12_pct is None else f"{fc_vs_last12_pct:.2f}%"
    # prev_6m_mean is NaN when the history has no earlier window.
    prev6_txt = "n/a" if np.isnan(st.prev_6m_mean) else f"{st.prev_6m_mean:,.2f}"
    ci_txt = (
        f"avg width {st.avg_ci_width:,.2f} (~{st.avg_ci_rel_width_pct:.1f}% of forecast)"
        if st.avg_ci_width is not None and st.avg_ci_rel_width_pct is not None
        else "not available"
    )

    md_lines = [
        f"- **Provider lens:** {vendor['name']} (provider-focused view; not customer-specific).",
        f"- **Coverage:** {st.n_months} months ({st.start.date()} → {st.end.date()}).",
        f"- **Trend & growth:** {trend_word}; slope {st.slope_per_month:,.2f}/mo; CAGR {st.cagr_pct:.2f}%.",
        f"- **Seasonality:** {seas_word}; strength {st.seasonality_strength:.2f}; ACF(12) {st.acf12:.2f}.",
        f"- **Momentum & YoY:** last 6m {st.last_6m_mean:,.2f} vs prior 6m {prev6_txt}; YoY {yoy_txt}.",
        f"- **12-mo forecast vs last-12m:** {fc_txt}.",
        f"- **Confidence & risk:** {ci_txt}; volatility {vol_word} (stdev {st.std:,.2f} vs mean {st.mean:,.2f}).",
        vendor["actions"],
    ]

    if render == "html":
        return _html_list_from_lines(md_lines)
    # default: Markdown bullets
    return "\n".join(md_lines)
| # Convenience: build analysis directly from a dict of series | |
def analyze_from_outputs(outputs: Dict[str, pd.Series], **kwargs) -> str:
    """Run ``analyze_consumption`` on a dict of series.

    Args:
        outputs: Mapping with required keys ``"y"`` and ``"fc_mean"`` and
            optional keys ``"ci_lower"`` and ``"ci_upper"``.
        **kwargs: Forwarded unchanged to ``analyze_consumption``.

    Returns:
        Whatever ``analyze_consumption`` returns.

    Raises:
        KeyError: If ``"y"`` or ``"fc_mean"`` is missing.
    """
    return analyze_consumption(
        outputs["y"],
        outputs["fc_mean"],
        outputs.get("ci_lower"),
        outputs.get("ci_upper"),
        **kwargs,
    )