Spaces:
Running
Running
| """Slipstream - a gr.Server backend (data API) powering a custom Preact SPA. | |
| Two deployment modes via SLIPSTREAM_MODE: | |
| - "llamacpp" (Space A): the distilled MiniCPM-1B agent runs server-side on llama.cpp. | |
| - "webgpu" (Space B): the agent runs in the user's browser via transformers.js/WebGPU; the | |
| backend just serves the TimesFM 2.5 + Earned-Value tools the browser agent calls. | |
| Endpoints return DATA (series, metrics, a tool-by-tool reasoning trace), not images - the SPA | |
| renders the fan chart with Highcharts. Still a Gradio app (gradio.Server). Run: `python app.py`. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import os | |
if os.environ.get("SLIPSTREAM_OFFLINE") == "1":
    # Default the Hugging Face offline switches on (without overriding explicit settings).
    # This runs before the third-party imports below, presumably so those libraries see the
    # setting when they are imported - keep it above them.
    for _offline_var in ("HF_HUB_OFFLINE", "TRANSFORMERS_OFFLINE"):
        os.environ.setdefault(_offline_var, "1")
| import numpy as np | |
| from fastapi.responses import FileResponse, HTMLResponse | |
| from gradio import Server | |
| from src import baselines, evm, forecasting, local_llm, synthetic | |
HERE = os.path.dirname(os.path.abspath(__file__))  # this file's directory; frontend/ is resolved from it
MODE, ONNX_REPO = (
    os.environ.get("SLIPSTREAM_MODE", "llamacpp"),  # "llamacpp" | "webgpu"
    os.environ.get("SLIPSTREAM_ONNX_REPO", "ashaibani/slipstream-minicpm5-1b-onnx"),  # handed to the SPA
)
FCL = 128  # forecast_context_len passed to every TimesFM call in this file
| # --------------------------------------------------------------------------- # | |
| # Samples | |
| # --------------------------------------------------------------------------- # | |
def _build_samples():
    """Return the demo project library as an ordered ``{display name: project}`` dict.

    Synthetic projects from ``synthetic.sample_library()`` always come first. If the
    ``data/DSLIB/Excel`` folder exists (relative to the current working directory), up to 8
    real DSLIB projects that pass a well-formedness filter are appended, preferring those
    whose length is closest to 26 periods. Loading real data is best-effort: a failure is
    logged and the synthetic samples are still returned.
    """
    out = {f"Synthetic · {name}": p for name, p in synthetic.sample_library().items()}
    if not os.path.isdir(os.path.join("data", "DSLIB", "Excel")):
        return out
    try:
        from src import dslib
        reals = dslib.load_dslib(min_periods=14)

        def _well_formed(p):
            # 16-40 periods, positive BAC, under 35% flat/negative EV steps, and EV that has
            # clearly progressed by 40% (>8% of BAC) and 60% (>30% of BAC) of the timeline.
            n = len(p.ev)
            flat_frac = float(np.mean(np.diff(p.ev) <= 0))
            return (16 <= n <= 40 and p.bac > 0 and flat_frac < 0.35
                    and p.ev[int(n * 0.4)] / p.bac > 0.08 and p.ev[int(n * 0.6)] / p.bac > 0.30)

        picked = sorted((p for p in reals if _well_formed(p)), key=lambda p: abs(len(p.ev) - 26))[:8]
        for p in picked:
            out[f"Real (DSLIB) · {p.name}"] = p
    except Exception as e:  # noqa: BLE001 - real data is optional; never block startup on it
        # Previously swallowed silently, which made a broken DSLIB install invisible.
        print(f"[samples] DSLIB load skipped: {type(e).__name__}: {e}", flush=True)
    return out
SAMPLES = _build_samples()
# Land on the first real (DSLIB) project when one loaded; otherwise the first sample.
_real_names = [label for label in SAMPLES if label.startswith("Real")]
DEFAULT_SAMPLE = _real_names[0] if _real_names else next(iter(SAMPLES))
# The server-side llama.cpp agent is offered in BOTH modes (in webgpu mode it is the fallback
# when the in-browser model can't load - the small-quant in-browser path is experimental).
AGENT_OK = local_llm.is_available()
ENGINE_AGENT = "MiniCPM-1B agent"  # in-browser (webgpu) or server-side (llamacpp)
ENGINE_TFM = "TimesFM 2.5"


def _timesfm_direct(o, h, **k):
    """TimesFM 2.5 forecast of increments ``o`` over ``h`` periods on CPU (context length FCL)."""
    return forecasting.timesfm_forecast(o, h, device="cpu", forecast_context_len=FCL, **k)


# Direct (non-agent) engines: callable(increments, horizon, **kwargs) -> {"q10", "q50", "q90", ...}.
DIRECT = {
    ENGINE_TFM: _timesfm_direct,
    "Curve fit": baselines.logistic,
    "Last value": baselines.last_value,
}
# The agent is listed first, and only when a server-side model is available.
ENGINES = [ENGINE_AGENT, *DIRECT] if AGENT_OK else list(DIRECT)
def _project_from_name(name):
    """Return the sample project with display name ``name``, or the default sample if unknown."""
    if name in SAMPLES:
        return SAMPLES[name]
    return SAMPLES[DEFAULT_SAMPLE]
| def _finish(cum, bac, start): | |
| hit = cum >= 0.999 * bac | |
| return start + int(np.argmax(hit)) if hit.any() else None | |
| def _norm_cdf(z): | |
| from math import erf, sqrt | |
| return 0.5 * (1 + erf(z / sqrt(2))) | |
| def _money(x): | |
| return None if x is None or not np.isfinite(x) else round(float(x)) | |
| def _series(project, k, n, cum_lo, cum_q50, cum_hi, ac_fore, fper, has_truth): | |
| """All arrays the Highcharts fan chart needs (rounded; cumulative £).""" | |
| r = lambda a: [round(float(x)) for x in a] # noqa: E731 | |
| return { | |
| "period": list(range(1, n + 1)), | |
| "pv": r(project.pv), "ev": r(project.ev[:k]), "ac": r(project.ac[:k]), | |
| "ev_heldout": (r(project.ev[k - 1:]) if has_truth and n > k else None), | |
| "heldout_start": k, | |
| "fper": [int(x) for x in fper], | |
| "ev_p10": r(cum_lo), "ev_p50": r(cum_q50), "ev_p90": r(cum_hi), | |
| "ac_fore": r(ac_fore), | |
| "bac": round(float(project.bac)), "planned_finish": int(project.planned_finish), | |
| "observed_k": k, "n": n, | |
| } | |
def _readout(project, k, n, ctx_pct, status, fin_likely, fin_early, fin_late, eac_med, eac_lo, eac_hi,
             overrun, p_overrun, proj_end, engine):
    """Headline numbers for the SPA's readout panel (rounded, JSON-ready).

    ``k`` and ``n`` are accepted for call-site symmetry but unused. A finish period that is
    None (or 0) is described as "beyond <proj_end>".

    Risk levels:
    - HIGH: the likely finish is unknown, more than 15% of the baseline late, or the overrun
      exceeds 10%.
    - MEDIUM: the likely finish is after the baseline at all, or the overrun exceeds 2%.
    - LOW: otherwise.
    """
    baseline = project.planned_finish

    def describe(period):
        return f"period {period}" if period else f"beyond {proj_end}"

    very_late = fin_likely is None or (fin_likely - baseline) > 0.15 * baseline
    if very_late or overrun > 0.10:
        risk = "HIGH"
    elif (fin_likely and fin_likely > baseline) or overrun > 0.02:
        risk = "MEDIUM"
    else:
        risk = "LOW"

    slip = None if fin_likely is None else fin_likely - baseline
    return {
        "name": project.name,
        "observed_pct": round(ctx_pct),
        "earned_pct": round(status["pct_complete"] * 100),
        "spi": round(status["spi"], 2),
        "cpi": round(status["cpi"], 2),
        "finish_likely": fin_likely,
        "finish_baseline": baseline,
        "slip": slip,
        "finish_range": [fin_early, fin_late],
        "eac": _money(eac_med),
        "eac_range": [_money(eac_lo), _money(eac_hi)],
        "bac": round(float(project.bac)),
        "overrun_pct": round(overrun * 100),
        "p_overrun_pct": round(p_overrun * 100),
        "risk": risk,
        "engine": engine,
        "finish_str": describe(fin_likely),
        "early_str": describe(fin_early),
        "late_str": describe(fin_late),
    }
| def _summary(project, k, status, fin_likely, eac_med, overrun, p_overrun): | |
| return {"pct_complete": status["pct_complete"], "spi": status["spi"], "cpi": status["cpi"], | |
| "finish": float(fin_likely) if fin_likely else float(project.planned_finish), | |
| "planned": project.planned_finish, "eac": eac_med, "bac": project.bac, | |
| "overrun": overrun, "p_overrun": p_overrun} | |
| # --------------------------------------------------------------------------- # | |
| # Direct forecast (TimesFM / baselines) | |
| # --------------------------------------------------------------------------- # | |
def _direct(project, k, n, ctx_pct, engine):
    """Forecast with a direct (non-agent) engine and package series/readout/trace/summary.

    The engine forecasts EV *increments* from the first ``k`` observed periods. Those are
    cumulated from the last observed EV (period ``k``) and capped at BAC. The horizon covers
    the rest of the project and up to 1.8x the planned duration, with a minimum of 8 and a
    maximum of 120 periods.

    If the engine raises, or ``engine`` is not a DIRECT key, the logistic curve fit is used
    instead. The fallback is logged and reported in the ``forecast_series`` trace step rather
    than being silently attributed to ``engine``.
    """
    bac = project.bac
    ev_inc = evm.to_increments(project.ev)
    horizon = min(120, max(8, n - k, int(project.planned_finish * 1.8) - k))
    fallback = None
    try:
        fc = DIRECT[engine](ev_inc[:k], horizon, bac=bac, planned_periods=project.planned_finish)
    except Exception as e:  # noqa: BLE001 - best-effort: a model failure must not fail the request
        fallback = f"{engine} failed ({type(e).__name__}: {e}); used Curve fit"
        print(f"[direct] {fallback}", flush=True)
        fc = baselines.logistic(ev_inc[:k], horizon, bac=bac)

    last = float(project.ev[:k][-1])

    def cum_path(q):
        # Element 0 is the last observed EV (period k); later elements add forecast increments.
        return np.minimum(np.concatenate([[last], last + np.cumsum(fc[q])]), bac)

    cum_lo, cum_q50, cum_hi = cum_path("q10"), cum_path("q50"), cum_path("q90")
    fper = np.arange(k, k + horizon + 1)

    status = evm.latest(project.pv[:k], project.ev[:k], project.ac[:k], bac)
    cpi = status["cpi"] if status["cpi"] and status["cpi"] > 0 else 1.0
    # AC forecast: spend each median EV increment at the current cost efficiency (CPI).
    last_ac = project.ac[:k][-1]
    ac_fore = np.concatenate([[last_ac], last_ac + np.cumsum(np.diff(cum_q50)) / cpi])

    fin_likely = _finish(cum_q50, bac, k)
    fin_early = _finish(cum_hi, bac, k)  # the P90 (optimistic) path reaches BAC first
    fin_late = _finish(cum_lo, bac, k)   # the P10 (pessimistic) path reaches BAC last

    eacs = {m: v for m, v in evm.all_eacs(project.pv[:k], project.ev[:k], project.ac[:k], bac).items()
            if np.isfinite(v)}
    vals = list(eacs.values()) or [bac]
    eac_med = float(np.median(vals))
    overrun = eac_med / bac - 1.0
    # P(EAC > 110% of BAC) under a normal centred on the median EAC; when the formulas agree
    # exactly (std 0) a 5%-of-EAC spread is assumed instead.
    spread = max(float(np.std(vals)) or eac_med * .05, 1e-9)
    p_over = 1.0 - _norm_cdf((1.1 * bac - eac_med) / spread)

    series = _series(project, k, n, cum_lo, cum_q50, cum_hi, ac_fore, fper, has_truth=True)
    readout = _readout(project, k, n, ctx_pct, status, fin_likely, fin_early, fin_late, eac_med,
                       min(vals), max(vals), overrun, p_over, k + len(fper) - 1, engine)

    fs_output = {"P50 reaches BAC": (f"period {fin_likely}" if fin_likely else "not within horizon")}
    if fallback:
        fs_output["fallback"] = fallback
    trace = [
        {"tool": "evm_metrics", "input": {}, "output": {
            "SPI": round(status["spi"], 3), "CPI": round(status["cpi"], 3),
            "EAC formulas (£)": {m: _money(v) for m, v in eacs.items()}}},
        {"tool": "forecast_series", "input": {"which": "ev", "horizon": int(horizon)},
         "output": fs_output},
    ]
    return {"series": series, "readout": readout, "trace": trace,
            "summary": _summary(project, k, status, fin_likely, eac_med, overrun, p_over)}
| # --------------------------------------------------------------------------- # | |
| # Server-side agent (llama.cpp) - extracts a tool-by-tool trace from the loop | |
| # --------------------------------------------------------------------------- # | |
| def _paths_from_trace(messages, project, k): | |
| ev_paths, horizon = None, None | |
| for i, m in enumerate(messages): | |
| if m.get("role") == "assistant" and m.get("tool_calls"): | |
| for tc in m["tool_calls"]: | |
| fn = tc["function"] | |
| args = fn["arguments"] if isinstance(fn["arguments"], dict) else json.loads(fn["arguments"] or "{}") | |
| if fn["name"] == "forecast_series" and str(args.get("which")) == "ev": | |
| for tm in messages[i + 1:]: | |
| if tm.get("role") == "tool": | |
| try: | |
| res = json.loads(tm["content"]) | |
| if "cumulative_paths" in res: | |
| ev_paths, horizon = res["cumulative_paths"], res.get("horizon") | |
| except Exception: | |
| pass | |
| break | |
| last = float(project.ev[:k][-1]) | |
| if not ev_paths: | |
| h = max(8, project.planned_finish - k) | |
| fper = np.arange(k, k + h + 1) | |
| flat = np.full(len(fper), last) | |
| return flat, flat, flat, fper | |
| h = horizon or len(ev_paths["q50"]) | |
| fper = np.arange(k, k + h + 1) | |
| pad = lambda a: np.minimum(np.concatenate([[last], np.array(a, float)])[:len(fper)], project.bac) # noqa: E731 | |
| return pad(ev_paths["q10"]), pad(ev_paths["q50"]), pad(ev_paths["q90"]), fper | |
def trace_steps(messages):
    """Pair each tool call (assistant) with its result (tool message) -> [{tool, input, output}].

    A tool result is attached to the first of these that exists:
    1. The call whose ``id`` equals the message's ``tool_call_id``.
    2. The oldest unanswered call whose tool name equals the message's ``tool_call_id``
       (or ``name``).
    3. The oldest unanswered call.
    4. The last step.
    This keeps results aligned with their calls when the runtime emits no call ids.

    Argument strings that are not valid JSON are kept as ``{"_raw": <text>}``. Tool content
    that is not JSON is kept verbatim. A malformed model turn therefore never breaks the trace.
    """
    steps, by_id, awaiting = [], {}, []
    for m in messages:
        role = m.get("role")
        if role == "assistant" and m.get("tool_calls"):
            for tc in m["tool_calls"]:
                fn = tc["function"]
                raw = fn.get("arguments")
                if isinstance(raw, dict):
                    args = raw
                else:
                    try:
                        args = json.loads(raw or "{}")
                    except (TypeError, ValueError):
                        args = {"_raw": raw}
                step = {"tool": fn["name"], "input": args, "output": None}
                steps.append(step)
                awaiting.append(step)
                if tc.get("id"):
                    by_id[tc["id"]] = step
        elif role == "tool":
            try:
                out = json.loads(m["content"])
            except (KeyError, TypeError, ValueError):
                out = m.get("content")
            tid = m.get("tool_call_id")
            target = by_id.get(tid) if tid else None
            if target is None:
                key = tid or m.get("name")
                target = next((s for s in awaiting if s["tool"] == key), None)
            if target is None and awaiting:
                target = awaiting[0]
            if target is None and steps:
                target = steps[-1]
            if target is None:
                continue  # a tool message with no preceding call: nothing to attach it to
            target["output"] = out
            awaiting = [s for s in awaiting if s is not target]
    return steps
def _agent(project, k, n, ctx_pct):
    """Run the server-side llama.cpp agent and package its forecast like the direct path does.

    Falls back to the TimesFM direct forecast when the agent returns no forecast. The finish
    period, EAC and P(overrun) come from the agent's final answer. A missing *or null* field
    defaults to the planned finish, BAC and 0 respectively. The EV fan comes from the agent's
    ``forecast_series`` tool results. The EAC range is a heuristic
    ``eac * (1 +/- 0.3 * |overrun|)``. ``meta`` reports the number of model calls and
    completion tokens.
    """
    from src import agent_forecaster as af

    client = local_llm.make_client()
    r = af.agent_forecast(project, k, client=client, model="minicpm", max_iters=8,
                          temperature=0.2, return_trace=True)
    fc = r["forecast"]
    if not fc:
        return _direct(project, k, n, ctx_pct, ENGINE_TFM)

    bac, pf = project.bac, project.planned_finish

    def field(key, default):
        # A small model may emit an explicit null; treat it like a missing key rather than
        # letting float(None) fail the whole request.
        value = fc.get(key)
        return float(default if value is None else value)

    cum_lo, cum_q50, cum_hi, fper = _paths_from_trace(r["messages"], project, k)
    status = evm.latest(project.pv[:k], project.ev[:k], project.ac[:k], bac)
    fin_likely = int(round(field("finish_period", pf)))
    eac_med = field("eac", bac)
    p_over = field("p_overrun", 0.0)
    overrun = eac_med / bac - 1.0
    fin_early = _finish(cum_hi, bac, k) or fin_likely
    fin_late = _finish(cum_lo, bac, k) or fin_likely
    cpi = status["cpi"] if status["cpi"] and status["cpi"] > 0 else 1.0
    last_ac = project.ac[:k][-1]
    ac_fore = np.concatenate([[last_ac], last_ac + np.cumsum(np.diff(cum_q50)) / cpi])
    band = abs(overrun) * .3
    eac_lo = min(eac_med, eac_med * (1 - band))
    eac_hi = max(eac_med, eac_med * (1 + band))

    series = _series(project, k, n, cum_lo, cum_q50, cum_hi, ac_fore, fper, has_truth=True)
    readout = _readout(project, k, n, ctx_pct, status, fin_likely, fin_early, fin_late, eac_med,
                       eac_lo, eac_hi, overrun, p_over, k + len(fper) - 1, ENGINE_AGENT)
    trace = trace_steps(r["messages"])
    usage = r.get("usage") or {}
    return {"series": series, "readout": readout, "trace": trace,
            "summary": _summary(project, k, status, fin_likely, eac_med, overrun, p_over),
            "meta": {"generations": r.get("n_api_calls"), "tokens": usage.get("completion_tokens", 0)}}
| # --------------------------------------------------------------------------- # | |
| # gr.Server + endpoints | |
| # --------------------------------------------------------------------------- # | |
_APP_TITLE = "Slipstream"
# The gr.Server instance; endpoint registration is not visible in this view - confirm.
app = Server(title=_APP_TITLE)
def config() -> dict:
    """Client bootstrap config: deployment mode, engine list, default project, ONNX repo, agent flag."""
    cfg = dict(mode=MODE, engines=ENGINES, default_project=DEFAULT_SAMPLE)
    cfg.update(onnx_repo=ONNX_REPO, agent_available=AGENT_OK)
    return cfg
def projects() -> dict:
    """List every sample project with its kind ("real"/"synthetic"), length and planned finish."""
    listing = []
    for label, proj in SAMPLES.items():
        kind = "real" if label.startswith("Real") else "synthetic"
        listing.append({"name": label, "kind": kind, "n": int(len(proj.ev)),
                        "planned_finish": int(proj.planned_finish)})
    return {"projects": listing}
def project_series(name: str) -> dict:
    """Full PV/EV/AC history of the named sample project (unknown names -> default sample)."""
    proj = _project_from_name(name)

    def as_floats(values):
        return [float(v) for v in values]

    return {
        "name": proj.name,
        "n": int(len(proj.ev)),
        "bac": float(proj.bac),
        "planned_finish": int(proj.planned_finish),
        "pv": as_floats(proj.pv),
        "ev": as_floats(proj.ev),
        "ac": as_floats(proj.ac),
    }
def evm_metrics(pv: list, ev: list, ac: list, bac: float, planned_finish: int, k: int) -> dict:
    """EVM indicators over the first ``k`` periods.

    Returns SPI/CPI (3 dp), every finite EAC formula rounded to whole units, and
    ``evm.forecast_finish``'s finish period (1 dp).
    """
    pv_k = np.array(pv[:k], float)
    ev_k = np.array(ev[:k], float)
    ac_k = np.array(ac[:k], float)
    latest = evm.latest(pv_k, ev_k, ac_k, bac)
    spi, cpi = round(latest["spi"], 3), round(latest["cpi"], 3)
    eac_formulas = {}
    for method, value in evm.all_eacs(pv_k, ev_k, ac_k, bac).items():
        if np.isfinite(value):
            eac_formulas[method] = round(value)
    es_finish = round(float(evm.forecast_finish(pv_k, ev_k, planned_finish)), 1)
    return {"SPI": spi, "CPI": cpi, "EAC_formulas": eac_formulas,
            "earned_schedule_finish_period": es_finish}
def forecast_series(which: str, horizon: int, ev: list, ac: list, bac: float, k: int) -> dict:
    """TimesFM forecast of cumulative EV or AC after the first ``k`` periods.

    ``which == "ev"`` forecasts EV, with paths capped at ``bac``; any other value forecasts
    AC. ``horizon`` comes from a model, so it is clamped to 1..120 periods. That is the same
    120-period cap the direct forecast uses, so a bad value cannot request a zero, negative or
    unbounded CPU forecast. The returned ``horizon`` is the clamped value.

    Returns rounded cumulative q10/q50/q90 paths. For EV, it also returns the first period
    each path reaches 99.9% of BAC (path element 0 is period ``k + 1``), or None.
    """
    h = max(1, min(int(horizon), 120))
    cum = np.array((ev if which == "ev" else ac)[:k], float)
    fc = forecasting.timesfm_forecast(evm.to_increments(cum), h, device="cpu",
                                      forecast_context_len=FCL, bac=bac)
    last = float(cum[-1])
    paths = {}
    for q in ("q10", "q50", "q90"):
        c = last + np.cumsum(fc[q])
        if which == "ev":
            c = np.minimum(c, bac)
        paths[q] = [round(float(x)) for x in c]
    out = {"horizon": h, "cumulative_paths": paths}
    if which == "ev":
        reaches = {}
        for q, path in paths.items():
            hit = np.asarray(path) >= 0.999 * bac  # computed once per quantile
            reaches[q] = k + int(np.argmax(hit)) + 1 if hit.any() else None
        out["reaches_bac_period"] = reaches
    return out
def assemble(project_name: str, ctx_pct: float, finish_period: float, eac: float, p_overrun: float,
             ev_q10: list, ev_q50: list, ev_q90: list, horizon: int) -> dict:
    """Build the chart series + readout from the in-browser agent's forecast.

    Keeps the EVM math server-side (DRY). Called by browser-agent.js after the in-browser
    MiniCPM finishes. ``ev_q*`` are cumulative EV paths whose element 0 is period ``k + 1``.
    Each is prefixed with the last observed EV (period ``k``) and capped at BAC. All are
    truncated to the shortest of ``horizon`` and the supplied paths, so every series has the
    same length as ``fper``; a larger ``horizon`` previously produced a longer ``fper`` than
    data. The EAC range is a heuristic ``eac * (1 +/- 0.3 * |overrun|)``.
    """
    project = _project_from_name(project_name)
    bac = project.bac
    n = len(project.ev)
    k = max(4, int(n * ctx_pct / 100.0))
    last = float(project.ev[:k][-1])
    h = max(0, min(int(horizon), len(ev_q10), len(ev_q50), len(ev_q90)))
    fper = np.arange(k, k + h + 1)

    def capped(path):
        return np.minimum(np.concatenate([[last], np.asarray(path, float)[:h]]), bac)

    cum_lo, cum_q50, cum_hi = capped(ev_q10), capped(ev_q50), capped(ev_q90)
    status = evm.latest(project.pv[:k], project.ev[:k], project.ac[:k], bac)
    fin_likely = int(round(float(finish_period)))
    eac_med, p_over = float(eac), float(p_overrun)
    overrun = eac_med / bac - 1.0
    fin_early = _finish(cum_hi, bac, k) or fin_likely
    fin_late = _finish(cum_lo, bac, k) or fin_likely
    cpi = status["cpi"] if status["cpi"] and status["cpi"] > 0 else 1.0
    last_ac = project.ac[:k][-1]
    ac_fore = np.concatenate([[last_ac], last_ac + np.cumsum(np.diff(cum_q50)) / cpi])
    band = abs(overrun) * .3
    eac_lo = min(eac_med, eac_med * (1 - band))
    eac_hi = max(eac_med, eac_med * (1 + band))

    series = _series(project, k, n, cum_lo, cum_q50, cum_hi, ac_fore, fper, has_truth=True)
    readout = _readout(project, k, n, ctx_pct, status, fin_likely, fin_early, fin_late, eac_med,
                       eac_lo, eac_hi, overrun, p_over, k + len(fper) - 1, ENGINE_AGENT)
    return {"series": series, "readout": readout,
            "summary": _summary(project, k, status, fin_likely, eac_med, overrun, p_over)}
def forecast(project_name: str, ctx_pct: float, engine: str) -> dict:
    """Streaming forecast: heartbeat status dicts about every 1.5 s, then the final result.

    The work runs on a daemon thread so the SSE stream keeps sending on slow CPUs.

    Yields ``{"status": "<engine> · <secs>s"}`` while the work runs. It then yields either
    ``{"error": "<Type>: <msg>"}`` or ``{"done": True, **result}``. The agent engine uses the
    server-side llama.cpp agent when one is available (both modes; the webgpu fallback).
    Otherwise it uses TimesFM; previously the request fell through to an unknown-engine
    lookup and silently got the curve fit.
    """
    import threading
    import time

    box = {}

    def run():
        try:
            project = _project_from_name(project_name)
            n = len(project.ev)
            k = max(4, int(n * ctx_pct / 100.0))
            if engine == ENGINE_AGENT:
                if local_llm.is_available():
                    box["r"] = _agent(project, k, n, ctx_pct)
                else:
                    # ENGINE_AGENT is not a DIRECT engine; use the TimesFM direct forecast.
                    box["r"] = _direct(project, k, n, ctx_pct, ENGINE_TFM)
            else:
                box["r"] = _direct(project, k, n, ctx_pct, engine)
        except Exception as e:  # noqa: BLE001
            box["err"] = f"{type(e).__name__}: {e}"

    worker = threading.Thread(target=run, daemon=True)
    worker.start()
    t0 = time.time()
    while True:
        # join(timeout) returns as soon as the worker finishes, so fast engines are not held
        # back by a fixed 1.5 s sleep.
        worker.join(timeout=1.5)
        if not worker.is_alive():
            break
        yield {"status": f"{engine} · {int(time.time() - t0)}s"}
    if "err" in box:
        yield {"error": box["err"]}
        return
    yield {"done": True, **box["r"]}
async def homepage():
    """Serve frontend/index.html with the deployment mode and ONNX repo substituted in.

    NOTE(review): returns a plain str; the route registration (and any HTMLResponse
    response class) is not visible in this view - confirm.
    """
    index_path = os.path.join(HERE, "frontend", "index.html")
    with open(index_path, encoding="utf-8") as fh:
        page = fh.read()
    for placeholder, value in (("__SLIPSTREAM_MODE__", MODE), ("__ONNX_REPO__", ONNX_REPO)):
        page = page.replace(placeholder, value)
    return page
async def browser_agent_js():
    """Serve frontend/browser-agent.js with a JavaScript media type."""
    script_path = os.path.join(HERE, "frontend", "browser-agent.js")
    return FileResponse(script_path, media_type="application/javascript")
def _warm():
    """Best-effort model preload; skipped unless SLIPSTREAM_PREFETCH is "1" (the default).

    Builds the llama.cpp client when one is available, then runs a tiny 8-step TimesFM
    forecast on CPU. Any exception is printed and swallowed. NOTE(review): no call site is
    visible in this view - confirm where this runs.
    """
    if os.environ.get("SLIPSTREAM_PREFETCH", "1") == "1":
        try:
            if local_llm.is_available():  # warm the server agent in both modes
                local_llm.make_client()
            warmup_series = np.arange(1, 13, dtype=float)
            forecasting.timesfm_forecast(warmup_series, 8, device="cpu", forecast_context_len=FCL)
            print("[warm] models resident", flush=True)
        except Exception as e:
            print("[warm] skipped:", e, flush=True)
# Alias for the server object, presumably for tooling that looks for a `demo` attribute.
demo = app

if __name__ == "__main__":
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        ssr_mode=False,
    )