"""Slipstream - a gr.Server backend (data API) powering a custom Preact SPA. Two deployment modes via SLIPSTREAM_MODE: - "llamacpp" (Space A): the distilled MiniCPM-1B agent runs server-side on llama.cpp. - "webgpu" (Space B): the agent runs in the user's browser via transformers.js/WebGPU; the backend just serves the TimesFM 2.5 + Earned-Value tools the browser agent calls. Endpoints return DATA (series, metrics, a tool-by-tool reasoning trace), not images - the SPA renders the fan chart with Highcharts. Still a Gradio app (gradio.Server). Run: `python app.py`. """ from __future__ import annotations import json import os if os.environ.get("SLIPSTREAM_OFFLINE") == "1": os.environ.setdefault("HF_HUB_OFFLINE", "1") os.environ.setdefault("TRANSFORMERS_OFFLINE", "1") import numpy as np from fastapi.responses import FileResponse, HTMLResponse from gradio import Server from src import baselines, evm, forecasting, local_llm, synthetic FCL = 128 HERE = os.path.dirname(os.path.abspath(__file__)) MODE = os.environ.get("SLIPSTREAM_MODE", "llamacpp") # "llamacpp" | "webgpu" ONNX_REPO = os.environ.get("SLIPSTREAM_ONNX_REPO", "ashaibani/slipstream-minicpm5-1b-onnx") # --------------------------------------------------------------------------- # # Samples # --------------------------------------------------------------------------- # def _build_samples(): out = {} for name, p in synthetic.sample_library().items(): out[f"Synthetic · {name}"] = p if os.path.isdir(os.path.join("data", "DSLIB", "Excel")): try: from src import dslib reals = dslib.load_dslib(min_periods=14) def _wf(p): n = len(p.ev) zf = float(np.mean(np.diff(p.ev) <= 0)) return (16 <= n <= 40 and p.bac > 0 and zf < 0.35 and p.ev[int(n * 0.4)] / p.bac > 0.08 and p.ev[int(n * 0.6)] / p.bac > 0.30) for p in sorted((p for p in reals if _wf(p)), key=lambda p: abs(len(p.ev) - 26))[:8]: out[f"Real (DSLIB) · {p.name}"] = p except Exception: pass return out SAMPLES = _build_samples() DEFAULT_SAMPLE = next((k for k in SAMPLES if k.startswith("Real")), next(iter(SAMPLES))) # The server-side llama.cpp agent is available in BOTH modes (in webgpu mode it's the fallback when # the in-browser model can't load - the small-quant in-browser path is experimental, see report). AGENT_OK = local_llm.is_available() ENGINE_AGENT = "MiniCPM-1B agent" # in-browser (webgpu) or server-side (llamacpp) ENGINE_TFM = "TimesFM 2.5" DIRECT = {ENGINE_TFM: lambda o, h, **k: forecasting.timesfm_forecast(o, h, device="cpu", forecast_context_len=FCL, **k), "Curve fit": baselines.logistic, "Last value": baselines.last_value} ENGINES = ([ENGINE_AGENT] if AGENT_OK else []) + list(DIRECT) def _project_from_name(name): return SAMPLES.get(name, SAMPLES[DEFAULT_SAMPLE]) def _finish(cum, bac, start): hit = cum >= 0.999 * bac return start + int(np.argmax(hit)) if hit.any() else None def _norm_cdf(z): from math import erf, sqrt return 0.5 * (1 + erf(z / sqrt(2))) def _money(x): return None if x is None or not np.isfinite(x) else round(float(x)) def _series(project, k, n, cum_lo, cum_q50, cum_hi, ac_fore, fper, has_truth): """All arrays the Highcharts fan chart needs (rounded; cumulative £).""" r = lambda a: [round(float(x)) for x in a] # noqa: E731 return { "period": list(range(1, n + 1)), "pv": r(project.pv), "ev": r(project.ev[:k]), "ac": r(project.ac[:k]), "ev_heldout": (r(project.ev[k - 1:]) if has_truth and n > k else None), "heldout_start": k, "fper": [int(x) for x in fper], "ev_p10": r(cum_lo), "ev_p50": r(cum_q50), "ev_p90": r(cum_hi), "ac_fore": r(ac_fore), "bac": round(float(project.bac)), "planned_finish": int(project.planned_finish), "observed_k": k, "n": n, } def _readout(project, k, n, ctx_pct, status, fin_likely, fin_early, fin_late, eac_med, eac_lo, eac_hi, overrun, p_overrun, proj_end, engine): pf = project.planned_finish f = lambda x: (f"period {x}" if x else f"beyond {proj_end}") # noqa: E731 late = fin_likely is None or (fin_likely - pf) > 0.15 * pf risk = ("HIGH" if late or overrun > 0.10 else "MEDIUM" if (fin_likely and fin_likely > pf) or overrun > 0.02 else "LOW") return { "name": project.name, "observed_pct": round(ctx_pct), "earned_pct": round(status["pct_complete"] * 100), "spi": round(status["spi"], 2), "cpi": round(status["cpi"], 2), "finish_likely": fin_likely, "finish_baseline": pf, "slip": (fin_likely - pf if fin_likely is not None else None), "finish_range": [fin_early, fin_late], "eac": _money(eac_med), "eac_range": [_money(eac_lo), _money(eac_hi)], "bac": round(float(project.bac)), "overrun_pct": round(overrun * 100), "p_overrun_pct": round(p_overrun * 100), "risk": risk, "engine": engine, "finish_str": f(fin_likely), "early_str": f(fin_early), "late_str": f(fin_late), } def _summary(project, k, status, fin_likely, eac_med, overrun, p_overrun): return {"pct_complete": status["pct_complete"], "spi": status["spi"], "cpi": status["cpi"], "finish": float(fin_likely) if fin_likely else float(project.planned_finish), "planned": project.planned_finish, "eac": eac_med, "bac": project.bac, "overrun": overrun, "p_overrun": p_overrun} # --------------------------------------------------------------------------- # # Direct forecast (TimesFM / baselines) # --------------------------------------------------------------------------- # def _direct(project, k, n, ctx_pct, engine): ev_inc = evm.to_increments(project.ev) horizon = min(120, max(8, n - k, int(project.planned_finish * 1.8) - k)) try: fc = DIRECT[engine](ev_inc[:k], horizon, bac=project.bac, planned_periods=project.planned_finish) except Exception: fc = baselines.logistic(ev_inc[:k], horizon, bac=project.bac) last, bac = float(project.ev[:k][-1]), project.bac cum_q50 = np.minimum(np.concatenate([[last], last + np.cumsum(fc["q50"])]), bac) cum_lo = np.minimum(np.concatenate([[last], last + np.cumsum(fc["q10"])]), bac) cum_hi = np.minimum(np.concatenate([[last], last + np.cumsum(fc["q90"])]), bac) fper = np.arange(k, k + horizon + 1) status = evm.latest(project.pv[:k], project.ev[:k], project.ac[:k], bac) cpi = status["cpi"] if status["cpi"] and status["cpi"] > 0 else 1.0 ac_fore = np.concatenate([[project.ac[:k][-1]], project.ac[:k][-1] + np.cumsum(np.diff(cum_q50)) / cpi]) fin_likely, fin_late, fin_early = (_finish(c, bac, k) for c in (cum_q50, cum_lo, cum_hi)) eacs = {m: v for m, v in evm.all_eacs(project.pv[:k], project.ev[:k], project.ac[:k], bac).items() if np.isfinite(v)} vals = list(eacs.values()) or [bac] eac_med = float(np.median(vals)) overrun = eac_med / bac - 1.0 p_over = 1.0 - _norm_cdf((1.1 * bac - eac_med) / max(float(np.std(vals)) or eac_med * .05, 1e-9)) series = _series(project, k, n, cum_lo, cum_q50, cum_hi, ac_fore, fper, has_truth=True) readout = _readout(project, k, n, ctx_pct, status, fin_likely, fin_early, fin_late, eac_med, min(vals), max(vals), overrun, p_over, k + len(fper) - 1, engine) trace = [ {"tool": "evm_metrics", "input": {}, "output": { "SPI": round(status["spi"], 3), "CPI": round(status["cpi"], 3), "EAC formulas (£)": {m: _money(v) for m, v in eacs.items()}}}, {"tool": "forecast_series", "input": {"which": "ev", "horizon": int(horizon)}, "output": {"P50 reaches BAC": (f"period {fin_likely}" if fin_likely else "not within horizon")}}, ] return {"series": series, "readout": readout, "trace": trace, "summary": _summary(project, k, status, fin_likely, eac_med, overrun, p_over)} # --------------------------------------------------------------------------- # # Server-side agent (llama.cpp) - extracts a tool-by-tool trace from the loop # --------------------------------------------------------------------------- # def _paths_from_trace(messages, project, k): ev_paths, horizon = None, None for i, m in enumerate(messages): if m.get("role") == "assistant" and m.get("tool_calls"): for tc in m["tool_calls"]: fn = tc["function"] args = fn["arguments"] if isinstance(fn["arguments"], dict) else json.loads(fn["arguments"] or "{}") if fn["name"] == "forecast_series" and str(args.get("which")) == "ev": for tm in messages[i + 1:]: if tm.get("role") == "tool": try: res = json.loads(tm["content"]) if "cumulative_paths" in res: ev_paths, horizon = res["cumulative_paths"], res.get("horizon") except Exception: pass break last = float(project.ev[:k][-1]) if not ev_paths: h = max(8, project.planned_finish - k) fper = np.arange(k, k + h + 1) flat = np.full(len(fper), last) return flat, flat, flat, fper h = horizon or len(ev_paths["q50"]) fper = np.arange(k, k + h + 1) pad = lambda a: np.minimum(np.concatenate([[last], np.array(a, float)])[:len(fper)], project.bac) # noqa: E731 return pad(ev_paths["q10"]), pad(ev_paths["q50"]), pad(ev_paths["q90"]), fper def trace_steps(messages): """Pair each tool call (assistant) with its result (tool message) -> [{tool, input, output}].""" steps, pending = [], {} for m in messages: if m.get("role") == "assistant" and m.get("tool_calls"): for tc in m["tool_calls"]: fn = tc["function"] args = fn["arguments"] if isinstance(fn["arguments"], dict) else json.loads(fn["arguments"] or "{}") step = {"tool": fn["name"], "input": args, "output": None} steps.append(step) pending[tc.get("id") or fn["name"]] = step elif m.get("role") == "tool": try: out = json.loads(m["content"]) except Exception: out = m.get("content") tid = m.get("tool_call_id") (pending.get(tid) or (steps[-1] if steps else {})).update({"output": out}) return steps def _agent(project, k, n, ctx_pct): from src import agent_forecaster as af client = local_llm.make_client() r = af.agent_forecast(project, k, client=client, model="minicpm", max_iters=8, temperature=0.2, return_trace=True) fc = r["forecast"] if not fc: return _direct(project, k, n, ctx_pct, ENGINE_TFM) cum_lo, cum_q50, cum_hi, fper = _paths_from_trace(r["messages"], project, k) status = evm.latest(project.pv[:k], project.ev[:k], project.ac[:k], project.bac) fin_likely = int(round(float(fc.get("finish_period", project.planned_finish)))) eac_med = float(fc.get("eac", project.bac)) p_over = float(fc.get("p_overrun", 0.0)) overrun = eac_med / project.bac - 1.0 fin_early = _finish(cum_hi, project.bac, k) or fin_likely fin_late = _finish(cum_lo, project.bac, k) or fin_likely cpi = status["cpi"] if status["cpi"] and status["cpi"] > 0 else 1.0 ac_fore = np.concatenate([[project.ac[:k][-1]], project.ac[:k][-1] + np.cumsum(np.diff(cum_q50)) / cpi]) eac_lo, eac_hi = min(eac_med, eac_med * (1 - abs(overrun) * .3)), max(eac_med, eac_med * (1 + abs(overrun) * .3)) series = _series(project, k, n, cum_lo, cum_q50, cum_hi, ac_fore, fper, has_truth=True) readout = _readout(project, k, n, ctx_pct, status, fin_likely, fin_early, fin_late, eac_med, eac_lo, eac_hi, overrun, p_over, k + len(fper) - 1, ENGINE_AGENT) trace = trace_steps(r["messages"]) return {"series": series, "readout": readout, "trace": trace, "summary": _summary(project, k, status, fin_likely, eac_med, overrun, p_over), "meta": {"generations": r["n_api_calls"], "tokens": r["usage"].get("completion_tokens", 0)}} # --------------------------------------------------------------------------- # # gr.Server + endpoints # --------------------------------------------------------------------------- # app = Server(title="Slipstream") @app.api(name="config") def config() -> dict: return {"mode": MODE, "engines": ENGINES, "default_project": DEFAULT_SAMPLE, "onnx_repo": ONNX_REPO, "agent_available": AGENT_OK} @app.api(name="projects") def projects() -> dict: out = [{"name": nm, "kind": ("real" if nm.startswith("Real") else "synthetic"), "n": int(len(p.ev)), "planned_finish": int(p.planned_finish)} for nm, p in SAMPLES.items()] return {"projects": out} @app.api(name="project_series") def project_series(name: str) -> dict: p = _project_from_name(name) return {"name": p.name, "n": int(len(p.ev)), "bac": float(p.bac), "planned_finish": int(p.planned_finish), "pv": [float(x) for x in p.pv], "ev": [float(x) for x in p.ev], "ac": [float(x) for x in p.ac]} @app.api(name="evm_metrics") def evm_metrics(pv: list, ev: list, ac: list, bac: float, planned_finish: int, k: int) -> dict: pv, ev, ac = np.array(pv[:k], float), np.array(ev[:k], float), np.array(ac[:k], float) s = evm.latest(pv, ev, ac, bac) return {"SPI": round(s["spi"], 3), "CPI": round(s["cpi"], 3), "EAC_formulas": {m: round(v) for m, v in evm.all_eacs(pv, ev, ac, bac).items() if np.isfinite(v)}, "earned_schedule_finish_period": round(float(evm.forecast_finish(pv, ev, planned_finish)), 1)} @app.api(name="forecast_series") def forecast_series(which: str, horizon: int, ev: list, ac: list, bac: float, k: int) -> dict: cum = np.array((ev if which == "ev" else ac)[:k], float) fc = forecasting.timesfm_forecast(evm.to_increments(cum), int(horizon), device="cpu", forecast_context_len=FCL, bac=bac) last = float(cum[-1]) paths, out = {}, {"horizon": int(horizon)} for q in ("q10", "q50", "q90"): c = last + np.cumsum(fc[q]) if which == "ev": c = np.minimum(c, bac) paths[q] = [round(float(x)) for x in c] out["cumulative_paths"] = paths if which == "ev": out["reaches_bac_period"] = { q: (k + int(np.argmax(np.array(paths[q]) >= 0.999 * bac)) + 1) if (np.array(paths[q]) >= 0.999 * bac).any() else None for q in ("q10", "q50", "q90")} return out @app.api(name="assemble") def assemble(project_name: str, ctx_pct: float, finish_period: float, eac: float, p_overrun: float, ev_q10: list, ev_q50: list, ev_q90: list, horizon: int) -> dict: """Build the chart series + readout from the in-browser agent's forecast (keeps the EVM math server-side / DRY). Called by browser-agent.js after the in-browser MiniCPM finishes.""" project = _project_from_name(project_name) n = len(project.ev) k = max(4, int(n * ctx_pct / 100.0)) last = float(project.ev[:k][-1]) fper = np.arange(k, k + int(horizon) + 1) pad = lambda a: np.minimum(np.concatenate([[last], np.array(a, float)])[:len(fper)], project.bac) # noqa: E731 cum_lo, cum_q50, cum_hi = pad(ev_q10), pad(ev_q50), pad(ev_q90) status = evm.latest(project.pv[:k], project.ev[:k], project.ac[:k], project.bac) fin_likely = int(round(float(finish_period))) eac_med, p_over = float(eac), float(p_overrun) overrun = eac_med / project.bac - 1.0 fin_early = _finish(cum_hi, project.bac, k) or fin_likely fin_late = _finish(cum_lo, project.bac, k) or fin_likely cpi = status["cpi"] if status["cpi"] and status["cpi"] > 0 else 1.0 ac_fore = np.concatenate([[project.ac[:k][-1]], project.ac[:k][-1] + np.cumsum(np.diff(cum_q50)) / cpi]) eac_lo, eac_hi = min(eac_med, eac_med * (1 - abs(overrun) * .3)), max(eac_med, eac_med * (1 + abs(overrun) * .3)) series = _series(project, k, n, cum_lo, cum_q50, cum_hi, ac_fore, fper, has_truth=True) readout = _readout(project, k, n, ctx_pct, status, fin_likely, fin_early, fin_late, eac_med, eac_lo, eac_hi, overrun, p_over, k + len(fper) - 1, ENGINE_AGENT) return {"series": series, "readout": readout, "summary": _summary(project, k, status, fin_likely, eac_med, overrun, p_over)} @app.api(name="forecast", concurrency_limit=2, time_limit=900) def forecast(project_name: str, ctx_pct: float, engine: str) -> dict: """Streaming: heartbeat every ~1.5s (keeps the SSE alive on slow CPU) then the final result.""" import threading import time box = {} def run(): try: project = _project_from_name(project_name) n = len(project.ev) k = max(4, int(n * ctx_pct / 100.0)) if engine == ENGINE_AGENT and local_llm.is_available(): # server agent (both modes; webgpu fallback) box["r"] = _agent(project, k, n, ctx_pct) else: box["r"] = _direct(project, k, n, ctx_pct, engine) except Exception as e: # noqa: BLE001 box["err"] = f"{type(e).__name__}: {e}" th = threading.Thread(target=run, daemon=True) th.start() t0 = time.time() while th.is_alive(): time.sleep(1.5) yield {"status": f"{engine} · {int(time.time() - t0)}s"} th.join() if "err" in box: yield {"error": box["err"]} return yield {"done": True, **box["r"]} @app.get("/", response_class=HTMLResponse) async def homepage(): with open(os.path.join(HERE, "frontend", "index.html"), encoding="utf-8") as f: html = f.read() return html.replace("__SLIPSTREAM_MODE__", MODE).replace("__ONNX_REPO__", ONNX_REPO) @app.get("/browser-agent.js") async def browser_agent_js(): return FileResponse(os.path.join(HERE, "frontend", "browser-agent.js"), media_type="application/javascript") @app.on_event("startup") def _warm(): if os.environ.get("SLIPSTREAM_PREFETCH", "1") != "1": return try: if local_llm.is_available(): # warm the server agent in both modes local_llm.make_client() forecasting.timesfm_forecast(np.arange(1, 13, dtype=float), 8, device="cpu", forecast_context_len=FCL) print("[warm] models resident", flush=True) except Exception as e: print("[warm] skipped:", e, flush=True) demo = app if __name__ == "__main__": demo.launch(server_name="0.0.0.0", server_port=7860, ssr_mode=False)