# Author: ashaibani · commit 0a3c9f9 (verified): "server agent in both modes; honest in-browser framing"
"""Slipstream - a gr.Server backend (data API) powering a custom Preact SPA.
Two deployment modes via SLIPSTREAM_MODE:
- "llamacpp" (Space A): the distilled MiniCPM-1B agent runs server-side on llama.cpp.
- "webgpu" (Space B): the agent runs in the user's browser via transformers.js/WebGPU; the
backend serves the TimesFM 2.5 + Earned-Value tools the browser agent calls, and can still run the
server-side llama.cpp agent as a fallback when it is available.
Endpoints return DATA (series, metrics, a tool-by-tool reasoning trace), not images - the SPA
renders the fan chart with Highcharts. Still a Gradio app (gradio.Server). Run: `python app.py`.
"""
from __future__ import annotations
import json
import os
# Offline switch. It must run before the third-party imports below (numpy, gradio, src.*), which
# presumably read HF_HUB_OFFLINE / TRANSFORMERS_OFFLINE when they load. Values already set win.
if os.environ.get("SLIPSTREAM_OFFLINE") == "1":
    os.environ.setdefault("TRANSFORMERS_OFFLINE", "1")
    os.environ.setdefault("HF_HUB_OFFLINE", "1")
import numpy as np
from fastapi.responses import FileResponse, HTMLResponse
from gradio import Server
from src import baselines, evm, forecasting, local_llm, synthetic
FCL: int = 128  # forecast_context_len passed to every TimesFM call in this module
HERE: str = os.path.dirname(os.path.abspath(__file__))  # directory of this file; frontend/ is resolved from it
MODE: str = os.getenv("SLIPSTREAM_MODE", "llamacpp")  # "llamacpp" | "webgpu"
ONNX_REPO: str = os.getenv("SLIPSTREAM_ONNX_REPO", "ashaibani/slipstream-minicpm5-1b-onnx")  # in-browser model repo
# --------------------------------------------------------------------------- #
# Samples
# --------------------------------------------------------------------------- #
def _build_samples():
    """Build the project picker: every synthetic sample plus up to 8 real DSLIB projects.

    Real projects are only considered when ``data/DSLIB/Excel`` exists (relative to the current
    working directory). They are kept when well formed:
      - 16-40 periods and a positive BAC;
      - under 35% non-increasing EV steps;
      - more than 8% of BAC earned at 40% of the timeline, and more than 30% at 60%.
    Of those, the 8 closest to 26 periods are added.

    Loading the dataset is best-effort. On failure the synthetic samples are still returned and
    the reason is logged, not silently dropped.

    Returns:
        dict mapping display name ("Synthetic · ..." / "Real (DSLIB) · ...") to project objects.
    """
    out = {f"Synthetic · {name}": p for name, p in synthetic.sample_library().items()}
    # NOTE(review): this path is CWD-relative, not HERE-relative; it presumably matches what
    # dslib.load_dslib reads, so both should change together if ever.
    if not os.path.isdir(os.path.join("data", "DSLIB", "Excel")):
        return out
    try:
        from src import dslib
        reals = dslib.load_dslib(min_periods=14)

        def _wf(p):
            # Keep only projects whose EV curve is long enough, mostly increasing and well under way.
            n = len(p.ev)
            zf = float(np.mean(np.diff(p.ev) <= 0))  # share of flat/negative EV steps
            return (16 <= n <= 40 and p.bac > 0 and zf < 0.35
                    and p.ev[int(n * 0.4)] / p.bac > 0.08 and p.ev[int(n * 0.6)] / p.bac > 0.30)

        picked = sorted((p for p in reals if _wf(p)), key=lambda p: abs(len(p.ev) - 26))[:8]
        for p in picked:
            out[f"Real (DSLIB) · {p.name}"] = p
    except Exception as e:  # noqa: BLE001 - optional dataset; never block startup on it
        print(f"[samples] DSLIB skipped: {type(e).__name__}: {e}", flush=True)
    return out
# Sample library, built once at import. The project endpoints look projects up here by display name.
SAMPLES = _build_samples()
# Prefer a real DSLIB project as the default, else the first sample.
# NOTE(review): next(iter(SAMPLES)) is evaluated eagerly, so an empty SAMPLES raises StopIteration here.
DEFAULT_SAMPLE = next((k for k in SAMPLES if k.startswith("Real")), next(iter(SAMPLES)))
# The server-side llama.cpp agent is available in BOTH modes (in webgpu mode it's the fallback when
# the in-browser model can't load - the small-quant in-browser path is experimental, see report).
# Probed once at import for the engine list; the forecast endpoint re-checks per request.
AGENT_OK = local_llm.is_available()
ENGINE_AGENT = "MiniCPM-1B agent" # in-browser (webgpu) or server-side (llamacpp)
ENGINE_TFM = "TimesFM 2.5"
# Non-agent engines: display name -> callable(increments, horizon, **kwargs). _direct reads the
# result's "q10"/"q50"/"q90" entries as per-period EV increments.
DIRECT = {ENGINE_TFM: lambda o, h, **k: forecasting.timesfm_forecast(o, h, device="cpu", forecast_context_len=FCL, **k),
          "Curve fit": baselines.logistic, "Last value": baselines.last_value}
# Engines offered to the UI. The agent is listed only when llama.cpp was available at import.
ENGINES = ([ENGINE_AGENT] if AGENT_OK else []) + list(DIRECT)
def _project_from_name(name):
    """Look up a sample project by display name; unknown names resolve to the default sample."""
    try:
        return SAMPLES[name]
    except KeyError:
        return SAMPLES[DEFAULT_SAMPLE]
def _finish(cum, bac, start):
hit = cum >= 0.999 * bac
return start + int(np.argmax(hit)) if hit.any() else None
def _norm_cdf(z):
from math import erf, sqrt
return 0.5 * (1 + erf(z / sqrt(2)))
def _money(x):
return None if x is None or not np.isfinite(x) else round(float(x))
def _series(project, k, n, cum_lo, cum_q50, cum_hi, ac_fore, fper, has_truth):
"""All arrays the Highcharts fan chart needs (rounded; cumulative £)."""
r = lambda a: [round(float(x)) for x in a] # noqa: E731
return {
"period": list(range(1, n + 1)),
"pv": r(project.pv), "ev": r(project.ev[:k]), "ac": r(project.ac[:k]),
"ev_heldout": (r(project.ev[k - 1:]) if has_truth and n > k else None),
"heldout_start": k,
"fper": [int(x) for x in fper],
"ev_p10": r(cum_lo), "ev_p50": r(cum_q50), "ev_p90": r(cum_hi),
"ac_fore": r(ac_fore),
"bac": round(float(project.bac)), "planned_finish": int(project.planned_finish),
"observed_k": k, "n": n,
}
def _readout(project, k, n, ctx_pct, status, fin_likely, fin_early, fin_late, eac_med, eac_lo, eac_hi,
             overrun, p_overrun, proj_end, engine):
    """Build the headline numbers and risk rating for the readout panel.

    Finish periods may be None, meaning not reached within the forecast; they are rendered as
    ``"beyond <proj_end>"``.

    Risk rating:
      - HIGH: the likely finish is unknown, or slips more than 15% past the baseline, or the
        cost overrun exceeds 10%.
      - MEDIUM: any later-than-baseline finish, or an overrun above 2%.
      - LOW: otherwise.
    """
    baseline = project.planned_finish

    def describe(period):
        if period:
            return f"period {period}"
        return f"beyond {proj_end}"

    badly_late = fin_likely is None or (fin_likely - baseline) > 0.15 * baseline
    if badly_late or overrun > 0.10:
        risk = "HIGH"
    elif (fin_likely and fin_likely > baseline) or overrun > 0.02:
        risk = "MEDIUM"
    else:
        risk = "LOW"

    slip = None if fin_likely is None else fin_likely - baseline
    return {
        "name": project.name,
        "observed_pct": round(ctx_pct),
        "earned_pct": round(status["pct_complete"] * 100),
        "spi": round(status["spi"], 2),
        "cpi": round(status["cpi"], 2),
        "finish_likely": fin_likely,
        "finish_baseline": baseline,
        "slip": slip,
        "finish_range": [fin_early, fin_late],
        "eac": _money(eac_med),
        "eac_range": [_money(eac_lo), _money(eac_hi)],
        "bac": round(float(project.bac)),
        "overrun_pct": round(overrun * 100),
        "p_overrun_pct": round(p_overrun * 100),
        "risk": risk,
        "engine": engine,
        "finish_str": describe(fin_likely),
        "early_str": describe(fin_early),
        "late_str": describe(fin_late),
    }
def _summary(project, k, status, fin_likely, eac_med, overrun, p_overrun):
return {"pct_complete": status["pct_complete"], "spi": status["spi"], "cpi": status["cpi"],
"finish": float(fin_likely) if fin_likely else float(project.planned_finish),
"planned": project.planned_finish, "eac": eac_med, "bac": project.bac,
"overrun": overrun, "p_overrun": p_overrun}
# --------------------------------------------------------------------------- #
# Direct forecast (TimesFM / baselines)
# --------------------------------------------------------------------------- #
def _direct(project, k, n, ctx_pct, engine):
    """Forecast EV with a non-agent engine (TimesFM or a baseline) and package it for the SPA.

    Args:
        project: a sample project (cumulative ``pv``/``ev``/``ac``, ``bac``, ``planned_finish``).
        k: number of observed periods; the forecast is anchored at period ``k``.
        n: total number of periods in the project.
        ctx_pct: observed share of the project in percent (display only).
        engine: a key of ``DIRECT``. An unknown engine, or any failure of the chosen one, falls
            back to the logistic curve fit. The fallback is logged and the readout then names
            ``"Curve fit"`` as the engine, so the UI never attributes curve-fit numbers to another
            model.

    Returns:
        dict with keys:
          - ``series``: chart arrays.
          - ``readout``: headline numbers.
          - ``trace``: two tool-style steps mirroring the agent trace.
          - ``summary``: the unrounded headline numbers.
    """
    ev_inc = evm.to_increments(project.ev)
    # Forecast at least 8 periods, and enough to cover both the held-out tail and ~1.8x the planned
    # duration, but never more than 120.
    horizon = min(120, max(8, n - k, int(project.planned_finish * 1.8) - k))
    used_engine = engine
    try:
        fc = DIRECT[engine](ev_inc[:k], horizon, bac=project.bac, planned_periods=project.planned_finish)
    except Exception as e:  # noqa: BLE001 - any engine failure (or unknown engine) degrades to the curve fit
        print(f"[direct] {engine} failed ({type(e).__name__}: {e}); falling back to Curve fit", flush=True)
        fc = baselines.logistic(ev_inc[:k], horizon, bac=project.bac)
        used_engine = "Curve fit"  # report what actually produced the numbers
    last, bac = float(project.ev[:k][-1]), project.bac

    def cumulative(increments):
        # Cumulative EV path starting at the last observed value (period k), capped at BAC.
        return np.minimum(np.concatenate([[last], last + np.cumsum(increments)]), bac)

    cum_lo, cum_q50, cum_hi = cumulative(fc["q10"]), cumulative(fc["q50"]), cumulative(fc["q90"])
    fper = np.arange(k, k + horizon + 1)
    status = evm.latest(project.pv[:k], project.ev[:k], project.ac[:k], bac)
    # Project AC forward by spending each forecast EV increment at the current cost efficiency.
    cpi = status["cpi"] if status["cpi"] and status["cpi"] > 0 else 1.0
    ac_last = project.ac[:k][-1]
    ac_fore = np.concatenate([[ac_last], ac_last + np.cumsum(np.diff(cum_q50)) / cpi])
    # The higher (P90) EV path reaches BAC first; the lower (P10) path reaches it last.
    fin_likely = _finish(cum_q50, bac, k)
    fin_early = _finish(cum_hi, bac, k)
    fin_late = _finish(cum_lo, bac, k)
    eacs = {m: v for m, v in evm.all_eacs(project.pv[:k], project.ev[:k], project.ac[:k], bac).items()
            if np.isfinite(v)}
    vals = list(eacs.values()) or [bac]
    eac_med = float(np.median(vals))
    overrun = eac_med / bac - 1.0
    # P(EAC > 110% of BAC) under a normal approximation over the spread of the EAC formulas.
    # If the formulas all agree (std 0), assume a spread of 5% of the median.
    spread = max(float(np.std(vals)) or eac_med * .05, 1e-9)
    p_over = 1.0 - _norm_cdf((1.1 * bac - eac_med) / spread)
    series = _series(project, k, n, cum_lo, cum_q50, cum_hi, ac_fore, fper, has_truth=True)
    readout = _readout(project, k, n, ctx_pct, status, fin_likely, fin_early, fin_late, eac_med,
                       min(vals), max(vals), overrun, p_over, k + len(fper) - 1, used_engine)
    trace = [
        {"tool": "evm_metrics", "input": {}, "output": {
            "SPI": round(status["spi"], 3), "CPI": round(status["cpi"], 3),
            "EAC formulas (£)": {m: _money(v) for m, v in eacs.items()}}},
        {"tool": "forecast_series", "input": {"which": "ev", "horizon": int(horizon)},
         "output": {"P50 reaches BAC": (f"period {fin_likely}" if fin_likely else "not within horizon")}},
    ]
    return {"series": series, "readout": readout, "trace": trace,
            "summary": _summary(project, k, status, fin_likely, eac_med, overrun, p_over)}
# --------------------------------------------------------------------------- #
# Server-side agent (llama.cpp) - extracts a tool-by-tool trace from the loop
# --------------------------------------------------------------------------- #
def _paths_from_trace(messages, project, k):
ev_paths, horizon = None, None
for i, m in enumerate(messages):
if m.get("role") == "assistant" and m.get("tool_calls"):
for tc in m["tool_calls"]:
fn = tc["function"]
args = fn["arguments"] if isinstance(fn["arguments"], dict) else json.loads(fn["arguments"] or "{}")
if fn["name"] == "forecast_series" and str(args.get("which")) == "ev":
for tm in messages[i + 1:]:
if tm.get("role") == "tool":
try:
res = json.loads(tm["content"])
if "cumulative_paths" in res:
ev_paths, horizon = res["cumulative_paths"], res.get("horizon")
except Exception:
pass
break
last = float(project.ev[:k][-1])
if not ev_paths:
h = max(8, project.planned_finish - k)
fper = np.arange(k, k + h + 1)
flat = np.full(len(fper), last)
return flat, flat, flat, fper
h = horizon or len(ev_paths["q50"])
fper = np.arange(k, k + h + 1)
pad = lambda a: np.minimum(np.concatenate([[last], np.array(a, float)])[:len(fper)], project.bac) # noqa: E731
return pad(ev_paths["q10"]), pad(ev_paths["q50"]), pad(ev_paths["q90"]), fper
def trace_steps(messages):
    """Pair each tool call (assistant) with its result (tool message) -> [{tool, input, output}].

    How a result finds its call:
      - Primarily by its ``tool_call_id``. Calls without an id are keyed by tool name.
      - Results without a resolvable id go to the oldest still-unanswered call from the most
        recent assistant turn, so parallel calls each get their own output.
      - If nothing is pending, they go to the last step.

    Argument strings that are not valid JSON are kept verbatim as the step input, and non-JSON
    results are kept as the raw content; neither aborts the trace. Tool messages arriving before
    any call are ignored.
    """
    steps, by_id, awaiting = [], {}, []
    for m in messages:
        role = m.get("role")
        if role == "assistant" and m.get("tool_calls"):
            awaiting = []  # only calls from the latest turn can take id-less results
            for tc in m["tool_calls"]:
                fn = tc["function"]
                args = fn["arguments"]
                if not isinstance(args, dict):
                    try:
                        args = json.loads(args or "{}")
                    except (TypeError, ValueError):
                        pass  # keep the raw (malformed) argument string for display
                step = {"tool": fn["name"], "input": args, "output": None}
                steps.append(step)
                awaiting.append(step)
                by_id[tc.get("id") or fn["name"]] = step
        elif role == "tool":
            try:
                out = json.loads(m["content"])
            except Exception:  # noqa: BLE001 - non-JSON (or missing) content is shown as-is
                out = m.get("content")
            step = by_id.get(m.get("tool_call_id"))
            if step is None:
                step = awaiting[0] if awaiting else (steps[-1] if steps else None)
            if step is None:
                continue
            step["output"] = out
            awaiting = [s for s in awaiting if s is not step]
    return steps
def _agent(project, k, n, ctx_pct):
    """Run the server-side llama.cpp agent on the first ``k`` periods and package its forecast.

    The agent's final answer supplies the likely finish, EAC and overrun probability. The fan
    chart comes from the EV paths it fetched via ``forecast_series`` (see ``_paths_from_trace``).
    If the agent returns no forecast, the direct TimesFM path is used instead.

    The answer is model output, so a missing, null, non-numeric or non-finite field falls back to
    a neutral default instead of failing the request:
      - ``finish_period`` -> the planned finish;
      - ``eac`` -> BAC;
      - ``p_overrun`` -> 0.0.

    Returns:
        The same shape as ``_direct``, plus ``meta`` (generation count and completion tokens).
    """
    from src import agent_forecaster as af
    client = local_llm.make_client()
    r = af.agent_forecast(project, k, client=client, model="minicpm", max_iters=8, temperature=0.2,
                          return_trace=True)
    fc = r["forecast"]
    if not fc:
        return _direct(project, k, n, ctx_pct, ENGINE_TFM)

    def number(key, default):
        # Coerce one field of the model's answer to a finite float, else use `default`.
        try:
            value = float(fc.get(key, default))
        except (TypeError, ValueError):
            return float(default)
        return value if np.isfinite(value) else float(default)

    bac = project.bac
    cum_lo, cum_q50, cum_hi, fper = _paths_from_trace(r["messages"], project, k)
    status = evm.latest(project.pv[:k], project.ev[:k], project.ac[:k], bac)
    fin_likely = int(round(number("finish_period", project.planned_finish)))
    eac_med = number("eac", bac)
    p_over = number("p_overrun", 0.0)
    overrun = eac_med / bac - 1.0
    # Range edges from the fan. The P90 path finishes earliest and the P10 path latest; if a path
    # never reaches BAC, use the agent's own finish instead.
    fin_early = _finish(cum_hi, bac, k) or fin_likely
    fin_late = _finish(cum_lo, bac, k) or fin_likely
    cpi = status["cpi"] if status["cpi"] and status["cpi"] > 0 else 1.0
    ac_last = project.ac[:k][-1]
    ac_fore = np.concatenate([[ac_last], ac_last + np.cumsum(np.diff(cum_q50)) / cpi])
    # Only a point EAC is read from the answer. Widen it by 30% of |overrun| either side for the range.
    eac_lo = min(eac_med, eac_med * (1 - abs(overrun) * .3))
    eac_hi = max(eac_med, eac_med * (1 + abs(overrun) * .3))
    series = _series(project, k, n, cum_lo, cum_q50, cum_hi, ac_fore, fper, has_truth=True)
    readout = _readout(project, k, n, ctx_pct, status, fin_likely, fin_early, fin_late, eac_med,
                       eac_lo, eac_hi, overrun, p_over, k + len(fper) - 1, ENGINE_AGENT)
    trace = trace_steps(r["messages"])
    usage = r.get("usage") or {}
    return {"series": series, "readout": readout, "trace": trace,
            "summary": _summary(project, k, status, fin_likely, eac_med, overrun, p_over),
            "meta": {"generations": r["n_api_calls"], "tokens": usage.get("completion_tokens", 0)}}
# --------------------------------------------------------------------------- #
# gr.Server + endpoints
# --------------------------------------------------------------------------- #
# The Gradio server. The @app.api functions below are the SPA's data endpoints; the @app.get
# routes serve its HTML shell and the browser-agent script.
app = Server(title="Slipstream")
@app.api(name="config")
def config() -> dict:
    """Return the SPA bootstrap settings.

    Covers the deployment mode, available engines, default project, ONNX repo and whether the
    server agent is available.
    """
    settings = dict(mode=MODE, engines=ENGINES, default_project=DEFAULT_SAMPLE)
    settings["onnx_repo"] = ONNX_REPO
    settings["agent_available"] = AGENT_OK
    return settings
@app.api(name="projects")
def projects() -> dict:
    """List the sample projects for the picker: name, real/synthetic, length and planned finish."""
    listing = []
    for label, proj in SAMPLES.items():
        listing.append({
            "name": label,
            "kind": "real" if label.startswith("Real") else "synthetic",
            "n": int(len(proj.ev)),
            "planned_finish": int(proj.planned_finish),
        })
    return {"projects": listing}
@app.api(name="project_series")
def project_series(name: str) -> dict:
    """Return the full cumulative PV/EV/AC series for one sample project.

    Unknown names resolve to the default project.
    """
    proj = _project_from_name(name)

    def as_floats(values):
        return [float(v) for v in values]

    return {
        "name": proj.name,
        "n": int(len(proj.ev)),
        "bac": float(proj.bac),
        "planned_finish": int(proj.planned_finish),
        "pv": as_floats(proj.pv),
        "ev": as_floats(proj.ev),
        "ac": as_floats(proj.ac),
    }
@app.api(name="evm_metrics")
def evm_metrics(pv: list, ev: list, ac: list, bac: float, planned_finish: int, k: int) -> dict:
    """EVM tool for the in-browser agent, computed on the first ``k`` periods.

    Returns:
      - SPI and CPI, to 3 decimal places;
      - the finite EAC formula values, in whole £;
      - the earned-schedule finish period, to 1 decimal place.

    Non-finite values (or None) from the evm helpers are reported as ``None``, since NaN and
    Infinity are not valid JSON for the browser.
    """
    pv, ev, ac = (np.array(series[:k], float) for series in (pv, ev, ac))
    s = evm.latest(pv, ev, ac, bac)

    def finite_round(x, ndigits=None):
        # round() for the response, mapping None/NaN/inf to None.
        if x is None or not np.isfinite(x):
            return None
        return round(float(x), ndigits)

    eacs = {m: finite_round(v) for m, v in evm.all_eacs(pv, ev, ac, bac).items() if np.isfinite(v)}
    finish = finite_round(evm.forecast_finish(pv, ev, planned_finish), 1)
    return {"SPI": finite_round(s["spi"], 3), "CPI": finite_round(s["cpi"], 3),
            "EAC_formulas": eacs,
            "earned_schedule_finish_period": finish}
@app.api(name="forecast_series")
def forecast_series(which: str, horizon: int, ev: list, ac: list, bac: float, k: int) -> dict:
    """TimesFM tool for the in-browser agent.

    Forecasts cumulative EV (``which == "ev"``) or AC (any other value) for ``horizon`` periods
    beyond the first ``k`` observations.

    Returns:
      - The rounded P10/P50/P90 cumulative paths; EV paths are capped at BAC.
      - For EV only, the first period at which each path reaches 99.9% of BAC, or None if it
        never does. Path index 0 is period ``k + 1``.
    """
    is_ev = which == "ev"
    observed = np.array((ev if is_ev else ac)[:k], float)
    steps = int(horizon)
    quantiles = forecasting.timesfm_forecast(evm.to_increments(observed), steps, device="cpu",
                                             forecast_context_len=FCL, bac=bac)
    start = float(observed[-1])
    out = {"horizon": steps}
    paths = {}
    for q in ("q10", "q50", "q90"):
        cum = start + np.cumsum(quantiles[q])
        if is_ev:
            cum = np.minimum(cum, bac)
        paths[q] = [round(float(x)) for x in cum]
    out["cumulative_paths"] = paths
    if is_ev:
        reach = {}
        for q in ("q10", "q50", "q90"):
            hit = np.array(paths[q]) >= 0.999 * bac
            reach[q] = k + int(np.argmax(hit)) + 1 if hit.any() else None
        out["reaches_bac_period"] = reach
    return out
@app.api(name="assemble")
def assemble(project_name: str, ctx_pct: float, finish_period: float, eac: float, p_overrun: float,
             ev_q10: list, ev_q50: list, ev_q90: list, horizon: int) -> dict:
    """Build the chart series + readout from the in-browser agent's forecast (keeps the EVM math
    server-side / DRY). Called by browser-agent.js after the in-browser MiniCPM finishes.

    Args:
        ev_q10, ev_q50, ev_q90: cumulative EV paths for the periods after ``k``, presumably as
            returned by ``forecast_series``.
        horizon: forecast length. The chart axis is clipped to the shortest path, and to
            ``horizon`` when it is positive, so series and periods always line up.
        finish_period, eac, p_overrun: the browser model's scalar answers. Missing, non-numeric
            or non-finite values fall back to the planned finish, BAC and 0.0 respectively.

    Returns:
        The same ``series``/``readout``/``summary`` shape as the server-side paths.
    """
    project = _project_from_name(project_name)
    n = len(project.ev)
    k = max(4, int(n * ctx_pct / 100.0))
    bac = project.bac
    last = float(project.ev[:k][-1])

    def number(value, default):
        # Coerce a client-supplied scalar to a finite float, else use `default`.
        try:
            value = float(value)
        except (TypeError, ValueError):
            return float(default)
        return value if np.isfinite(value) else float(default)

    h = min(len(ev_q10), len(ev_q50), len(ev_q90))
    if horizon and int(horizon) > 0:
        h = min(h, int(horizon))
    fper = np.arange(k, k + h + 1)

    def pad(path):
        # Prepend the last observed EV (period k), trim to the axis, cap at BAC.
        return np.minimum(np.concatenate([[last], np.array(path, float)])[:len(fper)], bac)

    cum_lo, cum_q50, cum_hi = pad(ev_q10), pad(ev_q50), pad(ev_q90)
    status = evm.latest(project.pv[:k], project.ev[:k], project.ac[:k], bac)
    fin_likely = int(round(number(finish_period, project.planned_finish)))
    eac_med = number(eac, bac)
    p_over = number(p_overrun, 0.0)
    overrun = eac_med / bac - 1.0
    # The P90 path finishes earliest and the P10 path latest; fall back to the agent's finish.
    fin_early = _finish(cum_hi, bac, k) or fin_likely
    fin_late = _finish(cum_lo, bac, k) or fin_likely
    cpi = status["cpi"] if status["cpi"] and status["cpi"] > 0 else 1.0
    ac_last = project.ac[:k][-1]
    ac_fore = np.concatenate([[ac_last], ac_last + np.cumsum(np.diff(cum_q50)) / cpi])
    # Only a point EAC is supplied. Widen it by 30% of |overrun| either side for the range.
    eac_lo = min(eac_med, eac_med * (1 - abs(overrun) * .3))
    eac_hi = max(eac_med, eac_med * (1 + abs(overrun) * .3))
    series = _series(project, k, n, cum_lo, cum_q50, cum_hi, ac_fore, fper, has_truth=True)
    readout = _readout(project, k, n, ctx_pct, status, fin_likely, fin_early, fin_late, eac_med,
                       eac_lo, eac_hi, overrun, p_over, k + len(fper) - 1, ENGINE_AGENT)
    return {"series": series, "readout": readout,
            "summary": _summary(project, k, status, fin_likely, eac_med, overrun, p_over)}
@app.api(name="forecast", concurrency_limit=2, time_limit=900)
def forecast(project_name: str, ctx_pct: float, engine: str) -> dict:
    """Streaming: heartbeat every ~1.5s (keeps the SSE alive on slow CPU) then the final result.

    Yields ``{"status": "<engine> · <elapsed>s"}`` while the forecast runs in a worker thread.
    It then yields either ``{"done": True, **result}`` or ``{"error": "<ExcType>: <message>"}``.
    The result arrives as soon as the worker finishes, not at the next heartbeat tick.
    """
    import threading
    import time

    box = {}

    def run():
        try:
            project = _project_from_name(project_name)
            n = len(project.ev)
            k = max(4, int(n * ctx_pct / 100.0))
            if engine == ENGINE_AGENT and local_llm.is_available():  # server agent (both modes; webgpu fallback)
                box["r"] = _agent(project, k, n, ctx_pct)
            else:
                box["r"] = _direct(project, k, n, ctx_pct, engine)
        except Exception as e:  # noqa: BLE001
            box["err"] = f"{type(e).__name__}: {e}"

    th = threading.Thread(target=run, daemon=True)
    th.start()
    t0 = time.time()
    while True:
        # join() with a timeout returns early when the worker finishes, so fast engines don't wait
        # out a full heartbeat interval.
        th.join(timeout=1.5)
        if not th.is_alive():
            break
        yield {"status": f"{engine} · {int(time.time() - t0)}s"}
    if "err" in box:
        yield {"error": box["err"]}
        return
    if "r" not in box:  # worker died on a BaseException without reporting
        yield {"error": "forecast worker exited without a result"}
        return
    yield {"done": True, **box["r"]}
@app.get("/", response_class=HTMLResponse)
async def homepage():
    """Serve the SPA shell, substituting the deployment mode and ONNX repo placeholders."""
    index_path = os.path.join(HERE, "frontend", "index.html")
    with open(index_path, encoding="utf-8") as fh:
        page = fh.read()
    for placeholder, value in (("__SLIPSTREAM_MODE__", MODE), ("__ONNX_REPO__", ONNX_REPO)):
        page = page.replace(placeholder, value)
    return page
@app.get("/browser-agent.js")
async def browser_agent_js():
    """Serve the in-browser agent script from frontend/ as JavaScript."""
    script_path = os.path.join(HERE, "frontend", "browser-agent.js")
    return FileResponse(script_path, media_type="application/javascript")
@app.on_event("startup")
def _warm():
    """Pre-load models at startup, presumably so the first request doesn't pay the load cost.

    Skipped unless SLIPSTREAM_PREFETCH is "1" (the default). The llama.cpp client and TimesFM are
    warmed independently, so a failure in one no longer skips the other. Failures are only logged.
    """
    if os.environ.get("SLIPSTREAM_PREFETCH", "1") != "1":
        return
    warmed = []
    try:
        if local_llm.is_available():  # warm the server agent in both modes
            local_llm.make_client()
            warmed.append(ENGINE_AGENT)
    except Exception as e:  # noqa: BLE001 - warming is best-effort
        print("[warm] agent skipped:", e, flush=True)
    try:
        forecasting.timesfm_forecast(np.arange(1, 13, dtype=float), 8, device="cpu", forecast_context_len=FCL)
        warmed.append(ENGINE_TFM)
    except Exception as e:  # noqa: BLE001 - warming is best-effort
        print("[warm] TimesFM skipped:", e, flush=True)
    if warmed:
        print("[warm] models resident:", ", ".join(warmed), flush=True)
demo = app  # NOTE(review): alias presumably kept for tooling that expects a module-level `demo`
if __name__ == "__main__":
    launch_options = {"server_name": "0.0.0.0", "server_port": 7860, "ssr_mode": False}
    demo.launch(**launch_options)