# physics-llm / app.py
# AlexWortega's picture
# Add /evaluate endpoint: numerical position MSE model vs Pymunk
# 970e1f2 unverified
"""Physics LLM — a Gradio-native demo.
Pick a physics scenario; a fine-tuned `LFM2-350M` (GGUF, Q4_K_M, run via
llama-cpp-python with all layers offloaded to the GPU) autoregressively
predicts the next frames in a structured text
format. Each predicted frame is parsed and rendered to a 2D canvas with
matplotlib, streamed live as the rollout proceeds.
The model and prompt format follow AlexWortega/PhysicsLLMEngine: the prompt is
the scene header + a few seed frames + "Predict next frame:", and the model
continues with "Frame N: …\n obj_i: pos=(x,y), vel=(vx,vy), a=…, av=…".
"""
from __future__ import annotations
import glob
import io
import json
import os
import re
import time
from pathlib import Path
def _preload_cuda() -> None:
"""ZeroGPU: the CUDA build of llama-cpp-python needs libcudart/libcublas on
the loader path at import time, but they aren't there by default. The pip
nvidia-* packages ship the .so's; preload them globally so `import llama_cpp`
succeeds. No-op off GPU / when the packages are absent."""
import ctypes
try:
import nvidia # noqa: F401
base = os.path.dirname(nvidia.__file__)
except Exception:
return
for sub in ("cuda_runtime", "cublas"):
for so in sorted(glob.glob(os.path.join(base, sub, "lib", "*.so*"))):
try:
ctypes.CDLL(so, mode=ctypes.RTLD_GLOBAL)
except OSError:
pass
# Must run before anything imports llama_cpp (see _preload_cuda).
_preload_cuda()

# Persist CUDA's PTX-JIT cache across requests (ZeroGPU frees the GPU between
# calls but the container/filesystem persist) so only the first cold request
# pays the ~40s kernel-compile cost. setdefault keeps any value already set in
# the environment.
os.environ.setdefault("CUDA_CACHE_PATH", "/tmp/cuda_jit_cache")
os.environ.setdefault("CUDA_CACHE_MAXSIZE", str(2 * 1024 ** 3))

try:
    import spaces

    gpu = spaces.GPU
except Exception:  # local / non-Spaces: make @gpu(...) a no-op
    def gpu(*args, **kwargs):
        """Stand-in for `spaces.GPU` that leaves the decorated function as is.

        Supports both the bare form (`@gpu`) and the parameterized form
        (`@gpu(duration=...)`).
        """
        used_bare = not kwargs and len(args) == 1 and callable(args[0])
        if used_bare:
            return args[0]

        def passthrough(func):
            return func

        return passthrough
import gradio as gr
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from matplotlib.patches import Circle, Rectangle
import numpy as np
from PIL import Image
# Directory containing this file; the bundled example scenarios are looked up
# relative to it.
HERE = Path(__file__).parent
EXAMPLES_DIR = HERE / "backend" / "examples"
# Hub repository and file name passed to hf_hub_download() in get_llm().
GGUF_REPO = "AlexWortega/lfm2-scenarios-GGUF"
GGUF_FILE = "lfm2-scenarios-Q4_K_M.gguf"
# Model's native context = 8192. With full-context rollout we keep as many
# recent frames as fit; per-frame cost grows as the context fills (linear
# prompt-eval per step), so deep rollouts get slower toward the end.
N_CTX = 8192
# Passed to llama.cpp as both n_threads and n_threads_batch (see get_llm()).
N_THREADS = 2
CTX_MARGIN = 32 # tokens reserved beyond prompt + max_new
# -----------------------------------------------------------------------------
# Prompt format (ported from PhysicsLLMEngine/browser_demo/src/promptFormat.ts)
# -----------------------------------------------------------------------------
def _f(n: float, d: int) -> str:
return f"{n:.{d}f}"
def fmt_header(h: dict) -> str:
    """Render the scene header as the text block that opens every prompt.

    Emits, in order: ``Scene``, ``Gravity`` and ``Timestep`` lines, optional
    ``Type`` / ``Difficulty`` lines, an optional ``Static`` line listing
    segments and pegs with coordinates rounded to integers, and an optional
    ``Constraints`` line. The result always ends with a newline.

    Args:
        h: Scene header dict. Missing or null ``gravity``,
            ``static_geometry``, ``constraints`` and ``timestep`` are
            tolerated.

    Returns:
        The formatted header text.

    Raises:
        KeyError: If a static geometry or constraint entry lacks a field that
            its line needs (``p1``/``p2``, ``center``/``radius``,
            ``type``/``body_a``/``body_b``).
    """
    gravity = h.get("gravity") or {}

    # An explicit `"timestep": null` used to crash the fixed-point formatting;
    # treat it like a missing key. A genuine 0 is still printed as 0.00000.
    timestep = h.get("timestep")
    if timestep is None:
        timestep = 0.01667

    lines = [
        f"Scene: {h.get('description', '')}",
        f"Gravity: ({gravity.get('x', 0)}, {gravity.get('y', 0)})",
        f"Timestep: {timestep:.5f}",
    ]
    if h.get("scenario_type"):
        lines.append(f"Type: {h['scenario_type']}")
    # Difficulty 0 is meaningful, so only an absent/null value is skipped.
    if h.get("difficulty") is not None:
        lines.append(f"Difficulty: {h['difficulty']}")

    static_parts = []
    for sg in h.get("static_geometry") or []:
        kind = sg.get("type")
        if kind == "segment":
            p1, p2 = sg["p1"], sg["p2"]
            static_parts.append(
                f"seg ({round(p1['x'])},{round(p1['y'])})-({round(p2['x'])},{round(p2['y'])})"
            )
        elif kind == "circle":
            c = sg["center"]
            static_parts.append(f"peg ({round(c['x'])},{round(c['y'])}) r={round(sg['radius'])}")
    if static_parts:
        lines.append("Static: " + "; ".join(static_parts))

    constraints = h.get("constraints") or []
    if constraints:
        lines.append(
            "Constraints: "
            + "; ".join(f"{c['type']} {c['body_a']}->{c['body_b']}" for c in constraints)
        )

    # The trailing empty entry makes the joined text end with "\n".
    lines.append("")
    return "\n".join(lines)
def fmt_frame(fr: dict) -> str:
    """Render one frame as a ``Frame N: <description>`` line plus object lines.

    Each object line carries position and velocity with four decimals. Angle
    and angular velocity are appended only when either exceeds 0.001 in
    magnitude. A missing or null velocity is written as zero. The result ends
    with a newline.
    """
    rendered = [f"Frame {fr['frame']}: {fr.get('description', '')}"]
    at_rest = {"x": 0, "y": 0}
    for obj in fr["objects"]:
        pos = obj["position"]
        vel = obj.get("velocity") or at_rest
        angle = obj.get("angle") or 0
        spin = obj.get("angular_velocity") or 0
        line = (
            f" obj_{obj['id']}: pos=({pos['x']:.4f}, {pos['y']:.4f}), "
            f"vel=({vel['x']:.4f}, {vel['y']:.4f})"
        )
        rotating = abs(angle) > 0.001 or abs(spin) > 0.001
        if rotating:
            line += f", a={angle:.4f}, av={spin:.4f}"
        rendered.append(line)
    rendered.append("")
    return "\n".join(rendered)
# Matches one object line as written by fmt_frame():
#   obj_<id>: pos=(x, y), vel=(vx, vy)[, a=<angle>, av=<angular velocity>]
# Groups: 1 = id, 2-3 = position, 4-5 = velocity, 6-7 = optional angle and
# angular velocity. A number is an optional '-' followed by digits and dots,
# so exponent notation is not matched and a token such as "1.2.3" is.
OBJ_RE = re.compile(
r"obj_(\d+):\s*pos=\(\s*(-?[\d.]+)\s*,\s*(-?[\d.]+)\s*\),\s*vel=\(\s*(-?[\d.]+)\s*,\s*(-?[\d.]+)\s*\)"
r"(?:,\s*a=(-?[\d.]+),\s*av=(-?[\d.]+))?"
)
# Anchor on line start (allowing indent) so the "Frame N:" embedded inside a
# frame's own description ("Frame 5: Frame 5: 7 of 11 …") is NOT treated as a
# new frame boundary — only the real header at the start of a line is.
FRAME_RE = re.compile(r"(?:^|\n)[ \t]*Frame\s+\d+:")
def split_first_frame(text: str) -> str:
    """Return the body of the first frame found in *text*.

    The body runs from just after the first line-anchored ``Frame N:`` header
    up to the start of the next such header, or to the end of *text* when
    there is none. If no header is present, *text* is returned unchanged.
    """
    headers = FRAME_RE.finditer(text)
    first = next(headers, None)
    if first is None:
        return text
    following = next(headers, None)
    body_end = len(text) if following is None else following.start()
    return text[first.end():body_end]
def emitted_description(text: str) -> str:
    """Return the model's own description for the frame it just emitted.

    That is the rest of the line after the first line-anchored ``Frame N:``.
    Round-tripping this back into context (instead of a synthetic
    "simulation in progress") keeps the prompt in-distribution and avoids
    drift / lost collisions.

    Args:
        text: Raw completion text from the model.

    Returns:
        The description with trailing whitespace removed, or ``""`` when no
        frame header is found or the header line has no description.
    """
    # Only spaces/tabs may separate the colon from the description. `\s*`
    # would also swallow the newline, so a header with an empty description
    # ("Frame 5:\n obj_0: …") returned the first object line instead.
    m = re.search(r"^[ \t]*Frame\s+\d+:[ \t]*(.*)", text, re.MULTILINE)
    return m.group(1).rstrip() if m else ""
def parse_frame(text: str, n_obj: int) -> dict[int, dict]:
    """Parse the object lines of one emitted frame.

    Args:
        text: Frame body containing ``obj_<id>: pos=(…), vel=(…)[, a=…, av=…]``
            lines.
        n_obj: Number of objects in the scene; lines whose id is ``>= n_obj``
            are ignored.

    Returns:
        Mapping from object id to a state dict with ``id``, ``position``,
        ``velocity``, ``angle`` and ``angular_velocity``. When an id appears
        more than once, the last well-formed occurrence wins. Angle fields
        default to 0.0 when the line omits them.
    """
    out: dict[int, dict] = {}
    for m in OBJ_RE.finditer(text):
        oid = int(m.group(1))
        if oid >= n_obj:
            continue
        try:
            x, y, vx, vy = (float(m.group(k)) for k in range(2, 6))
            angle = float(m.group(6)) if m.group(6) else 0.0
            spin = float(m.group(7)) if m.group(7) else 0.0
        except ValueError:
            # The pattern accepts any run of digits and dots ("1.2.3", "."),
            # which float() rejects. Skip the malformed object rather than
            # abort the rollout; callers fill missing ids from the prior frame.
            continue
        out[oid] = {
            "id": oid,
            "position": {"x": x, "y": y},
            "velocity": {"x": vx, "y": vy},
            "angle": angle,
            "angular_velocity": spin,
        }
    return out
# -----------------------------------------------------------------------------
# Pymunk ground-truth rollout (the engine LFM2-scenarios was distilled from).
# Generates a deterministic Pymunk simulation from the same starting state the
# model gets, so we can render model vs Pymunk side-by-side per frame.
# -----------------------------------------------------------------------------
def pymunk_rollout(header: dict, seed_frame: dict, n_frames: int) -> list[dict]:
    """Simulate *n_frames* steps in Pymunk starting from *seed_frame*.

    The space is built from the header's gravity, static geometry and object
    list. Each object's position, velocity, angle and angular velocity come
    from *seed_frame* when that frame lists the object's id, otherwise from
    the header position with zero motion. Entries under the header's
    ``constraints`` key are not added to the space.

    Args:
        header: Scene header dict.
        seed_frame: Frame dict supplying the starting state and frame index.
        n_frames: Number of steps to simulate.

    Returns:
        One frame dict per step, numbered from ``seed_frame["frame"] + 1``,
        with objects sorted by id. Returns an empty list (after printing a
        notice) when Pymunk cannot be imported.
    """
    try:
        import pymunk
    except Exception as exc:  # noqa: BLE001
        print(f"[pymunk] unavailable: {exc}", flush=True)
        return []

    gravity = header.get("gravity", {}) or {}
    dt = float(header.get("timestep") or (1.0 / 60.0))
    space = pymunk.Space()
    space.gravity = (float(gravity.get("x", 0.0)), float(gravity.get("y", 0.0)))

    # Static walls and pegs are attached to the space's built-in static body.
    for geom in header.get("static_geometry", []) or []:
        kind = geom.get("type")
        if kind == "segment":
            static_shape = pymunk.Segment(
                space.static_body,
                (geom["p1"]["x"], geom["p1"]["y"]),
                (geom["p2"]["x"], geom["p2"]["y"]),
                radius=1.0,
            )
        elif kind == "circle":
            static_shape = pymunk.Circle(
                space.static_body,
                float(geom.get("radius", 4)),
                offset=(geom["center"]["x"], geom["center"]["y"]),
            )
        else:
            continue
        static_shape.friction = float(geom.get("friction", 0.5))
        static_shape.elasticity = float(geom.get("elasticity", 0.5))
        space.add(static_shape)

    # Dynamic bodies: geometry/material from the header, state from the seed.
    seed_state = {o["id"]: o for o in (seed_frame.get("objects") or [])}
    tracked: dict[int, tuple] = {}
    for spec in header.get("objects", []) or []:
        oid = spec["id"]
        state = seed_state.get(oid, {})
        material = spec.get("material", {}) or {}
        mass = float(material.get("mass", 1.0))
        if spec["type"] == "circle":
            radius = float(spec.get("radius", 12))
            body = pymunk.Body(mass, pymunk.moment_for_circle(mass, 0, radius))
            shape = pymunk.Circle(body, radius)
        else:
            # Any non-circle type is simulated as a box.
            size = (float(spec.get("width", 20)), float(spec.get("height", 20)))
            body = pymunk.Body(mass, pymunk.moment_for_box(mass, size))
            shape = pymunk.Poly.create_box(body, size)

        pos = state.get("position") or spec.get("position") or {"x": 0, "y": 0}
        vel = state.get("velocity") or {"x": 0, "y": 0}
        body.position = (float(pos.get("x", 0)), float(pos.get("y", 0)))
        body.velocity = (float(vel.get("x", 0)), float(vel.get("y", 0)))
        body.angle = float(state.get("angle", 0) or 0)
        body.angular_velocity = float(state.get("angular_velocity", 0) or 0)
        shape.friction = float(material.get("friction", 0.5))
        shape.elasticity = float(material.get("elasticity", 0.4))
        space.add(body, shape)
        tracked[oid] = (body, spec)

    first = int(seed_frame.get("frame", 0))
    frames: list[dict] = []
    for idx in range(first + 1, first + n_frames + 1):
        space.step(dt)
        snapshot = [
            {
                "id": oid,
                "type": spec["type"],
                "position": {"x": float(body.position.x), "y": float(body.position.y)},
                "velocity": {"x": float(body.velocity.x), "y": float(body.velocity.y)},
                "angle": float(body.angle),
                "angular_velocity": float(body.angular_velocity),
            }
            for oid, (body, spec) in sorted(tracked.items())
        ]
        frames.append({"frame": idx, "description": f"Frame {idx}: pymunk", "objects": snapshot})
    return frames
# -----------------------------------------------------------------------------
# Scenarios
# -----------------------------------------------------------------------------
def load_scenarios() -> dict[str, dict]:
    """Load every ``*.jsonl`` scenario file under ``EXAMPLES_DIR``.

    The first non-blank line of a file is the scene header; each following
    line that starts with ``{`` is one frame.

    Returns:
        Mapping from file stem to a dict with ``header``, ``initial_frames``
        (the first four frames) and ``ground_truth`` (all frames). Files that
        cannot be read or parsed are skipped with a printed notice.
    """
    scenarios: dict[str, dict] = {}
    for path in sorted(EXAMPLES_DIR.glob("*.jsonl")):
        try:
            # Decode explicitly as UTF-8 so the result does not depend on the
            # host locale.
            raw = path.read_text(encoding="utf-8")
            lines = [ln for ln in raw.splitlines() if ln.strip()]
            header = json.loads(lines[0])
            frames = [json.loads(ln) for ln in lines[1:] if ln.startswith("{")]
            # ground_truth = the full Pymunk rollout (the dataset this LFM2 was
            # distilled from). We render it side-by-side with the model's
            # rollout so divergence is visible frame-by-frame.
            scenarios[path.stem] = {
                "header": header,
                "initial_frames": frames[:4],
                "ground_truth": frames,
            }
        except Exception as exc:  # noqa: BLE001
            # One bad file (empty, unreadable, invalid JSON) must not stop the
            # app from starting with the remaining scenarios.
            print(f"[scenarios] skip {path.name}: {exc}", flush=True)
    return scenarios
# Every scenario load_scenarios() could parse, keyed by file stem. Loaded once
# at import time.
SCENARIOS = load_scenarios()
# Names of scenarios described below as held-out.
# NOTE(review): not referenced in the part of the file reviewed here — confirm
# where it is consumed.
HELD_OUT = {"pong", "bowling", "ramp_roll", "angry_birds", "hourglass", "newtons_cradle"}
# Curated demos that look good in this setup (kept to scenes where the model
# was trained and the rollout stays physically plausible for tens of frames).
# `bowling` / `newtons_cradle` are held-out so they're more of a stress test
# but they're iconic so we keep them.
# Vetted by running each through the live API + checking model=N/N with no
# 'held' (truncated) objects. Replaced 'dominos' (only emitted ~22/26 obj per
# frame, 3 frozen each step) with 'pyramid' (28/28 clean).
# Names without a loaded scenario are dropped, so the list only ever contains
# keys of SCENARIOS.
FEATURED = [s for s in (
"projectile", "pendulum", "billiards", "pyramid", "plinko", "orbit",
"bowling", "newtons_cradle",
) if s in SCENARIOS]
# -----------------------------------------------------------------------------
# Model (lazy)
# -----------------------------------------------------------------------------
def get_llm(log=lambda s: None):
    """Download (if needed) and load the GGUF model, returning a ready `Llama`.

    Tries the flash-attention configuration first and falls back to a plain
    one; each candidate is validated with a short warmup completion.

    Args:
        log: Callable taking one status string; defaults to a no-op.

    Returns:
        A `llama_cpp.Llama` instance that has completed a warmup decode.

    Raises:
        RuntimeError: If no configuration loads and decodes. The last backend
            failure is attached as ``__cause__``.
    """
    # Built fresh each call: ZeroGPU frees the GPU between requests, so a cached
    # GPU-resident model would be stale. The GGUF stays disk-cached, so only the
    # (fast) load repeats.
    from huggingface_hub import hf_hub_download
    from llama_cpp import Llama

    log("Fetching model (≈216 MB, cached after first run)…")
    path = hf_hub_download(repo_id=GGUF_REPO, filename=GGUF_FILE)
    # NB: prompt-lookup speculative decoding (the practical stand-in for EAGLE,
    # which llama.cpp lacks) was tried but `draft_model` on this CUDA wheel
    # fails with `llama_decode returned -1` — both with and without flash_attn.
    # flash_attn alone works and is the win here (~+36% tok/s), so we use that
    # with a plain fallback. Each config is validated with a real warmup decode.
    # n_batch=1024 speeds prompt-eval over large contexts (could go higher).
    base = dict(model_path=path, n_ctx=N_CTX, n_gpu_layers=-1, n_batch=1024,
                n_threads=N_THREADS, n_threads_batch=N_THREADS, verbose=False)
    warmup = "Frame 1:\n obj_0: pos=(1.0, 2.0), vel=(0.0, 0.0)\nFrame 2:"
    candidates = (("flash_attn", dict(base, flash_attn=True)), ("plain", dict(base)))

    last_error = None
    for name, kw in candidates:
        try:
            log(f"Loading LFM2-350M (GPU) · backend: {name}…")
            llm = Llama(**kw)
            # Warmup forces the ~40s CUDA PTX-JIT here (uniform per-frame timing
            # later) and surfaces any decode-time incompatibility now.
            llm.create_completion(warmup, max_tokens=8)
            log(f"Model ready · backend={name}")
            return llm
        except Exception as exc:  # noqa: BLE001
            # Keep the failure so the final error is not raised without context.
            last_error = exc
            log(f"backend {name} failed: {str(exc)[:90]}")
    raise RuntimeError("no working llama.cpp backend found") from last_error
# -----------------------------------------------------------------------------
# Rendering
# -----------------------------------------------------------------------------
# Colours used by render(): figure/axes background, static segments, static
# pegs.
BG = "#0b0f17"
WALL = "#5b6677"
PEG = "#8a93a6"
# Fill colours for dynamic objects; render() picks PALETTE[id % len(PALETTE)].
PALETTE = ["#4ea1ff", "#ff7c5b", "#ffd166", "#06d6a0", "#c77dff", "#ff5dac",
"#7ee787", "#f78166", "#79c0ff", "#d2a8ff"]
def scene_bounds(header: dict) -> tuple[float, float, float, float]:
    """Compute the padded bounding box of a scene.

    Considers header object positions, segment endpoints and peg centres
    (radii are not added), then pads by 40 units on every side.

    Args:
        header: Scene header dict. Missing or null ``objects`` and
            ``static_geometry`` are treated as empty.

    Returns:
        ``(x_min, x_max, y_min, y_max)``; ``(0, 800, 0, 600)`` when the scene
        has no points at all.
    """
    xs, ys = [], []
    # `or []` mirrors the static_geometry handling: an explicit
    # `"objects": null` used to raise TypeError here.
    for obj in header.get("objects") or []:
        xs.append(obj["position"]["x"])
        ys.append(obj["position"]["y"])
    for sg in header.get("static_geometry") or []:
        kind = sg.get("type")
        if kind == "segment":
            xs += [sg["p1"]["x"], sg["p2"]["x"]]
            ys += [sg["p1"]["y"], sg["p2"]["y"]]
        elif kind == "circle":
            xs.append(sg["center"]["x"])
            ys.append(sg["center"]["y"])
    if not xs:
        return 0, 800, 0, 600
    pad = 40
    return min(xs) - pad, max(xs) + pad, min(ys) - pad, max(ys) + pad
def render(header: dict, obj_map: dict[int, dict], bounds, title: str) -> Image.Image:
    """Draw one frame of the scene and return it as an RGB PIL image.

    Args:
        header: Scene header; supplies static geometry and per-object shape
            metadata (type, radius, width/height).
        obj_map: Object id -> state dict with at least ``position`` (and
            optionally ``angle``, in radians, used for non-circle objects).
        bounds: ``(x0, x1, y0, y1)`` axis limits.
        title: Text shown at the top-left of the figure.

    Returns:
        The rendered frame as a `PIL.Image.Image` in RGB mode.
    """
    x0, x1, y0, y1 = bounds
    specs = {spec["id"]: spec for spec in header.get("objects", [])}

    fig, ax = plt.subplots(figsize=(7.2, 5.4), dpi=100)
    fig.patch.set_facecolor(BG)
    ax.set_facecolor(BG)
    ax.set_xlim(x0, x1)
    ax.set_ylim(y0, y1)
    ax.set_aspect("equal")
    ax.axis("off")

    # Static geometry goes underneath the dynamic objects (zorder 1 vs 3).
    for geom in header.get("static_geometry", []) or []:
        kind = geom.get("type")
        if kind == "segment":
            ax.plot(
                [geom["p1"]["x"], geom["p2"]["x"]],
                [geom["p1"]["y"], geom["p2"]["y"]],
                color=WALL, lw=3, solid_capstyle="round", zorder=1,
            )
        elif kind == "circle":
            centre = (geom["center"]["x"], geom["center"]["y"])
            ax.add_patch(Circle(centre, geom["radius"], color=PEG, zorder=1))

    for oid, state in sorted(obj_map.items()):
        spec = specs.get(oid, {})
        pos = state["position"]
        fill = PALETTE[oid % len(PALETTE)]
        # Objects without header metadata are drawn as circles of radius 12.
        if spec.get("type", "circle") == "circle":
            patch = Circle((pos["x"], pos["y"]), spec.get("radius", 12),
                           color=fill, ec="white", lw=0.6, zorder=3)
        else:
            width = spec.get("width", 20)
            height = spec.get("height", 20)
            degrees = np.degrees(state.get("angle", 0) or 0)
            patch = Rectangle((pos["x"] - width / 2, pos["y"] - height / 2), width, height,
                              color=fill, ec="white", lw=0.6, zorder=3)
            # Rotate about the object's centre, then map data -> display.
            spin = matplotlib.transforms.Affine2D().rotate_deg_around(pos["x"], pos["y"], degrees)
            patch.set_transform(spin + ax.transData)
        ax.add_patch(patch)

    ax.set_title(title, color="#c9d1d9", fontsize=11, loc="left", pad=8)
    fig.tight_layout(pad=0.5)

    png = io.BytesIO()
    fig.savefig(png, format="png", facecolor=BG)
    # Close the figure explicitly so it is not kept alive by pyplot.
    plt.close(fig)
    png.seek(0)
    return Image.open(png).convert("RGB")
# -----------------------------------------------------------------------------
# Prompt fitting (full context — pack as many recent frames as fit in N_CTX)
# -----------------------------------------------------------------------------
def _build_prompt(header: dict, frames: list[dict]) -> str:
    """Concatenate the header, the given frames and the prediction cue."""
    pieces = [fmt_header(header)]
    pieces.extend(fmt_frame(fr) for fr in frames)
    pieces.append("Predict next frame:")
    return "".join(pieces)
def fit_prompt(llm, header: dict, all_frames: list[dict], max_new: int) -> tuple[str, int]:
    """Pack header + as many recent frames as fit into N_CTX - max_new - margin.

    Args:
        llm: Object exposing ``tokenize(bytes, add_bos=True)`` (the loaded
            llama.cpp model).
        header: Scene header dict.
        all_frames: Frame history, oldest first.
        max_new: Tokens reserved for the completion.

    Returns:
        ``(prompt, kept)`` where *kept* is the number of most-recent frames in
        the prompt. When not even one frame fits, the prompt is header-only
        and *kept* is 0 (that prompt is not checked against the budget).
    """
    budget_tokens = N_CTX - max_new - CTX_MARGIN
    total = len(all_frames)

    def try_keep(keep: int):
        """Return the prompt holding the last *keep* frames if it fits, else None."""
        prompt = _build_prompt(header, all_frames[total - keep:])
        n_tokens = len(llm.tokenize(prompt.encode("utf-8"), add_bos=True))
        return prompt if n_tokens <= budget_tokens else None

    # Scanning down from `total` re-tokenized the whole history once per
    # candidate, which is quadratic in rollout depth. Token count grows with
    # the number of frames kept, so search from the small end instead: double
    # the count until it stops fitting, then bisect the gap.
    best_keep, best_prompt = 0, None
    too_many = total + 1  # smallest frame count known (or assumed) not to fit
    probe = 1
    while probe <= total:
        prompt = try_keep(probe)
        if prompt is None:
            too_many = probe
            break
        best_keep, best_prompt = probe, prompt
        if probe == total:
            break
        probe = min(probe * 2, total)

    while too_many - best_keep > 1:
        mid = (best_keep + too_many) // 2
        prompt = try_keep(mid)
        if prompt is None:
            too_many = mid
        else:
            best_keep, best_prompt = mid, prompt

    if best_prompt is None:
        return _build_prompt(header, []), 0
    return best_prompt, best_keep
# -----------------------------------------------------------------------------
# Scenario helpers (UI)
# -----------------------------------------------------------------------------
def scene_to_json(name: str) -> str:
    """Return the named scenario's header and seed frames as indented JSON.

    Unknown names yield the string ``"{}"``.
    """
    scenario = SCENARIOS.get(name)
    if scenario:
        bundle = {key: scenario[key] for key in ("header", "initial_frames")}
        return json.dumps(bundle, indent=2, ensure_ascii=False)
    return "{}"
# -----------------------------------------------------------------------------
# Interactive canvas editor (Konva-in-iframe; gradio doesn't sanitize iframe
# srcdoc, so the JS reliably runs and can talk back to a hidden gr.Textbox).
# -----------------------------------------------------------------------------
# Complete HTML document for the scene editor. editor_html() replaces the
# `__SCENE__` placeholder with the scene JSON and embeds the result as an
# iframe srcdoc. The embedded script:
#   * loads Konva from unpkg and draws header.static_geometry / header.objects
#     with the y axis flipped (w2p / p2w convert world <-> pixel coordinates);
#   * offers Drag / +Ball / +Box / Velocity / Delete tools that mutate SCENE;
#   * syncSeed() rebuilds SCENE.initial_frames as a single `frame: 1` seed
#     frame derived from header.objects, keeping each object's prior velocity;
#   * commit() writes JSON.stringify(SCENE) into the parent page's
#     `ph-scene-state` textarea and dispatches an `input` event.
# NOTE(review): drawVelocity() calls setVel(id, 24/VEL_PIX, 0) for any object
# whose arrow would be shorter than 6 px, so opening Velocity mode changes the
# seed velocity of near-stationary objects — confirm this is intended.
_EDITOR_IFRAME = r"""<!doctype html><html><head><meta charset="utf-8"><style>
html,body{margin:0;padding:0;background:#0b0f17;color:#c9d1d9;font:13px/1.4 -apple-system,Segoe UI,sans-serif}
#bar{display:flex;gap:6px;padding:8px;background:#161b22;border-bottom:1px solid #30363d;flex-wrap:wrap;align-items:center}
#bar button{background:#21262d;color:#c9d1d9;border:1px solid #30363d;border-radius:6px;padding:6px 12px;cursor:pointer;font:inherit}
#bar button:hover{background:#30363d}
#bar button.on{background:#1f6feb;border-color:#1f6feb;color:#fff}
#bar .sp{flex:1}
#hint{color:#8b949e;font-size:12px;margin-right:8px}
#sync{font-size:12px;padding:4px 10px;border-radius:6px;background:#21262d;color:#8b949e}
#sync.ok{background:#1a4d2e;color:#7ee787}
#sync.err{background:#5a1e1e;color:#ff7b72}
#wrap{padding:8px;background:#0b0f17}
#stage{background:#0b0f17;border-radius:8px;display:block;margin:0 auto;cursor:default}
</style></head><body>
<div id="bar">
<button data-tool="drag" class="on">↖ Drag</button>
<button data-tool="ball">● + Ball</button>
<button data-tool="box">■ + Box</button>
<button data-tool="vel">✦ Velocity</button>
<button data-tool="del">✕ Delete</button>
<span id="hint">drag · click empty to add · in Velocity mode drag the red dot</span>
<div class="sp"></div>
<span id="sync">⏳ not synced</span>
</div>
<div id="wrap"><div id="stage"></div></div>
<script src="https://unpkg.com/konva@9/konva.min.js"></script>
<script>
(function(){
const SCENE = __SCENE__;
const W = Math.max(640, document.body.clientWidth - 16), H = 460;
const VEL_PIX = 0.2; // pixels per world-vel unit (vel=100 -> 20 px arrow)
function bounds(){
let xs=[], ys=[];
for (const o of (SCENE.header.objects||[])) { xs.push(o.position.x); ys.push(o.position.y); }
for (const sg of (SCENE.header.static_geometry||[])) {
if (sg.type==='segment'){ xs.push(sg.p1.x,sg.p2.x); ys.push(sg.p1.y,sg.p2.y); }
else if (sg.type==='circle'){ xs.push(sg.center.x); ys.push(sg.center.y); }
}
if (!xs.length) return {x0:0,x1:800,y0:0,y1:600};
const px=40, py=40;
return {x0:Math.min.apply(null,xs)-px, x1:Math.max.apply(null,xs)+px,
y0:Math.min.apply(null,ys)-py, y1:Math.max.apply(null,ys)+py};
}
const B = bounds();
const sc = Math.min(W/(B.x1-B.x0), H/(B.y1-B.y0));
const SW = (B.x1-B.x0)*sc, SH = (B.y1-B.y0)*sc;
function w2p(x,y){ return {x:(x-B.x0)*sc, y:(B.y1-y)*sc}; } // flip y
function p2w(x,y){ return {x:x/sc+B.x0, y:B.y1-y/sc}; }
const stage = new Konva.Stage({container:'stage', width:SW, height:SH});
const statL = new Konva.Layer({listening:false});
const objL = new Konva.Layer();
const velL = new Konva.Layer();
stage.add(statL); stage.add(objL); stage.add(velL);
const PALETTE = ['#4ea1ff','#ff7c5b','#ffd166','#06d6a0','#c77dff','#ff5dac','#7ee787','#f78166','#79c0ff','#d2a8ff'];
function getVel(id){
const fr0 = (SCENE.initial_frames && SCENE.initial_frames[0]) || {objects:[]};
const m = (fr0.objects||[]).find(function(o){return o.id===id});
return (m && m.velocity) ? m.velocity : {x:0, y:0};
}
function setVel(id, vx, vy){
if (!SCENE.initial_frames || !SCENE.initial_frames[0]) syncSeed();
const m = SCENE.initial_frames[0].objects.find(function(o){return o.id===id});
if (m) { m.velocity = {x:vx, y:vy}; }
}
function drawStatic(){
statL.destroyChildren();
for (const sg of (SCENE.header.static_geometry||[])) {
if (sg.type==='segment') {
const a=w2p(sg.p1.x,sg.p1.y), b=w2p(sg.p2.x,sg.p2.y);
statL.add(new Konva.Line({points:[a.x,a.y,b.x,b.y], stroke:'#5b6677', strokeWidth:3, lineCap:'round'}));
} else if (sg.type==='circle') {
const c=w2p(sg.center.x,sg.center.y);
statL.add(new Konva.Circle({x:c.x, y:c.y, radius:sg.radius*sc, fill:'#8a93a6'}));
}
}
statL.draw();
}
function drawObjects(){
objL.destroyChildren();
for (const o of (SCENE.header.objects||[])) {
const p = w2p(o.position.x, o.position.y);
const color = PALETTE[o.id % PALETTE.length];
let shape;
const dragOK = (tool==='drag');
if (o.type==='circle') {
shape = new Konva.Circle({x:p.x, y:p.y, radius:(o.radius||12)*sc,
fill:color, stroke:'#fff', strokeWidth:1, draggable:dragOK});
} else {
const w=(o.width||20)*sc, h=(o.height||20)*sc;
shape = new Konva.Rect({x:p.x-w/2, y:p.y-h/2, width:w, height:h,
fill:color, stroke:'#fff', strokeWidth:1, draggable:dragOK});
}
shape.attrs._id = o.id;
shape.on('dragend', function(){
let cx, cy;
if (this.className==='Circle'){ cx=this.x(); cy=this.y(); }
else { cx=this.x()+this.width()/2; cy=this.y()+this.height()/2; }
const wp = p2w(cx, cy);
const id = this.attrs._id;
const ho = SCENE.header.objects.find(function(x){return x.id===id});
if (ho) { ho.position.x = wp.x; ho.position.y = wp.y; }
syncSeed(); commit();
if (tool==='vel') drawVelocity();
});
shape.on('mouseenter', function(){
if (tool==='del') stage.container().style.cursor = 'not-allowed';
else if (tool==='drag') stage.container().style.cursor = 'move';
});
shape.on('mouseleave', function(){
stage.container().style.cursor = (tool==='ball'||tool==='box')?'crosshair':'default';
});
shape.on('click tap', function(){
if (tool==='del') {
const id = this.attrs._id;
SCENE.header.objects = SCENE.header.objects.filter(function(x){return x.id!==id});
SCENE.header.object_count = SCENE.header.objects.length;
syncSeed(); commit(); drawObjects(); drawVelocity();
}
});
objL.add(shape);
}
objL.draw();
}
function drawVelocity(){
velL.destroyChildren();
if (tool !== 'vel') { velL.draw(); return; }
for (const ho of (SCENE.header.objects||[])) {
const p = w2p(ho.position.x, ho.position.y);
const v = getVel(ho.id);
let tipX = p.x + v.x * VEL_PIX;
let tipY = p.y - v.y * VEL_PIX; // flip y for canvas
// ensure tip is always at least a tiny offset so the handle is grabbable
if (Math.abs(tipX-p.x)<6 && Math.abs(tipY-p.y)<6) { tipX = p.x + 24; tipY = p.y; setVel(ho.id, 24/VEL_PIX, 0); }
const arrow = new Konva.Arrow({
points:[p.x, p.y, tipX, tipY],
stroke:'#ffd166', fill:'#ffd166', strokeWidth:2,
pointerLength:8, pointerWidth:8, listening:false,
});
const handle = new Konva.Circle({
x:tipX, y:tipY, radius:7, fill:'#ff7c5b', stroke:'#fff', strokeWidth:1,
draggable:true,
});
handle.attrs._id = ho.id;
handle.attrs._anchor = p;
handle.attrs._arrow = arrow;
handle.on('dragmove', function(){
const a = this.attrs._anchor;
this.attrs._arrow.points([a.x, a.y, this.x(), this.y()]);
const vx = (this.x() - a.x) / VEL_PIX;
const vy = -(this.y() - a.y) / VEL_PIX;
setVel(this.attrs._id, vx, vy);
velL.batchDraw();
});
handle.on('dragend', function(){ commit(); });
velL.add(arrow); velL.add(handle);
}
velL.draw();
}
let tool = 'drag';
const btns = document.querySelectorAll('#bar [data-tool]');
btns.forEach(function(b){ b.addEventListener('click', function(){
tool = b.getAttribute('data-tool');
btns.forEach(function(x){ x.classList.toggle('on', x===b); });
stage.container().style.cursor = (tool==='ball'||tool==='box')?'crosshair':'default';
drawObjects(); // re-render to update draggable flag
drawVelocity(); // toggle velocity arrows
}); });
stage.on('click tap', function(e){
if (!(tool==='ball' || tool==='box')) return;
if (e.target !== stage && (!e.target.parent || e.target.parent !== statL)) return;
const pos = stage.getPointerPosition();
const wp = p2w(pos.x, pos.y);
const nextId = (SCENE.header.objects||[]).reduce(function(m,o){return Math.max(m,o.id)},-1)+1;
const mat = {mass:1.0, friction:0.5, elasticity:0.4};
const o = (tool==='ball')
? {id:nextId, type:'circle', position:wp, material:mat, radius:20}
: {id:nextId, type:'rectangle', position:wp, material:mat, width:30, height:30};
SCENE.header.objects.push(o);
SCENE.header.object_count = SCENE.header.objects.length;
syncSeed(); commit(); drawObjects(); drawVelocity();
});
function syncSeed(){
const fr0 = (SCENE.initial_frames && SCENE.initial_frames[0]) || {};
const oldById = {};
for (const o of (fr0.objects||[])) oldById[o.id] = o;
const newObjs = [];
for (const ho of (SCENE.header.objects||[])) {
const prev = oldById[ho.id] || {};
const e = {
id: ho.id, type: ho.type,
position: {x: ho.position.x, y: ho.position.y},
velocity: prev.velocity || {x:0, y:0},
angle: prev.angle || 0,
angular_velocity: prev.angular_velocity || 0,
material: ho.material || {mass:1, friction:0.5, elasticity:0.4},
};
if (ho.radius!==undefined) e.radius = ho.radius;
if (ho.width!==undefined) e.width = ho.width;
if (ho.height!==undefined) e.height = ho.height;
newObjs.push(e);
}
SCENE.initial_frames = [{frame:1, description:"Frame 1: edited scene.", objects:newObjs}];
}
const syncEl = document.getElementById('sync');
function setSync(state, msg){ syncEl.className = state; syncEl.textContent = msg; }
function findTextarea(){
const cands = [
'#ph-scene-state textarea',
'[id="ph-scene-state"] textarea',
'div[id*="ph-scene-state"] textarea',
'label[for*="ph-scene-state"] textarea',
];
for (const sel of cands) {
try {
const el = window.parent.document.querySelector(sel);
if (el) return el;
} catch(_) {}
}
return null;
}
function commit(){
try {
const ta = findTextarea();
if (!ta) { setSync('err', '✗ no textarea (gradio hidden component?)'); console.warn('[editor] textarea not found'); return; }
const payload = JSON.stringify(SCENE);
try {
const proto = window.parent.HTMLTextAreaElement.prototype;
const set = Object.getOwnPropertyDescriptor(proto, 'value').set;
set.call(ta, payload);
} catch(_) {
ta.value = payload;
}
ta.dispatchEvent(new Event('input', {bubbles:true}));
// verify the value actually stuck
setTimeout(function(){
const ok = ta.value === payload;
setSync(ok?'ok':'err', (ok?'✓ synced · ':'✗ not committed · ') + payload.length + ' bytes · ' + new Date().toLocaleTimeString());
}, 0);
console.log('[editor] committed', payload.length, 'bytes');
} catch(e) {
setSync('err', '✗ commit threw: ' + (e && e.message || e));
console.error('[editor] commit error', e);
}
}
drawStatic(); drawObjects(); drawVelocity(); commit();
})();
</script></body></html>"""
def editor_html(scene: dict) -> str:
    """Return a gr.HTML value: an iframe whose srcdoc contains the canvas editor
    with the scene baked in as a JS literal.

    Args:
        scene: Bundle with ``header`` and ``initial_frames``; must be JSON
            serializable.

    Returns:
        An ``<iframe srcdoc="…">`` element as a string, with the document
        HTML-escaped for use inside the attribute.
    """
    import html as _html

    # The JSON is pasted into an inline <script>. A "<" inside any string
    # (e.g. "</script>" or "<!--" in a description) could end or confuse the
    # script block once the srcdoc is parsed. "<" only occurs inside JSON
    # strings, so writing it as \u003c yields the same JS value.
    scene_literal = json.dumps(scene).replace("<", "\\u003c")
    inner = _EDITOR_IFRAME.replace("__SCENE__", scene_literal)
    srcdoc = _html.escape(inner, quote=True)
    return (
        f'<iframe srcdoc="{srcdoc}" '
        f'style="width:100%;height:560px;border:1px solid #30363d;border-radius:8px;background:#0b0f17"></iframe>'
    )
def scene_loaded(name: str) -> tuple[str, str]:
    """Scenario.change/Reset.click → (new editor HTML, new hidden state JSON).

    An unknown scenario name produces an editor for an empty scene.
    """
    scenario = SCENARIOS.get(name)
    if not scenario:
        scenario = {"header": {}, "initial_frames": []}
    bundle = {
        "header": scenario["header"],
        "initial_frames": scenario["initial_frames"],
    }
    markup = editor_html(bundle)
    state_json = json.dumps(bundle)
    return markup, state_json
# -----------------------------------------------------------------------------
# Numerical evaluation: model rollout vs Pymunk ground truth (mean Euclidean
# position error per object; distances are not squared, so this is not an MSE).
# Exposed via the api_name="/evaluate" endpoint so we can benchmark featured
# demos from a script without scraping the UI.
# -----------------------------------------------------------------------------
@gpu(duration=300)
def evaluate(scene_json: str, scenario_name: str, n_frames: int):
    """Roll the model out greedily and score it against a Pymunk rollout.

    Both rollouts start from the last seed frame in *scene_json*. For every
    generated frame that has a Pymunk counterpart, the Euclidean distance
    between model and Pymunk position is taken per object. Objects the model
    did not emit are carried over from the previous frame before scoring.

    Args:
        scene_json: JSON bundle with ``header`` and ``initial_frames``.
        scenario_name: Label echoed back in the result.
        n_frames: Number of frames to generate.

    Returns:
        A JSON string with ``scenario``, ``n_obj``, ``scene_diag``,
        ``frames_done``, ``frames_held_avg`` (mean number of objects the model
        failed to emit per scored frame), ``mean_dist``,
        ``mean_dist_pct_diag``, ``elapsed`` and ``per_frame``. The distance
        fields are ``None`` when no frame could be scored.
    """
    bundle = json.loads(scene_json)
    header = bundle["header"]
    initial = bundle.get("initial_frames") or []
    n_frames = int(n_frames)
    n_obj = (header.get("object_count")
             or len(header.get("objects", []))
             or (len(initial[0]["objects"]) if initial else 0))
    x0, x1, y0, y1 = scene_bounds(header)
    diag = ((x1 - x0) ** 2 + (y1 - y0) ** 2) ** 0.5

    # Without a seed frame there is nothing to start Pymunk from. The model
    # rollout still runs; its frames simply go unscored. (Indexing initial[-1]
    # unconditionally raised IndexError for an empty seed list.)
    gt_frames = pymunk_rollout(header, initial[-1], n_frames) if initial else []
    gt_by_frame = {f["frame"]: f for f in gt_frames}

    from llama_cpp import Llama  # noqa: F401 (preload may be required)
    llm = get_llm(lambda s: None)

    # Completion budget scales with object count (same formula as simulate).
    budget = int(min(2000, n_obj * 36 + 100))
    rolled: list[dict] = list(initial)
    last_idx = initial[-1]["frame"] if initial else 0
    per_frame: list[dict] = []
    t0 = time.time()
    for _ in range(n_frames):
        prompt, _ctx_frames = fit_prompt(llm, header, rolled, budget)
        # Stop as soon as the header of a frame *after* the one being
        # predicted appears; a few indices are covered in case of drift.
        next_idx = last_idx + 2
        stops = [f"Frame {next_idx+d}:" for d in range(0, 4)]
        out = llm.create_completion(prompt, max_tokens=budget, temperature=0.0, top_p=0.95, stop=stops)
        text = out["choices"][0]["text"]

        parsed = parse_frame(split_first_frame(text), n_obj)
        modeled = len(parsed)
        prev_objs = {o["id"]: o for o in rolled[-1]["objects"]} if rolled else {}
        new_objs = dict(parsed) if parsed else dict(prev_objs)
        if modeled < n_obj:
            # Hold objects the model did not emit at their previous state.
            for oid, o in prev_objs.items():
                new_objs.setdefault(oid, o)

        last_idx += 1
        rolled.append({
            "frame": last_idx,
            "description": emitted_description(text) or f"Frame {last_idx}: simulation in progress.",
            "objects": list(new_objs.values()),
        })

        gt = gt_by_frame.get(last_idx)
        if gt:
            gt_pos = {o["id"]: o["position"] for o in gt["objects"]}
            errs = []
            for oid, o in new_objs.items():
                if oid in gt_pos:
                    dx = gt_pos[oid]["x"] - o["position"]["x"]
                    dy = gt_pos[oid]["y"] - o["position"]["y"]
                    errs.append((dx * dx + dy * dy) ** 0.5)
            per_frame.append({
                "frame": last_idx, "modeled": modeled,
                "mean_dist": (sum(errs) / len(errs)) if errs else None,
                "max_dist": max(errs) if errs else None,
            })

    valid = [p for p in per_frame if p["mean_dist"] is not None]
    mean_dist = (sum(p["mean_dist"] for p in valid) / len(valid)) if valid else None
    # Test against None explicitly: a perfect score of 0.0 is falsy and used
    # to be reported as "no percentage available".
    pct_diag = (mean_dist / diag * 100.0) if (mean_dist is not None and diag) else None
    return json.dumps({
        "scenario": scenario_name,
        "n_obj": n_obj,
        "scene_diag": diag,
        "frames_done": len(per_frame),
        "frames_held_avg": sum(n_obj - p["modeled"] for p in per_frame) / max(1, len(per_frame)),
        "mean_dist": mean_dist,
        "mean_dist_pct_diag": pct_diag,
        "elapsed": round(time.time() - t0, 2),
        "per_frame": per_frame,
    })
# -----------------------------------------------------------------------------
# Simulation (streamed)
# -----------------------------------------------------------------------------
@gpu(duration=300)
def simulate(scene_json: str, scenario_name: str, n_frames: int, temperature: float):
    """Stream an autoregressive rollout of a scene, one predicted frame per step.

    This is a generator: each ``yield`` is a 4-tuple
    ``(model_image, truth_image, gif_path, status_text)``. ``gif_path`` stays
    ``None`` until the final yield, where it points at the saved animation
    (only written when more than one frame was rendered).

    Args:
        scene_json: JSON text holding a ``"header"`` entry and an optional
            ``"initial_frames"`` list of seed frames.
        scenario_name: Name of the selected preset; not used by the rollout.
        n_frames: Number of frames to predict (coerced with ``int``).
        temperature: Sampling temperature; negative values are clamped to 0.

    Yields:
        ``(model_image, truth_image, gif_path, status_text)`` after the seed
        frame is rendered, after every predicted frame, and once at the end.
        On a scene-parse, model-load or generation failure a single tuple
        carrying the error text is yielded and the generator stops.
    """
    log_lines: list[str] = []

    def log(s: str):
        # Keep the line for the status box and mirror it to stdout.
        log_lines.append(s)
        print("[sim]", s, flush=True)

    try:
        bundle = json.loads(scene_json)
        header = bundle["header"]
        initial = bundle.get("initial_frames") or []
    except Exception as exc:  # noqa: BLE001
        yield None, None, None, f"Scene JSON parse error: {exc}"
        return

    # Coerce once; the value is reused for the GT rollout, the loop and logs.
    total = int(n_frames)

    # Object count: explicit header field, else the header's object list, else
    # the number of objects in the first seed frame.
    n_obj = (header.get("object_count")
             or len(header.get("objects", []))
             or (len(initial[0]["objects"]) if initial else 0))
    bounds = scene_bounds(header)
    # `or "scene"` also covers a description that is present but None/empty,
    # which would otherwise fail (None) or give a blank title ("").
    title = header.get("scenario_type") or (header.get("description") or "scene")[:32]
    log(f"Scene: {title} · {n_obj} objects · {len(initial)} seed frames")

    # Pymunk ground truth — generated fresh from THE ACTUAL scene state (same
    # initial conditions the model gets), so the side-by-side comparison is
    # apples-to-apples even after the user edits the canvas.
    gt_by_frame: dict[int, dict] = {}
    if initial:
        gt_n = total + 1
        t_gt = time.time()
        try:
            gt_frames = pymunk_rollout(header, initial[-1], gt_n)
            for f in gt_frames:
                gt_by_frame[f["frame"]] = f
            log(f"Pymunk ground truth: {len(gt_by_frame)} frames in {time.time()-t_gt:.2f}s")
        except Exception as exc:  # noqa: BLE001
            # Best effort: the rollout continues without the comparison panel.
            log(f"Pymunk rollout failed ({exc}); GT panel disabled")

    rolled_frames: list[dict] = list(initial)
    last_idx = initial[-1]["frame"] if initial else 0
    gif_frames: list[Image.Image] = []

    def render_model() -> Image.Image:
        # Draw the most recent frame in the rollout (seed or predicted).
        last = rolled_frames[-1] if rolled_frames else {"objects": []}
        obj_map = {o["id"]: o for o in last.get("objects", [])}
        return render(header, obj_map, bounds, f"Model · frame {last_idx}")

    def render_truth() -> Image.Image | None:
        # Draw the ground-truth frame with the same index, if one exists.
        gt = gt_by_frame.get(last_idx)
        if not gt:
            return None
        obj_map = {o["id"]: o for o in gt.get("objects", [])}
        return render(header, obj_map, bounds, f"Pymunk · frame {last_idx}")

    if rolled_frames:
        # Show the last seed frame immediately, before the model is loaded.
        gif_frames.append(render_model())
        yield gif_frames[-1], render_truth(), None, "Loading model…\n" + "\n".join(log_lines[-12:])

    try:
        llm = get_llm(log)
    except Exception as exc:  # noqa: BLE001
        yield (gif_frames[-1] if gif_frames else None), None, None, f"Model load failed: {exc}"
        return

    # ≈ one object line per ~28 tokens (with angle/av fields). For dense
    # scenes (30-40 objects) we need ~1300+ tokens for a single complete
    # frame; a tight cap silently truncates the tail and the parser then
    # holds those objects from the prior frame (user sees "frozen balls").
    # Budget linearly with n_obj; cap so we don't waste GPU on huge scenes.
    budget = int(min(2000, n_obj * 36 + 100))
    t0 = time.time()
    for step in range(total):
        prompt, ctx_frames = fit_prompt(llm, header, rolled_frames, budget)
        # Dynamic stop: the model emits "Frame K: ...\n obj_…\nFrame K+1:…".
        # Stopping as soon as the *next* frame header starts saves the wasted
        # generation that bleeds into frame K+1. The frame being predicted is
        # last_idx + 1, so the stop headers start at last_idx + 2 and cover
        # the three indices after that in case the model skips ahead.
        next_idx = last_idx + 2
        stops = [f"Frame {next_idx+d}:" for d in range(0, 4)]
        try:
            out = llm.create_completion(
                prompt, max_tokens=budget,
                temperature=max(0.0, float(temperature)), top_p=0.95,
                stop=stops,
            )
            text = out["choices"][0]["text"]
        except Exception:  # noqa: BLE001
            import traceback
            tb = traceback.format_exc()
            log(f"generation error at step {step+1}")
            yield (gif_frames[-1] if gif_frames else None), None, None, "GENERATION ERROR:\n" + tb[-1500:]
            return

        parsed = parse_frame(split_first_frame(text), n_obj)
        modeled = len(parsed)
        new_objs = dict(parsed)
        prev_objs = {o["id"]: o for o in rolled_frames[-1]["objects"]} if rolled_frames else {}
        if not new_objs:
            # Nothing parsed: hold the whole previous frame.
            new_objs = dict(prev_objs)
        elif modeled < n_obj:
            # Partial frame: carry forward the objects the model did not emit.
            for oid, o in prev_objs.items():
                new_objs.setdefault(oid, o)

        last_idx += 1
        emitted = emitted_description(text)
        # Training format: description literally starts with "Frame N:" — keep
        # the model's own emitted text so the round-tripped context matches.
        desc = emitted or f"Frame {last_idx}: simulation in progress."
        rolled_frames.append({
            "frame": last_idx,
            "description": desc,
            "objects": list(new_objs.values()),
        })

        elapsed = time.time() - t0
        fps = (step + 1) / max(elapsed, 1e-3)
        # Show modeled vs final separately: if modeled < n_obj, some objects
        # were held from the previous frame (model didn't get to emit them).
        held = n_obj - modeled
        held_note = f" ({held} held)" if held > 0 else ""
        log(f"step {step+1}/{total}: model={modeled}/{n_obj}{held_note} · ctx={ctx_frames}f · {elapsed:.1f}s · {fps:.2f} frame/s")
        gif_frames.append(render_model())
        status = f"Simulating… frame {step+1}/{total}\n" + "\n".join(log_lines[-12:])
        yield gif_frames[-1], render_truth(), None, status

    gif_path = None
    if len(gif_frames) > 1:
        # NOTE(review): this fixed path is shared by every call; overlapping
        # runs would overwrite each other's GIF — confirm runs are serialized.
        gif_path = str(HERE / "rollout.gif")
        gif_frames[0].save(gif_path, save_all=True, append_images=gif_frames[1:],
                           duration=60, loop=0)
    log(f"Done — {len(gif_frames)} frames in {time.time()-t0:.1f}s")
    # Guard like the error paths do: with no seed frames and no predicted
    # frames there is nothing rendered to show.
    final_image = gif_frames[-1] if gif_frames else None
    yield final_image, render_truth(), gif_path, "Done.\n" + "\n".join(log_lines[-12:])
# -----------------------------------------------------------------------------
# UI
# -----------------------------------------------------------------------------
# Scenario preselected on first load: "bowling" when it exists, otherwise the
# alphabetically first preset, otherwise None (no presets available).
if "bowling" in SCENARIOS:
    _DEFAULT = "bowling"
elif SCENARIOS:
    _DEFAULT = sorted(SCENARIOS)[0]
else:
    _DEFAULT = None

# Initial scene bundle (header + seed frames) for the editor and the state
# textbox; an empty scene is used when there is no default preset.
if _DEFAULT:
    _DEFAULT_BUNDLE = {
        field: SCENARIOS[_DEFAULT][field]
        for field in ("header", "initial_frames")
    }
else:
    _DEFAULT_BUNDLE = {"header": {}, "initial_frames": []}
# Build the Gradio UI: intro text, preset pickers, the scene editor and its
# state textbox, the simulation controls/outputs, and the evaluation panel.
with gr.Blocks(title="Physics LLM 🪀") as demo:
    gr.Markdown(
        "# Physics LLM 🪀\n"
        "A fine-tuned **LFM2-350M** predicts 2D rigid-body physics frame-by-frame, "
        "as text — no physics engine. Pick a preset, **drag the balls around or "
        "drop new ones** on the canvas, then watch the model roll it forward live. "
        "The model sees as much of the prior trajectory as fits its 8192-token "
        "context.\n\n"
        "Six scenarios (`pong`, `bowling`, `ramp_roll`, `angry_birds`, `hourglass`, "
        "`newtons_cradle`) were **never seen in training**."
    )
    gr.Markdown(
        "**✨ Featured demos** (model handles these cleanly) — click to load:"
    )
    # Shortcut picker over FEATURED; preselects "bowling" when it is listed,
    # else the first entry, else nothing.
    featured = gr.Radio(
        choices=FEATURED, value="bowling" if "bowling" in FEATURED else (FEATURED[0] if FEATURED else None),
        label="", show_label=False,
    )
    # Full preset list plus a button that reloads the selected preset.
    with gr.Row():
        scenario = gr.Dropdown(
            choices=sorted(SCENARIOS.keys()), value=_DEFAULT,
            label="All 30 scenarios", scale=4,
        )
        reset = gr.Button("Reset to preset", scale=1)
    # Scene editor markup for the default bundle; sanitising is disabled for
    # this component.
    scene_html = gr.HTML(value=editor_html(_DEFAULT_BUNDLE), sanitize_html=False)
    # JSON form of the scene; this is the value passed to simulate/evaluate.
    # NOTE(review): presumably the canvas script writes into this element via
    # its elem_id — confirm against editor_html.
    scene_state = gr.Textbox(
        value=json.dumps(_DEFAULT_BUNDLE),
        lines=2, max_lines=4,
        label="Scene state (auto-synced from canvas; Simulate reads this)",
        elem_id="ph-scene-state",
    )
    with gr.Row():
        # Narrow column: rollout settings, the run button and the GIF replay.
        with gr.Column(scale=1):
            n_frames = gr.Slider(5, 200, value=60, step=1, label="Frames to predict")
            temperature = gr.Slider(0.0, 1.0, value=0.0, step=0.05,
                                    label="Temperature (0 = greedy)")
            run = gr.Button("▶ Simulate", variant="primary")
            gif = gr.Image(label="Replay (animated, model)", type="filepath", height=200)
        # Wide column: model and ground-truth views side by side, log below.
        with gr.Column(scale=3):
            with gr.Row():
                view = gr.Image(label="Model prediction", height=380)
                view_truth = gr.Image(label="Pymunk ground truth (distilled from)", height=380)
            status = gr.Textbox(label="Log", lines=12, max_lines=12)
    # Featured radio mirrors the dropdown + repaints the editor + state.
    def _pick_featured(name):
        html, state = scene_loaded(name)
        return name, html, state
    featured.change(_pick_featured, [featured], [scenario, scene_html, scene_state])
    # Choosing a scenario and pressing Reset both reload the selected preset
    # into the editor and the state textbox.
    scenario.change(scene_loaded, [scenario], [scene_html, scene_state])
    reset.click(scene_loaded, [scenario], [scene_html, scene_state])
    # simulate's four yielded values map onto these four outputs, in order.
    run.click(simulate, [scene_state, scenario, n_frames, temperature],
              [view, view_truth, gif, status])
    # Collapsed-by-default panel that runs evaluate on the current scene state
    # and shows its result in a JSON code viewer.
    with gr.Accordion("📊 Compute position MSE vs Pymunk (numerical)", open=False):
        with gr.Row():
            eval_frames = gr.Slider(5, 30, value=15, step=1, label="Frames to evaluate")
            eval_btn = gr.Button("Run evaluation", scale=1)
        eval_out = gr.Code(language="json", lines=12, label="Result")
        eval_btn.click(evaluate, [scene_state, scenario, eval_frames], [eval_out])
# Launch the app only when this file is executed as a script, not on import.
if __name__ == "__main__":
    demo.launch()