Spaces:
Build error
Build error
| # app.py — Corporate Forecast Agent (Gradio, HF Spaces) | |
| # Notes: | |
| # - Upload a CSV with columns: date (YYYY-MM), revenue, cogs, opex | |
| # - Click buttons to run Baseline / Best / Base / Worst scenarios | |
| # - Shows plots and lets you download the forecast CSV | |
| # - Focus on transparency; validations explain why results pass/fail | |
| import io | |
| import json | |
| import hashlib | |
| import numpy as np | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| import gradio as gr | |
| from datetime import datetime | |
| # Optional: statsmodels for SARIMAX baseline | |
| try: | |
| from statsmodels.tsa.statespace.sarimax import SARIMAX | |
| except Exception: | |
| SARIMAX = None | |
| HORIZON_MONTHS = 18 | |
| np.random.seed(42) | |
| # ---------------------- helpers ---------------------- | |
| def audit_row(drivers, status): | |
| ts = datetime.utcnow().isoformat() | |
| h = hashlib.sha256(json.dumps(drivers, sort_keys=True).encode()).hexdigest()[:8] | |
| return {"timestamp": ts, "driver_hash": h, "status": status} | |
| def load_csv(file_obj): | |
| df = pd.read_csv(file_obj) | |
| df.columns = [c.strip().lower() for c in df.columns] | |
| required = {"date","revenue","cogs","opex"} | |
| if not required.issubset(set(df.columns)): | |
| raise ValueError(f"CSV must contain columns: {sorted(list(required))}") | |
| df["date"] = pd.to_datetime(df["date"]).dt.to_period("M").dt.to_timestamp() | |
| df = df.sort_values("date").reset_index(drop=True) | |
| return df | |
| # ---------------------- baseline forecast ---------------------- | |
| def fit_series(y): | |
| # Fit SARIMAX if available; otherwise fallback to simple growth extrapolation | |
| y = pd.Series(y) | |
| if SARIMAX is not None and len(y) >= 12: | |
| try: | |
| model = SARIMAX(y, order=(1,1,1), seasonal_order=(0,0,0,0), trend='t', enforce_stationarity=False, enforce_invertibility=False) | |
| res = model.fit(disp=False) | |
| fc = res.get_forecast(steps=HORIZON_MONTHS).predicted_mean | |
| return np.maximum(fc.values, 0.0) | |
| except Exception: | |
| pass | |
| # Fallback: compound growth based on last 6 months CAGR | |
| growth = (y.iloc[-1] / max(y.iloc[-6], 1e-6)) ** (1/6) - 1 if len(y) > 6 else 0.0 | |
| last = y.iloc[-1] | |
| return np.array([max(last * ((1+growth) ** (i+1)), 0.0) for i in range(HORIZON_MONTHS)]) | |
| def baseline_forecast(df): | |
| rev_fc = fit_series(df["revenue"].values) | |
| cogs_fc = fit_series(df["cogs"].values) | |
| opex_fc = fit_series(df["opex"].values) | |
| start = (df["date"].iloc[-1] + pd.offsets.MonthBegin(1)).to_period("M").to_timestamp() | |
| idx = pd.date_range(start, periods=HORIZON_MONTHS, freq="MS") | |
| out = pd.DataFrame({"date": idx, "revenue": rev_fc, "cogs": cogs_fc, "opex": opex_fc}) | |
| return out | |
| # ---------------------- scenario overlay ---------------------- | |
| def apply_scenario(forecast, drivers): | |
| fc = forecast.copy() | |
| # Revenue growth % | |
| fc["revenue"] = fc["revenue"] * (1 + drivers.get("rev_growth_pct", 0.0)) | |
| # Gross margin bps adjustment -> change COGS accordingly | |
| gm_base = 1 - (fc["cogs"] / np.maximum(fc["revenue"], 1e-6)) | |
| gm_adj = gm_base + drivers.get("gm_bps", 0.0)/10000.0 | |
| gm_adj = np.clip(gm_adj, 0.0, 0.9) | |
| fc["cogs"] = fc["revenue"] * (1 - gm_adj) | |
| # Opex inflation | |
| fc["opex"] = fc["opex"] * (1 + drivers.get("opex_infl_pct", 0.0)) | |
| # FX headwind (% applied to revenue) | |
| fx = drivers.get("fx_pct", 0.0) | |
| fc["revenue"] = fc["revenue"] * (1 + fx) | |
| # Working capital via DSO/DPO/DIO | |
| dso = drivers.get("dso", 60) | |
| dpo = drivers.get("dpo", 45) | |
| dio = drivers.get("dio", 60) | |
| fc["ar"] = fc["revenue"] * (dso/30) | |
| fc["ap"] = fc["cogs"] * (dpo/30) | |
| fc["inv"] = fc["cogs"] * (dio/30) | |
| # P&L and cash | |
| fc["gp"] = fc["revenue"] - fc["cogs"] | |
| fc["ebitda"] = fc["gp"] - fc["opex"] | |
| dep = drivers.get("dep", 0.05) * fc["revenue"] | |
| fc["ebit"] = fc["ebitda"] - dep | |
| interest = drivers.get("interest", 0.01) * fc["revenue"] | |
| taxes = np.maximum(drivers.get("tax_rate", 0.25) * np.maximum(fc["ebit"] - interest, 0.0), 0.0) | |
| fc["ni"] = fc["ebit"] - interest - taxes | |
| # Non-cash = dep | |
| fc["noncash"] = dep | |
| # ΔWC month-over-month | |
| fc[["ar","ap","inv"]] = fc[["ar","ap","inv"]].fillna(0.0) | |
| fc["wc"] = fc["ar"] + fc["inv"] - fc["ap"] | |
| fc["delta_wc"] = fc["wc"].diff().fillna(fc["wc"]) # first month delta vs 0 | |
| fc["cash"] = fc["ni"] + fc["noncash"] - fc["delta_wc"] | |
| return fc | |
| # ---------------------- validators ---------------------- | |
| def check_bounds(drivers): | |
| bounds = { | |
| "rev_growth_pct": (-0.20, 0.30), | |
| "gm_bps": (-500, 500), | |
| "opex_infl_pct": (-0.10, 0.20), | |
| "fx_pct": (-0.10, 0.10), | |
| "dso": (30, 90), | |
| "dpo": (15, 90), | |
| "dio": (20, 120), | |
| } | |
| msgs = [] | |
| ok = True | |
| for k,(lo,hi) in bounds.items(): | |
| v = drivers.get(k) | |
| if v is None: continue | |
| if not (lo <= v <= hi): | |
| ok = False | |
| msgs.append(f"{k} out of bounds: {v} not in [{lo},{hi}]") | |
| return ok, msgs | |
| def check_conservation(df, tol=1e-6): | |
| lhs = df["cash"].sum() | |
| rhs = (df["ni"].sum() + df["noncash"].sum() - df["delta_wc"].sum()) | |
| return abs(lhs - rhs) <= tol | |
| def check_monotonicity(base_df, worst_df, best_df): | |
| # Check totals of EBITDA and Cash across horizon | |
| def total(col, d): return float(d[col].sum()) | |
| eb = { | |
| "worst": total("ebitda", worst_df), | |
| "base": total("ebitda", base_df), | |
| "best": total("ebitda", best_df), | |
| } | |
| ca = { | |
| "worst": total("cash", worst_df), | |
| "base": total("cash", base_df), | |
| "best": total("cash", best_df), | |
| } | |
| mono_eb = eb["worst"] <= eb["base"] <= eb["best"] | |
| mono_ca = ca["worst"] <= ca["base"] <= ca["best"] | |
| return mono_eb and mono_ca, eb, ca | |
| # ---------------------- plotting ---------------------- | |
| def plot_series(df, title): | |
| fig, ax = plt.subplots(2, 2, figsize=(10,6)) | |
| ax = ax.ravel() | |
| ax[0].plot(df["date"], df["revenue"], label="Revenue") | |
| ax[0].plot(df["date"], df["cogs"], label="COGS") | |
| ax[0].plot(df["date"], df["opex"], label="Opex") | |
| ax[0].legend(); ax[0].set_title("P&L") | |
| ax[1].plot(df["date"], df["ebitda"], color="green", label="EBITDA") | |
| ax[1].legend(); ax[1].set_title("EBITDA") | |
| ax[2].plot(df["date"], df["cash"], color="purple", label="Cash Flow") | |
| ax[2].legend(); ax[2].set_title("Cash Flow") | |
| ax[3].plot(df["date"], df["wc"], color="orange", label="Working Capital") | |
| ax[3].legend(); ax[3].set_title("Working Capital") | |
| fig.suptitle(title) | |
| buf = io.BytesIO() | |
| plt.tight_layout() | |
| fig.savefig(buf, format="png"); plt.close(fig) | |
| return buf | |
| # ---------------------- Gradio UI ---------------------- | |
| def run_pipeline(file, scenario): | |
| df_hist = load_csv(file) | |
| fc = baseline_forecast(df_hist) | |
| # Define drivers per scenario | |
| drivers_base = {"rev_growth_pct":0.0, "gm_bps":0, "opex_infl_pct":0.0, "fx_pct":0.0, "dso":60, "dpo":45, "dio":60, "dep":0.05, "interest":0.01, "tax_rate":0.25} | |
| drivers_best = {**drivers_base, "rev_growth_pct":0.10, "gm_bps":200, "opex_infl_pct":-0.05, "fx_pct":0.02} | |
| drivers_worst = {**drivers_base, "rev_growth_pct":-0.10, "gm_bps":-200, "opex_infl_pct":0.10, "fx_pct":-0.03} | |
| if scenario == "Baseline": | |
| out = apply_scenario(fc, drivers_base) | |
| status = "Baseline"; | |
| buf = plot_series(out, "Baseline") | |
| csv_bytes = out.to_csv(index=False).encode() | |
| return buf, csv_bytes, json.dumps(audit_row(drivers_base, status)) | |
| # Base/Best/Worst comparison + validations | |
| base_df = apply_scenario(fc, drivers_base) | |
| best_df = apply_scenario(fc, drivers_best) | |
| worst_df = apply_scenario(fc, drivers_worst) | |
| # Bounds check for all | |
| ok_b_base, _ = check_bounds(drivers_base) | |
| ok_b_best, _ = check_bounds(drivers_best) | |
| ok_b_worst, _ = check_bounds(drivers_worst) | |
| ok_cons_base = check_conservation(base_df) | |
| ok_cons_best = check_conservation(best_df) | |
| ok_cons_worst = check_conservation(worst_df) | |
| ok_mono, eb, ca = check_monotonicity(base_df, worst_df, best_df) | |
| status = { | |
| "bounds": ok_b_base and ok_b_best and ok_b_worst, | |
| "conservation": ok_cons_base and ok_cons_best and ok_cons_worst, | |
| "monotonicity": ok_mono, | |
| } | |
| # choose which to show | |
| out = base_df if scenario == "Base" else best_df if scenario == "Best" else worst_df | |
| buf = plot_series(out, f"Scenario: {scenario}") | |
| meta = { | |
| "scenario": scenario, | |
| "monotonicity_totals": {"EBITDA": eb, "Cash": ca}, | |
| "status": status, | |
| } | |
| csv_bytes = out.to_csv(index=False).encode() | |
| return buf, csv_bytes, json.dumps(meta) | |
| with gr.Blocks(title="Corporate Forecast Agent") as demo: | |
| gr.Markdown("## Corporate Forecast Agent — Best / Base / Worst") | |
| gr.Markdown("Upload CSV with columns: date,revenue,cogs,opex") | |
| file = gr.File(file_types=[".csv"], label="Upload CSV") | |
| scenario = gr.Radio(["Baseline","Best","Base","Worst"], value="Baseline", label="Scenario") | |
| btn = gr.Button("Run") | |
| img = gr.Image(type="numpy", label="Plots") | |
| dl = gr.File(label="Download forecast.csv") | |
| meta = gr.JSON(label="Validation & Audit") | |
| def _run(file_obj, scen): | |
| if file_obj is None: | |
| raise gr.Error("Please upload a CSV first.") | |
| plot_buf, csv_bytes, meta_json = run_pipeline(file_obj.name, scen) | |
| return plot_buf.getvalue(), ("forecast.csv", csv_bytes), meta_json | |
| btn.click(_run, inputs=[file, scenario], outputs=[img, dl, meta]) | |
| if __name__ == "__main__": | |
| demo.launch() | |